diff --git a/pkg/k8s/discovery/discovery.go b/pkg/k8s/discovery/discovery.go index 04159a8a..5b1c6c91 100644 --- a/pkg/k8s/discovery/discovery.go +++ b/pkg/k8s/discovery/discovery.go @@ -82,7 +82,8 @@ type discoveryCache struct { ctx context.Context //nolint:containedctx hasCtx chan struct{} - ResyncMetric *metrics.Metric[*resyncMetric] + ResyncMetric *metrics.Metric[*resyncMetric] + LookupErrorMetric *metrics.Metric[*lookupErrorMetric] clusters sync.Map } @@ -91,14 +92,16 @@ type clusterDiscoveryCache struct { initLock sync.Once initErr error - options *discoveryOptions - logger logrus.FieldLogger - clock clock.Clock - resyncMetric metrics.TaggedMetric - client k8s.Client + options *discoveryOptions + logger logrus.FieldLogger + clock clock.Clock + resyncMetric metrics.TaggedMetric + lookupErrorMetric *metrics.Metric[*lookupErrorMetric] + client k8s.Client - resyncRequestCh chan struct{} - onResyncCh []chan<- struct{} + onResyncCh []chan<- struct{} + resyncCv *sync.Cond + isDoingResync bool dataLock sync.RWMutex gvrDetails GvrDetails @@ -112,6 +115,14 @@ type resyncMetric struct { func (*resyncMetric) MetricName() string { return "discovery_resync" } +type lookupErrorMetric struct { + Cluster string + Type string + Query string +} + +func (*lookupErrorMetric) MetricName() string { return "discovery_lookup_error" } + func (dc *discoveryCache) Options() manager.Options { return &dc.options } func (dc *discoveryCache) Init() error { return nil } @@ -125,7 +136,7 @@ func (dc *discoveryCache) Start(ctx context.Context) error { func (dc *discoveryCache) Close(ctx context.Context) error { return nil } func (dc *discoveryCache) ForCluster(cluster string) (ClusterDiscoveryCache, error) { - cacheAny, _ := dc.clusters.LoadOrStore(cluster, &clusterDiscoveryCache{}) + cacheAny, _ := dc.clusters.LoadOrStore(cluster, &clusterDiscoveryCache{resyncCv: sync.NewCond(&sync.Mutex{})}) cdc := cacheAny.(*clusterDiscoveryCache) // no matter we loaded or stored, someone has to initialize it the first time. cdc.initLock.Do(func() { @@ -135,7 +146,7 @@ func (dc *discoveryCache) ForCluster(cluster string) (ClusterDiscoveryCache, err cdc.resyncMetric = dc.ResyncMetric.With(&resyncMetric{ Cluster: cluster, }) - cdc.resyncRequestCh = make(chan struct{}, 1) + cdc.lookupErrorMetric = dc.LookupErrorMetric cdc.client, cdc.initErr = dc.Clients.Cluster(cluster) if cdc.initErr != nil { return @@ -154,20 +165,41 @@ func (cdc *clusterDiscoveryCache) run(ctx context.Context) { defer shutdown.RecoverPanic(cdc.logger) for { - if err := cdc.doResync(); err != nil { - cdc.logger.Error(err) - } + cdc.doResync() select { case <-ctx.Done(): return case <-cdc.clock.After(cdc.options.resyncInterval): - case <-cdc.resyncRequestCh: } } } -func (cdc *clusterDiscoveryCache) doResync() error { +func (cdc *clusterDiscoveryCache) doResync() { + cdc.resyncCv.L.Lock() + defer cdc.resyncCv.L.Unlock() + + if cdc.isDoingResync { + // wait for isDoingResync to be set to false by other goroutines + for cdc.isDoingResync { + cdc.resyncCv.Wait() + } + + return + } + + cdc.isDoingResync = true + defer func() { + cdc.isDoingResync = false + cdc.resyncCv.Broadcast() + }() + + if err := cdc.tryDoResync(); err != nil { + cdc.logger.Error(err) + } +} + +func (cdc *clusterDiscoveryCache) tryDoResync() error { defer cdc.resyncMetric.DeferCount(cdc.clock.Now()) // TODO also sync non-target clusters @@ -267,6 +299,14 @@ func (cdc *clusterDiscoveryCache) LookupKind(gvr schema.GroupVersionResource) (s cdc.dataLock.RLock() gvk, exists = cdc.gvrToGvk[gvr] cdc.dataLock.RUnlock() + + if !exists { + cdc.lookupErrorMetric.With(&lookupErrorMetric{ + Cluster: cdc.client.ClusterName(), + Type: "LookupKind", + Query: gvr.String(), + }) + } } return gvk, exists @@ -287,18 +327,21 @@ func (cdc *clusterDiscoveryCache) LookupResource(gvk schema.GroupVersionKind) (s cdc.dataLock.RLock() gvr, exists = cdc.gvkToGvr[gvk] cdc.dataLock.RUnlock() + + if !exists { + cdc.lookupErrorMetric.With(&lookupErrorMetric{ + Cluster: cdc.client.ClusterName(), + Type: "LookupResource", + Query: gvk.String(), + }) + } } return gvr, exists } func (cdc *clusterDiscoveryCache) RequestAndWaitResync() { - select { - case cdc.resyncRequestCh <- struct{}{}: - default: - } - - cdc.clock.Sleep(time.Second * 5) // FIXME: notify resync completion from doResync + cdc.doResync() } func (cdc *clusterDiscoveryCache) AddResyncHandler() <-chan struct{} {