diff --git a/.github/workflows/java-build.yml b/.github/workflows/java-build.yml new file mode 100644 index 000000000..04bedeb28 --- /dev/null +++ b/.github/workflows/java-build.yml @@ -0,0 +1,135 @@ +name: CI-java +on: + pull_request: + workflow_call: + inputs: + version: + required: true + type: string + outputs: + java_modules: + description: "Stream reactor collection of java modules" + value: ${{ jobs.initiate-java-modules.outputs.java_matrix }} + +jobs: + + initiate-java-modules: + timeout-minutes: 5 + runs-on: ubuntu-latest + outputs: + java_matrix: ${{ steps.java-mods.outputs.java-matrix }} + steps: + - uses: actions/checkout@v4 + - name: Set up JDK 17 + uses: actions/setup-java@v4 + with: + java-version: '17' + distribution: 'temurin' + cache: 'gradle' + - name: Generate modules lists + run: cd 'java-connectors' && ./gradlew releaseModuleList + env: + JVM_OPTS: -Xmx512m + - name: Read java modules lists + id: java-mods + run: | + echo "java-matrix=$(cat ./java-connectors/gradle-modules.txt)" >> $GITHUB_OUTPUT + + test: + needs: + - initiate-java-modules + strategy: + matrix: + module: ${{fromJSON(needs.initiate-java-modules.outputs.java_matrix)}} + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + + - name: Set up JDK 17 + uses: actions/setup-java@v4 + with: + java-version: 17 + distribution: 'temurin' + + - name: Setup Gradle + uses: gradle/actions/setup-gradle@v3 + with: + gradle-version: 8.6 + + - name: Check License Headers and Test with Gradle + run: cd 'java-connectors' && ./gradlew ${{ matrix.module }}:test + + build-and-cache: + needs: + - test + - initiate-java-modules + strategy: + matrix: + module: ${{fromJSON(needs.initiate-java-modules.outputs.java_matrix)}} + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - name: Set up JDK 17 + uses: actions/setup-java@v4 + with: + java-version: 17 + distribution: 'temurin' + cache: gradle + + - name: Setup Gradle + uses: gradle/actions/setup-gradle@v3 + with: + gradle-version: 8.6 + + - name: Execute Gradle build + run: cd 'java-connectors' && ./gradlew ${{ matrix.module }}:shadowJar --scan + + - name: Move to release folder + shell: bash + run: | + JAVA_RELEASE_FOLDER=java-connectors/release + JAVA_BUILD_FOLDER=java-connectors/${{ matrix.module }}/build/libs + mkdir -p $JAVA_RELEASE_FOLDER + cp $JAVA_BUILD_FOLDER/${{ matrix.module }}*.jar LICENSE $JAVA_RELEASE_FOLDER/ + + - name: Cache assembly + uses: actions/cache/save@v4 + with: + path: ./java-connectors/release/${{ matrix.module }}*.jar + key: assembly-java-${{ github.run_id }} + + jar-dependency-check: + needs: + - build-and-cache + - initiate-java-modules + timeout-minutes: 30 + runs-on: ubuntu-latest + strategy: + matrix: + module: ${{fromJSON(needs.initiate-java-modules.outputs.java_matrix)}} + steps: + - name: Restore assembly + uses: actions/cache/restore@v4 + with: + path: ./java-connectors/release/${{ matrix.module }}*.jar + key: assembly-java-${{ github.run_id }} + fail-on-cache-miss: true + + - name: Get branch names. 
+ id: branch_name + uses: tj-actions/branch-names@v8 + - name: JAR Dependency Check + uses: dependency-check/Dependency-Check_Action@main + with: + project: kafka-connect-${{matrix.module}}-deps + path: ./java-connectors/release/${{ matrix.module }}*.jar + format: 'HTML' + args: >- + --failOnCVSS 5 + --suppression https://raw.githubusercontent.com/${{ github.event.pull_request.head.repo.owner.login }}/${{github.event.repository.name}}/${{ steps.branch_name.outputs.tag }}${{ steps.branch_name.outputs.current_branch }}/suppression.xml + + - name: Upload Test results + uses: actions/upload-artifact@master + with: + name: ${{matrix.module}}-depcheck-results + path: ${{github.workspace}}/reports \ No newline at end of file diff --git a/.github/workflows/java-release.yml b/.github/workflows/java-release.yml new file mode 100644 index 000000000..4738213cf --- /dev/null +++ b/.github/workflows/java-release.yml @@ -0,0 +1,95 @@ +name: Publish New Java Release +on: + push: + tags: + - "*" + workflow_dispatch: + +jobs: + validate-tag: + runs-on: ubuntu-latest + outputs: + draft_release: ${{ steps.get_tag.outputs.draft_release }} + tag: ${{ steps.get_tag.outputs.tag }} + steps: + - name: Checkout repository + uses: actions/checkout@v4 + with: + fetch-depth: 0 + + - name: Get tag, release mode + shell: bash + id: get_tag + run: | + if [[ ${GITHUB_REF##*/} =~ ^[0-9]+\.[0-9]+\.[0-9]+$ ]]; + then + draft_release=false + elif [[ ${GITHUB_REF##*/} =~ ^[0-9]+\.[0-9]+\.[0-9]+(-(alpha|beta|rc)(\.[0-9]+)?)?(\+[A-Za-z0-9.]+)?$ ]]; + then + draft_release=true + else + echo "Exiting, github ref needs to be a tag with format x.y.z or x.y.z-(alpha|beta|rc)" + exit 1 + fi + echo "draft_release=$draft_release" >> $GITHUB_OUTPUT + echo "tag=${GITHUB_REF##*/}" >> $GITHUB_OUTPUT + + build: + needs: + - validate-tag + uses: ./.github/workflows/java-build.yml + with: + version: ${{ needs.validate-tag.outputs.tag }} + secrets: inherit + + create-release: + runs-on: ubuntu-latest + needs: + - validate-tag + - build + strategy: + # Avoid parallel uploads + max-parallel: 1 + # GitHub will NOT cancel all in-progress and queued jobs in the matrix if any job in the matrix fails, which could create inconsistencies. 
+ # If any matrix job fails, the job will be marked as failure + fail-fast: false + matrix: + module: ${{fromJSON(needs.build.outputs.java_modules)}} + env: + DRAFT_RELEASE: '${{ needs.validate-tag.outputs.draft_release }}' + TAG: ${{ needs.validate-tag.outputs.tag }} + + steps: + - name: Checkout repository + uses: actions/checkout@v4 + - name: Set up JDK 17 + uses: actions/setup-java@v4 + with: + java-version: '17' + distribution: 'temurin' + cache: 'gradle' + + - name: Uncache assembly + uses: actions/cache/restore@v4 + with: + path: | + ./java-connectors/release/${{ matrix.module }}*.jar + key: assembly-java-${{ github.run_id }} + fail-on-cache-miss: true + + - name: Package Connector + shell: bash + run: | + JAVA_RELEASE_FOLDER=java-connectors/release + FOLDER=${{ matrix.module }}-${{ env.TAG }} + mkdir -p $FOLDER + cp $JAVA_RELEASE_FOLDER/${{ matrix.module }}*.jar LICENSE $FOLDER/ + zip -r "$FOLDER.zip" $FOLDER/ + + - name: Upload binaries to release + uses: svenstaro/upload-release-action@v2 + with: + file: ${{ matrix.module }}-${{ env.TAG }}.zip + asset_name: "${{ matrix.module }}-${{ env.TAG }}.zip" + release_name: 'Stream Reactor ${{ env.TAG }}' + prerelease: ${{ env.DRAFT_RELEASE }} \ No newline at end of file diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 956b47966..a6fb59e27 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -22,14 +22,14 @@ jobs: shell: bash id: get_tag run: | - if [[ ${GITHUB_REF##*/} =~ ^[0-9]\.[0-9]\.[0-9]$ ]]; + if [[ ${GITHUB_REF##*/} =~ ^[0-9]+\.[0-9]+\.[0-9]+$ ]]; then draft_release=false - elif [[ ${GITHUB_REF##*/} =~ ^[0-9]\.[0-9]\.[0-9]+(-(alpha|beta|rc)(\.[0-9]+)?)?(\+[A-Za-z0-9.]+)?$ ]]; + elif [[ ${GITHUB_REF##*/} =~ ^[0-9]+\.[0-9]+\.[0-9]+(-(alpha|beta|rc)(\.[0-9]+)?)?(\+[A-Za-z0-9.]+)?$ ]]; then draft_release=true else - echo "Exiting, github ref needs to be a tag with format x.y.z or x.y.z+(alpha|beta|rc)" + echo "Exiting, github ref needs to be a tag with format x.y.z or x.y.z-(alpha|beta|rc)" exit 1 fi echo "draft_release=$draft_release" >> $GITHUB_OUTPUT diff --git a/.gitignore b/.gitignore index d5b55b6dc..980db392b 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,6 @@ +#Including java-specific ignores +#!include:java-connectors/.gitignore + .bsp stageman cass-test diff --git a/java-connectors/.gitignore b/java-connectors/.gitignore new file mode 100644 index 000000000..13047a0b9 --- /dev/null +++ b/java-connectors/.gitignore @@ -0,0 +1,47 @@ +.gradle +build/ +!gradle/wrapper/gradle-wrapper.jar +!**/src/main/**/build/ +!**/src/test/**/build/ + +### IntelliJ IDEA ### +.idea/* +.idea/modules.xml +.idea/jarRepositories.xml +.idea/compiler.xml +.idea/libraries/ +*.iws +*.iml +*.ipr +out/ +!**/src/main/**/out/ +!**/src/test/**/out/ + +### Eclipse ### +.apt_generated +.classpath +.factorypath +.project +.settings +.springBeans +.sts4-cache +bin/ +!**/src/main/**/bin/ +!**/src/test/**/bin/ + +### NetBeans ### +/nbproject/private/ +/nbbuild/ +/dist/ +/nbdist/ +/.nb-gradle/ + +### VS Code ### +.vscode/ + +### Mac OS ### +.DS_Store + +### Lenses-specific ### +release/ +gradle-modules.txt \ No newline at end of file diff --git a/java-connectors/HEADER.txt b/java-connectors/HEADER.txt new file mode 100644 index 000000000..8828f9cd2 --- /dev/null +++ b/java-connectors/HEADER.txt @@ -0,0 +1,13 @@ +Copyright 2017-${year} ${name} Ltd + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
\ No newline at end of file
diff --git a/java-connectors/build.gradle b/java-connectors/build.gradle
new file mode 100644
index 000000000..c7c74b793
--- /dev/null
+++ b/java-connectors/build.gradle
@@ -0,0 +1,177 @@
+plugins {
+    id 'com.github.johnrengelman.shadow' version '8.1.1'
+    id 'org.cadixdev.licenser' version '0.6.1'
+    id 'java'
+    id 'java-library'
+}
+
+allprojects {
+
+    group = "io.lenses.streamreactor"
+    version = "6.4.0-SNAPSHOT"
+    description = "stream-reactor"
+
+    apply plugin: 'java'
+    apply plugin: 'java-library'
+    apply plugin: 'com.github.johnrengelman.shadow'
+    apply plugin: 'org.cadixdev.licenser'
+
+    java {
+        setSourceCompatibility(JavaVersion.VERSION_11)
+        setTargetCompatibility(JavaVersion.VERSION_11)
+    }
+
+    ext {
+        //DEPENDENCY VERSIONS
+        lombokVersion = '1.18.30'
+        kafkaVersion = '3.7.0'
+        logbackVersion = '1.4.14'
+        jUnitVersion = '5.9.1'
+        mockitoJupiterVersion = '5.10.0'
+        apacheToConfluentVersionAxis = ["2.8.1": "6.2.2", "3.3.0": "7.3.1"]
+
+        //Other Manifest Info
+        mainClassName = ''
+        gitCommitHash = ("git rev-parse HEAD").execute().text.trim()
+        gitTag = ("git describe --abbrev=0 --tags").execute().text.trim()
+        gitRepo = ("git remote get-url origin").execute().text.trim()
+
+        //for jar building
+        rootRelease = "${project.rootDir}/release/"
+        versionDir = "${rootRelease}/${project.description}-${project.version}"
+        confDir = "${versionDir}/conf"
+        libsDir = "${versionDir}/libs"
+    }
+
+    repositories {
+        mavenCentral()
+        maven {
+            url "https://packages.confluent.io/maven/"
+        }
+    }
+
+    dependencies {
+        //logback
+        implementation group: 'ch.qos.logback', name: 'logback-classic', version: logbackVersion
+
+        //lombok
+        compileOnly group: 'org.projectlombok', name: 'lombok', version: lombokVersion
+        annotationProcessor group: 'org.projectlombok', name: 'lombok', version: lombokVersion
+
+        //tests
+        testImplementation group: 'org.mockito', name: 'mockito-core', version: mockitoJupiterVersion
+        testImplementation group: 'org.junit.jupiter', name: 'junit-jupiter', version: jUnitVersion
+        testImplementation group: 'org.assertj', name: 'assertj-core', version: '3.25.3'
+
+    }
+
+    test {
+        useJUnitPlatform()
+
+        maxHeapSize = '1G'
+
+        testLogging {
+            events "passed"
+        }
+    }
+
+    license {
+        include("**/**.java", "**/**Test.java")
+        exclude("**/kcql/antlr4/**.java") //antlr generated files
+        header = project.file("${project.rootDir}/HEADER.txt")
+        newLine = false
+
+        style {
+            java = 'BLOCK_COMMENT'
+        }
+
+        properties {
+            name = 'Lenses.io'
+            year = LocalDate.now().year
+        }
+    }
+
+    jar {
+        manifest {
+            attributes("StreamReactor-Version": project.version,
+                    "Kafka-Version": kafkaVersion,
+                    "Created-By": "Lenses",
+                    "Created-At": new Date().format("yyyyMMddHHmm"),
+                    "Git-Repo": gitRepo,
+                    "Git-Commit-Hash": gitCommitHash,
+                    "Git-Tag": gitTag,
+                    "StreamReactor-Docs": "https://docs.lenses.io/connectors/"
+            )
+        }
+    }
+
+    shadowJar {
+
+        manifest {
+            attributes("StreamReactor-Version": project.version,
+                    "Kafka-Version": kafkaVersion,
+                    "Created-By": "Lenses",
+                    "Created-At": new Date().format("yyyyMMddHHmm"),
+                    "Git-Repo": gitRepo,
+                    "Git-Commit-Hash": gitCommitHash,
+                    "Git-Tag": gitTag,
+                    "StreamReactor-Docs": "https://docs.lenses.io/connectors/"
+            )
+        }
+        configurations = [project.configurations.compileClasspath]
+        //archiveBaseName = "${project.name}-${project.version}-${kafkaVersion}-all"
+        zip64 true
+
+        mergeServiceFiles {
+            exclude "META-INF/*.SF"
+            exclude "META-INF/*.DSA"
+            exclude "META-INF/*.RSA"
+        }
+
+        //shadowing antlr packages in order to avoid conflict when using kafka connect
+        relocate('org.antlr', 'lshaded.antlr')
+
+        dependencies {
+//            // UNCOMMENT BELOW IF NEED CLEAN JAR
+//            exclude(dependency {
+//                it.moduleGroup != 'io.lenses.streamreactor'
+//            })
+//            exclude(dependency('org.apache.logging.log4j:log4j-core:2.11.1'))
+//            exclude(dependency("org.apache.avro:.*"))
+//            exclude(dependency("org.apache.kafka:.*"))
+//            exclude(dependency("io.confluent:.*"))
+//            exclude(dependency("org.apache.kafka:.*"))
+//            exclude(dependency("org.apache.zookeeper:.*"))
+//            exclude(dependency("com.google.guava:guava:28.1-android"))
+        }
+
+    }
+    compileJava.dependsOn("checkLicenses")
+
+    task fatJar(dependsOn: [test, jar, shadowJar])
+
+    task collectFatJar(type: Copy, dependsOn: [fatJar]) {
+        from("${buildDir}/libs").include("kafka-connect-*-all.jar")
+                .exclude("*-common-*").into(libsDir)
+    }
+}
+
+task prepareRelease(dependsOn: [collectFatJar]) {
+    dependsOn subprojects.collectFatJar
+}
+
+task releaseModuleList() {
+    def nonReleaseModules = ["java-reactor", "kafka-connect-cloud-common",
+                             "kafka-connect-common", "kafka-connect-query-language"]
+
+    def modulesFile = new File("gradle-modules.txt")
+    modulesFile.delete()
+    modulesFile.createNewFile()
+
+    def modulesBuilder = new StringBuilder("[")
+    allprojects.name.stream()
+        .filter {moduleName -> !nonReleaseModules.contains(moduleName)}
+        .forEach {moduleName -> modulesBuilder.append("\"" + moduleName + "\",") }
+    modulesBuilder.deleteCharAt(modulesBuilder.lastIndexOf(",")).append("]")
+    modulesFile.append(modulesBuilder)
+}
diff --git a/java-connectors/config/checkstyle/checkstyle.xml b/java-connectors/config/checkstyle/checkstyle.xml
new file mode 100644
index 000000000..c731fe024
--- /dev/null
+++ b/java-connectors/config/checkstyle/checkstyle.xml
@@ -0,0 +1,379 @@
[379-line Checkstyle rule configuration; the XML markup was lost in extraction]
diff --git a/java-connectors/config/checkstyle/suppressions.xml b/java-connectors/config/checkstyle/suppressions.xml
new file mode 100644
index 000000000..3d20b94b9
--- /dev/null
+++ b/java-connectors/config/checkstyle/suppressions.xml
@@ -0,0 +1,11 @@
[11-line Checkstyle suppressions file; the XML markup was lost in extraction]
diff --git a/java-connectors/gradle/wrapper/gradle-wrapper.jar b/java-connectors/gradle/wrapper/gradle-wrapper.jar
new file mode 100644
index 000000000..249e5832f
Binary files /dev/null and b/java-connectors/gradle/wrapper/gradle-wrapper.jar differ
diff --git a/java-connectors/gradle/wrapper/gradle-wrapper.properties b/java-connectors/gradle/wrapper/gradle-wrapper.properties
new file mode 100644
index 000000000..a1610a6f4
--- /dev/null
+++ b/java-connectors/gradle/wrapper/gradle-wrapper.properties
@@ -0,0 +1,6 
@@ +#Sat Feb 10 15:56:21 CET 2024 +distributionBase=GRADLE_USER_HOME +distributionPath=wrapper/dists +distributionUrl=https\://services.gradle.org/distributions/gradle-8.6-bin.zip +zipStoreBase=GRADLE_USER_HOME +zipStorePath=wrapper/dists diff --git a/java-connectors/gradlew b/java-connectors/gradlew new file mode 100755 index 000000000..1b6c78733 --- /dev/null +++ b/java-connectors/gradlew @@ -0,0 +1,234 @@ +#!/bin/sh + +# +# Copyright © 2015-2021 the original authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +############################################################################## +# +# Gradle start up script for POSIX generated by Gradle. +# +# Important for running: +# +# (1) You need a POSIX-compliant shell to run this script. If your /bin/sh is +# noncompliant, but you have some other compliant shell such as ksh or +# bash, then to run this script, type that shell name before the whole +# command line, like: +# +# ksh Gradle +# +# Busybox and similar reduced shells will NOT work, because this script +# requires all of these POSIX shell features: +# * functions; +# * expansions «$var», «${var}», «${var:-default}», «${var+SET}», +# «${var#prefix}», «${var%suffix}», and «$( cmd )»; +# * compound commands having a testable exit status, especially «case»; +# * various built-in commands including «command», «set», and «ulimit». +# +# Important for patching: +# +# (2) This script targets any POSIX shell, so it avoids extensions provided +# by Bash, Ksh, etc; in particular arrays are avoided. +# +# The "traditional" practice of packing multiple parameters into a +# space-separated string is a well documented source of bugs and security +# problems, so this is (mostly) avoided, by progressively accumulating +# options in "$@", and eventually passing that to Java. +# +# Where the inherited environment variables (DEFAULT_JVM_OPTS, JAVA_OPTS, +# and GRADLE_OPTS) rely on word-splitting, this is performed explicitly; +# see the in-line comments for details. +# +# There are tweaks for specific operating systems such as AIX, CygWin, +# Darwin, MinGW, and NonStop. +# +# (3) This script is generated from the Groovy template +# https://github.com/gradle/gradle/blob/master/subprojects/plugins/src/main/resources/org/gradle/api/internal/plugins/unixStartScript.txt +# within the Gradle project. +# +# You can find Gradle at https://github.com/gradle/gradle/. +# +############################################################################## + +# Attempt to set APP_HOME + +# Resolve links: $0 may be a link +app_path=$0 + +# Need this for daisy-chained symlinks. +while + APP_HOME=${app_path%"${app_path##*/}"} # leaves a trailing /; empty if no leading path + [ -h "$app_path" ] +do + ls=$( ls -ld "$app_path" ) + link=${ls#*' -> '} + case $link in #( + /*) app_path=$link ;; #( + *) app_path=$APP_HOME$link ;; + esac +done + +APP_HOME=$( cd "${APP_HOME:-./}" && pwd -P ) || exit + +APP_NAME="Gradle" +APP_BASE_NAME=${0##*/} + +# Add default JVM options here. 
You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. +DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"' + +# Use the maximum available, or set MAX_FD != -1 to use that value. +MAX_FD=maximum + +warn () { + echo "$*" +} >&2 + +die () { + echo + echo "$*" + echo + exit 1 +} >&2 + +# OS specific support (must be 'true' or 'false'). +cygwin=false +msys=false +darwin=false +nonstop=false +case "$( uname )" in #( + CYGWIN* ) cygwin=true ;; #( + Darwin* ) darwin=true ;; #( + MSYS* | MINGW* ) msys=true ;; #( + NONSTOP* ) nonstop=true ;; +esac + +CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar + + +# Determine the Java command to use to start the JVM. +if [ -n "$JAVA_HOME" ] ; then + if [ -x "$JAVA_HOME/jre/sh/java" ] ; then + # IBM's JDK on AIX uses strange locations for the executables + JAVACMD=$JAVA_HOME/jre/sh/java + else + JAVACMD=$JAVA_HOME/bin/java + fi + if [ ! -x "$JAVACMD" ] ; then + die "ERROR: JAVA_HOME is set to an invalid directory: $JAVA_HOME + +Please set the JAVA_HOME variable in your environment to match the +location of your Java installation." + fi +else + JAVACMD=java + which java >/dev/null 2>&1 || die "ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. + +Please set the JAVA_HOME variable in your environment to match the +location of your Java installation." +fi + +# Increase the maximum file descriptors if we can. +if ! "$cygwin" && ! "$darwin" && ! "$nonstop" ; then + case $MAX_FD in #( + max*) + MAX_FD=$( ulimit -H -n ) || + warn "Could not query maximum file descriptor limit" + esac + case $MAX_FD in #( + '' | soft) :;; #( + *) + ulimit -n "$MAX_FD" || + warn "Could not set maximum file descriptor limit to $MAX_FD" + esac +fi + +# Collect all arguments for the java command, stacking in reverse order: +# * args from the command line +# * the main class name +# * -classpath +# * -D...appname settings +# * --module-path (only if needed) +# * DEFAULT_JVM_OPTS, JAVA_OPTS, and GRADLE_OPTS environment variables. + +# For Cygwin or MSYS, switch paths to Windows format before running java +if "$cygwin" || "$msys" ; then + APP_HOME=$( cygpath --path --mixed "$APP_HOME" ) + CLASSPATH=$( cygpath --path --mixed "$CLASSPATH" ) + + JAVACMD=$( cygpath --unix "$JAVACMD" ) + + # Now convert the arguments - kludge to limit ourselves to /bin/sh + for arg do + if + case $arg in #( + -*) false ;; # don't mess with options #( + /?*) t=${arg#/} t=/${t%%/*} # looks like a POSIX filepath + [ -e "$t" ] ;; #( + *) false ;; + esac + then + arg=$( cygpath --path --ignore --mixed "$arg" ) + fi + # Roll the args list around exactly as many times as the number of + # args, so each arg winds up back in the position where it started, but + # possibly modified. + # + # NB: a `for` loop captures its iteration list before it begins, so + # changing the positional parameters here affects neither the number of + # iterations, nor the values presented in `arg`. + shift # remove old arg + set -- "$@" "$arg" # push replacement arg + done +fi + +# Collect all arguments for the java command; +# * $DEFAULT_JVM_OPTS, $JAVA_OPTS, and $GRADLE_OPTS can contain fragments of +# shell script including quotes and variable substitutions, so put them in +# double quotes to make sure that they get re-expanded; and +# * put everything else in single quotes, so that it's not re-expanded. + +set -- \ + "-Dorg.gradle.appname=$APP_BASE_NAME" \ + -classpath "$CLASSPATH" \ + org.gradle.wrapper.GradleWrapperMain \ + "$@" + +# Use "xargs" to parse quoted args. 
+# +# With -n1 it outputs one arg per line, with the quotes and backslashes removed. +# +# In Bash we could simply go: +# +# readarray ARGS < <( xargs -n1 <<<"$var" ) && +# set -- "${ARGS[@]}" "$@" +# +# but POSIX shell has neither arrays nor command substitution, so instead we +# post-process each arg (as a line of input to sed) to backslash-escape any +# character that might be a shell metacharacter, then use eval to reverse +# that process (while maintaining the separation between arguments), and wrap +# the whole thing up as a single "set" statement. +# +# This will of course break if any of these variables contains a newline or +# an unmatched quote. +# + +eval "set -- $( + printf '%s\n' "$DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS" | + xargs -n1 | + sed ' s~[^-[:alnum:]+,./:=@_]~\\&~g; ' | + tr '\n' ' ' + )" '"$@"' + +exec "$JAVACMD" "$@" diff --git a/java-connectors/gradlew.bat b/java-connectors/gradlew.bat new file mode 100644 index 000000000..ac1b06f93 --- /dev/null +++ b/java-connectors/gradlew.bat @@ -0,0 +1,89 @@ +@rem +@rem Copyright 2015 the original author or authors. +@rem +@rem Licensed under the Apache License, Version 2.0 (the "License"); +@rem you may not use this file except in compliance with the License. +@rem You may obtain a copy of the License at +@rem +@rem https://www.apache.org/licenses/LICENSE-2.0 +@rem +@rem Unless required by applicable law or agreed to in writing, software +@rem distributed under the License is distributed on an "AS IS" BASIS, +@rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +@rem See the License for the specific language governing permissions and +@rem limitations under the License. +@rem + +@if "%DEBUG%" == "" @echo off +@rem ########################################################################## +@rem +@rem Gradle startup script for Windows +@rem +@rem ########################################################################## + +@rem Set local scope for the variables with windows NT shell +if "%OS%"=="Windows_NT" setlocal + +set DIRNAME=%~dp0 +if "%DIRNAME%" == "" set DIRNAME=. +set APP_BASE_NAME=%~n0 +set APP_HOME=%DIRNAME% + +@rem Resolve any "." and ".." in APP_HOME to make it shorter. +for %%i in ("%APP_HOME%") do set APP_HOME=%%~fi + +@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. +set DEFAULT_JVM_OPTS="-Xmx64m" "-Xms64m" + +@rem Find java.exe +if defined JAVA_HOME goto findJavaFromJavaHome + +set JAVA_EXE=java.exe +%JAVA_EXE% -version >NUL 2>&1 +if "%ERRORLEVEL%" == "0" goto execute + +echo. +echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. +echo. +echo Please set the JAVA_HOME variable in your environment to match the +echo location of your Java installation. + +goto fail + +:findJavaFromJavaHome +set JAVA_HOME=%JAVA_HOME:"=% +set JAVA_EXE=%JAVA_HOME%/bin/java.exe + +if exist "%JAVA_EXE%" goto execute + +echo. +echo ERROR: JAVA_HOME is set to an invalid directory: %JAVA_HOME% +echo. +echo Please set the JAVA_HOME variable in your environment to match the +echo location of your Java installation. 
+ +goto fail + +:execute +@rem Setup the command line + +set CLASSPATH=%APP_HOME%\gradle\wrapper\gradle-wrapper.jar + + +@rem Execute Gradle +"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" org.gradle.wrapper.GradleWrapperMain %* + +:end +@rem End local scope for the variables with windows NT shell +if "%ERRORLEVEL%"=="0" goto mainEnd + +:fail +rem Set variable GRADLE_EXIT_CONSOLE if you need the _script_ return code instead of +rem the _cmd.exe /c_ return code! +if not "" == "%GRADLE_EXIT_CONSOLE%" exit 1 +exit /b 1 + +:mainEnd +if "%OS%"=="Windows_NT" endlocal + +:omega diff --git a/java-connectors/kafka-connect-azure-eventhubs/build.gradle b/java-connectors/kafka-connect-azure-eventhubs/build.gradle new file mode 100644 index 000000000..60ea80b6c --- /dev/null +++ b/java-connectors/kafka-connect-azure-eventhubs/build.gradle @@ -0,0 +1,19 @@ +project(':kafka-connect-azure-eventhubs') { + + + test { + maxParallelForks = 1 + } + + dependencies { + implementation project(':kafka-connect-common') + implementation project(':kafka-connect-query-language') + +// //azure-specific dependencies in case we want to change from kafka protocol +// implementation group: 'com.azure', name: 'azure-identity', version: '1.11.2' +// implementation group: 'com.azure', name: 'azure-messaging-eventhubs', version: '5.18.0' +// implementation group: 'com.azure', name: 'azure-storage-blob', version: '12.25.1' +// implementation group: 'com.azure', name: 'azure-messaging-eventhubs-checkpointstore-blob', version: '1.19.0' + + } +} \ No newline at end of file diff --git a/java-connectors/kafka-connect-azure-eventhubs/src/main/java/io/lenses/streamreactor/connect/azure/eventhubs/config/AzureEventHubsConfigConstants.java b/java-connectors/kafka-connect-azure-eventhubs/src/main/java/io/lenses/streamreactor/connect/azure/eventhubs/config/AzureEventHubsConfigConstants.java new file mode 100644 index 000000000..35fab9bb6 --- /dev/null +++ b/java-connectors/kafka-connect-azure-eventhubs/src/main/java/io/lenses/streamreactor/connect/azure/eventhubs/config/AzureEventHubsConfigConstants.java @@ -0,0 +1,50 @@ +/* + * Copyright 2017-2024 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lenses.streamreactor.connect.azure.eventhubs.config; + +import io.lenses.streamreactor.connect.azure.eventhubs.source.AzureEventHubsSourceConnector; + +/** + * Class represents Config Constants for AzureEventHubsSourceConnector Config Definition. 
+ */
+public class AzureEventHubsConfigConstants {
+
+
+  private static final String DOT = ".";
+  public static final String OPTIONAL_EMPTY_DEFAULT = "";
+  public static final String CONNECTOR_PREFIX = "connect.eventhubs";
+  public static final String SOURCE_CONNECTOR_PREFIX = CONNECTOR_PREFIX + DOT + "source";
+
+  public static final String CONNECTOR_NAME = "name";
+  public static final String CONNECTOR_NAME_DOC = "Connector's name";
+  public static final String CONNECTOR_NAME_DEFAULT = AzureEventHubsSourceConnector.class.getSimpleName();
+
+  public static final String CONNECTOR_WITH_CONSUMER_PREFIX =
+      SOURCE_CONNECTOR_PREFIX + DOT + "connection.settings" + DOT;
+  public static final String CONSUMER_OFFSET = SOURCE_CONNECTOR_PREFIX + DOT + "default.offset";
+  public static final String CONSUMER_OFFSET_DOC =
+      "Specifies whether by default we should consume from earliest (default) or latest offset.";
+  public static final String CONSUMER_OFFSET_DEFAULT = "earliest";
+  public static final String CONSUMER_CLOSE_TIMEOUT = SOURCE_CONNECTOR_PREFIX + DOT + "close.timeout";
+  public static final String CONSUMER_CLOSE_TIMEOUT_DOC =
+      "Specifies the timeout for closing the consumer.";
+  public static final String CONSUMER_CLOSE_TIMEOUT_DEFAULT = "30";
+
+  public static final String KCQL_CONFIG = CONNECTOR_PREFIX + DOT + "kcql";
+  public static final String KCQL_DOC =
+      "KCQL expression describing field selection and data routing to the target.";
+
+}
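Editor's note: the constants above compose every user-facing property name for this connector. A quick sketch of what they resolve to; the printed values follow directly from the string concatenation in the class (this snippet assumes it lives in, or imports, the same `config` package):

```java
// Prints the fully-composed property keys defined by AzureEventHubsConfigConstants.
public class PrefixComposition {
  public static void main(String[] args) {
    // connect.eventhubs.source.default.offset
    System.out.println(AzureEventHubsConfigConstants.CONSUMER_OFFSET);
    // connect.eventhubs.source.close.timeout
    System.out.println(AzureEventHubsConfigConstants.CONSUMER_CLOSE_TIMEOUT);
    // Consumer pass-through keys get the longer prefix, e.g.
    // connect.eventhubs.source.connection.settings.bootstrap.servers
    System.out.println(
        AzureEventHubsConfigConstants.CONNECTOR_WITH_CONSUMER_PREFIX + "bootstrap.servers");
  }
}
```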
diff --git a/java-connectors/kafka-connect-azure-eventhubs/src/main/java/io/lenses/streamreactor/connect/azure/eventhubs/config/AzureEventHubsSourceConfig.java b/java-connectors/kafka-connect-azure-eventhubs/src/main/java/io/lenses/streamreactor/connect/azure/eventhubs/config/AzureEventHubsSourceConfig.java
new file mode 100644
index 000000000..fba2ccd28
--- /dev/null
+++ b/java-connectors/kafka-connect-azure-eventhubs/src/main/java/io/lenses/streamreactor/connect/azure/eventhubs/config/AzureEventHubsSourceConfig.java
@@ -0,0 +1,122 @@
+/*
+ * Copyright 2017-2024 Lenses.io Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.lenses.streamreactor.connect.azure.eventhubs.config;
+
+import io.lenses.streamreactor.common.config.base.BaseConfig;
+import io.lenses.streamreactor.common.config.base.intf.ConnectorPrefixed;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.UnaryOperator;
+import lombok.Getter;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigDef.Importance;
+import org.apache.kafka.common.config.ConfigDef.Type;
+
+/**
+ * Class represents the Config Definition for AzureEventHubsSourceConnector. It additionally
+ * exposes configs from org.apache.kafka.clients.consumer.ConsumerConfig, with standard connector
+ * prefixes added to them.
+ */
+public class AzureEventHubsSourceConfig extends BaseConfig implements ConnectorPrefixed {
+
+  public static final String CONNECTION_GROUP = "Connection";
+
+  private static final UnaryOperator<String> CONFIG_NAME_PREFIX_APPENDER = name ->
+      AzureEventHubsConfigConstants.CONNECTOR_WITH_CONSUMER_PREFIX + name;
+
+  private static final Set<String> EXCLUDED_CONSUMER_PROPERTIES =
+      Set.of(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+          ConsumerConfig.GROUP_ID_CONFIG, ConsumerConfig.CLIENT_ID_CONFIG, ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
+
+
+  @Getter
+  static ConfigDef configDefinition;
+
+  static {
+    ConfigDef kafkaConsumerConfigToExpose = getKafkaConsumerConfigToExpose();
+    configDefinition = new ConfigDef(kafkaConsumerConfigToExpose)
+        .define(AzureEventHubsConfigConstants.CONNECTOR_NAME,
+            Type.STRING,
+            AzureEventHubsConfigConstants.CONNECTOR_NAME_DEFAULT,
+            Importance.HIGH,
+            AzureEventHubsConfigConstants.CONNECTOR_NAME_DOC,
+            CONNECTION_GROUP,
+            1,
+            ConfigDef.Width.LONG,
+            AzureEventHubsConfigConstants.CONNECTOR_NAME
+        ).define(AzureEventHubsConfigConstants.CONSUMER_CLOSE_TIMEOUT,
+            Type.INT,
+            AzureEventHubsConfigConstants.CONSUMER_CLOSE_TIMEOUT_DEFAULT,
+            Importance.MEDIUM,
+            AzureEventHubsConfigConstants.CONSUMER_CLOSE_TIMEOUT_DOC,
+            CONNECTION_GROUP,
+            3,
+            ConfigDef.Width.LONG,
+            AzureEventHubsConfigConstants.CONSUMER_CLOSE_TIMEOUT
+        )
+        .define(AzureEventHubsConfigConstants.CONSUMER_OFFSET,
+            Type.STRING,
+            AzureEventHubsConfigConstants.CONSUMER_OFFSET_DEFAULT,
+            Importance.MEDIUM,
+            AzureEventHubsConfigConstants.CONSUMER_OFFSET_DOC,
+            CONNECTION_GROUP,
+            4,
+            ConfigDef.Width.LONG,
+            AzureEventHubsConfigConstants.CONSUMER_OFFSET
+        ).define(AzureEventHubsConfigConstants.KCQL_CONFIG,
+            Type.STRING,
+            Importance.HIGH,
+            AzureEventHubsConfigConstants.KCQL_DOC,
+            "Mappings",
+            1,
+            ConfigDef.Width.LONG,
+            AzureEventHubsConfigConstants.KCQL_CONFIG
+        );
+  }
+
+  public AzureEventHubsSourceConfig(Map<String, String> properties) {
+    super(AzureEventHubsConfigConstants.CONNECTOR_PREFIX, getConfigDefinition(), properties);
+  }
+
+  /**
+   * Provides prefixed KafkaConsumerConfig key.
+   *
+   * @param kafkaConsumerConfigKey from org.apache.kafka.clients.consumer.ConsumerConfig
+   * @return prefixed key.
+   */
+  public static String getPrefixedKafkaConsumerConfigKey(String kafkaConsumerConfigKey) {
+    return CONFIG_NAME_PREFIX_APPENDER.apply(kafkaConsumerConfigKey);
+  }
+
+  @Override
+  public String connectorPrefix() {
+    return connectorPrefix;
+  }
+
+  private static ConfigDef getKafkaConsumerConfigToExpose() {
+    ConfigDef kafkaConsumerConfigToExpose = new ConfigDef();
+    ConsumerConfig.configDef().configKeys().values().stream()
+        .filter(configKey -> !EXCLUDED_CONSUMER_PROPERTIES.contains(configKey.name))
+        .forEach(configKey -> kafkaConsumerConfigToExpose.define(
+            CONFIG_NAME_PREFIX_APPENDER.apply(configKey.name),
+            configKey.type, configKey.defaultValue,
+            configKey.importance, configKey.documentation, configKey.group,
+            configKey.orderInGroup, configKey.width, configKey.displayName));
+
+    return kafkaConsumerConfigToExpose;
+  }
+}
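Editor's note: the class above exposes every non-excluded ConsumerConfig key under the connector prefix. A minimal sketch of the same re-prefixing, runnable against kafka-clients alone; the prefix literal is copied from the constants and the limit of five is arbitrary:

```java
import java.util.function.UnaryOperator;
import org.apache.kafka.clients.consumer.ConsumerConfig;

public class ConsumerKeyPrefixSketch {
  // Same shape as CONFIG_NAME_PREFIX_APPENDER in AzureEventHubsSourceConfig.
  static final UnaryOperator<String> PREFIX_APPENDER =
      name -> "connect.eventhubs.source.connection.settings." + name;

  public static void main(String[] args) {
    // Prints a few consumer keys as the connector would expose them,
    // e.g. connect.eventhubs.source.connection.settings.fetch.min.bytes
    ConsumerConfig.configDef().configKeys().keySet().stream()
        .map(PREFIX_APPENDER)
        .limit(5)
        .forEach(System.out::println);
  }
}
```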
diff --git a/java-connectors/kafka-connect-azure-eventhubs/src/main/java/io/lenses/streamreactor/connect/azure/eventhubs/config/SourceDataType.java b/java-connectors/kafka-connect-azure-eventhubs/src/main/java/io/lenses/streamreactor/connect/azure/eventhubs/config/SourceDataType.java
new file mode 100644
index 000000000..92722e393
--- /dev/null
+++ b/java-connectors/kafka-connect-azure-eventhubs/src/main/java/io/lenses/streamreactor/connect/azure/eventhubs/config/SourceDataType.java
@@ -0,0 +1,69 @@
+/*
+ * Copyright 2017-2024 Lenses.io Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.lenses.streamreactor.connect.azure.eventhubs.config;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.connect.data.Schema;
+
+/**
+ * Class to indicate what kind of data is being received from the Kafka Consumer.
+ */
+@Getter
+public enum SourceDataType {
+
+  BYTES(ByteArrayDeserializer.class, Schema.OPTIONAL_BYTES_SCHEMA);
+
+  private final Class<? extends Deserializer> deserializerClass;
+  private final Schema schema;
+  private static final Map<String, SourceDataType> NAME_TO_DATA_SERIALIZER_TYPE;
+
+  static {
+    NAME_TO_DATA_SERIALIZER_TYPE =
+        Arrays.stream(values()).collect(Collectors.toMap(Enum::name, Function.identity()));
+  }
+
+  SourceDataType(Class<? extends Deserializer> deserializerClass, Schema schema) {
+    this.deserializerClass = deserializerClass;
+    this.schema = schema;
+  }
+
+  public static SourceDataType fromName(String name) {
+    return NAME_TO_DATA_SERIALIZER_TYPE.get(name.toUpperCase());
+  }
+
+  /**
+   * Class that indicates which data types are transferred by the Task.
+   */
+  @Getter
+  @EqualsAndHashCode
+  public static class KeyValueTypes {
+    private final SourceDataType keyType;
+    private final SourceDataType valueType;
+    public static final KeyValueTypes DEFAULT_TYPES = new KeyValueTypes(BYTES, BYTES);
+
+    public KeyValueTypes(SourceDataType keyType, SourceDataType valueType) {
+      this.keyType = keyType;
+      this.valueType = valueType;
+    }
+  }
+}
diff --git a/java-connectors/kafka-connect-azure-eventhubs/src/main/java/io/lenses/streamreactor/connect/azure/eventhubs/mapping/SourceRecordMapper.java b/java-connectors/kafka-connect-azure-eventhubs/src/main/java/io/lenses/streamreactor/connect/azure/eventhubs/mapping/SourceRecordMapper.java
new file mode 100644
index 000000000..0c5f0436c
--- /dev/null
+++ b/java-connectors/kafka-connect-azure-eventhubs/src/main/java/io/lenses/streamreactor/connect/azure/eventhubs/mapping/SourceRecordMapper.java
@@ -0,0 +1,80 @@
+/*
+ * Copyright 2017-2024 Lenses.io Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.lenses.streamreactor.connect.azure.eventhubs.mapping;
+
+import java.util.Map;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaAndValue;
+import org.apache.kafka.connect.header.ConnectHeaders;
+import org.apache.kafka.connect.source.SourceRecord;
+
+/**
+ * Class with utility methods to convert to SourceRecord.
+ */
+public class SourceRecordMapper {
+
+  /**
+   * Method to make a SourceRecord out of a ConsumerRecord, including the optional byte headers
+   * from the original message.
+   *
+   * @param consumerRecord original consumer record
+   * @param partitionKey   AzureTopicPartitionKey to indicate topic and partition
+   * @param offsetMap      AzureOffsetMarker to indicate offset
+   * @param outputTopic    Output topic for record
+   * @param keySchema      Schema of the key
+   * @param valueSchema    Schema of the value
+   * @return SourceRecord with headers
+   */
+  public static SourceRecord mapSourceRecordIncludingHeaders(
+      ConsumerRecord<byte[], byte[]> consumerRecord,
+      Map<String, ?> partitionKey, Map<String, ?> offsetMap,
+      String outputTopic, Schema keySchema, Schema valueSchema) {
+    Iterable<Header> headers = consumerRecord.headers();
+    ConnectHeaders connectHeaders = new ConnectHeaders();
+    for (Header header : headers) {
+      connectHeaders.add(header.key(),
+          new SchemaAndValue(Schema.OPTIONAL_BYTES_SCHEMA, header.value()));
+    }
+    return new SourceRecord(partitionKey, offsetMap,
+        outputTopic, null, keySchema, consumerRecord.key(),
+        valueSchema, consumerRecord.value(), consumerRecord.timestamp(),
+        connectHeaders);
+  }
+
+  /**
+   * Method to make a SourceRecord out of a ConsumerRecord, omitting the byte headers
+   * of the original message.
+   *
+   * @param consumerRecord original consumer record
+   * @param partitionKey   partitionKey to indicate topic and partition
+   * @param offsetMap      AzureOffsetMarker to indicate offset
+   * @param outputTopic    Output topic for record
+   * @param keySchema      Schema of the key
+   * @param valueSchema    Schema of the value
+   * @return SourceRecord without headers
+   */
+  public static SourceRecord mapSourceRecordWithoutHeaders(
+      ConsumerRecord<byte[], byte[]> consumerRecord,
+      Map<String, ?> partitionKey, Map<String, ?> offsetMap,
+      String outputTopic, Schema keySchema, Schema valueSchema) {
+    return new SourceRecord(partitionKey, offsetMap,
+        outputTopic, null, keySchema, consumerRecord.key(),
+        valueSchema, consumerRecord.value(), consumerRecord.timestamp(), null);
+  }
+
+}
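Editor's note: for orientation, here is how the mapper could be exercised with a hand-built record. The plain maps stand in for AzureTopicPartitionKey and AzureOffsetMarker (their exact map keys are an assumption here), and topic names and offsets are invented:

```java
import io.lenses.streamreactor.connect.azure.eventhubs.mapping.SourceRecordMapper;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.source.SourceRecord;

public class MapperSketch {
  public static void main(String[] args) {
    ConsumerRecord<byte[], byte[]> consumerRecord = new ConsumerRecord<>(
        "input-hub", 0, 42L,
        "key".getBytes(StandardCharsets.UTF_8),
        "value".getBytes(StandardCharsets.UTF_8));
    consumerRecord.headers().add(
        new RecordHeader("trace-id", "abc".getBytes(StandardCharsets.UTF_8)));

    SourceRecord sourceRecord = SourceRecordMapper.mapSourceRecordIncludingHeaders(
        consumerRecord,
        Map.of("topic", "input-hub", "partition", "0"), // stand-in partition key
        Map.of("OFFSET", 42L),                          // stand-in offset marker
        "output-topic",
        Schema.OPTIONAL_BYTES_SCHEMA,
        Schema.OPTIONAL_BYTES_SCHEMA);

    // The Kafka header survives the mapping as a Connect header.
    System.out.println(sourceRecord.headers().size()); // 1
  }
}
```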
+ */
+@Slf4j
+public class AzureConsumerRebalancerListener implements ConsumerRebalanceListener {
+
+  private final boolean shouldSeekToLatest;
+  private final TopicPartitionOffsetProvider topicPartitionOffsetProvider;
+  private final Consumer<byte[], byte[]> kafkaConsumer;
+
+  /**
+   * Constructs {@link AzureConsumerRebalancerListener} for a particular Kafka Consumer.
+   *
+   * @param topicPartitionOffsetProvider provider of committed offsets
+   * @param kafkaConsumer                Kafka Consumer
+   * @param shouldSeekToLatest           whether to seek to the latest or earliest offset when none is found
+   */
+  public AzureConsumerRebalancerListener(
+      TopicPartitionOffsetProvider topicPartitionOffsetProvider,
+      Consumer<byte[], byte[]> kafkaConsumer, boolean shouldSeekToLatest) {
+    this.topicPartitionOffsetProvider = topicPartitionOffsetProvider;
+    this.kafkaConsumer = kafkaConsumer;
+    this.shouldSeekToLatest = shouldSeekToLatest;
+  }
+
+  @Override
+  public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
+    // implementation not needed, offsets already committed
+  }
+
+  @Override
+  public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
+    List<TopicPartition> partitionsWithoutOffsets = new ArrayList<>();
+    partitions.forEach(partition -> {
+      AzureTopicPartitionKey partitionKey = new AzureTopicPartitionKey(
+          partition.topic(), partition.partition());
+      Optional<AzureOffsetMarker> partitionOffset = topicPartitionOffsetProvider.getOffset(partitionKey);
+      partitionOffset.ifPresentOrElse(
+          offset -> kafkaConsumer.seek(partition, offset.getOffsetValue()),
+          () -> partitionsWithoutOffsets.add(partition));
+    });
+    if (!partitionsWithoutOffsets.isEmpty()) {
+      if (shouldSeekToLatest) {
+        kafkaConsumer.seekToEnd(partitionsWithoutOffsets);
+      } else {
+        kafkaConsumer.seekToBeginning(partitionsWithoutOffsets);
+      }
+    }
+  }
+
+}
diff --git a/java-connectors/kafka-connect-azure-eventhubs/src/main/java/io/lenses/streamreactor/connect/azure/eventhubs/source/AzureEventHubsSourceConnector.java b/java-connectors/kafka-connect-azure-eventhubs/src/main/java/io/lenses/streamreactor/connect/azure/eventhubs/source/AzureEventHubsSourceConnector.java
new file mode 100644
index 000000000..d0d019f65
--- /dev/null
+++ b/java-connectors/kafka-connect-azure-eventhubs/src/main/java/io/lenses/streamreactor/connect/azure/eventhubs/source/AzureEventHubsSourceConnector.java
@@ -0,0 +1,90 @@
+/*
+ * Copyright 2017-2024 Lenses.io Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.lenses.streamreactor.connect.azure.eventhubs.source;
+
+import static io.lenses.streamreactor.common.util.AsciiArtPrinter.printAsciiHeader;
+
+import io.lenses.streamreactor.common.util.JarManifest;
+import io.lenses.streamreactor.connect.azure.eventhubs.config.AzureEventHubsConfigConstants;
+import io.lenses.streamreactor.connect.azure.eventhubs.config.AzureEventHubsSourceConfig;
+import io.lenses.streamreactor.connect.azure.eventhubs.util.KcqlConfigTopicMapper;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.IntStream;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.source.ExactlyOnceSupport;
+import org.apache.kafka.connect.source.SourceConnector;
+
+/**
+ * Implementation of {@link SourceConnector} for Microsoft Azure EventHubs.
+ */
+@Slf4j
+public class AzureEventHubsSourceConnector extends SourceConnector {
+
+  private final JarManifest jarManifest =
+      new JarManifest(getClass().getProtectionDomain().getCodeSource().getLocation());
+  private Map<String, String> configProperties;
+
+  @Override
+  public void start(Map<String, String> props) {
+    configProperties = props;
+    parseAndValidateConfigs(props);
+    printAsciiHeader(jarManifest, "/azure-eventhubs-ascii.txt");
+  }
+
+  @Override
+  public Class<? extends Task> taskClass() {
+    return AzureEventHubsSourceTask.class;
+  }
+
+  @Override
+  public List<Map<String, String>> taskConfigs(int maxTasks) {
+    log.info("Setting task configurations for {} workers.", maxTasks);
+    List<Map<String, String>> taskConfigs = new ArrayList<>(maxTasks);
+
+    IntStream.range(0, maxTasks).forEach(task -> taskConfigs.add(configProperties));
+    return taskConfigs;
+  }
+
+  @Override
+  public ExactlyOnceSupport exactlyOnceSupport(Map<String, String> connectorConfig) {
+    return ExactlyOnceSupport.SUPPORTED;
+  }
+
+  @Override
+  public void stop() {
+    // connector-specific implementation not needed
+  }
+
+  @Override
+  public ConfigDef config() {
+    return AzureEventHubsSourceConfig.getConfigDefinition();
+  }
+
+  @Override
+  public String version() {
+    return jarManifest.getVersion();
+  }
+
+  private static void parseAndValidateConfigs(Map<String, String> props) {
+    AzureEventHubsSourceConfig azureEventHubsSourceConfig = new AzureEventHubsSourceConfig(props);
+    String kcqlMappings = azureEventHubsSourceConfig.getString(AzureEventHubsConfigConstants.KCQL_CONFIG);
+    KcqlConfigTopicMapper.mapInputToOutputsFromConfig(kcqlMappings);
+  }
+}
\ No newline at end of file
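Editor's note: to make the KCQL validation in start() concrete, a hypothetical worker configuration for this connector might look as follows. Every value (broker endpoint, topic names, task count) is invented; only the property keys come from the constants in this PR:

```java
import java.util.Map;

public class ConnectorConfigSketch {
  public static void main(String[] args) {
    Map<String, String> props = Map.of(
        "connector.class",
        "io.lenses.streamreactor.connect.azure.eventhubs.source.AzureEventHubsSourceConnector",
        "tasks.max", "2",
        // KCQL routes the Event Hub (read via the Kafka protocol) to a Kafka topic.
        "connect.eventhubs.kcql", "INSERT INTO kafka-output SELECT * FROM my-event-hub",
        "connect.eventhubs.source.default.offset", "earliest",
        "connect.eventhubs.source.connection.settings.bootstrap.servers",
        "my-namespace.servicebus.windows.net:9093");
    props.forEach((key, value) -> System.out.println(key + "=" + value));
  }
}
```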
diff --git a/java-connectors/kafka-connect-azure-eventhubs/src/main/java/io/lenses/streamreactor/connect/azure/eventhubs/source/AzureEventHubsSourceTask.java b/java-connectors/kafka-connect-azure-eventhubs/src/main/java/io/lenses/streamreactor/connect/azure/eventhubs/source/AzureEventHubsSourceTask.java
new file mode 100644
index 000000000..8c63aa08d
--- /dev/null
+++ b/java-connectors/kafka-connect-azure-eventhubs/src/main/java/io/lenses/streamreactor/connect/azure/eventhubs/source/AzureEventHubsSourceTask.java
@@ -0,0 +1,109 @@
+/*
+ * Copyright 2017-2024 Lenses.io Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.lenses.streamreactor.connect.azure.eventhubs.source;
+
+import static java.util.Optional.ofNullable;
+
+import io.lenses.streamreactor.common.util.JarManifest;
+import io.lenses.streamreactor.connect.azure.eventhubs.config.AzureEventHubsConfigConstants;
+import io.lenses.streamreactor.connect.azure.eventhubs.config.AzureEventHubsSourceConfig;
+import io.lenses.streamreactor.connect.azure.eventhubs.util.KcqlConfigTopicMapper;
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.kafka.connect.source.SourceTask;
+import org.apache.kafka.connect.storage.OffsetStorageReader;
+
+/**
+ * Implementation of {@link SourceTask} for Microsoft Azure EventHubs.
+ */
+@Slf4j
+public class AzureEventHubsSourceTask extends SourceTask {
+
+  private static final Duration ONE_SECOND_DURATION = Duration.of(1, ChronoUnit.SECONDS);
+  private Duration closeTimeout;
+  private static final int RECORDS_QUEUE_DEFAULT_SIZE = 10;
+  private final JarManifest jarManifest;
+  private EventHubsKafkaConsumerController eventHubsKafkaConsumerController;
+  private BlockingQueueProducerProvider blockingQueueProducerProvider;
+
+  public AzureEventHubsSourceTask() {
+    jarManifest = new JarManifest(getClass().getProtectionDomain().getCodeSource().getLocation());
+  }
+
+  public AzureEventHubsSourceTask(JarManifest jarManifest) {
+    this.jarManifest = jarManifest;
+  }
+
+  @Override
+  public String version() {
+    return jarManifest.getVersion();
+  }
+
+  @Override
+  public void start(Map<String, String> props) {
+    OffsetStorageReader offsetStorageReader = ofNullable(this.context).flatMap(
+        context -> ofNullable(context.offsetStorageReader())).orElseThrow();
+    AzureEventHubsSourceConfig azureEventHubsSourceConfig = new AzureEventHubsSourceConfig(props);
+    TopicPartitionOffsetProvider topicPartitionOffsetProvider = new TopicPartitionOffsetProvider(offsetStorageReader);
+
+    ArrayBlockingQueue<ConsumerRecords<byte[], byte[]>> recordsQueue = new ArrayBlockingQueue<>(
+        RECORDS_QUEUE_DEFAULT_SIZE);
+    Map<String, String> inputToOutputTopics = KcqlConfigTopicMapper.mapInputToOutputsFromConfig(
+        azureEventHubsSourceConfig.getString(AzureEventHubsConfigConstants.KCQL_CONFIG));
+    blockingQueueProducerProvider = new BlockingQueueProducerProvider(topicPartitionOffsetProvider);
+    KafkaByteBlockingQueuedProducer producer = blockingQueueProducerProvider.createProducer(
+        azureEventHubsSourceConfig, recordsQueue, inputToOutputTopics);
+    EventHubsKafkaConsumerController kafkaConsumerController = new EventHubsKafkaConsumerController(
+        producer, recordsQueue, inputToOutputTopics);
+    initialize(kafkaConsumerController, azureEventHubsSourceConfig);
+  }
+
+  /**
+   * Initializes the Task. This method shouldn't be called if start() was already called with
+   * an {@link EventHubsKafkaConsumerController} instance.
+   *
+   * @param eventHubsKafkaConsumerController {@link EventHubsKafkaConsumerController} for this task
+   * @param azureEventHubsSourceConfig       config for the task
+   */
+  public void initialize(EventHubsKafkaConsumerController eventHubsKafkaConsumerController,
+      AzureEventHubsSourceConfig azureEventHubsSourceConfig) {
+    this.eventHubsKafkaConsumerController = eventHubsKafkaConsumerController;
+    closeTimeout =
+        Duration.of(azureEventHubsSourceConfig.getInt(AzureEventHubsConfigConstants.CONSUMER_CLOSE_TIMEOUT),
+            ChronoUnit.SECONDS);
+    log.info("{} initialised.", getClass().getSimpleName());
+  }
+
+
+  @Override
+  public List<SourceRecord> poll() throws InterruptedException {
+    List<SourceRecord> poll =
+        eventHubsKafkaConsumerController.poll(ONE_SECOND_DURATION);
+    return poll.isEmpty() ? null : poll;
+  }
+
+  @Override
+  public void stop() {
+    ofNullable(eventHubsKafkaConsumerController)
+        .ifPresent(consumerController -> consumerController.close(closeTimeout));
+  }
+}
diff --git a/java-connectors/kafka-connect-azure-eventhubs/src/main/java/io/lenses/streamreactor/connect/azure/eventhubs/source/BlockingQueueProducer.java b/java-connectors/kafka-connect-azure-eventhubs/src/main/java/io/lenses/streamreactor/connect/azure/eventhubs/source/BlockingQueueProducer.java
new file mode 100644
index 000000000..3cf3e54b9
--- /dev/null
+++ b/java-connectors/kafka-connect-azure-eventhubs/src/main/java/io/lenses/streamreactor/connect/azure/eventhubs/source/BlockingQueueProducer.java
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2017-2024 Lenses.io Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.lenses.streamreactor.connect.azure.eventhubs.source;
+
+import java.time.Duration;
+
+/**
+ * Interface of a stoppable BlockingQueue producer.
+ */
+public interface BlockingQueueProducer {
+
+  /**
+   * Method to start production to the specified BlockingQueue.
+   */
+  void start();
+
+  /**
+   * Method to stop production to the specified BlockingQueue.
+   *
+   * @param timeoutDuration maximum time to stop the Producer
+   */
+  void stop(Duration timeoutDuration);
+
+}
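Editor's note: the interface above and the task's poll() are the two ends of a classic bounded-queue handoff. A self-contained sketch of that pattern, with plain JDK types standing in for ConsumerRecords so it runs anywhere:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class QueueHandoffSketch {
  public static void main(String[] args) throws InterruptedException {
    // Stand-in for BlockingQueue<ConsumerRecords<byte[], byte[]>>.
    BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);

    Thread producer = new Thread(() -> {
      try {
        // The real producer loops kafkaConsumer.poll(...) and blocks on put()
        // when the queue is full, which naturally throttles consumption.
        queue.put("records-batch-1");
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    producer.start();

    // The task side polls with a timeout, mirroring EventHubsKafkaConsumerController.
    String batch = queue.poll(1, TimeUnit.SECONDS);
    System.out.println(batch); // records-batch-1 (or null on timeout)
    producer.join();
  }
}
```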
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.lenses.streamreactor.connect.azure.eventhubs.source;
+
+import io.lenses.streamreactor.connect.azure.eventhubs.config.AzureEventHubsConfigConstants;
+import io.lenses.streamreactor.connect.azure.eventhubs.config.AzureEventHubsSourceConfig;
+import io.lenses.streamreactor.connect.azure.eventhubs.config.SourceDataType.KeyValueTypes;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.config.ConfigException;
+
+/**
+ * Provider of KafkaByteBlockingQueuedProducers.
+ */
+@Slf4j
+public class BlockingQueueProducerProvider implements ProducerProvider {
+
+ private static final boolean STRIP_PREFIX = true;
+ private static final String EARLIEST_OFFSET = "earliest";
+ private static final String LATEST_OFFSET = "latest";
+ private static final String CONSUMER_OFFSET_EXCEPTION_MESSAGE =
+ "allowed values are: earliest/latest";
+ private final TopicPartitionOffsetProvider topicPartitionOffsetProvider;
+
+
+ public BlockingQueueProducerProvider(TopicPartitionOffsetProvider topicPartitionOffsetProvider) {
+ this.topicPartitionOffsetProvider = topicPartitionOffsetProvider;
+ }
+
+ /**
+ * Instantiates a KafkaByteBlockingQueuedProducer from the given properties.
+ *
+ * @param azureEventHubsSourceConfig config of the task
+ * @param recordBlockingQueue BlockingQueue for ConsumerRecords
+ * @param inputToOutputTopics map of input to output topics
+ * @return KafkaByteBlockingQueuedProducer instance.
+ */
+ public KafkaByteBlockingQueuedProducer createProducer(
+ AzureEventHubsSourceConfig azureEventHubsSourceConfig,
+ BlockingQueue<ConsumerRecords<byte[], byte[]>> recordBlockingQueue,
+ Map<String, String> inputToOutputTopics) {
+ String connectorName = azureEventHubsSourceConfig.getString(AzureEventHubsConfigConstants.CONNECTOR_NAME);
+ final String clientId = connectorName + "#" + UUID.randomUUID();
+ log.info("Attempting to create Client with Id:{}", clientId);
+ KeyValueTypes keyValueTypes = KeyValueTypes.DEFAULT_TYPES;
+
+ Map<String, Object> consumerProperties = prepareConsumerProperties(azureEventHubsSourceConfig,
+ clientId, connectorName, keyValueTypes);
+
+ KafkaConsumer<byte[], byte[]> kafkaConsumer = new KafkaConsumer<>(consumerProperties);
+
+ boolean shouldSeekToLatest = shouldConsumerSeekToLatest(azureEventHubsSourceConfig);
+ Set<String> inputTopics = inputToOutputTopics.keySet();
+
+ return new KafkaByteBlockingQueuedProducer(topicPartitionOffsetProvider, recordBlockingQueue,
+ kafkaConsumer, keyValueTypes, clientId, inputTopics, shouldSeekToLatest);
+ }
+
+ private static Map<String, Object> prepareConsumerProperties(
+ AzureEventHubsSourceConfig azureEventHubsSourceConfig, String clientId, String connectorName,
+ KeyValueTypes keyValueTypes) {
+ Map<String, Object> consumerProperties = azureEventHubsSourceConfig.originalsWithPrefix(
+ AzureEventHubsConfigConstants.CONNECTOR_WITH_CONSUMER_PREFIX, STRIP_PREFIX);
+
+ consumerProperties.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
+ consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, connectorName);
+ consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+ keyValueTypes.getKeyType().getDeserializerClass());
+ consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+ keyValueTypes.getValueType().getDeserializerClass());
+ consumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+ return consumerProperties;
+ }
+
+ private boolean shouldConsumerSeekToLatest(AzureEventHubsSourceConfig azureEventHubsSourceConfig) {
+ String seekValue = azureEventHubsSourceConfig.getString(AzureEventHubsConfigConstants.CONSUMER_OFFSET);
+ if (EARLIEST_OFFSET.equalsIgnoreCase(seekValue)) {
+ return false;
+ } else if (LATEST_OFFSET.equalsIgnoreCase(seekValue)) {
+ return true;
+ }
+ throw new ConfigException(AzureEventHubsConfigConstants.CONSUMER_OFFSET, seekValue,
+ CONSUMER_OFFSET_EXCEPTION_MESSAGE);
+ }
+}
diff --git a/java-connectors/kafka-connect-azure-eventhubs/src/main/java/io/lenses/streamreactor/connect/azure/eventhubs/source/EventHubsKafkaConsumerController.java b/java-connectors/kafka-connect-azure-eventhubs/src/main/java/io/lenses/streamreactor/connect/azure/eventhubs/source/EventHubsKafkaConsumerController.java
new file mode 100644
index 000000000..6ff27db9f
--- /dev/null
+++ b/java-connectors/kafka-connect-azure-eventhubs/src/main/java/io/lenses/streamreactor/connect/azure/eventhubs/source/EventHubsKafkaConsumerController.java
@@ -0,0 +1,109 @@
+/*
+ * Copyright 2017-2024 Lenses.io Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
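Editor's note: to make the property wiring in prepareConsumerProperties above concrete, here is a hedged, self-contained sketch of the overrides the provider applies on top of any user-supplied consumer settings. The bootstrap address and connector name are placeholders, not values from this PR.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class ConsumerPropertiesSketch {
  public static void main(String[] args) {
    String connectorName = "azure-eventhubs-source";   // placeholder connector name
    String clientId = connectorName + "#" + UUID.randomUUID();

    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
        "mynamespace.servicebus.windows.net:9093");     // hypothetical Event Hubs endpoint
    props.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, connectorName); // one group per connector
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // Kafka Connect tracks offsets itself

    try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
      // subscribe and poll here, as the connector's polling thread does
    }
  }
}
```

Disabling auto-commit is the key choice: offsets are committed through Connect's offset storage, not by the consumer group.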
+ */
+package io.lenses.streamreactor.connect.azure.eventhubs.source;
+
+import static io.lenses.streamreactor.connect.azure.eventhubs.mapping.SourceRecordMapper.mapSourceRecordIncludingHeaders;
+
+import io.lenses.streamreactor.connect.azure.eventhubs.source.TopicPartitionOffsetProvider.AzureOffsetMarker;
+import io.lenses.streamreactor.connect.azure.eventhubs.source.TopicPartitionOffsetProvider.AzureTopicPartitionKey;
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.connect.source.SourceRecord;
+
+/**
+ * This class is a bridge between Event Hubs KafkaConsumers and AzureEventHubsSourceTask. It verifies
+ * the configuration of the Kafka consumers and instantiates them, then allows AzureEventHubsSourceTask
+ * to poll for SourceRecords.
+ */
+@Slf4j
+public class EventHubsKafkaConsumerController {
+
+ private final BlockingQueue<ConsumerRecords<byte[], byte[]>> recordsQueue;
+ private KafkaByteBlockingQueuedProducer queuedKafkaProducer;
+ private final Map<String, String> inputToOutputTopics;
+
+ /**
+ * Constructs EventHubsKafkaConsumerController.
+ *
+ * @param queuedKafkaProducer producer to the recordsQueue
+ * @param recordsQueue queue that contains EventHub records
+ * @param inputToOutputTopics input to output topics
+ */
+ public EventHubsKafkaConsumerController(KafkaByteBlockingQueuedProducer queuedKafkaProducer,
+ BlockingQueue<ConsumerRecords<byte[], byte[]>> recordsQueue,
+ Map<String, String> inputToOutputTopics) {
+ this.recordsQueue = recordsQueue;
+ this.queuedKafkaProducer = queuedKafkaProducer;
+ this.inputToOutputTopics = inputToOutputTopics;
+ }
+
+ /**
+ * This method leverages the BlockingQueue that the KafkaByteBlockingQueuedProducer puts Event Hubs
+ * records into. It polls the queue, then returns a list of SourceRecords.
+ *
+ * @param duration maximum time to wait for queued records.
+ * @return list of SourceRecords (can be empty if it couldn't poll from queue)
+ * @throws InterruptedException if interrupted while polling
+ */
+ public List<SourceRecord> poll(Duration duration) throws InterruptedException {
+ List<SourceRecord> sourceRecords = null;
+
+ queuedKafkaProducer.start();
+
+ ConsumerRecords<byte[], byte[]> consumerRecords = null;
+ try {
+ consumerRecords = recordsQueue.poll(
+ duration.get(ChronoUnit.SECONDS), TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ log.info("{} has been interrupted on poll", this.getClass().getSimpleName());
+ throw e;
+ }
+
+ if (consumerRecords != null && !consumerRecords.isEmpty()) {
+ sourceRecords = new ArrayList<>(consumerRecords.count());
+ for (ConsumerRecord<byte[], byte[]> consumerRecord : consumerRecords) {
+
+ String inputTopic = consumerRecord.topic();
+ AzureTopicPartitionKey azureTopicPartitionKey = new AzureTopicPartitionKey(
+ inputTopic, consumerRecord.partition());
+ AzureOffsetMarker offsetMarker = new AzureOffsetMarker(consumerRecord.offset());
+
+ SourceRecord sourceRecord = mapSourceRecordIncludingHeaders(consumerRecord,
+ azureTopicPartitionKey,
+ offsetMarker, inputToOutputTopics.get(inputTopic),
+ queuedKafkaProducer.getKeyValueTypes().getKeyType().getSchema(),
+ queuedKafkaProducer.getKeyValueTypes().getValueType().getSchema());
+
+ sourceRecords.add(sourceRecord);
+
+ }
+ }
+ return sourceRecords != null ?
sourceRecords : Collections.emptyList();
+ }
+
+ public void close(Duration timeoutDuration) {
+ queuedKafkaProducer.stop(timeoutDuration);
+ }
+}
diff --git a/java-connectors/kafka-connect-azure-eventhubs/src/main/java/io/lenses/streamreactor/connect/azure/eventhubs/source/KafkaByteBlockingQueuedProducer.java b/java-connectors/kafka-connect-azure-eventhubs/src/main/java/io/lenses/streamreactor/connect/azure/eventhubs/source/KafkaByteBlockingQueuedProducer.java
new file mode 100644
index 000000000..712b73e8c
--- /dev/null
+++ b/java-connectors/kafka-connect-azure-eventhubs/src/main/java/io/lenses/streamreactor/connect/azure/eventhubs/source/KafkaByteBlockingQueuedProducer.java
@@ -0,0 +1,123 @@
+/*
+ * Copyright 2017-2024 Lenses.io Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.lenses.streamreactor.connect.azure.eventhubs.source;
+
+import io.lenses.streamreactor.connect.azure.eventhubs.config.SourceDataType.KeyValueTypes;
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+
+/**
+ * Abstraction over the Kafka {@link Consumer} class that wraps the Consumer into a thread and allows
+ * it to output its records into a {@link BlockingQueue} shared with {@link EventHubsKafkaConsumerController}.
+ */
+@Slf4j
+public class KafkaByteBlockingQueuedProducer implements BlockingQueueProducer {
+ private static final Duration DEFAULT_POLL_DURATION = Duration.of(1, ChronoUnit.SECONDS);
+ private final TopicPartitionOffsetProvider topicPartitionOffsetProvider;
+ private final BlockingQueue<ConsumerRecords<byte[], byte[]>> recordsQueue;
+ private final Consumer<byte[], byte[]> consumer;
+ private final String clientId;
+ private final Set<String> inputTopics;
+ private EventhubsPollingRunnable pollingRunnable;
+ private final boolean shouldSeekToLatest;
+ @Getter
+ private final KeyValueTypes keyValueTypes;
+ private final AtomicBoolean initialized = new AtomicBoolean(false);
+ private final AtomicBoolean running = new AtomicBoolean(false);
+
+ /**
+ * This class is a proxy that allows access to some methods of the Kafka Consumer. Its main purpose is to
+ * create a thread around the consumer and put consumer records into a BlockingQueue. After that it
+ * starts consumption.
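Editor's note: the controller's poll, shown just above, reduces to a standard bounded-queue handoff between two threads. A stripped-down sketch of that pattern, with String standing in for the real ConsumerRecords batch type:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class QueueHandoffSketch {
  public static void main(String[] args) throws InterruptedException {
    BlockingQueue<String> recordsQueue = new ArrayBlockingQueue<>(10);

    // Producer side: block up to 5 seconds for free capacity, retry otherwise.
    boolean offered = false;
    while (!offered) {
      offered = recordsQueue.offer("batch-1", 5, TimeUnit.SECONDS);
    }

    // Consumer side: wait up to 2 seconds; null signals "nothing arrived".
    String batch = recordsQueue.poll(2, TimeUnit.SECONDS);
    if (batch != null) {
      System.out.println("map " + batch + " to SourceRecords here");
    }
  }
}
```

The bounded queue provides backpressure: if the task cannot drain records fast enough, the producer thread stalls on offer rather than growing memory without limit.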
+ *
+ * @param topicPartitionOffsetProvider TopicPartitionOffsetProvider for subscription handler
+ * @param recordsQueue BlockingQueue to put records into
+ * @param consumer Kafka Consumer
+ * @param keyValueTypes {@link KeyValueTypes} instance indicating key and value
+ * types
+ * @param clientId consumer client id
+ * @param inputTopics kafka inputTopics to consume from
+ * @param shouldSeekToLatest informs where the consumer should seek when there are no
+ * committed offsets
+ */
+ public KafkaByteBlockingQueuedProducer(TopicPartitionOffsetProvider topicPartitionOffsetProvider,
+ BlockingQueue<ConsumerRecords<byte[], byte[]>> recordsQueue, Consumer<byte[], byte[]> consumer,
+ KeyValueTypes keyValueTypes, String clientId, Set<String> inputTopics, boolean shouldSeekToLatest) {
+ this.topicPartitionOffsetProvider = topicPartitionOffsetProvider;
+ this.recordsQueue = recordsQueue;
+ this.consumer = consumer;
+ this.clientId = clientId;
+ this.inputTopics = inputTopics;
+ this.shouldSeekToLatest = shouldSeekToLatest;
+ this.keyValueTypes = keyValueTypes;
+
+ start();
+ }
+
+ /**
+ * Starts the production to the records queue.
+ */
+ public void start() {
+ if (!initialized.getAndSet(true)) {
+ pollingRunnable = new EventhubsPollingRunnable();
+
+ new Thread(pollingRunnable).start();
+ }
+ }
+
+ public void stop(Duration timeoutDuration) {
+ pollingRunnable.close(timeoutDuration);
+ running.set(false);
+ }
+
+ private class EventhubsPollingRunnable implements Runnable {
+
+ @Override
+ public void run() {
+ running.set(true);
+ log.info("Subscribing to topics: {}", String.join(",", inputTopics));
+ consumer.subscribe(inputTopics,
+ new AzureConsumerRebalancerListener(topicPartitionOffsetProvider, consumer, shouldSeekToLatest));
+ while (running.get()) {
+ ConsumerRecords<byte[], byte[]> consumerRecords = consumer.poll(DEFAULT_POLL_DURATION);
+ if (consumerRecords != null && !consumerRecords.isEmpty()) {
+ try {
+ boolean offer = false;
+ while (!offer) {
+ offer = recordsQueue.offer(consumerRecords, 5, TimeUnit.SECONDS);
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ log.info("Kafka Consumer with clientId={} has been interrupted on offering", clientId);
+ }
+ }
+ }
+ }
+
+ void close(Duration timeoutDuration) {
+ consumer.close(timeoutDuration);
+ }
+ }
+}
diff --git a/java-connectors/kafka-connect-azure-eventhubs/src/main/java/io/lenses/streamreactor/connect/azure/eventhubs/source/ProducerProvider.java b/java-connectors/kafka-connect-azure-eventhubs/src/main/java/io/lenses/streamreactor/connect/azure/eventhubs/source/ProducerProvider.java
new file mode 100644
index 000000000..6b69640c1
--- /dev/null
+++ b/java-connectors/kafka-connect-azure-eventhubs/src/main/java/io/lenses/streamreactor/connect/azure/eventhubs/source/ProducerProvider.java
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2017-2024 Lenses.io Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
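Editor's note: stepping back, the start/stop choreography of the producer class above boils down to the thread-lifecycle pattern below. This is a simplified sketch, not the connector's code; the real class also guards start() with a separate initialized flag and delegates shutdown to Consumer.close.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicBoolean;

public class PollingLifecycleSketch {
  private final AtomicBoolean running = new AtomicBoolean(false);
  private Thread pollingThread;

  public void start() {
    if (running.compareAndSet(false, true)) { // only the first caller starts the thread
      pollingThread = new Thread(() -> {
        while (running.get()) {
          // consumer.poll(...) and queue.offer(...) would happen here
        }
      });
      pollingThread.start();
    }
  }

  public void stop(Duration timeout) throws InterruptedException {
    running.set(false);                     // ask the loop to exit...
    pollingThread.join(timeout.toMillis()); // ...then wait out the in-flight iteration
  }

  public static void main(String[] args) throws InterruptedException {
    PollingLifecycleSketch sketch = new PollingLifecycleSketch();
    sketch.start();
    sketch.stop(Duration.ofSeconds(5));
  }
}
```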
+ */
+package io.lenses.streamreactor.connect.azure.eventhubs.source;
+
+import io.lenses.streamreactor.connect.azure.eventhubs.config.AzureEventHubsSourceConfig;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+
+/**
+ * Interface for classes that produce BlockingQueueProducers.
+ */
+public interface ProducerProvider {
+
+ BlockingQueueProducer createProducer(AzureEventHubsSourceConfig azureEventHubsSourceConfig,
+ BlockingQueue<ConsumerRecords<byte[], byte[]>> recordBlockingQueue,
+ Map<String, String> inputToOutputTopics);
+}
diff --git a/java-connectors/kafka-connect-azure-eventhubs/src/main/java/io/lenses/streamreactor/connect/azure/eventhubs/source/TopicPartitionOffsetProvider.java b/java-connectors/kafka-connect-azure-eventhubs/src/main/java/io/lenses/streamreactor/connect/azure/eventhubs/source/TopicPartitionOffsetProvider.java
new file mode 100644
index 000000000..6b798d130
--- /dev/null
+++ b/java-connectors/kafka-connect-azure-eventhubs/src/main/java/io/lenses/streamreactor/connect/azure/eventhubs/source/TopicPartitionOffsetProvider.java
@@ -0,0 +1,91 @@
+/*
+ * Copyright 2017-2024 Lenses.io Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.lenses.streamreactor.connect.azure.eventhubs.source;
+
+import java.util.HashMap;
+import java.util.Optional;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.connect.storage.OffsetStorageReader;
+
+/**
+ * This class is an abstraction over OffsetStorageReader that, once initialized, can be freely
+ * called by Azure Event Hubs Kafka consumers. It helps consumers find out which
+ * offsets were already committed inside Kafka Connect.
+ */
+@Slf4j
+public final class TopicPartitionOffsetProvider {
+
+ private static final String OFFSET_KEY = "OFFSET";
+
+ private final OffsetStorageReader offsetStorageReader;
+
+
+ public TopicPartitionOffsetProvider(OffsetStorageReader offsetStorageReader) {
+ this.offsetStorageReader = offsetStorageReader;
+ }
+
+ /**
+ * Checks for committed offsets for the topic+partition combo.
+ *
+ * @param azureTopicPartitionKey key of topic+partition combo.
+ *
+ * @return empty Optional if the topic+partition combo has not committed any offsets, or an
+ * AzureOffsetMarker if the combo already committed some.
+ */
+ public Optional<AzureOffsetMarker> getOffset(AzureTopicPartitionKey azureTopicPartitionKey) {
+ return Optional.ofNullable(offsetStorageReader.offset(azureTopicPartitionKey))
+ .map(offsetMap -> (Long) offsetMap.get(OFFSET_KEY))
+ .map(AzureOffsetMarker::new);
+ }
+
+ /**
+ * This class is a map that represents the topic and partition combo used by
+ * TopicPartitionOffsetProvider.
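Editor's note: as a usage illustration of getOffset above, this is roughly how a committed offset is read back through Kafka Connect. The context object is the SourceTaskContext every SourceTask receives; the key names follow the constants above, and the helper itself is hypothetical.

```java
import java.util.Map;
import org.apache.kafka.connect.source.SourceTaskContext;

public class OffsetLookupSketch {
  // Hedged sketch: look up the last committed offset for one topic/partition.
  static Long lastCommittedOffset(SourceTaskContext context, String topic, int partition) {
    Map<String, String> partitionKey = Map.of(
        "TOPIC", topic,
        "PARTITION", Integer.toString(partition));
    // Returns null when this source partition has never committed an offset.
    Map<String, Object> committed = context.offsetStorageReader().offset(partitionKey);
    return committed == null ? null : (Long) committed.get("OFFSET");
  }
}
```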
+ */
+ public static class AzureTopicPartitionKey extends HashMap<String, String> {
+
+ private static final String TOPIC_KEY = "TOPIC";
+ private static final String PARTITION_KEY = "PARTITION";
+
+ public AzureTopicPartitionKey(String topic, Integer partition) {
+ this.put(TOPIC_KEY, topic);
+ this.put(PARTITION_KEY, partition.toString());
+ }
+
+ public String getTopic() {
+ return get(TOPIC_KEY);
+ }
+
+ public Integer getPartition() {
+ return Integer.valueOf(get(PARTITION_KEY));
+ }
+ }
+
+ /**
+ * This class is a map that represents the offset of a topic and partition combo, used by
+ * Kafka Connect SourceRecords.
+ */
+ public static class AzureOffsetMarker extends HashMap<String, Object> {
+
+ public AzureOffsetMarker(Long offset) {
+ put(OFFSET_KEY, offset);
+ }
+
+ public Long getOffsetValue() {
+ return (Long) get(OFFSET_KEY);
+ }
+ }
+}
diff --git a/java-connectors/kafka-connect-azure-eventhubs/src/main/java/io/lenses/streamreactor/connect/azure/eventhubs/util/KcqlConfigTopicMapper.java b/java-connectors/kafka-connect-azure-eventhubs/src/main/java/io/lenses/streamreactor/connect/azure/eventhubs/util/KcqlConfigTopicMapper.java
new file mode 100644
index 000000000..fc3f5b627
--- /dev/null
+++ b/java-connectors/kafka-connect-azure-eventhubs/src/main/java/io/lenses/streamreactor/connect/azure/eventhubs/util/KcqlConfigTopicMapper.java
@@ -0,0 +1,77 @@
+/*
+ * Copyright 2017-2024 Lenses.io Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.lenses.streamreactor.connect.azure.eventhubs.util;
+
+import io.lenses.kcql.Kcql;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.apache.kafka.common.config.ConfigException;
+
+/**
+ * Class that provides methods for KCQL topic handling.
+ */
+public class KcqlConfigTopicMapper {
+
+ private static final String TOPIC_NAME_REGEX = "^[\\w][\\w\\-\\_\\.]*$";
+ private static final Pattern TOPIC_NAME_PATTERN = Pattern.compile(TOPIC_NAME_REGEX);
+ public static final String TOPIC_NAME_ERROR_MESSAGE =
+ "%s topic %s, name is not correctly specified (it can contain only letters, numbers, hyphens,"
+ + " underscores and dots, and has to start with a number or letter)";
+
+ /**
+ * This method parses KCQL statements and fetches input and output topics, checking both against
+ * a regex that rejects invalid topic names.
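Editor's note: before the parameter details that follow, a quick usage example of the mapper. Topic names are invented for illustration, and the sketch assumes KcqlConfigTopicMapper is on the classpath.

```java
import io.lenses.streamreactor.connect.azure.eventhubs.util.KcqlConfigTopicMapper;
import java.util.Map;

public class KcqlMappingSketch {
  public static void main(String[] args) {
    // Two KCQL statements: each maps one input hub/topic to one output topic.
    Map<String, String> mapping = KcqlConfigTopicMapper.mapInputToOutputsFromConfig(
        "INSERT INTO out-a SELECT * FROM in-a; INSERT INTO out-b SELECT * FROM in-b;");
    // Expected content (iteration order may vary): {in-a=out-a, in-b=out-b}.
    // A duplicated input or output topic, or a name failing the regex above,
    // raises ConfigException instead of returning a mapping.
    System.out.println(mapping);
  }
}
```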
+ * @param kcqlString string to parse + * @return map of input to output topic names + */ + public static Map mapInputToOutputsFromConfig(String kcqlString) { + List kcqls = Kcql.parseMultiple(kcqlString); + Map inputToOutputTopics = new HashMap<>(kcqls.size()); + List outputTopics = new ArrayList<>(kcqls.size()); + + for (Kcql kcql : kcqls) { + String inputTopic = kcql.getSource(); + String outputTopic = kcql.getTarget(); + + if (!topicNameMatchesAgainstRegex(inputTopic)) { + throw new ConfigException(String.format(TOPIC_NAME_ERROR_MESSAGE, "Input", inputTopic)); + } + if (!topicNameMatchesAgainstRegex(outputTopic)) { + throw new ConfigException(String.format(TOPIC_NAME_ERROR_MESSAGE, "Output", outputTopic)); + } + if (inputToOutputTopics.containsKey(inputTopic)) { + throw new ConfigException(String.format("Input %s cannot be mapped twice.", inputTopic)); + } + if (outputTopics.contains(outputTopic)) { + throw new ConfigException(String.format("Output %s cannot be mapped twice.", outputTopic)); + } + + inputToOutputTopics.put(inputTopic, outputTopic); + outputTopics.add(outputTopic); + } + + return inputToOutputTopics; + } + + private static boolean topicNameMatchesAgainstRegex(String topicName) { + final Matcher matcher = TOPIC_NAME_PATTERN.matcher(topicName); + return matcher.matches(); + } +} diff --git a/java-connectors/kafka-connect-azure-eventhubs/src/main/resources/META-INF/services/org.apache.kafka.connect.source.SourceConnector b/java-connectors/kafka-connect-azure-eventhubs/src/main/resources/META-INF/services/org.apache.kafka.connect.source.SourceConnector new file mode 100644 index 000000000..5c06b3db4 --- /dev/null +++ b/java-connectors/kafka-connect-azure-eventhubs/src/main/resources/META-INF/services/org.apache.kafka.connect.source.SourceConnector @@ -0,0 +1 @@ +io.lenses.streamreactor.connect.azure.eventhubs.source.AzureEventHubsSourceConnector \ No newline at end of file diff --git a/java-connectors/kafka-connect-azure-eventhubs/src/main/resources/azure-eventhubs-ascii.txt b/java-connectors/kafka-connect-azure-eventhubs/src/main/resources/azure-eventhubs-ascii.txt new file mode 100644 index 000000000..0311112e7 --- /dev/null +++ b/java-connectors/kafka-connect-azure-eventhubs/src/main/resources/azure-eventhubs-ascii.txt @@ -0,0 +1,14 @@ + + ████████▀▀▀▀▀███████████████████████████████████████████████████████████████████ + █████▀ ▀████████████████████████████████████████████████████████████████ + ███▀ ▄█████▄ ▀██████████████████████████████████████████████████████████████ + ███ ▄███████▄ ██████ █████▌ █▌ ████ ███ ▄▄ ██ ███ ▄▄ ███ + ███ █████████ ██████ █████▌ ██████▌ ▀██ ██ ██████ ██████ ███████ + ███ ▀███████▀ ██████ █████▌ ██▌ █▄ █ ███▄▄ ██ ███▄▄ ███ + ████▄ ▄███████ █████▌ ██████▌ ███ ███████ █ ███████████ ██ + █████████ ████████████ ▌ █▌ ████▄ ██▄ ▄██ █▄ ▄███ + █████████ ████████████████████████████████████████████████████████████████████ + █████████ ▄████████████████████████████████████████████████████████████████████ + ████████████████████████████████████████████████████████████████████████████████ + █████████████████████████AZURE EVENT HUBS ██████████████████████████████████████ + ████████████████████████████████████████████████████████████████████████████████ diff --git a/java-connectors/kafka-connect-azure-eventhubs/src/test/java/io/lenses/streamreactor/connect/azure/eventhubs/config/SourceDataTypeTest.java b/java-connectors/kafka-connect-azure-eventhubs/src/test/java/io/lenses/streamreactor/connect/azure/eventhubs/config/SourceDataTypeTest.java new file mode 
100644 index 000000000..6f2f0a829 --- /dev/null +++ b/java-connectors/kafka-connect-azure-eventhubs/src/test/java/io/lenses/streamreactor/connect/azure/eventhubs/config/SourceDataTypeTest.java @@ -0,0 +1,60 @@ +/* + * Copyright 2017-2024 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lenses.streamreactor.connect.azure.eventhubs.config; + +import static org.apache.kafka.connect.data.Schema.OPTIONAL_BYTES_SCHEMA; +import static org.junit.jupiter.api.Assertions.assertEquals; + +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.connect.data.Schema; +import org.junit.jupiter.api.Test; + +class SourceDataTypeTest { + + @Test + void fromNameShouldReturnEnum() { + //given + + //when + SourceDataType s = SourceDataType.fromName(SourceDataType.BYTES.name()); + + //then + assertEquals(SourceDataType.BYTES, s); + } + + @Test + void getDeserializerClassShouldReturnSpecifiedDeserializer() { + //given + + //when + Class deserializerClass = SourceDataType.BYTES.getDeserializerClass(); + + //then + assertEquals(ByteArrayDeserializer.class, deserializerClass); + } + + @Test + void getSchema() { + //given + + //when + Schema schema = SourceDataType.BYTES.getSchema(); + + //then + assertEquals(OPTIONAL_BYTES_SCHEMA, schema); + } +} \ No newline at end of file diff --git a/java-connectors/kafka-connect-azure-eventhubs/src/test/java/io/lenses/streamreactor/connect/azure/eventhubs/mapping/SourceRecordMapperTest.java b/java-connectors/kafka-connect-azure-eventhubs/src/test/java/io/lenses/streamreactor/connect/azure/eventhubs/mapping/SourceRecordMapperTest.java new file mode 100644 index 000000000..c5e1ecb35 --- /dev/null +++ b/java-connectors/kafka-connect-azure-eventhubs/src/test/java/io/lenses/streamreactor/connect/azure/eventhubs/mapping/SourceRecordMapperTest.java @@ -0,0 +1,125 @@ +/* + * Copyright 2017-2024 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.lenses.streamreactor.connect.azure.eventhubs.mapping; + +import static java.util.Collections.singletonList; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.from; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import io.lenses.streamreactor.connect.azure.eventhubs.source.TopicPartitionOffsetProvider.AzureOffsetMarker; +import io.lenses.streamreactor.connect.azure.eventhubs.source.TopicPartitionOffsetProvider.AzureTopicPartitionKey; +import java.util.Iterator; +import java.util.Optional; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.source.SourceRecord; +import org.junit.jupiter.api.Test; + +class SourceRecordMapperTest { + private static final String TOPIC = "topic"; + private static final Integer PARTITION = 10; + private static final Long OFFSET = 111L; + private static final Long TIMESTAMP = 2024L; + private static final String HEADER_KEY = "headerKey"; + private static final String OUTPUT_TOPIC = "OUTPUT"; + + @Test + void shouldMapSourceRecordIncludingHeaders() { + //given + AzureTopicPartitionKey topicPartitionKey = new AzureTopicPartitionKey(TOPIC, PARTITION); + AzureOffsetMarker azureOffsetMarker = new AzureOffsetMarker(OFFSET); + + byte[] exampleHeaderValue = new byte[] {1, 10}; + int headerLength = exampleHeaderValue.length; + Header mockedHeader = mock(Header.class); + when(mockedHeader.key()).thenReturn(HEADER_KEY); + when(mockedHeader.value()).thenReturn(exampleHeaderValue); + + Iterator
<Header> iterator = singletonList(mockedHeader).iterator();
+ Headers mockedHeaders = mock(Headers.class);
+ when(mockedHeaders.iterator()).thenReturn(iterator);
+
+ ConsumerRecord consumerRecord = mockConsumerRecord(Optional.of(mockedHeaders));
+
+ //when
+ Schema stringSchema = Schema.STRING_SCHEMA;
+ Schema optionalStringSchema = Schema.OPTIONAL_STRING_SCHEMA;
+ SourceRecord sourceRecord = SourceRecordMapper.mapSourceRecordIncludingHeaders(
+ consumerRecord, topicPartitionKey, azureOffsetMarker,
+ OUTPUT_TOPIC, optionalStringSchema, stringSchema);
+
+ //then
+ assertRecordAttributesAreMappedFromSourceConsumerRecord(sourceRecord, consumerRecord,
+ OUTPUT_TOPIC, optionalStringSchema, stringSchema, topicPartitionKey, azureOffsetMarker);
+ verify(consumerRecord).headers();
+ assertThat(sourceRecord.headers()).hasSize(1);
+ assertThat(((byte[])sourceRecord.headers().lastWithName(HEADER_KEY).value())).hasSize(headerLength);
+ }
+
+ @Test
+ void mapSourceRecordWithoutHeaders() {
+ //given
+ AzureTopicPartitionKey topicPartitionKey = new AzureTopicPartitionKey(TOPIC, PARTITION);
+ AzureOffsetMarker azureOffsetMarker = new AzureOffsetMarker(OFFSET);
+
+ ConsumerRecord consumerRecord = mockConsumerRecord(Optional.empty());
+
+ //when
+ Schema stringSchema = Schema.STRING_SCHEMA;
+ Schema optionalStringSchema = Schema.OPTIONAL_STRING_SCHEMA;
+ SourceRecord sourceRecord = SourceRecordMapper.mapSourceRecordWithoutHeaders(
+ consumerRecord, topicPartitionKey, azureOffsetMarker, OUTPUT_TOPIC,
+ optionalStringSchema, stringSchema);
+
+ //then
+ assertRecordAttributesAreMappedFromSourceConsumerRecord(sourceRecord, consumerRecord,
+ OUTPUT_TOPIC, optionalStringSchema, stringSchema, topicPartitionKey, azureOffsetMarker);
+ assertThat(sourceRecord.headers()).isEmpty();
+ }
+
+ private static ConsumerRecord mockConsumerRecord(Optional<Headers> mockedHeaders) {
+ ConsumerRecord consumerRecord = mock(ConsumerRecord.class);
+ when(consumerRecord.topic()).thenReturn(TOPIC);
+ when(consumerRecord.partition()).thenReturn(PARTITION);
+ when(consumerRecord.timestamp()).thenReturn(TIMESTAMP);
+
+ mockedHeaders.ifPresent(headers -> when(consumerRecord.headers()).thenReturn(headers));
+ return consumerRecord;
+ }
+
+ private void assertRecordAttributesAreMappedFromSourceConsumerRecord(SourceRecord mappedRecord,
+ ConsumerRecord originalRecord, String outputTopic, Schema keySchema, Schema valueSchema,
+ AzureTopicPartitionKey sourcePartitionKey, AzureOffsetMarker offsetMarker) {
+ verify(originalRecord).timestamp();
+ verify(originalRecord).key();
+ verify(originalRecord).value();
+
+ assertThat(mappedRecord)
+ .returns(originalRecord.timestamp(), from(SourceRecord::timestamp))
+ .returns(sourcePartitionKey, from(SourceRecord::sourcePartition))
+ .returns(null, from(SourceRecord::kafkaPartition))
+ .returns(offsetMarker, from(SourceRecord::sourceOffset))
+ .returns(originalRecord.value(), from(SourceRecord::value))
+ .returns(outputTopic, from(SourceRecord::topic))
+ .returns(keySchema, from(SourceRecord::keySchema))
+ .returns(valueSchema, from(SourceRecord::valueSchema));
+ }
+}
\ No newline at end of file
diff --git a/java-connectors/kafka-connect-azure-eventhubs/src/test/java/io/lenses/streamreactor/connect/azure/eventhubs/source/AzureConsumerRebalancerListenerTest.java b/java-connectors/kafka-connect-azure-eventhubs/src/test/java/io/lenses/streamreactor/connect/azure/eventhubs/source/AzureConsumerRebalancerListenerTest.java
new file mode 100644
index 000000000..03a0e41e5
--- /dev/null
+++
b/java-connectors/kafka-connect-azure-eventhubs/src/test/java/io/lenses/streamreactor/connect/azure/eventhubs/source/AzureConsumerRebalancerListenerTest.java @@ -0,0 +1,109 @@ +/* + * Copyright 2017-2024 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lenses.streamreactor.connect.azure.eventhubs.source; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyList; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import io.lenses.streamreactor.connect.azure.eventhubs.source.TopicPartitionOffsetProvider.AzureOffsetMarker; +import io.lenses.streamreactor.connect.azure.eventhubs.source.TopicPartitionOffsetProvider.AzureTopicPartitionKey; +import java.util.Collections; +import java.util.Optional; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.common.TopicPartition; +import org.junit.jupiter.api.Test; + +class AzureConsumerRebalancerListenerTest { + + private final boolean SEEK_TO_EARLIEST = false; + private final boolean SEEK_TO_LATEST = true; + + @Test + void onPartitionsAssignedShouldSeekToBeginningIfOffsetProviderProvidesEmptyOffsetAndSeekingToEarliest() { + //given + Consumer stringKafkaConsumer = mock(Consumer.class); + TopicPartitionOffsetProvider offsetProvider = mock( + TopicPartitionOffsetProvider.class); + AzureConsumerRebalancerListener testObj = + new AzureConsumerRebalancerListener(offsetProvider, stringKafkaConsumer, SEEK_TO_EARLIEST); + String topic = "topic1"; + Integer partition = 1; + TopicPartition topicPartition1 = mock(TopicPartition.class); + when(topicPartition1.topic()).thenReturn(topic); + when(topicPartition1.partition()).thenReturn(partition); + + //when + testObj.onPartitionsAssigned(Collections.singletonList(topicPartition1)); + + //then + verify(topicPartition1, times(1)).topic(); + verify(topicPartition1, times(1)).partition(); + verify(stringKafkaConsumer).seekToBeginning(anyList()); + } + + @Test + void onPartitionsAssignedShouldSeekToEndIfOffsetProviderProvidesEmptyOffsetAndSeekingToLatest() { + //given + Consumer stringKafkaConsumer = mock(Consumer.class); + TopicPartitionOffsetProvider offsetProvider = mock( + TopicPartitionOffsetProvider.class); + AzureConsumerRebalancerListener testObj = + new AzureConsumerRebalancerListener(offsetProvider, stringKafkaConsumer, SEEK_TO_LATEST); + String topic = "topic1"; + Integer partition = 1; + TopicPartition topicPartition1 = mock(TopicPartition.class); + when(topicPartition1.topic()).thenReturn(topic); + when(topicPartition1.partition()).thenReturn(partition); + + //when + testObj.onPartitionsAssigned(Collections.singletonList(topicPartition1)); + + //then + verify(topicPartition1, times(1)).topic(); + verify(topicPartition1, times(1)).partition(); + verify(stringKafkaConsumer).seekToEnd(anyList()); + } + + @Test + void 
onPartitionsAssignedShouldSeekToSpecificOffsetIfOffsetProviderProvidesIt() { + //given + Long specificOffset = 100L; + Consumer stringKafkaConsumer = mock(Consumer.class); + TopicPartitionOffsetProvider offsetProvider = mock( + TopicPartitionOffsetProvider.class); + when(offsetProvider.getOffset(any(AzureTopicPartitionKey.class))) + .thenReturn(Optional.of(new AzureOffsetMarker(specificOffset))); + AzureConsumerRebalancerListener testObj = + new AzureConsumerRebalancerListener(offsetProvider, stringKafkaConsumer, SEEK_TO_EARLIEST); + String topic = "topic1"; + Integer partition = 1; + TopicPartition topicPartition1 = mock(TopicPartition.class); + when(topicPartition1.topic()).thenReturn(topic); + when(topicPartition1.partition()).thenReturn(partition); + + //when + testObj.onPartitionsAssigned(Collections.singletonList(topicPartition1)); + + //then + verify(topicPartition1, times(1)).topic(); + verify(topicPartition1, times(1)).partition(); + verify(stringKafkaConsumer).seek(topicPartition1, specificOffset); + } +} \ No newline at end of file diff --git a/java-connectors/kafka-connect-azure-eventhubs/src/test/java/io/lenses/streamreactor/connect/azure/eventhubs/source/AzureEventHubsSourceConnectorTest.java b/java-connectors/kafka-connect-azure-eventhubs/src/test/java/io/lenses/streamreactor/connect/azure/eventhubs/source/AzureEventHubsSourceConnectorTest.java new file mode 100644 index 000000000..79c6957b4 --- /dev/null +++ b/java-connectors/kafka-connect-azure-eventhubs/src/test/java/io/lenses/streamreactor/connect/azure/eventhubs/source/AzureEventHubsSourceConnectorTest.java @@ -0,0 +1,87 @@ +/* + * Copyright 2017-2024 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
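Editor's note: the three rebalance tests above pin down the listener's contract. Reconstructed from those expectations alone (the listener's own source sits elsewhere in this diff, so treat this as an inference, not a copy), the assignment logic is roughly:

```java
import java.util.Collection;
import java.util.List;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;

// Sketch inferred from the tests; assumes the same package as TopicPartitionOffsetProvider.
class RebalanceSeekSketch {
  private final TopicPartitionOffsetProvider offsetProvider;
  private final Consumer<byte[], byte[]> consumer;
  private final boolean shouldSeekToLatest;

  RebalanceSeekSketch(TopicPartitionOffsetProvider offsetProvider,
      Consumer<byte[], byte[]> consumer, boolean shouldSeekToLatest) {
    this.offsetProvider = offsetProvider;
    this.consumer = consumer;
    this.shouldSeekToLatest = shouldSeekToLatest;
  }

  void onPartitionsAssigned(Collection<TopicPartition> partitions) {
    for (TopicPartition tp : partitions) {
      offsetProvider.getOffset(new TopicPartitionOffsetProvider.AzureTopicPartitionKey(
              tp.topic(), tp.partition()))
          .ifPresentOrElse(
              marker -> consumer.seek(tp, marker.getOffsetValue()), // resume from commit
              () -> {                                               // no commit yet
                if (shouldSeekToLatest) {
                  consumer.seekToEnd(List.of(tp));
                } else {
                  consumer.seekToBeginning(List.of(tp));
                }
              });
    }
  }
}
```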
+ */ +package io.lenses.streamreactor.connect.azure.eventhubs.source; + +import static org.apache.kafka.connect.source.ExactlyOnceSupport.SUPPORTED; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import io.lenses.streamreactor.common.util.JarManifest; +import io.lenses.streamreactor.connect.azure.eventhubs.config.AzureEventHubsConfigConstants; +import java.lang.reflect.Field; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.kafka.connect.source.ExactlyOnceSupport; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.MockedConstruction; +import org.mockito.Mockito; + +class AzureEventHubsSourceConnectorTest { + + private static final String CONNECTOR_NAME = "AzureEventHubsSource"; + private static final String KCQL = "INSERT INTO OUTPUT1 SELECT * FROM INPUT1;"; + private AzureEventHubsSourceConnector testObj; + + @BeforeEach + void setUp() { + try (MockedConstruction ignored = Mockito.mockConstruction(JarManifest.class)) { + testObj = new AzureEventHubsSourceConnector(); + } + } + + @Test + void taskConfigsShouldMultiplyConfigs() throws NoSuchFieldException, IllegalAccessException { + //given + Map simpleProperties = createSimplePropertiesWithKcql(); + int maxTasks = 3; + + //when + Field configPropertiesField = testObj.getClass() + .getDeclaredField("configProperties"); + configPropertiesField.setAccessible(true); + configPropertiesField.set(testObj, simpleProperties); + List> taskConfigs = testObj.taskConfigs(maxTasks); + + //then + for (Map taskConfig : taskConfigs){ + assertTrue(taskConfig.equals(simpleProperties)); + } + + } + + @Test + void exactlyOnceSupportShouldReturnSupported() { + //given + + //when + ExactlyOnceSupport exactlyOnceSupport = testObj.exactlyOnceSupport(new HashMap<>()); + + //then + assertEquals(SUPPORTED, exactlyOnceSupport); + } + + private Map createSimplePropertiesWithKcql() { + Map properties = Map.of( + AzureEventHubsConfigConstants.CONNECTOR_NAME, CONNECTOR_NAME, + "connector.class", AzureEventHubsSourceConnector.class.getCanonicalName(), + AzureEventHubsConfigConstants.KCQL_CONFIG, KCQL + ); + + return properties; + } +} \ No newline at end of file diff --git a/java-connectors/kafka-connect-azure-eventhubs/src/test/java/io/lenses/streamreactor/connect/azure/eventhubs/source/AzureEventHubsSourceTaskTest.java b/java-connectors/kafka-connect-azure-eventhubs/src/test/java/io/lenses/streamreactor/connect/azure/eventhubs/source/AzureEventHubsSourceTaskTest.java new file mode 100644 index 000000000..da709bfa8 --- /dev/null +++ b/java-connectors/kafka-connect-azure-eventhubs/src/test/java/io/lenses/streamreactor/connect/azure/eventhubs/source/AzureEventHubsSourceTaskTest.java @@ -0,0 +1,144 @@ +/* + * Copyright 2017-2024 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
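Editor's note: the taskConfigs test above asserts that every task receives an identical configuration map. One plausible implementation shape consistent with that expectation (a sketch, not the connector's actual code):

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Sketch: every task gets a copy of the same connector configuration.
class TaskConfigsSketch {
  private final Map<String, String> configProperties;

  TaskConfigsSketch(Map<String, String> configProperties) {
    this.configProperties = configProperties;
  }

  List<Map<String, String>> taskConfigs(int maxTasks) {
    return IntStream.range(0, maxTasks)
        .mapToObj(i -> Map.copyOf(configProperties)) // identical, immutable copies
        .collect(Collectors.toList());
  }
}
```

Handing each task the full configuration lets every task build the same consumer group, so the brokers, not the connector, balance partitions across tasks.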
+ */ +package io.lenses.streamreactor.connect.azure.eventhubs.source; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertIterableEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.atMostOnce; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import ch.qos.logback.classic.spi.ILoggingEvent; +import ch.qos.logback.core.read.ListAppender; +import io.lenses.streamreactor.common.util.JarManifest; +import io.lenses.streamreactor.connect.azure.eventhubs.config.AzureEventHubsConfigConstants; +import io.lenses.streamreactor.connect.azure.eventhubs.config.AzureEventHubsSourceConfig; +import java.time.Duration; +import java.util.Collections; +import java.util.List; +import org.apache.kafka.connect.source.SourceRecord; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.slf4j.LoggerFactory; + +class AzureEventHubsSourceTaskTest { + + private AzureEventHubsSourceTask testObj; + private ListAppender logWatcher; + private JarManifest mockedJarManifest = mock(JarManifest.class); + + @BeforeEach + void setup() { + mockedJarManifest = mock(JarManifest.class); + testObj = new AzureEventHubsSourceTask(mockedJarManifest); + logWatcher = new ListAppender<>(); + logWatcher.start(); + ((ch.qos.logback.classic.Logger) LoggerFactory.getLogger(AzureEventHubsSourceTask.class)).addAppender(logWatcher); + } + + @AfterEach + void teardown() { + ((ch.qos.logback.classic.Logger) LoggerFactory.getLogger(AzureEventHubsSourceTask.class)).detachAndStopAllAppenders(); + } + + @Test + void stopShouldCallStopOnController() { + //given + Duration thirtySeconds = Duration.ofSeconds(30); + EventHubsKafkaConsumerController mockedController = mock(EventHubsKafkaConsumerController.class); + AzureEventHubsSourceConfig azureEventHubsSourceConfig = mock(AzureEventHubsSourceConfig.class); + when(azureEventHubsSourceConfig.getInt(AzureEventHubsConfigConstants.CONSUMER_CLOSE_TIMEOUT)) + .thenReturn(thirtySeconds.toSecondsPart()); + testObj.initialize(mockedController, azureEventHubsSourceConfig); + + //when + testObj.stop(); + + //then + verify(mockedController, times(1)).close(thirtySeconds); + } + + @Test + void initializeShouldLog() { + //given + EventHubsKafkaConsumerController mockedController = mock(EventHubsKafkaConsumerController.class); + AzureEventHubsSourceConfig azureEventHubsSourceConfig = mock(AzureEventHubsSourceConfig.class); + testObj.initialize(mockedController, azureEventHubsSourceConfig); + + //when + testObj.stop(); + + //then + assertEquals(1, logWatcher.list.size()); + assertEquals("AzureEventHubsSourceTask initialised.", logWatcher.list.get(0).getFormattedMessage()); + } + + @Test + void pollShouldCallPollOnControllerAndReturnNullIfListIsEmpty() throws InterruptedException { + //given + AzureEventHubsSourceConfig azureEventHubsSourceConfig = mock(AzureEventHubsSourceConfig.class); + EventHubsKafkaConsumerController mockedController = mock(EventHubsKafkaConsumerController.class); + testObj.initialize(mockedController, azureEventHubsSourceConfig); + when(mockedController.poll(any(Duration.class))).thenReturn(Collections.emptyList()); + + //when + List poll = testObj.poll(); + + //then + 
assertNull(poll); + } + + @Test + void pollShouldCallPollOnControllerAndReturnListThatHasElements() throws InterruptedException { + //given + AzureEventHubsSourceConfig azureEventHubsSourceConfig = mock(AzureEventHubsSourceConfig.class); + EventHubsKafkaConsumerController mockedController = mock(EventHubsKafkaConsumerController.class); + testObj.initialize(mockedController, azureEventHubsSourceConfig); + SourceRecord mockedRecord = mock(SourceRecord.class); + List sourceRecords = Collections.singletonList(mockedRecord); + when(mockedController.poll(any(Duration.class))).thenReturn(sourceRecords); + + //when + List poll = testObj.poll(); + + //then + assertNotNull(poll); + assertIterableEquals(sourceRecords, poll); + } + + @Test + void getVersionShouldDelegateToJarManifestGetVersion() { + //given + AzureEventHubsSourceConfig azureEventHubsSourceConfig = mock(AzureEventHubsSourceConfig.class); + EventHubsKafkaConsumerController mockedController = mock(EventHubsKafkaConsumerController.class); + testObj.initialize(mockedController, azureEventHubsSourceConfig); + final String SOME_VERSION = "SOME_VERSION"; + when(mockedJarManifest.getVersion()).thenReturn(SOME_VERSION); + + //when + String version = testObj.version(); + + //then + assertEquals(SOME_VERSION, version); + verify(mockedJarManifest, atMostOnce()).getVersion(); + } +} \ No newline at end of file diff --git a/java-connectors/kafka-connect-azure-eventhubs/src/test/java/io/lenses/streamreactor/connect/azure/eventhubs/source/BlockingQueueProducerProviderTest.java b/java-connectors/kafka-connect-azure-eventhubs/src/test/java/io/lenses/streamreactor/connect/azure/eventhubs/source/BlockingQueueProducerProviderTest.java new file mode 100644 index 000000000..3fe1007b7 --- /dev/null +++ b/java-connectors/kafka-connect-azure-eventhubs/src/test/java/io/lenses/streamreactor/connect/azure/eventhubs/source/BlockingQueueProducerProviderTest.java @@ -0,0 +1,108 @@ +/* + * Copyright 2017-2024 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
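Editor's note: both the test class above and the one that follows capture log output the same way. The pattern in isolation, with an illustrative logger target:

```java
import ch.qos.logback.classic.Logger;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.read.ListAppender;
import org.slf4j.LoggerFactory;

public class LogCaptureSketch {
  public static void main(String[] args) {
    // Attach an in-memory appender to the logger of the class under test...
    ListAppender<ILoggingEvent> logWatcher = new ListAppender<>();
    logWatcher.start();
    Logger logger = (Logger) LoggerFactory.getLogger(LogCaptureSketch.class);
    logger.addAppender(logWatcher);

    // ...exercise the code...
    logger.info("something initialised.");

    // ...then assert on what was logged.
    System.out.println(logWatcher.list.get(0).getFormattedMessage());

    // Detach in teardown so appenders don't leak across tests.
    logger.detachAndStopAllAppenders();
  }
}
```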
+ */ +package io.lenses.streamreactor.connect.azure.eventhubs.source; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import ch.qos.logback.classic.spi.ILoggingEvent; +import ch.qos.logback.core.read.ListAppender; +import io.lenses.streamreactor.connect.azure.eventhubs.config.AzureEventHubsConfigConstants; +import io.lenses.streamreactor.connect.azure.eventhubs.config.AzureEventHubsSourceConfig; +import java.util.HashMap; +import java.util.concurrent.ArrayBlockingQueue; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.config.ConfigException; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.MockedConstruction; +import org.mockito.Mockito; +import org.slf4j.LoggerFactory; + +class BlockingQueueProducerProviderTest { + + private ListAppender logWatcher; + + @BeforeEach + void setup() { + logWatcher = new ListAppender<>(); + logWatcher.start(); + ((ch.qos.logback.classic.Logger) LoggerFactory.getLogger(BlockingQueueProducerProvider.class)).addAppender(logWatcher); + } + + @AfterEach + void teardown() { + ((ch.qos.logback.classic.Logger) LoggerFactory.getLogger(BlockingQueueProducerProvider.class)).detachAndStopAllAppenders(); + } + + @Test + void whenConstructorInvokedWithoutOffsetParameterThenConfigExceptionIsThrown(){ + //given + AzureEventHubsSourceConfig azureConfigMock = mock(AzureEventHubsSourceConfig.class); + TopicPartitionOffsetProvider mockedOffsetProvider = mock(TopicPartitionOffsetProvider.class); + + + //when + BlockingQueueProducerProvider testObj = new BlockingQueueProducerProvider( + mockedOffsetProvider); + ConfigException configException; + try(MockedConstruction ignored = Mockito.mockConstruction(KafkaConsumer.class)){ + configException = assertThrows(ConfigException.class, () -> { + testObj.createProducer(azureConfigMock, new ArrayBlockingQueue<>(1), + new HashMap<>()); + }); + } + + + //then + assertEquals("Invalid value null for configuration connect.eventhubs.source.default.offset: " + + "allowed values are: earliest/latest", configException.getMessage()); + } + + @Test + void whenConstructorInvokedWithParametersThenMockKafkaConsumerShouldBeCreatedAndLogged(){ + //given + String earliestOffset = "earliest"; + TopicPartitionOffsetProvider mockedOffsetProvider = mock(TopicPartitionOffsetProvider.class); + + AzureEventHubsSourceConfig azureConfigMock = mock(AzureEventHubsSourceConfig.class); + when(azureConfigMock.getString(AzureEventHubsConfigConstants.CONSUMER_OFFSET)).thenReturn( + earliestOffset); + when(azureConfigMock.getString(AzureEventHubsConfigConstants.KCQL_CONFIG)) + .thenReturn("insert into output select * from input"); + + //when + BlockingQueueProducerProvider testObj = new BlockingQueueProducerProvider( + mockedOffsetProvider); + KafkaByteBlockingQueuedProducer consumer; + try(MockedConstruction ignored = Mockito.mockConstruction(KafkaConsumer.class)){ + consumer = testObj.createProducer(azureConfigMock, new ArrayBlockingQueue<>(1), + new HashMap<>()); + } + + //then + verify(azureConfigMock).getString(AzureEventHubsConfigConstants.CONNECTOR_NAME); + assertNotNull(consumer); + assertEquals(1, 
logWatcher.list.size()); + assertTrue(logWatcher.list.get(0).getFormattedMessage().startsWith("Attempting to create Client with Id")); + } +} \ No newline at end of file diff --git a/java-connectors/kafka-connect-azure-eventhubs/src/test/java/io/lenses/streamreactor/connect/azure/eventhubs/source/EventHubsKafkaConsumerControllerTest.java b/java-connectors/kafka-connect-azure-eventhubs/src/test/java/io/lenses/streamreactor/connect/azure/eventhubs/source/EventHubsKafkaConsumerControllerTest.java new file mode 100644 index 000000000..da82a2f29 --- /dev/null +++ b/java-connectors/kafka-connect-azure-eventhubs/src/test/java/io/lenses/streamreactor/connect/azure/eventhubs/source/EventHubsKafkaConsumerControllerTest.java @@ -0,0 +1,183 @@ +/* + * Copyright 2017-2024 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lenses.streamreactor.connect.azure.eventhubs.source; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import io.lenses.streamreactor.connect.azure.eventhubs.config.SourceDataType; +import io.lenses.streamreactor.connect.azure.eventhubs.config.SourceDataType.KeyValueTypes; +import java.time.Duration; +import java.time.temporal.ChronoUnit; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.stream.Collectors; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.source.SourceRecord; +import org.junit.jupiter.api.Test; + +class EventHubsKafkaConsumerControllerTest { + + private static final String INPUT_TOPIC = "INPUT"; + private static final String OUTPUT_TOPIC = "OUTPUT"; + private static final String INPUT_TOPIC_2 = "INPUT2"; + private static final String OUTPUT_TOPIC_2 = "OUTPUT2"; + private static final int DEFAULT_CAPACITY = 10; + private static final Duration DURATION_2_SECONDS = Duration.of(2, ChronoUnit.SECONDS); + + private EventHubsKafkaConsumerController testObj; + + @Test + void pollShouldPollQueueAndReturnSourceRecords() throws InterruptedException { + //given + Map inputOutputMap = Map.of(INPUT_TOPIC, OUTPUT_TOPIC); + + SourceDataType mockedKeyDataType = mockSourceDataType(); + SourceDataType mockedValueDataType = mockSourceDataType(); + + KeyValueTypes mockedKeyValueTypes = mockKeyValueTypes(mockedKeyDataType, mockedValueDataType); + + KafkaByteBlockingQueuedProducer mockedBlockingProducer = mockByteBlockingProducer(mockedKeyValueTypes); + + ArrayBlockingQueue> recordsQueue = mockRecordsQueue(INPUT_TOPIC); + + //when + testObj = new EventHubsKafkaConsumerController(mockedBlockingProducer, recordsQueue, + inputOutputMap); + 
List sourceRecords = testObj.poll(DURATION_2_SECONDS); + + //then + verify(mockedBlockingProducer).start(); + verify(mockedKeyDataType, times(1)).getSchema(); + verify(mockedValueDataType, times(1)).getSchema(); + verify(mockedBlockingProducer, times(2)).getKeyValueTypes(); + assertEquals(1, sourceRecords.size()); + } + + @Test + void pollWithMultipleInputsAndOutputsShouldPollQueueAndReturnSourceRecordsToCorrectOutput() + throws InterruptedException { + //given + Map inputOutputMap = Map.of(INPUT_TOPIC, OUTPUT_TOPIC, INPUT_TOPIC_2, + OUTPUT_TOPIC_2); + + SourceDataType mockedKeyDataType = mockSourceDataType(); + SourceDataType mockedValueDataType = mockSourceDataType(); + + KeyValueTypes mockedKeyValueTypes = mockKeyValueTypes(mockedKeyDataType, mockedValueDataType); + + KafkaByteBlockingQueuedProducer mockedBlockingProducer = mockByteBlockingProducer(mockedKeyValueTypes); + + ArrayBlockingQueue> recordsQueue = mockRecordsQueue(INPUT_TOPIC, INPUT_TOPIC_2); + + //when + testObj = new EventHubsKafkaConsumerController(mockedBlockingProducer, recordsQueue, + inputOutputMap); + List sourceRecords = testObj.poll(DURATION_2_SECONDS); + + //then + verify(mockedBlockingProducer).start(); + verify(mockedKeyDataType, times(2)).getSchema(); //1x both records + verify(mockedValueDataType, times(2)).getSchema(); //1x both records + verify(mockedBlockingProducer, times(4)).getKeyValueTypes(); + assertEquals(2, sourceRecords.size()); + assertEquals(OUTPUT_TOPIC, sourceRecords.get(0).topic()); + assertEquals(OUTPUT_TOPIC_2, sourceRecords.get(1).topic()); + } + + @Test + void closeShouldCloseTheProducer() { + //given + KafkaByteBlockingQueuedProducer mockedBlockingProducer = mock( + KafkaByteBlockingQueuedProducer.class); + + Map inputOutputMap = Map.of(INPUT_TOPIC, OUTPUT_TOPIC); + + ArrayBlockingQueue> recordsQueue = mockRecordsQueue(); + + testObj = new EventHubsKafkaConsumerController(mockedBlockingProducer, recordsQueue, + inputOutputMap); + + //when + testObj.close(DURATION_2_SECONDS); + + //then + verify(mockedBlockingProducer).stop(DURATION_2_SECONDS); + } + + private static SourceDataType mockSourceDataType() { + SourceDataType mockedDataType = mock(SourceDataType.class); + when(mockedDataType.getSchema()).thenReturn(Schema.OPTIONAL_STRING_SCHEMA); + return mockedDataType; + } + + private static KeyValueTypes mockKeyValueTypes(SourceDataType keyDataType, SourceDataType valueDataType) { + KeyValueTypes mockedKeyValueTypes = mock(KeyValueTypes.class); + when(mockedKeyValueTypes.getKeyType()).thenReturn(keyDataType); + when(mockedKeyValueTypes.getValueType()).thenReturn(valueDataType); + return mockedKeyValueTypes; + } + + private static ConsumerRecord mockConsumerRecord( + String inputTopic, Optional headersMock) { + ConsumerRecord consumerRecord = mock(ConsumerRecord.class); + headersMock.ifPresent(headers -> when(consumerRecord.headers()).thenReturn(headers)); + when(consumerRecord.topic()).thenReturn(inputTopic); + return consumerRecord; + } + + private static Headers mockEmptyHeaders() { + Headers headersMock = mock(Headers.class); + when(headersMock.iterator()).thenReturn(Collections.emptyIterator()); + return headersMock; + } + + + private static KafkaByteBlockingQueuedProducer mockByteBlockingProducer(KeyValueTypes mockedKeyValueTypes) { + KafkaByteBlockingQueuedProducer mockedBlockingProducer = mock( + KafkaByteBlockingQueuedProducer.class); + when(mockedBlockingProducer.getKeyValueTypes()).thenReturn(mockedKeyValueTypes); + return mockedBlockingProducer; + } + + private static 
+  private static ArrayBlockingQueue<ConsumerRecords<byte[], byte[]>> mockRecordsQueue(
+      String... inputTopics) {
+
+    Headers headersMock = mockEmptyHeaders();
+
+    List<ConsumerRecord<byte[], byte[]>> consumerRecordList = Arrays.stream(inputTopics)
+        .map(it -> mockConsumerRecord(it, Optional.of(headersMock)))
+        .collect(Collectors.toList());
+
+    ConsumerRecords<byte[], byte[]> mockedRecords = mock(ConsumerRecords.class);
+    when(mockedRecords.count()).thenReturn(consumerRecordList.size());
+    when(mockedRecords.iterator()).thenReturn(consumerRecordList.iterator());
+
+    return new ArrayBlockingQueue<>(DEFAULT_CAPACITY, false, List.of(mockedRecords));
+  }
+
+
+}
\ No newline at end of file
diff --git a/java-connectors/kafka-connect-azure-eventhubs/src/test/java/io/lenses/streamreactor/connect/azure/eventhubs/source/KafkaByteBlockingQueuedProducerTest.java b/java-connectors/kafka-connect-azure-eventhubs/src/test/java/io/lenses/streamreactor/connect/azure/eventhubs/source/KafkaByteBlockingQueuedProducerTest.java
new file mode 100644
index 000000000..844803dee
--- /dev/null
+++ b/java-connectors/kafka-connect-azure-eventhubs/src/test/java/io/lenses/streamreactor/connect/azure/eventhubs/source/KafkaByteBlockingQueuedProducerTest.java
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2017-2024 Lenses.io Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.lenses.streamreactor.connect.azure.eventhubs.source;
+
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+import io.lenses.streamreactor.connect.azure.eventhubs.config.SourceDataType.KeyValueTypes;
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
+import java.util.concurrent.BlockingQueue;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.junit.jupiter.api.Test;
+import org.mockito.internal.util.collections.Sets;
+
+class KafkaByteBlockingQueuedProducerTest {
+
+  private static final String CLIENT_ID = "clientId";
+  private static Consumer<byte[], byte[]> consumer = mock(Consumer.class);
+
+  KafkaByteBlockingQueuedProducer testObj = new KafkaByteBlockingQueuedProducer(
+      mock(TopicPartitionOffsetProvider.class), mock(BlockingQueue.class),
+      consumer, KeyValueTypes.DEFAULT_TYPES,
+      CLIENT_ID, Sets.newSet("topic"), false);
+
+  @Test
+  void closeShouldBeDelegatedToKafkaConsumer() {
+    //given
+    Duration tenSeconds = Duration.of(10, ChronoUnit.SECONDS);
+
+    //when
+    testObj.stop(tenSeconds);
+
+    //then
+    verify(consumer).close(eq(tenSeconds));
+  }
+}
\ No newline at end of file
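For orientation, the controller exercised above sits between the blocking queue that Event Hubs records land on and Kafka Connect's poll loop. The following is a minimal sketch of how a source task might delegate to it; the task class, the `buildController` helper, and the timeouts are invented for illustration and are not part of this PR:

```java
// Hypothetical wiring only; ExampleEventHubsSourceTask and buildController()
// are invented for illustration and do not appear in this PR.
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.Map;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

public class ExampleEventHubsSourceTask extends SourceTask {

  private static final Duration ONE_SECOND = Duration.of(1, ChronoUnit.SECONDS);
  private static final Duration CLOSE_TIMEOUT = Duration.of(5, ChronoUnit.SECONDS);

  private EventHubsKafkaConsumerController controller;

  @Override
  public void start(Map<String, String> props) {
    // Assumed helper: builds the queued producer, records queue and
    // input-to-output topic map from the connector properties.
    controller = buildController(props);
  }

  @Override
  public List<SourceRecord> poll() throws InterruptedException {
    // Drains queued ConsumerRecords; each record is rerouted from its input
    // topic to the configured output topic, as the tests above verify.
    return controller.poll(ONE_SECOND);
  }

  @Override
  public void stop() {
    controller.close(CLOSE_TIMEOUT); // delegates to the producer's stop()
  }

  @Override
  public String version() {
    return "0.0.0"; // placeholder
  }

  private EventHubsKafkaConsumerController buildController(Map<String, String> props) {
    throw new UnsupportedOperationException("illustrative stub");
  }
}
```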
diff --git a/java-connectors/kafka-connect-azure-eventhubs/src/test/java/io/lenses/streamreactor/connect/azure/eventhubs/source/TopicPartitionOffsetProviderTest.java b/java-connectors/kafka-connect-azure-eventhubs/src/test/java/io/lenses/streamreactor/connect/azure/eventhubs/source/TopicPartitionOffsetProviderTest.java
new file mode 100644
index 000000000..33490cbc3
--- /dev/null
+++ b/java-connectors/kafka-connect-azure-eventhubs/src/test/java/io/lenses/streamreactor/connect/azure/eventhubs/source/TopicPartitionOffsetProviderTest.java
@@ -0,0 +1,109 @@
+/*
+ * Copyright 2017-2024 Lenses.io Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.lenses.streamreactor.connect.azure.eventhubs.source;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import io.lenses.streamreactor.connect.azure.eventhubs.source.TopicPartitionOffsetProvider.AzureOffsetMarker;
+import io.lenses.streamreactor.connect.azure.eventhubs.source.TopicPartitionOffsetProvider.AzureTopicPartitionKey;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.kafka.connect.storage.OffsetStorageReader;
+import org.junit.jupiter.api.Test;
+
+class TopicPartitionOffsetProviderTest {
+
+  @Test
+  void getOffsetShouldCallOffsetStorageReader() {
+    //given
+    OffsetStorageReader offsetStorageReader = mock(OffsetStorageReader.class);
+    TopicPartitionOffsetProvider topicPartitionOffsetProvider = new TopicPartitionOffsetProvider(
+        offsetStorageReader);
+    String topic = "some_topic";
+    Integer partition = 1;
+
+    //when
+    AzureTopicPartitionKey azureTopicPartitionKey = new AzureTopicPartitionKey(topic, partition);
+    topicPartitionOffsetProvider.getOffset(azureTopicPartitionKey);
+
+    //then
+    verify(offsetStorageReader).offset(azureTopicPartitionKey);
+  }
+
+  @Test
+  void getOffsetShouldReturnEmptyOptionalIfCommitsNotFound() {
+    //given
+    OffsetStorageReader offsetStorageReader = mock(OffsetStorageReader.class);
+    when(offsetStorageReader.offset(any(Map.class))).thenReturn(new HashMap<>());
+    String topic = "some_topic";
+    Integer partition = 1;
+    TopicPartitionOffsetProvider topicPartitionOffsetProvider = new TopicPartitionOffsetProvider(
+        offsetStorageReader);
+
+    //when
+    AzureTopicPartitionKey azureTopicPartitionKey = new AzureTopicPartitionKey(topic, partition);
+    Optional<AzureOffsetMarker> offset = topicPartitionOffsetProvider.getOffset(azureTopicPartitionKey);
+
+    //then
+    verify(offsetStorageReader).offset(azureTopicPartitionKey);
+    assertTrue(offset.isEmpty());
+  }
+
+  @Test
+  void getOffsetShouldReturnValidAzureOffsetMarkerIfCommitsFound() {
+    //given
+    long offsetOne = 1L;
+    String OFFSET_KEY = "OFFSET";
+    OffsetStorageReader offsetStorageReader = mock(OffsetStorageReader.class);
+    HashMap<String, Object> offsets = new HashMap<>();
+    offsets.put(OFFSET_KEY, offsetOne);
+    when(offsetStorageReader.offset(any(Map.class))).thenReturn(offsets);
+    String topic = "some_topic";
+    Integer partition = 1;
+    TopicPartitionOffsetProvider topicPartitionOffsetProvider = new TopicPartitionOffsetProvider(
+        offsetStorageReader);
+
+    //when
+    AzureTopicPartitionKey azureTopicPartitionKey = new AzureTopicPartitionKey(topic, partition);
+    Optional<AzureOffsetMarker> offset = topicPartitionOffsetProvider.getOffset(azureTopicPartitionKey);
+
+    //then
+    verify(offsetStorageReader).offset(azureTopicPartitionKey);
+    assertTrue(offset.isPresent());
+    assertEquals(offsetOne, offset.get().getOffsetValue());
+  }
+
+  @Test
+  void azureTopicPartitionKeyShouldReturnTopicAndPartitionValues() {
+    //given
+    int partition = 10;
+    String topic = "topic";
+
+    //when
+    final AzureTopicPartitionKey azureTopicPartitionKey = new AzureTopicPartitionKey(topic, partition);
+
+    //then
+    assertEquals(partition, azureTopicPartitionKey.getPartition());
+    assertEquals(topic, azureTopicPartitionKey.getTopic());
+  }
+
+}
\ No newline at end of file
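These tests pin down the OffsetStorageReader contract the provider relies on: a null or empty map means no committed offset, otherwise the stored value is wrapped in a marker. Below is a sketch of the lookup logic the tests imply; it assumes offsets are stored under an "OFFSET" key and that AzureOffsetMarker accepts a Long, neither of which is confirmed by this diff:

```java
// Sketch only: assumes the "OFFSET" map key and a Long-accepting
// AzureOffsetMarker constructor; the real implementation lives elsewhere.
public Optional<AzureOffsetMarker> getOffset(AzureTopicPartitionKey topicPartitionKey) {
  Map<String, Object> committed = offsetStorageReader.offset(topicPartitionKey);
  return Optional.ofNullable(committed)              // no commits -> empty
      .map(offsets -> (Long) offsets.get("OFFSET"))  // empty map -> empty
      .map(AzureOffsetMarker::new);                  // assumed constructor
}
```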
diff --git a/java-connectors/kafka-connect-azure-eventhubs/src/test/java/io/lenses/streamreactor/connect/azure/eventhubs/util/KcqlConfigTopicMapperTest.java b/java-connectors/kafka-connect-azure-eventhubs/src/test/java/io/lenses/streamreactor/connect/azure/eventhubs/util/KcqlConfigTopicMapperTest.java
new file mode 100644
index 000000000..047f940f2
--- /dev/null
+++ b/java-connectors/kafka-connect-azure-eventhubs/src/test/java/io/lenses/streamreactor/connect/azure/eventhubs/util/KcqlConfigTopicMapperTest.java
@@ -0,0 +1,108 @@
+/*
+ * Copyright 2017-2024 Lenses.io Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.lenses.streamreactor.connect.azure.eventhubs.util;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.kafka.common.config.ConfigException;
+import org.junit.jupiter.api.Test;
+
+class KcqlConfigTopicMapperTest {
+
+  @Test
+  void mapInputToOutputsFromConfigForMultipleKcqlStatementsShouldReturnMapOfInputToOutput() {
+    //given
+    int numberOfMappings = 3;
+    List<String> inputs = new ArrayList<>(numberOfMappings);
+    List<String> outputs = new ArrayList<>(numberOfMappings);
+    String kcqlTemplate = "insert into %s select * from %s;";
+    StringBuilder fullKcql = new StringBuilder();
+
+    for (int i = 0; i < numberOfMappings; i++) {
+      String newInput = "INPUT" + i;
+      String newOutput = "OUTPUT" + i;
+
+      inputs.add(i, newInput);
+      outputs.add(i, newOutput);
+      fullKcql.append(String.format(kcqlTemplate, newOutput, newInput));
+    }
+    //when
+    Map<String, String> inputToOutputsFromConfig = KcqlConfigTopicMapper.mapInputToOutputsFromConfig(
+        fullKcql.toString());
+
+    //then
+    for (String input : inputToOutputsFromConfig.keySet()) {
+      int indexOfInput = inputs.indexOf(input);
+      assertNotEquals(-1, indexOfInput);
+      assertEquals(inputs.get(indexOfInput), input);
+      assertEquals(outputs.get(indexOfInput), inputToOutputsFromConfig.get(input));
+    }
+  }
+
+  @Test
+  void mapInputToOutputsFromConfigShouldntAllowForIllegalNames() {
+    //given
+    String illegalInputKcql = "INSERT INTO OUTPUT SELECT * FROM 'INPUT*_'";
+    String illegalOutputKcql = "INSERT INTO 'OUTPUT*_' SELECT * FROM INPUT";
+    String inputErrorMessage = "Input topic INPUT*_, name is not correctly specified "
+        + "(It can contain only letters, numbers and hyphens, underscores and "
+        + "dots and has to start with number or letter";
+    String outputErrorMessage = "Output topic OUTPUT*_, name is not correctly specified "
+        + "(It can contain only letters, numbers and hyphens, underscores and "
+        + "dots and has to start with number or letter";
+
+    //when
+    mapInputToOutputAssertingExceptionWithSpecificMessage(illegalInputKcql, inputErrorMessage);
+
+    mapInputToOutputAssertingExceptionWithSpecificMessage(illegalOutputKcql, outputErrorMessage);
+  }
+
+  @Test
+  void mapInputToOutputsFromConfigShouldntAllowForOneToManyMappings() {
+    //given
+    String oneInputKcql = "INSERT INTO OUTPUT1 SELECT * FROM INPUT1;";
+    String sameInputKcql = "INSERT INTO OUTPUT2 SELECT * FROM INPUT1;";
+    String outputErrorMessage = "Input INPUT1 cannot be mapped twice.";
+
+    //when
+    mapInputToOutputAssertingExceptionWithSpecificMessage(oneInputKcql + sameInputKcql,
+        outputErrorMessage);
+  }
+
+  @Test
+  void mapInputToOutputsFromConfigShouldntAllowForMultipleInputsToSameOutput() {
+    //given
+    String oneInputKcql = "INSERT INTO OUTPUT1 SELECT * FROM INPUT1;";
+    String anotherInputToSameOutputKcql = "INSERT INTO OUTPUT1 SELECT * FROM INPUT2;";
+    String outputErrorMessage = "Output OUTPUT1 cannot be mapped twice.";
+
+    //when
+    mapInputToOutputAssertingExceptionWithSpecificMessage(
+        oneInputKcql + anotherInputToSameOutputKcql,
+        outputErrorMessage);
+  }
+
+  private static void mapInputToOutputAssertingExceptionWithSpecificMessage(String illegalKcql,
+      String expectedMessage) {
+    assertThrows(ConfigException.class,
+        () -> KcqlConfigTopicMapper.mapInputToOutputsFromConfig(illegalKcql), expectedMessage);
+  }
+}
\ No newline at end of file
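These tests document the KCQL routing rules: each `INSERT INTO <output> SELECT * FROM <input>` statement contributes one one-to-one entry, and duplicates on either side are rejected. A quick illustration with made-up topic names:

```java
// Made-up topic names; KCQL statements are concatenated, each ending with ';'.
String kcql = "INSERT INTO kafka-output-a SELECT * FROM event-hub-a;"
    + "INSERT INTO kafka-output-b SELECT * FROM event-hub-b;";

Map<String, String> routes = KcqlConfigTopicMapper.mapInputToOutputsFromConfig(kcql);

routes.get("event-hub-a"); // -> "kafka-output-a"
routes.get("event-hub-b"); // -> "kafka-output-b"
```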
diff --git a/java-connectors/kafka-connect-common/build.gradle b/java-connectors/kafka-connect-common/build.gradle
new file mode 100644
index 000000000..d4f372950
--- /dev/null
+++ b/java-connectors/kafka-connect-common/build.gradle
@@ -0,0 +1,17 @@
+project(":kafka-connect-common") {
+    test {
+        maxParallelForks = 1
+    }
+
+    dependencies {
+        //apache kafka
+        api group: 'org.apache.kafka', name: 'connect-json', version: kafkaVersion
+        api group: 'org.apache.kafka', name: 'kafka-clients', version: kafkaVersion
+
+        //confluent - may be needed soon
+//        implementation group: 'io.confluent', name: 'kafka-json-schema-serializer', version: apacheToConfluentVersionAxis.get(kafkaVersion)
+//        implementation group: 'io.confluent', name: 'kafka-connect-avro-converter', version: apacheToConfluentVersionAxis.get(kafkaVersion)
+//        implementation group: 'io.confluent', name: 'kafka-connect-avro-data', version: apacheToConfluentVersionAxis.get(kafkaVersion)
+//        implementation group: 'io.confluent', name: 'kafka-connect-protobuf-converter', version: apacheToConfluentVersionAxis.get(kafkaVersion)
+    }
+}
\ No newline at end of file
diff --git a/java-connectors/kafka-connect-common/src/main/java/io/lenses/streamreactor/common/config/base/BaseConfig.java b/java-connectors/kafka-connect-common/src/main/java/io/lenses/streamreactor/common/config/base/BaseConfig.java
new file mode 100644
index 000000000..6259e23d5
--- /dev/null
+++ b/java-connectors/kafka-connect-common/src/main/java/io/lenses/streamreactor/common/config/base/BaseConfig.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2017-2024 Lenses.io Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.lenses.streamreactor.common.config.base;
+
+import java.util.Map;
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+
+/**
+ * Class represents base implementation of {@link AbstractConfig}.
+ */
+public abstract class BaseConfig extends AbstractConfig {
+
+  protected final String connectorPrefix;
+
+  public BaseConfig(String connectorPrefix, ConfigDef definition, Map<String, ?> properties) {
+    super(definition, properties);
+    this.connectorPrefix = connectorPrefix;
+  }
+}
diff --git a/java-connectors/kafka-connect-common/src/main/java/io/lenses/streamreactor/common/config/base/intf/ConnectorPrefixed.java b/java-connectors/kafka-connect-common/src/main/java/io/lenses/streamreactor/common/config/base/intf/ConnectorPrefixed.java
new file mode 100644
index 000000000..69d977014
--- /dev/null
+++ b/java-connectors/kafka-connect-common/src/main/java/io/lenses/streamreactor/common/config/base/intf/ConnectorPrefixed.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2017-2024 Lenses.io Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.lenses.streamreactor.common.config.base.intf;
+
+/**
+ * Represents classes that have a Connector prefix (mostly Configurations).
+ */
+public interface ConnectorPrefixed {
+  String connectorPrefix();
+}
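A hypothetical subclass shows how BaseConfig and ConnectorPrefixed are meant to compose; the class name, prefix and property below are invented for illustration and are not defined anywhere in this PR:

```java
// Invented example config; the prefix and property name are illustrative.
import java.util.Map;
import org.apache.kafka.common.config.ConfigDef;

public class ExampleConnectorConfig extends BaseConfig implements ConnectorPrefixed {

  private static final String PREFIX = "connect.example";

  private static final ConfigDef CONFIG_DEF = new ConfigDef()
      .define(PREFIX + ".endpoint", ConfigDef.Type.STRING,
          ConfigDef.Importance.HIGH, "Example endpoint setting.");

  public ExampleConnectorConfig(Map<String, ?> properties) {
    super(PREFIX, CONFIG_DEF, properties);
  }

  @Override
  public String connectorPrefix() {
    return connectorPrefix; // field supplied by BaseConfig
  }
}
```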
diff --git a/java-connectors/kafka-connect-common/src/main/java/io/lenses/streamreactor/common/exception/ConnectorStartupException.java b/java-connectors/kafka-connect-common/src/main/java/io/lenses/streamreactor/common/exception/ConnectorStartupException.java
new file mode 100644
index 000000000..01488d01e
--- /dev/null
+++ b/java-connectors/kafka-connect-common/src/main/java/io/lenses/streamreactor/common/exception/ConnectorStartupException.java
@@ -0,0 +1,26 @@
+/*
+ * Copyright 2017-2024 Lenses.io Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.lenses.streamreactor.common.exception;
+
+/**
+ * Exception indicating that something went wrong during the Connector's startup.
+ */
+public class ConnectorStartupException extends RuntimeException {
+
+  public ConnectorStartupException(Throwable cause) {
+    super(cause);
+  }
+}
diff --git a/java-connectors/kafka-connect-common/src/main/java/io/lenses/streamreactor/common/exception/InputStreamExtractionException.java b/java-connectors/kafka-connect-common/src/main/java/io/lenses/streamreactor/common/exception/InputStreamExtractionException.java
new file mode 100644
index 000000000..a3cf1ee4c
--- /dev/null
+++ b/java-connectors/kafka-connect-common/src/main/java/io/lenses/streamreactor/common/exception/InputStreamExtractionException.java
@@ -0,0 +1,26 @@
+/*
+ * Copyright 2017-2024 Lenses.io Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.lenses.streamreactor.common.exception;
+
+/**
+ * Exception indicating that something went wrong while reading from an InputStream.
+ */
+public class InputStreamExtractionException extends RuntimeException {
+
+  public InputStreamExtractionException(Throwable cause) {
+    super(cause);
+  }
+}
diff --git a/java-connectors/kafka-connect-common/src/main/java/io/lenses/streamreactor/common/util/AsciiArtPrinter.java b/java-connectors/kafka-connect-common/src/main/java/io/lenses/streamreactor/common/util/AsciiArtPrinter.java
new file mode 100644
index 000000000..854da5f20
--- /dev/null
+++ b/java-connectors/kafka-connect-common/src/main/java/io/lenses/streamreactor/common/util/AsciiArtPrinter.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2017-2024 Lenses.io Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.lenses.streamreactor.common.util;
+
+import static java.util.Optional.ofNullable;
+
+import io.lenses.streamreactor.common.exception.InputStreamExtractionException;
+import java.io.InputStream;
+import java.util.Optional;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * Class used to print Lenses ASCII art.
+ */
+@Slf4j
+public class AsciiArtPrinter {
+
+  /**
+   * Fetches the ASCII art and logs it. If the ASCII art cannot be displayed, it logs a
+   * warning with the cause.
+   *
+   * @param jarManifest JarManifest of the Connector
+   * @param asciiArtResource path to the ASCII art resource
+   */
+  public static void printAsciiHeader(JarManifest jarManifest, String asciiArtResource) {
+    try {
+      Optional<InputStream> asciiArtStream = ofNullable(
+          AsciiArtPrinter.class.getResourceAsStream(asciiArtResource));
+      asciiArtStream.ifPresent(inputStream -> log.info(InputStreamHandler.extractString(inputStream)));
+    } catch (InputStreamExtractionException exception) {
+      log.warn("Unable to display ASCII art from input stream.", exception);
+    }
+    log.info(jarManifest.buildManifestString());
+  }
+
+}
diff --git a/java-connectors/kafka-connect-common/src/main/java/io/lenses/streamreactor/common/util/InputStreamHandler.java b/java-connectors/kafka-connect-common/src/main/java/io/lenses/streamreactor/common/util/InputStreamHandler.java
new file mode 100644
index 000000000..453fed129
--- /dev/null
+++ b/java-connectors/kafka-connect-common/src/main/java/io/lenses/streamreactor/common/util/InputStreamHandler.java
@@ -0,0 +1,54 @@
+/*
+ * Copyright 2017-2024 Lenses.io Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.lenses.streamreactor.common.util;
+
+import io.lenses.streamreactor.common.exception.InputStreamExtractionException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.Reader;
+import java.nio.charset.StandardCharsets;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * Helper class to allow easy manipulation of InputStreams.
+ */
+@Slf4j
+public class InputStreamHandler {
+
+  /**
+   * Extracts a String from an InputStream. The method reads through a char buffer of
+   * length 1024, which keeps reads efficient without claiming too much JVM memory.
+   *
+   * @param inputStream {@link InputStream} to read
+   * @return String representation of inputStream
+   * @throws InputStreamExtractionException with cause
+   */
+  public static String extractString(InputStream inputStream) {
+    int bufferSize = 1024;
+    char[] buffer = new char[bufferSize];
+    StringBuilder out = new StringBuilder();
+    try (Reader in = new InputStreamReader(inputStream, StandardCharsets.UTF_8)) {
+      for (int numRead; (numRead = in.read(buffer, 0, buffer.length)) > 0; ) {
+        out.append(buffer, 0, numRead);
+      }
+    } catch (IOException ioException) {
+      throw new InputStreamExtractionException(ioException);
+    }
+    return out.toString();
+  }
+
+}
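Callers only need to guard against a missing resource; extractString closes the stream itself via the try-with-resources Reader. A small usage sketch with a made-up resource name:

```java
// "/some-banner.txt" is a made-up resource name.
InputStream banner = AsciiArtPrinter.class.getResourceAsStream("/some-banner.txt");
if (banner != null) {
  // Throws the unchecked InputStreamExtractionException on I/O failure;
  // the stream itself is closed inside extractString.
  String text = InputStreamHandler.extractString(banner);
  System.out.println(text);
}
```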
diff --git a/java-connectors/kafka-connect-common/src/main/java/io/lenses/streamreactor/common/util/JarManifest.java b/java-connectors/kafka-connect-common/src/main/java/io/lenses/streamreactor/common/util/JarManifest.java
new file mode 100644
index 000000000..0306b5389
--- /dev/null
+++ b/java-connectors/kafka-connect-common/src/main/java/io/lenses/streamreactor/common/util/JarManifest.java
@@ -0,0 +1,142 @@
+/*
+ * Copyright 2017-2024 Lenses.io Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.lenses.streamreactor.common.util;
+
+import static io.lenses.streamreactor.common.util.JarManifest.ManifestAttributes.GIT_HASH;
+import static io.lenses.streamreactor.common.util.JarManifest.ManifestAttributes.GIT_REPO;
+import static io.lenses.streamreactor.common.util.JarManifest.ManifestAttributes.GIT_TAG;
+import static io.lenses.streamreactor.common.util.JarManifest.ManifestAttributes.KAFKA_VER;
+import static io.lenses.streamreactor.common.util.JarManifest.ManifestAttributes.REACTOR_DOCS;
+import static io.lenses.streamreactor.common.util.JarManifest.ManifestAttributes.REACTOR_VER;
+import static java.util.Optional.of;
+import static java.util.Optional.ofNullable;
+
+import io.lenses.streamreactor.common.exception.ConnectorStartupException;
+import java.io.File;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.jar.Attributes;
+import java.util.jar.JarFile;
+import java.util.jar.Manifest;
+
+/**
+ * Class that reads JAR Manifest files so we can easily get some of the properties from them.
+ */
+public class JarManifest {
+
+  private static final String UNKNOWN = "unknown";
+  private static final String NEW_LINE = System.getProperty("line.separator");
+  private static final String COLON = ":";
+  private final Map<String, String> jarAttributes = new HashMap<>();
+
+  /**
+   * Creates JarManifest.
+   * @param location Jar file location
+   */
+  public JarManifest(URL location) {
+    Manifest manifest;
+
+    try (JarFile jarFile = new JarFile(new File(location.toURI()))) {
+      manifest = jarFile.getManifest();
+    } catch (URISyntaxException | IOException e) {
+      throw new ConnectorStartupException(e);
+    }
+    extractMainAttributes(manifest.getMainAttributes());
+  }
+
+  /**
+   * Creates JarManifest.
+   * @param jarFile the connector's {@link JarFile}
+   */
+  public JarManifest(JarFile jarFile) {
+    Manifest manifest;
+    try {
+      Optional<JarFile> jarFileOptional = of(jarFile);
+      manifest = jarFileOptional.get().getManifest();
+    } catch (NullPointerException | IOException e) {
+      throw new ConnectorStartupException(e);
+    }
+    extractMainAttributes(manifest.getMainAttributes());
+  }
+
+  private void extractMainAttributes(Attributes mainAttributes) {
+    jarAttributes.put(REACTOR_VER.getAttributeName(),
+        ofNullable(mainAttributes.getValue(REACTOR_VER.getAttributeName())).orElse(UNKNOWN));
+    jarAttributes.put(KAFKA_VER.getAttributeName(),
+        ofNullable(mainAttributes.getValue(KAFKA_VER.getAttributeName())).orElse(UNKNOWN));
+    jarAttributes.put(GIT_REPO.getAttributeName(),
+        ofNullable(mainAttributes.getValue(GIT_REPO.getAttributeName())).orElse(UNKNOWN));
+    jarAttributes.put(GIT_HASH.getAttributeName(),
+        ofNullable(mainAttributes.getValue(GIT_HASH.getAttributeName())).orElse(UNKNOWN));
+    jarAttributes.put(GIT_TAG.getAttributeName(),
+        ofNullable(mainAttributes.getValue(GIT_TAG.getAttributeName())).orElse(UNKNOWN));
+    jarAttributes.put(REACTOR_DOCS.getAttributeName(),
+        ofNullable(mainAttributes.getValue(REACTOR_DOCS.getAttributeName())).orElse(UNKNOWN));
+  }
+
+  /**
+   * Get StreamReactor version.
+   */
+  public String getVersion() {
+    return jarAttributes.getOrDefault(REACTOR_VER.getAttributeName(), "");
+  }
+
+  /**
+   * Get all manifest file attributes in a {@link String} form.
+   */
+  public String buildManifestString() {
+    StringBuilder manifestBuilder = new StringBuilder();
+    manifestBuilder.append(REACTOR_VER.getAttributeName()).append(COLON)
+        .append(jarAttributes.get(REACTOR_VER.getAttributeName())).append(NEW_LINE);
+    manifestBuilder.append(KAFKA_VER.getAttributeName()).append(COLON)
+        .append(jarAttributes.get(KAFKA_VER.getAttributeName())).append(NEW_LINE);
+    manifestBuilder.append(GIT_REPO.getAttributeName()).append(COLON)
+        .append(jarAttributes.get(GIT_REPO.getAttributeName())).append(NEW_LINE);
+    manifestBuilder.append(GIT_HASH.getAttributeName()).append(COLON)
+        .append(jarAttributes.get(GIT_HASH.getAttributeName())).append(NEW_LINE);
+    manifestBuilder.append(GIT_TAG.getAttributeName()).append(COLON)
+        .append(jarAttributes.get(GIT_TAG.getAttributeName())).append(NEW_LINE);
+    manifestBuilder.append(REACTOR_DOCS.getAttributeName()).append(COLON)
+        .append(jarAttributes.get(REACTOR_DOCS.getAttributeName())).append(NEW_LINE);
+    return manifestBuilder.toString();
+  }
+
+  /**
+   * Enum that represents StreamReactor's important parameters from the Manifest file.
+   */
+  public enum ManifestAttributes {
+    REACTOR_VER("StreamReactor-Version"),
+    KAFKA_VER("Kafka-Version"),
+    GIT_REPO("Git-Repo"),
+    GIT_HASH("Git-Commit-Hash"),
+    GIT_TAG("Git-Tag"),
+    REACTOR_DOCS("StreamReactor-Docs");
+
+    private final String attributeName;
+
+    ManifestAttributes(String attributeName) {
+      this.attributeName = attributeName;
+    }
+
+    public String getAttributeName() {
+      return attributeName;
+    }
+  }
+}
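Typical construction uses the URL overload with the connector's own code-source location; the connector class and the Slf4j logger below are placeholders, not names from this PR:

```java
// ExampleConnector and the logger are placeholders for any connector class
// packaged inside the JAR whose manifest should be read.
JarManifest jarManifest = new JarManifest(
    ExampleConnector.class.getProtectionDomain().getCodeSource().getLocation());
log.info("Stream Reactor version: {}", jarManifest.getVersion());
log.info(jarManifest.buildManifestString());
```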
diff --git a/java-connectors/kafka-connect-common/src/main/resources/logback.xml b/java-connectors/kafka-connect-common/src/main/resources/logback.xml
new file mode 100644
index 000000000..920828a04
--- /dev/null
+++ b/java-connectors/kafka-connect-common/src/main/resources/logback.xml
@@ -0,0 +1,27 @@
+<!--
+  ~ Copyright 2017-2024 Lenses.io Ltd
+  ~
+  ~ Licensed under the Apache License, Version 2.0 (the "License");
+  ~ you may not use this file except in compliance with the License.
+  ~ You may obtain a copy of the License at
+  ~
+  ~ http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<configuration>
+
+    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+        <encoder>
+            <pattern>%d{ISO8601} %-5p [%t] [%c] [%M:%L] %m%n</pattern>
+        </encoder>
+    </appender>
+
+    <root level="INFO">
+        <appender-ref ref="STDOUT"/>
+    </root>
+</configuration>
\ No newline at end of file
diff --git a/java-connectors/kafka-connect-common/src/test/java/io/lenses/streamreactor/common/util/AsciiArtPrinterTest.java b/java-connectors/kafka-connect-common/src/test/java/io/lenses/streamreactor/common/util/AsciiArtPrinterTest.java
new file mode 100644
index 000000000..188d6ab13
--- /dev/null
+++ b/java-connectors/kafka-connect-common/src/test/java/io/lenses/streamreactor/common/util/AsciiArtPrinterTest.java
@@ -0,0 +1,79 @@
+/*
+ * Copyright 2017-2024 Lenses.io Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.lenses.streamreactor.common.util;
+
+import static org.junit.jupiter.api.Assertions.*;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import ch.qos.logback.classic.spi.ILoggingEvent;
+import ch.qos.logback.core.read.ListAppender;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.LoggerFactory;
+
+class AsciiArtPrinterTest {
+
+  private static final String SOME_MANIFEST_DATA = "SOME_MANIFEST_DATA";
+  private ListAppender<ILoggingEvent> logWatcher;
+
+  @BeforeEach
+  void setUp() {
+    logWatcher = new ListAppender<>();
+    logWatcher.start();
+    ((ch.qos.logback.classic.Logger) LoggerFactory.getLogger(AsciiArtPrinter.class)).addAppender(logWatcher);
+  }
+
+  @AfterEach
+  void teardown() {
+    ((ch.qos.logback.classic.Logger) LoggerFactory.getLogger(AsciiArtPrinter.class)).detachAndStopAllAppenders();
+  }
+
+  @Test
+  void printAsciiHeaderShouldPrintAsciiAndJarManifestIfBothValid() {
+    //given
+    JarManifest jarManifest = mock(JarManifest.class);
+    when(jarManifest.buildManifestString()).thenReturn(SOME_MANIFEST_DATA);
+    String testingAsciiArt = "/testingAsciiArt.txt";
+
+    //when
+    AsciiArtPrinter.printAsciiHeader(jarManifest, testingAsciiArt);
+
+    //then
+    verify(jarManifest).buildManifestString();
+    assertEquals(2, logWatcher.list.size());
+    assertTrue(logWatcher.list.get(0).getFormattedMessage().startsWith("###########"));
+    assertTrue(logWatcher.list.get(1).getFormattedMessage().equals(SOME_MANIFEST_DATA));
+  }
+
+  @Test
+  void printAsciiHeaderShouldPrintOnlyJarManifestIfCouldntMakeInputStreamToAscii() {
+    //given
+    JarManifest jarManifest = mock(JarManifest.class);
+    when(jarManifest.buildManifestString()).thenReturn(SOME_MANIFEST_DATA);
+    String testingAsciiArt = "/testingAsciiArta.txt";
+
+    //when
+    AsciiArtPrinter.printAsciiHeader(jarManifest, testingAsciiArt);
+
+    //then
+    verify(jarManifest).buildManifestString();
+    assertEquals(1, logWatcher.list.size());
+    assertTrue(logWatcher.list.get(0).getFormattedMessage().equals(SOME_MANIFEST_DATA));
+  }
+}
\ No newline at end of file
diff --git a/java-connectors/kafka-connect-common/src/test/java/io/lenses/streamreactor/common/util/JarManifestTest.java b/java-connectors/kafka-connect-common/src/test/java/io/lenses/streamreactor/common/util/JarManifestTest.java
new file mode 100644
index 000000000..a1d7188bf
--- /dev/null
+++ b/java-connectors/kafka-connect-common/src/test/java/io/lenses/streamreactor/common/util/JarManifestTest.java
@@ -0,0 +1,82 @@
+/*
+ * Copyright 2017-2024 Lenses.io Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.lenses.streamreactor.common.util;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import io.lenses.streamreactor.common.util.JarManifest.ManifestAttributes;
+import java.io.IOException;
+import java.util.jar.Attributes;
+import java.util.jar.JarFile;
+import java.util.jar.Manifest;
+import org.junit.jupiter.api.Test;
+
+class JarManifestTest {
+
+  private static final String UNKNOWN = "unknown";
+
+  JarManifest testObj;
+
+  @Test
+  void getVersionShouldReturnStreamReactorVersionIfIncludedInManifest() throws IOException {
+    // given
+    final String STREAM_REACTOR_VERSION = "1.2.3";
+    JarFile jarFile = mock(JarFile.class);
+    Manifest manifest = mock(Manifest.class);
+    Attributes attributes = mock(Attributes.class);
+
+    when(jarFile.getManifest()).thenReturn(manifest);
+    when(manifest.getMainAttributes()).thenReturn(attributes);
+    when(attributes.getValue(ManifestAttributes.REACTOR_VER.getAttributeName())).thenReturn(STREAM_REACTOR_VERSION);
+
+    testObj = new JarManifest(jarFile);
+
+    // when
+    String streamReactorVersion = testObj.getVersion();
+
+    //then
+    verify(jarFile).getManifest();
+    verify(manifest).getMainAttributes();
+    verify(attributes).getValue(ManifestAttributes.REACTOR_VER.getAttributeName());
+    assertEquals(STREAM_REACTOR_VERSION, streamReactorVersion);
+  }
+
+  @Test
+  void getVersionShouldReturnUnknownVersionIfNotIncludedInManifest() throws IOException {
+    // given
+    JarFile jarFile = mock(JarFile.class);
+    Manifest manifest = mock(Manifest.class);
+    Attributes attributes = mock(Attributes.class);
+
+    when(jarFile.getManifest()).thenReturn(manifest);
+    when(manifest.getMainAttributes()).thenReturn(attributes);
+    when(attributes.getValue(ManifestAttributes.REACTOR_VER.getAttributeName())).thenReturn(null);
+
+    testObj = new JarManifest(jarFile);
+
+    // when
+    String streamReactorVersion = testObj.getVersion();
+
+    //then
+    verify(jarFile).getManifest();
+    verify(manifest).getMainAttributes();
+    verify(attributes).getValue(ManifestAttributes.REACTOR_VER.getAttributeName());
+    assertEquals(UNKNOWN, streamReactorVersion);
+  }
+}
\ No newline at end of file
diff --git a/java-connectors/kafka-connect-common/src/test/resources/testingAsciiArt.txt b/java-connectors/kafka-connect-common/src/test/resources/testingAsciiArt.txt
new file mode 100644
index 000000000..9c9544df5
--- /dev/null
+++ b/java-connectors/kafka-connect-common/src/test/resources/testingAsciiArt.txt
@@ -0,0 +1,5 @@
+###########
+##TESTING##
+###ASCII###
+####ART####
+###########
\ No newline at end of file
diff --git a/java-connectors/kafka-connect-query-language/.gitignore b/java-connectors/kafka-connect-query-language/.gitignore
deleted file mode 100644
index 66712446c..000000000
--- a/java-connectors/kafka-connect-query-language/.gitignore
+++ /dev/null
@@ -1,28 +0,0 @@
-/docs/build
-/*.iml
-/*/sample.db
-/.idea
-/.gradle
-/build
-/configs
-/*/build
-*.class
-*.log
-temp
-
-# sbt specific
-.cache
-.history
-.lib/
-dist/*
-target/
-lib_managed/
-src_managed/
-project/boot/
-project/plugins/project/
-
-# Scala-IDE specific
-.scala_dependencies
-.worksheet
-src/main/java/com/datamountaineer/connector/config/antlr4/
-/src/main/java/com/datamountaineer/kcql/antlr4/
diff --git a/java-connectors/kafka-connect-query-language/build.gradle b/java-connectors/kafka-connect-query-language/build.gradle
new file mode 100644
index 000000000..780a8e5ae
--- /dev/null
+++ b/java-connectors/kafka-connect-query-language/build.gradle
@@ -0,0 +1,57 @@
+project(":kafka-connect-query-language") {
+    apply plugin: 'antlr'
+
+    test {
+        maxParallelForks = 1
+    }
+
+    ext {
+        antlrVersion = "4.13.1"
+    }
+
+    repositories {
+        mavenLocal()
+        mavenCentral()
+    }
+
+    dependencies {
+        antlr group: 'org.antlr', name: 'antlr4', version: antlrVersion
+        implementation group: 'org.antlr', name: 'antlr4-runtime', version: antlrVersion
+    }
+
+    generateGrammarSource {
+        arguments += ["-package", "io.lenses.kcql.antlr4"]
+        outputDirectory = new File("${project.projectDir}/build/generated/antlr/main/java/io/lenses/kcql/antlr4")
+    }
+
+    sourceSets {
+        main.java.srcDirs += 'build/generated/antlr/main/java'
+        main.antlr.srcDirs = ['src/main/antlr4']
+    }
+
+    task sourcesJar(type: Jar) {
+        archiveClassifier = 'sources'
+        from sourceSets.main.allSource
+    }
+
+    task javadocJar(type: Jar) {
+        archiveClassifier = 'javadoc'
+        from javadoc
+    }
+
+    tasks.withType(Tar) {
+        compression Compression.GZIP
+        extension = 'tgz'
+    }
+
+
+    artifacts {
+        archives javadocJar, sourcesJar
+    }
+
+    //order should be: generateGrammarSource -> updateLicenseMain -> compileJava
+    checkLicenseMain.dependsOn("generateGrammarSource")
+
+    task compile(dependsOn: 'compileJava')
+    task fatJarNoTest(dependsOn: 'shadowJar')
+}
\ No newline at end of file
diff --git a/java-connectors/kafka-connect-query-language/checkstyle/checkstyle.xml b/java-connectors/kafka-connect-query-language/checkstyle/checkstyle.xml
deleted file mode 100644
index a46727da8..000000000
--- a/java-connectors/kafka-connect-query-language/checkstyle/checkstyle.xml
+++ /dev/null
@@ -1,86 +0,0 @@
diff --git a/java-connectors/settings.gradle b/java-connectors/settings.gradle
new file mode 100644
index 000000000..842b4f643
--- /dev/null
+++ b/java-connectors/settings.gradle
@@ -0,0 +1,5 @@
+rootProject.name = 'java-reactor'
+include 'kafka-connect-common',
+'kafka-connect-azure-eventhubs',
+'kafka-connect-query-language'