Skip to content

Commit

Permalink
Split component implementation and execution (#302)
Browse files Browse the repository at this point in the history
This PR follows up on the PoC presented in #268

---

Fixes #257 

It splits the implementation and execution of components, this has some
advantages:

- Pandas components can use `__init__` instead of setup, which is
probably more familiar to users
- Other components can use `__init__` as well instead of receiving all
arguments to their transform or equivalent method, aligning
implementation of different component types
- Component implementation and execution should be easier to test
separately

I borrowed the executor terminology from KfP.

---

Fixes #203 

Since I had to update all the components, I also switched some of them
to subclass `PandasTransformComponent` instead of
`DaskTransformComponent`.

---

These changes open some opportunities for additional improvements, but I
propose to tackle those as separate PRs as this PR is already quite huge
due to all the changes to the components.

- [ ] #300
- [ ] #301
  • Loading branch information
RobbeSneyders authored Jul 18, 2023
1 parent 7d43515 commit 6d2251e
Show file tree
Hide file tree
Showing 29 changed files with 799 additions and 669 deletions.
33 changes: 24 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -222,24 +222,39 @@ args:
type: str
```
Once you have your component specification, all you need to do is implement a single `.transform`
method and Fondant will do the rest. You will get the data defined in your specification as a
[Dask](https://www.dask.org/) dataframe, which is evaluated lazily.
Once you have your component specification, all you need to do is implement a constructor
and a single `.transform` method and Fondant will do the rest. You will get the data defined in
your specification partition by partition as a Pandas dataframe.

```python
from fondant.component import TransformComponent
import pandas as pd
from fondant.component import PandasTransformComponent
from fondant.executor import PandasTransformExecutor
class ExampleComponent(TransformComponent):
def transform(self, dataframe, *, argument1, argument2):
"""Implement your custom logic in this single method
class ExampleComponent(PandasTransformComponent):
def __init__(self, *args, argument1, argument2) -> None:
"""
Args:
dataframe: A Dask dataframe containing the data
argumentX: An argument passed to the component
"""
# Initialize your component here based on the arguments
def transform(self, dataframe: pd.DataFrame) -> pd.DataFrame:
"""Implement your custom logic in this single method
Args:
dataframe: A Pandas dataframe containing the data
Returns:
A pandas dataframe containing the transformed data
"""
if __name__ == "__main__":
executor = PandasTransformExecutor.from_args()
executor.execute(ExampleComponent)
```

For more advanced use cases, you can use the `DaskTransformComponent` instead.

### Running your pipeline

Expand Down
7 changes: 4 additions & 3 deletions components/caption_images/src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import pandas as pd
import torch
from fondant.component import PandasTransformComponent
from fondant.executor import PandasTransformExecutor
from PIL import Image
from transformers import BatchEncoding, BlipForConditionalGeneration, BlipProcessor

Expand Down Expand Up @@ -52,7 +53,7 @@ def caption_image_batch(
class CaptionImagesComponent(PandasTransformComponent):
"""Component that captions images using a model from the Hugging Face hub."""

def setup(self, *, model_id: str, batch_size: int, max_new_tokens: int) -> None:
def __init__(self, *args, model_id: str, batch_size: int, max_new_tokens: int) -> None:
self.device = "cuda" if torch.cuda.is_available() else "cpu"
logger.info(f"Device: {self.device}")

Expand Down Expand Up @@ -85,5 +86,5 @@ def transform(self, dataframe: pd.DataFrame) -> pd.DataFrame:


if __name__ == "__main__":
component = CaptionImagesComponent.from_args()
component.run()
executor = PandasTransformExecutor.from_args()
executor.execute(CaptionImagesComponent)
48 changes: 27 additions & 21 deletions components/download_images/src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

import dask.dataframe as dd
from fondant.component import DaskTransformComponent
from fondant.executor import DaskTransformExecutor
from resizer import Resizer

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -88,21 +89,19 @@ def download_image_with_retry(
class DownloadImagesComponent(DaskTransformComponent):
"""Component that downloads images based on URLs."""

def transform(
self,
dataframe: dd.DataFrame,
*,
timeout: int,
retries: int,
image_size: int,
resize_mode: str,
resize_only_if_bigger: bool,
min_image_size: int,
max_aspect_ratio: float,
) -> dd.DataFrame:
"""Function that downloads images from a list of URLs and executes filtering and resizing
def __init__(self,
*_,
timeout: int,
retries: int,
image_size: int,
resize_mode: str,
resize_only_if_bigger: bool,
min_image_size: int,
max_aspect_ratio: float,
):
"""Component that downloads images from a list of URLs and executes filtering and resizing.
Args:
dataframe: Dask dataframe
timeout: Maximum time (in seconds) to wait when trying to download an image.
retries: Number of times to retry downloading an image if it fails.
image_size: Size of the images after resizing.
Expand All @@ -114,24 +113,31 @@ def transform(
Returns:
Dask dataframe
"""
logger.info("Instantiating resizer...")
resizer = Resizer(
self.timeout = timeout
self.retries = retries
self.resizer = Resizer(
image_size=image_size,
resize_mode=resize_mode,
resize_only_if_bigger=resize_only_if_bigger,
min_image_size=min_image_size,
max_aspect_ratio=max_aspect_ratio,
)

def transform(
self,
dataframe: dd.DataFrame,
) -> dd.DataFrame:
logger.info("Instantiating resizer...")

# Remove duplicates from laion retrieval
dataframe = dataframe.drop_duplicates()

dataframe = dataframe.apply(
lambda example: download_image_with_retry(
url=example.images_url,
timeout=timeout,
retries=retries,
resizer=resizer,
timeout=self.timeout,
retries=self.retries,
resizer=self.resizer,
),
axis=1,
result_type="expand",
Expand All @@ -150,5 +156,5 @@ def transform(


if __name__ == "__main__":
component = DownloadImagesComponent.from_args()
component.run()
executor = DaskTransformExecutor.from_args()
executor.execute(DownloadImagesComponent)
7 changes: 4 additions & 3 deletions components/embedding_based_laion_retrieval/src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,15 @@
import pandas as pd
from clip_client import ClipClient, Modality
from fondant.component import PandasTransformComponent
from fondant.executor import PandasTransformExecutor

logger = logging.getLogger(__name__)


class LAIONRetrievalComponent(PandasTransformComponent):
"""Component that retrieves image URLs from LAION-5B based on a set of CLIP embeddings."""

def setup(
def __init__(
self,
*,
num_images: int,
Expand Down Expand Up @@ -70,5 +71,5 @@ async def async_query():


if __name__ == "__main__":
component = LAIONRetrievalComponent.from_args()
component.run()
executor = PandasTransformExecutor.from_args()
executor.execute(LAIONRetrievalComponent)
48 changes: 21 additions & 27 deletions components/filter_comments/src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,41 +4,35 @@
"""
import logging

import dask.dataframe as dd
from fondant.component import DaskTransformComponent
import pandas as pd
from fondant.component import PandasTransformComponent
from fondant.executor import PandasTransformExecutor
from utils.text_extraction import get_comments_to_code_ratio

logger = logging.getLogger(__name__)


class FilterCommentsComponent(DaskTransformComponent):
"""Component that filters instances based on code to comments ratio."""
class FilterCommentsComponent(PandasTransformComponent):
"""Component that filters instances based on code to comments ratio.
Args:
min_comments_ratio: The minimum code to comment ratio
max_comments_ratio: The maximum code to comment ratio
"""

def __init__(self, *args, min_comments_ratio: float, max_comments_ratio: float) -> None:
self.min_comments_ratio = min_comments_ratio
self.max_comments_ratio = max_comments_ratio

def transform(
self,
*,
dataframe: dd.DataFrame,
min_comments_ratio: float,
max_comments_ratio: float,
) -> dd.DataFrame:
"""
Args:
dataframe: Dask dataframe
min_comments_ratio: The minimum code to comment ratio
max_comments_ratio: The maximum code to comment ratio
Returns:
Filtered dask dataframe.
"""
# Apply the function to the desired column and filter the DataFrame
return dataframe[
dataframe["code_content"].map_partitions(
lambda example: example.map(get_comments_to_code_ratio).between(
min_comments_ratio, max_comments_ratio,
),
)
]
dataframe: pd.DataFrame,
) -> pd.DataFrame:
comment_to_code_ratio = dataframe["code"]["content"].apply(get_comments_to_code_ratio)
mask = comment_to_code_ratio.between(self.min_comments_ratio, self.max_comments_ratio)
return dataframe[mask]


if __name__ == "__main__":
component = FilterCommentsComponent.from_args()
component.run()
executor = PandasTransformExecutor.from_args()
executor.execute(FilterCommentsComponent)
21 changes: 9 additions & 12 deletions components/filter_image_resolution/src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,27 +6,24 @@
import numpy as np
import pandas as pd
from fondant.component import PandasTransformComponent
from fondant.executor import PandasTransformExecutor

logger = logging.getLogger(__name__)


class FilterImageResolutionComponent(PandasTransformComponent):
"""Component that filters images based on height and width."""

def setup(self, *, min_image_dim: int, max_aspect_ratio: float) -> None:
self.min_image_dim = min_image_dim
self.max_aspect_ratio = max_aspect_ratio

def transform(self, dataframe: pd.DataFrame) -> pd.DataFrame:
def __init__(self, *_, min_image_dim: int, max_aspect_ratio: float) -> None:
"""
Args:
dataframe: Pandas dataframe
min_image_dim: minimum image dimension.
min_aspect_ratio: minimum aspect ratio.
Returns:
filtered Pandas dataframe
max_aspect_ratio: maximum aspect ratio.
"""
self.min_image_dim = min_image_dim
self.max_aspect_ratio = max_aspect_ratio

def transform(self, dataframe: pd.DataFrame) -> pd.DataFrame:
width = dataframe["image"]["width"]
height = dataframe["image"]["height"]
min_image_dim = np.minimum(width, height)
Expand All @@ -38,5 +35,5 @@ def transform(self, dataframe: pd.DataFrame) -> pd.DataFrame:


if __name__ == "__main__":
component = FilterImageResolutionComponent.from_args()
component.run()
executor = PandasTransformExecutor.from_args()
executor.execute(FilterImageResolutionComponent)
37 changes: 20 additions & 17 deletions components/filter_line_length/src/main.py
Original file line number Diff line number Diff line change
@@ -1,42 +1,45 @@
"""This component filters code based on a set of metadata associated with it."""
import logging

import dask.dataframe as dd
from fondant.component import DaskTransformComponent
import pandas as pd
from fondant.component import PandasTransformComponent
from fondant.executor import PandasTransformExecutor

logger = logging.getLogger(__name__)


class FilterLineLengthComponent(DaskTransformComponent):
class FilterLineLengthComponent(PandasTransformComponent):
"""
This component filters code based on a set of metadata associated with it:
average line length, maximum line length and alphanum fraction.
"""

def transform(
self,
*,
dataframe: dd.DataFrame,
def __init__(self, *_,
avg_line_length_threshold: int,
max_line_length_threshold: int,
alphanum_fraction_threshold: float,
) -> dd.DataFrame:
) -> None:
"""
Args:
dataframe: Dask dataframe
avg_line_length_threshold: Threshold for average line length to filter on
max_line_length_threshold: Threshold for max line length to filter on
alphanum_fraction_threshold: Alphanum fraction to filter on
Returns:
Filtered dask dataframe.
alphanum_fraction_threshold: Alphanum fraction to filter on.
"""
self.avg_line_length_threshold = avg_line_length_threshold
self.max_line_length_threshold = max_line_length_threshold
self.alphanum_fraction_threshold = alphanum_fraction_threshold

def transform(
self,
dataframe: pd.DataFrame,
) -> pd.DataFrame:
return dataframe[
(dataframe["code_avg_line_length"] > avg_line_length_threshold)
& (dataframe["code_max_line_length"] > max_line_length_threshold)
& (dataframe["code_alphanum_fraction"] > alphanum_fraction_threshold)
(dataframe["code_avg_line_length"] > self.avg_line_length_threshold)
& (dataframe["code_max_line_length"] > self.max_line_length_threshold)
& (dataframe["code_alphanum_fraction"] > self.alphanum_fraction_threshold)
]


if __name__ == "__main__":
component = FilterLineLengthComponent.from_args()
component.run()
executor = PandasTransformExecutor.from_args()
executor.execute(FilterLineLengthComponent)
Loading

0 comments on commit 6d2251e

Please sign in to comment.