Skip to content

Commit

Permalink
Merge pull request #2519 from Particular/john/del
Browse files Browse the repository at this point in the history
Add internal property to disable delayed delivery
  • Loading branch information
jpalac authored Jul 11, 2024
2 parents dab74cb + f550a01 commit 2e8168c
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ public async Task ShouldDisposeSqsAndSnsClients(bool disposeSqs, bool disposeSns
"",
false,
disposeSqs,
disposeSns);
disposeSns,
false);

await sut.Shutdown(CancellationToken.None);

Expand Down
9 changes: 7 additions & 2 deletions src/NServiceBus.Transport.SQS/Configure/SqsTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,11 @@ internal set
/// </summary>
public string QueueNamePrefix { get; set; }

/// <summary>
/// Disable native delayed delivery infrastructure
/// </summary>
internal bool DisableDelayedDelivery { get; set; } = false;

/// <summary>
/// Specifies a lambda function that allows to take control of the queue name generation logic.
/// This is useful to overcome any limitations imposed by SQS.
Expand Down Expand Up @@ -248,14 +253,14 @@ public override async Task<TransportInfrastructure> Initialize(HostSettings host
AssertQueueNameGeneratorIdempotent(queueNameGenerator);

var topicCache = new TopicCache(SnsClient, hostSettings.CoreSettings, eventToTopicsMappings, eventToEventsMappings, topicNameGenerator, topicNamePrefix);
var infra = new SqsTransportInfrastructure(hostSettings, receivers, SqsClient, SnsClient, QueueCache, topicCache, S3, Policies, QueueDelayTime, topicNamePrefix, DoNotWrapOutgoingMessages, !externallyManagedSqsClient, !externallyManagedSnsClient);
var infra = new SqsTransportInfrastructure(hostSettings, receivers, SqsClient, SnsClient, QueueCache, topicCache, S3, Policies, QueueDelayTime, topicNamePrefix, DoNotWrapOutgoingMessages, !externallyManagedSqsClient, !externallyManagedSnsClient, DisableDelayedDelivery);

if (hostSettings.SetupInfrastructure)
{
var queueCreator = new QueueCreator(SqsClient, QueueCache, S3, maxTimeToLive, QueueDelayTime);

var createQueueTasks = sendingAddresses.Select(x => queueCreator.CreateQueueIfNecessary(x, false, cancellationToken))
.Concat(infra.Receivers.Values.Select(x => queueCreator.CreateQueueIfNecessary(x.ReceiveAddress, true, cancellationToken))).ToArray();
.Concat(infra.Receivers.Values.Select(x => queueCreator.CreateQueueIfNecessary(x.ReceiveAddress, !DisableDelayedDelivery, cancellationToken))).ToArray();

await Task.WhenAll(createQueueTasks).ConfigureAwait(false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,14 @@ class SqsTransportInfrastructure : TransportInfrastructure
{
public SqsTransportInfrastructure(HostSettings hostSettings, ReceiveSettings[] receiverSettings, IAmazonSQS sqsClient,
IAmazonSimpleNotificationService snsClient, QueueCache queueCache, TopicCache topicCache, S3Settings s3Settings, PolicySettings policySettings, int queueDelayTimeSeconds, string topicNamePrefix, bool doNotWrapOutgoingMessages,
bool shouldDisposeSqsClient, bool shouldDisposeSnsClient)
bool shouldDisposeSqsClient, bool shouldDisposeSnsClient, bool disableDelayedDelivery)
{
this.sqsClient = sqsClient;
this.snsClient = snsClient;
this.queueCache = queueCache;
this.shouldDisposeSqsClient = shouldDisposeSqsClient;
this.shouldDisposeSnsClient = shouldDisposeSnsClient;
this.disableDelayedDelivery = disableDelayedDelivery;
coreSettings = hostSettings.CoreSettings;
s3Client = s3Settings?.S3Client;
setupInfrastructure = hostSettings.SetupInfrastructure;
Expand All @@ -42,7 +43,7 @@ IMessageReceiver CreateMessagePump(ReceiveSettings receiveSettings, IAmazonSQS s
var receiveAddress = ToTransportAddress(receiveSettings.ReceiveAddress);
var subManager = new SubscriptionManager(sqsClient, snsClient, receiveAddress, queueCache, topicCache, policySettings, topicNamePrefix, setupInfrastructure);

return new MessagePump(receiveSettings.Id, receiveAddress, receiveSettings.ErrorQueue, receiveSettings.PurgeOnStartup, sqsClient, queueCache, s3Settings, subManager, queueDelayTimeSeconds, criticalErrorAction, coreSettings, setupInfrastructure);
return new MessagePump(receiveSettings.Id, receiveAddress, receiveSettings.ErrorQueue, receiveSettings.PurgeOnStartup, sqsClient, queueCache, s3Settings, subManager, queueDelayTimeSeconds, criticalErrorAction, coreSettings, setupInfrastructure, disableDelayedDelivery);
}

public override Task Shutdown(CancellationToken cancellationToken = default)
Expand Down Expand Up @@ -90,6 +91,7 @@ public override string ToTransportAddress(QueueAddress address)
readonly bool setupInfrastructure;
readonly bool shouldDisposeSqsClient;
readonly bool shouldDisposeSnsClient;
readonly bool disableDelayedDelivery;
readonly bool shouldDisposeS3Client;
}
}
25 changes: 20 additions & 5 deletions src/NServiceBus.Transport.SQS/MessagePump.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ namespace NServiceBus.Transport.SQS

class MessagePump : IMessageReceiver
{
readonly bool disableDelayedDelivery;
readonly InputQueuePump inputQueuePump;
readonly DelayedMessagesPump delayedMessagesPump;

Expand All @@ -23,27 +24,41 @@ public MessagePump(
int queueDelayTimeSeconds,
Action<string, Exception, CancellationToken> criticalErrorAction,
IReadOnlySettings coreSettings,
bool setupInfrastructure)
bool setupInfrastructure,
bool disableDelayedDelivery)
{
this.disableDelayedDelivery = disableDelayedDelivery;
inputQueuePump = new InputQueuePump(receiverId, receiveAddress, errorQueueAddress, purgeOnStartup, sqsClient, queueCache, s3Settings, subscriptionManager, criticalErrorAction, coreSettings, setupInfrastructure);
delayedMessagesPump = new DelayedMessagesPump(receiveAddress, sqsClient, queueCache, queueDelayTimeSeconds);
if (!disableDelayedDelivery)
{
delayedMessagesPump =
new DelayedMessagesPump(receiveAddress, sqsClient, queueCache, queueDelayTimeSeconds);
}
}

public async Task Initialize(PushRuntimeSettings limitations, OnMessage onMessage, OnError onError, CancellationToken cancellationToken = default)
{
await inputQueuePump.Initialize(limitations, onMessage, onError, cancellationToken).ConfigureAwait(false);
await delayedMessagesPump.Initialize(cancellationToken).ConfigureAwait(false);
if (!disableDelayedDelivery)
{
await delayedMessagesPump.Initialize(cancellationToken).ConfigureAwait(false);
}
}

public async Task StartReceive(CancellationToken cancellationToken = default)
{
await inputQueuePump.StartReceive(cancellationToken).ConfigureAwait(false);
delayedMessagesPump.Start(cancellationToken);
if (!disableDelayedDelivery)
{
delayedMessagesPump.Start(cancellationToken);
}
}

public Task StopReceive(CancellationToken cancellationToken = default)
{
var stopDelayed = delayedMessagesPump.Stop(cancellationToken);
var stopDelayed = !disableDelayedDelivery
? delayedMessagesPump.Stop(cancellationToken)
: Task.CompletedTask;
var stopPump = inputQueuePump.StopReceive(cancellationToken);

return Task.WhenAll(stopDelayed, stopPump);
Expand Down

0 comments on commit 2e8168c

Please sign in to comment.