Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(database-change): supports the execution of sql by downloading files from a separate bucket #3436

Merged
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,8 @@ public void changeQuartzJob(ChangeQuartJobParam req) {
pauseJob(jobKey);
break;
}
case TERMINATE: {
case TERMINATE:
case DELETE: {
deleteJob(jobKey);
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.springframework.jdbc.core.ConnectionCallback;
import org.springframework.jdbc.core.StatementCallback;

import com.amazonaws.util.CollectionUtils;
kiko-art marked this conversation as resolved.
Show resolved Hide resolved
import com.oceanbase.odc.common.json.JsonUtils;
import com.oceanbase.odc.common.util.CSVUtils;
import com.oceanbase.odc.common.util.StringUtils;
Expand Down Expand Up @@ -117,13 +118,13 @@ protected void doInit(JobContext context) {

@Override
protected boolean doStart(JobContext context) throws Exception {
String sqlContent = parameters.getSqlContent();
log.info("sql content={}", sqlContent);
// todo: Download the attachment that needs to execute SQL from OSS
SqlCommentProcessor processor = ConnectionSessionUtil.getSqlCommentProcessor(connectionSession);

List<OffsetString> offsetStringList = processor.split(new StringBuffer(), sqlContent);
result.setTotalStatements(offsetStringList.size());
if (CollectionUtils.isNullOrEmpty(parameters.getSqlObjectIds())
&& StringUtils.isBlank(parameters.getSqlContent())) {
log.warn("Sql content and sql object id can not be null at the same time.");
return false;
}
List<OffsetString> offsetStringList = getSplitSql();

for (int i = 0; i <= offsetStringList.size() - 1; i++) {
OffsetString offsetString = offsetStringList.get(i);
Expand Down Expand Up @@ -163,6 +164,47 @@ protected boolean doStart(JobContext context) throws Exception {
return true;
}

private List<OffsetString> getSplitSql() {
SqlCommentProcessor processor = ConnectionSessionUtil.getSqlCommentProcessor(connectionSession);
StringBuffer buffer = new StringBuffer();
List<OffsetString> splitSqlList = new ArrayList<>();

if (CollectionUtils.isNullOrEmpty(parameters.getSqlObjectIds())) {
String sqlContent = parameters.getSqlContent();
log.info("SQL content={}", sqlContent);
kiko-art marked this conversation as resolved.
Show resolved Hide resolved

List<OffsetString> offsetStringList = processor.split(buffer, sqlContent);
result.setTotalStatements(offsetStringList.size());
kiko-art marked this conversation as resolved.
Show resolved Hide resolved

return offsetStringList;
}

CloudObjectStorageService cloudObjectStorageService = getCloudObjectStorageService();

if (Objects.isNull(cloudObjectStorageService) || !cloudObjectStorageService.supported()) {
log.warn("Cloud object storage service not supported.");
throw new UnexpectedException("Cloud object storage service not supported");
}

for (String objectId : parameters.getSqlObjectIds()) {
try {
byte[] bytes = cloudObjectStorageService.readContent(objectId);
if (Objects.isNull(bytes)) {
log.warn("Read content from cloud storage returned null, objectId={}", objectId);
continue;
}
log.info("successfully read content from cloud storage, objectId={}", objectId);
String sqlContent = new String(bytes, StandardCharsets.UTF_8);
List<OffsetString> offsetStringList = processor.split(buffer, sqlContent);
splitSqlList.addAll(offsetStringList);
kiko-art marked this conversation as resolved.
Show resolved Hide resolved
} catch (IOException e) {
log.warn("Read content from cloud storage failed, objectId={}", objectId, e);
}
}

this.result.setTotalStatements(splitSqlList.size());
return splitSqlList;
}


@Override
Expand Down