Простейший пример kafka + golang
- воскресенье, 1 октября 2023 г. в 00:00:18
В данной статье представлен простой способ реализации микросервисной архитектуры с использованием Kafka, Golang и Docker.
Если вы сразу хотите перейти к рабочему коду, вот ссылка на репозиторий
Клиент отправляет HTTP-запрос на первый микросервис (API Gateway), используя, например, Postman.
API Gateway передает запрос в Kafka, откуда его принимает второй микросервис.
Второй микросервис обрабатывает запрос и отправляет ответ обратно в Kafka.
API Gateway извлекает ответ из Kafka и возвращает его клиенту.
Примечание: Важным аспектом является возможность сопоставления запросов и ответов, чтобы при параллельной обработке множества запросов каждый ответ был возвращен соответствующему клиенту.
Файл api-gateway/main.go
:
package main
import (
"encoding/json"
"fmt"
"sync"
"time"
"github.com/IBM/sarama"
"github.com/gin-gonic/gin"
)
// MyMessage - структура для нашего сообщения
type MyMessage struct {
ID string `json:"id"`
Name string `json:"name"`
Value string `json:"value"`
}
// responseChannels - словарь для хранения каналов ответов, индексированных по ID запроса
// mu - мьютекс для обеспечения синхронизации доступа к словарю responseChannels
var responseChannels map[string]chan *sarama.ConsumerMessage
var mu sync.Mutex
func main() {
responseChannels = make(map[string]chan *sarama.ConsumerMessage)
// Создание продюсера Kafka
producer, err := sarama.NewSyncProducer([]string{"kafka:9092"}, nil)
if err != nil {
panic(err)
}
defer producer.Close()
// Создание консьюмера Kafka
consumer, err := sarama.NewConsumer([]string{"kafka:9092"}, nil)
if err != nil {
panic(err)
}
defer consumer.Close()
// Подписка на партицию "pong" в Kafka
partConsumer, err := consumer.ConsumePartition("pong", 0, sarama.OffsetNewest)
if err != nil {
panic(err)
}
defer partConsumer.Close()
// Горутина для обработки входящих сообщений от Kafka
go func() {
for {
select {
// Чтение сообщения из Kafka
case msg := <-partConsumer.Messages():
responseID := string(msg.Key) // Извлекаем ID ответа из сообщения
mu.Lock() // Блокируем мьютекса
ch, exists := responseChannels[responseID] // Ищем канал для данного ID ответа
if exists {
ch <- msg // Передаём сообщения в канал
delete(responseChannels, responseID) // Удаляем канала из словаря
}
mu.Unlock() // Разблокировка мьютекса
}
}
}()
// Инициализация роутера Gin
router := gin.Default()
router.GET("/ping", func(c *gin.Context) {
// (определение и отправка сообщения в Kafka)
requestID := fmt.Sprintf("%d", time.Now().UnixNano())
message := MyMessage{
ID: requestID,
Name: "Ping",
Value: "Pong",
}
// Преобразование сообщения в JSON что бы потом отправить через kafka
bytes, err := json.Marshal(message)
if err != nil {
c.JSON(500, gin.H{"error": err.Error()})
return
}
msg := &sarama.ProducerMessage{
Topic: "ping",
Key: sarama.StringEncoder(requestID), // Использование ID запроса как ключа
Value: sarama.ByteEncoder(bytes), // Тело сообщения
}
// Отправка сообщения в Kafka
producer.SendMessage(msg)
responseCh := make(chan *sarama.ConsumerMessage)
mu.Lock()
responseChannels[requestID] = responseCh
mu.Unlock()
// (получение ответа из Kafka и отправка клиенту)
select {
case responseMsg := <-responseCh:
c.JSON(200, gin.H{"message": string(responseMsg.Value)})
case <-time.After(10 * time.Second):
c.JSON(500, gin.H{"error": "timeout waiting for response"})
}
})
router.Run(":8080")
}
Файл second-microservice/main.go
:
package main
import (
"encoding/json"
"log"
"github.com/IBM/sarama"
)
// Наша структура для сообщения
type MyMessage struct {
ID string `json:"id"`
Name string `json:"name"`
Value string `json:"value"`
}
func main() {
// Создание консьюмера Kafka
consumer, err := sarama.NewConsumer([]string{"kafka:9092"}, nil)
if err != nil {
panic(err)
}
defer consumer.Close()
// Подписка на партицию "ping" в Kafka
partConsumer, err := consumer.ConsumePartition("ping", 0, sarama.OffsetNewest)
if err != nil {
panic(err)
}
defer partConsumer.Close()
for {
select {
// (обработка входящего сообщения и отправка ответа в Kafka)
case msg := <-partConsumer.Messages():
// Десериализация входящего сообщения из JSON
var receivedMessage MyMessage
err := json.Unmarshal(msg.Value, &receivedMessage)
if err != nil {
log.Printf("Error unmarshaling JSON: %v\n", err)
continue
}
log.Printf("Received message: %+v\n", receivedMessage)
// Создание продюсера Kafka
producer, err := sarama.NewSyncProducer([]string{"kafka:9092"}, nil)
if err != nil {
panic(err)
}
defer producer.Close()
// Формируем ответное сообщение
resp := &sarama.ProducerMessage{
Topic: "pong",
Key: sarama.StringEncoder(receivedMessage.ID),
Value: sarama.StringEncoder(receivedMessage.Name + " " + receivedMessage.Value + " ( " + receivedMessage.ID + " ) "),
}
// Отпровляем ответ в gateway
producer.SendMessage(resp)
}
}
}
Файл docker-compose.yml
:
version: '3'
services:
zookeeper:
image: confluentinc/cp-zookeeper:latest
environment:
- ZOOKEEPER_CLIENT_PORT=2181
networks:
- kafka-network
kafka:
image: confluentinc/cp-kafka:latest
depends_on:
- zookeeper
environment:
- KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
- KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092
networks:
- kafka-network
api-gateway:
build:
context: ./api-gateway
dockerfile: Dockerfile
depends_on:
- kafka
networks:
- kafka-network
ports:
- "8080:8080"
second-microservice:
build:
context: ./second-microservice
dockerfile: Dockerfile
depends_on:
- kafka
networks:
- kafka-network
networks:
kafka-network:
driver: bridge
Примечание: В 3-й версии Docker Compose была удалена функция "depends_on" для контроля за зависимостями сервисами, которая была доступна в 2-й версии. Эта функция позволяла автоматически ожидать, пока зависимые сервисы будут полностью запущены, прежде чем запускать зависящий сервис.
Из-за этого изменения, мы используем скрипт wait-for-it в каждом контейнере ( Dockerfile типовой, его можно посмотреть в репозитории ), чтобы гарантировать, что все необходимые сервисы доступны и полностью функционируют, прежде чем начнется выполнение основной программы.
Скрипт wait-for-it обеспечивает простой и эффективный способ ожидания доступности TCP-хоста и порта.
Это позволяет нам контролировать порядок запуска сервисов и гарантировать, что все необходимые зависимости удовлетворены, прежде чем сервис начнет свою работу.