golang

Своё кастомное межсервисное взаимодействие с блекджеком и gRPC

  • суббота, 9 декабря 2023 г. в 00:00:16
https://habr.com/ru/companies/ozontech/articles/779266/

Привет, Хабр! Меня зовут Ильяс. Мы с командой делаем собственный Service Mesh в Ozon Tech, и в этой статье я расскажу, как можно за вечер реализовать свое супер кастомное межсервисное взаимодействие. К концу статьи мы с вами напишем современные алгоритмы балансировки, настроим канареечные деплои, а также узнаем, как реализовать кучу других супернеобычных механизмов межсервисного взаимодействия на основе, не поверите, библиотеки gRPC :D. И да, мы с моей командой уже раскатили это на весь Ozon Tech, состоящий из 5000 сервисов. Пристегнитесь — мы начинаем… :-)

… по порядку.

Микросервисная архитектура и её особенности

С приходом в наш мир микросервисной архитектуры появился некоторый пробел между сервисами, который потребовал от разработчиков решения по организации связности этих замечательных микросервисов. Самым простым и логичным было использовать тот же механизм, который доставляет входящий трафик от пользователей, а именно — HTTP. Но прогресс не стоит на месте. Для удобства использования микросервисов придумали такой подход, как RPC (remote procedure call, или на великом и могучем – удалённый вызов процедуры). Существует множество реализаций этого подхода — как при помощи JSON в привычных HTTP-запросах, так и усовершенствованных, которые отправляют данные в более сжатом виде. Самый популярный и уважаемый в Ozon Tech — gRPC. Большая часть синхронного межсервисного взаимодействия в нашей системе осуществляется при помощи этого протокола. 

В этой статье речь пойдёт не про «рекламные» преимущества gRPC, такие как высокая скорость, удобство и т. д., а кое о чём необычном, что когда-то лично меня поразило, а именно — о встроенном механизме поиска адресов приложения, в которое совершаются запросы, о менеджменте подключений к разным его адресам, а также о балансировке запросов между несколькими копиями этого приложения. Ну или, если коротко, - речь про Resolver, Balancer и Picker в библиотеке gRPC-go :)

Мы познакомимся со всеми этими объектами, препарируем их и сделаем собственные реализации каждой из них, которые позволят нам к концу статьи запустить собственный механизм управления взаимодействиями между приложениями. Статья наполнена кодом, написанным на Go, но в целом применима и к другим языкам, для которых есть официальная версия gRPC от разработчиков Google (С#, Java, C++, JavaScript, Swift и других). Некоторые языки имеют собственную реализацию, а некоторые — обёртку над главной реализацией протокола на С. 

Когда возникает проблема?

Когда монолит только начинают распиливать на микросервисы или запускают новый проект, вся инсталляция состоит из небольшого количества частей приложения. Для того чтобы начать отправлять запросы из одного сервиса в другой, можно либо захардкодить адреса серваков или виртуалок, на которых хостится нужная часть, либо подтюнить внутренний DNS, добавив туда записи с адресами этих сервисов, и резолвить адреса через него. Называют этот процесс Service Discovery (да-да, поиск сервисов :)). Но, когда эти единицы и десятки сервисов превращаются в сотни и тысячи, начинаются проблемы. А если ещё все эти сервисы затаскивают в Kubernetes, в котором им свойственно переезжать с одного сервера на другой, начинается сущий ад. Но есть «коробочное» решение — Service Mesh, реализуемый сайдкаром (привет, Envoy и Linkerd :)).

Кто такой этот ваш Service Mesh???

Service Mesh — это инфраструктурный слой, который в общем случае реализует поиск IP-адресов сервисов, создание подключений, балансировку нагрузки, обеспечение шифрования трафика и прочие дополнительные механизмы, которые могут потребоваться в том пробеле, который образуется при разделении монолита на части. 

Обычно все они реализуются путём добавления к каждому приложению некоего дополнительного приложения, называемого сайдкаром, через которое проксируется весь трафик. Оно куда-то совершает запросы, получает список адресов, создаёт коннекты и начинает балансировать на них трафик (например, Envoy делает запросы в приложение Istio, которое в свою очередь всю информацию про сервисы выкачивает из Kubernetes API). И я назвал это решение «коробочным», так как его достаточно легко подключить к вашему кластеру Kubernetes, добавив правило на инъекцию сайдкара к каждому приложению, которое поднимается в Kubernetes. 

Реализация gRPC немного отличается от стандартного подхода тем, что вся логика находится в библиотеке, которая компилируется вместе с приложением и, по сути, вшивается в бинарь вашего приложения. Такой подход хорош тем, что он практически не требует дополнительных ресурсов, в отличие от сайдкаров (так как сайдкар — это отдельное приложение, которое занимает процессорное время). Но, конечно же, есть нюансы, связанные с усложнением клиентского приложения, а также с тем, что, в отличие от сайдкара, для обновления версии фреймворка требуется пересобирать всё приложение, что может быть сложнее и дольше, чем замена бинаря в сайдкаре. Серебряной пули всё ещё не существует :(

Создаём подключение

Немного определений

Пара слов об используемых терминах. У нас есть некий сервис 1, который мы будем называть КЛИЕНТОМ, которому необходимо совершать запросы к некоему сервису 2, который мы будем называть БЭКЕНДОМ. 

Перейдём от слов к делу. Когда мы со стороны клиента создаём подключение к нужному нам бэкенду через gRPC, мы вызываем функцию grpc.Dial, которая, помимо ошибки (всё в жизни может пойти не так :D), возвращает некий объект ClientConn. Это ключевой объект в gRPC, в рамках которого реализуется работа данного протокола, а также функционируют механизмы Service Discovery, Load Balancing и другие. Также ClientConn проксирует взаимодействие этих элементов (чуть позже увидим).

На картинках ниже объект КЛИЕНТ (который демонстрирует исходное приложение клиента) не будет изображаться для упрощения
На картинках ниже объект КЛИЕНТ (который демонстрирует исходное приложение клиента) не будет изображаться для упрощения

Resolver

С чего начать?

Чтобы нам начать совершать запросы, надо понять, куда их совершать. Опускаться ниже сетевого уровня по модели OSI мы не будем, поэтому ответом на вопрос «куда?» будет список IP-адресов. Мы знаем, что существует некий бэкенд, который как-то себя идентифицирует (пусть у него будет какое-то уникальное имя, которое нам известно, просто потому что это имя ему дали либо мы, либо разработчики из соседнего отдела. Можно сходить к ним и спросить :)).

Воспользуемся примером gRPC hello-сервера. На данном этапе нам надо придумать некий объект, который будет заниматься логикой определения адресов бэкенда, то есть какой-то механизм, который будет переводить имя бэкенда в список адресов. В gRPC он называется Resolver. Его задачей будет совершать запрос в некоторое хранилище и получать список адресов, а также поддерживать его в актуальном состоянии. Ну а хранилищем может быть, например, DNS-служба. Её IP-адрес всегда один и тот же. 

Наглядный мок DNS-сервера

Для наглядности сделаем простой мок DNS-сервера — приложение, которое на HTTP GET-запрос будет возвращать список IP-адресов заданного бэкенда. На порту 8081 поднимаем HTTP-сервер, который на GET запрос “/endpoints” с query-параметром target вернёт адрес с портом интересующего нас сервиса. Выглядит оно как-то так: 

