Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add video support #35

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 48 additions & 0 deletions pkg/media/h264/h264.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Copyright 2023 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package h264

import (
m "github.com/livekit/sip/pkg/media"
"github.com/pion/webrtc/v3/pkg/media"
"time"
)

type Sample []byte

type Encoder struct {
w m.Writer[Sample]
buf Sample
}

func (e *Encoder) WriteSample(in Sample) error {
return e.w.WriteSample(in)

Check warning on line 31 in pkg/media/h264/h264.go

View check run for this annotation

Codecov / codecov/patch

pkg/media/h264/h264.go#L30-L31

Added lines #L30 - L31 were not covered by tests
}

func Encode(w m.Writer[Sample]) m.Writer[Sample] {
return &Encoder{w: w}

Check warning on line 35 in pkg/media/h264/h264.go

View check run for this annotation

Codecov / codecov/patch

pkg/media/h264/h264.go#L34-L35

Added lines #L34 - L35 were not covered by tests
}

type SampleWriter interface {
WriteSample(sample media.Sample) error
}

func BuildSampleWriter[T ~[]byte](w SampleWriter, sampleDur time.Duration) m.Writer[T] {
return m.WriterFunc[T](func(in T) error {
data := make([]byte, len(in))
copy(data, in)
return w.WriteSample(media.Sample{Data: data})
})

Check warning on line 47 in pkg/media/h264/h264.go

View check run for this annotation

Codecov / codecov/patch

pkg/media/h264/h264.go#L42-L47

Added lines #L42 - L47 were not covered by tests
}
69 changes: 48 additions & 21 deletions pkg/sip/inbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import (
"context"
"fmt"
"github.com/livekit/sip/pkg/media/h264"
"sync/atomic"
"time"

Expand Down Expand Up @@ -54,7 +55,6 @@

h := req.GetHeader("Proxy-Authorization")
if h == nil {
logger.Infow(fmt.Sprintf("Requesting inbound auth for %s", from), "from", from)
inviteState.challenge = digest.Challenge{
Realm: UserAgent,
Nonce: fmt.Sprintf("%d", time.Now().UnixMicro()),
Expand Down Expand Up @@ -117,14 +117,12 @@
src := req.Source()

s.mon.InviteReq(false, from.Address.String(), to.Address.String())
logger.Infow(fmt.Sprintf("INVITE from %q to %q", from.Address.String(), to.Address.String()),
"tag", tag, "from", from, "to", to)
logger.Infow("INVITE", "tag", tag, "from", from, "to", to)

username, password, err := s.authHandler(from.Address.User, to.Address.User, to.Address.Host, src)
if err != nil {
s.mon.InviteError(false, from.Address.String(), to.Address.String(), "no-rule")
logger.Warnw(fmt.Sprintf("Rejecting inbound call to %q, doesn't match any Trunks", to.Address.String()), err,
"tag", tag, "src", src, "from", from, "to", to, "to-host", to.Address.Host)
logger.Warnw("Rejecting inbound call, doesn't match any Trunks", err, "tag", tag, "src", src, "from", from, "to", to, "to-host", to.Address.Host)
sipErrorResponse(tx, req)
return
}
Expand Down Expand Up @@ -166,8 +164,10 @@
from *sip.FromHeader
to *sip.ToHeader
src string
rtpConn *MediaConn
audioRtpConn *MediaConn
videoRtpConn *MediaConn
audioHandler atomic.Pointer[rtp.Handler]
videoHandler atomic.Pointer[rtp.Handler]
dtmf chan byte // buffered; DTMF digits as characters
lkRoom *Room // LiveKit room; only active after correct pin is entered
done atomic.Bool
Expand Down Expand Up @@ -256,23 +256,34 @@
}

func (c *inboundCall) runMediaConn(offerData []byte, conf *config.Config) (answerData []byte, _ error) {
conn := NewMediaConn()
conn.OnRTP(c)
if err := conn.Start(conf.RTPPort.Start, conf.RTPPort.End, "0.0.0.0"); err != nil {
audioConn := NewMediaConn()
audioConn.OnRTP(c)
if err := audioConn.Start(conf.RTPPort.Start, conf.RTPPort.End, "0.0.0.0"); err != nil {

Check warning on line 261 in pkg/sip/inbound.go

View check run for this annotation

Codecov / codecov/patch

pkg/sip/inbound.go#L259-L261

Added lines #L259 - L261 were not covered by tests
return nil, err
}
c.rtpConn = conn
c.audioRtpConn = audioConn

videoConn := NewMediaConn()
videoConn.OnRTP(c)
if err := videoConn.Start(conf.RTPPort.Start, conf.RTPPort.End, "0.0.0.0"); err != nil {
return nil, err
}
c.videoRtpConn = videoConn

Check warning on line 272 in pkg/sip/inbound.go

View check run for this annotation

Codecov / codecov/patch

pkg/sip/inbound.go#L264-L272

Added lines #L264 - L272 were not covered by tests
offer := sdp.SessionDescription{}
if err := offer.Unmarshal(offerData); err != nil {
return nil, err
}

// Encoding pipeline (LK -> SIP)
// Need to be created earlier to send the pin prompts.
s := rtp.NewMediaStreamOut[ulaw.Sample](conn, rtpPacketDur)
c.lkRoom.SetOutput(ulaw.Encode(s))
aus := rtp.NewMediaStreamOut[ulaw.Sample](audioConn, rtpPacketDur)
c.lkRoom.SetAudioOutput(ulaw.Encode(aus))

vis := rtp.NewMediaStreamOut[h264.Sample](videoConn, rtpPacketDur)
c.lkRoom.SetVideoOutput(h264.Encode(vis))

Check warning on line 284 in pkg/sip/inbound.go

View check run for this annotation

Codecov / codecov/patch

pkg/sip/inbound.go#L280-L284

Added lines #L280 - L284 were not covered by tests

return sdpGenerateAnswer(offer, c.s.signalingIp, conn.LocalAddr().Port)
return sdpGenerateAnswer(offer, c.s.signalingIp, audioConn.LocalAddr().Port, videoConn.LocalAddr().Port)

Check warning on line 286 in pkg/sip/inbound.go

View check run for this annotation

Codecov / codecov/patch

pkg/sip/inbound.go#L286

Added line #L286 was not covered by tests
}

func (c *inboundCall) pinPrompt(ctx context.Context) {
Expand Down Expand Up @@ -343,9 +354,13 @@
p.Close()
c.lkRoom = nil
}
if c.rtpConn != nil {
c.rtpConn.Close()
c.rtpConn = nil
if c.audioRtpConn != nil {
c.audioRtpConn.Close()
c.audioRtpConn = nil
}
if c.videoRtpConn != nil {
c.videoRtpConn.Close()
c.videoRtpConn = nil

Check warning on line 363 in pkg/sip/inbound.go

View check run for this annotation

Codecov / codecov/patch

pkg/sip/inbound.go#L357-L363

Added lines #L357 - L363 were not covered by tests
}
close(c.dtmf)
}
Expand All @@ -356,9 +371,17 @@
return nil
}
// TODO: Audio data appears to be coming with PayloadType=0, so maybe enforce it?
if h := c.audioHandler.Load(); h != nil {
return (*h).HandleRTP(p)
//logger.Infow("------------ pt == ", p.PayloadType)
if p.PayloadType < 96 {
if h := c.audioHandler.Load(); h != nil {
return (*h).HandleRTP(p)
}
} else {
if h := c.videoHandler.Load(); h != nil {
return (*h).HandleRTP(p)
}

Check warning on line 382 in pkg/sip/inbound.go

View check run for this annotation

Codecov / codecov/patch

pkg/sip/inbound.go#L375-L382

Added lines #L375 - L382 were not covered by tests
}

return nil
}

Expand All @@ -372,17 +395,21 @@
if err != nil {
return err
}
local, err := c.lkRoom.NewParticipant()
audioTrack, videoTrack, err := c.lkRoom.NewParticipant()

Check warning on line 398 in pkg/sip/inbound.go

View check run for this annotation

Codecov / codecov/patch

pkg/sip/inbound.go#L398

Added line #L398 was not covered by tests
if err != nil {
_ = c.lkRoom.Close()
return err
}

// Decoding pipeline (SIP -> LK)
law := ulaw.Decode(local)
// Decoding pipeline (SIP -> LK) audio
law := ulaw.Decode(audioTrack)
//构造NewMediaStreamIn 通过readLoop方法传入rtp byte数据,通过law decode为int16数据传给local (pw)

Check warning on line 406 in pkg/sip/inbound.go

View check run for this annotation

Codecov / codecov/patch

pkg/sip/inbound.go#L405-L406

Added lines #L405 - L406 were not covered by tests
var h rtp.Handler = rtp.NewMediaStreamIn(law)
c.audioHandler.Store(&h)

var vh rtp.Handler = rtp.NewMediaStreamIn(videoTrack)
c.videoHandler.Store(&vh)

Check warning on line 412 in pkg/sip/inbound.go

View check run for this annotation

Codecov / codecov/patch

pkg/sip/inbound.go#L410-L412

Added lines #L410 - L412 were not covered by tests
return nil
}

Expand Down
57 changes: 39 additions & 18 deletions pkg/sip/outbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import (
"context"
"fmt"
"github.com/livekit/sip/pkg/media/h264"
"sync"

"github.com/emiago/sipgo/sip"
Expand All @@ -40,13 +41,15 @@
type outboundCall struct {
c *Client
participantID string
rtpConn *MediaConn
audioRtpConn *MediaConn
videoRtpConn *MediaConn

mu sync.RWMutex
mediaRunning bool
lkCur lkRoomConfig
lkRoom *Room
lkRoomIn media.Writer[media.PCM16Sample]
lkRoomAudioIn media.Writer[media.PCM16Sample]
lkRoomVideoIn media.Writer[h264.Sample]
sipCur sipOutboundConfig
sipInviteReq *sip.Request
sipInviteResp *sip.Response
Expand Down Expand Up @@ -79,7 +82,8 @@
call := &outboundCall{
c: c,
participantID: participantId,
rtpConn: NewMediaConn(),
audioRtpConn: NewMediaConn(),
videoRtpConn: NewMediaConn(),

Check warning on line 86 in pkg/sip/outbound.go

View check run for this annotation

Codecov / codecov/patch

pkg/sip/outbound.go#L85-L86

Added lines #L85 - L86 were not covered by tests
}
return call
}
Expand All @@ -92,19 +96,23 @@
}

func (c *outboundCall) close() {
c.rtpConn.OnRTP(nil)
c.lkRoom.SetOutput(nil)
c.audioRtpConn.OnRTP(nil)
c.videoRtpConn.OnRTP(nil)
c.lkRoom.SetAudioOutput(nil)
c.lkRoom.SetVideoOutput(nil)

Check warning on line 102 in pkg/sip/outbound.go

View check run for this annotation

Codecov / codecov/patch

pkg/sip/outbound.go#L99-L102

Added lines #L99 - L102 were not covered by tests

if c.mediaRunning {
_ = c.rtpConn.Close()
_ = c.audioRtpConn.Close()
_ = c.videoRtpConn.Close()

Check warning on line 106 in pkg/sip/outbound.go

View check run for this annotation

Codecov / codecov/patch

pkg/sip/outbound.go#L105-L106

Added lines #L105 - L106 were not covered by tests
}
c.mediaRunning = false

if c.lkRoom != nil {
_ = c.lkRoom.Close()
}
c.lkRoom = nil
c.lkRoomIn = nil
c.lkRoomAudioIn = nil
c.lkRoomVideoIn = nil

Check warning on line 115 in pkg/sip/outbound.go

View check run for this annotation

Codecov / codecov/patch

pkg/sip/outbound.go#L114-L115

Added lines #L114 - L115 were not covered by tests
c.lkCur = lkRoomConfig{}

c.stopSIP()
Expand Down Expand Up @@ -155,7 +163,10 @@
if c.mediaRunning {
return nil
}
if err := c.rtpConn.Start(conf.RTPPort.Start, conf.RTPPort.End, "0.0.0.0"); err != nil {
if err := c.audioRtpConn.Start(conf.RTPPort.Start, conf.RTPPort.End, "0.0.0.0"); err != nil {
return err
}
if err := c.videoRtpConn.Start(conf.RTPPort.Start, conf.RTPPort.End, "0.0.0.0"); err != nil {

Check warning on line 169 in pkg/sip/outbound.go

View check run for this annotation

Codecov / codecov/patch

pkg/sip/outbound.go#L166-L169

Added lines #L166 - L169 were not covered by tests
return err
}
c.mediaRunning = true
Expand All @@ -169,19 +180,21 @@
if c.lkRoom != nil {
_ = c.lkRoom.Close()
c.lkRoom = nil
c.lkRoomIn = nil
c.lkRoomAudioIn = nil
c.lkRoomVideoIn = nil

Check warning on line 184 in pkg/sip/outbound.go

View check run for this annotation

Codecov / codecov/patch

pkg/sip/outbound.go#L183-L184

Added lines #L183 - L184 were not covered by tests
}
r, err := ConnectToRoom(c.c.conf, lkNew.roomName, lkNew.identity)
if err != nil {
return err
}
local, err := r.NewParticipant()
audioTrack, videoTrack, err := r.NewParticipant()

Check warning on line 190 in pkg/sip/outbound.go

View check run for this annotation

Codecov / codecov/patch

pkg/sip/outbound.go#L190

Added line #L190 was not covered by tests
if err != nil {
_ = r.Close()
return err
}
c.lkRoom = r
c.lkRoomIn = local
c.lkRoomAudioIn = audioTrack
c.lkRoomVideoIn = videoTrack

Check warning on line 197 in pkg/sip/outbound.go

View check run for this annotation

Codecov / codecov/patch

pkg/sip/outbound.go#L196-L197

Added lines #L196 - L197 were not covered by tests
c.lkCur = lkNew
return nil
}
Expand All @@ -201,17 +214,25 @@

func (c *outboundCall) relinkMedia() {
if c.lkRoom == nil || !c.mediaRunning {
c.lkRoom.SetOutput(nil)
c.rtpConn.OnRTP(nil)
c.lkRoom.SetAudioOutput(nil)
c.lkRoom.SetVideoOutput(nil)
c.audioRtpConn.OnRTP(nil)
c.videoRtpConn.OnRTP(nil)

Check warning on line 220 in pkg/sip/outbound.go

View check run for this annotation

Codecov / codecov/patch

pkg/sip/outbound.go#L217-L220

Added lines #L217 - L220 were not covered by tests
return
}
// Encoding pipeline (LK -> SIP)
s := rtp.NewMediaStreamOut[ulaw.Sample](c.rtpConn, rtpPacketDur)
c.lkRoom.SetOutput(ulaw.Encode(s))
aus := rtp.NewMediaStreamOut[ulaw.Sample](c.audioRtpConn, rtpPacketDur)
c.lkRoom.SetAudioOutput(ulaw.Encode(aus))

vis := rtp.NewMediaStreamOut[h264.Sample](c.videoRtpConn, rtpPacketDur)
c.lkRoom.SetVideoOutput(h264.Encode(vis))

Check warning on line 228 in pkg/sip/outbound.go

View check run for this annotation

Codecov / codecov/patch

pkg/sip/outbound.go#L224-L228

Added lines #L224 - L228 were not covered by tests

// Decoding pipeline (SIP -> LK)
law := ulaw.Decode(c.lkRoomIn)
c.rtpConn.OnRTP(rtp.NewMediaStreamIn(law))
law := ulaw.Decode(c.lkRoomAudioIn)
c.audioRtpConn.OnRTP(rtp.NewMediaStreamIn(law))

var vh rtp.Handler = rtp.NewMediaStreamIn(c.lkRoomVideoIn)
c.videoRtpConn.OnRTP(vh)

Check warning on line 235 in pkg/sip/outbound.go

View check run for this annotation

Codecov / codecov/patch

pkg/sip/outbound.go#L231-L235

Added lines #L231 - L235 were not covered by tests
}

func (c *outboundCall) SendDTMF(ctx context.Context, digits string) error {
Expand Down Expand Up @@ -250,7 +271,7 @@
}

func (c *outboundCall) sipSignal(conf sipOutboundConfig) error {
offer, err := sdpGenerateOffer(c.c.signalingIp, c.rtpConn.LocalAddr().Port)
offer, err := sdpGenerateOffer(c.c.signalingIp, c.audioRtpConn.LocalAddr().Port, c.videoRtpConn.LocalAddr().Port)

Check warning on line 274 in pkg/sip/outbound.go

View check run for this annotation

Codecov / codecov/patch

pkg/sip/outbound.go#L274

Added line #L274 was not covered by tests
if err != nil {
return err
}
Expand Down
Loading
Loading