Skip to content

Commit

Permalink
Fixed memory leak caused by statistic recording
Browse files Browse the repository at this point in the history
  • Loading branch information
blaubaer committed Sep 9, 2020
1 parent d7b412d commit 96e54f0
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 89 deletions.
65 changes: 38 additions & 27 deletions lingress.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
log "github.com/sirupsen/logrus"
"net"
"net/http"
"reflect"
"strings"
"sync/atomic"
)
Expand All @@ -27,8 +28,9 @@ type Lingress struct {
Https *server.HttpConnector
AccessLogQueueSize uint16

accessLogQueue chan accessLogEntry
connectionInformation *ConnectionInformation
accessLogQueue chan accessLogEntry

unprocessableConnectionDocumented map[reflect.Type]bool
}

type accessLogEntry map[string]interface{}
Expand All @@ -55,14 +57,13 @@ func New(fps support.FileProviders) (*Lingress, error) {
return nil, err
} else {
result := &Lingress{
RulesRepository: r,
Proxy: p,
Fallback: f,
Management: m,
Http: hHttp,
Https: hHttps,
connectionInformation: NewConnectionInformation(),
AccessLogQueueSize: 5000,
RulesRepository: r,
Proxy: p,
Fallback: f,
Management: m,
Http: hHttp,
Https: hHttps,
AccessLogQueueSize: 5000,
}

result.Http.Handler = result
Expand Down Expand Up @@ -113,13 +114,21 @@ func (instance *Lingress) ServeHTTP(connector server.Connector, resp http.Respon
}

func (instance *Lingress) OnConnState(connector server.Connector, conn net.Conn, state http.ConnState) {
previous := instance.connectionInformation.SetState(conn, state, func(target http.ConnState) bool {
return target != http.StateNew &&
target != http.StateActive &&
target != http.StateIdle
})
annotated, ok := conn.RemoteAddr().(server.AnnotatedAddr)
if !ok {
connType := reflect.TypeOf(conn)
if !instance.unprocessableConnectionDocumented[connType] {
instance.unprocessableConnectionDocumented[connType] = true
log.WithField("connType", connType.String()).
Error("cannot inspect connection of provided connection type; for those kinds of connections there will be no statistics be available")
}
return
}

if previous == state {
previous := annotated.GetState()
annotated.SetState(&state)

if previous != nil && *previous == state {
return
}

Expand All @@ -129,17 +138,17 @@ func (instance *Lingress) OnConnState(connector server.Connector, conn net.Conn,
}

source := client.Connections.Source
switch previous {
case http.StateNew:
atomic.AddUint64(&source.New, ^uint64(0))
case http.StateActive:
atomic.AddUint64(&source.Active, ^uint64(0))
case http.StateIdle:
atomic.AddUint64(&source.Idle, ^uint64(0))
case -1:
// ignore
default:
return
if previous != nil {
switch *previous {
case http.StateNew:
atomic.AddUint64(&source.New, ^uint64(0))
case http.StateActive:
atomic.AddUint64(&source.Active, ^uint64(0))
case http.StateIdle:
atomic.AddUint64(&source.Idle, ^uint64(0))
default:
return
}
}

switch state {
Expand Down Expand Up @@ -174,6 +183,8 @@ func (instance *Lingress) Init(stop support.Channel) error {
instance.accessLogQueue = nil
}

instance.unprocessableConnectionDocumented = make(map[reflect.Type]bool)

go instance.shutdownListener(stop)

if tlsConfig, err := instance.createTlsConfig(); err != nil {
Expand Down
53 changes: 36 additions & 17 deletions server/utils.go → server/connections.go
Original file line number Diff line number Diff line change
@@ -1,25 +1,12 @@
package server

import (
"fmt"
"net"
"net/http"
"sync"
"time"
)

type tcpKeepAliveListener struct {
*net.TCPListener
}

func (ln tcpKeepAliveListener) Accept() (net.Conn, error) {
tc, err := ln.AcceptTCP()
if err != nil {
return nil, err
}
_ = tc.SetKeepAlive(true)
_ = tc.SetKeepAlivePeriod(2 * time.Minute)
return tc, nil
}

type limitedListener struct {
sync.Mutex
net.Listener
Expand Down Expand Up @@ -48,15 +35,28 @@ func (instance *limitedListener) Accept() (net.Conn, error) {
if c, err := instance.Listener.Accept(); err != nil {
return nil, err
} else {
result := &limitedConn{c, instance}
if err := c.(*net.TCPConn).SetLinger(0); err != nil {
return nil, fmt.Errorf("cannot set the SO_LINGER to 0")
}
result := &limitedConn{
Conn: c,
annotatedRemoteAddr: &annotatedAddr{Addr: c.RemoteAddr()},
parent: instance,
}
success = true
return result, nil
}
}

type limitedConn struct {
net.Conn
parent *limitedListener
annotatedRemoteAddr AnnotatedAddr
parent *limitedListener
annotatedAddr
}

func (instance *limitedConn) RemoteAddr() net.Addr {
return instance.annotatedRemoteAddr
}

func (instance *limitedConn) Close() error {
Expand All @@ -65,3 +65,22 @@ func (instance *limitedConn) Close() error {
}()
return instance.Conn.Close()
}

type AnnotatedAddr interface {
net.Addr
GetState() *http.ConnState
SetState(*http.ConnState)
}

type annotatedAddr struct {
net.Addr
state *http.ConnState
}

func (instance *annotatedAddr) GetState() *http.ConnState {
return instance.state
}

func (instance *annotatedAddr) SetState(v *http.ConnState) {
instance.state = v
}
14 changes: 11 additions & 3 deletions server/connector_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ type HttpConnector struct {

MaxConnections uint16

Server http.Server
Server http.Server
ListenConfig net.ListenConfig
}

func NewHttpConnector(id ConnectorId) (*HttpConnector, error) {
Expand All @@ -35,6 +36,10 @@ func NewHttpConnector(id ConnectorId) (*HttpConnector, error) {
"context": "server.http",
}, log.DebugLevel),
},

ListenConfig: net.ListenConfig{
KeepAlive: 2 * time.Minute,
},
}

result.Server.Handler = http.HandlerFunc(result.serveHTTP)
Expand All @@ -44,11 +49,10 @@ func NewHttpConnector(id ConnectorId) (*HttpConnector, error) {
}

func (instance *HttpConnector) Serve(stop support.Channel) error {
ln, err := net.Listen("tcp", instance.Server.Addr)
ln, err := (&instance.ListenConfig).Listen(context.Background(), "tcp", instance.Server.Addr)
if err != nil {
return err
}
ln = tcpKeepAliveListener{ln.(*net.TCPListener)}
ln = newLimitedListener(instance.MaxConnections, ln)

var serve func() error
Expand Down Expand Up @@ -134,6 +138,10 @@ func (instance *HttpConnector) RegisterFlag(fe support.FlagEnabled, appPrefix st
PlaceHolder(fmt.Sprint(instance.Server.IdleTimeout)).
Envar(instance.clientFlagEnvVar(appPrefix, "IDLE_TIMEOUT")).
DurationVar(&instance.Server.IdleTimeout)
fe.Flag(instance.clientFlagName("keepAlive"), "Duration to keep a connection alive (if required); 0 means unlimited.").
PlaceHolder(fmt.Sprint(instance.ListenConfig.KeepAlive)).
Envar(instance.clientFlagEnvVar(appPrefix, "KEEP_ALIVE")).
DurationVar(&instance.ListenConfig.KeepAlive)

return nil
}
Expand Down
42 changes: 0 additions & 42 deletions utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ package lingress
import (
"github.com/echocat/lingress/support"
"net"
"net/http"
"sync"
)

var (
Expand All @@ -21,43 +19,3 @@ var (
"fe80::/10", "fc00::/7", "::1/128",
)
)

type ConnectionInformation struct {
all map[net.Conn]http.ConnState

mutex *sync.Mutex
}

func NewConnectionInformation() *ConnectionInformation {
return &ConnectionInformation{
all: make(map[net.Conn]http.ConnState),
mutex: new(sync.Mutex),
}
}

func (instance *ConnectionInformation) SetState(
conn net.Conn,
newState http.ConnState,
keepFunc func(http.ConnState) bool,
) (previousState http.ConnState) {
instance.mutex.Lock()
defer instance.mutex.Unlock()

keep := newState >= 0 || keepFunc(newState)

result, exists := instance.all[conn]
if exists {
if !keep {
delete(instance.all, conn)
return result
}
previousState = result
instance.all[conn] = newState
return previousState
}

if keep {
instance.all[conn] = newState
}
return -1
}

0 comments on commit 96e54f0

Please sign in to comment.