From 58907d03a2e76470c10c98dc2e57c52ce04fe0f2 Mon Sep 17 00:00:00 2001 From: Gregor Noczinski Date: Mon, 29 Jun 2020 16:10:18 +0200 Subject: [PATCH] Adding some more meaningful metrics --- lingress.go | 43 ++++++++++++++++++++--------------- utils.go | 65 ++++++++++++++++++++++++----------------------------- 2 files changed, 54 insertions(+), 54 deletions(-) diff --git a/lingress.go b/lingress.go index c93ad7c..ea2b4ef 100644 --- a/lingress.go +++ b/lingress.go @@ -20,13 +20,14 @@ import ( ) type Lingress struct { - RulesRepository rules.CombinedRepository - Proxy *proxy.Proxy - Fallback *fallback.Fallback - Management *management.Management - http http.Server - https http.Server - accessLogQueue chan accessLogEntry + RulesRepository rules.CombinedRepository + Proxy *proxy.Proxy + Fallback *fallback.Fallback + Management *management.Management + http http.Server + https http.Server + accessLogQueue chan accessLogEntry + connectionInformation *ConnectionInformation HttpListenAddr string HttpsListenAddr string @@ -71,13 +72,14 @@ func New(fps support.FileProviders) (*Lingress, error) { "context": "server.https", }, log.DebugLevel), }, - HttpListenAddr: ":8080", - HttpsListenAddr: ":8443", - MaxHeaderBytes: 2 << 20, // 2MB, - ReadHeaderTimeout: 30 * time.Second, - WriteTimeout: 30 * time.Second, - IdleTimeout: 5 * time.Minute, - AccessLogQueueSize: 5000, + connectionInformation: NewConnectionInformation(), + HttpListenAddr: ":8080", + HttpsListenAddr: ":8443", + MaxHeaderBytes: 2 << 20, // 2MB, + ReadHeaderTimeout: 30 * time.Second, + WriteTimeout: 30 * time.Second, + IdleTimeout: 5 * time.Minute, + AccessLogQueueSize: 5000, } result.http.Handler = result @@ -146,13 +148,17 @@ func (instance *Lingress) ServeHTTP(resp http.ResponseWriter, req *http.Request) } func (instance *Lingress) onConnState(conn net.Conn, state http.ConnState) { - previous := SetConnState(conn, state) + previous := instance.connectionInformation.SetState(conn, state, func(target http.ConnState) bool { + return target != http.StateNew && + target != http.StateActive && + target != http.StateIdle + }) + if previous == state { return } source := instance.Management.Metrics.Client.Connections.Source - switch previous { case http.StateNew: atomic.AddUint64(&source.New, ^uint64(0)) @@ -161,7 +167,7 @@ func (instance *Lingress) onConnState(conn net.Conn, state http.ConnState) { case http.StateIdle: atomic.AddUint64(&source.Idle, ^uint64(0)) case -1: - // Ignore + // ignore default: return } @@ -178,6 +184,8 @@ func (instance *Lingress) onConnState(conn net.Conn, state http.ConnState) { default: atomic.AddUint64(&source.Current, ^uint64(0)) } + + return } func (instance *Lingress) Init(stop support.Channel) error { @@ -229,7 +237,6 @@ func (instance *Lingress) serve(target *http.Server, addr string, tlsConfig *tls return err } ln = tcpKeepAliveListener{ln.(*net.TCPListener)} - ln = stateTrackingListener{ln} serve := func() error { return target.Serve(ln) diff --git a/utils.go b/utils.go index f6406d1..709ca91 100644 --- a/utils.go +++ b/utils.go @@ -1,12 +1,10 @@ package lingress import ( - "fmt" "github.com/echocat/lingress/support" "net" "net/http" - "reflect" - "sync/atomic" + "sync" "time" ) @@ -39,47 +37,42 @@ func (ln tcpKeepAliveListener) Accept() (net.Conn, error) { return tc, nil } -type stateTrackingListener struct { - net.Listener +type ConnectionInformation struct { + all map[net.Conn]http.ConnState + + mutex *sync.Mutex } -func (ln stateTrackingListener) Accept() (net.Conn, error) { - conn, err := ln.Listener.Accept() - if err != nil { - return nil, err +func NewConnectionInformation() *ConnectionInformation { + return &ConnectionInformation{ + all: make(map[net.Conn]http.ConnState), + mutex: new(sync.Mutex), } - return ConnectionWithConnStateTracking(conn), nil } -type stateTrackingConnection struct { - net.Conn - current int64 -} +func (instance *ConnectionInformation) SetState( + conn net.Conn, + newState http.ConnState, + keepFunc func(http.ConnState) bool, +) (previousState http.ConnState) { + instance.mutex.Lock() + defer instance.mutex.Unlock() -func (instance *stateTrackingConnection) SetConnState(newState http.ConnState) (previousState http.ConnState) { - for { - current := atomic.LoadInt64(&instance.current) - if atomic.CompareAndSwapInt64(&instance.current, current, int64(newState)) { - return http.ConnState(current) - } - } -} + keep := newState >= 0 || keepFunc(newState) -func SetConnState(conn net.Conn, newState http.ConnState) (previousState http.ConnState) { - stc, ok := conn.(*stateTrackingConnection) - if !ok { - panic(fmt.Sprintf("%v is not of type %v", conn, reflect.TypeOf(&stateTrackingConnection{}))) + result, exists := instance.all[conn] + if exists { + if !keep { + delete(instance.all, conn) + return result + } + previousState = result + instance.all[conn] = newState + return previousState } - return stc.SetConnState(newState) -} -func ConnectionWithConnStateTracking(in net.Conn) net.Conn { - stc, ok := in.(*stateTrackingConnection) - if ok { - return stc - } - return &stateTrackingConnection{ - Conn: in, - current: -1, + if keep { + instance.all[conn] = newState } + return -1 }