From f550a01dc2bbc79747d071b3837ee2a4945b8482 Mon Sep 17 00:00:00 2001 From: John Simons Date: Thu, 11 Jul 2024 10:57:16 +1000 Subject: [PATCH] Add internal setting to disable delay delivery This is a workaround to allow ServiceControl to disable delay delivery for non main messagepumps --- .../SdkClientsDisposeTests.cs | 3 ++- .../Configure/SqsTransport.cs | 9 +++++-- .../Configure/SqsTransportInfrastructure.cs | 6 +++-- src/NServiceBus.Transport.SQS/MessagePump.cs | 25 +++++++++++++++---- 4 files changed, 33 insertions(+), 10 deletions(-) diff --git a/src/NServiceBus.Transport.SQS.Tests/SdkClientsDisposeTests.cs b/src/NServiceBus.Transport.SQS.Tests/SdkClientsDisposeTests.cs index 496357502..acee8efec 100644 --- a/src/NServiceBus.Transport.SQS.Tests/SdkClientsDisposeTests.cs +++ b/src/NServiceBus.Transport.SQS.Tests/SdkClientsDisposeTests.cs @@ -60,7 +60,8 @@ public async Task ShouldDisposeSqsAndSnsClients(bool disposeSqs, bool disposeSns "", false, disposeSqs, - disposeSns); + disposeSns, + false); await sut.Shutdown(CancellationToken.None); diff --git a/src/NServiceBus.Transport.SQS/Configure/SqsTransport.cs b/src/NServiceBus.Transport.SQS/Configure/SqsTransport.cs index 51e06bd0c..448b7e60b 100644 --- a/src/NServiceBus.Transport.SQS/Configure/SqsTransport.cs +++ b/src/NServiceBus.Transport.SQS/Configure/SqsTransport.cs @@ -57,6 +57,11 @@ internal set /// public string QueueNamePrefix { get; set; } + /// + /// Disable native delayed delivery infrastructure + /// + internal bool DisableDelayedDelivery { get; set; } = false; + /// /// Specifies a lambda function that allows to take control of the queue name generation logic. /// This is useful to overcome any limitations imposed by SQS. @@ -248,14 +253,14 @@ public override async Task Initialize(HostSettings host AssertQueueNameGeneratorIdempotent(queueNameGenerator); var topicCache = new TopicCache(SnsClient, hostSettings.CoreSettings, eventToTopicsMappings, eventToEventsMappings, topicNameGenerator, topicNamePrefix); - var infra = new SqsTransportInfrastructure(hostSettings, receivers, SqsClient, SnsClient, QueueCache, topicCache, S3, Policies, QueueDelayTime, topicNamePrefix, DoNotWrapOutgoingMessages, !externallyManagedSqsClient, !externallyManagedSnsClient); + var infra = new SqsTransportInfrastructure(hostSettings, receivers, SqsClient, SnsClient, QueueCache, topicCache, S3, Policies, QueueDelayTime, topicNamePrefix, DoNotWrapOutgoingMessages, !externallyManagedSqsClient, !externallyManagedSnsClient, DisableDelayedDelivery); if (hostSettings.SetupInfrastructure) { var queueCreator = new QueueCreator(SqsClient, QueueCache, S3, maxTimeToLive, QueueDelayTime); var createQueueTasks = sendingAddresses.Select(x => queueCreator.CreateQueueIfNecessary(x, false, cancellationToken)) - .Concat(infra.Receivers.Values.Select(x => queueCreator.CreateQueueIfNecessary(x.ReceiveAddress, true, cancellationToken))).ToArray(); + .Concat(infra.Receivers.Values.Select(x => queueCreator.CreateQueueIfNecessary(x.ReceiveAddress, !DisableDelayedDelivery, cancellationToken))).ToArray(); await Task.WhenAll(createQueueTasks).ConfigureAwait(false); } diff --git a/src/NServiceBus.Transport.SQS/Configure/SqsTransportInfrastructure.cs b/src/NServiceBus.Transport.SQS/Configure/SqsTransportInfrastructure.cs index 3a20fb163..19374ed2c 100644 --- a/src/NServiceBus.Transport.SQS/Configure/SqsTransportInfrastructure.cs +++ b/src/NServiceBus.Transport.SQS/Configure/SqsTransportInfrastructure.cs @@ -15,13 +15,14 @@ class SqsTransportInfrastructure : TransportInfrastructure { public SqsTransportInfrastructure(HostSettings hostSettings, ReceiveSettings[] receiverSettings, IAmazonSQS sqsClient, IAmazonSimpleNotificationService snsClient, QueueCache queueCache, TopicCache topicCache, S3Settings s3Settings, PolicySettings policySettings, int queueDelayTimeSeconds, string topicNamePrefix, bool doNotWrapOutgoingMessages, - bool shouldDisposeSqsClient, bool shouldDisposeSnsClient) + bool shouldDisposeSqsClient, bool shouldDisposeSnsClient, bool disableDelayedDelivery) { this.sqsClient = sqsClient; this.snsClient = snsClient; this.queueCache = queueCache; this.shouldDisposeSqsClient = shouldDisposeSqsClient; this.shouldDisposeSnsClient = shouldDisposeSnsClient; + this.disableDelayedDelivery = disableDelayedDelivery; coreSettings = hostSettings.CoreSettings; s3Client = s3Settings?.S3Client; setupInfrastructure = hostSettings.SetupInfrastructure; @@ -42,7 +43,7 @@ IMessageReceiver CreateMessagePump(ReceiveSettings receiveSettings, IAmazonSQS s var receiveAddress = ToTransportAddress(receiveSettings.ReceiveAddress); var subManager = new SubscriptionManager(sqsClient, snsClient, receiveAddress, queueCache, topicCache, policySettings, topicNamePrefix, setupInfrastructure); - return new MessagePump(receiveSettings.Id, receiveAddress, receiveSettings.ErrorQueue, receiveSettings.PurgeOnStartup, sqsClient, queueCache, s3Settings, subManager, queueDelayTimeSeconds, criticalErrorAction, coreSettings, setupInfrastructure); + return new MessagePump(receiveSettings.Id, receiveAddress, receiveSettings.ErrorQueue, receiveSettings.PurgeOnStartup, sqsClient, queueCache, s3Settings, subManager, queueDelayTimeSeconds, criticalErrorAction, coreSettings, setupInfrastructure, disableDelayedDelivery); } public override Task Shutdown(CancellationToken cancellationToken = default) @@ -90,6 +91,7 @@ public override string ToTransportAddress(QueueAddress address) readonly bool setupInfrastructure; readonly bool shouldDisposeSqsClient; readonly bool shouldDisposeSnsClient; + readonly bool disableDelayedDelivery; readonly bool shouldDisposeS3Client; } } \ No newline at end of file diff --git a/src/NServiceBus.Transport.SQS/MessagePump.cs b/src/NServiceBus.Transport.SQS/MessagePump.cs index 529894ca4..db0ab846d 100644 --- a/src/NServiceBus.Transport.SQS/MessagePump.cs +++ b/src/NServiceBus.Transport.SQS/MessagePump.cs @@ -8,6 +8,7 @@ namespace NServiceBus.Transport.SQS class MessagePump : IMessageReceiver { + readonly bool disableDelayedDelivery; readonly InputQueuePump inputQueuePump; readonly DelayedMessagesPump delayedMessagesPump; @@ -23,27 +24,41 @@ public MessagePump( int queueDelayTimeSeconds, Action criticalErrorAction, IReadOnlySettings coreSettings, - bool setupInfrastructure) + bool setupInfrastructure, + bool disableDelayedDelivery) { + this.disableDelayedDelivery = disableDelayedDelivery; inputQueuePump = new InputQueuePump(receiverId, receiveAddress, errorQueueAddress, purgeOnStartup, sqsClient, queueCache, s3Settings, subscriptionManager, criticalErrorAction, coreSettings, setupInfrastructure); - delayedMessagesPump = new DelayedMessagesPump(receiveAddress, sqsClient, queueCache, queueDelayTimeSeconds); + if (!disableDelayedDelivery) + { + delayedMessagesPump = + new DelayedMessagesPump(receiveAddress, sqsClient, queueCache, queueDelayTimeSeconds); + } } public async Task Initialize(PushRuntimeSettings limitations, OnMessage onMessage, OnError onError, CancellationToken cancellationToken = default) { await inputQueuePump.Initialize(limitations, onMessage, onError, cancellationToken).ConfigureAwait(false); - await delayedMessagesPump.Initialize(cancellationToken).ConfigureAwait(false); + if (!disableDelayedDelivery) + { + await delayedMessagesPump.Initialize(cancellationToken).ConfigureAwait(false); + } } public async Task StartReceive(CancellationToken cancellationToken = default) { await inputQueuePump.StartReceive(cancellationToken).ConfigureAwait(false); - delayedMessagesPump.Start(cancellationToken); + if (!disableDelayedDelivery) + { + delayedMessagesPump.Start(cancellationToken); + } } public Task StopReceive(CancellationToken cancellationToken = default) { - var stopDelayed = delayedMessagesPump.Stop(cancellationToken); + var stopDelayed = !disableDelayedDelivery + ? delayedMessagesPump.Stop(cancellationToken) + : Task.CompletedTask; var stopPump = inputQueuePump.StopReceive(cancellationToken); return Task.WhenAll(stopDelayed, stopPump);