Skip to content

Commit

Permalink
update gortsplib
Browse files Browse the repository at this point in the history
  • Loading branch information
aler9 committed Jan 26, 2020
1 parent f46d6f4 commit 0285d02
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 75 deletions.
141 changes: 70 additions & 71 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,18 +77,18 @@ func sdpFilter(msgIn *sdp.Message, byteIn []byte) (*sdp.Message, []byte) {
return msgOut, byteOut
}

func interleavedChannelToTrack(channel int) (int, trackFlow) {
func interleavedChannelToTrack(channel uint8) (int, trackFlow) {
if (channel % 2) == 0 {
return (channel / 2), _TRACK_FLOW_RTP
return int(channel / 2), _TRACK_FLOW_RTP
}
return ((channel - 1) / 2), _TRACK_FLOW_RTCP
return int((channel - 1) / 2), _TRACK_FLOW_RTCP
}

func trackToInterleavedChannel(id int, flow trackFlow) int {
func trackToInterleavedChannel(id int, flow trackFlow) uint8 {
if flow == _TRACK_FLOW_RTP {
return id * 2
return uint8(id * 2)
}
return (id * 2) + 1
return uint8((id * 2) + 1)
}

type clientState int
Expand All @@ -104,7 +104,7 @@ const (

type client struct {
p *program
conn *gortsplib.Conn
conn *gortsplib.ConnServer
state clientState
ip net.IP
path string
Expand All @@ -117,7 +117,7 @@ type client struct {
func newClient(p *program, nconn net.Conn) *client {
c := &client{
p: p,
conn: gortsplib.NewConn(nconn),
conn: gortsplib.NewConnServer(nconn),
state: _CLIENT_STATE_STARTING,
}

Expand Down Expand Up @@ -195,12 +195,12 @@ func (c *client) writeResDeadline(res *gortsplib.Response) {
func (c *client) writeResError(req *gortsplib.Request, err error) {
c.log("ERR: %s", err)

if cseq, ok := req.Headers["CSeq"]; ok {
if cseq, ok := req.Header["CSeq"]; ok && len(cseq) == 1 {
c.writeResDeadline(&gortsplib.Response{
StatusCode: 400,
Status: "Bad Request",
Headers: map[string]string{
"CSeq": cseq,
Header: gortsplib.Header{
"CSeq": []string{cseq[0]},
},
})
} else {
Expand All @@ -214,8 +214,8 @@ func (c *client) writeResError(req *gortsplib.Request, err error) {
func (c *client) handleRequest(req *gortsplib.Request) bool {
c.log(req.Method)

cseq, ok := req.Headers["CSeq"]
if !ok {
cseq, ok := req.Header["CSeq"]
if !ok || len(cseq) != 1 {
c.writeResError(req, fmt.Errorf("cseq missing"))
return false
}
Expand Down Expand Up @@ -250,17 +250,17 @@ func (c *client) handleRequest(req *gortsplib.Request) bool {
c.writeResDeadline(&gortsplib.Response{
StatusCode: 200,
Status: "OK",
Headers: map[string]string{
Header: gortsplib.Header{
"CSeq": cseq,
"Public": strings.Join([]string{
"Public": []string{strings.Join([]string{
"DESCRIBE",
"ANNOUNCE",
"SETUP",
"PLAY",
"PAUSE",
"RECORD",
"TEARDOWN",
}, ", "),
}, ", ")},
},
})
return true
Expand Down Expand Up @@ -290,10 +290,10 @@ func (c *client) handleRequest(req *gortsplib.Request) bool {
c.writeResDeadline(&gortsplib.Response{
StatusCode: 200,
Status: "OK",
Headers: map[string]string{
"CSeq": cseq,
"Content-Base": req.Url,
"Content-Type": "application/sdp",
Header: gortsplib.Header{
"CSeq": []string{cseq[0]},
"Content-Base": []string{req.Url},
"Content-Type": []string{"application/sdp"},
},
Content: sdp,
})
Expand All @@ -305,13 +305,13 @@ func (c *client) handleRequest(req *gortsplib.Request) bool {
return false
}

ct, ok := req.Headers["Content-Type"]
if !ok {
ct, ok := req.Header["Content-Type"]
if !ok || len(ct) != 1 {
c.writeResError(req, fmt.Errorf("Content-Type header missing"))
return false
}

if ct != "application/sdp" {
if ct[0] != "application/sdp" {
c.writeResError(req, fmt.Errorf("unsupported Content-Type '%s'", ct))
return false
}
Expand All @@ -338,8 +338,8 @@ func (c *client) handleRequest(req *gortsplib.Request) bool {
c.writeResDeadline(&gortsplib.Response{
StatusCode: 401,
Status: "Unauthorized",
Headers: map[string]string{
"CSeq": req.Headers["CSeq"],
Header: gortsplib.Header{
"CSeq": []string{cseq[0]},
},
})
return false
Expand Down Expand Up @@ -370,20 +370,20 @@ func (c *client) handleRequest(req *gortsplib.Request) bool {
c.writeResDeadline(&gortsplib.Response{
StatusCode: 200,
Status: "OK",
Headers: map[string]string{
"CSeq": cseq,
Header: gortsplib.Header{
"CSeq": []string{cseq[0]},
},
})
return true

case "SETUP":
transportStr, ok := req.Headers["Transport"]
if !ok {
tsRaw, ok := req.Header["Transport"]
if !ok || len(tsRaw) != 1 {
c.writeResError(req, fmt.Errorf("transport header missing"))
return false
}

th := gortsplib.NewTransportHeader(transportStr)
th := gortsplib.ReadHeaderTransport(tsRaw[0])

if _, ok := th["unicast"]; !ok {
c.writeResError(req, fmt.Errorf("transport header does not contain unicast"))
Expand All @@ -410,7 +410,7 @@ func (c *client) handleRequest(req *gortsplib.Request) bool {
c.writeResDeadline(&gortsplib.Response{
StatusCode: 461,
Status: "Unsupported Transport",
Headers: map[string]string{
Header: gortsplib.Header{
"CSeq": cseq,
},
})
Expand All @@ -419,7 +419,7 @@ func (c *client) handleRequest(req *gortsplib.Request) bool {

rtpPort, rtcpPort := th.GetPorts("client_port")
if rtpPort == 0 || rtcpPort == 0 {
c.writeResError(req, fmt.Errorf("transport header does not have valid client ports (%s)", transportStr))
c.writeResError(req, fmt.Errorf("transport header does not have valid client ports (%s)", tsRaw[0]))
return false
}

Expand Down Expand Up @@ -463,15 +463,15 @@ func (c *client) handleRequest(req *gortsplib.Request) bool {
c.writeResDeadline(&gortsplib.Response{
StatusCode: 200,
Status: "OK",
Headers: map[string]string{
"CSeq": cseq,
"Transport": strings.Join([]string{
Header: gortsplib.Header{
"CSeq": []string{cseq[0]},
"Transport": []string{strings.Join([]string{
"RTP/AVP/UDP",
"unicast",
fmt.Sprintf("client_port=%d-%d", rtpPort, rtcpPort),
fmt.Sprintf("server_port=%d-%d", c.p.rtpPort, c.p.rtcpPort),
}, ";"),
"Session": "12345678",
}, ";")},
"Session": []string{"12345678"},
},
})
return true
Expand All @@ -483,7 +483,7 @@ func (c *client) handleRequest(req *gortsplib.Request) bool {
c.writeResDeadline(&gortsplib.Response{
StatusCode: 461,
Status: "Unsupported Transport",
Headers: map[string]string{
Header: gortsplib.Header{
"CSeq": cseq,
},
})
Expand Down Expand Up @@ -532,20 +532,20 @@ func (c *client) handleRequest(req *gortsplib.Request) bool {
c.writeResDeadline(&gortsplib.Response{
StatusCode: 200,
Status: "OK",
Headers: map[string]string{
"CSeq": cseq,
"Transport": strings.Join([]string{
Header: gortsplib.Header{
"CSeq": []string{cseq[0]},
"Transport": []string{strings.Join([]string{
"RTP/AVP/TCP",
"unicast",
fmt.Sprintf("interleaved=%s", interleaved),
}, ";"),
"Session": "12345678",
}, ";")},
"Session": []string{"12345678"},
},
})
return true

} else {
c.writeResError(req, fmt.Errorf("transport header does not contain a valid protocol (RTP/AVP, RTP/AVP/UDP or RTP/AVP/TCP) (%s)", transportStr))
c.writeResError(req, fmt.Errorf("transport header does not contain a valid protocol (RTP/AVP, RTP/AVP/UDP or RTP/AVP/TCP) (%s)", tsRaw[0]))
return false
}

Expand Down Expand Up @@ -578,7 +578,7 @@ func (c *client) handleRequest(req *gortsplib.Request) bool {
c.writeResDeadline(&gortsplib.Response{
StatusCode: 461,
Status: "Unsupported Transport",
Headers: map[string]string{
Header: gortsplib.Header{
"CSeq": cseq,
},
})
Expand All @@ -587,7 +587,7 @@ func (c *client) handleRequest(req *gortsplib.Request) bool {

rtpPort, rtcpPort := th.GetPorts("client_port")
if rtpPort == 0 || rtcpPort == 0 {
c.writeResError(req, fmt.Errorf("transport header does not have valid client ports (%s)", transportStr))
c.writeResError(req, fmt.Errorf("transport header does not have valid client ports (%s)", tsRaw[0]))
return false
}

Expand Down Expand Up @@ -620,15 +620,15 @@ func (c *client) handleRequest(req *gortsplib.Request) bool {
c.writeResDeadline(&gortsplib.Response{
StatusCode: 200,
Status: "OK",
Headers: map[string]string{
"CSeq": cseq,
"Transport": strings.Join([]string{
Header: gortsplib.Header{
"CSeq": []string{cseq[0]},
"Transport": []string{strings.Join([]string{
"RTP/AVP/UDP",
"unicast",
fmt.Sprintf("client_port=%d-%d", rtpPort, rtcpPort),
fmt.Sprintf("server_port=%d-%d", c.p.rtpPort, c.p.rtcpPort),
}, ";"),
"Session": "12345678",
}, ";")},
"Session": []string{"12345678"},
},
})
return true
Expand All @@ -640,7 +640,7 @@ func (c *client) handleRequest(req *gortsplib.Request) bool {
c.writeResDeadline(&gortsplib.Response{
StatusCode: 461,
Status: "Unsupported Transport",
Headers: map[string]string{
Header: gortsplib.Header{
"CSeq": cseq,
},
})
Expand Down Expand Up @@ -687,20 +687,20 @@ func (c *client) handleRequest(req *gortsplib.Request) bool {
c.writeResDeadline(&gortsplib.Response{
StatusCode: 200,
Status: "OK",
Headers: map[string]string{
"CSeq": cseq,
"Transport": strings.Join([]string{
Header: gortsplib.Header{
"CSeq": []string{cseq[0]},
"Transport": []string{strings.Join([]string{
"RTP/AVP/TCP",
"unicast",
fmt.Sprintf("interleaved=%s", interleaved),
}, ";"),
"Session": "12345678",
}, ";")},
"Session": []string{"12345678"},
},
})
return true

} else {
c.writeResError(req, fmt.Errorf("transport header does not contain a valid protocol (RTP/AVP, RTP/AVP/UDP or RTP/AVP/TCP) (%s)", transportStr))
c.writeResError(req, fmt.Errorf("transport header does not contain a valid protocol (RTP/AVP, RTP/AVP/UDP or RTP/AVP/TCP) (%s)", tsRaw[0]))
return false
}

Expand Down Expand Up @@ -746,9 +746,9 @@ func (c *client) handleRequest(req *gortsplib.Request) bool {
c.writeResDeadline(&gortsplib.Response{
StatusCode: 200,
Status: "OK",
Headers: map[string]string{
"CSeq": cseq,
"Session": "12345678",
Header: gortsplib.Header{
"CSeq": []string{cseq[0]},
"Session": []string{"12345678"},
},
})

Expand Down Expand Up @@ -800,9 +800,9 @@ func (c *client) handleRequest(req *gortsplib.Request) bool {
c.writeResDeadline(&gortsplib.Response{
StatusCode: 200,
Status: "OK",
Headers: map[string]string{
"CSeq": cseq,
"Session": "12345678",
Header: gortsplib.Header{
"CSeq": []string{cseq[0]},
"Session": []string{"12345678"},
},
})
return true
Expand Down Expand Up @@ -836,9 +836,9 @@ func (c *client) handleRequest(req *gortsplib.Request) bool {
c.writeResDeadline(&gortsplib.Response{
StatusCode: 200,
Status: "OK",
Headers: map[string]string{
"CSeq": cseq,
"Session": "12345678",
Header: gortsplib.Header{
"CSeq": []string{cseq[0]},
"Session": []string{"12345678"},
},
})

Expand All @@ -856,24 +856,23 @@ func (c *client) handleRequest(req *gortsplib.Request) bool {
// when protocol is TCP, the RTSP connection becomes a RTP connection
// receive RTP data and parse it
if c.streamProtocol == _STREAM_PROTOCOL_TCP {
buf := make([]byte, 2048)
for {
c.conn.NetConn().SetReadDeadline(time.Now().Add(_READ_TIMEOUT))
channel, n, err := c.conn.ReadInterleavedFrame(buf)
frame, err := c.conn.ReadInterleavedFrame()
if err != nil {
c.log("ERR: %s", err)
return false
}

trackId, trackFlow := interleavedChannelToTrack(channel)
trackId, trackFlow := interleavedChannelToTrack(frame.Channel)

if trackId >= len(c.streamTracks) {
c.log("ERR: invalid track id '%d'", trackId)
return false
}

c.p.mutex.RLock()
c.p.forwardTrack(c.path, trackId, trackFlow, buf[:n])
c.p.forwardTrack(c.path, trackId, trackFlow, frame.Content)
c.p.mutex.RUnlock()
}
}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ go 1.13
require (
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d // indirect
github.com/aler9/gortsplib v0.0.0-20200120211423-ea12a2ccff1c
github.com/aler9/gortsplib v0.0.0-20200126104709-9d2081e29ccc
gopkg.in/alecthomas/kingpin.v2 v2.2.6
gortc.io/sdp v0.17.0
)
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 h1:JYp7IbQjafo
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d h1:UQZhZ2O0vMHr2cI+DC1Mbh0TJxzA3RcLoMsFw+aXw7E=
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho=
github.com/aler9/gortsplib v0.0.0-20200120211423-ea12a2ccff1c h1:dKNfvjX6CN/+fPsCxwQPRDy0dqW9K1d+61KXeBfzlcU=
github.com/aler9/gortsplib v0.0.0-20200120211423-ea12a2ccff1c/go.mod h1:YiIgmmv0ELkWUy11Jj2h5AgfqLCpy8sIX/l9MmS8+uw=
github.com/aler9/gortsplib v0.0.0-20200126104709-9d2081e29ccc h1:lRClB+QB904mIwYOy07gArpCwu7wZ9cqgdmrbsP28Rc=
github.com/aler9/gortsplib v0.0.0-20200126104709-9d2081e29ccc/go.mod h1:YiIgmmv0ELkWUy11Jj2h5AgfqLCpy8sIX/l9MmS8+uw=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/pkg/errors v0.8.0 h1:WdK/asTD0HN+q6hsWO3/vpuAkAr+tw6aNJNDFFf0+qw=
Expand Down
Loading

0 comments on commit 0285d02

Please sign in to comment.