Корутины для Go
- четверг, 23 ноября 2023 г. в 00:00:17
Эта заметка о том, зачем нам нужен пакет coroutine для Go и как он будет выглядеть. Но прежде всего, что такое корутины?
Сегодня каждый программист знаком с вызовами функций (подпрограмм): F вызывает G, которая останавливает F и запускает G. G выполняет свою работу, потенциально вызывая и ожидая другие функции, и в конце концов возвращается. Когда G возвращается, G уже нет, а F продолжает работать. В этой схеме одновременно выполняется только одна функция, а ее вызывающие ожидают, поднимаясь вверх по стеку вызовов.
В отличие от подпрограмм, корутины выполняются конкурентно на разных стеках, но все равно верно, что одновременно выполняется только одна функция, а ее вызывающая сторона ждет. F запускает G, но G не выполняется немедленно. Вместо этого F должен явно возобновить (resume) выполнение G, который затем начинает выполняться. В любой момент G может развернуться и вернуться (yield) назад к F. Это приостановит G и продолжит F операцию возобновления. В конце концов F снова вызывает resume, который приостанавливает F и продолжает G после выхода. И так далее, туда-сюда, пока G не вернется, что приведет к очистке G и продолжению F с последней операции возобновления, с некоторым сигналом для F, что G закончена и F больше не должна пытаться возобновить G. В этом паттерне одновременно выполняется только одна корутина, а ее вызывающая сторона ждет на другом стеке. Они выполняются по очереди в четко определенном, скоординированном порядке.
Это несколько абстрактно. Давайте посмотрим на реальные программы.
В качестве примера можно привести сравнение двух двоичных деревьев на предмет наличия в них одинаковой последовательности значений, даже если их структуры различны. Например, здесь приведен код на языке Lua 5 для генерации некоторых двоичных деревьев:
function T(l, v, r)
return {left = l, value = v, right = r}
end
e = nil
t1 = T(T(T(e, 1, e), 2, T(e, 3, e)), 4, T(e, 5, e))
t2 = T(e, 1, T(e, 2, T(e, 3, T(e, 4, T(e, 5, e)))))
t3 = T(e, 1, T(e, 2, T(e, 3, T(e, 4, T(e, 6, e)))))
Деревья t1
и t2
содержат значения 1, 2, 3, 4, 5; t3
содержит 1, 2, 3, 4, 6.
Мы можем написать корутину для обхода дерева и получения каждого значения:
function visit(t)
if t ~= nil then -- note: ~= is "not equal"
visit(t.left)
coroutine.yield(t.value)
visit(t.right)
end
end
Тогда для сравнения двух деревьев мы можем создать две корутины visit
и попеременно считывать и сравнивать последовательные значения:
function cmp(t1, t2)
co1 = coroutine.create(visit)
co2 = coroutine.create(visit)
while true
do
ok1, v1 = coroutine.resume(co1, t1)
ok2, v2 = coroutine.resume(co2, t2)
if ok1 ~= ok2 or v1 ~= v2 then
return false
end
if not ok1 and not ok2 then
return true
end
end
end
Аргументы t1
и t2
в coroutine.resume
используются только на первой итерации в качестве аргумента visit
. Последующие возобновления возвращают это значение из coroutine.yield
, но в коде оно игнорируется.
Более идиоматичная версия на языке Lua предполагает использование функции coroutine.wrap
, которая возвращает функцию, скрывающую объект coroutine
:
function cmp(t1, t2)
next1 = coroutine.wrap(function() visit(t1) end)
next2 = coroutine.wrap(function() visit(t2) end)
while true
do
v1 = next1()
v2 = next2()
if v1 ~= v2 then
return false
end
if v1 == nil and v2 == nil then
return true
end
end
end
После завершения работы корутины, next
функция возвращает nil
(полный код).
Python предоставляет генераторы, которые очень похожи на корутины Lua, но они не являются корутинами, поэтому стоит указать на различия. Главное отличие заключается в том, что "очевидные" программы не работают. Например, вот прямой перевод нашего дерева Lua и посетителя в Python:
def T(l, v, r):
return {'left': l, 'value': v, 'right': r}
def visit(t):
if t is not None:
visit(t['left'])
yield t['value']
visit(t['right'])
Но этот очевидный перевод не работает:
>>> e = None
>>> t1 = T(T(T(e, 1, e), 2, T(e, 3, e)), 4, T(e, 5, e))
>>> for x in visit(t1):
... print(x)
...
4
>>>
Мы потеряли 1, 2, 3 и 5. Что произошло?
В Python этот def visit
не определяет обычную функцию. Поскольку тело содержит оператор yield
, результатом является генератор:
>>> type(visit(t1))
<class 'generator'>
>>>
Вызов visit(t['left'])
вообще не выполняет код в visit
. Он только создает и возвращает новый генератор, который затем отбрасывается. Чтобы избежать отбрасывания этих результатов, необходимо выполнить цикл по генератору и повторно выдать их:
def visit(t):
if t is not None:
for x in visit(t['left']):
yield x
yield t['value']
for x in visit(t['right'])
yield x
В Python 3.3 появилась функция yield from
, позволяющая:
def visit(t):
if t is not None:
yield from visit(t['left']):
yield t['value']
yield from visit(t['right'])
Объект генератора содержит состояние единственного вызова visit
, то есть значения локальных переменных и то, какая строка выполняется. Это состояние выталкивается в стек вызовов при каждом возобновлении работы генератора и затем заталкивается обратно в объект генератора при каждом выходе (yield
), который может произойти только в самом верхнем кадре вызова. Таким образом, генератор использует тот же стек, что и исходная программа, избегая необходимости в полной реализации корутин, но вводя вместо этого непонятные ограничения.
Генераторы в Python, похоже, почти полностью скопированы с CLU, который был пионером этой абстракции (и многих других вещей), хотя в CLU они называются итераторами, а не генераторами. Итератор дерева CLU выглядит следующим образом:
visit = iter (t: cvt) yields (int):
tagcase t
tag empty: ;
tag non_empty(t: node):
for x: int
in tree$visit(t.left) do
yield(x);
end;
yield(t.value);
for x: int
in tree$visit(t.right) do
yield(x);
end;
end;
end visit;
Синтаксис отличается, особенно tagcase
, который рассматривает представление дерева в виде тегированного объединения, но основная структура, включая вложенные циклы for
, точно такая же, как и в нашей первой рабочей версии на Python. Кроме того, поскольку CLU был статически типизирован, visit
четко обозначен как итератор (iter
), а не как функция (proc
в CLU). Благодаря этой информации о типе, неправильное использование visit
в качестве обычного вызова функции, как в нашем глючном примере на Python, компилятор может диагностировать (и, как я полагаю, диагностировал).
О реализации CLU первоначальные разработчики писали: "Итераторы - это разновидность корутин; однако их использование достаточно ограничено, и они реализуются с использованием только стека программы. Поэтому использование итератора лишь немного дороже, чем использование процедуры". Это звучит точно так же, как объяснение, которое я дал выше для генераторов Python. Подробнее об этом см. в работе Барбары Лисков и др. 1977 года "Механизмы абстракции в CLU", в частности, разделы 4.2, 4.3 и 6.
На первый взгляд, корутины, потоки и генераторы выглядят одинаково. Все они обеспечивают конкурентность в той или иной форме, но имеют существенные отличия.
Корутины обеспечивают конкурентность без параллелизма: когда выполняется одна корутина, возобновляющая ее или уступающая ей не выполняется. Поскольку корутины выполняются по очереди и переключаются только в определенных точках программы, корутины могут обмениваться данными между собой без гонок. Явные переключения (coroutine.resume
в первом примере Lua или вызов функции next
во втором примере Lua) служат точками синхронизации, создавая happens-before edges. Поскольку планирование осуществляется в явном виде (без какого-либо прерывания) и полностью без участия операционной системы, переключение корутины занимает не более десяти наносекунд, а обычно и того меньше. Запуск и завершение работы также значительно дешевле, чем у потоков.
Потоки предоставляют больше возможностей, чем корутины, но и обходятся дороже. Дополнительная мощность - это параллелизм, а стоимость - это накладные расходы на планирование, включая более дорогие контекстные переключения и необходимость добавления преимуществ в той или иной форме. Обычно операционная система предоставляет потоки, и переключение потоков занимает несколько микросекунд. Для данной таксономии горутины Go являются дешевыми потоками: переключение горутины занимает ближе к нескольким сотням наносекунд, поскольку среда выполнения Go берет на себя часть работы по планированию, но горутины все равно обеспечивают полный параллелизм и вытеснение потоков. (Новые облегченные потоки Java по сути являются тем же самым, что и горутины).
Генераторы обеспечивают меньшую мощность, чем корутины, поскольку в корутине разрешен выход (yield
) только самого верхнего кадра. Этот кадр перемещается туда-сюда между объектом и стеком вызовов для его приостановки и возобновления.
Корутины являются полезным строительным блоком для написания программ, которым нужна конкурентность для структурирования программы, но не для параллелизма. Подробный пример такого подхода приведен в моей предыдущей заметке "Хранилище данных в потоке управления". Другие примеры см. в статье Аны Лусии Де Моуры и Роберто Иерусалимши "Пересмотр корутин", опубликованной в 2009 году. Оригинальный пример приведен в работе Мелвина Конвея "Design of a Separable Transition-Diagram Compiler", опубликованной в 1963 году.
Корутины - это паттерн конкурентности, который не обслуживается напрямую существующими библиотеками конкурентности Go. Горутины часто достаточно близки, но, как мы видели, это не одно и то же, и иногда это различие имеет значение.
Например, в докладе Роба Пайка "Лексическое сканирование в Go", опубликованном в 2011 году, представлены оригинальные лексер и парсер для пакета text/template. Они выполнялись в отдельных горутинах, соединенных каналом, несовершенно имитируя пару корутин: лексер и парсер работали параллельно, причем лексер просматривал следующую лексему, а парсер обрабатывал последнюю. Генераторов было бы недостаточно - лексер выдает значения из множества различных функций, но полноценные горутины оказались слишком сложными. Параллелизм, обеспечиваемый горутинами, вызвал гонки и в конечном итоге привел к отказу от этой конструкции в пользу хранилища состояния лексера в объекте, что было более точной имитацией корутин. Правильные корутины позволили бы избежать гонок и были бы более эффективны, чем горутины.
Предполагается, что в будущем корутины в Go будут использоваться для итераций над общими коллекциями. Мы обсуждали возможность добавления в Go поддержки ranging over functions, что побудило бы авторов коллекций и других абстракций предоставлять CLU-подобные функции-итераторы. Итераторы можно реализовать уже сегодня, используя значения функций, без каких-либо изменений в языке. Например, несколько упрощенный итератор деревьев в Go может выглядеть следующим образом:
func (t *Tree[V]) All(yield func(v V)) {
if t != nil {
t.left.All(yield)
yield(t.value)
t.right.All(yield)
}
}
Сегодня этот итератор может быть вызван как:
t.All(func(v V) {
fmt.Println(v)
})
и, возможно, его вариант может быть вызван в будущей версии Go как:
for v := range t.All {
fmt.Println(v)
}
Иногда, однако, мы хотим выполнить итерацию по коллекции таким образом, что это не укладывается в рамки одного цикла for
. Примером может служить сравнение бинарных деревьев: две итерации должны быть как-то чередованы. Как мы уже видели, ответ на этот вопрос дают корутины, позволяющие превратить функцию типа (*Tree).All
(итератор "push") в функцию, возвращающую поток значений, по одному за вызов (итератор "pull").
Если мы хотим добавить корутины в Go, мы должны стремиться сделать это без изменения языка. Это означает, что определение корутин должно быть доступно для реализации и понимания в терминах обычного кода Go. Позже я выскажусь за оптимизированную реализацию, предоставляемую непосредственно средой выполнения, но эта реализация должна быть неотличима от чистого определения Go.
Начнем с очень простой версии, в которой операция yield
полностью игнорируется. Она просто запускает функцию в другой горутине:
package coro
func New[In, Out any](f func(In) Out) (resume func(In) Out) {
cin := make(chan In)
cout := make(chan Out)
resume = func(in In) Out {
cin <- in
return <-cout
}
go func() { cout <- f(<-cin) }()
return resume
}
New
принимает функцию f
, которая должна иметь один аргумент и один результат. New
выделяет каналы, определяет resume
, создает горутину для выполнения f
и возвращает функцию resume
. Новая горутина блокируется на <-cin
, поэтому возможности для параллелизма отсутствуют. Функция resume
разблокирует новую горутину, посылая значение in
, а затем блокирует получение значения out
. Эта пара "посылка-получение" образует корутинный переключатель. Мы можем использовать coro.New
следующим образом (полный код):
func main() {
resume := coro.New(strings.ToUpper)
fmt.Println(resume("hello world"))
}
Пока что coro.New
- это просто неуклюжий способ вызова функции. Нам необходимо добавить yield
, который мы можем передать в качестве аргумента в f
:
func New[In, Out any](f func(in In, yield func(Out) In) Out) (resume func(In) Out) {
cin := make(chan In)
cout := make(chan Out)
resume = func(in In) Out {
cin <- in
return <-cout
}
yield := func(out Out) In {
cout <- out
return <-cin
}
go func() { cout <- f(<-cin, yield) }()
return resume
}
Заметим, что параллелизма здесь все равно нет: yield
- это еще одна пара send-receive. Эти горутины ограничены коммуникационным паттерном и действуют неотличимо от корутин.
Прежде чем перейти к преобразованию итераторов, рассмотрим несколько более простых примеров. В разделе "Хранилище данных в потоке управления" мы рассматривали проблему взятия функции
func parseQuoted(read func() byte) bool
и запустить ее в отдельном потоке управления, чтобы байты по одному передавались в метод Write
. Вместо специальной канальной реализации, приведенной в этом посте, мы можем использовать:
type parser struct {
resume func(byte) Status
}
func (p *parser) Init() {
coparse := func(_ byte, yield func(Status) byte) Status {
read := func() byte { return yield(NeedMoreInput) }
if !parseQuoted(read) {
return BadInput
}
return Success
}
p.resume = coro.New(coparse)
p.resume(0)
}
func (p *parser) Write(c byte) Status {
return p.resume(c)
}
Функция Init
выполняет всю работу, и даже не очень. Она определяет функцию coparse
, которая имеет сигнатуру, необходимую для coro.New
, что означает добавление отбрасываемого входа типа byte
. Эта функция определяет read
, которое выдает NeedMoreInput
, а затем возвращает байт, предоставленный вызывающей стороной. Затем выполняется parseQuoted(read)
, преобразующая bool-результат в обычный код состояния. Создав с помощью coro.New
корутину для coparse
, Init
вызывает p.resume(0)
, чтобы позволить coparse
перейти к первому чтению в parseQuoted
. Наконец, метод Write
представляет собой тривиальную обертку вокруг p.resume
(полный код).
Эта установка абстрагирует нас от пары каналов, которые мы поддерживали вручную в предыдущем посте, позволяя работать на более высоком уровне при написании программы.
В качестве несколько более крупного примера рассмотрим Doug McIlroy's concurrent prime sieve. Он состоит из конвейера корутин, по одному для каждого простого числа p
, каждая из которых выполняется:
loop:
n = get a number from left neighbor
if (p does not divide n)
pass n to right neighbor
Счетная корутина, расположенная в левой части конвейера, передает в левый конец конвейера числа 2, 3, 4, .... Корутины печати, расположенные в крайней правой части, могут считывать простые числа, печатать их и создавать новые корутины фильтрации. Первый фильтр в конвейере удаляет кратные 2, следующий - кратные 3, следующий - кратные 5 и так далее.
Созданный нами примитив coro.New
позволяет взять прямой цикл, выдающий значения, и преобразовать его в функцию, которая может быть вызвана для получения каждого значения по очереди. Вот счетчик:
func counter() func(bool) int {
return coro.New(func(more bool, yield func(int) bool) int {
for i := 2; more; i++ {
more = yield(i)
}
return 0
})
}
Логика счетчика - это литерал функции, передаваемый в New
. Он принимает yield
функцию типа func(int) bool
. Код выдает значение, передавая его в yield
, а затем получает обратно булево значение, говорящее о том, следует ли продолжать генерировать новые числа. При получении команды на остановку, либо потому, что при входе значение more
было false
, либо потому, что вызов yield
вернул false
, цикл завершается. Он возвращает конечное, игнорируемое значение, удовлетворяющее типу функции, требуемому New
.
New
превращает это в функцию цикла, обратную yield: func(bool) int
, которая может быть вызвана с true
для получения следующего значения или с false
для остановки генератора. Корутина фильтрации лишь немного сложнее:
func filter(p int, next func(bool) int) (filtered func(bool) int) {
return coro.New(func(more bool, yield func(int) bool) int {
for more {
n := next(true)
if n%p != 0 {
more = yield(n)
}
}
return next(false)
})
}
Она принимает простое p
и функцию next
, подключенную к корутине слева, а затем возвращает отфильтрованный выходной поток для подключения к корутине справа.
Наконец, у нас есть корутина печати:
func main() {
next := counter()
for i := 0; i < 10; i++ {
p := next(true)
fmt.Println(p)
next = filter(p, next)
}
next(false)
}
Начиная со счетчика, main
сохраняет в next
вывод построенного на данный момент конвейера. Затем выполняется цикл: чтение простого числа p
, печать p
, а затем добавление нового фильтра на правом конце конвейера для удаления кратных p
(полный код).
Обратите внимание, что отношения вызова между корутинами могут меняться со временем: любая корутина C может вызвать функцию next
другой корутины D и стать корутиной, к которой D передает управление. Первый yield
счетчика идет к main
, в то время как его последующие yield
идут ко 2-ому фильтру. Аналогично, каждый p
-фильтр передает свой первый вывод (следующее простое число) в main
, в то время как его последующие yield
идут к фильтру для этого следующего простого числа.
В определенном смысле неправильно называть эти потоки управления корутинами. Это полноценные горутины, и они могут делать все, что может обычная горутина, включая блок ожидания мьютексов, каналов, системных вызовов и т.д. Что делает coro.New
, так это создает горутины с доступом к операциям переключения корутин внутри функций yield
и resume
(которые сито называет next
). Возможность использования этих операций может быть даже передана разным горутинам, что и происходит при передаче main
каждого своего next
потока каждой следующей горутине filter
. В отличие от оператора go
, coro.New
добавляет новую конкурентность в программу без нового параллелизма. Горутина, которую создает coro.New(f)
, может выполняться только тогда, когда какая-либо другая горутина явно одолжит ей возможность выполнения с помощью resume
; этот заем возвращается с помощью yield
или возврата f
. Если у вас есть только одна главная горутина и выполняется 10 операторов go
, то все 11 горутин могут быть запущены одновременно. Напротив, если у вас есть одна главная горутина и выполняется 10 вызовов coro.New
, то теперь есть 11 потоков управления, но параллельность программы остается прежней: одновременно выполняется только один. То, какие именно горутины приостанавливаются в операциях корутин, может меняться по мере выполнения программы, но параллелизм при этом не увеличивается.
Короче говоря, go
создает новый конкурентный параллельный поток управления, а coro.New
- новый конкурентный непараллельный поток управления. Удобно продолжать говорить о непараллельных потоках управления как о корутинах, но следует помнить, что то, какие именно корутины являются "непараллельными", может меняться в процессе выполнения программы, точно так же, как может меняться в процессе выполнения программы то, какие горутины принимают или отправляют из каналов.
Есть несколько улучшений, которые мы можем внести в coro.New
, чтобы он лучше работал в реальных программах. Первое - разрешить вызывать resume
после завершения работы функции: сейчас это приводит к deadlock-у. Добавим bool-результат, указывающий, был ли результат resume
получен в результате yield
. Реализация coro.New
, которую мы имеем на данный момент, выглядит следующим образом:
func New[In, Out any](f func(in In, yield func(Out) In) Out) (resume func(In) Out) {
cin := make(chan In)
cout := make(chan Out)
resume = func(in In) Out {
cin <- in
return <-cout
}
yield := func(out Out) In {
cout <- out
return <-cin
}
go func() {
cout <- f(<-cin, yield)
}()
return resume
}
Чтобы добавить этот дополнительный результат, нам нужно отследить, выполняется ли f
, и вернуть этот результат из resume
:
func New[In, Out any](f func(in In, yield func(Out) In) Out) (resume func(In) (Out, bool)) {
cin := make(chan In)
cout := make(chan Out)
running := true
resume = func(in In) (out Out, ok bool) {
if !running {
return
}
cin <- in
out = <-cout
return out, running
}
yield := func(out Out) In {
cout <- out
return <-cin
}
go func() {
out := f(<-cin, yield)
running = false
cout <- out
}()
return resume
}
Обратите внимание, что поскольку resume
может выполняться только тогда, когда вызывающая горутина заблокирована, и наоборот, совместное использование переменной running
не является гонкой. Они синхронизируются, выполняясь по очереди. Если resume
вызывается после выхода горутины, то resume
возвращает нулевое значение и false
.
Теперь мы можем определить, когда горутина завершила свою работу (полный код):
func main() {
resume := coro.New(func(_ int, yield func(string) int) string {
yield("hello")
yield("world")
return "done"
})
for i := 0; i < 4; i++ {
s, ok := resume(0)
fmt.Printf("%q %v\n", s, ok)
}
}
$ go run cohello.go
"hello" true
"world" true
"done" false
"" false
$
В примере с простым ситом было показано прямое использование coro.New
, но аргумент more bool
был несколько неудобен и не соответствовал рассмотренным ранее функциям итераторов. Рассмотрим преобразование любого push-итератора в pull-итератор с помощью coro.New
. Нам понадобится способ завершить работу корутины, выполняющей push-итератор, если мы хотим остановиться раньше, поэтому мы добавим в yield
булевый результат, указывающий, стоит ли продолжать, как в простом сите:
push func(yield func(V) bool)
Цель новой функции coro.Pull
- превратить эту push-функцию в pull-итератор. Итератор будет возвращать следующее значение и булево число, указывающее, закончилась ли итерация, подобно приему канала или просмотру карты:
pull func() (V, bool)
Если мы хотим остановить push-итерацию раньше времени, нам нужно как-то сигнализировать об этом, поэтому Pull
будет возвращать не только pull-функцию, но и stop-функцию:
stop func()
Если сложить эти функции вместе, то полная сигнатура Pull
будет выглядеть следующим образом:
func Pull[V any](push func(yield func(V) bool)) (pull func() (V, bool), stop func()) {
...
}
Первое, что необходимо сделать Pull
, - это запустить корутину для выполнения итератора push
, а для этого нужна функция-обертка с нужным типом, а именно функция, которая принимает more bool
, чтобы соответствовать результату bool
из yield
, и возвращает конечное значение V
. Функция pull
может вызывать resume(true)
, а функция stop
- resume(false)
:
func Pull[V any](push func(yield func(V) bool)) (pull func() (V, bool), stop func()) {
copush := func(more bool, yield func(V) bool) V {
if more {
push(yield)
}
var zero V
return zero
}
resume := coro.New(copush)
pull = func() (V, bool) {
return resume(true)
}
stop = func() {
resume(false)
}
return pull, stop
}
Вот и вся реализация. Благодаря возможностям coro.New
нам потребовалось совсем немного кода и усилий для создания хорошего преобразователя итераторов.
Чтобы использовать coro.Pull
, нам нужно переопределить метод All
дерева, чтобы он ожидал и использовал новый результат bool
из yield
:
func (t *Tree[V]) All(yield func(v V) bool) {
t.all(yield)
}
func (t *Tree[V]) all(yield func(v V) bool) bool {
return t == nil ||
t.Left.all(yield) && yield(t.Value) && t.Right.all(yield)
}
Теперь у нас есть все необходимое для написания функции сравнения деревьев на языке Go (полный код):
func cmp[V comparable](t1, t2 *Tree[V]) bool {
next1, stop1 := coro.Pull(t1.All)
next2, stop2 := coro.Pull(t2.All)
defer stop1()
defer stop2()
for {
v1, ok1 := next1()
v2, ok2 := next2()
if v1 != v2 || ok1 != ok2 {
return false
}
if !ok1 && !ok2 {
return true
}
}
}
Еще одним улучшением является передача паники от корутины обратно ее вызывающей стороне, то есть той корутине, которая в последний раз вызывала resume
для ее выполнения (и, следовательно, сидит заблокированной в resume
в ожидании ее). Очень распространенным запросом является создание некоторого механизма для информирования одной горутины о панике другой, но в общем случае это может быть затруднительно, поскольку мы не знаем, какую горутину информировать и готова ли она услышать это сообщение. В случае с корутинами у нас вызывающая сторона заблокирована в ожидании новостей, поэтому имеет смысл передать сообщение о панике.
Чтобы сделать это, мы можем добавить defer
, чтобы перехватить панику в новой корутине и снова вызвать ее в ожидающем resume
.
type msg[T any] struct {
panic any
val T
}
func New[In, Out any](f func(in In, yield func(Out) In) Out) (resume func(In) (Out, bool)) {
cin := make(chan In)
cout := make(chan msg[Out])
running := true
resume = func(in In) (out Out, ok bool) {
if !running {
return
}
cin <- in
m := <-cout
if m.panic != nil {
panic(m.panic)
}
return m.val, running
}
yield := func(out Out) In {
cout <- msg[Out]{val: out}
return <-cin
}
go func() {
defer func() {
if running {
running = false
cout <- msg[Out]{panic: recover()}
}
}()
out := f(<-cin, yield)
running = false
cout <- msg[Out]{val: out}
}()
return resume
}
Протестируем его (полный код):
func main() {
defer func() {
if e := recover(); e != nil {
fmt.Println("main panic:", e)
panic(e)
}
}()
next, _ := coro.Pull(func(yield func(string) bool) {
yield("hello")
panic("world")
})
for {
fmt.Println(next())
}
}
Новая корутина выдает hello
, а затем вызывает панику world
. Эта паника передается обратно в главную горутину, которая печатает значение и восстанавливается. Видно, что паника возникает в вызове resume
:
% go run coro.go
hello true
main panic: world
panic: world [recovered]
panic: world
goroutine 1 [running]:
main.main.func1()
/tmp/coro.go:9 +0x95
panic({0x108f360?, 0x10c2cf0?})
/go/src/runtime/panic.go:1003 +0x225
main.coro_New[...].func1()
/tmp/coro.go.go:55 +0x91
main.Pull[...].func2()
/tmp/coro.go.go:31 +0x1c
main.main()
/tmp/coro.go.go:17 +0x52
exit status 2
%
Распространение паники позаботилось о том, чтобы сообщить вызывающей стороне о досрочном выходе из корутины, но как сообщить корутине о досрочном выходе вызывающей стороны? По аналогии с функцией stop
в pull-итераторе, нам нужно каким-то образом сигнализировать корутине, что она больше не нужна, возможно, потому, что вызывающий паникует, а возможно, потому, что вызывающий просто возвращается.
Для этого мы можем изменить coro.New
так, чтобы он возвращал не только resume
, но и функцию cancel
. Вызов cancel
будет похож на resume
, только вместо возврата значения yield
будет паниковать. Если во время отмены корутина паникует по-другому, мы хотим, чтобы cancel
распространяла эту панику, как это делает resume
. Но, конечно, мы не хотим, чтобы cancel
распространял свою собственную панику, поэтому мы создаем уникальное значение паники, которое можно проверить. Мы также должны обработать отмену перед началом выполнения f
.
var ErrCanceled = errors.New("coroutine canceled")
func New[In, Out any](f func(in In, yield func(Out) In) Out) (resume func(In) (Out, bool), cancel func()) {
cin := make(chan msg[In])
cout := make(chan msg[Out])
running := true
resume = func(in In) (out Out, ok bool) {
if !running {
return
}
cin <- msg[In]{val: in}
m := <-cout
if m.panic != nil {
panic(m.panic)
}
return m.val, running
}
cancel = func() {
e := fmt.Errorf("%w", ErrCanceled) // unique wrapper
cin <- msg[In]{panic: e}
m := <-cout
if m.panic != nil && m.panic != e {
panic(m.panic)
}
}
yield := func(out Out) In {
cout <- msg[Out]{val: out}
m := <-cin
if m.panic != nil {
panic(m.panic)
}
return m.val
}
go func() {
defer func() {
if running {
running = false
cout <- msg[Out]{panic: recover()}
}
}()
var out Out
m := <-cin
if m.panic == nil {
out = f(m.val, yield)
}
running = false
cout <- msg[Out]{val: out}
}()
return resume, cancel
}
Мы могли бы изменить Pull
, чтобы использовать паники и для отмены итераторов, но в этом контексте явный bool
кажется более понятным, тем более что остановка итератора не является исключением.
Давайте посмотрим, как распространение и отмена паники позволяют очистке простого сита "просто работать". Сначала обновим сито, чтобы оно использовало новый API. Функции counter
и filter
уже являются "однострочными" вызовами return coro.New(...)
. Изменим сигнатуру, чтобы включить в нее дополнительную cancel
-функцию, возвращаемую из coro.New
:
func counter() (func(bool) (int, bool), func()) {
return coro.New(...)
}
func filter(p int, next func(bool) (int, bool)) (func(bool) (int, bool), func()) {
return coro.New(...)
}
Затем преобразуем функцию main
в функцию primes
, которая выводит n
простых чисел (полный код):
func primes(n int) {
next, cancel := counter()
defer cancel()
for i := 0; i < n; i++ {
p, _ := next(true)
fmt.Println(p)
next, cancel = filter(p, next)
defer cancel()
}
}
При выполнении этой функции, получив n
простых чисел, она возвращается. Каждый из отложенных вызовов cancel
очищает созданные корутины. А что если в одной из этих корутин произошла ошибка и она запаниковала? Если эта корутина была возобновлена next
-вызовом в primes
, то паника возвращается в primes
, и отложенные cancel
-вызовы primes
очищают все остальные корутины. Если же работа была возобновлена next
-вызовом в корутине filter
, то паника распространяется до ожидающей корутины filter
, затем до следующей ожидающей корутины filter
и так далее, пока не дойдет до p := next(true)
в primes
, который снова очистит оставшиеся корутины.
Мы пришли к следующему API:
New
создает новую приостановленную корутину, готовую выполнить функцию f
. Новая корутина - это горутина, которая никогда не выполняется сама по себе: она выполняется только в то время, когда какая-либо другая горутина вызывает ее и ожидает, вызывая resume
или cancel
.
Горутина может приостановить свою работу и переключиться на новую корутину, вызвав resume(in)
. Первый вызов resume
запускает f(in, yield)
. Вызов resume
блокирует выполнение f
до тех пор, пока f
не вызовет yield(out)
или не вернется в исходное состояние. Когда f
вызывает yield
, то yield
блокируется, а resume
возвращает out, true
. Когда f
возвращается, resume
возвращает out, false
. Если resume
вернулся из-за yield
, то следующий resume(in)
переключается обратно на f
, а yield
возвращается in
.
cancel
прекращает выполнение f
и закрывает корутину. Если вызов resume
не был произведен, то f
вообще не выполняется. В противном случае отмена приводит к панике заблокированного вызова yield
с ошибкой, удовлетворяющей errors.Is(err, ErrCanceled)
.
Если f
паникует и не восстанавливает панику, то паника останавливается в корутине f
и перезапускается в ожидающей f
горутине, вызывая повторную панику заблокированного ожидания resume
или cancel
с тем же значением паники. cancel
не выполняет повторную панику, если паника f
- это паника, которую вызвала сама cancel
.
После возврата f
или паники корутины больше не существует. Последующие вызовы resume
возвращают нулевое значение и false
. Последующие вызовы cancel
просто возвращаются.
Функции resume
, cancel
и yield
могут передаваться между различными горутинами и использоваться ими, тем самым динамически меняя, какая из горутин является "корутиной". Хотя New
создает новую горутину, она также устанавливает инвариант, согласно которому одна горутина всегда заблокирована - либо в resume
, cancel
, yield
, либо (сразу после New
) в ожидании resume
, которая вызовет f
. Этот инвариант сохраняется до возвращения f
, после чего новая горутина завершается. В результате coro.New
создает новую конкурентность в программе без какого-либо нового параллелизма.
Если несколько горутин вызывают resume
или cancel
, то эти вызовы сериализуются. Аналогичным образом, если несколько горутин вызывают yield
, эти вызовы сериализуются.
func New[In, Out any](f func(in In, yield func(Out) In) Out) (resume func(In) (Out, bool), cancel func())
Как я уже говорил в начале, хотя важно иметь определение корутин, которое можно понять по ссылке на чистую реализацию Go, я считаю, что нам следует использовать оптимизированную реализацию во время выполнения. На моем MacBook Pro 2019 года передача значений туда и обратно с помощью основанного на каналах coro.New
, описанного в этом посте, требует примерно 190 нс на переключение, или 380 нс на значение в coro.Pull
. Помните, что coro.Pull
не является стандартным способом использования итератора: стандартным способом является прямой вызов итератора, который вообще не имеет накладных расходов на корутину. coro.Pull
нужен только в том случае, если требуется обрабатывать итерируемые значения инкрементально, а не с помощью одного цикла for
. Тем не менее, мы хотим сделать coro.Pull
настолько быстрым, насколько это возможно.
Сначала я попробовал заставить компилятор помечать пары send-receive и оставлять подсказки для среды выполнения, чтобы объединить их в одну операцию. Это позволило бы времени выполнения канала обойти планировщик и перейти непосредственно к другой корутине. Такая реализация требует около 118 нс на одно переключение, или 236 нс на одно pulled-значение (на 38% быстрее). Это уже лучше, но все равно не так быстро, как хотелось бы. Вся общность каналов добавляет слишком много накладных расходов.
Далее я добавил прямой переключатель корутин во время выполнения, полностью избегая каналов. Это позволило сократить переключение между корутинами до трех атомарных сравнений (одно в структуре данных корутины, одно для статуса планировщика блокирующей корутины и одно для статуса планировщика возобновляющей корутины), что я считаю оптимальным с учетом инвариантов безопасности, которые необходимо поддерживать. Такая реализация занимает 20 нс на переключение, или 40 нс на pulled-значение. Это примерно в 10 раз быстрее, чем исходная реализация канала. Возможно, более важно то, что 40 нс на pulled-значение кажутся достаточно малыми в абсолютном выражении, чтобы не стать узким местом для кода, которому нужен coro.Pull
.
Расс Кокс - это один из ключевых разработчиков языка программирования Go. Он внес значительный вклад в развитие Go и активно участвует в его поддержке. Расс также выступает с докладами на конференциях, например, на GopherCon 2022, где он говорил о совместимости и том, как программы на Go продолжают работать. Кроме того, он ведет исследования в области моделей памяти. Вы можете найти его работы на GitHub.
Трудности перевода. В контексте программирования, yield
обычно не переводится как "выход". Это потому, что yield
не завершает функцию, в отличие от return
. Вместо этого, yield
приостанавливает выполнение функции и "производит" или "генерирует" значение. Когда функция возобновляет свое выполнение, она продолжает с того места, где был вызван yield
. Таким образом, yield
больше похож на "паузу" или "производство", а не "выход".