From eb8b5b32b6a2288fbc459586a8ba2a3d72261fd6 Mon Sep 17 00:00:00 2001 From: Artem Bilan Date: Tue, 9 Jan 2024 17:25:39 -0500 Subject: [PATCH] Make Log, Redis & RSocket consumers as auto-config * Fix all their Checkstyle violations * Expose convenient `Consumer>>` for RSocket consumer --- consumer/spring-log-consumer/README.adoc | 4 +- .../log/LogConsumerConfiguration.java | 16 +++--- .../consumer/log/LogConsumerProperties.java | 16 +++--- .../cloud/fn/consumer/log/package-info.java | 4 ++ ...ot.autoconfigure.AutoConfiguration.imports | 1 + .../log/LogConsumerApplicationTests.java | 6 +-- consumer/spring-redis-consumer/README.adoc | 2 +- .../redis/RedisConsumerConfiguration.java | 10 ++-- .../redis/RedisConsumerProperties.java | 54 +++++++++---------- .../cloud/fn/consumer/redis/package-info.java | 4 ++ ...ot.autoconfigure.AutoConfiguration.imports | 1 + .../redis/RedisConsumerTopicTests.java | 10 +--- .../src/test/resources/application.properties | 1 - consumer/spring-rsocket-consumer/README.adoc | 9 ++-- .../rsocket/RsocketConsumerConfiguration.java | 28 +++++++--- .../rsocket/RsocketConsumerProperties.java | 7 ++- .../fn/consumer/rsocket/package-info.java | 4 ++ ...ot.autoconfigure.AutoConfiguration.imports | 1 + .../rsocket/RsocketConsumerTests.java | 29 +++++----- 19 files changed, 119 insertions(+), 88 deletions(-) create mode 100644 consumer/spring-log-consumer/src/main/java/org/springframework/cloud/fn/consumer/log/package-info.java create mode 100644 consumer/spring-log-consumer/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports create mode 100644 consumer/spring-redis-consumer/src/main/java/org/springframework/cloud/fn/consumer/redis/package-info.java create mode 100644 consumer/spring-redis-consumer/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports delete mode 100644 consumer/spring-redis-consumer/src/test/resources/application.properties create mode 100644 consumer/spring-rsocket-consumer/src/main/java/org/springframework/cloud/fn/consumer/rsocket/package-info.java create mode 100644 consumer/spring-rsocket-consumer/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports diff --git a/consumer/spring-log-consumer/README.adoc b/consumer/spring-log-consumer/README.adoc index 2f3bd9ec..a9fe9c37 100644 --- a/consumer/spring-log-consumer/README.adoc +++ b/consumer/spring-log-consumer/README.adoc @@ -5,7 +5,7 @@ The consumer uses the `LoggingMessageHandler` from Spring Integration. ## Beans for injection -You can import `LogConsumerConfiguration` in the application and then inject the following bean. +The `LogConsumerConfiguration` auto-configuration provides the following bean: `Consumer> logConsumer` @@ -13,7 +13,7 @@ You can use `logConsumer` as a qualifier when injecting. ## Configuration Options -All configuration properties are prefixed with `log`. +All configuration properties are prefixed with `log.consumer`. For more information on the various options available, please see link:src/main/java/org/springframework/cloud/fn/consumer/log/LogConsumerProperties.java[LogConsumerProperties]. diff --git a/consumer/spring-log-consumer/src/main/java/org/springframework/cloud/fn/consumer/log/LogConsumerConfiguration.java b/consumer/spring-log-consumer/src/main/java/org/springframework/cloud/fn/consumer/log/LogConsumerConfiguration.java index bce52c5e..f98993b4 100644 --- a/consumer/spring-log-consumer/src/main/java/org/springframework/cloud/fn/consumer/log/LogConsumerConfiguration.java +++ b/consumer/spring-log-consumer/src/main/java/org/springframework/cloud/fn/consumer/log/LogConsumerConfiguration.java @@ -1,5 +1,5 @@ /* - * Copyright 2020-2023 the original author or authors. + * Copyright 2020-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,33 +18,35 @@ import java.util.function.Consumer; +import org.springframework.boot.autoconfigure.AutoConfiguration; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; +import org.springframework.integration.annotation.MessagingGateway; import org.springframework.integration.dsl.IntegrationFlow; import org.springframework.messaging.Message; /** - * The Configuration class for {@link Consumer} which logs incoming data. For the logging - * logic a Spring Integration + * The Auto-configuration class for {@link Consumer} which logs incoming data. For the + * logging logic a Spring Integration * {@link org.springframework.integration.handler.LoggingHandler} is used. If incoming * payload is a {@code byte[]} and incoming message {@code contentType} header is * text-compatible (e.g. {@code application/json}), it is converted into a {@link String}. - * Otherwise the payload is passed to logger as is. + * Otherwise, the payload is passed to logger as is. * * @author Artem Bilan */ -@Configuration(proxyBeanMethods = false) +@AutoConfiguration @EnableConfigurationProperties(LogConsumerProperties.class) public class LogConsumerConfiguration { @Bean IntegrationFlow logConsumerFlow(LogConsumerProperties logSinkProperties) { - return IntegrationFlow.from(MessageConsumer.class, (gateway) -> gateway.beanName("logConsumer")) + return (flow) -> flow .log(logSinkProperties.getLevel(), logSinkProperties.getName(), logSinkProperties.getExpression()) .nullChannel(); } + @MessagingGateway(name = "logConsumer", defaultRequestChannel = "logConsumerFlow.input") private interface MessageConsumer extends Consumer> { } diff --git a/consumer/spring-log-consumer/src/main/java/org/springframework/cloud/fn/consumer/log/LogConsumerProperties.java b/consumer/spring-log-consumer/src/main/java/org/springframework/cloud/fn/consumer/log/LogConsumerProperties.java index ffb54929..77202351 100644 --- a/consumer/spring-log-consumer/src/main/java/org/springframework/cloud/fn/consumer/log/LogConsumerProperties.java +++ b/consumer/spring-log-consumer/src/main/java/org/springframework/cloud/fn/consumer/log/LogConsumerProperties.java @@ -1,5 +1,5 @@ /* - * Copyright 2020-2020 the original author or authors. + * Copyright 2020-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,17 +24,15 @@ import org.springframework.integration.handler.LoggingHandler; import org.springframework.validation.annotation.Validated; -import static org.springframework.integration.handler.LoggingHandler.Level.INFO; - /** - * Configuration properties for the Log Sink app. + * Configuration properties for the Log consumer. * * @author Gary Russell * @author Eric Bottard * @author Chris Schaefer * @author Artem Bilan */ -@ConfigurationProperties("log") +@ConfigurationProperties("log.consumer") @Validated public class LogConsumerProperties { @@ -52,11 +50,11 @@ public class LogConsumerProperties { /** * The level at which to log messages. */ - private LoggingHandler.Level level = INFO; + private LoggingHandler.Level level = LoggingHandler.Level.INFO; @NotBlank public String getName() { - return name; + return this.name; } public void setName(String name) { @@ -65,7 +63,7 @@ public void setName(String name) { @NotBlank public String getExpression() { - return expression; + return this.expression; } public void setExpression(String expression) { @@ -74,7 +72,7 @@ public void setExpression(String expression) { @NotNull public LoggingHandler.Level getLevel() { - return level; + return this.level; } public void setLevel(LoggingHandler.Level level) { diff --git a/consumer/spring-log-consumer/src/main/java/org/springframework/cloud/fn/consumer/log/package-info.java b/consumer/spring-log-consumer/src/main/java/org/springframework/cloud/fn/consumer/log/package-info.java new file mode 100644 index 00000000..c574f7f5 --- /dev/null +++ b/consumer/spring-log-consumer/src/main/java/org/springframework/cloud/fn/consumer/log/package-info.java @@ -0,0 +1,4 @@ +/** + * The Log consumer auto-configuration support. + */ +package org.springframework.cloud.fn.consumer.log; diff --git a/consumer/spring-log-consumer/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/consumer/spring-log-consumer/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports new file mode 100644 index 00000000..fe30ce89 --- /dev/null +++ b/consumer/spring-log-consumer/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports @@ -0,0 +1 @@ +org.springframework.cloud.fn.consumer.log.LogConsumerConfiguration diff --git a/consumer/spring-log-consumer/src/test/java/org/springframework/cloud/fn/consumer/log/LogConsumerApplicationTests.java b/consumer/spring-log-consumer/src/test/java/org/springframework/cloud/fn/consumer/log/LogConsumerApplicationTests.java index c7b208b0..0cdf8a20 100644 --- a/consumer/spring-log-consumer/src/test/java/org/springframework/cloud/fn/consumer/log/LogConsumerApplicationTests.java +++ b/consumer/spring-log-consumer/src/test/java/org/springframework/cloud/fn/consumer/log/LogConsumerApplicationTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2020-2021 the original author or authors. + * Copyright 2020-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -44,7 +44,7 @@ * @author Artem Bilan */ @DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD) -@SpringBootTest({ "log.name=foo", "log.level=warn", "log.expression=payload.toUpperCase()" }) +@SpringBootTest({ "log.consumer.name=foo", "log.consumer.level=warn", "log.consumer.expression=payload.toUpperCase()" }) class LogConsumerApplicationTests { @Autowired @@ -55,7 +55,7 @@ class LogConsumerApplicationTests { private LoggingHandler loggingHandler; @Test - public void testJsonContentType() { + void testJsonContentType() { Message message = MessageBuilder.withPayload("{\"foo\":\"bar\"}") .setHeader("contentType", new MimeType("json")) .build(); diff --git a/consumer/spring-redis-consumer/README.adoc b/consumer/spring-redis-consumer/README.adoc index 4d3e3355..958daef8 100644 --- a/consumer/spring-redis-consumer/README.adoc +++ b/consumer/spring-redis-consumer/README.adoc @@ -4,7 +4,7 @@ A consumer that allows you to write incoming messages into Redis. ## Beans for injection -You can import `RedisConsumerConfiguration` in the application and then inject the following bean. +The `RedisConsumerConfiguration` auto-configuration provides the following bean: `Consumer> redisConsumer` diff --git a/consumer/spring-redis-consumer/src/main/java/org/springframework/cloud/fn/consumer/redis/RedisConsumerConfiguration.java b/consumer/spring-redis-consumer/src/main/java/org/springframework/cloud/fn/consumer/redis/RedisConsumerConfiguration.java index a3ab67b0..60f9796a 100644 --- a/consumer/spring-redis-consumer/src/main/java/org/springframework/cloud/fn/consumer/redis/RedisConsumerConfiguration.java +++ b/consumer/spring-redis-consumer/src/main/java/org/springframework/cloud/fn/consumer/redis/RedisConsumerConfiguration.java @@ -1,5 +1,5 @@ /* - * Copyright 2015-2020 the original author or authors. + * Copyright 2015-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,9 +18,10 @@ import java.util.function.Consumer; +import org.springframework.boot.autoconfigure.AutoConfiguration; +import org.springframework.boot.autoconfigure.data.redis.RedisAutoConfiguration; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.integration.redis.outbound.RedisPublishingMessageHandler; import org.springframework.integration.redis.outbound.RedisQueueOutboundChannelAdapter; @@ -29,13 +30,15 @@ import org.springframework.messaging.MessageHandler; /** + * Auto-configuration for Redis consumer. + * * @author Eric Bottard * @author Mark Pollack * @author Gary Russell * @author Soby Chacko * @author Artem Bilan */ -@Configuration +@AutoConfiguration(after = RedisAutoConfiguration.class) @EnableConfigurationProperties(RedisConsumerProperties.class) public class RedisConsumerConfiguration { @@ -47,6 +50,7 @@ public Consumer> redisConsumer(MessageHandler redisConsumerMessageHan @Bean public MessageHandler redisConsumerMessageHandler(RedisConnectionFactory redisConnectionFactory, RedisConsumerProperties redisConsumerProperties) { + if (redisConsumerProperties.isKeyPresent()) { RedisStoreWritingMessageHandler redisStoreWritingMessageHandler = new RedisStoreWritingMessageHandler( redisConnectionFactory); diff --git a/consumer/spring-redis-consumer/src/main/java/org/springframework/cloud/fn/consumer/redis/RedisConsumerProperties.java b/consumer/spring-redis-consumer/src/main/java/org/springframework/cloud/fn/consumer/redis/RedisConsumerProperties.java index 6c777f78..784be3c4 100644 --- a/consumer/spring-redis-consumer/src/main/java/org/springframework/cloud/fn/consumer/redis/RedisConsumerProperties.java +++ b/consumer/spring-redis-consumer/src/main/java/org/springframework/cloud/fn/consumer/redis/RedisConsumerProperties.java @@ -1,5 +1,5 @@ /* - * Copyright 2015-2020 the original author or authors. + * Copyright 2015-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -23,15 +23,12 @@ import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.expression.Expression; -import org.springframework.expression.ExpressionParser; import org.springframework.expression.common.LiteralExpression; -import org.springframework.expression.spel.standard.SpelExpressionParser; import org.springframework.util.StringUtils; import org.springframework.validation.annotation.Validated; /** - * Used to configure those Redis Sink module options that are not related to connecting to - * Redis. + * The configuration properties for Redis consumer. * * @author Eric Bottard * @author Mark Pollack @@ -42,22 +39,20 @@ @Validated public class RedisConsumerProperties { - private static final ExpressionParser EXPRESSION_PARSER = new SpelExpressionParser(); - /** * A SpEL expression to use for topic. */ - private String topicExpression; + private Expression topicExpression; /** * A SpEL expression to use for queue. */ - private String queueExpression; + private Expression queueExpression; /** * A SpEL expression to use for storing to a key. */ - private String keyExpression; + private Expression keyExpression; /** * A literal key name to use when storing to a key. @@ -75,55 +70,55 @@ public class RedisConsumerProperties { private String topic; public Expression keyExpression() { - return key != null ? new LiteralExpression(key) : EXPRESSION_PARSER.parseExpression(keyExpression); + return (this.key != null) ? new LiteralExpression(this.key) : this.keyExpression; } public Expression queueExpression() { - return queue != null ? new LiteralExpression(queue) : EXPRESSION_PARSER.parseExpression(queueExpression); + return (this.queue != null) ? new LiteralExpression(this.queue) : this.queueExpression; } public Expression topicExpression() { - return topic != null ? new LiteralExpression(topic) : EXPRESSION_PARSER.parseExpression(topicExpression); + return (this.topic != null) ? new LiteralExpression(this.topic) : this.topicExpression; } boolean isKeyPresent() { - return StringUtils.hasText(key) || keyExpression != null; + return StringUtils.hasText(this.key) || this.keyExpression != null; } boolean isQueuePresent() { - return StringUtils.hasText(queue) || queueExpression != null; + return StringUtils.hasText(this.queue) || this.queueExpression != null; } boolean isTopicPresent() { - return StringUtils.hasText(topic) || topicExpression != null; + return StringUtils.hasText(this.topic) || this.topicExpression != null; } - public String getTopicExpression() { - return topicExpression; + public Expression getTopicExpression() { + return this.topicExpression; } - public void setTopicExpression(String topicExpression) { + public void setTopicExpression(Expression topicExpression) { this.topicExpression = topicExpression; } - public String getQueueExpression() { - return queueExpression; + public Expression getQueueExpression() { + return this.queueExpression; } - public void setQueueExpression(String queueExpression) { + public void setQueueExpression(Expression queueExpression) { this.queueExpression = queueExpression; } - public String getKeyExpression() { - return keyExpression; + public Expression getKeyExpression() { + return this.keyExpression; } - public void setKeyExpression(String keyExpression) { + public void setKeyExpression(Expression keyExpression) { this.keyExpression = keyExpression; } public String getKey() { - return key; + return this.key; } public void setKey(String key) { @@ -131,7 +126,7 @@ public void setKey(String key) { } public String getQueue() { - return queue; + return this.queue; } public void setQueue(String queue) { @@ -139,7 +134,7 @@ public void setQueue(String queue) { } public String getTopic() { - return topic; + return this.topic; } public void setTopic(String topic) { @@ -151,7 +146,8 @@ public void setTopic(String topic) { @AssertTrue(message = "Exactly one of 'queue', 'queueExpression', 'key', 'keyExpression', " + "'topic' and 'topicExpression' must be set") public boolean isMutuallyExclusive() { - Object[] props = new Object[] { queue, queueExpression, key, keyExpression, topic, topicExpression }; + Object[] props = { this.queue, this.queueExpression, this.key, this.keyExpression, this.topic, + this.topicExpression }; return (props.length - 1) == Collections.frequency(Arrays.asList(props), null); } diff --git a/consumer/spring-redis-consumer/src/main/java/org/springframework/cloud/fn/consumer/redis/package-info.java b/consumer/spring-redis-consumer/src/main/java/org/springframework/cloud/fn/consumer/redis/package-info.java new file mode 100644 index 00000000..b69226b1 --- /dev/null +++ b/consumer/spring-redis-consumer/src/main/java/org/springframework/cloud/fn/consumer/redis/package-info.java @@ -0,0 +1,4 @@ +/** + * The Redis consumer auto-configuration support. + */ +package org.springframework.cloud.fn.consumer.redis; diff --git a/consumer/spring-redis-consumer/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/consumer/spring-redis-consumer/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports new file mode 100644 index 00000000..6abe7337 --- /dev/null +++ b/consumer/spring-redis-consumer/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports @@ -0,0 +1 @@ +org.springframework.cloud.fn.consumer.redis.RedisConsumerConfiguration diff --git a/consumer/spring-redis-consumer/src/test/java/org/springframework/cloud/fn/consumer/redis/RedisConsumerTopicTests.java b/consumer/spring-redis-consumer/src/test/java/org/springframework/cloud/fn/consumer/redis/RedisConsumerTopicTests.java index b69409c6..b984ae37 100644 --- a/consumer/spring-redis-consumer/src/test/java/org/springframework/cloud/fn/consumer/redis/RedisConsumerTopicTests.java +++ b/consumer/spring-redis-consumer/src/test/java/org/springframework/cloud/fn/consumer/redis/RedisConsumerTopicTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2020-2020 the original author or authors. + * Copyright 2020-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -70,13 +70,7 @@ public void testWithTopic() throws Exception { container.stop(); } - private static class Listener { - - private final CountDownLatch latch; - - Listener(CountDownLatch latch) { - this.latch = latch; - } + record Listener(CountDownLatch latch) { @SuppressWarnings("unused") public void handleMessage(String s) { diff --git a/consumer/spring-redis-consumer/src/test/resources/application.properties b/consumer/spring-redis-consumer/src/test/resources/application.properties deleted file mode 100644 index b7db2541..00000000 --- a/consumer/spring-redis-consumer/src/test/resources/application.properties +++ /dev/null @@ -1 +0,0 @@ -# Empty diff --git a/consumer/spring-rsocket-consumer/README.adoc b/consumer/spring-rsocket-consumer/README.adoc index 1aba1e70..5df129ef 100644 --- a/consumer/spring-rsocket-consumer/README.adoc +++ b/consumer/spring-rsocket-consumer/README.adoc @@ -5,11 +5,14 @@ The consumer uses the RSocket support from https://docs.spring.io/spring/docs/cu ## Beans for injection -You can import `RSocketConsumerConfiguration` in the application and then inject the following bean. +The `RSocketConsumerConfiguration` auto-configuration provides the following beans: -`Function>, Mono> rsocketConsumer` +`Function>, Mono> rsocketFunctionsConsumer` -You can use `rsocketConsumer` as a qualifier when injecting. +You can use `rsocketFunctionsConsumer` as a qualifier when injecting. + +The returned `Mono` has to be subscribed. +Or `Consumer>> rsocketConsumer` can be used instead which just does a `Mono.block()` before returning. ## Configuration Options diff --git a/consumer/spring-rsocket-consumer/src/main/java/org/springframework/cloud/fn/consumer/rsocket/RsocketConsumerConfiguration.java b/consumer/spring-rsocket-consumer/src/main/java/org/springframework/cloud/fn/consumer/rsocket/RsocketConsumerConfiguration.java index 26d86c46..c842360b 100644 --- a/consumer/spring-rsocket-consumer/src/main/java/org/springframework/cloud/fn/consumer/rsocket/RsocketConsumerConfiguration.java +++ b/consumer/spring-rsocket-consumer/src/main/java/org/springframework/cloud/fn/consumer/rsocket/RsocketConsumerConfiguration.java @@ -1,5 +1,5 @@ /* - * Copyright 2020-2022 the original author or authors. + * Copyright 2020-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,31 +16,47 @@ package org.springframework.cloud.fn.consumer.rsocket; +import java.util.function.Consumer; import java.util.function.Function; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.boot.autoconfigure.AutoConfiguration; +import org.springframework.boot.autoconfigure.rsocket.RSocketRequesterAutoConfiguration; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; import org.springframework.messaging.Message; import org.springframework.messaging.rsocket.RSocketRequester; -@Configuration(proxyBeanMethods = false) +/** + * Auto-configuration for RSocket consumer. + * + * @author Artem Bilan + */ +@AutoConfiguration(after = RSocketRequesterAutoConfiguration.class) @EnableConfigurationProperties(RsocketConsumerProperties.class) public class RsocketConsumerConfiguration { @Bean - public Function>, Mono> rsocketConsumer(RSocketRequester.Builder builder, + public Consumer>> rsocketConsumer( + @Qualifier("rsocketFunctionConsumer") Function>, Mono> rsocketFunctionConsumer) { + + return (data) -> rsocketFunctionConsumer.apply(data).block(); + } + + @Bean + public Function>, Mono> rsocketFunctionConsumer(RSocketRequester.Builder builder, RsocketConsumerProperties rsocketConsumerProperties) { - RSocketRequester rSocketRequester = rsocketConsumerProperties.getUri() != null + + RSocketRequester rSocketRequester = (rsocketConsumerProperties.getUri() != null) ? builder.websocket(rsocketConsumerProperties.getUri()) : builder.tcp(rsocketConsumerProperties.getHost(), rsocketConsumerProperties.getPort()); String route = rsocketConsumerProperties.getRoute(); - return input -> input.flatMap(message -> rSocketRequester.route(route).data(message.getPayload()).send()) + return (input) -> input.flatMap((message) -> rSocketRequester.route(route).data(message.getPayload()).send()) .ignoreElements(); } diff --git a/consumer/spring-rsocket-consumer/src/main/java/org/springframework/cloud/fn/consumer/rsocket/RsocketConsumerProperties.java b/consumer/spring-rsocket-consumer/src/main/java/org/springframework/cloud/fn/consumer/rsocket/RsocketConsumerProperties.java index 43769e51..765466c3 100644 --- a/consumer/spring-rsocket-consumer/src/main/java/org/springframework/cloud/fn/consumer/rsocket/RsocketConsumerProperties.java +++ b/consumer/spring-rsocket-consumer/src/main/java/org/springframework/cloud/fn/consumer/rsocket/RsocketConsumerProperties.java @@ -1,5 +1,5 @@ /* - * Copyright 2020-2020 the original author or authors. + * Copyright 2020-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,6 +20,11 @@ import org.springframework.boot.context.properties.ConfigurationProperties; +/** + * The RSocket consumer configuration properties. + * + * @author Artem Bilan + */ @ConfigurationProperties("rsocket.consumer") public class RsocketConsumerProperties { diff --git a/consumer/spring-rsocket-consumer/src/main/java/org/springframework/cloud/fn/consumer/rsocket/package-info.java b/consumer/spring-rsocket-consumer/src/main/java/org/springframework/cloud/fn/consumer/rsocket/package-info.java new file mode 100644 index 00000000..d1d76e66 --- /dev/null +++ b/consumer/spring-rsocket-consumer/src/main/java/org/springframework/cloud/fn/consumer/rsocket/package-info.java @@ -0,0 +1,4 @@ +/** + * The Rsocket consumer auto-configuration support. + */ +package org.springframework.cloud.fn.consumer.rsocket; diff --git a/consumer/spring-rsocket-consumer/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/consumer/spring-rsocket-consumer/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports new file mode 100644 index 00000000..852a2658 --- /dev/null +++ b/consumer/spring-rsocket-consumer/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports @@ -0,0 +1 @@ +org.springframework.cloud.fn.consumer.rsocket.RsocketConsumerConfiguration diff --git a/consumer/spring-rsocket-consumer/src/test/java/org/springframework/cloud/fn/consumer/rsocket/RsocketConsumerTests.java b/consumer/spring-rsocket-consumer/src/test/java/org/springframework/cloud/fn/consumer/rsocket/RsocketConsumerTests.java index fd296239..f94b1803 100644 --- a/consumer/spring-rsocket-consumer/src/test/java/org/springframework/cloud/fn/consumer/rsocket/RsocketConsumerTests.java +++ b/consumer/spring-rsocket-consumer/src/test/java/org/springframework/cloud/fn/consumer/rsocket/RsocketConsumerTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2020-2020 the original author or authors. + * Copyright 2020-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,16 +16,16 @@ package org.springframework.cloud.fn.consumer.rsocket; -import java.util.function.Function; +import java.util.function.Consumer; import org.junit.jupiter.api.Test; import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; -import reactor.core.publisher.ReplayProcessor; +import reactor.core.publisher.Sinks; import reactor.test.StepVerifier; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.SpringBootConfiguration; +import org.springframework.boot.autoconfigure.AutoConfigurations; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; import org.springframework.boot.autoconfigure.rsocket.RSocketRequesterAutoConfiguration; import org.springframework.boot.autoconfigure.rsocket.RSocketStrategiesAutoConfiguration; @@ -45,28 +45,27 @@ @DirtiesContext public class RsocketConsumerTests { - private static ApplicationContextRunner applicationContextRunner = new ApplicationContextRunner() - .withUserConfiguration(RsocketConsumerConfiguration.class, RSocketRequesterAutoConfiguration.class, - RSocketStrategiesAutoConfiguration.class); + private static final ApplicationContextRunner applicationContextRunner = new ApplicationContextRunner() + .withConfiguration(AutoConfigurations.of(RSocketRequesterAutoConfiguration.class, + RSocketStrategiesAutoConfiguration.class, RsocketConsumerConfiguration.class)); @Autowired ApplicationContext applicationContext; + @SuppressWarnings("unchecked") @Test void testRsocketConsumer() { - RSocketServerBootstrap serverBootstrap = applicationContext.getBean(RSocketServerBootstrap.class); RSocketServer server = (RSocketServer) ReflectionTestUtils.getField(serverBootstrap, "server"); final int port = server.address().getPort(); applicationContextRunner .withPropertyValues("rsocket.consumer.port=" + port, "rsocket.consumer.route=test-route") - .run(context -> { - Function>, Mono> rsocketConsumer = context.getBean("rsocketConsumer", - Function.class); - rsocketConsumer.apply(Flux.just(new GenericMessage<>("Hello RSocket"))).subscribe(); + .run((context) -> { + Consumer>> rsocketConsumer = context.getBean("rsocketConsumer", Consumer.class); + rsocketConsumer.accept(Flux.just(new GenericMessage<>("Hello RSocket"))); - StepVerifier.create(RSocketserverApplication.fireForgetPayloads) + StepVerifier.create(RSocketserverApplication.fireForgetPayloads.asFlux()) .expectNext("Hello RSocket") .thenCancel() .verify(); @@ -79,11 +78,11 @@ void testRsocketConsumer() { @Controller static class RSocketserverApplication { - static final ReplayProcessor fireForgetPayloads = ReplayProcessor.create(); + static final Sinks.Many fireForgetPayloads = Sinks.many().replay().all(); @MessageMapping("test-route") void someMethod(String payload) { - fireForgetPayloads.onNext(payload); + fireForgetPayloads.emitNext(payload, Sinks.EmitFailureHandler.FAIL_FAST); } }