Skip to content

Commit

Permalink
Try adding our own tracking back
Browse files Browse the repository at this point in the history
  • Loading branch information
bording authored and danielmarbach committed Sep 20, 2024
1 parent c509ef4 commit f012ae7
Show file tree
Hide file tree
Showing 2 changed files with 144 additions and 6 deletions.
137 changes: 131 additions & 6 deletions src/NServiceBus.Transport.RabbitMQ/Connection/ConfirmsAwareChannel.cs
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
namespace NServiceBus.Transport.RabbitMQ
{
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using global::RabbitMQ.Client;
using global::RabbitMQ.Client.Events;
using NServiceBus.Logging;

sealed class ConfirmsAwareChannel(IConnection connection, IRoutingTopology routingTopology) : IDisposable
{
Expand All @@ -14,11 +18,22 @@ sealed class ConfirmsAwareChannel(IConnection connection, IRoutingTopology routi
public async Task Initialize(CancellationToken cancellationToken = default)
{
channel = await connection.CreateChannelAsync(cancellationToken: cancellationToken).ConfigureAwait(false);
await channel.ConfirmSelectAsync(trackConfirmations: true, cancellationToken).ConfigureAwait(false);

channel.BasicAcks += Channel_BasicAcks;
channel.BasicNacks += Channel_BasicNacks;
channel.BasicReturn += Channel_BasicReturn;
channel.ChannelShutdown += Channel_ModelShutdown;

await channel.ConfirmSelectAsync(trackConfirmations: false, cancellationToken).ConfigureAwait(false);
}

public async Task SendMessage(string address, OutgoingMessage message, BasicProperties properties, CancellationToken cancellationToken = default)
{
var (taskCompletionSource, registration) = GetCancellableTaskCompletionSource(cancellationToken);
await using var _ = registration.ConfigureAwait(false);

properties.SetConfirmationId(channel.NextPublishSeqNo);

if (properties.Headers != null && properties.Headers.TryGetValue(DelayInfrastructure.DelayHeader, out var delayValue))
{
var routingKey = DelayInfrastructure.CalculateRoutingKey((int)delayValue, address, out var startingDelayLevel);
Expand All @@ -32,26 +47,136 @@ public async Task SendMessage(string address, OutgoingMessage message, BasicProp
await routingTopology.Send(channel, address, message, properties, cancellationToken).ConfigureAwait(false);
}

await channel.WaitForConfirmsOrDieAsync(cancellationToken).ConfigureAwait(false);
await taskCompletionSource.Task.ConfigureAwait(false);
}

public async Task PublishMessage(Type type, OutgoingMessage message, BasicProperties properties, CancellationToken cancellationToken = default)
{
var (taskCompletionSource, registration) = GetCancellableTaskCompletionSource(cancellationToken);
await using var _ = registration.ConfigureAwait(false);

properties.SetConfirmationId(channel.NextPublishSeqNo);

await routingTopology.Publish(channel, type, message, properties, cancellationToken).ConfigureAwait(false);
await channel.WaitForConfirmsOrDieAsync(cancellationToken).ConfigureAwait(false);

await taskCompletionSource.Task.ConfigureAwait(false);
}

public async Task RawSendInCaseOfFailure(string address, ReadOnlyMemory<byte> body, BasicProperties properties, CancellationToken cancellationToken = default)
{
var (taskCompletionSource, registration) = GetCancellableTaskCompletionSource(cancellationToken);
await using var _ = registration.ConfigureAwait(false);

properties.Headers ??= new Dictionary<string, object>();
properties.SetConfirmationId(channel.NextPublishSeqNo);

await routingTopology.RawSendInCaseOfFailure(channel, address, body, properties, cancellationToken).ConfigureAwait(false);
await channel.WaitForConfirmsOrDieAsync(cancellationToken).ConfigureAwait(false);

await taskCompletionSource.Task.ConfigureAwait(false);
}

public void Dispose()
(TaskCompletionSource, IAsyncDisposable) GetCancellableTaskCompletionSource(CancellationToken cancellationToken)
{
channel?.Dispose();
var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
// TODO Should we use UnsafeRegister instead?
var registration = cancellationToken.Register(static state =>
{
var (tcs, cancellationToken) = ((TaskCompletionSource, CancellationToken))state!;
tcs.TrySetCanceled(cancellationToken);
}, (tcs, cancellationToken));
var added = messages.TryAdd(channel.NextPublishSeqNo, tcs);

if (!added)
{
throw new Exception($"Cannot publish a message with sequence number '{channel.NextPublishSeqNo}' on this channel. A message was already published on this channel with the same confirmation number.");
}

return (tcs, registration);
}

void Channel_BasicAcks(object sender, BasicAckEventArgs e)
{
if (!e.Multiple)
{
SetResult(e.DeliveryTag);
}
else
{
foreach (var message in messages)
{
if (message.Key <= e.DeliveryTag)
{
SetResult(message.Key);
}
}
}
}

void Channel_BasicNacks(object sender, BasicNackEventArgs e)
{
if (!e.Multiple)
{
SetException(e.DeliveryTag, "Message rejected by broker.");
}
else
{
foreach (var message in messages)
{
if (message.Key <= e.DeliveryTag)
{
SetException(message.Key, "Message rejected by broker.");
}
}
}
}

void Channel_BasicReturn(object sender, BasicReturnEventArgs e)
{
var message = $"Message could not be routed to {e.Exchange + e.RoutingKey}: {e.ReplyCode} {e.ReplyText}";

if (e.BasicProperties.TryGetConfirmationId(out var deliveryTag))
{
SetException(deliveryTag, message);
}
else
{
Logger.Warn(message);
}
}

void Channel_ModelShutdown(object sender, ShutdownEventArgs e)
{
do
{
foreach (var message in messages)
{
SetException(message.Key, $"Channel has been closed: {e}");
}
}
while (!messages.IsEmpty);
}

void SetResult(ulong key)
{
if (messages.TryRemove(key, out var tcs))
{
tcs.SetResult();
}
}

void SetException(ulong key, string exceptionMessage)
{
if (messages.TryRemove(key, out var tcs))
{
tcs.SetException(new Exception(exceptionMessage));
}
}

public void Dispose() => channel?.Dispose();

IChannel channel;
readonly ConcurrentDictionary<ulong, TaskCompletionSource> messages = new();

static readonly ILog Logger = LogManager.GetLogger(typeof(ConfirmsAwareChannel));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,19 @@ static bool CalculateDelay(DispatchProperties dispatchProperties, out long delay
return delayed;
}

public static void SetConfirmationId(this IBasicProperties properties, ulong confirmationId)
{
properties.Headers[ConfirmationIdHeader] = confirmationId.ToString();
}

public static bool TryGetConfirmationId(this IReadOnlyBasicProperties properties, out ulong confirmationId)
{
confirmationId = 0;

return properties.Headers.TryGetValue(ConfirmationIdHeader, out var value) &&
ulong.TryParse(value as byte[] ?? [], out confirmationId);
}

public const string ConfirmationIdHeader = "NServiceBus.Transport.RabbitMQ.ConfirmationId";
public const string UseNonPersistentDeliveryHeader = "NServiceBus.Transport.RabbitMQ.UseNonPersistentDelivery";
}
Expand Down

0 comments on commit f012ae7

Please sign in to comment.