golang

Простейший пример kafka + golang

  • воскресенье, 1 октября 2023 г. в 00:00:18
https://habr.com/ru/articles/764582/

В данной статье представлен простой способ реализации микросервисной архитектуры с использованием Kafka, Golang и Docker.

Если вы сразу хотите перейти к рабочему коду, вот ссылка на репозиторий

Общий процесс работы

  1. Клиент отправляет HTTP-запрос на первый микросервис (API Gateway), используя, например, Postman.

  2. API Gateway передает запрос в Kafka, откуда его принимает второй микросервис.

  3. Второй микросервис обрабатывает запрос и отправляет ответ обратно в Kafka.

  4. API Gateway извлекает ответ из Kafka и возвращает его клиенту.

Примечание: Важным аспектом является возможность сопоставления запросов и ответов, чтобы при параллельной обработке множества запросов каждый ответ был возвращен соответствующему клиенту.

Краткий обзор кода

Конфигурация API Gateway:

Файл api-gateway/main.go:

package main

import (
	"encoding/json"
	"fmt"
	"sync"
	"time"

	"github.com/IBM/sarama"
	"github.com/gin-gonic/gin"
)

// MyMessage - структура для нашего сообщения
type MyMessage struct {
	ID    string `json:"id"`
	Name  string `json:"name"`
	Value string `json:"value"`
}

// responseChannels - словарь для хранения каналов ответов, индексированных по ID запроса
// mu - мьютекс для обеспечения синхронизации доступа к словарю responseChannels
var responseChannels map[string]chan *sarama.ConsumerMessage
var mu sync.Mutex

func main() {
	responseChannels = make(map[string]chan *sarama.ConsumerMessage)

	// Создание продюсера Kafka
	producer, err := sarama.NewSyncProducer([]string{"kafka:9092"}, nil)
	if err != nil {
		panic(err)
	}
	defer producer.Close()

	// Создание консьюмера Kafka
	consumer, err := sarama.NewConsumer([]string{"kafka:9092"}, nil)
	if err != nil {
		panic(err)
	}
	defer consumer.Close()

	// Подписка на партицию "pong" в Kafka
	partConsumer, err := consumer.ConsumePartition("pong", 0, sarama.OffsetNewest)
	if err != nil {
		panic(err)
	}
	defer partConsumer.Close()

	// Горутина для обработки входящих сообщений от Kafka
	go func() {
		for {
			select {
            // Чтение сообщения из Kafka
			case msg := <-partConsumer.Messages():
				responseID := string(msg.Key)              // Извлекаем ID ответа из сообщения
				mu.Lock()                                  // Блокируем мьютекса
				ch, exists := responseChannels[responseID] // Ищем канал для данного ID ответа
				if exists { 
  					ch <- msg                              // Передаём сообщения в канал
					delete(responseChannels, responseID)   // Удаляем канала из словаря
				}
				mu.Unlock()                                // Разблокировка мьютекса
			}
		}
	}()

	// Инициализация роутера Gin
	router := gin.Default()
	router.GET("/ping", func(c *gin.Context) {
		// (определение и отправка сообщения в Kafka)
		requestID := fmt.Sprintf("%d", time.Now().UnixNano())

		message := MyMessage{
			ID:    requestID,
			Name:  "Ping",
			Value: "Pong",
		}

        // Преобразование сообщения в JSON что бы потом отправить через kafka
		bytes, err := json.Marshal(message) 
		if err != nil {
			c.JSON(500, gin.H{"error": err.Error()})
			return
		}

		msg := &sarama.ProducerMessage{
			Topic: "ping",
			Key:   sarama.StringEncoder(requestID), // Использование ID запроса как ключа
  			Value: sarama.ByteEncoder(bytes),       // Тело сообщения
		}

        // Отправка сообщения в Kafka
        producer.SendMessage(msg)

		responseCh := make(chan *sarama.ConsumerMessage)
		mu.Lock()
		responseChannels[requestID] = responseCh
		mu.Unlock()
        // (получение ответа из Kafka и отправка клиенту)
		select {
		case responseMsg := <-responseCh:
			c.JSON(200, gin.H{"message": string(responseMsg.Value)})
		case <-time.After(10 * time.Second):
			c.JSON(500, gin.H{"error": "timeout waiting for response"})
		}
	})

	router.Run(":8080")
}

Второй микросервис:

Файл second-microservice/main.go:

package main

import (
	"encoding/json"
	"log"

	"github.com/IBM/sarama"
)

// Наша структура для сообщения
type MyMessage struct {
	ID    string `json:"id"`
	Name  string `json:"name"`
	Value string `json:"value"`
}

func main() {
	// Создание консьюмера Kafka
	consumer, err := sarama.NewConsumer([]string{"kafka:9092"}, nil)
	if err != nil {
		panic(err)
	}
	defer consumer.Close()

	// Подписка на партицию "ping" в Kafka
	partConsumer, err := consumer.ConsumePartition("ping", 0, sarama.OffsetNewest)
	if err != nil {
		panic(err)
	}
	defer partConsumer.Close()

	for {
		select {
		// (обработка входящего сообщения и отправка ответа в Kafka)
		case msg := <-partConsumer.Messages():
			// Десериализация входящего сообщения из JSON
			var receivedMessage MyMessage
			err := json.Unmarshal(msg.Value, &receivedMessage)

			if err != nil {
				log.Printf("Error unmarshaling JSON: %v\n", err)
				continue
			}

			log.Printf("Received message: %+v\n", receivedMessage)

            // Создание продюсера Kafka
			producer, err := sarama.NewSyncProducer([]string{"kafka:9092"}, nil)
			if err != nil {
				panic(err)
			}
			defer producer.Close()

			// Формируем ответное сообщение
			resp := &sarama.ProducerMessage{
				Topic: "pong",
				Key:   sarama.StringEncoder(receivedMessage.ID),
				Value: sarama.StringEncoder(receivedMessage.Name + " " + receivedMessage.Value + " ( " + receivedMessage.ID + " ) "),
			}
			// Отпровляем ответ в gateway 
			producer.SendMessage(resp)
		}
	}
}

Конфигурация docker-compose ( для 3 версии )

Файл docker-compose.yml:

version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      - ZOOKEEPER_CLIENT_PORT=2181
    networks:
      - kafka-network

  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    environment:
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092
    networks:
      - kafka-network

  api-gateway:
    build:
      context: ./api-gateway
      dockerfile: Dockerfile
    depends_on:
      - kafka
    networks:
      - kafka-network
    ports:
      - "8080:8080"


  second-microservice:
    build:
      context: ./second-microservice
      dockerfile: Dockerfile
    depends_on:
      - kafka
    networks:
      - kafka-network

networks:
  kafka-network:
    driver: bridge

Примечание: В 3-й версии Docker Compose была удалена функция "depends_on" для контроля за зависимостями сервисами, которая была доступна в 2-й версии. Эта функция позволяла автоматически ожидать, пока зависимые сервисы будут полностью запущены, прежде чем запускать зависящий сервис.

Из-за этого изменения, мы используем скрипт wait-for-it в каждом контейнере ( Dockerfile типовой, его можно посмотреть в репозитории ), чтобы гарантировать, что все необходимые сервисы доступны и полностью функционируют, прежде чем начнется выполнение основной программы.

Скрипт wait-for-it обеспечивает простой и эффективный способ ожидания доступности TCP-хоста и порта.

Это позволяет нам контролировать порядок запуска сервисов и гарантировать, что все необходимые зависимости удовлетворены, прежде чем сервис начнет свою работу.

Рабочий код в репозитории