Skip to content

Commit

Permalink
update started_at for rtmp output (#751)
Browse files Browse the repository at this point in the history
* update started_at for rtmp output

* add lock, fix srt added through update

* also fix azure location
  • Loading branch information
frostbyte73 authored Aug 6, 2024
1 parent e2a0f23 commit e7b3b9a
Show file tree
Hide file tree
Showing 6 changed files with 77 additions and 30 deletions.
15 changes: 10 additions & 5 deletions pkg/config/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package config

import (
"os"
"strings"
"time"

"github.com/livekit/protocol/logger"
Expand Down Expand Up @@ -111,16 +112,20 @@ func (c *BaseConfig) initLogger(values ...interface{}) error {
c.Logging.Level = c.LogLevel
}

var gstDebug string
var gstDebug []string
switch c.Logging.Level {
case "debug":
gstDebug = "3"
gstDebug = []string{"3"}
case "info", "warn":
gstDebug = "2"
gstDebug = []string{"2"}
case "error":
gstDebug = "1"
gstDebug = []string{"1"}
}
if err := os.Setenv("GST_DEBUG", gstDebug); err != nil {
gstDebug = append(gstDebug,
"rtmpclient:4",
"srtlib:1",
)
if err := os.Setenv("GST_DEBUG", strings.Join(gstDebug, ",")); err != nil {
return err
}

Expand Down
2 changes: 2 additions & 0 deletions pkg/config/output_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type StreamConfig struct {
outputConfig

Urls []string
StreamIDs map[string]string
StreamInfo map[string]*livekit.StreamInfo

twitchTemplate string
Expand All @@ -47,6 +48,7 @@ func (p *PipelineConfig) GetWebsocketConfig() *StreamConfig {
func (p *PipelineConfig) getStreamConfig(outputType types.OutputType, urls []string) (*StreamConfig, error) {
conf := &StreamConfig{
outputConfig: outputConfig{OutputType: outputType},
StreamIDs: make(map[string]string),
}

conf.StreamInfo = make(map[string]*livekit.StreamInfo)
Expand Down
21 changes: 19 additions & 2 deletions pkg/config/urls.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,11 @@ import (
"github.com/livekit/protocol/utils"
)

var twitchEndpoint = regexp.MustCompile("^rtmps?://.*\\.contribute\\.live-video\\.net/app/(.*)( live=1)?$")
// rtmp urls must be of format rtmp(s)://{host}(/{path})/{app}/{stream_key}( live=1)
var (
rtmpRegexp = regexp.MustCompile("^(rtmps?:\\/\\/)(.*\\/)(.*\\/)(\\S*)( live=1)?$")
twitchEndpoint = regexp.MustCompile("^rtmps?://.*\\.contribute\\.live-video\\.net/app/(.*)( live=1)?$")
)

func (o *StreamConfig) ValidateUrl(rawUrl string, outputType types.OutputType) (string, string, error) {
parsed, err := url.Parse(rawUrl)
Expand All @@ -55,10 +59,12 @@ func (o *StreamConfig) ValidateUrl(rawUrl string, outputType types.OutputType) (
}
}

redacted, ok := utils.RedactStreamKey(rawUrl)
redacted, streamID, ok := redactStreamKey(rawUrl)
if !ok {
return "", "", errors.ErrInvalidUrl(rawUrl, "rtmp urls must be of format rtmp(s)://{host}(/{path})/{app}/{stream_key}( live=1)")
}
o.StreamIDs[rawUrl] = streamID

return rawUrl, redacted, nil

case types.OutputTypeSRT:
Expand Down Expand Up @@ -142,3 +148,14 @@ func (o *StreamConfig) updateTwitchTemplate() error {

return errors.New("no ingest found")
}

func redactStreamKey(url string) (string, string, bool) {
match := rtmpRegexp.FindStringSubmatch(url)
if len(match) != 6 {
return url, "", false
}

streamID := match[4]
match[4] = utils.RedactIdentifier(match[4])
return strings.Join(match[1:], ""), streamID, true
}
33 changes: 20 additions & 13 deletions pkg/pipeline/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,21 +276,11 @@ func (c *Controller) UpdateStream(ctx context.Context, req *livekit.UpdateStream
continue
}

// add stream
if err = c.streamBin.AddStream(url); err != nil {
errs.AppendErr(err)
continue
}

// add to output count
c.OutputCount++

// add stream info to results
c.mu.Lock()
streamInfo := &livekit.StreamInfo{
Url: redacted,
StartedAt: now,
Status: livekit.StreamInfo_ACTIVE,
Url: redacted,
Status: livekit.StreamInfo_ACTIVE,
}
o.StreamInfo[url] = streamInfo

Expand All @@ -299,6 +289,18 @@ func (c *Controller) UpdateStream(ctx context.Context, req *livekit.UpdateStream
list.Info = append(list.Info, streamInfo)
}
c.mu.Unlock()

// add stream
if err = c.streamBin.AddStream(url); err != nil {
streamInfo.Status = livekit.StreamInfo_FAILED
errs.AppendErr(err)
continue
}

if o.OutputType != types.OutputTypeRTMP {
streamInfo.StartedAt = now
}
c.OutputCount++
sendUpdate = true
}

Expand Down Expand Up @@ -520,8 +522,13 @@ func (c *Controller) updateStartTime(startedAt int64) {
}
switch egressType {
case types.EgressTypeStream, types.EgressTypeWebsocket:
streamConfig := o[0].(*config.StreamConfig)
if streamConfig.OutputType == types.OutputTypeRTMP {
continue
}

c.mu.Lock()
for _, streamInfo := range o[0].(*config.StreamConfig).StreamInfo {
for _, streamInfo := range streamConfig.StreamInfo {
streamInfo.Status = livekit.StreamInfo_ACTIVE
streamInfo.StartedAt = startedAt
}
Expand Down
3 changes: 1 addition & 2 deletions pkg/pipeline/sink/uploader/azure.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"fmt"
"net/url"
"os"
"path"

"github.com/Azure/azure-storage-blob-go/azblob"

Expand Down Expand Up @@ -89,5 +88,5 @@ func (u *AzureUploader) upload(localFilepath, storageFilepath string, outputType
return "", 0, errors.ErrUploadFailed("Azure", err)
}

return path.Join(u.container, storageFilepath), stat.Size(), nil
return fmt.Sprintf("%s/%s", u.container, storageFilepath), stat.Size(), nil
}
33 changes: 25 additions & 8 deletions pkg/pipeline/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,15 @@ const (
msgInputDisappeared = "Can't copy metadata because input buffer disappeared"
msgSkippingSegment = "error reading data -1 (reason: Success), skipping segment"
fnGstAudioResampleCheckDiscont = "gst_audio_resample_check_discont"
callerEPollUpdateEvents = "./srtcore/epoll.cpp:905"

// noisy gst fixmes
msgStreamStart = "stream-start event without group-id. Consider implementing group-id handling in the upstream elements"
msgCreatingStream = "Creating random stream-id, consider implementing a deterministic way of creating a stream-id"
msgAggregateSubclass = "Subclass should call gst_aggregator_selected_samples() from its aggregate implementation."

// rtmp client
catRtmpClient = "rtmpclient"
fnSendCreateStream = "send_create_stream"
)

var (
Expand All @@ -70,38 +73,52 @@ var (
msgInputDisappeared: true,
msgSkippingSegment: true,
fnGstAudioResampleCheckDiscont: true,
callerEPollUpdateEvents: true,
msgStreamStart: true,
msgCreatingStream: true,
msgAggregateSubclass: true,
}
)

func (c *Controller) gstLog(
_ *gst.DebugCategory,
cat *gst.DebugCategory,
level gst.DebugLevel,
file, function string, line int,
_ *gst.LoggedObject,
debugMsg *gst.DebugMessage,
) {
category := cat.GetName()
message := debugMsg.Get()
lvl, ok := logLevels[level]
if !ok || ignore[message] || ignore[function] {
return
}

caller := fmt.Sprintf("%s:%d", file, line)
if ignore[caller] {
if category == catRtmpClient {
if function == fnSendCreateStream {
streamID := strings.Split(message, "'")[1]
if o := c.GetStreamConfig(); o != nil {
c.mu.Lock()
for url, sID := range o.StreamIDs {
if streamID == sID {
if streamInfo := o.StreamInfo[url]; streamInfo != nil && streamInfo.StartedAt == 0 {
streamInfo.StartedAt = time.Now().UnixNano()
break
}
}
}
c.mu.Unlock()
}
}
return
}

var msg string
if function != "" {
msg = fmt.Sprintf("[gst %s] %s: %s", lvl, function, message)
msg = fmt.Sprintf("[%s %s] %s: %s", category, lvl, function, message)
} else {
msg = fmt.Sprintf("[gst %s] %s", lvl, message)
msg = fmt.Sprintf("[%s %s] %s", category, lvl, message)
}
c.gstLogger.Debugw(msg, "caller", caller)
c.gstLogger.Debugw(msg, "caller", fmt.Sprintf("%s:%d", file, line))
}

func (c *Controller) messageWatch(msg *gst.Message) bool {
Expand Down

0 comments on commit e7b3b9a

Please sign in to comment.