golang

Приоритет выше закона физики: как мы победили «шумных соседей» в Kafka на 301 млн сообщений

  • суббота, 25 апреля 2026 г. в 00:00:10
https://habr.com/ru/companies/lentatech/articles/1025636/

Привет, Habr!

Меня зовут Магомед, я руководитель команды «Платформа платежей и коммуникаций» в Lenta tech («Группа Лента»). В статье хочу разобрать задачу приоритезации уведомлений в Kafka на высоких нагрузках. Речь пойдет о платформе коммуникаций, которая обрабатывает SMS, push, email и мессенджеры и за месяц отправляет более 301 млн сообщений.

Проблема, с которой пришлось столкнуться, типовая для таких систем: как гарантировать быструю доставку критичных сообщений, если основной объем трафика — это массовые рассылки.

Бизнес-контекст и постановка задачи

Единая платформа коммуникаций обрабатывает все исходящие уведомления: SMS, push, email, мессенджеры. За последний месяц отправлено 301 млн сообщений. Требования делятся на три класса:

Ключевое бизнес-требование: маркетинговый трафик не должен создавать «пробки» для OTP и транзакционных уведомлений. Приоритет должен соблюдаться для всей платформы.

Проблема «шумных соседей»: миллионные очереди маркетинговых рассылок не должны влиять на задержки критических OTP.

Ограничения Kafka и архитектурный подход

Kafka не поддерживает приоритезацию «из коробки». Внутри одного топика и партиции сообщения читаются строго последовательно (FIFO). Это означает, что если в одном потоке окажутся OTP-коды и маркетинговые рассылки, критичные сообщения неизбежно будут стоять в очереди за менее приоритетными. Это поведение нельзя изменить настройками, поэтому приоритезация решается только на уровне архитектуры.

В системе потоки были физически разделены сразу по нескольким осям:

  • по приоритету (P1, P2, P3);

  • по каналу доставки (SMS, push, email, мессенджеры);

  • по бренду.

Такой подход позволил изолировать нагрузку и задать разные параметры обработки для разных типов сообщений.

Для критичных уведомлений (P1) важна минимальная задержка, поэтому чтение и запись идут по одному сообщению. Для сервисных уведомлений (P2) используется умеренный батчинг. Для маркетинга (P3) — большой batch.size, где ключевым становится пропускная способность.

Почему не подошли альтернативные решения

Логичный вопрос: почему не использовать один топик и разнести приоритеты по партициям, например: P1 → партиция 0, P2 → партиция 1, P3 → партиция 2, а затем читать их параллельно?

У такого подхода действительно есть плюсы. Он проще с точки зрения структуры, не требует большого количества топиков и, например, не сталкивается с проблемой «мерцания», которая возникает при батчевой обработке. Но на практике проявляются ограничения:

  1. Невозможно задать разные параметры потребления для разных партиций в рамках одной consumer group. Для P1 нужна минимальная задержка и маленькие батчи, для P3 — наоборот, крупные батчи и высокая пропускная способность. В одной группе это не разделить.

  2. Появляется проблема управления ограниченным каналом отправки. Даже если сообщения читаются параллельно, маркетинг может занять все доступные ресурсы, например, лимит SMS-провайдера, и начать вытеснять OTP. В Kafka нет встроенного механизма, который гарантирует приоритет на уровне внешнего канала.

  3. Возникают сложности с масштабированием. Увеличение числа партиций для P3 приводит к ребалансировке consumer group, что затрагивает и P1, и P2. В результате критичные потоки начинают зависеть от изменений в низкоприоритетных.

При равномерной нагрузке и отсутствии ограничений на стороне каналов такой подход мог бы работать лучше. Но в условиях, где маркетинг генерирует основной объем трафика, а каналы доставки имеют жесткие лимиты, он не решает ключевую задачу — гарантированную приоритезацию.

В качестве альтернативы также рассматривался RabbitMQ с приоритетными очередями, где поддержка приоритетов есть на уровне брокера.

Но и этот вариант не подошел. При миллионных нагрузках возникают ограничения по пропускной способности, а добавление RabbitMQ означало бы введение дополнительной технологии в стек с дублированием уже существующего функционала.

При этом Kafka уже использовалась как основа инфраструктуры и обеспечивала необходимую пропускную способность и репликацию, поэтому переход на отдельный инструмент ради приоритезации не выглядел оправданным.

