Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Follow-up to #1682 #1697

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 32 additions & 1 deletion projects/Test/Applications/CreateChannel/Program.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,35 @@
using System;
// This source code is dual-licensed under the Apache License, version
// 2.0, and the Mozilla Public License, version 2.0.
//
// The APL v2.0:
//
//---------------------------------------------------------------------------
// Copyright (c) 2007-2024 Broadcom. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//---------------------------------------------------------------------------
//
// The MPL v2.0:
//
//---------------------------------------------------------------------------
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at https://mozilla.org/MPL/2.0/.
//
// Copyright (c) 2007-2024 Broadcom. All Rights Reserved.
//---------------------------------------------------------------------------

using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
Expand Down
53 changes: 48 additions & 5 deletions projects/Test/Applications/GH-1647/Program.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,35 @@
#pragma warning disable CA2007 // Consider calling ConfigureAwait on the awaited task
// This source code is dual-licensed under the Apache License, version
// 2.0, and the Mozilla Public License, version 2.0.
//
// The APL v2.0:
//
//---------------------------------------------------------------------------
// Copyright (c) 2007-2024 Broadcom. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//---------------------------------------------------------------------------
//
// The MPL v2.0:
//
//---------------------------------------------------------------------------
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at https://mozilla.org/MPL/2.0/.
//
// Copyright (c) 2007-2024 Broadcom. All Rights Reserved.
//---------------------------------------------------------------------------

#pragma warning disable CA2007 // Consider calling ConfigureAwait on the awaited task
using System.Text;
using RabbitMQ.Client;

Expand All @@ -9,18 +40,30 @@
Password = "guest"
};

var channelOptions = new CreateChannelOptions
{
PublisherConfirmationsEnabled = true,
PublisherConfirmationTrackingEnabled = true
};

var props = new BasicProperties();
byte[] msg = Encoding.UTF8.GetBytes("test");
await using var connection = await connectionFactory.CreateConnectionAsync();
for (int i = 0; i < 300; i++)
{
try
{
await using var channel = await connection.CreateChannelAsync(); // New channel for each message
await using var channel = await connection.CreateChannelAsync(channelOptions); // New channel for each message
await Task.Delay(1000);
await channel.BasicPublishAsync(exchange: string.Empty, routingKey: string.Empty,
mandatory: false, basicProperties: props, body: msg);
Console.WriteLine($"Sent message {i}");
if (await channel.BasicPublishAsync(exchange: string.Empty, routingKey: string.Empty,
mandatory: false, basicProperties: props, body: msg))
{
Console.WriteLine($"Sent message {i}");
}
else
{
Console.Error.WriteLine($"[ERROR] message {i} not acked!");
}
}
catch (Exception ex)
{
Expand Down
8 changes: 6 additions & 2 deletions projects/Test/Applications/MassPublish/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -140,12 +140,16 @@ await consumeChannel.BasicConsumeAsync(queue: QueueName, autoAck: true, consumer
using IChannel publishChannel = await publishConnection.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true });
publishChannel.ChannelShutdownAsync += Channel_ChannelShutdownAsync;


bool ack = false;
for (int i = 0; i < ItemsPerBatch; i++)
{
await publishChannel.BasicPublishAsync(exchange: ExchangeName, routingKey: RoutingKey,
ack = await publishChannel.BasicPublishAsync(exchange: ExchangeName, routingKey: RoutingKey,
basicProperties: s_properties, body: s_payload, mandatory: true);
Interlocked.Increment(ref s_messagesSent);
if (false == ack)
{
Console.Error.WriteLine("[ERROR] channel {0} saw nack!", publishChannel.ChannelNumber);
}
}

if (s_debug)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,8 @@ public async Task TestPublishRpcRightAfterReconnect()

try
{
await _channel.BasicPublishAsync(exchange: string.Empty, routingKey: testQueueName,
mandatory: false, basicProperties: properties, body: _messageBody);
Assert.True(await _channel.BasicPublishAsync(exchange: string.Empty, routingKey: testQueueName,
mandatory: false, basicProperties: properties, body: _messageBody));
}
catch (Exception e)
{
Expand Down
4 changes: 2 additions & 2 deletions projects/Test/Integration/TestAsyncConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -695,8 +695,8 @@ public async Task TestCloseWithinEventHandler_GH1567()

var bp = new BasicProperties();

await _channel.BasicPublishAsync(exchange: string.Empty, routingKey: queueName,
basicProperties: bp, mandatory: true, body: GetRandomBody(64));
Assert.True(await _channel.BasicPublishAsync(exchange: string.Empty, routingKey: queueName,
basicProperties: bp, mandatory: true, body: GetRandomBody(64)));

Assert.True(await tcs.Task);
}
Expand Down
4 changes: 2 additions & 2 deletions projects/Test/Integration/TestAsyncEventingBasicConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,8 @@ public async Task TestAsyncEventingBasicConsumer_GH1038()
await using IChannel publisherChannel = await _conn.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true });
byte[] messageBodyBytes = System.Text.Encoding.UTF8.GetBytes("Hello, world!");
var props = new BasicProperties();
await publisherChannel.BasicPublishAsync(exchange: exchangeName, routingKey: string.Empty,
mandatory: false, basicProperties: props, body: messageBodyBytes);
Assert.True(await publisherChannel.BasicPublishAsync(exchange: exchangeName, routingKey: string.Empty,
mandatory: false, basicProperties: props, body: messageBodyBytes));

