Skip to content

Commit

Permalink
Merge branch 'master' into feat/azure-servicebus-connector
Browse files Browse the repository at this point in the history
  • Loading branch information
davidsloan authored May 28, 2024
2 parents a745853 + 2dc3fbe commit 5295a92
Show file tree
Hide file tree
Showing 78 changed files with 2,915 additions and 152 deletions.
3 changes: 2 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,11 @@ lazy val `query-language` = (project in file("java-connectors/kafka-connect-quer
),
)
.configureAssembly(true)
.configureTests(baseTestDeps)
.configureTests(baseTestDeps ++ javaCommonTestDeps)
.configureAntlr()

lazy val `java-common` = (project in file("java-connectors/kafka-connect-common"))
.dependsOn(`query-language`)
.settings(
settings ++
Seq(
Expand Down
3 changes: 3 additions & 0 deletions java-connectors/kafka-connect-common/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ project(":kafka-connect-common") {
}

dependencies {

implementation project(":kafka-connect-query-language")

//apache kafka
api group: 'org.apache.kafka', name: 'connect-json', version: kafkaVersion
api group: 'org.apache.kafka', name: 'kafka-clients', version: kafkaVersion
Expand Down
8 changes: 5 additions & 3 deletions java-connectors/kafka-connect-gcp-common/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ project(":kafka-connect-gcp-common") {
}

ext {
gcpCloudVersion = "2.37.0"
gcpCloudVersion = "26.38.0"
}

dependencies {
Expand All @@ -16,7 +16,9 @@ project(":kafka-connect-gcp-common") {
api group: 'org.apache.kafka', name: 'kafka-clients', version: kafkaVersion

//gcp
implementation group: 'com.google.cloud', name: 'google-cloud-core', version: gcpCloudVersion
implementation group: 'com.google.cloud', name: 'google-cloud-core-http', version: gcpCloudVersion
implementation platform(group: 'com.google.cloud', name: 'libraries-bom', version: gcpCloudVersion)

implementation 'com.google.cloud:google-cloud-core'
implementation 'com.google.cloud:google-cloud-core-http'
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@
import java.io.IOException;
import java.util.Optional;
import java.util.function.Supplier;

import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import lombok.experimental.UtilityClass;
import lombok.val;
import org.apache.kafka.common.config.ConfigException;
Expand All @@ -32,7 +35,7 @@
/**
* Utility class for configuring generic GCP service clients using a {@link GCPConnectionConfig}.
*/
@UtilityClass
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class GCPServiceBuilderConfigurer {

/**
Expand Down Expand Up @@ -97,7 +100,7 @@ private static RetrySettings createRetrySettings(RetryConfig httpRetryConfig) {
.build();
}

private Supplier<ConfigException> createConfigException(String message) {
private static Supplier<ConfigException> createConfigException(String message) {
return () -> new ConfigException(message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/
package io.lenses.streamreactor.connect.gcp.common.config;

import org.apache.kafka.common.config.ConfigDef;

import io.lenses.streamreactor.common.config.base.ConfigSettings;
import io.lenses.streamreactor.common.config.base.RetryConfig;
import io.lenses.streamreactor.common.config.base.model.ConnectorPrefix;
Expand All @@ -23,7 +25,6 @@
import io.lenses.streamreactor.connect.gcp.common.auth.HttpTimeoutConfig;
import lombok.Getter;
import lombok.val;
import org.apache.kafka.common.config.ConfigDef;

/**
* Configuration settings for connecting to Google Cloud Platform (GCP) services.
Expand All @@ -34,13 +35,13 @@ public class GCPSettings implements ConfigSettings<GCPConnectionConfig> {

public static final String EMPTY_STRING = "";

private final String gcpProjectId;
private final String gcpQuotaProjectId;
private final String host;
private final String httpErrorRetryInterval;
private final String httpNbrOfRetries;
private final String httpSocketTimeout;
private final String httpConnectionTimeout;
private final String gcpProjectIdKey;
private final String gcpQuotaProjectIdKey;
private final String hostKey;
private final String httpErrorRetryIntervalKey;
private final String httpNbrOfRetriesKey;
private final String httpSocketTimeoutKey;
private final String httpConnectionTimeoutKey;

public static final Long HTTP_ERROR_RETRY_INTERVAL_DEFAULT = 50L;
public static final Integer HTTP_NUMBER_OF_RETIRES_DEFAULT = 5;
Expand All @@ -55,13 +56,13 @@ public class GCPSettings implements ConfigSettings<GCPConnectionConfig> {
* @param connectorPrefix the prefix used for configuration keys
*/
public GCPSettings(ConnectorPrefix connectorPrefix) {
gcpProjectId = connectorPrefix.prefixKey("gcp.project.id");
gcpQuotaProjectId = connectorPrefix.prefixKey("gcp.quota.project.id");
host = connectorPrefix.prefixKey("endpoint");
httpErrorRetryInterval = connectorPrefix.prefixKey("http.retry.interval");
httpNbrOfRetries = connectorPrefix.prefixKey("http.max.retries");
httpSocketTimeout = connectorPrefix.prefixKey("http.socket.timeout");
httpConnectionTimeout = connectorPrefix.prefixKey("http.connection.timeout");
gcpProjectIdKey = connectorPrefix.prefixKey("gcp.project.id");
gcpQuotaProjectIdKey = connectorPrefix.prefixKey("gcp.quota.project.id");
hostKey = connectorPrefix.prefixKey("endpoint");
httpErrorRetryIntervalKey = connectorPrefix.prefixKey("http.retry.interval");
httpNbrOfRetriesKey = connectorPrefix.prefixKey("http.max.retries");
httpSocketTimeoutKey = connectorPrefix.prefixKey("http.socket.timeout");
httpConnectionTimeoutKey = connectorPrefix.prefixKey("http.connection.timeout");

authModeSettings = new AuthModeSettings(connectorPrefix);
}
Expand All @@ -77,20 +78,20 @@ public ConfigDef withSettings(ConfigDef configDef) {
val conf =
configDef
.define(
gcpProjectId,
gcpProjectIdKey,
ConfigDef.Type.STRING,
EMPTY_STRING,
ConfigDef.Importance.HIGH,
"GCP Project ID")
.define(
gcpQuotaProjectId,
gcpQuotaProjectIdKey,
ConfigDef.Type.STRING,
EMPTY_STRING,
ConfigDef.Importance.HIGH,
"GCP Quota Project ID")
.define(host, ConfigDef.Type.STRING, EMPTY_STRING, ConfigDef.Importance.LOW, "GCP Host")
.define(hostKey, ConfigDef.Type.STRING, EMPTY_STRING, ConfigDef.Importance.LOW, "GCP Host")
.define(
httpNbrOfRetries,
httpNbrOfRetriesKey,
ConfigDef.Type.INT,
HTTP_NUMBER_OF_RETIRES_DEFAULT,
ConfigDef.Importance.MEDIUM,
Expand All @@ -99,9 +100,9 @@ public ConfigDef withSettings(ConfigDef configDef) {
"Error",
2,
ConfigDef.Width.LONG,
httpNbrOfRetries)
httpNbrOfRetriesKey)
.define(
httpErrorRetryInterval,
httpErrorRetryIntervalKey,
ConfigDef.Type.LONG,
HTTP_ERROR_RETRY_INTERVAL_DEFAULT,
ConfigDef.Importance.MEDIUM,
Expand All @@ -110,15 +111,15 @@ public ConfigDef withSettings(ConfigDef configDef) {
"Error",
3,
ConfigDef.Width.LONG,
httpErrorRetryInterval)
httpErrorRetryIntervalKey)
.define(
httpSocketTimeout,
httpSocketTimeoutKey,
ConfigDef.Type.LONG,
HTTP_SOCKET_TIMEOUT_DEFAULT,
ConfigDef.Importance.LOW,
"Socket timeout (ms)")
.define(
httpConnectionTimeout,
httpConnectionTimeoutKey,
ConfigDef.Type.LONG,
HTTP_CONNECTION_TIMEOUT_DEFAULT,
ConfigDef.Importance.LOW,
Expand All @@ -130,19 +131,19 @@ public ConfigDef withSettings(ConfigDef configDef) {
public GCPConnectionConfig parseFromConfig(ConfigSource configSource) {
val builder =
GCPConnectionConfig.builder().authMode(authModeSettings.parseFromConfig(configSource));
configSource.getString(gcpProjectId).ifPresent(builder::projectId);
configSource.getString(gcpQuotaProjectId).ifPresent(builder::quotaProjectId);
configSource.getString(host).ifPresent(builder::host);
configSource.getString(gcpProjectIdKey).ifPresent(builder::projectId);
configSource.getString(gcpQuotaProjectIdKey).ifPresent(builder::quotaProjectId);
configSource.getString(hostKey).ifPresent(builder::host);

val retryConfig =
new RetryConfig(
configSource.getInt(httpNbrOfRetries).orElse(HTTP_NUMBER_OF_RETIRES_DEFAULT),
configSource.getLong(httpErrorRetryInterval).orElse(HTTP_ERROR_RETRY_INTERVAL_DEFAULT));
configSource.getInt(httpNbrOfRetriesKey).orElse(HTTP_NUMBER_OF_RETIRES_DEFAULT),
configSource.getLong(httpErrorRetryIntervalKey).orElse(HTTP_ERROR_RETRY_INTERVAL_DEFAULT));

val timeoutConfig =
new HttpTimeoutConfig(
configSource.getLong(httpSocketTimeout).orElse(null),
configSource.getLong(httpConnectionTimeout).orElse(null));
configSource.getLong(httpSocketTimeoutKey).orElse(null),
configSource.getLong(httpConnectionTimeoutKey).orElse(null));

builder.httpRetryConfig(retryConfig);
builder.timeouts(timeoutConfig);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,17 @@

import static com.google.common.base.Preconditions.checkNotNull;

import com.google.common.io.ByteStreams;
import java.io.File;
import java.io.InputStream;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import lombok.experimental.UtilityClass;

@UtilityClass
import com.google.common.io.ByteStreams;

import lombok.AccessLevel;
import lombok.NoArgsConstructor;

@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class TestFileUtil {

static String streamToString(InputStream inputStream) throws Exception {
Expand Down
29 changes: 29 additions & 0 deletions java-connectors/kafka-connect-gcp-pubsub/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
project(":kafka-connect-gcp-pubsub") {

test {
maxParallelForks = 1
}

dependencies {
implementation project(':kafka-connect-query-language')
implementation project(':kafka-connect-common')
implementation project(':kafka-connect-gcp-common')

//gcp
implementation platform(group: 'com.google.cloud', name: 'libraries-bom', version: project(':kafka-connect-gcp-common').ext.gcpCloudVersion)

implementation 'com.google.cloud:google-cloud-pubsub'
implementation 'io.grpc:grpc-core'
implementation 'io.grpc:grpc-netty-shaded'
implementation 'io.grpc:grpc-stub'
implementation 'io.grpc:grpc-protobuf-lite'

implementation 'com.google.protobuf:protobuf-java'
implementation 'com.google.api.grpc:proto-google-common-protos'

implementation 'io.perfmark:perfmark-api:0.27.0'

implementation(group: 'com.github.ben-manes.caffeine', name: 'caffeine', version: caffeineVersion)

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Copyright 2017-2024 Lenses.io Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.lenses.streamreactor.connect.gcp.pubsub.source;

import static io.lenses.streamreactor.common.util.AsciiArtPrinter.printAsciiHeader;

import java.util.List;
import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceConnector;

import io.lenses.streamreactor.common.util.JarManifest;
import io.lenses.streamreactor.common.util.TasksSplitter;
import io.lenses.streamreactor.connect.gcp.pubsub.source.configdef.PubSubConfigSettings;
import lombok.val;

/**
* GCPPubSubSourceConnector is a source connector for Google Cloud Pub/Sub.
* It is responsible for starting the connector, creating tasks, and stopping the connector.
*/
public class GCPPubSubSourceConnector extends SourceConnector {

private Map<String, String> props;

private final PubSubConfigSettings pubSubConfigSettings = new PubSubConfigSettings();

private final JarManifest jarManifest =
new JarManifest(getClass().getProtectionDomain().getCodeSource().getLocation());

@Override
public void start(Map<String, String> props) {
printAsciiHeader(jarManifest, "/gcp-pubsub-source-ascii.txt");
this.props = validateProps(props);
}

private Map<String, String> validateProps(Map<String, String> props) {
try {
val pubSubConfigDef = new PubSubConfigSettings();
val pubSubSourceConfig = pubSubConfigDef.parse(props);
pubSubSourceConfig.validateKcql();
return props;
} catch (Exception e) {
throw new ConnectException("Invalid connector properties configuration: " + e.getMessage(), e);
}
}

@Override
public Class<? extends Task> taskClass() {
return GCPPubSubSourceTask.class;
}

@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
return TasksSplitter.splitByKcqlStatements(
maxTasks,
props,
PubSubConfigSettings.getKcqlSettings()
);
}

@Override
public void stop() {
// No implementation required!
}

@Override
public ConfigDef config() {
return pubSubConfigSettings.getConfigDef();
}

@Override
public String version() {
return jarManifest.getVersion();
}
}
Loading

0 comments on commit 5295a92

Please sign in to comment.