Skip to content

Commit

Permalink
Add index_column and unique index creation to load_from_hf_hub component (#345)

Browse files Browse the repository at this point in the history

Based on
[this](https://stackoverflow.com/questions/47571715/dask-create-strictly-increasing-index)
approach, this PR adds a monotonically increasing index to the initially
loaded dataframe by default. This is beneficial in case the dataset does not
come with a reliable index of its own. Resetting the index should be avoided,
since it can assign duplicate indices across partitions.

The user can always disable the automatic index generation in case they have
a pre-defined index column they want to set, or they want to create their
own custom initial index.
  • Loading branch information
PhilippeMoussalli authored Aug 21, 2023
1 parent edadd15 commit 939d638
Show file tree
Hide file tree
Showing 10 changed files with 51 additions and 10 deletions.
4 changes: 4 additions & 0 deletions components/load_from_hf_hub/fondant_component.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,8 @@ args:
n_rows_to_load:
description: Optional argument that defines the number of rows to load. Useful for testing pipeline runs on a small scale
type: int
default: None
index_column:
description: Column to set index to in the load component, if not specified a default globally unique index will be set
type: str
default: None
33 changes: 27 additions & 6 deletions components/load_from_hf_hub/src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import typing as t

import dask.dataframe as dd
import pandas as pd
from fondant.component import DaskLoadComponent
from fondant.executor import DaskLoadExecutor

Expand All @@ -16,7 +17,8 @@ def __init__(self, *_,
column_name_mapping: dict,
image_column_names: t.Optional[list],
n_rows_to_load: t.Optional[int],
) -> None:
index_column:t.Optional[str],
) -> None:
"""
Args:
dataset_name: name of the dataset to load.
Expand All @@ -25,11 +27,14 @@ def __init__(self, *_,
format the image from HF hub format to a byte string
n_rows_to_load: optional argument that defines the number of rows to load. Useful for
testing pipeline runs on a small scale.
index_column: Column to set index to in the load component, if not specified a default
globally unique index will be set.
"""
self.dataset_name = dataset_name
self.column_name_mapping = column_name_mapping
self.image_column_names = image_column_names
self.n_rows_to_load = n_rows_to_load
self.index_column = index_column

def load(self) -> dd.DataFrame:
# 1) Load data, read as Dask dataframe
Expand Down Expand Up @@ -59,11 +64,27 @@ def load(self) -> dd.DataFrame:
dask_df = dask_df.head(self.n_rows_to_load, npartitions=npartitions)
dask_df = dd.from_pandas(dask_df, npartitions=npartitions)

# Set monotonically increasing index
logger.info("Setting the index...")
dask_df["id"] = 1
dask_df["id"] = dask_df.id.cumsum()
dask_df = dask_df.set_index("id", sort=True)
# 4) Set the index
if self.index_column is None:
logger.info(
"Index column not specified, setting a globally unique index",
)

def _set_unique_index(dataframe: pd.DataFrame, partition_info=None):
"""Function that sets a unique index based on the partition and row number."""
dataframe["id"] = 1
dataframe["id"] = (
str(partition_info["number"])
+ "_"
+ (dataframe.id.cumsum()).astype(str)
)
dataframe.index = dataframe.pop("id")
return dataframe

dask_df = dask_df.map_partitions(_set_unique_index, meta=dask_df.head())
else:
logger.info(f"Setting `{self.index_column}` as index")
dask_df = dask_df.set_index(self.index_column, drop=True)

return dask_df

Expand Down
4 changes: 4 additions & 0 deletions docs/generic_component.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ args:
description: Optional argument that defines the number of rows to load. Useful for testing pipeline runs on a small scale
type: int
default: None
index_column:
description: Column to set index to in the load component, if not specified a default globally unique index will be set
type: str
default: None

```

Expand Down
4 changes: 4 additions & 0 deletions docs/getting_started.md
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,10 @@ args:
description: Optional argument that defines the number of rows to load. Useful for testing pipeline runs on a small scale
type: int
default: None
index_column:
description: Column to set index to in the load component, if not specified a default globally unique index will be set
type: str
default: None
```
This is the component spec of the component we have just added to our pipelines, the only thing we have altered is the `produces` section. We have defined what subsets, fields and types this component produces.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,8 @@ args:
n_rows_to_load:
description: Optional argument that defines the number of rows to load. Useful for testing pipeline runs on a small scale
type: int
default: None
index_column:
description: Column to set index to in the load component, if not specified a default globally unique index will be set
type: str
default: None
1 change: 0 additions & 1 deletion examples/pipelines/datacomp/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@
},
node_pool_label="node_pool",
node_pool_name="n2-standard-128-pool",
output_partition_size="disable",
)
filter_complexity_op = ComponentOp(
component_dir="components/filter_text_complexity",
Expand Down
1 change: 0 additions & 1 deletion examples/pipelines/datacomp/simple_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@
},
node_pool_label="node_pool",
node_pool_name="n2-standard-128-pool",
output_partition_size="disable",
)

# add ops to pipeline
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,7 @@ args:
description: Optional argument that defines the number of rows to load. Useful for testing pipeline runs on a small scale
type: int
default: None
index_column:
description: Column to set index to in the load component, if not specified a default globally unique index will be set
type: str
default: None
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,7 @@ args:
description: Optional argument that defines the number of rows to load. Useful for testing pipeline runs on a small scale
type: int
default: None
index_column:
description: Column to set index to in the load component, if not specified a default globally unique index will be set
type: str
default: None
2 changes: 0 additions & 2 deletions tests/example_specs/component_specs/kubeflow_component.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ inputs:
partitioning
type: String
default: None
type: String
default: None
- name: storage_args
description: Storage arguments
type: String
Expand Down

0 comments on commit 939d638

Please sign in to comment.