Skip to content

Commit

Permalink
Make Log, Redis & RSocket consumers as auto-config
Browse files Browse the repository at this point in the history
* Fix all their Checkstyle violations
* Expose convenient `Consumer<Flux<Message<?>>>` for RSocket consumer
  • Loading branch information
artembilan committed Jan 9, 2024
1 parent 68e0990 commit eb8b5b3
Show file tree
Hide file tree
Showing 19 changed files with 119 additions and 88 deletions.
4 changes: 2 additions & 2 deletions consumer/spring-log-consumer/README.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,15 @@ The consumer uses the `LoggingMessageHandler` from Spring Integration.

## Beans for injection

You can import `LogConsumerConfiguration` in the application and then inject the following bean.
The `LogConsumerConfiguration` auto-configuration provides the following bean:

`Consumer<Message<?>> logConsumer`

You can use `logConsumer` as a qualifier when injecting.

## Configuration Options

All configuration properties are prefixed with `log`.
All configuration properties are prefixed with `log.consumer`.

For more information on the various options available, please see link:src/main/java/org/springframework/cloud/fn/consumer/log/LogConsumerProperties.java[LogConsumerProperties].

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020-2023 the original author or authors.
* Copyright 2020-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,33 +18,35 @@

import java.util.function.Consumer;

import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.messaging.Message;