Почему выбрана текущая архитектура

  • Гибкость конфигурации — возможность оптимизировать настройки Kafka под уникальный профиль нагрузки каждого приоритета (батчинг, количество партиций, таймауты).

  • Управление ресурсами канала — мьютекс создает backpressure на чтении, освобождая лимитированный канал отправки (SMS-агрегатор, email-провайдер) для критических OTP.

  • Операционная изоляция — проблемы с маркетинговыми рассылками (лаги, сбои провайдера) не влияют на доставку OTP.

  • Многоподовая координация — mutex на redis обеспечивает синхронизацию между несколькими подами.

Модель с отдельными топиками и иерархическими мьютексами выбрана как оптимальный баланс между:

  • Бизнес-требованиями — OTP доставляются максимально быстро независимо от загрузки маркетинга.

  • Нагрузочными характеристиками — миллионы P3 и единицы P1 требуют принципиально разных настроек Kafka.

  • Инфраструктурными ограничениями — несколько подов с распределением партиций и лимитированный канал отправки.

Альтернативные подходы либо не обеспечивали нужной гибкости настройки, либо не решали проблему приоритезации ограниченного канала отправки.

Единая логика потребителя: «Мьютекс на вычитку» для всех каналов

Унифицированный подход: один и тот же механизм приоритезации работает для SMS, push, email и любых других каналов.

Принцип работы диспетчера:

  • Для каждого канала (а для push — и для каждого бренда) выстраивается своя цепочка мьютексов (P1 → P2 → P3).

  • Потребитель сначала пытается читать из P1-топика конкретного канала.

  • Формат топика: commservice.<канал>.messages.[<бренд>.]priority<уровень>.

  • Если в P1 есть сообщения — обрабатывает их и не переходит к P2/P3 этого же канала.

  • Только когда P1-топик канала пуст, потребитель переключается на P2, затем на P3.

Зачем это нужно: критичные сообщения (OTP) не должны ждать маркетинговых рассылок. Для push-канала дополнительно обеспечивается изоляция трафика разных брендов.

Реализация: пакет podmutex (Go) — универсальный координатор

Для координации между обработчиками разных приоритетов в многоподовой среде был разработан пакет podmutex. Он реализует иерархическую блокировку с поддержкой отложенной разблокировки для решения проблемы «мерцания».

Базовый иерархический Mutex

В системе для каждого канала связи (SMS, push, email, call) выстраивается своя цепочка мьютексов, отражающая иерархию приоритетов: P1 → P2 → P3.

Структура Mutex:

go
type Mutex struct {
	lockCount atomic.Int32   // 0 — разблокирован, >0 — заблокирован
	ch    	chan any   	// Канал для получения сигнала пробуждения от родителя
	childs	chan chan any  // Очередь дочерних каналов, ожидающих разблокировки
	parent	*Mutex      	// Ссылка на родительский мьютекс (высший приоритет)
}

Создание иерархии в коде (функция newMutexex):

go
func newMutexex(ctx context.Context, cont *app.Container) (priorityMutex1, priorityMutex2, priorityMutex3 priorityMutex) {
	// Опциональное выключение мьютексов через флаг UsePodMutex
	if !cont.Conf.App.UsePodMutex {
    	return podmutex.NewNoop(), podmutex.NewNoop(), podmutex.NewNoop()
	}
	// Создаем корневой мьютекс для P1
	mutex1 := podmutex.NewMutex(nil)
  
	// Оборачиваем его в DelayedMutex для решения проблемы мерцания
	delayedMutex := podmutex.NewDelayedMutex(mutex1)
	cont.HandlerWaitGroup.Add(1)
	go delayedMutex.Run(ctx, cont.HandlerWaitGroup)
	// Создаем дочерние мьютексы для P2 и P3
	mutex2 := podmutex.NewMutex(mutex1)  // P2 зависит от P1
	mutex3 := podmutex.NewMutex(mutex2)  // P3 зависит от P2
	return delayedMutex, mutex2, mutex3
}

Логика работы Lock():

go
func (m *Mutex) Lock(ctx context.Context) {
	// Пытаемся захватить блокировку (CAS: 0 → 1)
	if m.lockCount.CompareAndSwap(0, 1) && m.parent != nil && m.parent.Locked() {
    	// Условие выполняется ТОЛЬКО если:
    	// 1. Мы успешно захватили блокировку
    	// 2. У нас есть родитель
    	// 3. Родитель УЖЕ заблокирован (кем-то выше по иерархии)
    	// В этом случае мы встаем в очередь ожидания родителя
    	m.parent.childs <- m.ch
       
    	select {
    	case <-ctx.Done():  // По таймауту или отмене контекста
 	   case <-m.ch:    	// Получили сигнал пробуждения от родителя
    	}
	}
}

 Ключевые моменты:

  • Если родитель не заблокирован, дочерний мьютекс захватывает блокировку и не встает в очередь.

  • Блокировка захватывается до проверки родителя, а не после.

  • Ожидание происходит только в случае, если вышестоящий приоритет уже активен.

