golang

Алгоритмы балансировки нагрузки на Go

  • среда, 10 января 2024 г. в 00:00:13
https://habr.com/ru/companies/otus/articles/782064/

Салют Хабр и с наступающим новым годом!

Каждое приложение стремится быть доступным 24/7, балансировка нагрузки становится настоящим мастхевом.

В самом общем смысле, балансировка, это процесс распределения входящего трафика между несколькими серверами или ресурсами.

Представьте себе мост, по которому одновременно пытаются пройти тысячи людей. Без правильной организации движения это превратится в хаос:

То же самое происходит и в сетевых системах. Балансировка нагрузки - это наш дирижёр, который гармонично распределяет потоки данных, обеспечивая эффективность и стабильность работы системы.

Предполагаю, что читатель уже знаком с базовой терминологической частью балансировки нагрузки, поэтому перехожу сразу к делу.

Реализация балансировки в Go

Round-Robin

Round-Robin – это алгоритм балансировки нагрузки, который распределяет входящие запросы последовательно и равномерно между всеми серверами в пуле. Основная идея заключается в том, чтобы "пройтись по кругу" и назначить каждый новый запрос следующему серверу в списке.

Когда поступает запрос, алгоритм назначает его серверу, который следует по порядку после последнего, обработавшего запрос. После достижения конца списка, цикл начинается снова с первого сервера.

В базовой версии алгоритма не учитывается текущая нагрузка или состояние серверов. Это означает, что все серверы получают одинаковое количество запросов, независимо от их текущей загруженности или производительности.

Для отслеживания, какой сервер следующий в очереди, обычно используется простой индекс (или указатель), который перемещается по списку серверов.

Создадим базовую структуру для балансировщика нагрузки с использованием этого алгоритма:

определим сервер:

package main

import (
    "fmt"
    "net/http"
    "strconv"
    "sync"
    "time"
)

// srver - структура для имитации сервера
type Server struct {
    ID int
}

// handleRequest - обработка запроса сервером
func (s *Server) HandleRequest(w http.ResponseWriter, r *http.Request) {
    fmt.Fprintf(w, "Server %d handled request\n", s.ID)
    time.Sleep(100 * time.Millisecond) // Имитация обработки
}

// cодание списка серверов
func createServers(count int) []*Server {
    var servers []*Server
    for i := 0; i < count; i++ {
        servers = append(servers, &Server{ID: i + 1})
    }
    return servers
}

реализовываем балансировщик:

type RoundRobinBalancer struct {
    servers []*Server
    current int
    lock    sync.Mutex
}

func NewRoundRobinBalancer(servers []*Server) *RoundRobinBalancer {
    return &RoundRobinBalancer{servers: servers}
}

func (b *RoundRobinBalancer) GetNextServer() *Server {
    b.lock.Lock()
    defer b.lock.Unlock()

    server := b.servers[b.current]
    b.current = (b.current + 1) % len(b.servers)
    return server
}

// обработка запроса балансировщиком
func (b *RoundRobinBalancer) HandleRequest(w http.ResponseWriter, r *http.Request) {
    server := b.GetNextServer()
    server.HandleRequest(w, r)
}

1-5 строчка:

servers []*Server: Это слайс (динамический массив) указателей на структуры Server. Каждый элемент в этом слайсе представляет собой сервер, между которыми будет распределяться нагрузка. current int: Это целочисленная переменная, которая отслеживает индекс текущего сервера в слайсе servers. Она используется для определения, какой сервер следующий в очереди для обработки запроса. lock sync.Mutex: Это мьютекс из пакета sync, который используется для синхронизации доступа к данным структуры RoundRobinBalancer. Это необходимо, чтобы предотвратить одновременный доступ к данным из разных горутин.

11-18 строчка:

b.lock.Lock(): Блокирует мьютекс перед чтением или изменением данных, чтобы обеспечить безопасный доступ в многопоточной среде. defer b.lock.Unlock(): Гарантирует, что мьютекс будет разблокирован после завершения работы метода. defer гарантирует выполнение этой операции даже если в процессе работы метода произойдёт ошибка или паника. server := b.servers[b.current]: Получает текущий сервер из слайса. b.current = (b.current + 1) % len(b.servers): Перемещает индекс current на следующий сервер. Использование операции модуля (%) гарантирует, что индекс вернётся к началу слайса, когда достигнет его конца. return server: Возвращает выбранный сервер.

запустим сервер с балансировщиком

func main() {
    servers := createServers(5) // Создание 5 серверов
    balancer := NewRoundRobinBalancer(servers)

    http.HandleFunc("/", balancer.HandleRequest)
    fmt.Println("Load Balancer started on :8080")
    if err := http.ListenAndServe(":8080", nil); err != nil {
        panic(err)
    }
}

