Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DAG Visualizer #25

Merged
Merged
Show file tree
Hide file tree
Changes from 29 commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
314206a
feat: Render DAG visualization for runs from context button
Christopher-R-Perkins Jul 3, 2024
b19f48f
fix: prevent panel from opening multiple times, if button clicked mul…
ErikWiens Jul 5, 2024
d0d2308
feat: dag webview changed from pure svg to svg+html
Christopher-R-Perkins Jul 5, 2024
4dfbbe2
feat: changed nodes on dag to foreign objects instead of overlay
Christopher-R-Perkins Jul 6, 2024
05d0e38
feat: Dag now has orthagonal edges
Christopher-R-Perkins Jul 8, 2024
a38b967
feat: added pan and zoom to dag visualization
Christopher-R-Perkins Jul 8, 2024
9693a28
fix: prevent ignoring zenml config change due to debounce
Christopher-R-Perkins Jul 9, 2024
1e858e8
feat: add progress indicator for image loading in WebView panel
ErikWiens Jul 9, 2024
61702d4
feat: Added error notifcation for failed DAG data retrieval
Christopher-R-Perkins Jul 9, 2024
e124a88
feat: Dag hover node -> highlight edges
Christopher-R-Perkins Jul 9, 2024
40312b7
chore: Moved Dag Renderer to its own class.
Christopher-R-Perkins Jul 9, 2024
ec5dadf
feat: Added colored icons to DAG view
Christopher-R-Perkins Jul 9, 2024
83323a3
feat: Added ability to retrieve single pipeline run from ZenML client
Christopher-R-Perkins Jul 10, 2024
e2c4209
feat: Added dashboard url to server info
Christopher-R-Perkins Jul 10, 2024
974abe5
feat: Added ability to pull steps and artifacts via ZenML client
Christopher-R-Perkins Jul 10, 2024
ee8cf49
feat: Added ability to get DAG data from ZenML client
Christopher-R-Perkins Jul 10, 2024
ba6c307
feat: add update button to DAG panel if DAG run still running or init…
ErikWiens Jul 11, 2024
a17751e
feat: Changed update bar to title/update bar
Christopher-R-Perkins Jul 11, 2024
376b634
feat: Dag Node onclick goes to dashboard
Christopher-R-Perkins Jul 11, 2024
ee3173f
feat: Added ZenML Panel for data view with DAG step/artifact
Christopher-R-Perkins Jul 11, 2024
535d688
chore: Added Documentation To DAG view
Christopher-R-Perkins Jul 11, 2024
de7cd4c
fix: Pipeline Url now goes to new dashboard
Christopher-R-Perkins Jul 11, 2024
cf8d4f8
feat: Changed way node urls are accessed - now via panel or dbl click
Christopher-R-Perkins Jul 12, 2024
c46dfd2
feat: Sped up graph building by implementing own solution
Christopher-R-Perkins Jul 12, 2024
3ae30f1
feat: Packed webview js and updated security for it
Christopher-R-Perkins Jul 14, 2024
b9f8b03
feat: Added loading indicator when retrieving step/artifact in panel
Christopher-R-Perkins Jul 14, 2024
50989d9
feat: Updated readme with rundown of DAG visualization feature
Christopher-R-Perkins Jul 14, 2024
19dc93a
chore: removed unused imports
Christopher-R-Perkins Jul 14, 2024
38d77e9
chore: implemented code rabbit suggestions
Christopher-R-Perkins Jul 15, 2024
9c18b5c
chore: refactor DagRenderer.createView to fit code rabbit suggestions
Christopher-R-Perkins Jul 15, 2024
5594a86
chore: Moved messageHandler for webviewpanel into own method for read…
Christopher-R-Perkins Jul 15, 2024
e238f28
chore: Refactored createMessageHandler and added error logs as sugges…
Christopher-R-Perkins Jul 15, 2024
37007c9
Fix linting
strickvl Jul 16, 2024
d4d6a19
fix: Fixed initializing edges list as list, not dict in grapher
Christopher-R-Perkins Jul 16, 2024
68161cd
fix: updated context mock in activation test
Christopher-R-Perkins Jul 16, 2024
2760d9e
chore: Implemented Alex's notes from code review
Christopher-R-Perkins Jul 16, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ out/
build/
*.tsbuildinfo
.history/
dag-packed.js