func main() {
	http.HandleFunc("/endpoints", HandleEndpointRequest)
	err := http.ListenAndServe("localhost:8081", nil)
	if err != nil {
		log.Fatal(err)
	}
}
func HandleEndpointRequest(w http.ResponseWriter, req *http.Request) {
	log.Println("new request with query:", req.URL.RawQuery)
	target := req.URL.Query().Get("target")
	switch target {
	case "helloworld_server":
		_, err := w.Write([]byte("127.0.0.1:8080"))
		if err != nil {
			log.Println(err)
		}
	default:
		w.WriteHeader(http.StatusNotFound)
	}
}

Интерфейс создания Resolver

Перейдем к Resolver’у и уже непосредственно к gRPC. Для его использования требуется зарегистрировать билдер Resolver’а в gRPC.

rb := &ResolverBuilder{}
resolver.Register(rb)

Сам билдер реализует простой интерфейс — функции Build и Scheme. Build функция создаёт инстанс Resolver, а Scheme — возвращает строку с названием нашего Resolver. Мы позже будем использовать его на этапе создания подключения к сервису. ClientConn, таким образом, будет создавать тот Resolver, которым мы хотим воспользоваться (резолюция имён). 

type Builder interface {
	Build(target Target, cc ClientConn, opts BuildOptions) (Resolver, error)
	Scheme() string
}

Назовём наш Resolver habr.

func (b *ResolverBuilder) Scheme() string {
	return "habr"
}


Таким образом, когда на клиенте вызывается grpc.Dial, этой функции на вход отправляется адрес, к которому мы хотим подключиться:

grpc.Dial(“ozon.ru”, opts...)

