Skip to content

Commit

Permalink
Update Kafka sample to use async pipeline
Browse files Browse the repository at this point in the history
  • Loading branch information
iancooper committed Jan 10, 2024
1 parent 46cd384 commit d392c74
Show file tree
Hide file tree
Showing 5 changed files with 72 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,21 +23,23 @@ THE SOFTWARE. */
#endregion

using System;
using System.Threading;
using System.Threading.Tasks;
using Greetings.Ports.Commands;
using Paramore.Brighter;

namespace Greetings.Ports.CommandHandlers
{
public class GreetingEventHandler : RequestHandler<GreetingEvent>
public class GreetingEventHandlerAsync : RequestHandlerAsync<GreetingEvent>
{
public override GreetingEvent Handle(GreetingEvent @event)
public override async Task<GreetingEvent> HandleAsync(GreetingEvent @event, CancellationToken cancellationToken = default)
{
Console.WriteLine("Received Greeting. Message Follows");
Console.WriteLine("----------------------------------");
Console.WriteLine(@event.Greeting);
Console.WriteLine("----------------------------------");
Console.WriteLine("Message Ends");
return base.Handle(@event);
return await base.HandleAsync(@event, cancellationToken);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,48 +23,46 @@ THE SOFTWARE. */
#endregion

using System.Net.Mime;
using System.Threading.Tasks;
using Greetings.Ports.Commands;
using Confluent.Kafka;
using Confluent.Kafka.SyncOverAsync;
using Confluent.SchemaRegistry;
using Confluent.SchemaRegistry.Serdes;
using Paramore.Brighter;
using Paramore.Brighter.MessagingGateway.Kafka;

namespace Greetings.Ports.Mappers
{
public class GreetingEventMessageMapper : IAmAMessageMapper<GreetingEvent>
public class GreetingEventMessageMapperAsync(ISchemaRegistryClient schemaRegistryClient)
: IAmAMessageMapperAsync<GreetingEvent>
{
private readonly ISchemaRegistryClient _schemaRegistryClient;
private readonly string _partitionKey = "KafkaTestQueueExample_Partition_One";
private SerializationContext _serializationContext;
private readonly SerializationContext _serializationContext = new(MessageComponentType.Value, Topic);
private const string Topic = "greeting.event";

public GreetingEventMessageMapper(ISchemaRegistryClient schemaRegistryClient)
{
_schemaRegistryClient = schemaRegistryClient;
//We care about ensuring that we serialize the body using the Confluent tooling, as it registers and validates schema
_serializationContext = new SerializationContext(MessageComponentType.Value, Topic);
}

public Message MapToMessage(GreetingEvent request)
public async Task<Message> MapToMessage(GreetingEvent request)
{
var header = new MessageHeader(messageId: request.Id, topic: Topic, messageType: MessageType.MT_EVENT);
//This uses the Confluent JSON serializer, which wraps Newtonsoft but also performs schema registration and validation
var serializer = new JsonSerializer<GreetingEvent>(_schemaRegistryClient, ConfluentJsonSerializationConfig.SerdesJsonSerializerConfig(), ConfluentJsonSerializationConfig.NJsonSchemaGeneratorSettings()).AsSyncOverAsync();
var s = serializer.Serialize(request, _serializationContext);
var serializer = new JsonSerializer<GreetingEvent>(
schemaRegistryClient,
ConfluentJsonSerializationConfig.SerdesJsonSerializerConfig(),
ConfluentJsonSerializationConfig.NJsonSchemaGeneratorSettings()
);

var s = await serializer.SerializeAsync(request, _serializationContext);
var body = new MessageBody(s, MediaTypeNames.Application.Octet, CharacterEncoding.Raw);
header.PartitionKey = _partitionKey;

var message = new Message(header, body);
return message;
return new Message(header, body);
}

public GreetingEvent MapToRequest(Message message)
public async Task<GreetingEvent> MapToRequest(Message message)
{
var deserializer = new JsonDeserializer<GreetingEvent>().AsSyncOverAsync();
var deserializer = new JsonDeserializer<GreetingEvent>();
//This uses the Confluent JSON serializer, which wraps Newtonsoft but also performs schema registration and validation
var greetingCommand = deserializer.Deserialize(message.Body.Bytes, message.Body.Bytes is null, _serializationContext);
var greetingCommand
= await deserializer.DeserializeAsync(message.Body.Bytes, message.Body.Bytes is null, _serializationContext);

return greetingCommand;
}
Expand Down
22 changes: 12 additions & 10 deletions samples/KafkaSchemaRegistry/GreetingsReceiverConsole/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ namespace GreetingsReceiverConsole
{
public class Program
{
public static async Task Main(string[] args)
public static Task Main(string[] args)
{
var host = Host.CreateDefaultBuilder(args)
.ConfigureHostConfiguration(configurationBuilder =>
Expand All @@ -68,24 +68,26 @@ public static async Task Main(string[] args)
timeoutInMilliseconds: 100,
offsetDefault: AutoOffsetReset.Earliest,
commitBatchSize: 5,
sweepUncommittedOffsetsIntervalMs: 10000)
sweepUncommittedOffsetsIntervalMs: 10000,
isAsync: true)
};

//We take a direct dependency on the schema registry in the message mapper
var schemaRegistryConfig = new SchemaRegistryConfig { Url = "http://localhost:8081"};
var cachedSchemaRegistryClient = new CachedSchemaRegistryClient(schemaRegistryConfig);
services.AddSingleton<ISchemaRegistryClient>(cachedSchemaRegistryClient);


//create the gateway
var consumerFactory = new KafkaMessageConsumerFactory(
new KafkaMessagingGatewayConfiguration { Name = "paramore.brighter", BootStrapServers = new[] { "localhost:9092" } }
);

services.AddServiceActivator(options =>
{
options.Subscriptions = subscriptions;
options.ChannelFactory = new ChannelFactory(consumerFactory);
options.ChannelFactory = new ChannelFactory(
new KafkaMessageConsumerFactory(
new KafkaMessagingGatewayConfiguration
{
Name = "paramore.brighter",
BootStrapServers = new[] { "localhost:9092" }
}
));
}).AutoFromAssemblies();


Expand All @@ -94,7 +96,7 @@ public static async Task Main(string[] args)
.UseConsoleLifetime()
.Build();

await host.RunAsync();
return host.RunAsync();
}
}
}
69 changes: 36 additions & 33 deletions samples/KafkaSchemaRegistry/GreetingsSender/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ THE SOFTWARE. */
using System;
using System.IO;
using System.Threading.Tasks;
using System.Transactions;
using Confluent.SchemaRegistry;
using Greetings.Ports.Commands;
using Microsoft.Extensions.Configuration;
Expand All @@ -42,9 +41,9 @@ THE SOFTWARE. */

namespace GreetingsSender
{
internal static class Program
public static class Program
{
static async Task Main(string[] args)
public static Task Main(string[] args)
{
var host = Host.CreateDefaultBuilder(args)
.ConfigureHostConfiguration(configurationBuilder =>
Expand All @@ -53,40 +52,14 @@ static async Task Main(string[] args)
configurationBuilder.AddJsonFile("appsettings.json", optional: true);
configurationBuilder.AddCommandLine(args);
})
.ConfigureLogging((context, builder) =>
.ConfigureLogging((_, builder) =>
{
builder.ClearProviders();
builder.AddConsole();
builder.AddDebug();
})
.ConfigureServices((hostContext, services) =>
{
var retryPolicy = Policy.Handle<Exception>().WaitAndRetry(new[]
{
TimeSpan.FromMilliseconds(50), TimeSpan.FromMilliseconds(100),
TimeSpan.FromMilliseconds(150)
});

var circuitBreakerPolicy =
Policy.Handle<Exception>().CircuitBreaker(1, TimeSpan.FromMilliseconds(500));

var retryPolicyAsync = Policy.Handle<Exception>().WaitAndRetryAsync(new[]
{
TimeSpan.FromMilliseconds(50), TimeSpan.FromMilliseconds(100),
TimeSpan.FromMilliseconds(150)
});

var circuitBreakerPolicyAsync = Policy.Handle<Exception>()
.CircuitBreakerAsync(1, TimeSpan.FromMilliseconds(500));

var policyRegistry = new PolicyRegistry
{
{CommandProcessor.RETRYPOLICY, retryPolicy},
{CommandProcessor.CIRCUITBREAKER, circuitBreakerPolicy},
{CommandProcessor.RETRYPOLICYASYNC, retryPolicyAsync},
{CommandProcessor.CIRCUITBREAKERASYNC, circuitBreakerPolicyAsync}
};

//We take a direct dependency on the schema registry in the message mapper
var schemaRegistryConfig = new SchemaRegistryConfig { Url = "http://localhost:8081"};
var cachedSchemaRegistryClient = new CachedSchemaRegistryClient(schemaRegistryConfig);
Expand All @@ -98,7 +71,7 @@ static async Task Main(string[] args)
Name = "paramore.brighter.greetingsender",
BootStrapServers = new[] {"localhost:9092"}
},
new KafkaPublication[]
new[]
{
new KafkaPublication
{
Expand All @@ -112,7 +85,7 @@ static async Task Main(string[] args)

services.AddBrighter(options =>
{
options.PolicyRegistry = policyRegistry;
options.PolicyRegistry = RegisterPolicies();
})
.UseExternalBus((configure) =>
{
Expand All @@ -125,7 +98,37 @@ static async Task Main(string[] args)
.UseConsoleLifetime()
.Build();

await host.RunAsync();
return host.RunAsync();
}

private static PolicyRegistry RegisterPolicies()
{
var retryPolicy = Policy.Handle<Exception>().WaitAndRetry(new[]
{
TimeSpan.FromMilliseconds(50), TimeSpan.FromMilliseconds(100),
TimeSpan.FromMilliseconds(150)
});

var circuitBreakerPolicy =
Policy.Handle<Exception>().CircuitBreaker(1, TimeSpan.FromMilliseconds(500));

var retryPolicyAsync = Policy.Handle<Exception>().WaitAndRetryAsync(new[]
{
TimeSpan.FromMilliseconds(50), TimeSpan.FromMilliseconds(100),
TimeSpan.FromMilliseconds(150)
});

var circuitBreakerPolicyAsync = Policy.Handle<Exception>()
.CircuitBreakerAsync(1, TimeSpan.FromMilliseconds(500));

var policyRegistry = new PolicyRegistry
{
{CommandProcessor.RETRYPOLICY, retryPolicy},
{CommandProcessor.CIRCUITBREAKER, circuitBreakerPolicy},
{CommandProcessor.RETRYPOLICYASYNC, retryPolicyAsync},
{CommandProcessor.CIRCUITBREAKERASYNC, circuitBreakerPolicyAsync}
};
return policyRegistry;
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,13 @@ public Task StopAsync(CancellationToken cancellationToken)
return Task.CompletedTask;
}

private void DoWork(object state)
private async void DoWork(object state)
{
_iteration++;

var greetingEvent = new GreetingEvent{ Id = Guid.NewGuid(), Greeting = $"Hello # {_iteration}"};

_processor.Post(greetingEvent);
await _processor.PostAsync(greetingEvent);

_logger.LogInformation("Sending message with id {Id} and greeting {Request}", greetingEvent.Id,
greetingEvent.Greeting);
Expand Down

0 comments on commit d392c74

Please sign in to comment.