From 8dff1f820b7a7646f0e664eb6aa8c1454912fb49 Mon Sep 17 00:00:00 2001 From: Honnix Date: Fri, 29 Sep 2023 09:00:43 +0200 Subject: [PATCH] Shutdown client properly (#252) * Shutdown client properly Signed-off-by: Hongxin Liang --- .../flyte/jflyte/ExecuteDynamicWorkflow.java | 79 ++++++++++--------- 1 file changed, 41 insertions(+), 38 deletions(-) diff --git a/jflyte/src/main/java/org/flyte/jflyte/ExecuteDynamicWorkflow.java b/jflyte/src/main/java/org/flyte/jflyte/ExecuteDynamicWorkflow.java index 6b0079846..21cb3338e 100644 --- a/jflyte/src/main/java/org/flyte/jflyte/ExecuteDynamicWorkflow.java +++ b/jflyte/src/main/java/org/flyte/jflyte/ExecuteDynamicWorkflow.java @@ -203,44 +203,47 @@ static DynamicJobSpec rewrite( Map taskTemplates, Map workflowTemplates) { - WorkflowNodeVisitor workflowNodeVisitor = - IdentifierRewrite.builder() - .domain(executionConfig.domain()) - .project(executionConfig.project()) - .version(executionConfig.version()) - .adminClient( - FlyteAdminClient.create(config.platformUrl(), config.platformInsecure(), null)) - .build() - .visitor(); - - List rewrittenNodes = - spec.nodes().stream().map(workflowNodeVisitor::visitNode).collect(toUnmodifiableList()); - - Map usedSubWorkflows = - ProjectClosure.collectSubWorkflows(rewrittenNodes, workflowTemplates); - - Map usedTaskTemplates = - ProjectClosure.collectTasks(rewrittenNodes, taskTemplates); - - // FIXME one sub-workflow can use more sub-workflows, we should recursively collect used tasks - // and workflows - - Map rewrittenUsedSubWorkflows = - mapValues(usedSubWorkflows, workflowNodeVisitor::visitWorkflowTemplate); - - return spec.toBuilder() - .nodes(rewrittenNodes) - .subWorkflows( - ImmutableMap.builder() - .putAll(spec.subWorkflows()) - .putAll(rewrittenUsedSubWorkflows) - .build()) - .tasks( - ImmutableMap.builder() - .putAll(spec.tasks()) - .putAll(usedTaskTemplates) - .build()) - .build(); + try (FlyteAdminClient flyteAdminClient = + FlyteAdminClient.create(config.platformUrl(), config.platformInsecure(), null)) { + + WorkflowNodeVisitor workflowNodeVisitor = + IdentifierRewrite.builder() + .domain(executionConfig.domain()) + .project(executionConfig.project()) + .version(executionConfig.version()) + .adminClient(flyteAdminClient) + .build() + .visitor(); + + List rewrittenNodes = + spec.nodes().stream().map(workflowNodeVisitor::visitNode).collect(toUnmodifiableList()); + + Map usedSubWorkflows = + ProjectClosure.collectSubWorkflows(rewrittenNodes, workflowTemplates); + + Map usedTaskTemplates = + ProjectClosure.collectTasks(rewrittenNodes, taskTemplates); + + // FIXME one sub-workflow can use more sub-workflows, we should recursively collect used tasks + // and workflows + + Map rewrittenUsedSubWorkflows = + mapValues(usedSubWorkflows, workflowNodeVisitor::visitWorkflowTemplate); + + return spec.toBuilder() + .nodes(rewrittenNodes) + .subWorkflows( + ImmutableMap.builder() + .putAll(spec.subWorkflows()) + .putAll(rewrittenUsedSubWorkflows) + .build()) + .tasks( + ImmutableMap.builder() + .putAll(spec.tasks()) + .putAll(usedTaskTemplates) + .build()) + .build(); + } } private static DynamicWorkflowTask getDynamicWorkflowTask(String name) {