Запросы, поступающие на порт 8080, будут распределяться между пятью "серверами". Каждый "сервер" имитирует обработку запроса с задержкой.

Least Connections

Least Connections – это алгоритм балансировки нагрузки, который направляет входящие запросы на сервер с наименьшим количеством активных соединений. В отличие от алгоритма Round-Robin, который равномерно распределяет запросы, Least Connections учитывает текущую загруженность серверов, стремясь минимизировать время ожидания и улучшить производительность.

Когда поступает новый запрос, алгоритм анализирует текущее количество активных соединений на каждом сервере в пуле.

Запрос направляется на сервер с наименьшим числом активных соединений. Это позволяет более равномерно распределить нагрузку и избежать перегрузки отдельных узлов.

Алгоритм постоянно адаптируется к изменениям в нагрузке.

Реализуем аналогично робину:

Определим сервер

package main

import (
    "fmt"
    "net/http"
    "sync"
    "time"
)

// Server - структура для имитации сервера
type Server struct {
    ID         int
    Connection int
    lock       sync.Mutex
}

// HandleRequest - обработка запроса сервером
func (s *Server) HandleRequest(w http.ResponseWriter, r *http.Request) {
    s.lock.Lock()
    s.Connection++
    s.lock.Unlock()

    // Имитация обработки запроса
    time.Sleep(100 * time.Millisecond)
    fmt.Fprintf(w, "Server %d handled request\n", s.ID)

    s.lock.Lock()
    s.Connection--
    s.lock.Unlock()
}

// Создание списка серверов
func createServers(count int) []*Server {
    var servers []*Server
    for i := 0; i < count; i++ {
        servers = append(servers, &Server{ID: i + 1})
    }
    return servers
}

Реализуем балансировщик

// LeastConnectionsBalancer - структура для балансировщика
type LeastConnectionsBalancer struct {
    servers []*Server
    lock    sync.Mutex
}

// NewLeastConnectionsBalancer - создание нового балансировщика
func NewLeastConnectionsBalancer(servers []*Server) *LeastConnectionsBalancer {
    return &LeastConnectionsBalancer{servers: servers}
}

// GetServerWithLeastConnections - получение сервера с наименьшим количеством соединений
func (b *LeastConnectionsBalancer) GetServerWithLeastConnections() *Server {
    b.lock.Lock()
    defer b.lock.Unlock()

    var minConnServer *Server
    minConn := int(^uint(0) >> 1) // максимально возможное значение int

    for _, server := range b.servers {
        server.lock.Lock()
        if server.Connection < minConn {
            minConn = server.Connection
            minConnServer = server
        }
        server.lock.Unlock()
    }

    return minConnServer
}

// HandleRequest - обработка запроса балансировщиком
func (b *LeastConnectionsBalancer) HandleRequest(w http.ResponseWriter, r *http.Request) {
    server := b.GetServerWithLeastConnections()
    server.HandleRequest(w, r)
}

12-30 строчка

b.lock.Lock() и defer b.lock.Unlock() строки блокируют и разблокируют мьютекс, обеспечивая безопасный доступ к общим данным.

minConn := int(^uint(0) >> 1)устанавливает minConn в максимально возможное значение для типа int, чтобы любой сервер с меньшим количеством соединений мог быть выбран.

Цикл for перебирает все серверы, и если текущий сервер имеет меньше соединений, чем minConn, он обновляет minConn и minConnServer.

Запуск сервра

func main() {
    servers := createServers(5) // Создание 5 серверов
    balancer := NewLeastConnectionsBalancer(servers)

    http.HandleFunc("/", balancer.HandleRequest)
    fmt.Println("Load Balancer started on :8080")
    if err := http.ListenAndServe(":8080", nil); err != nil {
        panic(err)
    }
}

Для интерактивности можете попробовать самостоятельно добавить логирование, конфигурационные файлы для управления количеством серверов, более сложную обработку ошибок и тесты.

IP Hash

IP Hash – это алгоритм балансировки нагрузки, который использует хэш-функцию для преобразования IP-адреса клиента в номер сервера. Этот метод гарантирует, что запросы от одного и того же IP-адреса всегда будут направляться на один и тот же сервер, до тех пор, пока не изменится конфигурация серверов.

Когда поступает запрос, алгоритм вычисляет хэш от IP-адреса клиента.

Хэш-значение затем используется для выбора сервера из пула. Это может быть, например, остаток от деления хэша на количество серверов. Поскольку каждый IP-адрес всегда хэшируется в одно и то же число, клиент будет обслуживаться тем же сервером при повторных запросах.

