Skip to content

Commit

Permalink
update: use SScan instead of SMembers Command
Browse files Browse the repository at this point in the history
  • Loading branch information
shioshiota committed Sep 7, 2021
1 parent 79c2f88 commit d3e5936
Showing 1 changed file with 28 additions and 13 deletions.
41 changes: 28 additions & 13 deletions pkg/backend/redis/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,12 @@ import (
)

const (
KB = 1 << 10
PayloadMaxSizeInKB = 1
MessageMaxSizeInKB = 1
HistoryLengthMax = 10
MaxNameLength = 1024
KB = 1 << 10
PayloadMaxSizeInKB = 1
MessageMaxSizeInKB = 1
HistoryLengthMax = 10
MaxNameLength = 1024
taskOperationChunkSize = 1024
)

func (b *Backend) ensureQueueAndWorkerExists(queueUID, workerUID uuid.UUID) (*taskqueue.TaskQueue, *worker.Worker, error) {
Expand Down Expand Up @@ -162,10 +163,7 @@ func (b *Backend) getTasksByUIDs(queueUID string, taskUIDs []string, filter func
}

func (b *Backend) getTasks(queueUID string, filter func(*task.Task) bool, lggr zerolog.Logger) ([]*task.Task, error) {
taskUIDs, err := b.Client.SMembers(b.tasksKey(queueUID)).Result()
if err == redis.Nil {
return []*task.Task{}, nil
}
taskUIDs, err := b.allTaskUIDsByQueueUID(b.Client, queueUID)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -938,10 +936,7 @@ func (b *Backend) allTasksKeysForDeleteQueue(rds redis.Cmdable, queueUID string)
b.deadletterQueueKey(queueUID),
b.pendingTaskQueueKey(queueUID),
}
taskUIDs, err := rds.SMembers(b.tasksKey(queueUID)).Result()
if err == redis.Nil {
return []string{}, nil
}
taskUIDs, err := b.allTaskUIDsByQueueUID(rds, queueUID)
if err != nil {
return []string{}, err
}
Expand All @@ -950,3 +945,23 @@ func (b *Backend) allTasksKeysForDeleteQueue(rds redis.Cmdable, queueUID string)
}
return keysToDelete, nil
}

func (b *Backend) allTaskUIDsByQueueUID(rds redis.Cmdable, queueUID string) ([]string, error) {
var cursor uint64
var taskUIDs []string
for {
keys, nextCursor, err := rds.SScan(b.tasksKey(queueUID), cursor, "", taskOperationChunkSize).Result()
if err == redis.Nil {
return []string{}, nil
}
if err != nil {
return []string{}, err
}
taskUIDs = append(taskUIDs, keys...)
cursor = nextCursor
if cursor == 0 {
break
}
}
return taskUIDs, nil
}

0 comments on commit d3e5936

Please sign in to comment.