# env
.env
Expand All @@ -38,4 +39,4 @@ build/
bundled/libs/
**/__pycache__
**/.pytest_cache
**/.vs
**/.vs
16 changes: 15 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ The ZenML VSCode extension seamlessly integrates with [ZenML](https://github.com
## Features

- **Server, Stacks, and Pipeline Runs Views**: Interact directly with ML stacks, pipeline runs, and server configurations from the Activity Bar.
- **DAG Visualization for Pipeline Runs**: Explore Directed Acyclic Graphs for each pipeline view directly from command on the Activity Bar.
strickvl marked this conversation as resolved.
Show resolved Hide resolved
- **Python Tool Integration**: Utilizes a Language Server Protocol (LSP) server for real-time synchronization with the ZenML environment.
- **Real-Time Configuration Monitoring**: Leverages `watchdog` to dynamically update configurations, keeping the extension in sync with your ZenML setup.
- **Status Bar**: Display the current stack name and connection status. You can
Expand All @@ -27,9 +28,22 @@ this extension and your Python version needs to be 3.8 or greater.

- **Manage Server Connections**: Connect or disconnect from ZenML servers and refresh server status.
- **Stack Operations**: View stack details, rename, copy, or set active stacks directly from VSCode.
- **Pipeline Runs**: Monitor and manage pipeline runs, including deleting runs from the system.
- **Pipeline Runs**: Monitor and manage pipeline runs, including deleting runs from the system and rendering DAGs.
- **Environment Information**: Get detailed snapshots of the development environment, aiding troubleshooting.

### DAG Rendering
strickvl marked this conversation as resolved.
Show resolved Hide resolved

![DAG Rendering Example](resources/zenml-extension-dag.gif)

- **Directed Acyclic Graph rendering**
- click on the Render Dag context action(labeled 1 in above image) next to the pipeline run you want to render. This will render the DAG in the editor window.
strickvl marked this conversation as resolved.
Show resolved Hide resolved
- **Graph manuevering**
- Panning the graph can be done by clicking and dragging anywhere on the graph.
- Zooming can be controlled by the mousewheel, the control panel(labeled 2 in the above graph) or double-clicking anywhere there is not a node.
strickvl marked this conversation as resolved.
Show resolved Hide resolved
- Mousing over a node will highlight all edges being output by that node
- Clicking a node will display the data related to it in the ZenML panel view(labeled 3 in the above image)
strickvl marked this conversation as resolved.
Show resolved Hide resolved
- Double-clicking a node will open the dashboard in a web browser to either the pipeline run or the artifact version.

## Requirements

- **ZenML Installation:** ZenML needs to be installed in the local Python environment associated with the Python interpreter selected in the current VS Code workspace. This extension interacts directly with your ZenML environment, so ensuring that ZenML is installed and properly configured is essential.
Expand Down
24 changes: 24 additions & 0 deletions bundled/tool/lsp_zenml.py
Original file line number Diff line number Diff line change
Expand Up @@ -273,3 +273,27 @@ def fetch_pipeline_runs(wrapper_instance, args):
def delete_pipeline_run(wrapper_instance, args):
"""Deletes a specified ZenML pipeline run."""
return wrapper_instance.delete_pipeline_run(args)

@self.command(f"{TOOL_MODULE_NAME}.getPipelineRun")
@self.zenml_command(wrapper_name="pipeline_runs_wrapper")
def get_pipeline_run(wrapper_instance, args):
"""Gets a specified ZenML pipeline run."""
return wrapper_instance.get_pipeline_run(args)

@self.command(f"{TOOL_MODULE_NAME}.getPipelineRunStep")
@self.zenml_command(wrapper_name="pipeline_runs_wrapper")
def get_run_step(wrapper_instance, args):
"""Gets a specified ZenML pipeline run step."""
return wrapper_instance.get_run_step(args)

@self.command(f"{TOOL_MODULE_NAME}.getPipelineRunArtifact")
@self.zenml_command(wrapper_name="pipeline_runs_wrapper")
def get_run_artifact(wrapper_instance, args):
"""Gets a specified ZenML pipeline artifact"""
return wrapper_instance.get_run_artifact(args)

@self.command(f"{TOOL_MODULE_NAME}.getPipelineRunDag")
@self.zenml_command(wrapper_name="pipeline_runs_wrapper")
def get_run_dag(wrapper_instance, args):
"""Gets graph data for a specified ZenML pipeline run"""
return wrapper_instance.get_pipeline_run_graph(args)
18 changes: 13 additions & 5 deletions bundled/tool/zen_watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,14 @@ def __init__(self, lsp_server):
"always",
]

