Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prepare for RabbitMQ.Client v7.0.0 #1446

Merged
merged 42 commits into from
Sep 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
9b0c5e9
Wip upgrade
danielmarbach Jul 30, 2024
9850a5b
More changes
bording Aug 28, 2024
a862309
More changes
bording Aug 29, 2024
570851b
Fix warnings
bording Aug 29, 2024
ef36c20
Remove our now unneeded publisher confirms handling
bording Aug 30, 2024
3105e60
Use CatchAsync
bording Aug 30, 2024
f3d1b64
Add awaits to tests
bording Aug 30, 2024
cdda1e4
Add missing await
bording Aug 30, 2024
f9cee9d
ConfirmsAwareChannel init for now
danielmarbach Sep 3, 2024
3d0e757
Wire up unregister event for consumer cancellation
danielmarbach Sep 3, 2024
3daaf26
Headers can be null when nothing was sent
danielmarbach Sep 4, 2024
6ca25c7
Add a done condition (still not an entirely perfect test but we'll go…
danielmarbach Sep 4, 2024
a7052f5
Cleanup test a bit
danielmarbach Sep 4, 2024
da2b7d0
Update approval file
bording Sep 4, 2024
30b0b0b
Don't cancel the connection close
bording Sep 4, 2024
3dcade7
ToHashSet
danielmarbach Sep 6, 2024
fcc9a8d
Cleanup test
danielmarbach Sep 6, 2024
843dee3
Switch regex approach
danielmarbach Sep 6, 2024
74acd57
Async continuation
danielmarbach Sep 6, 2024
362bd2c
Forward cancellation token with exception handling as a spike
danielmarbach Sep 6, 2024
090d022
Update to 7.0.0-rc.9
lukebakken Sep 6, 2024
3aad80b
Cleanup
bording Sep 10, 2024
4921a3a
More cleanup
danielmarbach Sep 11, 2024
377cc0f
Add more info to test exception message
bording Sep 11, 2024
2c66273
Add more logging
bording Sep 13, 2024
750d4b3
Simplify
bording Sep 13, 2024
2498275
Return channel after all tasks are completed now that send is async
bording Sep 13, 2024
e5415de
Put return in finally block
bording Sep 16, 2024
99bc699
Try adding our own tracking back
bording Sep 16, 2024
8d0305b
Revert "Try adding our own tracking back"
danielmarbach Sep 17, 2024
77d7b7c
WaitForConfirmsOrDieAsync for confirms
danielmarbach Sep 17, 2024
c509ef4
Upgrade to RC11
danielmarbach Sep 20, 2024
f012ae7
Try adding our own tracking back
bording Sep 16, 2024
7bde1bd
Comment
danielmarbach Sep 20, 2024
59a4e50
Synchronize for now the sequence number access around publish (delibe…
danielmarbach Sep 20, 2024
709bbd8
Move NextPublishSeqNo acquisition inside semaphore
bording Sep 20, 2024
d55f677
Remove TODOs that are not necessary
danielmarbach Sep 23, 2024
b3a6b64
Non-generic task completion source
danielmarbach Sep 23, 2024
342e8e0
readonly
danielmarbach Sep 23, 2024
2c60411
It seems we have to pass the non-readonly version
danielmarbach Sep 23, 2024
31dcd40
UnsafeRegister
danielmarbach Sep 23, 2024
7150e3c
Bump version to 10.0
bording Sep 23, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/Custom.Build.props
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<Project>

<PropertyGroup>
<MinVerMinimumMajorMinor>9.0</MinVerMinimumMajorMinor>
<MinVerMinimumMajorMinor>10.0</MinVerMinimumMajorMinor>
<MinVerAutoIncrement>minor</MinVerAutoIncrement>
</PropertyGroup>

Expand Down
Original file line number Diff line number Diff line change
@@ -1,21 +1,22 @@
namespace NServiceBus.Transport.RabbitMQ.AcceptanceTests
{
using System.Collections.Generic;
using System.Threading.Tasks;
using global::RabbitMQ.Client;

static class ChannelExtensions
{
public static void DeclareQuorumQueue(this IModel channel, string queueName)
public static Task<QueueDeclareOk> DeclareQuorumQueue(this IChannel channel, string queueName)
{
channel.QueueDeclare(queueName, true, false, false, new Dictionary<string, object>
return channel.QueueDeclareAsync(queueName, true, false, false, new Dictionary<string, object>
{
{ "x-queue-type", "quorum" }
});
}

public static void DeclareClassicQueue(this IModel channel, string queueName)
public static Task<QueueDeclareOk> DeclareClassicQueue(this IChannel channel, string queueName)
{
channel.QueueDeclare(queueName, true, false, false);
return channel.QueueDeclareAsync(queueName, true, false, false);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,30 +27,28 @@ public Task Configure(string endpointName, EndpointConfiguration configuration,
return Task.CompletedTask;
}

public Task Cleanup()
public async Task Cleanup()
{
PurgeQueues();

return Task.CompletedTask;
await PurgeQueues();
}

void PurgeQueues()
async Task PurgeQueues()
{
if (transport == null)
{
return;
}

var queues = transport.QueuesToCleanup.Distinct().ToArray();
var queues = transport.QueuesToCleanup.ToHashSet();

using (var connection = ConnectionHelper.ConnectionFactory.CreateConnection("Test Queue Purger"))
using (var channel = connection.CreateModel())
using (var connection = await ConnectionHelper.ConnectionFactory.CreateConnectionAsync("Test Queue Purger"))
using (var channel = await connection.CreateChannelAsync())
{
foreach (var queue in queues)
{
try
{
channel.QueuePurge(queue);
await channel.QueuePurgeAsync(queue);
}
catch (Exception ex)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<ItemGroup>
<PackageReference Include="BitFaster.Caching" Version="2.5.1" />
<PackageReference Include="NServiceBus.AcceptanceTests.Sources" Version="9.2.2" GeneratePathProperty="true" />
<PackageReference Include="RabbitMQ.Client" Version="6.8.1" />
<PackageReference Include="RabbitMQ.Client" Version="7.0.0-rc.11" />
</ItemGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
namespace NServiceBus.Transport.RabbitMQ.AcceptanceTests
{
using System;
using System.Threading.Tasks;
using AcceptanceTesting;
using NServiceBus.AcceptanceTests;
using NServiceBus.AcceptanceTests.EndpointTemplates;
Expand All @@ -9,12 +10,12 @@
public class When_classic_endpoint_uses_quorum_error_queue : NServiceBusAcceptanceTest
{
[Test]
public void Should_fail_to_start()
public async Task Should_fail_to_start()
{
using (var connection = ConnectionHelper.ConnectionFactory.CreateConnection())
using (var channel = connection.CreateModel())
using (var connection = await ConnectionHelper.ConnectionFactory.CreateConnectionAsync())
using (var channel = await connection.CreateChannelAsync())
{
channel.DeclareQuorumQueue("rabbitmq.transport.tests.quorum-error");
await channel.DeclareQuorumQueue("rabbitmq.transport.tests.quorum-error");
}

var exception = Assert.CatchAsync<Exception>(async () => await Scenario.Define<ScenarioContext>()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
namespace NServiceBus.Transport.RabbitMQ.AcceptanceTests
{
using System;
using System.Threading.Tasks;
using AcceptanceTesting;
using AcceptanceTesting.Customization;
using NServiceBus.AcceptanceTests;
Expand All @@ -10,12 +11,12 @@
public class When_classic_endpoint_uses_quorum_queue : NServiceBusAcceptanceTest
{
[Test]
public void Should_fail_to_start()
public async Task Should_fail_to_start()
{
using (var connection = ConnectionHelper.ConnectionFactory.CreateConnection())
using (var channel = connection.CreateModel())
using (var connection = await ConnectionHelper.ConnectionFactory.CreateConnectionAsync())
using (var channel = await connection.CreateChannelAsync())
{
channel.DeclareQuorumQueue(Conventions.EndpointNamingConvention(typeof(ClassicQueueEndpoint)));
await channel.DeclareQuorumQueue(Conventions.EndpointNamingConvention(typeof(ClassicQueueEndpoint)));
}

var exception = Assert.CatchAsync<Exception>(async () => await Scenario.Define<ScenarioContext>()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,19 +1,20 @@
namespace NServiceBus.Transport.RabbitMQ.AcceptanceTests
{
using System;
using System.Threading.Tasks;
using AcceptanceTesting;
using NServiceBus.AcceptanceTests;
using NUnit.Framework;

public class When_quorum_endpoint_uses_classic_error_queue : NServiceBusAcceptanceTest
{
[Test]
public void Should_fail_to_start()
public async Task Should_fail_to_start()
{
using (var connection = ConnectionHelper.ConnectionFactory.CreateConnection())
using (var channel = connection.CreateModel())
using (var connection = await ConnectionHelper.ConnectionFactory.CreateConnectionAsync())
using (var channel = await connection.CreateChannelAsync())
{
channel.DeclareClassicQueue("rabbitmq.transport.tests.classic-error");
await channel.DeclareClassicQueue("rabbitmq.transport.tests.classic-error");
}


Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
namespace NServiceBus.Transport.RabbitMQ.AcceptanceTests
{
using System;
using System.Threading.Tasks;
using AcceptanceTesting;
using AcceptanceTesting.Customization;
using NServiceBus.AcceptanceTests;
Expand All @@ -9,12 +10,12 @@
public class When_quorum_endpoint_uses_classic_queue : NServiceBusAcceptanceTest
{
[Test]
public void Should_fail_to_start()
public async Task Should_fail_to_start()
{
using (var connection = ConnectionHelper.ConnectionFactory.CreateConnection())
using (var channel = connection.CreateModel())
using (var connection = await ConnectionHelper.ConnectionFactory.CreateConnectionAsync())
using (var channel = await connection.CreateChannelAsync())
{
channel.DeclareClassicQueue(Conventions.EndpointNamingConvention(typeof(QuorumQueueEndpoint)));
await channel.DeclareClassicQueue(Conventions.EndpointNamingConvention(typeof(QuorumQueueEndpoint)));
}

var exception = Assert.CatchAsync<Exception>(async () => await Scenario.Define<ScenarioContext>()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@ public async Task Should_create_receiving_queues_as_quorum_queues()
{
var endpointInputQueue = Conventions.EndpointNamingConvention(typeof(QuorumQueueEndpoint));

using (var connection = ConnectionHelper.ConnectionFactory.CreateConnection())
using (var channel = connection.CreateModel())
using (var connection = await ConnectionHelper.ConnectionFactory.CreateConnectionAsync())
using (var channel = await connection.CreateChannelAsync())
{
channel.QueueDelete(endpointInputQueue, false, false);
channel.QueueDelete(endpointInputQueue + "-disc", false, false);
channel.QueueDelete("QuorumQueueSatelliteReceiver", false, false);
await channel.QueueDeleteAsync(endpointInputQueue, false, false);
await channel.QueueDeleteAsync(endpointInputQueue + "-disc", false, false);
await channel.QueueDeleteAsync("QuorumQueueSatelliteReceiver", false, false);
}

await Scenario.Define<ScenarioContext>()
Expand All @@ -29,23 +29,23 @@ await Scenario.Define<ScenarioContext>()
.Run();

// try to declare the same queue as a non-quorum queue, which should fail:
using (var connection = ConnectionHelper.ConnectionFactory.CreateConnection())
using (var connection = await ConnectionHelper.ConnectionFactory.CreateConnectionAsync())
{
using (var channel = connection.CreateModel())
using (var channel = await connection.CreateChannelAsync())
{
var mainQueueException = Assert.Catch<RabbitMQClientException>(() => channel.DeclareClassicQueue(endpointInputQueue));
var mainQueueException = Assert.CatchAsync<RabbitMQClientException>(async () => await channel.DeclareClassicQueue(endpointInputQueue));
Assert.That(mainQueueException.Message, Does.Contain("PRECONDITION_FAILED - inequivalent arg 'x-queue-type'"));
}

using (var channel = connection.CreateModel())
using (var channel = await connection.CreateChannelAsync())
{
var instanceSpecificQueueException = Assert.Catch<RabbitMQClientException>(() => channel.DeclareClassicQueue(endpointInputQueue + "-disc"));
var instanceSpecificQueueException = Assert.CatchAsync<RabbitMQClientException>(async () => await channel.DeclareClassicQueue(endpointInputQueue + "-disc"));
Assert.That(instanceSpecificQueueException.Message, Does.Contain("PRECONDITION_FAILED - inequivalent arg 'x-queue-type'"));
}

using (var channel = connection.CreateModel())
using (var channel = await connection.CreateChannelAsync())
{
var satelliteReceiver = Assert.Catch<RabbitMQClientException>(() => channel.DeclareClassicQueue("QuorumQueueSatelliteReceiver"));
var satelliteReceiver = Assert.CatchAsync<RabbitMQClientException>(async () => await channel.DeclareClassicQueue("QuorumQueueSatelliteReceiver"));
Assert.That(satelliteReceiver.Message, Does.Contain("PRECONDITION_FAILED - inequivalent arg 'x-queue-type'"));
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
namespace NServiceBus.Transport.RabbitMQ.AcceptanceTests
{
using System;
using System.Threading;
using System.Threading.Tasks;
using AcceptanceTesting;
using NServiceBus.AcceptanceTests;
Expand Down Expand Up @@ -32,59 +33,42 @@ public async Task Should_only_deliver_event_to_one_of_the_instances()
c.ServerBSubscribed = true;
});
})
.Done(ctx => ctx.Counter > 0)
.Run(TimeSpan.FromSeconds(10));

Assert.That(context.Counter, Is.EqualTo(1), "One of the scaled out instances should get the event");
}

public class ScaledOutSubscriber : EndpointConfigurationBuilder
{
public ScaledOutSubscriber()
{
EndpointSetup<DefaultPublisher>();
}
public ScaledOutSubscriber() => EndpointSetup<DefaultPublisher>();

class MyEventHandler : IHandleMessages<MyEvent>
class MyEventHandler(MyContext myContext) : IHandleMessages<MyEvent>
{
readonly MyContext myContext;

public MyEventHandler(MyContext context)
{
myContext = context;
}

public Task Handle(MyEvent message, IMessageHandlerContext context)
{
lock (objLock)
{
myContext.Counter++;
}

myContext.IncrementCounter();
return Task.CompletedTask;
}

static object objLock = new object();
}
}

public class Publisher : EndpointConfigurationBuilder
{
public Publisher()
{
EndpointSetup<DefaultPublisher>();
}
public Publisher() => EndpointSetup<DefaultPublisher>();
}

public class MyEvent : IEvent
{

}
public class MyEvent : IEvent;

class MyContext : ScenarioContext
{
public bool ServerASubscribed { get; set; }
public bool ServerBSubscribed { get; set; }
public int Counter { get; set; }
public int Counter => counter;

public void IncrementCounter() => Interlocked.Increment(ref counter);

int counter;
}
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
namespace NServiceBus.Transport.RabbitMQ.AcceptanceTests
{
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using AcceptanceTesting;
Expand All @@ -19,26 +18,24 @@ public class When_the_broker_connection_is_lost : NServiceBusAcceptanceTest
[Test]
public async Task Should_reconnect()
{
var context = await Scenario.Define<MyContext>(myContext =>
var context = await Scenario.Define<MyContext>(ctx =>
{
myContext.MessageId = Guid.NewGuid().ToString();
ctx.MessageId = Guid.NewGuid().ToString();
})
.WithEndpoint<Receiver>()
.Done(c => c.GotTheMessage)
.Run();
.WithEndpoint<Receiver>()
.Done(c => c.GotTheMessage)
.Run();

Assert.That(context.GotTheMessage, Is.True, "Should receive the message");
}

public class Receiver : EndpointConfigurationBuilder
{
public Receiver()
{
public Receiver() =>
EndpointSetup<DefaultServer>(e =>
{
e.EnableFeature<ConnectionKillerFeature>();
});
}

class ConnectionKillerFeature : Feature
{
Expand All @@ -48,15 +45,9 @@ protected override void Setup(FeatureConfigurationContext context)
context.RegisterStartupTask(b => b.GetRequiredService<ConnectionKiller>());
}

class ConnectionKiller : FeatureStartupTask
class ConnectionKiller(IMessageDispatcher sender, IReadOnlySettings settings, MyContext context)
: FeatureStartupTask
{
public ConnectionKiller(IMessageDispatcher sender, IReadOnlySettings settings, MyContext context)
{
this.context = context;
this.sender = sender;
this.settings = settings;
}

protected override async Task OnStart(IMessageSession session, CancellationToken cancellationToken = default)
{
await BreakConnectionBySendingInvalidMessage(cancellationToken);
Expand All @@ -70,7 +61,7 @@ async Task BreakConnectionBySendingInvalidMessage(CancellationToken cancellation
{
try
{
var outgoingMessage = new OutgoingMessage("Foo", [], new byte[0]);
var outgoingMessage = new OutgoingMessage("Foo", [], Array.Empty<byte>());
var props = new DispatchProperties
{
DiscardIfNotReceivedBefore =
Expand All @@ -84,10 +75,6 @@ async Task BreakConnectionBySendingInvalidMessage(CancellationToken cancellation
// Don't care
}
}

readonly MyContext context;
readonly IMessageDispatcher sender;
readonly IReadOnlySettings settings;
}
}

Expand Down
Loading