Skip to content

Commit

Permalink
support multiple paths
Browse files Browse the repository at this point in the history
  • Loading branch information
aler9 committed Dec 31, 2019
1 parent 507eff4 commit 9728ea8
Show file tree
Hide file tree
Showing 7 changed files with 145 additions and 97 deletions.
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ This software was developed with the aim of simulating a live camera feed for de

Features:
* Supports reading and publishing streams via UDP and TCP
* Supports publishing one stream at once, that can be read by multiple users
* Supports publishing multiple streams at once, each in a separate path, that can be read by multiple users
* Supports multiple video and audio tracks for each stream
* Supports the RTP/RTCP streaming protocol

Expand All @@ -33,17 +33,17 @@ Precompiled binaries are available in the [release](https://github.com/aler9/rts

2. In another terminal, publish something with FFmpeg (in this example it's a video file, but it can be anything you want):
```
ffmpeg -re -stream_loop -1 -i file.ts -c copy -f rtsp rtsp://localhost:8554/
ffmpeg -re -stream_loop -1 -i file.ts -c copy -f rtsp rtsp://localhost:8554/mystream
```

3. Open the stream with VLC:
```
vlc rtsp://localhost:8554/
vlc rtsp://localhost:8554/mystream
```

you can alternatively use GStreamer:
```
gst-launch-1.0 -v rtspsrc location=rtsp://localhost:8554/ ! rtph264depay ! decodebin ! autovideosink
gst-launch-1.0 -v rtspsrc location=rtsp://localhost:8554/mystream ! rtph264depay ! decodebin ! autovideosink
```

<br />
Expand Down
141 changes: 97 additions & 44 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@ var (
errRecord = errors.New("record")
)

func interleavedChannelToTrack(channel int) (trackFlow, int) {
func interleavedChannelToTrack(channel int) (int, trackFlow) {
if (channel % 2) == 0 {
return _TRACK_FLOW_RTP, (channel / 2)
return (channel / 2), _TRACK_FLOW_RTP
}
return _TRACK_FLOW_RTCP, ((channel - 1) / 2)
return ((channel - 1) / 2), _TRACK_FLOW_RTCP
}

func trackToInterleavedChannel(flow trackFlow, id int) int {
func trackToInterleavedChannel(id int, flow trackFlow) int {
if flow == _TRACK_FLOW_RTP {
return id * 2
}
Expand Down Expand Up @@ -84,13 +84,14 @@ type client struct {
rconn *rtsp.Conn
state string
ip net.IP
path string
streamSdpText []byte // filled only if publisher
streamSdpParsed *sdp.Message // filled only if publisher
streamProtocol streamProtocol
streamTracks []*track
}

func newRtspClient(p *program, nconn net.Conn) *client {
func newClient(p *program, nconn net.Conn) *client {
c := &client{
p: p,
rconn: rtsp.NewConn(nconn),
Expand All @@ -113,16 +114,19 @@ func (c *client) close() error {
delete(c.p.clients, c)
c.rconn.Close()

if c.p.publisher == c {
c.p.publisher = nil
if c.path != "" {
if pub, ok := c.p.publishers[c.path]; ok && pub == c {
delete(c.p.publishers, c.path)

// if the publisher has disconnected
// close all other connections
for oc := range c.p.clients {
oc.close()
// if the publisher has disconnected
// close all other connections that share the same path
for oc := range c.p.clients {
if oc.path == c.path {
oc.close()
}
}
}
}

return nil
}

Expand Down Expand Up @@ -181,7 +185,7 @@ func (c *client) run() {
return
}

c.log("is receiving %d %s via %s", len(c.streamTracks), func() string {
c.log("is receiving on path %s, %d %s via %s", c.path, len(c.streamTracks), func() string {
if len(c.streamTracks) == 1 {
return "track"
}
Expand Down Expand Up @@ -219,7 +223,7 @@ func (c *client) run() {
c.state = "RECORD"
c.p.mutex.Unlock()

c.log("is publishing %d %s via %s", len(c.streamTracks), func() string {
c.log("is publishing on path %s, %d %s via %s", c.path, len(c.streamTracks), func() string {
if len(c.streamTracks) == 1 {
return "track"
}
Expand All @@ -241,15 +245,15 @@ func (c *client) run() {
return
}

trackFlow, trackId := interleavedChannelToTrack(channel)
trackId, trackFlow := interleavedChannelToTrack(channel)

if trackId >= len(c.streamTracks) {
c.log("ERR: invalid track id '%d'", trackId)
return
}

c.p.mutex.RLock()
c.p.forwardTrack(trackFlow, trackId, buf[:n])
c.p.forwardTrack(c.path, trackId, trackFlow, buf[:n])
c.p.mutex.RUnlock()
}
}
Expand Down Expand Up @@ -283,10 +287,29 @@ func (c *client) handleRequest(req *rtsp.Request) (*rtsp.Response, error) {
return nil, fmt.Errorf("cseq missing")
}

ur, err := url.Parse(req.Path)
if err != nil {
return nil, fmt.Errorf("unable to parse path '%s'", req.Path)
}
path, err := func() (string, error) {
ur, err := url.Parse(req.Url)
if err != nil {
return "", fmt.Errorf("unable to parse path '%s'", req.Url)
}
path := ur.Path

// remove leading slash
if len(path) > 1 {
path = path[1:]
}

// strip any subpath
if n := strings.Index(path, "/"); n >= 0 {
path = path[:n]
}

return path, nil
}()

c.p.mutex.Lock()
c.path = path
c.p.mutex.Unlock()

switch req.Method {
case "OPTIONS":
Expand Down Expand Up @@ -319,11 +342,12 @@ func (c *client) handleRequest(req *rtsp.Request) (*rtsp.Response, error) {
c.p.mutex.RLock()
defer c.p.mutex.RUnlock()

if c.p.publisher == nil {
return nil, fmt.Errorf("no one is streaming")
pub, ok := c.p.publishers[path]
if !ok {
return nil, fmt.Errorf("no one is streaming on path '%s'", path)
}

return c.p.publisher.streamSdpText, nil
return pub.streamSdpText, nil
}()
if err != nil {
return nil, err
Expand All @@ -334,7 +358,7 @@ func (c *client) handleRequest(req *rtsp.Request) (*rtsp.Response, error) {
Status: "OK",
Headers: map[string]string{
"CSeq": cseq,
"Content-Base": ur.String(),
"Content-Base": req.Url,
"Content-Type": "application/sdp",
},
Content: sdp,
Expand Down Expand Up @@ -377,11 +401,13 @@ func (c *client) handleRequest(req *rtsp.Request) (*rtsp.Response, error) {
c.p.mutex.Lock()
defer c.p.mutex.Unlock()

if c.p.publisher != nil {
return fmt.Errorf("another client is already streaming")
_, ok := c.p.publishers[path]
if ok {
return fmt.Errorf("another client is already publishing on path '%s'", path)
}

c.p.publisher = c
c.path = path
c.p.publishers[path] = c
c.streamSdpText = req.Content
c.streamSdpParsed = sdpParsed
c.state = "ANNOUNCE"
Expand Down Expand Up @@ -414,39 +440,35 @@ func (c *client) handleRequest(req *rtsp.Request) (*rtsp.Response, error) {
switch c.state {
// play
case "STARTING", "PRE_PLAY":
err := func() error {
c.p.mutex.RLock()
defer c.p.mutex.RUnlock()

if c.p.publisher == nil {
return fmt.Errorf("no one is streaming")
}

return nil
}()
if err != nil {
return nil, err
}

// play via UDP
if _, ok := th["RTP/AVP"]; ok {
rtpPort, rtcpPort := th.getClientPorts()
if rtpPort == 0 || rtcpPort == 0 {
return nil, fmt.Errorf("transport header does not have valid client ports (%s)", transportstr)
}

if c.path != "" && path != c.path {
return nil, fmt.Errorf("path has changed")
}

err = func() error {
c.p.mutex.Lock()
defer c.p.mutex.Unlock()

pub, ok := c.p.publishers[path]
if !ok {
return fmt.Errorf("no one is streaming on path '%s'", path)
}

if len(c.streamTracks) > 0 && c.streamProtocol != _STREAM_PROTOCOL_UDP {
return fmt.Errorf("client want to send tracks with different protocols")
}

if len(c.streamTracks) >= len(c.p.publisher.streamSdpParsed.Medias) {
if len(c.streamTracks) >= len(pub.streamSdpParsed.Medias) {
return fmt.Errorf("all the tracks have already been setup")
}

c.path = path
c.streamProtocol = _STREAM_PROTOCOL_UDP
c.streamTracks = append(c.streamTracks, &track{
rtpPort: rtpPort,
Expand Down Expand Up @@ -480,18 +502,28 @@ func (c *client) handleRequest(req *rtsp.Request) (*rtsp.Response, error) {

// play via TCP
} else if _, ok := th["RTP/AVP/TCP"]; ok {
if c.path != "" && path != c.path {
return nil, fmt.Errorf("path has changed")
}

err = func() error {
c.p.mutex.Lock()
defer c.p.mutex.Unlock()

pub, ok := c.p.publishers[path]
if !ok {
return fmt.Errorf("no one is streaming on path '%s'", path)
}

if len(c.streamTracks) > 0 && c.streamProtocol != _STREAM_PROTOCOL_TCP {
return fmt.Errorf("client want to send tracks with different protocols")
}

if len(c.streamTracks) >= len(c.p.publisher.streamSdpParsed.Medias) {
if len(c.streamTracks) >= len(pub.streamSdpParsed.Medias) {
return fmt.Errorf("all the tracks have already been setup")
}

c.path = path
c.streamProtocol = _STREAM_PROTOCOL_TCP
c.streamTracks = append(c.streamTracks, &track{
rtpPort: 0,
Expand Down Expand Up @@ -531,6 +563,10 @@ func (c *client) handleRequest(req *rtsp.Request) (*rtsp.Response, error) {
return nil, fmt.Errorf("transport header does not contain mode=record")
}

if path != c.path {
return nil, fmt.Errorf("path has changed")
}

// record via UDP
if _, ok := th["RTP/AVP/UDP"]; ok {
rtpPort, rtcpPort := th.getClientPorts()
Expand Down Expand Up @@ -644,11 +680,20 @@ func (c *client) handleRequest(req *rtsp.Request) (*rtsp.Response, error) {
return nil, fmt.Errorf("client is in state '%s'", c.state)
}

if path != c.path {
return nil, fmt.Errorf("path has changed")
}

err := func() error {
c.p.mutex.Lock()
defer c.p.mutex.Unlock()

if len(c.streamTracks) != len(c.p.publisher.streamSdpParsed.Medias) {
pub, ok := c.p.publishers[c.path]
if !ok {
return fmt.Errorf("no one is streaming on path '%s'", c.path)
}

if len(c.streamTracks) != len(pub.streamSdpParsed.Medias) {
return fmt.Errorf("not all tracks have been setup")
}

Expand All @@ -672,6 +717,10 @@ func (c *client) handleRequest(req *rtsp.Request) (*rtsp.Response, error) {
return nil, fmt.Errorf("client is in state '%s'", c.state)
}

if path != c.path {
return nil, fmt.Errorf("path has changed")
}

c.log("paused")

c.p.mutex.Lock()
Expand All @@ -692,6 +741,10 @@ func (c *client) handleRequest(req *rtsp.Request) (*rtsp.Response, error) {
return nil, fmt.Errorf("client is in state '%s'", c.state)
}

if path != c.path {
return nil, fmt.Errorf("path has changed")
}

err := func() error {
c.p.mutex.Lock()
defer c.p.mutex.Unlock()
Expand Down
33 changes: 17 additions & 16 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,23 +39,24 @@ func (s streamProtocol) String() string {
}

type program struct {
rtspPort int
rtpPort int
rtcpPort int
mutex sync.RWMutex
rtspl *rtspListener
rtpl *udpListener
rtcpl *udpListener
clients map[*client]struct{}
publisher *client
rtspPort int
rtpPort int
rtcpPort int
mutex sync.RWMutex
rtspl *rtspListener
rtpl *udpListener
rtcpl *udpListener
clients map[*client]struct{}
publishers map[string]*client
}

func newProgram(rtspPort int, rtpPort int, rtcpPort int) (*program, error) {
p := &program{
rtspPort: rtspPort,
rtpPort: rtpPort,
rtcpPort: rtcpPort,
clients: make(map[*client]struct{}),
rtspPort: rtspPort,
rtpPort: rtpPort,
rtcpPort: rtcpPort,
clients: make(map[*client]struct{}),
publishers: make(map[string]*client),
}

var err error
Expand Down Expand Up @@ -87,9 +88,9 @@ func (p *program) run() {
<-infty
}

func (p *program) forwardTrack(flow trackFlow, id int, frame []byte) {
func (p *program) forwardTrack(path string, id int, flow trackFlow, frame []byte) {
for c := range p.clients {
if c.state == "PLAY" {
if c.path == path && c.state == "PLAY" {
if c.streamProtocol == _STREAM_PROTOCOL_UDP {
if flow == _TRACK_FLOW_RTP {
p.rtpl.nconn.WriteTo(frame, &net.UDPAddr{
Expand All @@ -104,7 +105,7 @@ func (p *program) forwardTrack(flow trackFlow, id int, frame []byte) {
}

} else {
c.rconn.WriteInterleavedFrame(trackToInterleavedChannel(flow, id), frame)
c.rconn.WriteInterleavedFrame(trackToInterleavedChannel(id, flow), frame)
}
}
}
Expand Down
Loading

0 comments on commit 9728ea8

Please sign in to comment.