javascript

Понимание событийной архитектуры Node.js

  • среда, 7 июня 2017 г. в 03:15:19
https://habrahabr.ru/company/mailru/blog/330048/
  • Разработка веб-сайтов
  • Программирование
  • Node.JS
  • JavaScript
  • Блог компании Mail.Ru Group



Большинство Node-объектов — вроде HTTP-запросов, ответов и потоков (streams) — реализуют модуль EventEmitter, благодаря которому они могут генерировать и прослушивать события.


const EventEmitter = require('events')

Простейшая форма управления по событиям — это callback-стиль некоторых популярных Node.js-функций, к примеру fs.readFile. По этой аналогии событие генерируется однократно (когда Node готов к вызову коллбэка), а коллбэк действует как обработчик события. Давайте сначала разберём эту базовую форму событийно-управляемой архитектуры.


Вызови меня, когда будешь готов, Node!


Изначально Node обрабатывал асинхронные события с помощью коллбэков. Это было давно, ещё до того как в JavaScript появилась нативная поддержка промисов и фича async/await. Коллбэки — это просто функции, которые вы передаёте другим функциям. Такое возможно в JavaScript, потому что функции — это объекты первого класса.


Важно понимать, что коллбэки не индикаторы асинхронного вызова в коде. Функция может вызывать коллбэк как синхронно, так и асинхронно. Например, хост-функция fileSize принимает коллбэк-функцию cb, причём вызывает её синхронно или асинхронно в зависимости от условия:


function fileSize (fileName, cb) {
  if (typeof fileName !== 'string') {
    return cb(new TypeError('argument should be string')); // Sync
  }
  fs.stat(fileName, (err, stats) => {
    if (err) { return cb(err); } // Async
    cb(null, stats.size); // Async
  });
}

Это плохой подход, приводящий к неожиданным ошибкам. Создавайте такие хост-функции, которые принимают коллбэки либо всегда синхронно, либо всегда асинхронно.


Давайте разберём простой пример типичной асинхронной Node-функции, написанной в коллбэк-стиле:


const readFileAsArray = function(file, cb) {
  fs.readFile(file, function(err, data) {
    if (err) {
      return cb(err);
    }
    const lines = data.toString().trim().split('\n');
    cb(null, lines);
  });
};

readFileAsArray берёт путь файла и коллбэк-функцию. Считывает содержимое файла, разбивает на массив строк и вызывает применительно к этому массиву коллбэк-функцию. Вот как это можно использовать. Допустим, файл numbers.txt лежит в одной директории с таким контентом:


10
11
12
13
14
15

Если у нас есть задача посчитать числа в этом файле, то для упрощения кода можно воспользоваться readFileAsArray:


readFileAsArray('./numbers.txt', (err, lines) => {
  if (err) throw err;
  const numbers = lines.map(Number);
  const oddNumbers = numbers.filter(n => n%2 === 1);
  console.log('Odd numbers count:', oddNumbers.length);
});

Этот код читает в массиве строк числовой контент, парсит его как числа и выполняет подсчёт.
Здесь работает характерный для Node коллбэк-стиль. У коллбэка есть error-first-аргумент err, который может принимать значение null. Мы передаём этот коллбэк в качестве последнего аргумента хост-функции. Всегда делайте так в своих функциях, потому что пользователи наверняка будут на это рассчитывать. Пусть ваша хост-функция получает коллбэк в виде последнего аргумента, и пусть коллбэк ожидает в качестве своего первого аргумента error-объект.


Современные JS-альтернативы коллбэкам


В современном JavaScript есть такие объекты, как промисы. Они могут быть альтернативой коллбэкам в случае асинхронных API. Вместо передачи коллбэка в качестве аргумента и обработки ошибки в том же месте промис позволяет отдельно обрабатывать успешные и ошибочные ситуации, а также соединять несколько асинхронных вызовов в цепочки, а не делать их вложенными.


Если функция readFileAsArray поддерживает промисы, то мы можем использовать её следующим образом:


readFileAsArray('./numbers.txt')
  .then(lines => {
    const numbers = lines.map(Number);
    const oddNumbers = numbers.filter(n => n%2 === 1);
    console.log('Odd numbers count:', oddNumbers.length);
  })
  .catch(console.error);

Вместо передачи коллбэка мы вызываем функцию .then применительно к возвращаемому значению хост-функции. Обычно .then даёт нам доступ к тем же строкам массива, которые мы получаем в коллбэк-версии, поэтому можем работать как раньше. Для обработки ошибок добавим вызов .catch применительно к результату, что обеспечит нам доступ к ошибке, если она возникнет.


