golang

Event Sourcing и CQRS в Go

  • среда, 20 декабря 2023 г. в 00:00:06
https://habr.com/ru/companies/otus/articles/781302/

Привет, Хабр!

Сегодня мы поговорим о двух концепциях — Event Sourcing и CQRS, и их реализации на ЯП Go. Go предоставляет хорошие возможности для реализации этих паттернов благодаря своей производительности, простоте и поддержке конкурентности "из коробки".

Особенности Go, делающие его подходящим для этих паттернов

Go — это компилируемый язык, это говорит о его быстром времи отклика, что практически мастхев для систем, использующих Event Sourcing и CQRS. Go предоставляет примитивы для параллельного выполнения и конкурентности, такие как горутины и каналы.

Сильная статическая типизация в Go помогает предотвратить многие ошибки на этапе компиляции, что улучшает надежность системы.

Go имеет возможности для работы с HTTP, это упрощает создание RESTful API, важных для CQRS, где команды и запросы часто передаются через HTTP. В Go удобно работать с сериализацией данных (например, JSON, Protobuf), что необходимо для сохранения событий в Event Sourcing.

Event Sourcing

Event Sourcing — это архитектурный паттерн, который фундаментально меняет подход к обработке и хранению данных в приложениях. В традиционных системах обычно хранится только текущее состояние объектов (например, последний баланс счета в банковской системе). Event Sourcing же предлагает другой подход: вместо хранения только последнего состояния объекта, она сохраняет последовательность всех событий (events), которые привели к данному состоянию.

Основные концепции

  1. События:

    • Событие — это запись о том, что что-то произошло. Например, "снятие денег со счета на сумму X" или "добавление товара в корзину". Каждое событие представляет собой неизменяемый объект. Это значит, что оно никогда не изменяется после создания. События хранятся последовательно и обычно связаны с определенным объектом или агрегатом. В Go события обычно реализуются как интерфейсы или структуры с методами для сериализации и десериализации.

  2. Хранение событий:

    • Вместо традиционных баз данных, Event Sourcing использует специализированные хранилища событий (Event Stores). Event Store сохраняет все события в порядке их возникновения, что позволяет восстановить состояние системы на любой момент времени. В Go может быть реализован как простое in-memory хранилище для прототипирования или с использованием более сложных решений, таких как БД.

  3. Aggregate Roots:

    • Он служит точкой входа для агрегата, инкапсулируя бизнес-логику и гарантируя согласованность. Каждый Aggregate Root имеет уникальный идентификатор и внутреннее состояние, которое изменяется через события. В Gо реализуется как структура с методами для обработки команд и применения событий:

  4. Проекции:

    • Проекции используются для создания представлений данных на основе событий, подходящих для конкретных задач или запросов. Например, можно создать проекцию, показывающую все операции по счету клиента.

  5. Снимки состояния:

    • Снимки состояния используются для ускорения процесса восстановления состояния, сохраняя текущее состояние объекта на определенный момент времени.

  6. Проекции:

    • Проекции используются для создания представлений данных на основе событий, подходящих для конкретных задач или запросов.

Реализация Event Sourcing в Go

В Event Sourcing каждое событие представляет собой значимое изменение в системе. Сначала мы определяем структуру для событий:

type Event interface {
    Data() interface{}
    Timestamp() time.Time
}

type AccountCreatedEvent struct {
    accountID string
    timestamp time.Time
    owner     string
    balance   float64
}

func (e AccountCreatedEvent) Data() interface{} { return e }
func (e AccountCreatedEvent) Timestamp() time.Time { return e.timestamp }

Определяем интерфейс Event и конкретную реализацию AccountCreatedEvent. Это событие будет использоваться для представления создания нового счета.

Хранилище событий — это центральная часть системы Event Sourcing. Оно отвечает за сохранение и извлечение событий:

type EventStore interface {
    Save(events []Event) error
    Load(aggregateID string) ([]Event, error)
}

// использование in-memory хранилища
type InMemoryEventStore struct {
    store map[string][]Event
}

