DEV Community

Masui Masanori
Masui Masanori

Posted on

[Go] Try Pion/WebRTC with SSE

Intro

This time, I will try Pion/WebRTC.

Because Pion already has good examples, I will create a sample based on SFU-WebSocket of example-webrtc-applications.

I will try changing these points.

  • Use SSE(Server-Sent Events) for signaling
  • Start connecting manually

I will add WebRTC functions into the last sample project.

And I also refer this post(especially the client-side).

Environments

  • Go ver.go1.18.2 windows/amd64
  • Node.js ver.18.1.0

Connecting with WebRTC SFU

When I tried WebRTC last time, the server-side application just worked for signaling.
After signaling, the clients were directly connected to each other.

Image description

This time they will only connect to the server-side application.
After connecting, the clients will send video tracks and audio tracks to the server-side application.
And The server-side application send other clients' tracks as remote tracks to the clients.

Image description

Samples

This time, I publish the sample project on GitHub.

Client-side

Because the process for connecting starts from the server-side application.

So the client-side just needs handling offer and ICE candidate events.
And I will create a RTCPeerConnection on start.

main.page.ts



...
function handleReceivedMessage(value: string) {
const message = JSON.parse(value);
if(!checkIsClientMessage(message)) {
return;
}
switch(message.event) {
case "text":
view.addReceivedText({ user: message.userName, message: message.data });
break;
case "offer":
webrtc.handleOffer(JSON.parse(message.data));
break;
case "candidate":
webrtc.handleCandidate(JSON.parse(message.data));
break;
}
}
function sendAnswer(data: RTCSessionDescriptionInit) {
if(!hasAnyTexts(userName)) {
return;
}
sse.sendMessage({userName, event: "answer", data: JSON.stringify(data)});
}
function sendCandidate(data: RTCIceCandidate) {
if(!hasAnyTexts(userName)) {
return;
}
sse.sendMessage({userName, event: "candidate", data: JSON.stringify(data)});
}
function checkIsClientMessage(value: any): value is ClientMessage {
// All messages from the server-side application are sent as "ClientMessage".
if(value == null) {
return false;
}
if(("event" in value &&
typeof value["event"] === "string") === false) {
return false;
}
if(("data" in value &&
typeof value["data"] === "string") === false) {
return false;
}
return true;
}
init();

Enter fullscreen mode Exit fullscreen mode




webrtc.controller.ts




export class WebRtcController {
private webcamStream: MediaStream|null = null;
private peerConnection: RTCPeerConnection|null = null;
private answerSentEvent: ((data: RTCSessionDescriptionInit) => void)|null = null;
private candidateSentEvent: ((data: RTCIceCandidate) => void)|null = null;
public init() {
const localVideo = document.getElementById("local_video") as HTMLVideoElement;
localVideo.addEventListener("canplay", () => {
const width = 320;
const height = localVideo.videoHeight / (localVideo.videoWidth/width);

localVideo.setAttribute("width", width.toString());
localVideo.setAttribute("height", height.toString());
}, false);
navigator.mediaDevices.getUserMedia({ video: true, audio: true })
.then(stream => {
localVideo.srcObject = stream;
localVideo.play();
this.webcamStream = stream;
// create a RTCPeerConnection after getting local MediaStream
this.connect();
})
.catch(err => console.error(An error occurred: </span><span class="p">${</span><span class="nx">err</span><span class="p">}</span><span class="s2">));
}
...
/** handle received offer and send answer /
public handleOffer(data: RTCSessionDescription|null|undefined) {
if(this.peerConnection == null ||
data == null) {
return;
}
this.peerConnection.setRemoteDescription(data);
this.peerConnection.createAnswer()
.then(answer => {
if(this.peerConnection != null) {
this.peerConnection.setLocalDescription(answer);
}
if(this.answerSentEvent != null) {
this.answerSentEvent(answer);
}
});
}
/* add ICE Candidate */
public handleCandidate(data: RTCIceCandidate|null|undefined) {
if(this.peerConnection == null ||
data == null) {
return;
}
this.peerConnection.addIceCandidate(data);
}
private connect() {
if(this.webcamStream == null) {
return;
}
this.peerConnection = new RTCPeerConnection({
iceServers: [{
urls: stun:stun.l.google.com:19302, // A STUN server
}]
});
// Add remote video tracks as new video elements.
this.peerConnection.ontrack = (ev) => {
if (ev.track.kind === "audio" ||
ev.streams[0] == null) {
return;
}

const remoteVideo = document.createElement("video");
remoteVideo.srcObject = ev.streams[0];
remoteVideo.autoplay = true;
remoteVideo.controls = true;
const videoArea = document.getElementById("remote_video_area") as HTMLElement;
videoArea.appendChild(remoteVideo);
ev.track.onmute = () => {
remoteVideo.play();
};
ev.streams[0].onremovetrack = () => {
if (remoteVideo.parentNode) {
remoteVideo.parentNode.removeChild(remoteVideo);
}
};
};
this.webcamStream.getTracks().forEach(track => {
if(this.peerConnection == null ||
this.webcamStream == null) {
return;
}
this.peerConnection.addTrack(track, this.webcamStream)
});
this.peerConnection.onicecandidate = ev => {
if (ev.candidate == null ||
this.candidateSentEvent == null) {
return;
}
this.candidateSentEvent(ev.candidate);
};
}

}

