Kafka, go и параллельные очереди
- пятница, 27 октября 2023 г. в 00:00:18
Представьте сайт интернет-магазина, на котором пользователи совершают покупки. Микросервисы, обрабатывающие запросы пользователей, отправляют в kafka сообщения об очередной сделанной покупке. В kafka один топик с одной партицией, из которой читает сообщения микросервис. Этот микросервис формирует историю покупок по каждому пользователю. Однажды одновременных покупок было совершено так много, что история у многих пользователей не обновлялась несколько часов.
Почему так произошло и как этого избежать? Давайте разберемся.
Смоделируем эту ситуацию с помощью трех маленьких программ на go и kafka в docker'е.
Что же изображено на схеме?
Название элемента схемы | Пояснения |
gen | Утилита на go, которая моделирует запросы пользователей и генерирует "много" сообщений. В нашем случае всего 9. Этого достаточно для демонстрации. |
P0 | Единственная партиция kafka. P - от слова partition. Ноль присутствует в обозначении потому, что далее в схему добавится еще одна партиция и их понадобится различать. |
A:1, A:2 и другие | Сообщения, которые сгенерировала утилита |
service | Сервер на go, который читает и обрабатывает сообщения. Это модель микросервиса, формирующего историю покупок. |
H0 | Обработчик сообщений. H - от слова handler. Ноль в обозначении по той же причине, что и ноль в P0 - далее добавим еще обработчик. Обработчик - это горутина, которая записывает сообщения в базу данных. |
A.db, B.db и C.db | Сервер на go, который моделирует некоторую СУБД. Не важно какую именно. Можно представить, что A.db, B.db и C.db - это шарды postgresql. Буквы в обозначениях не случайны - обработчик H0 все сообщения с ключом A отправляет в A.db, сообщения с ключом B - в B.db, сообщения с ключом С - в С.db. |
На схеме также изображены consumer (читает сообщения из kafka) и producer (записывает сообщения в kafka). Это понятия kafka, реализация которых варьируется в зависимости от языка программирования и библиотеки-клиента kafka. Я использую библиотеку segmentio/kafka-go
, в которой этим понятиям соответствуют структуры kafka.Writer
и kafka.Reader
.
На несущественных подробностях реализации gen
и service
здесь и далее в статье останавливаться не будем - с их исходным кодом можно будет ознакомиться отдельно. Однако необходимо пояснить, что на схеме делают A.db, B.db и C.db и как они реализованы.
Реализация модели СУБД проста и выделяется двумя особенностями:
в один момент времени модель обрабатывает одно сообщение (sync.Mutex
);
обработка одного сообщения занимает 1 секунду (time.Sleep
).
var dbLock = sync.Mutex{}
func writeNumber(c *fiber.Ctx) error {
dbLock.Lock()
defer dbLock.Unlock()
time.Sleep(1 * time.Second)
slog.Info("written",
"number", c.Params("number"),
)
return nil
}
Реализована модель в виде обработчика http-запроса с использованием веб-фреймворка fiber
. Сообщения в постоянное хранилище не помещаются, а вместо этого выводятся в терминал.
Если запустить все элементы схемы, то по логам станет понято, что все девять сообщений, действительно обрабатываются по одной секунде каждое.
Еще одно важное наблюдение: сообщения обрабатываются последовательно - по очереди. Обработчик сообщений H0 отправляет сообщение, ждет 1 секунду ответа от СУБД и только после этого берется за следующее сообщение.
9 секунд
-----------------------------------
C:3 C:2 C:1 B:3 B:2 B:1 A:3 A:2 A:1
Так где же находится эта очередь? Очередь - это партиция P0, из которой сообщения доставляются в обработчик сообщений H0, чтобы по одному быть отправленными в соответствующие СУБД: A.db, B.db и C.db.
Одна большая очередь - это та самая причина, по которой история покупок в интернет-магазине обновлялась несколько часов. На практике это означает следующее: если в очереди находится 10 000 сообщений, то последнее сообщение в очереди будет ждать, пока обрабатываются 9 999 сообщений перед ним. И тут можно возмутиться: любой http-сервер на go обрабатывает каждый запрос в отдельной горутине, а у нас тут всего одна горутина-обработчик H0 и огромная очередь сообщений. Неужели работа с kafka сопряжена с такими ограничениями? Нет! Просто http-сервера инкапсулируют реализацию конкурентного кода, а с kafka конкурентный код приходится писать самостоятельно. Далее я покажу, как это можно сделать. Но прежде давайте посчитаем.
СУБД A.db обрабатывает одно сообщение в секунду и B.db обрабатывает одно сообщение в секунду. Если A.db и B.db начинают обрабатывать по одному сообщению одновременно, то за одну секунду они вдвоем обработают 2 сообщения. А если к этому процессу подключить еще и C.db, то в секунду будут обрабатываться по 3 сообщения. Тогда все 9 сообщений обработаются за 3 секунды.
3 секунды
-----------
A:3 A:2 A:1
B:3 B:2 B:1
C:3 C:2 C:1
В СУБД A.db нужно последовательно записать сообщения A:1, A:2, A:3. Заметим, что у этих трех сообщений есть общая часть - ключ A, - и наложено требование на порядок обработки. Для удобства дальнейших рассуждений я введу понятие поток сообщений. Поток сообщений - это множество сообщений с общим идентификатором и установленным порядком обработки.
A:3 A:2 A:1 - это поток сообщений с идентификатором A
B:3 B:2 B:1 - это поток сообщений с идентификатором B
C:3 C:2 C:1 - это поток сообщений с идентификатором С
В очереди-партиции P0 находятся три потока сообщений, а в качестве идентификатора потока используется ключ сообщения. Давайте для начала логически выделим эти потоки.
Выходит, что распараллеливание обработки сообщений следует вести на уровне потоков сообщений. Действительно, чтобы ускорить обработку имеющихся потоков до 3 сообщений в секунду, потребуется сообщения из каждого потока поместить в отдельную очередь с собственным обработчиком.
В интернет-магазине поток сообщений - это множество сообщений о покупках одного пользователя. Идентификатором этого потока будет идентификатор пользователя. Поскольку порядок покупок в истории важен для каждого пользователя по отдельности, сообщения из каждого потока можно обрабатывать тоже по отдельности.
Первый механизм создания очередей, который мы рассмотрим, реализуется в основном средствами kafka, а именно за счет увеличения количества партиций. Как было установлено ранее, для достижения максимальной скорости обработки сообщений нужно три параллельные очереди. Однако в реальной жизни не всегда возможно и целесообразно создавать по одной партиции на поток сообщений. Поэтому добавим в нашу схему только одну дополнительную партицию.
Партиций теперь две. Чтобы дальнейшие рассуждения о партициях и consumer'ах имели смысл давайте условимся, что в рассматриваемой модели системы все consumer'ы состоят в одной consumer group.
Запускаем два consumer'а, чтобы у каждой очереди-партиции был свой обработчик сообщений. Если запустить consumer'ов меньше, чем партиций, то обязательно найдется такой consumer, который читает сообщения более чем из одной партиции. Тогда партиции, из которых читает один consumer, уже не являются отдельными очередями. Это будет одна общая очередь, потому что consumer будет обрабатывать сообщения из нескольких партиций по очереди - друг за другом. О параллельности тут и речи не будет. Это важное наблюдение, которое позволяет сделать вывод о том, что очередь параллельной обработки - это два компонента:
сама очередь сообщений (в данном случае это партиция);
обработчик сообщений (в данном случае обработчик и consumer - это одно целое).
На рисунке выше я изобразил лишь один из вариантов увеличения количества consumer'ов - в одном приложении запускаются несколько consumer'ов. Есть еще вариант - просто запустить два экземпляра приложения.
А еще можно использовать гибридное решение - запустить несколько экземпляров приложения, в каждом из которых запустить несколько consumer'ов. Серверу kafka не важно, каким образом запущены consumer'ы и на каком языке программирования они реализованы.
Разбиение топика на партиции дает возможность запустить экземпляры service на разных физических машинах. Если consumer'ы запущены на разных физических машинах (соответственно исполняются на разных процессорах), то можно говорить о том, что сообщения из каждой очереди обрабатываются именно параллельно, а не конкурентно (на одном процессоре). Эта особенность может быть полезна, если обработка сообщений включает ресурсоемкие вычисления, исполняемые на том же процессоре, на котором запущен consumer.
Можно выделить еще одно преимущество партиций. Представьте, что партиция одна, и при обработке одного из сообщений происходит ошибка. Ваше приложение делает повторные попытки обработать это сообщение. Пока эти попытки терпят неудачу, другие сообщения в партиции ждут совей очереди. А если бы партиций было больше одной, то ошибка обработки сообщения из одной партиции не затронула бы обработку сообщений из других партиций.
Для определенности далее будем рассматривать вариант с двумя экземплярами service.
На текущий момент мы определились с партициями и consumer'ами. Запустим утилиту gen.
Тут же обнаруживается проблема: нарушен порядок сообщений в потоках. Когда партиция была одна, producer в утилите gen не задумывался, в какую партицию поместить то или иное сообщение. Теперь партиций две, и producer'у нужен алгоритм выбора партиции. Библиотека segmentio/kafka-go
предоставляет такой алгоритм. Это известный алгоритм round robin, принцип работы которого следующий. Первое сообщение, которое нужно записать в kafka, - это A:1. Оно записывается в партицию P0. Далее сообщение A:2 записывается в P1. Партиций больше нет, значит A:3 записывается снова в P0. И так далее по кругу.
Чтобы более наглядно продемонстрировать нарушение порядка сообщений я специально немного "сдвинул назад во времени" сообщения в партиции P1 на схеме выше. В реальности такой сдвиг может произойти, например, из-за сетевых задержек. Тогда в соответствии со схемой сообщения поступят в обработчики H0 в следующем порядке.
A:1 A:3 B:2 C:1 A:2 C:3 B:1 B:3 C:2
Итак, количество партиций увеличено, однако порядок обработки сообщений нарушен. Причина в том, что алгоритм распределения сообщений по партициям, используемый по умолчанию, не учитывает наши потоки сообщений - потоки A, B и С. Это нормально, ведь разработчики segmentio/kafka-go
ничего не знают об этих потоках.
Меняем round robin на другой алгоритм, который учитывает наши потоки.
Здесь через f(S)=P я обозначил алгоритм, который каждому потоку сообщений (S, stream) ставит в соответствие номер партиции (P, partition). Библиотека segmentio/kafka-go
позволяет реализовать интерфейс Balancer
, чтобы направить потоки сообщений в соответствующие партиции. Ниже приведена реализация этого интерфейса, то есть приведен алгоритм f(S)=P.
type SPBalancer struct{}
func (b *SPBalancer) Balance(msg kafka.Message, partitions ...int) (partition int) {
hash := func(s string) int {
h := fnv.New32a()
h.Write([]byte(s))
return int(h.Sum32())
}
return partitions[hash(string(msg.Key))%len(partitions)]
}
spBalancer.Balance(kafka.Message{Key: []byte("A")}, 0, 1) // вернет 0
spBalancer.Balance(kafka.Message{Key: []byte("B")}, 0, 1) // вернет 1
spBalancer.Balance(kafka.Message{Key: []byte("C")}, 0, 1) // вернет 0
Алгоритм использует идентификатор потока, в нашем случае хранящийся в ключе сообщения, чтобы определить для этого сообщения номер партиции. Довольно часто алгоритм f(S)=P базируется на вычислении хеш-суммы идентификатора потока и последующем взятии остатка от целочисленного деления на количество партиций. В данном примере всего две партиции и распределить по ним сообщения не составляет труда, однако в реальных системах разработка алгоритма f(S)=P для как можно более равномерного распределения сообщений по партициям вкупе с выбором идентификатора потока является нетривиальной задачей.
Промежуточный итог. Имеем две полноценные очереди сообщений, базирующиеся на партициях.
Однако потоки A и C все еще ожидают обработки в одной очереди.
6 секунд
-----------------------
C:3 C:2 C:1 A:3 A:2 A:1
B:3 B:2 B:1
Значит, пришло время рассмотреть следующий механизм создания очередей.
Параллельный consumer - это механизм создания очередей, для реализации которого в golang используются каналы (channel). Собственно, каналы и будут играть роль очередей. Чтобы очередь была полноценной, каждый канал нужно обеспечить своим обработчиком сообщений. Напомню, что обработчик сообщений - это горутина. Взглянем поближе на то, что из себя представляет параллельный consumer, отправляющий сообщения в СУБД A.db и C.db.
Consumer извлекает пакет сообщений (batch) из партиции и распределяет сообщения по очередям-каналам. Здесь f(S)=H отображает потоки сообщений в номера обработчиков (H, handler) все с той же целью сохранения порядка сообщений. Таким образом в каждом канале оказываются сообщения только из одного потока.
В итоге система примет следующий вид.
Мы привели систему к такой конфигурации, в которой каждый поток сообщений ждет обработки в собственной очереди.
Это максимальное количество параллельных очередей, которые можно создать для обработки сообщений с учетом порядка. Но всегда ли максимальное количество очередей уместно?
Важно понимать, какие потоки сообщений есть в системе и оценить их количество. Например, если существует только один поток, то распараллеливать просто нечего - одной очереди достаточно.
Еще один повод ограничить количество очередей - это высокая нагрузка на систему к которой обращаются обработчики сообщений. В нашем случае обработчики обращаются к системе из трех СУБД A.db, B.db и C.db. Когда СУБД три, то все сообщения обрабатываются за 3 секунды, потому что у нас три потока, три очереди и три СУБД. Но давайте представим, что СУБД всего одна, и она обрабатывает сообщения из всех потоков.
Вспомните, что модель СУБД, которую мы используем, обрабатывает только одно сообщение в один момент времени благодаря sync.Mutex
. То есть конкурентные запросы просто встанут в очередь уже не в партициях kafkа или каналах service, а в единственной СУБД. В итоге 9 сообщений снова будут обрабатываться по очереди в течение 9-ти секунд. Я хочу сказать, что очень просто запустить параллельный consumer с 10 000 000 обработчиков сообщений, однако надо понимать, справляется ли с таким количеством конкурентных запросов ваша СУБД.
В заключении небольшой план по решению проблемы с длительным обновлением истории покупок в интернет-магазине из начала статьи.
Начать можно с внедрения параллельного consumer'а. Этот шаг не потребует увеличения количества партиций, ведь в некоторых случаях увеличить количество партиций просто нет возможности. А если такая возможность есть, то можно добавить партиции и запустить несколько экземпляров микросервиса, который читает сообщения kafka. Каждый поток сообщений в такой системе будет соответствовать последовательности покупок определенного пользователя, и в качестве идентификатора этого потока стоит выбрать идентификатор пользователя. Количество очередей удобно регулировать с помощью параллельного consumer - изменить количество каналов go проще, чем добавить или удалить партицию. Будет удобно, если количество каналов будет задаваться в конфигурации микросервиса.
Описание основных концепций kafka можно найти по следующим ссылкам:
По этой ссылке располагается исходный код java-библиотеки Confluent Parallel Consumer
(термин параллельный consumer я позаимствовал отсюда), которая предназначается для создания очередей сообщений в kafka consumer. В документации описаны техники параллельной обработки сообщений kafka и приводится множество сценариев использования этого подхода.
Ссылки на библиотеки go, упомянутые в статье:
segmentio/kafka-go - клиент kafka
fiber - веб-фреймворк
Исходный код приложений, рассмотренных в статье, находится здесь.