Skip to content

Commit

Permalink
Merge branch 'master' into feat/datalakes_key_namer
Browse files Browse the repository at this point in the history
  • Loading branch information
stheppi authored Apr 29, 2024
2 parents 85f9180 + ae57b61 commit e0dcb60
Show file tree
Hide file tree
Showing 57 changed files with 354 additions and 411 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ jobs:
format: 'HTML'
args: >-
--failOnCVSS 5
--suppression https://raw.githubusercontent.com/${{ github.event.repository.owner.login }}/${{ github.event.repository.name }}/${{ steps.branch_name.outputs.tag }}${{ steps.branch_name.outputs.current_branch }}/suppression.xml
--suppression https://raw.githubusercontent.com/${{ github.event.pull_request.head.repo.owner.login || github.event.repository.owner.login }}/${{ github.event.repository.name }}/${{ steps.branch_name.outputs.tag }}${{ steps.branch_name.outputs.current_branch }}/suppression.xml
- name: Upload Test results
uses: actions/upload-artifact@master
with:
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/java-build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ jobs:
format: 'HTML'
args: >-
--failOnCVSS 5
--suppression https://raw.githubusercontent.com/${{ github.event.pull_request.head.repo.owner.login }}/${{github.event.repository.name}}/${{ steps.branch_name.outputs.tag }}${{ steps.branch_name.outputs.current_branch }}/suppression.xml
--suppression https://raw.githubusercontent.com/${{ github.event.pull_request.head.repo.owner.login || github.event.repository.owner.login }}/${{github.event.repository.name}}/${{ steps.branch_name.outputs.tag }}${{ steps.branch_name.outputs.current_branch }}/suppression.xml
- name: Upload Test results
uses: actions/upload-artifact@master
Expand Down
18 changes: 17 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ ThisBuild / scalaVersion := Dependencies.scalaVersion

