Skip to content

Commit

Permalink
SQL Server transport refuses to process messages if there are too man…
Browse files Browse the repository at this point in the history
…y messages in the queue (#483)

Fixes #481 

When peeking messages, we retrieve the count only up to the maximum concurrency level. This prevents the transport from effectively locking up if there are a lot of records to peek.
  • Loading branch information
kbaley authored Mar 26, 2019
1 parent f354822 commit 5e7fbb0
Show file tree
Hide file tree
Showing 4 changed files with 10 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ static async Task TryPeekQueueSize(TableBasedQueue tableBasedQueue, SqlConnectio
using (var connection = await sqlConnectionFactory.OpenNewConnection())
using (var tx = connection.BeginTransaction())
{
tableBasedQueue.FormatPeekCommand(100);
await tableBasedQueue.TryPeek(connection, tx, CancellationToken.None, PeekTimeoutInSeconds);
scope.Complete();
}
Expand Down
2 changes: 1 addition & 1 deletion src/NServiceBus.SqlServer/Queuing/SqlConstants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ DELETE FROM message

public static readonly string PeekText = @"
SELECT count(*) Id
FROM {0} WITH (READPAST);";
FROM (SELECT TOP {1} * FROM {0} WITH (READPAST)) as count_table;";

public static readonly string AddMessageBodyStringColumn = @"
IF NOT EXISTS (
Expand Down
8 changes: 7 additions & 1 deletion src/NServiceBus.SqlServer/Queuing/TableBasedQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ public TableBasedQueue(string qualifiedTableName, string queueName)
#pragma warning disable 618
this.qualifiedTableName = qualifiedTableName;
Name = queueName;
peekCommand = Format(SqlConstants.PeekText, this.qualifiedTableName);
receiveCommand = Format(SqlConstants.ReceiveText, this.qualifiedTableName);
sendCommand = Format(SqlConstants.SendText, this.qualifiedTableName);
purgeCommand = Format(SqlConstants.PurgeText, this.qualifiedTableName);
Expand All @@ -39,6 +38,13 @@ public virtual async Task<int> TryPeek(SqlConnection connection, SqlTransaction
}
}

public void FormatPeekCommand(int maxRecordsToPeek)
{
#pragma warning disable 618
peekCommand = Format(SqlConstants.PeekText, qualifiedTableName, maxRecordsToPeek);
#pragma warning restore 618
}

public virtual async Task<MessageReadResult> TryReceive(SqlConnection connection, SqlTransaction transaction)
{
using (var command = new SqlCommand(receiveCommand, connection, transaction))
Expand Down
1 change: 1 addition & 0 deletions src/NServiceBus.SqlServer/Receiving/MessagePump.cs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ public async Task Init(Func<MessageContext, Task> onMessage, Func<ErrorContext,

public void Start(PushRuntimeSettings limitations)
{
inputQueue.FormatPeekCommand(Math.Min(100, 10 * limitations.MaxConcurrency));
runningReceiveTasks = new ConcurrentDictionary<Task, Task>();
concurrencyLimiter = new SemaphoreSlim(limitations.MaxConcurrency);
cancellationTokenSource = new CancellationTokenSource();
Expand Down

0 comments on commit 5e7fbb0

Please sign in to comment.