Параллельный цикл на worker. Многопоточность JS
- вторник, 15 апреля 2025 г. в 00:00:06
Мне очень нравится JavaScript своей легкостью, доступностью и функциональностью. Он перекрывает 90% всех моих потребностей в программировании. Спектр решаемых с помощью него задач огромен, и в том числе, иногда возникают задачи в которых необходимо изменить каждый элемент массива независимо от остальных элементов. Одно из типовых решений этой задачи через цикл или метод map.
//Функция, изменяющая элемент массива
function f(a) {
let a0 = 1;
for (let i = 0; i < 100_000; i++)
a0 = 0.5 * (a0 + a / a0)
return a0;
}//f(a)
//Создание и инициализация массива
let arr = new Float64Array(10_000);
for (let i in arr) {
arr[i] = i;
}
let time = new Date();
//Изменение элементов массива
arr = arr.map(f);
time = new Date() - time;
console.log(`Время вычисления: ${time} мс`);
console.log(arr);
Функция, изменяющая элемент массива принимает в качестве аргумента число, совершает с ним некоторые действия, и возвращает измененное значение. Зачастую эта функция может быть довольно "тяжелой" для примера использую алгоритм Герона вычисления квадратного корня. Как способ вычисления коней такой способ, да еще и со с 100000 итерациями нерационален, но как пример полезной нагрузки подойдет отлично.
Создание и инициализация массива. В JS есть довольно много вариантов создания массива. Для дальнейших действий и подобных вычислений лучше всего подходит типизированный массив. Массив заполняется порядковыми номерами, для имитации полезных данных.
Изменение элементов массива делаю методом map. В этом месте может также находится цикл for...in. Для оценки производительности замеряю время в миллисекундах.
Приведенный выше код работает в "однопоточном режиме" и не может загрузить 8-и ядерный процессор моего компьютера более чем на 25%. Запустить более одно потока в JS можно или используя методы работы с GPU, или применив Worker. Сегодня речь пойдет о Worker.
В интернете можно найти очень много примеров для работы с Worker, но почти все они не будут работать в контексте локальных html файлов. В большинстве примеров код Worker-а пишется в отдельном файле и подключается к основной страницы через URL. Но при работе с локальными файлами это невозможно, да и не всегда удобно создавать дополнительный файл. Код показанный ниже можно сохранить в файл с расширением .html и открыть в браузере.
<!DOCTYPE html><html lang="en"><head><meta charset="UTF-8"><script>
//Функция, содержащая код Worker
function worker_function() {
self.onmessage = (event) => self.postMessage("Привет из worker_function");
}//worker_function
//Создание Worker
worker = new Worker(
URL.createObjectURL(
new Blob(
["(" + worker_function.toString() + ")()"],
{ type: 'text/javascript' }
)
)
);
//Передача сообщения к Worker
worker.postMessage("");
//Получение сообщение от Worker
worker.onmessage = (event) => {
console.log(event.data);
//Уничтожение worker
worker.terminate();
}
</script></head><body></body></html>
Я намеренно не форматирую и не акцентирую внимание на html, так как цель этого кода показать как работает worker.
Функция, содержащая код Worker. Все, что содержится в этой функции, следует воспринимать как отдельный файл JS и будет исполняться независимо от остального кода в своем лексическом окружении.
Создание Worker. Конструктор Worker принимает в качестве аргумента URL с кодом Worker. Для создания такого URL я использую Blob, содержащий текстовую строку из функции worker_function.
Передача сообщения к Worker, Получение сообщение от Worker и Уничтожение worker. Алгоритм работы с Worker описан неоднократно, но я приведу его еще раз для конкретно моего случая:
В коде основной программы создается Worker с помощью конструктора (строка 8). в этот момент компьютер создает "песочницу" в которой исполняется код из указанного URL. Можно представить, что браузер открывает еще одну вкладку, но только у этой вкладки нет окна.
В коде основной программы вызывается метод postMessage (строка 18) аргументом которого являются данные для Worker. Здесь данными является строка, но возможны и более сложные структуры. Этот метод генерирует в "песочнице" событие message.
Данные переходят обработчику события message (строка 4). Обработчик вызывает метод postMessage, который в основном коде вызывает событие worker.onmessage. postMessage и onmessage основной страницы и Worker работают абсолютно одинокого.
Запускается обработчик worker.onmessage на основной странице (строка 21) . В аргументе event содержится довольно много информации, но для примера можно получить полезные данные из свойства data. Эту информацию я вывожу в консоль.
Когда Worker сделал свою работу и не планируется дальнейшие его использование он должен быть уничтожен (строка 25) . В противном случае он останется в оперативной памяти и будет занимать место и ресурсы.
На первый взгляд алгоритм может показаться сложным и запутанным, но в конечном итоге это можно представить таким образом: Я нанимаю работника и даю ему инструкции (строка 8). Звоню работнику с заданием (строка 18), а он берет трубку (строка 4) и работает. Когда работа сделана он звонит мне и сообщает результаты работы (строка 21). После получения результатов работы я увольняю работника (строка 25).
Один работник это конечно хорошо, но для ускорения работы надо нанимать работников больше. Чуть позже сделаем код который автоматически создает столько Worker-ов, сколько нам нужно, а пока сделаем это вручную:
//Функция, содержащая код Worker
function worker_function() {
self.onmessage = (event) => {
setTimeout(
() => self.postMessage(event.data + "\nПривет из worker_function"),
Math.random() * 1000
)//setTimeout
}//self.onmessage
}//worker_function
//Создание Worker
worker1 = new Worker(URL.createObjectURL(new Blob(["(" + worker_function.toString() + ")()"], { type: 'text/javascript' })));
worker2 = new Worker(URL.createObjectURL(new Blob(["(" + worker_function.toString() + ")()"], { type: 'text/javascript' })));
//Передача сообщения к Worker
worker1.postMessage("Сообщение для worker1");
worker2.postMessage("Сообщение для worker2");
//Получение сообщения от Worker
async function getdata(worker) {
return new Promise((resolve, reject) => {
worker.onmessage = (event) => { console.log(event.data); resolve() };
}
)//Promise
}//getdata
(async () => {
await Promise.all([getdata(worker1), getdata(worker2)])
worker1.terminate();
worker2.terminate();
})()//async
В коде произошли некоторые изменения и давайте разберем их по порядку.
Worker теперь не мгновенно выдает ответ, а через случайное время (строки 2-8). Так как каждый worker работает независимо друг от друга невозможно предугадать кто раньше закончит работу, для усиления этого эффекта я добавил функцию setTimeout.
Теперь я создаю два worker и обоим даю разные данные. Здесь нет каких либо отличий от предыдущего примера.
Получение сообщения от Worker теперь немного сложнее. Обрабатывать данные полученные от Worker в функции-обработчике это хорошо, но еще нужно остановить выполнение основного кода до завершения работы всех Worker-ов. С этим мне помогает Promise и его метод all. Мне эта часть кода не нравиться, но пока не придумал ничего лучше будет написано так. Основой код программы теперь работает асинхронно и его пришлось обернуть в анонимную асинхронную функцию.
Подведем промежуточный итог. Нам удалось одновременно запустить два Worker, а после того как оба из них завершили работу проложить выполнение программы. Это большая победа и прямой путь к написанию собственной функции, работающий как метод map, но использующий многопоточность.
Теперь мы обладаем почти достаточными навыками чтобы сделать циклы многопоточными. Код решения представлен ниже:
async function map_worker(
arr,//Массив данных
f,//Функция обработки данных
thread = 8,//Кол-во потоков
) {
//Функция содержащая код Worker
function worker_function() {
self.onmessage = function (event) {
let buf = new Float64Array(event.data.buf);
for (let i in buf) {
buf[i] = f(buf[i]);
}
buf = buf.buffer;
self.postMessage({ buf, start: event.data.start }, [buf]);
};
}//worker_function
//Получение сообщения от Worker
async function getdata(worker) {
return new Promise(
function (resolve, reject) {
worker.onmessage = function (event) {
arr.set(new Float64Array(event.data.buf), event.data.start);
resolve();
}
}
)
}//getdata
let worker = [];
let arr_resolve = [];
let buf_len = Math.floor(arr.length / thread);
for (let i = 0; i < thread; i++) {
//Создание Worker
worker[i] = new Worker(URL.createObjectURL(new Blob(["(" + worker_function.toString() + ")(); let f = " + f.toString()], { type: 'text/javascript' })));
//Разбиение arr
let start = i * buf_len;
let buf = arr.slice(start, i == thread - 1 ? arr.length : start + buf_len).buffer;
//Передача сообщения к Worker
worker[i].postMessage({ start, buf }, [buf],);
//Создание функции получения сообщения от worker
arr_resolve[i] = getdata(worker[i]);
}
//Ожидание завершения работы всех worker
await Promise.all(arr_resolve);
//Удаление worker
for (let i = 0; i < thread; i++)
worker[i].terminate();
return arr;
}//map_worker
(async () => {
//Функция, изменяющая элемент массива
function f(a) {
let a0 = 1;
for (let i = 0; i < 100_000; i++)
a0 = 0.5 * (a0 + a / a0)
return a0;
}//f(a)
//Создание и инициализация массива
let arr = new Float64Array(10_000);
for (let i in arr) {
arr[i] = i;
}
let time = new Date();
//Изменение элементов массива
arr = await map_worker(arr, f);
time = new Date() - time;
console.log(`Время вычисления: ${time} мс`);
console.log(arr);
})()//asynca
Функция map_worker работает подобно встроенному методу map, но использует при этом ресурсы компьютера полностью (загрузка процессора 100%). Большая часть кода уже разобрана, однако и здесь есть небольшие хитрости.
При создании worker в строку с URL передается функция для обработки данных.
В методе postMessage появилось два аргумента. Первый это объект, содержащий фрагмент массива и его положение в исходном массиве. А второй -- массив элементом которого является фрагмент массива. Выглядит странно, но при таком синтаксисе передача данных осуществляется мгновенно. И именно поэтому я использую типизированные массивы. Подробнее об этом можно почитать в документации.
Предыдущий пункт обязывает передавать в worker ArrayBuffer, поэтому в коде добавляется преобразование типизированного массива в голый поток байтов.
В заключении протестируем насколько эффективен данный метод. В таблице каждая ячейка содержит время выполнения кода в мс с использованием того или иного метода. Во второй строке указано во сколько раз быстрее происходит вычисление по сравнению с Array.map.
Размер массива | Array.map | for...in | worker_1 | worker_4 | worker_8 | worker_10 |
1000 | 573 | 564 | 588 | 176 | 167 | 137 |
10000 | 5729 | 5113 | 5169 | 1562 | 905 | 935 |
100000 | 57942 | 50268 | 52564 | 14186 | 7556 | 7562 |
1000000 | 572235 | 561455 | 507012 | 160719 | 75393 | 85513 |
Из таблицы видно, максимальное ускорение получается при использовании 8-ми worker, что соответствует количеству ядер моего процессора, при этом ускорение составляет порядка 7.6 раз. Использование другого числа потоков дает более плохие результаты. Также цикл for...in работает чуть быстрее Array.map.