Skip to content

Commit

Permalink
Clean up stream logic (#754)
Browse files Browse the repository at this point in the history
* clean up stream logic

* more updates

* update urls_test

* fix SRT test

* fix test races for stream updates

* update rtmp failure test
  • Loading branch information
frostbyte73 authored Aug 9, 2024
1 parent 813e77c commit 8602b26
Show file tree
Hide file tree
Showing 14 changed files with 370 additions and 294 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ require (
github.com/Azure/azure-storage-blob-go v0.15.0
github.com/aliyun/aliyun-oss-go-sdk v3.0.2+incompatible
github.com/aws/aws-sdk-go v1.51.28
github.com/bep/debounce v1.2.1
github.com/chromedp/cdproto v0.0.0-20240421230201-ab917191657d
github.com/chromedp/chromedp v0.9.5
github.com/frostbyte73/core v0.0.10
Expand Down Expand Up @@ -49,7 +50,6 @@ require (
github.com/antlr4-go/antlr/v4 v4.13.0 // indirect
github.com/benbjohnson/clock v1.3.5 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/bep/debounce v1.2.1 // indirect
github.com/bufbuild/protovalidate-go v0.6.1 // indirect
github.com/bufbuild/protoyaml-go v0.1.9 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
Expand Down
34 changes: 19 additions & 15 deletions pkg/config/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func (p *PipelineConfig) updateEncodedOutputs(req egress.EncodedOutput) error {
}

p.Outputs[types.EgressTypeFile] = []OutputConfig{conf}
p.OutputCount++
p.OutputCount.Inc()
p.FinalizationRequired = true
if p.VideoEnabled {
p.VideoEncoding = true
Expand Down Expand Up @@ -118,16 +118,18 @@ func (p *PipelineConfig) updateEncodedOutputs(req egress.EncodedOutput) error {
}

p.Outputs[types.EgressTypeStream] = []OutputConfig{conf}
p.OutputCount += len(stream.Urls)
p.OutputCount.Add(int32(len(stream.Urls)))
if p.VideoEnabled {
p.VideoEncoding = true
}

streamInfoList := make([]*livekit.StreamInfo, 0, len(conf.StreamInfo))
for _, info := range conf.StreamInfo {
streamInfoList = append(streamInfoList, info)
}
streamInfoList := make([]*livekit.StreamInfo, 0, len(stream.Urls))
conf.Streams.Range(func(_, stream any) bool {
streamInfoList = append(streamInfoList, stream.(*Stream).StreamInfo)
return true
})
p.Info.StreamResults = streamInfoList

if len(files)+len(segments)+len(images) == 0 {
// empty stream output only valid in combination with other outputs
if len(stream.Urls) == 0 {
Expand Down Expand Up @@ -157,7 +159,7 @@ func (p *PipelineConfig) updateEncodedOutputs(req egress.EncodedOutput) error {
}

p.Outputs[types.EgressTypeSegments] = []OutputConfig{conf}
p.OutputCount++
p.OutputCount.Inc()
p.FinalizationRequired = true
if p.VideoEnabled {
p.VideoEncoding = true
Expand Down Expand Up @@ -186,7 +188,7 @@ func (p *PipelineConfig) updateEncodedOutputs(req egress.EncodedOutput) error {
return err
}

if p.OutputCount == 0 {
if p.OutputCount.Load() == 0 {
return errors.ErrInvalidInput("output")
}

Expand All @@ -205,7 +207,7 @@ func (p *PipelineConfig) updateDirectOutput(req *livekit.TrackEgressRequest) err
p.Info.Result = &livekit.EgressInfo_File{File: conf.FileInfo}

p.Outputs[types.EgressTypeFile] = []OutputConfig{conf}
p.OutputCount = 1
p.OutputCount.Inc()
p.FinalizationRequired = true

case *livekit.TrackEgressRequest_WebsocketUrl:
Expand All @@ -214,15 +216,17 @@ func (p *PipelineConfig) updateDirectOutput(req *livekit.TrackEgressRequest) err
return err
}

streamInfoList := make([]*livekit.StreamInfo, 0, len(conf.StreamInfo))
for _, info := range conf.StreamInfo {
streamInfoList = append(streamInfoList, info)
}
streamInfoList := make([]*livekit.StreamInfo, 0, 1)
conf.Streams.Range(func(_, stream any) bool {
streamInfoList = append(streamInfoList, stream.(*Stream).StreamInfo)
return true
})

p.Info.StreamResults = streamInfoList
p.Info.Result = &livekit.EgressInfo_Stream{Stream: &livekit.StreamInfoList{Info: streamInfoList}}

p.Outputs[types.EgressTypeWebsocket] = []OutputConfig{conf}
p.OutputCount = 1
p.OutputCount.Inc()

default:
return errors.ErrInvalidInput("output")
Expand All @@ -243,7 +247,7 @@ func (p *PipelineConfig) updateImageOutputs(images []*livekit.ImageOutput) error
}

p.Outputs[types.EgressTypeImages] = append(p.Outputs[types.EgressTypeImages], conf)
p.OutputCount++
p.OutputCount.Inc()
p.FinalizationRequired = true

p.Info.ImageResults = append(p.Info.ImageResults, conf.ImagesInfo)
Expand Down
37 changes: 24 additions & 13 deletions pkg/config/output_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,30 @@
package config

import (
"sync"

"github.com/livekit/egress/pkg/types"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
)

type StreamConfig struct {
outputConfig

Urls []string
StreamIDs map[string]string
StreamInfo map[string]*livekit.StreamInfo
// url -> Stream
Streams sync.Map

twitchTemplate string
}

type Stream struct {
Name string // gstreamer stream ID
ParsedUrl string // parsed/validated url
RedactedUrl string // url with stream key removed
StreamID string // stream ID used by rtmpconnection
StreamInfo *livekit.StreamInfo
}

func (p *PipelineConfig) GetStreamConfig() *StreamConfig {
o, ok := p.Outputs[types.EgressTypeStream]
if !ok || len(o) == 0 {
Expand All @@ -48,22 +58,13 @@ func (p *PipelineConfig) GetWebsocketConfig() *StreamConfig {
func (p *PipelineConfig) getStreamConfig(outputType types.OutputType, urls []string) (*StreamConfig, error) {
conf := &StreamConfig{
outputConfig: outputConfig{OutputType: outputType},
StreamIDs: make(map[string]string),
}

conf.StreamInfo = make(map[string]*livekit.StreamInfo)
var streamInfoList []*livekit.StreamInfo
for _, rawUrl := range urls {
url, redacted, err := conf.ValidateUrl(rawUrl, outputType)
_, err := conf.AddStream(rawUrl, outputType)
if err != nil {
return nil, err
}

conf.Urls = append(conf.Urls, url)

info := &livekit.StreamInfo{Url: redacted}
conf.StreamInfo[url] = info
streamInfoList = append(streamInfoList, info)
}

switch outputType {
Expand All @@ -81,3 +82,13 @@ func (p *PipelineConfig) getStreamConfig(outputType types.OutputType, urls []str

return conf, nil
}

func (s *Stream) UpdateEndTime(endedAt int64) {
s.StreamInfo.EndedAt = endedAt
if s.StreamInfo.StartedAt == 0 {
logger.Warnw("stream missing start time", nil, "url", s.RedactedUrl)
s.StreamInfo.StartedAt = endedAt
} else {
s.StreamInfo.Duration = endedAt - s.StreamInfo.StartedAt
}
}
3 changes: 2 additions & 1 deletion pkg/config/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"github.com/go-gst/go-gst/gst/app"
"github.com/pion/webrtc/v3"
"go.uber.org/atomic"
"google.golang.org/protobuf/proto"
"gopkg.in/yaml.v3"

Expand Down Expand Up @@ -50,7 +51,7 @@ type PipelineConfig struct {
VideoConfig `yaml:"-"`

Outputs map[types.EgressType][]OutputConfig `yaml:"-"`
OutputCount int `yaml:"-"`
OutputCount atomic.Int32 `yaml:"-"`
FinalizationRequired bool `yaml:"-"`

Info *info.EgressInfo `yaml:"-"`
Expand Down
113 changes: 75 additions & 38 deletions pkg/config/urls.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@ import (
"net/url"
"regexp"
"strings"
"time"

"github.com/go-jose/go-jose/v3/json"

"github.com/livekit/egress/pkg/errors"
"github.com/livekit/egress/pkg/types"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/utils"
)

Expand All @@ -34,75 +36,110 @@ var (
twitchEndpoint = regexp.MustCompile("^rtmps?://.*\\.contribute\\.live-video\\.net/app/(.*)( live=1)?$")
)

func (o *StreamConfig) ValidateUrl(rawUrl string, outputType types.OutputType) (string, string, error) {
parsed, err := url.Parse(rawUrl)
func (o *StreamConfig) AddStream(rawUrl string, outputType types.OutputType) (*Stream, error) {
parsed, redacted, streamID, err := o.ValidateUrl(rawUrl, outputType)
if err != nil {
return "", "", errors.ErrInvalidUrl(rawUrl, err.Error())
return nil, err
}
if types.StreamOutputTypes[parsed.Scheme] != outputType {
return "", "", errors.ErrInvalidUrl(rawUrl, "invalid scheme")

stream := &Stream{
ParsedUrl: parsed,
RedactedUrl: redacted,
StreamID: streamID,
StreamInfo: &livekit.StreamInfo{
Url: redacted,
Status: livekit.StreamInfo_ACTIVE,
},
}
if outputType != types.OutputTypeRTMP {
stream.StreamInfo.StartedAt = time.Now().UnixNano()
}
o.Streams.Store(parsed, stream)

return stream, nil
}

func (o *StreamConfig) ValidateUrl(rawUrl string, outputType types.OutputType) (
parsed string, redacted string, streamID string, err error,
) {
parsedUrl, err := url.Parse(rawUrl)
if err != nil {
err = errors.ErrInvalidUrl(rawUrl, err.Error())
return
}
if types.StreamOutputTypes[parsedUrl.Scheme] != outputType {
err = errors.ErrInvalidUrl(rawUrl, "invalid scheme")
return
}

switch outputType {
case types.OutputTypeRTMP:
if parsed.Scheme == "mux" {
rawUrl = fmt.Sprintf("rtmps://global-live.mux.com:443/app/%s", parsed.Host)
} else if parsed.Scheme == "twitch" {
rawUrl, err = o.updateTwitchURL(parsed.Host)
if parsedUrl.Scheme == "mux" {
parsed = fmt.Sprintf("rtmps://global-live.mux.com:443/app/%s", parsedUrl.Host)
} else if parsedUrl.Scheme == "twitch" {
parsed, err = o.updateTwitchURL(parsedUrl.Host)
if err != nil {
return "", "", errors.ErrInvalidUrl(rawUrl, err.Error())
return
}
} else if match := twitchEndpoint.FindStringSubmatch(rawUrl); len(match) > 0 {
updated, err := o.updateTwitchURL(match[1])
if err == nil {
rawUrl = updated
if updated, err := o.updateTwitchURL(match[1]); err == nil {
parsed = updated
}
} else {
parsed = rawUrl
}

redacted, streamID, ok := redactStreamKey(rawUrl)
var ok bool
redacted, streamID, ok = redactStreamKey(parsed)
if !ok {
return "", "", errors.ErrInvalidUrl(rawUrl, "rtmp urls must be of format rtmp(s)://{host}(/{path})/{app}/{stream_key}( live=1)")
err = errors.ErrInvalidUrl(rawUrl, "rtmp urls must be of format rtmp(s)://{host}(/{path})/{app}/{stream_key}( live=1)")
}
o.StreamIDs[rawUrl] = streamID

return rawUrl, redacted, nil
return

case types.OutputTypeSRT:
return rawUrl, rawUrl, nil
parsed = rawUrl
redacted = rawUrl
return

case types.OutputTypeRaw:
return rawUrl, rawUrl, nil
parsed = rawUrl
redacted = rawUrl
return

default:
return "", "", errors.ErrInvalidInput("stream output type")
err = errors.ErrInvalidInput("stream output type")
return
}
}

func (o *StreamConfig) GetStreamUrl(rawUrl string) (string, error) {
parsed, err := url.Parse(rawUrl)
func (o *StreamConfig) GetStream(rawUrl string) (*Stream, error) {
parsedUrl, err := url.Parse(rawUrl)
if err != nil {
return "", errors.ErrInvalidUrl(rawUrl, err.Error())
return nil, errors.ErrInvalidUrl(rawUrl, err.Error())
}

var twitchKey string
if parsed.Scheme == "mux" {
return fmt.Sprintf("rtmps://global-live.mux.com:443/app/%s", parsed.Host), nil
} else if parsed.Scheme == "twitch" {
twitchKey = parsed.Host
var parsed string
if parsedUrl.Scheme == "mux" {
parsed = fmt.Sprintf("rtmps://global-live.mux.com:443/app/%s", parsedUrl.Host)
} else if parsedUrl.Scheme == "twitch" {
parsed, err = o.updateTwitchURL(parsedUrl.Host)
if err != nil {
return nil, err
}
} else if match := twitchEndpoint.FindStringSubmatch(rawUrl); len(match) > 0 {
twitchKey = match[1]
parsed, err = o.updateTwitchURL(match[1])
if err != nil {
return nil, err
}
} else {
return rawUrl, nil
parsed = rawUrl
}

// find twitch url by stream key because we can't rely on the ingest endpoint returning consistent results
for u := range o.StreamInfo {
if match := twitchEndpoint.FindStringSubmatch(u); len(match) > 0 && match[1] == twitchKey {
return u, nil
}
stream, ok := o.Streams.Load(parsed)
if !ok {
return nil, errors.ErrStreamNotFound(rawUrl)
}

return "", errors.ErrStreamNotFound(rawUrl)
return stream.(*Stream), nil
}

func (o *StreamConfig) updateTwitchURL(key string) (string, error) {
Expand Down
Loading

0 comments on commit 8602b26

Please sign in to comment.