pthread_create(&(Thread[3]...taskZ...);
//...
loop while(Request Queue is not empty
get request
classify request
switch(request type)
{
case X :
enqueue request to XQueue
signal Thread[1]
case Y :
enqueue request to YQueue
signal Thread[2]
case Z :
enqueue request to ZQueue
signal Thread[3]
//...
}
end loop
}
void *taskX(void *X)
{
loop
suspend until awaken by boss
loop while XQueue is not empty
dequeue request
process request
end loop
until done
{
void *taskY(void *Y)
{
loop
suspend until awaken by boss
loop while YQueue is not empty
dequeue request
process request
end loop
until done
}
void *taskZ(void *Z)
{
loop
suspend until awaken by boss
loop while (ZQueue is not empty)
dequeue request
process request
end loop
until done
}
//.. .
В листинге 4.7 управляющий поток создает N рабочих потоков (по одному для каждого типа задачи). Каждая задача связана с обработкой запросов некоторого типа В цикле событий управляющий поток извлекает запрос из очереди запросов, определяет его тип, ставит его в очередь запросов, соответствующую типу, а затем оправляет сигнал потоку, который обрабатывает запросы из этой очереди. Функции потоков также содержат циклы событий. Поток приостанавливается до тех пор, пока не получит сигнал от управляющего потока о существовании запроса в его очереди. После «пробуждения» (уже во внутреннем цикле) поток обрабатывает все запросы до тех пор, пока его очередь не опустеет.
Использование модели сети с равноправными узлами
В
Листинг 4.8. Скелет программы реализации модели равноправных потоков
pthread_t Thread[N]
// initial thread
{
pthread_create(&(Thread[1]...taskX...);
pthread_create(&(Thread[2]...taskY...);
pthread_create(&(Thread[3]...taskZ...);
//...
}
void *taskX(void *X)
{
loop while (Type XRequests are available)
extract Request
process request
end loop
return(NULL)
}
В модели равноправных потоков каждый поток отвечает за собственный входной поток данных. Входные данные могут быть выделены из базы данных, файла и т.п.
Использование модели конвейера
В модели конве йера поток входных данных обрабатывается поэтапно. На каждом этапе некоторая порция работы (часть входного потока данных) обрабатывается одним потоком выполнения, а затем передается для обработки следующему. Каждая порция входных данных переходит на очередной этап обработки до тех пор, пока не будет завершена вся обработка. Такой подход позволяет обрабатывать несколько входных потоков данных одновременно. Каждый поток выполнения отвечает за достижение пром ежуточного результата, делая его доступным для следующего этапа (т.е. следующего потока конвейера). Скелет программы реализации модели конвейера представлен в листинге 4.9.
// Листинг 4.9. Скелет программы реализации модели конвейера
//...
pthread_t Thread[N]
Queues[N]
// initial thread
{
place all input into stage1's queue
pthread_create(&(Thread[1]...stage1...);
pthread_create(&(Thread[2]...stage2...);
pthread_create(&(Thread[3]...stage3...);
//...
}
void *stageX(void *X)
{
loop
suspend until input unit is in queue
loop while XQueue is not empty
dequeue input unit
process input unit
enqueue input unit into next stage's queue
end loop
until done
return(NULL)
}