From 74c28a220ff8576b21f64d01051e62b28e08219d Mon Sep 17 00:00:00 2001 From: Xu Chen Date: Sat, 16 Dec 2023 15:52:22 +0800 Subject: [PATCH] [Spark] Support Gluten Vectorized Engine (#374) * initial support for gluten plugin Signed-off-by: chenxu * fix memory leak caused by inconsistent arrow version Signed-off-by: chenxu * remove compile time dependencies for gluten Signed-off-by: chenxu * refine gluten test Signed-off-by: chenxu * fix path cleanup in test Signed-off-by: chenxu * fix test case for gluten Signed-off-by: chenxu * temporarily disable gluten test Signed-off-by: chenxu --------- Signed-off-by: chenxu Co-authored-by: chenxu --- .github/workflows/deployment.yml | 40 +++---- .github/workflows/flink-cdc-hdfs-test.yml | 6 +- .github/workflows/flink-cdc-test.yml | 6 +- .github/workflows/maven-test.yml | 58 ++++----- .github/workflows/native-build.yml | 42 +++---- .github/workflows/presto-cdc-test.yml | 6 +- .github/workflows/rust-ci.yml | 2 +- .github/workflows/rust-clippy.yml | 2 +- .github/workflows/website-build.yml | 4 +- .github/workflows/website-publish.yml | 4 +- lakesoul-common/pom.xml | 2 - lakesoul-spark/pom.xml | 56 ++++++++- .../parquet/NativeVectorizedReader.java | 7 +- .../sql/LakeSoulSparkSessionExtension.scala | 4 +- .../parquet/NativeParquetOutputWriter.scala | 12 +- .../GlutenCompatPostInjectColumnar.scala | 43 +++++++ .../spark/sql/vectorized/GlutenUtils.scala | 61 ++++++++++ .../spark/sql/vectorized/NativeIOUtils.scala | 8 +- .../src/test/resources/log4j2-test.properties | 27 +++++ .../apache/spark/sql/lakesoul/CDCSuite.scala | 2 +- .../rules/LakeSoulGlutenCompatSuite.scala | 113 ++++++++++++++++++ native-io/lakesoul-io-java/pom.xml | 4 +- .../lakesoul/lakesoul/io/NativeIOBase.java | 17 ++- .../lakesoul/lakesoul/io/NativeIOReader.java | 3 +- .../lakesoul/LakeSoulArrowReader.scala | 41 ++++--- pom.xml | 1 + 26 files changed, 432 insertions(+), 139 deletions(-) create mode 100644 lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/rules/GlutenCompatPostInjectColumnar.scala create mode 100644 lakesoul-spark/src/main/scala/org/apache/spark/sql/vectorized/GlutenUtils.scala create mode 100644 lakesoul-spark/src/test/resources/log4j2-test.properties create mode 100644 lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/rules/LakeSoulGlutenCompatSuite.scala diff --git a/.github/workflows/deployment.yml b/.github/workflows/deployment.yml index 0dfb86fa8..f224df8bd 100644 --- a/.github/workflows/deployment.yml +++ b/.github/workflows/deployment.yml @@ -14,9 +14,9 @@ jobs: build-linux-x86_64: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 - name: Set up JDK 8 - uses: actions/setup-java@v3 + uses: actions/setup-java@v4 with: java-version: '8' distribution: 'temurin' @@ -34,11 +34,11 @@ jobs: use-cross: true command: build args: '--manifest-path rust/Cargo.toml --target x86_64-unknown-linux-gnu --release --all-features' - - uses: actions/upload-artifact@master + - uses: actions/upload-artifact@v4 with: name: lakesoul-nativeio-x86_64-unknown-linux-gnu path: ./rust/target/x86_64-unknown-linux-gnu/release/liblakesoul_io_c.so - - uses: actions/upload-artifact@master + - uses: actions/upload-artifact@v4 with: name: lakesoul-nativemetadata-x86_64-unknown-linux-gnu path: ./rust/target/x86_64-unknown-linux-gnu/release/liblakesoul_metadata_c.so @@ -46,9 +46,9 @@ jobs: build-windows-x86_64: runs-on: windows-latest steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 - name: Set up JDK 8 - uses: 
actions/setup-java@v3 + uses: actions/setup-java@v4 with: java-version: '8' distribution: 'temurin' @@ -70,11 +70,11 @@ jobs: with: command: build args: '--manifest-path rust/Cargo.toml --release --all-features' - - uses: actions/upload-artifact@master + - uses: actions/upload-artifact@v4 with: name: lakesoul-nativeio-x86_64-pc-windows-msvc path: ./rust/target/release/lakesoul_io_c.dll - - uses: actions/upload-artifact@master + - uses: actions/upload-artifact@v4 with: name: lakesoul-nativemetadata-x86_64-pc-windows-msvc path: ./rust/target/release/lakesoul_metadata_c.dll @@ -84,9 +84,9 @@ jobs: steps: - name: Install automake run: brew install automake - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 - name: Set up JDK 8 - uses: actions/setup-java@v3 + uses: actions/setup-java@v4 with: java-version: '8' distribution: 'temurin' @@ -108,11 +108,11 @@ jobs: with: command: build args: '--manifest-path rust/Cargo.toml --release --all-features' - - uses: actions/upload-artifact@master + - uses: actions/upload-artifact@v4 with: name: lakesoul-nativeio-x86_64-apple-darwin path: ./rust/target/release/liblakesoul_io_c.dylib - - uses: actions/upload-artifact@master + - uses: actions/upload-artifact@v4 with: name: lakesoul-nativemetadata-x86_64-apple-darwin path: ./rust/target/release/liblakesoul_metadata_c.dylib @@ -122,33 +122,33 @@ jobs: runs-on: ubuntu-latest needs: [ build-linux-x86_64, build-windows-x86_64, build-macos-x86_64 ] steps: - - uses: actions/checkout@v3 - - uses: actions/download-artifact@v3 + - uses: actions/checkout@v4 + - uses: actions/download-artifact@v4 with: name: lakesoul-nativeio-x86_64-unknown-linux-gnu path: ./rust/target/release/ - - uses: actions/download-artifact@v3 + - uses: actions/download-artifact@v4 with: name: lakesoul-nativeio-x86_64-apple-darwin path: ./rust/target/release/ - - uses: actions/download-artifact@v3 + - uses: actions/download-artifact@v4 with: name: lakesoul-nativeio-x86_64-pc-windows-msvc path: ./rust/target/release/ - - uses: actions/download-artifact@v3 + - uses: actions/download-artifact@v4 with: name: lakesoul-nativemetadata-x86_64-unknown-linux-gnu path: ./rust/target/release/ - - uses: actions/download-artifact@v3 + - uses: actions/download-artifact@v4 with: name: lakesoul-nativemetadata-x86_64-apple-darwin path: ./rust/target/release/ - - uses: actions/download-artifact@v3 + - uses: actions/download-artifact@v4 with: name: lakesoul-nativemetadata-x86_64-pc-windows-msvc path: ./rust/target/release/ - name: Set up JDK 8 - uses: actions/setup-java@v3 + uses: actions/setup-java@v4 with: java-version: '8' distribution: 'temurin' diff --git a/.github/workflows/flink-cdc-hdfs-test.yml b/.github/workflows/flink-cdc-hdfs-test.yml index 5d882d899..72c33a63c 100644 --- a/.github/workflows/flink-cdc-hdfs-test.yml +++ b/.github/workflows/flink-cdc-hdfs-test.yml @@ -29,9 +29,9 @@ jobs: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 - name: Set up JDK 8 - uses: actions/setup-java@v3 + uses: actions/setup-java@v4 with: java-version: '8' distribution: 'temurin' @@ -185,7 +185,7 @@ jobs: - name: Upload Log if: always() continue-on-error: true - uses: actions/upload-artifact@v3 + uses: actions/upload-artifact@v4 with: name: flink-cluster-log path: flink-session-cluster.log diff --git a/.github/workflows/flink-cdc-test.yml b/.github/workflows/flink-cdc-test.yml index f872f3f3c..78ce7edf3 100644 --- a/.github/workflows/flink-cdc-test.yml +++ b/.github/workflows/flink-cdc-test.yml @@ -29,9 +29,9 @@ jobs: 
runs-on: ubuntu-latest steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 - name: Set up JDK 8 - uses: actions/setup-java@v3 + uses: actions/setup-java@v4 with: java-version: '8' distribution: 'temurin' @@ -177,7 +177,7 @@ jobs: - name: Upload Log if: always() continue-on-error: true - uses: actions/upload-artifact@v3 + uses: actions/upload-artifact@v4 with: name: flink-cluster-log path: flink-session-cluster.log diff --git a/.github/workflows/maven-test.yml b/.github/workflows/maven-test.yml index 87cc577f3..cb249b9fd 100644 --- a/.github/workflows/maven-test.yml +++ b/.github/workflows/maven-test.yml @@ -29,9 +29,9 @@ jobs: build-rust-linux-x86_64: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 - name: Set up JDK 8 - uses: actions/setup-java@v3 + uses: actions/setup-java@v4 with: java-version: '8' distribution: 'temurin' @@ -49,11 +49,11 @@ jobs: use-cross: true command: build args: '--manifest-path rust/Cargo.toml --target x86_64-unknown-linux-gnu --release --all-features' - - uses: actions/upload-artifact@master + - uses: actions/upload-artifact@v4 with: name: lakesoul-nativeio-x86_64-unknown-linux-gnu-maven-test path: ./rust/target/x86_64-unknown-linux-gnu/release/liblakesoul_io_c.so - - uses: actions/upload-artifact@master + - uses: actions/upload-artifact@v4 with: name: lakesoul-nativemetadata-x86_64-unknown-linux-gnu-maven-test path: ./rust/target/x86_64-unknown-linux-gnu/release/liblakesoul_metadata_c.so @@ -84,9 +84,9 @@ jobs: - 5432:5432 steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 - name: Set up JDK 8 - uses: actions/setup-java@v3 + uses: actions/setup-java@v4 with: java-version: '8' distribution: 'temurin' @@ -101,11 +101,11 @@ jobs: with: version: "23.x" repo-token: ${{ secrets.GITHUB_TOKEN }} - - uses: actions/download-artifact@v3 + - uses: actions/download-artifact@v4 with: name: lakesoul-nativemetadata-x86_64-unknown-linux-gnu-maven-test path: ./rust/target/release/ - - uses: actions/download-artifact@v3 + - uses: actions/download-artifact@v4 with: name: lakesoul-nativeio-x86_64-unknown-linux-gnu-maven-test path: ./rust/target/release/ @@ -119,7 +119,7 @@ jobs: - name: Upload Test Report if: always() continue-on-error: true - uses: actions/upload-artifact@v3 + uses: actions/upload-artifact@v4 with: name: maven-test-report-artifact-spark-1 path: lakesoul-spark/target/site @@ -152,9 +152,9 @@ jobs: - 5432:5432 steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 - name: Set up JDK 8 - uses: actions/setup-java@v3 + uses: actions/setup-java@v4 with: java-version: '8' distribution: 'temurin' @@ -169,11 +169,11 @@ jobs: with: version: "23.x" repo-token: ${{ secrets.GITHUB_TOKEN }} - - uses: actions/download-artifact@v3 + - uses: actions/download-artifact@v4 with: name: lakesoul-nativemetadata-x86_64-unknown-linux-gnu-maven-test path: ./rust/target/release/ - - uses: actions/download-artifact@v3 + - uses: actions/download-artifact@v4 with: name: lakesoul-nativeio-x86_64-unknown-linux-gnu-maven-test path: ./rust/target/release/ @@ -187,7 +187,7 @@ jobs: - name: Upload Test Report if: always() continue-on-error: true - uses: actions/upload-artifact@v3 + uses: actions/upload-artifact@v4 with: name: maven-test-report-artifact-spark-2 path: lakesoul-spark/target/site @@ -220,9 +220,9 @@ jobs: - 5432:5432 steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 - name: Set up JDK 8 - uses: actions/setup-java@v3 + uses: actions/setup-java@v4 with: java-version: '8' distribution: 
'temurin' @@ -249,11 +249,11 @@ jobs: $HADOOP_HOME/sbin/stop-dfs.sh $HADOOP_HOME/sbin/start-dfs.sh $HADOOP_HOME/bin/hadoop fs -chmod -R 777 / - - uses: actions/download-artifact@v3 + - uses: actions/download-artifact@v4 with: name: lakesoul-nativemetadata-x86_64-unknown-linux-gnu-maven-test path: ./rust/target/release/ - - uses: actions/download-artifact@v3 + - uses: actions/download-artifact@v4 with: name: lakesoul-nativeio-x86_64-unknown-linux-gnu-maven-test path: ./rust/target/release/ @@ -267,7 +267,7 @@ jobs: - name: Upload Test Report if: always() continue-on-error: true - uses: actions/upload-artifact@v3 + uses: actions/upload-artifact@v4 with: name: maven-test-report-artifact-spark-2 path: lakesoul-spark/target/site @@ -300,9 +300,9 @@ jobs: - 5432:5432 steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 - name: Set up JDK 8 - uses: actions/setup-java@v3 + uses: actions/setup-java@v4 with: java-version: '8' distribution: 'temurin' @@ -317,11 +317,11 @@ jobs: with: version: "23.x" repo-token: ${{ secrets.GITHUB_TOKEN }} - - uses: actions/download-artifact@v3 + - uses: actions/download-artifact@v4 with: name: lakesoul-nativemetadata-x86_64-unknown-linux-gnu-maven-test path: ./rust/target/release/ - - uses: actions/download-artifact@v3 + - uses: actions/download-artifact@v4 with: name: lakesoul-nativeio-x86_64-unknown-linux-gnu-maven-test path: ./rust/target/release/ @@ -335,7 +335,7 @@ jobs: - name: Upload Test Report if: always() continue-on-error: true - uses: actions/upload-artifact@v3 + uses: actions/upload-artifact@v4 with: name: maven-test-report-artifact-flink-1 path: lakesoul-flink/target/site @@ -368,9 +368,9 @@ jobs: - 5432:5432 steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 - name: Set up JDK 8 - uses: actions/setup-java@v3 + uses: actions/setup-java@v4 with: java-version: '8' distribution: 'temurin' @@ -405,11 +405,11 @@ jobs: $HADOOP_HOME/sbin/stop-dfs.sh $HADOOP_HOME/sbin/start-dfs.sh $HADOOP_HOME/bin/hadoop fs -chmod -R 777 / - - uses: actions/download-artifact@v3 + - uses: actions/download-artifact@v4 with: name: lakesoul-nativemetadata-x86_64-unknown-linux-gnu-maven-test path: ./rust/target/release/ - - uses: actions/download-artifact@v3 + - uses: actions/download-artifact@v4 with: name: lakesoul-nativeio-x86_64-unknown-linux-gnu-maven-test path: ./rust/target/release/ @@ -423,7 +423,7 @@ jobs: - name: Upload Test Report if: always() continue-on-error: true - uses: actions/upload-artifact@v3 + uses: actions/upload-artifact@v4 with: name: maven-test-report-artifact-flink-1 path: lakesoul-flink/target/site diff --git a/.github/workflows/native-build.yml b/.github/workflows/native-build.yml index 446610226..9cd1088bf 100644 --- a/.github/workflows/native-build.yml +++ b/.github/workflows/native-build.yml @@ -26,9 +26,9 @@ jobs: build-linux-x86_64: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 - name: Set up JDK 8 - uses: actions/setup-java@v3 + uses: actions/setup-java@v4 with: java-version: '8' distribution: 'temurin' @@ -47,11 +47,11 @@ jobs: use-cross: true command: build args: '--manifest-path rust/Cargo.toml --target x86_64-unknown-linux-gnu --release --all-features' - - uses: actions/upload-artifact@master + - uses: actions/upload-artifact@v4 with: name: lakesoul-nativeio-x86_64-unknown-linux-gnu path: ./rust/target/x86_64-unknown-linux-gnu/release/liblakesoul_io_c.so - - uses: actions/upload-artifact@master + - uses: actions/upload-artifact@v4 with: name: 
lakesoul-nativemetadata-x86_64-unknown-linux-gnu path: ./rust/target/x86_64-unknown-linux-gnu/release/liblakesoul_metadata_c.so @@ -59,9 +59,9 @@ jobs: build-windows-x86_64: runs-on: windows-latest steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 - name: Set up JDK 8 - uses: actions/setup-java@v3 + uses: actions/setup-java@v4 with: java-version: '8' distribution: 'temurin' @@ -84,11 +84,11 @@ jobs: with: command: build args: '--manifest-path rust/Cargo.toml --release --all-features' - - uses: actions/upload-artifact@master + - uses: actions/upload-artifact@v4 with: name: lakesoul-nativeio-x86_64-pc-windows-msvc path: ./rust/target/release/lakesoul_io_c.dll - - uses: actions/upload-artifact@master + - uses: actions/upload-artifact@v4 with: name: lakesoul-nativemetadata-x86_64-pc-windows-msvc path: ./rust/target/release/lakesoul_metadata_c.dll @@ -98,9 +98,9 @@ jobs: steps: - name: Install automake run: brew install automake - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 - name: Set up JDK 8 - uses: actions/setup-java@v3 + uses: actions/setup-java@v4 with: java-version: '8' distribution: 'temurin' @@ -123,11 +123,11 @@ jobs: with: command: build args: '--manifest-path rust/Cargo.toml --release --all-features' - - uses: actions/upload-artifact@master + - uses: actions/upload-artifact@v4 with: name: lakesoul-nativeio-x86_64-apple-darwin path: ./rust/target/release/liblakesoul_io_c.dylib - - uses: actions/upload-artifact@master + - uses: actions/upload-artifact@v4 with: name: lakesoul-nativemetadata-x86_64-apple-darwin path: ./rust/target/release/liblakesoul_metadata_c.dylib @@ -136,33 +136,33 @@ jobs: runs-on: ubuntu-latest needs: [ build-linux-x86_64, build-windows-x86_64, build-macos-x86_64 ] steps: - - uses: actions/checkout@v3 - - uses: actions/download-artifact@v3 + - uses: actions/checkout@v4 + - uses: actions/download-artifact@v4 with: name: lakesoul-nativeio-x86_64-unknown-linux-gnu path: ./rust/target/release/ - - uses: actions/download-artifact@v3 + - uses: actions/download-artifact@v4 with: name: lakesoul-nativeio-x86_64-apple-darwin path: ./rust/target/release/ - - uses: actions/download-artifact@v3 + - uses: actions/download-artifact@v4 with: name: lakesoul-nativeio-x86_64-pc-windows-msvc path: ./rust/target/release/ - - uses: actions/download-artifact@v3 + - uses: actions/download-artifact@v4 with: name: lakesoul-nativemetadata-x86_64-unknown-linux-gnu path: ./rust/target/release/ - - uses: actions/download-artifact@v3 + - uses: actions/download-artifact@v4 with: name: lakesoul-nativemetadata-x86_64-apple-darwin path: ./rust/target/release/ - - uses: actions/download-artifact@v3 + - uses: actions/download-artifact@v4 with: name: lakesoul-nativemetadata-x86_64-pc-windows-msvc path: ./rust/target/release/ - name: Set up JDK 8 - uses: actions/setup-java@v3 + uses: actions/setup-java@v4 with: java-version: '8' distribution: 'temurin' @@ -176,7 +176,7 @@ jobs: run: | MAVEN_OPTS="-Xmx4000m" mvn -q -B package --file pom.xml -Pcross-build -DskipTests -Dmaven.test.skip=true - name: Upload Package - uses: actions/upload-artifact@v3 + uses: actions/upload-artifact@v4 with: name: maven-package-upload path: | diff --git a/.github/workflows/presto-cdc-test.yml b/.github/workflows/presto-cdc-test.yml index 3d6f6ba4e..914779fc0 100644 --- a/.github/workflows/presto-cdc-test.yml +++ b/.github/workflows/presto-cdc-test.yml @@ -29,9 +29,9 @@ jobs: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 - name: Set up JDK 8 - uses: 
actions/setup-java@v3 + uses: actions/setup-java@v4 with: java-version: '8' distribution: 'temurin' @@ -191,7 +191,7 @@ jobs: - name: Upload Log if: always() continue-on-error: true - uses: actions/upload-artifact@v3 + uses: actions/upload-artifact@v4 with: name: flink-cluster-log path: flink-session-cluster.log diff --git a/.github/workflows/rust-ci.yml b/.github/workflows/rust-ci.yml index 06e875fed..9dda4e230 100644 --- a/.github/workflows/rust-ci.yml +++ b/.github/workflows/rust-ci.yml @@ -46,7 +46,7 @@ jobs: - 5432:5432 steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 - name: Install psql run: sudo apt-get install -y postgresql-client-14 - name: Install cargo diff --git a/.github/workflows/rust-clippy.yml b/.github/workflows/rust-clippy.yml index 9119efc19..b4384139e 100644 --- a/.github/workflows/rust-clippy.yml +++ b/.github/workflows/rust-clippy.yml @@ -26,7 +26,7 @@ jobs: clippy_check: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 - uses: actions-rs/toolchain@v1 with: profile: minimal diff --git a/.github/workflows/website-build.yml b/.github/workflows/website-build.yml index 1b7f52b2b..f2d45cf29 100644 --- a/.github/workflows/website-build.yml +++ b/.github/workflows/website-build.yml @@ -21,8 +21,8 @@ jobs: build: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v3 - - uses: actions/setup-node@v3 + - uses: actions/checkout@v4 + - uses: actions/setup-node@v4 with: node-version: 18 cache-dependency-path: 'website/package-lock.json' diff --git a/.github/workflows/website-publish.yml b/.github/workflows/website-publish.yml index 540fd2afd..3982f265b 100644 --- a/.github/workflows/website-publish.yml +++ b/.github/workflows/website-publish.yml @@ -13,8 +13,8 @@ jobs: publish: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v3 - - uses: actions/setup-node@v3 + - uses: actions/checkout@v4 + - uses: actions/setup-node@v4 with: node-version: 18 cache-dependency-path: 'website/package-lock.json' diff --git a/lakesoul-common/pom.xml b/lakesoul-common/pom.xml index 1ae690d62..ac29cb845 100644 --- a/lakesoul-common/pom.xml +++ b/lakesoul-common/pom.xml @@ -117,8 +117,6 @@ SPDX-License-Identifier: Apache-2.0 42.5.1 - 2.12.10 - 2.12 provided 0.6.1 3.5.1 diff --git a/lakesoul-spark/pom.xml b/lakesoul-spark/pom.xml index 59c4eea0a..b3ca587e5 100644 --- a/lakesoul-spark/pom.xml +++ b/lakesoul-spark/pom.xml @@ -20,14 +20,13 @@ SPDX-License-Identifier: Apache-2.0 2.4.0-spark-3.3-SNAPSHOT - 2.12.10 - 2.12 3.2.14 3.3.1 7.3.1 1.11.0 8.0.30 2.12.7 + 1.1.0 provided 2 @@ -275,6 +274,51 @@ SPDX-License-Identifier: Apache-2.0 test + + + io.glutenproject + backends-velox + ${gluten.version} + test + + + io.glutenproject + gluten-core + ${gluten.version} + + + io.glutenproject + spark-sql-columnar-shims-spark32 + + + test + + + io.glutenproject + gluten-core + ${gluten.version} + + + io.glutenproject + spark-sql-columnar-shims-spark32 + + + tests + test + + + io.glutenproject + gluten-data + ${gluten.version} + test + + + + + + + + @@ -454,19 +498,19 @@ SPDX-License-Identifier: Apache-2.0 org.apache.arrow.flatbuf - com.lakesoul.shaded.org.apache.arrow.flatbuf + io.glutenproject.shaded.org.apache.arrow.flatbuf org.apache.arrow.memory - com.lakesoul.shaded.org.apache.arrow.memory + io.glutenproject.shaded.org.apache.arrow.memory org.apache.arrow.util - com.lakesoul.shaded.org.apache.arrow.util + io.glutenproject.shaded.org.apache.arrow.util org.apache.arrow.vector - com.lakesoul.shaded.org.apache.arrow.vector + 
io.glutenproject.shaded.org.apache.arrow.vector com.google.protobuf diff --git a/lakesoul-spark/src/main/java/org/apache/spark/sql/execution/datasources/parquet/NativeVectorizedReader.java b/lakesoul-spark/src/main/java/org/apache/spark/sql/execution/datasources/parquet/NativeVectorizedReader.java index db39d203d..51e302405 100644 --- a/lakesoul-spark/src/main/java/org/apache/spark/sql/execution/datasources/parquet/NativeVectorizedReader.java +++ b/lakesoul-spark/src/main/java/org/apache/spark/sql/execution/datasources/parquet/NativeVectorizedReader.java @@ -25,10 +25,7 @@ import org.apache.spark.sql.types.Metadata; import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; -import org.apache.spark.sql.vectorized.ColumnVector; -import org.apache.spark.sql.vectorized.ColumnarBatch; -import org.apache.spark.sql.vectorized.NativeIOOptions; -import org.apache.spark.sql.vectorized.NativeIOUtils; +import org.apache.spark.sql.vectorized.*; import java.io.IOException; import java.time.ZoneId; @@ -175,6 +172,7 @@ public void initialize(InputSplit[] inputSplits, TaskAttemptContext taskAttemptC @Override public void close() throws IOException { + closeCurrentBatch(); if (columnarBatch != null) { columnarBatch.close(); columnarBatch = null; @@ -229,6 +227,7 @@ public void setThreadNum(int threadNum) { private void recreateNativeReader() throws IOException { close(); NativeIOReader reader = new NativeIOReader(); + GlutenUtils.setArrowAllocator(reader); for (String path : filePathList) { reader.addFile(path); } diff --git a/lakesoul-spark/src/main/scala/com/dmetasoul/lakesoul/sql/LakeSoulSparkSessionExtension.scala b/lakesoul-spark/src/main/scala/com/dmetasoul/lakesoul/sql/LakeSoulSparkSessionExtension.scala index 8bd516be0..aca29041b 100644 --- a/lakesoul-spark/src/main/scala/com/dmetasoul/lakesoul/sql/LakeSoulSparkSessionExtension.scala +++ b/lakesoul-spark/src/main/scala/com/dmetasoul/lakesoul/sql/LakeSoulSparkSessionExtension.scala @@ -108,6 +108,8 @@ class LakeSoulSparkSessionExtension extends (SparkSessionExtensions => Unit) { SetPartitionAndOrdering(session) } - + extensions.injectColumnar { session => + GlutenCompatPostInjectColumnar(session) + } } } diff --git a/lakesoul-spark/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/NativeParquetOutputWriter.scala b/lakesoul-spark/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/NativeParquetOutputWriter.scala index 9ad145314..d525a9d70 100644 --- a/lakesoul-spark/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/NativeParquetOutputWriter.scala +++ b/lakesoul-spark/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/NativeParquetOutputWriter.scala @@ -5,7 +5,6 @@ package org.apache.spark.sql.execution.datasources.v2.parquet import com.dmetasoul.lakesoul.lakesoul.io.NativeIOWriter -import com.dmetasoul.lakesoul.lakesoul.memory.ArrowMemoryUtils import org.apache.arrow.memory.BufferAllocator import org.apache.arrow.vector.VectorSchemaRoot import org.apache.arrow.vector.types.pojo.Schema @@ -17,7 +16,7 @@ import org.apache.spark.sql.execution.datasources.OutputWriter import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.lakesoul.sources.LakeSoulSQLConf import org.apache.spark.sql.types.StructType -import org.apache.spark.sql.vectorized.NativeIOUtils +import org.apache.spark.sql.vectorized.{GlutenUtils, NativeIOUtils} class NativeParquetOutputWriter(val path: String, dataSchema: StructType, timeZoneId: String, context: 
TaskAttemptContext) extends OutputWriter { @@ -26,7 +25,10 @@ class NativeParquetOutputWriter(val path: String, dataSchema: StructType, timeZo private var recordCount = 0 val arrowSchema: Schema = ArrowUtils.toArrowSchema(dataSchema, timeZoneId) + private val nativeIOWriter: NativeIOWriter = new NativeIOWriter(arrowSchema) + + GlutenUtils.setArrowAllocator(nativeIOWriter) nativeIOWriter.setRowGroupRowNumber(NATIVE_IO_WRITE_MAX_ROW_GROUP_SIZE) nativeIOWriter.addFile(path) @@ -34,8 +36,7 @@ nativeIOWriter.initializeWriter() - val allocator: BufferAllocator = - ArrowMemoryUtils.rootAllocator.newChildAllocator("toBatchIterator", 0, Long.MaxValue) + val allocator: BufferAllocator = nativeIOWriter.getAllocator private val root: VectorSchemaRoot = VectorSchemaRoot.create(arrowSchema, allocator) @@ -59,10 +60,9 @@ nativeIOWriter.write(root) nativeIOWriter.flush() - nativeIOWriter.close() recordWriter.reset() root.close() - allocator.close() + nativeIOWriter.close() } } diff --git a/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/rules/GlutenCompatPostInjectColumnar.scala b/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/rules/GlutenCompatPostInjectColumnar.scala new file mode 100644 index 000000000..c08c4638b --- /dev/null +++ b/lakesoul-spark/src/main/scala/org/apache/spark/sql/lakesoul/rules/GlutenCompatPostInjectColumnar.scala @@ -0,0 +1,43 @@ +// SPDX-FileCopyrightText: 2023 LakeSoul Contributors +// +// SPDX-License-Identifier: Apache-2.0 + +package org.apache.spark.sql.lakesoul.rules + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.connector.read.Scan +import org.apache.spark.sql.execution.datasources.v2.BatchScanExec +import org.apache.spark.sql.execution.datasources.v2.merge.MergeDeltaParquetScan +import org.apache.spark.sql.execution.{ColumnarRule, ColumnarToRowExec, SparkPlan, UnaryExecNode} +import org.apache.spark.sql.vectorized.GlutenUtils + +/** + * To be compatible with Gluten, we remove the RowToVeloxColumnar and ColumnarToRow wrappers + * around LakeSoul's batch scan, since LakeSoul already reads data as Arrow vectors.
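+ * The rule matches Gluten's RowToVeloxColumnarExec whose child is a ColumnarToRowExec + * wrapping a LakeSoul BatchScanExec, and collapses the pair to the bare scan, so the + * scan's Arrow batches feed Velox directly instead of taking a row-by-row detour.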
+ * + * @param session the active SparkSession + */ +case class GlutenCompatPostInjectColumnar(session: SparkSession) extends ColumnarRule { + + private def isLakeSoulScan(scan: Scan): Boolean = { + scan.getClass.getSimpleName.contains("NativeParquetScan") || + scan.isInstanceOf[MergeDeltaParquetScan] + } + + private def transform(plan: SparkPlan): SparkPlan = plan match { + case UnaryExecNode(plan, ColumnarToRowExec(scan: BatchScanExec)) + if plan.getClass.getName == "io.glutenproject.execution.RowToVeloxColumnarExec" && + isLakeSoulScan(scan.scan) + => scan + case p => + p.withNewChildren(p.children.map(transform)) + } + + override def postColumnarTransitions: Rule[SparkPlan] = plan => { + if (GlutenUtils.isGlutenEnabled) + transform(plan) + else + plan + } +} diff --git a/lakesoul-spark/src/main/scala/org/apache/spark/sql/vectorized/GlutenUtils.scala b/lakesoul-spark/src/main/scala/org/apache/spark/sql/vectorized/GlutenUtils.scala new file mode 100644 index 000000000..10ad6aa32 --- /dev/null +++ b/lakesoul-spark/src/main/scala/org/apache/spark/sql/vectorized/GlutenUtils.scala @@ -0,0 +1,61 @@ +// SPDX-FileCopyrightText: 2023 LakeSoul Contributors +// +// SPDX-License-Identifier: Apache-2.0 + +package org.apache.spark.sql.vectorized + +import com.dmetasoul.lakesoul.lakesoul.io.NativeIOBase +import org.apache.arrow.memory.BufferAllocator +import org.apache.arrow.vector.ValueVector +import org.apache.spark.SparkContext + +import java.lang.reflect.{Constructor, Method} + +/** + * Use reflection to access Gluten's classes, so that Gluten is not a compile-time dependency + */ +object GlutenUtils { + lazy val isGlutenEnabled: Boolean = + SparkContext.getActive.exists(_.getConf.get("spark.plugins", "").contains("io.glutenproject.GlutenPlugin")) + + private lazy val getGlutenAllocatorMethod: Method = { + val cls = Class.forName("io.glutenproject.memory.arrowalloc.ArrowBufferAllocators") + cls.getDeclaredMethod("contextInstance") + } + + /** + * This cannot be lazy because Gluten's allocator + * is associated with each Spark task context + */ + private def getGlutenAllocator: BufferAllocator = { + if (isGlutenEnabled) { + getGlutenAllocatorMethod.invoke(null).asInstanceOf[BufferAllocator] + } else { + null + } + } + + def setArrowAllocator(io: NativeIOBase): Unit = { + if (isGlutenEnabled) { + io.setExternalAllocator(getGlutenAllocator) + } + } + + private lazy val glutenArrowColumnVectorCtor: Constructor[_] = { + if (isGlutenEnabled) { + val cls = Class.forName("io.glutenproject.vectorized.ArrowWritableColumnVector") + cls.getConstructor(classOf[ValueVector], classOf[ValueVector], classOf[Int], classOf[Int], classOf[Boolean]) + } else { + null + } + } + + def createArrowColumnVector(vector: ValueVector): ColumnVector = { + if (isGlutenEnabled) { + val args = Array[AnyRef](vector, null, Integer.valueOf(0), Integer.valueOf(vector.getValueCapacity), java.lang.Boolean.FALSE) + glutenArrowColumnVectorCtor.newInstance(args:_*).asInstanceOf[ColumnVector] + } else { + new org.apache.spark.sql.arrow.ArrowColumnVector(vector) + } + } +} diff --git a/lakesoul-spark/src/main/scala/org/apache/spark/sql/vectorized/NativeIOUtils.scala b/lakesoul-spark/src/main/scala/org/apache/spark/sql/vectorized/NativeIOUtils.scala index 0af230f58..1a4571b1d 100644 --- a/lakesoul-spark/src/main/scala/org/apache/spark/sql/vectorized/NativeIOUtils.scala +++ b/lakesoul-spark/src/main/scala/org/apache/spark/sql/vectorized/NativeIOUtils.scala @@ -9,8 +9,8 @@ import com.dmetasoul.lakesoul.meta.DBUtil import org.apache.arrow.vector.{ValueVector, VectorSchemaRoot} import
org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.permission.{FsAction, FsPermission} -import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.fs.s3a.S3AFileSystem +import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.hdfs.DistributedFileSystem import org.apache.hadoop.mapreduce.TaskAttemptContext import org.apache.parquet.hadoop.ParquetInputFormat @@ -46,12 +46,12 @@ object NativeIOUtils{ .toArray } - private def asArrowColumnVector(vector: ValueVector): org.apache.spark.sql.arrow.ArrowColumnVector = { - new org.apache.spark.sql.arrow.ArrowColumnVector(vector) + private def asArrowColumnVector(vector: ValueVector): ColumnVector = { + GlutenUtils.createArrowColumnVector(vector) } private def asColumnVector(vector: ValueVector): ColumnVector = { - asArrowColumnVector(vector).asInstanceOf[ColumnVector] + asArrowColumnVector(vector) } def getNativeIOOptions(taskAttemptContext: TaskAttemptContext, file: Path): NativeIOOptions = { diff --git a/lakesoul-spark/src/test/resources/log4j2-test.properties b/lakesoul-spark/src/test/resources/log4j2-test.properties new file mode 100644 index 000000000..f31842ca3 --- /dev/null +++ b/lakesoul-spark/src/test/resources/log4j2-test.properties @@ -0,0 +1,27 @@ +# SPDX-FileCopyrightText: 2023 LakeSoul Contributors +# +# SPDX-License-Identifier: Apache-2.0 + +# Set to debug or trace if log4j initialization is failing +status = warn + +# Name of the configuration +name = ConsoleLogConfig + +# Console appender configuration +appender.console.type = Console +appender.console.name = consoleLogger +appender.console.layout.type = PatternLayout +appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n + +# Root logger level +rootLogger.level = warn + +# Root logger referring to console appender +rootLogger.appenderRef.stdout.ref = consoleLogger + +loggers = tmm,lakesoul +logger.tmm.name = org.apache.spark.memory.TaskMemoryManager +logger.tmm.level = WARN +logger.lakesoul.name = org.apache.flink.lakesoul +logger.lakesoul.level = ERROR diff --git a/lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/CDCSuite.scala b/lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/CDCSuite.scala index 4baa5e578..0e1636865 100644 --- a/lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/CDCSuite.scala +++ b/lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/CDCSuite.scala @@ -35,7 +35,7 @@ class CDCSuite val session = new LakeSoulTestSparkSession(sparkConf) session.conf.set("spark.sql.catalog.lakesoul", classOf[LakeSoulCatalog].getName) session.conf.set(SQLConf.DEFAULT_CATALOG.key, "lakesoul") - session.conf.set(LakeSoulSQLConf.NATIVE_IO_ENABLE.key, true) + session.conf.set(LakeSoulSQLConf.NATIVE_IO_ENABLE.key, value = true) session.sparkContext.setLogLevel("ERROR") session diff --git a/lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/rules/LakeSoulGlutenCompatSuite.scala b/lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/rules/LakeSoulGlutenCompatSuite.scala new file mode 100644 index 000000000..21ace03f5 --- /dev/null +++ b/lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/rules/LakeSoulGlutenCompatSuite.scala @@ -0,0 +1,113 @@ +// SPDX-FileCopyrightText: 2023 LakeSoul Contributors +// +// SPDX-License-Identifier: Apache-2.0 + +package org.apache.spark.sql.lakesoul.rules + +import com.dmetasoul.lakesoul.tables.LakeSoulTable +import io.glutenproject.execution.WholeStageTransformerSuite +import org.apache.spark.SparkConf +import 
org.apache.spark.sql.execution.ExtendedMode +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.lakesoul.catalog.LakeSoulCatalog +import org.apache.spark.util.Utils +import org.junit.runner.RunWith +import org.scalatestplus.junit.JUnitRunner + +import java.io.File + +//@RunWith(classOf[JUnitRunner]) +class LakeSoulGlutenCompatSuite + extends WholeStageTransformerSuite { + + override def sparkConf: SparkConf = { + super.sparkConf + .set("spark.sql.codegen.wholeStage", "false") + .set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.ColumnarShuffleManager") + .set("spark.network.timeout", "10000000") + .set("spark.gluten.sql.native.writer.enabled", "true") + .set("spark.sql.catalog.lakesoul", classOf[LakeSoulCatalog].getName) + .set(SQLConf.DEFAULT_CATALOG.key, LakeSoulCatalog.CATALOG_NAME) + .set("spark.sql.extensions", "com.dmetasoul.lakesoul.sql.LakeSoulSparkSessionExtension") + } + + import testImplicits._ + + test("lakesoul write scan - nopk") { + withTempDir(dir => { + val tablePath = dir.getCanonicalPath + val df = Seq(("2021-01-01",1,"rice"),("2021-01-01",2,"bread")).toDF("date","id","name") + df.write + .mode("overwrite") + .format("lakesoul") + .option("rangePartitions","date") + .save(tablePath) + val dfRead = spark.read.format("lakesoul").load(tablePath).select("date", "id", "name") + assert(dfRead.queryExecution.explainString(ExtendedMode).contains("InputIteratorTransformer")) + assert(!dfRead.queryExecution.explainString(ExtendedMode).matches(".*\\bColumnarToRow\\b.*")) + checkAnswer(dfRead, Seq(("2021-01-01",1,"rice"),("2021-01-01",2,"bread")).toDF("date", "id", "name")) + }) + } + + test("lakesoul write scan - pk") { + withTempDir(dir => { + val tablePath = dir.getCanonicalPath + val df = Seq(("2021-01-01",1,"rice"),("2021-01-01",2,"bread")).toDF("date","id","name") + df.write + .mode("overwrite") + .format("lakesoul") + .option("hashPartitions","id") + .option("hashBucketNum","2") + .option("rangePartitions","date") + .save(tablePath) + val dfRead = spark.read.format("lakesoul").load(tablePath).select("date", "id", "name") + assert(dfRead.queryExecution.explainString(ExtendedMode).contains("InputIteratorTransformer")) + assert(!dfRead.queryExecution.explainString(ExtendedMode).matches(".*\\bColumnarToRow\\b.*")) + checkAnswer(dfRead, Seq(("2021-01-01",1,"rice"),("2021-01-01",2,"bread")).toDF("date", "id", "name")) + }) + } + + test("lakesoul write scan - table") { + withTable("temp")({ + val df = Seq(("2021-01-01",1,"rice"),("2021-01-01",2,"bread")).toDF("date","id","name") + df.write + .mode("overwrite") + .format("lakesoul") + .option("hashPartitions","id") + .option("hashBucketNum","2") + .option("rangePartitions","date") + .saveAsTable("temp") + val dfRead = spark.sql(s"select date, id, name from temp") + assert(dfRead.queryExecution.explainString(ExtendedMode).contains("InputIteratorTransformer")) + assert(!dfRead.queryExecution.explainString(ExtendedMode).matches(".*\\bColumnarToRow\\b.*")) + checkAnswer(dfRead, Seq(("2021-01-01",1,"rice"),("2021-01-01",2,"bread")).toDF("date", "id", "name")) + }) + } + + override def withTable(tableNames: String*)(f: => Unit): Unit = { + Utils.tryWithSafeFinally(f) { + tableNames.foreach { name => + spark.sql(s"DROP TABLE IF EXISTS $name") + } + } + } + + override def withTempDir(f: File => Unit): Unit = { + val dir = Utils.createTempDir() + try { + f(dir) + waitForTasksToFinish() + } finally { + Utils.deleteRecursively(dir) + try { + LakeSoulTable.forPath(dir.getCanonicalPath).dropTable() + } catch { + 
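// best-effort cleanup: ignore failures when no table was created at this path +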
case _: Exception => + } + } + } + + override protected val backend: String = "velox" + override protected val resourcePath: String = "/tpch-data-parquet-velox" + override protected val fileFormat: String = "parquet" +} diff --git a/native-io/lakesoul-io-java/pom.xml b/native-io/lakesoul-io-java/pom.xml index c039d27e7..7d8629e85 100644 --- a/native-io/lakesoul-io-java/pom.xml +++ b/native-io/lakesoul-io-java/pom.xml @@ -24,9 +24,7 @@ SPDX-License-Identifier: Apache-2.0 8 8 - 11.0.0 - 2.12.10 - 2.12 + 12.0.0 3.1.0 diff --git a/native-io/lakesoul-io-java/src/main/java/com/dmetasoul/lakesoul/lakesoul/io/NativeIOBase.java b/native-io/lakesoul-io-java/src/main/java/com/dmetasoul/lakesoul/lakesoul/io/NativeIOBase.java index 7370513e2..875b234ec 100644 --- a/native-io/lakesoul-io-java/src/main/java/com/dmetasoul/lakesoul/lakesoul/io/NativeIOBase.java +++ b/native-io/lakesoul-io-java/src/main/java/com/dmetasoul/lakesoul/lakesoul/io/NativeIOBase.java @@ -4,15 +4,15 @@ package com.dmetasoul.lakesoul.lakesoul.io; +import com.dmetasoul.lakesoul.lakesoul.io.jnr.JnrLoader; +import com.dmetasoul.lakesoul.lakesoul.io.jnr.LibLakeSoulIO; +import com.dmetasoul.lakesoul.lakesoul.memory.ArrowMemoryUtils; import jnr.ffi.ObjectReferenceManager; import jnr.ffi.Pointer; import jnr.ffi.Runtime; import org.apache.arrow.c.ArrowSchema; import org.apache.arrow.c.CDataDictionaryProvider; import org.apache.arrow.c.Data; -import com.dmetasoul.lakesoul.lakesoul.io.jnr.JnrLoader; -import com.dmetasoul.lakesoul.lakesoul.io.jnr.LibLakeSoulIO; -import com.dmetasoul.lakesoul.lakesoul.memory.ArrowMemoryUtils; import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.vector.types.pojo.Schema; @@ -38,12 +38,15 @@ public class NativeIOBase implements AutoCloseable { protected CDataDictionaryProvider provider; + protected boolean hasExternalAllocator; + public static boolean isNativeIOLibExist() { return JnrLoader.get() != null; } public NativeIOBase(String allocatorName) { this.allocator = ArrowMemoryUtils.rootAllocator.newChildAllocator(allocatorName, 0, Long.MAX_VALUE); + this.hasExternalAllocator = false; this.provider = new CDataDictionaryProvider(); libLakeSoulIO = JnrLoader.get(); @@ -56,6 +59,11 @@ public NativeIOBase(String allocatorName) { setThreadNum(2); } + public void setExternalAllocator(BufferAllocator allocator) { + this.allocator = allocator; + this.hasExternalAllocator = true; + } + public void addFile(String file) { Pointer ptr = LibLakeSoulIO.buildStringPointer(libLakeSoulIO, file); ioConfigBuilder = libLakeSoulIO.lakesoul_config_builder_add_single_file(ioConfigBuilder, ptr); @@ -131,7 +139,8 @@ public void close() throws Exception { provider = null; } if (allocator != null) { - allocator.close(); + if (!hasExternalAllocator) + allocator.close(); allocator = null; } } diff --git a/native-io/lakesoul-io-java/src/main/java/com/dmetasoul/lakesoul/lakesoul/io/NativeIOReader.java b/native-io/lakesoul-io-java/src/main/java/com/dmetasoul/lakesoul/lakesoul/io/NativeIOReader.java index b27f67d71..2ed72f802 100644 --- a/native-io/lakesoul-io-java/src/main/java/com/dmetasoul/lakesoul/lakesoul/io/NativeIOReader.java +++ b/native-io/lakesoul-io-java/src/main/java/com/dmetasoul/lakesoul/lakesoul/io/NativeIOReader.java @@ -4,11 +4,10 @@ package com.dmetasoul.lakesoul.lakesoul.io; +import com.dmetasoul.lakesoul.lakesoul.io.jnr.LibLakeSoulIO; import jnr.ffi.Pointer; import org.apache.arrow.c.ArrowSchema; import org.apache.arrow.c.Data; -import com.dmetasoul.lakesoul.lakesoul.io.jnr.LibLakeSoulIO; -import 
org.apache.arrow.vector.types.pojo.Field; import org.apache.arrow.vector.types.pojo.Schema; import java.io.IOException; diff --git a/native-io/lakesoul-io-java/src/main/scala/com/dmetasoul/lakesoul/LakeSoulArrowReader.scala b/native-io/lakesoul-io-java/src/main/scala/com/dmetasoul/lakesoul/LakeSoulArrowReader.scala index 35b862dfd..3cf0f5078 100644 --- a/native-io/lakesoul-io-java/src/main/scala/com/dmetasoul/lakesoul/LakeSoulArrowReader.scala +++ b/native-io/lakesoul-io-java/src/main/scala/com/dmetasoul/lakesoul/LakeSoulArrowReader.scala @@ -38,30 +38,19 @@ case class LakeSoulArrowReader(reader: NativeIOReader, val iterator = new BatchIterator class BatchIterator extends Iterator[Option[VectorSchemaRoot]] { - private var vsrFuture: Future[Option[VectorSchemaRoot]] = _ + private var vsr: Option[VectorSchemaRoot] = _ var finished = false override def hasNext: Boolean = { if (!finished) { clean() - val p = Promise[Option[VectorSchemaRoot]]() - vsrFuture = p.future + val p = Promise[Option[Int]]() val consumerSchema = ArrowSchema.allocateNew(reader.getAllocator) val consumerArray = ArrowArray.allocateNew(reader.getAllocator) val provider = new CDataDictionaryProvider reader.nextBatch((rowCount, err) => { if (rowCount > 0) { - try { - val root: VectorSchemaRoot = { - Data.importVectorSchemaRoot(reader.getAllocator, consumerArray, consumerSchema, provider) - } - if (root.getSchema.getFields.isEmpty) { - root.setRowCount(rowCount) - } - p.success(Some(root)) - } catch { - case e: Throwable => p.failure(e) - } + p.success(Some(rowCount)) } else { if (err == null) { p.success(None) @@ -73,8 +62,16 @@ case class LakeSoulArrowReader(reader: NativeIOReader, }, consumerSchema.memoryAddress, consumerArray.memoryAddress) try { Await.result(p.future, timeout milli) match { - case Some(_) => true - case _ => false + case Some(rowCount) => + val root: VectorSchemaRoot = { + Data.importVectorSchemaRoot(reader.getAllocator, consumerArray, consumerSchema, provider) + } + root.setRowCount(rowCount) + vsr = Some(root) + true + case _ => + vsr = None + false } } catch { case e: java.util.concurrent.TimeoutException => @@ -91,6 +88,7 @@ case class LakeSoulArrowReader(reader: NativeIOReader, consumerSchema.close() } } else { + clean() false } } @@ -99,7 +97,7 @@ case class LakeSoulArrowReader(reader: NativeIOReader, if (ex.isDefined) { throw ex.get } - Await.result(vsrFuture, timeout milli) + vsr } private def finish(): Unit = { @@ -109,10 +107,11 @@ case class LakeSoulArrowReader(reader: NativeIOReader, } def clean(): Unit = { - if (vsrFuture != null && vsrFuture.isCompleted) { - vsrFuture.value match { - case Some(Success(Some(batch))) => - batch.close() + if (vsr != null) { + vsr match { + case Some(root) => + root.close() + vsr = None case _ => } } diff --git a/pom.xml b/pom.xml index a7cae2eaf..18e6027b4 100644 --- a/pom.xml +++ b/pom.xml @@ -60,6 +60,7 @@ SPDX-License-Identifier: Apache-2.0 2.17.2 provided 3.3.1 + 2.12.15
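
A minimal usage sketch (not part of this patch): the driver-side configuration below mirrors what LakeSoulGlutenCompatSuite sets, with the catalog and extension classes taken from this diff; the off-heap memory settings are an assumption based on Gluten's Velox backend requirements rather than anything configured here.

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("lakesoul-gluten-sketch") // hypothetical app name
      // load the Gluten plugin; GlutenUtils.isGlutenEnabled keys off this property
      .config("spark.plugins", "io.glutenproject.GlutenPlugin")
      .config("spark.shuffle.manager", "org.apache.spark.shuffle.sort.ColumnarShuffleManager")
      .config("spark.sql.extensions", "com.dmetasoul.lakesoul.sql.LakeSoulSparkSessionExtension")
      .config("spark.sql.catalog.lakesoul", "org.apache.spark.sql.lakesoul.catalog.LakeSoulCatalog")
      .config("spark.sql.defaultCatalog", "lakesoul")
      // assumption: Velox operators run off-heap, so off-heap memory must be enabled and sized
      .config("spark.memory.offHeap.enabled", "true")
      .config("spark.memory.offHeap.size", "2g")
      .getOrCreate()

    // With GlutenCompatPostInjectColumnar injected, a LakeSoul scan stays columnar:
    // the RowToVeloxColumnar/ColumnarToRow pair around the native scan is elided.
    val df = spark.read.format("lakesoul").load("file:///tmp/lakesoul/demo_table") // hypothetical path
    df.show()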