Паттерны конкурентности в Go. Подробный разбор. Часть 1. Worker pool
- четверг, 26 февраля 2026 г. в 00:00:21
Здесь пойдет речь именно о простых примерах реализации паттернов, годных для начального понимания и использования на собеседованиях. Это будут не полноценные production-шаблоны, а схематичные примеры с подробным описанием - что и зачем.
Вероятно, самый популярный паттерн на собеседованиях.
Задача: Написать код, параллельно обрабатывающий некий массив входных данных фиксированным количеством "воркеров" и складывающий результаты в выходной массив.
Часто можно встретить в статьях такую минимальную реализацию паттерна:
package main import ( "fmt" "time" ) func worker(id int, jobs <-chan int, results chan<- int) { for j := range jobs { fmt.Println("worker", id, "started job", j) time.Sleep(time.Second) fmt.Println("worker", id, "finished job", j) results <- j * 2 } } func main() { const numJobs = 5 jobs := make(chan int, numJobs) results := make(chan int, numJobs) for w := 1; w <= 3; w++ { go worker(w, jobs, results) } for j := 1; j <= numJobs; j++ { jobs <- j } close(jobs) for a := 1; a <= numJobs; a++ { <-results } }
И в целом, она неплохо иллюстрирует подход. Но имеет ряд принципиальных недостатков:
Нет явного ожидания завершения worker’ов. Если в worker добавить код после results <-, он может выполняться уже после выхода main.
Завязка на количество задач. Логика сломается, если:
задач станет больше
задач станет меньше
появится ранний выход
worker завершится с ошибкой
Буфер равен числу задач. Producer может выгрузить всё сразу. При большом количестве задач - так делать не стоит.
Нет закрытия results. Если заменить цикл на range results — будет deadlock. Это делает пример хрупким при модификации.
Поэтому, этот код не пригоден как production-шаблон. Ну и для обучения тоже он чересчур упрощен.
Есть немало других примеров реализации паттерна. Но я не нашел статьи, где был бы годный простой пример и подробное его описание по шагам. Поэтому решил написать еще одну.
Здесь реализована трёхзвенная схема: producer → workers → consumer. Используем небуферизированные каналы.
Используем для чтения из каналов цикл for ... range, завершающийся после закрытия канала. Это реально удобно и красиво.
Воркер объявляем как функцию, принимающую на вход канал данных и канал результатов. Хорошим тоном будет ограничение канала данных только на чтение, а канала результатов - только на запись. Параметр workerID необходим только для визуализации процесса и в реальной жизни не нужен. Воркер в данном примере возвращает квадрат входных данных.
func worker(in <-chan int, out chan<- int, workerID int) { for x := range in { fmt.Printf("worker %d, job %d\n", workerID, x) out <- x * x } }
Ну это самое простое. Создаем входной канал. В горутине отправляем данные в канал для последующей обработки воркерами. Как только все отправили - канал можно закрыть (лучший вариант - используя defer). Это будет сигналом воркерам о завершении работы. Срок жизни горутины отдельно не контроллируем, поскольку приложение в любом случае не завершится, пока не завершится горутина продюсера, закрывающая канал данных (зависимость косвенная, через цепочку синхронизации, см. далее).
Для чтения данных из слайса используем цикл for ... range.
in := make(chan int) go func() { defer close(in) for _, x := range data { in <- x } }()
Создаем канал для записи результатов.Запускаем несколько горутин, в которых выполняются воркеры. Здесь уже мы контроллируем выполнение горутин, используя sync.WaitGroup. Используем метод func (wg *WaitGroup) Go(f func()), который появился в Go 1.25.0. Этот метод избавит нас от лишнего кода и потенциальных ошибок.
var wg sync.WaitGroup out := make(chan int) for w := 1; w <= workersNum; w++ { wg.Go(func() { worker(in, out, w) }) } go func() { wg.Wait() close(out) }()
Для визуализации выполнения воркеров, передаем номер воркера ему в качестве параметра. Здесь есть тонкий момент: Еще вчера такой код привел бы к ошибке. И это был один из частых вопросов на собеседованиях - почему? Здесь горутина захватывает переменную цикла w. В старых версиях Go эта переменная создавалась единожды для всего цикла. И такой код привел бы к неверным значениям w, поскольку в замыканиях переменные захватываются всегда по ссылке.
Начиная с Go 1.22 цикл for по умолчанию создаёт новую переменную для каждой итерации цикла, что устраняет классическую ошибку с замыканиями (когда все горутины захватывали одну и ту же переменную).
После запуска воркеров, нам необходимо позаботиться о закрытии канала результатов после окончания их работы. Это необходимо для завершения работы консьюмера. Ожидание и закрытие канала осуществляем в горутине, чтоб не блокировать основной поток. Контроллировать жизненный цикл горутины не нужно, поскольку пока она не завершится, не завершится цикл в консьюмере (см. ниже) и не завершится приложение.
Тут тоже просто. Читаем данные из канала результатов уже в основном потоке, используя цикл for ... range. Чтение завершится после закрытия канала, который в свою очередь закроется после завершения работы воркеров (см. выше).
for x := range out { results = append(results, x) }
Изначально, я написал код, который запускает консьюмер в горутине, а потом ждет уже в основной горутине завершения работы воркеров. И всё работало без ошибок - все результаты корректно собирались. Но потенциально, такое решение могло привести к ошибке, хоть и вероятность её ничтожна.
go func() { for x := range out { results = append(results, x) } }() wg.Wait()
Суть ошибки: После wg.Wait(), мы считаем, что воркеры завершили работу и все результаты корректно получены. Но на самом деле, в этом случае у нас есть гарантия, что последний результат был прочитан из канала консьюмером, но нет никакой гарантии, что консьюмер выполнил append до того, как main продолжит выполнение после wg.Wait().
Ошибка не воспроизводилась потому, что append выполняется быстро и планировщик Go просто не успевает вмешаться в процесс.
package main import ( "fmt" "sync" ) func worker(in <-chan int, out chan<- int, workerID int) { for x := range in { fmt.Printf("worker %d, job %d\n", workerID, x) out <- x * x } } func main() { const workersNum = 3 data := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10} results := make([]int, 0, len(data)) // producer in := make(chan int) go func() { defer close(in) for _, x := range data { in <- x } }() // workers var wg sync.WaitGroup out := make(chan int) for w := 1; w <= workersNum; w++ { wg.Go(func() { worker(in, out, w) }) } go func() { wg.Wait() close(out) }() // consumer for x := range out { results = append(results, x) } fmt.Println("Results:", results) }
Продолжение следует…