Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(database-change): supports the execution of sql by downloading files from a separate bucket #3436

Open
wants to merge 9 commits into
base: feat/obcloud_202409
Choose a base branch
from
Open
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,8 @@ public void changeQuartzJob(ChangeQuartJobParam req) {
pauseJob(jobKey);
break;
}
case TERMINATE: {
case TERMINATE:
case DELETE: {
deleteJob(jobKey);
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.springframework.jdbc.core.ConnectionCallback;
import org.springframework.jdbc.core.StatementCallback;

import com.amazonaws.util.CollectionUtils;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use apache commons instead.

import com.oceanbase.odc.common.json.JsonUtils;
import com.oceanbase.odc.common.util.CSVUtils;
import com.oceanbase.odc.common.util.StringUtils;
Expand Down Expand Up @@ -117,13 +118,13 @@ protected void doInit(JobContext context) {

@Override
protected boolean doStart(JobContext context) throws Exception {
String sqlContent = parameters.getSqlContent();
log.info("sql content={}", sqlContent);
// todo: Download the attachment that needs to execute SQL from OSS
SqlCommentProcessor processor = ConnectionSessionUtil.getSqlCommentProcessor(connectionSession);

List<OffsetString> offsetStringList = processor.split(new StringBuffer(), sqlContent);
result.setTotalStatements(offsetStringList.size());
if (CollectionUtils.isNullOrEmpty(parameters.getSqlObjectIds())
&& StringUtils.isBlank(parameters.getSqlContent())) {
log.warn("Sql content and sql object id can not be null at the same time.");
return false;
}
List<OffsetString> offsetStringList = getSplitSql();

for (int i = 0; i <= offsetStringList.size() - 1; i++) {
OffsetString offsetString = offsetStringList.get(i);
Expand Down Expand Up @@ -163,6 +164,47 @@ protected boolean doStart(JobContext context) throws Exception {
return true;
}

private List<OffsetString> getSplitSql() {
SqlCommentProcessor processor = ConnectionSessionUtil.getSqlCommentProcessor(connectionSession);
StringBuffer buffer = new StringBuffer();
List<OffsetString> splitSqlList = new ArrayList<>();

if (CollectionUtils.isNullOrEmpty(parameters.getSqlObjectIds())) {
String sqlContent = parameters.getSqlContent();
log.info("SQL content={}", sqlContent);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sqlContent may too long for log output


List<OffsetString> offsetStringList = processor.split(buffer, sqlContent);
result.setTotalStatements(offsetStringList.size());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

result.setTotalStatements may set out of getSplitSql method, responsibility should isolated


return offsetStringList;
}

CloudObjectStorageService cloudObjectStorageService = getCloudObjectStorageService();

if (Objects.isNull(cloudObjectStorageService) || !cloudObjectStorageService.supported()) {
log.warn("Cloud object storage service not supported.");
throw new UnexpectedException("Cloud object storage service not supported");
}

for (String objectId : parameters.getSqlObjectIds()) {
try {
byte[] bytes = cloudObjectStorageService.readContent(objectId);
if (Objects.isNull(bytes)) {
log.warn("Read content from cloud storage returned null, objectId={}", objectId);
continue;
}
log.info("successfully read content from cloud storage, objectId={}", objectId);
String sqlContent = new String(bytes, StandardCharsets.UTF_8);
List<OffsetString> offsetStringList = processor.split(buffer, sqlContent);
splitSqlList.addAll(offsetStringList);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

read all sql statement into memory may cost a lot of memory,
the database change task already contains stream read/split logic, sql plan may follow the same flow.

} catch (IOException e) {
log.warn("Read content from cloud storage failed, objectId={}", objectId, e);
}
}

this.result.setTotalStatements(splitSqlList.size());
return splitSqlList;
}


@Override
Expand Down