From 316fda6b15193d4acc22a1fbb397919bde2bf275 Mon Sep 17 00:00:00 2001 From: photowey Date: Fri, 5 Apr 2024 20:14:34 +0800 Subject: [PATCH 1/5] feat: Add: 1.runtime module; 2.Kafka engine; 3.components builder and service, Signed-off-by: photowey --- kafka-plus-core/pom.xml | 4 + .../core/clients/builder/AbstractBuilder.java | 40 +++++++ .../clients/builder/admin/AdminBuilder.java | 44 +++++++ .../builder/admin/AdminBuilderImpl.java | 110 ++++++++++++++++++ .../builder/admin/topic/NewTopicBuilder.java} | 6 +- .../admin/topic/NewTopicBuilderImpl.java | 26 +++++ .../builder/consumer/ConsumerBuilder.java | 56 +++++++++ .../builder/consumer/ConsumerBuilderImpl.java | 106 +++++++++++++++++ .../builder/producer/ProducerBuilder.java | 57 +++++++++ .../builder/producer/ProducerBuilderImpl.java | 106 +++++++++++++++++ .../photowey/kafka/plus/core/enums/Kafka.java | 26 +++++ kafka-plus-engine/pom.xml | 4 + .../kafka/plus/engine/AbstractEngine.java | 55 +++++++++ .../kafka/plus/engine/KafkaEngine.java | 33 ++++++ .../kafka/plus/engine/KafkaEngineAware.java | 10 +- .../kafka/plus/engine/KafkaEngineImpl.java | 48 ++++++++ .../plus/engine/holder/KafkaEngineHolder.java | 83 +++++++++++++ kafka-plus-runtime/pom.xml | 31 +++++ .../plus/runtime/service/AdminService.java | 31 +++++ .../plus/runtime/service/ConsumerService.java | 36 ++++++ .../plus/runtime/service/ProducerService.java | 36 ++++++ .../service/impl/AdminServiceImpl.java | 35 ++++++ .../service/impl/ConsumerServiceImpl.java | 35 ++++++ .../service/impl/ProducerServiceImpl.java | 34 ++++++ kafka-plus-runtime/src/main/resources/.Keep | 0 .../github/photowey/kafka/plus/runtime/.Keep | 0 kafka-plus-runtime/src/test/resources/.Keep | 0 pom.xml | 27 +++++ 28 files changed, 1072 insertions(+), 7 deletions(-) create mode 100644 kafka-plus-core/src/main/java/io/github/photowey/kafka/plus/core/clients/builder/AbstractBuilder.java create mode 100644 kafka-plus-core/src/main/java/io/github/photowey/kafka/plus/core/clients/builder/admin/AdminBuilder.java create mode 100644 kafka-plus-core/src/main/java/io/github/photowey/kafka/plus/core/clients/builder/admin/AdminBuilderImpl.java rename kafka-plus-core/src/main/java/io/github/photowey/kafka/plus/core/{domain/consumer/builder/ConsumerBuilder.java => clients/builder/admin/topic/NewTopicBuilder.java} (83%) create mode 100644 kafka-plus-core/src/main/java/io/github/photowey/kafka/plus/core/clients/builder/admin/topic/NewTopicBuilderImpl.java create mode 100644 kafka-plus-core/src/main/java/io/github/photowey/kafka/plus/core/clients/builder/consumer/ConsumerBuilder.java create mode 100644 kafka-plus-core/src/main/java/io/github/photowey/kafka/plus/core/clients/builder/consumer/ConsumerBuilderImpl.java create mode 100644 kafka-plus-core/src/main/java/io/github/photowey/kafka/plus/core/clients/builder/producer/ProducerBuilder.java create mode 100644 kafka-plus-core/src/main/java/io/github/photowey/kafka/plus/core/clients/builder/producer/ProducerBuilderImpl.java create mode 100644 kafka-plus-engine/src/main/java/io/github/photowey/kafka/plus/engine/AbstractEngine.java rename kafka-plus-core/src/main/java/io/github/photowey/kafka/plus/core/domain/producer/builder/ProducerBuilder.java => kafka-plus-engine/src/main/java/io/github/photowey/kafka/plus/engine/KafkaEngineAware.java (80%) create mode 100644 kafka-plus-engine/src/main/java/io/github/photowey/kafka/plus/engine/KafkaEngineImpl.java create mode 100644 kafka-plus-engine/src/main/java/io/github/photowey/kafka/plus/engine/holder/KafkaEngineHolder.java create mode 100644 kafka-plus-runtime/pom.xml create mode 100644 kafka-plus-runtime/src/main/java/io/github/photowey/kafka/plus/runtime/service/AdminService.java create mode 100644 kafka-plus-runtime/src/main/java/io/github/photowey/kafka/plus/runtime/service/ConsumerService.java create mode 100644 kafka-plus-runtime/src/main/java/io/github/photowey/kafka/plus/runtime/service/ProducerService.java create mode 100644 kafka-plus-runtime/src/main/java/io/github/photowey/kafka/plus/runtime/service/impl/AdminServiceImpl.java create mode 100644 kafka-plus-runtime/src/main/java/io/github/photowey/kafka/plus/runtime/service/impl/ConsumerServiceImpl.java create mode 100644 kafka-plus-runtime/src/main/java/io/github/photowey/kafka/plus/runtime/service/impl/ProducerServiceImpl.java create mode 100644 kafka-plus-runtime/src/main/resources/.Keep create mode 100644 kafka-plus-runtime/src/test/java/io/github/photowey/kafka/plus/runtime/.Keep create mode 100644 kafka-plus-runtime/src/test/resources/.Keep diff --git a/kafka-plus-core/pom.xml b/kafka-plus-core/pom.xml index a475cc0..5e34579 100644 --- a/kafka-plus-core/pom.xml +++ b/kafka-plus-core/pom.xml @@ -21,6 +21,10 @@ + + org.apache.kafka + kafka-clients + diff --git a/kafka-plus-core/src/main/java/io/github/photowey/kafka/plus/core/clients/builder/AbstractBuilder.java b/kafka-plus-core/src/main/java/io/github/photowey/kafka/plus/core/clients/builder/AbstractBuilder.java new file mode 100644 index 0000000..1b4de6c --- /dev/null +++ b/kafka-plus-core/src/main/java/io/github/photowey/kafka/plus/core/clients/builder/AbstractBuilder.java @@ -0,0 +1,40 @@ +/* + * Copyright © 2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.github.photowey.kafka.plus.core.clients.builder; + +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; + +/** + * {@code AbstractService} + * + * @author photowey + * @date 2024/04/05 + * @since 1.0.0 + */ +public abstract class AbstractBuilder { + + protected Map configs; + protected Properties props; + + protected void initConfigsIfNecessary() { + if (null == this.configs) { + this.configs = new HashMap<>(1 << 3); + } + } + +} \ No newline at end of file diff --git a/kafka-plus-core/src/main/java/io/github/photowey/kafka/plus/core/clients/builder/admin/AdminBuilder.java b/kafka-plus-core/src/main/java/io/github/photowey/kafka/plus/core/clients/builder/admin/AdminBuilder.java new file mode 100644 index 0000000..2d01019 --- /dev/null +++ b/kafka-plus-core/src/main/java/io/github/photowey/kafka/plus/core/clients/builder/admin/AdminBuilder.java @@ -0,0 +1,44 @@ +/* + * Copyright © 2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.github.photowey.kafka.plus.core.clients.builder.admin; + +import org.apache.kafka.clients.admin.Admin; + +import java.util.Map; +import java.util.Properties; +import java.util.function.Consumer; + +/** + * {@code AdminBuilder} + * + * @author photowey + * @date 2024/04/05 + * @since 1.0.0 + */ +public interface AdminBuilder { + + AdminBuilder boostrapServers(String bootstrapServers); + + AdminBuilder props(Properties props); + + AdminBuilder configs(Map configMap); + + AdminBuilder checkProps(Consumer fx); + + AdminBuilder checkConfigs(Consumer> fx); + + Admin create(); +} diff --git a/kafka-plus-core/src/main/java/io/github/photowey/kafka/plus/core/clients/builder/admin/AdminBuilderImpl.java b/kafka-plus-core/src/main/java/io/github/photowey/kafka/plus/core/clients/builder/admin/AdminBuilderImpl.java new file mode 100644 index 0000000..0f27f9d --- /dev/null +++ b/kafka-plus-core/src/main/java/io/github/photowey/kafka/plus/core/clients/builder/admin/AdminBuilderImpl.java @@ -0,0 +1,110 @@ +/* + * Copyright © 2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.github.photowey.kafka.plus.core.clients.builder.admin; + +import io.github.photowey.kafka.plus.core.clients.builder.AbstractBuilder; +import io.github.photowey.kafka.plus.core.enums.Kafka; +import org.apache.kafka.clients.admin.Admin; + +import java.util.Map; +import java.util.Properties; +import java.util.function.Consumer; + +/** + * {@code AdminBuilderImpl} + *

+ * Examples: + *

+ * Properties props = new Properties();
+ * Admin admin = new AdminBuilderImpl()
+ *    .props(props)
+ *    .checkProps((x) -> {})
+ *    .build();
+ * 
+ * + *
+ * Map configMap = new HashMap<>();
+ * Admin admin = new AdminBuilderImpl()
+ *    .configMap(configMap)
+ *    .checkMap((x) -> {})
+ *    .build();
+ * 
+ * + *
+ * String bootstrapServers = "localhost:9092";
+ * Admin admin = new AdminBuilderImpl()
+ *    .boostrapServers(bootstrapServers)
+ *    .checkMap((x) -> {})
+ *    .build();
+ * 
+ * + * @author photowey + * @date 2024/04/05 + * @since 1.0.0 + */ +public class AdminBuilderImpl extends AbstractBuilder implements AdminBuilder { + + @Override + public AdminBuilder boostrapServers(String bootstrapServers) { + super.initConfigsIfNecessary(); + super.configs.put(Kafka.Bootstrap.Server.ADDRESS.value(), bootstrapServers); + + return this; + } + + // ---------------------------------------------------------------- + + @Override + public AdminBuilder props(Properties props) { + super.props = props; + + return this; + } + + @Override + public AdminBuilder configs(Map configs) { + super.configs = configs; + + return this; + } + + // ---------------------------------------------------------------- + + @Override + public AdminBuilder checkProps(Consumer fx) { + fx.accept(super.props); + + return this; + } + + @Override + public AdminBuilder checkConfigs(Consumer> fx) { + fx.accept(super.configs); + + return this; + } + + // ---------------------------------------------------------------- + + @Override + public Admin create() { + if (null != super.props) { + return Admin.create(super.props); + } + + return Admin.create(super.configs); + } +} \ No newline at end of file diff --git a/kafka-plus-core/src/main/java/io/github/photowey/kafka/plus/core/domain/consumer/builder/ConsumerBuilder.java b/kafka-plus-core/src/main/java/io/github/photowey/kafka/plus/core/clients/builder/admin/topic/NewTopicBuilder.java similarity index 83% rename from kafka-plus-core/src/main/java/io/github/photowey/kafka/plus/core/domain/consumer/builder/ConsumerBuilder.java rename to kafka-plus-core/src/main/java/io/github/photowey/kafka/plus/core/clients/builder/admin/topic/NewTopicBuilder.java index b15c9c0..81fef1f 100644 --- a/kafka-plus-core/src/main/java/io/github/photowey/kafka/plus/core/domain/consumer/builder/ConsumerBuilder.java +++ b/kafka-plus-core/src/main/java/io/github/photowey/kafka/plus/core/clients/builder/admin/topic/NewTopicBuilder.java @@ -13,14 +13,14 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.github.photowey.kafka.plus.core.domain.consumer.builder; +package io.github.photowey.kafka.plus.core.clients.builder.admin.topic; /** - * {@code ConsumerBuilder} + * {@code NewTopicBuilder} * * @author photowey * @date 2024/04/05 * @since 1.0.0 */ -public interface ConsumerBuilder { +public interface NewTopicBuilder { } diff --git a/kafka-plus-core/src/main/java/io/github/photowey/kafka/plus/core/clients/builder/admin/topic/NewTopicBuilderImpl.java b/kafka-plus-core/src/main/java/io/github/photowey/kafka/plus/core/clients/builder/admin/topic/NewTopicBuilderImpl.java new file mode 100644 index 0000000..cca3aab --- /dev/null +++ b/kafka-plus-core/src/main/java/io/github/photowey/kafka/plus/core/clients/builder/admin/topic/NewTopicBuilderImpl.java @@ -0,0 +1,26 @@ +/* + * Copyright © 2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.github.photowey.kafka.plus.core.clients.builder.admin.topic; + +/** + * {@code NewTopicBuilderImpl} + * + * @author photowey + * @date 2024/04/05 + * @since 1.0.0 + */ +public class NewTopicBuilderImpl implements NewTopicBuilder { +} \ No newline at end of file diff --git a/kafka-plus-core/src/main/java/io/github/photowey/kafka/plus/core/clients/builder/consumer/ConsumerBuilder.java b/kafka-plus-core/src/main/java/io/github/photowey/kafka/plus/core/clients/builder/consumer/ConsumerBuilder.java new file mode 100644 index 0000000..e9f0fad --- /dev/null +++ b/kafka-plus-core/src/main/java/io/github/photowey/kafka/plus/core/clients/builder/consumer/ConsumerBuilder.java @@ -0,0 +1,56 @@ +/* + * Copyright © 2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.github.photowey.kafka.plus.core.clients.builder.consumer; + +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.serialization.Deserializer; + +import java.util.Map; +import java.util.Properties; +import java.util.function.Consumer; + +/** + * {@code ConsumerBuilder} + * + * @author photowey + * @date 2024/04/05 + * @since 1.0.0 + */ +public interface ConsumerBuilder { + + ConsumerBuilder boostrapServers(String bootstrapServers); + + // ---------------------------------------------------------------- + + ConsumerBuilder keyDeserializer(Deserializer keyDeserializer); + + ConsumerBuilder valueDeserializer(Deserializer valueDeserializer); + + // ---------------------------------------------------------------- + ConsumerBuilder props(Properties props); + + ConsumerBuilder configs(Map configs); + + // ---------------------------------------------------------------- + + ConsumerBuilder checkProps(Consumer fx); + + ConsumerBuilder checkConfigs(Consumer> fx); + + // ---------------------------------------------------------------- + + KafkaConsumer create(); +} diff --git a/kafka-plus-core/src/main/java/io/github/photowey/kafka/plus/core/clients/builder/consumer/ConsumerBuilderImpl.java b/kafka-plus-core/src/main/java/io/github/photowey/kafka/plus/core/clients/builder/consumer/ConsumerBuilderImpl.java new file mode 100644 index 0000000..882b9fc --- /dev/null +++ b/kafka-plus-core/src/main/java/io/github/photowey/kafka/plus/core/clients/builder/consumer/ConsumerBuilderImpl.java @@ -0,0 +1,106 @@ +/* + * Copyright © 2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.github.photowey.kafka.plus.core.clients.builder.consumer; + +import io.github.photowey.kafka.plus.core.clients.builder.AbstractBuilder; +import io.github.photowey.kafka.plus.core.enums.Kafka; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.serialization.Deserializer; + +import java.util.Map; +import java.util.Properties; +import java.util.function.Consumer; + +/** + * {@code ConsumerBuilderImpl} + * + * @author photowey + * @date 2024/04/05 + * @since 1.0.0 + */ +public class ConsumerBuilderImpl extends AbstractBuilder implements ConsumerBuilder { + + private Deserializer keyDeserializer; + private Deserializer valueDeserializer; + + @Override + public ConsumerBuilder boostrapServers(String bootstrapServers) { + super.initConfigsIfNecessary(); + super.configs.put(Kafka.Bootstrap.Server.ADDRESS.value(), bootstrapServers); + + return this; + } + + // ---------------------------------------------------------------- + + @Override + public ConsumerBuilder keyDeserializer(Deserializer keyDeserializer) { + this.keyDeserializer = keyDeserializer; + + return this; + } + + @Override + public ConsumerBuilder valueDeserializer(Deserializer valueDeserializer) { + this.valueDeserializer = valueDeserializer; + + return this; + } + + // ---------------------------------------------------------------- + + @Override + public ConsumerBuilder props(Properties props) { + super.props = props; + + return this; + } + + @Override + public ConsumerBuilder configs(Map configs) { + super.configs = configs; + + return this; + } + + // ---------------------------------------------------------------- + + @Override + public ConsumerBuilder checkProps(Consumer fx) { + fx.accept(super.props); + + return this; + } + + @Override + public ConsumerBuilder checkConfigs(Consumer> fx) { + fx.accept(super.configs); + + return this; + } + + // ---------------------------------------------------------------- + + @Override + @SuppressWarnings("unchecked") + public KafkaConsumer create() { + if (null != super.props) { + return new KafkaConsumer<>(super.props, (Deserializer) this.keyDeserializer, (Deserializer) this.valueDeserializer); + } + + return new KafkaConsumer<>(this.configs, (Deserializer) this.keyDeserializer, (Deserializer) this.valueDeserializer); + } +} \ No newline at end of file diff --git a/kafka-plus-core/src/main/java/io/github/photowey/kafka/plus/core/clients/builder/producer/ProducerBuilder.java b/kafka-plus-core/src/main/java/io/github/photowey/kafka/plus/core/clients/builder/producer/ProducerBuilder.java new file mode 100644 index 0000000..7f578fe --- /dev/null +++ b/kafka-plus-core/src/main/java/io/github/photowey/kafka/plus/core/clients/builder/producer/ProducerBuilder.java @@ -0,0 +1,57 @@ +/* + * Copyright © 2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.github.photowey.kafka.plus.core.clients.builder.producer; + +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.common.serialization.Serializer; + +import java.util.Map; +import java.util.Properties; +import java.util.function.Consumer; + +/** + * {@code ProducerBuilder} + * + * @author photowey + * @date 2024/04/05 + * @since 1.0.0 + */ +public interface ProducerBuilder { + + ProducerBuilder boostrapServers(String bootstrapServers); + + // ---------------------------------------------------------------- + + ProducerBuilder keySerializer(Serializer keySerializer); + + ProducerBuilder valueSerializer(Serializer valueSerializer); + + // ---------------------------------------------------------------- + ProducerBuilder props(Properties props); + + ProducerBuilder configs(Map configs); + + // ---------------------------------------------------------------- + + ProducerBuilder checkProps(Consumer fx); + + ProducerBuilder checkConfigs(Consumer> fx); + + // ---------------------------------------------------------------- + + KafkaProducer create(); + +} \ No newline at end of file diff --git a/kafka-plus-core/src/main/java/io/github/photowey/kafka/plus/core/clients/builder/producer/ProducerBuilderImpl.java b/kafka-plus-core/src/main/java/io/github/photowey/kafka/plus/core/clients/builder/producer/ProducerBuilderImpl.java new file mode 100644 index 0000000..bf1aa6b --- /dev/null +++ b/kafka-plus-core/src/main/java/io/github/photowey/kafka/plus/core/clients/builder/producer/ProducerBuilderImpl.java @@ -0,0 +1,106 @@ +/* + * Copyright © 2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.github.photowey.kafka.plus.core.clients.builder.producer; + +import io.github.photowey.kafka.plus.core.clients.builder.AbstractBuilder; +import io.github.photowey.kafka.plus.core.enums.Kafka; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.common.serialization.Serializer; + +import java.util.Map; +import java.util.Properties; +import java.util.function.Consumer; + +/** + * {@code ProducerBuilderImpl} + * + * @author photowey + * @date 2024/04/05 + * @since 1.0.0 + */ +public class ProducerBuilderImpl extends AbstractBuilder implements ProducerBuilder { + + private Serializer keySerializer; + private Serializer valueSerializer; + + @Override + public ProducerBuilder boostrapServers(String bootstrapServers) { + super.initConfigsIfNecessary(); + super.configs.put(Kafka.Bootstrap.Server.ADDRESS.value(), bootstrapServers); + + return this; + } + + // ---------------------------------------------------------------- + + @Override + public ProducerBuilder keySerializer(Serializer keySerializer) { + this.keySerializer = keySerializer; + + return null; + } + + @Override + public ProducerBuilder valueSerializer(Serializer valueSerializer) { + this.valueSerializer = valueSerializer; + + return null; + } + + // ---------------------------------------------------------------- + + @Override + public ProducerBuilder props(Properties props) { + super.props = props; + + return this; + } + + @Override + public ProducerBuilder configs(Map configs) { + super.configs = configs; + + return this; + } + + // ---------------------------------------------------------------- + + @Override + public ProducerBuilder checkProps(Consumer fx) { + fx.accept(super.props); + + return this; + } + + @Override + public ProducerBuilder checkConfigs(Consumer> fx) { + fx.accept(super.configs); + + return this; + } + + // ---------------------------------------------------------------- + + @Override + @SuppressWarnings("unchecked") + public KafkaProducer create() { + if (null != super.props) { + return new KafkaProducer<>(super.props, (Serializer) this.keySerializer, (Serializer) this.valueSerializer); + } + + return new KafkaProducer<>(super.configs, (Serializer) this.keySerializer, (Serializer) this.valueSerializer); + } +} \ No newline at end of file diff --git a/kafka-plus-core/src/main/java/io/github/photowey/kafka/plus/core/enums/Kafka.java b/kafka-plus-core/src/main/java/io/github/photowey/kafka/plus/core/enums/Kafka.java index 3e814dc..ce73772 100644 --- a/kafka-plus-core/src/main/java/io/github/photowey/kafka/plus/core/enums/Kafka.java +++ b/kafka-plus-core/src/main/java/io/github/photowey/kafka/plus/core/enums/Kafka.java @@ -15,6 +15,8 @@ */ package io.github.photowey.kafka.plus.core.enums; +import org.apache.kafka.clients.admin.AdminClientConfig; + /** * {@code Kafka} * @@ -35,6 +37,30 @@ public enum Mode { } + public enum Bootstrap { + + ; + + public enum Server { + + ADDRESS("bootstrap.servers", AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG), + + ; + + private String name; + private String value; + + Server(String name, String value) { + this.name = name; + this.value = value; + } + + public String value() { + return this.value; + } + } + } + public enum Consumer { ; diff --git a/kafka-plus-engine/pom.xml b/kafka-plus-engine/pom.xml index 7774217..486a80e 100644 --- a/kafka-plus-engine/pom.xml +++ b/kafka-plus-engine/pom.xml @@ -21,6 +21,10 @@ + + io.github.photowey + kafka-plus-runtime + diff --git a/kafka-plus-engine/src/main/java/io/github/photowey/kafka/plus/engine/AbstractEngine.java b/kafka-plus-engine/src/main/java/io/github/photowey/kafka/plus/engine/AbstractEngine.java new file mode 100644 index 0000000..20f3504 --- /dev/null +++ b/kafka-plus-engine/src/main/java/io/github/photowey/kafka/plus/engine/AbstractEngine.java @@ -0,0 +1,55 @@ +/* + * Copyright © 2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.github.photowey.kafka.plus.engine; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Supplier; + +/** + * {@code AbstractEngine} + * + * @author photowey + * @date 2024/04/05 + * @since 1.0.0 + */ +public abstract class AbstractEngine implements KafkaEngine { + + protected final ConcurrentHashMap, Object> sharedObjects = new ConcurrentHashMap<>(); + + // ---------------------------------------------------------------- + + public void setSharedObject(Class sharedType, T t) { + this.sharedObjects.put(sharedType, t); + } + + // ---------------------------------------------------------------- + + public T getSharedObject(Class sharedType) { + Object target = this.sharedObjects.get(sharedType); + return (T) target; + } + + public T getSharedObject(Class sharedType, Supplier fx) { + Object target = this.sharedObjects.computeIfAbsent(sharedType, (x) -> fx.get()); + return (T) target; + } + + // ---------------------------------------------------------------- + + public void cleanSharedObjects() { + this.sharedObjects.clear(); + } +} \ No newline at end of file diff --git a/kafka-plus-engine/src/main/java/io/github/photowey/kafka/plus/engine/KafkaEngine.java b/kafka-plus-engine/src/main/java/io/github/photowey/kafka/plus/engine/KafkaEngine.java index f1a9fb0..085c3e9 100644 --- a/kafka-plus-engine/src/main/java/io/github/photowey/kafka/plus/engine/KafkaEngine.java +++ b/kafka-plus-engine/src/main/java/io/github/photowey/kafka/plus/engine/KafkaEngine.java @@ -15,6 +15,10 @@ */ package io.github.photowey.kafka.plus.engine; +import io.github.photowey.kafka.plus.runtime.service.AdminService; +import io.github.photowey.kafka.plus.runtime.service.ConsumerService; +import io.github.photowey.kafka.plus.runtime.service.ProducerService; + /** * {@code KafkaEngine} * @@ -23,4 +27,33 @@ * @since 1.0.0 */ public interface KafkaEngine extends Engine { + + /** + * Create {@link AdminService} instance. + *
+     * try (Admin admin = KafkaEngineHolder.INSTANCE.kafkaEngine().adminService().createAdmin()
+     *        .boostrapServers("localhost:9092")
+     *        .checkMap((x) -> {})
+     *        .create()) {
+     *    // do something.
+     * }
+     * 
+ * + * @return {@link AdminService} + */ + AdminService adminService(); + + /** + * Create {@link ConsumerService} instance. + * + * @return {@link ConsumerService} + */ + ConsumerService consumerService(); + + /** + * Create {@link ProducerService} instance. + * + * @return {@link ProducerService} + */ + ProducerService producerService(); } diff --git a/kafka-plus-core/src/main/java/io/github/photowey/kafka/plus/core/domain/producer/builder/ProducerBuilder.java b/kafka-plus-engine/src/main/java/io/github/photowey/kafka/plus/engine/KafkaEngineAware.java similarity index 80% rename from kafka-plus-core/src/main/java/io/github/photowey/kafka/plus/core/domain/producer/builder/ProducerBuilder.java rename to kafka-plus-engine/src/main/java/io/github/photowey/kafka/plus/engine/KafkaEngineAware.java index 9960ff8..073291c 100644 --- a/kafka-plus-core/src/main/java/io/github/photowey/kafka/plus/core/domain/producer/builder/ProducerBuilder.java +++ b/kafka-plus-engine/src/main/java/io/github/photowey/kafka/plus/engine/KafkaEngineAware.java @@ -13,14 +13,16 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.github.photowey.kafka.plus.core.domain.producer.builder; +package io.github.photowey.kafka.plus.engine; /** - * {@code ProducerBuilder} + * {@code KafkaEngineAware} * * @author photowey * @date 2024/04/05 * @since 1.0.0 */ -public interface ProducerBuilder { -} \ No newline at end of file +public interface KafkaEngineAware { + + void setKafkaEngine(KafkaEngine kafkaEngine); +} diff --git a/kafka-plus-engine/src/main/java/io/github/photowey/kafka/plus/engine/KafkaEngineImpl.java b/kafka-plus-engine/src/main/java/io/github/photowey/kafka/plus/engine/KafkaEngineImpl.java new file mode 100644 index 0000000..b2b57ee --- /dev/null +++ b/kafka-plus-engine/src/main/java/io/github/photowey/kafka/plus/engine/KafkaEngineImpl.java @@ -0,0 +1,48 @@ +/* + * Copyright © 2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.github.photowey.kafka.plus.engine; + +import io.github.photowey.kafka.plus.runtime.service.AdminService; +import io.github.photowey.kafka.plus.runtime.service.ConsumerService; +import io.github.photowey.kafka.plus.runtime.service.ProducerService; +import io.github.photowey.kafka.plus.runtime.service.impl.AdminServiceImpl; +import io.github.photowey.kafka.plus.runtime.service.impl.ConsumerServiceImpl; +import io.github.photowey.kafka.plus.runtime.service.impl.ProducerServiceImpl; + +/** + * {@code KafkaEngineImpl} + * + * @author photowey + * @date 2024/04/05 + * @since 1.0.0 + */ +public class KafkaEngineImpl extends AbstractEngine { + + @Override + public AdminService adminService() { + return this.getSharedObject(AdminService.class, AdminServiceImpl::new); + } + + @Override + public ConsumerService consumerService() { + return this.getSharedObject(ConsumerService.class, ConsumerServiceImpl::new); + } + + @Override + public ProducerService producerService() { + return this.getSharedObject(ProducerService.class, ProducerServiceImpl::new); + } +} \ No newline at end of file diff --git a/kafka-plus-engine/src/main/java/io/github/photowey/kafka/plus/engine/holder/KafkaEngineHolder.java b/kafka-plus-engine/src/main/java/io/github/photowey/kafka/plus/engine/holder/KafkaEngineHolder.java new file mode 100644 index 0000000..9e6ed77 --- /dev/null +++ b/kafka-plus-engine/src/main/java/io/github/photowey/kafka/plus/engine/holder/KafkaEngineHolder.java @@ -0,0 +1,83 @@ +/* + * Copyright © 2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.github.photowey.kafka.plus.engine.holder; + +import io.github.photowey.kafka.plus.engine.KafkaEngine; +import io.github.photowey.kafka.plus.engine.KafkaEngineImpl; + +import java.io.Serializable; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +/** + * {@code KafkaEngineHolder} + * + * @author photowey + * @date 2024/04/05 + * @since 1.0.0 + */ +public enum KafkaEngineHolder { + + INSTANCE, + + ; + + private final AtomicBoolean initialized = new AtomicBoolean(false); + + private KafkaEngine kafkaEngine; + private final Lock lock = new ReentrantLock(); + + // ---------------------------------------------------------------- + + private static class KafkaEngineFactory implements Serializable { + private static final KafkaEngine INSTANCE = new KafkaEngineImpl(); + } + + // ---------------------------------------------------------------- + + public void kafkaEngine(KafkaEngine kafkaEngine) { + this.kafkaEngine(kafkaEngine, false); + } + + public void kafkaEngine(KafkaEngine kafkaEngine, boolean refresh) { + if (refresh) { + this.initialized.compareAndSet(false, true); + this.kafkaEngine = kafkaEngine; + + return; + } + + if (this.initialized.compareAndSet(false, true)) { + this.kafkaEngine = kafkaEngine; + } + } + + public KafkaEngine kafkaEngine() { + if (null == this.kafkaEngine) { + this.lock.lock(); + try { + if (null == this.kafkaEngine) { + this.kafkaEngine(KafkaEngineFactory.INSTANCE, true); + } + } finally { + this.lock.unlock(); + } + } + + return this.kafkaEngine; + } +} diff --git a/kafka-plus-runtime/pom.xml b/kafka-plus-runtime/pom.xml new file mode 100644 index 0000000..9200e4f --- /dev/null +++ b/kafka-plus-runtime/pom.xml @@ -0,0 +1,31 @@ + + + + 4.0.0 + + + io.github.photowey + kafka-plus + 3.7.0.1.0-SNAPSHOT + + + kafka-plus-runtime + + ${project.groupId}:${project.artifactId} + The runtime module of project kafka-plus + + + + + + + + + io.github.photowey + kafka-plus-core + + + + + \ No newline at end of file diff --git a/kafka-plus-runtime/src/main/java/io/github/photowey/kafka/plus/runtime/service/AdminService.java b/kafka-plus-runtime/src/main/java/io/github/photowey/kafka/plus/runtime/service/AdminService.java new file mode 100644 index 0000000..44c3421 --- /dev/null +++ b/kafka-plus-runtime/src/main/java/io/github/photowey/kafka/plus/runtime/service/AdminService.java @@ -0,0 +1,31 @@ +/* + * Copyright © 2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.github.photowey.kafka.plus.runtime.service; + +import io.github.photowey.kafka.plus.core.clients.builder.admin.AdminBuilder; + +/** + * {@code AdminService} + * + * @author photowey + * @date 2024/04/05 + * @since 1.0.0 + */ +public interface AdminService { + + AdminBuilder createAdmin(); + +} \ No newline at end of file diff --git a/kafka-plus-runtime/src/main/java/io/github/photowey/kafka/plus/runtime/service/ConsumerService.java b/kafka-plus-runtime/src/main/java/io/github/photowey/kafka/plus/runtime/service/ConsumerService.java new file mode 100644 index 0000000..96275e9 --- /dev/null +++ b/kafka-plus-runtime/src/main/java/io/github/photowey/kafka/plus/runtime/service/ConsumerService.java @@ -0,0 +1,36 @@ +/* + * Copyright © 2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.github.photowey.kafka.plus.runtime.service; + +import io.github.photowey.kafka.plus.core.clients.builder.consumer.ConsumerBuilder; + +/** + * {@code ConsumerService} + * + * @author photowey + * @date 2024/04/05 + * @since 1.0.0 + */ +public interface ConsumerService { + + /** + * Create {@link ConsumerBuilder} instance. + * + * @return {@link ConsumerBuilder} + */ + ConsumerBuilder createConsumer(); + +} diff --git a/kafka-plus-runtime/src/main/java/io/github/photowey/kafka/plus/runtime/service/ProducerService.java b/kafka-plus-runtime/src/main/java/io/github/photowey/kafka/plus/runtime/service/ProducerService.java new file mode 100644 index 0000000..52524a3 --- /dev/null +++ b/kafka-plus-runtime/src/main/java/io/github/photowey/kafka/plus/runtime/service/ProducerService.java @@ -0,0 +1,36 @@ +/* + * Copyright © 2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.github.photowey.kafka.plus.runtime.service; + +import io.github.photowey.kafka.plus.core.clients.builder.producer.ProducerBuilder; + +/** + * {@code ProducerService} + * + * @author photowey + * @date 2024/04/05 + * @since 1.0.0 + */ +public interface ProducerService { + + /** + * Create {@link ProducerBuilder} instance. + * + * @return {@link ProducerBuilder} + */ + ProducerBuilder createProducer(); + +} diff --git a/kafka-plus-runtime/src/main/java/io/github/photowey/kafka/plus/runtime/service/impl/AdminServiceImpl.java b/kafka-plus-runtime/src/main/java/io/github/photowey/kafka/plus/runtime/service/impl/AdminServiceImpl.java new file mode 100644 index 0000000..79e60f6 --- /dev/null +++ b/kafka-plus-runtime/src/main/java/io/github/photowey/kafka/plus/runtime/service/impl/AdminServiceImpl.java @@ -0,0 +1,35 @@ +/* + * Copyright © 2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.github.photowey.kafka.plus.runtime.service.impl; + +import io.github.photowey.kafka.plus.core.clients.builder.admin.AdminBuilder; +import io.github.photowey.kafka.plus.core.clients.builder.admin.AdminBuilderImpl; +import io.github.photowey.kafka.plus.runtime.service.AdminService; + +/** + * {@code AdminServiceImpl} + * + * @author photowey + * @date 2024/04/05 + * @since 1.0.0 + */ +public class AdminServiceImpl implements AdminService { + + @Override + public AdminBuilder createAdmin() { + return new AdminBuilderImpl(); + } +} \ No newline at end of file diff --git a/kafka-plus-runtime/src/main/java/io/github/photowey/kafka/plus/runtime/service/impl/ConsumerServiceImpl.java b/kafka-plus-runtime/src/main/java/io/github/photowey/kafka/plus/runtime/service/impl/ConsumerServiceImpl.java new file mode 100644 index 0000000..ebaf6db --- /dev/null +++ b/kafka-plus-runtime/src/main/java/io/github/photowey/kafka/plus/runtime/service/impl/ConsumerServiceImpl.java @@ -0,0 +1,35 @@ +/* + * Copyright © 2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.github.photowey.kafka.plus.runtime.service.impl; + +import io.github.photowey.kafka.plus.core.clients.builder.consumer.ConsumerBuilder; +import io.github.photowey.kafka.plus.core.clients.builder.consumer.ConsumerBuilderImpl; +import io.github.photowey.kafka.plus.runtime.service.ConsumerService; + +/** + * {@code ConsumerServiceImpl} + * + * @author photowey + * @date 2024/04/05 + * @since 1.0.0 + */ +public class ConsumerServiceImpl implements ConsumerService { + + @Override + public ConsumerBuilder createConsumer() { + return new ConsumerBuilderImpl(); + } +} \ No newline at end of file diff --git a/kafka-plus-runtime/src/main/java/io/github/photowey/kafka/plus/runtime/service/impl/ProducerServiceImpl.java b/kafka-plus-runtime/src/main/java/io/github/photowey/kafka/plus/runtime/service/impl/ProducerServiceImpl.java new file mode 100644 index 0000000..565eff2 --- /dev/null +++ b/kafka-plus-runtime/src/main/java/io/github/photowey/kafka/plus/runtime/service/impl/ProducerServiceImpl.java @@ -0,0 +1,34 @@ +/* + * Copyright © 2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.github.photowey.kafka.plus.runtime.service.impl; + +import io.github.photowey.kafka.plus.core.clients.builder.producer.ProducerBuilder; +import io.github.photowey.kafka.plus.runtime.service.ProducerService; + +/** + * {@code ProducerServiceImpl} + * + * @author photowey + * @date 2024/04/05 + * @since 1.0.0 + */ +public class ProducerServiceImpl implements ProducerService { + + @Override + public ProducerBuilder createProducer() { + return null; + } +} \ No newline at end of file diff --git a/kafka-plus-runtime/src/main/resources/.Keep b/kafka-plus-runtime/src/main/resources/.Keep new file mode 100644 index 0000000..e69de29 diff --git a/kafka-plus-runtime/src/test/java/io/github/photowey/kafka/plus/runtime/.Keep b/kafka-plus-runtime/src/test/java/io/github/photowey/kafka/plus/runtime/.Keep new file mode 100644 index 0000000..e69de29 diff --git a/kafka-plus-runtime/src/test/resources/.Keep b/kafka-plus-runtime/src/test/resources/.Keep new file mode 100644 index 0000000..e69de29 diff --git a/pom.xml b/pom.xml index 87af5fd..a224eb5 100644 --- a/pom.xml +++ b/pom.xml @@ -25,6 +25,7 @@ kafka-plus-core kafka-plus-engine + kafka-plus-runtime @@ -78,6 +79,32 @@ + + + + org.apache.kafka + kafka-clients + ${kafka.version} + + + + io.github.photowey + kafka-plus-core + ${project.version} + + + io.github.photowey + kafka-plus-engine + ${project.version} + + + io.github.photowey + kafka-plus-runtime + ${project.version} + + + + dev From 13d2da0cd16be1e1fec69fdc11db7766ffb506ce Mon Sep 17 00:00:00 2001 From: photowey Date: Fri, 5 Apr 2024 20:21:59 +0800 Subject: [PATCH 2/5] feat: Add: 1.KafkaEngineGetter, Signed-off-by: photowey --- .../kafka/plus/engine/AbstractEngine.java | 13 +++++++- .../kafka/plus/engine/KafkaEngineGetter.java | 33 +++++++++++++++++++ 2 files changed, 45 insertions(+), 1 deletion(-) create mode 100644 kafka-plus-engine/src/main/java/io/github/photowey/kafka/plus/engine/KafkaEngineGetter.java diff --git a/kafka-plus-engine/src/main/java/io/github/photowey/kafka/plus/engine/AbstractEngine.java b/kafka-plus-engine/src/main/java/io/github/photowey/kafka/plus/engine/AbstractEngine.java index 20f3504..f655fab 100644 --- a/kafka-plus-engine/src/main/java/io/github/photowey/kafka/plus/engine/AbstractEngine.java +++ b/kafka-plus-engine/src/main/java/io/github/photowey/kafka/plus/engine/AbstractEngine.java @@ -15,6 +15,8 @@ */ package io.github.photowey.kafka.plus.engine; +import io.github.photowey.kafka.plus.engine.holder.KafkaEngineHolder; + import java.util.concurrent.ConcurrentHashMap; import java.util.function.Supplier; @@ -43,7 +45,16 @@ public T getSharedObject(Class sharedType) { } public T getSharedObject(Class sharedType, Supplier fx) { - Object target = this.sharedObjects.computeIfAbsent(sharedType, (x) -> fx.get()); + Object target = this.sharedObjects.computeIfAbsent(sharedType, (x) -> { + T t = fx.get(); + // Inject KafkaEngine if necessary. + if (t instanceof KafkaEngineAware) { + ((KafkaEngineAware) t).setKafkaEngine(KafkaEngineHolder.INSTANCE.kafkaEngine()); + } + + return t; + }); + return (T) target; } diff --git a/kafka-plus-engine/src/main/java/io/github/photowey/kafka/plus/engine/KafkaEngineGetter.java b/kafka-plus-engine/src/main/java/io/github/photowey/kafka/plus/engine/KafkaEngineGetter.java new file mode 100644 index 0000000..cdeb4e0 --- /dev/null +++ b/kafka-plus-engine/src/main/java/io/github/photowey/kafka/plus/engine/KafkaEngineGetter.java @@ -0,0 +1,33 @@ +/* + * Copyright © 2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.github.photowey.kafka.plus.engine; + +/** + * {@code KafkaEngineGetter} + * + * @author photowey + * @date 2024/04/05 + * @since 1.0.0 + */ +public interface KafkaEngineGetter { + + /** + * Get {@link KafkaEngine} instance. + * + * @return {@link KafkaEngine} + */ + KafkaEngine kafkaEngine(); +} From 06e1e34d631268f4fe72e582c185a1631b59e775 Mon Sep 17 00:00:00 2001 From: photowey Date: Fri, 5 Apr 2024 21:36:35 +0800 Subject: [PATCH 3/5] feat: Add: 1.parameter check for component builders. Signed-off-by: photowey --- .../core/clients/builder/AbstractBuilder.java | 13 +++ .../clients/builder/admin/AdminBuilder.java | 2 +- .../builder/admin/AdminBuilderImpl.java | 5 +- .../builder/admin/topic/NewTopicBuilder.java | 17 ++++ .../admin/topic/NewTopicBuilderImpl.java | 95 +++++++++++++++++++ .../builder/consumer/ConsumerBuilder.java | 2 +- .../builder/consumer/ConsumerBuilderImpl.java | 5 +- .../builder/producer/ProducerBuilder.java | 3 +- .../builder/producer/ProducerBuilderImpl.java | 5 +- .../exception/KafkaPlusRuntimeException.java | 46 +++++++++ .../plus/runtime/service/AdminService.java | 12 +++ .../service/impl/AdminServiceImpl.java | 7 ++ 12 files changed, 205 insertions(+), 7 deletions(-) create mode 100644 kafka-plus-core/src/main/java/io/github/photowey/kafka/plus/core/exception/KafkaPlusRuntimeException.java diff --git a/kafka-plus-core/src/main/java/io/github/photowey/kafka/plus/core/clients/builder/AbstractBuilder.java b/kafka-plus-core/src/main/java/io/github/photowey/kafka/plus/core/clients/builder/AbstractBuilder.java index 1b4de6c..bf2c7fb 100644 --- a/kafka-plus-core/src/main/java/io/github/photowey/kafka/plus/core/clients/builder/AbstractBuilder.java +++ b/kafka-plus-core/src/main/java/io/github/photowey/kafka/plus/core/clients/builder/AbstractBuilder.java @@ -15,6 +15,8 @@ */ package io.github.photowey.kafka.plus.core.clients.builder; +import io.github.photowey.kafka.plus.core.exception.KafkaPlusRuntimeException; + import java.util.HashMap; import java.util.Map; import java.util.Properties; @@ -37,4 +39,15 @@ protected void initConfigsIfNecessary() { } } + protected void checkPropsIfNecessary() { + if (null == this.props || this.props.isEmpty()) { + throw new KafkaPlusRuntimeException("The props can't be null/empty"); + } + } + + protected void checkConfigsIfNecessary() { + if (null == this.configs || this.configs.isEmpty()) { + throw new KafkaPlusRuntimeException("The configs can't be null/empty"); + } + } } \ No newline at end of file diff --git a/kafka-plus-core/src/main/java/io/github/photowey/kafka/plus/core/clients/builder/admin/AdminBuilder.java b/kafka-plus-core/src/main/java/io/github/photowey/kafka/plus/core/clients/builder/admin/AdminBuilder.java index 2d01019..175aab0 100644 --- a/kafka-plus-core/src/main/java/io/github/photowey/kafka/plus/core/clients/builder/admin/AdminBuilder.java +++ b/kafka-plus-core/src/main/java/io/github/photowey/kafka/plus/core/clients/builder/admin/AdminBuilder.java @@ -40,5 +40,5 @@ public interface AdminBuilder { AdminBuilder checkConfigs(Consumer> fx); - Admin create(); + Admin build(); } diff --git a/kafka-plus-core/src/main/java/io/github/photowey/kafka/plus/core/clients/builder/admin/AdminBuilderImpl.java b/kafka-plus-core/src/main/java/io/github/photowey/kafka/plus/core/clients/builder/admin/AdminBuilderImpl.java index 0f27f9d..b6b119e 100644 --- a/kafka-plus-core/src/main/java/io/github/photowey/kafka/plus/core/clients/builder/admin/AdminBuilderImpl.java +++ b/kafka-plus-core/src/main/java/io/github/photowey/kafka/plus/core/clients/builder/admin/AdminBuilderImpl.java @@ -100,11 +100,14 @@ public AdminBuilder checkConfigs(Consumer> fx) { // ---------------------------------------------------------------- @Override - public Admin create() { + public Admin build() { if (null != super.props) { + this.checkPropsIfNecessary(); + return Admin.create(super.props); } + this.checkConfigsIfNecessary(); return Admin.create(super.configs); } } \ No newline at end of file diff --git a/kafka-plus-core/src/main/java/io/github/photowey/kafka/plus/core/clients/builder/admin/topic/NewTopicBuilder.java b/kafka-plus-core/src/main/java/io/github/photowey/kafka/plus/core/clients/builder/admin/topic/NewTopicBuilder.java index 81fef1f..9b467d4 100644 --- a/kafka-plus-core/src/main/java/io/github/photowey/kafka/plus/core/clients/builder/admin/topic/NewTopicBuilder.java +++ b/kafka-plus-core/src/main/java/io/github/photowey/kafka/plus/core/clients/builder/admin/topic/NewTopicBuilder.java @@ -15,6 +15,11 @@ */ package io.github.photowey.kafka.plus.core.clients.builder.admin.topic; +import org.apache.kafka.clients.admin.NewTopic; + +import java.util.List; +import java.util.Map; + /** * {@code NewTopicBuilder} * @@ -23,4 +28,16 @@ * @since 1.0.0 */ public interface NewTopicBuilder { + + NewTopicBuilder topic(String topic); + + NewTopicBuilder numPartitions(Integer numPartitions); + + NewTopicBuilder replicationFactor(Short replicationFactor); + + NewTopicBuilder replicasAssignments(Map> replicasAssignments); + + NewTopicBuilder configs(Map configs); + + NewTopic build(); } diff --git a/kafka-plus-core/src/main/java/io/github/photowey/kafka/plus/core/clients/builder/admin/topic/NewTopicBuilderImpl.java b/kafka-plus-core/src/main/java/io/github/photowey/kafka/plus/core/clients/builder/admin/topic/NewTopicBuilderImpl.java index cca3aab..c7d39ae 100644 --- a/kafka-plus-core/src/main/java/io/github/photowey/kafka/plus/core/clients/builder/admin/topic/NewTopicBuilderImpl.java +++ b/kafka-plus-core/src/main/java/io/github/photowey/kafka/plus/core/clients/builder/admin/topic/NewTopicBuilderImpl.java @@ -15,6 +15,13 @@ */ package io.github.photowey.kafka.plus.core.clients.builder.admin.topic; +import io.github.photowey.kafka.plus.core.exception.KafkaPlusRuntimeException; +import org.apache.kafka.clients.admin.NewTopic; + +import java.util.List; +import java.util.Map; +import java.util.Optional; + /** * {@code NewTopicBuilderImpl} * @@ -23,4 +30,92 @@ * @since 1.0.0 */ public class NewTopicBuilderImpl implements NewTopicBuilder { + + private String name; + private Integer numPartitions; + private Short replicationFactor; + private Map> replicasAssignments; + private Map configs = null; + + @Override + public NewTopicBuilder topic(String topic) { + this.name = topic; + + return this; + } + + @Override + public NewTopicBuilder numPartitions(Integer numPartitions) { + this.numPartitions = numPartitions; + + return this; + } + + @Override + public NewTopicBuilder replicationFactor(Short replicationFactor) { + this.replicationFactor = replicationFactor; + + return this; + } + + @Override + public NewTopicBuilder replicasAssignments(Map> replicasAssignments) { + this.replicasAssignments = replicasAssignments; + + return this; + } + + @Override + public NewTopicBuilder configs(Map configs) { + this.configs = configs; + + return this; + } + + @Override + public NewTopic build() { + this.check(); + + if (null != this.replicasAssignments) { + this.checkReplicasAssignmentsIfNecessary(); + + NewTopic newTopic = new NewTopic(this.name, this.replicasAssignments); + this.configs(newTopic, this.configs); + + return newTopic; + } + + NewTopic newTopic = new NewTopic(this.name, Optional.ofNullable(this.numPartitions), Optional.ofNullable(this.replicationFactor)); + this.configs(newTopic, this.configs); + + return newTopic; + } + + // ---------------------------------------------------------------- + + private void check() { + this.checkTopic(); + } + + // ---------------------------------------------------------------- + + private void checkTopic() { + if (null == this.name || this.name.isEmpty()) { + throw new KafkaPlusRuntimeException("The topic name can't be null/empty."); + } + } + + protected void checkReplicasAssignmentsIfNecessary() { + if (this.replicasAssignments.isEmpty()) { + throw new KafkaPlusRuntimeException("The replicasAssignments can't be empty"); + } + } + + // ---------------------------------------------------------------- + + private void configs(NewTopic topic, Map configs) { + if (null != configs) { + topic.configs(configs); + } + } } \ No newline at end of file diff --git a/kafka-plus-core/src/main/java/io/github/photowey/kafka/plus/core/clients/builder/consumer/ConsumerBuilder.java b/kafka-plus-core/src/main/java/io/github/photowey/kafka/plus/core/clients/builder/consumer/ConsumerBuilder.java index e9f0fad..2316bcb 100644 --- a/kafka-plus-core/src/main/java/io/github/photowey/kafka/plus/core/clients/builder/consumer/ConsumerBuilder.java +++ b/kafka-plus-core/src/main/java/io/github/photowey/kafka/plus/core/clients/builder/consumer/ConsumerBuilder.java @@ -52,5 +52,5 @@ public interface ConsumerBuilder { // ---------------------------------------------------------------- - KafkaConsumer create(); + KafkaConsumer build(); } diff --git a/kafka-plus-core/src/main/java/io/github/photowey/kafka/plus/core/clients/builder/consumer/ConsumerBuilderImpl.java b/kafka-plus-core/src/main/java/io/github/photowey/kafka/plus/core/clients/builder/consumer/ConsumerBuilderImpl.java index 882b9fc..34df66f 100644 --- a/kafka-plus-core/src/main/java/io/github/photowey/kafka/plus/core/clients/builder/consumer/ConsumerBuilderImpl.java +++ b/kafka-plus-core/src/main/java/io/github/photowey/kafka/plus/core/clients/builder/consumer/ConsumerBuilderImpl.java @@ -96,11 +96,14 @@ public ConsumerBuilder checkConfigs(Consumer> fx) { @Override @SuppressWarnings("unchecked") - public KafkaConsumer create() { + public KafkaConsumer build() { if (null != super.props) { + this.checkPropsIfNecessary(); + return new KafkaConsumer<>(super.props, (Deserializer) this.keyDeserializer, (Deserializer) this.valueDeserializer); } + this.checkConfigsIfNecessary(); return new KafkaConsumer<>(this.configs, (Deserializer) this.keyDeserializer, (Deserializer) this.valueDeserializer); } } \ No newline at end of file diff --git a/kafka-plus-core/src/main/java/io/github/photowey/kafka/plus/core/clients/builder/producer/ProducerBuilder.java b/kafka-plus-core/src/main/java/io/github/photowey/kafka/plus/core/clients/builder/producer/ProducerBuilder.java index 7f578fe..08f23e8 100644 --- a/kafka-plus-core/src/main/java/io/github/photowey/kafka/plus/core/clients/builder/producer/ProducerBuilder.java +++ b/kafka-plus-core/src/main/java/io/github/photowey/kafka/plus/core/clients/builder/producer/ProducerBuilder.java @@ -52,6 +52,5 @@ public interface ProducerBuilder { // ---------------------------------------------------------------- - KafkaProducer create(); - + KafkaProducer build(); } \ No newline at end of file diff --git a/kafka-plus-core/src/main/java/io/github/photowey/kafka/plus/core/clients/builder/producer/ProducerBuilderImpl.java b/kafka-plus-core/src/main/java/io/github/photowey/kafka/plus/core/clients/builder/producer/ProducerBuilderImpl.java index bf1aa6b..08f452c 100644 --- a/kafka-plus-core/src/main/java/io/github/photowey/kafka/plus/core/clients/builder/producer/ProducerBuilderImpl.java +++ b/kafka-plus-core/src/main/java/io/github/photowey/kafka/plus/core/clients/builder/producer/ProducerBuilderImpl.java @@ -96,11 +96,14 @@ public ProducerBuilder checkConfigs(Consumer> fx) { @Override @SuppressWarnings("unchecked") - public KafkaProducer create() { + public KafkaProducer build() { if (null != super.props) { + this.checkPropsIfNecessary(); + return new KafkaProducer<>(super.props, (Serializer) this.keySerializer, (Serializer) this.valueSerializer); } + this.checkConfigsIfNecessary(); return new KafkaProducer<>(super.configs, (Serializer) this.keySerializer, (Serializer) this.valueSerializer); } } \ No newline at end of file diff --git a/kafka-plus-core/src/main/java/io/github/photowey/kafka/plus/core/exception/KafkaPlusRuntimeException.java b/kafka-plus-core/src/main/java/io/github/photowey/kafka/plus/core/exception/KafkaPlusRuntimeException.java new file mode 100644 index 0000000..224db1d --- /dev/null +++ b/kafka-plus-core/src/main/java/io/github/photowey/kafka/plus/core/exception/KafkaPlusRuntimeException.java @@ -0,0 +1,46 @@ +/* + * Copyright © 2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.github.photowey.kafka.plus.core.exception; + +/** + * {@code KafkaPlusRuntimeException} + * + * @author photowey + * @date 2024/04/05 + * @since 1.0.0 + */ +public class KafkaPlusRuntimeException extends RuntimeException { + + public KafkaPlusRuntimeException() { + super(); + } + + public KafkaPlusRuntimeException(String message, Object... args) { + super(String.format(message, args)); + } + + public KafkaPlusRuntimeException(Throwable cause, String message, Object... args) { + super(String.format(message, args), cause); + } + + public KafkaPlusRuntimeException(Throwable cause) { + super(cause); + } + + protected KafkaPlusRuntimeException(Throwable cause, boolean enableSuppression, boolean writableStackTrace, String message, Object... args) { + super(String.format(message, args), cause, enableSuppression, writableStackTrace); + } +} \ No newline at end of file diff --git a/kafka-plus-runtime/src/main/java/io/github/photowey/kafka/plus/runtime/service/AdminService.java b/kafka-plus-runtime/src/main/java/io/github/photowey/kafka/plus/runtime/service/AdminService.java index 44c3421..7769090 100644 --- a/kafka-plus-runtime/src/main/java/io/github/photowey/kafka/plus/runtime/service/AdminService.java +++ b/kafka-plus-runtime/src/main/java/io/github/photowey/kafka/plus/runtime/service/AdminService.java @@ -16,6 +16,7 @@ package io.github.photowey.kafka.plus.runtime.service; import io.github.photowey.kafka.plus.core.clients.builder.admin.AdminBuilder; +import io.github.photowey.kafka.plus.core.clients.builder.admin.topic.NewTopicBuilder; /** * {@code AdminService} @@ -26,6 +27,17 @@ */ public interface AdminService { + /** + * Create {@link AdminBuilder} instance. + * + * @return {@link AdminBuilder} + */ AdminBuilder createAdmin(); + /** + * Create {@link NewTopicBuilder} instance. + * + * @return {@link NewTopicBuilder} + */ + NewTopicBuilder createTopic(); } \ No newline at end of file diff --git a/kafka-plus-runtime/src/main/java/io/github/photowey/kafka/plus/runtime/service/impl/AdminServiceImpl.java b/kafka-plus-runtime/src/main/java/io/github/photowey/kafka/plus/runtime/service/impl/AdminServiceImpl.java index 79e60f6..7db4a8e 100644 --- a/kafka-plus-runtime/src/main/java/io/github/photowey/kafka/plus/runtime/service/impl/AdminServiceImpl.java +++ b/kafka-plus-runtime/src/main/java/io/github/photowey/kafka/plus/runtime/service/impl/AdminServiceImpl.java @@ -17,6 +17,8 @@ import io.github.photowey.kafka.plus.core.clients.builder.admin.AdminBuilder; import io.github.photowey.kafka.plus.core.clients.builder.admin.AdminBuilderImpl; +import io.github.photowey.kafka.plus.core.clients.builder.admin.topic.NewTopicBuilder; +import io.github.photowey.kafka.plus.core.clients.builder.admin.topic.NewTopicBuilderImpl; import io.github.photowey.kafka.plus.runtime.service.AdminService; /** @@ -32,4 +34,9 @@ public class AdminServiceImpl implements AdminService { public AdminBuilder createAdmin() { return new AdminBuilderImpl(); } + + @Override + public NewTopicBuilder createTopic() { + return new NewTopicBuilderImpl(); + } } \ No newline at end of file From 8fa4e35bd3f1214bf3f7e1d4a093463d643f53af Mon Sep 17 00:00:00 2001 From: photowey Date: Sat, 6 Apr 2024 00:01:04 +0800 Subject: [PATCH 4/5] feat: Update: 1.Kafka components and add ut. Signed-off-by: photowey --- .../builder/admin/topic/NewTopicBuilder.java | 2 +- .../admin/topic/NewTopicBuilderImpl.java | 6 +- .../builder/consumer/ConsumerBuilder.java | 29 ++++- .../builder/consumer/ConsumerBuilderImpl.java | 70 +++++++++++- .../builder/producer/ProducerBuilder.java | 13 +++ .../builder/producer/ProducerBuilderImpl.java | 20 +++- .../photowey/kafka/plus/core/enums/Kafka.java | 88 ++++++++++++++- kafka-plus-engine/pom.xml | 7 +- .../github/photowey/kafka/plus/engine/.Keep | 0 .../kafka/plus/engine/AdminServiceTest.java | 73 ++++++++++++ .../plus/engine/ConsumerServiceTest.java | 106 ++++++++++++++++++ .../photowey/kafka/plus/engine/LocalTest.java | 66 +++++++++++ .../plus/engine/ProducerServiceTest.java | 105 +++++++++++++++++ .../service/impl/ProducerServiceImpl.java | 3 +- 14 files changed, 573 insertions(+), 15 deletions(-) delete mode 100644 kafka-plus-engine/src/test/java/io/github/photowey/kafka/plus/engine/.Keep create mode 100644 kafka-plus-engine/src/test/java/io/github/photowey/kafka/plus/engine/AdminServiceTest.java create mode 100644 kafka-plus-engine/src/test/java/io/github/photowey/kafka/plus/engine/ConsumerServiceTest.java create mode 100644 kafka-plus-engine/src/test/java/io/github/photowey/kafka/plus/engine/LocalTest.java create mode 100644 kafka-plus-engine/src/test/java/io/github/photowey/kafka/plus/engine/ProducerServiceTest.java diff --git a/kafka-plus-core/src/main/java/io/github/photowey/kafka/plus/core/clients/builder/admin/topic/NewTopicBuilder.java b/kafka-plus-core/src/main/java/io/github/photowey/kafka/plus/core/clients/builder/admin/topic/NewTopicBuilder.java index 9b467d4..2ca6cc2 100644 --- a/kafka-plus-core/src/main/java/io/github/photowey/kafka/plus/core/clients/builder/admin/topic/NewTopicBuilder.java +++ b/kafka-plus-core/src/main/java/io/github/photowey/kafka/plus/core/clients/builder/admin/topic/NewTopicBuilder.java @@ -33,7 +33,7 @@ public interface NewTopicBuilder { NewTopicBuilder numPartitions(Integer numPartitions); - NewTopicBuilder replicationFactor(Short replicationFactor); + NewTopicBuilder replicationFactor(Integer replicationFactor); NewTopicBuilder replicasAssignments(Map> replicasAssignments); diff --git a/kafka-plus-core/src/main/java/io/github/photowey/kafka/plus/core/clients/builder/admin/topic/NewTopicBuilderImpl.java b/kafka-plus-core/src/main/java/io/github/photowey/kafka/plus/core/clients/builder/admin/topic/NewTopicBuilderImpl.java index c7d39ae..f7844b3 100644 --- a/kafka-plus-core/src/main/java/io/github/photowey/kafka/plus/core/clients/builder/admin/topic/NewTopicBuilderImpl.java +++ b/kafka-plus-core/src/main/java/io/github/photowey/kafka/plus/core/clients/builder/admin/topic/NewTopicBuilderImpl.java @@ -52,8 +52,10 @@ public NewTopicBuilder numPartitions(Integer numPartitions) { } @Override - public NewTopicBuilder replicationFactor(Short replicationFactor) { - this.replicationFactor = replicationFactor; + public NewTopicBuilder replicationFactor(Integer replicationFactor) { + if (null != replicationFactor) { + this.replicationFactor = replicationFactor.shortValue(); + } return this; } diff --git a/kafka-plus-core/src/main/java/io/github/photowey/kafka/plus/core/clients/builder/consumer/ConsumerBuilder.java b/kafka-plus-core/src/main/java/io/github/photowey/kafka/plus/core/clients/builder/consumer/ConsumerBuilder.java index 2316bcb..c012660 100644 --- a/kafka-plus-core/src/main/java/io/github/photowey/kafka/plus/core/clients/builder/consumer/ConsumerBuilder.java +++ b/kafka-plus-core/src/main/java/io/github/photowey/kafka/plus/core/clients/builder/consumer/ConsumerBuilder.java @@ -15,11 +15,11 @@ */ package io.github.photowey.kafka.plus.core.clients.builder.consumer; +import io.github.photowey.kafka.plus.core.enums.Kafka; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.Deserializer; -import java.util.Map; -import java.util.Properties; +import java.util.*; import java.util.function.Consumer; /** @@ -33,6 +33,24 @@ public interface ConsumerBuilder { ConsumerBuilder boostrapServers(String bootstrapServers); + default ConsumerBuilder keyDeserializer(Class keyDeserializer) { + return this.keyDeserializer(keyDeserializer.getName()); + } + + default ConsumerBuilder valueDeserializer(Class valueDeserializer) { + return this.valueDeserializer(valueDeserializer.getName()); + } + + ConsumerBuilder keyDeserializer(String keyDeserializer); + + ConsumerBuilder valueDeserializer(String valueDeserializer); + + ConsumerBuilder autoOffsetReset(Kafka.Consumer.AutoOffsetReset offsetReset); + + ConsumerBuilder groupId(String groupId); + + ConsumerBuilder autoCommitEnabled(boolean enabled); + // ---------------------------------------------------------------- ConsumerBuilder keyDeserializer(Deserializer keyDeserializer); @@ -50,6 +68,13 @@ public interface ConsumerBuilder { ConsumerBuilder checkConfigs(Consumer> fx); + // ---------------------------------------------------------------- + default ConsumerBuilder subscribe(String... topics) { + return this.subscribe(new HashSet<>(Arrays.asList(topics))); + } + + ConsumerBuilder subscribe(Collection topics); + // ---------------------------------------------------------------- KafkaConsumer build(); diff --git a/kafka-plus-core/src/main/java/io/github/photowey/kafka/plus/core/clients/builder/consumer/ConsumerBuilderImpl.java b/kafka-plus-core/src/main/java/io/github/photowey/kafka/plus/core/clients/builder/consumer/ConsumerBuilderImpl.java index 34df66f..5d14323 100644 --- a/kafka-plus-core/src/main/java/io/github/photowey/kafka/plus/core/clients/builder/consumer/ConsumerBuilderImpl.java +++ b/kafka-plus-core/src/main/java/io/github/photowey/kafka/plus/core/clients/builder/consumer/ConsumerBuilderImpl.java @@ -20,6 +20,7 @@ import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.Deserializer; +import java.util.Collection; import java.util.Map; import java.util.Properties; import java.util.function.Consumer; @@ -36,6 +37,8 @@ public class ConsumerBuilderImpl extends AbstractBuilder implements ConsumerBuil private Deserializer keyDeserializer; private Deserializer valueDeserializer; + private Collection topics; + @Override public ConsumerBuilder boostrapServers(String bootstrapServers) { super.initConfigsIfNecessary(); @@ -44,6 +47,46 @@ public ConsumerBuilder boostrapServers(String bootstrapServers) { return this; } + @Override + public ConsumerBuilder keyDeserializer(String keyDeserializer) { + super.initConfigsIfNecessary(); + super.configs.put(Kafka.Consumer.KEY_DESERIALIZER.key(), keyDeserializer); + + return this; + } + + @Override + public ConsumerBuilder valueDeserializer(String valueDeserializer) { + super.initConfigsIfNecessary(); + super.configs.put(Kafka.Consumer.VALUE_DESERIALIZER.key(), valueDeserializer); + + return this; + } + + @Override + public ConsumerBuilder autoOffsetReset(Kafka.Consumer.AutoOffsetReset offsetReset) { + super.initConfigsIfNecessary(); + super.configs.put(Kafka.Consumer.AUTO_OFFSET_RESET.key(), offsetReset.value()); + + return this; + } + + @Override + public ConsumerBuilder groupId(String groupId) { + super.initConfigsIfNecessary(); + super.configs.put(Kafka.Consumer.GROUP_ID.key(), groupId); + + return this; + } + + @Override + public ConsumerBuilder autoCommitEnabled(boolean enabled) { + super.initConfigsIfNecessary(); + super.configs.put(Kafka.Consumer.AUTO_COMMIT_ENABLED.key(), String.valueOf(enabled)); + + return this; + } + // ---------------------------------------------------------------- @Override @@ -94,16 +137,39 @@ public ConsumerBuilder checkConfigs(Consumer> fx) { // ---------------------------------------------------------------- + @Override + public ConsumerBuilder subscribe(Collection topics) { + this.topics = topics; + + return this; + } + + // ---------------------------------------------------------------- + @Override @SuppressWarnings("unchecked") public KafkaConsumer build() { if (null != super.props) { this.checkPropsIfNecessary(); - return new KafkaConsumer<>(super.props, (Deserializer) this.keyDeserializer, (Deserializer) this.valueDeserializer); + KafkaConsumer consumer = new KafkaConsumer<>( + super.props, (Deserializer) this.keyDeserializer, (Deserializer) this.valueDeserializer); + this.subscribe(consumer, this.topics); + + return consumer; } this.checkConfigsIfNecessary(); - return new KafkaConsumer<>(this.configs, (Deserializer) this.keyDeserializer, (Deserializer) this.valueDeserializer); + KafkaConsumer consumer = new KafkaConsumer<>( + this.configs, (Deserializer) this.keyDeserializer, (Deserializer) this.valueDeserializer); + this.subscribe(consumer, this.topics); + + return consumer; + } + + private void subscribe(KafkaConsumer consumer, Collection topics) { + if (null != this.topics && !this.topics.isEmpty()) { + consumer.subscribe(topics); + } } } \ No newline at end of file diff --git a/kafka-plus-core/src/main/java/io/github/photowey/kafka/plus/core/clients/builder/producer/ProducerBuilder.java b/kafka-plus-core/src/main/java/io/github/photowey/kafka/plus/core/clients/builder/producer/ProducerBuilder.java index 08f23e8..21bdce1 100644 --- a/kafka-plus-core/src/main/java/io/github/photowey/kafka/plus/core/clients/builder/producer/ProducerBuilder.java +++ b/kafka-plus-core/src/main/java/io/github/photowey/kafka/plus/core/clients/builder/producer/ProducerBuilder.java @@ -37,8 +37,21 @@ public interface ProducerBuilder { ProducerBuilder keySerializer(Serializer keySerializer); + ProducerBuilder valueSerializer(Serializer valueSerializer); + default ProducerBuilder keySerializer(Class keySerializer) { + return this.keySerializer(keySerializer.getName()); + } + + default ProducerBuilder valueSerializer(Class valueSerializer) { + return this.valueSerializer(valueSerializer.getName()); + } + + ProducerBuilder keySerializer(String keySerializer); + + ProducerBuilder valueSerializer(String valueSerializer); + // ---------------------------------------------------------------- ProducerBuilder props(Properties props); diff --git a/kafka-plus-core/src/main/java/io/github/photowey/kafka/plus/core/clients/builder/producer/ProducerBuilderImpl.java b/kafka-plus-core/src/main/java/io/github/photowey/kafka/plus/core/clients/builder/producer/ProducerBuilderImpl.java index 08f452c..f9dc64c 100644 --- a/kafka-plus-core/src/main/java/io/github/photowey/kafka/plus/core/clients/builder/producer/ProducerBuilderImpl.java +++ b/kafka-plus-core/src/main/java/io/github/photowey/kafka/plus/core/clients/builder/producer/ProducerBuilderImpl.java @@ -50,14 +50,30 @@ public ProducerBuilder boostrapServers(String bootstrapServers) { public ProducerBuilder keySerializer(Serializer keySerializer) { this.keySerializer = keySerializer; - return null; + return this; } @Override public ProducerBuilder valueSerializer(Serializer valueSerializer) { this.valueSerializer = valueSerializer; - return null; + return this; + } + + @Override + public ProducerBuilder keySerializer(String keySerializer) { + super.initConfigsIfNecessary(); + super.configs.put(Kafka.Producer.KEY_SERIALIZER.key(), keySerializer); + + return this; + } + + @Override + public ProducerBuilder valueSerializer(String valueSerializer) { + super.initConfigsIfNecessary(); + super.configs.put(Kafka.Producer.VALUE_DESERIALIZER.key(), valueSerializer); + + return this; } // ---------------------------------------------------------------- diff --git a/kafka-plus-core/src/main/java/io/github/photowey/kafka/plus/core/enums/Kafka.java b/kafka-plus-core/src/main/java/io/github/photowey/kafka/plus/core/enums/Kafka.java index ce73772..15294e5 100644 --- a/kafka-plus-core/src/main/java/io/github/photowey/kafka/plus/core/enums/Kafka.java +++ b/kafka-plus-core/src/main/java/io/github/photowey/kafka/plus/core/enums/Kafka.java @@ -15,7 +15,10 @@ */ package io.github.photowey.kafka.plus.core.enums; +import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.producer.ProducerConfig; /** * {@code Kafka} @@ -43,18 +46,24 @@ public enum Bootstrap { public enum Server { - ADDRESS("bootstrap.servers", AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG), + ADDRESS(CommonClientConfigs.BOOTSTRAP_SERVERS_DOC, AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG), + DEFAULT_LOCALHOST("bootstrap.servers.default.localhost", "localhost:9092"), + DEFAULT_LOOPBACK("bootstrap.servers.default.loopback", "127.0.0.1:9092"), ; - private String name; + private String doc; private String value; - Server(String name, String value) { - this.name = name; + Server(String doc, String value) { + this.doc = doc; this.value = value; } + public String doc() { + return this.doc; + } + public String value() { return this.value; } @@ -63,12 +72,83 @@ public String value() { public enum Consumer { + KEY_DESERIALIZER(ConsumerConfig.KEY_DESERIALIZER_CLASS_DOC, ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG), + VALUE_DESERIALIZER(ConsumerConfig.VALUE_DESERIALIZER_CLASS_DOC, ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG), + + AUTO_OFFSET_RESET(ConsumerConfig.AUTO_OFFSET_RESET_DOC, ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), + + GROUP_ID(CommonClientConfigs.GROUP_ID_DOC, ConsumerConfig.GROUP_ID_CONFIG), + AUTO_COMMIT_ENABLED("If true the consumer's offset will be periodically committed in the background.", ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG), + ; + + private String doc; + private String key; + + Consumer(String doc, String key) { + this.doc = doc; + this.key = key; + } + + public String doc() { + return this.doc; + } + + public String key() { + return this.key; + } + + // ---------------------------------------------------------------- + + public enum AutoOffsetReset { + + EARLIEST("automatically reset the offset to the earliest offset", "earliest"), + LATEST("automatically reset the offset to the earliest offset", "latest"), + NONE("throw exception to the consumer if no previous offset is found for the consumer's group", "none"), + ANYTHING("throw exception to the consumer", "anything else"), + + ; + + private String doc; + private String value; + + AutoOffsetReset(String doc, String value) { + this.doc = doc; + this.value = value; + } + + public String doc() { + return this.doc; + } + + public String value() { + return this.value; + } + } } public enum Producer { + KEY_SERIALIZER(ProducerConfig.KEY_SERIALIZER_CLASS_DOC, ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG), + VALUE_DESERIALIZER(ProducerConfig.VALUE_SERIALIZER_CLASS_DOC, ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG), + ; + + private String doc; + private String key; + + Producer(String doc, String key) { + this.doc = doc; + this.key = key; + } + + public String doc() { + return this.doc; + } + + public String key() { + return this.key; + } } } diff --git a/kafka-plus-engine/pom.xml b/kafka-plus-engine/pom.xml index 486a80e..0738a2b 100644 --- a/kafka-plus-engine/pom.xml +++ b/kafka-plus-engine/pom.xml @@ -25,7 +25,12 @@ io.github.photowey kafka-plus-runtime - + + org.junit.jupiter + junit-jupiter-api + test + + \ No newline at end of file diff --git a/kafka-plus-engine/src/test/java/io/github/photowey/kafka/plus/engine/.Keep b/kafka-plus-engine/src/test/java/io/github/photowey/kafka/plus/engine/.Keep deleted file mode 100644 index e69de29..0000000 diff --git a/kafka-plus-engine/src/test/java/io/github/photowey/kafka/plus/engine/AdminServiceTest.java b/kafka-plus-engine/src/test/java/io/github/photowey/kafka/plus/engine/AdminServiceTest.java new file mode 100644 index 0000000..995b5e8 --- /dev/null +++ b/kafka-plus-engine/src/test/java/io/github/photowey/kafka/plus/engine/AdminServiceTest.java @@ -0,0 +1,73 @@ +/* + * Copyright © 2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.github.photowey.kafka.plus.engine; + +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.CreateTopicsResult; +import org.apache.kafka.clients.admin.DeleteTopicsResult; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.common.KafkaFuture; +import org.junit.jupiter.api.Assertions; + +import java.util.Collections; + +/** + * {@code AdminServiceTest} + * + * @author photowey + * @date 2024/04/05 + * @since 1.0.0 + */ +class AdminServiceTest extends LocalTest { + + //@Test + void testCreateTopic() throws Exception { + KafkaEngine kafkaEngine = super.kafkaEngine(); + try (Admin admin = super.kafkaEngine().adminService().createAdmin() + .boostrapServers(this.defaultBoostrapServers()) + .checkConfigs(super::testBoostrapServers) + .build()) { + + NewTopic topic = kafkaEngine.adminService().createTopic() + .topic(this.defaultTopic()) + .numPartitions(1) + .replicationFactor(1) + .build(); + + CreateTopicsResult topicsResult = admin.createTopics(Collections.singleton(topic)); + KafkaFuture f1 = topicsResult.all(); + f1.get(); + + Assertions.assertTrue(f1.isDone()); + } + } + + //@Test + void testDeleteTopic() throws Exception { + KafkaEngine kafkaEngine = super.kafkaEngine(); + try (Admin admin = kafkaEngine.adminService().createAdmin() + .boostrapServers(this.defaultBoostrapServers()) + .checkConfigs(super::testBoostrapServers) + .build()) { + + DeleteTopicsResult topicsResult = admin.deleteTopics(Collections.singleton(this.defaultTopic())); + KafkaFuture f1 = topicsResult.all(); + f1.get(); + + Assertions.assertTrue(f1.isDone()); + } + } +} \ No newline at end of file diff --git a/kafka-plus-engine/src/test/java/io/github/photowey/kafka/plus/engine/ConsumerServiceTest.java b/kafka-plus-engine/src/test/java/io/github/photowey/kafka/plus/engine/ConsumerServiceTest.java new file mode 100644 index 0000000..bd6ae16 --- /dev/null +++ b/kafka-plus-engine/src/test/java/io/github/photowey/kafka/plus/engine/ConsumerServiceTest.java @@ -0,0 +1,106 @@ +/* + * Copyright © 2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.github.photowey.kafka.plus.engine; + +import io.github.photowey.kafka.plus.core.enums.Kafka; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.junit.jupiter.api.Assertions; + +import java.time.Duration; +import java.util.Collections; + +/** + * {@code ConsumerServiceTest} + * + * @author photowey + * @date 2024/04/05 + * @since 1.0.0 + */ +class ConsumerServiceTest extends LocalTest { + + //@Test + void testConsumer() { + try (KafkaConsumer consumer = this.kafkaEngine().consumerService().createConsumer() + .boostrapServers(this.defaultBoostrapServers()) + .keyDeserializer(StringDeserializer.class) + .valueDeserializer(StringDeserializer.class) + .autoOffsetReset(Kafka.Consumer.AutoOffsetReset.EARLIEST) + .groupId(this.defaultGroup()) + .autoCommitEnabled(true) + .checkConfigs(super::testBoostrapServers) + .build()) { + + consumer.subscribe(Collections.singletonList(this.defaultTopic())); + + Assertions.assertNotNull(consumer); + } + } + + //@Test + void testConsumer_deserializer_class() { + try (KafkaConsumer consumer = this.kafkaEngine().consumerService().createConsumer() + .boostrapServers(this.defaultBoostrapServers()) + .keyDeserializer(StringDeserializer.class) + .valueDeserializer(StringDeserializer.class) + .autoOffsetReset(Kafka.Consumer.AutoOffsetReset.EARLIEST) + .groupId(this.defaultGroup()) + .autoCommitEnabled(true) + .checkConfigs(super::testBoostrapServers) + .build()) { + + consumer.subscribe(Collections.singletonList(this.defaultTopic())); + + for (int i = 0; i < 15; i++) { + + ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); + for (ConsumerRecord record : records) { + System.out.println("Key = " + record.key() + ", Value = " + record.value()); + } + + sleep(1_000L); + } + } + } + + //@Test + void testConsumer_deserializer_string() { + try (KafkaConsumer consumer = this.kafkaEngine().consumerService().createConsumer() + .boostrapServers(this.defaultBoostrapServers()) + .keyDeserializer(StringDeserializer.class.getName()) + .valueDeserializer(StringDeserializer.class.getName()) + .autoOffsetReset(Kafka.Consumer.AutoOffsetReset.EARLIEST) + .groupId(this.defaultGroup()) + .autoCommitEnabled(true) + .checkConfigs(super::testBoostrapServers) + .build()) { + + consumer.subscribe(Collections.singletonList(this.defaultTopic())); + + for (int i = 0; i < 15; i++) { + + ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); + for (ConsumerRecord record : records) { + System.out.println("Key = " + record.key() + ", Value = " + record.value()); + } + + sleep(1_000L); + } + } + } +} \ No newline at end of file diff --git a/kafka-plus-engine/src/test/java/io/github/photowey/kafka/plus/engine/LocalTest.java b/kafka-plus-engine/src/test/java/io/github/photowey/kafka/plus/engine/LocalTest.java new file mode 100644 index 0000000..164f63d --- /dev/null +++ b/kafka-plus-engine/src/test/java/io/github/photowey/kafka/plus/engine/LocalTest.java @@ -0,0 +1,66 @@ +/* + * Copyright © 2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.github.photowey.kafka.plus.engine; + +import io.github.photowey.kafka.plus.core.enums.Kafka; +import io.github.photowey.kafka.plus.engine.holder.KafkaEngineHolder; + +import java.util.Map; +import java.util.concurrent.TimeUnit; + +/** + * {@code LocalTest} + * + * @author photowey + * @date 2024/04/05 + * @since 1.0.0 + */ +public abstract class LocalTest { + + protected static final String DEFAULT_BOOTSTRAP_SERVERS = Kafka.Bootstrap.Server.DEFAULT_LOCALHOST.value(); + protected static final String DEFAULT_HELLO_WORLD_TOPIC = "io.github.photowey.topic.helloworld"; + protected static final String DEFAULT_HELLO_WORLD_GROUP = "io.github.photowey.group.helloworld"; + + protected KafkaEngine kafkaEngine() { + return KafkaEngineHolder.INSTANCE.kafkaEngine(); + } + + protected String defaultTopic() { + return DEFAULT_HELLO_WORLD_TOPIC; + } + + protected String defaultGroup() { + return DEFAULT_HELLO_WORLD_GROUP; + } + + protected String defaultBoostrapServers() { + return DEFAULT_BOOTSTRAP_SERVERS; + } + + protected void testBoostrapServers(Map configs) { + if (null == configs.get(Kafka.Bootstrap.Server.ADDRESS.value())) { + throw new RuntimeException("The bootstrap server address can't be none/empty"); + } + } + + protected static void sleep(long millis) { + try { + TimeUnit.MILLISECONDS.sleep(millis); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } +} \ No newline at end of file diff --git a/kafka-plus-engine/src/test/java/io/github/photowey/kafka/plus/engine/ProducerServiceTest.java b/kafka-plus-engine/src/test/java/io/github/photowey/kafka/plus/engine/ProducerServiceTest.java new file mode 100644 index 0000000..7fcf3a2 --- /dev/null +++ b/kafka-plus-engine/src/test/java/io/github/photowey/kafka/plus/engine/ProducerServiceTest.java @@ -0,0 +1,105 @@ +/* + * Copyright © 2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.github.photowey.kafka.plus.engine; + +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.StringSerializer; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +/** + * {@code ProducerServiceTest} + * + * @author photowey + * @date 2024/04/05 + * @since 1.0.0 + */ +class ProducerServiceTest extends LocalTest { + + @Test + void testProducer() { + StringSerializer keySerializer = new StringSerializer(); + StringSerializer valueSerializer = new StringSerializer(); + try (KafkaProducer producer = this.kafkaEngine().producerService().createProducer() + .boostrapServers(this.defaultBoostrapServers()) + .keySerializer(keySerializer) + .valueSerializer(valueSerializer) + .build()) { + + Assertions.assertNotNull(producer); + } + } + + //@Test + void testProducer_serializer_instance() { + StringSerializer keySerializer = new StringSerializer(); + StringSerializer valueSerializer = new StringSerializer(); + try (KafkaProducer producer = this.kafkaEngine().producerService().createProducer() + .boostrapServers(this.defaultBoostrapServers()) + .keySerializer(keySerializer) + .valueSerializer(valueSerializer) + .build()) { + + for (int i = 0; i < 100; i++) { + ProducerRecord record = new ProducerRecord<>( + this.defaultTopic(), "key-" + i, "value-" + i + ); + producer.send(record); + } + } + + sleep(1_000L); + } + + //@Test + void testProducer_serializer_class() { + try (KafkaProducer producer = this.kafkaEngine().producerService().createProducer() + .boostrapServers(this.defaultBoostrapServers()) + .keySerializer(StringSerializer.class) + .valueSerializer(StringSerializer.class) + .build()) { + + for (int i = 0; i < 100; i++) { + ProducerRecord record = new ProducerRecord<>( + this.defaultTopic(), "key-" + i, "value-" + i + ); + producer.send(record); + } + } + + sleep(1_000L); + } + + //@Test + void testProducer_serializer_string() { + try (KafkaProducer producer = this.kafkaEngine().producerService().createProducer() + .boostrapServers(this.defaultBoostrapServers()) + .keySerializer(StringSerializer.class.getName()) + .valueSerializer(StringSerializer.class.getName()) + .build()) { + + for (int i = 0; i < 100; i++) { + ProducerRecord record = new ProducerRecord<>( + this.defaultTopic(), "key-" + i, "value-" + i + ); + producer.send(record); + } + } + + sleep(1_000L); + } +} \ No newline at end of file diff --git a/kafka-plus-runtime/src/main/java/io/github/photowey/kafka/plus/runtime/service/impl/ProducerServiceImpl.java b/kafka-plus-runtime/src/main/java/io/github/photowey/kafka/plus/runtime/service/impl/ProducerServiceImpl.java index 565eff2..4d07372 100644 --- a/kafka-plus-runtime/src/main/java/io/github/photowey/kafka/plus/runtime/service/impl/ProducerServiceImpl.java +++ b/kafka-plus-runtime/src/main/java/io/github/photowey/kafka/plus/runtime/service/impl/ProducerServiceImpl.java @@ -16,6 +16,7 @@ package io.github.photowey.kafka.plus.runtime.service.impl; import io.github.photowey.kafka.plus.core.clients.builder.producer.ProducerBuilder; +import io.github.photowey.kafka.plus.core.clients.builder.producer.ProducerBuilderImpl; import io.github.photowey.kafka.plus.runtime.service.ProducerService; /** @@ -29,6 +30,6 @@ public class ProducerServiceImpl implements ProducerService { @Override public ProducerBuilder createProducer() { - return null; + return new ProducerBuilderImpl(); } } \ No newline at end of file From add10c086a136c1280bd5ae85971ccde54403c0c Mon Sep 17 00:00:00 2001 From: photowey Date: Sat, 6 Apr 2024 00:27:29 +0800 Subject: [PATCH 5/5] feat: Add: 1.Kafka ProducerRecord builder and ut. Signed-off-by: photowey --- .../builder/record/ProducerRecordBuilder.java | 43 +++++++ .../record/ProducerRecordBuilderImpl.java | 112 ++++++++++++++++++ .../kafka/plus/engine/AdminServiceTest.java | 18 ++- .../plus/engine/ConsumerServiceTest.java | 17 ++- .../plus/engine/ProducerServiceTest.java | 58 +++++++-- .../plus/runtime/service/ProducerService.java | 8 ++ .../service/impl/ProducerServiceImpl.java | 7 ++ 7 files changed, 244 insertions(+), 19 deletions(-) create mode 100644 kafka-plus-core/src/main/java/io/github/photowey/kafka/plus/core/clients/builder/record/ProducerRecordBuilder.java create mode 100644 kafka-plus-core/src/main/java/io/github/photowey/kafka/plus/core/clients/builder/record/ProducerRecordBuilderImpl.java diff --git a/kafka-plus-core/src/main/java/io/github/photowey/kafka/plus/core/clients/builder/record/ProducerRecordBuilder.java b/kafka-plus-core/src/main/java/io/github/photowey/kafka/plus/core/clients/builder/record/ProducerRecordBuilder.java new file mode 100644 index 0000000..a552d21 --- /dev/null +++ b/kafka-plus-core/src/main/java/io/github/photowey/kafka/plus/core/clients/builder/record/ProducerRecordBuilder.java @@ -0,0 +1,43 @@ +/* + * Copyright © 2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.github.photowey.kafka.plus.core.clients.builder.record; + +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.header.Header; + +/** + * {@code ProducerRecordBuilder} + * + * @author photowey + * @date 2024/04/06 + * @since 1.0.0 + */ +public interface ProducerRecordBuilder { + + ProducerRecordBuilder topic(String topic); + + ProducerRecordBuilder partition(Integer partition); + + ProducerRecordBuilder headers(Iterable
headers); + + ProducerRecordBuilder key(K key); + + ProducerRecordBuilder value(V value); + + ProducerRecordBuilder timestamp(Long timestamp); + + ProducerRecord build(); +} diff --git a/kafka-plus-core/src/main/java/io/github/photowey/kafka/plus/core/clients/builder/record/ProducerRecordBuilderImpl.java b/kafka-plus-core/src/main/java/io/github/photowey/kafka/plus/core/clients/builder/record/ProducerRecordBuilderImpl.java new file mode 100644 index 0000000..b0d1ede --- /dev/null +++ b/kafka-plus-core/src/main/java/io/github/photowey/kafka/plus/core/clients/builder/record/ProducerRecordBuilderImpl.java @@ -0,0 +1,112 @@ +/* + * Copyright © 2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.github.photowey.kafka.plus.core.clients.builder.record; + +import io.github.photowey.kafka.plus.core.clients.builder.AbstractBuilder; +import io.github.photowey.kafka.plus.core.exception.KafkaPlusRuntimeException; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.header.Header; + +/** + * {@code ProducerRecordBuilderImpl} + * + * @author photowey + * @date 2024/04/06 + * @since 1.0.0 + */ +public class ProducerRecordBuilderImpl extends AbstractBuilder implements ProducerRecordBuilder { + + private String topic; + private Integer partition; + private Iterable
headers; + private Object key; + private Object value; + private Long timestamp; + + @Override + public ProducerRecordBuilder topic(String topic) { + this.topic = topic; + + return this; + } + + @Override + public ProducerRecordBuilder partition(Integer partition) { + this.partition = partition; + + return this; + } + + @Override + public ProducerRecordBuilder headers(Iterable
headers) { + this.headers = headers; + + return this; + } + + @Override + public ProducerRecordBuilder key(K key) { + this.key = key; + + return this; + } + + @Override + public ProducerRecordBuilder value(V value) { + this.value = value; + + return this; + } + + @Override + public ProducerRecordBuilder timestamp(Long timestamp) { + this.timestamp = timestamp; + + return this; + } + + @Override + @SuppressWarnings("unchecked") + public ProducerRecord build() { + this.check(); + + return new ProducerRecord<>(this.topic, this.partition, this.timestamp, (K) this.key, (V) this.value, this.headers); + } + + private void check() { + this.checkTopic(); + this.checkKey(); + this.checkValue(); + } + + private void checkTopic() { + if (null == this.topic || this.topic.isEmpty()) { + throw new KafkaPlusRuntimeException("The topic name can't be null/empty."); + } + } + + private void checkKey() { + if (null == this.key) { + throw new KafkaPlusRuntimeException("The key can't be null."); + } + } + + private void checkValue() { + if (null == this.value) { + throw new KafkaPlusRuntimeException("The value can't be null."); + } + } +} \ No newline at end of file diff --git a/kafka-plus-engine/src/test/java/io/github/photowey/kafka/plus/engine/AdminServiceTest.java b/kafka-plus-engine/src/test/java/io/github/photowey/kafka/plus/engine/AdminServiceTest.java index 995b5e8..c26c29d 100644 --- a/kafka-plus-engine/src/test/java/io/github/photowey/kafka/plus/engine/AdminServiceTest.java +++ b/kafka-plus-engine/src/test/java/io/github/photowey/kafka/plus/engine/AdminServiceTest.java @@ -21,6 +21,7 @@ import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.common.KafkaFuture; import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; import java.util.Collections; @@ -33,10 +34,24 @@ */ class AdminServiceTest extends LocalTest { + @Test + void testAdmin() { + KafkaEngine kafkaEngine = super.kafkaEngine(); + + try (Admin admin = kafkaEngine.adminService().createAdmin() + .boostrapServers(this.defaultBoostrapServers()) + .checkConfigs(super::testBoostrapServers) + .build()) { + + Assertions.assertNotNull(admin); + } + } + //@Test void testCreateTopic() throws Exception { KafkaEngine kafkaEngine = super.kafkaEngine(); - try (Admin admin = super.kafkaEngine().adminService().createAdmin() + + try (Admin admin = kafkaEngine.adminService().createAdmin() .boostrapServers(this.defaultBoostrapServers()) .checkConfigs(super::testBoostrapServers) .build()) { @@ -58,6 +73,7 @@ void testCreateTopic() throws Exception { //@Test void testDeleteTopic() throws Exception { KafkaEngine kafkaEngine = super.kafkaEngine(); + try (Admin admin = kafkaEngine.adminService().createAdmin() .boostrapServers(this.defaultBoostrapServers()) .checkConfigs(super::testBoostrapServers) diff --git a/kafka-plus-engine/src/test/java/io/github/photowey/kafka/plus/engine/ConsumerServiceTest.java b/kafka-plus-engine/src/test/java/io/github/photowey/kafka/plus/engine/ConsumerServiceTest.java index bd6ae16..aeda638 100644 --- a/kafka-plus-engine/src/test/java/io/github/photowey/kafka/plus/engine/ConsumerServiceTest.java +++ b/kafka-plus-engine/src/test/java/io/github/photowey/kafka/plus/engine/ConsumerServiceTest.java @@ -21,6 +21,7 @@ import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer; import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; import java.time.Duration; import java.util.Collections; @@ -34,9 +35,11 @@ */ class ConsumerServiceTest extends LocalTest { - //@Test + @Test void testConsumer() { - try (KafkaConsumer consumer = this.kafkaEngine().consumerService().createConsumer() + KafkaEngine kafkaEngine = this.kafkaEngine(); + + try (KafkaConsumer consumer = kafkaEngine.consumerService().createConsumer() .boostrapServers(this.defaultBoostrapServers()) .keyDeserializer(StringDeserializer.class) .valueDeserializer(StringDeserializer.class) @@ -52,9 +55,11 @@ void testConsumer() { } } - //@Test + @Test void testConsumer_deserializer_class() { - try (KafkaConsumer consumer = this.kafkaEngine().consumerService().createConsumer() + KafkaEngine kafkaEngine = this.kafkaEngine(); + + try (KafkaConsumer consumer = kafkaEngine.consumerService().createConsumer() .boostrapServers(this.defaultBoostrapServers()) .keyDeserializer(StringDeserializer.class) .valueDeserializer(StringDeserializer.class) @@ -80,7 +85,9 @@ void testConsumer_deserializer_class() { //@Test void testConsumer_deserializer_string() { - try (KafkaConsumer consumer = this.kafkaEngine().consumerService().createConsumer() + KafkaEngine kafkaEngine = this.kafkaEngine(); + + try (KafkaConsumer consumer = kafkaEngine.consumerService().createConsumer() .boostrapServers(this.defaultBoostrapServers()) .keyDeserializer(StringDeserializer.class.getName()) .valueDeserializer(StringDeserializer.class.getName()) diff --git a/kafka-plus-engine/src/test/java/io/github/photowey/kafka/plus/engine/ProducerServiceTest.java b/kafka-plus-engine/src/test/java/io/github/photowey/kafka/plus/engine/ProducerServiceTest.java index 7fcf3a2..ae2295c 100644 --- a/kafka-plus-engine/src/test/java/io/github/photowey/kafka/plus/engine/ProducerServiceTest.java +++ b/kafka-plus-engine/src/test/java/io/github/photowey/kafka/plus/engine/ProducerServiceTest.java @@ -34,7 +34,10 @@ class ProducerServiceTest extends LocalTest { void testProducer() { StringSerializer keySerializer = new StringSerializer(); StringSerializer valueSerializer = new StringSerializer(); - try (KafkaProducer producer = this.kafkaEngine().producerService().createProducer() + + KafkaEngine kafkaEngine = this.kafkaEngine(); + + try (KafkaProducer producer = kafkaEngine.producerService().createProducer() .boostrapServers(this.defaultBoostrapServers()) .keySerializer(keySerializer) .valueSerializer(valueSerializer) @@ -44,20 +47,39 @@ void testProducer() { } } + @Test + void testProducerRecord() { + KafkaEngine kafkaEngine = this.kafkaEngine(); + + ProducerRecord record = kafkaEngine.producerService().createProducerRecord() + .topic(this.defaultTopic()) + .key("key-9527") + .value("value-9527") + .build(); + + Assertions.assertNotNull(record); + } + //@Test void testProducer_serializer_instance() { StringSerializer keySerializer = new StringSerializer(); StringSerializer valueSerializer = new StringSerializer(); - try (KafkaProducer producer = this.kafkaEngine().producerService().createProducer() + + KafkaEngine kafkaEngine = this.kafkaEngine(); + + try (KafkaProducer producer = kafkaEngine.producerService().createProducer() .boostrapServers(this.defaultBoostrapServers()) .keySerializer(keySerializer) .valueSerializer(valueSerializer) .build()) { for (int i = 0; i < 100; i++) { - ProducerRecord record = new ProducerRecord<>( - this.defaultTopic(), "key-" + i, "value-" + i - ); + ProducerRecord record = kafkaEngine.producerService().createProducerRecord() + .topic(this.defaultTopic()) + .key("key-" + i) + .value("value-" + i) + .build(); + producer.send(record); } } @@ -67,16 +89,21 @@ void testProducer_serializer_instance() { //@Test void testProducer_serializer_class() { - try (KafkaProducer producer = this.kafkaEngine().producerService().createProducer() + KafkaEngine kafkaEngine = this.kafkaEngine(); + + try (KafkaProducer producer = kafkaEngine.producerService().createProducer() .boostrapServers(this.defaultBoostrapServers()) .keySerializer(StringSerializer.class) .valueSerializer(StringSerializer.class) .build()) { for (int i = 0; i < 100; i++) { - ProducerRecord record = new ProducerRecord<>( - this.defaultTopic(), "key-" + i, "value-" + i - ); + ProducerRecord record = kafkaEngine.producerService().createProducerRecord() + .topic(this.defaultTopic()) + .key("key-" + i) + .value("value-" + i) + .build(); + producer.send(record); } } @@ -86,16 +113,21 @@ void testProducer_serializer_class() { //@Test void testProducer_serializer_string() { - try (KafkaProducer producer = this.kafkaEngine().producerService().createProducer() + KafkaEngine kafkaEngine = this.kafkaEngine(); + + try (KafkaProducer producer = kafkaEngine.producerService().createProducer() .boostrapServers(this.defaultBoostrapServers()) .keySerializer(StringSerializer.class.getName()) .valueSerializer(StringSerializer.class.getName()) .build()) { for (int i = 0; i < 100; i++) { - ProducerRecord record = new ProducerRecord<>( - this.defaultTopic(), "key-" + i, "value-" + i - ); + ProducerRecord record = kafkaEngine.producerService().createProducerRecord() + .topic(this.defaultTopic()) + .key("key-" + i) + .value("value-" + i) + .build(); + producer.send(record); } } diff --git a/kafka-plus-runtime/src/main/java/io/github/photowey/kafka/plus/runtime/service/ProducerService.java b/kafka-plus-runtime/src/main/java/io/github/photowey/kafka/plus/runtime/service/ProducerService.java index 52524a3..e576dc8 100644 --- a/kafka-plus-runtime/src/main/java/io/github/photowey/kafka/plus/runtime/service/ProducerService.java +++ b/kafka-plus-runtime/src/main/java/io/github/photowey/kafka/plus/runtime/service/ProducerService.java @@ -16,6 +16,7 @@ package io.github.photowey.kafka.plus.runtime.service; import io.github.photowey.kafka.plus.core.clients.builder.producer.ProducerBuilder; +import io.github.photowey.kafka.plus.core.clients.builder.record.ProducerRecordBuilder; /** * {@code ProducerService} @@ -26,6 +27,13 @@ */ public interface ProducerService { + /** + * Create {@link ProducerRecordBuilder} instance. + * + * @return {@link ProducerRecordBuilder} + */ + ProducerRecordBuilder createProducerRecord(); + /** * Create {@link ProducerBuilder} instance. * diff --git a/kafka-plus-runtime/src/main/java/io/github/photowey/kafka/plus/runtime/service/impl/ProducerServiceImpl.java b/kafka-plus-runtime/src/main/java/io/github/photowey/kafka/plus/runtime/service/impl/ProducerServiceImpl.java index 4d07372..64aceb9 100644 --- a/kafka-plus-runtime/src/main/java/io/github/photowey/kafka/plus/runtime/service/impl/ProducerServiceImpl.java +++ b/kafka-plus-runtime/src/main/java/io/github/photowey/kafka/plus/runtime/service/impl/ProducerServiceImpl.java @@ -17,6 +17,8 @@ import io.github.photowey.kafka.plus.core.clients.builder.producer.ProducerBuilder; import io.github.photowey.kafka.plus.core.clients.builder.producer.ProducerBuilderImpl; +import io.github.photowey.kafka.plus.core.clients.builder.record.ProducerRecordBuilder; +import io.github.photowey.kafka.plus.core.clients.builder.record.ProducerRecordBuilderImpl; import io.github.photowey.kafka.plus.runtime.service.ProducerService; /** @@ -32,4 +34,9 @@ public class ProducerServiceImpl implements ProducerService { public ProducerBuilder createProducer() { return new ProducerBuilderImpl(); } + + @Override + public ProducerRecordBuilder createProducerRecord() { + return new ProducerRecordBuilderImpl(); + } } \ No newline at end of file