RxJS: как операторы высшего порядка упрощают код
- понедельник, 28 августа 2023 г. в 00:00:13
Если вы работали с Angular, то наверняка встречались с RxJS. Потоки, развесистые конструкции, много аргументов у метода pipe, а каждый аргумент возвращают разные функции с разным количеством аргументов. Есть интуитивно понятные функции типа filter
или map
. Первый явно фильтрует значения в потоке, а второй эти значения меняет. Такие функции называют операторами. И чем глубже вы проваливаетесь в RxJS, тем больше самых разных операторов вы узнаете. И со временем добираетесь до потоков потоков. То есть вместо обычных значений такой поток эмитит другие потоки. Такие потоки называют Higher Order Observables. И для работы с такими потоками существуют специальные операторы. Возможно, вы слышали, что такие операторы называют Higher Order Operators (HOO). Они могут выравнивать потоки или, другими словами, делать их обычным.
В этой статье я покажу, что в HOO нет ничего мифического, и расскажу в каких случаях вам нужно использовать операторы высшего порядка. Сейчас вы подумаете, что это скучный лонгрид, но не торопитесь. Мы рассмотрим всего 4 оператора: switchMap
, exhaustMap
, concatMap
и mergeMap
.
switchMap
однозначно самый популярный из всех. Но почему? А по одной просто причине — этот оператор избавляет нас от очень часто встречающегося состояния гонки.
Давайте рассмотрим код:
const searchInput = document.getElementById('search-input');
const search$ = fromEvent(searchInput, 'input').pipe(
map(event => event.target.value)
);
search$.subscribe((query) => {
http.get('/search', {params: {query}}).subcribe((result) => {
console.log(result);
})
})
В этом коде мы находим input, с которым взаимодействует пользователь и подписываемся на событие input
. То есть поток search$
эмитит строки. Внутри подписки мы видим, что на каждый эмит строки отправляется запрос на сервер и ответ сервиса выводится в консоль.
В этом коде можно увидеть как минимум две проблемы:
**Состояние гонки. ** Обычно при поиске чего-либо пользователю важно видеть результат именно последнего запроса. Но код такого вида не дает нам гарантии, что последние данные, выведенные в консоль, соответствуют последней испускаемой строке в потоке search$
.
Подписка в подписке и ни одной отписки. Есть очень хорошее правило, следуя которому можно избавить себя от многих проблем, — на каждую подписку должна быть отписка. Это как минимум снизит вероятность утечки памяти.
Но давайте подумаем, как должен работать поиск:
В момент эмита строки проверить наличие активных запросов
Если запросов нет, то перейти к пункту 4
Отписаться от предыдущих запросов
Подписаться на новый запрос
Вывести в консоль
В лоб это можно реализовать так:
const searchInput = document.getElementById('search-input');
const search$ = fromEvent(searchInput, 'input').pipe(
map(event => event.target.value)
);
let subscription;
search$.subscribe((query) => {
if (subscription) {
subscription.unsubscribe();
}
subscription = http.get('/search', {params: {query}}).subcribe((result) => {
console.log(result);
})
})
Но что если я вам скажу, что первые 4 пункта требований делает оператор switchMap
? Давайте перепишем код:
const searchInput = document.getElementById('search-input');
const search$ = fromEvent(searchInput, 'input').pipe(
map(event => event.target.value)
);
search$.pipe(
switchMap((query) => http.get('/search', {params: {query}}))
).subscribe((result) => {
console.log(result);
});
switchMap
гарантирует нам, что мы всегда будем получать результаты последнего потока и избавляет нас от состояния гонки. Ну и приятный бонус будет состоять в том, что отписавшись от внешней подписки автоматически произойдет и отписка от внутренней. Профит!
Резюмируем. switchMap
можно использовать в случаях, когда нам важен результат последнего действия, например, при поиске или реализации бесконечного скрола. Если все предыдущие действия могут не учитываться, то можно смело брать switchMap
.
exhaustMap
однозначно самый непопулярный из всех. Причина мне не до конца понятна, но с помощью него так же можно реализовать поиск, но используя другой подход.
const searchInput = document.getElementById('search-input');
const searchButton = document.getElementById('search-button');
const searchButtonClick$ = fromEvent(searchButton, 'click');
searchButtonClick$.subscribe((result) => {
const query = searchInput.value;
http.get('/search', { params: { query } }).subscribe((result) => {
console.log(result);
});
});
В этом коде инициатор запрос — клик по кнопке.
Собственно, этот код имеет такие же проблемы, как и в предыдущем примере, но здесь у нас другие требования:
В момент нажатия на кнопку проверить наличие активных запросов
Если есть активный, ничего не делать и ждать следующего клика
Если нет активного, подписаться на выполнение запроса
Вывести результат в консоль
Реализуем в лоб:
const searchInput = document.getElementById('search-input');
const searchButton = document.getElementById('search-button');
const searchButtonClick$ = fromEvent(searchButton, 'click');
let isRequestPending = false;
searchButtonClick$.subscribe((result) => {
if (isRequestPending) {
return;
}
isRequestPending = true;
const query = searchInput.value;
http.get('/search', { params: { query } }).subscribe((result) => {
isRequestPending = false;
console.log(result);
});
});
Как и в первом случае нам понадобилась еще одна переменная за пределами потока. Это добавляет коду сложности и если мы захотим изменить поведение потока все придется переписать. Ну и как вы уже догадались на помощь нам приходит exhaustMap
.
const searchInput = document.getElementById('search-input');
const searchButton = document.getElementById('search-button');
const searchInput$ = fromEvent(searchButton, 'click');
searchInput$.pipe(
exhaustMap(() => {
const query = searchInput.value;
return http.get('/search', { params: { query } });
})
).subscribe((result) => {
console.log(result);
});
Резюмируем. exhaustMap
нужно использовать в случаях, когда при активной подписке на поток остальные можно игнорировать, как в случае с поиском по нажатию кнопке или, например, при пропуске событий начала анимации при ее воспроизведении.
mergeMap
— оператор, который объединяет все внутренние потоки в один выходной поток. Это значит, что внутренние потоки могут завершаться в любом порядке, и их результаты будут объединены вместе. И это самое простое объяснение, которое я смог из себя выдавить.
Давайте перейдем к примерам:
entityId$.subscribe((id) => {
entityService.get(id).subscribe(() => {
entityStore.upsertItem(entity);
});
});
В этом коде мы видим entityId$ — это поток строк с id некой сущности. Здесь мы должны на каждый id запросить данные по сущности с сервера и добавить или обновить эту сущность в стор. Собственно, именно это наш код и делает и решать тут нечего. Но проблемы есть и в этом случае они совершенно идентичны предыдущим. Давайте попробуем усложнить задачу и ввести ограничение в 3 запроса в один момент времени.
Решаем:
const CONCURRENT_REQUESTS = 3;
let activeRequests = 0;
const queue = [];
function processNext() {
if (queue.length === 0 || activeRequests >= CONCURRENT_REQUESTS) {
return;
}
const id = queue.shift();
activeRequests++;
entityService.get(id).subscribe(entity => {
entityStore.upsertItem(entity);
activeRequests--;
processNext();
});
}
entityId$.subscribe(id => {
queue.push(id);
processNext();
});
Я даже не пытался проверять код на работоспособность, потому что написал его прямо в редакторе текста. Код стал сложно-читаемым. Функция processNext
имеет несколько побочных эффектов внутри. А еще есть дополнительные переменные за пределами потока и функции. Сложить все это воедино достаточно сложно.
Именно такие задачи решает mergeMap
. Давайте перепишем первый пример с использованием этого оператора:
entityId$.pipe(
mergeMap((id) => entityService.get(id))
).subscribe((entity) => {
entityStore.upsertItem(entity);
});
В этом коде mergeMap
подписывается на каждый поток, возвращенный entityService.get(id)
, и их значения выдает в одном едином потоке.
Хорошо, а как быть с ограничением в 3 запроса в один момент времени? Оказывается, что mergeMap
уже и так все умеет. Второй аргумент в mergeMap
принимает число, которое как раз настраивает конкурентность.
entityId$.pipe(
mergeMap((id) => entityService.get(id), 3)
).subscribe((entity) => {
entityStore.upsertItem(entity);
});
Вот и все!
Резюмируем. mergeMap
отлично подходит, когда вы хотите выполнять параллельные действия и объединять их результаты. Однако следует быть осторожным, так как может возникнуть много активных запросов, если исходный поток излучает значения слишком быстро.
concatMap
— последний оператор высшего порядка. Ключевое отличие заключается в том, что concatMap поддерживает порядок выполнения. Он дождется завершения одного внутреннего потока, прежде чем перейдет к следующему.
Что бы практически посмотреть на его использование, мы можем взять предыдущий пример и поменять к нему требования. Так получилось, что нас перестало устраивать неупорядоченность запросов и мы хотим выполнять их не параллельно, а по очереди. То есть конкурентность должна стать равной единице.
entityId$.pipe(
mergeMap((id) => entityService.get(id), 1)
).subscribe((entity) => {
entityStore.upsertItem(entity);
});
Но mergeMap с конкретностью 1 делает ровно тоже самое, что и concatMap! Буквально. Это прекрасно видно в исходном коде оператора.
То есть использование mergeMap с конкурентностью 1 на столько частый кейс, что его вынесли в отдельный оператор.
Резюмируем. concatMap
прекрасно подходит для ситуаций, когда порядок выполнения важен. Если вы хотите обработать последовательность действий без параллельной обработки, это ваш выбор.
Высокоуровневые операторы являются мощным инструментом в арсенале каждого разработчика, работающего с реактивным программированием. Они предоставляют гибкость и элегантность при обработке сложных наблюдаемых данных и позволяют сократить код, делая его более читаемым и поддерживаемым.
Оператор switchMap
отлично подходит, когда нам важен только последний результат излучения, например, в случае поиска в реальном времени.
exhaustMap
идеален для случаев, когда нам нужно игнорировать новые наблюдаемые объекты до завершения текущего.
mergeMap
позволяет обрабатывать несколько входящих наблюдаемых объектов параллельно, но может привести к перегрузке, если не контролировать количество одновременных потоков.
concatMap
гарантирует порядок обработки, выполняя каждый внутренний наблюдаемый объект последовательно.
При правильном использовании эти операторы могут справляться с множеством реактивных задач, будь то наблюдаемые события из пользовательского интерфейса, HTTP-запросы или даже сложные анимационные последовательности.
Однако ключевое слово здесь - правильное использование. Всегда анализируйте требования вашего приложения и тщательно выбирайте подходящий оператор. Это поможет избежать нежелательных побочных эффектов и создать реактивные решения, которые могут масштабироваться и легко поддерживаться.
Реактивное программирование предлагает множество инструментов, и среди них высокоуровневые операторы занимают особое место. Проведя время на изучение и понимание их особенностей, вы значительно можете улучшить качество и эффективность вашего кода.
Если вы нашли эту статью полезной и хотите больше читать и видеть еще больше моего контекнта обязательно подпишитесь на мой канал в Telegram под названием "Techlead's Diary". Вы также можете оставаться на связи со мной в Twitter, чтобы участвовать в дискуссиях. Присоединяйтесь к обсуждению и продолжайте совершенствовать свои навыки разработки!