diff --git a/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition.py b/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition.py index c7f6bacd244..6766f53536e 100644 --- a/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition.py +++ b/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition.py @@ -98,8 +98,19 @@ def apply(self, func: Callable, *args, **kwargs): Note: the function is not applied immediately but is added to the execution tree. """ - de = DeferredExecution(self._data_ref, func, args, kwargs) - return self.__constructor__(de) + data = self._data_ref + if not isinstance(data, DeferredExecution): + flat_args = not has_list_or_de(args) + flat_kwargs = not has_list_or_de(kwargs.values()) + if flat_args and flat_kwargs: + result = remote_exec_func.remote(func, data, *args, **kwargs) + return self.__constructor__(*result) + de = DeferredExecution(data, func, args, kwargs, 1, flat_args, flat_kwargs) + else: + de = DeferredExecution(data, func, args, kwargs) + part = self.__constructor__(de) + part.drain_call_queue() + return part @_inherit_docstrings(PandasDataframePartition.add_to_apply_calls) def add_to_apply_calls(self, func, *args, length=None, width=None, **kwargs):