Skip to content

Commit

Permalink
Async disposable
Browse files Browse the repository at this point in the history
  • Loading branch information
danielmarbach committed Oct 23, 2024
1 parent 0e6b88f commit fb0d467
Show file tree
Hide file tree
Showing 6 changed files with 28 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public async Task Should_dispose_connection_when_disposed()
await channelProvider.CreateConnection();

var publishConnection = channelProvider.PublishConnections.Dequeue();
channelProvider.Dispose();
await channelProvider.DisposeAsync();

Assert.That(publishConnection.WasDisposed, Is.True);
}
Expand All @@ -53,7 +53,7 @@ public async Task Should_not_attempt_to_recover_during_dispose_when_retry_delay_

// Deliberately not completing the delay task with channelProvider.DelayTaskCompletionSource.SetResult(); before disposing
// to simulate a pending delay task
channelProvider.Dispose();
await channelProvider.DisposeAsync();

await channelProvider.FireAndForgetAction(CancellationToken.None);

Expand All @@ -75,7 +75,7 @@ public async Task Should_dispose_newly_established_connection()
// and await its completion after the channel provider has been disposed.
var fireAndForgetTask = channelProvider.FireAndForgetAction(CancellationToken.None);
channelProvider.DelayTaskCompletionSource.SetResult();
channelProvider.Dispose();
await channelProvider.DisposeAsync();

await fireAndForgetTask;

Expand Down
25 changes: 15 additions & 10 deletions src/NServiceBus.Transport.RabbitMQ/Connection/ChannelProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ namespace NServiceBus.Transport.RabbitMQ
using global::RabbitMQ.Client.Events;
using Logging;

class ChannelProvider : IDisposable
class ChannelProvider : IAsyncDisposable
{
public ChannelProvider(ConnectionFactory connectionFactory, TimeSpan retryDelay, IRoutingTopology routingTopology)
{
Expand Down Expand Up @@ -99,42 +99,47 @@ public async ValueTask<ConfirmsAwareChannel> GetPublishChannel(CancellationToken
return channel;
}

channel?.Dispose();
if (channel is not null)
{
await channel.DisposeAsync()
.ConfigureAwait(false);
}

channel = new ConfirmsAwareChannel(connection, routingTopology);
await channel.Initialize(cancellationToken).ConfigureAwait(false);

return channel;
}

public void ReturnPublishChannel(ConfirmsAwareChannel channel)
public ValueTask ReturnPublishChannel(ConfirmsAwareChannel channel, CancellationToken cancellationToken = default)
{
if (channel.IsOpen)
{
channels.Enqueue(channel);
return ValueTask.CompletedTask;
}
else
{
channel.Dispose();
}

return channel.DisposeAsync();
}

public void Dispose()
#pragma warning disable PS0018
public async ValueTask DisposeAsync()
#pragma warning restore PS0018
{
if (disposed)
{
return;
}

stoppingTokenSource.Cancel();
await stoppingTokenSource.CancelAsync().ConfigureAwait(false);
stoppingTokenSource.Dispose();

var oldConnection = Interlocked.Exchange(ref connection, null);
oldConnection?.Dispose();

foreach (var channel in channels)
{
channel.Dispose();
await channel.DisposeAsync().ConfigureAwait(false);
}

disposed = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ namespace NServiceBus.Transport.RabbitMQ
using System.Threading.Tasks;
using global::RabbitMQ.Client;

sealed class ConfirmsAwareChannel(IConnection connection, IRoutingTopology routingTopology) : IDisposable
sealed class ConfirmsAwareChannel(IConnection connection, IRoutingTopology routingTopology) : IAsyncDisposable
{
public bool IsOpen => channel.IsOpen;

Expand Down Expand Up @@ -55,7 +55,9 @@ await routingTopology.RawSendInCaseOfFailure(channel, address, body, properties,
.ConfigureAwait(false);
}

public void Dispose() => channel?.Dispose();
#pragma warning disable PS0018
public ValueTask DisposeAsync() => channel is not null ? channel.DisposeAsync() : ValueTask.CompletedTask;
#pragma warning restore PS0018

IChannel channel;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public override async Task Shutdown(CancellationToken cancellationToken = defaul
await Task.WhenAll(Receivers.Values.Select(r => r.StopReceive(cancellationToken)))
.ConfigureAwait(false);

channelProvider.Dispose();
await channelProvider.DisposeAsync().ConfigureAwait(false);
}

public override string ToTransportAddress(QueueAddress address) => TranslateAddress(address);
Expand Down
5 changes: 3 additions & 2 deletions src/NServiceBus.Transport.RabbitMQ/Receiving/MessagePump.cs
Original file line number Diff line number Diff line change
Expand Up @@ -509,7 +509,7 @@ int GetDeliveryAttempts(BasicDeliverEventArgs message, string messageIdKey)
return attempts;
}

async Task MovePoisonMessage(AsyncEventingBasicConsumer consumer, BasicDeliverEventArgs message, string queue, CancellationToken messageProcessingCancellationToken)
async ValueTask MovePoisonMessage(AsyncEventingBasicConsumer consumer, BasicDeliverEventArgs message, string queue, CancellationToken messageProcessingCancellationToken)
{
try
{
Expand All @@ -521,7 +521,8 @@ async Task MovePoisonMessage(AsyncEventingBasicConsumer consumer, BasicDeliverEv
}
finally
{
channelProvider.ReturnPublishChannel(channel);
await channelProvider.ReturnPublishChannel(channel, messageProcessingCancellationToken)
.ConfigureAwait(false);
}
}
catch (Exception ex) when (!ex.IsCausedBy(messageProcessingCancellationToken))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ public async Task Dispatch(TransportOperations outgoingMessages, TransportTransa
}
finally
{
channelProvider.ReturnPublishChannel(channel);
await channelProvider.ReturnPublishChannel(channel, cancellationToken)
.ConfigureAwait(false);
}
}

Expand Down

0 comments on commit fb0d467

Please sign in to comment.