Мультиплексирование потоков данных Node.js Streams: пошагово программируем и разбираем задачу
- четверг, 1 января 2026 г. в 00:00:04
Задача мультиплексирования данных
Постановка задачи мультиплексирования потоков данных в Node.js
Программирование своего stream.Duplex
Программирование исходящий сообщений
Парсинг входящий сообщений
Тестирование
Оптимизация отправки сообщений
Проблема с одновременным закрытием потоков
Проблема с конфликтом id потоков
Доработки кода
Заключение
Мне очень нравится идея потоков данных в Node.js - data streams. Они используются всюду: чтение файлов, сетевые запросы, архивирование файлов.
Не путать с потоками выполнения процессов - threads! Это совсем другое!
Очень мне нравится идея метода pipe - перенаправление вывода одного потока в ввод другого.
Можно строить длинные "пайплайны" из логики по преобразованию данных.
Каждый раз, когда я вызываю этот метод, я вспоминаю студенческие годы, где мы на парах по Linux пайпали вывод stdout во вход stdin другого процесса.
Так же и называется этот приём в терминале - пример, команда cat file.txt | grep qqq | wc -l.
Выполняется следующая цепочка команд:
cat file.txt — читает содержимое файла
| — передаёт вывод в grep qqq
grep qqq — фильтрует строки, содержащие "qqq"
| — передаёт отфильтрованные строки в wc -l
wc -l — считает количество строк
Результат: количество строк в file.txt, содержащих "qqq"
Собственно, если вывод и ввод процесса stdout и stdin рассмотреть как потоки данных, то инструмент | - как раз пайпинг выхода первого потока во вход второго.
Есть много хороших статей по философии, теории и применении потоков, Хабр не стал исключением:
Ментальная модель потоков в Node.js
Много раз я сталкивался с необходимостью реализовывать свои потоки данных. И каждый раз я путался с кучей методов: write, push, _write, _read, с кучей событий - end, close, finish. В чем отличие write от _write?! Буквально почти все время, когда я имею дело с потоками у меня открыты доки Node.js - https://nodejs.org/api/stream.html.
Недавно, в моей статье про кастомный транспорт для ShadowSocks мне потребовался функционал передачи разных потоков данных по одному потоку (каналу)данных. Это задача мультиплексирования данных.
Я нашёл порт библиотеки на Go - yamux, который, увы, не взлетел на тестах. Поскольку задача не такая уж и сложная, плюс я хотел уже "закрепить на практике" все эти непонятки с потоками - решил накидать свой код.
Я думаю, что эта задача является хорошей практикой для новичков в Node.js и программистов, любящих кодить всякие алгоритмы.
Во время программирования я встретился с несколькими "программистскими задачами" aka "подводными камнями", которые мне пришлось решить, и это было увлекательно, что и вылилось в данную статью.
Так же сначала будет применяться очевидные, но не самые эффективные (с точки зрения времени и памяти) решения задач. По ходу статьи мы будем совершенствовать код и применять более эффективные решения.
Код моего мультиплексора данных (в этой статье будем писать почти такой же) доступен в npm и на GitHub.
Представим себе города, в которых есть несколько многополосных дорог. Передвигаться внутри города не представляет проблем. Пускай, внутри города люди часто отправляют друг другу курьерские посылки.
А между городами - есть лишь двухполосное шоссе, по одной полосе на каждую сторону - допустим, проблемы с дорогами, или ремонт - не зависящее от нас ограничение.
По сути, можно воочию наблюдать, как несколько потоков машин на разных полосах сливаются в один поток на одной полосе шоссе.
Если заставить всех курьеров на легковушках перемещаться по шоссе, но тогда начнутся пробки на выезде из города - уплотнение данных приводит к потере скорости передачи.
Собственно, то, как машины из многих полос выстроятся в одну - и есть задача мультиплексирования данных.
Задача мультиплексирования - уплотнения потока данных (из-за каких-либо причин или внешних ограничений), либо же передача нескольких виртуальных каналов через меньшее количество (один) физических каналов.
Например, можно вместо отправки всех курьеров по шоссе сформировать один грузовик с посылками. Но тогда придется как то маркировать посылки (писать адрес), чтобы не растерять их в другом городе.
Это тоже относится к мультиплексированию - как упаковать данные в более узкий канал, чтобы распаковать их после передачи в правильном порядке.
Погрузимся в JavaScript, Node.js. Пускай у нас на руках имеется несколько потоков данных - stream.Duplex. А канал передачи - один поток данных stream.Duplex.
Как пример, пусть у нас есть один канал передачи данных между компьютерами - сокет net.Socket. Первый компьютер одновременно скачивает несколько файлов (т.е. работает с несколькими stream.Duplex), и хочет передавать их на другой компьютер - с помощью лишь одного stream.Duplex.
Нужно как то упаковать данные разных потоков в один поток данных, чтобы распаковать их после передачи в правильном порядке - решить задачу мультиплексирования.
С одной стороны у нас должен быть массив stream.Duplex - массив потоков данных, которые мы будем "сжимать" в один поток stream.Duplex с другой стороны.
Формулируем задачу:
написать класс Mux, реализующий stream.Duplex - тот самый один поток данных, который мы будем передавать далее на другую сторону (компьютер)
Mux имеет метод openStream() для создания множества потоков данных MuxChannelStream(которые, в свою очередь, и будут мультиплексироваться)
MuxChannelStream - поток данных, который будет мультиплексироваться в Mux, так же является наследником stream.Duplex
MuxChannelStream должен иметь уникальный идентификатор id
Mux имеет событие stream извещающее, что провотиположная сторона открыла новый поток данных
Mux сам по себе является потоком данных, который передаёт и принимает уже мультиплексированные данные дочерних потоков в верном порядке

