golang

Паттерны конкурентности в Go. Подробный разбор. Часть 1. Worker pool

  • четверг, 26 февраля 2026 г. в 00:00:21
https://habr.com/ru/articles/1003418/

Здесь пойдет речь именно о простых примерах реализации паттернов, годных для начального понимания и использования на собеседованиях. Это будут не полноценные production-шаблоны, а схематичные примеры с подробным описанием - что и зачем.

Worker pool

Вероятно, самый популярный паттерн на собеседованиях.
Задача: Написать код, параллельно обрабатывающий некий массив входных данных фиксированным количеством "воркеров" и складывающий результаты в выходной массив.

Часто можно встретить в статьях такую минимальную реализацию паттерна:

package main

import (
    "fmt"
    "time"
)

func worker(id int, jobs <-chan int, results chan<- int) {
    for j := range jobs {
        fmt.Println("worker", id, "started  job", j)
        time.Sleep(time.Second)
        fmt.Println("worker", id, "finished job", j)
        results <- j * 2
    }
}

func main() {
    const numJobs = 5
    jobs := make(chan int, numJobs)
    results := make(chan int, numJobs)

    for w := 1; w <= 3; w++ {
        go worker(w, jobs, results)
    }

    for j := 1; j <= numJobs; j++ {
        jobs <- j
    }
    close(jobs)

    for a := 1; a <= numJobs; a++ {
        <-results
    }
}

И в целом, она неплохо иллюстрирует подход. Но имеет ряд принципиальных недостатков:

  1. Нет явного ожидания завершения worker’ов. Если в worker добавить код после results <-, он может выполняться уже после выхода main.

  2. Завязка на количество задач. Логика сломается, если:

    1. задач станет больше

    2. задач станет меньше

    3. появится ранний выход

    4. worker завершится с ошибкой

  3. Буфер равен числу задач. Producer может выгрузить всё сразу. При большом количестве задач - так делать не стоит.

  4. Нет закрытия results. Если заменить цикл на range results — будет deadlock. Это делает пример хрупким при модификации.

Поэтому, этот код не пригоден как production-шаблон. Ну и для обучения тоже он чересчур упрощен.
Есть немало других примеров реализации паттерна. Но я не нашел статьи, где был бы годный простой пример и подробное его описание по шагам. Поэтому решил написать еще одну.

Реализация

Здесь реализована трёхзвенная схема: producer → workers → consumer. Используем небуферизированные каналы.
Используем для чтения из каналов цикл for ... range, завершающийся после закрытия канала. Это реально удобно и красиво.

Worker

Воркер объявляем как функцию, принимающую на вход канал данных и канал результатов. Хорошим тоном будет ограничение канала данных только на чтение, а канала результатов - только на запись. Параметр workerID необходим только для визуализации процесса и в реальной жизни не нужен. Воркер в данном примере возвращает квадрат входных данных.

func worker(in <-chan int, out chan<- int, workerID int) {
	for x := range in {
		fmt.Printf("worker %d, job %d\n", workerID, x)
		out <- x * x
	}
}

Producer

Ну это самое простое. Создаем входной канал. В горутине отправляем данные в канал для последующей обработки воркерами. Как только все отправили - канал можно закрыть (лучший вариант - используя defer). Это будет сигналом воркерам о завершении работы. Срок жизни горутины отдельно не контроллируем, поскольку приложение в любом случае не завершится, пока не завершится горутина продюсера, закрывающая канал данных (зависимость косвенная, через цепочку синхронизации, см. далее).
Для чтения данных из слайса используем цикл for ... range.

	in := make(chan int)

	go func() {
		defer close(in)

		for _, x := range data {
			in <- x
		}
	}()

Workers

Создаем канал для записи результатов.Запускаем несколько горутин, в которых выполняются воркеры. Здесь уже мы контроллируем выполнение горутин, используя sync.WaitGroup. Используем метод func (wg *WaitGroup) Go(f func()), который появился в Go 1.25.0. Этот метод избавит нас от лишнего кода и потенциальных ошибок.

	var wg sync.WaitGroup
	out := make(chan int)

	for w := 1; w <= workersNum; w++ {
		wg.Go(func() {
			worker(in, out, w)
		})
	}

	go func() {
		wg.Wait()
		close(out)
	}()

Для визуализации выполнения воркеров, передаем номер воркера ему в качестве параметра. Здесь есть тонкий момент: Еще вчера такой код привел бы к ошибке. И это был один из частых вопросов на собеседованиях - почему? Здесь горутина захватывает переменную цикла w. В старых версиях Go эта переменная создавалась единожды для всего цикла. И такой код привел бы к неверным значениям w, поскольку в замыканиях переменные захватываются всегда по ссылке.
Начиная с Go 1.22 цикл for по умолчанию создаёт новую переменную для каждой итерации цикла, что устраняет классическую ошибку с замыканиями (когда все горутины захватывали одну и ту же переменную).

После запуска воркеров, нам необходимо позаботиться о закрытии канала результатов после окончания их работы. Это необходимо для завершения работы консьюмера. Ожидание и закрытие канала осуществляем в горутине, чтоб не блокировать основной поток. Контроллировать жизненный цикл горутины не нужно, поскольку пока она не завершится, не завершится цикл в консьюмере (см. ниже) и не завершится приложение.

Consumer

Тут тоже просто. Читаем данные из канала результатов уже в основном потоке, используя цикл for ... range. Чтение завершится после закрытия канала, который в свою очередь закроется после завершения работы воркеров (см. выше).

	for x := range out {
		results = append(results, x)
	}

Ошибка, которую я изначально допустил

Изначально, я написал код, который запускает консьюмер в горутине, а потом ждет уже в основной горутине завершения работы воркеров. И всё работало без ошибок - все результаты корректно собирались. Но потенциально, такое решение могло привести к ошибке, хоть и вероятность её ничтожна.

	go func() {
		for x := range out {
			results = append(results, x)
		}
	}()

	wg.Wait()	

Суть ошибки: После wg.Wait(), мы считаем, что воркеры завершили работу и все результаты корректно получены. Но на самом деле, в этом случае у нас есть гарантия, что последний результат был прочитан из канала консьюмером, но нет никакой гарантии, что консьюмер выполнил append до того, как main продолжит выполнение после wg.Wait().
Ошибка не воспроизводилась потому, что append выполняется быстро и планировщик Go просто не успевает вмешаться в процесс.

Полный код примера

package main

import (
	"fmt"
	"sync"
)

func worker(in <-chan int, out chan<- int, workerID int) {
	for x := range in {
		fmt.Printf("worker %d, job %d\n", workerID, x)
		out <- x * x
	}
}

func main() {
	const workersNum = 3
	data := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
	results := make([]int, 0, len(data))

	// producer
	in := make(chan int)

	go func() {
		defer close(in)

		for _, x := range data {
			in <- x
		}
	}()

	// workers
	var wg sync.WaitGroup
	out := make(chan int)

	for w := 1; w <= workersNum; w++ {
		wg.Go(func() {
			worker(in, out, w)
		})
	}

	go func() {
		wg.Wait()
		close(out)
	}()

	// consumer
	for x := range out {
		results = append(results, x)
	}

	fmt.Println("Results:", results)
}

Продолжение следует…