diff --git a/dataclients/kubernetes/clusterclient.go b/dataclients/kubernetes/clusterclient.go index aed07eec6d..b878ad749d 100644 --- a/dataclients/kubernetes/clusterclient.go +++ b/dataclients/kubernetes/clusterclient.go @@ -116,9 +116,11 @@ func buildHTTPClient(certFilePath string, inCluster bool, quit <-chan struct{}) // regularly force closing idle connections go func() { + ticker := time.NewTicker(10 * time.Second) + defer ticker.Stop() for { select { - case <-time.After(10 * time.Second): + case <-ticker.C: transport.CloseIdleConnections() case <-quit: return diff --git a/dataclients/kubernetes/main_test.go b/dataclients/kubernetes/main_test.go new file mode 100644 index 0000000000..1d0a625286 --- /dev/null +++ b/dataclients/kubernetes/main_test.go @@ -0,0 +1,12 @@ +package kubernetes_test + +import ( + "os" + "testing" + + "github.com/AlexanderYastrebov/noleak" +) + +func TestMain(m *testing.M) { + os.Exit(noleak.CheckMain(m)) +} diff --git a/eskip/eskip_test.go b/eskip/eskip_test.go index 2a6ad2f1db..143a024dd6 100644 --- a/eskip/eskip_test.go +++ b/eskip/eskip_test.go @@ -322,13 +322,15 @@ func TestParseFilters(t *testing.T) { []*Filter{{Name: "filter1", Args: []interface{}{3.14}}, {Name: "filter2", Args: []interface{}{"key", float64(42)}}}, false, }} { - fs, err := ParseFilters(ti.expression) - if err == nil && ti.err || err != nil && !ti.err { - t.Error(ti.msg, "failure case", err, ti.err) - return - } + t.Run(ti.msg, func(t *testing.T) { + fs, err := ParseFilters(ti.expression) + if err == nil && ti.err || err != nil && !ti.err { + t.Error(ti.msg, "failure case", err, ti.err) + return + } - checkFilters(t, ti.msg, fs, ti.check) + checkFilters(t, ti.msg, fs, ti.check) + }) } } diff --git a/eskip/main_test.go b/eskip/main_test.go new file mode 100644 index 0000000000..a9fe05a632 --- /dev/null +++ b/eskip/main_test.go @@ -0,0 +1,12 @@ +package eskip_test + +import ( + "os" + "testing" + + "github.com/AlexanderYastrebov/noleak" +) + +func TestMain(m *testing.M) { + os.Exit(noleak.CheckMain(m)) +} diff --git a/eskipfile/remote.go b/eskipfile/remote.go index 1d8b9a5a12..216f5d83a0 100644 --- a/eskipfile/remote.go +++ b/eskipfile/remote.go @@ -5,6 +5,7 @@ import ( "io" "os" "strings" + "sync" "time" "github.com/zalando/skipper/eskip" @@ -15,6 +16,7 @@ import ( ) type remoteEskipFile struct { + once sync.Once preloaded bool remotePath string localPath string @@ -55,6 +57,7 @@ func RemoteWatch(o *RemoteWatchOptions) (routing.DataClient, error) { } dataClient := &remoteEskipFile{ + once: sync.Once{}, remotePath: o.RemoteFile, localPath: tempFilename.Name(), threshold: o.Threshold, @@ -135,6 +138,13 @@ func (client *remoteEskipFile) LoadUpdate() ([]*eskip.Route, []string, error) { return newRoutes, deletedRoutes, err } +func (client *remoteEskipFile) Close() { + client.once.Do(func() { + client.http.Close() + client.eskipFileClient.Close() + }) +} + func isFileRemote(remotePath string) bool { return strings.HasPrefix(remotePath, "http://") || strings.HasPrefix(remotePath, "https://") } diff --git a/eskipfile/remote_test.go b/eskipfile/remote_test.go index ccbdd93ee6..3cf45257bc 100644 --- a/eskipfile/remote_test.go +++ b/eskipfile/remote_test.go @@ -90,6 +90,10 @@ func TestLoadAll(t *testing.T) { t.Run(test.title, func(t *testing.T) { options := &RemoteWatchOptions{RemoteFile: s.URL, Threshold: 10, Verbose: true, FailOnStartup: true} client, err := RemoteWatch(options) + if err == nil { + defer client.(*remoteEskipFile).Close() + } + if err == nil && test.fail { t.Error("failed to fail") return diff --git a/eskipfile/watch.go b/eskipfile/watch.go index 683bd52bb3..a86cf55924 100644 --- a/eskipfile/watch.go +++ b/eskipfile/watch.go @@ -3,6 +3,7 @@ package eskipfile import ( "os" "reflect" + "sync" "github.com/zalando/skipper/eskip" ) @@ -21,6 +22,7 @@ type WatchClient struct { getAll chan (chan<- watchResponse) getUpdates chan (chan<- watchResponse) quit chan struct{} + once sync.Once } // Watch creates a route configuration client with file watching. Watch doesn't follow file system nodes, it @@ -31,6 +33,7 @@ func Watch(name string) *WatchClient { getAll: make(chan (chan<- watchResponse)), getUpdates: make(chan (chan<- watchResponse)), quit: make(chan struct{}), + once: sync.Once{}, } go c.watch() @@ -157,5 +160,7 @@ func (c *WatchClient) LoadUpdate() ([]*eskip.Route, []string, error) { // Close stops watching the configured file and providing updates. func (c *WatchClient) Close() { - close(c.quit) + c.once.Do(func() { + close(c.quit) + }) } diff --git a/filters/diag/diag_test.go b/filters/diag/diag_test.go index 824d094f15..089d139d32 100644 --- a/filters/diag/diag_test.go +++ b/filters/diag/diag_test.go @@ -860,7 +860,7 @@ func TestRequestLatency(t *testing.T) { spec: NewUniformRequestLatency, args: []interface{}{"10ms", "5ms"}, p10: 6 * time.Millisecond, - p25: 9 * time.Millisecond, + p25: 8 * time.Millisecond, p50: 11 * time.Millisecond, p75: 13 * time.Millisecond, p90: 14 * time.Millisecond, @@ -939,7 +939,7 @@ func TestResponseLatency(t *testing.T) { spec: NewUniformResponseLatency, args: []interface{}{"10ms", "5ms"}, p10: 7 * time.Millisecond, - p25: 9 * time.Millisecond, + p25: 8 * time.Millisecond, p50: 11 * time.Millisecond, p75: 13 * time.Millisecond, p90: 14 * time.Millisecond, diff --git a/filters/serve/serve_test.go b/filters/serve/serve_test.go index e0a735d660..9ec6ccd61f 100644 --- a/filters/serve/serve_test.go +++ b/filters/serve/serve_test.go @@ -121,6 +121,7 @@ func TestServe(t *testing.T) { if err != nil || string(b) != strings.Join(parts, "") { t.Error("failed to serve body") } + ctx.Response().Body.Close() } func TestStreamBody(t *testing.T) { diff --git a/go.mod b/go.mod index 58286e3ccb..bc18c4eebf 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,7 @@ module github.com/zalando/skipper require ( + github.com/AlexanderYastrebov/noleak v0.0.0-20221130200240-b1c4bed70a32 github.com/MicahParks/keyfunc v1.0.1 github.com/abbot/go-http-auth v0.4.0 github.com/andybalholm/brotli v1.0.4 @@ -32,7 +33,7 @@ require ( github.com/sarslanhan/cronmask v0.0.0-20190709075623-766eca24d011 github.com/sirupsen/logrus v1.8.1 github.com/sony/gobreaker v0.5.0 - github.com/stretchr/testify v1.7.0 + github.com/stretchr/testify v1.8.0 github.com/szuecs/rate-limit-buffer v0.7.1 github.com/testcontainers/testcontainers-go v0.12.0 github.com/tidwall/gjson v1.12.1 diff --git a/go.sum b/go.sum index 2e9074bc31..99ef11caf7 100644 --- a/go.sum +++ b/go.sum @@ -33,6 +33,8 @@ cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohl cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RXyy7KQOVs= cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0= dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= +github.com/AlexanderYastrebov/noleak v0.0.0-20221130200240-b1c4bed70a32 h1:fbuCcKeKtE+xCio25fUN0nNXg7uTsm61mrr00w2p8S0= +github.com/AlexanderYastrebov/noleak v0.0.0-20221130200240-b1c4bed70a32/go.mod h1:Ac8KyJXsCfx2Gb9h/Eb6SUYk2tQ9At1ICaBm/1mipJQ= github.com/Azure/azure-sdk-for-go v16.2.1+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc= github.com/Azure/go-ansiterm v0.0.0-20170929234023-d6e3b3328b78 h1:w+iIsaOQNcT7OZ575w+acHgRric5iCyQh+xv+KJ4HB8= github.com/Azure/go-ansiterm v0.0.0-20170929234023-d6e3b3328b78/go.mod h1:LmzpDX56iTiv29bbRTIsUNlaFfuhWRQBWjQdVyAevI8= @@ -797,16 +799,19 @@ github.com/streadway/quantile v0.0.0-20220407130108-4246515d968d/go.mod h1:lbP8t github.com/stretchr/objx v0.0.0-20180129172003-8a3f7159479f/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/objx v0.2.0 h1:Hbg2NidpLE8veEBkEZTL3CvlkUIVzuU9jDplZO54c48= github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE= +github.com/stretchr/objx v0.4.0 h1:M2gUjqZET1qApGOWNSnZ49BAIMX4F/1plDv3+l31EJ4= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/testify v0.0.0-20180303142811-b89eecf5ca5d/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/syndtr/gocapability v0.0.0-20170704070218-db04d3cc01c8/go.mod h1:hkRG7XYTFWNJGYcbNJQlaLq0fg1yr4J4t/NcTQtrfww= github.com/syndtr/gocapability v0.0.0-20180916011248-d98352740cb2/go.mod h1:hkRG7XYTFWNJGYcbNJQlaLq0fg1yr4J4t/NcTQtrfww= github.com/syndtr/gocapability v0.0.0-20200815063812-42c35b437635/go.mod h1:hkRG7XYTFWNJGYcbNJQlaLq0fg1yr4J4t/NcTQtrfww= diff --git a/net/httpclient.go b/net/httpclient.go index bcc4662529..7e67906dac 100644 --- a/net/httpclient.go +++ b/net/httpclient.go @@ -7,6 +7,7 @@ import ( "net/http/httptrace" "net/url" "strings" + "sync" "time" "github.com/opentracing/opentracing-go" @@ -24,6 +25,7 @@ const ( // opentracing to the wrapped http.Client with the same interface as // http.Client from the stdlib. type Client struct { + once sync.Once client http.Client tr *Transport log logging.Logger @@ -58,6 +60,7 @@ func NewClient(o Options) *Client { } c := &Client{ + once: sync.Once{}, client: http.Client{ Transport: tr, }, @@ -70,10 +73,12 @@ func NewClient(o Options) *Client { } func (c *Client) Close() { - c.tr.Close() - if c.sr != nil { - c.sr.Close() - } + c.once.Do(func() { + c.tr.Close() + if c.sr != nil { + c.sr.Close() + } + }) } func (c *Client) Head(url string) (*http.Response, error) { @@ -202,8 +207,8 @@ type Options struct { // Transport wraps an http.Transport and adds support for tracing and // bearerToken injection. type Transport struct { + once sync.Once quit chan struct{} - closed bool tr *http.Transport tracer opentracing.Tracer spanName string @@ -265,6 +270,7 @@ func NewTransport(options Options) *Transport { } t := &Transport{ + once: sync.Once{}, quit: make(chan struct{}), tr: htransport, tracer: options.Tracer, @@ -280,9 +286,11 @@ func NewTransport(options Options) *Transport { } go func() { + ticker := time.NewTicker(options.IdleConnTimeout) + defer ticker.Stop() for { select { - case <-time.After(options.IdleConnTimeout): + case <-ticker.C: htransport.CloseIdleConnections() case <-t.quit: return @@ -320,15 +328,21 @@ func WithBearerToken(t *Transport, bearerToken string) *Transport { } func (t *Transport) shallowCopy() *Transport { - tt := *t - return &tt + return &Transport{ + once: sync.Once{}, + quit: t.quit, + tr: t.tr, + tracer: t.tracer, + spanName: t.spanName, + componentName: t.componentName, + bearerToken: t.bearerToken, + } } func (t *Transport) Close() { - if !t.closed { - t.closed = true + t.once.Do(func() { close(t.quit) - } + }) } func (t *Transport) CloseIdleConnections() { diff --git a/net/httpclient_test.go b/net/httpclient_test.go index 1784498458..1870b6b7fd 100644 --- a/net/httpclient_test.go +++ b/net/httpclient_test.go @@ -36,6 +36,7 @@ func TestClient(t *testing.T) { if err != nil { t.Fatalf("Failed to get a tracer: %v", err) } + defer tracer.Close() for _, tt := range []struct { name string @@ -157,6 +158,9 @@ func TestClient(t *testing.T) { } cli := NewClient(tt.options) + if cli == nil { + t.Fatal("NewClient returned nil") + } defer cli.Close() u := "http://" + s.Listener.Addr().String() + "/" @@ -201,6 +205,7 @@ func TestTransport(t *testing.T) { if err != nil { t.Fatalf("Failed to get a tracer: %v", err) } + defer tracer.Close() for _, tt := range []struct { name string diff --git a/net/main_test.go b/net/main_test.go new file mode 100644 index 0000000000..5fb6047c72 --- /dev/null +++ b/net/main_test.go @@ -0,0 +1,12 @@ +package net_test + +import ( + "os" + "testing" + + "github.com/AlexanderYastrebov/noleak" +) + +func TestMain(m *testing.M) { + os.Exit(noleak.CheckMain(m)) +} diff --git a/net/redisclient.go b/net/redisclient.go index cdb7a384ce..09c83eafb7 100644 --- a/net/redisclient.go +++ b/net/redisclient.go @@ -287,12 +287,15 @@ func (r *RedisRingClient) startUpdater(ctx context.Context) { } r.log.Infof("Start goroutine to update redis instances every %s", r.options.UpdateInterval) + defer r.log.Info("Stopped goroutine to update redis") + ticker := time.NewTicker(r.options.UpdateInterval) + defer ticker.Stop() for { select { case <-r.quit: return - case <-time.After(r.options.UpdateInterval): + case <-ticker.C: } addrs := r.options.AddrUpdater() @@ -324,9 +327,12 @@ func (r *RedisRingClient) RingAvailable() bool { func (r *RedisRingClient) StartMetricsCollection() { go func() { + ticker := time.NewTicker(r.options.ConnMetricsInterval) + defer ticker.Stop() + for { select { - case <-time.After(r.options.ConnMetricsInterval): + case <-ticker.C: stats := r.ring.PoolStats() // counter values r.metrics.UpdateGauge(r.metricsPrefix+"hits", float64(stats.Hits)) @@ -353,6 +359,7 @@ func (r *RedisRingClient) Close() { r.once.Do(func() { r.closed = true close(r.quit) + r.ring.Close() }) } diff --git a/net/redisclient_test.go b/net/redisclient_test.go index e648a40f58..de74f14669 100644 --- a/net/redisclient_test.go +++ b/net/redisclient_test.go @@ -89,6 +89,7 @@ func TestRedisClient(t *testing.T) { if err != nil { t.Fatalf("Failed to get a tracer: %v", err) } + defer tracer.Close() redisAddr, done := redistest.NewTestRedis(t) defer done() @@ -110,17 +111,7 @@ func TestRedisClient(t *testing.T) { { name: "With AddrUpdater", options: &RedisOptions{ - AddrUpdater: func() []string { return []string{redisAddr} }, - // i := 0 - // return func() []string { - // i++ - // if i < 2 { - // return []string{redisAddr} - // } - // return []string{redisAddr, redisAddr2} - // }() - - // }, + AddrUpdater: func() []string { return []string{redisAddr} }, UpdateInterval: 10 * time.Millisecond, }, wantErr: false, @@ -156,7 +147,9 @@ func TestRedisClient(t *testing.T) { } } - go func() { <-ch }() // create client will block + if ch != nil { + go func() { <-ch }() // create client will block + } cli := NewRedisRingClient(tt.options) defer func() { if !cli.closed { @@ -1059,6 +1052,7 @@ func TestRedisClientSetAddr(t *testing.T) { } { t.Run(tt.name, func(t *testing.T) { r := NewRedisRingClient(tt.options) + defer r.Close() for i := 0; i < len(tt.keys); i++ { r.Set(context.Background(), tt.keys[i], tt.vals[i], time.Second) } diff --git a/net/redistest/main_test.go b/net/redistest/main_test.go new file mode 100644 index 0000000000..122227984e --- /dev/null +++ b/net/redistest/main_test.go @@ -0,0 +1,12 @@ +package redistest_test + +import ( + "os" + "testing" + + "github.com/AlexanderYastrebov/noleak" +) + +func TestMain(m *testing.M) { + os.Exit(noleak.CheckMain(m)) +} diff --git a/net/redistest/redistest_test.go b/net/redistest/redistest_test.go new file mode 100644 index 0000000000..94c6f4dba9 --- /dev/null +++ b/net/redistest/redistest_test.go @@ -0,0 +1,17 @@ +package redistest + +import ( + "context" + "testing" + "time" +) + +func TestRedistest(t *testing.T) { + r, done := NewTestRedis(t) + defer done() + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + if err := ping(ctx, r, ""); err != nil { + t.Fatalf("Failed to ping redis: %v", err) + } +} diff --git a/proxy/proxy.go b/proxy/proxy.go index b7d5072f96..b898cca648 100644 --- a/proxy/proxy.go +++ b/proxy/proxy.go @@ -648,9 +648,11 @@ func WithParams(p Params) *Proxy { // https://github.com/golang/go/issues/23427 if p.CloseIdleConnsPeriod > 0 { go func() { + ticker := time.NewTicker(p.CloseIdleConnsPeriod) + defer ticker.Stop() for { select { - case <-time.After(p.CloseIdleConnsPeriod): + case <-ticker.C: tr.CloseIdleConnections() case <-quit: return diff --git a/routesrv/polling.go b/routesrv/polling.go index 4eef7cc8e7..26a1b75f9e 100644 --- a/routesrv/polling.go +++ b/routesrv/polling.go @@ -70,7 +70,10 @@ func (p *poller) poll(wg *sync.WaitGroup) { ) log.WithField("timeout", p.timeout).Info(LogPollingStarted) + ticker := time.NewTicker(p.timeout) + defer ticker.Stop() pollingStarted.SetToCurrentTime() + for { span := tracing.CreateSpan("poll_routes", context.TODO(), p.tracer) @@ -118,7 +121,7 @@ func (p *poller) poll(wg *sync.WaitGroup) { case <-p.quit: log.Info(LogPollingStopped) return - case <-time.After(p.timeout): + case <-ticker.C: } } } diff --git a/routesrv/routesrv.go b/routesrv/routesrv.go index ecc71cacf4..d1306d6c9f 100644 --- a/routesrv/routesrv.go +++ b/routesrv/routesrv.go @@ -128,7 +128,7 @@ func (rs *RouteServer) ServeHTTP(w http.ResponseWriter, r *http.Request) { } func newShutdownFunc(rs *RouteServer) func(delay time.Duration) { - once := &sync.Once{} + once := sync.Once{} rs.wg.Add(1) return func(delay time.Duration) { diff --git a/routing/datasource.go b/routing/datasource.go index 542554a978..9b679df4ac 100644 --- a/routing/datasource.go +++ b/routing/datasource.go @@ -66,6 +66,13 @@ func (d *incomingData) log(l logging.Logger, suppress bool) { // undeterministic way, but this may change in the future. func receiveFromClient(c DataClient, o Options, out chan<- *incomingData, quit <-chan struct{}) { initial := true + var ticker *time.Ticker + if o.PollTimeout != 0 { + ticker = time.NewTicker(o.PollTimeout) + } else { + ticker = time.NewTicker(time.Millisecond) + } + defer ticker.Stop() for { var ( routes []*eskip.Route @@ -73,8 +80,6 @@ func receiveFromClient(c DataClient, o Options, out chan<- *incomingData, quit < err error ) - to := o.PollTimeout - if initial { routes, err = c.LoadAll() } else { @@ -87,7 +92,7 @@ func receiveFromClient(c DataClient, o Options, out chan<- *incomingData, quit < case err != nil: o.Log.Error("error while receiving update;", err) initial = true - to = 0 + continue case initial || len(routes) > 0 || len(deletedIDs) > 0: var incoming *incomingData if initial { @@ -105,7 +110,7 @@ func receiveFromClient(c DataClient, o Options, out chan<- *incomingData, quit < } select { - case <-time.After(to): + case <-ticker.C: case <-quit: return } diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go index 67b22fad42..2fef163b7f 100644 --- a/scheduler/scheduler.go +++ b/scheduler/scheduler.go @@ -651,9 +651,11 @@ func (r *Registry) measure() { r.measuring = true go func() { + ticker := time.NewTicker(r.options.MetricsUpdateTimeout) + defer ticker.Stop() for { select { - case <-time.After(r.options.MetricsUpdateTimeout): + case <-ticker.C: r.updateMetrics() case <-r.quit: return diff --git a/secrets/file.go b/secrets/file.go index 42244a191c..a245099958 100644 --- a/secrets/file.go +++ b/secrets/file.go @@ -142,33 +142,39 @@ func (sp *SecretPaths) registerSecretFile(p string) error { // runRefresher refreshes all secrets, that are registered func (sp *SecretPaths) runRefresher() { - log.Infof("Run secrets path refresher every %s, but update once first", sp.refreshInterval) - var d time.Duration + sp.refresh() + + ticker := time.NewTicker(sp.refreshInterval) + defer ticker.Stop() + for { select { - case <-time.After(d): - sp.secrets.Range(func(k, _ interface{}) bool { - f, ok := k.(string) - if !ok { - log.Errorf("Failed to convert k '%v' to string", k) - return true - } - sec, err := os.ReadFile(f) - if err != nil { - log.Errorf("Failed to read file (%s): %v", f, err) - return true - } - sp.updateSecret(f, sec) - return true - }) + case <-ticker.C: + sp.refresh() case <-sp.quit: log.Infoln("Stop secrets background refresher") return } - d = sp.refreshInterval } } +func (sp *SecretPaths) refresh() { + sp.secrets.Range(func(k, _ interface{}) bool { + f, ok := k.(string) + if !ok { + log.Errorf("Failed to convert k '%v' to string", k) + return true + } + sec, err := os.ReadFile(f) + if err != nil { + log.Errorf("Failed to read file (%s): %v", f, err) + return true + } + sp.updateSecret(f, sec) + return true + }) +} + func (sp *SecretPaths) Close() { if sp != nil { close(sp.quit) diff --git a/skptesting/gohttpclient.go b/skptesting/gohttpclient.go index 1fe45ffa00..f90f8c70c1 100644 --- a/skptesting/gohttpclient.go +++ b/skptesting/gohttpclient.go @@ -36,9 +36,11 @@ func main() { request.Host = "test.example.org" go func(req *http.Request, q chan struct{}) { + ticker := time.NewTicker(100 * time.Millisecond) + defer ticker.Stop() for { select { - case <-time.After(100 * time.Millisecond): + case <-ticker.C: resp, err2 := tr.RoundTrip(req) if err2 != nil { logrus.Errorf("Failed to roundtrip: %v", err2) diff --git a/swarm/kubernetes.go b/swarm/kubernetes.go index 7dcdfd542c..e9ba1e3187 100644 --- a/swarm/kubernetes.go +++ b/swarm/kubernetes.go @@ -152,9 +152,11 @@ func buildHTTPClient(certFilePath string, inCluster bool, quit chan struct{}) (* } // regularly force closing idle connections go func() { + ticker := time.NewTicker(10 * time.Second) + defer ticker.Stop() for { select { - case <-time.After(10 * time.Second): + case <-ticker.C: transport.CloseIdleConnections() case <-quit: return diff --git a/tracing/tracers/basic/basic.go b/tracing/tracers/basic/basic.go index e5a9f15f45..3bfafbf34e 100644 --- a/tracing/tracers/basic/basic.go +++ b/tracing/tracers/basic/basic.go @@ -4,13 +4,25 @@ import ( "fmt" "strconv" "strings" + "sync" "time" basic "github.com/opentracing/basictracer-go" opentracing "github.com/opentracing/opentracing-go" ) -func InitTracer(opts []string) (opentracing.Tracer, error) { +type CloseableTracer interface { + opentracing.Tracer + Close() +} + +type basicTracer struct { + opentracing.Tracer + quit chan struct{} + once sync.Once +} + +func InitTracer(opts []string) (CloseableTracer, error) { fmt.Printf("DO NOT USE IN PRODUCTION\n") var ( dropAllLogs bool @@ -56,7 +68,23 @@ func InitTracer(opts []string) (opentracing.Tracer, error) { } } } + + quit := make(chan struct{}) + bt := &basicTracer{ + basic.NewWithOptions(basic.Options{ + DropAllLogs: dropAllLogs, + ShouldSample: func(traceID uint64) bool { return traceID%sampleModulo == 0 }, + MaxLogsPerSpan: maxLogsPerSpan, + Recorder: recorder, + }), + quit, + sync.Once{}, + } + go func() { + ticker := time.NewTicker(1 * time.Second) + defer ticker.Stop() + for { rec := recorder.(*basic.InMemorySpanRecorder) spans := rec.GetSampledSpans() @@ -65,16 +93,17 @@ func InitTracer(opts []string) (opentracing.Tracer, error) { for _, span := range spans { fmt.Printf("SAMPLED=%#v\n", span) } - time.Sleep(1 * time.Second) + + select { + case <-ticker.C: + case <-quit: + return + } + } }() - return basic.NewWithOptions(basic.Options{ - DropAllLogs: dropAllLogs, - ShouldSample: func(traceID uint64) bool { return traceID%sampleModulo == 0 }, - MaxLogsPerSpan: maxLogsPerSpan, - Recorder: recorder, - }), nil + return bt, nil } func missingArg(opt string) error { @@ -84,3 +113,9 @@ func missingArg(opt string) error { func invalidArg(opt string, err error) error { return fmt.Errorf("invalid argument for %s option: %s", opt, err) } + +func (bt *basicTracer) Close() { + bt.once.Do(func() { + close(bt.quit) + }) +}