Skip to content

Commit

Permalink
FEAT-#6767: Implement 'modin.utils.enable_exp_mode' context manager
Browse files Browse the repository at this point in the history
Signed-off-by: Anatoly Myachev <[email protected]>
  • Loading branch information
anmyachev committed Nov 23, 2023
1 parent b8323b5 commit e1caebb
Show file tree
Hide file tree
Showing 11 changed files with 151 additions and 12 deletions.
2 changes: 1 addition & 1 deletion docs/flow/modin/experimental/pandas.rst
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,4 @@ Experimental API Reference
.. autofunction:: read_csv_glob
.. autofunction:: read_custom_text
.. autofunction:: read_pickle_distributed
.. automethod:: modin.experimental.pandas.DataFrame.to_pickle_distributed
.. automethod:: modin.experimental.pandas.DataFrame._exp.to_pickle_distributed
3 changes: 2 additions & 1 deletion docs/supported_apis/dataframe_supported.rst
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,8 @@ default to pandas.
+----------------------------+---------------------------+------------------------+----------------------------------------------------+
| ``to_period`` | `to_period`_ | D | |
+----------------------------+---------------------------+------------------------+----------------------------------------------------+
| ``to_pickle`` | `to_pickle`_ | D | Experimental implementation: to_pickle_distributed |
| ``to_pickle`` | `to_pickle`_ | D | Experimental implementation: |
| | | | DataFrame._exp.to_pickle_distributed |
+----------------------------+---------------------------+------------------------+----------------------------------------------------+
| ``to_records`` | `to_records`_ | D | |
+----------------------------+---------------------------+------------------------+----------------------------------------------------+
Expand Down
2 changes: 1 addition & 1 deletion docs/usage_guide/advanced_usage/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ Modin also supports these experimental APIs on top of pandas that are under acti
- :py:func:`~modin.experimental.pandas.read_sql` -- add optional parameters for the database connection
- :py:func:`~modin.experimental.pandas.read_custom_text` -- read custom text data from file
- :py:func:`~modin.experimental.pandas.read_pickle_distributed` -- read multiple files in a directory
- :py:meth:`~modin.experimental.pandas.DataFrame.to_pickle_distributed` -- write to multiple files in a directory
- :py:meth:`~modin.experimental.pandas.DataFrame._exp.to_pickle_distributed` -- write to multiple files in a directory

DataFrame partitioning API
--------------------------
Expand Down
21 changes: 18 additions & 3 deletions modin/core/execution/dispatching/factories/dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,22 @@ class FactoryDispatcher(object):

__factory: factories.BaseFactory = None

@classmethod
def _update_factory_if_already_initialized(cls) -> None:
    """
    Refresh the cached factory, but only when one has already been created.

    Notes
    -----
    1. Primarily used by the `enable_exp_mode` context manager to enable
       experimental mode.
    2. The work is deliberately not delegated to `get_factory`, so that the
       engine initialization function is not called.
    3. The work is deliberately not delegated to `_update_factory` directly,
       so that the factory is not initialized for the first time here.
    """
    if cls.__factory is None:
        # No factory exists yet: leave its first-time initialization alone.
        return
    cls.__factory = cls._update_factory()

@classmethod
def get_factory(cls) -> factories.BaseFactory:
"""Get current factory."""
Expand All @@ -118,14 +134,13 @@ def get_factory(cls) -> factories.BaseFactory:
return cls.__factory

@classmethod
# FIXME: replace `_` parameter with `*args`
def _update_factory(cls, _):
def _update_factory(cls, *args):
"""
Update and prepare factory with a new one specified via Modin config.
Parameters
----------
_ : object
*args : iterable
These parameters serve compatibility purposes only.
Does not affect the result.
"""
Expand Down
3 changes: 0 additions & 3 deletions modin/experimental/pandas/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,8 @@
read_custom_text,
read_pickle_distributed,
read_sql,
to_pickle_distributed,
)

setattr(DataFrame, "to_pickle_distributed", to_pickle_distributed) # noqa: F405

