Skip to content

Commit

Permalink
[enchement](mc)mc catalog append netowrk config (#44194) (#45149)
Browse files Browse the repository at this point in the history
  • Loading branch information
hubgeter authored Dec 7, 2024
1 parent 2543af8 commit b332217
Show file tree
Hide file tree
Showing 7 changed files with 131 additions and 9 deletions.
6 changes: 5 additions & 1 deletion be/src/vec/exec/format/table/max_compute_jni_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,11 @@ MaxComputeJniReader::MaxComputeJniReader(const MaxComputeTableDescriptor* mc_des
{"start_offset", std::to_string(_range.start_offset)},
{"split_size", std::to_string(_range.size)},
{"required_fields", required_fields.str()},
{"columns_types", columns_types.str()}};
{"columns_types", columns_types.str()},

{"connect_timeout", std::to_string(_max_compute_params.connect_timeout)},
{"read_timeout", std::to_string(_max_compute_params.read_timeout)},
{"retry_count", std::to_string(_max_compute_params.retry_times)}};
_jni_connector = std::make_unique<JniConnector>(
"org/apache/doris/maxcompute/MaxComputeJniScanner", params, column_names);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.aliyun.odps.account.AliyunAccount;
import com.aliyun.odps.table.configuration.CompressionCodec;
import com.aliyun.odps.table.configuration.ReaderOptions;
import com.aliyun.odps.table.configuration.RestOptions;
import com.aliyun.odps.table.enviroment.Credentials;
import com.aliyun.odps.table.enviroment.EnvironmentSettings;
import com.aliyun.odps.table.read.SplitReader;
Expand Down Expand Up @@ -67,6 +68,10 @@ public class MaxComputeJniScanner extends JniScanner {
private static final String SCAN_SERIALIZER = "scan_serializer";
private static final String TIME_ZONE = "time_zone";

private static final String CONNECT_TIMEOUT = "connect_timeout";
private static final String READ_TIMEOUT = "read_timeout";
private static final String RETRY_COUNT = "retry_count";

private enum SplitType {
BYTE_SIZE,
ROW_OFFSET
Expand Down Expand Up @@ -136,16 +141,40 @@ public MaxComputeJniScanner(int batchSize, Map<String, String> params) {
Credentials credentials = Credentials.newBuilder().withAccount(odps.getAccount())
.withAppAccount(odps.getAppAccount()).build();


int connectTimeout = 10; // 10s
if (!Strings.isNullOrEmpty(params.get(CONNECT_TIMEOUT))) {
connectTimeout = Integer.parseInt(params.get(CONNECT_TIMEOUT));
}

int readTimeout = 120; // 120s
if (!Strings.isNullOrEmpty(params.get(READ_TIMEOUT))) {
readTimeout = Integer.parseInt(params.get(READ_TIMEOUT));
}

int retryTimes = 4; // 4 times
if (!Strings.isNullOrEmpty(params.get(RETRY_COUNT))) {
retryTimes = Integer.parseInt(params.get(RETRY_COUNT));
}

RestOptions restOptions = RestOptions.newBuilder()
.withConnectTimeout(connectTimeout)
.withReadTimeout(readTimeout)
.withRetryTimes(retryTimes).build();

settings = EnvironmentSettings.newBuilder()
.withCredentials(credentials)
.withServiceEndpoint(odps.getEndpoint())
.withQuotaName(quota)
.withRestOptions(restOptions)
.build();

try {
scan = (TableBatchReadSession) deserialize(scanSerializer);
} catch (Exception e) {
LOG.info("deserialize TableBatchReadSession failed.", e);
String errorMsg = "Failed to deserialize table batch read session.";
LOG.warn(errorMsg, e);
throw new IllegalArgumentException(errorMsg, e);
}
}

Expand Down Expand Up @@ -176,11 +205,11 @@ public void open() throws IOException {
.withReuseBatch(true)
.build());

} catch (IOException e) {
LOG.info("createArrowReader failed.", e);
} catch (Exception e) {
String errorMsg = "MaxComputeJniScanner Failed to open table batch read session.";
LOG.warn(errorMsg, e);
close();
throw new IOException(e.getMessage(), e);
throw new IOException(errorMsg, e);
}
}

Expand Down Expand Up @@ -215,8 +244,9 @@ private int readVectors(int expectedRows) throws IOException {
break;
}
} catch (Exception e) {
LOG.info("currentSplitReader hasNext fail", e);
break;
String errorMsg = "MaxComputeJniScanner readVectors hasNext fail";
LOG.warn(errorMsg, e);
throw new IOException(e.getMessage(), e);
}

