Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement polling response caching. #353

Draft
wants to merge 24 commits into
base: develop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
226ace0
Implement polling response caching.
andres-portainer Aug 10, 2022
f14a46f
Fix cacheResponse().
andres-portainer Aug 10, 2022
af8b8f9
Add missing change.
andres-portainer Aug 10, 2022
1cb2397
Add jitter and backoff for polling.
andres-portainer Aug 13, 2022
da94377
Add missing imports.
andres-portainer Aug 13, 2022
03da406
Add jitter to the Docker runtime check.
andres-portainer Aug 14, 2022
7f853ae
Reduce the impact of GetRuntimeConfigurationFromDockerEngine().
andres-portainer Aug 15, 2022
712c569
Improve jitter.
andres-portainer Aug 16, 2022
68b24eb
Change jitter again.
andres-portainer Aug 16, 2022
5f1f4ed
Improve the jitter algorithm.
andres-portainer Aug 17, 2022
0a0fb75
Reset EndpointID if the polling request fails.
andres-portainer Sep 17, 2022
8b5cb86
Merge branch 'develop' into perf/polling-cache
andres-portainer Sep 17, 2022
9967a9d
feat(agent): add healthcheck option [EE-4044]
Aug 23, 2022
a350005
feat(build): add healthcheck to images
Aug 23, 2022
f2e2553
fix(agent): remove cert
Aug 24, 2022
57a5d1f
fix(edge): remove unused GetDecodedKey
Aug 24, 2022
742cfb4
fix(agent): remove key
Aug 24, 2022
1c37cd9
refactor(edge): remove client-api package
Aug 24, 2022
410d07a
feat(healthcheck): skip no edge key checks
Aug 24, 2022
237f94e
Merge branch 'develop' into perf/polling-cache
andres-portainer Sep 20, 2022
9317e66
Merge branch 'feat/EE-3023/EE-4044/healthcheck' into perf/polling-cache
andres-portainer Sep 21, 2022
4ba21a1
Disable healthcheck requests.
andres-portainer Sep 21, 2022
39856fc
Merge branch 'develop' into perf/polling-cache
andres-portainer Sep 21, 2022
d4a0de5
Merge branch 'develop' into perf/polling-cache
andres-portainer Oct 25, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions cmd/agent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"errors"
"fmt"
"math/rand"
gohttp "net/http"
goos "os"
"os/signal"
Expand Down Expand Up @@ -43,6 +44,8 @@ func init() {
func main() {
// Generic

rand.Seed(time.Now().UnixNano())

options, err := parseOptions()
if err != nil {
log.Fatal().Err(err).Msg("invalid agent configuration")
Expand Down
59 changes: 58 additions & 1 deletion edge/client/portainer_edge_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@ import (
"fmt"
"net/http"
"strconv"
"strings"
"time"

"github.com/pkg/errors"
"github.com/portainer/agent"
portainer "github.com/portainer/portainer/api"

lru "github.com/hashicorp/golang-lru"
"github.com/rs/zerolog/log"
)

Expand All @@ -23,6 +25,7 @@ type PortainerEdgeClient struct {
getEndpointIDFn getEndpointIDFn
edgeID string
agentPlatform agent.ContainerPlatform
reqCache *lru.Cache
}

type globalKeyResponse struct {
Expand All @@ -31,14 +34,23 @@ type globalKeyResponse struct {

// NewPortainerEdgeClient returns a pointer to a new PortainerEdgeClient instance
func NewPortainerEdgeClient(serverAddress string, setEIDFn setEndpointIDFn, getEIDFn getEndpointIDFn, edgeID string, agentPlatform agent.ContainerPlatform, httpClient *http.Client) *PortainerEdgeClient {
return &PortainerEdgeClient{
c := &PortainerEdgeClient{
serverAddress: serverAddress,
setEndpointIDFn: setEIDFn,
getEndpointIDFn: getEIDFn,
edgeID: edgeID,
agentPlatform: agentPlatform,
httpClient: httpClient,
}

cache, err := lru.New(8)
if err == nil {
c.reqCache = cache
} else {
log.Printf("[WARN] [edge] [message: Could not initialize the cache: %s]", err)
}

return c
}

func (client *PortainerEdgeClient) SetTimeout(t time.Duration) {
Expand Down Expand Up @@ -85,6 +97,7 @@ func (client *PortainerEdgeClient) GetEnvironmentStatus(flags ...string) (*PollS
req.Header.Set(agent.HTTPResponseAgentHeaderName, agent.Version)
req.Header.Set(agent.HTTPEdgeIdentifierHeaderName, client.edgeID)
req.Header.Set(agent.HTTPResponseAgentPlatform, strconv.Itoa(int(client.agentPlatform)))
req.Header.Set("If-None-Match", client.cacheHeaders())

log.Debug().Int("header", int(client.agentPlatform)).Msg("sending agent platform header")

Expand All @@ -94,6 +107,11 @@ func (client *PortainerEdgeClient) GetEnvironmentStatus(flags ...string) (*PollS
}
defer resp.Body.Close()

cachedResp, ok := client.cachedResponse(resp)
if ok {
return cachedResp, nil
}

if resp.StatusCode != http.StatusOK {
log.Debug().Int("response_code", resp.StatusCode).Msg("poll request failure]")

Expand All @@ -108,6 +126,8 @@ func (client *PortainerEdgeClient) GetEnvironmentStatus(flags ...string) (*PollS
return nil, err
}

client.cacheResponse(resp.Header.Get("ETag"), &responseData)

return &responseData, nil
}

Expand Down Expand Up @@ -263,3 +283,40 @@ func (client *PortainerEdgeClient) SetLastCommandTimestamp(timestamp time.Time)
func (client *PortainerEdgeClient) EnqueueLogCollectionForStack(logCmd LogCommandData) error {
return nil
}

func (client *PortainerEdgeClient) cacheHeaders() string {
if client.reqCache == nil {
return ""
}

ks := client.reqCache.Keys()

var strKs []string
for _, k := range ks {
strKs = append(strKs, k.(string))
}

return strings.Join(strKs, ",")
}

func (client *PortainerEdgeClient) cachedResponse(r *http.Response) (*PollStatusResponse, bool) {
etag := r.Header.Get("ETag")

if client.reqCache == nil || r.StatusCode != http.StatusNotModified || etag == "" {
return nil, false
}

if resp, ok := client.reqCache.Get(etag); ok {
return resp.(*PollStatusResponse), true
}

return nil, false
}

func (client *PortainerEdgeClient) cacheResponse(etag string, resp *PollStatusResponse) {
if client.reqCache == nil || etag == "" {
return
}

client.reqCache.Add(etag, resp)
}
6 changes: 4 additions & 2 deletions edge/edge.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package edge
import (
"errors"
"fmt"
"math/rand"
"sync"
"time"

Expand Down Expand Up @@ -156,8 +157,9 @@ func (manager *Manager) startEdgeBackgroundProcessOnDocker(runtimeCheckFrequency
}

go func() {
ticker := time.NewTicker(runtimeCheckFrequency)
for range ticker.C {
for {
// Jitter
time.Sleep(5*time.Minute + time.Duration(rand.Float32()*5*float32(time.Minute)))
err := manager.checkDockerRuntimeConfig()
if err != nil {
log.Error().Msg("an error occurred during Docker runtime configuration check")
Expand Down
15 changes: 15 additions & 0 deletions edge/poll.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package edge

import (
"encoding/base64"
"math/rand"
"strconv"
"time"

Expand Down Expand Up @@ -134,12 +135,24 @@ func (service *PollService) startStatusPollLoop() {
Str("server_url", service.portainerURL).
Msg("starting Portainer short-polling client")

lastPollFailed := false
for {
select {
case <-pollCh:
// Jitter
if lastPollFailed {
lastPollFailed = false
t := time.Duration(rand.Float64() * service.pollIntervalInSeconds * float64(time.Second))
time.Sleep(t)
service.pollTicker.Reset(time.Duration(service.pollIntervalInSeconds) * time.Second)
}

err := service.poll()
if err != nil {
log.Error().Err(err).Msg("an error occured during short poll")

lastPollFailed = true
service.pollTicker.Reset(time.Duration(service.pollIntervalInSeconds) * time.Second)
}
case <-service.startSignal:
pollCh = service.pollTicker.C
Expand Down Expand Up @@ -200,6 +213,8 @@ func (service *PollService) poll() error {

environmentStatus, err := service.portainerClient.GetEnvironmentStatus()
if err != nil {
service.edgeManager.SetEndpointID(0)

return err
}

Expand Down
24 changes: 13 additions & 11 deletions healthcheck/healthcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,17 +43,19 @@ func Run(options *agent.Options, clusterService agent.ClusterService) error {
}
log.Printf("[DEBUG] [healthcheck] [message: Url reachable]")

err = checkPolling(decodedKey.PortainerInstanceURL, options)
if err != nil {
return err
}
log.Printf("[DEBUG] [healthcheck] [message: Portainer status check passed]")

// We then check that the agent can establish a TCP connection to the Portainer instance tunnel server
err = checkTunnel(decodedKey.TunnelServerAddr)
if err != nil {
return err
}
/*
err = checkPolling(decodedKey.PortainerInstanceURL, options)
if err != nil {
return err
}
log.Printf("[DEBUG] [healthcheck] [message: Portainer status check passed]")

// We then check that the agent can establish a TCP connection to the Portainer instance tunnel server
err = checkTunnel(decodedKey.TunnelServerAddr)
if err != nil {
return err
}
*/

log.Printf("[DEBUG] [healthcheck] [message: Agent can open TCP connection to Portainer]")

Expand Down