func NewInMemoryEventStore() *InMemoryEventStore {
    return &InMemoryEventStore{
        store: make(map[string][]Event),
    }
}

func (s *InMemoryEventStore) Save(events []Event) error {
    for _, event := range events {
        aggregateID := event.Data().(AccountCreatedEvent).accountID
        s.store[aggregateID] = append(s.store[aggregateID], event)
    }
    return nil
}

func (s *InMemoryEventStore) Load(aggregateID string) ([]Event, error) {
    events, ok := s.store[aggregateID]
    if !ok {
        return nil, fmt.Errorf("aggregate not found")
    }
    return events, nil
}

InMemoryEventStore реализует EventStore интерфейс.

Агрегаты — это объекты, состояние которых изменяется событиями. К примеру это будет банковский счет:

type Account struct {
    ID      string
    Owner   string
    Balance float64
}

func NewAccount(id, owner string) *Account {
    return &Account{
        ID:    id,
        Owner: owner,
    }
}

func (a *Account) Apply(event Event) {
    switch e := event.(type) {
    case AccountCreatedEvent:
        a.ID = e.accountID
        a.Owner = e.owner
        a.Balance = e.balance
    }
}

Apply используется для изменения состояния агрегата на основе события.

Команды — это запросы на выполнение операций. Обработчик команд изменяет состояние агрегатов и генерирует события.

type Command interface {
    Execute(store EventStore) error
}

type CreateAccountCommand struct {
    ID    string
    Owner string
}

func (c CreateAccountCommand) Execute(store EventStore) error {
    // проверяем, существует ли уже счет
    _, err := store.Load(c.ID)
    if err == nil {
        return fmt.Errorf("account already exists")
    }

    // создаем событие создания счета
    event := AccountCreatedEvent{
        accountID: c.ID,


        timestamp: time.Now(),
        owner:     c.Owner,
        balance:   0.0,
    }

    // сохраняем событие
    return store.Save([]Event{event})
}

Пример использования

func main() {
    store := NewInMemoryEventStore()
    cmd := CreateAccountCommand{ID: "123", Owner: "John Doe"}
    
    if err := cmd.Execute(store); err != nil {
        log.Fatalf("Failed to execute command: %v", err)
    }

    // восстановление состояния агрегата
    events, _ := store.Load("123")
    account := NewAccount("123", "")
    for _, event := range events {
        account.Apply(event)
    }

    fmt.Printf("Account created: %+v\n", account)
}

Примеры с горутинами

В Event Sourcing каждое изменение состояния приложения представляется в виде события, которое сохраняется в журнале событий. Горутины в Go могут быть использованы для асинхронной обработки этих событий.

Отправка и обработка событий

package main

import (
    "fmt"
    "sync"
    "time"
)

// Event представляет событие в системе
type Event struct {
    Data string
}

// обработчик событий
func handleEvent(event Event, wg *sync.WaitGroup) {
    defer wg.Done()
    // Имтация обработки события
    fmt.Println("Обрабатывается событие:", event.Data)
    time.Sleep(2 * time.Second)
}

func main() {
    var wg sync.WaitGroup
    events := []Event{
        {Data: "Событие 1"},
        {Data: "Событие 2"},
        {Data: "Событие 3"},
    }

    for _, event := range events {
        wg.Add(1)
        go handleEvent(event, &wg)
    }

    wg.Wait() //ожидание завершения всех горутин
}

handleEvent является функцией, которая асинхронно обрабатывает события. Каждое событие обрабатывается в своей горутине.

Публикация и подписка на события

package main

import (
    "fmt"
    "sync"
    "time"
)

// Event представляет событие
type Event struct {
    Data string
}

// EventChannel - канал для событий
type EventChannel chan Event

// Publish отправляет событие в канал
func (ec EventChannel) Publish(event Event) {
    ec <- event
}

// Subscribe обрабатывает события из канала
func (ec EventChannel) Subscribe(wg *sync.WaitGroup) {
    for event := range ec {
        fmt.Println("Подписка на событие:", event.Data)
    }
    wg.Done()
}

