From 5eea35164ada217fb1c4c7c6010fc53da43e118c Mon Sep 17 00:00:00 2001 From: Gregor Noczinski Date: Mon, 17 Aug 2020 14:40:32 +0200 Subject: [PATCH] 1. Added limitations of max concurrent connections to improve memory usage. 2. Moved code to a separated package for handling connections. 3. Splitted up statistics in http and https --- context/client.go | 6 +- context/context.go | 5 +- context/metrics.go | 4 +- lingress.go | 171 +++++++++++---------------------------- management/management.go | 9 ++- management/metics.go | 64 +++++++++++---- proxy/proxy.go | 5 +- server/connector.go | 35 ++++++++ server/connector_http.go | 158 ++++++++++++++++++++++++++++++++++++ server/connector_id.go | 41 ++++++++++ server/utils.go | 67 +++++++++++++++ utils.go | 15 ---- 12 files changed, 416 insertions(+), 164 deletions(-) create mode 100644 server/connector.go create mode 100644 server/connector_http.go create mode 100644 server/connector_id.go create mode 100644 server/utils.go diff --git a/context/client.go b/context/client.go index 6c194f2..e290c23 100644 --- a/context/client.go +++ b/context/client.go @@ -3,6 +3,7 @@ package context import ( "errors" "fmt" + "github.com/echocat/lingress/server" "github.com/echocat/lingress/support" "net" "net/http" @@ -15,6 +16,7 @@ var ( ) type Client struct { + Connector server.ConnectorId FromOtherReverseProxy bool Response http.ResponseWriter Request *http.Request @@ -28,7 +30,8 @@ type Client struct { address *string } -func (instance *Client) configure(fromOtherReverseProxy bool, resp http.ResponseWriter, req *http.Request) { +func (instance *Client) configure(connector server.ConnectorId, fromOtherReverseProxy bool, resp http.ResponseWriter, req *http.Request) { + instance.Connector = connector instance.FromOtherReverseProxy = fromOtherReverseProxy instance.Response = resp instance.Request = req @@ -52,6 +55,7 @@ func (instance *Client) clean() { _ = b.Close() } } + instance.Connector = "" instance.FromOtherReverseProxy = false instance.Response = nil instance.Request = nil diff --git a/context/context.go b/context/context.go index d086092..6931b21 100644 --- a/context/context.go +++ b/context/context.go @@ -3,6 +3,7 @@ package context import ( "encoding/json" "github.com/echocat/lingress/rules" + "github.com/echocat/lingress/server" "github.com/echocat/lingress/support" log "github.com/sirupsen/logrus" "net/http" @@ -33,7 +34,7 @@ type Context struct { Properties map[string]interface{} } -func AcquireContext(fromOtherReverseProxy bool, resp http.ResponseWriter, req *http.Request) *Context { +func AcquireContext(connector server.ConnectorId, fromOtherReverseProxy bool, resp http.ResponseWriter, req *http.Request) *Context { success := false result := contextPool.Get().(*Context) defer func(created *Context) { @@ -45,7 +46,7 @@ func AcquireContext(fromOtherReverseProxy bool, resp http.ResponseWriter, req *h result.Id = NewId(fromOtherReverseProxy, req) result.CorrelationId = NewCorrelationId(fromOtherReverseProxy, req) result.Stage = StageCreated - result.Client.configure(fromOtherReverseProxy, resp, req) + result.Client.configure(connector, fromOtherReverseProxy, resp, req) result.Upstream.configure() result.Rule = nil diff --git a/context/metrics.go b/context/metrics.go index cee2510..c671e0e 100644 --- a/context/metrics.go +++ b/context/metrics.go @@ -1,8 +1,10 @@ package context +import "github.com/echocat/lingress/server" + type MetricsCollector interface { CollectContext(*Context) - CollectClientStarted() func() + CollectClientStarted(server.ConnectorId) func() CollectUpstreamStarted() func() } diff --git a/lingress.go b/lingress.go index ea2b4ef..a2c4343 100644 --- a/lingress.go +++ b/lingress.go @@ -1,7 +1,6 @@ package lingress import ( - "context" "crypto/tls" "fmt" lctx "github.com/echocat/lingress/context" @@ -9,6 +8,7 @@ import ( "github.com/echocat/lingress/management" "github.com/echocat/lingress/proxy" "github.com/echocat/lingress/rules" + "github.com/echocat/lingress/server" "github.com/echocat/lingress/support" "github.com/pkg/errors" log "github.com/sirupsen/logrus" @@ -16,27 +16,19 @@ import ( "net/http" "strings" "sync/atomic" - "time" ) type Lingress struct { - RulesRepository rules.CombinedRepository - Proxy *proxy.Proxy - Fallback *fallback.Fallback - Management *management.Management - http http.Server - https http.Server + RulesRepository rules.CombinedRepository + Proxy *proxy.Proxy + Fallback *fallback.Fallback + Management *management.Management + Http *server.HttpConnector + Https *server.HttpConnector + AccessLogQueueSize uint16 + accessLogQueue chan accessLogEntry connectionInformation *ConnectionInformation - - HttpListenAddr string - HttpsListenAddr string - MaxHeaderBytes uint - ReadHeaderTimeout time.Duration - WriteTimeout time.Duration - IdleTimeout time.Duration - - AccessLogQueueSize uint16 } type accessLogEntry map[string]interface{} @@ -48,44 +40,36 @@ type ConnectionStates struct { } func New(fps support.FileProviders) (*Lingress, error) { + connectorIds := []server.ConnectorId{server.DefaultConnectorIdHttp, server.DefaultConnectorIdHttps} if r, err := rules.NewRepository(); err != nil { return nil, err } else if p, err := proxy.New(r); err != nil { return nil, err } else if f, err := fallback.New(fps); err != nil { return nil, err - } else if m, err := management.New(r); err != nil { + } else if m, err := management.New(connectorIds, r); err != nil { + return nil, err + } else if hHttp, err := server.NewHttpConnector(server.DefaultConnectorIdHttp); err != nil { + return nil, err + } else if hHttps, err := server.NewHttpConnector(server.DefaultConnectorIdHttps); err != nil { return nil, err } else { result := &Lingress{ - RulesRepository: r, - Proxy: p, - Fallback: f, - Management: m, - http: http.Server{ - ErrorLog: support.StdLog(log.Fields{ - "context": "server.http", - }, log.DebugLevel), - }, - https: http.Server{ - ErrorLog: support.StdLog(log.Fields{ - "context": "server.https", - }, log.DebugLevel), - }, + RulesRepository: r, + Proxy: p, + Fallback: f, + Management: m, + Http: hHttp, + Https: hHttps, connectionInformation: NewConnectionInformation(), - HttpListenAddr: ":8080", - HttpsListenAddr: ":8443", - MaxHeaderBytes: 2 << 20, // 2MB, - ReadHeaderTimeout: 30 * time.Second, - WriteTimeout: 30 * time.Second, - IdleTimeout: 5 * time.Minute, AccessLogQueueSize: 5000, } - result.http.Handler = result - result.https.Handler = result - result.http.ConnState = result.onConnState - result.https.ConnState = result.onConnState + result.Http.Handler = result + result.Http.MaxConnections = 512 + + result.Https.Handler = result + result.Https.Server.Addr = ":8443" p.ResultHandler = result.onResult p.AccessLogger = result.onAccessLog @@ -96,35 +80,10 @@ func New(fps support.FileProviders) (*Lingress, error) { } func (instance *Lingress) RegisterFlag(fe support.FlagEnabled, appPrefix string) error { - fe.Flag("listen.http", "Listen address where the proxy is listening to serve"). - PlaceHolder(instance.HttpListenAddr). - Envar(support.FlagEnvName(appPrefix, "LISTEN_HTTP")). - StringVar(&instance.HttpListenAddr) - fe.Flag("listen.https", "Listen address where the proxy is listening to serve"). - PlaceHolder(instance.HttpsListenAddr). - Envar(support.FlagEnvName(appPrefix, "LISTEN_HTTPS")). - StringVar(&instance.HttpsListenAddr) fe.Flag("accessLogQueueSize", "Maximum number of accessLog elements that could be queue before blocking."). PlaceHolder(fmt.Sprint(instance.AccessLogQueueSize)). Envar(support.FlagEnvName(appPrefix, "ACCESS_LOG_QUEUE_SIZE")). Uint16Var(&instance.AccessLogQueueSize) - fe.Flag("client.maxHeaderBytes", "Maximum number of bytes the server will read parsing the request header's keys and values, including the request line. It does not limit the size of the request body."). - PlaceHolder(fmt.Sprint(instance.MaxHeaderBytes)). - Envar(support.FlagEnvName(appPrefix, "CLIENT_MAX_HEADER_BYTES")). - UintVar(&instance.MaxHeaderBytes) - fe.Flag("client.readHeaderTimeout", "Amount of time allowed to read request headers. The connection's read deadline is reset after reading the headers and the Handler can decide what is considered too slow for the body."). - PlaceHolder(fmt.Sprint(instance.ReadHeaderTimeout)). - Envar(support.FlagEnvName(appPrefix, "CLIENT_READ_HEADER_TIMEOUT")). - DurationVar(&instance.ReadHeaderTimeout) - fe.Flag("client.writeTimeout", "Maximum duration before timing out writes of the response. It is reset whenever a new request's header is read."). - PlaceHolder(fmt.Sprint(instance.WriteTimeout)). - Envar(support.FlagEnvName(appPrefix, "CLIENT_WRITE_TIMEOUT")). - DurationVar(&instance.WriteTimeout) - fe.Flag("client.idleTimeout", "Maximum amount of time to wait for the next request when keep-alives are enabled."). - PlaceHolder(fmt.Sprint(instance.IdleTimeout)). - Envar(support.FlagEnvName(appPrefix, "CLIENT_IDLE_TIMEOUT")). - DurationVar(&instance.IdleTimeout) - if err := instance.RulesRepository.RegisterFlag(fe, appPrefix); err != nil { return err } @@ -137,17 +96,23 @@ func (instance *Lingress) RegisterFlag(fe support.FlagEnabled, appPrefix string) if err := instance.Management.RegisterFlag(fe, appPrefix); err != nil { return err } + if err := instance.Http.RegisterFlag(fe, appPrefix); err != nil { + return err + } + if err := instance.Https.RegisterFlag(fe, appPrefix); err != nil { + return err + } return nil } -func (instance *Lingress) ServeHTTP(resp http.ResponseWriter, req *http.Request) { - finalize := instance.Management.CollectClientStarted() +func (instance *Lingress) ServeHTTP(connector server.Connector, resp http.ResponseWriter, req *http.Request) { + finalize := instance.Management.CollectClientStarted(connector.GetId()) defer finalize() - instance.Proxy.ServeHTTP(resp, req) + instance.Proxy.ServeHTTP(connector, resp, req) } -func (instance *Lingress) onConnState(conn net.Conn, state http.ConnState) { +func (instance *Lingress) OnConnState(connector server.Connector, conn net.Conn, state http.ConnState) { previous := instance.connectionInformation.SetState(conn, state, func(target http.ConnState) bool { return target != http.StateNew && target != http.StateActive && @@ -158,7 +123,12 @@ func (instance *Lingress) onConnState(conn net.Conn, state http.ConnState) { return } - source := instance.Management.Metrics.Client.Connections.Source + client, ok := instance.Management.Metrics.Client[connector.GetId()] + if !ok { + return + } + + source := client.Connections.Source switch previous { case http.StateNew: atomic.AddUint64(&source.New, ^uint64(0)) @@ -206,15 +176,16 @@ func (instance *Lingress) Init(stop support.Channel) error { go instance.shutdownListener(stop) - tlsConfig, err := instance.createTlsConfig() - if err != nil { + if tlsConfig, err := instance.createTlsConfig(); err != nil { return err + } else { + instance.Https.TlsConfig = tlsConfig } - if err := instance.serve(&instance.http, instance.HttpListenAddr, nil, stop); err != nil { + if err := instance.Http.Serve(stop); err != nil { return err } - if err := instance.serve(&instance.https, instance.HttpsListenAddr, tlsConfig, stop); err != nil { + if err := instance.Https.Serve(stop); err != nil { return err } @@ -225,43 +196,6 @@ func (instance *Lingress) Init(stop support.Channel) error { return nil } -func (instance *Lingress) serve(target *http.Server, addr string, tlsConfig *tls.Config, stop support.Channel) error { - target.Addr = addr - target.MaxHeaderBytes = int(instance.MaxHeaderBytes) - target.ReadHeaderTimeout = instance.ReadHeaderTimeout - target.WriteTimeout = instance.WriteTimeout - target.IdleTimeout = instance.IdleTimeout - - ln, err := net.Listen("tcp", addr) - if err != nil { - return err - } - ln = tcpKeepAliveListener{ln.(*net.TCPListener)} - - serve := func() error { - return target.Serve(ln) - } - if tlsConfig != nil { - target.TLSConfig = tlsConfig - serve = func() error { - return target.ServeTLS(ln, "", "") - } - } - - go func() { - if err := serve(); err != nil && err != http.ErrServerClosed { - log.WithError(err). - WithField("addr", target.Addr). - Error("server is unable to serve proxy interface") - stop.Broadcast() - } - }() - log.WithField("addr", target.Addr). - Info("serve proxy interface") - - return nil -} - func (instance *Lingress) createTlsConfig() (*tls.Config, error) { fail := func(err error) (*tls.Config, error) { return nil, errors.Wrap(err, "cannot create TLS config") @@ -298,15 +232,8 @@ func (instance *Lingress) resolveCertificate(info *tls.ClientHelloInfo) (*tls.Ce func (instance *Lingress) shutdownListener(stop support.Channel) { stop.Wait() - ctx, _ := context.WithTimeout(context.Background(), 5*time.Minute) - if err := instance.http.Shutdown(ctx); err != nil { - log.WithError(err). - Warnf("cannot graceful shutdown proxy interface %s", instance.http.Addr) - } - if err := instance.https.Shutdown(ctx); err != nil { - log.WithError(err). - Warnf("cannot graceful shutdown proxy interface %s", instance.http.Addr) - } + instance.Http.Shutdown() + instance.Https.Shutdown() } func (instance *Lingress) accessLogQueueWorker(stop support.Channel, queue chan accessLogEntry) { diff --git a/management/management.go b/management/management.go index 04ecead..d44033e 100644 --- a/management/management.go +++ b/management/management.go @@ -5,6 +5,7 @@ import ( "fmt" lctx "github.com/echocat/lingress/context" "github.com/echocat/lingress/rules" + "github.com/echocat/lingress/server" "github.com/echocat/lingress/support" log "github.com/sirupsen/logrus" "net" @@ -20,9 +21,9 @@ type Management struct { rules rules.Repository } -func New(rulesRepository rules.Repository) (*Management, error) { +func New(connectorIds []server.ConnectorId, rulesRepository rules.Repository) (*Management, error) { result := &Management{ - Metrics: NewMetrics(rulesRepository), + Metrics: NewMetrics(connectorIds, rulesRepository), rules: rulesRepository, server: http.Server{ Addr: ":8090", @@ -69,8 +70,8 @@ func (instance *Management) CollectContext(ctx *lctx.Context) { instance.Metrics.CollectContext(ctx) } -func (instance *Management) CollectClientStarted() func() { - return instance.Metrics.CollectClientStarted() +func (instance *Management) CollectClientStarted(connector server.ConnectorId) func() { + return instance.Metrics.CollectClientStarted(connector) } func (instance *Management) CollectUpstreamStarted() func() { diff --git a/management/metics.go b/management/metics.go index 55e76f9..48d8c0a 100644 --- a/management/metics.go +++ b/management/metics.go @@ -3,6 +3,7 @@ package management import ( "github.com/echocat/lingress/context" "github.com/echocat/lingress/rules" + "github.com/echocat/lingress/server" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/client_golang/prometheus/promhttp" @@ -22,7 +23,7 @@ var ( ) type Metrics struct { - Client *ClientMetrics + Client ConnectorEnabledClientMetrics Upstream *UpstreamMetrics Rules *RulesMetrics @@ -35,6 +36,8 @@ type ClientMetrics struct { Connections *ConnectionMetrics } +type ConnectorEnabledClientMetrics map[server.ConnectorId]*ClientMetrics + type UpstreamMetrics struct { Request *RequestMetrics } @@ -77,15 +80,16 @@ type ConnectionStates struct { Current uint64 Total uint64 + Max uint64 } -func NewMetrics(rulesRepository rules.Repository) *Metrics { +func NewMetrics(connectorIds []server.ConnectorId, rulesRepository rules.Repository) *Metrics { registry := prometheus.NewRegistry() registry.MustRegister(prometheus.NewGoCollector()) registry.MustRegister(prometheus.NewProcessCollector(prometheus.ProcessCollectorOpts{})) return &Metrics{ - Client: NewClientMetrics(registry), + Client: NewConnectorEnabledClientMetrics(connectorIds, registry), Upstream: NewUpstreamMetrics(registry), Rules: NewRulesMetrics(registry, rulesRepository), @@ -176,16 +180,24 @@ func NewConnectionMetrics(registerer prometheus.Registerer, variant string) *Con return result } -func NewClientMetrics(registerer prometheus.Registerer) *ClientMetrics { +func NewConnectorEnabledClientMetrics(connectorIds []server.ConnectorId, registerer prometheus.Registerer) ConnectorEnabledClientMetrics { + result := make(ConnectorEnabledClientMetrics, len(connectorIds)) + for _, id := range connectorIds { + result[id] = NewClientMetrics(id, registerer) + } + return result +} + +func NewClientMetrics(id server.ConnectorId, registerer prometheus.Registerer) *ClientMetrics { return &ClientMetrics{ - Request: NewRequestMetrics(registerer, "client", []float64{ + Request: NewRequestMetrics(registerer, "client_"+string(id), []float64{ 0.001, 0.01, 0.1, 1, 10, }), - Connections: NewConnectionMetrics(registerer, "client"), + Connections: NewConnectionMetrics(registerer, "client_"+string(id)), } } @@ -229,23 +241,15 @@ func (instance *Metrics) ServeHTTP(resp http.ResponseWriter, req *http.Request) func (instance *Metrics) CollectContext(ctx *context.Context) { labels := instance.labelsFor(ctx) - if v := ctx.Client.Duration; v > -1 { - instance.Client.Request.DurationSeconds.With(labels).Observe(v.Seconds()) - instance.Client.Request.Total.With(labels).Inc() - } - + instance.Client.collectContext(labels, ctx) if v := ctx.Upstream.Duration; v > -1 { instance.Upstream.Request.DurationSeconds.With(labels).Observe(v.Seconds()) instance.Upstream.Request.Total.With(labels).Inc() } } -func (instance *Metrics) CollectClientStarted() func() { - source := instance.Client.Request.Source - atomic.AddUint64(&source.Current, 1) - return func() { - atomic.AddUint64(&source.Current, ^uint64(0)) - } +func (instance *Metrics) CollectClientStarted(connector server.ConnectorId) func() { + return instance.Client.collectClientStarted(connector) } func (instance *Metrics) CollectUpstreamStarted() func() { @@ -312,3 +316,29 @@ func (instance *RulesMetrics) sources() float64 { }) return float64(len(result)) } + +func (instance ConnectorEnabledClientMetrics) collectContext(labels prometheus.Labels, ctx *context.Context) { + if instance == nil { + return + } + if v := instance[ctx.Client.Connector]; v != nil { + if d := ctx.Client.Duration; d > -1 { + v.Request.DurationSeconds.With(labels).Observe(d.Seconds()) + v.Request.Total.With(labels).Inc() + } + } +} + +func (instance ConnectorEnabledClientMetrics) collectClientStarted(connector server.ConnectorId) func() { + if instance == nil { + return func() {} + } + if v := instance[connector]; v != nil { + source := v.Request.Source + atomic.AddUint64(&source.Current, 1) + return func() { + atomic.AddUint64(&source.Current, ^uint64(0)) + } + } + return func() {} +} diff --git a/proxy/proxy.go b/proxy/proxy.go index aeb59bb..5735484 100644 --- a/proxy/proxy.go +++ b/proxy/proxy.go @@ -7,6 +7,7 @@ import ( "fmt" lctx "github.com/echocat/lingress/context" "github.com/echocat/lingress/rules" + "github.com/echocat/lingress/server" "github.com/echocat/lingress/support" log "github.com/sirupsen/logrus" "io" @@ -114,8 +115,8 @@ func (instance *Proxy) Init(stop support.Channel) error { return nil } -func (instance *Proxy) ServeHTTP(resp http.ResponseWriter, req *http.Request) { - ctx := lctx.AcquireContext(instance.BehindOtherReverseProxy, resp, req) +func (instance *Proxy) ServeHTTP(connector server.Connector, resp http.ResponseWriter, req *http.Request) { + ctx := lctx.AcquireContext(connector.GetId(), instance.BehindOtherReverseProxy, resp, req) defer ctx.Release() ctx.Client.Started = time.Now() defer func(ctx *lctx.Context) { diff --git a/server/connector.go b/server/connector.go new file mode 100644 index 0000000..2ab2866 --- /dev/null +++ b/server/connector.go @@ -0,0 +1,35 @@ +package server + +import ( + "context" + "net" + "net/http" +) + +const ( + DefaultConnectorIdHttp = ConnectorId("http") + DefaultConnectorIdHttps = ConnectorId("https") + + ConnectorKey = "lingress.server.connector" +) + +type Connector interface { + GetId() ConnectorId +} + +func GetConnectorOfContext(ctx context.Context) Connector { + if v, ok := ctx.Value(ConnectorKey).(Connector); ok { + return v + } else { + return nil + } +} + +func ContextWithConnector(ctx context.Context, connector Connector) context.Context { + return context.WithValue(ctx, ConnectorKey, connector) +} + +type ConnectorHandler interface { + ServeHTTP(Connector, http.ResponseWriter, *http.Request) + OnConnState(Connector, net.Conn, http.ConnState) +} diff --git a/server/connector_http.go b/server/connector_http.go new file mode 100644 index 0000000..06bad22 --- /dev/null +++ b/server/connector_http.go @@ -0,0 +1,158 @@ +package server + +import ( + "context" + "crypto/tls" + "fmt" + "github.com/echocat/lingress/support" + log "github.com/sirupsen/logrus" + "net" + "net/http" + "strings" + "time" +) + +type HttpConnector struct { + Id ConnectorId + Handler ConnectorHandler + + TlsConfig *tls.Config + + MaxConnections uint16 + + Server http.Server +} + +func NewHttpConnector(id ConnectorId) (*HttpConnector, error) { + result := HttpConnector{ + Id: id, + MaxConnections: 1024, + + Server: http.Server{ + Addr: ":8080", + MaxHeaderBytes: 2 << 20, // 2MB, + ReadHeaderTimeout: 30 * time.Second, + WriteTimeout: 30 * time.Second, + IdleTimeout: 5 * time.Minute, + ErrorLog: support.StdLog(log.Fields{ + "context": "server.http", + }, log.DebugLevel), + }, + } + + result.Server.Handler = http.HandlerFunc(result.serveHTTP) + result.Server.ConnState = result.onConnState + + return &result, nil +} + +func (instance *HttpConnector) Serve(stop support.Channel) error { + ln, err := net.Listen("tcp", instance.Server.Addr) + if err != nil { + return err + } + ln = tcpKeepAliveListener{ln.(*net.TCPListener)} + ln = newLimitedListener(instance.MaxConnections, ln) + + var serve func() error + if tlsConfig := instance.Server.TLSConfig; tlsConfig != nil { + serve = func() error { + return instance.Server.ServeTLS(ln, "", "") + } + } else { + serve = func() error { + return instance.Server.Serve(ln) + } + } + + go func() { + if err := serve(); err != nil && err != http.ErrServerClosed { + log.WithError(err). + WithField("address", instance.Server.Addr). + Error("server is unable to serve proxy interface") + stop.Broadcast() + } + }() + log.WithField("address", instance.Server.Addr). + Info("serve proxy interface") + + return nil +} + +func (instance *HttpConnector) Shutdown() { + ctx, _ := context.WithTimeout(context.Background(), 5*time.Minute) + if err := instance.Server.Shutdown(ctx); err != nil { + log.WithError(err). + Warnf("cannot graceful shutdown %s proxy interface %s", instance.Id, instance.Server.Addr) + } +} + +func (instance *HttpConnector) flagName(prefix, suffix string) string { + return fmt.Sprintf("%s.%s.%s", prefix, instance.Id, suffix) +} + +func (instance *HttpConnector) serverFlagName(suffix string) string { + return instance.flagName("server", suffix) +} + +func (instance *HttpConnector) clientFlagName(suffix string) string { + return instance.flagName("client", suffix) +} + +func (instance *HttpConnector) flagEnvVar(appPrefix string, prefix, suffix string) string { + return support.FlagEnvName(appPrefix, fmt.Sprintf("%s_%s_%s", prefix, strings.ToUpper(string(instance.Id)), suffix)) +} + +func (instance *HttpConnector) serverFlagEnvVar(appPrefix string, suffix string) string { + return instance.flagEnvVar(appPrefix, "SERVER", suffix) +} + +func (instance *HttpConnector) clientFlagEnvVar(appPrefix string, suffix string) string { + return instance.flagEnvVar(appPrefix, "CLIENT", suffix) +} + +func (instance *HttpConnector) RegisterFlag(fe support.FlagEnabled, appPrefix string) error { + fe.Flag(instance.serverFlagName("address"), "Listen address where the proxy is listening to serve"). + PlaceHolder(instance.Server.Addr). + Envar(instance.serverFlagEnvVar(appPrefix, "address")). + StringVar(&instance.Server.Addr) + fe.Flag(instance.serverFlagName("maxConnections"), "Maximum amount of connections handled by lingress concurrently via HTTP."). + PlaceHolder(fmt.Sprint(instance.MaxConnections)). + Envar(instance.serverFlagEnvVar(appPrefix, "MAX_CONNECTIONS")). + Uint16Var(&instance.MaxConnections) + + fe.Flag(instance.clientFlagName("maxHeaderBytes"), "Maximum number of bytes the server will read parsing the request header's keys and values, including the request line. It does not limit the size of the request body."). + PlaceHolder(fmt.Sprint(instance.Server.MaxHeaderBytes)). + Envar(instance.clientFlagEnvVar(appPrefix, "MAX_HEADER_BYTES")). + IntVar(&instance.Server.MaxHeaderBytes) + fe.Flag(instance.clientFlagName("readHeaderTimeout"), "Amount of time allowed to read request headers. The connection's read deadline is reset after reading the headers and the Handler can decide what is considered too slow for the body."). + PlaceHolder(fmt.Sprint(instance.Server.ReadHeaderTimeout)). + Envar(instance.clientFlagEnvVar(appPrefix, "READ_HEADER_TIMEOUT")). + DurationVar(&instance.Server.ReadHeaderTimeout) + fe.Flag(instance.clientFlagName("writeTimeout"), "Maximum duration before timing out writes of the response. It is reset whenever a new request's header is read."). + PlaceHolder(fmt.Sprint(instance.Server.WriteTimeout)). + Envar(instance.clientFlagEnvVar(appPrefix, "WRITE_TIMEOUT")). + DurationVar(&instance.Server.WriteTimeout) + fe.Flag(instance.clientFlagName("idleTimeout"), "Maximum amount of time to wait for the next request when keep-alives are enabled."). + PlaceHolder(fmt.Sprint(instance.Server.IdleTimeout)). + Envar(instance.clientFlagEnvVar(appPrefix, "IDLE_TIMEOUT")). + DurationVar(&instance.Server.IdleTimeout) + + return nil +} + +func (instance *HttpConnector) GetId() ConnectorId { + return instance.Id +} + +func (instance *HttpConnector) serveHTTP(resp http.ResponseWriter, req *http.Request) { + if v := instance.Handler; v != nil { + v.ServeHTTP(instance, resp, req) + } +} + +func (instance *HttpConnector) onConnState(conn net.Conn, state http.ConnState) { + if v := instance.Handler; v != nil { + v.OnConnState(instance, conn, state) + } +} diff --git a/server/connector_id.go b/server/connector_id.go new file mode 100644 index 0000000..5d7c93e --- /dev/null +++ b/server/connector_id.go @@ -0,0 +1,41 @@ +package server + +import ( + "errors" + "fmt" + "regexp" +) + +type ConnectorId string + +var ( + connectorIdRegexp = regexp.MustCompile(`^[a-z0-9]([-a-z0-9]*[a-z0-9])$`) + + ErrIllegalConnectorId = errors.New("illegal connector-id") +) + +func (instance *ConnectorId) Set(plain string) error { + return instance.UnmarshalText([]byte(plain)) +} + +func (instance ConnectorId) String() string { + v, _ := instance.MarshalText() + return string(v) +} + +func (instance ConnectorId) MarshalText() (text []byte, err error) { + if len(instance) > 0 && (!connectorIdRegexp.MatchString(string(instance)) || len(instance) > 64) { + return []byte(fmt.Sprintf("illegal-connector-id-%s", string(instance))), + fmt.Errorf("%w: %s", ErrIllegalConnectorId, string(instance)) + } + return []byte(instance), nil +} + +func (instance *ConnectorId) UnmarshalText(text []byte) error { + v := ConnectorId(text) + if _, err := instance.MarshalText(); err != nil { + return err + } + *instance = v + return nil +} diff --git a/server/utils.go b/server/utils.go new file mode 100644 index 0000000..f4e99fa --- /dev/null +++ b/server/utils.go @@ -0,0 +1,67 @@ +package server + +import ( + "net" + "sync" + "time" +) + +type tcpKeepAliveListener struct { + *net.TCPListener +} + +func (ln tcpKeepAliveListener) Accept() (net.Conn, error) { + tc, err := ln.AcceptTCP() + if err != nil { + return nil, err + } + _ = tc.SetKeepAlive(true) + _ = tc.SetKeepAlivePeriod(2 * time.Minute) + return tc, nil +} + +type limitedListener struct { + sync.Mutex + net.Listener + sem chan bool +} + +func newLimitedListener(count uint16, l net.Listener) *limitedListener { + sem := make(chan bool, count) + for i := uint16(0); i < count; i++ { + sem <- true + } + return &limitedListener{ + Listener: l, + sem: sem, + } +} + +func (instance *limitedListener) Accept() (net.Conn, error) { + success := false + <-instance.sem // acquire + defer func() { + if !success { + instance.sem <- true + } + }() + if c, err := instance.Listener.Accept(); err != nil { + return nil, err + } else { + result := &limitedConn{c, instance} + success = true + return result, nil + } +} + +type limitedConn struct { + net.Conn + parent *limitedListener +} + +func (instance *limitedConn) Close() error { + defer func() { + instance.parent.sem <- true // release + }() + return instance.Conn.Close() +} diff --git a/utils.go b/utils.go index 709ca91..42ad207 100644 --- a/utils.go +++ b/utils.go @@ -5,7 +5,6 @@ import ( "net" "net/http" "sync" - "time" ) var ( @@ -23,20 +22,6 @@ var ( ) ) -type tcpKeepAliveListener struct { - *net.TCPListener -} - -func (ln tcpKeepAliveListener) Accept() (net.Conn, error) { - tc, err := ln.AcceptTCP() - if err != nil { - return nil, err - } - _ = tc.SetKeepAlive(true) - _ = tc.SetKeepAlivePeriod(2 * time.Minute) - return tc, nil -} - type ConnectionInformation struct { all map[net.Conn]http.ConnState