Skip to content

Commit

Permalink
PERF-#0000: dask: return node address only if requested
Browse files Browse the repository at this point in the history
Signed-off-by: Anatoly Myachev <[email protected]>
  • Loading branch information
anmyachev committed Oct 16, 2023
1 parent f14b0fe commit ac6d7ce
Showing 1 changed file with 35 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,8 @@

"""Module houses class that wraps data (block partition) and its metadata."""

import pandas
from distributed import Future
from distributed.utils import get_ip
from distributed.utils import get_ip as get_node_address

Check warning on line 17 in modin/core/execution/dask/implementations/pandas_on_dask/partitioning/partition.py

View check run for this annotation

Codecov / codecov/patch

modin/core/execution/dask/implementations/pandas_on_dask/partitioning/partition.py#L17

Added line #L17 was not covered by tests

from modin.core.dataframe.pandas.partitioning.partition import PandasDataframePartition
from modin.core.execution.dask.common import DaskWrapper
Expand Down Expand Up @@ -64,7 +63,7 @@ def __init__(self, data, length=None, width=None, ip=None, call_queue=None):
)
)

def apply(self, func, *args, **kwargs):
def apply(self, func, *args, get_ip=False, **kwargs):

Check warning on line 66 in modin/core/execution/dask/implementations/pandas_on_dask/partitioning/partition.py

View check run for this annotation

Codecov / codecov/patch

modin/core/execution/dask/implementations/pandas_on_dask/partitioning/partition.py#L66

Added line #L66 was not covered by tests
"""
Apply a function to the object wrapped by this partition.
Expand All @@ -74,6 +73,7 @@ def apply(self, func, *args, **kwargs):
A function to apply.
*args : iterable
Additional positional arguments to be passed in `func`.
get_ip : bool, default: False
**kwargs : dict
Additional keyword arguments to be passed in `func`.
Expand All @@ -93,26 +93,31 @@ def apply(self, func, *args, **kwargs):
self._is_debug(log) and log.debug(
f"SUBMIT::_apply_list_of_funcs::{self._identity}"
)
futures = DaskWrapper.deploy(
future = DaskWrapper.deploy(

Check warning on line 96 in modin/core/execution/dask/implementations/pandas_on_dask/partitioning/partition.py

View check run for this annotation

Codecov / codecov/patch

modin/core/execution/dask/implementations/pandas_on_dask/partitioning/partition.py#L96

Added line #L96 was not covered by tests
func=apply_list_of_funcs,
f_args=(call_queue, self._data),
num_returns=2,
f_args=(call_queue, self._data, get_ip),
num_returns=2 if get_ip else 1,
pure=False,
)
else:
# We handle `len(call_queue) == 1` in a different way because
# this improves performance a bit.
func, f_args, f_kwargs = call_queue[0]
futures = DaskWrapper.deploy(
f_kwargs["get_ip"] = get_ip
future = DaskWrapper.deploy(

Check warning on line 107 in modin/core/execution/dask/implementations/pandas_on_dask/partitioning/partition.py

View check run for this annotation

Codecov / codecov/patch

modin/core/execution/dask/implementations/pandas_on_dask/partitioning/partition.py#L106-L107

Added lines #L106 - L107 were not covered by tests
func=apply_func,
f_args=(self._data, func, *f_args),
f_kwargs=f_kwargs,
num_returns=2,
num_returns=2 if get_ip else 1,
pure=False,
)
self._is_debug(log) and log.debug(f"SUBMIT::_apply_func::{self._identity}")
self._is_debug(log) and log.debug(f"EXIT::Partition.apply::{self._identity}")
return self.__constructor__(futures[0], ip=futures[1])
if get_ip:
partition = self.__constructor__(future[0], ip=future[1])

Check warning on line 117 in modin/core/execution/dask/implementations/pandas_on_dask/partitioning/partition.py

View check run for this annotation

Codecov / codecov/patch

modin/core/execution/dask/implementations/pandas_on_dask/partitioning/partition.py#L116-L117

Added lines #L116 - L117 were not covered by tests
else:
partition = self.__constructor__(future)
return partition

Check warning on line 120 in modin/core/execution/dask/implementations/pandas_on_dask/partitioning/partition.py

View check run for this annotation

Codecov / codecov/patch

modin/core/execution/dask/implementations/pandas_on_dask/partitioning/partition.py#L119-L120

Added lines #L119 - L120 were not covered by tests

def drain_call_queue(self):
"""Execute all operations stored in the call queue on the object wrapped by this partition."""
Expand All @@ -127,26 +132,25 @@ def drain_call_queue(self):
self._is_debug(log) and log.debug(
f"SUBMIT::_apply_list_of_funcs::{self._identity}"
)
futures = DaskWrapper.deploy(
future = DaskWrapper.deploy(

Check warning on line 135 in modin/core/execution/dask/implementations/pandas_on_dask/partitioning/partition.py

View check run for this annotation

Codecov / codecov/patch

modin/core/execution/dask/implementations/pandas_on_dask/partitioning/partition.py#L135

Added line #L135 was not covered by tests
func=apply_list_of_funcs,
f_args=(call_queue, self._data),
num_returns=2,
num_returns=1,
pure=False,
)
else:
# We handle `len(call_queue) == 1` in a different way because
# this improves performance a bit.
func, f_args, f_kwargs = call_queue[0]
self._is_debug(log) and log.debug(f"SUBMIT::_apply_func::{self._identity}")
futures = DaskWrapper.deploy(
future = DaskWrapper.deploy(

Check warning on line 146 in modin/core/execution/dask/implementations/pandas_on_dask/partitioning/partition.py

View check run for this annotation

Codecov / codecov/patch

modin/core/execution/dask/implementations/pandas_on_dask/partitioning/partition.py#L146

Added line #L146 was not covered by tests
func=apply_func,
f_args=(self._data, func, *f_args),
f_kwargs=f_kwargs,
num_returns=2,
num_returns=1,
pure=False,
)
self._data = futures[0]
self._ip_cache = futures[1]
self._data = future

Check warning on line 153 in modin/core/execution/dask/implementations/pandas_on_dask/partitioning/partition.py

View check run for this annotation

Codecov / codecov/patch

modin/core/execution/dask/implementations/pandas_on_dask/partitioning/partition.py#L153

Added line #L153 was not covered by tests
self._is_debug(log) and log.debug(
f"EXIT::Partition.drain_call_queue::{self._identity}"
)
Expand Down Expand Up @@ -307,13 +311,13 @@ def ip(self, materialize=True):
IP address of the node that holds the data.
"""
if self._ip_cache is None:
self._ip_cache = self.apply(lambda df: pandas.DataFrame([]))._ip_cache
self._ip_cache = self.apply(lambda df: None, get_ip=True)._ip_cache

