diff --git a/main.go b/main.go index ad33157940f..76b910d2db6 100644 --- a/main.go +++ b/main.go @@ -55,18 +55,12 @@ func newProgram(rtspPort int, rtpPort int, rtcpPort int) (*program, error) { } func (p *program) run() { - var wg sync.WaitGroup + go p.rtpl.run() + go p.rtcpl.run() + go p.rtspl.run() - wg.Add(1) - go p.rtpl.run(wg) - - wg.Add(1) - go p.rtcpl.run(wg) - - wg.Add(1) - go p.rtspl.run(wg) - - wg.Wait() + infty := make(chan struct{}) + <-infty } func (p *program) handleRtp(buf []byte) { diff --git a/rtsp_client.go b/rtsp_client.go index 4fba8153e9c..b6ce40c194f 100644 --- a/rtsp_client.go +++ b/rtsp_client.go @@ -10,7 +10,6 @@ import ( "net/url" "strconv" "strings" - "sync" "rtsp-server/rtsp" ) @@ -40,20 +39,26 @@ func newRtspClient(p *program, nconn net.Conn) *rtspClient { } func (c *rtspClient) close() error { + // already deleted + if _, ok := c.p.clients[c]; !ok { + return nil + } + delete(c.p.clients, c) + c.nconn.Close() if c.p.streamAuthor == c { c.p.streamAuthor = nil c.p.streamSdp = nil - // if the streamer has disconnected + // if the publisher has disconnected // close all other connections for oc := range c.p.clients { oc.close() } } - return c.nconn.Close() + return nil } func (c *rtspClient) log(format string, args ...interface{}) { @@ -61,8 +66,7 @@ func (c *rtspClient) log(format string, args ...interface{}) { log.Printf(format, args...) } -func (c *rtspClient) run(wg sync.WaitGroup) { - defer wg.Done() +func (c *rtspClient) run() { defer c.log("disconnected") defer func() { c.p.mutex.Lock() @@ -278,7 +282,7 @@ func (c *rtspClient) run(wg sync.WaitGroup) { fmt.Sprintf("client_port=%d-%d", clientPort1, clientPort2), // use two fake server ports, since we do not want to receive feedback // from the client - fmt.Sprintf("server_port=%d-%d", c.p.rtpPort + 2, c.p.rtcpPort + 2), + fmt.Sprintf("server_port=%d-%d", c.p.rtpPort+2, c.p.rtcpPort+2), "ssrc=1234ABCD", }, ";"), "Session": "12345678", diff --git a/rtsp_listener.go b/rtsp_listener.go index 2fb15af2c12..f82071aa6a4 100644 --- a/rtsp_listener.go +++ b/rtsp_listener.go @@ -3,7 +3,6 @@ package main import ( "log" "net" - "sync" ) type rtspListener struct { @@ -32,9 +31,7 @@ func (l *rtspListener) log(format string, args ...interface{}) { log.Printf("[RTSP listener] "+format, args...) } -func (l *rtspListener) run(wg sync.WaitGroup) { - defer wg.Done() - +func (l *rtspListener) run() { for { nconn, err := l.netl.AcceptTCP() if err != nil { @@ -42,7 +39,6 @@ func (l *rtspListener) run(wg sync.WaitGroup) { } rsc := newRtspClient(l.p, nconn) - wg.Add(1) - go rsc.run(wg) + go rsc.run() } } diff --git a/udp_listener.go b/udp_listener.go index 512cb891448..7cc5966a4a8 100644 --- a/udp_listener.go +++ b/udp_listener.go @@ -3,7 +3,6 @@ package main import ( "log" "net" - "sync" ) type udpListener struct { @@ -34,9 +33,7 @@ func (l *udpListener) log(format string, args ...interface{}) { log.Printf("["+l.logPrefix+" listener] "+format, args...) } -func (l *udpListener) run(wg sync.WaitGroup) { - defer wg.Done() - +func (l *udpListener) run() { buf := make([]byte, 2048) // UDP MTU is 1400 for {