javascript

Балансировка нагрузки LLM через Nginx

  • вторник, 1 апреля 2025 г. в 00:00:05
https://habr.com/ru/articles/896222/

Исходный код, разобранный в статье, опубликован в этом репозитории

В интернете существует множество примеров, которые позволяют подключить ChatGPT 3.5 без инструментов к телеграм боту. Однако, когда речь заходит о большом количестве пользователей, не существуют примеров распределения нагрузки по нескольким процессам: все туториалы в интернете запускают монолит с одной репликой

https://github.com/telegraf/telegraf/issues/423

Так же, на практике работы с NodeJS, я сталкивался с проблемой, когда много Promise в статусе pending замедляют работу приложения нагрузкой на сборщик мусора. Добавив сторонние инструменты (дав ChatGPT вызывать внешнии функции с запросами к базе данных) потребуется думать как минимум о создании реплик монолита, чтобы раздувания очереди ожидания данных из базы

Архитектура приложения

Для балансировки нагрузки самым очевидным инструментом является Nginx upstream - инструмент получает WebSocket соединение нового клиента на порт 80 и в порядке очереди проксирует его на 8081, 8082, ..., 8085 зависимо от количества реплик. Если клиент не проявляет активность 15 минут, соединение прирывается, если будет новое сообщение, оно создастся заного.

https://nginx.org/en/docs/http/ngx_http_upstream_module.html

Реплики сохраняют историю переписки в Redis, что позволяет воссоздать контекст не смотря на то, что сообщения обрабатывает новый процесс. Создавать реплики будет PM2 - наиболее нативный способ для приложений стека NodeJS

https://pm2.io/docs/plus/overview/

Так же, используя PM2, за 40$ в месяц можно купить готовое уведомление о инцидентах по Slack / Email, мониторинг сервера по стандартным метрикам таким как CPUs: cores, hardware threads, virtual threads, Memory: capacity, Network interfaces, Storage devices: I/O, capacity, время отклика и тд. Это экономит деньги на Dev Ops до самоокупаемости проекта.

Файлы конфигурации

Чтобы не специализировать Linux на машине разработчика под один проект, обернем Nginx в Docker. Для этого, напишем docker-compose.yaml

version: '3.8'

services:
  nginx:
    image: nginx:1.27.4
    ports:
      - "80:80"
    extra_hosts:
      - "host.docker.internal:host-gateway"
    volumes:
      - ./config/nginx.conf:/etc/nginx/nginx.conf:ro
      - ./logs/nginx:/var/log/nginx

И создадим сопутствующий ./config/nginx.config с перечислением реплик

user nginx;
worker_processes auto;

error_log /var/log/nginx/error.log warn;
pid /var/run/nginx.pid;

events {
    worker_connections 1024;
    use epoll;
    multi_accept on;
}

http {
    include /etc/nginx/mime.types;
    default_type application/octet-stream;

    log_format main '$remote_addr - $remote_user [$time_local] "$request" '
                    '$status $body_bytes_sent "$http_referer" '
                    '"$http_user_agent" "$http_x_forwarded_for"';

    access_log /var/log/nginx/access.log main;

    upstream local_websocket_servers {
        server host.docker.internal:8081;  # Using host.docker.internal from hosts shared from host machine
        server host.docker.internal:8082;
        server host.docker.internal:8083;
        server host.docker.internal:8084;
        server host.docker.internal:8085;
        least_conn;
    }

    map $http_upgrade $connection_upgrade {
        default upgrade;
        ''      close;
    }

    server {
        listen 80;
        server_name localhost;

        location / {
            proxy_pass http://local_websocket_servers$is_args$args;
            
            # WebSocket-specific headers
            proxy_http_version 1.1;
            proxy_set_header Upgrade $http_upgrade;
            proxy_set_header Connection $connection_upgrade;
            
            # Preserve original headers and connection details
            proxy_set_header Host $host;
            proxy_set_header X-Real-IP $remote_addr;
            proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
            proxy_set_header X-Forwarded-Proto $scheme;

            # Close upstream if client disconnects
            proxy_ignore_client_abort on;
            
            # Long-lived connection settings
            proxy_read_timeout 86400s;
            proxy_send_timeout 86400s;
            
            # Buffer and performance settings
            proxy_buffer_size 128k;
            proxy_buffers 4 256k;
            proxy_busy_buffers_size 256k;
        }
    }
}

Для запуска реплик через pm2 создадим pm2.config.cjs. По ссылке лежит package.json с скриптами для запуска проекта через npm start

const path = require("path");
const os = require("os");
const dotenv = require('dotenv');

const readConfig = () => dotenv.parse("./.env");

const getPath = (unixPath) => {
  return path.resolve(unixPath.replace('~', os.homedir()));
};

const createBun = (index) => ({
  name: `bun-ws-${index}`,
  script: "./src/server.ts",
  interpreter: getPath("~/.bun/bin/bun"),
  args: ["--server", `--port=808${index}`],
  out_file: `./logs/pm2/bun-ws-${index}-out.log`,
  error_file: `./logs/pm2/bun-ws-${index}-error.log`,
  log_date_format: "YYYY-MM-DD HH:mm:ss",
  merge_logs: true,
  env: readConfig(),
});

module.exports = {
  apps: [
    /*
    {
      name: "bun-ws-1",
      script: "./src/server.ts",
      interpreter: getPath("~/.bun/bin/bun"),
      args: ["--server", "--port=8081"],
      out_file: "./logs/pm2/bun-ws-1-out.log",
      error_file: "./logs/pm2/bun-ws-1-error.log",
      log_date_format: "YYYY-MM-DD HH:mm:ss",
      merge_logs: true,
    },
    */
    createBun(1),
    createBun(2),
    createBun(3),
    createBun(4),
    createBun(5),
  ]
}

Как видно, номер порта для оркестрации мы передаем через аргументы командной строки. Наиболее удобно, так как файл .env остается статичным без изменения в рантайме. Для запуска проекта мы испольуем Bun - ускоренный аналог NodeJS по скорости сопоставимый с Golang

Рой агентов

Агент - аналог сцены в telegraf, модель LLM, используящая изолированный system prompt. Текущий агент в Swarm может быть изменен через вызов функции changeToAgent - аналог навигации по сценам в телеграм боте после клика по кнопке

import { Adapter, addAgent, addCompletion, addSwarm } from "agent-swarm-kit";
import { OpenAI } from "openai";

export const OPENAI_COMPLETION = addCompletion({
  completionName: "openai_completion",
  getCompletion: Adapter.fromOpenAI(new OpenAI({ apiKey: process.env.OPENAI_API_KEY }))
});

export const TEST_AGENT = addAgent({
  docDescription: "This agent operates within the nginx-balancer-chat project as a test agent, utilizing the OpenaiCompletion to inform users about the actual server port of one of 5 chat instances running on different ports and upstreamed by Nginx to port 80, extracting the port details from the chat history’s system message.",
  agentName: "test_agent",
  completion: OPENAI_COMPLETION,
  prompt: `You are a test agent for Nginx Upstream. Tell user the server port from the chat history (system message)`,
  dependsOn: [],
});

export const TEST_SWARM = addSwarm({
  docDescription: "This swarm serves as the core structure for the nginx-balancer-chat project, managing a single TestAgent as both the sole member and default agent to handle user interactions, leveraging the CohereCompletion to report the specific port of one of 5 upstreamed chat instances balanced by Nginx to port 80.",
  swarmName: "test_swarm",
  agentList: [TEST_AGENT],
  defaultAgent: TEST_AGENT,
});

Код этого примера запрограммирован так, чтобы LLM модель назвала, с какого порта был проксирован запрос WebSocket. Вместо OPENAI_COMPLETION для каждого агента по отдельности можно использовать LMStudio, Ollama, Cohere, более подробнее в репо

import { Chat, getAgentName, Schema, History } from "agent-swarm-kit";
import type { ServerWebSocket } from "bun";
import { parseArgs } from "util";
import { TEST_SWARM } from "./lib/swarm";

declare function parseInt(value: unknown): number;

type WebSocketData = {
  clientId: string;
};

const { values } = parseArgs({
  args: process.argv,
  options: {
    server: {
      type: "boolean",
    },
    port: {
      type: "string",
    },
  },
  strict: true,
  allowPositionals: true,
});


History.useHistoryCallbacks({
  getSystemPrompt: () => [
    `The server port is ${SERVER_PORT}. Tell him that port ASAP`
  ]
});

const SERVER_PORT = parseInt(values.port);

if (isNaN(SERVER_PORT)) {
  throw new Error(`Server port is not a number: ${values.port}`);
}

if (values.server) {
  Bun.serve({
    fetch(req, server) {
      const clientId = new URL(req.url).searchParams.get("clientId")!;
      if (!clientId) {
        return new Response("Invalid clientId", { status: 500 });
      }
      console.log(`Connected clientId=${clientId} port=${SERVER_PORT}`);
      server.upgrade<WebSocketData>(req, {
        data: {
          clientId,
        },
      });
    },
    websocket: {
      async open(ws: ServerWebSocket<WebSocketData>) {
        await Chat.beginChat(ws.data.clientId, TEST_SWARM);
        await Schema.writeSessionMemory(ws.data.clientId, { port: SERVER_PORT });
      },
      async message(ws: ServerWebSocket<WebSocketData>, message: string) {
        const answer = await Chat.sendMessage(ws.data.clientId, message, TEST_SWARM);
        ws.send(
          JSON.stringify({
            data: answer,
            agentName: await getAgentName(ws.data.clientId),
          })
        );
      },
      async close(ws: ServerWebSocket<WebSocketData>) {
        console.log(`Disconnected clientId=${ws.data.clientId} port=${SERVER_PORT}`);
        await Chat.dispose(ws.data.clientId, TEST_SWARM);
      },
    },
    port: SERVER_PORT,
  });
}

console.log(`Server listening http://localhost:${SERVER_PORT}`)

Если интересует разработка чатов с более чем одним агентом, посмотрите документация

Спасибо за внимание!