From f9238e2e158025fe954a5e65026cadfd71cabbdc Mon Sep 17 00:00:00 2001 From: Matthias Richter Date: Thu, 21 Mar 2024 11:43:55 +0100 Subject: [PATCH] Implementation new dataset interface (#902) First steps for the implementation of the new dataset interface: - Removed Pipeline class - Added Workspace singleton to hold pipeline name, base_path, etc. .. (shouldn't be the focus of this PR) - Moved `Pipeline.read(..)` to Dataset class --- docs/components/component_spec.md | 12 +- docs/components/lightweight_components.md | 2 +- docs/guides/build_a_simple_pipeline.md | 10 +- docs/guides/implement_custom_components.md | 2 +- docs/pipeline.md | 12 +- examples/sample_pipeline/pipeline.py | 15 +- src/fondant/cli.py | 104 ++++----- src/fondant/core/component_spec.py | 8 +- src/fondant/core/exceptions.py | 2 +- src/fondant/dataset/__init__.py | 2 +- src/fondant/dataset/compiler.py | 91 ++++---- src/fondant/dataset/dataset.py | 180 ++++++++++------ src/fondant/dataset/runner.py | 58 +++-- src/fondant/testing.py | 4 +- tests/core/test_manifest_evolution.py | 4 +- tests/examples/example_modules/dataset.py | 19 ++ .../invalid_double_pipeline.py | 4 - .../invalid_double_workspace.py | 4 + tests/examples/example_modules/pipeline.py | 19 -- tests/pipeline/test_compiler.py | 202 +++++++++++------- tests/pipeline/test_lightweight_component.py | 100 +++++---- tests/pipeline/test_pipeline.py | 147 +++++++------ tests/pipeline/test_runner.py | 48 +++-- tests/test_cli.py | 71 +++--- 24 files changed, 652 insertions(+), 468 deletions(-) create mode 100644 tests/examples/example_modules/dataset.py delete mode 100644 tests/examples/example_modules/invalid_double_pipeline.py create mode 100644 tests/examples/example_modules/invalid_double_workspace.py delete mode 100644 tests/examples/example_modules/pipeline.py diff --git a/docs/components/component_spec.md b/docs/components/component_spec.md index 1a7bd811..ac609259 100644 --- a/docs/components/component_spec.md +++ b/docs/components/component_spec.md @@ -102,7 +102,7 @@ If your dataset has a field called `custom_text` with type `string`, you can map ```python -dataset = pipeline.read(...) +dataset = Dataset.read(...) dataset = dataset.apply( "example_component", consumes={ @@ -127,7 +127,7 @@ so as follows: ```python -dataset = pipeline.read(...) +dataset = Dataset.read(...) dataset = dataset.apply( "example_component", produces={ @@ -165,7 +165,7 @@ in the component specification, so we will need to specify the schema of the fields when defining the components ```python -dataset = pipeline.read( +dataset = Dataset.read( "load_from_csv", arguments={ "dataset_uri": "path/to/dataset.csv", @@ -196,7 +196,7 @@ by the next component. We can either load the `image` field: ```python -dataset = pipeline.read( +dataset = Dataset.read( "load_from_csv", arguments={ "dataset_uri": "path/to/dataset.csv", @@ -219,7 +219,7 @@ or the `embedding` field: ```python -dataset = pipeline.read( +dataset = Dataset.read( "load_from_csv", arguments={ "dataset_uri": "path/to/dataset.csv", @@ -268,7 +268,7 @@ These arguments are passed in when the component is instantiated. If an argument is not explicitly provided, the default value will be used instead if available. 
```python -dataset = pipeline.read( +dataset = Dataset.read( "custom_component", arguments={ "custom_argument": "foo" diff --git a/docs/components/lightweight_components.md b/docs/components/lightweight_components.md index 46710795..6b6cb816 100644 --- a/docs/components/lightweight_components.md +++ b/docs/components/lightweight_components.md @@ -53,7 +53,7 @@ pipeline = Pipeline( base_path="./data", ) -dataset = pipeline.read( +dataset = Dataset.read( ref=CreateData, ) diff --git a/docs/guides/build_a_simple_pipeline.md b/docs/guides/build_a_simple_pipeline.md index c4accf7f..4aab1e5b 100644 --- a/docs/guides/build_a_simple_pipeline.md +++ b/docs/guides/build_a_simple_pipeline.md @@ -44,7 +44,7 @@ pipeline = Pipeline( ??? "View a detailed reference of the options accepted by the `Pipeline` class" - ::: fondant.dataset.Pipeline.__init__ + ::: fondant.dataset.Dataset.__init__ handler: python options: show_source: false @@ -69,13 +69,13 @@ As a first step, we want to read data into our pipeline. In this case, we will l from the HuggingFace Hub. For this, we can use the reusable [load_from_hf_hub](../components/hub.md#load_from_hugging_face_hub#description) component. -We can read data into our pipeline using the `Pipeline.read()` method, which returns a (lazy) +We can read data into our pipeline using the `Dataset.read()` method, which returns a (lazy) `Dataset`. ```python import pyarrow as pa -dataset = pipeline.read( +dataset = Dataset.read( "load_from_hf_hub", arguments={ "dataset_name": "fondant-ai/fondant-cc-25m", @@ -101,9 +101,9 @@ We provide three arguments to the `.read()` method: defined in the component [documentation](../components/hub.md#load_from_hugging_face_hub#inputs_outputs) with `additionalProperties: true` under the produces section. -??? "View a detailed reference of the `Pipeline.read()` method" +??? "View a detailed reference of the `Dataset.read()` method" - ::: fondant.dataset.Pipeline.read + ::: fondant.dataset.Dataset.read handler: python options: show_source: false diff --git a/docs/guides/implement_custom_components.md b/docs/guides/implement_custom_components.md index 59459cbd..a9350524 100644 --- a/docs/guides/implement_custom_components.md +++ b/docs/guides/implement_custom_components.md @@ -33,7 +33,7 @@ pipeline = Pipeline( base_path="./data" ) -dataset = pipeline.read( +dataset = Dataset.read( "load_from_hf_hub", arguments={ "dataset_name": "fondant-ai/fondant-cc-25m", diff --git a/docs/pipeline.md b/docs/pipeline.md index ea091cd2..bf4b6c19 100644 --- a/docs/pipeline.md +++ b/docs/pipeline.md @@ -10,12 +10,12 @@ components and custom components, and chain them together. Start by creating a `pipeline.py` file and adding the following code. ```python -from fondant.pipeline import Pipeline +from fondant.dataset import Dataset + +#dataset = Dataset.read( +# .. +#) -pipeline = Pipeline( - name="my-pipeline", - base_path="./data", -) ``` We identify our pipeline with a name and provide a base path where the pipeline will store its @@ -49,7 +49,7 @@ dataset = Dataset.read( ??? 
"View a detailed reference of the `Dataset.read()` method" - ::: fondant.dataset.Pipeline.read + ::: fondant.dataset.Dataset.read handler: python options: show_source: false diff --git a/examples/sample_pipeline/pipeline.py b/examples/sample_pipeline/pipeline.py index fcc39d2c..3b083e19 100644 --- a/examples/sample_pipeline/pipeline.py +++ b/examples/sample_pipeline/pipeline.py @@ -8,30 +8,29 @@ import pyarrow as pa from fondant.component import PandasTransformComponent -from fondant.pipeline import Pipeline, lightweight_component +from fondant.dataset import Workspace, lightweight_component, Dataset BASE_PATH = Path("./.artifacts").resolve() # Define pipeline -pipeline = Pipeline(name="dummy-pipeline", base_path=str(BASE_PATH)) +workspace = Workspace(name="dummy-pipeline", base_path=str(BASE_PATH)) # Load from hub component load_component_column_mapping = { "text": "text_data", } -dataset = pipeline.read( +dataset = Dataset.read( "load_from_parquet", arguments={ "dataset_uri": "/data/sample.parquet", "column_name_mapping": load_component_column_mapping, }, produces={"text_data": pa.string()}, + workspace=workspace, ) -dataset = dataset.apply( - "./components/dummy_component", -) +dataset = dataset.apply("./components/dummy_component") dataset = dataset.apply( "chunk_text", @@ -63,5 +62,7 @@ def transform(self, dataframe: pd.DataFrame) -> pd.DataFrame: ) dataset.write( - ref="write_to_file", arguments={"path": "/data/export"}, consumes={"text": "text"} + ref="write_to_file", + arguments={"path": "/data/export"}, + consumes={"text": "text"}, ) diff --git a/src/fondant/cli.py b/src/fondant/cli.py index dc0f72c1..7515c32f 100644 --- a/src/fondant/cli.py +++ b/src/fondant/cli.py @@ -29,7 +29,7 @@ from types import ModuleType from fondant.core.schema import CloudCredentialsMount -from fondant.dataset import Pipeline +from fondant.dataset import Dataset, Workspace if t.TYPE_CHECKING: from fondant.component import Component @@ -416,7 +416,7 @@ def compile_local(args): if args.extra_volumes: extra_volumes.extend(args.extra_volumes) - pipeline = pipeline_from_string(args.ref) + pipeline = dataset_from_string(args.ref) compiler = DockerCompiler() compiler.compile( pipeline=pipeline, @@ -430,7 +430,7 @@ def compile_local(args): def compile_kfp(args): from fondant.dataset.compiler import KubeFlowCompiler - pipeline = pipeline_from_string(args.ref) + pipeline = dataset_from_string(args.ref) compiler = KubeFlowCompiler() compiler.compile(pipeline=pipeline, output_path=args.output_path) @@ -438,7 +438,7 @@ def compile_kfp(args): def compile_vertex(args): from fondant.dataset.compiler import VertexCompiler - pipeline = pipeline_from_string(args.ref) + pipeline = dataset_from_string(args.ref) compiler = VertexCompiler() compiler.compile(pipeline=pipeline, output_path=args.output_path) @@ -446,7 +446,7 @@ def compile_vertex(args): def compile_sagemaker(args): from fondant.dataset.compiler import SagemakerCompiler - pipeline = pipeline_from_string(args.ref) + pipeline = dataset_from_string(args.ref) compiler = SagemakerCompiler() compiler.compile( pipeline=pipeline, @@ -607,18 +607,28 @@ def run_local(args): from fondant.dataset.runner import DockerRunner extra_volumes = [] + # use workspace from cli command + # if args.workspace exists + + workspace = getattr(args, "workspace", None) + if workspace is None: + workspace = Workspace( + name="dummy_workspace", + base_path=".artifacts", + ) # TODO: handle in #887 -> retrieve global workspace or init default one if args.extra_volumes: 
extra_volumes.extend(args.extra_volumes) try: - ref = pipeline_from_string(args.ref) + dataset = dataset_from_string(args.ref) except ModuleNotFoundError: - ref = args.ref + dataset = args.ref runner = DockerRunner() runner.run( - input=ref, + dataset=dataset, + workspace=workspace, extra_volumes=extra_volumes, build_args=args.build_arg, auth_provider=args.auth_provider, @@ -632,19 +642,19 @@ def run_kfp(args): msg = "--host argument is required for running on Kubeflow" raise ValueError(msg) try: - ref = pipeline_from_string(args.ref) + ref = dataset_from_string(args.ref) except ModuleNotFoundError: ref = args.ref runner = KubeflowRunner(host=args.host) - runner.run(input=ref) + runner.run(dataset=ref) def run_vertex(args): from fondant.dataset.runner import VertexRunner try: - ref = pipeline_from_string(args.ref) + ref = dataset_from_string(args.ref) except ModuleNotFoundError: ref = args.ref @@ -661,7 +671,7 @@ def run_sagemaker(args): from fondant.dataset.runner import SagemakerRunner try: - ref = pipeline_from_string(args.ref) + ref = dataset_from_string(args.ref) except ModuleNotFoundError: ref = args.ref @@ -715,7 +725,7 @@ class ComponentImportError(Exception): """Error raised when an import string is not valid.""" -class PipelineImportError(Exception): +class DatasetImportError(Exception): """Error raised when an import from module is not valid.""" @@ -761,8 +771,8 @@ def _called_with_wrong_args(f): del tb -def pipeline_from_string(string_ref: str) -> Pipeline: # noqa: PLR0912 - """Get the pipeline from the provided string reference. +def dataset_from_string(string_ref: str) -> Dataset: # noqa: PLR0912 + """Get the workspace from the provided string reference. Inspired by Flask: https://github.com/pallets/flask/blob/d611989/src/flask/cli.py#L112 @@ -776,19 +786,19 @@ def pipeline_from_string(string_ref: str) -> Pipeline: # noqa: PLR0912 The pipeline obtained from the provided string """ if ":" not in string_ref: - return pipeline_from_module(string_ref) + return dataset_from_module(string_ref) - module_str, pipeline_str = string_ref.split(":") + module_str, dataset_str = string_ref.split(":") module = get_module(module_str) # Parse `pipeline_str` as a single expression to determine if it's a valid # attribute name or function call. try: - expr = ast.parse(pipeline_str.strip(), mode="eval").body + expr = ast.parse(dataset_str.strip(), mode="eval").body except SyntaxError: - msg = f"Failed to parse {pipeline_str} as an attribute name or function call." - raise PipelineImportError( + msg = f"Failed to parse {dataset_str} as an attribute name or function call." + raise DatasetImportError( msg, ) from None @@ -799,8 +809,8 @@ def pipeline_from_string(string_ref: str) -> Pipeline: # noqa: PLR0912 elif isinstance(expr, ast.Call): # Ensure the function name is an attribute name only. if not isinstance(expr.func, ast.Name): - msg = f"Function reference must be a simple name: {pipeline_str}." - raise PipelineImportError( + msg = f"Function reference must be a simple name: {dataset_str}." + raise DatasetImportError( msg, ) @@ -813,13 +823,13 @@ def pipeline_from_string(string_ref: str) -> Pipeline: # noqa: PLR0912 except ValueError: # literal_eval gives cryptic error messages, show a generic # message with the full expression instead. - msg = f"Failed to parse arguments as literal values: {pipeline_str}." - raise PipelineImportError( + msg = f"Failed to parse arguments as literal values: {dataset_str}." 
+ raise DatasetImportError( msg, ) from None else: - msg = f"Failed to parse {pipeline_str} as an attribute name or function call." - raise PipelineImportError( + msg = f"Failed to parse {dataset_str} as an attribute name or function call." + raise DatasetImportError( msg, ) @@ -827,7 +837,7 @@ def pipeline_from_string(string_ref: str) -> Pipeline: # noqa: PLR0912 attr = getattr(module, name) except AttributeError as e: msg = f"Failed to find attribute {name} in {module.__name__}." - raise PipelineImportError( + raise DatasetImportError( msg, ) from e @@ -840,47 +850,45 @@ def pipeline_from_string(string_ref: str) -> Pipeline: # noqa: PLR0912 if not _called_with_wrong_args(attr): raise - msg = f"The factory {pipeline_str} in module {module.__name__} could not be called with the specified arguments." - raise PipelineImportError( + msg = f"The factory {dataset_str} in module {module.__name__} could not be called with the specified arguments." + raise DatasetImportError( msg, ) from e else: app = attr - if isinstance(app, Pipeline): + if isinstance(app, Dataset): return app - msg = f"A valid Fondant pipeline was not obtained from '{module.__name__}:{pipeline_str}'." - raise PipelineImportError( + msg = f"A valid Fondant workspace was not obtained from '{module.__name__}:{dataset_str}'." + raise DatasetImportError( msg, ) -def pipeline_from_module(module_str: str) -> Pipeline: - """Try to import a pipeline from a string otherwise raise an ImportFromStringError.""" - from fondant.dataset import Pipeline +def dataset_from_module(module_str: str) -> Dataset: + """Try to import a dataset from a string otherwise raise an ImportFromStringError.""" + from fondant.dataset import Dataset module = get_module(module_str) - pipeline_instances = [ - obj for obj in module.__dict__.values() if isinstance(obj, Pipeline) + dataset_instances = [ + obj for obj in module.__dict__.values() if isinstance(obj, Dataset) ] - if not pipeline_instances: - msg = f"No pipeline found in module {module_str}" - raise PipelineImportError(msg) + if not dataset_instances: + msg = f"No dataset found in module {module_str}" + raise DatasetImportError(msg) - if len(pipeline_instances) > 1: + # Skip this one and choose the first dataset instance? + if len(dataset_instances) > 1: msg = ( - f"Found multiple instantiated pipelines in {module_str}. Only one pipeline " - f"can be present" + f"Found multiple instantiated datasets in {module_str}. Use the first dataset to start " + f"the execution." 
) - raise PipelineImportError(msg) - - pipeline = pipeline_instances[0] - logger.info(f"Pipeline `{pipeline.name}` found in module {module_str}") + logger.info(msg) - return pipeline + return dataset_instances[0] def component_from_module(module_str: str) -> t.Type["Component"]: diff --git a/src/fondant/core/component_spec.py b/src/fondant/core/component_spec.py index ff31c89c..c9647795 100644 --- a/src/fondant/core/component_spec.py +++ b/src/fondant/core/component_spec.py @@ -15,7 +15,7 @@ from referencing import Registry, Resource from referencing.jsonschema import DRAFT4 -from fondant.core.exceptions import InvalidComponentSpec, InvalidPipelineDefinition +from fondant.core.exceptions import InvalidComponentSpec, InvalidWorkspaceDefinition from fondant.core.schema import Field, Type @@ -411,7 +411,7 @@ def _validate_mappings(self) -> None: for key, value in mapping.items(): if not isinstance(value, (str, pa.DataType)): msg = f"Unexpected type {type(value)} received for key {key} in {name} mapping" - raise InvalidPipelineDefinition(msg) + raise InvalidWorkspaceDefinition(msg) def _dataset_schema_to_operation_schema(self, name: str) -> t.Mapping[str, Field]: """Calculate the operations schema based on dataset schema. @@ -450,7 +450,7 @@ def _dataset_schema_to_operation_schema(self, name: str) -> t.Mapping[str, Field f"already defined in the `{name}` section of the component spec " f"with type {spec_type}" ) - raise InvalidPipelineDefinition(msg) + raise InvalidWorkspaceDefinition(msg) return types.MappingProxyType(mapping) @@ -492,7 +492,7 @@ def _operation_schema_to_dataset_schema(self, name: str) -> t.Mapping[str, Field f"argument passed to the operation, but `{operations_column_name}` is not " f"defined in the `{name}` section of the component spec." 
) - raise InvalidPipelineDefinition(msg) + raise InvalidWorkspaceDefinition(msg) return types.MappingProxyType(mapping) diff --git a/src/fondant/core/exceptions.py b/src/fondant/core/exceptions.py index 66fa7f87..fb1a3ff1 100644 --- a/src/fondant/core/exceptions.py +++ b/src/fondant/core/exceptions.py @@ -15,7 +15,7 @@ class InvalidComponentSpec(ValidationError, FondantException): """Thrown when a component spec cannot be validated against the schema.""" -class InvalidPipelineDefinition(ValidationError, FondantException): +class InvalidWorkspaceDefinition(ValidationError, FondantException): """Thrown when a pipeline definition is invalid.""" diff --git a/src/fondant/dataset/__init__.py b/src/fondant/dataset/__init__.py index 8288295a..afe95fae 100644 --- a/src/fondant/dataset/__init__.py +++ b/src/fondant/dataset/__init__.py @@ -8,6 +8,6 @@ VALID_VERTEX_ACCELERATOR_TYPES, ComponentOp, Dataset, - Pipeline, + Workspace, Resources, ) diff --git a/src/fondant/dataset/compiler.py b/src/fondant/dataset/compiler.py index 00d3a99f..2a4f09c7 100644 --- a/src/fondant/dataset/compiler.py +++ b/src/fondant/dataset/compiler.py @@ -15,14 +15,15 @@ from fsspec.registry import known_implementations from fondant.core.component_spec import ComponentSpec -from fondant.core.exceptions import InvalidPipelineDefinition +from fondant.core.exceptions import InvalidWorkspaceDefinition from fondant.core.manifest import Metadata from fondant.core.schema import CloudCredentialsMount, DockerVolume from fondant.dataset import ( VALID_ACCELERATOR_TYPES, VALID_VERTEX_ACCELERATOR_TYPES, + Dataset, Image, - Pipeline, + Workspace, ) logger = logging.getLogger(__name__) @@ -90,7 +91,8 @@ class DockerCompiler(Compiler): def compile( self, - pipeline: Pipeline, + dataset: Dataset, + workspace: Workspace, *, output_path: str = "docker-compose.yml", extra_volumes: t.Union[t.Optional[list], t.Optional[str]] = None, @@ -100,7 +102,8 @@ def compile( """Compile a pipeline to docker-compose spec and save it to a specified output path. Args: - pipeline: the pipeline to compile + dataset: the dataset to compile + workspace: workspace to operate in output_path: the path where to save the docker-compose spec extra_volumes: a list of extra volumes (using the Short syntax: https://docs.docker.com/compose/compose-file/05-services/#short-syntax-5) @@ -109,6 +112,8 @@ def compile( auth_provider: The cloud provider to use for authentication. Default is None. """ + # TODO: add method call to retrieve workspace context, and make passing workspace optional + if extra_volumes is None: extra_volumes = [] @@ -118,10 +123,11 @@ def compile( if auth_provider: extra_volumes.append(auth_provider.get_path()) - logger.info(f"Compiling {pipeline.name} to {output_path}") + logger.info(f"Compiling {workspace.name} to {output_path}") spec = self._generate_spec( - pipeline, + dataset, + workspace, extra_volumes=extra_volumes, build_args=build_args or [], ) @@ -186,7 +192,8 @@ def resolve_local_base_path(base_path: Path) -> Path: def _generate_spec( self, - pipeline: Pipeline, + dataset: Dataset, + workspace: Workspace, *, extra_volumes: t.List[str], build_args: t.List[str], @@ -194,16 +201,16 @@ def _generate_spec( """Generate a docker-compose spec as a python dictionary, loops over the pipeline graph to create services and their dependencies. 
""" - path, volume = self._patch_path(base_path=pipeline.base_path) - run_id = pipeline.get_run_id() + path, volume = self._patch_path(base_path=workspace.base_path) + run_id = workspace.get_run_id() services = {} - pipeline.validate(run_id=run_id) + dataset.validate(run_id=run_id, workspace=workspace) component_cache_key = None - for component_id, component in pipeline._graph.items(): + for component_id, component in dataset._graph.items(): component_op = component["operation"] component_cache_key = component_op.get_component_cache_key( @@ -211,7 +218,7 @@ def _generate_spec( ) metadata = Metadata( - pipeline_name=pipeline.name, + pipeline_name=workspace.name, run_id=run_id, base_path=path, component_id=component_id, @@ -272,7 +279,7 @@ def _generate_spec( "volumes": volumes, "ports": ports, "labels": { - "pipeline_description": pipeline.description, + "pipeline_description": workspace.description, }, } @@ -290,7 +297,7 @@ def _generate_spec( services[component_id]["image"] = component_op.component_spec.image return { - "name": pipeline.name, + "name": workspace.name, "version": "3.8", "services": services, } @@ -311,7 +318,7 @@ def _set_configuration(self, services, fondant_component_operation, component_id f" is not a valid accelerator type for Docker Compose compiler." f" Available options: {VALID_VERTEX_ACCELERATOR_TYPES}" ) - raise InvalidPipelineDefinition(msg) + raise InvalidWorkspaceDefinition(msg) if accelerator_name == "GPU": services[component_id]["deploy"] = { @@ -474,18 +481,22 @@ def _resolve_imports(self): def compile( self, - pipeline: Pipeline, + dataset: Dataset, + workspace: Workspace, output_path: str, ) -> None: """Compile a pipeline to Kubeflow pipeline spec and save it to a specified output path. Args: - pipeline: the pipeline to compile + dataset: the dataset to compile + workspace: workspace to operate in output_path: the path where to save the Kubeflow pipeline spec """ - run_id = pipeline.get_run_id() - pipeline.validate(run_id=run_id) - logger.info(f"Compiling {pipeline.name} to {output_path}") + # TODO: add method call to retrieve workspace context, and make passing workspace optional + + run_id = workspace.get_run_id() + dataset.validate(run_id=run_id, workspace=workspace) + logger.info(f"Compiling {workspace.name} to {output_path}") def set_component_exec_args( *, @@ -511,12 +522,12 @@ def set_component_exec_args( return component_op - @self.kfp.dsl.pipeline(name=pipeline.name, description=pipeline.description) + @self.kfp.dsl.pipeline(name=workspace.name, description=workspace.description) def kfp_pipeline(): previous_component_task = None component_cache_key = None - for component_name, component in pipeline._graph.items(): + for component_name, component in dataset._graph.items(): logger.info(f"Compiling service for {component_name}") component_op = component["operation"] @@ -542,15 +553,15 @@ def kfp_pipeline(): previous_component_cache=component_cache_key, ) metadata = Metadata( - pipeline_name=pipeline.name, + pipeline_name=workspace.name, run_id=run_id, - base_path=pipeline.base_path, + base_path=workspace.base_path, component_id=component_name, cache_key=component_cache_key, ) output_manifest_path = ( - f"{pipeline.base_path}/{pipeline.name}/" + f"{workspace.base_path}/{workspace.name}/" f"{run_id}/{component_name}/manifest.json" ) # Set the execution order of the component task to be after the previous @@ -558,7 +569,7 @@ def kfp_pipeline(): if component["dependencies"]: for dependency in component["dependencies"]: input_manifest_path = ( - 
f"{pipeline.base_path}/{pipeline.name}/" + f"{workspace.base_path}/{workspace.name}/" f"{run_id}/{dependency}/manifest.json" ) kubeflow_component_op = set_component_exec_args( @@ -597,7 +608,7 @@ def kfp_pipeline(): previous_component_task = component_task - logger.info(f"Compiling {pipeline.name} to {output_path}") + logger.info(f"Compiling {workspace.name} to {output_path}") self.kfp.compiler.Compiler().compile(kfp_pipeline, output_path) # type: ignore logger.info("Pipeline compiled successfully") @@ -633,7 +644,7 @@ def _set_configuration(self, task, fondant_component_operation): f"Configured accelerator `{accelerator_name}` is not a valid accelerator type" f"for Kubeflow compiler. Available options: {VALID_ACCELERATOR_TYPES}" ) - raise InvalidPipelineDefinition(msg) + raise InvalidWorkspaceDefinition(msg) task.set_accelerator_limit(accelerator_number) if accelerator_name == "GPU": @@ -692,7 +703,7 @@ def _set_configuration(self, task, fondant_component_operation): f"Configured accelerator `{accelerator_name}` is not a valid accelerator type" f"for Vertex compiler. Available options: {VALID_VERTEX_ACCELERATOR_TYPES}" ) - raise InvalidPipelineDefinition(msg) + raise InvalidWorkspaceDefinition(msg) task.set_accelerator_type(accelerator_name) @@ -825,7 +836,8 @@ def validate_base_path(self, base_path: str) -> None: def compile( self, - pipeline: Pipeline, + dataset: Dataset, + workspace: Workspace, output_path: str, *, role_arn: t.Optional[str] = None, @@ -834,32 +846,35 @@ def compile( to a specified output path. Args: - pipeline: the pipeline to compile + dataset: the dataset to compile + workspace: workspace to operate in output_path: the path where to save the sagemaker pipeline spec. role_arn: the Amazon Resource Name role to use for the processing steps, if none provided the `sagemaker.get_execution_role()` role will be used. 
""" + # TODO: add method call to retrieve workspace context, and make passing workspace optional + self.ecr_client = self.boto3.client("ecr") - self.validate_base_path(pipeline.base_path) + self.validate_base_path(workspace.base_path) self._check_ecr_pull_through_rule() - run_id = pipeline.get_run_id() - path = pipeline.base_path - pipeline.validate(run_id=run_id) + run_id = workspace.get_run_id() + path = workspace.base_path + dataset.validate(run_id=run_id, workspace=workspace) component_cache_key = None steps: t.List[t.Any] = [] with tempfile.TemporaryDirectory(dir=os.getcwd()) as tmpdirname: - for component_name, component in pipeline._graph.items(): + for component_name, component in dataset._graph.items(): component_op = component["operation"] component_cache_key = component_op.get_component_cache_key( previous_component_cache=component_cache_key, ) metadata = Metadata( - pipeline_name=pipeline.name, + pipeline_name=workspace.name, run_id=run_id, base_path=path, component_id=component_name, @@ -911,7 +926,7 @@ def compile( steps.append(step) sagemaker_pipeline = self.sagemaker.workflow.pipeline.Pipeline( - name=pipeline.name, + name=workspace.name, steps=steps, ) with open(output_path, "w") as outfile: diff --git a/src/fondant/dataset/dataset.py b/src/fondant/dataset/dataset.py index d013d1a0..7666a384 100644 --- a/src/fondant/dataset/dataset.py +++ b/src/fondant/dataset/dataset.py @@ -22,7 +22,7 @@ from fondant.core.component_spec import ComponentSpec, OperationSpec from fondant.core.exceptions import ( InvalidLightweightComponent, - InvalidPipelineDefinition, + InvalidWorkspaceDefinition, ) from fondant.core.manifest import Manifest from fondant.core.schema import Field @@ -96,7 +96,7 @@ def __post_init__(self): """Validate the resources.""" if bool(self.node_pool_label) != bool(self.node_pool_name): msg = "Both node_pool_label and node_pool_name must be specified or both must be None." - raise InvalidPipelineDefinition( + raise InvalidWorkspaceDefinition( msg, ) @@ -105,7 +105,7 @@ def __post_init__(self): "Both number of accelerators and accelerator name must be specified or both must" " be None." ) - raise InvalidPipelineDefinition( + raise InvalidWorkspaceDefinition( msg, ) @@ -265,7 +265,7 @@ def _validate_consumes( f"The dataset does not contain the column {dataset_column_name_or_type} " f"required by the component {component_spec.name}." ) - raise InvalidPipelineDefinition(msg) + raise InvalidWorkspaceDefinition(msg) # If operations column name is not in the component spec, but additional properties # are true we will infer the correct type from the dataset fields @@ -284,7 +284,7 @@ def _validate_consumes( f"but `{operations_column_name}` is not defined in the `consumes` " f"section of the component spec." ) - raise InvalidPipelineDefinition(msg) + raise InvalidWorkspaceDefinition(msg) return validated_consumes @@ -440,30 +440,71 @@ def get_nested_dict_hash(input_dict): return get_nested_dict_hash(component_op_uid_dict) -class Pipeline: - """Class representing a Fondant Pipeline.""" +class Workspace: + """Workspace holding environment information for a Fondants execution environment.""" def __init__( self, name: str, - *, base_path: str, description: t.Optional[str] = None, ): - """ - Args: - name: The name of the pipeline. - base_path: The base path for the pipeline to use to store artifacts and data. This - can be a local path or a remote path on one of the supported cloud storage - services. The path should already exist. 
- description: Optional description of the pipeline. - """ - self.base_path = base_path - self.name = self._validate_pipeline_name(name) + self.name = self._validate_workspace_name(name) self.description = description + self.base_path = base_path self.package_path = f"{name}.tgz" + + @staticmethod + def _validate_workspace_name(name: str) -> str: + pattern = r"^[a-z0-9][a-z0-9_-]*$" + if not re.match(pattern, name): + msg = f"The workspace name violates the pattern {pattern}" + raise InvalidWorkspaceDefinition(msg) + return name + + def get_run_id(self) -> str: + """Get a unique run ID for the workspace.""" + timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S") + return f"{self.name}-{timestamp}" + + +class Dataset: + def __init__( + self, + name: t.Optional[str] = None, + description: t.Optional[str] = None, + manifest: t.Optional[Manifest] = None, + run_id: t.Optional[ + str + ] = None, # TODO: could be probably used as dataset version in the future! + ): + if name is not None: + self.name = self._validate_dataset_name(name) + + self.description = description self._graph: t.OrderedDict[str, t.Any] = OrderedDict() self.task_without_dependencies_added = False + self.manifest = manifest + + if run_id is None: + # TODO random generation of run id? + self.run_id = "run-id" + else: + self.run_id = run_id + + @staticmethod + def _validate_dataset_name(name: str) -> str: + pattern = r"^[a-z0-9][a-z0-9_-]*$" + if not re.match(pattern, name): + msg = f"The workspace name violates the pattern {pattern}" + raise InvalidWorkspaceDefinition(msg) + return name + + @staticmethod + def load(manifest: Manifest) -> "Dataset": + """Load a dataset from a manifest.""" + # TODO: fondant #885 + raise NotImplementedError def register_operation( self, @@ -472,6 +513,9 @@ def register_operation( input_dataset: t.Optional["Dataset"], output_dataset: t.Optional["Dataset"], ) -> None: + if self._graph is None: + self._graph = OrderedDict() + dependencies = [] for component_name, info in self._graph.items(): if info["output_dataset"] == input_dataset: @@ -483,10 +527,11 @@ def register_operation( "output_dataset": output_dataset, } + @staticmethod def read( - self, ref: t.Any, *, + workspace: Workspace, produces: t.Optional[t.Dict[str, t.Union[str, pa.DataType]]] = None, arguments: t.Optional[t.Dict[str, t.Any]] = None, input_partition_rows: t.Optional[t.Union[int, str]] = None, @@ -499,6 +544,7 @@ def read( Args: ref: The name of a reusable component, or the path to the directory containing a containerized component, or a lightweight component class. + workspace: The workspace to operate in produces: A mapping to update the fields produced by the operation as defined in the component spec. The keys are the names of the fields to be received by the component, while the values are the type of the field, or the name of the field to @@ -512,11 +558,7 @@ def read( Returns: An intermediate dataset. """ - if self._graph: - msg = "For now, at most one read component can be applied per pipeline." 
- raise InvalidPipelineDefinition( - msg, - ) + # TODO: add method call to retrieve workspace context, and make passing workspace optional operation = ComponentOp.from_ref( ref, @@ -526,13 +568,16 @@ def read( resources=resources, cache=cache, ) + + run_id = workspace.get_run_id() manifest = Manifest.create( - pipeline_name=self.name, - base_path=self.base_path, - run_id=self.get_run_id(), + pipeline_name=workspace.name, + base_path=workspace.base_path, + run_id=run_id, component_id=operation.component_name, ) - dataset = Dataset(manifest, pipeline=self) + + dataset = Dataset(manifest=manifest, run_id=run_id) return dataset._apply(operation) @@ -560,32 +605,20 @@ def depth_first_traversal(node: str): self._graph = OrderedDict((node, self._graph[node]) for node in sorted_graph) - @staticmethod - def _validate_pipeline_name(pipeline_name: str) -> str: - pattern = r"^[a-z0-9][a-z0-9_-]*$" - if not re.match(pattern, pipeline_name): - msg = f"The pipeline name violates the pattern {pattern}" - raise InvalidPipelineDefinition(msg) - return pipeline_name - - def get_run_id(self) -> str: - """Get a unique run ID for the pipeline.""" - timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S") - return f"{self.name}-{timestamp}" - - def validate(self, run_id: str): + def validate(self, run_id: str, workspace: Workspace): """Sort and run validation on the pipeline definition. Args: run_id: run identifier + workspace: workspace to operate in """ self.sort_graph() - self._validate_pipeline_definition(run_id) + self._validate_workspace_definition(run_id, workspace) - def _validate_pipeline_definition(self, run_id: str): + def _validate_workspace_definition(self, run_id: str, workspace: Workspace): """ - Validates the pipeline definition by ensuring that the consumed and produced subsets and + Validates the workspace definition by ensuring that the consumed and produced subsets and their associated fields match and are invoked in the correct order. Raises: @@ -605,8 +638,8 @@ def _validate_pipeline_definition(self, run_id: str): # Create initial manifest manifest = Manifest.create( - pipeline_name=self.name, - base_path=self.base_path, + pipeline_name=workspace.name, + base_path=workspace.base_path, run_id=run_id, component_id=load_component_name, cache_key="42", @@ -628,7 +661,7 @@ def _validate_pipeline_definition(self, run_id: str): f"in the previous components. \n" f"Available field names: {list(manifest.fields.keys())}" ) - raise InvalidPipelineDefinition( + raise InvalidWorkspaceDefinition( msg, ) @@ -645,7 +678,7 @@ def _validate_pipeline_definition(self, run_id: str): f"{manifest_field.type}\nThe current component to " f"trying to invoke it with this type:\n{component_field.type}" ) - raise InvalidPipelineDefinition( + raise InvalidWorkspaceDefinition( msg, ) @@ -658,22 +691,13 @@ def __repr__(self) -> str: """Return a string representation of the FondantPipeline object.""" return f"{self.__class__.__name__}({self._graph!r}" - -class Dataset: - def __init__(self, manifest, *, pipeline: Pipeline) -> None: - """A class representing an intermediate dataset. - - Args: - manifest: Manifest representing the dataset - pipeline: The pipeline this dataset is a part of. - """ - self.manifest = manifest - self.pipeline = pipeline - @property def fields(self) -> t.Mapping[str, Field]: """The fields of the manifest as an immutable mapping.""" - return dict(self.manifest.fields) + if self.manifest: + return dict(self.manifest.fields) + msg = "No manifest found." 
+ raise ValueError(msg) def _apply(self, operation: ComponentOp) -> "Dataset": """ @@ -681,19 +705,29 @@ def _apply(self, operation: ComponentOp) -> "Dataset": Args: operation: The operation to apply. + workspace: The workspace to operate in. """ + if self.manifest is None: + msg = "No manifest found." + raise ValueError(msg) + evolved_manifest = self.manifest.evolve( operation.operation_spec, - run_id=self.pipeline.get_run_id(), + run_id=self.run_id, ) - evolved_dataset = Dataset(evolved_manifest, pipeline=self.pipeline) - if self.pipeline is not None: - self.pipeline.register_operation( - operation, - input_dataset=self, - output_dataset=evolved_dataset, - ) + evolved_dataset = Dataset( + manifest=evolved_manifest, + run_id=self.run_id, + ) + + evolved_dataset._graph = self._graph + + evolved_dataset.register_operation( + operation, + input_dataset=self, # using reference to manifests instead? + output_dataset=evolved_dataset, + ) return evolved_dataset @@ -714,6 +748,7 @@ def apply( Args: ref: The name of a reusable component, or the path to the directory containing a custom component, or a lightweight component class. + workspace: workspace to operate in consumes: A mapping to update the fields consumed by the operation as defined in the component spec. The keys are the names of the fields to be received by the component, while the values are the type of the field, or the name of the field to @@ -795,6 +830,8 @@ def apply( Returns: An intermediate dataset. """ + # TODO: add method call to retrieve workspace context, and make passing workspace optional + operation = ComponentOp.from_ref( ref, fields=self.fields, @@ -824,6 +861,7 @@ def write( Args: ref: The name of a reusable component, or the path to the directory containing a custom component, or a lightweight component class. + workspace: workspace to operate in consumes: A mapping to update the fields consumed by the operation as defined in the component spec. The keys are the names of the fields to be received by the component, while the values are the type of the field, or the name of the field to @@ -837,6 +875,8 @@ def write( Returns: An intermediate dataset. """ + # TODO: add method call to retrieve workspace context, and make passing workspace optional + operation = ComponentOp.from_ref( ref, fields=self.fields, diff --git a/src/fondant/dataset/runner.py b/src/fondant/dataset/runner.py index 7cb9f9c7..948aab31 100644 --- a/src/fondant/dataset/runner.py +++ b/src/fondant/dataset/runner.py @@ -8,7 +8,7 @@ import yaml from fondant.core.schema import CloudCredentialsMount -from fondant.dataset import Pipeline +from fondant.dataset import Dataset, Workspace from fondant.dataset.compiler import ( DockerCompiler, KubeFlowCompiler, @@ -53,7 +53,8 @@ def _run(self, input_spec: str, *args, **kwargs): def run( self, - input: t.Union[Pipeline, str], + dataset: t.Union[Dataset, str], + workspace: Workspace, *, extra_volumes: t.Union[t.Optional[list], t.Optional[str]] = None, build_args: t.Optional[t.List[str]] = None, @@ -62,7 +63,8 @@ def run( """Run a pipeline, either from a compiled docker-compose spec or from a fondant pipeline. Args: - input: the pipeline to compile or a path to a already compiled docker-compose spec + dataset: the dataset to compile or a path to an already compiled docker-compose spec + workspace: The workspace to operate in extra_volumes: a list of extra volumes (using the Short syntax: https://docs.docker.com/compose/compose-file/05-services/#short-syntax-5) to mount in the docker-compose spec. 
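To make the new calling convention concrete, here is a minimal sketch of a local run against the reworked interface, reusing the `load_from_parquet` step from `examples/sample_pipeline/pipeline.py` in this patch; the `extra_volumes` mount is an illustrative assumption, not something the patch prescribes.

```python
import pyarrow as pa

from fondant.dataset import Dataset, Workspace
from fondant.dataset.runner import DockerRunner

# Execution context (name, base path) now lives in a Workspace rather than a Pipeline.
workspace = Workspace(name="dummy-pipeline", base_path="./.artifacts")

# Dataset.read() replaces Pipeline.read() and receives the workspace explicitly.
dataset = Dataset.read(
    "load_from_parquet",
    arguments={
        "dataset_uri": "/data/sample.parquet",
        "column_name_mapping": {"text": "text_data"},
    },
    produces={"text_data": pa.string()},
    workspace=workspace,
)

# The runner takes the dataset and workspace separately; a path to an already
# compiled docker-compose spec is still accepted in place of the dataset.
DockerRunner().run(
    dataset=dataset,
    workspace=workspace,
    extra_volumes=["./data:/data"],  # illustrative host:container mount, not from the patch
)
```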
@@ -72,7 +74,7 @@ def run( self.check_docker_install() self.check_docker_compose_install() - if isinstance(input, Pipeline): + if isinstance(dataset, Dataset): os.makedirs(".fondant", exist_ok=True) output_path = ".fondant/compose.yaml" logging.info( @@ -80,7 +82,8 @@ def run( ) compiler = DockerCompiler() compiler.compile( - input, + dataset, + workspace=workspace, output_path=output_path, extra_volumes=extra_volumes, build_args=build_args, @@ -88,7 +91,8 @@ def run( ) self._run(output_path) else: - self._run(input) + # TODO: use better naming for the case, initialising from existing docker compose spec + self._run(dataset) @staticmethod def _versionify(version: str) -> t.Tuple[int, ...]: @@ -180,17 +184,19 @@ def _resolve_imports(self): def run( self, - input: t.Union[Pipeline, str], + dataset: t.Union[Dataset, str], + workspace: Workspace, *, experiment_name: str = "Default", ): """Run a pipeline, either from a compiled kubeflow spec or from a fondant pipeline. Args: - input: the pipeline to compile or a path to a already compiled sagemaker spec + dataset: the dataset to compile or a path to an already compiled sagemaker spec + workspace: workspace to operate in experiment_name: the name of the experiment to create """ - if isinstance(input, Pipeline): + if isinstance(dataset, Dataset): os.makedirs(".fondant", exist_ok=True) output_path = ".fondant/kubeflow-pipeline.yaml" logging.info( @@ -198,12 +204,13 @@ def run( ) compiler = KubeFlowCompiler() compiler.compile( - input, + dataset, + workspace, output_path=output_path, ) self._run(output_path, experiment_name=experiment_name) else: - self._run(input, experiment_name=experiment_name) + self._run(dataset, experiment_name=experiment_name) def _run( self, @@ -263,14 +270,16 @@ def __init__( def run( self, - input: t.Union[Pipeline, str], + dataset: t.Union[Dataset, str], + workspace: Workspace, ): """Run a pipeline, either from a compiled vertex spec or from a fondant pipeline. Args: - input: the pipeline to compile or a path to a already compiled sagemaker spec + dataset: the dataset to compile or a path to an already compiled sagemaker spec + workspace: workspace to operate in """ - if isinstance(input, Pipeline): + if isinstance(dataset, Dataset): os.makedirs(".fondant", exist_ok=True) output_path = ".fondant/vertex-pipeline.yaml" logging.info( @@ -278,12 +287,13 @@ def run( ) compiler = VertexCompiler() compiler.compile( - input, + dataset, + workspace, output_path=output_path, ) self._run(output_path) else: - self._run(input) + self._run(dataset) def _run(self, input_spec: str, *args, **kwargs): job = self.aip.PipelineJob( @@ -322,19 +332,22 @@ def __resolve_imports(self): def run( self, - input: t.Union[Pipeline, str], + dataset: t.Union[Dataset, str], + workspace: Workspace, pipeline_name: str, role_arn: str, ): - """Run a pipeline, either from a compiled sagemaker spec or from a fondant pipeline. + """Run a dataset execution, either from a compiled sagemaker spec or from a fondant + pipeline. Args: - input: the pipeline to compile or a path to a already compiled sagemaker spec + dataset: the dataset to compile or a path to a already compiled sagemaker spec + workspace: workspace to operate in pipeline_name: the name of the pipeline to create role_arn: the Amazon Resource Name role to use for the processing steps, if none provided the `sagemaker.get_execution_role()` role will be used. 
""" - if isinstance(input, Pipeline): + if isinstance(dataset, Dataset): os.makedirs(".fondant", exist_ok=True) output_path = ".fondant/sagemaker-pipeline.yaml" logging.info( @@ -342,13 +355,14 @@ def run( ) compiler = SagemakerCompiler() compiler.compile( - input, + dataset=dataset, + workspace=workspace, output_path=output_path, role_arn=role_arn, ) self._run(output_path, pipeline_name=pipeline_name, role_arn=role_arn) else: - self._run(input, pipeline_name=pipeline_name, role_arn=role_arn) + self._run(dataset, pipeline_name=pipeline_name, role_arn=role_arn) def _run(self, input_spec: str, pipeline_name: str, role_arn: str): """Creates/updates a sagemaker pipeline and execute it.""" diff --git a/src/fondant/testing.py b/src/fondant/testing.py index 24e5c78c..7b7ddca4 100644 --- a/src/fondant/testing.py +++ b/src/fondant/testing.py @@ -5,7 +5,7 @@ import yaml -from fondant.core.exceptions import InvalidPipelineDefinition +from fondant.core.exceptions import InvalidWorkspaceDefinition @dataclass @@ -196,7 +196,7 @@ def from_spec(cls, spec_path: str) -> "KubeflowPipelineConfigs": if not specification: msg = "No component specification found in the pipeline specification" - raise InvalidPipelineDefinition(msg) + raise InvalidWorkspaceDefinition(msg) components_configs_dict = {} # Iterate through each service diff --git a/tests/core/test_manifest_evolution.py b/tests/core/test_manifest_evolution.py index c06ac97a..2b03ca69 100644 --- a/tests/core/test_manifest_evolution.py +++ b/tests/core/test_manifest_evolution.py @@ -5,7 +5,7 @@ import pytest import yaml from fondant.core.component_spec import ComponentSpec, OperationSpec -from fondant.core.exceptions import InvalidPipelineDefinition +from fondant.core.exceptions import InvalidWorkspaceDefinition from fondant.core.manifest import Manifest EXAMPLES_PATH = Path(__file__).parent / "examples/evolution_examples" @@ -106,7 +106,7 @@ def test_invalid_evolution_examples( component_spec = ComponentSpec.from_dict(component_spec) for test_condition in test_conditions: produces = test_condition["produces"] - with pytest.raises(InvalidPipelineDefinition): # noqa: PT012 + with pytest.raises(InvalidWorkspaceDefinition): # noqa: PT012 operation_spec = OperationSpec(component_spec, produces=produces) manifest.evolve( operation_spec=operation_spec, diff --git a/tests/examples/example_modules/dataset.py b/tests/examples/example_modules/dataset.py new file mode 100644 index 00000000..33da8f67 --- /dev/null +++ b/tests/examples/example_modules/dataset.py @@ -0,0 +1,19 @@ +from fondant.dataset import Dataset + + +def create_dataset_with_args(name): + return Dataset(name) + + +def create_dataset(): + return Dataset("test_dataset") + + +def not_implemented(): + raise NotImplementedError + + +workspace = create_dataset() + + +number = 1 diff --git a/tests/examples/example_modules/invalid_double_pipeline.py b/tests/examples/example_modules/invalid_double_pipeline.py deleted file mode 100644 index be1007d7..00000000 --- a/tests/examples/example_modules/invalid_double_pipeline.py +++ /dev/null @@ -1,4 +0,0 @@ -from fondant.dataset import Pipeline - -TEST_PIPELINE = Pipeline(name="test_pipeline", base_path="some/path") -TEST_PIPELINE_2 = Pipeline(name="test_pipeline", base_path="some/path") diff --git a/tests/examples/example_modules/invalid_double_workspace.py b/tests/examples/example_modules/invalid_double_workspace.py new file mode 100644 index 00000000..d794a91f --- /dev/null +++ b/tests/examples/example_modules/invalid_double_workspace.py @@ -0,0 +1,4 @@ +from 
fondant.dataset import Workspace + +TEST_WORKSPACE = Workspace(name="test_pipeline", base_path="some/path") +TEST_WORKSPACE_2 = Workspace(name="test_pipeline", base_path="some/path") diff --git a/tests/examples/example_modules/pipeline.py b/tests/examples/example_modules/pipeline.py deleted file mode 100644 index 39fd7adb..00000000 --- a/tests/examples/example_modules/pipeline.py +++ /dev/null @@ -1,19 +0,0 @@ -from fondant.dataset import Pipeline - - -def create_pipeline_with_args(name): - return Pipeline(name=name, base_path="some/path") - - -def create_pipeline(): - return Pipeline(name="test_pipeline", base_path="some/path") - - -def not_implemented(): - raise NotImplementedError - - -pipeline = create_pipeline() - - -number = 1 diff --git a/tests/pipeline/test_compiler.py b/tests/pipeline/test_compiler.py index 1c79307c..2b3b7878 100644 --- a/tests/pipeline/test_compiler.py +++ b/tests/pipeline/test_compiler.py @@ -15,14 +15,14 @@ import yaml from fondant.component import DaskLoadComponent from fondant.core.component_spec import ComponentSpec -from fondant.core.exceptions import InvalidPipelineDefinition +from fondant.core.exceptions import InvalidWorkspaceDefinition from fondant.core.manifest import Manifest, Metadata from fondant.core.schema import CloudCredentialsMount from fondant.dataset import ( ComponentOp, Dataset, - Pipeline, Resources, + Workspace, lightweight_component, ) from fondant.dataset.compiler import ( @@ -131,17 +131,19 @@ def now(cls): @pytest.fixture(params=TEST_PIPELINES) def setup_pipeline(request, tmp_path, monkeypatch): - pipeline = Pipeline( + workspace = Workspace( name="testpipeline", description="description of the test pipeline", base_path="/foo/bar", ) + + run_id = workspace.get_run_id() manifest = Manifest.create( - pipeline_name=pipeline.name, - base_path=pipeline.base_path, - run_id=pipeline.get_run_id(), + pipeline_name=workspace.name, + base_path=workspace.base_path, + run_id=run_id, ) - dataset = Dataset(manifest, pipeline=pipeline) + dataset = Dataset(manifest=manifest, run_id=run_id) cache_dict = {} example_dir, components = request.param for component_dict in components: @@ -158,29 +160,34 @@ def setup_pipeline(request, tmp_path, monkeypatch): cache_dict[component.component_name] = cache_key # override the default package_path with temporary path to avoid the creation of artifacts - monkeypatch.setattr(pipeline, "package_path", str(tmp_path / "test_pipeline.tgz")) + monkeypatch.setattr(workspace, "package_path", str(tmp_path / "test_pipeline.tgz")) - return example_dir, pipeline, cache_dict + return example_dir, workspace, dataset, cache_dict @pytest.mark.usefixtures("_freeze_time") def test_docker_compiler(setup_pipeline, tmp_path_factory): """Test compiling a pipeline to docker-compose.""" - example_dir, pipeline, _ = setup_pipeline + example_dir, workspace, dataset, _ = setup_pipeline compiler = DockerCompiler() with tmp_path_factory.mktemp("temp") as fn: - pipeline.base_path = str(fn) + workspace.base_path = str(fn) output_path = str(fn / "docker-compose.yml") - compiler.compile(pipeline=pipeline, output_path=output_path, build_args=[]) + compiler.compile( + dataset=dataset, + workspace=workspace, + output_path=output_path, + build_args=[], + ) pipeline_configs = DockerComposeConfigs.from_spec(output_path) - assert pipeline_configs.pipeline_name == pipeline.name - assert pipeline_configs.pipeline_description == pipeline.description + assert pipeline_configs.pipeline_name == workspace.name + assert pipeline_configs.pipeline_description == 
workspace.description for ( component_name, component_configs, ) in pipeline_configs.component_configs.items(): # Get expected component configs - component = pipeline._graph[component_name] + component = dataset._graph[component_name] component_op = component["operation"] # Check that the component configs are correct @@ -207,12 +214,12 @@ def test_docker_local_path(setup_pipeline, tmp_path_factory): # volumes are only created for local existing directories with tmp_path_factory.mktemp("temp") as fn: # this is the directory mounted in the container - _, pipeline, cache_dict = setup_pipeline + _, workspace, dataset, cache_dict = setup_pipeline work_dir = f"/{fn.stem}" - pipeline.base_path = str(fn) + workspace.base_path = str(fn) compiler = DockerCompiler() output_path = str(fn / "docker-compose.yml") - compiler.compile(pipeline=pipeline, output_path=output_path) + compiler.compile(dataset=dataset, workspace=workspace, output_path=output_path) pipeline_configs = DockerComposeConfigs.from_spec(output_path) expected_run_id = "testpipeline-20230101000000" for ( @@ -229,7 +236,7 @@ def test_docker_local_path(setup_pipeline, tmp_path_factory): "type": "bind", }, ] - cleaned_pipeline_name = pipeline.name.replace("_", "") + cleaned_pipeline_name = workspace.name.replace("_", "") # check if commands are patched to use the working dir expected_output_manifest_path = ( f"{work_dir}/{cleaned_pipeline_name}/{expected_run_id}" @@ -251,13 +258,13 @@ def test_docker_local_path(setup_pipeline, tmp_path_factory): @pytest.mark.usefixtures("_freeze_time") def test_docker_remote_path(setup_pipeline, tmp_path_factory): """Test that a remote path is applied correctly in the arguments and no volume.""" - _, pipeline, cache_dict = setup_pipeline + _, workspace, dataset, cache_dict = setup_pipeline remote_dir = "gs://somebucket/artifacts" - pipeline.base_path = remote_dir + workspace.base_path = remote_dir compiler = DockerCompiler() with tmp_path_factory.mktemp("temp") as fn: output_path = str(fn / "docker-compose.yml") - compiler.compile(pipeline=pipeline, output_path=output_path) + compiler.compile(dataset=dataset, workspace=workspace, output_path=output_path) pipeline_configs = DockerComposeConfigs.from_spec(output_path) expected_run_id = "testpipeline-20230101000000" for ( @@ -268,7 +275,7 @@ def test_docker_remote_path(setup_pipeline, tmp_path_factory): # check that no volumes are created assert component_configs.volumes == [] # check if commands are patched to use the remote dir - cleaned_pipeline_name = pipeline.name.replace("_", "") + cleaned_pipeline_name = workspace.name.replace("_", "") expected_output_manifest_path = ( f"{remote_dir}/{cleaned_pipeline_name}/{expected_run_id}" @@ -296,8 +303,8 @@ def test_docker_extra_volumes(setup_pipeline, tmp_path_factory): with tmp_path_factory.mktemp("temp") as fn: # this is the directory mounted in the container - _, pipeline, _ = setup_pipeline - pipeline.base_path = str(fn) + _, workspace, dataset, _ = setup_pipeline + workspace.base_path = str(fn) compiler = DockerCompiler() # define some extra volumes to be mounted extra_volumes = ["hello:there", "general:kenobi"] @@ -305,7 +312,8 @@ def test_docker_extra_volumes(setup_pipeline, tmp_path_factory): output_path = str(fn / "docker-compose.yml") compiler.compile( - pipeline=pipeline, + dataset=dataset, + workspace=workspace, output_path=output_path, extra_volumes=extra_volumes, auth_provider=auth_provider, @@ -321,12 +329,13 @@ def test_docker_extra_volumes(setup_pipeline, tmp_path_factory): 
@pytest.mark.usefixtures("_freeze_time") def test_docker_configuration(tmp_path_factory): """Test that extra volumes are applied correctly.""" - pipeline = Pipeline( + workspace = Workspace( name="test_pipeline", description="description of the test pipeline", base_path="/foo/bar", ) - pipeline.read( + + dataset = Dataset.read( Path(COMPONENTS_PATH / "example_1" / "first_component"), arguments={"storage_args": "a dummy string arg"}, resources=Resources( @@ -334,13 +343,14 @@ def test_docker_configuration(tmp_path_factory): accelerator_name="GPU", ), produces={"captions_data": pa.string()}, + workspace=workspace, ) compiler = DockerCompiler() with tmp_path_factory.mktemp("temp") as fn: - pipeline.base_path = str(fn) + workspace.base_path = str(fn) output_path = str(fn / "docker-compose.yaml") - compiler.compile(pipeline=pipeline, output_path=output_path) + compiler.compile(dataset=dataset, workspace=workspace, output_path=output_path) pipeline_configs = DockerComposeConfigs.from_spec(output_path) component_config = pipeline_configs.component_configs["first_component"] assert component_config.accelerators[0].type == "gpu" @@ -350,12 +360,12 @@ def test_docker_configuration(tmp_path_factory): @pytest.mark.usefixtures("_freeze_time") def test_invalid_docker_configuration(tmp_path_factory): """Test that a valid error is returned when an unknown accelerator is set.""" - pipeline = Pipeline( + workspace = Workspace( name="test_pipeline", description="description of the test pipeline", base_path="/foo/bar", ) - pipeline.read( + dataset = Dataset.read( Path(COMPONENTS_PATH / "example_1" / "first_component"), arguments={"storage_args": "a dummy string arg"}, resources=Resources( @@ -363,14 +373,19 @@ def test_invalid_docker_configuration(tmp_path_factory): accelerator_name="unknown resource", ), produces={"captions_data": pa.string()}, + workspace=workspace, ) compiler = DockerCompiler() with tmp_path_factory.mktemp("temp") as fn, pytest.raises( # noqa PT012 - InvalidPipelineDefinition, + InvalidWorkspaceDefinition, ): - pipeline.base_path = str(fn) - compiler.compile(pipeline=pipeline, output_path="kubeflow_pipeline.yml") + workspace.base_path = str(fn) + compiler.compile( + dataset=dataset, + workspace=workspace, + output_path="kubeflow_pipeline.yml", + ) def test_kubeflow_component_creation(valid_fondant_schema, valid_kubeflow_schema): @@ -410,7 +425,7 @@ def test_kubeflow_component_spec_repr(valid_kubeflow_schema): def test_kubeflow_component_spec_from_lightweight_component( tmp_path_factory, ): - pipeline = Pipeline( + workspace = Workspace( name="test-pipeline", description="description of the test pipeline", base_path="/foo/bar", @@ -432,14 +447,15 @@ def load(self) -> dd.DataFrame: ) return dd.from_pandas(df, npartitions=1) - _ = pipeline.read( + dataset = Dataset.read( ref=CreateData, + workspace=workspace, ) compiler = KubeFlowCompiler() with tmp_path_factory.mktemp("temp") as fn: output_path = str(fn / "kubeflow_spec.yaml") - compiler.compile(pipeline=pipeline, output_path=output_path) + compiler.compile(dataset=dataset, workspace=workspace, output_path=output_path) pipeline_configs = KubeflowPipelineConfigs.from_spec(output_path) assert pipeline_configs.component_configs["createdata"].image == ( "python:3.10-slim-buster" @@ -455,20 +471,20 @@ def load(self) -> dd.DataFrame: @pytest.mark.usefixtures("_freeze_time") def test_kubeflow_compiler(setup_pipeline, tmp_path_factory): """Test compiling a pipeline to kubeflow.""" - example_dir, pipeline, _ = setup_pipeline + example_dir, workspace, 
dataset, _ = setup_pipeline compiler = KubeFlowCompiler() with tmp_path_factory.mktemp("temp") as fn: output_path = str(fn / "kubeflow_pipeline.yml") - compiler.compile(pipeline=pipeline, output_path=output_path) + compiler.compile(dataset=dataset, workspace=workspace, output_path=output_path) pipeline_configs = KubeflowPipelineConfigs.from_spec(output_path) - assert pipeline_configs.pipeline_name == pipeline.name - assert pipeline_configs.pipeline_description == pipeline.description + assert pipeline_configs.pipeline_name == workspace.name + assert pipeline_configs.pipeline_description == workspace.description for ( component_name, component_configs, ) in pipeline_configs.component_configs.items(): # Get exepcted component configs - component = pipeline._graph[component_name] + component = dataset._graph[component_name] component_op = component["operation"] # Check that the component configs are correct @@ -495,12 +511,12 @@ def test_kubeflow_configuration(tmp_path_factory): node_pool_label = "dummy_label" node_pool_name = "dummy_label" - pipeline = Pipeline( + workspace = Workspace( name="test_pipeline", description="description of the test pipeline", base_path="/foo/bar", ) - pipeline.read( + dataset = Dataset.read( Path(COMPONENTS_PATH / "example_1" / "first_component"), arguments={"storage_args": "a dummy string arg"}, resources=Resources( @@ -510,11 +526,12 @@ def test_kubeflow_configuration(tmp_path_factory): accelerator_name="GPU", ), produces={"captions_data": pa.string()}, + workspace=workspace, ) compiler = KubeFlowCompiler() with tmp_path_factory.mktemp("temp") as fn: output_path = str(fn / "kubeflow_pipeline.yml") - compiler.compile(pipeline=pipeline, output_path=output_path) + compiler.compile(dataset=dataset, workspace=workspace, output_path=output_path) pipeline_configs = KubeflowPipelineConfigs.from_spec(output_path) component_configs = pipeline_configs.component_configs["first_component"] for accelerator in component_configs.accelerators: @@ -527,12 +544,12 @@ def test_kubeflow_configuration(tmp_path_factory): @pytest.mark.usefixtures("_freeze_time") def test_invalid_kubeflow_configuration(tmp_path_factory): """Test that an error is returned when an invalid resource is provided.""" - pipeline = Pipeline( + workspace = Workspace( name="test_pipeline", description="description of the test pipeline", base_path="/foo/bar", ) - pipeline.read( + dataset = Dataset.read( Path(COMPONENTS_PATH / "example_1" / "first_component"), arguments={"storage_args": "a dummy string arg"}, resources=Resources( @@ -540,11 +557,16 @@ def test_invalid_kubeflow_configuration(tmp_path_factory): accelerator_name="unknown resource", ), produces={"captions_data": pa.string()}, + workspace=workspace, ) compiler = KubeFlowCompiler() - with pytest.raises(InvalidPipelineDefinition): - compiler.compile(pipeline=pipeline, output_path="kubeflow_pipeline.yml") + with pytest.raises(InvalidWorkspaceDefinition): + compiler.compile( + dataset=dataset, + workspace=workspace, + output_path="kubeflow_pipeline.yml", + ) def test_kfp_import(): @@ -559,20 +581,20 @@ def test_kfp_import(): @pytest.mark.usefixtures("_freeze_time") def test_vertex_compiler(setup_pipeline, tmp_path_factory): """Test compiling a pipeline to vertex.""" - example_dir, pipeline, _ = setup_pipeline + example_dir, workspace, dataset, _ = setup_pipeline compiler = VertexCompiler() with tmp_path_factory.mktemp("temp") as fn: output_path = str(fn / "kubeflow_pipeline.yml") - compiler.compile(pipeline=pipeline, output_path=output_path) + 
compiler.compile(dataset=dataset, workspace=workspace, output_path=output_path) pipeline_configs = VertexPipelineConfigs.from_spec(output_path) - assert pipeline_configs.pipeline_name == pipeline.name - assert pipeline_configs.pipeline_description == pipeline.description + assert pipeline_configs.pipeline_name == workspace.name + assert pipeline_configs.pipeline_description == workspace.description for ( component_name, component_configs, ) in pipeline_configs.component_configs.items(): # Get exepcted component configs - component = pipeline._graph[component_name] + component = dataset._graph[component_name] component_op = component["operation"] # Check that the component configs are correct @@ -596,12 +618,13 @@ def test_vertex_compiler(setup_pipeline, tmp_path_factory): @pytest.mark.usefixtures("_freeze_time") def test_vertex_configuration(tmp_path_factory): """Test that the kubeflow pipeline can be configured.""" - pipeline = Pipeline( + workspace = Workspace( name="test_pipeline", description="description of the test pipeline", base_path="/foo/bar", ) - pipeline.read( + + dataset = Dataset.read( Path(COMPONENTS_PATH / "example_1" / "first_component"), arguments={"storage_args": "a dummy string arg"}, resources=Resources( @@ -609,11 +632,12 @@ def test_vertex_configuration(tmp_path_factory): accelerator_name="NVIDIA_TESLA_K80", ), produces={"captions_data": pa.string()}, + workspace=workspace, ) compiler = VertexCompiler() with tmp_path_factory.mktemp("temp") as fn: output_path = str(fn / "kubeflow_pipeline.yml") - compiler.compile(pipeline=pipeline, output_path=output_path) + compiler.compile(dataset=dataset, workspace=workspace, output_path=output_path) pipeline_configs = VertexPipelineConfigs.from_spec(output_path) component_configs = pipeline_configs.component_configs["first_component"] for accelerator in component_configs.accelerators: @@ -624,12 +648,12 @@ def test_vertex_configuration(tmp_path_factory): @pytest.mark.usefixtures("_freeze_time") def test_invalid_vertex_configuration(tmp_path_factory): """Test that extra volumes are applied correctly.""" - pipeline = Pipeline( + workspace = Workspace( name="test_pipeline", description="description of the test pipeline", base_path="/foo/bar", ) - pipeline.read( + dataset = Dataset.read( Path(COMPONENTS_PATH / "example_1" / "first_component"), arguments={"storage_args": "a dummy string arg"}, resources=Resources( @@ -637,10 +661,15 @@ def test_invalid_vertex_configuration(tmp_path_factory): accelerator_name="unknown resource", ), produces={"captions_data": pa.string()}, + workspace=workspace, ) compiler = VertexCompiler() - with pytest.raises(InvalidPipelineDefinition): - compiler.compile(pipeline=pipeline, output_path="kubeflow_pipeline.yml") + with pytest.raises(InvalidWorkspaceDefinition): + compiler.compile( + dataset=dataset, + workspace=workspace, + output_path="kubeflow_pipeline.yml", + ) def test_caching_dependency_docker(tmp_path_factory): @@ -651,17 +680,18 @@ def test_caching_dependency_docker(tmp_path_factory): second_component_cache_key_dict = {} for arg in arg_list: - pipeline = Pipeline( + workspace = Workspace( name="test_pipeline", description="description of the test pipeline", base_path="/foo/bar", ) compiler = DockerCompiler() - dataset = pipeline.read( + dataset = Dataset.read( Path(COMPONENTS_PATH / "example_1" / "first_component"), arguments={"storage_args": f"{arg}"}, produces={"images_data": pa.binary()}, + workspace=workspace, ) dataset.apply( Path(COMPONENTS_PATH / "example_1" / "second_component"), @@ 
-669,9 +699,14 @@ def test_caching_dependency_docker(tmp_path_factory): ) with tmp_path_factory.mktemp("temp") as fn: - pipeline.base_path = str(fn) + workspace.base_path = str(fn) output_path = str(fn / "docker-compose.yml") - compiler.compile(pipeline=pipeline, output_path=output_path, build_args=[]) + compiler.compile( + dataset=dataset, + workspace=workspace, + output_path=output_path, + build_args=[], + ) pipeline_configs = DockerComposeConfigs.from_spec(output_path) metadata = json.loads( pipeline_configs.component_configs["second_component"].arguments[ @@ -695,17 +730,18 @@ def test_caching_dependency_kfp(tmp_path_factory): second_component_cache_key_dict = {} for arg in arg_list: - pipeline = Pipeline( + workspace = Workspace( name="test_pipeline", description="description of the test pipeline", base_path="/foo/bar", ) compiler = KubeFlowCompiler() - dataset = pipeline.read( + dataset = Dataset.read( Path(COMPONENTS_PATH / "example_1" / "first_component"), arguments={"storage_args": f"{arg}"}, produces={"images_data": pa.binary()}, + workspace=workspace, ) dataset.apply( Path(COMPONENTS_PATH / "example_1" / "second_component"), @@ -714,7 +750,11 @@ def test_caching_dependency_kfp(tmp_path_factory): with tmp_path_factory.mktemp("temp") as fn: output_path = str(fn / "kubeflow_pipeline.yml") - compiler.compile(pipeline=pipeline, output_path=output_path) + compiler.compile( + dataset=dataset, + workspace=workspace, + output_path=output_path, + ) pipeline_configs = KubeflowPipelineConfigs.from_spec(output_path) metadata = json.loads( @@ -856,13 +896,18 @@ def test_sagemaker_base_path_validator(): @pytest.mark.usefixtures("_freeze_time") def test_docker_compiler_create_local_base_path(setup_pipeline, tmp_path_factory): """Test compiling a pipeline to docker-compose.""" - example_dir, pipeline, _ = setup_pipeline + example_dir, workspace, dataset, _ = setup_pipeline compiler = DockerCompiler() with tmp_path_factory.mktemp("temp") as fn: - pipeline.base_path = str(fn) + "/my-artifacts" + workspace.base_path = str(fn) + "/my-artifacts" output_path = str(fn / "docker-compose.yml") - compiler.compile(pipeline=pipeline, output_path=output_path, build_args=[]) - assert Path(pipeline.base_path).exists() + compiler.compile( + dataset=dataset, + workspace=workspace, + output_path=output_path, + build_args=[], + ) + assert Path(workspace.base_path).exists() @pytest.mark.usefixtures("_freeze_time") @@ -871,7 +916,7 @@ def test_docker_compiler_create_local_base_path_propagate_exception( tmp_path_factory, ): """Test compiling a pipeline to docker-compose.""" - example_dir, pipeline, _ = setup_pipeline + example_dir, workspace, dataset, _ = setup_pipeline compiler = DockerCompiler() msg = re.escape( "Unable to create and mount local base path. 
", @@ -881,6 +926,11 @@ def test_docker_compiler_create_local_base_path_propagate_exception( ValueError, match=msg, ): - pipeline.base_path = "/my-artifacts" + workspace.base_path = "/my-artifacts" output_path = str(fn / "docker-compose.yml") - compiler.compile(pipeline=pipeline, output_path=output_path, build_args=[]) + compiler.compile( + dataset=dataset, + workspace=workspace, + output_path=output_path, + build_args=[], + ) diff --git a/tests/pipeline/test_lightweight_component.py b/tests/pipeline/test_lightweight_component.py index f9f15d06..69b5c968 100644 --- a/tests/pipeline/test_lightweight_component.py +++ b/tests/pipeline/test_lightweight_component.py @@ -13,7 +13,7 @@ from fondant.component import DaskLoadComponent, PandasTransformComponent from fondant.core.component_spec import OperationSpec from fondant.core.exceptions import InvalidLightweightComponent -from fondant.dataset import Image, Pipeline, lightweight_component +from fondant.dataset import Dataset, Image, Workspace, lightweight_component from fondant.dataset.compiler import DockerCompiler from fondant.testing import DockerComposeConfigs @@ -29,7 +29,7 @@ def default_fondant_image(): @pytest.fixture() def load_pipeline(caplog): - pipeline = Pipeline( + workspace = Workspace( name="dummy-pipeline", base_path="./data", ) @@ -53,12 +53,13 @@ def load(self) -> dd.DataFrame: load_script = CreateData.image().script - dataset = pipeline.read( + dataset = Dataset.read( ref=CreateData, + workspace=workspace, ) caplog_records = caplog.records - return pipeline, dataset, load_script, caplog_records + return workspace, dataset, load_script, caplog_records def test_build_python_script(load_pipeline): @@ -91,10 +92,10 @@ def load(self) -> dd.DataFrame: def test_lightweight_component_sdk(default_fondant_image, load_pipeline): - pipeline, dataset, load_script, caplog_records = load_pipeline + workspace, dataset, load_script, caplog_records = load_pipeline - assert len(pipeline._graph.keys()) == 1 - operation_spec_dict = pipeline._graph["createdata"][ + assert len(dataset._graph.keys()) == 1 + operation_spec_dict = dataset._graph["createdata"][ "operation" ].operation_spec.to_dict() assert operation_spec_dict == { @@ -142,11 +143,11 @@ def transform(self, dataframe: pd.DataFrame) -> pd.DataFrame: ref=AddN, arguments={"n": 1}, ) - assert len(pipeline._graph.keys()) == 1 + 1 - assert pipeline._graph["addn"]["dependencies"] == ["createdata"] - pipeline._graph["addn"]["operation"].operation_spec.to_json() + assert len(dataset._graph.keys()) == 1 + 1 + assert dataset._graph["addn"]["dependencies"] == ["createdata"] + dataset._graph["addn"]["operation"].operation_spec.to_json() - operation_spec_dict = pipeline._graph["addn"]["operation"].operation_spec.to_dict() + operation_spec_dict = dataset._graph["addn"]["operation"].operation_spec.to_dict() assert operation_spec_dict == { "specification": { "name": "AddN", @@ -170,9 +171,9 @@ def transform(self, dataframe: pd.DataFrame) -> pd.DataFrame: }, "produces": {}, } - pipeline._validate_pipeline_definition(run_id="dummy-run-id") + dataset._validate_workspace_definition(run_id="dummy-run-id", workspace=workspace) - DockerCompiler().compile(pipeline) + DockerCompiler().compile(dataset=dataset, workspace=workspace) def test_consumes_mapping_all_fields(tmp_path_factory, load_pipeline): @@ -192,7 +193,7 @@ def transform(self, dataframe: pd.DataFrame) -> pd.DataFrame: dataframe["a"] = dataframe["a"].map(lambda x: x + self.n) return dataframe - pipeline, dataset, _, _ = load_pipeline + workspace, 
dataset, _, _ = load_pipeline _ = dataset.apply( ref=AddN, @@ -202,7 +203,11 @@ def transform(self, dataframe: pd.DataFrame) -> pd.DataFrame: with tmp_path_factory.mktemp("temp") as fn: output_path = str(fn / "kubeflow_pipeline.yml") - DockerCompiler().compile(pipeline=pipeline, output_path=output_path) + DockerCompiler().compile( + dataset=dataset, + workspace=workspace, + output_path=output_path, + ) pipeline_configs = DockerComposeConfigs.from_spec(output_path) operation_spec = OperationSpec.from_json( pipeline_configs.component_configs["addn"].arguments["operation_spec"], @@ -228,9 +233,9 @@ def transform(self, dataframe: pd.DataFrame) -> pd.DataFrame: dataframe["a"] = dataframe["a"].map(lambda x: x + self.n) return dataframe - pipeline, dataset, _, _ = load_pipeline + workspace, dataset, _, _ = load_pipeline - _ = dataset.apply( + dataset = dataset.apply( ref=AddN, consumes={"a": "x"}, arguments={"n": 1}, @@ -238,7 +243,11 @@ def transform(self, dataframe: pd.DataFrame) -> pd.DataFrame: with tmp_path_factory.mktemp("temp") as fn: output_path = str(fn / "kubeflow_pipeline.yml") - DockerCompiler().compile(pipeline=pipeline, output_path=output_path) + DockerCompiler().compile( + dataset=dataset, + workspace=workspace, + output_path=output_path, + ) pipeline_configs = DockerComposeConfigs.from_spec(output_path) operation_spec = OperationSpec.from_json( pipeline_configs.component_configs["addn"].arguments["operation_spec"], @@ -265,9 +274,9 @@ def transform(self, dataframe: pd.DataFrame) -> pd.DataFrame: dataframe["a"] = dataframe["x"].map(lambda x: x + self.n) return dataframe - pipeline, dataset, _, _ = load_pipeline + workspace, dataset, _, _ = load_pipeline - _ = dataset.apply( + dataset = dataset.apply( ref=AddN, consumes={"x": pa.int32()}, arguments={"n": 1}, @@ -275,7 +284,11 @@ def transform(self, dataframe: pd.DataFrame) -> pd.DataFrame: with tmp_path_factory.mktemp("temp") as fn: output_path = str(fn / "kubeflow_pipeline.yml") - DockerCompiler().compile(pipeline=pipeline, output_path=output_path) + DockerCompiler().compile( + dataset=dataset, + workspace=workspace, + output_path=output_path, + ) pipeline_configs = DockerComposeConfigs.from_spec(output_path) operation_spec = OperationSpec.from_json( pipeline_configs.component_configs["addn"].arguments["operation_spec"], @@ -303,9 +316,9 @@ def transform(self, dataframe: pd.DataFrame) -> pd.DataFrame: dataframe["c"] = dataframe["x"].map(lambda x: x + self.n) return dataframe - pipeline, dataset, _, _ = load_pipeline + workspace, dataset, _, _ = load_pipeline - _ = dataset.apply( + dataset = dataset.apply( ref=AddN, consumes={"x": pa.int32()}, produces={"a": pa.int32(), "b": pa.int32(), "c": pa.int32()}, @@ -314,7 +327,11 @@ def transform(self, dataframe: pd.DataFrame) -> pd.DataFrame: with tmp_path_factory.mktemp("temp") as fn: output_path = str(fn / "kubeflow_pipeline.yml") - DockerCompiler().compile(pipeline=pipeline, output_path=output_path) + DockerCompiler().compile( + dataset=dataset, + workspace=workspace, + output_path=output_path, + ) pipeline_configs = DockerComposeConfigs.from_spec(output_path) operation_spec = OperationSpec.from_json( pipeline_configs.component_configs["addn"].arguments["operation_spec"], @@ -323,7 +340,7 @@ def transform(self, dataframe: pd.DataFrame) -> pd.DataFrame: def test_lightweight_component_missing_decorator(): - pipeline = Pipeline( + workspace = Workspace( name="dummy-pipeline", base_path="./data", ) @@ -333,9 +350,10 @@ def load(self) -> str: return "bar" with 
pytest.raises(InvalidLightweightComponent): - _ = pipeline.read( + Dataset.read( ref=Foo, produces={"x": pa.int32(), "y": pa.int32()}, + workspace=workspace, ) @@ -354,17 +372,18 @@ def load(self) -> dd.DataFrame: ) return dd.from_pandas(df, npartitions=1) - pipeline = Pipeline( + workspace = Workspace( name="dummy-pipeline", base_path="./data", ) - pipeline.read( + dataset = Dataset.read( ref=CreateData, + workspace=workspace, ) - assert len(pipeline._graph.keys()) == 1 - operation_spec = pipeline._graph["createdata"]["operation"].operation_spec.to_json() + assert len(dataset._graph.keys()) == 1 + operation_spec = dataset._graph["createdata"]["operation"].operation_spec.to_json() operation_spec_without_image = json.loads(operation_spec) assert operation_spec_without_image == { @@ -443,17 +462,18 @@ class CreateData(DaskLoadComponent): def load(self) -> dd.DataFrame: return None - pipeline = Pipeline( + workspace = Workspace( name="dummy-pipeline", base_path="./data", ) - pipeline.read( + dataset = Dataset.read( ref=CreateData, + workspace=workspace, ) - assert len(pipeline._graph.keys()) == 1 - operation_spec = pipeline._graph["createdata"]["operation"].operation_spec.to_json() + assert len(dataset._graph.keys()) == 1 + operation_spec = dataset._graph["createdata"]["operation"].operation_spec.to_json() operation_spec_without_image = json.loads(operation_spec) assert operation_spec_without_image == { @@ -512,7 +532,7 @@ def test_infer_consumes_if_not_defined(load_pipeline): Test that the consumes mapping is inferred when not defined in dataset interface. All columns of the dataset are consumed. """ - _, dataset, _, _ = load_pipeline + workspace, dataset, _, _ = load_pipeline @lightweight_component( base_image="python:3.10-slim-buster", @@ -528,9 +548,7 @@ def transform(self, dataframe: pd.DataFrame) -> pd.DataFrame: ref=Bar, ) - operation_spec_dict = dataset.pipeline._graph["bar"][ - "operation" - ].operation_spec.to_dict() + operation_spec_dict = dataset._graph["bar"]["operation"].operation_spec.to_dict() assert operation_spec_dict == { "consumes": { "x": {"type": "int32"}, @@ -561,7 +579,7 @@ def test_infer_consumes_if_additional_properties_true(load_pipeline): Test when additional properties is true (no consumes defined in the lightweight component), the consumes is inferred from the dataset interface. 
""" - _, dataset, _, _ = load_pipeline + workspace, dataset, _, _ = load_pipeline @lightweight_component( base_image="python:3.10-slim-buster", @@ -576,9 +594,7 @@ def transform(self, dataframe: pd.DataFrame) -> pd.DataFrame: ref=Bar, ) - operation_spec_dict = dataset.pipeline._graph["bar"][ - "operation" - ].operation_spec.to_dict() + operation_spec_dict = dataset._graph["bar"]["operation"].operation_spec.to_dict() assert operation_spec_dict == { "consumes": { "x": {"type": "int32"}, diff --git a/tests/pipeline/test_pipeline.py b/tests/pipeline/test_pipeline.py index 76e37c05..d7bcdf58 100644 --- a/tests/pipeline/test_pipeline.py +++ b/tests/pipeline/test_pipeline.py @@ -10,13 +10,14 @@ import yaml from fondant.component import DaskLoadComponent from fondant.core.component_spec import ComponentSpec -from fondant.core.exceptions import InvalidPipelineDefinition +from fondant.core.exceptions import InvalidWorkspaceDefinition from fondant.core.schema import Field, Type from fondant.dataset import ( ComponentOp, + Dataset, Image, - Pipeline, Resources, + Workspace, lightweight_component, ) @@ -58,7 +59,7 @@ def test_component_op( arguments=component_args, ) - with pytest.raises(InvalidPipelineDefinition): + with pytest.raises(InvalidWorkspaceDefinition): ComponentOp( Path(components_path / component_names[0]), arguments=component_args, @@ -67,7 +68,7 @@ def test_component_op( ), ) - with pytest.raises(InvalidPipelineDefinition): + with pytest.raises(InvalidWorkspaceDefinition): ComponentOp( Path(components_path / component_names[0]), arguments=component_args, @@ -200,16 +201,18 @@ def test_valid_pipeline( component_args = {"storage_args": "a dummy string arg"} components_path = Path(valid_pipeline_path / example_dir) - pipeline = Pipeline(**default_pipeline_args) + workspace = Workspace(**default_pipeline_args) # override the default package_path with temporary path to avoid the creation of artifacts - monkeypatch.setattr(pipeline, "package_path", str(tmp_path / "test_pipeline.tgz")) + monkeypatch.setattr(workspace, "package_path", str(tmp_path / "test_pipeline.tgz")) - dataset = pipeline.read( + dataset = Dataset.read( Path(components_path / component_names[0]), arguments=component_args, produces={"images_array": pa.binary()}, + workspace=workspace, ) + dataset = dataset.apply( Path(components_path / component_names[1]), arguments=component_args, @@ -227,19 +230,19 @@ def test_valid_pipeline( consumes={"images_array": pa.binary()}, ) - pipeline.sort_graph() - assert list(pipeline._graph.keys()) == [ + dataset.sort_graph() + assert list(dataset._graph.keys()) == [ "first_component", "second_component", "third_component", "fourth_component", ] - assert pipeline._graph["first_component"]["dependencies"] == [] - assert pipeline._graph["second_component"]["dependencies"] == ["first_component"] - assert pipeline._graph["third_component"]["dependencies"] == ["second_component"] - assert pipeline._graph["fourth_component"]["dependencies"] == ["third_component"] + assert dataset._graph["first_component"]["dependencies"] == [] + assert dataset._graph["second_component"]["dependencies"] == ["first_component"] + assert dataset._graph["third_component"]["dependencies"] == ["second_component"] + assert dataset._graph["fourth_component"]["dependencies"] == ["third_component"] - pipeline._validate_pipeline_definition("test_pipeline") + dataset._validate_workspace_definition("test_pipeline", workspace) def test_invalid_pipeline_schema( @@ -251,22 +254,23 @@ def test_invalid_pipeline_schema( component_args = 
{"storage_args": "a dummy string arg"} components_path = Path(valid_pipeline_path / "example_1") - pipeline = Pipeline(**default_pipeline_args) + workspace = Workspace(**default_pipeline_args) # override the default package_path with temporary path to avoid the creation of artifacts monkeypatch.setattr( - pipeline, + workspace, "package_path", str(tmp_path / "test_pipeline.tgz"), ) - dataset = pipeline.read( + dataset = Dataset.read( Path(components_path / "first_component"), arguments=component_args, produces={"images_array": pa.binary()}, + workspace=workspace, ) # "images_pictures" does not exist in the dataset - with pytest.raises(InvalidPipelineDefinition): + with pytest.raises(InvalidWorkspaceDefinition): dataset.apply( Path(components_path / "second_component"), arguments=component_args, @@ -274,7 +278,7 @@ def test_invalid_pipeline_schema( ) # "images_array" does not exist in the component spec - with pytest.raises(InvalidPipelineDefinition): + with pytest.raises(InvalidWorkspaceDefinition): dataset.apply( Path(components_path / "second_component"), arguments=component_args, @@ -283,7 +287,7 @@ def test_invalid_pipeline_schema( # Extra field in the consumes mapping that does not have a corresponding field # in the dataset - with pytest.raises(InvalidPipelineDefinition): + with pytest.raises(InvalidWorkspaceDefinition): dataset.apply( Path(components_path / "second_component"), arguments=component_args, @@ -313,23 +317,19 @@ def test_invalid_pipeline_dependencies(default_pipeline_args, valid_pipeline_exa components_path = Path(valid_pipeline_path / example_dir) component_args = {"storage_args": "a dummy string arg"} - pipeline = Pipeline(**default_pipeline_args) + workspace = Workspace(**default_pipeline_args) - dataset = pipeline.read( + dataset = Dataset.read( Path(components_path / component_names[0]), arguments=component_args, produces={"image_data": pa.binary()}, + workspace=workspace, ) + dataset = dataset.apply( Path(components_path / component_names[1]), arguments=component_args, ) - with pytest.raises(InvalidPipelineDefinition): - pipeline.read( - Path(components_path / component_names[2]), - arguments=component_args, - produces={"image_data": pa.binary()}, - ) @pytest.mark.parametrize( @@ -351,20 +351,21 @@ def test_invalid_pipeline_declaration( components_path = Path(invalid_pipeline_path / example_dir) component_args = {"storage_args": "a dummy string arg"} - pipeline = Pipeline(**default_pipeline_args) + workspace = Workspace(**default_pipeline_args) - dataset = pipeline.read( + dataset = Dataset.read( Path(components_path / component_names[0]), arguments=component_args, produces={"image_data": pa.binary()}, + workspace=workspace, ) dataset.apply( Path(components_path / component_names[1]), arguments=component_args, ) - with pytest.raises(InvalidPipelineDefinition): - pipeline._validate_pipeline_definition("test_pipeline") + with pytest.raises(InvalidWorkspaceDefinition): + dataset._validate_workspace_definition("test_pipeline", workspace) def test_reusable_component_op(): @@ -407,25 +408,26 @@ def test_defining_reusable_component_op_with_custom_spec(): ) -def test_pipeline_name(): - Pipeline(name="valid-name", base_path="base_path") - with pytest.raises(InvalidPipelineDefinition, match="The pipeline name violates"): - Pipeline(name="invalid name", base_path="base_path") +def test_workspace_name(): + Workspace(name="valid-name", base_path="base_path") + with pytest.raises(InvalidWorkspaceDefinition, match="The workspace name violates"): + Workspace(name="invalid name", 
base_path="base_path") def test_schema_propagation(): """Test that the schema is propagated correctly between datasets taking into account the component specs and `consumes` and `produces` arguments. """ - pipeline = Pipeline(name="pipeline", base_path="base_path") + workspace = Workspace(name="pipeline", base_path="base_path") - pipeline.get_run_id = lambda: "pipeline-id" + workspace.get_run_id = lambda: "pipeline-id" - dataset = pipeline.read( + dataset = Dataset.read( "load_from_hf_hub", produces={ "image": pa.binary(), }, + workspace=workspace, ) assert dataset.fields == { @@ -501,15 +503,16 @@ def test_invoked_field_schema_raise_exception(): """Test that check if the invoked field schema not matches the current schema raise an InvalidPipelineDefinition. """ - pipeline = Pipeline(name="pipeline", base_path="base_path") + workspace = Workspace(name="pipeline", base_path="base_path") - pipeline.get_run_id = lambda: "pipeline-id" + workspace.get_run_id = lambda: "pipeline-id" - dataset = pipeline.read( + dataset = Dataset.read( "load_from_hf_hub", produces={ "image": pa.binary(), }, + workspace=workspace, ) dataset.write( @@ -526,8 +529,8 @@ def test_invoked_field_schema_raise_exception(): "it with this type:\nType(DataType(string))", ) - with pytest.raises(InvalidPipelineDefinition, match=expected_error_msg): - pipeline.validate("pipeline-id") + with pytest.raises(InvalidWorkspaceDefinition, match=expected_error_msg): + dataset.validate("pipeline-id", workspace=workspace) @pytest.mark.parametrize( @@ -555,15 +558,16 @@ def test_infer_consumes_if_not_defined( component_args = {"storage_args": "a dummy string arg"} components_path = Path(valid_pipeline_path / example_dir) - pipeline = Pipeline(**default_pipeline_args) + workspace = Workspace(**default_pipeline_args) # override the default package_path with temporary path to avoid the creation of artifacts - monkeypatch.setattr(pipeline, "package_path", str(tmp_path / "test_pipeline.tgz")) + monkeypatch.setattr(workspace, "package_path", str(tmp_path / "test_pipeline.tgz")) - dataset = pipeline.read( + dataset = Dataset.read( Path(components_path / component_names[0]), arguments=component_args, produces={"images_array": pa.binary()}, + workspace=workspace, ) # Empty consumes & additionalProperties=False -> infer component spec defined columns @@ -573,9 +577,9 @@ def test_infer_consumes_if_not_defined( arguments=component_args, ) - assert dataset.pipeline._graph["second_component"][ - "operation" - ].operation_spec.to_dict()["consumes"] == { + assert dataset._graph["second_component"]["operation"].operation_spec.to_dict()[ + "consumes" + ] == { "images_data": {"type": "binary"}, } @@ -586,9 +590,9 @@ def test_infer_consumes_if_not_defined( arguments=component_args, ) - assert dataset.pipeline._graph["third_component"][ - "operation" - ].operation_spec.to_dict()["consumes"] == { + assert dataset._graph["third_component"]["operation"].operation_spec.to_dict()[ + "consumes" + ] == { "images_data": {"type": "binary"}, "embeddings_data": {"items": {"type": "float32"}, "type": "array"}, } @@ -605,9 +609,9 @@ def test_infer_consumes_if_not_defined( arguments=component_args, ) - assert dataset.pipeline._graph["fourth_component"][ - "operation" - ].operation_spec.to_dict()["consumes"] == { + assert dataset._graph["fourth_component"]["operation"].operation_spec.to_dict()[ + "consumes" + ] == { "images_data": {"type": "binary"}, "images_array": {"type": "binary"}, "embeddings_data": {"items": {"type": "float32"}, "type": "array"}, @@ -622,15 +626,16 @@ 
def test_consumes_name_to_name_mapping( """Test that a valid pipeline definition can be compiled without errors.""" component_args = {"storage_args": "a dummy string arg"} components_path = Path(valid_pipeline_path / "example_1") - pipeline = Pipeline(**default_pipeline_args) + workspace = Workspace(**default_pipeline_args) # override the default package_path with temporary path to avoid the creation of artifacts - monkeypatch.setattr(pipeline, "package_path", str(tmp_path / "test_pipeline.tgz")) + monkeypatch.setattr(workspace, "package_path", str(tmp_path / "test_pipeline.tgz")) - dataset = pipeline.read( + dataset = Dataset.read( Path(components_path / "first_component"), arguments=component_args, produces={"images_data": pa.binary(), "second_field": pa.string()}, + workspace=workspace, ) dataset.apply( @@ -639,8 +644,26 @@ def test_consumes_name_to_name_mapping( consumes={"images_data": "images_data"}, ) - assert dataset.pipeline._graph["fourth_component"][ - "operation" - ].operation_spec.to_dict()["consumes"] == { + assert dataset._graph["fourth_component"]["operation"].operation_spec.to_dict()[ + "consumes" + ] == { "images_data": {"type": "binary"}, } + + +def test_new_pipeline_interface(): + workspace = Workspace(name="test_workspace", base_path="some/path") + + components_path = Path(valid_pipeline_path / "example_1") + + dataset = Dataset.read( + Path(components_path / "first_component"), + produces={"images_data": pa.binary()}, + workspace=workspace, + ) + + dataset = dataset.apply( + Path(components_path / "second_component"), + consumes={"images_data": "images_data"}, + ) + assert len(dataset._graph) == 2 # noqa PLR2004 diff --git a/tests/pipeline/test_runner.py b/tests/pipeline/test_runner.py index d55ae8f8..011af323 100644 --- a/tests/pipeline/test_runner.py +++ b/tests/pipeline/test_runner.py @@ -6,7 +6,7 @@ from unittest import mock import pytest -from fondant.dataset import Pipeline +from fondant.dataset import Dataset, Workspace from fondant.dataset.runner import ( DockerRunner, KubeflowRunner, @@ -16,7 +16,7 @@ VALID_PIPELINE = Path("./tests/pipeline/examples/pipelines/compiled_pipeline/") -PIPELINE = Pipeline( +WORKSPACE = Workspace( name="testpipeline", description="description of the test pipeline", base_path="/foo/bar", @@ -42,7 +42,7 @@ def mock_check_docker_compose_install(self): def test_docker_runner(mock_docker_installation): """Test that the docker runner while mocking subprocess.call.""" with mock.patch("subprocess.call") as mock_call: - DockerRunner().run("some/path") + DockerRunner().run("some/path", workspace=WORKSPACE) mock_call.assert_called_once_with( [ "docker", @@ -63,12 +63,14 @@ def test_docker_runner_from_pipeline(mock_docker_installation, tmp_path_factory) with mock.patch("subprocess.call") as mock_call, tmp_path_factory.mktemp( "temp", ) as fn: - pipeline = Pipeline( + workspace = Workspace( name="testpipeline", description="description of the test pipeline", base_path=str(fn), ) - DockerRunner().run(pipeline) + + dataset = Dataset() + DockerRunner().run(dataset, workspace) mock_call.assert_called_once_with( [ "docker", @@ -164,7 +166,7 @@ def test_kubeflow_runner(): new=MockKfpClient, ): runner = KubeflowRunner(host="some_host") - runner.run(input=input_spec_path) + runner.run(dataset=input_spec_path, workspace=WORKSPACE) assert runner.client.host == "some_host" @@ -176,7 +178,11 @@ def test_kubeflow_runner_new_experiment(): new=MockKfpClient, ): runner = KubeflowRunner(host="some_host") - runner.run(input=input_spec_path, 
experiment_name="NewExperiment") + runner.run( + dataset=input_spec_path, + experiment_name="NewExperiment", + workspace=WORKSPACE, + ) def test_kfp_import(): @@ -194,7 +200,8 @@ def test_kfp_import(): class MockKubeFlowCompiler: def compile( self, - pipeline, + dataset, + workspace, output_path, ) -> None: with open(output_path, "w") as f: @@ -212,8 +219,10 @@ def test_kubeflow_runner_from_pipeline(): new=MockKfpClient, ): runner = KubeflowRunner(host="some_host") + dataset = Dataset() runner.run( - input=PIPELINE, + dataset=dataset, + workspace=WORKSPACE, ) mock_run.assert_called_once_with( @@ -228,7 +237,7 @@ def test_vertex_runner(): "google.cloud.aiplatform.PipelineJob", ): runner = VertexRunner(project_id="some_project", region="some_region") - runner.run(input=input_spec_path) + runner.run(dataset=input_spec_path, workspace=WORKSPACE) # test with service account runner2 = VertexRunner( @@ -236,7 +245,7 @@ def test_vertex_runner(): region="some_region", service_account="some_account", ) - runner2.run(input=input_spec_path) + runner2.run(dataset=input_spec_path, workspace=WORKSPACE) def test_vertex_runner_from_pipeline(): @@ -249,7 +258,8 @@ def test_vertex_runner_from_pipeline(): ): runner = VertexRunner(project_id="some_project", region="some_region") runner.run( - input=PIPELINE, + dataset=Dataset(), + workspace=WORKSPACE, ) mock_run.assert_called_once_with(".fondant/vertex-pipeline.yaml") @@ -265,7 +275,8 @@ def test_sagemaker_runner(tmp_path_factory): runner = SagemakerRunner() runner.run( - input=tmpdir / "spec.json", + dataset=tmpdir / "spec.json", + workspace=WORKSPACE, pipeline_name="pipeline_1", role_arn="arn:something", ) @@ -291,7 +302,8 @@ def test_sagemaker_runner(tmp_path_factory): ) runner.run( - input=tmpdir / "spec.json", + dataset=tmpdir / "spec.json", + workspace=WORKSPACE, pipeline_name="pipeline_1", role_arn="arn:something", ) @@ -313,7 +325,8 @@ def test_sagemaker_runner(tmp_path_factory): class MockSagemakerCompiler: def compile( self, - pipeline, + dataset, + workspace, output_path, *, role_arn, @@ -329,7 +342,8 @@ def test_sagemaker_runner_from_pipeline(): ), mock.patch("boto3.client", spec=True): runner = SagemakerRunner() runner.run( - input=PIPELINE, - pipeline_name=PIPELINE.name, + dataset=Dataset(), + workspace=WORKSPACE, + pipeline_name=WORKSPACE.name, role_arn="arn:something", ) diff --git a/tests/test_cli.py b/tests/test_cli.py index b82c6a95..5c8cabcb 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -7,16 +7,16 @@ import pytest from fondant.cli import ( ComponentImportError, - PipelineImportError, + DatasetImportError, build, compile_kfp, compile_local, compile_sagemaker, compile_vertex, component_from_module, + dataset_from_string, execute, get_module, - pipeline_from_string, run_kfp, run_local, run_vertex, @@ -24,7 +24,7 @@ from fondant.component import DaskLoadComponent from fondant.component.executor import Executor, ExecutorFactory from fondant.core.schema import CloudCredentialsMount -from fondant.dataset import Pipeline +from fondant.dataset import Dataset, Workspace from fondant.dataset.runner import DockerRunner commands = [ @@ -68,7 +68,8 @@ def test_basic_invocation(command): assert process.returncode == 0 -TEST_PIPELINE = Pipeline("test_pipeline", base_path="some/path") +TEST_DATASET = Dataset(name="test_dataset", run_id="run-id-1") +TEST_WORKSPACE = Workspace("test_workspace", base_path="/dummy/path") @pytest.mark.parametrize( @@ -124,17 +125,17 @@ def test_component_from_module_error(module_str): "module_str", [ __name__, - 
"examples.example_modules.pipeline", - "examples.example_modules.pipeline:pipeline", - "examples.example_modules.pipeline:create_pipeline", - "examples.example_modules.pipeline:create_pipeline_with_args('test_pipeline')", - "examples.example_modules.pipeline:create_pipeline_with_args(name='test_pipeline')", + "examples.example_modules.dataset", + "examples.example_modules.dataset:workspace", + "examples.example_modules.dataset:create_dataset", + "examples.example_modules.dataset:create_dataset_with_args('test_dataset')", + "examples.example_modules.dataset:create_dataset_with_args(name='test_dataset')", ], ) def test_pipeline_from_module(module_str): """Test that pipeline_from_string works.""" - pipeline = pipeline_from_string(module_str) - assert pipeline.name == "test_pipeline" + dataset = dataset_from_string(module_str) + assert dataset.name == "test_dataset" @pytest.mark.parametrize( @@ -143,35 +144,35 @@ def test_pipeline_from_module(module_str): # module does not contain a pipeline instance "examples.example_modules.component", # module contains many pipeline instances - "examples.example_modules.invalid_double_pipeline", + "examples.example_modules.invalid_double_workspace", # Factory expects an argument - "examples.example_modules.pipeline:create_pipeline_with_args", + "examples.example_modules.dataset:create_pipeline_with_args", # Factory does not expect an argument - "examples.example_modules.pipeline:create_pipeline('test_pipeline')", + "examples.example_modules.dataset:create_pipeline('test_pipeline')", # Factory does not expect an argument - "examples.example_modules.pipeline:create_pipeline(name='test_pipeline')", + "examples.example_modules.dataset:create_pipeline(name='test_pipeline')", # Invalid argument - "examples.example_modules.pipeline:create_pipeline(name)", + "examples.example_modules.dataset:create_pipeline(name)", # Not a variable or function - "examples.example_modules.pipeline:[]", + "examples.example_modules.dataset:[]", # Attribute doesn't exist - "examples.example_modules.pipeline:no_pipeline", + "examples.example_modules.dataset:no_pipeline", # Attribute is not a valid python name - "examples.example_modules.pipeline:pipe;line", + "examples.example_modules.dataset:pipe;line", # Not a Pipeline - "examples.example_modules.pipeline:number", + "examples.example_modules.dataset:number", ], ) def test_pipeline_from_module_error(module_str): """Test different error cases for pipeline_from_string.""" - with pytest.raises(PipelineImportError): - pipeline_from_string(module_str) + with pytest.raises(DatasetImportError): + dataset_from_string(module_str) def test_factory_error_propagated(): """Test that an error in the factory method is correctly propagated.""" with pytest.raises(NotImplementedError): - pipeline_from_string("examples.example_modules.pipeline:not_implemented") + dataset_from_string("examples.example_modules.dataset:not_implemented") def test_execute_logic(monkeypatch): @@ -201,7 +202,7 @@ def test_local_compile(tmp_path_factory): compile_local(args) mock_compiler.assert_called_once_with( - pipeline=TEST_PIPELINE, + pipeline=TEST_DATASET, extra_volumes=[], output_path=str(fn / "docker-compose.yml"), build_args=[], @@ -222,7 +223,7 @@ def test_kfp_compile(tmp_path_factory): ) compile_kfp(args) mock_compiler.assert_called_once_with( - pipeline=TEST_PIPELINE, + pipeline=TEST_DATASET, output_path=str(fn / "kubeflow_pipeline.yml"), ) @@ -240,7 +241,7 @@ def test_vertex_compile(tmp_path_factory): ) compile_vertex(args) mock_compiler.assert_called_once_with( - 
pipeline=TEST_PIPELINE, + pipeline=TEST_DATASET, output_path=str(fn / "vertex_pipeline.yml"), ) @@ -260,7 +261,7 @@ def test_sagemaker_compile(tmp_path_factory): ) compile_sagemaker(args) mock_compiler.assert_called_once_with( - pipeline=TEST_PIPELINE, + pipeline=TEST_DATASET, output_path=str(fn / "sagemaker_pipeline.json"), role_arn="some_role", ) @@ -338,13 +339,15 @@ def test_local_run_cloud_credentials(mock_docker_installation): credentials=None, extra_volumes=[], build_arg=[], + workspace=TEST_WORKSPACE, ) run_local(args) mock_compiler.assert_called_once_with( - TEST_PIPELINE, - extra_volumes=[], + TEST_DATASET, + workspace=TEST_WORKSPACE, output_path=".fondant/compose.yaml", + extra_volumes=[], build_args=[], auth_provider=auth_provider, ) @@ -372,7 +375,7 @@ def test_kfp_run(tmp_path_factory): local=False, vertex=False, output_path=None, - ref=__name__, + ref="dataset", host=None, ) with pytest.raises( @@ -386,7 +389,7 @@ def test_kfp_run(tmp_path_factory): local=False, output_path=None, host="localhost", - ref=__name__, + ref="dataset", ) run_kfp(args) mock_runner.assert_called_once_with(host="localhost") @@ -401,7 +404,7 @@ def test_kfp_run(tmp_path_factory): local=False, host="localhost2", output_path=str(fn / "kubeflow_pipelines.yml"), - ref=__name__, + ref="dataset", ) run_kfp(args) mock_runner.assert_called_once_with(host="localhost2") @@ -419,7 +422,7 @@ def test_vertex_run(tmp_path_factory): project_id="project-123", service_account=None, network=None, - ref=__name__, + ref="dataset", ) run_vertex(args) mock_runner.assert_called_once_with( @@ -440,7 +443,7 @@ def test_vertex_run(tmp_path_factory): local=False, host="localhost2", output_path=str(fn / "kubeflow_pipelines.yml"), - ref=__name__, + ref="dataset", region="europe-west-1", project_id="project-123", service_account=None,