Skip to content

Commit

Permalink
Merge pull request #1 from photowey/dev
Browse files Browse the repository at this point in the history
[dev] Enhancements to Kafka Admin, Consumer, Producer and other components have been completed.
  • Loading branch information
photowey authored Apr 5, 2024
2 parents d922eb7 + add10c0 commit 3d039bd
Show file tree
Hide file tree
Showing 36 changed files with 2,098 additions and 8 deletions.
4 changes: 4 additions & 0 deletions kafka-plus-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@
<!-- @formatter:on -->

<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</dependency>
</dependencies>


Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright © 2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.github.photowey.kafka.plus.core.clients.builder;

import io.github.photowey.kafka.plus.core.exception.KafkaPlusRuntimeException;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

/**
* {@code AbstractService}
*
* @author photowey
* @date 2024/04/05
* @since 1.0.0
*/
public abstract class AbstractBuilder {

protected Map<String, Object> configs;
protected Properties props;

protected void initConfigsIfNecessary() {
if (null == this.configs) {
this.configs = new HashMap<>(1 << 3);
}
}

protected void checkPropsIfNecessary() {
if (null == this.props || this.props.isEmpty()) {
throw new KafkaPlusRuntimeException("The props can't be null/empty");
}
}

protected void checkConfigsIfNecessary() {
if (null == this.configs || this.configs.isEmpty()) {
throw new KafkaPlusRuntimeException("The configs can't be null/empty");
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright © 2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.github.photowey.kafka.plus.core.clients.builder.admin;

import org.apache.kafka.clients.admin.Admin;

import java.util.Map;
import java.util.Properties;
import java.util.function.Consumer;

/**
* {@code AdminBuilder}
*
* @author photowey
* @date 2024/04/05
* @since 1.0.0
*/
public interface AdminBuilder {

AdminBuilder boostrapServers(String bootstrapServers);

AdminBuilder props(Properties props);

AdminBuilder configs(Map<String, Object> configMap);

AdminBuilder checkProps(Consumer<Properties> fx);

AdminBuilder checkConfigs(Consumer<Map<String, Object>> fx);

Admin build();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
/*
* Copyright © 2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.github.photowey.kafka.plus.core.clients.builder.admin;

import io.github.photowey.kafka.plus.core.clients.builder.AbstractBuilder;
import io.github.photowey.kafka.plus.core.enums.Kafka;
import org.apache.kafka.clients.admin.Admin;

import java.util.Map;
import java.util.Properties;
import java.util.function.Consumer;

/**
* {@code AdminBuilderImpl}
* <p>
* Examples:
* <pre>
* Properties props = new Properties();
* Admin admin = new AdminBuilderImpl()
* .props(props)
* .checkProps((x) -> {})
* .build();
* </pre>
*
* <pre>
* Map<String, Object> configMap = new HashMap<>();
* Admin admin = new AdminBuilderImpl()
* .configMap(configMap)
* .checkMap((x) -> {})
* .build();
* </pre>
*
* <pre>
* String bootstrapServers = "localhost:9092";
* Admin admin = new AdminBuilderImpl()
* .boostrapServers(bootstrapServers)
* .checkMap((x) -> {})
* .build();
* </pre>
*
* @author photowey
* @date 2024/04/05
* @since 1.0.0
*/
public class AdminBuilderImpl extends AbstractBuilder implements AdminBuilder {

@Override
public AdminBuilder boostrapServers(String bootstrapServers) {
super.initConfigsIfNecessary();
super.configs.put(Kafka.Bootstrap.Server.ADDRESS.value(), bootstrapServers);

return this;
}

// ----------------------------------------------------------------

@Override
public AdminBuilder props(Properties props) {
super.props = props;

return this;
}

@Override
public AdminBuilder configs(Map<String, Object> configs) {
super.configs = configs;

return this;
}

// ----------------------------------------------------------------

@Override
public AdminBuilder checkProps(Consumer<Properties> fx) {
fx.accept(super.props);

return this;
}

@Override
public AdminBuilder checkConfigs(Consumer<Map<String, Object>> fx) {
fx.accept(super.configs);

return this;
}

// ----------------------------------------------------------------

@Override
public Admin build() {
if (null != super.props) {
this.checkPropsIfNecessary();

return Admin.create(super.props);
}

this.checkConfigsIfNecessary();
return Admin.create(super.configs);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright © 2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.github.photowey.kafka.plus.core.clients.builder.admin.topic;

import org.apache.kafka.clients.admin.NewTopic;

import java.util.List;
import java.util.Map;

/**
* {@code NewTopicBuilder}
*
* @author photowey
* @date 2024/04/05
* @since 1.0.0
*/
public interface NewTopicBuilder {

NewTopicBuilder topic(String topic);

NewTopicBuilder numPartitions(Integer numPartitions);

NewTopicBuilder replicationFactor(Integer replicationFactor);

NewTopicBuilder replicasAssignments(Map<Integer, List<Integer>> replicasAssignments);

NewTopicBuilder configs(Map<String, String> configs);

NewTopic build();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
/*
* Copyright © 2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.github.photowey.kafka.plus.core.clients.builder.admin.topic;

import io.github.photowey.kafka.plus.core.exception.KafkaPlusRuntimeException;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.List;
import java.util.Map;
import java.util.Optional;

/**
* {@code NewTopicBuilderImpl}
*
* @author photowey
* @date 2024/04/05
* @since 1.0.0
*/
public class NewTopicBuilderImpl implements NewTopicBuilder {

private String name;
private Integer numPartitions;
private Short replicationFactor;
private Map<Integer, List<Integer>> replicasAssignments;
private Map<String, String> configs = null;

@Override
public NewTopicBuilder topic(String topic) {
this.name = topic;

return this;
}

@Override
public NewTopicBuilder numPartitions(Integer numPartitions) {
this.numPartitions = numPartitions;

return this;
}

@Override
public NewTopicBuilder replicationFactor(Integer replicationFactor) {
if (null != replicationFactor) {
this.replicationFactor = replicationFactor.shortValue();
}

return this;
}

@Override
public NewTopicBuilder replicasAssignments(Map<Integer, List<Integer>> replicasAssignments) {
this.replicasAssignments = replicasAssignments;

return this;
}

@Override
public NewTopicBuilder configs(Map<String, String> configs) {
this.configs = configs;

return this;
}

@Override
public NewTopic build() {
this.check();

if (null != this.replicasAssignments) {
this.checkReplicasAssignmentsIfNecessary();

NewTopic newTopic = new NewTopic(this.name, this.replicasAssignments);
this.configs(newTopic, this.configs);

return newTopic;
}

NewTopic newTopic = new NewTopic(this.name, Optional.ofNullable(this.numPartitions), Optional.ofNullable(this.replicationFactor));
this.configs(newTopic, this.configs);

return newTopic;
}

// ----------------------------------------------------------------

private void check() {
this.checkTopic();
}

// ----------------------------------------------------------------

private void checkTopic() {
if (null == this.name || this.name.isEmpty()) {
throw new KafkaPlusRuntimeException("The topic name can't be null/empty.");
}
}

protected void checkReplicasAssignmentsIfNecessary() {
if (this.replicasAssignments.isEmpty()) {
throw new KafkaPlusRuntimeException("The replicasAssignments can't be empty");
}
}

// ----------------------------------------------------------------

private void configs(NewTopic topic, Map<String, String> configs) {
if (null != configs) {
topic.configs(configs);
}
}
}
Loading

0 comments on commit 3d039bd

Please sign in to comment.