Чтобы выбрать Resolver, нужно перед адресом добавить его схему:

 grpc.Dial(“habr:///ozon.ru”, opts...)

Под капотом функции Dial эта строка распарсится — и из мапы зарегистрированных билдеров Resolver’ов будет выбран нужный, при помощи которого будет создан Resolver:

Интерфейс самого Resolver

Интерфейс Resolver достаточно прост. У нас есть функция закрытия Resolver (Close) и функция ResolveNow. С фукнцией Close всё понятно: название говорит о том, что при её вызове Resolver должен завершить свою работу. Ну а функция ResolveNow может быть вызвана ClientConn для форсирования резолвинга адресов в ряде экстренных случаев, которые мы рассмотрим ниже (например, если не удалось подключиться ни к одному из ранее полученных адресов; плюс она может быть вызвана несколько раз конкурентно). Простой Resolver может эту функцию игнорировать. Основная логика работы Resolver должна запускаться при его создании и работать в фоне (то есть в отдельной горутине), наполняя ClientConn адресами.

type Resolver interface {
   ResolveNow(ResolveNowOptions)
   Close()
}

В функцию Build передаются сведения о таргете (названии сервиса, к которому мы хотим подключиться, в виде структуры URL из stg пакета net/url), управляющий ClientConn (обёрнутый дополнительным проксирующим объектом специально для Resolver) для передачи в него информации, а также дополнительные BuildOptions. 

func (b *ResolverBuilder) Build(target resolver.Target, cc resolver.ClientConn, _ resolver.BuildOptions) (resolver.Resolver, error) {
	ctx, cancel := context.WithCancel(context.Background())
	r := &Resolver{
		target: target.Endpoint,
		ctx:    ctx,
		cancel: cancel,
		wg:     sync.WaitGroup{},
		cc:     cc,
	}
	r.wg.Add(1)
// Та самая горутина, которая в фоне будет обновлять адреса
	go r.watch()
	return r, nil
}

При создании Resolver будем запускать горутину с функцией watch. Она будет совершать достаточно простое действие: раз в 5 секунд осуществлять некий lookup заданного target.

func (r *Resolver) watch() {
	defer r.wg.Done()
	r.lookup(r.target)
	ticker := time.NewTicker(5 * time.Second)
	for {
		select {
		case <-r.ctx.Done():
			return
		case <-ticker.C:
			r.lookup(r.target)
		}
	}
}

В функции lookup же мы будем делать запрос к вышеописанному моку DNS-сервера, а в конце — передавать сведения в ClientConn для дальнейшей обработки (вернёмся к этому чуть позже). Для упрощения опустим обработки ошибок, заменив их на вывод лога.

func (r *Resolver) lookup(target string) {
	resp, err := http.Get(fmt.Sprintf("http://%s/endpoints?target=%s", lookupServerHostPort, target))
	if err != nil {
		log.Println(err)
		return
	}
	data, err := io.ReadAll(resp.Body)
	if err != nil {
		log.Println(err)
		return
	}
	addrs := strings.Split(string(data), " ")
	log.Println("resolver made request:", addrs)
	newAddrs := make([]resolver.Address, 0, len(addrs))
	for _, a := range addrs {
		newAddrs = append(newAddrs, resolver.Address{Addr: a})
	}
	// Обновляем адреса в ClientConn
	err = r.cc.UpdateState(resolver.State{
		Addresses: newAddrs,
		ServiceConfig: defaultServiceConfig,
	})
	if err != nil {
		log.Println(err)
		return
	}
}

Ну и после того как мы получим список адресов и конвертируем его в нужный нам вид, мы передадим эту информацию обратно ClientConn через функцию UpdateState. На эту функцию мы посмотрим чуть позже, а пока просто будем считать, что мы получили список адресов и передали его обработку куда-то дальше. 

Демонстрация работы

Запустим все наши 3 приложения: моки DNS-сервера, бэкенда и клиента с кастомным Resolver.

Логи клиента:

2023/10/21 16:45:17 resolver made request: [127.0.0.1:8080]

2023/10/21 16:45:17 Greeting: Hello OzonTech

2023/10/21 16:45:19 Greeting: Hello OzonTech

2023/10/21 16:45:21 Greeting: Hello OzonTech

2023/10/21 16:45:22 resolver made request: [127.0.0.1:8080]

2023/10/21 16:45:23 Greeting: Hello OzonTech

2023/10/21 16:45:25 Greeting: Hello OzonTech

2023/10/21 16:45:27 resolver made request: [127.0.0.1:8080]

2023/10/21 16:45:27 Greeting: Hello OzonTech

2023/10/21 16:45:29 Greeting: Hello OzonTech

2023/10/21 16:45:31 Greeting: Hello OzonTech

2023/10/21 16:45:32 resolver made request: [127.0.0.1:8080]

2023/10/21 16:45:33 Greeting: Hello OzonTech

2023/10/21 16:45:35 Greeting: Hello OzonTech

2023/10/21 16:45:37 resolver made request: [127.0.0.1:8080]

Логи бэкенда:

2023/10/21 16:45:17 Received: OzonTech

2023/10/21 16:45:19 Received: OzonTech

2023/10/21 16:45:21 Received: OzonTech

2023/10/21 16:45:23 Received: OzonTech

2023/10/21 16:45:25 Received: OzonTech

2023/10/21 16:45:27 Received: OzonTech

2023/10/21 16:45:29 Received: OzonTech

2023/10/21 16:45:31 Received: OzonTech

2023/10/21 16:45:33 Received: OzonTech

2023/10/21 16:45:35 Received: OzonTech

Логи мока DNS-сервера: 

2023/10/21 16:45:17 new request with query: target=helloworld_server

2023/10/21 16:45:22 new request with query: target=helloworld_server

2023/10/21 16:45:27 new request with query: target=helloworld_server

2023/10/21 16:45:32 new request with query: target=helloworld_server

2023/10/21 16:45:37 new request with query: target=helloworld_server

В логах мы видим, что клиент делает запрос к нашему моку DNS-сервера, после чего начинает отправлять запросы до бэкенда. И дальше раз в 5 секунд обновляет данные от DNS-сервера.

Промежуточные выводы

Мы реализовали кастомный механизм поиска адресов. Мы осуществляем lookup и получаем адреса интересующих нас бэкендов. Из недостатков такого подхода можно отметить задержку изменения адресов. Но в качестве примера нас это устраивает.

Пока что мы получили просто список — и что-то дальше обработало его, начав отправлять запросы к бэкенду. Логично, что сначала нам надо бы установить подключения к этим адресам и начать отслеживать их состояния. Для этого реализуем объект Balancer, задачей которого будет создавать соединения, проверять их состояния и при необходимости реализовывать на клиенте какую-то логику по порядку создания соединений и т.д. Но сначала нужно понять, где он создаётся и как выбирается нужный для конкретного подключения, а также как в него попадает информация об адресах. 

Balancer

В функции UpdateState у ClientConn, которую мы вызывали ранее, помимо поля с адресами, было поле ServiceConfig. В нём как раз и содержится информация о том, какой Balancer использовать, а также необходимые конфиги для него, если они имеются. В примере выше мы передали ClientConn в поле ServiceConfig константу, но данные для данного поля могут быть также получены от управляющего сервера, который передал нам информацию об адресах.

В конце своей работы функция UpdateState ставит событие в очередь на обновление состояния Balancer. Одной из фич ClientConn при использовании Balancer является то, что управляющие функции вызываются одной горутиной, в связи с чем между функциями Balancer не возникает race condition и нет необходимости обеспечивать защиту от конкурентного обращения к общим данным в функциях: UpdateClientConnState, ResolverError, UpdateSubConnState и Close.

Сам Balancer регистрируется аналогично Resolver. 

bb := &BalancerBuilder{}
balancer.Register(bb)

А вот его интерфейс немного отличается. В Balancer в функцию Build не передаются сведения о таргете напрямую — они передаются в опциях. Ну и вместо метода Scheme в нём метод Name. Но задача в целом у него аналогичная.

type Builder interface {
	Build(cc ClientConn, opts BuildOptions) Balancer
	Name() string
}

Реализуем же Balancer. Его дефолтный интерфейс выглядит следующим образом:

type Balancer interface {
	UpdateClientConnState(ClientConnState) error
	ResolverError(error)
	UpdateSubConnState(SubConn, SubConnState)
	Close()
}

Обработка полученного состояния

Функция UpdateClientConnState — как раз та самая, при помощи которой ClientConn обновляет состояние Balancer.

Она передаёт Balancer сведения от Resolver. В этот момент в Balancer попадает список адресов, которыми он будет управлять, а также ряд атрибутов, которые могут сообщить ему дополнительную информацию при необходимости. Поскольку эта функция является некой точкой входа (после Build, конечно же), начнём с неё.

func (b *Balancer) UpdateClientConnState(s balancer.ClientConnState) error {
   // В случае обновления состояния очищаем ошибку Resolver (в конце будет новая проверка на неё)
   b.resolverErr = nil


   addrsSet := b.createNewSubConns(s)


   b.clearDeletedSubConns(addrsSet)


   // Если состояние Resolver не содержит адресов, то возвращаем ошибку, чтобы ClientConn попытался вызвать ResolveNow
   // Также сохраняем ошибку об отсутствии адресов, чтобы в случае падения состояния Balancer репортить её наверх


   if len(s.ResolverState.Addresses) == 0 {
      b.ResolverError(errors.New("produced zero addresses"))
      return balancer.ErrBadResolverState
   }
   // В случае если всё окей, пересоздаём Picker и обновляем состояние Balancer, а также передаем ClientConn новый Picker
   b.regeneratePicker()
   b.cc.UpdateState(balancer.State{ConnectivityState: b.state, Picker: b.picker})
   return nil
}

Глобально у UpdateClientConnState основные задачи — пройтись по полученному состоянию и принять решение о том, какие недостающие соединения необходимо создать, и, возможно, закрыть те, которые были открыты, если они исчезли из нового состояния. В нашем примере мы будем просто синхронизировать состояние с тем, которое получаем от Resolver.

Создание и отслеживание Subconnects

Соединения в рамках подключения к сервису в gRPC называют SubConn (или сабконнекты). Создаются они также через ClientConn.

ClientConn создаёт новый SubConn — и после вызова функции Connect в отдельной горутине SubConn создаёт HTTP/2-транспорт и начинает отслеживать его состояние. Данная информация через ClientConn передаётся уже Balancer при помощи функции UpdateSubConnState. 

Функции createNewSubConns и clearDeletedSubConns:

func (b *Balancer) createNewSubConns(s balancer.ClientConnState) *resolver.AddressMap {
   // addrsSet — это мапка, благодаря которой в будущем будет проще удалять сабконнекты, которые Resolver в обновлении не вернул
   addrsSet := resolver.NewAddressMap()
   for _, a := range s.ResolverState.Addresses {
      addrsSet.Set(a, nil)
      if _, ok := b.subConns.Get(a); !ok {
         // Создаём сабконнект, так как его нет в стейте Balancer
         sc, err := b.cc.NewSubConn([]resolver.Address{a}, balancer.NewSubConnOptions{})
         if err != nil {
            // Если что-то пошло не так, скипаем адрес и продолжаем
            // Возможные ошибки:
            // -- len(addrs) <= 0
            // -- ErrClientConnClosing
            // -- channelz.RegisterSubChannel error (errors.New("a SubChannel's parent id cannot be nil"))
            continue
         }
         // Сохраняем сабконнект
         b.subConns.Set(a, sc)


         b.scStates[sc] = connectivity.Idle
         // Так как это новый сабконнект, считаем, что ранее он был в выключенном состоянии
         b.csEvltr.RecordTransition(connectivity.Shutdown, connectivity.Idle)
         // Вызываем подключение сабконнекта (под капотом подключение происходит в отдельной горутине, поэтому просто вызываем функцию)
         sc.Connect()
      }
   }
   return addrsSet
}


func (b *Balancer) clearDeletedSubConns(addrsSet *resolver.AddressMap) {
   // Пройдёмся по всем сабконнектам в текущем состоянии и проверим, не были ли сабконнекты удалены в новом состоянии Resolver
   for _, a := range b.subConns.Keys() {
      sci, _ := b.subConns.Get(a)
      sc := sci.(balancer.SubConn)
      // Проверяем наличие адреса в мапке новых адресов
      if _, ok := addrsSet.Get(a); !ok {
         // Если в новом стейте нет этого адреса, делаем запрос к СlientConn на его удаление
         b.cc.RemoveSubConn(sc)
         b.subConns.Delete(a)
         // Но изменение собственного состояния произойдёт, когда ClientConn вернёт колбэк об изменении состояния
         // этого сабконнекта через функцию UpdateSubConnState
      }
   }
}

Итак, в функцию UpdateSubConnState приходят обновления состояний сабконнектов. В ней нам необходимо обработать полученное состояние сабконнекта, а также зафиксировать агрегированное состояние всех сабконнектов, которое будет характеризовать состояние Balancer.

func (b *Balancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
   // Если к нам пришло обновление сабконнекта, которого нет в нашем стейте, скипаем его
   oldS, ok := b.scStates[sc]
   if !ok {
      return
   }


   s := state.ConnectivityState


   if oldS == connectivity.TransientFailure &&
      (s == connectivity.Connecting || s == connectivity.Idle) {
      // Если SubConn попал в TRANSIENT_FAILURE, игнорируем последующий IDLE или
      // CONNECTING переходы, чтобы предотвратить агрегированное состояние от
      // нахождения в постоянном CONNECTING состоянии, когда есть много бэкендов,
      // но все они находятся в нерабочем состоянии
      if s == connectivity.Idle {
         sc.Connect()
      }
      return
   }
   b.scStates[sc] = s
   switch s {
   case connectivity.Idle:
      sc.Connect()
   case connectivity.Shutdown:
      // В этом месте удаляем сабконнект, который ранее мы попросили закрыть у ClientConn через RemoveSubConn
      delete(b.scStates, sc)
   case connectivity.TransientFailure:
      // Если сабконнект попал в TransientFailure => сохраняем ошибку, по которой он находится в этом состоянии
      b.connErr = state.ConnectionError
   }


   // Записываем новое состояние в агрегатор состояний — и получаем новое состояние Balancer
   b.state = b.csEvltr.RecordTransition(oldS, s)


   // Пересоздаём Picker в следующих случаях:
   //  - сабконнект стал Ready или наоборот
   //  - суммарное состояние Balancer стало TransientFailure (возможно, нужно обновить ошибку Balancer)
   if (s == connectivity.Ready) != (oldS == connectivity.Ready) ||
      b.state == connectivity.TransientFailure {
      b.regeneratePicker()
   }
   // Обновляем стейт
   b.cc.UpdateState(balancer.State{ConnectivityState: b.state, Picker: b.picker})
}

Следующая функция интерфейса Balancer — ResolverError, которой мы уже воспользовались в UpdateClientConnState. Эта функция предназначена для обработки ошибок Resolver. Помимо того что она вызывается в UpdateClientConnState, она может быть вызвана вышестоящими балансировщиками (да, их иногда оборачивают и реализуют вложенную логику, если это необходимо).

func (b *Balancer) ResolverError(err error) {
   // Сохраняем ошибку в стейт
   b.resolverErr = err


   // Если у Balancer нет сабконнектов, переводим его в состояние TransientFailure
   if b.subConns.Len() == 0 {
      b.state = connectivity.TransientFailure
   }


   // Если состояние не TransientFailure, Picker должен дальше работать, пока Balancer не будет отдавать корректную ошибку
   if b.state != connectivity.TransientFailure {
      return
   }


   // Пересоздаём Picker и обновляем состояние ClientConn
   b.regeneratePicker()
   b.cc.UpdateState(balancer.State{
      ConnectivityState: b.state,
      Picker:            b.picker,
   })
}

Выделяем готовые к работе Subconnects

Функция regeneratePicker создаёт Picker, который уже является последним ключевым объектом во всей цепочке, так как именно он реализует алгоритмы балансировки.

Если всё хорошо (Balancer не находится в состоянии TransientFailure), то соберём слайс сабконнектов, находящихся в состоянии Ready, и вызовем функцию создания Picker, передав туда полученный слайс. Если же наш Balancer находится в состоянии TransientFailure, создадим no operate (noop) Picker, который на все вызовы функции Pick будет возвращать ошибку. Данную ошибку будет видеть наш клиент при попытке вызова RPC.

func (b *Balancer) regeneratePicker() {
	// Если Balancer находится в состоянии TransientFailure, Picker’у на каждый пик следует отдавать ошибку (объединение двух ошибок: Resolver’а и сабконнектов)
	if b.state == connectivity.TransientFailure {
		b.picker = NewErrPicker(b.mergeErrors())
		return
	}
	// Создаём мапу сабконнектов в состоянии Ready
	readySCs := make(map[balancer.SubConn]base.SubConnInfo)
	// Фильтруем все сабконнекты в состоянии Ready из мапы сабконнектов
	for _, addr := range b.subConns.Keys() {
		sci, _ := b.subConns.Get(addr)
		sc := sci.(balancer.SubConn)
		if st, ok := b.scStates[sc]; ok && st == connectivity.Ready {
			readySCs[sc] = base.SubConnInfo{Address: addr}
		}
	}
	// Создаём дефолтный RoundRobin Picker, который в функциях выше будет отправлен в состояние ClientConn’а
	b.picker = BuildRRPicker(base.PickerBuildInfo{ReadySCs: readySCs})
}

	Вспомогательная функция mergeErrors позволяет сформировать ошибку из двух ранее сохранённых: resolverErr и connErr. Объединенная ошибка далее передаётся в ErrPicker и прокидывается до самого клиента при вызове методов RPC.

func (b *Balancer) mergeErrors() error {
	// Либо у нас проблема с подключениями, если есть какие-то сабконнекты,
	// либо у нас проблема с Resolver, если сабконнектов нет
	if b.connErr == nil {
		return fmt.Errorf("last resolver error: %v", b.resolverErr)
	}
	if b.resolverErr == nil {
		return fmt.Errorf("last connection error: %v", b.connErr)
	}
	// Потенциально могут быть обе ошибки, если в логике Resolver есть более сложная логика, которая упала, помимо
	// того что он не смог создать сабконнекты (например, был совершён фолбэк на дополнительный механизм резолвинга)
	return fmt.Errorf("last connection error: %v; last resolver error: %v", b.connErr, b.resolverErr)
}

Дополнительные функции

Помимо этого, в дополнительном интерфейсе, который должен реализовать Balancer, есть функция ExitIdle, которая используется в том случае, если ClientConn необходимо форсировать переход всех сабконнектов в состояние подключения. В нашем Balancer мы не реализуем логику с каким-либо ожиданием перед тем, как подключиться к сабконнектам из списка адресов, полученного от Resolver:

func (b *Balancer) ExitIdle() {}

Функция Close нужна для очистки внутреннего состояния Balancer, если оно имеется. Сабконнекты в Close закрывать не нужно.

func (b *Balancer) Close() {}

Мы проделали достаточно большой путь. У нас уже есть Resolver, который запрашивает список адресов по названию бэкенда и обновляет их в фоне. Balancer, получая эти адреса, принимает решение о создании подключений к ним, а также отслеживает состояние этих подключений. Мы уже готовы отправить запрос в бэкенд, но…

Picker

Round-robin Picker

Остался последний элемент — Picker. Имея массив готовых к обработке сабконнектов, нужно решить, в какой же из них отправить конкретный запрос. И как раз эту задачу решает Picker, который на вход получает список сабконнектов, готовых к обработке RPC, и по нему реализует алгоритм балансировки. Его функция Pick может вызываться параллельно несколько раз. Рассмотрим базовый пример RoundRobin Picker. 

func BuildRRPicker(info base.PickerBuildInfo) balancer.Picker {
	if len(info.ReadySCs) == 0 {
		return NewErrPicker(balancer.ErrNoSubConnAvailable)
	}
	scs := make([]balancer.SubConn, 0, len(info.ReadySCs))
	// Проходимся по Ready subconnects 
	for sc := range info.ReadySCs {
		scs = append(scs, sc)
	}
	return &rrPicker{
		subConns: scs,
		// Первый запрос отправим на рандомный индекс, поскольку
		// параллельно может быть создан такой же RR Picker в другом клиенте
		// и не хотелось бы отправлять нагрузку со всех клиентов на первый
// SubConn
		next: uint32(rand.Intn(len(scs))),
	}
}

Структура у RR Picker крайне простая: массив сабконнектов и индекс этого массива, на который отправится следующий запрос. 

type rrPicker struct {
	subConns []balancer.SubConn
	next     uint32
}

Атомарно инкрементим счётчик запросов и берём остаток от деления его значения на длину массива, чтобы не вывалиться за границы. Ну и возвращаем результат нашего выбора :)

