Skip to content

Commit

Permalink
Implemented IConsumerBehavior/IProducerBehavior and moved tracing into
Browse files Browse the repository at this point in the history
ActivityConsumerBehavior/ActivityProducerBehavior
  • Loading branch information
BEagle1984 committed Dec 26, 2019
1 parent d942ec2 commit b39a4ab
Show file tree
Hide file tree
Showing 79 changed files with 1,256 additions and 961 deletions.
2 changes: 2 additions & 0 deletions docs/_data/navigation.yml
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ docs:
url: /docs/advanced/message-keys
- title: IInboundMessage
url: /docs/advanced/iinboundmessage
- title: Default Message Headers
url: /docs/advanced/headers
- title: Extras
children:
- title: Distributed Background Services
Expand Down
4 changes: 4 additions & 0 deletions docs/_docs/0-introduction/002-features.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ You can configuratively specify the error handling policies for each inbound con
* Move: move the message to another topic/queue (or re-enqueue it at the end of the same one)
We believe that combining this three policies you will be able to implement pretty much all use cases.

## Distributed tracing

Silverback integrates with `System.Diagnostics` to ensure the entire flow can easily be traced, also when involving a message broker.

## Modularity

Silverback is modular and shipped in multiple nuget packages to allow you to depend only on the parts you want to use.
2 changes: 1 addition & 1 deletion docs/_docs/0-introduction/003-releases.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ toc: true