Duplex поток - является одновременно как Readable, так и Writable, из него можно читать данные и в него можно писать данные.
Чтобы написать свой класс потока с кастомной логикой, нужно отнаследоваться от класса stream.Duplex и реализовать некоторые методы (о них далее).
Договоримся называть юзером программиста, который использует по назначению разрабатываемый нами класс.
Читает данные юзер либо с помощью периодического вызова метода read, либо с помощью подписки на событие data, которое вызывается при получении данных. read() - возвращает буфер данных, либо null, если внутренний буфер данных пуст. Событие же data всегда передает поступивший буфер данных.
Эти данные как то должны "появиться" внутри класса. Например, класс net.Socket - сетевой сокет для приёма и отправки данных по сети (который, в свою очередь, является наследником класса stream.Duplex). Данные приходят на компьютер, обрабатываются операционной системой, поступают "внутрь класса" net.Socket и тогда уже внутренняя логика класса выстреливает событие data.
Когда юзер вызывает read(n) - он хочет прочитать n байт данных. Пока n байт данных не прочитаются, для наполнения внутренних данных будет вызываться метод _read - который уже должен реализовать программист. Внутри метода _read, чтобы добавить данные во внутренний буфер своегоstream.Duplex, нужно вызвать метод push(). Как только новые данные для чтения будут готовы, так же выстреливается событие data.
Внутренние данные могут прийти сами собой - как в случае net.Socket. Тогда же метод push() программистом будет вызываться уже не внутри метода _read, а там, где операционная система прислала данные. В таком случае пустая реализация метода _read - обычная практика.
Внутренний буфер для чтения наполняется в другом месте своей логикой.
class MyStream extends stream.Duplex {
_read(size) {
// данные приходят в другом месте
}
// обработка пришедших данных от операционной системы
onSocketData(data) {
// проталкиваем данные во внутренний буфер, чтобы они
// стали доступны для чтения
// так же выстрелится событие "data"
this.push(data);
}
}
Пишет данные юзер с помощью метода write. Данные записываются во внутренний буфер stream.Duplex. Опять же, в случае net.Socket - эти данные передаются операционной системе и уже она передаёт эти данные далее по сети.
Чтобы написать свою логику обработки данных, которые юзер будет записывать через write, нужно реализовать метод _write.
_write(chunk, encoding, callback) {
// обрабатываем chunk, который записывает юзер
callback();
}
Я все время в этом путался.
write - этот метод нужен юзеру класса stream.Duplex, чтобы записать в него данные.
push - этот метод нужен программисту класса stream.Duplex, чтобы подготовить данные для чтения юзером.
write вызывается вне класса, push - вызывается во внутренней логике класса.

Сделаем 2 класса, отнаследованных от stream.Duplex:
Mux - класс-поток, который будет мультиплексировать данные и передавать запакованные данные далее для чтения как stream.Readable
MuxChannelStream - класс, который будет предоставлять поток данных, данные из него будут мультиплексироваться.
Т.е. методом openStream() класса Mux мы получаем рабочие потоки (вызывая метод каждый раз для нового потока), куда будем писать нужные нам данные.
Каждый раз при вызове openStream() на одной стороне, будет выстреливаться событие stream на второй стороне.
Данные из MuxChannelStream передаются и пакуются в Mux, после чего этот поток данных мы можем передавать дальше - например, сделать pipe в сетевой сокет net.Socket и передать на вторую сторону.
На второй стороне так же, из сокета мы пайпаем в Mux, который будет распаковывать данные, создавая соответствующие MuxChannelStream и передавая в них соответствующие им данные.
Использование класса Mux:
/* код на первой стороне */
const mux = new Mux();
const stream1 = mux.openStream();
const stream2 = mux.openStream();
// передачи мультиплексированных данных осуществляется с помощью сокета (пример)
mux.pipe(socket).pipe(mux);
// передаем нужные данные
stream1.write("data1");
stream1.write("data2");
/* код на второй стороне */
const mux = new Mux();
mux.on("stream", stream => {
console.log(`new data stream id=${stream.id}`);
// читаем данные от потока stream.id через событие "data"
stream.on("data", chunk => {
// обрабатываем данные от потока stream.id
console.log(`recieved data, ${chunk.length} bytes from stream id=${stream.id}`);
});
});
// передачи мультиплексированных данных осуществляется с помощью сокета (пример)
mux.pipe(socket).pipe(mux);
Так же на второй стороне можно тоже создать поток с помощью метода openStream(), а на первой - подписаться на событие stream.
Если мы наследуемся от stream.Duplex, то мы должны реализовать методы _read() и _write().
Поехали, набросаем классы:
import stream from "node:stream";
class MuxChannelStream extends stream.Duplex {
_read(size) {
}
_write(chunk, encoding, callback) {
}
}
class Mux extends stream.Duplex {
_read(size) {
}
_write(chunk, encoding, callback) {
}
openStream() {
}
}
Запрограммируем сначала запись данных в MuxChannelStream и их дальнейшее мультиплексирование (внутреннюю обработку) в Mux, а затем - напишем обратный процесс распаковки данных из Mux в MuxChannelStream.
Когда мы создаём поток данных MuxChannelStream на одной стороне, нам нужно известить Mux на другой стороне об этом - и передать ей id нового созданного потока.
Так же, если поток данных завершился (вызвали end()), об этом так же нужно сказать другой стороне.
Ну и уж, при записи буфера данных в поток MuxChannelStream, нужно сказать другой стороне, что "такой-то буфер данных был записан в поток с таким-то id".
Данные потока Mux - будут сообщениями, извещающими об этом.
В самой простой реализации нам нужно посылать сообщения 3-х типов:
Создание нового потока
Завершение потока
Запись данных в поток
Каждое сообщение - это некоторый "пакет" данных, буфер из байтов.
Каждый буфер сообщения несет в себе тип этого сообщения и данные сообщения.
Тип сообщения определим как константы внутри класса Mux:
static MESSAGE_TYPE_STREAM_OPEN = 0;
static MESSAGE_TYPE_STREAM_CLOSE = 1;
static MESSAGE_TYPE_STREAM_DATA = 2;
Каждый раз, когда юзер будет создавать поток, закрывать поток, и передавать какие-либо данные, внутри Mux мы будем формировать сообщения и записывать их во внутренний буфер Readable - чтобы сформировать мультиплексированный поток данных и была возможность передать его далее.
Код внутри класса Mux.
Каждому потоку мы будем приписывать id - как просто увеличивающийся номер, с нуля и по порядку.
Напишем приватную функцию _createChannelStream(id), которая будет создавать поток данных, с переданным id.
Созданные потоки будем хранить в мапе this.streams.
// MuxChannelStream
constructor(id) {
super();
this.id = id;
}
// Mux
constructor() {
super();
this.streamId = 0; // текущий номер для нового потока
this.streams = new Map();
}
openStream() {
const streamId = this.streamId++;
return this._createChannelStream(streamId);
}
_createChannelStream(streamId) {
const channelStream = new MuxChannelStream(streamId);
this.streams.set(channelStream.id, channelStream);
return channelStream;
}
Теперь нужно передать сообщение о создании потока. Сообщение будет представлять собой буфер из байт, который мы запишем во внутренний буфер данных методом push().
Все передаваемые и читаемые данные в потоке - это последовательность байтов. Запись и чтение происходит кусочками - "чанками" - буферами из байт, различной длины.
Пока что сделаем по-простому, а далее подумаем над эффективностью.
Можно сообщение формировать как JSON-обьект, конвертировать (сериализовать) его в строку, и писать во внутренний буфер.
// пример сообщения создания потока
const message = {
type: Mux.MESSAGE_TYPE_STREAM_OPEN,
// данные сообщения - здесь только id потока
id: streamId
};
// буфер сообщения, который будем передавать
const messageBuffer = Buffer.from(JSON.stringify(message));
Т.к. прием данных осуществляется так же чанками, и размер чанков может быть произвольным - начиная от одного байта, сообщение может быть порезано на произвольное число чанков разной длины.
Мы должны будем восстановить сообщение из входящей последовательность чанков.