try:
with suppress_stdout_temporarily():
config_wrapper_instance = self.LSP_SERVER.zenml_client.config_wrapper
self.config_path = config_wrapper_instance.get_global_config_file_path()
except Exception as e:
self.log_error(f"Failed to retrieve global config file path: {e}")


def process_config_change(self, config_file_path: str):
"""Process the configuration file change."""
with suppress_stdout_temporarily():
Expand Down Expand Up @@ -88,20 +96,20 @@ def on_modified(self, event):
"""
Handles the modification event triggered when the global configuration file is changed.
"""
if event.src_path != self.config_path:
return

if self._timer is not None:
self._timer.cancel()

self._timer = Timer(self.debounce_interval, self.process_event, [event])
self._timer.start()

def process_event(self, event):
"""
Processes the event with a debounce mechanism.
"""
with suppress_stdout_temporarily():
config_wrapper_instance = self.LSP_SERVER.zenml_client.config_wrapper
config_file_path = config_wrapper_instance.get_global_config_file_path()
if event.src_path == str(config_file_path):
self.process_config_change(config_file_path)
self.process_config_change(event.src_path)

def watch_zenml_config_yaml(self):
"""
Expand Down
97 changes: 97 additions & 0 deletions bundled/tool/zenml_grapher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
# Copyright (c) ZenML GmbH 2024. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
# or implied. See the License for the specific language governing
# permissions and limitations under the License.
"""This module contains a tool to mimic LineageGraph output for pipeline runs"""

class Grapher:
"""Quick and dirty implementation of ZenML/LineageGraph to reduce number of api calls"""

def __init__(self, run):
self.run = run
self.nodes = []
self.artifacts = {}
self.edges = {}

def build_nodes_from_steps(self) -> None:
"""Builds internal node list from run steps"""
self.nodes = []
self.artifacts = {}

for step in self.run.metadata.steps:
step_data = self.run.metadata.steps[step]
self.nodes.append({
"id": str(step_data.id),
"type": "step",
"data": {
"execution_id": str(step_data.id),
"name": step,
"status": step_data.body.status,
},
})
self.add_artifacts_from_list(step_data.body.inputs)
self.add_artifacts_from_list(step_data.body.outputs)

strickvl marked this conversation as resolved.
Show resolved Hide resolved
Comment on lines +27 to +45
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Method build_nodes_from_steps: Optimization and Redundancy Concern

Each call to build_nodes_from_steps resets self.nodes and self.artifacts, which could lead to inefficiencies if the method is called multiple times within the same context. Consider maintaining state across calls or restructuring how this method is invoked to prevent unnecessary reinitializations.

  def build_nodes_from_steps(self) -> None:
      """Builds internal node list from run steps"""
-     self.nodes = []
-     self.artifacts = {}
+     if not self.nodes and not self.artifacts:
+         self.nodes = []
+         self.artifacts = {}
Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
def build_nodes_from_steps(self) -> None:
"""Builds internal node list from run steps"""
self.nodes = []
self.artifacts = {}
for step in self.run.metadata.steps:
step_data = self.run.metadata.steps[step]
self.nodes.append({
"id": str(step_data.id),
"type": "step",
"data": {
"execution_id": str(step_data.id),
"name": step,
"status": step_data.body.status,
},
})
self.add_artifacts_from_list(step_data.body.inputs)
self.add_artifacts_from_list(step_data.body.outputs)
def build_nodes_from_steps(self) -> None:
"""Builds internal node list from run steps"""
if not self.nodes and not self.artifacts:
self.nodes = []
self.artifacts = {}
for step in self.run.metadata.steps:
step_data = self.run.metadata.steps[step]
self.nodes.append({
"id": str(step_data.id),
"type": "step",
"data": {
"execution_id": str(step_data.id),
"name": step,
"status": step_data.body.status,
},
})
self.add_artifacts_from_list(step_data.body.inputs)
self.add_artifacts_from_list(step_data.body.outputs)


def add_artifacts_from_list(self, list) -> None:
strickvl marked this conversation as resolved.
Show resolved Hide resolved
"""Used to add unique artifacts to the internal nodes list by build_nodes_from_steps"""
for artifact in list:
id = str(list[artifact].body.artifact.id)
if id in self.artifacts:
continue

self.artifacts[id] = True

self.nodes.append({
"type": "artifact",
"id": id,
"data": {
"name": artifact,
"artifact_type": list[artifact].body.type,
"execution_id": str(list[artifact].id),
},
})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Improve add_artifacts_from_list Method.

  1. The variable list is poorly named and does not convey its purpose. Consider renaming it to artifacts_list or similar.
  2. The method lacks type hints which would improve readability and maintainability.
  3. Using a dictionary for self.artifacts to check for duplicates is fine as discussed previously, but ensure this aligns with the overall design philosophy of the application.
-    def add_artifacts_from_list(self, list) -> None:
+    def add_artifacts_from_list(self, artifacts_list: Dict[str, Artifact]) -> None:
Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
def add_artifacts_from_list(self, list) -> None:
"""Used to add unique artifacts to the internal nodes list by build_nodes_from_steps"""
for artifact in list:
id = str(list[artifact].body.artifact.id)
if id in self.artifacts:
continue
self.artifacts[id] = True
self.nodes.append({
"type": "artifact",
"id": id,
"data": {
"name": artifact,
"artifact_type": list[artifact].body.type,
"execution_id": str(list[artifact].id),
},
})
def add_artifacts_from_list(self, artifacts_list: Dict[str, Artifact]) -> None:
"""Used to add unique artifacts to the internal nodes list by build_nodes_from_steps"""
for artifact in artifacts_list:
id = str(artifacts_list[artifact].body.artifact.id)
if id in self.artifacts:
continue
self.artifacts[id] = True
self.nodes.append({
"type": "artifact",
"id": id,
"data": {
"name": artifact,
"artifact_type": artifacts_list[artifact].body.type,
"execution_id": str(artifacts_list[artifact].id),
},
})



def build_edges_from_steps(self) -> None:
"""Builds internal edges list from run steps"""
self.edges = []

for step in self.run.metadata.steps:
step_data = self.run.metadata.steps[step]
step_id = str(step_data.id)

for artifact in step_data.body.inputs:
input_id = str(step_data.body.inputs[artifact].body.artifact.id)
self.add_edge(input_id, step_id)

for artifact in step_data.body.outputs:
output_id = str(step_data.body.outputs[artifact].body.artifact.id)
self.add_edge(step_id, output_id)

strickvl marked this conversation as resolved.
Show resolved Hide resolved
Comment on lines +67 to +82
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Method build_edges_from_steps: Efficiency Concern

This method builds edges from steps but does not check for duplicates, which could lead to redundant data in self.edges. Consider implementing a check to prevent duplicate edges from being added, especially since this could impact performance and memory usage with large datasets.

  def add_edge(self, v: str, w: str) -> None:
      """Helper method to add an edge to the internal edges list"""
+     edge_id = f"{v}_{w}"
+     if any(edge['id'] == edge_id for edge in self.edges):
+         return
      self.edges.append({
          "id": edge_id,
          "source": v,
          "target": w,
      })
Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
def build_edges_from_steps(self) -> None:
"""Builds internal edges list from run steps"""
self.edges = []
for step in self.run.metadata.steps:
step_data = self.run.metadata.steps[step]
step_id = str(step_data.id)
for artifact in step_data.body.inputs:
input_id = str(step_data.body.inputs[artifact].body.artifact.id)
self.add_edge(input_id, step_id)
for artifact in step_data.body.outputs:
output_id = str(step_data.body.outputs[artifact].body.artifact.id)
self.add_edge(step_id, output_id)
def build_edges_from_steps(self) -> None:
"""Builds internal edges list from run steps"""
self.edges = []
for step in self.run.metadata.steps:
step_data = self.run.metadata.steps[step]
step_id = str(step_data.id)
for artifact in step_data.body.inputs:
input_id = str(step_data.body.inputs[artifact].body.artifact.id)
self.add_edge(input_id, step_id)
for artifact in step_data.body.outputs:
output_id = str(step_data.body.outputs[artifact].body.artifact.id)
self.add_edge(step_id, output_id)
def add_edge(self, v: str, w: str) -> None:
"""Helper method to add an edge to the internal edges list"""
edge_id = f"{v}_{w}"
if any(edge['id'] == edge_id for edge in self.edges):
return
self.edges.append({
"id": edge_id,
"source": v,
"target": w,
})


def add_edge(self, v, w) -> None:
strickvl marked this conversation as resolved.
Show resolved Hide resolved
"""Helper method to add an edge to the internal edges list"""
self.edges.append({
"id": f"{v}_{w}",
"source": v,
"target": w,
})
strickvl marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Contributor

@coderabbitai coderabbitai bot Jul 16, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Review add_edge Method.

The method adds edges to the internal list without checking for duplicates. Based on previous discussions, this is acceptable due to the nature of the data. However, adding type hints and a brief description in the method's docstring about the parameters v and w could enhance clarity.

-    def add_edge(self, v, w) -> None:
+    def add_edge(self, v: str, w: str) -> None:
Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
def add_edge(self, v, w) -> None:
"""Helper method to add an edge to the internal edges list"""
self.edges.append({
"id": f"{v}_{w}",
"source": v,
"target": w,
})
def add_edge(self, v: str, w: str) -> None:
"""Helper method to add an edge to the internal edges list"""
self.edges.append({
"id": f"{v}_{w}",
"source": v,
"target": w,
})

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed in latest commit

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Christopher-R-Perkins The changes to the add_edge method, including the type hints and docstring, have been successfully implemented in the latest commit. Thank you for making the updates.

Analysis chain

Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Fetch the latest changes to the `add_edge` method in the `Grapher` class
ast-grep --lang python --pattern $'class Grapher {
  $$$
  def add_edge($_, $_) {
    $$$
  }
  $$$  
}'

Length of output: 450


Script:

#!/bin/bash
# Fetch the latest changes to the `add_edge` method in the `Grapher` class
ast-grep --lang python --pattern $'class Grapher {
  def add_edge(self, v: str, w: str) -> None:
    $$$
}'

Length of output: 602


Script:

#!/bin/bash
# Fetch the latest changes to the `add_edge` method in the `Grapher` class
ast-grep --lang python --pattern $'def add_edge(self, v: str, w: str) -> None:
    $$$
'

Length of output: 571


def to_dict(self) -> dict:
strickvl marked this conversation as resolved.
Show resolved Hide resolved
"""Returns dictionary containing graph data"""
return {
"nodes": self.nodes,
"edges": self.edges,
"status": self.run.body.status,
"name": self.run.body.pipeline.name,
"version": self.run.body.pipeline.body.version,
}
140 changes: 140 additions & 0 deletions bundled/tool/zenml_wrappers.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import json
import pathlib
from typing import Any
from zenml_grapher import Grapher


class GlobalConfigWrapper:
Expand Down Expand Up @@ -324,6 +325,145 @@ def delete_pipeline_run(self, args) -> dict:
return {"message": f"Pipeline run `{run_id}` deleted successfully."}
except self.ZenMLBaseException as e:
return {"error": f"Failed to delete pipeline run: {str(e)}"}

def get_pipeline_run(self, args) -> dict:
strickvl marked this conversation as resolved.
Show resolved Hide resolved
"""Gets a ZenML pipeline run.

