From 757033eda767bc80607f2c5c7b7a6693055bb3cf Mon Sep 17 00:00:00 2001 From: Philippe Moussalli Date: Wed, 16 Aug 2023 09:17:13 +0200 Subject: [PATCH] Remove output_partition_size argument and logic (#355) PR to remove automatic logic of repartitioning the output based on size --- components/load_from_files/README.md | 1 - docs/pipeline.md | 2 - .../controlnet-interior-design/pipeline.py | 1 - src/fondant/component_spec.py | 9 - src/fondant/data_io.py | 38 -- src/fondant/executor.py | 13 +- src/fondant/pipeline.py | 12 +- .../example_1/docker-compose.yml | 8 +- .../example_1/kubeflow_pipeline.yml | 330 ++++++++++-------- .../example_2/docker-compose.yml | 4 - .../example_2/kubeflow_pipeline.yml | 245 +++++++------ .../compiled_pipeline/kubeflow_pipeline.yml | 122 ++++--- .../component_specs/kubeflow_component.yaml | 5 - tests/test_compiler.py | 6 +- tests/test_component.py | 7 - tests/test_data_io.py | 7 +- tests/test_pipeline.py | 30 +- 17 files changed, 400 insertions(+), 440 deletions(-) diff --git a/components/load_from_files/README.md b/components/load_from_files/README.md index ae5f40684..962baab36 100644 --- a/components/load_from_files/README.md +++ b/components/load_from_files/README.md @@ -29,7 +29,6 @@ load_from_files = ComponentOp( "directory_uri": "./data.zip", # change this to your # directory_uri, remote or local }, - output_partition_size="10MB", ) my_pipeline.add_op(load_from_files, dependencies=[]) diff --git a/docs/pipeline.md b/docs/pipeline.md index 8b6585c17..beb355601 100644 --- a/docs/pipeline.md +++ b/docs/pipeline.md @@ -86,7 +86,6 @@ caption_images_op = ComponentOp( "max_new_tokens": 50, }, input_partition_rows='disable', - output_partition_size='disable', ) ``` @@ -105,7 +104,6 @@ caption_images_op = ComponentOp( "max_new_tokens": 50, }, input_partition_rows=100, - output_partition_size="10MB", ) ``` diff --git a/examples/pipelines/controlnet-interior-design/pipeline.py b/examples/pipelines/controlnet-interior-design/pipeline.py index be27ecb7e..f176cf5fc 100644 --- a/examples/pipelines/controlnet-interior-design/pipeline.py +++ b/examples/pipelines/controlnet-interior-design/pipeline.py @@ -17,7 +17,6 @@ generate_prompts_op = ComponentOp( component_dir="components/generate_prompts", arguments={"n_rows_to_load": None}, - output_partition_size="disable", ) laion_retrieval_op = ComponentOp.from_registry( name="prompt_based_laion_retrieval", diff --git a/src/fondant/component_spec.py b/src/fondant/component_spec.py index 581342025..c00b39683 100644 --- a/src/fondant/component_spec.py +++ b/src/fondant/component_spec.py @@ -263,13 +263,6 @@ def from_fondant_component_spec( "type": "String", "default": "None", }, - { - "name": "output_partition_size", - "description": "The size of the output partition size, defaults" - " to 250MB. Set to `disable` to disable the automatic partitioning", - "type": "String", - "default": "None", - }, *( { "name": arg.name, @@ -301,8 +294,6 @@ def from_fondant_component_spec( {"inputValue": "component_spec"}, "--input_partition_rows", {"inputValue": "input_partition_rows"}, - "--output_partition_size", - {"inputValue": "output_partition_size"}, *cls._dump_args(fondant_component.args.values()), "--output_manifest_path", {"outputPath": "output_manifest_path"}, diff --git a/src/fondant/data_io.py b/src/fondant/data_io.py index 64f4f89f8..448fcbf37 100644 --- a/src/fondant/data_io.py +++ b/src/fondant/data_io.py @@ -153,50 +153,12 @@ def __init__( *, manifest: Manifest, component_spec: ComponentSpec, - output_partition_size: t.Optional[t.Union[str]] = None, ): super().__init__(manifest=manifest, component_spec=component_spec) - self.output_partition_size = output_partition_size - - def partition_written_dataframe(self, dataframe: dd.DataFrame) -> dd.DataFrame: - """ - Function that partitions the written dataframe to smaller partitions based on a given - partition size. - """ - if self.output_partition_size != "disable": - if isinstance(self.output_partition_size, str): - dataframe = dataframe.repartition( - partition_size=self.output_partition_size, - ) - logger.info( - f"Repartitioning the written data such that the size per partition is approx." - f" {self.output_partition_size}", - ) - - elif self.output_partition_size is None: - dataframe = dataframe.repartition(partition_size="250MB") - logger.info( - "Repartitioning the written data such that the size per partition is approx." - " 250MB. (Automatic repartitioning)", - ) - else: - msg = ( - f"{self.output_partition_size} is not a valid argument. Choose either the" - f" number of size of the partition (e.g. '250Mb' or set to 'disable' to" - f" disable automated partitioning" - ) - raise ValueError( - msg, - ) - - return dataframe - def write_dataframe(self, dataframe: dd.DataFrame) -> None: write_tasks = [] - dataframe = self.partition_written_dataframe(dataframe) - dataframe.index = dataframe.index.rename("id").astype("string") # Turn index into an empty dataframe so we can write it diff --git a/src/fondant/executor.py b/src/fondant/executor.py index afa7801cb..a434f0c85 100644 --- a/src/fondant/executor.py +++ b/src/fondant/executor.py @@ -25,7 +25,7 @@ from fondant.component_spec import Argument, ComponentSpec, kubeflow2python_type from fondant.data_io import DaskDataLoader, DaskDataWriter from fondant.manifest import Manifest -from fondant.schema import validate_partition_number, validate_partition_size +from fondant.schema import validate_partition_number logger = logging.getLogger(__name__) @@ -42,7 +42,6 @@ def __init__( metadata: t.Dict[str, t.Any], user_arguments: t.Dict[str, t.Any], input_partition_rows: t.Optional[t.Union[str, int]] = None, - output_partition_size: t.Optional[str] = None, ) -> None: self.spec = spec self.input_manifest_path = input_manifest_path @@ -50,7 +49,6 @@ def __init__( self.metadata = metadata self.user_arguments = user_arguments self.input_partition_rows = input_partition_rows - self.output_partition_size = output_partition_size @classmethod def from_args(cls) -> "Executor": @@ -58,7 +56,6 @@ def from_args(cls) -> "Executor": parser = argparse.ArgumentParser() parser.add_argument("--component_spec", type=json.loads) parser.add_argument("--input_partition_rows", type=validate_partition_number) - parser.add_argument("--output_partition_size", type=validate_partition_size) args, _ = parser.parse_known_args() if "component_spec" not in args: @@ -67,12 +64,10 @@ def from_args(cls) -> "Executor": component_spec = ComponentSpec(args.component_spec) input_partition_rows = args.input_partition_rows - output_partition_size = args.output_partition_size return cls.from_spec( component_spec, input_partition_rows, - output_partition_size, ) @classmethod @@ -80,7 +75,6 @@ def from_spec( cls, component_spec: ComponentSpec, input_partition_rows: t.Optional[t.Union[str, int]], - output_partition_size: t.Optional[str], ) -> "Executor": """Create an executor from a component spec.""" args_dict = vars(cls._add_and_parse_args(component_spec)) @@ -91,9 +85,6 @@ def from_spec( if "input_partition_rows" in args_dict: args_dict.pop("input_partition_rows") - if "output_partition_size" in args_dict: - args_dict.pop("output_partition_size") - input_manifest_path = args_dict.pop("input_manifest_path") output_manifest_path = args_dict.pop("output_manifest_path") metadata = args_dict.pop("metadata") @@ -106,7 +97,6 @@ def from_spec( metadata=metadata, user_arguments=args_dict, input_partition_rows=input_partition_rows, - output_partition_size=output_partition_size, ) @classmethod @@ -182,7 +172,6 @@ def _write_data(self, dataframe: dd.DataFrame, *, manifest: Manifest): data_writer = DaskDataWriter( manifest=manifest, component_spec=self.spec, - output_partition_size=self.output_partition_size, ) data_writer.write_dataframe(dataframe) diff --git a/src/fondant/pipeline.py b/src/fondant/pipeline.py index 2b2a3dcd0..4ec60972d 100644 --- a/src/fondant/pipeline.py +++ b/src/fondant/pipeline.py @@ -14,7 +14,7 @@ from fondant.exceptions import InvalidPipelineDefinition from fondant.import_utils import is_kfp_available from fondant.manifest import Manifest -from fondant.schema import validate_partition_number, validate_partition_size +from fondant.schema import validate_partition_number if is_kfp_available(): from kubernetes import client as k8s_client @@ -32,8 +32,6 @@ class ComponentOp: arguments: A dictionary containing the argument name and value for the operation. input_partition_rows: The number of rows to load per partition. Set to override the automatic partitioning - output_partition_size: the size of the output written dataset. Defaults to 250MB, - set to "disable" to disable automatic repartitioning of the output, number_of_gpus: The number of gpus to assign to the operation node_pool_label: The label of the node pool to which the operation will be assigned. node_pool_name: The name of the node pool to which the operation will be assigned. @@ -61,7 +59,6 @@ def __init__( *, arguments: t.Optional[t.Dict[str, t.Any]] = None, input_partition_rows: t.Optional[t.Union[str, int]] = None, - output_partition_size: t.Optional[str] = None, number_of_gpus: t.Optional[int] = None, node_pool_label: t.Optional[str] = None, node_pool_name: t.Optional[str] = None, @@ -70,7 +67,6 @@ def __init__( ) -> None: self.component_dir = Path(component_dir) self.input_partition_rows = input_partition_rows - self.output_partitioning_size = output_partition_size self.arguments = self._set_arguments(arguments) self.component_spec = ComponentSpec.from_file( @@ -96,10 +92,8 @@ def _set_arguments( arguments = arguments or {} input_partition_rows = validate_partition_number(self.input_partition_rows) - output_partition_size = validate_partition_size(self.output_partitioning_size) arguments["input_partition_rows"] = str(input_partition_rows) - arguments["output_partition_size"] = str(output_partition_size) return arguments @@ -128,7 +122,6 @@ def from_registry( *, arguments: t.Optional[t.Dict[str, t.Any]] = None, input_partition_rows: t.Optional[t.Union[int, str]] = None, - output_partition_size: t.Optional[str] = None, number_of_gpus: t.Optional[int] = None, node_pool_label: t.Optional[str] = None, node_pool_name: t.Optional[str] = None, @@ -142,8 +135,6 @@ def from_registry( arguments: A dictionary containing the argument name and value for the operation. input_partition_rows: The number of rows to load per partition. Set to override the automatic partitioning - output_partition_size: the size of the output written dataset. Defaults to 250MB, - set to "disable" to disable automatic repartitioning of the output, number_of_gpus: The number of gpus to assign to the operation node_pool_label: The label of the node pool to which the operation will be assigned. node_pool_name: The name of the node pool to which the operation will be assigned. @@ -163,7 +154,6 @@ def from_registry( components_dir, arguments=arguments, input_partition_rows=input_partition_rows, - output_partition_size=output_partition_size, number_of_gpus=number_of_gpus, node_pool_label=node_pool_label, node_pool_name=node_pool_name, diff --git a/tests/example_pipelines/compiled_pipeline/example_1/docker-compose.yml b/tests/example_pipelines/compiled_pipeline/example_1/docker-compose.yml index cae66f6fb..b7d31f3b2 100644 --- a/tests/example_pipelines/compiled_pipeline/example_1/docker-compose.yml +++ b/tests/example_pipelines/compiled_pipeline/example_1/docker-compose.yml @@ -13,8 +13,6 @@ services: - a dummy string arg - --input_partition_rows - disable - - --output_partition_size - - disable - --component_spec - '{"name": "First component", "description": "This is an example component", "image": "example_component:latest", "produces": {"images": {"fields": {"data": @@ -35,8 +33,6 @@ services: - a dummy string arg - --input_partition_rows - '10' - - --output_partition_size - - 30MB - --component_spec - '{"name": "Second component", "description": "This is an example component", "image": "example_component:latest", "consumes": {"images": {"fields": {"data": @@ -64,8 +60,6 @@ services: - '[1, 2, 3]' - --input_partition_rows - None - - --output_partition_size - - None - --component_spec - '{"name": "Third component", "description": "This is an example component", "image": "example_component:latest", "consumes": {"images": {"fields": {"data": @@ -81,4 +75,4 @@ services: second_component: condition: service_completed_successfully volumes: [] -version: '3.8' \ No newline at end of file +version: '3.8' diff --git a/tests/example_pipelines/compiled_pipeline/example_1/kubeflow_pipeline.yml b/tests/example_pipelines/compiled_pipeline/example_1/kubeflow_pipeline.yml index f8127c8e6..d34a20c9e 100644 --- a/tests/example_pipelines/compiled_pipeline/example_1/kubeflow_pipeline.yml +++ b/tests/example_pipelines/compiled_pipeline/example_1/kubeflow_pipeline.yml @@ -1,44 +1,62 @@ apiVersion: argoproj.io/v1alpha1 kind: Workflow metadata: + annotations: + pipelines.kubeflow.org/kfp_sdk_version: 1.8.22 + pipelines.kubeflow.org/pipeline_compilation_time: '2023-01-01T00:00:00' + pipelines.kubeflow.org/pipeline_spec: '{"name": "Kfp pipeline"}' generateName: kfp-pipeline- - annotations: {pipelines.kubeflow.org/kfp_sdk_version: 1.8.22, pipelines.kubeflow.org/pipeline_compilation_time: '2023-01-01T00:00:00', - pipelines.kubeflow.org/pipeline_spec: '{"name": "Kfp pipeline"}'} - labels: {pipelines.kubeflow.org/kfp_sdk_version: 1.8.22} + labels: + pipelines.kubeflow.org/kfp_sdk_version: 1.8.22 spec: + arguments: + parameters: [] entrypoint: kfp-pipeline + serviceAccountName: pipeline-runner templates: - - name: first-component - container: + - container: args: [] - command: [python3, main.py, --input_manifest_path, /tmp/inputs/input_manifest_path/data, - --metadata, '{"base_path": "/foo/bar", "run_id": "{{workflow.name}}"}', --component_spec, - '{"args": {"storage_args": {"description": "Storage arguments", "type": "str"}}, - "description": "This is an example component", "image": "example_component:latest", - "name": "First component", "produces": {"captions": {"fields": {"data": - {"type": "string"}}}, "images": {"fields": {"data": {"type": "binary"}}}}}', - --input_partition_rows, disable, --output_partition_size, disable, --storage_args, - a dummy string arg, --output_manifest_path, /tmp/outputs/output_manifest_path/data] + command: + - python3 + - main.py + - --input_manifest_path + - /tmp/inputs/input_manifest_path/data + - --metadata + - '{"base_path": "/foo/bar", "run_id": "{{workflow.name}}"}' + - --component_spec + - '{"args": {"storage_args": {"description": "Storage arguments", "type": "str"}}, + "description": "This is an example component", "image": "example_component:latest", + "name": "First component", "produces": {"captions": {"fields": {"data": {"type": + "string"}}}, "images": {"fields": {"data": {"type": "binary"}}}}}' + - --input_partition_rows + - disable + - --storage_args + - a dummy string arg + - --output_manifest_path + - /tmp/outputs/output_manifest_path/data image: example_component:latest inputs: artifacts: - name: input_manifest_path path: /tmp/inputs/input_manifest_path/data - raw: {data: ''} - outputs: - artifacts: - - {name: first-component-output_manifest_path, path: /tmp/outputs/output_manifest_path/data} + raw: + data: '' metadata: - labels: - pipelines.kubeflow.org/kfp_sdk_version: 1.8.22 - pipelines.kubeflow.org/pipeline-sdk-type: kfp - pipelines.kubeflow.org/enable_caching: "true" - annotations: {pipelines.kubeflow.org/component_spec: '{"description": "This - is an example component", "implementation": {"container": {"command": ["python3", - "main.py", "--input_manifest_path", {"inputPath": "input_manifest_path"}, - "--metadata", {"inputValue": "metadata"}, "--component_spec", {"inputValue": - "component_spec"}, "--input_partition_rows", {"inputValue": "input_partition_rows"}, - "--output_partition_size", {"inputValue": "output_partition_size"}, "--storage_args", + annotations: + pipelines.kubeflow.org/arguments.parameters: '{"component_spec": "{\"args\": + {\"storage_args\": {\"description\": \"Storage arguments\", \"type\": \"str\"}}, + \"description\": \"This is an example component\", \"image\": \"example_component:latest\", + \"name\": \"First component\", \"produces\": {\"captions\": {\"fields\": + {\"data\": {\"type\": \"string\"}}}, \"images\": {\"fields\": {\"data\": + {\"type\": \"binary\"}}}}}", "input_partition_rows": "disable", "metadata": + "{\"base_path\": \"/foo/bar\", \"run_id\": \"{{workflow.name}}\"}", "storage_args": + "a dummy string arg"}' + pipelines.kubeflow.org/component_ref: '{"digest": "2a304ce49a15404ba50dfd8b56ec43fa8ac8c29f80579d1c8fb974d3f1a5c87f"}' + pipelines.kubeflow.org/component_spec: '{"description": "This is an example + component", "implementation": {"container": {"command": ["python3", "main.py", + "--input_manifest_path", {"inputPath": "input_manifest_path"}, "--metadata", + {"inputValue": "metadata"}, "--component_spec", {"inputValue": "component_spec"}, + "--input_partition_rows", {"inputValue": "input_partition_rows"}, "--storage_args", {"inputValue": "storage_args"}, "--output_manifest_path", {"outputPath": "output_manifest_path"}], "image": "example_component:latest"}}, "inputs": [{"description": "Path to the input manifest", "name": "input_manifest_path", @@ -47,67 +65,82 @@ spec: "description": "The component specification as a dictionary", "name": "component_spec", "type": "JsonObject"}, {"default": "None", "description": "The number of rows to load per partition. Set to override the automatic partitioning", - "name": "input_partition_rows", "type": "String"}, {"default": "None", "description": - "The size of the output partition size, defaults to 250MB. Set to `disable` - to disable the automatic partitioning", "name": "output_partition_size", - "type": "String"}, {"description": "Storage arguments", "name": "storage_args", - "type": "String"}], "name": "First component", "outputs": [{"description": - "Path to the output manifest", "name": "output_manifest_path", "type": "String"}]}', - pipelines.kubeflow.org/component_ref: '{"digest": "561ddfe38aa8378f4ea92b26ef6bdeb53b1e9b2fc3c0908800738c304fdca30a"}', - pipelines.kubeflow.org/arguments.parameters: '{"component_spec": "{\"args\": - {\"storage_args\": {\"description\": \"Storage arguments\", \"type\": \"str\"}}, - \"description\": \"This is an example component\", \"image\": \"example_component:latest\", - \"name\": \"First component\", \"produces\": {\"captions\": {\"fields\": - {\"data\": {\"type\": \"string\"}}}, \"images\": {\"fields\": {\"data\": - {\"type\": \"binary\"}}}}}", "input_partition_rows": "disable", "metadata": - "{\"base_path\": \"/foo/bar\", \"run_id\": \"{{workflow.name}}\"}", "output_partition_size": - "disable", "storage_args": "a dummy string arg"}'} - - name: kfp-pipeline - dag: + "name": "input_partition_rows", "type": "String"}, {"description": "Storage + arguments", "name": "storage_args", "type": "String"}], "name": "First component", + "outputs": [{"description": "Path to the output manifest", "name": "output_manifest_path", + "type": "String"}]}' + labels: + pipelines.kubeflow.org/enable_caching: 'true' + pipelines.kubeflow.org/kfp_sdk_version: 1.8.22 + pipelines.kubeflow.org/pipeline-sdk-type: kfp + name: first-component + outputs: + artifacts: + - name: first-component-output_manifest_path + path: /tmp/outputs/output_manifest_path/data + - dag: tasks: - - {name: first-component, template: first-component} - - name: second-component + - name: first-component + template: first-component + - arguments: + artifacts: + - from: '{{tasks.first-component.outputs.artifacts.first-component-output_manifest_path}}' + name: first-component-output_manifest_path + dependencies: + - first-component + name: second-component template: second-component - dependencies: [first-component] - arguments: + - arguments: artifacts: - - {name: first-component-output_manifest_path, from: '{{tasks.first-component.outputs.artifacts.first-component-output_manifest_path}}'} - - name: third-component + - from: '{{tasks.second-component.outputs.artifacts.second-component-output_manifest_path}}' + name: second-component-output_manifest_path + dependencies: + - second-component + name: third-component template: third-component - dependencies: [second-component] - arguments: - artifacts: - - {name: second-component-output_manifest_path, from: '{{tasks.second-component.outputs.artifacts.second-component-output_manifest_path}}'} - - name: second-component - container: + name: kfp-pipeline + - container: args: [] - command: [python3, main.py, --input_manifest_path, /tmp/inputs/input_manifest_path/data, - --metadata, '{"base_path": "/foo/bar", "run_id": "{{workflow.name}}"}', --component_spec, - '{"args": {"storage_args": {"description": "Storage arguments", "type": "str"}}, - "consumes": {"images": {"fields": {"data": {"type": "binary"}}}}, "description": - "This is an example component", "image": "example_component:latest", "name": - "Second component", "produces": {"embeddings": {"fields": {"data": {"items": - {"type": "float32"}, "type": "array"}}}}}', --input_partition_rows, '10', - --output_partition_size, 30MB, --storage_args, a dummy string arg, --output_manifest_path, - /tmp/outputs/output_manifest_path/data] + command: + - python3 + - main.py + - --input_manifest_path + - /tmp/inputs/input_manifest_path/data + - --metadata + - '{"base_path": "/foo/bar", "run_id": "{{workflow.name}}"}' + - --component_spec + - '{"args": {"storage_args": {"description": "Storage arguments", "type": "str"}}, + "consumes": {"images": {"fields": {"data": {"type": "binary"}}}}, "description": + "This is an example component", "image": "example_component:latest", "name": + "Second component", "produces": {"embeddings": {"fields": {"data": {"items": + {"type": "float32"}, "type": "array"}}}}}' + - --input_partition_rows + - '10' + - --storage_args + - a dummy string arg + - --output_manifest_path + - /tmp/outputs/output_manifest_path/data image: example_component:latest inputs: artifacts: - - {name: first-component-output_manifest_path, path: /tmp/inputs/input_manifest_path/data} - outputs: - artifacts: - - {name: second-component-output_manifest_path, path: /tmp/outputs/output_manifest_path/data} + - name: first-component-output_manifest_path + path: /tmp/inputs/input_manifest_path/data metadata: - labels: - pipelines.kubeflow.org/kfp_sdk_version: 1.8.22 - pipelines.kubeflow.org/pipeline-sdk-type: kfp - pipelines.kubeflow.org/enable_caching: "true" - annotations: {pipelines.kubeflow.org/component_spec: '{"description": "This - is an example component", "implementation": {"container": {"command": ["python3", - "main.py", "--input_manifest_path", {"inputPath": "input_manifest_path"}, - "--metadata", {"inputValue": "metadata"}, "--component_spec", {"inputValue": - "component_spec"}, "--input_partition_rows", {"inputValue": "input_partition_rows"}, - "--output_partition_size", {"inputValue": "output_partition_size"}, "--storage_args", + annotations: + pipelines.kubeflow.org/arguments.parameters: '{"component_spec": "{\"args\": + {\"storage_args\": {\"description\": \"Storage arguments\", \"type\": \"str\"}}, + \"consumes\": {\"images\": {\"fields\": {\"data\": {\"type\": \"binary\"}}}}, + \"description\": \"This is an example component\", \"image\": \"example_component:latest\", + \"name\": \"Second component\", \"produces\": {\"embeddings\": {\"fields\": + {\"data\": {\"items\": {\"type\": \"float32\"}, \"type\": \"array\"}}}}}", + "input_partition_rows": "10", "metadata": "{\"base_path\": \"/foo/bar\", + \"run_id\": \"{{workflow.name}}\"}", "storage_args": "a dummy string arg"}' + pipelines.kubeflow.org/component_ref: '{"digest": "a02b0189397a2d9318982201f020dbbbe3962427ed150fe58cc69ff508cc68bb"}' + pipelines.kubeflow.org/component_spec: '{"description": "This is an example + component", "implementation": {"container": {"command": ["python3", "main.py", + "--input_manifest_path", {"inputPath": "input_manifest_path"}, "--metadata", + {"inputValue": "metadata"}, "--component_spec", {"inputValue": "component_spec"}, + "--input_partition_rows", {"inputValue": "input_partition_rows"}, "--storage_args", {"inputValue": "storage_args"}, "--output_manifest_path", {"outputPath": "output_manifest_path"}], "image": "example_component:latest"}}, "inputs": [{"description": "Path to the input manifest", "name": "input_manifest_path", @@ -116,55 +149,70 @@ spec: "description": "The component specification as a dictionary", "name": "component_spec", "type": "JsonObject"}, {"default": "None", "description": "The number of rows to load per partition. Set to override the automatic partitioning", - "name": "input_partition_rows", "type": "String"}, {"default": "None", "description": - "The size of the output partition size, defaults to 250MB. Set to `disable` - to disable the automatic partitioning", "name": "output_partition_size", - "type": "String"}, {"description": "Storage arguments", "name": "storage_args", - "type": "String"}], "name": "Second component", "outputs": [{"description": - "Path to the output manifest", "name": "output_manifest_path", "type": "String"}]}', - pipelines.kubeflow.org/component_ref: '{"digest": "b20d3957f48cd2540e594e8c9f2f1f67f5a299152522c61a71f697f5e40278c7"}', - pipelines.kubeflow.org/arguments.parameters: '{"component_spec": "{\"args\": - {\"storage_args\": {\"description\": \"Storage arguments\", \"type\": \"str\"}}, - \"consumes\": {\"images\": {\"fields\": {\"data\": {\"type\": \"binary\"}}}}, - \"description\": \"This is an example component\", \"image\": \"example_component:latest\", - \"name\": \"Second component\", \"produces\": {\"embeddings\": {\"fields\": - {\"data\": {\"items\": {\"type\": \"float32\"}, \"type\": \"array\"}}}}}", - "input_partition_rows": "10", "metadata": "{\"base_path\": \"/foo/bar\", - \"run_id\": \"{{workflow.name}}\"}", "output_partition_size": "30MB", "storage_args": - "a dummy string arg"}'} - - name: third-component - container: + "name": "input_partition_rows", "type": "String"}, {"description": "Storage + arguments", "name": "storage_args", "type": "String"}], "name": "Second + component", "outputs": [{"description": "Path to the output manifest", "name": + "output_manifest_path", "type": "String"}]}' + labels: + pipelines.kubeflow.org/enable_caching: 'true' + pipelines.kubeflow.org/kfp_sdk_version: 1.8.22 + pipelines.kubeflow.org/pipeline-sdk-type: kfp + name: second-component + outputs: + artifacts: + - name: second-component-output_manifest_path + path: /tmp/outputs/output_manifest_path/data + - container: args: [] - command: [python3, main.py, --input_manifest_path, /tmp/inputs/input_manifest_path/data, - --metadata, '{"base_path": "/foo/bar", "run_id": "{{workflow.name}}"}', --component_spec, - '{"args": {"some_list": {"description": "Some list", "items": {"type": "int"}, - "type": "list"}, "storage_args": {"description": "Storage arguments", "type": - "str"}}, "consumes": {"captions": {"fields": {"data": {"type": "string"}}}, - "embeddings": {"fields": {"data": {"items": {"type": "float32"}, "type": - "array"}}}, "images": {"fields": {"data": {"type": "binary"}}}}, "description": - "This is an example component", "image": "example_component:latest", "name": - "Third component", "produces": {"additionalSubsets": false, "images": {"fields": - {"data": {"type": "binary"}}}}}', --input_partition_rows, None, --output_partition_size, - None, --storage_args, a dummy string arg, --some_list, '[1, 2, 3]', --output_manifest_path, - /tmp/outputs/output_manifest_path/data] + command: + - python3 + - main.py + - --input_manifest_path + - /tmp/inputs/input_manifest_path/data + - --metadata + - '{"base_path": "/foo/bar", "run_id": "{{workflow.name}}"}' + - --component_spec + - '{"args": {"some_list": {"description": "Some list", "items": {"type": "int"}, + "type": "list"}, "storage_args": {"description": "Storage arguments", "type": + "str"}}, "consumes": {"captions": {"fields": {"data": {"type": "string"}}}, + "embeddings": {"fields": {"data": {"items": {"type": "float32"}, "type": "array"}}}, + "images": {"fields": {"data": {"type": "binary"}}}}, "description": "This + is an example component", "image": "example_component:latest", "name": "Third + component", "produces": {"additionalSubsets": false, "images": {"fields": + {"data": {"type": "binary"}}}}}' + - --input_partition_rows + - None + - --storage_args + - a dummy string arg + - --some_list + - '[1, 2, 3]' + - --output_manifest_path + - /tmp/outputs/output_manifest_path/data image: example_component:latest inputs: artifacts: - - {name: second-component-output_manifest_path, path: /tmp/inputs/input_manifest_path/data} - outputs: - artifacts: - - {name: third-component-output_manifest_path, path: /tmp/outputs/output_manifest_path/data} + - name: second-component-output_manifest_path + path: /tmp/inputs/input_manifest_path/data metadata: - labels: - pipelines.kubeflow.org/kfp_sdk_version: 1.8.22 - pipelines.kubeflow.org/pipeline-sdk-type: kfp - pipelines.kubeflow.org/enable_caching: "true" - annotations: {pipelines.kubeflow.org/component_spec: '{"description": "This - is an example component", "implementation": {"container": {"command": ["python3", - "main.py", "--input_manifest_path", {"inputPath": "input_manifest_path"}, - "--metadata", {"inputValue": "metadata"}, "--component_spec", {"inputValue": - "component_spec"}, "--input_partition_rows", {"inputValue": "input_partition_rows"}, - "--output_partition_size", {"inputValue": "output_partition_size"}, "--storage_args", + annotations: + pipelines.kubeflow.org/arguments.parameters: '{"component_spec": "{\"args\": + {\"some_list\": {\"description\": \"Some list\", \"items\": {\"type\": \"int\"}, + \"type\": \"list\"}, \"storage_args\": {\"description\": \"Storage arguments\", + \"type\": \"str\"}}, \"consumes\": {\"captions\": {\"fields\": {\"data\": + {\"type\": \"string\"}}}, \"embeddings\": {\"fields\": {\"data\": {\"items\": + {\"type\": \"float32\"}, \"type\": \"array\"}}}, \"images\": {\"fields\": + {\"data\": {\"type\": \"binary\"}}}}, \"description\": \"This is an example + component\", \"image\": \"example_component:latest\", \"name\": \"Third + component\", \"produces\": {\"additionalSubsets\": false, \"images\": {\"fields\": + {\"data\": {\"type\": \"binary\"}}}}}", "input_partition_rows": "None", + "metadata": "{\"base_path\": \"/foo/bar\", \"run_id\": \"{{workflow.name}}\"}", + "some_list": "[1, 2, 3]", "storage_args": "a dummy string arg"}' + pipelines.kubeflow.org/component_ref: '{"digest": "253932349a663809f2ea6fcf63ebd58f963881c6960435269d3fbe3eb17dcf53"}' + pipelines.kubeflow.org/component_spec: '{"description": "This is an example + component", "implementation": {"container": {"command": ["python3", "main.py", + "--input_manifest_path", {"inputPath": "input_manifest_path"}, "--metadata", + {"inputValue": "metadata"}, "--component_spec", {"inputValue": "component_spec"}, + "--input_partition_rows", {"inputValue": "input_partition_rows"}, "--storage_args", {"inputValue": "storage_args"}, "--some_list", {"inputValue": "some_list"}, "--output_manifest_path", {"outputPath": "output_manifest_path"}], "image": "example_component:latest"}}, "inputs": [{"description": "Path to the input @@ -174,26 +222,16 @@ spec: as a dictionary", "name": "component_spec", "type": "JsonObject"}, {"default": "None", "description": "The number of rows to load per partition. Set to override the automatic partitioning", "name": "input_partition_rows", "type": - "String"}, {"default": "None", "description": "The size of the output partition - size, defaults to 250MB. Set to `disable` to disable the automatic partitioning", - "name": "output_partition_size", "type": "String"}, {"description": "Storage - arguments", "name": "storage_args", "type": "String"}, {"description": "Some - list", "name": "some_list", "type": "JsonArray"}], "name": "Third component", - "outputs": [{"description": "Path to the output manifest", "name": "output_manifest_path", - "type": "String"}]}', pipelines.kubeflow.org/component_ref: '{"digest": - "936f0e13275cc8aab199925252dffe2720a01d94af50e5aa78bf9819ccb4ab27"}', pipelines.kubeflow.org/arguments.parameters: '{"component_spec": - "{\"args\": {\"some_list\": {\"description\": \"Some list\", \"items\": - {\"type\": \"int\"}, \"type\": \"list\"}, \"storage_args\": {\"description\": - \"Storage arguments\", \"type\": \"str\"}}, \"consumes\": {\"captions\": - {\"fields\": {\"data\": {\"type\": \"string\"}}}, \"embeddings\": {\"fields\": - {\"data\": {\"items\": {\"type\": \"float32\"}, \"type\": \"array\"}}}, - \"images\": {\"fields\": {\"data\": {\"type\": \"binary\"}}}}, \"description\": - \"This is an example component\", \"image\": \"example_component:latest\", - \"name\": \"Third component\", \"produces\": {\"additionalSubsets\": false, - \"images\": {\"fields\": {\"data\": {\"type\": \"binary\"}}}}}", "input_partition_rows": - "None", "metadata": "{\"base_path\": \"/foo/bar\", \"run_id\": \"{{workflow.name}}\"}", - "output_partition_size": "None", "some_list": "[1, 2, 3]", "storage_args": - "a dummy string arg"}'} - arguments: - parameters: [] - serviceAccountName: pipeline-runner + "String"}, {"description": "Storage arguments", "name": "storage_args", + "type": "String"}, {"description": "Some list", "name": "some_list", "type": + "JsonArray"}], "name": "Third component", "outputs": [{"description": "Path + to the output manifest", "name": "output_manifest_path", "type": "String"}]}' + labels: + pipelines.kubeflow.org/enable_caching: 'true' + pipelines.kubeflow.org/kfp_sdk_version: 1.8.22 + pipelines.kubeflow.org/pipeline-sdk-type: kfp + name: third-component + outputs: + artifacts: + - name: third-component-output_manifest_path + path: /tmp/outputs/output_manifest_path/data diff --git a/tests/example_pipelines/compiled_pipeline/example_2/docker-compose.yml b/tests/example_pipelines/compiled_pipeline/example_2/docker-compose.yml index df29f07f7..807d281ba 100644 --- a/tests/example_pipelines/compiled_pipeline/example_2/docker-compose.yml +++ b/tests/example_pipelines/compiled_pipeline/example_2/docker-compose.yml @@ -13,8 +13,6 @@ services: - a dummy string arg - --input_partition_rows - None - - --output_partition_size - - None - --component_spec - '{"name": "First component", "description": "This is an example component", "image": "example_component:latest", "produces": {"images": {"fields": {"data": @@ -34,8 +32,6 @@ services: - '0' - --input_partition_rows - None - - --output_partition_size - - None - --component_spec - '{"name": "Image cropping", "description": "Component that removes single-colored borders around images and crops them appropriately", "image": "ghcr.io/ml6team/image_cropping:dev", diff --git a/tests/example_pipelines/compiled_pipeline/example_2/kubeflow_pipeline.yml b/tests/example_pipelines/compiled_pipeline/example_2/kubeflow_pipeline.yml index 4bb853ea2..68ae61bda 100644 --- a/tests/example_pipelines/compiled_pipeline/example_2/kubeflow_pipeline.yml +++ b/tests/example_pipelines/compiled_pipeline/example_2/kubeflow_pipeline.yml @@ -1,44 +1,62 @@ apiVersion: argoproj.io/v1alpha1 kind: Workflow metadata: + annotations: + pipelines.kubeflow.org/kfp_sdk_version: 1.8.22 + pipelines.kubeflow.org/pipeline_compilation_time: '2023-01-01T00:00:00' + pipelines.kubeflow.org/pipeline_spec: '{"name": "Kfp pipeline"}' generateName: kfp-pipeline- - annotations: {pipelines.kubeflow.org/kfp_sdk_version: 1.8.22, pipelines.kubeflow.org/pipeline_compilation_time: '2023-01-01T00:00:00', - pipelines.kubeflow.org/pipeline_spec: '{"name": "Kfp pipeline"}'} - labels: {pipelines.kubeflow.org/kfp_sdk_version: 1.8.22} + labels: + pipelines.kubeflow.org/kfp_sdk_version: 1.8.22 spec: + arguments: + parameters: [] entrypoint: kfp-pipeline + serviceAccountName: pipeline-runner templates: - - name: first-component - container: + - container: args: [] - command: [python3, main.py, --input_manifest_path, /tmp/inputs/input_manifest_path/data, - --metadata, '{"base_path": "/foo/bar", "run_id": "{{workflow.name}}"}', --component_spec, - '{"args": {"storage_args": {"description": "Storage arguments", "type": "str"}}, - "description": "This is an example component", "image": "example_component:latest", - "name": "First component", "produces": {"captions": {"fields": {"data": - {"type": "string"}}}, "images": {"fields": {"data": {"type": "binary"}}}}}', - --input_partition_rows, None, --output_partition_size, None, --storage_args, - a dummy string arg, --output_manifest_path, /tmp/outputs/output_manifest_path/data] + command: + - python3 + - main.py + - --input_manifest_path + - /tmp/inputs/input_manifest_path/data + - --metadata + - '{"base_path": "/foo/bar", "run_id": "{{workflow.name}}"}' + - --component_spec + - '{"args": {"storage_args": {"description": "Storage arguments", "type": "str"}}, + "description": "This is an example component", "image": "example_component:latest", + "name": "First component", "produces": {"captions": {"fields": {"data": {"type": + "string"}}}, "images": {"fields": {"data": {"type": "binary"}}}}}' + - --input_partition_rows + - None + - --storage_args + - a dummy string arg + - --output_manifest_path + - /tmp/outputs/output_manifest_path/data image: example_component:latest inputs: artifacts: - name: input_manifest_path path: /tmp/inputs/input_manifest_path/data - raw: {data: ''} - outputs: - artifacts: - - {name: first-component-output_manifest_path, path: /tmp/outputs/output_manifest_path/data} + raw: + data: '' metadata: - labels: - pipelines.kubeflow.org/kfp_sdk_version: 1.8.22 - pipelines.kubeflow.org/pipeline-sdk-type: kfp - pipelines.kubeflow.org/enable_caching: "true" - annotations: {pipelines.kubeflow.org/component_spec: '{"description": "This - is an example component", "implementation": {"container": {"command": ["python3", - "main.py", "--input_manifest_path", {"inputPath": "input_manifest_path"}, - "--metadata", {"inputValue": "metadata"}, "--component_spec", {"inputValue": - "component_spec"}, "--input_partition_rows", {"inputValue": "input_partition_rows"}, - "--output_partition_size", {"inputValue": "output_partition_size"}, "--storage_args", + annotations: + pipelines.kubeflow.org/arguments.parameters: '{"component_spec": "{\"args\": + {\"storage_args\": {\"description\": \"Storage arguments\", \"type\": \"str\"}}, + \"description\": \"This is an example component\", \"image\": \"example_component:latest\", + \"name\": \"First component\", \"produces\": {\"captions\": {\"fields\": + {\"data\": {\"type\": \"string\"}}}, \"images\": {\"fields\": {\"data\": + {\"type\": \"binary\"}}}}}", "input_partition_rows": "None", "metadata": + "{\"base_path\": \"/foo/bar\", \"run_id\": \"{{workflow.name}}\"}", "storage_args": + "a dummy string arg"}' + pipelines.kubeflow.org/component_ref: '{"digest": "2a304ce49a15404ba50dfd8b56ec43fa8ac8c29f80579d1c8fb974d3f1a5c87f"}' + pipelines.kubeflow.org/component_spec: '{"description": "This is an example + component", "implementation": {"container": {"command": ["python3", "main.py", + "--input_manifest_path", {"inputPath": "input_manifest_path"}, "--metadata", + {"inputValue": "metadata"}, "--component_spec", {"inputValue": "component_spec"}, + "--input_partition_rows", {"inputValue": "input_partition_rows"}, "--storage_args", {"inputValue": "storage_args"}, "--output_manifest_path", {"outputPath": "output_manifest_path"}], "image": "example_component:latest"}}, "inputs": [{"description": "Path to the input manifest", "name": "input_manifest_path", @@ -47,101 +65,110 @@ spec: "description": "The component specification as a dictionary", "name": "component_spec", "type": "JsonObject"}, {"default": "None", "description": "The number of rows to load per partition. Set to override the automatic partitioning", - "name": "input_partition_rows", "type": "String"}, {"default": "None", "description": - "The size of the output partition size, defaults to 250MB. Set to `disable` - to disable the automatic partitioning", "name": "output_partition_size", - "type": "String"}, {"description": "Storage arguments", "name": "storage_args", - "type": "String"}], "name": "First component", "outputs": [{"description": - "Path to the output manifest", "name": "output_manifest_path", "type": "String"}]}', - pipelines.kubeflow.org/component_ref: '{"digest": "561ddfe38aa8378f4ea92b26ef6bdeb53b1e9b2fc3c0908800738c304fdca30a"}', - pipelines.kubeflow.org/arguments.parameters: '{"component_spec": "{\"args\": - {\"storage_args\": {\"description\": \"Storage arguments\", \"type\": \"str\"}}, - \"description\": \"This is an example component\", \"image\": \"example_component:latest\", - \"name\": \"First component\", \"produces\": {\"captions\": {\"fields\": - {\"data\": {\"type\": \"string\"}}}, \"images\": {\"fields\": {\"data\": - {\"type\": \"binary\"}}}}}", "input_partition_rows": "None", "metadata": - "{\"base_path\": \"/foo/bar\", \"run_id\": \"{{workflow.name}}\"}", "output_partition_size": - "None", "storage_args": "a dummy string arg"}'} - - name: image-cropping - container: + "name": "input_partition_rows", "type": "String"}, {"description": "Storage + arguments", "name": "storage_args", "type": "String"}], "name": "First component", + "outputs": [{"description": "Path to the output manifest", "name": "output_manifest_path", + "type": "String"}]}' + labels: + pipelines.kubeflow.org/enable_caching: 'true' + pipelines.kubeflow.org/kfp_sdk_version: 1.8.22 + pipelines.kubeflow.org/pipeline-sdk-type: kfp + name: first-component + outputs: + artifacts: + - name: first-component-output_manifest_path + path: /tmp/outputs/output_manifest_path/data + - container: args: [] - command: [python3, main.py, --input_manifest_path, /tmp/inputs/input_manifest_path/data, - --metadata, '{"base_path": "/foo/bar", "run_id": "{{workflow.name}}"}', --component_spec, - '{"args": {"cropping_threshold": {"default": -30, "description": "Threshold - parameter used for detecting borders. A lower (negative) parameter results - in a more performant border detection, but can cause overcropping. Default - is -30", "type": "int"}, "padding": {"default": 10, "description": "Padding - for the image cropping. The padding is added to all borders of the image.", - "type": "int"}}, "consumes": {"images": {"fields": {"data": {"type": "binary"}}}}, - "description": "Component that removes single-colored borders around images - and crops them appropriately", "image": "ghcr.io/ml6team/image_cropping:dev", - "name": "Image cropping", "produces": {"images": {"fields": {"data": {"type": - "binary"}, "height": {"type": "int32"}, "width": {"type": "int32"}}}}}', - --input_partition_rows, None, --output_partition_size, None, --cropping_threshold, - '0', --padding, '0', --output_manifest_path, /tmp/outputs/output_manifest_path/data] + command: + - python3 + - main.py + - --input_manifest_path + - /tmp/inputs/input_manifest_path/data + - --metadata + - '{"base_path": "/foo/bar", "run_id": "{{workflow.name}}"}' + - --component_spec + - '{"args": {"cropping_threshold": {"default": -30, "description": "Threshold + parameter used for detecting borders. A lower (negative) parameter results + in a more performant border detection, but can cause overcropping. Default + is -30", "type": "int"}, "padding": {"default": 10, "description": "Padding + for the image cropping. The padding is added to all borders of the image.", + "type": "int"}}, "consumes": {"images": {"fields": {"data": {"type": "binary"}}}}, + "description": "Component that removes single-colored borders around images + and crops them appropriately", "image": "ghcr.io/ml6team/image_cropping:dev", + "name": "Image cropping", "produces": {"images": {"fields": {"data": {"type": + "binary"}, "height": {"type": "int32"}, "width": {"type": "int32"}}}}}' + - --input_partition_rows + - None + - --cropping_threshold + - '0' + - --padding + - '0' + - --output_manifest_path + - /tmp/outputs/output_manifest_path/data image: ghcr.io/ml6team/image_cropping:dev inputs: artifacts: - - {name: first-component-output_manifest_path, path: /tmp/inputs/input_manifest_path/data} - outputs: - artifacts: - - {name: image-cropping-output_manifest_path, path: /tmp/outputs/output_manifest_path/data} + - name: first-component-output_manifest_path + path: /tmp/inputs/input_manifest_path/data metadata: - labels: - pipelines.kubeflow.org/kfp_sdk_version: 1.8.22 - pipelines.kubeflow.org/pipeline-sdk-type: kfp - pipelines.kubeflow.org/enable_caching: "true" - annotations: {pipelines.kubeflow.org/component_spec: '{"description": "Component - that removes single-colored borders around images and crops them appropriately", - "implementation": {"container": {"command": ["python3", "main.py", "--input_manifest_path", + annotations: + pipelines.kubeflow.org/arguments.parameters: '{"component_spec": "{\"args\": + {\"cropping_threshold\": {\"default\": -30, \"description\": \"Threshold + parameter used for detecting borders. A lower (negative) parameter results + in a more performant border detection, but can cause overcropping. Default + is -30\", \"type\": \"int\"}, \"padding\": {\"default\": 10, \"description\": + \"Padding for the image cropping. The padding is added to all borders of + the image.\", \"type\": \"int\"}}, \"consumes\": {\"images\": {\"fields\": + {\"data\": {\"type\": \"binary\"}}}}, \"description\": \"Component that + removes single-colored borders around images and crops them appropriately\", + \"image\": \"ghcr.io/ml6team/image_cropping:dev\", \"name\": \"Image cropping\", + \"produces\": {\"images\": {\"fields\": {\"data\": {\"type\": \"binary\"}, + \"height\": {\"type\": \"int32\"}, \"width\": {\"type\": \"int32\"}}}}}", + "cropping_threshold": "0", "input_partition_rows": "None", "metadata": "{\"base_path\": + \"/foo/bar\", \"run_id\": \"{{workflow.name}}\"}", "padding": "0"}' + pipelines.kubeflow.org/component_ref: '{"digest": "e86f02b6b9cc878b6187e44bb3caf9291c3ce42c1939e19b0a97dacdc78a9d72"}' + pipelines.kubeflow.org/component_spec: '{"description": "Component that removes + single-colored borders around images and crops them appropriately", "implementation": + {"container": {"command": ["python3", "main.py", "--input_manifest_path", {"inputPath": "input_manifest_path"}, "--metadata", {"inputValue": "metadata"}, "--component_spec", {"inputValue": "component_spec"}, "--input_partition_rows", - {"inputValue": "input_partition_rows"}, "--output_partition_size", {"inputValue": - "output_partition_size"}, "--cropping_threshold", {"inputValue": "cropping_threshold"}, - "--padding", {"inputValue": "padding"}, "--output_manifest_path", {"outputPath": - "output_manifest_path"}], "image": "ghcr.io/ml6team/image_cropping:dev"}}, + {"inputValue": "input_partition_rows"}, "--cropping_threshold", {"inputValue": + "cropping_threshold"}, "--padding", {"inputValue": "padding"}, "--output_manifest_path", + {"outputPath": "output_manifest_path"}], "image": "ghcr.io/ml6team/image_cropping:dev"}}, "inputs": [{"description": "Path to the input manifest", "name": "input_manifest_path", "type": "String"}, {"description": "Metadata arguments containing the run id and base path", "name": "metadata", "type": "String"}, {"default": "None", "description": "The component specification as a dictionary", "name": "component_spec", "type": "JsonObject"}, {"default": "None", "description": "The number of rows to load per partition. Set to override the automatic partitioning", - "name": "input_partition_rows", "type": "String"}, {"default": "None", "description": - "The size of the output partition size, defaults to 250MB. Set to `disable` - to disable the automatic partitioning", "name": "output_partition_size", - "type": "String"}, {"default": -30, "description": "Threshold parameter - used for detecting borders. A lower (negative) parameter results in a more - performant border detection, but can cause overcropping. Default is -30", - "name": "cropping_threshold", "type": "Integer"}, {"default": 10, "description": - "Padding for the image cropping. The padding is added to all borders of - the image.", "name": "padding", "type": "Integer"}], "name": "Image cropping", - "outputs": [{"description": "Path to the output manifest", "name": "output_manifest_path", - "type": "String"}]}', pipelines.kubeflow.org/component_ref: '{"digest": - "08066cab55f911d084af84cc08a555794a9cf94569fe4b991836a39fd3f76f86"}', pipelines.kubeflow.org/arguments.parameters: '{"component_spec": - "{\"args\": {\"cropping_threshold\": {\"default\": -30, \"description\": - \"Threshold parameter used for detecting borders. A lower (negative) parameter + "name": "input_partition_rows", "type": "String"}, {"default": -30, "description": + "Threshold parameter used for detecting borders. A lower (negative) parameter results in a more performant border detection, but can cause overcropping. - Default is -30\", \"type\": \"int\"}, \"padding\": {\"default\": 10, \"description\": - \"Padding for the image cropping. The padding is added to all borders of - the image.\", \"type\": \"int\"}}, \"consumes\": {\"images\": {\"fields\": - {\"data\": {\"type\": \"binary\"}}}}, \"description\": \"Component that - removes single-colored borders around images and crops them appropriately\", - \"image\": \"ghcr.io/ml6team/image_cropping:dev\", \"name\": \"Image cropping\", - \"produces\": {\"images\": {\"fields\": {\"data\": {\"type\": \"binary\"}, - \"height\": {\"type\": \"int32\"}, \"width\": {\"type\": \"int32\"}}}}}", - "cropping_threshold": "0", "input_partition_rows": "None", "metadata": "{\"base_path\": - \"/foo/bar\", \"run_id\": \"{{workflow.name}}\"}", "output_partition_size": - "None", "padding": "0"}'} - - name: kfp-pipeline - dag: + Default is -30", "name": "cropping_threshold", "type": "Integer"}, {"default": + 10, "description": "Padding for the image cropping. The padding is added + to all borders of the image.", "name": "padding", "type": "Integer"}], "name": + "Image cropping", "outputs": [{"description": "Path to the output manifest", + "name": "output_manifest_path", "type": "String"}]}' + labels: + pipelines.kubeflow.org/enable_caching: 'true' + pipelines.kubeflow.org/kfp_sdk_version: 1.8.22 + pipelines.kubeflow.org/pipeline-sdk-type: kfp + name: image-cropping + outputs: + artifacts: + - name: image-cropping-output_manifest_path + path: /tmp/outputs/output_manifest_path/data + - dag: tasks: - - {name: first-component, template: first-component} - - name: image-cropping - template: image-cropping - dependencies: [first-component] - arguments: + - name: first-component + template: first-component + - arguments: artifacts: - - {name: first-component-output_manifest_path, from: '{{tasks.first-component.outputs.artifacts.first-component-output_manifest_path}}'} - arguments: - parameters: [] - serviceAccountName: pipeline-runner + - from: '{{tasks.first-component.outputs.artifacts.first-component-output_manifest_path}}' + name: first-component-output_manifest_path + dependencies: + - first-component + name: image-cropping + template: image-cropping + name: kfp-pipeline diff --git a/tests/example_pipelines/compiled_pipeline/kubeflow_pipeline.yml b/tests/example_pipelines/compiled_pipeline/kubeflow_pipeline.yml index 907cea309..246809542 100644 --- a/tests/example_pipelines/compiled_pipeline/kubeflow_pipeline.yml +++ b/tests/example_pipelines/compiled_pipeline/kubeflow_pipeline.yml @@ -1,50 +1,70 @@ apiVersion: argoproj.io/v1alpha1 kind: Workflow metadata: + annotations: + pipelines.kubeflow.org/kfp_sdk_version: 1.8.22 + pipelines.kubeflow.org/pipeline_compilation_time: '2023-01-01T00:00:00' + pipelines.kubeflow.org/pipeline_spec: '{"name": "Kfp pipeline"}' generateName: kfp-pipeline- - annotations: {pipelines.kubeflow.org/kfp_sdk_version: 1.8.22, pipelines.kubeflow.org/pipeline_compilation_time: '2023-01-01T00:00:00', - pipelines.kubeflow.org/pipeline_spec: '{"name": "Kfp pipeline"}'} - labels: {pipelines.kubeflow.org/kfp_sdk_version: 1.8.22} + labels: + pipelines.kubeflow.org/kfp_sdk_version: 1.8.22 spec: + arguments: + parameters: [] entrypoint: kfp-pipeline + serviceAccountName: pipeline-runner templates: - - name: first-component - container: + - container: args: [] - command: [python3, main.py, --input_manifest_path, /tmp/inputs/input_manifest_path/data, - --metadata, '{"base_path": "/foo/bar", "run_id": "{{workflow.name}}"}', --component_spec, - '{"args": {"storage_args": {"description": "Storage arguments", "type": "str"}}, - "description": "This is an example component", "image": "example_component:latest", - "name": "First component", "produces": {"captions": {"fields": {"data": - {"type": "string"}}}, "images": {"fields": {"data": {"type": "binary"}}}}}', - --input_partition_rows, None, --output_partition_size, None, --storage_args, - a dummy string arg, --output_manifest_path, /tmp/outputs/output_manifest_path/data] + command: + - python3 + - main.py + - --input_manifest_path + - /tmp/inputs/input_manifest_path/data + - --metadata + - '{"base_path": "/foo/bar", "run_id": "{{workflow.name}}"}' + - --component_spec + - '{"args": {"storage_args": {"description": "Storage arguments", "type": "str"}}, + "description": "This is an example component", "image": "example_component:latest", + "name": "First component", "produces": {"captions": {"fields": {"data": {"type": + "string"}}}, "images": {"fields": {"data": {"type": "binary"}}}}}' + - --input_partition_rows + - None + - --storage_args + - a dummy string arg + - --output_manifest_path + - /tmp/outputs/output_manifest_path/data image: example_component:latest resources: - limits: {nvidia.com/gpu: 1} - requests: {ephemeral-storage: 1Gi} + limits: + nvidia.com/gpu: 1 + requests: + ephemeral-storage: 1Gi volumeMounts: - - {mountPath: /mnt, name: mypvc} + - mountPath: /mnt + name: mypvc inputs: artifacts: - name: input_manifest_path path: /tmp/inputs/input_manifest_path/data - raw: {data: ''} - outputs: - artifacts: - - {name: first-component-output_manifest_path, path: /tmp/outputs/output_manifest_path/data} - nodeSelector: {node_pool: a_node_pool} + raw: + data: '' metadata: - labels: - pipelines.kubeflow.org/kfp_sdk_version: 1.8.22 - pipelines.kubeflow.org/pipeline-sdk-type: kfp - pipelines.kubeflow.org/enable_caching: "true" - annotations: {pipelines.kubeflow.org/component_spec: '{"description": "This - is an example component", "implementation": {"container": {"command": ["python3", - "main.py", "--input_manifest_path", {"inputPath": "input_manifest_path"}, - "--metadata", {"inputValue": "metadata"}, "--component_spec", {"inputValue": - "component_spec"}, "--input_partition_rows", {"inputValue": "input_partition_rows"}, - "--output_partition_size", {"inputValue": "output_partition_size"}, "--storage_args", + annotations: + pipelines.kubeflow.org/arguments.parameters: '{"component_spec": "{\"args\": + {\"storage_args\": {\"description\": \"Storage arguments\", \"type\": \"str\"}}, + \"description\": \"This is an example component\", \"image\": \"example_component:latest\", + \"name\": \"First component\", \"produces\": {\"captions\": {\"fields\": + {\"data\": {\"type\": \"string\"}}}, \"images\": {\"fields\": {\"data\": + {\"type\": \"binary\"}}}}}", "input_partition_rows": "None", "metadata": + "{\"base_path\": \"/foo/bar\", \"run_id\": \"{{workflow.name}}\"}", "storage_args": + "a dummy string arg"}' + pipelines.kubeflow.org/component_ref: '{"digest": "2a304ce49a15404ba50dfd8b56ec43fa8ac8c29f80579d1c8fb974d3f1a5c87f"}' + pipelines.kubeflow.org/component_spec: '{"description": "This is an example + component", "implementation": {"container": {"command": ["python3", "main.py", + "--input_manifest_path", {"inputPath": "input_manifest_path"}, "--metadata", + {"inputValue": "metadata"}, "--component_spec", {"inputValue": "component_spec"}, + "--input_partition_rows", {"inputValue": "input_partition_rows"}, "--storage_args", {"inputValue": "storage_args"}, "--output_manifest_path", {"outputPath": "output_manifest_path"}], "image": "example_component:latest"}}, "inputs": [{"description": "Path to the input manifest", "name": "input_manifest_path", @@ -53,28 +73,26 @@ spec: "description": "The component specification as a dictionary", "name": "component_spec", "type": "JsonObject"}, {"default": "None", "description": "The number of rows to load per partition. Set to override the automatic partitioning", - "name": "input_partition_rows", "type": "String"}, {"default": "None", "description": - "The size of the output partition size, defaults to 250MB. Set to `disable` - to disable the automatic partitioning", "name": "output_partition_size", - "type": "String"}, {"description": "Storage arguments", "name": "storage_args", - "type": "String"}], "name": "First component", "outputs": [{"description": - "Path to the output manifest", "name": "output_manifest_path", "type": "String"}]}', - pipelines.kubeflow.org/component_ref: '{"digest": "561ddfe38aa8378f4ea92b26ef6bdeb53b1e9b2fc3c0908800738c304fdca30a"}', - pipelines.kubeflow.org/arguments.parameters: '{"component_spec": "{\"args\": - {\"storage_args\": {\"description\": \"Storage arguments\", \"type\": \"str\"}}, - \"description\": \"This is an example component\", \"image\": \"example_component:latest\", - \"name\": \"First component\", \"produces\": {\"captions\": {\"fields\": - {\"data\": {\"type\": \"string\"}}}, \"images\": {\"fields\": {\"data\": - {\"type\": \"binary\"}}}}}", "input_partition_rows": "None", "metadata": - "{\"base_path\": \"/foo/bar\", \"run_id\": \"{{workflow.name}}\"}", "output_partition_size": - "None", "storage_args": "a dummy string arg"}'} + "name": "input_partition_rows", "type": "String"}, {"description": "Storage + arguments", "name": "storage_args", "type": "String"}], "name": "First component", + "outputs": [{"description": "Path to the output manifest", "name": "output_manifest_path", + "type": "String"}]}' + labels: + pipelines.kubeflow.org/enable_caching: 'true' + pipelines.kubeflow.org/kfp_sdk_version: 1.8.22 + pipelines.kubeflow.org/pipeline-sdk-type: kfp + name: first-component + nodeSelector: + node_pool: a_node_pool + outputs: + artifacts: + - name: first-component-output_manifest_path + path: /tmp/outputs/output_manifest_path/data volumes: - emptyDir: {} name: mypvc - - name: kfp-pipeline - dag: + - dag: tasks: - - {name: first-component, template: first-component} - arguments: - parameters: [] - serviceAccountName: pipeline-runner + - name: first-component + template: first-component + name: kfp-pipeline diff --git a/tests/example_specs/component_specs/kubeflow_component.yaml b/tests/example_specs/component_specs/kubeflow_component.yaml index aa74a9de0..b63a4b983 100644 --- a/tests/example_specs/component_specs/kubeflow_component.yaml +++ b/tests/example_specs/component_specs/kubeflow_component.yaml @@ -16,9 +16,6 @@ inputs: partitioning type: String default: None -- name: output_partition_size - description: The size of the output partition size, defaults to 250MB. Set to - `disable` to disable the automatic partitioning type: String default: None - name: storage_args @@ -42,8 +39,6 @@ implementation: - inputValue: component_spec - --input_partition_rows - inputValue: input_partition_rows - - --output_partition_size - - inputValue: output_partition_size - --storage_args - inputValue: storage_args - --output_manifest_path diff --git a/tests/test_compiler.py b/tests/test_compiler.py index 553069a42..f69dc6c45 100644 --- a/tests/test_compiler.py +++ b/tests/test_compiler.py @@ -20,13 +20,11 @@ Path(COMPONENTS_PATH / "example_1" / "first_component"), arguments={"storage_args": "a dummy string arg"}, input_partition_rows="disable", - output_partition_size="disable", ), ComponentOp( Path(COMPONENTS_PATH / "example_1" / "second_component"), arguments={"storage_args": "a dummy string arg"}, input_partition_rows="10", - output_partition_size="30MB", ), ComponentOp( Path(COMPONENTS_PATH / "example_1" / "fourth_component"), @@ -197,7 +195,7 @@ def test_kubeflow_compiler(setup_pipeline, tmp_path_factory): with open(output_path) as src, open( VALID_PIPELINE / example_dir / "kubeflow_pipeline.yml", ) as truth: - assert src.read() == truth.read() + assert yaml.safe_load(src) == yaml.safe_load(truth) @pytest.mark.usefixtures("_freeze_time") @@ -227,7 +225,7 @@ def test_kubeflow_configuration(tmp_path_factory): with open(output_path) as src, open( VALID_PIPELINE / "kubeflow_pipeline.yml", ) as truth: - assert src.read() == truth.read() + assert yaml.safe_load(src) == yaml.safe_load(truth) def test_kfp_import(): diff --git a/tests/test_component.py b/tests/test_component.py index d051ccb4f..612e9c7b9 100644 --- a/tests/test_component.py +++ b/tests/test_component.py @@ -88,8 +88,6 @@ def test_component_arguments(): yaml_file_to_json_string(components_path / "arguments/component.yaml"), "--input_partition_rows", "100", - "--output_partition_size", - "100MB", "--override_default_arg", "bar", "--override_default_none_arg", @@ -110,7 +108,6 @@ def _process_dataset(self, manifest: Manifest) -> t.Union[None, dd.DataFrame]: executor = MyExecutor.from_args() expected_partition_row_arg = 100 assert executor.input_partition_rows == expected_partition_row_arg - assert executor.output_partition_size == "100MB" assert executor.user_arguments == { "string_default_arg": "foo", "integer_default_arg": 0, @@ -164,7 +161,6 @@ def load(self): executor = DaskLoadExecutor.from_args() assert executor.input_partition_rows is None - assert executor.output_partition_size is None load = patch_method_class(MyLoadComponent.load) with mock.patch.object(MyLoadComponent, "load", load): executor.execute(MyLoadComponent) @@ -186,8 +182,6 @@ def test_dask_transform_component(): "1", "--input_partition_rows", "disable", - "--output_partition_size", - "disable", "--output_manifest_path", str(components_path / "output_manifest.json"), "--component_spec", @@ -207,7 +201,6 @@ def transform(self, dataframe): executor = DaskTransformExecutor.from_args() assert executor.input_partition_rows == "disable" - assert executor.output_partition_size == "disable" transform = patch_method_class(MyDaskComponent.transform) with mock.patch.object( MyDaskComponent, diff --git a/tests/test_data_io.py b/tests/test_data_io.py index ff2920474..7aef87ba7 100644 --- a/tests/test_data_io.py +++ b/tests/test_data_io.py @@ -101,15 +101,17 @@ def test_write_index(tmp_path_factory, dataframe, manifest, component_spec): data_writer = DaskDataWriter( manifest=manifest, component_spec=component_spec, - output_partition_size="1TB", ) # write out index to temp dir data_writer.write_dataframe(dataframe) + number_workers = os.cpu_count() # read written data and assert dataframe = dd.read_parquet(fn / "index") assert len(dataframe) == NUMBER_OF_TEST_ROWS assert dataframe.index.name == "id" - assert dataframe.npartitions == 1 + assert dataframe.npartitions in list( + range(number_workers - 1, number_workers + 2), + ) def test_write_subsets(tmp_path_factory, dataframe, manifest, component_spec): @@ -168,7 +170,6 @@ def test_write_divisions( data_writer = DaskDataWriter( manifest=manifest, component_spec=component_spec, - output_partition_size="disable", ) data_writer.write_dataframe(dataframe) diff --git a/tests/test_pipeline.py b/tests/test_pipeline.py index b4270e725..9b80c9121 100644 --- a/tests/test_pipeline.py +++ b/tests/test_pipeline.py @@ -3,7 +3,7 @@ import pytest import yaml -from fondant.exceptions import InvalidPipelineDefinition, InvalidTypeSchema +from fondant.exceptions import InvalidPipelineDefinition from fondant.pipeline import ComponentOp, Pipeline valid_pipeline_path = Path(__file__).parent / "example_pipelines/valid_pipeline" @@ -42,29 +42,8 @@ def test_component_op( ComponentOp( Path(components_path / component_names[0]), arguments=component_args, - output_partition_size=None, ) - ComponentOp( - Path(components_path / component_names[0]), - arguments=component_args, - output_partition_size="250MB", - ) - - with pytest.raises(InvalidTypeSchema): - ComponentOp( - Path(components_path / component_names[0]), - arguments=component_args, - output_partition_size="10", - ) - - with pytest.raises(InvalidTypeSchema): - ComponentOp( - Path(components_path / component_names[0]), - arguments=component_args, - output_partition_size="250 MB", - ) - with pytest.raises(InvalidPipelineDefinition): ComponentOp( Path(components_path / component_names[0]), @@ -72,13 +51,6 @@ def test_component_op( node_pool_label="dummy_label", ) - with pytest.raises(InvalidPipelineDefinition): - ComponentOp( - Path(components_path / component_names[0]), - arguments=component_args, - node_pool_name="dummy_name", - ) - @pytest.mark.parametrize( "valid_pipeline_example",