From d7f4862e620baaa8a47f05e0dc567d5c9cea8e12 Mon Sep 17 00:00:00 2001 From: Artem Bilan Date: Fri, 23 Feb 2024 14:51:59 -0500 Subject: [PATCH 1/2] GH-5: Use `DebeziumMessageProducer` from Spring Integration Fixes: #5 * Expose `enableEmptyPayload` and `headerNamesToMap` configuration properties instead of plain `copyHeaders` * All the logic now is hidden in the `DebeziumMessageProducer` * Clean up tests according to a new code base * Fix warning for the deprecated Debezium properties. Now only one `delete.tombstone.handling.mode` is needed * Optimize the code flow for the `StepVerifier` via conversion pulled up to the `Flux` * Apparently there is some race condition in the `EmbeddedEngine` around stop functionality, so, add `debezium.embedded.shutdown.pause.before.interrupt.ms=500` instead of `5 minutes` by default --- .../spring-debezium-supplier/build.gradle | 1 + ...DebeziumReactiveConsumerConfiguration.java | 156 +++--------------- .../debezium/DebeziumSupplierProperties.java | 26 ++- ...iumReactiveConsumerConfigurationTests.java | 8 +- .../DebeziumSupplierIntegrationTests.java | 46 +++--- .../src/test/resources/logback-test.xml | 5 +- 6 files changed, 74 insertions(+), 168 deletions(-) diff --git a/supplier/spring-debezium-supplier/build.gradle b/supplier/spring-debezium-supplier/build.gradle index e8dc4b47..14e82958 100644 --- a/supplier/spring-debezium-supplier/build.gradle +++ b/supplier/spring-debezium-supplier/build.gradle @@ -4,6 +4,7 @@ repositories { dependencies { api project(':spring-debezium-autoconfigure') + api 'org.springframework.integration:spring-integration-debezium' api 'io.debezium:debezium-connector-mysql' api 'io.debezium:debezium-connector-mongodb' api 'io.debezium:debezium-connector-postgres' diff --git a/supplier/spring-debezium-supplier/src/main/java/org/springframework/cloud/fn/supplier/debezium/DebeziumReactiveConsumerConfiguration.java b/supplier/spring-debezium-supplier/src/main/java/org/springframework/cloud/fn/supplier/debezium/DebeziumReactiveConsumerConfiguration.java index dcd0a58a..012caab7 100644 --- a/supplier/spring-debezium-supplier/src/main/java/org/springframework/cloud/fn/supplier/debezium/DebeziumReactiveConsumerConfiguration.java +++ b/supplier/spring-debezium-supplier/src/main/java/org/springframework/cloud/fn/supplier/debezium/DebeziumReactiveConsumerConfiguration.java @@ -16,35 +16,25 @@ package org.springframework.cloud.fn.supplier.debezium; -import java.lang.reflect.Field; -import java.util.List; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.function.Consumer; import java.util.function.Supplier; import io.debezium.engine.ChangeEvent; import io.debezium.engine.DebeziumEngine; import io.debezium.engine.DebeziumEngine.Builder; -import io.debezium.engine.Header; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; +import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; -import reactor.core.publisher.Sinks; -import org.springframework.beans.factory.BeanClassLoaderAware; import org.springframework.boot.autoconfigure.AutoConfiguration; import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; -import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.cloud.fn.common.config.ComponentCustomizer; import org.springframework.cloud.fn.common.debezium.DebeziumEngineBuilderAutoConfiguration; -import org.springframework.cloud.fn.common.debezium.DebeziumProperties; import org.springframework.context.annotation.Bean; +import org.springframework.integration.debezium.dsl.Debezium; +import org.springframework.integration.debezium.dsl.DebeziumMessageProducerSpec; +import org.springframework.integration.dsl.IntegrationFlow; +import org.springframework.lang.Nullable; import org.springframework.messaging.Message; -import org.springframework.messaging.MessageHeaders; -import org.springframework.messaging.support.MessageBuilder; -import org.springframework.util.ClassUtils; -import org.springframework.util.MimeTypeUtils; /** * The Debezium supplier auto-configuration. @@ -55,131 +45,29 @@ @AutoConfiguration(after = DebeziumEngineBuilderAutoConfiguration.class) @EnableConfigurationProperties(DebeziumSupplierProperties.class) @ConditionalOnBean(DebeziumEngine.Builder.class) -public class DebeziumReactiveConsumerConfiguration implements BeanClassLoaderAware { - - private static final Log LOGGER = LogFactory.getLog(DebeziumReactiveConsumerConfiguration.class); - - /** - * ORG_SPRINGFRAMEWORK_KAFKA_SUPPORT_KAFKA_NULL. - */ - public static final String ORG_SPRINGFRAMEWORK_KAFKA_SUPPORT_KAFKA_NULL = "org.springframework.kafka.support.KafkaNull"; - - private Object kafkaNull = null; - - @Override - public void setBeanClassLoader(ClassLoader classLoader) { - try { - Class clazz = ClassUtils.forName(ORG_SPRINGFRAMEWORK_KAFKA_SUPPORT_KAFKA_NULL, classLoader); - Field field = clazz.getDeclaredField("INSTANCE"); - this.kafkaNull = field.get(null); - } - catch (ClassNotFoundException | NoSuchFieldException | IllegalAccessException ex) { - } - } - - /** - * Reactive Streams, single subscriber, sink used to push down the change event - * signals received from the Debezium Engine. - */ - private final Sinks.Many> eventSink = Sinks.many().unicast().onBackpressureError(); - - /** - * Debezium Engine is designed to be submitted to an {@link ExecutorService} for - * execution by a single thread, and a running connector can be stopped either by - * calling {@code stop()} from another thread or by interrupting the running thread - * (e.g., as is the case with {@link ExecutorService#shutdownNow()}). - */ - private final ExecutorService debeziumExecutor = Executors.newSingleThreadExecutor(); +public class DebeziumReactiveConsumerConfiguration { @Bean - public DebeziumEngine> debeziumEngine( - Consumer> changeEventConsumer, - Builder> debeziumEngineBuilder) { - - return debeziumEngineBuilder.notifying(changeEventConsumer).build(); + public Supplier>> debeziumSupplier(Publisher> debeziumPublisher) { + return () -> Flux.from(debeziumPublisher); } @Bean - public Supplier>> debeziumSupplier(DebeziumEngine> debeziumEngine) { - - return () -> this.eventSink.asFlux() - .doOnRequest((r) -> this.debeziumExecutor.execute(debeziumEngine)) - .doOnTerminate(this.debeziumExecutor::shutdownNow); - } - - @Bean - @ConditionalOnMissingBean - public Consumer> changeEventConsumer(DebeziumProperties engineProperties, - DebeziumSupplierProperties supplierProperties) { - - return new ChangeEventConsumer<>(engineProperties.getPayloadFormat().contentType(), - supplierProperties.isCopyHeaders(), this.eventSink); - } - - /** - * Format-agnostic change event consumer. - */ - private final class ChangeEventConsumer implements Consumer> { - - private final String contentType; - - private final boolean copyHeaders; - - private final Sinks.Many> eventSink; - - private ChangeEventConsumer(String contentType, boolean copyHeaders, Sinks.Many> eventSink) { - this.contentType = contentType; - this.copyHeaders = copyHeaders; - this.eventSink = eventSink; - } - - @Override - public void accept(ChangeEvent changeEvent) { - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("[Debezium Event]: " + changeEvent.key()); - } - - Object key = changeEvent.key(); - Object payload = changeEvent.value(); - String destination = changeEvent.destination(); - - // When the tombstone event is enabled, Debezium serializes the payload to - // null (e.g. empty payload) - // while the metadata information is carried through the headers - // (debezium_key). - // Note: Event for none flattened responses, when the - // debezium.properties.tombstones.on.delete=true - // (default), tombstones are generate by Debezium and handled by the code - // below. - if (payload == null) { - payload = DebeziumReactiveConsumerConfiguration.this.kafkaNull; - } - - // If payload is still null ignore the message. - if (payload == null) { - LOGGER.info("Dropped null payload message"); - return; - } - - MessageBuilder messageBuilder = MessageBuilder.withPayload(payload) - .setHeader("debezium_key", key) - .setHeader("debezium_destination", destination) - .setHeader(MessageHeaders.CONTENT_TYPE, - (payload.equals(DebeziumReactiveConsumerConfiguration.this.kafkaNull)) - ? MimeTypeUtils.TEXT_PLAIN_VALUE : this.contentType); - - if (this.copyHeaders) { - List> headers = changeEvent.headers(); - if (headers != null && !headers.isEmpty()) { - for (Header header : headers) { - messageBuilder.setHeader(header.getKey(), header.getValue()); - } - } - } - - this.eventSink.tryEmitNext(messageBuilder.build()); + public Publisher> debeziumPublisher(Builder> debeziumEngineBuilder, + DebeziumSupplierProperties supplierProperties, + @Nullable ComponentCustomizer debeziumMessageProducerSpecComponentCustomizer) { + + DebeziumMessageProducerSpec debeziumMessageProducerSpec = Debezium.inboundChannelAdapter(debeziumEngineBuilder) + .enableEmptyPayload(supplierProperties.isEnableEmptyPayload()) + .headerNames(supplierProperties.getHeaderNamesToMap()) + // TODO until Spring Integration 6.3.0-M2 + .autoStartup(false); + + if (debeziumMessageProducerSpecComponentCustomizer != null) { + debeziumMessageProducerSpecComponentCustomizer.customize(debeziumMessageProducerSpec); } + return IntegrationFlow.from(debeziumMessageProducerSpec).toReactivePublisher(true); } } diff --git a/supplier/spring-debezium-supplier/src/main/java/org/springframework/cloud/fn/supplier/debezium/DebeziumSupplierProperties.java b/supplier/spring-debezium-supplier/src/main/java/org/springframework/cloud/fn/supplier/debezium/DebeziumSupplierProperties.java index eb08a9f6..cdb7df59 100644 --- a/supplier/spring-debezium-supplier/src/main/java/org/springframework/cloud/fn/supplier/debezium/DebeziumSupplierProperties.java +++ b/supplier/spring-debezium-supplier/src/main/java/org/springframework/cloud/fn/supplier/debezium/DebeziumSupplierProperties.java @@ -22,21 +22,35 @@ * Debezium supplier configuration properties. * * @author Christian Tzolov + * @author Artem Bilan */ @ConfigurationProperties("debezium.supplier") public class DebeziumSupplierProperties { /** - * Copy Change Event headers into Message headers. + * Enable support for tombstone (aka delete) messages. */ - private boolean copyHeaders = true; + private boolean enableEmptyPayload = true; - public boolean isCopyHeaders() { - return this.copyHeaders; + /** + * Patterns for {@code ChangeEvent.headers()} to map. + */ + private String[] headerNamesToMap = { "*" }; + + public boolean isEnableEmptyPayload() { + return this.enableEmptyPayload; + } + + public void setEnableEmptyPayload(boolean enableEmptyPayload) { + this.enableEmptyPayload = enableEmptyPayload; + } + + public String[] getHeaderNamesToMap() { + return this.headerNamesToMap; } - public void setCopyHeaders(boolean copyHeaders) { - this.copyHeaders = copyHeaders; + public void setHeaderNamesToMap(String[] headerNamesToMap) { + this.headerNamesToMap = headerNamesToMap; } } diff --git a/supplier/spring-debezium-supplier/src/test/java/org/springframework/cloud/fn/supplier/debezium/it/supplier/DebeziumReactiveConsumerConfigurationTests.java b/supplier/spring-debezium-supplier/src/test/java/org/springframework/cloud/fn/supplier/debezium/it/supplier/DebeziumReactiveConsumerConfigurationTests.java index 65da978f..3ea90c6b 100644 --- a/supplier/spring-debezium-supplier/src/test/java/org/springframework/cloud/fn/supplier/debezium/it/supplier/DebeziumReactiveConsumerConfigurationTests.java +++ b/supplier/spring-debezium-supplier/src/test/java/org/springframework/cloud/fn/supplier/debezium/it/supplier/DebeziumReactiveConsumerConfigurationTests.java @@ -16,7 +16,8 @@ package org.springframework.cloud.fn.supplier.debezium.it.supplier; -import io.debezium.engine.DebeziumEngine; +import java.util.function.Supplier; + import org.junit.jupiter.api.Test; import org.springframework.boot.autoconfigure.AutoConfigurations; @@ -30,6 +31,7 @@ * Tests for {@link DebeziumReactiveConsumerConfiguration}. * * @author Christian Tzolov + * @author Artem Bilan */ public class DebeziumReactiveConsumerConfigurationTests { @@ -41,13 +43,13 @@ public class DebeziumReactiveConsumerConfigurationTests { @Test void noConnectorNoProperty() { - this.contextRunner.run((context) -> assertThat(context).doesNotHaveBean(DebeziumEngine.class)); + this.contextRunner.run((context) -> assertThat(context).doesNotHaveBean(Supplier.class)); } @Test void withConnectorWithProperty() { this.contextRunner.withPropertyValues("debezium.properties.connector.class=Dummy") - .run((context) -> assertThat(context).hasSingleBean(DebeziumEngine.class)); + .run((context) -> assertThat(context).hasSingleBean(Supplier.class)); } } diff --git a/supplier/spring-debezium-supplier/src/test/java/org/springframework/cloud/fn/supplier/debezium/it/supplier/DebeziumSupplierIntegrationTests.java b/supplier/spring-debezium-supplier/src/test/java/org/springframework/cloud/fn/supplier/debezium/it/supplier/DebeziumSupplierIntegrationTests.java index e0fdeea2..89a69c79 100644 --- a/supplier/spring-debezium-supplier/src/test/java/org/springframework/cloud/fn/supplier/debezium/it/supplier/DebeziumSupplierIntegrationTests.java +++ b/supplier/spring-debezium-supplier/src/test/java/org/springframework/cloud/fn/supplier/debezium/it/supplier/DebeziumSupplierIntegrationTests.java @@ -16,6 +16,7 @@ package org.springframework.cloud.fn.supplier.debezium.it.supplier; +import java.time.Duration; import java.util.function.Supplier; import org.junit.jupiter.api.Test; @@ -41,16 +42,16 @@ properties = { "spring.cloud.function.definition=debeziumSupplier", // https://debezium.io/documentation/reference/transformations/event-flattening.html + "debezium.properties.debezium.embedded.shutdown.pause.before.interrupt.ms=500", "debezium.properties.transforms=unwrap", "debezium.properties.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState", - "debezium.properties.transforms.unwrap.drop.tombstones=true", - "debezium.properties.transforms.unwrap.delete.handling.mode=rewrite", + "debezium.properties.transforms.unwrap.delete.tombstone.handling.mode=rewrite", "debezium.properties.transforms.unwrap.add.fields=name,db,op,table", "debezium.properties.schema.history.internal=io.debezium.relational.history.MemorySchemaHistory", "debezium.properties.offset.storage=org.apache.kafka.connect.storage.MemoryOffsetBackingStore", - // Drop schema from the message payload. + // Drop schema from the payload payload. "debezium.properties.key.converter.schemas.enable=false", "debezium.properties.value.converter.schemas.enable=false", @@ -101,52 +102,51 @@ void testDebeziumSupplier() { jdbcTemplate.update( "INSERT INTO `customers`(`first_name`,`last_name`,`email`) VALUES('Test666', 'Test666', 'Test666@spring.org')"); - Flux> messageFlux = this.debeziumSupplier.get(); + Flux payloadFlux = this.debeziumSupplier.get() + .map(Message::getPayload) + .cast(byte[].class) + .map(String::new); - // Message size should correspond to the number of insert statements in: + // payload size should correspond to the number of insert statements in: // https://github.com/debezium/container-images/blob/main/examples/mysql/2.3/inventory.sql // filtered by Customers and Addresses table. - StepVerifier.create(messageFlux) + StepVerifier.create(payloadFlux) .expectNextCount(16) // Skip the DDL transaction logs. // Customers table - .assertNext((message) -> assertThat(payloadString(message)).isEqualTo( + .assertNext((payload) -> assertThat(payload).isEqualTo( "{\"id\":1001,\"first_name\":\"Sally\",\"last_name\":\"Thomas\",\"email\":\"sally.thomas@acme.com\",\"__deleted\":\"false\",\"__name\":\"my-topic\",\"__db\":\"inventory\",\"__op\":\"r\",\"__table\":\"customers\"}")) - .assertNext((message) -> assertThat(payloadString(message)).isEqualTo( + .assertNext((payload) -> assertThat(payload).isEqualTo( "{\"id\":1002,\"first_name\":\"George\",\"last_name\":\"Bailey\",\"email\":\"gbailey@foobar.com\",\"__deleted\":\"false\",\"__name\":\"my-topic\",\"__db\":\"inventory\",\"__op\":\"r\",\"__table\":\"customers\"}")) - .assertNext((message) -> assertThat(payloadString(message)).isEqualTo( + .assertNext((payload) -> assertThat(payload).isEqualTo( "{\"id\":1003,\"first_name\":\"Edward\",\"last_name\":\"Walker\",\"email\":\"ed@walker.com\",\"__deleted\":\"false\",\"__name\":\"my-topic\",\"__db\":\"inventory\",\"__op\":\"r\",\"__table\":\"customers\"}")) - .assertNext((message) -> assertThat(payloadString(message)).isEqualTo( + .assertNext((payload) -> assertThat(payload).isEqualTo( "{\"id\":1004,\"first_name\":\"Anne\",\"last_name\":\"Kretchmar\",\"email\":\"annek@noanswer.org\",\"__deleted\":\"false\",\"__name\":\"my-topic\",\"__db\":\"inventory\",\"__op\":\"r\",\"__table\":\"customers\"}")) // NEW Customer Insert - .assertNext((message) -> assertThat(payloadString(message)).isEqualTo( + .assertNext((payload) -> assertThat(payload).isEqualTo( "{\"id\":1005,\"first_name\":\"Test666\",\"last_name\":\"Test666\",\"email\":\"Test666@spring.org\",\"__deleted\":\"false\",\"__name\":\"my-topic\",\"__db\":\"inventory\",\"__op\":\"r\",\"__table\":\"customers\"}")) // Addresses table - .assertNext((message) -> assertThat(payloadString(message)).isEqualTo( + .assertNext((payload) -> assertThat(payload).isEqualTo( "{\"id\":10,\"customer_id\":1001,\"street\":\"3183 Moore Avenue\",\"city\":\"Euless\",\"state\":\"Texas\",\"zip\":\"76036\",\"type\":\"SHIPPING\",\"__deleted\":\"false\",\"__name\":\"my-topic\",\"__db\":\"inventory\",\"__op\":\"r\",\"__table\":\"addresses\"}")) - .assertNext((message) -> assertThat(payloadString(message)).isEqualTo( + .assertNext((payload) -> assertThat(payload).isEqualTo( "{\"id\":11,\"customer_id\":1001,\"street\":\"2389 Hidden Valley Road\",\"city\":\"Harrisburg\",\"state\":\"Pennsylvania\",\"zip\":\"17116\",\"type\":\"BILLING\",\"__deleted\":\"false\",\"__name\":\"my-topic\",\"__db\":\"inventory\",\"__op\":\"r\",\"__table\":\"addresses\"}")) - .assertNext((message) -> assertThat(payloadString(message)).isEqualTo( + .assertNext((payload) -> assertThat(payload).isEqualTo( "{\"id\":12,\"customer_id\":1002,\"street\":\"281 Riverside Drive\",\"city\":\"Augusta\",\"state\":\"Georgia\",\"zip\":\"30901\",\"type\":\"BILLING\",\"__deleted\":\"false\",\"__name\":\"my-topic\",\"__db\":\"inventory\",\"__op\":\"r\",\"__table\":\"addresses\"}")) - .assertNext((message) -> assertThat(payloadString(message)).isEqualTo( + .assertNext((payload) -> assertThat(payload).isEqualTo( "{\"id\":13,\"customer_id\":1003,\"street\":\"3787 Brownton Road\",\"city\":\"Columbus\",\"state\":\"Mississippi\",\"zip\":\"39701\",\"type\":\"SHIPPING\",\"__deleted\":\"false\",\"__name\":\"my-topic\",\"__db\":\"inventory\",\"__op\":\"r\",\"__table\":\"addresses\"}")) - .assertNext((message) -> assertThat(payloadString(message)).isEqualTo( + .assertNext((payload) -> assertThat(payload).isEqualTo( "{\"id\":14,\"customer_id\":1003,\"street\":\"2458 Lost Creek Road\",\"city\":\"Bethlehem\",\"state\":\"Pennsylvania\",\"zip\":\"18018\",\"type\":\"SHIPPING\",\"__deleted\":\"false\",\"__name\":\"my-topic\",\"__db\":\"inventory\",\"__op\":\"r\",\"__table\":\"addresses\"}")) - .assertNext((message) -> assertThat(payloadString(message)).isEqualTo( + .assertNext((payload) -> assertThat(payload).isEqualTo( "{\"id\":15,\"customer_id\":1003,\"street\":\"4800 Simpson Square\",\"city\":\"Hillsdale\",\"state\":\"Oklahoma\",\"zip\":\"73743\",\"type\":\"BILLING\",\"__deleted\":\"false\",\"__name\":\"my-topic\",\"__db\":\"inventory\",\"__op\":\"r\",\"__table\":\"addresses\"}")) - .assertNext((message) -> assertThat(payloadString(message)).isEqualTo( + .assertNext((payload) -> assertThat(payload).isEqualTo( "{\"id\":16,\"customer_id\":1004,\"street\":\"1289 University Hill Road\",\"city\":\"Canehill\",\"state\":\"Arkansas\",\"zip\":\"72717\",\"type\":\"LIVING\",\"__deleted\":\"false\",\"__name\":\"my-topic\",\"__db\":\"inventory\",\"__op\":\"r\",\"__table\":\"addresses\"}")) .thenCancel() - .verify(); + .verify(Duration.ofSeconds(30)); } - private String payloadString(Message message) { - return new String((byte[]) message.getPayload()); - } - @SpringBootApplication(exclude = { MongoAutoConfiguration.class }) @Import({ TestJdbcTemplateConfiguration.class }) static class DebeziumSupplierTestApplication { diff --git a/supplier/spring-debezium-supplier/src/test/resources/logback-test.xml b/supplier/spring-debezium-supplier/src/test/resources/logback-test.xml index 227674db..0b3c2f82 100644 --- a/supplier/spring-debezium-supplier/src/test/resources/logback-test.xml +++ b/supplier/spring-debezium-supplier/src/test/resources/logback-test.xml @@ -21,10 +21,11 @@ - + - + + From b8a735e575a665df540fbf2cc44ed2de77c39308 Mon Sep 17 00:00:00 2001 From: Artem Bilan Date: Fri, 23 Feb 2024 15:13:04 -0500 Subject: [PATCH 2/2] Fix typo in the `DebeziumSupplierIntegrationTests` --- .../debezium/it/supplier/DebeziumSupplierIntegrationTests.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/supplier/spring-debezium-supplier/src/test/java/org/springframework/cloud/fn/supplier/debezium/it/supplier/DebeziumSupplierIntegrationTests.java b/supplier/spring-debezium-supplier/src/test/java/org/springframework/cloud/fn/supplier/debezium/it/supplier/DebeziumSupplierIntegrationTests.java index 89a69c79..d96f1309 100644 --- a/supplier/spring-debezium-supplier/src/test/java/org/springframework/cloud/fn/supplier/debezium/it/supplier/DebeziumSupplierIntegrationTests.java +++ b/supplier/spring-debezium-supplier/src/test/java/org/springframework/cloud/fn/supplier/debezium/it/supplier/DebeziumSupplierIntegrationTests.java @@ -51,7 +51,7 @@ "debezium.properties.schema.history.internal=io.debezium.relational.history.MemorySchemaHistory", "debezium.properties.offset.storage=org.apache.kafka.connect.storage.MemoryOffsetBackingStore", - // Drop schema from the payload payload. + // Drop schema from the message payload. "debezium.properties.key.converter.schemas.enable=false", "debezium.properties.value.converter.schemas.enable=false",