Commit

FEAT-#5221: add execute to trigger lazy computations and wait for them to complete (#6648)

Signed-off-by: Anatoly Myachev <[email protected]>
anmyachev authored Oct 25, 2023
1 parent 19615af commit e558d9d
Showing 11 changed files with 102 additions and 13 deletions.
1 change: 1 addition & 0 deletions docs/development/architecture.rst
@@ -304,6 +304,7 @@ details. The documentation covers most modules, with more docs being added every
├───examples
├───modin
│ ├─── :doc:`config </flow/modin/config>`
│ ├─── :doc:`utils </flow/modin/utils>`
│ ├───core
│ │ ├─── :doc:`dataframe </flow/modin/core/dataframe/index>`
│ │ │ ├─── :doc:`algebra </flow/modin/core/dataframe/algebra>`
12 changes: 12 additions & 0 deletions docs/flow/modin/utils.rst
@@ -0,0 +1,12 @@
:orphan:

Modin Utils
"""""""""""

Here are utilities that can be useful when working with Modin.

Public API
''''''''''

.. autofunction:: modin.utils.try_cast_to_pandas
.. autofunction:: modin.utils.execute
22 changes: 9 additions & 13 deletions docs/usage_guide/benchmarking.rst
@@ -8,8 +8,8 @@ To benchmark a single Modin function, often turning on the
:code:`BenchmarkMode` will suffice.

There is no simple way to benchmark more complex Modin workflows, though
-benchmark mode or calling ``repr`` on Modin objects may be useful. The
-:doc:`Modin logs </usage_guide/advanced_usage/modin_logging>` may help you
+benchmark mode or calling ``modin.utils.execute`` on Modin objects may be useful.
+The :doc:`Modin logs </usage_guide/advanced_usage/modin_logging>` may help you
identify bottlenecks in your code, and they may also help profile the execution
of each Modin function.

@@ -125,9 +125,8 @@ at each Modin :doc:`layer </development/architecture>`. Log mode is more
useful when used in conjunction with benchmark mode.

Sometimes, if you don't have a natural end-point to your workflow, you can
-just call ``repr`` on the workflow's final Modin objects. That will typically
-block on any asynchronous computation. However, beware that ``repr`` can also
-be misleading, e.g. here:
+just call ``modin.utils.execute`` on the workflow's final Modin objects.
+That will typically block on any asynchronous computation:

.. code-block:: python
@@ -137,6 +136,7 @@ be misleading, e.g. here:
import modin.pandas as pd
from modin.config import MinPartitionSize, NPartitions
+import modin.utils
MinPartitionSize.put(32)
NPartitions.put(16)
@@ -149,17 +149,13 @@ be misleading, e.g. here:
ray.init()
df1 = pd.DataFrame(list(range(10_000)), columns=['A'])
result = df1.map(slow_add_one)
-%time repr(result)
-# time.sleep(10)
+# %time modin.utils.execute(result)
%time result.to_parquet(BytesIO())
.. code-block::python
-The ``repr`` takes only 802 milliseconds, but writing the result to a buffer
-takes 9.84 seconds. However, if you uncomment the :code:`time.sleep` before the
-:code:`to_parquet` call, the :code:`to_parquet` takes just 23.8 milliseconds!
-The problem is that the ``repr`` blocks only on getting the first few and the
-last few rows, but the slow execution is for row 5001, which Modin is
-computing asynchronously in the background even after ``repr`` finishes.
+Writing the result to a buffer takes 9.84 seconds. However, if you uncomment
+the :code:`%time modin.utils.execute(result)` before the :code:`to_parquet`
+call, the :code:`to_parquet` takes just 23.8 milliseconds!
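
The timing effect described in this hunk can be illustrated without Modin. The following is only a rough analogy using the standard library: ``slow_add_one`` here is a hypothetical stand-in with an artificial delay, and ``concurrent.futures.wait`` plays the role that ``modin.utils.execute`` plays in the documentation (blocking until background work is finished). It does not reproduce Modin's internals.

```python
import time
from concurrent.futures import ThreadPoolExecutor, wait


def slow_add_one(x, delay=0.05):
    """Hypothetical stand-in for a slow computation running in the background."""
    time.sleep(delay)
    return x + 1


with ThreadPoolExecutor(max_workers=4) as pool:
    start = time.perf_counter()
    # Submitting returns almost immediately; the work keeps running asynchronously.
    futures = [pool.submit(slow_add_one, i) for i in range(8)]
    submit_elapsed = time.perf_counter() - start

    # Block until every pending computation has finished before taking the timing.
    wait(futures)
    total_elapsed = time.perf_counter() - start

results = [f.result() for f in futures]
print(f"after submit: {submit_elapsed:.3f}s, after waiting: {total_elapsed:.3f}s")
```

Timing only the submission step would understate the cost of the work, which is the same pitfall the documentation change addresses by blocking on all computations before measuring.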

.. note::
If you see any Modin documentation touting Modin's speed without using
4 changes: 4 additions & 0 deletions modin/conftest.py
@@ -175,6 +175,10 @@ def __init__(self, modin_frame):
    def finalize(self):
        self._modin_frame.finalize()

    def execute(self):
        self.finalize()
        self._modin_frame.wait_computations()

    @classmethod
    def from_pandas(cls, df, data_cls):
        return cls(data_cls.from_pandas(df))
4 changes: 4 additions & 0 deletions modin/core/dataframe/pandas/dataframe/dataframe.py
@@ -4106,6 +4106,10 @@ def finalize(self):
        """
        self._partition_mgr_cls.finalize(self._partitions)

    def wait_computations(self):
        """Wait for all computations to complete without materializing data."""
        self._partition_mgr_cls.wait_partitions(self._partitions.flatten())

    def __dataframe__(self, nan_as_null: bool = False, allow_copy: bool = True):
        """
        Get a Modin DataFrame that implements the dataframe exchange protocol.
5 changes: 5 additions & 0 deletions modin/core/storage_formats/base/query_compiler.py
@@ -317,6 +317,11 @@ def finalize(self):
        """Finalize constructing the dataframe calling all deferred functions which were used to build it."""
        pass

    @abc.abstractmethod
    def execute(self):
        """Wait for all computations to complete without materializing data."""
        pass

    # END Data Management Methods

    # To/From Pandas
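
Marking ``execute`` with ``@abc.abstractmethod`` means a subclass that forgets to implement it cannot be instantiated, provided the base class uses the ``abc`` machinery. The sketch below shows that behaviour with hypothetical class names (``BaseCompilerSketch``, ``IncompleteCompiler``, ``CompleteCompiler``) that are not part of Modin.

```python
import abc


class BaseCompilerSketch(abc.ABC):
    """Hypothetical base class with one abstract method, mirroring the hunk above."""

    @abc.abstractmethod
    def execute(self):
        """Wait for all computations to complete without materializing data."""


class IncompleteCompiler(BaseCompilerSketch):
    """Does not override ``execute``, so it stays abstract."""


class CompleteCompiler(BaseCompilerSketch):
    def execute(self):
        return "done"


try:
    IncompleteCompiler()
    instantiated = True
except TypeError:
    # Python refuses to instantiate a class with unimplemented abstract methods.
    instantiated = False

print(instantiated, CompleteCompiler().execute())
```

This is presumably why the commit also adds ``"execute"`` to the list checked in ``test_base_abstract_methods``.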
4 changes: 4 additions & 0 deletions modin/core/storage_formats/pandas/query_compiler.py
@@ -278,6 +278,10 @@ def lazy_execution(self):
    def finalize(self):
        self._modin_frame.finalize()

    def execute(self):
        self.finalize()
        self._modin_frame.wait_computations()

    def to_pandas(self):
        return self._modin_frame.to_pandas()

8 changes: 8 additions & 0 deletions modin/experimental/core/storage_formats/hdk/query_compiler.py
@@ -183,6 +183,14 @@ def finalize(self):
        # TODO: implement this for HDK storage format
        raise NotImplementedError()

    def execute(self):
        self._modin_frame._execute()

    def force_import(self):
        """Force table import."""
        # HDK-specific method
        self._modin_frame.force_import()

    def to_pandas(self):
        return self._modin_frame.to_pandas()

1 change: 1 addition & 0 deletions modin/test/test_executions_api.py
@@ -25,6 +25,7 @@ def test_base_abstract_methods():
        "__init__",
        "free",
        "finalize",
        "execute",
        "to_pandas",
        "from_pandas",
        "from_arrow",
32 changes: 32 additions & 0 deletions modin/test/test_utils.py
@@ -13,7 +13,9 @@

import json
from textwrap import dedent, indent
from unittest.mock import Mock, patch

import numpy as np
import pandas
import pytest

@@ -336,3 +338,33 @@ def test_assert_dtypes_equal():
    df1 = df1.astype({"a": "str"})
    with pytest.raises(AssertionError):
        assert_dtypes_equal(df1, df2)


def test_execute():
    data = np.random.rand(100, 64)
    modin_df, pandas_df = create_test_dfs(data)
    partitions = modin_df._query_compiler._modin_frame._partitions.flatten()
    mgr_cls = modin_df._query_compiler._modin_frame._partition_mgr_cls

    # check modin case
    with patch.object(mgr_cls, "wait_partitions", new=Mock()):
        modin.utils.execute(modin_df)
        mgr_cls.wait_partitions.assert_called_once()
        assert (mgr_cls.wait_partitions.call_args[0] == partitions).all()

    # check pandas case without error
    with patch.object(mgr_cls, "wait_partitions", new=Mock()):
        modin.utils.execute(pandas_df)
        mgr_cls.wait_partitions.assert_not_called()

    # make sure `trigger_hdk_import=True` doesn't break anything
    # when using other storage formats
    with patch.object(mgr_cls, "wait_partitions", new=Mock()):
        modin.utils.execute(modin_df, trigger_hdk_import=True)
        mgr_cls.wait_partitions.assert_called_once()

    # check several modin dataframes
    with patch.object(mgr_cls, "wait_partitions", new=Mock()):
        modin.utils.execute(modin_df, modin_df[modin_df.columns[:4]])
        mgr_cls.wait_partitions.assert_called()
        assert mgr_cls.wait_partitions.call_count == 2
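
The test relies on ``patch.object(..., new=Mock())`` to replace a class-level method and then inspect how it was called. A minimal sketch of that pattern, using a hypothetical ``PartitionManagerSketch`` class and ``execute_sketch`` helper rather than Modin's real classes, might look like this. It also shows that a ``Mock`` assertion only checks something when it is actually called, with parentheses.

```python
from unittest.mock import Mock, patch


class PartitionManagerSketch:
    """Hypothetical stand-in for a partition manager class."""

    @classmethod
    def wait_partitions(cls, partitions):
        raise RuntimeError("the real method should be patched out in this sketch")


def execute_sketch(partitions):
    """Hypothetical helper that delegates to the partition manager."""
    PartitionManagerSketch.wait_partitions(partitions)


# While patched, the class attribute is a Mock that records its calls.
with patch.object(PartitionManagerSketch, "wait_partitions", new=Mock()):
    execute_sketch([1, 2, 3])
    PartitionManagerSketch.wait_partitions.assert_called_once()
    called_with = PartitionManagerSketch.wait_partitions.call_args[0][0]

# Here the mock is never called, so assert_called() raises AssertionError.
with patch.object(PartitionManagerSketch, "wait_partitions", new=Mock()):
    # A bare attribute reference (no parentheses) performs no check at all.
    PartitionManagerSketch.wait_partitions.assert_called
    try:
        PartitionManagerSketch.wait_partitions.assert_called()
        raised = False
    except AssertionError:
        raised = True

print(called_with, raised)
```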
22 changes: 22 additions & 0 deletions modin/utils.py
@@ -27,6 +27,7 @@
from typing import (
    Any,
    Callable,
    Iterable,
    List,
    Mapping,
    Optional,
@@ -606,6 +607,27 @@ def try_cast_to_pandas(obj: Any, squeeze: bool = False) -> Any:
    return obj


def execute(*objs: Iterable[Any], trigger_hdk_import: bool = False) -> None:
    """
    Trigger the lazy computations for each obj in `objs`, if any, and wait for them to complete.

    Parameters
    ----------
    *objs : Iterable[Any]
        A collection of objects to trigger lazy computations.
    trigger_hdk_import : bool, default: False
        Trigger import execution. Makes sense only for HDK storage format.
        Safe to use with other storage formats.
    """
    for obj in objs:
        if not hasattr(obj, "_query_compiler"):
            continue
        query_compiler = obj._query_compiler
        query_compiler.execute()
        if trigger_hdk_import and hasattr(query_compiler, "force_import"):
            query_compiler.force_import()
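
The dispatch logic of ``execute`` can be exercised in isolation. The sketch below re-implements the same loop against hypothetical stand-in classes (``FrameLike``, ``PlainCompiler``, ``ImportingCompiler``) and a hypothetical ``execute_sketch`` function with a ``trigger_import`` flag, so it runs without Modin installed. It is meant only to show which objects are skipped and when the optional import step fires.

```python
class FrameLike:
    """Hypothetical object exposing a ``_query_compiler`` attribute."""

    def __init__(self, query_compiler):
        self._query_compiler = query_compiler


class PlainCompiler:
    """Hypothetical compiler without a ``force_import`` method."""

    def __init__(self):
        self.calls = []

    def execute(self):
        self.calls.append("execute")


class ImportingCompiler(PlainCompiler):
    """Hypothetical compiler that also supports ``force_import``."""

    def force_import(self):
        self.calls.append("force_import")


def execute_sketch(*objs, trigger_import=False):
    """Same control flow as the function above, on the stand-in classes."""
    for obj in objs:
        if not hasattr(obj, "_query_compiler"):
            continue  # e.g. plain Python or pandas objects are ignored
        query_compiler = obj._query_compiler
        query_compiler.execute()
        if trigger_import and hasattr(query_compiler, "force_import"):
            query_compiler.force_import()


plain, importing = PlainCompiler(), ImportingCompiler()
execute_sketch(FrameLike(plain), FrameLike(importing), [1, 2, 3], trigger_import=True)
print(plain.calls, importing.calls)
```

Checking ``hasattr`` instead of the storage format name keeps the flag harmless for compilers that have no import step, which matches the "Safe to use with other storage formats" note in the docstring.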


def wrap_into_list(*args: Any, skipna: bool = True) -> List[Any]:
    """
    Wrap a sequence of passed values in a flattened list.