Skip to content

Commit

Permalink
Synchronize for now the sequence number access around publish (delibe…
Browse files Browse the repository at this point in the history
…rate wider scope for safety as a trial)
  • Loading branch information
danielmarbach committed Sep 20, 2024
1 parent 7bde1bd commit 59a4e50
Showing 1 changed file with 49 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,19 +32,34 @@ public async Task SendMessage(string address, OutgoingMessage message, BasicProp
var (taskCompletionSource, registration) = GetCancellableTaskCompletionSource(cancellationToken);
await using var _ = registration.ConfigureAwait(false);

properties.SetConfirmationId(channel.NextPublishSeqNo);

if (properties.Headers != null && properties.Headers.TryGetValue(DelayInfrastructure.DelayHeader, out var delayValue))
try
{
var routingKey = DelayInfrastructure.CalculateRoutingKey((int)delayValue, address, out var startingDelayLevel);
await sequenceNumberSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false);

properties.SetConfirmationId(channel.NextPublishSeqNo);

await routingTopology.BindToDelayInfrastructure(channel, address, DelayInfrastructure.DeliveryExchange, DelayInfrastructure.BindingKey(address), cancellationToken).ConfigureAwait(false);
// The channel is used here directly because it is not the routing topologies concern to know about the sends to the delay infrastructure
await channel.BasicPublishAsync(DelayInfrastructure.LevelName(startingDelayLevel), routingKey, true, properties, message.Body, cancellationToken).ConfigureAwait(false);
if (properties.Headers != null &&
properties.Headers.TryGetValue(DelayInfrastructure.DelayHeader, out var delayValue))
{
var routingKey =
DelayInfrastructure.CalculateRoutingKey((int)delayValue, address, out var startingDelayLevel);

await routingTopology.BindToDelayInfrastructure(channel, address,
DelayInfrastructure.DeliveryExchange, DelayInfrastructure.BindingKey(address),
cancellationToken).ConfigureAwait(false);
// The channel is used here directly because it is not the routing topologies concern to know about the sends to the delay infrastructure
await channel.BasicPublishAsync(DelayInfrastructure.LevelName(startingDelayLevel), routingKey, true,
properties, message.Body, cancellationToken).ConfigureAwait(false);
}
else
{
await routingTopology.Send(channel, address, message, properties, cancellationToken)
.ConfigureAwait(false);
}
}
else
finally
{
await routingTopology.Send(channel, address, message, properties, cancellationToken).ConfigureAwait(false);
sequenceNumberSemaphore.Release();
}

await taskCompletionSource.Task.ConfigureAwait(false);
Expand All @@ -55,9 +70,19 @@ public async Task PublishMessage(Type type, OutgoingMessage message, BasicProper
var (taskCompletionSource, registration) = GetCancellableTaskCompletionSource(cancellationToken);
await using var _ = registration.ConfigureAwait(false);

properties.SetConfirmationId(channel.NextPublishSeqNo);
try
{
await sequenceNumberSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false);

await routingTopology.Publish(channel, type, message, properties, cancellationToken).ConfigureAwait(false);
properties.SetConfirmationId(channel.NextPublishSeqNo);

await routingTopology.Publish(channel, type, message, properties, cancellationToken)
.ConfigureAwait(false);
}
finally
{
sequenceNumberSemaphore.Release();
}

await taskCompletionSource.Task.ConfigureAwait(false);
}
Expand All @@ -68,9 +93,19 @@ public async Task RawSendInCaseOfFailure(string address, ReadOnlyMemory<byte> bo
await using var _ = registration.ConfigureAwait(false);

properties.Headers ??= new Dictionary<string, object>();
properties.SetConfirmationId(channel.NextPublishSeqNo);
try
{
await sequenceNumberSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false);

await routingTopology.RawSendInCaseOfFailure(channel, address, body, properties, cancellationToken).ConfigureAwait(false);
properties.SetConfirmationId(channel.NextPublishSeqNo);

await routingTopology.RawSendInCaseOfFailure(channel, address, body, properties, cancellationToken)
.ConfigureAwait(false);
}
finally
{
sequenceNumberSemaphore.Release();
}

await taskCompletionSource.Task.ConfigureAwait(false);
}
Expand Down Expand Up @@ -176,6 +211,7 @@ void SetException(ulong key, string exceptionMessage)

IChannel channel;
readonly ConcurrentDictionary<ulong, TaskCompletionSource> messages = new();
readonly SemaphoreSlim sequenceNumberSemaphore = new(1, 1);

static readonly ILog Logger = LogManager.GetLogger(typeof(ConfirmsAwareChannel));
}
Expand Down

0 comments on commit 59a4e50

Please sign in to comment.