diff --git a/example/itest/src/test/kotlin/io/holunda/camunda/bpm/correlate/ingress/axon/AxonChannelConfigurationIT.kt b/example/itest/src/test/kotlin/io/holunda/camunda/bpm/correlate/ingress/axon/AxonChannelConfigurationIT.kt index b91ef9d..1bcd766 100644 --- a/example/itest/src/test/kotlin/io/holunda/camunda/bpm/correlate/ingress/axon/AxonChannelConfigurationIT.kt +++ b/example/itest/src/test/kotlin/io/holunda/camunda/bpm/correlate/ingress/axon/AxonChannelConfigurationIT.kt @@ -20,12 +20,12 @@ import org.springframework.test.context.junit.jupiter.SpringExtension webEnvironment = SpringBootTest.WebEnvironment.MOCK, properties = [ // axon-1 - "correlate.channels.axon-1.type=axon-event", - "correlate.channels.axon-1.enabled=true", + "correlate.channels.axonOne.type=axon-event", + "correlate.channels.axonOne.enabled=true", // axon-2 "correlate.channels.axon-2.type=axon-event", "correlate.channels.axon-2.enabled=true", - "correlate.channels.axon-2.beanName=specifiedHandlerName", + "correlate.channels.axon-2.beanNamePrefix=specified", // unknown-type "correlate.channels.unknown-type.type=unknown-type", "correlate.channels.unknown-type.enabled=true", @@ -43,7 +43,7 @@ internal class AxonChannelConfigurationIT { private lateinit var handlers: Map @Autowired - @Qualifier("specifiedHandlerName") + @Qualifier("specifiedConverter") private lateinit var converter: AxonEventMessageHeaderConverter @@ -51,16 +51,16 @@ internal class AxonChannelConfigurationIT { fun configures_two_consumers() { assertThat(handlers).hasSize(2) - assertThat(handlers.keys).containsExactlyInAnyOrder("axon-1-handler", "specifiedHandlerName") - assertThat(handlers["axon-1-handler"]!!.channelName).isEqualTo("axon-1") - assertThat(handlers["specifiedHandlerName"]!!.channelName).isEqualTo("axon-2") - assertThat(handlers["specifiedHandlerName"]!!.axonEventMessageHeaderConverter).isEqualTo(converter) + assertThat(handlers.keys).containsExactlyInAnyOrder("axonOneHandler", "specifiedHandler") + assertThat(handlers["axonOneHandler"]!!.channelName).isEqualTo("axonOne") + assertThat(handlers["specifiedHandler"]!!.channelName).isEqualTo("axon-2") + assertThat(handlers["specifiedHandler"]!!.axonEventMessageHeaderConverter).isEqualTo(converter) } @SpringBootApplication(exclude = [BatchCorrelationSchedulerConfiguration::class]) class TestApplication { - @Bean("specifiedHandlerName") - fun specifiedHandlerName(): AxonEventMessageHeaderConverter = mock() + @Bean("specifiedConverter") + fun doesNotMatter(): AxonEventMessageHeaderConverter = mock() @Bean fun singleMessageCorrelationStrategy(): SingleMessageCorrelationStrategy = mock() diff --git a/example/itest/src/test/kotlin/io/holunda/camunda/bpm/correlate/ingress/cloudstream/SpringCloudStreamChannelConfigurationIT.kt b/example/itest/src/test/kotlin/io/holunda/camunda/bpm/correlate/ingress/cloudstream/SpringCloudStreamChannelConfigurationIT.kt index 071de36..6621285 100644 --- a/example/itest/src/test/kotlin/io/holunda/camunda/bpm/correlate/ingress/cloudstream/SpringCloudStreamChannelConfigurationIT.kt +++ b/example/itest/src/test/kotlin/io/holunda/camunda/bpm/correlate/ingress/cloudstream/SpringCloudStreamChannelConfigurationIT.kt @@ -20,23 +20,23 @@ import org.springframework.test.context.junit.jupiter.SpringExtension webEnvironment = SpringBootTest.WebEnvironment.MOCK, properties = [ // kafka-1 - "correlate.channels.kafka-1.type=stream", - "correlate.channels.kafka-1.enabled=true", + "correlate.channels.kafkaOne.type=stream", + "correlate.channels.kafkaOne.enabled=true", // kafka-2 - "correlate.channels.kafka-2.type=stream", - "correlate.channels.kafka-2.enabled=true", - "correlate.channels.kafka-2.beanName=specifiedConsumerName", + "correlate.channels.kafkaTwo.type=stream", + "correlate.channels.kafkaTwo.enabled=true", + "correlate.channels.kafkaTwo.beanNamePrefix=specifiedName", // unknown-type - "correlate.channels.unknown-type.type=unknown-type", - "correlate.channels.unknown-type.enabled=true", + "correlate.channels.unknownType.type=unknown-type", + "correlate.channels.unknownType.enabled=true", // disabled "correlate.channels.disabled.type=stream", "correlate.channels.disabled.enabled=false", // function declaration - "spring.cloud.stream.function.definition=kafka-1-consumer; specified-consumer-name", + "spring.cloud.stream.function.definition=kafkaOneConsumer; specifiedNameConsumer", // bindings - "spring.cloud.stream.function.bindings.kafka-1-consumer-in-0=correlate-ingress-binding-1", - "spring.cloud.stream.function.bindings.specified-consumer-name-in-0=correlate-ingress-binding-1", + "spring.cloud.stream.function.bindings.kafkaOneConsumer-in-0=correlate-ingress-binding-1", + "spring.cloud.stream.function.bindings.specifiedNameConsumer-in-0=correlate-ingress-binding-1", ] ) @@ -49,23 +49,23 @@ internal class SpringCloudStreamChannelConfigurationIT { private lateinit var consumers: Map @Autowired - @Qualifier("specifiedConsumerName") - private lateinit var converter: StreamChannelMessageHeaderConverter - + @Qualifier("specifiedNameConverter") + private lateinit var converter: StreamChannelMessageHeaderConverter // this converter will be picked up because of the name of the field @Test fun configures_two_consumers() { assertThat(consumers).hasSize(2) - assertThat(consumers.keys).containsExactlyInAnyOrder("kafka-1-consumer", "specified-consumer-name") - assertThat(consumers["kafka-1-consumer"]!!.channelName).isEqualTo("kafka-1") - assertThat(consumers["specified-consumer-name"]!!.channelName).isEqualTo("kafka-2") - assertThat(consumers["specified-consumer-name"]!!.streamChannelMessageHeaderConverter).isEqualTo(converter) + assertThat(consumers.keys).containsExactlyInAnyOrder("kafkaOneConsumer", "specifiedNameConsumer") + assertThat(consumers["kafkaOneConsumer"]!!.channelName).isEqualTo("kafkaOne") + assertThat(consumers["specifiedNameConsumer"]!!.channelName).isEqualTo("kafkaTwo") + assertThat(consumers["specifiedNameConsumer"]!!.streamChannelMessageHeaderConverter).isEqualTo(converter) } @SpringBootApplication(exclude = [BatchCorrelationSchedulerConfiguration::class]) class TestApplication { - @Bean("specifiedConsumerName") + + @Bean("specifiedNameConverter") fun qualifiedConverter(): StreamChannelMessageHeaderConverter = mock() @Bean diff --git a/example/itest/src/test/resources/application-axon-event.yml b/example/itest/src/test/resources/application-axon-event.yml index 146f7c4..8707695 100644 --- a/example/itest/src/test/resources/application-axon-event.yml +++ b/example/itest/src/test/resources/application-axon-event.yml @@ -6,3 +6,4 @@ spring: - org.springframework.cloud.stream.config.BindingsEndpointAutoConfiguration - org.springframework.cloud.stream.config.BindingServiceConfiguration - org.springframework.cloud.stream.binder.kafka.config.ExtendedBindingHandlerMappingsProviderConfiguration + - org.springframework.cloud.stream.function.FunctionConfiguration diff --git a/example/itest/src/test/resources/application.yml b/example/itest/src/test/resources/application.yml index d73dd6f..67a8457 100644 --- a/example/itest/src/test/resources/application.yml +++ b/example/itest/src/test/resources/application.yml @@ -6,6 +6,7 @@ spring: show-sql: false axon: + disable-axoniq-console-message: true axonserver: enabled: false diff --git a/extension/axon/src/main/kotlin/io/holunda/camunda/bpm/correlate/ingress/axon/AxonChannelProxyFactory.kt b/extension/axon/src/main/kotlin/io/holunda/camunda/bpm/correlate/ingress/axon/AxonChannelProxyFactory.kt index 5670bcd..58ba879 100644 --- a/extension/axon/src/main/kotlin/io/holunda/camunda/bpm/correlate/ingress/axon/AxonChannelProxyFactory.kt +++ b/extension/axon/src/main/kotlin/io/holunda/camunda/bpm/correlate/ingress/axon/AxonChannelProxyFactory.kt @@ -47,18 +47,25 @@ class AxonChannelProxyFactory( val encoding: String = requireNotNull( getEncoding(config) ) { "Channel encoding is required, please set either globally or for channel." } val encoder = requireNotNull(payloadDecoders.find { it.supports(encoding) }) { "Could not find decoder for configured message encoding '$encoding'." } - val handlerName = config.beanName ?: "$name-handler" + // lookup named converter or take the default one + val converterName = (config.beanNamePrefix ?: name) + "Converter" + val converter: AxonEventMessageHeaderConverter = applicationContext.getQualifiedBeanWithFallback(converterName, DEFAULT_MESSAGE_HEADER_CONVERTER_NAME) + + // lookup consumer or create one + val handlerName = (config.beanNamePrefix ?: name) + "Handler" if (!applicationContext.containsBean(handlerName)) { // the channel handler is not configured yet. val handler = AxonEventMessageHandler( messageAcceptor = channelMessageAcceptor, metrics = metrics, - axonEventMessageHeaderConverter = applicationContext.getQualifiedBeanWithFallback(name, DEFAULT_MESSAGE_HEADER_CONVERTER_NAME), + axonEventMessageHeaderConverter = converter, encoder = encoder, channelName = name ) applicationContext.registerBean(handlerName, AxonEventMessageHandler::class.java, Supplier { handler }) logger.info { "[Camunda CORRELATE] Registered AxonEventMessageHandler for channel '$name' named '$handlerName'." } + } else { + logger.info { "[Camunda CORRELATE] Found a bean '$handlerName', skipping construction." } } } } diff --git a/extension/spring-boot-starter/src/main/kotlin/io/holunda/camunda/bpm/correlate/ingress/ChannelConfigurationProperties.kt b/extension/spring-boot-starter/src/main/kotlin/io/holunda/camunda/bpm/correlate/ingress/ChannelConfigurationProperties.kt index 38523c2..5a98c9f 100644 --- a/extension/spring-boot-starter/src/main/kotlin/io/holunda/camunda/bpm/correlate/ingress/ChannelConfigurationProperties.kt +++ b/extension/spring-boot-starter/src/main/kotlin/io/holunda/camunda/bpm/correlate/ingress/ChannelConfigurationProperties.kt @@ -13,9 +13,12 @@ data class ChannelConfigurationProperties( */ val type: String, /** - * Name of the bean to register the consumer. + * Prefix for the name of beans to create. + * If not provided the Consumer will be used as bean name of the consumer. + * If not provided the Converter will be used as bean name of the channel message header converter. + * If you register beans with those names in your code, the creation is skipped. */ - val beanName: String? = null, + val beanNamePrefix: String? = null, /** * Additional properties. */ diff --git a/extension/spring-cloud-stream/src/main/kotlin/io/holunda/camunda/bpm/correlate/ingress/cloudstream/SpringCloudStreamChannelConfiguration.kt b/extension/spring-cloud-stream/src/main/kotlin/io/holunda/camunda/bpm/correlate/ingress/cloudstream/SpringCloudStreamChannelConfiguration.kt index 55c9b0a..a5a1456 100644 --- a/extension/spring-cloud-stream/src/main/kotlin/io/holunda/camunda/bpm/correlate/ingress/cloudstream/SpringCloudStreamChannelConfiguration.kt +++ b/extension/spring-cloud-stream/src/main/kotlin/io/holunda/camunda/bpm/correlate/ingress/cloudstream/SpringCloudStreamChannelConfiguration.kt @@ -19,7 +19,7 @@ class SpringCloudStreamChannelConfiguration { companion object { const val CHANNEL_TYPE = "stream" - const val DEFAULT_CHANNEL_MESSAGE_HEADER_CONVERTER = "channelMessageHeaderConverter" + const val DEFAULT_CHANNEL_MESSAGE_HEADER_CONVERTER = "${CHANNEL_TYPE}ChannelMessageHeaderConverter" } /** diff --git a/extension/spring-cloud-stream/src/main/kotlin/io/holunda/camunda/bpm/correlate/ingress/cloudstream/SpringCloudStreamChannelProxyFactory.kt b/extension/spring-cloud-stream/src/main/kotlin/io/holunda/camunda/bpm/correlate/ingress/cloudstream/SpringCloudStreamChannelProxyFactory.kt index ca0aa3d..cbf65b3 100644 --- a/extension/spring-cloud-stream/src/main/kotlin/io/holunda/camunda/bpm/correlate/ingress/cloudstream/SpringCloudStreamChannelProxyFactory.kt +++ b/extension/spring-cloud-stream/src/main/kotlin/io/holunda/camunda/bpm/correlate/ingress/cloudstream/SpringCloudStreamChannelProxyFactory.kt @@ -37,17 +37,24 @@ class SpringCloudStreamChannelProxyFactory( if (this::applicationContext.isInitialized) { logger.debug { "[Camunda CORRELATE] Creating channel consumers for Spring Cloud Streams: ${springCloudConfigurations.keys.joinToString(", ")}." } springCloudConfigurations.forEach { (name, config) -> - val consumerName = config.beanName ?: "$name-consumer" + // lookup named converter or take the default one + val converterName = (config.beanNamePrefix ?: name) + "Converter" + val converter: StreamChannelMessageHeaderConverter = applicationContext.getQualifiedBeanWithFallback(converterName, DEFAULT_CHANNEL_MESSAGE_HEADER_CONVERTER) + + // lookup consumer or create one + val consumerName = (config.beanNamePrefix ?: name) + "Consumer" if (!applicationContext.containsBean(consumerName)) { // the channel is not configured yet. val consumer = StreamByteMessageConsumer( messageAcceptor = channelMessageAcceptor, metrics = metrics, - streamChannelMessageHeaderConverter = applicationContext.getQualifiedBeanWithFallback(name, DEFAULT_CHANNEL_MESSAGE_HEADER_CONVERTER), + streamChannelMessageHeaderConverter = converter, channelName = name ) applicationContext.registerBean(consumerName, StreamByteMessageConsumer::class.java, Supplier { consumer }) logger.info { "[Camunda CORRELATE] Registered StreamByteMessageConsumer for channel '$name' named '$consumerName'." } + } else { + logger.info { "[Camunda CORRELATE] Found a bean '$consumerName', skipping construction." } } } }