/**
* The Configuration class for {@link Consumer} which logs incoming data. For the logging
* logic a Spring Integration
* The Auto-configuration class for {@link Consumer} which logs incoming data. For the
* logging logic a Spring Integration
* {@link org.springframework.integration.handler.LoggingHandler} is used. If incoming
* payload is a {@code byte[]} and incoming message {@code contentType} header is
* text-compatible (e.g. {@code application/json}), it is converted into a {@link String}.
* Otherwise the payload is passed to logger as is.
* Otherwise, the payload is passed to logger as is.
*
* @author Artem Bilan
*/
@Configuration(proxyBeanMethods = false)
@AutoConfiguration
@EnableConfigurationProperties(LogConsumerProperties.class)
public class LogConsumerConfiguration {

@Bean
IntegrationFlow logConsumerFlow(LogConsumerProperties logSinkProperties) {
return IntegrationFlow.from(MessageConsumer.class, (gateway) -> gateway.beanName("logConsumer"))
return (flow) -> flow
.log(logSinkProperties.getLevel(), logSinkProperties.getName(), logSinkProperties.getExpression())
.nullChannel();
}

@MessagingGateway(name = "logConsumer", defaultRequestChannel = "logConsumerFlow.input")
private interface MessageConsumer extends Consumer<Message<?>> {

}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020-2020 the original author or authors.
* Copyright 2020-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -24,17 +24,15 @@
import org.springframework.integration.handler.LoggingHandler;
import org.springframework.validation.annotation.Validated;

import static org.springframework.integration.handler.LoggingHandler.Level.INFO;

/**
* Configuration properties for the Log Sink app.
* Configuration properties for the Log consumer.
*
* @author Gary Russell
* @author Eric Bottard
* @author Chris Schaefer
* @author Artem Bilan
*/
@ConfigurationProperties("log")
@ConfigurationProperties("log.consumer")
@Validated
public class LogConsumerProperties {

Expand All @@ -52,11 +50,11 @@ public class LogConsumerProperties {
/**
* The level at which to log messages.
*/
private LoggingHandler.Level level = INFO;
private LoggingHandler.Level level = LoggingHandler.Level.INFO;

@NotBlank
public String getName() {
return name;
return this.name;
}

public void setName(String name) {
Expand All @@ -65,7 +63,7 @@ public void setName(String name) {

@NotBlank
public String getExpression() {
return expression;
return this.expression;
}

public void setExpression(String expression) {
Expand All @@ -74,7 +72,7 @@ public void setExpression(String expression) {

@NotNull
public LoggingHandler.Level getLevel() {
return level;
return this.level;
}

public void setLevel(LoggingHandler.Level level) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
/**
* The Log consumer auto-configuration support.
*/
package org.springframework.cloud.fn.consumer.log;
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
org.springframework.cloud.fn.consumer.log.LogConsumerConfiguration
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020-2021 the original author or authors.
* Copyright 2020-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -44,7 +44,7 @@
* @author Artem Bilan
*/
@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD)
@SpringBootTest({ "log.name=foo", "log.level=warn", "log.expression=payload.toUpperCase()" })
@SpringBootTest({ "log.consumer.name=foo", "log.consumer.level=warn", "log.consumer.expression=payload.toUpperCase()" })
class LogConsumerApplicationTests {

@Autowired
Expand All @@ -55,7 +55,7 @@ class LogConsumerApplicationTests {
private LoggingHandler loggingHandler;

@Test
public void testJsonContentType() {
void testJsonContentType() {
Message<String> message = MessageBuilder.withPayload("{\"foo\":\"bar\"}")
.setHeader("contentType", new MimeType("json"))
.build();
Expand Down
2 changes: 1 addition & 1 deletion consumer/spring-redis-consumer/README.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ A consumer that allows you to write incoming messages into Redis.

## Beans for injection

You can import `RedisConsumerConfiguration` in the application and then inject the following bean.
The `RedisConsumerConfiguration` auto-configuration provides the following bean:

`Consumer<Message<?>> redisConsumer`

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2020 the original author or authors.
* Copyright 2015-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,9 +18,10 @@

import java.util.function.Consumer;

import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.boot.autoconfigure.data.redis.RedisAutoConfiguration;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.integration.redis.outbound.RedisPublishingMessageHandler;
import org.springframework.integration.redis.outbound.RedisQueueOutboundChannelAdapter;
Expand All @@ -29,13 +30,15 @@
import org.springframework.messaging.MessageHandler;

/**
* Auto-configuration for Redis consumer.
*
* @author Eric Bottard
* @author Mark Pollack
* @author Gary Russell
* @author Soby Chacko
* @author Artem Bilan
*/
@Configuration
@AutoConfiguration(after = RedisAutoConfiguration.class)
@EnableConfigurationProperties(RedisConsumerProperties.class)
public class RedisConsumerConfiguration {

Expand All @@ -47,6 +50,7 @@ public Consumer<Message<?>> redisConsumer(MessageHandler redisConsumerMessageHan
@Bean
public MessageHandler redisConsumerMessageHandler(RedisConnectionFactory redisConnectionFactory,
RedisConsumerProperties redisConsumerProperties) {

if (redisConsumerProperties.isKeyPresent()) {
RedisStoreWritingMessageHandler redisStoreWritingMessageHandler = new RedisStoreWritingMessageHandler(
redisConnectionFactory);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2020 the original author or authors.
* Copyright 2015-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -23,15 +23,12 @@

import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.expression.Expression;
import org.springframework.expression.ExpressionParser;
import org.springframework.expression.common.LiteralExpression;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.util.StringUtils;
import org.springframework.validation.annotation.Validated;

/**
* Used to configure those Redis Sink module options that are not related to connecting to
* Redis.
* The configuration properties for Redis consumer.
*
* @author Eric Bottard
* @author Mark Pollack
Expand All @@ -42,22 +39,20 @@
@Validated
public class RedisConsumerProperties {

private static final ExpressionParser EXPRESSION_PARSER = new SpelExpressionParser();

/**
* A SpEL expression to use for topic.
*/
private String topicExpression;
private Expression topicExpression;

/**
* A SpEL expression to use for queue.
*/
private String queueExpression;
private Expression queueExpression;

/**
* A SpEL expression to use for storing to a key.
*/
private String keyExpression;
private Expression keyExpression;

/**
* A literal key name to use when storing to a key.
Expand All @@ -75,71 +70,71 @@ public class RedisConsumerProperties {
private String topic;

public Expression keyExpression() {
return key != null ? new LiteralExpression(key) : EXPRESSION_PARSER.parseExpression(keyExpression);
return (this.key != null) ? new LiteralExpression(this.key) : this.keyExpression;
}

public Expression queueExpression() {
return queue != null ? new LiteralExpression(queue) : EXPRESSION_PARSER.parseExpression(queueExpression);
return (this.queue != null) ? new LiteralExpression(this.queue) : this.queueExpression;
}

public Expression topicExpression() {
return topic != null ? new LiteralExpression(topic) : EXPRESSION_PARSER.parseExpression(topicExpression);
return (this.topic != null) ? new LiteralExpression(this.topic) : this.topicExpression;
}

boolean isKeyPresent() {
return StringUtils.hasText(key) || keyExpression != null;
return StringUtils.hasText(this.key) || this.keyExpression != null;
}

boolean isQueuePresent() {
return StringUtils.hasText(queue) || queueExpression != null;
return StringUtils.hasText(this.queue) || this.queueExpression != null;
}

boolean isTopicPresent() {
return StringUtils.hasText(topic) || topicExpression != null;
return StringUtils.hasText(this.topic) || this.topicExpression != null;
}

public String getTopicExpression() {
return topicExpression;
public Expression getTopicExpression() {
return this.topicExpression;
}

public void setTopicExpression(String topicExpression) {
public void setTopicExpression(Expression topicExpression) {
this.topicExpression = topicExpression;
}

public String getQueueExpression() {
return queueExpression;
public Expression getQueueExpression() {
return this.queueExpression;
}

public void setQueueExpression(String queueExpression) {
public void setQueueExpression(Expression queueExpression) {
this.queueExpression = queueExpression;
}

public String getKeyExpression() {
return keyExpression;
public Expression getKeyExpression() {
return this.keyExpression;
}

public void setKeyExpression(String keyExpression) {
public void setKeyExpression(Expression keyExpression) {
this.keyExpression = keyExpression;
}

public String getKey() {
return key;
return this.key;
}

public void setKey(String key) {
this.key = key;
}

public String getQueue() {
return queue;
return this.queue;
}

public void setQueue(String queue) {
this.queue = queue;
}

public String getTopic() {
return topic;
return this.topic;
}

public void setTopic(String topic) {
Expand All @@ -151,7 +146,8 @@ public void setTopic(String topic) {
@AssertTrue(message = "Exactly one of 'queue', 'queueExpression', 'key', 'keyExpression', "
+ "'topic' and 'topicExpression' must be set")
public boolean isMutuallyExclusive() {
Object[] props = new Object[] { queue, queueExpression, key, keyExpression, topic, topicExpression };
Object[] props = { this.queue, this.queueExpression, this.key, this.keyExpression, this.topic,
this.topicExpression };
return (props.length - 1) == Collections.frequency(Arrays.asList(props), null);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
/**
* The Redis consumer auto-configuration support.
*/
package org.springframework.cloud.fn.consumer.redis;
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
org.springframework.cloud.fn.consumer.redis.RedisConsumerConfiguration
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020-2020 the original author or authors.
* Copyright 2020-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -70,13 +70,7 @@ public void testWithTopic() throws Exception {
container.stop();
}

private static class Listener {

private final CountDownLatch latch;

Listener(CountDownLatch latch) {
this.latch = latch;
}
record Listener(CountDownLatch latch) {

@SuppressWarnings("unused")
public void handleMessage(String s) {
Expand Down

This file was deleted.

Loading

0 comments on commit eb8b5b3

Please sign in to comment.