Skip to content

Commit

Permalink
Merge pull request #379 from SOF3/discovery-cache-timeout
Browse files Browse the repository at this point in the history
fix(discovery): remove the unnecessary 5-second wait when another goroutine is doing resync
  • Loading branch information
SOF3 authored Jan 9, 2025
2 parents 59823e0 + f753491 commit 8868298
Showing 1 changed file with 64 additions and 21 deletions.
85 changes: 64 additions & 21 deletions pkg/k8s/discovery/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,8 @@ type discoveryCache struct {
ctx context.Context //nolint:containedctx
hasCtx chan struct{}

ResyncMetric *metrics.Metric[*resyncMetric]
ResyncMetric *metrics.Metric[*resyncMetric]
LookupErrorMetric *metrics.Metric[*lookupErrorMetric]

clusters sync.Map
}
Expand All @@ -91,14 +92,16 @@ type clusterDiscoveryCache struct {
initLock sync.Once
initErr error

options *discoveryOptions
logger logrus.FieldLogger
clock clock.Clock
resyncMetric metrics.TaggedMetric
client k8s.Client
options *discoveryOptions
logger logrus.FieldLogger
clock clock.Clock
resyncMetric metrics.TaggedMetric
lookupErrorMetric *metrics.Metric[*lookupErrorMetric]
client k8s.Client

resyncRequestCh chan struct{}
onResyncCh []chan<- struct{}
onResyncCh []chan<- struct{}
resyncCv *sync.Cond
isDoingResync bool

dataLock sync.RWMutex
gvrDetails GvrDetails
Expand All @@ -112,6 +115,14 @@ type resyncMetric struct {

func (*resyncMetric) MetricName() string { return "discovery_resync" }

type lookupErrorMetric struct {
Cluster string
Type string
Query string
}

func (*lookupErrorMetric) MetricName() string { return "discovery_lookup_error" }

func (dc *discoveryCache) Options() manager.Options { return &dc.options }

func (dc *discoveryCache) Init() error { return nil }
Expand All @@ -125,7 +136,7 @@ func (dc *discoveryCache) Start(ctx context.Context) error {
func (dc *discoveryCache) Close(ctx context.Context) error { return nil }

func (dc *discoveryCache) ForCluster(cluster string) (ClusterDiscoveryCache, error) {
cacheAny, _ := dc.clusters.LoadOrStore(cluster, &clusterDiscoveryCache{})
cacheAny, _ := dc.clusters.LoadOrStore(cluster, &clusterDiscoveryCache{resyncCv: sync.NewCond(&sync.Mutex{})})
cdc := cacheAny.(*clusterDiscoveryCache)
// no matter we loaded or stored, someone has to initialize it the first time.
cdc.initLock.Do(func() {
Expand All @@ -135,7 +146,7 @@ func (dc *discoveryCache) ForCluster(cluster string) (ClusterDiscoveryCache, err
cdc.resyncMetric = dc.ResyncMetric.With(&resyncMetric{
Cluster: cluster,
})
cdc.resyncRequestCh = make(chan struct{}, 1)
cdc.lookupErrorMetric = dc.LookupErrorMetric
cdc.client, cdc.initErr = dc.Clients.Cluster(cluster)
if cdc.initErr != nil {
return
Expand All @@ -154,20 +165,41 @@ func (cdc *clusterDiscoveryCache) run(ctx context.Context) {
defer shutdown.RecoverPanic(cdc.logger)

for {
if err := cdc.doResync(); err != nil {
cdc.logger.Error(err)
}
cdc.doResync()

select {
case <-ctx.Done():
return
case <-cdc.clock.After(cdc.options.resyncInterval):
case <-cdc.resyncRequestCh:
}
}
}

func (cdc *clusterDiscoveryCache) doResync() error {
func (cdc *clusterDiscoveryCache) doResync() {
cdc.resyncCv.L.Lock()
defer cdc.resyncCv.L.Unlock()

if cdc.isDoingResync {
// wait for isDoingResync to be set to false by other goroutines
for cdc.isDoingResync {
cdc.resyncCv.Wait()
}

return
}

cdc.isDoingResync = true
defer func() {
cdc.isDoingResync = false
cdc.resyncCv.Broadcast()
}()

if err := cdc.tryDoResync(); err != nil {
cdc.logger.Error(err)
}
}

func (cdc *clusterDiscoveryCache) tryDoResync() error {
defer cdc.resyncMetric.DeferCount(cdc.clock.Now())

// TODO also sync non-target clusters
Expand Down Expand Up @@ -267,6 +299,14 @@ func (cdc *clusterDiscoveryCache) LookupKind(gvr schema.GroupVersionResource) (s
cdc.dataLock.RLock()
gvk, exists = cdc.gvrToGvk[gvr]
cdc.dataLock.RUnlock()

if !exists {
cdc.lookupErrorMetric.With(&lookupErrorMetric{
Cluster: cdc.client.ClusterName(),
Type: "LookupKind",
Query: gvr.String(),
})
}
}

return gvk, exists
Expand All @@ -287,18 +327,21 @@ func (cdc *clusterDiscoveryCache) LookupResource(gvk schema.GroupVersionKind) (s
cdc.dataLock.RLock()
gvr, exists = cdc.gvkToGvr[gvk]
cdc.dataLock.RUnlock()

if !exists {
cdc.lookupErrorMetric.With(&lookupErrorMetric{
Cluster: cdc.client.ClusterName(),
Type: "LookupResource",
Query: gvk.String(),
})
}
}

return gvr, exists
}

func (cdc *clusterDiscoveryCache) RequestAndWaitResync() {
select {
case cdc.resyncRequestCh <- struct{}{}:
default:
}

cdc.clock.Sleep(time.Second * 5) // FIXME: notify resync completion from doResync
cdc.doResync()
}

func (cdc *clusterDiscoveryCache) AddResyncHandler() <-chan struct{} {
Expand Down

0 comments on commit 8868298

Please sign in to comment.