Enter fullscreen mode Exit fullscreen mode




Server-side

sseClient.go



package main

import (
"encoding/json"
"fmt"
"io/ioutil"
"log"
"net/http"

<span class="s">"github.com/pion/webrtc/v3"</span>
Enter fullscreen mode Exit fullscreen mode

)

type SSEClient struct {
candidateFound chan webrtc.ICECandidate
changeConnectionState chan webrtc.PeerConnectionState
addTrack chan webrtc.TrackRemote
userName string
w http.ResponseWriter
}

func newSSEClient(userName string, w http.ResponseWriter) SSEClient {
return &SSEClient{
candidateFound: make(chan webrtc.ICECandidate),
changeConnectionState: make(chan webrtc.PeerConnectionState),
addTrack: make(chan *webrtc.TrackRemote),
userName: userName,
w: w,
}
}

func registerSSEClient(w http.ResponseWriter, r http.Request, hub SSEHub) {
userName, err := getParam(r, "user")
if err != nil {
log.Println(err.Error())
fmt.Fprint(w, "The parameters have no names")
return
}
newClient := newSSEClient(userName, w)
ps, err := NewPeerConnectionState(newClient)
if err != nil {
log.Println(err.Error())
fmt.Fprint(w, "Failed connection")
return
}
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")

<span class="n">hub</span><span class="o">.</span><span class="n">register</span> <span class="o">&lt;-</span> <span class="n">ps</span>

<span class="c">// For pushing data to clients, I call "flusher.Flush()"</span>
<span class="n">flusher</span><span class="p">,</span> <span class="n">_</span> <span class="o">:=</span> <span class="n">w</span><span class="o">.</span><span class="p">(</span><span class="n">http</span><span class="o">.</span><span class="n">Flusher</span><span class="p">)</span>
<span class="k">defer</span> <span class="k">func</span><span class="p">()</span> <span class="p">{</span>
    <span class="n">hub</span><span class="o">.</span><span class="n">unregister</span> <span class="o">&lt;-</span> <span class="n">ps</span>
    <span class="k">if</span> <span class="n">ps</span><span class="o">.</span><span class="n">peerConnection</span><span class="o">.</span><span class="n">ConnectionState</span><span class="p">()</span> <span class="o">!=</span> <span class="n">webrtc</span><span class="o">.</span><span class="n">PeerConnectionStateClosed</span> <span class="p">{</span>
        <span class="n">ps</span><span class="o">.</span><span class="n">peerConnection</span><span class="o">.</span><span class="n">Close</span><span class="p">()</span>
    <span class="p">}</span>
    <span class="nb">close</span><span class="p">(</span><span class="n">newClient</span><span class="o">.</span><span class="n">candidateFound</span><span class="p">)</span>
    <span class="nb">close</span><span class="p">(</span><span class="n">newClient</span><span class="o">.</span><span class="n">changeConnectionState</span><span class="p">)</span>
    <span class="nb">close</span><span class="p">(</span><span class="n">newClient</span><span class="o">.</span><span class="n">addTrack</span><span class="p">)</span>
<span class="p">}()</span>
<span class="k">for</span> <span class="p">{</span>
    <span class="c">// handle PeerConnection events and close SSE event.</span>
    <span class="k">select</span> <span class="p">{</span>
    <span class="k">case</span> <span class="n">candidate</span> <span class="o">:=</span> <span class="o">&lt;-</span><span class="n">newClient</span><span class="o">.</span><span class="n">candidateFound</span><span class="o">:</span>
        <span class="n">jsonValue</span><span class="p">,</span> <span class="n">err</span> <span class="o">:=</span> <span class="n">NewCandidateMessageJSON</span><span class="p">(</span><span class="n">newClient</span><span class="o">.</span><span class="n">userName</span><span class="p">,</span> <span class="n">candidate</span><span class="p">)</span>
        <span class="k">if</span> <span class="n">err</span> <span class="o">!=</span> <span class="no">nil</span> <span class="p">{</span>
            <span class="n">log</span><span class="o">.</span><span class="n">Println</span><span class="p">(</span><span class="n">err</span><span class="o">.</span><span class="n">Error</span><span class="p">())</span>
            <span class="k">return</span>
        <span class="p">}</span>
        <span class="n">fmt</span><span class="o">.</span><span class="n">Fprintf</span><span class="p">(</span><span class="n">w</span><span class="p">,</span> <span class="s">"data: %s</span><span class="se">\n\n</span><span class="s">"</span><span class="p">,</span> <span class="n">jsonValue</span><span class="p">)</span>
        <span class="n">flusher</span><span class="o">.</span><span class="n">Flush</span><span class="p">()</span>
    <span class="k">case</span> <span class="n">track</span> <span class="o">:=</span> <span class="o">&lt;-</span><span class="n">newClient</span><span class="o">.</span><span class="n">addTrack</span><span class="o">:</span>
        <span class="n">hub</span><span class="o">.</span><span class="n">addTrack</span> <span class="o">&lt;-</span> <span class="n">track</span>
    <span class="k">case</span> <span class="n">connectionState</span> <span class="o">:=</span> <span class="o">&lt;-</span><span class="n">newClient</span><span class="o">.</span><span class="n">changeConnectionState</span><span class="o">:</span>
        <span class="k">switch</span> <span class="n">connectionState</span> <span class="p">{</span>
        <span class="k">case</span> <span class="n">webrtc</span><span class="o">.</span><span class="n">PeerConnectionStateFailed</span><span class="o">:</span>
            <span class="k">return</span>
        <span class="k">case</span> <span class="n">webrtc</span><span class="o">.</span><span class="n">PeerConnectionStateClosed</span><span class="o">:</span>
            <span class="k">return</span>
        <span class="p">}</span>
    <span class="k">case</span> <span class="o">&lt;-</span><span class="n">r</span><span class="o">.</span><span class="n">Context</span><span class="p">()</span><span class="o">.</span><span class="n">Done</span><span class="p">()</span><span class="o">:</span>
        <span class="c">// when "es.close()" is called, this loop operation will be ended.</span>
        <span class="k">return</span>
    <span class="p">}</span>
<span class="p">}</span>
Enter fullscreen mode Exit fullscreen mode

}
func sendSSEMessage(w http.ResponseWriter, r http.Request, hub SSEHub) {
w.Header().Set("Content-Type", "application/json")
body, err := ioutil.ReadAll(r.Body)

<span class="k">if</span> <span class="n">err</span> <span class="o">!=</span> <span class="no">nil</span> <span class="p">{</span>
    <span class="n">log</span><span class="o">.</span><span class="n">Println</span><span class="p">(</span><span class="n">err</span><span class="o">.</span><span class="n">Error</span><span class="p">())</span>
    <span class="n">j</span><span class="p">,</span> <span class="n">_</span> <span class="o">:=</span> <span class="n">json</span><span class="o">.</span><span class="n">Marshal</span><span class="p">(</span><span class="n">GetFailed</span><span class="p">(</span><span class="s">"Failed reading values from body"</span><span class="p">))</span>
    <span class="n">w</span><span class="o">.</span><span class="n">Write</span><span class="p">(</span><span class="n">j</span><span class="p">)</span>
    <span class="k">return</span>
<span class="p">}</span>
<span class="n">message</span> <span class="o">:=</span> <span class="o">&amp;</span><span class="n">ClientMessage</span><span class="p">{}</span>
<span class="n">err</span> <span class="o">=</span> <span class="n">json</span><span class="o">.</span><span class="n">Unmarshal</span><span class="p">(</span><span class="n">body</span><span class="p">,</span> <span class="o">&amp;</span><span class="n">message</span><span class="p">)</span>
<span class="k">if</span> <span class="n">err</span> <span class="o">!=</span> <span class="no">nil</span> <span class="p">{</span>
    <span class="n">log</span><span class="o">.</span><span class="n">Println</span><span class="p">(</span><span class="n">err</span><span class="o">.</span><span class="n">Error</span><span class="p">())</span>
    <span class="n">j</span><span class="p">,</span> <span class="n">_</span> <span class="o">:=</span> <span class="n">json</span><span class="o">.</span><span class="n">Marshal</span><span class="p">(</span><span class="n">GetFailed</span><span class="p">(</span><span class="s">"Failed converting to ClientMessage"</span><span class="p">))</span>
    <span class="n">w</span><span class="o">.</span><span class="n">Write</span><span class="p">(</span><span class="n">j</span><span class="p">)</span>
    <span class="k">return</span>
<span class="p">}</span>
<span class="n">w</span><span class="o">.</span><span class="n">WriteHeader</span><span class="p">(</span><span class="m">200</span><span class="p">)</span>
<span class="n">hub</span><span class="o">.</span><span class="n">broadcast</span> <span class="o">&lt;-</span> <span class="o">*</span><span class="n">message</span>
<span class="n">data</span><span class="p">,</span> <span class="n">_</span> <span class="o">:=</span> <span class="n">json</span><span class="o">.</span><span class="n">Marshal</span><span class="p">(</span><span class="n">GetSucceeded</span><span class="p">())</span>
<span class="n">w</span><span class="o">.</span><span class="n">Write</span><span class="p">(</span><span class="n">data</span><span class="p">)</span>
Enter fullscreen mode Exit fullscreen mode

}

