diff --git a/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsSubscription.cs b/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsSubscription.cs index af0a30daf6..e179c30da7 100644 --- a/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsSubscription.cs +++ b/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsSubscription.cs @@ -125,7 +125,7 @@ public SqsSubscription( int requeueCount = -1, TimeSpan? requeueDelay = null, int unacceptableMessageLimit = 0, - MessagePumpType messagePumpType = MessagePumpType.Proactor, + MessagePumpType messagePumpType = MessagePumpType.Unknown, IAmAChannelFactory? channelFactory = null, int lockTimeout = 10, int delaySeconds = 0, diff --git a/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/AzureServiceBusSubscription.cs b/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/AzureServiceBusSubscription.cs index d3522b3ae0..4503df7a2c 100644 --- a/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/AzureServiceBusSubscription.cs +++ b/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/AzureServiceBusSubscription.cs @@ -65,7 +65,7 @@ public AzureServiceBusSubscription( int requeueCount = -1, TimeSpan? requeueDelay = null, int unacceptableMessageLimit = 0, - MessagePumpType messagePumpType = MessagePumpType.Proactor, + MessagePumpType messagePumpType = MessagePumpType.Unknown, IAmAChannelFactory? channelFactory = null, OnMissingChannel makeChannels = OnMissingChannel.Create, AzureServiceBusSubscriptionConfiguration? subscriptionConfiguration = null, diff --git a/src/Paramore.Brighter.MessagingGateway.Kafka/KafkaSubscription.cs b/src/Paramore.Brighter.MessagingGateway.Kafka/KafkaSubscription.cs index c21c42c408..aff97d1e3b 100644 --- a/src/Paramore.Brighter.MessagingGateway.Kafka/KafkaSubscription.cs +++ b/src/Paramore.Brighter.MessagingGateway.Kafka/KafkaSubscription.cs @@ -148,7 +148,7 @@ public KafkaSubscription ( TimeSpan? maxPollInterval = null, TimeSpan? sweepUncommittedOffsetsInterval = null, IsolationLevel isolationLevel = IsolationLevel.ReadCommitted, - MessagePumpType messagePumpType = MessagePumpType.Proactor, + MessagePumpType messagePumpType = MessagePumpType.Unknown, int numOfPartitions = 1, short replicationFactor = 1, IAmAChannelFactory channelFactory = null, diff --git a/src/Paramore.Brighter.MessagingGateway.RMQ/RmqSubscription.cs b/src/Paramore.Brighter.MessagingGateway.RMQ/RmqSubscription.cs index c596cad591..3e7f09d61f 100644 --- a/src/Paramore.Brighter.MessagingGateway.RMQ/RmqSubscription.cs +++ b/src/Paramore.Brighter.MessagingGateway.RMQ/RmqSubscription.cs @@ -100,7 +100,7 @@ public RmqSubscription( TimeSpan? requeueDelay = null, int unacceptableMessageLimit = 0, bool isDurable = false, - MessagePumpType messagePumpType = MessagePumpType.Proactor, + MessagePumpType messagePumpType = MessagePumpType.Unknown, IAmAChannelFactory? channelFactory = null, bool highAvailability = false, ChannelName? deadLetterChannelName = null, diff --git a/src/Paramore.Brighter.MessagingGateway.Redis/RedisSubscription.cs b/src/Paramore.Brighter.MessagingGateway.Redis/RedisSubscription.cs index 718178f0c1..2b0f7e386b 100644 --- a/src/Paramore.Brighter.MessagingGateway.Redis/RedisSubscription.cs +++ b/src/Paramore.Brighter.MessagingGateway.Redis/RedisSubscription.cs @@ -57,7 +57,7 @@ protected RedisSubscription( int requeueCount = -1, TimeSpan? requeueDelay = null, int unacceptableMessageLimit = 0, - MessagePumpType messagePumpType = MessagePumpType.Proactor, + MessagePumpType messagePumpType = MessagePumpType.Unknown, IAmAChannelFactory? channelFactory = null, OnMissingChannel makeChannels = OnMissingChannel.Create, TimeSpan? emptyChannelDelay = null, diff --git a/src/Paramore.Brighter/MessagePumpType.cs b/src/Paramore.Brighter/MessagePumpType.cs index 80bf074148..252cb0074f 100644 --- a/src/Paramore.Brighter/MessagePumpType.cs +++ b/src/Paramore.Brighter/MessagePumpType.cs @@ -2,6 +2,7 @@ namespace Paramore.Brighter; public enum MessagePumpType { + Unknown, Reactor, Proactor } diff --git a/src/Paramore.Brighter/Subscription.cs b/src/Paramore.Brighter/Subscription.cs index 4cb5ab50b3..9d7eb07e0c 100644 --- a/src/Paramore.Brighter/Subscription.cs +++ b/src/Paramore.Brighter/Subscription.cs @@ -153,12 +153,15 @@ public Subscription( int requeueCount = -1, TimeSpan? requeueDelay = null, int unacceptableMessageLimit = 0, - MessagePumpType messagePumpType = MessagePumpType.Proactor, + MessagePumpType messagePumpType = MessagePumpType.Unknown, IAmAChannelFactory? channelFactory = null, OnMissingChannel makeChannels = OnMissingChannel.Create, TimeSpan? emptyChannelDelay = null, TimeSpan? channelFailureDelay = null) { + if (messagePumpType == MessagePumpType.Unknown) + throw new ConfigurationException("You must set a message pump type: use Reactor for sync pipelines; use Proactor for async pipelines"); + DataType = dataType; Name = name ?? new SubscriptionName(dataType.FullName!); ChannelName = channelName ?? new ChannelName(dataType.FullName!); diff --git a/tests/Paramore.Brighter.InMemory.Tests/Consumer/When_an_inmemory_channelfactory_is_called.cs b/tests/Paramore.Brighter.InMemory.Tests/Consumer/When_an_inmemory_channelfactory_is_called.cs index da2e306c9f..4fef77fac5 100644 --- a/tests/Paramore.Brighter.InMemory.Tests/Consumer/When_an_inmemory_channelfactory_is_called.cs +++ b/tests/Paramore.Brighter.InMemory.Tests/Consumer/When_an_inmemory_channelfactory_is_called.cs @@ -14,7 +14,7 @@ public void When_an_inmemory_channelfactory_is_called() var inMemoryChannelFactory = new InMemoryChannelFactory(internalBus, TimeProvider.System); //act - var channel = inMemoryChannelFactory.CreateSyncChannel(new Subscription(typeof(MyEvent))); + var channel = inMemoryChannelFactory.CreateSyncChannel(new Subscription(typeof(MyEvent), messagePumpType: MessagePumpType.Reactor)); //assert Assert.NotNull(channel);