FEAT-#6492: Add from_map feature to create dataframe (#7215)
Signed-off-by: Igoshev, Iaroslav <[email protected]>
YarShev authored Apr 30, 2024
1 parent 747f4de commit 9fa326f
Showing 11 changed files with 283 additions and 12 deletions.
20 changes: 17 additions & 3 deletions modin/core/execution/dask/common/engine_wrapper.py
@@ -15,12 +15,13 @@

from collections import UserDict

import pandas
from dask.distributed import wait
from distributed import Future
from distributed.client import default_client


def _deploy_dask_func(func, *args, **kwargs): # pragma: no cover
def _deploy_dask_func(func, *args, return_pandas_df=None, **kwargs): # pragma: no cover
"""
Wrap `func` to ease calling it remotely.
@@ -30,6 +31,8 @@ def _deploy_dask_func(func, *args, **kwargs): # pragma: no cover
A local function that we want to call remotely.
*args : iterable
Positional arguments to pass to `func` when calling remotely.
return_pandas_df : bool, optional
Whether to convert the result of `func` to a pandas DataFrame or not.
**kwargs : dict
Keyword arguments to pass to `func` when calling remotely.
@@ -38,7 +41,10 @@
distributed.Future or list
Dask identifier of the result being put into distributed memory.
"""
return func(*args, **kwargs)
result = func(*args, **kwargs)
if return_pandas_df and not isinstance(result, pandas.DataFrame):
result = pandas.DataFrame(result)
return result


class DaskWrapper:
@@ -50,6 +56,7 @@ def deploy(
func,
f_args=None,
f_kwargs=None,
return_pandas_df=None,
num_returns=1,
pure=True,
):
@@ -64,6 +71,8 @@
Positional arguments to pass to ``func``.
f_kwargs : dict, optional
Keyword arguments to pass to ``func``.
return_pandas_df : bool, optional
Whether to convert the result of `func` to a pandas DataFrame or not.
num_returns : int, default: 1
The number of returned objects.
pure : bool, default: True
@@ -82,7 +91,12 @@
else:
# for the case where type(func) is distributed.Future
remote_task_future = client.submit(
_deploy_dask_func, func, *args, pure=pure, **kwargs
_deploy_dask_func,
func,
*args,
pure=pure,
return_pandas_df=return_pandas_df,
**kwargs,
)
if num_returns != 1:
return [
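Illustrative note (not part of this diff): `return_pandas_df` exists because the partitions built by `from_map` below are expected to wrap pandas DataFrames, while a mapped function may return a list, dict, Series, or other array-like. A minimal, engine-free sketch of the coercion the remote wrappers perform; the helper name `coerce_to_pandas_df` is invented for illustration:

import pandas

def coerce_to_pandas_df(result, return_pandas_df=None):
    # Mirrors the wrapper logic: coerce only when requested and only when the
    # mapped function did not already return a pandas DataFrame.
    if return_pandas_df and not isinstance(result, pandas.DataFrame):
        result = pandas.DataFrame(result)
    return result

print(coerce_to_pandas_df([1, 2, 3], return_pandas_df=True))             # one column, three rows
print(coerce_to_pandas_df({"a": [1], "b": [2]}, return_pandas_df=True))  # columns "a" and "b"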
44 changes: 44 additions & 0 deletions modin/core/execution/dask/implementations/pandas_on_dask/io/io.py
@@ -13,6 +13,7 @@

"""Module houses class that implements ``BaseIO`` using Dask as an execution engine."""

import numpy as np
from distributed.client import default_client

from modin.core.execution.dask.common import DaskWrapper
@@ -68,6 +69,7 @@ class PandasOnDaskIO(BaseIO):
"""The class implements interface in ``BaseIO`` using Dask as an execution engine."""

frame_cls = PandasOnDaskDataframe
frame_partition_cls = PandasOnDaskDataframePartition
query_compiler_cls = PandasQueryCompiler
build_args = dict(
frame_cls=PandasOnDaskDataframe,
@@ -188,3 +190,45 @@ def df_to_series(df):
partitions = [client.submit(df_to_series, part) for part in partitions]

return from_delayed(partitions)

@classmethod
def from_map(cls, func, iterable, *args, **kwargs):
"""
Create a Modin `query_compiler` from a map function.
This method will construct a Modin `query_compiler` split by row partitions.
The number of row partitions matches the number of elements in the iterable object.
Parameters
----------
func : callable
Function to map across the iterable object.
iterable : Iterable
An iterable object.
*args : tuple
Positional arguments to pass in `func`.
**kwargs : dict
Keyword arguments to pass in `func`.
Returns
-------
BaseQueryCompiler
QueryCompiler containing data returned by map function.
"""
func = cls.frame_cls._partition_mgr_cls.preprocess_func(func)
partitions = np.array(
[
[
cls.frame_partition_cls(
DaskWrapper.deploy(
func,
f_args=(obj,) + args,
f_kwargs=kwargs,
return_pandas_df=True,
)
)
]
for obj in iterable
]
)
return cls.query_compiler_cls(cls.frame_cls(partitions))
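Illustrative note (not part of this diff): semantically, `from_map` is close to mapping `func` over the iterable, coercing each result to a pandas DataFrame, and stacking the frames row-wise, except that each frame is produced by its own Dask task and kept as its own row partition. A rough single-process sketch of those semantics (exact index handling may differ):

import pandas

def from_map_reference(func, iterable, *args, **kwargs):
    # Reference semantics only: one pandas DataFrame per element of the iterable,
    # stacked row-wise; the Modin version keeps each frame as a separate row
    # partition and builds it in its own remote task.
    frames = []
    for obj in iterable:
        result = func(obj, *args, **kwargs)
        if not isinstance(result, pandas.DataFrame):
            result = pandas.DataFrame(result)
        frames.append(result)
    return pandas.concat(frames)

print(from_map_reference(lambda x, scale: [x * scale], range(3), scale=10))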
5 changes: 5 additions & 0 deletions modin/core/execution/dispatching/factories/dispatcher.py
@@ -191,6 +191,11 @@ def from_ray(cls, ray_obj):
def from_dask(cls, dask_obj):
return cls.get_factory()._from_dask(dask_obj)

@classmethod
@_inherit_docstrings(factories.BaseFactory._from_map)
def from_map(cls, func, iterable, *args, **kwargs):
return cls.get_factory()._from_map(func, iterable, *args, **kwargs)

@classmethod
@_inherit_docstrings(factories.BaseFactory._read_parquet)
def read_parquet(cls, **kwargs):
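Illustrative note (not part of this diff): at this layer `from_map` only routes to whichever engine factory is active, so one call works unchanged on Dask, Ray, or unidist. A hedged usage sketch that drives the dispatcher directly, since the public `modin.pandas` entry point is not shown in this diff; `FactoryDispatcher` and the `query_compiler=` constructor argument are assumed from Modin's existing layering:

import modin.pandas as pd
from modin.core.execution.dispatching.factories.dispatcher import FactoryDispatcher

def make_rows(i, scale=1):
    # Each element of the iterable becomes one row partition of the result.
    return {"value": [i * scale]}

qc = FactoryDispatcher.from_map(make_rows, range(4), scale=10)
df = pd.DataFrame(query_compiler=qc)
print(df)  # expected: a single "value" column holding 0, 10, 20, 30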
26 changes: 26 additions & 0 deletions modin/core/execution/dispatching/factories/factories.py
@@ -221,6 +221,32 @@ def _from_ray(cls, ray_obj):
def _from_dask(cls, dask_obj):
return cls.io_cls.from_dask(dask_obj)

@classmethod
def _from_map(cls, func, iterable, *args, **kwargs):
"""
Create a Modin `query_compiler` from a map function.
This method will construct a Modin `query_compiler` split by row partitions.
The number of row partitions matches the number of elements in the iterable object.
Parameters
----------
func : callable
Function to map across the iterable object.
iterable : Iterable
An iterable object.
*args : tuple
Positional arguments to pass in `func`.
**kwargs : dict
Keyword arguments to pass in `func`.
Returns
-------
BaseQueryCompiler
QueryCompiler containing data returned by map function.
"""
return cls.io_cls.from_map(func, iterable, *args, **kwargs)

@classmethod
@doc(
_doc_io_method_template,
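Illustrative note (not part of this diff): because `_from_map` lives on the base factory and only forwards to `cls.io_cls`, any engine whose IO class implements `from_map` picks the feature up without extra wiring. A self-contained sketch of that forwarding pattern; the class names below are invented and are not Modin classes:

import pandas

class FakeIO:
    # Stand-in for an engine IO class; real IO classes return a query compiler
    # backed by distributed partitions rather than a plain pandas object.
    @classmethod
    def from_map(cls, func, iterable, *args, **kwargs):
        return pandas.concat(
            pandas.DataFrame(func(obj, *args, **kwargs)) for obj in iterable
        )

class BaseFactoryLike:
    io_cls = None

    @classmethod
    def _from_map(cls, func, iterable, *args, **kwargs):
        # The factory layer only forwards; the engine IO class does the work.
        return cls.io_cls.from_map(func, iterable, *args, **kwargs)

class FakeEngineFactory(BaseFactoryLike):
    io_cls = FakeIO

print(FakeEngineFactory._from_map(lambda x: [x * x], range(3)))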
18 changes: 14 additions & 4 deletions modin/core/execution/ray/common/engine_wrapper.py
@@ -22,6 +22,7 @@
from types import FunctionType
from typing import Sequence

import pandas
import ray
from ray.util.client.common import ClientObjectRef

@@ -30,7 +31,7 @@


@ray.remote
def _deploy_ray_func(func, *args, **kwargs): # pragma: no cover
def _deploy_ray_func(func, *args, return_pandas_df=None, **kwargs): # pragma: no cover
"""
Wrap `func` to ease calling it remotely.
@@ -40,6 +41,8 @@ def _deploy_ray_func(func, *args, **kwargs): # pragma: no cover
A local function that we want to call remotely.
*args : iterable
Positional arguments to pass to `func` when calling remotely.
return_pandas_df : bool, optional
Whether to convert the result of `func` to a pandas DataFrame or not.
**kwargs : dict
Keyword arguments to pass to `func` when calling remotely.
@@ -48,7 +51,10 @@
ray.ObjectRef or list
Ray identifier of the result being put to Plasma store.
"""
return func(*args, **kwargs)
result = func(*args, **kwargs)
if return_pandas_df and not isinstance(result, pandas.DataFrame):
result = pandas.DataFrame(result)
return result


class RayWrapper:
@@ -57,7 +63,9 @@ class RayWrapper:
_func_cache = {}

@classmethod
def deploy(cls, func, f_args=None, f_kwargs=None, num_returns=1):
def deploy(
cls, func, f_args=None, f_kwargs=None, return_pandas_df=None, num_returns=1
):
"""
Run local `func` remotely.
@@ -69,6 +77,8 @@ def deploy(cls, func, f_args=None, f_kwargs=None, num_returns=1):
Positional arguments to pass to ``func``.
f_kwargs : dict, optional
Keyword arguments to pass to ``func``.
return_pandas_df : bool, optional
Whether to convert the result of `func` to a pandas DataFrame or not.
num_returns : int, default: 1
Amount of return values expected from `func`.
@@ -81,7 +91,7 @@
kwargs = {} if f_kwargs is None else f_kwargs
return _deploy_ray_func.options(
num_returns=num_returns, resources=RayTaskCustomResources.get()
).remote(func, *args, **kwargs)
).remote(func, *args, return_pandas_df=return_pandas_df, **kwargs)

@classmethod
def is_future(cls, item):
41 changes: 41 additions & 0 deletions modin/core/execution/ray/implementations/pandas_on_ray/io/io.py
@@ -15,6 +15,7 @@

import io

import numpy as np
import pandas
from pandas.io.common import get_handle, stringify_path
from ray.data import from_pandas_refs
@@ -68,6 +69,7 @@ class PandasOnRayIO(RayIO):
"""Factory providing methods for performing I/O operations using pandas as storage format on Ray as engine."""

frame_cls = PandasOnRayDataframe
frame_partition_cls = PandasOnRayDataframePartition
query_compiler_cls = PandasQueryCompiler
build_args = dict(
frame_partition_cls=PandasOnRayDataframePartition,
@@ -302,3 +304,42 @@ def to_ray(cls, modin_obj):
"""
parts = unwrap_partitions(modin_obj, axis=0)
return from_pandas_refs(parts)

@classmethod
def from_map(cls, func, iterable, *args, **kwargs):
"""
Create a Modin `query_compiler` from a map function.
This method will construct a Modin `query_compiler` split by row partitions.
The number of row partitions matches the number of elements in the iterable object.
Parameters
----------
func : callable
Function to map across the iterable object.
iterable : Iterable
An iterable object.
*args : tuple
Positional arguments to pass in `func`.
**kwargs : dict
Keyword arguments to pass in `func`.
Returns
-------
BaseQueryCompiler
QueryCompiler containing data returned by map function.
"""
func = cls.frame_cls._partition_mgr_cls.preprocess_func(func)
partitions = np.array(
[
[
cls.frame_partition_cls(
RayWrapper.deploy(
func, f_args=(obj,) + args, return_pandas_df=True, **kwargs
)
)
]
for obj in iterable
]
)
return cls.query_compiler_cls(cls.frame_cls(partitions))
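Illustrative note (not part of this diff): since each element of the iterable becomes its own remote task and its own row partition (an N x 1 partition grid), a natural use is fanning out per-file loaders. A hedged sketch assuming a Ray-backed Modin environment (e.g. MODIN_ENGINE=ray) and reusing the dispatcher call shown earlier; the file names are made up:

import pandas
import modin.pandas as pd
from modin.core.execution.dispatching.factories.dispatcher import FactoryDispatcher

# Write a few small CSV chunks so the sketch is self-contained.
paths = []
for i in range(3):
    path = f"part-{i}.csv"
    pandas.DataFrame({"value": [i, i + 10]}).to_csv(path, index=False)
    paths.append(path)

# One remote task and one row partition per file; pandas.read_csv already
# returns a DataFrame, so the return_pandas_df coercion is a no-op here.
qc = FactoryDispatcher.from_map(pandas.read_csv, paths)
df = pd.DataFrame(query_compiler=qc)
print(df)  # six rows: two from each of the three files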
20 changes: 16 additions & 4 deletions modin/core/execution/unidist/common/engine_wrapper.py
@@ -19,11 +19,14 @@

import asyncio

import pandas
import unidist


@unidist.remote
def _deploy_unidist_func(func, *args, **kwargs): # pragma: no cover
def _deploy_unidist_func(
func, *args, return_pandas_df=None, **kwargs
): # pragma: no cover
"""
Wrap `func` to ease calling it remotely.
@@ -33,6 +36,8 @@ def _deploy_unidist_func(func, *args, **kwargs): # pragma: no cover
A local function that we want to call remotely.
*args : iterable
Positional arguments to pass to `func` when calling remotely.
return_pandas_df : bool, optional
Whether to convert the result of `func` to a pandas DataFrame or not.
**kwargs : dict
Keyword arguments to pass to `func` when calling remotely.
@@ -41,14 +46,19 @@
unidist.ObjectRef or list[unidist.ObjectRef]
Unidist identifier of the result being put to object store.
"""
return func(*args, **kwargs)
result = func(*args, **kwargs)
if return_pandas_df and not isinstance(result, pandas.DataFrame):
result = pandas.DataFrame(result)
return result


class UnidistWrapper:
"""Mixin that provides means of running functions remotely and getting local results."""

@classmethod
def deploy(cls, func, f_args=None, f_kwargs=None, num_returns=1):
def deploy(
cls, func, f_args=None, f_kwargs=None, return_pandas_df=None, num_returns=1
):
"""
Run local `func` remotely.
@@ -60,6 +70,8 @@ def deploy(cls, func, f_args=None, f_kwargs=None, num_returns=1):
Positional arguments to pass to ``func``.
f_kwargs : dict, optional
Keyword arguments to pass to ``func``.
return_pandas_df : bool, optional
Whether to convert the result of `func` to a pandas DataFrame or not.
num_returns : int, default: 1
Amount of return values expected from `func`.
@@ -71,7 +83,7 @@
args = [] if f_args is None else f_args
kwargs = {} if f_kwargs is None else f_kwargs
return _deploy_unidist_func.options(num_returns=num_returns).remote(
func, *args, **kwargs
func, *args, return_pandas_df=return_pandas_df, **kwargs
)

@classmethod