Т.к. по входящей последовательности чанков сложно сразу определить, как парсить сообщение, можно сначала отправить длину сообщения (целое число типа int32), а потом уже сам буфер сообщения. Таким образом прочитав длину сообщения - всегда 4 байта, дальше можно прочитать сам буфер сообщения.
const messageBuffer = Buffer.from(JSON.stringify(message));
// аллоцируем 4 байта для записи длины сообщения
const messageLengthBuffer = Buffer.allocUnsafe(4);
messageBufferLength.writeInt32BE(messageBuffer.length);
// сначала записываем буфер с длиной сообщения
this.push(messageLengthBuffer);
// потом - буфер с самим сообщением
this.push(messageBuffer);
Полный код записи сообщения с произвольными данными получается такой:
_writeMessage(type, data = {}) {
const message = { type, ...data };
const messageBuffer = Buffer.from(JSON.stringify(message));
const messageLengthBuffer = Buffer.allocUnsafe(4);
messageLengthBuffer.writeInt32BE(messageBuffer.length);
this.push(messageLengthBuffer);
this.push(messageBuffer);
}
И в создании потока _createChannelStream так же нужно отправить соответствующее сообщение.
_createChannelStream(streamId) {
const channelStream = new MuxChannelStream(streamId);
this.streams.set(channelStream.id, channelStream);
this._writeMessage(Mux.MESSAGE_TYPE_STREAM_OPEN, { id: streamId });
return channelStream;
}
Поток MuxChannelStream закрывается, когда юзер вызывает на нем метод end().
Поток выстреливает событие finish. Нам самим нужно подписаться на это событие, удалить поток из мапы this.streams потоков и отправить сообщение о закрытии потока.
Это мы делаем в методе _createChannelStream, когда новый поток у нас "на руках".
_createChannelStream(streamId) {
...
channelStream
.on("finish", () => {
this.streams.delete(channelStream.id);
this._writeMessage(Mux.MESSAGE_TYPE_STREAM_CLOSE, { id: channelStream.id });
});
return channelStream;
}
Когда юзер будет писать данные в MuxChannelStream, нужно произвести мультиплексирование данных и отправить их во внутренний буфер Mux.
У каждого MuxChannelStream у нас есть уникальный id - именно так можно отличать чанки данных от разных потоков.
Соответственно, в сообщении с типом Mux.MESSAGE_TYPE_STREAM_DATA нужно передать id потока и сам чанк данных.
В JSON нет примитива Buffer, поэтому пока что (опять же, первое-простое-тупое) решение - записать его как массив чисел
this._writeMessage(Mux.MESSAGE_TYPE_STREAM_DATA, { id: channelStream.id, data: Array.from(chunk) });
Где мы обрабатываем данные, которые юзер пишет в поток MuxChannelStream? Правильно, во внутреннем методе _write.
Метод _writeMessage существует у класса Mux, чтобы его вызвать, нужна ссылка на "родительский" поток Mux, передадим её в конструктор MuxChannelStream.
class MuxChannelStream extends stream.Duplex {
constructor(mux, id) {
super();
this.mux = mux;
this.id = id;
}
_read(size) {
}
_write(chunk, encoding, callback) {
this.mux._writeMessage(Mux.MESSAGE_TYPE_STREAM_DATA, { id: this.id, data: Array.from(chunk) });
callback();
}
}
Супер! Все 3 сообщения мы отправили, теперь займемся обратной задачей - принятие данных из мультиплексированного потока и парсинг сообщений.
Парсинг сообщений гораздо сложнее, чем его отправка простым вызовом метода push().
Итак, приём сообщений происходит на другой стороне. Это значит, когда в Mux пишут данные методом write() извне - это как раз и есть мультиплексированные данные, которые нужно распаршивать на входящие сообщения.
Проблема в том, что внутренний метод _write(chunk, encoding, callback) - первом аргументом принимает тот самый чанк данных, и нигде не гарантируется его конкретная длина. Все входящие сообщения, в теории, могут приходить побайтово: сколько байт в сообщении, столько и вызовется подряд функция _write() c чанком из одного байта.
Мы должны написать код, который парсит входящие сообщения из сырого потока байт, вне зависимости от того, как его "порезали на чанки".
Изначально мы знаем, что вначале, должна прийти длина сообщения - это 4 байта. Так мы договорились.
Как придет 4 байта, мы тут же узнаем длину сообщения n - и будем ждать, пока не придет это количество - n байт. Здесь мы уже можем парсить сообщение, как JSON-строку.
Следующие 4 байта - снова, длина уже следующего сообщения. И так далее.
Применим паттерн "стейт-машина/состояние" для чтения записываемых юзером данных.
Состояние будет значит, что мы считываем в данный момент. Мы можем считывать либо
длину сообщения (4 байта)
само сообщение (длина сообщения в байтах)
Не самая большая стейт-машина)
В обоих стейтах мы должны дождаться, пока не придёт определенное количество байт, для корректного парсинга либо длины сообщения, либо самого сообщения.
Определим стейты как константы в классе Mux:
static STATE_READ_MESSAGE_SIZE = 0;
static STATE_READ_MESSAGE_DATA = 1;
В каждом стейте мы будем ждать определенное количество байт - пусть это будет this.sizeToRead, пока приходят чанки, и их общая длина меньше this.sizeToRead мы должны сохранять чанк в массив и ждать, пока не придет нужное (или большее) количество байт.
Чанки будем писать в простой массив this.chunks, а чтобы каждый раз не считать общую их длину в байтах - заведём переменную this.chunksTotalSize.
Начальный стейт - this.state = Mux.STATE_READ_MESSAGE_SIZE, соответственно this.sizeToRead = 4.
Когда будет поступать новый чанк данных от юзера (метод _write), просто кладём его в массив чанков, увеличивая общую длину чанков, и вызываем логику обработки текущего стейта - _processState().
class Mux extends stream.Duplex {
...
static STATE_READ_MESSAGE_SIZE = 0;
static STATE_READ_MESSAGE_DATA = 1;
constructor() {
super();
...
this.state = Mux.STATE_READ_MESSAGE_SIZE;
this.sizeToRead = 4;
this.chunks = [];
this.chunksTotalSize = 0;
}
_write(chunk, encoding, callback) {
// кладём новый chunk в массив чанков
this.chunks.push(chunk);
// считаем общую длину всех чанков в байтах
this.chunksTotalSize += chunk.length;
this._processState();
callback();
}
...
_processState() {
}
}
Первое, что должен сделать _processState в любом случае - дождаться this.sizeToRead байт в пришедших чанках. Если пришло меньше байт, то ждём дальше - выходим из функции, а когда придёт новый чанк мы снова сюда попадём.
_processState() {
if (this.chunksTotalSize < this.sizeToRead) return;
}
Если же пришло нужное число байт this.sizeToRead или даже больше, нам нужно выделить чанк размером ровно this.sizeToRead байт, оставив остальные.

