Skip to content

Commit

Permalink
address comment
Browse files Browse the repository at this point in the history
Signed-off-by: Kevin Su <[email protected]>
  • Loading branch information
pingsutw committed Oct 4, 2023
1 parent c58fc33 commit e198282
Showing 1 changed file with 10 additions and 9 deletions.
19 changes: 10 additions & 9 deletions flytestdlib/cache/auto_refresh.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,20 +178,20 @@ func (w *autoRefresh) Start(ctx context.Context) error {
// Update updates the item only if it exists in the cache, return true if we updated the item.
func (w *autoRefresh) Update(id ItemID, item Item) (ok bool) {
w.lock.Lock()
defer w.lock.Unlock()
ok = w.lruMap.Contains(id)
if ok {
w.lruMap.Add(id, item)
}
w.lock.Unlock()
return ok
}

// Delete deletes the item from the cache if it exists.
func (w *autoRefresh) Delete(key interface{}) {
w.lock.Lock()
defer w.lock.Unlock()
w.toDelete.Remove(key)
w.lruMap.Remove(key)
w.lock.Unlock()
}

func (w *autoRefresh) Get(id ItemID) (Item, error) {
Expand Down Expand Up @@ -293,21 +293,21 @@ func (w *autoRefresh) sync(ctx context.Context) (err error) {
case <-ctx.Done():
return nil
default:
item, shutdown := w.workqueue.Get()
batch, shutdown := w.workqueue.Get()
if shutdown {
logger.Debugf(ctx, "Shutting down worker")
return nil
}

// Since we create batches every time we sync, we will just remove the item from the queue here
// regardless of whether it succeeded the sync or not.
w.workqueue.Forget(item)
w.workqueue.Done(item)
w.workqueue.Forget(batch)
w.workqueue.Done(batch)

t := w.metrics.SyncLatency.Start()
batch := *item.(*Batch)
if len(batch) == 1 {
itemID := batch[0].GetID()
newBatch := make(Batch, 0, len(*batch.(*Batch)))
for _, b := range *batch.(*Batch) {
itemID := b.GetID()
item, ok := w.lruMap.Get(itemID)
if !ok {
logger.Debugf(ctx, "item with id [%v] not found in cache", itemID)
Expand All @@ -319,9 +319,10 @@ func (w *autoRefresh) sync(ctx context.Context) (err error) {
t.Stop()
continue
}
newBatch = append(newBatch, b)
}

updatedBatch, err := w.syncCb(ctx, *item.(*Batch))
updatedBatch, err := w.syncCb(ctx, newBatch)

if err != nil {
w.metrics.SyncErrors.Inc()
Expand Down

0 comments on commit e198282

Please sign in to comment.