func (p *rrPicker) Pick(balancer.PickInfo) (balancer.PickResult, error) {
	subConnsLen := uint32(len(p.subConns))
	nextIndex := atomic.AddUint32(&p.next, 1)
	sc := p.subConns[nextIndex%subConnsLen]
	return balancer.PickResult{SubConn: sc}, nil
}

Error Picker

Также ранее мы встречали ErrorPicker. Это, по сути, no operate Picker, который при старте принимает на вход ошибку и на все Pick запросы возвращает её. Таким образом, эта ошибка прокидывается конечному клиенту фреймворка.

func NewErrPicker(err error) balancer.Picker {
	return &errPicker{err: err}
}
type errPicker struct {
	err error
}
func (p *errPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
	return balancer.PickResult{}, p.err
}

Но помимо того что Picker может реализовывать статические алгоритмы балансировки (как описанный RR, или возможные chash и WRR), его интерфейс позволяет создавать динамические алгоритмы балансировки. В ответе функции Pick в balancer.PickResult есть два интересных поля:

// PickResult contains information related to a connection chosen for an RPC.
type PickResult struct {
   // SubConn is the connection to use for this pick, if its state is Ready.
   // If the state is not Ready, gRPC will block the RPC until a new Picker is
   // provided by the balancer (using ClientConn.UpdateState).  The SubConn
   // must be one returned by ClientConn.NewSubConn.
   SubConn SubConn


   // Done is called when the RPC is completed.  If the SubConn is not ready,
   // this will be called with a nil parameter.  If the SubConn is not a valid
   // type, Done may not be called.  May be nil if the balancer does not wish
   // to be notified when the RPC completes.
   Done func(DoneInfo)


   // Metadata provides a way for LB policies to inject arbitrary per-call
   // metadata. Any metadata returned here will be merged with existing
   // metadata added by the client application.
   //
   // LB policies with child policies are responsible for propagating metadata
   // injected by their children to the ClientConn, as part of Pick().
   Metadata metadata.MD
}

