Going NATS
- вторник, 21 января 2025 г. в 00:00:09
Все инновации так или иначе будут связаны с глобальными, распределенными системами в которых ключевую роль будут играть периферийные вычисления.
Многие из инструментов, которые мы успешно используем для решения текущих задач, могут быть неадекватными для новых вызовов.
В качестве примера таких инструментов, можно привести Ansible и SSH. Ansible, как известно использует SSH в качестве основного транспорта для выполнения команд на удаленных машинах. Это своего рода RPC для системных администраторов, который для управления конфигурациями использует YAML.
Ansible отлично подходит для управления парком из нескольких сотен серверов в дата-центре, но любой, кто пытался использовать его для управления тысячами машин, обязательно сталкивался с трудностями масштабирования.
В мире где работают десятки тысяч микросервисов, периферийных устройств и функций, возможности Ansible неадекватны и это стало одной из причин появления таких проектов как Kubernetes.
Но для Edge AI и Agentic AI нужны новые подходы.
NATS это система для обмена сообщений, которая, на мой взгляд отлично подходит для решения такого рода задач.
Например NATS может стать основой для централизованной системы управления серверами и периферийными устройствами.
Для тех кто знаком не нужно будет объяснять принципы работы pub/sub. Архитектурно NATS похож на Kafka и RabbitMQ. Основное отличие производительность и простота в использовании, которая делает NATS идеальным выбором именно для периферийных вычислений.
Ниже приведен очень простой пример RPC для LLM на удаленной машине.
Для установки NATS:
curl -sf https://binaries.nats.dev/nats-io/nats-server/v2@v2.10.20 | sh
Запускаем сервер NATS:
./nats-server -m 8222 &
Клиентской части добавлен простой UI который запускается на http://localhost:8080:
Subscriber:
package main
import (
"fmt"
"os/exec"
"runtime"
"github.com/nats-io/nats.go"
)
func executeCommand(command string) (string, error) {
var cmd *exec.Cmd
if runtime.GOOS == "windows" {
cmd = exec.Command("cmd", "/C", command)
} else {
cmd = exec.Command("sh", "-c", command)
}
output, err := cmd.CombinedOutput()
if err != nil {
return "", fmt.Errorf("error executing command: %v, output: %s", err, string(output))
}
return string(output), nil
}
func main() {
nc, err := nats.Connect(nats.DefaultURL)
if err != nil {
fmt.Println("Error connecting to NATS:", err)
return
}
defer nc.Close()
_, err = nc.Subscribe("commands", func(msg *nats.Msg) {
command := string(msg.Data)
fmt.Printf("Received command: %s\n", command)
output, err := executeCommand(command)
if err != nil {
fmt.Println("Error:", err)
nc.Publish(msg.Reply, []byte(fmt.Sprintf("Error: %v", err)))
return
}
nc.Publish(msg.Reply, []byte(output))
fmt.Printf("Command executed successfully: %s\n", output)
})
if err != nil {
fmt.Println("Error subscribing to subject:", err)
return
}
fmt.Println("Remote host is listening for commands...")
select {}
}
и
Publisher:
package main
import (
"fmt"
"html/template"
"log"
"net/http"
"time"
"github.com/nats-io/nats.go"
)
const natsServer = "nats://localhost:4222"
const htmlTemplate = `
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Command Publisher</title>
<link href="https://cdn.jsdelivr.net/npm/bootstrap@5.3.0-alpha1/dist/css/bootstrap.min.css" rel="stylesheet">
<style>
body {
background-color: #f8f9fa;
padding: 20px;
}
.container {
max-width: 800px;
margin: 0 auto;
}
.response-box {
background-color: #ffffff;
border: 1px solid #dee2e6;
border-radius: 5px;
padding: 15px;
margin-top: 20px;
}
</style>
</head>
<body>
<div class="container">
<h1 class="text-center my-4">Command Publisher</h1>
<form method="POST" action="/publish" class="mb-4">
<div class="mb-3">
<label for="command" class="form-label">Enter Command:</label>
<input type="text" id="command" name="command" class="form-control" placeholder="e.g., ls -l" required>
</div>
<button type="submit" class="btn btn-primary">Execute</button>
</form>
{{if .Response}}
<div class="response-box">
<h2>Response:</h2>
<pre class="bg-light p-3 border rounded">{{.Response}}</pre>
</div>
{{end}}
</div>
<script src="https://cdn.jsdelivr.net/npm/bootstrap@5.3.0-alpha1/dist/js/bootstrap.bundle.min.js"></script>
</body>
</html>
`
type PageData struct {
Response string
}
func main() {
nc, err := nats.Connect(natsServer)
if err != nil {
log.Fatalf("Error connecting to NATS: %v", err)
}
defer nc.Close()
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
tmpl := template.Must(template.New("index").Parse(htmlTemplate))
data := PageData{Response: ""}
tmpl.Execute(w, data)
})
http.HandleFunc("/publish", func(w http.ResponseWriter, r *http.Request) {
if err := r.ParseForm(); err != nil {
http.Error(w, "Error parsing form", http.StatusBadRequest)
return
}
command := r.FormValue("command")
if command == "" {
http.Error(w, "Command cannot be empty", http.StatusBadRequest)
return
}
msg, err := nc.Request("commands", []byte(command), 5*time.Second)
if err != nil {
http.Error(w, fmt.Sprintf("Error sending command: %v", err), http.StatusInternalServerError)
return
}
tmpl := template.Must(template.New("index").Parse(htmlTemplate))
data := PageData{Response: string(msg.Data)}
tmpl.Execute(w, data)
})
fmt.Println("Web UI is running on http://localhost:8080")
log.Fatal(http.ListenAndServe(":8080", nil))
}
Агент AI – это просто программа которая может выполнятся на каком-то устройстве. Несложно представить, что он может использовать инструкции пользователя для управления переферийным устройством: камера, манипулятор, он может самостоятельно отслеживать метрики. Но в данном случае это просто LLM которому можно так же как и в обычном Ansible плейбуке передать команды в YAML файле.
Только для этого нужно будет добавить YAML парсер в клиентскую часть.
package main
import (
"fmt"
"log"
"os"
"time"
"github.com/nats-io/nats.go"
"gopkg.in/yaml.v3"
)
const natsServer = "nats://localhost:4222"
type Task struct {
Name string `yaml:"name"`
Command string `yaml:"command"`
}
type Manifest struct {
Tasks []Task `yaml:"tasks"`
}
func main() {
nc, err := nats.Connect(natsServer)
if err != nil {
log.Fatalf("Error connecting to NATS: %v", err)
}
defer nc.Close()
yamlFile, err := os.ReadFile("manifest.yaml")
if err != nil {
log.Fatalf("Error reading YAML file: %v", err)
}
var manifest Manifest
err = yaml.Unmarshal(yamlFile, &manifest)
if err != nil {
log.Fatalf("Error parsing YAML file: %v", err)
}
for _, task := range manifest.Tasks {
fmt.Printf("Executing task: %s\n", task.Name)
fmt.Printf("Command: %s\n", task.Command)
msg, err := nc.Request("commands", []byte(task.Command), 5*time.Second)
if err != nil {
if err == nats.ErrTimeout {
fmt.Println("Error: Request timed out. The subscriber did not respond in time.")
} else {
fmt.Println("Error sending command:", err)
}
continue
}
fmt.Printf("Response from remote host:\n%s\n", string(msg.Data))
fmt.Println("----------------------------------------")
}
}
«Будущее уже наступило, просто оно еще неравномерно распределено». Для того, чтобы соответствовать масштабу задач, нам возможно понадобятся новые инструменты.