https://habr.com/ru/post/506056/Доброго дня!
Меня зовут Соболев Андрей и сегодня мы с вами напишем решение для фреймворка Starlette, которое позволит группировать открытые вебсокет соединения.
Вступление
Starlette довольно молодой фреймворк, и какие-то «плюшки» для него приходится писать самостоятельно.
В предыдущей статье я показал как можно реализовать JWT сессии и «Djangoподобную» структуру, сегодня мы рассмотрим как группировать вебсокет соединения.
Для чего нужна группировка вебсокет cоединений?
Допустим я и мой друг (назовем его UnnamedUser) решили пообщаться в чате.
Когда я захожу в комнату с ID=1,
мой браузер устанавливает первое вебсокет соединение с сервером (для упрощения назовем его «канал 1»*).
Когда UnnamedUser заходит в комнату с ID=1,
его браузер устанавливает второе вебсокет соединение с сервером (для упрощения назовем его «канал 2»*).
* далее по тексту вебсокет соединение будет называться каналом
Технически «канал 1» и «канал 2» это два разных объекта класса WebSocketEndpoint, поэтому в чате мы видим только свои сообщения, а не сообщения других участников (как ожидалось).
Чтобы решить эту проблему нам необходимо объединить наши каналы в группу (к примеру
room_1) и делать массовые рассылки при наступлении какого либо события (к примеру кто-то написал в чат).
Где хранить группы?
Для хранения групп воспользуемся обычным словарем, который назовем
CHANNEL_GROUPS и объявим глобально:
import time
import uuid
from simple_print.functions import sprint_f
from starlette.endpoints import WebSocketEndpoint
CHANNEL_GROUPS = {}
Унаследуемся от WebSocketEndpoint
Чтобы добавить для каждого канала связь с
CHANNEL_GROUPS нам необходимо унаследоваться от базового класса
WebSocketEndpoint.
Начнем с создания вспомогательного класса
Channel:
class Channel:
def __init__(self, websocket, expires, encoding):
self.channel_uuid = str(uuid.uuid1()) # uid
self.websocket = websocket
self.expires = expires # срок жизни
self.encoding = encoding # тип канала (json, text, bytes)
self.created = time.time() # время создания
async def _send(self, payload): # приватный метод для отправки в группу
websocket = self.websocket
if self.encoding == "json":
await websocket.send_json(payload)
elif self.encoding == "text":
await websocket.send_text(payload)
elif self.encoding == "bytes":
await websocket.send_bytes(payload)
else:
await websocket.send(payload)
self.created = time.time()
def _is_expired(self):
return self.expires + int(self.created) < time.time()
def __repr__(self):
return f"{self.channel_uuid}"
В качестве uuid (уникального идентификатора) канала мы будем использовать встроенный в Python механизм идентификации
UUID objects.
Создадим основной класс
ChannelEndpoint от которого мы будем наследоваться в наших endpoints:
class ChannelEndpoint(WebSocketEndpoint):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.expires = 60 * 60 * 24
self.encoding = "json"
self.groups = CHANNEL_GROUPS # добавляем связь с CHANNEL_GROUPS
async def on_connect(self, websocket, **kwargs):
await super().on_connect(websocket, **kwargs)
self.channel = Channel(websocket=websocket, expires=self.expires, encoding=self.encoding)
async def on_disconnect(self, websocket, close_code):
await super().on_disconnect(websocket, close_code)
await self._remove(self.channel)
async def _remove(self, channel):
for group in self.groups:
if channel in self.groups[group]:
del self.groups[group][channel]
async def _validate_name(self, name):
if name.isidentifier():
return True
raise TypeError("Group names must be valid python identifier only alphanumerics and underscores are accepted")
async def _clean_expired(self):
for group in self.groups:
for channel in self.groups.get(group, {}):
if channel._is_expired():
del self.groups[group][channel]
async def get_or_create(self, group): # получаем или добавляем группу
assert await self._validate_name(group), "Invalid group name"
self.groups.setdefault(group, {})
self.groups[group][self.channel] = ""
self.group = group
async def group_send(self, payload): # отправляем сообщение в группу
await self._clean_expired()
for channel in self.groups.get(self.group, {}):
await channel._send(payload)
При инициализации мы переопределяем базовый метод WebSocketEndpoint
on_connect, добавляя к нему объект
Channel.
Когда пользователь покидает канал, вызывается приватный метод
_remove, для удаления объекта канала из
CHANNEL_GROUPS.
В endpoints унаследованных от
ChannelEndpoint появляются новые публичные методы:
- get_or_create(self, group) — для получения или создания группы
- group_send(self, payload) — для отправки сообщений в каналы, которые входят в данную группу.
Пример интеграции
routes = [
Route("/chat/", endpoint=ChatView)
Route("/chat/ws", endpoint=ChatChannel)
]
html = """
<!DOCTYPE html>
<html>
<head>
<title>ws</title>
</head>
<body>
<h1>ChannelEndpoint</h1>
<form action="" onsubmit="sendMessage(event)">
<label>group_id: </label><input type="text" id="groupId" autocomplete="off" value="2"><br/>
<label>username: </label><input type="text" id="username" autocomplete="off" value="test_user2"><br/>
<label>message: </label><input type="text" id="messageText" autocomplete="off" value="test_message2"><br/>
<button>Send</button>
</form>
<ul id='messages'>
</ul>
<script>
var ws = new WebSocket("ws://localhost/chat/chat/ws");
ws.onmessage = function(event) {
console.log('Message received %s', event.data)
var messages = document.getElementById('messages');
var message = document.createElement('li');
var data = JSON.parse(event.data);
message.innerHTML = `<strong>${data.username} :</strong> ${data.message}`;
messages.appendChild(message);
};
function sendMessage(event) {
var username = document.getElementById("username");
var group_id = document.getElementById("groupId");
var input = document.getElementById("messageText");
var data = {
"group_id": group_id.value,
"username": username.value,
"message": input.value,
};
console.log('Message send %s', data)
ws.send(JSON.stringify(data));
event.preventDefault();
}
</script>
</body>
</html>
"""
class ChatView(HTTPEndpoint):
async def get(self, request):
return HTMLResponse(html)
class ChatChannel(ChannelEndpoint): # наследуемся от ChannelEndpoint
async def on_receive(self, websocket, data):
group_id = data["group_id"]
message = data["message"]
username = data["username"]
if message.strip():
group = f"group_{group_id}"
await self.get_or_create(group) # получаем группу (и все ее каналы) из словаря CHANNEL_GROUPS
payload = {
"username": username,
"message": message,
}
await self.group_send(payload) # отправляем сообщение всем участникам группы
Добавим полезные функции
В процессе эксплуатации нам понадобится отправка сообщения в группы из любого места кода, «мониторинг» групп, а также «очистка».
Отправка сообщения в группу из любого места кода:
from channel_box import group_send
await group_send('my_chat_1', {"username": "New User", "message": "Hello world"})
«Мониторинг» групп:
from channel_box import groups_show
groups_show()
Очистка:
from channel_box import groups_flush
groups_flush()
Установка
Менеджер пакетов pip:
pip install channel-box
Исходный код решения
Пример работы:
backend.starlette-vue.site/chat/chat1
backend.starlette-vue.site/chat/chat2
Исходный код примера работы