javascript

Параллельный цикл на worker. Многопоточность JS

  • вторник, 15 апреля 2025 г. в 00:00:06
https://habr.com/ru/articles/900488/

Мне очень нравится JavaScript своей легкостью, доступностью и функциональностью. Он перекрывает 90% всех моих потребностей в программировании. Спектр решаемых с помощью него задач огромен, и в том числе, иногда возникают задачи в которых необходимо изменить каждый элемент массива независимо от остальных элементов. Одно из типовых решений этой задачи через цикл или метод map.

Пример программы с простым циклом

//Функция, изменяющая элемент массива
function f(a) {
  let a0 = 1;
  for (let i = 0; i < 100_000; i++)
    a0 = 0.5 * (a0 + a / a0) 
  return a0;
}//f(a)
        

//Создание и инициализация массива 
let arr = new Float64Array(10_000);
for (let i in arr) {
  arr[i] = i;
}


let time = new Date();

//Изменение элементов массива 
arr = arr.map(f);

time = new Date() - time;
console.log(`Время вычисления: ${time} мс`);
console.log(arr);

Функция, изменяющая элемент массива принимает в качестве аргумента число, совершает с ним некоторые действия, и возвращает измененное значение. Зачастую эта функция может быть довольно "тяжелой" для примера использую алгоритм Герона вычисления квадратного корня. Как способ вычисления коней такой способ, да еще и со с 100000 итерациями нерационален, но как пример полезной нагрузки подойдет отлично.

Создание и инициализация массива. В JS есть довольно много вариантов создания массива. Для дальнейших действий и подобных вычислений лучше всего подходит типизированный массив. Массив заполняется порядковыми номерами, для имитации полезных данных.

Изменение элементов массива делаю методом map. В этом месте может также находится цикл for...in. Для оценки производительности замеряю время в миллисекундах.

Приведенный выше код работает в "однопоточном режиме" и не может загрузить 8-и ядерный процессор моего компьютера более чем на 25%. Запустить более одно потока в JS можно или используя методы работы с GPU, или применив Worker. Сегодня речь пойдет о Worker.

Worker. введение

В интернете можно найти очень много примеров для работы с Worker, но почти все они не будут работать в контексте локальных html файлов. В большинстве примеров код Worker-а пишется в отдельном файле и подключается к основной страницы через URL. Но при работе с локальными файлами это невозможно, да и не всегда удобно создавать дополнительный файл. Код показанный ниже можно сохранить в файл с расширением .html и открыть в браузере.

<!DOCTYPE html><html lang="en"><head><meta charset="UTF-8"><script>
//Функция, содержащая код Worker
function worker_function() {
    self.onmessage = (event) => self.postMessage("Привет из worker_function");
}//worker_function

//Создание Worker
worker = new Worker(
  URL.createObjectURL(
    new Blob(
      ["(" + worker_function.toString() + ")()"],
      { type: 'text/javascript' }
    )
  )
);

//Передача сообщения к Worker
worker.postMessage("");

//Получение сообщение от Worker
worker.onmessage = (event) => {  
    console.log(event.data);

    //Уничтожение worker
    worker.terminate();
}

</script></head><body></body></html>

Я намеренно не форматирую и не акцентирую внимание на html, так как цель этого кода показать как работает worker.

Функция, содержащая код Worker. Все, что содержится в этой функции, следует воспринимать как отдельный файл JS и будет исполняться независимо от остального кода в своем лексическом окружении.

Создание Worker. Конструктор Worker принимает в качестве аргумента URL с кодом Worker. Для создания такого URL я использую Blob, содержащий текстовую строку из функции worker_function.

