Одна из классических задач на синхронизацию называется задачей производителя и потребителя. Она также известна как задача ограниченного буфера. Один или несколько производителей (потоков или процессов) создают данные, которые обрабатываются одним или несколькими потребителями. Эти данные передаются между производителями и потребителями с помощью одной из форм IPC.
С этой задачей мы регулярно сталкиваемся при использовании каналов Unix. Команда интерпретатора, использующая канал
grep pattern chapters.* | wc -l
является примером такой задачи. Программа grep выступает как производитель (единственный), a wc — как потребитель (тоже единственный). Канал используется как форма IPC. Требуемая синхронизация между производителем и потребителем обеспечивается ядром, обрабатывающим команды write производителя и read покупателя. Если производитель опережает потребителя (канал переполняется), ядро приостанавливает производителя при вызове write, пока в канале не появится место. Если потребитель опережает производителя (канал опустошается), ядро приостанавливает потребителя при вызове read, пока в канале не появятся данные.
Такой тип синхронизации называется неявным; производитель и потребитель не знают о том, что синхронизация вообще осуществляется. Если бы мы использовали очередь сообщений Posix или System V в качестве средства IPC между производителем и потребителем, ядро снова взяло бы на себя обеспечение синхронизации.
При использовании разделяемой памяти как средства IPC производителя и потребителя, однако, требуется использование какого-либо вида явной синхронизации. Мы продемонстрируем это на использовании взаимного исключения. Схема рассматриваемого примера изображена на рис. 7.1.
В одном процессе у нас имеется несколько потоков-производителей и один поток-потребитель. Целочисленный массив buff содержит производимые и потребляемые данные (данные совместного пользования). Для простоты производители просто устанавливают значение buff[0] в 0, buff [1] в 1 и т.д. Потребитель перебирает элементы массива, проверяя правильность записей.
В этом первом примере мы концентрируем внимание на синхронизации между отдельными потоками-производителями. Поток-потребитель не будет запущен, пока все производители не завершат свою работу. В листинге 7.1 приведена функция main нашего примера.
Рис. 7.1. Производители и потребитель
//mutex/prodcons2.с
1 #include "unpipc.h"
2 #define MAXNITEMS 1000000
3 #define MAXNTHREADS 100
4 int nitems; /* только для чтения потребителем и производителем */
5 struct {
6 pthread_mutex_t mutex;
7 int buff[MAXNITEMS];
8 int nput;
9 int nval;
10 } shared = {
11 PTHREAD_MUTEX_INITIALIZER
12 };
13 void *produce(void *), *consume(void *);
14 int
15 main(int argc, char **argv)
16 {
17 int i, nthreads, count[MAXNTHREADS];
18 pthread_t tid_produce[MAXNTHREADS], tid_consume;
19 if (argc != 3)
20 err_quit("usage: prodcons2 #items #threads");
21 nitems = min(atoi(argv[1]), MAXNITEMS);
22 nthreads = min(atoi(argv[2]), MAXNTHREADS);
23 Set_concurrency(nthreads);
24 /* запуск всех потоков-производителей */
25 for (i = 0; i nthreads; i++) {
26 count[i] = 0;
27 Pthread_create(tid_produce[i], NULL, produce, count[i]);
28 }
29 /* ожидание завершения всех производителей */
30 for (i = 0; i nthreads; i++) {
31 Pthread_join(tid_produce[i], NULL);
32 printf("count[%d] = %d\n", i, count[i]);
33 }
34 /* запуск и ожидание завершения потока-потребителя */
35 Pthread_create(tid_consume, NULL, consume, NULL);
36 Pthread_join(tid_consume, NULL);
37 exit(0);
38 }