From c526bd976587671f58b610eba661310899a8c4c0 Mon Sep 17 00:00:00 2001 From: David Colburn Date: Mon, 5 Aug 2024 18:07:21 -0400 Subject: [PATCH] add lock, fix srt added through update --- pkg/pipeline/controller.go | 5 ++++- pkg/pipeline/watch.go | 2 ++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/pkg/pipeline/controller.go b/pkg/pipeline/controller.go index 25d894d1..916e9c52 100644 --- a/pkg/pipeline/controller.go +++ b/pkg/pipeline/controller.go @@ -265,6 +265,7 @@ func (c *Controller) UpdateStream(ctx context.Context, req *livekit.UpdateStream sendUpdate := false errs := errors.ErrArray{} + now := time.Now().UnixNano() // add stream outputs first for _, rawUrl := range req.AddOutputUrls { @@ -296,7 +297,9 @@ func (c *Controller) UpdateStream(ctx context.Context, req *livekit.UpdateStream continue } - // add to output count + if o.OutputType != types.OutputTypeRTMP { + streamInfo.StartedAt = now + } c.OutputCount++ sendUpdate = true } diff --git a/pkg/pipeline/watch.go b/pkg/pipeline/watch.go index 60f05b40..a15dd03b 100644 --- a/pkg/pipeline/watch.go +++ b/pkg/pipeline/watch.go @@ -97,6 +97,7 @@ func (c *Controller) gstLog( if function == fnSendCreateStream { streamID := strings.Split(message, "'")[1] if o := c.GetStreamConfig(); o != nil { + c.mu.Lock() for url, sID := range o.StreamIDs { if streamID == sID { if streamInfo := o.StreamInfo[url]; streamInfo != nil && streamInfo.StartedAt == 0 { @@ -105,6 +106,7 @@ func (c *Controller) gstLog( } } } + c.mu.Unlock() } } return