25 for (i = 0; i < nthreads; i++) {
26 count[i] = 0;
27 Pthread_create(&tid_produce[i], NULL, produce, &count[i]);
28 }
29 Pthread_create(&tid_consume, NULL, consume, NULL);
30 /* ожидание завершения производителей и потребителя */
31 for (i = 0; i < nthreads; i++) {
32 Pthread_join(tid_produce[i], NULL);
33 printf("count[%d] = %d\n", i, count[i]);
34 }
35 Pthread_join(tid_consume, NULL);
36 exit(0);
37 }
24 Мы увеличиваем уровень параллельного выполнения на единицу, чтобы учесть поток-потребитель, выполняемый параллельно с производителями.
25-29 Поток-потребитель создается сразу же после создания потоков-производителей.
Функция produce по сравнению с листингом 7.2 не изменяется. В листинге 7.4 приведен текст функции consume, вызывающей новую функцию consume_wait.
//mutex/prodcons3.с
54 void
55 consume wait(int i)
56 {
57 for (;;) {
58 Pthread_mutex_lock(&shared.mutex);
59 if (i < shared.nput) {
60 Pthread_mutex_unlock(&shared.mutex);
61 return; /* элемент готов */
62 }
63 Pthread_mutex_unlock(&shared.mutex);
64 }
65 }
66 void *
67 consume(void *arg)
68 {
69 int i;
70 for (i = 0; i < nitems; i++) {
71 consume_wait(i);
72 if (shared.buff[i] != i)
73 printf("buff[%d] = %d\n", i, shared.buff[i]);
74 }
75 return(NULL);
76 }
71 Единственное изменение в функции consume заключается в добавлении вызова consume_wait перед обработкой следующего элемента массива.
57-64 Наша функция consume_wait должна ждать, пока производители не создадут i-й элемент. Для проверки этого условия производится блокировка взаимного исключения и значение i сравнивается с индексом производителя nput. Блокировка необходима, поскольку значение nput может быть изменено одним из производителей в момент его проверки.
Главная проблема — что делать, если нужный элемент еще не готов. Все, что нам остается и что мы делаем в листинге 7.4, — это повторять операции в цикле, устанавливая и снимая блокировку и проверяя значение индекса. Это называется опросом (spinning или polling) и является лишней тратой времени процессора.
Мы могли бы приостановить выполнение процесса на некоторое время, но мы не знаем, на какое. Что нам действительно нужно — это использовать какое-то другое средство синхронизации, позволяющее потоку или процессу приостанавливать работу, пока не произойдет какое-либо событие.
7.5. Условные переменные: ожидание и сигнализация
Взаимное исключение используется для блокирования, а условная переменная — для ожидания. Это два различных средства синхронизации, и оба они нужны. Условная переменная представляет собой переменную типа pthread_cond_t. Для работы с такими переменными предназначены две функции:
#include
int pthread_cond_wait(pthread_cond_t
int pthread_cond_signal(pthread_cond_t
/* Обе функции возвращают 0 в случае успешного завершения, положительное значение Еххх – в случае ошибки */
Слово signal в имени второй функции не имеет никакого отношения к сигналам Unix SIGxxx.
Мы определяем условие, уведомления о выполнении которого будем ожидать.
Взаимное исключение всегда связывается с условной переменной. При вызове pthread_cond_wait для ожидания выполнения какого-либо условия мы указываем адрес условной переменной и адрес связанного с ней взаимного исключения.
Мы проиллюстрируем использование условных переменных, переписав пример из предыдущего раздела. В листинге 7.5 объявляются глобальные переменные.
7-13 Две переменные nput и rival ассоциируются с mutex, и мы объединяем их в структуру с именем put. Эта структура используется производителями.
14-20 Другая структура, nready, содержит счетчик, условную переменную и взаимное исключение. Мы инициализируем условную переменную с помощью PTHREAD_ COND_INITIALIZER.