Ментальная модель потоков в Node.js
- четверг, 21 ноября 2024 г. в 00:00:02
Приходилось ли вам работать с потоками в Node.js?
Когда я впервые столкнулся с потоками, я, мягко говоря, оказался в растерянности. Эта концепция была совершенно новой для меня. Я полагал, что смогу обойтись и без них, но вскоре понял, что в Node.js они повсюду. Даже такие ключевые модули, как fs
и http
, используют потоки "под капотом". Поэтому мне пришлось погрузиться в изучение этой темы и разобраться в том, как потоки работают.
В этом мне особенно помогло создание устойчивой ментальной модели, основанной на нескольких ключевых концепциях. В этой статье мы подробно рассмотрим эти концепции и сформируем ментальную модель потоков в Node.js.
Главная идея потоков заключается в том, что они переносят части данных из одного места в другое. На основе этого определения можно выделить четыре важных аспекта:
Одной из распространенных аналогий для объяснения потоков является труба. Тем не менее, такая аналогия часто не учитывает два основных элемента потока: производителя (producer) и потребителя (consumer). Рассмотрим эту аналогию более подробно.
Представьте большой водоем, рядом с которым находится ваш дом. Чтобы обеспечить водоснабжение дома, нужно проложить трубу от водоема к дому.
Я не водопроводчик, так что не воспринимайте этот рисунок слишком буквально.
Эта аналогия иллюстрирует три ключевых элемента потока:
Теперь вернемся к потокам в Node.js. Сопоставим аналогию трубы с тем, как ведут себя потоки:
Довольно похоже на потоки в Node.js, не так ли?
Прежде чем погружаться в детали того, что такое потоки и как они работают, выясним, в каких случаях они применяются.
Потоки отлично подходят для работы с данными, которые поступают частями или генерируются постепенно. Они особенно эффективны, когда данные создаются последовательно или принимаются порциями.
Хорошим примером такого подхода является протокол WebSocket. Этот протокол позволяет устанавливать двустороннюю связь между клиентом и сервером.
В следующих статьях мы более подробно рассмотрим данный протокол, используя библиотеку WS в качестве примера, так как она активно применяет механизмы потоков. Вот пример, где абстракция под названием Sender
реализует механизм обратного давления (backpressure).
Об обратном давлении мы поговорим в следующем разделе. И это лишь один из множества возможных примеров. Рекомендую вам изучить библиотеку и ознакомиться с другими сценариями.
Каждый раз при создании сервера с помощью API Node.js мы фактически создаем дуплексный (duplex) поток. HTTP-модуль в Node.js использует абстракцию под названием Socket
для установления соединения с сетевым сокетом. Эта абстракция Socket
является расширением потока Duplex
.
ObjectSetPrototypeOf(Socket.prototype, stream.Duplex.prototype);
ObjectSetPrototypeOf(Socket, stream.Duplex);
Когда мы встречаем подобную конструкцию:
import { createServer } from 'http';
const server = createServer();
Следует помнить, что на уровне реализации мы создаем дуплексный поток.
Представьте, что у нас есть файл размером 100 ГБ, и нам необходимо его разобрать и обработать определенные данные. Как бы вы поступили в этой ситуации?
Если попробовать прочитать файл с помощью таких методов, как readFileSync
или readFile
, программа даст сбой и выйдет из строя:
import { readFileSync, readFile } from 'fs';
const largeFilePath = 'path/to/large/file.txt';
// Оба метода приводят к сбою программы
const data = readFileSync(largeFilePath);
const asyncData = await readFile(largeFilePath);
Проблема в том, что загрузка всего содержимого файла в память с помощью этих методов, абсолютно неэффективна. Вместо этого, лучше обрабатывать файл по частям, что позволит значительно снизить нагрузку на систему:
import { createReadStream } from 'fs';
const largeFilePath = 'path/to/large/file.txt';
const stream = createReadStream(largeFilePath);
stream.on('data', (chunk) => {
// Обрабатываем часть данных
});
Применяя такой подход, нам не нужно ждать полной загрузки файла в память. Как только очередной фрагмент данных готов, мы приступаем к его обработке.
Все предыдущие примеры касались ситуаций, когда мы либо считываем данные из источника, либо записываем их куда-то. Однако потоки также могут использоваться для преобразования данных, уже находящихся в памяти.
Наиболее наглядным примером является сжатие и распаковка данных. Рассмотрим пример из документации модуля zlib:
async function do_gzip(input, output) {
const gzip = createGzip();
// Создаем поток чтения (read stream) данных из входного источника
const source = createReadStream(input);
// Создаем поток записи (write stream) данных в выходной источник
const destination = createWriteStream(output);
// Соединяем входящий поток с потоком gzip,
// затем направляем его в выходной поток
await pipe(source, gzip, destination); }
}
В этом фрагменте кода мы создаем поток чтения, и каждый раз, когда поступают данные из этого потока, они передаются в поток gzip для сжатия. После сжатия данные направляются в поток записи.
На этом этапе не обязательно полностью разбираться в том, как работает код, важно лишь понимать, что потоки могут использоваться для преобразования различных типов данных.
Если данные, с которыми вы работаете, уже находятся в памяти, применение потоков не принесет ощутимых преимуществ.
Поэтому старайтесь избегать использования потоков, когда все необходимые данные уже загружены в память. Не усложняйте себе жизнь.
Теперь, когда мы разобрались, что такое потоки, когда их следует использовать, а когда избегать, пришло время более подробно изучить ключевые аспекты работы с потоками в Node.js.
Мы помним, что работа потоков аналогична работе труб. Но что именно позволяет им функционировать подобным образом? Все дело в событийно-ориентированных (event-driven) концепциях, на которых основаны потоки. Если быть точнее, все потоки в Node.js являются расширениями класса EventEmitter
.
Принцип работы класса EventEmitter
очень прост. Он содержит внутреннее состояние, в котором хранятся все события и их обработчики.
class EventEmitter {
// Система событий и их обработчиков.
// Одно событие может иметь несколько обработчиков
#events = new Map<string, (() => void)[]>();
// Регистрируем новый обработчик для события
on(eventName: string, callback: () => void) {
if (!this.#events.has(eventName)) {
this.#events.set(eventName, [callback]);
}
this.#events.get(eventName).push(callback);
}
// Вызываются все обработчики, связанные с событием
emit(eventName: string) {
const listeners = this.#events.get(eventName);
if (!listeners) {
return;
}
listeners.forEach((listener) => listener());
}
}
Это сильно упрощенная версия, но она даёт общее представление о том, как работает EventEmitter
. Полную реализацию можно найти в исходном коде Node.js.
Работая с потоками, можно добавить обработчик для заранее определенного набора событий.
stream.on('data', () => {});
В этом примере мы добавляем обработчик к событию data
. Каждый раз, когда готова новая порция данных, поток вызывает метод emit
с названием события data
, в результате чего срабатывают все зарегистрированные обработчики.
Именно этот механизм позволяет потокам работать подобно трубам, обеспечивая передачу данных из одного конца в другой.
Потоки позволяют эффективно обрабатывать большие наборы данных. Однако есть один нюанс: что произойдет, если скорость поступления данных окажется такой высокой, что в какой-то момент объем обрабатываемой информации превысит пределы доступной памяти? В таком случае программа даст сбой.
Это означает, что простой абстракции потока недостаточно для предотвращения подобных ситуаций. Для этого в потоках предусмотрен механизм обратного давления.
Хотя термин "обратное давление" может показаться сложным, его суть довольно проста. Основная идея заключается в том, что существует определенный лимит на количество данных, которые можно обработать за один раз.
Вернемся к примеру чтения больших файлов. Нас интересуют две основные составляющие этого процесса: производитель данных и потребитель данных. Производителем данных выступает механизм операционной системы, который считывает файл и генерирует данные.
Если производитель начинает передавать слишком много данных, поток может сигнализировать ему о необходимости замедлиться, так как в данный момент он не может принять больше данных. Но как поток понимает, что места для хранения информации больше нет?
У каждого потока есть внутренний буфер (buffer), и в момент, когда новые данные поступают, а старые выходят, активируется механизм "буферизации" (buffering).
У каждого потока есть внутренний буфер. При работе с API, который поддерживает механизм обратного давления, этот буфер используется для хранения данных, поступающих в поток.
Если данные поступают в поток, но не выводятся из него, буфер постепенно заполняется до тех пор, пока не достигнет установленного предела. Этот предел задается свойством highWaterMark
, определенным для каждого потока.
Пример того, как можно задать значение свойства highWaterMark
при чтении файла:
import { createReadStream } from 'node:fs';
const filePath = 'path/to/file.txt';
const writeStream = createReadStream(filePath, { highWaterMark: 1024 });
По умолчанию значение highWaterMark
для функции createReadStream
составляет 64 КБ. Когда во внутреннем буфере освобождается место, поток может начать считывание дополнительных данных из источника.
В приложениях Node.js часто возникает необходимость преобразовывать данные, поступающие из потока, или передавать их в другое место. В таких случаях особенно удобна концепция, известная как конвейер (pipeline).
Можно создать цепочку потоков, где один поток подключен к другому. В таком случае, когда данные попадают в первый поток, они проходят через всю цепь связанных потоков. Если вы знакомы с реактивным программированием и такими библиотеками, как RxJS, то эта концепция не будет для вас новой.
import { createReadStream, createWriteStream } from 'node:fs';
import { createGzip } from 'node:zlib';
import { pipeline } from 'node:stream';
const source = createReadStream('path/to/file.txt');
const destination = createWriteStream('path/to/file.txt.gz');
const gzip = createGzip();
await pipeline(source, gzip, destination);
В этом примере входящий поток активирует всю цепочку обработки данных. Процесс осуществляется следующим образом:
source
) поток читает данные из файла.gzip
.gzip
выполняет сжатие данных.destination
) поток.Каждый этап цепочки оснащен имеет собственный внутренний буфер и механизм обратного давления. Это значит, что если поток gzip
не может обработать данные, поступающие от входящего потока, он может дать сигнал первому потоку замедлить скорость передачи. Аналогично ведет себя и выходной поток.
Потоки являются основой любого приложения Node.js, независимо от того, используете вы их напрямую или нет. Это одна из самых мощных возможностей, доступных в Node.js. Потоки находят широкое применение в самых разных сферах — от сетевых взаимодействий до обработки файлов.
Они особенно полезны, когда необходимо работать с большими объемами данных или в режиме реального времени. Основные принципы работы потоков основаны на следующих концепциях:
Понимание этих концепций и четкое представление о том, как функционируют потоки на концептуальном уровне, позволят вам создавать более эффективные приложения на Node.js.
Новости, обзоры продуктов и конкурсы от команды Timeweb.Cloud — в нашем Telegram-канале ↩