golang

Погружение в Web RTC или пишем SFU своими силами

  • четверг, 1 февраля 2024 г. в 00:00:14
https://habr.com/ru/articles/790348/

WebRTC — это браузерная технология, предназначенная для передачи потоковых данных между браузерами или приложениями с использованием технологии двухточечной передачи (point-to-point transmission).

Web RTC уже давно имеет поддержку в большинстве браузеров, поэтому обходить стороной технологию довольно глупо. Вот и я так подумал, поэтому в качестве пет-проекта решил написать sfu-сервер на golang.

О самом Web RTC

Здесь я кратко пробегусь по основам работы Web RTC, кому интересно будет залезть чуть поглубже оставляю ссылку здесь.

Для того, чтобы два пира смогли обеспечить себе RTCPeerConnection, используется протокол SDP (Session Description Protocol). Протокол имеет структуру "ключ-значение" и по сути является описанием отдельно взятого пира (название само за себя говорит).

Пример SessionDescription

v=0
o=- 0 0 IN IP4 127.0.0.1
s=-
c=IN IP4 127.0.0.1
t=0 0
m=audio 4000 RTP/AVP 111
a=rtpmap:111 OPUS/48000/2
m=video 4002 RTP/AVP 96
a=rtpmap:96 VP8/90000

После того как пир сформировал нам SessionDescription (далее просто SD), пир отправляет его другому пиру в виде оффера.

Здесь сразу возникает вопрос: "А куда, собственно, отправить?" И вопрос совершенно правильный, сам по себе пир не может знать ip-адрес другого пира. Тут на помощь нам приходят: turn & signal сервера.

Signal - это сервер, к которому подключаются пиры для обмена SD. Остановимся поподробнее на нем чуть позже.

Turn сервер решает другую проблему. Наши устройства часто выходят в интернет через wifi, а следовательно они находятся внутри локального NAT вашего роутера и публичного ip-адреса у них нет. Но для turn сервера это не проблема. С помощью своих "Magic Cookie" он все равно доберется до вашего устройства.

Так, SD мы сформировали, узнали наш IP, даже отправили наш SD в виде offer на signal сервер. Далее второй пир записывает себе отправленный нами SD, генерирует свой SD и отправляет его signal серверу в виде answer. Первый пир записывает полученный SD в виде answer к себе.

В дополнение к обмену информацией о медиа(обсуждённой выше в offer/answer и SDP), узлы должны обменяться информацией о сетевом соединении. Об этом известно как о ICE‑кандидате и деталях доступных методов, которыми узел может общаться (непосредственно или через TURN‑сервер).ICE‑кандидатами пиры также делятся через signal сервер.

И вот, после всех танцев с бубном, мы наконец можем установить наш RTCPeerConnection.

А зачем нам нужен SFU ?

Select Forwarding Unit приходит на помощь тогда, когда количество пиров в одной сессии достигает 7+. Рассмотрим пример:

Мы имеем 7 пиров, каждый из которых отправляет видео и аудио треки. В формате p2p соединения мы получим: 6+5+4+3+2+1=21 Peer соединение. А количество треков будет больше в 4 раза(от каждого такого соединения с обеих сторон идет видео и аудио дорожка).

Налицо абсолютно неэффективное использование ресурсов. Поэтому мы переходим от полносвязанной топологии к звездочной:

Теперь мы имеем 7 соединений PeerConnection, где для каждого пира мы получаем 2 исходящих и 12 входящих треков. Хоть Web RTC и был изначально предназначен для общения браузеров напрямую, именно в связке с SFU мы получаем возможность по настоящему прочувствовать мощь этой технологии.

Следует также отметить, что имеется еще один вариант организации соединения - MCU (Multipoint Control Unit). Но тогда в обязанности нашего сервера будет еще входить упаковка всех исходящих треков к каждому пиру в единый MediaStream. Однако со стороны пользователя будет невозможным взаимодействовать с этими потоками.

