From 988b60710d4fd8fc9524e851def328bd35bc175b Mon Sep 17 00:00:00 2001 From: Absorbing Date: Fri, 9 Feb 2024 18:02:04 +0000 Subject: [PATCH] Add delay capability to rejections for Redis --- pkg/redis/RedisConsumer.php | 35 +++++++++++++++++++++++++++++++---- 1 file changed, 31 insertions(+), 4 deletions(-) diff --git a/pkg/redis/RedisConsumer.php b/pkg/redis/RedisConsumer.php index 9c93b642a..eb4324637 100644 --- a/pkg/redis/RedisConsumer.php +++ b/pkg/redis/RedisConsumer.php @@ -28,6 +28,11 @@ class RedisConsumer implements Consumer */ private $redeliveryDelay = 300; + /** + * @var int + */ + private $deliveryDelay; + public function __construct(RedisContext $context, RedisDestination $queue) { $this->context = $context; @@ -47,6 +52,19 @@ public function setRedeliveryDelay(int $delay): void $this->redeliveryDelay = $delay; } + /** + * @return int + */ + public function getDeliveryDelay(): ?int + { + return $this->deliveryDelay; + } + + public function setDeliveryDelay(int $deliveryDelay): void + { + $this->deliveryDelay = $deliveryDelay; + } + /** * @return RedisDestination */ @@ -92,7 +110,7 @@ public function acknowledge(Message $message): void /** * @param RedisMessage $message */ - public function reject(Message $message, bool $requeue = false): void + public function reject(Message $message, bool $requeue = false, $delay = 0): void { InvalidMessageException::assertMessageInstanceOf($message, RedisMessage::class); @@ -102,13 +120,22 @@ public function reject(Message $message, bool $requeue = false): void $message = $this->getContext()->getSerializer()->toMessage($message->getReservedKey()); $message->setRedelivered(true); + if (null !== $this->deliveryDelay && null === $message->getDeliveryDelay()) { + $message->setDeliveryDelay($this->deliveryDelay); + } + if ($message->getTimeToLive()) { $message->setHeader('expires_at', time() + $message->getTimeToLive()); } - $payload = $this->getContext()->getSerializer()->toString($message); - - $this->getRedis()->lpush($this->queue->getName(), $payload); + if($message->getDeliveryDelay()) { + $deliveryAt = time() + $message->getDeliveryDelay() / 1000; + $payload = $this->getContext()->getSerializer()->toString($message); + $this->getRedis()->zadd($this->queue->getName().':delayed', $payload, $deliveryAt); + } else { + $payload = $this->getContext()->getSerializer()->toString($message); + $this->getRedis()->lpush($this->queue->getName(), $payload); + } } }