Благодаря новому объекту Promise в современном JavaScript стало легче реализовать поддержку промис-интерфейса хост-функцией. Вот функция readFileAsArray, модифицированная так, чтобы она поддерживала промис-интерфейс в дополнение к уже поддерживаемому коллбэк-интерфейсу:


const readFileAsArray = function(file, cb = () => {}) {
  return new Promise((resolve, reject) => {
    fs.readFile(file, function(err, data) {
      if (err) {
        reject(err);
        return cb(err);
      }
      const lines = data.toString().trim().split('\n');
      resolve(lines);
      cb(null, lines);
    });
  });
};

Функция возвращает объект Promise, в который обёртывается асинхронный вызов fs.readFile. У промиса два аргумента: функции resolve и reject. Если нам нужно вызвать коллбэк с ошибкой, то используем промис-функцию reject, а для коллбэка с данными — промис-функцию resolve.


Единственное отличие заключается в том, что нам нужно иметь значение по умолчанию для коллбэк-аргумента на тот случай, если код используется с промис-интерфейсом. Например, в качестве аргумента можно использовать простую, по умолчанию пустую функцию () => {}.


Потребление промисов с помощью async/await


Добавление промис-интерфейса позволяет гораздо легче работать с вашим кодом, если нужно использовать асинхронную функцию в цикле. С коллбэками ситуация усложняется. Промисы немного улучшают положение дел, как и генератор функций. Иными словами, более свежая альтернатива для работы с асинхронным кодом — функция async. Она позволяет обращаться с асинхронным кодом как с синхронным, что сильно улучшает читабельность кода.


Вот как можно использовать функцию readFileAsArray с помощью async/await:


async function countOdd () {
  try {
    const lines = await readFileAsArray('./numbers');
    const numbers = lines.map(Number);
    const oddCount = numbers.filter(n => n%2 === 1).length;
    console.log('Odd numbers count:', oddCount);
  } catch(err) {
    console.error(err);
  }
}
countOdd();

Сначала создаём асинхронную функцию — обычную функцию со словом async в начале. Внутри неё мы вызываем функцию readFileAsArray, словно она возвращает переменную lines, и для этого мы используем ключевое слово await. Если вызов readFileAsArray был синхронным, то продолжаем код. Чтобы выполнить получившееся, мы исполняем функцию async. Так получается просто и читабельно. Для работы с ошибками нам нужно обернуть вызов async в выражение try/catch.


Благодаря фиче async/await нам не потребовался специальный API (вроде .then и .catch). Мы лишь иначе маркировали функции и взяли чистый JavaScript.


Мы можем использовать async/await с любой функцией, поддерживающей промис-интерфейс. Но не можем — с асинхронными функциями в коллбэк-стиле (например, setTimeout).


Модуль EventEmitter


EventEmitter — это модуль, содействующий коммуникации между объектами в Node. Он является ядром асинхронной событийно-управляемой архитектуры. Многие из встроенных в Node модулей наследуют от EventEmitter.


Его идея проста: emitter-объекты генерируют именованные события, которые приводят к вызову ранее зарегистрированных прослушивателей. Так что у эмиттера есть две основные функции:


  • Генерирование именованных событий.
  • Регистрация и дерегистрация функций-прослушивателей.

Для работы с EventEmitter нужно создать расширяющий его класс.


class MyEmitter extends EventEmitter {

}

Эмиттеры — это то, что мы инстанцируем из классов на основе EventEmitter:


const myEmitter = new MyEmitter();

В любой момент жизненного цикла эмиттеров мы можем воспользоваться функцией emit и сгенерировать любое именованное событие.


myEmitter.emit('something-happened');

Генерирование события — это сигнал того, что соблюдено какое-то условие. Обычно речь идёт об изменении состояния генерирующего объекта. С помощью метода on можно добавить функции-прослушиватели, которые будут исполняться каждый раз, когда эмиттеры генерируют свои ассоциированные именованные события.


События !== асинхронность


Взгляните на пример:


const EventEmitter = require('events');

class WithLog extends EventEmitter {
  execute(taskFunc) {
    console.log('Before executing');
    this.emit('begin');
    taskFunc();
    this.emit('end');
    console.log('After executing');
  }
}

const withLog = new WithLog();

withLog.on('begin', () => console.log('About to execute'));
withLog.on('end', () => console.log('Done with execute'));

withLog.execute(() => console.log('*** Executing task ***'));

