Skip to content

Commit

Permalink
Merge pull request #612 from Particular/backport-481
Browse files Browse the repository at this point in the history
Faster way to peek messages to retrieve message count (#480)
  • Loading branch information
bording authored May 20, 2020
2 parents 97256b1 + fcc8c90 commit 7ccc9be
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ await ExecuteInTransactionScope(async c =>
static async Task TryPeekQueueSize(TableBasedQueue tableBasedQueue)
{
await ExecuteInTransactionScope(async c => {
tableBasedQueue.FormatPeekCommand(100);
await tableBasedQueue.TryPeek(c, CancellationToken.None, PeekTimeoutInSeconds);
});
}
Expand Down
8 changes: 5 additions & 3 deletions src/NServiceBus.SqlServer/Queuing/SqlConstants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,11 @@ DELETE FROM message

public static readonly string PeekText = @"
SELECT count(*) Id
FROM {0} WITH (READPAST)
WHERE Expires IS NULL
OR Expires > GETUTCDATE();";
FROM (
SELECT TOP {1} *
FROM {0} WITH (READPAST)
WHERE Expires IS NULL OR Expires > GETUTCDATE()
) as count_table;";

public static readonly string CreateQueueText = @"
IF EXISTS (
Expand Down
8 changes: 7 additions & 1 deletion src/NServiceBus.SqlServer/Queuing/TableBasedQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ public TableBasedQueue(string qualifiedTableName, string queueName)
#pragma warning disable 618
this.qualifiedTableName = qualifiedTableName;
Name = queueName;
peekCommand = Format(SqlConstants.PeekText, this.qualifiedTableName);
receiveCommand = Format(SqlConstants.ReceiveText, this.qualifiedTableName);
sendCommand = Format(SqlConstants.SendText, this.qualifiedTableName);
purgeCommand = Format(SqlConstants.PurgeText, this.qualifiedTableName);
Expand All @@ -39,6 +38,13 @@ public virtual async Task<int> TryPeek(SqlConnection connection, CancellationTok
}
}

public void FormatPeekCommand(int maxRecordsToPeek)
{
#pragma warning disable 618
peekCommand = Format(SqlConstants.PeekText, qualifiedTableName, maxRecordsToPeek);
#pragma warning restore 618
}

public virtual async Task<MessageReadResult> TryReceive(SqlConnection connection, SqlTransaction transaction)
{
using (var command = new SqlCommand(receiveCommand, connection, transaction))
Expand Down
1 change: 1 addition & 0 deletions src/NServiceBus.SqlServer/Receiving/MessagePump.cs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ public async Task Init(Func<MessageContext, Task> onMessage, Func<ErrorContext,

public void Start(PushRuntimeSettings limitations)
{
inputQueue.FormatPeekCommand(Math.Min(100, 10 * limitations.MaxConcurrency));
runningReceiveTasks = new ConcurrentDictionary<Task, Task>();
concurrencyLimiter = new SemaphoreSlim(limitations.MaxConcurrency);
cancellationTokenSource = new CancellationTokenSource();
Expand Down

0 comments on commit 7ccc9be

Please sign in to comment.