Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: update aqmp config #310

Merged
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 31 additions & 61 deletions partials/AmqpConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,11 @@
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.amqp.dsl.Amqp;
import org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.messaging.MessageChannel;

@Configuration
public class Config {
Expand All @@ -33,16 +27,22 @@ public class Config {
@Value("${amqp.broker.password}")
private String password;

{% for channelName, channel in asyncapi.channels() %}{% if channel.hasSubscribe() %}
@Value("${amqp.exchange.{{- channelName -}}}")

{% for channelName, channel in asyncapi.channels() %}
{% if channel.hasSubscribe() %}
@Value("${amqp.{{- channelName -}}.exchange}")
private String {{channelName}}Exchange;

{% endif %}{% endfor %}
{% for channelName, channel in asyncapi.channels() %}{% if channel.hasPublish() %}
@Value("${amqp.queue.{{- channelName -}}}")
@Value("${amqp.{{- channelName -}}.routingKey}")
private String {{channelName}}RoutingKey;
{% endif %}

{% if channel.hasPublish() %}
@Value("${amqp.{{- channelName -}}.queue}")
private String {{channelName}}Queue;
{% endif %}

{% endif %}{% endfor %}
{% endfor %}

@Bean
public ConnectionFactory connectionFactory() {
Expand All @@ -54,64 +54,34 @@ public ConnectionFactory connectionFactory() {
}

@Bean
public AmqpAdmin amqpAdmin() {
return new RabbitAdmin(connectionFactory());
}

@Bean
public Declarables exchanges() {
return new Declarables(
{% for channelName, channel in asyncapi.channels() %}{% if channel.hasSubscribe() %}
new TopicExchange({{channelName}}Exchange, true, false){% if not loop.last %},{% endif %}
{% endif %}{% endfor %}
);
}

@Bean
public Declarables queues() {
public Declarables declarables() {
return new Declarables(
{% for channelName, channel in asyncapi.channels() %}{% if channel.hasPublish() %}
new Queue({{channelName}}Queue, true, false, false){% if not loop.last %},{% endif %}
{% endif %}{% endfor %}
);
{% for channelName, channel in asyncapi.channels() %}
{% if channel.hasSubscribe() %} new TopicExchange({{channelName}}Exchange, true, false)
{% if not loop.last %},{% endif %}{% endif %}{% endfor %}

{% for channelName, channel in asyncapi.channels() %}
{% if channel.hasPublish() %}{% if loop.first %},{% endif %} new Queue({{channelName}}Queue, true, false, false)
{% if not loop.last %},{% endif %}{% endif %} {% endfor %}
);
}

// consumer

@Autowired
MessageHandlerService messageHandlerService;
{% for channelName, channel in asyncapi.channels() %}{% if channel.hasPublish() %}

@Bean
public IntegrationFlow {{channelName | camelCase}}Flow() {
return IntegrationFlows.from(Amqp.inboundGateway(connectionFactory(), {{channelName}}Queue))
.handle(messageHandlerService::handle{{channelName | upperFirst}})
.get();
public MessageConverter converter() {
return new Jackson2JsonMessageConverter();
}
{% endif %}{% endfor %}

// publisher

@Bean
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory());
return template;
}
{% for channelName, channel in asyncapi.channels() %}{% if channel.hasSubscribe() %}

@Bean
public MessageChannel {{channel.subscribe().id() | camelCase}}OutboundChannel() {
return new DirectChannel();
public AmqpAdmin amqpAdmin() {
return new RabbitAdmin(connectionFactory());
}

@Bean
@ServiceActivator(inputChannel = "{{channel.subscribe().id() | camelCase}}OutboundChannel")
public AmqpOutboundEndpoint {{channelName | camelCase}}Outbound(AmqpTemplate amqpTemplate) {
AmqpOutboundEndpoint outbound = new AmqpOutboundEndpoint(amqpTemplate);
outbound.setExchangeName({{channelName}}Exchange);
outbound.setRoutingKey("#");
return outbound;
public AmqpTemplate template() {
final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
rabbitTemplate.setMessageConverter(converter());
return rabbitTemplate;
}
{% endif %}{% endfor %}
}
{% endmacro %}
43 changes: 43 additions & 0 deletions partials/AmqpPublisher.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
{% macro amqpPublisher(asyncapi, params) %}

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
{% for channelName, channel in asyncapi.channels() %}
{%- if channel.hasSubscribe() %}
{%- for message in channel.subscribe().messages() %}
import {{params['userJavaPackage']}}.model.{{message.payload().uid() | camelCase | upperFirst}};
{%- endfor -%}
{% endif -%}
{% endfor %}


@Service
public class PublisherService {
@Autowired
private RabbitTemplate template;

{% for channelName, channel in asyncapi.channels() %}
Tenischev marked this conversation as resolved.
Show resolved Hide resolved
{% if channel.hasSubscribe() %}
@Value("${amqp.{{- channelName -}}.exchange}")
private String {{channelName}}Exchange;
@Value("${amqp.{{- channelName -}}.routingKey}")
private String {{channelName}}RoutingKey;
{% endif %}
{% endfor %}

{% for channelName, channel in asyncapi.channels() %}
Tenischev marked this conversation as resolved.
Show resolved Hide resolved
{% if channel.hasSubscribe() %}
{%- set schemaName = channel.subscribe().message().payload().uid() | camelCase | upperFirst %}
public void {{channel.subscribe().id() | camelCase}}(){
{{schemaName}} {{channelName}}Payload = new {{schemaName}}();
template.convertAndSend({{channelName}}Exchange, {{channelName}}RoutingKey, {{channelName}}Payload);
}

{% endif %}
{% endfor %}

}

{% endmacro %}
1 change: 1 addition & 0 deletions template/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ repositories {
dependencies {
{%- if asyncapi | isProtocol('amqp') %}
implementation('org.springframework.integration:spring-integration-amqp')
implementation('org.springframework.integration:spring-integration-amqp')
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why it's doubled?

{% endif -%}
{%- if asyncapi | isProtocol('mqtt') %}
implementation('org.springframework.integration:spring-integration-mqtt')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ public void run(String... args) {
{%- for channelName, channel in asyncapi.channels() %}
{%- if channel.hasSubscribe() %}
{%- for message in channel.subscribe().messages() %}
publisherService.{{channel.subscribe().id() | camelCase}}({% if asyncapi | isProtocol('kafka') %}(new Random()).nextInt(), new {{ params['userJavaPackage'] }}.model.{{message.payload().uid() | camelCase | upperFirst}}(){% else %}"Hello World from {{channelName}}"{% endif %});
publisherService.{{channel.subscribe().id() | camelCase}}({% if asyncapi | isProtocol('kafka') %}(new Random()).nextInt(), new {{ params['userJavaPackage'] }}.model.{{message.payload().uid() | camelCase | upperFirst}}()
{% elif asyncapi | isProtocol('amqp') %}{% else %}"Hello World from {{channelName}}"{% endif %});
{% endfor -%}
{% endif -%}
{%- endfor %}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,16 @@
{%- endif %}
{%- endfor %}
{% endif %}
{% if asyncapi | isProtocol('amqp') and hasPublish %}
import org.springframework.amqp.rabbit.annotation.RabbitListener;
{% for channelName, channel in asyncapi.channels() %}
{%- if channel.hasPublish() %}
{%- for message in channel.publish().messages() %}
import {{ params['userJavaPackage'] }}.model.{{message.payload().uid() | camelCase | upperFirst}};
{%- endfor %}
{%- endif %}
{%- endfor %}
{% endif %}
@Service
public class MessageHandlerService {

Expand All @@ -52,9 +62,21 @@ public class MessageHandlerService {
}
{%- endif %}
{% endfor %}

{% elif asyncapi | isProtocol('amqp') %}
{% for channelName, channel in asyncapi.channels() %}
Tenischev marked this conversation as resolved.
Show resolved Hide resolved
{% if channel.hasPublish() %}
{%- set schemaName = channel.publish().message().payload().uid() | camelCase | upperFirst %}
@RabbitListener(queues = "${amqp.{{- channelName -}}.queue}")
public void {{channel.publish().id() | camelCase}}({{schemaName}} {{channelName}}Payload ){
LOGGER.info("Message received from {{- channelName -}} : " + {{channelName}}Payload);
}
{% endif %}
{% endfor %}

{% else %}
{% for channelName, channel in asyncapi.channels() %}
{% if channel.hasPublish() %}
{% if channel.hasPublish() %}
{% if channel.description() or channel.publish().description() %}/**{% for line in channel.description() | splitByLines %}
* {{line | safe}}{% endfor %}{% for line in channel.publish().description() | splitByLines %}
* {{line | safe}}{% endfor %}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
package {{ params['userJavaPackage'] }}.service;
{%- from "partials/CommonPublisher.java" import commonPublisher -%}
{%- from "partials/KafkaPublisher.java" import kafkaPublisher -%}
{%- from "partials/AmqpPublisher.java" import amqpPublisher -%}
{%- if asyncapi | isProtocol('kafka') -%}
{{- kafkaPublisher(asyncapi, params) -}}
{%- elif asyncapi | isProtocol('amqp') -%}
{{- amqpPublisher(asyncapi, params) -}}
{%- else -%}
{{- commonPublisher(asyncapi) -}}
{%- endif -%}
22 changes: 8 additions & 14 deletions template/src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,27 +9,21 @@
{%- endif -%}
{%- endfor -%}

{%- for serverName, server in asyncapi.servers() %}{% if server.protocol() == 'amqp' %}
{%- for serverName, server in asyncapi.servers() %}{% if server.protocol() == 'amqp' %}
amqp:
broker: {% for line in server.description() | splitByLines %}
# {{line | safe}}{% endfor %}
host: {{server.url() | replace(':{port}', '') }}
host: {% if server.variable('port') %}{{server.url() | replace('{port}', server.variable('port').defaultValue())}}{% else %}{{server.url()}}{% endif %}
port: {% if server.variable('port') %}{{server.variable('port').defaultValue()}}{% endif %}
username:
username: {% if server.variable('username') %}{{server.variable('username').defaultValue()}}{% endif %}
password:
exchange:
{% for channelName, channel in asyncapi.channels() %}
{% if channel.hasSubscribe() and channel.subscribe().binding('amqp') %}
{{channelName}}: {{channel.subscribe().binding('amqp').exchange.name}}
{% endif %}
{% endfor %}
queue:
{% for channelName, channel in asyncapi.channels() %}
{% if channel.hasPublish() and channel.publish().binding('amqp') %}
{{channelName}}: {{channel.publish().binding('amqp').queue.name}}
{% endif %}
{{channelName}}:
{% if channel.hasSubscribe() %} exchange: {{channel.subscribe().binding('amqp').exchange.name}} {% endif %}
{% if channel.hasSubscribe() %} routingKey: {{channel.subscribe().binding('amqp').routingKey}}{% endif %}
{% if channel.hasPublish() %} queue: {{channel.publish().binding('amqp').queue.name}}{% endif %}
{% endfor %}
{% endif %}
{% endif %}

{% if server.protocol() == 'mqtt' %}
mqtt:
Expand Down
83 changes: 83 additions & 0 deletions tests/user_examples/aqmp_config_multiple-channels.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
asyncapi: 2.5.0
info:
title: Streetlights API Simplified
version: 1.0.0
description: |
The Smartylighting Streetlights API allows you to remotely manage the city lights.
This is a simplified version of the Streetlights API from other examples. This version is used in AsyncAPI documentation.
license:
name: Apache 2.0
url: https://www.apache.org/licenses/LICENSE-2.0
servers:
production:
url: localhost
protocol: amqp
description: RabbitMQ
variables:
port:
default: '5672'
username:
default: guest

channels:
lightMeasured_Streetlight1:
publish:
summary: Inform about environmental lighting conditions for Streetlight 1.
operationId: readLightMeasurement_Streetlight1
bindings:
amqp:
is: queue
queue:
name: lightMeasurementQueue_Streetlight1
message:
$ref: '#/components/messages/lightMeasured'
subscribe:
operationId: updateLightMeasurement_Streetlight1
message:
$ref: '#/components/messages/lightMeasured'
bindings:
amqp:
is: routingKey
exchange:
name: lightMeasurementExchange_Streetlight1
routingKey: lightMeasurementRoutingKey_Streetlight1
lightMeasured_Streetlight2:
publish:
summary: Inform about environmental lighting conditions for Streetlight 2.
operationId: readLightMeasurement_Streetlight2
bindings:
amqp:
is: queue
queue:
name: lightMeasurementQueue_Streetlight2
message:
$ref: '#/components/messages/lightMeasured'
subscribe:
operationId: updateLightMeasurement_Streetlight2
message:
$ref: '#/components/messages/lightMeasured'
bindings:
amqp:
is: routingKey
exchange:
name: lightMeasurementExchange_Streetlight2
routingKey: lightMeasurementRoutingKey_Streetlight2
components:
messages:
lightMeasured:
summary: Inform about environmental lighting conditions for a particular streetlight.
payload:
$ref: "#/components/schemas/lightMeasuredPayload"
schemas:
lightMeasuredPayload:
type: object
properties:
id:
type: integer
minimum: 0
description: Id of the streetlight.
lumens:
type: integer
minimum: 0
description: Light intensity measured in lumens.

Loading