Паттерны конкурентности в Go. Подробный разбор. Часть 2. Fan-Out/Fan-In
- воскресенье, 22 марта 2026 г. в 00:00:01
Паттерн Fan-Out/Fan-In — это подход к параллельной обработке задач в Go с использованием горутин и каналов. Он позволяет распределить работу между несколькими исполнителями (Fan-Out) и затем собрать результаты в одном месте (Fan-In).
Паттерн Fan-Out/Fan-In очень похож на паттерн Worker Pool. Более того, в большинстве практических случаев Worker Pool можно считать частным (специальным) случаем паттерна Fan-Out/Fan-In.
По сути, принципиальная разница заключается в том, что Fan-Out/Fan-In обычно использует для обработки индивидуальный pipeline для каждого воркера, что подразумевает индивидуальный канал результатов для каждого воркера. После выполнения pipeline, результаты из всех каналов объединяются (Fan-In). Это позволяет легко масштабировать на более сложные пайплайны (например, если следующий этап обработки нуждается в отдельных потоках).
В Fan-Out/Fan-In есть явный этап Fan-In — функция merge в данном примере, которая запускает дополнительные горутины для копирования данных из нескольких каналов в один. Таким образом, Fan-Out/Fan-In имеет больше overhead (дополнительные каналы и горутины для merge), но он более гибкий для цепочек обработки (pipeline-style: source → parallel process → aggregate).
Вторым отличием Fan-Out/Fan-In является необязательное ограничение на фиксированное количество воркеров. Можно создавать динамически по одному воркеру на каждую задачу.
И откровенно говоря, различия между этими паттернами носят скорее идеологический характер, приводят к бесконечным спорам и разбираться в них - головная боль, для того кто их изучает.
Пример реализации, приведенный ниже, имеет две ключевые особенности:
Количество воркеров не фиксировано, а определяется количеством задач.
Каждый воркер имеет свой канал результатов, который объединяется в один итоговый канал.
Данные особенности не являются "руководством к действию", но намеренно продемонстрированы, чтобы показать отличия от паттерна Worker Pool, рассмотренного в предыдущей статье.
Воркер здесь - функция, возвращающая канал результатов, в отличие от примера Workerpool, где воркер принимал канал результатов в качестве параметра.
func worker(x int, workerID int) chan int { out := make(chan int) go func() { defer close(out) fmt.Printf("worker %d, job %d\n", workerID, x) out <- x * x }() return out }
Воркер запускает горутину, в которой выполняет работу (в примере - это возведение числа в квадрат), отправляет результат в канал. По завершении горутины канал закрывается. Срок жизни горутин воркеров отдельно не контроллируем, поскольку приложение в любом случае не завершится, пока не завершатся все горутины воркеров. В реальной жизни так делать конечно не стоит, и нужно использовать контекст, для того чтобы иметь возможность завершить горутины по требованию или таймауту.
Продюсером в данном примере является цикл for ... range, запускающий по одному воркеру на каждый элемент из слайса входных данных. Каналы воркеров сохраняются в слайсе chnls.
for i, x := range data { chnls = append(chnls, worker(x, i+1)) }
Роль Fun-In в данном примере выполняет функция merge. На вход она принимает слайс каналов созданный Fan-Out, создает общий выходной канал out для сбора данных. Далее, он в цикле для каждого канала из слайса запускает горутину, в которой осуществляется чтение из канала в общий канал out. Цикл чтения завершается после закрытия канала воркером.
Кроме того, запускается горутина, ожидающая завершения всех горутин чтения из каналов воркеров. Для ожидания используется sync.WaitGroup. По завершении всех горутин чтения из каналов воркеров общий канал out закрывается. Закрытие канала out является сигналом для consumer’а для завершения работы, поскольку все данные получены.
Если ожидание завершения горутин сделать не в отдельной горутине а в основном потоке, возникнет deadlock, поскольку консьюмер еще не начал читать данные из общего канала out и первая же попытка записи в out заблокирует канал, вдеь канал out небуферизированный.
func merge(chnls ...chan int) <-chan int { out := make(chan int) var wg sync.WaitGroup for _, ch := range chnls { wg.Go(func() { for x := range ch { out <- x } }) } go func() { wg.Wait() close(out) }() return out }
Consumer’ом в данном примере является простой цикл for ... range, который читает данные из общего канала out. Цикл чтения завершится, когда горутина ожидания в функции merge (см. выше) закроет канал out.
Во всем примере используются небуферизированные каналы. Для чтения из каналов используется удобный цикл for ... range, завершающийся после закрытия канала. Это позволяет писать компактный и легко читаемый код.
Данный пример является демонстрационным и не пригоден для использования в production как есть. В частности, запуск неограниченного числа воркеров - плохая практика. Также, в боевом коде обязательно нужно использовать контекст для управления жизненным циклом горутин. Использование контекста позволяет прерывать приложение по требованию и избегать его зависания при чрезмерно длительных операциях (например файловый или сетевой ввод/вывод). Впрочем, это уже не относится к теме данной статьи.
package fanoutfanin import ( "fmt" "sync" ) func worker(x int, workerID int) chan int { out := make(chan int) go func() { defer close(out) fmt.Printf("worker %d, job %d\n", workerID, x) out <- x * x }() return out } func merge(chnls ...chan int) <-chan int { out := make(chan int) var wg sync.WaitGroup for _, ch := range chnls { wg.Go(func() { for x := range ch { out <- x } }) } go func() { wg.Wait() close(out) }() return out } func Run() { data := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10} results := make([]int, 0, len(data)) chnls := make([]chan int, 0, len(data)) // Fan Out, producer for i, x := range data { chnls = append(chnls, worker(x, i+1)) } // Fan In out := merge(chnls...) // consumer for x := range out { results = append(results, x) } fmt.Println("Results:", results) }
Продолжение следует…