diff --git a/.github/workflows/test-integration.yaml b/.github/workflows/test-integration.yaml index 013764bb..81b2aab1 100644 --- a/.github/workflows/test-integration.yaml +++ b/.github/workflows/test-integration.yaml @@ -81,8 +81,9 @@ jobs: test: needs: build strategy: + fail-fast: false matrix: - integration_type: [room, web, participant, track_composite, track, edge] + integration_type: [file, stream, segments, images, multi, edge] runs-on: buildjet-8vcpu-ubuntu-2204 steps: - uses: shogo82148/actions-setup-redis@v1 diff --git a/test/builder.go b/test/builder.go new file mode 100644 index 00000000..b934e78a --- /dev/null +++ b/test/builder.go @@ -0,0 +1,289 @@ +// Copyright 2024 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package test + +import ( + "path" + "testing" + "time" + + "github.com/livekit/egress/pkg/types" + "github.com/livekit/protocol/livekit" + "github.com/livekit/protocol/rpc" + "github.com/livekit/protocol/utils" +) + +const webUrl = "https://storage.googleapis.com/gtv-videos-bucket/sample/BigBuckBunny.mp4" + +type testCase struct { + name string + requestType types.RequestType + + publishOptions + + // encoding options + encodingOptions *livekit.EncodingOptions + encodingPreset livekit.EncodingOptionsPreset + + *fileOptions + *streamOptions + *segmentOptions + *imageOptions + + multi bool + custom func(*testing.T, *testCase) +} + +type publishOptions struct { + audioCodec types.MimeType + audioDelay time.Duration + audioUnpublish time.Duration + audioRepublish time.Duration + audioOnly bool + audioTrackID string + + videoCodec types.MimeType + videoDelay time.Duration + videoUnpublish time.Duration + videoRepublish time.Duration + videoOnly bool + videoTrackID string +} + +type fileOptions struct { + filename string + fileType livekit.EncodedFileType + outputType types.OutputType +} + +type streamOptions struct { + streamUrls []string + rawFileName string + websocketUrl string + outputType types.OutputType +} + +type segmentOptions struct { + prefix string + playlist string + livePlaylist string + suffix livekit.SegmentedFileSuffix +} + +type imageOptions struct { + prefix string + suffix livekit.ImageFileSuffix +} + +func (r *Runner) build(test *testCase) *rpc.StartEgressRequest { + switch test.requestType { + case types.RequestTypeRoomComposite: + room := &livekit.RoomCompositeEgressRequest{ + RoomName: r.RoomName, + Layout: "speaker", + AudioOnly: test.audioOnly, + VideoOnly: test.videoOnly, + } + if test.encodingOptions != nil { + room.Options = &livekit.RoomCompositeEgressRequest_Advanced{ + Advanced: test.encodingOptions, + } + } else if test.encodingPreset != 0 { + room.Options = &livekit.RoomCompositeEgressRequest_Preset{ + Preset: test.encodingPreset, + } + } + if test.fileOptions != nil { + room.FileOutputs = r.buildFileOutputs(test.fileOptions) + } + if test.streamOptions != nil { + room.StreamOutputs = r.buildStreamOutputs(test.streamOptions) + } + if test.segmentOptions != nil { + room.SegmentOutputs = r.buildSegmentOutputs(test.segmentOptions) + } + if test.imageOptions != nil { + room.ImageOutputs = r.buildImageOutputs(test.imageOptions) + } + return &rpc.StartEgressRequest{ + EgressId: utils.NewGuid(utils.EgressPrefix), + Request: &rpc.StartEgressRequest_RoomComposite{RoomComposite: room}, + } + + case types.RequestTypeWeb: + web := &livekit.WebEgressRequest{ + Url: webUrl, + AudioOnly: test.audioOnly, + VideoOnly: test.videoOnly, + } + if test.encodingOptions != nil { + web.Options = &livekit.WebEgressRequest_Advanced{ + Advanced: test.encodingOptions, + } + } else if test.encodingPreset != 0 { + web.Options = &livekit.WebEgressRequest_Preset{ + Preset: test.encodingPreset, + } + } + if test.fileOptions != nil { + web.FileOutputs = r.buildFileOutputs(test.fileOptions) + } + if test.streamOptions != nil { + web.StreamOutputs = r.buildStreamOutputs(test.streamOptions) + } + if test.segmentOptions != nil { + web.SegmentOutputs = r.buildSegmentOutputs(test.segmentOptions) + } + if test.imageOptions != nil { + web.ImageOutputs = r.buildImageOutputs(test.imageOptions) + } + return &rpc.StartEgressRequest{ + EgressId: utils.NewGuid(utils.EgressPrefix), + Request: &rpc.StartEgressRequest_Web{Web: web}, + } + + case types.RequestTypeParticipant: + participant := &livekit.ParticipantEgressRequest{ + RoomName: r.RoomName, + Identity: r.room.LocalParticipant.Identity(), + } + if test.encodingOptions != nil { + participant.Options = &livekit.ParticipantEgressRequest_Advanced{ + Advanced: test.encodingOptions, + } + } else if test.encodingPreset != 0 { + participant.Options = &livekit.ParticipantEgressRequest_Preset{ + Preset: test.encodingPreset, + } + } + if test.fileOptions != nil { + participant.FileOutputs = r.buildFileOutputs(test.fileOptions) + } + if test.streamOptions != nil { + participant.StreamOutputs = r.buildStreamOutputs(test.streamOptions) + } + if test.segmentOptions != nil { + participant.SegmentOutputs = r.buildSegmentOutputs(test.segmentOptions) + } + if test.imageOptions != nil { + participant.ImageOutputs = r.buildImageOutputs(test.imageOptions) + } + return &rpc.StartEgressRequest{ + EgressId: utils.NewGuid(utils.EgressPrefix), + Request: &rpc.StartEgressRequest_Participant{Participant: participant}, + } + + case types.RequestTypeTrackComposite: + trackComposite := &livekit.TrackCompositeEgressRequest{ + RoomName: r.RoomName, + AudioTrackId: test.audioTrackID, + VideoTrackId: test.videoTrackID, + } + if test.encodingOptions != nil { + trackComposite.Options = &livekit.TrackCompositeEgressRequest_Advanced{ + Advanced: test.encodingOptions, + } + } else if test.encodingPreset != 0 { + trackComposite.Options = &livekit.TrackCompositeEgressRequest_Preset{ + Preset: test.encodingPreset, + } + } + if test.fileOptions != nil { + trackComposite.FileOutputs = r.buildFileOutputs(test.fileOptions) + } + if test.streamOptions != nil { + trackComposite.StreamOutputs = r.buildStreamOutputs(test.streamOptions) + } + if test.segmentOptions != nil { + trackComposite.SegmentOutputs = r.buildSegmentOutputs(test.segmentOptions) + } + if test.imageOptions != nil { + trackComposite.ImageOutputs = r.buildImageOutputs(test.imageOptions) + } + return &rpc.StartEgressRequest{ + EgressId: utils.NewGuid(utils.EgressPrefix), + Request: &rpc.StartEgressRequest_TrackComposite{TrackComposite: trackComposite}, + } + + case types.RequestTypeTrack: + trackID := test.audioTrackID + if trackID == "" { + trackID = test.videoTrackID + } + track := &livekit.TrackEgressRequest{ + RoomName: r.RoomName, + TrackId: trackID, + } + if test.fileOptions != nil { + track.Output = &livekit.TrackEgressRequest_File{ + File: &livekit.DirectFileOutput{ + Filepath: path.Join(r.FilePrefix, test.fileOptions.filename), + }, + } + } else if test.streamOptions != nil { + track.Output = &livekit.TrackEgressRequest_WebsocketUrl{ + WebsocketUrl: test.streamOptions.websocketUrl, + } + } + return &rpc.StartEgressRequest{ + EgressId: utils.NewGuid(utils.EgressPrefix), + Request: &rpc.StartEgressRequest_Track{Track: track}, + } + } + + panic("unknown request type") +} + +func (r *Runner) buildFileOutputs(o *fileOptions) []*livekit.EncodedFileOutput { + return []*livekit.EncodedFileOutput{{ + FileType: o.fileType, + Filepath: path.Join(r.FilePrefix, o.filename), + }} +} + +func (r *Runner) buildStreamOutputs(o *streamOptions) []*livekit.StreamOutput { + var protocol livekit.StreamProtocol + switch o.outputType { + case types.OutputTypeRTMP: + protocol = livekit.StreamProtocol_RTMP + case types.OutputTypeSRT: + protocol = livekit.StreamProtocol_SRT + default: + protocol = livekit.StreamProtocol_DEFAULT_PROTOCOL + } + return []*livekit.StreamOutput{{ + Protocol: protocol, + Urls: o.streamUrls, + }} +} + +func (r *Runner) buildSegmentOutputs(o *segmentOptions) []*livekit.SegmentedFileOutput { + return []*livekit.SegmentedFileOutput{{ + FilenamePrefix: path.Join(r.FilePrefix, o.prefix), + PlaylistName: o.playlist, + LivePlaylistName: o.livePlaylist, + FilenameSuffix: o.suffix, + }} +} + +func (r *Runner) buildImageOutputs(o *imageOptions) []*livekit.ImageOutput { + return []*livekit.ImageOutput{{ + CaptureInterval: 5, + Width: 1280, + Height: 720, + FilenamePrefix: path.Join(r.FilePrefix, o.prefix), + FilenameSuffix: o.suffix, + }} +} diff --git a/test/edge.go b/test/edge.go index 8360d4c4..70e6438c 100644 --- a/test/edge.go +++ b/test/edge.go @@ -16,7 +16,6 @@ package test import ( "context" - "path" "testing" "time" @@ -25,8 +24,6 @@ import ( "github.com/livekit/egress/pkg/config" "github.com/livekit/egress/pkg/types" "github.com/livekit/protocol/livekit" - "github.com/livekit/protocol/rpc" - "github.com/livekit/protocol/utils" lksdk "github.com/livekit/server-sdk-go/v2" ) @@ -36,263 +33,235 @@ func (r *Runner) testEdgeCases(t *testing.T) { } t.Run("EdgeCases", func(t *testing.T) { - r.testParticipantNoPublish(t) - r.testRoomCompositeStaysOpen(t) - r.testRtmpFailure(t) - r.testSrtFailure(t) - r.testTrackDisconnection(t) - r.testEmptyStreamBin(t) - }) -} + for _, test := range []*testCase{ -// ParticipantComposite where the participant never publishes -func (r *Runner) testParticipantNoPublish(t *testing.T) { - r.runParticipantTest(t, "ParticipantNoPublish", &testCase{}, - func(t *testing.T, identity string) { - req := &rpc.StartEgressRequest{ - EgressId: utils.NewGuid(utils.EgressPrefix), - Request: &rpc.StartEgressRequest_Participant{ - Participant: &livekit.ParticipantEgressRequest{ - RoomName: r.room.Name(), - Identity: identity, - FileOutputs: []*livekit.EncodedFileOutput{{ - FileType: livekit.EncodedFileType_MP4, - }}, - }, + // ParticipantComposite where the participant never publishes + + { + name: "ParticipantNoPublish", + requestType: types.RequestTypeParticipant, + fileOptions: &fileOptions{ + filename: "participant_no_publish_{time}.mp4", }, - } + custom: r.testParticipantNoPublish, + }, - info := r.sendRequest(t, req) - time.Sleep(time.Second * 15) - r.room.Disconnect() - time.Sleep(time.Second * 30) - info = r.getUpdate(t, info.EgressId) - require.Equal(t, livekit.EgressStatus_EGRESS_ABORTED.String(), info.Status.String()) - - // reconnect the publisher to the room - room, err := lksdk.ConnectToRoom(r.WsUrl, lksdk.ConnectInfo{ - APIKey: r.ApiKey, - APISecret: r.ApiSecret, - RoomName: r.RoomName, - ParticipantName: "egress-sample", - ParticipantIdentity: identity, - }, lksdk.NewRoomCallback()) - require.NoError(t, err) - r.room = room - }, - ) -} + // Test that the egress continues if a user leaves -// Test that the egress continues if a user leaves -func (r *Runner) testRoomCompositeStaysOpen(t *testing.T) { - r.run(t, "RoomCompositeStaysOpen", func(t *testing.T) { - req := &rpc.StartEgressRequest{ - EgressId: utils.NewGuid(utils.EgressPrefix), - Request: &rpc.StartEgressRequest_RoomComposite{ - RoomComposite: &livekit.RoomCompositeEgressRequest{ - RoomName: r.RoomName, - Layout: "speaker", - FileOutputs: []*livekit.EncodedFileOutput{{ - Filepath: path.Join(r.FilePrefix, "room_composite_duration_{time}.mp4"), - }}, + { + name: "RoomCompositeStaysOpen", + requestType: types.RequestTypeRoomComposite, + fileOptions: &fileOptions{ + filename: "room_composite_stays_open_{time}.mp4", }, + custom: r.testRoomCompositeStaysOpen, }, - } - info := r.sendRequest(t, req) - time.Sleep(time.Second * 10) - identity := r.room.LocalParticipant.Identity() - r.room.Disconnect() - time.Sleep(time.Second * 10) - - // reconnect the publisher to the room - room, err := lksdk.ConnectToRoom(r.WsUrl, lksdk.ConnectInfo{ - APIKey: r.ApiKey, - APISecret: r.ApiSecret, - RoomName: r.RoomName, - ParticipantName: "egress-sample", - ParticipantIdentity: identity, - }, lksdk.NewRoomCallback()) - require.NoError(t, err) - r.room = room - - r.publishSamples(t, types.MimeTypeOpus, types.MimeTypeVP8) - time.Sleep(time.Second * 10) - - r.checkUpdate(t, info.EgressId, livekit.EgressStatus_EGRESS_ACTIVE) - r.stopEgress(t, info.EgressId) - }) -} + // RTMP output with no valid urls -// RTMP output with no valid urls -func (r *Runner) testRtmpFailure(t *testing.T) { - r.runRoomTest(t, "RtmpFailure", types.MimeTypeOpus, types.MimeTypeVP8, func(t *testing.T) { - req := &rpc.StartEgressRequest{ - EgressId: utils.NewGuid(utils.EgressPrefix), - Request: &rpc.StartEgressRequest_RoomComposite{ - RoomComposite: &livekit.RoomCompositeEgressRequest{ - RoomName: r.RoomName, - Layout: "speaker-light", - StreamOutputs: []*livekit.StreamOutput{{ - Protocol: livekit.StreamProtocol_RTMP, - Urls: []string{badRtmpUrl1}, - }}, + { + name: "RtmpFailure", + requestType: types.RequestTypeRoomComposite, + publishOptions: publishOptions{ + audioCodec: types.MimeTypeOpus, + videoCodec: types.MimeTypeH264, }, + streamOptions: &streamOptions{ + streamUrls: []string{badRtmpUrl1}, + outputType: types.OutputTypeRTMP, + }, + custom: r.testRtmpFailure, }, - } - info, err := r.StartEgress(context.Background(), req) - require.NoError(t, err) - require.Empty(t, info.Error) - require.NotEmpty(t, info.EgressId) - require.Equal(t, r.RoomName, info.RoomName) - require.Equal(t, livekit.EgressStatus_EGRESS_STARTING, info.Status) + // SRT output with no valid urls - // check updates - time.Sleep(time.Second * 5) - info = r.getUpdate(t, info.EgressId) - streamFailed := false - for info.Status == livekit.EgressStatus_EGRESS_ACTIVE { - if !streamFailed && info.StreamResults[0].Status == livekit.StreamInfo_FAILED { - streamFailed = true - } - if streamFailed { - // make sure this never reverts in subsequent updates - require.Equal(t, livekit.StreamInfo_FAILED, info.StreamResults[0].Status) - } - info = r.getUpdate(t, info.EgressId) - } + { + name: "SrtFailure", + requestType: types.RequestTypeWeb, + streamOptions: &streamOptions{ + streamUrls: []string{badSrtUrl1}, + outputType: types.OutputTypeSRT, + }, + custom: r.testSrtFailure, + }, - require.Equal(t, livekit.EgressStatus_EGRESS_FAILED, info.Status) - require.NotEmpty(t, info.Error) - require.Equal(t, livekit.StreamInfo_FAILED, info.StreamResults[0].Status) - require.NotEmpty(t, info.StreamResults[0].Error) - }) -} + // Track composite with data loss due to a disconnection -// SRT output with a no valid urls -func (r *Runner) testSrtFailure(t *testing.T) { - r.run(t, "SrtFailure", func(t *testing.T) { - req := &rpc.StartEgressRequest{ - EgressId: utils.NewGuid(utils.EgressPrefix), - Request: &rpc.StartEgressRequest_Web{ - Web: &livekit.WebEgressRequest{ - Url: webUrl, - StreamOutputs: []*livekit.StreamOutput{{ - Protocol: livekit.StreamProtocol_SRT, - Urls: []string{badSrtUrl1}, - }}, + { + name: "TrackDisconnection", + requestType: types.RequestTypeTrackComposite, + publishOptions: publishOptions{ + audioCodec: types.MimeTypeOpus, + }, + fileOptions: &fileOptions{ + filename: "track_disconnection_{time}.mp4", + fileType: livekit.EncodedFileType_MP4, }, + custom: r.testTrackDisconnection, }, - } - info, err := r.StartEgress(context.Background(), req) - require.NoError(t, err) - require.Empty(t, info.Error) - require.NotEmpty(t, info.EgressId) - require.Equal(t, livekit.EgressStatus_EGRESS_STARTING, info.Status) + // Stream output with no urls - // check update - time.Sleep(time.Second * 5) - info = r.getUpdate(t, info.EgressId) - if info.Status == livekit.EgressStatus_EGRESS_ACTIVE { - r.checkUpdate(t, info.EgressId, livekit.EgressStatus_EGRESS_FAILED) - } else { - require.Equal(t, livekit.EgressStatus_EGRESS_FAILED, info.Status) + { + name: "EmptyStreamBin", + requestType: types.RequestTypeRoomComposite, + publishOptions: publishOptions{ + audioCodec: types.MimeTypeOpus, + videoCodec: types.MimeTypeVP8, + }, + streamOptions: &streamOptions{ + streamUrls: []string{rtmpUrl1, badRtmpUrl1}, + outputType: types.OutputTypeRTMP, + }, + segmentOptions: &segmentOptions{ + prefix: "empty_stream_{time}", + playlist: "empty_stream_{time}", + }, + custom: r.testEmptyStreamBin, + }, + } { + r.run(t, test, test.custom) + if r.Short { + return + } } }) - } -// Track composite with data loss due to a disconnection -func (r *Runner) testTrackDisconnection(t *testing.T) { - r.run(t, "TrackDisconnection", func(t *testing.T) { - test := &testCase{ - fileType: livekit.EncodedFileType_MP4, - audioCodec: types.MimeTypeOpus, - videoCodec: types.MimeTypeVP8, - filename: "track_disconnection_{time}.mp4", - } +func (r *Runner) testParticipantNoPublish(t *testing.T, test *testCase) { + identity := r.room.LocalParticipant.Identity() + + req := r.build(test) + + info := r.sendRequest(t, req) + time.Sleep(time.Second * 15) + r.room.Disconnect() + time.Sleep(time.Second * 30) + info = r.getUpdate(t, info.EgressId) + require.Equal(t, livekit.EgressStatus_EGRESS_ABORTED.String(), info.Status.String()) + + // reconnect the publisher to the room + room, err := lksdk.ConnectToRoom(r.WsUrl, lksdk.ConnectInfo{ + APIKey: r.ApiKey, + APISecret: r.ApiSecret, + RoomName: r.RoomName, + ParticipantName: "egress-sample", + ParticipantIdentity: identity, + }, lksdk.NewRoomCallback()) + require.NoError(t, err) + r.room = room +} - audioTrackID := r.publishSample(t, test.audioCodec, false) - videoTrackID := r.publishSampleWithDisconnection(t, test.videoCodec) +func (r *Runner) testRoomCompositeStaysOpen(t *testing.T, test *testCase) { + req := r.build(test) + + info := r.sendRequest(t, req) + time.Sleep(time.Second * 10) + identity := r.room.LocalParticipant.Identity() + r.room.Disconnect() + time.Sleep(time.Second * 10) + + // reconnect the publisher to the room + room, err := lksdk.ConnectToRoom(r.WsUrl, lksdk.ConnectInfo{ + APIKey: r.ApiKey, + APISecret: r.ApiSecret, + RoomName: r.RoomName, + ParticipantName: "egress-sample", + ParticipantIdentity: identity, + }, lksdk.NewRoomCallback()) + require.NoError(t, err) + r.room = room + + r.publishSample(t, types.MimeTypeOpus, 0, 0, false) + r.publishSample(t, types.MimeTypeVP8, 0, 0, false) + + time.Sleep(time.Second * 10) + + r.checkUpdate(t, info.EgressId, livekit.EgressStatus_EGRESS_ACTIVE) + r.stopEgress(t, info.EgressId) +} - var fileOutput *livekit.EncodedFileOutput - if r.AzureUpload != nil { - fileOutput = &livekit.EncodedFileOutput{ - FileType: test.fileType, - Filepath: path.Join(uploadPrefix, test.filename), - Output: &livekit.EncodedFileOutput_Azure{ - Azure: r.AzureUpload, - }, - } - } else { - fileOutput = &livekit.EncodedFileOutput{ - FileType: test.fileType, - Filepath: path.Join(r.FilePrefix, test.filename), - } +func (r *Runner) testRtmpFailure(t *testing.T, test *testCase) { + req := r.build(test) + + info, err := r.StartEgress(context.Background(), req) + require.NoError(t, err) + require.Empty(t, info.Error) + require.NotEmpty(t, info.EgressId) + require.Equal(t, r.RoomName, info.RoomName) + require.Equal(t, livekit.EgressStatus_EGRESS_STARTING, info.Status) + + // check updates + time.Sleep(time.Second * 5) + info = r.getUpdate(t, info.EgressId) + streamFailed := false + for info.Status == livekit.EgressStatus_EGRESS_ACTIVE { + if !streamFailed && info.StreamResults[0].Status == livekit.StreamInfo_FAILED { + streamFailed = true } - - req := &rpc.StartEgressRequest{ - EgressId: utils.NewGuid(utils.EgressPrefix), - Request: &rpc.StartEgressRequest_TrackComposite{ - TrackComposite: &livekit.TrackCompositeEgressRequest{ - RoomName: r.room.Name(), - AudioTrackId: audioTrackID, - VideoTrackId: videoTrackID, - FileOutputs: []*livekit.EncodedFileOutput{fileOutput}, - }, - }, + if streamFailed { + // make sure this never reverts in subsequent updates + require.Equal(t, livekit.StreamInfo_FAILED, info.StreamResults[0].Status) } + info = r.getUpdate(t, info.EgressId) + } - test.expectVideoEncoding = true - r.runFileTest(t, req, test) - }) + require.Equal(t, livekit.EgressStatus_EGRESS_FAILED, info.Status) + require.NotEmpty(t, info.Error) + require.Equal(t, livekit.StreamInfo_FAILED, info.StreamResults[0].Status) + require.NotEmpty(t, info.StreamResults[0].Error) } -func (r *Runner) testEmptyStreamBin(t *testing.T) { - r.runRoomTest(t, "Multi", types.MimeTypeOpus, types.MimeTypeVP8, func(t *testing.T) { - req := &rpc.StartEgressRequest{ - EgressId: utils.NewGuid(utils.EgressPrefix), - Request: &rpc.StartEgressRequest_RoomComposite{ - RoomComposite: &livekit.RoomCompositeEgressRequest{ - RoomName: r.room.Name(), - Layout: "grid-light", - StreamOutputs: []*livekit.StreamOutput{{ - Urls: []string{rtmpUrl1, badRtmpUrl1}, - }}, - SegmentOutputs: []*livekit.SegmentedFileOutput{{ - FilenamePrefix: path.Join(r.FilePrefix, "empty_stream_{time}"), - PlaylistName: "empty_stream_{time}", - }}, - }, - }, - } +func (r *Runner) testSrtFailure(t *testing.T, test *testCase) { + req := r.build(test) + + info, err := r.StartEgress(context.Background(), req) + require.NoError(t, err) + require.Empty(t, info.Error) + require.NotEmpty(t, info.EgressId) + require.Equal(t, livekit.EgressStatus_EGRESS_STARTING, info.Status) + + // check update + time.Sleep(time.Second * 5) + info = r.getUpdate(t, info.EgressId) + if info.Status == livekit.EgressStatus_EGRESS_ACTIVE { + r.checkUpdate(t, info.EgressId, livekit.EgressStatus_EGRESS_FAILED) + } else { + require.Equal(t, livekit.EgressStatus_EGRESS_FAILED, info.Status) + } +} + +func (r *Runner) testTrackDisconnection(t *testing.T, test *testCase) { + test.videoTrackID = r.publishSampleWithDisconnection(t, types.MimeTypeVP8) + r.runFileTest(t, test) +} - info := r.sendRequest(t, req) - egressID := info.EgressId - time.Sleep(time.Second * 15) - - // get params - p, err := config.GetValidatedPipelineConfig(r.ServiceConfig, req) - require.NoError(t, err) - - r.checkStreamUpdate(t, egressID, map[string]livekit.StreamInfo_Status{ - rtmpUrl1Redacted: livekit.StreamInfo_ACTIVE, - badRtmpUrl1Redacted: livekit.StreamInfo_FAILED, - }) - _, err = r.client.UpdateStream(context.Background(), egressID, &livekit.UpdateStreamRequest{ - EgressId: egressID, - RemoveOutputUrls: []string{rtmpUrl1}, - }) - require.NoError(t, err) - r.checkStreamUpdate(t, egressID, map[string]livekit.StreamInfo_Status{ - rtmpUrl1Redacted: livekit.StreamInfo_FINISHED, - badRtmpUrl1Redacted: livekit.StreamInfo_FAILED, - }) - - time.Sleep(time.Second * 10) - res := r.stopEgress(t, egressID) - r.verifySegments(t, p, livekit.SegmentedFileSuffix_INDEX, res, false) +func (r *Runner) testEmptyStreamBin(t *testing.T, test *testCase) { + req := r.build(test) + + info := r.sendRequest(t, req) + egressID := info.EgressId + time.Sleep(time.Second * 15) + + // get params + p, err := config.GetValidatedPipelineConfig(r.ServiceConfig, req) + require.NoError(t, err) + + r.checkStreamUpdate(t, egressID, map[string]livekit.StreamInfo_Status{ + rtmpUrl1Redacted: livekit.StreamInfo_ACTIVE, + badRtmpUrl1Redacted: livekit.StreamInfo_FAILED, }) + _, err = r.client.UpdateStream(context.Background(), egressID, &livekit.UpdateStreamRequest{ + EgressId: egressID, + RemoveOutputUrls: []string{rtmpUrl1}, + }) + require.NoError(t, err) + r.checkStreamUpdate(t, egressID, map[string]livekit.StreamInfo_Status{ + rtmpUrl1Redacted: livekit.StreamInfo_FINISHED, + badRtmpUrl1Redacted: livekit.StreamInfo_FAILED, + }) + + time.Sleep(time.Second * 10) + res := r.stopEgress(t, egressID) + r.verifySegments(t, p, livekit.SegmentedFileSuffix_INDEX, res, false) } diff --git a/test/file.go b/test/file.go index b77dc496..0b0ca610 100644 --- a/test/file.go +++ b/test/file.go @@ -27,10 +27,195 @@ import ( "github.com/livekit/egress/pkg/config" "github.com/livekit/egress/pkg/types" "github.com/livekit/protocol/livekit" - "github.com/livekit/protocol/rpc" ) -func (r *Runner) runFileTest(t *testing.T, req *rpc.StartEgressRequest, test *testCase) { +func (r *Runner) testFile(t *testing.T) { + if !r.should(runFile) { + return + } + + t.Run("File", func(t *testing.T) { + for _, test := range []*testCase{ + + // ---- Room Composite ----- + + { + name: "RoomComposite/Base", + requestType: types.RequestTypeRoomComposite, publishOptions: publishOptions{ + audioCodec: types.MimeTypeOpus, + videoCodec: types.MimeTypeH264, + }, + fileOptions: &fileOptions{ + filename: "r_{room_name}_{time}.mp4", + }, + }, + { + name: "RoomComposite/VideoOnly", + requestType: types.RequestTypeRoomComposite, publishOptions: publishOptions{ + videoCodec: types.MimeTypeH264, + videoOnly: true, + }, + encodingOptions: &livekit.EncodingOptions{ + VideoCodec: livekit.VideoCodec_H264_HIGH, + }, + fileOptions: &fileOptions{ + filename: "r_{room_name}_video_{time}.mp4", + }, + }, + { + name: "RoomComposite/AudioOnly", + requestType: types.RequestTypeRoomComposite, publishOptions: publishOptions{ + audioCodec: types.MimeTypeOpus, + audioOnly: true, + }, + encodingOptions: &livekit.EncodingOptions{ + AudioCodec: livekit.AudioCodec_OPUS, + }, + fileOptions: &fileOptions{ + filename: "r_{room_name}_audio_{time}", + fileType: livekit.EncodedFileType_OGG, + }, + }, + + // ---------- Web ---------- + + { + name: "Web", + publishOptions: publishOptions{ + videoOnly: true, + }, + requestType: types.RequestTypeWeb, + fileOptions: &fileOptions{ + filename: "web_{time}", + }, + }, + + // ------ Participant ------ + + { + name: "ParticipantComposite/VP8", + requestType: types.RequestTypeParticipant, publishOptions: publishOptions{ + audioCodec: types.MimeTypeOpus, + audioDelay: time.Second * 8, + audioUnpublish: time.Second * 14, + audioRepublish: time.Second * 20, + videoCodec: types.MimeTypeVP8, + }, + fileOptions: &fileOptions{ + filename: "participant_{publisher_identity}_vp8_{time}.mp4", + fileType: livekit.EncodedFileType_MP4, + }, + }, + { + name: "ParticipantComposite/H264", + requestType: types.RequestTypeParticipant, publishOptions: publishOptions{ + audioCodec: types.MimeTypeOpus, + videoCodec: types.MimeTypeH264, + videoUnpublish: time.Second * 10, + videoRepublish: time.Second * 20, + }, + fileOptions: &fileOptions{ + filename: "participant_{room_name}_h264_{time}.mp4", + fileType: livekit.EncodedFileType_MP4, + }, + }, + { + name: "ParticipantComposite/AudioOnly", + requestType: types.RequestTypeParticipant, publishOptions: publishOptions{ + audioCodec: types.MimeTypeOpus, + audioUnpublish: time.Second * 10, + audioRepublish: time.Second * 15, + }, + fileOptions: &fileOptions{ + filename: "participant_{room_name}_{time}.mp4", + fileType: livekit.EncodedFileType_MP4, + }, + }, + + // ---- Track Composite ---- + + { + name: "TrackComposite/VP8", + requestType: types.RequestTypeTrackComposite, publishOptions: publishOptions{ + audioCodec: types.MimeTypeOpus, + videoCodec: types.MimeTypeVP8, + }, + fileOptions: &fileOptions{ + filename: "tc_{publisher_identity}_vp8_{time}.mp4", + fileType: livekit.EncodedFileType_MP4, + }, + }, + { + name: "TrackComposite/VideoOnly", + requestType: types.RequestTypeTrackComposite, + publishOptions: publishOptions{ + videoCodec: types.MimeTypeH264, + videoOnly: true, + }, + fileOptions: &fileOptions{ + filename: "tc_{room_name}_video_{time}.mp4", + fileType: livekit.EncodedFileType_MP4, + }, + }, + + // --------- Track --------- + + { + name: "Track/Opus", + requestType: types.RequestTypeTrack, + publishOptions: publishOptions{ + audioCodec: types.MimeTypeOpus, + videoOnly: true, + }, + fileOptions: &fileOptions{ + filename: "t_{track_source}_{time}.ogg", + outputType: types.OutputTypeOGG, + }, + }, + { + name: "Track/H264", + requestType: types.RequestTypeTrack, + publishOptions: publishOptions{ + videoCodec: types.MimeTypeH264, + videoOnly: true, + }, + + fileOptions: &fileOptions{ + filename: "t_{track_id}_{time}.mp4", + outputType: types.OutputTypeMP4, + }, + }, + { + name: "Track/VP8", + requestType: types.RequestTypeTrack, + publishOptions: publishOptions{ + videoCodec: types.MimeTypeVP8, + videoOnly: true, + }, + fileOptions: &fileOptions{ + filename: "t_{track_type}_{time}.webm", + outputType: types.OutputTypeWebM, + }, + }, + // { + // name: "Track/VP9", + // videoOnly: true, + // videoCodec: types.MimeTypeVP9, + // outputType: types.OutputTypeWebM, + // filename: "t_{track_type}_{time}.webm", + // }, + } { + r.run(t, test, r.runFileTest) + if r.Short { + return + } + } + }) +} + +func (r *Runner) runFileTest(t *testing.T, test *testCase) { + req := r.build(test) + // start egressID := r.startEgress(t, req) @@ -47,10 +232,10 @@ func (r *Runner) runFileTest(t *testing.T, req *rpc.StartEgressRequest, test *te p, err := config.GetValidatedPipelineConfig(r.ServiceConfig, req) require.NoError(t, err) if p.GetFileConfig().OutputType == types.OutputTypeUnknownFile { - p.GetFileConfig().OutputType = test.outputType + p.GetFileConfig().OutputType = test.fileOptions.outputType } - require.Equal(t, test.expectVideoEncoding, p.VideoEncoding) + require.Equal(t, test.requestType != types.RequestTypeTrack && !test.audioOnly, p.VideoEncoding) // verify r.verifyFile(t, p, res) diff --git a/test/flags.go b/test/flags.go index 87cd40c8..ea0b620d 100644 --- a/test/flags.go +++ b/test/flags.go @@ -14,23 +14,35 @@ package test +import "github.com/livekit/egress/pkg/types" + const ( runRoom = 0b1 << 0 runWeb = 0b1 << 1 runParticipant = 0b1 << 2 runTrackComposite = 0b1 << 3 runTrack = 0b1 << 4 - runEdge = 0b1 << 5 - runAllRequests = 0b111111 - runFile = 0b1 << 31 - runStream = 0b1 << 30 - runSegments = 0b1 << 29 - runImages = 0b1 << 28 - runMulti = 0b1 << 27 - runAllOutputs = 0b11111 << 27 + runAllRequests = 0b11111 + + runFile = 0b1 << 31 + runStream = 0b1 << 30 + runSegments = 0b1 << 29 + runImages = 0b1 << 28 + runMulti = 0b1 << 27 + runEdge = 0b1 << 26 + + runAllOutputs = 0b111111 << 26 ) +var runRequestType = map[types.RequestType]uint{ + types.RequestTypeRoomComposite: runRoom, + types.RequestTypeWeb: runWeb, + types.RequestTypeParticipant: runParticipant, + types.RequestTypeTrackComposite: runTrackComposite, + types.RequestTypeTrack: runTrack, +} + func (r *Runner) updateFlagset() { switch { case r.RoomTestsOnly: @@ -43,8 +55,6 @@ func (r *Runner) updateFlagset() { r.shouldRun |= runTrackComposite case r.TrackTestsOnly: r.shouldRun |= runTrack - case r.EdgeCasesOnly: - r.shouldRun |= runEdge default: r.shouldRun |= runAllRequests } @@ -60,6 +70,8 @@ func (r *Runner) updateFlagset() { r.shouldRun |= runImages case r.MultiTestsOnly: r.shouldRun |= runMulti + case r.EdgeCasesOnly: + r.shouldRun |= runEdge default: r.shouldRun |= runAllOutputs } diff --git a/test/images.go b/test/images.go index f93dace8..a60c5f37 100644 --- a/test/images.go +++ b/test/images.go @@ -25,11 +25,62 @@ import ( "github.com/stretchr/testify/require" "github.com/livekit/egress/pkg/config" + "github.com/livekit/egress/pkg/types" "github.com/livekit/protocol/livekit" - "github.com/livekit/protocol/rpc" ) -func (r *Runner) runImagesTest(t *testing.T, req *rpc.StartEgressRequest, test *testCase) { +func (r *Runner) testImages(t *testing.T) { + if !r.should(runImages) { + return + } + + t.Run("Images", func(t *testing.T) { + for _, test := range []*testCase{ + + // ---- Room Composite ----- + + { + name: "RoomComposite", + requestType: types.RequestTypeRoomComposite, + publishOptions: publishOptions{ + audioCodec: types.MimeTypeOpus, + videoCodec: types.MimeTypeH264, + }, + encodingOptions: &livekit.EncodingOptions{ + Width: 640, + Height: 360, + }, + imageOptions: &imageOptions{ + prefix: "r_{room_name}_{time}", + suffix: livekit.ImageFileSuffix_IMAGE_SUFFIX_TIMESTAMP, + }, + }, + + // ---- Track Composite ---- + + { + name: "TrackComposite/H264", + requestType: types.RequestTypeTrackComposite, + publishOptions: publishOptions{ + audioCodec: types.MimeTypeOpus, + videoCodec: types.MimeTypeH264, + }, + imageOptions: &imageOptions{ + prefix: "tc_{publisher_identity}_h264", + }, + }, + } { + r.run(t, test, r.runImagesTest) + if r.Short { + return + } + } + }) +} + +func (r *Runner) runImagesTest(t *testing.T, test *testCase) { + req := r.build(test) + egressID := r.startEgress(t, req) time.Sleep(time.Second * 10) diff --git a/test/integration.go b/test/integration.go index ab3f5e45..fe1380b1 100644 --- a/test/integration.go +++ b/test/integration.go @@ -34,58 +34,48 @@ import ( var uploadPrefix = fmt.Sprintf("integration/%s", time.Now().Format("2006-01-02")) -type testCase struct { - name string - audioOnly bool - videoOnly bool - filename string - - // used by room and track composite tests - fileType livekit.EncodedFileType - options *livekit.EncodingOptions - preset livekit.EncodingOptionsPreset - - // used by segmented file tests - playlist string - livePlaylist string - filenameSuffix livekit.SegmentedFileSuffix - - // used by images tests - imageFilenameSuffix livekit.ImageFileSuffix - - // used by sdk tests - audioCodec types.MimeType - audioDelay time.Duration - audioUnpublish time.Duration - audioRepublish time.Duration - - videoCodec types.MimeType - videoDelay time.Duration - videoUnpublish time.Duration - videoRepublish time.Duration - - // used by track and stream tests - outputType types.OutputType - - expectVideoEncoding bool -} - func (r *Runner) RunTests(t *testing.T) { // run tests - r.testRoomComposite(t) - r.testWeb(t) - r.testParticipant(t) - r.testTrackComposite(t) - r.testTrack(t) + r.testFile(t) + r.testStream(t) + r.testSegments(t) + r.testImages(t) + r.testMulti(t) r.testEdgeCases(t) } var testNumber int -func (r *Runner) run(t *testing.T, name string, f func(t *testing.T)) { +func (r *Runner) run(t *testing.T, test *testCase, f func(*testing.T, *testCase)) { + if !r.should(runRequestType[test.requestType]) { + return + } + + switch test.requestType { + case types.RequestTypeRoomComposite, types.RequestTypeWeb: + r.sourceFramerate = 30 + case types.RequestTypeParticipant, types.RequestTypeTrackComposite, types.RequestTypeTrack: + r.sourceFramerate = 23.97 + } + r.awaitIdle(t) + testNumber++ - t.Run(fmt.Sprintf("%d/%s", testNumber, name), f) + t.Run(fmt.Sprintf("%d/%s", testNumber, test.name), func(t *testing.T) { + audioMuting := r.Muting + videoMuting := r.Muting && test.audioCodec == "" + + test.audioTrackID = r.publishSample(t, test.audioCodec, test.audioDelay, test.audioUnpublish, audioMuting) + if test.audioRepublish != 0 { + r.publishSample(t, test.audioCodec, test.audioRepublish, 0, audioMuting) + } + test.videoTrackID = r.publishSample(t, test.videoCodec, test.videoDelay, test.videoUnpublish, videoMuting) + if test.videoRepublish != 0 { + r.publishSample(t, test.videoCodec, test.videoRepublish, 0, videoMuting) + } + + f(t, test) + }) } func (r *Runner) awaitIdle(t *testing.T) { @@ -154,18 +144,12 @@ func (r *Runner) checkStreamUpdate(t *testing.T, egressID string, expected map[s for _, s := range info.StreamResults { require.Equal(t, s.Status == livekit.StreamInfo_FAILED, s.Error != "") - var e livekit.StreamInfo_Status - if strings.HasSuffix(s.Url, ".contribute.live-video.net/app/{f...1}") { - e = expected[badRtmpUrl1Redacted] - } else { - e = expected[s.Url] - } - if e == livekit.StreamInfo_FAILED && s.Status == livekit.StreamInfo_ACTIVE { + if expected[s.Url] == livekit.StreamInfo_FAILED && s.Status == livekit.StreamInfo_ACTIVE { failureStillActive = true continue } - require.Equal(t, e, s.Status) + require.Equal(t, expected[s.Url], s.Status) } if !failureStillActive { diff --git a/test/multi.go b/test/multi.go index dc96087d..3dcb288a 100644 --- a/test/multi.go +++ b/test/multi.go @@ -24,16 +24,98 @@ import ( "github.com/stretchr/testify/require" "github.com/livekit/egress/pkg/config" + "github.com/livekit/egress/pkg/types" "github.com/livekit/protocol/livekit" - "github.com/livekit/protocol/rpc" ) -func (r *Runner) runMultipleTest( - t *testing.T, - req *rpc.StartEgressRequest, - file, stream, segments, images bool, - filenameSuffix livekit.SegmentedFileSuffix, -) { +func (r *Runner) testMulti(t *testing.T) { + if !r.should(runMulti) { + return + } + + t.Run("Multi", func(t *testing.T) { + for _, test := range []*testCase{ + + // ---- Room Composite ----- + + { + name: "RoomComposite", + requestType: types.RequestTypeRoomComposite, publishOptions: publishOptions{ + audioCodec: types.MimeTypeOpus, + videoCodec: types.MimeTypeVP8, + }, + fileOptions: &fileOptions{ + filename: "rc_multiple_{time}", + }, + imageOptions: &imageOptions{ + prefix: "rc_image", + }, + multi: true, + }, + + // ---------- Web ---------- + + { + name: "Web", + requestType: types.RequestTypeWeb, + fileOptions: &fileOptions{ + filename: "web_multiple_{time}", + }, + segmentOptions: &segmentOptions{ + prefix: "web_multiple_{time}", + playlist: "web_multiple_{time}.m3u8", + }, + multi: true, + }, + + // ------ Participant ------ + + { + name: "ParticipantComposite", + requestType: types.RequestTypeParticipant, publishOptions: publishOptions{ + audioCodec: types.MimeTypeOpus, + audioUnpublish: time.Second * 20, + videoCodec: types.MimeTypeVP8, + videoDelay: time.Second * 5, + }, + fileOptions: &fileOptions{ + filename: "participant_multiple_{time}", + }, + streamOptions: &streamOptions{ + outputType: types.OutputTypeRTMP, + }, + multi: true, + }, + + // ---- Track Composite ---- + + { + name: "TrackComposite", + requestType: types.RequestTypeTrackComposite, publishOptions: publishOptions{ + audioCodec: types.MimeTypeOpus, + videoCodec: types.MimeTypeVP8, + }, + streamOptions: &streamOptions{ + outputType: types.OutputTypeRTMP, + }, + segmentOptions: &segmentOptions{ + prefix: "tc_multiple_{time}", + playlist: "tc_multiple_{time}.m3u8", + }, + multi: true, + }, + } { + r.run(t, test, r.runMultiTest) + if r.Short { + return + } + } + }) +} + +func (r *Runner) runMultiTest(t *testing.T, test *testCase) { + req := r.build(test) + egressID := r.startEgress(t, req) time.Sleep(time.Second * 10) @@ -41,7 +123,7 @@ func (r *Runner) runMultipleTest( p, err := config.GetValidatedPipelineConfig(r.ServiceConfig, req) require.NoError(t, err) - if stream { + if test.streamOptions != nil { _, err = r.client.UpdateStream(context.Background(), egressID, &livekit.UpdateStreamRequest{ EgressId: egressID, AddOutputUrls: []string{rtmpUrl1}, @@ -59,13 +141,13 @@ func (r *Runner) runMultipleTest( } res := r.stopEgress(t, egressID) - if file { + if test.fileOptions != nil { r.verifyFile(t, p, res) } - if segments { - r.verifySegments(t, p, filenameSuffix, res, false) + if test.segmentOptions != nil { + r.verifySegments(t, p, test.segmentOptions.suffix, res, false) } - if images { + if test.imageOptions != nil { r.verifyImages(t, p, res) } } diff --git a/test/participant.go b/test/participant.go deleted file mode 100644 index 0b17c8e7..00000000 --- a/test/participant.go +++ /dev/null @@ -1,280 +0,0 @@ -// Copyright 2023 LiveKit, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -//go:build integration - -package test - -import ( - "path" - "testing" - "time" - - "github.com/livekit/egress/pkg/types" - "github.com/livekit/protocol/livekit" - "github.com/livekit/protocol/rpc" - "github.com/livekit/protocol/utils" -) - -func (r *Runner) testParticipant(t *testing.T) { - if !r.should(runParticipant) { - return - } - - r.sourceFramerate = 23.97 - t.Run("Participant", func(t *testing.T) { - r.testParticipantFile(t) - r.testParticipantStream(t) - r.testParticipantSegments(t) - r.testParticipantMulti(t) - }) -} - -func (r *Runner) runParticipantTest( - t *testing.T, name string, test *testCase, - f func(t *testing.T, identity string), -) { - r.run(t, name, func(t *testing.T) { - r.publishSampleOffset(t, test.audioCodec, test.audioDelay, test.audioUnpublish) - if test.audioRepublish != 0 { - r.publishSampleOffset(t, test.audioCodec, test.audioRepublish, 0) - } - r.publishSampleOffset(t, test.videoCodec, test.videoDelay, test.videoUnpublish) - if test.videoRepublish != 0 { - r.publishSampleOffset(t, test.videoCodec, test.videoRepublish, 0) - } - f(t, r.room.LocalParticipant.Identity()) - }) -} - -func (r *Runner) testParticipantFile(t *testing.T) { - if !r.should(runFile) { - return - } - - for _, test := range []*testCase{ - { - name: "File/VP8", - fileType: livekit.EncodedFileType_MP4, - audioCodec: types.MimeTypeOpus, - audioDelay: time.Second * 8, - audioUnpublish: time.Second * 14, - audioRepublish: time.Second * 20, - videoCodec: types.MimeTypeVP8, - filename: "participant_{publisher_identity}_vp8_{time}.mp4", - }, - { - name: "File/H264", - fileType: livekit.EncodedFileType_MP4, - audioCodec: types.MimeTypeOpus, - videoCodec: types.MimeTypeH264, - videoUnpublish: time.Second * 10, - videoRepublish: time.Second * 20, - filename: "participant_{room_name}_h264_{time}.mp4", - }, - { - name: "File/AudioOnly", - fileType: livekit.EncodedFileType_MP4, - audioCodec: types.MimeTypeOpus, - audioUnpublish: time.Second * 10, - audioRepublish: time.Second * 15, - filename: "participant_{room_name}_{time}.mp4", - }, - } { - r.runParticipantTest(t, test.name, test, func(t *testing.T, identity string) { - var fileOutput *livekit.EncodedFileOutput - if test.filenameSuffix == livekit.SegmentedFileSuffix_INDEX && r.AzureUpload != nil { - fileOutput = &livekit.EncodedFileOutput{ - FileType: test.fileType, - Filepath: path.Join(uploadPrefix, test.filename), - Output: &livekit.EncodedFileOutput_Azure{ - Azure: r.AzureUpload, - }, - } - } else { - fileOutput = &livekit.EncodedFileOutput{ - FileType: test.fileType, - Filepath: path.Join(r.FilePrefix, test.filename), - } - } - - participantRequest := &livekit.ParticipantEgressRequest{ - RoomName: r.room.Name(), - Identity: identity, - FileOutputs: []*livekit.EncodedFileOutput{fileOutput}, - } - if test.options != nil { - participantRequest.Options = &livekit.ParticipantEgressRequest_Advanced{ - Advanced: test.options, - } - } - - req := &rpc.StartEgressRequest{ - EgressId: utils.NewGuid(utils.EgressPrefix), - Request: &rpc.StartEgressRequest_Participant{ - Participant: participantRequest, - }, - } - - test.expectVideoEncoding = true - r.runFileTest(t, req, test) - }) - if r.Short { - return - } - } -} - -func (r *Runner) testParticipantStream(t *testing.T) { - if !r.should(runStream) { - return - } - - test := &testCase{ - audioCodec: types.MimeTypeOpus, - audioDelay: time.Second * 8, - videoCodec: types.MimeTypeVP8, - } - - r.runParticipantTest(t, "Stream", test, - func(t *testing.T, identity string) { - req := &rpc.StartEgressRequest{ - EgressId: utils.NewGuid(utils.EgressPrefix), - Request: &rpc.StartEgressRequest_Participant{ - Participant: &livekit.ParticipantEgressRequest{ - RoomName: r.room.Name(), - Identity: identity, - StreamOutputs: []*livekit.StreamOutput{{ - Urls: []string{rtmpUrl1, badRtmpUrl1}, - }}, - }, - }, - } - - r.runStreamTest(t, req, &testCase{ - expectVideoEncoding: true, - outputType: types.OutputTypeRTMP, - }) - }, - ) -} - -func (r *Runner) testParticipantSegments(t *testing.T) { - if !r.should(runSegments) { - return - } - - for _, test := range []*testCase{ - { - name: "Segments/VP8", - audioCodec: types.MimeTypeOpus, - videoCodec: types.MimeTypeVP8, - // videoDelay: time.Second * 10, - // videoUnpublish: time.Second * 20, - filename: "participant_{publisher_identity}_vp8_{time}", - playlist: "participant_{publisher_identity}_vp8_{time}.m3u8", - }, - { - name: "Segments/H264", - audioCodec: types.MimeTypeOpus, - audioDelay: time.Second * 10, - audioUnpublish: time.Second * 20, - videoCodec: types.MimeTypeH264, - filename: "participant_{room_name}_h264_{time}", - playlist: "participant_{room_name}_h264_{time}.m3u8", - }, - } { - r.runParticipantTest(t, test.name, test, - func(t *testing.T, identity string) { - var segmentOutput *livekit.SegmentedFileOutput - if test.filenameSuffix == livekit.SegmentedFileSuffix_INDEX && r.S3Upload != nil { - segmentOutput = &livekit.SegmentedFileOutput{ - FilenamePrefix: path.Join(uploadPrefix, test.filename), - PlaylistName: test.playlist, - FilenameSuffix: test.filenameSuffix, - Output: &livekit.SegmentedFileOutput_S3{ - S3: r.S3Upload, - }, - } - } else { - segmentOutput = &livekit.SegmentedFileOutput{ - FilenamePrefix: path.Join(r.FilePrefix, test.filename), - PlaylistName: test.playlist, - FilenameSuffix: test.filenameSuffix, - } - } - - trackRequest := &livekit.ParticipantEgressRequest{ - RoomName: r.room.Name(), - Identity: identity, - SegmentOutputs: []*livekit.SegmentedFileOutput{segmentOutput}, - } - if test.options != nil { - trackRequest.Options = &livekit.ParticipantEgressRequest_Advanced{ - Advanced: test.options, - } - } - - req := &rpc.StartEgressRequest{ - EgressId: utils.NewGuid(utils.EgressPrefix), - Request: &rpc.StartEgressRequest_Participant{ - Participant: trackRequest, - }, - } - test.expectVideoEncoding = true - - r.runSegmentsTest(t, req, test) - }, - ) - if r.Short { - return - } - } -} - -func (r *Runner) testParticipantMulti(t *testing.T) { - if !r.should(runMulti) { - return - } - - test := &testCase{ - audioCodec: types.MimeTypeOpus, - audioUnpublish: time.Second * 20, - videoCodec: types.MimeTypeVP8, - videoDelay: time.Second * 5, - } - - r.runParticipantTest(t, "Multi", test, - func(t *testing.T, identity string) { - req := &rpc.StartEgressRequest{ - EgressId: utils.NewGuid(utils.EgressPrefix), - Request: &rpc.StartEgressRequest_Participant{ - Participant: &livekit.ParticipantEgressRequest{ - RoomName: r.room.Name(), - Identity: identity, - FileOutputs: []*livekit.EncodedFileOutput{{ - FileType: livekit.EncodedFileType_MP4, - Filepath: path.Join(r.FilePrefix, "participant_multiple_{time}"), - }}, - StreamOutputs: []*livekit.StreamOutput{{ - Protocol: livekit.StreamProtocol_RTMP, - }}, - }, - }, - } - - r.runMultipleTest(t, req, true, true, false, false, livekit.SegmentedFileSuffix_INDEX) - }, - ) -} diff --git a/test/publish.go b/test/publish.go index 54eca108..d70595d6 100644 --- a/test/publish.go +++ b/test/publish.go @@ -41,64 +41,45 @@ var ( } ) -func (r *Runner) publishSamples(t *testing.T, audioCodec, videoCodec types.MimeType) (audioTrackID, videoTrackID string) { - withAudioMuting := false - if videoCodec != "" { - videoTrackID = r.publishSample(t, videoCodec, r.Muting) - } else { - withAudioMuting = r.Muting - } - if audioCodec != "" { - audioTrackID = r.publishSample(t, audioCodec, withAudioMuting) - } - - time.Sleep(time.Second) - return -} - -func (r *Runner) publishSample(t *testing.T, codec types.MimeType, withMuting bool) string { - done := make(chan struct{}) - pub := r.publish(t, codec, done) - trackID := pub.SID() - - t.Cleanup(func() { - _ = r.room.LocalParticipant.UnpublishTrack(trackID) - }) - - if withMuting { - go func() { - muted := false - time.Sleep(time.Second * 15) - for { - select { - case <-done: - return - default: - pub.SetMuted(!muted) - muted = !muted - time.Sleep(time.Second * 10) - } - } - }() - } - - return trackID -} - -func (r *Runner) publishSampleOffset(t *testing.T, codec types.MimeType, publishAfter, unpublishAfter time.Duration) { +func (r *Runner) publishSample(t *testing.T, codec types.MimeType, publishAfter, unpublishAfter time.Duration, withMuting bool) string { if codec == "" { - return + return "" } + trackID := make(chan string, 1) time.AfterFunc(publishAfter, func() { done := make(chan struct{}) + unpublished := make(chan struct{}) + pub := r.publish(t, codec, done) + trackID <- pub.SID() + + if withMuting { + go func() { + muted := false + time.Sleep(time.Second * 15) + for { + select { + case <-unpublished: + return + case <-done: + return + default: + pub.SetMuted(!muted) + muted = !muted + time.Sleep(time.Second * 10) + } + } + }() + } + if unpublishAfter != 0 { time.AfterFunc(unpublishAfter-publishAfter, func() { select { case <-done: return default: + close(unpublished) _ = r.room.LocalParticipant.UnpublishTrack(pub.SID()) } }) @@ -108,6 +89,12 @@ func (r *Runner) publishSampleOffset(t *testing.T, codec types.MimeType, publish }) } }) + + if publishAfter == 0 { + return <-trackID + } else { + return "TBD" + } } func (r *Runner) publishSampleWithDisconnection(t *testing.T, codec types.MimeType) string { diff --git a/test/room_composite.go b/test/room_composite.go deleted file mode 100644 index 1bc63e70..00000000 --- a/test/room_composite.go +++ /dev/null @@ -1,306 +0,0 @@ -// Copyright 2023 LiveKit, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -//go:build integration - -package test - -import ( - "path" - "testing" - - "github.com/livekit/egress/pkg/types" - "github.com/livekit/protocol/livekit" - "github.com/livekit/protocol/rpc" - "github.com/livekit/protocol/utils" -) - -func (r *Runner) testRoomComposite(t *testing.T) { - if !r.should(runRoom) { - return - } - - r.sourceFramerate = 30 - t.Run("RoomComposite", func(t *testing.T) { - r.testRoomCompositeFile(t) - r.testRoomCompositeStream(t) - r.testRoomCompositeSegments(t) - r.testRoomCompositeImages(t) - r.testRoomCompositeMulti(t) - }) -} - -func (r *Runner) runRoomTest(t *testing.T, name string, audioCodec, videoCodec types.MimeType, f func(t *testing.T)) { - r.run(t, name, func(t *testing.T) { - r.publishSamples(t, audioCodec, videoCodec) - f(t) - }) -} - -func (r *Runner) testRoomCompositeFile(t *testing.T) { - if !r.should(runFile) { - return - } - - for _, test := range []*testCase{ - { - name: "File/Base", - filename: "r_{room_name}_{time}.mp4", - expectVideoEncoding: true, - }, - { - name: "File/Video-Only", - videoOnly: true, - options: &livekit.EncodingOptions{ - VideoCodec: livekit.VideoCodec_H264_HIGH, - }, - filename: "r_{room_name}_video_{time}.mp4", - expectVideoEncoding: true, - }, - { - name: "File/Audio-Only", - fileType: livekit.EncodedFileType_OGG, - audioOnly: true, - options: &livekit.EncodingOptions{ - AudioCodec: livekit.AudioCodec_OPUS, - }, - filename: "r_{room_name}_audio_{time}", - expectVideoEncoding: false, - }, - } { - r.runRoomTest(t, test.name, types.MimeTypeOpus, types.MimeTypeH264, func(t *testing.T) { - var fileOutput *livekit.EncodedFileOutput - if r.S3Upload != nil { - fileOutput = &livekit.EncodedFileOutput{ - FileType: test.fileType, - Filepath: path.Join(uploadPrefix, test.filename), - Output: &livekit.EncodedFileOutput_S3{ - S3: r.S3Upload, - }, - } - } else { - fileOutput = &livekit.EncodedFileOutput{ - FileType: test.fileType, - Filepath: path.Join(r.FilePrefix, test.filename), - } - } - - roomRequest := &livekit.RoomCompositeEgressRequest{ - RoomName: r.room.Name(), - Layout: "speaker-dark", - AudioOnly: test.audioOnly, - VideoOnly: test.videoOnly, - FileOutputs: []*livekit.EncodedFileOutput{fileOutput}, - } - if test.options != nil { - roomRequest.Options = &livekit.RoomCompositeEgressRequest_Advanced{ - Advanced: test.options, - } - } else if test.preset != 0 { - roomRequest.Options = &livekit.RoomCompositeEgressRequest_Preset{ - Preset: test.preset, - } - } - - req := &rpc.StartEgressRequest{ - EgressId: utils.NewGuid(utils.EgressPrefix), - Request: &rpc.StartEgressRequest_RoomComposite{ - RoomComposite: roomRequest, - }, - } - - r.runFileTest(t, req, test) - }) - if r.Short { - return - } - } -} - -func (r *Runner) testRoomCompositeStream(t *testing.T) { - if !r.should(runStream) { - return - } - - r.runRoomTest(t, "Stream", types.MimeTypeOpus, types.MimeTypeVP8, func(t *testing.T) { - req := &rpc.StartEgressRequest{ - EgressId: utils.NewGuid(utils.EgressPrefix), - Request: &rpc.StartEgressRequest_RoomComposite{ - RoomComposite: &livekit.RoomCompositeEgressRequest{ - RoomName: r.room.Name(), - Layout: "grid-light", - StreamOutputs: []*livekit.StreamOutput{{ - Protocol: livekit.StreamProtocol_RTMP, - Urls: []string{rtmpUrl1, badRtmpUrl1}, - }}, - }, - }, - } - - r.runStreamTest(t, req, &testCase{ - expectVideoEncoding: true, - outputType: types.OutputTypeRTMP, - }) - }) -} - -func (r *Runner) testRoomCompositeSegments(t *testing.T) { - if !r.should(runSegments) { - return - } - - r.runRoomTest(t, "Segments", types.MimeTypeOpus, types.MimeTypeVP8, func(t *testing.T) { - for _, test := range []*testCase{ - { - options: &livekit.EncodingOptions{ - AudioCodec: livekit.AudioCodec_AAC, - VideoCodec: livekit.VideoCodec_H264_BASELINE, - Width: 1920, - Height: 1080, - VideoBitrate: 4500, - }, - filename: "r_{room_name}_{time}", - playlist: "r_{room_name}_{time}.m3u8", - livePlaylist: "r_live_{room_name}_{time}.m3u8", - filenameSuffix: livekit.SegmentedFileSuffix_TIMESTAMP, - expectVideoEncoding: true, - }, - { - options: &livekit.EncodingOptions{ - AudioCodec: livekit.AudioCodec_AAC, - }, - filename: "r_{room_name}_audio_{time}", - playlist: "r_{room_name}_audio_{time}.m3u8", - filenameSuffix: livekit.SegmentedFileSuffix_TIMESTAMP, - audioOnly: true, - }, - } { - var segmentOutput *livekit.SegmentedFileOutput - if test.filenameSuffix == livekit.SegmentedFileSuffix_INDEX && r.GCPUpload != nil { - segmentOutput = &livekit.SegmentedFileOutput{ - FilenamePrefix: path.Join(uploadPrefix, test.filename), - PlaylistName: test.playlist, - LivePlaylistName: test.livePlaylist, - FilenameSuffix: test.filenameSuffix, - Output: &livekit.SegmentedFileOutput_Gcp{ - Gcp: r.GCPUpload, - }, - } - } else { - segmentOutput = &livekit.SegmentedFileOutput{ - FilenamePrefix: path.Join(r.FilePrefix, test.filename), - PlaylistName: test.playlist, - LivePlaylistName: test.livePlaylist, - FilenameSuffix: test.filenameSuffix, - } - } - - room := &livekit.RoomCompositeEgressRequest{ - RoomName: r.RoomName, - Layout: "grid-dark", - AudioOnly: test.audioOnly, - SegmentOutputs: []*livekit.SegmentedFileOutput{segmentOutput}, - } - if test.options != nil { - room.Options = &livekit.RoomCompositeEgressRequest_Advanced{ - Advanced: test.options, - } - } - - req := &rpc.StartEgressRequest{ - EgressId: utils.NewGuid(utils.EgressPrefix), - Request: &rpc.StartEgressRequest_RoomComposite{ - RoomComposite: room, - }, - } - - r.runSegmentsTest(t, req, test) - } - }) -} - -func (r *Runner) testRoomCompositeImages(t *testing.T) { - if !r.should(runImages) { - return - } - - r.runRoomTest(t, "Images", types.MimeTypeOpus, types.MimeTypeH264, func(t *testing.T) { - for _, test := range []*testCase{ - { - options: &livekit.EncodingOptions{ - Width: 640, - Height: 360, - }, - filename: "r_{room_name}_{time}", - imageFilenameSuffix: livekit.ImageFileSuffix_IMAGE_SUFFIX_TIMESTAMP, - }, - } { - imageOutput := &livekit.ImageOutput{ - FilenamePrefix: path.Join(r.FilePrefix, test.filename), - FilenameSuffix: test.imageFilenameSuffix, - } - - room := &livekit.RoomCompositeEgressRequest{ - RoomName: r.RoomName, - Layout: "grid-dark", - AudioOnly: test.audioOnly, - ImageOutputs: []*livekit.ImageOutput{imageOutput}, - } - if test.options != nil { - room.Options = &livekit.RoomCompositeEgressRequest_Advanced{ - Advanced: test.options, - } - } - - req := &rpc.StartEgressRequest{ - EgressId: utils.NewGuid(utils.EgressPrefix), - Request: &rpc.StartEgressRequest_RoomComposite{ - RoomComposite: room, - }, - } - - r.runImagesTest(t, req, test) - } - }) -} - -func (r *Runner) testRoomCompositeMulti(t *testing.T) { - if !r.should(runMulti) { - return - } - - r.runRoomTest(t, "Multi", types.MimeTypeOpus, types.MimeTypeVP8, func(t *testing.T) { - req := &rpc.StartEgressRequest{ - EgressId: utils.NewGuid(utils.EgressPrefix), - Request: &rpc.StartEgressRequest_RoomComposite{ - RoomComposite: &livekit.RoomCompositeEgressRequest{ - RoomName: r.room.Name(), - Layout: "grid-light", - FileOutputs: []*livekit.EncodedFileOutput{{ - FileType: livekit.EncodedFileType_MP4, - Filepath: path.Join(r.FilePrefix, "rc_multiple_{time}"), - }}, - ImageOutputs: []*livekit.ImageOutput{{ - CaptureInterval: 10, - Width: 1280, - Height: 720, - FilenamePrefix: path.Join(r.FilePrefix, "rc_image"), - }}, - }, - }, - } - - r.runMultipleTest(t, req, true, false, false, true, livekit.SegmentedFileSuffix_TIMESTAMP) - }) -} diff --git a/test/runner.go b/test/runner.go index 5022adc3..244e134d 100644 --- a/test/runner.go +++ b/test/runner.go @@ -118,6 +118,21 @@ func NewRunner(t *testing.T) *Runner { case "track": r.TrackTestsOnly = true r.RoomName = fmt.Sprintf("track-integration-%d", rand.Intn(100)) + case "file": + r.FileTestsOnly = true + r.RoomName = fmt.Sprintf("file-integration-%d", rand.Intn(100)) + case "stream": + r.StreamTestsOnly = true + r.RoomName = fmt.Sprintf("stream-integration-%d", rand.Intn(100)) + case "segments": + r.SegmentTestsOnly = true + r.RoomName = fmt.Sprintf("segments-integration-%d", rand.Intn(100)) + case "images": + r.ImageTestsOnly = true + r.RoomName = fmt.Sprintf("images-integration-%d", rand.Intn(100)) + case "multi": + r.MultiTestsOnly = true + r.RoomName = fmt.Sprintf("multi-integration-%d", rand.Intn(100)) case "edge": r.EdgeCasesOnly = true r.RoomName = fmt.Sprintf("edge-integration-%d", rand.Intn(100)) diff --git a/test/segments.go b/test/segments.go index bdb2943e..cacb7b32 100644 --- a/test/segments.go +++ b/test/segments.go @@ -31,10 +31,137 @@ import ( "github.com/livekit/egress/pkg/pipeline/sink/m3u8" "github.com/livekit/egress/pkg/types" "github.com/livekit/protocol/livekit" - "github.com/livekit/protocol/rpc" ) -func (r *Runner) runSegmentsTest(t *testing.T, req *rpc.StartEgressRequest, test *testCase) { +func (r *Runner) testSegments(t *testing.T) { + if !r.should(runSegments) { + return + } + + t.Run("Segments", func(t *testing.T) { + for _, test := range []*testCase{ + + // ---- Room Composite ----- + + { + name: "RoomComposite", + requestType: types.RequestTypeRoomComposite, + publishOptions: publishOptions{ + audioCodec: types.MimeTypeOpus, + videoCodec: types.MimeTypeVP8, + }, + encodingOptions: &livekit.EncodingOptions{ + AudioCodec: livekit.AudioCodec_AAC, + VideoCodec: livekit.VideoCodec_H264_BASELINE, + Width: 1920, + Height: 1080, + VideoBitrate: 4500, + }, + segmentOptions: &segmentOptions{ + prefix: "r_{room_name}_{time}", + playlist: "r_{room_name}_{time}.m3u8", + livePlaylist: "r_live_{room_name}_{time}.m3u8", + suffix: livekit.SegmentedFileSuffix_TIMESTAMP, + }, + }, + { + name: "RoomComposite/AudioOnly", + requestType: types.RequestTypeRoomComposite, + publishOptions: publishOptions{ + audioCodec: types.MimeTypeOpus, + audioOnly: true, + }, + encodingOptions: &livekit.EncodingOptions{ + AudioCodec: livekit.AudioCodec_AAC, + }, + segmentOptions: &segmentOptions{ + prefix: "r_{room_name}_audio_{time}", + playlist: "r_{room_name}_audio_{time}.m3u8", + suffix: livekit.SegmentedFileSuffix_TIMESTAMP, + }, + }, + + // ---------- Web ---------- + + { + name: "Web", + requestType: types.RequestTypeWeb, + segmentOptions: &segmentOptions{ + prefix: "web_{time}", + playlist: "web_{time}.m3u8", + }, + }, + + // ------ Participant ------ + + { + name: "ParticipantComposite/VP8", + requestType: types.RequestTypeParticipant, + publishOptions: publishOptions{ + audioCodec: types.MimeTypeOpus, + videoCodec: types.MimeTypeVP8, + // videoDelay: time.Second * 10, + // videoUnpublish: time.Second * 20, + }, + segmentOptions: &segmentOptions{ + prefix: "participant_{publisher_identity}_vp8_{time}", + playlist: "participant_{publisher_identity}_vp8_{time}.m3u8", + }, + }, + { + name: "ParticipantComposite/H264", + requestType: types.RequestTypeParticipant, + publishOptions: publishOptions{ + audioCodec: types.MimeTypeOpus, + audioDelay: time.Second * 10, + audioUnpublish: time.Second * 20, + videoCodec: types.MimeTypeH264, + }, + segmentOptions: &segmentOptions{ + prefix: "participant_{room_name}_h264_{time}", + playlist: "participant_{room_name}_h264_{time}.m3u8", + }, + }, + + // ---- Track Composite ---- + + { + name: "TrackComposite/H264", + requestType: types.RequestTypeTrackComposite, + publishOptions: publishOptions{ + audioCodec: types.MimeTypeOpus, + videoCodec: types.MimeTypeH264, + }, + segmentOptions: &segmentOptions{ + prefix: "tcs_{room_name}_h264_{time}", + playlist: "tcs_{room_name}_h264_{time}.m3u8", + livePlaylist: "tcs_live_{room_name}_h264_{time}.m3u8", + }, + }, + { + name: "TrackComposite/AudioOnly", + requestType: types.RequestTypeTrackComposite, + publishOptions: publishOptions{ + audioCodec: types.MimeTypeOpus, + audioOnly: true, + }, + segmentOptions: &segmentOptions{ + prefix: "tcs_{room_name}_audio_{time}", + playlist: "tcs_{room_name}_audio_{time}.m3u8", + }, + }, + } { + r.run(t, test, r.runSegmentsTest) + if r.Short { + return + } + } + }) +} + +func (r *Runner) runSegmentsTest(t *testing.T, test *testCase) { + req := r.build(test) + egressID := r.startEgress(t, req) time.Sleep(time.Second * 10) @@ -50,10 +177,9 @@ func (r *Runner) runSegmentsTest(t *testing.T, req *rpc.StartEgressRequest, test p, err := config.GetValidatedPipelineConfig(r.ServiceConfig, req) require.NoError(t, err) - r.verifySegments(t, p, test.filenameSuffix, res, test.livePlaylist != "") - if !test.audioOnly { - require.Equal(t, test.expectVideoEncoding, p.VideoEncoding) - } + require.Equal(t, !test.audioOnly, p.VideoEncoding) + + r.verifySegments(t, p, test.segmentOptions.suffix, res, test.livePlaylist != "") } func (r *Runner) verifySegments(t *testing.T, p *config.PipelineConfig, filenameSuffix livekit.SegmentedFileSuffix, res *livekit.EgressInfo, enableLivePlaylist bool) { diff --git a/test/stream.go b/test/stream.go index 9331500c..0875ddd8 100644 --- a/test/stream.go +++ b/test/stream.go @@ -19,21 +19,27 @@ package test import ( "context" "fmt" + "net/http" + "net/http/httptest" + "os" + "path" + "strings" "testing" "time" + "github.com/gorilla/websocket" "github.com/stretchr/testify/require" "github.com/livekit/egress/pkg/config" "github.com/livekit/egress/pkg/types" "github.com/livekit/protocol/livekit" - "github.com/livekit/protocol/rpc" + "github.com/livekit/protocol/logger" "github.com/livekit/protocol/utils" ) const ( - badRtmpUrl1 = "rtmp://xxx.contribute.live-video.net/app/fake1" - badRtmpUrl1Redacted = "rtmp://xxx.contribute.live-video.net/app/{f...1}" + badRtmpUrl1 = "rtmp://localhost:1936/wrong/stream" + badRtmpUrl1Redacted = "rtmp://localhost:1936/wrong/{st...am}" badRtmpUrl2 = "rtmp://localhost:1936/live/stream" badRtmpUrl2Redacted = "rtmp://localhost:1936/live/{st...am}" badSrtUrl1 = "srt://localhost:8891?streamid=publish:wrongport&pkt_size=1316" @@ -69,15 +75,111 @@ var streamUrls = map[types.OutputType][][]string{ }, } -func (r *Runner) runStreamTest(t *testing.T, req *rpc.StartEgressRequest, test *testCase) { +func (r *Runner) testStream(t *testing.T) { + if !r.should(runStream) { + return + } + + t.Run("Stream", func(t *testing.T) { + for _, test := range []*testCase{ + + // ---- Room Composite ----- + + { + name: "RoomComposite", + requestType: types.RequestTypeRoomComposite, + publishOptions: publishOptions{ + audioCodec: types.MimeTypeOpus, + videoCodec: types.MimeTypeVP8, + }, + streamOptions: &streamOptions{ + streamUrls: []string{rtmpUrl1, badRtmpUrl1}, + outputType: types.OutputTypeRTMP, + }, + }, + + // ---------- Web ---------- + + { + name: "Web", + requestType: types.RequestTypeWeb, + streamOptions: &streamOptions{ + streamUrls: []string{srtPublishUrl1, badSrtUrl1}, + outputType: types.OutputTypeSRT, + }, + }, + + // ------ Participant ------ + + { + name: "ParticipantComposite", + requestType: types.RequestTypeParticipant, + publishOptions: publishOptions{ + audioCodec: types.MimeTypeOpus, + audioDelay: time.Second * 8, + videoCodec: types.MimeTypeVP8, + }, + streamOptions: &streamOptions{ + streamUrls: []string{rtmpUrl1, badRtmpUrl1}, + outputType: types.OutputTypeRTMP, + }, + }, + + // ---- Track Composite ---- + + { + name: "TrackComposite", + requestType: types.RequestTypeTrackComposite, + publishOptions: publishOptions{ + audioCodec: types.MimeTypeOpus, + videoCodec: types.MimeTypeVP8, + }, + streamOptions: &streamOptions{ + streamUrls: []string{rtmpUrl1, badRtmpUrl1}, + outputType: types.OutputTypeRTMP, + }, + }, + + // --------- Track --------- + + { + name: "Track", + requestType: types.RequestTypeTrack, + publishOptions: publishOptions{ + audioCodec: types.MimeTypeOpus, + audioOnly: true, + }, + streamOptions: &streamOptions{ + rawFileName: fmt.Sprintf("track-ws-%v.raw", time.Now().Unix()), + outputType: types.OutputTypeRaw, + }, + }, + } { + r.run(t, test, r.runStreamTest) + if r.Short { + return + } + } + }) +} + +func (r *Runner) runStreamTest(t *testing.T, test *testCase) { + if test.requestType == types.RequestTypeTrack { + r.runWebsocketTest(t, test) + return + } + + req := r.build(test) + ctx := context.Background() - urls := streamUrls[test.outputType] + urls := streamUrls[test.streamOptions.outputType] egressID := r.startEgress(t, req) p, err := config.GetValidatedPipelineConfig(r.ServiceConfig, req) require.NoError(t, err) - require.Equal(t, test.expectVideoEncoding, p.VideoEncoding) - if test.expectVideoEncoding { + + if !test.audioOnly { + require.True(t, p.VideoEncoding) require.Equal(t, config.StreamKeyframeInterval, p.KeyFrameInterval) } @@ -162,3 +264,97 @@ func (r *Runner) verifyStreams(t *testing.T, p *config.PipelineConfig, urls ...s verify(t, url, p, nil, types.EgressTypeStream, false, r.sourceFramerate, false) } } + +func (r *Runner) runWebsocketTest(t *testing.T, test *testCase) { + filepath := path.Join(r.FilePrefix, test.streamOptions.rawFileName) + wss := newTestWebsocketServer(filepath) + s := httptest.NewServer(http.HandlerFunc(wss.handleWebsocket)) + test.websocketUrl = "ws" + strings.TrimPrefix(s.URL, "http") + defer func() { + wss.close() + s.Close() + }() + + req := r.build(test) + + egressID := r.startEgress(t, req) + + p, err := config.GetValidatedPipelineConfig(r.ServiceConfig, req) + require.NoError(t, err) + + time.Sleep(time.Second * 30) + + res := r.stopEgress(t, egressID) + verify(t, filepath, p, res, types.EgressTypeWebsocket, r.Muting, r.sourceFramerate, false) +} + +type websocketTestServer struct { + path string + file *os.File + conn *websocket.Conn + done chan struct{} +} + +func newTestWebsocketServer(filepath string) *websocketTestServer { + return &websocketTestServer{ + path: filepath, + done: make(chan struct{}), + } +} + +func (s *websocketTestServer) handleWebsocket(w http.ResponseWriter, r *http.Request) { + var err error + + s.file, err = os.Create(s.path) + if err != nil { + logger.Errorw("could not create file", err) + return + } + + // accept ws connection + upgrader := websocket.Upgrader{} + s.conn, err = upgrader.Upgrade(w, r, nil) + if err != nil { + logger.Errorw("could not accept ws connection", err) + return + } + + go func() { + defer func() { + _ = s.file.Close() + + // close the connection only if it's not closed already + if !websocket.IsUnexpectedCloseError(err) { + _ = s.conn.Close() + } + }() + + for { + select { + case <-s.done: + return + default: + mt, msg, err := s.conn.ReadMessage() + if err != nil { + if !websocket.IsUnexpectedCloseError(err) { + logger.Errorw("unexpected ws close", err) + } + return + } + + switch mt { + case websocket.BinaryMessage: + _, err = s.file.Write(msg) + if err != nil { + logger.Errorw("could not write to file", err) + return + } + } + } + } + }() +} + +func (s *websocketTestServer) close() { + close(s.done) +} diff --git a/test/track.go b/test/track.go deleted file mode 100644 index f6f9b672..00000000 --- a/test/track.go +++ /dev/null @@ -1,244 +0,0 @@ -// Copyright 2023 LiveKit, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -//go:build integration - -package test - -import ( - "fmt" - "net/http" - "net/http/httptest" - "os" - "path" - "strings" - "testing" - "time" - - "github.com/gorilla/websocket" - "github.com/stretchr/testify/require" - - "github.com/livekit/egress/pkg/config" - "github.com/livekit/egress/pkg/types" - "github.com/livekit/protocol/livekit" - "github.com/livekit/protocol/logger" - "github.com/livekit/protocol/rpc" - "github.com/livekit/protocol/utils" -) - -func (r *Runner) testTrack(t *testing.T) { - if !r.should(runTrack) { - return - } - - r.sourceFramerate = 23.97 - t.Run("Track", func(t *testing.T) { - r.testTrackFile(t) - r.testTrackStream(t) - }) -} - -func (r *Runner) testTrackFile(t *testing.T) { - if !r.should(runFile) { - return - } - - for _, test := range []*testCase{ - { - name: "File/OPUS", - audioOnly: true, - audioCodec: types.MimeTypeOpus, - outputType: types.OutputTypeOGG, - filename: "t_{track_source}_{time}.ogg", - }, - { - name: "File/H264", - videoOnly: true, - videoCodec: types.MimeTypeH264, - outputType: types.OutputTypeMP4, - filename: "t_{track_id}_{time}.mp4", - }, - { - name: "File/VP8", - videoOnly: true, - videoCodec: types.MimeTypeVP8, - outputType: types.OutputTypeWebM, - filename: "t_{track_type}_{time}.webm", - }, - // { - // name: "File/VP9", - // videoOnly: true, - // videoCodec: types.MimeTypeVP9, - // outputType: types.OutputTypeWebM, - // filename: "t_{track_type}_{time}.webm", - // }, - } { - r.runTrackTest(t, test.name, test.audioCodec, test.videoCodec, func(t *testing.T, audioTrackID, videoTrackID string) { - trackID := audioTrackID - if trackID == "" { - trackID = videoTrackID - } - - trackRequest := &livekit.TrackEgressRequest{ - RoomName: r.room.Name(), - TrackId: trackID, - Output: &livekit.TrackEgressRequest_File{ - File: &livekit.DirectFileOutput{ - Filepath: path.Join(r.FilePrefix, test.filename), - }, - }, - } - - req := &rpc.StartEgressRequest{ - EgressId: utils.NewGuid(utils.EgressPrefix), - Request: &rpc.StartEgressRequest_Track{ - Track: trackRequest, - }, - } - - r.runFileTest(t, req, test) - }) - if r.Short { - return - } - } -} - -func (r *Runner) testTrackStream(t *testing.T) { - if !r.should(runStream) { - return - } - - now := time.Now().Unix() - for _, test := range []*testCase{ - { - name: "Websocket", - audioOnly: true, - audioCodec: types.MimeTypeOpus, - filename: fmt.Sprintf("track-ws-%v.raw", now), - }, - } { - r.runTrackTest(t, test.name, test.audioCodec, test.videoCodec, func(t *testing.T, audioTrackID, videoTrackID string) { - trackID := audioTrackID - if trackID == "" { - trackID = videoTrackID - } - - filepath := path.Join(r.FilePrefix, test.filename) - wss := newTestWebsocketServer(filepath) - s := httptest.NewServer(http.HandlerFunc(wss.handleWebsocket)) - defer func() { - wss.close() - s.Close() - }() - - req := &rpc.StartEgressRequest{ - EgressId: utils.NewGuid(utils.EgressPrefix), - Request: &rpc.StartEgressRequest_Track{ - Track: &livekit.TrackEgressRequest{ - RoomName: r.room.Name(), - TrackId: trackID, - Output: &livekit.TrackEgressRequest_WebsocketUrl{ - WebsocketUrl: "ws" + strings.TrimPrefix(s.URL, "http"), - }, - }, - }, - } - - egressID := r.startEgress(t, req) - - p, err := config.GetValidatedPipelineConfig(r.ServiceConfig, req) - require.NoError(t, err) - - time.Sleep(time.Second * 30) - - res := r.stopEgress(t, egressID) - verify(t, filepath, p, res, types.EgressTypeWebsocket, r.Muting, r.sourceFramerate, false) - }) - if r.Short { - return - } - } -} - -type websocketTestServer struct { - path string - file *os.File - conn *websocket.Conn - done chan struct{} -} - -func newTestWebsocketServer(filepath string) *websocketTestServer { - return &websocketTestServer{ - path: filepath, - done: make(chan struct{}), - } -} - -func (s *websocketTestServer) handleWebsocket(w http.ResponseWriter, r *http.Request) { - var err error - - s.file, err = os.Create(s.path) - if err != nil { - logger.Errorw("could not create file", err) - return - } - - // accept ws connection - upgrader := websocket.Upgrader{} - s.conn, err = upgrader.Upgrade(w, r, nil) - if err != nil { - logger.Errorw("could not accept ws connection", err) - return - } - - go func() { - defer func() { - _ = s.file.Close() - - // close the connection only if it's not closed already - if !websocket.IsUnexpectedCloseError(err) { - _ = s.conn.Close() - } - }() - - for { - select { - case <-s.done: - return - default: - mt, msg, err := s.conn.ReadMessage() - if err != nil { - if !websocket.IsUnexpectedCloseError(err) { - logger.Errorw("unexpected ws close", err) - } - return - } - - switch mt { - case websocket.BinaryMessage: - _, err = s.file.Write(msg) - if err != nil { - logger.Errorw("could not write to file", err) - return - } - } - } - } - }() -} - -func (s *websocketTestServer) close() { - close(s.done) -} diff --git a/test/track_composite.go b/test/track_composite.go deleted file mode 100644 index 4e86b2fe..00000000 --- a/test/track_composite.go +++ /dev/null @@ -1,336 +0,0 @@ -// Copyright 2023 LiveKit, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -//go:build integration - -package test - -import ( - "path" - "testing" - - "github.com/livekit/egress/pkg/types" - "github.com/livekit/protocol/livekit" - "github.com/livekit/protocol/rpc" - "github.com/livekit/protocol/utils" -) - -func (r *Runner) testTrackComposite(t *testing.T) { - if !r.should(runTrackComposite) { - return - } - - r.sourceFramerate = 23.97 - t.Run("TrackComposite", func(t *testing.T) { - r.testTrackCompositeFile(t) - r.testTrackCompositeStream(t) - r.testTrackCompositeSegments(t) - r.testTrackCompositeImages(t) - r.testTrackCompositeMulti(t) - }) -} - -func (r *Runner) runTrackTest( - t *testing.T, name string, audioCodec, videoCodec types.MimeType, - f func(t *testing.T, audioTrackID, videoTrackID string), -) { - r.run(t, name, func(t *testing.T) { - audioTrackID, videoTrackID := r.publishSamples(t, audioCodec, videoCodec) - f(t, audioTrackID, videoTrackID) - }) -} - -func (r *Runner) testTrackCompositeFile(t *testing.T) { - if !r.should(runFile) { - return - } - - for _, test := range []*testCase{ - { - name: "File/VP8", - fileType: livekit.EncodedFileType_MP4, - audioCodec: types.MimeTypeOpus, - videoCodec: types.MimeTypeVP8, - filename: "tc_{publisher_identity}_vp8_{time}.mp4", - }, - { - name: "File/VideoOnly", - fileType: livekit.EncodedFileType_MP4, - videoCodec: types.MimeTypeH264, - filename: "tc_{room_name}_video_{time}.mp4", - videoOnly: true, - }, - } { - r.runTrackTest(t, test.name, test.audioCodec, test.videoCodec, func(t *testing.T, audioTrackID, videoTrackID string) { - var aID, vID string - if !test.audioOnly { - vID = videoTrackID - } - if !test.videoOnly { - aID = audioTrackID - } - - var fileOutput *livekit.EncodedFileOutput - if r.AzureUpload != nil { - fileOutput = &livekit.EncodedFileOutput{ - FileType: test.fileType, - Filepath: path.Join(uploadPrefix, test.filename), - Output: &livekit.EncodedFileOutput_Azure{ - Azure: r.AzureUpload, - }, - } - } else { - fileOutput = &livekit.EncodedFileOutput{ - FileType: test.fileType, - Filepath: path.Join(r.FilePrefix, test.filename), - } - } - - trackRequest := &livekit.TrackCompositeEgressRequest{ - RoomName: r.room.Name(), - AudioTrackId: aID, - VideoTrackId: vID, - FileOutputs: []*livekit.EncodedFileOutput{fileOutput}, - } - if test.options != nil { - trackRequest.Options = &livekit.TrackCompositeEgressRequest_Advanced{ - Advanced: test.options, - } - } - - req := &rpc.StartEgressRequest{ - EgressId: utils.NewGuid(utils.EgressPrefix), - Request: &rpc.StartEgressRequest_TrackComposite{ - TrackComposite: trackRequest, - }, - } - - test.expectVideoEncoding = true - r.runFileTest(t, req, test) - }) - if r.Short { - return - } - } -} - -func (r *Runner) testTrackCompositeStream(t *testing.T) { - if !r.should(runStream) { - return - } - - r.runTrackTest(t, "Stream", types.MimeTypeOpus, types.MimeTypeVP8, - func(t *testing.T, audioTrackID, videoTrackID string) { - req := &rpc.StartEgressRequest{ - EgressId: utils.NewGuid(utils.EgressPrefix), - Request: &rpc.StartEgressRequest_TrackComposite{ - TrackComposite: &livekit.TrackCompositeEgressRequest{ - RoomName: r.room.Name(), - AudioTrackId: audioTrackID, - VideoTrackId: videoTrackID, - StreamOutputs: []*livekit.StreamOutput{{ - Urls: []string{rtmpUrl1, badRtmpUrl1}, - }}, - }, - }, - } - - r.runStreamTest(t, req, &testCase{ - expectVideoEncoding: true, - outputType: types.OutputTypeRTMP, - }) - }, - ) -} - -func (r *Runner) testTrackCompositeSegments(t *testing.T) { - if !r.should(runSegments) { - return - } - - for _, test := range []*testCase{ - { - name: "Segments/H264", - audioCodec: types.MimeTypeOpus, - videoCodec: types.MimeTypeH264, - filename: "tcs_{room_name}_h264_{time}", - playlist: "tcs_{room_name}_h264_{time}.m3u8", - livePlaylist: "tcs_live_{room_name}_h264_{time}.m3u8", - }, - { - name: "Segments/Audio-Only", - audioCodec: types.MimeTypeOpus, - filename: "tcs_{room_name}_audio_{time}", - playlist: "tcs_{room_name}_audio_{time}.m3u8", - audioOnly: true, - }, - } { - r.runTrackTest(t, test.name, test.audioCodec, test.videoCodec, - func(t *testing.T, audioTrackID, videoTrackID string) { - var aID, vID string - if !test.audioOnly { - vID = videoTrackID - } - if !test.videoOnly { - aID = audioTrackID - } - - var segmentOutput *livekit.SegmentedFileOutput - if test.filenameSuffix == livekit.SegmentedFileSuffix_INDEX && r.S3Upload != nil { - segmentOutput = &livekit.SegmentedFileOutput{ - FilenamePrefix: path.Join(uploadPrefix, test.filename), - PlaylistName: test.playlist, - LivePlaylistName: test.livePlaylist, - FilenameSuffix: test.filenameSuffix, - Output: &livekit.SegmentedFileOutput_S3{ - S3: r.S3Upload, - }, - } - } else { - segmentOutput = &livekit.SegmentedFileOutput{ - FilenamePrefix: path.Join(r.FilePrefix, test.filename), - PlaylistName: test.playlist, - LivePlaylistName: test.livePlaylist, - FilenameSuffix: test.filenameSuffix, - } - } - - trackRequest := &livekit.TrackCompositeEgressRequest{ - RoomName: r.room.Name(), - AudioTrackId: aID, - VideoTrackId: vID, - SegmentOutputs: []*livekit.SegmentedFileOutput{segmentOutput}, - } - if test.options != nil { - trackRequest.Options = &livekit.TrackCompositeEgressRequest_Advanced{ - Advanced: test.options, - } - } - - req := &rpc.StartEgressRequest{ - EgressId: utils.NewGuid(utils.EgressPrefix), - Request: &rpc.StartEgressRequest_TrackComposite{ - TrackComposite: trackRequest, - }, - } - test.expectVideoEncoding = true - - r.runSegmentsTest(t, req, test) - }, - ) - if r.Short { - return - } - } -} - -func (r *Runner) testTrackCompositeImages(t *testing.T) { - if !r.should(runImages) { - return - } - - for _, test := range []*testCase{ - { - name: "Images/H264", - audioCodec: types.MimeTypeOpus, - videoCodec: types.MimeTypeH264, - filename: "tc_{publisher_identity}_h264", - }, - } { - r.runTrackTest(t, test.name, test.audioCodec, test.videoCodec, - func(t *testing.T, audioTrackID, videoTrackID string) { - var aID, vID string - if !test.audioOnly { - vID = videoTrackID - } - if !test.videoOnly { - aID = audioTrackID - } - - var imageOutput *livekit.ImageOutput - if r.S3Upload != nil { - imageOutput = &livekit.ImageOutput{ - CaptureInterval: 5, - Width: 1280, - Height: 720, - FilenamePrefix: path.Join(uploadPrefix, test.filename), - Output: &livekit.ImageOutput_S3{ - S3: r.S3Upload, - }, - } - } else { - imageOutput = &livekit.ImageOutput{ - CaptureInterval: 5, - Width: 1280, - Height: 720, - FilenamePrefix: path.Join(r.FilePrefix, test.filename), - } - } - - trackRequest := &livekit.TrackCompositeEgressRequest{ - RoomName: r.room.Name(), - AudioTrackId: aID, - VideoTrackId: vID, - ImageOutputs: []*livekit.ImageOutput{imageOutput}, - } - if test.options != nil { - trackRequest.Options = &livekit.TrackCompositeEgressRequest_Advanced{ - Advanced: test.options, - } - } - - req := &rpc.StartEgressRequest{ - EgressId: utils.NewGuid(utils.EgressPrefix), - Request: &rpc.StartEgressRequest_TrackComposite{ - TrackComposite: trackRequest, - }, - } - r.runImagesTest(t, req, test) - }, - ) - if r.Short { - return - } - } -} - -func (r *Runner) testTrackCompositeMulti(t *testing.T) { - if !r.should(runMulti) { - return - } - - r.runTrackTest(t, "Multi", types.MimeTypeOpus, types.MimeTypeVP8, - func(t *testing.T, audioTrackID, videoTrackID string) { - req := &rpc.StartEgressRequest{ - EgressId: utils.NewGuid(utils.EgressPrefix), - Request: &rpc.StartEgressRequest_TrackComposite{ - TrackComposite: &livekit.TrackCompositeEgressRequest{ - RoomName: r.room.Name(), - AudioTrackId: audioTrackID, - VideoTrackId: videoTrackID, - StreamOutputs: []*livekit.StreamOutput{{ - Protocol: livekit.StreamProtocol_RTMP, - }}, - SegmentOutputs: []*livekit.SegmentedFileOutput{{ - FilenamePrefix: path.Join(r.FilePrefix, "tc_multiple_{time}"), - PlaylistName: "tc_multiple_{time}", - }}, - }, - }, - } - - r.runMultipleTest(t, req, false, true, true, false, livekit.SegmentedFileSuffix_INDEX) - }, - ) -} diff --git a/test/web.go b/test/web.go deleted file mode 100644 index bb4c191b..00000000 --- a/test/web.go +++ /dev/null @@ -1,172 +0,0 @@ -// Copyright 2023 LiveKit, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -//go:build integration - -package test - -import ( - "path" - "testing" - - "github.com/livekit/egress/pkg/types" - "github.com/livekit/protocol/livekit" - "github.com/livekit/protocol/rpc" - "github.com/livekit/protocol/utils" -) - -const webUrl = "https://storage.googleapis.com/gtv-videos-bucket/sample/BigBuckBunny.mp4" - -func (r *Runner) testWeb(t *testing.T) { - if !r.should(runWeb) { - return - } - - r.sourceFramerate = 30 - t.Run("Web", func(t *testing.T) { - r.testWebFile(t) - r.testWebStream(t) - r.testWebSegments(t) - r.testWebMulti(t) - }) -} - -func (r *Runner) testWebFile(t *testing.T) { - if !r.should(runFile) { - return - } - - r.run(t, "File", func(t *testing.T) { - var fileOutput *livekit.EncodedFileOutput - if r.GCPUpload != nil { - fileOutput = &livekit.EncodedFileOutput{ - Filepath: path.Join(uploadPrefix, "web_{time}"), - Output: &livekit.EncodedFileOutput_Gcp{ - Gcp: r.GCPUpload, - }, - } - } else { - fileOutput = &livekit.EncodedFileOutput{ - Filepath: path.Join(r.FilePrefix, "web_{time}"), - } - } - - req := &rpc.StartEgressRequest{ - EgressId: utils.NewGuid(utils.EgressPrefix), - Request: &rpc.StartEgressRequest_Web{ - Web: &livekit.WebEgressRequest{ - Url: webUrl, - VideoOnly: true, - FileOutputs: []*livekit.EncodedFileOutput{fileOutput}, - }, - }, - } - - r.runFileTest(t, req, &testCase{ - expectVideoEncoding: true, - }) - }) -} - -func (r *Runner) testWebStream(t *testing.T) { - if !r.should(runStream) { - return - } - - r.run(t, "Stream", func(t *testing.T) { - req := &rpc.StartEgressRequest{ - EgressId: utils.NewGuid(utils.EgressPrefix), - Request: &rpc.StartEgressRequest_Web{ - Web: &livekit.WebEgressRequest{ - Url: webUrl, - StreamOutputs: []*livekit.StreamOutput{{ - Protocol: livekit.StreamProtocol_SRT, - Urls: []string{srtPublishUrl1, badSrtUrl1}, - }}, - }, - }, - } - - r.runStreamTest(t, req, &testCase{ - expectVideoEncoding: true, - outputType: types.OutputTypeSRT, - }) - }) -} - -func (r *Runner) testWebSegments(t *testing.T) { - if !r.should(runSegments) { - return - } - - r.run(t, "Segments", func(t *testing.T) { - var segmentOutput *livekit.SegmentedFileOutput - if r.AzureUpload != nil { - segmentOutput = &livekit.SegmentedFileOutput{ - FilenamePrefix: path.Join(uploadPrefix, "web_{time}"), - PlaylistName: "web_{time}.m3u8", - Output: &livekit.SegmentedFileOutput_Azure{ - Azure: r.AzureUpload, - }, - } - } else { - segmentOutput = &livekit.SegmentedFileOutput{ - FilenamePrefix: path.Join(r.FilePrefix, "web_{time}"), - PlaylistName: "web_{time}.m3u8", - } - } - - req := &rpc.StartEgressRequest{ - EgressId: utils.NewGuid(utils.EgressPrefix), - Request: &rpc.StartEgressRequest_Web{ - Web: &livekit.WebEgressRequest{ - Url: webUrl, - SegmentOutputs: []*livekit.SegmentedFileOutput{segmentOutput}, - }, - }, - } - - r.runSegmentsTest(t, req, &testCase{ - expectVideoEncoding: true, - }) - }) -} - -func (r *Runner) testWebMulti(t *testing.T) { - if !r.should(runMulti) { - return - } - - r.run(t, "Multi", func(t *testing.T) { - req := &rpc.StartEgressRequest{ - EgressId: utils.NewGuid(utils.EgressPrefix), - - Request: &rpc.StartEgressRequest_Web{ - Web: &livekit.WebEgressRequest{ - Url: webUrl, - FileOutputs: []*livekit.EncodedFileOutput{{ - FileType: livekit.EncodedFileType_MP4, - Filepath: path.Join(r.FilePrefix, "web_multiple_{time}"), - }}, - SegmentOutputs: []*livekit.SegmentedFileOutput{{ - FilenamePrefix: path.Join(r.FilePrefix, "web_multiple_{time}"), - PlaylistName: "web_multiple_{time}", - }}, - }, - }, - } - - r.runMultipleTest(t, req, true, false, true, false, livekit.SegmentedFileSuffix_INDEX) - }) -}