From 21b33e1ddf7fa3882d14a6340a84613726b733cd Mon Sep 17 00:00:00 2001 From: Tetsuya Shiota Date: Mon, 6 Sep 2021 16:49:41 +0900 Subject: [PATCH 1/4] update: use SScan instead of SMembers Command --- pkg/backend/redis/task.go | 41 ++++++++++++++++++++++++++------------- 1 file changed, 28 insertions(+), 13 deletions(-) diff --git a/pkg/backend/redis/task.go b/pkg/backend/redis/task.go index dec1c6d..295598a 100644 --- a/pkg/backend/redis/task.go +++ b/pkg/backend/redis/task.go @@ -38,11 +38,11 @@ import ( ) const ( - KB = 1 << 10 - PayloadMaxSizeInKB = 1 - MessageMaxSizeInKB = 1 - HistoryLengthMax = 10 - MaxNameLength = 1024 + KB = 1 << 10 + PayloadMaxSizeInKB = 1 + MessageMaxSizeInKB = 1 + HistoryLengthMax = 10 + MaxNameLength = 1024 ) func (b *Backend) ensureQueueAndWorkerExists(queueUID, workerUID uuid.UUID) (*taskqueue.TaskQueue, *worker.Worker, error) { @@ -162,10 +162,7 @@ func (b *Backend) getTasksByUIDs(queueUID string, taskUIDs []string, filter func } func (b *Backend) getTasks(queueUID string, filter func(*task.Task) bool, lggr zerolog.Logger) ([]*task.Task, error) { - taskUIDs, err := b.Client.SMembers(b.tasksKey(queueUID)).Result() - if err == redis.Nil { - return []*task.Task{}, nil - } + taskUIDs, err := b.allTaskUIDsByQueueUID(b.Client, queueUID) if err != nil { return nil, err } @@ -938,10 +935,7 @@ func (b *Backend) allTasksKeysForDeleteQueue(rds redis.Cmdable, queueUID string) b.deadletterQueueKey(queueUID), b.pendingTaskQueueKey(queueUID), } - taskUIDs, err := rds.SMembers(b.tasksKey(queueUID)).Result() - if err == redis.Nil { - return []string{}, nil - } + taskUIDs, err := b.allTaskUIDsByQueueUID(rds, queueUID) if err != nil { return []string{}, err } @@ -950,3 +944,24 @@ func (b *Backend) allTasksKeysForDeleteQueue(rds redis.Cmdable, queueUID string) } return keysToDelete, nil } + +func (b *Backend) allTaskUIDsByQueueUID(rds redis.Cmdable, queueUID string) ([]string, error) { + var chunkSize = int64(b.ChunkSizeInGet) + var cursor uint64 + var taskUIDs []string + for { + keys, nextCursor, err := rds.SScan(b.tasksKey(queueUID), cursor, "", chunkSize).Result() + if err == redis.Nil { + return []string{}, nil + } + if err != nil { + return []string{}, err + } + taskUIDs = append(taskUIDs, keys...) + cursor = nextCursor + if cursor == 0 { + break + } + } + return taskUIDs, nil +} From 2b99cf7d427f0a8feab8066b7d93c286163f5378 Mon Sep 17 00:00:00 2001 From: Tetsuya Shiota Date: Tue, 7 Sep 2021 17:12:40 +0900 Subject: [PATCH 2/4] update: use UNLINK and chunking keys to delete task keys --- cmd/root.go | 9 +++++---- pkg/backend/config/config.go | 12 +++++++----- pkg/backend/redis/queue.go | 12 ++++++++++-- pkg/backend/redis/redis_test.go | 9 +++++---- pkg/backend/redis/task.go | 10 +++++----- pkg/worker/worker_test.go | 7 ++++--- 6 files changed, 36 insertions(+), 23 deletions(-) diff --git a/cmd/root.go b/cmd/root.go index 82d3e88..72b82a6 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -297,10 +297,11 @@ func mustInitializeQueueBackend() { queueBackend, err = backendfactory.NewBackend(logger, backendconfig.Config{ BackendType: cmdOpts.Backend, Redis: &backendconfig.RedisConfig{ - KeyPrefix: cmdOpts.Redis.KeyPrefix, - Client: cmdOpts.Redis.NewClient(), - Backoff: cmdOpts.Redis.Backoff, - ChunkSizeInGet: cmdOpts.Redis.ChunkSizeInGet, + KeyPrefix: cmdOpts.Redis.KeyPrefix, + Client: cmdOpts.Redis.NewClient(), + Backoff: cmdOpts.Redis.Backoff, + ChunkSizeInGet: cmdOpts.Redis.ChunkSizeInGet, + ChunkSizeInDelete: cmdOpts.Redis.ChunkSizeInDelete, }, }) diff --git a/pkg/backend/config/config.go b/pkg/backend/config/config.go index 1d148ff..66e18ba 100644 --- a/pkg/backend/config/config.go +++ b/pkg/backend/config/config.go @@ -30,10 +30,11 @@ type Config struct { } type RedisConfig struct { - KeyPrefix string - Client *redis.Client - Backoff BackoffConfig - ChunkSizeInGet int + KeyPrefix string + Client *redis.Client + Backoff BackoffConfig + ChunkSizeInGet int + ChunkSizeInDelete int } // TODO: support UniversalOptions @@ -52,7 +53,8 @@ type RedisClientConfig struct { IdleTimeout time.Duration `json:"idleTimeout" yaml:"idleTimeout" default:"5m"` IdleCheckFrequency time.Duration `json:"idleCheckFrequency" yaml:"idleCheckFrequency" default:"1m"` - ChunkSizeInGet int `json:"chunkSizeInGet" yaml:"chunkSizeInGet" default:"10000"` + ChunkSizeInGet int `json:"chunkSizeInGet" yaml:"chunkSizeInGet" default:"10000"` + ChunkSizeInDelete int `json:"chunkSizeInDelete" yaml:"chunkSizeInGet" default:"1000"` } func (c RedisClientConfig) NewClient() *redis.Client { diff --git a/pkg/backend/redis/queue.go b/pkg/backend/redis/queue.go index 44ea134..f9873b3 100644 --- a/pkg/backend/redis/queue.go +++ b/pkg/backend/redis/queue.go @@ -218,7 +218,7 @@ func (b *Backend) DeleteQueue(ctx context.Context, queueName string) error { // .. task_keys = collect task keys // WATCh task_keys // MULTI - // DEL {queue_key} worker_keys task_keys + // UNLINK {queue_key} worker_keys task_keys // HDEL {all_queues_key} {queueName} // EXEC txf := func(tx *redis.Tx) error { @@ -240,8 +240,16 @@ func (b *Backend) DeleteQueue(ctx context.Context, queueName string) error { tx.Watch(taskKeysToDelete...) keysToDelete = append(keysToDelete, taskKeysToDelete...) + chunkSize := b.ChunkSizeInGet + numOfKeysToDelete := len(keysToDelete) _, err = tx.TxPipelined(func(pipe redis.Pipeliner) error { - pipe.Del(keysToDelete...) + for begin := 0; begin < numOfKeysToDelete; begin += chunkSize { + end := begin + chunkSize + if end > numOfKeysToDelete { + end = numOfKeysToDelete + } + pipe.Unlink(keysToDelete[begin:end]...) + } pipe.HDel(b.allQueuesKey(), queue.Spec.Name) return nil }) diff --git a/pkg/backend/redis/redis_test.go b/pkg/backend/redis/redis_test.go index 36ce2c5..d097d55 100644 --- a/pkg/backend/redis/redis_test.go +++ b/pkg/backend/redis/redis_test.go @@ -107,10 +107,11 @@ var _ = Describe("Backend", func() { ibackend, err := NewBackend(logger, backendconfig.Config{ BackendType: "redis", Redis: &backendconfig.RedisConfig{ - KeyPrefix: "test", - Client: client, - Backoff: backoffConfig, - ChunkSizeInGet: 1000, + KeyPrefix: "test", + Client: client, + Backoff: backoffConfig, + ChunkSizeInGet: 1000, + ChunkSizeInDelete: 1000, }, }) Expect(err).NotTo(HaveOccurred()) diff --git a/pkg/backend/redis/task.go b/pkg/backend/redis/task.go index 295598a..e90a6a2 100644 --- a/pkg/backend/redis/task.go +++ b/pkg/backend/redis/task.go @@ -38,11 +38,11 @@ import ( ) const ( - KB = 1 << 10 - PayloadMaxSizeInKB = 1 - MessageMaxSizeInKB = 1 - HistoryLengthMax = 10 - MaxNameLength = 1024 + KB = 1 << 10 + PayloadMaxSizeInKB = 1 + MessageMaxSizeInKB = 1 + HistoryLengthMax = 10 + MaxNameLength = 1024 ) func (b *Backend) ensureQueueAndWorkerExists(queueUID, workerUID uuid.UUID) (*taskqueue.TaskQueue, *worker.Worker, error) { diff --git a/pkg/worker/worker_test.go b/pkg/worker/worker_test.go index e3b67fc..5f32855 100644 --- a/pkg/worker/worker_test.go +++ b/pkg/worker/worker_test.go @@ -74,9 +74,10 @@ var _ = Describe("Worker", func() { bcknd, err = backendfactory.NewBackend(logger, backendconfig.Config{ BackendType: "redis", Redis: &backendconfig.RedisConfig{ - Client: client, - Backoff: backendConfig, - ChunkSizeInGet: 1000, + Client: client, + Backoff: backendConfig, + ChunkSizeInGet: 1000, + ChunkSizeInDelete: 1000, }, }) Expect(err).NotTo(HaveOccurred()) From f04fb8c49fe959479c796a52d0e1b65ef7af2207 Mon Sep 17 00:00:00 2001 From: Tetsuya Shiota Date: Wed, 8 Sep 2021 17:19:29 +0900 Subject: [PATCH 3/4] Apply suggestions from code review Co-authored-by: Shingo Omura --- pkg/backend/config/config.go | 2 +- pkg/backend/redis/queue.go | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/backend/config/config.go b/pkg/backend/config/config.go index 66e18ba..61906e4 100644 --- a/pkg/backend/config/config.go +++ b/pkg/backend/config/config.go @@ -54,7 +54,7 @@ type RedisClientConfig struct { IdleCheckFrequency time.Duration `json:"idleCheckFrequency" yaml:"idleCheckFrequency" default:"1m"` ChunkSizeInGet int `json:"chunkSizeInGet" yaml:"chunkSizeInGet" default:"10000"` - ChunkSizeInDelete int `json:"chunkSizeInDelete" yaml:"chunkSizeInGet" default:"1000"` + ChunkSizeInDelete int `json:"chunkSizeInDelete" yaml:"chunkSizeInDelete" default:"1000"` } func (c RedisClientConfig) NewClient() *redis.Client { diff --git a/pkg/backend/redis/queue.go b/pkg/backend/redis/queue.go index f9873b3..9b085d9 100644 --- a/pkg/backend/redis/queue.go +++ b/pkg/backend/redis/queue.go @@ -218,7 +218,7 @@ func (b *Backend) DeleteQueue(ctx context.Context, queueName string) error { // .. task_keys = collect task keys // WATCh task_keys // MULTI - // UNLINK {queue_key} worker_keys task_keys + // UNLINK {queue_key} worker_keys task_keys (chunked) // HDEL {all_queues_key} {queueName} // EXEC txf := func(tx *redis.Tx) error { @@ -240,7 +240,7 @@ func (b *Backend) DeleteQueue(ctx context.Context, queueName string) error { tx.Watch(taskKeysToDelete...) keysToDelete = append(keysToDelete, taskKeysToDelete...) - chunkSize := b.ChunkSizeInGet + chunkSize := b.ChunkSizeInDelete numOfKeysToDelete := len(keysToDelete) _, err = tx.TxPipelined(func(pipe redis.Pipeliner) error { for begin := 0; begin < numOfKeysToDelete; begin += chunkSize { From f2b8503401d9719b8cf674d5b4c53c79def46ce6 Mon Sep 17 00:00:00 2001 From: Tetsuya Shiota Date: Wed, 8 Sep 2021 17:39:41 +0900 Subject: [PATCH 4/4] add test to delete large queue --- pkg/backend/redis/redis_test.go | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/pkg/backend/redis/redis_test.go b/pkg/backend/redis/redis_test.go index d097d55..1ca3485 100644 --- a/pkg/backend/redis/redis_test.go +++ b/pkg/backend/redis/redis_test.go @@ -278,6 +278,26 @@ var _ = Describe("Backend", func() { Expect(err).To(Equal(iface.TaskQueueNotFound)) }) }) + When("the large queue exists", func() { + It("can delete the queue", func() { + queue := testutil.MustCreateQueue(backend, SampleQueueSpec) + // numOfTasks % chunkSize != 0 && numOfTasks > chunkSize + numOfTasks := 12345 + for i := 0; i < numOfTasks; i++ { + _, err := backend.AddTask(context.Background(), QueueName, SampleTaskSpec) + Expect(err).NotTo(HaveOccurred()) + } + + Expect(backend.DeleteQueue(context.Background(), SampleQueueSpec.Name)).NotTo(HaveOccurred()) + + queuesHash, err := client.HGetAll(backend.allQueuesKey()).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(len(queuesHash)).To(Equal(0)) + keys, err := client.Keys(backend.queueKey(queue.UID.String()) + "*").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(len(keys)).To(Equal(0)) + }) + }) }) })