Передачи данных в воркеры JavaScript
- четверг, 13 ноября 2025 г. в 00:00:07
В предыдущей статье мы подробно рассмотрели настойку воркеров через аргументы передаваемые в конструктор класса Worker. В данной статье мы рассмотрим нюансы обмена данными между потоками. Все примеры писались под Node.js, но многое их описанного будет работать в браузерном JavaScript.
Как я уже писал в одной из статей, потоки воркеров абсолютно автономны и могут влиять на работу друг, друга, только через обмен сообщениями. У потоков в JavaScript нет общей памяти, как у их собратьев например в Java. Такое ограничение не даёт им войти в состояние гонки, а следовательно разработчику, при работе с воркерами не приходится об этом беспокоиться. В Node.js имеются способы позволяющие обойти эти ограничения, но подробнее об этом ниже.
И так, как передать объект из одного воркера, в другой если у них нет общей памяти? При передаче объектов между переменными, например как в этом случае:
let Student = new Student()
let Applicant = StudentПросто копируется ссылка на один и тот-же объект в памяти, которая общая для всего скрипта. А как быть когда этой общей памяти нет, как в случае с воркерами? Тут JS использует алгоритм структурного клонирования!
Данный алгоритм появился ещё в спецификации HTML 5 и предназначен для сериализации объектов со сложной структурой. В отличии от старой, доброй JSON сериализации, через метод JSON.stringify(), он позволяет копировать методы и вложенные объекты, рекурсивно обходя исходный объект. В результате работы алгоритма, получается точная копия объекта. Алгоритм подходит для работы с типами:
Map;
Set;
Date;
Типизированные массивы ArrayBuffer;
RegExp.
Данный алгоритм отлично справляется со встроенными типами JavaScript. Так-же он может обрабатывать данные содержащие циклические ссылки.
Но при этом алгоритм структурного клонирования, не может сериализовать функции, а также плохо подходит для работы с классами, так как не может копировать:
Цепочки прототипов;
Геттеры;
Сеттеры;
Не перечисляемые свойства;
Объекты Element из дерева DOM. В контексте работы с Node.js, данный алгоритм не может копировать типы специфичные для данной среды, такие как: сокеты или потоки.
Попробуем создать объект и передать его из главного потока, во второстепенный, а заодно посмотрим чем работа с воркерами в Node, отличается браузерной версии языка. Создаём воркер в главном потоке:
import {Worker} from 'worker_threads'; // Первое отличие Node от браузера, воркеры нужно импортировать
let studentsRating = new Map([['Vasya', 3], ['Petr', 2], ['Vasyatka', 4]])
const worker = new Worker('./worker.js')
worker.postMessage(studentsRating)Воркер принимающий наш объект:
import {parentPort} from 'worker_threads'; // Получаем порт родительского потокака, чтобы через него получать сообщения
parentPort.on('message', (data) => {
data.set('Ivasik', 5)
console.log(data) // Map(4) {'Vasya' => 3, 'Petr' => 2, 'Vasyatka' => 4, 'Ivasik' => 5}
})Как можно заметить основное отличие воркеров в Node, от их собратьев из браузера, заключается в том что все необходимые объекты для работы с ними нужно импортировать из специальной библиотеки worker_threads. Кстати чтобы заработали импорты из ES6, как в примерах, нужно указать "type": "module" в package.json:
{
"name": "webworkernode",
"version": "1.0.0",
"description": "Test web worker project on node",
"main": "index.js",
"author": "Vasya Pupkin",
"license": "ISC",
"type": "module" // Включаем импорты ES6
}Как было сказано выше postMessage(), применяет алгоритм структурного клонирования для передачи объектов. Это не лучший вариант если требуется передать объекты сокетов, потоков или типизированных массивов. Изменить данное поведение можно при помощи второго аргумента transferList, метода postMessage(). Он принимает массив объектов, подлежащих передаче между потоками, а не копированию:
import { Worker } from 'worker_threads'
const uint8Array = new Uint8Array([ 1, 2, 3, 4 ])
const worker = new Worker('./worker.js')
worker.postMessage(uint8Array, [uint8Array.buffer]) // Передавать нужно именно ArrayBuffer (uint8Array.buffer), а не сам типизированный массив
console.log(uint8Array.length) // 0
console.log(uint8Array) // Пустой массив, после передачи, он больше не доступен в главном потокеПосле передачи объекта из одного потока в другой, он становится недоступным в исходном потоке и полностью удаляется из памяти главного потока. Это необходимо для того чтобы потоки не могли войти в состояние гонки. Если попытаться повзаимодействовать с переданным объектом, ошибки не будет, но и никаких действий не произойдёт.
Ещё данный механизм экономит память, так как не будет двух копий одного и того-же объекта, для разных потоков. Ещё так данные передаются чрезвычайно быстро, за счёт того что движения данных в памяти не происходит, а передача владения на объекты осуществляется через переключение указателей. Ну и бонусом сборщик мусора не отвлекается на это действие, а может заняться чем-то поважнее.
Такой способ передачи полезен для:
Работы с большими объёмами данных (аудио, видео, изображения в высоком качестве);
Реалтайм обработки данных с минимальной задержкой;
Экономии памяти.
Данные в воркер можно отправить не только при помощи метода postMessage(), и сразу при инициализации нового воркера через опцию workerData, о которой писалось выше:
import { Worker } from 'worker_threads'
const uint8Array = new Uint8Array([1, 2, 3, 4])
// Создаем воркер с передачей буфера через workerData + transferList
const worker = new Worker('./worker.js', {
workerData: {buffer: uint8Array},
transferList: [uint8Array.buffer] // Передаем ArrayBuffer
})
// Проверяем, что буфер стал недоступен в основном потоке
console.log(uint8Array.byteLength) // 0 буфер передан
console.log(uint8Array) // Пустой типизированный массивПри передаче данных подобным способом, приём их во второстепенном потоке, так-же будет иметь свои особенности:
import {workerData} from 'worker_threads' // Импортируем объект с данными workerData
const {buffer, bufferSize} = workerData // Принимаем переданный буфер
const receivedArray = new Uint8Array(buffer, 0, bufferSize) // Воссоздаем типизированный массив в потоке воркера, без копирования
console.log(receivedArray) // Нашм массив [ 1, 2, 3, 4 ]Чтобы разделить один типизированный массив, между двумя потоками, в обход алгоритма структурного клонирования, требуется создать общую для потоков память при помощи SharedArrayBuffer и типизированный массив для работы с этим участком памяти. Затем созданный буфер, нужно передать в поток воркера, через postMessage(), чтобы он тоже имел доступ к этой области памяти. При этом не нужно включать данный буфер во втором аргументе метода postMessage(), как в случает transferList, ведь нам нужно сохранить доступ к типизированному массиву в обоих потоках:
import {Worker} from 'worker_threads'
const sharedBuffer = new SharedArrayBuffer(1024) // Создаём буфер
const uint8Array = new Uint8Array(sharedBuffer) // Создаём типизиованный массив
uint8Array.set([1, 2, 3, 4, 255], 0)
const worker = new Worker('./worker.js')
worker.postMessage(sharedBuffer)Теперь примем типизированный массив в потоке воркера:
import {parentPort} from 'worker_threads'
parentPort.on('message', (buffer) => {
const workerUint8Array = new Uint8Array(buffer) // Восстанавливаем массив из полученного буфера
})Данную методику нежелательно использовать, ведь она противоречит, многопоточной архитектуре JavaScript, не допускающей общего доступа к памяти между потоками. Однако в некоторых ситуациях, она может быть пригодиться. Например для обработки изображений, когда потоки будут обрабатывать изображение с различных концов. Таким образом они хоть и будут обрабатывать один массив данных, но в памяти эта обработка не будет пересекаться.
Нагрузка при использовании SharedArrayBuffer минимальна, так как данные не в памяти не перемещаются, а между потоками просто передаётся ссылка на буфер. Соответственно изменения сделанные в одном потоке, мгновенно отобразятся в другом. А сборщик мусора удалит массив из памяти, как только на него не останется ссылок в любом из потоков.
Для совместной обработки несколькими потоками, больших объёмов данных;
Реалтайм аналитика;
Высокопроизводительные вычисления;
Для реализации ПО которому необходим частый обмен данными между потоками.
Как уже упоминалось выше, SharedArrayBuffer нарушает архитектуру многопоточности JavaScript, которая не даёт возможности войти потокам в состояние гонки. Достигалось это за счёт чёткого запрета потокам обращаться к общим участкам памяти. Но SharedArrayBuffer, позволяет обойти это ограничение, что небезопасно и может привести к сложнодиагностируемым багам.
Но выход есть! Обеспечить безопасную работу с разделённым между потоками массивом, может интерфейс Atomics, который позволяет выполнять атомарные операции при работе с данными. Например метод add() позволяет безопасно добавить в массив новый элемент, гарантируя что никакой другой поток не в мешается в данную процедуру:
import {Worker} from 'worker_threads'
const sharedBuffer = new SharedArrayBuffer(4)
const intArray = new Int32Array(sharedBuffer)
const worker = new Worker('./worker.js')
worker.postMessage(sharedBuffer)
Atomics.add(intArray, 0, 3)
console.log(intArray) // 3Теперь изменим типизированный массив в воркере при помощи Atomics API:
import {parentPort} from 'worker_threads'
parentPort.on('message', (sharedBuffer) => {
const intArray = new Int32Array(sharedBuffer)
Atomics.add(intArray, 0, 2)
console.log(intArray) // 5
})Обратили внимание на необычный результат? В воркере метод add(), не переписал старое значение, новым, а сложил их. Дело в том API Atomics, придерживается философии низкоуровневых инструкций ассемблера и соответствует инструкции LOCK XADD, которая именно модифицирует старое значение, а не изменяет его. А для замены существует метод store():
import {Worker} from 'worker_threads'
const sharedBuffer = new SharedArrayBuffer(4)
const intArray = new Int32Array(sharedBuffer)
const worker = new Worker('./worker.js')
worker.postMessage(sharedBuffer)
Atomics.add(intArray, 0, 3)
console.log(intArray) // [3]И ещё раз поменяем значение в воркере:
import {parentPort} from 'worker_threads'
parentPort.on('message', (sharedBuffer) => {
const intArray = new Int32Array(sharedBuffer)
Atomics.store(intArray, 0, 42)
console.log(intArray) // [42]
})Атомарность в данном контексте означает, что достаточно сложная операция метода add(), состоящая нескольких этапов:
Чтения старого;
Вычисления;
Запись.
Будет выполнена как одна единая операция и никакой другой процесс не сможет в неё вклиниться. Это позволит спокойно и безопасно использовать буфер, не опасаясь того что потоки войдут в состояние гонки.
Ещё одной изящной и надёжной возможность обезопасится от состояния гонки, при помощи API Atomics, являются методы Atomics.wait() и Atomics.notify(), которые позволяют легко и быстро реализовать паттерн Producer/Consumer и таким образом синхронизировать между собой несколько потоков:
import {Worker} from 'worker_threads'
const sharedBuffer = new SharedArrayBuffer(4)
const intArray = new Int32Array(sharedBuffer)
const worker = new Worker('./worker.js')
worker.postMessage(sharedBuffer)
console.log(Atomics.wait(intArray, 0, 0)) // Возвращает ok
console.log('Основной поток разбужен!')
Atomics.add(intArray, 0, 3)
console.log(intArray) // 45Добавляем значение в буфер, в воркере:
import {parentPort} from 'worker_threads'
parentPort.on('message', (sharedBuffer) => {
const intArray = new Int32Array(sharedBuffer)
setTimeout(() => { // Имитируем бурную деятельность таймаутом
Atomics.store(intArray, 0, 42) // Записываем результат
Atomics.notify(intArray, 0, 1) // Будим основной поток
}, 2000)
})В примере выше используется метод wait(), который принимает следующие аргументы:
typedArray - типизированный массив подлежащей передаче в поток воркера;
index - индекс элемента массива, который предстоит изменять в воркере
value - текущее значение элемента массива, который предстоит изменить;
timeout - максимальное время ожидания ответа от воркера в миллисекундах. Опциональный аргумент, опусти который, метод будет ждать ответ от воркера бесконечно.
Возвращает этот метод различные строковые значения, зависящие от того чем кончилась работа с переданным значением:
ok - возвращается если:
Значение элемента переданного для проверки, равно значению в буфере, а значит метод входит в режим ожидания;
Метод notify() уведомил основной поток, о том что закончил работу с указанным значением буфера.
not-equal - возвращается если:
Значение элемента переданного для проверки, не равно значению в буфере, а значит метод не войдёт в режим ожидания и поток продолжит работу в обычном режиме;
timed-out - возвращается если истекло время ожидания, переданной в последнем аргументе.
Перейдёт ли метод wait() в режим ожидания, зависит от совпадёт ли значение элемента буфера, переданное ему для проверки, с реальным значение хранящимся в буфере. Почему это настолько важно, что разработчики Node.js завязали на этом моменте всю работу метода, и сделали так чтобы он возвращал разные значения, в зависимости того как всё прошло? Такой механизм необходим, позволяет обезопаситься от ещё одной проблемы, с которой можно столкнуться в многопоточном программировании: блокировкой потока.
Предположим что у нас нет механизма подобной проверки. Буфер передаётся в поток воркера, там его значение меняется и уведомляет об этом основной поток при помощи метода notify(). Но что если, изменение значение в воркере и последующее уведомление произойдёт раньше того, как в основном потоке успеет сработать метод wait()? Тогда основной поток никак не сможет узнать о том что работа в воркере выполнена и будет заблокирован навсегда! Либо на время указанное в таймауте, что уже лучше, но всё равно ситуация плохая и может сильно замедлить работу приложения.
Чтобы защититься от подобных проблем в методе wait() и нужна проверка значения. Эта операция атомарна, то есть проверка значения и последующая блокировка, выполняются как одно целое и в этот процесс никто не может вмешаться. В результате поток блокирует только текущее состояние данных и будет находится заблокированным, пока это состояние не изменится.
Переход потока в состояние сна и его последующий возврат к жизни, это очень дорогие операции, инициирующие переключения контекста операционной системой. Своими проверками метод wait() экономит ресурсы, сводя дорогостоящие операции к минимуму и укладывая потоки спать, только если в этом реально есть необходимость.
То есть учитывая всё сказанное выше можно понять что реализация метода wait(), это хорошо продуманный механизм, учитывающий многие нюансы многопоточного программирования и позволяющий избавиться от массы проблем, даже не зная об их существовании.
API Atomics поддерживает не только безопасную запись при работе с потоками, но и безопасное чтение. Использование банального получения значения из буфера по индексу:
const intArray = new Int32Array(sharedBuffer)
let myValue = intArray[0]Работа в многопоточном режиме, опасна тем что может произойти чтение:
Частично записанных значений, если другой поток пишет в соседние байты;
Устаревших данных, при чтении из кэша процессора вместо основной памяти;
Не тех данных что ожидалось, потому что компилятор или процессор переупорядочили операции.
Избежать всех этих проблем метод Atomics.load(). Переделаем код воркера под него:
import {parentPort} from 'worker_threads'
parentPort.on('message', (sharedBuffer) => {
const intArray = new Int32Array(sharedBuffer)
Atomics.store(intArray, 0, 42) // Записываем результат
let bufferValue = Atomics.load(intArray, 0) // Гарантированно 42
console.log(bufferValue) // 42
})Данный метод гарантирует:
Целостность данных и получение непротиворечивого значения;
Предотвращение переупорядочивания инструкций. На низком уровне данная это реализуется за счёт отключения оптимизаций компилятора и запрета на перестановку операций вокруг load();
Актуальность значений из памяти, а не из кэша процессора.
Ещё данный метод имеет ряд существенных особенностей:
Атомарность - чтение происходит как единая неделимая операция;
Барьер памяти - гарантирует, что все предыдущие записи видны до операции;
Не блокирует другие потоки;
Работает только с типизированными массивами: Int8Array, Uint32Array, Float64Array.
Когда следует использовать метод Atomics.load():
Чтение разделяемых данных, которые могут изменяться другими потоками;
Перед операциями модификации;
Работа с флагами состояния;
Чтение 64-битных значений(BigInt64Array/BigUint64Array).
Данный метод хорошо оптимизирован и по скорости работы, сопоставим с использованием обычного чтения, но при этом он гораздо эффективнее мьютексов.
В примерах выше мы активно использовали метод postMessage() и объект parentPort, для организации обмена сообщениями между родительским и дочерним потоками. Но как быть если у нас несколько потоков воркеров и нам требуется организовать обмен сообщениями между ними? Для организации подобной связи в Node.js существуют каналы связи.
Для создания такого канала нужно создать объект MessageChannel, у которого будут два свойства:
port1;
port2.
Которые ссылаются на пары объектов MessageChannel в родительском и дочернем потоке. Вызов метода postMessage(), на одном порту, приводит к генерации события message, другом. Создадим 2 воркера в главном потоке и наладим между ними связь, при помощи каналов сообщений:
import {Worker, MessageChannel} from 'worker_threads'
const {port1, port2} = new MessageChannel()
const worker = new Worker('./worker.js')
const secondWorker = new Worker('./secondWorker.js')
worker.postMessage({type: 'PORT', port: port1}, [port1]) // Передаём объекты портов в воркеры
secondWorker.postMessage({type: 'PORT', port: port2}, [port2]) Теперь получим и настроим порты в первом воркере:
import { parentPort } from 'worker_threads'
parentPort.on('message', (message) => {
let port = message.port // Получаем порт из главного потока
port.postMessage('Привет из первого воркера!') // Отправляем сообщение во второй воркер через полученный порт
port.on('message', (msg) => {
console.log('Первый воркер получил сообщение:', msg) // Принимаем сообщение из второго воркера
})
})И во втором воркере:
import { parentPort } from 'worker_threads'
parentPort.on('message', (message) => {
let port = message.port // Получаем порт из главного потока
port.on('message', (msg) => {
console.log('Второй воркер получил сообщеие:', msg)
port.postMessage('И тебе привет от второго воркера!') // Отправляем ответное сообщение первому воркеру
})
})Для закрытия созданного канала нужно вызвать метод close(), одного из объектов MessageChannel. Вызов данного метода генерирует событие close, на обоих портах:
import { parentPort } from 'worker_threads'
parentPort.on('message', (message) => {
let port = message.port
port.postMessage('Привет из первого воркера!')
port.on('message', (msg) => {
console.log('Первый воркер получил сообщение:', msg)
port.close() // Закрывает порт после получения сообщения
parentPort.close() // Тем-же методом можно закрыть и родительский порт
})
})Если воркер завершает свою работу, то все порты связанные с ним, закрываются автоматически:
import { parentPort } from 'worker_threads'
parentPort.on('message', (message) => {
let port = message.port
port.postMessage('Привет из первого воркера!')
port.on('message', (msg) => {
console.log('Первый воркер получил сообщение:', msg)
process.exit(0) // При завершении работы воркера, закрывать порты не нужно, так как они закроются автоматически
})
})Когда задача для которой они требовались завершена и они больше не нужны, как в примере выше, мы закрыли порт сразу после того как получили сообщение;
При возникновении ошибок, чтобы избежать утечек ресурсов:
import { parentPort } from 'worker_threads'
parentPort.on('message', (message) => {
let port = message.port
port.postMessage('Привет из первого воркера!')
try {
port.on('message', (msg) => {
console.log('Первый воркер получил сообщение:', msg)
})
} catch {
port.close()
}
})При пересоздании каналов, старые нужно закрывать;
При завершении работы приложения. Для этого лучше всего использовать упомянутый выше метод process.exit(0);
Экономия ресурсов. Открытый канал продолжает расходовать ресурсы памяти, что приводит к её утечкам;
Избегание конфликтов, так как старые каналы могут помещать открытию новых. Освобождённые неиспользуемыми каналами порты, всегда могут пригодиться в другом месте;
Корректное завершение работы приложения, можно обеспечить только явным закрытием всех каналов и неиспользуемых воркеров.
JavaScript даёт множество инструментов для налаживания взаимодействия между потоками, что позволяет на этом языке реализовывать сложные многопоточные архитектуры. В случае необходимости, можно даже нарушить ограничение отделяющие воркеры, от главного потока и друг от друга, предоставив им возможность работать данными в общей памяти. Но даже для таких ситуаций, существует API Atomics, позволяющая нарушать правила безопасно.