Allow for Async Message Mapper Pipelines #2936

Merged: 18 commits, Feb 6, 2024
276 changes: 144 additions & 132 deletions .idea/.idea.Brighter/.idea/httpRequests/http-requests-log.http

Large diffs are not rendered by default.

23 changes: 13 additions & 10 deletions docs/adr/0002-use-a-single-threaded-message-pump.md
@@ -10,33 +10,36 @@ Accepted

Any service activator pattern will have a message pump, which reads from a queue.

There are different strategies we could use; a common one, for example, is to use a BlockingCollection to hold messages read from the queue, and then use threads from the thread pool to process those messages. However, a multi-threaded pump has the issue that it will de-order an
otherwise ordered queue, as the threads will pull items from the blocking collection in parallel, not sequentially. In addition, where we have multiple threads it becomes difficult to create resources used by the pump without protecting them from race conditions.

The other option would be to use the thread pool to service requests, creating a thread for each incoming message. This would not scale, as we would quickly run out of threads in the pool. To avoid this issue, solutions that rely on the thread pool typically have to govern
the number of thread pool threads that can be used for concurrent requests. The problem becomes that at scale the semaphore that governs the number of threads becomes a bottleneck.

The alternative to these multi-threaded approaches is to use a single-threaded message pump that reads from the queue, processes the message, and only when it has processed that message, processes the next item. This prevents de-ordering of the queue, because items are read in sequence.

This approach is the [Reactor](https://en.wikipedia.org/wiki/Reactor_pattern) pattern. The Reactor pattern uses a single thread to read from the queue, and then dispatches the message to a handler. If higher throughput is desired with
a single-threaded pump, then you can create multiple pumps. In essence, this is the competing consumers pattern; each performer is its own message pump. To make the Reactor pattern more performant, we can choose not to block on I/O by using asynchronous handlers.
This is the [Proactor](https://en.wikipedia.org/wiki/Proactor_pattern) pattern. Brighter provides a SynchronizationContext so that asynchronous handlers can be used, and the message pump will not block on I/O, whilst still preserving ordering.

The message pump performs the usual sequence of actions (a minimal sketch follows the list):

- GetMessage. Read Message From Queue
- Translate Message. Translate Message from Wire Format to Type
- Dispatch Message. Dispatch Message based on Type
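
A minimal, self-contained sketch of such a pump, assuming an in-process queue, JSON as the wire format, and a hypothetical `Greeting` type rather than Brighter's actual abstractions:

```csharp
using System;
using System.Collections.Concurrent;
using System.Text.Json;
using System.Threading;

// A minimal single-threaded message pump. The in-process queue, wire format and
// Greeting type are illustrative stand-ins, not Brighter's actual abstractions.
public static class MessagePumpSketch
{
    public record Greeting(string Text);

    public static void Run(BlockingCollection<string> queue, CancellationToken cancellationToken)
    {
        while (!cancellationToken.IsCancellationRequested)
        {
            // GetMessage: read one message from the queue (blocks until one arrives)
            if (!queue.TryTake(out var wireMessage, Timeout.Infinite, cancellationToken))
                continue;

            // Translate Message: wire format -> type
            var greeting = JsonSerializer.Deserialize<Greeting>(wireMessage);

            // Dispatch Message: dispatch based on type, one message at a time,
            // so ordering is preserved and only this thread touches shared resources
            Handle(greeting!);
        }
    }

    private static void Handle(Greeting greeting) => Console.WriteLine(greeting.Text);
}
```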

Prior art for this is the Windows Event Loop which uses this approach, and is used by COM for integration via the Single-Threaded Apartment model.


## Decision

Use a single-threaded message pump to preserve ordering and ensure sequential access to shared resources. Allow multiple pump instances for throughput. Allow asynchronous handlers to prevent blocking on I/O.

## Consequences

This makes an async model harder, as it relies on a sequential processing strategy which implies that we must use the message pump thread for callbacks. This is the opposite of an async strategy that uses the thread pool for callbacks, which would prevent queueing of callbacks at the cost of potentially running those callbacks out of order.

It may imply that we should consider having a 'pluggable' pump that can use different strategies, asynchronous where you do not require ordering, and synchronous where you do.


83 changes: 83 additions & 0 deletions docs/adr/0005-support-async-pipelines.md
@@ -0,0 +1,83 @@
# 5. Support async pipelines

Date: 2019-08-01

## Status

Accepted

## Context

Given that we have decided to use a Reactor pattern (see [Single Threaded Message Pump](0002-use-a-single-threaded-message-pump.md)),
we need to decide how to support asynchronous pipelines. There are three requirements:

* We need to be able to support asynchronous handlers
* We need to be able to support asynchronous message mappers
* We need to provide our own synchronization context so that the thread on which callbacks are invoked is the message pump thread.

Why do handlers need to be asynchronous? Because they may need to perform I/O, such as talking to a database or a web service,
and we do not want to block the message pump thread.

Why do message mappers need to be asynchronous? Because they may need to perform I/O, such as talking to a schema registry or
using a Claim Check transform and we do not want to block the message pump thread.

Why do we need to provide our own synchronization context? Because we want to ensure that the thread on which callbacks are invoked
is the message pump thread. This is important because we want to ensure that the message pump thread is not blocked by I/O.

## Decision

### Synchronization Context
Implement a custom synchronization context that will invoke callbacks on the message pump thread. The synchronization context
is based on the [Stephen Toub Single Threaded Synchronization Context](https://devblogs.microsoft.com/pfxteam/await-synchronizationcontext-and-console-apps/),
and will run callbacks in order on the message pump thread.
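
A simplified sketch of the idea, following Stephen Toub's example rather than Brighter's actual implementation: continuations posted to the context are queued and drained, in order, on the single pump thread. The class name here is illustrative only.

```csharp
using System.Collections.Concurrent;
using System.Threading;

// Simplified single-threaded SynchronizationContext, after Stephen Toub's example.
// An illustrative sketch, not Brighter's actual synchronization context.
public sealed class SingleThreadedSynchronizationContext : SynchronizationContext
{
    private readonly BlockingCollection<(SendOrPostCallback Callback, object? State)> _queue = new();

    // Async continuations arrive here; they are queued rather than run on a pool thread
    public override void Post(SendOrPostCallback d, object? state) => _queue.Add((d, state));

    // Drain the queue on the message pump thread, so callbacks run in order on that thread
    public void RunOnCurrentThread()
    {
        foreach (var (callback, state) in _queue.GetConsumingEnumerable())
            callback(state);
    }

    // Call when the pumped work item has completed, so RunOnCurrentThread can exit
    public void Complete() => _queue.CompleteAdding();
}
```

The pump would install a context like this before awaiting a handler's Task, then drain it so that every continuation executes, in order, on the pump thread.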

### Asynchronous Handlers
Asynchronous handlers implement IHandleRequestsAsync<T> and return a Task. The message pump will await the task. Any callback after
the code returns from the handler will be invoked on the thread that invoked the handler.
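
For illustration, a hypothetical handler of this shape (the GreetingEventHandlerAsync sample changed in this PR follows the same pattern); the handler name and the database call are stand-ins, not part of this PR:

```csharp
using System.Threading;
using System.Threading.Tasks;
using Greetings.Ports.Commands;
using Paramore.Brighter;

// Hypothetical async handler: performs its I/O with await so the pump thread is not blocked.
public class SaveGreetingHandlerAsync : RequestHandlerAsync<GreetingEvent>
{
    public override async Task<GreetingEvent> HandleAsync(GreetingEvent @event, CancellationToken cancellationToken = default)
    {
        // I/O such as a database or web service call, awaited rather than blocked on
        await SaveToDatabaseAsync(@event.Greeting, cancellationToken);
        return await base.HandleAsync(@event, cancellationToken);
    }

    // Stand-in for real I/O
    private static Task SaveToDatabaseAsync(string greeting, CancellationToken cancellationToken) => Task.CompletedTask;
}
```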

#### Homogenous Handler Pipelines

A handler pipeline needs to be all asynchronous or all synchronous. If a handler pipeline contains both synchronous and asynchronous
handlers, then we would be forced to either block asynchronous handlers on a synchronous pipeline, or invoke synchronous handlers on a thread
other than the message pump thread.

### Asynchronous Message Mappers
Asynchronous message mappers implement IAmAMessageMapperAsync<T> and return a Task. The message pump will await the task. Any callback after
the code returns from the mapper will be invoked on the thread that invoked the mapper.

A message mapper pipeline terminates with an IAmAMessageMapper<T> or an IAmAMessageMapperAsync<T>. A pipeline may have transforms that
implement IAmAMessageTransform<T> or IAmAMessageTransformAsync<T>. (By contrast to handlers, we do not use the message mapper interface for
the pipeline: whereas each handler step attempts to handle the message, a transform either wraps a message built by a message mapper or
unwraps a message to be passed to a message mapper.)
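
As a rough illustration, a terminal asynchronous mapper might look like the sketch below. The method shapes mirror the Kafka sample mapper changed in this PR; the class name and the schema registry call are hypothetical stand-ins for the I/O that motivates an asynchronous mapper.

```csharp
using System.Text.Json;
using System.Threading.Tasks;
using Greetings.Ports.Commands;
using Paramore.Brighter;

// Illustrative terminal mapper for a pipeline; not a canonical interface definition.
public class GreetingEventMessageMapperSketch : IAmAMessageMapperAsync<GreetingEvent>
{
    public async Task<Message> MapToMessage(GreetingEvent request)
    {
        // I/O such as a schema registry lookup or a claim check store write
        await ValidateAgainstSchemaRegistryAsync(request);
        var header = new MessageHeader(messageId: request.Id, topic: "greeting.event", messageType: MessageType.MT_EVENT);
        var body = new MessageBody(JsonSerializer.Serialize(request));
        return new Message(header, body);
    }

    public async Task<GreetingEvent> MapToRequest(Message message)
    {
        await Task.Yield(); // a real mapper might await a schema registry or claim check store here
        return JsonSerializer.Deserialize<GreetingEvent>(message.Body.Value)!;
    }

    // Stand-in for the asynchronous validation call
    private static Task ValidateAgainstSchemaRegistryAsync(GreetingEvent request) => Task.CompletedTask;
}
```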

#### Homogenous Message Mapper Pipelines

A message mapper pipeline needs to be all asynchronous or all synchronous. If a message mapper pipeline contains both synchronous and asynchronous
mappers, then we would be forced to either block asynchronous message mappers on a synchronous pipeline, or invoke synchronous message mappers on a thread
other than the message pump thread.

### Posting a Message

When posting a message we assume that the async command processor Post methods will be used with asynchronous message mappers, and we search
for registered asynchronous message mappers before searching for synchronous message mappers. If the message mapper is synchronous then
we wrap it in a TaskCompletionSource and return the Task.

When posting a message we assume that the sync command processor Post methods will be for synchronous message mappers and we search
for registered synchronous message mappers before searching for asynchronous message mappers. If the message mapper is asynchronous then
we block on the Task.
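
A sketch of the bridging described above; the class and method names are illustrative only, not Brighter's internal code:

```csharp
using System;
using System.Threading.Tasks;
using Paramore.Brighter;

// Illustrative sketch of bridging between sync and async mappers at the Post call site.
public static class MapperBridgeSketch
{
    // Async Post path, but only a synchronous mapper is registered:
    // surface the synchronous result (or exception) as a Task via a TaskCompletionSource.
    public static Task<Message> MapToMessageAsTask<TRequest>(IAmAMessageMapper<TRequest> mapper, TRequest request)
        where TRequest : class, IRequest
    {
        var tcs = new TaskCompletionSource<Message>(TaskCreationOptions.RunContinuationsAsynchronously);
        try
        {
            tcs.SetResult(mapper.MapToMessage(request));
        }
        catch (Exception e)
        {
            tcs.SetException(e);
        }
        return tcs.Task;
    }

    // Sync Post path, but only an asynchronous mapper is registered: block on the Task.
    public static Message MapToMessageBlocking<TRequest>(IAmAMessageMapperAsync<TRequest> mapper, TRequest request)
        where TRequest : class, IRequest
        => mapper.MapToMessage(request).GetAwaiter().GetResult();
}
```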

### Receiving a Message

When receiving a message we assume that you will use an async message pump for an asynchronous message mapper and we search for registered
asynchronous message mappers before searching for synchronous message mappers. If the message mapper is synchronous then we wrap it in a TaskCompletionSource
and return the Task. We then use the synchronization context to ensure that the callback is invoked on the message pump thread.

When receiving a message we assume that you will use a sync message pump for a synchronous message mapper and we search for registered synchronous
message mappers before searching for asynchronous message mappers. If the message mapper is asynchronous then we block on the Task.

## Consequences

* We can support asynchronous handlers
* We can support asynchronous message mappers
* We provide our own synchronization context so that the thread on which callbacks are invoked is the message pump thread.

@@ -23,21 +23,23 @@ THE SOFTWARE. */
#endregion

using System;
using System.Threading;
using System.Threading.Tasks;
using Greetings.Ports.Commands;
using Paramore.Brighter;

namespace Greetings.Ports.CommandHandlers
{
    public class GreetingEventHandlerAsync : RequestHandlerAsync<GreetingEvent>
    {
        public override async Task<GreetingEvent> HandleAsync(GreetingEvent @event, CancellationToken cancellationToken = default)
        {
            Console.WriteLine("Received Greeting. Message Follows");
            Console.WriteLine("----------------------------------");
            Console.WriteLine(@event.Greeting);
            Console.WriteLine("----------------------------------");
            Console.WriteLine("Message Ends");
            return await base.HandleAsync(@event, cancellationToken);
        }
    }
}
@@ -23,48 +23,46 @@ THE SOFTWARE. */
#endregion

using System.Net.Mime;
using System.Threading.Tasks;
using Greetings.Ports.Commands;
using Confluent.Kafka;
using Confluent.Kafka.SyncOverAsync;
using Confluent.SchemaRegistry;
using Confluent.SchemaRegistry.Serdes;
using Paramore.Brighter;
using Paramore.Brighter.MessagingGateway.Kafka;

namespace Greetings.Ports.Mappers
{
    public class GreetingEventMessageMapperAsync(ISchemaRegistryClient schemaRegistryClient)
        : IAmAMessageMapperAsync<GreetingEvent>
    {
        private readonly string _partitionKey = "KafkaTestQueueExample_Partition_One";
        private readonly SerializationContext _serializationContext = new(MessageComponentType.Value, Topic);
        private const string Topic = "greeting.event";

        public async Task<Message> MapToMessage(GreetingEvent request)
        {
            var header = new MessageHeader(messageId: request.Id, topic: Topic, messageType: MessageType.MT_EVENT);
            //This uses the Confluent JSON serializer, which wraps Newtonsoft but also performs schema registration and validation
            var serializer = new JsonSerializer<GreetingEvent>(
                schemaRegistryClient,
                ConfluentJsonSerializationConfig.SerdesJsonSerializerConfig(),
                ConfluentJsonSerializationConfig.NJsonSchemaGeneratorSettings()
            );

            var s = await serializer.SerializeAsync(request, _serializationContext);
            var body = new MessageBody(s, MediaTypeNames.Application.Octet, CharacterEncoding.Raw);
            header.PartitionKey = _partitionKey;

            return new Message(header, body);
        }

        public async Task<GreetingEvent> MapToRequest(Message message)
        {
            var deserializer = new JsonDeserializer<GreetingEvent>();
            //This uses the Confluent JSON serializer, which wraps Newtonsoft but also performs schema registration and validation
            var greetingCommand
                = await deserializer.DeserializeAsync(message.Body.Bytes, message.Body.Bytes is null, _serializationContext);

            return greetingCommand;
        }
    }
}
98 changes: 44 additions & 54 deletions samples/KafkaSchemaRegistry/GreetingsReceiverConsole/Program.cs
@@ -23,9 +23,7 @@ THE SOFTWARE. */

#endregion

using System;
using System.IO;
using System.Threading.Tasks;
using Confluent.Kafka;
using Confluent.SchemaRegistry;
using Greetings.Ports.Commands;
@@ -38,63 +36,55 @@
using Paramore.Brighter.ServiceActivator.Extensions.DependencyInjection;
using Paramore.Brighter.ServiceActivator.Extensions.Hosting;

var host = Host.CreateDefaultBuilder(args)
    .ConfigureHostConfiguration(configurationBuilder =>
    {
        configurationBuilder.SetBasePath(Directory.GetCurrentDirectory());
        configurationBuilder.AddJsonFile("appsettings.json", optional: true);
        configurationBuilder.AddCommandLine(args);
    })
    .ConfigureLogging((context, builder) =>
    {
        builder.AddConsole();
        builder.AddDebug();
    })
    .ConfigureServices((hostContext, services) =>
    {
        var subscriptions = new KafkaSubscription[]
        {
            new KafkaSubscription<GreetingEvent>(
                new SubscriptionName("paramore.example.greeting"),
                channelName: new ChannelName("greeting.event"),
                routingKey: new RoutingKey("greeting.event"),
                groupId: "kafka-GreetingsReceiverConsole-Sample",
                timeoutInMilliseconds: 100,
                offsetDefault: AutoOffsetReset.Earliest,
                commitBatchSize: 5,
                sweepUncommittedOffsetsIntervalMs: 10000,
                runAsync: true)
        };

        //We take a direct dependency on the schema registry in the message mapper
        var schemaRegistryConfig = new SchemaRegistryConfig { Url = "http://localhost:8081" };
        var cachedSchemaRegistryClient = new CachedSchemaRegistryClient(schemaRegistryConfig);
        services.AddSingleton<ISchemaRegistryClient>(cachedSchemaRegistryClient);

        services.AddServiceActivator(options =>
        {
            options.Subscriptions = subscriptions;
            options.ChannelFactory = new ChannelFactory(
                new KafkaMessageConsumerFactory(
                    new KafkaMessagingGatewayConfiguration
                    {
                        Name = "paramore.brighter", BootStrapServers = new[] { "localhost:9092" }
                    }
                ));
        }).AutoFromAssemblies();

        services.AddHostedService<ServiceActivatorHostedService>();
    })
    .UseConsoleLifetime()
    .Build();

await host.RunAsync();