try {
Expand All @@ -241,7 +271,10 @@ private int readVectors(int expectedRows) throws IOException {
}
curReadRows += batchRows;
} catch (Exception e) {
throw new RuntimeException("Fail to read arrow data, reason: " + e.getMessage(), e);
String errorMsg = String.format("MaxComputeJniScanner Fail to read arrow data. "
+ "curReadRows = {}, expectedRows = {}", curReadRows, expectedRows);
LOG.warn(errorMsg, e);
throw new RuntimeException(errorMsg, e);
}
}
return curReadRows;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import com.aliyun.odps.account.Account;
import com.aliyun.odps.account.AliyunAccount;
import com.aliyun.odps.security.SecurityManager;
import com.aliyun.odps.table.configuration.RestOptions;
import com.aliyun.odps.table.configuration.SplitOptions;
import com.aliyun.odps.table.enviroment.Credentials;
import com.aliyun.odps.table.enviroment.EnvironmentSettings;
Expand Down Expand Up @@ -71,6 +72,10 @@ public class MaxComputeExternalCatalog extends ExternalCatalog {
private long splitRowCount;
private long splitByteSize;

private int connectTimeout;
private int readTimeout;
private int retryTimes;

private static final Map<String, ZoneId> REGION_ZONE_MAP;
private static final List<String> REQUIRED_PROPERTIES = ImmutableList.of(
MCProperties.PROJECT,
Expand Down Expand Up @@ -178,6 +183,17 @@ protected void initLocalObjectsImpl() {
.build();
}

connectTimeout = Integer.parseInt(
props.getOrDefault(MCProperties.CONNECT_TIMEOUT, MCProperties.DEFAULT_CONNECT_TIMEOUT));
readTimeout = Integer.parseInt(
props.getOrDefault(MCProperties.READ_TIMEOUT, MCProperties.DEFAULT_READ_TIMEOUT));
retryTimes = Integer.parseInt(
props.getOrDefault(MCProperties.RETRY_COUNT, MCProperties.DEFAULT_RETRY_COUNT));

RestOptions restOptions = RestOptions.newBuilder()
.withConnectTimeout(connectTimeout)
.withReadTimeout(readTimeout)
.withRetryTimes(retryTimes).build();

CloudCredential credential = MCProperties.getCredential(props);
accessKey = credential.getAccessKey();
Expand All @@ -196,6 +212,7 @@ protected void initLocalObjectsImpl() {
.withCredentials(credentials)
.withServiceEndpoint(odps.getEndpoint())
.withQuotaName(quota)
.withRestOptions(restOptions)
.build();
}

Expand Down Expand Up @@ -304,6 +321,21 @@ public String getDefaultProject() {
return defaultProject;
}

public int getRetryTimes() {
makeSureInitialized();
return retryTimes;
}

public int getConnectTimeout() {
makeSureInitialized();
return connectTimeout;
}

public int getReadTimeout() {
makeSureInitialized();
return readTimeout;
}

public ZoneId getProjectDateTimeZone() {
makeSureInitialized();

Expand Down Expand Up @@ -385,6 +417,31 @@ public void checkProperties() throws DdlException {
+ MCProperties.SPLIT_ROW_COUNT + "must be an integer");
}


try {
connectTimeout = Integer.parseInt(
props.getOrDefault(MCProperties.CONNECT_TIMEOUT, MCProperties.DEFAULT_CONNECT_TIMEOUT));
readTimeout = Integer.parseInt(
props.getOrDefault(MCProperties.READ_TIMEOUT, MCProperties.DEFAULT_READ_TIMEOUT));
retryTimes = Integer.parseInt(
props.getOrDefault(MCProperties.RETRY_COUNT, MCProperties.DEFAULT_RETRY_COUNT));
if (connectTimeout <= 0) {
throw new DdlException(MCProperties.CONNECT_TIMEOUT + " must be greater than 0");
}

if (readTimeout <= 0) {
throw new DdlException(MCProperties.READ_TIMEOUT + " must be greater than 0");
}

if (retryTimes <= 0) {
throw new DdlException(MCProperties.RETRY_COUNT + " must be greater than 0");
}

} catch (NumberFormatException e) {
throw new DdlException("property " + MCProperties.CONNECT_TIMEOUT + "/"
+ MCProperties.READ_TIMEOUT + "/" + MCProperties.RETRY_COUNT + "must be an integer");
}

CloudCredential credential = MCProperties.getCredential(props);
if (!credential.isWhole()) {
throw new DdlException("Max-Compute credential properties '"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,10 @@ public class MaxComputeScanNode extends FileQueryScanNode {
private static final LocationPath ROW_OFFSET_PATH = new LocationPath("/row_offset", Maps.newHashMap());
private static final LocationPath BYTE_SIZE_PATH = new LocationPath("/byte_size", Maps.newHashMap());

private int connectTimeout;
private int readTimeout;
private int retryTimes;

@Setter
private SelectedPartitions selectedPartitions = null;

Expand Down Expand Up @@ -127,6 +131,11 @@ private void setScanParams(TFileRangeDesc rangeDesc, MaxComputeSplit maxComputeS
fileDesc.setPartitionSpec("deprecated");
fileDesc.setTableBatchReadSession(maxComputeSplit.scanSerialize);
fileDesc.setSessionId(maxComputeSplit.getSessionId());

fileDesc.setReadTimeout(readTimeout);
fileDesc.setConnectTimeout(connectTimeout);
fileDesc.setRetryTimes(retryTimes);

tableFormatFileDesc.setMaxComputeParams(fileDesc);
rangeDesc.setTableFormatParams(tableFormatFileDesc);
rangeDesc.setPath("[ " + maxComputeSplit.getStart() + " , " + maxComputeSplit.getLength() + " ]");
Expand Down Expand Up @@ -477,6 +486,10 @@ public List<Split> getSplits() throws UserException {

MaxComputeExternalCatalog mcCatalog = (MaxComputeExternalCatalog) table.getCatalog();

readTimeout = mcCatalog.getReadTimeout();
connectTimeout = mcCatalog.getConnectTimeout();
retryTimes = mcCatalog.getRetryTimes();

if (mcCatalog.getSplitStrategy().equals(MCProperties.SPLIT_BY_BYTE_SIZE_STRATEGY)) {

for (com.aliyun.odps.table.read.split.InputSplit split : assigner.getAllSplits()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,14 @@ public class MCProperties extends BaseProperties {
public static final String SPLIT_ROW_COUNT = "mc.split_row_count";
public static final String DEFAULT_SPLIT_ROW_COUNT = "1048576"; // 256 * 4096

public static final String CONNECT_TIMEOUT = "mc.connect_timeout";
public static final String READ_TIMEOUT = "mc.read_timeout";
public static final String RETRY_COUNT = "mc.retry_count";

public static final String DEFAULT_CONNECT_TIMEOUT = "10"; // 10s
public static final String DEFAULT_READ_TIMEOUT = "120"; // 120s
public static final String DEFAULT_RETRY_COUNT = "4"; // 4 times

public static CloudCredential getCredential(Map<String, String> props) {
return getCloudCredential(props, ACCESS_KEY, SECRET_KEY, SESSION_TOKEN);
}
Expand Down
4 changes: 4 additions & 0 deletions gensrc/thrift/PlanNodes.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,10 @@ struct TMaxComputeFileDesc {
1: optional string partition_spec // deprecated
2: optional string session_id
3: optional string table_batch_read_session
// for mc network configuration
4: optional i32 connect_timeout
5: optional i32 read_timeout
6: optional i32 retry_times
}

struct THudiFileDesc {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,10 @@ suite("test_external_catalog_maxcompute", "p2,external,maxcompute,external_remot
order_qt_multi_partition_q6 """ select max(pt), yy, mm from multi_partitions where yy = '2023' and mm='08' group by yy, mm order by yy, mm; """
order_qt_multi_partition_q7 """ select count(*) from multi_partitions where yy < '2023' or dd < '03'; """
order_qt_multi_partition_q8 """ select count(*) from multi_partitions where pt>=3; """
order_qt_multi_partition_q9 """ select city,mnt,gender,finished_time,order_rate,cut_date,create_time,pt, yy, mm, dd from multi_partitions where pt >= 2 and pt < 4 and finished_time is not null; """

//`finished_time is not null` => com.aliyun.odps.OdpsException: ODPS-0010000:System internal error - fuxi job failed, caused by: timestamp_ntz
// order_qt_multi_partition_q9 """ select city,mnt,gender,finished_time,order_rate,cut_date,create_time,pt, yy, mm, dd from multi_partitions where pt >= 2 and pt < 4 and finished_time is not null; """

order_qt_multi_partition_q10 """ select pt, yy, mm, dd from multi_partitions where pt >= 2 and create_time > '2023-08-03 03:11:00' order by pt, yy, mm, dd; """


Expand Down

0 comments on commit b332217

Please sign in to comment.