diff --git a/flytestdlib/cache/auto_refresh.go b/flytestdlib/cache/auto_refresh.go index 252706d27a..bb23ef9369 100644 --- a/flytestdlib/cache/auto_refresh.go +++ b/flytestdlib/cache/auto_refresh.go @@ -118,7 +118,7 @@ type autoRefresh struct { lruMap *lru.Cache // Items that are currently being processed are in the processing set. // It will prevent the same item from being processed multiple times by different workers. - processing *syncSet + processing *sync.Map toDelete *syncSet syncPeriod time.Duration workqueue workqueue.RateLimitingInterface @@ -220,7 +220,7 @@ func (w *autoRefresh) GetOrCreate(id ItemID, item Item) (Item, error) { batch := make([]ItemWrapper, 0, 1) batch = append(batch, itemWrapper{id: id, item: item}) w.workqueue.AddRateLimited(&batch) - w.processing.Insert(id) + w.processing.Store(id, time.Now()) return item, nil } @@ -246,7 +246,7 @@ func (w *autoRefresh) enqueueBatches(ctx context.Context) error { // If not ok, it means evicted between the item was evicted between getting the keys and this update loop // which is fine, we can just ignore. if value, ok := w.lruMap.Peek(k); ok { - if item, ok := value.(Item); !ok || (ok && !item.IsTerminal() && !w.processing.Contains(k)) { + if item, ok := value.(Item); !ok || (ok && !item.IsTerminal() && !w.inProcessing(k)) { snapshot = append(snapshot, itemWrapper{ id: k.(ItemID), item: value.(Item), @@ -264,7 +264,7 @@ func (w *autoRefresh) enqueueBatches(ctx context.Context) error { b := batch w.workqueue.AddRateLimited(&b) for i := 1; i < len(b); i++ { - w.processing.Insert(b[i].GetID()) + w.processing.Store(b[i].GetID(), time.Now()) } } @@ -316,7 +316,7 @@ func (w *autoRefresh) sync(ctx context.Context) (err error) { newBatch := make(Batch, 0, len(*batch.(*Batch))) for _, b := range *batch.(*Batch) { itemID := b.GetID() - w.processing.Remove(itemID) + w.processing.Delete(itemID) item, ok := w.lruMap.Get(itemID) if !ok { logger.Debugf(ctx, "item with id [%v] not found in cache", itemID) @@ -359,6 +359,20 @@ func (w *autoRefresh) sync(ctx context.Context) (err error) { } } +// Checks if the item is currently being processed and returns false if the item has been in processing for too long +func (w *autoRefresh) inProcessing(key interface{}) bool { + item, found := w.processing.Load(key) + if found { + // handle potential race conditions where the item is in processing but not in the workqueue + if timeItem, ok := item.(time.Time); ok && time.Since(timeItem) > (w.syncPeriod*5) { + w.processing.Delete(key) + return false + } + return true + } + return false +} + // Instantiates a new AutoRefresh Cache that syncs items in batches. func NewAutoRefreshBatchedCache(name string, createBatches CreateBatchesFunc, syncCb SyncFunc, syncRateLimiter workqueue.RateLimiter, resyncPeriod time.Duration, parallelizm, size int, scope promutils.Scope) (AutoRefresh, error) { @@ -376,7 +390,7 @@ func NewAutoRefreshBatchedCache(name string, createBatches CreateBatchesFunc, sy createBatchesCb: createBatches, syncCb: syncCb, lruMap: lruCache, - processing: newSyncSet(), + processing: &sync.Map{}, toDelete: newSyncSet(), syncPeriod: resyncPeriod, workqueue: workqueue.NewNamedRateLimitingQueue(syncRateLimiter, scope.CurrentScope()), diff --git a/flytestdlib/cache/auto_refresh_test.go b/flytestdlib/cache/auto_refresh_test.go index 7707b593ff..ca36b54e08 100644 --- a/flytestdlib/cache/auto_refresh_test.go +++ b/flytestdlib/cache/auto_refresh_test.go @@ -210,3 +210,22 @@ func TestQueueBuildUp(t *testing.T) { time.Sleep(5 * time.Second) assert.Equal(t, int32(size), syncCount.Load()) } + +func TestInProcessing(t *testing.T) { + + syncPeriod := time.Millisecond + cache := &autoRefresh{ + processing: &sync.Map{}, + syncPeriod: syncPeriod, + } + + assert.False(t, cache.inProcessing("test")) + + cache.processing.Store("test", time.Now()) + assert.True(t, cache.inProcessing("test")) + + cache.processing.Store("test1", time.Now().Add(syncPeriod*-11)) + assert.False(t, cache.inProcessing("test1")) + _, found := cache.processing.Load("test1") + assert.False(t, found) +}