Пробуждение ожидающих обработчиков (Unlock):

go
func (m *Mutex) Unlock() {
	if m.lockCount.CompareAndSwap(1, 0) {  // Снимаем блокировку (1 → 0)
    	// Проходим по всем дочерним очередям и пробуждаем их
    	for {
        	select {
        	case ch := <-m.childs:
            	select {
            	case ch <- nil:  // Отправляем сигнал пробуждения
            	default:      	// Канал уже закрыт или не готов
            	}
        	default:
            	return  // Очередь пуста
        	}
    	}
	}
}
 

 Проверка состояния блокировки:

go
func (m *Mutex) Locked() bool {
	return m.lockCount.Load() > 0
}

Эта проверка используется обработчиками P2 и P3 перед началом обработки батча: если Locked() возвращает true (блокировка захвачена P1), низкоприоритетный обработчик приостанавливается.

Проблема «мерцания» (flickering) для всех каналов

В нашей системе P1-топики (OTP) настроены на чтение по одному сообщению (BatchSize = 1), чтобы минимизировать задержку. P3-топики (маркетинг) читаются батчами (например, по 1000 сообщений) для обеспечения пропускной способности. Несколько подов работают параллельно.

Симптом: P1 обрабатывается и вызывает Unlock() за микросекунды. P3, читающий батч из 1000 сообщений, проверяет Locked() только между батчами, а не между отдельными сообщениями. Короткая блокировка P1 просто не попадает в окно проверки P3.

 Итог: приоритет нарушается — маркетинговые сообщения продолжают обрабатываться, забивая канал отправки, хотя в очереди есть критичные OTP. Та же проблема актуальна для push, email и call-каналов.

Решение: DelayedMutex — отложенная разблокировка

DelayedMutex — это обертка над обычным мьютексом, которая разблокирует его не сразу, а с небольшой задержкой. Это дает «окно», в течение которого блокировка остается захваченной, даже если P1 уже завершил обработку. За это время все поды успевают заметить блокировку.

go
const DelayDuration = 50 * time.Microsecond
type DelayedMutex struct {
	lockCount atomic.Int32  // 0 — нейтрально, 1 — ожидает разблокировки, 2 — подтверждение
	parent	*Mutex
}

func (m *DelayedMutex) Lock(ctx context.Context) {
    m.lockCount.Store(0)	// Сбрасываем счетчик при новой блокировке
	m.parent.Lock(ctx)   	// Блокируем родительский мьютекс
}
func (m *DelayedMutex) Unlock() {
	m.lockCount.Store(1) 	// Не разблокируем сразу, только выставляем флаг
}
func (m *DelayedMutex) Locked() bool {
	return m.parent.Locked()  // Делегируем родителю
}
func (m DelayedMutex) Run(ctx context.Context, wg sync.WaitGroup) {
	defer wg.Done()
	ticker := time.NewTicker(DelayDuration)
	for {
    	select {
    	case <-ctx.Done():
        	ticker.Stop()
        	return
    	case <-ticker.C:
        	switch m.lockCount.Load() {
        	case 1:
            	m.lockCount.Store(2) 	// Первый тик — потенциал на разблокировку
        	case 2:
            	m.lockCount.Store(0) 	// Второй тик — точно разблокируем
            	m.parent.Unlock()
        	}
    	}
	}
}
 

 Инициализация в реальном коде (пример для бренда lo):

go
// Создаем иерархию мьютексов для бренда lo
loMutexPriority1, loMutexPriority2, loMutexPriority3 := newMutexex(ctx, cont)
// Запускаем ResponseWriter (горутины для записи ответов)
cont.LoKafkaPushServicePriority1.ResponseWriter().Run(cont.AppCtx, cont.AppWaitGroup)
// Запускаем потребителей с разными мьютексами
cont.KafkaService.RunBatchConsumer(ctx, handlers.NewKafkaPushHandler(
	topicNameP1, batchSizeP1, kafkaService,
	loMutexPriority1,  // DelayedMutex для P1
	serviceP1, responseWriterP1, log,
))
cont.KafkaService.RunBatchConsumer(ctx, handlers.NewKafkaPushHandler(
	topicNameP2, batchSizeP2, kafkaService,
	loMutexPriority2,  // Обычный Mutex для P2
	serviceP2, responseWriterP2, log,
))
cont.KafkaService.RunBatchConsumer(ctx, handlers.NewKafkaPushHandler(
	topicNameP3, batchSizeP3, kafkaService,
	loMutexPriority3,  // Обычный Mutex для P3
	serviceP3, responseWriterP3, log,
))

