Conc: новая библиотека для управления конкурентностью в Go
- среда, 17 мая 2023 г. в 00:04:56
Одной из главных фишек языка Go является удобная работа с конкурентностью. Однако, в больших проектах всё равно возникают некоторые проблемы:
утечка горутин
некорректная обработка паник в горутинах
плохая читаемость кода
необходимость писать повторяющийся код из раза в раз
Как указывает автор библиотеки в своей статье, он часто сталкивается с ошибками при работе с горутинами, что побудило его создать новую библиотеку conc.
Библиотека предоставляет набор инструментов для управления конкурентностью в Go. Она позволяет синхронизировать доступ к общим ресурсам, а также контролировать выполнение горутин. Среди её особенностей можно отметить:
Свой WaitGroup без необходимости вызывать defer
Свой Pool для упрощения работы с запуска задач с ограничением параллельности выполнения
Методы для конкурентной работы со слайсами
Методы для работы с паниками в дочерних горутинах
Если вам не хочется, чтобы программа завершалась аварийно во время возникновения паники в дочерней горутине либо же вы хотите избежать других проблем, например, взаимоблокировок или утечек горутин, то очень непросто сделать это нативно с помощью стандартных библиотек:
type propagatedPanic struct {
val any
stack []byte
}
func main() {
done := make(chan *propagatedPanic)
go func() {
defer func() {
if v := recover(); v != nil {
done <- &propagatedPanic{
val: v,
stack: debug.Stack(),
}
} else {
done <- nil
}
}()
doSomethingThatMightPanic()
}()
if val := <-done; val != nil {
panic(val)
}
}
Библиотека conc справляется с поставленной задачей намного элегантнее:
func main() {
var wg conc.WaitGroup
wg.Go(doSomethingThatMightPanic)
// panics with a nice stacktrace
wg.Wait()
}
Зачастую необходимо обрабатывать большие объемы данных конкурентно. Для этого обычно все элементы среза отправляются в канал, откуда их забирают дочерние горутины и там же обрабатывают.
func process(values []int) {
feeder := make(chan int, 8)
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for elem := range feeder {
handle(elem)
}
}()
}
for _, value := range values {
feeder <- value
}
close(feeder)
wg.Wait()
}
С библиотекой conc для этого подойдёт iter.ForEach:
func process(values []int) {
iterator := iter.Iterator[int]{
MaxGoroutines: len(input) / 2,
}
iterator.ForEach(values, handle)
}
Либо если вам нужно сопоставить элементы выходной массив так, чтобы output[i] = f(input[i])
:
func process(
input []int,
f func(int) int,
) []int {
output := make([]int, len(input))
var idx atomic.Int64
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for {
i := int(idx.Add(1) - 1)
if i >= len(input) {
return
}
output[i] = f(input[i])
}
}()
}
wg.Wait()
return output
}
Гораздо проще и понятнее воспользоваться методом iter.Map:
func process(
values []int,
f func(*int) int,
) []int {
mapper := iter.Mapper[int, int]{
MaxGoroutines: len(input) / 2,
}
return mapper.Map(input, f)
}
Выше были показаны только основные варианты работы с данной библиотекой, гораздо больше примеров вы можете найти непосредственно в исходниках. Если вам интересно, как работать с определённым методом, достаточно найти пример использования в файлах с тестами.
Также стоит отметить, что текущая версия библиотеки — pre-1.0. По заявлению разработчиков, перед выпуском версии 1.0 должны быть внесены незначительные изменения: стабилизация API и настройка параметров по умолчанию. Поэтому использовать данную библиотеку в больших проектах пока что может быть немного рискованно, но начать знакомство можно уже сейчас, тем более исходников там не слишком много (не больше 2к строк кода).