Класс WithLog — это эмиттер. Он определяет один экземпляр функции execute. Она получает один аргумент — функцию задачи (task function) — и оборачивает её исполнение в log-выражения. События генерируются до и после исполнения.


Чтобы увидеть, в какой очерёдности всё работает, зарегистрируем прослушивателей для именованных событий и выполним пример задачи по запуску всей цепочки.


Результат:


Before executing
About to execute
*** Executing task ***
Done with execute
After executing

Что я хочу отметить касательно результата исполнения кода: здесь нет ничего асинхронного.


  • Сначала получаем строку «Before executing».
  • Затем именованное событие begin приводит к появлению строки «About to execute».
  • Далее реально исполняемая строка генерирует строку «*** Executing task ***».
  • Потом именованное событие end приводит к появлению строки «Done with execute».
  • В конце получаем строку «After executing».

Совсем как старые добрые коллбэки, не предполагающие, что события характерны для синхронного или асинхронного кода. Это важно, потому что если мы передаём в execute асинхронную taskFunc, то генерируемые события больше не будут точны.


Можно эмулировать эту ситуацию с помощью вызова setImmediate:


// ...

withLog.execute(() => {
  setImmediate(() => {
    console.log('*** Executing task ***')
  });
});

Теперь результат будет такой:


Before executing
About to execute
Done with execute
After executing
*** Executing task ***

Это неправильно. Строки после асинхронного вызова, приводящие к появлению вызовов «Done with execute» и «After executing», появляются в неправильной очерёдности.


Для генерирования события после завершения асинхронной функции нам нужно скомбинировать коллбэки (или промисы) с этой событийно-управляемой коммуникацией. Это демонстрируется на нижеприведённом примере.


Одно из преимуществ использования событий вместо обычных коллбэков — то, что мы можем много раз реагировать на один и тот же сигнал благодаря определению многочисленных прослушивателей. Чтобы сделать то же самое с помощью коллбэков, придётся написать больше логики внутри одного доступного коллбэка. События — прекрасный способ реализовать многочисленные внешние плагины, добавляющие функциональность к ядру приложения. Можно считать их «разъёмами» для кастомизации поведения при изменении состояния.


Асинхронные события


Давайте преобразуем наш синхронный пример в нечто асинхронное и немного более полезное.


const fs = require('fs');
const EventEmitter = require('events');

class WithTime extends EventEmitter {
  execute(asyncFunc, ...args) {
    this.emit('begin');
    console.time('execute');
    asyncFunc(...args, (err, data) => {
      if (err) {
        return this.emit('error', err);
      }

      this.emit('data', data);
      console.timeEnd('execute');
      this.emit('end');
    });
  }
}

const withTime = new WithTime();

withTime.on('begin', () => console.log('About to execute'));
withTime.on('end', () => console.log('Done with execute'));

withTime.execute(fs.readFile, __filename);

Класс WithTime исполняет asyncFunc и с помощью вызовов console.time и console.timeEnd сообщает о времени, затраченном этой asyncFunc. Он генерирует правильную последовательность событий до и после исполнения. Также он генерирует error/data-события для работы с обычными сигналами асинхронных вызовов.


Протестируем эмиттер withTime, передав ему вызов асинхронной функции fs.readFile. Вместо обработки данных из файла с помощью коллбэка мы теперь можем прослушивать data-событие.


Выполнив этот код, мы, как и ожидалось, получаем правильную последовательность событий, а также отчёт о времени выполнения:


About to execute
execute: 4.507ms
Done with execute

Обратите внимание, что для этого нам нужно было скомбинировать коллбэк с эмиттером. Если бы asynFunc также поддерживала и промисы, то всё то же самое можно было бы реализовать с помощью async/await:


class WithTime extends EventEmitter {
  async execute(asyncFunc, ...args) {
    this.emit('begin');
    try {
      console.time('execute');
      const data = await asyncFunc(...args);
      this.emit('data', data);
      console.timeEnd('execute');
      this.emit('end');
    } catch(err) {
      this.emit('error', err);
    }
  }
}

Не знаю, как для вас, но для меня это выглядит гораздо читабельнее, чем код на основе коллбэков или строк с .then/.catch. Фича async/await максимально приближает нас к JavaScript, что я считаю большим достижением.


Аргументы событий и ошибки


В предыдущем примере было два события, сгенерированных с дополнительными аргументами. Error-cобытие сгенерировано error-объектом.


this.emit('error', err);

Data-cобытие сгенерировано data-объектом.


this.emit('data', data);

