Skip to content

Commit

Permalink
support clients that don't specify track ID, like tvheadend (#155)
Browse files Browse the repository at this point in the history
  • Loading branch information
aler9 committed Jan 20, 2021
1 parent b6277dc commit cc703fe
Show file tree
Hide file tree
Showing 5 changed files with 87 additions and 71 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ _rtsp-simple-server_ is a simple, ready-to-use and zero-dependency RTSP server a
Features:

* Read and publish live streams with UDP and TCP
* Each stream can have multiple video and audio tracks, encoded with any codec (including H264, H265, VP8, VP9, MP3, AAC, Opus, PCM)
* Each stream can have multiple video and audio tracks, encoded with any codec (including H264, H265, VP8, VP9, MPEG2, MP3, AAC, Opus, PCM)
* Serve multiple streams at once in separate paths
* Encrypt streams with TLS (RTSPS)
* Pull and serve streams from other RTSP or RTMP servers, always or on-demand (RTSP proxy)
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ go 1.15
require (
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d // indirect
github.com/aler9/gortsplib v0.0.0-20210110113516-c3805aadc400
github.com/aler9/gortsplib v0.0.0-20210119134354-d9b819e85e9b
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/fsnotify/fsnotify v1.4.9
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 h1:JYp7IbQjafo
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d h1:UQZhZ2O0vMHr2cI+DC1Mbh0TJxzA3RcLoMsFw+aXw7E=
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho=
github.com/aler9/gortsplib v0.0.0-20210110113516-c3805aadc400 h1:mRaK+tJuQQ39M6poSfluxSu94kdqfzMrIi24+zacWeQ=
github.com/aler9/gortsplib v0.0.0-20210110113516-c3805aadc400/go.mod h1:8P09VjpiPJFyfkVosyF5/TY82jNwkMN165NS/7sc32I=
github.com/aler9/gortsplib v0.0.0-20210119134354-d9b819e85e9b h1:gNkdUoWMsgM9URQIWPF8fUgShKiSxteG463yNWZbIiA=
github.com/aler9/gortsplib v0.0.0-20210119134354-d9b819e85e9b/go.mod h1:8P09VjpiPJFyfkVosyF5/TY82jNwkMN165NS/7sc32I=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
Expand Down
134 changes: 70 additions & 64 deletions internal/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,16 +158,16 @@ func (c *Client) run() {
}

onDescribe := func(req *base.Request) (*base.Response, error) {
basePath, ok := req.URL.BasePath()
reqPath, ok := req.URL.RTSPPath()
if !ok {
return &base.Response{
StatusCode: base.StatusBadRequest,
}, fmt.Errorf("unable to find base path (%s)", req.URL)
}, fmt.Errorf("invalid path (%s)", req.URL)
}

c.describeData = make(chan describeData)

path, err := c.parent.OnClientDescribe(c, basePath, req)
path, err := c.parent.OnClientDescribe(c, reqPath, req)
if err != nil {
switch terr := err.(type) {
case errAuthNotCritical:
Expand Down Expand Up @@ -199,7 +199,7 @@ func (c *Client) run() {
c.path = nil

if res.err != nil {
c.log(logger.Info, "no one is publishing to path '%s'", basePath)
c.log(logger.Info, "no one is publishing to path '%s'", reqPath)
return &base.Response{
StatusCode: base.StatusNotFound,
}, nil
Expand Down Expand Up @@ -242,14 +242,14 @@ func (c *Client) run() {
}

onAnnounce := func(req *base.Request, tracks gortsplib.Tracks) (*base.Response, error) {
basePath, ok := req.URL.BasePath()
reqPath, ok := req.URL.RTSPPath()
if !ok {
return &base.Response{
StatusCode: base.StatusBadRequest,
}, fmt.Errorf("unable to find base path (%s)", req.URL)
}, fmt.Errorf("invalid path (%s)", req.URL)
}

path, err := c.parent.OnClientAnnounce(c, basePath, tracks, req)
path, err := c.parent.OnClientAnnounce(c, reqPath, tracks, req)
if err != nil {
switch terr := err.(type) {
case errAuthNotCritical:
Expand Down Expand Up @@ -281,19 +281,28 @@ func (c *Client) run() {
}

onSetup := func(req *base.Request, th *headers.Transport, trackID int) (*base.Response, error) {
basePath, _, ok := req.URL.BasePathControlAttr()
if !ok {
return &base.Response{
StatusCode: base.StatusBadRequest,
}, fmt.Errorf("unable to find control attribute (%s)", req.URL)
}

switch c.conn.State() {
case gortsplib.ServerConnStateInitial, gortsplib.ServerConnStatePrePlay: // play
if c.path != nil && basePath != c.path.Name() {
pathAndQuery, ok := req.URL.RTSPPathAndQuery()
if !ok {
return &base.Response{
StatusCode: base.StatusBadRequest,
}, fmt.Errorf("path has changed, was '%s', now is '%s'", c.path.Name(), basePath)
}, fmt.Errorf("invalid path (%s)", req.URL)
}

_, pathAndQuery, ok = base.PathSplitControlAttribute(pathAndQuery)
if !ok {
return &base.Response{
StatusCode: base.StatusBadRequest,
}, fmt.Errorf("invalid path (%s)", req.URL)
}

reqPath, _ := base.PathSplitQuery(pathAndQuery)

if c.path != nil && reqPath != c.path.Name() {
return &base.Response{
StatusCode: base.StatusBadRequest,
}, fmt.Errorf("path has changed, was '%s', now is '%s'", c.path.Name(), reqPath)
}

// play with UDP
Expand All @@ -304,7 +313,7 @@ func (c *Client) run() {
}, nil
}

path, err := c.parent.OnClientSetupPlay(c, basePath, trackID, req)
path, err := c.parent.OnClientSetupPlay(c, reqPath, trackID, req)
if err != nil {
switch terr := err.(type) {
case errAuthNotCritical:
Expand Down Expand Up @@ -346,7 +355,7 @@ func (c *Client) run() {
}, nil
}

path, err := c.parent.OnClientSetupPlay(c, basePath, trackID, req)
path, err := c.parent.OnClientSetupPlay(c, reqPath, trackID, req)
if err != nil {
switch terr := err.(type) {
case errAuthNotCritical:
Expand Down Expand Up @@ -380,11 +389,18 @@ func (c *Client) run() {
}, nil

default: // record
// after ANNOUNCE, c.path is already set
if basePath != c.path.Name() {
reqPathAndQuery, ok := req.URL.RTSPPathAndQuery()
if !ok {
return &base.Response{
StatusCode: base.StatusBadRequest,
}, fmt.Errorf("path has changed, was '%s', now is '%s'", c.path.Name(), basePath)
}, fmt.Errorf("invalid path (%s)", req.URL)
}

if !strings.HasPrefix(reqPathAndQuery, c.path.Name()) {
return &base.Response{
StatusCode: base.StatusBadRequest,
}, fmt.Errorf("invalid path: must begin with '%s', but is '%s'",
c.path.Name(), reqPathAndQuery)
}

// record with UDP
Expand All @@ -395,18 +411,6 @@ func (c *Client) run() {
}, nil
}

if th.ClientPorts == nil {
return &base.Response{
StatusCode: base.StatusBadRequest,
}, fmt.Errorf("transport header does not have valid client ports (%s)", req.Header["Transport"])
}

if c.conn.TracksLen() >= c.path.SourceTrackCount() {
return &base.Response{
StatusCode: base.StatusBadRequest,
}, fmt.Errorf("all the tracks have already been setup")
}

return &base.Response{
StatusCode: base.StatusOK,
Header: base.Header{
Expand All @@ -423,12 +427,6 @@ func (c *Client) run() {
}, nil
}

if c.conn.TracksLen() >= c.path.SourceTrackCount() {
return &base.Response{
StatusCode: base.StatusBadRequest,
}, fmt.Errorf("all the tracks have already been setup")
}

return &base.Response{
StatusCode: base.StatusOK,
Header: base.Header{
Expand All @@ -440,20 +438,20 @@ func (c *Client) run() {

onPlay := func(req *base.Request) (*base.Response, error) {
if c.conn.State() == gortsplib.ServerConnStatePrePlay {
basePath, ok := req.URL.BasePath()
reqPath, ok := req.URL.RTSPPath()
if !ok {
return &base.Response{
StatusCode: base.StatusBadRequest,
}, fmt.Errorf("unable to find base path (%s)", req.URL)
}, fmt.Errorf("invalid path (%s)", req.URL)
}

// path can end with a slash, remove it
basePath = strings.TrimSuffix(basePath, "/")
reqPath = strings.TrimSuffix(reqPath, "/")

if basePath != c.path.Name() {
if reqPath != c.path.Name() {
return &base.Response{
StatusCode: base.StatusBadRequest,
}, fmt.Errorf("path has changed, was '%s', now is '%s'", c.path.Name(), basePath)
}, fmt.Errorf("path has changed, was '%s', now is '%s'", c.path.Name(), reqPath)
}

c.startPlay()
Expand All @@ -468,20 +466,20 @@ func (c *Client) run() {
}

onRecord := func(req *base.Request) (*base.Response, error) {
basePath, ok := req.URL.BasePath()
reqPath, ok := req.URL.RTSPPath()
if !ok {
return &base.Response{
StatusCode: base.StatusBadRequest,
}, fmt.Errorf("unable to find base path (%s)", req.URL)
}, fmt.Errorf("invalid path (%s)", req.URL)
}

// path can end with a slash, remove it
basePath = strings.TrimSuffix(basePath, "/")
reqPath = strings.TrimSuffix(reqPath, "/")

if basePath != c.path.Name() {
if reqPath != c.path.Name() {
return &base.Response{
StatusCode: base.StatusBadRequest,
}, fmt.Errorf("path has changed, was '%s', now is '%s'", c.path.Name(), basePath)
}, fmt.Errorf("path has changed, was '%s', now is '%s'", c.path.Name(), reqPath)
}

c.startRecord()
Expand Down Expand Up @@ -592,7 +590,8 @@ func (errAuthCritical) Error() string {
}

// Authenticate performs an authentication.
func (c *Client) Authenticate(authMethods []headers.AuthMethod, ips []interface{}, user string, pass string, req *base.Request) error {
func (c *Client) Authenticate(authMethods []headers.AuthMethod, ips []interface{},
user string, pass string, req *base.Request, altURL *base.URL) error {
// validate ip
if ips != nil {
ip := c.ip()
Expand All @@ -615,7 +614,8 @@ func (c *Client) Authenticate(authMethods []headers.AuthMethod, ips []interface{
c.authValidator = auth.NewValidator(user, pass, authMethods)
}

err := c.authValidator.ValidateHeader(req.Header["Authorization"], req.Method, req.URL)
err := c.authValidator.ValidateHeader(req.Header["Authorization"],
req.Method, req.URL, altURL)
if err != nil {
c.authFailures++

Expand Down Expand Up @@ -658,12 +658,15 @@ func (c *Client) Authenticate(authMethods []headers.AuthMethod, ips []interface{
func (c *Client) startPlay() {
c.path.OnClientPlay(c)

c.log(logger.Info, "is reading from path '%s', %d %s with %s", c.path.Name(), c.conn.TracksLen(), func() string {
if c.conn.TracksLen() == 1 {
return "track"
}
return "tracks"
}(), *c.conn.TracksProtocol())
c.log(logger.Info, "is reading from path '%s', %d %s with %s", c.path.Name(),
c.conn.SetuppedTracksLen(),
func() string {
if c.conn.SetuppedTracksLen() == 1 {
return "track"
}
return "tracks"
}(),
*c.conn.SetuppedTracksProtocol())

if c.path.Conf().RunOnRead != "" {
c.onReadCmd = externalcmd.New(c.path.Conf().RunOnRead, c.path.Conf().RunOnReadRestart, externalcmd.Environment{
Expand All @@ -682,12 +685,15 @@ func (c *Client) stopPlay() {
func (c *Client) startRecord() {
c.path.OnClientRecord(c)

c.log(logger.Info, "is publishing to path '%s', %d %s with %s", c.path.Name(), c.conn.TracksLen(), func() string {
if c.conn.TracksLen() == 1 {
return "track"
}
return "tracks"
}(), *c.conn.TracksProtocol())
c.log(logger.Info, "is publishing to path '%s', %d %s with %s", c.path.Name(),
c.conn.SetuppedTracksLen(),
func() string {
if c.conn.SetuppedTracksLen() == 1 {
return "track"
}
return "tracks"
}(),
*c.conn.SetuppedTracksProtocol())

if c.path.Conf().RunOnPublish != "" {
c.onPublishCmd = externalcmd.New(c.path.Conf().RunOnPublish, c.path.Conf().RunOnPublishRestart, externalcmd.Environment{
Expand All @@ -705,7 +711,7 @@ func (c *Client) stopRecord() {

// OnReaderFrame implements path.Reader.
func (c *Client) OnReaderFrame(trackID int, streamType base.StreamType, buf []byte) {
if !c.conn.HasTrack(trackID) {
if !c.conn.HasSetuppedTrack(trackID) {
return
}

Expand Down
16 changes: 13 additions & 3 deletions internal/pathman/pathman.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ outer:
}

err = req.Client.Authenticate(pm.authMethods, pathConf.ReadIpsParsed,
pathConf.ReadUser, pathConf.ReadPass, req.Req)
pathConf.ReadUser, pathConf.ReadPass, req.Req, nil)
if err != nil {
req.Res <- path.ClientDescribeRes{nil, err} //nolint:govet
continue
Expand Down Expand Up @@ -187,7 +187,8 @@ outer:
}

err = req.Client.Authenticate(pm.authMethods,
pathConf.PublishIpsParsed, pathConf.PublishUser, pathConf.PublishPass, req.Req)
pathConf.PublishIpsParsed, pathConf.PublishUser,
pathConf.PublishPass, req.Req, nil)
if err != nil {
req.Res <- path.ClientAnnounceRes{nil, err} //nolint:govet
continue
Expand Down Expand Up @@ -223,8 +224,17 @@ outer:
continue
}

// VLC strips the control attribute
// provide an alternative URL without the control attribute
altURL := &base.URL{
Scheme: req.Req.URL.Scheme,
Host: req.Req.URL.Host,
Path: "/" + req.PathName + "/",
}

err = req.Client.Authenticate(pm.authMethods,
pathConf.ReadIpsParsed, pathConf.ReadUser, pathConf.ReadPass, req.Req)
pathConf.ReadIpsParsed, pathConf.ReadUser, pathConf.ReadPass,
req.Req, altURL)
if err != nil {
req.Res <- path.ClientSetupPlayRes{nil, err} //nolint:govet
continue
Expand Down

0 comments on commit cc703fe

Please sign in to comment.