lazy val subProjects: Seq[Project] = Seq(
`query-language`,
`java-common`,
common,
`sql-common`,
`cloud-common`,
Expand Down Expand Up @@ -58,6 +59,19 @@ lazy val `query-language` = (project in file("java-connectors/kafka-connect-quer
.configureTests(baseTestDeps)
.configureAntlr()

lazy val `java-common` = (project in file("java-connectors/kafka-connect-common"))
.settings(
settings ++
Seq(
name := "kafka-connect-java-common",
description := "Common components from java",
libraryDependencies ++= javaCommonDeps,
publish / skip := true,
),
)
.configureAssembly(false)
.configureTests(javaCommonTestDeps)

lazy val `sql-common` = (project in file("kafka-connect-sql-common"))
.dependsOn(`query-language`)
.dependsOn(`common`)
Expand All @@ -75,6 +89,7 @@ lazy val `sql-common` = (project in file("kafka-connect-sql-common"))

lazy val common = (project in file("kafka-connect-common"))
.dependsOn(`query-language`)
.dependsOn(`java-common`)
.settings(
settings ++
Seq(
Expand Down Expand Up @@ -459,7 +474,8 @@ val generateDepCheckModulesList = taskKey[Seq[File]]("generateDepCheckModulesLis
Compile / generateModulesList :=
new FileWriter(subProjects).generate((Compile / resourceManaged).value / "modules.txt")
Compile / generateDepCheckModulesList :=
new FileWriter(subProjects.tail).generate((Compile / resourceManaged).value / "depcheck-modules.txt")
new FileWriter(subProjects.filter(sp => !sp.base.asPath.startsWith("java-connectors/")))
.generate((Compile / resourceManaged).value / "depcheck-modules.txt")
Compile / generateItModulesList :=
new FileWriter(
subProjects.filter(p => p.containsDir("src/it")),
Expand Down
2 changes: 1 addition & 1 deletion java-connectors/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ plugins {
allprojects {

group = "io.lenses.streamreactor"
version = "6.4.0-SNAPSHOT"
version = "7.0.1-SNAPSHOT"
description = "stream-reactor"

apply plugin: 'java'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,15 @@
import java.io.IOException;
import java.net.URISyntaxException;
import java.net.URL;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.jar.Attributes;
import java.util.jar.JarFile;
import java.util.jar.Manifest;
import java.util.stream.Collectors;

/**
* Class that reads JAR Manifest files so we can easily get some of the properties from it.
Expand All @@ -44,51 +47,48 @@ public class JarManifest {
private static final String UNKNOWN = "unknown";
private static final String NEW_LINE = System.getProperty("line.separator");
private static final String SEMICOLON = ":";
private final Map<String, String> jarAttributes = new HashMap<>();
private Map<String, String> jarAttributes = new HashMap<>();

/**
* Creates JarManifest.
* @param location Jar file location
*/
public JarManifest(URL location) {
Manifest manifest;

try (JarFile jarFile = new JarFile(new File(location.toURI()))) {
manifest = jarFile.getManifest();
try {
File file = new File(location.toURI());
if (file.isFile()) {
try (JarFile jarFile = new JarFile(file)) {
ofNullable(jarFile.getManifest()).flatMap(mf -> of(mf.getMainAttributes()))
.ifPresent(mainAttrs -> jarAttributes = extractMainAttributes(mainAttrs));
}
}
} catch (URISyntaxException | IOException e) {
throw new ConnectorStartupException(e);
}
extractMainAttributes(manifest.getMainAttributes());
}

/**
* Creates JarManifest.
* @param jarFile
*/
public JarManifest(JarFile jarFile) {
Manifest manifest;
try {
Optional<JarFile> jarFileOptional = of(jarFile);
manifest = jarFileOptional.get().getManifest();
} catch (NullPointerException | IOException e) {
throw new ConnectorStartupException(e);
Optional<JarFile> jarFileOptional = ofNullable(jarFile);
if (jarFileOptional.isPresent()) {
try (JarFile jf = jarFileOptional.get()) {
ofNullable(jf.getManifest()).flatMap(mf -> of(mf.getMainAttributes()))
.ifPresent(mainAttrs -> jarAttributes = extractMainAttributes(mainAttrs));
} catch (IOException e) {
throw new ConnectorStartupException(e);
}
}
extractMainAttributes(manifest.getMainAttributes());
}

private void extractMainAttributes(Attributes mainAttributes) {
jarAttributes.put(REACTOR_VER.getAttributeName(),
ofNullable(mainAttributes.getValue(REACTOR_VER.getAttributeName())).orElse(UNKNOWN));
jarAttributes.put(KAFKA_VER.getAttributeName(),
ofNullable(mainAttributes.getValue(KAFKA_VER.getAttributeName())).orElse(UNKNOWN));
jarAttributes.put(GIT_REPO.getAttributeName(),
ofNullable(mainAttributes.getValue(GIT_REPO.getAttributeName())).orElse(UNKNOWN));
jarAttributes.put(GIT_HASH.getAttributeName(),
ofNullable(mainAttributes.getValue(GIT_HASH.getAttributeName())).orElse(UNKNOWN));
jarAttributes.put(GIT_TAG.getAttributeName(),
ofNullable(mainAttributes.getValue(GIT_TAG.getAttributeName())).orElse(UNKNOWN));
jarAttributes.put(REACTOR_DOCS.getAttributeName(),
ofNullable(mainAttributes.getValue(REACTOR_DOCS.getAttributeName())).orElse(UNKNOWN));
private Map<String, String> extractMainAttributes(Attributes mainAttributes) {
return Collections.unmodifiableMap(Arrays.stream(ManifestAttributes.values())
.collect(Collectors.toMap(ManifestAttributes::getAttributeName,
manifestAttribute ->
ofNullable(mainAttributes.getValue(manifestAttribute.getAttributeName())).orElse(UNKNOWN))
));
}

/**
Expand All @@ -103,18 +103,12 @@ public String getVersion() {
*/
public String buildManifestString() {
StringBuilder manifestBuilder = new StringBuilder();
manifestBuilder.append(REACTOR_VER.attributeName).append(SEMICOLON)
.append(jarAttributes.get(REACTOR_VER.getAttributeName())).append(NEW_LINE);
manifestBuilder.append(KAFKA_VER.attributeName).append(SEMICOLON)
.append(jarAttributes.get(KAFKA_VER.getAttributeName())).append(NEW_LINE);
manifestBuilder.append(GIT_REPO.attributeName).append(SEMICOLON)
.append(jarAttributes.get(GIT_REPO.getAttributeName())).append(NEW_LINE);
manifestBuilder.append(GIT_HASH.attributeName).append(SEMICOLON)
.append(jarAttributes.get(GIT_HASH.getAttributeName())).append(NEW_LINE);
manifestBuilder.append(GIT_TAG.attributeName).append(SEMICOLON)
.append(jarAttributes.get(GIT_TAG.getAttributeName())).append(NEW_LINE);
manifestBuilder.append(REACTOR_DOCS.attributeName).append(SEMICOLON)
.append(jarAttributes.get(REACTOR_DOCS.getAttributeName())).append(NEW_LINE);
List<ManifestAttributes> attributesInStringOrder =
List.of(REACTOR_VER, KAFKA_VER, GIT_REPO, GIT_HASH, GIT_TAG, REACTOR_DOCS);
attributesInStringOrder.forEach(
attribute -> manifestBuilder.append(attribute.attributeName).append(SEMICOLON)
.append(jarAttributes.get(attribute.getAttributeName())).append(NEW_LINE)
);
return manifestBuilder.toString();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package io.lenses.streamreactor.common.util;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
Expand All @@ -30,6 +31,7 @@
class JarManifestTest {

private static final String UNKNOWN = "unknown";
private static final String EMPTY_STRING = "";

JarManifest testObj;

Expand Down Expand Up @@ -79,4 +81,15 @@ void getVersionShouldReturnUnknownVersionIfNotIncludedInManifest() throws IOExce
verify(attributes).getValue(ManifestAttributes.REACTOR_VER.getAttributeName());
assertEquals(UNKNOWN, streamReactorVersion);
}

@Test
void getVersionShouldReturnDefaultIfFileProvidedIsNotJar() {
//given

//when
testObj = new JarManifest(getClass().getProtectionDomain().getCodeSource().getLocation());

//then
assertThat(testObj.getVersion()).isEqualTo(EMPTY_STRING);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
package io.lenses.streamreactor.connect.aws.s3.sink

import com.typesafe.scalalogging.LazyLogging
import io.lenses.streamreactor.common.utils.JarManifest
import io.lenses.streamreactor.common.utils.JarManifestProvided
import io.lenses.streamreactor.connect.aws.s3.config.S3ConfigSettings
import io.lenses.streamreactor.connect.aws.s3.sink.config.S3ConsumerGroupsSinkConfigDef
import io.lenses.streamreactor.connect.cloud.common.config.TaskDistributor
Expand All @@ -29,13 +29,10 @@ import java.util
/**
* A connector which stores the latest Kafka consumer group offset from "__consumer_offsets" topic in S3.
*/
class S3ConsumerGroupsSinkConnector extends SinkConnector with LazyLogging {
class S3ConsumerGroupsSinkConnector extends SinkConnector with LazyLogging with JarManifestProvided {

private val manifest = JarManifest(getClass.getProtectionDomain.getCodeSource.getLocation)
private val props: util.Map[String, String] = new util.HashMap[String, String]()

override def version(): String = manifest.version()

override def taskClass(): Class[_ <: Task] = classOf[S3ConsumerGroupsSinkTask]

override def config(): ConfigDef = S3ConsumerGroupsSinkConfigDef.config
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ package io.lenses.streamreactor.connect.aws.s3.sink

import cats.implicits.toShow
import io.lenses.streamreactor.common.errors.ErrorHandler
import io.lenses.streamreactor.common.utils.AsciiArtPrinter.printAsciiHeader
import io.lenses.streamreactor.common.utils.JarManifest
import io.lenses.streamreactor.common.util.AsciiArtPrinter.printAsciiHeader
import io.lenses.streamreactor.common.utils.JarManifestProvided
import io.lenses.streamreactor.connect.aws.s3.auth.AwsS3ClientCreator
import io.lenses.streamreactor.connect.aws.s3.config.S3ConfigSettings.CONNECTOR_PREFIX
import io.lenses.streamreactor.connect.aws.s3.sink.config.S3ConsumerGroupsSinkConfig
Expand All @@ -45,15 +45,11 @@ import scala.jdk.CollectionConverters.MapHasAsScala
* But since the s3 key is unique for group-topic-partition the last write will be the latest offset.
*/

class S3ConsumerGroupsSinkTask extends SinkTask with ErrorHandler {

private val manifest = JarManifest(getClass.getProtectionDomain.getCodeSource.getLocation)
class S3ConsumerGroupsSinkTask extends SinkTask with ErrorHandler with JarManifestProvided {

private var connectorTaskId: ConnectorTaskId = _
private var writerManager: ConsumerGroupsWriter = _

override def version(): String = manifest.version()

override def start(fallbackProps: util.Map[String, String]): Unit = {

printAsciiHeader(manifest, "/aws-s3-cg-sink-ascii.txt")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
*/
package io.lenses.streamreactor.connect.aws.s3.sink

import io.lenses.streamreactor.common.utils.JarManifest
import com.typesafe.scalalogging.LazyLogging
import io.lenses.streamreactor.common.utils.JarManifestProvided
import io.lenses.streamreactor.connect.aws.s3.config.S3ConfigSettings
import io.lenses.streamreactor.connect.aws.s3.sink.config.S3SinkConfigDef
import io.lenses.streamreactor.connect.cloud.common.config.TaskDistributor
Expand All @@ -26,13 +26,10 @@ import org.apache.kafka.connect.sink.SinkConnector

import java.util

class S3SinkConnector extends SinkConnector with LazyLogging {
class S3SinkConnector extends SinkConnector with LazyLogging with JarManifestProvided {

private val manifest = JarManifest(getClass.getProtectionDomain.getCodeSource.getLocation)
private val props: util.Map[String, String] = new util.HashMap[String, String]()

override def version(): String = manifest.version()

override def taskClass(): Class[_ <: Task] = classOf[S3SinkTask]

override def config(): ConfigDef = S3SinkConfigDef.config
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/
package io.lenses.streamreactor.connect.aws.s3.sink

import io.lenses.streamreactor.common.utils.JarManifest
import io.lenses.streamreactor.common.util.JarManifest
import io.lenses.streamreactor.connect.aws.s3.auth.AwsS3ClientCreator
import io.lenses.streamreactor.connect.aws.s3.config.S3ConfigSettings
import io.lenses.streamreactor.connect.aws.s3.model.location.S3LocationValidator
Expand All @@ -33,7 +33,7 @@ class S3SinkTask
extends CloudSinkTask[S3FileMetadata, S3SinkConfig, S3Client](
S3ConfigSettings.CONNECTOR_PREFIX,
"/aws-s3-sink-ascii.txt",
JarManifest(S3SinkTask.getClass.getProtectionDomain.getCodeSource.getLocation),
new JarManifest(S3SinkTask.getClass.getProtectionDomain.getCodeSource.getLocation),
) {

val writerManagerCreator = new WriterManagerCreator[S3FileMetadata, S3SinkConfig]()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
*/
package io.lenses.streamreactor.connect.aws.s3.source

import io.lenses.streamreactor.common.utils.JarManifest
import com.typesafe.scalalogging.LazyLogging
import io.lenses.streamreactor.common.utils.JarManifestProvided
import io.lenses.streamreactor.connect.aws.s3.config.S3ConfigSettings.CONNECTOR_PREFIX
import io.lenses.streamreactor.connect.aws.s3.source.config.S3SourceConfigDef
import io.lenses.streamreactor.connect.cloud.common.config.TaskDistributor
Expand All @@ -27,13 +27,10 @@ import org.apache.kafka.connect.source.SourceConnector

import java.util

class S3SourceConnector extends SourceConnector with LazyLogging {
class S3SourceConnector extends SourceConnector with LazyLogging with JarManifestProvided {

private val manifest = JarManifest(getClass.getProtectionDomain.getCodeSource.getLocation)
private val props: util.Map[String, String] = new util.HashMap[String, String]()

override def version(): String = manifest.version()

override def taskClass(): Class[_ <: Task] = classOf[S3SourceTask]

override def config(): ConfigDef = S3SourceConfigDef.config
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
*/
package io.lenses.streamreactor.connect.datalake.sink

import io.lenses.streamreactor.common.utils.JarManifest
import com.typesafe.scalalogging.LazyLogging
import io.lenses.streamreactor.common.utils.JarManifestProvided
import io.lenses.streamreactor.connect.datalake.config.AzureConfigSettings
import io.lenses.streamreactor.connect.datalake.sink.config.DatalakeSinkConfigDef
import io.lenses.streamreactor.connect.cloud.common.config.TaskDistributor
Expand All @@ -26,13 +26,10 @@ import org.apache.kafka.connect.sink.SinkConnector

import java.util

class DatalakeSinkConnector extends SinkConnector with LazyLogging {
class DatalakeSinkConnector extends SinkConnector with LazyLogging with JarManifestProvided {

private val manifest = JarManifest(getClass.getProtectionDomain.getCodeSource.getLocation)
private val props: util.Map[String, String] = new util.HashMap[String, String]()

override def version(): String = manifest.version()

override def taskClass(): Class[_ <: Task] = classOf[DatalakeSinkTask]

override def config(): ConfigDef = DatalakeSinkConfigDef.config
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
package io.lenses.streamreactor.connect.datalake.sink

import com.azure.storage.file.datalake.DataLakeServiceClient
import io.lenses.streamreactor.common.utils.JarManifest
import io.lenses.streamreactor.common.util.JarManifest
import io.lenses.streamreactor.connect.cloud.common.config.ConnectorTaskId
import io.lenses.streamreactor.connect.cloud.common.sink.CloudSinkTask
import io.lenses.streamreactor.connect.cloud.common.storage.StorageInterface
Expand All @@ -31,7 +31,7 @@ class DatalakeSinkTask
extends CloudSinkTask[DatalakeFileMetadata, DatalakeSinkConfig, DataLakeServiceClient](
AzureConfigSettings.CONNECTOR_PREFIX,
"/datalake-sink-ascii.txt",
JarManifest(DatalakeSinkTask.getClass.getProtectionDomain.getCodeSource.getLocation),
new JarManifest(DatalakeSinkTask.getClass.getProtectionDomain.getCodeSource.getLocation),
) {

override def createClient(config: DatalakeSinkConfig): Either[Throwable, DataLakeServiceClient] =
Expand Down
Loading

0 comments on commit e0dcb60

Please sign in to comment.