Skip to content

Commit

Permalink
new options runOnConnect, runOnPublish, runOnRead
Browse files Browse the repository at this point in the history
  • Loading branch information
aler9 committed Jul 13, 2020
1 parent 8898ad2 commit b837b7a
Show file tree
Hide file tree
Showing 5 changed files with 87 additions and 32 deletions.
16 changes: 10 additions & 6 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ test-nodocker:

define DOCKERFILE_RUN
FROM amd64/$(BASE_IMAGE)
RUN apk add --no-cache git
RUN apk add --no-cache git ffmpeg
WORKDIR /s
COPY go.mod go.sum ./
RUN go mod download
Expand All @@ -71,12 +71,16 @@ define CONFIG_RUN

paths:
all:
readUser: test
readPass: tast
# readUser: test
# readPass: tast

# proxied:
# source: rtsp://192.168.10.1/unicast
# sourceProtocol: udp

original:
runOnPublish: ffmpeg -i rtsp://localhost:8554/original -b:a 64k -c:v libx264 -preset ultrafast -b:v 500k -max_muxing_queue_size 1024 -f rtsp rtsp://localhost:8554/compressed

proxied:
source: rtsp://192.168.10.1/unicast
sourceProtocol: udp
endef
export CONFIG_RUN

Expand Down
11 changes: 7 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ Features:
* Each stream can have multiple video and audio tracks, encoded in any format
* Publish multiple streams at once, each in a separate path, that can be read by multiple users
* Supports authentication
* Supports running a script when a client connects or disconnects
* Run custom commands when clients connect, disconnect, read or publish streams (linux only)
* Compatible with Linux, Windows and Mac, does not require any dependency or interpreter, it's a single executable

## Installation and basic usage
Expand Down Expand Up @@ -66,7 +66,7 @@ docker run --rm -it -v $PWD/conf.yml:/conf.yml -p 8554:8554 aler9/rtsp-simple-se

#### Full configuration file

To change the configuration, it's enough to edit the file `conf.yml`, provided with the executable. The default configuration is [available here](conf.yml).
To change the configuration, it's enough to edit the `conf.yml` file, provided with the executable. The default configuration is [available here](conf.yml).

#### Usage as RTSP Proxy

Expand Down Expand Up @@ -115,9 +115,12 @@ WARNING: RTSP is a plain protocol, and the credentials can be intercepted and re

_rtsp-simple-server_ is an RTSP server: it publishes existing streams and does not touch them. It is not a media server, that is a far more complex and heavy software that can receive existing streams, re-encode them and publish them.

To change the format, codec or compression of a stream, you can use _FFmpeg_ or _Gstreamer_ together with _rtsp-simple-server_, obtaining the same features of a media server. For instance, if we want to re-encode an existing stream, that is available in the `/original` path, and make the resulting stream available in the `/compressed` path, it is enough to launch _FFmpeg_ in parallel with _rtsp-simple-server_, with the following syntax:
To change the format, codec or compression of a stream, you can use _FFmpeg_ or _Gstreamer_ together with _rtsp-simple-server_, obtaining the same features of a media server. For instance, to re-encode an existing stream, that is available in the `/original` path, and publish the resulting stream in the `/compressed` path, edit `conf.yml` and replace everything inside section `paths` with the following content:
```
ffmpeg -i rtsp://localhost:8554/original -c:v libx264 -preset ultrafast -tune zerolatency -b 600k -f rtsp rtsp://localhost:8554/compressed
paths:
all:
original:
runOnPublish: ffmpeg -i rtsp://localhost:8554/original -b:a 64k -c:v libx264 -preset ultrafast -b:v 500k -max_muxing_queue_size 1024 -f rtsp rtsp://localhost:8554/compressed
```

#### Client count
Expand Down
10 changes: 8 additions & 2 deletions conf.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ type ConfPath struct {
ReadPass string `yaml:"readPass"`
ReadIps []string `yaml:"readIps"`
readIpsParsed []interface{}
RunOnPublish string `yaml:"runOnPublish"`
RunOnRead string `yaml:"runOnRead"`
}

