Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[enchement](mc)mc catalog append netowrk config #44194

Merged
merged 2 commits into from
Dec 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion be/src/vec/exec/format/table/max_compute_jni_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,11 @@ MaxComputeJniReader::MaxComputeJniReader(const MaxComputeTableDescriptor* mc_des
{"start_offset", std::to_string(_range.start_offset)},
{"split_size", std::to_string(_range.size)},
{"required_fields", required_fields.str()},
{"columns_types", columns_types.str()}};
{"columns_types", columns_types.str()},

{"connect_timeout", std::to_string(_max_compute_params.connect_timeout)},
{"read_timeout", std::to_string(_max_compute_params.read_timeout)},
{"retry_count", std::to_string(_max_compute_params.retry_times)}};
_jni_connector = std::make_unique<JniConnector>(
"org/apache/doris/maxcompute/MaxComputeJniScanner", params, column_names);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.aliyun.odps.account.AliyunAccount;
import com.aliyun.odps.table.configuration.CompressionCodec;
import com.aliyun.odps.table.configuration.ReaderOptions;
import com.aliyun.odps.table.configuration.RestOptions;
import com.aliyun.odps.table.enviroment.Credentials;
import com.aliyun.odps.table.enviroment.EnvironmentSettings;
import com.aliyun.odps.table.read.SplitReader;
Expand Down Expand Up @@ -67,6 +68,10 @@ public class MaxComputeJniScanner extends JniScanner {
private static final String SCAN_SERIALIZER = "scan_serializer";
private static final String TIME_ZONE = "time_zone";

private static final String CONNECT_TIMEOUT = "connect_timeout";
private static final String READ_TIMEOUT = "read_timeout";
private static final String RETRY_COUNT = "retry_count";

private enum SplitType {
BYTE_SIZE,
ROW_OFFSET
Expand Down Expand Up @@ -136,16 +141,40 @@ public MaxComputeJniScanner(int batchSize, Map<String, String> params) {
Credentials credentials = Credentials.newBuilder().withAccount(odps.getAccount())
.withAppAccount(odps.getAppAccount()).build();


int connectTimeout = 10; // 10s
if (!Strings.isNullOrEmpty(params.get(CONNECT_TIMEOUT))) {
connectTimeout = Integer.parseInt(params.get(CONNECT_TIMEOUT));
}

int readTimeout = 120; // 120s
if (!Strings.isNullOrEmpty(params.get(READ_TIMEOUT))) {
readTimeout = Integer.parseInt(params.get(READ_TIMEOUT));
}

int retryTimes = 4; // 4 times
if (!Strings.isNullOrEmpty(params.get(RETRY_COUNT))) {
retryTimes = Integer.parseInt(params.get(RETRY_COUNT));
}

RestOptions restOptions = RestOptions.newBuilder()
.withConnectTimeout(connectTimeout)
.withReadTimeout(readTimeout)
.withRetryTimes(retryTimes).build();

settings = EnvironmentSettings.newBuilder()
.withCredentials(credentials)
.withServiceEndpoint(odps.getEndpoint())
.withQuotaName(quota)
.withRestOptions(restOptions)
.build();

try {
scan = (TableBatchReadSession) deserialize(scanSerializer);
} catch (Exception e) {
LOG.info("deserialize TableBatchReadSession failed.", e);
String errorMsg = "Failed to deserialize table batch read session.";
LOG.warn(errorMsg, e);
throw new IllegalArgumentException(errorMsg, e);
}
}

Expand Down Expand Up @@ -176,11 +205,11 @@ public void open() throws IOException {
.withReuseBatch(true)
.build());

} catch (IOException e) {
LOG.info("createArrowReader failed.", e);
} catch (Exception e) {
String errorMsg = "MaxComputeJniScanner Failed to open table batch read session.";
LOG.warn(errorMsg, e);
close();
throw new IOException(e.getMessage(), e);
throw new IOException(errorMsg, e);
}
}

Expand Down Expand Up @@ -215,8 +244,9 @@ private int readVectors(int expectedRows) throws IOException {
break;
}
} catch (Exception e) {
LOG.info("currentSplitReader hasNext fail", e);
break;
String errorMsg = "MaxComputeJniScanner readVectors hasNext fail";
LOG.warn(errorMsg, e);
throw new IOException(e.getMessage(), e);
}

