В предыдущих примерах кода мы неоднократно создавали наборы потоков для тех или иных целей, но всем им было присуще одно: общее количество потоков в них было фиксированным на момент создания. Это и были статическиепулы потоков, разделяющих между собой работу приложения. Архитекторы QNX идут чуть дальше: они предоставляют инструментарий для создания пулов однотипных(с общей функцией потока) потоков, в которых конкретное число потоков может увеличиваться или уменьшаться синхронно с изменением нагрузки на приложение. Именно своим динамическимсоставом эта конструкция и отличается.
Динамический пул потоков нужен разработчикам QNX в первую очередь как инструмент построения многопоточных менеджеров ресурсов - основы построения сервисов ОС QNX. Но и помимо этой цели динамический пул потоков представляет собой мощнейшее средство для конструирования параллельных механизмов обработки.
Проиллюстрируем применение динамического пула потоков примером программного кода, который был нами описан в книге [4] в главе «Сервер TCP/IP... много серверов хороших и разных». По сути, это ретранслирующий TCP/IP-сервер, но сейчас это для нас неважно:
#include
#include
static int ls; // прослушивающий TCP-сокет
THREAD_POOL_PARAM_T* alloc(THREAD_POOL_HANDLE_T* h) {
return (THREAD_POOL_PARAM_T*)h;
}
// функция блокирования пула потоков
THREAD_POOL_PARAM_T* block(THREAD_POOL_PARAM_T* p) {
int rs = accept(ls, NULL, NULL);
if (rs < 0) errx("accept error");
return(THREAD_POOL_PARAM_T*)rs;
}
int handler(THREAD_POOL_PARAM_T* p) {
retrans((int)p);
close((int)p);
delay(250);
cout << pthread_self << flush;
return 0;
}
int main(int argc, char* argv[]) {
// создать TCP-сокет на порт
ls = getsocket(THREAD_POOL_PORT);
// создание атрибутной записи пула потоков:
thread_pool_attr_t attr;
memset(&attr, 0, sizeof(thread_pool_attr_t));
// заполнение блока атрибутов пула
/* - mm число блокированных потоков в пуле */
attr.lo_water = 3;
/* - max число блокированных потоков в пуле */
attr.hi_water = 7;
/* - инкремент шага создания потоков */
attr.increment = 2;
attr.maximum = 9;
/* - общий предел числа потоков в пуле */
attr.handle = dispatch_create;
attr.context_alloc = alloc;
attr.block_func = block;
attr.handler_func = handler;
// фактическое создание пула потоков:
void* tpp = thread_pool_create(&attr, POOL_FLAG_USE_SELF);
if (tpp == NULL) errx("create pool");
// начало функционирования пула потоков:
thread_pool_start(tpp);
// ... выполнение никогда не дойдет до этой точки!
exit(EXIT_SUCCESS);
}
В примере используются, но не определены две функции, которые не столь существенны для понимания примера сточки зрения функционирования пула:
•
errx
— реакция на ошибку выполнения с выводом сообщения и последующим аварийным завершением;
•
retrans
— прием сообщения с присоединенного TCP-сокета с последующей ретрансляцией полученного содержимого в него же.
Итак, первая особенность пула потоков в том, что мы построили многопоточный сервер, почти не прописывая собственного кода, — большую часть рутинной работы за нас сделала библиотека пула.
Приведем описание логики работы пула потоков и показанного примера на самом качественном, простейшем уровне: