python

Группировка вебсокет соединений для асинхронного фреймворка Starlette

  • суббота, 13 июня 2020 г. в 00:26:34
https://habr.com/ru/post/506056/
  • Python


Доброго дня!

Меня зовут Соболев Андрей и сегодня мы с вами напишем решение для фреймворка Starlette, которое позволит группировать открытые вебсокет соединения.

Вступление


Starlette довольно молодой фреймворк, и какие-то «плюшки» для него приходится писать самостоятельно. В предыдущей статье я показал как можно реализовать JWT сессии и «Djangoподобную» структуру, сегодня мы рассмотрим как группировать вебсокет соединения.

Для чего нужна группировка вебсокет cоединений?


Допустим я и мой друг (назовем его UnnamedUser) решили пообщаться в чате.

Когда я захожу в комнату с ID=1, мой браузер устанавливает первое вебсокет соединение с сервером (для упрощения назовем его «канал 1»*).

Когда UnnamedUser заходит в комнату с ID=1, его браузер устанавливает второе вебсокет соединение с сервером (для упрощения назовем его «канал 2»*).

* далее по тексту вебсокет соединение будет называться каналом
Технически «канал 1» и «канал 2» это два разных объекта класса WebSocketEndpoint, поэтому в чате мы видим только свои сообщения, а не сообщения других участников (как ожидалось).

Чтобы решить эту проблему нам необходимо объединить наши каналы в группу (к примеру room_1) и делать массовые рассылки при наступлении какого либо события (к примеру кто-то написал в чат).

Где хранить группы?


Для хранения групп воспользуемся обычным словарем, который назовем CHANNEL_GROUPS и объявим глобально:

import time
import uuid
from simple_print.functions import sprint_f
from starlette.endpoints import WebSocketEndpoint

CHANNEL_GROUPS = {}

Унаследуемся от WebSocketEndpoint


Чтобы добавить для каждого канала связь с CHANNEL_GROUPS нам необходимо унаследоваться от базового класса WebSocketEndpoint.

Начнем с создания вспомогательного класса Channel:

class Channel:
    def __init__(self, websocket, expires, encoding):
        self.channel_uuid = str(uuid.uuid1()) # uid
        self.websocket = websocket
        self.expires = expires # срок жизни
        self.encoding = encoding # тип канала (json, text, bytes)
        self.created = time.time() # время создания 

    async def _send(self, payload): # приватный метод для отправки в группу
        websocket = self.websocket
        if self.encoding == "json":
            await websocket.send_json(payload)
        elif self.encoding == "text":
            await websocket.send_text(payload)
        elif self.encoding == "bytes":
            await websocket.send_bytes(payload)
        else:
            await websocket.send(payload)
        self.created = time.time()

    def _is_expired(self):
        return self.expires + int(self.created) < time.time()

    def __repr__(self):
        return f"{self.channel_uuid}"

В качестве uuid (уникального идентификатора) канала мы будем использовать встроенный в Python механизм идентификации UUID objects.

Создадим основной класс ChannelEndpoint от которого мы будем наследоваться в наших endpoints:

class ChannelEndpoint(WebSocketEndpoint):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.expires = 60 * 60 * 24  
        self.encoding = "json"
        self.groups = CHANNEL_GROUPS # добавляем связь с CHANNEL_GROUPS 

    async def on_connect(self, websocket, **kwargs):
        await super().on_connect(websocket, **kwargs)
        self.channel = Channel(websocket=websocket, expires=self.expires, encoding=self.encoding)

    async def on_disconnect(self, websocket, close_code):
        await super().on_disconnect(websocket, close_code)
        await self._remove(self.channel)

    async def _remove(self, channel):
        for group in self.groups:
            if channel in self.groups[group]:
                del self.groups[group][channel]

    async def _validate_name(self, name):
        if name.isidentifier():
            return True
        raise TypeError("Group names must be valid python identifier only alphanumerics and underscores are accepted")

    async def _clean_expired(self):
        for group in self.groups:
            for channel in self.groups.get(group, {}):
                if channel._is_expired():
                    del self.groups[group][channel]

    async def get_or_create(self, group): # получаем или добавляем группу
        assert await self._validate_name(group), "Invalid group name"
        self.groups.setdefault(group, {})
        self.groups[group][self.channel] = ""
        self.group = group

    async def group_send(self, payload): # отправляем сообщение в группу
        await self._clean_expired()
        for channel in self.groups.get(self.group, {}):
            await channel._send(payload)