Приведу пример: Zoom и его возможность заглушать, перемещать плитки с видео пользователей. Именно благодаря тому, что каждая плитка - это отдельный MediaStream, с ней можно взаимодействовать. Если бы мы реализовали MCU, то мы получили бы не множество плиток с видео, а в единый блок видео потоков, который MCU заботливо нам предоставил. Таким образом, повышая нагрузку на сервер в разы, мы урезаем многие возможности для пользователя. Да, нагрузка на клиента тогда практически сойдет на нет, но покрывают ли эти преимущества появившиеся недостатки. Я думаю, что нет.

Приступаем

Теперь начинаем воплощать проект в жизнь. Для начала определимся со структурой нашего sfu.

Наш сервер будет состоять из двух частей: Signal & Coordinator. Первый будет обеспечивать обмен SD & ICE-кандидатами, второй будет контролировать входящие и исходящие потоки.

Peer

Peer будет являться элементарной структурой и будет представлять пользователя

type Peer struct {
	id         string
	connection *webrtc.PeerConnection
	streams    map[string]*webrtc.TrackRemote
	mutex      sync.RWMutex
	socket     *websocket.Conn
}

Здесь пока все просто, класс содержит в себе сокет, само соединение и исходящие от пользователя треки

Теперь опишем поведение нашего пира

type PeerInterface interface {
	SetSocket(ws_conn *websocket.Conn)
	AddRemoteTrack(track *webrtc.TrackRemote)
	RemoveRemoteTrack(track *webrtc.TrackRemote)
	SetPeerConnection(conn *webrtc.PeerConnection)
	ReactOnOffer(offer webrtc.SessionDescription)
	ReactOnAnswer(answer_str string)
}

Также пока все стандартно, несколько методов для установки значения полей Peer. Единственное, на чем следует остановиться: ReactOnAnswer& ReactOnOffer:

func (peer *Peer) ReactOnOffer(offer_str string) (webrtc.SessionDescription, error) {
	peer.mutex.Lock()
	defer peer.mutex.Unlock()

	offer := webrtc.SessionDescription{
		Type: webrtc.SDPTypeOffer,
		SDP:  offer_str,
	}
	err := peer.connection.SetRemoteDescription(offer)
	if err != nil {
		fmt.Println(err)
		return offer, err
	}
	fmt.Println("Remote Description was set for peer ", peer.id)
	answer, err := peer.connection.CreateAnswer(nil)
	_ = peer.connection.SetLocalDescription(answer)
	fmt.Println("Local Description was set for peer ", peer.id)
	if err != nil {
		return offer, err
	}
	fmt.Println("Answer was created in peer ", peer.id)
	return answer, nil

}

Когда от signal-сервера мы получаем Offer от друго пира, нам нужно сохранить себе пришедший SD, для нас это удаленный SD, поэтому SetRemoteDescription() поможет нам в нашей задаче. Далее, как уже было описано в теоретической части, нам нужно отправить Answer, чтобы другой пир тоже установил RemoteDescription, но перед этим мы также сохраняем себе собственный LocalDescription

func (peer *Peer) ReactOnAnswer(answer_str string) error {
	peer.mutex.Lock()
	defer peer.mutex.Unlock()
	answer := webrtc.SessionDescription{
		Type: webrtc.SDPTypeAnswer,
		SDP:  answer_str,
	}
	err := peer.connection.SetRemoteDescription(answer)
	if err != nil {
		fmt.Println(err)
		return err
	}
	return nil
}

Ситуация идентичная, просто теперь мы на обратной стороне баррикад. Мы отправили Offer, получили Answer, и установили RemoteDescription.

Room

Поднимаемся чуть выше. Структура Room будет являться одной сессией видео и/или аудио конференции.

type Room struct {
	id     string
	mutex  sync.RWMutex
	peers  map[string]*Peer
	tracks map[string]*webrtc.TrackLocalStaticRTP
}

Она будет содержать в себе все пиры в данной комнате, а также будет сохранять исходящие треки к себе в поле tracks и передавать их остальным пирам.

Опишем поведение:

type RoomInterface interface {
	JoinRoom(id string)
	AddPeer(peer *Peer)
	RemovePeer(peer_id string)
	AddTrack(track *webrtc.TrackRemote)
	RemoveTrack(track *webrtc.TrackRemote)
	SendAnswer(message webrtc.SessionDescription, peer_id string)
	SendOffer(message webrtc.SessionDescription, peer_id string)
	SendICE(message *webrtc.ICECandidate, peer_id string)
	Signal()
}

Также несколько стандартных функций, пробежимся по ним:

  1. AddPeer(peer *Peer) - добавляет пользователя в поле room.peers

  2. RemovePeer(peer_id string) - удаляет пользователя из комнаты, если такой имеется

  3. AddTrack(track *webrtc.TrackRemote) - добавляет новый трек и добавлет его к себе в room.tracks

  4. RemoveTrack(track *webrtc.TrackRemote) - соответственно удаляет трек

  5. SendAnswer, SendOffer, SendICE - отправляют Offer, Answer & Ice-кандидата соответственно всем пользователям в комнате, кроме пира со id=peer_id.

Теперь перейдем к самому интересному: room.Signal()

func (room *Room) Signal() {
	room.mutex.Lock()
	defer room.mutex.Unlock()
	attemptSync := func() (again bool) {
		for _, peer := range room.peers {
          
			// 1) Check if peer is already closed
			if peer.connection.ConnectionState() == webrtc.PeerConnectionStateClosed {
				fmt.Println("Peer with peer_id", peer.id, "was disconnected")
				room.RemovePeer(peer.id)
				return true
			}
            // 2) 
			existingSenders := map[string]bool{}
			for _, sender := range peer.connection.GetSenders() {
				if sender.Track() == nil {
					continue
				}
                // 3)
				existingSenders[sender.Track().ID()] = true
				// If we have a RTPSender that doesn't map to a existing track remove and signal
				if _, ok := room.tracks[sender.Track().ID()]; !ok {
					if err := peer.connection.RemoveTrack(sender); err != nil {
						fmt.Println("Track", sender.Track().ID(), "was removed")
						return true
					}
				}
			}

			// 4) Don't receive videos we are sending, make sure we don't have loopback
			for _, receiver := range peer.connection.GetReceivers() {
				if receiver.Track() == nil {
					continue
				}

				existingSenders[receiver.Track().ID()] = true
			}
			// 5) Add all track we aren't sending yet to the PeerConnection
			for trackID := range room.tracks {
				if _, ok := existingSenders[trackID]; !ok {
					if _, err := peer.connection.AddTrack(room.tracks[trackID]); err == nil {
						fmt.Println("New track are sending for peer", peer.id)
						return true
					} else {
						fmt.Println(err)
					}
				}
			}
            // 6)
			if peer.connection.PendingLocalDescription() != nil {
				fmt.Println(peer.connection.PendingLocalDescription())
				offer, err := peer.connection.CreateOffer(&webrtc.OfferOptions{
					OfferAnswerOptions: webrtc.OfferAnswerOptions{},
					ICERestart:         true,
				})
				if err != nil {
					fmt.Println("Error in CreateOffer: ", err)
					return true
				}
				if err = peer.connection.SetLocalDescription(offer); err != nil {
					fmt.Println("Offer: ", offer)
					fmt.Println("Cannot set LocalDescription: ", err)
					return false
				}

				offerString, err := json.Marshal(offer)
				if err != nil {
					fmt.Println("Marshalling failed: ", err)
					return true
				}

				if err = peer.socket.WriteJSON(&WsMessage{
					Event: "offer",
					Data:  string(offerString),
				}); err != nil {
					fmt.Println("Cannot write message in WsMessage: ", err)
					return true
				}
			}

		}
		return
	}
    // 7)
	for syncAttempt := 0; ; syncAttempt++ {
		if syncAttempt == 25 {
			// Release the lock and attempt a sync in 3 seconds. We might be blocking a RemoveTrack or AddTrack
			go func() {
				time.Sleep(time.Second * 3)
				room.Signal()
			}()
			return
		}

		if !attemptSync() {
			fmt.Println("Signalling finished")
			break
		}
	}
}

