DAG Visualizer #25

Merged
36 commits
314206a
feat: Render DAG visualization for runs from context button
Christopher-R-Perkins Jul 3, 2024
b19f48f
fix: prevent panel from opening multiple times, if button clicked mul…
ErikWiens Jul 5, 2024
d0d2308
feat: dag webview changed from pure svg to svg+html
Christopher-R-Perkins Jul 5, 2024
4dfbbe2
feat: changed nodes on dag to foreign objects instead of overlay
Christopher-R-Perkins Jul 6, 2024
05d0e38
feat: Dag now has orthagonal edges
Christopher-R-Perkins Jul 8, 2024
a38b967
feat: added pan and zoom to dag visualization
Christopher-R-Perkins Jul 8, 2024
9693a28
fix: prevent ignoring zenml config change due to debounce
Christopher-R-Perkins Jul 9, 2024
1e858e8
feat: add progress indicator for image loading in WebView panel
ErikWiens Jul 9, 2024
61702d4
feat: Added error notifcation for failed DAG data retrieval
Christopher-R-Perkins Jul 9, 2024
e124a88
feat: Dag hover node -> highlight edges
Christopher-R-Perkins Jul 9, 2024
40312b7
chore: Moved Dag Renderer to its own class.
Christopher-R-Perkins Jul 9, 2024
ec5dadf
feat: Added colored icons to DAG view
Christopher-R-Perkins Jul 9, 2024
83323a3
feat: Added ability to retrieve single pipeline run from ZenML client
Christopher-R-Perkins Jul 10, 2024
e2c4209
feat: Added dashboard url to server info
Christopher-R-Perkins Jul 10, 2024
974abe5
feat: Added ability to pull steps and artifacts via ZenML client
Christopher-R-Perkins Jul 10, 2024
ee8cf49
feat: Added ability to get DAG data from ZenML client
Christopher-R-Perkins Jul 10, 2024
ba6c307
feat: add update button to DAG panel if DAG run still running or init…
ErikWiens Jul 11, 2024
a17751e
feat: Changed update bar to title/update bar
Christopher-R-Perkins Jul 11, 2024
376b634
feat: Dag Node onclick goes to dashboard
Christopher-R-Perkins Jul 11, 2024
ee3173f
feat: Added ZenML Panel for data view with DAG step/artifact
Christopher-R-Perkins Jul 11, 2024
535d688
chore: Added Documentation To DAG view
Christopher-R-Perkins Jul 11, 2024
de7cd4c
fix: Pipeline Url now goes to new dashboard
Christopher-R-Perkins Jul 11, 2024
cf8d4f8
feat: Changed way node urls are accessed - now via panel or dbl click
Christopher-R-Perkins Jul 12, 2024
c46dfd2
feat: Sped up graph building by implementing own solution
Christopher-R-Perkins Jul 12, 2024
3ae30f1
feat: Packed webview js and updated security for it
Christopher-R-Perkins Jul 14, 2024
b9f8b03
feat: Added loading indicator when retrieving step/artifact in panel
Christopher-R-Perkins Jul 14, 2024
50989d9
feat: Updated readme with rundown of DAG visualization feature
Christopher-R-Perkins Jul 14, 2024
19dc93a
chore: removed unused imports
Christopher-R-Perkins Jul 14, 2024
38d77e9
chore: implemented code rabbit suggestions
Christopher-R-Perkins Jul 15, 2024
9c18b5c
chore: refactor DagRenderer.createView to fit code rabbit suggestions
Christopher-R-Perkins Jul 15, 2024
5594a86
chore: Moved messageHandler for webviewpanel into own method for read…
Christopher-R-Perkins Jul 15, 2024
e238f28
chore: Refactored createMessageHandler and added error logs as sugges…
Christopher-R-Perkins Jul 15, 2024
37007c9
Fix linting
strickvl Jul 16, 2024
d4d6a19
fix: Fixed initializing edges list as list, not dict in grapher
Christopher-R-Perkins Jul 16, 2024
68161cd
fix: updated context mock in activation test
Christopher-R-Perkins Jul 16, 2024
2760d9e
chore: Implemented Alex's notes from code review
Christopher-R-Perkins Jul 16, 2024
feat: Sped up graph building by implementing own solution
Christopher-R-Perkins committed Jul 12, 2024
commit c46dfd20ec49a497111e17ad8015fa2d20ade7ad
97 changes: 97 additions & 0 deletions bundled/tool/zenml_grapher.py
@@ -0,0 +1,97 @@
# Copyright (c) ZenML GmbH 2024. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
# or implied. See the License for the specific language governing
# permissions and limitations under the License.
"""This module contains a tool to mimic LineageGraph output for pipeline runs"""

class Grapher:
    """Quick and dirty implementation of ZenML/LineageGraph to reduce number of api calls"""

    def __init__(self, run):
        self.run = run
        self.nodes = []
        self.artifacts = {}
        # A list, matching what build_edges_from_steps and add_edge expect
        self.edges = []

    def build_nodes_from_steps(self) -> None:
        """Builds internal node list from run steps"""
        self.nodes = []
        self.artifacts = {}

        for step in self.run.metadata.steps:
            step_data = self.run.metadata.steps[step]
            self.nodes.append({
                "id": str(step_data.id),
                "type": "step",
                "data": {
                    "execution_id": str(step_data.id),
                    "name": step,
                    "status": step_data.body.status,
                },
            })
            self.add_artifacts_from_list(step_data.body.inputs)
            self.add_artifacts_from_list(step_data.body.outputs)

Comment on lines +27 to +45

Method build_nodes_from_steps: Optimization and Redundancy Concern

Each call to build_nodes_from_steps resets self.nodes and self.artifacts, which could lead to inefficiencies if the method is called multiple times within the same context. Consider maintaining state across calls or restructuring how this method is invoked to prevent unnecessary reinitializations.

  def build_nodes_from_steps(self) -> None:
      """Builds internal node list from run steps"""
-     self.nodes = []
-     self.artifacts = {}
+     if not self.nodes and not self.artifacts:
+         self.nodes = []
+         self.artifacts = {}
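
A minimal sketch of why the unconditional reset matters, using a hypothetical `TinyGrapher` stand-in (not the class from this PR; steps are plain strings for illustration). The guard in the suggestion above only assigns empty containers when they are already empty, so it changes nothing on a first call and skips the reset on later calls, which lets step nodes accumulate.

```python
class TinyGrapher:
    """Hypothetical stand-in for Grapher, reduced to step names only."""

    def __init__(self, steps):
        self.steps = steps
        self.nodes = []

    def build_with_reset(self) -> None:
        # Mirrors the PR: start from a clean list on every call.
        self.nodes = []
        for name in self.steps:
            self.nodes.append({"id": name, "type": "step"})

    def build_with_guard(self) -> None:
        # Mirrors the suggested guard: only "resets" when already empty.
        if not self.nodes:
            self.nodes = []
        for name in self.steps:
            self.nodes.append({"id": name, "type": "step"})


reset = TinyGrapher(["load", "train"])
reset.build_with_reset()
reset.build_with_reset()

guard = TinyGrapher(["load", "train"])
guard.build_with_guard()
guard.build_with_guard()

print(len(reset.nodes))  # 2
print(len(guard.nodes))  # 4
```

With the reset in place the method is idempotent, which seems to be the property the refresh path of the DAG view would want.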


    def add_artifacts_from_list(self, list) -> None:
        """Used to add unique artifacts to the internal nodes list by build_nodes_from_steps"""
        for artifact in list:
            id = str(list[artifact].body.artifact.id)
            if id in self.artifacts:
                continue

            self.artifacts[id] = True

            self.nodes.append({
                "type": "artifact",
                "id": id,
                "data": {
                    "name": artifact,
                    "artifact_type": list[artifact].body.type,
                    "execution_id": str(list[artifact].id),
                },
            })

Improve add_artifacts_from_list Method.

  1. The variable list is poorly named and does not convey its purpose. Consider renaming it to artifacts_list or similar.
  2. The method lacks type hints which would improve readability and maintainability.
  3. Using a dictionary for self.artifacts to check for duplicates is fine as discussed previously, but ensure this aligns with the overall design philosophy of the application.
-    def add_artifacts_from_list(self, list) -> None:
+    def add_artifacts_from_list(self, artifacts_list: Dict[str, Artifact]) -> None:
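
One way the three points could look together, as a sketch only. The function name `collect_artifact_nodes`, the `fake_artifact` helper and the use of `Mapping[str, Any]` are assumptions for illustration; the suggestion's `Artifact` type is not defined anywhere in this diff, so it is not used here. A set replaces the `dict` of `True` values for duplicate tracking.

```python
from types import SimpleNamespace
from typing import Any, Dict, List, Mapping, Set


def collect_artifact_nodes(
    artifacts: Mapping[str, Any],
    seen_ids: Set[str],
    nodes: List[Dict[str, Any]],
) -> None:
    """Append one node per artifact id that is not already in seen_ids."""
    for name, artifact in artifacts.items():
        artifact_id = str(artifact.body.artifact.id)
        if artifact_id in seen_ids:
            continue
        seen_ids.add(artifact_id)
        nodes.append({
            "type": "artifact",
            "id": artifact_id,
            "data": {
                "name": name,
                "artifact_type": artifact.body.type,
                "execution_id": str(artifact.id),
            },
        })


def fake_artifact(artifact_id: str, version_id: str, kind: str) -> SimpleNamespace:
    """Hypothetical test double with the attribute shape the PR code reads."""
    return SimpleNamespace(
        id=version_id,
        body=SimpleNamespace(type=kind, artifact=SimpleNamespace(id=artifact_id)),
    )


seen: Set[str] = set()
nodes: List[Dict[str, Any]] = []
outputs = {"dataset": fake_artifact("a1", "v1", "DataArtifact")}
inputs = {"data": fake_artifact("a1", "v1", "DataArtifact")}  # same artifact id
collect_artifact_nodes(outputs, seen, nodes)
collect_artifact_nodes(inputs, seen, nodes)
print([n["id"] for n in nodes])  # ['a1']
```

Renaming the parameter also stops it from shadowing the built-in `list`, and `artifact_id` does the same for `id`.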



    def build_edges_from_steps(self) -> None:
        """Builds internal edges list from run steps"""
        self.edges = []

        for step in self.run.metadata.steps:
            step_data = self.run.metadata.steps[step]
            step_id = str(step_data.id)

            for artifact in step_data.body.inputs:
                input_id = str(step_data.body.inputs[artifact].body.artifact.id)
                self.add_edge(input_id, step_id)

            for artifact in step_data.body.outputs:
                output_id = str(step_data.body.outputs[artifact].body.artifact.id)
                self.add_edge(step_id, output_id)

Comment on lines +67 to +82

Method build_edges_from_steps: Efficiency Concern

This method builds edges from steps but does not check for duplicates, which could lead to redundant data in self.edges. Consider implementing a check to prevent duplicate edges from being added, especially since this could impact performance and memory usage with large datasets.

  def add_edge(self, v: str, w: str) -> None:
      """Helper method to add an edge to the internal edges list"""
+     edge_id = f"{v}_{w}"
+     if any(edge['id'] == edge_id for edge in self.edges):
+         return
      self.edges.append({
          "id": edge_id,
          "source": v,
          "target": w,
      })
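
If duplicate edges ever did need filtering, the `any(...)` scan above costs one pass over `self.edges` per insertion, so building the list becomes quadratic in the number of edges. A sketch of an alternative that keeps a set of seen ids next to the list follows; `EdgeList` and `_seen_ids` are hypothetical names, not part of this PR.

```python
from typing import Dict, List, Set


class EdgeList:
    """Hypothetical helper: ordered edge list with set-based duplicate checks."""

    def __init__(self) -> None:
        self.edges: List[Dict[str, str]] = []
        self._seen_ids: Set[str] = set()

    def add_edge(self, source: str, target: str) -> None:
        """Append the edge unless one with the same id was already added."""
        edge_id = f"{source}_{target}"
        if edge_id in self._seen_ids:
            return
        self._seen_ids.add(edge_id)
        self.edges.append({"id": edge_id, "source": source, "target": target})


edge_list = EdgeList()
edge_list.add_edge("artifact-1", "step-1")
edge_list.add_edge("step-1", "artifact-2")
edge_list.add_edge("artifact-1", "step-1")  # duplicate, ignored
print(len(edge_list.edges))  # 2
```

The list preserves insertion order for rendering while the set handles membership checks.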


    def add_edge(self, v, w) -> None:
        """Helper method to add an edge to the internal edges list"""
        self.edges.append({
            "id": f"{v}_{w}",
            "source": v,
            "target": w,
        })

@coderabbitai coderabbitai bot Jul 16, 2024

Review add_edge Method.

The method adds edges to the internal list without checking for duplicates. Based on previous discussions, this is acceptable due to the nature of the data. However, adding type hints and a brief description in the method's docstring about the parameters v and w could enhance clarity.

-    def add_edge(self, v, w) -> None:
+    def add_edge(self, v: str, w: str) -> None:
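
A sketch of what fuller typing and parameter documentation could look like. The `Edge` type and the standalone `make_edge` function are assumptions for illustration; the PR keeps `add_edge` as a method that appends a plain dict.

```python
from typing import TypedDict


class Edge(TypedDict):
    """Shape of one entry in the edges list."""

    id: str
    source: str
    target: str


def make_edge(source: str, target: str) -> Edge:
    """Build a directed edge from `source` to `target`.

    Args:
        source: Id of the node the edge starts from (`v` in the PR code).
        target: Id of the node the edge points to (`w` in the PR code).
    """
    return {"id": f"{source}_{target}", "source": source, "target": target}


edge = make_edge("step-1", "artifact-2")
print(edge["id"])  # step-1_artifact-2
```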

Contributor Author

Changed in latest commit

@Christopher-R-Perkins The changes to the add_edge method, including the type hints and docstring, have been successfully implemented in the latest commit. Thank you for making the updates.

Analysis chain

Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Fetch the latest changes to the `add_edge` method in the `Grapher` class
ast-grep --lang python --pattern $'class Grapher {
  $$$
  def add_edge($_, $_) {
    $$$
  }
  $$$  
}'

Length of output: 450


Script:

#!/bin/bash
# Fetch the latest changes to the `add_edge` method in the `Grapher` class
ast-grep --lang python --pattern $'class Grapher {
  def add_edge(self, v: str, w: str) -> None:
    $$$
}'

Length of output: 602


Script:

#!/bin/bash
# Fetch the latest changes to the `add_edge` method in the `Grapher` class
ast-grep --lang python --pattern $'def add_edge(self, v: str, w: str) -> None:
    $$$
'

Length of output: 571


    def to_dict(self) -> dict:
        """Returns dictionary containing graph data"""
        return {
            "nodes": self.nodes,
            "edges": self.edges,
            "status": self.run.body.status,
            "name": self.run.body.pipeline.name,
            "version": self.run.body.pipeline.body.version,
        }
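
To make the output shape concrete, here is a self-contained sketch that runs the same node and edge logic against a fake run. `graph_dict`, `fake_artifact` and `fake_step` are hypothetical names, the run object is built from `SimpleNamespace` rather than real ZenML models, and the status strings are placeholders for whatever values ZenML returns.

```python
from types import SimpleNamespace as NS


def fake_artifact(artifact_id, version_id, kind):
    """Hypothetical double with the attributes Grapher reads from an artifact."""
    return NS(id=version_id, body=NS(type=kind, artifact=NS(id=artifact_id)))


def fake_step(step_id, status, inputs, outputs):
    """Hypothetical double with the attributes Grapher reads from a step."""
    return NS(id=step_id, body=NS(status=status, inputs=inputs, outputs=outputs))


def graph_dict(run):
    """Condensed restatement of Grapher's node and edge building in one pass."""
    nodes, edges, seen = [], [], set()
    for name, step in run.metadata.steps.items():
        step_id = str(step.id)
        nodes.append({
            "id": step_id,
            "type": "step",
            "data": {"execution_id": step_id, "name": name, "status": step.body.status},
        })
        for is_input, artifacts in ((True, step.body.inputs), (False, step.body.outputs)):
            for art_name, art in artifacts.items():
                art_id = str(art.body.artifact.id)
                if art_id not in seen:  # each artifact becomes a node once
                    seen.add(art_id)
                    nodes.append({
                        "id": art_id,
                        "type": "artifact",
                        "data": {
                            "name": art_name,
                            "artifact_type": art.body.type,
                            "execution_id": str(art.id),
                        },
                    })
                # Inputs point into the step, outputs point away from it.
                source, target = (art_id, step_id) if is_input else (step_id, art_id)
                edges.append({"id": f"{source}_{target}", "source": source, "target": target})
    return {
        "nodes": nodes,
        "edges": edges,
        "status": run.body.status,
        "name": run.body.pipeline.name,
        "version": run.body.pipeline.body.version,
    }


data = fake_artifact("a1", "v1", "DataArtifact")
run = NS(
    body=NS(status="completed", pipeline=NS(name="demo", body=NS(version="1"))),
    metadata=NS(steps={
        "load": fake_step("s1", "completed", {}, {"dataset": data}),
        "train": fake_step("s2", "completed", {"dataset": data}, {}),
    }),
)
graph = graph_dict(run)
print([n["id"] for n in graph["nodes"]])  # ['s1', 'a1', 's2']
print([e["id"] for e in graph["edges"]])  # ['s1_a1', 'a1_s2']
```

The shared artifact `a1` appears once as a node but twice in the edges, once as the output of `load` and once as the input of `train`.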
41 changes: 6 additions & 35 deletions bundled/tool/zenml_wrappers.py
@@ -15,6 +15,7 @@
 import json
 import pathlib
 from typing import Any
+from zenml_grapher import Grapher


 class GlobalConfigWrapper:
@@ -260,11 +261,6 @@ def ValidationError(self):
     def ZenMLBaseException(self):
         """Returns the ZenML ZenMLBaseException class."""
         return self.lazy_import("zenml.exceptions", "ZenMLBaseException")
-
-    @property
-    def LineageGraph(self):
-        """Returns the ZenML LineageGraph class."""
-        return self.lazy_import("zenml.lineage_graph.lineage_graph", "LineageGraph")

     def fetch_pipeline_runs(self, args):
         """Fetches all ZenML pipeline runs.
@@ -377,36 +373,11 @@ def get_pipeline_run_graph(self, args) -> dict:
"""
         try:
             run_id = args[0]
-            run = self.client.get_pipeline_run(run_id, hydrate=False)
-            graph = self.LineageGraph()
-            graph.generate_run_nodes_and_edges(run)
-
-            dag_data = {
-                "nodes": [
-                    {
-                        "id": node.id,
-                        "type": node.type,
-                        "data": {
-                            "execution_id": node.data.execution_id,
-                            "name": node.data.name,
-                            "status": node.data.status if node.type == 'step' else None,
-                            "artifact_type": node.data.artifact_type if node.type == 'artifact' else None,
-                        }
-                    } for node in graph.nodes
-                ],
-                "edges": [
-                    {
-                        "id": edge.id,
-                        "source": edge.source,
-                        "target": edge.target
-                    } for edge in graph.edges
-                ],
-                "status": run.body.status,
-                "name": run.body.pipeline.name,
-                "version": run.body.pipeline.body.version,
-            }
-
-            return dag_data
+            run = self.client.get_pipeline_run(run_id, hydrate=True)
+            graph = Grapher(run)
+            graph.build_nodes_from_steps()
+            graph.build_edges_from_steps()
+            return graph.to_dict()
         except self.ZenMLBaseException as e:
             return {"error": f"Failed to retrieve pipeline run graph: {str(e)}"}