На рисунке мы имеем в this.chunks 5 чанков размеров: 1, 3, 4, 1, 8 байт соответственно. Нужно прочитать this.sizeToRead 12 байт.
Получается, нам нужно обьединить первые 4 чанка и 3 байта от 5го чанка в буфер - текущий chunk для чтения. Остальные 5 байт из 5го чанка должны остаться в массиве чанков как оставшийся чанк.
Предлагается сначала определить индекс крайнего чанка, которого мы будем резать (или не будем, если общая длина предыдущих чанков будет полностью равна this.sizeToRead). После чего произвести операцию разрезания чанка, и конкатенировать (объединить) первые чанки.
_processState() {
if (this.chunksTotalSize < this.sizeToRead) return;
let chunkIndex = 0;
let chunksSize = 0;
let chunksToConcat = [];
while (chunksSize < this.sizeToRead) {
chunksSize += this.chunks[chunkIndex].length;
chunkIndex++;
}
// вырезаем первые chunkIndex чанков из массива чанков, их мы будем объединять
chunksToConcat = this.chunks.splice(0, chunkIndex);
if (chunksSize > this.sizeToRead) {
// перебрали с длиной - последний чанк в chunksToConcat нужно разрезать
const chunkToSlice = chunksToConcat.pop();
const sliceLength = chunksSize - this.sizeToRead;
const firstPart = chunkToSlice.slice(0, sliceLength);
const secondPart = chunkToSlice.slice(sliceLength);
// первая часть идёт в chunksToConcat в конец
chunksToConcat.push(firstPart);
// вторая остаётся в chunks в начале (просто заменяем разрезанный chunk на его 2ю часть)
this.chunks.unshift(secondPart);
}
const chunk = Buffer.concat(chunksToConcat);
// обновляем this.chunksTotalSize
this.chunksTotalSize -= chunk.length;
...
}
Теперь у нас есть на руках chunk - и как раз, нужной длины this.sizeToRead. В зависимости от this.state мы будем по-разному трактовать эти данные - либо это длина сообщения, либо это само сообщение.
После выяснения длины сообщения, которой становится переменная this.sizeToRead, меняем стейт на Mux.STATE_READ_MESSAGE_DATA.
Если мы находимся в стейте Mux.STATE_READ_MESSAGE_DATA, то chunk - это и есть данные сообщения, вынесем обработку в функцию _handleMessage(), и снова установим стейт в чтение длины следующего сообщения.
switch (this.state) {
case Mux.STATE_READ_MESSAGE_SIZE:
// читаем длину сообщения - столько байт нам нужно будет дождаться
this.sizeToRead = chunk.readInt32BE(0);
// меняем стейт на чтение сообщения
this.state = Mux.STATE_READ_MESSAGE_DATA;
break;
case Mux.STATE_READ_MESSAGE_DATA:
// обрабатываем сообщение
this._handleMessage(chunk);
// меняем стейт снова на чтение длины сообщения
this.sizeToRead = 4;
this.state = Mux.STATE_READ_MESSAGE_SIZE;
break;
}
В массиве this.chunks могли остаться данные, в таком случае вызовем обработку стейта _processState() ещё раз.
// если есть ещё чанки - запустим снова обработку стейта
if (this.chunksTotalSize > 0) this._processState();
Перейдём к методу _handleMessage() - он принимает параметр chunk - данные сообщения. Как мы договаривались, это - JSON-строка.
Пропарсив её, внутри должен лежать тип сообщения type - в зависимости от которого мы будем писать разную логику обработки сообщения.
_handleMessage(chunk) {
const message = JSON.parse(chunk.toString());
switch (message.type) {
...
}
}
Если type === Mux.MESSAGE_TYPE_STREAM_OPEN, значит другая сторона создала поток MuxChannelStream и мы так же должны создать его на нашей стороне, оповестив юзера событием stream.
У нас уже есть функция _createChannelStream(), но она внутри себя отправляет сообщения Mux.MESSAGE_TYPE_STREAM_OPEN и Mux.MESSAGE_TYPE_STREAM_CLOSE. А в данном случае нам нужно отправлять не сообщение, (мы обрабатываем, собственно это сообщение от другой стороны) а событие stream юзеру.
Получается, есть общий функционал в функции _createChannelStream(), который создаёт поток MuxChannelStream, запоминает его в this.streams и подписывается на событие finish - очень удобно, но то, что в коде присутствует логика отправки сообщений - делает её неудобной.
Исправим это, принимая коллбеки onCreated и onFinished в аргументы функции _createChannelStream() для вызова произвольной логики.
_createChannelStream(streamId, { onCreated, onFinished }) {
const channelStream = new MuxChannelStream(this, streamId);
this.streams.set(channelStream.id, channelStream);
if (onCreated) onCreated(channelStream);
channelStream
.on("finish", () => {
this.streams.delete(channelStream.id);
if (onFinished) onFinished(channelStream);
});
return channelStream;
}
В таком случае, код openStream() нужно поменять - передать функции onCreated и onFinished, где мы как раз отправляем сообщения другой стороне.
openStream() {
const streamId = this.streamId++;
return this._createChannelStream(streamId, {
onCreated: channelStream => {
this._writeMessage(Mux.MESSAGE_TYPE_STREAM_OPEN, { id: channelStream.id });
},
onFinished: channelStream => {
this._writeMessage(Mux.MESSAGE_TYPE_STREAM_CLOSE, { id: channelStream.id });
}
});
}
Вернёмся к методу _handleMessage() и обработаем таки Mux.MESSAGE_TYPE_STREAM_OPEN. В дополнительных данных в сообщении приходит id созданного потока другой стороны.
Нужно создать поток функцией _createChannelStream(), но в onCreated вызвать событие stream. В onFinished так же оставить отправку сообщения Mux.MESSAGE_TYPE_STREAM_CLOSE, т.к. поток может закрыться юзером в произвольный момент.
...
case Mux.MESSAGE_TYPE_STREAM_OPEN: {
const streamId = message.id;
this._createChannelStream(streamId, {
onCreated: channelStream => {
this.emit("stream", channelStream);
},
onFinished: channelStream => {
this._writeMessage(Mux.MESSAGE_TYPE_STREAM_CLOSE, { id: channelStream.id });
}
});
break;
}
...
Если type === Mux.MESSAGE_TYPE_STREAM_CLOSE - другая сторона завершила поток MuxChannelStream и мы так же должны закрыть его, вызвав метод end().
В таком случае мы должны отписаться от события finish этого потока, т.к. мы сами его закрываем, и нам не нужно отправлять сообщение о закрытии на другую сторону - оно оттуда и пришло.
Тут небольшая проблема. Подписывались мы указывая анонимные функции, и нигде их не сохраняли. Либо переписывать код с грамотной подпиской/отпиской от событий MuxChannelStream, с сохранением функций-слушателей для каждого потока данных, либо придумать какое-то другое решение (обычно костыльное).
Могу предложить, в случае закрытия потока на другой стороне, помечать его флагом как _closedByOtherSide, и, если в обработчике finish этот флаг стоит, то ничего не делать.
...
case Mux.MESSAGE_TYPE_STREAM_CLOSE: {
const streamId = message.id;
const channelStream = this.streams.get(streamId);
channelStream._closedByOtherSide = true;
channelStream.end();
this.streams.delete(streamId);
break;
}
...
Добавляем проверку в обработчик события finish в создании потока данных _createChannelStream().
...
channelStream
.on("finish", () => {
if (channelStream._closedByOtherSide) return;
this.streams.delete(channelStream.id);
if (onFinished) onFinished(channelStream);
});
Если пришло type === Mux.MESSAGE_TYPE_STREAM_DATA, читаем буфер данных в сообщении и записываем его во внутренний буфер нашего MuxChannelStream методом push().
Помним, что сейчас сообщения наши кодируются как JSON, и буфер с данными мы трактовали как массив чисел (очень неоптимально, но всё впереди), поэтому нужно обратно получить буфер - Buffer.from(message.data).
...
case Mux.MESSAGE_TYPE_STREAM_DATA: {
const streamId = message.id;
const chunk = Buffer.from(message.data);
const channelStream = this.streams.get(streamId);
channelStream.push(chunk);
break;
}
...
Все сообщения отправили, все сообщения приняли - в первоначальном виде классы готовы к использованию, можно перейти к тестированию.
Конечно, есть ещё много, чего добавить - обсудим это далее в разделе "Доработки кода".
const mux1 = new Mux();
const mux2 = new Mux();
// имитируем пересылку данных между двумя сторонами
mux1.pipe(mux2).pipe(mux1);
// на mux2 будем "принимать" созданный поток данных (хотя это можно делать и на mux1)
mux2
.on("stream", mux2Stream => {
console.log(`mux2: stream ${mux2Stream.id} opened`);
mux2Stream
.on("data", data => {
console.log(`mux2: stream ${mux2Stream.id} data: ${data}`);
mux2Stream.write(`PONG FROM mux2 ${mux2Stream.id}`);
mux2Stream.end();
})
.on("finish", () => {
console.log(`mux2: stream ${mux2Stream.id} closed`);
});
});
// юзер создаёт поток данных на mux1
const mux1Stream = mux1.openStream();
console.log(`mux1: stream ${mux1Stream.id} opened`);
mux1Stream
.on("data", data => console.log(`mux1: stream ${mux1Stream.id} data: ${data}`))
.on("finish", () => console.log(`mux1: stream ${mux1Stream.id} closed`));
mux1Stream.write(`PING FROM mux1 ${mux1Stream.id}`);
mux1: stream 0 opened
mux2: stream 0 opened
mux2: stream 0 data: PING FROM mux1 0
mux1: stream 0 data: PONG FROM mux2 0
mux2: stream 0 closed
mux1: stream 0 closed
Сообщения мы для первого приближения и простоты формировали как JSON-объекты, сериализованные в строки.
Конечно, это не самый эффективный способ передачи данных, особенно, когда мы вольны отправлять "сырые" буфера из произвольных байтов данных.
JSON - это текстовый формат представления данных, удобный для чтения/редактирования человеку. Для машины он слишком излишен - зачем передавать всякие символы типа {, }, :, [, ] и так далее, лучше уложить данные максимально сжато в бинарном формате.
Для эффективной пересылки данных (с заранее описанной структурой и без) существуют библиотеки, самая знаменитая - Google protobuf.
Самый эффективный (с точки зрения размера сообщения и работы процессора) способ пересылки сообщения - формировать буфер сообщения вручную.
Например, сначала мы посылали 4 байта int32 - длину сообщения.
Можно поступить более короче.
Посылаем тип сообщения - так как типов сообщений мало (пока только 3) то нам хватает одного байта для этого - значит пишем тип сообщения в этот байт
В зависимости от типа сообщения, посылаем те или иные данные. Например, в случае открытия/закрытия потока - нам нужно переслать его id. Пока что это целое число (хотя с точки зрения может быть любой примитивный тип данных, можно так же использовать строки).
В зависимости от максимального значения id можно отправлять айдишник как int16 - это 2 байта, либо как int32 - 4 байта. Но нужно внимательно следить за айдишниками и придумать логику, что делать, если юзер насоздает больше, чем 16536 потоков - если у нас id пишется как int16. В случае 4х байтового айдишника - пользователю нужно очень постараться, чтобы перебрать этот лимит.
Пример формирования сообщения с чанком данных
...
_writeMessageData(channelStream, chunk) {
// тип сообщения - число, умещающееся в 1 байт
const messageTypeBuffer = Buffer.allocUnsafe(1);
messageTypeBuffer.writeUInt8(Mux.MESSAGE_TYPE_STREAM_DATA);
// пусть id потока - число int16, 2 байта
const channelStreamIdBuffer = Buffer.allocUnsafe(2);
channelStreamIdBuffer.writeInt16BE(channelStream.id);
// длина буфера данных - число int32, 4 байта
const chunkLengthBuffer = Buffer.allocUnsafe(4);
chunkLengthBuffer.writeInt32BE(chunk.length);
this.push(messageTypeBuffer);
this.push(channelStreamIdBuffer);
this.push(chunkLengthBuffer);
// сам чанк данных
this.push(chunk);
}
Чтение сообщения - обратная история. Сначала читаем первый байт данных. Это - тип сообщения. В зависимости от типа сообщения, читаем далее данные по порядку, как мы программировали запись.
Это более замороченный процесс. Каждый раз придется дожидаться this.sizeToRead байт данных - сначала тип сообщения, потом id потока (2 или 4 байта), затем - длина буфера данных (4 байта), и сам буфер данных.
Ручное формирование сообщений и их последующий парсинг - очень муторное занятие. Вполне, этим можно заняться, если есть цель максимально быстро и с минимальной нагрузкой на сеть передавать данные.
В нашу эпоху, так сильно экономить не так актуально.
Да, сериализовать данные как JSON - излишне, а писать код руками - жестко, выберем срединный путь - поиспользуем библиотеку бинарной сериализации данных msgpack.
Для неё не нужно описывать структуру данных, можно сразу сериализовать/десериализовать структуры из примитивных типов данных.
import msgpack from "msgpack5";
const packer = msgpack();
...
// сериализация
const encodedData = packer.encode(data);
// десериализация
const decodedData = packer.decode(encodedData);
По сути, нам нужно JSON.stringify() и JSON.parse() заменить на packer.encode() и packer.decode().
import msgpack from "msgpack5";
const packer = msgpack();
...
_writeMessage(type, data = {}) {
const message = { type, ...data };
const messageBuffer = packer.encode(message);
const messageLengthBuffer = Buffer.allocUnsafe(4);
messageLengthBuffer.writeInt32BE(messageBuffer.length);
this.push(messageLengthBuffer);
this.push(messageBuffer);
}
...
_handleMessage(chunk) {
const message = packer.decode(chunk);
...
Так же уберем хак с конвертированием Buffer в численный Array:
class MuxChannelStream extends stream.Duplex {
...
_write(chunk, encoding, callback) {
this.mux._writeMessage(Mux.MESSAGE_TYPE_STREAM_DATA, { id: this.id, data: chunk });
...
При работе с классами может обнаружиться следующий сценарий:
первая сторона закрыла поток
первая сторона отправила сообщение Mux.MESSAGE_TYPE_STREAM_CLOSE
вторая сторона закрыла поток
вторая сторона отправила сообщение Mux.MESSAGE_TYPE_STREAM_CLOSE
вторая сторона приняла сообщение Mux.MESSAGE_TYPE_STREAM_CLOSE, но потока с этим id уже не существует
Такой сценарий редкий при ручном использовании классов и малом количестве потоков данных. Я его поймал, когда мультиплексировал соединения с браузера (коих много, они постоянно открываются и закрываются) через туннель до прокси-сервера.
Пока шло сообщение о закрытии потока до сервера, на нём самом могло произойти закрытие/обрыв соединения. Учитывая, что потоков было много, прокся постоянно работает, то рано или поздно произойдет такая ошибка.
На уровне кода - здесь, при обращении в мапу по id мы ничего не найдём - поток уже закрылся сам.

_handleMessage(chunk) {
...
case Mux.MESSAGE_TYPE_STREAM_CLOSE: {
const streamId = message.id;
const channelStream = this.streams.get(streamId);
// ошибка null reference - channelStream не найден, т.к. был уже ранее удалён
channelStream._closedByOtherSide = true;
В простейшем случае мы можем добавить проверки на существование channelStream, что не совсем надёжно, можно же маркировать закрытые потоки, проверяя тем самым, что получить сообщение о закрытии уже закрытый поток - это не ошибка.
_handleMessage(chunk) {
...
case Mux.MESSAGE_TYPE_STREAM_CLOSE: {
const streamId = message.id;
const channelStream = this.streams.get(streamId);
if (channelStream) {
channelStream._closedByOtherSide = true;
...
Эту коварную ошибку-ситуацию я обнаружил, когда стал одновременно открывать потоки данных на разных сторонах. Сценарии:
Открываем поток данных на первой стороне. Создаём его внутри класса Mux, ему будет приписан id = 0, записываем в this.streams с ключом 0.
Получаем на второй стороне сообщение Mux.MESSAGE_TYPE_STREAM_OPEN c id = 0, создаём поток, передаём ему id = 0, записываем в this.streams с ключом 0.
Открываем поток данных на второй стороне - this.streamId равнялся 0, поэтому будет создан новый поток с id = 0, хотя в this.streams уже есть ключ 0.
Либо же, пока сообщение Mux.MESSAGE_TYPE_STREAM_OPEN шло с первой стороны, на второй успели создать поток с таким же id = 0.
Решений этой задачи - много. Можно начинать отсчитывать id не с 0, а с некоторого номера, случайного. Тогда на первой стороне, условно, номера потоков начинаются с 15223, а на второй стороне с 98833. Это не совсем надёжно, все же когда-то случится перекрытие номеров.
Полагаю, самый лучший с точки зрения алгоритма - использовать уникальный id для каждого нового потока - например, тот же UUIDv4, который можно сгенерировать путём crypto.randomUUID().
Жертвуем мы - памятью, UUIDv4 занимает 16 байт памяти, против того же int32 c 4 байтами памяти, где 2^32 номеров хватит за глаза.
Я предлагаю генерировать "уникальный" id как двубайтное число для класса Mux и добавлять его в id потока данных, чтобы айдишник выглядел как [id мультиплексора 2 байта]:[id потока - порядковое число от 0].
Вероятность совпадения айдишников сторон - 1/2^16, я считаю это достаточным маловероятным событием.
class Mux extends stream.Duplex {
...
constructor() {
super();
this.id = Math.floor(Math.random() * 2 ** 16);
...
}
...
openStream() {
const streamId = `${this.id}:${this.streamId++}`;
...
}
Если сэмулировать сценарий ошибки то получим примерно следующий вывод:
mux1: stream 13940:0 opened // создание потока данных на первой стороне
mux2: stream 39288:0 opened // создание потока данных на второй стороне
mux2: stream 13940:0 opened // приём сообщения о создании потока с первой стороны
mux1: stream 39288:0 opened // прием сообщения о создании потока со второй стороны
mux2: stream 13940:0 data: PING FROM mux1 13940:0
mux1: stream 13940:0 data: PONG FROM mux2 13940:0
mux1: stream 39288:0 data: PING FROM mux2 39288:0
mux2: stream 39288:0 data: PONG FROM mux1 39288:0
mux2: stream 13940:0 closed
mux1: stream 39288:0 closed
mux1: stream 13940:0 closed
mux2: stream 39288:0 closed
Идеал недостижим, как известно, и код можно полировать и допиливать вечность. На данный момент мультиплексирование работает, при адекватных действиях юзера, алгоритмы разобраны.
Помимо сообщений о создании, закрытии потока данных, и передачи самих данных, можно добавить различные служебные сообщения, рукопожатии с проверками версий, сообщения об ошибках потоков.
Так же, помимо "спокойного закрытия" потока данных методом end() можно так же вызывать метод "аварийного закрытия" с передачей ошибки destroy().
Весь этот и другой функционал можно (и для полноценной работы, даже нужно) добавить в класс Mux. Оставим это любопытному читателю)
В классах нет очень важного момента, при программировании библиотек, да и вообще - обработки ошибок и всяческих исключительных ситуаций.
Ошибки с одинаковыми id потоков данных и закрытием потока, который уже закрыт, можно было предусмотреть заранее, если более щепетильно отнестись к коду, где происходит обращения по ключу в мапу: данных может и не быть.
Так же сами потоки могут выкидывать ошибки (событие error).
Хороший код всегда содержит в себе грамотно построенную обработку ошибок.
import stream from "node:stream";
import msgpack from "msgpack5";
const packer = msgpack();
class MuxChannelStream extends stream.Duplex {
constructor(mux, id) {
super();
this.mux = mux;
this.id = id;
}
_read(size) {
}
_write(chunk, encoding, callback) {
this.mux._writeMessage(Mux.MESSAGE_TYPE_STREAM_DATA, { id: this.id, data: chunk });
callback();
}
}
export default class Mux extends stream.Duplex {
static MESSAGE_TYPE_STREAM_OPEN = 0;
static MESSAGE_TYPE_STREAM_CLOSE = 1;
static MESSAGE_TYPE_STREAM_DATA = 2;
static STATE_READ_MESSAGE_SIZE = 0;
static STATE_READ_MESSAGE_DATA = 1;
constructor() {
super();
this.id = Math.floor(Math.random() * 2 ** 16);
this.streamId = 0; // текущий номер для нового потока
this.streams = new Map();
this.state = Mux.STATE_READ_MESSAGE_SIZE;
this.sizeToRead = 4;
this.chunks = [];
this.chunksTotalSize = 0;
}
_read(size) {
}
_write(chunk, encoding, callback) {
this.chunks.push(chunk);
this.chunksTotalSize += chunk.length;
this._processState();
callback();
}
openStream() {
const streamId = `${this.id}:${this.streamId++}`;
return this._createChannelStream(streamId, {
onCreated: channelStream => {
this._writeMessage(Mux.MESSAGE_TYPE_STREAM_OPEN, { id: channelStream.id });
},
onFinished: channelStream => {
this._writeMessage(Mux.MESSAGE_TYPE_STREAM_CLOSE, { id: channelStream.id });
}
});
}
_createChannelStream(streamId, { onCreated, onFinished }) {
const channelStream = new MuxChannelStream(this, streamId);
this.streams.set(channelStream.id, channelStream);
if (onCreated) onCreated(channelStream);
channelStream
.on("finish", () => {
if (channelStream._closedByOtherSide) return;
this.streams.delete(channelStream.id);
if (onFinished) onFinished(channelStream);
});
return channelStream;
}
_writeMessage(type, data = {}) {
const message = { type, ...data };
const messageBuffer = packer.encode(message);
const messageLengthBuffer = Buffer.allocUnsafe(4);
messageLengthBuffer.writeInt32BE(messageBuffer.length);
this.push(messageLengthBuffer);
this.push(messageBuffer);
}
_processState() {
if (this.chunksTotalSize < this.sizeToRead) return;
let chunkIndex = 0;
let chunksSize = 0;
let chunksToConcat = [];
while (chunksSize < this.sizeToRead) {
chunksSize += this.chunks[chunkIndex].length;
chunkIndex++;
}
// вырезаем первые chunkIndex чанков из массива чанков, их мы будем объединять
chunksToConcat = this.chunks.splice(0, chunkIndex);
if (chunksSize > this.sizeToRead) {
// перебрали с длиной - последний чанк в chunksToConcat нужно разрезать
const chunkToSlice = chunksToConcat.pop();
const sliceLength = chunksSize - this.sizeToRead;
const firstPart = chunkToSlice.slice(0, sliceLength);
const secondPart = chunkToSlice.slice(sliceLength);
// первая часть идёт в chunksToConcat в конец
chunksToConcat.push(firstPart);
// вторая остаётся в chunks в начале (просто заменяем разрезанный chunk на его 2ю часть)
this.chunks.unshift(secondPart);
}
const chunk = Buffer.concat(chunksToConcat);
// обновляем this.chunksTotalSize
this.chunksTotalSize -= chunk.length;
switch (this.state) {
case Mux.STATE_READ_MESSAGE_SIZE:
// читаем длину сообщения - столько байт нам нужно будет дождаться
this.sizeToRead = chunk.readInt32BE(0);
// меняем стейт на чтение сообщения
this.state = Mux.STATE_READ_MESSAGE_DATA;
break;
case Mux.STATE_READ_MESSAGE_DATA:
// обрабатываем сообщение
this._handleMessage(chunk);
// меняем стейт снова на чтение длины сообщения
this.sizeToRead = 4;
this.state = Mux.STATE_READ_MESSAGE_SIZE;
break;
}
// если есть ещё чанки - запустим снова обработку стейта
if (this.chunksTotalSize > 0) this._processState();
}
_handleMessage(chunk) {
const message = packer.decode(chunk);
switch (message.type) {
case Mux.MESSAGE_TYPE_STREAM_OPEN: {
const streamId = message.id;
this._createChannelStream(streamId, {
onCreated: channelStream => {
this.emit("stream", channelStream);
},
onFinished: channelStream => {
this._writeMessage(Mux.MESSAGE_TYPE_STREAM_CLOSE, { id: channelStream.id });
}
});
break;
}
case Mux.MESSAGE_TYPE_STREAM_CLOSE: {
const streamId = message.id;
const channelStream = this.streams.get(streamId);
channelStream._closedByOtherSide = true;
channelStream.end();
this.streams.delete(streamId);
break;
}
case Mux.MESSAGE_TYPE_STREAM_DATA: {
const streamId = message.id;
const chunk = message.data;
const channelStream = this.streams.get(streamId);
channelStream.push(chunk);
break;
}
}
}
}
import mux from "./Mux.js";
function createMux(name) {
const muxStream = new mux();
muxStream
.on("stream", stream => {
console.log(`${name}: stream ${stream.id} opened`);
stream
.on("data", data => {
console.log(`${name}: stream ${stream.id} data: ${data}`);
stream.write(`PONG FROM ${name} ${stream.id}`);
stream.end();
})
.on("finish", () => {
console.log(`${name}: stream ${stream.id} closed`);
});
});
return muxStream;
}
function testMux(muxStream, name) {
for (let i = 0; i < 1; i++) {
const stream = muxStream.openStream();
console.log(`${name}: stream ${stream.id} opened`);
stream
.on("data", data => console.log(`${name}: stream ${stream.id} data: ${data}`))
.on("finish", () => console.log(`${name}: stream ${stream.id} closed`));
stream.write(`PING FROM ${name} ${stream.id}`);
}
}
const mux1 = createMux("mux1");
const mux2 = createMux("mux2");
mux1
.pipe(mux2)
.pipe(mux1);
testMux(mux1, "mux1");
testMux(mux2, "mux2");
Код моего мультиплексора данных (как раз с доработками) доступен в npm и на GitHub.
Мы разобрали задачу мультиплексирования потоков данных и запрограммировали простой класс Mux на языке JavaScript.
В реализации присутствовали всего 3 вида сообщений: открытие потока данных, закрытие потока данных и передача части самих данных.
Рассмотрели пошагово реализацию класса Mux, логику отправки и приёма сообщений, разобрали некоторые исключительные ситуации, приводимые к ошибкам. Рассмотрели возможные расширения логики класса Mux.
Разобрались с уникальными id для потоков данных, написали простенькую стейт-машину для считывания входящих сообщений, изучили внутренние методы и устойство потоков данных Node.js Streams.
Задача мультиплексирования - весьма обширная. Она находит своё место в телекоммуникациях, программировании (операционные системы, процессы, и всё, где есть передача информации и ограничения), разделы физики, имеющие дело со связью.
Я очень рад, что реализую свой потенциал в преподавании, и сегодня - в виде статьи!
Удачи в программировании!