Enter fullscreen mode Exit fullscreen mode




peerConnectionState.go




package main

import (
"github.com/pion/webrtc/v3"
)

type PeerConnectionState struct {
peerConnection webrtc.PeerConnection
client SSEClient
}

func NewPeerConnectionState(client SSEClient) (PeerConnectionState, error) {
peerConnection, err := webrtc.NewPeerConnection(webrtc.Configuration{
ICEServers: []webrtc.ICEServer{
{
URLs: []string{
"stun:stun.l.google.com:19302",
},
},
},
})
if err != nil {
return nil, err
}
for , typ := range []webrtc.RTPCodecType{webrtc.RTPCodecTypeVideo, webrtc.RTPCodecTypeAudio} {
if , err := peerConnection.AddTransceiverFromKind(typ, webrtc.RTPTransceiverInit{
Direction: webrtc.RTPTransceiverDirectionRecvonly,
}); err != nil {
return nil, err
}
}
// Add event handlers.
peerConnection.OnICECandidate(func(i webrtc.ICECandidate) {
if i == nil {
return
}
client.candidateFound <- i
})
peerConnection.OnConnectionStateChange(func(p webrtc.PeerConnectionState) {
// avoid panic after closing channel
if p == webrtc.PeerConnectionStateClosed {
_, ok := <-client.changeConnectionState
if ok {
client.changeConnectionState <- p
}
return
}
client.changeConnectionState <- p
})
peerConnection.OnTrack(func(t webrtc.TrackRemote, _ *webrtc.RTPReceiver) {
client.addTrack <- t
})

<span class="k">return</span> <span class="o">&amp;</span><span class="n">PeerConnectionState</span><span class="p">{</span>
    <span class="n">peerConnection</span><span class="o">:</span> <span class="n">peerConnection</span><span class="p">,</span>
    <span class="n">client</span><span class="o">:</span>         <span class="n">client</span><span class="p">,</span>
<span class="p">},</span> <span class="no">nil</span>
Enter fullscreen mode Exit fullscreen mode

}

