В стандартной библиотеке С++ есть функция std::thread::hardware_concurrency()
, которая поможет нам решить эту задачу. Она возвращает число потоков, которые могут работать по-настоящему параллельно. В многоядерной системе это может быть, например, количество процессорных ядер. Возвращаемое значение всего лишь оценка; более того, функция может возвращать 0, если получить требуемую информацию невозможно. Однако эту оценку можно с пользой применить для разбиения задачи на несколько потоков.
В листинге 2.8 приведена простая реализация параллельной версии std::accumulate
. Она распределяет работу между несколькими потоками и, чтобы не создавать слишком много потоков, задает ограничение снизу на количество элементов, обрабатываемых одним потоком. Отмстим, что в этой реализации предполагается, что ни одна операция не возбуждает исключений, хотя в принципе исключения возможны; например, конструктор std::thread
возбуждает исключение, если не может создать новый поток. Но если добавить в этот алгоритм обработку исключений, он перестанет быть таким простым; эту тему мы рассмотрим в главе 8.
Листинг 2.8. Наивная реализация параллельной версии алгоритма std::accumulate
template
struct accumulate_block {
void operator()(Iterator first, Iterator last, T& result) {
result = std::accumulate(first, last, result);
}
};
template
T parallel_accumulate(Iterator first, Iterator last, T init) {
unsigned long const length = std::distance(first, last);
if (!length) ←
(1)
return init;
unsigned long const min_per_thread = 25;
unsigned long const max_threads =
(length+min_per_thread - 1) / min_per_thread; ←
(2)
unsigned long const hardware_threads =
std::thread::hardware_concurrency();
unsigned long const num_threads = ←
(3)
std::min(
hardware.threads != 0 ? hardware_threads : 2, max_threads);
unsigned long const block_size = length / num_threads; ←
(4)
std::vector
std::vector
(5)
Iterator block_start = first;
for(unsigned long i = 0; i < (num_threads - 1); ++i) {
Iterator block_end = block_start;
std::advance(block_end, block_size); ←
(6)
threads[i] = std::thread( ←
(7)
accumulate_block
block_start, block_end, std::ref(results(i)));
block_start = block_end; ←
(8)
}
accumulate_block()(
block_start, last, results[num_threads-1]); ←
(9)
std::for_each(threads.begin(), threads.end(),
std::mem_fn(&std::thread::join)); ←
(10)
return
std::accumulate(results.begin(), results.end(), init); ←
(11)
}
Хотя функция довольно длинная, по существу она очень проста. Если входной диапазон пуст (1), то мы сразу возвращаем начальное значение init
. В противном случае диапазон содержит хотя бы один элемент, поэтому мы можем разделить количество элементов на минимальный размер блока и получить максимальное число потоков (2).
Это позволит избежать создания 32 потоков на 32-ядерной машине, если диапазон состоит всего из пяти элементов.
Число запускаемых потоков равно минимуму из только что вычисленного максимума и количества аппаратных потоков (3): мы не хотим запускать больше потоков, чем может поддержать оборудование (это называется std::thread::hardware_concurrency()
вернула 0, то мы берем произвольно выбранное число, я решил остановиться на 2. Мы не хотим запускать слишком много потоков, потому что на одноядерной машине это только замедлило бы программу. Но и слишком мало потоков тоже плохо, так как это означало бы отказ от возможного параллелизма.