diff --git a/common/spring-debezium-autoconfigure/src/main/java/org/springframework/cloud/fn/common/debezium/DebeziumEngineBuilderAutoConfiguration.java b/common/spring-debezium-autoconfigure/src/main/java/org/springframework/cloud/fn/common/debezium/DebeziumEngineBuilderAutoConfiguration.java index 854dde6e..67d4fe79 100644 --- a/common/spring-debezium-autoconfigure/src/main/java/org/springframework/cloud/fn/common/debezium/DebeziumEngineBuilderAutoConfiguration.java +++ b/common/spring-debezium-autoconfigure/src/main/java/org/springframework/cloud/fn/common/debezium/DebeziumEngineBuilderAutoConfiguration.java @@ -1,5 +1,5 @@ /* - * Copyright 2023-2023 the original author or authors. + * Copyright 2023-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,7 +17,6 @@ package org.springframework.cloud.fn.common.debezium; import java.time.Clock; -import java.time.Duration; import java.util.Objects; import io.debezium.engine.ChangeEvent; @@ -40,7 +39,6 @@ import org.springframework.cloud.fn.common.debezium.DebeziumProperties.DebeziumFormat; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Conditional; -import org.springframework.core.Ordered; import org.springframework.core.annotation.Order; /** @@ -63,6 +61,7 @@ * * @author Christian Tzolov * @author Corneil du Plessis + * @author Artem Bilan */ @AutoConfiguration @EnableConfigurationProperties(DebeziumProperties.class) @@ -75,9 +74,10 @@ public class DebeziumEngineBuilderAutoConfiguration { /** * The fully-qualified class name of the commit policy type. The default is a periodic * commit policy based upon time intervals. - * @param properties The 'debezium.properties.offset.flush.interval.ms' configuration + * @param properties the 'debezium.properties.offset.flush.interval.ms' configuration * is compulsory for the Periodic policy type. The ALWAYS and DEFAULT doesn't require * additional configuration. + * @return {@link OffsetCommitPolicy} bean */ @Bean @ConditionalOnMissingBean @@ -98,7 +98,7 @@ public OffsetCommitPolicy offsetCommitPolicy(DebeziumProperties properties) { * Use the specified clock when needing to determine the current time. Defaults to * {@link Clock#systemDefaultZone() system clock}, but you can override the Bean in * your configuration with you {@link Clock implementation}. Returns - * @return Clock for the system default zone. + * @return {@link Clock} for the system default zone. */ @Bean @ConditionalOnMissingBean @@ -109,8 +109,8 @@ public Clock debeziumClock() { /** * When the engine's {@link DebeziumEngine#run()} method completes, call the supplied * function with the results. - * @return Default completion callback that logs the completion status. The bean can - * be overridden in custom implementation. + * @return {@link CompletionCallback} that logs the completion status. The bean can be + * overridden in custom implementation. */ @Bean @ConditionalOnMissingBean @@ -121,7 +121,8 @@ public CompletionCallback completionCallback() { /** * During the engine run, provides feedback about the different stages according to * the completion state of each component running within the engine (connectors, tasks - * etc). The bean can be overridden in custom implementation. + * etc.). The bean can be overridden in custom implementation. + * @return {@link ConnectorCallback} */ @Bean @ConditionalOnMissingBean @@ -151,34 +152,19 @@ public DebeziumEngine.Builder> debeziumEngineBuilder .using((offsetCommitPolicy != NULL_OFFSET_COMMIT_POLICY) ? offsetCommitPolicy : null); } - /** - * Converts the {@link DebeziumFormat} enum into Debezium {@link SerializationFormat} - * class. - * @param debeziumFormat debezium format property. - */ private Class> serializationFormatClass(DebeziumFormat debeziumFormat) { - switch (debeziumFormat) { - case JSON: - return io.debezium.engine.format.JsonByteArray.class; - case AVRO: - return io.debezium.engine.format.Avro.class; - case PROTOBUF: - return io.debezium.engine.format.Protobuf.class; - default: - throw new IllegalArgumentException("Unknown debezium format: " + debeziumFormat); - } + return switch (debeziumFormat) { + case JSON -> io.debezium.engine.format.JsonByteArray.class; + case AVRO -> io.debezium.engine.format.Avro.class; + case PROTOBUF -> io.debezium.engine.format.Protobuf.class; + }; } /** * A callback function to be notified when the connector completes. */ - private static final CompletionCallback DEFAULT_COMPLETION_CALLBACK = new CompletionCallback() { - @Override - public void handle(boolean success, String message, Throwable error) { - logger.info(String.format("Debezium Engine completed with success:%s, message:%s ", success, message), - error); - } - }; + private static final CompletionCallback DEFAULT_COMPLETION_CALLBACK = (success, message, error) -> logger + .info(String.format("Debezium Engine completed with success:%s, message:%s ", success, message), error); /** * Callback function which informs users about the various stages a connector goes @@ -219,18 +205,16 @@ public void taskStopped() { /** * The policy that defines when the offsets should be committed to offset storage. */ - private static final OffsetCommitPolicy NULL_OFFSET_COMMIT_POLICY = new OffsetCommitPolicy() { - @Override - public boolean performCommit(long numberOfMessagesSinceLastCommit, Duration timeSinceLastCommit) { - throw new UnsupportedOperationException("Unimplemented method 'performCommit'"); - } + private static final OffsetCommitPolicy NULL_OFFSET_COMMIT_POLICY = (numberOfMessagesSinceLastCommit, + timeSinceLastCommit) -> { + throw new UnsupportedOperationException("Unimplemented method 'performCommit'"); }; /** * Determine if Debezium connector is available. This either kicks in if any debezium * connector is available. */ - @Order(Ordered.LOWEST_PRECEDENCE) + @Order static class OnDebeziumConnectorCondition extends AnyNestedCondition { OnDebeziumConnectorCondition() { diff --git a/common/spring-debezium-autoconfigure/src/main/java/org/springframework/cloud/fn/common/debezium/DebeziumProperties.java b/common/spring-debezium-autoconfigure/src/main/java/org/springframework/cloud/fn/common/debezium/DebeziumProperties.java index ad956e24..17d25b22 100644 --- a/common/spring-debezium-autoconfigure/src/main/java/org/springframework/cloud/fn/common/debezium/DebeziumProperties.java +++ b/common/spring-debezium-autoconfigure/src/main/java/org/springframework/cloud/fn/common/debezium/DebeziumProperties.java @@ -1,5 +1,5 @@ /* - * Copyright 2023-2023 the original author or authors. + * Copyright 2023-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -23,7 +23,10 @@ import org.springframework.boot.context.properties.ConfigurationProperties; /** + * Debezium engine auto-configuration properties. + * * @author Christian Tzolov + * @author Artem Bilan */ @ConfigurationProperties("debezium") public class DebeziumProperties { @@ -50,7 +53,7 @@ public enum DebeziumFormat { } public final String contentType() { - return contentType; + return this.contentType; } }; @@ -59,15 +62,15 @@ public final String contentType() { * Spring pass-trough wrapper for debezium configuration properties. All properties * with a 'debezium.properties.*' prefix are native Debezium properties. */ - private Map properties = new HashMap<>(); + private final Map properties = new HashMap<>(); /** - * {@link ChangeEvent} Key and Payload formats. Defaults to 'JSON'. + * {@code io.debezium.engine.ChangeEvent} Key and Payload formats. Defaults to 'JSON'. */ private DebeziumFormat payloadFormat = DebeziumFormat.JSON; /** - * {@link ChangeEvent} header format. Defaults to 'JSON'. + * {@code io.debezium.engine.ChangeEvent} header format. Defaults to 'JSON'. */ private DebeziumFormat headerFormat = DebeziumFormat.JSON; @@ -77,11 +80,11 @@ public final String contentType() { private DebeziumOffsetCommitPolicy offsetCommitPolicy = DebeziumOffsetCommitPolicy.DEFAULT; public Map getProperties() { - return properties; + return this.properties; } public DebeziumFormat getPayloadFormat() { - return payloadFormat; + return this.payloadFormat; } public void setPayloadFormat(DebeziumFormat format) { @@ -89,7 +92,7 @@ public void setPayloadFormat(DebeziumFormat format) { } public DebeziumFormat getHeaderFormat() { - return headerFormat; + return this.headerFormat; } public void setHeaderFormat(DebeziumFormat headerFormat) { @@ -118,7 +121,7 @@ public enum DebeziumOffsetCommitPolicy { } public DebeziumOffsetCommitPolicy getOffsetCommitPolicy() { - return offsetCommitPolicy; + return this.offsetCommitPolicy; } public void setOffsetCommitPolicy(DebeziumOffsetCommitPolicy offsetCommitPolicy) { @@ -126,12 +129,13 @@ public void setOffsetCommitPolicy(DebeziumOffsetCommitPolicy offsetCommitPolicy) } /** - * Converts the Spring Framework "debezium.properties.*" properties into native + * Convert the Spring Framework "debezium.properties.*" properties into native * Debezium configuration. + * @return the properties for Debezium native configuration */ public Properties getDebeziumNativeConfiguration() { Properties outProps = new java.util.Properties(); - outProps.putAll(this.getProperties()); + outProps.putAll(getProperties()); return outProps; } diff --git a/common/spring-debezium-autoconfigure/src/main/java/org/springframework/cloud/fn/common/debezium/EmbeddedEngineExecutorService.java b/common/spring-debezium-autoconfigure/src/main/java/org/springframework/cloud/fn/common/debezium/EmbeddedEngineExecutorService.java index 37406fc8..16e839bc 100644 --- a/common/spring-debezium-autoconfigure/src/main/java/org/springframework/cloud/fn/common/debezium/EmbeddedEngineExecutorService.java +++ b/common/spring-debezium-autoconfigure/src/main/java/org/springframework/cloud/fn/common/debezium/EmbeddedEngineExecutorService.java @@ -1,5 +1,5 @@ /* - * Copyright 2023-2023 the original author or authors. + * Copyright 2023-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -28,22 +28,23 @@ import org.springframework.context.SmartLifecycle; /** - * The Debezium Engine is designed to be submitted to an {@link Executor} or - * {@link ExecutorService} for execution by a single thread, and a running connector can - * be stopped either by calling {@link #stop()} from another thread or by interrupting the - * running thread (e.g., as is the case with {@link ExecutorService#shutdownNow()}). - * + * The Debezium Engine is designed to be submitted to an {@link ExecutorService} for + * execution by a single thread, and a running connector can be stopped either by calling + * {@link #stop()} from another thread or by interrupting the running thread (e.g., as is + * the case with {@link ExecutorService#shutdownNow()}). + *

* The EmbeddedEngineExecutorService provides a sample ExecutorService implementation * aligned with the Spring lifecycle. - * + *

* Note that the DebeziumReactiveConsumerConfiguration embeds an ExecutorService as part * of the Supplier<Flux<Message<?>>> configuration. * * @author Christian Tzolov + * @author Artem Bilan */ public class EmbeddedEngineExecutorService implements SmartLifecycle, AutoCloseable { - private static final Log logger = LogFactory.getLog(EmbeddedEngineExecutorService.class); + private static final Log LOGGER = LogFactory.getLog(EmbeddedEngineExecutorService.class); private final DebeziumEngine engine; @@ -58,7 +59,7 @@ public EmbeddedEngineExecutorService(DebeziumEngine engine) { @Override public void start() { - logger.info("Start Embedded Engine"); + LOGGER.info("Start Embedded Engine"); this.executor.execute(this.engine); this.running.set(true); } @@ -70,13 +71,13 @@ public void stop() { @Override public void close() { - logger.info("Stop Embedded Engine"); + LOGGER.info("Stop Embedded Engine"); try { this.engine.close(); this.running.set(false); } - catch (IOException e) { - logger.warn("Failed to close the Debezium Engine:", e); + catch (IOException ex) { + LOGGER.warn("Failed to close the Debezium Engine:", ex); } this.executor.shutdown(); } diff --git a/common/spring-debezium-autoconfigure/src/main/java/org/springframework/cloud/fn/common/debezium/package-info.java b/common/spring-debezium-autoconfigure/src/main/java/org/springframework/cloud/fn/common/debezium/package-info.java new file mode 100644 index 00000000..387a0042 --- /dev/null +++ b/common/spring-debezium-autoconfigure/src/main/java/org/springframework/cloud/fn/common/debezium/package-info.java @@ -0,0 +1,4 @@ +/** + * The common auto-configuration support for Debezium. + */ +package org.springframework.cloud.fn.common.debezium; diff --git a/common/spring-debezium-autoconfigure/src/test/java/org/springframework/cloud/fn/common/debezium/DebeziumEngineBuilderAutoConfigurationIntegrationTest.java b/common/spring-debezium-autoconfigure/src/test/java/org/springframework/cloud/fn/common/debezium/DebeziumEngineBuilderAutoConfigurationIntegrationTests.java similarity index 98% rename from common/spring-debezium-autoconfigure/src/test/java/org/springframework/cloud/fn/common/debezium/DebeziumEngineBuilderAutoConfigurationIntegrationTest.java rename to common/spring-debezium-autoconfigure/src/test/java/org/springframework/cloud/fn/common/debezium/DebeziumEngineBuilderAutoConfigurationIntegrationTests.java index 014987d0..c1a4f41a 100644 --- a/common/spring-debezium-autoconfigure/src/test/java/org/springframework/cloud/fn/common/debezium/DebeziumEngineBuilderAutoConfigurationIntegrationTest.java +++ b/common/spring-debezium-autoconfigure/src/test/java/org/springframework/cloud/fn/common/debezium/DebeziumEngineBuilderAutoConfigurationIntegrationTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2023-2023 the original author or authors. + * Copyright 2023-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -61,9 +61,9 @@ */ @Tag("integration") @Testcontainers -public class DebeziumEngineBuilderAutoConfigurationIntegrationTest { +public class DebeziumEngineBuilderAutoConfigurationIntegrationTests { - private static final Log logger = LogFactory.getLog(DebeziumEngineBuilderAutoConfigurationIntegrationTest.class); + private static final Log logger = LogFactory.getLog(DebeziumEngineBuilderAutoConfigurationIntegrationTests.class); private static final String DATABASE_NAME = "inventory"; @@ -123,7 +123,7 @@ public void consumerTest() { "debezium.properties.transforms.unwrap.drop.tombstones=false", "debezium.properties.transforms.unwrap.delete.handling.mode=rewrite", "debezium.properties.transforms.unwrap.add.fields=name,db") - .run(context -> { + .run((context) -> { JdbcTemplate jdbcTemplate = context.getBean(JdbcTemplate.class); DebeziumCustomConsumerApplication.TestDebeziumConsumer testConsumer = context diff --git a/common/spring-debezium-autoconfigure/src/test/java/org/springframework/cloud/fn/common/debezium/DebeziumEngineBuilderAutoConfigurationTests.java b/common/spring-debezium-autoconfigure/src/test/java/org/springframework/cloud/fn/common/debezium/DebeziumEngineBuilderAutoConfigurationTests.java index 90391969..f5a75c20 100644 --- a/common/spring-debezium-autoconfigure/src/test/java/org/springframework/cloud/fn/common/debezium/DebeziumEngineBuilderAutoConfigurationTests.java +++ b/common/spring-debezium-autoconfigure/src/test/java/org/springframework/cloud/fn/common/debezium/DebeziumEngineBuilderAutoConfigurationTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2023-2023 the original author or authors. + * Copyright 2023-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -39,25 +39,20 @@ public class DebeziumEngineBuilderAutoConfigurationTests { @Test void noConnectorNoProperty() { - this.contextRunner.run((context) -> { - assertThat(context).doesNotHaveBean(DebeziumEngine.Builder.class); - }); + this.contextRunner.run((context) -> assertThat(context).doesNotHaveBean(DebeziumEngine.Builder.class)); } @Test void noConnectorWithProperty() { this.contextRunner.withPropertyValues("debezium.properties.connector.class=Dummy") .withClassLoader(new FilteredClassLoader("io.debezium.connector")) - .run((context) -> { - assertThat(context).doesNotHaveBean(DebeziumEngine.Builder.class); - }); + .run((context) -> assertThat(context).doesNotHaveBean(DebeziumEngine.Builder.class)); } @Test void withConnectorWithProperty() { - this.contextRunner.withPropertyValues("debezium.properties.connector.class=Dummy").run((context) -> { - assertThat(context).hasSingleBean(DebeziumEngine.Builder.class); - }); + this.contextRunner.withPropertyValues("debezium.properties.connector.class=Dummy") + .run((context) -> assertThat(context).hasSingleBean(DebeziumEngine.Builder.class)); } } diff --git a/dependencies.gradle b/dependencies.gradle index d7f70408..d48abdc5 100644 --- a/dependencies.gradle +++ b/dependencies.gradle @@ -3,7 +3,7 @@ ext { springCloudVersion = '2023.0.0' springCloudAwsVersion = '3.0.4' - debeziumVersion = '2.4.2.Final' + debeziumVersion = '2.5.0.Final' protobufVersion='3.25.1' springIntegrationAws = 'org.springframework.integration:spring-integration-aws:3.0.4' diff --git a/publish-maven.gradle b/publish-maven.gradle index f655787b..606dfba8 100644 --- a/publish-maven.gradle +++ b/publish-maven.gradle @@ -29,28 +29,28 @@ publishing { developer { id = 'artembilan' name = 'Artem Bilan' - email = 'abilan@vmware.com' + email = 'artem.bilan@broadcom.com' roles = ['project lead'] } developer { id = 'corneil' name = 'Corneil du Plessis' - email = 'pcorneil@vmware.com' + email = 'corneil.du-plessis@broadcom.com' } developer { id = 'sobychacko' name = 'Soby Chacko' - email = 'chackos@vmware.com' + email = 'soby.chacko@broadcom.com' } developer { id = 'onobc' name = 'Chris Bono' - email = 'cbono@vmware.com' + email = 'chris.bono@broadcom.com' } developer { id = 'tzolov' name = 'Christian Tzolov' - email = 'ctzolov@vmware.com' + email = 'christian.tzolov@broadcom.com' } } issueManagement { diff --git a/supplier/spring-debezium-supplier/src/main/java/org/springframework/cloud/fn/supplier/debezium/DebeziumReactiveConsumerConfiguration.java b/supplier/spring-debezium-supplier/src/main/java/org/springframework/cloud/fn/supplier/debezium/DebeziumReactiveConsumerConfiguration.java index 677ff174..dcd0a58a 100644 --- a/supplier/spring-debezium-supplier/src/main/java/org/springframework/cloud/fn/supplier/debezium/DebeziumReactiveConsumerConfiguration.java +++ b/supplier/spring-debezium-supplier/src/main/java/org/springframework/cloud/fn/supplier/debezium/DebeziumReactiveConsumerConfiguration.java @@ -1,5 +1,5 @@ /* - * Copyright 2023-2023 the original author or authors. + * Copyright 2023-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,7 +17,6 @@ package org.springframework.cloud.fn.supplier.debezium; import java.lang.reflect.Field; -import java.util.Iterator; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -48,15 +47,17 @@ import org.springframework.util.MimeTypeUtils; /** + * The Debezium supplier auto-configuration. + * * @author Christian Tzolov * @author Artem Bilan */ @AutoConfiguration(after = DebeziumEngineBuilderAutoConfiguration.class) -@EnableConfigurationProperties({ DebeziumSupplierProperties.class }) +@EnableConfigurationProperties(DebeziumSupplierProperties.class) @ConditionalOnBean(DebeziumEngine.Builder.class) public class DebeziumReactiveConsumerConfiguration implements BeanClassLoaderAware { - private static final Log logger = LogFactory.getLog(DebeziumReactiveConsumerConfiguration.class); + private static final Log LOGGER = LogFactory.getLog(DebeziumReactiveConsumerConfiguration.class); /** * ORG_SPRINGFRAMEWORK_KAFKA_SUPPORT_KAFKA_NULL. @@ -72,7 +73,7 @@ public void setBeanClassLoader(ClassLoader classLoader) { Field field = clazz.getDeclaredField("INSTANCE"); this.kafkaNull = field.get(null); } - catch (ClassNotFoundException | NoSuchFieldException | IllegalAccessException e) { + catch (ClassNotFoundException | NoSuchFieldException | IllegalAccessException ex) { } } @@ -85,7 +86,7 @@ public void setBeanClassLoader(ClassLoader classLoader) { /** * Debezium Engine is designed to be submitted to an {@link ExecutorService} for * execution by a single thread, and a running connector can be stopped either by - * calling {@link #stop()} from another thread or by interrupting the running thread + * calling {@code stop()} from another thread or by interrupting the running thread * (e.g., as is the case with {@link ExecutorService#shutdownNow()}). */ private final ExecutorService debeziumExecutor = Executors.newSingleThreadExecutor(); @@ -102,8 +103,8 @@ public DebeziumEngine> debeziumEngine( public Supplier>> debeziumSupplier(DebeziumEngine> debeziumEngine) { return () -> this.eventSink.asFlux() - .doOnRequest(r -> debeziumExecutor.execute(debeziumEngine)) - .doOnTerminate(debeziumExecutor::shutdownNow); + .doOnRequest((r) -> this.debeziumExecutor.execute(debeziumEngine)) + .doOnTerminate(this.debeziumExecutor::shutdownNow); } @Bean @@ -111,7 +112,7 @@ public Supplier>> debeziumSupplier(DebeziumEngine> changeEventConsumer(DebeziumProperties engineProperties, DebeziumSupplierProperties supplierProperties) { - return new ChangeEventConsumer(engineProperties.getPayloadFormat().contentType(), + return new ChangeEventConsumer<>(engineProperties.getPayloadFormat().contentType(), supplierProperties.isCopyHeaders(), this.eventSink); } @@ -134,8 +135,8 @@ private ChangeEventConsumer(String contentType, boolean copyHeaders, Sinks.Many< @Override public void accept(ChangeEvent changeEvent) { - if (logger.isDebugEnabled()) { - logger.debug("[Debezium Event]: " + changeEvent.key()); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("[Debezium Event]: " + changeEvent.key()); } Object key = changeEvent.key(); @@ -156,7 +157,7 @@ public void accept(ChangeEvent changeEvent) { // If payload is still null ignore the message. if (payload == null) { - logger.info("Dropped null payload message"); + LOGGER.info("Dropped null payload message"); return; } @@ -170,9 +171,7 @@ public void accept(ChangeEvent changeEvent) { if (this.copyHeaders) { List> headers = changeEvent.headers(); if (headers != null && !headers.isEmpty()) { - Iterator> itr = headers.iterator(); - while (itr.hasNext()) { - Header header = itr.next(); + for (Header header : headers) { messageBuilder.setHeader(header.getKey(), header.getValue()); } } diff --git a/supplier/spring-debezium-supplier/src/main/java/org/springframework/cloud/fn/supplier/debezium/DebeziumSupplierProperties.java b/supplier/spring-debezium-supplier/src/main/java/org/springframework/cloud/fn/supplier/debezium/DebeziumSupplierProperties.java index 4cb07e0e..eb08a9f6 100644 --- a/supplier/spring-debezium-supplier/src/main/java/org/springframework/cloud/fn/supplier/debezium/DebeziumSupplierProperties.java +++ b/supplier/spring-debezium-supplier/src/main/java/org/springframework/cloud/fn/supplier/debezium/DebeziumSupplierProperties.java @@ -1,5 +1,5 @@ /* - * Copyright 2023-2023 the original author or authors. + * Copyright 2023-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,6 +19,8 @@ import org.springframework.boot.context.properties.ConfigurationProperties; /** + * Debezium supplier configuration properties. + * * @author Christian Tzolov */ @ConfigurationProperties("debezium.supplier") @@ -30,7 +32,7 @@ public class DebeziumSupplierProperties { private boolean copyHeaders = true; public boolean isCopyHeaders() { - return copyHeaders; + return this.copyHeaders; } public void setCopyHeaders(boolean copyHeaders) { diff --git a/supplier/spring-debezium-supplier/src/main/java/org/springframework/cloud/fn/supplier/debezium/package-info.java b/supplier/spring-debezium-supplier/src/main/java/org/springframework/cloud/fn/supplier/debezium/package-info.java new file mode 100644 index 00000000..ca919ebf --- /dev/null +++ b/supplier/spring-debezium-supplier/src/main/java/org/springframework/cloud/fn/supplier/debezium/package-info.java @@ -0,0 +1,4 @@ +/** + * The Debezium supplier support classes. + */ +package org.springframework.cloud.fn.supplier.debezium; diff --git a/supplier/spring-debezium-supplier/src/test/java/org/springframework/cloud/fn/supplier/debezium/it/supplier/DebeziumReactiveConsumerConfigurationTests.java b/supplier/spring-debezium-supplier/src/test/java/org/springframework/cloud/fn/supplier/debezium/it/supplier/DebeziumReactiveConsumerConfigurationTests.java index 07c7ef97..65da978f 100644 --- a/supplier/spring-debezium-supplier/src/test/java/org/springframework/cloud/fn/supplier/debezium/it/supplier/DebeziumReactiveConsumerConfigurationTests.java +++ b/supplier/spring-debezium-supplier/src/test/java/org/springframework/cloud/fn/supplier/debezium/it/supplier/DebeziumReactiveConsumerConfigurationTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2023-2023 the original author or authors. + * Copyright 2023-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,7 +20,6 @@ import org.junit.jupiter.api.Test; import org.springframework.boot.autoconfigure.AutoConfigurations; -import org.springframework.boot.test.context.FilteredClassLoader; import org.springframework.boot.test.context.runner.ApplicationContextRunner; import org.springframework.cloud.fn.common.debezium.DebeziumEngineBuilderAutoConfiguration; import org.springframework.cloud.fn.supplier.debezium.DebeziumReactiveConsumerConfiguration; @@ -42,16 +41,13 @@ public class DebeziumReactiveConsumerConfigurationTests { @Test void noConnectorNoProperty() { - this.contextRunner.run((context) -> { - assertThat(context).doesNotHaveBean(DebeziumEngine.class); - }); + this.contextRunner.run((context) -> assertThat(context).doesNotHaveBean(DebeziumEngine.class)); } @Test void withConnectorWithProperty() { - this.contextRunner.withPropertyValues("debezium.properties.connector.class=Dummy").run((context) -> { - assertThat(context).hasSingleBean(DebeziumEngine.class); - }); + this.contextRunner.withPropertyValues("debezium.properties.connector.class=Dummy") + .run((context) -> assertThat(context).hasSingleBean(DebeziumEngine.class)); } } diff --git a/supplier/spring-debezium-supplier/src/test/java/org/springframework/cloud/fn/supplier/debezium/it/supplier/DebeziumSupplierIntegrationTest.java b/supplier/spring-debezium-supplier/src/test/java/org/springframework/cloud/fn/supplier/debezium/it/supplier/DebeziumSupplierIntegrationTests.java similarity index 79% rename from supplier/spring-debezium-supplier/src/test/java/org/springframework/cloud/fn/supplier/debezium/it/supplier/DebeziumSupplierIntegrationTest.java rename to supplier/spring-debezium-supplier/src/test/java/org/springframework/cloud/fn/supplier/debezium/it/supplier/DebeziumSupplierIntegrationTests.java index 357811cc..e0fdeea2 100644 --- a/supplier/spring-debezium-supplier/src/test/java/org/springframework/cloud/fn/supplier/debezium/it/supplier/DebeziumSupplierIntegrationTest.java +++ b/supplier/spring-debezium-supplier/src/test/java/org/springframework/cloud/fn/supplier/debezium/it/supplier/DebeziumSupplierIntegrationTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2023-2023 the original author or authors. + * Copyright 2023-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -67,7 +67,7 @@ "app.datasource.driver-class-name=com.mysql.cj.jdbc.Driver", "app.datasource.type=com.zaxxer.hikari.HikariDataSource" }) @Testcontainers -public class DebeziumSupplierIntegrationTest { +public class DebeziumSupplierIntegrationTests { public static final String IMAGE_TAG = "2.3.3.Final"; @@ -111,33 +111,33 @@ void testDebeziumSupplier() { // Customers table .assertNext((message) -> assertThat(payloadString(message)).isEqualTo( - "{\"id\":1001,\"first_name\":\"Sally\",\"last_name\":\"Thomas\",\"email\":\"sally.thomas@acme.com\",\"__name\":\"my-topic\",\"__db\":\"inventory\",\"__op\":\"r\",\"__table\":\"customers\",\"__deleted\":\"false\"}")) + "{\"id\":1001,\"first_name\":\"Sally\",\"last_name\":\"Thomas\",\"email\":\"sally.thomas@acme.com\",\"__deleted\":\"false\",\"__name\":\"my-topic\",\"__db\":\"inventory\",\"__op\":\"r\",\"__table\":\"customers\"}")) .assertNext((message) -> assertThat(payloadString(message)).isEqualTo( - "{\"id\":1002,\"first_name\":\"George\",\"last_name\":\"Bailey\",\"email\":\"gbailey@foobar.com\",\"__name\":\"my-topic\",\"__db\":\"inventory\",\"__op\":\"r\",\"__table\":\"customers\",\"__deleted\":\"false\"}")) + "{\"id\":1002,\"first_name\":\"George\",\"last_name\":\"Bailey\",\"email\":\"gbailey@foobar.com\",\"__deleted\":\"false\",\"__name\":\"my-topic\",\"__db\":\"inventory\",\"__op\":\"r\",\"__table\":\"customers\"}")) .assertNext((message) -> assertThat(payloadString(message)).isEqualTo( - "{\"id\":1003,\"first_name\":\"Edward\",\"last_name\":\"Walker\",\"email\":\"ed@walker.com\",\"__name\":\"my-topic\",\"__db\":\"inventory\",\"__op\":\"r\",\"__table\":\"customers\",\"__deleted\":\"false\"}")) + "{\"id\":1003,\"first_name\":\"Edward\",\"last_name\":\"Walker\",\"email\":\"ed@walker.com\",\"__deleted\":\"false\",\"__name\":\"my-topic\",\"__db\":\"inventory\",\"__op\":\"r\",\"__table\":\"customers\"}")) .assertNext((message) -> assertThat(payloadString(message)).isEqualTo( - "{\"id\":1004,\"first_name\":\"Anne\",\"last_name\":\"Kretchmar\",\"email\":\"annek@noanswer.org\",\"__name\":\"my-topic\",\"__db\":\"inventory\",\"__op\":\"r\",\"__table\":\"customers\",\"__deleted\":\"false\"}")) + "{\"id\":1004,\"first_name\":\"Anne\",\"last_name\":\"Kretchmar\",\"email\":\"annek@noanswer.org\",\"__deleted\":\"false\",\"__name\":\"my-topic\",\"__db\":\"inventory\",\"__op\":\"r\",\"__table\":\"customers\"}")) // NEW Customer Insert .assertNext((message) -> assertThat(payloadString(message)).isEqualTo( - "{\"id\":1005,\"first_name\":\"Test666\",\"last_name\":\"Test666\",\"email\":\"Test666@spring.org\",\"__name\":\"my-topic\",\"__db\":\"inventory\",\"__op\":\"r\",\"__table\":\"customers\",\"__deleted\":\"false\"}")) + "{\"id\":1005,\"first_name\":\"Test666\",\"last_name\":\"Test666\",\"email\":\"Test666@spring.org\",\"__deleted\":\"false\",\"__name\":\"my-topic\",\"__db\":\"inventory\",\"__op\":\"r\",\"__table\":\"customers\"}")) // Addresses table .assertNext((message) -> assertThat(payloadString(message)).isEqualTo( - "{\"id\":10,\"customer_id\":1001,\"street\":\"3183 Moore Avenue\",\"city\":\"Euless\",\"state\":\"Texas\",\"zip\":\"76036\",\"type\":\"SHIPPING\",\"__name\":\"my-topic\",\"__db\":\"inventory\",\"__op\":\"r\",\"__table\":\"addresses\",\"__deleted\":\"false\"}")) + "{\"id\":10,\"customer_id\":1001,\"street\":\"3183 Moore Avenue\",\"city\":\"Euless\",\"state\":\"Texas\",\"zip\":\"76036\",\"type\":\"SHIPPING\",\"__deleted\":\"false\",\"__name\":\"my-topic\",\"__db\":\"inventory\",\"__op\":\"r\",\"__table\":\"addresses\"}")) .assertNext((message) -> assertThat(payloadString(message)).isEqualTo( - "{\"id\":11,\"customer_id\":1001,\"street\":\"2389 Hidden Valley Road\",\"city\":\"Harrisburg\",\"state\":\"Pennsylvania\",\"zip\":\"17116\",\"type\":\"BILLING\",\"__name\":\"my-topic\",\"__db\":\"inventory\",\"__op\":\"r\",\"__table\":\"addresses\",\"__deleted\":\"false\"}")) + "{\"id\":11,\"customer_id\":1001,\"street\":\"2389 Hidden Valley Road\",\"city\":\"Harrisburg\",\"state\":\"Pennsylvania\",\"zip\":\"17116\",\"type\":\"BILLING\",\"__deleted\":\"false\",\"__name\":\"my-topic\",\"__db\":\"inventory\",\"__op\":\"r\",\"__table\":\"addresses\"}")) .assertNext((message) -> assertThat(payloadString(message)).isEqualTo( - "{\"id\":12,\"customer_id\":1002,\"street\":\"281 Riverside Drive\",\"city\":\"Augusta\",\"state\":\"Georgia\",\"zip\":\"30901\",\"type\":\"BILLING\",\"__name\":\"my-topic\",\"__db\":\"inventory\",\"__op\":\"r\",\"__table\":\"addresses\",\"__deleted\":\"false\"}")) + "{\"id\":12,\"customer_id\":1002,\"street\":\"281 Riverside Drive\",\"city\":\"Augusta\",\"state\":\"Georgia\",\"zip\":\"30901\",\"type\":\"BILLING\",\"__deleted\":\"false\",\"__name\":\"my-topic\",\"__db\":\"inventory\",\"__op\":\"r\",\"__table\":\"addresses\"}")) .assertNext((message) -> assertThat(payloadString(message)).isEqualTo( - "{\"id\":13,\"customer_id\":1003,\"street\":\"3787 Brownton Road\",\"city\":\"Columbus\",\"state\":\"Mississippi\",\"zip\":\"39701\",\"type\":\"SHIPPING\",\"__name\":\"my-topic\",\"__db\":\"inventory\",\"__op\":\"r\",\"__table\":\"addresses\",\"__deleted\":\"false\"}")) + "{\"id\":13,\"customer_id\":1003,\"street\":\"3787 Brownton Road\",\"city\":\"Columbus\",\"state\":\"Mississippi\",\"zip\":\"39701\",\"type\":\"SHIPPING\",\"__deleted\":\"false\",\"__name\":\"my-topic\",\"__db\":\"inventory\",\"__op\":\"r\",\"__table\":\"addresses\"}")) .assertNext((message) -> assertThat(payloadString(message)).isEqualTo( - "{\"id\":14,\"customer_id\":1003,\"street\":\"2458 Lost Creek Road\",\"city\":\"Bethlehem\",\"state\":\"Pennsylvania\",\"zip\":\"18018\",\"type\":\"SHIPPING\",\"__name\":\"my-topic\",\"__db\":\"inventory\",\"__op\":\"r\",\"__table\":\"addresses\",\"__deleted\":\"false\"}")) + "{\"id\":14,\"customer_id\":1003,\"street\":\"2458 Lost Creek Road\",\"city\":\"Bethlehem\",\"state\":\"Pennsylvania\",\"zip\":\"18018\",\"type\":\"SHIPPING\",\"__deleted\":\"false\",\"__name\":\"my-topic\",\"__db\":\"inventory\",\"__op\":\"r\",\"__table\":\"addresses\"}")) .assertNext((message) -> assertThat(payloadString(message)).isEqualTo( - "{\"id\":15,\"customer_id\":1003,\"street\":\"4800 Simpson Square\",\"city\":\"Hillsdale\",\"state\":\"Oklahoma\",\"zip\":\"73743\",\"type\":\"BILLING\",\"__name\":\"my-topic\",\"__db\":\"inventory\",\"__op\":\"r\",\"__table\":\"addresses\",\"__deleted\":\"false\"}")) + "{\"id\":15,\"customer_id\":1003,\"street\":\"4800 Simpson Square\",\"city\":\"Hillsdale\",\"state\":\"Oklahoma\",\"zip\":\"73743\",\"type\":\"BILLING\",\"__deleted\":\"false\",\"__name\":\"my-topic\",\"__db\":\"inventory\",\"__op\":\"r\",\"__table\":\"addresses\"}")) .assertNext((message) -> assertThat(payloadString(message)).isEqualTo( - "{\"id\":16,\"customer_id\":1004,\"street\":\"1289 University Hill Road\",\"city\":\"Canehill\",\"state\":\"Arkansas\",\"zip\":\"72717\",\"type\":\"LIVING\",\"__name\":\"my-topic\",\"__db\":\"inventory\",\"__op\":\"r\",\"__table\":\"addresses\",\"__deleted\":\"false\"}")) + "{\"id\":16,\"customer_id\":1004,\"street\":\"1289 University Hill Road\",\"city\":\"Canehill\",\"state\":\"Arkansas\",\"zip\":\"72717\",\"type\":\"LIVING\",\"__deleted\":\"false\",\"__name\":\"my-topic\",\"__db\":\"inventory\",\"__op\":\"r\",\"__table\":\"addresses\"}")) .thenCancel() .verify();