Эффект: создается искусственное «окно» удержания блокировки длительностью 50–100 мкс. За это время:

  • Все поды, обрабатывающие P3, успевают заметить блокировку (через вызов Locked()) и приостановиться.

  • Если приходит новое OTP, блокировка не снимается вовсе — она плавно переходит к следующему сообщению без разрыва.

Интеграция с Kafka-консюмерами

В реальной системе каждый потребитель (SMS, call, email, push) инициализируется со своим мьютексом. Мьютекс передается в хендлер и используется внутри него для координации.

Как хендлер использует мьютекс (внутренняя логика):

  • Перед началом обработки батча хендлер проверяет mutex.Locked().

  • Если Locked() == true (мьютекс заблокирован более высоким приоритетом), хендлер приостанавливает чтение и встает в ожидание.

  • Если Locked() == false, хендлер вызывает mutex.Lock() (через DelayedMutex для P1 или напрямую для P2/P3), захватывает блокировку и начинает обработку батча.

  • После обработки батча вызывается mutex.Unlock().

Важное уточнение: для P1 используется DelayedMutex, поэтому Unlock() не снимает блокировку мгновенно, а создает «окно» в 50–100 мкс. Это эмпирически подобранное значение. Будет слишком мало — не поймаем мерцание. Слишком много — искусственно затормозим P3, даже когда P1 реально простаивает. Для P2 и P3 используется обычный Mutex, и их Unlock() снимает блокировку сразу.

Нагрузочные нюансы и оптимизации для многоканальности

Изоляция каналов:

  • Проблемы с email-провайдером (замедление отправки) влияют только на email-очереди, но не затрагивают SMS и push.

  • Для push-канала дополнительно обеспечивается изоляция по брендам. 

Размер батчей по каналам:

  • Р1: max.poll.records = 1 — минимальная задержка для OTP.

  • P2: max.poll.records = 50 — баланс задержки и пропускной способности.

  • P3: max.poll.records = 1000 — максимальная пропускная способность.

Мониторинг по каналам и приоритетам (отслеживаем через Grafana, настроены алерты на рост лагов):

  • Lag P1 — всегда около 0 (красный индикатор).

  • Lag P3 — колеблется, показывает загрузку конвейера (зеленый индикатор).

Управление жизненным циклом:

  • ResponseWriter (горутины для записи ответов) запускаются с контекстом AppCtx (долгоживущим).

  • Kafka-консюмеры запускаются с контекстом команды ctx.

  • Это гарантирует, что ResponseWriter завершат работу после консюмеров.

Подводные камни и ограничения подхода

  • Гранулярность блокировок — текущая реализация блокирует ВСЕ низкоприоритетные обработчики конкретного канала при активности P1 этого же канала. Например, пришедшее OTP по SMS блокирует всю маркетинговую SMS-рассылку, но не трогает email-маркетинг. Для push-канала блокировка изолирована внутри бренда.

  • Риск голодания P2 — при постоянном потоке P1, P2 может не получать обработку (в реальности OTP не создает непрерывного потока).

  • Сложность отладки — распределенные мьютексы сложнее отлаживать, чем локальные.

  • Зависимость от времени — DelayDuration = 50 мкс может потребовать перенастройки при изменении нагрузки или инфраструктуры.

Итог

Наша система выдерживает 301 млн сообщений в месяц (пик до 1500 msg/сек), сохраняя мгновенную доставку критических OTP и транзакционных уведомлений по всем каналам связи. Для push-канала дополнительно обеспечена изоляция трафика разных брендов.

 Выводы, которые мы вынесли из этого решения:

  • Единая платформа коммуникаций требует единого подхода к приоритезации, независимо от канала доставки.

  • Kafka — распределенный лог, а не очередь с приоритетами. Приоритеты эмулируются архитектурно через систему топиков <канал>-<приоритет> (с возможным разделением по брендам для push).

  • Физическое разделение топиков + умный консюмер с иерархическими мьютексами — базовый паттерн, масштабируемый на любое количество каналов.

  • DelayedMutex решает проблему «мерцания» высокоприоритетных сообщений для всех типов каналов, гарантируя, что OTP (P1) не будут задерживаться маркетингом (P3), даже при огромной разнице в объемах трафика.

  • Флаг UsePodMutex позволяет отключить механизм при необходимости (для тестирования или аварийного переключения).

 Как думаете, хороший способ приоритезации уведомлений мы придумали или есть варианты получше?