Skip to content

Commit

Permalink
fix panic when a client disconnects
Browse files Browse the repository at this point in the history
  • Loading branch information
aler9 committed Dec 29, 2019
1 parent 58e6f6d commit 9a6a813
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 27 deletions.
16 changes: 5 additions & 11 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,18 +55,12 @@ func newProgram(rtspPort int, rtpPort int, rtcpPort int) (*program, error) {
}

func (p *program) run() {
var wg sync.WaitGroup
go p.rtpl.run()
go p.rtcpl.run()
go p.rtspl.run()

wg.Add(1)
go p.rtpl.run(wg)

wg.Add(1)
go p.rtcpl.run(wg)

wg.Add(1)
go p.rtspl.run(wg)

wg.Wait()
infty := make(chan struct{})
<-infty
}

func (p *program) handleRtp(buf []byte) {
Expand Down
16 changes: 10 additions & 6 deletions rtsp_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"net/url"
"strconv"
"strings"
"sync"

"rtsp-server/rtsp"
)
Expand Down Expand Up @@ -40,29 +39,34 @@ func newRtspClient(p *program, nconn net.Conn) *rtspClient {
}

func (c *rtspClient) close() error {
// already deleted
if _, ok := c.p.clients[c]; !ok {
return nil
}

delete(c.p.clients, c)
c.nconn.Close()

if c.p.streamAuthor == c {
c.p.streamAuthor = nil
c.p.streamSdp = nil

// if the streamer has disconnected
// if the publisher has disconnected
// close all other connections
for oc := range c.p.clients {
oc.close()
}
}

return c.nconn.Close()
return nil
}

func (c *rtspClient) log(format string, args ...interface{}) {
format = "[RTSP client " + c.nconn.RemoteAddr().String() + "] " + format
log.Printf(format, args...)
}

func (c *rtspClient) run(wg sync.WaitGroup) {
defer wg.Done()
func (c *rtspClient) run() {
defer c.log("disconnected")
defer func() {
c.p.mutex.Lock()
Expand Down Expand Up @@ -278,7 +282,7 @@ func (c *rtspClient) run(wg sync.WaitGroup) {
fmt.Sprintf("client_port=%d-%d", clientPort1, clientPort2),
// use two fake server ports, since we do not want to receive feedback
// from the client
fmt.Sprintf("server_port=%d-%d", c.p.rtpPort + 2, c.p.rtcpPort + 2),
fmt.Sprintf("server_port=%d-%d", c.p.rtpPort+2, c.p.rtcpPort+2),
"ssrc=1234ABCD",
}, ";"),
"Session": "12345678",
Expand Down
8 changes: 2 additions & 6 deletions rtsp_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package main
import (
"log"
"net"
"sync"
)

type rtspListener struct {
Expand Down Expand Up @@ -32,17 +31,14 @@ func (l *rtspListener) log(format string, args ...interface{}) {
log.Printf("[RTSP listener] "+format, args...)
}

func (l *rtspListener) run(wg sync.WaitGroup) {
defer wg.Done()

func (l *rtspListener) run() {
for {
nconn, err := l.netl.AcceptTCP()
if err != nil {
break
}

rsc := newRtspClient(l.p, nconn)
wg.Add(1)
go rsc.run(wg)
go rsc.run()
}
}
5 changes: 1 addition & 4 deletions udp_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package main
import (
"log"
"net"
"sync"
)

type udpListener struct {
Expand Down Expand Up @@ -34,9 +33,7 @@ func (l *udpListener) log(format string, args ...interface{}) {
log.Printf("["+l.logPrefix+" listener] "+format, args...)
}

func (l *udpListener) run(wg sync.WaitGroup) {
defer wg.Done()

func (l *udpListener) run() {
buf := make([]byte, 2048) // UDP MTU is 1400

for {
Expand Down

0 comments on commit 9a6a813

Please sign in to comment.