В многопоточной обработке самое сложное — гарантировать сериализованный доступ к ресурсам, потому что если это сделано неправильно, отладка становится кошмаром. Поскольку многопоточная программа по своей сути недетерминирована (так как потоки могут выполняться в различной очередности и с различными квантами времени при каждом новом выполнении программы), очень трудно точно обнаружить место и способ ошибочной модификации чего-либо. Здесь еще в большей степени, чем в однопоточном программировании, надежный проект позволяет минимизировать затраты на отладку и переработку.
12.3. Уведомление одного потока другим
Используется шаблон, в котором один поток (или группа потоков) выполняет какие-то действия, и требуется сделать так, чтобы об этом узнал другой поток (или группа потоков). Может использоваться главный поток, который передает работу подчиненным потокам, или может использоваться одна группа потоков для пополнения очереди и другая для удаления данных из очереди и выполнения чего-либо полезного.
Используйте объекты mutex
и condition
, которые объявлены в condition
) для каждой ожидаемой потоками ситуации и при возникновении такой ситуации уведомлять все ее ожидающие потоки. Пример 12.4 показывает, как можно обеспечить передачу уведомлений в модели потоков «главный/подчиненные».
#include
#include
#include
#include
#include
#include
class Request { /*...*/ };
// Простой класс очереди заданий; в реальной программе вместо этого класса
// используйте std::queue
template
class JobQueue {
public:
JobQueue() {}
~JobQueue() {}
void submitJob(const T& x) {
boost::mutex::scoped_lock lock(mutex_);
list_.push_back(x);
workToBeDone_.notify_one();
}
T getJob() {
boost::mutex::scoped_lock lock(mutex_);
workToBeDone_.wait(lock); // Ждать удовлетворения этого условия, затем
// блокировать мьютекс
T tmp = list_.front();
list_.pop_front();
return(tmp);
}
private:
std::list
boost::mutex mutex_;
boost::condition workToBeDone_;
};
JobQueue
void boss() {
for (;;) {
// Получить откуда-то запрос
Request req;
myJobQueue.submitJob(req);
}
}
void worker() {
for (;;) {
Request r(myJobQueue.getJob());
// Выполнить какие-то действия с заданием...
}
}
int main() {
boost::thread thr1(boss);
boost::thread thr2(worker);
boost::thread thr3(worker);
thr1.join();
thr2.join();
thr3.join();
}
Объект условия использует мьютекс mutex
и позволяет дождаться ситуации, когда он становится заблокированным. Рассмотрим пример 12.4, в котором представлена модифицированная версии класса Queue
из примера 12.2. Я модифицировал очередь Queue
, получая более специализированную очередь, а именно JobQueue
, объекты которой являются заданиями, поступающими в очередь со стороны главного потока и обрабатываемыми подчиненными потоками.
Самое важное изменение класса JobQueue
связано переменной-членом workToBeDone_
типа condition
. Эта переменная показывает, имеется или нет задание в очереди. Когда потоку требуется получить элемент из очереди, он вызывает функцию getJob
, которая пытается захватить мьютекс и затем дожидаться возникновения новой ситуации, что реализуют следующие строки.
boost::mutex::scoped_lock lock(mutex_);
workToBeDone_.wait(lock);
Первая строка блокирует мьютекс обычным образом. Вторая строка