Skip to content

Commit

Permalink
1. Added limitations of max concurrent connections to improve memory …
Browse files Browse the repository at this point in the history
…usage.

2. Moved code to a separated package for handling connections.
3. Splitted up statistics in http and https
  • Loading branch information
blaubaer committed Aug 17, 2020
1 parent 9bcce64 commit 5eea351
Show file tree
Hide file tree
Showing 12 changed files with 416 additions and 164 deletions.
6 changes: 5 additions & 1 deletion context/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package context
import (
"errors"
"fmt"
"github.com/echocat/lingress/server"
"github.com/echocat/lingress/support"
"net"
"net/http"
Expand All @@ -15,6 +16,7 @@ var (
)

type Client struct {
Connector server.ConnectorId
FromOtherReverseProxy bool
Response http.ResponseWriter
Request *http.Request
Expand All @@ -28,7 +30,8 @@ type Client struct {
address *string
}

func (instance *Client) configure(fromOtherReverseProxy bool, resp http.ResponseWriter, req *http.Request) {
func (instance *Client) configure(connector server.ConnectorId, fromOtherReverseProxy bool, resp http.ResponseWriter, req *http.Request) {
instance.Connector = connector
instance.FromOtherReverseProxy = fromOtherReverseProxy
instance.Response = resp
instance.Request = req
Expand All @@ -52,6 +55,7 @@ func (instance *Client) clean() {
_ = b.Close()
}
}
instance.Connector = ""
instance.FromOtherReverseProxy = false
instance.Response = nil
instance.Request = nil
Expand Down
5 changes: 3 additions & 2 deletions context/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package context
import (
"encoding/json"
"github.com/echocat/lingress/rules"
"github.com/echocat/lingress/server"
"github.com/echocat/lingress/support"
log "github.com/sirupsen/logrus"
"net/http"
Expand Down Expand Up @@ -33,7 +34,7 @@ type Context struct {
Properties map[string]interface{}
}

func AcquireContext(fromOtherReverseProxy bool, resp http.ResponseWriter, req *http.Request) *Context {
func AcquireContext(connector server.ConnectorId, fromOtherReverseProxy bool, resp http.ResponseWriter, req *http.Request) *Context {
success := false
result := contextPool.Get().(*Context)
defer func(created *Context) {
Expand All @@ -45,7 +46,7 @@ func AcquireContext(fromOtherReverseProxy bool, resp http.ResponseWriter, req *h
result.Id = NewId(fromOtherReverseProxy, req)
result.CorrelationId = NewCorrelationId(fromOtherReverseProxy, req)
result.Stage = StageCreated
result.Client.configure(fromOtherReverseProxy, resp, req)
result.Client.configure(connector, fromOtherReverseProxy, resp, req)
result.Upstream.configure()

result.Rule = nil
Expand Down
4 changes: 3 additions & 1 deletion context/metrics.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package context

import "github.com/echocat/lingress/server"

type MetricsCollector interface {
CollectContext(*Context)

CollectClientStarted() func()
CollectClientStarted(server.ConnectorId) func()
CollectUpstreamStarted() func()
}
171 changes: 49 additions & 122 deletions lingress.go
Original file line number Diff line number Diff line change
@@ -1,42 +1,34 @@
package lingress

import (
"context"
"crypto/tls"
"fmt"
lctx "github.com/echocat/lingress/context"
"github.com/echocat/lingress/fallback"
"github.com/echocat/lingress/management"
"github.com/echocat/lingress/proxy"
"github.com/echocat/lingress/rules"
"github.com/echocat/lingress/server"
"github.com/echocat/lingress/support"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"net"
"net/http"
"strings"
"sync/atomic"
"time"
)

type Lingress struct {
RulesRepository rules.CombinedRepository
Proxy *proxy.Proxy
Fallback *fallback.Fallback
Management *management.Management
http http.Server
https http.Server
RulesRepository rules.CombinedRepository
Proxy *proxy.Proxy
Fallback *fallback.Fallback
Management *management.Management
Http *server.HttpConnector
Https *server.HttpConnector
AccessLogQueueSize uint16

accessLogQueue chan accessLogEntry
connectionInformation *ConnectionInformation

HttpListenAddr string
HttpsListenAddr string
MaxHeaderBytes uint
ReadHeaderTimeout time.Duration
WriteTimeout time.Duration
IdleTimeout time.Duration

AccessLogQueueSize uint16
}

type accessLogEntry map[string]interface{}
Expand All @@ -48,44 +40,36 @@ type ConnectionStates struct {
}

func New(fps support.FileProviders) (*Lingress, error) {
connectorIds := []server.ConnectorId{server.DefaultConnectorIdHttp, server.DefaultConnectorIdHttps}
if r, err := rules.NewRepository(); err != nil {
return nil, err
} else if p, err := proxy.New(r); err != nil {
return nil, err
} else if f, err := fallback.New(fps); err != nil {
return nil, err
} else if m, err := management.New(r); err != nil {
} else if m, err := management.New(connectorIds, r); err != nil {
return nil, err
} else if hHttp, err := server.NewHttpConnector(server.DefaultConnectorIdHttp); err != nil {
return nil, err
} else if hHttps, err := server.NewHttpConnector(server.DefaultConnectorIdHttps); err != nil {
return nil, err
} else {
result := &Lingress{
RulesRepository: r,
Proxy: p,
Fallback: f,
Management: m,
http: http.Server{
ErrorLog: support.StdLog(log.Fields{
"context": "server.http",
}, log.DebugLevel),
},
https: http.Server{
ErrorLog: support.StdLog(log.Fields{
"context": "server.https",
}, log.DebugLevel),
},
RulesRepository: r,
Proxy: p,
Fallback: f,
Management: m,
Http: hHttp,
Https: hHttps,
connectionInformation: NewConnectionInformation(),
HttpListenAddr: ":8080",
HttpsListenAddr: ":8443",
MaxHeaderBytes: 2 << 20, // 2MB,
ReadHeaderTimeout: 30 * time.Second,
WriteTimeout: 30 * time.Second,
IdleTimeout: 5 * time.Minute,
AccessLogQueueSize: 5000,
}

result.http.Handler = result
result.https.Handler = result
result.http.ConnState = result.onConnState
result.https.ConnState = result.onConnState
result.Http.Handler = result
result.Http.MaxConnections = 512

result.Https.Handler = result
result.Https.Server.Addr = ":8443"

p.ResultHandler = result.onResult
p.AccessLogger = result.onAccessLog
Expand All @@ -96,35 +80,10 @@ func New(fps support.FileProviders) (*Lingress, error) {
}

func (instance *Lingress) RegisterFlag(fe support.FlagEnabled, appPrefix string) error {
fe.Flag("listen.http", "Listen address where the proxy is listening to serve").
PlaceHolder(instance.HttpListenAddr).
Envar(support.FlagEnvName(appPrefix, "LISTEN_HTTP")).
StringVar(&instance.HttpListenAddr)
fe.Flag("listen.https", "Listen address where the proxy is listening to serve").
PlaceHolder(instance.HttpsListenAddr).
Envar(support.FlagEnvName(appPrefix, "LISTEN_HTTPS")).
StringVar(&instance.HttpsListenAddr)
fe.Flag("accessLogQueueSize", "Maximum number of accessLog elements that could be queue before blocking.").
PlaceHolder(fmt.Sprint(instance.AccessLogQueueSize)).
Envar(support.FlagEnvName(appPrefix, "ACCESS_LOG_QUEUE_SIZE")).
Uint16Var(&instance.AccessLogQueueSize)
fe.Flag("client.maxHeaderBytes", "Maximum number of bytes the server will read parsing the request header's keys and values, including the request line. It does not limit the size of the request body.").
PlaceHolder(fmt.Sprint(instance.MaxHeaderBytes)).
Envar(support.FlagEnvName(appPrefix, "CLIENT_MAX_HEADER_BYTES")).
UintVar(&instance.MaxHeaderBytes)
fe.Flag("client.readHeaderTimeout", "Amount of time allowed to read request headers. The connection's read deadline is reset after reading the headers and the Handler can decide what is considered too slow for the body.").
PlaceHolder(fmt.Sprint(instance.ReadHeaderTimeout)).
Envar(support.FlagEnvName(appPrefix, "CLIENT_READ_HEADER_TIMEOUT")).
DurationVar(&instance.ReadHeaderTimeout)
fe.Flag("client.writeTimeout", "Maximum duration before timing out writes of the response. It is reset whenever a new request's header is read.").
PlaceHolder(fmt.Sprint(instance.WriteTimeout)).
Envar(support.FlagEnvName(appPrefix, "CLIENT_WRITE_TIMEOUT")).
DurationVar(&instance.WriteTimeout)
fe.Flag("client.idleTimeout", "Maximum amount of time to wait for the next request when keep-alives are enabled.").
PlaceHolder(fmt.Sprint(instance.IdleTimeout)).
Envar(support.FlagEnvName(appPrefix, "CLIENT_IDLE_TIMEOUT")).
DurationVar(&instance.IdleTimeout)

if err := instance.RulesRepository.RegisterFlag(fe, appPrefix); err != nil {
return err
}
Expand All @@ -137,17 +96,23 @@ func (instance *Lingress) RegisterFlag(fe support.FlagEnabled, appPrefix string)
if err := instance.Management.RegisterFlag(fe, appPrefix); err != nil {
return err
}
if err := instance.Http.RegisterFlag(fe, appPrefix); err != nil {
return err
}
if err := instance.Https.RegisterFlag(fe, appPrefix); err != nil {
return err
}
return nil
}

func (instance *Lingress) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
finalize := instance.Management.CollectClientStarted()
func (instance *Lingress) ServeHTTP(connector server.Connector, resp http.ResponseWriter, req *http.Request) {
finalize := instance.Management.CollectClientStarted(connector.GetId())
defer finalize()

instance.Proxy.ServeHTTP(resp, req)
instance.Proxy.ServeHTTP(connector, resp, req)
}

func (instance *Lingress) onConnState(conn net.Conn, state http.ConnState) {
func (instance *Lingress) OnConnState(connector server.Connector, conn net.Conn, state http.ConnState) {
previous := instance.connectionInformation.SetState(conn, state, func(target http.ConnState) bool {
return target != http.StateNew &&
target != http.StateActive &&
Expand All @@ -158,7 +123,12 @@ func (instance *Lingress) onConnState(conn net.Conn, state http.ConnState) {
return
}

source := instance.Management.Metrics.Client.Connections.Source
client, ok := instance.Management.Metrics.Client[connector.GetId()]
if !ok {
return
}

source := client.Connections.Source
switch previous {
case http.StateNew:
atomic.AddUint64(&source.New, ^uint64(0))
Expand Down Expand Up @@ -206,15 +176,16 @@ func (instance *Lingress) Init(stop support.Channel) error {

go instance.shutdownListener(stop)

tlsConfig, err := instance.createTlsConfig()
if err != nil {
if tlsConfig, err := instance.createTlsConfig(); err != nil {
return err
} else {
instance.Https.TlsConfig = tlsConfig
}

if err := instance.serve(&instance.http, instance.HttpListenAddr, nil, stop); err != nil {
if err := instance.Http.Serve(stop); err != nil {
return err
}
if err := instance.serve(&instance.https, instance.HttpsListenAddr, tlsConfig, stop); err != nil {
if err := instance.Https.Serve(stop); err != nil {
return err
}

Expand All @@ -225,43 +196,6 @@ func (instance *Lingress) Init(stop support.Channel) error {
return nil
}

func (instance *Lingress) serve(target *http.Server, addr string, tlsConfig *tls.Config, stop support.Channel) error {
target.Addr = addr
target.MaxHeaderBytes = int(instance.MaxHeaderBytes)
target.ReadHeaderTimeout = instance.ReadHeaderTimeout
target.WriteTimeout = instance.WriteTimeout
target.IdleTimeout = instance.IdleTimeout

ln, err := net.Listen("tcp", addr)
if err != nil {
return err
}
ln = tcpKeepAliveListener{ln.(*net.TCPListener)}

serve := func() error {
return target.Serve(ln)
}
if tlsConfig != nil {
target.TLSConfig = tlsConfig
serve = func() error {
return target.ServeTLS(ln, "", "")
}
}

go func() {
if err := serve(); err != nil && err != http.ErrServerClosed {
log.WithError(err).
WithField("addr", target.Addr).
Error("server is unable to serve proxy interface")
stop.Broadcast()
}
}()
log.WithField("addr", target.Addr).
Info("serve proxy interface")

return nil
}

func (instance *Lingress) createTlsConfig() (*tls.Config, error) {
fail := func(err error) (*tls.Config, error) {
return nil, errors.Wrap(err, "cannot create TLS config")
Expand Down Expand Up @@ -298,15 +232,8 @@ func (instance *Lingress) resolveCertificate(info *tls.ClientHelloInfo) (*tls.Ce

func (instance *Lingress) shutdownListener(stop support.Channel) {
stop.Wait()
ctx, _ := context.WithTimeout(context.Background(), 5*time.Minute)
if err := instance.http.Shutdown(ctx); err != nil {
log.WithError(err).
Warnf("cannot graceful shutdown proxy interface %s", instance.http.Addr)
}
if err := instance.https.Shutdown(ctx); err != nil {
log.WithError(err).
Warnf("cannot graceful shutdown proxy interface %s", instance.http.Addr)
}
instance.Http.Shutdown()
instance.Https.Shutdown()
}

func (instance *Lingress) accessLogQueueWorker(stop support.Channel, queue chan accessLogEntry) {
Expand Down
9 changes: 5 additions & 4 deletions management/management.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
lctx "github.com/echocat/lingress/context"
"github.com/echocat/lingress/rules"
"github.com/echocat/lingress/server"
"github.com/echocat/lingress/support"
log "github.com/sirupsen/logrus"
"net"
Expand All @@ -20,9 +21,9 @@ type Management struct {
rules rules.Repository
}

func New(rulesRepository rules.Repository) (*Management, error) {
func New(connectorIds []server.ConnectorId, rulesRepository rules.Repository) (*Management, error) {
result := &Management{
Metrics: NewMetrics(rulesRepository),
Metrics: NewMetrics(connectorIds, rulesRepository),
rules: rulesRepository,
server: http.Server{
Addr: ":8090",
Expand Down Expand Up @@ -69,8 +70,8 @@ func (instance *Management) CollectContext(ctx *lctx.Context) {
instance.Metrics.CollectContext(ctx)
}

func (instance *Management) CollectClientStarted() func() {
return instance.Metrics.CollectClientStarted()
func (instance *Management) CollectClientStarted(connector server.ConnectorId) func() {
return instance.Metrics.CollectClientStarted(connector)
}

func (instance *Management) CollectUpstreamStarted() func() {
Expand Down
Loading

0 comments on commit 5eea351

Please sign in to comment.