Args:
args (list): List of arguments.
Returns:
dict: Dictionary containing the result of the operation.
"""
try:
run_id = args[0]
run = self.client.get_pipeline_run(run_id, hydrate=True)
run_data = {
"id": str(run.id),
"name": run.body.pipeline.name,
"status": run.body.status,
"version": run.body.pipeline.body.version,
"stackName": run.body.stack.name,
"startTime": (
run.metadata.start_time.isoformat() if run.metadata.start_time else None
),
"endTime": (
run.metadata.end_time.isoformat() if run.metadata.end_time else None
),
"os": run.metadata.client_environment.get("os", "Unknown OS"),
"osVersion": run.metadata.client_environment.get(
"os_version",
run.metadata.client_environment.get("mac_version", "Unknown Version"),
),
"pythonVersion": run.metadata.client_environment.get(
"python_version", "Unknown"
),
}

return run_data
except self.ZenMLBaseException as e:
return {"error": f"Failed to retrieve pipeline run: {str(e)}"}

def get_pipeline_run_graph(self, args) -> dict:
strickvl marked this conversation as resolved.
Show resolved Hide resolved
"""Gets a ZenML pipeline run step DAG.

Args:
args (list): List of arguments.
Returns:
dict: Dictionary containing the result of the operation.
"""
try:
run_id = args[0]
run = self.client.get_pipeline_run(run_id, hydrate=True)
graph = Grapher(run)
graph.build_nodes_from_steps()
graph.build_edges_from_steps()
return graph.to_dict()
except self.ZenMLBaseException as e:
return {"error": f"Failed to retrieve pipeline run graph: {str(e)}"}

def get_run_step(self, args) -> dict:
strickvl marked this conversation as resolved.
Show resolved Hide resolved
"""Gets a ZenML pipeline run step.

