From d7ec9ac244673e13384f50d1f608554e28e0bc65 Mon Sep 17 00:00:00 2001 From: Andrey Pavlenko Date: Fri, 8 Dec 2023 11:37:48 +0100 Subject: [PATCH] Non-lazy apply() --- .../pandas_on_ray/partitioning/partition.py | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition.py b/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition.py index c7f6bacd244..6766f53536e 100644 --- a/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition.py +++ b/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition.py @@ -98,8 +98,19 @@ def apply(self, func: Callable, *args, **kwargs): Note: the function is not applied immediately but is added to the execution tree. """ - de = DeferredExecution(self._data_ref, func, args, kwargs) - return self.__constructor__(de) + data = self._data_ref + if not isinstance(data, DeferredExecution): + flat_args = not has_list_or_de(args) + flat_kwargs = not has_list_or_de(kwargs.values()) + if flat_args and flat_kwargs: + result = remote_exec_func.remote(func, data, *args, **kwargs) + return self.__constructor__(*result) + de = DeferredExecution(data, func, args, kwargs, 1, flat_args, flat_kwargs) + else: + de = DeferredExecution(data, func, args, kwargs) + part = self.__constructor__(de) + part.drain_call_queue() + return part @_inherit_docstrings(PandasDataframePartition.add_to_apply_calls) def add_to_apply_calls(self, func, *args, length=None, width=None, **kwargs):