Многопоточная генерация#
Четыре основных распределения (random,
standard_normal, standard_exponential,
и standard_gamma) все позволяют заполнять существующие массивы
с использованием out аргумент ключевого слова. Существующие массивы должны быть смежными и хорошо себя вести (записываемыми и выровненными). В обычных условиях массивы, созданные с использованием общих конструкторов, таких как numpy.empty удовлетворит
этим требованиям.
Смотрите также
Потокобезопасность для общей информации о потокобезопасности в NumPy.
В этом примере используется concurrent.futures для заполнения массива с использованием
нескольких потоков. Потоки долгоживущие, поэтому повторные вызовы не требуют
дополнительных накладных расходов на создание потоков.
Сгенерированные случайные числа воспроизводимы в том смысле, что одно и то же начальное значение даст одинаковые результаты, при условии, что количество потоков не изменится.
from numpy.random import default_rng, SeedSequence
import multiprocessing
import concurrent.futures
import numpy as np
class MultithreadedRNG:
def __init__(self, n, seed=None, threads=None):
if threads is None:
threads = multiprocessing.cpu_count()
self.threads = threads
seq = SeedSequence(seed)
self._random_generators = [default_rng(s)
for s in seq.spawn(threads)]
self.n = n
self.executor = concurrent.futures.ThreadPoolExecutor(threads)
self.values = np.empty(n)
self.step = np.ceil(n / threads).astype(np.int_)
def fill(self):
def _fill(random_state, out, first, last):
random_state.standard_normal(out=out[first:last])
futures = {}
for i in range(self.threads):
args = (_fill,
self._random_generators[i],
self.values,
i * self.step,
(i + 1) * self.step)
futures[self.executor.submit(*args)] = i
concurrent.futures.wait(futures)
def __del__(self):
self.executor.shutdown(False)
Многопоточный генератор случайных чисел можно использовать для заполнения массива. values атрибуты показывают нулевое значение до заполнения и
случайное значение после.
In [2]: mrng = MultithreadedRNG(10000000, seed=12345)
...: print(mrng.values[-1])
Out[2]: 0.0
In [3]: mrng.fill()
...: print(mrng.values[-1])
Out[3]: 2.4545724517479104
Время, необходимое для генерации с использованием нескольких потоков, можно сравнить со временем, необходимым для генерации с использованием одного потока.
In [4]: print(mrng.threads)
...: %timeit mrng.fill()
Out[4]: 4
...: 32.8 ms ± 2.71 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
Однопоточный вызов напрямую использует BitGenerator.
In [5]: values = np.empty(10000000)
...: rg = default_rng()
...: %timeit rg.standard_normal(out=values)
Out[5]: 99.6 ms ± 222 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
Выигрыш значительный, и масштабирование разумно даже для массивов, которые лишь умеренно велики. Выигрыш ещё больше при сравнении с вызовом, который не использует существующий массив из-за накладных расходов на создание массива.
In [6]: rg = default_rng()
...: %timeit rg.standard_normal(10000000)
Out[6]: 125 ms ± 309 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
Обратите внимание, что если threads не установлен пользователем, он будет определен
multiprocessing.cpu_count().
In [7]: # simulate the behavior for `threads=None`, if the machine had only one thread
...: mrng = MultithreadedRNG(10000000, seed=12345, threads=1)
...: print(mrng.values[-1])
Out[7]: 1.1800150052158556