diff --git a/be/src/vec/exec/format/table/hudi_jni_reader.cpp b/be/src/vec/exec/format/table/hudi_jni_reader.cpp index 33ba92b540a497..cb109bf05a2393 100644 --- a/be/src/vec/exec/format/table/hudi_jni_reader.cpp +++ b/be/src/vec/exec/format/table/hudi_jni_reader.cpp @@ -18,7 +18,6 @@ #include "hudi_jni_reader.h" #include -#include #include "runtime/descriptors.h" #include "runtime/runtime_state.h" @@ -65,7 +64,7 @@ HudiJniReader::HudiJniReader(const TFileScanRangeParams& scan_params, {"input_format", _hudi_params.input_format}}; // Use compatible hadoop client to read data - for (auto& kv : _scan_params.properties) { + for (const auto& kv : _scan_params.properties) { if (kv.first.starts_with(HOODIE_CONF_PREFIX)) { params[kv.first] = kv.second; } else { @@ -73,8 +72,15 @@ HudiJniReader::HudiJniReader(const TFileScanRangeParams& scan_params, } } - _jni_connector = std::make_unique("org/apache/doris/hudi/HudiJniScanner", params, - required_fields); + if (_hudi_params.hudi_jni_scanner == "hadoop") { + _jni_connector = std::make_unique( + "org/apache/doris/hudi/HadoopHudiJniScanner", params, required_fields); + } else if (_hudi_params.hudi_jni_scanner == "spark") { + _jni_connector = std::make_unique("org/apache/doris/hudi/HudiJniScanner", + params, required_fields); + } else { + DCHECK(false) << "Unsupported hudi jni scanner: " << _hudi_params.hudi_jni_scanner; + } } Status HudiJniReader::get_next_block(Block* block, size_t* read_rows, bool* eof) { diff --git a/be/src/vec/exec/format/table/hudi_jni_reader.h b/be/src/vec/exec/format/table/hudi_jni_reader.h index e9bb55a69a77e7..bfa0291a61035c 100644 --- a/be/src/vec/exec/format/table/hudi_jni_reader.h +++ b/be/src/vec/exec/format/table/hudi_jni_reader.h @@ -17,9 +17,7 @@ #pragma once -#include - -#include +#include #include #include #include diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp index 997eef02090912..ffedf6f0d58461 100644 --- a/be/src/vec/exec/scan/vfile_scanner.cpp +++ b/be/src/vec/exec/scan/vfile_scanner.cpp @@ -23,18 +23,15 @@ #include #include -#include #include #include #include -#include #include #include #include "common/compiler_util.h" // IWYU pragma: keep #include "common/config.h" #include "common/logging.h" -#include "common/object_pool.h" #include "io/cache/block_file_cache_profile.h" #include "runtime/descriptors.h" #include "runtime/runtime_state.h" @@ -47,7 +44,6 @@ #include "vec/common/string_ref.h" #include "vec/core/column_with_type_and_name.h" #include "vec/core/columns_with_type_and_name.h" -#include "vec/core/field.h" #include "vec/data_types/data_type.h" #include "vec/data_types/data_type_factory.hpp" #include "vec/data_types/data_type_nullable.h" @@ -720,17 +716,16 @@ Status VFileScanner::_get_next_reader() { // create reader for specific format Status init_status; - TFileFormatType::type format_type = _params->format_type; + // for compatibility, if format_type is not set in range, use the format type of params + TFileFormatType::type format_type = + range.__isset.format_type ? 
range.format_type : _params->format_type; // JNI reader can only push down column value range bool push_down_predicates = !_is_load && _params->format_type != TFileFormatType::FORMAT_JNI; + // for compatibility, this logic is deprecated in 3.1 if (format_type == TFileFormatType::FORMAT_JNI && range.__isset.table_format_params) { - if (range.table_format_params.table_format_type == "hudi" && - range.table_format_params.hudi_params.delta_logs.empty()) { - // fall back to native reader if there is no log file - format_type = TFileFormatType::FORMAT_PARQUET; - } else if (range.table_format_params.table_format_type == "paimon" && - !range.table_format_params.paimon_params.__isset.paimon_split) { + if (range.table_format_params.table_format_type == "paimon" && + !range.table_format_params.paimon_params.__isset.paimon_split) { // use native reader auto format = range.table_format_params.paimon_params.file_format; if (format == "orc") { diff --git a/build.sh b/build.sh index 6f3ddfa236fb10..8f1262aa76b211 100755 --- a/build.sh +++ b/build.sh @@ -525,6 +525,7 @@ fi if [[ "${BUILD_BE_JAVA_EXTENSIONS}" -eq 1 ]]; then modules+=("fe-common") modules+=("be-java-extensions/hudi-scanner") + modules+=("be-java-extensions/hadoop-hudi-scanner") modules+=("be-java-extensions/java-common") modules+=("be-java-extensions/java-udf") modules+=("be-java-extensions/jdbc-scanner") @@ -814,6 +815,7 @@ EOF extensions_modules=("java-udf") extensions_modules+=("jdbc-scanner") extensions_modules+=("hudi-scanner") + extensions_modules+=("hadoop-hudi-scanner") extensions_modules+=("paimon-scanner") extensions_modules+=("trino-connector-scanner") extensions_modules+=("max-compute-scanner") diff --git a/conf/be.conf b/conf/be.conf index fc89b5985b2454..896c7b74f22bc4 100644 --- a/conf/be.conf +++ b/conf/be.conf @@ -24,7 +24,7 @@ LOG_DIR="${DORIS_HOME}/log/" JAVA_OPTS="-Dfile.encoding=UTF-8 -Xmx2048m -DlogPath=$LOG_DIR/jni.log -Xloggc:$LOG_DIR/be.gc.log.$CUR_DATE -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=50M -Djavax.security.auth.useSubjectCredsOnly=false -Dsun.security.krb5.debug=true -Dsun.java.command=DorisBE -XX:-CriticalJNINatives" # For jdk 17, this JAVA_OPTS will be used as default JVM options -JAVA_OPTS_FOR_JDK_17="-Dfile.encoding=UTF-8 -Xmx2048m -DlogPath=$LOG_DIR/jni.log -Xlog:gc*:$LOG_DIR/be.gc.log.$CUR_DATE:time,uptime:filecount=10,filesize=50M -Djavax.security.auth.useSubjectCredsOnly=false -Dsun.security.krb5.debug=true -Dsun.java.command=DorisBE -XX:-CriticalJNINatives -XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED --add-opens=java.management/sun.management=ALL-UNNAMED" +JAVA_OPTS_FOR_JDK_17="-Dfile.encoding=UTF-8 -Djol.skipHotspotSAAttach=true -Xmx2048m -DlogPath=$LOG_DIR/jni.log -Xlog:gc*:$LOG_DIR/be.gc.log.$CUR_DATE:time,uptime:filecount=10,filesize=50M -Djavax.security.auth.useSubjectCredsOnly=false 
-Dsun.security.krb5.debug=true -Dsun.java.command=DorisBE -XX:-CriticalJNINatives -XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED --add-opens=java.management/sun.management=ALL-UNNAMED" # Set your own JAVA_HOME # JAVA_HOME=/path/to/jdk/ diff --git a/fe/be-java-extensions/hadoop-hudi-scanner/pom.xml b/fe/be-java-extensions/hadoop-hudi-scanner/pom.xml new file mode 100644 index 00000000000000..4b80d49de17527 --- /dev/null +++ b/fe/be-java-extensions/hadoop-hudi-scanner/pom.xml @@ -0,0 +1,227 @@ + + + + + + be-java-extensions + org.apache.doris + ${revision} + + 4.0.0 + hadoop-hudi-scanner + + + ${basedir}/../../ + 1 + 0.15.0 + 1.11.3 + 1.5.4-2 + 3.1.2-22 + + + + + org.apache.doris + java-common + ${project.version} + + + org.apache.thrift + libthrift + + + + + + + org.apache.hadoop + hadoop-hdfs-client + ${hadoop.version} + + + + + org.apache.hadoop + hadoop-common + + + + org.apache.hadoop + hadoop-aws + + + + org.apache.hadoop + hadoop-mapreduce-client-core + + + + org.junit.jupiter + junit-jupiter + + + + + org.apache.hudi + hudi-common + ${hudi.version} + + + org.apache.hbase + hbase-client + + + org.apache.hbase + hbase-server + + + org.apache.thrift + libthrift + + + com.fasterxml.jackson.core + jackson-databind + + + + + + + org.apache.hudi + hudi-io + ${hudi.version} + + + + + org.apache.hudi + hudi-hadoop-mr + ${hudi.version} + + + + + org.apache.parquet + parquet-hadoop-bundle + ${parquet.version} + + + + + org.apache.parquet + parquet-avro + ${parquet.version} + + + + + org.apache.avro + avro + ${avro.version} + + + org.apache.commons + commons-compress + + + + + + io.airlift + concurrent + 202 + + + + + io.airlift + aircompressor + ${aircompressor.version} + + + + com.github.luben + zstd-jni + ${luben.zstd.jni.version} + + + + com.esotericsoftware + kryo-shaded + 4.0.2 + + + + + io.trino.hive + hive-apache + ${hive-apache.version} + + + org.apache.thrift + libthrift + + + org.apache.parquet + * + + + org.apache.avro + * + + + io.airlift + aircompressor + + + + + + + hadoop-hudi-scanner + + + org.apache.maven.plugins + maven-assembly-plugin + + + src/main/resources/package.xml + + + + + + + + + + make-assembly + package + + single + + + + + + + diff --git a/fe/be-java-extensions/hadoop-hudi-scanner/src/main/java/org/apache/doris/hudi/HadoopHudiColumnValue.java b/fe/be-java-extensions/hadoop-hudi-scanner/src/main/java/org/apache/doris/hudi/HadoopHudiColumnValue.java new file mode 100644 index 00000000000000..ae0199d07d27c5 --- /dev/null +++ b/fe/be-java-extensions/hadoop-hudi-scanner/src/main/java/org/apache/doris/hudi/HadoopHudiColumnValue.java @@ -0,0 +1,219 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.hudi; + +import org.apache.doris.common.jni.vec.ColumnType; +import org.apache.doris.common.jni.vec.ColumnValue; + +import org.apache.hadoop.hive.common.type.HiveDecimal; +import org.apache.hadoop.hive.serde2.io.TimestampWritableV2; +import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.DateObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.TimestampObjectInspector; +import org.apache.hadoop.io.LongWritable; + +import java.math.BigDecimal; +import java.math.BigInteger; +import java.sql.Timestamp; +import java.time.Instant; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.util.List; +import java.util.Map; + +public class HadoopHudiColumnValue implements ColumnValue { + private ColumnType dorisType; + private ObjectInspector fieldInspector; + private Object fieldData; + private final ZoneId zoneId; + + public HadoopHudiColumnValue(ZoneId zoneId) { + this.zoneId = zoneId; + } + + public void setRow(Object record) { + this.fieldData = record; + } + + public void setField(ColumnType dorisType, ObjectInspector fieldInspector) { + this.dorisType = dorisType; + this.fieldInspector = fieldInspector; + } + + private Object inspectObject() { + return ((PrimitiveObjectInspector) fieldInspector).getPrimitiveJavaObject(fieldData); + } + + @Override + public boolean getBoolean() { + return (boolean) inspectObject(); + } + + @Override + public short getShort() { + return (short) inspectObject(); + } + + @Override + public int getInt() { + return (int) inspectObject(); + } + + @Override + public float getFloat() { + return (float) inspectObject(); + } + + @Override + public long getLong() { + return (long) inspectObject(); + } + + @Override + public double getDouble() { + return (double) inspectObject(); + } + + @Override + public String getString() { + return inspectObject().toString(); + } + + @Override + public byte[] getBytes() { + return (byte[]) inspectObject(); + } + + + @Override + public byte getByte() { + throw new UnsupportedOperationException("Hoodie type does not support tinyint"); + } + + @Override + public BigDecimal getDecimal() { + return ((HiveDecimal) inspectObject()).bigDecimalValue(); + } + + @Override + public LocalDate getDate() { + return LocalDate.ofEpochDay((((DateObjectInspector) fieldInspector).getPrimitiveJavaObject(fieldData)) + .toEpochDay()); + } + + @Override + public LocalDateTime getDateTime() { + if (fieldData instanceof Timestamp) { + 
return ((Timestamp) fieldData).toLocalDateTime(); + } else if (fieldData instanceof TimestampWritableV2) { + return LocalDateTime.ofInstant(Instant.ofEpochSecond((((TimestampObjectInspector) fieldInspector) + .getPrimitiveJavaObject(fieldData)).toEpochSecond()), zoneId); + } else { + long datetime = ((LongWritable) fieldData).get(); + long seconds; + long nanoseconds; + if (dorisType.getPrecision() == 3) { + seconds = datetime / 1000; + nanoseconds = (datetime % 1000) * 1000000; + } else if (dorisType.getPrecision() == 6) { + seconds = datetime / 1000000; + nanoseconds = (datetime % 1000000) * 1000; + } else { + throw new RuntimeException("Hoodie timestamp only support milliseconds and microseconds, " + + "wrong precision = " + dorisType.getPrecision()); + } + return LocalDateTime.ofInstant(Instant.ofEpochSecond(seconds, nanoseconds), zoneId); + } + } + + @Override + public boolean canGetStringAsBytes() { + return false; + } + + @Override + public boolean isNull() { + return fieldData == null; + } + + @Override + public BigInteger getBigInteger() { + throw new UnsupportedOperationException("Hoodie type does not support largeint"); + } + + @Override + public byte[] getStringAsBytes() { + throw new UnsupportedOperationException("Hoodie type does not support getStringAsBytes"); + } + + @Override + public void unpackArray(List values) { + ListObjectInspector inspector = (ListObjectInspector) fieldInspector; + List items = inspector.getList(fieldData); + ObjectInspector itemInspector = inspector.getListElementObjectInspector(); + for (int i = 0; i < items.size(); i++) { + Object item = items.get(i); + HadoopHudiColumnValue childValue = new HadoopHudiColumnValue(zoneId); + childValue.setRow(item); + childValue.setField(dorisType.getChildTypes().get(0), itemInspector); + values.add(childValue); + } + } + + @Override + public void unpackMap(List keys, List values) { + MapObjectInspector inspector = (MapObjectInspector) fieldInspector; + ObjectInspector keyObjectInspector = inspector.getMapKeyObjectInspector(); + ObjectInspector valueObjectInspector = inspector.getMapValueObjectInspector(); + for (Map.Entry kv : inspector.getMap(fieldData).entrySet()) { + HadoopHudiColumnValue key = new HadoopHudiColumnValue(zoneId); + key.setRow(kv.getKey()); + key.setField(dorisType.getChildTypes().get(0), keyObjectInspector); + keys.add(key); + + HadoopHudiColumnValue value = new HadoopHudiColumnValue(zoneId); + value.setRow(kv.getValue()); + value.setField(dorisType.getChildTypes().get(1), valueObjectInspector); + values.add(value); + } + } + + @Override + public void unpackStruct(List structFieldIndex, List values) { + StructObjectInspector inspector = (StructObjectInspector) fieldInspector; + List fields = inspector.getAllStructFieldRefs(); + for (int i = 0; i < structFieldIndex.size(); i++) { + Integer idx = structFieldIndex.get(i); + HadoopHudiColumnValue value = new HadoopHudiColumnValue(zoneId); + Object obj = null; + if (idx != null) { + StructField sf = fields.get(idx); + obj = inspector.getStructFieldData(fieldData, sf); + } + value.setRow(obj); + value.setField(dorisType.getChildTypes().get(i), fields.get(i).getFieldObjectInspector()); + values.add(value); + } + } +} diff --git a/fe/be-java-extensions/hadoop-hudi-scanner/src/main/java/org/apache/doris/hudi/HadoopHudiJniScanner.java b/fe/be-java-extensions/hadoop-hudi-scanner/src/main/java/org/apache/doris/hudi/HadoopHudiJniScanner.java new file mode 100644 index 00000000000000..f2b38815a366fe --- /dev/null +++ 
b/fe/be-java-extensions/hadoop-hudi-scanner/src/main/java/org/apache/doris/hudi/HadoopHudiJniScanner.java @@ -0,0 +1,271 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.hudi; + +import org.apache.doris.common.classloader.ThreadClassLoaderContext; +import org.apache.doris.common.jni.JniScanner; +import org.apache.doris.common.jni.vec.ColumnType; + +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import com.google.common.collect.Maps; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.JavaUtils; +import org.apache.hadoop.hive.serde2.Deserializer; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.io.ArrayWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapred.FileSplit; +import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hudi.common.model.HoodieLogFile; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.hadoop.realtime.HoodieRealtimeFileSplit; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.time.ZoneId; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +/** + * HadoopHudiJniScanner is a JniScanner implementation that reads Hudi data using hudi-hadoop-mr. 
+ */ +public class HadoopHudiJniScanner extends JniScanner { + private static final Logger LOG = LoggerFactory.getLogger(HadoopHudiJniScanner.class); + + private static final String HADOOP_CONF_PREFIX = "hadoop_conf."; + + // Hudi data info + private final String basePath; + private final String dataFilePath; + private final long dataFileLength; + private final String[] deltaFilePaths; + private final String instantTime; + private final String serde; + private final String inputFormat; + + // schema info + private final String hudiColumnNames; + private final String[] hudiColumnTypes; + private final String[] requiredFields; + private List requiredColumnIds; + private ColumnType[] requiredTypes; + + // Hadoop info + private RecordReader reader; + private StructObjectInspector rowInspector; + private final ObjectInspector[] fieldInspectors; + private final StructField[] structFields; + private Deserializer deserializer; + private final Map fsOptionsProps; + + // scanner info + private final HadoopHudiColumnValue columnValue; + private final int fetchSize; + private final ClassLoader classLoader; + + public HadoopHudiJniScanner(int fetchSize, Map params) { + this.basePath = params.get("base_path"); + this.dataFilePath = params.get("data_file_path"); + this.dataFileLength = Long.parseLong(params.get("data_file_length")); + if (Strings.isNullOrEmpty(params.get("delta_file_paths"))) { + this.deltaFilePaths = new String[0]; + } else { + this.deltaFilePaths = params.get("delta_file_paths").split(","); + } + this.instantTime = params.get("instant_time"); + this.serde = params.get("serde"); + this.inputFormat = params.get("input_format"); + + this.hudiColumnNames = params.get("hudi_column_names"); + this.hudiColumnTypes = params.get("hudi_column_types").split("#"); + this.requiredFields = params.get("required_fields").split(","); + + this.fieldInspectors = new ObjectInspector[requiredFields.length]; + this.structFields = new StructField[requiredFields.length]; + this.fsOptionsProps = Maps.newHashMap(); + for (Map.Entry entry : params.entrySet()) { + if (entry.getKey().startsWith(HADOOP_CONF_PREFIX)) { + fsOptionsProps.put(entry.getKey().substring(HADOOP_CONF_PREFIX.length()), entry.getValue()); + } + if (LOG.isDebugEnabled()) { + LOG.debug("get hudi params {}: {}", entry.getKey(), entry.getValue()); + } + } + + ZoneId zoneId; + if (Strings.isNullOrEmpty(params.get("time_zone"))) { + zoneId = ZoneId.systemDefault(); + } else { + zoneId = ZoneId.of(params.get("time_zone")); + } + this.columnValue = new HadoopHudiColumnValue(zoneId); + this.fetchSize = fetchSize; + this.classLoader = this.getClass().getClassLoader(); + } + + @Override + public void open() throws IOException { + try (ThreadClassLoaderContext ignored = new ThreadClassLoaderContext(classLoader)) { + initRequiredColumnsAndTypes(); + initTableInfo(requiredTypes, requiredFields, fetchSize); + Properties properties = getReaderProperties(); + initReader(properties); + } catch (Exception e) { + close(); + LOG.warn("failed to open hadoop hudi jni scanner", e); + throw new IOException("failed to open hadoop hudi jni scanner: " + e.getMessage(), e); + } + } + + @Override + public int getNext() throws IOException { + try (ThreadClassLoaderContext ignored = new ThreadClassLoaderContext(classLoader)) { + NullWritable key = reader.createKey(); + ArrayWritable value = reader.createValue(); + int numRows = 0; + for (; numRows < fetchSize; numRows++) { + if (!reader.next(key, value)) { + break; + } + Object rowData = deserializer.deserialize(value); + 
for (int i = 0; i < fields.length; i++) { + Object fieldData = rowInspector.getStructFieldData(rowData, structFields[i]); + columnValue.setRow(fieldData); + // LOG.info("rows: {}, column: {}, col name: {}, col type: {}, inspector: {}", + // numRows, i, types[i].getName(), types[i].getType().name(), + // fieldInspectors[i].getTypeName()); + columnValue.setField(types[i], fieldInspectors[i]); + appendData(i, columnValue); + } + } + return numRows; + } catch (Exception e) { + close(); + LOG.warn("failed to get next in hadoop hudi jni scanner", e); + throw new IOException("failed to get next in hadoop hudi jni scanner: " + e.getMessage(), e); + } + } + + @Override + public void close() throws IOException { + try (ThreadClassLoaderContext ignored = new ThreadClassLoaderContext(classLoader)) { + if (reader != null) { + reader.close(); + } + } catch (IOException e) { + LOG.warn("failed to close hadoop hudi jni scanner", e); + throw new IOException("failed to close hadoop hudi jni scanner: " + e.getMessage(), e); + } + } + + private void initRequiredColumnsAndTypes() { + String[] splitHudiColumnNames = hudiColumnNames.split(","); + + Map hudiColNameToIdx = + IntStream.range(0, splitHudiColumnNames.length) + .boxed() + .collect(Collectors.toMap(i -> splitHudiColumnNames[i], i -> i)); + + Map hudiColNameToType = + IntStream.range(0, splitHudiColumnNames.length) + .boxed() + .collect(Collectors.toMap(i -> splitHudiColumnNames[i], i -> hudiColumnTypes[i])); + + requiredTypes = Arrays.stream(requiredFields) + .map(field -> ColumnType.parseType(field, hudiColNameToType.get(field))) + .toArray(ColumnType[]::new); + + requiredColumnIds = Arrays.stream(requiredFields) + .mapToInt(hudiColNameToIdx::get) + .boxed().collect(Collectors.toList()); + } + + private Properties getReaderProperties() { + Properties properties = new Properties(); + properties.setProperty("hive.io.file.readcolumn.ids", Joiner.on(",").join(requiredColumnIds)); + properties.setProperty("hive.io.file.readcolumn.names", Joiner.on(",").join(this.requiredFields)); + properties.setProperty("columns", this.hudiColumnNames); + properties.setProperty("columns.types", Joiner.on(",").join(hudiColumnTypes)); + properties.setProperty("serialization.lib", this.serde); + properties.setProperty("hive.io.file.read.all.columns", "false"); + fsOptionsProps.forEach(properties::setProperty); + return properties; + } + + private void initReader(Properties properties) throws Exception { + String realtimePath = dataFileLength != -1 ? dataFilePath : deltaFilePaths[0]; + long realtimeLength = dataFileLength != -1 ? 
dataFileLength : 0; + Path path = new Path(realtimePath); + FileSplit fileSplit = new FileSplit(path, 0, realtimeLength, (String[]) null); + List logFiles = Arrays.stream(deltaFilePaths).map(HoodieLogFile::new) + .collect(Collectors.toList()); + FileSplit hudiSplit = + new HoodieRealtimeFileSplit(fileSplit, basePath, logFiles, instantTime, false, Option.empty()); + + JobConf jobConf = new JobConf(new Configuration()); + properties.stringPropertyNames().forEach(name -> jobConf.set(name, properties.getProperty(name))); + InputFormat inputFormatClass = createInputFormat(jobConf, inputFormat); + reader = (RecordReader) inputFormatClass + .getRecordReader(hudiSplit, jobConf, Reporter.NULL); + + deserializer = getDeserializer(jobConf, properties, serde); + rowInspector = getTableObjectInspector(deserializer); + for (int i = 0; i < requiredFields.length; i++) { + StructField field = rowInspector.getStructFieldRef(requiredFields[i]); + structFields[i] = field; + fieldInspectors[i] = field.getFieldObjectInspector(); + } + } + + private InputFormat createInputFormat(Configuration conf, String inputFormat) throws Exception { + Class clazz = conf.getClassByName(inputFormat); + Class> cls = + (Class>) clazz.asSubclass(InputFormat.class); + return ReflectionUtils.newInstance(cls, conf); + } + + private Deserializer getDeserializer(Configuration configuration, Properties properties, String name) + throws Exception { + Class deserializerClass = Class.forName(name, true, JavaUtils.getClassLoader()) + .asSubclass(Deserializer.class); + Deserializer deserializer = deserializerClass.getConstructor().newInstance(); + deserializer.initialize(configuration, properties); + return deserializer; + } + + private StructObjectInspector getTableObjectInspector(Deserializer deserializer) throws Exception { + ObjectInspector inspector = deserializer.getObjectInspector(); + Preconditions.checkArgument(inspector.getCategory() == ObjectInspector.Category.STRUCT, + "expected STRUCT: %s", inspector.getCategory()); + return (StructObjectInspector) inspector; + } +} diff --git a/fe/be-java-extensions/hadoop-hudi-scanner/src/main/resources/package.xml b/fe/be-java-extensions/hadoop-hudi-scanner/src/main/resources/package.xml new file mode 100644 index 00000000000000..4bbb2610603363 --- /dev/null +++ b/fe/be-java-extensions/hadoop-hudi-scanner/src/main/resources/package.xml @@ -0,0 +1,41 @@ + + + + jar-with-dependencies + + jar + + false + + + / + true + true + runtime + + + **/Log4j2Plugins.dat + + + + + diff --git a/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/Utils.java b/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/Utils.java index 5614f8bcc96eb1..3e07c8917905a3 100644 --- a/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/Utils.java +++ b/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/Utils.java @@ -23,6 +23,7 @@ import org.apache.commons.io.FileUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration; import java.io.BufferedReader; import java.io.File; @@ -75,7 +76,8 @@ public static void killProcess(long pid) { } public static HoodieTableMetaClient getMetaClient(Configuration conf, String basePath) { + HadoopStorageConfiguration hadoopStorageConfiguration = new HadoopStorageConfiguration(conf); return HadoopUGI.ugiDoAs(AuthenticationConfig.getKerberosConfig(conf), () -> HoodieTableMetaClient.builder() - 
.setConf(conf).setBasePath(basePath).build()); + .setConf(hadoopStorageConfiguration).setBasePath(basePath).build()); } } diff --git a/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/BaseSplitReader.scala b/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/BaseSplitReader.scala index dcc068ad7006d8..fc8d74f9713c26 100644 --- a/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/BaseSplitReader.scala +++ b/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/BaseSplitReader.scala @@ -36,13 +36,15 @@ import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, T import org.apache.hudi.common.util.ValidationUtils.checkState import org.apache.hudi.common.util.{ConfigUtils, StringUtils} import org.apache.hudi.config.HoodieWriteConfig -import org.apache.hudi.hadoop.CachingPath +import org.apache.hudi.hadoop.fs.CachingPath import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter import org.apache.hudi.internal.schema.utils.{InternalSchemaUtils, SerDeHelper} import org.apache.hudi.internal.schema.{HoodieSchemaException, InternalSchema} -import org.apache.hudi.io.storage.HoodieAvroHFileReader +import org.apache.hudi.io.hadoop.HoodieHBaseAvroHFileReader import org.apache.hudi.metadata.HoodieTableMetadataUtil import org.apache.hudi.{AvroConversionUtils, DataSourceReadOptions, DataSourceWriteOptions, HoodieSparkConfUtils, HoodieTableSchema, HoodieTableState} +import org.apache.hudi.storage.StoragePath; +import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration; import org.apache.log4j.Logger import org.apache.spark.sql.adapter.Spark3_4Adapter import org.apache.spark.sql.avro.{HoodieAvroSchemaConverters, HoodieSparkAvroSchemaConverters} @@ -430,7 +432,7 @@ abstract class BaseSplitReader(val split: HoodieSplit) { try { if (shouldExtractPartitionValuesFromPartitionPath) { val filePath = new Path(split.dataFilePath) - val tablePathWithoutScheme = CachingPath.getPathWithoutSchemeAndAuthority(tableInformation.metaClient.getBasePathV2) + val tablePathWithoutScheme = CachingPath.getPathWithoutSchemeAndAuthority(new Path(tableInformation.metaClient.getBasePathV2.toUri)) val partitionPathWithoutScheme = CachingPath.getPathWithoutSchemeAndAuthority(filePath.getParent) val relativePath = new URI(tablePathWithoutScheme.toString).relativize(new URI(partitionPathWithoutScheme.toString)).toString val hiveStylePartitioningEnabled = tableConfig.getHiveStylePartitioningEnable.toBoolean @@ -497,8 +499,11 @@ abstract class BaseSplitReader(val split: HoodieSplit) { options: Map[String, String], hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = { partitionedFile => { - val reader = new HoodieAvroHFileReader( - hadoopConf, partitionedFile.filePath.toPath, new CacheConfig(hadoopConf)) + var hadoopStorageConfiguration = new HadoopStorageConfiguration(hadoopConf); + var storagePath = new StoragePath(partitionedFile.toPath.toUri.getPath); + var emptySchema = org.apache.hudi.common.util.Option.empty[org.apache.avro.Schema]() + val reader = new HoodieHBaseAvroHFileReader( + hadoopStorageConfiguration, storagePath, emptySchema) val requiredRowSchema = requiredDataSchema.structTypeSchema // NOTE: Schema has to be parsed at this point, since Avro's [[Schema]] aren't serializable diff --git a/fe/be-java-extensions/pom.xml b/fe/be-java-extensions/pom.xml index bbe056739d51ec..5d56ef76e7c3ef 100644 --- a/fe/be-java-extensions/pom.xml +++ b/fe/be-java-extensions/pom.xml @@ -22,6 +22,7 @@ 
under the License. 4.0.0 hudi-scanner + hadoop-hudi-scanner java-common java-udf jdbc-scanner diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java index d1de1306d04693..5ea2a2637d86e2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java @@ -433,6 +433,8 @@ private TScanRangeLocations splitToScanRange( } } + // set file format type, and the type might fall back to native format in setScanParams + rangeDesc.setFormatType(getFileFormatType()); setScanParams(rangeDesc, fileSplit); curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc); TScanRangeLocation location = new TScanRangeLocation(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreClientHelper.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreClientHelper.java index 97032467cec765..0f60a61b27538c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreClientHelper.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreClientHelper.java @@ -64,6 +64,7 @@ import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.TableSchemaResolver; +import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -828,8 +829,10 @@ public static T ugiDoAs(Configuration conf, PrivilegedExceptionAction act public static HoodieTableMetaClient getHudiClient(HMSExternalTable table) { String hudiBasePath = table.getRemoteTable().getSd().getLocation(); Configuration conf = getConfiguration(table); + HadoopStorageConfiguration hadoopStorageConfiguration = new HadoopStorageConfiguration(conf); return HadoopUGI.ugiDoAs(AuthenticationConfig.getKerberosConfig(conf), - () -> HoodieTableMetaClient.builder().setConf(conf).setBasePath(hudiBasePath).build()); + () -> HoodieTableMetaClient.builder().setConf(hadoopStorageConfiguration).setBasePath(hudiBasePath) + .build()); } public static Configuration getConfiguration(HMSExternalTable table) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/HudiUtils.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/HudiUtils.java index d7803b1a516f9e..c98d994a28a08f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/HudiUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/HudiUtils.java @@ -86,7 +86,7 @@ public static String convertAvroToHiveType(Schema schema) { case LONG: if (logicalType instanceof LogicalTypes.TimestampMillis || logicalType instanceof LogicalTypes.TimestampMicros) { - return logicalType.getName(); + return "timestamp"; } if (logicalType instanceof LogicalTypes.TimeMicros) { return handleUnsupportedType(schema); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/COWIncrementalRelation.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/COWIncrementalRelation.java index 7981a0b4f261ff..843dded27969ad 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/COWIncrementalRelation.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/COWIncrementalRelation.java @@ -37,6 +37,7 @@ import 
org.apache.hudi.common.table.timeline.TimelineUtils.HollowCommitHandling; import org.apache.hudi.common.util.Option; import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.storage.StoragePath; import java.io.IOException; import java.util.ArrayList; @@ -105,7 +106,7 @@ public COWIncrementalRelation(Map optParams, Configuration confi List commitsToReturn = commitsTimelineToReturn.getInstants(); // todo: support configuration hoodie.datasource.read.incr.filters - Path basePath = metaClient.getBasePathV2(); + StoragePath basePath = metaClient.getBasePathV2(); Map regularFileIdToFullPath = new HashMap<>(); Map metaBootstrapFileIdToFullPath = new HashMap<>(); HoodieTimeline replacedTimeline = commitsTimelineToReturn.getCompletedReplaceTimeline(); @@ -113,8 +114,8 @@ public COWIncrementalRelation(Map optParams, Configuration confi for (HoodieInstant instant : replacedTimeline.getInstants()) { HoodieReplaceCommitMetadata.fromBytes(metaClient.getActiveTimeline().getInstantDetails(instant).get(), HoodieReplaceCommitMetadata.class).getPartitionToReplaceFileIds().forEach( - (key, value) -> value.forEach( - e -> replacedFile.put(e, FSUtils.getPartitionPath(basePath, key).toString()))); + (key, value) -> value.forEach( + e -> replacedFile.put(e, FSUtils.constructAbsolutePath(basePath, key).toString()))); } fileToWriteStat = new HashMap<>(); @@ -123,7 +124,7 @@ public COWIncrementalRelation(Map optParams, Configuration confi commitTimeline.getInstantDetails(commit).get(), HoodieCommitMetadata.class); metadata.getPartitionToWriteStats().forEach((partition, stats) -> { for (HoodieWriteStat stat : stats) { - fileToWriteStat.put(FSUtils.getPartitionPath(basePath, stat.getPath()).toString(), stat); + fileToWriteStat.put(FSUtils.constructAbsolutePath(basePath, stat.getPath()).toString(), stat); } }); if (HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS.equals(commit.getTimestamp())) { @@ -158,7 +159,7 @@ public COWIncrementalRelation(Map optParams, Configuration confi } - fs = basePath.getFileSystem(configuration); + fs = new Path(basePath.toUri().getPath()).getFileSystem(configuration); fullTableScan = shouldFullTableScan(); includeStartTime = !fullTableScan; if (fullTableScan || commitsToReturn.isEmpty()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiLocalEngineContext.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiLocalEngineContext.java index 26ef6fdfef7086..fecc026cf8d046 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiLocalEngineContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiLocalEngineContext.java @@ -17,10 +17,6 @@ package org.apache.doris.datasource.hudi.source; -import org.apache.doris.datasource.hive.HiveMetaStoreClientHelper; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hudi.common.config.SerializableConfiguration; import org.apache.hudi.common.data.HoodieAccumulator; import org.apache.hudi.common.data.HoodieAtomicLongAccumulator; import org.apache.hudi.common.data.HoodieData; @@ -39,7 +35,7 @@ import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.collection.ImmutablePair; import org.apache.hudi.common.util.collection.Pair; -import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.storage.StorageConfiguration; import java.util.Collections; import java.util.Iterator; @@ -50,18 +46,20 @@ import java.util.stream.Stream; /** - * This file is copied from 
org.apache.hudi.common.engine.HoodieLocalEngineContext. + * This file is copied from + * org.apache.hudi.common.engine.HudiLocalEngineContext. * Because we need set ugi in thread pool - * A java based engine context, use this implementation on the query engine integrations if needed. + * A java based engine context, use this implementation on the query engine + * integrations if needed. */ public final class HudiLocalEngineContext extends HoodieEngineContext { - public HudiLocalEngineContext(Configuration conf) { + public HudiLocalEngineContext(StorageConfiguration conf) { this(conf, new LocalTaskContextSupplier()); } - public HudiLocalEngineContext(Configuration conf, TaskContextSupplier taskContextSupplier) { - super(new SerializableConfiguration(conf), taskContextSupplier); + public HudiLocalEngineContext(StorageConfiguration conf, TaskContextSupplier taskContextSupplier) { + super(conf, taskContextSupplier); } @Override @@ -81,27 +79,18 @@ public HoodieData parallelize(List data, int parallelism) { @Override public List map(List data, SerializableFunction func, int parallelism) { - return data.stream().parallel().map(v1 -> { - try { - return HiveMetaStoreClientHelper.ugiDoAs(getHadoopConf().get(), () -> func.apply(v1)); - } catch (Exception e) { - throw new HoodieException("Error occurs when executing map", e); - } - }).collect(Collectors.toList()); + return data.stream().parallel().map(FunctionWrapper.throwingMapWrapper(func)).collect(Collectors.toList()); } @Override public List mapToPairAndReduceByKey( - List data, - SerializablePairFunction mapToPairFunc, - SerializableBiFunction reduceFunc, int parallelism) { + List data, SerializablePairFunction mapToPairFunc, SerializableBiFunction reduceFunc, + int parallelism) { return data.stream().parallel().map(FunctionWrapper.throwingMapToPairWrapper(mapToPairFunc)) - .collect(Collectors.groupingBy(p -> p.getKey())).values().stream() - .map(list -> - list.stream() - .map(e -> e.getValue()) + .collect(Collectors.groupingBy(p -> p.getKey())).values().stream() + .map(list -> list.stream().map(e -> e.getValue()) .reduce(FunctionWrapper.throwingReduceWrapper(reduceFunc)).get()) - .collect(Collectors.toList()); + .collect(Collectors.toList()); } @Override @@ -109,29 +98,28 @@ public Stream> mapPartitionsToPairAndReduceByKey( Stream data, SerializablePairFlatMapFunction, K, V> flatMapToPairFunc, SerializableBiFunction reduceFunc, int parallelism) { return FunctionWrapper.throwingFlatMapToPairWrapper(flatMapToPairFunc).apply(data.parallel().iterator()) - .collect(Collectors.groupingBy(Pair::getKey)).entrySet().stream() - .map(entry -> new ImmutablePair<>(entry.getKey(), entry.getValue().stream().map( - Pair::getValue).reduce(FunctionWrapper.throwingReduceWrapper(reduceFunc)).orElse(null))) - .filter(Objects::nonNull); + .collect(Collectors.groupingBy(Pair::getKey)).entrySet().stream() + .map(entry -> new ImmutablePair<>(entry.getKey(), entry.getValue().stream().map( + Pair::getValue).reduce(FunctionWrapper.throwingReduceWrapper(reduceFunc)).orElse(null))) + .filter(Objects::nonNull); } @Override public List reduceByKey( List> data, SerializableBiFunction reduceFunc, int parallelism) { return data.stream().parallel() - .collect(Collectors.groupingBy(p -> p.getKey())).values().stream() - .map(list -> - list.stream() - .map(e -> e.getValue()) - .reduce(FunctionWrapper.throwingReduceWrapper(reduceFunc)).orElse(null)) - .filter(Objects::nonNull) - .collect(Collectors.toList()); + .collect(Collectors.groupingBy(p -> p.getKey())).values().stream() + 
.map(list -> list.stream().map(e -> e.getValue()) + .reduce(FunctionWrapper.throwingReduceWrapper(reduceFunc)) + .orElse(null)) + .filter(Objects::nonNull) + .collect(Collectors.toList()); } @Override public List flatMap(List data, SerializableFunction> func, int parallelism) { - return - data.stream().parallel().flatMap(FunctionWrapper.throwingFlatMapWrapper(func)).collect(Collectors.toList()); + return data.stream().parallel().flatMap(FunctionWrapper.throwingFlatMapWrapper(func)) + .collect(Collectors.toList()); } @Override @@ -142,8 +130,7 @@ public void foreach(List data, SerializableConsumer consumer, int para @Override public Map mapToPair(List data, SerializablePairFunction func, Integer parallelism) { return data.stream().map(FunctionWrapper.throwingMapToPairWrapper(func)).collect( - Collectors.toMap(Pair::getLeft, Pair::getRight, (oldVal, newVal) -> newVal) - ); + Collectors.toMap(Pair::getLeft, Pair::getRight, (oldVal, newVal) -> newVal)); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiPartitionProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiPartitionProcessor.java index 738b2638588e03..0ab9fef951a378 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiPartitionProcessor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiPartitionProcessor.java @@ -21,7 +21,6 @@ import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.timeline.TimelineUtils; -import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils; import org.apache.hudi.metadata.HoodieTableMetadata; import org.apache.hudi.metadata.HoodieTableMetadataUtil; @@ -49,14 +48,15 @@ public List getAllPartitionNames(HoodieTableMetaClient tableMetaClient) .build(); HoodieTableMetadata newTableMetadata = HoodieTableMetadata.create( - new HudiLocalEngineContext(tableMetaClient.getHadoopConf()), metadataConfig, + new HudiLocalEngineContext(tableMetaClient.getStorageConf()), tableMetaClient.getStorage(), + metadataConfig, tableMetaClient.getBasePathV2().toString(), true); return newTableMetadata.getAllPartitionPaths(); } public List getPartitionNamesBeforeOrEquals(HoodieTimeline timeline, String timestamp) { - return new ArrayList<>(HoodieInputFormatUtils.getWritePartitionPaths( + return new ArrayList<>(HoodieTableMetadataUtil.getWritePartitionPaths( timeline.findInstantsBeforeOrEquals(timestamp).getInstants().stream().map(instant -> { try { return TimelineUtils.getCommitMetadata(instant, timeline); @@ -67,7 +67,7 @@ public List getPartitionNamesBeforeOrEquals(HoodieTimeline timeline, Str } public List getPartitionNamesInRange(HoodieTimeline timeline, String startTimestamp, String endTimestamp) { - return new ArrayList<>(HoodieInputFormatUtils.getWritePartitionPaths( + return new ArrayList<>(HoodieTableMetadataUtil.getWritePartitionPaths( timeline.findInstantsInRange(startTimestamp, endTimestamp).getInstants().stream().map(instant -> { try { return TimelineUtils.getCommitMetadata(instant, timeline); @@ -101,8 +101,10 @@ public static List parsePartitionValues(List partitionColumns, S } else { // If the partition column size is not equal to the partition fragments size // and the partition column size > 1, we do not know how to map the partition - // fragments to the partition columns and therefore return an empty tuple. 
We don't - // fail outright so that in some cases we can fallback to reading the table as non-partitioned + // fragments to the partition columns and therefore return an empty tuple. We + // don't + // fail outright so that in some cases we can fallback to reading the table as + // non-partitioned // one throw new RuntimeException("Failed to parse partition values of path: " + partitionPath); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java index a8f2a362bfde8d..28805aae63c1e3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java @@ -37,6 +37,7 @@ import org.apache.doris.planner.ListPartitionPrunerV2; import org.apache.doris.planner.PlanNodeId; import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.SessionVariable; import org.apache.doris.spi.Split; import org.apache.doris.statistics.StatisticalType; import org.apache.doris.thrift.TExplainLevel; @@ -48,8 +49,6 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import org.apache.avro.Schema; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.Path; import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.BaseFile; @@ -62,6 +61,8 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.view.HoodieTableFileSystemView; import org.apache.hudi.common.util.Option; +import org.apache.hudi.storage.StoragePath; +import org.apache.hudi.storage.StoragePathInfo; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -87,7 +88,7 @@ public class HudiScanNode extends HiveScanNode { private static final Logger LOG = LogManager.getLogger(HudiScanNode.class); - private boolean isCowOrRoTable; + private boolean isCowTable; private final AtomicLong noLogsSplitNum = new AtomicLong(0); @@ -113,19 +114,23 @@ public class HudiScanNode extends HiveScanNode { private boolean incrementalRead = false; private TableScanParams scanParams; private IncrementalRelation incrementalRelation; + private SessionVariable sessionVariable; /** * External file scan node for Query Hudi table - * needCheckColumnPriv: Some of ExternalFileScanNode do not need to check column priv + * needCheckColumnPriv: Some of ExternalFileScanNode do not need to check column + * priv * eg: s3 tvf - * These scan nodes do not have corresponding catalog/database/table info, so no need to do priv check + * These scan nodes do not have corresponding catalog/database/table info, so no + * need to do priv check */ public HudiScanNode(PlanNodeId id, TupleDescriptor desc, boolean needCheckColumnPriv, - Optional scanParams, Optional incrementalRelation) { + Optional scanParams, Optional incrementalRelation, + SessionVariable sessionVariable) { super(id, desc, "HUDI_SCAN_NODE", StatisticalType.HUDI_SCAN_NODE, needCheckColumnPriv); - isCowOrRoTable = hmsTable.isHoodieCowTable(); + isCowTable = hmsTable.isHoodieCowTable(); if (LOG.isDebugEnabled()) { - if (isCowOrRoTable) { + if (isCowTable) { LOG.debug("Hudi table {} can read as cow/read optimize table", hmsTable.getFullQualifiers()); } else { LOG.debug("Hudi table {} is a mor table, and will use JNI to read data in BE", @@ -136,11 +141,12 @@ public HudiScanNode(PlanNodeId id, TupleDescriptor desc, 
boolean needCheckColumn this.scanParams = scanParams.orElse(null); this.incrementalRelation = incrementalRelation.orElse(null); this.incrementalRead = (this.scanParams != null && this.scanParams.incrementalRead()); + this.sessionVariable = sessionVariable; } @Override public TFileFormatType getFileFormatType() throws UserException { - if (isCowOrRoTable) { + if (canUseNativeReader()) { return super.getFileFormatType(); } else { // Use jni to read hudi table in BE @@ -185,13 +191,13 @@ protected void doInitialize() throws UserException { throw new UserException("Not support function '" + scanParams.getParamType() + "' in hudi table"); } if (incrementalRead) { - if (isCowOrRoTable) { + if (isCowTable) { try { Map serd = hmsTable.getRemoteTable().getSd().getSerdeInfo().getParameters(); if ("true".equals(serd.get("hoodie.query.as.ro.table")) && hmsTable.getRemoteTable().getTableName().endsWith("_ro")) { // Incremental read RO table as RT table, I don't know why? - isCowOrRoTable = false; + isCowTable = false; LOG.warn("Execute incremental read on RO table: {}", hmsTable.getFullQualifiers()); } } catch (Exception e) { @@ -236,7 +242,15 @@ protected Map getLocationProperties() throws UserException { @Override protected void setScanParams(TFileRangeDesc rangeDesc, Split split) { if (split instanceof HudiSplit) { - setHudiParams(rangeDesc, (HudiSplit) split); + HudiSplit hudiSplit = (HudiSplit) split; + if (rangeDesc.getFormatType() == TFileFormatType.FORMAT_JNI + && !sessionVariable.isForceJniScanner() + && hudiSplit.getHudiDeltaLogs().isEmpty()) { + // no logs, is read optimize table, fallback to use native reader + // TODO: support read orc hudi table in native reader + rangeDesc.setFormatType(TFileFormatType.FORMAT_PARQUET); + } + setHudiParams(rangeDesc, hudiSplit); } } @@ -255,10 +269,15 @@ private void setHudiParams(TFileRangeDesc rangeDesc, HudiSplit hudiSplit) { fileDesc.setColumnTypes(hudiSplit.getHudiColumnTypes()); // TODO(gaoxin): support complex types // fileDesc.setNestedFields(hudiSplit.getNestedFields()); + fileDesc.setHudiJniScanner(hudiSplit.getHudiJniScanner()); tableFormatFileDesc.setHudiParams(fileDesc); rangeDesc.setTableFormatParams(tableFormatFileDesc); } + private boolean canUseNativeReader() { + return !sessionVariable.isForceJniScanner() && isCowTable; + } + private List getPrunedPartitions( HoodieTableMetaClient metaClient, Option snapshotTimestamp) throws AnalysisException { List partitionColumnTypes = hmsTable.getPartitionColumnTypes(); @@ -304,7 +323,8 @@ private List getPrunedPartitions( } } } - // unpartitioned table, create a dummy partition to save location and inputformat, + // unpartitioned table, create a dummy partition to save location and + // inputformat, // so that we can unify the interface. 
HivePartition dummyPartition = new HivePartition(hmsTable.getDbName(), hmsTable.getName(), true, hmsTable.getRemoteTable().getSd().getInputFormat(), @@ -315,7 +335,7 @@ private List getPrunedPartitions( } private List getIncrementalSplits() { - if (isCowOrRoTable) { + if (canUseNativeReader()) { List splits = incrementalRelation.collectSplits(); noLogsSplitNum.addAndGet(splits.size()); return splits; @@ -336,15 +356,15 @@ private void getPartitionSplits(HivePartition partition, List splits) thr globPath = hudiClient.getBasePathV2().toString() + "/*"; } else { partitionName = FSUtils.getRelativePartitionPath(hudiClient.getBasePathV2(), - new Path(partition.getPath())); + new StoragePath(partition.getPath())); globPath = String.format("%s/%s/*", hudiClient.getBasePathV2().toString(), partitionName); } - List statuses = FSUtils.getGlobStatusExcludingMetaFolder( - hudiClient.getRawFs(), new Path(globPath)); + List statuses = FSUtils.getGlobStatusExcludingMetaFolder( + hudiClient.getRawHoodieStorage(), new StoragePath(globPath)); HoodieTableFileSystemView fileSystemView = new HoodieTableFileSystemView(hudiClient, - timeline, statuses.toArray(new FileStatus[0])); + timeline, statuses); - if (isCowOrRoTable) { + if (canUseNativeReader()) { fileSystemView.getLatestBaseFilesBeforeOrOn(partitionName, queryInstant).forEach(baseFile -> { noLogsSplitNum.incrementAndGet(); String filePath = baseFile.getPath(); @@ -473,7 +493,7 @@ private HudiSplit generateHudiSplit(FileSlice fileSlice, List partitionV fileSlice.getPartitionPath(); List logs = fileSlice.getLogFiles().map(HoodieLogFile::getPath) - .map(Path::toString) + .map(StoragePath::toString) .collect(Collectors.toList()); if (logs.isEmpty()) { noLogsSplitNum.incrementAndGet(); @@ -492,6 +512,7 @@ private HudiSplit generateHudiSplit(FileSlice fileSlice, List partitionV split.setHudiColumnNames(columnNames); split.setHudiColumnTypes(columnTypes); split.setInstantTime(queryInstant); + split.setHudiJniScanner(sessionVariable.getHudiJniScanner()); return split; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiSplit.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiSplit.java index c72f7621feaa55..2270d2017937da 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiSplit.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiSplit.java @@ -40,6 +40,5 @@ public HudiSplit(LocationPath file, long start, long length, long fileLength, St private List hudiColumnNames; private List hudiColumnTypes; private List nestedFields; + private String hudiJniScanner; } - - diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/MORIncrementalRelation.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/MORIncrementalRelation.java index c06fcc2a578d43..7df013599229fb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/MORIncrementalRelation.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/MORIncrementalRelation.java @@ -20,9 +20,7 @@ import org.apache.doris.spi.Split; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.GlobPattern; -import org.apache.hadoop.fs.Path; import org.apache.hudi.common.model.BaseFile; import org.apache.hudi.common.model.FileSlice; import org.apache.hudi.common.model.HoodieCommitMetadata; @@ -34,6 +32,8 @@ import org.apache.hudi.common.table.view.HoodieTableFileSystemView; import 
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
+import org.apache.hudi.storage.StoragePathInfo;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -54,7 +54,7 @@ public class MORIncrementalRelation implements IncrementalRelation {
     private final boolean endInstantArchived;
     private final List includedCommits;
     private final List commitsMetadata;
-    private final FileStatus[] affectedFilesInCommits;
+    private final List affectedFilesInCommits;
     private final boolean fullTableScan;
     private final String globPattern;
     private final boolean includeStartTime;
@@ -96,7 +96,7 @@ public MORIncrementalRelation(Map optParams, Configuration confi
         includedCommits = getIncludedCommits();
         commitsMetadata = getCommitsMetadata();
         affectedFilesInCommits = HoodieInputFormatUtils.listAffectedFilesForCommits(configuration,
-                new Path(metaClient.getBasePath()), commitsMetadata);
+                metaClient.getBasePathV2(), commitsMetadata);
         fullTableScan = shouldFullTableScan();
         if (hollowCommitHandling == HollowCommitHandling.USE_TRANSITION_TIME && fullTableScan) {
             throw new HoodieException("Cannot use stateTransitionTime while enables full table scan");
@@ -152,8 +152,8 @@ private boolean shouldFullTableScan() throws IOException {
         if (should) {
             return true;
         }
-        for (FileStatus fileStatus : affectedFilesInCommits) {
-            if (!metaClient.getFs().exists(fileStatus.getPath())) {
+        for (StoragePathInfo fileStatus : affectedFilesInCommits) {
+            if (!metaClient.getRawHoodieStorage().exists(fileStatus.getPath())) {
                 return true;
             }
         }
@@ -199,7 +199,7 @@ public List collectFileSlices() throws HoodieException {
         String latestCommit = includedCommits.get(includedCommits.size() - 1).getTimestamp();
         HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient,
                 scanTimeline, affectedFilesInCommits);
-        Stream fileSlices = HoodieInputFormatUtils.getWritePartitionPaths(commitsMetadata)
+        Stream fileSlices = HoodieTableMetadataUtil.getWritePartitionPaths(commitsMetadata)
                 .stream().flatMap(relativePartitionPath ->
                         fsView.getLatestMergedFileSlicesBeforeOrOn(relativePartitionPath, latestCommit));
         if ("".equals(globPattern)) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
index 59f51c8425c7f2..bf917804fb7b6d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
@@ -105,9 +105,9 @@ public String toString() {
     private String serializedTable;
 
     public PaimonScanNode(PlanNodeId id,
-                          TupleDescriptor desc,
-                          boolean needCheckColumnPriv,
-                          SessionVariable sessionVariable) {
+            TupleDescriptor desc,
+            boolean needCheckColumnPriv,
+            SessionVariable sessionVariable) {
         super(id, desc, "PAIMON_SCAN_NODE", StatisticalType.PAIMON_SCAN_NODE, needCheckColumnPriv);
         this.sessionVariable = sessionVariable;
     }
@@ -127,8 +127,7 @@ protected void convertPredicate() {
         predicates = paimonPredicateConverter.convertToPaimonExpr(conjuncts);
     }
 
-    private static final Base64.Encoder BASE64_ENCODER =
-            java.util.Base64.getUrlEncoder().withoutPadding();
+    private static final Base64.Encoder BASE64_ENCODER = java.util.Base64.getUrlEncoder().withoutPadding();
 
     public static String encodeObjectToString(T t) {
         try {
@@ -156,11 +155,24 @@ private void setPaimonParams(TFileRangeDesc rangeDesc, PaimonSplit paimonSplit)
         tableFormatFileDesc.setTableFormatType(paimonSplit.getTableFormatType().value());
         TPaimonFileDesc fileDesc = new TPaimonFileDesc();
         org.apache.paimon.table.source.Split split = paimonSplit.getSplit();
+
+        String fileFormat = getFileFormat(paimonSplit.getPathString());
         if (split != null) {
             // use jni reader
+            rangeDesc.setFormatType(TFileFormatType.FORMAT_JNI);
             fileDesc.setPaimonSplit(encodeObjectToString(split));
+        } else {
+            // use native reader
+            if (fileFormat.equals("orc")) {
+                rangeDesc.setFormatType(TFileFormatType.FORMAT_ORC);
+            } else if (fileFormat.equals("parquet")) {
+                rangeDesc.setFormatType(TFileFormatType.FORMAT_PARQUET);
+            } else {
+                throw new RuntimeException("Unsupported file format: " + fileFormat);
+            }
         }
-        fileDesc.setFileFormat(getFileFormat(paimonSplit.getPathString()));
+
+        fileDesc.setFileFormat(fileFormat);
         fileDesc.setPaimonPredicate(encodeObjectToString(predicates));
         fileDesc.setPaimonColumnNames(source.getDesc().getSlots().stream().map(slot -> slot.getColumn().getName())
                 .collect(Collectors.joining(",")));
@@ -172,7 +184,8 @@ private void setPaimonParams(TFileRangeDesc rangeDesc, PaimonSplit paimonSplit)
         fileDesc.setTblId(source.getTargetTable().getId());
         fileDesc.setLastUpdateTime(source.getTargetTable().getUpdateTime());
         fileDesc.setPaimonTable(encodeObjectToString(source.getPaimonTable()));
-        // The hadoop conf should be same with PaimonExternalCatalog.createCatalog()#getConfiguration()
+        // The hadoop conf should be same with
+        // PaimonExternalCatalog.createCatalog()#getConfiguration()
         fileDesc.setHadoopConf(source.getCatalog().getCatalogProperty().getHadoopProperties());
         Optional optDeletionFile = paimonSplit.getDeletionFile();
         if (optDeletionFile.isPresent()) {
@@ -190,8 +203,8 @@ private void setPaimonParams(TFileRangeDesc rangeDesc, PaimonSplit paimonSplit)
     @Override
     public List getSplits() throws UserException {
         boolean forceJniScanner = sessionVariable.isForceJniScanner();
-        SessionVariable.IgnoreSplitType ignoreSplitType =
-                SessionVariable.IgnoreSplitType.valueOf(sessionVariable.getIgnoreSplitType());
+        SessionVariable.IgnoreSplitType ignoreSplitType = SessionVariable.IgnoreSplitType
+                .valueOf(sessionVariable.getIgnoreSplitType());
         List splits = new ArrayList<>();
         int[] projected = desc.getSlots().stream().mapToInt(
                 slot -> (source.getPaimonTable().rowType().getFieldNames().indexOf(slot.getColumn().getName())))
@@ -288,7 +301,8 @@ public List getSplits() throws UserException {
         }
         this.selectedPartitionNum = selectedPartitionValues.size();
         // TODO: get total partition number
-        // We should set fileSplitSize at the end because fileSplitSize may be modified in splitFile.
+        // We should set fileSplitSize at the end because fileSplitSize may be modified
+        // in splitFile.
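The Paimon path applies an analogous selection in setPaimonParams() above; as a rough sketch under the same caveats (simplified names, not the patch code), the range's format is driven by whether a JNI split object is present and, otherwise, by the data file's format string.

// Illustrative sketch of the format selection done in PaimonScanNode.setPaimonParams()
// (simplified; the real code inspects the Paimon split object and a TFileRangeDesc).
final class PaimonFormatChoiceSketch {
    enum Format { FORMAT_JNI, FORMAT_ORC, FORMAT_PARQUET }

    static Format chooseFormat(boolean hasJniSplit, String fileFormat) {
        if (hasJniSplit) {
            return Format.FORMAT_JNI;          // split must be read by the Paimon JNI scanner
        } else if ("orc".equals(fileFormat)) {
            return Format.FORMAT_ORC;          // raw ORC data file -> native ORC reader
        } else if ("parquet".equals(fileFormat)) {
            return Format.FORMAT_PARQUET;      // raw parquet data file -> native parquet reader
        }
        throw new IllegalArgumentException("Unsupported file format: " + fileFormat);
    }

    public static void main(String[] args) {
        System.out.println(chooseFormat(true, "parquet"));  // FORMAT_JNI
        System.out.println(chooseFormat(false, "orc"));     // FORMAT_ORC
        System.out.println(chooseFormat(false, "parquet")); // FORMAT_PARQUET
    }
}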
         splits.forEach(s -> s.setTargetSplitSize(fileSplitSize));
         return splits;
     }
 
@@ -318,8 +332,9 @@ public TFileFormatType getFileFormatType() throws DdlException, MetaNotFoundExce
     @Override
     public List getPathPartitionKeys() throws DdlException, MetaNotFoundException {
-        // return new ArrayList<>(source.getPaimonTable().partitionKeys());
-        //Paymon is not aware of partitions and bypasses some existing logic by returning an empty list
+        // return new ArrayList<>(source.getPaimonTable().partitionKeys());
+        // Paimon is not aware of partitions and bypasses some existing logic by
+        // returning an empty list
         return new ArrayList<>();
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
index 3a31381854c6c4..a6007e0020fc28 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
@@ -651,7 +651,7 @@ public PlanFragment visitPhysicalHudiScan(PhysicalHudiScan fileScan, PlanTransla
                         + " for Hudi table");
         PhysicalHudiScan hudiScan = (PhysicalHudiScan) fileScan;
         ScanNode scanNode = new HudiScanNode(context.nextPlanNodeId(), tupleDescriptor, false,
-                hudiScan.getScanParams(), hudiScan.getIncrementalRelation());
+                hudiScan.getScanParams(), hudiScan.getIncrementalRelation(), ConnectContext.get().getSessionVariable());
         if (fileScan.getTableSnapshot().isPresent()) {
             ((FileQueryScanNode) scanNode).setQueryTableSnapshot(fileScan.getTableSnapshot().get());
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
index d94ad0a2552240..df898c69ebcbfc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
@@ -1969,7 +1969,7 @@ private PlanNode createScanNode(Analyzer analyzer, TableRef tblRef, SelectStmt s
                             + "please set enable_nereids_planner = true to enable new optimizer");
                 }
                 scanNode = new HudiScanNode(ctx.getNextNodeId(), tblRef.getDesc(), true,
-                        Optional.empty(), Optional.empty());
+                        Optional.empty(), Optional.empty(), ConnectContext.get().getSessionVariable());
                 break;
             case ICEBERG:
                 scanNode = new IcebergScanNode(ctx.getNextNodeId(), tblRef.getDesc(), true);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 3ac50c9d7b53a1..35828c90f8aa0b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -633,6 +633,8 @@ public class SessionVariable implements Serializable, Writable {
     public static final String FORCE_JNI_SCANNER = "force_jni_scanner";
 
+    public static final String HUDI_JNI_SCANNER = "hudi_jni_scanner";
+
     public static final String ENABLE_COUNT_PUSH_DOWN_FOR_EXTERNAL_TABLE = "enable_count_push_down_for_external_table";
 
     public static final String SHOW_ALL_FE_CONNECTION = "show_all_fe_connection";
@@ -2072,6 +2074,10 @@ public void setEnableLeftZigZag(boolean enableLeftZigZag) {
             description = {"强制使用jni方式读取外表", "Force the use of jni mode to read external table"})
     private boolean forceJniScanner = false;
 
+    @VariableMgr.VarAttr(name = HUDI_JNI_SCANNER, description = { "使用哪种hudi jni scanner, 'hadoop' 或 'spark'",
'spark'", + "Which hudi jni scanner to use, 'hadoop' or 'spark'" }) + private String hudiJniScanner = "hadoop"; + @VariableMgr.VarAttr(name = ENABLE_COUNT_PUSH_DOWN_FOR_EXTERNAL_TABLE, description = {"对外表启用 count(*) 下推优化", "enable count(*) pushdown optimization for external table"}) private boolean enableCountPushDownForExternalTable = true; @@ -4482,6 +4488,10 @@ public boolean isForceJniScanner() { return forceJniScanner; } + public String getHudiJniScanner() { + return hudiJniScanner; + } + public String getIgnoreSplitType() { return ignoreSplitType; } @@ -4502,6 +4512,10 @@ public void setForceJniScanner(boolean force) { forceJniScanner = force; } + public void setHudiJniScanner(String hudiJniScanner) { + this.hudiJniScanner = hudiJniScanner; + } + public boolean isEnableCountPushDownForExternalTable() { return enableCountPushDownForExternalTable; } diff --git a/fe/pom.xml b/fe/pom.xml index d78cfd50b819b4..0d2e3f70aa6e6a 100644 --- a/fe/pom.xml +++ b/fe/pom.xml @@ -317,7 +317,7 @@ under the License. 1.11.4 17.0.0 - 0.14.1 + 0.15.0 2.7.4-11 3.0.0-8 @@ -371,7 +371,7 @@ under the License. 435 2.1.1 9.4 - 202 + 202 1.2.27 12.22.0 5.3.0 @@ -1649,7 +1649,7 @@ under the License. io.airlift concurrent - ${airlift.version} + ${airlift.concurrent.version} com.azure diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index 0bbd364fda1c2a..3a0a995ca459e2 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -353,7 +353,6 @@ struct TMaxComputeFileDesc { 1: optional string partition_spec // deprecated 2: optional string session_id 3: optional string table_batch_read_session - } struct THudiFileDesc { @@ -367,6 +366,7 @@ struct THudiFileDesc { 8: optional list column_names; 9: optional list column_types; 10: optional list nested_fields; + 11: optional string hudi_jni_scanner; } struct TLakeSoulFileDesc { @@ -406,6 +406,7 @@ enum TTextSerdeType { struct TFileScanRangeParams { // deprecated, move to TFileScanRange 1: optional Types.TFileType file_type; + // deprecated, move to TFileScanRange 2: optional TFileFormatType format_type; // deprecated, move to TFileScanRange 3: optional TFileCompressType compress_type; @@ -480,6 +481,7 @@ struct TFileRangeDesc { // for hive table, different files may have different fs, // so fs_name should be with TFileRangeDesc 12: optional string fs_name + 13: optional TFileFormatType format_type; } struct TSplitSource { diff --git a/regression-test/data/external_table_p2/hudi/test_hudi_incremental.out b/regression-test/data/external_table_p2/hudi/test_hudi_incremental.out index b1bdad85013bfc..50644f34961942 100644 --- a/regression-test/data/external_table_p2/hudi/test_hudi_incremental.out +++ b/regression-test/data/external_table_p2/hudi/test_hudi_incremental.out @@ -347,3 +347,177 @@ -- !incremental_9_10 -- 1000 +-- !incremental_1_end -- +9000 + +-- !incremental_earliest_1 -- +1000 + +-- !incremental_2_end -- +8000 + +-- !incremental_earliest_2 -- +2000 + +-- !incremental_1_2 -- +1000 + +-- !incremental_3_end -- +7000 + +-- !incremental_earliest_3 -- +3000 + +-- !incremental_2_3 -- +1000 + +-- !incremental_4_end -- +6000 + +-- !incremental_earliest_4 -- +4000 + +-- !incremental_3_4 -- +1000 + +-- !incremental_5_end -- +5000 + +-- !incremental_earliest_5 -- +5000 + +-- !incremental_4_5 -- +1000 + +-- !incremental_6_end -- +4000 + +-- !incremental_earliest_6 -- +6000 + +-- !incremental_5_6 -- +1000 + +-- !incremental_7_end -- +3000 + +-- !incremental_earliest_7 -- +7000 + +-- !incremental_6_7 -- +1000 + +-- 
!incremental_8_end -- +2000 + +-- !incremental_earliest_8 -- +8000 + +-- !incremental_7_8 -- +1000 + +-- !incremental_9_end -- +1000 + +-- !incremental_earliest_9 -- +9000 + +-- !incremental_8_9 -- +1000 + +-- !incremental_10_end -- +0 + +-- !incremental_earliest_10 -- +10000 + +-- !incremental_9_10 -- +1000 + +-- !incremental_1_end -- +9000 + +-- !incremental_earliest_1 -- +1000 + +-- !incremental_2_end -- +8000 + +-- !incremental_earliest_2 -- +2000 + +-- !incremental_1_2 -- +1000 + +-- !incremental_3_end -- +7000 + +-- !incremental_earliest_3 -- +3000 + +-- !incremental_2_3 -- +1000 + +-- !incremental_4_end -- +6000 + +-- !incremental_earliest_4 -- +4000 + +-- !incremental_3_4 -- +1000 + +-- !incremental_5_end -- +5000 + +-- !incremental_earliest_5 -- +5000 + +-- !incremental_4_5 -- +1000 + +-- !incremental_6_end -- +4000 + +-- !incremental_earliest_6 -- +6000 + +-- !incremental_5_6 -- +1000 + +-- !incremental_7_end -- +3000 + +-- !incremental_earliest_7 -- +7000 + +-- !incremental_6_7 -- +1000 + +-- !incremental_8_end -- +2000 + +-- !incremental_earliest_8 -- +8000 + +-- !incremental_7_8 -- +1000 + +-- !incremental_9_end -- +1000 + +-- !incremental_earliest_9 -- +9000 + +-- !incremental_8_9 -- +1000 + +-- !incremental_10_end -- +0 + +-- !incremental_earliest_10 -- +10000 + +-- !incremental_9_10 -- +1000 + diff --git a/regression-test/data/external_table_p2/hudi/test_hudi_schema_evolution.out b/regression-test/data/external_table_p2/hudi/test_hudi_schema_evolution.out index 12dd0cf086d3f0..da7273d4c14ef9 100644 --- a/regression-test/data/external_table_p2/hudi/test_hudi_schema_evolution.out +++ b/regression-test/data/external_table_p2/hudi/test_hudi_schema_evolution.out @@ -31,3 +31,35 @@ 20241118012149007 20241118012149007_0_4 5 185d101f-a484-45ce-b236-03ccd33c521b-0_0-208-622_20241118012149007.parquet 5 Eva {"age":31.5, "address":"Chengdu"} 20241118012149007 20241118012149007_0_5 6 185d101f-a484-45ce-b236-03ccd33c521b-0_0-208-622_20241118012149007.parquet 6 Frank {"age":29.2, "address":"Wuhan"} +-- !adding_simple_columns_table -- +20241118012126237 20241118012126237_0_1 1 5166112a-90d8-4ba8-8646-337fbeb2a375-0_0-35-121_20241118012132306.parquet 1 Alice \N +20241118012126237 20241118012126237_0_0 2 5166112a-90d8-4ba8-8646-337fbeb2a375-0_0-35-121_20241118012132306.parquet 2 Bob \N +20241118012126237 20241118012126237_0_2 3 5166112a-90d8-4ba8-8646-337fbeb2a375-0_0-35-121_20241118012132306.parquet 3 Cathy \N +20241118012132306 20241118012132306_0_3 4 5166112a-90d8-4ba8-8646-337fbeb2a375-0_0-35-121_20241118012132306.parquet 4 David 25 +20241118012132306 20241118012132306_0_4 5 5166112a-90d8-4ba8-8646-337fbeb2a375-0_0-35-121_20241118012132306.parquet 5 Eva 30 +20241118012132306 20241118012132306_0_5 6 5166112a-90d8-4ba8-8646-337fbeb2a375-0_0-35-121_20241118012132306.parquet 6 Frank 28 + +-- !altering_simple_columns_table -- +20241118012136512 20241118012136512_0_0 1 203f0f43-ae9d-4c17-8d5d-834f0dbc62c9-0_0-78-246_20241118012138287.parquet 1 Alice 25.0 +20241118012136512 20241118012136512_0_2 2 203f0f43-ae9d-4c17-8d5d-834f0dbc62c9-0_0-78-246_20241118012138287.parquet 2 Bob 30.0 +20241118012136512 20241118012136512_0_1 3 203f0f43-ae9d-4c17-8d5d-834f0dbc62c9-0_0-78-246_20241118012138287.parquet 3 Cathy 28.0 +20241118012138287 20241118012138287_0_3 4 203f0f43-ae9d-4c17-8d5d-834f0dbc62c9-0_0-78-246_20241118012138287.parquet 4 David 26.0 +20241118012138287 20241118012138287_0_4 5 203f0f43-ae9d-4c17-8d5d-834f0dbc62c9-0_0-78-246_20241118012138287.parquet 5 Eva 31.5 +20241118012138287 
20241118012138287_0_5 6 203f0f43-ae9d-4c17-8d5d-834f0dbc62c9-0_0-78-246_20241118012138287.parquet 6 Frank 29.2 + +-- !adding_complex_columns_table -- +20241118012144831 20241118012144831_0_1 1 3c038df9-a652-4878-9b8a-221ae443448e-0_0-165-497_20241118012146150.parquet 1 Alice {"age":25, "address":"Guangzhou", "email":null} +20241118012144831 20241118012144831_0_0 2 3c038df9-a652-4878-9b8a-221ae443448e-0_0-165-497_20241118012146150.parquet 2 Bob {"age":30, "address":"Shanghai", "email":null} +20241118012144831 20241118012144831_0_2 3 3c038df9-a652-4878-9b8a-221ae443448e-0_0-165-497_20241118012146150.parquet 3 Cathy {"age":28, "address":"Beijing", "email":null} +20241118012146150 20241118012146150_0_3 4 3c038df9-a652-4878-9b8a-221ae443448e-0_0-165-497_20241118012146150.parquet 4 David {"age":25, "address":"Shenzhen", "email":"david@example.com"} +20241118012146150 20241118012146150_0_4 5 3c038df9-a652-4878-9b8a-221ae443448e-0_0-165-497_20241118012146150.parquet 5 Eva {"age":30, "address":"Chengdu", "email":"eva@example.com"} +20241118012146150 20241118012146150_0_5 6 3c038df9-a652-4878-9b8a-221ae443448e-0_0-165-497_20241118012146150.parquet 6 Frank {"age":28, "address":"Wuhan", "email":"frank@example.com"} + +-- !altering_complex_columns_table -- +20241118012147879 20241118012147879_0_0 1 185d101f-a484-45ce-b236-03ccd33c521b-0_0-208-622_20241118012149007.parquet 1 Alice {"age":25, "address":"Guangzhou"} +20241118012147879 20241118012147879_0_2 2 185d101f-a484-45ce-b236-03ccd33c521b-0_0-208-622_20241118012149007.parquet 2 Bob {"age":30, "address":"Shanghai"} +20241118012147879 20241118012147879_0_1 3 185d101f-a484-45ce-b236-03ccd33c521b-0_0-208-622_20241118012149007.parquet 3 Cathy {"age":28, "address":"Beijing"} +20241118012149007 20241118012149007_0_3 4 185d101f-a484-45ce-b236-03ccd33c521b-0_0-208-622_20241118012149007.parquet 4 David {"age":26, "address":"Shenzhen"} +20241118012149007 20241118012149007_0_4 5 185d101f-a484-45ce-b236-03ccd33c521b-0_0-208-622_20241118012149007.parquet 5 Eva {"age":31.5, "address":"Chengdu"} +20241118012149007 20241118012149007_0_5 6 185d101f-a484-45ce-b236-03ccd33c521b-0_0-208-622_20241118012149007.parquet 6 Frank {"age":29.2, "address":"Wuhan"} + diff --git a/regression-test/data/external_table_p2/hudi/test_hudi_snapshot.out b/regression-test/data/external_table_p2/hudi/test_hudi_snapshot.out index efad67ffbfa8c4..1e151c2a86fa20 100644 Binary files a/regression-test/data/external_table_p2/hudi/test_hudi_snapshot.out and b/regression-test/data/external_table_p2/hudi/test_hudi_snapshot.out differ diff --git a/regression-test/data/external_table_p2/hudi/test_hudi_timetravel.out b/regression-test/data/external_table_p2/hudi/test_hudi_timetravel.out index a9b5d23595a8e3..00d15805baf04e 100644 --- a/regression-test/data/external_table_p2/hudi/test_hudi_timetravel.out +++ b/regression-test/data/external_table_p2/hudi/test_hudi_timetravel.out @@ -119,3 +119,123 @@ -- !timetravel10 -- 10000 +-- !timetravel1 -- +1000 + +-- !timetravel2 -- +2000 + +-- !timetravel3 -- +3000 + +-- !timetravel4 -- +4000 + +-- !timetravel5 -- +5000 + +-- !timetravel6 -- +6000 + +-- !timetravel7 -- +7000 + +-- !timetravel8 -- +8000 + +-- !timetravel9 -- +9000 + +-- !timetravel10 -- +10000 + +-- !timetravel1 -- +1000 + +-- !timetravel2 -- +2000 + +-- !timetravel3 -- +3000 + +-- !timetravel4 -- +4000 + +-- !timetravel5 -- +5000 + +-- !timetravel6 -- +6000 + +-- !timetravel7 -- +7000 + +-- !timetravel8 -- +8000 + +-- !timetravel9 -- +9000 + +-- !timetravel10 -- +10000 + +-- !timetravel1 -- 
+1000 + +-- !timetravel2 -- +2000 + +-- !timetravel3 -- +3000 + +-- !timetravel4 -- +4000 + +-- !timetravel5 -- +5000 + +-- !timetravel6 -- +6000 + +-- !timetravel7 -- +7000 + +-- !timetravel8 -- +8000 + +-- !timetravel9 -- +9000 + +-- !timetravel10 -- +10000 + +-- !timetravel1 -- +1000 + +-- !timetravel2 -- +2000 + +-- !timetravel3 -- +3000 + +-- !timetravel4 -- +4000 + +-- !timetravel5 -- +5000 + +-- !timetravel6 -- +6000 + +-- !timetravel7 -- +7000 + +-- !timetravel8 -- +8000 + +-- !timetravel9 -- +9000 + +-- !timetravel10 -- +10000 + diff --git a/regression-test/suites/external_table_p2/hudi/test_hudi_catalog.groovy b/regression-test/suites/external_table_p2/hudi/test_hudi_catalog.groovy index f2082ef89c7a50..149eecf5817bd4 100644 --- a/regression-test/suites/external_table_p2/hudi/test_hudi_catalog.groovy +++ b/regression-test/suites/external_table_p2/hudi/test_hudi_catalog.groovy @@ -36,4 +36,4 @@ suite("test_hudi_catalog", "p2,external,hudi,external_remote,external_remote_hud def tables = sql """ show tables; """ assertTrue(tables.size() > 0) sql """drop catalog if exists ${catalog_name};""" -} \ No newline at end of file +} diff --git a/regression-test/suites/external_table_p2/hudi/test_hudi_incremental.groovy b/regression-test/suites/external_table_p2/hudi/test_hudi_incremental.groovy index 8cc1d2a852b8c4..885903646cc5b5 100644 --- a/regression-test/suites/external_table_p2/hudi/test_hudi_incremental.groovy +++ b/regression-test/suites/external_table_p2/hudi/test_hudi_incremental.groovy @@ -60,7 +60,6 @@ suite("test_hudi_incremental", "p2,external,hudi,external_remote,external_remote "20241114152009764", "20241114152011901", ] - test_hudi_incremental_querys("user_activity_log_cow_non_partition", timestamps_cow_non_partition) // spark-sql "select distinct _hoodie_commit_time from user_activity_log_cow_partition order by _hoodie_commit_time;" def timestamps_cow_partition = [ @@ -75,7 +74,6 @@ suite("test_hudi_incremental", "p2,external,hudi,external_remote,external_remote "20241114152147114", "20241114152156417", ] - test_hudi_incremental_querys("user_activity_log_cow_partition", timestamps_cow_partition) // spark-sql "select distinct _hoodie_commit_time from user_activity_log_mor_non_partition order by _hoodie_commit_time;" def timestamps_mor_non_partition = [ @@ -90,7 +88,6 @@ suite("test_hudi_incremental", "p2,external,hudi,external_remote,external_remote "20241114152028770", "20241114152030746", ] - test_hudi_incremental_querys("user_activity_log_mor_non_partition", timestamps_mor_non_partition) // spark-sql "select distinct _hoodie_commit_time from user_activity_log_mor_partition order by _hoodie_commit_time;" def timestamps_mor_partition = [ @@ -105,7 +102,18 @@ suite("test_hudi_incremental", "p2,external,hudi,external_remote,external_remote "20241114152323587", "20241114152334111", ] + + test_hudi_incremental_querys("user_activity_log_cow_non_partition", timestamps_cow_non_partition) + test_hudi_incremental_querys("user_activity_log_cow_partition", timestamps_cow_partition) + test_hudi_incremental_querys("user_activity_log_mor_non_partition", timestamps_mor_non_partition) + test_hudi_incremental_querys("user_activity_log_mor_partition", timestamps_mor_partition) + sql """set force_jni_scanner=true;""" + // don't support incremental query for cow table by jni reader + // test_hudi_incremental_querys("user_activity_log_cow_non_partition", timestamps_cow_non_partition) + // test_hudi_incremental_querys("user_activity_log_cow_partition", timestamps_cow_partition) + 
test_hudi_incremental_querys("user_activity_log_mor_non_partition", timestamps_mor_non_partition)
     test_hudi_incremental_querys("user_activity_log_mor_partition", timestamps_mor_partition)
+    // sql """set force_jni_scanner=false;"""
 
     sql """drop catalog if exists ${catalog_name};"""
-}
\ No newline at end of file
+}
diff --git a/regression-test/suites/external_table_p2/hudi/test_hudi_schema_evolution.groovy b/regression-test/suites/external_table_p2/hudi/test_hudi_schema_evolution.groovy
index b247aaf492400d..0da88447cdef15 100644
--- a/regression-test/suites/external_table_p2/hudi/test_hudi_schema_evolution.groovy
+++ b/regression-test/suites/external_table_p2/hudi/test_hudi_schema_evolution.groovy
@@ -33,7 +33,18 @@ suite("test_hudi_schema_evolution", "p2,external,hudi,external_remote,external_r
     sql """ switch ${catalog_name};"""
     sql """ use regression_hudi;"""
     sql """ set enable_fallback_to_original_planner=false """
+
+    qt_adding_simple_columns_table """ select * from adding_simple_columns_table order by id """
+    qt_altering_simple_columns_table """ select * from altering_simple_columns_table order by id """
+    // qt_deleting_simple_columns_table """ select * from deleting_simple_columns_table order by id """
+    // qt_renaming_simple_columns_table """ select * from renaming_simple_columns_table order by id """
+    qt_adding_complex_columns_table """ select * from adding_complex_columns_table order by id """
+    qt_altering_complex_columns_table """ select * from altering_complex_columns_table order by id """
+    // qt_deleting_complex_columns_table """ select * from deleting_complex_columns_table order by id """
+    // qt_renaming_complex_columns_table """ select * from renaming_complex_columns_table order by id """
+
+    sql """set force_jni_scanner = true;"""
     qt_adding_simple_columns_table """ select * from adding_simple_columns_table order by id """
     qt_altering_simple_columns_table """ select * from altering_simple_columns_table order by id """
     // qt_deleting_simple_columns_table """ select * from deleting_simple_columns_table order by id """
     // qt_renaming_simple_columns_table """ select * from renaming_simple_columns_table order by id """
@@ -43,6 +54,7 @@ suite("test_hudi_schema_evolution", "p2,external,hudi,external_remote,external_r
     qt_altering_complex_columns_table """ select * from altering_complex_columns_table order by id """
     // qt_deleting_complex_columns_table """ select * from deleting_complex_columns_table order by id """
     // qt_renaming_complex_columns_table """ select * from renaming_complex_columns_table order by id """
+    sql """set force_jni_scanner = false;"""
 
     sql """drop catalog if exists ${catalog_name};"""
-}
\ No newline at end of file
+}
diff --git a/regression-test/suites/external_table_p2/hudi/test_hudi_snapshot.groovy b/regression-test/suites/external_table_p2/hudi/test_hudi_snapshot.groovy
index 53c09e6d5a9031..89d89709b3c822 100644
--- a/regression-test/suites/external_table_p2/hudi/test_hudi_snapshot.groovy
+++ b/regression-test/suites/external_table_p2/hudi/test_hudi_snapshot.groovy
@@ -64,7 +64,7 @@ suite("test_hudi_snapshot", "p2,external,hudi,external_remote,external_remote_hu
     qt_q09 """SELECT * FROM ${table_name} WHERE struct_element(struct_element(address, 'coordinates'), 'latitude') BETWEEN 0 AND 100 AND struct_element(struct_element(address, 'coordinates'), 'longitude') BETWEEN 0 AND 100 ORDER BY event_time LIMIT 5;"""
 
     // Query records with ratings above a specific value and limit output
-    qt_q10 """SELECT * FROM ${table_name} WHERE rating > 4.5 ORDER BY rating DESC LIMIT 5;"""
+    qt_q10 """SELECT * FROM ${table_name} WHERE rating > 4.5 ORDER BY event_time DESC LIMIT 5;"""
 
     // Query all users' signup dates and limit output
     qt_q11 """SELECT user_id, signup_date FROM ${table_name} ORDER BY signup_date DESC LIMIT 10;"""
@@ -79,13 +79,20 @@ suite("test_hudi_snapshot", "p2,external,hudi,external_remote,external_remote_hu
     qt_q14 """SELECT * FROM ${table_name} WHERE signup_date = '2024-01-15' ORDER BY user_id LIMIT 5;"""
 
     // Query the total count of purchases for each user and limit output
-    qt_q15 """SELECT user_id, array_size(purchases) AS purchase_count FROM ${table_name} ORDER BY purchase_count DESC LIMIT 5;"""
+    qt_q15 """SELECT user_id, array_size(purchases) AS purchase_count FROM ${table_name} ORDER BY user_id LIMIT 5;"""
 }
 
+    test_hudi_snapshot_querys("user_activity_log_mor_non_partition")
+    test_hudi_snapshot_querys("user_activity_log_mor_partition")
     test_hudi_snapshot_querys("user_activity_log_cow_non_partition")
     test_hudi_snapshot_querys("user_activity_log_cow_partition")
+
+    sql """set force_jni_scanner=true;"""
     test_hudi_snapshot_querys("user_activity_log_mor_non_partition")
     test_hudi_snapshot_querys("user_activity_log_mor_partition")
+    test_hudi_snapshot_querys("user_activity_log_cow_non_partition")
+    test_hudi_snapshot_querys("user_activity_log_cow_partition")
+    sql """set force_jni_scanner=false;"""
 
     sql """drop catalog if exists ${catalog_name};"""
-}
\ No newline at end of file
+}
diff --git a/regression-test/suites/external_table_p2/hudi/test_hudi_timestamp.groovy b/regression-test/suites/external_table_p2/hudi/test_hudi_timestamp.groovy
index c1ba630e4a7d01..36309322558f52 100644
--- a/regression-test/suites/external_table_p2/hudi/test_hudi_timestamp.groovy
+++ b/regression-test/suites/external_table_p2/hudi/test_hudi_timestamp.groovy
@@ -59,4 +59,4 @@ suite("test_hudi_timestamp", "p2,external,hudi,external_remote,external_remote_h
 // INSERT OVERWRITE hudi_table_with_timestamp VALUES
 // ('1', 'Alice', timestamp('2024-10-25 08:00:00')),
 // ('2', 'Bob', timestamp('2024-10-25 09:30:00')),
-// ('3', 'Charlie', timestamp('2024-10-25 11:00:00'));
\ No newline at end of file
+// ('3', 'Charlie', timestamp('2024-10-25 11:00:00'));
diff --git a/regression-test/suites/external_table_p2/hudi/test_hudi_timetravel.groovy b/regression-test/suites/external_table_p2/hudi/test_hudi_timetravel.groovy
index 4d458dc4381dcf..cceeaa412202c6 100644
--- a/regression-test/suites/external_table_p2/hudi/test_hudi_timetravel.groovy
+++ b/regression-test/suites/external_table_p2/hudi/test_hudi_timetravel.groovy
@@ -54,7 +54,6 @@ suite("test_hudi_timetravel", "p2,external,hudi,external_remote,external_remote_
         "20241114152009764",
         "20241114152011901",
     ]
-    test_hudi_timetravel_querys("user_activity_log_cow_non_partition", timestamps_cow_non_partition)
 
     // spark-sql "select distinct _hoodie_commit_time from user_activity_log_cow_partition order by _hoodie_commit_time;"
     def timestamps_cow_partition = [
@@ -69,7 +68,6 @@ suite("test_hudi_timetravel", "p2,external,hudi,external_remote,external_remote_
         "20241114152147114",
         "20241114152156417",
     ]
-    test_hudi_timetravel_querys("user_activity_log_cow_partition", timestamps_cow_partition)
 
     // spark-sql "select distinct _hoodie_commit_time from user_activity_log_mor_non_partition order by _hoodie_commit_time;"
     def timestamps_mor_non_partition = [
@@ -84,7 +82,6 @@ suite("test_hudi_timetravel", "p2,external,hudi,external_remote,external_remote_
         "20241114152028770",
         "20241114152030746",
     ]
-    test_hudi_timetravel_querys("user_activity_log_mor_non_partition", timestamps_mor_non_partition)
 
     // spark-sql "select distinct _hoodie_commit_time from user_activity_log_mor_partition order by _hoodie_commit_time;"
     def timestamps_mor_partition = [
@@ -99,7 +96,17 @@ suite("test_hudi_timetravel", "p2,external,hudi,external_remote,external_remote_
         "20241114152323587",
         "20241114152334111",
     ]
+
+    test_hudi_timetravel_querys("user_activity_log_cow_non_partition", timestamps_cow_non_partition)
+    test_hudi_timetravel_querys("user_activity_log_cow_partition", timestamps_cow_partition)
+    test_hudi_timetravel_querys("user_activity_log_mor_non_partition", timestamps_mor_non_partition)
+    test_hudi_timetravel_querys("user_activity_log_mor_partition", timestamps_mor_partition)
+    sql """set force_jni_scanner=true;"""
+    test_hudi_timetravel_querys("user_activity_log_cow_non_partition", timestamps_cow_non_partition)
+    test_hudi_timetravel_querys("user_activity_log_cow_partition", timestamps_cow_partition)
+    test_hudi_timetravel_querys("user_activity_log_mor_non_partition", timestamps_mor_non_partition)
     test_hudi_timetravel_querys("user_activity_log_mor_partition", timestamps_mor_partition)
+    sql """set force_jni_scanner=false;"""
 
     sql """drop catalog if exists ${catalog_name};"""
-}
\ No newline at end of file
+}
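As a closing usage note, the new session variables can be exercised like any other Doris variable over the MySQL protocol; the following is a hypothetical client-side sketch (host, port, credentials, and table names are placeholders, not values taken from this patch).

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

// Hypothetical sketch: the Doris FE speaks the MySQL protocol, so force_jni_scanner and
// the hudi_jni_scanner variable added above can be toggled per session from any MySQL client.
public final class HudiScannerSessionSketch {
    public static void main(String[] args) throws Exception {
        try (Connection conn = DriverManager.getConnection("jdbc:mysql://127.0.0.1:9030/", "root", "");
             Statement stmt = conn.createStatement()) {
            // Route external scans through JNI and pick the Hadoop-based Hudi scanner
            // ("hadoop" is the default added by this patch; "spark" selects the original HudiJniScanner).
            stmt.execute("SET force_jni_scanner = true");
            stmt.execute("SET hudi_jni_scanner = 'hadoop'");
            try (ResultSet rs = stmt.executeQuery("SELECT COUNT(*) FROM hudi_catalog.db.tbl")) {
                while (rs.next()) {
                    System.out.println(rs.getLong(1));
                }
            }
            // Restore the default behaviour for the rest of the session.
            stmt.execute("SET force_jni_scanner = false");
        }
    }
}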