Поле Done, которое представляет собой функцию, будет вызвано по окончании запроса. Таким образом, мы можем отслеживать, например, количество in-flight запросов на каждый из сабконнектов или рассчитывать среднее время ответа и направлять трафик с приоритетом на самый быстрый сабконнект. 

Поле Metadata позволяет алгоритмам балансировки помещать в запрос дополнительную информацию, если это необходимо.

И только после того как будут созданы и обработаны все вышеописанные объекты, наш ClientConn будет готов к тому, чтобы совершить свой первый RPC.

Самое интересное

Поскольку теперь мы разобрались с тем, как работают все упомянутые в этой статье объекты, можно приступить к реализации кастомных механизмов, о которых я писал в начале статьи.

На первый взгляд, самыми интересными и кастомизируемыми объектами являются Picker и Resolver. Можно воспользоваться дефолтной реализацией base.Balancer от gRPC, которому передать PickerBuilder и назначить уникальное имя. 

base.NewBalancerBuilder("habr", pb, base.Config{})

PickerBuilder будет вызван в момент regeneratePicker:

type PickerBuilder interface {
   Build(info PickerBuildInfo) balancer.Picker
}

Канареечные деплои

Реализуем же механизмы, упомянутые в заголовке статьи. Начнём с простого — с канареечного деплоя. Для этого необходимо добавить к адресам в ответе нашего мока DNS-сервера сведения о версии приложения, а также веса этих версий. После этого у нас получится не просто мок DNS-сервера, а нечто похожее на полноценный control plane для Service Mesh.

type (
   Response struct {
      Endpoints      []Endpoint    `json:"endpoints"`
      VersionWeights map[string]int `json:"version_weights"`
      ServiceConfig  string         `json:"service_config"`
   }
   Endpoint struct {
      Address string `json:"address"`
      Version string `json:"version"`
   }
)


var HelloWorldServerResponse = Response{
   Endpoints: []Endpoint{
      {
         Address: "127.0.0.1:8080",
         Version: "v1",
      },
      {
         Address: "127.0.0.1:8090",
         Version: "v2",
      },
   },
   VersionWeights: map[string]int{
      "v1": 10,
      "v2": 90,
   },
   ServiceConfig: "{\"loadBalancingPolicy\": \"habr_balancer\"}",
}

Ну и, конечно же, нужно доработать наш Resolver, чтобы он смог парсить дополнительные поля. Передавать сведения будем через атрибуты, которые имеются в структуре ответа Resolver, передаваемой в ClientConn:

func (r *Resolver) lookup(target string) {
   resp, err := http.Get(fmt.Sprintf("http://%s/endpoints?target=%s", lookupServerHostPort, target))
   if err != nil {
      log.Println(err)
      return
   }
   data, err := io.ReadAll(resp.Body)
   if err != nil {
      log.Println(err)
      return
   }
   var epsResp Response
   err = json.Unmarshal(data, &epsResp)
   if err != nil {
      return
   }


   log.Println("resolver made request:", epsResp)
   newAddrs := make([]resolver.Address, 0, len(epsResp.Endpoints))
   for _, ep := range epsResp.Endpoints {
      newAddrs = append(newAddrs, resolver.Address{
         Addr:               ep.Address,
         Attributes:         attributes.New(versionAttrKey, ep.Version),
         BalancerAttributes: attributes.New(versionAttrKey, ep.Version),
      })
   }
   err = r.cc.UpdateState(resolver.State{
      Addresses:     newAddrs,
      ServiceConfig: r.cc.ParseServiceConfig(epsResp.ServiceConfig),
      Attributes:    attributes.New(weightsAttrsKey, epsResp.VersionWeights),
   })
   if err != nil {
      return
   }
}


