From 50277ab02752ae583675a158fe1beb7e79bdf16f Mon Sep 17 00:00:00 2001 From: Tomasz Maruszak Date: Thu, 22 Aug 2024 23:49:14 +0200 Subject: [PATCH] [Host.Memory] Ability to use headers in Memory provider without enabling message serialization #295 Signed-off-by: Tomasz Maruszak --- docs/provider_memory.md | 21 +++++++++- docs/provider_memory.t.md | 21 +++++++++- src/Host.Plugin.Properties.xml | 2 +- .../MemoryMessageBus.cs | 4 +- .../MemoryMessageBusSettings.cs | 7 ++++ .../MemoryMessageBusTests.cs | 40 ++++++++++++++++--- 6 files changed, 82 insertions(+), 13 deletions(-) diff --git a/docs/provider_memory.md b/docs/provider_memory.md index d81832d9..f6988fb3 100644 --- a/docs/provider_memory.md +++ b/docs/provider_memory.md @@ -5,7 +5,7 @@ Please read the [Introduction](intro.md) before reading this provider documentat - [Introduction](#introduction) - [Configuration](#configuration) - [Serialization](#serialization) - - [Virtual Topics](#virtual-topics) + - [Headers](#headers) - [Auto Declaration](#auto-declaration) - [Polymorphic message support](#polymorphic-message-support) - [Lifecycle](#lifecycle) @@ -68,6 +68,23 @@ services.AddSlimMessageBus(mbb => > When serialization is disabled for in memory passed messages, the exact same object instance send by the producer will be received by the consumer. Therefore state changes on the consumer end will be visible by the producer. > Consider making the messages immutable (read only) in that case. +### Headers + +The headers published to the memory bus are delivered to the consumer. +This is managed by the `cfg.EnableMessageHeaders` setting, which is enabled by default. + +````cs +services.AddSlimMessageBus(mbb => +{ + mbb.WithProviderMemory(cfg => + { + // Header passing can be disabled when not needed (and to save on memory allocations) + cfg.EnableMessageHeaders = false + }); +}); + +Before version v2.5.1, to enable header passing the [serialization](#serialization) had to be enabled. + ### Virtual Topics Unlike other transport providers, memory transport does not have true notion of topics (or queues). However, it is still required to use topic names. This is required, so that the bus knows on which virtual topic to deliver the message to, and from what virtual topic to consume from. @@ -80,7 +97,7 @@ mbb.Consume(x => x.Topic(x.MessageType.Name).WithConsumer(x => x.Topic("OrderSubmittedEvent").WithConsumer()); -``` +```` The producer configuration side should use `.DefaultTopic()` to set the virtual topic name: diff --git a/docs/provider_memory.t.md b/docs/provider_memory.t.md index e52d9205..ba26e494 100644 --- a/docs/provider_memory.t.md +++ b/docs/provider_memory.t.md @@ -5,7 +5,7 @@ Please read the [Introduction](intro.md) before reading this provider documentat - [Introduction](#introduction) - [Configuration](#configuration) - [Serialization](#serialization) - - [Virtual Topics](#virtual-topics) + - [Headers](#headers) - [Auto Declaration](#auto-declaration) - [Polymorphic message support](#polymorphic-message-support) - [Lifecycle](#lifecycle) @@ -68,6 +68,23 @@ services.AddSlimMessageBus(mbb => > When serialization is disabled for in memory passed messages, the exact same object instance send by the producer will be received by the consumer. Therefore state changes on the consumer end will be visible by the producer. > Consider making the messages immutable (read only) in that case. +### Headers + +The headers published to the memory bus are delivered to the consumer. +This is managed by the `cfg.EnableMessageHeaders` setting, which is enabled by default. + +````cs +services.AddSlimMessageBus(mbb => +{ + mbb.WithProviderMemory(cfg => + { + // Header passing can be disabled when not needed (and to save on memory allocations) + cfg.EnableMessageHeaders = false + }); +}); + +Before version v2.5.1, to enable header passing the [serialization](#serialization) had to be enabled. + ### Virtual Topics Unlike other transport providers, memory transport does not have true notion of topics (or queues). However, it is still required to use topic names. This is required, so that the bus knows on which virtual topic to deliver the message to, and from what virtual topic to consume from. @@ -80,7 +97,7 @@ mbb.Consume(x => x.Topic(x.MessageType.Name).WithConsumer(x => x.Topic("OrderSubmittedEvent").WithConsumer()); -``` +```` The producer configuration side should use `.DefaultTopic()` to set the virtual topic name: diff --git a/src/Host.Plugin.Properties.xml b/src/Host.Plugin.Properties.xml index d3116911..3585360d 100644 --- a/src/Host.Plugin.Properties.xml +++ b/src/Host.Plugin.Properties.xml @@ -4,7 +4,7 @@ - 2.5.0 + 2.5.1 \ No newline at end of file diff --git a/src/SlimMessageBus.Host.Memory/MemoryMessageBus.cs b/src/SlimMessageBus.Host.Memory/MemoryMessageBus.cs index ee351a37..984f2864 100644 --- a/src/SlimMessageBus.Host.Memory/MemoryMessageBus.cs +++ b/src/SlimMessageBus.Host.Memory/MemoryMessageBus.cs @@ -55,11 +55,11 @@ protected override void BuildPendingRequestStore() public override IDictionary CreateHeaders() { - if (ProviderSettings.EnableMessageSerialization) + if (ProviderSettings.EnableMessageHeaders) { return base.CreateHeaders(); } - // Memory bus does not require headers + // Do not use headers return null; } diff --git a/src/SlimMessageBus.Host.Memory/MemoryMessageBusSettings.cs b/src/SlimMessageBus.Host.Memory/MemoryMessageBusSettings.cs index ea39a5d5..418fabdc 100644 --- a/src/SlimMessageBus.Host.Memory/MemoryMessageBusSettings.cs +++ b/src/SlimMessageBus.Host.Memory/MemoryMessageBusSettings.cs @@ -16,4 +16,11 @@ public class MemoryMessageBusSettings /// However, if you prefer to have Publish operations non-blocking (asynchronous), you can disable this setting. /// public bool EnableBlockingPublish { get; set; } = true; + + /// + /// This setting allows you to enable or disable the passing of message headers from the publisher to the consumer. + /// By default, message headers are enabled for ease of use. However, if you are not using headers in the memory bus, disabling them can save on memory allocations. + /// + public bool EnableMessageHeaders { get; set; } = true; + } \ No newline at end of file diff --git a/src/Tests/SlimMessageBus.Host.Memory.Test/MemoryMessageBusTests.cs b/src/Tests/SlimMessageBus.Host.Memory.Test/MemoryMessageBusTests.cs index b205cf75..6b640745 100644 --- a/src/Tests/SlimMessageBus.Host.Memory.Test/MemoryMessageBusTests.cs +++ b/src/Tests/SlimMessageBus.Host.Memory.Test/MemoryMessageBusTests.cs @@ -72,14 +72,21 @@ public void When_Create_Given_MessageSerializationEnabled_And_NoSerializerProvid } [Theory] - [InlineData(false)] - [InlineData(true)] - public async Task When_Publish_Given_MessageSerializationSetting_Then_DeliversMessageInstanceToRespectiveConsumers(bool enableMessageSerialization) + [InlineData(false, false)] + [InlineData(false, true)] + [InlineData(true, true)] + [InlineData(true, false)] + public async Task When_Publish_Given_MessageSerializationSetting_Then_DeliversMessageInstanceToRespectiveConsumers(bool enableMessageSerialization, bool enableMessageHeaders) { // arrange const string topicA = "topic-a"; const string topicA2 = "topic-a-2"; const string topicB = "topic-b"; + var headers = new Dictionary + { + ["key1"] = "str", + ["key2"] = 2 + }; _builder.Produce(x => x.DefaultTopic(topicA)); _builder.Produce(x => x.DefaultTopic(topicB)); @@ -87,19 +94,21 @@ public async Task When_Publish_Given_MessageSerializationSetting_Then_DeliversMe _builder.Consume(x => x.Topic(topicA2).WithConsumer()); _builder.Consume(x => x.Topic(topicB).WithConsumer()); - var aConsumerMock = new Mock(); + var aConsumerMock = new Mock() { CallBase = true }; var aConsumer2Mock = new Mock(); var bConsumerMock = new Mock(); + _serviceProviderMock.ProviderMock.Setup(x => x.GetService(typeof(SomeMessageAConsumer))).Returns(aConsumerMock.Object); _serviceProviderMock.ProviderMock.Setup(x => x.GetService(typeof(SomeMessageAConsumer2))).Returns(aConsumer2Mock.Object); _serviceProviderMock.ProviderMock.Setup(x => x.GetService(typeof(SomeMessageBConsumer))).Returns(bConsumerMock.Object); _providerSettings.EnableMessageSerialization = enableMessageSerialization; + _providerSettings.EnableMessageHeaders = enableMessageHeaders; var m = new SomeMessageA(Guid.NewGuid()); // act - await _subject.Value.ProducePublish(m); + await _subject.Value.ProducePublish(m, headers: headers); // assert if (enableMessageSerialization) @@ -110,8 +119,21 @@ public async Task When_Publish_Given_MessageSerializationSetting_Then_DeliversMe { aConsumerMock.Verify(x => x.OnHandle(m), Times.Once); } + + aConsumerMock.VerifySet(x => x.Context = It.IsAny(), Times.Once); aConsumerMock.VerifyNoOtherCalls(); + if (enableMessageHeaders) + { + // All passed headers should be present in the consumer headers + headers.Should().BeSubsetOf(aConsumerMock.Object.Context.Headers); + } + else + { + // The headers should not be present in the consumer headers + aConsumerMock.Object.Context.Headers.Should().BeNull(); + } + aConsumer2Mock.Verify(x => x.OnHandle(It.IsAny()), Times.Never); aConsumer2Mock.VerifyNoOtherCalls(); @@ -172,6 +194,7 @@ public async Task When_Publish_Given_PerMessageScopeEnabled_Then_TheScopeIsCreat scopeProviderMock.Verify(x => x.GetService(typeof(SomeMessageAConsumer)), Times.Once); scopeProviderMock.Verify(x => x.GetService(typeof(IEnumerable>)), Times.Once); + consumerMock.VerifySet(x => x.Context = It.IsAny(), Times.Once); consumerMock.Verify(x => x.OnHandle(m), Times.Once); consumerMock.Verify(x => x.Dispose(), Times.Never); consumerMock.VerifyNoOtherCalls(); @@ -210,6 +233,7 @@ public async Task When_Publish_Given_PerMessageScopeDisabled_Then_TheScopeIsNotC _serviceProviderMock.ProviderMock.Verify(x => x.GetService(typeof(IMessageTypeResolver)), Times.Once); _serviceProviderMock.ProviderMock.VerifyNoOtherCalls(); + consumerMock.VerifySet(x => x.Context = It.IsAny(), Times.Once); consumerMock.Verify(x => x.OnHandle(m), Times.Once); consumerMock.Verify(x => x.Dispose(), Times.Once); consumerMock.VerifyNoOtherCalls(); @@ -255,6 +279,7 @@ public async Task When_ProducePublish_Given_PerMessageScopeDisabledOrEnabled_And // current scope is not changed MessageScope.Current.Should().BeNull(); + consumerMock.VerifySet(x => x.Context = It.IsAny(), Times.Once); consumerMock.Verify(x => x.OnHandle(m), Times.Once); consumerMock.Verify(x => x.Dispose(), Times.Once); consumerMock.VerifyNoOtherCalls(); @@ -318,6 +343,7 @@ public async Task When_Publish_Given_TwoConsumersOnSameTopic_Then_BothAreInvoked _serviceProviderMock.ProviderMock.Verify(x => x.GetService(typeof(IMessageTypeResolver)), Times.Once); _serviceProviderMock.ProviderMock.VerifyNoOtherCalls(); + consumer1Mock.VerifySet(x => x.Context = It.IsAny(), Times.Once); consumer1Mock.Verify(x => x.OnHandle(m), Times.Once); consumer1Mock.VerifyNoOtherCalls(); @@ -475,8 +501,10 @@ public record SomeMessageA(Guid Value); public record SomeMessageB(Guid Value); -public class SomeMessageAConsumer : IConsumer, IDisposable +public class SomeMessageAConsumer : IConsumer, IConsumerWithContext, IDisposable { + public virtual IConsumerContext Context { get; set; } + public virtual void Dispose() { // Needed to check disposing