From fabdd980449f419002961edffb6bba06600bf0f5 Mon Sep 17 00:00:00 2001 From: Like habits <30977529+likehabits@users.noreply.github.com> Date: Thu, 28 Sep 2023 14:30:05 +0800 Subject: [PATCH] September collaborate (#193) * add get service state * update get service state * update get service state * update get service state * update get service state * update get service state * update get service state * fix mpc sdk * fix psi * fix psi * fix psi * fix psi * fix fitTransform * fix psi * fix psi * fix psi * fix psi * fix psi * fix psi * fix psi * fix psi * fix pir * fix pir * fix pir * fix pir * fix pir * fix pir * fix pir * fix pir * fix pir * fix pir * fix pir * fix pir * fix pir * fix pir * fix pir * fix pir * fix pir * fix pir * fix pir * fix pir * fix pir * fix psi * fix pir --- .../sdk/task/dataenum/ModelTypeEnum.java | 28 ++-- .../factory/AbstractComponentGRPCExecute.java | 10 +- .../factory/AbstractGRPCExecuteFactory.java | 11 +- .../task/factory/AbstractKillGRPCExecute.java | 2 +- .../task/factory/AbstractPsiGRPCExecute.java | 15 +- .../sdk/task/param/TaskComponentParam.java | 10 ++ .../primihub/sdk/task/param/TaskPSIParam.java | 12 ++ .../templates/hetero_fitTransform.ftl | 76 +++++++++ .../resources/templates/homo_fitTransform.ftl | 57 +++++++ .../src/main/resources/templates/homo_lr.ftl | 2 +- .../resources/templates/homo_nn_binary.ftl | 2 +- .../application/PlatformApplication.java | 2 + .../controller/data/PirController.java | 20 ++- .../controller/data/PsiController.java | 16 +- .../controller/share/ShareDataController.java | 9 ++ .../primihub/biz/convert/DataPsiConvert.java | 13 +- .../biz/convert/DataResourceConvert.java | 10 +- .../biz/entity/data/po/DataFileField.java | 3 + .../primihub/biz/entity/data/po/DataPsi.java | 2 + .../biz/entity/data/req/DataPirTaskReq.java | 5 + .../biz/entity/data/req/DataPsiQueryReq.java | 13 ++ .../biz/entity/data/req/DataPsiReq.java | 5 + .../entity/data/vo/DataPirTaskDetailVo.java | 45 ++++++ .../biz/entity/data/vo/DataPirTaskVo.java | 2 + .../data/vo/DataPsiResourceAllocationVo.java | 2 +- .../biz/entity/data/vo/DataPsiTaskVo.java | 8 + .../biz/entity/data/vo/DataPsiVo.java | 24 ++- .../biz/entity/data/vo/DataResourceVo.java | 4 + .../secondarydb/data/DataPsiRepository.java | 5 +- .../secondarydb/data/DataTaskRepository.java | 4 + .../biz/service/data/DataAsyncService.java | 36 ++++- .../biz/service/data/DataPsiService.java | 50 ++++-- .../primihub/biz/service/data/PirService.java | 37 ++++- .../impl/DataSetComponentTaskServiceImpl.java | 1 + .../ExceptionComponentTaskServiceImpl.java | 2 + .../FitTransformComponentTaskServiceImpl.java | 147 ++++++++++++++++++ ...ntStatisticalComponentTaskServiceImpl.java | 2 + .../service/feign/FusionResourceService.java | 3 + .../biz/service/schedule/ScheduleService.java | 53 +++++++ .../biz/service/share/ShareService.java | 48 ++++++ .../biz/service/sys/SysAsyncService.java | 2 + .../biz/service/sys/SysOrganService.java | 3 + .../data/DataPsiPrRepositoryMapper.xml | 4 +- .../data/DataPsiRepositoryMapper.xml | 28 +++- .../data/DataTaskRepositoryMapper.xml | 13 ++ .../primihub/gateway/GatewayApplication.java | 2 +- primihub-service/script/ddl.sql | 1 + 47 files changed, 779 insertions(+), 70 deletions(-) create mode 100644 primihub-sdk/src/main/resources/templates/hetero_fitTransform.ftl create mode 100644 primihub-sdk/src/main/resources/templates/homo_fitTransform.ftl create mode 100644 primihub-service/biz/src/main/java/com/primihub/biz/entity/data/req/DataPsiQueryReq.java create mode 100644 primihub-service/biz/src/main/java/com/primihub/biz/entity/data/vo/DataPirTaskDetailVo.java create mode 100644 primihub-service/biz/src/main/java/com/primihub/biz/service/data/component/impl/FitTransformComponentTaskServiceImpl.java create mode 100644 primihub-service/biz/src/main/java/com/primihub/biz/service/share/ShareService.java diff --git a/primihub-sdk/src/main/java/com/primihub/sdk/task/dataenum/ModelTypeEnum.java b/primihub-sdk/src/main/java/com/primihub/sdk/task/dataenum/ModelTypeEnum.java index 30fdc1cce..f6a2daf8c 100644 --- a/primihub-sdk/src/main/java/com/primihub/sdk/task/dataenum/ModelTypeEnum.java +++ b/primihub-sdk/src/main/java/com/primihub/sdk/task/dataenum/ModelTypeEnum.java @@ -4,20 +4,21 @@ import java.util.Map; public enum ModelTypeEnum { - V_XGBOOST(2,"taskModel-v_xgboost",0,"hetero_xgb.ftl","hetero_xgb_infer.ftl"), - TRANSVERSE_LR(3,"HFL_logistic_regression",1,"homo_lr.ftl","homo_lr_infer.ftl"), - MPC_LR(4,"taskModel-mpc_lr",1,null,null), - HETERO_LR(5,"VFL_logistic_regression",0,"hetero_lr.ftl","hetero_lr_infer.ftl"), - CLASSIFICATION_BINARY(6,"taskModel-nn_classification",1,"homo_nn_binary.ftl","homo_nn_binary_infer.ftl"), - REGRESSION_BINARY(7,"taskModel-nn_regression",1,"homo_nn_binary.ftl","homo_nn_binary_infer.ftl"), - HFL_LINEAR_REGRESSION(8,"HFL_linear_regression",1,"homo_lr.ftl","homo_lr_infer.ftl"), - VFL_LINEAR_REGRESSION(9,"VFL_linear_regression",0,"hetero_lr.ftl","hetero_lr_infer.ftl"), + V_XGBOOST(2,"taskModel-v_xgboost",0,"hetero_xgb.ftl","hetero_xgb_infer.ftl","hetero_fitTransform.ftl"), + TRANSVERSE_LR(3,"HFL_logistic_regression",1,"homo_lr.ftl","homo_lr_infer.ftl","homo_fitTransform.ftl"), + MPC_LR(4,"taskModel-mpc_lr",1,null,null,null), + HETERO_LR(5,"VFL_logistic_regression",0,"hetero_lr.ftl","hetero_lr_infer.ftl","hetero_fitTransform.ftl"), + CLASSIFICATION_BINARY(6,"taskModel-nn_classification",1,"homo_nn_binary.ftl","homo_nn_binary_infer.ftl","homo_fitTransform.ftl"), + REGRESSION_BINARY(7,"taskModel-nn_regression",1,"homo_nn_binary.ftl","homo_nn_binary_infer.ftl","homo_fitTransform.ftl"), + HFL_LINEAR_REGRESSION(8,"HFL_linear_regression",1,"homo_lr.ftl","homo_lr_infer.ftl","homo_fitTransform.ftl"), + VFL_LINEAR_REGRESSION(9,"VFL_linear_regression",0,"hetero_lr.ftl","hetero_lr_infer.ftl","hetero_fitTransform.ftl"), ; private Integer type; private Integer trainType; private String typeName; private String modelFtlPath; private String inferFtlPath; + private String fitTransformFtlPath; public static Map MODEL_TYPE_MAP=new HashMap(){ { @@ -27,12 +28,13 @@ public enum ModelTypeEnum { } }; - ModelTypeEnum(Integer type, String typeName, Integer trainType, String modelFtlPath, String inferFtlPath) { + ModelTypeEnum(Integer type, String typeName, Integer trainType, String modelFtlPath, String inferFtlPath,String fitTransformFtlPath) { this.type = type; this.typeName = typeName; this.trainType = trainType; this.modelFtlPath = modelFtlPath; this.inferFtlPath = inferFtlPath; + this.fitTransformFtlPath = fitTransformFtlPath; } public String getTypeName() { @@ -74,4 +76,12 @@ public String getInferFtlPath() { public void setInferFtlPath(String inferFtlPath) { this.inferFtlPath = inferFtlPath; } + + public String getFitTransformFtlPath() { + return fitTransformFtlPath; + } + + public void setFitTransformFtlPath(String fitTransformFtlPath) { + this.fitTransformFtlPath = fitTransformFtlPath; + } } diff --git a/primihub-sdk/src/main/java/com/primihub/sdk/task/factory/AbstractComponentGRPCExecute.java b/primihub-sdk/src/main/java/com/primihub/sdk/task/factory/AbstractComponentGRPCExecute.java index b88aa3e4e..983e4d32e 100644 --- a/primihub-sdk/src/main/java/com/primihub/sdk/task/factory/AbstractComponentGRPCExecute.java +++ b/primihub-sdk/src/main/java/com/primihub/sdk/task/factory/AbstractComponentGRPCExecute.java @@ -53,10 +53,14 @@ private void runComponentTask(Channel channel, TaskParam tas taskContentParam.getFreemarkerMap().put("model",taskContentParam.getModelType().getTypeName()); } String freemarkerContent; - if (StringUtils.isEmpty(taskContentParam.getTemplatesContent())){ - freemarkerContent = FreemarkerTemplate.getInstance().generateTemplateStr(taskContentParam.getFreemarkerMap(),taskContentParam.isInfer()?taskContentParam.getModelType().getInferFtlPath():taskContentParam.getModelType().getModelFtlPath()); + if (taskContentParam.isFitTransform()){ + freemarkerContent = FreemarkerTemplate.getInstance().generateTemplateStr(taskContentParam.getFreemarkerMap(),taskContentParam.getModelType().getFitTransformFtlPath()); }else { - freemarkerContent = taskContentParam.isUntreated()?FreemarkerTemplate.getInstance().generateTemplateStrFreemarkerContent(taskContentParam.isInfer()?taskContentParam.getModelType().getInferFtlPath():taskContentParam.getModelType().getModelFtlPath(),taskContentParam.getTemplatesContent(),taskContentParam.getFreemarkerMap()):taskContentParam.getTemplatesContent(); + if (StringUtils.isEmpty(taskContentParam.getTemplatesContent())){ + freemarkerContent = FreemarkerTemplate.getInstance().generateTemplateStr(taskContentParam.getFreemarkerMap(),taskContentParam.isInfer()?taskContentParam.getModelType().getInferFtlPath():taskContentParam.getModelType().getModelFtlPath()); + }else { + freemarkerContent = taskContentParam.isUntreated()?FreemarkerTemplate.getInstance().generateTemplateStrFreemarkerContent(taskContentParam.isInfer()?taskContentParam.getModelType().getInferFtlPath():taskContentParam.getModelType().getModelFtlPath(),taskContentParam.getTemplatesContent(),taskContentParam.getFreemarkerMap()):taskContentParam.getTemplatesContent(); + } } log.info("start taskParam:{} - freemarkerContent:{}",taskParam,freemarkerContent); Common.ParamValue componentParamsParamValue = Common.ParamValue.newBuilder().setValueString(ByteString.copyFrom(JSONObject.toJSONString(JSONObject.parseObject(freemarkerContent), SerializerFeature.WriteMapNullValue).getBytes(StandardCharsets.UTF_8))).build(); diff --git a/primihub-sdk/src/main/java/com/primihub/sdk/task/factory/AbstractGRPCExecuteFactory.java b/primihub-sdk/src/main/java/com/primihub/sdk/task/factory/AbstractGRPCExecuteFactory.java index fa68bb5cf..1cb452b10 100644 --- a/primihub-sdk/src/main/java/com/primihub/sdk/task/factory/AbstractGRPCExecuteFactory.java +++ b/primihub-sdk/src/main/java/com/primihub/sdk/task/factory/AbstractGRPCExecuteFactory.java @@ -123,14 +123,13 @@ public void continuouslyObtainTaskStatus(Channel channel,Common.TaskContext task } param.setError(sb.toString()); isContinue = false; - }else { - long success = getNumberOfSuccessfulTasks(key,cacheService); - log.info("taskid:{} - requestId:{} - num:{} - success:{}",param.getTaskId(),param.getRequestId(),partyCount,success); - if (partyCount <= success){ - isContinue = false; - } } } + long success = getNumberOfSuccessfulTasks(key,cacheService); + log.info("taskid:{} - requestId:{} - num:{} - success:{}",param.getTaskId(),param.getRequestId(),partyCount,success); + if (partyCount <= success){ + isContinue = false; + } } Thread.sleep(1000L); } diff --git a/primihub-sdk/src/main/java/com/primihub/sdk/task/factory/AbstractKillGRPCExecute.java b/primihub-sdk/src/main/java/com/primihub/sdk/task/factory/AbstractKillGRPCExecute.java index df4486a42..02dd81dd7 100644 --- a/primihub-sdk/src/main/java/com/primihub/sdk/task/factory/AbstractKillGRPCExecute.java +++ b/primihub-sdk/src/main/java/com/primihub/sdk/task/factory/AbstractKillGRPCExecute.java @@ -36,7 +36,7 @@ public void execute(Channel channel, TaskParam taskParam) { taskParam.setSuccess(true); }else { taskParam.setSuccess(false); - taskParam.setError(response.getMsgInfo()); + taskParam.setError(response.getMsgInfoBytes().toStringUtf8()); } log.info("kill end response:{}",response.toString()); } diff --git a/primihub-sdk/src/main/java/com/primihub/sdk/task/factory/AbstractPsiGRPCExecute.java b/primihub-sdk/src/main/java/com/primihub/sdk/task/factory/AbstractPsiGRPCExecute.java index e8a032eda..4141728b9 100644 --- a/primihub-sdk/src/main/java/com/primihub/sdk/task/factory/AbstractPsiGRPCExecute.java +++ b/primihub-sdk/src/main/java/com/primihub/sdk/task/factory/AbstractPsiGRPCExecute.java @@ -12,6 +12,8 @@ import primihub.rpc.Common; import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Map; public class AbstractPsiGRPCExecute extends AbstractGRPCExecuteFactory { @@ -67,15 +69,22 @@ private void runPsi(Channel channel, TaskParam param){ paramsBuilder.putParamMap("sync_result_to_server",syncResultToServerParamValue).putParamMap("server_outputFullFilname",serverOutputFullFilnameParamValue); } Common.TaskContext taskBuild = assembleTaskContext(param); + Map datasetMap = new HashMap<>(); + datasetMap.put("SERVER",Common.Dataset.newBuilder().putData("SERVER", param.getTaskContentParam().getServerData()).build()); + datasetMap.put("CLIENT",Common.Dataset.newBuilder().putData("CLIENT", param.getTaskContentParam().getClientData()).build()); + String code = ""; + if (param.getTaskContentParam().getPsiTag().equals(2)){ + datasetMap.put("TEE_COMPUTE",Common.Dataset.newBuilder().putData("TEE_COMPUTE", param.getTaskContentParam().getTeeData()).build()); + code = "psi"; + } Common.Task task= Common.Task.newBuilder() .setType(Common.TaskType.PSI_TASK) .setParams(paramsBuilder.build()) .setName("psiTask") .setTaskInfo(taskBuild) .setLanguage(Common.Language.PROTO) - .setCode(ByteString.copyFrom("".getBytes(StandardCharsets.UTF_8))) - .putPartyDatasets("SERVER",Common.Dataset.newBuilder().putData("SERVER", param.getTaskContentParam().getServerData()).build()) - .putPartyDatasets("CLIENT",Common.Dataset.newBuilder().putData("CLIENT", param.getTaskContentParam().getClientData()).build()) + .setCode(ByteString.copyFrom(code.getBytes(StandardCharsets.UTF_8))) + .putAllPartyDatasets(datasetMap) .build(); log.info("grpc Common.Task : \n{}",task.toString()); PushTaskRequest request=PushTaskRequest.newBuilder() diff --git a/primihub-sdk/src/main/java/com/primihub/sdk/task/param/TaskComponentParam.java b/primihub-sdk/src/main/java/com/primihub/sdk/task/param/TaskComponentParam.java index 91e26872c..86be68e32 100644 --- a/primihub-sdk/src/main/java/com/primihub/sdk/task/param/TaskComponentParam.java +++ b/primihub-sdk/src/main/java/com/primihub/sdk/task/param/TaskComponentParam.java @@ -23,6 +23,8 @@ public class TaskComponentParam{ private boolean infer = false; + private boolean fitTransform = false; + public ModelTypeEnum getModelType() { return modelType; } @@ -63,6 +65,14 @@ public void setInfer(boolean infer) { this.infer = infer; } + public boolean isFitTransform() { + return fitTransform; + } + + public void setFitTransform(boolean fitTransform) { + this.fitTransform = fitTransform; + } + @Override public String toString() { return "ComponentTaskParam{" + diff --git a/primihub-sdk/src/main/java/com/primihub/sdk/task/param/TaskPSIParam.java b/primihub-sdk/src/main/java/com/primihub/sdk/task/param/TaskPSIParam.java index 7b3606d60..8c8fc2093 100644 --- a/primihub-sdk/src/main/java/com/primihub/sdk/task/param/TaskPSIParam.java +++ b/primihub-sdk/src/main/java/com/primihub/sdk/task/param/TaskPSIParam.java @@ -4,6 +4,7 @@ import com.primihub.sdk.task.factory.AbstractPsiGRPCExecute; import java.util.Arrays; +import java.util.List; /** * psi 隐私求交组装类 @@ -25,6 +26,7 @@ public class TaskPSIParam { /** * 0、ECDH * 1、KKRT + * 2、TEE * 默认0 */ private Integer psiTag = 0; @@ -49,6 +51,8 @@ public class TaskPSIParam { */ private String serverOutputFullFilname; + private String teeData; + public String getClientData() { return clientData; } @@ -121,6 +125,14 @@ public void setServerOutputFullFilname(String serverOutputFullFilname) { this.serverOutputFullFilname = serverOutputFullFilname; } + public String getTeeData() { + return teeData; + } + + public void setTeeData(String teeData) { + this.teeData = teeData; + } + @Override public String toString() { return "PsiTaskParam{" + diff --git a/primihub-sdk/src/main/resources/templates/hetero_fitTransform.ftl b/primihub-sdk/src/main/resources/templates/hetero_fitTransform.ftl new file mode 100644 index 000000000..12077b44d --- /dev/null +++ b/primihub-sdk/src/main/resources/templates/hetero_fitTransform.ftl @@ -0,0 +1,76 @@ +{ + "roles": { + "host": "Bob", + "guest": [ + "Charlie" + ] + }, + "common_params": { + "model": "FL_Preprocess", + "process": "fit_transform", + "FL_type": "V", + "task_name": "VFL_simpleimpute_fit_transform", + "task": "classification" + }, + "role_params": { + "Bob": { + "data_set": "${label_dataset}", + "selected_column": null, + "id": "id", + "label": "y", + "preprocess_column": null, + "preprocess_dataset_id": "${new_label_dataset}", + "preprocess_dataset_path": "${new_label_dataset_path}", + "preprocess_module_path": "${new_label_dataset_path}.pkl", + "preprocess_module": { + "SimpleImputer_string": { + "column": null, + "missing_values": "np.nan", + "strategy": "${simpleImputerString}", + "fill_value": null, + "copy": true, + "add_indicator": false, + "keep_empty_features": false + }, + "SimpleImputer_numeric": { + "column": null, + "missing_values": "np.nan", + "strategy": "${simpleImputerNumeric}", + "fill_value": null, + "copy": true, + "add_indicator": false, + "keep_empty_features": false + } + } + }, + "Charlie": { + "data_set": "${guest_dataset}", + "selected_column": null, + "id": "id", + "preprocess_column": null, + "preprocess_dataset_id": "${new_guest_dataset}", + "preprocess_dataset_path": "${new_guest_dataset_path}", + "preprocess_module_path": "${new_guest_dataset_path}.pkl", + "preprocess_module": { + "SimpleImputer_string": { + "column": null, + "missing_values": "np.nan", + "strategy": "${simpleImputerString}", + "fill_value": null, + "copy": true, + "add_indicator": false, + "keep_empty_features": false + }, + "SimpleImputer_numeric": { + "column": null, + "missing_values": "np.nan", + "strategy": "${simpleImputerNumeric}", + "fill_value": null, + "copy": true, + "add_indicator": false, + "keep_empty_features": false + } + } + } + } +} \ No newline at end of file diff --git a/primihub-sdk/src/main/resources/templates/homo_fitTransform.ftl b/primihub-sdk/src/main/resources/templates/homo_fitTransform.ftl new file mode 100644 index 000000000..8c040ad42 --- /dev/null +++ b/primihub-sdk/src/main/resources/templates/homo_fitTransform.ftl @@ -0,0 +1,57 @@ +{ + "roles": { + "server": "Alice", + "client": [ + "Bob", + "Charlie" + ] + }, + "common_params": { + "model": "FL_Preprocess", + "process": "fit_transform", + "FL_type": "H", + "task_name": "HFL_simpleimpute_fit_transform", + "task": "classification", + "selected_column": null, + "id": "id", + "label": "y", + "preprocess_column": null, + "preprocess_module": { + "SimpleImputer_string": { + "column": null, + "missing_values": "np.nan", + "strategy": "${simpleImputerString}", + "fill_value": null, + "copy": true, + "add_indicator": false, + "keep_empty_features": false + }, + "SimpleImputer_numeric": { + "column": null, + "missing_values": "np.nan", + "strategy": "${simpleImputerNumeric}", + "fill_value": null, + "copy": true, + "add_indicator": false, + "keep_empty_features": false + } + } + }, + "role_params": { + "Bob": { + "data_set": "${label_dataset}", + "preprocess_dataset_id": "${new_label_dataset}", + "preprocess_dataset_path": "${new_label_dataset_path}", + "preprocess_module_path": "${new_label_dataset_path}.pkl" + }, + "Charlie": { + "data_set": "${guest_dataset}", + "preprocess_dataset_id": "${new_guest_dataset}", + "preprocess_dataset_path": "${new_guest_dataset_path}", + "preprocess_module_path": "${new_guest_dataset_path}.pkl" + }, + "Alice": { + "data_set": "${arbiter_dataset!""}" + } + } +} \ No newline at end of file diff --git a/primihub-sdk/src/main/resources/templates/homo_lr.ftl b/primihub-sdk/src/main/resources/templates/homo_lr.ftl index c4e63f8ca..c99dc6ea5 100644 --- a/primihub-sdk/src/main/resources/templates/homo_lr.ftl +++ b/primihub-sdk/src/main/resources/templates/homo_lr.ftl @@ -39,7 +39,7 @@ }, "Alice": { "data_set": "${arbiter_dataset}", - "metric_path": "${indicatorFileName}" + "metric_path": "/data${indicatorFileName}" } } } \ No newline at end of file diff --git a/primihub-sdk/src/main/resources/templates/homo_nn_binary.ftl b/primihub-sdk/src/main/resources/templates/homo_nn_binary.ftl index 83c12e0de..39053ae52 100644 --- a/primihub-sdk/src/main/resources/templates/homo_nn_binary.ftl +++ b/primihub-sdk/src/main/resources/templates/homo_nn_binary.ftl @@ -39,7 +39,7 @@ }, "Alice": { "data_set": "${arbiter_dataset}", - "metric_path": "${indicatorFileName}" + "metric_path": "/data${indicatorFileName}" } } } diff --git a/primihub-service/application/src/main/java/com/primihub/application/PlatformApplication.java b/primihub-service/application/src/main/java/com/primihub/application/PlatformApplication.java index 39c038567..368c59d4e 100644 --- a/primihub-service/application/src/main/java/com/primihub/application/PlatformApplication.java +++ b/primihub-service/application/src/main/java/com/primihub/application/PlatformApplication.java @@ -9,6 +9,7 @@ import org.springframework.cloud.openfeign.EnableFeignClients; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.scheduling.annotation.EnableAsync; +import org.springframework.scheduling.annotation.EnableScheduling; @NacosPropertySources({ // @NacosPropertySource(dataId = "test", autoRefreshed = true), @@ -22,6 +23,7 @@ @ServletComponentScan(basePackages = {"com.primihub.biz.filter"}) @EnableBinding({SingleTaskChannel.class}) @EnableFeignClients(basePackages = {"com.primihub"}) +@EnableScheduling public class PlatformApplication { public static void main(String[] args) { diff --git a/primihub-service/application/src/main/java/com/primihub/application/controller/data/PirController.java b/primihub-service/application/src/main/java/com/primihub/application/controller/data/PirController.java index fe1767ed4..0b335c128 100644 --- a/primihub-service/application/src/main/java/com/primihub/application/controller/data/PirController.java +++ b/primihub-service/application/src/main/java/com/primihub/application/controller/data/PirController.java @@ -28,14 +28,17 @@ public class PirController { private PirService pirService; @RequestMapping("pirSubmitTask") - public BaseResultEntity pirSubmitTask(String resourceId,String pirParam){ + public BaseResultEntity pirSubmitTask(String resourceId,String pirParam,String taskName){ if (StringUtils.isBlank(resourceId)){ return BaseResultEntity.failure(BaseResultEnum.LACK_OF_PARAM,"resourceId"); } if (StringUtils.isBlank(pirParam)){ return BaseResultEntity.failure(BaseResultEnum.LACK_OF_PARAM,"pirParam"); } - return pirService.pirSubmitTask(resourceId,pirParam); + if (StringUtils.isBlank(taskName)){ + return BaseResultEntity.failure(BaseResultEnum.LACK_OF_PARAM,"taskName"); + } + return pirService.pirSubmitTask(resourceId,pirParam,taskName); } @RequestMapping("getPirTaskList") public BaseResultEntity getPirTaskList(DataPirTaskReq req){ @@ -51,6 +54,19 @@ public BaseResultEntity getPirTaskList(DataPirTaskReq req){ return pirService.getPirTaskList(req); } + /** + * 查询隐匿查询任务详情 + * @param taskId + * @return + */ + @GetMapping("getPirTaskDetail") + public BaseResultEntity getPirTaskDetail(Long taskId){ + if (taskId==null||taskId==0L) { + return BaseResultEntity.failure(BaseResultEnum.LACK_OF_PARAM,"taskId"); + } + return pirService.getPirTaskDetail(taskId); + } + @GetMapping("downloadPirTask") public void downloadPirTask(HttpServletResponse response, String taskId,String taskDate) { if (StringUtils.isBlank(taskId)||StringUtils.isBlank(taskDate)) { diff --git a/primihub-service/application/src/main/java/com/primihub/application/controller/data/PsiController.java b/primihub-service/application/src/main/java/com/primihub/application/controller/data/PsiController.java index 2e2fd76fb..7428dbf9b 100644 --- a/primihub-service/application/src/main/java/com/primihub/application/controller/data/PsiController.java +++ b/primihub-service/application/src/main/java/com/primihub/application/controller/data/PsiController.java @@ -4,10 +4,7 @@ import com.primihub.biz.entity.base.BaseResultEnum; import com.primihub.biz.entity.data.po.DataPsi; import com.primihub.biz.entity.data.po.DataPsiTask; -import com.primihub.biz.entity.data.req.DataPsiReq; -import com.primihub.biz.entity.data.req.DataPsiResourceReq; -import com.primihub.biz.entity.data.req.DataResourceReq; -import com.primihub.biz.entity.data.req.PageReq; +import com.primihub.biz.entity.data.req.*; import com.primihub.biz.service.data.DataPsiService; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang.StringUtils; @@ -72,6 +69,11 @@ public BaseResultEntity saveDataPsi(@RequestHeader("userId") Long userId, if (req.getPsiTag()==null) { return BaseResultEntity.failure(BaseResultEnum.LACK_OF_PARAM,"psiTag"); } + if (req.getPsiTag() == 2){ + if (StringUtils.isBlank(req.getTeeOrganId())){ + return BaseResultEntity.failure(BaseResultEnum.LACK_OF_PARAM,"teeOrganId"); + } + } return dataPsiService.saveDataPsi(req,userId); } @@ -118,14 +120,12 @@ public BaseResultEntity getPsiResourceDataList(PageReq req, /** * 查询隐私求交任务列表 - * @param resultName * @param req * @return */ @GetMapping("getPsiTaskList") - public BaseResultEntity getPsiTaskList(String resultName, - PageReq req){ - return dataPsiService.getPsiTaskList(req,resultName); + public BaseResultEntity getPsiTaskList(DataPsiQueryReq req){ + return dataPsiService.getPsiTaskList(req); } diff --git a/primihub-service/application/src/main/java/com/primihub/application/controller/share/ShareDataController.java b/primihub-service/application/src/main/java/com/primihub/application/controller/share/ShareDataController.java index 4ffc174a8..63183130b 100644 --- a/primihub-service/application/src/main/java/com/primihub/application/controller/share/ShareDataController.java +++ b/primihub-service/application/src/main/java/com/primihub/application/controller/share/ShareDataController.java @@ -11,6 +11,7 @@ import com.primihub.biz.service.data.DataModelService; import com.primihub.biz.service.data.DataProjectService; import com.primihub.biz.service.data.DataResourceService; +import com.primihub.biz.service.share.ShareService; import com.primihub.biz.service.sys.SysOrganService; import com.primihub.biz.service.test.TestService; import lombok.extern.slf4j.Slf4j; @@ -39,6 +40,14 @@ public class ShareDataController { private DataResourceService dataResourceService; @Autowired private TestService testService; + @Autowired + private ShareService shareService; + + @RequestMapping("/healthConnection") + public BaseResultEntity healthConnection(@RequestBody Object time){ + log.info("healthConnection - {}",time); + return BaseResultEntity.success(shareService.getServiceState()); + } /** * 创建编辑项目接口 diff --git a/primihub-service/biz/src/main/java/com/primihub/biz/convert/DataPsiConvert.java b/primihub-service/biz/src/main/java/com/primihub/biz/convert/DataPsiConvert.java index 422dd5f5d..7a954a147 100644 --- a/primihub-service/biz/src/main/java/com/primihub/biz/convert/DataPsiConvert.java +++ b/primihub-service/biz/src/main/java/com/primihub/biz/convert/DataPsiConvert.java @@ -3,12 +3,15 @@ import com.primihub.biz.entity.data.po.DataPsi; import com.primihub.biz.entity.data.po.DataPsiTask; import com.primihub.biz.entity.data.po.DataResource; +import com.primihub.biz.entity.data.po.DataTask; import com.primihub.biz.entity.data.req.DataPsiReq; import com.primihub.biz.entity.data.vo.DataPsiVo; import com.primihub.biz.entity.data.vo.PsiTaskVo; import com.primihub.biz.entity.sys.po.SysLocalOrganInfo; import org.apache.commons.lang.StringUtils; +import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; public class DataPsiConvert { @@ -30,6 +33,7 @@ public static DataPsi DataPsiReqConvertPo(DataPsiReq req){ dataPsi.setOutputFormat(StringUtils.isBlank(req.getOutputFormat())?"csv":req.getOutputFormat()); dataPsi.setResultOrganIds(req.getResultOrganIds()); dataPsi.setRemarks(req.getRemarks()); + dataPsi.setTeeOrganId(req.getTeeOrganId()); return dataPsi; } @@ -47,7 +51,7 @@ public static PsiTaskVo DataPsiTaskConvertVo(DataPsiTask task){ } - public static DataPsiVo DataPsiConvertVo(DataPsiTask task, DataPsi dataPsi, DataResource dataResource, Map otherDataResource, SysLocalOrganInfo sysLocalOrganInfo) { + public static DataPsiVo DataPsiConvertVo(DataPsiTask task, DataPsi dataPsi, DataResource dataResource, Map otherDataResource, SysLocalOrganInfo sysLocalOrganInfo, DataTask dataTask, List> dataList, String teeOrganName) { DataPsiVo dataPsiVo = new DataPsiVo(); dataPsiVo.setId(task.getId()); dataPsiVo.setOwnOrganId(dataPsi.getOwnOrganId()); @@ -81,6 +85,13 @@ public static DataPsiVo DataPsiConvertVo(DataPsiTask task, DataPsi dataPsi, Data dataPsiVo.setTaskId(task.getId()); dataPsiVo.setTaskIdName(task.getTaskId()); dataPsiVo.setTaskState(task.getTaskState()); + dataPsiVo.setTaskStartTime(dataTask.getTaskStartTime()); + dataPsiVo.setTaskEndTime(dataTask.getTaskEndTime()); + dataPsiVo.setTaskError(dataTask.getTaskErrorMsg()); + dataPsiVo.setDataList(dataList); + dataPsiVo.setTaskName(dataTask.getTaskName()); + dataPsiVo.setTeeOrganId(dataPsi.getTeeOrganId()); + dataPsiVo.setTeeOrganName(teeOrganName); return dataPsiVo; } } diff --git a/primihub-service/biz/src/main/java/com/primihub/biz/convert/DataResourceConvert.java b/primihub-service/biz/src/main/java/com/primihub/biz/convert/DataResourceConvert.java index a73515da1..7daa41c16 100644 --- a/primihub-service/biz/src/main/java/com/primihub/biz/convert/DataResourceConvert.java +++ b/primihub-service/biz/src/main/java/com/primihub/biz/convert/DataResourceConvert.java @@ -1,5 +1,7 @@ package com.primihub.biz.convert; +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONArray; import com.primihub.biz.entity.data.po.DataFileField; import com.primihub.biz.entity.data.po.DataResource; import com.primihub.biz.entity.data.po.DataResourceTag; @@ -116,6 +118,7 @@ public static DataResourceVo dataResourcePoConvertVo(DataResource po){ vo.setFileHandleField(StringUtils.isBlank(po.getFileHandleField())?new String[]{}:po.getFileHandleField().split(",")); vo.setResourceState(po.getResourceState()); vo.setResourceHashCode(po.getResourceHashCode()); + vo.setResourceFusionId(po.getResourceFusionId()); return vo; } public static DataFileFieldVo DataFileFieldPoConvertVo(DataFileField fileField){ @@ -161,7 +164,7 @@ public static DataPsiResourceAllocationVo DataResourcePoConvertAllocationVo(Data vo.setResourceName(dataResource.getResourceName()); if (fileFieldList!=null&&fileFieldList.size()>0){ for (DataFileField dataFileField : fileFieldList) { - vo.getKeywordList().add(dataFileField.getFieldName()); + vo.getKeywordList().add(dataResourceFieldPoConvertCopyVo(dataFileField)); } } return vo; @@ -218,10 +221,7 @@ public static DataPsiResourceAllocationVo fusionResourceConvertAllocationVo(Map< vo.setResourceId(fr.getOrDefault("resourceId","").toString()); vo.setResourceName(fr.getOrDefault("resourceName","").toString()); vo.setOrganId(fr.getOrDefault("organId","").toString()); - Object resourceColumnNameList = fr.get("resourceColumnNameList"); - if (resourceColumnNameList!=null&&StringUtils.isNotBlank(resourceColumnNameList.toString())){ - vo.getKeywordList().addAll(Arrays.asList(resourceColumnNameList.toString().split(","))); - } + vo.getKeywordList().addAll(JSONArray.parseArray(JSON.toJSONString(fr.get("fieldList")), DataResourceFieldCopyVo.class)); return vo; } diff --git a/primihub-service/biz/src/main/java/com/primihub/biz/entity/data/po/DataFileField.java b/primihub-service/biz/src/main/java/com/primihub/biz/entity/data/po/DataFileField.java index ef789566b..79679eacf 100644 --- a/primihub-service/biz/src/main/java/com/primihub/biz/entity/data/po/DataFileField.java +++ b/primihub-service/biz/src/main/java/com/primihub/biz/entity/data/po/DataFileField.java @@ -1,5 +1,6 @@ package com.primihub.biz.entity.data.po; +import com.fasterxml.jackson.annotation.JsonIgnore; import lombok.Data; import java.util.Date; @@ -97,6 +98,7 @@ public DataFileField(Long fileId, Long resourceId, String fieldName, String fiel /** * 是否删除 */ + @JsonIgnore private Integer isDel; /** @@ -107,6 +109,7 @@ public DataFileField(Long fileId, Long resourceId, String fieldName, String fiel /** * 修改时间 */ + @JsonIgnore private Date updateDate; diff --git a/primihub-service/biz/src/main/java/com/primihub/biz/entity/data/po/DataPsi.java b/primihub-service/biz/src/main/java/com/primihub/biz/entity/data/po/DataPsi.java index d422f4815..fcab5d8ee 100644 --- a/primihub-service/biz/src/main/java/com/primihub/biz/entity/data/po/DataPsi.java +++ b/primihub-service/biz/src/main/java/com/primihub/biz/entity/data/po/DataPsi.java @@ -96,6 +96,8 @@ public class DataPsi { private String remarks; private Long userId; + + private String teeOrganId; /** * 是否删除 */ diff --git a/primihub-service/biz/src/main/java/com/primihub/biz/entity/data/req/DataPirTaskReq.java b/primihub-service/biz/src/main/java/com/primihub/biz/entity/data/req/DataPirTaskReq.java index 5644587d4..549c2f7a9 100644 --- a/primihub-service/biz/src/main/java/com/primihub/biz/entity/data/req/DataPirTaskReq.java +++ b/primihub-service/biz/src/main/java/com/primihub/biz/entity/data/req/DataPirTaskReq.java @@ -20,4 +20,9 @@ public class DataPirTaskReq extends PageReq { private String taskId; + private String taskName; + + private String startDate; + private String endDate; + } diff --git a/primihub-service/biz/src/main/java/com/primihub/biz/entity/data/req/DataPsiQueryReq.java b/primihub-service/biz/src/main/java/com/primihub/biz/entity/data/req/DataPsiQueryReq.java new file mode 100644 index 000000000..d398aa660 --- /dev/null +++ b/primihub-service/biz/src/main/java/com/primihub/biz/entity/data/req/DataPsiQueryReq.java @@ -0,0 +1,13 @@ +package com.primihub.biz.entity.data.req; + +import lombok.Data; + +@Data +public class DataPsiQueryReq extends PageReq{ + private String resultName; + private String organId; + private String taskName; + private Integer taskState; + private String startDate; + private String endDate; +} diff --git a/primihub-service/biz/src/main/java/com/primihub/biz/entity/data/req/DataPsiReq.java b/primihub-service/biz/src/main/java/com/primihub/biz/entity/data/req/DataPsiReq.java index 1273c1b94..7b5008f31 100644 --- a/primihub-service/biz/src/main/java/com/primihub/biz/entity/data/req/DataPsiReq.java +++ b/primihub-service/biz/src/main/java/com/primihub/biz/entity/data/req/DataPsiReq.java @@ -79,8 +79,13 @@ public class DataPsiReq { /** * 0、ECDH * 1、KKRT + * 2、TEE */ private Integer psiTag; + private String taskName; + + private String teeOrganId; + } diff --git a/primihub-service/biz/src/main/java/com/primihub/biz/entity/data/vo/DataPirTaskDetailVo.java b/primihub-service/biz/src/main/java/com/primihub/biz/entity/data/vo/DataPirTaskDetailVo.java new file mode 100644 index 000000000..2d7377889 --- /dev/null +++ b/primihub-service/biz/src/main/java/com/primihub/biz/entity/data/vo/DataPirTaskDetailVo.java @@ -0,0 +1,45 @@ +package com.primihub.biz.entity.data.vo; + +import com.fasterxml.jackson.annotation.JsonFormat; +import lombok.Data; + +import java.util.Date; +import java.util.LinkedHashMap; +import java.util.List; + +@Data +public class DataPirTaskDetailVo { + + private String taskName; + private String taskIdName; + private Integer taskState; + private String organName; + private String resourceName; + private String resourceId; + private String retrievalId; + private String taskError; + @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss") + private Date createDate; + + /** + * 任务开始时间 + */ + private Long taskStartTime; + + /** + * 任务结束时间 + */ + private Long taskEndTime; + + private List> list; + + public Long getConsuming() { + if (taskStartTime==null) { + return 0L; + } + if (taskEndTime==null||taskEndTime==0) { + return (System.currentTimeMillis() - taskStartTime)/1000; + } + return (taskEndTime - taskStartTime)/1000; + } +} diff --git a/primihub-service/biz/src/main/java/com/primihub/biz/entity/data/vo/DataPirTaskVo.java b/primihub-service/biz/src/main/java/com/primihub/biz/entity/data/vo/DataPirTaskVo.java index 0f8c37f4a..e4b41dc30 100644 --- a/primihub-service/biz/src/main/java/com/primihub/biz/entity/data/vo/DataPirTaskVo.java +++ b/primihub-service/biz/src/main/java/com/primihub/biz/entity/data/vo/DataPirTaskVo.java @@ -40,6 +40,8 @@ public class DataPirTaskVo { private Integer available; + private String taskName; + /** * 任务状态(0未开始 1成功 2查询中 3失败) diff --git a/primihub-service/biz/src/main/java/com/primihub/biz/entity/data/vo/DataPsiResourceAllocationVo.java b/primihub-service/biz/src/main/java/com/primihub/biz/entity/data/vo/DataPsiResourceAllocationVo.java index 7c6421b24..ac0d45b94 100644 --- a/primihub-service/biz/src/main/java/com/primihub/biz/entity/data/vo/DataPsiResourceAllocationVo.java +++ b/primihub-service/biz/src/main/java/com/primihub/biz/entity/data/vo/DataPsiResourceAllocationVo.java @@ -20,5 +20,5 @@ public class DataPsiResourceAllocationVo { */ private String organId; - private List keywordList = new ArrayList<>(); + private List keywordList = new ArrayList<>(); } diff --git a/primihub-service/biz/src/main/java/com/primihub/biz/entity/data/vo/DataPsiTaskVo.java b/primihub-service/biz/src/main/java/com/primihub/biz/entity/data/vo/DataPsiTaskVo.java index 719194013..8a943c707 100644 --- a/primihub-service/biz/src/main/java/com/primihub/biz/entity/data/vo/DataPsiTaskVo.java +++ b/primihub-service/biz/src/main/java/com/primihub/biz/entity/data/vo/DataPsiTaskVo.java @@ -16,6 +16,14 @@ public class DataPsiTaskVo { * 返回结果名称 */ private String resultName; + + private String taskName; + + private String otherOrganId; + + private String otherOrganName; + + private Integer psiTag; /** * 真实任务id */ diff --git a/primihub-service/biz/src/main/java/com/primihub/biz/entity/data/vo/DataPsiVo.java b/primihub-service/biz/src/main/java/com/primihub/biz/entity/data/vo/DataPsiVo.java index 6df704cb6..d49b33ef8 100644 --- a/primihub-service/biz/src/main/java/com/primihub/biz/entity/data/vo/DataPsiVo.java +++ b/primihub-service/biz/src/main/java/com/primihub/biz/entity/data/vo/DataPsiVo.java @@ -1,10 +1,13 @@ package com.primihub.biz.entity.data.vo; +import com.fasterxml.jackson.annotation.JsonFormat; import com.fasterxml.jackson.annotation.JsonIgnore; import lombok.Data; import java.util.Date; +import java.util.LinkedHashMap; +import java.util.List; @Data @@ -113,7 +116,8 @@ public class DataPsiVo { /** * 创建时间 */ - @JsonIgnore +// @JsonIgnore + @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss") private Date createDate; /** @@ -124,8 +128,26 @@ public class DataPsiVo { * 任务id名称 */ private String taskIdName; + + private String taskName; /** * 运行状态 0未运行 1完成 2运行中 3失败 4取消 默认0 */ private Integer taskState; + + private String teeOrganId; + private String teeOrganName; + private String taskError; + + private List> dataList; + + private Long taskStartTime; + private Long taskEndTime; + + public Long getConsuming(){ + if ((taskStartTime!=null&&taskStartTime!=0L)&&(taskEndTime!=null&&taskEndTime!=0L)){ + return (taskEndTime-taskStartTime)/1000; + } + return 0L; + } } diff --git a/primihub-service/biz/src/main/java/com/primihub/biz/entity/data/vo/DataResourceVo.java b/primihub-service/biz/src/main/java/com/primihub/biz/entity/data/vo/DataResourceVo.java index 5a9b78d25..5427edfc0 100644 --- a/primihub-service/biz/src/main/java/com/primihub/biz/entity/data/vo/DataResourceVo.java +++ b/primihub-service/biz/src/main/java/com/primihub/biz/entity/data/vo/DataResourceVo.java @@ -118,4 +118,8 @@ public class DataResourceVo { * 资源状态 目前有 0上线 1下线 */ private Integer resourceState; + /** + * 中心节点资源id + */ + private String resourceFusionId; } diff --git a/primihub-service/biz/src/main/java/com/primihub/biz/repository/secondarydb/data/DataPsiRepository.java b/primihub-service/biz/src/main/java/com/primihub/biz/repository/secondarydb/data/DataPsiRepository.java index f0ad4fae4..9e4c87c6e 100644 --- a/primihub-service/biz/src/main/java/com/primihub/biz/repository/secondarydb/data/DataPsiRepository.java +++ b/primihub-service/biz/src/main/java/com/primihub/biz/repository/secondarydb/data/DataPsiRepository.java @@ -2,6 +2,7 @@ import com.primihub.biz.entity.data.po.DataPsi; import com.primihub.biz.entity.data.po.DataPsiTask; +import com.primihub.biz.entity.data.req.DataPsiQueryReq; import com.primihub.biz.entity.data.vo.DataOrganPsiTaskVo; import com.primihub.biz.entity.data.vo.DataPsiTaskVo; import org.springframework.stereotype.Repository; @@ -12,13 +13,13 @@ @Repository public interface DataPsiRepository { - List selectPsiTaskPage(Map map); + List selectPsiTaskPage(DataPsiQueryReq req); DataPsiTask selectPsiTaskById(Long id); DataPsi selectPsiById(Long id); - Long selectPsiTaskPageCount(Map map); + Long selectPsiTaskPageCount(DataPsiQueryReq req); List selectOrganPsiTaskPage(Map map); diff --git a/primihub-service/biz/src/main/java/com/primihub/biz/repository/secondarydb/data/DataTaskRepository.java b/primihub-service/biz/src/main/java/com/primihub/biz/repository/secondarydb/data/DataTaskRepository.java index acbe4b33f..e645d103a 100644 --- a/primihub-service/biz/src/main/java/com/primihub/biz/repository/secondarydb/data/DataTaskRepository.java +++ b/primihub-service/biz/src/main/java/com/primihub/biz/repository/secondarydb/data/DataTaskRepository.java @@ -1,6 +1,7 @@ package com.primihub.biz.repository.secondarydb.data; +import com.primihub.biz.entity.data.po.DataPirTask; import com.primihub.biz.entity.data.po.DataTask; import com.primihub.biz.entity.data.req.DataPirTaskReq; import com.primihub.biz.entity.data.req.DataTaskReq; @@ -27,4 +28,7 @@ public interface DataTaskRepository { List selectDataTaskList(DataTaskReq req); Integer selectDataTaskListCount(DataTaskReq req); + + + DataPirTask selectPirTaskById(Long id); } diff --git a/primihub-service/biz/src/main/java/com/primihub/biz/service/data/DataAsyncService.java b/primihub-service/biz/src/main/java/com/primihub/biz/service/data/DataAsyncService.java index 3cf5925c5..3b6110cf5 100644 --- a/primihub-service/biz/src/main/java/com/primihub/biz/service/data/DataAsyncService.java +++ b/primihub-service/biz/src/main/java/com/primihub/biz/service/data/DataAsyncService.java @@ -203,7 +203,7 @@ private String formatModelComponentJson(DataModelAndComponentReq params, Map data = (LinkedHashMap)resourceList.getResult(); + List> resourceDataList = (List>)data.get("data"); + if (resourceDataList==null || resourceDataList.size()==0) { + psiTask.setTaskState(TaskStateEnum.FAIL.getStateType()); + dataPsiPrRepository.updateDataPsiTask(psiTask); + dataTask.setTaskState(TaskStateEnum.FAIL.getStateType()); + dataTask.setTaskEndTime(System.currentTimeMillis()); + dataTask.setTaskErrorMsg("TEE 机构资源查询失败:机构下无资源信息"); + dataTaskPrRepository.updateDataTask(dataTask); + return; + } + teeResourceId = resourceDataList.get(0).get("resourceId").toString(); + } psiTask.setTaskState(2); dataPsiPrRepository.updateDataPsiTask(psiTask); log.info("psi available:{}", available); @@ -247,6 +278,7 @@ public void psiGrpcRun(DataPsiTask psiTask, DataPsi dataPsi) { List serverFields = Arrays.asList(resourceColumnNameList.split(",")); List otherKeyword = Arrays.asList(dataPsi.getOtherKeyword().split(",")); psiParam.setServerData(resourceId); + psiParam.setTeeData(teeResourceId); psiParam.setServerIndex(otherKeyword.stream().map(serverFields::indexOf).toArray(Integer[]::new)); psiParam.setOutputFullFilename(psiTask.getFilePath()); TaskParam taskParam = new TaskParam(); diff --git a/primihub-service/biz/src/main/java/com/primihub/biz/service/data/DataPsiService.java b/primihub-service/biz/src/main/java/com/primihub/biz/service/data/DataPsiService.java index 49d05d070..f6413560b 100644 --- a/primihub-service/biz/src/main/java/com/primihub/biz/service/data/DataPsiService.java +++ b/primihub-service/biz/src/main/java/com/primihub/biz/service/data/DataPsiService.java @@ -1,5 +1,6 @@ package com.primihub.biz.service.data; +import com.alibaba.fastjson.JSONObject; import com.primihub.biz.config.base.OrganConfiguration; import com.primihub.biz.convert.DataPsiConvert; import com.primihub.biz.convert.DataResourceConvert; @@ -11,9 +12,13 @@ import com.primihub.biz.entity.data.vo.DataOrganPsiTaskVo; import com.primihub.biz.entity.data.vo.DataPsiTaskVo; import com.primihub.biz.entity.sys.po.SysLocalOrganInfo; +import com.primihub.biz.entity.sys.po.SysOrgan; import com.primihub.biz.repository.primarydb.data.DataPsiPrRepository; import com.primihub.biz.repository.secondarydb.data.DataPsiRepository; import com.primihub.biz.repository.secondarydb.data.DataResourceRepository; +import com.primihub.biz.repository.secondarydb.data.DataTaskRepository; +import com.primihub.biz.repository.secondarydb.sys.SysOrganSecondarydbRepository; +import com.primihub.biz.util.FileUtil; import com.primihub.biz.util.snowflake.SnowflakeId; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang.StringUtils; @@ -34,6 +39,8 @@ public class DataPsiService { @Autowired private DataPsiPrRepository dataPsiPrRepository; @Autowired + private DataTaskRepository dataTaskRepository; + @Autowired private DataResourceService dataResourceService; @Autowired private OtherBusinessesService otherBusinessesService; @@ -41,6 +48,8 @@ public class DataPsiService { private DataAsyncService dataAsyncService; @Autowired private OrganConfiguration organConfiguration; + @Autowired + private SysOrganSecondarydbRepository sysOrganSecondarydbRepository; public BaseResultEntity getPsiResourceList(DataResourceReq req) { @@ -101,29 +110,31 @@ public BaseResultEntity saveDataPsi(DataPsiReq req, Long userId) { task.setAscriptionType(0); } if (dataPsi.getOutputContent()==0){ - task.setAscription("求交集"); + task.setAscription("交集"); }else { - task.setAscription("求差集"); + task.setAscription("差集"); } task.setCreateDate(new Date()); dataPsiPrRepository.saveDataPsiTask(task); - dataAsyncService.psiGrpcRun(task,dataPsi); + dataAsyncService.psiGrpcRun(task,dataPsi,req.getTaskName()); Map map = new HashMap<>(); map.put("dataPsi",dataPsi); map.put("dataPsiTask",DataPsiConvert.DataPsiTaskConvertVo(task)); return BaseResultEntity.success(map); } - public BaseResultEntity getPsiTaskList(PageReq req,String resultName) { - Map paramMap = new HashMap<>(); - paramMap.put("offset",req.getOffset()); - paramMap.put("pageSize",req.getPageSize()); - paramMap.put("resultName",resultName); - List dataPsiTaskVos = dataPsiRepository.selectPsiTaskPage(paramMap); + public BaseResultEntity getPsiTaskList(DataPsiQueryReq req) { + log.info(JSONObject.toJSONString(req)); + List dataPsiTaskVos = dataPsiRepository.selectPsiTaskPage(req); if (dataPsiTaskVos.size()==0){ return BaseResultEntity.success(new PageDataEntity(0,req.getPageSize(),req.getPageNo(),new ArrayList())); } - Long count = dataPsiRepository.selectPsiTaskPageCount(paramMap); + Set ids = dataPsiTaskVos.stream().map(DataPsiTaskVo::getOtherOrganId).collect(Collectors.toSet()); + Map organListMap = otherBusinessesService.getOrganListMap(new ArrayList<>(ids)); + for (DataPsiTaskVo dataPsiTaskVo : dataPsiTaskVos) { + dataPsiTaskVo.setOtherOrganName(organListMap.get(dataPsiTaskVo.getOtherOrganId()).getOrganName()); + } + Long count = dataPsiRepository.selectPsiTaskPageCount(req); return BaseResultEntity.success(new PageDataEntity(count.intValue(),req.getPageSize(),req.getPageNo(),dataPsiTaskVos)); } @@ -150,6 +161,10 @@ public BaseResultEntity getPsiTaskDetails(Long taskId) { if (dataPsi==null) { return BaseResultEntity.failure(BaseResultEnum.DATA_QUERY_NULL,"未查询到PSI信息"); } + DataTask dataTask = dataTaskRepository.selectDataTaskByTaskIdName(task.getTaskId()); + if (dataTask == null){ + return BaseResultEntity.failure(BaseResultEnum.DATA_QUERY_NULL,"未查询到任务详情"); + } DataResource dataResource = dataResourceRepository.queryDataResourceById(dataPsi.getOwnResourceId()); Map otherDataResource = null; if (dataPsi.getOtherOrganId().equals(organConfiguration.getSysLocalOrganId())){ @@ -163,8 +178,19 @@ public BaseResultEntity getPsiTaskDetails(Long taskId) { otherDataResource = (LinkedHashMap)baseResult.getResult(); } } + List> list = null; + if (StringUtils.isNotEmpty(task.getFilePath())){ + list = FileUtil.getCsvData(task.getFilePath(), 50); + } + String teeOrganName = ""; + if (StringUtils.isNotEmpty(dataPsi.getTeeOrganId())){ + SysOrgan sysOrgan = sysOrganSecondarydbRepository.selectSysOrganByOrganId(dataPsi.getTeeOrganId()); + if (sysOrgan!=null){ + teeOrganName = sysOrgan.getOrganName(); + } + } SysLocalOrganInfo sysLocalOrganInfo = organConfiguration.getSysLocalOrganInfo(); - return BaseResultEntity.success(DataPsiConvert.DataPsiConvertVo(task,dataPsi,dataResource,otherDataResource,sysLocalOrganInfo)); + return BaseResultEntity.success(DataPsiConvert.DataPsiConvertVo(task,dataPsi,dataResource,otherDataResource,sysLocalOrganInfo,dataTask,list,teeOrganName)); } public DataPsiTask selectPsiTaskById(Long taskId){ @@ -205,7 +231,7 @@ public BaseResultEntity retryPsiTask(Long taskId) { DataPsi dataPsi = dataPsiRepository.selectPsiById(task.getPsiId()); task.setTaskState(2); dataPsiPrRepository.updateDataPsiTask(task); - dataAsyncService.psiGrpcRun(task,dataPsi); + dataAsyncService.psiGrpcRun(task,dataPsi,null); return BaseResultEntity.success(); } diff --git a/primihub-service/biz/src/main/java/com/primihub/biz/service/data/PirService.java b/primihub-service/biz/src/main/java/com/primihub/biz/service/data/PirService.java index 7f29800d8..371434799 100644 --- a/primihub-service/biz/src/main/java/com/primihub/biz/service/data/PirService.java +++ b/primihub-service/biz/src/main/java/com/primihub/biz/service/data/PirService.java @@ -11,11 +11,14 @@ import com.primihub.biz.entity.data.po.DataPirTask; import com.primihub.biz.entity.data.po.DataTask; import com.primihub.biz.entity.data.req.DataPirTaskReq; +import com.primihub.biz.entity.data.vo.DataPirTaskDetailVo; import com.primihub.biz.entity.data.vo.DataPirTaskVo; import com.primihub.biz.repository.primarydb.data.DataTaskPrRepository; import com.primihub.biz.repository.secondarydb.data.DataTaskRepository; +import com.primihub.biz.util.FileUtil; import com.primihub.biz.util.snowflake.SnowflakeId; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @@ -40,7 +43,7 @@ public class PirService { public String getResultFilePath(String taskId,String taskDate){ return new StringBuilder().append(baseConfiguration.getResultUrlDirPrefix()).append(taskDate).append("/").append(taskId).append(".csv").toString(); } - public BaseResultEntity pirSubmitTask(String resourceId, String pirParam) { + public BaseResultEntity pirSubmitTask(String resourceId, String pirParam,String taskName) { BaseResultEntity dataResource = otherBusinessesService.getDataResource(resourceId); if (dataResource.getCode()!=0) { return BaseResultEntity.failure(BaseResultEnum.DATA_RUN_TASK_FAIL,"资源查询失败"); @@ -53,7 +56,7 @@ public BaseResultEntity pirSubmitTask(String resourceId, String pirParam) { DataTask dataTask = new DataTask(); // dataTask.setTaskIdName(UUID.randomUUID().toString()); dataTask.setTaskIdName(Long.toString(SnowflakeId.getInstance().nextId())); - dataTask.setTaskName(pirDataResource.get("resourceName").toString()); + dataTask.setTaskName(taskName); dataTask.setTaskState(TaskStateEnum.IN_OPERATION.getStateType()); dataTask.setTaskType(TaskTypeEnum.PIR.getTaskType()); dataTask.setTaskStartTime(System.currentTimeMillis()); @@ -93,4 +96,34 @@ public BaseResultEntity getPirTaskList(DataPirTaskReq req) { } return BaseResultEntity.success(new PageDataEntity(tolal,req.getPageSize(),req.getPageNo(),dataPirTaskVos)); } + + public BaseResultEntity getPirTaskDetail(Long taskId) { + DataPirTask task = dataTaskRepository.selectPirTaskById(taskId); + if (task==null) { + return BaseResultEntity.failure(BaseResultEnum.DATA_QUERY_NULL,"未查询到任务信息"); + } + DataTask dataTask = dataTaskRepository.selectDataTaskByTaskId(taskId); + if (dataTask==null) { + return BaseResultEntity.failure(BaseResultEnum.DATA_QUERY_NULL,"未查询到任务详情"); + } + DataPirTaskDetailVo vo = new DataPirTaskDetailVo(); + List> list = null; + if (StringUtils.isNotEmpty(dataTask.getTaskResultPath())){ + vo.setList(FileUtil.getCsvData(dataTask.getTaskResultPath(), 50)); + } + vo.setTaskName(dataTask.getTaskName()); + vo.setTaskIdName(dataTask.getTaskIdName()); + vo.setTaskState(dataTask.getTaskState()); + vo.setOrganName(task.getProviderOrganName()); + vo.setResourceName(task.getResourceName()); + vo.setResourceId(task.getResourceId()); + vo.setRetrievalId(task.getRetrievalId()); + vo.setTaskError(dataTask.getTaskErrorMsg()); + vo.setCreateDate(dataTask.getCreateDate()); + vo.setTaskStartTime(dataTask.getTaskStartTime()); + vo.setTaskEndTime(dataTask.getTaskEndTime()); + return BaseResultEntity.success(vo); + } + + } diff --git a/primihub-service/biz/src/main/java/com/primihub/biz/service/data/component/impl/DataSetComponentTaskServiceImpl.java b/primihub-service/biz/src/main/java/com/primihub/biz/service/data/component/impl/DataSetComponentTaskServiceImpl.java index 8a7ba817f..6f80d466f 100644 --- a/primihub-service/biz/src/main/java/com/primihub/biz/service/data/component/impl/DataSetComponentTaskServiceImpl.java +++ b/primihub-service/biz/src/main/java/com/primihub/biz/service/data/component/impl/DataSetComponentTaskServiceImpl.java @@ -94,6 +94,7 @@ public BaseResultEntity check(DataComponentReq req, ComponentTaskReq taskReq) { } } }catch (Exception e){ + e.printStackTrace(); log.info("modelId:{} Failed to convert JSON :{}",taskReq.getDataModel().getModelId(),e.getMessage()); return BaseResultEntity.failure(BaseResultEnum.DATA_RUN_TASK_FAIL,"模型选择资源转换失败"); } diff --git a/primihub-service/biz/src/main/java/com/primihub/biz/service/data/component/impl/ExceptionComponentTaskServiceImpl.java b/primihub-service/biz/src/main/java/com/primihub/biz/service/data/component/impl/ExceptionComponentTaskServiceImpl.java index 7402d7eeb..503607689 100644 --- a/primihub-service/biz/src/main/java/com/primihub/biz/service/data/component/impl/ExceptionComponentTaskServiceImpl.java +++ b/primihub-service/biz/src/main/java/com/primihub/biz/service/data/component/impl/ExceptionComponentTaskServiceImpl.java @@ -3,6 +3,7 @@ import com.alibaba.fastjson.JSONObject; import com.primihub.biz.config.base.BaseConfiguration; import com.primihub.biz.config.base.ComponentsConfiguration; +import com.primihub.biz.constant.DataConstant; import com.primihub.biz.entity.base.BaseResultEntity; import com.primihub.biz.entity.base.BaseResultEnum; import com.primihub.biz.entity.data.dataenum.TaskStateEnum; @@ -53,6 +54,7 @@ public BaseResultEntity runTask(DataComponentReq req, ComponentTaskReq taskReq) log.info("ids:{}", ids); String path = baseConfiguration.getRunModelFileUrlDirPrefix()+taskReq.getDataTask().getTaskIdName() + File.separator + "exception"; Map exceptionEntityMap = getGrpcComponentDataSetMap(taskReq.getFusionResourceList(),path); + exceptionEntityMap.remove(taskReq.getFreemarkerMap().get(DataConstant.PYTHON_ARBITER_DATASET)); log.info("exceptionEntityMap-1:{}",JSONObject.toJSONString(exceptionEntityMap)); if (newest!=null && newest.size()!=0){ ids = new ArrayList<>(); diff --git a/primihub-service/biz/src/main/java/com/primihub/biz/service/data/component/impl/FitTransformComponentTaskServiceImpl.java b/primihub-service/biz/src/main/java/com/primihub/biz/service/data/component/impl/FitTransformComponentTaskServiceImpl.java new file mode 100644 index 000000000..39d3a5989 --- /dev/null +++ b/primihub-service/biz/src/main/java/com/primihub/biz/service/data/component/impl/FitTransformComponentTaskServiceImpl.java @@ -0,0 +1,147 @@ +package com.primihub.biz.service.data.component.impl; + +import com.alibaba.fastjson.JSONObject; +import com.primihub.biz.config.base.BaseConfiguration; +import com.primihub.biz.config.base.ComponentsConfiguration; +import com.primihub.biz.constant.DataConstant; +import com.primihub.biz.entity.base.BaseResultEntity; +import com.primihub.biz.entity.base.BaseResultEnum; +import com.primihub.biz.entity.data.dataenum.TaskStateEnum; +import com.primihub.biz.entity.data.dto.GrpcComponentDto; +import com.primihub.biz.entity.data.dto.ModelDerivationDto; +import com.primihub.biz.entity.data.po.DataModelResource; +import com.primihub.biz.entity.data.req.ComponentTaskReq; +import com.primihub.biz.entity.data.req.DataComponentReq; +import com.primihub.biz.repository.primarydb.data.DataModelPrRepository; +import com.primihub.biz.service.data.DataResourceService; +import com.primihub.biz.service.data.component.ComponentTaskService; +import com.primihub.biz.service.feign.FusionResourceService; +import com.primihub.sdk.task.TaskHelper; +import com.primihub.sdk.task.dataenum.ModelTypeEnum; +import com.primihub.sdk.task.param.TaskComponentParam; +import com.primihub.sdk.task.param.TaskParam; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import java.io.File; +import java.util.*; +import java.util.stream.Collectors; + +@Service("fitTransformComponentTaskServiceImpl") +@Slf4j +public class FitTransformComponentTaskServiceImpl extends BaseComponentServiceImpl implements ComponentTaskService { + @Autowired + private ComponentsConfiguration componentsConfiguration; + @Autowired + private FusionResourceService fusionResourceService; + @Autowired + private BaseConfiguration baseConfiguration; + @Autowired + private TaskHelper taskHelper; + @Autowired + private DataResourceService dataResourceService; + @Autowired + private DataModelPrRepository dataModelPrRepository; + @Override + public BaseResultEntity check(DataComponentReq req, ComponentTaskReq taskReq) { + return componentTypeVerification(req,componentsConfiguration.getModelComponents(),taskReq); + } + + @Override + public BaseResultEntity runTask(DataComponentReq req, ComponentTaskReq taskReq) { + taskReq.getFreemarkerMap().putAll(getComponentVals(req.getComponentValues())); + List ids = taskReq.getFusionResourceList().stream().map(data -> data.get("resourceId").toString()).collect(Collectors.toList()); + List newest = taskReq.getNewest(); + log.info("ids:{}", ids); + String path = baseConfiguration.getRunModelFileUrlDirPrefix()+taskReq.getDataTask().getTaskIdName() + File.separator + "fitTransform"; + Map fitTransformEntityMap = getGrpcComponentDataSetMap(taskReq.getFusionResourceList(),path); + fitTransformEntityMap.remove(taskReq.getFreemarkerMap().get(DataConstant.PYTHON_ARBITER_DATASET)); + log.info("fitTransform-1:{}", JSONObject.toJSONString(fitTransformEntityMap)); + if (newest!=null && newest.size()!=0){ + ids = new ArrayList<>(); + for (ModelDerivationDto modelDerivationDto : newest) { + ids.add(modelDerivationDto.getNewResourceId()); + fitTransformEntityMap.put(modelDerivationDto.getNewResourceId(),fitTransformEntityMap.get(modelDerivationDto.getOriginalResourceId())); + fitTransformEntityMap.remove(modelDerivationDto.getOriginalResourceId()); + } + log.info("newids:{}", ids); + } + try { + GrpcComponentDto labelDatasetDto = fitTransformEntityMap.get(taskReq.getFreemarkerMap().get(DataConstant.PYTHON_LABEL_DATASET)); + taskReq.getFreemarkerMap().put("new_"+DataConstant.PYTHON_LABEL_DATASET,labelDatasetDto.getNewDataSetId()); + taskReq.getFreemarkerMap().put("new_"+DataConstant.PYTHON_LABEL_DATASET+"_path",labelDatasetDto.getOutputFilePath()); + GrpcComponentDto guestDatasetDto = fitTransformEntityMap.get(taskReq.getFreemarkerMap().get(DataConstant.PYTHON_GUEST_DATASET)); + taskReq.getFreemarkerMap().put("new_"+DataConstant.PYTHON_GUEST_DATASET,guestDatasetDto.getNewDataSetId()); + taskReq.getFreemarkerMap().put("new_"+DataConstant.PYTHON_GUEST_DATASET+"_path",guestDatasetDto.getOutputFilePath()); + TaskParam taskParam = new TaskParam<>(new TaskComponentParam()); + taskParam.setTaskId(taskReq.getDataTask().getTaskIdName()); + taskParam.setJobId(String.valueOf(taskReq.getJob())); + taskParam.getTaskContentParam().setFitTransform(true); + taskParam.getTaskContentParam().setModelType(ModelTypeEnum.MODEL_TYPE_MAP.get(taskReq.getDataModel().getModelType())); + taskParam.getTaskContentParam().setFreemarkerMap(taskReq.getFreemarkerMap()); + taskHelper.submit(taskParam); + if(!taskParam.getSuccess()){ + taskReq.getDataTask().setTaskState(TaskStateEnum.FAIL.getStateType()); + taskReq.getDataTask().setTaskErrorMsg(req.getComponentName()+"组件处理失败:"+taskParam.getError()); + }else { + List derivationList = new ArrayList<>(); + log.info("fitTransform-3:{}",JSONObject.toJSONString(fitTransformEntityMap)); + Iterator keyi = fitTransformEntityMap.keySet().iterator(); + while (keyi.hasNext()){ + String key = keyi.next(); + GrpcComponentDto value = fitTransformEntityMap.get(key); + if (value==null) { + continue; + } + log.info("value:{}",JSONObject.toJSONString(value)); + derivationList.add(new ModelDerivationDto(key,"fitTransform","缺失值填充",value.getNewDataSetId(),value.getOutputFilePath(),value.getDataSetId())); + log.info("derivationList:{}",JSONObject.toJSONString(derivationList)); + } + taskReq.getDerivationList().addAll(derivationList); + taskReq.setNewest(derivationList); + // derivation resource datas + log.info(JSONObject.toJSONString(taskReq.getDerivationList())); + BaseResultEntity derivationResource = dataResourceService.saveDerivationResource(derivationList, taskReq.getDataTask().getTaskUserId()); + log.info(JSONObject.toJSONString(derivationResource)); + if (!derivationResource.getCode().equals(BaseResultEnum.SUCCESS.getReturnCode())){ + taskReq.getDataTask().setTaskState(TaskStateEnum.FAIL.getStateType()); + taskReq.getDataTask().setTaskErrorMsg(req.getComponentName()+"组件处理失败:"+derivationResource.getMsg()); + }else { + List resourceIdLst = (List)derivationResource.getResult(); + for (String resourceId : resourceIdLst) { + DataModelResource dataModelResource = new DataModelResource(taskReq.getDataModel().getModelId()); + dataModelResource.setTaskId(taskReq.getDataTask().getTaskId()); + dataModelResource.setResourceId(resourceId); + dataModelResource.setTakePartType(1); + dataModelPrRepository.saveDataModelResource(dataModelResource); + taskReq.getDmrList().add(dataModelResource); + } + } + } + HashSet dids = new HashSet(){{ + add(labelDatasetDto.getNewDataSetId()); + add(guestDatasetDto.getNewDataSetId()); + }}; + while (true){ + // 休眠一秒等待数据集同步 + BaseResultEntity dataSets = fusionResourceService.getDataSets(dids); + log.info(JSONObject.toJSONString(dataSets)); + if (dataSets.getCode().equals(BaseResultEnum.SUCCESS.getReturnCode())){ + List objectList = (List) dataSets.getResult(); + if (objectList.size() == dids.size()){ + break; + } + } + Thread.sleep(100L); + } + + }catch (Exception e){ + e.printStackTrace(); + taskReq.getDataTask().setTaskState(TaskStateEnum.FAIL.getStateType()); + taskReq.getDataTask().setTaskErrorMsg("缺失值填充错误:"+e.getMessage()); + } + + return BaseResultEntity.success(); + } +} diff --git a/primihub-service/biz/src/main/java/com/primihub/biz/service/data/component/impl/JointStatisticalComponentTaskServiceImpl.java b/primihub-service/biz/src/main/java/com/primihub/biz/service/data/component/impl/JointStatisticalComponentTaskServiceImpl.java index 19606ce98..810aa1e35 100644 --- a/primihub-service/biz/src/main/java/com/primihub/biz/service/data/component/impl/JointStatisticalComponentTaskServiceImpl.java +++ b/primihub-service/biz/src/main/java/com/primihub/biz/service/data/component/impl/JointStatisticalComponentTaskServiceImpl.java @@ -6,6 +6,7 @@ import com.primihub.biz.config.base.BaseConfiguration; import com.primihub.biz.config.base.ComponentsConfiguration; import com.primihub.biz.config.base.OrganConfiguration; +import com.primihub.biz.constant.DataConstant; import com.primihub.biz.entity.base.BaseResultEntity; import com.primihub.biz.entity.data.dataenum.TaskStateEnum; import com.primihub.biz.entity.data.dto.GrpcComponentDto; @@ -61,6 +62,7 @@ public BaseResultEntity runTask(DataComponentReq req, ComponentTaskReq taskReq) log.info("ids:{}", ids); String path = baseConfiguration.getRunModelFileUrlDirPrefix()+taskReq.getDataTask().getTaskIdName() + File.separator + "mpc"; Map jointStatisticalMap = getGrpcComponentDataSetMap(taskReq.getFusionResourceList(),path); + jointStatisticalMap.remove(taskReq.getFreemarkerMap().get(DataConstant.PYTHON_ARBITER_DATASET)); Map newsetidMap = new HashMap<>(); log.info("jointStatisticalMap-1:{}", JSONObject.toJSONString(jointStatisticalMap)); if (newest!=null && newest.size()!=0){ diff --git a/primihub-service/biz/src/main/java/com/primihub/biz/service/feign/FusionResourceService.java b/primihub-service/biz/src/main/java/com/primihub/biz/service/feign/FusionResourceService.java index 2328618a2..da4ffbbe6 100644 --- a/primihub-service/biz/src/main/java/com/primihub/biz/service/feign/FusionResourceService.java +++ b/primihub-service/biz/src/main/java/com/primihub/biz/service/feign/FusionResourceService.java @@ -47,6 +47,9 @@ public interface FusionResourceService { @RequestMapping("/fusionResource/getTestDataSet") BaseResultEntity getTestDataSet(@RequestParam("id")String id); + @RequestMapping("/fusionResource/getDataSets") + BaseResultEntity getDataSets(@RequestBody Set id); + @PostMapping("/fusionResource/batchSaveTestDataSet") BaseResultEntity batchSaveTestDataSet(@RequestBody List dataSets); } diff --git a/primihub-service/biz/src/main/java/com/primihub/biz/service/schedule/ScheduleService.java b/primihub-service/biz/src/main/java/com/primihub/biz/service/schedule/ScheduleService.java index 9b090c968..5293b1f4b 100644 --- a/primihub-service/biz/src/main/java/com/primihub/biz/service/schedule/ScheduleService.java +++ b/primihub-service/biz/src/main/java/com/primihub/biz/service/schedule/ScheduleService.java @@ -2,23 +2,41 @@ import com.primihub.biz.constant.RedisKeyConstant; import com.primihub.biz.entity.base.BaseResultEntity; +import com.primihub.biz.entity.base.BaseResultEnum; import com.primihub.biz.entity.data.po.DataFusionCopyTask; +import com.primihub.biz.entity.sys.po.SysOrgan; +import com.primihub.biz.repository.primarydb.sys.SysOrganPrimarydbRepository; import com.primihub.biz.repository.primaryredis.sys.SysCommonPrimaryRedisRepository; +import com.primihub.biz.repository.secondarydb.sys.SysOrganSecondarydbRepository; import com.primihub.biz.service.data.DataCopyService; +import com.primihub.biz.service.data.OtherBusinessesService; +import com.primihub.biz.service.sys.SysAsyncService; import com.primihub.biz.util.crypt.DateUtil; +import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; import java.util.*; import java.util.concurrent.TimeUnit; @Service +@Slf4j public class ScheduleService { @Autowired private SysCommonPrimaryRedisRepository sysCommonPrimaryRedisRepository; @Autowired + private SysOrganSecondarydbRepository sysOrganSecondarydbRepository; + @Autowired + private SysOrganPrimarydbRepository sysOrganPrimarydbRepository; + @Autowired private DataCopyService dataCopyService; + @Autowired + private SysAsyncService sysAsyncService; + @Autowired + private OtherBusinessesService otherBusinessesService; + public BaseResultEntity recallNotFinishedTask(){ Date date=new Date(); @@ -39,4 +57,39 @@ public BaseResultEntity recallNotFinishedTask(){ } return BaseResultEntity.success(result); } + + /** + * 定时处理节点业务 10分钟一次 + */ + @Scheduled(cron="0 0/10 * * * ? ") + private void nodeOperations(){ + log.info("定时处理节点业务"); + // 上报节点状态 + sysAsyncService.collectBaseData(); + List sysOrgans = sysOrganSecondarydbRepository.selectSysOrganByExamine(); + long time = System.currentTimeMillis(); + String data = String.format("{'time':%s}", time); + for (SysOrgan sysOrgan : sysOrgans) { + try { + BaseResultEntity baseResultEntity = otherBusinessesService.syncGatewayApiData(data, sysOrgan.getOrganGateway() + "/share/shareData/healthConnection", sysOrgan.getPublicKey()); + if (baseResultEntity==null || !baseResultEntity.getCode().equals(BaseResultEnum.SUCCESS.getReturnCode())){ + Set services = (Set) baseResultEntity.getResult(); + sysOrgan.setPlatformState(services.contains("platform")?1:0); + sysOrgan.setNodeState(services.contains("node")?1:0); + sysOrgan.setFusionState(services.contains("fusion")?1:0); + }else { + sysOrgan.setPlatformState(0); + sysOrgan.setNodeState(0); + sysOrgan.setFusionState(0); + } + }catch (Exception e){ + sysOrgan.setPlatformState(0); + sysOrgan.setNodeState(0); + sysOrgan.setFusionState(0); + log.info("机构ID:{} - 机构名称:{} - 机构网关地址:{} - 状态获取失败",sysOrgan.getOrganId(),sysOrgan.getOrganName(),sysOrgan.getOrganGateway()); + e.printStackTrace(); + } + sysOrganPrimarydbRepository.updateSysOrgan(sysOrgan); + } + } } diff --git a/primihub-service/biz/src/main/java/com/primihub/biz/service/share/ShareService.java b/primihub-service/biz/src/main/java/com/primihub/biz/service/share/ShareService.java new file mode 100644 index 000000000..4c910c159 --- /dev/null +++ b/primihub-service/biz/src/main/java/com/primihub/biz/service/share/ShareService.java @@ -0,0 +1,48 @@ +package com.primihub.biz.service.share; + +import com.primihub.biz.entity.base.BaseResultEnum; +import com.primihub.biz.service.feign.FusionOrganService; +import com.primihub.sdk.task.TaskHelper; +import com.primihub.sdk.task.param.TaskPSIParam; +import com.primihub.sdk.task.param.TaskParam; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import java.util.HashSet; +import java.util.Set; + +@Service +public class ShareService { + + @Autowired + private TaskHelper taskHelper; + @Autowired + private FusionOrganService fusionOrganService; + + + public Set getServiceState(){ + Set service = new HashSet<>(); + service.add("platform"); + try { + TaskParam taskParam = new TaskParam(new TaskPSIParam()); + taskParam.setTaskId("0"); + taskParam.setJobId("0"); + taskParam.setRequestId("0"); + taskParam.setPartyCount(0); + taskHelper.continuouslyObtainTaskStatus(taskParam); + if (taskParam.getSuccess()){ + service.add("node"); + } + }catch (Exception e){ + e.printStackTrace(); + } + try { + if(fusionOrganService.healthConnection().getCode().equals(BaseResultEnum.SUCCESS.getReturnCode())){ + service.add("fusion"); + } + }catch (Exception e){ + e.printStackTrace(); + } + return service; + } +} diff --git a/primihub-service/biz/src/main/java/com/primihub/biz/service/sys/SysAsyncService.java b/primihub-service/biz/src/main/java/com/primihub/biz/service/sys/SysAsyncService.java index f8625163e..4fcc0edbd 100644 --- a/primihub-service/biz/src/main/java/com/primihub/biz/service/sys/SysAsyncService.java +++ b/primihub-service/biz/src/main/java/com/primihub/biz/service/sys/SysAsyncService.java @@ -63,6 +63,8 @@ public void collectBaseData() { map.put("lon", new ArrayList() {{add(sysLocalOrganInfo.getAddressInfo().getLon());}}); map.put("city", new ArrayList() {{add(sysLocalOrganInfo.getAddressInfo().getCity());}}); map.put("region", new ArrayList() {{add(sysLocalOrganInfo.getAddressInfo().getRegion());}}); + map.put("globalGateway", new ArrayList() {{add(sysLocalOrganInfo.getGatewayAddress());}}); + map.put("publicKey", new ArrayList() {{add(sysLocalOrganInfo.getPublicKey());}}); HttpEntity> request = new HttpEntity(map, headers); BaseResultEntity resultEntity = restTemplate.postForObject(SysConstant.SYS_COLLECT_URL, request, BaseResultEntity.class); log.info(JSONObject.toJSONString(resultEntity)); diff --git a/primihub-service/biz/src/main/java/com/primihub/biz/service/sys/SysOrganService.java b/primihub-service/biz/src/main/java/com/primihub/biz/service/sys/SysOrganService.java index 7632fb8e7..4873400d0 100644 --- a/primihub-service/biz/src/main/java/com/primihub/biz/service/sys/SysOrganService.java +++ b/primihub-service/biz/src/main/java/com/primihub/biz/service/sys/SysOrganService.java @@ -240,6 +240,9 @@ public BaseResultEntity applyForJoinNode(Map info) { log.info(JSONObject.toJSONString(info)); // SysOrgan sysOrgan = sysOrganSecondarydbRepository.selectSysOrganByApplyId(info.get("applyId").toString()); SysOrgan sysOrgan = sysOrganSecondarydbRepository.selectSysOrganByOrganId(info.get("organId").toString()); + if (sysOrgan.getOrganId().equals(info.get("organId").toString())){ + return BaseResultEntity.success(); + } if (sysOrgan==null){ sysOrgan = new SysOrgan(); sysOrgan.setApplyId(info.get("applyId").toString()); diff --git a/primihub-service/biz/src/main/resources/mybatis/mapper/primarydb/data/DataPsiPrRepositoryMapper.xml b/primihub-service/biz/src/main/resources/mybatis/mapper/primarydb/data/DataPsiPrRepositoryMapper.xml index b6f9853e3..44c44d47a 100644 --- a/primihub-service/biz/src/main/resources/mybatis/mapper/primarydb/data/DataPsiPrRepositoryMapper.xml +++ b/primihub-service/biz/src/main/resources/mybatis/mapper/primarydb/data/DataPsiPrRepositoryMapper.xml @@ -2,8 +2,8 @@ - insert into data_psi (own_organ_id,own_resource_id,own_keyword,other_organ_id,other_resource_id,other_keyword,output_file_path_type,output_no_repeat,tag,result_name,output_content,output_format,result_organ_ids,remarks,user_id,is_del,create_date,update_date) - values (#{ownOrganId},#{ownResourceId},#{ownKeyword},#{otherOrganId},#{otherResourceId},#{otherKeyword},#{outputFilePathType},${outputNoRepeat},#{tag},#{resultName},#{outputContent},#{outputFormat},#{resultOrganIds},#{remarks},#{userId},0,now(),now()) + insert into data_psi (own_organ_id,own_resource_id,own_keyword,other_organ_id,other_resource_id,other_keyword,output_file_path_type,output_no_repeat,tag,result_name,output_content,output_format,result_organ_ids,tee_organ_id,remarks,user_id,is_del,create_date,update_date) + values (#{ownOrganId},#{ownResourceId},#{ownKeyword},#{otherOrganId},#{otherResourceId},#{otherKeyword},#{outputFilePathType},${outputNoRepeat},#{tag},#{resultName},#{outputContent},#{outputFormat},#{resultOrganIds},#{teeOrganId},#{remarks},#{userId},0,now(),now()) insert into data_psi_task (psi_id,task_id,task_state,ascription,ascription_type,file_rows,file_path,is_del,create_date,update_date) diff --git a/primihub-service/biz/src/main/resources/mybatis/mapper/secondarydb/data/DataPsiRepositoryMapper.xml b/primihub-service/biz/src/main/resources/mybatis/mapper/secondarydb/data/DataPsiRepositoryMapper.xml index a086e8e72..e55d1f942 100644 --- a/primihub-service/biz/src/main/resources/mybatis/mapper/secondarydb/data/DataPsiRepositoryMapper.xml +++ b/primihub-service/biz/src/main/resources/mybatis/mapper/secondarydb/data/DataPsiRepositoryMapper.xml @@ -2,12 +2,15 @@ - SELECT dp.id AS dataPsiId, dp.result_name AS resultName, + dp.other_organ_id as otherOrganId, + dp.tag as psiTag, dpt.id as taskId, dpt.task_id as taskIdName, + dt.task_name as taskName, dt.task_start_time as taskStart, dt.task_end_time as taskEnd, dpt.task_state as taskState, @@ -20,20 +23,31 @@ WHERE dp.id = dpt.psi_id and dpt.task_id = dt.task_id_name - and dp.result_name like CONCAT('%',#{resultName},'%') + ORDER BY dp.id desc LIMIT #{offset},#{pageSize} - SELECT count(dp.id) FROM data_psi dp, - data_psi_task dpt + data_psi_task dpt, + data_task dt WHERE dp.id = dpt.psi_id - and dp.result_name like CONCAT('%',#{resultName},'%') + and dpt.task_id = dt.task_id_name + + + + and dp.result_name like CONCAT('%',#{resultName},'%') + and dp.other_organ_id = #{organId} + and dt.task_name like CONCAT('%',#{taskName},'%') + and dpt.task_state = #{taskState} + and dpt.create_date >=#{startDate} + and dpt.create_date <=#{endDate} + + SELECT + id,task_id as taskId,provider_organ_name as providerOrganName,resource_id as resourceId,resource_name as resourceName,retrieval_id as retrievalId,create_date as createDate + FROM + data_pir_task + WHERE + task_id = #{id} + \ No newline at end of file diff --git a/primihub-service/gateway/src/main/java/com/primihub/gateway/GatewayApplication.java b/primihub-service/gateway/src/main/java/com/primihub/gateway/GatewayApplication.java index 69780fc20..baa618f46 100644 --- a/primihub-service/gateway/src/main/java/com/primihub/gateway/GatewayApplication.java +++ b/primihub-service/gateway/src/main/java/com/primihub/gateway/GatewayApplication.java @@ -36,7 +36,7 @@ SysAsyncService.class, GrpcConfiguration.class }), - @ComponentScan.Filter(type = FilterType.REGEX, pattern = {"com.primihub.biz.service.data.*","com.primihub.biz.service.schedule.*","com.primihub.biz.service.test.*","com.primihub.biz.config.captcha.*"}) + @ComponentScan.Filter(type = FilterType.REGEX, pattern = {"com.primihub.biz.service.data.*","com.primihub.biz.service.schedule.*","com.primihub.biz.service.share.*","com.primihub.biz.service.test.*","com.primihub.biz.config.captcha.*"}) } ) public class GatewayApplication { diff --git a/primihub-service/script/ddl.sql b/primihub-service/script/ddl.sql index 84220b20a..b78464a3f 100644 --- a/primihub-service/script/ddl.sql +++ b/primihub-service/script/ddl.sql @@ -193,6 +193,7 @@ CREATE TABLE `data_psi` ( `output_content` int(11) DEFAULT '0' COMMENT '输出内容 默认0 0交集 1差集', `output_format` varchar(255) DEFAULT NULL COMMENT '输出格式', `result_organ_ids` varchar(255) DEFAULT NULL COMMENT '结果获取方 多机构","号间隔', + `tee_organ_id` varchar(255) DEFAULT NULL COMMENT 'tee 机构ID', `remarks` varchar(255) DEFAULT NULL COMMENT '备注', `user_id` bigint(20) DEFAULT NULL COMMENT '用户id', `is_del` tinyint(4) DEFAULT '0' COMMENT '是否删除',