From 9728ea823d953a6ef98dcc81515ccd60f8ea0dfa Mon Sep 17 00:00:00 2001 From: aler9 <46489434+aler9@users.noreply.github.com> Date: Tue, 31 Dec 2019 14:55:46 +0100 Subject: [PATCH] support multiple paths --- README.md | 8 +-- client.go | 141 +++++++++++++++++++++++++++++-------------- main.go | 33 +++++----- rtsp/request.go | 8 +-- rtsp/request_test.go | 8 +-- rtsplistener.go | 2 +- udplistener.go | 42 ++++++------- 7 files changed, 145 insertions(+), 97 deletions(-) diff --git a/README.md b/README.md index 1ca266721d0..693bfb95700 100644 --- a/README.md +++ b/README.md @@ -10,7 +10,7 @@ This software was developed with the aim of simulating a live camera feed for de Features: * Supports reading and publishing streams via UDP and TCP -* Supports publishing one stream at once, that can be read by multiple users +* Supports publishing multiple streams at once, each in a separate path, that can be read by multiple users * Supports multiple video and audio tracks for each stream * Supports the RTP/RTCP streaming protocol @@ -33,17 +33,17 @@ Precompiled binaries are available in the [release](https://github.com/aler9/rts 2. In another terminal, publish something with FFmpeg (in this example it's a video file, but it can be anything you want): ``` - ffmpeg -re -stream_loop -1 -i file.ts -c copy -f rtsp rtsp://localhost:8554/ + ffmpeg -re -stream_loop -1 -i file.ts -c copy -f rtsp rtsp://localhost:8554/mystream ``` 3. Open the stream with VLC: ``` - vlc rtsp://localhost:8554/ + vlc rtsp://localhost:8554/mystream ``` you can alternatively use GStreamer: ``` - gst-launch-1.0 -v rtspsrc location=rtsp://localhost:8554/ ! rtph264depay ! decodebin ! autovideosink + gst-launch-1.0 -v rtspsrc location=rtsp://localhost:8554/mystream ! rtph264depay ! decodebin ! autovideosink ```
diff --git a/client.go b/client.go index 7056f8fa2af..5f62ee7c391 100644 --- a/client.go +++ b/client.go @@ -21,14 +21,14 @@ var ( errRecord = errors.New("record") ) -func interleavedChannelToTrack(channel int) (trackFlow, int) { +func interleavedChannelToTrack(channel int) (int, trackFlow) { if (channel % 2) == 0 { - return _TRACK_FLOW_RTP, (channel / 2) + return (channel / 2), _TRACK_FLOW_RTP } - return _TRACK_FLOW_RTCP, ((channel - 1) / 2) + return ((channel - 1) / 2), _TRACK_FLOW_RTCP } -func trackToInterleavedChannel(flow trackFlow, id int) int { +func trackToInterleavedChannel(id int, flow trackFlow) int { if flow == _TRACK_FLOW_RTP { return id * 2 } @@ -84,13 +84,14 @@ type client struct { rconn *rtsp.Conn state string ip net.IP + path string streamSdpText []byte // filled only if publisher streamSdpParsed *sdp.Message // filled only if publisher streamProtocol streamProtocol streamTracks []*track } -func newRtspClient(p *program, nconn net.Conn) *client { +func newClient(p *program, nconn net.Conn) *client { c := &client{ p: p, rconn: rtsp.NewConn(nconn), @@ -113,16 +114,19 @@ func (c *client) close() error { delete(c.p.clients, c) c.rconn.Close() - if c.p.publisher == c { - c.p.publisher = nil + if c.path != "" { + if pub, ok := c.p.publishers[c.path]; ok && pub == c { + delete(c.p.publishers, c.path) - // if the publisher has disconnected - // close all other connections - for oc := range c.p.clients { - oc.close() + // if the publisher has disconnected + // close all other connections that share the same path + for oc := range c.p.clients { + if oc.path == c.path { + oc.close() + } + } } } - return nil } @@ -181,7 +185,7 @@ func (c *client) run() { return } - c.log("is receiving %d %s via %s", len(c.streamTracks), func() string { + c.log("is receiving on path %s, %d %s via %s", c.path, len(c.streamTracks), func() string { if len(c.streamTracks) == 1 { return "track" } @@ -219,7 +223,7 @@ func (c *client) run() { c.state = "RECORD" c.p.mutex.Unlock() - c.log("is publishing %d %s via %s", len(c.streamTracks), func() string { + c.log("is publishing on path %s, %d %s via %s", c.path, len(c.streamTracks), func() string { if len(c.streamTracks) == 1 { return "track" } @@ -241,7 +245,7 @@ func (c *client) run() { return } - trackFlow, trackId := interleavedChannelToTrack(channel) + trackId, trackFlow := interleavedChannelToTrack(channel) if trackId >= len(c.streamTracks) { c.log("ERR: invalid track id '%d'", trackId) @@ -249,7 +253,7 @@ func (c *client) run() { } c.p.mutex.RLock() - c.p.forwardTrack(trackFlow, trackId, buf[:n]) + c.p.forwardTrack(c.path, trackId, trackFlow, buf[:n]) c.p.mutex.RUnlock() } } @@ -283,10 +287,29 @@ func (c *client) handleRequest(req *rtsp.Request) (*rtsp.Response, error) { return nil, fmt.Errorf("cseq missing") } - ur, err := url.Parse(req.Path) - if err != nil { - return nil, fmt.Errorf("unable to parse path '%s'", req.Path) - } + path, err := func() (string, error) { + ur, err := url.Parse(req.Url) + if err != nil { + return "", fmt.Errorf("unable to parse path '%s'", req.Url) + } + path := ur.Path + + // remove leading slash + if len(path) > 1 { + path = path[1:] + } + + // strip any subpath + if n := strings.Index(path, "/"); n >= 0 { + path = path[:n] + } + + return path, nil + }() + + c.p.mutex.Lock() + c.path = path + c.p.mutex.Unlock() switch req.Method { case "OPTIONS": @@ -319,11 +342,12 @@ func (c *client) handleRequest(req *rtsp.Request) (*rtsp.Response, error) { c.p.mutex.RLock() defer c.p.mutex.RUnlock() - if c.p.publisher == nil { - return nil, fmt.Errorf("no one is streaming") + pub, ok := c.p.publishers[path] + if !ok { + return nil, fmt.Errorf("no one is streaming on path '%s'", path) } - return c.p.publisher.streamSdpText, nil + return pub.streamSdpText, nil }() if err != nil { return nil, err @@ -334,7 +358,7 @@ func (c *client) handleRequest(req *rtsp.Request) (*rtsp.Response, error) { Status: "OK", Headers: map[string]string{ "CSeq": cseq, - "Content-Base": ur.String(), + "Content-Base": req.Url, "Content-Type": "application/sdp", }, Content: sdp, @@ -377,11 +401,13 @@ func (c *client) handleRequest(req *rtsp.Request) (*rtsp.Response, error) { c.p.mutex.Lock() defer c.p.mutex.Unlock() - if c.p.publisher != nil { - return fmt.Errorf("another client is already streaming") + _, ok := c.p.publishers[path] + if ok { + return fmt.Errorf("another client is already publishing on path '%s'", path) } - c.p.publisher = c + c.path = path + c.p.publishers[path] = c c.streamSdpText = req.Content c.streamSdpParsed = sdpParsed c.state = "ANNOUNCE" @@ -414,20 +440,6 @@ func (c *client) handleRequest(req *rtsp.Request) (*rtsp.Response, error) { switch c.state { // play case "STARTING", "PRE_PLAY": - err := func() error { - c.p.mutex.RLock() - defer c.p.mutex.RUnlock() - - if c.p.publisher == nil { - return fmt.Errorf("no one is streaming") - } - - return nil - }() - if err != nil { - return nil, err - } - // play via UDP if _, ok := th["RTP/AVP"]; ok { rtpPort, rtcpPort := th.getClientPorts() @@ -435,18 +447,28 @@ func (c *client) handleRequest(req *rtsp.Request) (*rtsp.Response, error) { return nil, fmt.Errorf("transport header does not have valid client ports (%s)", transportstr) } + if c.path != "" && path != c.path { + return nil, fmt.Errorf("path has changed") + } + err = func() error { c.p.mutex.Lock() defer c.p.mutex.Unlock() + pub, ok := c.p.publishers[path] + if !ok { + return fmt.Errorf("no one is streaming on path '%s'", path) + } + if len(c.streamTracks) > 0 && c.streamProtocol != _STREAM_PROTOCOL_UDP { return fmt.Errorf("client want to send tracks with different protocols") } - if len(c.streamTracks) >= len(c.p.publisher.streamSdpParsed.Medias) { + if len(c.streamTracks) >= len(pub.streamSdpParsed.Medias) { return fmt.Errorf("all the tracks have already been setup") } + c.path = path c.streamProtocol = _STREAM_PROTOCOL_UDP c.streamTracks = append(c.streamTracks, &track{ rtpPort: rtpPort, @@ -480,18 +502,28 @@ func (c *client) handleRequest(req *rtsp.Request) (*rtsp.Response, error) { // play via TCP } else if _, ok := th["RTP/AVP/TCP"]; ok { + if c.path != "" && path != c.path { + return nil, fmt.Errorf("path has changed") + } + err = func() error { c.p.mutex.Lock() defer c.p.mutex.Unlock() + pub, ok := c.p.publishers[path] + if !ok { + return fmt.Errorf("no one is streaming on path '%s'", path) + } + if len(c.streamTracks) > 0 && c.streamProtocol != _STREAM_PROTOCOL_TCP { return fmt.Errorf("client want to send tracks with different protocols") } - if len(c.streamTracks) >= len(c.p.publisher.streamSdpParsed.Medias) { + if len(c.streamTracks) >= len(pub.streamSdpParsed.Medias) { return fmt.Errorf("all the tracks have already been setup") } + c.path = path c.streamProtocol = _STREAM_PROTOCOL_TCP c.streamTracks = append(c.streamTracks, &track{ rtpPort: 0, @@ -531,6 +563,10 @@ func (c *client) handleRequest(req *rtsp.Request) (*rtsp.Response, error) { return nil, fmt.Errorf("transport header does not contain mode=record") } + if path != c.path { + return nil, fmt.Errorf("path has changed") + } + // record via UDP if _, ok := th["RTP/AVP/UDP"]; ok { rtpPort, rtcpPort := th.getClientPorts() @@ -644,11 +680,20 @@ func (c *client) handleRequest(req *rtsp.Request) (*rtsp.Response, error) { return nil, fmt.Errorf("client is in state '%s'", c.state) } + if path != c.path { + return nil, fmt.Errorf("path has changed") + } + err := func() error { c.p.mutex.Lock() defer c.p.mutex.Unlock() - if len(c.streamTracks) != len(c.p.publisher.streamSdpParsed.Medias) { + pub, ok := c.p.publishers[c.path] + if !ok { + return fmt.Errorf("no one is streaming on path '%s'", c.path) + } + + if len(c.streamTracks) != len(pub.streamSdpParsed.Medias) { return fmt.Errorf("not all tracks have been setup") } @@ -672,6 +717,10 @@ func (c *client) handleRequest(req *rtsp.Request) (*rtsp.Response, error) { return nil, fmt.Errorf("client is in state '%s'", c.state) } + if path != c.path { + return nil, fmt.Errorf("path has changed") + } + c.log("paused") c.p.mutex.Lock() @@ -692,6 +741,10 @@ func (c *client) handleRequest(req *rtsp.Request) (*rtsp.Response, error) { return nil, fmt.Errorf("client is in state '%s'", c.state) } + if path != c.path { + return nil, fmt.Errorf("path has changed") + } + err := func() error { c.p.mutex.Lock() defer c.p.mutex.Unlock() diff --git a/main.go b/main.go index b5c3fa58c09..e4c1ee7d9b3 100644 --- a/main.go +++ b/main.go @@ -39,23 +39,24 @@ func (s streamProtocol) String() string { } type program struct { - rtspPort int - rtpPort int - rtcpPort int - mutex sync.RWMutex - rtspl *rtspListener - rtpl *udpListener - rtcpl *udpListener - clients map[*client]struct{} - publisher *client + rtspPort int + rtpPort int + rtcpPort int + mutex sync.RWMutex + rtspl *rtspListener + rtpl *udpListener + rtcpl *udpListener + clients map[*client]struct{} + publishers map[string]*client } func newProgram(rtspPort int, rtpPort int, rtcpPort int) (*program, error) { p := &program{ - rtspPort: rtspPort, - rtpPort: rtpPort, - rtcpPort: rtcpPort, - clients: make(map[*client]struct{}), + rtspPort: rtspPort, + rtpPort: rtpPort, + rtcpPort: rtcpPort, + clients: make(map[*client]struct{}), + publishers: make(map[string]*client), } var err error @@ -87,9 +88,9 @@ func (p *program) run() { <-infty } -func (p *program) forwardTrack(flow trackFlow, id int, frame []byte) { +func (p *program) forwardTrack(path string, id int, flow trackFlow, frame []byte) { for c := range p.clients { - if c.state == "PLAY" { + if c.path == path && c.state == "PLAY" { if c.streamProtocol == _STREAM_PROTOCOL_UDP { if flow == _TRACK_FLOW_RTP { p.rtpl.nconn.WriteTo(frame, &net.UDPAddr{ @@ -104,7 +105,7 @@ func (p *program) forwardTrack(flow trackFlow, id int, frame []byte) { } } else { - c.rconn.WriteInterleavedFrame(trackToInterleavedChannel(flow, id), frame) + c.rconn.WriteInterleavedFrame(trackToInterleavedChannel(id, flow), frame) } } } diff --git a/rtsp/request.go b/rtsp/request.go index 44b08bd18f8..dee15eacd98 100644 --- a/rtsp/request.go +++ b/rtsp/request.go @@ -8,7 +8,7 @@ import ( type Request struct { Method string - Path string + Url string Headers map[string]string Content []byte } @@ -32,9 +32,9 @@ func requestDecode(r io.Reader) (*Request, error) { if err != nil { return nil, err } - req.Path = string(byts[:len(byts)-1]) + req.Url = string(byts[:len(byts)-1]) - if len(req.Path) == 0 { + if len(req.Url) == 0 { return nil, fmt.Errorf("empty path") } @@ -69,7 +69,7 @@ func requestDecode(r io.Reader) (*Request, error) { func requestEncode(w io.Writer, req *Request) error { wb := bufio.NewWriter(w) - _, err := wb.Write([]byte(req.Method + " " + req.Path + " " + _RTSP_PROTO + "\r\n")) + _, err := wb.Write([]byte(req.Method + " " + req.Url + " " + _RTSP_PROTO + "\r\n")) if err != nil { return err } diff --git a/rtsp/request_test.go b/rtsp/request_test.go index fb910a8d988..10cc51957b3 100644 --- a/rtsp/request_test.go +++ b/rtsp/request_test.go @@ -21,7 +21,7 @@ var casesRequest = []struct { "\r\n"), &Request{ Method: "OPTIONS", - Path: "rtsp://example.com/media.mp4", + Url: "rtsp://example.com/media.mp4", Headers: map[string]string{ "CSeq": "1", "Require": "implicit-play", @@ -36,7 +36,7 @@ var casesRequest = []struct { "\r\n"), &Request{ Method: "DESCRIBE", - Path: "rtsp://example.com/media.mp4", + Url: "rtsp://example.com/media.mp4", Headers: map[string]string{ "CSeq": "2", }, @@ -64,7 +64,7 @@ var casesRequest = []struct { "m=video 2232 RTP/AVP 31\n"), &Request{ Method: "ANNOUNCE", - Path: "rtsp://example.com/media.mp4", + Url: "rtsp://example.com/media.mp4", Headers: map[string]string{ "CSeq": "7", "Date": "23 Jan 1997 15:35:06 GMT", @@ -98,7 +98,7 @@ var casesRequest = []struct { "jitter\n"), &Request{ Method: "GET_PARAMETER", - Path: "rtsp://example.com/media.mp4", + Url: "rtsp://example.com/media.mp4", Headers: map[string]string{ "CSeq": "9", "Content-Type": "text/parameters", diff --git a/rtsplistener.go b/rtsplistener.go index f82071aa6a4..1e88846e622 100644 --- a/rtsplistener.go +++ b/rtsplistener.go @@ -38,7 +38,7 @@ func (l *rtspListener) run() { break } - rsc := newRtspClient(l.p, nconn) + rsc := newClient(l.p, nconn) go rsc.run() } } diff --git a/udplistener.go b/udplistener.go index 5d273679e4e..892aa4770b9 100644 --- a/udplistener.go +++ b/udplistener.go @@ -53,38 +53,32 @@ func (l *udpListener) run() { l.p.mutex.RLock() defer l.p.mutex.RUnlock() - if l.p.publisher == nil { - return - } - - if l.p.publisher.streamProtocol != _STREAM_PROTOCOL_UDP { - return - } - - if !l.p.publisher.ip.Equal(addr.IP) { - return - } - - // get track id by using client port - trackId := func() int { - for i, t := range l.p.publisher.streamTracks { - if l.flow == _TRACK_FLOW_RTP { - if t.rtpPort == addr.Port { - return i + // find path and track id + path, trackId := func() (string, int) { + for _, pub := range l.p.publishers { + for i, t := range pub.streamTracks { + if !pub.ip.Equal(addr.IP) { + continue } - } else { - if t.rtcpPort == addr.Port { - return i + + if l.flow == _TRACK_FLOW_RTP { + if t.rtpPort == addr.Port { + return pub.path, i + } + } else { + if t.rtcpPort == addr.Port { + return pub.path, i + } } } } - return -1 + return "", -1 }() - if trackId < 0 { + if path == "" { return } - l.p.forwardTrack(l.flow, trackId, buf[:n]) + l.p.forwardTrack(path, trackId, l.flow, buf[:n]) }() } }