From 50e2d8625bc1100d076acc611a5dbeab949e8a80 Mon Sep 17 00:00:00 2001 From: aler9 <46489434+aler9@users.noreply.github.com> Date: Mon, 20 Jul 2020 08:52:14 +0200 Subject: [PATCH] adjust buffer sizes to avoid memory leaks (#43) --- server-client.go | 10 ++++++++-- server-udpl.go | 4 ++-- source.go | 13 +++++++------ 3 files changed, 17 insertions(+), 10 deletions(-) diff --git a/server-client.go b/server-client.go index deed5a5f023..7447d95781f 100644 --- a/server-client.go +++ b/server-client.go @@ -17,6 +17,10 @@ import ( const ( clientCheckStreamInterval = 5 * time.Second clientReceiverReportInterval = 10 * time.Second + clientTcpReadBufferSize = 128 * 1024 + clientTcpWriteBufferSize = 128 * 1024 + clientUdpReadBufferSize = 2048 + clientUdpWriteBufferSize = 128 * 1024 ) type serverClientTrack struct { @@ -98,7 +102,7 @@ func newServerClient(p *program, nconn net.Conn) *serverClient { WriteTimeout: p.conf.WriteTimeout, }), state: clientStateStarting, - readBuf: newDoubleBuffer(512 * 1024), + readBuf: newDoubleBuffer(clientTcpReadBufferSize), done: make(chan struct{}), } @@ -788,7 +792,7 @@ func (c *serverClient) runPlay(path string) { pconf := c.findConfForPath(path) if c.streamProtocol == gortsplib.StreamProtocolTcp { - c.writeBuf = newDoubleBuffer(2048) + c.writeBuf = newDoubleBuffer(clientTcpWriteBufferSize) c.events = make(chan serverClientEvent) } @@ -818,6 +822,7 @@ func (c *serverClient) runPlay(path string) { readDone := make(chan error) go func() { buf := make([]byte, 2048) + for { _, err := c.conn.NetConn().Read(buf) if err != nil { @@ -920,6 +925,7 @@ func (c *serverClient) runRecord(path string) { for { frame.Content = c.readBuf.swap() frame.Content = frame.Content[:cap(frame.Content)] + recv, err := c.conn.ReadFrameOrRequest(frame) if err != nil { readDone <- err diff --git a/server-udpl.go b/server-udpl.go index 86b5b78b950..eecacf92c11 100644 --- a/server-udpl.go +++ b/server-udpl.go @@ -35,8 +35,8 @@ func newServerUdpListener(p *program, port int, streamType gortsplib.StreamType) p: p, nconn: nconn, streamType: streamType, - readBuf: newDoubleBuffer(2048), - writeBuf: newDoubleBuffer(2048), + readBuf: newDoubleBuffer(clientUdpReadBufferSize), + writeBuf: newDoubleBuffer(clientUdpWriteBufferSize), writeChan: make(chan *udpAddrBufPair), done: make(chan struct{}), } diff --git a/source.go b/source.go index 285bc66f0d1..93d0a4c4a14 100644 --- a/source.go +++ b/source.go @@ -14,7 +14,9 @@ import ( ) const ( - sourceRetryInterval = 5 * time.Second + sourceRetryInterval = 5 * time.Second + sourceUdpReadBufferSize = 2048 + sourceTcpReadBufferSize = 128 * 1024 ) type source struct { @@ -26,7 +28,6 @@ type source struct { tracks []*gortsplib.Track serverSdpText []byte serverSdpParsed *sdp.SessionDescription - readBuf *doubleBuffer terminate chan struct{} done chan struct{} @@ -71,7 +72,6 @@ func newSource(p *program, path string, sourceStr string, sourceProtocol string) path: path, u: u, proto: proto, - readBuf: newDoubleBuffer(512 * 1024), terminate: make(chan struct{}), done: make(chan struct{}), } @@ -226,7 +226,7 @@ func (s *source) runUdp(conn *gortsplib.ConnClient) bool { go func(trackId int, l *gortsplib.ConnClientUdpListener) { defer wg.Done() - doubleBuf := newDoubleBuffer(2048) + doubleBuf := newDoubleBuffer(sourceUdpReadBufferSize) for { buf := doubleBuf.swap() @@ -243,7 +243,7 @@ func (s *source) runUdp(conn *gortsplib.ConnClient) bool { go func(trackId int, l *gortsplib.ConnClientUdpListener) { defer wg.Done() - doubleBuf := newDoubleBuffer(2048) + doubleBuf := newDoubleBuffer(sourceUdpReadBufferSize) for { buf := doubleBuf.swap() @@ -309,11 +309,12 @@ func (s *source) runTcp(conn *gortsplib.ConnClient) bool { s.p.events <- programEventStreamerReady{s} frame := &gortsplib.InterleavedFrame{} + doubleBuf := newDoubleBuffer(sourceTcpReadBufferSize) tcpConnDone := make(chan error) go func() { for { - frame.Content = s.readBuf.swap() + frame.Content = doubleBuf.swap() frame.Content = frame.Content[:cap(frame.Content)] err := conn.ReadFrame(frame)