Skip to content

Commit

Permalink
Remove @MessagingGateway usage
Browse files Browse the repository at this point in the history
The `@MessagingGateway` requires a classpath scanning which is not possible
when we use these function artifacts as dependencies, where packages are fully
different from what target project is built with
  • Loading branch information
artembilan committed Jan 12, 2024
1 parent 73111d9 commit cd03684
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,9 @@
import org.springframework.data.cassandra.core.WriteResult;
import org.springframework.data.cassandra.core.cql.WriteOptions;
import org.springframework.integration.JavaUtils;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.cassandra.outbound.CassandraMessageHandler;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.gateway.AnnotationGatewayProxyFactoryBean;
import org.springframework.integration.support.json.Jackson2JsonObjectMapper;
import org.springframework.integration.transformer.AbstractPayloadTransformer;
import org.springframework.messaging.MessageHandler;
Expand All @@ -71,7 +71,7 @@ public class CassandraConsumerConfiguration {
private CassandraConsumerProperties cassandraSinkProperties;

@Bean
public Consumer<Object> cassandraConsumer(CassandraConsumerFunction cassandraConsumerFunction) {
Consumer<Object> cassandraConsumer(CassandraConsumerFunction cassandraConsumerFunction) {
return (payload) -> cassandraConsumerFunction.apply(payload).block();
}

Expand Down Expand Up @@ -121,6 +121,13 @@ public MessageHandler cassandraMessageHandler(ReactiveCassandraOperations cassan
return cassandraMessageHandler;
}

@Bean
AnnotationGatewayProxyFactoryBean<CassandraConsumerFunction> cassandraConsumerFunction() {
var gatewayProxyFactoryBean = new AnnotationGatewayProxyFactoryBean<>(CassandraConsumerFunction.class);
gatewayProxyFactoryBean.setDefaultRequestChannelName("cassandraConsumerFlow.input");
return gatewayProxyFactoryBean;
}

private static boolean isUuid(String uuid) {
if (uuid.length() == 36) {
String[] parts = uuid.split("-");
Expand Down Expand Up @@ -198,7 +205,6 @@ protected boolean looksLikeISO8601(String dateStr) {

}

@MessagingGateway(name = "cassandraConsumerFunction", defaultRequestChannel = "cassandraConsumerFlow.input")
interface CassandraConsumerFunction extends Function<Object, Mono<? extends WriteResult>> {

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,10 @@
import org.springframework.context.annotation.Bean;
import org.springframework.integration.aggregator.AbstractAggregatingMessageGroupProcessor;
import org.springframework.integration.aggregator.MessageCountReleaseStrategy;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.config.AggregatorFactoryBean;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.expression.ValueExpression;
import org.springframework.integration.gateway.AnnotationGatewayProxyFactoryBean;
import org.springframework.integration.store.MessageGroup;
import org.springframework.integration.store.MessageGroupStore;
import org.springframework.integration.store.SimpleMessageStore;
Expand Down Expand Up @@ -138,6 +138,14 @@ IntegrationFlow elasticsearchConsumerFlow(@Qualifier("aggregator") MessageHandle
};
}

@Bean
@SuppressWarnings({ "unchecked", "rawtypes" })
AnnotationGatewayProxyFactoryBean<Consumer<Message<?>>> elasticsearchConsumer() {
var gatewayProxyFactoryBean = new AnnotationGatewayProxyFactoryBean<>(Consumer.class);
gatewayProxyFactoryBean.setDefaultRequestChannelName("elasticsearchConsumerFlow.input");
return (AnnotationGatewayProxyFactoryBean) gatewayProxyFactoryBean;
}

@Bean
public MessageHandler indexingHandler(ElasticsearchClient elasticsearchClient,
ElasticsearchConsumerProperties consumerProperties) {
Expand Down Expand Up @@ -303,9 +311,4 @@ record MessageWrapper(Message<?> message) {

}

@MessagingGateway(name = "elasticsearchConsumer", defaultRequestChannel = "elasticsearchConsumerFlow.input")
private interface MessageConsumer extends Consumer<Message<?>> {

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,11 @@
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.integration.aggregator.DefaultAggregatingMessageGroupProcessor;
import org.springframework.integration.aggregator.MessageCountReleaseStrategy;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.config.AggregatorFactoryBean;
import org.springframework.integration.context.IntegrationContextUtils;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.expression.ValueExpression;
import org.springframework.integration.gateway.AnnotationGatewayProxyFactoryBean;
import org.springframework.integration.jdbc.JdbcMessageHandler;
import org.springframework.integration.jdbc.SqlParameterSourceFactory;
import org.springframework.integration.store.MessageGroupStore;
Expand Down Expand Up @@ -131,6 +131,14 @@ IntegrationFlow jdbcConsumerFlow(@Qualifier("aggregator") MessageHandler aggrega
};
}

@Bean
@SuppressWarnings({ "unchecked", "rawtypes" })
AnnotationGatewayProxyFactoryBean<Consumer<Message<?>>> jdbcConsumer() {
var gatewayProxyFactoryBean = new AnnotationGatewayProxyFactoryBean<>(Consumer.class);
gatewayProxyFactoryBean.setDefaultRequestChannelName("jdbcConsumerFlow.input");
return (AnnotationGatewayProxyFactoryBean) gatewayProxyFactoryBean;
}

@Bean
FactoryBean<MessageHandler> aggregator(MessageGroupStore messageGroupStore) {
AggregatorFactoryBean aggregatorFactoryBean = new AggregatorFactoryBean();
Expand Down Expand Up @@ -232,11 +240,6 @@ public DataSourceInitializer nonBootDataSourceInitializer(DataSource dataSource,
return dataSourceInitializer;
}

@MessagingGateway(name = "jdbcConsumer", defaultRequestChannel = "jdbcConsumerFlow.input")
public interface MessageConsumer extends Consumer<Message<?>> {

}

private record ParameterFactory(MultiValueMap<String, Expression> columnExpressions,
EvaluationContext context) implements SqlParameterSourceFactory {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.gateway.AnnotationGatewayProxyFactoryBean;
import org.springframework.messaging.Message;

/**
Expand All @@ -46,9 +46,12 @@ IntegrationFlow logConsumerFlow(LogConsumerProperties logSinkProperties) {
.nullChannel();
}

@MessagingGateway(name = "logConsumer", defaultRequestChannel = "logConsumerFlow.input")
private interface MessageConsumer extends Consumer<Message<?>> {

@Bean
@SuppressWarnings({ "unchecked", "rawtypes" })
AnnotationGatewayProxyFactoryBean<Consumer<Message<?>>> logConsumer() {
var gatewayProxyFactoryBean = new AnnotationGatewayProxyFactoryBean<>(Consumer.class);
gatewayProxyFactoryBean.setDefaultRequestChannelName("logConsumerFlow.input");
return (AnnotationGatewayProxyFactoryBean) gatewayProxyFactoryBean;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@
import org.springframework.cloud.fn.common.config.ComponentCustomizer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Import;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.file.remote.session.SessionFactory;
import org.springframework.integration.gateway.AnnotationGatewayProxyFactoryBean;
import org.springframework.integration.sftp.dsl.Sftp;
import org.springframework.integration.sftp.dsl.SftpMessageHandlerSpec;
import org.springframework.integration.sftp.session.SftpRemoteFileTemplate;
Expand Down Expand Up @@ -69,9 +69,12 @@ public IntegrationFlow ftpOutboundFlow(SftpConsumerProperties properties,
return (flow) -> flow.handle(handlerSpec);
}

@MessagingGateway(name = "sftpConsumer", defaultRequestChannel = "ftpOutboundFlow.input")
public interface MessageConsumer extends Consumer<Message<?>> {

@Bean
@SuppressWarnings({ "unchecked", "rawtypes" })
AnnotationGatewayProxyFactoryBean<Consumer<Message<?>>> sftpConsumer() {
var gatewayProxyFactoryBean = new AnnotationGatewayProxyFactoryBean<>(Consumer.class);
gatewayProxyFactoryBean.setDefaultRequestChannelName("ftpOutboundFlow.input");
return (AnnotationGatewayProxyFactoryBean) gatewayProxyFactoryBean;
}

}

0 comments on commit cd03684

Please sign in to comment.