Commit

Modify arg default (#524)
PhilippeMoussalli authored Oct 16, 2023
1 parent a259583 commit 4814e2d
Showing 24 changed files with 408 additions and 373 deletions.
3 changes: 2 additions & 1 deletion components/load_from_hf_hub/fondant_component.yaml
@@ -15,6 +15,7 @@ args:
column_name_mapping:
description: Mapping of the consumed hub dataset to fondant column names
type: dict
default: {}
image_column_names:
description: Optional argument, a list containing the original image column names in case the
dataset on the hub contains them. Used to format the image from HF hub format to a byte string.
@@ -23,7 +24,7 @@
n_rows_to_load:
description: Optional argument that defines the number of rows to load. Useful for testing pipeline runs on a small scale
type: int
default: -1
default: None
index_column:
description: Column to set index to in the load component, if not specified a default globally unique index will be set
type: str
113 changes: 72 additions & 41 deletions components/load_from_hf_hub/src/main.py
@@ -1,5 +1,6 @@
"""This component loads a seed dataset from the hub."""
import logging
import typing as t

import dask
import dask.dataframe as dd
@@ -18,10 +19,10 @@ def __init__(
spec: ComponentSpec,
*_,
dataset_name: str,
column_name_mapping: dict,
image_column_names: list,
n_rows_to_load: int,
index_column: str,
column_name_mapping: t.Optional[dict],
image_column_names: t.Optional[list],
n_rows_to_load: t.Optional[int],
index_column: t.Optional[str],
) -> None:
"""
Args:
@@ -42,55 +43,45 @@ def __init__(
self.index_column = index_column
self.spec = spec

def load(self) -> dd.DataFrame:
# 1) Load data, read as Dask dataframe
logger.info("Loading dataset from the hub...")

def get_columns_to_keep(self) -> t.List[str]:
# Only read required columns
columns = []

invert_column_name_mapping = {v: k for k, v in self.column_name_mapping.items()}
if self.column_name_mapping:
invert_column_name_mapping = {
v: k for k, v in self.column_name_mapping.items()
}
else:
invert_column_name_mapping = {}

for subset_name, subset in self.spec.produces.items():
for field_name, field in subset.fields.items():
subset_field_name = f"{subset_name}_{field_name}"
column_name = invert_column_name_mapping.get(
subset_field_name,
subset_field_name,
)
columns.append(column_name)

logger.debug(f"Columns to keep: {columns}")
dask_df = dd.read_parquet(f"hf://datasets/{self.dataset_name}", columns=columns)

# 2) Make sure images are bytes instead of dicts
if self.image_column_names is not None:
column_name = f"{subset_name}_{field_name}"
if (
invert_column_name_mapping
and column_name in invert_column_name_mapping
):
columns.append(invert_column_name_mapping[column_name])
else:
columns.append(column_name)

if self.index_column is not None:
columns.append(self.index_column)

return columns

def convert_images_to_bytes(self, dask_df) -> dd.DataFrame:
if self.image_column_names:
for image_column_name in self.image_column_names:
dask_df[image_column_name] = dask_df[image_column_name].map(
lambda x: x["bytes"],
meta=("bytes", bytes),
)

# 3) Rename columns
logger.info("Renaming columns...")
dask_df = dask_df.rename(columns=self.column_name_mapping)

# 4) Optional: only return specific amount of rows
if self.n_rows_to_load > 0:
partitions_length = 0
npartitions = 1
for npartitions, partition in enumerate(dask_df.partitions, start=1):
if partitions_length >= self.n_rows_to_load:
logger.info(
f"""Required number of partitions to load\n
{self.n_rows_to_load} is {npartitions}""",
)
break
partitions_length += len(partition)
dask_df = dask_df.head(self.n_rows_to_load, npartitions=npartitions)
dask_df = dd.from_pandas(dask_df, npartitions=npartitions)
return dask_df

# 5) Set the index
if self.index_column == "None":
def set_df_index(self, dask_df: dd.DataFrame) -> dd.DataFrame:
if self.index_column is None:
logger.info(
"Index column not specified, setting a globally unique index",
)
@@ -122,3 +113,43 @@ def _get_meta_df() -> pd.DataFrame:
dask_df = dask_df.set_index(self.index_column, drop=True)

return dask_df

def return_subset_of_df(self, dask_df: dd.DataFrame) -> dd.DataFrame:
if self.n_rows_to_load is not None:
partitions_length = 0
npartitions = 1
for npartitions, partition in enumerate(dask_df.partitions, start=1):
if partitions_length >= self.n_rows_to_load:
logger.info(
f"""Required number of partitions to load\n
{self.n_rows_to_load} is {npartitions}""",
)
break
partitions_length += len(partition)
dask_df = dask_df.head(self.n_rows_to_load, npartitions=npartitions)
dask_df = dd.from_pandas(dask_df, npartitions=npartitions)
return dask_df

def load(self) -> dd.DataFrame:
# 1) Load data, read as Dask dataframe
logger.info("Loading dataset from the hub...")

columns = self.get_columns_to_keep()

logger.debug(f"Columns to keep: {columns}")
dask_df = dd.read_parquet(f"hf://datasets/{self.dataset_name}", columns=columns)

# 2) Make sure images are bytes instead of dicts
dask_df = self.convert_images_to_bytes(dask_df)

# 3) Rename columns
logger.info("Renaming columns...")
dask_df = dask_df.rename(columns=self.column_name_mapping)

# 4) Optional: only return specific amount of rows
dask_df = self.return_subset_of_df(dask_df)

# 5) Set the index
dask_df = self.set_df_index(dask_df)

return dask_df
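
For context, the new return_subset_of_df helper is where the switch from the -1 sentinel to an Optional default shows up at runtime. A minimal standalone sketch of the same partition-counting logic with plain Dask (the 10-row frame, the column name "a", and the row counts are illustrative, not part of the commit; logging is omitted):

import typing as t

import dask.dataframe as dd
import pandas as pd


def return_subset_of_df(dask_df: dd.DataFrame, n_rows_to_load: t.Optional[int]) -> dd.DataFrame:
    # None now means "load everything"; an int loads just enough partitions
    # to cover the requested number of rows, then truncates with head().
    if n_rows_to_load is not None:
        partitions_length = 0
        npartitions = 1
        for npartitions, partition in enumerate(dask_df.partitions, start=1):
            if partitions_length >= n_rows_to_load:
                break
            partitions_length += len(partition)
        dask_df = dd.from_pandas(
            dask_df.head(n_rows_to_load, npartitions=npartitions),
            npartitions=npartitions,
        )
    return dask_df


# Illustrative usage: 10 rows spread over 5 partitions.
df = dd.from_pandas(pd.DataFrame({"a": range(10)}), npartitions=5)
print(len(return_subset_of_df(df, 3)))     # 3
print(len(return_subset_of_df(df, None)))  # 10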
1 change: 0 additions & 1 deletion components/load_from_parquet/fondant_component.yaml
@@ -19,7 +19,6 @@ args:
n_rows_to_load:
description: Optional argument that defines the number of rows to load. Useful for testing pipeline runs on a small scale
type: int
default: -1
index_column:
description: Column to set index to in the load component, if not specified a default globally unique index will be set
type: str
94 changes: 58 additions & 36 deletions components/load_from_parquet/src/main.py
@@ -19,8 +19,8 @@ def __init__(
spec: ComponentSpec,
*_,
dataset_uri: str,
column_name_mapping: dict,
n_rows_to_load: int,
column_name_mapping: t.Optional[dict],
n_rows_to_load: t.Optional[int],
index_column: t.Optional[str],
) -> None:
"""
@@ -39,49 +39,34 @@ def __init__(
self.index_column = index_column
self.spec = spec

def load(self) -> dd.DataFrame:
# 1) Load data, read as Dask dataframe
logger.info("Loading dataset from the file...")

def get_columns_to_keep(self) -> t.List[str]:
# Only read required columns
columns = []
if self.column_name_mapping is not None:

if self.column_name_mapping:
invert_column_name_mapping = {
v: k for k, v in self.column_name_mapping.items()
}
for subset_name, subset in self.spec.produces.items():
for field_name, field in subset.fields.items():
subset_field_name = f"{subset_name}_{field_name}"
column_name = invert_column_name_mapping.get(
subset_field_name,
subset_field_name,
)
columns.append(column_name)
else:
invert_column_name_mapping = {}

logger.debug(f"Columns to keep: {columns}")
dask_df = dd.read_parquet(self.dataset_uri, columns=columns)
for subset_name, subset in self.spec.produces.items():
for field_name, field in subset.fields.items():
column_name = f"{subset_name}_{field_name}"
if (
invert_column_name_mapping
and column_name in invert_column_name_mapping
):
columns.append(invert_column_name_mapping[column_name])
else:
columns.append(column_name)

# 2) Rename columns
if self.column_name_mapping:
logger.info("Renaming columns...")
dask_df = dask_df.rename(columns=self.column_name_mapping)
if self.index_column is not None:
columns.append(self.index_column)

# 3) Optional: only return specific amount of rows
if self.n_rows_to_load > 0:
partitions_length = 0
npartitions = 1
for npartitions, partition in enumerate(dask_df.partitions, start=1):
if partitions_length >= self.n_rows_to_load:
logger.info(
f"""Required number of partitions to load\n
{self.n_rows_to_load} is {npartitions}""",
)
break
partitions_length += len(partition)
dask_df = dask_df.head(self.n_rows_to_load, npartitions=npartitions)
dask_df = dd.from_pandas(dask_df, npartitions=npartitions)
return columns

# 4) Set the index
def set_df_index(self, dask_df: dd.DataFrame) -> dd.DataFrame:
if self.index_column is None:
logger.info(
"Index column not specified, setting a globally unique index",
@@ -114,3 +99,40 @@ def _get_meta_df() -> pd.DataFrame:
dask_df = dask_df.set_index(self.index_column, drop=True)

return dask_df

def return_subset_of_df(self, dask_df: dd.DataFrame) -> dd.DataFrame:
if self.n_rows_to_load is not None:
partitions_length = 0
npartitions = 1
for npartitions, partition in enumerate(dask_df.partitions, start=1):
if partitions_length >= self.n_rows_to_load:
logger.info(
f"""Required number of partitions to load\n
{self.n_rows_to_load} is {npartitions}""",
)
break
partitions_length += len(partition)
dask_df = dask_df.head(self.n_rows_to_load, npartitions=npartitions)
dask_df = dd.from_pandas(dask_df, npartitions=npartitions)
return dask_df

def load(self) -> dd.DataFrame:
# 1) Load data, read as Dask dataframe
logger.info("Loading dataset from the hub...")

columns = self.get_columns_to_keep()

logger.debug(f"Columns to keep: {columns}")
dask_df = dd.read_parquet(self.dataset_uri, columns=columns)

# 2) Rename columns
if self.column_name_mapping:
logger.info("Renaming columns...")
dask_df = dask_df.rename(columns=self.column_name_mapping)

# 4) Optional: only return specific amount of rows
dask_df = self.return_subset_of_df(dask_df)

# 5) Set the index
dask_df = self.set_df_index(dask_df)
return dask_df
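
A note on the inverted mapping used in get_columns_to_keep above: column_name_mapping maps source column names to fondant "<subset>_<field>" names, so the inverted dict is what selects the right columns from the parquet file before the later rename. A small sketch with made-up subset, field, and column names (none of them come from the commit):

# column_name_mapping: source column -> fondant "<subset>_<field>" name
column_name_mapping = {"img": "images_data", "caption": "captions_text"}

# Stand-in for spec.produces: subset name -> field names.
produces = {"images": ["data"], "captions": ["text"]}

invert_column_name_mapping = {v: k for k, v in column_name_mapping.items()}

columns = []
for subset_name, field_names in produces.items():
    for field_name in field_names:
        column_name = f"{subset_name}_{field_name}"
        if invert_column_name_mapping and column_name in invert_column_name_mapping:
            columns.append(invert_column_name_mapping[column_name])
        else:
            columns.append(column_name)

print(columns)  # ['img', 'caption'] -- the columns passed to dd.read_parquet
# Renaming with column_name_mapping afterwards yields the fondant names
# 'images_data' and 'captions_text'.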
6 changes: 3 additions & 3 deletions components/write_to_hf_hub/src/main.py
@@ -41,8 +41,8 @@ def __init__(
hf_token: str,
username: str,
dataset_name: str,
image_column_names: list,
column_name_mapping: dict,
image_column_names: t.Optional[list],
column_name_mapping: t.Optional[dict],
):
"""
Args:
@@ -91,7 +91,7 @@ def write(
# Map image column to hf data format
feature_encoder = datasets.Image(decode=True)

if self.image_column_names:
if self.image_column_names is not None:
for image_column_name in self.image_column_names:
dataframe[image_column_name] = dataframe[image_column_name].map(
lambda x: convert_bytes_to_image(x, feature_encoder),
@@ -15,4 +15,4 @@ args:
n_records_to_download:
description: Number of records to download
type: int
default: -1
default: None
@@ -18,7 +18,7 @@ def __init__(
self,
*_,
common_crawl_indices: t.List[str],
n_records_to_download: int,
n_records_to_download: t.Optional[int] = None,
):
self.index_urls = [
self.build_index_url(index_name) for index_name in common_crawl_indices
@@ -38,7 +38,7 @@ def load(self) -> dd.DataFrame:
warc_urls.extend([line.decode() for line in extracted.split(b"\n")])

df = pd.DataFrame(warc_urls, columns=["warc_url"])
if self.n_records_to_download > 0:
if self.n_records_to_download is not None:
df = df.head(self.n_records_to_download)

return dd.from_pandas(df, npartitions=len(df) // 100)
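
The is-not-None check above is the pattern the commit applies everywhere the old -1 sentinel used to be: None now means "keep every record", and any integer is handed to head(). A tiny pandas sketch with illustrative URLs:

import typing as t

import pandas as pd


def truncate(df: pd.DataFrame, n_records_to_download: t.Optional[int]) -> pd.DataFrame:
    # Mirrors the changed condition: only truncate when a limit was given.
    if n_records_to_download is not None:
        df = df.head(n_records_to_download)
    return df


df = pd.DataFrame({"warc_url": [f"warc_{i}" for i in range(5)]})
print(len(truncate(df, 2)))     # 2
print(len(truncate(df, None)))  # 5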
@@ -12,4 +12,4 @@ args:
n_rows_to_load:
description: Optional argument that defines the number of rows to load. Useful for testing pipeline runs on a small scale
type: int
default: -1
default: None
@@ -114,7 +114,7 @@ def load(self) -> dd.DataFrame:

pandas_df = pd.DataFrame(prompts, columns=["prompts_text"])

if self.n_rows_to_load > 0:
if self.n_rows_to_load:
pandas_df = pandas_df.head(self.n_rows_to_load)

df = dd.from_pandas(pandas_df, npartitions=1)
@@ -46,6 +46,7 @@ args:
column_name_mapping:
description: Mapping of the consumed hub dataset to fondant column names
type: dict
default: {}
image_column_names:
description: Optional argument, a list containing the original image column names in case the
dataset on the hub contains them. Used to format the image from HF hub format to a byte string.
@@ -54,7 +55,7 @@
n_rows_to_load:
description: Optional argument that defines the number of rows to load. Useful for testing pipeline runs on a small scale
type: int
default: -1
default: None
index_column:
description: Column to set index to in the load component, if not specified a default globally unique index will be set
type: str
@@ -27,6 +27,7 @@ args:
column_name_mapping:
description: Mapping of the consumed hub dataset to fondant column names
type: dict
default: {}
image_column_names:
description: Optional argument, a list containing the original image column names in case the
dataset on the hub contains them. Used to format the image from HF hub format to a byte string.
@@ -35,7 +36,6 @@
n_rows_to_load:
description: Optional argument that defines the number of rows to load. Useful for testing pipeline runs on a small scale
type: int
default: -1
index_column:
description: Column to set index to in the load component, if not specified a default globally unique index will be set
type: str
(Remaining changed files in this commit are not shown here.)