Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: update aqmp config #310

Merged
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 39 additions & 59 deletions partials/AmqpConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,11 @@
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.amqp.dsl.Amqp;
import org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.messaging.MessageChannel;

@Configuration
public class Config {
Expand All @@ -33,16 +27,17 @@ public class Config {
@Value("${amqp.broker.password}")
private String password;

{% for channelName, channel in asyncapi.channels() %}{% if channel.hasSubscribe() %}
@Value("${amqp.exchange.{{- channelName -}}}")

{% for channelName, channel in asyncapi.channels() %}
@Value("${amqp.{{- channelName -}}.exchange}")
private String {{channelName}}Exchange;

{% endif %}{% endfor %}
{% for channelName, channel in asyncapi.channels() %}{% if channel.hasPublish() %}
@Value("${amqp.queue.{{- channelName -}}}")
@Value("${amqp.{{- channelName -}}.queue}")
private String {{channelName}}Queue;

{% endif %}{% endfor %}
@Value("${amqp.{{- channelName -}}.routingKey}")
private String {{channelName}}RoutingKey;
{% endfor %}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure that this is a good idea to generate exchange and queue for all channels without taking into account if API really has publish\subscribe for this channel


@Bean
public ConnectionFactory connectionFactory() {
Expand All @@ -54,64 +49,49 @@ public ConnectionFactory connectionFactory() {
}

@Bean
public AmqpAdmin amqpAdmin() {
return new RabbitAdmin(connectionFactory());
}

@Bean
public Declarables exchanges() {
return new Declarables(
{% for channelName, channel in asyncapi.channels() %}{% if channel.hasSubscribe() %}
new TopicExchange({{channelName}}Exchange, true, false){% if not loop.last %},{% endif %}
{% endif %}{% endfor %}
);
}

@Bean
public Declarables queues() {
return new Declarables(
{% for channelName, channel in asyncapi.channels() %}{% if channel.hasPublish() %}
new Queue({{channelName}}Queue, true, false, false){% if not loop.last %},{% endif %}
{% endif %}{% endfor %}
);
}
public Declarables declarables() {
{% for channelName, channel in asyncapi.channels() %}
Queue {{channelName}}_Queue = new Queue({{channelName}}Queue);
{% endfor %}

// consumer
{% for channelName, channel in asyncapi.channels() %}
TopicExchange {{channelName}}_Exchange = new TopicExchange({{channelName}}Exchange);
{% endfor %}

@Autowired
MessageHandlerService messageHandlerService;
{% for channelName, channel in asyncapi.channels() %}{% if channel.hasPublish() %}
{% for channelName, channel in asyncapi.channels() %}
Binding {{channelName}}_Binding = BindingBuilder.bind({{channelName}}_Queue)
.to({{channelName}}_Exchange).with({{channelName}}RoutingKey);
{% endfor %}

@Bean
public IntegrationFlow {{channelName | camelCase}}Flow() {
return IntegrationFlows.from(Amqp.inboundGateway(connectionFactory(), {{channelName}}Queue))
.handle(messageHandlerService::handle{{channelName | upperFirst}})
.get();
return new Declarables(
{% set i = 1 %}{% for channelName, channel in asyncapi.channels() %}{% if i == asyncapi.channels() | size %}
{{channelName}}_Queue,
{{channelName}}_Exchange,
{{channelName}}_Binding
{% else %}
{{channelName}}_Queue,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also here, you could use {% if not loop.last %},{% endif %} to add coma after _Binding instead of counting i

{{channelName}}_Exchange,
{{channelName}}_Binding,
{% set i = i+1 %} {% endif %} {% endfor %}
);
}
{% endif %}{% endfor %}

// publisher

@Bean
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory());
return template;
public MessageConverter converter() {
return new Jackson2JsonMessageConverter();
}
{% for channelName, channel in asyncapi.channels() %}{% if channel.hasSubscribe() %}

@Bean
public MessageChannel {{channel.subscribe().id() | camelCase}}OutboundChannel() {
return new DirectChannel();
public AmqpAdmin amqpAdmin() {
return new RabbitAdmin(connectionFactory());
}

@Bean
@ServiceActivator(inputChannel = "{{channel.subscribe().id() | camelCase}}OutboundChannel")
public AmqpOutboundEndpoint {{channelName | camelCase}}Outbound(AmqpTemplate amqpTemplate) {
AmqpOutboundEndpoint outbound = new AmqpOutboundEndpoint(amqpTemplate);
outbound.setExchangeName({{channelName}}Exchange);
outbound.setRoutingKey("#");
return outbound;
public AmqpTemplate template() {
final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
rabbitTemplate.setMessageConverter(converter());
return rabbitTemplate;
}
{% endif %}{% endfor %}
}
{% endmacro %}
38 changes: 38 additions & 0 deletions partials/AmqpPublisher.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
{% macro amqpPublisher(asyncapi, params) %}

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
{% for channelName, channel in asyncapi.channels() %}
{%- if channel.hasSubscribe() %}
{%- for message in channel.subscribe().messages() %}
import {{params['userJavaPackage']}}.model.{{message.payload().uid() | camelCase | upperFirst}};
{%- endfor -%}
{% endif -%}
{% endfor %}


@Service
public class PublisherService {
@Autowired
private RabbitTemplate template;

{% for channelName, channel in asyncapi.channels() %}
Tenischev marked this conversation as resolved.
Show resolved Hide resolved
@Value("${amqp.{{- channelName -}}.exchange}")
private String {{channelName}}Exchange;

@Value("${amqp.{{- channelName -}}.routingKey}")
private String {{channelName}}RoutingKey;
{% endfor %}

{% for channelName, channel in asyncapi.channels() %}
Tenischev marked this conversation as resolved.
Show resolved Hide resolved
{%- set schemaName = channel.subscribe().message().payload().uid() | camelCase | upperFirst %}
public void {{channel.subscribe().id() | camelCase}}(){
{{schemaName}} {{channelName}}Payload = new {{schemaName}}();
template.convertAndSend({{channelName}}Exchange, {{channelName}}RoutingKey, {{channelName}}Payload);
}
{% endfor %}
}

{% endmacro %}
1 change: 1 addition & 0 deletions template/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ repositories {
dependencies {
{%- if asyncapi | isProtocol('amqp') %}
implementation('org.springframework.integration:spring-integration-amqp')
implementation('org.springframework.integration:spring-integration-amqp')
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why it's doubled?

{% endif -%}
{%- if asyncapi | isProtocol('mqtt') %}
implementation('org.springframework.integration:spring-integration-mqtt')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ public void run(String... args) {
{%- for channelName, channel in asyncapi.channels() %}
{%- if channel.hasSubscribe() %}
{%- for message in channel.subscribe().messages() %}
publisherService.{{channel.subscribe().id() | camelCase}}({% if asyncapi | isProtocol('kafka') %}(new Random()).nextInt(), new {{ params['userJavaPackage'] }}.model.{{message.payload().uid() | camelCase | upperFirst}}(){% else %}"Hello World from {{channelName}}"{% endif %});
publisherService.{{channel.subscribe().id() | camelCase}}({% if asyncapi | isProtocol('kafka') %}(new Random()).nextInt(), new {{ params['userJavaPackage'] }}.model.{{message.payload().uid() | camelCase | upperFirst}}()
{% elif asyncapi | isProtocol('amqp') %}{% else %}"Hello World from {{channelName}}"{% endif %});
{% endfor -%}
{% endif -%}
{%- endfor %}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,16 @@
{%- endif %}
{%- endfor %}
{% endif %}
{% if asyncapi | isProtocol('amqp') and hasPublish %}
import org.springframework.amqp.rabbit.annotation.RabbitListener;
{% for channelName, channel in asyncapi.channels() %}
{%- if channel.hasPublish() %}
{%- for message in channel.publish().messages() %}
import {{ params['userJavaPackage'] }}.model.{{message.payload().uid() | camelCase | upperFirst}};
{%- endfor %}
{%- endif %}
{%- endfor %}
{% endif %}
@Service
public class MessageHandlerService {

Expand All @@ -52,6 +62,16 @@ public class MessageHandlerService {
}
{%- endif %}
{% endfor %}

{% elif asyncapi | isProtocol('amqp') %}
{% for channelName, channel in asyncapi.channels() %}
Tenischev marked this conversation as resolved.
Show resolved Hide resolved
{%- set schemaName = channel.subscribe().message().payload().uid() | camelCase | upperFirst %}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think channel.publish().message() should be there, not subscribe

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From my understanding of AMQP, the messages are published into an exchange and listens from a queue. This queue is defined under the subscriber/consumer module.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's a small logical/conceptual change required here. Will go through it once again and revise it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"the messages are published into an exchange and listens from a queue" - agree.
But please have a look to explanation of AyncAPI publish\subscribe meaning - https://www.asyncapi.com/docs/tutorials/getting-started/hello-world

In this example, you only have one channel called hello. The sample application subscribes to this channel to receive hello {name} messages.

asyncapi: 2.6.0
info:
  title: Hello world application
  version: '0.1.0'
channels:
  hello:
    publish:
      message:
        payload:
          type: string
          pattern: '^hello .+$'

By means, if AsyncAPI define publish, that means that our generated application should be able to receive messages from the publish. And vice-versa, if AsyncAPI define subscribe, that means that our generated application should be able to sned messages to the subscribe.

The MessageHandlerService should be responsible for receiving messages to do so, listeners for the AsyncAPI publish channels should be defined.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. I had earlier referred to this doc from which my understanding of publish/subscribe was the opposite of your explanation. That is, Publish sends messages and Subscriber receives it. Following this, I thought a listener would be most appropriate for a subscriber instead.
  2. With the upcoming v3 changes to AsyncAPI, publish/subscribe will be replaced by send/receive. Should I hold on to the current changes until then? It will require a rewrite of the template generation.

Copy link
Member

@Tenischev Tenischev Sep 4, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I do not find the doc is opposite to what I wrote.
It is the same meaning:
image
If in AsyncAPI the publish defined, then we should generate Subscriber\Consumer\MessageHandler.

Updating to AsyncAPI v3 is a global task and need to be implemented in whole template, not a part.
I would suggest to you to finish current PR for v2

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I got where I went wrong. I was trying to validate the doc from a server perspective while it should've been the client instead. Sorry about that. So just to confirm, publish defines the consumer and subscribe defines the producer.
In that case for AMQP, queue and routing key should be defined under publish while exchange should come under subscribe.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In that case for AMQP, queue and routing key should be defined under publish while exchange should come under subscribe.

Yes, this is also my understanding

Also, AMQP channel binding might be interesting for you in perspective of this PR

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. Will update accordingly.

@RabbitListener(queues = "${amqp.{{- channelName -}}.queue}")
public void {{channel.publish().id() | camelCase}}({{schemaName}} {{channelName}}Payload ){
LOGGER.info("Message received from {{- schemaName -}} : " + {{channelName}}Payload);
}
{% endfor %}

{% else %}
{% for channelName, channel in asyncapi.channels() %}
{% if channel.hasPublish() %}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
package {{ params['userJavaPackage'] }}.service;
{%- from "partials/CommonPublisher.java" import commonPublisher -%}
{%- from "partials/KafkaPublisher.java" import kafkaPublisher -%}
{%- from "partials/AmqpPublisher.java" import amqpPublisher -%}
{%- if asyncapi | isProtocol('kafka') -%}
{{- kafkaPublisher(asyncapi, params) -}}
{%- elif asyncapi | isProtocol('amqp') -%}
{{- amqpPublisher(asyncapi, params) -}}
{%- else -%}
{{- commonPublisher(asyncapi) -}}
{%- endif -%}
22 changes: 8 additions & 14 deletions template/src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,27 +9,21 @@
{%- endif -%}
{%- endfor -%}

{%- for serverName, server in asyncapi.servers() %}{% if server.protocol() == 'amqp' %}
{%- for serverName, server in asyncapi.servers() %}{% if server.protocol() == 'amqp' %}
amqp:
broker: {% for line in server.description() | splitByLines %}
# {{line | safe}}{% endfor %}
host: {{server.url() | replace(':{port}', '') }}
host: {% if server.variable('port') %}{{server.url() | replace('{port}', server.variable('port').defaultValue())}}{% else %}{{server.url()}}{% endif %}
port: {% if server.variable('port') %}{{server.variable('port').defaultValue()}}{% endif %}
username:
username: {% if server.variable('username') %}{{server.variable('username').defaultValue()}}{% endif %}
password:
exchange:
{% for channelName, channel in asyncapi.channels() %}
{% if channel.hasSubscribe() and channel.subscribe().binding('amqp') %}
{{channelName}}: {{channel.subscribe().binding('amqp').exchange.name}}
{% endif %}
{% endfor %}
queue:
{% for channelName, channel in asyncapi.channels() %}
{% if channel.hasPublish() and channel.publish().binding('amqp') %}
{{channelName}}: {{channel.publish().binding('amqp').queue.name}}
{% endif %}
{{channelName}}:
exchange: {{channel.publish().binding('amqp').exchange}}
queue: {{channel.subscribe().binding('amqp').queue}}
routingKey: {{channel.subscribe().binding('amqp').routingKey}}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure that this is a good idea to generate exchange and queue for all channels without taking into account if API really has publish\subscribe for this channel

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Tenischev So would the best way to go about it be to put in a check to see if the publish/ subscribe has been defined for that channel and then define exchange/queues etc?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think yes, of course this is not so critical for the configuration file, but it will keep it clean

{% endfor %}
{% endif %}
{% endif %}

{% if server.protocol() == 'mqtt' %}
mqtt:
Expand Down
76 changes: 76 additions & 0 deletions tests/user_examples/aqmp_config_multiple-channels.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
asyncapi: 2.5.0
info:
title: Streetlights API Simplified
version: 1.0.0
description: |
The Smartylighting Streetlights API allows you to remotely manage the city lights.
This is a simplified version of the Streetlights API from other examples. This version is used in AsyncAPI documentation.
license:
name: Apache 2.0
url: https://www.apache.org/licenses/LICENSE-2.0
servers:
production:
url: localhost
protocol: amqp
description: RabbitMQ
variables:
port:
default: '5672'
username:
default: guest

channels:
lightMeasured_Streetlight1:
publish:
summary: Inform about environmental lighting conditions for Streetlight 1.
operationId: readLightMeasurement_Streetlight1
bindings:
amqp:
exchange: lightMeasuredExchange
message:
$ref: '#/components/messages/lightMeasured'
subscribe:
operationId: updateLightMeasurement_Streetlight1
message:
$ref: '#/components/messages/lightMeasured'
bindings:
amqp:
queue: lightMeasuredQueue_Streetlight1
routingKey: lightMeasuredRoutingKey_Streetlight1
lightMeasured_Streetlight2:
publish:
summary: Inform about environmental lighting conditions for Streetlight 2.
operationId: readLightMeasurement_Streetlight2
bindings:
amqp:
exchange: lightMeasuredExchange
message:
$ref: '#/components/messages/lightMeasured'
subscribe:
operationId: updateLightMeasurement_Streetlight2
message:
$ref: '#/components/messages/lightMeasured'
bindings:
amqp:
queue: lightMeasuredQueue_Streetlight2
routingKey: lightMeasuredRoutingKey_Streetlight2

components:
messages:
lightMeasured:
summary: Inform about environmental lighting conditions for a particular streetlight.
payload:
$ref: "#/components/schemas/lightMeasuredPayload"
schemas:
lightMeasuredPayload:
type: object
properties:
id:
type: integer
minimum: 0
description: Id of the streetlight.
lumens:
type: integer
minimum: 0
description: Light intensity measured in lumens.

Loading