diff --git a/docs/manifest.md b/docs/manifest.md index a967aa73e..d672d534d 100644 --- a/docs/manifest.md +++ b/docs/manifest.md @@ -33,8 +33,9 @@ the data is stored and references to the pipeline and component that were used t ```json { "metadata": { + "pipeline_name": "pipeline_name", "base_path": "gs://bucket", - "run_id": "12345", + "run_id": "pipeline_name_12345", "component_id": "67890" } } diff --git a/src/fondant/compiler.py b/src/fondant/compiler.py index 52924a056..5eaed0866 100644 --- a/src/fondant/compiler.py +++ b/src/fondant/compiler.py @@ -1,4 +1,3 @@ -import datetime import json import logging import typing as t @@ -8,6 +7,7 @@ import yaml +from fondant.manifest import Metadata from fondant.pipeline import Pipeline logger = logging.getLogger(__name__) @@ -37,19 +37,6 @@ class DockerVolume: target: str -@dataclass -class MetaData: - """Dataclass representing the metadata arguments of a pipeline. - - Args: - run_id: identifier of the current pipeline run - base_path: the base path used to store the artifacts. - """ - - run_id: str - base_path: str - - class DockerCompiler(Compiler): """Compiler that creates a docker-compose spec from a pipeline.""" @@ -90,13 +77,6 @@ def ignore_aliases(self, data): logger.info(f"Successfully compiled to {output_path}") - @staticmethod - def _safe_component_name(component_name: str) -> str: - """Transform a component name to a docker-compose friendly one. - eg: `Component A` -> `component_a`. - """ - return component_name.replace(" ", "_").lower() - @staticmethod def _patch_path(base_path: str) -> t.Tuple[str, t.Optional[DockerVolume]]: """Helper that checks if the base_path is local or remote, @@ -132,29 +112,33 @@ def _generate_spec( """Generate a docker-compose spec as a python dictionary, loops over the pipeline graph to create services and their dependencies. """ - timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S") path, volume = self._patch_path(base_path=pipeline.base_path) - run_id = f"{pipeline.name}-{timestamp}" - metadata = MetaData(run_id=run_id, base_path=path) + run_id = pipeline.get_run_id() services = {} pipeline.validate(run_id=run_id) for component_name, component in pipeline._graph.items(): + metadata = Metadata( + pipeline_name=pipeline.name, + run_id=run_id, + base_path=path, + component_id=component_name, + ) + logger.info(f"Compiling service for {component_name}") - safe_component_name = self._safe_component_name(component_name) component_op = component["fondant_component_op"] # add metadata argument to command - command = ["--metadata", json.dumps(asdict(metadata))] + command = ["--metadata", metadata.to_json()] # add in and out manifest paths to command command.extend( [ "--output_manifest_path", - f"{path}/{safe_component_name}/manifest.json", + f"{path}/{component_name}/manifest.json", ], ) @@ -169,15 +153,14 @@ def _generate_spec( depends_on = {} if component["dependencies"]: for dependency in component["dependencies"]: - safe_dependency = self._safe_component_name(dependency) - depends_on[safe_dependency] = { + depends_on[dependency] = { "condition": "service_completed_successfully", } # there is only an input manifest if the component has dependencies command.extend( [ "--input_manifest_path", - f"{path}/{safe_dependency}/manifest.json", + f"{path}/{dependency}/manifest.json", ], ) @@ -187,7 +170,7 @@ def _generate_spec( if extra_volumes: volumes.extend(extra_volumes) - services[safe_component_name] = { + services[component_name] = { "command": command, "depends_on": depends_on, "volumes": volumes, @@ -197,14 +180,12 @@ def _generate_spec( logger.info( f"Found Dockerfile for {component_name}, adding build step.", ) - services[safe_component_name]["build"] = { + services[component_name]["build"] = { "context": str(component_op.component_dir), "args": build_args, } else: - services[safe_component_name][ - "image" - ] = component_op.component_spec.image + services[component_name]["image"] = component_op.component_spec.image return { "name": pipeline.name, "version": "3.8", @@ -242,12 +223,21 @@ def compile( pipeline: the pipeline to compile output_path: the path where to save the Kubeflow pipeline spec """ + run_id = pipeline.get_run_id() @self.kfp.dsl.pipeline(name=pipeline.name, description=pipeline.description) def kfp_pipeline(): previous_component_task = None manifest_path = "" - for component_name, component in self.pipeline._graph.items(): + + for component_name, component in pipeline._graph.items(): + metadata = Metadata( + pipeline_name=pipeline.name, + run_id=run_id, + base_path=pipeline.base_path, + component_id=component_name, + ) + logger.info(f"Compiling service for {component_name}") component_op = component["fondant_component_op"] @@ -259,16 +249,10 @@ def kfp_pipeline(): # Execute the Kubeflow component and pass in the output manifest path from # the previous component. component_args = component_op.arguments - metadata = json.dumps( - { - "base_path": self.pipeline.base_path, - "run_id": "{{workflow.name}}", - }, - ) component_task = kubeflow_component_op( input_manifest_path=manifest_path, - metadata=metadata, + metadata=metadata.to_json(), **component_args, ) # Set optional configurations @@ -287,7 +271,7 @@ def kfp_pipeline(): previous_component_task = component_task self.pipeline = pipeline - self.pipeline.validate(run_id="{{workflow.name}}") + self.pipeline.validate(run_id=run_id) logger.info(f"Compiling {self.pipeline.name} to {output_path}") self.kfp.compiler.Compiler().compile(kfp_pipeline, output_path) # type: ignore diff --git a/src/fondant/executor.py b/src/fondant/executor.py index 9b736e1cc..9b6bc4f34 100644 --- a/src/fondant/executor.py +++ b/src/fondant/executor.py @@ -24,7 +24,7 @@ ) from fondant.component_spec import Argument, ComponentSpec, kubeflow2python_type from fondant.data_io import DaskDataLoader, DaskDataWriter -from fondant.manifest import Manifest +from fondant.manifest import Manifest, Metadata from fondant.schema import validate_partition_number logger = logging.getLogger(__name__) @@ -46,7 +46,7 @@ def __init__( self.spec = spec self.input_manifest_path = input_manifest_path self.output_manifest_path = output_manifest_path - self.metadata = metadata + self.metadata = Metadata.from_dict(metadata) self.user_arguments = user_arguments self.input_partition_rows = input_partition_rows @@ -239,8 +239,9 @@ def optional_fondant_arguments() -> t.List[str]: def _load_or_create_manifest(self) -> Manifest: component_id = self.spec.name.lower().replace(" ", "_") return Manifest.create( - base_path=self.metadata["base_path"], - run_id=self.metadata["run_id"], + pipeline_name=self.metadata.pipeline_name, + base_path=self.metadata.base_path, + run_id=self.metadata.run_id, component_id=component_id, ) diff --git a/src/fondant/manifest.py b/src/fondant/manifest.py index 15d3cb584..448c21865 100644 --- a/src/fondant/manifest.py +++ b/src/fondant/manifest.py @@ -4,6 +4,7 @@ import pkgutil import types import typing as t +from dataclasses import asdict, dataclass from pathlib import Path import jsonschema.exceptions @@ -70,6 +71,26 @@ def fields(self) -> t.Dict[str, Field]: } +@dataclass +class Metadata: + """Class representing the Metadata of the manifest.""" + + base_path: str + pipeline_name: str + run_id: str + component_id: str + + def to_dict(self): + return asdict(self) + + def to_json(self): + return json.dumps(self.to_dict()) + + @classmethod + def from_dict(cls, data_dict): + return cls(**data_dict) + + class Manifest: """ Class representing a Fondant manifest. @@ -112,20 +133,31 @@ def retrieve_from_filesystem(uri: str) -> Resource: raise InvalidManifest.create_from(e) @classmethod - def create(cls, *, base_path: str, run_id: str, component_id: str) -> "Manifest": + def create( + cls, + *, + pipeline_name: str, + base_path: str, + run_id: str, + component_id: str, + ) -> "Manifest": """Create an empty manifest. Args: + pipeline_name: the name of the pipeline base_path: The base path of the manifest run_id: The id of the current pipeline run component_id: The id of the current component being executed """ + metadata = Metadata( + pipeline_name=pipeline_name, + base_path=base_path, + run_id=run_id, + component_id=component_id, + ) + specification = { - "metadata": { - "base_path": base_path, - "run_id": run_id, - "component_id": component_id, - }, + "metadata": metadata.to_dict(), "index": {"location": f"/index/{run_id}/{component_id}"}, "subsets": {}, } @@ -166,6 +198,10 @@ def run_id(self) -> str: def component_id(self) -> str: return self.metadata["component_id"] + @property + def pipeline_name(self) -> str: + return self.metadata["pipeline_name"] + @property def index(self) -> Index: return Index(self._specification["index"], base_path=self.base_path) diff --git a/src/fondant/pipeline.py b/src/fondant/pipeline.py index a2c731688..a82bb09ea 100644 --- a/src/fondant/pipeline.py +++ b/src/fondant/pipeline.py @@ -1,4 +1,5 @@ """This module defines classes to represent a Fondant Pipeline.""" +import datetime import hashlib import json import logging @@ -57,12 +58,13 @@ def __init__( node_pool_name: t.Optional[str] = None, ) -> None: self.component_dir = Path(component_dir) - self.input_partition_rows = input_partition_rows - self.arguments = self._set_arguments(arguments) - self.component_spec = ComponentSpec.from_file( self.component_dir / self.COMPONENT_SPEC_NAME, ) + self.name = self.component_spec.name.replace(" ", "_").lower() + self.input_partition_rows = input_partition_rows + self.arguments = self._set_arguments(arguments) + self.arguments.setdefault("component_spec", self.component_spec.specification) self.number_of_gpus = number_of_gpus @@ -238,11 +240,9 @@ def add_op( msg, ) - dependencies_names = [ - dependency.component_spec.name for dependency in dependencies - ] + dependencies_names = [dependency.name for dependency in dependencies] - self._graph[task.component_spec.name] = { + self._graph[task.name] = { "fondant_component_op": task, "dependencies": dependencies_names, } @@ -279,11 +279,17 @@ def _validate_pipeline_name(pipeline_name: str) -> str: raise InvalidPipelineDefinition(msg) return pipeline_name + def get_run_id(self) -> str: + """Get a unique run ID for the pipeline.""" + timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S") + return f"{self.name}-{timestamp}" + def validate(self, run_id: str): """Sort and run validation on the pipeline definition. Args: - run_id (str, optional): run identifier. Defaults to None. + run_id: run identifier + """ self.sort_graph() self._validate_pipeline_definition(run_id) @@ -310,6 +316,7 @@ def _validate_pipeline_definition(self, run_id: str): # Create initial manifest manifest = Manifest.create( + pipeline_name=self.name, base_path=self.base_path, run_id=run_id, component_id=load_component_name, diff --git a/src/fondant/schemas/manifest.json b/src/fondant/schemas/manifest.json index f08d82c29..00ad6d1cc 100644 --- a/src/fondant/schemas/manifest.json +++ b/src/fondant/schemas/manifest.json @@ -9,6 +9,9 @@ "type": "string", "format": "uri" }, + "pipeline_name": { + "type": "string" + }, "run_id": { "type": "string" }, @@ -18,6 +21,7 @@ }, "required": [ "base_path", + "pipeline_name", "run_id", "component_id" ] diff --git a/tests/example_data/manifest.json b/tests/example_data/manifest.json index 2d17d6af8..8fe4ef16b 100644 --- a/tests/example_data/manifest.json +++ b/tests/example_data/manifest.json @@ -1,7 +1,8 @@ { "metadata": { + "pipeline_name": "test_pipeline", "base_path": "tests/example_data/subsets_input", - "run_id": "12345", + "run_id": "test_pipeline_12345", "component_id": "67890" }, "index": { diff --git a/tests/example_pipelines/compiled_pipeline/example_1/docker-compose.yml b/tests/example_pipelines/compiled_pipeline/example_1/docker-compose.yml index b7d31f3b2..fc39ca80d 100644 --- a/tests/example_pipelines/compiled_pipeline/example_1/docker-compose.yml +++ b/tests/example_pipelines/compiled_pipeline/example_1/docker-compose.yml @@ -6,7 +6,8 @@ services: context: tests/example_pipelines/valid_pipeline/example_1/first_component command: - --metadata - - '{"run_id": "test_pipeline-20230101000000", "base_path": "/foo/bar"}' + - '{"base_path": "/foo/bar", "pipeline_name": "test_pipeline", "run_id": "test_pipeline-20230101000000", + "component_id": "first_component"}' - --output_manifest_path - /foo/bar/first_component/manifest.json - --storage_args @@ -26,7 +27,8 @@ services: context: tests/example_pipelines/valid_pipeline/example_1/second_component command: - --metadata - - '{"run_id": "test_pipeline-20230101000000", "base_path": "/foo/bar"}' + - '{"base_path": "/foo/bar", "pipeline_name": "test_pipeline", "run_id": "test_pipeline-20230101000000", + "component_id": "second_component"}' - --output_manifest_path - /foo/bar/second_component/manifest.json - --storage_args @@ -48,16 +50,15 @@ services: third_component: build: args: [] - context: tests/example_pipelines/valid_pipeline/example_1/fourth_component + context: tests/example_pipelines/valid_pipeline/example_1/third_component command: - --metadata - - '{"run_id": "test_pipeline-20230101000000", "base_path": "/foo/bar"}' + - '{"base_path": "/foo/bar", "pipeline_name": "test_pipeline", "run_id": "test_pipeline-20230101000000", + "component_id": "third_component"}' - --output_manifest_path - /foo/bar/third_component/manifest.json - --storage_args - a dummy string arg - - --some_list - - '[1, 2, 3]' - --input_partition_rows - None - --component_spec @@ -67,8 +68,7 @@ services: "embeddings": {"fields": {"data": {"type": "array", "items": {"type": "float32"}}}}}, "produces": {"images": {"fields": {"data": {"type": "binary"}}}, "additionalSubsets": false}, "args": {"storage_args": {"description": "Storage arguments", "type": - "str"}, "some_list": {"description": "Some list", "type": "list", "items": {"type": - "int"}}}}' + "str"}}}' - --input_manifest_path - /foo/bar/second_component/manifest.json depends_on: diff --git a/tests/example_pipelines/compiled_pipeline/example_1/kubeflow_pipeline.yml b/tests/example_pipelines/compiled_pipeline/example_1/kubeflow_pipeline.yml index acdb7ac69..5ddb02e38 100644 --- a/tests/example_pipelines/compiled_pipeline/example_1/kubeflow_pipeline.yml +++ b/tests/example_pipelines/compiled_pipeline/example_1/kubeflow_pipeline.yml @@ -1,47 +1,68 @@ apiVersion: argoproj.io/v1alpha1 kind: Workflow metadata: - generateName: test-pipeline- - annotations: {pipelines.kubeflow.org/kfp_sdk_version: 1.8.22, pipelines.kubeflow.org/pipeline_compilation_time: '2023-01-01T00:00:00', + annotations: + pipelines.kubeflow.org/kfp_sdk_version: 1.8.22 + pipelines.kubeflow.org/pipeline_compilation_time: '2023-01-01T00:00:00' pipelines.kubeflow.org/pipeline_spec: '{"description": "description of the test - pipeline", "name": "test_pipeline"}'} - labels: {pipelines.kubeflow.org/kfp_sdk_version: 1.8.22} + pipeline", "name": "test_pipeline"}' + generateName: test-pipeline- + labels: + pipelines.kubeflow.org/kfp_sdk_version: 1.8.22 spec: + arguments: + parameters: [] entrypoint: test-pipeline + serviceAccountName: pipeline-runner templates: - - name: first-component - container: + - container: args: [] - command: [python3, main.py, --input_manifest_path, /tmp/inputs/input_manifest_path/data, - --metadata, '{"base_path": "/foo/bar", "run_id": "{{workflow.name}}"}', --component_spec, - '{"args": {"storage_args": {"description": "Storage arguments", "type": "str"}}, - "description": "This is an example component", "image": "example_component:latest", - "name": "First component", "produces": {"captions": {"fields": {"data": - {"type": "string"}}}, "images": {"fields": {"data": {"type": "binary"}}}}}', - --input_partition_rows, disable, --storage_args, a dummy string arg, --output_manifest_path, - /tmp/outputs/output_manifest_path/data] + command: + - python3 + - main.py + - --input_manifest_path + - /tmp/inputs/input_manifest_path/data + - --metadata + - '{"base_path": "/foo/bar", "pipeline_name": "test_pipeline", "run_id": "test_pipeline-20230101000000", + "component_id": "first_component"}' + - --component_spec + - '{"args": {"storage_args": {"description": "Storage arguments", "type": "str"}}, + "description": "This is an example component", "image": "example_component:latest", + "name": "First component", "produces": {"captions": {"fields": {"data": {"type": + "string"}}}, "images": {"fields": {"data": {"type": "binary"}}}}}' + - --input_partition_rows + - disable + - --storage_args + - a dummy string arg + - --output_manifest_path + - /tmp/outputs/output_manifest_path/data image: example_component:latest inputs: artifacts: - name: input_manifest_path path: /tmp/inputs/input_manifest_path/data - raw: {data: ''} - outputs: - artifacts: - - {name: first-component-output_manifest_path, path: /tmp/outputs/output_manifest_path/data} + raw: + data: '' metadata: - labels: - pipelines.kubeflow.org/kfp_sdk_version: 1.8.22 - pipelines.kubeflow.org/pipeline-sdk-type: kfp - pipelines.kubeflow.org/enable_caching: "true" - annotations: {pipelines.kubeflow.org/component_spec: '{"description": "This - is an example component", "implementation": {"container": {"command": ["python3", - "main.py", "--input_manifest_path", {"inputPath": "input_manifest_path"}, - "--metadata", {"inputValue": "metadata"}, "--component_spec", {"inputValue": - "component_spec"}, "--input_partition_rows", {"inputValue": "input_partition_rows"}, - "--storage_args", {"inputValue": "storage_args"}, "--output_manifest_path", - {"outputPath": "output_manifest_path"}], "image": "example_component:latest"}}, - "inputs": [{"description": "Path to the input manifest", "name": "input_manifest_path", + annotations: + pipelines.kubeflow.org/arguments.parameters: '{"component_spec": "{\"args\": + {\"storage_args\": {\"description\": \"Storage arguments\", \"type\": \"str\"}}, + \"description\": \"This is an example component\", \"image\": \"example_component:latest\", + \"name\": \"First component\", \"produces\": {\"captions\": {\"fields\": + {\"data\": {\"type\": \"string\"}}}, \"images\": {\"fields\": {\"data\": + {\"type\": \"binary\"}}}}}", "input_partition_rows": "disable", "metadata": + "{\"base_path\": \"/foo/bar\", \"pipeline_name\": \"test_pipeline\", \"run_id\": + \"test_pipeline-20230101000000\", \"component_id\": \"first_component\"}", + "storage_args": "a dummy string arg"}' + pipelines.kubeflow.org/component_ref: '{"digest": "2a304ce49a15404ba50dfd8b56ec43fa8ac8c29f80579d1c8fb974d3f1a5c87f"}' + pipelines.kubeflow.org/component_spec: '{"description": "This is an example + component", "implementation": {"container": {"command": ["python3", "main.py", + "--input_manifest_path", {"inputPath": "input_manifest_path"}, "--metadata", + {"inputValue": "metadata"}, "--component_spec", {"inputValue": "component_spec"}, + "--input_partition_rows", {"inputValue": "input_partition_rows"}, "--storage_args", + {"inputValue": "storage_args"}, "--output_manifest_path", {"outputPath": + "output_manifest_path"}], "image": "example_component:latest"}}, "inputs": + [{"description": "Path to the input manifest", "name": "input_manifest_path", "type": "String"}, {"description": "Metadata arguments containing the run id and base path", "name": "metadata", "type": "String"}, {"default": "None", "description": "The component specification as a dictionary", "name": "component_spec", @@ -50,46 +71,64 @@ spec: "name": "input_partition_rows", "type": "String"}, {"description": "Storage arguments", "name": "storage_args", "type": "String"}], "name": "First component", "outputs": [{"description": "Path to the output manifest", "name": "output_manifest_path", - "type": "String"}]}', pipelines.kubeflow.org/component_ref: '{"digest": - "2a304ce49a15404ba50dfd8b56ec43fa8ac8c29f80579d1c8fb974d3f1a5c87f"}', pipelines.kubeflow.org/arguments.parameters: '{"component_spec": - "{\"args\": {\"storage_args\": {\"description\": \"Storage arguments\", - \"type\": \"str\"}}, \"description\": \"This is an example component\", - \"image\": \"example_component:latest\", \"name\": \"First component\", - \"produces\": {\"captions\": {\"fields\": {\"data\": {\"type\": \"string\"}}}, - \"images\": {\"fields\": {\"data\": {\"type\": \"binary\"}}}}}", "input_partition_rows": - "disable", "metadata": "{\"base_path\": \"/foo/bar\", \"run_id\": \"{{workflow.name}}\"}", - "storage_args": "a dummy string arg"}'} - - name: second-component - container: + "type": "String"}]}' + labels: + pipelines.kubeflow.org/enable_caching: 'true' + pipelines.kubeflow.org/kfp_sdk_version: 1.8.22 + pipelines.kubeflow.org/pipeline-sdk-type: kfp + name: first-component + outputs: + artifacts: + - name: first-component-output_manifest_path + path: /tmp/outputs/output_manifest_path/data + - container: args: [] - command: [python3, main.py, --input_manifest_path, /tmp/inputs/input_manifest_path/data, - --metadata, '{"base_path": "/foo/bar", "run_id": "{{workflow.name}}"}', --component_spec, - '{"args": {"storage_args": {"description": "Storage arguments", "type": "str"}}, - "consumes": {"images": {"fields": {"data": {"type": "binary"}}}}, "description": - "This is an example component", "image": "example_component:latest", "name": - "Second component", "produces": {"embeddings": {"fields": {"data": {"items": - {"type": "float32"}, "type": "array"}}}}}', --input_partition_rows, '10', - --storage_args, a dummy string arg, --output_manifest_path, /tmp/outputs/output_manifest_path/data] + command: + - python3 + - main.py + - --input_manifest_path + - /tmp/inputs/input_manifest_path/data + - --metadata + - '{"base_path": "/foo/bar", "pipeline_name": "test_pipeline", "run_id": "test_pipeline-20230101000000", + "component_id": "second_component"}' + - --component_spec + - '{"args": {"storage_args": {"description": "Storage arguments", "type": "str"}}, + "consumes": {"images": {"fields": {"data": {"type": "binary"}}}}, "description": + "This is an example component", "image": "example_component:latest", "name": + "Second component", "produces": {"embeddings": {"fields": {"data": {"items": + {"type": "float32"}, "type": "array"}}}}}' + - --input_partition_rows + - '10' + - --storage_args + - a dummy string arg + - --output_manifest_path + - /tmp/outputs/output_manifest_path/data image: example_component:latest inputs: artifacts: - - {name: first-component-output_manifest_path, path: /tmp/inputs/input_manifest_path/data} - outputs: - artifacts: - - {name: second-component-output_manifest_path, path: /tmp/outputs/output_manifest_path/data} + - name: first-component-output_manifest_path + path: /tmp/inputs/input_manifest_path/data metadata: - labels: - pipelines.kubeflow.org/kfp_sdk_version: 1.8.22 - pipelines.kubeflow.org/pipeline-sdk-type: kfp - pipelines.kubeflow.org/enable_caching: "true" - annotations: {pipelines.kubeflow.org/component_spec: '{"description": "This - is an example component", "implementation": {"container": {"command": ["python3", - "main.py", "--input_manifest_path", {"inputPath": "input_manifest_path"}, - "--metadata", {"inputValue": "metadata"}, "--component_spec", {"inputValue": - "component_spec"}, "--input_partition_rows", {"inputValue": "input_partition_rows"}, - "--storage_args", {"inputValue": "storage_args"}, "--output_manifest_path", - {"outputPath": "output_manifest_path"}], "image": "example_component:latest"}}, - "inputs": [{"description": "Path to the input manifest", "name": "input_manifest_path", + annotations: + pipelines.kubeflow.org/arguments.parameters: '{"component_spec": "{\"args\": + {\"storage_args\": {\"description\": \"Storage arguments\", \"type\": \"str\"}}, + \"consumes\": {\"images\": {\"fields\": {\"data\": {\"type\": \"binary\"}}}}, + \"description\": \"This is an example component\", \"image\": \"example_component:latest\", + \"name\": \"Second component\", \"produces\": {\"embeddings\": {\"fields\": + {\"data\": {\"items\": {\"type\": \"float32\"}, \"type\": \"array\"}}}}}", + "input_partition_rows": "10", "metadata": "{\"base_path\": \"/foo/bar\", + \"pipeline_name\": \"test_pipeline\", \"run_id\": \"test_pipeline-20230101000000\", + \"component_id\": \"second_component\"}", "storage_args": "a dummy string + arg"}' + pipelines.kubeflow.org/component_ref: '{"digest": "a02b0189397a2d9318982201f020dbbbe3962427ed150fe58cc69ff508cc68bb"}' + pipelines.kubeflow.org/component_spec: '{"description": "This is an example + component", "implementation": {"container": {"command": ["python3", "main.py", + "--input_manifest_path", {"inputPath": "input_manifest_path"}, "--metadata", + {"inputValue": "metadata"}, "--component_spec", {"inputValue": "component_spec"}, + "--input_partition_rows", {"inputValue": "input_partition_rows"}, "--storage_args", + {"inputValue": "storage_args"}, "--output_manifest_path", {"outputPath": + "output_manifest_path"}], "image": "example_component:latest"}}, "inputs": + [{"description": "Path to the input manifest", "name": "input_manifest_path", "type": "String"}, {"description": "Metadata arguments containing the run id and base path", "name": "metadata", "type": "String"}, {"default": "None", "description": "The component specification as a dictionary", "name": "component_spec", @@ -98,89 +137,103 @@ spec: "name": "input_partition_rows", "type": "String"}, {"description": "Storage arguments", "name": "storage_args", "type": "String"}], "name": "Second component", "outputs": [{"description": "Path to the output manifest", "name": - "output_manifest_path", "type": "String"}]}', pipelines.kubeflow.org/component_ref: '{"digest": - "a02b0189397a2d9318982201f020dbbbe3962427ed150fe58cc69ff508cc68bb"}', pipelines.kubeflow.org/arguments.parameters: '{"component_spec": - "{\"args\": {\"storage_args\": {\"description\": \"Storage arguments\", - \"type\": \"str\"}}, \"consumes\": {\"images\": {\"fields\": {\"data\": - {\"type\": \"binary\"}}}}, \"description\": \"This is an example component\", - \"image\": \"example_component:latest\", \"name\": \"Second component\", - \"produces\": {\"embeddings\": {\"fields\": {\"data\": {\"items\": {\"type\": - \"float32\"}, \"type\": \"array\"}}}}}", "input_partition_rows": "10", "metadata": - "{\"base_path\": \"/foo/bar\", \"run_id\": \"{{workflow.name}}\"}", "storage_args": - "a dummy string arg"}'} - - name: test-pipeline - dag: + "output_manifest_path", "type": "String"}]}' + labels: + pipelines.kubeflow.org/enable_caching: 'true' + pipelines.kubeflow.org/kfp_sdk_version: 1.8.22 + pipelines.kubeflow.org/pipeline-sdk-type: kfp + name: second-component + outputs: + artifacts: + - name: second-component-output_manifest_path + path: /tmp/outputs/output_manifest_path/data + - dag: tasks: - - {name: first-component, template: first-component} - - name: second-component + - name: first-component + template: first-component + - arguments: + artifacts: + - from: '{{tasks.first-component.outputs.artifacts.first-component-output_manifest_path}}' + name: first-component-output_manifest_path + dependencies: + - first-component + name: second-component template: second-component - dependencies: [first-component] - arguments: + - arguments: artifacts: - - {name: first-component-output_manifest_path, from: '{{tasks.first-component.outputs.artifacts.first-component-output_manifest_path}}'} - - name: third-component + - from: '{{tasks.second-component.outputs.artifacts.second-component-output_manifest_path}}' + name: second-component-output_manifest_path + dependencies: + - second-component + name: third-component template: third-component - dependencies: [second-component] - arguments: - artifacts: - - {name: second-component-output_manifest_path, from: '{{tasks.second-component.outputs.artifacts.second-component-output_manifest_path}}'} - - name: third-component - container: + name: test-pipeline + - container: args: [] - command: [python3, main.py, --input_manifest_path, /tmp/inputs/input_manifest_path/data, - --metadata, '{"base_path": "/foo/bar", "run_id": "{{workflow.name}}"}', --component_spec, - '{"args": {"some_list": {"description": "Some list", "items": {"type": "int"}, - "type": "list"}, "storage_args": {"description": "Storage arguments", "type": - "str"}}, "consumes": {"captions": {"fields": {"data": {"type": "string"}}}, - "embeddings": {"fields": {"data": {"items": {"type": "float32"}, "type": - "array"}}}, "images": {"fields": {"data": {"type": "binary"}}}}, "description": - "This is an example component", "image": "example_component:latest", "name": - "Third component", "produces": {"additionalSubsets": false, "images": {"fields": - {"data": {"type": "binary"}}}}}', --input_partition_rows, None, --storage_args, - a dummy string arg, --some_list, '[1, 2, 3]', --output_manifest_path, /tmp/outputs/output_manifest_path/data] + command: + - python3 + - main.py + - --input_manifest_path + - /tmp/inputs/input_manifest_path/data + - --metadata + - '{"base_path": "/foo/bar", "pipeline_name": "test_pipeline", "run_id": "test_pipeline-20230101000000", + "component_id": "third_component"}' + - --component_spec + - '{"args": {"storage_args": {"description": "Storage arguments", "type": "str"}}, + "consumes": {"captions": {"fields": {"data": {"type": "string"}}}, "embeddings": + {"fields": {"data": {"items": {"type": "float32"}, "type": "array"}}}, "images": + {"fields": {"data": {"type": "binary"}}}}, "description": "This is an example + component", "image": "example_component:latest", "name": "Third component", + "produces": {"additionalSubsets": false, "images": {"fields": {"data": {"type": + "binary"}}}}}' + - --input_partition_rows + - None + - --storage_args + - a dummy string arg + - --output_manifest_path + - /tmp/outputs/output_manifest_path/data image: example_component:latest inputs: artifacts: - - {name: second-component-output_manifest_path, path: /tmp/inputs/input_manifest_path/data} - outputs: - artifacts: - - {name: third-component-output_manifest_path, path: /tmp/outputs/output_manifest_path/data} + - name: second-component-output_manifest_path + path: /tmp/inputs/input_manifest_path/data metadata: - labels: - pipelines.kubeflow.org/kfp_sdk_version: 1.8.22 - pipelines.kubeflow.org/pipeline-sdk-type: kfp - pipelines.kubeflow.org/enable_caching: "true" - annotations: {pipelines.kubeflow.org/component_spec: '{"description": "This - is an example component", "implementation": {"container": {"command": ["python3", - "main.py", "--input_manifest_path", {"inputPath": "input_manifest_path"}, - "--metadata", {"inputValue": "metadata"}, "--component_spec", {"inputValue": - "component_spec"}, "--input_partition_rows", {"inputValue": "input_partition_rows"}, - "--storage_args", {"inputValue": "storage_args"}, "--some_list", {"inputValue": - "some_list"}, "--output_manifest_path", {"outputPath": "output_manifest_path"}], - "image": "example_component:latest"}}, "inputs": [{"description": "Path - to the input manifest", "name": "input_manifest_path", "type": "String"}, - {"description": "Metadata arguments containing the run id and base path", - "name": "metadata", "type": "String"}, {"default": "None", "description": - "The component specification as a dictionary", "name": "component_spec", + annotations: + pipelines.kubeflow.org/arguments.parameters: '{"component_spec": "{\"args\": + {\"storage_args\": {\"description\": \"Storage arguments\", \"type\": \"str\"}}, + \"consumes\": {\"captions\": {\"fields\": {\"data\": {\"type\": \"string\"}}}, + \"embeddings\": {\"fields\": {\"data\": {\"items\": {\"type\": \"float32\"}, + \"type\": \"array\"}}}, \"images\": {\"fields\": {\"data\": {\"type\": \"binary\"}}}}, + \"description\": \"This is an example component\", \"image\": \"example_component:latest\", + \"name\": \"Third component\", \"produces\": {\"additionalSubsets\": false, + \"images\": {\"fields\": {\"data\": {\"type\": \"binary\"}}}}}", "input_partition_rows": + "None", "metadata": "{\"base_path\": \"/foo/bar\", \"pipeline_name\": \"test_pipeline\", + \"run_id\": \"test_pipeline-20230101000000\", \"component_id\": \"third_component\"}", + "storage_args": "a dummy string arg"}' + pipelines.kubeflow.org/component_ref: '{"digest": "698791c6aa2ed14d4b337840116a7a995f403e5be414389b05ccf7942b9e4437"}' + pipelines.kubeflow.org/component_spec: '{"description": "This is an example + component", "implementation": {"container": {"command": ["python3", "main.py", + "--input_manifest_path", {"inputPath": "input_manifest_path"}, "--metadata", + {"inputValue": "metadata"}, "--component_spec", {"inputValue": "component_spec"}, + "--input_partition_rows", {"inputValue": "input_partition_rows"}, "--storage_args", + {"inputValue": "storage_args"}, "--output_manifest_path", {"outputPath": + "output_manifest_path"}], "image": "example_component:latest"}}, "inputs": + [{"description": "Path to the input manifest", "name": "input_manifest_path", + "type": "String"}, {"description": "Metadata arguments containing the run + id and base path", "name": "metadata", "type": "String"}, {"default": "None", + "description": "The component specification as a dictionary", "name": "component_spec", "type": "JsonObject"}, {"default": "None", "description": "The number of rows to load per partition. Set to override the automatic partitioning", "name": "input_partition_rows", "type": "String"}, {"description": "Storage - arguments", "name": "storage_args", "type": "String"}, {"description": "Some - list", "name": "some_list", "type": "JsonArray"}], "name": "Third component", + arguments", "name": "storage_args", "type": "String"}], "name": "Third component", "outputs": [{"description": "Path to the output manifest", "name": "output_manifest_path", - "type": "String"}]}', pipelines.kubeflow.org/component_ref: '{"digest": - "253932349a663809f2ea6fcf63ebd58f963881c6960435269d3fbe3eb17dcf53"}', pipelines.kubeflow.org/arguments.parameters: '{"component_spec": - "{\"args\": {\"some_list\": {\"description\": \"Some list\", \"items\": - {\"type\": \"int\"}, \"type\": \"list\"}, \"storage_args\": {\"description\": - \"Storage arguments\", \"type\": \"str\"}}, \"consumes\": {\"captions\": - {\"fields\": {\"data\": {\"type\": \"string\"}}}, \"embeddings\": {\"fields\": - {\"data\": {\"items\": {\"type\": \"float32\"}, \"type\": \"array\"}}}, - \"images\": {\"fields\": {\"data\": {\"type\": \"binary\"}}}}, \"description\": - \"This is an example component\", \"image\": \"example_component:latest\", - \"name\": \"Third component\", \"produces\": {\"additionalSubsets\": false, - \"images\": {\"fields\": {\"data\": {\"type\": \"binary\"}}}}}", "input_partition_rows": - "None", "metadata": "{\"base_path\": \"/foo/bar\", \"run_id\": \"{{workflow.name}}\"}", - "some_list": "[1, 2, 3]", "storage_args": "a dummy string arg"}'} - arguments: - parameters: [] - serviceAccountName: pipeline-runner + "type": "String"}]}' + labels: + pipelines.kubeflow.org/enable_caching: 'true' + pipelines.kubeflow.org/kfp_sdk_version: 1.8.22 + pipelines.kubeflow.org/pipeline-sdk-type: kfp + name: third-component + outputs: + artifacts: + - name: third-component-output_manifest_path + path: /tmp/outputs/output_manifest_path/data diff --git a/tests/example_pipelines/compiled_pipeline/example_2/docker-compose.yml b/tests/example_pipelines/compiled_pipeline/example_2/docker-compose.yml index 807d281ba..b84fa6d69 100644 --- a/tests/example_pipelines/compiled_pipeline/example_2/docker-compose.yml +++ b/tests/example_pipelines/compiled_pipeline/example_2/docker-compose.yml @@ -6,7 +6,8 @@ services: context: tests/example_pipelines/valid_pipeline/example_1/first_component command: - --metadata - - '{"run_id": "test_pipeline-20230101000000", "base_path": "/foo/bar"}' + - '{"base_path": "/foo/bar", "pipeline_name": "test_pipeline", "run_id": "test_pipeline-20230101000000", + "component_id": "first_component"}' - --output_manifest_path - /foo/bar/first_component/manifest.json - --storage_args @@ -23,7 +24,8 @@ services: image_cropping: command: - --metadata - - '{"run_id": "test_pipeline-20230101000000", "base_path": "/foo/bar"}' + - '{"base_path": "/foo/bar", "pipeline_name": "test_pipeline", "run_id": "test_pipeline-20230101000000", + "component_id": "image_cropping"}' - --output_manifest_path - /foo/bar/image_cropping/manifest.json - --cropping_threshold diff --git a/tests/example_pipelines/compiled_pipeline/example_2/kubeflow_pipeline.yml b/tests/example_pipelines/compiled_pipeline/example_2/kubeflow_pipeline.yml index 67d306139..adcfdf703 100644 --- a/tests/example_pipelines/compiled_pipeline/example_2/kubeflow_pipeline.yml +++ b/tests/example_pipelines/compiled_pipeline/example_2/kubeflow_pipeline.yml @@ -1,47 +1,68 @@ apiVersion: argoproj.io/v1alpha1 kind: Workflow metadata: - generateName: test-pipeline- - annotations: {pipelines.kubeflow.org/kfp_sdk_version: 1.8.22, pipelines.kubeflow.org/pipeline_compilation_time: '2023-01-01T00:00:00', + annotations: + pipelines.kubeflow.org/kfp_sdk_version: 1.8.22 + pipelines.kubeflow.org/pipeline_compilation_time: '2023-01-01T00:00:00' pipelines.kubeflow.org/pipeline_spec: '{"description": "description of the test - pipeline", "name": "test_pipeline"}'} - labels: {pipelines.kubeflow.org/kfp_sdk_version: 1.8.22} + pipeline", "name": "test_pipeline"}' + generateName: test-pipeline- + labels: + pipelines.kubeflow.org/kfp_sdk_version: 1.8.22 spec: + arguments: + parameters: [] entrypoint: test-pipeline + serviceAccountName: pipeline-runner templates: - - name: first-component - container: + - container: args: [] - command: [python3, main.py, --input_manifest_path, /tmp/inputs/input_manifest_path/data, - --metadata, '{"base_path": "/foo/bar", "run_id": "{{workflow.name}}"}', --component_spec, - '{"args": {"storage_args": {"description": "Storage arguments", "type": "str"}}, - "description": "This is an example component", "image": "example_component:latest", - "name": "First component", "produces": {"captions": {"fields": {"data": - {"type": "string"}}}, "images": {"fields": {"data": {"type": "binary"}}}}}', - --input_partition_rows, None, --storage_args, a dummy string arg, --output_manifest_path, - /tmp/outputs/output_manifest_path/data] + command: + - python3 + - main.py + - --input_manifest_path + - /tmp/inputs/input_manifest_path/data + - --metadata + - '{"base_path": "/foo/bar", "pipeline_name": "test_pipeline", "run_id": "test_pipeline-20230101000000", + "component_id": "first_component"}' + - --component_spec + - '{"args": {"storage_args": {"description": "Storage arguments", "type": "str"}}, + "description": "This is an example component", "image": "example_component:latest", + "name": "First component", "produces": {"captions": {"fields": {"data": {"type": + "string"}}}, "images": {"fields": {"data": {"type": "binary"}}}}}' + - --input_partition_rows + - None + - --storage_args + - a dummy string arg + - --output_manifest_path + - /tmp/outputs/output_manifest_path/data image: example_component:latest inputs: artifacts: - name: input_manifest_path path: /tmp/inputs/input_manifest_path/data - raw: {data: ''} - outputs: - artifacts: - - {name: first-component-output_manifest_path, path: /tmp/outputs/output_manifest_path/data} + raw: + data: '' metadata: - labels: - pipelines.kubeflow.org/kfp_sdk_version: 1.8.22 - pipelines.kubeflow.org/pipeline-sdk-type: kfp - pipelines.kubeflow.org/enable_caching: "true" - annotations: {pipelines.kubeflow.org/component_spec: '{"description": "This - is an example component", "implementation": {"container": {"command": ["python3", - "main.py", "--input_manifest_path", {"inputPath": "input_manifest_path"}, - "--metadata", {"inputValue": "metadata"}, "--component_spec", {"inputValue": - "component_spec"}, "--input_partition_rows", {"inputValue": "input_partition_rows"}, - "--storage_args", {"inputValue": "storage_args"}, "--output_manifest_path", - {"outputPath": "output_manifest_path"}], "image": "example_component:latest"}}, - "inputs": [{"description": "Path to the input manifest", "name": "input_manifest_path", + annotations: + pipelines.kubeflow.org/arguments.parameters: '{"component_spec": "{\"args\": + {\"storage_args\": {\"description\": \"Storage arguments\", \"type\": \"str\"}}, + \"description\": \"This is an example component\", \"image\": \"example_component:latest\", + \"name\": \"First component\", \"produces\": {\"captions\": {\"fields\": + {\"data\": {\"type\": \"string\"}}}, \"images\": {\"fields\": {\"data\": + {\"type\": \"binary\"}}}}}", "input_partition_rows": "None", "metadata": + "{\"base_path\": \"/foo/bar\", \"pipeline_name\": \"test_pipeline\", \"run_id\": + \"test_pipeline-20230101000000\", \"component_id\": \"first_component\"}", + "storage_args": "a dummy string arg"}' + pipelines.kubeflow.org/component_ref: '{"digest": "2a304ce49a15404ba50dfd8b56ec43fa8ac8c29f80579d1c8fb974d3f1a5c87f"}' + pipelines.kubeflow.org/component_spec: '{"description": "This is an example + component", "implementation": {"container": {"command": ["python3", "main.py", + "--input_manifest_path", {"inputPath": "input_manifest_path"}, "--metadata", + {"inputValue": "metadata"}, "--component_spec", {"inputValue": "component_spec"}, + "--input_partition_rows", {"inputValue": "input_partition_rows"}, "--storage_args", + {"inputValue": "storage_args"}, "--output_manifest_path", {"outputPath": + "output_manifest_path"}], "image": "example_component:latest"}}, "inputs": + [{"description": "Path to the input manifest", "name": "input_manifest_path", "type": "String"}, {"description": "Metadata arguments containing the run id and base path", "name": "metadata", "type": "String"}, {"default": "None", "description": "The component specification as a dictionary", "name": "component_spec", @@ -50,47 +71,71 @@ spec: "name": "input_partition_rows", "type": "String"}, {"description": "Storage arguments", "name": "storage_args", "type": "String"}], "name": "First component", "outputs": [{"description": "Path to the output manifest", "name": "output_manifest_path", - "type": "String"}]}', pipelines.kubeflow.org/component_ref: '{"digest": - "2a304ce49a15404ba50dfd8b56ec43fa8ac8c29f80579d1c8fb974d3f1a5c87f"}', pipelines.kubeflow.org/arguments.parameters: '{"component_spec": - "{\"args\": {\"storage_args\": {\"description\": \"Storage arguments\", - \"type\": \"str\"}}, \"description\": \"This is an example component\", - \"image\": \"example_component:latest\", \"name\": \"First component\", - \"produces\": {\"captions\": {\"fields\": {\"data\": {\"type\": \"string\"}}}, - \"images\": {\"fields\": {\"data\": {\"type\": \"binary\"}}}}}", "input_partition_rows": - "None", "metadata": "{\"base_path\": \"/foo/bar\", \"run_id\": \"{{workflow.name}}\"}", - "storage_args": "a dummy string arg"}'} - - name: image-cropping - container: + "type": "String"}]}' + labels: + pipelines.kubeflow.org/enable_caching: 'true' + pipelines.kubeflow.org/kfp_sdk_version: 1.8.22 + pipelines.kubeflow.org/pipeline-sdk-type: kfp + name: first-component + outputs: + artifacts: + - name: first-component-output_manifest_path + path: /tmp/outputs/output_manifest_path/data + - container: args: [] - command: [python3, main.py, --input_manifest_path, /tmp/inputs/input_manifest_path/data, - --metadata, '{"base_path": "/foo/bar", "run_id": "{{workflow.name}}"}', --component_spec, - '{"args": {"cropping_threshold": {"default": -30, "description": "Threshold - parameter used for detecting borders. A lower (negative) parameter results - in a more performant border detection, but can cause overcropping. Default - is -30", "type": "int"}, "padding": {"default": 10, "description": "Padding - for the image cropping. The padding is added to all borders of the image.", - "type": "int"}}, "consumes": {"images": {"fields": {"data": {"type": "binary"}}}}, - "description": "Component that removes single-colored borders around images - and crops them appropriately", "image": "ghcr.io/ml6team/image_cropping:dev", - "name": "Image cropping", "produces": {"images": {"fields": {"data": {"type": - "binary"}, "height": {"type": "int32"}, "width": {"type": "int32"}}}}}', - --input_partition_rows, None, --cropping_threshold, '0', --padding, '0', --output_manifest_path, - /tmp/outputs/output_manifest_path/data] + command: + - python3 + - main.py + - --input_manifest_path + - /tmp/inputs/input_manifest_path/data + - --metadata + - '{"base_path": "/foo/bar", "pipeline_name": "test_pipeline", "run_id": "test_pipeline-20230101000000", + "component_id": "image_cropping"}' + - --component_spec + - '{"args": {"cropping_threshold": {"default": -30, "description": "Threshold + parameter used for detecting borders. A lower (negative) parameter results + in a more performant border detection, but can cause overcropping. Default + is -30", "type": "int"}, "padding": {"default": 10, "description": "Padding + for the image cropping. The padding is added to all borders of the image.", + "type": "int"}}, "consumes": {"images": {"fields": {"data": {"type": "binary"}}}}, + "description": "Component that removes single-colored borders around images + and crops them appropriately", "image": "ghcr.io/ml6team/image_cropping:dev", + "name": "Image cropping", "produces": {"images": {"fields": {"data": {"type": + "binary"}, "height": {"type": "int32"}, "width": {"type": "int32"}}}}}' + - --input_partition_rows + - None + - --cropping_threshold + - '0' + - --padding + - '0' + - --output_manifest_path + - /tmp/outputs/output_manifest_path/data image: ghcr.io/ml6team/image_cropping:dev inputs: artifacts: - - {name: first-component-output_manifest_path, path: /tmp/inputs/input_manifest_path/data} - outputs: - artifacts: - - {name: image-cropping-output_manifest_path, path: /tmp/outputs/output_manifest_path/data} + - name: first-component-output_manifest_path + path: /tmp/inputs/input_manifest_path/data metadata: - labels: - pipelines.kubeflow.org/kfp_sdk_version: 1.8.22 - pipelines.kubeflow.org/pipeline-sdk-type: kfp - pipelines.kubeflow.org/enable_caching: "true" - annotations: {pipelines.kubeflow.org/component_spec: '{"description": "Component - that removes single-colored borders around images and crops them appropriately", - "implementation": {"container": {"command": ["python3", "main.py", "--input_manifest_path", + annotations: + pipelines.kubeflow.org/arguments.parameters: '{"component_spec": "{\"args\": + {\"cropping_threshold\": {\"default\": -30, \"description\": \"Threshold + parameter used for detecting borders. A lower (negative) parameter results + in a more performant border detection, but can cause overcropping. Default + is -30\", \"type\": \"int\"}, \"padding\": {\"default\": 10, \"description\": + \"Padding for the image cropping. The padding is added to all borders of + the image.\", \"type\": \"int\"}}, \"consumes\": {\"images\": {\"fields\": + {\"data\": {\"type\": \"binary\"}}}}, \"description\": \"Component that + removes single-colored borders around images and crops them appropriately\", + \"image\": \"ghcr.io/ml6team/image_cropping:dev\", \"name\": \"Image cropping\", + \"produces\": {\"images\": {\"fields\": {\"data\": {\"type\": \"binary\"}, + \"height\": {\"type\": \"int32\"}, \"width\": {\"type\": \"int32\"}}}}}", + "cropping_threshold": "0", "input_partition_rows": "None", "metadata": "{\"base_path\": + \"/foo/bar\", \"pipeline_name\": \"test_pipeline\", \"run_id\": \"test_pipeline-20230101000000\", + \"component_id\": \"image_cropping\"}", "padding": "0"}' + pipelines.kubeflow.org/component_ref: '{"digest": "e86f02b6b9cc878b6187e44bb3caf9291c3ce42c1939e19b0a97dacdc78a9d72"}' + pipelines.kubeflow.org/component_spec: '{"description": "Component that removes + single-colored borders around images and crops them appropriately", "implementation": + {"container": {"command": ["python3", "main.py", "--input_manifest_path", {"inputPath": "input_manifest_path"}, "--metadata", {"inputValue": "metadata"}, "--component_spec", {"inputValue": "component_spec"}, "--input_partition_rows", {"inputValue": "input_partition_rows"}, "--cropping_threshold", {"inputValue": @@ -109,31 +154,26 @@ spec: 10, "description": "Padding for the image cropping. The padding is added to all borders of the image.", "name": "padding", "type": "Integer"}], "name": "Image cropping", "outputs": [{"description": "Path to the output manifest", - "name": "output_manifest_path", "type": "String"}]}', pipelines.kubeflow.org/component_ref: '{"digest": - "e86f02b6b9cc878b6187e44bb3caf9291c3ce42c1939e19b0a97dacdc78a9d72"}', pipelines.kubeflow.org/arguments.parameters: '{"component_spec": - "{\"args\": {\"cropping_threshold\": {\"default\": -30, \"description\": - \"Threshold parameter used for detecting borders. A lower (negative) parameter - results in a more performant border detection, but can cause overcropping. - Default is -30\", \"type\": \"int\"}, \"padding\": {\"default\": 10, \"description\": - \"Padding for the image cropping. The padding is added to all borders of - the image.\", \"type\": \"int\"}}, \"consumes\": {\"images\": {\"fields\": - {\"data\": {\"type\": \"binary\"}}}}, \"description\": \"Component that - removes single-colored borders around images and crops them appropriately\", - \"image\": \"ghcr.io/ml6team/image_cropping:dev\", \"name\": \"Image cropping\", - \"produces\": {\"images\": {\"fields\": {\"data\": {\"type\": \"binary\"}, - \"height\": {\"type\": \"int32\"}, \"width\": {\"type\": \"int32\"}}}}}", - "cropping_threshold": "0", "input_partition_rows": "None", "metadata": "{\"base_path\": - \"/foo/bar\", \"run_id\": \"{{workflow.name}}\"}", "padding": "0"}'} - - name: test-pipeline - dag: + "name": "output_manifest_path", "type": "String"}]}' + labels: + pipelines.kubeflow.org/enable_caching: 'true' + pipelines.kubeflow.org/kfp_sdk_version: 1.8.22 + pipelines.kubeflow.org/pipeline-sdk-type: kfp + name: image-cropping + outputs: + artifacts: + - name: image-cropping-output_manifest_path + path: /tmp/outputs/output_manifest_path/data + - dag: tasks: - - {name: first-component, template: first-component} - - name: image-cropping - template: image-cropping - dependencies: [first-component] - arguments: + - name: first-component + template: first-component + - arguments: artifacts: - - {name: first-component-output_manifest_path, from: '{{tasks.first-component.outputs.artifacts.first-component-output_manifest_path}}'} - arguments: - parameters: [] - serviceAccountName: pipeline-runner + - from: '{{tasks.first-component.outputs.artifacts.first-component-output_manifest_path}}' + name: first-component-output_manifest_path + dependencies: + - first-component + name: image-cropping + template: image-cropping + name: test-pipeline diff --git a/tests/example_pipelines/compiled_pipeline/kubeflow_pipeline.yml b/tests/example_pipelines/compiled_pipeline/kubeflow_pipeline.yml index 8383894d6..8450fe823 100644 --- a/tests/example_pipelines/compiled_pipeline/kubeflow_pipeline.yml +++ b/tests/example_pipelines/compiled_pipeline/kubeflow_pipeline.yml @@ -1,50 +1,71 @@ apiVersion: argoproj.io/v1alpha1 kind: Workflow metadata: - generateName: test-pipeline- - annotations: {pipelines.kubeflow.org/kfp_sdk_version: 1.8.22, pipelines.kubeflow.org/pipeline_compilation_time: '2023-01-01T00:00:00', + annotations: + pipelines.kubeflow.org/kfp_sdk_version: 1.8.22 + pipelines.kubeflow.org/pipeline_compilation_time: '2023-01-01T00:00:00' pipelines.kubeflow.org/pipeline_spec: '{"description": "description of the test - pipeline", "name": "test_pipeline"}'} - labels: {pipelines.kubeflow.org/kfp_sdk_version: 1.8.22} + pipeline", "name": "test_pipeline"}' + generateName: test-pipeline- + labels: + pipelines.kubeflow.org/kfp_sdk_version: 1.8.22 spec: + arguments: + parameters: [] entrypoint: test-pipeline + serviceAccountName: pipeline-runner templates: - - name: first-component - container: + - container: args: [] - command: [python3, main.py, --input_manifest_path, /tmp/inputs/input_manifest_path/data, - --metadata, '{"base_path": "/foo/bar", "run_id": "{{workflow.name}}"}', --component_spec, - '{"args": {"storage_args": {"description": "Storage arguments", "type": "str"}}, - "description": "This is an example component", "image": "example_component:latest", - "name": "First component", "produces": {"captions": {"fields": {"data": - {"type": "string"}}}, "images": {"fields": {"data": {"type": "binary"}}}}}', - --input_partition_rows, None, --storage_args, a dummy string arg, --output_manifest_path, - /tmp/outputs/output_manifest_path/data] + command: + - python3 + - main.py + - --input_manifest_path + - /tmp/inputs/input_manifest_path/data + - --metadata + - '{"base_path": "/foo/bar", "pipeline_name": "test_pipeline", "run_id": "test_pipeline-20230101000000", + "component_id": "first_component"}' + - --component_spec + - '{"args": {"storage_args": {"description": "Storage arguments", "type": "str"}}, + "description": "This is an example component", "image": "example_component:latest", + "name": "First component", "produces": {"captions": {"fields": {"data": {"type": + "string"}}}, "images": {"fields": {"data": {"type": "binary"}}}}}' + - --input_partition_rows + - None + - --storage_args + - a dummy string arg + - --output_manifest_path + - /tmp/outputs/output_manifest_path/data image: example_component:latest resources: - limits: {nvidia.com/gpu: 1} + limits: + nvidia.com/gpu: 1 inputs: artifacts: - name: input_manifest_path path: /tmp/inputs/input_manifest_path/data - raw: {data: ''} - outputs: - artifacts: - - {name: first-component-output_manifest_path, path: /tmp/outputs/output_manifest_path/data} - nodeSelector: {node_pool: a_node_pool} + raw: + data: '' metadata: - labels: - pipelines.kubeflow.org/kfp_sdk_version: 1.8.22 - pipelines.kubeflow.org/pipeline-sdk-type: kfp - pipelines.kubeflow.org/enable_caching: "true" - annotations: {pipelines.kubeflow.org/component_spec: '{"description": "This - is an example component", "implementation": {"container": {"command": ["python3", - "main.py", "--input_manifest_path", {"inputPath": "input_manifest_path"}, - "--metadata", {"inputValue": "metadata"}, "--component_spec", {"inputValue": - "component_spec"}, "--input_partition_rows", {"inputValue": "input_partition_rows"}, - "--storage_args", {"inputValue": "storage_args"}, "--output_manifest_path", - {"outputPath": "output_manifest_path"}], "image": "example_component:latest"}}, - "inputs": [{"description": "Path to the input manifest", "name": "input_manifest_path", + annotations: + pipelines.kubeflow.org/arguments.parameters: '{"component_spec": "{\"args\": + {\"storage_args\": {\"description\": \"Storage arguments\", \"type\": \"str\"}}, + \"description\": \"This is an example component\", \"image\": \"example_component:latest\", + \"name\": \"First component\", \"produces\": {\"captions\": {\"fields\": + {\"data\": {\"type\": \"string\"}}}, \"images\": {\"fields\": {\"data\": + {\"type\": \"binary\"}}}}}", "input_partition_rows": "None", "metadata": + "{\"base_path\": \"/foo/bar\", \"pipeline_name\": \"test_pipeline\", \"run_id\": + \"test_pipeline-20230101000000\", \"component_id\": \"first_component\"}", + "storage_args": "a dummy string arg"}' + pipelines.kubeflow.org/component_ref: '{"digest": "2a304ce49a15404ba50dfd8b56ec43fa8ac8c29f80579d1c8fb974d3f1a5c87f"}' + pipelines.kubeflow.org/component_spec: '{"description": "This is an example + component", "implementation": {"container": {"command": ["python3", "main.py", + "--input_manifest_path", {"inputPath": "input_manifest_path"}, "--metadata", + {"inputValue": "metadata"}, "--component_spec", {"inputValue": "component_spec"}, + "--input_partition_rows", {"inputValue": "input_partition_rows"}, "--storage_args", + {"inputValue": "storage_args"}, "--output_manifest_path", {"outputPath": + "output_manifest_path"}], "image": "example_component:latest"}}, "inputs": + [{"description": "Path to the input manifest", "name": "input_manifest_path", "type": "String"}, {"description": "Metadata arguments containing the run id and base path", "name": "metadata", "type": "String"}, {"default": "None", "description": "The component specification as a dictionary", "name": "component_spec", @@ -53,19 +74,20 @@ spec: "name": "input_partition_rows", "type": "String"}, {"description": "Storage arguments", "name": "storage_args", "type": "String"}], "name": "First component", "outputs": [{"description": "Path to the output manifest", "name": "output_manifest_path", - "type": "String"}]}', pipelines.kubeflow.org/component_ref: '{"digest": - "2a304ce49a15404ba50dfd8b56ec43fa8ac8c29f80579d1c8fb974d3f1a5c87f"}', pipelines.kubeflow.org/arguments.parameters: '{"component_spec": - "{\"args\": {\"storage_args\": {\"description\": \"Storage arguments\", - \"type\": \"str\"}}, \"description\": \"This is an example component\", - \"image\": \"example_component:latest\", \"name\": \"First component\", - \"produces\": {\"captions\": {\"fields\": {\"data\": {\"type\": \"string\"}}}, - \"images\": {\"fields\": {\"data\": {\"type\": \"binary\"}}}}}", "input_partition_rows": - "None", "metadata": "{\"base_path\": \"/foo/bar\", \"run_id\": \"{{workflow.name}}\"}", - "storage_args": "a dummy string arg"}'} - - name: test-pipeline - dag: + "type": "String"}]}' + labels: + pipelines.kubeflow.org/enable_caching: 'true' + pipelines.kubeflow.org/kfp_sdk_version: 1.8.22 + pipelines.kubeflow.org/pipeline-sdk-type: kfp + name: first-component + nodeSelector: + node_pool: a_node_pool + outputs: + artifacts: + - name: first-component-output_manifest_path + path: /tmp/outputs/output_manifest_path/data + - dag: tasks: - - {name: first-component, template: first-component} - arguments: - parameters: [] - serviceAccountName: pipeline-runner + - name: first-component + template: first-component + name: test-pipeline diff --git a/tests/example_pipelines/valid_pipeline/example_1/fourth_component/fondant_component.yaml b/tests/example_pipelines/valid_pipeline/example_1/fourth_component/fondant_component.yaml index f1d6d0b77..3cda0cc6c 100644 --- a/tests/example_pipelines/valid_pipeline/example_1/fourth_component/fondant_component.yaml +++ b/tests/example_pipelines/valid_pipeline/example_1/fourth_component/fondant_component.yaml @@ -1,4 +1,4 @@ -name: Third component +name: Fourth component description: This is an example component image: example_component:latest diff --git a/tests/example_specs/components/arguments/input_manifest.json b/tests/example_specs/components/arguments/input_manifest.json index bdaf8976e..004113289 100644 --- a/tests/example_specs/components/arguments/input_manifest.json +++ b/tests/example_specs/components/arguments/input_manifest.json @@ -1,5 +1,6 @@ { "metadata": { + "pipeline_name": "test_pipeline", "base_path": "/bucket", "run_id": "12345", "component_id": "67890" diff --git a/tests/example_specs/components/input_manifest.json b/tests/example_specs/components/input_manifest.json index 7ff924680..7af13d599 100644 --- a/tests/example_specs/components/input_manifest.json +++ b/tests/example_specs/components/input_manifest.json @@ -1,7 +1,8 @@ { "metadata": { + "pipeline_name": "test_pipeline", "base_path": "/bucket", - "run_id": "12345", + "run_id": "test_pipeline_12345", "component_id": "67890" }, "index": { diff --git a/tests/example_specs/evolution_examples/1/output_manifest.json b/tests/example_specs/evolution_examples/1/output_manifest.json index 9f14f77d9..2694368d7 100644 --- a/tests/example_specs/evolution_examples/1/output_manifest.json +++ b/tests/example_specs/evolution_examples/1/output_manifest.json @@ -1,5 +1,6 @@ { "metadata": { + "pipeline_name": "test_pipeline", "base_path": "gs://bucket", "run_id": "12345", "component_id": "example_component" diff --git a/tests/example_specs/evolution_examples/2/output_manifest.json b/tests/example_specs/evolution_examples/2/output_manifest.json index 98483d483..a9d3851b2 100644 --- a/tests/example_specs/evolution_examples/2/output_manifest.json +++ b/tests/example_specs/evolution_examples/2/output_manifest.json @@ -1,5 +1,6 @@ { "metadata": { + "pipeline_name": "test_pipeline", "base_path": "gs://bucket", "run_id": "12345", "component_id": "example_component" diff --git a/tests/example_specs/evolution_examples/3/output_manifest.json b/tests/example_specs/evolution_examples/3/output_manifest.json index b74d4b774..cb510c3cf 100644 --- a/tests/example_specs/evolution_examples/3/output_manifest.json +++ b/tests/example_specs/evolution_examples/3/output_manifest.json @@ -1,5 +1,6 @@ { "metadata": { + "pipeline_name": "test_pipeline", "base_path": "gs://bucket", "run_id": "12345", "component_id": "example_component" diff --git a/tests/example_specs/evolution_examples/4/output_manifest.json b/tests/example_specs/evolution_examples/4/output_manifest.json index 54fc737a4..bf0c2a295 100644 --- a/tests/example_specs/evolution_examples/4/output_manifest.json +++ b/tests/example_specs/evolution_examples/4/output_manifest.json @@ -1,5 +1,6 @@ { "metadata": { + "pipeline_name": "test_pipeline", "base_path": "gs://bucket", "run_id": "12345", "component_id": "example_component" diff --git a/tests/example_specs/evolution_examples/5/output_manifest.json b/tests/example_specs/evolution_examples/5/output_manifest.json index d9fffcaff..0ed082ce8 100644 --- a/tests/example_specs/evolution_examples/5/output_manifest.json +++ b/tests/example_specs/evolution_examples/5/output_manifest.json @@ -1,5 +1,6 @@ { "metadata": { + "pipeline_name": "test_pipeline", "base_path": "gs://bucket", "run_id": "12345", "component_id": "example_component" diff --git a/tests/example_specs/evolution_examples/6/output_manifest.json b/tests/example_specs/evolution_examples/6/output_manifest.json index f03e1647e..9f8e814fa 100644 --- a/tests/example_specs/evolution_examples/6/output_manifest.json +++ b/tests/example_specs/evolution_examples/6/output_manifest.json @@ -1,5 +1,6 @@ { "metadata": { + "pipeline_name": "test_pipeline", "base_path": "gs://bucket", "run_id": "12345", "component_id": "example_component" diff --git a/tests/example_specs/evolution_examples/7/output_manifest.json b/tests/example_specs/evolution_examples/7/output_manifest.json index 1879dc0be..dec03420a 100644 --- a/tests/example_specs/evolution_examples/7/output_manifest.json +++ b/tests/example_specs/evolution_examples/7/output_manifest.json @@ -1,5 +1,6 @@ { "metadata": { + "pipeline_name": "test_pipeline", "base_path": "gs://bucket", "run_id": "12345", "component_id": "example_component" diff --git a/tests/example_specs/evolution_examples/8/output_manifest.json b/tests/example_specs/evolution_examples/8/output_manifest.json index 3f1de1fa7..657aeaadb 100644 --- a/tests/example_specs/evolution_examples/8/output_manifest.json +++ b/tests/example_specs/evolution_examples/8/output_manifest.json @@ -1,5 +1,6 @@ { "metadata": { + "pipeline_name": "test_pipeline", "base_path": "gs://bucket", "run_id": "12345", "component_id": "example_component" diff --git a/tests/example_specs/evolution_examples/input_manifest.json b/tests/example_specs/evolution_examples/input_manifest.json index 2c12b2a1d..2d9910981 100644 --- a/tests/example_specs/evolution_examples/input_manifest.json +++ b/tests/example_specs/evolution_examples/input_manifest.json @@ -1,5 +1,6 @@ { "metadata": { + "pipeline_name": "test_pipeline", "base_path": "gs://bucket", "run_id": "12345", "component_id": "67890" diff --git a/tests/example_specs/manifests/valid_manifest.json b/tests/example_specs/manifests/valid_manifest.json index 8b9a5794d..9bc00c512 100644 --- a/tests/example_specs/manifests/valid_manifest.json +++ b/tests/example_specs/manifests/valid_manifest.json @@ -1,7 +1,8 @@ { "metadata": { + "pipeline_name": "test_pipeline", "base_path": "gs://bucket", - "run_id": "12345", + "run_id": "test_pipeline_12345", "component_id": "67890" }, "index": { diff --git a/tests/test_compiler.py b/tests/test_compiler.py index 89c863143..af15e58f8 100644 --- a/tests/test_compiler.py +++ b/tests/test_compiler.py @@ -27,10 +27,9 @@ input_partition_rows="10", ), ComponentOp( - Path(COMPONENTS_PATH / "example_1" / "fourth_component"), + Path(COMPONENTS_PATH / "example_1" / "third_component"), arguments={ "storage_args": "a dummy string arg", - "some_list": [1, 2, 3], }, ), ], @@ -82,7 +81,7 @@ def setup_pipeline(request, tmp_path, monkeypatch): # override the default package_path with temporary path to avoid the creation of artifacts monkeypatch.setattr(pipeline, "package_path", str(tmp_path / "test_pipeline.tgz")) - return (example_dir, pipeline) + return example_dir, pipeline @pytest.mark.usefixtures("_freeze_time") @@ -127,7 +126,8 @@ def test_docker_local_path(setup_pipeline, tmp_path_factory): # check if commands are patched to use the working dir commands_with_dir = [ f"{work_dir}/{name}/manifest.json", - f'{{"run_id": "test_pipeline-20230101000000", "base_path": "{work_dir}"}}', + f'{{"base_path": "{work_dir}", "pipeline_name": "{pipeline.name}",' + f' "run_id": "test_pipeline-20230101000000", "component_id": "{name}"}}', ] for command in commands_with_dir: assert command in service["command"] @@ -153,7 +153,8 @@ def test_docker_remote_path(setup_pipeline, tmp_path_factory): # check if commands are patched to use the remote dir commands_with_dir = [ f"{remote_dir}/{name}/manifest.json", - f'{{"run_id": "test_pipeline-20230101000000", "base_path": "{remote_dir}"}}', + f'{{"base_path": "{remote_dir}", "pipeline_name": "{pipeline.name}",' + f' "run_id": "test_pipeline-20230101000000", "component_id": "{name}"}}', ] for command in commands_with_dir: assert command in service["command"] diff --git a/tests/test_component.py b/tests/test_component.py index 3ecab1f16..851983c71 100644 --- a/tests/test_component.py +++ b/tests/test_component.py @@ -21,7 +21,7 @@ ExecutorFactory, PandasTransformExecutor, ) -from fondant.manifest import Manifest +from fondant.manifest import Manifest, Metadata components_path = Path(__file__).parent / "example_specs/components" N_PARTITIONS = 2 @@ -33,6 +33,16 @@ def yaml_file_to_json_string(file_path): return json.dumps(data) +@pytest.fixture() +def metadata(): + return Metadata( + pipeline_name="pipeline", + base_path="/bucket", + component_id="load_component", + run_id="12345", + ) + + @pytest.fixture() def _patched_data_loading(monkeypatch): """Mock data loading so no actual data is loaded.""" @@ -72,14 +82,14 @@ def wrapper(self, *args, **kwargs): return wrapper -def test_component_arguments(): +def test_component_arguments(metadata): # Mock CLI arguments sys.argv = [ "", "--input_manifest_path", str(components_path / "arguments/input_manifest.json"), "--metadata", - "{}", + metadata.to_json(), "--output_manifest_path", str(components_path / "arguments/output_manifest.json"), "--component_spec", @@ -127,12 +137,13 @@ def _process_dataset(self, manifest: Manifest) -> t.Union[None, dd.DataFrame]: @pytest.mark.usefixtures("_patched_data_writing") -def test_load_component(): +def test_load_component(metadata): # Mock CLI arguments load + sys.argv = [ "", "--metadata", - json.dumps({"base_path": "/bucket", "run_id": "12345"}), + metadata.to_json(), "--flag", "success", "--value", @@ -168,14 +179,14 @@ def load(self): @pytest.mark.usefixtures("_patched_data_loading", "_patched_data_writing") -def test_dask_transform_component(): +def test_dask_transform_component(metadata): # Mock CLI arguments sys.argv = [ "", "--input_manifest_path", str(components_path / "input_manifest.json"), "--metadata", - "{}", + metadata.to_json(), "--flag", "success", "--value", @@ -213,14 +224,14 @@ def transform(self, dataframe): @pytest.mark.usefixtures("_patched_data_loading", "_patched_data_writing") -def test_pandas_transform_component(): +def test_pandas_transform_component(metadata): # Mock CLI arguments sys.argv = [ "", "--input_manifest_path", str(components_path / "input_manifest.json"), "--metadata", - "{}", + metadata.to_json(), "--flag", "success", "--value", @@ -329,14 +340,14 @@ def transform(dataframe: pd.DataFrame) -> pd.DataFrame: @pytest.mark.usefixtures("_patched_data_loading") -def test_write_component(): +def test_write_component(metadata): # Mock CLI arguments sys.argv = [ "", "--input_manifest_path", str(components_path / "input_manifest.json"), "--metadata", - "{}", + metadata.to_json(), "--flag", "success", "--value", diff --git a/tests/test_manifest.py b/tests/test_manifest.py index 68fb9f364..922d70cfa 100644 --- a/tests/test_manifest.py +++ b/tests/test_manifest.py @@ -117,9 +117,11 @@ def test_manifest_creation(): """Test the stepwise creation of a manifest via the Manifest class.""" base_path = "gs://bucket" run_id = "run_id" + pipeline_name = "pipeline_name" component_id = "component_id" manifest = Manifest.create( + pipeline_name=pipeline_name, base_path=base_path, run_id=run_id, component_id=component_id, @@ -129,6 +131,7 @@ def test_manifest_creation(): assert manifest._specification == { "metadata": { + "pipeline_name": pipeline_name, "base_path": base_path, "run_id": run_id, "component_id": component_id, @@ -154,11 +157,17 @@ def test_manifest_creation(): def test_manifest_repr(): - manifest = Manifest.create(base_path="/", run_id="A", component_id="1") + manifest = Manifest.create( + pipeline_name="NAME", + base_path="/", + run_id="A", + component_id="1", + ) assert ( manifest.__repr__() - == "Manifest({'metadata': {'base_path': '/', 'run_id': 'A', 'component_id': '1'}, " - "'index': {'location': '/index/A/1'}, 'subsets': {}})" + == "Manifest({'metadata': {'base_path': '/', 'pipeline_name': 'NAME'," + " 'run_id': 'A', 'component_id': '1'}, 'index': {'location': '/index/A/1'}," + " 'subsets': {}})" ) diff --git a/tests/test_pipeline.py b/tests/test_pipeline.py index 60892117c..e9e4ac7cd 100644 --- a/tests/test_pipeline.py +++ b/tests/test_pipeline.py @@ -144,13 +144,13 @@ def test_valid_pipeline( pipeline.sort_graph() assert list(pipeline._graph.keys()) == [ - "First component", - "Second component", - "Third component", + "first_component", + "second_component", + "third_component", ] - assert pipeline._graph["First component"]["dependencies"] == [] - assert pipeline._graph["Second component"]["dependencies"] == ["First component"] - assert pipeline._graph["Third component"]["dependencies"] == ["Second component"] + assert pipeline._graph["first_component"]["dependencies"] == [] + assert pipeline._graph["second_component"]["dependencies"] == ["first_component"] + assert pipeline._graph["third_component"]["dependencies"] == ["second_component"] pipeline._validate_pipeline_definition("test_pipeline")