diff --git a/cmd/agent/main.go b/cmd/agent/main.go index 92b76ed4..0704c2c8 100644 --- a/cmd/agent/main.go +++ b/cmd/agent/main.go @@ -3,6 +3,7 @@ package main import ( "errors" "fmt" + "math/rand" gohttp "net/http" goos "os" "os/signal" @@ -43,6 +44,8 @@ func init() { func main() { // Generic + rand.Seed(time.Now().UnixNano()) + options, err := parseOptions() if err != nil { log.Fatal().Err(err).Msg("invalid agent configuration") diff --git a/edge/client/portainer_edge_client.go b/edge/client/portainer_edge_client.go index 2f5009eb..de25f7f7 100644 --- a/edge/client/portainer_edge_client.go +++ b/edge/client/portainer_edge_client.go @@ -6,12 +6,14 @@ import ( "fmt" "net/http" "strconv" + "strings" "time" "github.com/pkg/errors" "github.com/portainer/agent" portainer "github.com/portainer/portainer/api" + lru "github.com/hashicorp/golang-lru" "github.com/rs/zerolog/log" ) @@ -23,6 +25,7 @@ type PortainerEdgeClient struct { getEndpointIDFn getEndpointIDFn edgeID string agentPlatform agent.ContainerPlatform + reqCache *lru.Cache } type globalKeyResponse struct { @@ -31,7 +34,7 @@ type globalKeyResponse struct { // NewPortainerEdgeClient returns a pointer to a new PortainerEdgeClient instance func NewPortainerEdgeClient(serverAddress string, setEIDFn setEndpointIDFn, getEIDFn getEndpointIDFn, edgeID string, agentPlatform agent.ContainerPlatform, httpClient *http.Client) *PortainerEdgeClient { - return &PortainerEdgeClient{ + c := &PortainerEdgeClient{ serverAddress: serverAddress, setEndpointIDFn: setEIDFn, getEndpointIDFn: getEIDFn, @@ -39,6 +42,15 @@ func NewPortainerEdgeClient(serverAddress string, setEIDFn setEndpointIDFn, getE agentPlatform: agentPlatform, httpClient: httpClient, } + + cache, err := lru.New(8) + if err == nil { + c.reqCache = cache + } else { + log.Printf("[WARN] [edge] [message: Could not initialize the cache: %s]", err) + } + + return c } func (client *PortainerEdgeClient) SetTimeout(t time.Duration) { @@ -85,6 +97,7 @@ func (client *PortainerEdgeClient) GetEnvironmentStatus(flags ...string) (*PollS req.Header.Set(agent.HTTPResponseAgentHeaderName, agent.Version) req.Header.Set(agent.HTTPEdgeIdentifierHeaderName, client.edgeID) req.Header.Set(agent.HTTPResponseAgentPlatform, strconv.Itoa(int(client.agentPlatform))) + req.Header.Set("If-None-Match", client.cacheHeaders()) log.Debug().Int("header", int(client.agentPlatform)).Msg("sending agent platform header") @@ -94,6 +107,11 @@ func (client *PortainerEdgeClient) GetEnvironmentStatus(flags ...string) (*PollS } defer resp.Body.Close() + cachedResp, ok := client.cachedResponse(resp) + if ok { + return cachedResp, nil + } + if resp.StatusCode != http.StatusOK { log.Debug().Int("response_code", resp.StatusCode).Msg("poll request failure]") @@ -108,6 +126,8 @@ func (client *PortainerEdgeClient) GetEnvironmentStatus(flags ...string) (*PollS return nil, err } + client.cacheResponse(resp.Header.Get("ETag"), &responseData) + return &responseData, nil } @@ -263,3 +283,40 @@ func (client *PortainerEdgeClient) SetLastCommandTimestamp(timestamp time.Time) func (client *PortainerEdgeClient) EnqueueLogCollectionForStack(logCmd LogCommandData) error { return nil } + +func (client *PortainerEdgeClient) cacheHeaders() string { + if client.reqCache == nil { + return "" + } + + ks := client.reqCache.Keys() + + var strKs []string + for _, k := range ks { + strKs = append(strKs, k.(string)) + } + + return strings.Join(strKs, ",") +} + +func (client *PortainerEdgeClient) cachedResponse(r *http.Response) (*PollStatusResponse, bool) { + etag := r.Header.Get("ETag") + + if client.reqCache == nil || r.StatusCode != http.StatusNotModified || etag == "" { + return nil, false + } + + if resp, ok := client.reqCache.Get(etag); ok { + return resp.(*PollStatusResponse), true + } + + return nil, false +} + +func (client *PortainerEdgeClient) cacheResponse(etag string, resp *PollStatusResponse) { + if client.reqCache == nil || etag == "" { + return + } + + client.reqCache.Add(etag, resp) +} diff --git a/edge/edge.go b/edge/edge.go index d6253863..8d9e962c 100644 --- a/edge/edge.go +++ b/edge/edge.go @@ -3,6 +3,7 @@ package edge import ( "errors" "fmt" + "math/rand" "sync" "time" @@ -156,8 +157,9 @@ func (manager *Manager) startEdgeBackgroundProcessOnDocker(runtimeCheckFrequency } go func() { - ticker := time.NewTicker(runtimeCheckFrequency) - for range ticker.C { + for { + // Jitter + time.Sleep(5*time.Minute + time.Duration(rand.Float32()*5*float32(time.Minute))) err := manager.checkDockerRuntimeConfig() if err != nil { log.Error().Msg("an error occurred during Docker runtime configuration check") diff --git a/edge/poll.go b/edge/poll.go index 9f11062f..84b721af 100644 --- a/edge/poll.go +++ b/edge/poll.go @@ -2,6 +2,7 @@ package edge import ( "encoding/base64" + "math/rand" "strconv" "time" @@ -134,12 +135,24 @@ func (service *PollService) startStatusPollLoop() { Str("server_url", service.portainerURL). Msg("starting Portainer short-polling client") + lastPollFailed := false for { select { case <-pollCh: + // Jitter + if lastPollFailed { + lastPollFailed = false + t := time.Duration(rand.Float64() * service.pollIntervalInSeconds * float64(time.Second)) + time.Sleep(t) + service.pollTicker.Reset(time.Duration(service.pollIntervalInSeconds) * time.Second) + } + err := service.poll() if err != nil { log.Error().Err(err).Msg("an error occured during short poll") + + lastPollFailed = true + service.pollTicker.Reset(time.Duration(service.pollIntervalInSeconds) * time.Second) } case <-service.startSignal: pollCh = service.pollTicker.C @@ -200,6 +213,8 @@ func (service *PollService) poll() error { environmentStatus, err := service.portainerClient.GetEnvironmentStatus() if err != nil { + service.edgeManager.SetEndpointID(0) + return err } diff --git a/healthcheck/healthcheck.go b/healthcheck/healthcheck.go index ddacbd03..61d8f365 100644 --- a/healthcheck/healthcheck.go +++ b/healthcheck/healthcheck.go @@ -43,17 +43,19 @@ func Run(options *agent.Options, clusterService agent.ClusterService) error { } log.Printf("[DEBUG] [healthcheck] [message: Url reachable]") - err = checkPolling(decodedKey.PortainerInstanceURL, options) - if err != nil { - return err - } - log.Printf("[DEBUG] [healthcheck] [message: Portainer status check passed]") - - // We then check that the agent can establish a TCP connection to the Portainer instance tunnel server - err = checkTunnel(decodedKey.TunnelServerAddr) - if err != nil { - return err - } + /* + err = checkPolling(decodedKey.PortainerInstanceURL, options) + if err != nil { + return err + } + log.Printf("[DEBUG] [healthcheck] [message: Portainer status check passed]") + + // We then check that the agent can establish a TCP connection to the Portainer instance tunnel server + err = checkTunnel(decodedKey.TunnelServerAddr) + if err != nil { + return err + } + */ log.Printf("[DEBUG] [healthcheck] [message: Agent can open TCP connection to Portainer]")