Skip to content

Commit

Permalink
Test2
Browse files Browse the repository at this point in the history
  • Loading branch information
AndreyPavlenko committed Dec 8, 2023
1 parent 9da33d9 commit 32f547c
Show file tree
Hide file tree
Showing 2 changed files with 114 additions and 69 deletions.
148 changes: 85 additions & 63 deletions modin/core/execution/ray/common/deferred_execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,19 @@
# governing permissions and limitations under the License.
from builtins import NotImplementedError
from enum import Enum
from itertools import islice
from types import GeneratorType
from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple, Union, Generator
from typing import (

Check warning on line 17 in modin/core/execution/ray/common/deferred_execution.py

View check run for this annotation

Codecov / codecov/patch

modin/core/execution/ray/common/deferred_execution.py#L13-L17

Added lines #L13 - L17 were not covered by tests
Any,
Callable,
Dict,
Iterable,
List,
Optional,
Tuple,
Union,
Generator,
)

Check notice

Code scanning / CodeQL

Unused import Note

Import of 'Generator' is not used.

import pandas
import ray
Expand Down Expand Up @@ -101,12 +112,7 @@ def _ref(obj):

def exec(

Check warning on line 113 in modin/core/execution/ray/common/deferred_execution.py

View check run for this annotation

Codecov / codecov/patch

modin/core/execution/ray/common/deferred_execution.py#L113

Added line #L113 was not covered by tests
self,
) -> Tuple[
ObjectRefOrListType,
ObjectRefOrListType,
ObjectRefOrListType,
ObjectRefOrListType,
]:
) -> Tuple[ObjectRefOrListType, Union["MetaList", List], Union[int, List[int]]]:
"""
Execute this task, if required.
Expand All @@ -116,63 +122,58 @@ def exec(
The execution result, length, width and the worker's ip address.
"""
if not self.has_result:
if self.num_returns == 1 and not isinstance(self.data, DeferredExecution):
data = self.data
if self.num_returns == 1 and not isinstance(data, DeferredExecution):
task = self.task
if task[3] and task[4]:
result = remote_exec_func.remote(
task[0], self.data, *task[1], **task[2]
if isinstance(data, ray.ObjectRef):
ray.wait([data])
result, length, width, ip = remote_exec_func.remote(

Check warning on line 131 in modin/core/execution/ray/common/deferred_execution.py

View check run for this annotation

Codecov / codecov/patch

modin/core/execution/ray/common/deferred_execution.py#L124-L131

Added lines #L124 - L131 were not covered by tests
task[0], data, *task[1], **task[2]
)
self._set_result(*result)
return result
meta = MetaList([length, width, ip])
self._set_result(result, meta, 0)
return result, meta, 0

Check warning on line 136 in modin/core/execution/ray/common/deferred_execution.py

View check run for this annotation

Codecov / codecov/patch

modin/core/execution/ray/common/deferred_execution.py#L134-L136

Added lines #L134 - L136 were not covered by tests

consumers, output = deconstruct(self)
num_returns = sum(c.num_returns for c in consumers)
num_returns = sum(c.num_returns for c in consumers) + 1
if isinstance(output[0], ray.ObjectRef):
ray.wait(output[0:1])
results = _remote_exec_chain(num_returns, *output)
if not isinstance(results, ListOrTuple):
results = [results]
meta = MetaList(results.pop())
meta_off = 0
results = iter(results)
for de in consumers:
if de.num_returns == 1:
de._set_result(
next(results),
None,
None,
None,
)
de._set_result(next(results), meta, meta_off)
meta_off += 2

Check warning on line 149 in modin/core/execution/ray/common/deferred_execution.py

View check run for this annotation

Codecov / codecov/patch

modin/core/execution/ray/common/deferred_execution.py#L138-L149

Added lines #L138 - L149 were not covered by tests
else:
res = []
lengths = []
widths = []
for _ in range(de.num_returns):
res.append(next(results))
lengths.append(None)
widths.append(None)
de._set_result(res, lengths, widths, None)
res = list(islice(results, num_returns))
offsets = list(range(0, 2 * num_returns, 2))
de._set_result(res, res, offsets)
meta_off += 2 * num_returns

Check warning on line 154 in modin/core/execution/ray/common/deferred_execution.py

View check run for this annotation

Codecov / codecov/patch

modin/core/execution/ray/common/deferred_execution.py#L151-L154

Added lines #L151 - L154 were not covered by tests

return self.data, self.length, self.width, self.ip
return self.data, self.meta, self.meta_off

Check warning on line 156 in modin/core/execution/ray/common/deferred_execution.py

View check run for this annotation

Codecov / codecov/patch

modin/core/execution/ray/common/deferred_execution.py#L156

Added line #L156 was not covered by tests

def _set_result(

Check warning on line 158 in modin/core/execution/ray/common/deferred_execution.py

View check run for this annotation

Codecov / codecov/patch

modin/core/execution/ray/common/deferred_execution.py#L158

Added line #L158 was not covered by tests
self,
result: ObjectRefOrListType,
length: ObjectRefOrListType,
width: ObjectRefOrListType,
ip: ObjectRefOrListType,
meta: Union["MetaList", List],
meta_off: Union[int, List[int]],
):
"""
Set the execution result.
Parameters
----------
result : ObjectRefOrListType
length : ObjectRefOrListType
width : ObjectRefOrListType
ip : ObjectRefOrListType
meta : MetaList or list
meta_off : int or list of int
"""
del self.task
self.data = result
self.length = length
self.width = width
self.ip = ip
self.meta = meta
self.meta_off = meta_off

Check warning on line 176 in modin/core/execution/ray/common/deferred_execution.py

View check run for this annotation

Codecov / codecov/patch

modin/core/execution/ray/common/deferred_execution.py#L173-L176

Added lines #L173 - L176 were not covered by tests

@property
def has_result(self):

Check warning on line 179 in modin/core/execution/ray/common/deferred_execution.py

View check run for this annotation

Codecov / codecov/patch

modin/core/execution/ray/common/deferred_execution.py#L178-L179

Added lines #L178 - L179 were not covered by tests
Expand Down Expand Up @@ -200,11 +201,25 @@ def __reduce__(self):
raise NotImplementedError()


ListOrDe = (DeferredExecution, list, tuple)
class MetaList:
def __init__(self, obj: Union[ray.ObjectRef, ClientObjectRef, List]):
self._obj = obj

Check warning on line 206 in modin/core/execution/ray/common/deferred_execution.py

View check run for this annotation

Codecov / codecov/patch

modin/core/execution/ray/common/deferred_execution.py#L204-L206

Added lines #L204 - L206 were not covered by tests

def __getitem__(self, item):
obj = self._obj
if not isinstance(obj, list):
from modin.core.execution.ray.common import RayWrapper

Check warning on line 211 in modin/core/execution/ray/common/deferred_execution.py

View check run for this annotation

Codecov / codecov/patch

modin/core/execution/ray/common/deferred_execution.py#L208-L211

Added lines #L208 - L211 were not covered by tests

def has_list_or_de(it: Iterable):
return any(isinstance(i, ListOrDe) for i in it)
self._obj = obj = RayWrapper.materialize(obj)
return obj[item]

Check warning on line 214 in modin/core/execution/ray/common/deferred_execution.py

View check run for this annotation

Codecov / codecov/patch

modin/core/execution/ray/common/deferred_execution.py#L213-L214

Added lines #L213 - L214 were not covered by tests

def __setitem__(self, key, value):
obj = self._obj
if not isinstance(obj, list):
from modin.core.execution.ray.common import RayWrapper

Check warning on line 219 in modin/core/execution/ray/common/deferred_execution.py

View check run for this annotation

Codecov / codecov/patch

modin/core/execution/ray/common/deferred_execution.py#L216-L219

Added lines #L216 - L219 were not covered by tests

self._obj = obj = RayWrapper.materialize(obj)
obj[key] = value

Check warning on line 222 in modin/core/execution/ray/common/deferred_execution.py

View check run for this annotation

Codecov / codecov/patch

modin/core/execution/ray/common/deferred_execution.py#L221-L222

Added lines #L221 - L222 were not covered by tests


class DeferredExecutionException(Exception):
Expand All @@ -223,6 +238,13 @@ class _Tag(Enum):
END = 3

Check warning on line 238 in modin/core/execution/ray/common/deferred_execution.py

View check run for this annotation

Codecov / codecov/patch

modin/core/execution/ray/common/deferred_execution.py#L238

Added line #L238 was not covered by tests


ListOrDe = (DeferredExecution, list, tuple)

Check warning on line 241 in modin/core/execution/ray/common/deferred_execution.py

View check run for this annotation

Codecov / codecov/patch

modin/core/execution/ray/common/deferred_execution.py#L241

Added line #L241 was not covered by tests


def has_list_or_de(it: Iterable):
return any(isinstance(i, ListOrDe) for i in it)

Check warning on line 245 in modin/core/execution/ray/common/deferred_execution.py

View check run for this annotation

Codecov / codecov/patch

modin/core/execution/ray/common/deferred_execution.py#L244-L245

Added lines #L244 - L245 were not covered by tests


def deconstruct(de: DeferredExecution) -> Tuple[List[DeferredExecution], List[Any]]:
stack = []
result_consumers = []
Expand Down Expand Up @@ -399,8 +421,9 @@ def _remote_exec_multi_chain(num_returns: int, *args: Tuple) -> List:

def construct(num_returns: int, args: Tuple) -> List:
chain = list(reversed(args))
meta = []
try:
stack = [construct_chain(chain, {}, None)]
stack = [construct_chain(chain, {}, meta, None)]
while stack:
try:
gen = stack.pop()
Expand All @@ -411,7 +434,8 @@ def construct(num_returns: int, args: Tuple) -> List:
else:
yield obj
except StopIteration:
...
meta.append(get_node_ip_address())
yield meta
except DeferredExecutionException as err:
for _ in range(num_returns):
yield err
Expand All @@ -426,7 +450,8 @@ def construct(num_returns: int, args: Tuple) -> List:
def construct_chain(

Check warning on line 450 in modin/core/execution/ray/common/deferred_execution.py

View check run for this annotation

Codecov / codecov/patch

modin/core/execution/ray/common/deferred_execution.py#L450

Added line #L450 was not covered by tests
chain: List,
refs: Dict[int, Any],
lst: List,
meta: List,
lst: Optional[List],
):
pop = chain.pop
tg_e = _Tag.END

Check warning on line 457 in modin/core/execution/ray/common/deferred_execution.py

View check run for this annotation

Codecov / codecov/patch

modin/core/execution/ray/common/deferred_execution.py#L456-L457

Added lines #L456 - L457 were not covered by tests
Expand All @@ -438,7 +463,7 @@ def construct_chain(
obj = refs[pop()]
elif obj is _Tag.LIST:
obj = []
yield construct_list(obj, chain, refs)
yield construct_list(obj, chain, refs, meta)
if isinstance(obj[0], DeferredExecutionException):
raise obj[0]

Check warning on line 468 in modin/core/execution/ray/common/deferred_execution.py

View check run for this annotation

Codecov / codecov/patch

modin/core/execution/ray/common/deferred_execution.py#L459-L468

Added lines #L459 - L468 were not covered by tests

Expand All @@ -457,38 +482,35 @@ def construct_chain(
args.reverse()

Check warning on line 482 in modin/core/execution/ray/common/deferred_execution.py

View check run for this annotation

Codecov / codecov/patch

modin/core/execution/ray/common/deferred_execution.py#L480-L482

Added lines #L480 - L482 were not covered by tests
else:
args = []
yield construct_list(args, chain, refs)
yield construct_list(args, chain, refs, meta)
if (args_len := pop()) >= 0:
kwargs = {pop(): pop() for _ in range(args_len)}

Check warning on line 487 in modin/core/execution/ray/common/deferred_execution.py

View check run for this annotation

Codecov / codecov/patch

modin/core/execution/ray/common/deferred_execution.py#L484-L487

Added lines #L484 - L487 were not covered by tests
else:
values = []
yield construct_list(values, chain, refs)
yield construct_list(values, chain, refs, meta)
kwargs = {pop(): v for v in values}

Check warning on line 491 in modin/core/execution/ray/common/deferred_execution.py

View check run for this annotation

Codecov / codecov/patch

modin/core/execution/ray/common/deferred_execution.py#L489-L491

Added lines #L489 - L491 were not covered by tests

obj = _exec_func(fn, obj, args, kwargs)

Check warning on line 493 in modin/core/execution/ray/common/deferred_execution.py

View check run for this annotation

Codecov / codecov/patch

modin/core/execution/ray/common/deferred_execution.py#L493

Added line #L493 was not covered by tests

if ref := pop():
refs[ref] = obj
if pop():
yield obj


def _add_result(obj: Any, output: List):
out_append = output.append
if isinstance(obj, pandas.DataFrame):
out_append(ray.put(obj) if obj.memory_usage(deep=True).sum() > 100000 else obj)
out_append(len(obj))
out_append(len(obj.columns))
else:
out_append(obj)
out_append(0)
out_append(0)
if isinstance(obj, ListOrTuple):
for o in obj:
meta.append(len(o) if hasattr(o, "__len__") else 0)
meta.append(len(o.columns) if hasattr(o, "columns") else 0)
yield o

Check warning on line 502 in modin/core/execution/ray/common/deferred_execution.py

View check run for this annotation

Codecov / codecov/patch

modin/core/execution/ray/common/deferred_execution.py#L495-L502

Added lines #L495 - L502 were not covered by tests
else:
meta.append(len(obj) if hasattr(obj, "__len__") else 0)
meta.append(len(obj.columns) if hasattr(obj, "columns") else 0)
yield obj

Check warning on line 506 in modin/core/execution/ray/common/deferred_execution.py

View check run for this annotation

Codecov / codecov/patch

modin/core/execution/ray/common/deferred_execution.py#L504-L506

Added lines #L504 - L506 were not covered by tests


def construct_list(

Check warning on line 509 in modin/core/execution/ray/common/deferred_execution.py

View check run for this annotation

Codecov / codecov/patch

modin/core/execution/ray/common/deferred_execution.py#L509

Added line #L509 was not covered by tests
lst: List,
chain: List,
refs: Dict[int, Any],
meta: List,
):
pop = chain.pop
lst_append = lst.append

Check warning on line 516 in modin/core/execution/ray/common/deferred_execution.py

View check run for this annotation

Codecov / codecov/patch

modin/core/execution/ray/common/deferred_execution.py#L515-L516

Added lines #L515 - L516 were not covered by tests
Expand All @@ -498,10 +520,10 @@ def construct_list(
if obj == _Tag.END:
break
elif obj == _Tag.CHAIN:
yield construct_chain(chain, refs, lst)
yield construct_chain(chain, refs, meta, lst)
elif obj == _Tag.LIST:
lst_append([])
yield construct_list(lst[-1], chain, refs)
yield construct_list(lst[-1], chain, refs, meta)
elif obj is _Tag.REF:
lst_append(refs[pop()])

Check warning on line 528 in modin/core/execution/ray/common/deferred_execution.py

View check run for this annotation

Codecov / codecov/patch

modin/core/execution/ray/common/deferred_execution.py#L518-L528

Added lines #L518 - L528 were not covered by tests
else:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
DeferredExecution,
has_list_or_de,
remote_exec_func,
MetaList,
)

Check notice

Code scanning / CodeQL

Unused import Note

Import of 'has_list_or_de' is not used.
Import of 'remote_exec_func' is not used.
from modin.core.execution.ray.common.utils import ObjectIDType

Check warning on line 30 in modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition.py

View check run for this annotation

Codecov / codecov/patch

modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition.py#L30

Added line #L30 was not covered by tests
from modin.logging import get_logger
Expand Down Expand Up @@ -64,9 +65,8 @@ def __init__(
if isinstance(data, DeferredExecution):
data.ref_count(1)
self._data_ref = data
self._length_cache = length
self._width_cache = width
self._ip_cache = ip
self.meta = MetaList([length, width, ip])
self.meta_off = 0

Check warning on line 69 in modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition.py

View check run for this annotation

Codecov / codecov/patch

modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition.py#L65-L69

Added lines #L65 - L69 were not covered by tests

def __del__(self):

Check warning on line 71 in modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition.py

View check run for this annotation

Codecov / codecov/patch

modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition.py#L71

Added line #L71 was not covered by tests
"""Decrement the reference counter."""
Expand Down Expand Up @@ -116,9 +116,8 @@ def drain_call_queue(self):
if isinstance(data, DeferredExecution):
(

Check warning on line 117 in modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition.py

View check run for this annotation

Codecov / codecov/patch

modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition.py#L113-L117

Added lines #L113 - L117 were not covered by tests
self._data_ref,
self._length_cache,
self._width_cache,
self._ip_cache,
self._meta,
self._meta_off,
) = data.exec()
data.ref_count(-1)

Check warning on line 122 in modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition.py

View check run for this annotation

Codecov / codecov/patch

modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition.py#L122

Added line #L122 was not covered by tests

Expand Down Expand Up @@ -302,6 +301,30 @@ def _data(self):
self.drain_call_queue()
return self._data_ref

Check warning on line 302 in modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition.py

View check run for this annotation

Codecov / codecov/patch

modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition.py#L299-L302

Added lines #L299 - L302 were not covered by tests

@property
def _length_cache(self):
return self.meta[self.meta_off]

Check warning on line 306 in modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition.py

View check run for this annotation

Codecov / codecov/patch

modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition.py#L304-L306

Added lines #L304 - L306 were not covered by tests

@_length_cache.setter
def _length_cache(self, value):
self.meta[self.meta_off] = value

Check warning on line 310 in modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition.py

View check run for this annotation

Codecov / codecov/patch

modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition.py#L308-L310

Added lines #L308 - L310 were not covered by tests

@property
def _width_cache(self):
return self.meta[self.meta_off + 1]

Check warning on line 314 in modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition.py

View check run for this annotation

Codecov / codecov/patch

modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition.py#L312-L314

Added lines #L312 - L314 were not covered by tests

@_width_cache.setter
def _width_cache(self, value):
self.meta[self.meta_off + 1] = value

Check warning on line 318 in modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition.py

View check run for this annotation

Codecov / codecov/patch

modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition.py#L316-L318

Added lines #L316 - L318 were not covered by tests

@property
def _ip_cache(self):
return self.meta[-1]

Check warning on line 322 in modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition.py

View check run for this annotation

Codecov / codecov/patch

modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition.py#L320-L322

Added lines #L320 - L322 were not covered by tests

@_ip_cache.setter
def _ip_cache(self, value):
self.meta[-1] = value

Check warning on line 326 in modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition.py

View check run for this annotation

Codecov / codecov/patch

modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition.py#L324-L326

Added lines #L324 - L326 were not covered by tests


@ray.remote(num_returns=2)
def _get_index_and_columns(df): # pragma: no cover
Expand Down

0 comments on commit 32f547c

Please sign in to comment.