Skip to content

Commit

Permalink
Support async message mappers
Browse files Browse the repository at this point in the history
  • Loading branch information
iancooper committed Dec 21, 2023
1 parent a53c8a8 commit 9dbd16f
Show file tree
Hide file tree
Showing 74 changed files with 1,728 additions and 305 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -140,21 +140,13 @@ public IBrighterBuilder MapperRegistryFromAssemblies(params Assembly[] assemblie
if (assemblies.Length == 0)
throw new ArgumentException("Value cannot be an empty collection.", nameof(assemblies));

var mappers =
from ti in assemblies.SelectMany(a => a.DefinedTypes).Distinct()
where ti.IsClass && !ti.IsAbstract && !ti.IsInterface
from i in ti.ImplementedInterfaces
where i.IsGenericType && i.GetGenericTypeDefinition() == typeof(IAmAMessageMapper<>)
select new { RequestType = i.GenericTypeArguments.First(), HandlerType = ti.AsType() };

foreach (var mapper in mappers)
{
_mapperRegistry.Add(mapper.RequestType, mapper.HandlerType);
}
RegisterMappersFromAssemblies(assemblies);
RegisterAsyncMappersFromAssemblies(assemblies);

return this;
}


/// <summary>
/// Register handlers with the built in subscriber registry
/// </summary>
Expand Down Expand Up @@ -222,5 +214,36 @@ from i in ti.ImplementedInterfaces
_serviceCollectionSubscriberRegistry.Add(subscriber.RequestType, subscriber.HandlerType);
}
}

private void RegisterMappersFromAssemblies(Assembly[] assemblies)
{
var mappers =
from ti in assemblies.SelectMany(a => a.DefinedTypes).Distinct()
where ti.IsClass && !ti.IsAbstract && !ti.IsInterface
from i in ti.ImplementedInterfaces
where i.IsGenericType && i.GetGenericTypeDefinition() == typeof(IAmAMessageMapper<>)
select new { RequestType = i.GenericTypeArguments.First(), HandlerType = ti.AsType() };

foreach (var mapper in mappers)
{
_mapperRegistry.Add(mapper.RequestType, mapper.HandlerType);
}
}