await Task.WhenAll(_onReceivedTcs.Task, _onCallbackExceptionTcs.Task);
Assert.True(await _onReceivedTcs.Task);
Expand Down
10 changes: 5 additions & 5 deletions projects/Test/Integration/TestBasicPublish.cs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public async Task TestBasicRoundtripArray()
};
string tag = await _channel.BasicConsumeAsync(q.QueueName, true, consumer);

await _channel.BasicPublishAsync("", q.QueueName, true, bp, sendBody);
Assert.True(await _channel.BasicPublishAsync("", q.QueueName, true, bp, sendBody));
bool waitRes = await consumerReceivedSemaphore.WaitAsync(TimeSpan.FromSeconds(5));
await _channel.BasicCancelAsync(tag);

Expand Down Expand Up @@ -131,7 +131,7 @@ public async Task TestBasicRoundtripReadOnlyMemory()
};
string tag = await _channel.BasicConsumeAsync(q.QueueName, true, consumer);

await _channel.BasicPublishAsync("", q.QueueName, new ReadOnlyMemory<byte>(sendBody));
Assert.True(await _channel.BasicPublishAsync("", q.QueueName, new ReadOnlyMemory<byte>(sendBody)));
bool waitRes = await consumerReceivedSemaphore.WaitAsync(TimeSpan.FromSeconds(2));
await _channel.BasicCancelAsync(tag);

Expand Down Expand Up @@ -161,7 +161,7 @@ public async Task CanNotModifyPayloadAfterPublish()
};
string tag = await _channel.BasicConsumeAsync(q.QueueName, true, consumer);

await _channel.BasicPublishAsync("", q.QueueName, sendBody);
Assert.True(await _channel.BasicPublishAsync("", q.QueueName, sendBody));
sendBody.AsSpan().Fill(1);

Assert.True(await consumerReceivedSemaphore.WaitAsync(TimeSpan.FromSeconds(5)));
Expand Down Expand Up @@ -247,7 +247,7 @@ public async Task TestMaxInboundMessageBodySize()

string tag = await channel.BasicConsumeAsync(q.QueueName, true, consumer);

await channel.BasicPublishAsync("", q.QueueName, msg0);
Assert.True(await channel.BasicPublishAsync("", q.QueueName, msg0));
AlreadyClosedException ex = await Assert.ThrowsAsync<AlreadyClosedException>(() =>
channel.BasicPublishAsync("", q.QueueName, msg1).AsTask());
Assert.IsType<MalformedFrameException>(ex.InnerException);
Expand Down Expand Up @@ -315,7 +315,7 @@ public async Task TestPropertiesRoundtrip_Headers()
};

string tag = await _channel.BasicConsumeAsync(q.QueueName, true, consumer);
await _channel.BasicPublishAsync("", q.QueueName, false, bp, sendBody);
Assert.True(await _channel.BasicPublishAsync("", q.QueueName, false, bp, sendBody));
bool waitResFalse = await consumerReceivedSemaphore.WaitAsync(TimeSpan.FromSeconds(5));
await _channel.BasicCancelAsync(tag);
Assert.True(waitResFalse);
Expand Down
8 changes: 4 additions & 4 deletions projects/Test/Integration/TestConfirmSelect.cs
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,8 @@ public async Task TestDeliveryTagDiverged_GH1043(ushort correlationIdLength)

var properties = new BasicProperties();
// _output.WriteLine("Client delivery tag {0}", await _channel.GetNextPublishSequenceNumberAsync());
await _channel.BasicPublishAsync(exchange: "sample", routingKey: string.Empty,
mandatory: false, basicProperties: properties, body: body);
Assert.True(await _channel.BasicPublishAsync(exchange: "sample", routingKey: string.Empty,
mandatory: false, basicProperties: properties, body: body));

try
{
Expand All @@ -88,7 +88,7 @@ await _channel.BasicPublishAsync(exchange: "sample", routingKey: string.Empty,
CorrelationId = new string('o', correlationIdLength)
};
// _output.WriteLine("Client delivery tag {0}", await _channel.GetNextPublishSequenceNumberAsync());
await _channel.BasicPublishAsync("sample", string.Empty, false, properties, body);
Assert.True(await _channel.BasicPublishAsync("sample", string.Empty, false, properties, body));
}
catch
{
Expand All @@ -97,7 +97,7 @@ await _channel.BasicPublishAsync(exchange: "sample", routingKey: string.Empty,

properties = new BasicProperties();
// _output.WriteLine("Client delivery tag {0}", await _channel.GetNextPublishSequenceNumberAsync());
await _channel.BasicPublishAsync("sample", string.Empty, false, properties, body);
Assert.True(await _channel.BasicPublishAsync("sample", string.Empty, false, properties, body));
// _output.WriteLine("I'm done...");
}
}
Expand Down
2 changes: 1 addition & 1 deletion projects/Test/OAuth2/TestOAuth2.cs
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ private async Task PublishAsync(IChannel publishChannel)
AppId = "oauth2",
};

await publishChannel.BasicPublishAsync(exchange: Exchange, routingKey: "hello", false, basicProperties: properties, body: body);
Assert.True(await publishChannel.BasicPublishAsync(exchange: Exchange, routingKey: "hello", false, basicProperties: properties, body: body));
_testOutputHelper.WriteLine("Sent and confirmed message");
}

Expand Down