diff --git a/.dockerignore b/.dockerignore index c182d1d56..f88740ad4 100644 --- a/.dockerignore +++ b/.dockerignore @@ -13,3 +13,12 @@ ui/.next .gitignore .github .gitmodules + + +build +*.jar +.idea +.gradle +.dockerignore +.sdkmanrc + diff --git a/.github/actions/genprotos/action.yml b/.github/actions/genprotos/action.yml index abedf058b..142856dc3 100644 --- a/.github/actions/genprotos/action.yml +++ b/.github/actions/genprotos/action.yml @@ -12,7 +12,7 @@ runs: ./flow/generated/protos ./nexus/pt/src/gen ./ui/grpc_generated - key: ${{ runner.os }}-build-genprotos-${{ hashFiles('./protos/peers.proto', './protos/flow.proto', './protos/route.proto') }} + key: ${{ runner.os }}-build-genprotos-${{ hashFiles('./protos/peers.proto', './protos/flow.proto', './protos/route.proto', './protos/flow-jvm.proto') }} - if: steps.cache.outputs.cache-hit != 'true' uses: actions/setup-go@v5 diff --git a/.github/workflows/flow-jvm-build.yml b/.github/workflows/flow-jvm-build.yml new file mode 100644 index 000000000..de6175baf --- /dev/null +++ b/.github/workflows/flow-jvm-build.yml @@ -0,0 +1,38 @@ +name: Build & Test Flow JVM + +on: + push: + branches: [main] + pull_request: + branches: [main] + paths: [flow-jvm/**, protos/**, .sdkmanrc] + +jobs: + build-test: + name: Build & Test Flow JVM + strategy: + matrix: + runner: [ubuntu-latest] + runs-on: ${{ matrix.runner }} + steps: + - name: checkout + uses: actions/checkout@v4 + + - uses: actions/setup-java@v4 + with: + distribution: 'temurin' + java-version: '21' + cache: 'gradle' + + - name: Check dependency versions + run: | + java -version + gradle -version + working-directory: flow-jvm + - name: Download/Setup gradle wrapper + working-directory: flow-jvm + run: | + gradle wrapper + - name: Build + working-directory: flow-jvm + run: ./gradlew quarkusGenerateCode --stacktrace --info && ./gradlew build test --stacktrace --info diff --git a/.sdkmanrc b/.sdkmanrc new file mode 100644 index 000000000..64fb7e6bd --- /dev/null +++ b/.sdkmanrc @@ -0,0 +1,5 @@ +# Enable auto-env through the sdkman_auto_env config +# Add key=value pairs of SDKs to use below +java=21.0.3-tem +quarkus=3.11.3 +gradle=8.6 diff --git a/docker-bake.hcl b/docker-bake.hcl index 6e6098ca1..538e652c8 100644 --- a/docker-bake.hcl +++ b/docker-bake.hcl @@ -16,7 +16,8 @@ group "default" { "flow-worker", "flow-api", "flow-snapshot-worker", - "peerdb-ui" + "peerdb-ui", + "flow-jvm", ] } @@ -90,3 +91,17 @@ target "peerdb-ui" { "${REGISTRY}/peerdb-ui:${SHA_SHORT}", ] } + +target "flow-jvm" { + context = "." 
+ dockerfile = "stacks/jvm.Dockerfile" + target = "runner" + platforms = [ + "linux/amd64", + "linux/arm64", + ] + tags = [ + "${REGISTRY}/flow-jvm:${TAG}", + "${REGISTRY}/flow-jvm:${SHA_SHORT}", + ] +} diff --git a/flow-jvm/.dockerignore b/flow-jvm/.dockerignore new file mode 100644 index 000000000..94b4e1666 --- /dev/null +++ b/flow-jvm/.dockerignore @@ -0,0 +1,13 @@ +#* +#!build/*-runner +#!build/*-runner.jar +#!build/lib/* +#!build/quarkus-app/* +build +*.jar +.idea +.gradle +src/main/docker +.dockerignore +.git +.sdkmanrc diff --git a/flow-jvm/.editorconfig b/flow-jvm/.editorconfig new file mode 100644 index 000000000..af4c496ae --- /dev/null +++ b/flow-jvm/.editorconfig @@ -0,0 +1,3 @@ +[*.java] +ij_java_names_count_to_use_import_on_demand = 999 +ij_java_class_count_to_use_import_on_demand = 999 diff --git a/flow-jvm/.gitattributes b/flow-jvm/.gitattributes new file mode 100644 index 000000000..4951bacfb --- /dev/null +++ b/flow-jvm/.gitattributes @@ -0,0 +1,3 @@ +gradlew linguist-generated=true +gradlew.bat linguist-generated=true + diff --git a/flow-jvm/.gitignore b/flow-jvm/.gitignore new file mode 100644 index 000000000..1c48a8073 --- /dev/null +++ b/flow-jvm/.gitignore @@ -0,0 +1,41 @@ +# Gradle +.gradle/ +build/ + +# Eclipse +.project +.classpath +.settings/ +bin/ + +# IntelliJ +.idea +*.ipr +*.iml +*.iws + +# NetBeans +nb-configuration.xml + +# Visual Studio Code +.vscode +.factorypath + +# OSX +.DS_Store + +# Vim +*.swp +*.swo + +# patch +*.orig +*.rej + +# Local environment +.env + +# Plugin directory +/.quarkus/cli/plugins/ + +*.jar diff --git a/flow-jvm/.sdkmanrc b/flow-jvm/.sdkmanrc new file mode 120000 index 000000000..f0da53ae9 --- /dev/null +++ b/flow-jvm/.sdkmanrc @@ -0,0 +1 @@ +../.sdkmanrc \ No newline at end of file diff --git a/flow-jvm/README.md b/flow-jvm/README.md new file mode 100644 index 000000000..44b67d48d --- /dev/null +++ b/flow-jvm/README.md @@ -0,0 +1,73 @@ +# flow-jvm + + +## Dependencies +Install sdkman and run `sdk env install` to setup the environment and run `gradle wrapper` to install gradle wrapper. + + + +This project uses Quarkus, the Supersonic Subatomic Java Framework. + +If you want to learn more about Quarkus, please visit its website: https://quarkus.io/ . + + +## Generate the java code from proto files +```shell script +./gradlew quarkusGenerateCode +``` + +or +```shell +./gradlew clean quarkusGenerateCode +``` + +## Running the application in dev mode (`quarkusGenerateCode` can be skipped) + +You can run your application in dev mode that enables live coding using: +```shell script +./gradlew quarkusDev +``` + +> **_NOTE:_** Quarkus now ships with a Dev UI, which is available in dev mode only at http://localhost:9801/q/dev/. + +## Packaging and running the application + +The application can be packaged using: +```shell script +./gradlew build +``` +It produces the `quarkus-run.jar` file in the `build/quarkus-app/` directory. +Be aware that it’s not an _über-jar_ as the dependencies are copied into the `build/quarkus-app/lib/` directory. + +The application is now runnable using `java -jar build/quarkus-app/quarkus-run.jar`. + +If you want to build an _über-jar_, execute the following command: +```shell script +./gradlew build -Dquarkus.package.jar.type=uber-jar +``` + +The application, packaged as an _über-jar_, is now runnable using `java -jar build/*-runner.jar`. 
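+
+Since this module is also wired into the repository-level `docker-bake.hcl` (the `flow-jvm` target, which builds `stacks/jvm.Dockerfile`), the container image can be built from the repository root. This is a minimal sketch and assumes Docker Buildx is available; it is illustrative rather than part of the required build steps:
+```shell script
+# from the repository root; builds the flow-jvm image defined in docker-bake.hcl
+docker buildx bake flow-jvm
+```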
+ +## Creating a native executable + +You can create a native executable using: +```shell script +./gradlew build -Dquarkus.native.enabled=true +``` + +Or, if you don't have GraalVM installed, you can run the native executable build in a container using: +```shell script +./gradlew build -Dquarkus.native.enabled=true -Dquarkus.native.container-build=true +``` + +You can then execute your native executable with: `./build/flow-jvm-0.0.1-SNAPSHOT-runner` + +If you want to learn more about building native executables, please consult https://quarkus.io/guides/gradle-tooling. + +## Provided Code + +### REST + +Easily start your REST Web Services + +[Related guide section...](https://quarkus.io/guides/getting-started-reactive#reactive-jax-rs-resources) diff --git a/flow-jvm/build.gradle b/flow-jvm/build.gradle new file mode 100644 index 000000000..ae782c656 --- /dev/null +++ b/flow-jvm/build.gradle @@ -0,0 +1,112 @@ +plugins { + id 'java' + id 'io.quarkus' +} + +repositories { + mavenCentral() + mavenLocal() +} + +ext { + icebergLibVersion = '1.5.2' +} + + +dependencies { + implementation 'io.quarkus:quarkus-config-yaml' + implementation enforcedPlatform("${quarkusPlatformGroupId}:${quarkusPlatformArtifactId}:${quarkusPlatformVersion}") + implementation 'io.quarkus:quarkus-arc' + implementation 'io.quarkus:quarkus-grpc' + implementation 'io.quarkus:quarkus-rest' + implementation 'io.quarkus:quarkus-logging-json' + + // Logging adapter for dependencies, also prevents duplicate slf4j binding warnings https://quarkus.io/guides/logging#logging-apis + implementation("org.jboss.logging:commons-logging-jboss-logging") + implementation("org.jboss.logmanager:log4j-jboss-logmanager") + implementation("org.jboss.logmanager:log4j2-jboss-logmanager") + implementation("org.jboss.slf4j:slf4j-jboss-logmanager") + + implementation "org.apache.iceberg:iceberg-core:${icebergLibVersion}" + implementation "org.apache.iceberg:iceberg-common:${icebergLibVersion}" + implementation "org.apache.iceberg:iceberg-data:${icebergLibVersion}" + implementation "org.apache.iceberg:iceberg-parquet:${icebergLibVersion}" + + // This is forced due to version conflicts + implementation 'io.grpc:grpc-protobuf:1.63.0' + implementation 'com.google.protobuf:protobuf-java:4.27.1' + + implementation 'org.apache.avro:avro:1.11.3' + + implementation 'org.apache.hadoop:hadoop-client:3.4.0' + implementation 'org.apache.hadoop:hadoop-common:3.4.0' + + // Drivers for JDBC Catalogs + runtimeOnly 'org.postgresql:postgresql:42.7.3' + + // AWS Dependencies + implementation "org.apache.iceberg:iceberg-aws:${icebergLibVersion}" + runtimeOnly "org.apache.iceberg:iceberg-aws-bundle:${icebergLibVersion}" +// runtimeOnly 'software.amazon.awssdk:bundle:2.25.60' + +// // HIVE4 Dependencies +// implementation 'org.apache.hive:hive-iceberg-catalog:4.0.0' +// // DO NOT USE THE BELOW DEPENDENCIES https://github.com/apache/iceberg/issues/10429 +//// implementation "org.apache.iceberg:iceberg-hive-metastore:${icebergLibVersion}" +//// runtimeOnly "org.apache.hive:hive-metastore:4.0.0" + + // HIVE 3 Dependencies + implementation "org.apache.iceberg:iceberg-hive-metastore:${icebergLibVersion}" + runtimeOnly "org.apache.hive:hive-metastore:3.1.3" + + + + + // GCP Dependencies + implementation "org.apache.iceberg:iceberg-gcp:${icebergLibVersion}" + // This is currently causing issues with GRPC versions mismatch + runtimeOnly "org.apache.iceberg:iceberg-gcp-bundle:${icebergLibVersion}" + + + testImplementation 'io.quarkus:quarkus-junit5' + testImplementation 
'io.rest-assured:rest-assured' + +} + + +configurations.configureEach { +// // ch.qos.logback:logback-core +// exclude group: 'ch.qos.logback', module: 'logback-core' + // ch.qos.logback:logback-classic + exclude group: 'ch.qos.logback', module: 'logback-classic' + // org.apache.logging.log4j:log4j-slf4j-impl + exclude group: 'org.apache.logging.log4j', module: 'log4j-slf4j-impl' + // org.slf4j:slf4j-reload4j + exclude group: 'org.slf4j', module: 'slf4j-reload4j' +} + + +group 'io.peerdb' +version '0.0.1-SNAPSHOT' + +java { + sourceCompatibility = JavaVersion.VERSION_21 + targetCompatibility = JavaVersion.VERSION_21 +} + +test { + systemProperty "java.util.logging.manager", "org.jboss.logmanager.LogManager" +} +compileJava { + options.encoding = 'UTF-8' + options.compilerArgs << '-parameters' +} + +compileTestJava { + options.encoding = 'UTF-8' +} + + +quarkus { + quarkusBuildProperties.put("quarkus.grpc.codegen.proto-directory", "${project.projectDir}/../protos") +} diff --git a/flow-jvm/gradle.properties b/flow-jvm/gradle.properties new file mode 100644 index 000000000..0225dfbc1 --- /dev/null +++ b/flow-jvm/gradle.properties @@ -0,0 +1,7 @@ +#Gradle properties +#Fri May 24 04:59:17 IST 2024 +quarkusPlatformArtifactId=quarkus-bom +quarkusPlatformGroupId=io.quarkus.platform +quarkusPlatformVersion=3.11.3 +quarkusPluginId=io.quarkus +quarkusPluginVersion=3.11.3 diff --git a/flow-jvm/gradle/wrapper/gradle-wrapper.properties b/flow-jvm/gradle/wrapper/gradle-wrapper.properties new file mode 100644 index 000000000..a80b22ce5 --- /dev/null +++ b/flow-jvm/gradle/wrapper/gradle-wrapper.properties @@ -0,0 +1,7 @@ +distributionBase=GRADLE_USER_HOME +distributionPath=wrapper/dists +distributionUrl=https\://services.gradle.org/distributions/gradle-8.6-bin.zip +networkTimeout=10000 +validateDistributionUrl=true +zipStoreBase=GRADLE_USER_HOME +zipStorePath=wrapper/dists diff --git a/flow-jvm/gradlew b/flow-jvm/gradlew new file mode 100755 index 000000000..1aa94a426 --- /dev/null +++ b/flow-jvm/gradlew @@ -0,0 +1,249 @@ +#!/bin/sh + +# +# Copyright © 2015-2021 the original authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +############################################################################## +# +# Gradle start up script for POSIX generated by Gradle. +# +# Important for running: +# +# (1) You need a POSIX-compliant shell to run this script. If your /bin/sh is +# noncompliant, but you have some other compliant shell such as ksh or +# bash, then to run this script, type that shell name before the whole +# command line, like: +# +# ksh Gradle +# +# Busybox and similar reduced shells will NOT work, because this script +# requires all of these POSIX shell features: +# * functions; +# * expansions «$var», «${var}», «${var:-default}», «${var+SET}», +# «${var#prefix}», «${var%suffix}», and «$( cmd )»; +# * compound commands having a testable exit status, especially «case»; +# * various built-in commands including «command», «set», and «ulimit». 
+# +# Important for patching: +# +# (2) This script targets any POSIX shell, so it avoids extensions provided +# by Bash, Ksh, etc; in particular arrays are avoided. +# +# The "traditional" practice of packing multiple parameters into a +# space-separated string is a well documented source of bugs and security +# problems, so this is (mostly) avoided, by progressively accumulating +# options in "$@", and eventually passing that to Java. +# +# Where the inherited environment variables (DEFAULT_JVM_OPTS, JAVA_OPTS, +# and GRADLE_OPTS) rely on word-splitting, this is performed explicitly; +# see the in-line comments for details. +# +# There are tweaks for specific operating systems such as AIX, CygWin, +# Darwin, MinGW, and NonStop. +# +# (3) This script is generated from the Groovy template +# https://github.com/gradle/gradle/blob/HEAD/subprojects/plugins/src/main/resources/org/gradle/api/internal/plugins/unixStartScript.txt +# within the Gradle project. +# +# You can find Gradle at https://github.com/gradle/gradle/. +# +############################################################################## + +# Attempt to set APP_HOME + +# Resolve links: $0 may be a link +app_path=$0 + +# Need this for daisy-chained symlinks. +while + APP_HOME=${app_path%"${app_path##*/}"} # leaves a trailing /; empty if no leading path + [ -h "$app_path" ] +do + ls=$( ls -ld "$app_path" ) + link=${ls#*' -> '} + case $link in #( + /*) app_path=$link ;; #( + *) app_path=$APP_HOME$link ;; + esac +done + +# This is normally unused +# shellcheck disable=SC2034 +APP_BASE_NAME=${0##*/} +# Discard cd standard output in case $CDPATH is set (https://github.com/gradle/gradle/issues/25036) +APP_HOME=$( cd "${APP_HOME:-./}" > /dev/null && pwd -P ) || exit + +# Use the maximum available, or set MAX_FD != -1 to use that value. +MAX_FD=maximum + +warn () { + echo "$*" +} >&2 + +die () { + echo + echo "$*" + echo + exit 1 +} >&2 + +# OS specific support (must be 'true' or 'false'). +cygwin=false +msys=false +darwin=false +nonstop=false +case "$( uname )" in #( + CYGWIN* ) cygwin=true ;; #( + Darwin* ) darwin=true ;; #( + MSYS* | MINGW* ) msys=true ;; #( + NONSTOP* ) nonstop=true ;; +esac + +CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar + + +# Determine the Java command to use to start the JVM. +if [ -n "$JAVA_HOME" ] ; then + if [ -x "$JAVA_HOME/jre/sh/java" ] ; then + # IBM's JDK on AIX uses strange locations for the executables + JAVACMD=$JAVA_HOME/jre/sh/java + else + JAVACMD=$JAVA_HOME/bin/java + fi + if [ ! -x "$JAVACMD" ] ; then + die "ERROR: JAVA_HOME is set to an invalid directory: $JAVA_HOME + +Please set the JAVA_HOME variable in your environment to match the +location of your Java installation." + fi +else + JAVACMD=java + if ! command -v java >/dev/null 2>&1 + then + die "ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. + +Please set the JAVA_HOME variable in your environment to match the +location of your Java installation." + fi +fi + +# Increase the maximum file descriptors if we can. +if ! "$cygwin" && ! "$darwin" && ! "$nonstop" ; then + case $MAX_FD in #( + max*) + # In POSIX sh, ulimit -H is undefined. That's why the result is checked to see if it worked. + # shellcheck disable=SC2039,SC3045 + MAX_FD=$( ulimit -H -n ) || + warn "Could not query maximum file descriptor limit" + esac + case $MAX_FD in #( + '' | soft) :;; #( + *) + # In POSIX sh, ulimit -n is undefined. That's why the result is checked to see if it worked. 
+ # shellcheck disable=SC2039,SC3045 + ulimit -n "$MAX_FD" || + warn "Could not set maximum file descriptor limit to $MAX_FD" + esac +fi + +# Collect all arguments for the java command, stacking in reverse order: +# * args from the command line +# * the main class name +# * -classpath +# * -D...appname settings +# * --module-path (only if needed) +# * DEFAULT_JVM_OPTS, JAVA_OPTS, and GRADLE_OPTS environment variables. + +# For Cygwin or MSYS, switch paths to Windows format before running java +if "$cygwin" || "$msys" ; then + APP_HOME=$( cygpath --path --mixed "$APP_HOME" ) + CLASSPATH=$( cygpath --path --mixed "$CLASSPATH" ) + + JAVACMD=$( cygpath --unix "$JAVACMD" ) + + # Now convert the arguments - kludge to limit ourselves to /bin/sh + for arg do + if + case $arg in #( + -*) false ;; # don't mess with options #( + /?*) t=${arg#/} t=/${t%%/*} # looks like a POSIX filepath + [ -e "$t" ] ;; #( + *) false ;; + esac + then + arg=$( cygpath --path --ignore --mixed "$arg" ) + fi + # Roll the args list around exactly as many times as the number of + # args, so each arg winds up back in the position where it started, but + # possibly modified. + # + # NB: a `for` loop captures its iteration list before it begins, so + # changing the positional parameters here affects neither the number of + # iterations, nor the values presented in `arg`. + shift # remove old arg + set -- "$@" "$arg" # push replacement arg + done +fi + + +# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. +DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"' + +# Collect all arguments for the java command: +# * DEFAULT_JVM_OPTS, JAVA_OPTS, JAVA_OPTS, and optsEnvironmentVar are not allowed to contain shell fragments, +# and any embedded shellness will be escaped. +# * For example: A user cannot expect ${Hostname} to be expanded, as it is an environment variable and will be +# treated as '${Hostname}' itself on the command line. + +set -- \ + "-Dorg.gradle.appname=$APP_BASE_NAME" \ + -classpath "$CLASSPATH" \ + org.gradle.wrapper.GradleWrapperMain \ + "$@" + +# Stop when "xargs" is not available. +if ! command -v xargs >/dev/null 2>&1 +then + die "xargs is not available" +fi + +# Use "xargs" to parse quoted args. +# +# With -n1 it outputs one arg per line, with the quotes and backslashes removed. +# +# In Bash we could simply go: +# +# readarray ARGS < <( xargs -n1 <<<"$var" ) && +# set -- "${ARGS[@]}" "$@" +# +# but POSIX shell has neither arrays nor command substitution, so instead we +# post-process each arg (as a line of input to sed) to backslash-escape any +# character that might be a shell metacharacter, then use eval to reverse +# that process (while maintaining the separation between arguments), and wrap +# the whole thing up as a single "set" statement. +# +# This will of course break if any of these variables contains a newline or +# an unmatched quote. +# + +eval "set -- $( + printf '%s\n' "$DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS" | + xargs -n1 | + sed ' s~[^-[:alnum:]+,./:=@_]~\\&~g; ' | + tr '\n' ' ' + )" '"$@"' + +exec "$JAVACMD" "$@" diff --git a/flow-jvm/gradlew.bat b/flow-jvm/gradlew.bat new file mode 100644 index 000000000..7101f8e46 --- /dev/null +++ b/flow-jvm/gradlew.bat @@ -0,0 +1,92 @@ +@rem +@rem Copyright 2015 the original author or authors. +@rem +@rem Licensed under the Apache License, Version 2.0 (the "License"); +@rem you may not use this file except in compliance with the License. 
+@rem You may obtain a copy of the License at +@rem +@rem https://www.apache.org/licenses/LICENSE-2.0 +@rem +@rem Unless required by applicable law or agreed to in writing, software +@rem distributed under the License is distributed on an "AS IS" BASIS, +@rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +@rem See the License for the specific language governing permissions and +@rem limitations under the License. +@rem + +@if "%DEBUG%"=="" @echo off +@rem ########################################################################## +@rem +@rem Gradle startup script for Windows +@rem +@rem ########################################################################## + +@rem Set local scope for the variables with windows NT shell +if "%OS%"=="Windows_NT" setlocal + +set DIRNAME=%~dp0 +if "%DIRNAME%"=="" set DIRNAME=. +@rem This is normally unused +set APP_BASE_NAME=%~n0 +set APP_HOME=%DIRNAME% + +@rem Resolve any "." and ".." in APP_HOME to make it shorter. +for %%i in ("%APP_HOME%") do set APP_HOME=%%~fi + +@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. +set DEFAULT_JVM_OPTS="-Xmx64m" "-Xms64m" + +@rem Find java.exe +if defined JAVA_HOME goto findJavaFromJavaHome + +set JAVA_EXE=java.exe +%JAVA_EXE% -version >NUL 2>&1 +if %ERRORLEVEL% equ 0 goto execute + +echo. 1>&2 +echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. 1>&2 +echo. 1>&2 +echo Please set the JAVA_HOME variable in your environment to match the 1>&2 +echo location of your Java installation. 1>&2 + +goto fail + +:findJavaFromJavaHome +set JAVA_HOME=%JAVA_HOME:"=% +set JAVA_EXE=%JAVA_HOME%/bin/java.exe + +if exist "%JAVA_EXE%" goto execute + +echo. 1>&2 +echo ERROR: JAVA_HOME is set to an invalid directory: %JAVA_HOME% 1>&2 +echo. 1>&2 +echo Please set the JAVA_HOME variable in your environment to match the 1>&2 +echo location of your Java installation. 1>&2 + +goto fail + +:execute +@rem Setup the command line + +set CLASSPATH=%APP_HOME%\gradle\wrapper\gradle-wrapper.jar + + +@rem Execute Gradle +"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" org.gradle.wrapper.GradleWrapperMain %* + +:end +@rem End local scope for the variables with windows NT shell +if %ERRORLEVEL% equ 0 goto mainEnd + +:fail +rem Set variable GRADLE_EXIT_CONSOLE if you need the _script_ return code instead of +rem the _cmd.exe /c_ return code! 
+set EXIT_CODE=%ERRORLEVEL% +if %EXIT_CODE% equ 0 set EXIT_CODE=1 +if not ""=="%GRADLE_EXIT_CONSOLE%" exit %EXIT_CODE% +exit /b %EXIT_CODE% + +:mainEnd +if "%OS%"=="Windows_NT" endlocal + +:omega diff --git a/flow-jvm/settings.gradle b/flow-jvm/settings.gradle new file mode 100644 index 000000000..2969c7626 --- /dev/null +++ b/flow-jvm/settings.gradle @@ -0,0 +1,11 @@ +pluginManagement { + repositories { + mavenCentral() + gradlePluginPortal() + mavenLocal() + } + plugins { + id "${quarkusPluginId}" version "${quarkusPluginVersion}" + } +} +rootProject.name='flow-jvm' diff --git a/flow-jvm/src/main/docker/Dockerfile.jvm b/flow-jvm/src/main/docker/Dockerfile.jvm new file mode 100644 index 000000000..37d52c961 --- /dev/null +++ b/flow-jvm/src/main/docker/Dockerfile.jvm @@ -0,0 +1,97 @@ +#### +# This Dockerfile is used in order to build a container that runs the Quarkus application in JVM mode +# +# Before building the container image run: +# +# ./gradlew build +# +# Then, build the image with: +# +# docker build -f src/main/docker/Dockerfile.jvm -t quarkus/flow-jvm-jvm . +# +# Then run the container using: +# +# docker run -i --rm -p 8080:8080 quarkus/flow-jvm-jvm +# +# If you want to include the debug port into your docker image +# you will have to expose the debug port (default 5005 being the default) like this : EXPOSE 8080 5005. +# Additionally you will have to set -e JAVA_DEBUG=true and -e JAVA_DEBUG_PORT=*:5005 +# when running the container +# +# Then run the container using : +# +# docker run -i --rm -p 8080:8080 quarkus/flow-jvm-jvm +# +# This image uses the `run-java.sh` script to run the application. +# This scripts computes the command line to execute your Java application, and +# includes memory/GC tuning. +# You can configure the behavior using the following environment properties: +# - JAVA_OPTS: JVM options passed to the `java` command (example: "-verbose:class") +# - JAVA_OPTS_APPEND: User specified Java options to be appended to generated options +# in JAVA_OPTS (example: "-Dsome.property=foo") +# - JAVA_MAX_MEM_RATIO: Is used when no `-Xmx` option is given in JAVA_OPTS. This is +# used to calculate a default maximal heap memory based on a containers restriction. +# If used in a container without any memory constraints for the container then this +# option has no effect. If there is a memory constraint then `-Xmx` is set to a ratio +# of the container available memory as set here. The default is `50` which means 50% +# of the available memory is used as an upper boundary. You can skip this mechanism by +# setting this value to `0` in which case no `-Xmx` option is added. +# - JAVA_INITIAL_MEM_RATIO: Is used when no `-Xms` option is given in JAVA_OPTS. This +# is used to calculate a default initial heap memory based on the maximum heap memory. +# If used in a container without any memory constraints for the container then this +# option has no effect. If there is a memory constraint then `-Xms` is set to a ratio +# of the `-Xmx` memory as set here. The default is `25` which means 25% of the `-Xmx` +# is used as the initial heap size. You can skip this mechanism by setting this value +# to `0` in which case no `-Xms` option is added (example: "25") +# - JAVA_MAX_INITIAL_MEM: Is used when no `-Xms` option is given in JAVA_OPTS. +# This is used to calculate the maximum value of the initial heap memory. If used in +# a container without any memory constraints for the container then this option has +# no effect. 
If there is a memory constraint then `-Xms` is limited to the value set +# here. The default is 4096MB which means the calculated value of `-Xms` never will +# be greater than 4096MB. The value of this variable is expressed in MB (example: "4096") +# - JAVA_DIAGNOSTICS: Set this to get some diagnostics information to standard output +# when things are happening. This option, if set to true, will set +# `-XX:+UnlockDiagnosticVMOptions`. Disabled by default (example: "true"). +# - JAVA_DEBUG: If set remote debugging will be switched on. Disabled by default (example: +# true"). +# - JAVA_DEBUG_PORT: Port used for remote debugging. Defaults to 5005 (example: "8787"). +# - CONTAINER_CORE_LIMIT: A calculated core limit as described in +# https://www.kernel.org/doc/Documentation/scheduler/sched-bwc.txt. (example: "2") +# - CONTAINER_MAX_MEMORY: Memory limit given to the container (example: "1024"). +# - GC_MIN_HEAP_FREE_RATIO: Minimum percentage of heap free after GC to avoid expansion. +# (example: "20") +# - GC_MAX_HEAP_FREE_RATIO: Maximum percentage of heap free after GC to avoid shrinking. +# (example: "40") +# - GC_TIME_RATIO: Specifies the ratio of the time spent outside the garbage collection. +# (example: "4") +# - GC_ADAPTIVE_SIZE_POLICY_WEIGHT: The weighting given to the current GC time versus +# previous GC times. (example: "90") +# - GC_METASPACE_SIZE: The initial metaspace size. (example: "20") +# - GC_MAX_METASPACE_SIZE: The maximum metaspace size. (example: "100") +# - GC_CONTAINER_OPTIONS: Specify Java GC to use. The value of this variable should +# contain the necessary JRE command-line options to specify the required GC, which +# will override the default of `-XX:+UseParallelGC` (example: -XX:+UseG1GC). +# - HTTPS_PROXY: The location of the https proxy. (example: "myuser@127.0.0.1:8080") +# - HTTP_PROXY: The location of the http proxy. (example: "myuser@127.0.0.1:8080") +# - NO_PROXY: A comma separated lists of hosts, IP addresses or domains that can be +# accessed directly. (example: "foo.example.com,bar.example.com") +# +### +FROM registry.access.redhat.com/ubi8/openjdk-21:1.19 + +ENV LANGUAGE='en_US:en' + + +# We make four distinct layers so if there are application changes the library layers can be re-used +COPY --chown=185 build/quarkus-app/lib/ /deployments/lib/ +COPY --chown=185 build/quarkus-app/*.jar /deployments/ +COPY --chown=185 build/quarkus-app/app/ /deployments/app/ +COPY --chown=185 build/quarkus-app/quarkus/ /deployments/quarkus/ + +EXPOSE 8080 +USER 185 +ENV JAVA_OPTS_APPEND="-Dquarkus.http.host=0.0.0.0 -Djava.util.logging.manager=org.jboss.logmanager.LogManager" +ENV JAVA_APP_JAR="/deployments/quarkus-run.jar" + +ENTRYPOINT [ "/opt/jboss/container/java/run/run-java.sh" ] + diff --git a/flow-jvm/src/main/docker/Dockerfile.legacy-jar b/flow-jvm/src/main/docker/Dockerfile.legacy-jar new file mode 100644 index 000000000..439a10a1c --- /dev/null +++ b/flow-jvm/src/main/docker/Dockerfile.legacy-jar @@ -0,0 +1,93 @@ +#### +# This Dockerfile is used in order to build a container that runs the Quarkus application in JVM mode +# +# Before building the container image run: +# +# ./gradlew build -Dquarkus.package.jar.type=legacy-jar +# +# Then, build the image with: +# +# docker build -f src/main/docker/Dockerfile.legacy-jar -t quarkus/flow-jvm-legacy-jar . 
+# +# Then run the container using: +# +# docker run -i --rm -p 8080:8080 quarkus/flow-jvm-legacy-jar +# +# If you want to include the debug port into your docker image +# you will have to expose the debug port (default 5005 being the default) like this : EXPOSE 8080 5005. +# Additionally you will have to set -e JAVA_DEBUG=true and -e JAVA_DEBUG_PORT=*:5005 +# when running the container +# +# Then run the container using : +# +# docker run -i --rm -p 8080:8080 quarkus/flow-jvm-legacy-jar +# +# This image uses the `run-java.sh` script to run the application. +# This scripts computes the command line to execute your Java application, and +# includes memory/GC tuning. +# You can configure the behavior using the following environment properties: +# - JAVA_OPTS: JVM options passed to the `java` command (example: "-verbose:class") +# - JAVA_OPTS_APPEND: User specified Java options to be appended to generated options +# in JAVA_OPTS (example: "-Dsome.property=foo") +# - JAVA_MAX_MEM_RATIO: Is used when no `-Xmx` option is given in JAVA_OPTS. This is +# used to calculate a default maximal heap memory based on a containers restriction. +# If used in a container without any memory constraints for the container then this +# option has no effect. If there is a memory constraint then `-Xmx` is set to a ratio +# of the container available memory as set here. The default is `50` which means 50% +# of the available memory is used as an upper boundary. You can skip this mechanism by +# setting this value to `0` in which case no `-Xmx` option is added. +# - JAVA_INITIAL_MEM_RATIO: Is used when no `-Xms` option is given in JAVA_OPTS. This +# is used to calculate a default initial heap memory based on the maximum heap memory. +# If used in a container without any memory constraints for the container then this +# option has no effect. If there is a memory constraint then `-Xms` is set to a ratio +# of the `-Xmx` memory as set here. The default is `25` which means 25% of the `-Xmx` +# is used as the initial heap size. You can skip this mechanism by setting this value +# to `0` in which case no `-Xms` option is added (example: "25") +# - JAVA_MAX_INITIAL_MEM: Is used when no `-Xms` option is given in JAVA_OPTS. +# This is used to calculate the maximum value of the initial heap memory. If used in +# a container without any memory constraints for the container then this option has +# no effect. If there is a memory constraint then `-Xms` is limited to the value set +# here. The default is 4096MB which means the calculated value of `-Xms` never will +# be greater than 4096MB. The value of this variable is expressed in MB (example: "4096") +# - JAVA_DIAGNOSTICS: Set this to get some diagnostics information to standard output +# when things are happening. This option, if set to true, will set +# `-XX:+UnlockDiagnosticVMOptions`. Disabled by default (example: "true"). +# - JAVA_DEBUG: If set remote debugging will be switched on. Disabled by default (example: +# true"). +# - JAVA_DEBUG_PORT: Port used for remote debugging. Defaults to 5005 (example: "8787"). +# - CONTAINER_CORE_LIMIT: A calculated core limit as described in +# https://www.kernel.org/doc/Documentation/scheduler/sched-bwc.txt. (example: "2") +# - CONTAINER_MAX_MEMORY: Memory limit given to the container (example: "1024"). +# - GC_MIN_HEAP_FREE_RATIO: Minimum percentage of heap free after GC to avoid expansion. +# (example: "20") +# - GC_MAX_HEAP_FREE_RATIO: Maximum percentage of heap free after GC to avoid shrinking. 
+# (example: "40") +# - GC_TIME_RATIO: Specifies the ratio of the time spent outside the garbage collection. +# (example: "4") +# - GC_ADAPTIVE_SIZE_POLICY_WEIGHT: The weighting given to the current GC time versus +# previous GC times. (example: "90") +# - GC_METASPACE_SIZE: The initial metaspace size. (example: "20") +# - GC_MAX_METASPACE_SIZE: The maximum metaspace size. (example: "100") +# - GC_CONTAINER_OPTIONS: Specify Java GC to use. The value of this variable should +# contain the necessary JRE command-line options to specify the required GC, which +# will override the default of `-XX:+UseParallelGC` (example: -XX:+UseG1GC). +# - HTTPS_PROXY: The location of the https proxy. (example: "myuser@127.0.0.1:8080") +# - HTTP_PROXY: The location of the http proxy. (example: "myuser@127.0.0.1:8080") +# - NO_PROXY: A comma separated lists of hosts, IP addresses or domains that can be +# accessed directly. (example: "foo.example.com,bar.example.com") +# +### +FROM registry.access.redhat.com/ubi8/openjdk-21:1.19 + +ENV LANGUAGE='en_US:en' + + +COPY build/lib/* /deployments/lib/ +COPY build/*-runner.jar /deployments/quarkus-run.jar + +EXPOSE 8080 +USER 185 +ENV JAVA_OPTS_APPEND="-Dquarkus.http.host=0.0.0.0 -Djava.util.logging.manager=org.jboss.logmanager.LogManager" +ENV JAVA_APP_JAR="/deployments/quarkus-run.jar" + +ENTRYPOINT [ "/opt/jboss/container/java/run/run-java.sh" ] diff --git a/flow-jvm/src/main/docker/Dockerfile.native b/flow-jvm/src/main/docker/Dockerfile.native new file mode 100644 index 000000000..096c071eb --- /dev/null +++ b/flow-jvm/src/main/docker/Dockerfile.native @@ -0,0 +1,27 @@ +#### +# This Dockerfile is used in order to build a container that runs the Quarkus application in native (no JVM) mode. +# +# Before building the container image run: +# +# ./gradlew build -Dquarkus.native.enabled=true +# +# Then, build the image with: +# +# docker build -f src/main/docker/Dockerfile.native -t quarkus/flow-jvm . +# +# Then run the container using: +# +# docker run -i --rm -p 8080:8080 quarkus/flow-jvm +# +### +FROM registry.access.redhat.com/ubi8/ubi-minimal:8.9 +WORKDIR /work/ +RUN chown 1001 /work \ + && chmod "g+rwX" /work \ + && chown 1001:root /work +COPY --chown=1001:root build/*-runner /work/application + +EXPOSE 8080 +USER 1001 + +ENTRYPOINT ["./application", "-Dquarkus.http.host=0.0.0.0"] diff --git a/flow-jvm/src/main/docker/Dockerfile.native-micro b/flow-jvm/src/main/docker/Dockerfile.native-micro new file mode 100644 index 000000000..6842a77cf --- /dev/null +++ b/flow-jvm/src/main/docker/Dockerfile.native-micro @@ -0,0 +1,30 @@ +#### +# This Dockerfile is used in order to build a container that runs the Quarkus application in native (no JVM) mode. +# It uses a micro base image, tuned for Quarkus native executables. +# It reduces the size of the resulting container image. +# Check https://quarkus.io/guides/quarkus-runtime-base-image for further information about this image. +# +# Before building the container image run: +# +# ./gradlew build -Dquarkus.native.enabled=true +# +# Then, build the image with: +# +# docker build -f src/main/docker/Dockerfile.native-micro -t quarkus/flow-jvm . 
+# +# Then run the container using: +# +# docker run -i --rm -p 8080:8080 quarkus/flow-jvm +# +### +FROM quay.io/quarkus/quarkus-micro-image:2.0 +WORKDIR /work/ +RUN chown 1001 /work \ + && chmod "g+rwX" /work \ + && chown 1001:root /work +COPY --chown=1001:root build/*-runner /work/application + +EXPOSE 8080 +USER 1001 + +ENTRYPOINT ["./application", "-Dquarkus.http.host=0.0.0.0"] diff --git a/flow-jvm/src/main/java/io/peerdb/flow/jvm/DefaultExceptionHandlerProvider.java b/flow-jvm/src/main/java/io/peerdb/flow/jvm/DefaultExceptionHandlerProvider.java new file mode 100644 index 000000000..c332c2d18 --- /dev/null +++ b/flow-jvm/src/main/java/io/peerdb/flow/jvm/DefaultExceptionHandlerProvider.java @@ -0,0 +1,47 @@ +package io.peerdb.flow.jvm; + +import io.grpc.Metadata; +import io.grpc.ServerCall; +import io.grpc.Status; +import io.grpc.StatusRuntimeException; +import io.quarkus.grpc.ExceptionHandler; +import io.quarkus.grpc.ExceptionHandlerProvider; +import io.quarkus.logging.Log; +import jakarta.enterprise.context.ApplicationScoped; + +@ApplicationScoped +public class DefaultExceptionHandlerProvider implements ExceptionHandlerProvider { + public static boolean invoked; + + private static Exception toStatusException(Throwable t) { + return Status.fromThrowable(t).withDescription(t.getMessage()).asRuntimeException(); + } + + @Override + public ExceptionHandler createHandler(ServerCall.Listener listener, + ServerCall serverCall, Metadata metadata) { + return new DefaultExceptionHandler<>(listener, serverCall, metadata); + } + + @Override + public Throwable transform(Throwable t) { + invoked = true; + Log.errorf(t, "Received error in gRPC call: '%s'", t.getMessage()); + return toStatusException(t); + } + + private static class DefaultExceptionHandler extends ExceptionHandler { + public DefaultExceptionHandler(ServerCall.Listener listener, ServerCall call, Metadata metadata) { + super(listener, call, metadata); + } + + @Override + protected void handleException(Throwable t, ServerCall call, Metadata metadata) { + invoked = true; + StatusRuntimeException sre = (StatusRuntimeException) ExceptionHandlerProvider.toStatusException(t, true); + Metadata trailers = sre.getTrailers() != null ? 
sre.getTrailers() : metadata; + call.close(sre.getStatus(), trailers); + } + } +} + diff --git a/flow-jvm/src/main/java/io/peerdb/flow/jvm/RequestLoggingInterceptor.java b/flow-jvm/src/main/java/io/peerdb/flow/jvm/RequestLoggingInterceptor.java new file mode 100644 index 000000000..ed18b809e --- /dev/null +++ b/flow-jvm/src/main/java/io/peerdb/flow/jvm/RequestLoggingInterceptor.java @@ -0,0 +1,53 @@ +package io.peerdb.flow.jvm; + + +import com.google.common.base.Stopwatch; +import io.grpc.ForwardingServerCall; +import io.grpc.ForwardingServerCallListener; +import io.grpc.Metadata; +import io.grpc.ServerCall; +import io.grpc.ServerCallHandler; +import io.grpc.ServerInterceptor; +import io.quarkus.grpc.GlobalInterceptor; +import io.quarkus.logging.Log; +import jakarta.enterprise.context.ApplicationScoped; + +import java.util.concurrent.TimeUnit; + +@ApplicationScoped +@GlobalInterceptor +public class RequestLoggingInterceptor implements ServerInterceptor { + @Override + public ServerCall.Listener interceptCall(ServerCall serverCall, Metadata metadata, ServerCallHandler serverCallHandler) { + var stopwatch = Stopwatch.createStarted(); + Log.infof("Received request for method {%s}", serverCall.getMethodDescriptor().getFullMethodName()); + ServerCall listener = new ForwardingServerCall.SimpleForwardingServerCall<>(serverCall) { + }; + return new CallListener<>(serverCallHandler, listener, metadata, serverCall, stopwatch); + + } + + private static class CallListener extends ForwardingServerCallListener.SimpleForwardingServerCallListener { + + private final ServerCall serverCall; + private final Stopwatch stopwatch; + + public CallListener(ServerCallHandler serverCallHandler, ServerCall listener, Metadata metadata, ServerCall serverCall, Stopwatch stopwatch) { + super(serverCallHandler.startCall(listener, metadata)); + this.serverCall = serverCall; + this.stopwatch = stopwatch; + } + + @Override + public void onMessage(ReqT message) { + super.onMessage(message); + } + + @Override + public void onComplete() { + Log.infof("Call completed for method: {%s} in %d ms", serverCall.getMethodDescriptor().getFullMethodName(), stopwatch.elapsed(TimeUnit.MILLISECONDS)); + super.onComplete(); + } + + } +} diff --git a/flow-jvm/src/main/java/io/peerdb/flow/jvm/iceberg/avro/AvroIcebergRecordConverter.java b/flow-jvm/src/main/java/io/peerdb/flow/jvm/iceberg/avro/AvroIcebergRecordConverter.java new file mode 100644 index 000000000..9c9407c44 --- /dev/null +++ b/flow-jvm/src/main/java/io/peerdb/flow/jvm/iceberg/avro/AvroIcebergRecordConverter.java @@ -0,0 +1,43 @@ +package io.peerdb.flow.jvm.iceberg.avro; + +import org.apache.avro.Schema; +import org.apache.avro.file.SeekableByteArrayInput; +import org.apache.avro.io.DecoderFactory; +import org.apache.iceberg.avro.AvroSchemaUtil; +import org.apache.iceberg.data.avro.DataReader; + +import java.io.IOException; + +public class AvroIcebergRecordConverter { + private final org.apache.iceberg.Schema icebergSchema; + private final Schema icebergAvroSchema; + private final DataReader dataReader; + + public AvroIcebergRecordConverter(String avroSchemaString, org.apache.iceberg.Schema icebergSchema, String tableName) { + this(new Schema.Parser().parse(avroSchemaString), icebergSchema, tableName); + + } + + public AvroIcebergRecordConverter(Schema sourceAvroSchema, org.apache.iceberg.Schema icebergSchema, String tableName) { + this.icebergSchema = icebergSchema; + this.icebergAvroSchema = AvroSchemaUtil.convert(icebergSchema, tableName); + this.dataReader = 
DataReader.create(icebergSchema, icebergAvroSchema); + this.dataReader.setSchema(sourceAvroSchema); + + } + + public org.apache.iceberg.data.GenericRecord toIcebergRecord(byte[] avroBytes) throws IOException { + try (var byteStream = new SeekableByteArrayInput(avroBytes)) { + var binaryDecoder = DecoderFactory.get().binaryDecoder(byteStream, null); + return this.dataReader.read(null, binaryDecoder); + } + } + + public org.apache.iceberg.Schema getIcebergSchema() { + return icebergSchema; + } + + public Schema getIcebergAvroSchema() { + return icebergAvroSchema; + } +} diff --git a/flow-jvm/src/main/java/io/peerdb/flow/jvm/iceberg/catalog/CatalogLoader.java b/flow-jvm/src/main/java/io/peerdb/flow/jvm/iceberg/catalog/CatalogLoader.java new file mode 100644 index 000000000..b9377fc31 --- /dev/null +++ b/flow-jvm/src/main/java/io/peerdb/flow/jvm/iceberg/catalog/CatalogLoader.java @@ -0,0 +1,64 @@ +package io.peerdb.flow.jvm.iceberg.catalog; + + +import io.peerdb.flow.jvm.iceberg.catalog.io.mapper.GCSIOConfigMapper; +import io.peerdb.flow.jvm.iceberg.catalog.io.mapper.S3IOConfigMapper; +import io.peerdb.flow.jvm.iceberg.catalog.mapper.HiveConfigMapper; +import io.peerdb.flow.jvm.iceberg.catalog.mapper.JdbcCatalogMapper; +import io.peerdb.flow.peers.IcebergCatalog; +import io.quarkus.logging.Log; +import jakarta.inject.Inject; +import jakarta.inject.Singleton; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.CatalogUtil; +import org.apache.iceberg.catalog.Catalog; + +import java.util.Collections; +import java.util.Map; + + +@Singleton +public class CatalogLoader { + @Inject + HiveConfigMapper hiveConfigMapper; + + @Inject + JdbcCatalogMapper jdbcCatalogMapper; + + @Inject + S3IOConfigMapper s3IOConfigMapper; + + @Inject + GCSIOConfigMapper gcsIOConfigMapper; + + + public Catalog loadCatalog(IcebergCatalog icebergCatalogConfig) { + var icebergIOConfig = icebergCatalogConfig.getIoConfig(); + var fileIoConfig = switch (icebergIOConfig.getConfigCase()) { + case S3 -> s3IOConfigMapper.map(icebergIOConfig.getS3()); + case GCS -> gcsIOConfigMapper.map(icebergIOConfig.getGcs()); + default -> { + Log.errorf("Unexpected value for file io config: %s", icebergIOConfig.getConfigCase()); + yield Collections.emptyMap(); + } + }; + + var catalogConfig = switch (icebergCatalogConfig.getConfigCase()) { + case HIVE -> + hiveConfigMapper.map(icebergCatalogConfig.getCommonConfig(), icebergCatalogConfig.getHive(), fileIoConfig); + case JDBC -> + jdbcCatalogMapper.map(icebergCatalogConfig.getCommonConfig(), icebergCatalogConfig.getJdbc(), fileIoConfig); + default -> + throw new IllegalArgumentException("Unexpected value for catalog config: " + icebergCatalogConfig.getConfigCase()); + }; + var hadoopConfiguration = getHadoopConfiguration(icebergCatalogConfig.getCommonConfig().getHadoopPropertiesMap()); + return CatalogUtil.buildIcebergCatalog(icebergCatalogConfig.getCommonConfig().getName(), catalogConfig, hadoopConfiguration); + } + + private Configuration getHadoopConfiguration(Map hadoopConfig) { + var conf = new Configuration(); + hadoopConfig.forEach(conf::set); + return conf; + } + +} diff --git a/flow-jvm/src/main/java/io/peerdb/flow/jvm/iceberg/catalog/io/mapper/FileIOConfigMapper.java b/flow-jvm/src/main/java/io/peerdb/flow/jvm/iceberg/catalog/io/mapper/FileIOConfigMapper.java new file mode 100644 index 000000000..c62acc288 --- /dev/null +++ b/flow-jvm/src/main/java/io/peerdb/flow/jvm/iceberg/catalog/io/mapper/FileIOConfigMapper.java @@ -0,0 +1,28 @@ +package 
io.peerdb.flow.jvm.iceberg.catalog.io.mapper; + + +import io.quarkus.logging.Log; +import org.apache.iceberg.CatalogProperties; + +import java.util.HashMap; +import java.util.Map; + +public abstract class FileIOConfigMapper { + + protected abstract Map mapSpecific(T config); + + protected Map mapCommon() { + return Map.of( + CatalogProperties.FILE_IO_IMPL, this.implementationClass() + ); + } + + public final Map map(T config) { + var map = new HashMap<>(mapCommon()); + map.putAll(this.mapSpecific(config)); + Log.debugf("Mapped IO config: %s", map); + return map; + } + + public abstract String implementationClass(); +} diff --git a/flow-jvm/src/main/java/io/peerdb/flow/jvm/iceberg/catalog/io/mapper/FixedS3FileIO.java b/flow-jvm/src/main/java/io/peerdb/flow/jvm/iceberg/catalog/io/mapper/FixedS3FileIO.java new file mode 100644 index 000000000..4658a1cb6 --- /dev/null +++ b/flow-jvm/src/main/java/io/peerdb/flow/jvm/iceberg/catalog/io/mapper/FixedS3FileIO.java @@ -0,0 +1,10 @@ +package io.peerdb.flow.jvm.iceberg.catalog.io.mapper; + +import org.apache.iceberg.aws.s3.S3FileIO; + +/** + * TODO This class should act as a delegate to the S3FileIO class, temporarily fixing till it is fixed upstream. + * Maybe a way to use it is to use Lombok's @Delegate functionality. + */ +public class FixedS3FileIO extends S3FileIO { +} diff --git a/flow-jvm/src/main/java/io/peerdb/flow/jvm/iceberg/catalog/io/mapper/GCSIOConfigMapper.java b/flow-jvm/src/main/java/io/peerdb/flow/jvm/iceberg/catalog/io/mapper/GCSIOConfigMapper.java new file mode 100644 index 000000000..c84abba72 --- /dev/null +++ b/flow-jvm/src/main/java/io/peerdb/flow/jvm/iceberg/catalog/io/mapper/GCSIOConfigMapper.java @@ -0,0 +1,24 @@ +package io.peerdb.flow.jvm.iceberg.catalog.io.mapper; + +import com.google.common.collect.ImmutableMap; +import io.peerdb.flow.peers.IcebergGCSIoConfig; +import jakarta.enterprise.context.ApplicationScoped; +import org.apache.iceberg.gcp.GCPProperties; + +import java.util.Map; + +@ApplicationScoped +public class GCSIOConfigMapper extends FileIOConfigMapper { + @Override + protected Map mapSpecific(IcebergGCSIoConfig config) { + // TODO complete this + var builder = ImmutableMap.builder() + .put(GCPProperties.GCS_PROJECT_ID, config.getProjectId()); + return builder.build(); + } + + @Override + public String implementationClass() { + return "org.apache.iceberg.gcp.gcs.GCSFileIO"; + } +} diff --git a/flow-jvm/src/main/java/io/peerdb/flow/jvm/iceberg/catalog/io/mapper/S3IOConfigMapper.java b/flow-jvm/src/main/java/io/peerdb/flow/jvm/iceberg/catalog/io/mapper/S3IOConfigMapper.java new file mode 100644 index 000000000..54a2c9045 --- /dev/null +++ b/flow-jvm/src/main/java/io/peerdb/flow/jvm/iceberg/catalog/io/mapper/S3IOConfigMapper.java @@ -0,0 +1,34 @@ +package io.peerdb.flow.jvm.iceberg.catalog.io.mapper; + +import com.google.common.collect.ImmutableMap; +import io.peerdb.flow.peers.IcebergS3IoConfig; +import jakarta.enterprise.context.ApplicationScoped; +import org.apache.iceberg.aws.s3.S3FileIOProperties; + +import java.util.Map; + +@ApplicationScoped +public class S3IOConfigMapper extends FileIOConfigMapper { + @Override + protected Map mapSpecific(IcebergS3IoConfig config) { + var builder = ImmutableMap.builder() + .put(S3FileIOProperties.ACCESS_KEY_ID, config.getAccessKeyId()) + .put(S3FileIOProperties.SECRET_ACCESS_KEY, config.getSecretAccessKey()); + if (config.hasEndpoint()) { + builder.put(S3FileIOProperties.ENDPOINT, config.getEndpoint()); + } + if (config.hasPathStyleAccess()) { + 
builder.put(S3FileIOProperties.PATH_STYLE_ACCESS, String.valueOf(config.getPathStyleAccess())); + } + if (config.hasCrossRegionAccessEnabled()) { + // This only works when https://github.com/apache/iceberg/issues/9785 is fixed or {@link io.peerdb.flow.jvm.iceberg.catalog.io.mapper.FixedS3FileIO} is added + builder.put("s3.cross-region-access-enabled", config.getCrossRegionAccessEnabled()); + } + return builder.build(); + } + + @Override + public String implementationClass() { + return "org.apache.iceberg.aws.s3.S3FileIO"; + } +} diff --git a/flow-jvm/src/main/java/io/peerdb/flow/jvm/iceberg/catalog/mapper/CatalogConfigMapper.java b/flow-jvm/src/main/java/io/peerdb/flow/jvm/iceberg/catalog/mapper/CatalogConfigMapper.java new file mode 100644 index 000000000..52adaaa9b --- /dev/null +++ b/flow-jvm/src/main/java/io/peerdb/flow/jvm/iceberg/catalog/mapper/CatalogConfigMapper.java @@ -0,0 +1,40 @@ +package io.peerdb.flow.jvm.iceberg.catalog.mapper; + + +import com.google.common.collect.ImmutableMap; +import io.peerdb.flow.peers.CommonIcebergCatalog; +import io.quarkus.logging.Log; +import org.apache.iceberg.CatalogProperties; + +import java.util.HashMap; +import java.util.Map; + +public abstract class CatalogConfigMapper { + protected Map mapCommon(CommonIcebergCatalog config) { + var builder = ImmutableMap.builder() + .put(CatalogProperties.URI, config.getUri()) + .put(CatalogProperties.WAREHOUSE_LOCATION, config.getWarehouseLocation()) + .put(CatalogProperties.CATALOG_IMPL, this.implementationClass()); + if (config.hasClientPoolSize()) { + builder.put(CatalogProperties.CLIENT_POOL_SIZE, String.valueOf(config.getClientPoolSize())); + } + if (config.hasCacheEnabled()) { + builder.put(CatalogProperties.CACHE_ENABLED, String.valueOf(config.getCacheEnabled())); + } +// builder.putAll(config.getAdditionalPropertiesMap()); + return builder.build(); + } + + public Map map(CommonIcebergCatalog commonConfig, T config, Map fileIoConfig) { + var map = new HashMap<>(mapCommon(commonConfig)); + map.putAll(this.mapSpecific(config)); + map.putAll(fileIoConfig); + Log.debugf("Mapped catalog config: %s", map); + return map; + } + + + protected abstract Map mapSpecific(T config); + + public abstract String implementationClass(); +} diff --git a/flow-jvm/src/main/java/io/peerdb/flow/jvm/iceberg/catalog/mapper/HiveConfigMapper.java b/flow-jvm/src/main/java/io/peerdb/flow/jvm/iceberg/catalog/mapper/HiveConfigMapper.java new file mode 100644 index 000000000..67f9a431b --- /dev/null +++ b/flow-jvm/src/main/java/io/peerdb/flow/jvm/iceberg/catalog/mapper/HiveConfigMapper.java @@ -0,0 +1,29 @@ +package io.peerdb.flow.jvm.iceberg.catalog.mapper; + +import com.google.common.collect.ImmutableMap; +import io.peerdb.flow.peers.HiveIcebergCatalog; +import jakarta.inject.Singleton; +import org.apache.iceberg.CatalogUtil; + +import java.util.Map; + + +/** + * This requires the underlying thrift connection like thrift://localhost:9083 + */ + +@Singleton +public class HiveConfigMapper extends CatalogConfigMapper { + @Override + protected Map mapSpecific(HiveIcebergCatalog config) { + return ImmutableMap.builder() + // TODO add these if needed +// .put(HiveCatalog.HMS_DB_OWNER, "hive") + .build(); + } + + @Override + public String implementationClass() { + return CatalogUtil.ICEBERG_CATALOG_HIVE; + } +} diff --git a/flow-jvm/src/main/java/io/peerdb/flow/jvm/iceberg/catalog/mapper/JdbcCatalogMapper.java b/flow-jvm/src/main/java/io/peerdb/flow/jvm/iceberg/catalog/mapper/JdbcCatalogMapper.java new file mode 100644 index 
000000000..9e759a752 --- /dev/null +++ b/flow-jvm/src/main/java/io/peerdb/flow/jvm/iceberg/catalog/mapper/JdbcCatalogMapper.java @@ -0,0 +1,29 @@ +package io.peerdb.flow.jvm.iceberg.catalog.mapper; + +import io.peerdb.flow.peers.JdbcIcebergCatalog; +import jakarta.inject.Singleton; +import org.apache.iceberg.jdbc.JdbcCatalog; + +import java.util.Map; +import java.util.stream.Collectors; + + +@Singleton +public class JdbcCatalogMapper extends CatalogConfigMapper { + + @Override + protected Map mapSpecific(JdbcIcebergCatalog config) { + return Map.of( + "user", config.getUser(), + "password", config.getPassword(), + "useSSL", config.hasUseSsl() ? String.valueOf(config.getUseSsl()) : "true", + "verifyServerCertificate", config.hasVerifyServerCertificate() ? String.valueOf(config.getVerifyServerCertificate()) : "false" + + ).entrySet().stream().collect(Collectors.toMap(e -> JdbcCatalog.PROPERTY_PREFIX + e.getKey(), Map.Entry::getValue)); + } + + @Override + public String implementationClass() { + return JdbcCatalog.class.getName(); + } +} diff --git a/flow-jvm/src/main/java/io/peerdb/flow/jvm/iceberg/exceptions/AppendAlreadyDoneException.java b/flow-jvm/src/main/java/io/peerdb/flow/jvm/iceberg/exceptions/AppendAlreadyDoneException.java new file mode 100644 index 000000000..0ee5fc6e6 --- /dev/null +++ b/flow-jvm/src/main/java/io/peerdb/flow/jvm/iceberg/exceptions/AppendAlreadyDoneException.java @@ -0,0 +1,11 @@ +package io.peerdb.flow.jvm.iceberg.exceptions; + +public class AppendAlreadyDoneException extends RuntimeException { + public AppendAlreadyDoneException() { + super(); + } + + public AppendAlreadyDoneException(String message) { + super(message); + } +} diff --git a/flow-jvm/src/main/java/io/peerdb/flow/jvm/iceberg/lock/LockManager.java b/flow-jvm/src/main/java/io/peerdb/flow/jvm/iceberg/lock/LockManager.java new file mode 100644 index 000000000..e5e4f95e3 --- /dev/null +++ b/flow-jvm/src/main/java/io/peerdb/flow/jvm/iceberg/lock/LockManager.java @@ -0,0 +1,21 @@ +package io.peerdb.flow.jvm.iceberg.lock; + + +import com.google.common.util.concurrent.Striped; +import jakarta.enterprise.context.ApplicationScoped; + +import java.util.concurrent.locks.Lock; + +@ApplicationScoped +public class LockManager { + int stripeCount = 10_000; + + Striped locker = Striped.lock(stripeCount); + + + // This is just abstracted out to enable changing the lock implementation in the future + public Lock newLock(Object key) { + return locker.get(key); + } + +} diff --git a/flow-jvm/src/main/java/io/peerdb/flow/jvm/iceberg/resource/IcebergResource.java b/flow-jvm/src/main/java/io/peerdb/flow/jvm/iceberg/resource/IcebergResource.java new file mode 100644 index 000000000..a47c2d876 --- /dev/null +++ b/flow-jvm/src/main/java/io/peerdb/flow/jvm/iceberg/resource/IcebergResource.java @@ -0,0 +1,101 @@ +package io.peerdb.flow.jvm.iceberg.resource; + +import io.peerdb.flow.jvm.grpc.AppendRecordsRequest; +import io.peerdb.flow.jvm.grpc.AppendRecordsResponse; +import io.peerdb.flow.jvm.grpc.AppendRecordsStreamRequest; +import io.peerdb.flow.jvm.grpc.AppendRecordsStreamResponse; +import io.peerdb.flow.jvm.grpc.CountRecordRequest; +import io.peerdb.flow.jvm.grpc.CountRecordResponse; +import io.peerdb.flow.jvm.grpc.CreateTableRequest; +import io.peerdb.flow.jvm.grpc.CreateTableResponse; +import io.peerdb.flow.jvm.grpc.DropTableRequest; +import io.peerdb.flow.jvm.grpc.DropTableResponse; +import io.peerdb.flow.jvm.grpc.IcebergProxyService; +import io.peerdb.flow.jvm.grpc.InsertChangesRequest; +import 
io.peerdb.flow.jvm.grpc.InsertChangesResponse; +import io.peerdb.flow.jvm.iceberg.service.IcebergService; +import io.quarkus.grpc.GrpcService; +import io.smallrye.common.annotation.Blocking; +import io.smallrye.common.annotation.RunOnVirtualThread; +import io.smallrye.mutiny.Multi; +import io.smallrye.mutiny.Uni; +import jakarta.inject.Inject; + +import java.util.Optional; + +@GrpcService +public class IcebergResource implements IcebergProxyService { + @Inject + IcebergService icebergService; + + @RunOnVirtualThread + @Override + public Uni createTable(CreateTableRequest request) { + return Uni.createFrom().item(() -> + CreateTableResponse.newBuilder() + .setTableName( + icebergService.createTable(request.getTableInfo(), request.getSchema()).name() + ).build()); + } + + + @RunOnVirtualThread + @Override + public Uni dropTable(DropTableRequest request) { + return Uni.createFrom().item(() -> DropTableResponse.newBuilder().setSuccess(icebergService.dropTable(request.getTableInfo(), request.getPurge())).build()); + + } + + @RunOnVirtualThread + @Override + public Uni countRecords(CountRecordRequest request) { + return Uni.createFrom().item(() -> { + var count = icebergService.processTableCountRequest(request); + return CountRecordResponse.newBuilder().setCount(count).build(); + }); + } + + + @RunOnVirtualThread + @Override + public Uni insertChanges(InsertChangesRequest request) { + return Uni.createFrom() + .item(() -> InsertChangesResponse.newBuilder() + .setSuccess( + icebergService.insertChanges( + request.getTableInfo(), + request.getSchema(), + request.getChangesList(), + Optional.ofNullable(request.hasBranchOptions() ? request.getBranchOptions() : null) + )) + .build()); + + } + + /** + * Append records to the iceberg table with records all encoded in the request body + * @deprecated Use {@link #streamingAppendRecords(Multi)} instead for better performance + * @param request AppendRecordsRequest containing the records to be appended along with table info + * @return AppendRecordsResponse containing the success status of the operation + */ + @Blocking + @Override + @Deprecated + public Uni appendRecords(AppendRecordsRequest request) { + return Uni.createFrom().item(() -> AppendRecordsResponse.newBuilder() + .setSuccess( + icebergService.processAppendRecordsRequest(request)) + .build()); + } + + /** + * Append records to the iceberg table with records streamed in the request body + * @param request AppendRecordsStreamRequest containing the records to be appended along with table info as the first record + * @return AppendRecordsStreamResponse containing the success status of the operation + */ + @Blocking + @Override + public Uni streamingAppendRecords(Multi request) { + return icebergService.appendRecordsAsync(request); + } +} diff --git a/flow-jvm/src/main/java/io/peerdb/flow/jvm/iceberg/service/IcebergService.java b/flow-jvm/src/main/java/io/peerdb/flow/jvm/iceberg/service/IcebergService.java new file mode 100644 index 000000000..8e2f32e69 --- /dev/null +++ b/flow-jvm/src/main/java/io/peerdb/flow/jvm/iceberg/service/IcebergService.java @@ -0,0 +1,390 @@ +package io.peerdb.flow.jvm.iceberg.service; + +import com.google.common.base.Preconditions; +import com.google.common.base.Stopwatch; +import com.google.common.collect.Streams; +import io.peerdb.flow.jvm.grpc.AppendRecordTableHeader; +import io.peerdb.flow.jvm.grpc.AppendRecordsRequest; +import io.peerdb.flow.jvm.grpc.AppendRecordsStreamRequest; +import io.peerdb.flow.jvm.grpc.AppendRecordsStreamResponse; +import 
io.peerdb.flow.jvm.grpc.BranchOptions; +import io.peerdb.flow.jvm.grpc.CountRecordRequest; +import io.peerdb.flow.jvm.grpc.InsertRecord; +import io.peerdb.flow.jvm.grpc.RecordChange; +import io.peerdb.flow.jvm.grpc.TableInfo; +import io.peerdb.flow.jvm.iceberg.avro.AvroIcebergRecordConverter; +import io.peerdb.flow.jvm.iceberg.catalog.CatalogLoader; +import io.peerdb.flow.jvm.iceberg.exceptions.AppendAlreadyDoneException; +import io.peerdb.flow.jvm.iceberg.lock.LockManager; +import io.peerdb.flow.jvm.iceberg.writer.AppendFilesWriter; +import io.peerdb.flow.jvm.iceberg.writer.RecordWriterFactory; +import io.quarkus.logging.Log; +import io.smallrye.mutiny.Multi; +import io.smallrye.mutiny.Uni; +import io.smallrye.mutiny.unchecked.Unchecked; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.iceberg.ContentFile; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.avro.AvroSchemaUtil; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.SupportsNamespaces; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.data.IcebergGenerics; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.exceptions.AlreadyExistsException; +import org.apache.iceberg.io.TaskWriter; +import org.apache.iceberg.io.WriteResult; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.time.Duration; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +@ApplicationScoped +public class IcebergService { + + static final int maxIdempotencyKeyAgeDays = 7; + @Inject + CatalogLoader catalogLoader; + @Inject + LockManager lockManager; + @Inject + RecordWriterFactory recordWriterFactory; + + private static void writeRecordStream(Stream recordStream, AvroIcebergRecordConverter converter, TaskWriter writer) { + // use non parallel and use multi everywhere + recordStream.parallel().map(insertRecord -> { + try { + return converter.toIcebergRecord(insertRecord.getRecord().toByteArray()); + } catch (IOException e) { + Log.errorf(e, "Error while converting record"); + throw new UncheckedIOException(e); + } + }).toList().forEach(record -> { + try { + writer.write(record); + } catch (IOException e) { + Log.errorf(e, "Error while writing record"); + throw new UncheckedIOException(e); + } + }); + } + + private Object getTableLockKey(TableInfo tableInfo) { + return List.of(tableInfo.getIcebergCatalog(), tableInfo.getNamespaceList(), tableInfo.getTableName()); + } + + private TableIdentifier getTableIdentifier(TableInfo tableInfo) { + return TableIdentifier.parse(tableInfo.getTableName()); + } + + public Table createTable(TableInfo tableInfo, String schema) { + var icebergCatalog = tableInfo.getIcebergCatalog(); + + var catalog = catalogLoader.loadCatalog(icebergCatalog); + var typeSchema = getIcebergSchema(schema); + // TODO Below require that the primary keys are non-null + var fieldList = typeSchema.columns(); + var primaryKeyFieldIds = tableInfo.getPrimaryKeyList().stream().map(pk -> + Objects.requireNonNull(typeSchema.findField(pk), String.format("Primary key %s not found in schema", pk)).fieldId() + 
).collect(Collectors.toSet()); + var icebergSchema = new Schema(fieldList, primaryKeyFieldIds); + + Preconditions.checkArgument(icebergSchema.asStruct().equals(typeSchema.asStruct()), "Primary key based schema not equivalent to type schema [%s!=%s]", icebergSchema.asStruct(), typeSchema.asStruct()); + + var tableIdentifier = getTableIdentifier(tableInfo); + // We create the namespace if needed + if (!tableIdentifier.namespace().isEmpty() && catalog instanceof SupportsNamespaces namespacedCatalog && !namespacedCatalog.namespaceExists(tableIdentifier.namespace())) { + try { + Log.infof("Creating namespace %s", tableIdentifier.namespace()); + namespacedCatalog.createNamespace(tableIdentifier.namespace()); + } catch (AlreadyExistsException e) { + Log.warnf("Namespace %s already exists, skipping", tableIdentifier.namespace()); + } catch (UnsupportedOperationException e) { + Log.warnf("Namespace creation not supported by catalog %s, skipping", icebergCatalog); + } + } + Log.infof("Will now create table %s", tableInfo.getTableName()); + var table = catalog.createTable(tableIdentifier, icebergSchema); + Log.infof("Created table %s", tableInfo.getTableName()); + return table; + } + + public boolean dropTable(TableInfo tableInfo, boolean purge) { + var icebergCatalog = tableInfo.getIcebergCatalog(); + var catalog = catalogLoader.loadCatalog(icebergCatalog); + return catalog.dropTable(getTableIdentifier(tableInfo), purge); + } + + public boolean processAppendRecordsRequest(AppendRecordsRequest request) { + String avroSchema = request.getTableHeader().getSchema(); + Optional idempotencyKey = Optional.ofNullable(request.getTableHeader().hasIdempotencyKey() ? request.getTableHeader().getIdempotencyKey() : null); + return appendRecords(request.getTableHeader().getTableInfo(), avroSchema, request.getRecordsList().stream(), idempotencyKey); + } + + record AppendRecordTableContext(AppendRecordTableHeader tableHeader, Catalog catalog, Table table, + AppendFilesWriter appendFilesWriter) { + } + + + public Uni appendRecordsAsync(Multi request) { + var tableContext = new AtomicReference(); + var counter = new AtomicInteger(); + var resultUni = request.map(record -> Pair.of(counter.getAndIncrement(), record)) + // Initialize the Iceberg Table and Create the AppendFilesWriter for Appending Records + .map(Unchecked.function(pair -> { + var index = pair.getKey(); + var record = pair.getValue(); + if (index == 0) { + if (record.getCommandCase() == AppendRecordsStreamRequest.CommandCase.TABLE_HEADER) { + var tableHeader = record.getTableHeader(); + var avroSchema = tableHeader.getSchema(); + var icebergCatalog = catalogLoader.loadCatalog(tableHeader.getTableInfo().getIcebergCatalog()); + var table = icebergCatalog.loadTable(getTableIdentifier(tableHeader.getTableInfo())); + if (isAppendAlreadyDone(table, Optional.ofNullable(tableHeader.hasIdempotencyKey() ? 
tableHeader.getIdempotencyKey() : null))) { + throw new AppendAlreadyDoneException(String.format("Append already done for table %s with idempotency key %s", table.name(), tableHeader.getIdempotencyKey())); + } + tableContext.set(new AppendRecordTableContext(tableHeader, icebergCatalog, table, new AppendFilesWriter(recordWriterFactory, avroSchema, table))); + return Optional.empty(); + } + throw new IllegalArgumentException("Expected TableHeader as the first message, got " + record.getCommandCase()); + } else if (record.getCommandCase() == AppendRecordsStreamRequest.CommandCase.RECORD) { + return Optional.of(record.getRecord()); + } else { + throw new IllegalArgumentException("Expected InsertRecord as the rest of the messages, got " + record.getCommandCase()); + } + })).filter(Optional::isPresent).map(Optional::get) + // Write the records + .map(Unchecked.function(insertRecord -> { + var appendRecordTableContext = tableContext.get(); + var appendFilesWriter = appendRecordTableContext.appendFilesWriter(); + try { + appendFilesWriter.writeAvroBytesRecord(insertRecord.getRecord().toByteArray()); + } catch (IOException e) { + Log.errorf(e, "Error while converting record"); + throw new UncheckedIOException(e); + } + return true; + })).filter(obj -> !obj).collect().asList().replaceWithVoid() + // Now commit to the table using critical section to prevent the requirement of metadata refresh and transactions failing + .map(Unchecked.function((ignored) -> { + var appendRecordTableContext = tableContext.get(); + var tableHeader = appendRecordTableContext.tableHeader(); + var table = appendRecordTableContext.table(); + var tableInfo = tableHeader.getTableInfo(); + var idempotencyKey = Optional.ofNullable(tableHeader.hasIdempotencyKey() ? tableHeader.getIdempotencyKey() : null); + var appendFilesWriter = appendRecordTableContext.appendFilesWriter(); + var dataFiles = appendFilesWriter.complete(); + return commitDataFilesToTableWithLock(table, tableInfo, idempotencyKey, dataFiles); + })) + .onFailure(AppendAlreadyDoneException.class).recoverWithItem((error) -> { + Log.warnf("Received AppendAlreadyCompletedException, ignoring", error.getMessage()); + return true; + }); + return resultUni.map(success -> AppendRecordsStreamResponse.newBuilder().setSuccess(success).build()); + } + + + private boolean commitDataFilesToTableWithLock(Table table, TableInfo tableInfo, Optional idempotencyKey, DataFile[] dataFiles) { + var recordCount = Arrays.stream(dataFiles).map(ContentFile::recordCount).reduce(0L, Long::sum); + Log.infof("Converted %d records to %d data files for table %s", recordCount, dataFiles.length, table.name()); + + var lockKey = List.of(tableInfo.getIcebergCatalog().toString(), tableInfo.getNamespaceList(), tableInfo.getTableName()); + Log.infof("Will now acquire lock for table %s by idempotency key %s for lockHashCode: %d", table.name(), idempotencyKey.orElse(""), lockKey.hashCode()); + var lock = lockManager.newLock(lockKey); + var lockStopWatch = Stopwatch.createStarted(); + lock.lock(); + try { + Log.infof("Acquired lock for table %s in %d ms by idempotency key %s", table.name(), lockStopWatch.elapsed(TimeUnit.MILLISECONDS), idempotencyKey.orElse("")); + Log.infof("Will now refresh table %s", table.name()); + table.refresh(); + if (isAppendAlreadyDone(table, idempotencyKey)) { + return true; + } + var transaction = table.newTransaction(); + Log.infof("Will now append files to table %s", table.name()); + var appendFiles = transaction.newAppend(); + + 
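// The data files and the idempotency branch are committed inside the same Iceberg transaction,
+ // so a retried request finds the branch via isAppendAlreadyDone() and becomes a no-op.
+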
Arrays.stream(dataFiles).forEach(appendFiles::appendFile); + Log.infof("Appended files to table %s", table.name()); + appendFiles.commit(); + Log.infof("Committed files to table %s", table.name()); + idempotencyKey.ifPresent(key -> { + Log.infof("Will now create branch %s for table %s", key, table.name()); + transaction.manageSnapshots().createBranch(getBranchNameFromIdempotencyKey(key)) + .setMaxRefAgeMs(getBranchNameFromIdempotencyKey(key), Duration.ofDays(maxIdempotencyKeyAgeDays).toMillis()) + .commit(); + Log.infof("Created branch %s for table %s", key, table.name()); + }); + transaction.table().refresh(); + + Log.infof("Will now commit transaction for table %s", table.name()); + transaction.commitTransaction(); + Log.infof("Committed transaction for table %s", table.name()); + + return true; + } finally { + lock.unlock(); + Log.infof("Released lock for table %s by idempotency key %s", table.name(), idempotencyKey.orElse("")); + } + } + + public boolean appendRecords(TableInfo tableInfo, String avroSchema, Stream recordStream, Optional idempotencyKey) { + var icebergCatalog = catalogLoader.loadCatalog(tableInfo.getIcebergCatalog()); + var table = icebergCatalog.loadTable(getTableIdentifier(tableInfo)); + + if (isAppendAlreadyDone(table, idempotencyKey)) { + return true; + } + + Log.infof("Converting append records to data files for table %s", table.name()); + var dataFiles = getAppendDataFiles(avroSchema, table, recordStream); + return commitDataFilesToTableWithLock(table, tableInfo, idempotencyKey, dataFiles); + + } + + private DataFile[] getAppendDataFiles(String avroSchema, Table table, Stream recordStream) { + WriteResult writeResult; + try (var writer = recordWriterFactory.createRecordWriter(table)) { + var converter = new AvroIcebergRecordConverter(avroSchema, table.schema(), table.name()); + Log.infof("Will now write records to append to table %s", table.name()); + var stopwatch = Stopwatch.createStarted(); + writeRecordStream(recordStream, converter, writer); + Log.infof("Completed writing records to append to table %s in %d ms", table.name(), stopwatch.elapsed(TimeUnit.MILLISECONDS)); + try { + writeResult = writer.complete(); + } catch (IOException e) { + Log.errorf(e, "Error while completing writing records"); + throw new UncheckedIOException(e); + } + } catch (IOException e) { + throw new UncheckedIOException(e); + } + return writeResult.dataFiles(); + } + + private boolean isAppendAlreadyDone(Table table, Optional idempotencyKey) { + if (idempotencyKey.isPresent()) { + var branchName = getBranchNameFromIdempotencyKey(idempotencyKey.get()); + if (table.refs().containsKey(branchName)) { + Log.warnf("Already committed work found for table %s with idempotency key %s", table.name(), idempotencyKey.get()); + return true; + } + } + return false; + } + + public long processTableCountRequest(CountRecordRequest request) { + var tableInfo = request.getTableInfo(); + var icebergCatalog = tableInfo.getIcebergCatalog(); + var catalog = catalogLoader.loadCatalog(icebergCatalog); + var table = catalog.loadTable(getTableIdentifier(tableInfo)); + + Log.debugf("For table %s, schema is %s", tableInfo.getTableName(), table.schema()); + var count = 0L; + try (var tableScan = IcebergGenerics.read(table).build()) { + count = Streams.stream(tableScan.iterator()).reduce(0L, (current, record) -> current + 1L, Long::sum); + } catch (IOException e) { + Log.errorf(e, "Error reading table %s", tableInfo.getTableName()); + throw new RuntimeException(e); + } + return count; + } + + public boolean 
insertChanges(TableInfo tableInfo, String avroSchema, List recordChanges, Optional branchOptions) { + var icebergCatalog = catalogLoader.loadCatalog(tableInfo.getIcebergCatalog()); + var table = icebergCatalog.loadTable(getTableIdentifier(tableInfo)); + if (branchOptions.isPresent()) { + var branchName = branchOptions.get().getBranch(); + if (table.refs().containsKey(branchName)) { + switch (branchOptions.get().getBranchCreateConflictPolicy()) { + case ERROR -> + throw new IllegalArgumentException(String.format("Branch %s already exists", branchName)); + case IGNORE -> { + return false; + } + case DROP -> table.newTransaction().manageSnapshots().removeBranch(branchName).commit(); + default -> + throw new IllegalArgumentException(String.format("Unrecognized branch create conflict policy %s", branchOptions.get().getBranchCreateConflictPolicy())); + } + } + } + var writer = recordWriterFactory.createRecordWriter(table); + + var converter = new AvroIcebergRecordConverter(avroSchema, table.schema(), table.name()); + recordChanges.forEach(recordChange -> { + switch (recordChange.getChangeCase()) { + case INSERT: + Log.tracef("Inserting record: %s", recordChange.getInsert()); + var insertRecord = recordChange.getInsert(); + try { + var genericRecord = converter.toIcebergRecord(insertRecord.getRecord().toByteArray()); + } catch (IOException e) { + Log.errorf(e, "Error while converting record"); + throw new RuntimeException(e); + } + + break; + case DELETE: + Log.tracef("Deleting record: %s", recordChange.getDelete()); + var deleteRecord = recordChange.getDelete(); + break; + case UPDATE: + Log.tracef("Updating record: %s", recordChange.getUpdate()); + var updateRecord = recordChange.getUpdate(); + break; + } + }); + + + WriteResult writeResult; + try { + writeResult = writer.complete(); + } catch (IOException e) { + Log.errorf(e, "Error while completing writing records"); + throw new RuntimeException(e); + } + + var transaction = table.newTransaction(); + branchOptions.ifPresent(options -> transaction.manageSnapshots().createBranch(options.getBranch()) +// .setMaxRefAgeMs() +// .setMinSnapshotsToKeep() +// .setMaxSnapshotAgeMs() + .commit()); + + + var appendFiles = transaction.newAppend(); + + if (branchOptions.isPresent()) { + appendFiles = appendFiles.toBranch(branchOptions.get().getBranch()); + } + + Arrays.stream(writeResult.dataFiles()).forEach(appendFiles::appendFile); + appendFiles.commit(); + transaction.commitTransaction(); + return false; + } + + public Schema getIcebergSchema(String schemaString) { + var avroSchemaParser = new org.apache.avro.Schema.Parser(); + var avroSchema = avroSchemaParser.parse(schemaString); + return AvroSchemaUtil.toIceberg(avroSchema); + } + + + private String getBranchNameFromIdempotencyKey(String idempotencyKey) { + return String.format("__peerdb-idem-%s", idempotencyKey); + } +} diff --git a/flow-jvm/src/main/java/io/peerdb/flow/jvm/iceberg/writer/AppendFilesWriter.java b/flow-jvm/src/main/java/io/peerdb/flow/jvm/iceberg/writer/AppendFilesWriter.java new file mode 100644 index 000000000..64388f7bd --- /dev/null +++ b/flow-jvm/src/main/java/io/peerdb/flow/jvm/iceberg/writer/AppendFilesWriter.java @@ -0,0 +1,36 @@ +package io.peerdb.flow.jvm.iceberg.writer; + + +import io.peerdb.flow.jvm.iceberg.avro.AvroIcebergRecordConverter; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.Table; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.io.TaskWriter; + +import java.io.IOException; + +public class AppendFilesWriter implements 
AutoCloseable { + + private final TaskWriter<Record> writer; + private final AvroIcebergRecordConverter converter; + + public AppendFilesWriter(RecordWriterFactory factory, String avroSchema, Table table) { + this.writer = factory.createRecordWriter(table); + this.converter = new AvroIcebergRecordConverter(avroSchema, table.schema(), table.name()); + } + + public void writeAvroBytesRecord(byte[] avroBytes) throws IOException { + var record = converter.toIcebergRecord(avroBytes); + writer.write(record); + } + + + public DataFile[] complete() throws IOException { + return writer.complete().dataFiles(); + } + + @Override + public void close() throws IOException { + writer.close(); + } +}
diff --git a/flow-jvm/src/main/java/io/peerdb/flow/jvm/iceberg/writer/RecordWriterFactory.java b/flow-jvm/src/main/java/io/peerdb/flow/jvm/iceberg/writer/RecordWriterFactory.java new file mode 100644 index 000000000..1e6ba8d66 --- /dev/null +++ b/flow-jvm/src/main/java/io/peerdb/flow/jvm/iceberg/writer/RecordWriterFactory.java @@ -0,0 +1,41 @@ +package io.peerdb.flow.jvm.iceberg.writer; + + +import jakarta.enterprise.context.ApplicationScoped; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.data.GenericAppenderFactory; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.io.OutputFileFactory; +import org.apache.iceberg.io.TaskWriter; +import org.apache.iceberg.io.UnpartitionedWriter; +import org.apache.iceberg.util.PropertyUtil; + +import java.util.UUID; + +import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES; +import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT; + +@ApplicationScoped +public class RecordWriterFactory { + public TaskWriter<Record> createRecordWriter(Table table) { + // TODO add support for partition ID + return createUnpartitionedRecordWriter(table); + } + + + private TaskWriter<Record> createUnpartitionedRecordWriter(Table table) { + var appenderFactory = new GenericAppenderFactory(table.schema(), table.spec()); + var format = FileFormat.fromString(table.properties().getOrDefault(TableProperties.DEFAULT_FILE_FORMAT, TableProperties.DEFAULT_FILE_FORMAT_DEFAULT)); + var outputFileFactory = OutputFileFactory.builderFor(table, 1, System.currentTimeMillis()) + .defaultSpec(table.spec()) + .operationId(UUID.randomUUID().toString()) + .format(format) + .build(); + var targetFileSize = PropertyUtil.propertyAsLong(table.properties(), WRITE_TARGET_FILE_SIZE_BYTES, WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT); + + return new UnpartitionedWriter<>(table.spec(), format, appenderFactory, outputFileFactory, table.io(), targetFileSize); + } + +}
diff --git a/flow-jvm/src/main/resources/application.properties b/flow-jvm/src/main/resources/application.properties new file mode 100644 index 000000000..f0b85408f --- /dev/null +++ b/flow-jvm/src/main/resources/application.properties @@ -0,0 +1 @@ +# USE application.yaml instead
diff --git a/flow-jvm/src/main/resources/application.yaml b/flow-jvm/src/main/resources/application.yaml new file mode 100644 index 000000000..4c6b7ffe6 --- /dev/null +++ b/flow-jvm/src/main/resources/application.yaml @@ -0,0 +1,36 @@ +quarkus: + generate-code: + grpc: + scan-for-imports: all + grpc: + server: + port: ${FLOW_JVM_PORT:9801} + + # 512 MB + max-inbound-message-size: 536870912 + + use-separate-server: false + log: + level: INFO + category: + "io.peerdb": + level: ${PEERDB_LOG_LEVEL:INFO} + # This is build time + min-level: 
${PEERDB_MIN_LOG_LEVEL:INFO} + + console: + level: ${PEERDB_LOG_LEVEL:INFO} + http: + port: ${FLOW_JVM_PORT:9801} + + +"%dev": + quarkus: + log: + console: + json: false +"%test": + quarkus: + log: + console: + json: false diff --git a/flow/connectors/core.go b/flow/connectors/core.go index 3a69765e4..1d290878b 100644 --- a/flow/connectors/core.go +++ b/flow/connectors/core.go @@ -14,6 +14,7 @@ import ( connclickhouse "github.com/PeerDB-io/peer-flow/connectors/clickhouse" connelasticsearch "github.com/PeerDB-io/peer-flow/connectors/connelasticsearch" conneventhub "github.com/PeerDB-io/peer-flow/connectors/eventhub" + conniceberg "github.com/PeerDB-io/peer-flow/connectors/iceberg" connkafka "github.com/PeerDB-io/peer-flow/connectors/kafka" connmysql "github.com/PeerDB-io/peer-flow/connectors/mysql" connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres" @@ -347,6 +348,12 @@ func LoadPeer(ctx context.Context, catalogPool *pgxpool.Pool, peerName string) ( return nil, fmt.Errorf("failed to unmarshal Elasticsearch config: %w", err) } peer.Config = &protos.Peer_ElasticsearchConfig{ElasticsearchConfig: &config} + case protos.DBType_ICEBERG: + var config protos.IcebergConfig + if err := proto.Unmarshal(peerOptions, &config); err != nil { + return nil, fmt.Errorf("failed to unmarshal Iceberg config: %w", err) + } + peer.Config = &protos.Peer_IcebergConfig{IcebergConfig: &config} default: return nil, fmt.Errorf("unsupported peer type: %s", peer.Type) } @@ -378,6 +385,8 @@ func GetConnector(ctx context.Context, config *protos.Peer) (Connector, error) { return connpubsub.NewPubSubConnector(ctx, inner.PubsubConfig) case *protos.Peer_ElasticsearchConfig: return connelasticsearch.NewElasticsearchConnector(ctx, inner.ElasticsearchConfig) + case *protos.Peer_IcebergConfig: + return conniceberg.NewIcebergConnector(ctx, inner.IcebergConfig) default: return nil, errors.ErrUnsupported } @@ -428,6 +437,7 @@ var ( _ CDCSyncConnector = &conns3.S3Connector{} _ CDCSyncConnector = &connclickhouse.ClickhouseConnector{} _ CDCSyncConnector = &connelasticsearch.ElasticsearchConnector{} + _ CDCSyncConnector = &conniceberg.IcebergConnector{} _ CDCSyncPgConnector = &connpostgres.PostgresConnector{} @@ -459,6 +469,7 @@ var ( _ QRepSyncConnector = &conns3.S3Connector{} _ QRepSyncConnector = &connclickhouse.ClickhouseConnector{} _ QRepSyncConnector = &connelasticsearch.ElasticsearchConnector{} + _ QRepSyncConnector = &conniceberg.IcebergConnector{} _ QRepSyncPgConnector = &connpostgres.PostgresConnector{} @@ -473,6 +484,7 @@ var ( _ ValidationConnector = &connclickhouse.ClickhouseConnector{} _ ValidationConnector = &connbigquery.BigQueryConnector{} _ ValidationConnector = &conns3.S3Connector{} + _ ValidationConnector = &conniceberg.IcebergConnector{} _ Connector = &connmysql.MySqlConnector{} ) diff --git a/flow/connectors/iceberg/iceberg.go b/flow/connectors/iceberg/iceberg.go new file mode 100644 index 000000000..9f03d2f14 --- /dev/null +++ b/flow/connectors/iceberg/iceberg.go @@ -0,0 +1,336 @@ +package iceberg + +import ( + "context" + "fmt" + "log/slog" + "time" + + "github.com/jackc/pgx/v5/pgxpool" + "go.temporal.io/sdk/log" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + + "github.com/PeerDB-io/peer-flow/alerting" + metadataStore "github.com/PeerDB-io/peer-flow/connectors/external_metadata" + "github.com/PeerDB-io/peer-flow/connectors/utils" + "github.com/PeerDB-io/peer-flow/datatypes" + "github.com/PeerDB-io/peer-flow/generated/protos" + "github.com/PeerDB-io/peer-flow/logger" + 
"github.com/PeerDB-io/peer-flow/model" + "github.com/PeerDB-io/peer-flow/model/qvalue" + "github.com/PeerDB-io/peer-flow/otel_metrics/peerdb_guages" + "github.com/PeerDB-io/peer-flow/peerdbenv" +) + +type IcebergConnector struct { + *metadataStore.PostgresMetadata + logger log.Logger + config *protos.IcebergConfig + grpcConnection *grpc.ClientConn + proxyClient protos.IcebergProxyServiceClient +} + +func (c *IcebergConnector) GetTableSchema( + ctx context.Context, + req *protos.GetTableSchemaBatchInput, +) (*protos.GetTableSchemaBatchOutput, error) { + // TODO implement me + panic("implement me") +} + +func (c *IcebergConnector) EnsurePullability( + ctx context.Context, + req *protos.EnsurePullabilityBatchInput, +) (*protos.EnsurePullabilityBatchOutput, error) { + // TODO implement me + panic("implement me") +} + +func (c *IcebergConnector) ExportTxSnapshot(ctx context.Context) (*protos.ExportTxSnapshotOutput, any, error) { + // TODO implement me + panic("implement me") +} + +func (c *IcebergConnector) FinishExport(a any) error { + // TODO implement me + panic("implement me") +} + +func (c *IcebergConnector) SetupReplConn(ctx context.Context) error { + // TODO implement me + panic("implement me") +} + +func (c *IcebergConnector) ReplPing(ctx context.Context) error { + // TODO implement me + panic("implement me") +} + +func (c *IcebergConnector) UpdateReplStateLastOffset(lastOffset int64) { + // TODO implement me + panic("implement me") +} + +func (c *IcebergConnector) PullFlowCleanup(ctx context.Context, jobName string) error { + // TODO implement me + panic("implement me") +} + +func (c *IcebergConnector) HandleSlotInfo( + ctx context.Context, + alerter *alerting.Alerter, + catalogPool *pgxpool.Pool, + slotName string, + peerName string, + slotMetricGuages peerdb_guages.SlotMetricGuages, +) error { + // TODO implement me + panic("implement me") +} + +func (c *IcebergConnector) GetSlotInfo(ctx context.Context, slotName string) ([]*protos.SlotInfo, error) { + // TODO implement me + panic("implement me") +} + +func (c *IcebergConnector) AddTablesToPublication(ctx context.Context, req *protos.AddTablesToPublicationInput) error { + // TODO implement me + panic("implement me") +} + +func (c *IcebergConnector) PullRecords( + ctx context.Context, + catalogPool *pgxpool.Pool, + req *model.PullRecordsRequest[model.RecordItems], +) error { + // TODO implement me + panic("implement me") +} + +func (c *IcebergConnector) StartSetupNormalizedTables(ctx context.Context) (any, error) { + // TODO might be better to do all tables in 1 go + return nil, nil +} + +func (c *IcebergConnector) CleanupSetupNormalizedTables(ctx context.Context, tx any) {} + +func (c *IcebergConnector) FinishSetupNormalizedTables(ctx context.Context, tx any) error { + return nil +} + +func (c *IcebergConnector) SyncFlowCleanup(ctx context.Context, jobName string) error { + err := c.PostgresMetadata.SyncFlowCleanup(ctx, jobName) + if err != nil { + return fmt.Errorf("unable to clear metadata for sync flow cleanup : %w", err) + } + // TODO implement this + c.logger.Debug("SyncFlowCleanup for Iceberg is a no-op") + return nil +} + +func (c *IcebergConnector) SetupNormalizedTable( + ctx context.Context, + tx any, + tableIdentifier string, + tableSchema *protos.TableSchema, + softDeleteColName string, + syncedAtColName string, +) (bool, error) { + primaryKeyColumns := make(map[string]struct{}, len(tableSchema.PrimaryKeyColumns)) + for _, col := range tableSchema.PrimaryKeyColumns { + primaryKeyColumns[col] = struct{}{} + } + + qFields := 
make([]qvalue.QField, len(tableSchema.Columns)) + for i, fieldDescription := range tableSchema.Columns { + colName := fieldDescription.Name + qValueKind := qvalue.QValueKind(fieldDescription.Type) + var precision, scale int16 + if qValueKind == qvalue.QValueKindNumeric { + precision, scale = datatypes.ParseNumericTypmod(fieldDescription.TypeModifier) + } + + _, isPrimaryKey := primaryKeyColumns[colName] + + qField := qvalue.QField{ + Name: colName, + Type: qValueKind, + Precision: precision, + Scale: scale, + Nullable: !isPrimaryKey, + } + qFields[i] = qField + } + + qFields = addPeerMetaColumns(qFields, softDeleteColName, syncedAtColName) + + avroSchema, err := getAvroSchema(tableIdentifier, qvalue.NewQRecordSchema(qFields)) + if err != nil { + return false, err + } + + // TODO save to a buffer and call when Finish is called + // TODO maybe later migrate to a streaming rpc with transaction support + tableResponse, err := c.proxyClient.CreateTable(ctx, &protos.CreateTableRequest{ + TableInfo: &protos.TableInfo{ + Namespace: nil, + TableName: tableIdentifier, + IcebergCatalog: c.config.CatalogConfig, + PrimaryKey: tableSchema.GetPrimaryKeyColumns(), + }, + Schema: avroSchema.Schema, + }) + if err != nil { + return false, err + } + c.logger.Debug("Created iceberg table", slog.String("table", tableResponse.TableName)) + // TODO need to re-enable this and see why it is failing + // if tableResponse.TableName != tableIdentifier { + // return false, fmt.Errorf("created table name mismatch: %s != %s", tableResponse.TableName, tableIdentifier) + //} + return true, nil +} + +func addPeerMetaColumns(qFields []qvalue.QField, softDeleteColName string, syncedAtColName string) []qvalue.QField { + qFields = append(qFields, + qvalue.QField{ + Name: softDeleteColName, + Type: qvalue.QValueKindBoolean, + Nullable: true, + }, qvalue.QField{ + Name: syncedAtColName, + Type: qvalue.QValueKindTimestampTZ, + Nullable: true, + }) + return qFields +} + +func NewIcebergConnector( + ctx context.Context, + config *protos.IcebergConfig, +) (*IcebergConnector, error) { + logger := logger.LoggerFromCtx(ctx) + conn, err := grpc.NewClient( + peerdbenv.PeerDBFlowJvmAddress(), + grpc.WithTransportCredentials(insecure.NewCredentials()), + ) + if err != nil { + return nil, fmt.Errorf("failed to connect to Iceberg proxy: %w", err) + } + client := protos.NewIcebergProxyServiceClient(conn) + + pgMetadata, err := metadataStore.NewPostgresMetadata(ctx) + if err != nil { + logger.Error("failed to create postgres metadata store", "error", err) + return nil, err + } + + return &IcebergConnector{ + PostgresMetadata: pgMetadata, + logger: logger, + config: config, + grpcConnection: conn, + proxyClient: client, + }, nil +} + +func (c *IcebergConnector) CreateRawTable(_ context.Context, req *protos.CreateRawTableInput) (*protos.CreateRawTableOutput, error) { + c.logger.Info("CreateRawTable for Iceberg is a no-op") + return nil, nil +} + +func (c *IcebergConnector) Close() error { + return c.grpcConnection.Close() +} + +func (c *IcebergConnector) ValidateCheck(ctx context.Context) error { + // Create a table with a random name based on current time + // TODO ask for namespace in the peer settings and use that instead of __peerdb_test + // Can also ask for a boolean if provided namespace is to be created or not + tableName := fmt.Sprintf("__peerdb_test_%d.__peerdb_test_flow_%d", time.Now().Unix(), time.Now().Unix()) + c.logger.Debug("Will try to create iceberg table", "table", tableName) + _, err := c.proxyClient.CreateTable(ctx, + 
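// Validation round-trips through the JVM proxy: create a throwaway test table here, then drop it with purge below.
+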
&protos.CreateTableRequest{ + TableInfo: &protos.TableInfo{ + Namespace: nil, + TableName: tableName, + IcebergCatalog: c.config.CatalogConfig, + PrimaryKey: nil, + }, + Schema: `{ + "type": "record", + "name": "TestObject", + "namespace": "", + "fields": [ + { + "name": "hello", + "type": [ + "null", + "int" + ], + "default": null + }, + { + "name": "some", + "type": [ + "null", + "string" + ], + "default": null + } + ] +}`, + }) + if err != nil { + return err + } + c.logger.Debug("Created iceberg table, will try to drop it now", "table", tableName) + dropTable, err := c.proxyClient.DropTable(ctx, + &protos.DropTableRequest{ + TableInfo: &protos.TableInfo{ + Namespace: nil, + TableName: tableName, + IcebergCatalog: c.config.CatalogConfig, + PrimaryKey: nil, + }, + Purge: true, + }, + ) + if err != nil { + return err + } + if !dropTable.Success { + return fmt.Errorf("failed to drop table %s", tableName) + } + c.logger.Debug("Dropped iceberg table", slog.String("table", tableName)) + return nil +} + +func (c *IcebergConnector) ConnectionActive(ctx context.Context) error { + // TODO implement this for iceberg + return nil +} + +func (c *IcebergConnector) SyncRecords(ctx context.Context, req *model.SyncRecordsRequest[model.RecordItems]) (*model.SyncResponse, error) { + tableNameRowsMapping := utils.InitialiseTableRowsMap(req.TableMappings) + + lastCheckpoint := req.Records.GetLastCheckpoint() + err := c.FinishBatch(ctx, req.FlowJobName, req.SyncBatchID, lastCheckpoint) + if err != nil { + c.logger.Error("failed to increment id", "error", err) + return nil, err + } + numRecords := 0 + return &model.SyncResponse{ + LastSyncedCheckpointID: lastCheckpoint, + NumRecordsSynced: int64(numRecords), + TableNameRowsMapping: tableNameRowsMapping, + TableSchemaDeltas: req.Records.SchemaDeltas, + }, nil +} + +func (c *IcebergConnector) ReplayTableSchemaDeltas(_ context.Context, flowJobName string, schemaDeltas []*protos.TableSchemaDelta) error { + c.logger.Info("ReplayTableSchemaDeltas for Iceberg is a no-op") + return nil +} diff --git a/flow/connectors/iceberg/qrep.go b/flow/connectors/iceberg/qrep.go new file mode 100644 index 000000000..6ce51f9ec --- /dev/null +++ b/flow/connectors/iceberg/qrep.go @@ -0,0 +1,238 @@ +package iceberg + +import ( + "context" + "fmt" + "log/slog" + "time" + + "github.com/linkedin/goavro/v2" + + "github.com/PeerDB-io/peer-flow/generated/protos" + "github.com/PeerDB-io/peer-flow/logger" + "github.com/PeerDB-io/peer-flow/model" + "github.com/PeerDB-io/peer-flow/model/qvalue" + "github.com/PeerDB-io/peer-flow/peerdbenv/features" +) + +func (c *IcebergConnector) SyncQRepRecords( + ctx context.Context, + config *protos.QRepConfig, + partition *protos.QRepPartition, + stream *model.QRecordStream, +) (int, error) { + if !features.IcebergFeatureStreamingDisabled(ctx) { + return c.streamRecords(ctx, config, partition, stream) + } + return c.sendRecordsJoined(ctx, config, partition, stream) +} + +func (c *IcebergConnector) sendRecordsJoined( + ctx context.Context, + config *protos.QRepConfig, + partition *protos.QRepPartition, + stream *model.QRecordStream, +) (int, error) { + c.logger.Info("[iceberg qrep.go]:sending records joined") + schema := stream.Schema() + + schema.Fields = addPeerMetaColumns(schema.Fields, config.SoftDeleteColName, config.SyncedAtColName) + dstTableName := config.DestinationTableIdentifier + + avroSchema, err := getAvroSchema(dstTableName, schema) + if err != nil { + return 0, err + } + + avroConverter := model.NewQRecordAvroConverter( + avroSchema, + 
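// The target DWH type passed below selects the Iceberg-specific Avro mappings in avro_converter.go (UUIDs as plain strings, timestamps as timestamp-micros with adjust-to-utc).
+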
protos.DBType_ICEBERG, + schema.GetColumnNames(), + logger.LoggerFromCtx(ctx), + ) + codec, err := goavro.NewCodec(avroSchema.Schema) + if err != nil { + return 0, fmt.Errorf("failed to create Avro codec: %w", err) + } + binaryRecords := make([]*protos.InsertRecord, 0) + for record := range stream.Records { + record = append(record, + // Add soft delete + qvalue.QValueBoolean{ + Val: false, + }, // add synced at colname + qvalue.QValueTimestampTZ{ + Val: time.Now(), + }) + + converted, err := avroConverter.Convert(record) + if err != nil { + return 0, err + } + binaryData := make([]byte, 0) + native, err := codec.BinaryFromNative(binaryData, converted) + if err != nil { + return 0, fmt.Errorf("failed to convert Avro map to binary: %w", err) + } + + binaryRecords = append(binaryRecords, &protos.InsertRecord{ + Record: native, + }) + } + + requestIdempotencyKey := fmt.Sprintf("_peerdb_qrep-%s-%s", config.FlowJobName, partition.PartitionId) + + appendRecordsResponse, err := c.proxyClient.AppendRecords(ctx, + &protos.AppendRecordsRequest{ + TableHeader: &protos.AppendRecordTableHeader{ + TableInfo: &protos.TableInfo{ + // Namespace: nil, + TableName: dstTableName, + IcebergCatalog: c.config.CatalogConfig, + // PrimaryKey: nil, + }, + Schema: avroSchema.Schema, + IdempotencyKey: &requestIdempotencyKey, + }, + Records: binaryRecords, + }, + ) + if err != nil { + return 0, err + } + + logger.LoggerFromCtx(ctx).Info("AppendRecordsResponse", slog.Any("response", appendRecordsResponse.Success)) + + err = c.PostgresMetadata.FinishQRepPartition(ctx, partition, config.FlowJobName, time.Now()) + if err != nil { + return 0, err + } + return len(binaryRecords), nil +} + +func (c *IcebergConnector) streamRecords( + ctx context.Context, + config *protos.QRepConfig, + partition *protos.QRepPartition, + stream *model.QRecordStream, +) (int, error) { + c.logger.Info("[iceberg qrep.go]:streaming records") + schema := stream.Schema() + + schema.Fields = addPeerMetaColumns(schema.Fields, config.SoftDeleteColName, config.SyncedAtColName) + dstTableName := config.DestinationTableIdentifier + + avroSchema, err := getAvroSchema(dstTableName, schema) + if err != nil { + return 0, err + } + + avroConverter := model.NewQRecordAvroConverter( + avroSchema, + protos.DBType_ICEBERG, + schema.GetColumnNames(), + logger.LoggerFromCtx(ctx), + ) + codec, err := goavro.NewCodec(avroSchema.Schema) + if err != nil { + return 0, fmt.Errorf("failed to create Avro codec: %w", err) + } + requestIdempotencyKey := fmt.Sprintf("_peerdb_qrep-%s-%s", config.FlowJobName, partition.PartitionId) + + tableHeader := &protos.AppendRecordTableHeader{ + TableInfo: &protos.TableInfo{ + // Namespace: nil, + TableName: dstTableName, + IcebergCatalog: c.config.CatalogConfig, + // PrimaryKey: nil, + }, + Schema: avroSchema.Schema, + IdempotencyKey: &requestIdempotencyKey, + } + recordStream, err := c.proxyClient.StreamingAppendRecords(ctx) + if err != nil { + return 0, err + } + + err = recordStream.Send(&protos.AppendRecordsStreamRequest{ + Command: &protos.AppendRecordsStreamRequest_TableHeader{ + TableHeader: tableHeader, + }, + }) + // TODO what to do with recordStream? 
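+ // Stream protocol: the first message carries the table header (destination table, Avro schema and
+ // per-partition idempotency key); every following message is a single Avro-encoded InsertRecord.
+ // The JVM proxy writes them into data files and commits once the stream is closed (CloseAndRecv).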
+ if err != nil { + return 0, err + } + + recordCount := 0 + for record := range stream.Records { + record = append(record, + // Add soft delete + qvalue.QValueBoolean{ + Val: false, + }, // add synced at colname + qvalue.QValueTimestampTZ{ + Val: time.Now(), + }) + + converted, err := avroConverter.Convert(record) + if err != nil { + return 0, err + } + binaryData := make([]byte, 0) + native, err := codec.BinaryFromNative(binaryData, converted) + if err != nil { + return 0, fmt.Errorf("failed to convert Avro map to binary: %w", err) + } + insertRecord := &protos.InsertRecord{ + Record: native, + } + err = recordStream.Send(&protos.AppendRecordsStreamRequest{ + Command: &protos.AppendRecordsStreamRequest_Record{ + Record: insertRecord, + }, + }) + if err != nil { + return 0, err + } + recordCount++ + } + + c.logger.Info("closing record stream") + appendRecordsStreamResponse, err := recordStream.CloseAndRecv() + if err != nil { + return 0, err + } + logger.LoggerFromCtx(ctx).Info("AppendRecordsResponse", slog.Any("response", appendRecordsStreamResponse.Success)) + + err = c.PostgresMetadata.FinishQRepPartition(ctx, partition, config.FlowJobName, time.Now()) + if err != nil { + return 0, err + } + return recordCount, nil +} + +func getAvroSchema( + dstTableName string, + schema qvalue.QRecordSchema, +) (*model.QRecordAvroSchemaDefinition, error) { + avroSchema, err := model.GetAvroSchemaDefinition(dstTableName, schema, protos.DBType_ICEBERG) + if err != nil { + return nil, fmt.Errorf("failed to define Avro schema: %w", err) + } + + return avroSchema, nil +} + +// Iceberg just sets up destination, not metadata tables +func (c *IcebergConnector) SetupQRepMetadataTables(_ context.Context, config *protos.QRepConfig) error { + c.logger.Info("QRep metadata setup not needed for Iceberg.") + return nil +} + +// Iceberg doesn't check if partition is already synced, but file with same name is overwritten +func (c *IcebergConnector) IsQRepPartitionSynced(ctx context.Context, + config *protos.IsQRepPartitionSyncedInput, +) (bool, error) { + return c.PostgresMetadata.IsQRepPartitionSynced(ctx, config) +} diff --git a/flow/connectors/s3/qrep.go b/flow/connectors/s3/qrep.go index 14c7b31ef..943195b7b 100644 --- a/flow/connectors/s3/qrep.go +++ b/flow/connectors/s3/qrep.go @@ -79,5 +79,6 @@ func (c *S3Connector) SetupQRepMetadataTables(_ context.Context, config *protos. 
func (c *S3Connector) IsQRepPartitionSynced(_ context.Context, config *protos.IsQRepPartitionSyncedInput, ) (bool, error) { + // TODO maybe we should actually check if the file exists instead of retrying blindly return false, nil } diff --git a/flow/connectors/utils/peers.go b/flow/connectors/utils/peers.go index b871e6037..5421622f1 100644 --- a/flow/connectors/utils/peers.go +++ b/flow/connectors/utils/peers.go @@ -85,6 +85,12 @@ func CreatePeerNoValidate( return wrongConfigResponse, nil } innerConfig = esConfigObject.ElasticsearchConfig + case protos.DBType_ICEBERG: + icebergConfigObject, ok := config.(*protos.Peer_IcebergConfig) + if !ok { + return wrongConfigResponse, nil + } + innerConfig = icebergConfigObject.IcebergConfig default: return wrongConfigResponse, nil } @@ -96,9 +102,9 @@ func CreatePeerNoValidate( } _, err := pool.Exec(ctx, ` - INSERT INTO peers (name, type, options) + INSERT INTO peers (name, type, options) VALUES ($1, $2, $3) - ON CONFLICT (name) DO UPDATE + ON CONFLICT (name) DO UPDATE SET type = $2, options = $3`, peer.Name, peerType, encodedConfig, ) diff --git a/flow/model/qvalue/avro_converter.go b/flow/model/qvalue/avro_converter.go index 648a6aa7a..9ce7a6acd 100644 --- a/flow/model/qvalue/avro_converter.go +++ b/flow/model/qvalue/avro_converter.go @@ -50,6 +50,14 @@ type AvroSchemaField struct { LogicalType string `json:"logicalType,omitempty"` } +// AvroSchemaFixed TODO this needs to be studied for Iceberg +type AvroSchemaFixed struct { + Type string `json:"type"` + Name string `json:"name"` + LogicalType string `json:"logicalType,omitempty"` + Size int `json:"size"` +} + func TruncateOrLogNumeric(num decimal.Decimal, precision int16, scale int16, targetDB protos.DBType) (decimal.Decimal, error) { if targetDB == protos.DBType_SNOWFLAKE || targetDB == protos.DBType_BIGQUERY { bidigi := datatypes.CountDigits(num.BigInt()) @@ -81,6 +89,16 @@ func GetAvroSchemaFromQValueKind(kind QValueKind, targetDWH protos.DBType, preci case QValueKindInterval: return "string", nil case QValueKindUUID: + if targetDWH == protos.DBType_ICEBERG { + return "string", nil + // TODO use proper fixed uuids for iceberg as below + // return AvroSchemaFixed{ + // Type: "fixed", + // Size: 16, + // Name: "uuid_fixed_" + name, + // LogicalType: "uuid", + // }, nil + } return AvroSchemaLogical{ Type: "string", LogicalType: "uuid", @@ -120,12 +138,20 @@ func GetAvroSchemaFromQValueKind(kind QValueKind, targetDWH protos.DBType, preci } return "string", nil case QValueKindTimestamp, QValueKindTimestampTZ: - if targetDWH == protos.DBType_CLICKHOUSE { + if targetDWH == protos.DBType_CLICKHOUSE || (targetDWH == protos.DBType_ICEBERG && kind == QValueKindTimestamp) { return AvroSchemaLogical{ Type: "long", LogicalType: "timestamp-micros", }, nil } + if targetDWH == protos.DBType_ICEBERG { + // This is specific to Iceberg, to enable timestamp with timezone + return map[string]interface{}{ + "type": "long", + "logicalType": "timestamp-micros", + "adjust-to-utc": true, + }, nil + } return "string", nil case QValueKindHStore, QValueKindJSON, QValueKindStruct: return "string", nil @@ -351,7 +377,16 @@ func QValueToAvro(value QValue, field *QField, targetDWH protos.DBType, logger l case QValueArrayDate: return c.processArrayDate(v.Val), nil case QValueUUID: - return c.processUUID(v.Val), nil + if c.TargetDWH == protos.DBType_ICEBERG { + // TODO make this a fixed type for iceberg uuids + // return c.processUUID(v.Val, "uuid_fixed_"+field.Name), nil + genUuid, err := uuid.FromBytes(v.Val[:]) + if err != nil 
{ + return nil, fmt.Errorf("failed to convert UUID to string: %w", err) + } + return c.processNullableUnion("string", genUuid.String()) + } + return c.processUUIDString(v.Val), nil case QValueGeography, QValueGeometry, QValuePoint: return c.processGeospatial(v.Value().(string)), nil default: @@ -546,7 +581,7 @@ func (c *QValueAvroConverter) processHStore(hstore string) (interface{}, error) return jsonString, nil } -func (c *QValueAvroConverter) processUUID(byteData [16]byte) interface{} { +func (c *QValueAvroConverter) processUUIDString(byteData [16]byte) interface{} { uuidString := uuid.UUID(byteData).String() if c.Nullable { return goavro.Union("string", uuidString) @@ -554,6 +589,18 @@ func (c *QValueAvroConverter) processUUID(byteData [16]byte) interface{} { return uuidString } +// processUUID converts a UUID byte array to a string or a byte array based on the nullable flag +// TODO it needs to be used once we have fixed types for Iceberg +// +//nolint:unused +func (c *QValueAvroConverter) processUUID(byteData [16]byte, uuidTypeName string) interface{} { + if c.Nullable { + // Slice is required by goavro + return goavro.Union(uuidTypeName, byteData[:]) + } + return byteData +} + func (c *QValueAvroConverter) processGeospatial(geoString string) interface{} { if c.Nullable { return goavro.Union("string", geoString) diff --git a/flow/peerdbenv/config.go b/flow/peerdbenv/config.go index 6c7501b0a..bba3ae2d0 100644 --- a/flow/peerdbenv/config.go +++ b/flow/peerdbenv/config.go @@ -92,3 +92,8 @@ func PeerDBAlertingEmailSenderRegion() string { func PeerDBAlertingEmailSenderReplyToAddresses() string { return GetEnvString("PEERDB_ALERTING_EMAIL_SENDER_REPLY_TO_ADDRESSES", "") } + +// PeerDBFlowJvmAddress is the URL of the gRPC server for the JVM-based proxy, Eg: "localhost:9801" +func PeerDBFlowJvmAddress() string { + return GetEnvString("PEERDB_FLOW_JVM_ADDRESS", "") +} diff --git a/flow/peerdbenv/features/iceberg.go b/flow/peerdbenv/features/iceberg.go new file mode 100644 index 000000000..4ece9c001 --- /dev/null +++ b/flow/peerdbenv/features/iceberg.go @@ -0,0 +1,21 @@ +package features + +import ( + "context" + "log/slog" + "strconv" + + "github.com/PeerDB-io/peer-flow/logger" + "github.com/PeerDB-io/peer-flow/peerdbenv" +) + +func IcebergFeatureStreamingDisabled(ctx context.Context) bool { + strValue := peerdbenv.GetEnvString("ICEBERG_FEATURE_STREAMING_DISABLED", "false") + value, err := strconv.ParseBool(strValue) + if err != nil { + // only log and return false + logger.LoggerFromCtx(ctx).Error("Failed to get ICEBERG_FEATURE_STREAMING_DISABLED", slog.Any("error", err)) + return false + } + return value +} diff --git a/nexus/analyzer/src/lib.rs b/nexus/analyzer/src/lib.rs index fbb4951d5..cb7c9fd76 100644 --- a/nexus/analyzer/src/lib.rs +++ b/nexus/analyzer/src/lib.rs @@ -955,5 +955,9 @@ fn parse_db_options(db_type: DbType, with_options: &[SqlOption]) -> anyhow::Resu .and_then(|s| s.parse::().ok()) .unwrap_or_default(), }), + // TODO complete this for iceberg once finalized + DbType::Iceberg => Config::IcebergConfig(pt::peerdb_peers::IcebergConfig { + catalog_config: None, + }), })) } diff --git a/nexus/catalog/src/lib.rs b/nexus/catalog/src/lib.rs index f25754496..f18160af2 100644 --- a/nexus/catalog/src/lib.rs +++ b/nexus/catalog/src/lib.rs @@ -104,6 +104,7 @@ impl Catalog { elasticsearch_config.encode_to_vec() } Config::MysqlConfig(mysql_config) => mysql_config.encode_to_vec(), + Config::IcebergConfig(iceberg_config) => {iceberg_config.encode_to_vec()} } }; @@ -351,6 +352,11 @@ impl 
Catalog { pt::peerdb_peers::MySqlConfig::decode(options).with_context(err)?; Config::MysqlConfig(mysql_config) } + DbType::Iceberg => { + let iceberg_config = + pt::peerdb_peers::IcebergConfig::decode(options).with_context(err)?; + Config::IcebergConfig(iceberg_config) + } }) } else { None diff --git a/protos/flow-jvm.proto b/protos/flow-jvm.proto new file mode 100644 index 000000000..33ba2d916 --- /dev/null +++ b/protos/flow-jvm.proto @@ -0,0 +1,123 @@ +syntax = "proto3"; + +package peerdb_flow_jvm; + +import "peers.proto"; + +option java_multiple_files = true; +option java_package = "io.peerdb.flow.jvm.grpc"; + +service IcebergProxyService { + rpc CreateTable(CreateTableRequest) returns (CreateTableResponse) {} + + rpc DropTable(DropTableRequest) returns (DropTableResponse) {} + + rpc CountRecords(CountRecordRequest) returns (CountRecordResponse) {} + + rpc InsertChanges(InsertChangesRequest) returns (InsertChangesResponse) {} + + rpc AppendRecords(AppendRecordsRequest) returns (AppendRecordsResponse) {} + + rpc StreamingAppendRecords(stream AppendRecordsStreamRequest) returns (AppendRecordsStreamResponse) {} +} + +message TableInfo { + repeated string namespace = 1; + string table_name = 2; + peerdb_peers.IcebergCatalog iceberg_catalog = 3; + repeated string primary_key = 4; +} + +message CreateTableRequest { + TableInfo table_info = 1; + string schema = 2; +} + +message CreateTableResponse { + string table_name = 1; +} + +message DropTableRequest { + TableInfo table_info = 1; + bool purge = 2; +} + +message DropTableResponse { + bool success = 1; +} + +message CountRecordRequest { + TableInfo table_info = 1; +} + +message CountRecordResponse { + int64 count = 1; +} + +message BranchOptions { + string branch = 1; + BranchCreateConflictPolicy branch_create_conflict_policy = 2; +} + +enum BranchCreateConflictPolicy { + ERROR = 0; + DROP = 1; + IGNORE = 2; +} + +message InsertChangesRequest { + TableInfo table_info = 1; + string schema = 2; + repeated RecordChange changes = 3; + optional BranchOptions branch_options = 4; +} + +message InsertRecord { + bytes record = 1; +} + +message DeleteRecord { + bytes record = 1; +} + +message UpdateRecord { + bytes record = 1; +} + +message RecordChange { + oneof change { + InsertRecord insert = 1; + DeleteRecord delete = 2; + UpdateRecord update = 3; + } +} +message InsertChangesResponse { + bool success = 1; +} + +message AppendRecordTableHeader { + TableInfo table_info = 1; + string schema = 2; + optional string idempotency_key = 3; +} + +message AppendRecordsRequest { + AppendRecordTableHeader table_header = 1; + repeated InsertRecord records = 2; +} + +message AppendRecordsResponse { + bool success = 1; +} + + +message AppendRecordsStreamRequest { + oneof command { + AppendRecordTableHeader table_header = 1; + InsertRecord record = 2; + } +} + +message AppendRecordsStreamResponse { + bool success = 1; +} diff --git a/protos/peers.proto b/protos/peers.proto index eb7ac528b..845404998 100644 --- a/protos/peers.proto +++ b/protos/peers.proto @@ -2,6 +2,9 @@ syntax = "proto3"; package peerdb_peers; +option java_multiple_files = true; +option java_package = "io.peerdb.flow.peers"; + message SSHConfig { string host = 1; uint32 port = 2; @@ -102,7 +105,7 @@ message S3Config { optional string endpoint = 6; } -message ClickhouseConfig{ +message ClickhouseConfig { string host = 1; uint32 port = 2; string user = 3; @@ -160,6 +163,72 @@ message ElasticsearchConfig { optional string api_key = 5; } +message IcebergConfig { + IcebergCatalog catalog_config = 1; 
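+ // catalog_config picks one concrete catalog implementation (hive/hadoop/rest/glue/jdbc/nessie)
+ // via the IcebergCatalog oneof below, plus the common settings (name, uri, warehouse_location)
+ // and the file IO configuration (S3 or GCS).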
+} + +message IcebergS3IoConfig { + optional string access_key_id = 1; + optional string secret_access_key = 2; + optional string endpoint = 3; + // Set to true to use for services like MinIO + optional bool path_style_access = 4; + // For enabling cross region bucket access, works when https://github.com/apache/iceberg/issues/9785 is resolved + optional string cross_region_access_enabled = 5; +} + +message IcebergGCSIoConfig { + optional string project_id = 1; + optional string bucket = 2; + optional string credentials = 3; +} + +message IcebergIOConfig { + oneof config { + IcebergS3IoConfig s3 = 1; + IcebergGCSIoConfig gcs = 2; + } +} + +message CommonIcebergCatalog { + string name = 1; + string uri = 2; + string warehouse_location = 3; + optional int32 client_pool_size = 4; + optional bool cache_enabled = 5; + map hadoop_properties = 6; +} + +message HiveIcebergCatalog {} + +message HadoopIcebergCatalog {} + +message RestIcebergCatalog {} + +message GlueIcebergCatalog {} + +message JdbcIcebergCatalog { + optional string user = 1; + optional string password = 2; + optional bool use_ssl = 3; + optional bool verify_server_certificate = 4; +} + +message NessieIcebergCatalog {} + +message IcebergCatalog { + CommonIcebergCatalog common_config = 1; + peerdb_peers.IcebergIOConfig io_config = 2; + oneof config { + HiveIcebergCatalog hive = 3; + HadoopIcebergCatalog hadoop = 4; + RestIcebergCatalog rest = 5; + GlueIcebergCatalog glue = 6; + JdbcIcebergCatalog jdbc = 7; + NessieIcebergCatalog nessie = 8; + } +} + enum DBType { BIGQUERY = 0; SNOWFLAKE = 1; @@ -173,6 +242,7 @@ enum DBType { PUBSUB = 10; EVENTHUBS = 11; ELASTICSEARCH = 12; + ICEBERG = 13; } message Peer { @@ -191,5 +261,6 @@ message Peer { PubSubConfig pubsub_config = 13; ElasticsearchConfig elasticsearch_config = 14; MySqlConfig mysql_config = 15; + IcebergConfig iceberg_config = 16; } } diff --git a/stacks/jvm.Dockerfile b/stacks/jvm.Dockerfile new file mode 100644 index 000000000..d7b5a9057 --- /dev/null +++ b/stacks/jvm.Dockerfile @@ -0,0 +1,116 @@ +#### +# This Dockerfile is used in order to build a container that runs the Quarkus application in JVM mode +# +# If you want to include the debug port into your docker image +# you will have to expose the debug port (default 5005 being the default) like this : EXPOSE 8080 5005. +# Additionally you will have to set -e JAVA_DEBUG=true and -e JAVA_DEBUG_PORT=*:5005 +# when running the container +# +# Then run the container using : +# +# docker run -i --rm -p 8080:8080 quarkus/flow-jvm-jvm +# +# This image uses the `run-java.sh` script to run the application. +# This scripts computes the command line to execute your Java application, and +# includes memory/GC tuning. +# You can configure the behavior using the following environment properties: +# - JAVA_OPTS: JVM options passed to the `java` command (example: "-verbose:class") +# - JAVA_OPTS_APPEND: User specified Java options to be appended to generated options +# in JAVA_OPTS (example: "-Dsome.property=foo") +# - JAVA_MAX_MEM_RATIO: Is used when no `-Xmx` option is given in JAVA_OPTS. This is +# used to calculate a default maximal heap memory based on a containers restriction. +# If used in a container without any memory constraints for the container then this +# option has no effect. If there is a memory constraint then `-Xmx` is set to a ratio +# of the container available memory as set here. The default is `50` which means 50% +# of the available memory is used as an upper boundary. 
You can skip this mechanism by +# setting this value to `0` in which case no `-Xmx` option is added. +# - JAVA_INITIAL_MEM_RATIO: Is used when no `-Xms` option is given in JAVA_OPTS. This +# is used to calculate a default initial heap memory based on the maximum heap memory. +# If used in a container without any memory constraints for the container then this +# option has no effect. If there is a memory constraint then `-Xms` is set to a ratio +# of the `-Xmx` memory as set here. The default is `25` which means 25% of the `-Xmx` +# is used as the initial heap size. You can skip this mechanism by setting this value +# to `0` in which case no `-Xms` option is added (example: "25") +# - JAVA_MAX_INITIAL_MEM: Is used when no `-Xms` option is given in JAVA_OPTS. +# This is used to calculate the maximum value of the initial heap memory. If used in +# a container without any memory constraints for the container then this option has +# no effect. If there is a memory constraint then `-Xms` is limited to the value set +# here. The default is 4096MB which means the calculated value of `-Xms` never will +# be greater than 4096MB. The value of this variable is expressed in MB (example: "4096") +# - JAVA_DIAGNOSTICS: Set this to get some diagnostics information to standard output +# when things are happening. This option, if set to true, will set +# `-XX:+UnlockDiagnosticVMOptions`. Disabled by default (example: "true"). +# - JAVA_DEBUG: If set remote debugging will be switched on. Disabled by default (example: +# true"). +# - JAVA_DEBUG_PORT: Port used for remote debugging. Defaults to 5005 (example: "8787"). +# - CONTAINER_CORE_LIMIT: A calculated core limit as described in +# https://www.kernel.org/doc/Documentation/scheduler/sched-bwc.txt. (example: "2") +# - CONTAINER_MAX_MEMORY: Memory limit given to the container (example: "1024"). +# - GC_MIN_HEAP_FREE_RATIO: Minimum percentage of heap free after GC to avoid expansion. +# (example: "20") +# - GC_MAX_HEAP_FREE_RATIO: Maximum percentage of heap free after GC to avoid shrinking. +# (example: "40") +# - GC_TIME_RATIO: Specifies the ratio of the time spent outside the garbage collection. +# (example: "4") +# - GC_ADAPTIVE_SIZE_POLICY_WEIGHT: The weighting given to the current GC time versus +# previous GC times. (example: "90") +# - GC_METASPACE_SIZE: The initial metaspace size. (example: "20") +# - GC_MAX_METASPACE_SIZE: The maximum metaspace size. (example: "100") +# - GC_CONTAINER_OPTIONS: Specify Java GC to use. The value of this variable should +# contain the necessary JRE command-line options to specify the required GC, which +# will override the default of `-XX:+UseParallelGC` (example: -XX:+UseG1GC). +# - HTTPS_PROXY: The location of the https proxy. (example: "myuser@127.0.0.1:8080") +# - HTTP_PROXY: The location of the http proxy. (example: "myuser@127.0.0.1:8080") +# - NO_PROXY: A comma separated lists of hosts, IP addresses or domains that can be +# accessed directly. 
(example: "foo.example.com,bar.example.com") +# +### +FROM gradle:8.8.0-jdk21 as builder + + + +WORKDIR /home/gradle/work + +# Copy the project files +COPY --chown=gradle:gradle flow-jvm/gradle gradle +COPY --chown=gradle:gradle flow-jvm/*.gradle flow-jvm/gradle.properties ./ + +# Build once to cache dependencies +RUN gradle build -x quarkusGenerateCode -x quarkusGenerateCodeDev --stacktrace --info + +# Gathers all proto files +COPY --chown=gradle:gradle protos/*.proto ../protos/ +# Adds any extra configuration files if needed for build +COPY --chown=gradle:gradle flow-jvm/src/main/resources ./src/main/resources + +# Only generate the code from the proto files +RUN gradle quarkusGenerateCode -x quarkusGenerateCodeDev --stacktrace --info + + +COPY --chown=gradle:gradle flow-jvm . +# Finally build the project +RUN gradle build --stacktrace --info + + +FROM registry.access.redhat.com/ubi8/openjdk-21:1.19 as runner + + + +ENV LANGUAGE='en_US:en' + + +# We make four distinct layers so if there are application changes the library layers can be re-used +COPY --from=builder --chown=185 /home/gradle/work/build/quarkus-app/lib/ /deployments/lib/ +COPY --from=builder --chown=185 /home/gradle/work/build/quarkus-app/*.jar /deployments/ +COPY --from=builder --chown=185 /home/gradle/work/build/quarkus-app/app/ /deployments/app/ +COPY --from=builder --chown=185 /home/gradle/work/build/quarkus-app/quarkus/ /deployments/quarkus/ + +# GRPC Port can be changed via FLOW_JVM_PORT env var +EXPOSE 9801 +USER 185 +ENV JAVA_OPTS_APPEND="-Dquarkus.http.host=0.0.0.0 -Djava.util.logging.manager=org.jboss.logmanager.LogManager" +ENV JAVA_APP_JAR="/deployments/quarkus-run.jar" +ENV PEERDB_LOG_LEVEL="INFO" + +ENTRYPOINT [ "/opt/jboss/container/java/run/run-java.sh" ] + diff --git a/ui/app/api/peers/getTruePeer.ts b/ui/app/api/peers/getTruePeer.ts index 55435a30d..0c2ebfc10 100644 --- a/ui/app/api/peers/getTruePeer.ts +++ b/ui/app/api/peers/getTruePeer.ts @@ -4,6 +4,7 @@ import { ClickhouseConfig, ElasticsearchConfig, EventHubGroupConfig, + IcebergConfig, KafkaConfig, MySqlConfig, Peer, @@ -54,6 +55,9 @@ export const getTruePeer = (peer: CatalogPeer) => { case 12: newPeer.elasticsearchConfig = ElasticsearchConfig.decode(options); break; + case 13: + newPeer.icebergConfig = IcebergConfig.decode(options); + break; default: return newPeer; } diff --git a/ui/app/api/peers/route.ts b/ui/app/api/peers/route.ts index 607f3658a..dd990df53 100644 --- a/ui/app/api/peers/route.ts +++ b/ui/app/api/peers/route.ts @@ -10,6 +10,7 @@ import { DBType, ElasticsearchConfig, EventHubGroupConfig, + IcebergConfig, KafkaConfig, Peer, PostgresConfig, @@ -89,6 +90,12 @@ const constructPeer = ( type: DBType.ELASTICSEARCH, elasticsearchConfig: config as ElasticsearchConfig, }; + case 'ICEBERG': + return { + name, + type: DBType.ICEBERG, + icebergConfig: config as IcebergConfig, + }; default: return; } diff --git a/ui/app/dto/PeersDTO.ts b/ui/app/dto/PeersDTO.ts index 81c559e77..6cdf529e6 100644 --- a/ui/app/dto/PeersDTO.ts +++ b/ui/app/dto/PeersDTO.ts @@ -5,6 +5,7 @@ import { ElasticsearchConfig, EventHubConfig, EventHubGroupConfig, + IcebergConfig, KafkaConfig, PostgresConfig, PubSubConfig, @@ -54,7 +55,8 @@ export type PeerConfig = | PubSubConfig | EventHubConfig | EventHubGroupConfig - | ElasticsearchConfig; + | ElasticsearchConfig + | IcebergConfig; export type CatalogPeer = { id: number; name: string; diff --git a/ui/app/peers/create/[peerType]/handlers.ts b/ui/app/peers/create/[peerType]/handlers.ts index 477818244..64a6b9395 100644 --- 
a/ui/app/peers/create/[peerType]/handlers.ts +++ b/ui/app/peers/create/[peerType]/handlers.ts @@ -10,6 +10,7 @@ import { chSchema, ehGroupSchema, esSchema, + iceSchema, kaSchema, peerNameSchema, pgSchema, @@ -77,10 +78,14 @@ const validateFields = ( case 'ELASTICSEARCH': const esConfig = esSchema.safeParse(config); if (!esConfig.success) { - console.log(esConfig.error); validationErr = esConfig.error.issues[0].message; } break; + case 'ICEBERG': + const icebergConfig = iceSchema.safeParse(config); + if (!icebergConfig.success) + validationErr = icebergConfig.error.issues[0].message; + break; default: validationErr = 'Unsupported peer type ' + type; } diff --git a/ui/app/peers/create/[peerType]/helpers/ice.ts b/ui/app/peers/create/[peerType]/helpers/ice.ts new file mode 100644 index 000000000..fc66a31b0 --- /dev/null +++ b/ui/app/peers/create/[peerType]/helpers/ice.ts @@ -0,0 +1,339 @@ +import { PeerSetter } from '@/app/dto/PeersDTO'; +import { + CommonIcebergCatalog, + IcebergCatalog, + IcebergConfig, + IcebergIOConfig, + IcebergS3IoConfig, + JdbcIcebergCatalog, +} from '@/grpc_generated/peers'; +import { PeerSetting } from './common'; + +export const CommonConfigSettings: PeerSetting[] = [ + { + label: 'Catalog name', + stateHandler: (value, setter) => + setter((curr) => { + const currentIcebergConfig = curr as IcebergConfig; + const currentCatalogConfig = + currentIcebergConfig.catalogConfig ?? blankCatalogConfig; + const newCatalogConfig = { + ...currentCatalogConfig, + commonConfig: { + ...(currentCatalogConfig.commonConfig ?? blankCommonConfig), + name: value as string, + }, + }; + return { ...curr, catalogConfig: newCatalogConfig }; + }), + tips: 'Name for the Iceberg Catalog (should be the same as used by the querying engine)', + helpfulLink: + 'https://iceberg.apache.org/docs/1.5.2/configuration/?h=catalog#catalog-properties', + }, + { + label: 'URI', + stateHandler: (value, setter) => { + setter((curr) => { + const currentIcebergConfig = curr as IcebergConfig; + const currentCatalogConfig = + currentIcebergConfig.catalogConfig ?? blankCatalogConfig; + const newCatalogConfig: IcebergCatalog = { + ...currentCatalogConfig, + commonConfig: { + ...(currentCatalogConfig.commonConfig ?? blankCommonConfig), + uri: (value as string) || '', + }, + }; + return { ...curr, catalogConfig: newCatalogConfig }; + }); + }, + tips: 'URI of the catalog (eg thrift://hive-host:9083)', + }, + { + label: 'Warehouse location', + stateHandler: (value, setter) => { + setter((curr) => { + const currentIcebergConfig = curr as IcebergConfig; + const currentCatalogConfig = + currentIcebergConfig.catalogConfig ?? blankCatalogConfig; + const newCatalogConfig: IcebergCatalog = { + ...currentCatalogConfig, + commonConfig: { + ...(currentCatalogConfig.commonConfig ?? blankCommonConfig), + warehouseLocation: value as string, + }, + }; + return { ...curr, catalogConfig: newCatalogConfig }; + }); + }, + tips: 'URI to the warehouse location (eg s3://mybucket/mypath/subpath)', + }, + { + label: 'Client pool size', + stateHandler: (value, setter) => { + setter((curr) => { + const currentIcebergConfig = curr as IcebergConfig; + const currentIcebergCatalog = + currentIcebergConfig.catalogConfig ?? blankCatalogConfig; + const newCatalogConfig: IcebergCatalog = { + ...currentIcebergCatalog, + commonConfig: { + ...(currentIcebergCatalog.commonConfig ?? 
blankCommonConfig), + clientPoolSize: parseInt(value as string), + }, + }; + return { ...curr, catalogConfig: newCatalogConfig }; + }); + }, + tips: 'Number of clients to keep in the pool', + type: 'number', + optional: true, + }, +]; + +export const FileIoSettings: PeerSetting[] = [ + { + label: 'Access Key ID', + stateHandler: (value, setter) => { + setter((curr) => { + const currentIcebergConfig = curr as IcebergConfig; + const currentIcebergCatalog = + currentIcebergConfig.catalogConfig ?? blankCatalogConfig; + const currentIoConfig = currentIcebergCatalog.ioConfig ?? blankIoConfig; + const newFileIoConfig: IcebergCatalog = { + ...currentIcebergCatalog, + ioConfig: { + ...currentIoConfig, + s3: { + ...(currentIoConfig.s3 ?? blankS3IcebergConfig), + accessKeyId: value as string, + }, + }, + }; + return { ...curr, catalogConfig: newFileIoConfig }; + }); + }, + tips: 'The AWS access key ID associated with your account.', + helpfulLink: + 'https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_access-keys.html', + }, + { + label: 'Secret Access Key', + stateHandler: (value, setter) => { + setter((curr) => { + const currentIcebergConfig = curr as IcebergConfig; + const currentIcebergCatalog = + currentIcebergConfig.catalogConfig ?? blankCatalogConfig; + const currentIoConfig = currentIcebergCatalog.ioConfig ?? blankIoConfig; + const newFileIoConfig: IcebergCatalog = { + ...currentIcebergCatalog, + ioConfig: { + ...currentIoConfig, + s3: { + ...(currentIoConfig.s3 ?? blankS3IcebergConfig), + secretAccessKey: value as string, + }, + }, + }; + return { ...curr, catalogConfig: newFileIoConfig }; + }); + }, + tips: 'The AWS secret access key associated with your account.', + helpfulLink: + 'https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_access-keys.html', + }, + { + label: 'Endpoint', + stateHandler: (value, setter) => { + setter((curr) => { + const currentIcebergConfig = curr as IcebergConfig; + const currentIcebergCatalog = + currentIcebergConfig.catalogConfig ?? blankCatalogConfig; + const currentIoConfig = currentIcebergCatalog.ioConfig ?? blankIoConfig; + const newFileIoConfig: IcebergCatalog = { + ...currentIcebergCatalog, + ioConfig: { + ...currentIoConfig, + s3: { + ...(currentIoConfig.s3 ?? blankS3IcebergConfig), + endpoint: value as string, + }, + }, + }; + return { ...curr, catalogConfig: newFileIoConfig }; + }); + }, + tips: 'The endpoint of your S3 bucket. This is optional.', + optional: true, + }, + { + label: 'Path style access', + stateHandler: (value, setter) => { + setter((curr) => { + const currentIcebergConfig = curr as IcebergConfig; + const currentIcebergCatalog = + currentIcebergConfig.catalogConfig ?? blankCatalogConfig; + const currentIoConfig = currentIcebergCatalog.ioConfig ?? blankIoConfig; + const newFileIoConfig: IcebergCatalog = { + ...currentIcebergCatalog, + ioConfig: { + ...currentIoConfig, + s3: { + ...(currentIoConfig.s3 ?? blankS3IcebergConfig), + pathStyleAccess: value as boolean, + }, + }, + }; + return { ...curr, catalogConfig: newFileIoConfig }; + }); + }, + type: 'switch', + tips: 'Set to true to use for services like MinIO. This is optional', + optional: true, + }, +]; + +export const JdbcConfigSettings: PeerSetting[] = [ + { + label: 'User', + stateHandler: (value, setter) => { + setter((curr) => { + const currentIcebergConfig = curr as IcebergConfig; + const currentIcebergCatalog = + currentIcebergConfig.catalogConfig ?? 
blankCatalogConfig;
+      const jdbcCatalog: IcebergCatalog = {
+        ...currentIcebergCatalog,
+        jdbc: {
+          ...(currentIcebergCatalog.jdbc ?? blankJdbcConfig),
+          user: value as string,
+        },
+      };
+      return { ...curr, catalogConfig: jdbcCatalog };
+    });
+  },
+  tips: 'Username for the JDBC connection',
+  helpfulLink: 'https://iceberg.apache.org/docs/1.5.2/jdbc/',
+},
+{
+  label: 'Password',
+  stateHandler: (value, setter) => {
+    setter((curr) => {
+      const currentIcebergConfig = curr as IcebergConfig;
+      const currentIcebergCatalog =
+        currentIcebergConfig.catalogConfig ?? blankCatalogConfig;
+      const jdbcCatalog: IcebergCatalog = {
+        ...currentIcebergCatalog,
+        jdbc: {
+          ...(currentIcebergCatalog.jdbc ?? blankJdbcConfig),
+          password: value as string,
+        },
+      };
+      return { ...curr, catalogConfig: jdbcCatalog };
+    });
+  },
+  tips: 'Password for the JDBC connection',
+  type: 'password',
+},
+{
+  label: 'Use SSL?',
+  stateHandler: (value, setter) => {
+    setter((curr) => {
+      const currentIcebergConfig = curr as IcebergConfig;
+      const currentIcebergCatalog =
+        currentIcebergConfig.catalogConfig ?? blankCatalogConfig;
+      const jdbcCatalog: IcebergCatalog = {
+        ...currentIcebergCatalog,
+        jdbc: {
+          ...(currentIcebergCatalog.jdbc ?? blankJdbcConfig),
+          useSsl: value as boolean,
+        },
+      };
+      return { ...curr, catalogConfig: jdbcCatalog };
+    });
+  },
+  type: 'switch',
+  optional: true,
+  tips: 'Enable SSL for the JDBC connection',
+},
+{
+  label: 'Verify server certificate?',
+  stateHandler: (value, setter) => {
+    setter((curr) => {
+      const currentIcebergConfig = curr as IcebergConfig;
+      const currentIcebergCatalog =
+        currentIcebergConfig.catalogConfig ?? blankCatalogConfig;
+      const jdbcCatalog: IcebergCatalog = {
+        ...currentIcebergCatalog,
+        jdbc: {
+          ...(currentIcebergCatalog.jdbc ?? blankJdbcConfig),
+          verifyServerCertificate: value as boolean,
+        },
+      };
+      return { ...curr, catalogConfig: jdbcCatalog };
+    });
+  },
+  type: 'switch',
+  optional: true,
+  tips: 'Verify the server certificate for the JDBC connection',
+},
+];
+
+export const handleHiveSelection = (setter: PeerSetter) => {
+  setter((curr) => {
+    const currentIcebergConfig = curr as IcebergConfig;
+    const currentCatalogConfig =
+      currentIcebergConfig.catalogConfig ?? blankCatalogConfig;
+    const newCatalogConfig: IcebergCatalog = {
+      ...currentCatalogConfig,
+      jdbc: undefined,
+      hive: {},
+    };
+    return { ...curr, catalogConfig: newCatalogConfig };
+  });
+};
+
+const blankCommonConfig: CommonIcebergCatalog = {
+  name: '',
+  uri: '',
+  warehouseLocation: '',
+  clientPoolSize: undefined,
+  hadoopProperties: {},
+};
+
+const blankS3IcebergConfig: IcebergS3IoConfig = {
+  accessKeyId: '',
+  secretAccessKey: '',
+  endpoint: '',
+  pathStyleAccess: false,
+};
+
+const blankIoConfig: IcebergIOConfig = {
+  s3: {
+    accessKeyId: '',
+    secretAccessKey: '',
+    endpoint: '',
+    pathStyleAccess: false,
+  },
+};
+
+const blankJdbcConfig: JdbcIcebergCatalog = {
+  user: '',
+  password: '',
+  useSsl: false,
+  verifyServerCertificate: false,
+};
+
+const blankCatalogConfig: IcebergCatalog = {
+  commonConfig: blankCommonConfig,
+  ioConfig: blankIoConfig,
+  hive: undefined,
+  hadoop: undefined,
+  rest: undefined,
+  glue: undefined,
+  jdbc: undefined,
+  nessie: undefined,
+};
+
+export const blankIcebergConfig: IcebergConfig = {
+  catalogConfig: blankCatalogConfig,
+};
diff --git a/ui/app/peers/create/[peerType]/page.tsx b/ui/app/peers/create/[peerType]/page.tsx
index d0de7cab0..c9647c0eb 100644
--- a/ui/app/peers/create/[peerType]/page.tsx
+++ b/ui/app/peers/create/[peerType]/page.tsx
@@ -13,9 +13,11 @@ import { notifyErr } from '@/app/utils/notify';
 import TitleCase from '@/app/utils/titlecase';
 import ElasticsearchConfigForm from '@/components/PeerForms/ElasticsearchConfigForm';
 import EventhubsForm from '@/components/PeerForms/Eventhubs/EventhubGroupConfig';
+import IcebergConfigForm from '@/components/PeerForms/Iceberg/IcebergConfig';
 import {
   ElasticsearchConfig,
   EventHubGroupConfig,
+  IcebergConfig,
 } from '@/grpc_generated/peers';
 import { Button } from '@/lib/Button';
 import { ButtonGroup } from '@/lib/ButtonGroup';
@@ -99,6 +101,13 @@ export default function CreateConfig({
           setter={setConfig}
         />
       );
+    case 'ICEBERG':
+      return (
+        <IcebergConfigForm
+          icebergConfig={config as IcebergConfig}
+          setter={setConfig}
+        />
+      );
     default:
       return <></>;
   }
diff --git a/ui/app/peers/create/[peerType]/schema.ts b/ui/app/peers/create/[peerType]/schema.ts
index 498613edb..3ece4a1b2 100644
--- a/ui/app/peers/create/[peerType]/schema.ts
+++ b/ui/app/peers/create/[peerType]/schema.ts
@@ -508,3 +508,81 @@ export const esSchema = z
       message: 'Authentication info not valid',
     }
   );
+
+const CommonIcebergCatalogSchema = z.object({
+  name: z
+    .string({
+      required_error: 'Catalog name is required',
+      invalid_type_error: 'Catalog name must be a string',
+    })
+    .min(1, { message: 'Catalog name must be non-empty' }),
+  uri: z
+    .string({
+      required_error: 'Catalog URI is required',
+      invalid_type_error: 'Catalog URI must be a string',
+    })
+    .url('Catalog URI must be a valid URI')
+    .min(1, { message: 'Catalog URI must be non-empty' }),
+  warehouseLocation: z
+    .string({
+      required_error: 'Warehouse location is required',
+      invalid_type_error: 'Warehouse location must be a string',
+    })
+    .url('Warehouse location must be a valid URI')
+    .min(1, { message: 'Warehouse location must be non-empty' }),
+  clientPoolSize: z.number().optional(),
+  hadoopProperties: z.record(z.string()).optional(),
+});
+
+const IcebergS3IoConfigSchema = z.object({
+  accessKeyId: z
+    .string({
+      required_error: 'Access key ID is required',
+      invalid_type_error: 'Access key ID must be a string',
+    })
+    .min(1, { message: 'Access key ID must be non-empty' }),
+  secretAccessKey: z
+    .string({
+      required_error: 'Secret access key is required',
+      invalid_type_error: 'Secret access key must be a string',
+    })
+    .min(1, {
message: 'Secret access key must be non-empty' }), + endpoint: z.string().optional(), + pathStyleAccess: z.boolean(), +}); + +const IcebergIOConfigSchema = z.object({ + s3: IcebergS3IoConfigSchema, +}); + +const JdbcIcebergCatalogSchema = z.object({ + user: z + .string({ + required_error: 'User is required', + invalid_type_error: 'User must be a string', + }) + .min(1, { message: 'User must be non-empty' }), + password: z + .string({ + required_error: 'Password is required', + invalid_type_error: 'Password must be a string', + }) + .min(1, { message: 'Password must be non-empty' }), + useSsl: z.boolean(), + verifyServerCertificate: z.boolean(), +}); + +const IcebergCatalogSchema = z.object({ + commonConfig: CommonIcebergCatalogSchema, + ioConfig: IcebergIOConfigSchema, + hive: z.unknown().optional(), + hadoop: z.unknown().optional(), + rest: z.unknown().optional(), + glue: z.unknown().optional(), + jdbc: JdbcIcebergCatalogSchema.optional(), + nessie: z.unknown().optional(), +}); + +export const iceSchema = z.object({ + catalogConfig: IcebergCatalogSchema, +}); diff --git a/ui/components/PeerComponent.tsx b/ui/components/PeerComponent.tsx index 5689ea845..cb1602072 100644 --- a/ui/components/PeerComponent.tsx +++ b/ui/components/PeerComponent.tsx @@ -50,6 +50,9 @@ export const DBTypeToImageMapping = (peerType: DBType | string) => { case DBType.ELASTICSEARCH: case 'ELASTICSEARCH': return '/svgs/elasticsearch.svg'; + case DBType.ICEBERG: + case 'ICEBERG': + return '/images/iceberg.png'; default: return '/svgs/pg.svg'; } diff --git a/ui/components/PeerForms/Iceberg/IcebergConfig.tsx b/ui/components/PeerForms/Iceberg/IcebergConfig.tsx new file mode 100644 index 000000000..49aeef44b --- /dev/null +++ b/ui/components/PeerForms/Iceberg/IcebergConfig.tsx @@ -0,0 +1,267 @@ +'use client'; +import { PeerSetter } from '@/app/dto/PeersDTO'; +import { + CommonConfigSettings, + FileIoSettings, + JdbcConfigSettings, + handleHiveSelection, +} from '@/app/peers/create/[peerType]/helpers/ice'; +import { InfoPopover } from '@/components/InfoPopover'; +import { IcebergConfig } from '@/grpc_generated/peers'; +import { Label } from '@/lib/Label'; +import { RowWithSelect, RowWithSwitch, RowWithTextField } from '@/lib/Layout'; +import { Switch } from '@/lib/Switch'; +import { TextField } from '@/lib/TextField'; +import { Tooltip } from '@/lib/Tooltip/Tooltip'; +import { useEffect, useState } from 'react'; +import ReactSelect from 'react-select'; + +interface IcebergConfigProps { + icebergConfig: IcebergConfig; + setter: PeerSetter; +} + +const IcebergConfigForm = ({ icebergConfig, setter }: IcebergConfigProps) => { + const specificCatalogOptions = [ + { value: 'hive', label: 'Hive' }, + { value: 'jdbc', label: 'JDBC' }, + ]; + + const [specificCatalog, setSpecificCatalog] = useState<'hive' | 'jdbc'>( + 'jdbc' + ); + + useEffect(() => { + if (specificCatalog === 'hive') { + handleHiveSelection(setter); + } + }, [specificCatalog, setter]); + + return ( +
+    <div>
+      <div>
+        {CommonConfigSettings.map((setting) => (
+          <RowWithTextField
+            key={setting.label}
+            label={
+              <Label>
+                {setting.label}
+                {!setting.optional && (
+                  <Tooltip
+                    style={{ width: '100%' }}
+                    content={'This is a required field.'}
+                  >
+                    <Label colorName='lowContrast' colorSet='destructive'>
+                      *
+                    </Label>
+                  </Tooltip>
+                )}
+              </Label>
+            }
+            action={
+              <div>
+                <TextField
+                  onChange={(e) =>
+                    setting.stateHandler(e.target.value, setter)
+                  }
+                />
+                {setting.tips && (
+                  <InfoPopover tips={setting.tips} link={setting.helpfulLink} />
+                )}
+              </div>
+            }
+          />
+        ))}
+      </div>
+
+      <div>
+        {FileIoSettings.map((setting) =>
+          setting.type == 'switch' ? (
+            <RowWithSwitch
+              key={setting.label}
+              label={<Label>{setting.label}</Label>}
+              action={
+                <div>
+                  <Switch
+                    onCheckedChange={(checked) =>
+                      setting.stateHandler(checked, setter)
+                    }
+                  />
+                  {setting.tips && (
+                    <InfoPopover
+                      tips={setting.tips}
+                      link={setting.helpfulLink}
+                    />
+                  )}
+                </div>
+              }
+            />
+          ) : (
+            <RowWithTextField
+              key={setting.label}
+              label={
+                <Label>
+                  {setting.label}
+                  {!setting.optional && (
+                    <Tooltip
+                      style={{ width: '100%' }}
+                      content={'This is a required field.'}
+                    >
+                      <Label colorName='lowContrast' colorSet='destructive'>
+                        *
+                      </Label>
+                    </Tooltip>
+                  )}
+                </Label>
+              }
+              action={
+                <div>
+                  <TextField
+                    onChange={(e) =>
+                      setting.stateHandler(e.target.value, setter)
+                    }
+                  />
+                  {setting.tips && (
+                    <InfoPopover
+                      tips={setting.tips}
+                      link={setting.helpfulLink}
+                    />
+                  )}
+                </div>
+              }
+            />
+          )
+        )}
+      </div>
+
+      <div>
+        <RowWithSelect
+          label={
+            <Label>
+              Choose Iceberg Catalog Type
+              <Tooltip
+                style={{ width: '100%' }}
+                content={'This is a required field.'}
+              >
+                <Label colorName='lowContrast' colorSet='destructive'>
+                  *
+                </Label>
+              </Tooltip>
+            </Label>
+          }
+          action={
+            <ReactSelect
+              options={specificCatalogOptions}
+              value={specificCatalogOptions.find(
+                (option) => option.value === specificCatalog
+              )}
+              onChange={(option) => {
+                if (option) {
+                  setSpecificCatalog(option.value as 'hive' | 'jdbc');
+                }
+              }}
+            />
+          }
+        />
+
+        {specificCatalog === 'jdbc' && (
+          <div>
+            {JdbcConfigSettings.map((setting) =>
+              setting.type == 'switch' ? (
+                <RowWithSwitch
+                  key={setting.label}
+                  label={<Label>{setting.label}</Label>}
+                  action={
+                    <div>
+                      <Switch
+                        onCheckedChange={(checked) =>
+                          setting.stateHandler(checked, setter)
+                        }
+                      />
+                      {setting.tips && (
+                        <InfoPopover
+                          tips={setting.tips}
+                          link={setting.helpfulLink}
+                        />
+                      )}
+                    </div>
+                  }
+                />
+              ) : (
+                <RowWithTextField
+                  key={setting.label}
+                  label={
+                    <Label>
+                      {setting.label}
+                      {!setting.optional && (
+                        <Tooltip
+                          style={{ width: '100%' }}
+                          content={'This is a required field.'}
+                        >
+                          <Label colorName='lowContrast' colorSet='destructive'>
+                            *
+                          </Label>
+                        </Tooltip>
+                      )}
+                    </Label>
+                  }
+                  action={
+                    <div>
+                      <TextField
+                        onChange={(e) =>
+                          setting.stateHandler(e.target.value, setter)
+                        }
+                      />
+                      {setting.tips && (
+                        <InfoPopover
+                          tips={setting.tips}
+                          link={setting.helpfulLink}
+                        />
+                      )}
+                    </div>
+                  }
+                />
+              )
+            )}
+          </div>
+        )}
+      </div>
+    </div>
+ ); +}; + +export default IcebergConfigForm; diff --git a/ui/components/PeerTypeComponent.tsx b/ui/components/PeerTypeComponent.tsx index fbcc565e3..aa13a8f56 100644 --- a/ui/components/PeerTypeComponent.tsx +++ b/ui/components/PeerTypeComponent.tsx @@ -28,6 +28,8 @@ export const DBTypeToGoodText = (ptype?: DBType) => { return 'PubSub'; case DBType.ELASTICSEARCH: return 'Elasticsearch'; + case DBType.ICEBERG: + return 'Iceberg'; default: return 'Unrecognised'; } diff --git a/ui/components/SelectSource.tsx b/ui/components/SelectSource.tsx index 7cbe9d8b7..ef30631cd 100644 --- a/ui/components/SelectSource.tsx +++ b/ui/components/SelectSource.tsx @@ -42,7 +42,15 @@ const dbTypes = [ 'TEMBO', 'CRUNCHY POSTGRES', ], - ['Warehouses', 'SNOWFLAKE', 'BIGQUERY', 'S3', 'CLICKHOUSE', 'ELASTICSEARCH'], + [ + 'Warehouses', + 'SNOWFLAKE', + 'BIGQUERY', + 'S3', + 'CLICKHOUSE', + 'ELASTICSEARCH', + 'ICEBERG', + ], ['Queues', 'REDPANDA', 'CONFLUENT', 'KAFKA', 'EVENTHUBS', 'PUBSUB'], ]; diff --git a/ui/public/images/iceberg.png b/ui/public/images/iceberg.png new file mode 100644 index 000000000..e4a99c395 Binary files /dev/null and b/ui/public/images/iceberg.png differ
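
For illustration only (not part of the patch): a minimal sketch of the `IcebergConfig` shape that the new UI form assembles and that should pass the `iceSchema` validation added to `ui/app/peers/create/[peerType]/schema.ts`. Field names follow the `IcebergConfig`/`IcebergCatalog` proto messages added earlier in this patch; every concrete value (catalog name, URIs, credentials, import path) is a placeholder assumption rather than a documented configuration.

```typescript
// Hypothetical example: a JDBC-backed Iceberg catalog over S3-compatible storage
// (e.g. MinIO). All concrete values are placeholders; the import path is assumed
// from the UI layout shown in this patch.
import { iceSchema } from '@/app/peers/create/[peerType]/schema';

const exampleIcebergConfig = {
  catalogConfig: {
    commonConfig: {
      name: 'peerdb_iceberg', // catalog name; should match the name the query engine uses
      uri: 'jdbc:postgresql://catalog-db:5432/iceberg_catalog',
      warehouseLocation: 's3://my-bucket/warehouse',
      clientPoolSize: 2,
      hadoopProperties: {},
    },
    ioConfig: {
      s3: {
        accessKeyId: 'AKIAIOSFODNN7EXAMPLE', // placeholder credentials
        secretAccessKey: 'wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY',
        endpoint: 'http://minio:9000', // only needed for S3-compatible stores
        pathStyleAccess: true,
      },
    },
    jdbc: {
      user: 'iceberg',
      password: 'iceberg',
      useSsl: false,
      verifyServerCertificate: false,
    },
  },
};

// The create-peer handler runs the form state through the zod schema before submitting.
const result = iceSchema.safeParse(exampleIcebergConfig);
if (!result.success) {
  console.error(result.error.issues[0].message);
}
```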