std::partition(chunk_data.begin(), chunk_data.end(),
[&](T const& val){ return val < partition_val; });
std::list
new_lower_chunk.splice(new_lower_chunk.end(),
chunk_data, chunk_data.begin(),
divide_point);
std::future
(3)
pool.submit(std::bind(&sorter::do_sort, this,
std::move(new_lower_chunk)));
std::list
result.splice(result.end(), new_higher);
while (!new_lower.wait_for(std::chrono::seconds(0)) ==
std::future_status::timeout) {
pool.run_pending_task(); ←
(4)
}
result.splice(result.begin(), new_lower.get());
return result;
}
};
template
std::list
if (input.empty()) {
return input;
}
sorter
return s.do_sort(input);
}
Как и в листинге 8.1, реальная работа делегируется функции-члену do_sort()
шаблона класса sorter
(1), хотя в данном случае этот шаблон нужен лишь для обертывания экземпляра thread_pool
(2).
Управление потоками и задачами теперь свелось к отправке задачи пулу (3) и исполнению находящихся в очереди задач в цикле ожидания (4). Это гораздо проще, чем в листинге 8.1, где нужно было явно управлять потоками и стеком подлежащих сортировке блоков. При отправке задачи пулу мы используем функцию std::bind()
, чтобы связать указатель this
с do_sort()
и передать подлежащий сортировке блок. В данном случае мы вызываем std::move()
, чтобы данные new_lower_chunk
перемещались, а не копировались.
Мы решили проблему взаимоблокировки, возникающую из- за того, что одни потоки ждут других, но этот пул все еще далек от идеала. Отметим хотя бы, что все вызовы submit()
и run_pending_task()
обращаются к одной и той же очереди. В главе 8 мы видели, что модификация одного набора данных из разных потоков может негативно сказаться на производительности, стало быть, с этим нужно что-то делать.
9.1.4. Предотвращение конкуренции за очередь работ
Всякий раз, как поток вызывает функцию submit()
экземпляра пула потоков, он помещает новый элемент в единственную разделяемую очередь работ. А рабочие потоки постоянно извлекают элементы из той же очереди. Следовательно, по мере увеличения числа процессоров будет возрастать конкуренция за очередь работ. Это может ощутимо отразиться на производительности; даже при использовании свободной от блокировок очереди, в которой нет явного ожидания, драгоценное время может тратиться на перебрасывание кэша.
Чтобы избежать перебрасывания кэша, мы можем завести по одной очереди работ на каждый поток. Тогда каждый поток будет помещать новые элементы в свою собственную очередь и брать работы из глобальной очереди работ только тогда, когда в его очереди работ нет. В следующем листинге приведена реализация с использованием переменной типа thread_local
, благодаря которой каждый поток обладает собственной очередью работ в дополнение к глобальной.
Листинг 9.6. Пул с очередями в поточно-локальной памяти
class thread_pool {
thread_safe_queue
typedef std::queue
(1)
static thread_local std::unique_ptr
local_work_queue; ←
(2)
void worker_thread() {
local_work_queue.reset(new local_queue_type);←
(3)
while (!done) {
run_pending_task();
}
}
public:
template
std::future
submit(FunctionType f) {
typedef typename std::result_of
result_type;
std::packaged_task
std::future
if (local_work_queue) { ←
(4)
local_work_queue->push(std::move(task));
} else {
pool_work_queue.push(std::move(task)); ←
(5)
}
return res;
}
void run_pending_task() {
function_wrapper task;