Remove output_partition_size argument and logic (#355)
PR to remove the automatic logic that repartitions the output based on its size
PhilippeMoussalli authored Aug 16, 2023
1 parent e3e078d commit 757033e
Showing 17 changed files with 400 additions and 440 deletions.
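For pipeline authors, the user-facing effect is that `ComponentOp` no longer accepts an `output_partition_size` argument; only `input_partition_rows` remains, and the written output is no longer automatically repartitioned to ~250MB. A minimal before/after sketch (the component directory is illustrative, not taken from this commit; the argument values mirror the docs example changed below):

```python
from fondant.pipeline import ComponentOp

# Before this commit: output partitioning could be tuned or disabled per operation.
caption_images_op = ComponentOp(
    component_dir="components/caption_images",  # illustrative path
    arguments={"max_new_tokens": 50},
    input_partition_rows=100,
    output_partition_size="10MB",  # argument removed by this PR
)

# After this commit: only the input-side control remains; the output is written
# with whatever partitioning the transformed dataframe already has.
caption_images_op = ComponentOp(
    component_dir="components/caption_images",
    arguments={"max_new_tokens": 50},
    input_partition_rows=100,
)
```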
1 change: 0 additions & 1 deletion components/load_from_files/README.md
@@ -29,7 +29,6 @@ load_from_files = ComponentOp(
"directory_uri": "./data.zip", # change this to your
# directory_uri, remote or local
},
- output_partition_size="10MB",
)

my_pipeline.add_op(load_from_files, dependencies=[])
2 changes: 0 additions & 2 deletions docs/pipeline.md
@@ -86,7 +86,6 @@ caption_images_op = ComponentOp(
"max_new_tokens": 50,
},
input_partition_rows='disable',
- output_partition_size='disable',
)
```

@@ -105,7 +104,6 @@ caption_images_op = ComponentOp(
"max_new_tokens": 50,
},
input_partition_rows=100,
- output_partition_size="10MB",
)
```

1 change: 0 additions & 1 deletion examples/pipelines/controlnet-interior-design/pipeline.py
@@ -17,7 +17,6 @@
generate_prompts_op = ComponentOp(
component_dir="components/generate_prompts",
arguments={"n_rows_to_load": None},
- output_partition_size="disable",
)
laion_retrieval_op = ComponentOp.from_registry(
name="prompt_based_laion_retrieval",
9 changes: 0 additions & 9 deletions src/fondant/component_spec.py
@@ -263,13 +263,6 @@ def from_fondant_component_spec(
"type": "String",
"default": "None",
},
- {
-     "name": "output_partition_size",
-     "description": "The size of the output partition size, defaults"
-     " to 250MB. Set to `disable` to disable the automatic partitioning",
-     "type": "String",
-     "default": "None",
- },
*(
{
"name": arg.name,
@@ -301,8 +294,6 @@ def from_fondant_component_spec(
{"inputValue": "component_spec"},
"--input_partition_rows",
{"inputValue": "input_partition_rows"},
- "--output_partition_size",
- {"inputValue": "output_partition_size"},
*cls._dump_args(fondant_component.args.values()),
"--output_manifest_path",
{"outputPath": "output_manifest_path"},
38 changes: 0 additions & 38 deletions src/fondant/data_io.py
@@ -153,50 +153,12 @@ def __init__(
*,
manifest: Manifest,
component_spec: ComponentSpec,
- output_partition_size: t.Optional[t.Union[str]] = None,
):
super().__init__(manifest=manifest, component_spec=component_spec)

- self.output_partition_size = output_partition_size

- def partition_written_dataframe(self, dataframe: dd.DataFrame) -> dd.DataFrame:
-     """
-     Function that partitions the written dataframe to smaller partitions based on a given
-     partition size.
-     """
-     if self.output_partition_size != "disable":
-         if isinstance(self.output_partition_size, str):
-             dataframe = dataframe.repartition(
-                 partition_size=self.output_partition_size,
-             )
-             logger.info(
-                 f"Repartitioning the written data such that the size per partition is approx."
-                 f" {self.output_partition_size}",
-             )
-
-         elif self.output_partition_size is None:
-             dataframe = dataframe.repartition(partition_size="250MB")
-             logger.info(
-                 "Repartitioning the written data such that the size per partition is approx."
-                 " 250MB. (Automatic repartitioning)",
-             )
-         else:
-             msg = (
-                 f"{self.output_partition_size} is not a valid argument. Choose either the"
-                 f" number of size of the partition (e.g. '250Mb' or set to 'disable' to"
-                 f" disable automated partitioning"
-             )
-             raise ValueError(
-                 msg,
-             )
-
-     return dataframe

def write_dataframe(self, dataframe: dd.DataFrame) -> None:
write_tasks = []

- dataframe = self.partition_written_dataframe(dataframe)

dataframe.index = dataframe.index.rename("id").astype("string")

# Turn index into an empty dataframe so we can write it
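With `partition_written_dataframe` and its call in `write_dataframe` removed, the writer persists the dataframe as-is. A component that still needs a specific output partition size can repartition explicitly before returning its dataframe, using the same Dask call the deleted helper relied on. A minimal sketch, not part of this PR, assuming a component's `transform` returns a Dask dataframe:

```python
import dask.dataframe as dd


def transform(dataframe: dd.DataFrame) -> dd.DataFrame:
    # Sketch only: explicitly repartition to ~250MB per partition, which is what
    # the removed partition_written_dataframe() did by default via Dask's
    # standard repartition API.
    return dataframe.repartition(partition_size="250MB")
```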
13 changes: 1 addition & 12 deletions src/fondant/executor.py
@@ -25,7 +25,7 @@
from fondant.component_spec import Argument, ComponentSpec, kubeflow2python_type
from fondant.data_io import DaskDataLoader, DaskDataWriter
from fondant.manifest import Manifest
- from fondant.schema import validate_partition_number, validate_partition_size
+ from fondant.schema import validate_partition_number

logger = logging.getLogger(__name__)

@@ -42,23 +42,20 @@ def __init__(
metadata: t.Dict[str, t.Any],
user_arguments: t.Dict[str, t.Any],
input_partition_rows: t.Optional[t.Union[str, int]] = None,
- output_partition_size: t.Optional[str] = None,
) -> None:
self.spec = spec
self.input_manifest_path = input_manifest_path
self.output_manifest_path = output_manifest_path
self.metadata = metadata
self.user_arguments = user_arguments
self.input_partition_rows = input_partition_rows
- self.output_partition_size = output_partition_size

@classmethod
def from_args(cls) -> "Executor":
"""Create an executor from a passed argument containing the specification as a dict."""
parser = argparse.ArgumentParser()
parser.add_argument("--component_spec", type=json.loads)
parser.add_argument("--input_partition_rows", type=validate_partition_number)
- parser.add_argument("--output_partition_size", type=validate_partition_size)
args, _ = parser.parse_known_args()

if "component_spec" not in args:
@@ -67,20 +64,17 @@ def from_args(cls) -> "Executor":

component_spec = ComponentSpec(args.component_spec)
input_partition_rows = args.input_partition_rows
- output_partition_size = args.output_partition_size

return cls.from_spec(
component_spec,
input_partition_rows,
- output_partition_size,
)

@classmethod
def from_spec(
cls,
component_spec: ComponentSpec,
input_partition_rows: t.Optional[t.Union[str, int]],
- output_partition_size: t.Optional[str],
) -> "Executor":
"""Create an executor from a component spec."""
args_dict = vars(cls._add_and_parse_args(component_spec))
@@ -91,9 +85,6 @@ def from_spec(
if "input_partition_rows" in args_dict:
args_dict.pop("input_partition_rows")

- if "output_partition_size" in args_dict:
-     args_dict.pop("output_partition_size")

input_manifest_path = args_dict.pop("input_manifest_path")
output_manifest_path = args_dict.pop("output_manifest_path")
metadata = args_dict.pop("metadata")
@@ -106,7 +97,6 @@ def from_spec(
metadata=metadata,
user_arguments=args_dict,
input_partition_rows=input_partition_rows,
- output_partition_size=output_partition_size,
)

@classmethod
@@ -182,7 +172,6 @@ def _write_data(self, dataframe: dd.DataFrame, *, manifest: Manifest):
data_writer = DaskDataWriter(
manifest=manifest,
component_spec=self.spec,
- output_partition_size=self.output_partition_size,
)

data_writer.write_dataframe(dataframe)
12 changes: 1 addition & 11 deletions src/fondant/pipeline.py
@@ -14,7 +14,7 @@
from fondant.exceptions import InvalidPipelineDefinition
from fondant.import_utils import is_kfp_available
from fondant.manifest import Manifest
- from fondant.schema import validate_partition_number, validate_partition_size
+ from fondant.schema import validate_partition_number

if is_kfp_available():
from kubernetes import client as k8s_client
@@ -32,8 +32,6 @@ class ComponentOp:
arguments: A dictionary containing the argument name and value for the operation.
input_partition_rows: The number of rows to load per partition. Set to override the
automatic partitioning
- output_partition_size: the size of the output written dataset. Defaults to 250MB,
-     set to "disable" to disable automatic repartitioning of the output,
number_of_gpus: The number of gpus to assign to the operation
node_pool_label: The label of the node pool to which the operation will be assigned.
node_pool_name: The name of the node pool to which the operation will be assigned.
@@ -61,7 +59,6 @@ def __init__(
*,
arguments: t.Optional[t.Dict[str, t.Any]] = None,
input_partition_rows: t.Optional[t.Union[str, int]] = None,
- output_partition_size: t.Optional[str] = None,
number_of_gpus: t.Optional[int] = None,
node_pool_label: t.Optional[str] = None,
node_pool_name: t.Optional[str] = None,
@@ -70,7 +67,6 @@
) -> None:
self.component_dir = Path(component_dir)
self.input_partition_rows = input_partition_rows
- self.output_partitioning_size = output_partition_size
self.arguments = self._set_arguments(arguments)

self.component_spec = ComponentSpec.from_file(
@@ -96,10 +92,8 @@ def _set_arguments(
arguments = arguments or {}

input_partition_rows = validate_partition_number(self.input_partition_rows)
- output_partition_size = validate_partition_size(self.output_partitioning_size)

arguments["input_partition_rows"] = str(input_partition_rows)
- arguments["output_partition_size"] = str(output_partition_size)

return arguments

@@ -128,7 +122,6 @@ def from_registry(
*,
arguments: t.Optional[t.Dict[str, t.Any]] = None,
input_partition_rows: t.Optional[t.Union[int, str]] = None,
- output_partition_size: t.Optional[str] = None,
number_of_gpus: t.Optional[int] = None,
node_pool_label: t.Optional[str] = None,
node_pool_name: t.Optional[str] = None,
@@ -142,8 +135,6 @@
arguments: A dictionary containing the argument name and value for the operation.
input_partition_rows: The number of rows to load per partition. Set to override the
automatic partitioning
- output_partition_size: the size of the output written dataset. Defaults to 250MB,
-     set to "disable" to disable automatic repartitioning of the output,
number_of_gpus: The number of gpus to assign to the operation
node_pool_label: The label of the node pool to which the operation will be assigned.
node_pool_name: The name of the node pool to which the operation will be assigned.
@@ -163,7 +154,6 @@
components_dir,
arguments=arguments,
input_partition_rows=input_partition_rows,
- output_partition_size=output_partition_size,
number_of_gpus=number_of_gpus,
node_pool_label=node_pool_label,
node_pool_name=node_pool_name,
@@ -13,8 +13,6 @@ services:
- a dummy string arg
- --input_partition_rows
- disable
-   - --output_partition_size
-   - disable
- --component_spec
- '{"name": "First component", "description": "This is an example component",
"image": "example_component:latest", "produces": {"images": {"fields": {"data":
@@ -35,8 +33,6 @@ services:
- a dummy string arg
- --input_partition_rows
- '10'
-   - --output_partition_size
-   - 30MB
- --component_spec
- '{"name": "Second component", "description": "This is an example component",
"image": "example_component:latest", "consumes": {"images": {"fields": {"data":
@@ -64,8 +60,6 @@ services:
- '[1, 2, 3]'
- --input_partition_rows
- None
-   - --output_partition_size
-   - None
- --component_spec
- '{"name": "Third component", "description": "This is an example component",
"image": "example_component:latest", "consumes": {"images": {"fields": {"data":
@@ -81,4 +75,4 @@ services:
second_component:
condition: service_completed_successfully
volumes: []
- version: '3.8'
+ version: '3.8'