func main() {
    var wg sync.WaitGroup
    eventChannel := make(EventChannel, 10)

    wg.Add(1)
    go eventChannel.Subscribe(&wg)

    events := []Event{
        {Data: "Событие 1"},
        {Data: "Событие 2"},
        {Data: "Событие 3"},
    }

    for _, event := range events {
        eventChannel.Publish(event)
        time.Sleep(1 * time.Second) // Имитация задержки
    }

    close(eventChannel) // Закрытие канала
    wg.Wait()           // Ожидание завершения подписки
}

Горутина Subscribe ожидает события из канала и обрабатывает их по мере поступления.

CQRS

CQRS разделяет систему на две части: одну для обработки команд (Command), то есть запросов, изменяющих состояние системы, и другую для обработки запросов (Query), то есть запросов, возвращающих данные, но не изменяющих состояние. Этот паттерн основан на принципе CQS, предложенном Бертраном Мейером, но адаптирован под нужды нынешнего по.

Основные аспекты

Команды (Commands): Отвечают за выполнение действий, которые изменяют состояние системы. Каждая команда должна быть неделимой (атомарной) и ориентирована на выполнение конкретной операции.

Запросы (Queries): Предназначены для получения данных из системы. Они не должны влиять на состояние системы и, таким образом, могут быть оптимизированы для быстрого и эффективного извлечения данных.

Разделение команд и запросов позволяет оптимизировать каждую часть системы независимо. Например, можно использовать разные подходы к хранению данных для команд и запросов.

CQRS позволяет системе легче масштабироваться, так как операции чтения и записи могут быть разделены и обрабатываться разными серверами или даже разными сервисами. В сложных системах, где операции чтения и записи значительно различаются, CQRS может упростить проектирование и реализацию.

CQRS естественно дополняет Event Sourcing. В системах, использующих Event Sourcing, команды приводят к созданию событий, которые затем сохраняются в Event Store, событиия могут быть использованы для создания и обновления представлений, которые служат для обработки запросов в CQRS.

Event Sourcing предоставляет полную историю всех изменений, которые произошли в системе, в то время как CQRS позволяет обрабатывать запросы на основе этой информации.

Реализация CQRS может выглядеть так:

Сначала определяем базовые структуры для команд и запросов:

// command представляет собой интерфейс для всех команд
type Command interface {
    Execute() error
}

// query представляет собой интерфейс для всех запросов
type Query interface {
    Execute() (interface{}, error)
}

Реализуем обработчик команд, который будет принимать команды и выполнять соответствующие действия:

// CreateOrderCommand — команда для создания заказа
type CreateOrderCommand struct {
    OrderID string
    Product string
    Amount  int
}

func (c *CreateOrderCommand) Execute() error {
    // лгика создания заказа
    // ...
    fmt.Printf("Order created: %s\n", c.OrderID)
    return nil
}

type CommandHandler struct{}

func (h *CommandHandler) Handle(command Command) error {
    return command.Execute()
}

Теперь реализуем обработчик запросов для получения данных:

// GetOrderQuery — запрос на получение информации о заказе
type GetOrderQuery struct {
    OrderID string
}

func (q *GetOrderQuery) Execute() (interface{}, error) {
    // логика получения информации о заказе
    // ...
    return OrderDetails{OrderID: q.OrderID, Product: "Example Product", Amount: 2}, nil
}

// QueryHandler обрабатывает запросы
type QueryHandler struct{}

func (h *QueryHandler) Handle(query Query) (interface{}, error) {
    return query.Execute()
}

Интегрируем обработчики команд и запросов в приложение:

func main() {
    cmdHandler := CommandHandler{}
    qryHandler := QueryHandler{}

    // Обработка команды
    createOrderCmd := &CreateOrderCommand{OrderID: "1", Product: "Book", Amount: 1}
    if err := cmdHandler.Handle(createOrderCmd); err != nil {
        log.Fatalf("Command execution failed: %v", err)
    }

    // Обработка запроса
    getOrderQry := &GetOrderQuery{OrderID: "1"}
    orderDetails, err := qryHandler.Handle(getOrderQry)
    if err != nil {
        log.Fatalf("Query execution failed: %v", err)
    }
    fmt.Printf("Order details: %+v\n", orderDetails)
}

