Skip to content

Commit

Permalink
Merge pull request #250 from rk2b-fenghaolun/develop
Browse files Browse the repository at this point in the history
fix component execute order
  • Loading branch information
rk2b-fenghaolun authored Apr 8, 2024
2 parents 276ecb9 + 511f921 commit 4e39ab6
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -171,10 +171,11 @@ public void runModelTask(ComponentTaskReq req) {
dataComponent.setComponentState(2);
req.getDataModelTask().setComponentJson(JSONObject.toJSONString(req.getDataComponents()));
dataModelPrRepository.updateDataModelTask(req.getDataModelTask());
dataComponent.setComponentState(1);
executeBeanMethod(false, dataComponentReqMap.get(dataComponent.getComponentCode()), req);
if (req.getDataTask().getTaskState().equals(TaskStateEnum.FAIL.getStateType())) {
dataComponent.setComponentState(3);
} else {
dataComponent.setComponentState(1);
}
dataComponent.setEndTime(System.currentTimeMillis());
req.getDataModelTask().setComponentJson(JSONObject.toJSONString(req.getDataComponents()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -381,21 +381,43 @@ public BaseResultEntity runTaskModel(Long modelId,Long userId) {
}
ComponentTaskReq taskReq = new ComponentTaskReq(dataModel);
DataModelAndComponentReq modelComponentReq = taskReq.getModelComponentReq();
List<DataComponentReq> modelComponents = modelComponentReq.getModelComponents();
List<String> zComponentCodeList= new ArrayList<>();
Map<String, DataComponentReq> modelComponentMap = new HashMap<>();
if (modelComponents != null && !modelComponents.isEmpty()) {
modelComponentMap = modelComponents.stream().collect(Collectors.toMap(DataComponentReq::getComponentCode, Function.identity()));
// 将组件中的`componentCode`按照组件顺序进行排序
List<DataComponentReq> zComponentList = modelComponents.stream().filter(dataComponentReq -> dataComponentReq.getInput().isEmpty()).collect(Collectors.toList());
zComponentCodeList= new ArrayList<>();
if (zComponentList.isEmpty()) {
return BaseResultEntity.failure(BaseResultEnum.DATA_RUN_TASK_FAIL,"未查询到模型组件信息");
}
// 开头的code
zComponentCodeList.add(zComponentList.get(0).getComponentCode());

// 递归方法,排序componentCode
sortComponent(zComponentList.get(0).getOutput(), zComponentCodeList, modelComponentMap);
log.info("sort后的组件顺序:{}", zComponentCodeList);
} else {
return BaseResultEntity.failure(BaseResultEnum.DATA_RUN_TASK_FAIL,"未查询到模型组件信息");
}
if (modelComponentReq==null) {
log.info("{}-{}", BaseResultEnum.DATA_RUN_TASK_FAIL.getMessage(), "组件解析失败");
return BaseResultEntity.failure(BaseResultEnum.DATA_RUN_TASK_FAIL,"组件解析失败");
}
Set<String> mandatorySet = componentsConfiguration.getModelComponents().stream()
/*Set<String> mandatorySet = componentsConfiguration.getModelComponents().stream()
.filter(modelComponent -> modelComponent.getIsMandatory() == 0)
.map(ModelComponent::getComponentCode)
.collect(Collectors.toSet());
.collect(Collectors.toSet());*/
// Set<String> reqSet = modelComponentReq.getModelComponents().stream().map(DataComponentReq::getComponentCode).collect(Collectors.toSet());
// mandatorySet.removeAll(reqSet);
// if (!mandatorySet.isEmpty()) {
// return BaseResultEntity.failure(BaseResultEnum.DATA_RUN_TASK_FAIL,"缺少必选组件,请检查组件");
// }
for (DataComponentReq modelComponent : modelComponentReq.getModelComponents()) {
BaseResultEntity baseResultEntity = dataAsyncService.executeBeanMethod(true, modelComponent, taskReq);
for (String componentCode : zComponentCodeList) {
BaseResultEntity baseResultEntity = dataAsyncService.executeBeanMethod(true, modelComponentMap.get(componentCode), taskReq);
if (baseResultEntity.getCode()!=0){
log.error("{}-[{}]", "组件执行结果失败", JSONObject.toJSONString(baseResultEntity));
return baseResultEntity;
}
}
Expand Down Expand Up @@ -424,6 +446,16 @@ public BaseResultEntity runTaskModel(Long modelId,Long userId) {
return BaseResultEntity.success(returnMap);
}

private void sortComponent(List<DataComponentRelationReq> dataComponentRelationReqs, List<String> zComponentCodeList, Map<String, DataComponentReq> componentMap) {
if (dataComponentRelationReqs.isEmpty()) {
return;
}
DataComponentRelationReq dataComponentRelationReq = dataComponentRelationReqs.get(0);
zComponentCodeList.add(dataComponentRelationReq.getComponentCode());
DataComponentReq dataComponentReq = componentMap.get(dataComponentRelationReq.getComponentCode());
sortComponent(dataComponentReq.getOutput(), zComponentCodeList, componentMap);
}

public BaseResultEntity restartTaskModel(Long taskId) {
DataTask dataTask = dataTaskRepository.selectDataTaskByTaskId(taskId);
if (dataTask==null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.primihub.biz.service.data.component.impl;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.primihub.biz.config.base.BaseConfiguration;
import com.primihub.biz.config.base.ComponentsConfiguration;
Expand Down Expand Up @@ -55,10 +56,12 @@ public BaseResultEntity runTask(DataComponentReq req, ComponentTaskReq taskReq)
List<String> ids = taskReq.getFusionResourceList().stream().map(data -> data.get("resourceId").toString()).collect(Collectors.toList());
List<ModelDerivationDto> newest = taskReq.getNewest();
log.info("ids:{}", ids);
log.info("newest:{}", JSON.toJSONString(newest));
String path = baseConfiguration.getRunModelFileUrlDirPrefix()+taskReq.getDataTask().getTaskIdName() + File.separator + "fitTransform";
Map<String, GrpcComponentDto> fitTransformEntityMap = getGrpcComponentDataSetMap(taskReq.getFusionResourceList(),path);
fitTransformEntityMap.remove(taskReq.getFreemarkerMap().get(DataConstant.PYTHON_ARBITER_DATASET));
log.info("fitTransform-1:{}", JSONObject.toJSONString(fitTransformEntityMap));
Map<String, ModelDerivationDto> oldResourceIdMap = new HashMap<>();
if (newest!=null && newest.size()!=0){
ids = new ArrayList<>();
for (ModelDerivationDto modelDerivationDto : newest) {
Expand All @@ -68,17 +71,18 @@ public BaseResultEntity runTask(DataComponentReq req, ComponentTaskReq taskReq)
}
log.info("newids:{}", ids);
log.info("fitTransform-2:{}", JSONObject.toJSONString(fitTransformEntityMap));
oldResourceIdMap = newest.stream().collect(Collectors.toMap(ModelDerivationDto::getOriginalResourceId, Function.identity()));
}
Map<String, ModelDerivationDto> oldResourceIdMap = newest.stream().collect(Collectors.toMap(ModelDerivationDto::getOriginalResourceId, Function.identity()));
log.info("freemarkerMap:{}", JSON.toJSONString(taskReq.getFreemarkerMap()));
try {
GrpcComponentDto labelDatasetDto = fitTransformEntityMap.get(taskReq.getFreemarkerMap().get(DataConstant.PYTHON_LABEL_DATASET));
if (labelDatasetDto == null) {
if (labelDatasetDto == null && oldResourceIdMap.size() > 0) {
labelDatasetDto = fitTransformEntityMap.get(oldResourceIdMap.get(taskReq.getFreemarkerMap().get(DataConstant.PYTHON_LABEL_DATASET)).getNewResourceId());
}
taskReq.getFreemarkerMap().put("new_"+DataConstant.PYTHON_LABEL_DATASET,labelDatasetDto.getNewDataSetId());
taskReq.getFreemarkerMap().put("new_"+DataConstant.PYTHON_LABEL_DATASET+"_path",labelDatasetDto.getOutputFilePath());
taskReq.getFreemarkerMap().put("new_"+DataConstant.PYTHON_LABEL_DATASET, labelDatasetDto.getNewDataSetId());
taskReq.getFreemarkerMap().put("new_"+DataConstant.PYTHON_LABEL_DATASET+"_path", labelDatasetDto.getOutputFilePath());
GrpcComponentDto guestDatasetDto = fitTransformEntityMap.get(taskReq.getFreemarkerMap().get(DataConstant.PYTHON_GUEST_DATASET));
if (guestDatasetDto == null) {
if (guestDatasetDto == null && oldResourceIdMap.size() > 0) {
guestDatasetDto = fitTransformEntityMap.get(oldResourceIdMap.get(taskReq.getFreemarkerMap().get(DataConstant.PYTHON_GUEST_DATASET)).getNewResourceId());
}
taskReq.getFreemarkerMap().put("new_"+DataConstant.PYTHON_GUEST_DATASET,guestDatasetDto.getNewDataSetId());
Expand Down

0 comments on commit 4e39ab6

Please sign in to comment.