Как в Node.js контролировать потребление памяти при обработке сетевых запросов
- среда, 13 марта 2024 г. в 00:00:12
Всем привет! Я Виктор Кугай, руководитель команды разработки спецпроектов в Тинькофф. Мы создаем геймификационные проекты, основанные на данных, чтобы познакомить пользователей с экосистемой компании и повысить узнаваемость бренда.
Расскажу, как с помощью Node.js Streams и механизма Back Pressure протокола TCP реализовать пакетную обработку сотен гигабайтов данных на машинах с жестким лимитом памяти.
Спецпроекты помогают пользователям в игровой форме вовлечься в использование экосистемы компании. Например, пользователь переводит в благотворительный фонд X рублей в месяц или выполняет какое-то другое действие в приложении Тинькофф и получает за это экстрабонусы.
Для долгосрочных спецпроектов отлично подходит потоковая обработка данных с помощью Apache Kafka. Например, для реализации игровых механик наша команда обрабатывает более 150 000 000 событий в сутки суммарным объемом свыше 250 ГБ.
Для краткосрочных спецпроектов — сроком жизни несколько месяцев — создание новой потоковой интеграции сопряжено с рисками. Например, настройка интеграции потребует больше ресурсов и времени, чем мы ожидали, и мы не успеем подготовить надежную интеграцию к старту проекта.
В этом случае на помощь приходит сценарий пакетной обработки данных:
Платформа для совместной обработки данных Helicopter по расписанию инициирует агрегацию данных в Data Warehouse.
Процесс отправляет CSV-файл с составным содержимым формата multipart/form-data по HTTP.
Node.js-сервис валидирует входящие данные и сохраняет в PostgreSQL базу данных спецпроекта.
Мы используем формат multipart/form-data для передачи составных данных по сети, потому что во многих языках программирования есть его поддержка, а реализация не меняется в зависимости от языка.
Например, в JavaScript для загрузки составных данных используется объект FormData. В браузере отправить составные данные по сети можно с помощью объекта XMLHttpRequest и Fetch API, а в Node.js — с помощью модуля http/https, который используется в библиотеке axios, или модуля net в библиотеке undici.
const body = new FormData();
body.append('file', file);
fetch(url, {method: 'POST', body});
В python поддержка multipart/form-data встроена в модуль requests и дополнительных преобразований не требуется, поэтому multipart/form-data удобно использовать при написании скриптов переливки данных.
import requests
requests.post(
url,
files={'file', file}
)
Загружать составные данные несложно, но что произойдет, если игнорировать контроль потока при обработке данных? Потребление памяти превысит лимиты, и приложение упадет вот с такой ошибкой OOMKilled:
$ docker inspect --format='{{json .State}}' <container>
{
"Status": "exited",
"Running": false,
"Paused": false,
"Restarting": false,
"OOMKilled": true,
"Dead": false,
"Pid": 0,
"ExitCode": 137,
"Error": "",
"StartedAt": "2024-03-11T15:30:38.222462386Z",
"FinishedAt": "2024-03-11T15:31:24.086041716Z"
}
Пример ручного контроля потока с помощью подписки на событие data:
const HIGH_WATER_MARK = 1_048_576; // ~1MB
let buffer = Buffer.from([]);
readable.on('data', (data) => {
buffer = Buffer.concat([buffer, data]);
if (buffer.byteLength > HIGH_WATER_MARK) {
readable.pause();
// ...
// === Обработать данные ===
// ...
buffer = Buffer.from([]);
readable.resume();
}
});
В Node.js v10 добавили поддержку асинхронной обработки данных с помощью цикла for await … of:
for await (const data of readable) {
// ...
// === Обработать данные ===
// ...
}
Верхнеуровневый алгоритм обработки данных с помощью цикла for await … of такой:
Пакеты данных аккумулируются в буфере.
Общий размер данных в буфере достигает лимита highWaterMark.
Потребление данных приостанавливается.
Все непрочитанные пакеты объединяются.
Данные поступают в цикл for await … of.
Данные обрабатываются в теле цикла.
Буфер освобождается.
Потребление данных продолжается.
В случае с асинхронными итераторами данные сначала накапливаются в буфере до порогового значения highWaterMark. Затем потребление данных из потока приостанавливается, и только после завершения обработки оно продолжается. Ручное управление потоком с помощью readable.pause() и readable.resume() игнорируется.
В случае с подпиской на событие data за факт обработки данных принимается вызов функции-подписчика, поэтому полезная работа выполняется конкурентно, что может привести к переполнению памяти. Для контроля переполнения памяти нужно вручную ставить обработку данных на паузу с помощью readable.pause() и readable.resume().
Параметр highWaterMark конфигурирует пороговое значение буфера для всех типов стримов в Node.js. Для Duplex- и Transform-стримов предусмотрено два независимых буфера. Значение highWaterMark не является строгим лимитом памяти и носит рекомендательный характер, то есть реальная утилизация памяти буфером может существенно превышать значение highWaterMark.
Параметр highWaterMark имеет два режима конфигурации: в байтах и в символах.
В байтах параметр конфигурируется по умолчанию, а в символах — при явном указании кодировки стрима с помощью stream.setEncoding(...). Один символ обычно занимает один байт (latin1, ascii), но может занимать несколько байт, поэтому конфигурация highWaterMark в символах не эквивалентна конфигурации в байтах.
В документации Node.js конфигурация setEncodingупоминается в самом последнем абзаце — его легко пропустить.
При подписке на событие data обработчик вызывается каждый раз при чтении следующего TCP-пакета. Параметр highWaterMark не влияет на размер TCP-пакета, поэтому использовать подписку на событие data не очень удобно. Во-первых, необходимо вручную контролировать поток с помощью readable.pause() и readable.resume(). Во-вторых, данные обрабатываются слишком часто. Например, пакетами по 64 килобайта, потому что это максимальный размер пакета в протоколе IPv4.
Обработка данных с помощью асинхронных итераторов буферизует TCP-пакеты до порогового значения highWaterMark. Затем пакеты объединяются в один чанк и отправляются в тело цикла. Размер чанка в цикле зависит от значения highWaterMark, поэтому можно управлять размерами чанков более гибко. Важно помнить, что highWaterMark — это не жесткий лимит, а всего лишь пороговое значение, которое может быть превышено или проигнорировано.
Node.js использует возможности библиотеки libuv для контроля сетевых соединений, которая с помощью механизма окон протокола TCP регулирует пропускную способность потока.
Как только размер данных в буфере превышает пороговое значение highWaterMark, библиотека libuv уменьшает размер TCP-окна до нуля и тем самым сигнализирует, что отправку данных нужно приостановить.
Параметры TCP-сессии, которые имеют ключевое значение для управления потоком:
Max Segment Size — максимальный размер сегмента для передачи по сети. Нужен, чтобы отправитель и получатель не передавали слишком большие пакеты друг другу.
Window Scale — коэффициент для расчета актуального размера окна. Нужен, чтобы экспоненциально увеличивать размер окна.
Window — размер окна. Нужен, чтобы подсказывать отправителю количество данных, которые потребитель может обработать.
Верхнеуровневый алгоритм управления потоком с помощью окон в протоколе TCP выглядит так:
Клиент и сервер обмениваются параметрами для установки TCP-соединения.
Клиент передает на сервер пакеты размером не больше Max Segment Size.
Сервер буферизует пакеты, подтверждает их получение и уведомляет об уменьшении окна.
Клиент приостанавливает отправку пакетов, когда окно становится недостаточным для отправки следующего пакета.
Сервер обрабатывает пакеты из буфера и уведомляет клиента об увеличении окна.
Клиент продолжает отправку данных.
Если клиент игнорирует размер окна и продолжает отправлять пакеты, лишние пакеты будут отброшены.
Рассмотрим пример отправки потоковых данных на сервер с контролем переполнения памяти и использованием утилиты tcpdump для логирования TCP-пакетов. Будем использовать только встроенные в Node.js модули node:stream и node:http, чтобы исключить влияние фреймворков на Readable-стрим.
1. Реализуем исходящий поток данных. Создаем Readable Stream, который генерирует данные чанками по 5 МБ, и отправляем с помощью функции request из модуля node:http.
/* client.js */
const {request} = require('node:http');
const {Readable} = require('node:stream');
const STREAM_SIZE_IN_BYTES = 104_857_600; // ~100MB
const STREAM_CHUNK_SIZE_IN_BYTES = 5_242_880; // ~5MB
main().catch((error) => {
console.log(error);
process.exit(1);
});
async function main() {
await new Promise((resolve) => {
let counter = 0;
const readableStream = new Readable({
read() {
if (counter < STREAM_SIZE_IN_BYTES) {
this.push(Buffer.alloc(STREAM_SIZE_IN_BYTES));
counter += STREAM_CHUNK_SIZE_IN_BYTES;
} else {
this.push(null);
}
},
});
const options = {
path: '/',
method: 'POST',
protocol: 'http:',
hostname: 'localhost',
port: '3000',
};
const req = request(options, (res) => {
res.on('end', () => {
resolve(null);
});
});
readableStream.pipe(req);
});
}
2. Реализуем обработку входящего потока. HighWaterMark указываем 1 МБ, чтобы убедиться, что управление потоком работает. Каждая итерация цикла использует задержку, чтобы имитировать сложную работу, которая выполняется в теле цикла.
/* server.js */
const {createServer} = require('node:http');
const {PassThrough} = require('node:stream');
const HIGH_WATER_MARK = 1_048_576; // ~1MB
const MAX_DELAY_IN_MS = 5_000;
const MIN_DELAY_IN_MS = 3_000;
main().catch((error) => {
console.log(error);
});
async function main() {
await new Promise((resolve) => {
const server = createServer(async (req) => {
const passThrough = new PassThrough({
highWaterMark: HIGH_WATER_MARK,
});
req.pipe(passThrough);
for await (const chunk of passThrough) {
console.log(`Обработано ${chunk.toString().length} байт`);
const randomTimeoutMs =
Math.random() * (MAX_DELAY_IN_MS - MIN_DELAY_IN_MS) + MIN_DELAY_IN_MS;
await new Promise((resolve) => setTimeout(resolve, randomTimeoutMs));
}
});
server.listen(3000, () => {
console.log(`Сервер успешно запущен`);
resolve(null);
});
});
}
3. Собираем Docker-контейнер и конфигурируем ресурсы в файле Docker Compose для лимитирования памяти. Лимит памяти ставим 50 МБ, из которых ~20 МБ закрепляются за Node.js.
FROM node:18.16-alpine3.18
COPY build ./build
EXPOSE 3000
CMD ["node", "./build/server.js"]
version: '3'
name: node-streams
services:
server:
build:
dockerfile: ./Dockerfile
ports:
- 3000:3000
deploy:
resources:
limits:
cpus: '0.5'
memory: 50M
reservations:
cpus: '0.5'
memory: 50M
4. Запускаем контейнер и отправляем данные. Видим, что сервер обрабатывает чанки размером примерно 1 МБ. Так происходит, потому что highWaterMark не лимит, а всего лишь пороговое значение.
node-streams-server-1 | Сервер успешно запущен
node-streams-server-1 | Обработано 32671 байт
node-streams-server-1 | Обработано 1081344 байт
node-streams-server-1 | Обработано 1114112 байт
node-streams-server-1 | Обработано 1114112 байт
node-streams-server-1 | Обработано 1114112 байт
5. Запускаем утилиту tcpdump, чтобы подробно отследить процесс управления потоком на уровне протокола TCP. Параметр -i — это интерфейс localhost, port — порт, который слушает утилита.
$ sudo tcpdump -i lo0 port 3000
6. Запускаем пример повторно. Сначала стороны обмениваются параметрами для установки TCP-сессии. Max Message Size (mss) — 16,324 байта, коэффициент Window Scale (wscale) — 6.
15:47:10.004647 IP localhost.54892 > localhost.hbci: Flags [S], seq 2664759488, win 65535, options [mss 16344,nop,wscale 6,nop,nop,TS val 792746651 ecr 0,sackOK,eol], length 0
15:47:10.005743 IP localhost.hbci > localhost.54892: Flags [S.], seq 4116912290, ack 2664759489, win 65535, options [mss 16344,nop,wscale 6,nop,nop,TS val 840270532 ecr 792746651,sackOK,eol], length 0
7. Клиент отправляет данные пакетами по 16332 байта, а получатель подтверждает получение пакетов. Номер последовательности (seq) указывает на диапазон данных, например 1:16313 и далее 16313:32625. Получатель подтверждает, что пакеты получены, например ack: 16313. Размер окна при этом постоянно уменьшается.
15:47:10.006046 IP localhost.54892 > localhost.hbci: Flags [.], seq 1:16333, ack 1, win 6379, options [nop,nop,TS val 792746653 ecr 840270534], length 16332
15:47:10.006047 IP localhost.54892 > localhost.hbci: Flags [.], seq 16333:32665, ack 1, win 6379, options [nop,nop,TS val 792746653 ecr 840270534], length 16332
15:47:10.006087 IP localhost.hbci > localhost.54892: Flags [.], ack 16333, win 6124, options [nop,nop,TS val 840270534 ecr 792746653], length 0
15:47:10.006099 IP localhost.hbci > localhost.54892: Flags [.], ack 32665, win 5869, options [nop,nop,TS val 840270534 ecr 792746653], length 0
8. Так продолжается, пока размер окна не становится слишком маленьким для отправки следующего пакета (win: 255). Отправитель приостанавливает отправку и ждет, когда размер окна восстановится. Значение параметра win — это не значение в байтах. Чуть ниже расскажу, как рассчитать актуальный размер окна.
15:47:10.031553 IP localhost.hbci > localhost.54892: Flags [.], ack 1159573, win 510, options [nop,nop,TS val 840270559 ecr 792746678], length 0
15:47:10.031567 IP localhost.hbci > localhost.54892: Flags [.], ack 1175905, win 255, options [nop,nop,TS val 840270559 ecr 792746678], length 0
9. Через три секунды приходит подтверждение, что размер окна восстановлен и можно продолжить отправку данных. Отправка продолжается.
15:47:10.031567 IP localhost.hbci > localhost.54892: Flags [.], ack 1175905, win 255, options [nop,nop,TS val 840270559 ecr 792746678], length 0
15:47:13.033733 IP localhost.hbci > localhost.54892: Flags [.], ack 1175905, win 4338, options [nop,nop,TS val 840270561 ecr 792746678], length 0
15:47:13.033850 IP localhost.54892 > localhost.hbci: Flags [.], seq 1175905:1192237, ack 1, win 6379, options [nop,nop,TS val 792746680 ecr 840270561], length 16332
15:47:13.033853 IP localhost.54892 > localhost.hbci: Flags [.], seq 1192237:1208569, ack 1, win 6379, options [nop,nop,TS val 792746680 ecr 840270561], length 16332
Чтобы получить размер актуального окна, нужно к значению параметра win применить формулу расчета актуального размера окна. Когда создавали протокол TCP, разработчики решили, что размер окна не должен превышать 65,535 байта. Для размера окна выделили параметр Window в заголовке сообщения. Значение параметра не должно превышать 16 бит (2^16 === 65,535).
В конце 90-х стало очевидно, что текущего размера окна недостаточно, поэтому в стандарт TCP добавили атрибут Window Scale Option, чтобы не ломать обратную совместимость протокола. При установке соединения хосты обмениваются параметрами Window Scale и рассчитывают актуальный размер окна согласно формуле Window Size * (2 ^ Window Scale), таким образом максимальный размер окна увеличился с 0,06 МБ (65,535 байта) до ~1 ГБ и не потребовалось менять размер заголовка Window.
В Node.js при обработке большого количества данных, которые передаются по сети, можно и нужно контролировать объем потребляемой памяти. Для этого при обработке Readable Stream необходимо использовать цикл for await … of. Можно использовать подписку на событие data, но придется вручную ставить поток на паузу с помощью readable.pause().
Под капотом Node.js использует механизм Back Pressure, основанный на механизме окон протокола TCP, а библиотека libuv управляет состоянием TCP-соединения. Размер окна зависит от свободного места в буфере. Если размер данных в буфере превысил значение highWaterMark, то потребление данных приостанавливается и продолжается, когда буфер очищается.
В начале я описывал интеграцию для передачи составных данных с помощью HTTP. Благодаря простому и удобному механизму для контроля потока в Node.js наша команда надежно обрабатывает сотни гигабайтов данных на машинах с лимитом памяти в 256 МБ. Главное — использовать цикл for await … of для обработки данных и корректно сконфигурировать highWaterMark в соответствии с выделенными ресурсами.
Если вы повышали надежность обработки данных в Node.js — делитесь своими практиками в комментариях!