type conf struct {
Expand All @@ -30,8 +32,7 @@ type conf struct {
RtspPort int `yaml:"rtspPort"`
RtpPort int `yaml:"rtpPort"`
RtcpPort int `yaml:"rtcpPort"`
PreScript string `yaml:"preScript"`
PostScript string `yaml:"postScript"`
RunOnConnect string `yaml:"runOnConnect"`
ReadTimeout time.Duration `yaml:"readTimeout"`
WriteTimeout time.Duration `yaml:"writeTimeout"`
StreamDeadAfter time.Duration `yaml:"streamDeadAfter"`
Expand Down Expand Up @@ -148,6 +149,11 @@ func loadConf(fpath string, stdin io.Reader) (*conf, error) {
}

for path, pconf := range conf.Paths {
if pconf == nil {
conf.Paths[path] = &ConfPath{}
pconf = conf.Paths[path]
}

if pconf.Source == "" {
pconf.Source = "record"
}
Expand Down
15 changes: 11 additions & 4 deletions conf.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,9 @@ rtspPort: 8554
rtpPort: 8000
# port of the UDP RTCP listener
rtcpPort: 8001
# script to run when a client connects
preScript:
# script to run when a client disconnects
postScript:
# command to run when a client connects.
# this is terminated with SIGINT when a client disconnects.
runOnConnect:
# timeout of read operations
readTimeout: 5s
# timeout of write operations
Expand Down Expand Up @@ -46,3 +45,11 @@ paths:
readPass:
# IPs or networks (x.x.x.x/24) allowed to read
readIps: []

# command to run when a client starts publishing.
# This is terminated with SIGINT when a client stops publishing.
runOnPublish:

# command to run when a clients starts reading.
# This is terminated with SIGINT when a client stops reading.
runOnRead:
67 changes: 51 additions & 16 deletions server-client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"io"
"net"
"os"
"os/exec"
"strings"
"time"
Expand Down Expand Up @@ -125,9 +126,12 @@ func (c *serverClient) publisherSdpParsed() *sdp.SessionDescription {
}

func (c *serverClient) run() {
if c.p.conf.PreScript != "" {
preScript := exec.Command(c.p.conf.PreScript)
err := preScript.Run()
var runOnConnectCmd *exec.Cmd
if c.p.conf.RunOnConnect != "" {
runOnConnectCmd = exec.Command("/bin/sh", "-c", c.p.conf.RunOnConnect)
runOnConnectCmd.Stdout = os.Stdout
runOnConnectCmd.Stderr = os.Stderr
err := runOnConnectCmd.Start()
if err != nil {
c.log("ERR: %s", err)
}
Expand Down Expand Up @@ -155,15 +159,10 @@ outer:

c.conn.NetConn().Close() // close socket in case it has not been closed yet

func() {
if c.p.conf.PostScript != "" {
postScript := exec.Command(c.p.conf.PostScript)
err := postScript.Run()
if err != nil {
c.log("ERR: %s", err)
}
}
}()
if runOnConnectCmd != nil {
runOnConnectCmd.Process.Signal(os.Interrupt)
runOnConnectCmd.Wait()
}

close(c.done) // close() never blocks
}
Expand Down Expand Up @@ -732,7 +731,7 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) bool {
},
})

c.runPlay()
c.runPlay(path)
return false

case gortsplib.RECORD:
Expand Down Expand Up @@ -760,7 +759,7 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) bool {
},
})

c.runRecord()
c.runRecord(path)
return false

case gortsplib.TEARDOWN:
Expand All @@ -773,7 +772,9 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) bool {
}
}

func (c *serverClient) runPlay() {
func (c *serverClient) runPlay(path string) {
pconf := c.findConfForPath(path)

if c.streamProtocol == streamProtocolTcp {
c.writeBuf = newDoubleBuffer(2048)
c.events = make(chan serverClientEvent)
Expand All @@ -790,6 +791,17 @@ func (c *serverClient) runPlay() {
return "tracks"
}(), c.streamProtocol)

var runOnReadCmd *exec.Cmd
if pconf.RunOnRead != "" {
runOnReadCmd = exec.Command("/bin/sh", "-c", pconf.RunOnRead)
runOnReadCmd.Stdout = os.Stdout
runOnReadCmd.Stderr = os.Stderr
err := runOnReadCmd.Start()
if err != nil {
c.log("ERR: %s", err)
}
}

if c.streamProtocol == streamProtocolTcp {
readDone := make(chan error)
go func() {
Expand Down Expand Up @@ -851,9 +863,16 @@ func (c *serverClient) runPlay() {
c.p.events <- programEventClientPlayStop{done, c}
<-done
}

if runOnReadCmd != nil {
runOnReadCmd.Process.Signal(os.Interrupt)
runOnReadCmd.Wait()
}
}

func (c *serverClient) runRecord() {
func (c *serverClient) runRecord(path string) {
pconf := c.findConfForPath(path)

c.RtcpReceivers = make([]*gortsplib.RtcpReceiver, len(c.streamTracks))
for trackId := range c.streamTracks {
c.RtcpReceivers[trackId] = gortsplib.NewRtcpReceiver()
Expand All @@ -870,6 +889,17 @@ func (c *serverClient) runRecord() {
return "tracks"
}(), c.streamProtocol)

var runOnPublishCmd *exec.Cmd
if pconf.RunOnPublish != "" {
runOnPublishCmd = exec.Command("/bin/sh", "-c", pconf.RunOnPublish)
runOnPublishCmd.Stdout = os.Stdout
runOnPublishCmd.Stderr = os.Stderr
err := runOnPublishCmd.Start()
if err != nil {
c.log("ERR: %s", err)
}
}

if c.streamProtocol == streamProtocolTcp {
frame := &gortsplib.InterleavedFrame{}

Expand Down Expand Up @@ -1013,4 +1043,9 @@ func (c *serverClient) runRecord() {
for trackId := range c.streamTracks {
c.RtcpReceivers[trackId].Close()
}

if runOnPublishCmd != nil {
runOnPublishCmd.Process.Signal(os.Interrupt)
runOnPublishCmd.Wait()
}
}

0 comments on commit b837b7a

Please sign in to comment.