Определим подобным образом как и предыдущим сервер

package main

import (
    "fmt"
    "net/http"
    "time"
)

type Server struct {
    ID int
}

func (s *Server) HandleRequest(w http.ResponseWriter, r *http.Request) {
    time.Sleep(time.Millisecond * 100) // Имитация обработки
    fmt.Fprintf(w, "Server %d handled request\n", s.ID)
}

func createServers(count int) []*Server {
    var servers []*Server
    for i := 0; i < count; i++ {
        servers = append(servers, &Server{ID: i + 1})
    }
    return servers
}

реализуем балансировщик

import (
    "hash/fnv"
    "net"
)

// IPHashBalancer - структура для балансировщика с IP Hash
type IPHashBalancer struct {
    servers []*Server
}

// NewIPHashBalancer - создание нового балансировщика
func NewIPHashBalancer(servers []*Server) *IPHashBalancer {
    return &IPHashBalancer{servers: servers}
}

// hashIPAddress - хэширование IP-адреса
func hashIPAddress(ip net.IP) uint32 {
    hasher := fnv.New32a()
    hasher.Write([]byte(ip))
    return hasher.Sum32()
}

// GetServer - получение сервера на основе IP-адреса
func (b *IPHashBalancer) GetServer(ip net.IP) *Server {
    index := hashIPAddress(ip) % uint32(len(b.servers))
    return b.servers[index]
}

// HandleRequest - обработка запроса балансировщиком
func (b *IPHashBalancer) HandleRequest(w http.ResponseWriter, r *http.Request) {
    ip := net.ParseIP(r.RemoteAddr)
    server := b.GetServer(ip)
    server.HandleRequest(w, r)
}

hashIPAddress

hashIPAddress функция принимает IP-адрес (net.IP) и возвращает его хэш-значение.

fnv.New32a()создаёт новый 32-битный FNV-1a хэшер.

hasher.Write([]byte(ip)):преобразует IP-адрес в байты и записывает их в хэшер.

hasher.Sum32() возвращает 32-битное хэш-значение IP-адреса.

GetServer

GetServer принимает IP-адрес и возвращает сервер, который должен обработать запрос.

hashIPAddress(ip) % uint32(len(b.servers))вычисляет индекс сервера, используя хэш IP-адреса. Индекс получается путём взятия остатка от деления хэша на количество серверов

func main() {
    servers := createServers(5) // Создание 5 серверов
    balancer := NewIPHashBalancer(servers)

    http.HandleFunc("/", balancer.HandleRequest)
    fmt.Println("Load Balancer started on :8080")
    if err := http.ListenAndServe(":8080", nil); err != nil {
        panic(err)
    }
}

Многопоточность

Пришлое самое время использовать горутины и каналы.

Каналы будут использоваться для передачи запросов от основной горутины, принимающей соединения, к рабочим горутинам, которые затем распределяют запросы между серверами.

Горутины будут выполнять задачи, получаемые от балансировщика. А каналы будем юзать для передачи задач от балансировщика к рабочим и обратной связи.

package main

import (
    "fmt"
    "sync"
    "time"
)

// Task - структура задачи
type Task struct {
    Data int
}

// Worker - функция рабочего, обрабатывающего задачи
func Worker(id int, tasks <-chan Task, wg *sync.WaitGroup) {
    defer wg.Done()
    for task := range tasks {
        fmt.Printf("Worker %d обрабатывает задачу с данными %d\n", id, task.Data)
        time.Sleep(time.Second) // Имитация работы
    }
}

func main() {
    const numWorkers = 5
    tasks := make(chan Task, 10)

    var wg sync.WaitGroup
    wg.Add(numWorkers)

    // Запуск рабочих
    for i := 0; i < numWorkers; i++ {
        go Worker(i, tasks, &wg)
    }

    // Отправка задач рабочим
    for i := 0; i < 20; i++ {
        tasks <- Task{Data: i}
    }
    close(tasks)

    // Ожидание завршения всех рабочих
    wg.Wait()
}

Каждый рабочий представлен горутиной, которая обрабатывает задачи из канала tasks.

Каналы используются для передачи задач от основной горутины к рабочим.

WaitGroup используется для ожидания завершения всех рабочих горутин.

Балансировка достигается за счет равномерного распределения задач между рабочими.


Про балансировку нагрузки и другие инструменты инфраструктуры, эксперты OTUS рассказывают в рамках онлайн-курсов, ознакомиться с которыми можно по этой ссылке. Также напоминаю о том, что в календаре мероприятий любой желающий может зарегистрироваться на бесплатные вебинары курсов.