Понимание событийной архитектуры Node.js
- среда, 7 июня 2017 г. в 03:15:19
Большинство Node-объектов — вроде HTTP-запросов, ответов и потоков (streams) — реализуют модуль EventEmitter
, благодаря которому они могут генерировать и прослушивать события.
const EventEmitter = require('events')
Простейшая форма управления по событиям — это callback-стиль некоторых популярных Node.js-функций, к примеру fs.readFile
. По этой аналогии событие генерируется однократно (когда Node готов к вызову коллбэка), а коллбэк действует как обработчик события. Давайте сначала разберём эту базовую форму событийно-управляемой архитектуры.
Изначально Node обрабатывал асинхронные события с помощью коллбэков. Это было давно, ещё до того как в JavaScript появилась нативная поддержка промисов и фича async/await. Коллбэки — это просто функции, которые вы передаёте другим функциям. Такое возможно в JavaScript, потому что функции — это объекты первого класса.
Важно понимать, что коллбэки не индикаторы асинхронного вызова в коде. Функция может вызывать коллбэк как синхронно, так и асинхронно. Например, хост-функция fileSize
принимает коллбэк-функцию cb
, причём вызывает её синхронно или асинхронно в зависимости от условия:
function fileSize (fileName, cb) {
if (typeof fileName !== 'string') {
return cb(new TypeError('argument should be string')); // Sync
}
fs.stat(fileName, (err, stats) => {
if (err) { return cb(err); } // Async
cb(null, stats.size); // Async
});
}
Это плохой подход, приводящий к неожиданным ошибкам. Создавайте такие хост-функции, которые принимают коллбэки либо всегда синхронно, либо всегда асинхронно.
Давайте разберём простой пример типичной асинхронной Node-функции, написанной в коллбэк-стиле:
const readFileAsArray = function(file, cb) {
fs.readFile(file, function(err, data) {
if (err) {
return cb(err);
}
const lines = data.toString().trim().split('\n');
cb(null, lines);
});
};
readFileAsArray
берёт путь файла и коллбэк-функцию. Считывает содержимое файла, разбивает на массив строк и вызывает применительно к этому массиву коллбэк-функцию. Вот как это можно использовать. Допустим, файл numbers.txt
лежит в одной директории с таким контентом:
10
11
12
13
14
15
Если у нас есть задача посчитать числа в этом файле, то для упрощения кода можно воспользоваться readFileAsArray
:
readFileAsArray('./numbers.txt', (err, lines) => {
if (err) throw err;
const numbers = lines.map(Number);
const oddNumbers = numbers.filter(n => n%2 === 1);
console.log('Odd numbers count:', oddNumbers.length);
});
Этот код читает в массиве строк числовой контент, парсит его как числа и выполняет подсчёт.
Здесь работает характерный для Node коллбэк-стиль. У коллбэка есть error-first-аргумент err
, который может принимать значение null. Мы передаём этот коллбэк в качестве последнего аргумента хост-функции. Всегда делайте так в своих функциях, потому что пользователи наверняка будут на это рассчитывать. Пусть ваша хост-функция получает коллбэк в виде последнего аргумента, и пусть коллбэк ожидает в качестве своего первого аргумента error-объект.
В современном JavaScript есть такие объекты, как промисы. Они могут быть альтернативой коллбэкам в случае асинхронных API. Вместо передачи коллбэка в качестве аргумента и обработки ошибки в том же месте промис позволяет отдельно обрабатывать успешные и ошибочные ситуации, а также соединять несколько асинхронных вызовов в цепочки, а не делать их вложенными.
Если функция readFileAsArray
поддерживает промисы, то мы можем использовать её следующим образом:
readFileAsArray('./numbers.txt')
.then(lines => {
const numbers = lines.map(Number);
const oddNumbers = numbers.filter(n => n%2 === 1);
console.log('Odd numbers count:', oddNumbers.length);
})
.catch(console.error);
Вместо передачи коллбэка мы вызываем функцию .then
применительно к возвращаемому значению хост-функции. Обычно .then
даёт нам доступ к тем же строкам массива, которые мы получаем в коллбэк-версии, поэтому можем работать как раньше. Для обработки ошибок добавим вызов .catch
применительно к результату, что обеспечит нам доступ к ошибке, если она возникнет.
Благодаря новому объекту Promise в современном JavaScript стало легче реализовать поддержку промис-интерфейса хост-функцией. Вот функция readFileAsArray
, модифицированная так, чтобы она поддерживала промис-интерфейс в дополнение к уже поддерживаемому коллбэк-интерфейсу:
const readFileAsArray = function(file, cb = () => {}) {
return new Promise((resolve, reject) => {
fs.readFile(file, function(err, data) {
if (err) {
reject(err);
return cb(err);
}
const lines = data.toString().trim().split('\n');
resolve(lines);
cb(null, lines);
});
});
};
Функция возвращает объект Promise, в который обёртывается асинхронный вызов fs.readFile
. У промиса два аргумента: функции resolve
и reject
. Если нам нужно вызвать коллбэк с ошибкой, то используем промис-функцию reject
, а для коллбэка с данными — промис-функцию resolve
.
Единственное отличие заключается в том, что нам нужно иметь значение по умолчанию для коллбэк-аргумента на тот случай, если код используется с промис-интерфейсом. Например, в качестве аргумента можно использовать простую, по умолчанию пустую функцию () => {}
.
Добавление промис-интерфейса позволяет гораздо легче работать с вашим кодом, если нужно использовать асинхронную функцию в цикле. С коллбэками ситуация усложняется. Промисы немного улучшают положение дел, как и генератор функций. Иными словами, более свежая альтернатива для работы с асинхронным кодом — функция async
. Она позволяет обращаться с асинхронным кодом как с синхронным, что сильно улучшает читабельность кода.
Вот как можно использовать функцию readFileAsArray
с помощью async/await:
async function countOdd () {
try {
const lines = await readFileAsArray('./numbers');
const numbers = lines.map(Number);
const oddCount = numbers.filter(n => n%2 === 1).length;
console.log('Odd numbers count:', oddCount);
} catch(err) {
console.error(err);
}
}
countOdd();
Сначала создаём асинхронную функцию — обычную функцию со словом async
в начале. Внутри неё мы вызываем функцию readFileAsArray
, словно она возвращает переменную lines, и для этого мы используем ключевое слово await
. Если вызов readFileAsArray
был синхронным, то продолжаем код. Чтобы выполнить получившееся, мы исполняем функцию async. Так получается просто и читабельно. Для работы с ошибками нам нужно обернуть вызов async в выражение try/catch
.
Благодаря фиче async/await нам не потребовался специальный API (вроде .then и .catch). Мы лишь иначе маркировали функции и взяли чистый JavaScript.
Мы можем использовать async/await с любой функцией, поддерживающей промис-интерфейс. Но не можем — с асинхронными функциями в коллбэк-стиле (например, setTimeout).
EventEmitter — это модуль, содействующий коммуникации между объектами в Node. Он является ядром асинхронной событийно-управляемой архитектуры. Многие из встроенных в Node модулей наследуют от EventEmitter.
Его идея проста: emitter-объекты генерируют именованные события, которые приводят к вызову ранее зарегистрированных прослушивателей. Так что у эмиттера есть две основные функции:
Для работы с EventEmitter нужно создать расширяющий его класс.
class MyEmitter extends EventEmitter {
}
Эмиттеры — это то, что мы инстанцируем из классов на основе EventEmitter:
const myEmitter = new MyEmitter();
В любой момент жизненного цикла эмиттеров мы можем воспользоваться функцией emit и сгенерировать любое именованное событие.
myEmitter.emit('something-happened');
Генерирование события — это сигнал того, что соблюдено какое-то условие. Обычно речь идёт об изменении состояния генерирующего объекта. С помощью метода on
можно добавить функции-прослушиватели, которые будут исполняться каждый раз, когда эмиттеры генерируют свои ассоциированные именованные события.
Взгляните на пример:
const EventEmitter = require('events');
class WithLog extends EventEmitter {
execute(taskFunc) {
console.log('Before executing');
this.emit('begin');
taskFunc();
this.emit('end');
console.log('After executing');
}
}
const withLog = new WithLog();
withLog.on('begin', () => console.log('About to execute'));
withLog.on('end', () => console.log('Done with execute'));
withLog.execute(() => console.log('*** Executing task ***'));
Класс WithLog
— это эмиттер. Он определяет один экземпляр функции execute
. Она получает один аргумент — функцию задачи (task function) — и оборачивает её исполнение в log-выражения. События генерируются до и после исполнения.
Чтобы увидеть, в какой очерёдности всё работает, зарегистрируем прослушивателей для именованных событий и выполним пример задачи по запуску всей цепочки.
Результат:
Before executing
About to execute
*** Executing task ***
Done with execute
After executing
Что я хочу отметить касательно результата исполнения кода: здесь нет ничего асинхронного.
begin
приводит к появлению строки «About to execute».«*** Executing task ***»
.end
приводит к появлению строки «Done with execute».Совсем как старые добрые коллбэки, не предполагающие, что события характерны для синхронного или асинхронного кода. Это важно, потому что если мы передаём в execute
асинхронную taskFunc
, то генерируемые события больше не будут точны.
Можно эмулировать эту ситуацию с помощью вызова setImmediate
:
// ...
withLog.execute(() => {
setImmediate(() => {
console.log('*** Executing task ***')
});
});
Теперь результат будет такой:
Before executing
About to execute
Done with execute
After executing
*** Executing task ***
Это неправильно. Строки после асинхронного вызова, приводящие к появлению вызовов «Done with execute» и «After executing», появляются в неправильной очерёдности.
Для генерирования события после завершения асинхронной функции нам нужно скомбинировать коллбэки (или промисы) с этой событийно-управляемой коммуникацией. Это демонстрируется на нижеприведённом примере.
Одно из преимуществ использования событий вместо обычных коллбэков — то, что мы можем много раз реагировать на один и тот же сигнал благодаря определению многочисленных прослушивателей. Чтобы сделать то же самое с помощью коллбэков, придётся написать больше логики внутри одного доступного коллбэка. События — прекрасный способ реализовать многочисленные внешние плагины, добавляющие функциональность к ядру приложения. Можно считать их «разъёмами» для кастомизации поведения при изменении состояния.
Давайте преобразуем наш синхронный пример в нечто асинхронное и немного более полезное.
const fs = require('fs');
const EventEmitter = require('events');
class WithTime extends EventEmitter {
execute(asyncFunc, ...args) {
this.emit('begin');
console.time('execute');
asyncFunc(...args, (err, data) => {
if (err) {
return this.emit('error', err);
}
this.emit('data', data);
console.timeEnd('execute');
this.emit('end');
});
}
}
const withTime = new WithTime();
withTime.on('begin', () => console.log('About to execute'));
withTime.on('end', () => console.log('Done with execute'));
withTime.execute(fs.readFile, __filename);
Класс WithTime
исполняет asyncFunc
и с помощью вызовов console.time
и console.timeEnd
сообщает о времени, затраченном этой asyncFunc
. Он генерирует правильную последовательность событий до и после исполнения. Также он генерирует error/data-события для работы с обычными сигналами асинхронных вызовов.
Протестируем эмиттер withTime
, передав ему вызов асинхронной функции fs.readFile
. Вместо обработки данных из файла с помощью коллбэка мы теперь можем прослушивать data-событие.
Выполнив этот код, мы, как и ожидалось, получаем правильную последовательность событий, а также отчёт о времени выполнения:
About to execute
execute: 4.507ms
Done with execute
Обратите внимание, что для этого нам нужно было скомбинировать коллбэк с эмиттером. Если бы asynFunc
также поддерживала и промисы, то всё то же самое можно было бы реализовать с помощью async/await:
class WithTime extends EventEmitter {
async execute(asyncFunc, ...args) {
this.emit('begin');
try {
console.time('execute');
const data = await asyncFunc(...args);
this.emit('data', data);
console.timeEnd('execute');
this.emit('end');
} catch(err) {
this.emit('error', err);
}
}
}
Не знаю, как для вас, но для меня это выглядит гораздо читабельнее, чем код на основе коллбэков или строк с .then/.catch. Фича async/await максимально приближает нас к JavaScript, что я считаю большим достижением.
В предыдущем примере было два события, сгенерированных с дополнительными аргументами. Error-cобытие сгенерировано error-объектом.
this.emit('error', err);
Data-cобытие сгенерировано data-объектом.
this.emit('data', data);
После именованного события мы можем использовать столько аргументов, сколько нужно, и все они будут доступны внутри функций-прослушивателей, которые мы зарегистрировали для этих именованных событий.
Например, для работы с data-событием зарегистрированная функция-прослушиватель получит доступ к data-аргументу, который был передан сгенерированному событию. И этот data-объект — именно то, что предоставляет asyncFunc
.
withTime.on('data', (data) => {
// do something with data
});
Обычно событие error
специальное. В примере с коллбэками — если мы не обрабатываем error-событие с помощью прослушивателя, то Node-процесс завершается.
Чтобы продемонстрировать это поведение, снова вызовем исполнение метода с плохим аргументом:
class WithTime extends EventEmitter {
execute(asyncFunc, ...args) {
console.time('execute');
asyncFunc(...args, (err, data) => {
if (err) {
return this.emit('error', err); // Not Handled
}
console.timeEnd('execute');
});
}
}
const withTime = new WithTime();
withTime.execute(fs.readFile, ''); // BAD CALL
withTime.execute(fs.readFile, __filename);
Первый вызов исполнения (execute call) приведёт к ошибке. Node-процесс упадёт или завершится:
events.js:163
throw er; // Unhandled 'error' event
^
Error: ENOENT: no such file or directory, open ''
Это падение повлияет на второй вызов исполнения, который может вообще не быть выполнен.
Если зарегистрировать прослушивателя для специального события error
, то поведение Node-процесса изменится. Например:
withTime.on('error', (err) => {
// do something with err, for example log it somewhere
console.log(err)
});
В данном случае будет сообщено об ошибке первого вызова исполнения, но Node-процесс не упадёт и не завершится. Второй вызов исполнения нормально закончится:
{ Error: ENOENT: no such file or directory, open '' errno: -2, code: 'ENOENT', syscall: 'open', path: '' }
execute: 4.276ms
Обратите внимание, что сейчас Node ведёт себя иначе с функциями на основе промисов, он лишь выдаёт предупреждение, но в конце концов это изменится:
UnhandledPromiseRejectionWarning: Unhandled promise rejection (rejection id: 1): Error: ENOENT: no such file or directory, open ''
DeprecationWarning: Unhandled promise rejections are deprecated. In the future, promise rejections that are not handled will terminate the Node.js process with a non-zero exit code.
Другой способ обработки исключений из-за сгенерированных ошибок — регистрация прослушивателя глобального события процесса uncaughtException
. Однако глобальная ловля ошибок при таком событии — идея плохая.
Стандартный совет относительно uncaughtException
: избегайте его использования. Но если вам это необходимо (например, для отчёта о случившемся или для очисток), то позвольте процессу в любом случае завершиться:
process.on('uncaughtException', (err) => {
// something went unhandled.
// Do any cleanup and exit anyway!
console.error(err); // don't do just that.
// FORCE exit the process too.
process.exit(1);
});
Однако представим, что одновременно произошло несколько error-событий. Это означает, что прослушиватель uncaughtException
запущен несколько раз, что может стать проблемой при очистке кода. Такое бывает, к примеру, когда многочисленные вызовы приводят к завершению работы базы данных.
Модуль EventEmitter
предоставляет метод once
. Он сигнализирует о том, что хватит и одного вызова прослушивателя. Метод практично использовать с uncaughtException
, потому что при первом непойманном исключении мы начнём выполнять чистку, зная, что в любом случае процесс завершится.
Если для одного события зарегистрировать несколько прослушивателей, то они станут вызываться в каком-то порядке. Первый зарегистрированный будет и первым вызванным.
// प्रथम
withTime.on('data', (data) => {
console.log(`Length: ${data.length}`);
});
// दूसरा
withTime.on('data', (data) => {
console.log(`Characters: ${data.toString().length}`);
});
withTime.execute(fs.readFile, __filename);
Если выполнить этот код, то сначала в лог будет занесена строка «Length», а потом «Characters», потому что именно в таком порядке мы определили их прослушивателей.
Если нужно определить нового прослушивателя, но чтобы он вызывался первым, можно воспользоваться методом prependListener
:
// प्रथम
withTime.on('data', (data) => {
console.log(`Length: ${data.length}`);
});
// दूसरा
withTime.prependListener('data', (data) => {
console.log(`Characters: ${data.toString().length}`);
});
withTime.execute(fs.readFile, __filename);
В этом случае в логе сначала появится строка «Characters».
И наконец, если вам нужно убрать прослушивателя, то воспользуйтесь методом removeListener
.
На этом всё.