Skip to content

Commit

Permalink
GCP Commons Extraction-
Browse files Browse the repository at this point in the history
* Extracting new java module (gcp-commons) for use in both the GCP Storage connectors, and future planned connectors.
* Rewriting GCP authentication code in Java.
* Including unit testing.
* Currently just in SBT - no gradle config created yet.
  • Loading branch information
davidsloan committed Apr 30, 2024
1 parent 9b1722f commit 91c0cc9
Show file tree
Hide file tree
Showing 91 changed files with 2,086 additions and 789 deletions.
17 changes: 17 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ ThisBuild / scalaVersion := Dependencies.scalaVersion
lazy val subProjects: Seq[Project] = Seq(
`query-language`,
`java-common`,
`gcp-common`,
common,
`sql-common`,
`cloud-common`,
Expand Down Expand Up @@ -72,6 +73,21 @@ lazy val `java-common` = (project in file("java-connectors/kafka-connect-common"
.configureAssembly(false)
.configureTests(javaCommonTestDeps)

lazy val `gcp-common` = (project in file("java-connectors/kafka-connect-gcp-common"))
.dependsOn(`java-common`)
.settings(
settings ++
Seq(
name := "kafka-connect-gcp-common",
description := "GCP Commons Module",
libraryDependencies ++= kafkaConnectGcpCommonDeps,
publish / skip := true,
),
)
.configureAssembly(true)
.configureTests(javaCommonTestDeps)
.configureAntlr()

lazy val `sql-common` = (project in file("kafka-connect-sql-common"))
.dependsOn(`query-language`)
.dependsOn(`common`)
Expand Down Expand Up @@ -170,6 +186,7 @@ lazy val `azure-datalake` = (project in file("kafka-connect-azure-datalake"))

lazy val `gcp-storage` = (project in file("kafka-connect-gcp-storage"))
.dependsOn(common)
.dependsOn(`gcp-common`)
.dependsOn(`cloud-common` % "compile->compile;test->test;it->it")
.dependsOn(`test-common` % "test->compile")
.settings(
Expand Down
3 changes: 2 additions & 1 deletion java-connectors/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,5 @@ bin/

### Lenses-specific ###
release/
gradle-modules.txt
gradle-modules.txt
**/src/main/gen/*
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright 2017-2024 Lenses.io Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.lenses.streamreactor.common.config.base;

import lombok.AllArgsConstructor;
import org.apache.kafka.common.config.types.Password;

import java.util.Map;
import java.util.Optional;

/**
* A wrapper for Kafka Connect properties that provides methods to retrieve property values.
*/
@AllArgsConstructor
public class ConfigMap {

private Map<String, Object> wrapped;

/**
* Retrieves a String property value associated with the given key.
*
* @param key the property key
* @return an {@link Optional} containing the property value if present, otherwise empty
*/
public Optional<String> getString(String key) {
return Optional.ofNullable((String) wrapped.get(key));
}

/**
* Retrieves a Password property value associated with the given key.
*
* @param key the property key
* @return an {@link Optional} containing the {@link Password} value if present, otherwise empty
*/
public Optional<Password> getPassword(String key) {
return Optional.ofNullable((Password) wrapped.get(key));
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright 2017-2024 Lenses.io Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.lenses.streamreactor.common.config.base;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;

@Data
@Builder
@AllArgsConstructor
public class RetryConfig {

private int numberOfRetries;
private long errorRetryInterval;

}
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.lenses.streamreactor.connect.cloud.common.config
case class RetryConfig(numberOfRetries: Int, errorRetryInterval: Long)
package io.lenses.streamreactor.common.config.base.intf;

public interface ConnectionConfig {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Copyright 2017-2024 Lenses.io Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.lenses.streamreactor.common.config.base;

import org.apache.kafka.common.config.types.Password;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

class ConfigMapTest {

private ConfigMap configMap;

@BeforeEach
void setUp() {
// Create a sample map with test properties
Map<String, Object> testMap = new HashMap<>();
testMap.put("username", "user123");
testMap.put("password", new Password("secret"));

// Initialize ConfigMap with the test map
configMap = new ConfigMap(testMap);
}

@Test
void testGetString_existingKey_shouldReturnValue() {
// Test existing key
Optional<String> value = configMap.getString("username");

assertTrue(value.isPresent());
assertEquals("user123", value.get());
}

@Test
void testGetString_nonExistingKey_shouldReturnEmpty() {
// Test non-existing key
Optional<String> value = configMap.getString("invalidKey");

assertFalse(value.isPresent());
}

@Test
void testGetPassword_existingKey_shouldReturnPassword() {
// Test existing key for Password type
Optional<Password> password = configMap.getPassword("password");

assertTrue(password.isPresent());
assertEquals("secret", password.get().value());
}

@Test
void testGetPassword_nonExistingKey_shouldReturnEmpty() {
// Test non-existing key for Password type
Optional<Password> password = configMap.getPassword("invalidKey");

assertFalse(password.isPresent());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright 2017-2024 Lenses.io Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.lenses.streamreactor.connect.gcp.common.auth;

import io.lenses.streamreactor.common.config.base.RetryConfig;
import io.lenses.streamreactor.common.config.base.intf.ConnectionConfig;
import io.lenses.streamreactor.connect.gcp.common.auth.mode.AuthMode;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;

import java.util.Optional;

@Data
@Builder
@AllArgsConstructor
public class GCPConnectionConfig implements ConnectionConfig {

private Optional<String> projectId;
private Optional<String> quotaProjectId;
private AuthMode authMode;
private Optional<String> host;
private RetryConfig httpRetryConfig;
private HttpTimeoutConfig timeouts;

}

Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Copyright 2017-2024 Lenses.io Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.lenses.streamreactor.connect.gcp.common.auth;

import com.google.api.gax.retrying.RetrySettings;
import com.google.cloud.Service;
import com.google.cloud.ServiceOptions;
import com.google.cloud.TransportOptions;
import com.google.cloud.http.HttpTransportOptions;
import io.lenses.streamreactor.common.config.base.RetryConfig;
import lombok.experimental.UtilityClass;
import lombok.val;
import org.threeten.bp.Duration;

import java.io.IOException;
import java.util.Optional;
/**
* Utility class for configuring generic GCP service clients using a {@link GCPConnectionConfig}.
*/
@UtilityClass
public class GCPServiceBuilderConfigurer {

/**
* Configures a GCP service client builder with the provided {@link GCPConnectionConfig}.
*
* @param <X> Type representing the GCP service interface (e.g., Storage, BigQuery)
* @param <Y> Type representing the service options (e.g., StorageOptions, BigQueryOptions)
* @param <B> Type representing the service options builder (e.g., StorageOptions.Builder, BigQueryOptions.Builder)
* @param config The GCP connection configuration containing settings such as host, project ID, and authentication details.
* @param builder The builder instance of the GCP service client options.
* @return The configured builder instance with updated settings.
* @throws IOException if an error occurs during configuration, such as credential retrieval.
*/
public static <
X extends Service<Y>,
Y extends ServiceOptions<X, Y>,
B extends ServiceOptions.Builder<X, Y, B>
>
B configure(GCPConnectionConfig config, B builder) throws IOException {

config.getHost().ifPresent(builder::setHost);

config.getProjectId().ifPresent(builder :: setProjectId);

config.getQuotaProjectId().ifPresent(builder :: setQuotaProjectId);

val authMode = config.getAuthMode();

builder.setCredentials(authMode.getCredentials());

builder.setRetrySettings(createRetrySettings(config.getHttpRetryConfig()));

createTransportOptions(config.getTimeouts()).ifPresent(builder::setTransportOptions);

return builder;
}

private static Optional<TransportOptions> createTransportOptions(HttpTimeoutConfig timeoutConfig) {
if (timeoutConfig.getConnectionTimeout().isPresent() || timeoutConfig.getSocketTimeout().isPresent()) {
HttpTransportOptions.Builder httpTransportOptionsBuilder = HttpTransportOptions.newBuilder();
timeoutConfig.getSocketTimeout().ifPresent(sock ->
httpTransportOptionsBuilder.setReadTimeout(sock.intValue())
);
timeoutConfig.getConnectionTimeout().ifPresent(conn ->
httpTransportOptionsBuilder.setConnectTimeout(conn.intValue())
);
return Optional.of(httpTransportOptionsBuilder.build());
}
return Optional.empty();
}

private static RetrySettings createRetrySettings(RetryConfig httpRetryConfig) {

return RetrySettings.newBuilder()
.setInitialRetryDelay(Duration.ofMillis(httpRetryConfig.getErrorRetryInterval()))
.setMaxRetryDelay(Duration.ofMillis(httpRetryConfig.getErrorRetryInterval() * 5))
.setMaxAttempts(httpRetryConfig.getNumberOfRetries())
.build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright 2017-2024 Lenses.io Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.lenses.streamreactor.connect.gcp.common.auth;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;

import javax.swing.text.html.Option;
import java.util.Optional;

@Data
@Builder
@AllArgsConstructor
public class HttpTimeoutConfig {
private Optional<Long> socketTimeout;
private Optional<Long> connectionTimeout;
}
Loading

0 comments on commit 91c0cc9

Please sign in to comment.