Skip to content

Commit

Permalink
Add refresh cache logic to keep docker and sfx in sync (#1610)
Browse files Browse the repository at this point in the history
Signed-off-by: Dani Louca <[email protected]>
  • Loading branch information
dloucasfx authored Feb 19, 2021
1 parent 5008be1 commit 6269da6
Show file tree
Hide file tree
Showing 6 changed files with 88 additions and 54 deletions.
1 change: 1 addition & 0 deletions docs/monitors/docker-container-stats.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ Configuration](../monitor-config.md#common-configuration).**
| `enableExtraNetworkMetrics` | no | `bool` | Whether it will send all extra network metrics as well. (**default:** `false`) |
| `dockerURL` | no | `string` | The URL of the docker server (**default:** `unix:///var/run/docker.sock`) |
| `timeoutSeconds` | no | `integer` | The maximum amount of time to wait for docker API requests (**default:** `5`) |
| `cacheSyncInterval` | no | `int64` | The time to wait before resyncing the list of containers the monitor maintains through the docker event listener example: cacheSyncInterval: "20m" (**default:** `60m`) |
| `labelsToDimensions` | no | `map of strings` | A mapping of container label names to dimension names. The corresponding label values will become the dimension value for the mapped name. E.g. `io.kubernetes.container.name: container_spec_name` would result in a dimension called `container_spec_name` that has the value of the `io.kubernetes.container.name` container label. |
| `envToDimensions` | no | `map of strings` | A mapping of container environment variable names to dimension names. The corresponding env var values become the dimension values on the emitted metrics. E.g. `APP_VERSION: version` would result in datapoints having a dimension called `version` whose value is the value of the `APP_VERSION` envvar configured for that particular container, if present. |
| `excludedImages` | no | `list of strings` | A list of filters of images to exclude. Supports literals, globs, and regex. |
Expand Down
1 change: 1 addition & 0 deletions docs/observers/docker.md
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ Observer Type: `docker`
| Config option | Required | Type | Description |
| --- | --- | --- | --- |
| `dockerURL` | no | `string` | (**default:** `unix:///var/run/docker.sock`) |
| `cacheSyncInterval` | no | `int64` | The time to wait before resyncing the list of containers the monitor maintains through the docker event listener example: cacheSyncInterval: "20m" (**default:** `60m`) |
| `labelsToDimensions` | no | `map of strings` | A mapping of container label names to dimension names that will get applied to the metrics of all discovered services. The corresponding label values will become the dimension values for the mapped name. E.g. `io.kubernetes.container.name: container_spec_name` would result in a dimension called `container_spec_name` that has the value of the `io.kubernetes.container.name` container label. |
| `useHostnameIfPresent` | no | `bool` | If true, the "Config.Hostname" field (if present) of the docker container will be used as the discovered host that is used to configure monitors. If false or if no hostname is configured, the field `NetworkSettings.IPAddress` is used instead. (**default:** `false`) |
| `useHostBindings` | no | `bool` | If true, the observer will configure monitors for matching container endpoints using the host bound ip and port. This is useful if containers exist that are not accessible to an instance of the agent running outside of the docker network stack. (**default:** `false`) |
Expand Down
103 changes: 59 additions & 44 deletions pkg/core/common/docker/containerlist.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
dtypes "github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/filters"
docker "github.com/docker/docker/client"
"github.com/signalfx/signalfx-agent/pkg/utils"
"github.com/signalfx/signalfx-agent/pkg/utils/filter"
log "github.com/sirupsen/logrus"
)
Expand All @@ -20,7 +19,7 @@ import (
type ContainerChangeHandler func(old *dtypes.ContainerJSON, new *dtypes.ContainerJSON)

// ListAndWatchContainers accepts a changeHandler that gets called as containers come and go.
func ListAndWatchContainers(ctx context.Context, client *docker.Client, changeHandler ContainerChangeHandler, imageFilter filter.StringFilter, logger log.FieldLogger) error {
func ListAndWatchContainers(ctx context.Context, client *docker.Client, changeHandler ContainerChangeHandler, imageFilter filter.StringFilter, logger log.FieldLogger, syncTime time.Duration) {
lock := sync.Mutex{}
containers := make(map[string]*dtypes.ContainerJSON)

Expand All @@ -39,10 +38,54 @@ func ListAndWatchContainers(ctx context.Context, client *docker.Client, changeHa
return false
}

watchStarted := make(chan struct{})
// channel to end goroutines when the initial list of containers fails
endRoutine := make(chan struct{})
syncContainerList := func() error {
f := filters.NewArgs()
f.Add("status", "running")
options := dtypes.ContainerListOptions{
Filters: f,
}
containerList, err := client.ContainerList(ctx, options)
if err != nil {
return err
}

type void struct{}
containersMap := make(map[string]void, len(containerList))

wg := sync.WaitGroup{}
for i := range containerList {
// The Docker API has a different return type for list vs. inspect, and
// no way to get the return type of list for individual containers,
// which makes this harder than it should be.
// Add new entries and skip containers that are already cached
if _, ok := containers[containerList[i].ID]; !ok {
wg.Add(1)
go func(id string) {
lock.Lock()
updateContainer(id)
changeHandler(nil, containers[id])
lock.Unlock()
wg.Done()
}(containerList[i].ID)
}
// Map will be used to find the delta
containersMap[containerList[i].ID] = void{}
}
wg.Wait()
// Find stale entries and delete them
for containerID := range containers {
if _, ok := containersMap[containerID]; !ok {
logger.Debugf("Docker container %s no longer exists, removing from cache", containerID)
changeHandler(containers[containerID], nil)
delete(containers, containerID)
}
}
return nil
}

go func() {
refreshTicker := time.NewTicker(syncTime)
defer refreshTicker.Stop()
// This pattern is taken from
// https://github.com/docker/cli/blob/master/cli/command/container/stats.go
f := filters.NewArgs()
Expand All @@ -67,12 +110,20 @@ func ListAndWatchContainers(ctx context.Context, client *docker.Client, changeHa
logger.Infof("Watching for Docker events since %s", since)
eventCh, errCh := client.Events(ctx, options)

if !utils.IsSignalChanClosed(watchStarted) {
close(watchStarted)
err := syncContainerList()
if err != nil {
logger.WithError(err).Error("Error while syncing container cache")
}

for {
select {
case <-refreshTicker.C:
logger.Debugf("sync container cache")
err := syncContainerList()
if err != nil {
logger.WithError(err).Error("Error while periodically syncing container cache")
}

case event := <-eventCh:
lock.Lock()

Expand All @@ -83,8 +134,8 @@ func ListAndWatchContainers(ctx context.Context, client *docker.Client, changeHa
case "destroy":
logger.Debugf("Docker container was destroyed: %s", event.ID)
if _, ok := containers[event.ID]; ok {
delete(containers, event.ID)
changeHandler(containers[event.ID], nil)
delete(containers, event.ID)
}
default:
oldContainer := containers[event.ID]
Expand All @@ -102,10 +153,6 @@ func ListAndWatchContainers(ctx context.Context, client *docker.Client, changeHa
time.Sleep(3 * time.Second)
continue START_STREAM

case <-endRoutine:
logger.Error("Error building the initial container list, ending routine")
return

case <-ctx.Done():
// Event stream is tied to the same context and will quit
// also.
Expand All @@ -114,36 +161,4 @@ func ListAndWatchContainers(ctx context.Context, client *docker.Client, changeHa
}
}
}()

<-watchStarted

f := filters.NewArgs()
f.Add("status", "running")
options := dtypes.ContainerListOptions{
Filters: f,
}
containerList, err := client.ContainerList(ctx, options)
if err != nil {
close(endRoutine)
return err
}

wg := sync.WaitGroup{}
for i := range containerList {
wg.Add(1)
// The Docker API has a different return type for list vs. inspect, and
// no way to get the return type of list for individual containers,
// which makes this harder than it should be.
go func(id string) {
lock.Lock()
updateContainer(id)
changeHandler(nil, containers[id])
lock.Unlock()
wg.Done()
}(containerList[i].ID)
}

wg.Wait()

return nil
}
10 changes: 5 additions & 5 deletions pkg/monitors/docker/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/signalfx/signalfx-agent/pkg/monitors/types"
"github.com/signalfx/signalfx-agent/pkg/utils"
"github.com/signalfx/signalfx-agent/pkg/utils/filter"
"github.com/signalfx/signalfx-agent/pkg/utils/timeutil"
"github.com/sirupsen/logrus"
)

Expand Down Expand Up @@ -49,6 +50,9 @@ type Config struct {
DockerURL string `yaml:"dockerURL" default:"unix:///var/run/docker.sock"`
// The maximum amount of time to wait for docker API requests
TimeoutSeconds int `yaml:"timeoutSeconds" default:"5"`
// The time to wait before resyncing the list of containers the monitor maintains
// through the docker event listener example: cacheSyncInterval: "20m"
CacheSyncInterval timeutil.Duration `yaml:"cacheSyncInterval" default:"60m"`
// A mapping of container label names to dimension names. The corresponding
// label values will become the dimension value for the mapped name. E.g.
// `io.kubernetes.container.name: container_spec_name` would result in a
Expand Down Expand Up @@ -140,11 +144,7 @@ func (m *Monitor) Configure(conf *Config) error {
// Repeat the watch setup in the face of errors in case the docker
// engine is non-responsive when the monitor starts.
if !isRegistered {
err := dockercommon.ListAndWatchContainers(m.ctx, m.client, changeHandler, imageFilter, m.logger)
if err != nil {
m.logger.WithError(err).Error("Could not list docker containers")
return
}
dockercommon.ListAndWatchContainers(m.ctx, m.client, changeHandler, imageFilter, m.logger, conf.CacheSyncInterval.AsDuration())
isRegistered = true
}

Expand Down
11 changes: 6 additions & 5 deletions pkg/observers/docker/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/signalfx/signalfx-agent/pkg/core/config"
"github.com/signalfx/signalfx-agent/pkg/core/services"
"github.com/signalfx/signalfx-agent/pkg/observers"
"github.com/signalfx/signalfx-agent/pkg/utils/timeutil"
)

const (
Expand Down Expand Up @@ -142,6 +143,9 @@ type Docker struct {
type Config struct {
config.ObserverConfig
DockerURL string `yaml:"dockerURL" default:"unix:///var/run/docker.sock"`
// The time to wait before resyncing the list of containers the monitor maintains
// through the docker event listener example: cacheSyncInterval: "20m"
CacheSyncInterval timeutil.Duration `yaml:"cacheSyncInterval" default:"60m"`
// A mapping of container label names to dimension names that will get
// applied to the metrics of all discovered services. The corresponding
// label values will become the dimension values for the mapped name. E.g.
Expand Down Expand Up @@ -187,11 +191,8 @@ func (docker *Docker) Configure(config *Config) error {
var ctx context.Context
ctx, docker.cancel = context.WithCancel(context.Background())

err = dockercommon.ListAndWatchContainers(ctx, client, docker.changeHandler, nil, logger)
if err != nil {
logger.WithError(err).Error("Could not list docker containers")
return err
}
dockercommon.ListAndWatchContainers(ctx, client, docker.changeHandler, nil, logger, config.CacheSyncInterval.AsDuration())

return nil
}

Expand Down
16 changes: 16 additions & 0 deletions selfdescribe.json
Original file line number Diff line number Diff line change
Expand Up @@ -24341,6 +24341,14 @@
"type": "int",
"elementKind": ""
},
{
"yamlName": "cacheSyncInterval",
"doc": "The time to wait before resyncing the list of containers the monitor maintains through the docker event listener example: cacheSyncInterval: \"20m\"",
"default": "60m",
"required": false,
"type": "int64",
"elementKind": ""
},
{
"yamlName": "labelsToDimensions",
"doc": "A mapping of container label names to dimension names. The corresponding label values will become the dimension value for the mapped name. E.g. `io.kubernetes.container.name: container_spec_name` would result in a dimension called `container_spec_name` that has the value of the `io.kubernetes.container.name` container label.",
Expand Down Expand Up @@ -55900,6 +55908,14 @@
"type": "string",
"elementKind": ""
},
{
"yamlName": "cacheSyncInterval",
"doc": "The time to wait before resyncing the list of containers the monitor maintains through the docker event listener example: cacheSyncInterval: \"20m\"",
"default": "60m",
"required": false,
"type": "int64",
"elementKind": ""
},
{
"yamlName": "labelsToDimensions",
"doc": "A mapping of container label names to dimension names that will get applied to the metrics of all discovered services. The corresponding label values will become the dimension values for the mapped name. E.g. `io.kubernetes.container.name: container_spec_name` would result in a dimension called `container_spec_name` that has the value of the `io.kubernetes.container.name` container label.",
Expand Down

0 comments on commit 6269da6

Please sign in to comment.