В этой функции основная часть логики Room. Остановимся поподробнее:

  1. Проверяем наши текущие пиры. Если он закрыт, удаляем его.

  2. Получаем все Senders() от этого пира. Sender - поток, который приходит к нам от данного пира.

  3. Проверяем есть ли у нас "зависшие" треки, то есть треки, которые исходили от пользователя, уже покинувшего комнату.

  4. В переменную existingSenders добавляем треки, исходящие от пользовтеля. Делается это для того, чтобы на следующем этапе не добавить исходящий от этого пользователя трек.

  5. Теперь в переменной existingSenders будут находится все треки, которые пользователь уже получает или отправляет. Нам остается только добавить те, которые в этом пире еще не числятся.

  6. Здесь мы проверяем PendingLocalDescription. Здесь следует пояснить. В каждом PeerConnection у нас есть два статуса LocalDescription: Current & Pending. Первое обновляется при отправке Offer и получения Answer. Второе обновляется при изменении параметров соединения. Но вступит оно в силу (т е Current будет равно Pending )только при повторном обмене Offer/Answer. Здесь идет проверка на nil значение. Если PendingLocalDescription == nil, значит никаких изменений не происходило и обновлять PeerConnection не требуется, иначе - отправляем Offer данному пиру. В контексте Web RTC это называется renogotiation, почитать про это можно здесь.

  7. При обновлении состояний клиентов мы можем натолкнуться на различные ошибки. Пример: добавление новых треков, это происходит не мгновенно и может вызвать конфликтные ситуации, поэтому реализован механизм перезапуска room.Signal() с задержкой в 3 секунды

Coordinator

Теперь мы перешли к главной структуре, которая контролирует поведение всех комнат.

type Coordinator struct {
	sessioins map[string]*Room
}

Как обычно, опишем ее поведение изначально интерфейсом

type CoordinatorInterface interface {
	CreateRoom(id string)
	RemoveRoom(id string)
	AddUserToRoom(self_id string, room_id string, socket *websocket.Conn)
	RemoveUserFromRoom(self_id string, room_id string, socket *websocket.Conn)
	ShowSessions()
	ObtainEvent(message WsMessage, socket *websocket.Conn)
}

Пройдемся по методам:

  1. CreateRoom() & RemoveRoom() - создает и удаляет комнату соответственно

  2. ShowSessions() - выдает все активные на данный момент комнаты

  3. RemoveUserFromRoom() - удаляет пользователя из комнаты

  4. AddUserToRoom() - добавляет пользователя и настраивает PeerConnection

  5. ObtainEvent() - метод-свзяка с нашим Signal сервером. При инициализации сервера мы будем создавать структуру Coordinator и все необходимые события будем обрабатывать этим методом

Посмотрим код AddUserToRoom():

