FEAT-#5221: add execute to trigger lazy computations and wait for them to complete #6648

Merged
merged 10 commits into from
Oct 25, 2023
1 change: 1 addition & 0 deletions docs/development/architecture.rst
@@ -304,6 +304,7 @@ details. The documentation covers most modules, with more docs being added every
├───examples
├───modin
│ ├─── :doc:`config </flow/modin/config>`
│ ├─── :doc:`utils </flow/modin/utils>`
│ ├───core
│ │ ├─── :doc:`dataframe </flow/modin/core/dataframe/index>`
│ │ │ ├─── :doc:`algebra </flow/modin/core/dataframe/algebra>`
12 changes: 12 additions & 0 deletions docs/flow/modin/utils.rst
@@ -0,0 +1,12 @@
:orphan:

Modin Utils
"""""""""""

Here are utilities that can be useful when working with Modin.

Public API
''''''''''

.. autofunction:: modin.utils.try_cast_to_pandas
.. autofunction:: modin.utils.execute
22 changes: 9 additions & 13 deletions docs/usage_guide/benchmarking.rst
@@ -8,8 +8,8 @@ To benchmark a single Modin function, often turning on the
:code:`BenchmarkMode` will suffice.

There is no simple way to benchmark more complex Modin workflows, though
-benchmark mode or calling ``repr`` on Modin objects may be useful. The
-:doc:`Modin logs </usage_guide/advanced_usage/modin_logging>` may help you
+benchmark mode or calling ``modin.utils.execute`` on Modin objects may be useful.
+The :doc:`Modin logs </usage_guide/advanced_usage/modin_logging>` may help you
identify bottlenecks in your code, and they may also help profile the execution
of each Modin function.

@@ -125,9 +125,8 @@ at each Modin :doc:`layer </development/architecture>`. Log mode is more
useful when used in conjunction with benchmark mode.

Sometimes, if you don't have a natural end-point to your workflow, you can
-just call ``repr`` on the workflow's final Modin objects. That will typically
-block on any asynchronous computation. However, beware that ``repr`` can also
-be misleading, e.g. here:
+just call ``modin.utils.execute`` on the workflow's final Modin objects.
+That will typically block on any asynchronous computation:

.. code-block:: python

@@ -137,6 +136,7 @@ be misleading, e.g. here:

import modin.pandas as pd
from modin.config import MinPartitionSize, NPartitions
+import modin.utils

MinPartitionSize.put(32)
NPartitions.put(16)
@@ -149,17 +149,13 @@ be misleading, e.g. here:
ray.init()
df1 = pd.DataFrame(list(range(10_000)), columns=['A'])
result = df1.map(slow_add_one)
-%time repr(result)
-# time.sleep(10)
+# %time modin.utils.execute(result)
%time result.to_parquet(BytesIO())
.. code-block::python

-The ``repr`` takes only 802 milliseconds, but writing the result to a buffer
-takes 9.84 seconds. However, if you uncomment the :code:`time.sleep` before the
-:code:`to_parquet` call, the :code:`to_parquet` takes just 23.8 milliseconds!
-The problem is that the ``repr`` blocks only on getting the first few and the
-last few rows, but the slow execution is for row 5001, which Modin is
-computing asynchronously in the background even after ``repr`` finishes.
+Writing the result to a buffer takes 9.84 seconds. However, if you uncomment
+the :code:`%time modin.utils.execute(result)` before the :code:`to_parquet`
+call, the :code:`to_parquet` takes just 23.8 milliseconds!

.. note::
If you see any Modin documentation touting Modin's speed without using
4 changes: 4 additions & 0 deletions modin/conftest.py
@@ -175,6 +175,10 @@ def __init__(self, modin_frame):
    def finalize(self):
        self._modin_frame.finalize()

    def execute(self):
        self.finalize()
        self._modin_frame.wait_computations()

    @classmethod
    def from_pandas(cls, df, data_cls):
        return cls(data_cls.from_pandas(df))
4 changes: 4 additions & 0 deletions modin/core/dataframe/pandas/dataframe/dataframe.py
@@ -4106,6 +4106,10 @@ def finalize(self):
        """
        self._partition_mgr_cls.finalize(self._partitions)

    def wait_computations(self):
        """Wait for all computations to complete without materializing data."""
        self._partition_mgr_cls.wait_partitions(self._partitions.flatten())

    def __dataframe__(self, nan_as_null: bool = False, allow_copy: bool = True):
        """
        Get a Modin DataFrame that implements the dataframe exchange protocol.
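As a rough analogy for what "wait for all computations to complete without materializing data" means in `wait_computations`, here is a minimal sketch built on the standard library's `concurrent.futures` rather than on Modin's partition manager. The `slow_partition` function, the worker count, and the sleep duration are hypothetical stand-ins chosen only for illustration.

```python
import time
from concurrent.futures import ThreadPoolExecutor, wait


def slow_partition(i):
    # Hypothetical stand-in for the deferred work behind one partition.
    time.sleep(0.3)
    return i + 1


def timed(fn):
    # Return how many seconds fn() took.
    start = time.perf_counter()
    fn()
    return time.perf_counter() - start


with ThreadPoolExecutor(max_workers=4) as pool:
    futures = [pool.submit(slow_partition, i) for i in range(4)]

    # Block until every future is done; no result is fetched here.
    wait_seconds = timed(lambda: wait(futures))

    # The work has already finished, so collecting results is cheap.
    results = []
    collect_seconds = timed(lambda: results.extend(f.result() for f in futures))

print(f"wait: {wait_seconds:.3f}s, collect: {collect_seconds:.3f}s")
```

In this sketch almost all of the elapsed time lands in the waiting step, so anything timed after it is not charged for the background work. That is the same reason the docs change recommends blocking before timing a later stage.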
5 changes: 5 additions & 0 deletions modin/core/storage_formats/base/query_compiler.py
@@ -317,6 +317,11 @@ def finalize(self):
        """Finalize constructing the dataframe calling all deferred functions which were used to build it."""
        pass

    @abc.abstractmethod
    def execute(self):
        """Wait for all computations to complete without materializing data."""
        pass

    # END Data Management Methods

    # To/From Pandas
4 changes: 4 additions & 0 deletions modin/core/storage_formats/pandas/query_compiler.py
@@ -278,6 +278,10 @@ def lazy_execution(self):
    def finalize(self):
        self._modin_frame.finalize()

    def execute(self):
        self.finalize()
        self._modin_frame.wait_computations()

    def to_pandas(self):
        return self._modin_frame.to_pandas()

@@ -183,6 +183,14 @@ def finalize(self):
        # TODO: implement this for HDK storage format
        raise NotImplementedError()

    def execute(self):
        self._modin_frame._execute()

    def force_import(self):
        """Force table import."""
        # HDK-specific method
        self._modin_frame.force_import()

    def to_pandas(self):
        return self._modin_frame.to_pandas()

1 change: 1 addition & 0 deletions modin/test/test_executions_api.py
@@ -25,6 +25,7 @@ def test_base_abstract_methods():
        "__init__",
        "free",
        "finalize",
        "execute",
        "to_pandas",
        "from_pandas",
        "from_arrow",
32 changes: 32 additions & 0 deletions modin/test/test_utils.py
@@ -13,7 +13,9 @@

import json
from textwrap import dedent, indent
from unittest.mock import Mock, patch

import numpy as np
import pandas
import pytest

@@ -336,3 +338,33 @@ def test_assert_dtypes_equal():
    df1 = df1.astype({"a": "str"})
    with pytest.raises(AssertionError):
        assert_dtypes_equal(df1, df2)


def test_execute():
    data = np.random.rand(100, 64)
    modin_df, pandas_df = create_test_dfs(data)
    partitions = modin_df._query_compiler._modin_frame._partitions.flatten()
    mgr_cls = modin_df._query_compiler._modin_frame._partition_mgr_cls

    # check modin case
    with patch.object(mgr_cls, "wait_partitions", new=Mock()):
        modin.utils.execute(modin_df)
        mgr_cls.wait_partitions.assert_called_once()
        assert (mgr_cls.wait_partitions.call_args[0] == partitions).all()

    # check pandas case without error
    with patch.object(mgr_cls, "wait_partitions", new=Mock()):
        modin.utils.execute(pandas_df)
        mgr_cls.wait_partitions.assert_not_called()

    # make sure `trigger_hdk_import=True` doesn't break anything
    # when using other storage formats
    with patch.object(mgr_cls, "wait_partitions", new=Mock()):
        modin.utils.execute(modin_df, trigger_hdk_import=True)
        mgr_cls.wait_partitions.assert_called_once()

    # check several modin dataframes
    with patch.object(mgr_cls, "wait_partitions", new=Mock()):
        modin.utils.execute(modin_df, modin_df[modin_df.columns[:4]])
        mgr_cls.wait_partitions.assert_called()
        assert mgr_cls.wait_partitions.call_count == 2
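A note on the `Mock` assertions this test relies on: the `assert_*` helpers only check anything when they are called. The sketch below uses a standalone `Mock` (the name `never_called` is made up for illustration and is not part of the PR) to show that referencing `assert_called` without parentheses passes silently even when the mock was never invoked.

```python
from unittest.mock import Mock

never_called = Mock()

# Attribute access only: this looks up the bound method and discards it,
# so nothing is checked and no error is raised.
never_called.assert_called

# The real assertion needs the call parentheses.
try:
    never_called.assert_called()
    raised = False
except AssertionError:
    raised = True

print(raised)
```

An explicit `call_count` comparison, as in the last check of the test, is a useful backstop because it does not depend on the helper being invoked.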
22 changes: 22 additions & 0 deletions modin/utils.py
@@ -27,6 +27,7 @@
from typing import (
    Any,
    Callable,
    Iterable,
    List,
    Mapping,
    Optional,
@@ -606,6 +607,27 @@ def try_cast_to_pandas(obj: Any, squeeze: bool = False) -> Any:
    return obj


def execute(*objs: Iterable[Any], trigger_hdk_import: bool = False) -> None:
    """
    Trigger the lazy computations for each obj in `objs`, if any, and wait for them to complete.

    Parameters
    ----------
    *objs : Iterable[Any]
        A collection of objects to trigger lazy computations.
    trigger_hdk_import : bool, default: False
        Trigger import execution. Makes sense only for HDK storage format.
        Safe to use with other storage formats.
    """
    for obj in objs:
        if not hasattr(obj, "_query_compiler"):
            continue
        query_compiler = obj._query_compiler
        query_compiler.execute()
Contributor
@Egor-Krivov Oct 23, 2023

Please note that the current implementation in timedf doesn't call query_compiler.execute() when trigger_hdk_import is True. I don't remember the exact reason, but it was part of our discussion in https://github.com/intel-ai/timedf/pull/460

@AndreyPavlenko Would this unconditional execute be a problem?

Collaborator

If the operations were executed with HDK, there is nothing to import here. If they were executed with Arrow, we will just import the Arrow table into HDK, which could be redundant.

I think force_import() should be a separate operation.

Contributor
@Egor-Krivov Oct 23, 2023

@AndreyPavlenko Can the current implementation negatively affect performance, or is it just harmless redundancy?

Collaborator

It should not have any impact on performance. But if, for example, you want to measure an Arrow-based execution time, you will not get a precise number, because the import is performed after the execution, i.e. you will get the execution + import time.

Contributor

Discussed this issue offline with @AndreyPavlenko and @anmyachev; we decided to keep the current implementation.

        if trigger_hdk_import and hasattr(query_compiler, "force_import"):
            query_compiler.force_import()


def wrap_into_list(*args: Any, skipna: bool = True) -> List[Any]:
    """
    Wrap a sequence of passed values in a flattened list.
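To make the control flow of `execute` easier to follow without a Modin installation, here is a minimal sketch that copies the loop from the diff above and runs it against stand-in objects. `FakeQueryCompiler` and `FakeFrame` are hypothetical classes invented for this illustration; they are not Modin APIs, and real query compilers do far more than record calls.

```python
class FakeQueryCompiler:
    """Hypothetical stand-in that records which methods were called."""

    def __init__(self, supports_import=False):
        self.calls = []
        if supports_import:
            # In this sketch only HDK-like compilers expose force_import.
            self.force_import = lambda: self.calls.append("force_import")

    def execute(self):
        self.calls.append("execute")


class FakeFrame:
    """Hypothetical object that, like a Modin object, has a _query_compiler."""

    def __init__(self, query_compiler):
        self._query_compiler = query_compiler


def execute(*objs, trigger_hdk_import=False):
    # Same control flow as the function added in this PR.
    for obj in objs:
        if not hasattr(obj, "_query_compiler"):
            continue  # non-Modin objects are skipped silently
        query_compiler = obj._query_compiler
        query_compiler.execute()
        if trigger_hdk_import and hasattr(query_compiler, "force_import"):
            query_compiler.force_import()


plain = FakeFrame(FakeQueryCompiler())
hdk_like = FakeFrame(FakeQueryCompiler(supports_import=True))

# The list stands in for any object without a _query_compiler.
execute(plain, hdk_like, [1, 2, 3], trigger_hdk_import=True)

print(plain._query_compiler.calls)
print(hdk_like._query_compiler.calls)
```

The sketch also shows the ordering raised in the review thread: `execute()` always runs first and `force_import()` follows, so a timer wrapped around the call covers both steps.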