Skip to content

Commit

Permalink
cherry picked a/v bin updates (#483)
Browse files Browse the repository at this point in the history
  • Loading branch information
frostbyte73 authored Sep 8, 2023
1 parent b21df23 commit 60762ef
Show file tree
Hide file tree
Showing 3 changed files with 429 additions and 334 deletions.
219 changes: 133 additions & 86 deletions pkg/pipeline/builder/audio.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,124 +16,167 @@ package builder

import (
"fmt"
"sync"

"github.com/tinyzimmer/go-gst/gst"

"github.com/livekit/egress/pkg/config"
"github.com/livekit/egress/pkg/errors"
"github.com/livekit/egress/pkg/gstreamer"
"github.com/livekit/egress/pkg/types"
lksdk "github.com/livekit/server-sdk-go"
)

const audioMixerLatency = uint64(2e9)

func BuildAudioBin(pipeline *gstreamer.Pipeline, p *config.PipelineConfig) (*gstreamer.Bin, error) {
b := pipeline.NewBin("audio")
type AudioBin struct {
bin *gstreamer.Bin
conf *config.PipelineConfig

mu sync.Mutex
tracks map[string]struct{}
}

func BuildAudioBin(pipeline *gstreamer.Pipeline, p *config.PipelineConfig) error {
b := &AudioBin{
bin: pipeline.NewBin("audio"),
conf: p,
tracks: make(map[string]struct{}),
}

switch p.SourceType {
case types.SourceTypeSDK:
if err := buildSDKAudioInput(b, p); err != nil {
return nil, err
case types.SourceTypeWeb:
if err := b.buildWebInput(); err != nil {
return err
}

case types.SourceTypeWeb:
if err := buildWebAudioInput(b, p); err != nil {
return nil, err
case types.SourceTypeSDK:
if err := b.buildSDKInput(); err != nil {
return err
}

pipeline.AddOnTrackAdded(b.onTrackAdded)
pipeline.AddOnTrackRemoved(b.onTrackRemoved)
}

if len(p.Outputs) > 1 {
tee, err := gst.NewElementWithName("tee", "audio_tee")
if err != nil {
return nil, err
return err
}

if err = b.AddElement(tee); err != nil {
return nil, err
if err = b.bin.AddElement(tee); err != nil {
return err
}
} else {
queue, err := gstreamer.BuildQueue("audio_queue", p.Latency, true)
if err != nil {
return nil, errors.ErrGstPipelineError(err)
return errors.ErrGstPipelineError(err)
}
if err = b.AddElement(queue); err != nil {
return nil, err
if err = b.bin.AddElement(queue); err != nil {
return err
}
}

return b, nil
return pipeline.AddSourceBin(b.bin)
}

func (b *AudioBin) onTrackAdded(ts *config.TrackSource) {
if b.bin.GetState() > gstreamer.StateRunning {
return
}

if ts.Kind == lksdk.TrackKindAudio {
if err := b.addAudioAppSrcBin(ts); err != nil {
b.bin.OnError(err)
}
}
}

func buildWebAudioInput(b *gstreamer.Bin, p *config.PipelineConfig) error {
func (b *AudioBin) onTrackRemoved(trackID string) {
if b.bin.GetState() > gstreamer.StateRunning {
return
}

b.mu.Lock()
_, ok := b.tracks[trackID]
delete(b.tracks, trackID)
b.mu.Unlock()

if ok {
if _, err := b.bin.RemoveSourceBin(trackID); err != nil {
b.bin.OnError(err)
}
}
}

func (b *AudioBin) buildWebInput() error {
pulseSrc, err := gst.NewElement("pulsesrc")
if err != nil {
return errors.ErrGstPipelineError(err)
}
if err = pulseSrc.SetProperty("device", fmt.Sprintf("%s.monitor", p.Info.EgressId)); err != nil {
if err = pulseSrc.SetProperty("device", fmt.Sprintf("%s.monitor", b.conf.Info.EgressId)); err != nil {
return errors.ErrGstPipelineError(err)
}
if err = b.AddElement(pulseSrc); err != nil {
if err = b.bin.AddElement(pulseSrc); err != nil {
return err
}

if err = addAudioConverter(b, p); err != nil {
if err = addAudioConverter(b.bin, b.conf); err != nil {
return err
}

if p.AudioTranscoding {
if err = addAudioEncoder(b, p); err != nil {
if b.conf.AudioTranscoding {
if err = b.addEncoder(); err != nil {
return err
}
}

return nil
}

func buildSDKAudioInput(b *gstreamer.Bin, p *config.PipelineConfig) error {
if p.AudioTrack != nil {
if err := buildAudioAppSrcBin(b, p); err != nil {
func (b *AudioBin) buildSDKInput() error {
if b.conf.AudioTrack != nil {
if err := b.addAudioAppSrcBin(b.conf.AudioTrack); err != nil {
return err
}
}
if err := buildAudioTestSrcBin(b, p); err != nil {
if err := b.addAudioTestSrcBin(); err != nil {
return err
}
if err := addAudioMixer(b, p); err != nil {
if err := b.addMixer(); err != nil {
return err
}
if p.AudioTranscoding {
if err := addAudioEncoder(b, p); err != nil {
if b.conf.AudioTranscoding {
if err := b.addEncoder(); err != nil {
return err
}
}

return nil
}

func buildAudioAppSrcBin(audioBin *gstreamer.Bin, p *config.PipelineConfig) error {
track := p.AudioTrack
func (b *AudioBin) addAudioAppSrcBin(ts *config.TrackSource) error {
b.mu.Lock()
defer b.mu.Unlock()

b.tracks[ts.TrackID] = struct{}{}

b := audioBin.NewBin(track.TrackID)
b.SetEOSFunc(func() bool {
appSrcBin := b.bin.NewBin(ts.TrackID)
appSrcBin.SetEOSFunc(func() bool {
return false
})
if err := audioBin.AddSourceBin(b); err != nil {
ts.AppSrc.Element.SetArg("format", "time")
if err := ts.AppSrc.Element.SetProperty("is-live", true); err != nil {
return err
}

track.AppSrc.Element.SetArg("format", "time")
if err := track.AppSrc.Element.SetProperty("is-live", true); err != nil {
return err
}
if err := b.AddElement(track.AppSrc.Element); err != nil {
if err := appSrcBin.AddElement(ts.AppSrc.Element); err != nil {
return err
}

switch track.MimeType {
switch ts.MimeType {
case types.MimeTypeOpus:
if err := track.AppSrc.Element.SetProperty("caps", gst.NewCapsFromString(fmt.Sprintf(
if err := ts.AppSrc.Element.SetProperty("caps", gst.NewCapsFromString(fmt.Sprintf(
"application/x-rtp,media=audio,payload=%d,encoding-name=OPUS,clock-rate=%d",
track.PayloadType, track.ClockRate,
ts.PayloadType, ts.ClockRate,
))); err != nil {
return errors.ErrGstPipelineError(err)
}
Expand All @@ -148,24 +191,28 @@ func buildAudioAppSrcBin(audioBin *gstreamer.Bin, p *config.PipelineConfig) erro
return errors.ErrGstPipelineError(err)
}

if err = b.AddElements(rtpOpusDepay, opusDec); err != nil {
if err = appSrcBin.AddElements(rtpOpusDepay, opusDec); err != nil {
return err
}

default:
return errors.ErrNotSupported(string(track.MimeType))
return errors.ErrNotSupported(string(ts.MimeType))
}

if err := addAudioConverter(b, p); err != nil {
if err := addAudioConverter(appSrcBin, b.conf); err != nil {
return err
}

if err := b.bin.AddSourceBin(appSrcBin); err != nil {
return err
}

return nil
}

func buildAudioTestSrcBin(audioBin *gstreamer.Bin, p *config.PipelineConfig) error {
b := audioBin.NewBin("audio_test_src")
if err := audioBin.AddSourceBin(b); err != nil {
func (b *AudioBin) addAudioTestSrcBin() error {
testSrcBin := b.bin.NewBin("audio_test_src")
if err := b.bin.AddSourceBin(testSrcBin); err != nil {
return err
}

Expand All @@ -183,39 +230,15 @@ func buildAudioTestSrcBin(audioBin *gstreamer.Bin, p *config.PipelineConfig) err
return errors.ErrGstPipelineError(err)
}

audioCaps, err := newAudioCapsFilter(p)
if err != nil {
return err
}

return b.AddElements(audioTestSrc, audioCaps)
}

func addAudioConverter(b *gstreamer.Bin, p *config.PipelineConfig) error {
audioQueue, err := gstreamer.BuildQueue("audio_input_queue", p.Latency, true)
if err != nil {
return err
}

audioConvert, err := gst.NewElement("audioconvert")
if err != nil {
return errors.ErrGstPipelineError(err)
}

audioResample, err := gst.NewElement("audioresample")
if err != nil {
return errors.ErrGstPipelineError(err)
}

capsFilter, err := newAudioCapsFilter(p)
audioCaps, err := newAudioCapsFilter(b.conf)
if err != nil {
return err
}

return b.AddElements(audioQueue, audioConvert, audioResample, capsFilter)
return testSrcBin.AddElements(audioTestSrc, audioCaps)
}

func addAudioMixer(b *gstreamer.Bin, p *config.PipelineConfig) error {
func (b *AudioBin) addMixer() error {
audioMixer, err := gst.NewElement("audiomixer")
if err != nil {
return errors.ErrGstPipelineError(err)
Expand All @@ -224,44 +247,68 @@ func addAudioMixer(b *gstreamer.Bin, p *config.PipelineConfig) error {
return errors.ErrGstPipelineError(err)
}

mixedCaps, err := newAudioCapsFilter(p)
mixedCaps, err := newAudioCapsFilter(b.conf)
if err != nil {
return err
}

return b.AddElements(audioMixer, mixedCaps)
return b.bin.AddElements(audioMixer, mixedCaps)
}

func addAudioEncoder(b *gstreamer.Bin, p *config.PipelineConfig) error {
switch p.AudioOutCodec {
func (b *AudioBin) addEncoder() error {
switch b.conf.AudioOutCodec {
case types.MimeTypeOpus:
opusEnc, err := gst.NewElement("opusenc")
if err != nil {
return errors.ErrGstPipelineError(err)
}
if err = opusEnc.SetProperty("bitrate", int(p.AudioBitrate*1000)); err != nil {
if err = opusEnc.SetProperty("bitrate", int(b.conf.AudioBitrate*1000)); err != nil {
return errors.ErrGstPipelineError(err)
}
return b.AddElement(opusEnc)
return b.bin.AddElement(opusEnc)

case types.MimeTypeAAC:
faac, err := gst.NewElement("faac")
if err != nil {
return errors.ErrGstPipelineError(err)
}
if err = faac.SetProperty("bitrate", int(p.AudioBitrate*1000)); err != nil {
if err = faac.SetProperty("bitrate", int(b.conf.AudioBitrate*1000)); err != nil {
return errors.ErrGstPipelineError(err)
}
return b.AddElement(faac)
return b.bin.AddElement(faac)

case types.MimeTypeRawAudio:
return nil

default:
return errors.ErrNotSupported(string(p.AudioOutCodec))
return errors.ErrNotSupported(string(b.conf.AudioOutCodec))
}
}

func addAudioConverter(b *gstreamer.Bin, p *config.PipelineConfig) error {
audioQueue, err := gstreamer.BuildQueue("audio_input_queue", p.Latency, true)
if err != nil {
return err
}

audioConvert, err := gst.NewElement("audioconvert")
if err != nil {
return errors.ErrGstPipelineError(err)
}

audioResample, err := gst.NewElement("audioresample")
if err != nil {
return errors.ErrGstPipelineError(err)
}

capsFilter, err := newAudioCapsFilter(p)
if err != nil {
return err
}

return b.AddElements(audioQueue, audioConvert, audioResample, capsFilter)
}

func newAudioCapsFilter(p *config.PipelineConfig) (*gst.Element, error) {
var caps *gst.Caps
switch p.AudioOutCodec {
Expand Down
Loading

0 comments on commit 60762ef

Please sign in to comment.