From 5387da14adb86874521ad2fc1953342629e96bcf Mon Sep 17 00:00:00 2001 From: David Colburn Date: Wed, 16 Aug 2023 13:31:51 -0700 Subject: [PATCH] continue splitting up participant composite --- build/test/entrypoint.sh | 4 ++-- pkg/config/service.go | 5 ++++- pkg/errors/errors.go | 6 +++++- pkg/pipeline/input/bin.go | 12 ++++++++++-- pkg/pipeline/pipeline.go | 1 - pkg/pipeline/source/sdk.go | 2 +- test/ffprobe.go | 2 +- test/track_composite.go | 3 ++- 8 files changed, 25 insertions(+), 10 deletions(-) diff --git a/build/test/entrypoint.sh b/build/test/entrypoint.sh index 0314e373..2cfc9b1d 100755 --- a/build/test/entrypoint.sh +++ b/build/test/entrypoint.sh @@ -24,8 +24,8 @@ pulseaudio -D --verbose --exit-idle-time=-1 --disallow-exit # Run tests if [[ -z ${GITHUB_WORKFLOW+x} ]]; then - exec ./test.test -test.v -test.timeout 20m + exec ./test.test -test.v -test.timeout 30m else go install github.com/gotesttools/gotestfmt/v2/cmd/gotestfmt@latest - exec go tool test2json -p egress ./test.test -test.v -test.timeout 20m 2>&1 | "$HOME"/go/bin/gotestfmt + exec go tool test2json -p egress ./test.test -test.v -test.timeout 30m 2>&1 | "$HOME"/go/bin/gotestfmt fi diff --git a/pkg/config/service.go b/pkg/config/service.go index 757a55e1..04a68829 100644 --- a/pkg/config/service.go +++ b/pkg/config/service.go @@ -21,6 +21,7 @@ import ( "gopkg.in/yaml.v3" "github.com/livekit/egress/pkg/errors" + "github.com/livekit/protocol/logger" "github.com/livekit/protocol/utils" ) @@ -57,10 +58,12 @@ type CPUCostConfig struct { func NewServiceConfig(confString string) (*ServiceConfig, error) { conf := &ServiceConfig{ BaseConfig: BaseConfig{ + Logging: logger.Config{ + Level: "info", + }, ApiKey: os.Getenv("LIVEKIT_API_KEY"), ApiSecret: os.Getenv("LIVEKIT_API_SECRET"), WsUrl: os.Getenv("LIVEKIT_WS_URL"), - LogLevel: "info", }, TemplatePort: defaultTemplatePort, } diff --git a/pkg/errors/errors.go b/pkg/errors/errors.go index 6cc46756..7b4af40d 100644 --- a/pkg/errors/errors.go +++ b/pkg/errors/errors.go @@ -31,7 +31,7 @@ var ( ErrNoCompatibleCodec = psrpc.NewErrorf(psrpc.InvalidArgument, "no supported codec is compatible with all outputs") ErrNoCompatibleFileOutputType = psrpc.NewErrorf(psrpc.InvalidArgument, "no supported file output type is compatible with the selected codecs") ErrResourceExhausted = psrpc.NewErrorf(psrpc.ResourceExhausted, "not enough CPU") - ErrInvalidTrack = psrpc.NewErrorf(psrpc.Internal, "unexpected track type") + ErrSubscriptionFailed = psrpc.NewErrorf(psrpc.Internal, "failed to subscribe to track") ) func New(err string) error { @@ -96,6 +96,10 @@ func ErrTrackNotFound(trackID string) error { return psrpc.NewErrorf(psrpc.NotFound, "track %s not found", trackID) } +func ErrParticipantNotFound(identity string) error { + return psrpc.NewErrorf(psrpc.NotFound, "participant %s not found", identity) +} + func ErrPadLinkFailed(src, sink, status string) error { return psrpc.NewErrorf(psrpc.Internal, "failed to link %s to %s: %s", src, sink, status) } diff --git a/pkg/pipeline/input/bin.go b/pkg/pipeline/input/bin.go index ae2fff0a..2b53b661 100644 --- a/pkg/pipeline/input/bin.go +++ b/pkg/pipeline/input/bin.go @@ -39,6 +39,8 @@ func New(ctx context.Context, pipeline *gst.Pipeline, p *config.PipelineConfig) b := &Bin{ bin: gst.NewBin("input"), } + + // build input if p.AudioEnabled { if err := b.buildAudioInput(p); err != nil { return nil, err @@ -49,6 +51,8 @@ func New(ctx context.Context, pipeline *gst.Pipeline, p *config.PipelineConfig) return nil, err } } + + // add bin to pipeline if err := pipeline.Add(b.bin.Element); err != nil { return nil, errors.ErrGstPipelineError(err) } @@ -59,6 +63,7 @@ func New(ctx context.Context, pipeline *gst.Pipeline, p *config.PipelineConfig) func (b *Bin) buildAudioInput(p *config.PipelineConfig) error { a := &audioInput{} + // build input switch p.SourceType { case types.SourceTypeSDK: if err := a.buildSDKInput(p); err != nil { @@ -71,13 +76,14 @@ func (b *Bin) buildAudioInput(p *config.PipelineConfig) error { } } + // build encoder if p.AudioTranscoding { if err := a.buildEncoder(p); err != nil { return err } } - // Add elements to bin + // add elements to bin if err := b.bin.AddMany(a.src...); err != nil { return errors.ErrGstPipelineError(err) } @@ -100,6 +106,7 @@ func (b *Bin) buildAudioInput(p *config.PipelineConfig) error { func (b *Bin) buildVideoInput(p *config.PipelineConfig) error { v := &videoInput{} + // build input switch p.SourceType { case types.SourceTypeSDK: if err := v.buildSDKInput(p); err != nil { @@ -112,13 +119,14 @@ func (b *Bin) buildVideoInput(p *config.PipelineConfig) error { } } + // build encoder if p.VideoTranscoding { if err := v.buildEncoder(p); err != nil { return err } } - // Add elements to bin + // add elements to bin if err := b.bin.AddMany(v.src...); err != nil { return errors.ErrGstPipelineError(err) } diff --git a/pkg/pipeline/pipeline.go b/pkg/pipeline/pipeline.go index c922fd74..a9fccade 100644 --- a/pkg/pipeline/pipeline.go +++ b/pkg/pipeline/pipeline.go @@ -14,7 +14,6 @@ package pipeline -import "C" import ( "context" "sync" diff --git a/pkg/pipeline/source/sdk.go b/pkg/pipeline/source/sdk.go index ca4a0024..2f2ba2dc 100644 --- a/pkg/pipeline/source/sdk.go +++ b/pkg/pipeline/source/sdk.go @@ -234,7 +234,7 @@ func (s *SDKSource) subscribe(track lksdk.TrackPublication) error { return pub.SetSubscribed(true) } - return errors.ErrInvalidTrack + return errors.ErrSubscriptionFailed } // ----- Callbacks ----- diff --git a/test/ffprobe.go b/test/ffprobe.go index ed5abab7..d10a4eab 100644 --- a/test/ffprobe.go +++ b/test/ffprobe.go @@ -247,7 +247,7 @@ func verify(t *testing.T, in string, p *config.PipelineConfig, res *livekit.Egre d, err := strconv.ParseFloat(frac[1], 64) require.NoError(t, err) require.NotZero(t, d) - require.Less(t, n/d, float64(p.Framerate)*1.05) + require.Less(t, n/d, float64(p.Framerate)*1.5) require.Greater(t, n/d, float64(sourceFramerate)*0.8) } diff --git a/test/track_composite.go b/test/track_composite.go index 9eb4fd92..90c9ea92 100644 --- a/test/track_composite.go +++ b/test/track_composite.go @@ -37,7 +37,8 @@ func (r *Runner) testTrackComposite(t *testing.T) { r.testTrackCompositeMulti(t) } -func (r *Runner) runTrackTest(t *testing.T, name string, audioCodec, videoCodec types.MimeType, +func (r *Runner) runTrackTest( + t *testing.T, name string, audioCodec, videoCodec types.MimeType, f func(t *testing.T, audioTrackID, videoTrackID string), ) { t.Run(name, func(t *testing.T) {