func (coordinator *Coordinator) AddUserToRoom(self_id string, room_id string, socket *websocket.Conn) {
    // 1)
	if _, ok := coordinator.sessioins[room_id]; !ok {
		fmt.Println("New Room was created: ", room_id)
		coordinator.CreateRoom(room_id)
	}
	if room, ok := coordinator.sessioins[room_id]; ok {
		// 2) Add Peer to Room
		room.AddPeer(newPeer(self_id))
		fmt.Println("Peer ", self_id, "was added to room ", room_id)
		if peer, ok := room.peers[self_id]; ok {
			// 3) Set socket connection to Peer
			peer.SetSocket(socket)

			// 4) Create Peer Connection
			conn, err := webrtc.NewPeerConnection(webrtc.Configuration{})
			if err != nil {
				fmt.Println("Failed to establish peer connection")
			}

			peer.SetPeerConnection(conn)
			fmt.Println("Peer connection was established")
			// 5) Accept one audio and one video track incoming
			for _, typ := range []webrtc.RTPCodecType{webrtc.RTPCodecTypeVideo, webrtc.RTPCodecTypeAudio} {
				if _, err := peer.connection.AddTransceiverFromKind(typ, webrtc.RTPTransceiverInit{
					Direction: webrtc.RTPTransceiverDirectionRecvonly,
				}); err != nil {
					log.Print(err)
					return
				}
			}

			// 6) If PeerConnection is closed remove it from global list
			peer.connection.OnConnectionStateChange(func(p webrtc.PeerConnectionState) {
				switch p {
				case webrtc.PeerConnectionStateFailed:
					if err := peer.connection.Close(); err != nil {
						log.Print(err)
					}
				case webrtc.PeerConnectionStateClosed:
					room.Signal()
				default:
				}
			})

			// 7) When peer connection is getting the ICE -> send ICE to client
			peer.connection.OnICECandidate(func(i *webrtc.ICECandidate) {
				if i == nil {
					fmt.Println("ICEGatheringState: connected")
					return
				}
				fmt.Println("Ice: ", i)
				room.SendICE(i, self_id)
			})
            // 8) 
			peer.connection.OnTrack(func(t *webrtc.TrackRemote, _ *webrtc.RTPReceiver) {
				fmt.Println("Track added from peer: ", self_id)
				defer room.Signal()
				// Create a track to fan out our incoming video to all peers
				trackLocal := room.AddTrack(t)
				defer room.RemoveTrack(trackLocal)
				defer fmt.Println("Track", trackLocal, "was removed")
				buf := make([]byte, 1500)
				for {
					i, _, err := t.Read(buf)
					if err != nil {
						return
					}

					if _, err = trackLocal.Write(buf[:i]); err != nil {
						return
					}
				}
			})
		}

	}
}

Пройдемся по пунктам:

  1. Проверяю, существует ли комната, к которой пользователь хочет подключиться. Если такой нет - создаем.

  2. Добавляем Peer в комнату.

  3. Устанавливаем сокет и RTCPeerConnection в Peer

  4. Принимаем треки, которые исходят от нового пользователя.

  5. Добавляем сигнализирование комнате, если в какой-то момент пользователь отключился

  6. Здесь мы реализовали логику отправки друг другу ICE-кандидатов. Если мы получили nil в качестве нашего ICE-кандидата, значит пересылка ICE-кандидатами завершена. Иначе отправляем новоиспеченного ICE-кандидата другому пиру.

  7. Добавляем трек и сигнализируем комнате при появлении нового трека.

Trickle ICE

В Pion можно найти реализацию так называемого Trickle ICE. Суть этого подхода в том, что клиент не дожидается окончания Offer/Answer. А параллельно отправляет/получает ICE-кандидатов так скоро, как только это возможно. Это мы и реазилозвали в нашем Peer.OnICECandidate.

Теперь очередь за ObtainEvent:

