From 870a1f45f0d74c3bba6891a23360d8b06e1a3697 Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Thu, 27 Jun 2024 10:29:14 -0700 Subject: [PATCH] Ensure that recovery uses copies of argument dicts Reported here: https://groups.google.com/g/rabbitmq-users/c/hk5pJ4cKF0c * Ensure that arguments passed to a queue, exchange, binding, or consumer are copied when being recorded. --- .../client/impl/RecordedBinding.cs | 9 ++ .../client/impl/RecordedConsumer.cs | 10 +- .../client/impl/RecordedExchange.cs | 10 +- .../client/impl/RecordedQueue.cs | 10 +- .../TestRecoveringConsumerEventHandlers.cs | 18 ++-- .../TestQueueRecoveryWithArguments.cs | 92 +++++++++++++++++++ 6 files changed, 140 insertions(+), 9 deletions(-) create mode 100644 projects/Test/Integration/ConnectionRecovery/TestQueueRecoveryWithArguments.cs diff --git a/projects/RabbitMQ.Client/client/impl/RecordedBinding.cs b/projects/RabbitMQ.Client/client/impl/RecordedBinding.cs index ab2d72a1a3..71cd60f020 100644 --- a/projects/RabbitMQ.Client/client/impl/RecordedBinding.cs +++ b/projects/RabbitMQ.Client/client/impl/RecordedBinding.cs @@ -57,6 +57,15 @@ public RecordedBinding(bool isQueueBinding, string destination, string source, s _source = source; _routingKey = routingKey; _arguments = arguments; + + if (arguments is null) + { + _arguments = null; + } + else + { + _arguments = new Dictionary(arguments); + } } public RecordedBinding(string destination, in RecordedBinding old) diff --git a/projects/RabbitMQ.Client/client/impl/RecordedConsumer.cs b/projects/RabbitMQ.Client/client/impl/RecordedConsumer.cs index a2c789c72d..2d7402cb5d 100644 --- a/projects/RabbitMQ.Client/client/impl/RecordedConsumer.cs +++ b/projects/RabbitMQ.Client/client/impl/RecordedConsumer.cs @@ -84,7 +84,15 @@ public RecordedConsumer(AutorecoveringChannel channel, IBasicConsumer consumer, _autoAck = autoAck; _exclusive = exclusive; - _arguments = arguments; + + if (arguments is null) + { + _arguments = null; + } + else + { + _arguments = new Dictionary(arguments); + } } public AutorecoveringChannel Channel => _channel; diff --git a/projects/RabbitMQ.Client/client/impl/RecordedExchange.cs b/projects/RabbitMQ.Client/client/impl/RecordedExchange.cs index a5650c81a1..fd492a3b28 100644 --- a/projects/RabbitMQ.Client/client/impl/RecordedExchange.cs +++ b/projects/RabbitMQ.Client/client/impl/RecordedExchange.cs @@ -56,7 +56,15 @@ public RecordedExchange(string name, string type, bool durable, bool autoDelete, _type = type; _durable = durable; _autoDelete = autoDelete; - _arguments = arguments; + + if (arguments is null) + { + _arguments = null; + } + else + { + _arguments = new Dictionary(arguments); + } } public Task RecoverAsync(IChannel channel, CancellationToken cancellationToken) diff --git a/projects/RabbitMQ.Client/client/impl/RecordedQueue.cs b/projects/RabbitMQ.Client/client/impl/RecordedQueue.cs index d4148a2b9c..959a12b488 100644 --- a/projects/RabbitMQ.Client/client/impl/RecordedQueue.cs +++ b/projects/RabbitMQ.Client/client/impl/RecordedQueue.cs @@ -59,7 +59,15 @@ public RecordedQueue(string name, bool isServerNamed, bool durable, bool exclusi _durable = durable; _exclusive = exclusive; _autoDelete = autoDelete; - _arguments = arguments; + + if (arguments is null) + { + _arguments = null; + } + else + { + _arguments = new Dictionary(arguments); + } } public RecordedQueue(string newName, in RecordedQueue old) diff --git a/projects/Test/Integration/ConnectionRecovery/EventHandlerRecovery/Connection/TestRecoveringConsumerEventHandlers.cs b/projects/Test/Integration/ConnectionRecovery/EventHandlerRecovery/Connection/TestRecoveringConsumerEventHandlers.cs index 66d08e9b03..d7db66feed 100644 --- a/projects/Test/Integration/ConnectionRecovery/EventHandlerRecovery/Connection/TestRecoveringConsumerEventHandlers.cs +++ b/projects/Test/Integration/ConnectionRecovery/EventHandlerRecovery/Connection/TestRecoveringConsumerEventHandlers.cs @@ -69,10 +69,17 @@ public async Task TestRecoveringConsumerEventHandlers_Called(int iterations) [Fact] public async Task TestRecoveringConsumerEventHandler_EventArgumentsArePassedDown() { - var myArgs = new Dictionary { { "first-argument", "some-value" } }; + const string key = "first-argument"; + const string value = "some-value"; + + IDictionary arguments = new Dictionary + { + { key, value } + }; + RabbitMQ.Client.QueueDeclareOk q = await _channel.QueueDeclareAsync(GenerateQueueName(), false, false, false); var cons = new EventingBasicConsumer(_channel); - string expectedCTag = await _channel.BasicConsumeAsync(cons, q, arguments: myArgs); + string expectedCTag = await _channel.BasicConsumeAsync(cons, q, arguments: arguments); bool ctagMatches = false; bool consumerArgumentMatches = false; @@ -82,15 +89,14 @@ public async Task TestRecoveringConsumerEventHandler_EventArgumentsArePassedDown // passed to a CallbackExceptionHandler, instead of failing the test. Instead, we have to do this trick // and assert in the test function. ctagMatches = args.ConsumerTag == expectedCTag; - consumerArgumentMatches = (string)args.ConsumerArguments["first-argument"] == "some-value"; - args.ConsumerArguments["first-argument"] = "event-handler-set-this-value"; + consumerArgumentMatches = (string)args.ConsumerArguments[key] == value; }; await CloseAndWaitForRecoveryAsync(); Assert.True(ctagMatches, "expected consumer tag to match"); Assert.True(consumerArgumentMatches, "expected consumer arguments to match"); - string actualVal = (string)Assert.Contains("first-argument", myArgs as IDictionary); - Assert.Equal("event-handler-set-this-value", actualVal); + string actualVal = (string)Assert.Contains(key, arguments); + Assert.Equal(value, actualVal); } } } diff --git a/projects/Test/Integration/ConnectionRecovery/TestQueueRecoveryWithArguments.cs b/projects/Test/Integration/ConnectionRecovery/TestQueueRecoveryWithArguments.cs new file mode 100644 index 0000000000..6dcfd25962 --- /dev/null +++ b/projects/Test/Integration/ConnectionRecovery/TestQueueRecoveryWithArguments.cs @@ -0,0 +1,92 @@ +// This source code is dual-licensed under the Apache License, version +// 2.0, and the Mozilla Public License, version 2.0. +// +// The APL v2.0: +// +//--------------------------------------------------------------------------- +// Copyright (c) 2007-2024 Broadcom. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +//--------------------------------------------------------------------------- +// +// The MPL v2.0: +// +//--------------------------------------------------------------------------- +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. +// +// Copyright (c) 2007-2024 Broadcom. All Rights Reserved. +//--------------------------------------------------------------------------- + +using System.Collections.Generic; +using System.Threading.Tasks; +using RabbitMQ.Client; +using RabbitMQ.Client.Events; +using Xunit; +using Xunit.Abstractions; + +namespace Test.Integration.ConnectionRecovery +{ + public class TestQueueRecoveryWithArguments : TestConnectionRecoveryBase + { + public TestQueueRecoveryWithArguments(ITestOutputHelper output) : base(output) + { + } + + [Fact] + public async Task TestQueueRecoveryWithDlxArgument_RabbitMQUsers_hk5pJ4cKF0c() + { + string tdiWaitExchangeName = GenerateExchangeName(); + string tdiRetryExchangeName = GenerateExchangeName(); + string testRetryQueueName = GenerateQueueName(); + string testQueueName = GenerateQueueName(); + + await _channel.ExchangeDeclareAsync(exchange: tdiWaitExchangeName, + type: ExchangeType.Topic, durable: true, autoDelete: false, arguments: null); + await _channel.ExchangeDeclareAsync(exchange: tdiRetryExchangeName, + type: ExchangeType.Topic, durable: true, autoDelete: false, arguments: null); + + var arguments = new Dictionary + { + { "x-dead-letter-exchange", "tdi.retry.exchange" }, + { "x-dead-letter-routing-key", "QueueTest" } + }; + + await _channel.QueueDeclareAsync(testRetryQueueName, durable: false, exclusive: false, autoDelete: false, arguments); + + arguments["x-dead-letter-exchange"] = "tdi.wait.exchange"; + arguments["x-dead-letter-routing-key"] = "QueueTest"; + + await _channel.QueueDeclareAsync(testQueueName, durable: false, exclusive: false, autoDelete: false, arguments); + + arguments.Remove("x-dead-letter-exchange"); + arguments.Remove("x-dead-letter-routing-key"); + + await _channel.QueueBindAsync(testRetryQueueName, tdiWaitExchangeName, testQueueName); + + await _channel.QueueBindAsync(testQueueName, tdiRetryExchangeName, testQueueName); + + var consumerAsync = new EventingBasicConsumer(_channel); + await _channel.BasicConsumeAsync(queue: testQueueName, autoAck: false, consumer: consumerAsync); + + await CloseAndWaitForRecoveryAsync(); + + QueueDeclareOk q0 = await _channel.QueueDeclarePassiveAsync(testRetryQueueName); + Assert.Equal(testRetryQueueName, q0.QueueName); + + QueueDeclareOk q1 = await _channel.QueueDeclarePassiveAsync(testQueueName); + Assert.Equal(testQueueName, q1.QueueName); + } + } +}