Skip to content

Commit

Permalink
Add an error return to makeStreamID to make issues explicit (#400)
Browse files Browse the repository at this point in the history
* Add an error return to makeStreamID to make issues explicit

* Reinstate nil check
  • Loading branch information
thomshutt authored May 28, 2024
1 parent 11a5584 commit 343a3dd
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 27 deletions.
4 changes: 2 additions & 2 deletions cmd/example/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,9 @@ func main() {

lpms.HandleRTMPPublish(
//makeStreamID (give the stream an ID)
func(url *url.URL) stream.AppData {
func(url *url.URL) (stream.AppData, error) {
s := exampleStream(randString(10))
return &s
return &s, nil
},

//gotStream
Expand Down
4 changes: 2 additions & 2 deletions cmd/example/simple/simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@ func main() {
lpms := core.New(&core.LPMSOpts{WorkDir: fmt.Sprintf("%v/.tmp", dir)})

lpms.HandleRTMPPublish(
func(url *url.URL) stream.AppData {
func(url *url.URL) (stream.AppData, error) {
glog.Infof("Stream has been started!: %v", url)
return exampleStream(randString(10))
return exampleStream(randString(10)), nil
},

func(url *url.URL, rs stream.RTMPVideoStream) (err error) {
Expand Down
20 changes: 10 additions & 10 deletions core/lpms.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//The RTMP server. This will put up a RTMP endpoint when starting up Swarm.
//To integrate with LPMS means your code will become the source / destination of the media server.
//This RTMP endpoint is mainly used for video upload. The expected url is rtmp://localhost:port/livepeer/stream
// The RTMP server. This will put up a RTMP endpoint when starting up Swarm.
// To integrate with LPMS means your code will become the source / destination of the media server.
// This RTMP endpoint is mainly used for video upload. The expected url is rtmp://localhost:port/livepeer/stream
package core

import (
Expand Down Expand Up @@ -67,7 +67,7 @@ func defaultLPMSOpts(opts *LPMSOpts) {
}
}

//New creates a new LPMS server object. It really just brokers everything to the components.
// New creates a new LPMS server object. It really just brokers everything to the components.
func New(opts *LPMSOpts) *LPMS {
defaultLPMSOpts(opts)
var rtmpServer *joy4rtmp.Server
Expand All @@ -83,7 +83,7 @@ func New(opts *LPMSOpts) *LPMS {
return &LPMS{vidPlayer: player, vidListener: listener, workDir: opts.WorkDir, rtmpAddr: opts.RtmpAddr, httpAddr: httpAddr}
}

//Start starts the rtmp and http servers, and initializes ffmpeg
// Start starts the rtmp and http servers, and initializes ffmpeg
func (l *LPMS) Start(ctx context.Context) error {
ec := make(chan error, 1)
ffmpeg.InitFFmpeg()
Expand Down Expand Up @@ -114,21 +114,21 @@ func (l *LPMS) Start(ctx context.Context) error {
return nil
}

//HandleRTMPPublish offload to the video listener. To understand how it works, look at videoListener.HandleRTMPPublish.
// HandleRTMPPublish offload to the video listener. To understand how it works, look at videoListener.HandleRTMPPublish.
func (l *LPMS) HandleRTMPPublish(
makeStreamID func(url *url.URL) (strmID stream.AppData),
makeStreamID func(url *url.URL) (strmID stream.AppData, err error),
gotStream func(url *url.URL, rtmpStrm stream.RTMPVideoStream) (err error),
endStream func(url *url.URL, rtmpStrm stream.RTMPVideoStream) error) {

l.vidListener.HandleRTMPPublish(makeStreamID, gotStream, endStream)
}

//HandleRTMPPlay offload to the video player
// HandleRTMPPlay offload to the video player
func (l *LPMS) HandleRTMPPlay(getStream func(url *url.URL) (stream.RTMPVideoStream, error)) error {
return l.vidPlayer.HandleRTMPPlay(getStream)
}

//HandleHLSPlay offload to the video player
// HandleHLSPlay offload to the video player
func (l *LPMS) HandleHLSPlay(
getMasterPlaylist func(url *url.URL) (*m3u8.MasterPlaylist, error),
getMediaPlaylist func(url *url.URL) (*m3u8.MediaPlaylist, error),
Expand All @@ -137,7 +137,7 @@ func (l *LPMS) HandleHLSPlay(
l.vidPlayer.HandleHLSPlay(getMasterPlaylist, getMediaPlaylist, getSegment)
}

//SegmentRTMPToHLS takes a rtmp stream and re-packages it into a HLS stream with the specified segmenter options
// SegmentRTMPToHLS takes a rtmp stream and re-packages it into a HLS stream with the specified segmenter options
func (l *LPMS) SegmentRTMPToHLS(ctx context.Context, rs stream.RTMPVideoStream, hs stream.HLSVideoStream, segOptions segmenter.SegmenterOptions) error {
// set localhost if necessary. Check more problematic addrs? [::] ?
rtmpAddr := l.rtmpAddr
Expand Down
14 changes: 7 additions & 7 deletions vidlistener/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,21 +22,21 @@ type VidListener struct {
RtmpServer *joy4rtmp.Server
}

//HandleRTMPPublish takes 3 parameters - makeStreamID, gotStream, and endStream.
//makeStreamID is called when the stream starts. It should return a streamID from the requestURL.
//gotStream is called when the stream starts. It gives you access to the stream.
//endStream is called when the stream ends. It gives you access to the stream.
// HandleRTMPPublish takes 3 parameters - makeStreamID, gotStream, and endStream.
// makeStreamID is called when the stream starts. It should return a streamID from the requestURL.
// gotStream is called when the stream starts. It gives you access to the stream.
// endStream is called when the stream ends. It gives you access to the stream.
func (self *VidListener) HandleRTMPPublish(
makeStreamID func(url *url.URL) (strmID stream.AppData),
makeStreamID func(url *url.URL) (strmID stream.AppData, err error),
gotStream func(url *url.URL, rtmpStrm stream.RTMPVideoStream) error,
endStream func(url *url.URL, rtmpStrm stream.RTMPVideoStream) error) {

if self.RtmpServer != nil {
self.RtmpServer.HandlePublish = func(conn *joy4rtmp.Conn) {
glog.V(2).Infof("RTMP server got upstream: %v", conn.URL)

strmID := makeStreamID(conn.URL)
if strmID == nil || strmID.StreamID() == "" {
strmID, err := makeStreamID(conn.URL)
if err != nil || strmID == nil || strmID.StreamID() == "" {
conn.Close()
return
}
Expand Down
12 changes: 6 additions & 6 deletions vidlistener/listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ func TestListener(t *testing.T) {

listener.HandleRTMPPublish(
//makeStreamID
func(url *url.URL) stream.AppData {
return newTestStream()
func(url *url.URL) (stream.AppData, error) {
return newTestStream(), nil
},
//gotStream
func(url *url.URL, rtmpStrm stream.RTMPVideoStream) (err error) {
Expand Down Expand Up @@ -82,8 +82,8 @@ func TestListenerError(t *testing.T) {
failures := 0
badListener.HandleRTMPPublish(
//makeStreamID
func(url *url.URL) stream.AppData {
return newTestStream()
func(url *url.URL) (stream.AppData, error) {
return newTestStream(), nil
},
//gotStream
func(url *url.URL, rtmpStrm stream.RTMPVideoStream) error {
Expand Down Expand Up @@ -123,9 +123,9 @@ func TestListenerEmptyStreamID(t *testing.T) {

badListener.HandleRTMPPublish(
//makeStreamID
func(url *url.URL) stream.AppData {
func(url *url.URL) (stream.AppData, error) {
// On returning empty stream id connection should be closed
return newTestStream()
return newTestStream(), nil
},
//gotStream
func(url *url.URL, rtmpStrm stream.RTMPVideoStream) error {
Expand Down

0 comments on commit 343a3dd

Please sign in to comment.