golang

Паттерны проектирования Go. Fan-in

  • среда, 30 октября 2024 г. в 00:00:18
https://habr.com/ru/articles/854302/

Введение

Доброго времени суток!

Наблюдая за тем, как разработчик пишет код, можно подумать, что он пребывает в состоянии глубокой безмятежности - работа непыльная, тяжести поднимать не нужно. Тем не менее, этот труд кажется простым только с виду. На самом деле очень легко сделать что-то не так. Да и вообще... что такое код? - мысли разработчика в чистом виде, а мысли у всех людей разные. Даже если решение получается верным, его понимание может требовать определённых усилий со стороны других разработчиков, что здорово может замедлить работу команды. Да и в принципе прибавить хлопот при дальнейшем сопровождении кода. Для избежания таких трудностей были придумана паттерны проектирования - варианты решений типовых задач в разработке. Такой подход позволяет не только воспользоваться готовой идеей, но и выработать общий стиль написания кода в команде...но, конечно, при правильном понимании паттерна проектирования.

В этой статье хочется немного поговорить о паттернах, которые применяются в Go. Поэтому рассмотрим, так называемый, fan-in: попишем немного код и unit-тест.

Назначение

Fan-in используется в многопоточном коде, когда данные разных потоков нужно передать в один поток.

Задача

Предположим, есть несколько горутин, которые откуда-то получают лог. И, в силу непреодолимых обстоятельств, весь этот лог нужно передать в единый канал.

Реализация

Задача поставлена. Самое время набросать каркас функции:

func merge(sources ...<-chan string) <-chan string { 
  result := make(chan string)
  
  return result
}

Функция merge получает каналы с логами и возвращает канал с агрегированным логом. В черновом варианте возвращается нулевой канал. Соответственно, unit-тест для этой функции будет:

func TestMerge(t *testing.T) {
    //подготовка исходных данных
	expected := map[string]struct{}{
		"create user":  struct{}{},
		"update user":  struct{}{},
		"select user":  struct{}{},
		"create order": struct{}{},
		"update order": struct{}{},
		"select order": struct{}{},
		"create task":  struct{}{},
		"update task":  struct{}{},
		"run task":     struct{}{},
	}
	result := map[string]struct{}{}

    //запуск тестируемой функции
    output := merge()
  
	//наполнение карты результатов
    for v := range output {
		result[v] = struct{}{}
	}

    //сверка значений результата с ожидаемыми значениями
    if len(result) != len(expected) {
		t.Errorf("Expected: %v, got: %v", expected, result)
	}

	for key := range result {
		if _, ok := expected[key]; !ok {
			t.Errorf("Unexpected key: %s", key)
		}
	}
}

Тестируемая функция merge возвращает небуферизованный канал, значения которого нужно вычитывать и заодно проверять. Для проверки значений потребуется дополнительная структура ожидаемых значений. Так как значений может быть много, то, кажется, что для этой задачи хорошо подходят карты - одна карта содержит в качестве ключей ожидаемые значения:

expected := map[string]struct{}{
      "create user":  struct{}{},
      "update user":  struct{}{},
      "select user":  struct{}{},
      "create order": struct{}{},
      "update order": struct{}{},
      "select order": struct{}{},
      "create task":  struct{}{},
      "update task":  struct{}{},
      "run task":     struct{}{},
  }

а другая - результаты:

result := map[string]struct{}{}

После запуска тестируемой функции и наполнения результатами карты result:

//запуск тестируемой функции
output := merge()
  
//наполнение карты результатов
for v := range output {
  result[v] = struct{}{}
}

Остаётся лишь сравнить их длины:

if len(result) != len(expected) {
	t.Errorf("Expected: %v, got: %v", expected, result)
}

и убедиться, что ключи карт совпадают:

for key := range result {
	if _, ok := expected[key]; !ok {
		t.Errorf("Unexpected key: %s", key)
	}
}

Если на данном этапе попробовать запустить тест, то он никогда не выполнится - будет "зависание", так как в тесте есть чтение из канала output, но в него никто не пишет. Следует добавить логику в функцию merge, но сначала вспомним, что требуется в задаче: необходимо лог из нескольких каналов записать в единый канал. Первое, что приходит в голову - обойти в цикле передаваемое множество каналов sources и записывать значения из них в результирующий канал:

func merge(sources ...<-chan string) <-chan string { 
  result := make(chan string)
  
  for _, source := range sources {
    for v := range ch {
    	result <- v
	}
  }
    
  return result
}

Но беда этого кода в том, что, при записи значения в небуферизованный результирующий канал, главная горутина будет ждать, пока что-то не прочитает это значение из него. Поэтому логику записи в результирующий канал необходимо запускать в отдельных горутинах:

func merge(sources ...<-chan string) <-chan string { 
  result := make(chan string)

  output := func(ch <-chan string) {
    for v := range ch {
      result <- v
    }
  }
  
  for _, source := range sources {
    go output(source)
  }
   
  return result
}

Для этого создаётся анонимная функция output, которая запускается в отдельных горутинах при обходе среза sources.

Теперь лучше, но пока всё равно ничего работать не будет, так как, при запуске функции merge, главная горутина, не дожидаясь дочерних горутин, завершит своё выполнение, и мы просто не получим результат. А также логика, которая будет читать из результирующего канала, не будет знать, когда это чтение следует прекратить. Для решения этих проблем следует использовать sync.WaitGroup и закрывать результирующий канал:

func merge(sources ...<-chan string) <-chan string {
	var wg sync.WaitGroup
	result := make(chan string)

	output := func(ch <-chan string) {
		for v := range ch {
			result <- v
		}
		wg.Done()
	}

	for _, source := range sources {
		wg.Add(1)
		go output(source)
	}

	go func() {
		wg.Wait()
		close(result)
	}()

	return result
}

Таким образом

var wg sync.WaitGroup

WaitGroup создаётся в теле функции merge. В теле анонимной функции output уменьшается счётчик количества требующих ожидания горутин:

output := func(ch <-chan string) {
  for v := range ch {
    result <- v
  }
  wg.Done()
}

Перед созданием горутины, для запуска функции output, счётчик необходимо увеличивать:

for _, source := range sources {
  wg.Add(1)
  go output(source)
}

Закрытие результирующего канала и ожидание выполнения дочерних горутин также происходит в отдельной горутине:

go func() {
  wg.Wait()
  close(result)
}()

В противном случае будет снова "зависание" кода на wg.Wait(), так как результирующий канал небуферизованный, а это значит, что в него можно писать по одному значению, и где-то это значение должно вычитываться из него.

Код функции для объединения лога в единый канал готов. Самое время запустить unit-тест:

go clean --testcache && go test -v --race ./...

тест упадёт:

=== RUN   TestMerge
    fan_in_test.go:52: Expected: map[create order:{} create task:{} create user:{} run task:{} select order:{} select user:{} update order:{} update task:{} update user:{}], got: map[]
--- FAIL: TestMerge (0.00s)

Так в тесте ожидается прочитать лог, но в функцию merge ничего не передаётся:

output := merge()

После подготовки тестовых данных:

ch1 := make(chan string)
ch2 := make(chan string)
ch3 := make(chan string)

go func() {
  for _, msg := range []string{"create user", "update user", "select user"} {
    ch1 <- msg
  }
  close(ch1)
}()

go func() {
  for _, msg := range []string{"create order", "update order", "select order"} {
    ch2 <- msg
  }
  close(ch2)
}()

go func() {
  for _, msg := range []string{"create task", "update task", "run task"} {
    ch3 <- msg
  }
  close(ch3)
}()

output := merge(ch1, ch2, ch3)

Тест успешно проходит:

=== RUN   TestMerge
--- PASS: TestMerge (0.00s)
PASS

Итого

Код функции агрегирующей лог от разных горутин в единый канал:

func merge(sources ...<-chan string) <-chan string {
	var wg sync.WaitGroup
	result := make(chan string)

	output := func(ch <-chan string) {
		for v := range ch {
			result <- v
		}
		wg.Done()
	}

	for _, source := range sources {
		wg.Add(1)
		go output(source)
	}

	go func() {
		wg.Wait()
		close(result)
	}()

	return result
}

Код unit-теста этой функции:

func TestMerge(t *testing.T) {
	expected := map[string]struct{}{
		"create user":  struct{}{},
		"update user":  struct{}{},
		"select user":  struct{}{},
		"create order": struct{}{},
		"update order": struct{}{},
		"select order": struct{}{},
		"create task":  struct{}{},
		"update task":  struct{}{},
		"run task":     struct{}{},
	}
	result := map[string]struct{}{}

	ch1 := make(chan string)
	ch2 := make(chan string)
	ch3 := make(chan string)

	go func() {
		for _, msg := range []string{"create user", "update user", "select user"} {
			ch1 <- msg
		}
		close(ch1)
	}()

	go func() {
		for _, msg := range []string{"create order", "update order", "select order"} {
			ch2 <- msg
		}
		close(ch2)
	}()

	go func() {
		for _, msg := range []string{"create task", "update task", "run task"} {
			ch3 <- msg
		}
		close(ch3)
	}()

	output := merge(ch1, ch2, ch3)
	for v := range output {
		result[v] = struct{}{}
	}

	if len(result) != len(expected) {
		t.Errorf("Expected: %v, got: %v", expected, result)
	}

	for key := range result {
		if _, ok := expected[key]; !ok {
			t.Errorf("Unexpected key: %s", key)
		}
	}
}