private void RegisterAsyncMappersFromAssemblies(Assembly[] assemblies)
{
var mappers =
from ti in assemblies.SelectMany(a => a.DefinedTypes).Distinct()
where ti.IsClass && !ti.IsAbstract && !ti.IsInterface
from i in ti.ImplementedInterfaces
where i.IsGenericType && i.GetGenericTypeDefinition() == typeof(IAmAMessageMapperAsync<>)
select new { RequestType = i.GenericTypeArguments.First(), HandlerType = ti.AsType() };

foreach (var mapper in mappers)
{
_mapperRegistry.AddAsync(mapper.RequestType, mapper.HandlerType);
}
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -359,10 +359,15 @@ public static MessageMapperRegistry MessageMapperRegistry(IServiceProvider provi
null
);

foreach (var messageMapper in serviceCollectionMessageMapperRegistry)
foreach (var messageMapper in serviceCollectionMessageMapperRegistry.Mappers)
{
messageMapperRegistry.Register(messageMapper.Key, messageMapper.Value);
}

foreach (var messageMapper in serviceCollectionMessageMapperRegistry.AsyncMappers)
{
messageMapperRegistry.RegisterAsync(messageMapper.Key, messageMapper.Value);
}

return messageMapperRegistry;
}
Expand Down Expand Up @@ -423,5 +428,17 @@ public static ServiceProviderTransformerFactory TransformFactory(IServiceProvide
{
return new ServiceProviderTransformerFactory(provider);
}

/// <summary>
/// Creates transforms. Normally you don't need to call this, it is called by the builder for Brighter or
/// the Service Activator
/// Visibility is required for use from both
/// </summary>
/// <param name="provider">The IoC container to build the transform factory over</param>
/// <returns></returns>
public static ServiceProviderTransformerFactoryAsync TransformFactoryAsync(IServiceProvider provider)
{
return new ServiceProviderTransformerFactoryAsync(provider);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,18 +35,20 @@ namespace Paramore.Brighter.Extensions.DependencyInjection
/// When parsing for message mappers in assemblies, stores any found message mappers. A later step will add these to the message mapper registry
/// Not used directly
/// </summary>
public class ServiceCollectionMessageMapperRegistry: IEnumerable<KeyValuePair<Type, Type>>
public class ServiceCollectionMessageMapperRegistry
{
private readonly IServiceCollection _serviceCollection;
private readonly Dictionary<Type, Type> _mapperCollection = new Dictionary<Type, Type>();
private readonly ServiceLifetime _lifetime;

public Dictionary<Type, Type> Mappers { get; } = new Dictionary<Type, Type>();
public Dictionary<Type, Type> AsyncMappers { get; } = new Dictionary<Type, Type>();

public ServiceCollectionMessageMapperRegistry(IServiceCollection serviceCollection, ServiceLifetime lifetime = ServiceLifetime.Singleton)
{
_serviceCollection = serviceCollection;
_lifetime = lifetime;
}

/// <summary>
/// Register a mapper with the collection (generic version)
/// </summary>
Expand All @@ -57,6 +59,16 @@ public void Register<TRequest, TMessageMapper>() where TRequest : class, IReques
Add(typeof(TRequest), typeof(TMessageMapper));
}

/// <summary>
/// Register a mapper with the collection (generic version)
/// </summary>
/// <typeparam name="TRequest">The type of the request to map</typeparam>
/// <typeparam name="TMessageMapper">The type of the mapper</typeparam>
public void RegisterAsync<TRequest, TMessageMapper>() where TRequest : class, IRequest where TMessageMapper : class, IAmAMessageMapperAsync<TRequest>
{
AddAsync(typeof(TRequest), typeof(TMessageMapper));
}

/// <summary>
/// Add a mapper to the collection
/// </summary>
Expand All @@ -65,25 +77,18 @@ public void Register<TRequest, TMessageMapper>() where TRequest : class, IReques
public void Add(Type message, Type mapper)
{
_serviceCollection.TryAdd(new ServiceDescriptor(mapper, mapper, _lifetime));
_mapperCollection.Add(message, mapper);
Mappers.Add(message, mapper);
}

/// <summary>
/// Get the genericly typed iterator over this collection
/// </summary>
/// <returns>An iterator</returns>
public IEnumerator<KeyValuePair<Type, Type>> GetEnumerator()
{
return _mapperCollection.GetEnumerator();
}


/// <summary>
/// Get the untyped iterator over the collection
/// Add a mapper to the collection
/// </summary>
/// <returns>An iterator</returns>
IEnumerator IEnumerable.GetEnumerator()
/// <param name="message">The type of message to map</param>
/// <param name="mapper">The type of the mapper</param>
public void AddAsync(Type message, Type mapper)
{
return GetEnumerator();
_serviceCollection.TryAdd(new ServiceDescriptor(mapper, mapper, _lifetime));
AsyncMappers.Add(message, mapper);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ public Message CreateMessage(Amazon.SQS.Model.Message sqsMessage)

private static MessageBody ReadMessageBody(Amazon.SQS.Model.Message sqsMessage, string contentType)
{
if(contentType == CompressPayloadTransformer.GZIP || contentType == CompressPayloadTransformer.DEFLATE || contentType == CompressPayloadTransformer.BROTLI)
if(contentType == CompressPayloadTransformerAsync.GZIP || contentType == CompressPayloadTransformerAsync.DEFLATE || contentType == CompressPayloadTransformerAsync.BROTLI)
return new MessageBody(sqsMessage.Body, contentType, CharacterEncoding.Base64);

return new MessageBody(sqsMessage.Body, contentType, CharacterEncoding.UTF8);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
using System;
using System.Transactions;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using Microsoft.Extensions.Logging;
Expand Down Expand Up @@ -70,9 +69,10 @@ private static Dispatcher BuildDispatcher(IServiceProvider serviceProvider)

var messageMapperRegistry = ServiceCollectionExtensions.MessageMapperRegistry(serviceProvider);
var messageTransformFactory = ServiceCollectionExtensions.TransformFactory(serviceProvider);
var messageTransformFactoryAsync = ServiceCollectionExtensions.TransformFactoryAsync(serviceProvider);

return dispatcherBuilder
.MessageMappers(messageMapperRegistry, messageTransformFactory)
.MessageMappers(messageMapperRegistry, messageTransformFactory, messageTransformFactoryAsync)
.DefaultChannelFactory(options.ChannelFactory)
.Subscriptions(options.Subscriptions).Build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ public Dispatcher Build(string hostName)

return DispatchBuilder.With()
.CommandProcessorFactory(() => new CommandProcessorProvider(commandProcessor))
.MessageMappers(incomingMessageMapperRegistry, null)
.MessageMappers(incomingMessageMapperRegistry, null, null)
.DefaultChannelFactory(_channelFactory)
.Subscriptions(subscriptions)
.Build();
Expand Down
15 changes: 10 additions & 5 deletions src/Paramore.Brighter.ServiceActivator/DispatchBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,13 @@ public INeedAMessageMapper CommandProcessorFactory(Func<IAmACommandProcessorProv
/// The message mappers used to map between commands, events, and on-the-wire handlers.
/// </summary>
/// <param name="theMessageMapperRegistry">The message mapper registry.</param>
/// <param name="messageTransformerFactory"></param>
/// <param name="messageTransformerFactory">A factory to produce transformers for a message mapper</param>
/// <param name="messageTransformFactoryAsync">A factory to produce async transformers for a message mapper</param>
/// <returns>INeedAChannelFactory.</returns>
public INeedAChannelFactory MessageMappers(
IAmAMessageMapperRegistry theMessageMapperRegistry,
IAmAMessageTransformerFactory messageTransformerFactory)
IAmAMessageTransformerFactory messageTransformerFactory,
IAmAMessageTransformerFactoryAsync messageTransformFactoryAsync)
{
_messageMapperRegistry = theMessageMapperRegistry;
_messageTransformerFactory = messageTransformerFactory;
Expand Down Expand Up @@ -136,6 +138,8 @@ public Dispatcher Build()
{
return new Dispatcher(_commandProcessorFactory, _messageMapperRegistry, _subscriptions, _messageTransformerFactory);
}


}

#region Progressive Interfaces
Expand Down Expand Up @@ -163,10 +167,11 @@ public interface INeedAMessageMapper
/// </summary>
/// <param name="messageMapperRegistry">The message mapper registry.</param>
/// <param name="messageTransformerFactory">The factory for creating transforms</param>
/// <param name="messageTransformFactoryAsync">The factory for creating async transforms</param>
/// <returns>INeedAChannelFactory.</returns>
INeedAChannelFactory MessageMappers(
IAmAMessageMapperRegistry messageMapperRegistry,
IAmAMessageTransformerFactory messageTransformerFactory);
INeedAChannelFactory MessageMappers(IAmAMessageMapperRegistry messageMapperRegistry,
IAmAMessageTransformerFactory messageTransformerFactory,
IAmAMessageTransformerFactoryAsync messageTransformFactoryAsync);
}
/// <summary>
/// Interface INeedAChannelFactory
Expand Down
Loading

0 comments on commit 9dbd16f

Please sign in to comment.