### What's new
* Added `IEndpointsConfigurator` interface to allow splitting the endpoints configuration across multiple types (see [Connecting to a Message Broker]({{ site.baseurl }}/docs/quickstart/message-broker#using-iendpointsconfigurator))
* Added support for distributed tracing (based on standard [System.Diagnostics.DiagnosticSource](https://www.nuget.org/packages/System.Diagnostics.DiagnosticSource/))
* Added support for distributed tracing (based on [System.Diagnostics](https://docs.microsoft.com/en-us/dotnet/api/system.diagnostics.activity?view=netcore-3.1))
* Added `IProducerBehavior` and `IConsumerBehavior` to create an extension point closer to the actual message broker logic (see [Behaviors]({{ site.baseurl }}/docs/quickstart/behaviors))

### Breaking Changes
Expand Down
10 changes: 8 additions & 2 deletions docs/_docs/1-quickstart/107-behaviors.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public void ConfigureServices(IServiceCollection services)

At every call to `IPublisher.Publish` the `Handle` method of each registered behavior is called, passing in the array of messages and the delegate to the next step in the pipeline. This gives you the flexibility to execute any sort of code before and after the messages have been actually published (before or after calling the `next()` step). You can for example modify the messages before publishing them, validate them (like in the above example), add some logging / tracing, etc.

### Example: Message headers
### Example1: Modifying outbound message headers

The behaviors can be quite useful to get and set the message headers for inbound/outbound messages.

Expand All @@ -70,6 +70,9 @@ public class CustomHeadersBehavior : IBehavior
}
}
```

### Example2: Logging inbound message headers

```c#
public class LogHeadersBehavior : IBehavior
{
Expand Down Expand Up @@ -105,4 +108,7 @@ public class LogHeadersBehavior : IBehavior

## IProducerBehavior and IConsumerBehavior

The `IProducerBehavior` and `IConsumerBehavior` are similar to the `IBehavior` but work at a lower level, much closer to the message broker. You should be able to accomplish most tasks with the normal `IBehavior`.
The `IProducerBehavior` and `IConsumerBehavior` are similar to the `IBehavior` but work at a lower level, much closer to the message broker. You should be able to accomplish most tasks with the normal `IBehavior`.

**Note:** Because of the way the Silverback's broker integration works `IProducerBehavior` and `IConsumerBehavior` implementations can only be registered as singleton.
{: .notice--info}
14 changes: 1 addition & 13 deletions docs/_docs/3-advanced/301-serialization.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,7 @@ The deserializer function provided by `JsonMessageSerializer` will obviously try

This is the suggested serialization strategy when both producer and consumer are based on Silverback but may not be ideal for interoperability.

### Message Headers

Here is the list of headers that Silverback may add to the produced messages, depending on the scenario:

Header | Optional | Description
:-- | :-: | :--
`x-message-id` | (yes) | A unique identifier that may be useful for tracing. It may not be present if the produced message isn't implementing `IIntegrationMessage` and no `Id` or `MessageId` property of a supported type is defined.
`x-message-type` | no | The assembly qualified name of the message type.
`x-failed-attempts` | yes | If an exception if thrown the failed attempts will be incremented and stored as header. This is necessary for the [error policies]({{ site.baseurl }}/docs/configuration/inbound#error-handling) to work.
`x-chunk-id` | yes | The unique id of the message chunk, used when [chunking]({{ site.baseurl }}/docs/advanced/chunking) is enabled.
`x-chunks-count` | yes | The total number of chunks the message was splitted into, used when [chunking]({{ site.baseurl }}/docs/advanced/chunking) is enabled.
`x-batch-id` | yes | The unique id assigned to the messages batch, used mostly for tracing, when [batch processing]({{ site.baseurl }}/docs/configuration/inbound#batch-processing) is enabled.
`x-batch-size` | yes | The total number of messages in the batch, used mostly for tracing, when [batch processing]({{ site.baseurl }}/docs/configuration/inbound#batch-processing) is enabled.
Have a look at the [Default Message Headers]({{ site.baseurl }}/docs/advanced/headers) section for an overview on the headers that are appended to the messages.

## Typed JsonMessageSerializer

Expand Down
24 changes: 24 additions & 0 deletions docs/_docs/3-advanced/350-default-headers.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
---
title: Default Message Headers
permalink: /docs/advanced/headers

toc: false
---

Silverback will add some headers to the produced messages. They may vary depending on the scenario.
Here is the list of the default headers that may be sent.

Header | Optional | Description
:-- | :-: | :--
`x-message-id` | yes<sup>1</sup> | A unique identifier that may be useful for tracing. It may not be present if the produced message isn't implementing `IIntegrationMessage` and no `Id` or `MessageId` property of a supported type is defined.
`x-message-type` | no | The assembly qualified name of the message type.
`x-failed-attempts` | yes | If an exception if thrown the failed attempts will be incremented and stored as header. This is necessary for the [error policies]({{ site.baseurl }}/docs/configuration/inbound#error-handling) to work.
`x-chunk-id` | yes | The unique id of the message chunk, used when [chunking]({{ site.baseurl }}/docs/advanced/chunking) is enabled.
`x-chunks-count` | yes | The total number of chunks the message was splitted into, used when [chunking]({{ site.baseurl }}/docs/advanced/chunking) is enabled.
`x-batch-id` | yes | The unique id assigned to the messages batch, used mostly for tracing, when [batch processing]({{ site.baseurl }}/docs/configuration/inbound#batch-processing) is enabled.
`x-batch-size` | yes | The total number of messages in the batch, used mostly for tracing, when [batch processing]({{ site.baseurl }}/docs/configuration/inbound#batch-processing) is enabled.
`traceparent` | no | The current `Activity.Id`, used by the `IConsumer` implementation to set the `Activity.ParentId`, thus enabling distributed tracing across the message broker. Note that an `Activity` is automatically started by the default `IProducer` implementation. See [System.Diagnostics documentation](https://docs.microsoft.com/en-us/dotnet/api/system.diagnostics.activity?view=netcore-3.1) for details about `Activity` and distributed tracing in asp.net core and [W3C Trace Context proposal](https://www.w3.org/TR/trace-context-1) for details about the headers.
`tracestate` | yes | The `Activity.TraceStateString`. See also the [W3C Trace Context proposal](https://www.w3.org/TR/trace-context-1) for details.
`tracebaggage` | yes | The string representation of the `Activity.Baggage` dictionary. See [System.Diagnostics documentation](https://docs.microsoft.com/en-us/dotnet/api/system.diagnostics.activity?view=netcore-3.1) for details.

_<sup>1</sup> The `x-message-id` header is always set for_
2 changes: 1 addition & 1 deletion src/Silverback.Core/Messaging/Publishing/IBehavior.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public interface IBehavior
/// Process, handles or transforms the messages being published to the internal bus.
/// </summary>
/// <param name="messages">The messages being published.</param>
/// <param name="next">The next handler down the pipeline.</param>
/// <param name="next">The next behavior in the pipeline.</param>
/// <returns>The actual messages to be published.</returns>
Task<IEnumerable<object>> Handle(IEnumerable<object> messages, MessagesHandler next);
}
Expand Down
6 changes: 3 additions & 3 deletions src/Silverback.Core/Messaging/Publishing/Publisher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,10 @@ private async Task<IEnumerable<object>> Publish(IEnumerable<object> messages, bo

private Task<IEnumerable<object>> ExecutePipeline(IEnumerable<IBehavior> behaviors, IEnumerable<object> messages, Func<IEnumerable<object>, Task<IEnumerable<object>>> finalAction)
{
if (behaviors == null || !behaviors.Any())
return finalAction(messages);
if (behaviors != null && behaviors.Any())
return behaviors.First().Handle(messages, m => ExecutePipeline(behaviors.Skip(1), m, finalAction));

return behaviors.First().Handle(messages, m => ExecutePipeline(behaviors.Skip(1), m, finalAction));
return finalAction(messages);
}

private Task<IEnumerable<object>> InvokeExclusiveMethods(IEnumerable<object> messages, bool executeAsync) =>
Expand Down
2 changes: 1 addition & 1 deletion src/Silverback.Core/Util/EnumerableOfTypeExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ internal static class EnumerableOfTypeExtensions
public static IEnumerable<object> OfType(this IEnumerable<object> source, Type type) =>
typeof(Enumerable)
.GetMethod("OfType", BindingFlags.Static | BindingFlags.Public)
.MakeGenericMethod(type)
?.MakeGenericMethod(type)
.Invoke(null, new object[] {source})
as IEnumerable<object> ?? Enumerable.Empty<object>();
}
Expand Down
2 changes: 2 additions & 0 deletions src/Silverback.Core/Util/EnumerableSelectExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
Expand All @@ -24,6 +25,7 @@ public static IEnumerable<TResult> ParallelSelect<T, TResult>(this IEnumerable<T
}

// http://blog.briandrupieski.com/throttling-asynchronous-methods-in-csharp
[SuppressMessage("ReSharper", "AccessToDisposedClosure")]
public static async Task<IEnumerable<TResult>> ParallelSelectAsync<T, TResult>(this IEnumerable<T> source,
Func<T, Task<TResult>> selector, int? maxDegreeOfParallelism = null)
{
Expand Down
5 changes: 3 additions & 2 deletions src/Silverback.Core/Util/EnumerableSortExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@ internal static class EnumerableSortExtensions
{
public static IEnumerable<T> SortBySortIndex<T>(this IEnumerable<T> items)
{
var sorted = items.OfType<ISorted>().OrderBy(b => b.SortIndex).ToList();
var unsortable = items.Where(b => !(b is ISorted)).ToList();
var list = items.ToList();
var sorted = list.OfType<ISorted>().OrderBy(b => b.SortIndex).ToList();
var unsortable = list.Where(b => !(b is ISorted)).ToList();

return sorted
.Where(b => b.SortIndex <= 0).Cast<T>()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,12 @@ public class InMemoryBroker : Broker<IEndpoint>
private readonly MessageLogger _messageLogger;
private readonly ConcurrentDictionary<string, InMemoryTopic> _topics = new ConcurrentDictionary<string, InMemoryTopic>();

public InMemoryBroker(MessageKeyProvider messageKeyProvider, ILoggerFactory loggerFactory, MessageLogger messageLogger) : base(loggerFactory)
public InMemoryBroker(
MessageKeyProvider messageKeyProvider,
IEnumerable<IBrokerBehavior> behaviors,
ILoggerFactory loggerFactory,
MessageLogger messageLogger)
: base(behaviors, loggerFactory)
{
_messageKeyProvider = messageKeyProvider;
_messageLogger = messageLogger;
Expand All @@ -23,11 +28,17 @@ public InMemoryBroker(MessageKeyProvider messageKeyProvider, ILoggerFactory logg
internal InMemoryTopic GetTopic(string name) =>
_topics.GetOrAdd(name, _ => new InMemoryTopic(name));

protected override Producer InstantiateProducer(IEndpoint endpoint) =>
new InMemoryProducer(this, endpoint, _messageKeyProvider, LoggerFactory.CreateLogger<InMemoryProducer>(), _messageLogger);

protected override Consumer InstantiateConsumer(IEndpoint endpoint) =>
GetTopic(endpoint.Name).Subscribe(new InMemoryConsumer(this, endpoint));
protected override Producer InstantiateProducer(IEndpoint endpoint, IEnumerable<IProducerBehavior> behaviors) =>
new InMemoryProducer(
this,
endpoint,
_messageKeyProvider,
behaviors,
LoggerFactory.CreateLogger<InMemoryProducer>(),
_messageLogger);

protected override Consumer InstantiateConsumer(IEndpoint endpoint, IEnumerable<IConsumerBehavior> behaviors) =>
GetTopic(endpoint.Name).Subscribe(new InMemoryConsumer(this, endpoint, behaviors));

protected override void Connect(IEnumerable<IConsumer> consumers)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ namespace Silverback.Messaging.Broker
{
public class InMemoryConsumer : Consumer<InMemoryBroker, IEndpoint>
{
public InMemoryConsumer(IBroker broker, IEndpoint endpoint) : base(broker, endpoint)
public InMemoryConsumer(IBroker broker, IEndpoint endpoint, IEnumerable<IConsumerBehavior> behaviors)
: base(broker, endpoint, behaviors)
{
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,21 @@ namespace Silverback.Messaging.Broker
{
public class InMemoryProducer : Producer<InMemoryBroker, IEndpoint>
{
public InMemoryProducer(IBroker broker, IEndpoint endpoint, MessageKeyProvider messageKeyProvider,
ILogger<Producer> logger, MessageLogger messageLogger)
: base(broker, endpoint, messageKeyProvider, logger, messageLogger)
public InMemoryProducer(
IBroker broker,
IEndpoint endpoint,
MessageKeyProvider messageKeyProvider,
IEnumerable<IProducerBehavior> behaviors,
ILogger<Producer> logger,
MessageLogger messageLogger)
: base(broker, endpoint, messageKeyProvider, behaviors, logger, messageLogger)
{
}

protected override IOffset Produce(byte[] serializedMessage, IEnumerable<MessageHeader> headers) =>
Broker.GetTopic(Endpoint.Name).Publish(serializedMessage, headers);
protected override IOffset Produce(RawBrokerMessage message) =>
Broker.GetTopic(Endpoint.Name).Publish(message.RawContent, message.Headers);

protected override Task<IOffset> ProduceAsync(byte[] serializedMessage, IEnumerable<MessageHeader> headers) =>
Broker.GetTopic(Endpoint.Name).PublishAsync(serializedMessage, headers);
protected override Task<IOffset> ProduceAsync(RawBrokerMessage message) =>
Broker.GetTopic(Endpoint.Name).PublishAsync(message.RawContent, message.Headers);
}
}
26 changes: 20 additions & 6 deletions src/Silverback.Integration.Kafka/Messaging/Broker/KafkaBroker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,33 @@ public class KafkaBroker : Broker<KafkaEndpoint>
private readonly ILoggerFactory _loggerFactory;
private readonly MessageLogger _messageLogger;

public KafkaBroker(MessageKeyProvider messageKeyProvider, ILoggerFactory loggerFactory,
MessageLogger messageLogger) : base(loggerFactory)
public KafkaBroker(
MessageKeyProvider messageKeyProvider,
IEnumerable<IBrokerBehavior> behaviors,
ILoggerFactory loggerFactory,
MessageLogger messageLogger)
: base(behaviors, loggerFactory)
{
_messageKeyProvider = messageKeyProvider;
_loggerFactory = loggerFactory;
_messageLogger = messageLogger;
}

protected override Producer InstantiateProducer(IEndpoint endpoint) =>
new KafkaProducer(this, (KafkaProducerEndpoint) endpoint, _messageKeyProvider, _loggerFactory.CreateLogger<KafkaProducer>(), _messageLogger);
protected override Producer InstantiateProducer(IEndpoint endpoint, IEnumerable<IProducerBehavior> behaviors) =>
new KafkaProducer(
this,
(KafkaProducerEndpoint) endpoint,
_messageKeyProvider,
behaviors,
_loggerFactory.CreateLogger<KafkaProducer>(),
_messageLogger);

protected override Consumer InstantiateConsumer(IEndpoint endpoint) =>
new KafkaConsumer(this, (KafkaConsumerEndpoint) endpoint, _loggerFactory.CreateLogger<KafkaConsumer>());
protected override Consumer InstantiateConsumer(IEndpoint endpoint, IEnumerable<IConsumerBehavior> behaviors) =>
new KafkaConsumer(
this,
(KafkaConsumerEndpoint) endpoint,
behaviors,
_loggerFactory.CreateLogger<KafkaConsumer>());

protected override void Connect(IEnumerable<IConsumer> consumers) =>
consumers.Cast<KafkaConsumer>().ToList().ForEach(c => c.Connect());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,23 @@
using System.Linq;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Silverback.Diagnostics;
using Silverback.Messaging.Messages;

namespace Silverback.Messaging.Broker
{
public class KafkaConsumer : DiagnosticsConsumer<KafkaBroker, KafkaConsumerEndpoint>, IDisposable
public class KafkaConsumer : Consumer<KafkaBroker, KafkaConsumerEndpoint>, IDisposable
{
private readonly ILogger<KafkaConsumer> _logger;

private InnerConsumerWrapper _innerConsumer;
private int _messagesSinceCommit;

public KafkaConsumer(IBroker broker, KafkaConsumerEndpoint endpoint, ILogger<KafkaConsumer> logger)
: base(broker, endpoint)
public KafkaConsumer(
IBroker broker,
KafkaConsumerEndpoint endpoint,
IEnumerable<IConsumerBehavior> behaviors,
ILogger<KafkaConsumer> logger)
: base(broker, endpoint, behaviors)
{
_logger = logger;

Expand Down
Loading

0 comments on commit b39a4ab

Please sign in to comment.