Передача сообщения к Worker, Получение сообщение от Worker и Уничтожение worker. Алгоритм работы с Worker описан неоднократно, но я приведу его еще раз для конкретно моего случая:

  1. В коде основной программы создается Worker с помощью конструктора (строка 8). в этот момент компьютер создает "песочницу" в которой исполняется код из указанного URL. Можно представить, что браузер открывает еще одну вкладку, но только у этой вкладки нет окна.

  2. В коде основной программы вызывается метод postMessage (строка 18) аргументом которого являются данные для Worker. Здесь данными является строка, но возможны и более сложные структуры. Этот метод генерирует в "песочнице" событие message.

  3. Данные переходят обработчику события message (строка 4). Обработчик вызывает метод postMessage, который в основном коде вызывает событие worker.onmessage. postMessage и onmessage основной страницы и Worker работают абсолютно одинокого.

  4. Запускается обработчик worker.onmessage на основной странице (строка 21) . В аргументе event содержится довольно много информации, но для примера можно получить полезные данные из свойства data. Эту информацию я вывожу в консоль.

  5. Когда Worker сделал свою работу и не планируется дальнейшие его использование он должен быть уничтожен (строка 25) . В противном случае он останется в оперативной памяти и будет занимать место и ресурсы.

На первый взгляд алгоритм может показаться сложным и запутанным, но в конечном итоге это можно представить таким образом: Я нанимаю работника и даю ему инструкции (строка 8). Звоню работнику с заданием (строка 18), а он берет трубку (строка 4) и работает. Когда работа сделана он звонит мне и сообщает результаты работы (строка 21). После получения результатов работы я увольняю работника (строка 25).

Создание второго Worker

Один работник это конечно хорошо, но для ускорения работы надо нанимать работников больше. Чуть позже сделаем код который автоматически создает столько Worker-ов, сколько нам нужно, а пока сделаем это вручную:

Скрытый текст
//Функция, содержащая код Worker
function worker_function() {
  self.onmessage = (event) => {
    setTimeout(
      () => self.postMessage(event.data + "\nПривет из worker_function"),
      Math.random() * 1000
    )//setTimeout
  }//self.onmessage
}//worker_function

//Создание Worker
worker1 = new Worker(URL.createObjectURL(new Blob(["(" + worker_function.toString() + ")()"], { type: 'text/javascript' })));
worker2 = new Worker(URL.createObjectURL(new Blob(["(" + worker_function.toString() + ")()"], { type: 'text/javascript' })));

//Передача сообщения к Worker
worker1.postMessage("Сообщение для worker1");
worker2.postMessage("Сообщение для worker2");

//Получение сообщения от Worker
async function getdata(worker) {
  return new Promise((resolve, reject) => {
    worker.onmessage = (event) => { console.log(event.data); resolve() };
  }
  )//Promise
}//getdata


(async () => {
  await Promise.all([getdata(worker1), getdata(worker2)])
  worker1.terminate();
  worker2.terminate();
})()//async

В коде произошли некоторые изменения и давайте разберем их по порядку.

Worker теперь не мгновенно выдает ответ, а через случайное время (строки 2-8). Так как каждый worker работает независимо друг от друга невозможно предугадать кто раньше закончит работу, для усиления этого эффекта я добавил функцию setTimeout.

Теперь я создаю два worker и обоим даю разные данные. Здесь нет каких либо отличий от предыдущего примера.

Получение сообщения от Worker теперь немного сложнее. Обрабатывать данные полученные от Worker в функции-обработчике это хорошо, но еще нужно остановить выполнение основного кода до завершения работы всех Worker-ов. С этим мне помогает Promise и его метод all. Мне эта часть кода не нравиться, но пока не придумал ничего лучше будет написано так. Основой код программы теперь работает асинхронно и его пришлось обернуть в анонимную асинхронную функцию.

Подведем промежуточный итог. Нам удалось одновременно запустить два Worker, а после того как оба из них завершили работу проложить выполнение программы. Это большая победа и прямой путь к написанию собственной функции, работающий как метод map, но использующий многопоточность.

Нужно больше Worker

Теперь мы обладаем почти достаточными навыками чтобы сделать циклы многопоточными. Код решения представлен ниже:

