Skip to content

Commit

Permalink
hls muxer: when hlsAlwaysRemux is on, automatically recreate muxers in
Browse files Browse the repository at this point in the history
case of errors
  • Loading branch information
aler9 committed Jul 24, 2022
1 parent c769088 commit 8a4743f
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 41 deletions.
7 changes: 3 additions & 4 deletions internal/core/hls_muxer.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,8 @@ type hlsMuxerParent interface {

type hlsMuxer struct {
name string
remoteAddr string
externalAuthenticationURL string
hlsAlwaysRemux bool
hlsVariant conf.HLSVariant
hlsSegmentCount int
hlsSegmentDuration conf.StringDuration
Expand Down Expand Up @@ -143,7 +143,6 @@ func newHLSMuxer(
name string,
remoteAddr string,
externalAuthenticationURL string,
hlsAlwaysRemux bool,
hlsVariant conf.HLSVariant,
hlsSegmentCount int,
hlsSegmentDuration conf.StringDuration,
Expand All @@ -160,8 +159,8 @@ func newHLSMuxer(

m := &hlsMuxer{
name: name,
remoteAddr: remoteAddr,
externalAuthenticationURL: externalAuthenticationURL,
hlsAlwaysRemux: hlsAlwaysRemux,
hlsVariant: hlsVariant,
hlsSegmentCount: hlsSegmentCount,
hlsSegmentDuration: hlsSegmentDuration,
Expand Down Expand Up @@ -398,7 +397,7 @@ func (m *hlsMuxer) runInner(innerCtx context.Context, innerReady chan struct{})
select {
case <-closeCheckTicker.C:
t := time.Unix(atomic.LoadInt64(m.lastRequestTime), 0)
if !m.hlsAlwaysRemux && time.Since(t) >= closeAfterInactivity {
if m.remoteAddr != "" && time.Since(t) >= closeAfterInactivity {
m.ringBuffer.Close()
<-writerDone
return fmt.Errorf("not used anymore")
Expand Down
34 changes: 28 additions & 6 deletions internal/core/hls_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,11 @@ type hlsServer struct {
muxers map[string]*hlsMuxer

// in
pathSourceReady chan *path
request chan *hlsMuxerRequest
muxerClose chan *hlsMuxer
apiMuxersList chan hlsServerAPIMuxersListReq
pathSourceReady chan *path
pathSourceNotReady chan *path
request chan *hlsMuxerRequest
muxerClose chan *hlsMuxer
apiMuxersList chan hlsServerAPIMuxersListReq
}

func newHLSServer(
Expand Down Expand Up @@ -142,6 +143,7 @@ func newHLSServer(
tlsConfig: tlsConfig,
muxers: make(map[string]*hlsMuxer),
pathSourceReady: make(chan *path),
pathSourceNotReady: make(chan *path),
request: make(chan *hlsMuxerRequest),
muxerClose: make(chan *hlsMuxer),
apiMuxersList: make(chan hlsServerAPIMuxersListReq),
Expand Down Expand Up @@ -204,6 +206,15 @@ outer:
s.findOrCreateMuxer(pa.Name(), "", nil)
}

case pa := <-s.pathSourceNotReady:
if s.hlsAlwaysRemux {
c, ok := s.muxers[pa.Name()]
if ok {
c.close()
delete(s.muxers, pa.Name())
}
}

case req := <-s.request:
s.findOrCreateMuxer(req.dir, req.ctx.ClientIP(), req)

Expand All @@ -213,6 +224,10 @@ outer:
}
delete(s.muxers, c.PathName())

if s.hlsAlwaysRemux && c.remoteAddr == "" {
s.findOrCreateMuxer(c.PathName(), "", nil)
}

case req := <-s.apiMuxersList:
muxers := make(map[string]*hlsMuxer)

Expand Down Expand Up @@ -331,7 +346,6 @@ func (s *hlsServer) findOrCreateMuxer(pathName string, remoteAddr string, req *h
pathName,
remoteAddr,
s.externalAuthenticationURL,
s.hlsAlwaysRemux,
s.hlsVariant,
s.hlsSegmentCount,
s.hlsSegmentDuration,
Expand All @@ -358,14 +372,22 @@ func (s *hlsServer) onMuxerClose(c *hlsMuxer) {
}
}

// onPathSourceReady is called by core.
// onPathSourceReady is called by pathManager.
func (s *hlsServer) onPathSourceReady(pa *path) {
select {
case s.pathSourceReady <- pa:
case <-s.ctx.Done():
}
}

// onPathSourceNotReady is called by pathManager.
func (s *hlsServer) onPathSourceNotReady(pa *path) {
select {
case s.pathSourceNotReady <- pa:
case <-s.ctx.Done():
}
}

// onAPIHLSMuxersList is called by api.
func (s *hlsServer) onAPIHLSMuxersList(req hlsServerAPIMuxersListReq) hlsServerAPIMuxersListRes {
req.res = make(chan hlsServerAPIMuxersListRes)
Expand Down
11 changes: 8 additions & 3 deletions internal/core/path.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ func (pathErrAuthCritical) Error() string {
type pathParent interface {
log(logger.Level, string, ...interface{})
onPathSourceReady(*path)
onPathSourceNotReady(*path)
onPathClose(*path)
}

Expand Down Expand Up @@ -533,7 +534,9 @@ func (pa *path) run() {
req.res <- pathReaderSetupPlayRes{err: fmt.Errorf("terminated")}
}

pa.sourceSetNotReady()
if pa.sourceReady {
pa.sourceSetNotReady()
}

if pa.source != nil {
if source, ok := pa.source.(sourceStatic); ok {
Expand Down Expand Up @@ -655,8 +658,6 @@ func (pa *path) sourceSetReady(tracks gortsplib.Tracks) {
pa.sourceReady = true
pa.stream = newStream(tracks)

pa.parent.onPathSourceReady(pa)

if pa.conf.RunOnReady != "" {
pa.log(logger.Info, "runOnReady command started")
pa.onReadyCmd = externalcmd.NewCmd(
Expand All @@ -668,9 +669,13 @@ func (pa *path) sourceSetReady(tracks gortsplib.Tracks) {
pa.log(logger.Info, "runOnReady command exited with code %d", co)
})
}

pa.parent.onPathSourceReady(pa)
}

func (pa *path) sourceSetNotReady() {
pa.parent.onPathSourceNotReady(pa)

for r := range pa.readers {
pa.doReaderRemove(r)
r.close()
Expand Down
72 changes: 44 additions & 28 deletions internal/core/path_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ import (
)

type pathManagerHLSServer interface {
onPathSourceReady(pa *path)
onPathSourceReady(*path)
onPathSourceNotReady(*path)
}

type pathManagerParent interface {
Expand All @@ -35,14 +36,15 @@ type pathManager struct {
paths map[string]*path

// in
confReload chan map[string]*conf.PathConf
pathClose chan *path
pathSourceReady chan *path
describe chan pathDescribeReq
readerSetupPlay chan pathReaderSetupPlayReq
publisherAnnounce chan pathPublisherAnnounceReq
hlsServerSet chan pathManagerHLSServer
apiPathsList chan pathAPIPathsListReq
confReload chan map[string]*conf.PathConf
pathClose chan *path
pathSourceReady chan *path
pathSourceNotReady chan *path
describe chan pathDescribeReq
readerSetupPlay chan pathReaderSetupPlayReq
publisherAnnounce chan pathPublisherAnnounceReq
hlsServerSet chan pathManagerHLSServer
apiPathsList chan pathAPIPathsListReq
}

func newPathManager(
Expand All @@ -59,25 +61,26 @@ func newPathManager(
ctx, ctxCancel := context.WithCancel(parentCtx)

pm := &pathManager{
rtspAddress: rtspAddress,
readTimeout: readTimeout,
writeTimeout: writeTimeout,
readBufferCount: readBufferCount,
pathConfs: pathConfs,
externalCmdPool: externalCmdPool,
metrics: metrics,
parent: parent,
ctx: ctx,
ctxCancel: ctxCancel,
paths: make(map[string]*path),
confReload: make(chan map[string]*conf.PathConf),
pathClose: make(chan *path),
pathSourceReady: make(chan *path),
describe: make(chan pathDescribeReq),
readerSetupPlay: make(chan pathReaderSetupPlayReq),
publisherAnnounce: make(chan pathPublisherAnnounceReq),
hlsServerSet: make(chan pathManagerHLSServer),
apiPathsList: make(chan pathAPIPathsListReq),
rtspAddress: rtspAddress,
readTimeout: readTimeout,
writeTimeout: writeTimeout,
readBufferCount: readBufferCount,
pathConfs: pathConfs,
externalCmdPool: externalCmdPool,
metrics: metrics,
parent: parent,
ctx: ctx,
ctxCancel: ctxCancel,
paths: make(map[string]*path),
confReload: make(chan map[string]*conf.PathConf),
pathClose: make(chan *path),
pathSourceReady: make(chan *path),
pathSourceNotReady: make(chan *path),
describe: make(chan pathDescribeReq),
readerSetupPlay: make(chan pathReaderSetupPlayReq),
publisherAnnounce: make(chan pathPublisherAnnounceReq),
hlsServerSet: make(chan pathManagerHLSServer),
apiPathsList: make(chan pathAPIPathsListReq),
}

for pathConfName, pathConf := range pm.pathConfs {
Expand Down Expand Up @@ -165,6 +168,11 @@ outer:
pm.hlsServer.onPathSourceReady(pa)
}

case pa := <-pm.pathSourceNotReady:
if pm.hlsServer != nil {
pm.hlsServer.onPathSourceNotReady(pa)
}

case req := <-pm.describe:
pathConfName, pathConf, pathMatches, err := pm.findPathConf(req.pathName)
if err != nil {
Expand Down Expand Up @@ -323,6 +331,14 @@ func (pm *pathManager) onPathSourceReady(pa *path) {
}
}

// onPathSourceNotReady is called by path.
func (pm *pathManager) onPathSourceNotReady(pa *path) {
select {
case pm.pathSourceNotReady <- pa:
case <-pm.ctx.Done():
}
}

// onPathClose is called by path.
func (pm *pathManager) onPathClose(pa *path) {
select {
Expand Down

0 comments on commit 8a4743f

Please sign in to comment.