69 if (shared.buff[i % NBUFF] != i)
70 printf("error: buff[%d] = %d\n", i, shared.buff[i % NBUFF]);
71 Sem_post(&shared.mutex);
72 Sem_post(&shared.nempty); /* еще одно пустое поле */
73 }
74 return(NULL);
75 }
Условие завершения единственного потока-потребителя звучит просто: он считает все потребленные объекты и останавливается по достижении nitems.
10.10. Несколько производителей, несколько потребителей
Следующее изменение, которое мы внесем в нашу пpoгрaммy, будет заключаться в добавлении возможности одновременной работы нескольких потребителей вместе с несколькими производителями. Есть ли смысл в наличии нескольких потребителей — зависит от приложения. Автор видел два примера, в которых использовался этот метод.
1. Пpoгрaммa преобразования IP-адресов в имена узлов. Каждый потребитель берет IP-адрес, вызывает gethostbyaddr (раздел 9.6 [24]), затем дописывает имя узла к файлу. Поскольку каждый вызов gethostbyaddr обрабатывается неопределенное время, порядок IP-адресов в буфере будет, скорее всего, отличаться от порядка имен узлов в файле, созданном потоками-потребителями. Преимущество этой схемы в параллельности выполнения вызовов gethostbyaddr (каждый из которых может работать несколько секунд) — по одному на каждый поток-потребитель.
ПРИМЕЧАНИЕ
Предполагается наличие версии gethostbyaddr, допускающей многократное вхождение, что не всегда верно. Если эта версия недоступна, можно хранить буфер в разделяемой памяти и использовать процессы вместо потоков.
2. Программа, принимающая дейтаграммы UDP, обрабатывающая их и записывающая результат в базу данных. Каждая дeйтaгрaммa обрабатывается одним потоком-потребителем, которые выполняются параллельно для ускорения процесса. Хотя дейтаграммы записываются в базу данных в порядке, вообще говоря, отличном от порядка их приема, встроенная схема упорядочения записей в базе данных справляется с этой проблемой.
В листинге 10.15 приведены глобальные переменные программы.
//pxsem/prodcons4.с
1 #include "unpipc.h"
2 #define NBUFF 10
3 #define MAXNTHREADS 100
4 int nitems, nproducers, nconsumers; /* только для чтения */
5 struct { /* общие данные производителей и потребителей */
6 int buff[NBUFF];
7 int nput; /* номер объекта: 0, 1. 2, … */
8 int nputval; /* сохраняемое в buff[] значение */
9 int nget; /* номер объекта: 0, 1, 2, … */
10 int ngetval; /* получаемое из buff[] значение */
11 sem_t mutex, nempty, nstored; /* семафоры, а не указатели */
12 } shared;
13 void *produce(void *), *consume(void *);
4-12 Количество потоков-потребителей является глобальной переменной, устанавливаемой из командной строки. В структуру shared добавилось два новых поля: nget — номер следующего объекта, получаемого одним из потоков-потребителей, и ngetval — соответствующее значение.
Функция main, текст которой приведен в листинге 10.16, запускает несколько потоков-потребителей и потоков-производителей одновременно.
19-23 Новый аргумент командной строки указывает количество потоков-потребителей. Для хранения идентификаторов потоков-потребителей выделяется место под специальный массив (tid_consume), а для подсчета обработанных каждым потоком объектов выделяется массив conscount.
24-50 Создаются несколько потоков-производителей и потребителей, после чего основной поток ждет их завершения.
//pxsem/prodcons4.с
14 int
15 main(int argc, char **argv)
16 {
17 int i, prodcount[MAXNTHREADS], conscount[MAXNTHREADS];
18 pthread_t tid_produce[MAXNTHREADS], tid_consume[MAXNTHREADS];
19 if (argc != 4)
20 err_quit("usage: prodcons4 <#items> <#producers> <#consumers>");