Skip to content

Commit

Permalink
Skip transformation of partition if partition is empty (#908)
Browse files Browse the repository at this point in the history
When a `PandasTransformComponent` receives an empty Dask partition, the
execution fails. A partition can be empty for several reasons, some of
which depend on a custom implementation. We should catch this case
instead of halting the component's execution.
This PR adds a check to see whether `wrapped_transform` receives an empty
partition. If it does, we skip the transformation and return the received
dataframe.

---------

Co-authored-by: Robbe Sneyders <[email protected]>
  • Loading branch information
mrchtr and RobbeSneyders authored Apr 3, 2024
1 parent 2464ef0 commit ad7191e
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 4 deletions.
12 changes: 8 additions & 4 deletions src/fondant/component/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -479,6 +479,7 @@ def wrap_transform(
operation_spec: OperationSpec,
) -> t.Callable:
"""Factory that creates a function to wrap the component transform function. The wrapper:
- Skips the transformation if the received partition is empty
- Removes extra columns from the returned dataframe which are not defined in the component
spec `produces` section
- Sorts the columns from the returned dataframe according to the order in the component
Expand All @@ -491,14 +492,17 @@ def wrap_transform(
"""

def wrapped_transform(dataframe: pd.DataFrame) -> pd.DataFrame:
    """Run the component transform on one partition and keep only spec columns.

    Args:
        dataframe: The pandas partition handed to the wrapper.

    Returns:
        The partition restricted to the columns named in
        `operation_spec.operation_produces`, in that order. A non-empty
        partition is passed through `transform` first; an empty partition
        is returned without calling `transform`.
    """
    # Columns of operation specification
    columns = list(operation_spec.operation_produces.keys())

    # The emptiness check must come before any call to `transform`: the
    # transform must run at most once, and never on an empty partition.
    if dataframe.empty:
        logger.info("Received empty partition, skipping transformation.")
    else:
        dataframe = transform(dataframe)

    # Drop columns not in specification
    # NOTE(review): for a skipped (empty) partition this selection raises
    # KeyError if a produced column is absent from the input -- confirm that
    # upstream always supplies those columns.
    return dataframe[columns]

return wrapped_transform
Expand Down
29 changes: 29 additions & 0 deletions tests/component/test_component.py
Original file line number Diff line number Diff line change
Expand Up @@ -591,3 +591,32 @@ def write(self, dataframe):
with mock.patch.object(MyWriteComponent, "write", write):
executor.execute(MyWriteComponent)
write.mock.assert_called_once()


def test_skipping_empty_partition():
    """An empty partition must bypass the wrapped transform entirely."""

    def transform(dataframe: pd.DataFrame) -> pd.DataFrame:
        # Reaching this body means the wrapper did not skip the empty partition.
        message = "This should not be called"
        raise ValueError(message)

    # Zero-row frame standing in for an empty partition.
    column_names = ("image_height", "image_width", "caption_text")
    empty_partition = pd.DataFrame.from_dict(
        {column: [] for column in column_names},
    )

    dummy_spec = ComponentSpec(
        name="dummy-spec",
        image="dummy-image",
        description="dummy-description",
    )
    wrapped_transform = PandasTransformExecutor.wrap_transform(
        transform,
        operation_spec=OperationSpec(dummy_spec),
    )

    # The dummy spec passes no produced fields, so the result is compared
    # against a completely empty frame.
    result = wrapped_transform(empty_partition)
    assert result.equals(pd.DataFrame())

0 comments on commit ad7191e

Please sign in to comment.