diff --git a/.gitignore b/.gitignore index 4f062d69caf..ae196a030c9 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,2 @@ +/tmp /release diff --git a/Makefile b/Makefile index 42ac6187620..189b62f62d7 100644 --- a/Makefile +++ b/Makefile @@ -91,25 +91,27 @@ release-nodocker: $(eval export CGO_ENABLED=0) $(eval VERSION := $(shell git describe --tags)) $(eval GOBUILD := go build -ldflags '-X main.Version=$(VERSION)') + rm -rf tmp && mkdir tmp rm -rf release && mkdir release + cp conf.yml tmp/ - GOOS=windows GOARCH=amd64 $(GOBUILD) -o /tmp/rtsp-simple-server.exe - cd /tmp && zip -q $(PWD)/release/rtsp-simple-server_$(VERSION)_windows_amd64.zip rtsp-simple-server.exe + GOOS=windows GOARCH=amd64 $(GOBUILD) -o tmp/rtsp-simple-server.exe + cd tmp && zip -q $(PWD)/release/rtsp-simple-server_$(VERSION)_windows_amd64.zip rtsp-simple-server.exe conf.yml - GOOS=linux GOARCH=amd64 $(GOBUILD) -o /tmp/rtsp-simple-server - tar -C /tmp -czf $(PWD)/release/rtsp-simple-server_$(VERSION)_linux_amd64.tar.gz --owner=0 --group=0 rtsp-simple-server + GOOS=linux GOARCH=amd64 $(GOBUILD) -o tmp/rtsp-simple-server + tar -C tmp -czf $(PWD)/release/rtsp-simple-server_$(VERSION)_linux_amd64.tar.gz --owner=0 --group=0 rtsp-simple-server conf.yml - GOOS=linux GOARCH=arm GOARM=6 $(GOBUILD) -o /tmp/rtsp-simple-server - tar -C /tmp -czf $(PWD)/release/rtsp-simple-server_$(VERSION)_linux_arm6.tar.gz --owner=0 --group=0 rtsp-simple-server + GOOS=linux GOARCH=arm GOARM=6 $(GOBUILD) -o tmp/rtsp-simple-server + tar -C tmp -czf $(PWD)/release/rtsp-simple-server_$(VERSION)_linux_arm6.tar.gz --owner=0 --group=0 rtsp-simple-server conf.yml - GOOS=linux GOARCH=arm GOARM=7 $(GOBUILD) -o /tmp/rtsp-simple-server - tar -C /tmp -czf $(PWD)/release/rtsp-simple-server_$(VERSION)_linux_arm7.tar.gz --owner=0 --group=0 rtsp-simple-server + GOOS=linux GOARCH=arm GOARM=7 $(GOBUILD) -o tmp/rtsp-simple-server + tar -C tmp -czf $(PWD)/release/rtsp-simple-server_$(VERSION)_linux_arm7.tar.gz --owner=0 --group=0 rtsp-simple-server conf.yml - GOOS=linux GOARCH=arm64 $(GOBUILD) -o /tmp/rtsp-simple-server - tar -C /tmp -czf $(PWD)/release/rtsp-simple-server_$(VERSION)_linux_arm64.tar.gz --owner=0 --group=0 rtsp-simple-server + GOOS=linux GOARCH=arm64 $(GOBUILD) -o tmp/rtsp-simple-server + tar -C tmp -czf $(PWD)/release/rtsp-simple-server_$(VERSION)_linux_arm64.tar.gz --owner=0 --group=0 rtsp-simple-server conf.yml - GOOS=darwin GOARCH=amd64 $(GOBUILD) -o /tmp/rtsp-simple-server - tar -C /tmp -czf $(PWD)/release/rtsp-simple-server_$(VERSION)_darwin_amd64.tar.gz --owner=0 --group=0 rtsp-simple-server + GOOS=darwin GOARCH=amd64 $(GOBUILD) -o tmp/rtsp-simple-server + tar -C tmp -czf $(PWD)/release/rtsp-simple-server_$(VERSION)_darwin_amd64.tar.gz --owner=0 --group=0 rtsp-simple-server conf.yml define DOCKERFILE_IMAGE FROM --platform=linux/amd64 $(BASE_IMAGE) AS build @@ -125,7 +127,7 @@ RUN export CGO_ENABLED=0 $${OPTS} \ FROM scratch COPY --from=build /rtsp-simple-server /rtsp-simple-server -ENTRYPOINT [ "/rtsp-simple-server"] +ENTRYPOINT [ "/rtsp-simple-server" ] endef export DOCKERFILE_IMAGE diff --git a/README.md b/README.md index 23f271e1069..da9814a8217 100644 --- a/README.md +++ b/README.md @@ -3,12 +3,13 @@ [![Go Report Card](https://goreportcard.com/badge/github.com/aler9/rtsp-simple-server)](https://goreportcard.com/report/github.com/aler9/rtsp-simple-server) [![Build Status](https://travis-ci.org/aler9/rtsp-simple-server.svg?branch=master)](https://travis-ci.org/aler9/rtsp-simple-server) -[![Docker Hub](https://img.shields.io/badge/docker-aler9%2Frtsp--simple--proxy-blue)](https://hub.docker.com/r/aler9/rtsp-simple-server) +[![Docker Hub](https://img.shields.io/badge/docker-aler9%2Frtsp--simple--server-blue)](https://hub.docker.com/r/aler9/rtsp-simple-server) -_rtsp-simple-server_ is a simple, ready-to-use and zero-dependency RTSP server, a software that allows multiple users to publish and read live video and audio streams. RTSP is a standardized protocol that defines how to perform these operations with the help of a server, that is contacted by both readers and publishers in order to negotiate a streaming protocol. The server is then responsible of relaying the publisher streams to the readers. +_rtsp-simple-server_ is a simple, ready-to-use and zero-dependency RTSP server and RTSP proxy, a software that allows multiple users to publish and read live video and audio streams. RTSP is a standardized protocol that defines how to perform these operations with the help of a server, that is contacted by both readers and publishers in order to negotiate a streaming protocol. The server is then responsible of relaying the publisher streams to the readers. Features: * Read and publish streams via UDP and TCP +* Pull and serve streams from other RTSP servers (RTSP proxy) * Each stream can have multiple video and audio tracks, encoded in any format * Publish multiple streams at once, each in a separate path, that can be read by multiple users * Supports the RTP/RTCP streaming protocol @@ -66,50 +67,33 @@ docker run --rm -it -v $PWD/conf.yml:/conf.yml -p 8554:8554 aler9/rtsp-simple-se #### Full configuration file -To change the configuration, it's enough to create a file named `conf.yml` in the same folder of the executable. The default configuration is the following: +To change the configuration, it's enough to edit the file `conf.yml`, provided with the executable. The default configuration is [available here](conf.yml). + +#### Usage as an RTSP Proxy + +An RTSP proxy is usually deployed in one of these scenarios: +* when there are multiple users that are receiving a stream and the bandwidth is limited, so the proxy is used to receive the stream once. Users can then connect to the proxy instead of the original source. +* when there's a NAT / firewall between a stream and the users, in this case the proxy is installed in the NAT and makes the stream available to the outside world. + +Edit `conf.yml` and replace everything inside section `paths` with the following content: ```yaml -# supported stream protocols (the handshake is always performed with TCP) -protocols: [udp, tcp] -# port of the TCP rtsp listener -rtspPort: 8554 -# port of the UDP rtp listener -rtpPort: 8000 -# port of the UDP rtcp listener -rtcpPort: 8001 -# timeout of read operations -readTimeout: 5s -# timeout of write operations -writeTimeout: 5s -# script to run when a client connects -preScript: -# script to run when a client disconnects -postScript: -# enable pprof on port 9999 to monitor performance -pprof: false - -# these settings are path-dependent. The settings under the path 'all' are -# applied to all paths that do not match a specific entry. paths: - all: - # username required to publish - publishUser: - # password required to publish - publishPass: - # IPs or networks (x.x.x.x/24) allowed to publish - publishIps: [] - - # username required to read - readUser: - # password required to read - readPass: - # IPs or networks (x.x.x.x/24) allowed to read - readIps: [] + # insert one or more entries + proxied: + # url of the source stream, in the format rtsp://user:pass@host:port/path + source: rtsp://original-url +``` +Start the server: ``` +./rtsp-simple-server +``` + +Users can then connect to `rtsp://localhost:8554/proxied`, instead of connecting to the original url. #### Publisher authentication -Create a file named `conf.yml` in the same folder of the executable, with the following content: +Edit `conf.yml` and replace everything inside section `paths` with the following content: ```yaml paths: all: diff --git a/conf.yml b/conf.yml new file mode 100644 index 00000000000..3f9d6f8de02 --- /dev/null +++ b/conf.yml @@ -0,0 +1,44 @@ + +# supported stream protocols (the handshake is always performed with TCP) +protocols: [udp, tcp] +# port of the TCP rtsp listener +rtspPort: 8554 +# port of the UDP rtp listener +rtpPort: 8000 +# port of the UDP rtcp listener +rtcpPort: 8001 +# timeout of read operations +readTimeout: 5s +# timeout of write operations +writeTimeout: 5s +# script to run when a client connects +preScript: +# script to run when a client disconnects +postScript: +# enable pprof on port 9999 to monitor performance +pprof: false + +# these settings are path-dependent. The settings under the path 'all' are +# applied to all paths that do not match a specific entry. +paths: + all: + # source of the stream - this can be: + # * record -> the stream is provided by a client through the RECORD command (like ffmpeg) + # * rtsp://url -> the stream is pulled from another RTSP server + source: record + # if the source is an RTSP url, this is the protocol that will be used to pull the stream + sourceProtocol: udp + + # username required to publish + publishUser: + # password required to publish + publishPass: + # IPs or networks (x.x.x.x/24) allowed to publish + publishIps: [] + + # username required to read + readUser: + # password required to read + readPass: + # IPs or networks (x.x.x.x/24) allowed to read + readIps: [] diff --git a/main.go b/main.go index 11282822576..4e5fb433b0b 100644 --- a/main.go +++ b/main.go @@ -85,19 +85,17 @@ type programEventClientClose struct { func (programEventClientClose) isProgramEvent() {} -type programEventClientGetStreamSdp struct { +type programEventClientDescribe struct { path string res chan []byte } -func (programEventClientGetStreamSdp) isProgramEvent() {} +func (programEventClientDescribe) isProgramEvent() {} type programEventClientAnnounce struct { - res chan error - client *serverClient - path string - sdpText []byte - sdpParsed *sdp.Message + res chan error + client *serverClient + path string } func (programEventClientAnnounce) isProgramEvent() {} @@ -151,36 +149,59 @@ type programEventClientRecord struct { func (programEventClientRecord) isProgramEvent() {} -type programEventFrameUdp struct { +type programEventClientFrameUdp struct { trackFlowType trackFlowType addr *net.UDPAddr buf []byte } -func (programEventFrameUdp) isProgramEvent() {} +func (programEventClientFrameUdp) isProgramEvent() {} -type programEventFrameTcp struct { +type programEventClientFrameTcp struct { path string trackId int trackFlowType trackFlowType buf []byte } -func (programEventFrameTcp) isProgramEvent() {} +func (programEventClientFrameTcp) isProgramEvent() {} + +type programEventStreamerReady struct { + streamer *streamer +} + +func (programEventStreamerReady) isProgramEvent() {} + +type programEventStreamerNotReady struct { + streamer *streamer +} + +func (programEventStreamerNotReady) isProgramEvent() {} + +type programEventStreamerFrame struct { + streamer *streamer + trackId int + trackFlowType trackFlowType + buf []byte +} + +func (programEventStreamerFrame) isProgramEvent() {} type programEventTerminate struct{} func (programEventTerminate) isProgramEvent() {} type ConfPath struct { - PublishUser string `yaml:"publishUser"` - PublishPass string `yaml:"publishPass"` - PublishIps []string `yaml:"publishIps"` - publishIps []interface{} - ReadUser string `yaml:"readUser"` - ReadPass string `yaml:"readPass"` - ReadIps []string `yaml:"readIps"` - readIps []interface{} + Source string `yaml:"source"` + SourceProtocol string `yaml:"sourceProtocol"` + PublishUser string `yaml:"publishUser"` + PublishPass string `yaml:"publishPass"` + PublishIps []string `yaml:"publishIps"` + publishIps []interface{} + ReadUser string `yaml:"readUser"` + ReadPass string `yaml:"readPass"` + ReadIps []string `yaml:"readIps"` + readIps []interface{} } type conf struct { @@ -230,6 +251,13 @@ func loadConf(fpath string, stdin io.Reader) (*conf, error) { } } +// a publisher can be either a serverClient or a streamer +type publisher interface { + publisherIsReady() bool + publisherSdpText() []byte + publisherSdpParsed() *sdp.Message +} + type program struct { conf *conf protocols map[streamProtocol]struct{} @@ -237,7 +265,8 @@ type program struct { udplRtp *serverUdpListener udplRtcp *serverUdpListener clients map[*serverClient]struct{} - publishers map[string]*serverClient + streamers []*streamer + publishers map[string]publisher publisherCount int receiverCount int @@ -313,7 +342,20 @@ func newProgram(sargs []string, stdin io.Reader) (*program, error) { } } - for _, pconf := range conf.Paths { + p := &program{ + conf: conf, + protocols: protocols, + clients: make(map[*serverClient]struct{}), + publishers: make(map[string]publisher), + events: make(chan programEvent), + done: make(chan struct{}), + } + + for path, pconf := range conf.Paths { + if pconf.Source == "" { + pconf.Source = "record" + } + if pconf.PublishUser != "" { if !regexp.MustCompile("^[a-zA-Z0-9]+$").MatchString(pconf.PublishUser) { return nil, fmt.Errorf("publish username must be alphanumeric") @@ -349,15 +391,24 @@ func newProgram(sargs []string, stdin io.Reader) (*program, error) { if err != nil { return nil, err } - } - p := &program{ - conf: conf, - protocols: protocols, - clients: make(map[*serverClient]struct{}), - publishers: make(map[string]*serverClient), - events: make(chan programEvent), - done: make(chan struct{}), + if pconf.Source != "record" { + if path == "all" { + return nil, fmt.Errorf("path 'all' cannot have a RTSP source") + } + + if pconf.SourceProtocol == "" { + pconf.SourceProtocol = "udp" + } + + s, err := newStreamer(p, path, pconf.Source, pconf.SourceProtocol) + if err != nil { + return nil, err + } + + p.streamers = append(p.streamers, s) + p.publishers[path] = s + } } p.log("rtsp-simple-server %s", Version) @@ -392,6 +443,9 @@ func newProgram(sargs []string, stdin io.Reader) (*program, error) { go p.udplRtp.run() go p.udplRtcp.run() go p.tcpl.run() + for _, s := range p.streamers { + go s.run() + } go p.run() return p, nil @@ -424,11 +478,13 @@ outer: if pub, ok := p.publishers[evt.client.path]; ok && pub == evt.client { delete(p.publishers, evt.client.path) - // if the publisher has disconnected - // close all other connections that share the same path - for oc := range p.clients { - if oc.path == evt.client.path { - go oc.close() + // if the publisher has disconnected and was ready + // close all other clients that share the same path + if pub.publisherIsReady() { + for oc := range p.clients { + if oc.path == evt.client.path { + go oc.close() + } } } } @@ -445,36 +501,37 @@ outer: evt.client.log("disconnected") close(evt.done) - case programEventClientGetStreamSdp: + case programEventClientDescribe: pub, ok := p.publishers[evt.path] - if !ok { + if !ok || !pub.publisherIsReady() { evt.res <- nil continue } - evt.res <- pub.streamSdpText + + evt.res <- pub.publisherSdpText() case programEventClientAnnounce: _, ok := p.publishers[evt.path] if ok { - evt.res <- fmt.Errorf("another client is already publishing on path '%s'", evt.path) + evt.res <- fmt.Errorf("someone is already publishing on path '%s'", evt.path) continue } evt.client.path = evt.path - evt.client.streamSdpText = evt.sdpText - evt.client.streamSdpParsed = evt.sdpParsed evt.client.state = _CLIENT_STATE_ANNOUNCE p.publishers[evt.path] = evt.client evt.res <- nil case programEventClientSetupPlay: pub, ok := p.publishers[evt.path] - if !ok { + if !ok || !pub.publisherIsReady() { evt.res <- fmt.Errorf("no one is streaming on path '%s'", evt.path) continue } - if len(evt.client.streamTracks) >= len(pub.streamSdpParsed.Medias) { + sdpParsed := pub.publisherSdpParsed() + + if len(evt.client.streamTracks) >= len(sdpParsed.Medias) { evt.res <- fmt.Errorf("all the tracks have already been setup") continue } @@ -499,12 +556,14 @@ outer: case programEventClientPlay1: pub, ok := p.publishers[evt.client.path] - if !ok { + if !ok || !pub.publisherIsReady() { evt.res <- fmt.Errorf("no one is streaming on path '%s'", evt.client.path) continue } - if len(evt.client.streamTracks) != len(pub.streamSdpParsed.Medias) { + sdpParsed := pub.publisherSdpParsed() + + if len(evt.client.streamTracks) != len(sdpParsed.Medias) { evt.res <- fmt.Errorf("not all tracks have been setup") continue } @@ -526,40 +585,65 @@ outer: evt.client.state = _CLIENT_STATE_RECORD evt.res <- nil - case programEventFrameUdp: + case programEventClientFrameUdp: // find publisher and track id from ip and port - pub, trackId := func() (*serverClient, int) { + cl, trackId := func() (*serverClient, int) { for _, pub := range p.publishers { - if pub.streamProtocol != _STREAM_PROTOCOL_UDP || - pub.state != _CLIENT_STATE_RECORD || - !pub.ip().Equal(evt.addr.IP) { + cl, ok := pub.(*serverClient) + if !ok { continue } - for i, t := range pub.streamTracks { + if cl.streamProtocol != _STREAM_PROTOCOL_UDP || + cl.state != _CLIENT_STATE_RECORD || + !cl.ip().Equal(evt.addr.IP) { + continue + } + + for i, t := range cl.streamTracks { if evt.trackFlowType == _TRACK_FLOW_RTP { if t.rtpPort == evt.addr.Port { - return pub, i + return cl, i } } else { if t.rtcpPort == evt.addr.Port { - return pub, i + return cl, i } } } } return nil, -1 }() - if pub == nil { + if cl == nil { continue } - pub.udpLastFrameTime = time.Now() - p.forwardTrack(pub.path, trackId, evt.trackFlowType, evt.buf) + cl.udpLastFrameTime = time.Now() + p.forwardTrack(cl.path, trackId, evt.trackFlowType, evt.buf) - case programEventFrameTcp: + case programEventClientFrameTcp: p.forwardTrack(evt.path, evt.trackId, evt.trackFlowType, evt.buf) + case programEventStreamerReady: + evt.streamer.ready = true + p.publisherCount += 1 + evt.streamer.log("ready") + + case programEventStreamerNotReady: + evt.streamer.ready = false + p.publisherCount -= 1 + evt.streamer.log("not ready") + + // close all clients that share the same path + for oc := range p.clients { + if oc.path == evt.streamer.path { + go oc.close() + } + } + + case programEventStreamerFrame: + p.forwardTrack(evt.streamer.path, evt.trackId, evt.trackFlowType, evt.buf) + case programEventTerminate: break outer } @@ -571,7 +655,7 @@ outer: case programEventClientClose: close(evt.done) - case programEventClientGetStreamSdp: + case programEventClientDescribe: evt.res <- nil case programEventClientAnnounce: @@ -598,6 +682,10 @@ outer: } }() + for _, s := range p.streamers { + s.close() + } + p.tcpl.close() p.udplRtcp.close() p.udplRtp.close() diff --git a/main_test.go b/main_test.go index af585fc7188..80f916ce117 100644 --- a/main_test.go +++ b/main_test.go @@ -229,3 +229,70 @@ func TestReadAuth(t *testing.T) { require.Equal(t, "all right\n", string(cnt2.stdout.Bytes())) } + +func TestProxy(t *testing.T) { + for _, proto := range []string{ + "udp", + "tcp", + } { + t.Run(proto, func(t *testing.T) { + stdin := []byte("\n" + + "paths:\n" + + " all:\n" + + " readUser: testuser\n" + + " readPass: testpass\n") + p1, err := newProgram([]string{"stdin"}, bytes.NewBuffer(stdin)) + require.NoError(t, err) + defer p1.close() + + time.Sleep(1 * time.Second) + + cnt1, err := newContainer("ffmpeg", "source", []string{ + "-hide_banner", + "-loglevel", "panic", + "-re", + "-stream_loop", "-1", + "-i", "/emptyvideo.ts", + "-c", "copy", + "-f", "rtsp", + "-rtsp_transport", "udp", + "rtsp://" + ownDockerIp + ":8554/teststream", + }) + require.NoError(t, err) + defer cnt1.close() + + time.Sleep(1 * time.Second) + + stdin = []byte("\n" + + "rtspPort: 8555\n" + + "rtpPort: 8100\n" + + "rtcpPort: 8101\n" + + "\n" + + "paths:\n" + + " proxied:\n" + + " source: rtsp://testuser:testpass@localhost:8554/teststream\n" + + " sourceProtocol: " + proto + "\n") + p2, err := newProgram([]string{"stdin"}, bytes.NewBuffer(stdin)) + require.NoError(t, err) + defer p2.close() + + time.Sleep(1 * time.Second) + + cnt2, err := newContainer("ffmpeg", "dest", []string{ + "-hide_banner", + "-loglevel", "panic", + "-rtsp_transport", "udp", + "-i", "rtsp://" + ownDockerIp + ":8555/proxied", + "-vframes", "1", + "-f", "image2", + "-y", "/dev/null", + }) + require.NoError(t, err) + defer cnt2.close() + + cnt2.wait() + + require.Equal(t, "all right\n", string(cnt2.stdout.Bytes())) + }) + } +} diff --git a/server-client.go b/server-client.go index fadb27f1a76..35aae0ba648 100644 --- a/server-client.go +++ b/server-client.go @@ -123,6 +123,18 @@ func (c *serverClient) zone() string { return c.conn.NetConn().RemoteAddr().(*net.TCPAddr).Zone } +func (c *serverClient) publisherIsReady() bool { + return c.state == _CLIENT_STATE_RECORD +} + +func (c *serverClient) publisherSdpText() []byte { + return c.streamSdpText +} + +func (c *serverClient) publisherSdpParsed() *sdp.Message { + return c.streamSdpParsed +} + func (c *serverClient) run() { if c.p.conf.PreScript != "" { preScript := exec.Command(c.p.conf.PreScript) @@ -367,7 +379,7 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) bool { } res := make(chan []byte) - c.p.events <- programEventClientGetStreamSdp{path, res} + c.p.events <- programEventClientDescribe{path, res} sdp := <-res if sdp == nil { c.writeResError(req, gortsplib.StatusBadRequest, fmt.Errorf("no one is streaming on path '%s'", path)) @@ -431,13 +443,16 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) bool { } res := make(chan error) - c.p.events <- programEventClientAnnounce{res, c, path, req.Content, sdpParsed} + c.p.events <- programEventClientAnnounce{res, c, path} err = <-res if err != nil { c.writeResError(req, gortsplib.StatusBadRequest, err) return false } + c.streamSdpText = req.Content + c.streamSdpParsed = sdpParsed + c.conn.WriteResponse(&gortsplib.Response{ StatusCode: gortsplib.StatusOK, Header: gortsplib.Header{ @@ -869,7 +884,7 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) bool { return false } - c.p.events <- programEventFrameTcp{ + c.p.events <- programEventClientFrameTcp{ c.path, trackId, trackFlowType, diff --git a/server-udpl.go b/server-udpl.go index aa4af5df9ed..71ca5c13c2e 100644 --- a/server-udpl.go +++ b/server-udpl.go @@ -81,7 +81,7 @@ func (l *serverUdpListener) run() { break } - l.p.events <- programEventFrameUdp{ + l.p.events <- programEventClientFrameUdp{ l.trackFlowType, addr, buf[:n], diff --git a/streamer-udpl.go b/streamer-udpl.go new file mode 100644 index 00000000000..75ecefbc395 --- /dev/null +++ b/streamer-udpl.go @@ -0,0 +1,95 @@ +package main + +import ( + "net" + "time" +) + +type streamerUdpListenerState int + +const ( + _UDPL_STATE_STARTING streamerUdpListenerState = iota + _UDPL_STATE_RUNNING +) + +type streamerUdpListener struct { + p *program + streamer *streamer + trackId int + trackFlowType trackFlowType + publisherIp net.IP + publisherPort int + nconn *net.UDPConn + running bool + readBuf1 []byte + readBuf2 []byte + readCurBuf bool + lastFrameTime time.Time + + done chan struct{} +} + +func newStreamerUdpListener(p *program, port int, streamer *streamer, + trackId int, trackFlowType trackFlowType, publisherIp net.IP) (*streamerUdpListener, error) { + nconn, err := net.ListenUDP("udp", &net.UDPAddr{ + Port: port, + }) + if err != nil { + return nil, err + } + + l := &streamerUdpListener{ + p: p, + streamer: streamer, + trackId: trackId, + trackFlowType: trackFlowType, + publisherIp: publisherIp, + nconn: nconn, + readBuf1: make([]byte, 2048), + readBuf2: make([]byte, 2048), + lastFrameTime: time.Now(), + done: make(chan struct{}), + } + + return l, nil +} + +func (l *streamerUdpListener) close() { + l.nconn.Close() + + if l.running { + <-l.done + } +} + +func (l *streamerUdpListener) start() { + l.running = true + go l.run() +} + +func (l *streamerUdpListener) run() { + for { + var buf []byte + if !l.readCurBuf { + buf = l.readBuf1 + } else { + buf = l.readBuf2 + } + l.readCurBuf = !l.readCurBuf + + n, addr, err := l.nconn.ReadFromUDP(buf) + if err != nil { + break + } + + if !l.publisherIp.Equal(addr.IP) || addr.Port != l.publisherPort { + continue + } + + l.lastFrameTime = time.Now() + + l.p.events <- programEventStreamerFrame{l.streamer, l.trackId, l.trackFlowType, buf[:n]} + } + + close(l.done) +} diff --git a/streamer.go b/streamer.go new file mode 100644 index 00000000000..a1a81aaea3e --- /dev/null +++ b/streamer.go @@ -0,0 +1,618 @@ +package main + +import ( + "fmt" + "math/rand" + "net" + "net/url" + "strconv" + "strings" + "time" + + "github.com/aler9/gortsplib" + "gortc.io/sdp" +) + +const ( + _DIAL_TIMEOUT = 10 * time.Second + _RETRY_INTERVAL = 5 * time.Second + _CHECK_STREAM_INTERVAL = 6 * time.Second + _STREAM_DEAD_AFTER = 5 * time.Second + _KEEPALIVE_INTERVAL = 60 * time.Second +) + +type streamerUdpListenerPair struct { + udplRtp *streamerUdpListener + udplRtcp *streamerUdpListener +} + +type streamer struct { + p *program + path string + ur *url.URL + proto streamProtocol + ready bool + clientSdpParsed *sdp.Message + serverSdpText []byte + serverSdpParsed *sdp.Message + firstTime bool + readBuf1 []byte + readBuf2 []byte + readCurBuf bool + + terminate chan struct{} + done chan struct{} +} + +func newStreamer(p *program, path string, source string, sourceProtocol string) (*streamer, error) { + ur, err := url.Parse(source) + if err != nil { + return nil, fmt.Errorf("'%s' is not a valid source not an RTSP url", source) + } + if ur.Scheme != "rtsp" { + return nil, fmt.Errorf("'%s' is not a valid RTSP url", source) + } + + if ur.User != nil { + pass, _ := ur.User.Password() + user := ur.User.Username() + if user != "" && pass == "" || + user == "" && pass != "" { + fmt.Errorf("username and password must be both provided") + } + } + + proto, err := func() (streamProtocol, error) { + switch sourceProtocol { + case "udp": + return _STREAM_PROTOCOL_UDP, nil + + case "tcp": + return _STREAM_PROTOCOL_TCP, nil + } + return streamProtocol(0), fmt.Errorf("unsupported protocol '%s'", sourceProtocol) + }() + if err != nil { + return nil, err + } + + s := &streamer{ + p: p, + path: path, + ur: ur, + proto: proto, + firstTime: true, + readBuf1: make([]byte, 0, 512*1024), + readBuf2: make([]byte, 0, 512*1024), + terminate: make(chan struct{}), + done: make(chan struct{}), + } + + return s, nil +} + +func (s *streamer) log(format string, args ...interface{}) { + s.p.log("[streamer "+s.path+"] "+format, args...) +} + +func (s *streamer) publisherIsReady() bool { + return s.ready +} + +func (s *streamer) publisherSdpText() []byte { + return s.serverSdpText +} + +func (s *streamer) publisherSdpParsed() *sdp.Message { + return s.serverSdpParsed +} + +func (s *streamer) run() { + for { + ok := s.do() + if !ok { + break + } + } + + close(s.done) +} + +func (s *streamer) do() bool { + if s.firstTime { + s.firstTime = false + } else { + t := time.NewTimer(_RETRY_INTERVAL) + select { + case <-s.terminate: + return false + case <-t.C: + } + } + + s.log("initializing with protocol %s", s.proto) + + var nconn net.Conn + var err error + dialDone := make(chan struct{}) + go func() { + nconn, err = net.DialTimeout("tcp", s.ur.Host, _DIAL_TIMEOUT) + close(dialDone) + }() + + select { + case <-s.terminate: + return false + case <-dialDone: + } + + if err != nil { + s.log("ERR: %s", err) + return true + } + defer nconn.Close() + + conn, err := gortsplib.NewConnClient(gortsplib.ConnClientConf{ + NConn: nconn, + Username: func() string { + if s.ur.User != nil { + return s.ur.User.Username() + } + return "" + }(), + Password: func() string { + if s.ur.User != nil { + pass, _ := s.ur.User.Password() + return pass + } + return "" + }(), + ReadTimeout: s.p.conf.ReadTimeout, + WriteTimeout: s.p.conf.WriteTimeout, + }) + if err != nil { + s.log("ERR: %s", err) + return true + } + + res, err := conn.WriteRequest(&gortsplib.Request{ + Method: gortsplib.OPTIONS, + Url: &url.URL{ + Scheme: "rtsp", + Host: s.ur.Host, + Path: "/", + }, + }) + if err != nil { + s.log("ERR: %s", err) + return true + } + + // OPTIONS is not available in some cameras + if res.StatusCode != gortsplib.StatusOK && res.StatusCode != gortsplib.StatusNotFound { + s.log("ERR: OPTIONS returned code %d (%s)", res.StatusCode, res.StatusMessage) + return true + } + + res, err = conn.WriteRequest(&gortsplib.Request{ + Method: gortsplib.DESCRIBE, + Url: &url.URL{ + Scheme: "rtsp", + Host: s.ur.Host, + Path: s.ur.Path, + RawQuery: s.ur.RawQuery, + }, + }) + if err != nil { + s.log("ERR: %s", err) + return true + } + + if res.StatusCode != gortsplib.StatusOK { + s.log("ERR: DESCRIBE returned code %d (%s)", res.StatusCode, res.StatusMessage) + return true + } + + contentType, ok := res.Header["Content-Type"] + if !ok || len(contentType) != 1 { + s.log("ERR: Content-Type not provided") + return true + } + + if contentType[0] != "application/sdp" { + s.log("ERR: wrong Content-Type, expected application/sdp") + return true + } + + clientSdpParsed, err := gortsplib.SDPParse(res.Content) + if err != nil { + s.log("ERR: invalid SDP: %s", err) + return true + } + + // create a filtered SDP that is used by the server (not by the client) + serverSdpParsed, serverSdpText := gortsplib.SDPFilter(clientSdpParsed, res.Content) + + s.clientSdpParsed = clientSdpParsed + s.serverSdpText = serverSdpText + s.serverSdpParsed = serverSdpParsed + + if s.proto == _STREAM_PROTOCOL_UDP { + return s.runUdp(conn) + } else { + return s.runTcp(conn) + } +} + +func (s *streamer) runUdp(conn *gortsplib.ConnClient) bool { + publisherIp := conn.NetConn().RemoteAddr().(*net.TCPAddr).IP + + var streamerUdpListenerPairs []streamerUdpListenerPair + + defer func() { + for _, pair := range streamerUdpListenerPairs { + pair.udplRtp.close() + pair.udplRtcp.close() + } + }() + + for i, media := range s.clientSdpParsed.Medias { + var rtpPort int + var rtcpPort int + var udplRtp *streamerUdpListener + var udplRtcp *streamerUdpListener + func() { + for { + // choose two consecutive ports in range 65536-10000 + // rtp must be pair and rtcp odd + rtpPort = (rand.Intn((65535-10000)/2) * 2) + 10000 + rtcpPort = rtpPort + 1 + + var err error + udplRtp, err = newStreamerUdpListener(s.p, rtpPort, s, i, + _TRACK_FLOW_RTP, publisherIp) + if err != nil { + continue + } + + udplRtcp, err = newStreamerUdpListener(s.p, rtcpPort, s, i, + _TRACK_FLOW_RTCP, publisherIp) + if err != nil { + udplRtp.close() + continue + } + + return + } + }() + + res, err := conn.WriteRequest(&gortsplib.Request{ + Method: gortsplib.SETUP, + Url: func() *url.URL { + control := media.Attributes.Value("control") + + // no control attribute + if control == "" { + return s.ur + } + + // absolute path + if strings.HasPrefix(control, "rtsp://") { + ur, err := url.Parse(control) + if err != nil { + return s.ur + } + return ur + } + + // relative path + return &url.URL{ + Scheme: "rtsp", + Host: s.ur.Host, + Path: func() string { + ret := s.ur.Path + + if len(ret) == 0 || ret[len(ret)-1] != '/' { + ret += "/" + } + + control := media.Attributes.Value("control") + if control != "" { + ret += control + } else { + ret += "trackID=" + strconv.FormatInt(int64(i+1), 10) + } + + return ret + }(), + RawQuery: s.ur.RawQuery, + } + }(), + Header: gortsplib.Header{ + "Transport": []string{strings.Join([]string{ + "RTP/AVP/UDP", + "unicast", + fmt.Sprintf("client_port=%d-%d", rtpPort, rtcpPort), + }, ";")}, + }, + }) + if err != nil { + s.log("ERR: %s", err) + udplRtp.close() + udplRtcp.close() + return true + } + + if res.StatusCode != gortsplib.StatusOK { + s.log("ERR: SETUP returned code %d (%s)", res.StatusCode, res.StatusMessage) + udplRtp.close() + udplRtcp.close() + return true + } + + tsRaw, ok := res.Header["Transport"] + if !ok || len(tsRaw) != 1 { + s.log("ERR: transport header not provided") + udplRtp.close() + udplRtcp.close() + return true + } + + th := gortsplib.ReadHeaderTransport(tsRaw[0]) + rtpServerPort, rtcpServerPort := th.GetPorts("server_port") + if rtpServerPort == 0 { + s.log("ERR: server ports not provided") + udplRtp.close() + udplRtcp.close() + return true + } + + udplRtp.publisherPort = rtpServerPort + udplRtcp.publisherPort = rtcpServerPort + + streamerUdpListenerPairs = append(streamerUdpListenerPairs, streamerUdpListenerPair{ + udplRtp: udplRtp, + udplRtcp: udplRtcp, + }) + } + + res, err := conn.WriteRequest(&gortsplib.Request{ + Method: gortsplib.PLAY, + Url: &url.URL{ + Scheme: "rtsp", + Host: s.ur.Host, + Path: s.ur.Path, + RawQuery: s.ur.RawQuery, + }, + }) + if err != nil { + s.log("ERR: %s", err) + return true + } + + if res.StatusCode != gortsplib.StatusOK { + s.log("ERR: PLAY returned code %d (%s)", res.StatusCode, res.StatusMessage) + return true + } + + for _, pair := range streamerUdpListenerPairs { + pair.udplRtp.start() + pair.udplRtcp.start() + } + + tickerSendKeepalive := time.NewTicker(_KEEPALIVE_INTERVAL) + defer tickerSendKeepalive.Stop() + + tickerCheckStream := time.NewTicker(_CHECK_STREAM_INTERVAL) + defer tickerSendKeepalive.Stop() + + s.p.events <- programEventStreamerReady{s} + + defer func() { + s.p.events <- programEventStreamerNotReady{s} + }() + + for { + select { + case <-s.terminate: + return false + + case <-tickerSendKeepalive.C: + _, err = conn.WriteRequest(&gortsplib.Request{ + Method: gortsplib.OPTIONS, + Url: &url.URL{ + Scheme: "rtsp", + Host: s.ur.Host, + Path: "/", + }, + }) + if err != nil { + s.log("ERR: %s", err) + return true + } + + case <-tickerCheckStream.C: + lastFrameTime := time.Time{} + + for _, pair := range streamerUdpListenerPairs { + lft := pair.udplRtp.lastFrameTime + if lft.After(lastFrameTime) { + lastFrameTime = lft + } + + lft = pair.udplRtp.lastFrameTime + if lft.After(lastFrameTime) { + lastFrameTime = lft + } + } + + if time.Since(lastFrameTime) >= _STREAM_DEAD_AFTER { + s.log("ERR: stream is dead") + return true + } + } + } +} + +func (s *streamer) runTcp(conn *gortsplib.ConnClient) bool { + for i, media := range s.clientSdpParsed.Medias { + interleaved := fmt.Sprintf("interleaved=%d-%d", (i * 2), (i*2)+1) + + res, err := conn.WriteRequest(&gortsplib.Request{ + Method: gortsplib.SETUP, + Url: func() *url.URL { + control := media.Attributes.Value("control") + + // no control attribute + if control == "" { + return s.ur + } + + // absolute path + if strings.HasPrefix(control, "rtsp://") { + ur, err := url.Parse(control) + if err != nil { + return s.ur + } + return ur + } + + // relative path + return &url.URL{ + Scheme: "rtsp", + Host: s.ur.Host, + Path: func() string { + ret := s.ur.Path + + if len(ret) == 0 || ret[len(ret)-1] != '/' { + ret += "/" + } + + control := media.Attributes.Value("control") + if control != "" { + ret += control + } else { + ret += "trackID=" + strconv.FormatInt(int64(i+1), 10) + } + + return ret + }(), + RawQuery: s.ur.RawQuery, + } + }(), + Header: gortsplib.Header{ + "Transport": []string{strings.Join([]string{ + "RTP/AVP/TCP", + "unicast", + interleaved, + }, ";")}, + }, + }) + if err != nil { + s.log("ERR: %s", err) + return true + } + + if res.StatusCode != gortsplib.StatusOK { + s.log("ERR: SETUP returned code %d (%s)", res.StatusCode, res.StatusMessage) + return true + } + + tsRaw, ok := res.Header["Transport"] + if !ok || len(tsRaw) != 1 { + s.log("ERR: transport header not provided") + return true + } + + th := gortsplib.ReadHeaderTransport(tsRaw[0]) + + _, ok = th[interleaved] + if !ok { + s.log("ERR: transport header does not have %s (%s)", interleaved, tsRaw[0]) + return true + } + } + + err := conn.WriteRequestNoResponse(&gortsplib.Request{ + Method: gortsplib.PLAY, + Url: &url.URL{ + Scheme: "rtsp", + Host: s.ur.Host, + Path: s.ur.Path, + RawQuery: s.ur.RawQuery, + }, + }) + if err != nil { + s.log("ERR: %s", err) + return true + } + + frame := &gortsplib.InterleavedFrame{} + +outer: + for { + if !s.readCurBuf { + frame.Content = s.readBuf1 + } else { + frame.Content = s.readBuf2 + } + + frame.Content = frame.Content[:cap(frame.Content)] + s.readCurBuf = !s.readCurBuf + + vres, err := conn.ReadInterleavedFrameOrResponse(frame) + if err != nil { + s.log("ERR: %s", err) + return true + } + + switch res := vres.(type) { + case *gortsplib.Response: + if res.StatusCode != gortsplib.StatusOK { + s.log("ERR: PLAY returned code %d (%s)", res.StatusCode, res.StatusMessage) + return true + } + break outer + + case *gortsplib.InterleavedFrame: + // ignore the frames sent before the response + } + } + + s.p.events <- programEventStreamerReady{s} + + defer func() { + s.p.events <- programEventStreamerNotReady{s} + }() + + chanConnError := make(chan struct{}) + go func() { + for { + frame := &gortsplib.InterleavedFrame{ + Content: make([]byte, 512*1024), + } + err := conn.ReadInterleavedFrame(frame) + if err != nil { + s.log("ERR: %s", err) + close(chanConnError) + break + } + + trackId, trackFlowType := interleavedChannelToTrack(frame.Channel) + + s.p.events <- programEventStreamerFrame{s, trackId, trackFlowType, frame.Content} + } + }() + + select { + case <-s.terminate: + return false + case <-chanConnError: + return true + } +} + +func (s *streamer) close() { + close(s.terminate) + <-s.done +}