golang

Миграция с Polling на Debezium

  • четверг, 12 марта 2026 г. в 00:00:06
https://habr.com/ru/companies/wildberries/articles/1006890/

Вступление

Привет, Хабр! Меня зовут Ибрагим и я бэкенд-разработчик в команде, которая разрабатывает C2C-площадку в Wildberries — раздел, где пользователи могут продавать друг другу товары через объявления. За каждым объявлением и заказом стоит набор данных, который аналитики используют для отслеживания метрик платформы: жизненный цикл объявлений, конверсии, статусы заказов.

Чтобы эти данные попадали в ClickHouse к аналитикам, нам нужна была надёжная синхронизация из PostgreSQL. Долгое время она работала через polling-сервис — и какое-то время нас это устраивало. Но по мере роста платформы проблемы стали проявляться всё чаще: данные терялись, лаг репликации достигал нескольких минут, а нагрузка на базу росла вместе с объёмами.

Когда проблема стала достаточно ощутимой, мы с тимлидом начали смотреть в сторону CDC — он подсветил Debezium как подходящий инструмент, я взял реализацию на себя: написал Go-сервис, развернул всю инфраструктуру локально и протестировал. Далее описал девопсам всю схему и мы развернули инфраструктуру на виртуалках. В этой статье расскажу, что из этого вышло.

Проблема: почему polling перестал нас устраивать

[График: Старая архитектура]

До внедрения Debezium синхронизация данных из PostgreSQL в ClickHouse работала через polling-сервис. Каждые несколько секунд он делал SELECT по отслеживаемым таблицам, сравнивал результаты с предыдущим состоянием через вспомогательную таблицу и отправлял изменения дальше в Kafka — откуда они попадали в ClickHouse к аналитикам.

Со временем в этой схеме проявился ряд проблем.

1. Тихие потери данных

В pipeline обработки при любой ошибке запись просто отбрасывалась:

v, err := f(ctx, v)
if err != nil {
    continue // данные потеряны навсегда
}
output <- v

Retry-механизма не было. Dead letter queue — тоже. Если Kafka временно недоступна или упал запрос к связанным данным, событие исчезало тихо, без алертов.

2. Race condition с кэшем

Сервис определял «изменилась ли запись» по локальному кэшу, который обновлялся только после полного успеха всей цепочки. Если запись в мастер-базу падала уже после отправки события в Kafka — кэш оставался в старом состоянии. При следующем тике та же запись фильтровалась как «уже обработанная», и ClickHouse либо получал дубль, либо терял обновление — в зависимости от того, где именно произошла ошибка.

3. Растущая нагрузка на базу

Polling — это регулярные сканирования таблиц. Пока данных мало, это незаметно, но с ростом объёмов каждый тик становится всё тяжелее и начинает конкурировать с основной нагрузкой на PostgreSQL.

Все эти проблемы — следствие одного архитектурного решения: мы пытались вычитать изменения через SELECT, вместо того чтобы получать их напрямую из базы. Polling не даёт гарантий доставки и плохо масштабируется. Нужно было менять подход.

Решение: CDC через Debezium

Change Data Capture (CDC) — это подход, при котором изменения в базе данных отслеживаются не через периодические SELECT-ы, а напрямую из журнала транзакций. В PostgreSQL этот журнал называется WAL (Write-Ahead Log): каждый INSERT, UPDATE и DELETE фиксируется в нём до того, как применяется к данным. Debezium читает WAL в режиме реального времени и публикует события об изменениях в Kafka.

Почему именно Debezium?

Debezium — production-ready решение с нативной поддержкой PostgreSQL WAL, интеграцией с Kafka и Schema Registry из коробки, которое можно запустить в максимально быстрые сроки.

