From d7b412db6b4222ba21fa3139b62f34155b172bc1 Mon Sep 17 00:00:00 2001 From: Gregor Noczinski Date: Mon, 24 Aug 2020 16:26:03 +0200 Subject: [PATCH] Reusing of buffers and optimizing memory usage. --- lingress.go | 2 +- proxy/proxy.go | 46 ++++++++++++++++++++++++++-------------- server/connector_http.go | 2 +- 3 files changed, 32 insertions(+), 18 deletions(-) diff --git a/lingress.go b/lingress.go index cd030d0..f41469f 100644 --- a/lingress.go +++ b/lingress.go @@ -66,7 +66,7 @@ func New(fps support.FileProviders) (*Lingress, error) { } result.Http.Handler = result - result.Http.MaxConnections = 512 + result.Http.MaxConnections = 256 result.Https.Handler = result result.Https.Server.Addr = ":8443" diff --git a/proxy/proxy.go b/proxy/proxy.go index 5735484..bd1d077 100644 --- a/proxy/proxy.go +++ b/proxy/proxy.go @@ -16,6 +16,7 @@ import ( "net/url" "runtime/debug" "strings" + "sync" "time" ) @@ -32,6 +33,8 @@ type Proxy struct { AccessLogger AccessLogger Interceptors Interceptors MetricsCollector lctx.MetricsCollector + + bufferPool sync.Pool } type AccessLogger func(*lctx.Context) @@ -51,12 +54,14 @@ func New(rules rules.Repository) (*Proxy, error) { RootCAs: support.Pool, }, } - return &Proxy{ + result := &Proxy{ Dialer: dialer, Transport: transport, RulesRepository: rules, Interceptors: DefaultInterceptors.Clone(), - }, nil + } + result.bufferPool.New = result.createBuffer + return result, nil } func (instance *Proxy) RegisterFlag(fe support.FlagEnabled, appPrefix string) error { @@ -155,6 +160,9 @@ func (instance *Proxy) ServeHTTP(connector server.Connector, resp http.ResponseW Host: ctx.Client.Host(), Path: ctx.Client.Request.RequestURI, } + if u := ctx.Client.Request.URL; u != nil { + query.Path = u.Path + } ctx.Stage = lctx.StageEvaluateClientRequest rs, err := instance.RulesRepository.FindBy(query) @@ -367,7 +375,7 @@ func (instance *Proxy) execute(ctx *lctx.Context) error { return nil } - err = instance.copyResponse(ctx.Client.Response, ctx.Upstream.Response.Body) + _, err = instance.copyBuffered(ctx.Client.Response, ctx.Upstream.Response.Body) if err != nil { //noinspection GoUnhandledErrorResult defer ctx.Upstream.Response.Body.Close() @@ -446,27 +454,20 @@ func (instance *Proxy) handleUpgradeResponse(rw http.ResponseWriter, req *http.R return nil } -func (instance *Proxy) copyResponse(dst io.Writer, src io.Reader) error { - var buf []byte - _, err := instance.copyBuffer(dst, src, buf) - return err -} - -// copyBuffer returns any write errors or non-EOF read errors, and the amount +// copyBuffered returns any write errors or non-EOF read errors, and the amount // of bytes written. -func (instance *Proxy) copyBuffer(dst io.Writer, src io.Reader, buf []byte) (int64, error) { - if len(buf) == 0 { - buf = make([]byte, 32*1024) - } +func (instance *Proxy) copyBuffered(dst io.Writer, src io.Reader) (int64, error) { + buf := instance.acquireBuffer() + defer instance.releaseBuffer(buf) var written int64 for { - nr, rErr := src.Read(buf) + nr, rErr := src.Read(*buf) if rErr != nil && rErr != io.EOF && rErr != context.Canceled { log.WithError(rErr). Warn("read error during body copy") } if nr > 0 { - nw, wErr := dst.Write(buf[:nr]) + nw, wErr := dst.Write((*buf)[:nr]) if nw > 0 { written += int64(nw) } @@ -485,3 +486,16 @@ func (instance *Proxy) copyBuffer(dst io.Writer, src io.Reader, buf []byte) (int } } } + +func (instance *Proxy) createBuffer() interface{} { + result := make([]byte, 32*1024) + return &result +} + +func (instance *Proxy) acquireBuffer() *[]byte { + return instance.bufferPool.Get().(*[]byte) +} + +func (instance *Proxy) releaseBuffer(buf *[]byte) { + instance.bufferPool.Put(buf) +} diff --git a/server/connector_http.go b/server/connector_http.go index aa158e8..11878a8 100644 --- a/server/connector_http.go +++ b/server/connector_http.go @@ -23,7 +23,7 @@ type HttpConnector struct { func NewHttpConnector(id ConnectorId) (*HttpConnector, error) { result := HttpConnector{ Id: id, - MaxConnections: 1024, + MaxConnections: 512, Server: http.Server{ Addr: ":8080",