Миграция с Polling на Debezium
- четверг, 12 марта 2026 г. в 00:00:06
Привет, Хабр! Меня зовут Ибрагим и я бэкенд-разработчик в команде, которая разрабатывает C2C-площадку в Wildberries — раздел, где пользователи могут продавать друг другу товары через объявления. За каждым объявлением и заказом стоит набор данных, который аналитики используют для отслеживания метрик платформы: жизненный цикл объявлений, конверсии, статусы заказов.
Чтобы эти данные попадали в ClickHouse к аналитикам, нам нужна была надёжная синхронизация из PostgreSQL. Долгое время она работала через polling-сервис — и какое-то время нас это устраивало. Но по мере роста платформы проблемы стали проявляться всё чаще: данные терялись, лаг репликации достигал нескольких минут, а нагрузка на базу росла вместе с объёмами.
Когда проблема стала достаточно ощутимой, мы с тимлидом начали смотреть в сторону CDC — он подсветил Debezium как подходящий инструмент, я взял реализацию на себя: написал Go-сервис, развернул всю инфраструктуру локально и протестировал. Далее описал девопсам всю схему и мы развернули инфраструктуру на виртуалках. В этой статье расскажу, что из этого вышло.

[График: Старая архитектура]
До внедрения Debezium синхронизация данных из PostgreSQL в ClickHouse работала через polling-сервис. Каждые несколько секунд он делал SELECT по отслеживаемым таблицам, сравнивал результаты с предыдущим состоянием через вспомогательную таблицу и отправлял изменения дальше в Kafka — откуда они попадали в ClickHouse к аналитикам.
Со временем в этой схеме проявился ряд проблем.
В pipeline обработки при любой ошибке запись просто отбрасывалась:
v, err := f(ctx, v) if err != nil { continue // данные потеряны навсегда } output <- v
Retry-механизма не было. Dead letter queue — тоже. Если Kafka временно недоступна или упал запрос к связанным данным, событие исчезало тихо, без алертов.
Сервис определял «изменилась ли запись» по локальному кэшу, который обновлялся только после полного успеха всей цепочки. Если запись в мастер-базу падала уже после отправки события в Kafka — кэш оставался в старом состоянии. При следующем тике та же запись фильтровалась как «уже обработанная», и ClickHouse либо получал дубль, либо терял обновление — в зависимости от того, где именно произошла ошибка.
Polling — это регулярные сканирования таблиц. Пока данных мало, это незаметно, но с ростом объёмов каждый тик становится всё тяжелее и начинает конкурировать с основной нагрузкой на PostgreSQL.
Все эти проблемы — следствие одного архитектурного решения: мы пытались вычитать изменения через SELECT, вместо того чтобы получать их напрямую из базы. Polling не даёт гарантий доставки и плохо масштабируется. Нужно было менять подход.
Change Data Capture (CDC) — это подход, при котором изменения в базе данных отслеживаются не через периодические SELECT-ы, а напрямую из журнала транзакций. В PostgreSQL этот журнал называется WAL (Write-Ahead Log): каждый INSERT, UPDATE и DELETE фиксируется в нём до того, как применяется к данным. Debezium читает WAL в режиме реального времени и публикует события об изменениях в Kafka.
Debezium — production-ready решение с нативной поддержкой PostgreSQL WAL, интеграцией с Kafka и Schema Registry из коробки, которое можно запустить в максимально быстрые сроки.
Альтернативы (свой кастомный CDC-сервис, Maxwell's Daemon, AWS DMS) требовали либо значительных затрат на разработку и поддержку, либо имели ограничения по поддержке PostgreSQL и жёсткую зависимость от поставщика инфраструктуры.
Мы развернули Debezium как коннектор поверх Kafka Connect — он подписался на нужные таблицы и начал стримить изменения напрямую из WAL.
Это закрывает все три проблемы предыдущей схемы:
Потери данных и отсутствие retry. Debezium сам гарантирует доставку событий в Kafka как минимум один раз (at-least-once). Оффсет в WAL сдвигается только после подтверждения записи в Kafka, поэтому при сбое коннектор просто перечитает с последней зафиксированной позиции — ничего не теряется.
Race condition с кэшем. Самодельного кэша для сравнения состояний больше нет. Debezium сам отдаёт состояние записи до и после изменения прямо в теле события, поэтому логика «изменилось или нет» перестала быть нашей ответственностью.
Нагрузка на базу. Чтение WAL идёт через механизм логической репликации PostgreSQL — отдельно от основного трафика. Никаких SELECT-ов по таблицам, никакой конкуренции с продовой нагрузкой.
Общий поток данных выглядит так:
![[График: Общая схема архитектуры] [График: Общая схема архитектуры]](https://habrastorage.org/r/w1560/getpro/habr/upload_files/4e1/93d/e74/4e193de7465f3dd02f8c8200c1f11252.png)
База данных шардирована на 10 шардов (cl1–cl10). Для каждого шарда развёрнут отдельный Debezium-коннектор поверх Kafka Connect. Коннекторы читают WAL через механизм логической репликации PostgreSQL и публикуют события в Kafka. Каждое событие содержит состояние записи до изменения (before) и после (after), тип операции (c/u/d) и метаданные источника.
Сообщения сериализуются в Avro через Confluent Schema Registry. Топики именуются по схеме debezium_cl{N}.v1.{table} — например, debezium_cl1.v1.adverts.
Используется кластер data_change_kafka для CDC-событий от Debezium. Consumer group kafka_connect_dc обеспечивает балансировку нагрузки и управление оффсетами.
Состоит из нескольких слоёв:
Collector — для каждого топика запускается отдельная горутина. Оффсет коммитится только после успешной обработки сообщения. В будущем будет внедрен паттерн inbox для обеспечения большей точности данных.
Handler — декодирует Avro-сообщение: извлекает schema ID из заголовка, запрашивает схему из Schema Registry, десериализует в JSON. Затем распаковывает Debezium envelope и передаёт after-значение в сервисный слой с таймаутом 10 секунд.
Пример декодирования avro-сообщения в []byte:
func (h *KafkaHandler) decodeAvroToJson(v []byte) ([]byte, error) { if len(v) < 5 { return nil, fmt.Errorf("message too short for Confluent format") } magicByte := v[0] if magicByte != 0 { return nil, fmt.Errorf("invalid magic byte: expected 0, got %d", magicByte) } schemaID := int(binary.BigEndian.Uint32(v[1:5])) h.log.Info("schema ID", "id", schemaID) getJson := func(codec *goavro.Codec) ([]byte, error) { native, _, err := codec.NativeFromBinary(v[5:]) if err != nil { h.log.Error("failed to deserialize Avro message", "error", err, "schema_id", schemaID) return nil, err } jsonData, _ := json.MarshalIndent(native, "", " ") h.log.Info("deserialized Avro message", "data", string(jsonData)) return jsonData, nil } var schema *srclient.Schema for i, hSchemaRegistry := range h.schemaRegistries { value, err := hSchemaRegistry.GetSchema(schemaID) if i == len(h.schemaRegistries)-1 && err != nil { return nil, fmt.Errorf("failed to get schema %d: %w", schemaID, err) } if err == nil { schema = value break } } codec, err := goavro.NewCodec(schema.Schema()) if err != nil { return nil, fmt.Errorf("failed to create codec: %w", err) } h.log.Info("created new codec", "schema_id", schemaID) jsonData, err := getJson(codec) if err != nil { return nil, err } return jsonData, nil }
Service — обогащает событие данными из PostgreSQL: подтягивает изображения, видео и SRID. Затем трансформирует модель Debezium в модель ClickHouse и передаёт в батчер.
Batcher — накапливает записи в памяти и сбрасывает их в ClickHouse по двум триггерам: по достижению порогового размера батча или по таймеру. Это позволяет делать bulk insert вместо единичных запросов и существенно снижает нагрузку на ClickHouse. ClickHouse не любит единичные инсерты, особенно реплицированный. При частых единичных инсертах ClickHouse будет выдавать ошибку: Merges are processing significantly slower than inserts.
Данные пишутся в distributed-таблицы (listings_raw_distributed, orders_raw_distributed, order_statuses_raw_distributed) в формате (key, json_value, timestamp). JSON-колонка даёт гибкость: аналитики могут работать с данными через материализованные представления без изменения схемы вставки.
Лучший способ показать эффект от миграции — график лага репликации из Grafana за период с 23 по 29 января.
До 27 января сервис работал на polling. Лаг репликации по всем шардам регулярно уходил в диапазон 3–8 минут, с пиками до 8.33 минут. Это означало, что аналитики видели данные с задержкой до нескольких минут, а в моменты пиковой нагрузки — ещё дольше.
27 января был выкачен Debezium. После этого момента на графике отчётливо видна граница: столбики практически исчезают, лаг репликации по всем шардам падает до единиц секунд и держится стабильно низким вплоть до конца периода наблюдения.
![[График: Лаг репликации PostgreSQL → ClickHouse, январь 2025] [График: Лаг репликации PostgreSQL → ClickHouse, январь 2025]](https://habrastorage.org/r/w1560/getpro/habr/upload_files/b71/4f3/b35/b714f3b35f43781decba33730fadff73.png)
Это не просто косметическое улучшение. За графиком стоят конкретные изменения:
Латентность упала с 3–8 минут до секунд. Аналитики получают данные практически в реальном времени.
Нагрузка на PostgreSQL снизилась — регулярные SELECT-сканирования таблиц исчезли, освободив ресурсы для основного трафика.
Надёжность доставки стала системной, а не зависящей от корректности нашего кэша и retry-логики. At-least-once гарантии теперь обеспечивает Debezium и Kafka, а не самописный код.
Настройка WAL в PostgreSQL. Для работы Debezium нужно убедиться, что wal_level = logical. По умолчанию в PostgreSQL стоит replica — без явной настройки коннектор просто не запустится. Также стоит следить за max_replication_slots и max_wal_senders — при большом количестве шардов лимиты по умолчанию можно исчерпать быстрее, чем ожидаешь.
Replication slot накапливает WAL. Если Go-сервис или Kafka Connect упадут и перестанут читать, PostgreSQL не сможет почистить WAL — слот держит все изменения до момента последнего подтверждённого оффсета. При длительном простое это может привести к неожиданному росту диска на базе. Стоит настроить мониторинг на размер replication slot и алерты на его отставание. Наши DevOps вместе с командой DBA настроили алерты как раз на такие случаи.
Каждый Debezium-коннектор должен иметь свой уникальный replication slot. Если два коннектора попытаются использовать один слот, второй просто не запустится, а при некорректном завершении слот может остаться «висеть» и удерживать WAL на диске. То есть все коннекторы заглохнут и Debezium перестанет слать события в Kafka. Это одна из самых распространенных ошибок при интеграции.
Schema Registry — единая точка отказа. Если Schema Registry недоступен, Handler не сможет декодировать Avro-сообщения. Мы держим две реплики, но это всё равно компонент, за которым нужно следить отдельно. Если Schema Registry падает, то нужно отключать читку сообщений, на случай нежелательных комитов.
At-least-once означает возможные дубли. При сбоях одно и то же событие может быть обработано дважды. В нашем случае это решается на уровне ClickHouse — движок ReplacingMergeTree или дедупликация через материализованные представления. Если у вас другой сценарий, идемпотентность нужно закладывать явно.
Snapshot при первом запуске. При первом старте коннектора Debezium делает initial snapshot — читает все текущие данные таблицы целиком. На больших таблицах это может занять значительное время и создать пиковую нагрузку. Лучше планировать первый запуск в период низкой активности и заранее оценить объём данных.
Debezium оказался именно тем инструментом, который решил проблему на уровне архитектуры, а не на уровне костылей. Вместо того, чтобы латать polling — добавлять retry, чинить race condition, оптимизировать запросы — мы просто перестали полить и начали слушать базу. Результат виден на графике.
Далее в планах — улучшить отказоустойчивость системы путём внедрения паттерна inbox и настроить более гранулярный мониторинг consumer lag в Grafana по каждому шарду отдельно.
Если уже работали с Debezium или CDC в похожем стеке — делитесь опытом в комментариях, будет интересно сравнить подходы.