При инициализации мы переопределяем базовый метод WebSocketEndpoint on_connect, добавляя к нему объект Channel.

Когда пользователь покидает канал, вызывается приватный метод _remove, для удаления объекта канала из CHANNEL_GROUPS.

В endpoints унаследованных от ChannelEndpoint появляются новые публичные методы:

  • get_or_create(self, group) — для получения или создания группы
  • group_send(self, payload) — для отправки сообщений в каналы, которые входят в данную группу.

Пример интеграции


routes = [
    Route("/chat/", endpoint=ChatView)
    Route("/chat/ws", endpoint=ChatChannel)
]

html = """
<!DOCTYPE html>
<html>
    <head>
        <title>ws</title>
    </head>
    <body>
        <h1>ChannelEndpoint</h1>
        <form action="" onsubmit="sendMessage(event)">
            <label>group_id: </label><input type="text" id="groupId" autocomplete="off" value="2"><br/>
            <label>username: </label><input type="text" id="username" autocomplete="off" value="test_user2"><br/>       
            <label>message: </label><input type="text" id="messageText" autocomplete="off" value="test_message2"><br/>
            <button>Send</button>
        </form>
        <ul id='messages'>
        </ul>
        <script>
            var ws = new WebSocket("ws://localhost/chat/chat/ws");
            ws.onmessage = function(event) {
                console.log('Message received %s', event.data)
                var messages = document.getElementById('messages');
                var message = document.createElement('li');
                var data = JSON.parse(event.data);
                message.innerHTML = `<strong>${data.username} :</strong> ${data.message}`;
                messages.appendChild(message);
            };
            function sendMessage(event) {
                var username = document.getElementById("username");
                var group_id = document.getElementById("groupId");
                var input = document.getElementById("messageText");
                var data = {
                    "group_id": group_id.value, 
                    "username": username.value,
                    "message": input.value,
                };
                console.log('Message send %s', data)
                ws.send(JSON.stringify(data));
                event.preventDefault();
            }
        </script>
    </body>
</html>
"""

class ChatView(HTTPEndpoint):
    async def get(self, request):
        return HTMLResponse(html)

class ChatChannel(ChannelEndpoint): # наследуемся от ChannelEndpoint
    async def on_receive(self, websocket, data):
        group_id = data["group_id"]
        message = data["message"]
        username = data["username"]
        if message.strip():
            group = f"group_{group_id}" 
            await self.get_or_create(group)  # получаем группу (и все ее каналы) из словаря CHANNEL_GROUPS 
            payload = {
                "username": username,
                "message": message,
            } 
            await self.group_send(payload) # отправляем сообщение всем участникам группы

Добавим полезные функции


В процессе эксплуатации нам понадобится отправка сообщения в группы из любого места кода, «мониторинг» групп, а также «очистка».

Отправка сообщения в группу из любого места кода:

from channel_box import group_send
await group_send('my_chat_1', {"username": "New User", "message": "Hello world"})

«Мониторинг» групп:

from channel_box import groups_show
groups_show()

Очистка:

from channel_box import groups_flush
groups_flush()

Установка


Менеджер пакетов pip:

pip install channel-box

Исходный код решения

Пример работы:
backend.starlette-vue.site/chat/chat1
backend.starlette-vue.site/chat/chat2

Исходный код примера работы