Создание кастомного балансировщика нагрузки на Go для gRPC с приоритизацией адресов
- суббота, 16 ноября 2024 г. в 00:00:08
В процессе разработки микросервисных приложений часто необходимо наладить эффективную и быструю коммуникацию между сервисами. Разработанный Google gRPC предоставляет высокопроизводительный фреймворк для организации такого взаимодействия. Однако стандартные балансировщики нагрузки в gRPC не всегда удовлетворяют специфическим требованиям, особенно когда требуется приоритизация адресов для минимизации сетевых задержек и обеспечения отказоустойчивости.
В этой статье я поделюсь опытом создания кастомного балансировщика нагрузки на Go для gRPC, который использует приоритеты адресов для выбора наилучшего соединения. Это решение позволяет гибко управлять распределением клиентских запросов между серверами с разными уровнями доступности и обеспечивает подключение к оптимальному ЦОД с минимальными задержками.
При разработке одного из проектов VK Tech мне потребовалось реализовать балансировщик, который выбирает первый доступный адрес из приоритетного списка. Приоритеты адресов определяются порядком в конфигурационном файле: чем выше адрес в списке, тем выше его приоритет. В случае недоступности адреса с наивысшим приоритетом балансировщик должен автоматически переключаться на следующий доступный адрес по приоритету.
Требования к балансировщику:
Приоритизация адресов: выбор адреса с наивысшим приоритетом из списка.
Отказоустойчивость: автоматическое переключение на следующий адрес при недоступности текущего.
Минимизация задержек: подключение к ближайшему или наиболее оптимальному ЦОД.
Стандартные балансировщики в gRPC, такие как round-robin (циклический) и pick-first («первый доступный»), не учитывают приоритизацию адресов в списке.
Round-robin равномерно распределяет запросы между всеми доступными серверами, что может привести к увеличению сетевых задержек, если некоторые серверы географически удалены или менее производительны.
Pick-first всегда выбирает первый доступный адрес, но не переключается на адреса с более высоким приоритетом, если они становятся доступными после первоначального подключения.
Таким образом, для решения задачи минимизации задержек и обеспечения гибкости подключения к различным ЦОДам стандартные балансировщики не подходят.
Наш кастомный балансировщик использует приоритизацию адресов, заданную в конфигурационном файле, для выбора наилучшего соединения.
Порядок адресов: адреса упорядочены по приоритету; индекс 0 — наивысший приоритет.
Выбор соединения: всегда выбирается первое доступное соединение с наивысшим приоритетом.
Автоматическое переключение: при недоступности текущего соединения балансировщик переключается на следующий по приоритету.
Преимущества такого подхода:
Минимизация сетевых задержек.
Повышенная отказоустойчивость.
Гибкость настройки.
Перед тем как перейти к реализации, рассмотрим основные компоненты нашего балансировщика и их взаимодействие.
Балансировщик в gRPC создаётся с помощью билдера. Наш BalancerBuilder регистрирует балансировщик с определённым именем и схемой, чтобы gRPC-клиент мог его использовать.
type BalancerBuilder struct{}
func (b BalancerBuilder) Build(cc balancer.ClientConn, _ balancer.BuildOptions) balancer.Balancer {
return &Balancer{
cc: cc,
subConns: resolver.NewAddressMap(),
scStates: make(map[balancer.SubConn]connectivity.State),
csEvltr: &balancer.ConnectivityStateEvaluator{},
state: connectivity.Connecting,
}
}
func (b BalancerBuilder) Name() string { return balancerName }
func init() {
balancer.Register(&BalancerBuilder{})
}
Основные задачи билдера:
Создание и инициализация балансировщика.
Настройка взаимодействия с ClientConn.
Регистрация балансировщика для использования клиентом.
Резолвер предоставляет балансировщику список адресов с их приоритетами. Он преобразует адреса из конфигурационного файла в resolver.Address, присваивая каждому адресу атрибут index, соответствующий его приоритету.
type resolverBuilder struct {
addresses []resolver.Address
}
func (b *resolverBuilder) Build(
target resolver.Target,
clientConn resolver.ClientConn,
_ resolver.BuildOptions,
) (resolver.Resolver, error) {
ctx, cancel := context.WithCancel(context.Background())
res := &fiResolver{
ctx: ctx,
cancel: cancel,
target: target,
cc: clientConn,
addressesStore: b.addresses,
}
if len(b.addresses) > 1 {
res.serviceConfig = clientConn.ParseServiceConfig(defaultConfig)
}
go res.start()
return res, nil
}
func (*resolverBuilder) Scheme() string {
return scheme
}
func initResolver(addresses []string) {
addressesStore := make([]resolver.Address, len(addresses))
for i, addr := range addresses {
addressesStore[i] = resolver.Address{
Addr: addr,
Attributes: attributes.New("index", i),
}
}
resolver.Register(&resolverBuilder{addresses: addressesStore})
}
Функции резолвера:
Динамическое обновление адресов.
Предоставление адресов с приоритетами балансировщику.
Сообщение об ошибках в случае недоступности адресов.
Picker выбирает соединение с наименьшим индексом (наивысшим приоритетом) из доступных. Если соединение с более высоким приоритетом становится доступным, балансировщик автоматически переключается на него.
type firstIdxPicker struct {
result balancer.PickResult
err error
}
func (p *firstIdxPicker) Pick(_ balancer.PickInfo) (balancer.PickResult, error) {
return p.result, p.err
}
func NewFIPicker(info base.PickerBuildInfo) balancer.Picker {
if len(info.ReadySCs) == 0 {
return &firstIdxPicker{err: balancer.ErrNoSubConnAvailable}
}
minIdx := math.MaxInt
var selectedConn balancer.SubConn
for sc, scInfo := range info.ReadySCs {
idx, ok := scInfo.Address.Attributes.Value("index").(int) // <- наш простенький алгоритм определения оптимального соединения
if ok && idx < minIdx {
minIdx = idx
selectedConn = sc
}
}
if selectedConn != nil {
return &firstIdxPicker{result: balancer.PickResult{SubConn: selectedConn}}
}
return &firstIdxPicker{err: balancer.ErrNoSubConnAvailable}
}
Алгоритм выбора:
Проходит по всем готовым соединениям.
Выбирает соединение с наименьшим index.
Возвращает выбранное соединение для обработки запроса.
Балансировщик отслеживает состояния соединений и регенерирует Picker при их изменении.
type Balancer struct {
cc balancer.ClientConn
csEvltr *balancer.ConnectivityStateEvaluator
state connectivity.State
subConns *resolver.AddressMap
scStates map[balancer.SubConn]connectivity.State
picker balancer.Picker
resolverErr error
connErr error
}
func (b *Balancer) UpdateClientConnState(ccs balancer.ClientConnState) error {
b.resolverErr = nil
addressMap := b.createNewSubConnections(ccs)
for _, addr := range b.subConns.Keys() {
if _, ok := addressMap.Get(addr); !ok {
sci, _ := b.subConns.Get(addr)
sc := sci.(balancer.SubConn)
sc.Shutdown()
b.subConns.Delete(addr)
}
}
if len(ccs.ResolverState.Addresses) == 0 {
b.ResolverError(errZeroAddresses)
return balancer.ErrBadResolverState
}
b.regeneratePicker()
b.cc.UpdateState(balancer.State{ConnectivityState: b.state, Picker: b.picker})
return nil
}
func (b *Balancer) UpdateSubConnState(subConn balancer.SubConn, state balancer.SubConnState) {
oldState, ok := b.scStates[subConn]
if !ok {
return
}
b.scStates[subConn] = state.ConnectivityState
switch state.ConnectivityState {
case connectivity.Idle:
subConn.Connect()
case connectivity.Shutdown:
delete(b.scStates, subConn)
case connectivity.TransientFailure:
b.connErr = state.ConnectionError
}
b.state = b.csEvltr.RecordTransition(oldState, state.ConnectivityState)
if (state.ConnectivityState == connectivity.Ready) != (oldState == connectivity.Ready) || b.state == connectivity.TransientFailure {
b.regeneratePicker()
}
b.cc.UpdateState(balancer.State{ConnectivityState: b.state, Picker: b.picker})
}
func (b *Balancer) regeneratePicker() {
if b.state == connectivity.TransientFailure {
b.picker = &firstIdxPicker{err: errors.Join(b.resolverErr, b.connErr)}
return
}
readySCs := make(map[balancer.SubConn]base.SubConnInfo)
for _, addr := range b.subConns.Keys() {
sci, _ := b.subConns.Get(addr)
sc := sci.(balancer.SubConn)
if state, ok := b.scStates[sc]; ok && state == connectivity.Ready {
readySCs[sc] = base.SubConnInfo{Address: addr}
}
}
b.picker = NewFIPicker(base.PickerBuildInfo{ReadySCs: readySCs})
}
Отслеживание состояний соединений:
UpdateClientConnState: создание новых и удаление неактуальных соединений.
UpdateSubConnState: обновление состояний существующих соединений.
regeneratePicker: обновление пикера при изменении состояний для выбора оптимального соединения.
Для использования кастомного балансировщика необходимо определить его имя и схему, а также настроить подключение.
const (
scheme = "scheme-name"
balancerName = "pick_idx_first"
defaultConfig = `{"loadBalancingConfig": [{"pick_idx_first": {}}]}`
retryTimeout = time.Millisecond * 100
maxRetries = 10
)
type ConnOptions struct {
Addrs []string
Opts []grpc.DialOption
}
func NewConn(ctx context.Context, connOptions ConnOptions) (*grpc.ClientConn, error) {
conn, err := dialContext(ctx, connOptions.Addrs, connOptions.Opts...)
if err != nil {
return nil, fmt.Errorf("unable to initialize conn: %w", err)
}
return conn, nil
}
func dialContext(ctx context.Context, addresses []string, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
...
opts = append(opts,
grpc.WithDefaultServiceConfig(defaultConfig),
grpc.WithStreamInterceptor(
retry.StreamClientInterceptor(retry.WithPerRetryTimeout(retryTimeout), retry.WithMax(maxRetries)),
),
grpc.WithUnaryInterceptor(
retry.UnaryClientInterceptor(retry.WithPerRetryTimeout(retryTimeout), retry.WithMax(maxRetries)),
),
)
...
initResolver(addresses)
return grpc.DialContext(ctx, fmt.Sprintf("%s:///", scheme), opts...)
Параметры подключения:
scheme и balancerName: определяют кастомный балансировщик.
defaultConfig: задаёт конфигурацию балансировки.
Интерсепторы: добавлены для повторных подключений при кратковременных сбоях. Я использовал пакет github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/retry.
В процессе тестирования балансировщик показал стабильную работу при переключении между адресами в случае недоступности сервера с более высоким приоритетом. Задержки были минимизированы благодаря приоритетному подключению к ближайшему ЦОДу.
Создание кастомного gRPC-балансировщика с приоритизацией адресов позволяет более точно контролировать распределение клиентских запросов и улучшить производительность приложения. Такое решение обеспечивает гибкость настройки, минимизацию сетевых задержек и повышенную отказоустойчивость, что особенно важно в современных микросервисных архитектурах.
Преимущества кастомного решения:
Гибкость: настройка приоритетов адресов.
Эффективность: минимизация задержек за счёт выбора оптимального соединения.
Отказоустойчивость: автоматическое переключение при недоступности сервера.
Перспективы развития:
Динамическое обновление приоритетов.
Интеграция с сервисами обнаружения.
Расширение логики выбора на основе метрик производительности.
Надеюсь, эта статья поможет вам в создании кастомных решений для ваших gRPC-приложений.