Skip to content

Commit

Permalink
fix: make it mandatory to set the pump type
Browse files Browse the repository at this point in the history
  • Loading branch information
iancooper committed Jan 6, 2025
1 parent 928a904 commit 31d194d
Show file tree
Hide file tree
Showing 8 changed files with 11 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ public SqsSubscription(
int requeueCount = -1,
TimeSpan? requeueDelay = null,
int unacceptableMessageLimit = 0,
MessagePumpType messagePumpType = MessagePumpType.Proactor,
MessagePumpType messagePumpType = MessagePumpType.Unknown,
IAmAChannelFactory? channelFactory = null,
int lockTimeout = 10,
int delaySeconds = 0,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public AzureServiceBusSubscription(
int requeueCount = -1,
TimeSpan? requeueDelay = null,
int unacceptableMessageLimit = 0,
MessagePumpType messagePumpType = MessagePumpType.Proactor,
MessagePumpType messagePumpType = MessagePumpType.Unknown,
IAmAChannelFactory? channelFactory = null,
OnMissingChannel makeChannels = OnMissingChannel.Create,
AzureServiceBusSubscriptionConfiguration? subscriptionConfiguration = null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ public KafkaSubscription (
TimeSpan? maxPollInterval = null,
TimeSpan? sweepUncommittedOffsetsInterval = null,
IsolationLevel isolationLevel = IsolationLevel.ReadCommitted,
MessagePumpType messagePumpType = MessagePumpType.Proactor,
MessagePumpType messagePumpType = MessagePumpType.Unknown,
int numOfPartitions = 1,
short replicationFactor = 1,
IAmAChannelFactory channelFactory = null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public RmqSubscription(
TimeSpan? requeueDelay = null,
int unacceptableMessageLimit = 0,
bool isDurable = false,
MessagePumpType messagePumpType = MessagePumpType.Proactor,
MessagePumpType messagePumpType = MessagePumpType.Unknown,
IAmAChannelFactory? channelFactory = null,
bool highAvailability = false,
ChannelName? deadLetterChannelName = null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ protected RedisSubscription(
int requeueCount = -1,
TimeSpan? requeueDelay = null,
int unacceptableMessageLimit = 0,
MessagePumpType messagePumpType = MessagePumpType.Proactor,
MessagePumpType messagePumpType = MessagePumpType.Unknown,
IAmAChannelFactory? channelFactory = null,
OnMissingChannel makeChannels = OnMissingChannel.Create,
TimeSpan? emptyChannelDelay = null,
Expand Down
1 change: 1 addition & 0 deletions src/Paramore.Brighter/MessagePumpType.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ namespace Paramore.Brighter;

public enum MessagePumpType
{
Unknown,
Reactor,
Proactor
}
5 changes: 4 additions & 1 deletion src/Paramore.Brighter/Subscription.cs
Original file line number Diff line number Diff line change
Expand Up @@ -153,12 +153,15 @@ public Subscription(
int requeueCount = -1,
TimeSpan? requeueDelay = null,
int unacceptableMessageLimit = 0,
MessagePumpType messagePumpType = MessagePumpType.Proactor,
MessagePumpType messagePumpType = MessagePumpType.Unknown,
IAmAChannelFactory? channelFactory = null,
OnMissingChannel makeChannels = OnMissingChannel.Create,
TimeSpan? emptyChannelDelay = null,
TimeSpan? channelFailureDelay = null)
{
if (messagePumpType == MessagePumpType.Unknown)
throw new ConfigurationException("You must set a message pump type: use Reactor for sync pipelines; use Proactor for async pipelines");

DataType = dataType;
Name = name ?? new SubscriptionName(dataType.FullName!);
ChannelName = channelName ?? new ChannelName(dataType.FullName!);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ public void When_an_inmemory_channelfactory_is_called()
var inMemoryChannelFactory = new InMemoryChannelFactory(internalBus, TimeProvider.System);

//act
var channel = inMemoryChannelFactory.CreateSyncChannel(new Subscription(typeof(MyEvent)));
var channel = inMemoryChannelFactory.CreateSyncChannel(new Subscription(typeof(MyEvent), messagePumpType: MessagePumpType.Reactor));

//assert
Assert.NotNull(channel);
Expand Down

0 comments on commit 31d194d

Please sign in to comment.