Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(metadata): ability to get artifacts location for Argo-Workflows v3.0+ #5829

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 23 additions & 9 deletions backend/metadata_writer/src/metadata_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@ def patch_pod_metadata(


def output_name_to_argo(name: str) -> str:
import re
# This sanitization code should be kept in sync with the code in the DSL compiler.
# See https://github.com/kubeflow/pipelines/blob/39975e3cde7ba4dcea2bca835b92d0fe40b1ae3c/sdk/python/kfp/compiler/_k8s_helper.py#L33
return re.sub('-+', '-', re.sub('[^-_0-9A-Za-z]+', '-', name)).strip('-')
Expand All @@ -98,15 +97,24 @@ def get_object_store_provider(endpoint: str) -> bool:
else:
return 'minio'

def argo_artifact_to_uri(artifact: dict) -> str:
def argo_artifact_to_uri(artifact: dict, skip_key=False) -> str:
# s3 here means s3 compatible object storage. not AWS S3.
if 's3' in artifact:
s3_artifact = artifact['s3']
return '{provider}://{bucket}/{key}'.format(
provider=get_object_store_provider(s3_artifact['endpoint']),
bucket=s3_artifact.get('bucket', ''),
key=s3_artifact.get('key', ''),
)
if (s3_artifact.keys() >= {'endpoint', 'bucket'}):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comparison looks fragile. Can we do this check in a more robust way?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm, and why does it look fragile to you?
Looks clean and short check of multiple keys in dict to me comparing to checking for subset or making unnecessary loops.

if skip_key:
return '{provider}://{bucket}/'.format(
provider=get_object_store_provider(s3_artifact['endpoint']),
bucket=s3_artifact.get('bucket', ''),
)
else:
return '{provider}://{bucket}/{key}'.format(
provider=get_object_store_provider(s3_artifact['endpoint']),
bucket=s3_artifact.get('bucket', ''),
key=s3_artifact.get('key', ''),
)
else:
return s3_artifact.get('key', '')
elif 'raw' in artifact:
return None
else:
Expand Down Expand Up @@ -325,8 +333,14 @@ def is_kfp_v2_pod(pod) -> bool:

output_artifacts = []
for name, art in argo_output_artifacts.items():
artifact_uri = argo_artifact_to_uri(art)
if not artifact_uri:
artifact_uri_check = argo_artifact_to_uri(art)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we just call this artifact_uri?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes we can, it's just a habit of not overwriting variables while making some checks.

if artifact_uri_check:
if re.search('(\W+)', artifact_uri_check).group(1) != '://':
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be something like if re.match(r'^\W+://', artifact_uri):?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree, it could be if not re.match(r'\w+://', artifact_uri):, I just don't like to use match if I'm searching for anything in the middle of the string.

artifact_uri_wo_key = argo_artifact_to_uri(argo_template.get('archiveLocation', {}), True)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we can always pass argo_template.get('archiveLocation', {}) to argo_artifact_to_uri and encapsulate this logic there?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The thing is, that despite the property name in archiveLocation they're storing the folder of artifact location but not the actual archive (file) location. For example in the outputs you will have file location like "key":"artifacts/artifact-passing-lr6fg/artifact-passing-lr6fg-2908156709/hello-art.tgz" and within archiveLocation you will have something like "key":"artifacts/artifact-passing-lr6fg/artifact-passing-lr6fg-2908156709"

artifact_uri = artifact_uri_wo_key + artifact_uri_check
else:
artifact_uri = artifact_uri_check
else:
continue
artifact_type_name = argo_output_name_to_type.get(name, 'NoType') # Cannot be None or ''

Expand Down