Скрытый текст
async function map_worker(
    arr,//Массив данных
    f,//Функция обработки данных
    thread = 8,//Кол-во потоков
) {
    //Функция содержащая код Worker
    function worker_function() {
        self.onmessage = function (event) {
            let buf = new Float64Array(event.data.buf);

            for (let i in buf) {
                buf[i] = f(buf[i]);
            }
            buf = buf.buffer;
            self.postMessage({ buf, start: event.data.start }, [buf]);
        };

    }//worker_function

    //Получение сообщения от Worker
    async function getdata(worker) {
        return new Promise(
            function (resolve, reject) {
                worker.onmessage = function (event) {
                    arr.set(new Float64Array(event.data.buf), event.data.start);
                    resolve();
                }
            }
        )
    }//getdata

    let worker = [];
    let arr_resolve = [];
    let buf_len = Math.floor(arr.length / thread);

    for (let i = 0; i < thread; i++) {
        //Создание Worker
        worker[i] = new Worker(URL.createObjectURL(new Blob(["(" + worker_function.toString() + ")(); let f = " + f.toString()], { type: 'text/javascript' })));

        //Разбиение arr
        let start = i * buf_len;
        let buf = arr.slice(start, i == thread - 1 ? arr.length : start + buf_len).buffer;

        //Передача сообщения к Worker
        worker[i].postMessage({ start, buf }, [buf],);

        //Создание функции получения сообщения от worker
        arr_resolve[i] = getdata(worker[i]);
    }

    //Ожидание завершения работы всех worker
    await Promise.all(arr_resolve);

    //Удаление worker
    for (let i = 0; i < thread; i++)
        worker[i].terminate();

    return arr;
}//map_worker


(async () => {

    //Функция, изменяющая элемент массива
    function f(a) {
        let a0 = 1;
        for (let i = 0; i < 100_000; i++)
            a0 = 0.5 * (a0 + a / a0)
        return a0;
    }//f(a)


    //Создание и инициализация массива 
    let arr = new Float64Array(10_000);
    for (let i in arr) {
        arr[i] = i;
    }

    let time = new Date();

    //Изменение элементов массива 
    arr = await map_worker(arr, f);

    time = new Date() - time;
    console.log(`Время вычисления: ${time} мс`);
    console.log(arr);

})()//asynca

Функция map_worker работает подобно встроенному методу map, но использует при этом ресурсы компьютера полностью (загрузка процессора 100%). Большая часть кода уже разобрана, однако и здесь есть небольшие хитрости.

  • При создании worker в строку с URL передается функция для обработки данных.

  • В методе postMessage появилось два аргумента. Первый это объект, содержащий фрагмент массива и его положение в исходном массиве. А второй -- массив элементом которого является фрагмент массива. Выглядит странно, но при таком синтаксисе передача данных осуществляется мгновенно. И именно поэтому я использую типизированные массивы. Подробнее об этом можно почитать в документации.

  • Предыдущий пункт обязывает передавать в worker ArrayBuffer, поэтому в коде добавляется преобразование типизированного массива в голый поток байтов.

Заключение

В заключении протестируем насколько эффективен данный метод. В таблице каждая ячейка содержит время выполнения кода в мс с использованием того или иного метода. Во второй строке указано во сколько раз быстрее происходит вычисление по сравнению с Array.map.

Размер массива

Array.map

for...in

worker_1

worker_4

worker_8

worker_10

1000

573
1

564
1.02

588
0.97

176
3.26

167
3.43

137
4.18

10000

5729
1

5113
1.12

5169
1.11

1562
3.67

905
6.33

935
6.13

100000

57942
1

50268
1.15

52564
1.1

14186
4.1

7556
7.68

7562
7.66

1000000

572235
1

561455
1.02

507012
1.2

160719
3.5

75393
7.59

85513
6.69

Из таблицы видно, максимальное ускорение получается при использовании 8-ми worker, что соответствует количеству ядер моего процессора, при этом ускорение составляет порядка 7.6 раз. Использование другого числа потоков дает более плохие результаты. Также цикл for...in работает чуть быстрее Array.map.