Skip to content

Commit

Permalink
Improve/simplify generated kafka key
Browse files Browse the repository at this point in the history
  • Loading branch information
BEagle1984 committed Nov 20, 2019
1 parent 668ff0e commit c92cc7f
Show file tree
Hide file tree
Showing 17 changed files with 194 additions and 261 deletions.
2 changes: 1 addition & 1 deletion Directory.Build.props
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<License>MIT</License>
<Copyright>Copyright (c) 2019 Sergio Aquilini</Copyright>
<VersionSuffix></VersionSuffix>
<BaseVersion>1.0.2$(VersionSuffix)</BaseVersion>
<BaseVersion>1.0.3$(VersionSuffix)</BaseVersion>
<ProjectUrl>https://beagle1984.github.io/silverback/</ProjectUrl>
<RepositoryUrl>https://github.com/BEagle1984/silverback/</RepositoryUrl>
<RepositoryType>git</RepositoryType>
Expand Down
4 changes: 2 additions & 2 deletions docs/_data/navigation.yml
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@ docs:
url: /docs/advanced/serialization
- title: Observable Bus
url: /docs/advanced/rx
- title: Kafka Partitioning
url: /docs/advanced/partitioning
- title: Kafka Message Key (Partitioning)
url: /docs/advanced/kafka-message-key
- title: Using IBroker
url: /docs/advanced/broker
- title: Chunking
Expand Down
12 changes: 11 additions & 1 deletion docs/_docs/0-introduction/003-releases.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,12 @@ permalink: /docs/releases
toc: true
---

## [1.0.3](https://github.com/BEagle1984/silverback/releases/tag/1.0.3)

### Fixes
* Kafka message key is not hashed anymore to avoid possible collisions and simplify debugging
* Not really a fix but `PartitioningKeyMemberAttribute` has been deprecated in favor of `KafkaKeyMemberAttribute`, since the message key isn't used just for partitioning (see [Kafka Message Key]({{ site.baseurl }}/docs/advanced/kafka-message-key))

## [1.0.2](https://github.com/BEagle1984/silverback/releases/tag/1.0.2)

### Fixes
Expand Down Expand Up @@ -38,8 +44,12 @@ toc: true
* Some changes in `IInboundMessage` and `IOutboundMessage` interfaces
* Changes to the schema of the outbox table (`Silverback.Messaging.Connectors.Model.OutboundMessage`)
* The configuration fluent API changed quite a bit, refer to the current documentation (e.g. [Using the Bus]({{ site.baseurl }}/docs/quickstart/bus) and [Connecting to a Message Broker]({{ site.baseurl }}/docs/quickstart/message-broker))

**Important!** `WithConnectionTo<KafkaBroker>` has to be replaced with `WithConnectionToKafka` in order for all features to work properly. When failing to do so no message key will be generated, causing the messages to land in a random partition and/or preventing to publish to a compacted topic. (see [Kafka Message Key]({{ site.baseurl }}/docs/advanced/kafka-message-key))
{: .notice--warning}

* `Silverback.Integration.EntityFrameworkCore` and `Silverback.EventSourcing.EntityFrameworkCore` have been deprecated (`Silverback.Core.EntityFrameworkCore` contains all the necessary logic to use EF as store)
* `KeyMemberAttribute` has been renamed to `PartitioningKeyMemberAttribute` (see [Kafka Partitioning]({{ site.baseurl }}/docs/advanced/partitioning))
* `KeyMemberAttribute` has been renamed to `PartitioningKeyMemberAttribute` (see [Kafka Message Key]({{ site.baseurl }}/docs/advanced/kafka-message-key))

## [0.10.0](https://github.com/BEagle1984/silverback/releases/tag/0.10.0)

Expand Down
28 changes: 28 additions & 0 deletions docs/_docs/3-advanced/303-kafka-key.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
---
title: Kafka Message Key (Partitioning)
permalink: /docs/advanced/kafka-message-key
toc: true
---

Apache Kafka require a message key for different purposes, such as:
* **Partitioning**: Kafka can guarantee ordering only inside the same partition and it is therefore important to be able to route correlated messages into the same partition. To do so you need to specify a key for each message and Kafka will put all messages with the same key in the same partition.
* **Compacting topics**: A topic can be configured with `cleanup.policy=compact` to instruct Kafka to keep only the latest message related to a certain object, identified by the message key. In other words Kafka will retain only 1 message per each key value.

Silverback offers a convenient way to specify the message key. It is enough to decorate the properties that must be part of the key with `KafkaKeyMemberAttribute`.

```c#
public class MultipleKeyMembersMessage : IIntegrationMessage
{
public Guid Id { get; set; }

[KafkaKeyMember]
public string One { get; set; }

[KafkaKeyMember]
public string Two { get; set; }

public string Three { get; set; }
}
```