Enter fullscreen mode Exit fullscreen mode




sseHub.go




package main

import (
"encoding/json"
"fmt"
"log"
"net/http"
"time"

<span class="s">"github.com/pion/rtcp"</span>
<span class="s">"github.com/pion/webrtc/v3"</span>
Enter fullscreen mode Exit fullscreen mode

)

type SSEHub struct {
clients map[PeerConnectionState]bool
broadcast chan ClientMessage
register chan PeerConnectionState
unregister chan PeerConnectionState
trackLocals map[string]webrtc.TrackLocalStaticRTP
addTrack chan *webrtc.TrackRemote
}

func newSSEHub() SSEHub {
return &SSEHub{
clients: make(map[PeerConnectionState]bool),
broadcast: make(chan ClientMessage),
register: make(chan PeerConnectionState),
unregister: make(chan PeerConnectionState),
trackLocals: map[string]webrtc.TrackLocalStaticRTP{},
addTrack: make(chan webrtc.TrackRemote),
}
}
func (h SSEHub) run() {
go func() {
for range time.NewTicker(time.Second 3).C {
dispatchKeyFrame(h)
}
}()
for {
select {
case client := <-h.register:
h.clients[client] = true
signalPeerConnections(h)
case client := <-h.unregister:
if _, ok := h.clients[client]; ok {
delete(h.clients, client)
signalPeerConnections(h)
}
case track := <-h.addTrack:
trackLocal, err := webrtc.NewTrackLocalStaticRTP(track.Codec().RTPCodecCapability,
track.ID(), track.StreamID())
if err != nil {
log.Println(err.Error())
return
}
h.trackLocals[track.ID()] = trackLocal
signalPeerConnections(h)
go updateTrackValue(h, track)

    <span class="k">case</span> <span class="n">message</span> <span class="o">:=</span> <span class="o">&lt;-</span><span class="n">h</span><span class="o">.</span><span class="n">broadcast</span><span class="o">:</span>
        <span class="n">handleReceivedMessage</span><span class="p">(</span><span class="n">h</span><span class="p">,</span> <span class="n">message</span><span class="p">)</span>
    <span class="p">}</span>
<span class="p">}</span>
Enter fullscreen mode Exit fullscreen mode

}
func updateTrackValue(h SSEHub, track webrtc.TrackRemote) {
defer func() {
delete(h.trackLocals, track.ID())
signalPeerConnections(h)
}()

<span class="n">buf</span> <span class="o">:=</span> <span class="nb">make</span><span class="p">([]</span><span class="kt">byte</span><span class="p">,</span> <span class="m">1500</span><span class="p">)</span>

<span class="k">for</span> <span class="p">{</span>
    <span class="n">i</span><span class="p">,</span> <span class="n">_</span><span class="p">,</span> <span class="n">err</span> <span class="o">:=</span> <span class="n">track</span><span class="o">.</span><span class="n">Read</span><span class="p">(</span><span class="n">buf</span><span class="p">)</span>
    <span class="k">if</span> <span class="n">err</span> <span class="o">!=</span> <span class="no">nil</span> <span class="p">{</span>
        <span class="k">return</span>
    <span class="p">}</span>
    <span class="k">if</span> <span class="n">_</span><span class="p">,</span> <span class="n">err</span> <span class="o">=</span> <span class="n">h</span><span class="o">.</span><span class="n">trackLocals</span><span class="p">[</span><span class="n">track</span><span class="o">.</span><span class="n">ID</span><span class="p">()]</span><span class="o">.</span><span class="n">Write</span><span class="p">(</span><span class="n">buf</span><span class="p">[</span><span class="o">:</span><span class="n">i</span><span class="p">]);</span> <span class="n">err</span> <span class="o">!=</span> <span class="no">nil</span> <span class="p">{</span>
        <span class="k">return</span>
    <span class="p">}</span>
<span class="p">}</span>
Enter fullscreen mode Exit fullscreen mode

}
func handleReceivedMessage(h *SSEHub, message ClientMessage) {
switch message.Event {
case TextEvent:
m, _ := json.Marshal(message)
jsonText := string(m)

    <span class="k">for</span> <span class="n">client</span> <span class="o">:=</span> <span class="k">range</span> <span class="n">h</span><span class="o">.</span><span class="n">clients</span> <span class="p">{</span>
        <span class="n">flusher</span><span class="p">,</span> <span class="n">_</span> <span class="o">:=</span> <span class="n">client</span><span class="o">.</span><span class="n">client</span><span class="o">.</span><span class="n">w</span><span class="o">.</span><span class="p">(</span><span class="n">http</span><span class="o">.</span><span class="n">Flusher</span><span class="p">)</span>

        <span class="n">fmt</span><span class="o">.</span><span class="n">Fprintf</span><span class="p">(</span><span class="n">client</span><span class="o">.</span><span class="n">client</span><span class="o">.</span><span class="n">w</span><span class="p">,</span> <span class="s">"data: %s</span><span class="se">\n\n</span><span class="s">"</span><span class="p">,</span> <span class="n">jsonText</span><span class="p">)</span>
        <span class="n">flusher</span><span class="o">.</span><span class="n">Flush</span><span class="p">()</span>
    <span class="p">}</span>
<span class="k">case</span> <span class="n">CandidateEvent</span><span class="o">:</span>
    <span class="n">candidate</span> <span class="o">:=</span> <span class="n">webrtc</span><span class="o">.</span><span class="n">ICECandidateInit</span><span class="p">{}</span>
    <span class="k">if</span> <span class="n">err</span> <span class="o">:=</span> <span class="n">json</span><span class="o">.</span><span class="n">Unmarshal</span><span class="p">([]</span><span class="kt">byte</span><span class="p">(</span><span class="n">message</span><span class="o">.</span><span class="n">Data</span><span class="p">),</span> <span class="o">&amp;</span><span class="n">candidate</span><span class="p">);</span> <span class="n">err</span> <span class="o">!=</span> <span class="no">nil</span> <span class="p">{</span>
        <span class="n">log</span><span class="o">.</span><span class="n">Println</span><span class="p">(</span><span class="n">err</span><span class="p">)</span>
        <span class="k">return</span>
    <span class="p">}</span>
    <span class="k">for</span> <span class="n">pc</span> <span class="o">:=</span> <span class="k">range</span> <span class="n">h</span><span class="o">.</span><span class="n">clients</span> <span class="p">{</span>
        <span class="k">if</span> <span class="n">pc</span><span class="o">.</span><span class="n">client</span><span class="o">.</span><span class="n">userName</span> <span class="o">==</span> <span class="n">message</span><span class="o">.</span><span class="n">UserName</span> <span class="p">{</span>
            <span class="k">if</span> <span class="n">err</span> <span class="o">:=</span> <span class="n">pc</span><span class="o">.</span><span class="n">peerConnection</span><span class="o">.</span><span class="n">AddICECandidate</span><span class="p">(</span><span class="n">candidate</span><span class="p">);</span> <span class="n">err</span> <span class="o">!=</span> <span class="no">nil</span> <span class="p">{</span>
                <span class="n">log</span><span class="o">.</span><span class="n">Println</span><span class="p">(</span><span class="n">err</span><span class="p">)</span>
                <span class="k">return</span>
            <span class="p">}</span>
        <span class="p">}</span>
    <span class="p">}</span>
<span class="k">case</span> <span class="n">AnswerEvent</span><span class="o">:</span>
    <span class="n">answer</span> <span class="o">:=</span> <span class="n">webrtc</span><span class="o">.</span><span class="n">SessionDescription</span><span class="p">{}</span>
    <span class="k">if</span> <span class="n">err</span> <span class="o">:=</span> <span class="n">json</span><span class="o">.</span><span class="n">Unmarshal</span><span class="p">([]</span><span class="kt">byte</span><span class="p">(</span><span class="n">message</span><span class="o">.</span><span class="n">Data</span><span class="p">),</span> <span class="o">&amp;</span><span class="n">answer</span><span class="p">);</span> <span class="n">err</span> <span class="o">!=</span> <span class="no">nil</span> <span class="p">{</span>
        <span class="n">log</span><span class="o">.</span><span class="n">Println</span><span class="p">(</span><span class="n">err</span><span class="p">)</span>
        <span class="k">return</span>
    <span class="p">}</span>
    <span class="k">for</span> <span class="n">pc</span> <span class="o">:=</span> <span class="k">range</span> <span class="n">h</span><span class="o">.</span><span class="n">clients</span> <span class="p">{</span>
        <span class="k">if</span> <span class="n">pc</span><span class="o">.</span><span class="n">client</span><span class="o">.</span><span class="n">userName</span> <span class="o">==</span> <span class="n">message</span><span class="o">.</span><span class="n">UserName</span> <span class="p">{</span>
            <span class="k">if</span> <span class="n">err</span> <span class="o">:=</span> <span class="n">pc</span><span class="o">.</span><span class="n">peerConnection</span><span class="o">.</span><span class="n">SetRemoteDescription</span><span class="p">(</span><span class="n">answer</span><span class="p">);</span> <span class="n">err</span> <span class="o">!=</span> <span class="no">nil</span> <span class="p">{</span>
                <span class="n">log</span><span class="o">.</span><span class="n">Println</span><span class="p">(</span><span class="n">err</span><span class="p">)</span>
                <span class="k">return</span>
            <span class="p">}</span>
        <span class="p">}</span>
    <span class="p">}</span>

<span class="p">}</span>
Enter fullscreen mode Exit fullscreen mode

}
func signalPeerConnections(h SSEHub) {
defer func() {
dispatchKeyFrame(h)
}()
for syncAttempt := 0; ; syncAttempt++ {
if syncAttempt == 25 {
// Release the lock and attempt a sync in 3 seconds. We might be blocking a RemoveTrack or AddTrack
go func() {
time.Sleep(time.Second 3)
signalPeerConnections(h)
}()
return
}
// For ignoring errors like below, execute attemptSync multiple times.
// InvalidModificationError: invalid proposed signaling state transition: have-local-offer->SetLocal(offer)->have-local-offer
if !attemptSync(h) {
break
}
}
}
// Share received tracks to all connected peers.
func attemptSync(h *SSEHub) bool {
for ps := range h.clients {
if ps.peerConnection.ConnectionState() == webrtc.PeerConnectionStateClosed {
delete(h.clients, ps)
// We modified the slice, start from the beginning
return true
}
existingSenders := map[string]bool{}

    <span class="k">for</span> <span class="n">_</span><span class="p">,</span> <span class="n">sender</span> <span class="o">:=</span> <span class="k">range</span> <span class="n">ps</span><span class="o">.</span><span class="n">peerConnection</span><span class="o">.</span><span class="n">GetSenders</span><span class="p">()</span> <span class="p">{</span>
        <span class="k">if</span> <span class="n">sender</span><span class="o">.</span><span class="n">Track</span><span class="p">()</span> <span class="o">==</span> <span class="no">nil</span> <span class="p">{</span>
            <span class="k">continue</span>
        <span class="p">}</span>
        <span class="n">existingSenders</span><span class="p">[</span><span class="n">sender</span><span class="o">.</span><span class="n">Track</span><span class="p">()</span><span class="o">.</span><span class="n">ID</span><span class="p">()]</span> <span class="o">=</span> <span class="no">true</span>

        <span class="k">if</span> <span class="n">_</span><span class="p">,</span> <span class="n">ok</span> <span class="o">:=</span> <span class="n">h</span><span class="o">.</span><span class="n">trackLocals</span><span class="p">[</span><span class="n">sender</span><span class="o">.</span><span class="n">Track</span><span class="p">()</span><span class="o">.</span><span class="n">ID</span><span class="p">()];</span> <span class="o">!</span><span class="n">ok</span> <span class="p">{</span>
            <span class="k">if</span> <span class="n">err</span> <span class="o">:=</span> <span class="n">ps</span><span class="o">.</span><span class="n">peerConnection</span><span class="o">.</span><span class="n">RemoveTrack</span><span class="p">(</span><span class="n">sender</span><span class="p">);</span> <span class="n">err</span> <span class="o">!=</span> <span class="no">nil</span> <span class="p">{</span>
                <span class="k">return</span> <span class="no">true</span>
            <span class="p">}</span>
        <span class="p">}</span>
    <span class="p">}</span>
    <span class="k">for</span> <span class="n">_</span><span class="p">,</span> <span class="n">receiver</span> <span class="o">:=</span> <span class="k">range</span> <span class="n">ps</span><span class="o">.</span><span class="n">peerConnection</span><span class="o">.</span><span class="n">GetReceivers</span><span class="p">()</span> <span class="p">{</span>
        <span class="k">if</span> <span class="n">receiver</span><span class="o">.</span><span class="n">Track</span><span class="p">()</span> <span class="o">==</span> <span class="no">nil</span> <span class="p">{</span>
            <span class="k">continue</span>
        <span class="p">}</span>
        <span class="n">existingSenders</span><span class="p">[</span><span class="n">receiver</span><span class="o">.</span><span class="n">Track</span><span class="p">()</span><span class="o">.</span><span class="n">ID</span><span class="p">()]</span> <span class="o">=</span> <span class="no">true</span>
    <span class="p">}</span>
    <span class="k">for</span> <span class="n">trackID</span> <span class="o">:=</span> <span class="k">range</span> <span class="n">h</span><span class="o">.</span><span class="n">trackLocals</span> <span class="p">{</span>
        <span class="k">if</span> <span class="n">_</span><span class="p">,</span> <span class="n">ok</span> <span class="o">:=</span> <span class="n">existingSenders</span><span class="p">[</span><span class="n">trackID</span><span class="p">];</span> <span class="o">!</span><span class="n">ok</span> <span class="p">{</span>
            <span class="k">if</span> <span class="n">_</span><span class="p">,</span> <span class="n">err</span> <span class="o">:=</span> <span class="n">ps</span><span class="o">.</span><span class="n">peerConnection</span><span class="o">.</span><span class="n">AddTrack</span><span class="p">(</span><span class="n">h</span><span class="o">.</span><span class="n">trackLocals</span><span class="p">[</span><span class="n">trackID</span><span class="p">]);</span> <span class="n">err</span> <span class="o">!=</span> <span class="no">nil</span> <span class="p">{</span>
                <span class="k">return</span> <span class="no">true</span>
            <span class="p">}</span>
        <span class="p">}</span>
    <span class="p">}</span>

    <span class="n">offer</span><span class="p">,</span> <span class="n">err</span> <span class="o">:=</span> <span class="n">ps</span><span class="o">.</span><span class="n">peerConnection</span><span class="o">.</span><span class="n">CreateOffer</span><span class="p">(</span><span class="no">nil</span><span class="p">)</span>
    <span class="k">if</span> <span class="n">err</span> <span class="o">!=</span> <span class="no">nil</span> <span class="p">{</span>
        <span class="k">return</span> <span class="no">true</span>
    <span class="p">}</span>
    <span class="n">messageJSON</span><span class="p">,</span> <span class="n">err</span> <span class="o">:=</span> <span class="n">NewOfferMessageJSON</span><span class="p">(</span><span class="n">ps</span><span class="o">.</span><span class="n">client</span><span class="o">.</span><span class="n">userName</span><span class="p">,</span> <span class="n">offer</span><span class="p">)</span>
    <span class="k">if</span> <span class="n">err</span> <span class="o">!=</span> <span class="no">nil</span> <span class="p">{</span>
        <span class="k">return</span> <span class="no">true</span>
    <span class="p">}</span>

    <span class="k">if</span> <span class="n">err</span> <span class="o">=</span> <span class="n">ps</span><span class="o">.</span><span class="n">peerConnection</span><span class="o">.</span><span class="n">SetLocalDescription</span><span class="p">(</span><span class="n">offer</span><span class="p">);</span> <span class="n">err</span> <span class="o">!=</span> <span class="no">nil</span> <span class="p">{</span>
        <span class="k">return</span> <span class="no">true</span>
    <span class="p">}</span>
    <span class="n">flusher</span><span class="p">,</span> <span class="n">_</span> <span class="o">:=</span> <span class="n">ps</span><span class="o">.</span><span class="n">client</span><span class="o">.</span><span class="n">w</span><span class="o">.</span><span class="p">(</span><span class="n">http</span><span class="o">.</span><span class="n">Flusher</span><span class="p">)</span>

    <span class="n">fmt</span><span class="o">.</span><span class="n">Fprintf</span><span class="p">(</span><span class="n">ps</span><span class="o">.</span><span class="n">client</span><span class="o">.</span><span class="n">w</span><span class="p">,</span> <span class="s">"data: %s</span><span class="se">\n\n</span><span class="s">"</span><span class="p">,</span> <span class="n">messageJSON</span><span class="p">)</span>
    <span class="n">flusher</span><span class="o">.</span><span class="n">Flush</span><span class="p">()</span>
<span class="p">}</span>
<span class="k">return</span> <span class="no">false</span>
Enter fullscreen mode Exit fullscreen mode

}
func dispatchKeyFrame(h *SSEHub) {
for ps := range h.clients {
for _, receiver := range ps.peerConnection.GetReceivers() {
if receiver.Track() == nil {
continue
}

        <span class="n">_</span> <span class="o">=</span> <span class="n">ps</span><span class="o">.</span><span class="n">peerConnection</span><span class="o">.</span><span class="n">WriteRTCP</span><span class="p">([]</span><span class="n">rtcp</span><span class="o">.</span><span class="n">Packet</span><span class="p">{</span>
            <span class="o">&amp;</span><span class="n">rtcp</span><span class="o">.</span><span class="n">PictureLossIndication</span><span class="p">{</span>
                <span class="n">MediaSSRC</span><span class="o">:</span> <span class="kt">uint32</span><span class="p">(</span><span class="n">receiver</span><span class="o">.</span><span class="n">Track</span><span class="p">()</span><span class="o">.</span><span class="n">SSRC</span><span class="p">()),</span>
            <span class="p">},</span>
        <span class="p">})</span>
    <span class="p">}</span>
<span class="p">}</span>
Enter fullscreen mode Exit fullscreen mode

}

Enter fullscreen mode Exit fullscreen mode




Channels

I create channels in SSEClient and SSEHub.
I tried adding some channels into SSEClient to send messages from SSEHub first.

But if I did that, the application hang when I sent text messages after connecting WebRTC.
Because I think the cause is a circular reference, I remove these channels and send messages from SSEHub.

Resources

Top comments (0)