Суть Go: Time
- четверг, 6 февраля 2025 г. в 00:00:08
Статья о параллелизме Go, где с помощью интерактивных примеров раскрываем тему с нуля.
В этой главе мы рассмотрим методы управления временем в параллельных программах.
Удушение
Предположим, у нас есть куча задач.
func work() {
// Something very important, but not very fast.
time.Sleep(100 * time.Millisecond)
}
Самый простой способ — это обрабатывать последовательно:
func main() {
start := time.Now()
work()
work()
work()
work()
fmt.Println("4 calls took", time.Since(start))
}
Четыре вызова по 100 мс каждый, выполняемые один за другим, занимают в общей сложности 400 мс.
Конечно, быстрее выполнять работу параллельно, используя N обработчиков, подобных этому:
Если есть свободный обработчик, передайте ему задачу или подождите.
Создайте пустой канал с размером буфера N.
Перед запуском программа помещает в канал токен (некоторое значение).
После завершения работы программа goroutine забирает токен из канала.
Давайте создадим оболочку throttle(n, fn) для обеспечения параллельного выполнения. Мы настроим канал sema и убедимся, что одновременно выполняется не более n рабочих функций:
func throttle(n int, fn func()) (handle func(), wait func()) {
// Semaphore for n goroutines.
sema := make(chan struct{}, n)
// Execute fn functions concurrently, but not more than n at a time.
handle = func() {
sema <- struct{}{}
go func() {
fn()
<-sema
}()
}
// Wait until all functions have finished.
wait = func() {
for range n {
sema <- struct{}{}
}
}
return handle, wait
}
Теперь клиент вызывает функцию work() через оболочку, а не напрямую:
func main() {
handle, wait := throttle(2, work)
start := time.Now()
handle()
handle()
handle()
handle()
wait()
fmt.Println("4 calls took", time.Since(start))
}
Вот как это работает:
Первый и второй вызовы начинают обрабатываться немедленно. Третий и четвертый ожидают завершения двух предыдущих. При использовании двух обработчиков четыре вызова завершаются за 200 мс.
Такое регулирование хорошо работает, когда уровень параллелизма n и время индивидуальной работы() соответствуют (примерно) частоте вызовов handle(). Тогда каждый вызов имеет хорошие шансы быть обработанным немедленно или с небольшой задержкой.
Однако, если вызовов будет намного больше, чем могут обработать, работа системы замедлится. Функция Teach work() по-прежнему будет выполнять 100 мс, но handle() вызовы будут зависать в ожидании места в семафоре. Это не имеет большого значения для конвейеров передачи данных, но может быть проблематично для онлайн-запросов.
Иногда клиенты могут ссылаться на немедленное получение ошибки, когда все обработчики заняты. Для таких случаев нам нужен другой подход.
Давайте изменим логику throttle(). Если в семафоре есть место, выполните функцию. В противном случае немедленно верните сообщение об ошибке.
Таким образом, клиенту не придется ждать зависания вызова.
Оператор select поможет нам еще раз.
До:
// Execute fn functions concurrently,
// but not more than n at a time.
handle = func() {
sema <- struct{}{}
go func() {
fn()
<-sema
}()
}
После:
// Execute fn functions concurrently,
// but not more than n at a time.
handle = func() error {
select {
case sema <- struct{}{}:
go func() {
fn()
<-sema
}()
return nil
default:
return errors.New("busy")
}
}
Давайте вспомним, как работает select. Проверяет, какие обращения не заблокированы. Если готово несколько обращений, случайным образом выбирает одно для выполнения. Если все обращения заблокированы, ожидает, пока не будет готово одно. Третий пункт (все обращения заблокированы) фактически делится на два:
Если нет обращения по умолчанию, select ожидает, пока оно не будет готово.
Если есть обращение по умолчанию, select выполняет его. Обращение по умолчанию идеально подходит для нашей ситуации:
Если в канале sema есть токен, мы запускаем fn. В противном случае мы возвращаем ошибку "занято" без ожидания.
func main() {
handle, wait := throttle(2, work)
start := time.Now()
err := handle()
fmt.Println("1st call, error:", err)
err = handle()
fmt.Println("2nd call, error:", err)
err = handle()
fmt.Println("3rd call, error:", err)
err = handle()
fmt.Println("4th call, error:", err)
wait()
fmt.Println("4 calls took", time.Since(start))
}
Первые два вызова были выполнены одновременно (каждый занял 100 мс), в то время как третий и четвертый сразу же выдали сообщение об ошибке. Все вызовы были обработаны за 100 мс.
Конечно, такой подход (иногда его называют backpressure) требует осведомленности со стороны клиента. Он должен понимать, что ошибка busy означает перегрузку, и здесь либо отложить дальнейшие вызовы handle(), либо уменьшить их частоту.
Время ожидания операции
Вот функция, которая обычно выполняется за 10 мс, но в 20% случаев она выполняется за 200 мс:
func work() int {
if rand.Intn(10) < 8 {
time.Sleep(10 * time.Millisecond)
} else {
time.Sleep(200 * time.Millisecond)
}
return 42
}
Допустим, мы не хотим ждать более 50 мс. Итак, мы устанавливаем тайм-аут — максимальное время, в течение которого мы готовы ждать ответа. Если операция не завершится в течение этого времени, мы будем считать это ошибкой.
Давайте создадим оболочку, которая запускает данную функцию с заданным таймаутом:
func withTimeout(timeout time.Duration, fn func() int) (int, error) {
// ...
}
Мы назовем это так:
func main() {
for range 10 {
start := time.Now()
timeout := 50 * time.Millisecond
if answer, err := withTimeout(timeout, work); err != nil {
fmt.Printf("Took longer than %v. Error: %v\n", time.Since(start), err)
} else {
fmt.Printf("Took %v. Result: %v\n", time.Since(start), answer)
}
}
}
Вот идея, лежащая в основе Timeout():
Запустите данную функцию fn() в отдельной программе.
Дождитесь истечения времени ожидания.
Если функция fn() вернет результат, верните его.
Если она не завершится вовремя, верните сообщение об ошибке.
Вот как вы можете это реализовать:
// withTimeout executes a function with a given timeout.
func withTimeout(timeout time.Duration, fn func() int) (int, error) {
var result int
done := make(chan struct{})
go func() {
result = fn()
close(done)
}()
select {
case <-done:
return result, nil
case <-time.After(timeout):
return 0, errors.New("timeout")
}
}
Все здесь знакомо, за исключением time.After(). Эта функция stdlib возвращает канал, который изначально пуст, но получает значение по истечении времени ожидания. Это позволяет оператору select выбрать правильный регистр:
В случае <-done, если функция fn() завершается до истечения времени ожидания (возвращает результат);
В случае <-time.В случае After(), если функция fn() не завершается вовремя (возвращает ошибку).
Иногда требуется выполнить действие через время, а не сразу. В Go для этого можно использовать таймер.
func work() {
fmt.Println("work done")
}
func main() {
var eventTime time.Time
start := time.Now()
timer := time.NewTimer(100 * time.Millisecond) // (1)
go func() {
eventTime = <-timer.C // (2)
work()
}()
// enough time for the timer to expire
time.Sleep(150 * time.Millisecond)
fmt.Printf("delayed function started after %v\n", eventTime.Sub(start))
}
time.New Timer() создает новый таймер ➊, который истекает (запускается) по истечении заданного времени. Таймер — это структура с каналом C, в который он отправляет текущее время по истечении ➋. Таким образом, функция work() будет выполняться только после истечения таймера.
Если вы остановите таймер, на канал C не будет отправлено никакого значения, что предотвратит выполнение функции work():
func main() {
start := time.Now()
timer := time.NewTimer(100 * time.Millisecond)
go func() {
<-timer.C
work()
}()
time.Sleep(10 * time.Millisecond)
fmt.Println("10ms has passed...")
// the timer hasn't expired yet
if timer.Stop() {
fmt.Printf("delayed function canceled after %v\n", time.Since(start))
}
}
Функция Stop() останавливает таймер и возвращает значение true, если оно еще не истекло. В приведенном выше примере мы остановили таймер всего через 10 мс, поэтому она возвращает значение true.
Вы можете заметить проблему: поскольку timer.C никогда не получает значение, наша программа зависает. Вы можете исправить это с помощью инструкции select или библиотечной функции, которые мы обсудим позже.
Если вы остановите таймер слишком поздно, Stop() вернет значение false:
func main() {
timer := time.NewTimer(100 * time.Millisecond)
go func() {
<-timer.C
work()
}()
time.Sleep(150 * time.Millisecond)
fmt.Println("150ms has passed...")
// too late, the timer has already expired
if !timer.Stop() {
fmt.Println("too late to cancel")
}
}
Для отложенного выполнения функции вам не нужно вручную создавать таймер и считывать данные из его канала. Есть удобная оболочка time.AfterFunc():
func main() {
time.AfterFunc(100*time.Millisecond, work)
// enough time for the timer to expire
time.Sleep(150 * time.Millisecond)
}
AfterFunc(d, f) ожидает длительность d, а затем выполняет функцию f. Она возвращает таймер, который вы можете отменить перед началом выполнения:
В этом случае отмена выполнения с помощью timer.Stop() не приведет к зависанию каких-либо программ (хорошая причина использовать библиотечные функции вместо пользовательских).
func main() {
timer := time.AfterFunc(100*time.Millisecond, work)
time.Sleep(10 * time.Millisecond)
fmt.Println("10ms has passed...")
// the timer hasn't expired yet
if timer.Stop() {
fmt.Println("execution canceled")
}
}
Предположим, у нас есть функция, которая считывает токены из входного канала и предупреждает, если значение не появляется в канале через час:
type token struct{}
func consumer(cancel <-chan token, in <-chan token) {
const timeout = time.Hour
for {
select {
case <-in:
// do stuff
case <-time.After(timeout):
// log warning
case <-cancel:
return
}
}
}
Давайте напишем клиент, который измеряет использование памяти после отправки по каналу 100 тыс:
func main() {
cancel := make(chan token)
defer close(cancel)
tokens := make(chan token)
go consumer(cancel, tokens)
measure(func() {
for range 100000 {
tokens <- token{}
}
})
}
Незаметно каждый time.After создает таймер, который позже освобождается сборщиком мусора. Таким образом, наш цикл for, по сути, создает множество таймеров, выполняет множество распределений и создает ненужную работу для сборщика мусора. Обычно это не то, чего мы хотим.
Чтобы избежать создания таймера на каждой итерации цикла, вы можете создать его в начале и сбросить, прежде чем переходить к следующей итерации. Метод сброса в Go 1.23+ идеально подходит для этого:
func consumer(cancel <-chan token, in <-chan token) {
const timeout = time.Hour
timer := time.NewTimer(timeout)
for {
timer.Reset(timeout)
select {
case <-in:
// do stuff
case <-timer.C:
// log warning
case <-cancel:
return
}
}
}
Этот подход не создает новых таймеров, поэтому сборщику данных не нужно их собирать.
Из-за особенностей реализации в версиях Go до версии 1.23 сброс должен выполняться только для уже остановленного или просроченного таймера с пустым выходным каналом. Таким образом, для корректного сброса таймера вам необходимо использовать вспомогательную функцию:
// resetTimer stops, drains and resets the timer.
func resetTimer(t *time.Timer, d time.Duration) {
if !t.Stop() {
select {
case <-t.C:
default:
}
}
t.Reset(d)
}
func consumer(cancel <-chan token, in <-chan token) {
const timeout = time.Hour
timer := time.NewTimer(timeout)
for {
resetTimer(timer, timeout)
select {
case <-in:
// do stuff
case <-timer.C:
// log warning
case <-cancel:
return
}
}
}
Что еще хуже, time.AfterFun также создает таймер, но совсем другой. Он имеет нулевой канал C, поэтому Reset работает по-другому:
Если таймер все еще активен (не остановлен, срок действия не истек), происходит сброс время ожидания, фактически перезапускается таймер.
Если таймер уже остановлен или срок действия истек, сброс назначает выполнение новой функции.
func main() {
var start time.Time
work := func() {
fmt.Printf("work done after %dms\n", time.Since(start).Milliseconds())
}
// run work after 10 milliseconds
timeout := 10 * time.Millisecond
start = time.Now() // ignore the data race for simplicity
t := time.AfterFunc(timeout, work)
// wait for 5 to 15 milliseconds
delay := time.Duration(5+rand.Intn(11)) * time.Millisecond
time.Sleep(delay)
fmt.Printf("%dms has passed...\n", delay.Milliseconds())
// Reset behavior depends on whether the timer has expired
t.Reset(timeout)
start = time.Now()
time.Sleep(50*time.Millisecond)
}
Если срок действия таймера не истек, сброс сбрасывает время ожидания:
8ms has passed...
work done after 10ms
Если таймер истек, Reset запланирует новый вызов функции:
work done after 10ms
13ms has passed...
work done after 10ms
Повторим еще раз:
Go = 1.22: Для Timer, созданного с помощью NewTimer, Reset должен выполняться только для остановленных или просроченных таймеров с дренажными каналами.
Go = 1.23: Для Timer, созданного с помощью New Timer, безопасно вызывать Reset таймеров в любом состоянии (активном, остановленном или с истекшим сроком действия). Не требуется слив канала.
Для Timer, созданного с помощью AfterFunc, Reset либо повторно запускается (если таймер все еще активен), либо запускается повторно (если таймер остановился или истек срок его действия).
Таймеры — не самая очевидная вещь в Go, как считаете?
Иногда требуется выполнять какое-либо действие через регулярные промежутки времени. Для этого в Go есть инструмент, называемый бегущей строкой. Бегущая строка похожа на таймер, но она продолжает работать, пока вы ее не остановите:
func work(at time.Time) {
fmt.Printf("%s: work done\n", at.Format("15:04:05.000"))
}
func main() {
ticker := time.NewTicker(50 * time.Millisecond)
defer ticker.Stop()
go func() {
for {
at := <-ticker.C
work(at)
}
}()
// enough for 5 ticks
time.Sleep(260 * time.Millisecond)
}
NewTicker(d) создает тикер, который отправляет текущее время в канал Cat interval d. В конечном итоге вы должны остановить тикер с помощью Stop(), чтобы освободить ресурсы.
В нашем случае интервал составляет 50 мс, что позволяет использовать 5 тактов.
Если читатель канала не может следить за тикером, он будет пропускать тики:
func work(at time.Time) {
fmt.Printf("%s: work done\n", at.Format("15:04:05.000"))
time.Sleep(100 * time.Millisecond)
}
func main() {
ticker := time.NewTicker(50 * time.Millisecond)
defer ticker.Stop()
go func() {
for {
at := <-ticker.C
work(at)
}
}()
// enough for 3 ticks because of the slow work()
time.Sleep(260 * time.Millisecond)
}
В этом случае получатель начинает отставать после второго тика.
Как вы можете видеть, тики не накапливаются, они адаптируются к медленному получателю.
Теперь вы знаете, что время обработки в параллельных программах не связано с (ab) использованием времени.Спать. Вот несколько полезных инструментов, с которыми вы ознакомились:
Тайм-ауты ограничивают время выполнения операций.
Таймеры помогают при отложенных операциях.
Заявки предназначены для периодических действий.
Регистр по умолчанию в select разрешает обработку nowait.
В следующей главе мы будем работать с контекстом.