If no key members are specified no key will be generated and the messages will land in a random partition.
28 changes: 0 additions & 28 deletions docs/_docs/3-advanced/303-partitioning.md

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ namespace Silverback.Examples.Common.Messages
{
public class PartitionedSimpleIntegrationEvent : IntegrationEvent
{
[PartitioningKeyMember]
[KafkaKeyMember]
public string Key { get; set; }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,6 @@ namespace Silverback.Messaging.Behaviors
{
public class KafkaPartitioningKeyBehavior : ISortedBehavior
{
private static readonly HashAlgorithm HashAlgorithm = MD5.Create();

public KafkaPartitioningKeyBehavior()
{
}

public int SortIndex { get; } = 200;

public Task<IEnumerable<object>> Handle(IEnumerable<object> messages, MessagesHandler next)
Expand All @@ -31,14 +25,12 @@ public Task<IEnumerable<object>> Handle(IEnumerable<object> messages, MessagesHa

private void SetPartitioningKey(IOutboundMessage outboundMessage)
{
var key = KeyHelper.GetMessageKey(outboundMessage.Content);
var key = KafkaKeyHelper.GetMessageKey(outboundMessage.Content);

if (key == null)
return;

var keyHash = Convert.ToBase64String(HashAlgorithm.ComputeHash(key));

outboundMessage.Headers.AddOrReplace(KafkaProducer.PartitioningKeyHeaderKey, keyHash);
outboundMessage.Headers.AddOrReplace(KafkaProducer.PartitioningKeyHeaderKey, key);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Silverback.Messaging.Messages;
Expand Down Expand Up @@ -76,7 +77,7 @@ private byte[] GetPartitioningKey(IEnumerable<MessageHeader> headers)

return headerValue == null
? null
: Convert.FromBase64String(headerValue);
: Encoding.UTF8.GetBytes(headerValue);
}

private Confluent.Kafka.IProducer<byte[], byte[]> GetInnerProducer() =>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// Copyright (c) 2019 Sergio Aquilini
// This code is licensed under MIT license (see LICENSE file for details)

using System;

namespace Silverback.Messaging.Messages
{
/// <summary>
/// The properties decorated with this attribute will be used
/// to build the message key that will used by Kafka
/// (for partitioning, compacting, etc.).
/// </summary>
[AttributeUsage(AttributeTargets.Property)]
public class KafkaKeyMemberAttribute : Attribute
{
}
}
28 changes: 16 additions & 12 deletions src/Silverback.Integration.Kafka/Messaging/Messages/KeyHelper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,31 @@
// This code is licensed under MIT license (see LICENSE file for details)

using System.Linq;
using System.Text;
using Newtonsoft.Json;

namespace Silverback.Messaging.Messages
{
/// <summary>
/// Helps to retrieve the message properties with KeyMember attribute for creating a key.
/// </summary>
internal static class KeyHelper
internal static class KafkaKeyHelper
{
public static byte[] GetMessageKey(object message)
public static string GetMessageKey(object message)
{
var keysDictionary =
message.GetType()
.GetProperties()
.Where(p => p.IsDefined(typeof(PartitioningKeyMemberAttribute), true))
.ToDictionary(p => p.Name, p => p.GetValue(message, null));
.Where(p => p.IsDefined(typeof(KafkaKeyMemberAttribute), true) ||
p.IsDefined(typeof(PartitioningKeyMemberAttribute), true))
.Select(p => new
{
p.Name,
Value = p.GetValue(message, null).ToString()
})
.ToArray();

return keysDictionary.Count > 0
? Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(keysDictionary))
: null;
if (!keysDictionary.Any())
return null;

return keysDictionary.Length == 1
? keysDictionary.First().Value :
string.Join(",", keysDictionary.Select(p => $"{p.Name}={p.Value}"));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ namespace Silverback.Messaging.Messages
/// to build a key that will determine the destination partition on Kafka.
/// </summary>
[AttributeUsage(AttributeTargets.Property)]
[Obsolete("PartitioningKeyMemberAttribute is deprecated, use KafkaKeyMemberAttribute instead.")]
public class PartitioningKeyMemberAttribute : Attribute
{
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public void Handle_NoKeyMembersMessage_KeyHeaderIsNotSet()
}

[Fact]
public void Handle_SingleKeyMemberMessagesWithSameKey_SameKeyHeaderIsSet()
public void Handle_SingleKeyMemberMessages_KeyHeaderIsSet()
{
var message1 = new OutboundMessage<SingleKeyMemberMessage>(
new SingleKeyMemberMessage
Expand All @@ -51,55 +51,23 @@ public void Handle_SingleKeyMemberMessagesWithSameKey_SameKeyHeaderIsSet()
new SingleKeyMemberMessage
{
Id = Guid.NewGuid(),
One = "1",
Two = "2-diff",
Three = "3-diff"
One = "a",
Two = "b",
Three = "c"
},
null,
new KafkaProducerEndpoint("test-endpoint"));

new KafkaPartitioningKeyBehavior().Handle(new[] {message1, message2}, Task.FromResult);

message1.Headers.Should().ContainEquivalentOf(
new MessageHeader("x-kafka-partitioning-key", "Zylv21PxiCCwBS+PDYZZAQ=="));
message1.Headers.Should().BeEquivalentTo(message2.Headers);
}

[Fact]
public void Handle_SingleKeyMemberMessagesWithDifferentKey_DifferentKeyHeadersAreSet()
{
var message1 = new OutboundMessage<SingleKeyMemberMessage>(
new SingleKeyMemberMessage
{
Id = Guid.NewGuid(),
One = "1",
Two = "2",
Three = "3"
},
null,
new KafkaProducerEndpoint("test-endpoint"));
var message2 = new OutboundMessage<SingleKeyMemberMessage>(
new SingleKeyMemberMessage
{
Id = Guid.NewGuid(),
One = "1-diff",
Two = "2",
Three = "3"
},
null,
new KafkaProducerEndpoint("test-endpoint"));

new KafkaPartitioningKeyBehavior().Handle(new[] { message1, message2 }, Task.FromResult);

message1.Headers.Should().ContainEquivalentOf(
new MessageHeader("x-kafka-partitioning-key", "Zylv21PxiCCwBS+PDYZZAQ=="));
new MessageHeader("x-kafka-partitioning-key", "1"));
message2.Headers.Should().ContainEquivalentOf(
new MessageHeader("x-kafka-partitioning-key", "pPCpQ+48gFjvHFaQQIiR8w=="));
message1.Headers.Should().NotBeEquivalentTo(message2.Headers);
new MessageHeader("x-kafka-partitioning-key", "a"));
}

[Fact]
public void Handle_MultipleKeyMembersMessagesWithSameKey_SameKeyHeaderIsSet()
public void Handle_MultipleKeyMembersMessages_KeyHeaderIsSet()
{
var message1 = new OutboundMessage<MultipleKeyMembersMessage>(
new MultipleKeyMembersMessage
Expand All @@ -115,51 +83,19 @@ public void Handle_MultipleKeyMembersMessagesWithSameKey_SameKeyHeaderIsSet()
new MultipleKeyMembersMessage
{
Id = Guid.NewGuid(),
One = "1",
Two = "2",
Three = "3-diff"
},
null,
new KafkaProducerEndpoint("test-endpoint"));

new KafkaPartitioningKeyBehavior().Handle(new[] { message1, message2 }, Task.FromResult);

message1.Headers.Should().ContainEquivalentOf(
new MessageHeader("x-kafka-partitioning-key", "e/faTzFR/Ey+iSN44gESAg=="));
message1.Headers.Should().BeEquivalentTo(message2.Headers);
}

[Fact]
public void Handle_MultipleKeyMembersMessagesWithDifferentKey_DifferentKeyHeadersAreSet()
{
var message1 = new OutboundMessage<MultipleKeyMembersMessage>(
new MultipleKeyMembersMessage
{
Id = Guid.NewGuid(),
One = "1",
Two = "2",
Three = "3"
},
null,
new KafkaProducerEndpoint("test-endpoint"));
var message2 = new OutboundMessage<MultipleKeyMembersMessage>(
new MultipleKeyMembersMessage
{
Id = Guid.NewGuid(),
One = "1",
Two = "2-diff",
Three = "3"
One = "a",
Two = "b",
Three = "c"
},
null,
new KafkaProducerEndpoint("test-endpoint"));

new KafkaPartitioningKeyBehavior().Handle(new[] { message1, message2 }, Task.FromResult);

message1.Headers.Should().ContainEquivalentOf(
new MessageHeader("x-kafka-partitioning-key", "e/faTzFR/Ey+iSN44gESAg=="));
new MessageHeader("x-kafka-partitioning-key", "One=1,Two=2"));
message2.Headers.Should().ContainEquivalentOf(
new MessageHeader("x-kafka-partitioning-key", "mtNLKV41qmbmRzbizV9QrA=="));
message1.Headers.Should().NotBeEquivalentTo(message2.Headers);
new MessageHeader("x-kafka-partitioning-key", "One=a,Two=b"));
}
}
}
Loading

0 comments on commit c92cc7f

Please sign in to comment.