message_fill(&msg, ithread, ithread, parg->work_done);
/* Поместить сообщение в очередь. */
tstatus = q_put(&p2tq, &msg, sizeof(msg), INFINITE);
parg->work_done++;
}
return 0;
}
DWORD WINAPI transmitter(PVOID arg) {
/* Получись несколько сообщений от производителя, объединяя их в одно*/
/* составное сообщение, предназначенное для принимающего потока. */
DWORD tstatus, im;
t2r_msg_t t2r_msg = {0};
msg_block_t p2t_msg;
while (!ShutDown) {
t2r_msg.num_msgs = 0;
/* Упаковать сообщения для передачи принимающему потоку. */
for (im = 0; im < TBLOCK_SIZE; im++) {
tstatus = q_get(&p2tq, &p2t_msg, sizeof(p2t_msg), INFINITE);
if (tstatus != 0) break;
memcpy(&t2r_msg.messages[im], &p2t_msg, sizeof(p2t_msg));
t2r_rasg.num_msgs++;
}
tstatus = q_put(&t2rq, &t2r_msg, sizeof(t2r_msg), INFINITE);
if (tstatus != 0) return tstatus;
}
return 0;
}
DWORD WINAPI receiver(PVOID arg) {
/* Получить составные сообщения от передающего потока; распаковать */
/* их и передать соответствующему потребителю. */
DWORD tstatus, im, ic;
t2r_msg_t t2r_msg;
msg_block_t r2c_msg;
while (!ShutDown) {
tstatus = q_get(&t2rq, &t2r_msg, sizeof(t2r_msg), INFINITE);
if (tstatus != 0) return tstatus;
/* Распределить сообщения между соответствующими потребителями. */
for (im = 0; im < t2r_msg.num_msgs; im++) {
memcpy(&r2c_msg, &t2r_msg.messages[im], sizeof(r2c_msg));
ic = r2c_msg.destination; /* Конечный потребитель. */
tstatus = q_put(&r2cq_array[ic], &r2c_msg, sizeof(r2c_msg), INFINITE);
if (tstatus != 0) return tstatus;
}
}
return 0;
}
DWORD WINAPI consumer(PVOID arg) {
THARG * carg;
DWORD tstatus, ithread;
msg_block_t msg;
queue_t *pr2cq;
carg = (THARG *)arg;
ithread = carg->thread_number;
carg = (THARG *)arg;
pr2cq = &r2cq_array[ithread];
while (carg->work_done < carg->work_goal) {
/* Получить и отобразить (необязательно — не показано) сообщения. */
tstatus = q_get(pr2cq, &msg, sizeof(msg), INFINITE);
if (tstatus != 0) return tstatus;
carg->work_done++;
}
return 0;
}
Комментарии по поводу многоступенчатого конвейера
Данная реализация характеризуется некоторыми особенностями, суть которых частично отражена в комментариях, включенных в листинг программы. На эти же особенности обращают ваше внимание и упражнения 10.6, 10.7 и 10.10.
• Значительные возражения вызывает способ, используемый основным потоком для завершения выполнения передающего и принимающего потоков. Лучшим решением было бы использование конечных интервалов ожидания во внутренних циклах передатчика и приемника и прекращение работы после того, как будет установлен соответствующий глобальный флаг. Другой возможный подход заключается в отмене выполнения потоков, как описано далее в этой главе.
• Обратите внимание на существование симметрии между передающим и принимающим потоками. Как и при реализации очереди, это обстоятельство упрощает проектирование, отладку и сопровождение программы.
• Реализация не сбалансирована в смысле согласования скорости генерации сообщений, емкости конвейера и коэффициента блокирования "передатчик/приемник".