warnings.warn(
"Thank you for using the Modin Experimental pandas API."
+ "\nPlease note that some of these APIs deviate from pandas in order to "
Expand Down
2 changes: 1 addition & 1 deletion modin/experimental/pandas/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ def read_pickle_distributed(
This experimental feature provides parallel reading from multiple pickle files which are
defined by glob pattern. The files must contain parts of one dataframe, which can be
obtained, for example, by `to_pickle_distributed` function.
obtained, for example, by `DataFrame._exp.to_pickle_distributed` function.
Parameters
----------
Expand Down
2 changes: 1 addition & 1 deletion modin/experimental/pandas/test/test_io_exp.py
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ def test_distributed_pickling(filename, compression):
if filename_param == test_default_to_pickle_filename
else contextlib.nullcontext()
):
df.to_pickle_distributed(filename, compression=compression)
df._exp.to_pickle_distributed(filename, compression=compression)
pickled_df = pd.read_pickle_distributed(filename, compression=compression)
df_equals(pickled_df, df)

Expand Down
73 changes: 73 additions & 0 deletions modin/pandas/accessor.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,14 @@
CachedAccessor implements API of pandas.core.accessor.CachedAccessor
"""

import pickle

import pandas
from pandas._typing import CompressionOptions, StorageOptions
from pandas.core.dtypes.dtypes import SparseDtype

from modin import pandas as pd
from modin.config import IsExperimental
from modin.error_message import ErrorMessage
from modin.logging import ClassLogger
from modin.utils import _inherit_docstrings
Expand Down Expand Up @@ -191,3 +195,72 @@ def __get__(self, obj, cls):
accessor_obj = self._accessor(obj)
object.__setattr__(obj, self._name, accessor_obj)
return accessor_obj


class ExperimentalFunctions:
    """
    Namespace class for accessing experimental functionality.

    Parameters
    ----------
    data : DataFrame or Series
        Object to operate on.

    Raises
    ------
    ValueError
        If experimental mode is not enabled when the namespace is created.
    """

    def __init__(self, data):
        self._ensure_experimental_mode()
        self._data = data

    @staticmethod
    def _ensure_experimental_mode() -> None:
        """
        Raise ``ValueError`` unless experimental mode is currently enabled.
        """
        if IsExperimental.get() is not True:
            raise ValueError(
                "This only works in experimental mode. "
                "Define `MODIN_EXPERIMENTAL=True` env variable or "
                "use `modin.utils.enable_exp_mode` context manager"
            )

    def to_pickle_distributed(
        self,
        filepath_or_buffer,
        compression: CompressionOptions = "infer",
        protocol: int = pickle.HIGHEST_PROTOCOL,
        storage_options: StorageOptions = None,
    ):
        """
        Pickle (serialize) object to file.

        This experimental feature provides parallel writing into multiple pickle files which are
        defined by glob pattern, otherwise (without glob pattern) default pandas implementation is used.

        Parameters
        ----------
        filepath_or_buffer : str, path object or file-like object
            File path where the pickled object will be stored.
        compression : {'infer', 'gzip', 'bz2', 'zip', 'xz', None}, default: 'infer'
            A string representing the compression to use in the output file. By
            default, infers from the file extension in specified path.
            Compression mode may be any of the following possible
            values: {'infer', 'gzip', 'bz2', 'zip', 'xz', None}. If compression
            mode is 'infer' and `filepath_or_buffer` is path-like, then detect
            compression mode from the following extensions:
            '.gz', '.bz2', '.zip' or '.xz'. (otherwise no compression).
            If dict given and mode is 'zip' or inferred as 'zip', other entries
            passed as additional compression options.
        protocol : int, default: pickle.HIGHEST_PROTOCOL
            Int which indicates which protocol should be used by the pickler,
            default HIGHEST_PROTOCOL (see `pickle docs <https://docs.python.org/3/library/pickle.html>`_
            paragraph 12.1.2 for details). The possible values are 0, 1, 2, 3, 4, 5. A negative value
            for the protocol parameter is equivalent to setting its value to HIGHEST_PROTOCOL.
        storage_options : dict, optional
            Extra options that make sense for a particular storage connection, e.g.
            host, port, username, password, etc., if using a URL that will be parsed by
            fsspec, e.g., starting "s3://", "gcs://". An error will be raised if providing
            this argument with a non-fsspec URL. See the fsspec and backend storage
            implementation docs for the set of allowed keys and values.

        Raises
        ------
        ValueError
            If experimental mode is not enabled at call time.
        """
        # `CachedAccessor.__get__` stores this namespace object on the owning
        # instance after the first successful access, so `__init__` does not
        # run again on later `obj._exp` lookups. Re-check here so a namespace
        # created inside `enable_exp_mode` cannot be used once the mode is off.
        self._ensure_experimental_mode()

        from modin.experimental.pandas.io import to_pickle_distributed

        to_pickle_distributed(
            self._data,
            filepath_or_buffer=filepath_or_buffer,
            compression=compression,
            protocol=protocol,
            storage_options=storage_options,
        )
5 changes: 4 additions & 1 deletion modin/pandas/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
try_cast_to_pandas,
)

from .accessor import CachedAccessor, SparseFrameAccessor
from .accessor import CachedAccessor, ExperimentalFunctions, SparseFrameAccessor
from .base import _ATTRS_NO_LOOKUP, BasePandasDataset
from .groupby import DataFrameGroupBy
from .iterator import PartitionIterator
Expand Down Expand Up @@ -3169,3 +3169,6 @@ def __reduce__(self):
return self._inflate_light, (self._query_compiler, pid)

# Persistance support methods - END

# Namespace for experimental functions
_exp = CachedAccessor("_exp", ExperimentalFunctions)
11 changes: 11 additions & 0 deletions modin/test/storage_formats/pandas/test_internals.py
Original file line number Diff line number Diff line change
Expand Up @@ -2366,3 +2366,14 @@ def test_groupby_index_dtype(self):
assert res_dtypes._known_dtypes["a"] == np.dtype(int)

patch.assert_not_called()


def test_enable_exp_mode_utility():
df = pd.DataFrame([1, 2, 3, 4, 5])
with pytest.raises(ValueError, match="This only works in experimental mode"):
_ = df._exp

import modin.utils

Check notice

Code scanning / CodeQL

Module is imported with 'import' and 'import from' Note test

Module 'modin.utils' is imported with both 'import' and 'import from'.

with modin.utils.enable_exp_mode():
_ = df._exp
39 changes: 39 additions & 0 deletions modin/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
"""Collection of general utility functions, mostly for internal use."""

import codecs
import contextlib
import functools
import importlib
import inspect
Expand All @@ -27,6 +28,7 @@
from typing import (
Any,
Callable,
Generator,
Iterable,
List,
Mapping,
Expand Down Expand Up @@ -628,6 +630,43 @@ def execute(*objs: Iterable[Any], trigger_hdk_import: bool = False) -> None:
query_compiler.force_import()


@contextlib.contextmanager
def enable_exp_mode() -> Generator[None, None, None]:  # noqa: MD02
    """
    Temporarily switch Modin into experimental mode.

    Similar to using `export MODIN_EXPERIMENTAL=True`, but makes it possible
    to enable experimental functionality for a small section of code only.
    On exit the previous setting is put back (a falsy previous value is
    restored as ``False``).

    Returns
    -------
    Generator[None, None, None]

    Examples
    --------
    >>> import modin.pandas as pd
    >>> import modin.utils
    >>> import numpy as np
    >>> df = pd.DataFrame(np.random.rand(100, 32))
    >>> with modin.utils.enable_exp_mode():
    ...     from modin.experimental.pandas import read_pickle_distributed
    ...     df._exp.to_pickle_distributed("transactions_train_new*.pkl")
    ...     new_df = read_pickle_distributed("transactions_train_new*.pkl")
    """
    from modin.config import IsExperimental
    from modin.core.execution.dispatching.factories.dispatcher import FactoryDispatcher

    previous_value = IsExperimental.get()
    try:
        IsExperimental.put(True)
        # An already-created factory must be rebuilt to pick up the new mode.
        FactoryDispatcher._update_factory_if_already_initialized()
        yield
    finally:
        restored_value = previous_value if previous_value else False
        IsExperimental.put(restored_value)
        # cleanup potential side effects from importing `modin.experimental.pandas`
        FactoryDispatcher._update_factory_if_already_initialized()


def wrap_into_list(*args: Any, skipna: bool = True) -> List[Any]:
"""
Wrap a sequence of passed values in a flattened list.
Expand Down

0 comments on commit e1caebb

Please sign in to comment.