Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clean up stream logic #754

Merged
merged 6 commits into from
Aug 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ require (
github.com/Azure/azure-storage-blob-go v0.15.0
github.com/aliyun/aliyun-oss-go-sdk v3.0.2+incompatible
github.com/aws/aws-sdk-go v1.51.28
github.com/bep/debounce v1.2.1
github.com/chromedp/cdproto v0.0.0-20240421230201-ab917191657d
github.com/chromedp/chromedp v0.9.5
github.com/frostbyte73/core v0.0.10
Expand Down Expand Up @@ -49,7 +50,6 @@ require (
github.com/antlr4-go/antlr/v4 v4.13.0 // indirect
github.com/benbjohnson/clock v1.3.5 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/bep/debounce v1.2.1 // indirect
github.com/bufbuild/protovalidate-go v0.6.1 // indirect
github.com/bufbuild/protoyaml-go v0.1.9 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
Expand Down
34 changes: 19 additions & 15 deletions pkg/config/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func (p *PipelineConfig) updateEncodedOutputs(req egress.EncodedOutput) error {
}

p.Outputs[types.EgressTypeFile] = []OutputConfig{conf}
p.OutputCount++
p.OutputCount.Inc()
p.FinalizationRequired = true
if p.VideoEnabled {
p.VideoEncoding = true
Expand Down Expand Up @@ -118,16 +118,18 @@ func (p *PipelineConfig) updateEncodedOutputs(req egress.EncodedOutput) error {
}

p.Outputs[types.EgressTypeStream] = []OutputConfig{conf}
p.OutputCount += len(stream.Urls)
p.OutputCount.Add(int32(len(stream.Urls)))
if p.VideoEnabled {
p.VideoEncoding = true
}

streamInfoList := make([]*livekit.StreamInfo, 0, len(conf.StreamInfo))
for _, info := range conf.StreamInfo {
streamInfoList = append(streamInfoList, info)
}
streamInfoList := make([]*livekit.StreamInfo, 0, len(stream.Urls))
conf.Streams.Range(func(_, stream any) bool {
streamInfoList = append(streamInfoList, stream.(*Stream).StreamInfo)
return true
})
p.Info.StreamResults = streamInfoList

if len(files)+len(segments)+len(images) == 0 {
// empty stream output only valid in combination with other outputs
if len(stream.Urls) == 0 {
Expand Down Expand Up @@ -157,7 +159,7 @@ func (p *PipelineConfig) updateEncodedOutputs(req egress.EncodedOutput) error {
}

p.Outputs[types.EgressTypeSegments] = []OutputConfig{conf}
p.OutputCount++
p.OutputCount.Inc()
p.FinalizationRequired = true
if p.VideoEnabled {
p.VideoEncoding = true
Expand Down Expand Up @@ -186,7 +188,7 @@ func (p *PipelineConfig) updateEncodedOutputs(req egress.EncodedOutput) error {
return err
}

if p.OutputCount == 0 {
if p.OutputCount.Load() == 0 {
return errors.ErrInvalidInput("output")
}

Expand All @@ -205,7 +207,7 @@ func (p *PipelineConfig) updateDirectOutput(req *livekit.TrackEgressRequest) err
p.Info.Result = &livekit.EgressInfo_File{File: conf.FileInfo}

p.Outputs[types.EgressTypeFile] = []OutputConfig{conf}
p.OutputCount = 1
p.OutputCount.Inc()
p.FinalizationRequired = true

case *livekit.TrackEgressRequest_WebsocketUrl:
Expand All @@ -214,15 +216,17 @@ func (p *PipelineConfig) updateDirectOutput(req *livekit.TrackEgressRequest) err
return err
}

streamInfoList := make([]*livekit.StreamInfo, 0, len(conf.StreamInfo))
for _, info := range conf.StreamInfo {
streamInfoList = append(streamInfoList, info)
}
streamInfoList := make([]*livekit.StreamInfo, 0, 1)
conf.Streams.Range(func(_, stream any) bool {
streamInfoList = append(streamInfoList, stream.(*Stream).StreamInfo)
return true
})

p.Info.StreamResults = streamInfoList
p.Info.Result = &livekit.EgressInfo_Stream{Stream: &livekit.StreamInfoList{Info: streamInfoList}}

p.Outputs[types.EgressTypeWebsocket] = []OutputConfig{conf}
p.OutputCount = 1
p.OutputCount.Inc()

default:
return errors.ErrInvalidInput("output")
Expand All @@ -243,7 +247,7 @@ func (p *PipelineConfig) updateImageOutputs(images []*livekit.ImageOutput) error
}

p.Outputs[types.EgressTypeImages] = append(p.Outputs[types.EgressTypeImages], conf)
p.OutputCount++
p.OutputCount.Inc()
p.FinalizationRequired = true

p.Info.ImageResults = append(p.Info.ImageResults, conf.ImagesInfo)
Expand Down
37 changes: 24 additions & 13 deletions pkg/config/output_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,30 @@
package config

import (
"sync"

"github.com/livekit/egress/pkg/types"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
)

type StreamConfig struct {
outputConfig

Urls []string
StreamIDs map[string]string
StreamInfo map[string]*livekit.StreamInfo
// url -> Stream
Streams sync.Map

twitchTemplate string
}

type Stream struct {
Name string // gstreamer stream ID
ParsedUrl string // parsed/validated url
RedactedUrl string // url with stream key removed
StreamID string // stream ID used by rtmpconnection
StreamInfo *livekit.StreamInfo
}

func (p *PipelineConfig) GetStreamConfig() *StreamConfig {
o, ok := p.Outputs[types.EgressTypeStream]
if !ok || len(o) == 0 {
Expand All @@ -48,22 +58,13 @@ func (p *PipelineConfig) GetWebsocketConfig() *StreamConfig {
func (p *PipelineConfig) getStreamConfig(outputType types.OutputType, urls []string) (*StreamConfig, error) {
conf := &StreamConfig{
outputConfig: outputConfig{OutputType: outputType},
StreamIDs: make(map[string]string),
}

conf.StreamInfo = make(map[string]*livekit.StreamInfo)
var streamInfoList []*livekit.StreamInfo
for _, rawUrl := range urls {
url, redacted, err := conf.ValidateUrl(rawUrl, outputType)
_, err := conf.AddStream(rawUrl, outputType)
if err != nil {
return nil, err
}

conf.Urls = append(conf.Urls, url)

info := &livekit.StreamInfo{Url: redacted}
conf.StreamInfo[url] = info
streamInfoList = append(streamInfoList, info)
}

switch outputType {
Expand All @@ -81,3 +82,13 @@ func (p *PipelineConfig) getStreamConfig(outputType types.OutputType, urls []str

return conf, nil
}

func (s *Stream) UpdateEndTime(endedAt int64) {
s.StreamInfo.EndedAt = endedAt
if s.StreamInfo.StartedAt == 0 {
logger.Warnw("stream missing start time", nil, "url", s.RedactedUrl)
s.StreamInfo.StartedAt = endedAt
} else {
s.StreamInfo.Duration = endedAt - s.StreamInfo.StartedAt
}
}
3 changes: 2 additions & 1 deletion pkg/config/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"github.com/go-gst/go-gst/gst/app"
"github.com/pion/webrtc/v3"
"go.uber.org/atomic"
"google.golang.org/protobuf/proto"
"gopkg.in/yaml.v3"

Expand Down Expand Up @@ -50,7 +51,7 @@ type PipelineConfig struct {
VideoConfig `yaml:"-"`

Outputs map[types.EgressType][]OutputConfig `yaml:"-"`
OutputCount int `yaml:"-"`
OutputCount atomic.Int32 `yaml:"-"`
FinalizationRequired bool `yaml:"-"`

Info *info.EgressInfo `yaml:"-"`
Expand Down
113 changes: 75 additions & 38 deletions pkg/config/urls.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@ import (
"net/url"
"regexp"
"strings"
"time"

"github.com/go-jose/go-jose/v3/json"

"github.com/livekit/egress/pkg/errors"
"github.com/livekit/egress/pkg/types"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/utils"
)

Expand All @@ -34,75 +36,110 @@ var (
twitchEndpoint = regexp.MustCompile("^rtmps?://.*\\.contribute\\.live-video\\.net/app/(.*)( live=1)?$")
)

func (o *StreamConfig) ValidateUrl(rawUrl string, outputType types.OutputType) (string, string, error) {
parsed, err := url.Parse(rawUrl)
func (o *StreamConfig) AddStream(rawUrl string, outputType types.OutputType) (*Stream, error) {
parsed, redacted, streamID, err := o.ValidateUrl(rawUrl, outputType)
if err != nil {
return "", "", errors.ErrInvalidUrl(rawUrl, err.Error())
return nil, err
}
if types.StreamOutputTypes[parsed.Scheme] != outputType {
return "", "", errors.ErrInvalidUrl(rawUrl, "invalid scheme")

stream := &Stream{
ParsedUrl: parsed,
RedactedUrl: redacted,
StreamID: streamID,
StreamInfo: &livekit.StreamInfo{
Url: redacted,
Status: livekit.StreamInfo_ACTIVE,
},
}
if outputType != types.OutputTypeRTMP {
stream.StreamInfo.StartedAt = time.Now().UnixNano()
}
o.Streams.Store(parsed, stream)

return stream, nil
}

func (o *StreamConfig) ValidateUrl(rawUrl string, outputType types.OutputType) (
parsed string, redacted string, streamID string, err error,
) {
parsedUrl, err := url.Parse(rawUrl)
if err != nil {
err = errors.ErrInvalidUrl(rawUrl, err.Error())
return
}
if types.StreamOutputTypes[parsedUrl.Scheme] != outputType {
err = errors.ErrInvalidUrl(rawUrl, "invalid scheme")
return
}

switch outputType {
case types.OutputTypeRTMP:
if parsed.Scheme == "mux" {
rawUrl = fmt.Sprintf("rtmps://global-live.mux.com:443/app/%s", parsed.Host)
} else if parsed.Scheme == "twitch" {
rawUrl, err = o.updateTwitchURL(parsed.Host)
if parsedUrl.Scheme == "mux" {
parsed = fmt.Sprintf("rtmps://global-live.mux.com:443/app/%s", parsedUrl.Host)
} else if parsedUrl.Scheme == "twitch" {
parsed, err = o.updateTwitchURL(parsedUrl.Host)
if err != nil {
return "", "", errors.ErrInvalidUrl(rawUrl, err.Error())
return
}
} else if match := twitchEndpoint.FindStringSubmatch(rawUrl); len(match) > 0 {
updated, err := o.updateTwitchURL(match[1])
if err == nil {
rawUrl = updated
if updated, err := o.updateTwitchURL(match[1]); err == nil {
parsed = updated
}
} else {
parsed = rawUrl
}

redacted, streamID, ok := redactStreamKey(rawUrl)
var ok bool
redacted, streamID, ok = redactStreamKey(parsed)
if !ok {
return "", "", errors.ErrInvalidUrl(rawUrl, "rtmp urls must be of format rtmp(s)://{host}(/{path})/{app}/{stream_key}( live=1)")
err = errors.ErrInvalidUrl(rawUrl, "rtmp urls must be of format rtmp(s)://{host}(/{path})/{app}/{stream_key}( live=1)")
}
o.StreamIDs[rawUrl] = streamID

return rawUrl, redacted, nil
return

case types.OutputTypeSRT:
return rawUrl, rawUrl, nil
parsed = rawUrl
redacted = rawUrl
return

case types.OutputTypeRaw:
return rawUrl, rawUrl, nil
parsed = rawUrl
redacted = rawUrl
return

default:
return "", "", errors.ErrInvalidInput("stream output type")
err = errors.ErrInvalidInput("stream output type")
return
}
}

func (o *StreamConfig) GetStreamUrl(rawUrl string) (string, error) {
parsed, err := url.Parse(rawUrl)
func (o *StreamConfig) GetStream(rawUrl string) (*Stream, error) {
parsedUrl, err := url.Parse(rawUrl)
if err != nil {
return "", errors.ErrInvalidUrl(rawUrl, err.Error())
return nil, errors.ErrInvalidUrl(rawUrl, err.Error())
}

var twitchKey string
if parsed.Scheme == "mux" {
return fmt.Sprintf("rtmps://global-live.mux.com:443/app/%s", parsed.Host), nil
} else if parsed.Scheme == "twitch" {
twitchKey = parsed.Host
var parsed string
if parsedUrl.Scheme == "mux" {
parsed = fmt.Sprintf("rtmps://global-live.mux.com:443/app/%s", parsedUrl.Host)
} else if parsedUrl.Scheme == "twitch" {
parsed, err = o.updateTwitchURL(parsedUrl.Host)
if err != nil {
return nil, err
}
} else if match := twitchEndpoint.FindStringSubmatch(rawUrl); len(match) > 0 {
twitchKey = match[1]
parsed, err = o.updateTwitchURL(match[1])
if err != nil {
return nil, err
}
} else {
return rawUrl, nil
parsed = rawUrl
}

// find twitch url by stream key because we can't rely on the ingest endpoint returning consistent results
for u := range o.StreamInfo {
if match := twitchEndpoint.FindStringSubmatch(u); len(match) > 0 && match[1] == twitchKey {
return u, nil
}
stream, ok := o.Streams.Load(parsed)
if !ok {
return nil, errors.ErrStreamNotFound(rawUrl)
}

return "", errors.ErrStreamNotFound(rawUrl)
return stream.(*Stream), nil
}

func (o *StreamConfig) updateTwitchURL(key string) (string, error) {
Expand Down
Loading
Loading