From e529894653d04f06d9c809ff8bb29a7c01b6bd3f Mon Sep 17 00:00:00 2001 From: daidai Date: Fri, 6 Dec 2024 10:33:19 +0800 Subject: [PATCH] [enchement](mc)mc catalog append netowrk config (#44194) ### What problem does this PR solve? Since there may be some network problems when accessing maxcompute, in order to provide a better user experience, we introduce some parameters to help users better adapt to the network. You can add these parameters when creating a catalog: `mc.connect_timeout` : Connection timeout, default is 10s. `mc.read_timeout` : Read timeout,default is 120s. `mc.retry_count` : Number of retries after failure,default is 4. ### Release note MaxCompute catalog add three parameters : `mc.connect_timeout` , `mc.read_timeout`,`mc.retry_count` help users better adapt to the network between MaxCompute and Doris. --- .../format/table/max_compute_jni_reader.cpp | 6 +- .../maxcompute/MaxComputeJniScanner.java | 47 ++++++++++++--- .../maxcompute/MaxComputeExternalCatalog.java | 57 +++++++++++++++++++ .../maxcompute/source/MaxComputeScanNode.java | 13 +++++ .../property/constants/MCProperties.java | 8 +++ gensrc/thrift/PlanNodes.thrift | 4 ++ .../test_external_catalog_maxcompute.groovy | 5 +- 7 files changed, 131 insertions(+), 9 deletions(-) diff --git a/be/src/vec/exec/format/table/max_compute_jni_reader.cpp b/be/src/vec/exec/format/table/max_compute_jni_reader.cpp index fe014904060706..c69da28258e795 100644 --- a/be/src/vec/exec/format/table/max_compute_jni_reader.cpp +++ b/be/src/vec/exec/format/table/max_compute_jni_reader.cpp @@ -77,7 +77,11 @@ MaxComputeJniReader::MaxComputeJniReader(const MaxComputeTableDescriptor* mc_des {"start_offset", std::to_string(_range.start_offset)}, {"split_size", std::to_string(_range.size)}, {"required_fields", required_fields.str()}, - {"columns_types", columns_types.str()}}; + {"columns_types", columns_types.str()}, + + {"connect_timeout", std::to_string(_max_compute_params.connect_timeout)}, + {"read_timeout", std::to_string(_max_compute_params.read_timeout)}, + {"retry_count", std::to_string(_max_compute_params.retry_times)}}; _jni_connector = std::make_unique( "org/apache/doris/maxcompute/MaxComputeJniScanner", params, column_names); } diff --git a/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeJniScanner.java b/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeJniScanner.java index 6cbed70adc7d46..d6325bdae4673a 100644 --- a/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeJniScanner.java +++ b/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeJniScanner.java @@ -25,6 +25,7 @@ import com.aliyun.odps.account.AliyunAccount; import com.aliyun.odps.table.configuration.CompressionCodec; import com.aliyun.odps.table.configuration.ReaderOptions; +import com.aliyun.odps.table.configuration.RestOptions; import com.aliyun.odps.table.enviroment.Credentials; import com.aliyun.odps.table.enviroment.EnvironmentSettings; import com.aliyun.odps.table.read.SplitReader; @@ -67,6 +68,10 @@ public class MaxComputeJniScanner extends JniScanner { private static final String SCAN_SERIALIZER = "scan_serializer"; private static final String TIME_ZONE = "time_zone"; + private static final String CONNECT_TIMEOUT = "connect_timeout"; + private static final String READ_TIMEOUT = "read_timeout"; + private static final String RETRY_COUNT = "retry_count"; + private enum SplitType { BYTE_SIZE, ROW_OFFSET @@ -136,16 +141,40 @@ public MaxComputeJniScanner(int batchSize, Map params) { Credentials credentials = Credentials.newBuilder().withAccount(odps.getAccount()) .withAppAccount(odps.getAppAccount()).build(); + + int connectTimeout = 10; // 10s + if (!Strings.isNullOrEmpty(params.get(CONNECT_TIMEOUT))) { + connectTimeout = Integer.parseInt(params.get(CONNECT_TIMEOUT)); + } + + int readTimeout = 120; // 120s + if (!Strings.isNullOrEmpty(params.get(READ_TIMEOUT))) { + readTimeout = Integer.parseInt(params.get(READ_TIMEOUT)); + } + + int retryTimes = 4; // 4 times + if (!Strings.isNullOrEmpty(params.get(RETRY_COUNT))) { + retryTimes = Integer.parseInt(params.get(RETRY_COUNT)); + } + + RestOptions restOptions = RestOptions.newBuilder() + .withConnectTimeout(connectTimeout) + .withReadTimeout(readTimeout) + .withRetryTimes(retryTimes).build(); + settings = EnvironmentSettings.newBuilder() .withCredentials(credentials) .withServiceEndpoint(odps.getEndpoint()) .withQuotaName(quota) + .withRestOptions(restOptions) .build(); try { scan = (TableBatchReadSession) deserialize(scanSerializer); } catch (Exception e) { - LOG.info("deserialize TableBatchReadSession failed.", e); + String errorMsg = "Failed to deserialize table batch read session."; + LOG.warn(errorMsg, e); + throw new IllegalArgumentException(errorMsg, e); } } @@ -176,11 +205,11 @@ public void open() throws IOException { .withReuseBatch(true) .build()); - } catch (IOException e) { - LOG.info("createArrowReader failed.", e); } catch (Exception e) { + String errorMsg = "MaxComputeJniScanner Failed to open table batch read session."; + LOG.warn(errorMsg, e); close(); - throw new IOException(e.getMessage(), e); + throw new IOException(errorMsg, e); } } @@ -215,8 +244,9 @@ private int readVectors(int expectedRows) throws IOException { break; } } catch (Exception e) { - LOG.info("currentSplitReader hasNext fail", e); - break; + String errorMsg = "MaxComputeJniScanner readVectors hasNext fail"; + LOG.warn(errorMsg, e); + throw new IOException(e.getMessage(), e); } try { @@ -241,7 +271,10 @@ private int readVectors(int expectedRows) throws IOException { } curReadRows += batchRows; } catch (Exception e) { - throw new RuntimeException("Fail to read arrow data, reason: " + e.getMessage(), e); + String errorMsg = String.format("MaxComputeJniScanner Fail to read arrow data. " + + "curReadRows = {}, expectedRows = {}", curReadRows, expectedRows); + LOG.warn(errorMsg, e); + throw new RuntimeException(errorMsg, e); } } return curReadRows; diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalCatalog.java index e6cd77103dbc3b..06c1e55dcf6f4b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalCatalog.java @@ -33,6 +33,7 @@ import com.aliyun.odps.account.Account; import com.aliyun.odps.account.AliyunAccount; import com.aliyun.odps.security.SecurityManager; +import com.aliyun.odps.table.configuration.RestOptions; import com.aliyun.odps.table.configuration.SplitOptions; import com.aliyun.odps.table.enviroment.Credentials; import com.aliyun.odps.table.enviroment.EnvironmentSettings; @@ -71,6 +72,10 @@ public class MaxComputeExternalCatalog extends ExternalCatalog { private long splitRowCount; private long splitByteSize; + private int connectTimeout; + private int readTimeout; + private int retryTimes; + private static final Map REGION_ZONE_MAP; private static final List REQUIRED_PROPERTIES = ImmutableList.of( MCProperties.PROJECT, @@ -178,6 +183,17 @@ protected void initLocalObjectsImpl() { .build(); } + connectTimeout = Integer.parseInt( + props.getOrDefault(MCProperties.CONNECT_TIMEOUT, MCProperties.DEFAULT_CONNECT_TIMEOUT)); + readTimeout = Integer.parseInt( + props.getOrDefault(MCProperties.READ_TIMEOUT, MCProperties.DEFAULT_READ_TIMEOUT)); + retryTimes = Integer.parseInt( + props.getOrDefault(MCProperties.RETRY_COUNT, MCProperties.DEFAULT_RETRY_COUNT)); + + RestOptions restOptions = RestOptions.newBuilder() + .withConnectTimeout(connectTimeout) + .withReadTimeout(readTimeout) + .withRetryTimes(retryTimes).build(); CloudCredential credential = MCProperties.getCredential(props); accessKey = credential.getAccessKey(); @@ -196,6 +212,7 @@ protected void initLocalObjectsImpl() { .withCredentials(credentials) .withServiceEndpoint(odps.getEndpoint()) .withQuotaName(quota) + .withRestOptions(restOptions) .build(); } @@ -304,6 +321,21 @@ public String getDefaultProject() { return defaultProject; } + public int getRetryTimes() { + makeSureInitialized(); + return retryTimes; + } + + public int getConnectTimeout() { + makeSureInitialized(); + return connectTimeout; + } + + public int getReadTimeout() { + makeSureInitialized(); + return readTimeout; + } + public ZoneId getProjectDateTimeZone() { makeSureInitialized(); @@ -385,6 +417,31 @@ public void checkProperties() throws DdlException { + MCProperties.SPLIT_ROW_COUNT + "must be an integer"); } + + try { + connectTimeout = Integer.parseInt( + props.getOrDefault(MCProperties.CONNECT_TIMEOUT, MCProperties.DEFAULT_CONNECT_TIMEOUT)); + readTimeout = Integer.parseInt( + props.getOrDefault(MCProperties.READ_TIMEOUT, MCProperties.DEFAULT_READ_TIMEOUT)); + retryTimes = Integer.parseInt( + props.getOrDefault(MCProperties.RETRY_COUNT, MCProperties.DEFAULT_RETRY_COUNT)); + if (connectTimeout <= 0) { + throw new DdlException(MCProperties.CONNECT_TIMEOUT + " must be greater than 0"); + } + + if (readTimeout <= 0) { + throw new DdlException(MCProperties.READ_TIMEOUT + " must be greater than 0"); + } + + if (retryTimes <= 0) { + throw new DdlException(MCProperties.RETRY_COUNT + " must be greater than 0"); + } + + } catch (NumberFormatException e) { + throw new DdlException("property " + MCProperties.CONNECT_TIMEOUT + "/" + + MCProperties.READ_TIMEOUT + "/" + MCProperties.RETRY_COUNT + "must be an integer"); + } + CloudCredential credential = MCProperties.getCredential(props); if (!credential.isWhole()) { throw new DdlException("Max-Compute credential properties '" diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/source/MaxComputeScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/source/MaxComputeScanNode.java index e177e9d8b7c88c..4ad971a5c64789 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/source/MaxComputeScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/source/MaxComputeScanNode.java @@ -89,6 +89,10 @@ public class MaxComputeScanNode extends FileQueryScanNode { private static final LocationPath ROW_OFFSET_PATH = new LocationPath("/row_offset", Maps.newHashMap()); private static final LocationPath BYTE_SIZE_PATH = new LocationPath("/byte_size", Maps.newHashMap()); + private int connectTimeout; + private int readTimeout; + private int retryTimes; + @Setter private SelectedPartitions selectedPartitions = null; @@ -127,6 +131,11 @@ private void setScanParams(TFileRangeDesc rangeDesc, MaxComputeSplit maxComputeS fileDesc.setPartitionSpec("deprecated"); fileDesc.setTableBatchReadSession(maxComputeSplit.scanSerialize); fileDesc.setSessionId(maxComputeSplit.getSessionId()); + + fileDesc.setReadTimeout(readTimeout); + fileDesc.setConnectTimeout(connectTimeout); + fileDesc.setRetryTimes(retryTimes); + tableFormatFileDesc.setMaxComputeParams(fileDesc); rangeDesc.setTableFormatParams(tableFormatFileDesc); rangeDesc.setPath("[ " + maxComputeSplit.getStart() + " , " + maxComputeSplit.getLength() + " ]"); @@ -477,6 +486,10 @@ public List getSplits() throws UserException { MaxComputeExternalCatalog mcCatalog = (MaxComputeExternalCatalog) table.getCatalog(); + readTimeout = mcCatalog.getReadTimeout(); + connectTimeout = mcCatalog.getConnectTimeout(); + retryTimes = mcCatalog.getRetryTimes(); + if (mcCatalog.getSplitStrategy().equals(MCProperties.SPLIT_BY_BYTE_SIZE_STRATEGY)) { for (com.aliyun.odps.table.read.split.InputSplit split : assigner.getAllSplits()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/MCProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/MCProperties.java index 20a77574fc7820..efbd01c14777de 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/MCProperties.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/MCProperties.java @@ -56,6 +56,14 @@ public class MCProperties extends BaseProperties { public static final String SPLIT_ROW_COUNT = "mc.split_row_count"; public static final String DEFAULT_SPLIT_ROW_COUNT = "1048576"; // 256 * 4096 + public static final String CONNECT_TIMEOUT = "mc.connect_timeout"; + public static final String READ_TIMEOUT = "mc.read_timeout"; + public static final String RETRY_COUNT = "mc.retry_count"; + + public static final String DEFAULT_CONNECT_TIMEOUT = "10"; // 10s + public static final String DEFAULT_READ_TIMEOUT = "120"; // 120s + public static final String DEFAULT_RETRY_COUNT = "4"; // 4 times + public static CloudCredential getCredential(Map props) { return getCloudCredential(props, ACCESS_KEY, SECRET_KEY, SESSION_TOKEN); } diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index a7f6d9ede24c20..1b873787765b7d 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -337,6 +337,10 @@ struct TMaxComputeFileDesc { 1: optional string partition_spec // deprecated 2: optional string session_id 3: optional string table_batch_read_session + // for mc network configuration + 4: optional i32 connect_timeout + 5: optional i32 read_timeout + 6: optional i32 retry_times } struct THudiFileDesc { diff --git a/regression-test/suites/external_table_p2/maxcompute/test_external_catalog_maxcompute.groovy b/regression-test/suites/external_table_p2/maxcompute/test_external_catalog_maxcompute.groovy index 3f0929b59ea96a..81133270fb6e60 100644 --- a/regression-test/suites/external_table_p2/maxcompute/test_external_catalog_maxcompute.groovy +++ b/regression-test/suites/external_table_p2/maxcompute/test_external_catalog_maxcompute.groovy @@ -388,7 +388,10 @@ suite("test_external_catalog_maxcompute", "p2,external,maxcompute,external_remot order_qt_multi_partition_q6 """ select max(pt), yy, mm from multi_partitions where yy = '2023' and mm='08' group by yy, mm order by yy, mm; """ order_qt_multi_partition_q7 """ select count(*) from multi_partitions where yy < '2023' or dd < '03'; """ order_qt_multi_partition_q8 """ select count(*) from multi_partitions where pt>=3; """ - order_qt_multi_partition_q9 """ select city,mnt,gender,finished_time,order_rate,cut_date,create_time,pt, yy, mm, dd from multi_partitions where pt >= 2 and pt < 4 and finished_time is not null; """ + + //`finished_time is not null` => com.aliyun.odps.OdpsException: ODPS-0010000:System internal error - fuxi job failed, caused by: timestamp_ntz + // order_qt_multi_partition_q9 """ select city,mnt,gender,finished_time,order_rate,cut_date,create_time,pt, yy, mm, dd from multi_partitions where pt >= 2 and pt < 4 and finished_time is not null; """ + order_qt_multi_partition_q10 """ select pt, yy, mm, dd from multi_partitions where pt >= 2 and create_time > '2023-08-03 03:11:00' order by pt, yy, mm, dd; """