From ef46681c73accc2fd2f818fb4588b84fd91943c0 Mon Sep 17 00:00:00 2001 From: Socrates Date: Wed, 4 Dec 2024 09:06:05 +0800 Subject: [PATCH] [fix](hudi) upgrade hudi to 0.15.0 (#44267) ### What problem does this PR solve? 1. upgrade hudi to 0.15.0. 2. impl new hudi jni reader based on hudi-hadoop-mr 3. add session variable `hudi_jni_scanner` to choose which hudi jni reader to use, "hadoop" means HadoopHudiJniReader, "spark" means old HudiJniReader, default value is "hadoop" 4. support session variable `force_jni_scanner` for hudi 5. add more cases for hudi p2 ### Release note [opt](hudi) upgrade hudi to 0.15 and support hadoop jni reader --- .../vec/exec/format/table/hudi_jni_reader.cpp | 14 +- .../vec/exec/format/table/hudi_jni_reader.h | 4 +- be/src/vec/exec/scan/vfile_scanner.cpp | 17 +- build.sh | 2 + conf/be.conf | 2 +- .../hadoop-hudi-scanner/pom.xml | 227 +++++++++++++++ .../doris/hudi/HadoopHudiColumnValue.java | 219 ++++++++++++++ .../doris/hudi/HadoopHudiJniScanner.java | 271 ++++++++++++++++++ .../src/main/resources/package.xml | 41 +++ .../java/org/apache/doris/hudi/Utils.java | 4 +- .../apache/doris/hudi/BaseSplitReader.scala | 15 +- fe/be-java-extensions/pom.xml | 1 + .../doris/datasource/FileQueryScanNode.java | 2 + .../hive/HiveMetaStoreClientHelper.java | 5 +- .../doris/datasource/hudi/HudiUtils.java | 2 +- .../hudi/source/COWIncrementalRelation.java | 11 +- .../hudi/source/HudiLocalEngineContext.java | 67 ++--- .../hudi/source/HudiPartitionProcessor.java | 14 +- .../datasource/hudi/source/HudiScanNode.java | 61 ++-- .../datasource/hudi/source/HudiSplit.java | 3 +- .../hudi/source/MORIncrementalRelation.java | 14 +- .../paimon/source/PaimonScanNode.java | 39 ++- .../translator/PhysicalPlanTranslator.java | 2 +- .../doris/planner/SingleNodePlanner.java | 2 +- .../org/apache/doris/qe/SessionVariable.java | 14 + fe/pom.xml | 6 +- gensrc/thrift/PlanNodes.thrift | 4 +- .../hudi/test_hudi_incremental.out | 174 +++++++++++ .../hudi/test_hudi_schema_evolution.out | 32 +++ .../hudi/test_hudi_snapshot.out | Bin 348526 -> 696105 bytes .../hudi/test_hudi_timetravel.out | 120 ++++++++ .../hudi/test_hudi_catalog.groovy | 2 +- .../hudi/test_hudi_incremental.groovy | 16 +- .../hudi/test_hudi_schema_evolution.groovy | 14 +- .../hudi/test_hudi_snapshot.groovy | 13 +- .../hudi/test_hudi_timestamp.groovy | 2 +- .../hudi/test_hudi_timetravel.groovy | 15 +- 37 files changed, 1312 insertions(+), 139 deletions(-) create mode 100644 fe/be-java-extensions/hadoop-hudi-scanner/pom.xml create mode 100644 fe/be-java-extensions/hadoop-hudi-scanner/src/main/java/org/apache/doris/hudi/HadoopHudiColumnValue.java create mode 100644 fe/be-java-extensions/hadoop-hudi-scanner/src/main/java/org/apache/doris/hudi/HadoopHudiJniScanner.java create mode 100644 fe/be-java-extensions/hadoop-hudi-scanner/src/main/resources/package.xml diff --git a/be/src/vec/exec/format/table/hudi_jni_reader.cpp b/be/src/vec/exec/format/table/hudi_jni_reader.cpp index 33ba92b540a497..cb109bf05a2393 100644 --- a/be/src/vec/exec/format/table/hudi_jni_reader.cpp +++ b/be/src/vec/exec/format/table/hudi_jni_reader.cpp @@ -18,7 +18,6 @@ #include "hudi_jni_reader.h" #include -#include #include "runtime/descriptors.h" #include "runtime/runtime_state.h" @@ -65,7 +64,7 @@ HudiJniReader::HudiJniReader(const TFileScanRangeParams& scan_params, {"input_format", _hudi_params.input_format}}; // Use compatible hadoop client to read data - for (auto& kv : _scan_params.properties) { + for (const auto& kv : _scan_params.properties) { if (kv.first.starts_with(HOODIE_CONF_PREFIX)) { params[kv.first] = kv.second; } else { @@ -73,8 +72,15 @@ HudiJniReader::HudiJniReader(const TFileScanRangeParams& scan_params, } } - _jni_connector = std::make_unique("org/apache/doris/hudi/HudiJniScanner", params, - required_fields); + if (_hudi_params.hudi_jni_scanner == "hadoop") { + _jni_connector = std::make_unique( + "org/apache/doris/hudi/HadoopHudiJniScanner", params, required_fields); + } else if (_hudi_params.hudi_jni_scanner == "spark") { + _jni_connector = std::make_unique("org/apache/doris/hudi/HudiJniScanner", + params, required_fields); + } else { + DCHECK(false) << "Unsupported hudi jni scanner: " << _hudi_params.hudi_jni_scanner; + } } Status HudiJniReader::get_next_block(Block* block, size_t* read_rows, bool* eof) { diff --git a/be/src/vec/exec/format/table/hudi_jni_reader.h b/be/src/vec/exec/format/table/hudi_jni_reader.h index e9bb55a69a77e7..bfa0291a61035c 100644 --- a/be/src/vec/exec/format/table/hudi_jni_reader.h +++ b/be/src/vec/exec/format/table/hudi_jni_reader.h @@ -17,9 +17,7 @@ #pragma once -#include - -#include +#include #include #include #include diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp index 997eef02090912..ffedf6f0d58461 100644 --- a/be/src/vec/exec/scan/vfile_scanner.cpp +++ b/be/src/vec/exec/scan/vfile_scanner.cpp @@ -23,18 +23,15 @@ #include #include -#include #include #include #include -#include #include #include #include "common/compiler_util.h" // IWYU pragma: keep #include "common/config.h" #include "common/logging.h" -#include "common/object_pool.h" #include "io/cache/block_file_cache_profile.h" #include "runtime/descriptors.h" #include "runtime/runtime_state.h" @@ -47,7 +44,6 @@ #include "vec/common/string_ref.h" #include "vec/core/column_with_type_and_name.h" #include "vec/core/columns_with_type_and_name.h" -#include "vec/core/field.h" #include "vec/data_types/data_type.h" #include "vec/data_types/data_type_factory.hpp" #include "vec/data_types/data_type_nullable.h" @@ -720,17 +716,16 @@ Status VFileScanner::_get_next_reader() { // create reader for specific format Status init_status; - TFileFormatType::type format_type = _params->format_type; + // for compatibility, if format_type is not set in range, use the format type of params + TFileFormatType::type format_type = + range.__isset.format_type ? range.format_type : _params->format_type; // JNI reader can only push down column value range bool push_down_predicates = !_is_load && _params->format_type != TFileFormatType::FORMAT_JNI; + // for compatibility, this logic is deprecated in 3.1 if (format_type == TFileFormatType::FORMAT_JNI && range.__isset.table_format_params) { - if (range.table_format_params.table_format_type == "hudi" && - range.table_format_params.hudi_params.delta_logs.empty()) { - // fall back to native reader if there is no log file - format_type = TFileFormatType::FORMAT_PARQUET; - } else if (range.table_format_params.table_format_type == "paimon" && - !range.table_format_params.paimon_params.__isset.paimon_split) { + if (range.table_format_params.table_format_type == "paimon" && + !range.table_format_params.paimon_params.__isset.paimon_split) { // use native reader auto format = range.table_format_params.paimon_params.file_format; if (format == "orc") { diff --git a/build.sh b/build.sh index 6f3ddfa236fb10..8f1262aa76b211 100755 --- a/build.sh +++ b/build.sh @@ -525,6 +525,7 @@ fi if [[ "${BUILD_BE_JAVA_EXTENSIONS}" -eq 1 ]]; then modules+=("fe-common") modules+=("be-java-extensions/hudi-scanner") + modules+=("be-java-extensions/hadoop-hudi-scanner") modules+=("be-java-extensions/java-common") modules+=("be-java-extensions/java-udf") modules+=("be-java-extensions/jdbc-scanner") @@ -814,6 +815,7 @@ EOF extensions_modules=("java-udf") extensions_modules+=("jdbc-scanner") extensions_modules+=("hudi-scanner") + extensions_modules+=("hadoop-hudi-scanner") extensions_modules+=("paimon-scanner") extensions_modules+=("trino-connector-scanner") extensions_modules+=("max-compute-scanner") diff --git a/conf/be.conf b/conf/be.conf index fc89b5985b2454..896c7b74f22bc4 100644 --- a/conf/be.conf +++ b/conf/be.conf @@ -24,7 +24,7 @@ LOG_DIR="${DORIS_HOME}/log/" JAVA_OPTS="-Dfile.encoding=UTF-8 -Xmx2048m -DlogPath=$LOG_DIR/jni.log -Xloggc:$LOG_DIR/be.gc.log.$CUR_DATE -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=50M -Djavax.security.auth.useSubjectCredsOnly=false -Dsun.security.krb5.debug=true -Dsun.java.command=DorisBE -XX:-CriticalJNINatives" # For jdk 17, this JAVA_OPTS will be used as default JVM options -JAVA_OPTS_FOR_JDK_17="-Dfile.encoding=UTF-8 -Xmx2048m -DlogPath=$LOG_DIR/jni.log -Xlog:gc*:$LOG_DIR/be.gc.log.$CUR_DATE:time,uptime:filecount=10,filesize=50M -Djavax.security.auth.useSubjectCredsOnly=false -Dsun.security.krb5.debug=true -Dsun.java.command=DorisBE -XX:-CriticalJNINatives -XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED --add-opens=java.management/sun.management=ALL-UNNAMED" +JAVA_OPTS_FOR_JDK_17="-Dfile.encoding=UTF-8 -Djol.skipHotspotSAAttach=true -Xmx2048m -DlogPath=$LOG_DIR/jni.log -Xlog:gc*:$LOG_DIR/be.gc.log.$CUR_DATE:time,uptime:filecount=10,filesize=50M -Djavax.security.auth.useSubjectCredsOnly=false -Dsun.security.krb5.debug=true -Dsun.java.command=DorisBE -XX:-CriticalJNINatives -XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED --add-opens=java.management/sun.management=ALL-UNNAMED" # Set your own JAVA_HOME # JAVA_HOME=/path/to/jdk/ diff --git a/fe/be-java-extensions/hadoop-hudi-scanner/pom.xml b/fe/be-java-extensions/hadoop-hudi-scanner/pom.xml new file mode 100644 index 00000000000000..4b80d49de17527 --- /dev/null +++ b/fe/be-java-extensions/hadoop-hudi-scanner/pom.xml @@ -0,0 +1,227 @@ + + + + + + be-java-extensions + org.apache.doris + ${revision} + + 4.0.0 + hadoop-hudi-scanner + + + ${basedir}/../../ + 1 + 0.15.0 + 1.11.3 + 1.5.4-2 + 3.1.2-22 + + + + + org.apache.doris + java-common + ${project.version} + + + org.apache.thrift + libthrift + + + + + + + org.apache.hadoop + hadoop-hdfs-client + ${hadoop.version} + + + + + org.apache.hadoop + hadoop-common + + + + org.apache.hadoop + hadoop-aws + + + + org.apache.hadoop + hadoop-mapreduce-client-core + + + + org.junit.jupiter + junit-jupiter + + + + + org.apache.hudi + hudi-common + ${hudi.version} + + + org.apache.hbase + hbase-client + + + org.apache.hbase + hbase-server + + + org.apache.thrift + libthrift + + + com.fasterxml.jackson.core + jackson-databind + + + + + + + org.apache.hudi + hudi-io + ${hudi.version} + + + + + org.apache.hudi + hudi-hadoop-mr + ${hudi.version} + + + + + org.apache.parquet + parquet-hadoop-bundle + ${parquet.version} + + + + + org.apache.parquet + parquet-avro + ${parquet.version} + + + + + org.apache.avro + avro + ${avro.version} + + + org.apache.commons + commons-compress + + + + + + io.airlift + concurrent + 202 + + + + + io.airlift + aircompressor + ${aircompressor.version} + + + + com.github.luben + zstd-jni + ${luben.zstd.jni.version} + + + + com.esotericsoftware + kryo-shaded + 4.0.2 + + + + + io.trino.hive + hive-apache + ${hive-apache.version} + + + org.apache.thrift + libthrift + + + org.apache.parquet + * + + + org.apache.avro + * + + + io.airlift + aircompressor + + + + + + + hadoop-hudi-scanner + + + org.apache.maven.plugins + maven-assembly-plugin + + + src/main/resources/package.xml + + + + + + + + + + make-assembly + package + + single + + + + + + + diff --git a/fe/be-java-extensions/hadoop-hudi-scanner/src/main/java/org/apache/doris/hudi/HadoopHudiColumnValue.java b/fe/be-java-extensions/hadoop-hudi-scanner/src/main/java/org/apache/doris/hudi/HadoopHudiColumnValue.java new file mode 100644 index 00000000000000..ae0199d07d27c5 --- /dev/null +++ b/fe/be-java-extensions/hadoop-hudi-scanner/src/main/java/org/apache/doris/hudi/HadoopHudiColumnValue.java @@ -0,0 +1,219 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.hudi; + +import org.apache.doris.common.jni.vec.ColumnType; +import org.apache.doris.common.jni.vec.ColumnValue; + +import org.apache.hadoop.hive.common.type.HiveDecimal; +import org.apache.hadoop.hive.serde2.io.TimestampWritableV2; +import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.DateObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.TimestampObjectInspector; +import org.apache.hadoop.io.LongWritable; + +import java.math.BigDecimal; +import java.math.BigInteger; +import java.sql.Timestamp; +import java.time.Instant; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.util.List; +import java.util.Map; + +public class HadoopHudiColumnValue implements ColumnValue { + private ColumnType dorisType; + private ObjectInspector fieldInspector; + private Object fieldData; + private final ZoneId zoneId; + + public HadoopHudiColumnValue(ZoneId zoneId) { + this.zoneId = zoneId; + } + + public void setRow(Object record) { + this.fieldData = record; + } + + public void setField(ColumnType dorisType, ObjectInspector fieldInspector) { + this.dorisType = dorisType; + this.fieldInspector = fieldInspector; + } + + private Object inspectObject() { + return ((PrimitiveObjectInspector) fieldInspector).getPrimitiveJavaObject(fieldData); + } + + @Override + public boolean getBoolean() { + return (boolean) inspectObject(); + } + + @Override + public short getShort() { + return (short) inspectObject(); + } + + @Override + public int getInt() { + return (int) inspectObject(); + } + + @Override + public float getFloat() { + return (float) inspectObject(); + } + + @Override + public long getLong() { + return (long) inspectObject(); + } + + @Override + public double getDouble() { + return (double) inspectObject(); + } + + @Override + public String getString() { + return inspectObject().toString(); + } + + @Override + public byte[] getBytes() { + return (byte[]) inspectObject(); + } + + + @Override + public byte getByte() { + throw new UnsupportedOperationException("Hoodie type does not support tinyint"); + } + + @Override + public BigDecimal getDecimal() { + return ((HiveDecimal) inspectObject()).bigDecimalValue(); + } + + @Override + public LocalDate getDate() { + return LocalDate.ofEpochDay((((DateObjectInspector) fieldInspector).getPrimitiveJavaObject(fieldData)) + .toEpochDay()); + } + + @Override + public LocalDateTime getDateTime() { + if (fieldData instanceof Timestamp) { + return ((Timestamp) fieldData).toLocalDateTime(); + } else if (fieldData instanceof TimestampWritableV2) { + return LocalDateTime.ofInstant(Instant.ofEpochSecond((((TimestampObjectInspector) fieldInspector) + .getPrimitiveJavaObject(fieldData)).toEpochSecond()), zoneId); + } else { + long datetime = ((LongWritable) fieldData).get(); + long seconds; + long nanoseconds; + if (dorisType.getPrecision() == 3) { + seconds = datetime / 1000; + nanoseconds = (datetime % 1000) * 1000000; + } else if (dorisType.getPrecision() == 6) { + seconds = datetime / 1000000; + nanoseconds = (datetime % 1000000) * 1000; + } else { + throw new RuntimeException("Hoodie timestamp only support milliseconds and microseconds, " + + "wrong precision = " + dorisType.getPrecision()); + } + return LocalDateTime.ofInstant(Instant.ofEpochSecond(seconds, nanoseconds), zoneId); + } + } + + @Override + public boolean canGetStringAsBytes() { + return false; + } + + @Override + public boolean isNull() { + return fieldData == null; + } + + @Override + public BigInteger getBigInteger() { + throw new UnsupportedOperationException("Hoodie type does not support largeint"); + } + + @Override + public byte[] getStringAsBytes() { + throw new UnsupportedOperationException("Hoodie type does not support getStringAsBytes"); + } + + @Override + public void unpackArray(List values) { + ListObjectInspector inspector = (ListObjectInspector) fieldInspector; + List items = inspector.getList(fieldData); + ObjectInspector itemInspector = inspector.getListElementObjectInspector(); + for (int i = 0; i < items.size(); i++) { + Object item = items.get(i); + HadoopHudiColumnValue childValue = new HadoopHudiColumnValue(zoneId); + childValue.setRow(item); + childValue.setField(dorisType.getChildTypes().get(0), itemInspector); + values.add(childValue); + } + } + + @Override + public void unpackMap(List keys, List values) { + MapObjectInspector inspector = (MapObjectInspector) fieldInspector; + ObjectInspector keyObjectInspector = inspector.getMapKeyObjectInspector(); + ObjectInspector valueObjectInspector = inspector.getMapValueObjectInspector(); + for (Map.Entry kv : inspector.getMap(fieldData).entrySet()) { + HadoopHudiColumnValue key = new HadoopHudiColumnValue(zoneId); + key.setRow(kv.getKey()); + key.setField(dorisType.getChildTypes().get(0), keyObjectInspector); + keys.add(key); + + HadoopHudiColumnValue value = new HadoopHudiColumnValue(zoneId); + value.setRow(kv.getValue()); + value.setField(dorisType.getChildTypes().get(1), valueObjectInspector); + values.add(value); + } + } + + @Override + public void unpackStruct(List structFieldIndex, List values) { + StructObjectInspector inspector = (StructObjectInspector) fieldInspector; + List fields = inspector.getAllStructFieldRefs(); + for (int i = 0; i < structFieldIndex.size(); i++) { + Integer idx = structFieldIndex.get(i); + HadoopHudiColumnValue value = new HadoopHudiColumnValue(zoneId); + Object obj = null; + if (idx != null) { + StructField sf = fields.get(idx); + obj = inspector.getStructFieldData(fieldData, sf); + } + value.setRow(obj); + value.setField(dorisType.getChildTypes().get(i), fields.get(i).getFieldObjectInspector()); + values.add(value); + } + } +} diff --git a/fe/be-java-extensions/hadoop-hudi-scanner/src/main/java/org/apache/doris/hudi/HadoopHudiJniScanner.java b/fe/be-java-extensions/hadoop-hudi-scanner/src/main/java/org/apache/doris/hudi/HadoopHudiJniScanner.java new file mode 100644 index 00000000000000..f2b38815a366fe --- /dev/null +++ b/fe/be-java-extensions/hadoop-hudi-scanner/src/main/java/org/apache/doris/hudi/HadoopHudiJniScanner.java @@ -0,0 +1,271 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.hudi; + +import org.apache.doris.common.classloader.ThreadClassLoaderContext; +import org.apache.doris.common.jni.JniScanner; +import org.apache.doris.common.jni.vec.ColumnType; + +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import com.google.common.collect.Maps; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.JavaUtils; +import org.apache.hadoop.hive.serde2.Deserializer; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.io.ArrayWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapred.FileSplit; +import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hudi.common.model.HoodieLogFile; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.hadoop.realtime.HoodieRealtimeFileSplit; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.time.ZoneId; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +/** + * HadoopHudiJniScanner is a JniScanner implementation that reads Hudi data using hudi-hadoop-mr. + */ +public class HadoopHudiJniScanner extends JniScanner { + private static final Logger LOG = LoggerFactory.getLogger(HadoopHudiJniScanner.class); + + private static final String HADOOP_CONF_PREFIX = "hadoop_conf."; + + // Hudi data info + private final String basePath; + private final String dataFilePath; + private final long dataFileLength; + private final String[] deltaFilePaths; + private final String instantTime; + private final String serde; + private final String inputFormat; + + // schema info + private final String hudiColumnNames; + private final String[] hudiColumnTypes; + private final String[] requiredFields; + private List requiredColumnIds; + private ColumnType[] requiredTypes; + + // Hadoop info + private RecordReader reader; + private StructObjectInspector rowInspector; + private final ObjectInspector[] fieldInspectors; + private final StructField[] structFields; + private Deserializer deserializer; + private final Map fsOptionsProps; + + // scanner info + private final HadoopHudiColumnValue columnValue; + private final int fetchSize; + private final ClassLoader classLoader; + + public HadoopHudiJniScanner(int fetchSize, Map params) { + this.basePath = params.get("base_path"); + this.dataFilePath = params.get("data_file_path"); + this.dataFileLength = Long.parseLong(params.get("data_file_length")); + if (Strings.isNullOrEmpty(params.get("delta_file_paths"))) { + this.deltaFilePaths = new String[0]; + } else { + this.deltaFilePaths = params.get("delta_file_paths").split(","); + } + this.instantTime = params.get("instant_time"); + this.serde = params.get("serde"); + this.inputFormat = params.get("input_format"); + + this.hudiColumnNames = params.get("hudi_column_names"); + this.hudiColumnTypes = params.get("hudi_column_types").split("#"); + this.requiredFields = params.get("required_fields").split(","); + + this.fieldInspectors = new ObjectInspector[requiredFields.length]; + this.structFields = new StructField[requiredFields.length]; + this.fsOptionsProps = Maps.newHashMap(); + for (Map.Entry entry : params.entrySet()) { + if (entry.getKey().startsWith(HADOOP_CONF_PREFIX)) { + fsOptionsProps.put(entry.getKey().substring(HADOOP_CONF_PREFIX.length()), entry.getValue()); + } + if (LOG.isDebugEnabled()) { + LOG.debug("get hudi params {}: {}", entry.getKey(), entry.getValue()); + } + } + + ZoneId zoneId; + if (Strings.isNullOrEmpty(params.get("time_zone"))) { + zoneId = ZoneId.systemDefault(); + } else { + zoneId = ZoneId.of(params.get("time_zone")); + } + this.columnValue = new HadoopHudiColumnValue(zoneId); + this.fetchSize = fetchSize; + this.classLoader = this.getClass().getClassLoader(); + } + + @Override + public void open() throws IOException { + try (ThreadClassLoaderContext ignored = new ThreadClassLoaderContext(classLoader)) { + initRequiredColumnsAndTypes(); + initTableInfo(requiredTypes, requiredFields, fetchSize); + Properties properties = getReaderProperties(); + initReader(properties); + } catch (Exception e) { + close(); + LOG.warn("failed to open hadoop hudi jni scanner", e); + throw new IOException("failed to open hadoop hudi jni scanner: " + e.getMessage(), e); + } + } + + @Override + public int getNext() throws IOException { + try (ThreadClassLoaderContext ignored = new ThreadClassLoaderContext(classLoader)) { + NullWritable key = reader.createKey(); + ArrayWritable value = reader.createValue(); + int numRows = 0; + for (; numRows < fetchSize; numRows++) { + if (!reader.next(key, value)) { + break; + } + Object rowData = deserializer.deserialize(value); + for (int i = 0; i < fields.length; i++) { + Object fieldData = rowInspector.getStructFieldData(rowData, structFields[i]); + columnValue.setRow(fieldData); + // LOG.info("rows: {}, column: {}, col name: {}, col type: {}, inspector: {}", + // numRows, i, types[i].getName(), types[i].getType().name(), + // fieldInspectors[i].getTypeName()); + columnValue.setField(types[i], fieldInspectors[i]); + appendData(i, columnValue); + } + } + return numRows; + } catch (Exception e) { + close(); + LOG.warn("failed to get next in hadoop hudi jni scanner", e); + throw new IOException("failed to get next in hadoop hudi jni scanner: " + e.getMessage(), e); + } + } + + @Override + public void close() throws IOException { + try (ThreadClassLoaderContext ignored = new ThreadClassLoaderContext(classLoader)) { + if (reader != null) { + reader.close(); + } + } catch (IOException e) { + LOG.warn("failed to close hadoop hudi jni scanner", e); + throw new IOException("failed to close hadoop hudi jni scanner: " + e.getMessage(), e); + } + } + + private void initRequiredColumnsAndTypes() { + String[] splitHudiColumnNames = hudiColumnNames.split(","); + + Map hudiColNameToIdx = + IntStream.range(0, splitHudiColumnNames.length) + .boxed() + .collect(Collectors.toMap(i -> splitHudiColumnNames[i], i -> i)); + + Map hudiColNameToType = + IntStream.range(0, splitHudiColumnNames.length) + .boxed() + .collect(Collectors.toMap(i -> splitHudiColumnNames[i], i -> hudiColumnTypes[i])); + + requiredTypes = Arrays.stream(requiredFields) + .map(field -> ColumnType.parseType(field, hudiColNameToType.get(field))) + .toArray(ColumnType[]::new); + + requiredColumnIds = Arrays.stream(requiredFields) + .mapToInt(hudiColNameToIdx::get) + .boxed().collect(Collectors.toList()); + } + + private Properties getReaderProperties() { + Properties properties = new Properties(); + properties.setProperty("hive.io.file.readcolumn.ids", Joiner.on(",").join(requiredColumnIds)); + properties.setProperty("hive.io.file.readcolumn.names", Joiner.on(",").join(this.requiredFields)); + properties.setProperty("columns", this.hudiColumnNames); + properties.setProperty("columns.types", Joiner.on(",").join(hudiColumnTypes)); + properties.setProperty("serialization.lib", this.serde); + properties.setProperty("hive.io.file.read.all.columns", "false"); + fsOptionsProps.forEach(properties::setProperty); + return properties; + } + + private void initReader(Properties properties) throws Exception { + String realtimePath = dataFileLength != -1 ? dataFilePath : deltaFilePaths[0]; + long realtimeLength = dataFileLength != -1 ? dataFileLength : 0; + Path path = new Path(realtimePath); + FileSplit fileSplit = new FileSplit(path, 0, realtimeLength, (String[]) null); + List logFiles = Arrays.stream(deltaFilePaths).map(HoodieLogFile::new) + .collect(Collectors.toList()); + FileSplit hudiSplit = + new HoodieRealtimeFileSplit(fileSplit, basePath, logFiles, instantTime, false, Option.empty()); + + JobConf jobConf = new JobConf(new Configuration()); + properties.stringPropertyNames().forEach(name -> jobConf.set(name, properties.getProperty(name))); + InputFormat inputFormatClass = createInputFormat(jobConf, inputFormat); + reader = (RecordReader) inputFormatClass + .getRecordReader(hudiSplit, jobConf, Reporter.NULL); + + deserializer = getDeserializer(jobConf, properties, serde); + rowInspector = getTableObjectInspector(deserializer); + for (int i = 0; i < requiredFields.length; i++) { + StructField field = rowInspector.getStructFieldRef(requiredFields[i]); + structFields[i] = field; + fieldInspectors[i] = field.getFieldObjectInspector(); + } + } + + private InputFormat createInputFormat(Configuration conf, String inputFormat) throws Exception { + Class clazz = conf.getClassByName(inputFormat); + Class> cls = + (Class>) clazz.asSubclass(InputFormat.class); + return ReflectionUtils.newInstance(cls, conf); + } + + private Deserializer getDeserializer(Configuration configuration, Properties properties, String name) + throws Exception { + Class deserializerClass = Class.forName(name, true, JavaUtils.getClassLoader()) + .asSubclass(Deserializer.class); + Deserializer deserializer = deserializerClass.getConstructor().newInstance(); + deserializer.initialize(configuration, properties); + return deserializer; + } + + private StructObjectInspector getTableObjectInspector(Deserializer deserializer) throws Exception { + ObjectInspector inspector = deserializer.getObjectInspector(); + Preconditions.checkArgument(inspector.getCategory() == ObjectInspector.Category.STRUCT, + "expected STRUCT: %s", inspector.getCategory()); + return (StructObjectInspector) inspector; + } +} diff --git a/fe/be-java-extensions/hadoop-hudi-scanner/src/main/resources/package.xml b/fe/be-java-extensions/hadoop-hudi-scanner/src/main/resources/package.xml new file mode 100644 index 00000000000000..4bbb2610603363 --- /dev/null +++ b/fe/be-java-extensions/hadoop-hudi-scanner/src/main/resources/package.xml @@ -0,0 +1,41 @@ + + + + jar-with-dependencies + + jar + + false + + + / + true + true + runtime + + + **/Log4j2Plugins.dat + + + + + diff --git a/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/Utils.java b/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/Utils.java index 5614f8bcc96eb1..3e07c8917905a3 100644 --- a/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/Utils.java +++ b/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/Utils.java @@ -23,6 +23,7 @@ import org.apache.commons.io.FileUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration; import java.io.BufferedReader; import java.io.File; @@ -75,7 +76,8 @@ public static void killProcess(long pid) { } public static HoodieTableMetaClient getMetaClient(Configuration conf, String basePath) { + HadoopStorageConfiguration hadoopStorageConfiguration = new HadoopStorageConfiguration(conf); return HadoopUGI.ugiDoAs(AuthenticationConfig.getKerberosConfig(conf), () -> HoodieTableMetaClient.builder() - .setConf(conf).setBasePath(basePath).build()); + .setConf(hadoopStorageConfiguration).setBasePath(basePath).build()); } } diff --git a/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/BaseSplitReader.scala b/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/BaseSplitReader.scala index dcc068ad7006d8..fc8d74f9713c26 100644 --- a/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/BaseSplitReader.scala +++ b/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/BaseSplitReader.scala @@ -36,13 +36,15 @@ import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, T import org.apache.hudi.common.util.ValidationUtils.checkState import org.apache.hudi.common.util.{ConfigUtils, StringUtils} import org.apache.hudi.config.HoodieWriteConfig -import org.apache.hudi.hadoop.CachingPath +import org.apache.hudi.hadoop.fs.CachingPath import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter import org.apache.hudi.internal.schema.utils.{InternalSchemaUtils, SerDeHelper} import org.apache.hudi.internal.schema.{HoodieSchemaException, InternalSchema} -import org.apache.hudi.io.storage.HoodieAvroHFileReader +import org.apache.hudi.io.hadoop.HoodieHBaseAvroHFileReader import org.apache.hudi.metadata.HoodieTableMetadataUtil import org.apache.hudi.{AvroConversionUtils, DataSourceReadOptions, DataSourceWriteOptions, HoodieSparkConfUtils, HoodieTableSchema, HoodieTableState} +import org.apache.hudi.storage.StoragePath; +import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration; import org.apache.log4j.Logger import org.apache.spark.sql.adapter.Spark3_4Adapter import org.apache.spark.sql.avro.{HoodieAvroSchemaConverters, HoodieSparkAvroSchemaConverters} @@ -430,7 +432,7 @@ abstract class BaseSplitReader(val split: HoodieSplit) { try { if (shouldExtractPartitionValuesFromPartitionPath) { val filePath = new Path(split.dataFilePath) - val tablePathWithoutScheme = CachingPath.getPathWithoutSchemeAndAuthority(tableInformation.metaClient.getBasePathV2) + val tablePathWithoutScheme = CachingPath.getPathWithoutSchemeAndAuthority(new Path(tableInformation.metaClient.getBasePathV2.toUri)) val partitionPathWithoutScheme = CachingPath.getPathWithoutSchemeAndAuthority(filePath.getParent) val relativePath = new URI(tablePathWithoutScheme.toString).relativize(new URI(partitionPathWithoutScheme.toString)).toString val hiveStylePartitioningEnabled = tableConfig.getHiveStylePartitioningEnable.toBoolean @@ -497,8 +499,11 @@ abstract class BaseSplitReader(val split: HoodieSplit) { options: Map[String, String], hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = { partitionedFile => { - val reader = new HoodieAvroHFileReader( - hadoopConf, partitionedFile.filePath.toPath, new CacheConfig(hadoopConf)) + var hadoopStorageConfiguration = new HadoopStorageConfiguration(hadoopConf); + var storagePath = new StoragePath(partitionedFile.toPath.toUri.getPath); + var emptySchema = org.apache.hudi.common.util.Option.empty[org.apache.avro.Schema]() + val reader = new HoodieHBaseAvroHFileReader( + hadoopStorageConfiguration, storagePath, emptySchema) val requiredRowSchema = requiredDataSchema.structTypeSchema // NOTE: Schema has to be parsed at this point, since Avro's [[Schema]] aren't serializable diff --git a/fe/be-java-extensions/pom.xml b/fe/be-java-extensions/pom.xml index bbe056739d51ec..5d56ef76e7c3ef 100644 --- a/fe/be-java-extensions/pom.xml +++ b/fe/be-java-extensions/pom.xml @@ -22,6 +22,7 @@ under the License. 4.0.0 hudi-scanner + hadoop-hudi-scanner java-common java-udf jdbc-scanner diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java index d1de1306d04693..5ea2a2637d86e2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java @@ -433,6 +433,8 @@ private TScanRangeLocations splitToScanRange( } } + // set file format type, and the type might fall back to native format in setScanParams + rangeDesc.setFormatType(getFileFormatType()); setScanParams(rangeDesc, fileSplit); curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc); TScanRangeLocation location = new TScanRangeLocation(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreClientHelper.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreClientHelper.java index 97032467cec765..0f60a61b27538c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreClientHelper.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreClientHelper.java @@ -64,6 +64,7 @@ import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.TableSchemaResolver; +import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -828,8 +829,10 @@ public static T ugiDoAs(Configuration conf, PrivilegedExceptionAction act public static HoodieTableMetaClient getHudiClient(HMSExternalTable table) { String hudiBasePath = table.getRemoteTable().getSd().getLocation(); Configuration conf = getConfiguration(table); + HadoopStorageConfiguration hadoopStorageConfiguration = new HadoopStorageConfiguration(conf); return HadoopUGI.ugiDoAs(AuthenticationConfig.getKerberosConfig(conf), - () -> HoodieTableMetaClient.builder().setConf(conf).setBasePath(hudiBasePath).build()); + () -> HoodieTableMetaClient.builder().setConf(hadoopStorageConfiguration).setBasePath(hudiBasePath) + .build()); } public static Configuration getConfiguration(HMSExternalTable table) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/HudiUtils.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/HudiUtils.java index d7803b1a516f9e..c98d994a28a08f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/HudiUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/HudiUtils.java @@ -86,7 +86,7 @@ public static String convertAvroToHiveType(Schema schema) { case LONG: if (logicalType instanceof LogicalTypes.TimestampMillis || logicalType instanceof LogicalTypes.TimestampMicros) { - return logicalType.getName(); + return "timestamp"; } if (logicalType instanceof LogicalTypes.TimeMicros) { return handleUnsupportedType(schema); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/COWIncrementalRelation.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/COWIncrementalRelation.java index 7981a0b4f261ff..843dded27969ad 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/COWIncrementalRelation.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/COWIncrementalRelation.java @@ -37,6 +37,7 @@ import org.apache.hudi.common.table.timeline.TimelineUtils.HollowCommitHandling; import org.apache.hudi.common.util.Option; import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.storage.StoragePath; import java.io.IOException; import java.util.ArrayList; @@ -105,7 +106,7 @@ public COWIncrementalRelation(Map optParams, Configuration confi List commitsToReturn = commitsTimelineToReturn.getInstants(); // todo: support configuration hoodie.datasource.read.incr.filters - Path basePath = metaClient.getBasePathV2(); + StoragePath basePath = metaClient.getBasePathV2(); Map regularFileIdToFullPath = new HashMap<>(); Map metaBootstrapFileIdToFullPath = new HashMap<>(); HoodieTimeline replacedTimeline = commitsTimelineToReturn.getCompletedReplaceTimeline(); @@ -113,8 +114,8 @@ public COWIncrementalRelation(Map optParams, Configuration confi for (HoodieInstant instant : replacedTimeline.getInstants()) { HoodieReplaceCommitMetadata.fromBytes(metaClient.getActiveTimeline().getInstantDetails(instant).get(), HoodieReplaceCommitMetadata.class).getPartitionToReplaceFileIds().forEach( - (key, value) -> value.forEach( - e -> replacedFile.put(e, FSUtils.getPartitionPath(basePath, key).toString()))); + (key, value) -> value.forEach( + e -> replacedFile.put(e, FSUtils.constructAbsolutePath(basePath, key).toString()))); } fileToWriteStat = new HashMap<>(); @@ -123,7 +124,7 @@ public COWIncrementalRelation(Map optParams, Configuration confi commitTimeline.getInstantDetails(commit).get(), HoodieCommitMetadata.class); metadata.getPartitionToWriteStats().forEach((partition, stats) -> { for (HoodieWriteStat stat : stats) { - fileToWriteStat.put(FSUtils.getPartitionPath(basePath, stat.getPath()).toString(), stat); + fileToWriteStat.put(FSUtils.constructAbsolutePath(basePath, stat.getPath()).toString(), stat); } }); if (HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS.equals(commit.getTimestamp())) { @@ -158,7 +159,7 @@ public COWIncrementalRelation(Map optParams, Configuration confi } - fs = basePath.getFileSystem(configuration); + fs = new Path(basePath.toUri().getPath()).getFileSystem(configuration); fullTableScan = shouldFullTableScan(); includeStartTime = !fullTableScan; if (fullTableScan || commitsToReturn.isEmpty()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiLocalEngineContext.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiLocalEngineContext.java index 26ef6fdfef7086..fecc026cf8d046 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiLocalEngineContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiLocalEngineContext.java @@ -17,10 +17,6 @@ package org.apache.doris.datasource.hudi.source; -import org.apache.doris.datasource.hive.HiveMetaStoreClientHelper; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hudi.common.config.SerializableConfiguration; import org.apache.hudi.common.data.HoodieAccumulator; import org.apache.hudi.common.data.HoodieAtomicLongAccumulator; import org.apache.hudi.common.data.HoodieData; @@ -39,7 +35,7 @@ import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.collection.ImmutablePair; import org.apache.hudi.common.util.collection.Pair; -import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.storage.StorageConfiguration; import java.util.Collections; import java.util.Iterator; @@ -50,18 +46,20 @@ import java.util.stream.Stream; /** - * This file is copied from org.apache.hudi.common.engine.HoodieLocalEngineContext. + * This file is copied from + * org.apache.hudi.common.engine.HudiLocalEngineContext. * Because we need set ugi in thread pool - * A java based engine context, use this implementation on the query engine integrations if needed. + * A java based engine context, use this implementation on the query engine + * integrations if needed. */ public final class HudiLocalEngineContext extends HoodieEngineContext { - public HudiLocalEngineContext(Configuration conf) { + public HudiLocalEngineContext(StorageConfiguration conf) { this(conf, new LocalTaskContextSupplier()); } - public HudiLocalEngineContext(Configuration conf, TaskContextSupplier taskContextSupplier) { - super(new SerializableConfiguration(conf), taskContextSupplier); + public HudiLocalEngineContext(StorageConfiguration conf, TaskContextSupplier taskContextSupplier) { + super(conf, taskContextSupplier); } @Override @@ -81,27 +79,18 @@ public HoodieData parallelize(List data, int parallelism) { @Override public List map(List data, SerializableFunction func, int parallelism) { - return data.stream().parallel().map(v1 -> { - try { - return HiveMetaStoreClientHelper.ugiDoAs(getHadoopConf().get(), () -> func.apply(v1)); - } catch (Exception e) { - throw new HoodieException("Error occurs when executing map", e); - } - }).collect(Collectors.toList()); + return data.stream().parallel().map(FunctionWrapper.throwingMapWrapper(func)).collect(Collectors.toList()); } @Override public List mapToPairAndReduceByKey( - List data, - SerializablePairFunction mapToPairFunc, - SerializableBiFunction reduceFunc, int parallelism) { + List data, SerializablePairFunction mapToPairFunc, SerializableBiFunction reduceFunc, + int parallelism) { return data.stream().parallel().map(FunctionWrapper.throwingMapToPairWrapper(mapToPairFunc)) - .collect(Collectors.groupingBy(p -> p.getKey())).values().stream() - .map(list -> - list.stream() - .map(e -> e.getValue()) + .collect(Collectors.groupingBy(p -> p.getKey())).values().stream() + .map(list -> list.stream().map(e -> e.getValue()) .reduce(FunctionWrapper.throwingReduceWrapper(reduceFunc)).get()) - .collect(Collectors.toList()); + .collect(Collectors.toList()); } @Override @@ -109,29 +98,28 @@ public Stream> mapPartitionsToPairAndReduceByKey( Stream data, SerializablePairFlatMapFunction, K, V> flatMapToPairFunc, SerializableBiFunction reduceFunc, int parallelism) { return FunctionWrapper.throwingFlatMapToPairWrapper(flatMapToPairFunc).apply(data.parallel().iterator()) - .collect(Collectors.groupingBy(Pair::getKey)).entrySet().stream() - .map(entry -> new ImmutablePair<>(entry.getKey(), entry.getValue().stream().map( - Pair::getValue).reduce(FunctionWrapper.throwingReduceWrapper(reduceFunc)).orElse(null))) - .filter(Objects::nonNull); + .collect(Collectors.groupingBy(Pair::getKey)).entrySet().stream() + .map(entry -> new ImmutablePair<>(entry.getKey(), entry.getValue().stream().map( + Pair::getValue).reduce(FunctionWrapper.throwingReduceWrapper(reduceFunc)).orElse(null))) + .filter(Objects::nonNull); } @Override public List reduceByKey( List> data, SerializableBiFunction reduceFunc, int parallelism) { return data.stream().parallel() - .collect(Collectors.groupingBy(p -> p.getKey())).values().stream() - .map(list -> - list.stream() - .map(e -> e.getValue()) - .reduce(FunctionWrapper.throwingReduceWrapper(reduceFunc)).orElse(null)) - .filter(Objects::nonNull) - .collect(Collectors.toList()); + .collect(Collectors.groupingBy(p -> p.getKey())).values().stream() + .map(list -> list.stream().map(e -> e.getValue()) + .reduce(FunctionWrapper.throwingReduceWrapper(reduceFunc)) + .orElse(null)) + .filter(Objects::nonNull) + .collect(Collectors.toList()); } @Override public List flatMap(List data, SerializableFunction> func, int parallelism) { - return - data.stream().parallel().flatMap(FunctionWrapper.throwingFlatMapWrapper(func)).collect(Collectors.toList()); + return data.stream().parallel().flatMap(FunctionWrapper.throwingFlatMapWrapper(func)) + .collect(Collectors.toList()); } @Override @@ -142,8 +130,7 @@ public void foreach(List data, SerializableConsumer consumer, int para @Override public Map mapToPair(List data, SerializablePairFunction func, Integer parallelism) { return data.stream().map(FunctionWrapper.throwingMapToPairWrapper(func)).collect( - Collectors.toMap(Pair::getLeft, Pair::getRight, (oldVal, newVal) -> newVal) - ); + Collectors.toMap(Pair::getLeft, Pair::getRight, (oldVal, newVal) -> newVal)); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiPartitionProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiPartitionProcessor.java index 738b2638588e03..0ab9fef951a378 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiPartitionProcessor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiPartitionProcessor.java @@ -21,7 +21,6 @@ import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.timeline.TimelineUtils; -import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils; import org.apache.hudi.metadata.HoodieTableMetadata; import org.apache.hudi.metadata.HoodieTableMetadataUtil; @@ -49,14 +48,15 @@ public List getAllPartitionNames(HoodieTableMetaClient tableMetaClient) .build(); HoodieTableMetadata newTableMetadata = HoodieTableMetadata.create( - new HudiLocalEngineContext(tableMetaClient.getHadoopConf()), metadataConfig, + new HudiLocalEngineContext(tableMetaClient.getStorageConf()), tableMetaClient.getStorage(), + metadataConfig, tableMetaClient.getBasePathV2().toString(), true); return newTableMetadata.getAllPartitionPaths(); } public List getPartitionNamesBeforeOrEquals(HoodieTimeline timeline, String timestamp) { - return new ArrayList<>(HoodieInputFormatUtils.getWritePartitionPaths( + return new ArrayList<>(HoodieTableMetadataUtil.getWritePartitionPaths( timeline.findInstantsBeforeOrEquals(timestamp).getInstants().stream().map(instant -> { try { return TimelineUtils.getCommitMetadata(instant, timeline); @@ -67,7 +67,7 @@ public List getPartitionNamesBeforeOrEquals(HoodieTimeline timeline, Str } public List getPartitionNamesInRange(HoodieTimeline timeline, String startTimestamp, String endTimestamp) { - return new ArrayList<>(HoodieInputFormatUtils.getWritePartitionPaths( + return new ArrayList<>(HoodieTableMetadataUtil.getWritePartitionPaths( timeline.findInstantsInRange(startTimestamp, endTimestamp).getInstants().stream().map(instant -> { try { return TimelineUtils.getCommitMetadata(instant, timeline); @@ -101,8 +101,10 @@ public static List parsePartitionValues(List partitionColumns, S } else { // If the partition column size is not equal to the partition fragments size // and the partition column size > 1, we do not know how to map the partition - // fragments to the partition columns and therefore return an empty tuple. We don't - // fail outright so that in some cases we can fallback to reading the table as non-partitioned + // fragments to the partition columns and therefore return an empty tuple. We + // don't + // fail outright so that in some cases we can fallback to reading the table as + // non-partitioned // one throw new RuntimeException("Failed to parse partition values of path: " + partitionPath); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java index a8f2a362bfde8d..28805aae63c1e3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java @@ -37,6 +37,7 @@ import org.apache.doris.planner.ListPartitionPrunerV2; import org.apache.doris.planner.PlanNodeId; import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.SessionVariable; import org.apache.doris.spi.Split; import org.apache.doris.statistics.StatisticalType; import org.apache.doris.thrift.TExplainLevel; @@ -48,8 +49,6 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import org.apache.avro.Schema; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.Path; import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.BaseFile; @@ -62,6 +61,8 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.view.HoodieTableFileSystemView; import org.apache.hudi.common.util.Option; +import org.apache.hudi.storage.StoragePath; +import org.apache.hudi.storage.StoragePathInfo; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -87,7 +88,7 @@ public class HudiScanNode extends HiveScanNode { private static final Logger LOG = LogManager.getLogger(HudiScanNode.class); - private boolean isCowOrRoTable; + private boolean isCowTable; private final AtomicLong noLogsSplitNum = new AtomicLong(0); @@ -113,19 +114,23 @@ public class HudiScanNode extends HiveScanNode { private boolean incrementalRead = false; private TableScanParams scanParams; private IncrementalRelation incrementalRelation; + private SessionVariable sessionVariable; /** * External file scan node for Query Hudi table - * needCheckColumnPriv: Some of ExternalFileScanNode do not need to check column priv + * needCheckColumnPriv: Some of ExternalFileScanNode do not need to check column + * priv * eg: s3 tvf - * These scan nodes do not have corresponding catalog/database/table info, so no need to do priv check + * These scan nodes do not have corresponding catalog/database/table info, so no + * need to do priv check */ public HudiScanNode(PlanNodeId id, TupleDescriptor desc, boolean needCheckColumnPriv, - Optional scanParams, Optional incrementalRelation) { + Optional scanParams, Optional incrementalRelation, + SessionVariable sessionVariable) { super(id, desc, "HUDI_SCAN_NODE", StatisticalType.HUDI_SCAN_NODE, needCheckColumnPriv); - isCowOrRoTable = hmsTable.isHoodieCowTable(); + isCowTable = hmsTable.isHoodieCowTable(); if (LOG.isDebugEnabled()) { - if (isCowOrRoTable) { + if (isCowTable) { LOG.debug("Hudi table {} can read as cow/read optimize table", hmsTable.getFullQualifiers()); } else { LOG.debug("Hudi table {} is a mor table, and will use JNI to read data in BE", @@ -136,11 +141,12 @@ public HudiScanNode(PlanNodeId id, TupleDescriptor desc, boolean needCheckColumn this.scanParams = scanParams.orElse(null); this.incrementalRelation = incrementalRelation.orElse(null); this.incrementalRead = (this.scanParams != null && this.scanParams.incrementalRead()); + this.sessionVariable = sessionVariable; } @Override public TFileFormatType getFileFormatType() throws UserException { - if (isCowOrRoTable) { + if (canUseNativeReader()) { return super.getFileFormatType(); } else { // Use jni to read hudi table in BE @@ -185,13 +191,13 @@ protected void doInitialize() throws UserException { throw new UserException("Not support function '" + scanParams.getParamType() + "' in hudi table"); } if (incrementalRead) { - if (isCowOrRoTable) { + if (isCowTable) { try { Map serd = hmsTable.getRemoteTable().getSd().getSerdeInfo().getParameters(); if ("true".equals(serd.get("hoodie.query.as.ro.table")) && hmsTable.getRemoteTable().getTableName().endsWith("_ro")) { // Incremental read RO table as RT table, I don't know why? - isCowOrRoTable = false; + isCowTable = false; LOG.warn("Execute incremental read on RO table: {}", hmsTable.getFullQualifiers()); } } catch (Exception e) { @@ -236,7 +242,15 @@ protected Map getLocationProperties() throws UserException { @Override protected void setScanParams(TFileRangeDesc rangeDesc, Split split) { if (split instanceof HudiSplit) { - setHudiParams(rangeDesc, (HudiSplit) split); + HudiSplit hudiSplit = (HudiSplit) split; + if (rangeDesc.getFormatType() == TFileFormatType.FORMAT_JNI + && !sessionVariable.isForceJniScanner() + && hudiSplit.getHudiDeltaLogs().isEmpty()) { + // no logs, is read optimize table, fallback to use native reader + // TODO: support read orc hudi table in native reader + rangeDesc.setFormatType(TFileFormatType.FORMAT_PARQUET); + } + setHudiParams(rangeDesc, hudiSplit); } } @@ -255,10 +269,15 @@ private void setHudiParams(TFileRangeDesc rangeDesc, HudiSplit hudiSplit) { fileDesc.setColumnTypes(hudiSplit.getHudiColumnTypes()); // TODO(gaoxin): support complex types // fileDesc.setNestedFields(hudiSplit.getNestedFields()); + fileDesc.setHudiJniScanner(hudiSplit.getHudiJniScanner()); tableFormatFileDesc.setHudiParams(fileDesc); rangeDesc.setTableFormatParams(tableFormatFileDesc); } + private boolean canUseNativeReader() { + return !sessionVariable.isForceJniScanner() && isCowTable; + } + private List getPrunedPartitions( HoodieTableMetaClient metaClient, Option snapshotTimestamp) throws AnalysisException { List partitionColumnTypes = hmsTable.getPartitionColumnTypes(); @@ -304,7 +323,8 @@ private List getPrunedPartitions( } } } - // unpartitioned table, create a dummy partition to save location and inputformat, + // unpartitioned table, create a dummy partition to save location and + // inputformat, // so that we can unify the interface. HivePartition dummyPartition = new HivePartition(hmsTable.getDbName(), hmsTable.getName(), true, hmsTable.getRemoteTable().getSd().getInputFormat(), @@ -315,7 +335,7 @@ private List getPrunedPartitions( } private List getIncrementalSplits() { - if (isCowOrRoTable) { + if (canUseNativeReader()) { List splits = incrementalRelation.collectSplits(); noLogsSplitNum.addAndGet(splits.size()); return splits; @@ -336,15 +356,15 @@ private void getPartitionSplits(HivePartition partition, List splits) thr globPath = hudiClient.getBasePathV2().toString() + "/*"; } else { partitionName = FSUtils.getRelativePartitionPath(hudiClient.getBasePathV2(), - new Path(partition.getPath())); + new StoragePath(partition.getPath())); globPath = String.format("%s/%s/*", hudiClient.getBasePathV2().toString(), partitionName); } - List statuses = FSUtils.getGlobStatusExcludingMetaFolder( - hudiClient.getRawFs(), new Path(globPath)); + List statuses = FSUtils.getGlobStatusExcludingMetaFolder( + hudiClient.getRawHoodieStorage(), new StoragePath(globPath)); HoodieTableFileSystemView fileSystemView = new HoodieTableFileSystemView(hudiClient, - timeline, statuses.toArray(new FileStatus[0])); + timeline, statuses); - if (isCowOrRoTable) { + if (canUseNativeReader()) { fileSystemView.getLatestBaseFilesBeforeOrOn(partitionName, queryInstant).forEach(baseFile -> { noLogsSplitNum.incrementAndGet(); String filePath = baseFile.getPath(); @@ -473,7 +493,7 @@ private HudiSplit generateHudiSplit(FileSlice fileSlice, List partitionV fileSlice.getPartitionPath(); List logs = fileSlice.getLogFiles().map(HoodieLogFile::getPath) - .map(Path::toString) + .map(StoragePath::toString) .collect(Collectors.toList()); if (logs.isEmpty()) { noLogsSplitNum.incrementAndGet(); @@ -492,6 +512,7 @@ private HudiSplit generateHudiSplit(FileSlice fileSlice, List partitionV split.setHudiColumnNames(columnNames); split.setHudiColumnTypes(columnTypes); split.setInstantTime(queryInstant); + split.setHudiJniScanner(sessionVariable.getHudiJniScanner()); return split; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiSplit.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiSplit.java index c72f7621feaa55..2270d2017937da 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiSplit.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiSplit.java @@ -40,6 +40,5 @@ public HudiSplit(LocationPath file, long start, long length, long fileLength, St private List hudiColumnNames; private List hudiColumnTypes; private List nestedFields; + private String hudiJniScanner; } - - diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/MORIncrementalRelation.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/MORIncrementalRelation.java index c06fcc2a578d43..7df013599229fb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/MORIncrementalRelation.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/MORIncrementalRelation.java @@ -20,9 +20,7 @@ import org.apache.doris.spi.Split; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.GlobPattern; -import org.apache.hadoop.fs.Path; import org.apache.hudi.common.model.BaseFile; import org.apache.hudi.common.model.FileSlice; import org.apache.hudi.common.model.HoodieCommitMetadata; @@ -34,6 +32,8 @@ import org.apache.hudi.common.table.view.HoodieTableFileSystemView; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils; +import org.apache.hudi.metadata.HoodieTableMetadataUtil; +import org.apache.hudi.storage.StoragePathInfo; import java.io.IOException; import java.util.ArrayList; @@ -54,7 +54,7 @@ public class MORIncrementalRelation implements IncrementalRelation { private final boolean endInstantArchived; private final List includedCommits; private final List commitsMetadata; - private final FileStatus[] affectedFilesInCommits; + private final List affectedFilesInCommits; private final boolean fullTableScan; private final String globPattern; private final boolean includeStartTime; @@ -96,7 +96,7 @@ public MORIncrementalRelation(Map optParams, Configuration confi includedCommits = getIncludedCommits(); commitsMetadata = getCommitsMetadata(); affectedFilesInCommits = HoodieInputFormatUtils.listAffectedFilesForCommits(configuration, - new Path(metaClient.getBasePath()), commitsMetadata); + metaClient.getBasePathV2(), commitsMetadata); fullTableScan = shouldFullTableScan(); if (hollowCommitHandling == HollowCommitHandling.USE_TRANSITION_TIME && fullTableScan) { throw new HoodieException("Cannot use stateTransitionTime while enables full table scan"); @@ -152,8 +152,8 @@ private boolean shouldFullTableScan() throws IOException { if (should) { return true; } - for (FileStatus fileStatus : affectedFilesInCommits) { - if (!metaClient.getFs().exists(fileStatus.getPath())) { + for (StoragePathInfo fileStatus : affectedFilesInCommits) { + if (!metaClient.getRawHoodieStorage().exists(fileStatus.getPath())) { return true; } } @@ -199,7 +199,7 @@ public List collectFileSlices() throws HoodieException { String latestCommit = includedCommits.get(includedCommits.size() - 1).getTimestamp(); HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient, scanTimeline, affectedFilesInCommits); - Stream fileSlices = HoodieInputFormatUtils.getWritePartitionPaths(commitsMetadata) + Stream fileSlices = HoodieTableMetadataUtil.getWritePartitionPaths(commitsMetadata) .stream().flatMap(relativePartitionPath -> fsView.getLatestMergedFileSlicesBeforeOrOn(relativePartitionPath, latestCommit)); if ("".equals(globPattern)) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java index 59f51c8425c7f2..bf917804fb7b6d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java @@ -105,9 +105,9 @@ public String toString() { private String serializedTable; public PaimonScanNode(PlanNodeId id, - TupleDescriptor desc, - boolean needCheckColumnPriv, - SessionVariable sessionVariable) { + TupleDescriptor desc, + boolean needCheckColumnPriv, + SessionVariable sessionVariable) { super(id, desc, "PAIMON_SCAN_NODE", StatisticalType.PAIMON_SCAN_NODE, needCheckColumnPriv); this.sessionVariable = sessionVariable; } @@ -127,8 +127,7 @@ protected void convertPredicate() { predicates = paimonPredicateConverter.convertToPaimonExpr(conjuncts); } - private static final Base64.Encoder BASE64_ENCODER = - java.util.Base64.getUrlEncoder().withoutPadding(); + private static final Base64.Encoder BASE64_ENCODER = java.util.Base64.getUrlEncoder().withoutPadding(); public static String encodeObjectToString(T t) { try { @@ -156,11 +155,24 @@ private void setPaimonParams(TFileRangeDesc rangeDesc, PaimonSplit paimonSplit) tableFormatFileDesc.setTableFormatType(paimonSplit.getTableFormatType().value()); TPaimonFileDesc fileDesc = new TPaimonFileDesc(); org.apache.paimon.table.source.Split split = paimonSplit.getSplit(); + + String fileFormat = getFileFormat(paimonSplit.getPathString()); if (split != null) { // use jni reader + rangeDesc.setFormatType(TFileFormatType.FORMAT_JNI); fileDesc.setPaimonSplit(encodeObjectToString(split)); + } else { + // use native reader + if (fileFormat.equals("orc")) { + rangeDesc.setFormatType(TFileFormatType.FORMAT_ORC); + } else if (fileFormat.equals("parquet")) { + rangeDesc.setFormatType(TFileFormatType.FORMAT_PARQUET); + } else { + throw new RuntimeException("Unsupported file format: " + fileFormat); + } } - fileDesc.setFileFormat(getFileFormat(paimonSplit.getPathString())); + + fileDesc.setFileFormat(fileFormat); fileDesc.setPaimonPredicate(encodeObjectToString(predicates)); fileDesc.setPaimonColumnNames(source.getDesc().getSlots().stream().map(slot -> slot.getColumn().getName()) .collect(Collectors.joining(","))); @@ -172,7 +184,8 @@ private void setPaimonParams(TFileRangeDesc rangeDesc, PaimonSplit paimonSplit) fileDesc.setTblId(source.getTargetTable().getId()); fileDesc.setLastUpdateTime(source.getTargetTable().getUpdateTime()); fileDesc.setPaimonTable(encodeObjectToString(source.getPaimonTable())); - // The hadoop conf should be same with PaimonExternalCatalog.createCatalog()#getConfiguration() + // The hadoop conf should be same with + // PaimonExternalCatalog.createCatalog()#getConfiguration() fileDesc.setHadoopConf(source.getCatalog().getCatalogProperty().getHadoopProperties()); Optional optDeletionFile = paimonSplit.getDeletionFile(); if (optDeletionFile.isPresent()) { @@ -190,8 +203,8 @@ private void setPaimonParams(TFileRangeDesc rangeDesc, PaimonSplit paimonSplit) @Override public List getSplits() throws UserException { boolean forceJniScanner = sessionVariable.isForceJniScanner(); - SessionVariable.IgnoreSplitType ignoreSplitType = - SessionVariable.IgnoreSplitType.valueOf(sessionVariable.getIgnoreSplitType()); + SessionVariable.IgnoreSplitType ignoreSplitType = SessionVariable.IgnoreSplitType + .valueOf(sessionVariable.getIgnoreSplitType()); List splits = new ArrayList<>(); int[] projected = desc.getSlots().stream().mapToInt( slot -> (source.getPaimonTable().rowType().getFieldNames().indexOf(slot.getColumn().getName()))) @@ -288,7 +301,8 @@ public List getSplits() throws UserException { } this.selectedPartitionNum = selectedPartitionValues.size(); // TODO: get total partition number - // We should set fileSplitSize at the end because fileSplitSize may be modified in splitFile. + // We should set fileSplitSize at the end because fileSplitSize may be modified + // in splitFile. splits.forEach(s -> s.setTargetSplitSize(fileSplitSize)); return splits; } @@ -318,8 +332,9 @@ public TFileFormatType getFileFormatType() throws DdlException, MetaNotFoundExce @Override public List getPathPartitionKeys() throws DdlException, MetaNotFoundException { - // return new ArrayList<>(source.getPaimonTable().partitionKeys()); - //Paymon is not aware of partitions and bypasses some existing logic by returning an empty list + // return new ArrayList<>(source.getPaimonTable().partitionKeys()); + // Paymon is not aware of partitions and bypasses some existing logic by + // returning an empty list return new ArrayList<>(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java index 3a31381854c6c4..a6007e0020fc28 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java @@ -651,7 +651,7 @@ public PlanFragment visitPhysicalHudiScan(PhysicalHudiScan fileScan, PlanTransla + " for Hudi table"); PhysicalHudiScan hudiScan = (PhysicalHudiScan) fileScan; ScanNode scanNode = new HudiScanNode(context.nextPlanNodeId(), tupleDescriptor, false, - hudiScan.getScanParams(), hudiScan.getIncrementalRelation()); + hudiScan.getScanParams(), hudiScan.getIncrementalRelation(), ConnectContext.get().getSessionVariable()); if (fileScan.getTableSnapshot().isPresent()) { ((FileQueryScanNode) scanNode).setQueryTableSnapshot(fileScan.getTableSnapshot().get()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java index d94ad0a2552240..df898c69ebcbfc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java @@ -1969,7 +1969,7 @@ private PlanNode createScanNode(Analyzer analyzer, TableRef tblRef, SelectStmt s + "please set enable_nereids_planner = true to enable new optimizer"); } scanNode = new HudiScanNode(ctx.getNextNodeId(), tblRef.getDesc(), true, - Optional.empty(), Optional.empty()); + Optional.empty(), Optional.empty(), ConnectContext.get().getSessionVariable()); break; case ICEBERG: scanNode = new IcebergScanNode(ctx.getNextNodeId(), tblRef.getDesc(), true); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 3ac50c9d7b53a1..35828c90f8aa0b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -633,6 +633,8 @@ public class SessionVariable implements Serializable, Writable { public static final String FORCE_JNI_SCANNER = "force_jni_scanner"; + public static final String HUDI_JNI_SCANNER = "hudi_jni_scanner"; + public static final String ENABLE_COUNT_PUSH_DOWN_FOR_EXTERNAL_TABLE = "enable_count_push_down_for_external_table"; public static final String SHOW_ALL_FE_CONNECTION = "show_all_fe_connection"; @@ -2072,6 +2074,10 @@ public void setEnableLeftZigZag(boolean enableLeftZigZag) { description = {"强制使用jni方式读取外表", "Force the use of jni mode to read external table"}) private boolean forceJniScanner = false; + @VariableMgr.VarAttr(name = HUDI_JNI_SCANNER, description = { "使用那种hudi jni scanner, 'hadoop' 或 'spark'", + "Which hudi jni scanner to use, 'hadoop' or 'spark'" }) + private String hudiJniScanner = "hadoop"; + @VariableMgr.VarAttr(name = ENABLE_COUNT_PUSH_DOWN_FOR_EXTERNAL_TABLE, description = {"对外表启用 count(*) 下推优化", "enable count(*) pushdown optimization for external table"}) private boolean enableCountPushDownForExternalTable = true; @@ -4482,6 +4488,10 @@ public boolean isForceJniScanner() { return forceJniScanner; } + public String getHudiJniScanner() { + return hudiJniScanner; + } + public String getIgnoreSplitType() { return ignoreSplitType; } @@ -4502,6 +4512,10 @@ public void setForceJniScanner(boolean force) { forceJniScanner = force; } + public void setHudiJniScanner(String hudiJniScanner) { + this.hudiJniScanner = hudiJniScanner; + } + public boolean isEnableCountPushDownForExternalTable() { return enableCountPushDownForExternalTable; } diff --git a/fe/pom.xml b/fe/pom.xml index d78cfd50b819b4..0d2e3f70aa6e6a 100644 --- a/fe/pom.xml +++ b/fe/pom.xml @@ -317,7 +317,7 @@ under the License. 1.11.4 17.0.0 - 0.14.1 + 0.15.0 2.7.4-11 3.0.0-8 @@ -371,7 +371,7 @@ under the License. 435 2.1.1 9.4 - 202 + 202 1.2.27 12.22.0 5.3.0 @@ -1649,7 +1649,7 @@ under the License. io.airlift concurrent - ${airlift.version} + ${airlift.concurrent.version} com.azure diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index 0bbd364fda1c2a..3a0a995ca459e2 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -353,7 +353,6 @@ struct TMaxComputeFileDesc { 1: optional string partition_spec // deprecated 2: optional string session_id 3: optional string table_batch_read_session - } struct THudiFileDesc { @@ -367,6 +366,7 @@ struct THudiFileDesc { 8: optional list column_names; 9: optional list column_types; 10: optional list nested_fields; + 11: optional string hudi_jni_scanner; } struct TLakeSoulFileDesc { @@ -406,6 +406,7 @@ enum TTextSerdeType { struct TFileScanRangeParams { // deprecated, move to TFileScanRange 1: optional Types.TFileType file_type; + // deprecated, move to TFileScanRange 2: optional TFileFormatType format_type; // deprecated, move to TFileScanRange 3: optional TFileCompressType compress_type; @@ -480,6 +481,7 @@ struct TFileRangeDesc { // for hive table, different files may have different fs, // so fs_name should be with TFileRangeDesc 12: optional string fs_name + 13: optional TFileFormatType format_type; } struct TSplitSource { diff --git a/regression-test/data/external_table_p2/hudi/test_hudi_incremental.out b/regression-test/data/external_table_p2/hudi/test_hudi_incremental.out index b1bdad85013bfc..50644f34961942 100644 --- a/regression-test/data/external_table_p2/hudi/test_hudi_incremental.out +++ b/regression-test/data/external_table_p2/hudi/test_hudi_incremental.out @@ -347,3 +347,177 @@ -- !incremental_9_10 -- 1000 +-- !incremental_1_end -- +9000 + +-- !incremental_earliest_1 -- +1000 + +-- !incremental_2_end -- +8000 + +-- !incremental_earliest_2 -- +2000 + +-- !incremental_1_2 -- +1000 + +-- !incremental_3_end -- +7000 + +-- !incremental_earliest_3 -- +3000 + +-- !incremental_2_3 -- +1000 + +-- !incremental_4_end -- +6000 + +-- !incremental_earliest_4 -- +4000 + +-- !incremental_3_4 -- +1000 + +-- !incremental_5_end -- +5000 + +-- !incremental_earliest_5 -- +5000 + +-- !incremental_4_5 -- +1000 + +-- !incremental_6_end -- +4000 + +-- !incremental_earliest_6 -- +6000 + +-- !incremental_5_6 -- +1000 + +-- !incremental_7_end -- +3000 + +-- !incremental_earliest_7 -- +7000 + +-- !incremental_6_7 -- +1000 + +-- !incremental_8_end -- +2000 + +-- !incremental_earliest_8 -- +8000 + +-- !incremental_7_8 -- +1000 + +-- !incremental_9_end -- +1000 + +-- !incremental_earliest_9 -- +9000 + +-- !incremental_8_9 -- +1000 + +-- !incremental_10_end -- +0 + +-- !incremental_earliest_10 -- +10000 + +-- !incremental_9_10 -- +1000 + +-- !incremental_1_end -- +9000 + +-- !incremental_earliest_1 -- +1000 + +-- !incremental_2_end -- +8000 + +-- !incremental_earliest_2 -- +2000 + +-- !incremental_1_2 -- +1000 + +-- !incremental_3_end -- +7000 + +-- !incremental_earliest_3 -- +3000 + +-- !incremental_2_3 -- +1000 + +-- !incremental_4_end -- +6000 + +-- !incremental_earliest_4 -- +4000 + +-- !incremental_3_4 -- +1000 + +-- !incremental_5_end -- +5000 + +-- !incremental_earliest_5 -- +5000 + +-- !incremental_4_5 -- +1000 + +-- !incremental_6_end -- +4000 + +-- !incremental_earliest_6 -- +6000 + +-- !incremental_5_6 -- +1000 + +-- !incremental_7_end -- +3000 + +-- !incremental_earliest_7 -- +7000 + +-- !incremental_6_7 -- +1000 + +-- !incremental_8_end -- +2000 + +-- !incremental_earliest_8 -- +8000 + +-- !incremental_7_8 -- +1000 + +-- !incremental_9_end -- +1000 + +-- !incremental_earliest_9 -- +9000 + +-- !incremental_8_9 -- +1000 + +-- !incremental_10_end -- +0 + +-- !incremental_earliest_10 -- +10000 + +-- !incremental_9_10 -- +1000 + diff --git a/regression-test/data/external_table_p2/hudi/test_hudi_schema_evolution.out b/regression-test/data/external_table_p2/hudi/test_hudi_schema_evolution.out index 12dd0cf086d3f0..da7273d4c14ef9 100644 --- a/regression-test/data/external_table_p2/hudi/test_hudi_schema_evolution.out +++ b/regression-test/data/external_table_p2/hudi/test_hudi_schema_evolution.out @@ -31,3 +31,35 @@ 20241118012149007 20241118012149007_0_4 5 185d101f-a484-45ce-b236-03ccd33c521b-0_0-208-622_20241118012149007.parquet 5 Eva {"age":31.5, "address":"Chengdu"} 20241118012149007 20241118012149007_0_5 6 185d101f-a484-45ce-b236-03ccd33c521b-0_0-208-622_20241118012149007.parquet 6 Frank {"age":29.2, "address":"Wuhan"} +-- !adding_simple_columns_table -- +20241118012126237 20241118012126237_0_1 1 5166112a-90d8-4ba8-8646-337fbeb2a375-0_0-35-121_20241118012132306.parquet 1 Alice \N +20241118012126237 20241118012126237_0_0 2 5166112a-90d8-4ba8-8646-337fbeb2a375-0_0-35-121_20241118012132306.parquet 2 Bob \N +20241118012126237 20241118012126237_0_2 3 5166112a-90d8-4ba8-8646-337fbeb2a375-0_0-35-121_20241118012132306.parquet 3 Cathy \N +20241118012132306 20241118012132306_0_3 4 5166112a-90d8-4ba8-8646-337fbeb2a375-0_0-35-121_20241118012132306.parquet 4 David 25 +20241118012132306 20241118012132306_0_4 5 5166112a-90d8-4ba8-8646-337fbeb2a375-0_0-35-121_20241118012132306.parquet 5 Eva 30 +20241118012132306 20241118012132306_0_5 6 5166112a-90d8-4ba8-8646-337fbeb2a375-0_0-35-121_20241118012132306.parquet 6 Frank 28 + +-- !altering_simple_columns_table -- +20241118012136512 20241118012136512_0_0 1 203f0f43-ae9d-4c17-8d5d-834f0dbc62c9-0_0-78-246_20241118012138287.parquet 1 Alice 25.0 +20241118012136512 20241118012136512_0_2 2 203f0f43-ae9d-4c17-8d5d-834f0dbc62c9-0_0-78-246_20241118012138287.parquet 2 Bob 30.0 +20241118012136512 20241118012136512_0_1 3 203f0f43-ae9d-4c17-8d5d-834f0dbc62c9-0_0-78-246_20241118012138287.parquet 3 Cathy 28.0 +20241118012138287 20241118012138287_0_3 4 203f0f43-ae9d-4c17-8d5d-834f0dbc62c9-0_0-78-246_20241118012138287.parquet 4 David 26.0 +20241118012138287 20241118012138287_0_4 5 203f0f43-ae9d-4c17-8d5d-834f0dbc62c9-0_0-78-246_20241118012138287.parquet 5 Eva 31.5 +20241118012138287 20241118012138287_0_5 6 203f0f43-ae9d-4c17-8d5d-834f0dbc62c9-0_0-78-246_20241118012138287.parquet 6 Frank 29.2 + +-- !adding_complex_columns_table -- +20241118012144831 20241118012144831_0_1 1 3c038df9-a652-4878-9b8a-221ae443448e-0_0-165-497_20241118012146150.parquet 1 Alice {"age":25, "address":"Guangzhou", "email":null} +20241118012144831 20241118012144831_0_0 2 3c038df9-a652-4878-9b8a-221ae443448e-0_0-165-497_20241118012146150.parquet 2 Bob {"age":30, "address":"Shanghai", "email":null} +20241118012144831 20241118012144831_0_2 3 3c038df9-a652-4878-9b8a-221ae443448e-0_0-165-497_20241118012146150.parquet 3 Cathy {"age":28, "address":"Beijing", "email":null} +20241118012146150 20241118012146150_0_3 4 3c038df9-a652-4878-9b8a-221ae443448e-0_0-165-497_20241118012146150.parquet 4 David {"age":25, "address":"Shenzhen", "email":"david@example.com"} +20241118012146150 20241118012146150_0_4 5 3c038df9-a652-4878-9b8a-221ae443448e-0_0-165-497_20241118012146150.parquet 5 Eva {"age":30, "address":"Chengdu", "email":"eva@example.com"} +20241118012146150 20241118012146150_0_5 6 3c038df9-a652-4878-9b8a-221ae443448e-0_0-165-497_20241118012146150.parquet 6 Frank {"age":28, "address":"Wuhan", "email":"frank@example.com"} + +-- !altering_complex_columns_table -- +20241118012147879 20241118012147879_0_0 1 185d101f-a484-45ce-b236-03ccd33c521b-0_0-208-622_20241118012149007.parquet 1 Alice {"age":25, "address":"Guangzhou"} +20241118012147879 20241118012147879_0_2 2 185d101f-a484-45ce-b236-03ccd33c521b-0_0-208-622_20241118012149007.parquet 2 Bob {"age":30, "address":"Shanghai"} +20241118012147879 20241118012147879_0_1 3 185d101f-a484-45ce-b236-03ccd33c521b-0_0-208-622_20241118012149007.parquet 3 Cathy {"age":28, "address":"Beijing"} +20241118012149007 20241118012149007_0_3 4 185d101f-a484-45ce-b236-03ccd33c521b-0_0-208-622_20241118012149007.parquet 4 David {"age":26, "address":"Shenzhen"} +20241118012149007 20241118012149007_0_4 5 185d101f-a484-45ce-b236-03ccd33c521b-0_0-208-622_20241118012149007.parquet 5 Eva {"age":31.5, "address":"Chengdu"} +20241118012149007 20241118012149007_0_5 6 185d101f-a484-45ce-b236-03ccd33c521b-0_0-208-622_20241118012149007.parquet 6 Frank {"age":29.2, "address":"Wuhan"} + diff --git a/regression-test/data/external_table_p2/hudi/test_hudi_snapshot.out b/regression-test/data/external_table_p2/hudi/test_hudi_snapshot.out index efad67ffbfa8c407fa42d0dd28f569f492bd1d12..1e151c2a86fa200668caaaf943a01f9e4c11f5a3 100644 GIT binary patch delta 40026 zcmeHQ36vDoxjwb@46PA`L5A62W>{6KseKtlLQoS?mONCbAzHHAZZs#RAFHz6?ZdDI9karkHeG|0K zsj9AfYrA#pzyJO3*Wdr&SHFGi_wGLa`c+q5doy3U_GaPvqvor+C7VfAR&_;Dbj4tM z$mx(fOxx0vt}Hv6@1-S6@m)!`EJt!Q&5>-^Ry4y=JjImh`m2S714B78scWX~I7wX& zpC~Dcr0C0JxlOa$WG7{7nxU%6LcX6Q^^`3q+vXat-SgvUnRDifXeU+9+VsU}XTd7= zI}n~XEph6J#uIlTV>;HcSmkDH+OW=Q#|F#MSfi)Qrlgw53tGt~z1?mm+j?R0(pHaj zWwPnkw${~MnQqcLv$?ge&>sZI=DuUoGj^jO1BhSw{ef|T(&b)=ueZ@Hc2t9lxfJQ2zu{##X8H;RYO;N$#nEU!Uymr z+wh4Lc!BNPfvl^F%^r{`ke-gL+k;!Q_U1_+l40YRaQ|0-73o}KCLc00pYE5FHtT$c0|*VT>}Eko02OO~HA#gPqJ zO$WLpn=*>!S+?ZaS|C}n;n;!i+C;YL$`xGAG}0SZfbSMaXD*MLxu7+l=_8e;fIJnC z>1?Er-7Kg_lI}jTrgQF28(u zUHR?LA$^7BrNkq?Pn9#p1_7;ruKSJHO1DiD8XYI?yT0p6t||MdhcrPw=&qzGULeb| zY7^Jl|MU2{tWWZE-VDB`xaqtEEo>K$Nhq?M(&@e1_(r9*RV2;vG?$=hs%rRE zDRLor zH!pPa^tQXX34-n@#d-HnrdRw~sG-a6;~S=%s*y~~p6xqoTGBLvItwgIa)_-+rlvZs zqXvrS;BsNnzz|M!lZx)hwvsd|RED-pRhi0A@nC>FU6)u9*DA;TA6}%A=c%|Q?O+xVwum>MxT0oOO))GISM<+VI^26HF;6? z#AXRQmtjqaa+HOwdwH~5)-;-gdbyxAn+w~Dn+@Bl+ykvs7}Pw4-0Cb-Gh`LXR_dAa zUBAoCuL=6|1={dO;doKAY?HoN6edoT>=Xn-F%^8C3KgShDO1)By7?)w{?ncn1g>X6 z)v2~2>FKlrQMP@_cMZ=`Y&+2W0|Yqe&gJsqH?L^a{n^Yuss-IcN~J4LOEl4E{=lW0 zmXbB3uX#xhuT&u}pU7v@ouoi-yP2yKZA-PNJDP7TZhO0t5X*xApewO8Re};sOLpLx zl5P2cVmY!#oKQw9#o6tPM6se+g|2*VHGAOdT)sP9Da@9tq!hY&fmqLJbfv<}8-6N^ zEpGFfT{-B_iZY(!LIa#xAiZ5~HbaW;OSK}NuT$j|p_#S3rYRmFrVLFe`DssqTCzRK zF?AEV+$ElZhH3b6QND5dafN)Qm)$u}20I3~y9XUbWw}&SNtsMH)fOMSsES^BGM`E= zaQ&72q&t(PW0K-H@l$ShH(mL%$4W}zeY97B-; z=wvx?bXWB}_V~&=ThWilBd@akqt}SK3zXOGt}L8k8uW^rggUz9BB6@5zAmQd>ko>_ z@$SX@R9UE{=I7|^lb{7;T;tYrlj+XW_!>HAq0lhhvFs$8Eg@;og057Fq`PTD^7K&r zSix0dyDGhHE7ZUcPIQu*DWh!rSPoDFN}H~<8CuHFWk8Lrf27218+4q&AWY~lw%OOBcJBQ7RQ5kGmo36#$h;_A<#dalDEX$WCuw90=>tbw} znGr3_*jT>&z76K8Cd(D>F|fwX$2E%oNl8o3U{xO~v+?SFXYKtkYQrQxuw{ zsF6w1m+8!Af{gD*jVaUPc4o5ZQ(=De`2>4ytv!SkD)j_Y%nL{m*fne z(-~xShUt@hp|Y*c%NDwF*)?>`Lt?Fn4n?EwKM-|#!F_z=blp^wx&vLO64Yd%!GIu8 ze6H^Ll4p3DE(d5oS|oHsIMGSMA~sbksjKgrtJ*TfY%`2D#Z1YL&5oAMRm)_3PqdUf zAS6y?LV~5{*z*>Qj&`waVCL$qC^=zyR>R&jtiFv66U1Xp9or-OdnaCiUnYI~E%VN0 z;`pnpS6sFlzhZPWz4s*6`OGYLe`A@D_ny{1cXGKeQY2YRTCmdaIUr?gBe4oH-93=1 zAlKaueOh7Xo&$R^!UrUC0~ zwoo^ahGA)?J;~OfI(1zmlC2QJECS#2122tkXn(y09i4+zM22dG!uQf%^^oNzi>Rh0Dx-mvH!b7Al8vBhz8 zTS{n{ff6QN*EC(3q$M+uQDzS|mt#TK>VfWPw(Aoe-EcJW8N!)K6}`5pCsiyMVTB_f zh!W%oi;R(vZL3N$aA6!{=wpR;48Gg3mMWjPcJFr`w|B25+FPE_E+1IVG>sZ%jDBfd zS+|v~n+Z|=QYMEF|L|OFnQFtrG$a-+_2{?o(}A@!%GtYZKO4KF95>2fk~od*Ra1%s zduDZ4u65=>b8CM#jc$uaPq(+Xq}-@?~alRcucCrvX84SAJ% zuF&NcC}#`f1$67ocCIUKd%B9gxe_M+w+6WC39#=g&F9e8Smb7X`eFw+PQXY1N=u@d z_TIoX*}50FuIdo9PZ-a-W2Pn7&@9Qc4NJ3h#|=z<|1yzGp^NVN7qMo1Zne9way6my zO}cBHP|rEEeYv2txT`PDWFws#zDG2WJrhH*u4Y&uIj^|#$2EejTgCQC&BfRLt*MQe zK|qM=N<_oJLl10JlZHm1LYKM>8GtRhe~~ghy{=!Nn>LE|O~eg9R9|5Y2GbSeQs{=1 zR&05wp8j$IryQ}+_xquD`a8QQcez-Tn8(I*bp1`-c+mmOfGV0yA8X<3PxQ33>g%o} z1wJc}Y6U1yT9ce0z(7qk3?oqXFAwSI?ar+sgg%-V#!v3=cFU$M1~%cIm+7o5nxoUZ z@8p{3;?1H$pZ`5qoj4m(+*jQ7AIA%-p;7Ml!sIW{tq(NEFg!((Q50t0XtE+XDlA!5 zg9xS5mSqr#f6&}Y4FZ&Z!~kcW#aOJeFN81c-0HR;MJocys*y7{$k^ZZQS z?v+qJ|rSiiY@#A%#Lch&J>stG^J zRXg7jN;}^Y|7Q$6Z)&3M1SsncsGy<1ICS%QTn(*SD=v#spBVLtQJ)Z$`$(cbW9gos zi*?5<2H@hM;684sI-C#=@f8L%rva_mu+sHFff{!VSF%;h3lzij4AYk64n?;C5g35E zY|EizmWho62gjE_NZ6rm98AV)xu1sw+z5*uRN2_w1~cY05_ zQ`q}yirt|P1$lzdo}4(=0{r4CdfHHQz#v9|zE8s+Z8nf({7Yc#x~t1+T~{Hrnu2@= z+sh2k4}~_ytzz8jK-}tBy5~z`-EpebArB3>g7zG5I!PG=d>12XNt5v}tFD;r|5nw& znAXz*TS==xyO36wr$T^qNq-<*cUdkFM$v>Be6Eh~nrco35OYC{0LkPVy902EoOMK>9&KRG4!p zgs397c+tQRh*KXR9qOI|2JVj>9jZXj-X=E3e5jZY)zZ(k_iQo8&M|I1!l~I`Ugy?8jOn?XE^H6@T73WGqNN!GF={?-= zSW(22EhX?74GPpK-A}_0in()JnkU1tfaq{xDYK%=TEwyG8#i(>D=PNJ+v_+rTyMN5 zC-Qa24}%p2$66h(H(*7xK4=X;HB+VC)OHH&RXP3pk>_fS^Ss_|gkk^1CMw75}2wzbG`(@7>Jr*uI$CBG9cV zp^>I;;Y+0@=X2w#H~c%lyOcXZY~$(9&*6xCJC3}1J3mDz{pJ|)VSyg;0wUb-JCVEj zSn=yT^UUA9ePZLUtLQNg@{gBZyo8gg>7uKJ2CD4lOFPzamx|GyzVI}z>G=Zp^8~xc z?$TUC{E4vPYk0;_ev2bl{EnYKs&wls{K;Zz7(GR#I z>D;^cCi>#r{EpJbclf)+Qu}%0(rT95NFO;uD3v~Q2X{{uUZRn9jzvQZWW>ezF`YHN(0x6e-*>0-g^=b56l(rs4C4pRd}p=`#axhuc9Bni*KNxU4}!h z$cub*hE~S#22JfbPg4{hRy6O<@H!(pjsJ?tMgTs|uag=_fve7r0y~R+rkJ z6SpVAs>@uBQXl=hI};QUzZ&RU>v737pWr`P{sfI*L;ATl#dh{x8|f#%jb!uR#hc1C z_KWW$ofWv31orV7_EvUF6?k(0M1jKwLogg##IVr)eOh8*q#}luecI{rZDM^)l#7XS zA5o&*$OH}>R2hdBF%0AsEGt-OjS-0$R;I`u7sV=!BZkEh!{Ufx|3_%CgRtZV+y9}3 z3JaTmz#PCwCRA7%n_Lu!3Ol?)g$+vEp~VO*hX{kTjZBQNa)>ZFjuH0H5F>1O!NJP0 z!J-(4BM=;{92=~MMLqg|3l2u_{S#tq#Sy{ch+uIP&fQHXh z(Iwvy>u7Vc7$S!=t7-P+L_M85Lwr0!4ljHbhqp*L93h9dj6*uDi%&54@K%wbhRdqy zz{ALO=iw~ihj}3?z|T%W8c5`@91iEdF_)3HwqAKd?y357KBkV9tF8RE; zD*_Oa+{3WLuZj#Y{9S?pg?~t}GVEb!;oDWwgU`QFd;+-Pk}6hbb#&toP>u*U3~Ot{ ztw;km?3%#P!Y@`ajBr{tkinf*6!o=-!GoKsfC}D`peX4c1_+*7u37O-6dHiwiK7?} zc*7`${tec`kE#=R-%(sk6+L%qqW;jCk?&K$xItqoM*HGv0^(@`4zph=JuqK*v-JslE5I&Ej&ZQ z>Zvvdxd1yCRx^xqTVlxY=H57R#Ya~$qU3-WVVlKkH#8O#N0lOqWby5zcQbTzDPLT? zrJ;Co4fuP|Aeoa< z{fze*Uihi4A?xwdS{BHpxbgmw^SCh)zHsd$A>+}hLU4-RKs4{ID!y}cL-D82mKdn{ z!$er*_TMtf<26+b*E}VvQt{0)%`tq#S$`|D9NF8`Z9Er}96=Th!OefB5P*Yr*s!3^ zQwK(9D%3KuaUs2Mn}|vO;we$%DN*APfCrlq^?eEfIH)m(1ysho#{K7xif2NNXF?tB zOsFFe`){!I4+~{P0EZD}HOdR)2~dYQ0qRHu{Tr0EVPS-jwGrhsW@p9oo>tC#IugJU_YLj0q5|#vn8+}`U-B$=DYfSS*a6sg3(r6z zdiTeX1lD&Ekf+&A^Z^;kh}gJxAvbhlV~G4w1POdOA~!C6imRuWoX73D3I5Te%C6DZ ztm5nF_tV_tWp3lv^O4@u$vshal9s=Ui?(*-v5}KB3z$OZ^m4l*(&KBZxtAFrIAIi6 zjzNNP8@GoD;7JE<=5lVh}(!;yB3Mv8z4e>iuCjwPZc|S$y%DGGb&&o` znbsH{>ez-uk%M$tz#DHzf#D#%rJB(iA4q`A_;eKm{JtYb>_xomD-m%qd~5qT6gc#J z4wHNS6Ss>zpK<#-`mcXPNkY$O7RG2#namg-YX5IkMZ{oWj{_%+5h?;Mp7wPac51pXtQ9sAws@X`t zFq{8whzY8rBA4edxDuVU6vrZ;XFPjPneezz)q(KXcNsqn!sGiC3UbiW8Wsu|yEm_4 zgL=$yT{*R~q1d$fCa#et9}pC}_^)Dh?b(>;zC5YCA5DzCAeSY5#dWpgMGMo{)0Qkh zXNn^mvdRK!$)=3xe4b@Xo~;Fvg-K%Uz;|sT+jQj$u4WqP4X3`YoGd&J1sR8eJQxnR z&kVUE77B7u&LX?64wzmUmg!@o9q$`(%9 z74t$^M!orPazaH_Ql`Sv{rUGW{fJHv$Ka2H(mE_GGxpE+JwNc$cA)M5_gS9wvxfSrG0M7kVdsa8avW-;P2d#Jm&7c?HOJ!Zu)F>ajZU0_y;qW z9F|cZ-xC-bdv|MwY8w!jO1iG@q!9U2qbuifb!P-Y+S6RkmJF3(!tKD)CC89GDXlx5VDg zVIKfb5_KdM7p2&d0IsaI64N0Zr5L#=5wxrzPKhEBmn)HMhd7aP%#>IpVpbA4RU{E5 zWhG_N`Fds#oI9jS_fPVN4Ae~T-0onfx99i#e&27t`Tgzh{@zWePm1dOnSrOyeKURj z+&43?TrUP+PY-RO)Glz-&~@K+6;1a%MJF~@9E);==v+4m)wrXIGyfyAZ*dJL@`6Jx zT{8;URHEt%r3z6G5p9w>la!PU$I*yUXw;g#pqCt{F!0{XFVVZq+>gGC18n`+k9T0m z#DCvJe{dE@bJ!O*XRDvBH*j=cBmQ!tE2>p^XpB`C8nfl<%-Ai(F>f{u>s%h)Er(`& z%qnwXy)pK1;j?32I5)>@UYS+KCdcZ{nHi1`aHh|iGqVj|!@D)kDwPHK=6r=!qc=QW z?Q4uZCjWoAv4FRx!e*^eype7wh#xv1pV`)wE1%;Qv>`>P)%O^!eDN zqM0Sz#G)m^GflaX4PwmCvMGl zXYXNiTr7Px)1A>xU0irIJJDKqU|>9OY#JDb;!wj;bj@}Z=39nBOka0ZRd*N-S}$zb zRIJN2tw#6Fn`PA*p1izfIbwP$Gaz31bXFbS%c|AleXQy)3rKwb4mS8HR%^5#AKjQW zG|Lue|6^)gkl)W{#p%}z8)gF6mqnS#Ol9#l4*Q;i>3{#ml#!HE^$ww=TTipR!xi zBjc@Sf0`5j`L8mYw`hi42y~4(%q0rdJjfMyeZ{pMOEGoVVzx_&VT#e`^7|Ioa3U*E zhZ3`(5vlc>LU9wwA)-%e+9a_{n&YTCEd;DmZ`+jEa$p?1a)g5ivlu>l<(&ub9>;O- z@%W9lJ;9gjuqSoi5TmuO?mHkMzWmVv#J3t9)#ss0ksX<3l>qmgvl>iA!Z7T{-7ely zTktq+hc8XTc!m;3tRGGf7j&oOsHS0P;=+N>?wsj3hNg*Kx1{?FH()Tfp5j|pWEBjB z>AI@GG83CvP$}lgsU+oXz9!C0=X!36gf5|fmSvV4F}g87DAsN3*fG4@mp`OloC;6! zSc}^_z~&%v;!-QypSGZ@uck-EtG8x{OwaQ)PF*OT=0Ny$j@!p=qUf#*YoU3j?~v7# zsn3fP%XDv@ZLb;ZqdbLsVKtm9dr1*9HM=eODJa0VQe@*{Ebl_`Q)MRqOIzyqmX8bi z{akNG)gdhP^td?oiNc_-8PqmyUr|-f#ng06&Eq(NJ%z9GJw4D@Ppv%1X3EvbDI`^3 zMoPZCUWLK9CD%_(|M!j|@%}^)iGeJfS!xQErls-V4{-C}NUzBAjLsW7qr(d@=Ftl)Y#wMmTpdPjc{ zI8Z#7dWxrcHl{&bD5P#EoVXUTRXd=*yL#EG;niza;U{_ITA=;2$u-k7wWPO}F7%7{ z{S@s#T$4gsJUAaqLHm|U-4JJ=OmD~{3LxUdV&}M6_i{cb9{IK!c zZBN%M#nOohH{ke+jhn96e!x}3w&6HEK|c=jv4#^1wyo<_E2!!f^P#GTs6MHhlf)@u z5j3JE%?H0?BkaTXEoyh*&re=?3kQ#4--&(OdoM4(bl1W4pTqH+U&e4hc5ET`M@hsT`&H6U5XCus+6G@#lw6>AS9!Xx;XPrc0-mpRBt_DjJM9c)sG*5aL3_9u}Gjn zJ0NS!vl!wI7q*5Y`Y^9j*EvkF);jxQ_r!4?tzHn;5d6p)mm9N~d(xmdE&9Po%l-2m z0|ypZwfHH-X9ykT$w|$4@rL{WJiIEFUQ2ClJ@cIn<0=fUsl#xnT!SxgWoYCwQ!#z& z83FYi*B48_)zu@0zM5Y*!Vm}HEB$iaYp%3t@R21`TzV$eFC)d#Jk2$02z=|};@kO~ zvY%pA=#w1`wUP!5KPo&WZnq2l z>k_}+P^GaS-I^L~75D8B=l)f?vq#3Fr&zRV?ccwLi4!jrdNY=3wpcJKZtv^rjtzq0 zKsy~1K1hW*^$7wU=9miZmZfkT$%*fCle(T@uc!AeuI0>v=Gca17EIc<2ne9eL)4li zbW+ty1TL9ci5%rf0UCC%b)fOaZ{px1*mp>6jx~-FBskExFlg}VSnKlHo*nWYASID6 zjb^PHDPaY|8!2Ifd$ZNB63#@QODidM{Uo=s^}=AT^H90U5JQT0ztlMa&v+u88y!`2 zv!oG9=~b$SbjorFtY{(CH}1hMYOvvo=TkTw8sL6$cQq$Kj-krsNlMT0a>Em6-p_Y$nU^*xilQ6rcqC=j zuptwRgj6|16}!}IpO}9-n-f11wl$Y^@%!OKN5~k#qX4*#PR8!7~9`# zBMO$5p2bE7i-t8S&$D>t=EHiqvFriDs?5s9PR*7pJm~@UvW2<~Qsru`w*3ygrkMUO zso^dKHXa(GIz&v5b#-?TI8?;1U+?OhFkJ_+z3wWu=E;1GVC4)QIiwX>7N>-pnk&|A z&-RGSxx)JKTIe^u ztTny2Co6q>J9T>@)9-SEB#ro7;XZtiP7&eQ#C8O&07qAxJR&1LGimy^++sNCvrLCUdLOY;N&Tfv?YUQ@Mzmv~}9^irK|5f3XB%qN{ z70o6k+r}-PZ?=BQ*0(Mm>ktQikloQg6+*QU*4`WYueQD*vulV;3!Qx#3vT_yaAs7z z^@VKV{MqrsU!>1J@=W^W?D=zAhm#TeFXac%&%T{G(7%X`#x`^YJO&O^8fZjp0nrp@ zdW^ZgX#m?O7+uIyb;~y`1>ylsGaMv2jvHVN5P^EC$q8|Bnr*uV@l2-p2$L1v!VPpS z9|#HVrN^UQ)FF8#c-SClRk{@YWFf15eK__x`guM_`ObZy+G5$C8=P&JCY#qktm z)wp%4?ZH5JM6q?>#`R2Fz&Re-S}1;P)D+!w4C@?qz3LBFxGsVehdY%-3-=40RV-r~zI9jSh9UHlft2X4IhrH|uvgG`>W9Vtx4 z2RGnl8v9WE=MKEM{cXz>ExS_hb8UkTD-#8EE}vy5%Mvw0fa)^uILy;5 z0EXT~A{}Yb9T4-X^oh4sQe#;Yt19TF-0+xzfRw`Ptcpir6Rnb|Ihv}qPF(Ew2)6>{ z5g1{fqzJh1J&M6~XdzdDve~P(t%kH+nyC70d0u{&)H>#tE7Arfl*@3$CpUKuiEn={ zPdn~_-}1$Uucmv%S8mOAh+}&S{TbYkR_Zq=VxQyR)UkQHN}Z#CU4757WLyQH4N~b7 zOwK@%r>Z8=Jj=6yJBepM$lYW~;DZLP0lx`rhUibXVL_lG&@bZIp6cq%wF8I~evtz3 zQGukIL|p3Y8dzM*WeNsNH;@lKN|7K@hN2pWh%>1pCelil3Oq6y5aDJ&G?2sSAK!|x z_;0_CvHLSpvarp63`6cGe{iGp!Lz+gVa(&Tbs9k^R& z8~9VzJQvOvY9}uJAlI$RNJM^NVy-h`w504&`BDj2<|{UEq1V!bX#(edFhjD3kz0t% zJ5$}6z2Sm5@q=`4Rs$R?F2B*arSo@*&8phjd=9y49B z^yRMZTdsyQ!g-15HLI&M8}IfU^Fg)W2Eq;)P&yyEf^GYDv|RK$uClaUGd~B3l&tDpEA31^%+eh|d_yqP%x#`7rvF7z}AC5oV z@-V=Wm-2^4Fy44C1_^br-THweRUE{dgh8-XoYC#^(YmR2tcyQi{TK4%`#u)LE+t+c z0J9QjmoB3&(SfLfBT<2!%lS7f2&yGbfEbXeIrd0ci$lf`E+iTF?rGMLf4gBVM9t@G zV;x(ws$&sxVKg&5j%Zi4u~MsTih?YvWLUrkq-*KB7iJ@10Mwko(GhzD9JiIjr*n$} zUDNCYI-MJ ze{22sdb29k46*bjWQ~vSOAk!Ap5d{P% z&58HFm+$%LirK*D?SC#aD2iT=wn6XW!Pzi2-pTd5GZdFQayMmkl+Wm6x$WZ4Te1^3 zOTVnz$lM(6Qx%{9lSn-DPDYhP3AkagfT+|$Qgi0jR5$KBKt)Uw{uq%S@?Bu%o=hA8 zULx~mX5bMMv6_tZRF$+NJ!w5-tzE-~;F^hzV0Ss2w4!WMpVZY!qL+wj+B&K&AJ#sk ze%*~-zRHlU6Z?dR;fMeFSpD^9ad6iS64AuAczkDk5RU^V?-|3<#AXRtp14VB?5D9y z)!r@B(naj?CVf6O`85;L=P$^bEVc*Zmv}rGzx?|rr25C3OkuZvwjCtOwTp3+0BI{! zdD#m@uTiOKfyb2=S9&mLHfQ3(pQO7xc1b)_-2PN%a~2Uf0P(|xks_rf-B2|o-_aLb zDZ2ueMz-J>;>=R2Zw9#`B6k#}9T#?3w`4?xqLPg1HRf8b3Eh=5NdglUOapHveV4>1 z<$E){3NsXCw27F|*2HLUdMHK3g`cI#4Q(jAjd_VDewf>maU4@z{+$jW5j!CG79*N# zM&%Y2sf7a!()Ju%fx;s2a#X|OmiYde{MPa1_<1&L#4x-EtYoDQ2zD~lFN#a)oS0(i zQtR~g?1t3^;R7L7uo2ZQvW~5)trLH<^Qglu9_YYp7%qrvBE%at0}m;k$^wl6DEFfB zOmc0O?eMDgk=ZFceqv1wt2O=Pka%%_jt<<#Y750Z2w#$+(NtKimU(Q`5IM+V6be3n znd%Uq-q&@5IC*_X?{p?+#$Bpg!d zqz=Xdh2#M}B}x6vZ@F*acnylMtkBUM;n~VFF$96kqX~Y(*e!b_Os49$v;R{7l~D$c zydTj05QWJ}1kN7l2T~Ff?1fl15{c9&EA_CIx_C3}tgI!20$rIlOPfel{$6Lo(|G286$V8DI0#Zld7yv>=tfgXwxMLx)fSVVobiUhY&I3GK zCFfU)n|L1#4OtpX$UFtBAcC_nujvfw2MK`}KLK~tuExi)i43+|m;OV4+R$3>ePffj z*qhFt|4&b*JJRAM3jpW-r@LPJ5O&!*yy&Rh@_mY0H8Mvej)28o6Y_82J+5H5VeJfcym z;xyey;NyWPVod=$hmAr>*meR`dUSYk)pabCNaT4Q08SrOJ7765WmE!OG6IG*K@e*@ z27s;@Yv~g^tkj@*{NHu_OMoqz?`FR6i|qMdWK)L=;{8AC7~H%@X42Wk+%;y>HD=OE z|9p*^1cQ|H(|_NYNv)0_Z`lIObS-qF)~aPYXsV6cAYb7PcvBVnKOV>C_D*!g&Tr0|~ ze_uFB|Gb#`=+ig%ubGw<$=6!=OaBjPNx@6kXi320uF;YnLdos_H!Vqkmh^G4bD}`R z^>R=)6^@CwzmPsAej)#Sa~#8*Ji7P~(i_El0G7@#mQq9Gi$J>qwDI_00io$a2a^_f z7HY4k6%Y@QE(j3-s=74WH*IuZ;6usdF=TTbno*R%2eXk%fZpXe%>)mFXkeuJc7#de zds)E1e*|L?C0`HRrU0j@mg!r>r;0b_cu~g=zL>F;=l||-`hi~) zL*wcGt*VY{v~Hta2n;L*B|?y|0vG%Ol{i2JP4sPLW8W^Oi z1^7^v4FjeRBC^I*&cG@GVH&WxrmA3Xq4tkDI~DX#Qv-`@xQIyhsL&}XKz6Q=zwHV*Ngyj z{wR2avWCP^@s0V8kD!;M^=*5@@L|BQ=uDGsPOG_)cZ1Dy`78MipfI8nMDmV`DlMs? z7lGf3?@u7-4roUzMWYJJO#NdFttc)7BL~cK&?0ffP-;+A$}QDIMbXj3D=&6#61V@8 zT*olJ=v2F34{i^&>jZ=h1o`$)b`CzhM6>rUfc|pqYv6j{`$wI>CenUp1NcyA@`#{p zw1YV|XAaO;NHZ8~xWTC1*gA-Dz_Gz(1*e%`UED%9+@j|A1sJ>7~FPmJqQ<6lQ zm<3f$iR#Cd=^LR^*VVw+C{6)@M+(0oBMe*o=(*hHZPYZ6 zq5$&C(G+p}_c}MAlv+Tc3F-p!y=Miou#SEPNATC`rMV?4F1HEz4FmP z)GsNIKqLYi)NjVb4+fTE%n_F4uCO-4s^!nbyb9^FW2Xd;{^Zy$)W*w_KANa+gPJ(x zFSEJim&a}tAvsz<_OQA6h*glaD7D1;J(-a)tTvnoc%!SFI6^zL)FAfLT4%r5H|{f& znW&7K93DsK;59OEag+eA2c&fkR07=mGhJJ5L5r(=3_uifeArF!e_;!wc}kq!jd*R{ zJLz2O-3z6OUG-To(a@hBH@dV*N^veR#bh5@xhj_K2agziSK`7_X0$awhdu%|z&{Y# zEeEm)(vgLD6D=5^KcLmYKzpurRd+gwq0tPJL>3NOZ7r-yPHTK8>fs!3l+h*EzHikA zS_2cI0Q&RM3?h0cD$~O!aSmv_=VK^$Tma_O*GJG z2xmcTAs~i>!lg&e2uKFd>yL^z291f)|DEbV%r6<5J__tE*qMqSFi>Q;2EHpru^jYk z5+F)Kj0L@p7@FU2^gB?`JIHIiHs zN$1Cw#(w`}Kl#>eQjJf>I=D=jgaFOedF9Y@-}mxEFzTUK=FxL3!ucx=IhCfQV1quA zRIYoPETŵ*%F&4#UMw;NJvM*OQW3gQSj)sQV?jdc<+@|r}$B4rbnFqL>>$eGz*IpzndC=`nGm|2&AH zYq*G7&~y;z(SD6Ba!_W}24J8ma?;RCIw*{I+)9FM*b9N%jQvP;>rT8#--OMKUogHQ zc46F%)NN6IAW#3?Aq!vPMk3eXd1xqy zPpG1&Kl%)9xJW|qRvn#<38%0Jw3>F)0B(qwdMY!KhO>u&|q(DNyAcoGp`~bIzDqOmE*ZBKvB?b zYBLLXjH7{_t>bnA>p(X%(LE>`1wRVC^5`~0|A)uHxYsoNDjkF-^sdP+HqT>d7({D0 zv2dD6EIbqgy^eG@aws4o=+&_;%|t7ofu}3L0YD~jo(6tafF5Z)X+gzlt_xKKT}v@2 PdgD~x1=mwfuIv8*SMN=H diff --git a/regression-test/data/external_table_p2/hudi/test_hudi_timetravel.out b/regression-test/data/external_table_p2/hudi/test_hudi_timetravel.out index a9b5d23595a8e3..00d15805baf04e 100644 --- a/regression-test/data/external_table_p2/hudi/test_hudi_timetravel.out +++ b/regression-test/data/external_table_p2/hudi/test_hudi_timetravel.out @@ -119,3 +119,123 @@ -- !timetravel10 -- 10000 +-- !timetravel1 -- +1000 + +-- !timetravel2 -- +2000 + +-- !timetravel3 -- +3000 + +-- !timetravel4 -- +4000 + +-- !timetravel5 -- +5000 + +-- !timetravel6 -- +6000 + +-- !timetravel7 -- +7000 + +-- !timetravel8 -- +8000 + +-- !timetravel9 -- +9000 + +-- !timetravel10 -- +10000 + +-- !timetravel1 -- +1000 + +-- !timetravel2 -- +2000 + +-- !timetravel3 -- +3000 + +-- !timetravel4 -- +4000 + +-- !timetravel5 -- +5000 + +-- !timetravel6 -- +6000 + +-- !timetravel7 -- +7000 + +-- !timetravel8 -- +8000 + +-- !timetravel9 -- +9000 + +-- !timetravel10 -- +10000 + +-- !timetravel1 -- +1000 + +-- !timetravel2 -- +2000 + +-- !timetravel3 -- +3000 + +-- !timetravel4 -- +4000 + +-- !timetravel5 -- +5000 + +-- !timetravel6 -- +6000 + +-- !timetravel7 -- +7000 + +-- !timetravel8 -- +8000 + +-- !timetravel9 -- +9000 + +-- !timetravel10 -- +10000 + +-- !timetravel1 -- +1000 + +-- !timetravel2 -- +2000 + +-- !timetravel3 -- +3000 + +-- !timetravel4 -- +4000 + +-- !timetravel5 -- +5000 + +-- !timetravel6 -- +6000 + +-- !timetravel7 -- +7000 + +-- !timetravel8 -- +8000 + +-- !timetravel9 -- +9000 + +-- !timetravel10 -- +10000 + diff --git a/regression-test/suites/external_table_p2/hudi/test_hudi_catalog.groovy b/regression-test/suites/external_table_p2/hudi/test_hudi_catalog.groovy index f2082ef89c7a50..149eecf5817bd4 100644 --- a/regression-test/suites/external_table_p2/hudi/test_hudi_catalog.groovy +++ b/regression-test/suites/external_table_p2/hudi/test_hudi_catalog.groovy @@ -36,4 +36,4 @@ suite("test_hudi_catalog", "p2,external,hudi,external_remote,external_remote_hud def tables = sql """ show tables; """ assertTrue(tables.size() > 0) sql """drop catalog if exists ${catalog_name};""" -} \ No newline at end of file +} diff --git a/regression-test/suites/external_table_p2/hudi/test_hudi_incremental.groovy b/regression-test/suites/external_table_p2/hudi/test_hudi_incremental.groovy index 8cc1d2a852b8c4..885903646cc5b5 100644 --- a/regression-test/suites/external_table_p2/hudi/test_hudi_incremental.groovy +++ b/regression-test/suites/external_table_p2/hudi/test_hudi_incremental.groovy @@ -60,7 +60,6 @@ suite("test_hudi_incremental", "p2,external,hudi,external_remote,external_remote "20241114152009764", "20241114152011901", ] - test_hudi_incremental_querys("user_activity_log_cow_non_partition", timestamps_cow_non_partition) // spark-sql "select distinct _hoodie_commit_time from user_activity_log_cow_partition order by _hoodie_commit_time;" def timestamps_cow_partition = [ @@ -75,7 +74,6 @@ suite("test_hudi_incremental", "p2,external,hudi,external_remote,external_remote "20241114152147114", "20241114152156417", ] - test_hudi_incremental_querys("user_activity_log_cow_partition", timestamps_cow_partition) // spark-sql "select distinct _hoodie_commit_time from user_activity_log_mor_non_partition order by _hoodie_commit_time;" def timestamps_mor_non_partition = [ @@ -90,7 +88,6 @@ suite("test_hudi_incremental", "p2,external,hudi,external_remote,external_remote "20241114152028770", "20241114152030746", ] - test_hudi_incremental_querys("user_activity_log_mor_non_partition", timestamps_mor_non_partition) // spark-sql "select distinct _hoodie_commit_time from user_activity_log_mor_partition order by _hoodie_commit_time;" def timestamps_mor_partition = [ @@ -105,7 +102,18 @@ suite("test_hudi_incremental", "p2,external,hudi,external_remote,external_remote "20241114152323587", "20241114152334111", ] + + test_hudi_incremental_querys("user_activity_log_cow_non_partition", timestamps_cow_non_partition) + test_hudi_incremental_querys("user_activity_log_cow_partition", timestamps_cow_partition) + test_hudi_incremental_querys("user_activity_log_mor_non_partition", timestamps_mor_non_partition) + test_hudi_incremental_querys("user_activity_log_mor_partition", timestamps_mor_partition) + sql """set force_jni_scanner=true;""" + // don't support incremental query for cow table by jni reader + // test_hudi_incremental_querys("user_activity_log_cow_non_partition", timestamps_cow_non_partition) + // test_hudi_incremental_querys("user_activity_log_cow_partition", timestamps_cow_partition) + test_hudi_incremental_querys("user_activity_log_mor_non_partition", timestamps_mor_non_partition) test_hudi_incremental_querys("user_activity_log_mor_partition", timestamps_mor_partition) + // sql """set force_jni_scanner=false;""" sql """drop catalog if exists ${catalog_name};""" -} \ No newline at end of file +} diff --git a/regression-test/suites/external_table_p2/hudi/test_hudi_schema_evolution.groovy b/regression-test/suites/external_table_p2/hudi/test_hudi_schema_evolution.groovy index b247aaf492400d..0da88447cdef15 100644 --- a/regression-test/suites/external_table_p2/hudi/test_hudi_schema_evolution.groovy +++ b/regression-test/suites/external_table_p2/hudi/test_hudi_schema_evolution.groovy @@ -33,7 +33,18 @@ suite("test_hudi_schema_evolution", "p2,external,hudi,external_remote,external_r sql """ switch ${catalog_name};""" sql """ use regression_hudi;""" sql """ set enable_fallback_to_original_planner=false """ + + qt_adding_simple_columns_table """ select * from adding_simple_columns_table order by id """ + qt_altering_simple_columns_table """ select * from altering_simple_columns_table order by id """ + // qt_deleting_simple_columns_table """ select * from deleting_simple_columns_table order by id """ + // qt_renaming_simple_columns_table """ select * from renaming_simple_columns_table order by id """ + qt_adding_complex_columns_table """ select * from adding_complex_columns_table order by id """ + qt_altering_complex_columns_table """ select * from altering_complex_columns_table order by id """ + // qt_deleting_complex_columns_table """ select * from deleting_complex_columns_table order by id """ + // qt_renaming_complex_columns_table """ select * from renaming_complex_columns_table order by id """ + + sql """set force_jni_scanner = true;""" qt_adding_simple_columns_table """ select * from adding_simple_columns_table order by id """ qt_altering_simple_columns_table """ select * from altering_simple_columns_table order by id """ // qt_deleting_simple_columns_table """ select * from deleting_simple_columns_table order by id """ @@ -43,6 +54,7 @@ suite("test_hudi_schema_evolution", "p2,external,hudi,external_remote,external_r qt_altering_complex_columns_table """ select * from altering_complex_columns_table order by id """ // qt_deleting_complex_columns_table """ select * from deleting_complex_columns_table order by id """ // qt_renaming_complex_columns_table """ select * from renaming_complex_columns_table order by id """ + sql """set force_jni_scanner = false;""" sql """drop catalog if exists ${catalog_name};""" -} \ No newline at end of file +} diff --git a/regression-test/suites/external_table_p2/hudi/test_hudi_snapshot.groovy b/regression-test/suites/external_table_p2/hudi/test_hudi_snapshot.groovy index 53c09e6d5a9031..89d89709b3c822 100644 --- a/regression-test/suites/external_table_p2/hudi/test_hudi_snapshot.groovy +++ b/regression-test/suites/external_table_p2/hudi/test_hudi_snapshot.groovy @@ -64,7 +64,7 @@ suite("test_hudi_snapshot", "p2,external,hudi,external_remote,external_remote_hu qt_q09 """SELECT * FROM ${table_name} WHERE struct_element(struct_element(address, 'coordinates'), 'latitude') BETWEEN 0 AND 100 AND struct_element(struct_element(address, 'coordinates'), 'longitude') BETWEEN 0 AND 100 ORDER BY event_time LIMIT 5;""" // Query records with ratings above a specific value and limit output - qt_q10 """SELECT * FROM ${table_name} WHERE rating > 4.5 ORDER BY rating DESC LIMIT 5;""" + qt_q10 """SELECT * FROM ${table_name} WHERE rating > 4.5 ORDER BY event_time DESC LIMIT 5;""" // Query all users' signup dates and limit output qt_q11 """SELECT user_id, signup_date FROM ${table_name} ORDER BY signup_date DESC LIMIT 10;""" @@ -79,13 +79,20 @@ suite("test_hudi_snapshot", "p2,external,hudi,external_remote,external_remote_hu qt_q14 """SELECT * FROM ${table_name} WHERE signup_date = '2024-01-15' ORDER BY user_id LIMIT 5;""" // Query the total count of purchases for each user and limit output - qt_q15 """SELECT user_id, array_size(purchases) AS purchase_count FROM ${table_name} ORDER BY purchase_count DESC LIMIT 5;""" + qt_q15 """SELECT user_id, array_size(purchases) AS purchase_count FROM ${table_name} ORDER BY user_id LIMIT 5;""" } + test_hudi_snapshot_querys("user_activity_log_mor_non_partition") + test_hudi_snapshot_querys("user_activity_log_mor_partition") test_hudi_snapshot_querys("user_activity_log_cow_non_partition") test_hudi_snapshot_querys("user_activity_log_cow_partition") + + sql """set force_jni_scanner=true;""" test_hudi_snapshot_querys("user_activity_log_mor_non_partition") test_hudi_snapshot_querys("user_activity_log_mor_partition") + test_hudi_snapshot_querys("user_activity_log_cow_non_partition") + test_hudi_snapshot_querys("user_activity_log_cow_partition") + sql """set force_jni_scanner=false;""" sql """drop catalog if exists ${catalog_name};""" -} \ No newline at end of file +} diff --git a/regression-test/suites/external_table_p2/hudi/test_hudi_timestamp.groovy b/regression-test/suites/external_table_p2/hudi/test_hudi_timestamp.groovy index c1ba630e4a7d01..36309322558f52 100644 --- a/regression-test/suites/external_table_p2/hudi/test_hudi_timestamp.groovy +++ b/regression-test/suites/external_table_p2/hudi/test_hudi_timestamp.groovy @@ -59,4 +59,4 @@ suite("test_hudi_timestamp", "p2,external,hudi,external_remote,external_remote_h // INSERT OVERWRITE hudi_table_with_timestamp VALUES // ('1', 'Alice', timestamp('2024-10-25 08:00:00')), // ('2', 'Bob', timestamp('2024-10-25 09:30:00')), -// ('3', 'Charlie', timestamp('2024-10-25 11:00:00')); \ No newline at end of file +// ('3', 'Charlie', timestamp('2024-10-25 11:00:00')); diff --git a/regression-test/suites/external_table_p2/hudi/test_hudi_timetravel.groovy b/regression-test/suites/external_table_p2/hudi/test_hudi_timetravel.groovy index 4d458dc4381dcf..cceeaa412202c6 100644 --- a/regression-test/suites/external_table_p2/hudi/test_hudi_timetravel.groovy +++ b/regression-test/suites/external_table_p2/hudi/test_hudi_timetravel.groovy @@ -54,7 +54,6 @@ suite("test_hudi_timetravel", "p2,external,hudi,external_remote,external_remote_ "20241114152009764", "20241114152011901", ] - test_hudi_timetravel_querys("user_activity_log_cow_non_partition", timestamps_cow_non_partition) // spark-sql "select distinct _hoodie_commit_time from user_activity_log_cow_partition order by _hoodie_commit_time;" def timestamps_cow_partition = [ @@ -69,7 +68,6 @@ suite("test_hudi_timetravel", "p2,external,hudi,external_remote,external_remote_ "20241114152147114", "20241114152156417", ] - test_hudi_timetravel_querys("user_activity_log_cow_partition", timestamps_cow_partition) // spark-sql "select distinct _hoodie_commit_time from user_activity_log_mor_non_partition order by _hoodie_commit_time;" def timestamps_mor_non_partition = [ @@ -84,7 +82,6 @@ suite("test_hudi_timetravel", "p2,external,hudi,external_remote,external_remote_ "20241114152028770", "20241114152030746", ] - test_hudi_timetravel_querys("user_activity_log_mor_non_partition", timestamps_mor_non_partition) // spark-sql "select distinct _hoodie_commit_time from user_activity_log_mor_partition order by _hoodie_commit_time;" def timestamps_mor_partition = [ @@ -99,7 +96,17 @@ suite("test_hudi_timetravel", "p2,external,hudi,external_remote,external_remote_ "20241114152323587", "20241114152334111", ] + + test_hudi_timetravel_querys("user_activity_log_cow_non_partition", timestamps_cow_non_partition) + test_hudi_timetravel_querys("user_activity_log_cow_partition", timestamps_cow_partition) + test_hudi_timetravel_querys("user_activity_log_mor_non_partition", timestamps_mor_non_partition) + test_hudi_timetravel_querys("user_activity_log_mor_partition", timestamps_mor_partition) + sql """set force_jni_scanner=true;""" + test_hudi_timetravel_querys("user_activity_log_cow_non_partition", timestamps_cow_non_partition) + test_hudi_timetravel_querys("user_activity_log_cow_partition", timestamps_cow_partition) + test_hudi_timetravel_querys("user_activity_log_mor_non_partition", timestamps_mor_non_partition) test_hudi_timetravel_querys("user_activity_log_mor_partition", timestamps_mor_partition) + sql """set force_jni_scanner=false;""" sql """drop catalog if exists ${catalog_name};""" -} \ No newline at end of file +}