diff --git a/components/load_from_hf_hub/fondant_component.yaml b/components/load_from_hf_hub/fondant_component.yaml index 0099a92f8..532b77d25 100644 --- a/components/load_from_hf_hub/fondant_component.yaml +++ b/components/load_from_hf_hub/fondant_component.yaml @@ -23,4 +23,8 @@ args: n_rows_to_load: description: Optional argument that defines the number of rows to load. Useful for testing pipeline runs on a small scale type: int + default: None + index_column: + description: Column to set the index to in the load component. If not specified, a default globally unique index will be set + type: str default: None \ No newline at end of file diff --git a/components/load_from_hf_hub/src/main.py b/components/load_from_hf_hub/src/main.py index 2f1492bab..013d1d277 100644 --- a/components/load_from_hf_hub/src/main.py +++ b/components/load_from_hf_hub/src/main.py @@ -3,6 +3,7 @@ import typing as t import dask.dataframe as dd +import pandas as pd from fondant.component import DaskLoadComponent from fondant.executor import DaskLoadExecutor @@ -16,7 +17,8 @@ def __init__(self, *_, column_name_mapping: dict, image_column_names: t.Optional[list], n_rows_to_load: t.Optional[int], - ) -> None: + index_column:t.Optional[str], + ) -> None: """ Args: dataset_name: name of the dataset to load. @@ -25,11 +27,14 @@ def __init__(self, *_, format the image from HF hub format to a byte string n_rows_to_load: optional argument that defines the number of rows to load. Useful for testing pipeline runs on a small scale. + index_column: Column to set the index to in the load component. If not specified, a + default globally unique index will be set. 
""" self.dataset_name = dataset_name self.column_name_mapping = column_name_mapping self.image_column_names = image_column_names self.n_rows_to_load = n_rows_to_load + self.index_column = index_column def load(self) -> dd.DataFrame: # 1) Load data, read as Dask dataframe @@ -59,11 +64,27 @@ def load(self) -> dd.DataFrame: dask_df = dask_df.head(self.n_rows_to_load, npartitions=npartitions) dask_df = dd.from_pandas(dask_df, npartitions=npartitions) - # Set monotonically increasing index - logger.info("Setting the index...") - dask_df["id"] = 1 - dask_df["id"] = dask_df.id.cumsum() - dask_df = dask_df.set_index("id", sort=True) + # 4) Set the index + if self.index_column is None: + logger.info( + "Index column not specified, setting a globally unique index", + ) + + def _set_unique_index(dataframe: pd.DataFrame, partition_info=None): + """Function that sets a unique index based on the partition and row number.""" + dataframe["id"] = 1 + dataframe["id"] = ( + str(partition_info["number"]) + + "_" + + (dataframe.id.cumsum()).astype(str) + ) + dataframe.index = dataframe.pop("id") + return dataframe + + dask_df = dask_df.map_partitions(_set_unique_index, meta=dask_df.head()) + else: + logger.info(f"Setting `{self.index_column}` as index") + dask_df = dask_df.set_index(self.index_column, drop=True) return dask_df diff --git a/docs/generic_component.md b/docs/generic_component.md index c8a15f724..09b4d15ed 100644 --- a/docs/generic_component.md +++ b/docs/generic_component.md @@ -59,6 +59,10 @@ args: description: Optional argument that defines the number of rows to load. 
Useful for testing pipeline runs on a small scale type: int default: None + index_column: + description: Column to set the index to in the load component. If not specified, a default globally unique index will be set + type: str + default: None ``` diff --git a/docs/getting_started.md b/docs/getting_started.md index fe8328695..5db78de16 100644 --- a/docs/getting_started.md +++ b/docs/getting_started.md @@ -106,6 +106,10 @@ args: description: Optional argument that defines the number of rows to load. Useful for testing pipeline runs on a small scale type: int default: None + index_column: + description: Column to set the index to in the load component. If not specified, a default globally unique index will be set + type: str + default: None ``` This is the component spec of the component we have just added to our pipelines, the only thing we have altered is the `produces` section. We have defined what subsets, fields and types this component produces. diff --git a/examples/pipelines/datacomp/components/load_from_hf_hub/fondant_component.yaml b/examples/pipelines/datacomp/components/load_from_hf_hub/fondant_component.yaml index c4cc37aa8..100320a9c 100644 --- a/examples/pipelines/datacomp/components/load_from_hf_hub/fondant_component.yaml +++ b/examples/pipelines/datacomp/components/load_from_hf_hub/fondant_component.yaml @@ -49,4 +49,8 @@ args: n_rows_to_load: description: Optional argument that defines the number of rows to load. 
Useful for testing pipeline runs on a small scale type: int + default: None + index_column: + description: Column to set the index to in the load component. If not specified, a default globally unique index will be set + type: str default: None \ No newline at end of file diff --git a/examples/pipelines/datacomp/pipeline.py b/examples/pipelines/datacomp/pipeline.py index 97db22c0f..659714d76 100644 --- a/examples/pipelines/datacomp/pipeline.py +++ b/examples/pipelines/datacomp/pipeline.py @@ -51,7 +51,6 @@ }, node_pool_label="node_pool", node_pool_name="n2-standard-128-pool", - output_partition_size="disable", ) filter_complexity_op = ComponentOp( component_dir="components/filter_text_complexity", diff --git a/examples/pipelines/datacomp/simple_pipeline.py b/examples/pipelines/datacomp/simple_pipeline.py index 30aaecee0..9b307e213 100644 --- a/examples/pipelines/datacomp/simple_pipeline.py +++ b/examples/pipelines/datacomp/simple_pipeline.py @@ -56,7 +56,6 @@ }, node_pool_label="node_pool", node_pool_name="n2-standard-128-pool", - output_partition_size="disable", ) # add ops to pipeline diff --git a/examples/pipelines/finetune_stable_diffusion/components/load_from_hf_hub/fondant_component.yaml b/examples/pipelines/finetune_stable_diffusion/components/load_from_hf_hub/fondant_component.yaml index 9c9e02adf..f79232360 100644 --- a/examples/pipelines/finetune_stable_diffusion/components/load_from_hf_hub/fondant_component.yaml +++ b/examples/pipelines/finetune_stable_diffusion/components/load_from_hf_hub/fondant_component.yaml @@ -29,3 +29,7 @@ args: description: Optional argument that defines the number of rows to load. 
Useful for testing pipeline runs on a small scale type: int default: None + index_column: + description: Column to set the index to in the load component. If not specified, a default globally unique index will be set + type: str + default: None diff --git a/examples/pipelines/starcoder/components/load_from_hub/fondant_component.yaml b/examples/pipelines/starcoder/components/load_from_hub/fondant_component.yaml index 56d2d3d41..288314b79 100644 --- a/examples/pipelines/starcoder/components/load_from_hub/fondant_component.yaml +++ b/examples/pipelines/starcoder/components/load_from_hub/fondant_component.yaml @@ -38,3 +38,7 @@ args: description: Optional argument that defines the number of rows to load. Useful for testing pipeline runs on a small scale type: int default: None + index_column: + description: Column to set the index to in the load component. If not specified, a default globally unique index will be set + type: str + default: None diff --git a/tests/example_specs/component_specs/kubeflow_component.yaml b/tests/example_specs/component_specs/kubeflow_component.yaml index b63a4b983..b00b0bd19 100644 --- a/tests/example_specs/component_specs/kubeflow_component.yaml +++ b/tests/example_specs/component_specs/kubeflow_component.yaml @@ -16,8 +16,6 @@ inputs: partitioning type: String default: None - type: String - default: None - name: storage_args description: Storage arguments type: String