Skip to content

Commit

Permalink
improve factories, make itests work
Browse files Browse the repository at this point in the history
  • Loading branch information
zambrovski committed Nov 17, 2023
1 parent ebfaa10 commit 9685b73
Show file tree
Hide file tree
Showing 8 changed files with 54 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@ import org.springframework.test.context.junit.jupiter.SpringExtension
webEnvironment = SpringBootTest.WebEnvironment.MOCK,
properties = [
// axon-1
"correlate.channels.axon-1.type=axon-event",
"correlate.channels.axon-1.enabled=true",
"correlate.channels.axonOne.type=axon-event",
"correlate.channels.axonOne.enabled=true",
// axon-2
"correlate.channels.axon-2.type=axon-event",
"correlate.channels.axon-2.enabled=true",
"correlate.channels.axon-2.beanName=specifiedHandlerName",
"correlate.channels.axon-2.beanNamePrefix=specified",
// unknown-type
"correlate.channels.unknown-type.type=unknown-type",
"correlate.channels.unknown-type.enabled=true",
Expand All @@ -43,24 +43,24 @@ internal class AxonChannelConfigurationIT {
private lateinit var handlers: Map<String, AxonEventMessageHandler>

@Autowired
@Qualifier("specifiedHandlerName")
@Qualifier("specifiedConverter")
private lateinit var converter: AxonEventMessageHeaderConverter


@Test
fun configures_two_consumers() {

assertThat(handlers).hasSize(2)
assertThat(handlers.keys).containsExactlyInAnyOrder("axon-1-handler", "specifiedHandlerName")
assertThat(handlers["axon-1-handler"]!!.channelName).isEqualTo("axon-1")
assertThat(handlers["specifiedHandlerName"]!!.channelName).isEqualTo("axon-2")
assertThat(handlers["specifiedHandlerName"]!!.axonEventMessageHeaderConverter).isEqualTo(converter)
assertThat(handlers.keys).containsExactlyInAnyOrder("axonOneHandler", "specifiedHandler")
assertThat(handlers["axonOneHandler"]!!.channelName).isEqualTo("axonOne")
assertThat(handlers["specifiedHandler"]!!.channelName).isEqualTo("axon-2")
assertThat(handlers["specifiedHandler"]!!.axonEventMessageHeaderConverter).isEqualTo(converter)
}

@SpringBootApplication(exclude = [BatchCorrelationSchedulerConfiguration::class])
class TestApplication {
@Bean("specifiedHandlerName")
fun specifiedHandlerName(): AxonEventMessageHeaderConverter = mock()
@Bean("specifiedConverter")
fun doesNotMatter(): AxonEventMessageHeaderConverter = mock()

@Bean
fun singleMessageCorrelationStrategy(): SingleMessageCorrelationStrategy = mock()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,23 +20,23 @@ import org.springframework.test.context.junit.jupiter.SpringExtension
webEnvironment = SpringBootTest.WebEnvironment.MOCK,
properties = [
// kafka-1
"correlate.channels.kafka-1.type=stream",
"correlate.channels.kafka-1.enabled=true",
"correlate.channels.kafkaOne.type=stream",
"correlate.channels.kafkaOne.enabled=true",
// kafka-2
"correlate.channels.kafka-2.type=stream",
"correlate.channels.kafka-2.enabled=true",
"correlate.channels.kafka-2.beanName=specifiedConsumerName",
"correlate.channels.kafkaTwo.type=stream",
"correlate.channels.kafkaTwo.enabled=true",
"correlate.channels.kafkaTwo.beanNamePrefix=specifiedName",
// unknown-type
"correlate.channels.unknown-type.type=unknown-type",
"correlate.channels.unknown-type.enabled=true",
"correlate.channels.unknownType.type=unknown-type",
"correlate.channels.unknownType.enabled=true",
// disabled
"correlate.channels.disabled.type=stream",
"correlate.channels.disabled.enabled=false",
// function declaration
"spring.cloud.stream.function.definition=kafka-1-consumer; specified-consumer-name",
"spring.cloud.stream.function.definition=kafkaOneConsumer; specifiedNameConsumer",
// bindings
"spring.cloud.stream.function.bindings.kafka-1-consumer-in-0=correlate-ingress-binding-1",
"spring.cloud.stream.function.bindings.specified-consumer-name-in-0=correlate-ingress-binding-1",
"spring.cloud.stream.function.bindings.kafkaOneConsumer-in-0=correlate-ingress-binding-1",
"spring.cloud.stream.function.bindings.specifiedNameConsumer-in-0=correlate-ingress-binding-1",

]
)
Expand All @@ -49,23 +49,23 @@ internal class SpringCloudStreamChannelConfigurationIT {
private lateinit var consumers: Map<String, StreamByteMessageConsumer>

@Autowired
@Qualifier("specifiedConsumerName")
private lateinit var converter: StreamChannelMessageHeaderConverter

@Qualifier("specifiedNameConverter")
private lateinit var converter: StreamChannelMessageHeaderConverter // this converter will be picked up because of the name of the field

@Test
fun configures_two_consumers() {

assertThat(consumers).hasSize(2)
assertThat(consumers.keys).containsExactlyInAnyOrder("kafka-1-consumer", "specified-consumer-name")
assertThat(consumers["kafka-1-consumer"]!!.channelName).isEqualTo("kafka-1")
assertThat(consumers["specified-consumer-name"]!!.channelName).isEqualTo("kafka-2")
assertThat(consumers["specified-consumer-name"]!!.streamChannelMessageHeaderConverter).isEqualTo(converter)
assertThat(consumers.keys).containsExactlyInAnyOrder("kafkaOneConsumer", "specifiedNameConsumer")
assertThat(consumers["kafkaOneConsumer"]!!.channelName).isEqualTo("kafkaOne")
assertThat(consumers["specifiedNameConsumer"]!!.channelName).isEqualTo("kafkaTwo")
assertThat(consumers["specifiedNameConsumer"]!!.streamChannelMessageHeaderConverter).isEqualTo(converter)
}

@SpringBootApplication(exclude = [BatchCorrelationSchedulerConfiguration::class])
class TestApplication {
@Bean("specifiedConsumerName")

@Bean("specifiedNameConverter")
fun qualifiedConverter(): StreamChannelMessageHeaderConverter = mock()

@Bean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@ spring:
- org.springframework.cloud.stream.config.BindingsEndpointAutoConfiguration
- org.springframework.cloud.stream.config.BindingServiceConfiguration
- org.springframework.cloud.stream.binder.kafka.config.ExtendedBindingHandlerMappingsProviderConfiguration
- org.springframework.cloud.stream.function.FunctionConfiguration
1 change: 1 addition & 0 deletions example/itest/src/test/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ spring:
show-sql: false

axon:
disable-axoniq-console-message: true
axonserver:
enabled: false

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,18 +47,25 @@ class AxonChannelProxyFactory(
val encoding: String = requireNotNull( getEncoding(config) ) { "Channel encoding is required, please set either globally or for channel." }
val encoder = requireNotNull(payloadDecoders.find { it.supports(encoding) }) { "Could not find decoder for configured message encoding '$encoding'." }

val handlerName = config.beanName ?: "$name-handler"
// lookup named converter or take the default one
val converterName = (config.beanNamePrefix ?: name) + "Converter"
val converter: AxonEventMessageHeaderConverter = applicationContext.getQualifiedBeanWithFallback(converterName, DEFAULT_MESSAGE_HEADER_CONVERTER_NAME)

// lookup consumer or create one
val handlerName = (config.beanNamePrefix ?: name) + "Handler"
if (!applicationContext.containsBean(handlerName)) {
// the channel handler is not configured yet.
val handler = AxonEventMessageHandler(
messageAcceptor = channelMessageAcceptor,
metrics = metrics,
axonEventMessageHeaderConverter = applicationContext.getQualifiedBeanWithFallback(name, DEFAULT_MESSAGE_HEADER_CONVERTER_NAME),
axonEventMessageHeaderConverter = converter,
encoder = encoder,
channelName = name
)
applicationContext.registerBean(handlerName, AxonEventMessageHandler::class.java, Supplier { handler })
logger.info { "[Camunda CORRELATE] Registered AxonEventMessageHandler for channel '$name' named '$handlerName'." }
} else {
logger.info { "[Camunda CORRELATE] Found a bean '$handlerName', skipping construction." }
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,12 @@ data class ChannelConfigurationProperties(
*/
val type: String,
/**
* Name of the bean to register the consumer.
* Prefix for the name of beans to create.
* If not provided the <channel-name>Consumer will be used as bean name of the consumer.
* If not provided the <channel-name>Converter will be used as bean name of the channel message header converter.
* If you register beans with those names in your code, the creation is skipped.
*/
val beanName: String? = null,
val beanNamePrefix: String? = null,
/**
* Additional properties.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ class SpringCloudStreamChannelConfiguration {

companion object {
const val CHANNEL_TYPE = "stream"
const val DEFAULT_CHANNEL_MESSAGE_HEADER_CONVERTER = "channelMessageHeaderConverter"
const val DEFAULT_CHANNEL_MESSAGE_HEADER_CONVERTER = "${CHANNEL_TYPE}ChannelMessageHeaderConverter"
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,17 +37,24 @@ class SpringCloudStreamChannelProxyFactory(
if (this::applicationContext.isInitialized) {
logger.debug { "[Camunda CORRELATE] Creating channel consumers for Spring Cloud Streams: ${springCloudConfigurations.keys.joinToString(", ")}." }
springCloudConfigurations.forEach { (name, config) ->
val consumerName = config.beanName ?: "$name-consumer"
// lookup named converter or take the default one
val converterName = (config.beanNamePrefix ?: name) + "Converter"
val converter: StreamChannelMessageHeaderConverter = applicationContext.getQualifiedBeanWithFallback(converterName, DEFAULT_CHANNEL_MESSAGE_HEADER_CONVERTER)

// lookup consumer or create one
val consumerName = (config.beanNamePrefix ?: name) + "Consumer"
if (!applicationContext.containsBean(consumerName)) {
// the channel is not configured yet.
val consumer = StreamByteMessageConsumer(
messageAcceptor = channelMessageAcceptor,
metrics = metrics,
streamChannelMessageHeaderConverter = applicationContext.getQualifiedBeanWithFallback(name, DEFAULT_CHANNEL_MESSAGE_HEADER_CONVERTER),
streamChannelMessageHeaderConverter = converter,
channelName = name
)
applicationContext.registerBean(consumerName, StreamByteMessageConsumer::class.java, Supplier { consumer })
logger.info { "[Camunda CORRELATE] Registered StreamByteMessageConsumer for channel '$name' named '$consumerName'." }
} else {
logger.info { "[Camunda CORRELATE] Found a bean '$consumerName', skipping construction." }
}
}
}
Expand Down

0 comments on commit 9685b73

Please sign in to comment.