diff --git a/lingress.go b/lingress.go index f41469f..d67245f 100644 --- a/lingress.go +++ b/lingress.go @@ -14,6 +14,7 @@ import ( log "github.com/sirupsen/logrus" "net" "net/http" + "reflect" "strings" "sync/atomic" ) @@ -27,8 +28,9 @@ type Lingress struct { Https *server.HttpConnector AccessLogQueueSize uint16 - accessLogQueue chan accessLogEntry - connectionInformation *ConnectionInformation + accessLogQueue chan accessLogEntry + + unprocessableConnectionDocumented map[reflect.Type]bool } type accessLogEntry map[string]interface{} @@ -55,14 +57,13 @@ func New(fps support.FileProviders) (*Lingress, error) { return nil, err } else { result := &Lingress{ - RulesRepository: r, - Proxy: p, - Fallback: f, - Management: m, - Http: hHttp, - Https: hHttps, - connectionInformation: NewConnectionInformation(), - AccessLogQueueSize: 5000, + RulesRepository: r, + Proxy: p, + Fallback: f, + Management: m, + Http: hHttp, + Https: hHttps, + AccessLogQueueSize: 5000, } result.Http.Handler = result @@ -113,13 +114,21 @@ func (instance *Lingress) ServeHTTP(connector server.Connector, resp http.Respon } func (instance *Lingress) OnConnState(connector server.Connector, conn net.Conn, state http.ConnState) { - previous := instance.connectionInformation.SetState(conn, state, func(target http.ConnState) bool { - return target != http.StateNew && - target != http.StateActive && - target != http.StateIdle - }) + annotated, ok := conn.RemoteAddr().(server.AnnotatedAddr) + if !ok { + connType := reflect.TypeOf(conn) + if !instance.unprocessableConnectionDocumented[connType] { + instance.unprocessableConnectionDocumented[connType] = true + log.WithField("connType", connType.String()). + Error("cannot inspect connection of provided connection type; for those kinds of connections there will be no statistics be available") + } + return + } - if previous == state { + previous := annotated.GetState() + annotated.SetState(&state) + + if previous != nil && *previous == state { return } @@ -129,17 +138,17 @@ func (instance *Lingress) OnConnState(connector server.Connector, conn net.Conn, } source := client.Connections.Source - switch previous { - case http.StateNew: - atomic.AddUint64(&source.New, ^uint64(0)) - case http.StateActive: - atomic.AddUint64(&source.Active, ^uint64(0)) - case http.StateIdle: - atomic.AddUint64(&source.Idle, ^uint64(0)) - case -1: - // ignore - default: - return + if previous != nil { + switch *previous { + case http.StateNew: + atomic.AddUint64(&source.New, ^uint64(0)) + case http.StateActive: + atomic.AddUint64(&source.Active, ^uint64(0)) + case http.StateIdle: + atomic.AddUint64(&source.Idle, ^uint64(0)) + default: + return + } } switch state { @@ -174,6 +183,8 @@ func (instance *Lingress) Init(stop support.Channel) error { instance.accessLogQueue = nil } + instance.unprocessableConnectionDocumented = make(map[reflect.Type]bool) + go instance.shutdownListener(stop) if tlsConfig, err := instance.createTlsConfig(); err != nil { diff --git a/server/utils.go b/server/connections.go similarity index 50% rename from server/utils.go rename to server/connections.go index f4e99fa..38a0423 100644 --- a/server/utils.go +++ b/server/connections.go @@ -1,25 +1,12 @@ package server import ( + "fmt" "net" + "net/http" "sync" - "time" ) -type tcpKeepAliveListener struct { - *net.TCPListener -} - -func (ln tcpKeepAliveListener) Accept() (net.Conn, error) { - tc, err := ln.AcceptTCP() - if err != nil { - return nil, err - } - _ = tc.SetKeepAlive(true) - _ = tc.SetKeepAlivePeriod(2 * time.Minute) - return tc, nil -} - type limitedListener struct { sync.Mutex net.Listener @@ -48,7 +35,14 @@ func (instance *limitedListener) Accept() (net.Conn, error) { if c, err := instance.Listener.Accept(); err != nil { return nil, err } else { - result := &limitedConn{c, instance} + if err := c.(*net.TCPConn).SetLinger(0); err != nil { + return nil, fmt.Errorf("cannot set the SO_LINGER to 0") + } + result := &limitedConn{ + Conn: c, + annotatedRemoteAddr: &annotatedAddr{Addr: c.RemoteAddr()}, + parent: instance, + } success = true return result, nil } @@ -56,7 +50,13 @@ func (instance *limitedListener) Accept() (net.Conn, error) { type limitedConn struct { net.Conn - parent *limitedListener + annotatedRemoteAddr AnnotatedAddr + parent *limitedListener + annotatedAddr +} + +func (instance *limitedConn) RemoteAddr() net.Addr { + return instance.annotatedRemoteAddr } func (instance *limitedConn) Close() error { @@ -65,3 +65,22 @@ func (instance *limitedConn) Close() error { }() return instance.Conn.Close() } + +type AnnotatedAddr interface { + net.Addr + GetState() *http.ConnState + SetState(*http.ConnState) +} + +type annotatedAddr struct { + net.Addr + state *http.ConnState +} + +func (instance *annotatedAddr) GetState() *http.ConnState { + return instance.state +} + +func (instance *annotatedAddr) SetState(v *http.ConnState) { + instance.state = v +} diff --git a/server/connector_http.go b/server/connector_http.go index 11878a8..16fcb69 100644 --- a/server/connector_http.go +++ b/server/connector_http.go @@ -17,7 +17,8 @@ type HttpConnector struct { MaxConnections uint16 - Server http.Server + Server http.Server + ListenConfig net.ListenConfig } func NewHttpConnector(id ConnectorId) (*HttpConnector, error) { @@ -35,6 +36,10 @@ func NewHttpConnector(id ConnectorId) (*HttpConnector, error) { "context": "server.http", }, log.DebugLevel), }, + + ListenConfig: net.ListenConfig{ + KeepAlive: 2 * time.Minute, + }, } result.Server.Handler = http.HandlerFunc(result.serveHTTP) @@ -44,11 +49,10 @@ func NewHttpConnector(id ConnectorId) (*HttpConnector, error) { } func (instance *HttpConnector) Serve(stop support.Channel) error { - ln, err := net.Listen("tcp", instance.Server.Addr) + ln, err := (&instance.ListenConfig).Listen(context.Background(), "tcp", instance.Server.Addr) if err != nil { return err } - ln = tcpKeepAliveListener{ln.(*net.TCPListener)} ln = newLimitedListener(instance.MaxConnections, ln) var serve func() error @@ -134,6 +138,10 @@ func (instance *HttpConnector) RegisterFlag(fe support.FlagEnabled, appPrefix st PlaceHolder(fmt.Sprint(instance.Server.IdleTimeout)). Envar(instance.clientFlagEnvVar(appPrefix, "IDLE_TIMEOUT")). DurationVar(&instance.Server.IdleTimeout) + fe.Flag(instance.clientFlagName("keepAlive"), "Duration to keep a connection alive (if required); 0 means unlimited."). + PlaceHolder(fmt.Sprint(instance.ListenConfig.KeepAlive)). + Envar(instance.clientFlagEnvVar(appPrefix, "KEEP_ALIVE")). + DurationVar(&instance.ListenConfig.KeepAlive) return nil } diff --git a/utils.go b/utils.go index 42ad207..93680b3 100644 --- a/utils.go +++ b/utils.go @@ -3,8 +3,6 @@ package lingress import ( "github.com/echocat/lingress/support" "net" - "net/http" - "sync" ) var ( @@ -21,43 +19,3 @@ var ( "fe80::/10", "fc00::/7", "::1/128", ) ) - -type ConnectionInformation struct { - all map[net.Conn]http.ConnState - - mutex *sync.Mutex -} - -func NewConnectionInformation() *ConnectionInformation { - return &ConnectionInformation{ - all: make(map[net.Conn]http.ConnState), - mutex: new(sync.Mutex), - } -} - -func (instance *ConnectionInformation) SetState( - conn net.Conn, - newState http.ConnState, - keepFunc func(http.ConnState) bool, -) (previousState http.ConnState) { - instance.mutex.Lock() - defer instance.mutex.Unlock() - - keep := newState >= 0 || keepFunc(newState) - - result, exists := instance.all[conn] - if exists { - if !keep { - delete(instance.all, conn) - return result - } - previousState = result - instance.all[conn] = newState - return previousState - } - - if keep { - instance.all[conn] = newState - } - return -1 -}