func (b *Balancer) regeneratePicker() {
   // Если мы в состоянии TransientFailure => пикеру на каждый пик следует отдавать ошибку (мерж двух ошибок - резолвера и сабконнектов)
   if b.state == connectivity.TransientFailure {
      b.picker = NewErrPicker(b.mergeErrors())
      return
   }


   // Создаем мапу сабконнектов в Ready состоянии
   readySCs := make(map[balancer.SubConn]base.SubConnInfo)


   // Фильтруем все Ready сабконнекты из мапы всех сабконнектов
   for _, addr := range b.subConns.Keys() {
      sci, _ := b.subConns.Get(addr)
      sc := sci.(balancer.SubConn)
      if st, ok := b.scStates[sc]; ok && st == connectivity.Ready {
         readySCs[sc] = base.SubConnInfo{Address: addr}
      }
   }


   // Если в атрибутах имеются сведения о весах по версиям - создадим канареечный пикер
   if weightByVersion, ok := b.lastResolverState.Attributes.Value(weightsAttrsKey).(map[string]int); ok {
      b.picker = NewCanaryPicker(base.PickerBuildInfo{ReadySCs: readySCs}, weightByVersion)
   } else {
      // Создаем дефолтный RoundRobin пикер, который в функциях выше будет отправлен в состояние clientConn
      b.picker = BuildRRPicker(base.PickerBuildInfo{ReadySCs: readySCs})
   }
}

Эти данные надо прокинуть в PickerBuilder, чтобы наполнить создаваемый Picker нужной нам информацией про веса релизов.

На самом же Picker в соответствии с полученной информацией про версии разделим сабконнекты на группы, между которыми будем делать выбор. Для упрощения реализаций Picker’ов, соберём их в цепочку. Каждый вышестоящий Picker будет реализовывать какую-то дополнительную логику, а самый последний — конечный алгоритм балансировки. Пока что воспользуемся нашим любимым RoundRobin Picker’ом. 

func NewCanaryPicker(info base.PickerBuildInfo, weightsByVersion map[string]int) balancer.Picker {


   byVersion := make(map[string]map[balancer.SubConn]base.SubConnInfo)


   for subConn, connInfo := range info.ReadySCs {
      v, ok := connInfo.Address.Attributes.Value(versionAttrKey).(string)
      if !ok {
         continue
      }


      m := byVersion[v]
      if m == nil {
         m = make(map[balancer.SubConn]base.SubConnInfo)
      }
      m[subConn] = connInfo
      byVersion[v] = m
   }


   switch len(byVersion) {
   case 0:
      return NewErrPicker(balancer.ErrNoSubConnAvailable)
   case 1:
      // не канарейка :)
      for version, _ := range weightsByVersion {
         return BuildRRPicker(base.PickerBuildInfo{ReadySCs: byVersion[version]})
      }
   }


   byVersionPickers := make([]wrappedPicker, 0, len(byVersion))
   var totalWeight int
   for version, w := range weightsByVersion {
      totalWeight += w
      byVersionPickers = append(byVersionPickers, wrappedPicker{
         weight: w,
         picker: BuildRRPicker(base.PickerBuildInfo{ReadySCs: byVersion[version]}),
      })
   }


   return &CanaryPicker{
      totalWeight:      totalWeight,
      byVersionPickers: byVersionPickers,
   }
}


func (p *CanaryPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
   weight := fastrand.Intn(p.totalWeight)


   for _, version := range p.byVersionPickers {
      weight -= version.weight
      if weight < 0 {
         return version.picker.Pick(info)
      }
   }


   if len(p.byVersionPickers) == 0 {
      return balancer.PickResult{}, errors.New("picker has no versions available")
   }


   return p.byVersionPickers[0].picker.Pick(info)
}

Давайте посмотрим, как всё работает:

Логи мока DNS:

2023/12/06 22:05:37 new request with query: target=helloworld_server

2023/12/06 22:05:42 new request with query: target=helloworld_server

2023/12/06 22:05:47 new request with query: target=helloworld_server

2023/12/06 22:05:52 new request with query: target=helloworld_server

2023/12/06 22:05:57 new request with query: target=helloworld_server

Логи клиента:


2023/12/06 22:05:37 resolver made request: {[{127.0.0.1:8080 v1} {127.0.0.1:8090 v2}] map[v1:10 v2:90] {"loadBalancingPolicy": "habr_balancer"}}

2023/12/06 22:05:37 Greeting: Hello OzonTech

2023/12/06 22:05:39 Greeting: Hello OzonTech

2023/12/06 22:05:41 Greeting: Hello OzonTech

2023/12/06 22:05:42 resolver made request: {[{127.0.0.1:8080 v1} {127.0.0.1:8090 v2}] map[v1:10 v2:90] {"loadBalancingPolicy": "habr_balancer"}}

2023/12/06 22:05:43 Greeting: Hello OzonTech

2023/12/06 22:05:45 Greeting: Hello OzonTech

2023/12/06 22:05:47 resolver made request: {[{127.0.0.1:8080 v1} {127.0.0.1:8090 v2}] map[v1:10 v2:90] {"loadBalancingPolicy": "habr_balancer"}}

2023/12/06 22:05:47 Greeting: Hello OzonTech

2023/12/06 22:05:49 Greeting: Hello OzonTech

2023/12/06 22:05:51 Greeting: Hello OzonTech

2023/12/06 22:05:52 resolver made request: {[{127.0.0.1:8080 v1} {127.0.0.1:8090 v2}] map[v1:10 v2:90] {"loadBalancingPolicy": "habr_balancer"}}

2023/12/06 22:05:53 Greeting: Hello OzonTech

2023/12/06 22:05:55 Greeting: Hello OzonTech

2023/12/06 22:05:57 resolver made request: {[{127.0.0.1:8080 v1} {127.0.0.1:8090 v2}] map[v1:10 v2:90] {"loadBalancingPolicy": "habr_balancer"}}

Exiting.

Логи бэкенда на порту 8080:

2023/12/06 22:03:45 server listening at [::]:8080

2023/12/06 22:05:37 Received: OzonTech

Логи бэкенда на порту 8090:

2023/12/06 22:04:13 server listening at [::]:8090

2023/12/06 22:05:39 Received: OzonTech

2023/12/06 22:05:41 Received: OzonTech

2023/12/06 22:05:43 Received: OzonTech

2023/12/06 22:05:45 Received: OzonTech

2023/12/06 22:05:47 Received: OzonTech

2023/12/06 22:05:49 Received: OzonTech

2023/12/06 22:05:51 Received: OzonTech

2023/12/06 22:05:53 Received: OzonTech

2023/12/06 22:05:55 Received: OzonTech

По логам можно увидеть, что инстанс бэкенда на порту 8090, который имел версию v2 с весом 90 - получил 9 запросов из 10, а бэкенда на порту 8080, который имел версию v1 с весом 10 - получил 1 запрос из 10, что мы и ожидали).

Современные алгоритмы балансировки

И куда же без алгоритмов балансировки? :) Начнём с простого — с WRR (weighted round robin).

Дополним ответ от мока DNS весами по каждому поду в релизе v2:

var HelloWorldServerResponse = Response{
   Endpoints: []Endpoint{
      {
         Address: "127.0.0.1:8080",
         Version: "v1",
      },
      {
         Address: "127.0.0.1:8090",
         Version: "v2",
         Weight:  33,
      },
      {
         Address: "127.0.0.1:8095",
         Version: "v2",
         Weight:  67,
      },
   },
   VersionWeights: map[string]int{
      "v1": 10,
      "v2": 90,
   },
   ServiceConfig: "{\"loadBalancingPolicy\": \"habr_balancer\"}",
}

Совместим его с уже реализованным Canary Picker:

for version, w := range weightsByVersion {
   totalWeight += w
   byVersionPickers = append(byVersionPickers, wrappedPicker{
      weight: w,
      picker: NewWRRPicker(base.PickerBuildInfo{ReadySCs: byVersion[version]}),
   })
}

Реализацию алгоритма WRR на основе Random возьмем готовую из gRPC и обернем её структурой Picker’а:

type (
   WRRPicker struct {
      wrr *RandomWRR
   }
)


func NewWRRPicker(info base.PickerBuildInfo) balancer.Picker {
   p := &WRRPicker{
      wrr: NewRandom(),
   }


   for subConn, connInfo := range info.ReadySCs {
      w, ok := connInfo.Address.Attributes.Value(weightAttrKey).(int)
      if !ok {
         // Если атрибута нет, то выставляем 0 вес
         w = 0
      }
      // В случае, если у всех subConn вес нулевой - WRR выродится в Random
      p.wrr.Add(subConn, w)
   }
   return p
}


func (p *WRRPicker) Pick(_ balancer.PickInfo) (balancer.PickResult, error) {
   subConn, ok := p.wrr.Next().(balancer.SubConn)
   if !ok {
      return balancer.PickResult{}, errors.New("WRRPicker internal state error. Invalid item")
   }
   return balancer.PickResult{SubConn: subConn}, nil
}


// weightedItem is a wrapped weighted item that is used to implement weighted random algorithm.
type weightedItem struct {
   item              any
   weight            int
   accumulatedWeight int
}


// RandomWRR is a struct that contains weighted items implement weighted random algorithm.
type RandomWRR struct {
   items []*weightedItem
   // Are all item's weights equal
   equalWeights bool
}


// NewRandom creates a new WRR with random.
func NewRandom() *RandomWRR {
   return &RandomWRR{}
}


func (rw *RandomWRR) Next() (item any) {
   if len(rw.items) == 0 {
      return nil
   }
   if rw.equalWeights {
      return rw.items[fastrand.Intn(len(rw.items))].item
   }


   sumOfWeights := rw.items[len(rw.items)-1].accumulatedWeight
   // Random number in [0, sumOfWeights).
   randomWeight := fastrand.Intn(sumOfWeights)
   // Item's accumulated weights are in ascending order, because item's weight >= 0.
   // Binary search rw.items to find first item whose accumulatedWeight > randomWeight
   // The return i is guaranteed to be in range [0, len(rw.items)) because randomWeight < last item's accumulatedWeight
   i := sort.Search(len(rw.items), func(i int) bool { return rw.items[i].accumulatedWeight > randomWeight })
   return rw.items[i].item
}


func (rw *RandomWRR) Add(item any, weight int) {
   accumulatedWeight := weight
   equalWeights := true
   if len(rw.items) > 0 {
      lastItem := rw.items[len(rw.items)-1]
      accumulatedWeight = lastItem.accumulatedWeight + weight
      equalWeights = rw.equalWeights && weight == lastItem.weight
   }
   rw.equalWeights = equalWeights
   rItem := &weightedItem{item: item, weight: weight, accumulatedWeight: accumulatedWeight}
   rw.items = append(rw.items, rItem)
}
Логи клиента:

2023/12/06 23:17:08 resolver made request: {[{127.0.0.1:8080 v1 0} {127.0.0.1:8090 v2 33} {127.0.0.1:8095 v2 67}] map[v1:10 v2:90] {"loadBalancingPolicy": "habr_balancer"}}

2023/12/06 23:17:08 Greeting: Hello OzonTech

2023/12/06 23:17:10 Greeting: Hello OzonTech

2023/12/06 23:17:12 Greeting: Hello OzonTech

2023/12/06 23:17:13 resolver made request: {[{127.0.0.1:8080 v1 0} {127.0.0.1:8090 v2 33} {127.0.0.1:8095 v2 67}] map[v1:10 v2:90] {"loadBalancingPolicy": "habr_balancer"}}

2023/12/06 23:17:14 Greeting: Hello OzonTech

2023/12/06 23:17:16 Greeting: Hello OzonTech

2023/12/06 23:17:18 resolver made request: {[{127.0.0.1:8080 v1 0} {127.0.0.1:8090 v2 33} {127.0.0.1:8095 v2 67}] map[v1:10 v2:90] {"loadBalancingPolicy": "habr_balancer"}}

2023/12/06 23:17:18 Greeting: Hello OzonTech

2023/12/06 23:17:20 Greeting: Hello OzonTech

2023/12/06 23:17:22 Greeting: Hello OzonTech

2023/12/06 23:17:23 resolver made request: {[{127.0.0.1:8080 v1 0} {127.0.0.1:8090 v2 33} {127.0.0.1:8095 v2 67}] map[v1:10 v2:90] {"loadBalancingPolicy": "habr_balancer"}}

2023/12/06 23:17:24 Greeting: Hello OzonTech

2023/12/06 23:17:26 Greeting: Hello OzonTech

2023/12/06 23:17:28 resolver made request: {[{127.0.0.1:8080 v1 0} {127.0.0.1:8090 v2 33} {127.0.0.1:8095 v2 67}] map[v1:10 v2:90] {"loadBalancingPolicy": "habr_balancer"}}

Логи бэкенда на порту 8080:

2023/12/06 23:17:04 server listening at [::]:8080

2023/12/06 23:17:20 Received: OzonTech

Логи бэкенда на порту 8090:

2023/12/06 23:17:01 server listening at [::]:8090

2023/12/06 23:17:08 Received: OzonTech

2023/12/06 23:17:10 Received: OzonTech

Логи бэкенда на порту 8095:

2023/12/06 23:17:06 server listening at [::]:8095

2023/12/06 23:17:12 Received: OzonTech

2023/12/06 23:17:14 Received: OzonTech

2023/12/06 23:17:16 Received: OzonTech

2023/12/06 23:17:18 Received: OzonTech

2023/12/06 23:17:22 Received: OzonTech

2023/12/06 23:17:24 Received: OzonTech

2023/12/06 23:17:26 Received: OzonTech

Итого на инстанс бэкенда на порту 8090, который имел версию v2 с весом 90 и собственный вес 67 - получил 7 запросов из 10, инстанс бэкенда на порту 8095, который имел версию v2 с весом 90 и собственный вес 33 - получил 2 запроса из 10, а бэкенда на порту 8080, который имел версию v1 с весом 10 - все так же получил 1 запрос из 10. С инстансами бэкенда версии v2 получилось не прям идеально (6 запросов в один и 3 запроса в другой), поскольку это все же великий рандом :)

Ну и что-то интересное — leastConn.

// leastConnItem is a wrapped weighted item that is used to implement least conn algorithm.
type leastConnItem struct {
   item        balancer.SubConn
   inFlightReq atomic.Int64
}


type LeastConnPicker struct {
   lc *leastConnections
}


func NewLeastConnPicker(info base.PickerBuildInfo) balancer.Picker {
   if len(info.ReadySCs) == 0 {
      return NewErrPicker(errors.New("no subConn available"))
   }
   var items []balancer.SubConn
   for subConn := range info.ReadySCs {
      items = append(items, subConn)
   }


   p := &LeastConnPicker{
      lc: New(items),
   }


   return p
}


