Skip to content

Commit

Permalink
support static sources in paths with regular expressions (#824) (#2799)
Browse files Browse the repository at this point in the history
This allows to proxy requests to other servers by using regular
expressions.
  • Loading branch information
aler9 authored Dec 10, 2023
1 parent 9bb9d58 commit d261bfe
Show file tree
Hide file tree
Showing 18 changed files with 210 additions and 136 deletions.
22 changes: 19 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,8 @@ _rtsp-simple-server_ has been rebranded as _MediaMTX_. The reason is pretty obvi
* [Encrypt the configuration](#encrypt-the-configuration)
* [Remuxing, re-encoding, compression](#remuxing-re-encoding-compression)
* [Record streams to disk](#record-streams-to-disk)
* [Forward streams to another server](#forward-streams-to-another-server)
* [Forward streams to other servers](#forward-streams-to-other-servers)
* [Proxy requests to other servers](#proxy-requests-to-other-servers)
* [On-demand publishing](#on-demand-publishing)
* [Start on boot](#start-on-boot)
* [Linux](#linux)
Expand Down Expand Up @@ -1164,7 +1165,7 @@ To upload recordings to a remote location, you can use _MediaMTX_ together with

If you want to delete local segments after they are uploaded, replace `rclone sync` with `rclone move`.

### Forward streams to another server
### Forward streams to other servers

To forward incoming streams to another server, use _FFmpeg_ inside the `runOnReady` parameter:

Expand All @@ -1173,10 +1174,25 @@ pathDefaults:
runOnReady: >
ffmpeg -i rtsp://localhost:$RTSP_PORT/$MTX_PATH
-c copy
-f rtsp rtsp://another-server/another-path
-f rtsp rtsp://other-server:8554/another-path
runOnReadyRestart: yes
```

### Proxy requests to other servers

The server allows to proxy incoming requests to other servers or cameras. This is useful to expose servers or cameras behind a NAT. Edit `mediamtx.yml` and replace everything inside section `paths` with the following content:

```yml
paths:
"~^proxy_(.+)$":
# If path name is a regular expression, $G1, G2, etc will be replaced
# with regular expression groups.
source: rtsp://other-server:8554/$G1
sourceOnDemand: yes
```

All requests addressed to `rtsp://server:8854/proxy_a` will be forwarded to `rtsp://other-server:8854/a` and so on.

### On-demand publishing

Edit `mediamtx.yml` and replace everything inside section `paths` with the following content:
Expand Down
33 changes: 5 additions & 28 deletions internal/conf/path.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,26 +235,23 @@ func (pconf *Path) check(conf *Conf, name string) error {

// General

if pconf.Source != "publisher" && pconf.Source != "redirect" &&
pconf.Regexp != nil && !pconf.SourceOnDemand {
return fmt.Errorf("a path with a regular expression (or path 'all') and a static source" +
" must have 'sourceOnDemand' set to true")
}
switch {
case pconf.Source == "publisher":

case strings.HasPrefix(pconf.Source, "rtsp://") ||
strings.HasPrefix(pconf.Source, "rtsps://"):
if pconf.Regexp != nil {
return fmt.Errorf("a path with a regular expression (or path 'all') cannot have a RTSP source. use another path")
}

_, err := base.ParseURL(pconf.Source)
if err != nil {
return fmt.Errorf("'%s' is not a valid URL", pconf.Source)
}

case strings.HasPrefix(pconf.Source, "rtmp://") ||
strings.HasPrefix(pconf.Source, "rtmps://"):
if pconf.Regexp != nil {
return fmt.Errorf("a path with a regular expression (or path 'all') cannot have a RTMP source. use another path")
}

u, err := gourl.Parse(pconf.Source)
if err != nil {
return fmt.Errorf("'%s' is not a valid URL", pconf.Source)
Expand All @@ -271,10 +268,6 @@ func (pconf *Path) check(conf *Conf, name string) error {

case strings.HasPrefix(pconf.Source, "http://") ||
strings.HasPrefix(pconf.Source, "https://"):
if pconf.Regexp != nil {
return fmt.Errorf("a path with a regular expression (or path 'all') cannot have a HLS source. use another path")
}

u, err := gourl.Parse(pconf.Source)
if err != nil {
return fmt.Errorf("'%s' is not a valid URL", pconf.Source)
Expand All @@ -293,19 +286,12 @@ func (pconf *Path) check(conf *Conf, name string) error {
}

case strings.HasPrefix(pconf.Source, "udp://"):
if pconf.Regexp != nil {
return fmt.Errorf("a path with a regular expression (or path 'all') cannot have a HLS source. use another path")
}

_, _, err := net.SplitHostPort(pconf.Source[len("udp://"):])
if err != nil {
return fmt.Errorf("'%s' is not a valid UDP URL", pconf.Source)
}

case strings.HasPrefix(pconf.Source, "srt://"):
if pconf.Regexp != nil {
return fmt.Errorf("a path with a regular expression (or path 'all') cannot have a SRT source. use another path")
}

_, err := gourl.Parse(pconf.Source)
if err != nil {
Expand All @@ -314,11 +300,6 @@ func (pconf *Path) check(conf *Conf, name string) error {

case strings.HasPrefix(pconf.Source, "whep://") ||
strings.HasPrefix(pconf.Source, "wheps://"):
if pconf.Regexp != nil {
return fmt.Errorf("a path with a regular expression (or path 'all') " +
"cannot have a WebRTC/WHEP source. use another path")
}

_, err := gourl.Parse(pconf.Source)
if err != nil {
return fmt.Errorf("'%s' is not a valid URL", pconf.Source)
Expand All @@ -327,10 +308,6 @@ func (pconf *Path) check(conf *Conf, name string) error {
case pconf.Source == "redirect":

case pconf.Source == "rpiCamera":
if pconf.Regexp != nil {
return fmt.Errorf(
"a path with a regular expression (or path 'all') cannot have 'rpiCamera' as source. use another path")
}

default:
return fmt.Errorf("invalid source: '%s'", pconf.Source)
Expand Down
22 changes: 16 additions & 6 deletions internal/core/path.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,12 +197,22 @@ func (pa *path) run() {
if pa.conf.Source == "redirect" {
pa.source = &sourceRedirect{}
} else if pa.conf.HasStaticSource() {
pa.source = newStaticSourceHandler(
pa.conf,
pa.readTimeout,
pa.writeTimeout,
pa.writeQueueSize,
pa)
resolvedSource := pa.conf.Source
if len(pa.matches) > 1 {
for i, ma := range pa.matches[1:] {
resolvedSource = strings.ReplaceAll(resolvedSource, "$G"+strconv.FormatInt(int64(i+1), 10), ma)
}
}

pa.source = &staticSourceHandler{
conf: pa.conf,
readTimeout: pa.readTimeout,
writeTimeout: pa.writeTimeout,
writeQueueSize: pa.writeQueueSize,
resolvedSource: resolvedSource,
parent: pa,
}
pa.source.(*staticSourceHandler).initialize()

if !pa.conf.SourceOnDemand {
pa.source.(*staticSourceHandler).start(false)
Expand Down
74 changes: 74 additions & 0 deletions internal/core/path_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,25 @@ func main() {
}
`

type testServer struct {
onDescribe func(*gortsplib.ServerHandlerOnDescribeCtx) (*base.Response, *gortsplib.ServerStream, error)
onSetup func(*gortsplib.ServerHandlerOnSetupCtx) (*base.Response, *gortsplib.ServerStream, error)
onPlay func(*gortsplib.ServerHandlerOnPlayCtx) (*base.Response, error)
}

func (sh *testServer) OnDescribe(ctx *gortsplib.ServerHandlerOnDescribeCtx,
) (*base.Response, *gortsplib.ServerStream, error) {
return sh.onDescribe(ctx)
}

func (sh *testServer) OnSetup(ctx *gortsplib.ServerHandlerOnSetupCtx) (*base.Response, *gortsplib.ServerStream, error) {
return sh.onSetup(ctx)
}

func (sh *testServer) OnPlay(ctx *gortsplib.ServerHandlerOnPlayCtx) (*base.Response, error) {
return sh.onPlay(ctx)
}

var _ defs.Path = &path{}

func TestPathRunOnDemand(t *testing.T) {
Expand Down Expand Up @@ -573,3 +592,58 @@ func TestPathFallback(t *testing.T) {
})
}
}

func TestPathSourceRegexp(t *testing.T) {
var stream *gortsplib.ServerStream

s := gortsplib.Server{
Handler: &testServer{
onDescribe: func(ctx *gortsplib.ServerHandlerOnDescribeCtx,
) (*base.Response, *gortsplib.ServerStream, error) {
require.Equal(t, "/a", ctx.Path)
return &base.Response{
StatusCode: base.StatusOK,
}, stream, nil
},
onSetup: func(ctx *gortsplib.ServerHandlerOnSetupCtx) (*base.Response, *gortsplib.ServerStream, error) {
return &base.Response{
StatusCode: base.StatusOK,
}, stream, nil
},
onPlay: func(ctx *gortsplib.ServerHandlerOnPlayCtx) (*base.Response, error) {
return &base.Response{
StatusCode: base.StatusOK,
}, nil
},
},
RTSPAddress: "127.0.0.1:8555",
}

err := s.Start()
require.NoError(t, err)
defer s.Close()

stream = gortsplib.NewServerStream(&s, &description.Session{Medias: []*description.Media{testMediaH264}})
defer stream.Close()

p, ok := newInstance(
"paths:\n" +
" '~^test_(.+)$':\n" +
" source: rtsp://127.0.0.1:8555/$G1\n" +
" sourceOnDemand: yes\n" +
" 'all':\n")
require.Equal(t, true, ok)
defer p.Close()

reader := gortsplib.Client{}

u, err := base.ParseURL("rtsp://127.0.0.1:8554/test_a")
require.NoError(t, err)

err = reader.Start(u.Scheme, u.Host)
require.NoError(t, err)
defer reader.Close()

_, _, err = reader.Describe(u)
require.NoError(t, err)
}
84 changes: 41 additions & 43 deletions internal/core/static_source_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,12 @@ type staticSourceHandlerParent interface {

// staticSourceHandler is a static source handler.
type staticSourceHandler struct {
conf *conf.Path
parent staticSourceHandlerParent
conf *conf.Path
readTimeout conf.StringDuration
writeTimeout conf.StringDuration
writeQueueSize int
resolvedSource string
parent staticSourceHandlerParent

ctx context.Context
ctxCancel func()
Expand All @@ -47,72 +51,66 @@ type staticSourceHandler struct {
done chan struct{}
}

func newStaticSourceHandler(
cnf *conf.Path,
readTimeout conf.StringDuration,
writeTimeout conf.StringDuration,
writeQueueSize int,
parent staticSourceHandlerParent,
) *staticSourceHandler {
s := &staticSourceHandler{
conf: cnf,
parent: parent,
chReloadConf: make(chan *conf.Path),
chInstanceSetReady: make(chan defs.PathSourceStaticSetReadyReq),
chInstanceSetNotReady: make(chan defs.PathSourceStaticSetNotReadyReq),
}
func (s *staticSourceHandler) initialize() {
s.chReloadConf = make(chan *conf.Path)
s.chInstanceSetReady = make(chan defs.PathSourceStaticSetReadyReq)
s.chInstanceSetNotReady = make(chan defs.PathSourceStaticSetNotReadyReq)

switch {
case strings.HasPrefix(cnf.Source, "rtsp://") ||
strings.HasPrefix(cnf.Source, "rtsps://"):
case strings.HasPrefix(s.resolvedSource, "rtsp://") ||
strings.HasPrefix(s.resolvedSource, "rtsps://"):
s.instance = &rtspsource.Source{
ReadTimeout: readTimeout,
WriteTimeout: writeTimeout,
WriteQueueSize: writeQueueSize,
ResolvedSource: s.resolvedSource,
ReadTimeout: s.readTimeout,
WriteTimeout: s.writeTimeout,
WriteQueueSize: s.writeQueueSize,
Parent: s,
}

case strings.HasPrefix(cnf.Source, "rtmp://") ||
strings.HasPrefix(cnf.Source, "rtmps://"):
case strings.HasPrefix(s.resolvedSource, "rtmp://") ||
strings.HasPrefix(s.resolvedSource, "rtmps://"):
s.instance = &rtmpsource.Source{
ReadTimeout: readTimeout,
WriteTimeout: writeTimeout,
Parent: s,
ResolvedSource: s.resolvedSource,
ReadTimeout: s.readTimeout,
WriteTimeout: s.writeTimeout,
Parent: s,
}

case strings.HasPrefix(cnf.Source, "http://") ||
strings.HasPrefix(cnf.Source, "https://"):
case strings.HasPrefix(s.resolvedSource, "http://") ||
strings.HasPrefix(s.resolvedSource, "https://"):
s.instance = &hlssource.Source{
ReadTimeout: readTimeout,
Parent: s,
ResolvedSource: s.resolvedSource,
ReadTimeout: s.readTimeout,
Parent: s,
}

case strings.HasPrefix(cnf.Source, "udp://"):
case strings.HasPrefix(s.resolvedSource, "udp://"):
s.instance = &udpsource.Source{
ReadTimeout: readTimeout,
Parent: s,
ResolvedSource: s.resolvedSource,
ReadTimeout: s.readTimeout,
Parent: s,
}

case strings.HasPrefix(cnf.Source, "srt://"):
case strings.HasPrefix(s.resolvedSource, "srt://"):
s.instance = &srtsource.Source{
ReadTimeout: readTimeout,
Parent: s,
ResolvedSource: s.resolvedSource,
ReadTimeout: s.readTimeout,
Parent: s,
}

case strings.HasPrefix(cnf.Source, "whep://") ||
strings.HasPrefix(cnf.Source, "wheps://"):
case strings.HasPrefix(s.resolvedSource, "whep://") ||
strings.HasPrefix(s.resolvedSource, "wheps://"):
s.instance = &webrtcsource.Source{
ReadTimeout: readTimeout,
Parent: s,
ResolvedSource: s.resolvedSource,
ReadTimeout: s.readTimeout,
Parent: s,
}

case cnf.Source == "rpiCamera":
case s.resolvedSource == "rpiCamera":
s.instance = &rpicamerasource.Source{
Parent: s,
}
}

return s
}

func (s *staticSourceHandler) close(reason string) {
Expand Down
7 changes: 4 additions & 3 deletions internal/staticsources/hls/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@ import (

// Source is a HLS static source.
type Source struct {
ReadTimeout conf.StringDuration
Parent defs.StaticSourceParent
ResolvedSource string
ReadTimeout conf.StringDuration
Parent defs.StaticSourceParent
}

// Log implements logger.Writer.
Expand All @@ -43,7 +44,7 @@ func (s *Source) Run(params defs.StaticSourceRunParams) error {

var c *gohlslib.Client
c = &gohlslib.Client{
URI: params.Conf.Source,
URI: s.ResolvedSource,
HTTPClient: &http.Client{
Timeout: time.Duration(s.ReadTimeout),
Transport: &http.Transport{
Expand Down
Loading

0 comments on commit d261bfe

Please sign in to comment.