Skip to content

Commit

Permalink
add read and write deadlines
Browse files Browse the repository at this point in the history
  • Loading branch information
aler9 committed Jan 20, 2020
1 parent 7f792f4 commit ec32c80
Showing 1 changed file with 27 additions and 23 deletions.
50 changes: 27 additions & 23 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,17 @@ import (
"net"
"net/url"
"strings"
"time"

"github.com/aler9/gortsplib"
"gortc.io/sdp"
)

const (
_READ_TIMEOUT = 5 * time.Second
_WRITE_TIMEOUT = 5 * time.Second
)

func interleavedChannelToTrack(channel int) (int, trackFlow) {
if (channel % 2) == 0 {
return (channel / 2), _TRACK_FLOW_RTP
Expand Down Expand Up @@ -122,23 +128,24 @@ func (c *client) run() {
}
}

func (c *client) writeRes(res *gortsplib.Response) {
func (c *client) writeResDeadline(res *gortsplib.Response) {
c.conn.NetConn().SetWriteDeadline(time.Now().Add(_WRITE_TIMEOUT))
c.conn.WriteResponse(res)
}

func (c *client) writeResError(req *gortsplib.Request, err error) {
c.log("ERR: %s", err)

if cseq, ok := req.Headers["CSeq"]; ok {
c.conn.WriteResponse(&gortsplib.Response{
c.writeResDeadline(&gortsplib.Response{
StatusCode: 400,
Status: "Bad Request",
Headers: map[string]string{
"CSeq": cseq,
},
})
} else {
c.conn.WriteResponse(&gortsplib.Response{
c.writeResDeadline(&gortsplib.Response{
StatusCode: 400,
Status: "Bad Request",
})
Expand Down Expand Up @@ -181,7 +188,7 @@ func (c *client) handleRequest(req *gortsplib.Request) bool {
// do not check state, since OPTIONS can be requested
// in any state

c.writeRes(&gortsplib.Response{
c.writeResDeadline(&gortsplib.Response{
StatusCode: 200,
Status: "OK",
Headers: map[string]string{
Expand Down Expand Up @@ -221,7 +228,7 @@ func (c *client) handleRequest(req *gortsplib.Request) bool {
return false
}

c.writeRes(&gortsplib.Response{
c.writeResDeadline(&gortsplib.Response{
StatusCode: 200,
Status: "OK",
Headers: map[string]string{
Expand Down Expand Up @@ -281,7 +288,7 @@ func (c *client) handleRequest(req *gortsplib.Request) bool {
if !ok || len(key) != 1 || key[0] != c.p.publishKey {
// reply with 401 and exit
c.log("ERR: publish key wrong or missing")
c.writeRes(&gortsplib.Response{
c.writeResDeadline(&gortsplib.Response{
StatusCode: 401,
Status: "Unauthorized",
Headers: map[string]string{
Expand Down Expand Up @@ -313,7 +320,7 @@ func (c *client) handleRequest(req *gortsplib.Request) bool {
return false
}

c.writeRes(&gortsplib.Response{
c.writeResDeadline(&gortsplib.Response{
StatusCode: 200,
Status: "OK",
Headers: map[string]string{
Expand Down Expand Up @@ -353,7 +360,7 @@ func (c *client) handleRequest(req *gortsplib.Request) bool {
}() {
if _, ok := c.p.protocols[_STREAM_PROTOCOL_UDP]; !ok {
c.log("ERR: udp streaming is disabled")
c.conn.WriteResponse(&gortsplib.Response{
c.writeResDeadline(&gortsplib.Response{
StatusCode: 461,
Status: "Unsupported Transport",
Headers: map[string]string{
Expand Down Expand Up @@ -406,7 +413,7 @@ func (c *client) handleRequest(req *gortsplib.Request) bool {
return false
}

c.writeRes(&gortsplib.Response{
c.writeResDeadline(&gortsplib.Response{
StatusCode: 200,
Status: "OK",
Headers: map[string]string{
Expand All @@ -426,7 +433,7 @@ func (c *client) handleRequest(req *gortsplib.Request) bool {
} else if _, ok := th["RTP/AVP/TCP"]; ok {
if _, ok := c.p.protocols[_STREAM_PROTOCOL_TCP]; !ok {
c.log("ERR: tcp streaming is disabled")
c.conn.WriteResponse(&gortsplib.Response{
c.writeResDeadline(&gortsplib.Response{
StatusCode: 461,
Status: "Unsupported Transport",
Headers: map[string]string{
Expand Down Expand Up @@ -475,7 +482,7 @@ func (c *client) handleRequest(req *gortsplib.Request) bool {

interleaved := fmt.Sprintf("%d-%d", ((len(c.streamTracks) - 1) * 2), ((len(c.streamTracks)-1)*2)+1)

c.writeRes(&gortsplib.Response{
c.writeResDeadline(&gortsplib.Response{
StatusCode: 200,
Status: "OK",
Headers: map[string]string{
Expand Down Expand Up @@ -521,7 +528,7 @@ func (c *client) handleRequest(req *gortsplib.Request) bool {
}() {
if _, ok := c.p.protocols[_STREAM_PROTOCOL_UDP]; !ok {
c.log("ERR: udp streaming is disabled")
c.conn.WriteResponse(&gortsplib.Response{
c.writeResDeadline(&gortsplib.Response{
StatusCode: 461,
Status: "Unsupported Transport",
Headers: map[string]string{
Expand Down Expand Up @@ -563,7 +570,7 @@ func (c *client) handleRequest(req *gortsplib.Request) bool {
return false
}

c.writeRes(&gortsplib.Response{
c.writeResDeadline(&gortsplib.Response{
StatusCode: 200,
Status: "OK",
Headers: map[string]string{
Expand All @@ -583,7 +590,7 @@ func (c *client) handleRequest(req *gortsplib.Request) bool {
} else if _, ok := th["RTP/AVP/TCP"]; ok {
if _, ok := c.p.protocols[_STREAM_PROTOCOL_TCP]; !ok {
c.log("ERR: tcp streaming is disabled")
c.conn.WriteResponse(&gortsplib.Response{
c.writeResDeadline(&gortsplib.Response{
StatusCode: 461,
Status: "Unsupported Transport",
Headers: map[string]string{
Expand Down Expand Up @@ -630,7 +637,7 @@ func (c *client) handleRequest(req *gortsplib.Request) bool {
return false
}

c.writeRes(&gortsplib.Response{
c.writeResDeadline(&gortsplib.Response{
StatusCode: 200,
Status: "OK",
Headers: map[string]string{
Expand Down Expand Up @@ -689,7 +696,7 @@ func (c *client) handleRequest(req *gortsplib.Request) bool {
// first write response, then set state
// otherwise, in case of TCP connections, RTP packets could be written
// before the response
c.writeRes(&gortsplib.Response{
c.writeResDeadline(&gortsplib.Response{
StatusCode: 200,
Status: "OK",
Headers: map[string]string{
Expand Down Expand Up @@ -743,7 +750,7 @@ func (c *client) handleRequest(req *gortsplib.Request) bool {
c.state = _CLIENT_STATE_PRE_PLAY
c.p.mutex.Unlock()

c.writeRes(&gortsplib.Response{
c.writeResDeadline(&gortsplib.Response{
StatusCode: 200,
Status: "OK",
Headers: map[string]string{
Expand Down Expand Up @@ -779,7 +786,7 @@ func (c *client) handleRequest(req *gortsplib.Request) bool {
return false
}

c.writeRes(&gortsplib.Response{
c.writeResDeadline(&gortsplib.Response{
StatusCode: 200,
Status: "OK",
Headers: map[string]string{
Expand All @@ -804,13 +811,10 @@ func (c *client) handleRequest(req *gortsplib.Request) bool {
if c.streamProtocol == _STREAM_PROTOCOL_TCP {
buf := make([]byte, 2048)
for {
c.conn.NetConn().SetReadDeadline(time.Now().Add(_READ_TIMEOUT))
channel, n, err := c.conn.ReadInterleavedFrame(buf)
if err != nil {
if _, ok := err.(*net.OpError); ok {
} else if err == io.EOF {
} else {
c.log("ERR: %s", err)
}
c.log("ERR: %s", err)
return false
}

Expand Down

0 comments on commit ec32c80

Please sign in to comment.