Если вы реализуете CQRS в сочетании с Event Sourcing, команды будут генерировать события, которые сохраняются в Event Store.

RESTful API в CQRS на go

Для CQRS важно чётко определить команды и запросы как отдельные ресурсы. Например, если у нас есть сущность Order, мы можем иметь следующие эндпоинты:

  • Для команд:

    • POST /orders - создание нового заказа.

    • PUT /orders/{id} - обновление существующего заказа.

    • DELETE /orders/{id} - удаление заказа.

  • Для запросов:

    • GET /orders - получение списка заказов.

    • GET /orders/{id} - получение информации о конкретном заказе.

Пример для команды создания заказа:

@app.route('/orders', methods=['POST'])
def create_order():
    order_data = request.get_json()
    # Здесь может быть логика валидации
    result = command_bus.execute(CreateOrderCommand(order_data))
    return jsonify(result), 201

Здесь command_bus.execute() обрабатывает команду CreateOrderCommand, которая отвечает за бизнес-логику создания заказа.

Для запроса информации о заказе:

@app.route('/orders/<string:order_id>', methods=['GET'])
def get_order(order_id):
    order = query_bus.execute(GetOrderQuery(order_id))
    if order:
        return jsonify(order)
    else:
        return 'Order not found', 404

query_bus.execute() здесь обрабатывает запрос GetOrderQuery, возвращая данные конкретного заказа.

В CQRS команды часто обрабатываются асинхронно. Это можно реализовать через очередь сообщений:

@app.route('/orders', methods=['POST'])
def create_order_async():
    order_data = request.get_json()
    # Здесь может быть логика валидации
    message_bus.publish(CreateOrderCommand(order_data))
    return 'Accepted', 202

В этом случае ответ отправляется немедленно, а команда обрабатывается в фоне.

В CQRS можно версиями API:

@app.route('/v1/orders/<string:order_id>', methods=['GET'])
def get_order_v1(order_id):
    # Логика для версии 1
    pass

@app.route('/v2/orders/<string:order_id>', methods=['GET'])
def get_order_v2(order_id):
    # Логика для версии 2
    pass

Тестирование

Event Sourcing добавляет слой сложности в тестирование, поскольку состояние системы восстанавливается через последовательность событий.

Убедитесь, что команды генерируют правильные события. Это включает в себя проверку типов событий и данных, которые они содержат. Проверьте, что система корректно восстанавливает состояние из последовательности событий.

В тестах можно использовать mock-объекты для имитации хранения событий, чтобы изолировать тесты от реальной базы данных.

Пример теста:

func TestCreateAccount(t *testing.T) {
    command := CreateAccountCommand{...}
    eventStore := NewMockEventStore() // Имитация хранилища событий
    commandHandler := NewCommandHandler(eventStore)

    err := commandHandler.Handle(command)
    assert.Nil(t, err)
    assert.Equal(t, 1, len(eventStore.Events))
    assert.IsType(t, AccountCreatedEvent{}, eventStore.Events[0])
}

При тестировании CQRS важно раздельно тестировать командную и запросную части системы.

Пример теста запросной части в Go:

func TestGetAccountDetails(t *testing.T) {
    query := GetAccountDetailsQuery{AccountID: "123"}
    queryHandler := NewQueryHandler(/* зависимости */)

    result, err := queryHandler.Handle(query)
    assert.Nil(t, err)
    assert.NotNil(t, result)
    assert.Equal(t, "123", result.AccountID)
}

В гоу есть пакет testing , он предоставляет инструменты для написания модульных и интеграционных тестов. Использование интерфейсов в Go упрощает создание mock-объектов для тестирования, особенно при работе с внешними зависимостями, к примеру с БД.


Go в целом хороший выбор для реализации этих паттернов благодаря своей производительности, простоте и поддержке конкурентных операций.

Изучить актуальные концепции и паттерны для всех популярных языков программирования можно на онлайн-курсах OTUS под руководством опытных преподавателей.