From 70a18dc25060db690e5cefc1cbf316d859099774 Mon Sep 17 00:00:00 2001 From: Matthias Richter Date: Tue, 26 Mar 2024 09:09:41 +0100 Subject: [PATCH] Fixing tests --- src/fondant/cli.py | 13 ++--- src/fondant/core/manifest.py | 52 +++++++++-------- src/fondant/core/schema.py | 29 +++++++++- src/fondant/core/schemas/manifest.json | 15 ++--- src/fondant/dataset/compiler.py | 4 +- src/fondant/dataset/dataset.py | 13 ++--- tests/examples/example_modules/dataset.py | 4 +- tests/pipeline/test_lightweight_component.py | 61 ++++++-------------- tests/test_cli.py | 8 ++- 9 files changed, 99 insertions(+), 100 deletions(-) diff --git a/src/fondant/cli.py b/src/fondant/cli.py index 7515c32f..43d8c0ea 100644 --- a/src/fondant/cli.py +++ b/src/fondant/cli.py @@ -29,7 +29,7 @@ from types import ModuleType from fondant.core.schema import CloudCredentialsMount -from fondant.dataset import Dataset, Workspace +from fondant.dataset import Dataset if t.TYPE_CHECKING: from fondant.component import Component @@ -610,12 +610,9 @@ def run_local(args): # use workspace from cli command # if args.workspace exists - workspace = getattr(args, "workspace", None) - if workspace is None: - workspace = Workspace( - name="dummy_workspace", - base_path=".artifacts", - ) # TODO: handle in #887 -> retrieve global workspace or init default one + working_directory = getattr(args, "working_directory", None) + if working_directory is None: + working_directory = "./.fondant" if args.extra_volumes: extra_volumes.extend(args.extra_volumes) @@ -628,7 +625,7 @@ def run_local(args): runner = DockerRunner() runner.run( dataset=dataset, - workspace=workspace, + working_directory=working_directory, extra_volumes=extra_volumes, build_args=args.build_arg, auth_provider=args.auth_provider, diff --git a/src/fondant/core/manifest.py b/src/fondant/core/manifest.py index 3c05470e..12fbb2c1 100644 --- a/src/fondant/core/manifest.py +++ b/src/fondant/core/manifest.py @@ -95,7 +95,7 @@ def retrieve_from_filesystem(uri: str) -> Resource: def create( cls, *, - dataset_name: t.Optional[str] = None, + dataset_name: t.Optional[str] = "", run_id: str, component_id: t.Optional[str] = None, cache_key: t.Optional[str] = None, @@ -272,33 +272,36 @@ def evolve( # : PLR0912 (too many branches) evolved_manifest.update_metadata(key="component_id", value=component_id) evolved_manifest.update_metadata(key="run_id", value=run_id) - if working_directory: - evolved_manifest = self.evolve_manifest_index_and_field_locations( - component_id, - evolved_manifest, - operation_spec, - run_id, - working_directory, - ) + evolved_manifest = self.evolve_manifest_index_and_field_locations( + component_id, + evolved_manifest, + operation_spec, + run_id, + working_directory, + ) return evolved_manifest def evolve_manifest_index_and_field_locations( # noqa PLR0913 self, - component_id, - evolved_manifest, - operation_spec, - run_id, - working_dir, + component_id: str, + evolved_manifest: "Manifest", + operation_spec: OperationSpec, + run_id: str, + working_dir: t.Optional[str] = None, ): - # TODO: check when we should change the index? 
+ """Evolve the manifest index and field locations based on the component spec.""" # Update index location as this is always rewritten - evolved_manifest.add_or_update_field( - Field( + if working_dir: + field = Field.create( name="index", - location=f"{working_dir}/{self.dataset_name}/{run_id}/{component_id}", - ), - ) + working_dir=working_dir, + run_id=run_id, + component_id=component_id, + dataset_name=self.dataset_name, + ) + evolved_manifest.add_or_update_field(field, overwrite=False) + # Remove all previous fields if the component changes the index if operation_spec.previous_index: for field_name in evolved_manifest.fields: @@ -308,10 +311,13 @@ def evolve_manifest_index_and_field_locations( # noqa PLR0913 # If field was not part of the input manifest, add field to output manifest. # If field was part of the input manifest and got produced by the component, update # the manifest field. - field.location = ( - f"{working_dir}/{self.dataset_name}/{run_id}/{component_id}" + evolved_field = field.update_location( + working_dir=working_dir, + run_id=run_id, + component_id=component_id, + dataset_name=self.dataset_name, ) - evolved_manifest.add_or_update_field(field, overwrite=True) + evolved_manifest.add_or_update_field(evolved_field, overwrite=True) return evolved_manifest diff --git a/src/fondant/core/schema.py b/src/fondant/core/schema.py index 728bef5d..287747ea 100644 --- a/src/fondant/core/schema.py +++ b/src/fondant/core/schema.py @@ -261,7 +261,7 @@ def __init__( self, name: str, type: Type = Type("null"), - location: str = "", + location: t.Optional[str] = None, ) -> None: self.name = name self.type = type @@ -274,6 +274,33 @@ def __repr__(self): def __eq__(self, other): return vars(self) == vars(other) + @classmethod + def create( # noqa: PLR0913 + cls, + name: str, + run_id: str, + component_id: str, + dataset_name: str, + working_dir: t.Optional[str] = None, + ): + """Create a Field instance with the correct location based on the provided parameters.""" + if working_dir: + location = f"{working_dir}/{dataset_name}/{run_id}/{component_id}" + return Field(name=name, location=location) + return Field(name=name) + + def update_location( + self, + run_id: str, + component_id: str, + dataset_name: str, + working_dir: t.Optional[str] = None, + ): + """Update the location of the field based on the provided parameters.""" + if working_dir: + self.location = f"{working_dir}/{dataset_name}/{run_id}/{component_id}" + return self + def validate_partition_size(arg_value): if arg_value in ["disable", None, "None"]: diff --git a/src/fondant/core/schemas/manifest.json b/src/fondant/core/schemas/manifest.json index 1e65042d..dc09b09e 100644 --- a/src/fondant/core/schemas/manifest.json +++ b/src/fondant/core/schemas/manifest.json @@ -6,13 +6,13 @@ "type": "object", "properties": { "dataset_name": { - "type": "string" + "type": ["string", "null"] }, "manifest_location": { - "type": "string" + "type": ["string", "null"] }, "run_id": { - "type": "string" + "type": ["string", "null"] }, "component_id": { "type": ["string", "null"] @@ -29,10 +29,7 @@ "location": { "type": "string" } - }, - "required": [ - "location" - ] + } }, "fields": { "$ref": "#/definitions/fields" @@ -40,7 +37,6 @@ }, "required": [ "metadata", - "index", "fields" ], "definitions": { @@ -48,12 +44,11 @@ "type": "object", "properties": { "location": { - "type": "string", + "type": ["string", "null"], "pattern": "/.*" } }, "required": [ - "location", "type" ] }, diff --git a/src/fondant/dataset/compiler.py 
b/src/fondant/dataset/compiler.py index f9494bce..11ce2b5d 100644 --- a/src/fondant/dataset/compiler.py +++ b/src/fondant/dataset/compiler.py @@ -92,7 +92,7 @@ def compile( self, dataset: Dataset, *, - working_directory: t.Optional[str], + working_directory: t.Optional[str] = None, output_path: str = "docker-compose.yml", extra_volumes: t.Union[t.Optional[list], t.Optional[str]] = None, build_args: t.Optional[t.List[str]] = None, @@ -276,7 +276,7 @@ def _generate_spec( command.extend( [ "--input_manifest_path", - f"{dataset.manifest.manifest_location()}", + f"{dataset.manifest.manifest_location}", ], ) diff --git a/src/fondant/dataset/dataset.py b/src/fondant/dataset/dataset.py index 2ad3703d..e2d32d81 100644 --- a/src/fondant/dataset/dataset.py +++ b/src/fondant/dataset/dataset.py @@ -467,12 +467,8 @@ class Dataset: def __init__( self, manifest: Manifest, - name: t.Optional[str] = None, description: t.Optional[str] = None, ): - if name is not None: - self.name = self._validate_dataset_name(name) - self.description = description self._graph: t.OrderedDict[str, t.Any] = OrderedDict() self.task_without_dependencies_added = False @@ -482,7 +478,7 @@ def __init__( def _validate_dataset_name(name: str) -> str: pattern = r"^[a-z0-9][a-z0-9_-]*$" if not re.match(pattern, name): - msg = f"The workspace name violates the pattern {pattern}" + msg = f"The dataset name violates the pattern {pattern}" raise InvalidWorkspaceDefinition(msg) return name @@ -492,6 +488,11 @@ def get_run_id(name) -> str: timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S") return f"{name}-{timestamp}" + @property + def name(self) -> str: + """The name of the dataset.""" + return self.manifest.dataset_name + def register_operation( self, operation: ComponentOp, @@ -820,8 +821,6 @@ def apply( Returns: An intermediate dataset. 
""" - # TODO: add method call to retrieve workspace context, and make passing workspace optional - operation = ComponentOp.from_ref( ref, fields=self.fields, diff --git a/tests/examples/example_modules/dataset.py b/tests/examples/example_modules/dataset.py index 33da8f67..c8163bd4 100644 --- a/tests/examples/example_modules/dataset.py +++ b/tests/examples/example_modules/dataset.py @@ -2,11 +2,11 @@ def create_dataset_with_args(name): - return Dataset(name) + return Dataset.create("load_from_parquet", dataset_name=name) def create_dataset(): - return Dataset("test_dataset") + return Dataset.create("load_from_parquet", dataset_name="test_dataset") def not_implemented(): diff --git a/tests/pipeline/test_lightweight_component.py b/tests/pipeline/test_lightweight_component.py index 90c99a90..430f1046 100644 --- a/tests/pipeline/test_lightweight_component.py +++ b/tests/pipeline/test_lightweight_component.py @@ -13,7 +13,7 @@ from fondant.component import DaskLoadComponent, PandasTransformComponent from fondant.core.component_spec import OperationSpec from fondant.core.exceptions import InvalidLightweightComponent -from fondant.dataset import Dataset, Image, Workspace, lightweight_component +from fondant.dataset import Dataset, Image, lightweight_component from fondant.dataset.compiler import DockerCompiler from fondant.testing import DockerComposeConfigs @@ -29,11 +29,6 @@ def default_fondant_image(): @pytest.fixture() def load_pipeline(caplog): - workspace = Workspace( - name="dummy-pipeline", - base_path="./data", - ) - @lightweight_component( base_image="python:3.10-slim-buster", extra_requires=["pandas", "dask"], @@ -53,17 +48,17 @@ def load(self) -> dd.DataFrame: load_script = CreateData.image().script - dataset = Dataset.read( + dataset = Dataset.create( ref=CreateData, - workspace=workspace, + dataset_name="dummy-dataset", ) caplog_records = caplog.records - return workspace, dataset, load_script, caplog_records + return dataset, load_script, caplog_records def test_build_python_script(load_pipeline): - _, _, load_script, _ = load_pipeline + _, load_script, _ = load_pipeline assert load_script == textwrap.dedent( """\ from typing import * @@ -92,7 +87,7 @@ def load(self) -> dd.DataFrame: def test_lightweight_component_sdk(default_fondant_image, load_pipeline): - workspace, dataset, load_script, caplog_records = load_pipeline + dataset, load_script, caplog_records = load_pipeline assert len(dataset._graph.keys()) == 1 operation_spec_dict = dataset._graph["createdata"][ @@ -171,9 +166,9 @@ def transform(self, dataframe: pd.DataFrame) -> pd.DataFrame: }, "produces": {}, } - dataset._validate_dataset_definition(run_id="dummy-run-id", workspace=workspace) + dataset._validate_dataset_definition(run_id="dummy-run-id") - DockerCompiler().compile(dataset=dataset, workspace=workspace) + DockerCompiler().compile(dataset=dataset) def test_consumes_mapping_all_fields(tmp_path_factory, load_pipeline): @@ -193,7 +188,7 @@ def transform(self, dataframe: pd.DataFrame) -> pd.DataFrame: dataframe["a"] = dataframe["a"].map(lambda x: x + self.n) return dataframe - workspace, dataset, _, _ = load_pipeline + dataset, _, _ = load_pipeline _ = dataset.apply( ref=AddN, @@ -205,7 +200,6 @@ def transform(self, dataframe: pd.DataFrame) -> pd.DataFrame: output_path = str(fn / "kubeflow_pipeline.yml") DockerCompiler().compile( dataset=dataset, - workspace=workspace, output_path=output_path, ) pipeline_configs = DockerComposeConfigs.from_spec(output_path) @@ -233,7 +227,7 @@ def transform(self, dataframe: 
pd.DataFrame) -> pd.DataFrame: dataframe["a"] = dataframe["a"].map(lambda x: x + self.n) return dataframe - workspace, dataset, _, _ = load_pipeline + dataset, _, _ = load_pipeline dataset = dataset.apply( ref=AddN, @@ -245,7 +239,6 @@ def transform(self, dataframe: pd.DataFrame) -> pd.DataFrame: output_path = str(fn / "kubeflow_pipeline.yml") DockerCompiler().compile( dataset=dataset, - workspace=workspace, output_path=output_path, ) pipeline_configs = DockerComposeConfigs.from_spec(output_path) @@ -274,7 +267,7 @@ def transform(self, dataframe: pd.DataFrame) -> pd.DataFrame: dataframe["a"] = dataframe["x"].map(lambda x: x + self.n) return dataframe - workspace, dataset, _, _ = load_pipeline + dataset, _, _ = load_pipeline dataset = dataset.apply( ref=AddN, @@ -286,7 +279,6 @@ def transform(self, dataframe: pd.DataFrame) -> pd.DataFrame: output_path = str(fn / "kubeflow_pipeline.yml") DockerCompiler().compile( dataset=dataset, - workspace=workspace, output_path=output_path, ) pipeline_configs = DockerComposeConfigs.from_spec(output_path) @@ -316,7 +308,7 @@ def transform(self, dataframe: pd.DataFrame) -> pd.DataFrame: dataframe["c"] = dataframe["x"].map(lambda x: x + self.n) return dataframe - workspace, dataset, _, _ = load_pipeline + dataset, _, _ = load_pipeline dataset = dataset.apply( ref=AddN, @@ -329,7 +321,6 @@ def transform(self, dataframe: pd.DataFrame) -> pd.DataFrame: output_path = str(fn / "kubeflow_pipeline.yml") DockerCompiler().compile( dataset=dataset, - workspace=workspace, output_path=output_path, ) pipeline_configs = DockerComposeConfigs.from_spec(output_path) @@ -340,20 +331,14 @@ def transform(self, dataframe: pd.DataFrame) -> pd.DataFrame: def test_lightweight_component_missing_decorator(): - workspace = Workspace( - name="dummy-pipeline", - base_path="./data", - ) - class Foo(DaskLoadComponent): def load(self) -> str: return "bar" with pytest.raises(InvalidLightweightComponent): - Dataset.read( + Dataset.create( ref=Foo, produces={"x": pa.int32(), "y": pa.int32()}, - workspace=workspace, ) @@ -372,14 +357,8 @@ def load(self) -> dd.DataFrame: ) return dd.from_pandas(df, npartitions=1) - workspace = Workspace( - name="dummy-pipeline", - base_path="./data", - ) - - dataset = Dataset.read( + dataset = Dataset.create( ref=CreateData, - workspace=workspace, ) assert len(dataset._graph.keys()) == 1 @@ -462,14 +441,8 @@ class CreateData(DaskLoadComponent): def load(self) -> dd.DataFrame: return None - workspace = Workspace( - name="dummy-pipeline", - base_path="./data", - ) - - dataset = Dataset.read( + dataset = Dataset.create( ref=CreateData, - workspace=workspace, ) assert len(dataset._graph.keys()) == 1 @@ -532,7 +505,7 @@ def test_infer_consumes_if_not_defined(load_pipeline): Test that the consumes mapping is inferred when not defined in dataset interface. All columns of the dataset are consumed. """ - workspace, dataset, _, _ = load_pipeline + dataset, _, _ = load_pipeline @lightweight_component( base_image="python:3.10-slim-buster", @@ -579,7 +552,7 @@ def test_infer_consumes_if_additional_properties_true(load_pipeline): Test when additional properties is true (no consumes defined in the lightweight component), the consumes is inferred from the dataset interface. 
""" - workspace, dataset, _, _ = load_pipeline + dataset, _, _ = load_pipeline @lightweight_component( base_image="python:3.10-slim-buster", diff --git a/tests/test_cli.py b/tests/test_cli.py index 5c8cabcb..83553977 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -23,6 +23,7 @@ ) from fondant.component import DaskLoadComponent from fondant.component.executor import Executor, ExecutorFactory +from fondant.core.manifest import Manifest from fondant.core.schema import CloudCredentialsMount from fondant.dataset import Dataset, Workspace from fondant.dataset.runner import DockerRunner @@ -68,7 +69,8 @@ def test_basic_invocation(command): assert process.returncode == 0 -TEST_DATASET = Dataset(name="test_dataset", run_id="run-id-1") +TEST_MANIFEST = Manifest.create(dataset_name="test_dataset", run_id="test_run_id") +TEST_DATASET = Dataset(manifest=TEST_MANIFEST) TEST_WORKSPACE = Workspace("test_workspace", base_path="/dummy/path") @@ -339,13 +341,13 @@ def test_local_run_cloud_credentials(mock_docker_installation): credentials=None, extra_volumes=[], build_arg=[], - workspace=TEST_WORKSPACE, + working_directory="dummy-dir", ) run_local(args) mock_compiler.assert_called_once_with( TEST_DATASET, - workspace=TEST_WORKSPACE, + working_directory="dummy-dir", output_path=".fondant/compose.yaml", extra_volumes=[], build_args=[],