diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java index 450f226ed6ca..29a890a90ada 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java @@ -147,7 +147,6 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -234,7 +233,7 @@ public TSStatus testSubProcedure() { public TSStatus deleteDatabases( final List deleteSgSchemaList, final boolean isGeneratedByPipe) { - final List procedureIds = new ArrayList<>(); + List procedures = new ArrayList<>(); final long startCheckTimeForProcedures = System.currentTimeMillis(); for (final TDatabaseSchema databaseSchema : deleteSgSchemaList) { final String database = databaseSchema.getName(); @@ -248,9 +247,10 @@ public TSStatus deleteDatabases( hasOverlappedTask = procedureIdDuplicatePair.getRight(); if (Boolean.FALSE.equals(procedureIdDuplicatePair.getRight())) { - final DeleteDatabaseProcedure deleteDatabaseProcedure = + DeleteDatabaseProcedure procedure = new DeleteDatabaseProcedure(databaseSchema, isGeneratedByPipe); - procedureIds.add(this.executor.submitProcedure(deleteDatabaseProcedure)); + this.executor.submitProcedure(procedure); + procedures.add(procedure); break; } try { @@ -268,33 +268,34 @@ public TSStatus deleteDatabases( } } } - final List procedureStatus = new ArrayList<>(); - final boolean isSucceed = waitingProcedureFinished(procedureIds, procedureStatus); + List results = new ArrayList<>(procedures.size()); + procedures.forEach(procedure -> results.add(waitingProcedureFinished(procedure))); // Clear the previously deleted regions final PartitionManager partitionManager = getConfigManager().getPartitionManager(); partitionManager.getRegionMaintainer().submit(partitionManager::maintainRegionReplicas); - if (isSucceed) { + if (results.stream() + .allMatch(result -> result.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode())) { return StatusUtils.OK; } else { - return RpcUtils.getStatus(procedureStatus); + return RpcUtils.getStatus(results); } } public TSStatus deleteTimeSeries( String queryId, PathPatternTree patternTree, boolean isGeneratedByPipe) { - long procedureId = -1; + DeleteTimeSeriesProcedure procedure = null; synchronized (this) { boolean hasOverlappedTask = false; ProcedureType type; DeleteTimeSeriesProcedure deleteTimeSeriesProcedure; - for (Procedure procedure : executor.getProcedures().values()) { - type = ProcedureFactory.getProcedureType(procedure); + for (Procedure runningProcedure : executor.getProcedures().values()) { + type = ProcedureFactory.getProcedureType(runningProcedure); if (type == null || !type.equals(ProcedureType.DELETE_TIMESERIES_PROCEDURE)) { continue; } - deleteTimeSeriesProcedure = ((DeleteTimeSeriesProcedure) procedure); + deleteTimeSeriesProcedure = ((DeleteTimeSeriesProcedure) runningProcedure); if (queryId.equals(deleteTimeSeriesProcedure.getQueryId())) { - procedureId = deleteTimeSeriesProcedure.getProcId(); + procedure = deleteTimeSeriesProcedure; break; } if (patternTree.isOverlapWith(deleteTimeSeriesProcedure.getPatternTree())) { @@ -303,44 +304,36 @@ public TSStatus deleteTimeSeries( } } - if (procedureId == -1) { + if (procedure == null) { if (hasOverlappedTask) { return RpcUtils.getStatus( TSStatusCode.OVERLAP_WITH_EXISTING_TASK, "Some other task is deleting some target timeseries."); } - procedureId = - this.executor.submitProcedure( - new DeleteTimeSeriesProcedure(queryId, patternTree, isGeneratedByPipe)); + procedure = new DeleteTimeSeriesProcedure(queryId, patternTree, isGeneratedByPipe); + this.executor.submitProcedure(procedure); } } - List procedureStatus = new ArrayList<>(); - boolean isSucceed = - waitingProcedureFinished(Collections.singletonList(procedureId), procedureStatus); - if (isSucceed) { - return StatusUtils.OK; - } else { - return procedureStatus.get(0); - } + return waitingProcedureFinished(procedure); } public TSStatus deleteLogicalView(TDeleteLogicalViewReq req) { String queryId = req.getQueryId(); PathPatternTree patternTree = PathPatternTree.deserialize(ByteBuffer.wrap(req.getPathPatternTree())); - long procedureId = -1; + DeleteLogicalViewProcedure procedure = null; synchronized (this) { boolean hasOverlappedTask = false; ProcedureType type; DeleteLogicalViewProcedure deleteLogicalViewProcedure; - for (Procedure procedure : executor.getProcedures().values()) { - type = ProcedureFactory.getProcedureType(procedure); + for (Procedure runningProcedure : executor.getProcedures().values()) { + type = ProcedureFactory.getProcedureType(runningProcedure); if (type == null || !type.equals(ProcedureType.DELETE_LOGICAL_VIEW_PROCEDURE)) { continue; } - deleteLogicalViewProcedure = ((DeleteLogicalViewProcedure) procedure); + deleteLogicalViewProcedure = ((DeleteLogicalViewProcedure) runningProcedure); if (queryId.equals(deleteLogicalViewProcedure.getQueryId())) { - procedureId = deleteLogicalViewProcedure.getProcId(); + procedure = deleteLogicalViewProcedure; break; } if (patternTree.isOverlapWith(deleteLogicalViewProcedure.getPatternTree())) { @@ -349,28 +342,19 @@ public TSStatus deleteLogicalView(TDeleteLogicalViewReq req) { } } - if (procedureId == -1) { + if (procedure == null) { if (hasOverlappedTask) { return RpcUtils.getStatus( TSStatusCode.OVERLAP_WITH_EXISTING_TASK, "Some other task is deleting some target views."); } - procedureId = - this.executor.submitProcedure( - new DeleteLogicalViewProcedure( - queryId, - patternTree, - req.isSetIsGeneratedByPipe() && req.isIsGeneratedByPipe())); + procedure = + new DeleteLogicalViewProcedure( + queryId, patternTree, req.isSetIsGeneratedByPipe() && req.isIsGeneratedByPipe()); + this.executor.submitProcedure(procedure); } } - List procedureStatus = new ArrayList<>(); - boolean isSucceed = - waitingProcedureFinished(Collections.singletonList(procedureId), procedureStatus); - if (isSucceed) { - return StatusUtils.OK; - } else { - return procedureStatus.get(0); - } + return waitingProcedureFinished(procedure); } public TSStatus alterLogicalView(TAlterLogicalViewReq req) { @@ -386,56 +370,49 @@ public TSStatus alterLogicalView(TAlterLogicalViewReq req) { viewPathToSourceMap.put(path, viewExpression); } - long procedureId = -1; + AlterLogicalViewProcedure procedure = null; synchronized (this) { ProcedureType type; AlterLogicalViewProcedure alterLogicalViewProcedure; - for (Procedure procedure : executor.getProcedures().values()) { - type = ProcedureFactory.getProcedureType(procedure); + for (Procedure runningProcedure : executor.getProcedures().values()) { + type = ProcedureFactory.getProcedureType(runningProcedure); if (type == null || !type.equals(ProcedureType.ALTER_LOGICAL_VIEW_PROCEDURE)) { continue; } - alterLogicalViewProcedure = ((AlterLogicalViewProcedure) procedure); + alterLogicalViewProcedure = ((AlterLogicalViewProcedure) runningProcedure); if (queryId.equals(alterLogicalViewProcedure.getQueryId())) { - procedureId = alterLogicalViewProcedure.getProcId(); + procedure = alterLogicalViewProcedure; break; } } - if (procedureId == -1) { - procedureId = - this.executor.submitProcedure( - new AlterLogicalViewProcedure( - queryId, - viewPathToSourceMap, - req.isSetIsGeneratedByPipe() && req.isIsGeneratedByPipe())); + if (procedure == null) { + procedure = + new AlterLogicalViewProcedure( + queryId, + viewPathToSourceMap, + req.isSetIsGeneratedByPipe() && req.isIsGeneratedByPipe()); + this.executor.submitProcedure(procedure); } } - List procedureStatus = new ArrayList<>(); - boolean isSucceed = - waitingProcedureFinished(Collections.singletonList(procedureId), procedureStatus); - if (isSucceed) { - return StatusUtils.OK; - } else { - return procedureStatus.get(0); - } + return waitingProcedureFinished(procedure); } public TSStatus setSchemaTemplate( String queryId, String templateName, String templateSetPath, boolean isGeneratedByPipe) { - long procedureId = -1; + SetTemplateProcedure procedure = null; synchronized (this) { boolean hasOverlappedTask = false; ProcedureType type; SetTemplateProcedure setTemplateProcedure; - for (Procedure procedure : executor.getProcedures().values()) { - type = ProcedureFactory.getProcedureType(procedure); + for (Procedure runningProcedure : executor.getProcedures().values()) { + type = ProcedureFactory.getProcedureType(runningProcedure); if (type == null || !type.equals(ProcedureType.SET_TEMPLATE_PROCEDURE)) { continue; } - setTemplateProcedure = (SetTemplateProcedure) procedure; + setTemplateProcedure = (SetTemplateProcedure) runningProcedure; if (queryId.equals(setTemplateProcedure.getQueryId())) { - procedureId = setTemplateProcedure.getProcId(); + procedure = setTemplateProcedure; break; } if (templateSetPath.equals(setTemplateProcedure.getTemplateSetPath())) { @@ -444,43 +421,35 @@ public TSStatus setSchemaTemplate( } } - if (procedureId == -1) { + if (procedure == null) { if (hasOverlappedTask) { return RpcUtils.getStatus( TSStatusCode.OVERLAP_WITH_EXISTING_TASK, "Some other task is setting template on target path."); } - procedureId = - this.executor.submitProcedure( - new SetTemplateProcedure( - queryId, templateName, templateSetPath, isGeneratedByPipe)); + procedure = + new SetTemplateProcedure(queryId, templateName, templateSetPath, isGeneratedByPipe); + this.executor.submitProcedure(procedure); } } - List procedureStatus = new ArrayList<>(); - boolean isSucceed = - waitingProcedureFinished(Collections.singletonList(procedureId), procedureStatus); - if (isSucceed) { - return StatusUtils.OK; - } else { - return procedureStatus.get(0); - } + return waitingProcedureFinished(procedure); } public TSStatus deactivateTemplate( String queryId, Map> templateSetInfo, boolean isGeneratedByPipe) { - long procedureId = -1; + DeactivateTemplateProcedure procedure = null; synchronized (this) { boolean hasOverlappedTask = false; ProcedureType type; DeactivateTemplateProcedure deactivateTemplateProcedure; - for (Procedure procedure : executor.getProcedures().values()) { - type = ProcedureFactory.getProcedureType(procedure); + for (Procedure runningProcedure : executor.getProcedures().values()) { + type = ProcedureFactory.getProcedureType(runningProcedure); if (type == null || !type.equals(ProcedureType.DEACTIVATE_TEMPLATE_PROCEDURE)) { continue; } - deactivateTemplateProcedure = (DeactivateTemplateProcedure) procedure; + deactivateTemplateProcedure = (DeactivateTemplateProcedure) runningProcedure; if (queryId.equals(deactivateTemplateProcedure.getQueryId())) { - procedureId = deactivateTemplateProcedure.getProcId(); + procedure = deactivateTemplateProcedure; break; } for (PartialPath pattern : templateSetInfo.keySet()) { @@ -500,42 +469,34 @@ public TSStatus deactivateTemplate( } } - if (procedureId == -1) { + if (procedure == null) { if (hasOverlappedTask) { return RpcUtils.getStatus( TSStatusCode.OVERLAP_WITH_EXISTING_TASK, "Some other task is deactivating some target template from target path."); } - procedureId = - this.executor.submitProcedure( - new DeactivateTemplateProcedure(queryId, templateSetInfo, isGeneratedByPipe)); + procedure = new DeactivateTemplateProcedure(queryId, templateSetInfo, isGeneratedByPipe); + this.executor.submitProcedure(procedure); } } - List procedureStatus = new ArrayList<>(); - boolean isSucceed = - waitingProcedureFinished(Collections.singletonList(procedureId), procedureStatus); - if (isSucceed) { - return StatusUtils.OK; - } else { - return procedureStatus.get(0); - } + return waitingProcedureFinished(procedure); } public TSStatus unsetSchemaTemplate( String queryId, Template template, PartialPath path, boolean isGeneratedByPipe) { - long procedureId = -1; + UnsetTemplateProcedure procedure = null; synchronized (this) { boolean hasOverlappedTask = false; ProcedureType type; UnsetTemplateProcedure unsetTemplateProcedure; - for (Procedure procedure : executor.getProcedures().values()) { - type = ProcedureFactory.getProcedureType(procedure); + for (Procedure runningProcedure : executor.getProcedures().values()) { + type = ProcedureFactory.getProcedureType(runningProcedure); if (type == null || !type.equals(ProcedureType.UNSET_TEMPLATE_PROCEDURE)) { continue; } - unsetTemplateProcedure = (UnsetTemplateProcedure) procedure; + unsetTemplateProcedure = (UnsetTemplateProcedure) runningProcedure; if (queryId.equals(unsetTemplateProcedure.getQueryId())) { - procedureId = unsetTemplateProcedure.getProcId(); + procedure = unsetTemplateProcedure; break; } if (template.getId() == unsetTemplateProcedure.getTemplateId() @@ -545,26 +506,18 @@ public TSStatus unsetSchemaTemplate( } } - if (procedureId == -1) { + if (procedure == null) { if (hasOverlappedTask) { return RpcUtils.getStatus( TSStatusCode.OVERLAP_WITH_EXISTING_TASK, "Some other task is unsetting target template from target path " + path.getFullPath()); } - procedureId = - this.executor.submitProcedure( - new UnsetTemplateProcedure(queryId, template, path, isGeneratedByPipe)); + procedure = new UnsetTemplateProcedure(queryId, template, path, isGeneratedByPipe); + this.executor.submitProcedure(procedure); } } - List procedureStatus = new ArrayList<>(); - boolean isSucceed = - waitingProcedureFinished(Collections.singletonList(procedureId), procedureStatus); - if (isSucceed) { - return StatusUtils.OK; - } else { - return procedureStatus.get(0); - } + return waitingProcedureFinished(procedure); } /** @@ -960,17 +913,15 @@ public TSStatus migrateRegion(TMigrateRegionReq migrateRegionReq) { */ public TSStatus createRegionGroups( TConsensusGroupType consensusGroupType, CreateRegionGroupsPlan createRegionGroupsPlan) { - final long procedureId = - executor.submitProcedure( - new CreateRegionGroupsProcedure(consensusGroupType, createRegionGroupsPlan)); - final List statusList = new ArrayList<>(); - final boolean isSucceed = - waitingProcedureFinished(Collections.singletonList(procedureId), statusList); - if (isSucceed) { - return RpcUtils.SUCCESS_STATUS; + CreateRegionGroupsProcedure procedure = + new CreateRegionGroupsProcedure(consensusGroupType, createRegionGroupsPlan); + executor.submitProcedure(procedure); + TSStatus status = waitingProcedureFinished(procedure); + if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + return status; } else { return new TSStatus(TSStatusCode.CREATE_REGION_ERROR.getStatusCode()) - .setMessage(statusList.get(0).getMessage()); + .setMessage(status.getMessage()); } } @@ -998,15 +949,13 @@ && new UpdateProcedurePlan(createTriggerProcedure).getSerializedSize() > planSiz .setMessage(e.getMessage()); } - final long procedureId = executor.submitProcedure(createTriggerProcedure); - final List statusList = new ArrayList<>(); - final boolean isSucceed = - waitingProcedureFinished(Collections.singletonList(procedureId), statusList); - if (isSucceed) { - return RpcUtils.SUCCESS_STATUS; + executor.submitProcedure(createTriggerProcedure); + TSStatus status = waitingProcedureFinished(createTriggerProcedure); + if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + return status; } else { return new TSStatus(TSStatusCode.CREATE_TRIGGER_ERROR.getStatusCode()) - .setMessage(statusList.get(0).getMessage()); + .setMessage(status.getMessage()); } } @@ -1017,25 +966,21 @@ && new UpdateProcedurePlan(createTriggerProcedure).getSerializedSize() > planSiz * {@link TSStatusCode#DROP_TRIGGER_ERROR} otherwise */ public TSStatus dropTrigger(String triggerName, boolean isGeneratedByPipe) { - long procedureId = - executor.submitProcedure(new DropTriggerProcedure(triggerName, isGeneratedByPipe)); - final List statusList = new ArrayList<>(); - final boolean isSucceed = - waitingProcedureFinished(Collections.singletonList(procedureId), statusList); - if (isSucceed) { - return RpcUtils.SUCCESS_STATUS; + DropTriggerProcedure procedure = new DropTriggerProcedure(triggerName, isGeneratedByPipe); + executor.submitProcedure(procedure); + TSStatus status = waitingProcedureFinished(procedure); + if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + return status; } else { return new TSStatus(TSStatusCode.DROP_TRIGGER_ERROR.getStatusCode()) - .setMessage(statusList.get(0).getMessage()); + .setMessage(status.getMessage()); } } public TSStatus createCQ(TCreateCQReq req, ScheduledExecutorService scheduledExecutor) { - final long procedureId = - executor.submitProcedure(new CreateCQProcedure(req, scheduledExecutor)); - final List statusList = new ArrayList<>(); - waitingProcedureFinished(Collections.singletonList(procedureId), statusList); - return statusList.get(0); + CreateCQProcedure procedure = new CreateCQProcedure(req, scheduledExecutor); + executor.submitProcedure(procedure); + return waitingProcedureFinished(procedure); } public TSStatus createModel(String modelName, String uri) { @@ -1045,15 +990,14 @@ public TSStatus createModel(String modelName, String uri) { } public TSStatus dropModel(String modelId) { - long procedureId = executor.submitProcedure(new DropModelProcedure(modelId)); - List statusList = new ArrayList<>(); - boolean isSucceed = - waitingProcedureFinished(Collections.singletonList(procedureId), statusList); - if (isSucceed) { - return RpcUtils.SUCCESS_STATUS; + DropModelProcedure procedure = new DropModelProcedure(modelId); + executor.submitProcedure(procedure); + TSStatus status = waitingProcedureFinished(procedure); + if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + return status; } else { return new TSStatus(TSStatusCode.DROP_MODEL_ERROR.getStatusCode()) - .setMessage(statusList.get(0).getMessage()); + .setMessage(status.getMessage()); } } @@ -1076,38 +1020,35 @@ && new UpdateProcedurePlan(createPipePluginProcedure).getSerializedSize() .setMessage(e.getMessage()); } - final long procedureId = executor.submitProcedure(createPipePluginProcedure); - final List statusList = new ArrayList<>(); - final boolean isSucceed = - waitingProcedureFinished(Collections.singletonList(procedureId), statusList); - if (isSucceed) { - return RpcUtils.SUCCESS_STATUS; + executor.submitProcedure(createPipePluginProcedure); + TSStatus status = waitingProcedureFinished(createPipePluginProcedure); + if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + return status; } else { return new TSStatus(TSStatusCode.CREATE_PIPE_PLUGIN_ERROR.getStatusCode()) - .setMessage(statusList.get(0).getMessage()); + .setMessage(status.getMessage()); } } public TSStatus dropPipePlugin(TDropPipePluginReq req) { - final long procedureId = - executor.submitProcedure( - new DropPipePluginProcedure( - req.getPluginName(), req.isSetIfExistsCondition() && req.isIfExistsCondition())); - final List statusList = new ArrayList<>(); - final boolean isSucceed = - waitingProcedureFinished(Collections.singletonList(procedureId), statusList); - if (isSucceed) { - return RpcUtils.SUCCESS_STATUS; + DropPipePluginProcedure procedure = + new DropPipePluginProcedure( + req.getPluginName(), req.isSetIfExistsCondition() && req.isIfExistsCondition()); + executor.submitProcedure(procedure); + TSStatus status = waitingProcedureFinished(procedure); + if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + return status; } else { return new TSStatus(TSStatusCode.DROP_PIPE_PLUGIN_ERROR.getStatusCode()) - .setMessage(statusList.get(0).getMessage()); + .setMessage(status.getMessage()); } } public TSStatus createConsensusPipe(TCreatePipeReq req) { try { - final long procedureId = executor.submitProcedure(new CreatePipeProcedureV2(req)); - return handleConsensusPipeProcedure(procedureId); + CreatePipeProcedureV2 procedure = new CreatePipeProcedureV2(req); + executor.submitProcedure(procedure); + return handleConsensusPipeProcedure(procedure); } catch (Exception e) { return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode()).setMessage(e.getMessage()); } @@ -1115,15 +1056,14 @@ public TSStatus createConsensusPipe(TCreatePipeReq req) { public TSStatus createPipe(TCreatePipeReq req) { try { - final long procedureId = executor.submitProcedure(new CreatePipeProcedureV2(req)); - final List statusList = new ArrayList<>(); - final boolean isSucceed = - waitingProcedureFinished(Collections.singletonList(procedureId), statusList); - if (isSucceed) { - return statusList.get(0); + CreatePipeProcedureV2 procedure = new CreatePipeProcedureV2(req); + executor.submitProcedure(procedure); + TSStatus status = waitingProcedureFinished(procedure); + if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + return status; } else { return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode()) - .setMessage(wrapTimeoutMessageForPipeProcedure(statusList.get(0).getMessage())); + .setMessage(wrapTimeoutMessageForPipeProcedure(status.getMessage())); } } catch (Exception e) { return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode()).setMessage(e.getMessage()); @@ -1132,15 +1072,14 @@ public TSStatus createPipe(TCreatePipeReq req) { public TSStatus alterPipe(TAlterPipeReq req) { try { - final long procedureId = executor.submitProcedure(new AlterPipeProcedureV2(req)); - final List statusList = new ArrayList<>(); - final boolean isSucceed = - waitingProcedureFinished(Collections.singletonList(procedureId), statusList); - if (isSucceed) { - return statusList.get(0); + AlterPipeProcedureV2 procedure = new AlterPipeProcedureV2(req); + executor.submitProcedure(procedure); + TSStatus status = waitingProcedureFinished(procedure); + if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + return status; } else { return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode()) - .setMessage(wrapTimeoutMessageForPipeProcedure(statusList.get(0).getMessage())); + .setMessage(wrapTimeoutMessageForPipeProcedure(status.getMessage())); } } catch (Exception e) { return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode()).setMessage(e.getMessage()); @@ -1149,8 +1088,9 @@ public TSStatus alterPipe(TAlterPipeReq req) { public TSStatus startConsensusPipe(String pipeName) { try { - final long procedureId = executor.submitProcedure(new StartPipeProcedureV2(pipeName)); - return handleConsensusPipeProcedure(procedureId); + StartPipeProcedureV2 procedure = new StartPipeProcedureV2(pipeName); + executor.submitProcedure(procedure); + return handleConsensusPipeProcedure(procedure); } catch (Exception e) { return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode()).setMessage(e.getMessage()); } @@ -1158,15 +1098,14 @@ public TSStatus startConsensusPipe(String pipeName) { public TSStatus startPipe(String pipeName) { try { - final long procedureId = executor.submitProcedure(new StartPipeProcedureV2(pipeName)); - final List statusList = new ArrayList<>(); - final boolean isSucceed = - waitingProcedureFinished(Collections.singletonList(procedureId), statusList); - if (isSucceed) { - return statusList.get(0); + StartPipeProcedureV2 procedure = new StartPipeProcedureV2(pipeName); + executor.submitProcedure(procedure); + TSStatus status = waitingProcedureFinished(procedure); + if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + return status; } else { return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode()) - .setMessage(wrapTimeoutMessageForPipeProcedure(statusList.get(0).getMessage())); + .setMessage(wrapTimeoutMessageForPipeProcedure(status.getMessage())); } } catch (Exception e) { return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode()).setMessage(e.getMessage()); @@ -1175,8 +1114,9 @@ public TSStatus startPipe(String pipeName) { public TSStatus stopConsensusPipe(String pipeName) { try { - final long procedureId = executor.submitProcedure(new StopPipeProcedureV2(pipeName)); - return handleConsensusPipeProcedure(procedureId); + StopPipeProcedureV2 procedure = new StopPipeProcedureV2(pipeName); + executor.submitProcedure(procedure); + return handleConsensusPipeProcedure(procedure); } catch (Exception e) { return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode()).setMessage(e.getMessage()); } @@ -1184,15 +1124,14 @@ public TSStatus stopConsensusPipe(String pipeName) { public TSStatus stopPipe(String pipeName) { try { - final long procedureId = executor.submitProcedure(new StopPipeProcedureV2(pipeName)); - final List statusList = new ArrayList<>(); - final boolean isSucceed = - waitingProcedureFinished(Collections.singletonList(procedureId), statusList); - if (isSucceed) { - return statusList.get(0); + StopPipeProcedureV2 procedure = new StopPipeProcedureV2(pipeName); + executor.submitProcedure(procedure); + TSStatus status = waitingProcedureFinished(procedure); + if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + return status; } else { return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode()) - .setMessage(wrapTimeoutMessageForPipeProcedure(statusList.get(0).getMessage())); + .setMessage(wrapTimeoutMessageForPipeProcedure(status.getMessage())); } } catch (Exception e) { return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode()).setMessage(e.getMessage()); @@ -1201,8 +1140,9 @@ public TSStatus stopPipe(String pipeName) { public TSStatus dropConsensusPipe(String pipeName) { try { - final long procedureId = executor.submitProcedure(new DropPipeProcedureV2(pipeName)); - return handleConsensusPipeProcedure(procedureId); + DropPipeProcedureV2 procedure = new DropPipeProcedureV2(pipeName); + executor.submitProcedure(procedure); + return handleConsensusPipeProcedure(procedure); } catch (Exception e) { return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode()).setMessage(e.getMessage()); } @@ -1210,35 +1150,32 @@ public TSStatus dropConsensusPipe(String pipeName) { public TSStatus dropPipe(String pipeName) { try { - final long procedureId = executor.submitProcedure(new DropPipeProcedureV2(pipeName)); - final List statusList = new ArrayList<>(); - final boolean isSucceed = - waitingProcedureFinished(Collections.singletonList(procedureId), statusList); - if (isSucceed) { - return statusList.get(0); + DropPipeProcedureV2 procedure = new DropPipeProcedureV2(pipeName); + executor.submitProcedure(procedure); + TSStatus status = waitingProcedureFinished(procedure); + if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + return status; } else { return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode()) - .setMessage(wrapTimeoutMessageForPipeProcedure(statusList.get(0).getMessage())); + .setMessage(wrapTimeoutMessageForPipeProcedure(status.getMessage())); } } catch (Exception e) { return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode()).setMessage(e.getMessage()); } } - private TSStatus handleConsensusPipeProcedure(final long procedureId) { - final List statusList = new ArrayList<>(); - final boolean isSucceed = - waitingProcedureFinished(Collections.singletonList(procedureId), statusList); - if (isSucceed) { - return statusList.get(0); + private TSStatus handleConsensusPipeProcedure(Procedure procedure) { + TSStatus status = waitingProcedureFinished(procedure); + if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + return status; } else { // if time out, optimistically believe that this procedure will execute successfully. - if (statusList.get(0).getMessage().equals(PROCEDURE_TIMEOUT_MESSAGE)) { + if (status.getMessage().equals(PROCEDURE_TIMEOUT_MESSAGE)) { return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); } // otherwise, some exceptions must have occurred, throw them. return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode()) - .setMessage(wrapTimeoutMessageForPipeProcedure(statusList.get(0).getMessage())); + .setMessage(wrapTimeoutMessageForPipeProcedure(status.getMessage())); } } @@ -1270,18 +1207,16 @@ public void pipeHandleMetaChange( public TSStatus pipeHandleMetaChangeWithBlock( boolean needWriteConsensusOnConfigNodes, boolean needPushPipeMetaToDataNodes) { try { - final long procedureId = - executor.submitProcedure( - new PipeHandleMetaChangeProcedure( - needWriteConsensusOnConfigNodes, needPushPipeMetaToDataNodes)); - final List statusList = new ArrayList<>(); - final boolean isSucceed = - waitingProcedureFinished(Collections.singletonList(procedureId), statusList); - if (isSucceed) { - return RpcUtils.SUCCESS_STATUS; + PipeHandleMetaChangeProcedure procedure = + new PipeHandleMetaChangeProcedure( + needWriteConsensusOnConfigNodes, needPushPipeMetaToDataNodes); + executor.submitProcedure(procedure); + TSStatus status = waitingProcedureFinished(procedure); + if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + return status; } else { return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode()) - .setMessage(wrapTimeoutMessageForPipeProcedure(statusList.get(0).getMessage())); + .setMessage(wrapTimeoutMessageForPipeProcedure(status.getMessage())); } } catch (Exception e) { return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode()).setMessage(e.getMessage()); @@ -1290,15 +1225,14 @@ public TSStatus pipeHandleMetaChangeWithBlock( public TSStatus pipeMetaSync() { try { - final long procedureId = executor.submitProcedure(new PipeMetaSyncProcedure()); - final List statusList = new ArrayList<>(); - final boolean isSucceed = - waitingProcedureFinished(Collections.singletonList(procedureId), statusList); - if (isSucceed) { - return RpcUtils.SUCCESS_STATUS; + PipeMetaSyncProcedure procedure = new PipeMetaSyncProcedure(); + executor.submitProcedure(procedure); + TSStatus status = waitingProcedureFinished(procedure); + if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + return status; } else { return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode()) - .setMessage(wrapTimeoutMessageForPipeProcedure(statusList.get(0).getMessage())); + .setMessage(wrapTimeoutMessageForPipeProcedure(status.getMessage())); } } catch (Exception e) { return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode()).setMessage(e.getMessage()); @@ -1307,15 +1241,14 @@ public TSStatus pipeMetaSync() { public TSStatus createTopic(TCreateTopicReq req) { try { - final long procedureId = executor.submitProcedure(new CreateTopicProcedure(req)); - final List statusList = new ArrayList<>(); - final boolean isSucceed = - waitingProcedureFinished(Collections.singletonList(procedureId), statusList); - if (isSucceed) { - return statusList.get(0); + CreateTopicProcedure procedure = new CreateTopicProcedure(req); + executor.submitProcedure(procedure); + TSStatus status = waitingProcedureFinished(procedure); + if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + return status; } else { return new TSStatus(TSStatusCode.CREATE_TOPIC_ERROR.getStatusCode()) - .setMessage(wrapTimeoutMessageForPipeProcedure(statusList.get(0).getMessage())); + .setMessage(wrapTimeoutMessageForPipeProcedure(status.getMessage())); } } catch (Exception e) { return new TSStatus(TSStatusCode.CREATE_TOPIC_ERROR.getStatusCode()) @@ -1325,15 +1258,14 @@ public TSStatus createTopic(TCreateTopicReq req) { public TSStatus dropTopic(String topicName) { try { - final long procedureId = executor.submitProcedure(new DropTopicProcedure(topicName)); - final List statusList = new ArrayList<>(); - final boolean isSucceed = - waitingProcedureFinished(Collections.singletonList(procedureId), statusList); - if (isSucceed) { - return statusList.get(0); + DropTopicProcedure procedure = new DropTopicProcedure(topicName); + executor.submitProcedure(procedure); + TSStatus status = waitingProcedureFinished(procedure); + if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + return status; } else { return new TSStatus(TSStatusCode.DROP_TOPIC_ERROR.getStatusCode()) - .setMessage(wrapTimeoutMessageForPipeProcedure(statusList.get(0).getMessage())); + .setMessage(wrapTimeoutMessageForPipeProcedure(status.getMessage())); } } catch (Exception e) { return new TSStatus(TSStatusCode.DROP_TOPIC_ERROR.getStatusCode()).setMessage(e.getMessage()); @@ -1342,15 +1274,14 @@ public TSStatus dropTopic(String topicName) { public TSStatus topicMetaSync() { try { - final long procedureId = executor.submitProcedure(new TopicMetaSyncProcedure()); - final List statusList = new ArrayList<>(); - final boolean isSucceed = - waitingProcedureFinished(Collections.singletonList(procedureId), statusList); - if (isSucceed) { - return RpcUtils.SUCCESS_STATUS; + TopicMetaSyncProcedure procedure = new TopicMetaSyncProcedure(); + executor.submitProcedure(procedure); + TSStatus status = waitingProcedureFinished(procedure); + if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + return status; } else { return new TSStatus(TSStatusCode.TOPIC_PUSH_META_ERROR.getStatusCode()) - .setMessage(wrapTimeoutMessageForPipeProcedure(statusList.get(0).getMessage())); + .setMessage(wrapTimeoutMessageForPipeProcedure(status.getMessage())); } } catch (Exception e) { return new TSStatus(TSStatusCode.TOPIC_PUSH_META_ERROR.getStatusCode()) @@ -1360,15 +1291,14 @@ public TSStatus topicMetaSync() { public TSStatus createConsumer(TCreateConsumerReq req) { try { - final long procedureId = executor.submitProcedure(new CreateConsumerProcedure(req)); - final List statusList = new ArrayList<>(); - final boolean isSucceed = - waitingProcedureFinished(Collections.singletonList(procedureId), statusList); - if (isSucceed) { - return statusList.get(0); + CreateConsumerProcedure procedure = new CreateConsumerProcedure(req); + executor.submitProcedure(procedure); + TSStatus status = waitingProcedureFinished(procedure); + if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + return status; } else { return new TSStatus(TSStatusCode.CREATE_CONSUMER_ERROR.getStatusCode()) - .setMessage(wrapTimeoutMessageForPipeProcedure(statusList.get(0).getMessage())); + .setMessage(wrapTimeoutMessageForPipeProcedure(status.getMessage())); } } catch (Exception e) { return new TSStatus(TSStatusCode.CREATE_CONSUMER_ERROR.getStatusCode()) @@ -1378,15 +1308,14 @@ public TSStatus createConsumer(TCreateConsumerReq req) { public TSStatus dropConsumer(TCloseConsumerReq req) { try { - final long procedureId = executor.submitProcedure(new DropConsumerProcedure(req)); - final List statusList = new ArrayList<>(); - final boolean isSucceed = - waitingProcedureFinished(Collections.singletonList(procedureId), statusList); - if (isSucceed) { - return statusList.get(0); + DropConsumerProcedure procedure = new DropConsumerProcedure(req); + executor.submitProcedure(procedure); + TSStatus status = waitingProcedureFinished(procedure); + if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + return status; } else { return new TSStatus(TSStatusCode.DROP_CONSUMER_ERROR.getStatusCode()) - .setMessage(wrapTimeoutMessageForPipeProcedure(statusList.get(0).getMessage())); + .setMessage(wrapTimeoutMessageForPipeProcedure(status.getMessage())); } } catch (Exception e) { return new TSStatus(TSStatusCode.DROP_CONSUMER_ERROR.getStatusCode()) @@ -1396,15 +1325,14 @@ public TSStatus dropConsumer(TCloseConsumerReq req) { public TSStatus consumerGroupMetaSync() { try { - final long procedureId = executor.submitProcedure(new ConsumerGroupMetaSyncProcedure()); - final List statusList = new ArrayList<>(); - final boolean isSucceed = - waitingProcedureFinished(Collections.singletonList(procedureId), statusList); - if (isSucceed) { - return RpcUtils.SUCCESS_STATUS; + ConsumerGroupMetaSyncProcedure procedure = new ConsumerGroupMetaSyncProcedure(); + executor.submitProcedure(procedure); + TSStatus status = waitingProcedureFinished(procedure); + if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + return status; } else { return new TSStatus(TSStatusCode.CONSUMER_PUSH_META_ERROR.getStatusCode()) - .setMessage(wrapTimeoutMessageForPipeProcedure(statusList.get(0).getMessage())); + .setMessage(wrapTimeoutMessageForPipeProcedure(status.getMessage())); } } catch (Exception e) { return new TSStatus(TSStatusCode.CONSUMER_PUSH_META_ERROR.getStatusCode()) @@ -1414,17 +1342,16 @@ public TSStatus consumerGroupMetaSync() { public TSStatus createSubscription(TSubscribeReq req) { try { - final long procedureId = executor.submitProcedure(new CreateSubscriptionProcedure(req)); - final List statusList = new ArrayList<>(); - final boolean isSucceed = - waitingProcedureFinished(Collections.singletonList(procedureId), statusList); - if (isSucceed) { - return statusList.get(0); - } else if (PROCEDURE_TIMEOUT_MESSAGE.equals(statusList.get(0).getMessage())) { + CreateSubscriptionProcedure procedure = new CreateSubscriptionProcedure(req); + executor.submitProcedure(procedure); + TSStatus status = waitingProcedureFinished(procedure); + if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + return status; + } else if (PROCEDURE_TIMEOUT_MESSAGE.equals(status.getMessage())) { // we assume that a timeout has occurred in the procedure related to the pipe in the // subscription procedure return new TSStatus(TSStatusCode.SUBSCRIPTION_PIPE_TIMEOUT_ERROR.getStatusCode()) - .setMessage(wrapTimeoutMessageForPipeProcedure(statusList.get(0).getMessage())); + .setMessage(wrapTimeoutMessageForPipeProcedure(status.getMessage())); } else { return new TSStatus(TSStatusCode.SUBSCRIPTION_SUBSCRIBE_ERROR.getStatusCode()); } @@ -1436,17 +1363,16 @@ public TSStatus createSubscription(TSubscribeReq req) { public TSStatus dropSubscription(TUnsubscribeReq req) { try { - final long procedureId = executor.submitProcedure(new DropSubscriptionProcedure(req)); - final List statusList = new ArrayList<>(); - final boolean isSucceed = - waitingProcedureFinished(Collections.singletonList(procedureId), statusList); - if (isSucceed) { - return statusList.get(0); - } else if (PROCEDURE_TIMEOUT_MESSAGE.equals(statusList.get(0).getMessage())) { + DropSubscriptionProcedure procedure = new DropSubscriptionProcedure(req); + executor.submitProcedure(procedure); + TSStatus status = waitingProcedureFinished(procedure); + if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + return status; + } else if (PROCEDURE_TIMEOUT_MESSAGE.equals(status.getMessage())) { // we assume that a timeout has occurred in the procedure related to the pipe in the // subscription procedure return new TSStatus(TSStatusCode.SUBSCRIPTION_PIPE_TIMEOUT_ERROR.getStatusCode()) - .setMessage(wrapTimeoutMessageForPipeProcedure(statusList.get(0).getMessage())); + .setMessage(wrapTimeoutMessageForPipeProcedure(status.getMessage())); } else { return new TSStatus(TSStatusCode.SUBSCRIPTION_UNSUBSCRIBE_ERROR.getStatusCode()); } @@ -1459,16 +1385,10 @@ public TSStatus dropSubscription(TUnsubscribeReq req) { public TSStatus operateAuthPlan( AuthorPlan authorPlan, List dns, boolean isGeneratedByPipe) { try { - final long procedureId = - executor.submitProcedure(new AuthOperationProcedure(authorPlan, dns, isGeneratedByPipe)); - final List statusList = new ArrayList<>(); - final boolean isSucceed = - waitingProcedureFinished(Collections.singletonList(procedureId), statusList); - if (isSucceed) { - return RpcUtils.SUCCESS_STATUS; - } else { - return new TSStatus(statusList.get(0).getCode()).setMessage(statusList.get(0).getMessage()); - } + AuthOperationProcedure procedure = + new AuthOperationProcedure(authorPlan, dns, isGeneratedByPipe); + executor.submitProcedure(procedure); + return waitingProcedureFinished(procedure); } catch (Exception e) { return new TSStatus(TSStatusCode.AUTH_OPERATE_EXCEPTION.getStatusCode()) .setMessage(e.getMessage()); @@ -1476,71 +1396,60 @@ public TSStatus operateAuthPlan( } public TSStatus setTTL(SetTTLPlan setTTLPlan, final boolean isGeneratedByPipe) { - long procedureId = executor.submitProcedure(new SetTTLProcedure(setTTLPlan, isGeneratedByPipe)); - - List procedureStatus = new ArrayList<>(); - boolean isSucceed = - waitingProcedureFinished(Collections.singletonList(procedureId), procedureStatus); - if (isSucceed) { - return RpcUtils.SUCCESS_STATUS; - } else { - return procedureStatus.get(0); - } + SetTTLProcedure procedure = new SetTTLProcedure(setTTLPlan, isGeneratedByPipe); + executor.submitProcedure(procedure); + return waitingProcedureFinished(procedure); } /** - * Waiting until the specific procedures finished. + * Waiting until the specific procedure finished. * - * @param procedureIds The specific procedures' index - * @param statusList The corresponding running results of these procedures - * @return True if all Procedures finished successfully, false otherwise + * @param procedure The specific procedure + * @return TSStatus the running result of this procedure */ - private boolean waitingProcedureFinished(List procedureIds, List statusList) { - boolean isSucceed = true; - for (long procedureId : procedureIds) { - final long startTimeForCurrentProcedure = System.currentTimeMillis(); - while (executor.isRunning() - && !executor.isFinished(procedureId) - && System.currentTimeMillis() - startTimeForCurrentProcedure < PROCEDURE_WAIT_TIME_OUT) { - sleepWithoutInterrupt(PROCEDURE_WAIT_RETRY_TIMEOUT); - } - final Procedure finishedProcedure = - executor.getResultOrProcedure(procedureId); - if (!finishedProcedure.isFinished()) { - // The procedure is still executing - statusList.add( - RpcUtils.getStatus(TSStatusCode.OVERLAP_WITH_EXISTING_TASK, PROCEDURE_TIMEOUT_MESSAGE)); - isSucceed = false; - continue; - } - if (finishedProcedure.isSuccess()) { - if (Objects.nonNull(finishedProcedure.getResult())) { - statusList.add( + private TSStatus waitingProcedureFinished(Procedure procedure) { + if (procedure == null) { + LOGGER.error("Unexpected null procedure parameters for waitingProcedureFinished"); + return RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR); + } + TSStatus status; + final long startTimeForCurrentProcedure = System.currentTimeMillis(); + while (executor.isRunning() + && !executor.isFinished(procedure.getProcId()) + && System.currentTimeMillis() - startTimeForCurrentProcedure < PROCEDURE_WAIT_TIME_OUT) { + sleepWithoutInterrupt(PROCEDURE_WAIT_RETRY_TIMEOUT); + } + if (!procedure.isFinished()) { + // The procedure is still executing + status = + RpcUtils.getStatus(TSStatusCode.OVERLAP_WITH_EXISTING_TASK, PROCEDURE_TIMEOUT_MESSAGE); + } else { + if (procedure.isSuccess()) { + if (procedure.getResult() != null) { + status = RpcUtils.getStatus( - TSStatusCode.SUCCESS_STATUS, Arrays.toString(finishedProcedure.getResult()))); + TSStatusCode.SUCCESS_STATUS, Arrays.toString(procedure.getResult())); } else { - statusList.add(StatusUtils.OK); + status = StatusUtils.OK; } } else { - if (finishedProcedure.getException().getCause() instanceof IoTDBException) { - final IoTDBException e = (IoTDBException) finishedProcedure.getException().getCause(); + if (procedure.getException().getCause() instanceof IoTDBException) { + final IoTDBException e = (IoTDBException) procedure.getException().getCause(); if (e instanceof BatchProcessException) { - statusList.add( + status = RpcUtils.getStatus( Arrays.stream(((BatchProcessException) e).getFailingStatus()) - .collect(Collectors.toList()))); + .collect(Collectors.toList())); } else { - statusList.add(RpcUtils.getStatus(e.getErrorCode(), e.getMessage())); + status = RpcUtils.getStatus(e.getErrorCode(), e.getMessage()); } } else { - statusList.add( - StatusUtils.EXECUTE_STATEMENT_ERROR.setMessage( - finishedProcedure.getException().getMessage())); + status = + StatusUtils.EXECUTE_STATEMENT_ERROR.setMessage(procedure.getException().getMessage()); } - isSucceed = false; } } - return isSucceed; + return status; } private static String wrapTimeoutMessageForPipeProcedure(String message) { @@ -1667,19 +1576,18 @@ public TDeleteTableDeviceResp deleteDevices(final TDeleteTableDeviceReq req) { req.getPatternInfo(), req.getFilterInfo(), req.getModInfo()); - procedureId = this.executor.submitProcedure(procedure); + this.executor.submitProcedure(procedure); } } - executor.getResultOrProcedure(procedureId); - final List procedureStatus = new ArrayList<>(); - if (waitingProcedureFinished(Collections.singletonList(procedureId), procedureStatus)) { - if (Objects.isNull(procedure)) { - procedure = ((DeleteDevicesProcedure) executor.getResultOrProcedure(procedureId)); - } + TSStatus status = waitingProcedureFinished(procedure); + if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { return new TDeleteTableDeviceResp(StatusUtils.OK) - .setDeletedNum(Objects.nonNull(procedure) ? procedure.getDeletedDevicesNum() : -1); + .setDeletedNum( + Optional.ofNullable(procedure) + .map(DeleteDevicesProcedure::getDeletedDevicesNum) + .orElse(-1L)); } else { - return new TDeleteTableDeviceResp(procedureStatus.get(0)); + return new TDeleteTableDeviceResp(status); } } @@ -1702,13 +1610,10 @@ private TSStatus executeWithoutDuplicate( TSStatusCode.OVERLAP_WITH_EXISTING_TASK, "Some other task is operating table with same name."); } - procedureId = this.executor.submitProcedure(procedure); + this.executor.submitProcedure(procedure); } } - final List procedureStatus = new ArrayList<>(); - return waitingProcedureFinished(Collections.singletonList(procedureId), procedureStatus) - ? StatusUtils.OK - : procedureStatus.get(0); + return waitingProcedureFinished(procedure); } public Pair checkDuplicateTableTask( diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java index ffdc86a4bfd0..d1bbb3219857 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java @@ -964,30 +964,6 @@ public boolean abort(long procId) { return abort(procId, true); } - public Procedure getResult(long procId) { - CompletedProcedureContainer retainer = completed.get(procId); - if (retainer == null) { - return null; - } else { - return retainer.getProcedure(); - } - } - - /** - * Query a procedure result. - * - * @param procId procedure id - * @return procedure or retainer - */ - public Procedure getResultOrProcedure(long procId) { - CompletedProcedureContainer retainer = completed.get(procId); - if (retainer == null) { - return procedures.get(procId); - } else { - return retainer.getProcedure(); - } - } - public ProcedureScheduler getScheduler() { return scheduler; }