Skip to content

Commit

Permalink
Processing a control message causes the outbox to throw a null refere…
Browse files Browse the repository at this point in the history
…nce exception (#738) (#754)

* Reproduce the bug

* Cleanup

* Improve requesting partition key not to be mapped

* Fix bug

* Move test and align configs

* Flip around logic

* Move the test file

* Add tests and checks for missing table name

* Share table name check test and fix validation

* Make missing partition key test shared as well

* Move to extension method

* More succinct check

---------

Co-authored-by: danielmarbach <[email protected]>
# Conflicts:
#	src/LogicalOutbox.StorageTable.AcceptanceTests/ConfigureAzureTablePersistence.cs
#	src/NServiceBus.Persistence.AzureTable/Outbox/LogicalOutboxBehavior.cs
#	src/NServiceBus.Persistence.AzureTable/Outbox/OutboxPersister.cs
#	src/PhysicalOutbox.StorageTable.AcceptanceTests/ConfigureAzureTablePersistence.cs

Co-authored-by: Andreas Öhlund <[email protected]>
  • Loading branch information
danielmarbach and andreasohlund authored Mar 16, 2023
1 parent 556b6c2 commit 8735632
Show file tree
Hide file tree
Showing 11 changed files with 335 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,26 +29,26 @@ public Task Configure(string endpointName, EndpointConfiguration configuration,
recoverabilitySettings.Immediate(c => c.NumberOfRetries(1));
}

configuration.Pipeline.Register(new PartitionKeyProviderBehavior.PartitionKeyProviderBehaviorRegisterStep());
if (!settings.TryGet<DoNotRegisterDefaultPartitionKeyProvider>(out _))
{
configuration.Pipeline.Register(new PartitionKeyProviderBehavior.Registration());
}
if (!settings.TryGet<DoNotRegisterDefaultTableNameProvider>(out _))
{
configuration.Pipeline.Register(new TableInformationProviderBehavior.Registration());
}

return Task.FromResult(0);
return Task.CompletedTask;
}

Task IConfigureEndpointTestExecution.Cleanup()
{
return Task.FromResult(0);
return Task.CompletedTask;
}

class PartitionKeyProviderBehavior : Behavior<IIncomingLogicalMessageContext>
{
readonly ScenarioContext scenarioContext;
readonly IReadOnlySettings settings;

public PartitionKeyProviderBehavior(ScenarioContext scenarioContext, IReadOnlySettings settings)
{
this.settings = settings;
this.scenarioContext = scenarioContext;
}
public PartitionKeyProviderBehavior(ScenarioContext scenarioContext) => this.scenarioContext = scenarioContext;

public override Task Invoke(IIncomingLogicalMessageContext context, Func<Task> next)
{
Expand All @@ -57,22 +57,43 @@ public override Task Invoke(IIncomingLogicalMessageContext context, Func<Task> n
context.Extensions.Set(new TableEntityPartitionKey(scenarioContext.TestRunId.ToString()));
}

return next();
}

readonly ScenarioContext scenarioContext;

public class Registration : RegisterStep
{
public Registration() : base(nameof(PartitionKeyProviderBehavior),
typeof(PartitionKeyProviderBehavior),
"Populates the partition key",
provider => new PartitionKeyProviderBehavior(provider.GetRequiredService<ScenarioContext>())) =>
InsertBeforeIfExists(nameof(LogicalOutboxBehavior));
}
}

