Skip to content

Commit

Permalink
Merge pull request #997 from DaAlbrecht/master
Browse files Browse the repository at this point in the history
feat: support rabbitmq headers
  • Loading branch information
aashikam authored Sep 28, 2024
2 parents 49acbc9 + bc17160 commit 8de41e6
Show file tree
Hide file tree
Showing 10 changed files with 80 additions and 11 deletions.
6 changes: 3 additions & 3 deletions ballerina/Ballerina.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
org = "ballerinax"
name = "rabbitmq"
version = "3.1.0"
version = "3.1.1"
authors = ["Ballerina"]
keywords = ["service", "client", "messaging", "network", "pubsub"]
repository = "https://github.com/ballerina-platform/module-ballerinax-rabbitmq"
Expand All @@ -18,8 +18,8 @@ path = "./lib/amqp-client-5.18.0.jar"
[[platform.java17.dependency]]
groupId = "io.ballerina.stdlib"
artifactId = "rabbitmq-native"
version = "3.1.0"
path = "../native/build/libs/rabbitmq-native-3.1.0.jar"
version = "3.1.1"
path = "../native/build/libs/rabbitmq-native-3.1.1-SNAPSHOT.jar"

[[platform.java17.dependency]]
groupId = "io.ballerina.stdlib"
Expand Down
2 changes: 1 addition & 1 deletion ballerina/CompilerPlugin.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@ id = "rabbitmq-compiler-plugin"
class = "io.ballerina.stdlib.rabbitmq.plugin.RabbitmqCompilerPlugin"