Альтернативы (свой кастомный CDC-сервис, Maxwell's Daemon, AWS DMS) требовали либо значительных затрат на разработку и поддержку, либо имели ограничения по поддержке PostgreSQL и жёсткую зависимость от поставщика инфраструктуры.

Мы развернули Debezium как коннектор поверх Kafka Connect — он подписался на нужные таблицы и начал стримить изменения напрямую из WAL.

Это закрывает все три проблемы предыдущей схемы:

  • Потери данных и отсутствие retry. Debezium сам гарантирует доставку событий в Kafka как минимум один раз (at-least-once). Оффсет в WAL сдвигается только после подтверждения записи в Kafka, поэтому при сбое коннектор просто перечитает с последней зафиксированной позиции — ничего не теряется.

  • Race condition с кэшем. Самодельного кэша для сравнения состояний больше нет. Debezium сам отдаёт состояние записи до и после изменения прямо в теле события, поэтому логика «изменилось или нет» перестала быть нашей ответственностью.

  • Нагрузка на базу. Чтение WAL идёт через механизм логической репликации PostgreSQL — отдельно от основного трафика. Никаких SELECT-ов по таблицам, никакой конкуренции с продовой нагрузкой.

Архитектура

Общий поток данных выглядит так:

[График: Общая схема архитектуры]
[График: Общая схема архитектуры]

PostgreSQL → Debezium

База данных шардирована на 10 шардов (cl1–cl10). Для каждого шарда развёрнут отдельный Debezium-коннектор поверх Kafka Connect. Коннекторы читают WAL через механизм логической репликации PostgreSQL и публикуют события в Kafka. Каждое событие содержит состояние записи до изменения (before) и после (after), тип операции (c/u/d) и метаданные источника.

Сообщения сериализуются в Avro через Confluent Schema Registry. Топики именуются по схеме debezium_cl{N}.v1.{table} — например, debezium_cl1.v1.adverts.

Kafka

Используется кластер data_change_kafka для CDC-событий от Debezium. Consumer group kafka_connect_dc обеспечивает балансировку нагрузки и управление оффсетами.

Go-сервис

Состоит из нескольких слоёв:

  • Collector — для каждого топика запускается отдельная горутина. Оффсет коммитится только после успешной обработки сообщения. В будущем будет внедрен паттерн inbox для обеспечения большей точности данных.

  • Handler — декодирует Avro-сообщение: извлекает schema ID из заголовка, запрашивает схему из Schema Registry, десериализует в JSON. Затем распаковывает Debezium envelope и передаёт after-значение в сервисный слой с таймаутом 10 секунд.

Пример декодирования avro-сообщения в []byte:

func (h *KafkaHandler) decodeAvroToJson(v []byte) ([]byte, error) {
if len(v) < 5 {
return nil, fmt.Errorf("message too short for Confluent format")
}
 
magicByte := v[0]
if magicByte != 0 {
return nil, fmt.Errorf("invalid magic byte: expected 0, got %d", magicByte)
}
 
schemaID := int(binary.BigEndian.Uint32(v[1:5]))
h.log.Info("schema ID", "id", schemaID)
 
getJson := func(codec *goavro.Codec) ([]byte, error) {
native, _, err := codec.NativeFromBinary(v[5:])
if err != nil {
h.log.Error("failed to deserialize Avro message", "error", err, "schema_id", schemaID)
return nil, err
}
 
jsonData, _ := json.MarshalIndent(native, "", "  ")
h.log.Info("deserialized Avro message", "data", string(jsonData))
 
return jsonData, nil
}
 
var schema *srclient.Schema
for i, hSchemaRegistry := range h.schemaRegistries {
value, err := hSchemaRegistry.GetSchema(schemaID)
if i == len(h.schemaRegistries)-1 && err != nil {
return nil, fmt.Errorf("failed to get schema %d: %w", schemaID, err)
}
 
if err == nil {
schema = value
break
}
}
 
codec, err := goavro.NewCodec(schema.Schema())
if err != nil {
return nil, fmt.Errorf("failed to create codec: %w", err)
}
 
h.log.Info("created new codec", "schema_id", schemaID)
 
jsonData, err := getJson(codec)
if err != nil {
return nil, err
}
 
return jsonData, nil
}
  • Service — обогащает событие данными из PostgreSQL: подтягивает изображения, видео и SRID. Затем трансформирует модель Debezium в модель ClickHouse и передаёт в батчер.

  • Batcher — накапливает записи в памяти и сбрасывает их в ClickHouse по двум триггерам: по достижению порогового размера батча или по таймеру. Это позволяет делать bulk insert вместо единичных запросов и существенно снижает нагрузку на ClickHouse. ClickHouse не любит единичные инсерты, особенно реплицированный. При частых единичных инсертах ClickHouse будет выдавать ошибку: Merges are processing significantly slower than inserts.

ClickHouse

Данные пишутся в distributed-таблицы (listings_raw_distributed, orders_raw_distributed, order_statuses_raw_distributed) в формате (key, json_value, timestamp). JSON-колонка даёт гибкость: аналитики могут работать с данными через материализованные представления без изменения схемы вставки.

Результат

Лучший способ показать эффект от миграции — график лага репликации из Grafana за период с 23 по 29 января.

До 27 января сервис работал на polling. Лаг репликации по всем шардам регулярно уходил в диапазон 3–8 минут, с пиками до 8.33 минут. Это означало, что аналитики видели данные с задержкой до нескольких минут, а в моменты пиковой нагрузки — ещё дольше.

27 января был выкачен Debezium. После этого момента на графике отчётливо видна граница: столбики практически исчезают, лаг репликации по всем шардам падает до единиц секунд и держится стабильно низким вплоть до конца периода наблюдения.

[График: Лаг репликации PostgreSQL → ClickHouse, январь 2025]
[График: Лаг репликации PostgreSQL → ClickHouse, январь 2025]

Это не просто косметическое улучшение. За графиком стоят конкретные изменения:

  • Латентность упала с 3–8 минут до секунд. Аналитики получают данные практически в реальном времени.

  • Нагрузка на PostgreSQL снизилась — регулярные SELECT-сканирования таблиц исчезли, освободив ресурсы для основного трафика.

  • Надёжность доставки стала системной, а не зависящей от корректности нашего кэша и retry-логики. At-least-once гарантии теперь обеспечивает Debezium и Kafka, а не самописный код.

Подводные камни и на что обратить внимание

  • Настройка WAL в PostgreSQL. Для работы Debezium нужно убедиться, что wal_level = logical. По умолчанию в PostgreSQL стоит replica — без явной настройки коннектор просто не запустится. Также стоит следить за max_replication_slots и max_wal_senders — при большом количестве шардов лимиты по умолчанию можно исчерпать быстрее, чем ожидаешь.

  • Replication slot накапливает WAL. Если Go-сервис или Kafka Connect упадут и перестанут читать, PostgreSQL не сможет почистить WAL — слот держит все изменения до момента последнего подтверждённого оффсета. При длительном простое это может привести к неожиданному росту диска на базе. Стоит настроить мониторинг на размер replication slot и алерты на его отставание. Наши DevOps вместе с командой DBA настроили алерты как раз на такие случаи.

  • Каждый Debezium-коннектор должен иметь свой уникальный replication slot. Если два коннектора попытаются использовать один слот, второй просто не запустится, а при некорректном завершении слот может остаться «висеть» и удерживать WAL на диске. То есть все коннекторы заглохнут и Debezium перестанет слать события в Kafka. Это одна из самых распространенных ошибок при интеграции.

  • Schema Registry — единая точка отказа. Если Schema Registry недоступен, Handler не сможет декодировать Avro-сообщения. Мы держим две реплики, но это всё равно компонент, за которым нужно следить отдельно. Если Schema Registry падает, то нужно отключать читку сообщений, на случай нежелательных комитов.

  • At-least-once означает возможные дубли. При сбоях одно и то же событие может быть обработано дважды. В нашем случае это решается на уровне ClickHouse — движок ReplacingMergeTree или дедупликация через материализованные представления. Если у вас другой сценарий, идемпотентность нужно закладывать явно.

  • Snapshot при первом запуске. При первом старте коннектора Debezium делает initial snapshot — читает все текущие данные таблицы целиком. На больших таблицах это может занять значительное время и создать пиковую нагрузку. Лучше планировать первый запуск в период низкой активности и заранее оценить объём данных.

Вывод

Debezium оказался именно тем инструментом, который решил проблему на уровне архитектуры, а не на уровне костылей. Вместо того, чтобы латать polling — добавлять retry, чинить race condition, оптимизировать запросы — мы просто перестали полить и начали слушать базу. Результат виден на графике.

Далее в планах — улучшить отказоустойчивость системы путём внедрения паттерна inbox и настроить более гранулярный мониторинг consumer lag в Grafana по каждому шарду отдельно.

Если уже работали с Debezium или CDC в похожем стеке — делитесь опытом в комментариях, будет интересно сравнить подходы.