try {
Expand All @@ -241,7 +271,10 @@ private int readVectors(int expectedRows) throws IOException {
}
curReadRows += batchRows;
} catch (Exception e) {
throw new RuntimeException("Fail to read arrow data, reason: " + e.getMessage(), e);
String errorMsg = String.format("MaxComputeJniScanner Fail to read arrow data. "
+ "curReadRows = {}, expectedRows = {}", curReadRows, expectedRows);
LOG.warn(errorMsg, e);
throw new RuntimeException(errorMsg, e);
}
}
return curReadRows;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import com.aliyun.odps.account.Account;
import com.aliyun.odps.account.AliyunAccount;
import com.aliyun.odps.security.SecurityManager;
import com.aliyun.odps.table.configuration.RestOptions;
import com.aliyun.odps.table.configuration.SplitOptions;
import com.aliyun.odps.table.enviroment.Credentials;
import com.aliyun.odps.table.enviroment.EnvironmentSettings;
Expand Down Expand Up @@ -71,6 +72,10 @@ public class MaxComputeExternalCatalog extends ExternalCatalog {
private long splitRowCount;
private long splitByteSize;

private int connectTimeout;
private int readTimeout;
private int retryTimes;

private static final Map<String, ZoneId> REGION_ZONE_MAP;
private static final List<String> REQUIRED_PROPERTIES = ImmutableList.of(
MCProperties.PROJECT,
Expand Down Expand Up @@ -178,6 +183,17 @@ protected void initLocalObjectsImpl() {
.build();
}

connectTimeout = Integer.parseInt(
props.getOrDefault(MCProperties.CONNECT_TIMEOUT, MCProperties.DEFAULT_CONNECT_TIMEOUT));
readTimeout = Integer.parseInt(
props.getOrDefault(MCProperties.READ_TIMEOUT, MCProperties.DEFAULT_READ_TIMEOUT));
retryTimes = Integer.parseInt(
props.getOrDefault(MCProperties.RETRY_COUNT, MCProperties.DEFAULT_RETRY_COUNT));

RestOptions restOptions = RestOptions.newBuilder()
.withConnectTimeout(connectTimeout)
.withReadTimeout(readTimeout)
.withRetryTimes(retryTimes).build();

CloudCredential credential = MCProperties.getCredential(props);
accessKey = credential.getAccessKey();
Expand All @@ -196,6 +212,7 @@ protected void initLocalObjectsImpl() {
.withCredentials(credentials)
.withServiceEndpoint(odps.getEndpoint())
.withQuotaName(quota)
.withRestOptions(restOptions)
.build();
}

Expand Down Expand Up @@ -304,6 +321,21 @@ public String getDefaultProject() {
return defaultProject;
}

public int getRetryTimes() {
makeSureInitialized();
return retryTimes;
}

public int getConnectTimeout() {
makeSureInitialized();
return connectTimeout;
}

public int getReadTimeout() {
makeSureInitialized();
return readTimeout;
}

public ZoneId getProjectDateTimeZone() {
makeSureInitialized();

Expand Down Expand Up @@ -385,6 +417,31 @@ public void checkProperties() throws DdlException {
+ MCProperties.SPLIT_ROW_COUNT + "must be an integer");
}


try {
connectTimeout = Integer.parseInt(
props.getOrDefault(MCProperties.CONNECT_TIMEOUT, MCProperties.DEFAULT_CONNECT_TIMEOUT));
readTimeout = Integer.parseInt(
props.getOrDefault(MCProperties.READ_TIMEOUT, MCProperties.DEFAULT_READ_TIMEOUT));
retryTimes = Integer.parseInt(
props.getOrDefault(MCProperties.RETRY_COUNT, MCProperties.DEFAULT_RETRY_COUNT));
if (connectTimeout <= 0) {
throw new DdlException(MCProperties.CONNECT_TIMEOUT + " must be greater than 0");
}

if (readTimeout <= 0) {
throw new DdlException(MCProperties.READ_TIMEOUT + " must be greater than 0");
}

if (retryTimes <= 0) {
throw new DdlException(MCProperties.RETRY_COUNT + " must be greater than 0");
}

} catch (NumberFormatException e) {
throw new DdlException("property " + MCProperties.CONNECT_TIMEOUT + "/"
+ MCProperties.READ_TIMEOUT + "/" + MCProperties.RETRY_COUNT + "must be an integer");
}

CloudCredential credential = MCProperties.getCredential(props);
if (!credential.isWhole()) {
throw new DdlException("Max-Compute credential properties '"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,10 @@ public class MaxComputeScanNode extends FileQueryScanNode {
private static final LocationPath ROW_OFFSET_PATH = new LocationPath("/row_offset", Maps.newHashMap());
private static final LocationPath BYTE_SIZE_PATH = new LocationPath("/byte_size", Maps.newHashMap());

private int connectTimeout;
private int readTimeout;
private int retryTimes;

@Setter
private SelectedPartitions selectedPartitions = null;

Expand Down Expand Up @@ -127,6 +131,11 @@ private void setScanParams(TFileRangeDesc rangeDesc, MaxComputeSplit maxComputeS
fileDesc.setPartitionSpec("deprecated");
fileDesc.setTableBatchReadSession(maxComputeSplit.scanSerialize);
fileDesc.setSessionId(maxComputeSplit.getSessionId());

fileDesc.setReadTimeout(readTimeout);
fileDesc.setConnectTimeout(connectTimeout);
fileDesc.setRetryTimes(retryTimes);

tableFormatFileDesc.setMaxComputeParams(fileDesc);
rangeDesc.setTableFormatParams(tableFormatFileDesc);
rangeDesc.setPath("[ " + maxComputeSplit.getStart() + " , " + maxComputeSplit.getLength() + " ]");
Expand Down Expand Up @@ -477,6 +486,10 @@ public List<Split> getSplits() throws UserException {

MaxComputeExternalCatalog mcCatalog = (MaxComputeExternalCatalog) table.getCatalog();

readTimeout = mcCatalog.getReadTimeout();
connectTimeout = mcCatalog.getConnectTimeout();
retryTimes = mcCatalog.getRetryTimes();

if (mcCatalog.getSplitStrategy().equals(MCProperties.SPLIT_BY_BYTE_SIZE_STRATEGY)) {

for (com.aliyun.odps.table.read.split.InputSplit split : assigner.getAllSplits()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,14 @@ public class MCProperties extends BaseProperties {
public static final String SPLIT_ROW_COUNT = "mc.split_row_count";
public static final String DEFAULT_SPLIT_ROW_COUNT = "1048576"; // 256 * 4096

public static final String CONNECT_TIMEOUT = "mc.connect_timeout";
public static final String READ_TIMEOUT = "mc.read_timeout";
public static final String RETRY_COUNT = "mc.retry_count";

public static final String DEFAULT_CONNECT_TIMEOUT = "10"; // 10s
public static final String DEFAULT_READ_TIMEOUT = "120"; // 120s
public static final String DEFAULT_RETRY_COUNT = "4"; // 4 times

public static CloudCredential getCredential(Map<String, String> props) {
return getCloudCredential(props, ACCESS_KEY, SECRET_KEY, SESSION_TOKEN);
}
Expand Down
4 changes: 4 additions & 0 deletions gensrc/thrift/PlanNodes.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,10 @@ struct TMaxComputeFileDesc {
1: optional string partition_spec // deprecated
2: optional string session_id
3: optional string table_batch_read_session
// for mc network configuration
4: optional i32 connect_timeout
5: optional i32 read_timeout
6: optional i32 retry_times
}

struct THudiFileDesc {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,10 @@ suite("test_external_catalog_maxcompute", "p2,external,maxcompute,external_remot
order_qt_multi_partition_q6 """ select max(pt), yy, mm from multi_partitions where yy = '2023' and mm='08' group by yy, mm order by yy, mm; """
order_qt_multi_partition_q7 """ select count(*) from multi_partitions where yy < '2023' or dd < '03'; """
order_qt_multi_partition_q8 """ select count(*) from multi_partitions where pt>=3; """
order_qt_multi_partition_q9 """ select city,mnt,gender,finished_time,order_rate,cut_date,create_time,pt, yy, mm, dd from multi_partitions where pt >= 2 and pt < 4 and finished_time is not null; """

//`finished_time is not null` => com.aliyun.odps.OdpsException: ODPS-0010000:System internal error - fuxi job failed, caused by: timestamp_ntz
// order_qt_multi_partition_q9 """ select city,mnt,gender,finished_time,order_rate,cut_date,create_time,pt, yy, mm, dd from multi_partitions where pt >= 2 and pt < 4 and finished_time is not null; """

order_qt_multi_partition_q10 """ select pt, yy, mm, dd from multi_partitions where pt >= 2 and create_time > '2023-08-03 03:11:00' order by pt, yy, mm, dd; """


Expand Down
Loading