Skip to content

Commit

Permalink
update: use SScan instead of SMembers Command
Browse files Browse the repository at this point in the history
  • Loading branch information
shioshiota committed Sep 7, 2021
1 parent 79c2f88 commit 21b33e1
Showing 1 changed file with 28 additions and 13 deletions.
41 changes: 28 additions & 13 deletions pkg/backend/redis/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,11 @@ import (
)

const (
KB = 1 << 10
PayloadMaxSizeInKB = 1
MessageMaxSizeInKB = 1
HistoryLengthMax = 10
MaxNameLength = 1024
KB = 1 << 10
PayloadMaxSizeInKB = 1
MessageMaxSizeInKB = 1
HistoryLengthMax = 10
MaxNameLength = 1024
)

func (b *Backend) ensureQueueAndWorkerExists(queueUID, workerUID uuid.UUID) (*taskqueue.TaskQueue, *worker.Worker, error) {
Expand Down Expand Up @@ -162,10 +162,7 @@ func (b *Backend) getTasksByUIDs(queueUID string, taskUIDs []string, filter func
}

func (b *Backend) getTasks(queueUID string, filter func(*task.Task) bool, lggr zerolog.Logger) ([]*task.Task, error) {
taskUIDs, err := b.Client.SMembers(b.tasksKey(queueUID)).Result()
if err == redis.Nil {
return []*task.Task{}, nil
}
taskUIDs, err := b.allTaskUIDsByQueueUID(b.Client, queueUID)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -938,10 +935,7 @@ func (b *Backend) allTasksKeysForDeleteQueue(rds redis.Cmdable, queueUID string)
b.deadletterQueueKey(queueUID),
b.pendingTaskQueueKey(queueUID),
}
taskUIDs, err := rds.SMembers(b.tasksKey(queueUID)).Result()
if err == redis.Nil {
return []string{}, nil
}
taskUIDs, err := b.allTaskUIDsByQueueUID(rds, queueUID)
if err != nil {
return []string{}, err
}
Expand All @@ -950,3 +944,24 @@ func (b *Backend) allTasksKeysForDeleteQueue(rds redis.Cmdable, queueUID string)
}
return keysToDelete, nil
}

func (b *Backend) allTaskUIDsByQueueUID(rds redis.Cmdable, queueUID string) ([]string, error) {
var chunkSize = int64(b.ChunkSizeInGet)
var cursor uint64
var taskUIDs []string
for {
keys, nextCursor, err := rds.SScan(b.tasksKey(queueUID), cursor, "", chunkSize).Result()
if err == redis.Nil {
return []string{}, nil
}
if err != nil {
return []string{}, err
}
taskUIDs = append(taskUIDs, keys...)
cursor = nextCursor
if cursor == 0 {
break
}
}
return taskUIDs, nil
}

0 comments on commit 21b33e1

Please sign in to comment.