From 8735632ae574a590cb9d1d92b8704eb8f3bd87df Mon Sep 17 00:00:00 2001 From: Daniel Marbach Date: Thu, 16 Mar 2023 12:47:45 +0100 Subject: [PATCH] Processing a control message causes the outbox to throw a null reference exception (#738) (#754) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Reproduce the bug * Cleanup * Improve requesting partition key not to be mapped * Fix bug * Move test and align configs * Flip around logic * Move the test file * Add tests and checks for missing table name * Share table name check test and fix validation * Make missing partition key test shared as well * Move to extension method * More succinct check --------- Co-authored-by: danielmarbach # Conflicts: # src/LogicalOutbox.StorageTable.AcceptanceTests/ConfigureAzureTablePersistence.cs # src/NServiceBus.Persistence.AzureTable/Outbox/LogicalOutboxBehavior.cs # src/NServiceBus.Persistence.AzureTable/Outbox/OutboxPersister.cs # src/PhysicalOutbox.StorageTable.AcceptanceTests/ConfigureAzureTablePersistence.cs Co-authored-by: Andreas Öhlund --- .../ConfigureAzureTablePersistence.cs | 57 +++++++++---- .../Outbox/LogicalOutboxBehavior.cs | 2 + .../Outbox/OutboxPersister.cs | 17 +++- .../Outbox/SetAsDispatchedHolderExtensions.cs | 19 +++++ .../ConfigureAzureTablePersistence.cs | 38 ++++++--- .../When_using_outbox_control_message.cs | 82 +++++++++++++++++++ ...oNotRegisterDefaultPartitionKeyProvider.cs | 9 ++ .../DoNotRegisterDefaultTableNameProvider.cs | 9 ++ ...unSettingsExtensionsRequirePartitionKey.cs | 13 +++ .../When_no_partition_key_is_configured.cs | 60 ++++++++++++++ .../When_no_table_name_is_configured.cs | 60 ++++++++++++++ 11 files changed, 335 insertions(+), 31 deletions(-) create mode 100644 src/NServiceBus.Persistence.AzureTable/Outbox/SetAsDispatchedHolderExtensions.cs create mode 100644 src/PhysicalOutbox.StorageTable.AcceptanceTests/When_using_outbox_control_message.cs create mode 100644 src/SharedAcceptanceTests.RequirePartitionKey/DoNotRegisterDefaultPartitionKeyProvider.cs create mode 100644 src/SharedAcceptanceTests.RequirePartitionKey/DoNotRegisterDefaultTableNameProvider.cs create mode 100644 src/SharedAcceptanceTests.RequirePartitionKey/RunSettingsExtensionsRequirePartitionKey.cs create mode 100644 src/SharedAcceptanceTests.RequirePartitionKey/When_no_partition_key_is_configured.cs create mode 100644 src/SharedAcceptanceTests.RequirePartitionKey/When_no_table_name_is_configured.cs diff --git a/src/LogicalOutbox.StorageTable.AcceptanceTests/ConfigureAzureTablePersistence.cs b/src/LogicalOutbox.StorageTable.AcceptanceTests/ConfigureAzureTablePersistence.cs index 21f9f1b2..d25bb414 100644 --- a/src/LogicalOutbox.StorageTable.AcceptanceTests/ConfigureAzureTablePersistence.cs +++ b/src/LogicalOutbox.StorageTable.AcceptanceTests/ConfigureAzureTablePersistence.cs @@ -29,26 +29,26 @@ public Task Configure(string endpointName, EndpointConfiguration configuration, recoverabilitySettings.Immediate(c => c.NumberOfRetries(1)); } - configuration.Pipeline.Register(new PartitionKeyProviderBehavior.PartitionKeyProviderBehaviorRegisterStep()); + if (!settings.TryGet(out _)) + { + configuration.Pipeline.Register(new PartitionKeyProviderBehavior.Registration()); + } + if (!settings.TryGet(out _)) + { + configuration.Pipeline.Register(new TableInformationProviderBehavior.Registration()); + } - return Task.FromResult(0); + return Task.CompletedTask; } Task IConfigureEndpointTestExecution.Cleanup() { - return Task.FromResult(0); + return Task.CompletedTask; } class PartitionKeyProviderBehavior : Behavior { - readonly ScenarioContext scenarioContext; - readonly IReadOnlySettings settings; - - public PartitionKeyProviderBehavior(ScenarioContext scenarioContext, IReadOnlySettings settings) - { - this.settings = settings; - this.scenarioContext = scenarioContext; - } + public PartitionKeyProviderBehavior(ScenarioContext scenarioContext) => this.scenarioContext = scenarioContext; public override Task Invoke(IIncomingLogicalMessageContext context, Func next) { @@ -57,6 +57,27 @@ public override Task Invoke(IIncomingLogicalMessageContext context, Func n context.Extensions.Set(new TableEntityPartitionKey(scenarioContext.TestRunId.ToString())); } + return next(); + } + + readonly ScenarioContext scenarioContext; + + public class Registration : RegisterStep + { + public Registration() : base(nameof(PartitionKeyProviderBehavior), + typeof(PartitionKeyProviderBehavior), + "Populates the partition key", + provider => new PartitionKeyProviderBehavior(provider.GetRequiredService())) => + InsertBeforeIfExists(nameof(LogicalOutboxBehavior)); + } + } + + class TableInformationProviderBehavior : Behavior + { + public TableInformationProviderBehavior(IReadOnlySettings settings) => this.settings = settings; + + public override Task Invoke(IIncomingLogicalMessageContext context, Func next) + { if (!settings.TryGet(out _) && !context.Extensions.TryGet(out _)) { context.Extensions.Set(new TableInformation(SetupFixture.TableName)); @@ -64,15 +85,15 @@ public override Task Invoke(IIncomingLogicalMessageContext context, Func n return next(); } - public class PartitionKeyProviderBehaviorRegisterStep : RegisterStep + readonly IReadOnlySettings settings; + + public class Registration : RegisterStep { - public PartitionKeyProviderBehaviorRegisterStep() : base(nameof(PartitionKeyProviderBehavior), - typeof(PartitionKeyProviderBehavior), - "Populates the partition key", - provider => new PartitionKeyProviderBehavior(provider.GetRequiredService(), provider.GetRequiredService())) - { + public Registration() : base(nameof(TableInformationProviderBehavior), + typeof(TableInformationProviderBehavior), + "Populates the table information", + provider => new TableInformationProviderBehavior(provider.GetRequiredService())) => InsertBeforeIfExists(nameof(LogicalOutboxBehavior)); - } } } } \ No newline at end of file diff --git a/src/NServiceBus.Persistence.AzureTable/Outbox/LogicalOutboxBehavior.cs b/src/NServiceBus.Persistence.AzureTable/Outbox/LogicalOutboxBehavior.cs index 1c0af959..003d6ab2 100644 --- a/src/NServiceBus.Persistence.AzureTable/Outbox/LogicalOutboxBehavior.cs +++ b/src/NServiceBus.Persistence.AzureTable/Outbox/LogicalOutboxBehavior.cs @@ -54,6 +54,8 @@ public async Task Invoke(IIncomingLogicalMessageContext context, Func Get(string messageId, ContextBag context, Cance if (!context.TryGet(out var partitionKey)) { - // we return null here to enable outbox work at logical stage - return null; + // because of the transactional session we cannot assume the incoming message is always present + if (!context.TryGet(out var incomingMessage) || + !incomingMessage.Headers.ContainsKey(Headers.ControlMessageHeader)) + { + // we return null here to enable outbox work at logical stage + return null; + } + + partitionKey = new TableEntityPartitionKey(messageId); + context.Set(partitionKey); } + setAsDispatchedHolder.ThrowIfTableClientIsNotSet(); + var outboxRecord = await setAsDispatchedHolder.TableHolder.Table .ReadOutboxRecord(messageId, partitionKey, context, cancellationToken) .ConfigureAwait(false); @@ -58,6 +69,7 @@ public Task Store(OutboxMessage message, IOutboxTransaction transaction, Context } var setAsDispatchedHolder = context.Get(); + setAsDispatchedHolder.ThrowIfTableClientIsNotSet(); var outboxRecord = new OutboxRecord { @@ -76,6 +88,7 @@ public Task Store(OutboxMessage message, IOutboxTransaction transaction, Context public Task SetAsDispatched(string messageId, ContextBag context, CancellationToken cancellationToken = default) { var setAsDispatchedHolder = context.Get(); + setAsDispatchedHolder.ThrowIfTableClientIsNotSet(); var tableHolder = setAsDispatchedHolder.TableHolder; var record = setAsDispatchedHolder.Record; diff --git a/src/NServiceBus.Persistence.AzureTable/Outbox/SetAsDispatchedHolderExtensions.cs b/src/NServiceBus.Persistence.AzureTable/Outbox/SetAsDispatchedHolderExtensions.cs new file mode 100644 index 00000000..7468be16 --- /dev/null +++ b/src/NServiceBus.Persistence.AzureTable/Outbox/SetAsDispatchedHolderExtensions.cs @@ -0,0 +1,19 @@ +#nullable enable + +namespace NServiceBus.Persistence.AzureTable +{ + using System; + + static class SetAsDispatchedHolderExtensions + { + public static void ThrowIfTableClientIsNotSet(this SetAsDispatchedHolder setAsDispatchedHolder) + { + if (setAsDispatchedHolder.TableHolder is { Table: not null }) + { + return; + } + + throw new Exception($"For the outbox to work a table name must be configured. Either configure a default one using '{nameof(ConfigureAzureStorage.DefaultTable)}' or set one via a behavior calling `context.Extensions.Set(new {nameof(TableInformation)}(\"SomeTableName\"))`"); + } + } +} \ No newline at end of file diff --git a/src/PhysicalOutbox.StorageTable.AcceptanceTests/ConfigureAzureTablePersistence.cs b/src/PhysicalOutbox.StorageTable.AcceptanceTests/ConfigureAzureTablePersistence.cs index 19565044..19f2f091 100644 --- a/src/PhysicalOutbox.StorageTable.AcceptanceTests/ConfigureAzureTablePersistence.cs +++ b/src/PhysicalOutbox.StorageTable.AcceptanceTests/ConfigureAzureTablePersistence.cs @@ -29,26 +29,28 @@ public Task Configure(string endpointName, EndpointConfiguration configuration, } // This populates the partition key at the physical stage to test the conventional outbox use-case - configuration.Pipeline.Register(typeof(PartitionKeyProviderBehavior), "Populates the partition key"); + if (!settings.TryGet(out _)) + { + configuration.Pipeline.Register(typeof(PartitionKeyProviderBehavior), "Populates the partition key"); + } + + if (!settings.TryGet(out _)) + { + configuration.Pipeline.Register(typeof(TableInformationProviderBehavior), "Populates the table information key"); + } + - return Task.FromResult(0); + return Task.CompletedTask; } Task IConfigureEndpointTestExecution.Cleanup() { - return Task.FromResult(0); + return Task.CompletedTask; } class PartitionKeyProviderBehavior : Behavior { - readonly ScenarioContext scenarioContext; - readonly IReadOnlySettings settings; - - public PartitionKeyProviderBehavior(ScenarioContext scenarioContext, IReadOnlySettings settings) - { - this.settings = settings; - this.scenarioContext = scenarioContext; - } + public PartitionKeyProviderBehavior(ScenarioContext scenarioContext) => this.scenarioContext = scenarioContext; public override Task Invoke(ITransportReceiveContext context, Func next) { @@ -57,11 +59,25 @@ public override Task Invoke(ITransportReceiveContext context, Func next) context.Extensions.Set(new TableEntityPartitionKey(scenarioContext.TestRunId.ToString())); } + return next(); + } + + readonly ScenarioContext scenarioContext; + } + + class TableInformationProviderBehavior : Behavior + { + public TableInformationProviderBehavior(IReadOnlySettings settings) => this.settings = settings; + + public override Task Invoke(ITransportReceiveContext context, Func next) + { if (!settings.TryGet(out _) && !context.Extensions.TryGet(out _)) { context.Extensions.Set(new TableInformation(SetupFixture.TableName)); } return next(); } + + readonly IReadOnlySettings settings; } } \ No newline at end of file diff --git a/src/PhysicalOutbox.StorageTable.AcceptanceTests/When_using_outbox_control_message.cs b/src/PhysicalOutbox.StorageTable.AcceptanceTests/When_using_outbox_control_message.cs new file mode 100644 index 00000000..9cd5aeb5 --- /dev/null +++ b/src/PhysicalOutbox.StorageTable.AcceptanceTests/When_using_outbox_control_message.cs @@ -0,0 +1,82 @@ +namespace NServiceBus.AcceptanceTests +{ + using System; + using System.Threading; + using System.Threading.Tasks; + using AcceptanceTesting; + using EndpointTemplates; + using NServiceBus.AcceptanceTesting.Support; + using NServiceBus.Features; + using NServiceBus.Pipeline; + using NServiceBus.Routing; + using NServiceBus.Transport; + using NServiceBus.Unicast.Transport; + using NUnit.Framework; + + [TestFixture] + public class When_using_outbox_control_message : NServiceBusAcceptanceTest + { + [Test] + public async Task Should_work() + { + var runSettings = new RunSettings(); + runSettings.DoNotRegisterDefaultPartitionKeyProvider(); + + var context = await Scenario.Define() + .WithEndpoint() + .Done(c => c.ProcessedControlMessage) + .Run(runSettings) + .ConfigureAwait(false); + + Assert.True(context.ProcessedControlMessage); + } + + public class Context : ScenarioContext + { + public bool ProcessedControlMessage { get; set; } + } + + public class Endpoint : EndpointConfigurationBuilder + { + public Endpoint() => + EndpointSetup((config, runDescriptor) => + { + config.EnableOutbox(); + config.ConfigureTransport().TransportTransactionMode = TransportTransactionMode.ReceiveOnly; + config.RegisterStartupTask(); + config.Pipeline.Register(new ControlMessageBehavior(runDescriptor.ScenarioContext as Context), "Checks that the control message was processed successfully"); + }); + + class ControlMessageSender : FeatureStartupTask + { + public ControlMessageSender(IMessageDispatcher dispatcher) => this.dispatcher = dispatcher; + + protected override Task OnStart(IMessageSession session, CancellationToken cancellationToken = default) + { + var controlMessage = ControlMessageFactory.Create(MessageIntent.Subscribe); + var messageOperation = new TransportOperation(controlMessage, new UnicastAddressTag(AcceptanceTesting.Customization.Conventions.EndpointNamingConvention(typeof(Endpoint)))); + + return dispatcher.Dispatch(new TransportOperations(messageOperation), new TransportTransaction(), cancellationToken); + } + + protected override Task OnStop(IMessageSession session, CancellationToken cancellationToken = default) => Task.CompletedTask; + + readonly IMessageDispatcher dispatcher; + } + + class ControlMessageBehavior : Behavior + { + public ControlMessageBehavior(Context testContext) => this.testContext = testContext; + + public override async Task Invoke(IIncomingPhysicalMessageContext context, Func next) + { + await next(); + + testContext.ProcessedControlMessage = true; + } + + readonly Context testContext; + } + } + } +} \ No newline at end of file diff --git a/src/SharedAcceptanceTests.RequirePartitionKey/DoNotRegisterDefaultPartitionKeyProvider.cs b/src/SharedAcceptanceTests.RequirePartitionKey/DoNotRegisterDefaultPartitionKeyProvider.cs new file mode 100644 index 00000000..6ca8012a --- /dev/null +++ b/src/SharedAcceptanceTests.RequirePartitionKey/DoNotRegisterDefaultPartitionKeyProvider.cs @@ -0,0 +1,9 @@ +namespace NServiceBus.AcceptanceTests +{ + class DoNotRegisterDefaultPartitionKeyProvider + { + public DoNotRegisterDefaultPartitionKeyProvider() + { + } + } +} \ No newline at end of file diff --git a/src/SharedAcceptanceTests.RequirePartitionKey/DoNotRegisterDefaultTableNameProvider.cs b/src/SharedAcceptanceTests.RequirePartitionKey/DoNotRegisterDefaultTableNameProvider.cs new file mode 100644 index 00000000..3c9f6c74 --- /dev/null +++ b/src/SharedAcceptanceTests.RequirePartitionKey/DoNotRegisterDefaultTableNameProvider.cs @@ -0,0 +1,9 @@ +namespace NServiceBus.AcceptanceTests +{ + class DoNotRegisterDefaultTableNameProvider + { + public DoNotRegisterDefaultTableNameProvider() + { + } + } +} \ No newline at end of file diff --git a/src/SharedAcceptanceTests.RequirePartitionKey/RunSettingsExtensionsRequirePartitionKey.cs b/src/SharedAcceptanceTests.RequirePartitionKey/RunSettingsExtensionsRequirePartitionKey.cs new file mode 100644 index 00000000..b39278ab --- /dev/null +++ b/src/SharedAcceptanceTests.RequirePartitionKey/RunSettingsExtensionsRequirePartitionKey.cs @@ -0,0 +1,13 @@ +namespace NServiceBus.AcceptanceTests +{ + using NServiceBus.AcceptanceTesting.Support; + + public static partial class RunSettingsExtensions + { + public static void DoNotRegisterDefaultPartitionKeyProvider(this RunSettings runSettings) => + runSettings.Set(new DoNotRegisterDefaultPartitionKeyProvider()); + + public static void DoNotRegisterDefaultTableNameProvider(this RunSettings runSettings) => + runSettings.Set(new DoNotRegisterDefaultTableNameProvider()); + } +} diff --git a/src/SharedAcceptanceTests.RequirePartitionKey/When_no_partition_key_is_configured.cs b/src/SharedAcceptanceTests.RequirePartitionKey/When_no_partition_key_is_configured.cs new file mode 100644 index 00000000..06e7b5b6 --- /dev/null +++ b/src/SharedAcceptanceTests.RequirePartitionKey/When_no_partition_key_is_configured.cs @@ -0,0 +1,60 @@ +namespace NServiceBus.AcceptanceTests +{ + using System.Linq; + using System.Threading.Tasks; + using AcceptanceTesting; + using EndpointTemplates; + using NServiceBus.AcceptanceTesting.Support; + using NUnit.Framework; + + [TestFixture] + public class When_no_partition_key_is_configured : NServiceBusAcceptanceTest + { + [Test] + public async Task Should_throw_meaningful_exception() + { + var runSettings = new RunSettings(); + runSettings.DoNotRegisterDefaultPartitionKeyProvider(); + + var context = await Scenario.Define() + .WithEndpoint(b => + { + b.DoNotFailOnErrorMessages(); + b.When(s => s.SendLocal(new MyMessage())); + }) + .Done(c => c.FailedMessages.Any()) + .Run(runSettings) + .ConfigureAwait(false); + + var failure = context.FailedMessages.FirstOrDefault() + .Value.First(); + StringAssert.Contains("A partition key via", failure.Exception.Message); + } + + public class Context : ScenarioContext + { + + } + + public class Endpoint : EndpointConfigurationBuilder + { + public Endpoint() => + EndpointSetup((config, runDescriptor) => + { + config.EnableOutbox(); + config.ConfigureTransport().TransportTransactionMode = TransportTransactionMode.ReceiveOnly; + }); + + class MyMessageHandler : IHandleMessages + { + public Task Handle(MyMessage message, IMessageHandlerContext context) + { + Assert.Fail("Should not be called"); + return Task.CompletedTask; + } + } + } + + class MyMessage : IMessage { } + } +} \ No newline at end of file diff --git a/src/SharedAcceptanceTests.RequirePartitionKey/When_no_table_name_is_configured.cs b/src/SharedAcceptanceTests.RequirePartitionKey/When_no_table_name_is_configured.cs new file mode 100644 index 00000000..6828f52d --- /dev/null +++ b/src/SharedAcceptanceTests.RequirePartitionKey/When_no_table_name_is_configured.cs @@ -0,0 +1,60 @@ +namespace NServiceBus.AcceptanceTests +{ + using System.Linq; + using System.Threading.Tasks; + using AcceptanceTesting; + using EndpointTemplates; + using NServiceBus.AcceptanceTesting.Support; + using NUnit.Framework; + + [TestFixture] + public class When_no_table_name_is_configured : NServiceBusAcceptanceTest + { + [Test] + public async Task Should_throw_meaningful_exception() + { + var runSettings = new RunSettings(); + runSettings.DoNotRegisterDefaultTableNameProvider(); + + var context = await Scenario.Define() + .WithEndpoint(b => + { + b.DoNotFailOnErrorMessages(); + b.When(s => s.SendLocal(new MyMessage())); + }) + .Done(c => c.FailedMessages.Any()) + .Run(runSettings) + .ConfigureAwait(false); + + var failure = context.FailedMessages.FirstOrDefault() + .Value.First(); + StringAssert.Contains("table name", failure.Exception.Message); + } + + public class Context : ScenarioContext + { + + } + + public class Endpoint : EndpointConfigurationBuilder + { + public Endpoint() => + EndpointSetup((config, runDescriptor) => + { + config.EnableOutbox(); + config.ConfigureTransport().TransportTransactionMode = TransportTransactionMode.ReceiveOnly; + }); + + class MyMessageHandler : IHandleMessages + { + public Task Handle(MyMessage message, IMessageHandlerContext context) + { + Assert.Fail("Should not be called"); + return Task.CompletedTask; + } + } + } + + class MyMessage : IMessage { } + } +} \ No newline at end of file