diff --git a/.dockerignore b/.dockerignore
new file mode 100644
index 0000000..8751b74
--- /dev/null
+++ b/.dockerignore
@@ -0,0 +1,10 @@
+.idea
+target/
+node_modules/
+/bin/
+/build/
+.classpath
+.gradle
+.project
+.settings
+.git
diff --git a/.editorconfig b/.editorconfig
new file mode 100644
index 0000000..fa3df9b
--- /dev/null
+++ b/.editorconfig
@@ -0,0 +1,7 @@
+root = true
+
+[*]
+charset = utf-8
+trim_trailing_whitespace = true
+indent_style = space
+indent_size = 2
diff --git a/.gitignore b/.gitignore
index c2e6efc..847f0d4 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,3 +1,9 @@
.idea
target/
node_modules/
+/bin/
+/build/
+.classpath
+.gradle
+.project
+.settings
diff --git a/Dockerfile b/Dockerfile
deleted file mode 120000
index 4fc9ef5..0000000
--- a/Dockerfile
+++ /dev/null
@@ -1 +0,0 @@
-Dockerfile.prod
\ No newline at end of file
diff --git a/Dockerfile b/Dockerfile
new file mode 100644
index 0000000..79eabe2
--- /dev/null
+++ b/Dockerfile
@@ -0,0 +1,39 @@
+FROM solsson/kafka-jre@sha256:06dabfc8cacd0687c8f52c52afd650444fb6d4a8e0b85f68557e6e7a5c71667c \
+ as build
+
+ENV GRADLE_VERSION=4.3.1
+
+RUN set -ex; \
+ export DEBIAN_FRONTEND=noninteractive; \
+ runDeps='curl'; \
+ buildDeps='ca-certificates unzip'; \
+ apt-get update && apt-get install -y $runDeps $buildDeps --no-install-recommends; \
+ \
+ cd /opt; \
+ curl -SLs -o gradle-$GRADLE_VERSION-bin.zip https://services.gradle.org/distributions/gradle-$GRADLE_VERSION-bin.zip; \
+ unzip gradle-$GRADLE_VERSION-bin.zip; \
+ rm gradle-$GRADLE_VERSION-bin.zip; \
+ ln -s /opt/gradle-$GRADLE_VERSION/bin/gradle /usr/local/bin/gradle; \
+ gradle -v
+
+WORKDIR /opt/src/kafka-topic-client
+COPY build.gradle ./
+
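+# Prime the Gradle dependency cache with a dummy source, so this layer is reused until build.gradle changes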
+RUN set -ex; \
+ mkdir -p src/main/java; \
+ echo "public class Dummy {}" > src/main/java/Dummy.java; \
+ gradle build; \
+ rm src/main/java/Dummy.java
+
+COPY . .
+
+RUN set -ex; \
+ gradle build
+
+FROM solsson/kafka-jre@sha256:06dabfc8cacd0687c8f52c52afd650444fb6d4a8e0b85f68557e6e7a5c71667c
+
+COPY --from=build /opt/src/kafka-topic-client/build/libs /usr/share/java/kafka-topic-client
+
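+# /etc/kafka-topic-client is on the classpath so configuration jars or resources can be mounted at runtime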
+ENTRYPOINT [ "java", \
+ "-cp", "/usr/share/java/kafka-topic-client/*:/etc/kafka-topic-client/*", \
+ "se.yolean.kafka.topic.client.cli.Client" ]
diff --git a/Dockerfile.dev b/Dockerfile.dev
deleted file mode 100644
index 4fa908b..0000000
--- a/Dockerfile.dev
+++ /dev/null
@@ -1,15 +0,0 @@
-FROM maven:3.3.9-jdk-8
-
-WORKDIR /usr/src/app
-
-COPY target/kafka-topic-client-1.0-SNAPSHOT-jar-with-dependencies.jar kafka-topic-client.jar
-
-ENV ZOOKEEPER_CONNECT "zookeeper:2181"
-ENV TOPIC_NAME "build-contract-test"
-ENV RESET_TOPIC false
-ENV NUM_PARTITIONS 1
-ENV NUM_REPLICAS 1
-ENV NUM_CREATE_RETRIES 5
-
-ENTRYPOINT ["java", "-jar", "kafka-topic-client.jar"]
-
diff --git a/Dockerfile.prod b/Dockerfile.prod
deleted file mode 100644
index 6804142..0000000
--- a/Dockerfile.prod
+++ /dev/null
@@ -1,30 +0,0 @@
-FROM maven:3.3.9-jdk-8
-
-WORKDIR /usr/src/app
-
-COPY maven-docker-build-settings.xml $MAVEN_CONFIG/settings.xml
-
-COPY pom.xml .
-
-RUN mkdir -p src/main/java src/test/java
-
-RUN mvn package
-
-COPY src src
-
-RUN mvn package
-
-RUN cp target/kafka-topic-client-1.0-SNAPSHOT-jar-with-dependencies.jar kafka-topic-client.jar
-
-# This cleanup will probably not reduce image size as the layers have already been produced
-RUN mvn clean && rm -Rf /m2-build-repository && rm $MAVEN_CONFIG/settings.xml
-
-ENV ZOOKEEPER_CONNECT "zookeeper:2181"
-ENV TOPIC_NAME "build-contract-test"
-ENV RESET_TOPIC false
-ENV NUM_PARTITIONS 1
-ENV NUM_REPLICAS 1
-ENV NUM_CREATE_RETRIES 5
-
-ENTRYPOINT ["java", "-jar", "kafka-topic-client.jar"]
-
diff --git a/LICENSE.txt b/LICENSE.txt
new file mode 100644
index 0000000..f433b1a
--- /dev/null
+++ b/LICENSE.txt
@@ -0,0 +1,177 @@
+
+ Apache License
+ Version 2.0, January 2004
+ http://www.apache.org/licenses/
+
+ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+ 1. Definitions.
+
+ "License" shall mean the terms and conditions for use, reproduction,
+ and distribution as defined by Sections 1 through 9 of this document.
+
+ "Licensor" shall mean the copyright owner or entity authorized by
+ the copyright owner that is granting the License.
+
+ "Legal Entity" shall mean the union of the acting entity and all
+ other entities that control, are controlled by, or are under common
+ control with that entity. For the purposes of this definition,
+ "control" means (i) the power, direct or indirect, to cause the
+ direction or management of such entity, whether by contract or
+ otherwise, or (ii) ownership of fifty percent (50%) or more of the
+ outstanding shares, or (iii) beneficial ownership of such entity.
+
+ "You" (or "Your") shall mean an individual or Legal Entity
+ exercising permissions granted by this License.
+
+ "Source" form shall mean the preferred form for making modifications,
+ including but not limited to software source code, documentation
+ source, and configuration files.
+
+ "Object" form shall mean any form resulting from mechanical
+ transformation or translation of a Source form, including but
+ not limited to compiled object code, generated documentation,
+ and conversions to other media types.
+
+ "Work" shall mean the work of authorship, whether in Source or
+ Object form, made available under the License, as indicated by a
+ copyright notice that is included in or attached to the work
+ (an example is provided in the Appendix below).
+
+ "Derivative Works" shall mean any work, whether in Source or Object
+ form, that is based on (or derived from) the Work and for which the
+ editorial revisions, annotations, elaborations, or other modifications
+ represent, as a whole, an original work of authorship. For the purposes
+ of this License, Derivative Works shall not include works that remain
+ separable from, or merely link (or bind by name) to the interfaces of,
+ the Work and Derivative Works thereof.
+
+ "Contribution" shall mean any work of authorship, including
+ the original version of the Work and any modifications or additions
+ to that Work or Derivative Works thereof, that is intentionally
+ submitted to Licensor for inclusion in the Work by the copyright owner
+ or by an individual or Legal Entity authorized to submit on behalf of
+ the copyright owner. For the purposes of this definition, "submitted"
+ means any form of electronic, verbal, or written communication sent
+ to the Licensor or its representatives, including but not limited to
+ communication on electronic mailing lists, source code control systems,
+ and issue tracking systems that are managed by, or on behalf of, the
+ Licensor for the purpose of discussing and improving the Work, but
+ excluding communication that is conspicuously marked or otherwise
+ designated in writing by the copyright owner as "Not a Contribution."
+
+ "Contributor" shall mean Licensor and any individual or Legal Entity
+ on behalf of whom a Contribution has been received by Licensor and
+ subsequently incorporated within the Work.
+
+ 2. Grant of Copyright License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ copyright license to reproduce, prepare Derivative Works of,
+ publicly display, publicly perform, sublicense, and distribute the
+ Work and such Derivative Works in Source or Object form.
+
+ 3. Grant of Patent License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ (except as stated in this section) patent license to make, have made,
+ use, offer to sell, sell, import, and otherwise transfer the Work,
+ where such license applies only to those patent claims licensable
+ by such Contributor that are necessarily infringed by their
+ Contribution(s) alone or by combination of their Contribution(s)
+ with the Work to which such Contribution(s) was submitted. If You
+ institute patent litigation against any entity (including a
+ cross-claim or counterclaim in a lawsuit) alleging that the Work
+ or a Contribution incorporated within the Work constitutes direct
+ or contributory patent infringement, then any patent licenses
+ granted to You under this License for that Work shall terminate
+ as of the date such litigation is filed.
+
+ 4. Redistribution. You may reproduce and distribute copies of the
+ Work or Derivative Works thereof in any medium, with or without
+ modifications, and in Source or Object form, provided that You
+ meet the following conditions:
+
+ (a) You must give any other recipients of the Work or
+ Derivative Works a copy of this License; and
+
+ (b) You must cause any modified files to carry prominent notices
+ stating that You changed the files; and
+
+ (c) You must retain, in the Source form of any Derivative Works
+ that You distribute, all copyright, patent, trademark, and
+ attribution notices from the Source form of the Work,
+ excluding those notices that do not pertain to any part of
+ the Derivative Works; and
+
+ (d) If the Work includes a "NOTICE" text file as part of its
+ distribution, then any Derivative Works that You distribute must
+ include a readable copy of the attribution notices contained
+ within such NOTICE file, excluding those notices that do not
+ pertain to any part of the Derivative Works, in at least one
+ of the following places: within a NOTICE text file distributed
+ as part of the Derivative Works; within the Source form or
+ documentation, if provided along with the Derivative Works; or,
+ within a display generated by the Derivative Works, if and
+ wherever such third-party notices normally appear. The contents
+ of the NOTICE file are for informational purposes only and
+ do not modify the License. You may add Your own attribution
+ notices within Derivative Works that You distribute, alongside
+ or as an addendum to the NOTICE text from the Work, provided
+ that such additional attribution notices cannot be construed
+ as modifying the License.
+
+ You may add Your own copyright statement to Your modifications and
+ may provide additional or different license terms and conditions
+ for use, reproduction, or distribution of Your modifications, or
+ for any such Derivative Works as a whole, provided Your use,
+ reproduction, and distribution of the Work otherwise complies with
+ the conditions stated in this License.
+
+ 5. Submission of Contributions. Unless You explicitly state otherwise,
+ any Contribution intentionally submitted for inclusion in the Work
+ by You to the Licensor shall be under the terms and conditions of
+ this License, without any additional terms or conditions.
+ Notwithstanding the above, nothing herein shall supersede or modify
+ the terms of any separate license agreement you may have executed
+ with Licensor regarding such Contributions.
+
+ 6. Trademarks. This License does not grant permission to use the trade
+ names, trademarks, service marks, or product names of the Licensor,
+ except as required for reasonable and customary use in describing the
+ origin of the Work and reproducing the content of the NOTICE file.
+
+ 7. Disclaimer of Warranty. Unless required by applicable law or
+ agreed to in writing, Licensor provides the Work (and each
+ Contributor provides its Contributions) on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ implied, including, without limitation, any warranties or conditions
+ of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+ PARTICULAR PURPOSE. You are solely responsible for determining the
+ appropriateness of using or redistributing the Work and assume any
+ risks associated with Your exercise of permissions under this License.
+
+ 8. Limitation of Liability. In no event and under no legal theory,
+ whether in tort (including negligence), contract, or otherwise,
+ unless required by applicable law (such as deliberate and grossly
+ negligent acts) or agreed to in writing, shall any Contributor be
+ liable to You for damages, including any direct, indirect, special,
+ incidental, or consequential damages of any character arising as a
+ result of this License or out of the use or inability to use the
+ Work (including but not limited to damages for loss of goodwill,
+ work stoppage, computer failure or malfunction, or any and all
+ other commercial damages or losses), even if such Contributor
+ has been advised of the possibility of such damages.
+
+ 9. Accepting Warranty or Additional Liability. While redistributing
+ the Work or Derivative Works thereof, You may choose to offer,
+ and charge a fee for, acceptance of support, warranty, indemnity,
+ or other liability obligations and/or rights consistent with this
+ License. However, in accepting such obligations, You may act only
+ on Your own behalf and on Your sole responsibility, not on behalf
+ of any other Contributor, and only if You agree to indemnify,
+ defend, and hold each Contributor harmless for any liability
+ incurred by, or claims asserted against, such Contributor by reason
+ of your accepting any such warranty or additional liability.
+
+ END OF TERMS AND CONDITIONS
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..5bf13c4
--- /dev/null
+++ b/README.md
@@ -0,0 +1,7 @@
+
+
+## The `Topic` schema
+
+See `src/main/avro/Topic.avsc`.
+
+IDEs pick up the generated Java sources once `gradle build` has run.
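+
+A minimal sketch, assuming the builder API that Avro code generation provides, of constructing a `Topic` declaration:
+
+```java
+Topic topic = Topic.newBuilder()
+    .setName("my-topic")
+    .build();
+```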
diff --git a/build-contracts/docker-compose-create.yml b/build-contracts/docker-compose-create.yml
index 52809cb..7d7e50b 100644
--- a/build-contracts/docker-compose-create.yml
+++ b/build-contracts/docker-compose-create.yml
@@ -8,8 +8,10 @@ services:
- zookeeper
client:
build: ../
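+    # PUSH_TAG is interpolated from the environment by docker-compose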
+ image: localhost:5000/yolean/kafka-topic-client:$PUSH_TAG
labels:
com.yolean.build-contract: ""
+ com.yolean.build-target: ""
links:
- zookeeper
test:
diff --git a/build.gradle b/build.gradle
new file mode 100644
index 0000000..c5c2db1
--- /dev/null
+++ b/build.gradle
@@ -0,0 +1,80 @@
+
+repositories {
+ mavenCentral()
+ jcenter()
+ maven {
+ url "http://packages.confluent.io/maven/"
+ }
+}
+
+apply plugin: 'java'
+apply plugin: "idea"
+apply plugin: "eclipse"
+apply plugin: "maven"
+apply plugin: "jacoco"
+
+group 'se.yolean'
+
+sourceCompatibility = 1.8
+
+apply plugin: 'application'
+mainClassName = 'se.yolean.kafka.topic.client.cli.Client'
+
+configurations {
+ compile.exclude group: 'org.slf4j', module: 'slf4j-log4j12'
+ compile.exclude group: 'log4j', module: 'log4j'
+}
+
+dependencies {
+ compile group: 'javax.inject', name: 'javax.inject', version: '1'
+ compile group: 'com.google.inject', name: 'guice', version: '4.1.0'
+
+ compile group: 'org.apache.kafka', name: 'kafka-clients', version: '1.0.0'
+
+ compile group: 'org.apache.kafka', name: 'kafka_2.12', version: '1.0.0'
+ compile group: 'com.101tec', name: 'zkclient', version: '0.10'
+
+ compile group: 'io.confluent', name: 'kafka-schema-registry-client', version: '4.0.0'
+
+ compile group: 'com.nurkiewicz.asyncretry', name: 'asyncretry', version: '0.0.7'
+
+ runtime group: 'org.slf4j', name: 'slf4j-simple', version: '1.7.25'
+ runtime group: 'org.slf4j', name: 'log4j-over-slf4j', version: '1.7.25'
+ compile group: 'structlog4j', name: 'structlog4j-api', version: '1.0.0'
+ compile group: 'structlog4j', name: 'structlog4j-json', version: '1.0.0'
+
+ compile group: 'io.prometheus', name: 'simpleclient', version: '0.1.0'
+ compile group: 'io.prometheus', name: 'simpleclient_httpserver', version: '0.1.0'
+
+ testCompile group: 'junit', name: 'junit', version: '4.12'
+ testCompile group: 'org.mockito', name: 'mockito-core', version: '2.12.0'
+}
+
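+// Collect runtime dependencies next to the project jar, matching the Docker image's classpath wildcard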
+task copyToLib(type: Copy) {
+ into "$buildDir/libs"
+ from configurations.runtime
+}
+
+build.dependsOn(copyToLib)
+
+// for .editorconfig support in Eclipse
+buildscript {
+ repositories {
+ jcenter()
+ }
+ dependencies {
+ classpath 'org.standardout:gradle-eclipseconfig:1.1.0'
+ classpath 'org.unbroken-dome.gradle-plugins:gradle-testsets-plugin:1.4.2'
+ classpath 'com.commercehub.gradle.plugin:gradle-avro-plugin:0.12.0'
+ }
+}
+apply plugin: 'org.standardout.eclipseconfig'
+
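+// Adds a separate integration-test source set under src/itest (the plugin generates an itest task)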
+apply plugin: 'org.unbroken-dome.test-sets'
+testSets {
+ itest
+}
+
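+// Generate Java from src/main/avro (see Topic.avsc) during the build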
+apply plugin: 'com.commercehub.gradle.plugin.avro'
+avro {
+}
diff --git a/kafka-topic-client.iml b/kafka-topic-client.iml
deleted file mode 100644
index 1f63cc9..0000000
--- a/kafka-topic-client.iml
+++ /dev/null
@@ -1,34 +0,0 @@
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
\ No newline at end of file
diff --git a/maven-docker-build-settings.xml b/maven-docker-build-settings.xml
deleted file mode 100644
index 4b756bb..0000000
--- a/maven-docker-build-settings.xml
+++ /dev/null
@@ -1,6 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<settings xmlns="http://maven.apache.org/SETTINGS/1.0.0"
-          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-          xsi:schemaLocation="http://maven.apache.org/SETTINGS/1.0.0 http://maven.apache.org/xsd/settings-1.0.0.xsd">
-  <localRepository>/m2-build-repository</localRepository>
-</settings>
diff --git a/pom.xml b/pom.xml
deleted file mode 100644
index d7302f3..0000000
--- a/pom.xml
+++ /dev/null
@@ -1,57 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-  <modelVersion>4.0.0</modelVersion>
-
-  <groupId>yolean</groupId>
-  <artifactId>kafka-topic-client</artifactId>
-  <version>1.0-SNAPSHOT</version>
-
-  <properties>
-    <maven.compiler.source>1.8</maven.compiler.source>
-    <maven.compiler.target>1.8</maven.compiler.target>
-  </properties>
-
-  <dependencies>
-    <dependency>
-      <groupId>org.apache.kafka</groupId>
-      <artifactId>kafka_2.12</artifactId>
-      <version>1.0.0</version>
-    </dependency>
-    <dependency>
-      <groupId>com.101tec</groupId>
-      <artifactId>zkclient</artifactId>
-      <version>0.10</version>
-    </dependency>
-  </dependencies>
-
-  <build>
-    <plugins>
-      <plugin>
-        <artifactId>maven-assembly-plugin</artifactId>
-        <configuration>
-          <archive>
-            <manifest>
-              <mainClass>Client</mainClass>
-            </manifest>
-          </archive>
-          <descriptorRefs>
-            <descriptorRef>jar-with-dependencies</descriptorRef>
-          </descriptorRefs>
-        </configuration>
-        <executions>
-          <execution>
-            <id>make-assembly</id>
-            <phase>package</phase>
-            <goals>
-              <goal>single</goal>
-            </goals>
-          </execution>
-        </executions>
-      </plugin>
-    </plugins>
-  </build>
-</project>
diff --git a/src/itest/java/se/yolean/kafka/topic/client/cli/ManagedTopicsServiceTest.java b/src/itest/java/se/yolean/kafka/topic/client/cli/ManagedTopicsServiceTest.java
new file mode 100644
index 0000000..7409bcd
--- /dev/null
+++ b/src/itest/java/se/yolean/kafka/topic/client/cli/ManagedTopicsServiceTest.java
@@ -0,0 +1,39 @@
+package se.yolean.kafka.topic.client.cli;
+
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.Reader;
+import java.util.Properties;
+
+import org.junit.Test;
+
+public class ManagedTopicsServiceTest {
+
+ private Properties getProperties(String... props) {
+ Properties config = new Properties();
+ for (String path : props) {
+ try {
+ Reader source = new FileReader(path);
+ config.load(source);
+ } catch (FileNotFoundException e) {
+ throw new RuntimeException(path, e);
+ } catch (IOException e) {
+ throw new RuntimeException(path, e);
+ }
+ }
+ return config;
+ }
+
+ @Test
+ public void testManagedTopicsService() {
+ Properties config = getProperties(
+ "src/main/resources/default.properties",
+ "src/itest/resources/itest-dockercompose.properties"
+ );
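+    // Cap the poll loop so the service run terminates (see ManagedTopicsService.stop())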
+ config.setProperty("topic.declarations.consumer.polls.max", Integer.toString(3));
+ ManagedTopicsService service = new ManagedTopicsService(config);
+ service.start();
+ }
+
+}
diff --git a/src/itest/java/se/yolean/kafka/topic/client/config/ItestProps.java b/src/itest/java/se/yolean/kafka/topic/client/config/ItestProps.java
new file mode 100644
index 0000000..e632205
--- /dev/null
+++ b/src/itest/java/se/yolean/kafka/topic/client/config/ItestProps.java
@@ -0,0 +1,80 @@
+package se.yolean.kafka.topic.client.config;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Properties;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.name.Names;
+
+import se.yolean.kafka.topic.client.cli.Client;
+
+public class ItestProps extends AbstractModule {
+
+ //public static final ItestProps DOCKER_COMPOSE = new ItestProps("itest-dockercompose.properties");
+ public static final ItestProps DOCKER_COMPOSE = new ItestProps(new File("src/itest/resources/itest-dockercompose.properties"));
+
+ private Properties config;
+
+  public ItestProps(String itestPropertiesFilenameInClasspathRoot) {
+    this(getItestProperties(itestPropertiesFilenameInClasspathRoot));
+ }
+
+ public ItestProps(File itestPropertiesFile) {
+ this(getItestProperties(itestPropertiesFile));
+ }
+
+ protected ItestProps(Properties properties) {
+ this.config = properties;
+ }
+
+ public ItestProps override(String key, String value) {
+ Properties properties = new Properties();
+ properties.putAll(this.config);
+ properties.setProperty(key, value);
+ return new ItestProps(properties);
+ }
+
+ public ItestProps override(String key, int value) {
+ return this.override(key, Integer.toString(value));
+ }
+
+ @Override
+ protected void configure() {
+ System.out.print("Itest props: ");
+ this.config.list(System.out);
+ Names.bindProperties(super.binder(), this.config);
+ }
+
+  private static Properties getItestProperties(String itestPropertiesFilenameInClasspathRoot) {
+ Properties properties = new Properties();
+ try {
+ InputStream defaultProperties = Client.class.getResourceAsStream(Client.DEFAULT_PROPERTIES_FILE);
+ properties.load(defaultProperties);
+      InputStream itestProperties = ItestProps.class.getResourceAsStream(itestPropertiesFilenameInClasspathRoot);
+ properties.load(itestProperties);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ return properties;
+ }
+
+ private static Properties getItestProperties(File itestPropertiesFile) {
+ Properties properties = new Properties();
+ try {
+ FileReader defaults = new FileReader(new File("src/main/resources/" + Client.DEFAULT_PROPERTIES_FILE));
+ properties.load(defaults);
+ FileReader itest = new FileReader(itestPropertiesFile);
+ properties.load(itest);
+ } catch (FileNotFoundException e) {
+ throw new RuntimeException(e);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ return properties;
+ }
+
+}
diff --git a/src/itest/java/se/yolean/kafka/topic/client/retryable/BrokerProbeIntegrationTest.java b/src/itest/java/se/yolean/kafka/topic/client/retryable/BrokerProbeIntegrationTest.java
new file mode 100644
index 0000000..402625e
--- /dev/null
+++ b/src/itest/java/se/yolean/kafka/topic/client/retryable/BrokerProbeIntegrationTest.java
@@ -0,0 +1,92 @@
+package se.yolean.kafka.topic.client.retryable;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import org.apache.kafka.clients.admin.AdminClient;
+import org.junit.Test;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+
+import se.yolean.kafka.topic.client.config.AdminClientProvider;
+import se.yolean.kafka.topic.client.config.ItestProps;
+
+public class BrokerProbeIntegrationTest {
+
+ @Test
+ public void test() throws Exception {
+ Injector conf = Guice.createInjector(ItestProps.DOCKER_COMPOSE, new AbstractModule() {
+ protected void configure() {
+ bind(AdminClient.class).toProvider(AdminClientProvider.class);
+ }
+ });
+ BrokerProbe probe = conf.getInstance(BrokerProbe.class);
+ probe.call();
+ }
+
+ @Test
+ public void testTimeoutDescribeNodes() {
+ Injector conf = Guice.createInjector(ItestProps.DOCKER_COMPOSE.override("brokers.describe.timeout.ms", 1),
+ new AbstractModule() {
+ protected void configure() {
+ bind(AdminClient.class).toProvider(AdminClientProvider.class);
+ }
+ });
+ BrokerProbe.timeouts.clear();
+ try {
+ conf.getInstance(BrokerProbe.class).call();
+ fail("Should have thrown exception");
+ } catch (org.apache.kafka.common.errors.TimeoutException e) {
+ // ok, we don't wrap this unless we can also document a specific behavior
+ } catch (Exception e) {
+ fail("Should have thrown a specific exception");
+ }
+ assertEquals(1, BrokerProbe.timeouts.labels("broker_probe").get(), 0.1);
+ }
+
+ @Test
+ public void testTimeoutDescribeNodesGet() {
+ Injector conf = Guice.createInjector(ItestProps.DOCKER_COMPOSE
+ .override("brokers.describe.get.timeout.ms", 1),
+ new AbstractModule() {
+ protected void configure() {
+ bind(AdminClient.class).toProvider(AdminClientProvider.class);
+ }
+ });
+ BrokerProbe.timeouts.clear();
+ try {
+ conf.getInstance(BrokerProbe.class).call();
+ fail("Should have thrown exception");
+ } catch (java.util.concurrent.TimeoutException e) {
+ // ok, we don't wrap this unless we can also document a specific behavior
+ } catch (Exception e) {
+ fail("Should have thrown a specific exception");
+ }
+ assertEquals(1, BrokerProbe.timeouts.labels("broker_probe").get(), 0.1);
+ }
+
+ @Test
+ public void testBrokersNotEnough() {
+ Injector conf = Guice.createInjector(ItestProps.DOCKER_COMPOSE
+ .override("brokers.describe.available.min", 9),
+ new AbstractModule() {
+ protected void configure() {
+ bind(AdminClient.class).toProvider(AdminClientProvider.class);
+ }
+ });
+ BrokerProbe.timeouts.clear();
+ try {
+ conf.getInstance(BrokerProbe.class).call();
+ fail("Should have thrown exception");
+ } catch (NotEnoughBrokersException e) {
+ // good
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail("Should have thrown a specific exception");
+ }
+ assertEquals(0, BrokerProbe.timeouts.labels("broker_probe").get(), 0.1);
+ }
+
+}
diff --git a/src/itest/java/se/yolean/kafka/topic/client/service/IntegrationTestConfigLocalhost.java b/src/itest/java/se/yolean/kafka/topic/client/service/IntegrationTestConfigLocalhost.java
new file mode 100644
index 0000000..18a816d
--- /dev/null
+++ b/src/itest/java/se/yolean/kafka/topic/client/service/IntegrationTestConfigLocalhost.java
@@ -0,0 +1,33 @@
+package se.yolean.kafka.topic.client.service;
+
+import java.util.Properties;
+
+import org.apache.avro.Schema;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.name.Names;
+
+import se.yolean.kafka.topic.mgmt.TopicSchemaSourceProvider;
+
+public class IntegrationTestConfigLocalhost extends AbstractModule {
+
+ public static final int KAFKA_LISTENER_PORT = 9094;
+
+ @Override
+ protected void configure() {
+ bind(String.class).annotatedWith(Names.named("config:bootstrap")).toInstance("localhost:" + KAFKA_LISTENER_PORT);
+
+ bind(String.class).annotatedWith(Names.named("config:adminTopic")).toInstance("_topic_declarations");
+
+ bind(Integer.class).annotatedWith(Names.named("config:adminInitTimeoutMs")).toInstance(1000);
+
+ bind(Integer.class).annotatedWith(Names.named("config:adminTopicDesiredReplicationFactor")).toInstance(1);
+
+ bind(Properties.class).annotatedWith(Names.named("admin")).toProvider(AdminClientPropsProvider.class);
+
+ bind(String.class).annotatedWith(Names.named("config:schemaRegistryUrl")).toInstance("http://localhost:8081");
+
+ bind(Schema.class).toProvider(TopicSchemaSourceProvider.class);
+ }
+
+}
diff --git a/src/itest/java/se/yolean/kafka/topic/client/service/TopicDeclarationsTopicCheckTest.java b/src/itest/java/se/yolean/kafka/topic/client/service/TopicDeclarationsTopicCheckTest.java
new file mode 100644
index 0000000..6dc56cc
--- /dev/null
+++ b/src/itest/java/se/yolean/kafka/topic/client/service/TopicDeclarationsTopicCheckTest.java
@@ -0,0 +1,39 @@
+package se.yolean.kafka.topic.client.service;
+
+import static org.junit.Assert.*;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+
+public class TopicDeclarationsTopicCheckTest {
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ }
+
+ @Test
+ public void test() throws Exception {
+ Injector injector = Guice.createInjector(new IntegrationTestConfigLocalhost());
+ TopicDeclarationsTopicCheck check = injector.getInstance(TopicDeclarationsTopicCheck.class);
+ check.createOrVerifyAdminTopic();
+ }
+
+}
diff --git a/src/itest/java/se/yolean/kafka/topic/mgmt/AdminSchemaUpdateTest.java b/src/itest/java/se/yolean/kafka/topic/mgmt/AdminSchemaUpdateTest.java
new file mode 100644
index 0000000..04527b3
--- /dev/null
+++ b/src/itest/java/se/yolean/kafka/topic/mgmt/AdminSchemaUpdateTest.java
@@ -0,0 +1,63 @@
+package se.yolean.kafka.topic.mgmt;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.name.Names;
+
+import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
+import se.yolean.kafka.topic.client.service.IntegrationTestConfigLocalhost;
+
+public class AdminSchemaUpdateTest {
+
+ private static Injector injector;
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ injector = Guice.createInjector(
+ new IntegrationTestConfigLocalhost(),
+ new AbstractModule() {
+ @Override
+ protected void configure() {
+ bind(SchemaRegistryClient.class).toProvider(SchemaRegistryClientProvider.class);
+ }
+ });
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ }
+
+ @Test
+ public void test() throws Exception {
+ AdminSchemaUpdate update = injector.getInstance(AdminSchemaUpdate.class);
+ update.getCurrentSchema();
+ }
+
+ @Test
+ public void uploadSchemaTest() throws Exception {
+ AdminSchemaUpdate update = injector.getInstance(AdminSchemaUpdate.class);
+ update.uploadCurrentSchema();
+ }
+
+}
diff --git a/src/itest/resources/itest-dockercompose.properties b/src/itest/resources/itest-dockercompose.properties
new file mode 100644
index 0000000..4ab8db7
--- /dev/null
+++ b/src/itest/resources/itest-dockercompose.properties
@@ -0,0 +1 @@
+bootstrap.servers=PLAINTEXT://localhost:9094
diff --git a/src/itest/resources/simplelogger.properties b/src/itest/resources/simplelogger.properties
new file mode 100644
index 0000000..ff165ea
--- /dev/null
+++ b/src/itest/resources/simplelogger.properties
@@ -0,0 +1,3 @@
+org.slf4j.simpleLogger.defaultLogLevel=info
+org.slf4j.simpleLogger.log.se.yolean=debug
+org.slf4j.simpleLogger.log.org.apache.kafka.clients.Metadata=debug
diff --git a/src/main/avro/Topic.avsc b/src/main/avro/Topic.avsc
new file mode 100644
index 0000000..fda5b1a
--- /dev/null
+++ b/src/main/avro/Topic.avsc
@@ -0,0 +1,8 @@
+{
+ "name": "Topic",
+ "namespace": "se.yolean.kafka.topic.declaration",
+ "type": "record",
+ "fields" : [
+ {"name": "name", "type": "string" }
+ ]
+}
diff --git a/src/main/java/Client.java b/src/main/java/se/yolean/kafka/topic/client/cli/Client.java
similarity index 56%
rename from src/main/java/Client.java
rename to src/main/java/se/yolean/kafka/topic/client/cli/Client.java
index 010107b..86283be 100644
--- a/src/main/java/Client.java
+++ b/src/main/java/se/yolean/kafka/topic/client/cli/Client.java
@@ -1,7 +1,13 @@
-import kafka.admin.AdminOperationException;
+package se.yolean.kafka.topic.client.cli;
+
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.InputStream;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
@@ -13,6 +19,8 @@
public class Client {
+ public final static String DEFAULT_PROPERTIES_FILE = "default.properties";
+
final static String topicName = System.getenv("TOPIC_NAME");
final static boolean resetTopic = Boolean.parseBoolean(System.getenv("RESET_TOPIC"));
final static int partitions = Integer.parseInt(System.getenv("NUM_PARTITIONS"));
@@ -23,11 +31,56 @@ public class Client {
final static String zookeeperConnect = System.getenv("ZOOKEEPER_CONNECT");
+ static ClassLoader getClassLoaderForDefaults() {
+ return Client.class.getClassLoader();
+ }
+
+ static void managerStart(String managerPropertiesPath) {
+ Properties properties = new Properties();
+ InputStream defaultProperties = getClassLoaderForDefaults().getResourceAsStream(DEFAULT_PROPERTIES_FILE);
+ if (defaultProperties == null) {
+ throw new RuntimeException("Failed to load default properties " + DEFAULT_PROPERTIES_FILE);
+ }
+ try {
+ properties.load(defaultProperties);
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to load default properties from " + DEFAULT_PROPERTIES_FILE, e);
+ }
+
+ File managerProperties = new File(managerPropertiesPath);
+ if (!managerProperties.exists()) {
+ throw new RuntimeException("Failed to find properties file " + managerPropertiesPath);
+ }
+ if (!managerProperties.canRead()) {
+ throw new RuntimeException("Unreadable properties file " + managerPropertiesPath);
+ }
+ FileReader managerPropertiesReader;
+ try {
+ managerPropertiesReader = new FileReader(managerProperties);
+ } catch (FileNotFoundException e) {
+ throw new RuntimeException("Reader failed to find properties file " + managerPropertiesPath, e);
+ }
+ try {
+ properties.load(managerPropertiesReader);
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to read properties file " + managerPropertiesPath, e);
+ }
+
+ new ManagedTopicsService(properties).start();
+ }
+
public static void main(String[] args) throws Exception {
- if (topicName.length() < 1) {
- throw new Exception("Missing environment variable 'TOPIC_NAME'!");
+ if (args.length > 0) {
+ String managerPropertiesPath = args[0];
+ managerStart(managerPropertiesPath);
+ return;
}
+ if (topicName.length() < 1) throw new Exception("Missing environment variable 'TOPIC_NAME'!");
+    if (zookeeperConnect.length() < 1) throw new Exception("Missing environment variable 'ZOOKEEPER_CONNECT'");
+
+ System.out.println("Connecting to zookeeper using address '" + zookeeperConnect + "'");
+
final int sessionTimeoutMs = 10 * 1000;
final int connectionTimeoutMs = 8 * 1000;
// Note: You must initialize the ZkClient with ZKStringSerializer. If you don't, then
@@ -69,6 +122,7 @@ private static void tryCreate(ZkUtils zkUtils, String topicName, int nRetriesLef
try {
AdminUtils.createTopic(zkUtils, topicName, partitions, replication, topicConfig, rackAwareMode);
} catch (Exception e) {
+ System.err.println("Topic create failed due to " + e.toString());
if (nRetriesLeft <= 0) {
throw new RuntimeException("Failed to create topic \"" + topicName + "\". Is Kafka and Zookeeper running?");
} else {
@@ -77,6 +131,8 @@ private static void tryCreate(ZkUtils zkUtils, String topicName, int nRetriesLef
tryCreate(zkUtils, topicName, nRetriesLeft - 1);
}
}
+
+ System.out.println("Successfully created topic '" + topicName + "'");
}
}
\ No newline at end of file
diff --git a/src/main/java/se/yolean/kafka/topic/client/cli/ManagedTopicsService.java b/src/main/java/se/yolean/kafka/topic/client/cli/ManagedTopicsService.java
new file mode 100644
index 0000000..5a54005
--- /dev/null
+++ b/src/main/java/se/yolean/kafka/topic/client/cli/ManagedTopicsService.java
@@ -0,0 +1,101 @@
+package se.yolean.kafka.topic.client.cli;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Properties;
+
+import com.github.structlog4j.ILogger;
+import com.github.structlog4j.SLoggerFactory;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+
+import se.yolean.kafka.topic.client.config.ConfigModule;
+import se.yolean.kafka.topic.client.config.ManagerInitModule;
+import se.yolean.kafka.topic.client.config.MetricsModule;
+import se.yolean.kafka.topic.client.retryable.BrokerProbe;
+import se.yolean.kafka.topic.client.retryable.RestProxySetup;
+import se.yolean.kafka.topic.client.retryable.SchemaRegistrySetup;
+import se.yolean.kafka.topic.manager.configure.TopicDeclarationsPollModule;
+import se.yolean.kafka.topic.manager.tt.TopicsTopicWatcher;
+
+public class ManagedTopicsService implements Runnable {
+
+ public final ILogger log = SLoggerFactory.getLogger(this.getClass());
+
+ private final Injector serviceContext;
+
+ public ManagedTopicsService(Properties config) {
+ serviceContext = Guice.createInjector(
+      // no async or retry behavior for now, so configure long timeouts instead
+ //new ConcurrencyModule(),
+ new ConfigModule(config)
+ );
+ }
+
+ public void start() {
+ log.info("Starting Topic Manager Service without concurrency", "hostname", getHostname());
+ run();
+ }
+
+ public void stop() {
+ log.warn("TODO shutdown not implemented. Send termination signals or configure topic.declarations.consumer.polls.max.");
+ }
+
+ @Override
+ public void run() {
+ log.info("Running Topic Manager Service");
+
+ Injector initContext = serviceContext.createChildInjector(
+ new ManagerInitModule(),
+ new MetricsModule()
+ );
+
+ MetricsModule.Exporter exporter = initContext.getInstance(MetricsModule.Exporter.class);
+ log.info("Metrics exporter", "status", exporter.getStatus(), "port", exporter.getHttpPort());
+
+ BrokerProbe brokerProbe = initContext.getInstance(BrokerProbe.class);
+ BrokerProbe.KafkaStatus status;
+ try {
+ status = brokerProbe.call();
+ } catch (Exception e) {
+ throw new RuntimeException("unhandled", e);
+ }
+
+ SchemaRegistrySetup schemaRegistry = initContext.getInstance(SchemaRegistrySetup.class);
+ SchemaRegistrySetup.AdminSchemaStatus schemas;
+ try {
+ schemas = schemaRegistry.call();
+ } catch (Exception e) {
+ throw new RuntimeException("unhandled", e);
+ }
+
+ log.info("Both kafka and schema registry is ok, now create REST producer for declarations");
+ RestProxySetup restProxy = initContext.getInstance(RestProxySetup.class);
+
+ RestProxySetup.EndpointsStatus rest;
+ try {
+ rest = restProxy.call();
+ } catch (Exception e) {
+ throw new RuntimeException("unhandled", e);
+ }
+
+ log.info("REST endpoints also OK, let's start consuming topic declarations");
+
+ Injector managerContext = initContext.createChildInjector(
+ new TopicDeclarationsPollModule(status, schemas, rest));
+
+ TopicsTopicWatcher watch = managerContext.getInstance(TopicsTopicWatcher.class);
+ log.info("Handing control over to topic declarations poll loop", "impl", watch.getClass());
+ watch.run();
+ }
+
+ String getHostname() {
+ try {
+ return InetAddress.getLocalHost().getHostName();
+ } catch (UnknownHostException e) {
+ throw new RuntimeException("Failed to get hostname", e);
+ }
+ }
+
+}
diff --git a/src/main/java/se/yolean/kafka/topic/client/config/AdminClientProvider.java b/src/main/java/se/yolean/kafka/topic/client/config/AdminClientProvider.java
new file mode 100644
index 0000000..cc7ea38
--- /dev/null
+++ b/src/main/java/se/yolean/kafka/topic/client/config/AdminClientProvider.java
@@ -0,0 +1,34 @@
+package se.yolean.kafka.topic.client.config;
+
+import java.util.Properties;
+
+import javax.inject.Inject;
+import javax.inject.Named;
+import javax.inject.Provider;
+
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+
+import com.github.structlog4j.ILogger;
+import com.github.structlog4j.SLoggerFactory;
+
+public class AdminClientProvider implements Provider<AdminClient> {
+
+ private final ILogger log = SLoggerFactory.getLogger(this.getClass());
+
+ private String bootstrap;
+
+ @Inject
+ public AdminClientProvider(@Named("bootstrap.servers") String bootstrap) {
+ this.bootstrap = bootstrap;
+ }
+
+ @Override
+ public AdminClient get() {
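+    // A new AdminClient per injection; callers such as BrokerProbe close it when done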
+ Properties props = new Properties();
+ props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap);
+ log.debug("Creating AdminClient", "bootstrap", bootstrap);
+ return AdminClient.create(props);
+ }
+
+}
diff --git a/src/main/java/se/yolean/kafka/topic/client/config/AdminConsumerPropsProvider.java b/src/main/java/se/yolean/kafka/topic/client/config/AdminConsumerPropsProvider.java
new file mode 100644
index 0000000..54d4c32
--- /dev/null
+++ b/src/main/java/se/yolean/kafka/topic/client/config/AdminConsumerPropsProvider.java
@@ -0,0 +1,39 @@
+package se.yolean.kafka.topic.client.config;
+
+import java.util.Properties;
+
+import javax.inject.Inject;
+import javax.inject.Named;
+import javax.inject.Provider;
+
+/**
+ * Consume the admin topic.
+ */
+public class AdminConsumerPropsProvider implements Provider<Properties> {
+
+ /**
+ * Same ID in all replicas, means they act as a consumer group.
+ *
+ * Value = {@value}
+ */
+ public static final String CONSUMER_GROUP_ID = "kafka-topic-client";
+
+ private String bootstrap;
+
+ @Inject
+ public AdminConsumerPropsProvider(@Named("config:bootstrap") String bootstrap) {
+ this.bootstrap = bootstrap;
+ }
+
+ @Override
+ public Properties get() {
+ Properties props = new Properties();
+ props.put("bootstrap.servers", bootstrap);
+ props.put("group.id", CONSUMER_GROUP_ID);
+ props.put("enable.auto.commit", "false");
+ props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+ props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+ return props;
+ }
+
+}
diff --git a/src/main/java/se/yolean/kafka/topic/client/config/ConcurrencyModule.java b/src/main/java/se/yolean/kafka/topic/client/config/ConcurrencyModule.java
new file mode 100644
index 0000000..8dbfb0d
--- /dev/null
+++ b/src/main/java/se/yolean/kafka/topic/client/config/ConcurrencyModule.java
@@ -0,0 +1,22 @@
+package se.yolean.kafka.topic.client.config;
+
+import java.util.concurrent.ScheduledExecutorService;
+
+import com.google.inject.AbstractModule;
+
+public class ConcurrencyModule extends AbstractModule {
+
+ @Override
+ protected void configure() {
+ // Using Scheduled because the retry lib depend(s|ed) on it
+ bind(ScheduledExecutorService.class).toProvider(ExecutorServiceProvider.class);
+
+ // I'm not so sure we should use this lib...
+ // Our callables have quite different characteristics,
+ // with kafka libs doing back-off within given timeouts.
+ // And we can probably do simple retries, while managing concurrency as suggested in
+ // http://winterbe.com/posts/2015/04/07/java8-concurrency-tutorial-thread-executor-examples/
+ //bind(com.nurkiewicz.asyncretry.AsyncRetryExecutor.class).toProvider(ExecutorRetryProviderForInit.class);
+ }
+
+}
diff --git a/src/main/java/se/yolean/kafka/topic/client/config/ConfigModule.java b/src/main/java/se/yolean/kafka/topic/client/config/ConfigModule.java
new file mode 100644
index 0000000..4244ef4
--- /dev/null
+++ b/src/main/java/se/yolean/kafka/topic/client/config/ConfigModule.java
@@ -0,0 +1,36 @@
+package se.yolean.kafka.topic.client.config;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.Properties;
+
+import org.apache.kafka.clients.admin.AdminClient;
+
+import com.github.structlog4j.ILogger;
+import com.github.structlog4j.SLoggerFactory;
+import com.google.inject.AbstractModule;
+import com.google.inject.name.Names;
+
+public class ConfigModule extends AbstractModule {
+
+ private final ILogger log = SLoggerFactory.getLogger(this.getClass());
+
+ private Properties config;
+
+ public ConfigModule(Properties config) {
+ this.config = config;
+ logConfigValues();
+ }
+
+ void logConfigValues() {
+ StringWriter writer = new StringWriter();
+ config.list(new PrintWriter(writer));
+ log.info("Instance config: " + writer.getBuffer().toString());
+ }
+
+ @Override
+ protected void configure() {
+ Names.bindProperties(super.binder(), this.config);
+ }
+
+}
diff --git a/src/main/java/se/yolean/kafka/topic/client/config/DocumentedProperty.java b/src/main/java/se/yolean/kafka/topic/client/config/DocumentedProperty.java
new file mode 100644
index 0000000..4cc90d0
--- /dev/null
+++ b/src/main/java/se/yolean/kafka/topic/client/config/DocumentedProperty.java
@@ -0,0 +1,74 @@
+package se.yolean.kafka.topic.client.config;
+
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class DocumentedProperty {
+
+  private static final Map<String, DocumentedProperty> all = new LinkedHashMap<>(1);
+
+ static {
+ new DocumentedProperty("bootstrap.servers", Type.Str, true)
+        .setDescription("What any Kafka client needs");
+ }
+
+ public static final boolean has(String key) {
+ return all.containsKey(key);
+ }
+
+ public static final DocumentedProperty get(String key) {
+ return all.get(key);
+ }
+
+ /**
+ * Note that any property used as @Inject will be required regardless.
+ */
+  public static final List<String> getRequired() {
+ return all.entrySet().stream()
+ .filter(p -> p.getValue().isRequired())
+ .map(p -> p.getKey())
+ .collect(Collectors.toList());
+ }
+
+ public enum Type {
+ Str,
+ Int,
+ Bool
+ }
+
+ private String key;
+ private Type type;
+ private boolean isRequired;
+ private String description = null;
+
+ private DocumentedProperty(String key, Type type, boolean isRequired) {
+ this.key = key;
+ this.type = type;
+ this.isRequired = isRequired;
+ all.put(key, this);
+ }
+
+ private DocumentedProperty setDescription(String description) {
+ this.description = description;
+ return this;
+ }
+
+ public String getKey() {
+ return key;
+ }
+
+ public Type getType() {
+ return type;
+ }
+
+ public String getDescription() {
+ return description;
+ }
+
+ public boolean isRequired() {
+ return isRequired;
+ }
+
+}
diff --git a/src/main/java/se/yolean/kafka/topic/client/config/ExecutorServiceProvider.java b/src/main/java/se/yolean/kafka/topic/client/config/ExecutorServiceProvider.java
new file mode 100644
index 0000000..e6326c4
--- /dev/null
+++ b/src/main/java/se/yolean/kafka/topic/client/config/ExecutorServiceProvider.java
@@ -0,0 +1,26 @@
+package se.yolean.kafka.topic.client.config;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import javax.inject.Provider;
+
+import com.github.structlog4j.ILogger;
+import com.github.structlog4j.SLoggerFactory;
+
+public class ExecutorServiceProvider implements Provider<ScheduledExecutorService> {
+
+ private final ILogger log = SLoggerFactory.getLogger(this.getClass());
+
+ private ScheduledExecutorService shared = null;
+
+ @Override
+ public ScheduledExecutorService get() {
+ if (shared == null) {
+ log.info("Creating new executor");
+ shared = Executors.newSingleThreadScheduledExecutor();
+ }
+ return shared;
+ }
+
+}
diff --git a/src/main/java/se/yolean/kafka/topic/client/config/ManagerInitModule.java b/src/main/java/se/yolean/kafka/topic/client/config/ManagerInitModule.java
new file mode 100644
index 0000000..db778ee
--- /dev/null
+++ b/src/main/java/se/yolean/kafka/topic/client/config/ManagerInitModule.java
@@ -0,0 +1,14 @@
+package se.yolean.kafka.topic.client.config;
+
+import org.apache.kafka.clients.admin.AdminClient;
+
+import com.google.inject.AbstractModule;
+
+public class ManagerInitModule extends AbstractModule {
+
+ @Override
+ protected void configure() {
+ bind(AdminClient.class).toProvider(AdminClientProvider.class);
+ }
+
+}
diff --git a/src/main/java/se/yolean/kafka/topic/client/config/MetricsModule.java b/src/main/java/se/yolean/kafka/topic/client/config/MetricsModule.java
new file mode 100644
index 0000000..6d3a3b7
--- /dev/null
+++ b/src/main/java/se/yolean/kafka/topic/client/config/MetricsModule.java
@@ -0,0 +1,65 @@
+package se.yolean.kafka.topic.client.config;
+
+import java.io.IOException;
+
+import javax.inject.Inject;
+import javax.inject.Named;
+import javax.inject.Provider;
+
+import com.google.inject.AbstractModule;
+
+import io.prometheus.client.exporter.HTTPServer;
+
+public class MetricsModule extends AbstractModule implements Provider<MetricsModule.Exporter> {
+
+ @Inject
+ @Named("prometheus.exporter.port")
+ private int port;
+
+ @Override
+ protected void configure() {
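+    // Eager singleton: the exporter's HTTP server starts as soon as the injector is created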
+ bind(Exporter.class).toProvider(this).asEagerSingleton();
+ }
+
+ @Override
+ public Exporter get() {
+ HTTPServer server;
+ try {
+ server = new HTTPServer(port);
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to start metrics exporter on port " + port, e);
+ }
+
+ return new Exporter() {
+
+ public int getHttpPort() {
+ return port;
+ }
+
+ public void shutdown() {
+ server.stop();
+ }
+
+ @Override
+ public Status getStatus() {
+ return Status.running;
+ }
+
+ };
+ }
+
+ public interface Exporter {
+
+ enum Status {
+ running
+ }
+
+ int getHttpPort();
+
+ void shutdown();
+
+ Status getStatus();
+
+ }
+
+}
diff --git a/src/main/java/se/yolean/kafka/topic/client/retryable/BrokerProbe.java b/src/main/java/se/yolean/kafka/topic/client/retryable/BrokerProbe.java
new file mode 100644
index 0000000..1f1ab81
--- /dev/null
+++ b/src/main/java/se/yolean/kafka/topic/client/retryable/BrokerProbe.java
@@ -0,0 +1,93 @@
+package se.yolean.kafka.topic.client.retryable;
+
+import java.util.Collection;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import javax.inject.Inject;
+import javax.inject.Named;
+
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.DescribeClusterOptions;
+import org.apache.kafka.clients.admin.DescribeClusterResult;
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.Node;
+
+import com.github.structlog4j.ILogger;
+import com.github.structlog4j.SLoggerFactory;
+
+import io.prometheus.client.Counter;
+
+public class BrokerProbe implements Callable<BrokerProbe.KafkaStatus> {
+
+ private final ILogger log = SLoggerFactory.getLogger(this.getClass());
+
+ // Should be made configurable, but let's keep them short and work on back-off
+
+ static final Counter timeouts = Counter.build()
+ .name("kafkatopics_timeouts").help("AdminClient.describeCluster timeouts")
+ .labelNames("broker_probe").register();
+
+ @Inject
+ private AdminClient adminClient;
+
+ @Inject
+ @Named("brokers.describe.timeout.ms")
+ private int describeTimeoutMs;
+
+ @Inject
+ @Named("brokers.describe.get.timeout.ms")
+ private int nodesTimeoutMs;
+
+ @Inject
+ @Named("brokers.describe.available.min")
+ private int brokersAvailableMin;
+
+ @Override
+ public KafkaStatus call() throws Exception {
+ DescribeClusterOptions options = new DescribeClusterOptions();
+ options.timeoutMs(describeTimeoutMs);
+ DescribeClusterResult describe = adminClient.describeCluster(options);
+
+    KafkaFuture<Collection<Node>> nodesFuture = describe.nodes();
+
+    Collection<Node> nodes = null;
+ try {
+ nodes = nodesFuture.get(nodesTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ log.error("Interrupted when waiting for nodes status", e);
+ throw e;
+ } catch (ExecutionException e) {
+ if (e.getCause() instanceof org.apache.kafka.common.errors.TimeoutException) {
+ log.warn("Timeout waiting for describe nodes", "ms", describeTimeoutMs, "exception", e.getClass(),
+ "cause", e.getCause().getClass(), "causeMsg", e.getCause().getMessage());
+ timeouts.labels("broker_probe").inc();
+ throw (org.apache.kafka.common.errors.TimeoutException) e.getCause();
+ } else {
+ log.error("Execution error for nodes status", e);
+ throw e;
+ }
+ } catch (TimeoutException e) {
+ log.warn("Timeout waiting for nodes response", "ms", nodesTimeoutMs);
+ timeouts.labels("broker_probe").inc();
+ throw e;
+ } finally {
+ adminClient.close(); // depends on provider impl
+ }
+
+ if (nodes == null) {
+ throw new Exception("No broker information available");
+ }
+ if (nodes.size() < brokersAvailableMin) {
+ throw new NotEnoughBrokersException(brokersAvailableMin, nodes.size());
+ }
+
+ return new KafkaStatus();
+ }
+
+ public static class KafkaStatus {
+ }
+
+}
diff --git a/src/main/java/se/yolean/kafka/topic/client/retryable/NotEnoughBrokersException.java b/src/main/java/se/yolean/kafka/topic/client/retryable/NotEnoughBrokersException.java
new file mode 100644
index 0000000..d2c0fc8
--- /dev/null
+++ b/src/main/java/se/yolean/kafka/topic/client/retryable/NotEnoughBrokersException.java
@@ -0,0 +1,11 @@
+package se.yolean.kafka.topic.client.retryable;
+
+public class NotEnoughBrokersException extends Exception {
+
+ private static final long serialVersionUID = 1L;
+
+ public NotEnoughBrokersException(int expected, int actual) {
+ super("Got " + actual + " brokers but at least " + expected + " is required");
+ }
+
+}
diff --git a/src/main/java/se/yolean/kafka/topic/client/retryable/RestProxySetup.java b/src/main/java/se/yolean/kafka/topic/client/retryable/RestProxySetup.java
new file mode 100644
index 0000000..ae40f74
--- /dev/null
+++ b/src/main/java/se/yolean/kafka/topic/client/retryable/RestProxySetup.java
@@ -0,0 +1,23 @@
+package se.yolean.kafka.topic.client.retryable;
+
+import java.util.concurrent.Callable;
+
+import com.github.structlog4j.ILogger;
+import com.github.structlog4j.SLoggerFactory;
+
+public class RestProxySetup implements Callable<RestProxySetup.EndpointsStatus> {
+
+ private final ILogger log = SLoggerFactory.getLogger(this.getClass());
+
+ @Override
+ public EndpointsStatus call() throws Exception {
+ log.warn("TODO set up REST endpoint for topic creation");
+ return new EndpointsStatus();
+ }
+
+ public static class EndpointsStatus {
+ }
+
+}
diff --git a/src/main/java/se/yolean/kafka/topic/client/retryable/SchemaRegistrySetup.java b/src/main/java/se/yolean/kafka/topic/client/retryable/SchemaRegistrySetup.java
new file mode 100644
index 0000000..2ed2c8c
--- /dev/null
+++ b/src/main/java/se/yolean/kafka/topic/client/retryable/SchemaRegistrySetup.java
@@ -0,0 +1,21 @@
+package se.yolean.kafka.topic.client.retryable;
+
+import java.util.concurrent.Callable;
+
+import com.github.structlog4j.ILogger;
+import com.github.structlog4j.SLoggerFactory;
+
+public class SchemaRegistrySetup implements Callable<SchemaRegistrySetup.AdminSchemaStatus> {
+
+ private final ILogger log = SLoggerFactory.getLogger(this.getClass());
+
+ @Override
+ public AdminSchemaStatus call() throws Exception {
+ log.warn("TODO idempotent conf of admin schema");
+ return new AdminSchemaStatus();
+ }
+
+ public static class AdminSchemaStatus {
+ }
+
+}
diff --git a/src/main/java/se/yolean/kafka/topic/client/retryable/TopicCreateOrVerify.java b/src/main/java/se/yolean/kafka/topic/client/retryable/TopicCreateOrVerify.java
new file mode 100644
index 0000000..58ea298
--- /dev/null
+++ b/src/main/java/se/yolean/kafka/topic/client/retryable/TopicCreateOrVerify.java
@@ -0,0 +1,7 @@
+package se.yolean.kafka.topic.client.retryable;
+
+import java.util.concurrent.Callable;
+
+public interface TopicCreateOrVerify extends Callable<TopicOperationResult> {
+
+}
diff --git a/src/main/java/se/yolean/kafka/topic/client/retryable/TopicOperationResult.java b/src/main/java/se/yolean/kafka/topic/client/retryable/TopicOperationResult.java
new file mode 100644
index 0000000..6a6d982
--- /dev/null
+++ b/src/main/java/se/yolean/kafka/topic/client/retryable/TopicOperationResult.java
@@ -0,0 +1,5 @@
+package se.yolean.kafka.topic.client.retryable;
+
+public interface TopicOperationResult {
+
+}
diff --git a/src/main/java/se/yolean/kafka/topic/client/service/AdminClientPropsProvider.java b/src/main/java/se/yolean/kafka/topic/client/service/AdminClientPropsProvider.java
new file mode 100644
index 0000000..1a87ac0
--- /dev/null
+++ b/src/main/java/se/yolean/kafka/topic/client/service/AdminClientPropsProvider.java
@@ -0,0 +1,28 @@
+package se.yolean.kafka.topic.client.service;
+
+import java.util.Properties;
+
+import javax.inject.Inject;
+import javax.inject.Named;
+import javax.inject.Provider;
+
+import org.apache.kafka.clients.admin.AdminClientConfig;
+
+@Deprecated // Inject AdminClient directly
+public class AdminClientPropsProvider implements Provider<Properties> {
+
+ private String bootstrap;
+
+ @Inject
+ public AdminClientPropsProvider(@Named("config:bootstrap") String bootstrap) {
+ this.bootstrap = bootstrap;
+ }
+
+ @Override
+ public Properties get() {
+ Properties props = new Properties();
+ props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap);
+ return props;
+ }
+
+}
\ No newline at end of file
diff --git a/src/main/java/se/yolean/kafka/topic/client/service/StoreInitializationException.java b/src/main/java/se/yolean/kafka/topic/client/service/StoreInitializationException.java
new file mode 100644
index 0000000..6a8c862
--- /dev/null
+++ b/src/main/java/se/yolean/kafka/topic/client/service/StoreInitializationException.java
@@ -0,0 +1,29 @@
+package se.yolean.kafka.topic.client.service;
+
+public class StoreInitializationException extends Exception {
+
+  private static final long serialVersionUID = 1L;
+
+  public StoreInitializationException() {
+  }
+
+  public StoreInitializationException(String message) {
+    super(message);
+  }
+
+  public StoreInitializationException(Throwable cause) {
+    super(cause);
+  }
+
+  public StoreInitializationException(String message, Throwable cause) {
+    super(message, cause);
+  }
+
+  public StoreInitializationException(String message, Throwable cause,
+      boolean enableSuppression, boolean writableStackTrace) {
+    super(message, cause, enableSuppression, writableStackTrace);
+  }
+
+}
diff --git a/src/main/java/se/yolean/kafka/topic/client/service/TopicDeclarationsTopicCheck.java b/src/main/java/se/yolean/kafka/topic/client/service/TopicDeclarationsTopicCheck.java
new file mode 100644
index 0000000..85a4c1b
--- /dev/null
+++ b/src/main/java/se/yolean/kafka/topic/client/service/TopicDeclarationsTopicCheck.java
@@ -0,0 +1,155 @@
+package se.yolean.kafka.topic.client.service;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import javax.inject.Inject;
+import javax.inject.Named;
+
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.Config;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.errors.TopicExistsException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TopicDeclarationsTopicCheck {
+
+ private static final Logger log = LoggerFactory.getLogger(TopicDeclarationsTopicCheck.class);
+
+ @Inject
+ @Named("admin")
+ private Properties props;
+
+ @Inject
+ @Named("config:adminInitTimeoutMs")
+ private int initTimeout;
+
+ @Inject
+ @Named("config:adminTopic")
+ private String topic;
+
+ @Inject
+ @Named("config:adminTopicDesiredReplicationFactor")
+ private int desiredReplicationFactor;
+
+ void createOrVerifyAdminTopic() throws StoreInitializationException {
+
+ try (AdminClient admin = AdminClient.create(props)) {
+ Set<String> allTopics = admin.listTopics().names().get(initTimeout, TimeUnit.MILLISECONDS);
+ if (allTopics.contains(topic)) {
+ verifySchemaTopic(admin);
+ } else {
+ createSchemaTopic(admin);
+ }
+ } catch (TimeoutException e) {
+ throw new StoreInitializationException(
+ "Timed out trying to create or validate topic declarations topic configuration",
+ e
+ );
+ } catch (InterruptedException | ExecutionException e) {
+ throw new StoreInitializationException(
+ "Failed trying to create or validate topic declarations topic configuration",
+ e
+ );
+ }
+ }
+
+ private void createSchemaTopic(AdminClient admin) throws StoreInitializationException,
+ InterruptedException,
+ ExecutionException,
+ TimeoutException {
+ log.info("Creating schemas topic {}", topic);
+
+ int numLiveBrokers = admin.describeCluster().nodes()
+ .get(initTimeout, TimeUnit.MILLISECONDS).size();
+ if (numLiveBrokers <= 0) {
+ throw new StoreInitializationException("No live Kafka brokers");
+ }
+
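+ // Requesting a replication factor above the live broker count would make createTopics fail, so cap it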
+ int schemaTopicReplicationFactor = Math.min(numLiveBrokers, desiredReplicationFactor);
+ if (schemaTopicReplicationFactor < desiredReplicationFactor) {
+ log.warn("Creating the topic declarations topic "
+ + topic
+ + " using a replication factor of "
+ + schemaTopicReplicationFactor
+ + ", which is less than the desired one of "
+ + desiredReplicationFactor + ". If this is a production environment, it's "
+ + "crucial to add more brokers and increase the replication factor of the topic.");
+ }
+
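+ // A single partition keeps declarations totally ordered; compact cleanup retains the latest record per key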
+ NewTopic schemaTopicRequest = new NewTopic(topic, 1, (short) schemaTopicReplicationFactor);
+ schemaTopicRequest.configs(
+ Collections.singletonMap(
+ TopicConfig.CLEANUP_POLICY_CONFIG,
+ TopicConfig.CLEANUP_POLICY_COMPACT
+ )
+ );
+ try {
+ admin.createTopics(Collections.singleton(schemaTopicRequest)).all()
+ .get(initTimeout, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ if (e.getCause() instanceof TopicExistsException) {
+ log.warn("Topic {} exists, but was not listed. Concurrent operations?", topic);
+ } else {
+ throw e;
+ }
+ }
+ }
+
+ private void verifySchemaTopic(AdminClient admin) throws StoreInitializationException,
+ InterruptedException,
+ ExecutionException,
+ TimeoutException {
+ log.info("Validating schemas topic {}", topic);
+
+ Set<String> topics = Collections.singleton(topic);
+ Map<String, TopicDescription> topicDescription = admin.describeTopics(topics)
+ .all().get(initTimeout, TimeUnit.MILLISECONDS);
+
+ TopicDescription description = topicDescription.get(topic);
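+ // Kafka can only grow partition counts, and more than one partition would break ordering, so fail hard here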
+ final int numPartitions = description.partitions().size();
+ if (numPartitions != 1) {
+ throw new StoreInitializationException("The topic declarations topic " + topic + " should have only 1 "
+ + "partition but has " + numPartitions);
+ }
+
+ if (description.partitions().get(0).replicas().size() < desiredReplicationFactor) {
+ log.warn("The replication factor of the topic declarations topic "
+ + topic
+ + " is less than the desired one of "
+ + desiredReplicationFactor
+ + ". If this is a production environment, it's crucial to add more brokers and "
+ + "increase the replication factor of the topic.");
+ }
+
+ ConfigResource topicResource = new ConfigResource(ConfigResource.Type.TOPIC, topic);
+
+ Map<ConfigResource, Config> configs =
+ admin.describeConfigs(Collections.singleton(topicResource)).all()
+ .get(initTimeout, TimeUnit.MILLISECONDS);
+ Config topicConfigs = configs.get(topicResource);
+ String retentionPolicy = topicConfigs.get(TopicConfig.CLEANUP_POLICY_CONFIG).value();
+ if (retentionPolicy == null || !TopicConfig.CLEANUP_POLICY_COMPACT.equals(retentionPolicy)) {
+ log.error("The retention policy of the topic declarations topic " + topic + " is incorrect. "
+ + "You must configure the topic to 'compact' cleanup policy to avoid Kafka "
+ + "deleting your schemas after a week. "
+ + "Refer to Kafka documentation for more details on cleanup policies");
+
+ throw new StoreInitializationException("The retention policy of the topic declarations topic " + topic
+ + " is incorrect. Expected cleanup.policy to be "
+ + "'compact' but it is " + retentionPolicy);
+
+ }
+ }
+
+}
diff --git a/src/main/java/se/yolean/kafka/topic/manager/configure/TopicDeclarationsPollModule.java b/src/main/java/se/yolean/kafka/topic/manager/configure/TopicDeclarationsPollModule.java
new file mode 100644
index 0000000..4a4a92b
--- /dev/null
+++ b/src/main/java/se/yolean/kafka/topic/manager/configure/TopicDeclarationsPollModule.java
@@ -0,0 +1,23 @@
+package se.yolean.kafka.topic.manager.configure;
+
+import com.google.inject.AbstractModule;
+
+import se.yolean.kafka.topic.client.retryable.BrokerProbe;
+import se.yolean.kafka.topic.client.retryable.RestProxySetup;
+import se.yolean.kafka.topic.client.retryable.SchemaRegistrySetup;
+
+public class TopicDeclarationsPollModule extends AbstractModule {
+
+ public TopicDeclarationsPollModule(
+ BrokerProbe.KafkaStatus initResult,
+ SchemaRegistrySetup.AdminSchemaStatus adminSchemaStatus,
+ RestProxySetup.EndpointsStatus restEndpointsStatus
+ ) {
+ }
+
+ @Override
+ protected void configure() {
+ // TODO bind the consumer loop using the statuses passed to the constructor
+ }
+
+}
diff --git a/src/main/java/se/yolean/kafka/topic/manager/tt/TopicsTopicWatcher.java b/src/main/java/se/yolean/kafka/topic/manager/tt/TopicsTopicWatcher.java
new file mode 100644
index 0000000..4f72e1f
--- /dev/null
+++ b/src/main/java/se/yolean/kafka/topic/manager/tt/TopicsTopicWatcher.java
@@ -0,0 +1,31 @@
+package se.yolean.kafka.topic.manager.tt;
+
+import javax.inject.Inject;
+import javax.inject.Named;
+
+import com.github.structlog4j.ILogger;
+import com.github.structlog4j.SLoggerFactory;
+
+public class TopicsTopicWatcher implements Runnable {
+
+ private final ILogger log = SLoggerFactory.getLogger(this.getClass());
+
+ @Inject
+ @Named("topic.declarations.consumer.polls.max")
+ private int pollsMax;
+
+ @Override
+ public void run() {
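+ // pollsMax == -1 (the default) keeps polling forever; tests set a finite value via topic.declarations.consumer.polls.max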
+ for (int i = 0; pollsMax == -1 || i < pollsMax; i++) {
+ log.debug("Here we'll be repeating the topic management loop");
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException("Interrupted while waiting between topic management polls", e);
+ }
+ }
+ }
+
+}
diff --git a/src/main/java/se/yolean/kafka/topic/mgmt/AdminSchemaUpdate.java b/src/main/java/se/yolean/kafka/topic/mgmt/AdminSchemaUpdate.java
new file mode 100644
index 0000000..b431f0c
--- /dev/null
+++ b/src/main/java/se/yolean/kafka/topic/mgmt/AdminSchemaUpdate.java
@@ -0,0 +1,76 @@
+package se.yolean.kafka.topic.mgmt;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import javax.inject.Inject;
+
+import org.apache.avro.Schema;
+
+import com.github.structlog4j.ILogger;
+import com.github.structlog4j.SLoggerFactory;
+
+import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
+import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
+
+public class AdminSchemaUpdate {
+
+ private ILogger log = SLoggerFactory.getLogger(this.getClass());
+
+ private SchemaRegistryClient client;
+
+ // TODO configurable
+ private String topicDeclarationSchemaName = "topic_declaration";
+
+ private Schema topicSchema;
+
+ @Inject
+ public AdminSchemaUpdate(SchemaRegistryClient client, Schema topicSchema) {
+ this.client = client;
+ this.topicSchema = topicSchema;
+ }
+
+ public void createOrVerifyAdminSchema() {
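+ // Idempotent: if a structurally equal schema is already registered, the registry is left untouched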
+ SchemaMetadata existing;
+ try {
+ existing = getCurrentSchema();
+ } catch (IOException | RestClientException e) {
+ throw new RuntimeException("Failed to look up existing admin schema", e);
+ }
+ if (existing != null) {
+ log.info("Stored schema is up-to-date", "id", existing.getId(), "version", existing.getVersion());
+ return;
+ }
+ try {
+ uploadCurrentSchema();
+ } catch (IOException | RestClientException e) {
+ throw new RuntimeException("Schema upload error", e);
+ }
+ }
+
+ public SchemaMetadata getCurrentSchema() throws IOException, RestClientException {
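+ // Scans all registered subjects; returns metadata for the first whose latest schema equals the Topic schema, or null if none match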
+ Collection<String> allSubjects = client.getAllSubjects();
+ for (String subject : allSubjects) {
+ SchemaMetadata metadata = client.getLatestSchemaMetadata(subject);
+ log.debug("Found schema", "subject", subject, "id", metadata.getId(), "version", metadata.getVersion());
+ Schema latestSchema = client.getBySubjectAndId(subject, metadata.getId());
+ if (topicSchema.equals(latestSchema)) {
+ log.info("This is the topic schema!", "subject", subject, "id", metadata.getId(), "version", metadata.getVersion(), "fields", latestSchema.getFields().size());
+ return metadata;
+ } else {
+ log.info("Not the topic schema", "subject", subject, "id", metadata.getId());
+ }
+ }
+ return null;
+ }
+
+ public void uploadCurrentSchema() throws IOException, RestClientException {
+ log.info("Uploading current schema to registry", "subject", topicDeclarationSchemaName, "json", topicSchema.toString());
+ int id = client.register(topicDeclarationSchemaName, topicSchema);
+ log.info("Uploaded schema", "id", id);
+ }
+
+}
diff --git a/src/main/java/se/yolean/kafka/topic/mgmt/DeclarationLogConsumer.java b/src/main/java/se/yolean/kafka/topic/mgmt/DeclarationLogConsumer.java
new file mode 100644
index 0000000..16ea6ae
--- /dev/null
+++ b/src/main/java/se/yolean/kafka/topic/mgmt/DeclarationLogConsumer.java
@@ -0,0 +1,9 @@
+package se.yolean.kafka.topic.mgmt;
+
+public class DeclarationLogConsumer {
+
+ public DeclarationLogConsumer() {
+ // TODO Auto-generated constructor stub
+ }
+
+}
diff --git a/src/main/java/se/yolean/kafka/topic/mgmt/SchemaRegistryClientProvider.java b/src/main/java/se/yolean/kafka/topic/mgmt/SchemaRegistryClientProvider.java
new file mode 100644
index 0000000..3f409d6
--- /dev/null
+++ b/src/main/java/se/yolean/kafka/topic/mgmt/SchemaRegistryClientProvider.java
@@ -0,0 +1,25 @@
+package se.yolean.kafka.topic.mgmt;
+
+import javax.inject.Inject;
+import javax.inject.Named;
+import javax.inject.Provider;
+
+import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
+
+public class SchemaRegistryClientProvider implements Provider<SchemaRegistryClient> {
+
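+ // Passed to CachedSchemaRegistryClient as identityMapCapacity, which bounds how many schemas the client caches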
+ public static final int INITIAL_MAP_CAPACITY = 10;
+
+ @Inject
+ @Named("config:schemaRegistryUrl")
+ private String schemaRegistryBaseUrls;
+
+ @Override
+ public SchemaRegistryClient get() {
+ // CachedSchemaRegistryClient is the standard implementation; MockSchemaRegistryClient exists for tests
+ return new CachedSchemaRegistryClient(schemaRegistryBaseUrls, INITIAL_MAP_CAPACITY);
+ }
+
+}
diff --git a/src/main/java/se/yolean/kafka/topic/mgmt/TopicConsumerLoop.java b/src/main/java/se/yolean/kafka/topic/mgmt/TopicConsumerLoop.java
new file mode 100644
index 0000000..47a6e79
--- /dev/null
+++ b/src/main/java/se/yolean/kafka/topic/mgmt/TopicConsumerLoop.java
@@ -0,0 +1,10 @@
+package se.yolean.kafka.topic.mgmt;
+
+import org.apache.kafka.clients.consumer.Consumer;
+
+import se.yolean.kafka.topic.declaration.Topic;
+
+public class TopicConsumerLoop {
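+ // TODO poll Topic declaration records from an injected Kafka Consumer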
+
+}
diff --git a/src/main/java/se/yolean/kafka/topic/mgmt/TopicSchemaSourceProvider.java b/src/main/java/se/yolean/kafka/topic/mgmt/TopicSchemaSourceProvider.java
new file mode 100644
index 0000000..387069c
--- /dev/null
+++ b/src/main/java/se/yolean/kafka/topic/mgmt/TopicSchemaSourceProvider.java
@@ -0,0 +1,16 @@
+package se.yolean.kafka.topic.mgmt;
+
+import javax.inject.Provider;
+
+import org.apache.avro.Schema;
+
+import se.yolean.kafka.topic.declaration.Topic;
+
+public class TopicSchemaSourceProvider implements Provider {
+
+ @Override
+ public Schema get() {
+ return Topic.SCHEMA$;
+ }
+
+}
diff --git a/src/main/resources/default.properties b/src/main/resources/default.properties
new file mode 100644
index 0000000..bdcaae3
--- /dev/null
+++ b/src/main/resources/default.properties
@@ -0,0 +1,11 @@
+prometheus.exporter.port=5000
+
+# Minimum brokers available to consider startup completed
+brokers.describe.available.min=1
+
+# AdminClient timeouts, see BrokerProbeIntegrationTest for details
+brokers.describe.timeout.ms=25001
+brokers.describe.get.timeout.ms=25002
+
+# For testing. Default is to keep polling forever
+topic.declarations.consumer.polls.max=-1
diff --git a/src/test/java/se/yolean/kafka/topic/client/config/ConfigModuleTest.java b/src/test/java/se/yolean/kafka/topic/client/config/ConfigModuleTest.java
new file mode 100644
index 0000000..66cbbb5
--- /dev/null
+++ b/src/test/java/se/yolean/kafka/topic/client/config/ConfigModuleTest.java
@@ -0,0 +1,35 @@
+package se.yolean.kafka.topic.client.config;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Properties;
+
+import javax.inject.Inject;
+import javax.inject.Named;
+
+import org.junit.Test;
+
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+
+public class ConfigModuleTest {
+
+ @Test
+ public void test() {
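+ // ConfigModule should expose each property value as a @Named(key) String binding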
+ Properties props = new Properties();
+ props.setProperty("bootstrap.servers", "PLAINTEXT://my-test-value:9092");
+ Injector injector = Guice.createInjector(new ConfigModule(props));
+ TestService1 t1 = injector.getInstance(TestService1.class);
+ assertEquals("PLAINTEXT://my-test-value:9092", t1.boostrapServers);
+ }
+
+ static class TestService1 {
+
+ @Inject
+ @Named("bootstrap.servers")
+ private String bootstrapServers;
+
+ }
+
+}
diff --git a/src/test/java/se/yolean/kafka/topic/client/config/DocumentedPropertyTest.java b/src/test/java/se/yolean/kafka/topic/client/config/DocumentedPropertyTest.java
new file mode 100644
index 0000000..84ada36
--- /dev/null
+++ b/src/test/java/se/yolean/kafka/topic/client/config/DocumentedPropertyTest.java
@@ -0,0 +1,22 @@
+package se.yolean.kafka.topic.client.config;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import org.junit.Test;
+
+public class DocumentedPropertyTest {
+
+ @Test
+ public void testBootstrapServers() {
+ assertTrue(DocumentedProperty.has("bootstrap.servers"));
+ assertTrue("Should be listed as required property", DocumentedProperty.getRequired()
+ .stream().anyMatch(key -> "bootstrap.servers".equals(key)));
+ DocumentedProperty p = DocumentedProperty.get("bootstrap.servers");
+ assertNotNull(".set get should return the property", p);
+ assertEquals(DocumentedProperty.Type.Str, p.getType());
+ assertNotNull(p.getDescription());
+ }
+
+}
diff --git a/src/test/resources/simplelogger.properties b/src/test/resources/simplelogger.properties
new file mode 100644
index 0000000..5534212
--- /dev/null
+++ b/src/test/resources/simplelogger.properties
@@ -0,0 +1,2 @@
+org.slf4j.simpleLogger.defaultLogLevel=info
+org.slf4j.simpleLogger.log.se.yolean=debug
\ No newline at end of file