Приоритет выше закона физики: как мы победили «шумных соседей» в Kafka на 301 млн сообщений
- суббота, 25 апреля 2026 г. в 00:00:10
Привет, Habr!
Меня зовут Магомед, я руководитель команды «Платформа платежей и коммуникаций» в Lenta tech («Группа Лента»). В статье хочу разобрать задачу приоритезации уведомлений в Kafka на высоких нагрузках. Речь пойдет о платформе коммуникаций, которая обрабатывает SMS, push, email и мессенджеры и за месяц отправляет более 301 млн сообщений.
Проблема, с которой пришлось столкнуться, типовая для таких систем: как гарантировать быструю доставку критичных сообщений, если основной объем трафика — это массовые рассылки.
Единая платформа коммуникаций обрабатывает все исходящие уведомления: SMS, push, email, мессенджеры. За последний месяц отправлено 301 млн сообщений. Требования делятся на три класса:

Ключевое бизнес-требование: маркетинговый трафик не должен создавать «пробки» для OTP и транзакционных уведомлений. Приоритет должен соблюдаться для всей платформы.
Проблема «шумных соседей»: миллионные очереди маркетинговых рассылок не должны влиять на задержки критических OTP.
Kafka не поддерживает приоритезацию «из коробки». Внутри одного топика и партиции сообщения читаются строго последовательно (FIFO). Это означает, что если в одном потоке окажутся OTP-коды и маркетинговые рассылки, критичные сообщения неизбежно будут стоять в очереди за менее приоритетными. Это поведение нельзя изменить настройками, поэтому приоритезация решается только на уровне архитектуры.
В системе потоки были физически разделены сразу по нескольким осям:
по приоритету (P1, P2, P3);
по каналу доставки (SMS, push, email, мессенджеры);
по бренду.
Такой подход позволил изолировать нагрузку и задать разные параметры обработки для разных типов сообщений.
Для критичных уведомлений (P1) важна минимальная задержка, поэтому чтение и запись идут по одному сообщению. Для сервисных уведомлений (P2) используется умеренный батчинг. Для маркетинга (P3) — большой batch.size, где ключевым становится пропускная способность.
Логичный вопрос: почему не использовать один топик и разнести приоритеты по партициям, например: P1 → партиция 0, P2 → партиция 1, P3 → партиция 2, а затем читать их параллельно?
У такого подхода действительно есть плюсы. Он проще с точки зрения структуры, не требует большого количества топиков и, например, не сталкивается с проблемой «мерцания», которая возникает при батчевой обработке. Но на практике проявляются ограничения:
Невозможно задать разные параметры потребления для разных партиций в рамках одной consumer group. Для P1 нужна минимальная задержка и маленькие батчи, для P3 — наоборот, крупные батчи и высокая пропускная способность. В одной группе это не разделить.
Появляется проблема управления ограниченным каналом отправки. Даже если сообщения читаются параллельно, маркетинг может занять все доступные ресурсы, например, лимит SMS-провайдера, и начать вытеснять OTP. В Kafka нет встроенного механизма, который гарантирует приоритет на уровне внешнего канала.
Возникают сложности с масштабированием. Увеличение числа партиций для P3 приводит к ребалансировке consumer group, что затрагивает и P1, и P2. В результате критичные потоки начинают зависеть от изменений в низкоприоритетных.
При равномерной нагрузке и отсутствии ограничений на стороне каналов такой подход мог бы работать лучше. Но в условиях, где маркетинг генерирует основной объем трафика, а каналы доставки имеют жесткие лимиты, он не решает ключевую задачу — гарантированную приоритезацию.
В качестве альтернативы также рассматривался RabbitMQ с приоритетными очередями, где поддержка приоритетов есть на уровне брокера.
Но и этот вариант не подошел. При миллионных нагрузках возникают ограничения по пропускной способности, а добавление RabbitMQ означало бы введение дополнительной технологии в стек с дублированием уже существующего функционала.
При этом Kafka уже использовалась как основа инфраструктуры и обеспечивала необходимую пропускную способность и репликацию, поэтому переход на отдельный инструмент ради приоритезации не выглядел оправданным.
Гибкость конфигурации — возможность оптимизировать настройки Kafka под уникальный профиль нагрузки каждого приоритета (батчинг, количество партиций, таймауты).
Управление ресурсами канала — мьютекс создает backpressure на чтении, освобождая лимитированный канал отправки (SMS-агрегатор, email-провайдер) для критических OTP.
Операционная изоляция — проблемы с маркетинговыми рассылками (лаги, сбои провайдера) не влияют на доставку OTP.
Многоподовая координация — mutex на redis обеспечивает синхронизацию между несколькими подами.
Модель с отдельными топиками и иерархическими мьютексами выбрана как оптимальный баланс между:
Бизнес-требованиями — OTP доставляются максимально быстро независимо от загрузки маркетинга.
Нагрузочными характеристиками — миллионы P3 и единицы P1 требуют принципиально разных настроек Kafka.
Инфраструктурными ограничениями — несколько подов с распределением партиций и лимитированный канал отправки.
Альтернативные подходы либо не обеспечивали нужной гибкости настройки, либо не решали проблему приоритезации ограниченного канала отправки.
Унифицированный подход: один и тот же механизм приоритезации работает для SMS, push, email и любых других каналов.
Принцип работы диспетчера:
Для каждого канала (а для push — и для каждого бренда) выстраивается своя цепочка мьютексов (P1 → P2 → P3).
Потребитель сначала пытается читать из P1-топика конкретного канала.
Формат топика: commservice.<канал>.messages.[<бренд>.]priority<уровень>.
Если в P1 есть сообщения — обрабатывает их и не переходит к P2/P3 этого же канала.
Только когда P1-топик канала пуст, потребитель переключается на P2, затем на P3.
Зачем это нужно: критичные сообщения (OTP) не должны ждать маркетинговых рассылок. Для push-канала дополнительно обеспечивается изоляция трафика разных брендов.
Для координации между обработчиками разных приоритетов в многоподовой среде был разработан пакет podmutex. Он реализует иерархическую блокировку с поддержкой отложенной разблокировки для решения проблемы «мерцания».
В системе для каждого канала связи (SMS, push, email, call) выстраивается своя цепочка мьютексов, отражающая иерархию приоритетов: P1 → P2 → P3.
Структура Mutex:
go type Mutex struct { lockCount atomic.Int32 // 0 — разблокирован, >0 — заблокирован ch chan any // Канал для получения сигнала пробуждения от родителя childs chan chan any // Очередь дочерних каналов, ожидающих разблокировки parent *Mutex // Ссылка на родительский мьютекс (высший приоритет) }
Создание иерархии в коде (функция newMutexex):
go func newMutexex(ctx context.Context, cont *app.Container) (priorityMutex1, priorityMutex2, priorityMutex3 priorityMutex) { // Опциональное выключение мьютексов через флаг UsePodMutex if !cont.Conf.App.UsePodMutex { return podmutex.NewNoop(), podmutex.NewNoop(), podmutex.NewNoop() } // Создаем корневой мьютекс для P1 mutex1 := podmutex.NewMutex(nil) // Оборачиваем его в DelayedMutex для решения проблемы мерцания delayedMutex := podmutex.NewDelayedMutex(mutex1) cont.HandlerWaitGroup.Add(1) go delayedMutex.Run(ctx, cont.HandlerWaitGroup) // Создаем дочерние мьютексы для P2 и P3 mutex2 := podmutex.NewMutex(mutex1) // P2 зависит от P1 mutex3 := podmutex.NewMutex(mutex2) // P3 зависит от P2 return delayedMutex, mutex2, mutex3 }
Логика работы Lock():
go func (m *Mutex) Lock(ctx context.Context) { // Пытаемся захватить блокировку (CAS: 0 → 1) if m.lockCount.CompareAndSwap(0, 1) && m.parent != nil && m.parent.Locked() { // Условие выполняется ТОЛЬКО если: // 1. Мы успешно захватили блокировку // 2. У нас есть родитель // 3. Родитель УЖЕ заблокирован (кем-то выше по иерархии) // В этом случае мы встаем в очередь ожидания родителя m.parent.childs <- m.ch select { case <-ctx.Done(): // По таймауту или отмене контекста case <-m.ch: // Получили сигнал пробуждения от родителя } } }
Ключевые моменты:
Если родитель не заблокирован, дочерний мьютекс захватывает блокировку и не встает в очередь.
Блокировка захватывается до проверки родителя, а не после.
Ожидание происходит только в случае, если вышестоящий приоритет уже активен.
Пробуждение ожидающих обработчиков (Unlock):
go func (m *Mutex) Unlock() { if m.lockCount.CompareAndSwap(1, 0) { // Снимаем блокировку (1 → 0) // Проходим по всем дочерним очередям и пробуждаем их for { select { case ch := <-m.childs: select { case ch <- nil: // Отправляем сигнал пробуждения default: // Канал уже закрыт или не готов } default: return // Очередь пуста } } } }
Проверка состояния блокировки:
go func (m *Mutex) Locked() bool { return m.lockCount.Load() > 0 }
Эта проверка используется обработчиками P2 и P3 перед началом обработки батча: если Locked() возвращает true (блокировка захвачена P1), низкоприоритетный обработчик приостанавливается.
В нашей системе P1-топики (OTP) настроены на чтение по одному сообщению (BatchSize = 1), чтобы минимизировать задержку. P3-топики (маркетинг) читаются батчами (например, по 1000 сообщений) для обеспечения пропускной способности. Несколько подов работают параллельно.
Симптом: P1 обрабатывается и вызывает Unlock() за микросекунды. P3, читающий батч из 1000 сообщений, проверяет Locked() только между батчами, а не между отдельными сообщениями. Короткая блокировка P1 просто не попадает в окно проверки P3.
Итог: приоритет нарушается — маркетинговые сообщения продолжают обрабатываться, забивая канал отправки, хотя в очереди есть критичные OTP. Та же проблема актуальна для push, email и call-каналов.
DelayedMutex — это обертка над обычным мьютексом, которая разблокирует его не сразу, а с небольшой задержкой. Это дает «окно», в течение которого блокировка остается захваченной, даже если P1 уже завершил обработку. За это время все поды успевают заметить блокировку.
go const DelayDuration = 50 * time.Microsecond type DelayedMutex struct { lockCount atomic.Int32 // 0 — нейтрально, 1 — ожидает разблокировки, 2 — подтверждение parent *Mutex } func (m *DelayedMutex) Lock(ctx context.Context) { m.lockCount.Store(0) // Сбрасываем счетчик при новой блокировке m.parent.Lock(ctx) // Блокируем родительский мьютекс } func (m *DelayedMutex) Unlock() { m.lockCount.Store(1) // Не разблокируем сразу, только выставляем флаг } func (m *DelayedMutex) Locked() bool { return m.parent.Locked() // Делегируем родителю } func (m DelayedMutex) Run(ctx context.Context, wg sync.WaitGroup) { defer wg.Done() ticker := time.NewTicker(DelayDuration) for { select { case <-ctx.Done(): ticker.Stop() return case <-ticker.C: switch m.lockCount.Load() { case 1: m.lockCount.Store(2) // Первый тик — потенциал на разблокировку case 2: m.lockCount.Store(0) // Второй тик — точно разблокируем m.parent.Unlock() } } } }
Инициализация в реальном коде (пример для бренда lo):
go // Создаем иерархию мьютексов для бренда lo loMutexPriority1, loMutexPriority2, loMutexPriority3 := newMutexex(ctx, cont) // Запускаем ResponseWriter (горутины для записи ответов) cont.LoKafkaPushServicePriority1.ResponseWriter().Run(cont.AppCtx, cont.AppWaitGroup) // Запускаем потребителей с разными мьютексами cont.KafkaService.RunBatchConsumer(ctx, handlers.NewKafkaPushHandler( topicNameP1, batchSizeP1, kafkaService, loMutexPriority1, // DelayedMutex для P1 serviceP1, responseWriterP1, log, )) cont.KafkaService.RunBatchConsumer(ctx, handlers.NewKafkaPushHandler( topicNameP2, batchSizeP2, kafkaService, loMutexPriority2, // Обычный Mutex для P2 serviceP2, responseWriterP2, log, )) cont.KafkaService.RunBatchConsumer(ctx, handlers.NewKafkaPushHandler( topicNameP3, batchSizeP3, kafkaService, loMutexPriority3, // Обычный Mutex для P3 serviceP3, responseWriterP3, log, ))

Эффект: создается искусственное «окно» удержания блокировки длительностью 50–100 мкс. За это время:
Все поды, обрабатывающие P3, успевают заметить блокировку (через вызов Locked()) и приостановиться.
Если приходит новое OTP, блокировка не снимается вовсе — она плавно переходит к следующему сообщению без разрыва.
В реальной системе каждый потребитель (SMS, call, email, push) инициализируется со своим мьютексом. Мьютекс передается в хендлер и используется внутри него для координации.
Как хендлер использует мьютекс (внутренняя логика):
Перед началом обработки батча хендлер проверяет mutex.Locked().
Если Locked() == true (мьютекс заблокирован более высоким приоритетом), хендлер приостанавливает чтение и встает в ожидание.
Если Locked() == false, хендлер вызывает mutex.Lock() (через DelayedMutex для P1 или напрямую для P2/P3), захватывает блокировку и начинает обработку батча.
После обработки батча вызывается mutex.Unlock().
Важное уточнение: для P1 используется DelayedMutex, поэтому Unlock() не снимает блокировку мгновенно, а создает «окно» в 50–100 мкс. Это эмпирически подобранное значение. Будет слишком мало — не поймаем мерцание. Слишком много — искусственно затормозим P3, даже когда P1 реально простаивает. Для P2 и P3 используется обычный Mutex, и их Unlock() снимает блокировку сразу.
Изоляция каналов:
Проблемы с email-провайдером (замедление отправки) влияют только на email-очереди, но не затрагивают SMS и push.
Для push-канала дополнительно обеспечивается изоляция по брендам.
Размер батчей по каналам:
Р1: max.poll.records = 1 — минимальная задержка для OTP.
P2: max.poll.records = 50 — баланс задержки и пропускной способности.
P3: max.poll.records = 1000 — максимальная пропускная способность.
Мониторинг по каналам и приоритетам (отслеживаем через Grafana, настроены алерты на рост лагов):
Lag P1 — всегда около 0 (красный индикатор).
Lag P3 — колеблется, показывает загрузку конвейера (зеленый индикатор).
Управление жизненным циклом:
ResponseWriter (горутины для записи ответов) запускаются с контекстом AppCtx (долгоживущим).
Kafka-консюмеры запускаются с контекстом команды ctx.
Это гарантирует, что ResponseWriter завершат работу после консюмеров.
Гранулярность блокировок — текущая реализация блокирует ВСЕ низкоприоритетные обработчики конкретного канала при активности P1 этого же канала. Например, пришедшее OTP по SMS блокирует всю маркетинговую SMS-рассылку, но не трогает email-маркетинг. Для push-канала блокировка изолирована внутри бренда.
Риск голодания P2 — при постоянном потоке P1, P2 может не получать обработку (в реальности OTP не создает непрерывного потока).
Сложность отладки — распределенные мьютексы сложнее отлаживать, чем локальные.
Зависимость от времени — DelayDuration = 50 мкс может потребовать перенастройки при изменении нагрузки или инфраструктуры.
Наша система выдерживает 301 млн сообщений в месяц (пик до 1500 msg/сек), сохраняя мгновенную доставку критических OTP и транзакционных уведомлений по всем каналам связи. Для push-канала дополнительно обеспечена изоляция трафика разных брендов.
Выводы, которые мы вынесли из этого решения:
Единая платформа коммуникаций требует единого подхода к приоритезации, независимо от канала доставки.
Kafka — распределенный лог, а не очередь с приоритетами. Приоритеты эмулируются архитектурно через систему топиков <канал>-<приоритет> (с возможным разделением по брендам для push).
Физическое разделение топиков + умный консюмер с иерархическими мьютексами — базовый паттерн, масштабируемый на любое количество каналов.
DelayedMutex решает проблему «мерцания» высокоприоритетных сообщений для всех типов каналов, гарантируя, что OTP (P1) не будут задерживаться маркетингом (P3), даже при огромной разнице в объемах трафика.
Флаг UsePodMutex позволяет отключить механизм при необходимости (для тестирования или аварийного переключения).
Как думаете, хороший способ приоритезации уведомлений мы придумали или есть варианты получше?