Skip to content

Commit

Permalink
Fix delayed message table bug in original implementation (#865)
Browse files Browse the repository at this point in the history
* Fix delayed message table bug

* More uses of DateTimeOffset
  • Loading branch information
DavidBoike authored Jun 23, 2021
1 parent c8a8f88 commit 00a3c3b
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 17 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
namespace NServiceBus.AcceptanceTests.DelayedDelivery
{
using System;
using System.Threading.Tasks;
using AcceptanceTesting;
using EndpointTemplates;
using Features;
using NUnit.Framework;

public class When_deferring_multiple_messages : NServiceBusAcceptanceTest
{
[Test]
public async Task Adaptive_polling_should_work()
{
var delay = TimeSpan.FromSeconds(2);
var longDelay = TimeSpan.FromDays(1);

var context = await Scenario.Define<Context>()
.WithEndpoint<Endpoint>(b => b.When(async (session, c) =>
{
var longOptions = new SendOptions();
longOptions.DelayDeliveryWith(longDelay);
longOptions.RouteToThisEndpoint();
await session.Send(new MyMessage { Which = "Long" }, longOptions);
var options = new SendOptions();
options.DelayDeliveryWith(delay);
options.RouteToThisEndpoint();
await session.Send(new MyMessage { Which = "Short" }, options);
}))
.Done(c => c.WasCalled)
.Run();

Assert.True(context.WasCalled);
Assert.AreEqual("Short", context.WhichWasCalled);
}

public class Context : ScenarioContext
{
public bool WasCalled { get; set; }
public string WhichWasCalled { get; set; }
}

public class Endpoint : EndpointConfigurationBuilder
{
public Endpoint()
{
EndpointSetup<DefaultServer>(config => config.EnableFeature<TimeoutManager>());
}

public class MyMessageHandler : IHandleMessages<MyMessage>
{
public MyMessageHandler(Context testContext)
{
this.testContext = testContext;
}

public Task Handle(MyMessage message, IMessageHandlerContext context)
{
testContext.WasCalled = true;
testContext.WhichWasCalled = message.Which;
return Task.FromResult(0);
}

Context testContext;
}
}

public class MyMessage : IMessage
{
public string Which { get; set; }
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,11 @@ class DelayedMessageTable : IDelayedMessageStore
{
public DelayedMessageTable(string delayedQueueTable, string inputQueueTable)
{
#pragma warning disable 618
storeCommand = string.Format(SqlConstants.StoreDelayedMessageText, delayedQueueTable);
moveDueCommand = string.Format(SqlConstants.MoveDueDelayedMessageText, delayedQueueTable, inputQueueTable);
#pragma warning restore 618
}

public event EventHandler<DateTimeOffset> OnStoreDelayedMessage;
public event EventHandler<DateTime> OnStoreDelayedMessage;

public async Task Store(OutgoingMessage message, TimeSpan dueAfter, string destination, SqlConnection connection, SqlTransaction transaction)
{
Expand All @@ -44,11 +42,11 @@ public async Task Store(OutgoingMessage message, TimeSpan dueAfter, string desti
await command.ExecuteNonQueryAsync().ConfigureAwait(false);
}

OnStoreDelayedMessage?.Invoke(null, DateTimeOffset.UtcNow.Add(dueAfter));
OnStoreDelayedMessage?.Invoke(null, DateTime.UtcNow.Add(dueAfter));
}

/// <returns>The time of the next timeout due</returns>
public async Task<DateTimeOffset> MoveDueMessages(int batchSize, SqlConnection connection, SqlTransaction transaction, CancellationToken cancellationToken)
public async Task<DateTime> MoveDueMessages(int batchSize, SqlConnection connection, SqlTransaction transaction, CancellationToken cancellationToken)
{
using (var command = new SqlCommand(moveDueCommand, connection, transaction))
{
Expand All @@ -58,18 +56,18 @@ public async Task<DateTimeOffset> MoveDueMessages(int batchSize, SqlConnection c
if (!await reader.ReadAsync(cancellationToken).ConfigureAwait(false))
{
// No timeouts waiting
return DateTimeOffset.UtcNow.AddMinutes(1);
return DateTime.UtcNow.AddMinutes(1);
}

// Normalizing in case of clock drift between executing machine and SQL Server instance
var sqlNow = reader.GetDateTimeOffset(0);
var sqlNextDue = reader.GetDateTimeOffset(1);
var sqlNow = reader.GetDateTime(0);
var sqlNextDue = reader.GetDateTime(1);
if (sqlNextDue <= sqlNow)
{
return DateTimeOffset.UtcNow;
return DateTime.UtcNow;
}

return DateTimeOffset.UtcNow.Add(sqlNextDue - sqlNow);
return DateTime.UtcNow.Add(sqlNextDue - sqlNow);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ public DueDelayedMessageProcessor(DelayedMessageTable table, SqlConnectionFactor
this.table = table;
this.connectionFactory = connectionFactory;
this.batchSize = batchSize;
nextExecution = DateTimeOffset.MinValue;
nextExecution = DateTime.MinValue;
oneMinute = TimeSpan.FromMinutes(1);

table.OnStoreDelayedMessage += OnDelayedMessageStored;
Expand Down Expand Up @@ -63,7 +63,7 @@ async Task MoveMaturedDelayedMessagesAndSwallowExceptions(CancellationToken move
}
}

async Task<DateTimeOffset> ExecuteOnce(CancellationToken moveDelayedMessagesCancellationToken)
async Task<DateTime> ExecuteOnce(CancellationToken moveDelayedMessagesCancellationToken)
{
using (var connection = await connectionFactory.OpenNewConnection().ConfigureAwait(false))
{
Expand All @@ -76,9 +76,9 @@ async Task<DateTimeOffset> ExecuteOnce(CancellationToken moveDelayedMessagesCanc
}
}

async Task WaitForNextExecution(DateTimeOffset nextDueTime, CancellationToken moveDelayedMessagesCancellationToken)
async Task WaitForNextExecution(DateTime nextDueTime, CancellationToken moveDelayedMessagesCancellationToken)
{
var now = DateTimeOffset.UtcNow;
var now = DateTime.UtcNow;

if (nextDueTime <= now)
{
Expand All @@ -99,12 +99,12 @@ async Task WaitForNextExecution(DateTimeOffset nextDueTime, CancellationToken mo
nextExecution = now + oneMinute;
}

while (DateTimeOffset.UtcNow < nextExecution)
while (DateTime.UtcNow < nextExecution)
{
await Task.Delay(1000, moveDelayedMessagesCancellationToken).ConfigureAwait(false);
}
}
void OnDelayedMessageStored(object sender, DateTimeOffset dueTime)
void OnDelayedMessageStored(object sender, DateTime dueTime)
{
if (dueTime < nextExecution)
{
Expand All @@ -118,7 +118,7 @@ void OnDelayedMessageStored(object sender, DateTimeOffset dueTime)

CancellationTokenSource moveDelayedMessagesCancellationTokenSource;
Task moveDelayedMessagesTask;
DateTimeOffset nextExecution;
DateTime nextExecution;
TimeSpan oneMinute;

static readonly ILog Logger = LogManager.GetLogger<DueDelayedMessageProcessor>();
Expand Down

0 comments on commit 00a3c3b

Please sign in to comment.