diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/objectstorage/client/CloudObjectStorageClient.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/objectstorage/client/CloudObjectStorageClient.java index d6c6b6b969..42b49a6efd 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/objectstorage/client/CloudObjectStorageClient.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/objectstorage/client/CloudObjectStorageClient.java @@ -164,6 +164,21 @@ public List deleteObjects(List objectNames) { return deletedObjects; } + @Override + public InputStream getObject(String objectName) throws IOException { + verifySupported(); + boolean exist = internalEndpointCloudObjectStorage.doesObjectExist(getBucketName(), objectName); + if (!exist) { + throw new FileNotFoundException("File dose not exist, object name " + objectName); + } + try (InputStream inputStream = + internalEndpointCloudObjectStorage.getObject(getBucketName(), objectName).getObjectContent()) { + return inputStream; + } catch (Exception exception) { + log.warn("get object failed, objectName={}", objectName, exception); + throw new IOException(exception); + } + } /** * 文件上传方法,为了保证性能,如果文件大小小于10MB使用简单上传,如果文件大小大于10MB则使用分片上传功能 diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/objectstorage/client/LocalObjectStorageClient.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/objectstorage/client/LocalObjectStorageClient.java index 057da5ac05..6c5cc07738 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/objectstorage/client/LocalObjectStorageClient.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/objectstorage/client/LocalObjectStorageClient.java @@ -19,6 +19,7 @@ import java.io.File; import java.io.FileOutputStream; import java.io.IOException; +import java.io.InputStream; import java.net.URL; import java.util.ArrayList; import java.util.HashSet; @@ -94,4 +95,10 @@ public List deleteObjects(List objectNames) throws IOException { blockOperator.batchDelete(objectNameSet); return new ArrayList<>(); } + + @Override + public InputStream getObject(String objectName) throws IOException { + throw new UnsupportedOperationException("Not supported yet."); + } + } diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/objectstorage/client/ObjectStorageClient.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/objectstorage/client/ObjectStorageClient.java index 28266c12fd..4701bf1a68 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/objectstorage/client/ObjectStorageClient.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/objectstorage/client/ObjectStorageClient.java @@ -17,6 +17,7 @@ import java.io.File; import java.io.IOException; +import java.io.InputStream; import java.net.URL; import java.util.List; @@ -47,4 +48,6 @@ public interface ObjectStorageClient { void downloadToFile(@NotBlank String objectName, @NotNull File targetFile) throws IOException; List deleteObjects(@NotEmpty List objectNames) throws IOException; + + InputStream getObject(@NotBlank String objectName) throws IOException; } diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/objectstorage/cloud/CloudObjectStorageService.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/objectstorage/cloud/CloudObjectStorageService.java index 07a2ef2ea3..fbdfebaf25 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/objectstorage/cloud/CloudObjectStorageService.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/objectstorage/cloud/CloudObjectStorageService.java @@ -196,6 +196,16 @@ public File downloadToTempFile(@NotBlank String objectName) { } } + /** + * get object inputStream by objectName + * + * @param objectName objectName + * @return inputStream of object + */ + public InputStream getObject(@NotBlank String objectName) throws IOException { + return cloudObjectStorageClient.getObject(objectName); + } + ObjectStorageConfiguration getObjectStorageConfiguration() { return cloudObjectStorageClient.getObjectStorageConfiguration(); } diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/quartz/QuartzJobService.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/quartz/QuartzJobService.java index 42549bea33..433a3e6d0d 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/quartz/QuartzJobService.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/quartz/QuartzJobService.java @@ -93,7 +93,8 @@ public void changeQuartzJob(ChangeQuartJobParam req) { pauseJob(jobKey); break; } - case TERMINATE: { + case TERMINATE: + case DELETE: { deleteJob(jobKey); break; } diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/sqlplan/model/SqlPlanTaskResult.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/sqlplan/model/SqlPlanTaskResult.java index 0a05d3805d..8bc199d162 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/sqlplan/model/SqlPlanTaskResult.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/sqlplan/model/SqlPlanTaskResult.java @@ -33,6 +33,10 @@ public class SqlPlanTaskResult { private List failedRecord; + private String region; + + private String cloudProvider; + /** * sql execution json file download url */ diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/task/executor/task/SqlPlanTask.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/task/executor/task/SqlPlanTask.java index 788f6b7527..73d72a9e4b 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/task/executor/task/SqlPlanTask.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/task/executor/task/SqlPlanTask.java @@ -15,16 +15,21 @@ */ package com.oceanbase.odc.service.task.executor.task; +import java.io.ByteArrayInputStream; import java.io.File; import java.io.FileWriter; import java.io.IOException; +import java.io.InputStream; +import java.io.SequenceInputStream; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.concurrent.TimeUnit; +import org.apache.commons.collections.CollectionUtils; import org.apache.commons.io.FileUtils; import org.springframework.jdbc.core.ConnectionCallback; import org.springframework.jdbc.core.StatementCallback; @@ -48,11 +53,12 @@ import com.oceanbase.odc.core.sql.execute.model.SqlTuple; import com.oceanbase.odc.core.sql.parser.AbstractSyntaxTreeFactories; import com.oceanbase.odc.core.sql.parser.AbstractSyntaxTreeFactory; -import com.oceanbase.odc.core.sql.split.OffsetString; import com.oceanbase.odc.core.sql.split.SqlCommentProcessor; +import com.oceanbase.odc.core.sql.split.SqlStatementIterator; import com.oceanbase.odc.service.common.FileManager; import com.oceanbase.odc.service.common.model.FileBucket; import com.oceanbase.odc.service.common.util.OdcFileUtil; +import com.oceanbase.odc.service.common.util.SqlUtils; import com.oceanbase.odc.service.connection.model.ConnectionConfig; import com.oceanbase.odc.service.objectstorage.cloud.CloudObjectStorageService; import com.oceanbase.odc.service.schedule.job.PublishSqlPlanJobReq; @@ -75,6 +81,8 @@ public class SqlPlanTask extends BaseTask { private PublishSqlPlanJobReq parameters; + private SqlStatementIterator sqlIterator; + private ConnectionSession connectionSession; private SyncJdbcExecutor executor; @@ -98,7 +106,12 @@ protected void doInit(JobContext context) { this.result = new SqlPlanTaskResult(); this.parameters = JobUtils.fromJson(getJobParameters().get(JobParametersKeyConstants.META_TASK_PARAMETER_JSON), PublishSqlPlanJobReq.class); + JobContext jobContext = getJobContext(); + Map jobProperties = jobContext.getJobProperties(); + this.result.setRegion(jobProperties.get("region")); + this.result.setCloudProvider(jobProperties.get("cloudProvider")); this.connectionSession = generateSession(); + initSqlIterator(); this.executor = connectionSession.getSyncJdbcExecutor(ConnectionSessionConstants.CONSOLE_DS_KEY); long timeoutUs = TimeUnit.MILLISECONDS.toMicros(parameters.getTimeoutMillis()); PreConditions.notNull(timeoutUs, "timeoutUs"); @@ -117,36 +130,30 @@ protected void doInit(JobContext context) { @Override protected boolean doStart(JobContext context) throws Exception { - String sqlContent = parameters.getSqlContent(); - log.info("sql content={}", sqlContent); - // todo: Download the attachment that needs to execute SQL from OSS - SqlCommentProcessor processor = ConnectionSessionUtil.getSqlCommentProcessor(connectionSession); - - List offsetStringList = processor.split(new StringBuffer(), sqlContent); - result.setTotalStatements(offsetStringList.size()); + int index = 0; - for (int i = 0; i <= offsetStringList.size() - 1; i++) { - OffsetString offsetString = offsetStringList.get(i); + while (sqlIterator.hasNext()) { + String sql = sqlIterator.next().getStr(); + index++; // The retry statement will write the result into the buffer, while executing a new SQL command will // clear the buffer. queryResultSetBuffer.clear(); - String sql = offsetString.getStr(); try { boolean success = executeSqlWithRetries(sql); // write all result into json file - appendResultToJsonFile(i == 0, i == offsetStringList.size() - 1); + appendResultToJsonFile(index == 1, !sqlIterator.hasNext()); // write result rows into csv file - writeCsvFiles(i + 1); + writeCsvFiles(index); if (success) { result.incrementSucceedStatements(); } else { result.incrementFailedStatements(); // only write failed record into error records file - addErrorRecordsToFile(i + 1, sql); + addErrorRecordsToFile(index, sql); } } catch (Exception e) { result.incrementFailedStatements(); - addErrorRecordsToFile(i + 1, sql); + addErrorRecordsToFile(index, sql); if (parameters.getErrorStrategy() == TaskErrorStrategy.ABORT) { canceled = true; break; @@ -154,6 +161,7 @@ protected boolean doStart(JobContext context) throws Exception { log.warn("Sql task execution failed, will continue to execute next statement.", e); } } + result.setTotalStatements(index); log.info("The sql plan task execute finished,result={}", result); // all sql execute csv file list write to zip file @@ -163,6 +171,45 @@ protected boolean doStart(JobContext context) throws Exception { return true; } + private void initSqlIterator() { + if (CollectionUtils.isEmpty(parameters.getSqlObjectIds()) && StringUtils.isBlank(parameters.getSqlContent())) { + throw new UnexpectedException("Sql content and sql object id can not be null at the same time."); + } + InputStream sqlInputStream = new ByteArrayInputStream(new byte[0]); + if (StringUtils.isNotBlank(parameters.getSqlContent())) { + byte[] bytes = parameters.getSqlContent().getBytes(); + sqlInputStream = new ByteArrayInputStream(bytes); + this.sqlIterator = SqlUtils.iterator(connectionSession, sqlInputStream, StandardCharsets.UTF_8); + return; + } + + CloudObjectStorageService cloudObjectStorageService = getCloudObjectStorageService(); + if (Objects.isNull(cloudObjectStorageService) || !cloudObjectStorageService.supported()) { + log.warn("Cloud object storage service not supported."); + throw new UnexpectedException("Cloud object storage service not supported"); + } + + for (String sqlObjectId : parameters.getSqlObjectIds()) { + try { + InputStream current = cloudObjectStorageService.getObject(sqlObjectId); + // remove UTF-8 BOM if exists + current.mark(3); + byte[] byteSql = new byte[3]; + if (current.read(byteSql) >= 3 && byteSql[0] == (byte) 0xef && byteSql[1] == (byte) 0xbb + && byteSql[2] == (byte) 0xbf) { + current.reset(); + current.skip(3); + } else { + current.reset(); + } + sqlInputStream = new SequenceInputStream(sqlInputStream, current); + } catch (IOException e) { + log.warn("Read content from cloud object storage failed, sqlObjectId={}", sqlObjectId); + throw new InternalServerError("load database change task file failed", e); + } + } + this.sqlIterator = SqlUtils.iterator(connectionSession, sqlInputStream, StandardCharsets.UTF_8); + } @Override