diff --git a/client.go b/client.go index 67253edcf3c..2684c687516 100644 --- a/client.go +++ b/client.go @@ -7,11 +7,17 @@ import ( "net" "net/url" "strings" + "time" "github.com/aler9/gortsplib" "gortc.io/sdp" ) +const ( + _READ_TIMEOUT = 5 * time.Second + _WRITE_TIMEOUT = 5 * time.Second +) + func interleavedChannelToTrack(channel int) (int, trackFlow) { if (channel % 2) == 0 { return (channel / 2), _TRACK_FLOW_RTP @@ -122,7 +128,8 @@ func (c *client) run() { } } -func (c *client) writeRes(res *gortsplib.Response) { +func (c *client) writeResDeadline(res *gortsplib.Response) { + c.conn.NetConn().SetWriteDeadline(time.Now().Add(_WRITE_TIMEOUT)) c.conn.WriteResponse(res) } @@ -130,7 +137,7 @@ func (c *client) writeResError(req *gortsplib.Request, err error) { c.log("ERR: %s", err) if cseq, ok := req.Headers["CSeq"]; ok { - c.conn.WriteResponse(&gortsplib.Response{ + c.writeResDeadline(&gortsplib.Response{ StatusCode: 400, Status: "Bad Request", Headers: map[string]string{ @@ -138,7 +145,7 @@ func (c *client) writeResError(req *gortsplib.Request, err error) { }, }) } else { - c.conn.WriteResponse(&gortsplib.Response{ + c.writeResDeadline(&gortsplib.Response{ StatusCode: 400, Status: "Bad Request", }) @@ -181,7 +188,7 @@ func (c *client) handleRequest(req *gortsplib.Request) bool { // do not check state, since OPTIONS can be requested // in any state - c.writeRes(&gortsplib.Response{ + c.writeResDeadline(&gortsplib.Response{ StatusCode: 200, Status: "OK", Headers: map[string]string{ @@ -221,7 +228,7 @@ func (c *client) handleRequest(req *gortsplib.Request) bool { return false } - c.writeRes(&gortsplib.Response{ + c.writeResDeadline(&gortsplib.Response{ StatusCode: 200, Status: "OK", Headers: map[string]string{ @@ -281,7 +288,7 @@ func (c *client) handleRequest(req *gortsplib.Request) bool { if !ok || len(key) != 1 || key[0] != c.p.publishKey { // reply with 401 and exit c.log("ERR: publish key wrong or missing") - c.writeRes(&gortsplib.Response{ + c.writeResDeadline(&gortsplib.Response{ StatusCode: 401, Status: "Unauthorized", Headers: map[string]string{ @@ -313,7 +320,7 @@ func (c *client) handleRequest(req *gortsplib.Request) bool { return false } - c.writeRes(&gortsplib.Response{ + c.writeResDeadline(&gortsplib.Response{ StatusCode: 200, Status: "OK", Headers: map[string]string{ @@ -353,7 +360,7 @@ func (c *client) handleRequest(req *gortsplib.Request) bool { }() { if _, ok := c.p.protocols[_STREAM_PROTOCOL_UDP]; !ok { c.log("ERR: udp streaming is disabled") - c.conn.WriteResponse(&gortsplib.Response{ + c.writeResDeadline(&gortsplib.Response{ StatusCode: 461, Status: "Unsupported Transport", Headers: map[string]string{ @@ -406,7 +413,7 @@ func (c *client) handleRequest(req *gortsplib.Request) bool { return false } - c.writeRes(&gortsplib.Response{ + c.writeResDeadline(&gortsplib.Response{ StatusCode: 200, Status: "OK", Headers: map[string]string{ @@ -426,7 +433,7 @@ func (c *client) handleRequest(req *gortsplib.Request) bool { } else if _, ok := th["RTP/AVP/TCP"]; ok { if _, ok := c.p.protocols[_STREAM_PROTOCOL_TCP]; !ok { c.log("ERR: tcp streaming is disabled") - c.conn.WriteResponse(&gortsplib.Response{ + c.writeResDeadline(&gortsplib.Response{ StatusCode: 461, Status: "Unsupported Transport", Headers: map[string]string{ @@ -475,7 +482,7 @@ func (c *client) handleRequest(req *gortsplib.Request) bool { interleaved := fmt.Sprintf("%d-%d", ((len(c.streamTracks) - 1) * 2), ((len(c.streamTracks)-1)*2)+1) - c.writeRes(&gortsplib.Response{ + c.writeResDeadline(&gortsplib.Response{ StatusCode: 200, Status: "OK", Headers: map[string]string{ @@ -521,7 +528,7 @@ func (c *client) handleRequest(req *gortsplib.Request) bool { }() { if _, ok := c.p.protocols[_STREAM_PROTOCOL_UDP]; !ok { c.log("ERR: udp streaming is disabled") - c.conn.WriteResponse(&gortsplib.Response{ + c.writeResDeadline(&gortsplib.Response{ StatusCode: 461, Status: "Unsupported Transport", Headers: map[string]string{ @@ -563,7 +570,7 @@ func (c *client) handleRequest(req *gortsplib.Request) bool { return false } - c.writeRes(&gortsplib.Response{ + c.writeResDeadline(&gortsplib.Response{ StatusCode: 200, Status: "OK", Headers: map[string]string{ @@ -583,7 +590,7 @@ func (c *client) handleRequest(req *gortsplib.Request) bool { } else if _, ok := th["RTP/AVP/TCP"]; ok { if _, ok := c.p.protocols[_STREAM_PROTOCOL_TCP]; !ok { c.log("ERR: tcp streaming is disabled") - c.conn.WriteResponse(&gortsplib.Response{ + c.writeResDeadline(&gortsplib.Response{ StatusCode: 461, Status: "Unsupported Transport", Headers: map[string]string{ @@ -630,7 +637,7 @@ func (c *client) handleRequest(req *gortsplib.Request) bool { return false } - c.writeRes(&gortsplib.Response{ + c.writeResDeadline(&gortsplib.Response{ StatusCode: 200, Status: "OK", Headers: map[string]string{ @@ -689,7 +696,7 @@ func (c *client) handleRequest(req *gortsplib.Request) bool { // first write response, then set state // otherwise, in case of TCP connections, RTP packets could be written // before the response - c.writeRes(&gortsplib.Response{ + c.writeResDeadline(&gortsplib.Response{ StatusCode: 200, Status: "OK", Headers: map[string]string{ @@ -743,7 +750,7 @@ func (c *client) handleRequest(req *gortsplib.Request) bool { c.state = _CLIENT_STATE_PRE_PLAY c.p.mutex.Unlock() - c.writeRes(&gortsplib.Response{ + c.writeResDeadline(&gortsplib.Response{ StatusCode: 200, Status: "OK", Headers: map[string]string{ @@ -779,7 +786,7 @@ func (c *client) handleRequest(req *gortsplib.Request) bool { return false } - c.writeRes(&gortsplib.Response{ + c.writeResDeadline(&gortsplib.Response{ StatusCode: 200, Status: "OK", Headers: map[string]string{ @@ -804,13 +811,10 @@ func (c *client) handleRequest(req *gortsplib.Request) bool { if c.streamProtocol == _STREAM_PROTOCOL_TCP { buf := make([]byte, 2048) for { + c.conn.NetConn().SetReadDeadline(time.Now().Add(_READ_TIMEOUT)) channel, n, err := c.conn.ReadInterleavedFrame(buf) if err != nil { - if _, ok := err.(*net.OpError); ok { - } else if err == io.EOF { - } else { - c.log("ERR: %s", err) - } + c.log("ERR: %s", err) return false }