[[dependency]]
path = "../compiler-plugin/build/libs/rabbitmq-compiler-plugin-3.1.0.jar"
path = "../compiler-plugin/build/libs/rabbitmq-compiler-plugin-3.1.1-SNAPSHOT.jar"
2 changes: 1 addition & 1 deletion ballerina/Dependencies.toml
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,7 @@ modules = [
[[package]]
org = "ballerinax"
name = "rabbitmq"
version = "3.1.0"
version = "3.1.1"
dependencies = [
{org = "ballerina", name = "constraint"},
{org = "ballerina", name = "crypto"},
Expand Down
2 changes: 2 additions & 0 deletions ballerina/rabbitmq_commons.bal
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,13 @@ public const TOPIC_EXCHANGE = "topic";
# + contentType - The content type of the message
# + contentEncoding - The content encoding of the message
# + correlationId - The client-specific ID that can be used to mark or identify messages between clients
# + headers - A map of additional arbitrary headers to be included in the message
public type BasicProperties record {|
string replyTo?;
string contentType?;
string contentEncoding?;
string correlationId?;
map<anydata> headers?;
|};

# Additional configurations used to declare a queue.
Expand Down
43 changes: 40 additions & 3 deletions ballerina/tests/rabbitmq_client_tests.bal
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,43 @@ public isolated function testProducerTransactional() returns error? {
return;
}

@test:Config {
dependsOn: [testClient],
groups: ["rabbitmq"]
}
public isolated function testProducerWithHeaders() returns error? {
string queue = "testProducerWithHeaders";
string message = "Test producing with headers";
Client newClient = check new (DEFAULT_HOST, DEFAULT_PORT);
map<string> headers = {
"header1": "value1",
"header2": "value2"
};
check newClient->queueDeclare(queue);
check newClient->publishMessage({content: message.toBytes(), routingKey: queue, properties: {headers: headers}});
BytesMessage|Error consumeResult = newClient->consumeMessage(queue, false);
if consumeResult is BytesMessage {
string messageContent = check 'string:fromBytes(consumeResult.content);
BasicProperties? properties = consumeResult.properties;

if properties is BasicProperties {
map<anydata>? receivedHeaders = properties.headers;
if receivedHeaders is () {
test:assertFail("No headers received.");
} else {
test:assertEquals(receivedHeaders["header1"], "value1", msg = "Header1 mismatch.");
test:assertEquals(receivedHeaders["header2"], "value2", msg = "Header2 mismatch.");
}
}
log:printInfo("The message received: " + messageContent);
test:assertEquals(messageContent, message, msg = "Message received does not match.");
} else {
test:assertFail("Error when trying to consume messages using client.");
}
check newClient->close();
return;
}

@test:Config {
dependsOn: [testClient],
groups: ["rabbitmq"]
Expand Down Expand Up @@ -418,7 +455,7 @@ public isolated function testDeclareQueueWithArgsNegative() returns error? {
string expectedError = "Error occurred while declaring the queue: Unsupported type in arguments map passed "
+ "while declaring a queue.";
test:assertEquals(result.message(), expectedError,
msg = "Error message mismatch in declaring queue with invalid args.");
msg = "Error message mismatch in declaring queue with invalid args.");
}
return newClient->close();
}
Expand Down Expand Up @@ -594,11 +631,11 @@ public function testListenerQueueDeclareDuplicate() returns error? {
}

@test:Config {
dependsOn: [testListener, testSyncConsumer,testListenerQueueDeclareDuplicate],
dependsOn: [testListener, testSyncConsumer, testListenerQueueDeclareDuplicate],
groups: ["rabbitmq"]
}
public function testListenerQueueDeclareDuplicateError() returns error? {
Listener channelListener = check new(DEFAULT_HOST, DEFAULT_PORT);
Listener channelListener = check new (DEFAULT_HOST, DEFAULT_PORT);
if channelListener is Listener {
error? result = channelListener.attach(queueConfigDuplicateError);
test:assertTrue(result is error, msg = "Error expected when declaring same queue with different properties.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public class PluginConstants {
public static final String MESSAGE_CONTENT_TYPE = "contentType";
public static final String MESSAGE_CONTENT_ENCODING = "contentEncoding";
public static final String MESSAGE_CORRELATION_ID = "correlationId";
public static final String MESSAGE_HEADERS = "headers";

// return types error or nil
public static final String ERROR = "error";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@
import static io.ballerina.stdlib.rabbitmq.plugin.PluginConstants.MESSAGE_CORRELATION_ID;
import static io.ballerina.stdlib.rabbitmq.plugin.PluginConstants.MESSAGE_DELIVERY_TAG;
import static io.ballerina.stdlib.rabbitmq.plugin.PluginConstants.MESSAGE_EXCHANGE;
import static io.ballerina.stdlib.rabbitmq.plugin.PluginConstants.MESSAGE_HEADERS;
import static io.ballerina.stdlib.rabbitmq.plugin.PluginConstants.MESSAGE_PROPERTIES;
import static io.ballerina.stdlib.rabbitmq.plugin.PluginConstants.MESSAGE_REPLY_TO;
import static io.ballerina.stdlib.rabbitmq.plugin.PluginConstants.MESSAGE_ROUTING_KEY;
Expand Down Expand Up @@ -403,10 +404,11 @@ private boolean validatePropertiesField(TypeSymbol propertiesTypeSymbol) {
propertiesRecordSymbol = (RecordTypeSymbol) propertiesTypeSymbol;
}
Map<String, RecordFieldSymbol> propertiesFieldDescriptors = propertiesRecordSymbol.fieldDescriptors();
if (propertiesFieldDescriptors.size() != 4 || !propertiesFieldDescriptors.containsKey(MESSAGE_REPLY_TO) ||
if (propertiesFieldDescriptors.size() != 5 || !propertiesFieldDescriptors.containsKey(MESSAGE_REPLY_TO) ||
!propertiesFieldDescriptors.containsKey(MESSAGE_CONTENT_TYPE) ||
!propertiesFieldDescriptors.containsKey(MESSAGE_CONTENT_ENCODING) ||
!propertiesFieldDescriptors.containsKey(MESSAGE_CORRELATION_ID)) {
!propertiesFieldDescriptors.containsKey(MESSAGE_CORRELATION_ID) ||
!propertiesFieldDescriptors.containsKey(MESSAGE_HEADERS)) {
return false;
}
if (propertiesFieldDescriptors.get(MESSAGE_REPLY_TO).typeDescriptor().typeKind() != STRING) {
Expand All @@ -421,6 +423,9 @@ private boolean validatePropertiesField(TypeSymbol propertiesTypeSymbol) {
if (propertiesFieldDescriptors.get(MESSAGE_CORRELATION_ID).typeDescriptor().typeKind() != STRING) {
return false;
}
if (propertiesFieldDescriptors.get(MESSAGE_HEADERS).typeDescriptor().typeKind() != MAP) {
return false;
}
return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ public class RabbitMQConstants {
public static final BString ALIAS_CONTENT_TYPE = StringUtils.fromString("contentType");
public static final BString ALIAS_CONTENT_ENCODING = StringUtils.fromString("contentEncoding");
public static final BString ALIAS_CORRELATION_ID = StringUtils.fromString("correlationId");
public static final BString ALIAS_HEADERS = StringUtils.fromString("headers");

private RabbitMQConstants() {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Map;

import static io.ballerina.runtime.api.TypeTags.INTERSECTION_TAG;
import static io.ballerina.runtime.api.TypeTags.STRING_TAG;
Expand Down Expand Up @@ -138,14 +139,24 @@ public static BMap<BString, Object> createAndPopulateMessageRecord(byte[] messag
String contentType = properties.getContentType();
String contentEncoding = properties.getContentEncoding();
String correlationId = properties.getCorrelationId();
Map<String, Object> headersMap = properties.getHeaders();
BMap<BString, Object> headers = ValueCreator.createMapValue();

if (headersMap != null) {
headersMap.forEach((key, value) -> headers.put(StringUtils.fromString(key), StringUtils.fromString(
value.toString())));
}

BMap<BString, Object> basicProperties =
ValueCreator.createRecordValue(getModule(),
RabbitMQConstants.RECORD_BASIC_PROPERTIES);
Object[] propValues = new Object[4];
Object[] propValues = new Object[5];
propValues[0] = replyTo;
propValues[1] = contentType;
propValues[2] = contentEncoding;
propValues[3] = correlationId;
propValues[4] = headers;

messageRecord.put(StringUtils.fromString(MESSAGE_PROPERTIES_FIELD), ValueCreator
.createRecordValue(basicProperties, propValues));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,7 @@ public static Object publishNative(Environment environment, BObject channelObj,
String contentType = null;
String contentEncoding = null;
String correlationId = null;
Map<String, Object> headers = new HashMap<>();
if (basicPropsMap.containsKey(RabbitMQConstants.ALIAS_REPLY_TO)) {
replyTo = basicPropsMap.getStringValue(RabbitMQConstants.ALIAS_REPLY_TO).getValue();
}
Expand All @@ -295,6 +296,14 @@ public static Object publishNative(Environment environment, BObject channelObj,
if (basicPropsMap.containsKey(RabbitMQConstants.ALIAS_CORRELATION_ID)) {
correlationId = basicPropsMap.getStringValue(RabbitMQConstants.ALIAS_CORRELATION_ID).getValue();
}
if (basicPropsMap.containsKey(RabbitMQConstants.ALIAS_HEADERS)) {
@SuppressWarnings(RabbitMQConstants.UNCHECKED)
BMap<BString, BString> headersMap = (BMap<BString, BString>) basicPropsMap
.getMapValue(RabbitMQConstants.ALIAS_HEADERS);
headersMap.entrySet()
.forEach(entry -> headers.put(entry.getKey().getValue(),
headersMap.getStringValue(entry.getKey()).getValue()));
}
if (replyTo != null) {
builder.replyTo(replyTo);
}
Expand All @@ -307,6 +316,9 @@ public static Object publishNative(Environment environment, BObject channelObj,
if (correlationId != null) {
builder.correlationId(correlationId);
}
if (!headers.isEmpty()) {
builder.headers(headers);
}
}
if (TransactionResourceManager.getInstance().isInTransaction()) {
RabbitMQUtils.handleTransaction(channelObj);
Expand Down

0 comments on commit 8de41e6

Please sign in to comment.