Add --only-output-json-directory Argo Workflow option #1947

Open

trhodeos wants to merge 14 commits into master
28 changes: 10 additions & 18 deletions metaflow/plugins/argo/argo_client.py
@@ -1,6 +1,4 @@
import json
import os
import sys

from metaflow.exception import MetaflowException
from metaflow.plugins.kubernetes.kubernetes_client import KubernetesClient
@@ -283,23 +281,17 @@ def trigger_workflow_template(self, name, parameters={}):
json.loads(e.body)["message"] if e.body is not None else e.reason
)

def schedule_workflow_template(self, name, schedule=None, timezone=None):
def schedule_workflow_template(self, name, cron_workflow=None):
# Unfortunately, Kubernetes client does not handle optimistic
# concurrency control by itself unlike kubectl
client = self._client.get()
body = {
"apiVersion": "argoproj.io/v1alpha1",
"kind": "CronWorkflow",
"metadata": {"name": name},
"spec": {
"suspend": schedule is None,
"schedule": schedule,
"timezone": timezone,
"workflowSpec": {"workflowTemplateRef": {"name": name}},
},
}
if cron_workflow is None:
cron_workflow = {}
if "metadata" not in cron_workflow:
cron_workflow["metadata"] = {}

try:
body["metadata"][
cron_workflow["metadata"][
"resourceVersion"
] = client.CustomObjectsApi().get_namespaced_custom_object(
group=self._group,
@@ -315,15 +307,15 @@ def schedule_workflow_template(self, name, schedule=None, timezone=None):
except client.rest.ApiException as e:
# Scheduled workflow does not exist and we want to schedule a workflow
if e.status == 404:
if schedule is None:
if cron_workflow["spec"]["schedule"] is None:
return
try:
return client.CustomObjectsApi().create_namespaced_custom_object(
group=self._group,
version=self._version,
namespace=self._namespace,
plural="cronworkflows",
body=body,
body=cron_workflow,
)
except client.rest.ApiException as e:
raise ArgoClientException(
Expand All @@ -341,7 +333,7 @@ def schedule_workflow_template(self, name, schedule=None, timezone=None):
version=self._version,
namespace=self._namespace,
plural="cronworkflows",
body=body,
body=cron_workflow,
name=name,
)
except client.rest.ApiException as e:
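With this change, the caller hands `schedule_workflow_template` a fully compiled CronWorkflow manifest, and the client only layers the fetched `resourceVersion` (Kubernetes' optimistic-concurrency token) on top before creating or replacing the object. A minimal calling sketch with an illustrative manifest; in the real code path this dict comes from `ArgoWorkflows._compile_cron_workflow()` below:

```python
from metaflow.plugins.argo.argo_client import ArgoClient

# Illustrative manifest; "helloflow" and the schedule are hypothetical values,
# and the real dict is built by ArgoWorkflows._compile_cron_workflow().
cron_workflow = {
    "apiVersion": "argoproj.io/v1alpha1",
    "kind": "CronWorkflow",
    "metadata": {"name": "helloflow"},
    "spec": {
        "suspend": False,
        "schedule": "0 * * * *",
        "timezone": "UTC",
        "workflowSpec": {"workflowTemplateRef": {"name": "helloflow"}},
    },
}

client = ArgoClient(namespace="default")
client.schedule_workflow_template("helloflow", cron_workflow)
```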
98 changes: 86 additions & 12 deletions metaflow/plugins/argo/argo_workflows.py
@@ -7,12 +7,11 @@
from collections import defaultdict
from hashlib import sha1
from math import inf
from typing import List, Tuple

from metaflow import JSONType, current
from metaflow.decorators import flow_decorators
from metaflow.exception import MetaflowException
from metaflow.graph import DAGNode, FlowGraph
from metaflow.graph import FlowGraph
from metaflow.includefile import FilePathClass
from metaflow.metaflow_config import (
ARGO_EVENTS_EVENT,
@@ -41,7 +40,6 @@
KUBERNETES_FETCH_EC2_METADATA,
KUBERNETES_LABELS,
KUBERNETES_NAMESPACE,
KUBERNETES_NODE_SELECTOR,
KUBERNETES_SANDBOX_INIT_SCRIPT,
KUBERNETES_SECRETS,
S3_ENDPOINT_URL,
@@ -172,11 +170,21 @@ def __init__(

self.kubernetes_labels = self._get_kubernetes_labels()
self._workflow_template = self._compile_workflow_template()
self._cron_workflow = self._compile_cron_workflow()
self._sensor = self._compile_sensor()

def __str__(self):
return str(self._workflow_template)

def get_all_entities(self):
return [self._workflow_template, self._cron_workflow, self._sensor]

def get_cron_workflow(self):
return self._cron_workflow

def get_event_sensor(self):
return self._sensor

def deploy(self):
try:
# Register workflow template.
@@ -307,7 +315,7 @@ def trigger(cls, name, parameters=None):
try:
# Check that the workflow was deployed through Metaflow
workflow_template["metadata"]["annotations"]["metaflow/owner"]
except KeyError as e:
except KeyError:
raise ArgoWorkflowsException(
"An existing non-metaflow workflow with the same name as "
"*%s* already exists in Argo Workflows. \nPlease modify the "
@@ -345,9 +353,7 @@ def _get_schedule(self):
def schedule(self):
try:
argo_client = ArgoClient(namespace=KUBERNETES_NAMESPACE)
argo_client.schedule_workflow_template(
self.name, self._schedule, self._timezone
)
argo_client.schedule_workflow_template(self.name, self._cron_workflow)
# Register sensor.
# Metaflow will overwrite any existing sensor.
sensor_name = ArgoWorkflows._sensor_name(self.name)
@@ -408,7 +414,7 @@ def get_existing_deployment(cls, name):
"metaflow/production_token"
],
)
except KeyError as e:
except KeyError:
raise ArgoWorkflowsException(
"An existing non-metaflow workflow with the same name as "
"*%s* already exists in Argo Workflows. \nPlease modify the "
@@ -2645,6 +2651,19 @@ def _heartbeat_daemon_template(self):
)
)

def _compile_cron_workflow(self):
return (
CronWorkflow()
.metadata(ObjectMeta().name(self.name))
.spec(
CronWorkflowSpec()
.suspend(self._schedule is None)
.schedule(self._schedule)
.timezone(self._timezone)
.workflow_template_ref_name(self.name)
)
)
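A note on the suspend logic above: for a flow without a schedule, the builder emits a suspended CronWorkflow whose `schedule` and `timezone` stay `None`, which is what lets `schedule_workflow_template` (argo_client.py above) skip creating a CronWorkflow that does not already exist. A sketch of that payload for a hypothetical flow named "helloflow":

```python
# Hypothetical to_json() output of the builder above for an unscheduled flow;
# "helloflow" is an illustrative name.
{
    "apiVersion": "argoproj.io/v1alpha1",
    "kind": "CronWorkflow",
    "metadata": {"name": "helloflow"},
    "spec": {
        "suspend": True,
        "schedule": None,
        "timezone": None,
        "workflowSpec": {"workflowTemplateRef": {"name": "helloflow"}},
    },
}
```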

def _compile_sensor(self):
# This method compiles a Metaflow @trigger decorator into Argo Events Sensor.
#
@@ -2744,8 +2763,6 @@ def _compile_sensor(self):
"sdk (https://pypi.org/project/kubernetes/) first."
)

labels = {"app.kubernetes.io/part-of": "metaflow"}

annotations = {
"metaflow/production_token": self.production_token,
"metaflow/owner": self.username,
@@ -2762,7 +2779,7 @@
)

# Useful to paint the UI
trigger_annotations = {
{
Collaborator: @trhodeos we have impending plans to move away from the one sensor -> one workflow template mapping (to save resource costs) to one sensor -> multiple workflow templates, which will break the expectations in this PR for --only-event-sensor-json. Can you help us with the timeline you are shooting for with this PR? Happy to chat over Zoom if that's helpful.

Collaborator: Depending on your use case, an immediate workaround could also be to override argo_client.py through the extensions such that, rather than submitting the CRDs to Kubernetes, it just prints them out to stderr.

Collaborator: But regardless, this change is in the right direction. Can you help us with details on how you have tested this change? We should be able to test this PR ourselves at some point next week.

trhodeos (author): One potential solution is to only add a CLI arg --output-json-directory-only and just push all JSON blobs to that directory. This would essentially allow us not to be tied to the number of resources we generate (as in, adding more sensors is not a problem).

I like the argo_client.py idea, though we'd still like to push directly to Argo while we migrate off of using Metaflow directly. Still possible, but it may require a bit more thinking.

For context: we are trying to use Metaflow to generate the resources and spit them out to a GitOps repo which has Argo CD hooked up to it, for better staging of deploys.

As for testing, I'll test locally tomorrow!
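For reference, a rough sketch of that argo_client.py override, assuming `ArgoClient` exposes `register_workflow_template(name, workflow_template)` as in the current client; the subclass name is hypothetical and the extension wiring that would register it is not shown:

```python
import json
import sys

from metaflow.plugins.argo.argo_client import ArgoClient


class DryRunArgoClient(ArgoClient):
    """Prints CRDs to stderr instead of submitting them to Kubernetes."""

    def register_workflow_template(self, name, workflow_template):
        # Emit the WorkflowTemplate CRD rather than POSTing it to the cluster.
        json.dump(workflow_template, sys.stderr, indent=4)

    def schedule_workflow_template(self, name, cron_workflow=None):
        # Emit the CronWorkflow CRD (if any) instead of create/replace calls.
        if cron_workflow is not None:
            json.dump(cron_workflow, sys.stderr, indent=4)
```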

"metaflow/triggered_by": json.dumps(
[
{key: trigger.get(key) for key in ["name", "type"]}
@@ -3029,6 +3046,7 @@ def list_to_prose(self, items, singular):
# TODO: Autogenerate them, maybe?



class WorkflowTemplate(object):
# https://argoproj.github.io/argo-workflows/fields/#workflowtemplate

@@ -3053,6 +3071,62 @@ def __str__(self):
return json.dumps(self.payload, indent=4)


class CronWorkflow(object):
# https://argo-workflows.readthedocs.io/en/latest/fields/#cronworkflow

def __init__(self):
tree = lambda: defaultdict(tree)
self.payload = tree()
self.payload["apiVersion"] = "argoproj.io/v1alpha1"
self.payload["kind"] = "CronWorkflow"

def metadata(self, object_meta):
self.payload["metadata"] = object_meta.to_json()
return self

def spec(self, cron_workflow_spec):
self.payload["spec"] = cron_workflow_spec.to_json()
return self

def to_json(self):
return self.payload

def __str__(self):
return json.dumps(self.payload, indent=4)


class CronWorkflowSpec(object):
# https://argo-workflows.readthedocs.io/en/latest/fields/#cronworkflowspec

def __init__(self):
tree = lambda: defaultdict(tree)
self.payload = tree()

def schedule(self, schedule):
self.payload["schedule"] = schedule
return self

def timezone(self, timezone):
self.payload["timezone"] = timezone
return self

def suspend(self, suspend):
self.payload["suspend"] = suspend
return self

def workflow_template_ref_name(self, workflow_template_ref_name):
self.payload["workflowSpec"]["workflowTemplateRef"][
"name"
] = workflow_template_ref_name
return self

def to_json(self):
return self.payload

def __str__(self):
return json.dumps(self.payload, indent=4)
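A standalone sketch of the nested-defaultdict trick these builders rely on: the `tree` lambda returns a defaultdict whose default value is another tree, so `workflow_template_ref_name` can assign through missing intermediate keys. The flow name is illustrative:

```python
import json
from collections import defaultdict

# The "tree" idiom used by the builders above: assignment through
# payload["workflowSpec"]["workflowTemplateRef"]["name"] works without
# creating the intermediate dicts first.
tree = lambda: defaultdict(tree)
payload = tree()
payload["workflowSpec"]["workflowTemplateRef"]["name"] = "helloflow"
print(json.dumps(payload))
# {"workflowSpec": {"workflowTemplateRef": {"name": "helloflow"}}}

# Composing the builders then mirrors _compile_cron_workflow() above:
# CronWorkflow().metadata(ObjectMeta().name("helloflow")).spec(
#     CronWorkflowSpec()
#     .suspend(False)
#     .schedule("0 * * * *")
#     .timezone("UTC")
#     .workflow_template_ref_name("helloflow")
# )
```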


class ObjectMeta(object):
# https://argoproj.github.io/argo-workflows/fields/#objectmeta

@@ -3252,8 +3326,8 @@ def __str__(self):

class DaemonTemplate(object):
def __init__(self, name):
tree = lambda: defaultdict(tree)
self.name = name
tree = lambda: defaultdict(tree)
self.payload = tree()
self.payload["daemon"] = True
self.payload["name"] = name
102 changes: 63 additions & 39 deletions metaflow/plugins/argo/argo_workflows_cli.py
@@ -3,6 +3,8 @@
import platform
import re
import sys
import os
import json
from hashlib import sha1

from metaflow import JSONType, Run, current, decorators, parameters
@@ -126,10 +128,17 @@ def argo_workflows(obj, name=None):
"to the given tag. See run --help for more information.",
)
@click.option(
"--only-workflow-template-json",
"--only-json",
is_flag=True,
default=False,
help="Only print out JSON sent to Argo Workflows. Do not deploy anything.",
help="Only print out the ArgoWorkflowTemplate JSON sent to Argo Workflows. Do not deploy anything.",
)
@click.option(
"--only-json-output-directory",
default=None,
type=str,
help="Save the JSON for every Argo resource that would be deployed (WorkflowTemplate, CronWorkflow, Sensor) into this directory. Do not deploy anything.",
)
@click.option(
"--max-workers",
Expand Down Expand Up @@ -201,7 +210,8 @@ def create(
obj,
tags=None,
user_namespace=None,
only_json=False,
only_workflow_template_json=False,
only_json_output_directory=None,
authorize=None,
generate_new_token=False,
given_token=None,
@@ -272,50 +282,64 @@
enable_error_msg_capture,
)

if only_json:
only_json = (
only_workflow_template_json or only_json_output_directory
)
if only_workflow_template_json:
obj.echo_always(str(flow), err=False, no_bold=True)
# TODO: Support echo-ing Argo Events Sensor template
else:
flow.deploy()
elif only_json_output_directory:
def _save_json_to_directory(directory, manifest):
os.makedirs(directory, exist_ok=True)
filename = f'{manifest["kind"]}.{manifest["metadata"]["name"]}.json'
path = os.path.join(directory, filename)
with open(path, 'w') as f:
json.dump(manifest, f)
for e in flow.get_all_entities():
_save_json_to_directory(only_json_output_directory, e.to_json())

if only_json:
return

flow.deploy()
obj.echo(
"Workflow *{workflow_name}* "
"for flow *{name}* pushed to "
"Argo Workflows successfully.\n".format(
workflow_name=obj.workflow_name, name=current.flow_name
),
bold=True,
)
if obj._is_workflow_name_modified:
obj.echo(
"Note that the flow was deployed with a modified name "
"due to Kubernetes naming conventions\non Argo Workflows. The "
"original flow name is stored in the workflow annotation.\n"
)

if ARGO_WORKFLOWS_UI_URL:
obj.echo("See the deployed workflow here:", bold=True)
argo_workflowtemplate_link = "%s/workflow-templates/%s" % (
ARGO_WORKFLOWS_UI_URL.rstrip("/"),
KUBERNETES_NAMESPACE,
)
obj.echo(
"%s/%s\n\n" % (argo_workflowtemplate_link, obj.workflow_name),
indent=True,
)
flow.schedule()
obj.echo("What will trigger execution of the workflow:", bold=True)
obj.echo(flow.trigger_explanation(), indent=True)

# TODO: Print events emitted by execution of this flow

# response = ArgoWorkflows.trigger(obj.workflow_name)
# run_id = "argo-" + response["metadata"]["name"]

# obj.echo(
# "Workflow *{name}* triggered on Argo Workflows "
# "(run-id *{run_id}*).".format(name=obj.workflow_name, run_id=run_id),
# bold=True,
# )
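A sketch of exercising the new flag end to end, under the assumption of a flow defined in helloflow.py that deploys as "helloflow"; all names and the directory are illustrative:

```python
import os
import subprocess

# Compile everything but deploy nothing; JSON manifests land in ./manifests.
subprocess.run(
    [
        "python", "helloflow.py", "argo-workflows", "create",
        "--only-json-output-directory", "./manifests",
    ],
    check=True,
)

# _save_json_to_directory names each file "<kind>.<metadata.name>.json",
# one per compiled entity returned by flow.get_all_entities():
print(sorted(os.listdir("./manifests")))
# e.g. ['CronWorkflow.helloflow.json',
#       'Sensor.helloflow-sensor.json',
#       'WorkflowTemplate.helloflow.json']
```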


def check_python_version(obj):