Args:
args (list): List of arguments.
Returns:
dict: Dictionary containing the result of the operation.
"""
try:
step_run_id = args[0]
step = self.client.get_run_step(step_run_id, hydrate=True)
run = self.client.get_pipeline_run(step.metadata.pipeline_run_id, hydrate=True)

step_data = {
"name": step.name,
"id": str(step.id),
"status": step.body.status,
"author": {
"fullName": step.body.user.body.full_name,
"email": step.body.user.name,
},
"startTime": (
step.metadata.start_time.isoformat() if step.metadata.start_time else None
),
"endTime": (
step.metadata.end_time.isoformat() if step.metadata.end_time else None
),
"duration": (
str(step.metadata.end_time - step.metadata.start_time) if step.metadata.end_time and step.metadata.start_time else None
),
"stackName": run.body.stack.name,
"orchestrator": {
"runId": str(run.metadata.orchestrator_run_id)
},
"pipeline": {
"name": run.body.pipeline.name,
"status": run.body.status,
"version": run.body.pipeline.body.version,
},
"cacheKey": step.metadata.cache_key,
"sourceCode": step.metadata.source_code,
"logsUri": step.metadata.logs.body.uri
}
return step_data
except self.ZenMLBaseException as e:
return {"error": f"Failed to retrieve pipeline run step: {str(e)}"}

def get_run_artifact(self, args) -> dict:
strickvl marked this conversation as resolved.
Show resolved Hide resolved
"""Gets a ZenML pipeline run artifact.

Args:
args (list): List of arguments.
Returns:
dict: Dictionary containing the result of the operation.
"""
try:
artifact_id = args[0]
artifact = self.client.get_artifact_version(artifact_id, hydrate=True)

metadata = {}
for key in artifact.metadata.run_metadata:
metadata[key] = artifact.metadata.run_metadata[key].body.value

artifact_data = {
"name": artifact.body.artifact.name,
"version": artifact.body.version,
"id": str(artifact.id),
"type": artifact.body.type,
"author": {
"fullName": artifact.body.user.body.full_name,
"email": artifact.body.user.name,
},
"updated": artifact.body.updated.isoformat(),
"data": {
"uri": artifact.body.uri,
"dataType": artifact.body.data_type.attribute,
},
"metadata": metadata,
}
return artifact_data

except self.ZenMLBaseException as e:
return {"error": f"Failed to retrieve pipeline run artifact: {str(e)}"}


class StacksWrapper:
Expand Down
Loading