Безошибочная работа с Kafka из Node js. Часть 2 Консьюмер
- суббота, 18 апреля 2026 г. в 00:00:07
Если предыдущая часть была посвящена аспектам, связанным с публикацией сообщений, то в этой части основное внимание уделено обработке сообщений, проектированию консьюмеров и проблеме ребаланса в консьюмер группе.
Разберем основные способы масштабирования потребления сообщений из топика и увеличения пропускной способности консьюмеров.
Пропускная способность обработки данных из топика ограничена, в первую очередь, количеством партиций, а не тем как много консьюмеров имеется в группе.
Расчет количества партиций и консьюмеров можно описать следующей формулой: кол-во консьюмеров в группе ≤ кол-во партиций. Если консьюмеров больше, чем партиций, то некоторые консьюмеры не получат партицию и будут просто простаивать (и это может создавать ряд проблем), если наоборот, то партиции будут распределены между всеми консьюмерами в группе. В случае, если количество партиций не делится целочисленно, то каким-то консьюмерам достанется больше партиций, а каким-то меньше.
Потенциально можно запускать несколько консьюмер групп для топика. Но у этого способа есть проблема, что каждая консьюмер группа сохраняет оффсет (в партиции) для защиты от повторного чтения сообщения, что ведет к необходимости ручной синхронизации оффсетов между консьюмер группами. В целом консьюмер группы создавались не для масштабирования чтения, а для группировки консьюмеров. Изначальной целью создания новых консьюмер групп является разделение логики обработки на основе одного и того же топика, а не увеличение пропускной способности чтения.
Выбор количества партиций интересная задача, так как количество партиций можно сменить «на лету» после создания топика, но вполне вероятно можно сломать порядок обработки сообщений: новые сообщения будут попадать в другую партицию, нежели где хранятся старые сообщения (до изменения партиции). И в Kafka нет встроенного инструмента для того, чтобы провести, так сказать, решардинг данных в партиции.
Для равномерного распределения партиций между консьюмерами я рекомендую выбрать количество партиций, которое имеет много целочисленных делителей. Например,24 это число делится на 2, 3, 4, 6, 8, 12, 24. Если в продакшене нет доступа к брокеру через какой-то клиент (CLI, Kadeck, KafkaUI), то можно в приложении подписаться на событие consumer.group_join для того чтобы узнать какие партиции получил консьюмер:
consumer.on(consumer.events.GROUP_JOIN, ({ payload }) => { // { 'user_events': [ 0, 21 ] } console.log('Assigned partitions', payload.memberAssignment); });
Например, выше консьюмер получил при старте партиции 0 и 21 в топике user_event.
Технически общее количество партиций суммарно для всех топиков в Kafka не может быть больше 200 000. Исходя из этого, не стоит для топиков с малой нагрузкой создавать много партиций (особенно, если кластер общий для множества сервисов), а если не требуются гарантии порядка обработки сообщений, то можно безболезненно добавить партиции позже по необходимости.
Еще одна вещь, которая может ускорить обработку сообщений, это их параллельная обработка. когда за консьюмером закреплено несколько партиций. Активируется этот функционал через опцию partitionsConsumedConcurrently:
consumer.run({ partitionsConsumedConcurrently: 2, eachMessage: async ({ topic, partition, message }) => {/* handle... */}, });
По умолчанию стоит последовательная обработка сообщений (partitionsConsumedConcurrently=1) . При partitionsConsumedConcurrently > 1 порядок обработки сообщений сохраняется в рамках партиции. Главное тут следить за тем, чтобы чересчур быстрая обработка сообщений не создавала избыточную нагрузку как на сам сервис (CPU/RAM), так и на БД или другие сервисы, которые вызываются в рамках обработки событий.
Кроме того, для ускорения обработки можно вычитывать сообщения пачками из топика с помощью eachBatch, но там возникает крайне много ручной работы с heartbeat, фиксацией offset и передачей его в kafka. Следовательно, для того чтобы оставить код консьюмера максимально простым лучше избегать eachBatch в большинстве случаев.
По умолчанию консьюмеры читают данные с лидер ноды для выбранной партиции. Такая стратегия создает проблемы, eсли ваш кластер Kafka находится в нескольких дата-центрах и в особенности, если ДЦ геораспределены. Проблемы заключаются в виде задержек по сети и большой нагрузки на ноды кластера, ведь лидер ноды обслуживают как запись, так и чтение.
Для решения этой проблемы в Kafka есть функционал чтения данных с ближайшей реплики. Для этого в настройках Kafka надо указать следующие параметры: broker.rack (на sibir_tomsk) и replica.selector.class=org.apache.kafka.common.replica.RackAwareReplicaSelector. При создании консьюмера надо передавать rackId – значение должно соответствовать одному из возможных значений broker.rack:
const consumer = kafka.consumer({ groupId: 'consumer_1', rackId: 'sibir_tomsk' });

Причем данный подход может пригодиться даже в рамках одного ДЦ, например, читать с реплики находящейся в той же стойке, что и консьюмер, ведь задержки по сети будут значительно меньше.
Да, данные с лидера доезжают до реплик с некоторой задержкой, но сценарии работы с Kafka не предполагают реалтайма, так что это не должно быть проблемой.
Пройдем по основным ошибкам, связанным с обработкой сообщений.
Частая ошибка, которая встречается при разделении/объединении новых сервисов, заключается в том, что при добавлении новой консьюмер группы к существующему топику, новые консьюмеры не обрабатывают сообщения, которые были опубликованы в топик до подключения консьюмеров. Зачастую на это попадают те, кто забыл, что Kafka распределенный лог с оффсетом, а не очередь сообщений наподобие RabbitMQ.
Новый консьюмер не имеет закомиченного оффсета (__consumer_offsets) в Kafka, поэтому брокер использует параметрauto.offset.reset для определения, какой оффсет выдать консьюмеру. По умолчанию auto.offset.reset имеет значениеlatest – оффсет будет равен концу назначенной партиции. Другими словами, будут обрабатываться лишь сообщения, опубликованные в топик после коннекта консьюмера к брокеру, а вся история в партиции будет проигнорирована. Можно установить earliest(читать партицию с самого начало) как дефолтное значение для auto.offset.reset, но не рекомендую, если будет топик с большой историей, то можно получить перегрузку консьюмеров по ресурсам, а также внешних систем, использующихся в обработке сообщений.
Это защита от необработки сообщений в консьюмере: консьюмер подключился –> получил сообщения и закоммитил оффсет последнего сообщения –> консьюмер упал в ходе обработки сообщений –> консьюмер подключился и получил оффсет уже новых сообщений, ведь для проблемных сообщений уже был закоммичен оффсет.
Упс, мы потеряли обработку сообщений для данной консьюмер группы!
Как правило, лучше оставить auto.offset.reset=latest, а oффсетом управлять следующим образом:
Для того чтобы новый консьюмер прочитал сообщения с определенного оффсета требуется остановить сам консьюмер, а затем выставить необходимый оффсет для партиции через kafka клиент: Kafka cli, Kadeck, KafkaUI и т.д.
Если требуется прочитать партицию с самого начала и нет желания указывать оффсеты для партиций через Kafka клиент, то можно выставить оффсет в самом консьюмере с помощью опции fromBeginning: true:
await consumer.subscribe({ topic: 'user_events', fromBeginning: true });
Сработает эта опция лишь один раз при первом подключении консьюмера, а при повторных подключениях она будет проигнорирована, ведь будет иметься зафиксированный в Kafka оффсет для партиции (__consumer_offsets) для данной консьюмер группы.
Ребаланс это процесс перераспределения партиций топика между консьюмерами, который запускается в тот момент, когда кластер обнаруживает подключение/отключение консьюмера в группе. Ребалансом для консьюмер группы занимается брокер (group coordinator), которого выбирает Kafka. В данный момент библиотека KafkaJS поддерживает только одну стратегию ребаланса Eager. Согласно ей, в момент ребаланса все консьюмеры делают stop the world: прекращают получать новые данные из партиций, завершают обработку полученных сообщений, фиксируют оффсеты и уведомляют брокер о готовности к ребалансу. Ребаланс консьюмеров не должен происходить часто, так как данный процесс затратный по времени, ведь требуется синхронизация всех консьюмеров в группе, и это ведет к их простою. Если в ходе ребаланса будут проблемы с сетью, у одного из консьюмеров возникнет ошибка или он не уложится в отведенные таймауты, то брокер начнет данный процесс заново (потенциально это может происходить бесконечно).
На данный момент стратегия Cooperative sticky(Kafka 2.4) в KafkaJS не поддерживается. Аналогичная ситуация со стратегией Static Membership (Kafka 2.3), когда groupId используется для связки консьюмера с партициями и, если консьюмер отключился и затем подключился в рамках sessionTimeout, то брокер не запускает ребаланс, а просто назначает те же партиции (функционал полезен в случае раскатки релизов или при перезапуске отдельно взятого инстанса приложения)
Kafka с помощью трех следующих параметров (консьюмер передает их при подключении) определяет, жив ли тот или иной консьюмер в группе:
heartbeatInterval– частота отправления heartbeat сообщения брокеру.
sessionTimeout– интервал, в рамках которого брокер определяет жив или нет консьюмер, если за этот промежуток времени не пришло ни одного heartbeat сообщения, то брокер считает консьюмер мертвым и начинает ребаланс.
rebalanceTimeout– интервал, определяющий сколько дано времени консьюмеру на то, чтобы присоединиться к процессу ребаланса: обработать сообщение и зафиксировать оффсет, отправить к брокеру запрос JoinGroupи получить от него партиции. Перед началом ребаланса брокеру надо дождаться, пока все консьюмеры прекратят читать для исключения возможности, что с одной партицией одновременно работает несколько консьюмеров. В случае, если консьюмер не уложился в данный интервал, то брокер исключает его из группы.
Классические ошибки, ведущие к ребалансу, и проблемы, связанные с данными опциями:
Смешивая в одном инстансе приложения консьюминг сообщений из топика с другим функционалом (HTTP сервер, фоновые задачи и т.д) есть большой риск получить ребаланс из-за ошибки в этом функционале (необработанная фатальная ошибка, превышение лимитов RAM и т.д).
Решение заключается в выделении консьюмера в отдельный инстанс приложения, что убирает риск ребаланса из-за ошибок, не связанных с консьюмингом.
Отслеживать ребаланс консьюмера можно, подписавшись на соответствующее событие:
consumer.on(consumer.events.REBALANCING, ({ payload }) => { /* * { * groupId: 'consumer_1', * memberId: 'my-app-0c2f1847-1a3a-454f-81c2-e3f4cdb104ce' * } */ console.log('Rebalancing', payload); });
Используя это событие, можно построить метрику для обнаружения неожиданных ребалансов в консьюмер группе.
Если при деплое приложения консьюмеры по несколько раз ребалансятся прежде чем начать работать или даже уходят в вечный ребаланс, то вероятно проблема с настройками sessionTimeoutи heartbeatInterval.
При отсутствии ошибок в самих консьюмерах, основной причиной в таких случаях является то, что инстанс не успевает вступить в работу за sessionTimeout. При этом вероятность возникновения данной проблемы прямо пропорциональна количеству консьюмеров в группе.
Решение проблемы заключается в следующих вещах:
В наличии метрики для фиксации времени на запуск. необходимого консьюмеру. Рекомендуется выставить sessionTimeout в 2 раза больше, чем среднее время запуска.
Выставить правильное соотношение heartbeat и sessionTimeout. Пример плохого соотношения sessionTimeout = 1000, heartbeatInterval = 500:

Достаточно в данной конфигурации потерять один heartbeat (например, второй heartbeat,будет получен в 1.01 секунду брокером) и брокер пометит консьюмер как мертвый и начнет ребаланс.в группе. Хорошее соотношение, когда в sessionTimeout укладывается отправка 3 – 4 heartbeat, во избежание ложного ребаланса.
Не создавать чересчур большие консьюмер группы. Большие консьюмер группы долго ребалансятся из-за необходимости синхронизации между всеми участниками. Нужно искать баланс между скоростью обработкой сообщений и количеством консьюмеров в группе.
Cталкиваясь с вышеописанной проблемой, часто выставляют неадекватно большое значение sessionTimeout. Например, в конфигурации sessionTimeout=3600000 (1 час) heartbeatInterval = 60000 (1 минута), если консьюмер умрет или остановится по какой-то причине, то брокер узнает об этом лишь через час, когда за весь sessionTimeout не придет ни одного heartbeat сообщения. С точки зрения брокера весь этот час консьюмер будет считаться здоровым. При этом лаг задержки консьюмера вырастет, так как сообщения из партиций не будут обрабатываться. В комбинации с политикой хранения данных (retention Kafka удаляет сообщения при достижении срока хранения или размера топика), можно получить ситуацию с потерей данных. Консьюмер «зомби» не даст провести ребаланс для переназначения партиций, а Kafka будет удалять накопившиеся сообщения согласно политике retention.
Решение проблемы заключается в следующих вещах:
Не делатьsessionTimeout бездумно большим. На стороне брокера с помощью опций group.min|max.session.timeout.ms можно задать границы для sessionTimeout. Для того чтобы случайно или по незнанию в консьюмере не могли выставить неверное значение.
Наличие метрики для отслеживаения отправки heartbeat сообщений. Это можно сделать подписавшись на соответсвующее событие в консьюмере:
consumer.on(consumer.events.HEARTBEAT, (event) => { /* * { * id: 14, * type: 'consumer.heartbeat', * timestamp: 1769886822510, * payload: { * groupId: 'consumer_1', * memberId: 'my-app-7571b48a-e502-49f5-b99e-23a71b7436d1', * groupGenerationId: 342 * } * } */ console.log('Heartbeat', event); });
Тогда при больших sessionTimeout будет возможность гораздо проще и раньше отследить «зомби» консьюмеры.
Наличие метрики на размер лага консьюмера, хотя сам по себе лаг не всегда означает проблему (возможно в моменте много данных записали в топик).
Еще крайне неприятной проблемой может являться долгая синхронная обработка сообщений в консьюмере. Долгая синхронная операция заблокирует отправку heartbeatсообщений, что приведет к ребалансу.
Пример ниже условный, ведь все зависит от загруженности и мощности оборудования (в каких-то случаях консьюмеру может хватить ресурсов для обработки сообщения и отправки heartbeatв заданные рамки):
import crypto from 'node:crypto'; const consumer = kafka.consumer({ groupId: 'consumer_1', sessionTimeout: 10_000, heartbeatInterval: 200, }); await consumer.run({ eachMessage: async () => { const list = new Int32Array(10_000_000).fill(2); // Эмуляция долгой синхронно операции const sum = list.reduce((sum, el, i) => { return (sum + el ** 2 + Array.from({ length: 100 }, () => crypto.randomInt(0, 100_000)).reduce((total, el) => total + el, 0); }, 0); console.log('Total sum', sum); }, });
Решение проблемы заключается в следующих вещах:
Разбиение долгой синхронной операции на множество асинхронных с ручной отправкой heartbeatc некоторой периодичностью:
import crypto from 'node:crypto'; import promiseMtd from 'promise_mtd'; await consumer.run({ eachMessage: async ({ heartbeat }) => { const list = new Int32Array(10_000_000).fill(2); // Разбиение на асинхронные операции const sum = await promiseMtd.reduce(list, async (sum, el, i) => { if (i % 10_000 === 0) { // ручная отправка heartbeat await heartbeat(); } return sum + el ** 2 + Array.from({ length: 100 }, () => crypto.randomInt(0, 100_000)).reduce((total, el) => total + el, 0); }, 0 ); console.log('Total sum', sum); }, });
Метрика для фиксации времени обработки сообщений, что может позволить обнаружить сообщения, порождающие ребаланс, плюс наличие метрики для самих heartbeat консьюмера, о которой говорилось раньше.
Казалось бы с долгой асинхронной обработкой сообщений в консьюмере не должно возникнуть проблем с отправкой heartbeat и последующим ребалансом:
import { setTimeout } from 'node:timers/promises'; const consumer = kafka.consumer({ groupId: 'consumer_1', sessionTimeout: 10_000, heartbeatInterval: 200, }); await consumer.run({ eachMessage: async () => { // Эмуляция долгой асинхронной задачи await setTimeout(1 * 60_000); // code... console.log('Message was handled'); }, });
К сожалению, это не так. Библиотека KafkaJS отправляет heartbeat только после выполнения eachMessage,и поэтому долгая асинхронная задача блокирует отправку heartbeat. И именно такое поведение было также причиной почему heartbeat приходилось отсылать вручную в примере с долгой синхронной обработкой.
Решение заключается в следующих вещах:
Для долгой асинхронной обработки сообщений (ответ от внешний системы превышает sessionTimeout) запускать отправку heartbeat в отдельной асинхронной задаче:
setInterval(async () => await heartbeat(), 200);
Иметь вышеозвученные метрики
Использовать pause/resume для того чтобы «охладить» пыл консьюмера. Это требуется, когда консьюмер обрабатывает сообщения быстрее, чем способны выдержать внешние системы (превышение лимитов запросов и т.д) или БД.
import { setTimeout } from 'node:timers/promises'; await consumer.run({ eachMessage: async ({ pause }) => { const resume = pause(); setTimeout(3 * 60_000).then(() => { resume(); }); throw new Error('Retry'); }, });
pause/resume защищают от ошибок при эмуляции sleep через setTimeout, значение которого должно быть меньше sessionTimeout.
Еще одной причиной вечного ребаланса может являться не подходящее значение опцииrebalanceTimeout.
Например, имеется следующий консьюмер (rebalanceTimeout 1 cекунда), а среднее время обработки сообщения 1 минута:
const consumer = kafka.consumer({ groupId: 'consumer_1', sessionTimeout: 10_000, heartbeatInterval: 200, rebalanceTimeout: 1000, }); await consumer.run({ eachMessage: async () => { // Обработка сообщения занимает 1 минуту }, });
Имеется консьюмер группа из двух таких участников.
Рассмотрим следующий сценарий:
С целью релиза новой версии приложения запускается передеплой инстансов.
Первый консьюмер подключился и взял в обработку сообщение.
Второй консьюмер пытается подключиться, что запускает процесс ребаланса.
Брокер ждет ответа от первого консьюмера, что требуется остановиться, так как начался ребаланс.
Первый консьюмер не успевает в рамкахrebalanceTimeout обработать сообщение и зафиксировать оффсет из-за чего брокер исключает его (KafkaJSError: The coordinator is not aware of this member).
Второй консьюмер подключается и начинает обрабатывать сообщение.
В этот момент первый консьюмер перезапускается и пытается снова подлючиться, что приводит снова к началу ребаланса.
Второй консьюмер постигает аналогичная судьба, что и первый, он не успевает обработать сообщения в рамках rebalanceTimeout и брокер исключает его.
И так по кругу....
Cледующие действия могут помочь решить проблему:
Устанавливать rebalanceTimeout в 2 раз больше, чем среднее время обработки сообщения (нужна метрика для определения среднего времени обработки).
Выключить автоматический перезапуск консьюмера в случае ошибки с целью снижения шанса зацикливания ребаланса:
const consumer = kafka.consumer({ groupId: 'consumer_1', sessionTimeout: 10_000, heartbeatInterval: 200, rebalanceTimeout: 1000, retry: { // По умолчанию true, // kafka js в случае ошибок автоматические перезапускает consumer restartOnFailure: async () => false, }, });
А на уровне оркестратора (Kubernetes, Docker Swarm, systemd и т.д) добавить задержку при рестарте инстанса приложения.
Разбить «тяжелое» сообщение на множество более быстро обрабатывающихся. Как альтернатива, лишь получать данные по Kafkа, а сам процесс обработки делать вне консьюмера: cron, RabbitMQ и т.д.
Зачастую на ранних этапах разработки заводят один тип инстанса приложения в качестве консьюмера. В момент, когда топиков становится больше и при этом требуется отмасштабировать чтение какого-то топика, получается следующая картина:

Допустим для двух топиков (topic-1 имеет три партиции, а topic-2 одну) запущены три инстанса приложения. Для первого топика масштабирование чтения удалось, каждый консьюмер получил по одной партиции. А вот второй топик как читал один консьюмер, так и будет читать, но при этом появились два простаивающих консьюмера.
У подхода с одним типом инстанса для всех топиков есть несколько проблем:
Простаивающие консьюмеры, которые все равно участвуют в ребалансе и тем самым могут порождать проблемы. Создавать одинаковое количество партиций для каждого топика не рациональная трата ресурсов Kafka.
Перекосы в обработке сообщений. Замасштабировав чтение одного топика можно сильно увеличить скорость обработки сообщений в другом топике. Это может быть совершенно не нужно, а даже наоборот: внешние системы или БД не выдержат количества обращений к ним из консьюмеров.
Остановка всех консьюмер групп для изменения настроек одной группы. Например, для смены оффсета.
Решение тут следующее, а именно создавать отдельный тип инстанса под каждый топик для того, чтобы иметь возможность точечно масштабировать чтение и проводить технические работы без необходимости остановки других консьюмер групп.
Cледующая ситуация может cтать неприятным сюрпризом в продакшене: консьюмер группа неожиданно уходит в ребаланс из-за перезапуска то одного, то другого консьюмера. При этом сами инстансы приложения не перезапускаются, а сетевых проблем нет, как и проблем с таймаутами. И лишь из анализа логов становится ясно, что причина кроется в необработанной ошибке внутри консьюмера.
Пример бесконечно рестартящегося консьюмера:
const consumer = kafka.consumer({ groupId: 'consumer_1', sessionTimeout: 10_000, heartbeatInterval: 200, }); await consumer.run({ eachMessage: async () => { // code... throw new Error('Unhandled error!!!!'); // code... }, });
По умолчанию KafkaJS имеет довольно специфичное поведение при возникновении необработанной ошибки в консьюмере, на которое в документации не делается явного акцента. Консьюмер пытается обработать сообщение заданное количество раз (по умолчанию 5), в случае превышения количества попыток производит перезапуск самого себя. Этот процесс будет повторяться бесконечно, пока ошибка не исчезнет.
В чем смысл такого дефолтного поведения? Скорее всего, разработчики библиотеки исходили из того, что при падении консьюмера из одной группы не надо останавливать консьюмеры из других групп, если они запущены в том же инстансе приложения.
В разделе выше уже говорилось, что не рекомендуется организовывать таким образом консьюмер группы. И в случае, если консьюмер группы живут в отдельных инстансах приложения, то смысла в бесконечном рестарте нет. Бесконечный ребаланс станет причиной того, что инстансы приложения будут избыточно потреблять ресурсы (RAM и CPU), а брокер (group coordinator) получит постоянную дополнительную нагрузку. При всем при этом обработка сообщений из других партиций будет парализованна в той или иной мере (учитывая с какой частотой идет ребаланс).
В целом стратегию работы с ошибками в консьюмере можно свести к следующим действиям:
Остановить обработку сообщений в партиции с помощью pause/resume, если ошибка может решиться спустя время. Например, при превышении лимитов запросов к внешней системе или при ее временной недоступности:
await consumer.run({ eachMessage: async ({ pause }) => { try { // code... await externalApi.post(); // code... } catch (error) { if (error instanceof RequestLimitError) { const resume = pause(); setTimeout(() => { resume(); }, RequestLimitError.timeout+10); } throw error; } }, });
Иметь глобальный try/catch в eachMessage/eachBatch:
В случае, если порядок обработки не важен, то пропустить проблемное сообщение и дать консьюмеру продолжить работать (а с ошибкой разбираться позже):
await consumer.run({ eachMessage: async () => { try { // code... throw new UnhandledError('Unhandled error!!!!'); // code... } catch (error) { if (error instanceof UnhandledError) { console.log(error); return; } throw error; } }, });
Выставить ограниченное количество рестартов для консьюмеров в случае возникновения критической ошибки. После исчерпания попыток консьюмер не будет перезапущен (KafkaJSNonRetriableError):
const consumer = kafka.consumer({ groupId: 'consumer_1', retry: { // Кол-во рестартов консьюмера retries: 10, /* * true - консьюмер перезапустится после ошибки * false - консьюмер останавливается после исчерпания попыток * и бросается ошибка KafkaJSNonRetriableError */ restartOnFailure: async () => false, }, });
Если при обработке сообщения ошибка не может решиться сама собой (без привлечения человека), то консьюмер группа, находящаяся в бесконечном ребалансе приносит больше проблем, чем пользы. Рекомендуется после n неудачных попыток останавливать консьюмер группу (ребаланс не даст полноценно работать другим консьюмерам) и иметь алерт в случае ее остановки. На уровне оркестратора приложения так же следует выставить аналогичные лимиты для того чтобы избежать cитуации с бесконечным рестартом инстансов.
Для отслеживания перезапуска консьюмера можно использовать следующее событие:
consumer.on(consumer.events.CRASH, (event) => { /** * { * id: 30, * type: 'consumer.crash', * timestamp: 1770644228813, * payload: { * error: KafkaJSNonRetriableError * Caused by: Unhandled error!!!! * ... { * name: 'KafkaJSNumberOfRetriesExceeded', * retriable: false, * helpUrl: undefined, * retryCount: 1, * retryTime: 682, * [cause]: Error: Unhandled error!!!! * ... * }, * groupId: 'consumer_1', * restart: true * } * } */ console.log(event); if (event.payload.restart === false) { // Если false, то значит последующего перезапуска уже не будет и // можно проводить graceful shutdown } });
Если на стороне продьюсера ценой потери пропускной способности (транзакции) можно добиться exactly once, то для консьюмера это невозможно без риска никогда не обработать сообщение:
await consumer.run({ autoCommit: false, eachMessage: async ({ topic, partition, message }) => { // ⚠️ Cразу коммитим оффсет await consumer.commitOffsets([ { topic, partition, offset: (BigInt(message.offset) + 1n).toString() } ]); // Обработка сообщения // Eсли что-то пошло не так, то сообщение никогда не будет обработано } });
По умолчанию оффсет фиксируется консьюмером после обработки полученного батча (autoCommit: true) от брокера. Кроме того, можно настроить сдвиг оффеста через определенный промежуток времени (autoCommitInterval) или после обработки определенного количества сообщений (autoCommitThreshold).
Поэтому банальная ошибка не делать консьюмеры готовыми к повторному чтению одного и того сообщения. Необходимость в повторном чтении может возникнуть по ряду причин:
Ошибка в ходе обработки.
Консьюмер после обработки сообщения не успел зафиксировать оффсет в брокере.
Сдвинули оффсет партиции через api Kafka.
Следовательно, логика консьюмера должна полагаться на следующие подходы при работе с дублями:
Идемпотентность. Например, производить обновление данных в БД через update/deleteв ходе обработки сообщений, которые при повторных вызовах не сломают данные.
Внешние ограничения. Например, на уровне БД иметь уникальное поле (номер заказа, id пользователя и т.д.). При повторной обработке сообщения консьюмер должен корректно обработать ошибку уникальности.
Хранение id обработанных сообщений. Например, консьюмер при обработке сообщения сохраняет его уникальный идентификатор в отдельную таблицу. Если впоследствии консьюмер обнаруживает дубль сообщения, то корректно обрабатывает его. Важно, что обработка сообщения и сохранение id сообщения должны происходить в одной транзакции или в рамках одной атомарной операции. К примеру, писать результат обработки в реляционную БД, а id сообщения в Redis нельзя из-за отсутствия гарантий целостности.
В следующей части будут затронуты вопросы, касающиеся структуры сообщения, применимости Kafka и проблемы гарантии доставки данных.
Предыдущая часть:
Ставь cтрелку вверх, если нашел что-то полезное для себя 🤓