Check warning on line 314 in modin/core/execution/dask/implementations/pandas_on_dask/partitioning/partition.py

View check run for this annotation

Codecov / codecov/patch

modin/core/execution/dask/implementations/pandas_on_dask/partitioning/partition.py#L314

Added line #L314 was not covered by tests
if materialize and isinstance(self._ip_cache, Future):
self._ip_cache = DaskWrapper.materialize(self._ip_cache)
return self._ip_cache


def apply_func(partition, func, *args, get_ip=False, **kwargs):
    """
    Execute a function on the partition in a worker process.

    Parameters
    ----------
    partition : pandas.DataFrame
        A pandas DataFrame the function needs to be executed on.
    func : callable
        The function to perform.
    *args : list
        Positional arguments to pass to ``func``.
    get_ip : bool, default: False
        Whether to also return the address of the node that executed ``func``.
        This flag is consumed here and is never forwarded to ``func``.
    **kwargs : dict
        Keyword arguments to pass to ``func``.

    Returns
    -------
    pandas.DataFrame
        The resulting pandas DataFrame.
    str
        The node IP address of the worker process (if `get_ip==True`).

    Notes
    -----
    Directly passing a call queue entry (i.e. a list of [func, args, kwargs]) instead of
    destructuring it causes a performance penalty.
    """
    result = func(partition, *args, **kwargs)
    # `get_ip` is a plain boolean flag here; the address lookup itself is the
    # module-level `get_node_address` alias, queried only when requested.
    if get_ip:
        return result, get_node_address()
    return result

Check warning on line 352 in modin/core/execution/dask/implementations/pandas_on_dask/partitioning/partition.py

View check run for this annotation

Codecov / codecov/patch

modin/core/execution/dask/implementations/pandas_on_dask/partitioning/partition.py#L352

Added line #L352 was not covered by tests


def apply_list_of_funcs(call_queue, partition, get_ip=False):
    """
    Execute all operations stored in the call queue on the partition in a worker process.

    Parameters
    ----------
    call_queue : list
        A call queue of ``[func, args, kwargs]`` triples that needs to be executed on the partition.
    partition : pandas.DataFrame
        A pandas DataFrame the call queue needs to be executed on.
    get_ip : bool, default: False
        Whether to also return the address of the node that executed the queue.

    Returns
    -------
    pandas.DataFrame
        The resulting pandas DataFrame.
    str
        The node IP address of the worker process (if `get_ip==True`).
    """
    # Thread the intermediate value through the queue: every function must
    # receive the output of the previous one, not the original partition.
    # Rebinding `partition` also makes an empty queue return its input unchanged.
    for func, f_args, f_kwargs in call_queue:
        partition = func(partition, *f_args, **f_kwargs)
    if get_ip:
        return partition, get_node_address()
    return partition

Check warning on line 379 in modin/core/execution/dask/implementations/pandas_on_dask/partitioning/partition.py

View check run for this annotation

Codecov / codecov/patch

modin/core/execution/dask/implementations/pandas_on_dask/partitioning/partition.py#L379

Added line #L379 was not covered by tests

0 comments on commit ac6d7ce

Please sign in to comment.