После именованного события мы можем использовать столько аргументов, сколько нужно, и все они будут доступны внутри функций-прослушивателей, которые мы зарегистрировали для этих именованных событий.


Например, для работы с data-событием зарегистрированная функция-прослушиватель получит доступ к data-аргументу, который был передан сгенерированному событию. И этот data-объект — именно то, что предоставляет asyncFunc.


withTime.on('data', (data) => {
  // do something with data
});

Обычно событие error специальное. В примере с коллбэками — если мы не обрабатываем error-событие с помощью прослушивателя, то Node-процесс завершается.


Чтобы продемонстрировать это поведение, снова вызовем исполнение метода с плохим аргументом:


class WithTime extends EventEmitter {
  execute(asyncFunc, ...args) {
    console.time('execute');
    asyncFunc(...args, (err, data) => {
      if (err) {
        return this.emit('error', err); // Not Handled
      }

      console.timeEnd('execute');
    });
  }
}

const withTime = new WithTime();

withTime.execute(fs.readFile, ''); // BAD CALL
withTime.execute(fs.readFile, __filename);

Первый вызов исполнения (execute call) приведёт к ошибке. Node-процесс упадёт или завершится:


events.js:163
      throw er; // Unhandled 'error' event
      ^
Error: ENOENT: no such file or directory, open ''

Это падение повлияет на второй вызов исполнения, который может вообще не быть выполнен.


Если зарегистрировать прослушивателя для специального события error, то поведение Node-процесса изменится. Например:


withTime.on('error', (err) => {
  // do something with err, for example log it somewhere
  console.log(err)
});

В данном случае будет сообщено об ошибке первого вызова исполнения, но Node-процесс не упадёт и не завершится. Второй вызов исполнения нормально закончится:


{ Error: ENOENT: no such file or directory, open '' errno: -2, code: 'ENOENT', syscall: 'open', path: '' }
execute: 4.276ms

Обратите внимание, что сейчас Node ведёт себя иначе с функциями на основе промисов, он лишь выдаёт предупреждение, но в конце концов это изменится:


UnhandledPromiseRejectionWarning: Unhandled promise rejection (rejection id: 1): Error: ENOENT: no such file or directory, open ''
DeprecationWarning: Unhandled promise rejections are deprecated. In the future, promise rejections that are not handled will terminate the Node.js process with a non-zero exit code.

Другой способ обработки исключений из-за сгенерированных ошибок — регистрация прослушивателя глобального события процесса uncaughtException. Однако глобальная ловля ошибок при таком событии — идея плохая.


Стандартный совет относительно uncaughtException: избегайте его использования. Но если вам это необходимо (например, для отчёта о случившемся или для очисток), то позвольте процессу в любом случае завершиться:


process.on('uncaughtException', (err) => {
  // something went unhandled.
  // Do any cleanup and exit anyway!

  console.error(err); // don't do just that.

  // FORCE exit the process too.
  process.exit(1);
});

Однако представим, что одновременно произошло несколько error-событий. Это означает, что прослушиватель uncaughtException запущен несколько раз, что может стать проблемой при очистке кода. Такое бывает, к примеру, когда многочисленные вызовы приводят к завершению работы базы данных.


Модуль EventEmitter предоставляет метод once. Он сигнализирует о том, что хватит и одного вызова прослушивателя. Метод практично использовать с uncaughtException, потому что при первом непойманном исключении мы начнём выполнять чистку, зная, что в любом случае процесс завершится.


Порядок прослушивателей


Если для одного события зарегистрировать несколько прослушивателей, то они станут вызываться в каком-то порядке. Первый зарегистрированный будет и первым вызванным.


// प्रथम
withTime.on('data', (data) => {
  console.log(`Length: ${data.length}`);
});

// दूसरा
withTime.on('data', (data) => {
  console.log(`Characters: ${data.toString().length}`);
});

withTime.execute(fs.readFile, __filename);

Если выполнить этот код, то сначала в лог будет занесена строка «Length», а потом «Characters», потому что именно в таком порядке мы определили их прослушивателей.


Если нужно определить нового прослушивателя, но чтобы он вызывался первым, можно воспользоваться методом prependListener:


// प्रथम
withTime.on('data', (data) => {
  console.log(`Length: ${data.length}`);
});

// दूसरा
withTime.prependListener('data', (data) => {
  console.log(`Characters: ${data.toString().length}`);
});

withTime.execute(fs.readFile, __filename);

В этом случае в логе сначала появится строка «Characters».


И наконец, если вам нужно убрать прослушивателя, то воспользуйтесь методом removeListener.


На этом всё.