func (p *LeastConnPicker) Pick(_ balancer.PickInfo) (balancer.PickResult, error) {
   subConn, doneFn := p.lc.Next()
   if subConn == nil {
      return balancer.PickResult{}, errors.New("LeastConnPicker internal state error. No subConns in leastConn state")
   }
   return balancer.PickResult{SubConn: subConn, Done: doneFn}, nil
}


type leastConnections struct {
   conns []*leastConnItem
}


func New(items []balancer.SubConn) *leastConnections {
   conns := make([]*leastConnItem, 0, len(items))
   for _, i := range items {
      conns = append(conns, &leastConnItem{
         item:        i,
         inFlightReq: atomic.Int64{},
      })
   }
   return &leastConnections{
      conns: conns,
   }
}


func (lc *leastConnections) Next() (balancer.SubConn, func(info balancer.DoneInfo)) {
   if len(lc.conns) == 0 {
      return nil, nil
   }


   var (
      minInFlightReq = int64(-1)
      idx            int
   )


   for i, conn := range lc.conns {
      inFlightReq := conn.inFlightReq.Load()
      if minInFlightReq == -1 || inFlightReq < minInFlightReq {
         minInFlightReq = inFlightReq
         idx = i
      }
   }


   lc.conns[idx].inFlightReq.Add(1)


   return lc.conns[idx].item, func(balancer.DoneInfo) {
      lc.conns[idx].inFlightReq.Add(-1)
   }
}

Важно понимать, что обновления от Resolver’а будут вызывать создание нового Picker, что приведет к потере накопленных статистических данные по запросам, поэтому схема с периодическим lookup и постоянным обновлением состояния нам уже не подходит. Для этого следует либо в Resolver кэшировать прошлый ответ, либо переводить его модель на Push, но это уже не в рамках этой статьи. Для демонстрации работы увеличим интервал работы lookup функции до 50 секунд, чтобы она нам не мешала.

В бэкенды добавим задаваемую задержку ответа и лог об отправке ответа. Пусть инстанс с портом 8090 будет иметь задержку на ответ в 10 секунд. 

Логи клиента:

2023/12/07 00:25:34 resolver made request: {[{127.0.0.1:8080 v1 0} {127.0.0.1:8090 v2 33} {127.0.0.1:8095 v2 67}] map[v1:10 v2:90] {"loadBalancingPolicy": "habr_balancer"}}

2023/12/07 00:25:37 Greeting: Hello OzonTech

2023/12/07 00:25:39 Greeting: Hello OzonTech

2023/12/07 00:25:41 Greeting: Hello OzonTech

2023/12/07 00:25:43 Greeting: Hello OzonTech

2023/12/07 00:25:45 Greeting: Hello OzonTech

2023/12/07 00:25:45 Greeting: Hello OzonTech

2023/12/07 00:25:49 Greeting: Hello OzonTech

2023/12/07 00:25:51 Greeting: Hello OzonTech

2023/12/07 00:25:53 Greeting: Hello OzonTech

Логи бэкенда на порту 8080:

2023/12/07 00:25:15 server listening at [::]:8080

2023/12/07 00:25:53 Received: OzonTech

2023/12/07 00:25:53 Returned: OzonTech

Логи бэкенда на порту 8090:

2023/12/07 00:25:13 server listening at [::]:8090

2023/12/07 00:25:35 Received: OzonTech

2023/12/07 00:25:45 Returned: OzonTech

2023/12/07 00:25:47 Received: OzonTech

2023/12/07 00:25:57 Returned: OzonTech

Логи бэкенда на порту 8095:

2023/12/07 00:25:31 server listening at [::]:8095

2023/12/07 00:25:37 Received: OzonTech

2023/12/07 00:25:37 Returned: OzonTech

2023/12/07 00:25:39 Received: OzonTech

2023/12/07 00:25:39 Returned: OzonTech

2023/12/07 00:25:41 Received: OzonTech

2023/12/07 00:25:41 Returned: OzonTech

2023/12/07 00:25:43 Received: OzonTech

2023/12/07 00:25:43 Returned: OzonTech

2023/12/07 00:25:45 Received: OzonTech

2023/12/07 00:25:45 Returned: OzonTech

2023/12/07 00:25:49 Received: OzonTech

2023/12/07 00:25:49 Returned: OzonTech

2023/12/07 00:25:51 Received: OzonTech

2023/12/07 00:25:51 Returned: OzonTech

2023/12/07 00:25:53 Received: OzonTech

2023/12/07 00:25:53 Returned: OzonTech

Видим, что на инстанс с портом 8090 пока не завершился первый запрос, новые запросы не летели, когда инстанс без задержки с портом 8095 получал постоянные запросы. Ну а инстанс с портом 8080 до сих пор работает с 10% трафика при помощи канареечного Picker :)

Готовые механизмы


В gRPC уже есть ряд реализованных Resolver’ов и Balancer’ов. Некоторое время назад добавили поддержку протокола xDS, который позволяет получать информацию об адресах бэкенда от Istio и других xDS-провайдеров. Также есть дефолтный DNS Resolver, который ходит в DNS и по RoundRobin распределяет запросы по полученным адресам (в отличие от DNS Resolver на уровне net.Transport, который делает Dial на рандомный адрес и отправляет все запросы туда). Называются эти Resolver’ы соответственно xds и dns:

grpc.DialContext(context.Background(), "xds:///ozontech:grpc")
grpc.DialContext(context.Background(), "dns:///ozontech:grpc")

Заключение

В статье мы рассмотрели возможности современной реализации gRPC-библиотек, а также реализовали свои механизмы балансировок и Service Discovery. Если в вашем проекте есть необходимость кастомизировать взаимодействие сервисов, наполнив его какой-то особенной функциональностью, которой ещё нет в «коробочных» решениях, то реализация собственных структур объектов gRPC может вам помочь. Как, например, помогла нам в Ozon Tech :) 

Около 4 лет назад на базе описанных механизмов в Ozon Tech был построен собственный Service Mesh под названием Warden, который позволяет нам кастомизировать все механизмы межсервисного взаимодействия. При помощи Resolver у нас реализован собственный контракт для Service Discovery, в котором мы передаём различную дополнительную информацию про наши сервисы. На базе Balancer у нас реализован механизм определения падения дата-центров на стороне клиента. Ну а при помощи Picker’ов мы реализовали различные алгоритмы балансировки, которые показались нам интересными и перспективными (с возможностью конфигурирования их ключевых параметров по команде управляющего сервера — серверной части Warden). 

Надеюсь, вам была полезна эта статья и при помощи описанных алгоритмов вы сможете построить свои механизмы, которые усовершенствуют ваши проекты :)

Если статья понравится многим (например, наберёт больше 101 плюса :D), то я напишу вторую часть, где рассмотрю принципы работы сложных балансировщиков, которые могут реализовывать логику переключения трафика между разными группами адресов, а также отслеживать количество ошибок, которые возвращают отдельные сабконнекты, и исключать их из балансировки.

Все актуальные вакансии нашей компании вы можете найти на job.ozon.ru. Сейчас в отделе Service Mesh, который ковыряется под капотом gRPC и разрабатывает Warden, есть открытая вакансия :) Больше про Warden можно узнать из доклада. 

P.S. исходный код примеров доступен по ссылке. Он является демонстрационным. Не используйте его в проде :)