func (coordinator *Coordinator) ObtainEvent(message WsMessage, socket *websocket.Conn) {
	wsMessage := message
	switch wsMessage.Event {
	case "joinRoom":
		go func() {
			m, ok := message.Data.(map[string]any)
			if ok {
				self_id := m["self_id"].(string)
				room_id := m["room_id"].(string)
				coordinator.AddUserToRoom(self_id, room_id, socket)
			}
		}()
	case "leaveRoom":
		go func() {
			m, ok := message.Data.(map[string]any)
			if ok {
				self_id := m["self_id"].(string)
				room_id := m["room_id"].(string)
				coordinator.RemoveUserFromRoom(self_id, room_id)
			}
		}()
	case "offer":
		go func() {
			m, ok := message.Data.(map[string]any)
			if ok {
				self_id, _ := m["self_id"].(string)
				room_id, _ := m["room_id"].(string)
				offer2 := m["offer"].(map[string]any)
				if room, ok := coordinator.sessioins[room_id]; ok {
					if peer, ok := room.peers[self_id]; ok {
						answer, err2 := peer.ReactOnOffer(offer2["sdp"].(string))
						if err2 != nil {
							fmt.Println(err2)
							return
						}
						room.SendAnswer(answer, self_id)
					}
				}
			}
		}()
	case "answer":
		go func() {
			m, ok := message.Data.(map[string]any)
			if ok {
				self_id, _ := m["self_id"].(string)
				room_id, _ := m["room_id"].(string)
				offer2 := m["answer"].(map[string]any)
				if room, ok := coordinator.sessioins[room_id]; ok {
					if peer, ok := room.peers[self_id]; ok {
						err := peer.ReactOnAnswer(offer2["sdp"].(string))
						if err != nil {
							fmt.Println(err)
							return
						}
					}

				}
			}
		}()
	case "ice-candidate":
		go func() {
			m, ok := message.Data.(map[string]any)
			if ok {
				self_id, _ := m["self_id"].(string)
				room_id, _ := m["room_id"].(string)
				candidate := m["candidate"].(map[string]any)
				i_candidate := candidate["candidate"].(string)
				sdp_mid := candidate["sdpMid"].(string)
				sdp_m_line_index := uint16(candidate["sdpMLineIndex"].(float64))
				var username_fragment string
				if candidate["usernameFragment"] != nil {
					username_fragment = candidate["usernameFragment"].(string)
				} else {
					username_fragment = ""
				}
				init := webrtc.ICECandidateInit{
					Candidate:        i_candidate,
					SDPMid:           &sdp_mid,
					SDPMLineIndex:    &sdp_m_line_index,
					UsernameFragment: &username_fragment,
				}
				if room, ok := coordinator.sessioins[room_id]; ok {
					if peer, ok := room.peers[self_id]; ok {
						if err := peer.connection.AddICECandidate(init); err != nil {
							log.Println(err)
							return
						}
						fmt.Println("ICE-CANDIDATE added for peer", peer.id)
						fmt.Println(peer.connection.ICEConnectionState())
						fmt.Println(peer.connection.ICEGatheringState())
					}
				}
			} else {
				fmt.Println(m)
				fmt.Println("nach")
			}
		}()
	default:
		fmt.Println("DEFAULT")
		fmt.Println(wsMessage)

	}

	return
}

Здесь мы валидируем значение поля Event. И вызываем соответствующий метод. На вход мы получаем Сообщение WsMessage, которое является общим методом для различных сообщений.

Signal

Теперь завершающий элемент нашего сервера, сокет, который будет принимать сообщения и отправлять их в Coordinator.ObtainEvent()

// websockets listener
func (ws *WsServer) wsInit(w http.ResponseWriter, r *http.Request) {

	conn, err := upgrader.Upgrade(w, r, nil)

	defer conn.Close()

	fmt.Printf("Client connected")

	if err != nil {
		fmt.Printf(" with error %s", err)
		return
	}

	fmt.Println(" successfully")

	message := types.WsMessage{}

	for {
		messageType, bmessage, err := conn.ReadMessage()
		fmt.Println(bmessage)
		if err != nil {
			fmt.Println(err)
			return
		}
		if messageType == websocket.CloseMessage {
			break
		}

		err = json.Unmarshal(bmessage, &message)
		if err != nil {
			fmt.Println("DROP")
			fmt.Println(message.Data)
			fmt.Println(err)
			return
		}
		ws.coordinator.ObtainEvent(message, conn)
	}
}

Здесь мы приходящие сообщения приводим к WsMessage и отдаем нашему координатору

Заключение

По итогу у нас есть стартовая версия SFU. Как говорилось в Web RTC for Curious:

Building a simple SFU can be done in a weekend. Building a good SFU that can handle all types of clients is never ending. Tuning the Congestion Control, Error Correction and Performance is a never ending task.

Поэтому это лишь первая версия простого SFU сервера. Впереди еще много различных реализаций, которые смогут улучшить этот сервер и повысить качество видео/аудио конференций. Если эта статья вам понравится, я не задержу выпуск следующих.

Исходный код можно найти в этом репозитории.

Также хотелось бы услышать от вас пожелания и подсказки по развитию SFU. Так как реализовалось это с нуля как в знании WebRTC, так и Golang.

Источники

https://developer.mozilla.org/ru/docs/Web/API/WebRTC_API/Protocols

https://webrtcforthecurious.com