Skip to content

Commit

Permalink
webrtc: fix WHIP/WHEP implementation (#1857) (#1861)
Browse files Browse the repository at this point in the history
offers and answers are now encoded in SDP in place of JSON; Location
header is set by the server.

This fixes compatibility with GStreamer and whipsink
  • Loading branch information
aler9 authored May 24, 2023
1 parent 834cade commit 99aa0d0
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 48 deletions.
1 change: 1 addition & 0 deletions internal/core/webrtc_http_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,7 @@ func (s *webRTCHTTPServer) onRequest(ctx *gin.Context) {
ctx.Writer.Header().Set("E-Tag", res.sx.secret.String())
ctx.Writer.Header().Set("Accept-Patch", "application/trickle-ice-sdpfrag")
ctx.Writer.Header()["Link"] = iceServersToLinkHeader(s.parent.genICEServers())
ctx.Writer.Header().Set("Location", ctx.Request.URL.String())
ctx.Writer.WriteHeader(http.StatusCreated)
ctx.Writer.Write(res.answer)

Expand Down
27 changes: 13 additions & 14 deletions internal/core/webrtc_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package core

import (
"bytes"
"encoding/json"
"io"
"net/http"
"sync"
"testing"
Expand Down Expand Up @@ -38,10 +38,7 @@ func whipGetICEServers(t *testing.T, hc *http.Client, ur string) []webrtc.ICESer
func whipPostOffer(t *testing.T, hc *http.Client, ur string,
offer *webrtc.SessionDescription,
) (*webrtc.SessionDescription, string) {
enc, err := json.Marshal(offer)
require.NoError(t, err)

req, err := http.NewRequest("POST", ur, bytes.NewReader(enc))
req, err := http.NewRequest("POST", ur, bytes.NewReader([]byte(offer.SDP)))
require.NoError(t, err)

req.Header.Set("Content-Type", "application/sdp")
Expand All @@ -51,22 +48,27 @@ func whipPostOffer(t *testing.T, hc *http.Client, ur string,
defer res.Body.Close()

require.Equal(t, http.StatusCreated, res.StatusCode)
require.Equal(t, "application/sdp", res.Header.Get("Content-Type"))
require.Equal(t, "application/trickle-ice-sdpfrag", res.Header.Get("Accept-Patch"))
require.Equal(t, req.URL.Path, res.Header.Get("Location"))

link, ok := res.Header["Link"]
require.Equal(t, true, ok)
servers := linkHeaderToIceServers(link)
require.NotEqual(t, 0, len(servers))

require.Equal(t, "application/sdp", res.Header.Get("Content-Type"))
etag := res.Header.Get("E-Tag")
require.NotEqual(t, 0, len(etag))
require.Equal(t, "application/trickle-ice-sdpfrag", res.Header.Get("Accept-Patch"))

var answer webrtc.SessionDescription
err = json.NewDecoder(res.Body).Decode(&answer)
sdp, err := io.ReadAll(res.Body)
require.NoError(t, err)

return &answer, etag
answer := &webrtc.SessionDescription{
Type: webrtc.SDPTypeAnswer,
SDP: string(sdp),
}

return answer, etag
}

func whipPostCandidate(t *testing.T, ur string, offer *webrtc.SessionDescription,
Expand Down Expand Up @@ -323,10 +325,7 @@ func TestWebRTCReadNotFound(t *testing.T) {
offer, err := pc.CreateOffer(nil)
require.NoError(t, err)

enc, err := json.Marshal(offer)
require.NoError(t, err)

req, err := http.NewRequest("POST", "http://localhost:8889/stream/whep", bytes.NewReader(enc))
req, err := http.NewRequest("POST", "http://localhost:8889/stream/whep", bytes.NewReader([]byte(offer.SDP)))
require.NoError(t, err)

req.Header.Set("Content-Type", "application/sdp")
Expand Down
9 changes: 6 additions & 3 deletions internal/core/webrtc_publish_index.html
Original file line number Diff line number Diff line change
Expand Up @@ -232,16 +232,19 @@
headers: {
'Content-Type': 'application/sdp',
},
body: JSON.stringify(desc),
body: desc.sdp,
})
.then((res) => {
if (res.status !== 201) {
throw new Error('bad status code');
}
this.eTag = res.headers.get('E-Tag');
return res.json();
return res.text();
})
.then((answer) => this.onRemoteDescription(answer))
.then((sdp) => this.onRemoteDescription(new RTCSessionDescription({
type: 'answer',
sdp,
})))
.catch((err) => {
console.log('error: ' + err);
this.scheduleRestart();
Expand Down
9 changes: 6 additions & 3 deletions internal/core/webrtc_read_index.html
Original file line number Diff line number Diff line change
Expand Up @@ -143,16 +143,19 @@
headers: {
'Content-Type': 'application/sdp',
},
body: JSON.stringify(desc),
body: desc.sdp,
})
.then((res) => {
if (res.status !== 201) {
throw new Error('bad status code');
}
this.eTag = res.headers.get('E-Tag');
return res.json();
return res.text();
})
.then((answer) => this.onRemoteDescription(answer))
.then((sdp) => this.onRemoteDescription(new RTCSessionDescription({
type: 'answer',
sdp,
})))
.catch((err) => {
console.log('error: ' + err);
this.scheduleRestart();
Expand Down
35 changes: 7 additions & 28 deletions internal/core/webrtc_session.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package core
import (
"context"
"encoding/hex"
"encoding/json"
"fmt"
"net/http"
"strconv"
Expand Down Expand Up @@ -256,11 +255,6 @@ func (s *webRTCSession) runPublish() (int, error) {

defer res.path.publisherRemove(pathPublisherRemoveReq{author: s})

offer, err := s.decodeOffer()
if err != nil {
return http.StatusBadRequest, err
}

pc, err := newPeerConnection(
s.req.videoCodec,
s.req.audioCodec,
Expand Down Expand Up @@ -297,6 +291,7 @@ func (s *webRTCSession) runPublish() (int, error) {
}
})

offer := s.buildOffer()
err = pc.SetRemoteDescription(*offer)
if err != nil {
return http.StatusBadRequest, err
Expand Down Expand Up @@ -390,11 +385,6 @@ func (s *webRTCSession) runRead() (int, error) {
return http.StatusBadRequest, err
}

offer, err := s.decodeOffer()
if err != nil {
return http.StatusBadRequest, err
}

pc, err := newPeerConnection(
"",
"",
Expand All @@ -416,6 +406,7 @@ func (s *webRTCSession) runRead() (int, error) {
}
}

offer := s.buildOffer()
err = pc.SetRemoteDescription(*offer)
if err != nil {
return http.StatusBadRequest, err
Expand Down Expand Up @@ -484,18 +475,11 @@ func (s *webRTCSession) runRead() (int, error) {
}
}

func (s *webRTCSession) decodeOffer() (*webrtc.SessionDescription, error) {
var offer webrtc.SessionDescription
err := json.Unmarshal(s.req.offer, &offer)
if err != nil {
return nil, err
}

if offer.Type != webrtc.SDPTypeOffer {
return nil, fmt.Errorf("received SDP is not an offer")
func (s *webRTCSession) buildOffer() *webrtc.SessionDescription {
return &webrtc.SessionDescription{
Type: webrtc.SDPTypeOffer,
SDP: string(s.req.offer),
}

return &offer, nil
}

func (s *webRTCSession) waitGatheringDone(pc *peerConnection) error {
Expand All @@ -511,15 +495,10 @@ func (s *webRTCSession) waitGatheringDone(pc *peerConnection) error {
}

func (s *webRTCSession) writeAnswer(answer *webrtc.SessionDescription) error {
enc, err := json.Marshal(answer)
if err != nil {
return err
}

select {
case s.req.res <- webRTCSessionNewRes{
sx: s,
answer: enc,
answer: []byte(answer.SDP),
}:
s.answerSent = true
case <-s.ctx.Done():
Expand Down

0 comments on commit 99aa0d0

Please sign in to comment.