class TableInformationProviderBehavior : Behavior<IIncomingLogicalMessageContext>
{
public TableInformationProviderBehavior(IReadOnlySettings settings) => this.settings = settings;

public override Task Invoke(IIncomingLogicalMessageContext context, Func<Task> next)
{
if (!settings.TryGet<TableInformation>(out _) && !context.Extensions.TryGet<TableInformation>(out _))
{
context.Extensions.Set(new TableInformation(SetupFixture.TableName));
}
return next();
}

public class PartitionKeyProviderBehaviorRegisterStep : RegisterStep
readonly IReadOnlySettings settings;

public class Registration : RegisterStep
{
public PartitionKeyProviderBehaviorRegisterStep() : base(nameof(PartitionKeyProviderBehavior),
typeof(PartitionKeyProviderBehavior),
"Populates the partition key",
provider => new PartitionKeyProviderBehavior(provider.GetRequiredService<ScenarioContext>(), provider.GetRequiredService<IReadOnlySettings>()))
{
public Registration() : base(nameof(TableInformationProviderBehavior),
typeof(TableInformationProviderBehavior),
"Populates the table information",
provider => new TableInformationProviderBehavior(provider.GetRequiredService<IReadOnlySettings>())) =>
InsertBeforeIfExists(nameof(LogicalOutboxBehavior));
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ public async Task Invoke(IIncomingLogicalMessageContext context, Func<IIncomingL
IOutboxTransaction.PartitionKey = partitionKey;
IOutboxTransaction.StorageSession.TableHolder = tableHolder;

setAsDispatchedHolder.ThrowIfTableClientIsNotSet();

var outboxRecord = await tableHolder.Table.ReadOutboxRecord(context.MessageId, IOutboxTransaction.PartitionKey.Value, context.Extensions, context.CancellationToken)
.ConfigureAwait(false);

Expand Down
17 changes: 15 additions & 2 deletions src/NServiceBus.Persistence.AzureTable/Outbox/OutboxPersister.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System.Threading.Tasks;
using Extensibility;
using Microsoft.Azure.Cosmos.Table;
using NServiceBus.Transport;
using Outbox;

class OutboxPersister : IOutboxStorage
Expand Down Expand Up @@ -34,10 +35,20 @@ public async Task<OutboxMessage> Get(string messageId, ContextBag context, Cance

if (!context.TryGet<TableEntityPartitionKey>(out var partitionKey))
{
// we return null here to enable outbox work at logical stage
return null;
// because of the transactional session we cannot assume the incoming message is always present
if (!context.TryGet<IncomingMessage>(out var incomingMessage) ||
!incomingMessage.Headers.ContainsKey(Headers.ControlMessageHeader))
{
// we return null here to enable outbox work at logical stage
return null;
}

partitionKey = new TableEntityPartitionKey(messageId);
context.Set(partitionKey);
}

setAsDispatchedHolder.ThrowIfTableClientIsNotSet();

var outboxRecord = await setAsDispatchedHolder.TableHolder.Table
.ReadOutboxRecord(messageId, partitionKey, context, cancellationToken)
.ConfigureAwait(false);
Expand All @@ -58,6 +69,7 @@ public Task Store(OutboxMessage message, IOutboxTransaction transaction, Context
}

var setAsDispatchedHolder = context.Get<SetAsDispatchedHolder>();
setAsDispatchedHolder.ThrowIfTableClientIsNotSet();

var outboxRecord = new OutboxRecord
{
Expand All @@ -76,6 +88,7 @@ public Task Store(OutboxMessage message, IOutboxTransaction transaction, Context
public Task SetAsDispatched(string messageId, ContextBag context, CancellationToken cancellationToken = default)
{
var setAsDispatchedHolder = context.Get<SetAsDispatchedHolder>();
setAsDispatchedHolder.ThrowIfTableClientIsNotSet();

var tableHolder = setAsDispatchedHolder.TableHolder;
var record = setAsDispatchedHolder.Record;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
#nullable enable

namespace NServiceBus.Persistence.AzureTable
{
using System;

static class SetAsDispatchedHolderExtensions
{
public static void ThrowIfTableClientIsNotSet(this SetAsDispatchedHolder setAsDispatchedHolder)
{
if (setAsDispatchedHolder.TableHolder is { Table: not null })
{
return;
}

throw new Exception($"For the outbox to work a table name must be configured. Either configure a default one using '{nameof(ConfigureAzureStorage.DefaultTable)}' or set one via a behavior calling `context.Extensions.Set(new {nameof(TableInformation)}(\"SomeTableName\"))`");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,26 +29,28 @@ public Task Configure(string endpointName, EndpointConfiguration configuration,
}

// This populates the partition key at the physical stage to test the conventional outbox use-case
configuration.Pipeline.Register(typeof(PartitionKeyProviderBehavior), "Populates the partition key");
if (!settings.TryGet<DoNotRegisterDefaultPartitionKeyProvider>(out _))
{
configuration.Pipeline.Register(typeof(PartitionKeyProviderBehavior), "Populates the partition key");
}

if (!settings.TryGet<DoNotRegisterDefaultTableNameProvider>(out _))
{
configuration.Pipeline.Register(typeof(TableInformationProviderBehavior), "Populates the table information key");
}


return Task.FromResult(0);
return Task.CompletedTask;
}

Task IConfigureEndpointTestExecution.Cleanup()
{
return Task.FromResult(0);
return Task.CompletedTask;
}

class PartitionKeyProviderBehavior : Behavior<ITransportReceiveContext>
{
readonly ScenarioContext scenarioContext;
readonly IReadOnlySettings settings;

public PartitionKeyProviderBehavior(ScenarioContext scenarioContext, IReadOnlySettings settings)
{
this.settings = settings;
this.scenarioContext = scenarioContext;
}
public PartitionKeyProviderBehavior(ScenarioContext scenarioContext) => this.scenarioContext = scenarioContext;

public override Task Invoke(ITransportReceiveContext context, Func<Task> next)
{
Expand All @@ -57,11 +59,25 @@ public override Task Invoke(ITransportReceiveContext context, Func<Task> next)
context.Extensions.Set(new TableEntityPartitionKey(scenarioContext.TestRunId.ToString()));
}

return next();
}

readonly ScenarioContext scenarioContext;
}

class TableInformationProviderBehavior : Behavior<ITransportReceiveContext>
{
public TableInformationProviderBehavior(IReadOnlySettings settings) => this.settings = settings;

public override Task Invoke(ITransportReceiveContext context, Func<Task> next)
{
if (!settings.TryGet<TableInformation>(out _) && !context.Extensions.TryGet<TableInformation>(out _))
{
context.Extensions.Set(new TableInformation(SetupFixture.TableName));
}
return next();
}

readonly IReadOnlySettings settings;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
namespace NServiceBus.AcceptanceTests
{
using System;
using System.Threading;
using System.Threading.Tasks;
using AcceptanceTesting;
using EndpointTemplates;
using NServiceBus.AcceptanceTesting.Support;
using NServiceBus.Features;
using NServiceBus.Pipeline;
using NServiceBus.Routing;
using NServiceBus.Transport;
using NServiceBus.Unicast.Transport;
using NUnit.Framework;

[TestFixture]
public class When_using_outbox_control_message : NServiceBusAcceptanceTest
{
[Test]
public async Task Should_work()
{
var runSettings = new RunSettings();
runSettings.DoNotRegisterDefaultPartitionKeyProvider();

var context = await Scenario.Define<Context>()
.WithEndpoint<Endpoint>()
.Done(c => c.ProcessedControlMessage)
.Run(runSettings)
.ConfigureAwait(false);

Assert.True(context.ProcessedControlMessage);
}

public class Context : ScenarioContext
{
public bool ProcessedControlMessage { get; set; }
}

public class Endpoint : EndpointConfigurationBuilder
{
public Endpoint() =>
EndpointSetup<DefaultServer>((config, runDescriptor) =>
{
config.EnableOutbox();
config.ConfigureTransport().TransportTransactionMode = TransportTransactionMode.ReceiveOnly;
config.RegisterStartupTask<ControlMessageSender>();
config.Pipeline.Register(new ControlMessageBehavior(runDescriptor.ScenarioContext as Context), "Checks that the control message was processed successfully");
});

class ControlMessageSender : FeatureStartupTask
{
public ControlMessageSender(IMessageDispatcher dispatcher) => this.dispatcher = dispatcher;

protected override Task OnStart(IMessageSession session, CancellationToken cancellationToken = default)
{
var controlMessage = ControlMessageFactory.Create(MessageIntent.Subscribe);
var messageOperation = new TransportOperation(controlMessage, new UnicastAddressTag(AcceptanceTesting.Customization.Conventions.EndpointNamingConvention(typeof(Endpoint))));

return dispatcher.Dispatch(new TransportOperations(messageOperation), new TransportTransaction(), cancellationToken);
}

protected override Task OnStop(IMessageSession session, CancellationToken cancellationToken = default) => Task.CompletedTask;

readonly IMessageDispatcher dispatcher;
}

class ControlMessageBehavior : Behavior<IIncomingPhysicalMessageContext>
{
public ControlMessageBehavior(Context testContext) => this.testContext = testContext;

public override async Task Invoke(IIncomingPhysicalMessageContext context, Func<Task> next)
{
await next();

testContext.ProcessedControlMessage = true;
}

readonly Context testContext;
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
namespace NServiceBus.AcceptanceTests
{
class DoNotRegisterDefaultPartitionKeyProvider
{
public DoNotRegisterDefaultPartitionKeyProvider()
{
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
namespace NServiceBus.AcceptanceTests
{
class DoNotRegisterDefaultTableNameProvider
{
public DoNotRegisterDefaultTableNameProvider()
{
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
namespace NServiceBus.AcceptanceTests
{
using NServiceBus.AcceptanceTesting.Support;

public static partial class RunSettingsExtensions
{
public static void DoNotRegisterDefaultPartitionKeyProvider(this RunSettings runSettings) =>
runSettings.Set(new DoNotRegisterDefaultPartitionKeyProvider());

public static void DoNotRegisterDefaultTableNameProvider(this RunSettings runSettings) =>
runSettings.Set(new DoNotRegisterDefaultTableNameProvider());
}
}
Loading

0 comments on commit 8735632

Please sign in to comment.