Паттерны конкурентности в Go. Подробный разбор. Часть 3. Pub/Sub
- пятница, 10 апреля 2026 г. в 00:00:11
Паттерны конкурентности в Go. Подробный разбор. Часть 1. Worker pool
Паттерны конкурентности в Go. Подробный разбор. Часть 2. Fan-Out/Fan-In
Pub/sub, сокращение от «publish-subscribe» (публикация-подписка), — это паттерн обмена сообщениями между различными частями приложения. Прелесть такого подхода в том, что отправитель сообщения не знает о получателе, а получатель не знает об отправителе. Оба они знают только об одном посреднике — брокере сообщений. Отправитель публикует сообщение, а получатель подписывается на получение сообщений. В итоге это кардинально уменьшает связанность между частями приложения и неминуемую головную боль ею вызванную.
Пример:
В приложении нужно по расписанию выполнять некие действия - например рекламные рассылки или какое-нибудь закрытие дня раз в сутки. Вы реализуете сервис плнировщика задач, при старте задаете ему расписание, по которому он публикует некие уведомления. А потом в любом месте приложения просто подписываетесь на выбранные уведомления и выполняете нужные действия при их получении. Это просто, это удобно, это гибко, это легко масштабировать. Это Pub/Sub.
В Go паттерн Pub/Sub можно реализовать несколькими способами. Один из самых простых и эффективных — использование каналов. Каналы в Go являются потокобезопасными и позволяют передавать данные между горутинами максимально простым способом. Собственно, известные брокеры сообщений (RabbitMQ, Kafka и т.д.) - это предельная реализация паттерна Pub/Sub для обмена между приложениями (микросервисами). Мы же в данной статье рассмотрим, как реализовать Pub/Sub внутри приложения с использованием каналов.
Как правило, сообщение издателя имеют два основных поля: тема (topic) и сообщение (message). Подписчики могут подписываться на определенные темы и получать все сообщения, опубликованные в этих темах. В нашем примере структура сообщения представлены типом topics map[string][]chan any. Мапа в данном случае идеально подходит. Ключом мапы является тема, а значением - срез каналов (подписчиков), по которым будем рассылать произвольные данные. В момент подписки для нового подписчика создается канал, который добавляется в срез каналов для соответствующей темы.
Начнем с минимальной струтуры брокера сообщений:
type Notifier struct { topics map[string][]chan any } func (n *Notifier) Subscribe(topic string, handler func(any)) { ch := make(chan any) n.topics[topic] = append(n.topics[topic], ch) go func() { for msg := range ch { handler(msg) } }() } func (n *Notifier) Publish(topic string, message any) { for _, ch := range n.topics[topic] { ch <- message } }
При подписке мы передаем топик, на который подписываемся и функцию-хэндлер, которая будет вызвана при получении сообщения. У функций есть параметр any, позволяющий передавать произвольные данные сообщения. Внутри метода создается канал, который добавляется в срез каналов для соответствующей темы. Если тема еще не существует, она будет создана мапой автоматически. Затем запускается горутина, которая будет ждать сообщения из канала и вызывать обработчик для каждого сообщения. Горутина завершится, когда канал будет закрыт.
При публикации, мы сначала получаем срез подписчиков (каналов) для темы, а затем отправляем сообщение во все каналы.
Что тут не так, чего не хватает? Ну как минимум - нет инициализации мапы. После объявления карты ее значение по умолчанию равно nil. Чтение из такой карты возвращает пустой результат, но попытка записи в такую карту вызывает ошибку времени выполнения. Добавим:
func NewNotifier() *Notifier { return &Notifier{ topics: make(map[string][]chan any), } }
Далее, в коде есть доступ к общему ресурсу из нескольких горутин без синхронизации, что приводит к data race и непредсказуемому поведению. Общий ресурс в данном случае - это мапа topics.
Добавим мьютекс:
type Notifier struct { topics map[string][]chan any mu sync.Mutex } func (n *Notifier) Subscribe(topic string, handler func(any)) { ch := make(chan any) n.mu.Lock() n.topics[topic] = append(n.topics[topic], ch) n.mu.Unlock() go func() { for msg := range ch { handler(msg) } }() } func (n *Notifier) Publish(topic string, message any) { n.mu.Lock() chnls := n.topics[topic] n.mu.Unlock() for _, ch := range chnls { ch <- message } }
Выглядит неплохо на первый взляд. При подписке мы захватываем мьютекс, добавляем канал в мапу, и сразу же отпускаем. При публикации мы перед получаением среза подписчиков (каналов) для темы опять захватываем мьютекс, после получения среза - отпускаем. Пока мьютекс захвачен, код может выполняться только одной горутиной. Таким образом, наш код гарантирует, что мапа не будет одновременно читаться и записываться разными горутинами. Будто бы мы сделали всё правильно. И тем не менее, в этом коде зарыта потенциальная проблема. Чтобы понять, в чем она состоит, сначала вспомним, как устроены срезы в Go:
type slice struct { ptr *array len int cap int }
Когда выполняется chnls := n.topics[topic] копируется заголовок среза, но не сам массив. Это значит, что chnls и n.topics[topic] указывают на один и тот же массив. Что тут может пойти не так? Пример:
Горутина A (Publish) читает chnls в цикле.
Горутина B (Subscribe) одновременно делает append в тот же массив.
Если при добавлении capasity среза не хватает, то append создаёт новый массив, копирует в него данные из старого массива, добавляет новый элемент и возвращает новый срез. При этом старый массив не модифицируется, поэтому конкурентного доступа к одной и той же памяти не происходит.
Если же при добавлении срезу хватает capacity, то append модифицирует тот же массив, который в данный момент читается другой горутиной, что может привести к неопределённому поведению при итерации - data race. Читать и изменять не атомарно одни и те же данные из разных потоков нельзя.
Самое простое решение - держать мютекс захваченным на время итерации по срезу в методе Publish. Но это существенно снизит производительность, Publish будет блокировать Subscribe и наоборот.
Давайте это поправим иначе:
func (n *Notifier) Subscribe(topic string, handler func(any)) { ch := make(chan any) n.mu.Lock() oldChnls := n.topics[topic] // copy-on-write to avoid data races with concurrent readers newChnls := make([]chan any, len(oldChnls)+1) copy(newChnls, oldChnls) newChnls[len(oldChnls)] = ch n.topics[topic] = newChnls n.mu.Unlock() go func() { for msg := range ch { handler(msg) } }() }
Теперь при добавлении элемента в срез всегда создаётся новый массив, который копируется из старого, и только после этого новый массив присваивается n.topics[topic]. Это гарантирует, что читающие горутины не будут читать массив, который в данный момент модифицируется, поскольку это уже разные массивы. Такой прием называется copy-on-write.
Для начала полноценного использования нашего кода нам необходим метод корректной финализации нашего Notifier, который будет закрывать все каналы и освобождать ресурсы. Добавим метод Close. Метод должен вызываться в конце работы приложения. После его вызова, использовать Notifier уже будет нельзя.
type Notifier struct { topics map[string][]chan any mu sync.Mutex closed bool } func (n *Notifier) Close() { n.mu.Lock() defer n.mu.Unlock() if n.closed { return } for _, chs := range n.topics { for _, ch := range chs { close(ch) } } n.topics = nil n.closed = true } func (n *Notifier) Subscribe(topic string, handler func(any)) error { if n.closed { return fmt.Errorf("notifier is closed") } ch := make(chan any) n.mu.Lock() oldChnls := n.topics[topic] // copy-on-write to avoid data races with concurrent readers newChnls := make([]chan any, len(oldChnls)+1) copy(newChnls, oldChnls) newChnls[len(oldChnls)] = ch n.topics[topic] = newChnls n.mu.Unlock() go func() { for msg := range ch { handler(msg) } }() return nil } func (n *Notifier) Publish(topic string, message any) error { n.mu.Lock() if n.closed { n.mu.Unlock() return fmt.Errorf("notifier is closed") } chnls := n.topics[topic] n.mu.Unlock() for _, ch := range chnls { ch <- message } return nil }
В принципе, текущую реализацию уже можно считать неким минимальным MVP, иллюстрирующим использование паттерна Pub/Sub.
Что можно было бы добавить к коду Notifier, чтобы получить из него полноценную библиотеку?
В первую очередь не хватает метода Unsubscribe, который позволит отписаться от топика. Хотя в реальных кейсах отписка используется редко, но для полноценной библиотеки это было бы логично.
Главным недостатком данной реализации является использование одной простейшей стратегии отправки сообщений подписчикам.
for _, ch := range chnls { ch <- message }
При публикации, мы последовательно отправляем сообщение в каждый канал. Поскольку мы используем небуферизированные каналы, нам приходится каждый раз ждать, пока подписчик не прочитает сообщение. Это может привести к задержкам в обработке сообщений, если хэндлер подписчика будет обрабатывать сообщение долго. Универсального способа решения этой проблемы нет. Есть несколько стратегий, у каждой из которых есть свои преимущества и недостатки. Первое и самое эффективное решение - использовать буферизированные каналы. Второе - ограничить время обработки сообщения подписчиком таймаутом. Вариантов много и их можно комбинировать для каждого конкретного случая. Подобные стратегии используются в работе и “взрослых” брокеров сообщений (NATS, Kafka и пр.).
Не будет лишним добавить recovery в функцию хэндлера подписчика. Если хэндлер подписчика упадет с паникой, это приведет к падению горутины, которая читает канал.
Полный код готовой библиотеки, пригодной для полноценного использования доступен здесь.