diff --git a/Directory.Build.props b/Directory.Build.props index bc0a86361..d17709654 100644 --- a/Directory.Build.props +++ b/Directory.Build.props @@ -6,7 +6,7 @@ MIT Copyright (c) 2019 Sergio Aquilini - 1.0.2$(VersionSuffix) + 1.0.3$(VersionSuffix) https://beagle1984.github.io/silverback/ https://github.com/BEagle1984/silverback/ git diff --git a/docs/_data/navigation.yml b/docs/_data/navigation.yml index 5d4d873f6..34b786851 100644 --- a/docs/_data/navigation.yml +++ b/docs/_data/navigation.yml @@ -65,8 +65,8 @@ docs: url: /docs/advanced/serialization - title: Observable Bus url: /docs/advanced/rx - - title: Kafka Partitioning - url: /docs/advanced/partitioning + - title: Kafka Message Key (Partitioning) + url: /docs/advanced/kafka-message-key - title: Using IBroker url: /docs/advanced/broker - title: Chunking diff --git a/docs/_docs/0-introduction/003-releases.md b/docs/_docs/0-introduction/003-releases.md index c48846880..c3983c21b 100644 --- a/docs/_docs/0-introduction/003-releases.md +++ b/docs/_docs/0-introduction/003-releases.md @@ -4,6 +4,12 @@ permalink: /docs/releases toc: true --- +## [1.0.3](https://github.com/BEagle1984/silverback/releases/tag/1.0.3) + +### Fixes +* Kafka message key is not hashed anymore to avoid possible collisions and simplify debugging +* Not really a fix but `PartitioningKeyMemberAttribute` has been deprecated in favor of `KafkaKeyMemberAttribute`, since the message key isn't used just for partitioning (see [Kafka Message Key]({{ site.baseurl }}/docs/advanced/kafka-message-key)) + ## [1.0.2](https://github.com/BEagle1984/silverback/releases/tag/1.0.2) ### Fixes @@ -38,8 +44,12 @@ toc: true * Some changes in `IInboundMessage` and `IOutboundMessage` interfaces * Changes to the schema of the outbox table (`Silverback.Messaging.Connectors.Model.OutboundMessage`) * The configuration fluent API changed quite a bit, refer to the current documentation (e.g. [Using the Bus]({{ site.baseurl }}/docs/quickstart/bus) and [Connecting to a Message Broker]({{ site.baseurl }}/docs/quickstart/message-broker)) + +**Important!** `WithConnectionTo` has to be replaced with `WithConnectionToKafka` in order for all features to work properly. When failing to do so no message key will be generated, causing the messages to land in a random partition and/or preventing to publish to a compacted topic. (see [Kafka Message Key]({{ site.baseurl }}/docs/advanced/kafka-message-key)) +{: .notice--warning} + * `Silverback.Integration.EntityFrameworkCore` and `Silverback.EventSourcing.EntityFrameworkCore` have been deprecated (`Silverback.Core.EntityFrameworkCore` contains all the necessary logic to use EF as store) -* `KeyMemberAttribute` has been renamed to `PartitioningKeyMemberAttribute` (see [Kafka Partitioning]({{ site.baseurl }}/docs/advanced/partitioning)) +* `KeyMemberAttribute` has been renamed to `PartitioningKeyMemberAttribute` (see [Kafka Message Key]({{ site.baseurl }}/docs/advanced/kafka-message-key)) ## [0.10.0](https://github.com/BEagle1984/silverback/releases/tag/0.10.0) diff --git a/docs/_docs/3-advanced/303-kafka-key.md b/docs/_docs/3-advanced/303-kafka-key.md new file mode 100644 index 000000000..37af8e1a7 --- /dev/null +++ b/docs/_docs/3-advanced/303-kafka-key.md @@ -0,0 +1,28 @@ +--- +title: Kafka Message Key (Partitioning) +permalink: /docs/advanced/kafka-message-key +toc: true +--- + +Apache Kafka require a message key for different purposes, such as: +* **Partitioning**: Kafka can guarantee ordering only inside the same partition and it is therefore important to be able to route correlated messages into the same partition. To do so you need to specify a key for each message and Kafka will put all messages with the same key in the same partition. +* **Compacting topics**: A topic can be configured with `cleanup.policy=compact` to instruct Kafka to keep only the latest message related to a certain object, identified by the message key. In other words Kafka will retain only 1 message per each key value. + +Silverback offers a convenient way to specify the message key. It is enough to decorate the properties that must be part of the key with `KafkaKeyMemberAttribute`. + +```c# +public class MultipleKeyMembersMessage : IIntegrationMessage +{ + public Guid Id { get; set; } + + [KafkaKeyMember] + public string One { get; set; } + + [KafkaKeyMember] + public string Two { get; set; } + + public string Three { get; set; } +} +``` + +If no key members are specified no key will be generated and the messages will land in a random partition. diff --git a/docs/_docs/3-advanced/303-partitioning.md b/docs/_docs/3-advanced/303-partitioning.md deleted file mode 100644 index 58d7ee913..000000000 --- a/docs/_docs/3-advanced/303-partitioning.md +++ /dev/null @@ -1,28 +0,0 @@ ---- -title: Kafka Partitioning -permalink: /docs/advanced/partitioning -toc: false ---- - -Kafka can guarantee ordering only inside the same partition and it is therefore important to be able to route correlated messages into the same partition. - -To do so you need to specify a key for each message and Kafka will put all messages with the same key in the same partition. - -Silverback offers a convenient way to specify the message key. It is enough to decorate the properties that must be part of the key with `PartitioningKeyMemberAttribute`. - -```c# -public class MultipleKeyMembersMessage : IIntegrationMessage -{ - public Guid Id { get; set; } - - [PartitioningKeyMember] - public string One { get; set; } - - [PartitioningKeyMember] - public string Two { get; set; } - - public string Three { get; set; } -} -``` - -If no key members are specified no key will be generated and the messages will land in a random partition. \ No newline at end of file diff --git a/samples/Examples/src/Silverback.Examples.Common/Messages/PartitionedSimpleIntegrationEvent.cs b/samples/Examples/src/Silverback.Examples.Common/Messages/PartitionedSimpleIntegrationEvent.cs index faa256210..0c958ad48 100644 --- a/samples/Examples/src/Silverback.Examples.Common/Messages/PartitionedSimpleIntegrationEvent.cs +++ b/samples/Examples/src/Silverback.Examples.Common/Messages/PartitionedSimpleIntegrationEvent.cs @@ -7,7 +7,7 @@ namespace Silverback.Examples.Common.Messages { public class PartitionedSimpleIntegrationEvent : IntegrationEvent { - [PartitioningKeyMember] + [KafkaKeyMember] public string Key { get; set; } } } \ No newline at end of file diff --git a/src/Silverback.Integration.Kafka/Messaging/Behaviors/KafkaPartitioningKeyBehavior.cs b/src/Silverback.Integration.Kafka/Messaging/Behaviors/KafkaPartitioningKeyBehavior.cs index cbbd29eb3..17c47ffcb 100644 --- a/src/Silverback.Integration.Kafka/Messaging/Behaviors/KafkaPartitioningKeyBehavior.cs +++ b/src/Silverback.Integration.Kafka/Messaging/Behaviors/KafkaPartitioningKeyBehavior.cs @@ -15,12 +15,6 @@ namespace Silverback.Messaging.Behaviors { public class KafkaPartitioningKeyBehavior : ISortedBehavior { - private static readonly HashAlgorithm HashAlgorithm = MD5.Create(); - - public KafkaPartitioningKeyBehavior() - { - } - public int SortIndex { get; } = 200; public Task> Handle(IEnumerable messages, MessagesHandler next) @@ -31,14 +25,12 @@ public Task> Handle(IEnumerable messages, MessagesHa private void SetPartitioningKey(IOutboundMessage outboundMessage) { - var key = KeyHelper.GetMessageKey(outboundMessage.Content); + var key = KafkaKeyHelper.GetMessageKey(outboundMessage.Content); if (key == null) return; - var keyHash = Convert.ToBase64String(HashAlgorithm.ComputeHash(key)); - - outboundMessage.Headers.AddOrReplace(KafkaProducer.PartitioningKeyHeaderKey, keyHash); + outboundMessage.Headers.AddOrReplace(KafkaProducer.PartitioningKeyHeaderKey, key); } } } \ No newline at end of file diff --git a/src/Silverback.Integration.Kafka/Messaging/Broker/KafkaProducer.cs b/src/Silverback.Integration.Kafka/Messaging/Broker/KafkaProducer.cs index deec882d3..ada80b377 100644 --- a/src/Silverback.Integration.Kafka/Messaging/Broker/KafkaProducer.cs +++ b/src/Silverback.Integration.Kafka/Messaging/Broker/KafkaProducer.cs @@ -5,6 +5,7 @@ using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; +using System.Text; using System.Threading.Tasks; using Microsoft.Extensions.Logging; using Silverback.Messaging.Messages; @@ -76,7 +77,7 @@ private byte[] GetPartitioningKey(IEnumerable headers) return headerValue == null ? null - : Convert.FromBase64String(headerValue); + : Encoding.UTF8.GetBytes(headerValue); } private Confluent.Kafka.IProducer GetInnerProducer() => diff --git a/src/Silverback.Integration.Kafka/Messaging/Messages/KafkaKeyMemberAttribute.cs b/src/Silverback.Integration.Kafka/Messaging/Messages/KafkaKeyMemberAttribute.cs new file mode 100644 index 000000000..f42b2c437 --- /dev/null +++ b/src/Silverback.Integration.Kafka/Messaging/Messages/KafkaKeyMemberAttribute.cs @@ -0,0 +1,17 @@ +// Copyright (c) 2019 Sergio Aquilini +// This code is licensed under MIT license (see LICENSE file for details) + +using System; + +namespace Silverback.Messaging.Messages +{ + /// + /// The properties decorated with this attribute will be used + /// to build the message key that will used by Kafka + /// (for partitioning, compacting, etc.). + /// + [AttributeUsage(AttributeTargets.Property)] + public class KafkaKeyMemberAttribute : Attribute + { + } +} \ No newline at end of file diff --git a/src/Silverback.Integration.Kafka/Messaging/Messages/KeyHelper.cs b/src/Silverback.Integration.Kafka/Messaging/Messages/KeyHelper.cs index 2632234fa..0ff0c0865 100644 --- a/src/Silverback.Integration.Kafka/Messaging/Messages/KeyHelper.cs +++ b/src/Silverback.Integration.Kafka/Messaging/Messages/KeyHelper.cs @@ -2,27 +2,31 @@ // This code is licensed under MIT license (see LICENSE file for details) using System.Linq; -using System.Text; -using Newtonsoft.Json; namespace Silverback.Messaging.Messages { - /// - /// Helps to retrieve the message properties with KeyMember attribute for creating a key. - /// - internal static class KeyHelper + internal static class KafkaKeyHelper { - public static byte[] GetMessageKey(object message) + public static string GetMessageKey(object message) { var keysDictionary = message.GetType() .GetProperties() - .Where(p => p.IsDefined(typeof(PartitioningKeyMemberAttribute), true)) - .ToDictionary(p => p.Name, p => p.GetValue(message, null)); + .Where(p => p.IsDefined(typeof(KafkaKeyMemberAttribute), true) || + p.IsDefined(typeof(PartitioningKeyMemberAttribute), true)) + .Select(p => new + { + p.Name, + Value = p.GetValue(message, null).ToString() + }) + .ToArray(); - return keysDictionary.Count > 0 - ? Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(keysDictionary)) - : null; + if (!keysDictionary.Any()) + return null; + + return keysDictionary.Length == 1 + ? keysDictionary.First().Value : + string.Join(",", keysDictionary.Select(p => $"{p.Name}={p.Value}")); } } } \ No newline at end of file diff --git a/src/Silverback.Integration.Kafka/Messaging/Messages/PartitioningKeyMemberAttribute.cs b/src/Silverback.Integration.Kafka/Messaging/Messages/PartitioningKeyMemberAttribute.cs index f58ed4cec..6aae43607 100644 --- a/src/Silverback.Integration.Kafka/Messaging/Messages/PartitioningKeyMemberAttribute.cs +++ b/src/Silverback.Integration.Kafka/Messaging/Messages/PartitioningKeyMemberAttribute.cs @@ -10,6 +10,7 @@ namespace Silverback.Messaging.Messages /// to build a key that will determine the destination partition on Kafka. /// [AttributeUsage(AttributeTargets.Property)] + [Obsolete("PartitioningKeyMemberAttribute is deprecated, use KafkaKeyMemberAttribute instead.")] public class PartitioningKeyMemberAttribute : Attribute { } diff --git a/tests/Silverback.Integration.Kafka.Tests/Messaging/Behaviors/KafkaPartitioningKeyBehaviorTests.cs b/tests/Silverback.Integration.Kafka.Tests/Messaging/Behaviors/KafkaPartitioningKeyBehaviorTests.cs index bbf4c7e48..a841278df 100644 --- a/tests/Silverback.Integration.Kafka.Tests/Messaging/Behaviors/KafkaPartitioningKeyBehaviorTests.cs +++ b/tests/Silverback.Integration.Kafka.Tests/Messaging/Behaviors/KafkaPartitioningKeyBehaviorTests.cs @@ -35,7 +35,7 @@ public void Handle_NoKeyMembersMessage_KeyHeaderIsNotSet() } [Fact] - public void Handle_SingleKeyMemberMessagesWithSameKey_SameKeyHeaderIsSet() + public void Handle_SingleKeyMemberMessages_KeyHeaderIsSet() { var message1 = new OutboundMessage( new SingleKeyMemberMessage @@ -51,9 +51,9 @@ public void Handle_SingleKeyMemberMessagesWithSameKey_SameKeyHeaderIsSet() new SingleKeyMemberMessage { Id = Guid.NewGuid(), - One = "1", - Two = "2-diff", - Three = "3-diff" + One = "a", + Two = "b", + Three = "c" }, null, new KafkaProducerEndpoint("test-endpoint")); @@ -61,45 +61,13 @@ public void Handle_SingleKeyMemberMessagesWithSameKey_SameKeyHeaderIsSet() new KafkaPartitioningKeyBehavior().Handle(new[] {message1, message2}, Task.FromResult); message1.Headers.Should().ContainEquivalentOf( - new MessageHeader("x-kafka-partitioning-key", "Zylv21PxiCCwBS+PDYZZAQ==")); - message1.Headers.Should().BeEquivalentTo(message2.Headers); - } - - [Fact] - public void Handle_SingleKeyMemberMessagesWithDifferentKey_DifferentKeyHeadersAreSet() - { - var message1 = new OutboundMessage( - new SingleKeyMemberMessage - { - Id = Guid.NewGuid(), - One = "1", - Two = "2", - Three = "3" - }, - null, - new KafkaProducerEndpoint("test-endpoint")); - var message2 = new OutboundMessage( - new SingleKeyMemberMessage - { - Id = Guid.NewGuid(), - One = "1-diff", - Two = "2", - Three = "3" - }, - null, - new KafkaProducerEndpoint("test-endpoint")); - - new KafkaPartitioningKeyBehavior().Handle(new[] { message1, message2 }, Task.FromResult); - - message1.Headers.Should().ContainEquivalentOf( - new MessageHeader("x-kafka-partitioning-key", "Zylv21PxiCCwBS+PDYZZAQ==")); + new MessageHeader("x-kafka-partitioning-key", "1")); message2.Headers.Should().ContainEquivalentOf( - new MessageHeader("x-kafka-partitioning-key", "pPCpQ+48gFjvHFaQQIiR8w==")); - message1.Headers.Should().NotBeEquivalentTo(message2.Headers); + new MessageHeader("x-kafka-partitioning-key", "a")); } [Fact] - public void Handle_MultipleKeyMembersMessagesWithSameKey_SameKeyHeaderIsSet() + public void Handle_MultipleKeyMembersMessages_KeyHeaderIsSet() { var message1 = new OutboundMessage( new MultipleKeyMembersMessage @@ -115,40 +83,9 @@ public void Handle_MultipleKeyMembersMessagesWithSameKey_SameKeyHeaderIsSet() new MultipleKeyMembersMessage { Id = Guid.NewGuid(), - One = "1", - Two = "2", - Three = "3-diff" - }, - null, - new KafkaProducerEndpoint("test-endpoint")); - - new KafkaPartitioningKeyBehavior().Handle(new[] { message1, message2 }, Task.FromResult); - - message1.Headers.Should().ContainEquivalentOf( - new MessageHeader("x-kafka-partitioning-key", "e/faTzFR/Ey+iSN44gESAg==")); - message1.Headers.Should().BeEquivalentTo(message2.Headers); - } - - [Fact] - public void Handle_MultipleKeyMembersMessagesWithDifferentKey_DifferentKeyHeadersAreSet() - { - var message1 = new OutboundMessage( - new MultipleKeyMembersMessage - { - Id = Guid.NewGuid(), - One = "1", - Two = "2", - Three = "3" - }, - null, - new KafkaProducerEndpoint("test-endpoint")); - var message2 = new OutboundMessage( - new MultipleKeyMembersMessage - { - Id = Guid.NewGuid(), - One = "1", - Two = "2-diff", - Three = "3" + One = "a", + Two = "b", + Three = "c" }, null, new KafkaProducerEndpoint("test-endpoint")); @@ -156,10 +93,9 @@ public void Handle_MultipleKeyMembersMessagesWithDifferentKey_DifferentKeyHeader new KafkaPartitioningKeyBehavior().Handle(new[] { message1, message2 }, Task.FromResult); message1.Headers.Should().ContainEquivalentOf( - new MessageHeader("x-kafka-partitioning-key", "e/faTzFR/Ey+iSN44gESAg==")); + new MessageHeader("x-kafka-partitioning-key", "One=1,Two=2")); message2.Headers.Should().ContainEquivalentOf( - new MessageHeader("x-kafka-partitioning-key", "mtNLKV41qmbmRzbizV9QrA==")); - message1.Headers.Should().NotBeEquivalentTo(message2.Headers); + new MessageHeader("x-kafka-partitioning-key", "One=a,Two=b")); } } } \ No newline at end of file diff --git a/tests/Silverback.Integration.Kafka.Tests/Messaging/Messages/KafkaKeyHelperTests.cs b/tests/Silverback.Integration.Kafka.Tests/Messaging/Messages/KafkaKeyHelperTests.cs new file mode 100644 index 000000000..e354f9f3d --- /dev/null +++ b/tests/Silverback.Integration.Kafka.Tests/Messaging/Messages/KafkaKeyHelperTests.cs @@ -0,0 +1,78 @@ +// Copyright (c) 2019 Sergio Aquilini +// This code is licensed under MIT license (see LICENSE file for details) + +using System; +using FluentAssertions; +using Silverback.Messaging.Messages; +using Silverback.Tests.Integration.Kafka.TestTypes.Messages; +using Xunit; + +namespace Silverback.Tests.Integration.Kafka.Messaging.Messages +{ + public class KafkaKeyHelperTests + { + [Fact] + public void GetMessageKey_NoKeyMembersMessage_NullIsReturned() + { + var message = new NoKeyMembersMessage + { + Id = Guid.NewGuid(), + One = "1", + Two = "2", + Three = "3" + }; + + var key = KafkaKeyHelper.GetMessageKey(message); + + key.Should().BeNull(); + } + + [Fact] + public void GetMessageKey_SingleKeyMemberMessage_PropertyValueIsReturned() + { + var message = new SingleKeyMemberMessage + { + Id = Guid.NewGuid(), + One = "1", + Two = "2", + Three = "3" + }; + + var key = KafkaKeyHelper.GetMessageKey(message); + + key.Should().Be("1"); + } + + [Fact] + public void GetMessageKey_MultipleKeyMembersMessagesWithSameKey_ComposedKeyIsReturned() + { + var message = new MultipleKeyMembersMessage + { + Id = Guid.NewGuid(), + One = "1", + Two = "2", + Three = "3" + }; + + var key = KafkaKeyHelper.GetMessageKey(message); + + key.Should().Be("One=1,Two=2"); + } + + [Fact] + public void GetMessageKey_LegacyKeyMemberAttribute_PropertyValueIsReturned() + { + var message = new LegacyKeyMemberAttributeMessage + { + Id = Guid.NewGuid(), + One = "11", + Two = "22", + Three = "33" + }; + + var key = KafkaKeyHelper.GetMessageKey(message); + + key.Should().Be("11"); + } + } +} diff --git a/tests/Silverback.Integration.Kafka.Tests/Messaging/Messages/KeyHelperTests.cs b/tests/Silverback.Integration.Kafka.Tests/Messaging/Messages/KeyHelperTests.cs deleted file mode 100644 index 6707d6009..000000000 --- a/tests/Silverback.Integration.Kafka.Tests/Messaging/Messages/KeyHelperTests.cs +++ /dev/null @@ -1,126 +0,0 @@ -// Copyright (c) 2019 Sergio Aquilini -// This code is licensed under MIT license (see LICENSE file for details) - -using System; -using FluentAssertions; -using Silverback.Messaging.Messages; -using Silverback.Tests.Integration.Kafka.TestTypes.Messages; -using Xunit; - -namespace Silverback.Tests.Integration.Kafka.Messaging.Messages -{ - public class KeyHelperTests - { - [Fact] - public void GetMessageKey_NoKeyMembersMessage_KeyIsEmpty() - { - var message = new NoKeyMembersMessage - { - Id = Guid.NewGuid(), - One = "1", - Two = "2", - Three = "3" - }; - - var key = KeyHelper.GetMessageKey(message); - - key.Should().BeNull(); - } - - [Fact] - public void GetMessageKey_SingleKeyMemberMessagesWithSameKey_KeyIsEqual() - { - var message1 = new SingleKeyMemberMessage - { - Id = Guid.NewGuid(), - One = "1", - Two = "2", - Three = "3" - }; - var message2 = new SingleKeyMemberMessage - { - Id = Guid.NewGuid(), - One = "1", - Two = "2-diff", - Three = "3-diff" - }; - - var key1 = KeyHelper.GetMessageKey(message1); - var key2 = KeyHelper.GetMessageKey(message2); - - key2.Should().BeEquivalentTo(key1); - } - - [Fact] - public void GetMessageKey_SingleKeyMemberMessagesWithDifferentKey_KeyIsNotEqual() - { - var message1 = new SingleKeyMemberMessage - { - Id = Guid.NewGuid(), - One = "1", - Two = "2", - Three = "3" - }; - var message2 = new SingleKeyMemberMessage - { - Id = Guid.NewGuid(), - One = "1-diff", - Two = "2", - Three = "3" - }; - - var key1 = KeyHelper.GetMessageKey(message1); - var key2 = KeyHelper.GetMessageKey(message2); - - key2.Should().NotBeEquivalentTo(key1); - } - - [Fact] - public void GetMessageKey_MultipleKeyMembersMessagesWithSameKey_KeyIsEqual() - { - var message1 = new MultipleKeyMembersMessage - { - Id = Guid.NewGuid(), - One = "1", - Two = "2", - Three = "3" - }; - var message2 = new MultipleKeyMembersMessage - { - Id = Guid.NewGuid(), - One = "1", - Two = "2", - Three = "3-diff" - }; - - var key1 = KeyHelper.GetMessageKey(message1); - var key2 = KeyHelper.GetMessageKey(message2); - - key2.Should().BeEquivalentTo(key1); - } - - [Fact] - public void GetMessageKey_MultipleKeyMembersMessagesWithDifferentKey_KeyIsNotEqual() - { - var message1 = new MultipleKeyMembersMessage - { - Id = Guid.NewGuid(), - One = "1", - Two = "2", - Three = "3" - }; - var message2 = new MultipleKeyMembersMessage - { - Id = Guid.NewGuid(), - One = "1", - Two = "2-diff", - Three = "3" - }; - - var key1 = KeyHelper.GetMessageKey(message1); - var key2 = KeyHelper.GetMessageKey(message2); - - key2.Should().NotBeEquivalentTo(key1); - } - } -} diff --git a/tests/Silverback.Integration.Kafka.Tests/TestTypes/Messages/LegacyKeyMemberAttributeMessage.cs b/tests/Silverback.Integration.Kafka.Tests/TestTypes/Messages/LegacyKeyMemberAttributeMessage.cs new file mode 100644 index 000000000..7ec0cb1bb --- /dev/null +++ b/tests/Silverback.Integration.Kafka.Tests/TestTypes/Messages/LegacyKeyMemberAttributeMessage.cs @@ -0,0 +1,20 @@ +// Copyright (c) 2019 Sergio Aquilini +// This code is licensed under MIT license (see LICENSE file for details) + +using System; +using Silverback.Messaging.Messages; + +namespace Silverback.Tests.Integration.Kafka.TestTypes.Messages +{ + public class LegacyKeyMemberAttributeMessage : IMessage + { + public Guid Id { get; set; } + + [PartitioningKeyMember] + public string One { get; set; } + + public string Two { get; set; } + + public string Three { get; set; } + } +} \ No newline at end of file diff --git a/tests/Silverback.Integration.Kafka.Tests/TestTypes/Messages/MultipleFieldsKeyMessage.cs b/tests/Silverback.Integration.Kafka.Tests/TestTypes/Messages/MultipleFieldsKeyMessage.cs index dd7cf78e0..e0ed8a6bf 100644 --- a/tests/Silverback.Integration.Kafka.Tests/TestTypes/Messages/MultipleFieldsKeyMessage.cs +++ b/tests/Silverback.Integration.Kafka.Tests/TestTypes/Messages/MultipleFieldsKeyMessage.cs @@ -9,9 +9,9 @@ namespace Silverback.Tests.Integration.Kafka.TestTypes.Messages public class MultipleKeyMembersMessage : IMessage { public Guid Id { get; set; } - [PartitioningKeyMember] + [KafkaKeyMember] public string One { get; set; } - [PartitioningKeyMember] + [KafkaKeyMember] public string Two { get; set; } public string Three { get; set; } } diff --git a/tests/Silverback.Integration.Kafka.Tests/TestTypes/Messages/SingleFieldKeyMessage.cs b/tests/Silverback.Integration.Kafka.Tests/TestTypes/Messages/SingleFieldKeyMessage.cs index 79f756dd6..c22a79398 100644 --- a/tests/Silverback.Integration.Kafka.Tests/TestTypes/Messages/SingleFieldKeyMessage.cs +++ b/tests/Silverback.Integration.Kafka.Tests/TestTypes/Messages/SingleFieldKeyMessage.cs @@ -10,7 +10,7 @@ public class SingleKeyMemberMessage : IMessage { public Guid Id { get; set; } - [PartitioningKeyMember] + [KafkaKeyMember] public string One { get; set; } public string Two { get; set; }