diff --git a/.github/workflows/linter.yml b/.github/workflows/linter.yml new file mode 100644 index 0000000..9ec2459 --- /dev/null +++ b/.github/workflows/linter.yml @@ -0,0 +1,41 @@ +# This workflow will install Python dependencies, run tests and lint with a single version of Python +# For more information see: https://docs.github.com/en/actions/automating-builds-and-tests/building-and-testing-python + +name: Python Linter And Unittest + +on: + push: + branches: [ "main" ] + pull_request: + branches: [ "main" ] + +permissions: + contents: read + +jobs: + build: + + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v4 + - name: Set up Python 3.8 + uses: actions/setup-python@v5 + with: + python-version: "3.8" + - name: Install dependencies + # TODO: add Scaler as a test dependency once it's published to PyPI + run: | + python -m pip install --upgrade pip + pip install flake8 pyproject-flake8 mypy + pip install -r requirements.txt + pip install pandas dask[distributed] + - name: Lint with flake8 + run: | + pflake8 . + - name: Lint with MyPy + run: | + mypy . + - name: Run python unittest + run: | + python -m unittest discover -v tests diff --git a/parfun/about.py b/parfun/about.py index 35504ed..52b23b0 100644 --- a/parfun/about.py +++ b/parfun/about.py @@ -1 +1 @@ -__version__ = "6.0.5" +__version__ = "6.0.6" diff --git a/parfun/backend/dask.py b/parfun/backend/dask.py index 4e10541..265789e 100644 --- a/parfun/backend/dask.py +++ b/parfun/backend/dask.py @@ -1,7 +1,7 @@ import abc from contextlib import contextmanager from threading import BoundedSemaphore -from typing import ContextManager, Optional +from typing import Generator, Optional try: from dask.distributed import Client, Future, LocalCluster, worker_client @@ -30,7 +30,7 @@ def __enter__(self) -> "DaskSession": def __exit__(self, exc_type, exc_val, exc_tb) -> None: return None - def submit(self, fn, *args, **kwargs) -> Optional[Future]: + def submit(self, fn, *args, **kwargs) -> Optional[ProfiledFuture]: with profile() as submit_duration: future = ProfiledFuture() @@ -38,7 +38,7 @@ def submit(self, fn, *args, **kwargs) -> Optional[Future]: if not acquired: return None - with self._engine.executor() as executor: + with self._engine.executor() as executor: # type: ignore[var-annotated] underlying_future = executor.submit(timed_function, fn, *args, **kwargs) def on_done_callback(underlying_future: Future): @@ -82,7 +82,7 @@ def session(self) -> DaskSession: @abc.abstractmethod @contextmanager - def executor(self) -> ContextManager[ClientExecutor]: + def executor(self) -> Generator[ClientExecutor, None, None]: raise NotImplementedError def allows_nested_tasks(self) -> bool: @@ -101,7 +101,7 @@ def __init__(self, scheduler_address: str): self._executor = self._client.get_executor() @contextmanager - def executor(self) -> ContextManager[ClientExecutor]: + def executor(self) -> Generator[ClientExecutor, None, None]: yield self._executor def get_scheduler_address(self) -> str: @@ -141,7 +141,7 @@ def __init__(self, n_workers: int) -> None: super().__init__(n_workers) @contextmanager - def executor(self) -> ContextManager[ClientExecutor]: + def executor(self) -> Generator[ClientExecutor, None, None]: with worker_client() as client: yield client.get_executor() diff --git a/parfun/backend/local_multiprocessing.py b/parfun/backend/local_multiprocessing.py index d3a3b41..54b9bdf 100644 --- a/parfun/backend/local_multiprocessing.py +++ b/parfun/backend/local_multiprocessing.py @@ -17,7 +17,7 @@ class
LocalMultiprocessingSession(BackendSession): def __init__(self, underlying_executor: Executor): self._underlying_executor = underlying_executor - self._concurrent_task_guard = BoundedSemaphore(underlying_executor._max_workers) + self._concurrent_task_guard = BoundedSemaphore(underlying_executor._max_workers) # type: ignore[attr-defined] def __enter__(self) -> "LocalMultiprocessingSession": return self @@ -73,7 +73,7 @@ def on_done_callback(underlying_future: Future): return future -@attrs.define +@attrs.define(init=False) class LocalMultiprocessingBackend(BackendEngine): """ A concurrent engine that shares a similar interface to :py:class:`concurrent.futures.Executor`, but that blocks when diff --git a/parfun/backend/scaler.py b/parfun/backend/scaler.py index 4ad796a..97e3b9b 100644 --- a/parfun/backend/scaler.py +++ b/parfun/backend/scaler.py @@ -1,10 +1,10 @@ import inspect -from concurrent.futures import Future from threading import BoundedSemaphore from typing import Any, Optional, Set try: from scaler import Client, SchedulerClusterCombo + from scaler.client.future import ScalerFuture from scaler.client.object_reference import ObjectReference except ImportError: raise ImportError("Scaler dependency missing. Use `pip install 'parfun[scaler]'` to install Scaler.") @@ -34,7 +34,7 @@ def __exit__(self, exc_type, exc_val, exc_tb) -> None: def preload_value(self, value: Any) -> ObjectReference: return self.client.send_object(value) - def submit(self, fn, *args, **kwargs) -> Optional[Future]: + def submit(self, fn, *args, **kwargs) -> Optional[ProfiledFuture]: with profile() as submit_duration: future = ProfiledFuture() @@ -44,7 +44,7 @@ def submit(self, fn, *args, **kwargs) -> Optional[Future]: underlying_future = self.client.submit(fn, *args, **kwargs) - def on_done_callback(underlying_future: Future): + def on_done_callback(underlying_future: ScalerFuture): assert submit_duration.value is not None if underlying_future.cancelled(): @@ -56,17 +56,7 @@ def on_done_callback(underlying_future: Future): if exception is None: result = underlying_future.result() - - # New for scaler>=1.5.0: task_duration is removed and replaced with profiling_info() - - function_duration = int( - ( - underlying_future.task_duration - if hasattr(underlying_future, "task_duration") - else underlying_future.profiling_info().duration_s - ) - * 1_000_000_000 - ) + function_duration = int(underlying_future.profiling_info().cpu_time_s * 1_000_000_000) else: function_duration = 0 result = None @@ -97,21 +87,28 @@ def __init__( allows_nested_tasks: bool = True, **client_kwargs, ): - self._scheduler_address = scheduler_address - self._n_workers = n_workers - self._allows_nested_tasks = allows_nested_tasks - self._client_kwargs = client_kwargs + self.__setstate__( + { + "scheduler_address": scheduler_address, + "n_workers": n_workers, + "allows_nested_tasks": allows_nested_tasks, + "client_kwargs": client_kwargs, + } + ) def __getstate__(self) -> dict: return { "scheduler_address": self._scheduler_address, "n_workers": self._n_workers, "allows_nested_tasks": self._allows_nested_tasks, - **self._client_kwargs, + "client_kwargs": self._client_kwargs, } def __setstate__(self, state: dict) -> None: - self.__init__(**state) + self._scheduler_address = state["scheduler_address"] + self._n_workers = state["n_workers"] + self._allows_nested_tasks = state["allows_nested_tasks"] + self._client_kwargs = state["client_kwargs"] def session(self) -> ScalerSession: return ScalerSession(self._scheduler_address, self._n_workers, 
**self._client_kwargs) @@ -149,15 +146,6 @@ def __init__( scheduler_port = get_available_tcp_port() scheduler_address = f"tcp://127.0.0.1:{scheduler_port}" - scheduler_cluster_combo_kwargs = self.__get_constructor_arg_names(SchedulerClusterCombo) - - self._cluster = SchedulerClusterCombo( - address=scheduler_address, - n_workers=n_workers, - per_worker_queue_size=per_worker_queue_size, - **{kwarg: value for kwarg, value in kwargs.items() if kwarg in scheduler_cluster_combo_kwargs}, - ) - client_kwargs = self.__get_constructor_arg_names(Client) super().__init__( @@ -167,8 +155,17 @@ def __init__( **{kwarg: value for kwarg, value in kwargs.items() if kwarg in client_kwargs}, ) + scheduler_cluster_combo_kwargs = self.__get_constructor_arg_names(SchedulerClusterCombo) + + self._cluster = SchedulerClusterCombo( + address=scheduler_address, + n_workers=n_workers, + per_worker_queue_size=per_worker_queue_size, + **{kwarg: value for kwarg, value in kwargs.items() if kwarg in scheduler_cluster_combo_kwargs}, + ) + def __setstate__(self, state: dict) -> None: - super().__init__(**state) + super().__setstate__(state) self._cluster = None # Unserialized instances have no cluster reference. @property diff --git a/parfun/combine/collection.py b/parfun/combine/collection.py index db8192b..8d14f0d 100644 --- a/parfun/combine/collection.py +++ b/parfun/combine/collection.py @@ -37,7 +37,7 @@ def concat_lists(values: Iterable[List[ListValue]]) -> List[ListValue]: return list_concat(values) -def unzip(iterable: Iterable[Tuple]) -> Tuple[Iterable]: +def unzip(iterable: Iterable[Tuple]) -> Tuple[Iterable, ...]: """ Opposite of zip(). diff --git a/parfun/decorators.py b/parfun/decorators.py index 3fec5c4..440e41a 100644 --- a/parfun/decorators.py +++ b/parfun/decorators.py @@ -1,24 +1,26 @@ """ A decorator that helps users run their functions in parallel. 
""" + import importlib from functools import wraps from typing import Callable, Iterable, Optional, Tuple, Union +from parfun.kernel.function_signature import NamedArguments from parfun.kernel.parallel_function import ParallelFunction from parfun.object import FunctionInputType, FunctionOutputType, PartitionType -from parfun.partition.object import PartitionFunction +from parfun.partition.object import PartitionFunction, PartitionGenerator from parfun.partition_size_estimator.linear_regression_estimator import LinearRegessionEstimator from parfun.partition_size_estimator.mixins import PartitionSizeEstimator def parfun( combine_with: Callable[[Iterable[FunctionOutputType]], FunctionOutputType], - split: Optional[PartitionFunction[PartitionType]] = None, + split: Optional[Callable[[NamedArguments], Tuple[NamedArguments, PartitionGenerator[NamedArguments]]]] = None, partition_on: Optional[Union[str, Tuple[str, ...]]] = None, partition_with: Optional[PartitionFunction[PartitionType]] = None, - initial_partition_size: Optional[Union[int, Callable[[PartitionType], int]]] = None, - fixed_partition_size: Optional[Union[int, Callable[[PartitionType], int]]] = None, + initial_partition_size: Optional[Union[int, Callable[[FunctionInputType], int]]] = None, + fixed_partition_size: Optional[Union[int, Callable[[FunctionInputType], int]]] = None, profile: bool = False, trace_export: Optional[str] = None, partition_size_estimator_factory: Callable[[], PartitionSizeEstimator] = LinearRegessionEstimator, diff --git a/parfun/entry_point.py b/parfun/entry_point.py index dd02705..01ced74 100644 --- a/parfun/entry_point.py +++ b/parfun/entry_point.py @@ -7,7 +7,7 @@ import logging import os from contextvars import ContextVar, Token -from typing import Optional, Union +from typing import Callable, Dict, Optional, Union from parfun.backend.local_multiprocessing import LocalMultiprocessingBackend from parfun.backend.local_single_process import LocalSingleProcessBackend @@ -15,7 +15,7 @@ _backend_engine: ContextVar[Optional[BackendEngine]] = ContextVar("_backend_engine", default=None) -BACKEND_REGISTRY = { +BACKEND_REGISTRY: Dict[str, Callable] = { "none": lambda *_args, **_kwargs: None, "local_single_process": LocalSingleProcessBackend, "local_multiprocessing": LocalMultiprocessingBackend, diff --git a/parfun/functions.py b/parfun/functions.py index 83f6034..8285c58 100644 --- a/parfun/functions.py +++ b/parfun/functions.py @@ -1,9 +1,9 @@ import collections import logging import time -from typing import Any, Callable, Iterable, Optional, Tuple +from typing import Any, Callable, Deque, Iterable, Optional, Tuple -from parfun.backend.mixins import BackendSession +from parfun.backend.mixins import BackendSession, ProfiledFuture from parfun.entry_point import get_parallel_backend from parfun.profiler.object import TraceTime @@ -28,7 +28,7 @@ def parallel_timed_map( # Uses a generator function, so that we can use deque.pop() and thus discard the no longer required futures' # references as we yield them. 
def result_generator(backend_session: BackendSession): - futures = collections.deque() + futures: Deque[ProfiledFuture] = collections.deque() try: for args in zip(*iterables): diff --git a/parfun/kernel/parallel_function.py b/parfun/kernel/parallel_function.py index 9d43137..79dd1e0 100644 --- a/parfun/kernel/parallel_function.py +++ b/parfun/kernel/parallel_function.py @@ -11,14 +11,14 @@ from parfun.kernel.function_signature import FunctionSignature, NamedArguments from parfun.object import FunctionInputType, FunctionOutputType, PartitionType from parfun.partition.api import multiple_arguments -from parfun.partition.object import PartitionFunction +from parfun.partition.object import PartitionFunction, PartitionGenerator from parfun.partition_size_estimator.linear_regression_estimator import LinearRegessionEstimator from parfun.partition_size_estimator.mixins import PartitionSizeEstimator from parfun.profiler.functions import export_task_trace, print_profile_trace, timed_combine_with, timed_partition from parfun.profiler.object import PartitionedTaskTrace -@attrs.define +@attrs.define(init=False) class ParallelFunction: """Wraps a function so that it executes in parallel using a map-reduce/scatter-gather approach. @@ -29,19 +29,15 @@ class ParallelFunction: combine_with: Callable[[Iterable[FunctionOutputType]], FunctionOutputType] = attrs.field() - split: PartitionFunction[NamedArguments] = attrs.field() + split: Callable[[NamedArguments], Tuple[NamedArguments, PartitionGenerator[NamedArguments]]] = attrs.field() - function_name: Optional[str] = attrs.field(default=None) + function_name: str = attrs.field() - initial_partition_size: Optional[Callable[[FunctionInputType], int]] = attrs.field(default=None) - fixed_partition_size: Optional[Callable[[FunctionInputType], int]] = attrs.field(default=None) + initial_partition_size: Optional[Union[int, Callable[[FunctionInputType], int]]] = attrs.field() + fixed_partition_size: Optional[Union[int, Callable[[FunctionInputType], int]]] = attrs.field() - profile: bool = attrs.field(default=False) - trace_export: Optional[str] = attrs.field(default=None) - - partition_size_estimator_factory: Callable[[], PartitionSizeEstimator] = attrs.field( - default=LinearRegessionEstimator - ) + profile: bool = attrs.field() + trace_export: Optional[str] = attrs.field() _partition_size_estimator: Optional[PartitionSizeEstimator] = attrs.field(init=False, default=None) @@ -50,13 +46,17 @@ class ParallelFunction: def __init__( self, - *args, - split: Optional[PartitionFunction[NamedArguments]] = None, + function: Callable[[FunctionInputType], FunctionOutputType], + function_name: str, + combine_with: Callable[[Iterable[FunctionOutputType]], FunctionOutputType], + split: Optional[Callable[[NamedArguments], Tuple[NamedArguments, PartitionGenerator[NamedArguments]]]] = None, partition_on: Optional[Union[str, Tuple[str, ...]]] = None, partition_with: Optional[PartitionFunction[PartitionType]] = None, - initial_partition_size: Optional[Callable[[FunctionInputType], int]] = None, - fixed_partition_size: Optional[Callable[[FunctionInputType], int]] = None, - **kwargs, + initial_partition_size: Optional[Union[int, Callable[[FunctionInputType], int]]] = None, + fixed_partition_size: Optional[Union[int, Callable[[FunctionInputType], int]]] = None, + profile: bool = False, + trace_export: Optional[str] = None, + partition_size_estimator_factory: Callable[[], PartitionSizeEstimator] = LinearRegessionEstimator, ): if (partition_on is None) != (partition_with is None): raise 
ValueError("`partition_on` and `partition_with` should be both simultaneously set or None.") @@ -76,12 +76,15 @@ def __init__( initial_partition_size = ParallelFunction._legacy_partition_size(partition_on, initial_partition_size) fixed_partition_size = ParallelFunction._legacy_partition_size(partition_on, fixed_partition_size) - self.__attrs_init__( - *args, + self.__attrs_init__( # type: ignore[attr-defined] + function=function, + function_name=function_name, + combine_with=combine_with, split=split, initial_partition_size=initial_partition_size, fixed_partition_size=fixed_partition_size, - **kwargs, + profile=profile, + trace_export=trace_export, ) self._function_signature = FunctionSignature.from_function(self.function) @@ -90,7 +93,7 @@ def __init__( raise ValueError("`initial_partition_size` and `fixed_partition_size` cannot be set simultaneously.") if self.fixed_partition_size is None: - self._partition_size_estimator = self.partition_size_estimator_factory() + self._partition_size_estimator = partition_size_estimator_factory() self._validate_function_signature() @@ -194,15 +197,15 @@ def _get_user_partition_sizes(self, args, kwargs) -> Tuple[Optional[int], Option @staticmethod def _legacy_partition_with( partition_on: Union[str, Tuple[str, ...]], partition_with: PartitionFunction[PartitionType] - ) -> PartitionFunction[NamedArguments]: + ) -> Callable[[NamedArguments], Tuple[NamedArguments, PartitionGenerator[NamedArguments]]]: """Implements the legacy `partition_on` and `partition_with` API using the newer `split` interface.""" return multiple_arguments(partition_on, partition_with) @staticmethod def _legacy_partition_size( - partition_on: Tuple[str, ...], partition_size: Optional[Callable[[FunctionInputType], int]] - ) -> Optional[Callable[[FunctionInputType], int]]: + partition_on: Tuple[str, ...], partition_size: Optional[Union[int, Callable[[FunctionInputType], int]]] + ) -> Optional[Union[int, Callable[[FunctionInputType], int]]]: """ Implements the legacy behaviour of `initial_partition_size` and `fixed_partition_size` when used with `partition_on` and `partition_with` API. @@ -214,7 +217,8 @@ def _legacy_partition_size( # When the partition size argument is a callable, the old API only passes the `partition_on` values as # positional arguments. - def legacy_partition_size(**kwargs): + def legacy_partition_size(*args, **kwargs): + assert len(args) == 0 partition_args = [kwargs[arg_name] for arg_name in partition_on] return partition_size(*partition_args) diff --git a/parfun/object.py b/parfun/object.py index 6ce1f11..0841ba2 100644 --- a/parfun/object.py +++ b/parfun/object.py @@ -1,7 +1,7 @@ -from typing import TypeVar +from typing import Any, TypeVar # TODO we can specify and limit their values in future -FunctionInputType = any -FunctionOutputType = any +FunctionInputType = Any +FunctionOutputType = Any PartitionType = TypeVar("PartitionType") # Input and output are identical for partitioning functions. 
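(Why the object.py fix above matters — an illustrative sketch, not part of the patch: the previous aliases bound the builtin function `any` rather than a type, so annotations built from them were meaningless to type checkers. With `typing.Any`, mypy interprets them as intended.)

    from typing import Any, Callable

    # Before the fix, `FunctionInputType = any` aliased the builtin `any()`
    # function; `Callable[[FunctionInputType], ...]` was then not a valid type.
    FunctionInputType = Any
    FunctionOutputType = Any

    # mypy now accepts signatures built from these aliases:
    identity: Callable[[FunctionInputType], FunctionOutputType] = lambda x: x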
diff --git a/parfun/partition/api.py b/parfun/partition/api.py index 958a26b..9fb5545 100644 --- a/parfun/partition/api.py +++ b/parfun/partition/api.py @@ -1,13 +1,15 @@ from collections import OrderedDict from itertools import chain -from typing import Tuple, Union +from typing import Callable, Tuple, Union from parfun.kernel.function_signature import NamedArguments from parfun.partition.object import PartitionFunction, PartitionGenerator from parfun.partition.primitives import partition_map, partition_zip -def per_argument(**partition_arg_with: PartitionFunction) -> PartitionFunction[NamedArguments]: +def per_argument( + **partition_arg_with: PartitionFunction, +) -> Callable[[NamedArguments], Tuple[NamedArguments, PartitionGenerator[NamedArguments]]]: """ Applies multiple partitioning functions simultaneously on different function arguments, similarly to Python's :py:func:`zip`. @@ -27,7 +29,7 @@ def func(df: pd.DataFrame, xs: List, constant: int): partition_arg_names = set(partition_arg_with.keys()) - def partitioning_function(named_args: NamedArguments) -> Tuple[NamedArguments, PartitionGenerator]: + def partitioning_function(named_args: NamedArguments) -> Tuple[NamedArguments, PartitionGenerator[NamedArguments]]: # Applies all partition functions simultaneously using `partition_zip()`, and then rebuilds the `NamedArguments` # object with the partitioned values. @@ -41,7 +43,9 @@ def reassign_partitioned_arguments(*partitioned_values) -> NamedArguments: partition_arg_with[arg_name](partitioned_args[arg_name]) for arg_name in partition_arg_names ] - generator = partition_map(reassign_partitioned_arguments, partition_zip(*partitioned_arg_generators)) + zipped = partition_zip(*partitioned_arg_generators) + + generator = partition_map(reassign_partitioned_arguments, zipped) # type: ignore[type-var] return non_partitioned_args, generator @@ -50,7 +54,7 @@ def reassign_partitioned_arguments(*partitioned_values) -> NamedArguments: def multiple_arguments( partition_on: Union[Tuple[str, ...], str], partition_with: PartitionFunction -) -> PartitionFunction[NamedArguments]: +) -> Callable[[NamedArguments], Tuple[NamedArguments, PartitionGenerator[NamedArguments]]]: """ Applies a single partitioning function to multiple arguments. @@ -87,14 +91,16 @@ def partitioning_function(named_args: NamedArguments) -> Tuple[NamedArguments, P generator = partition_map( lambda *partitioned_values: partitioned_args.reassigned(**dict(zip(partition_on, partitioned_values))), partition_with(*arg_values), - ) + ) # type: ignore[type-var] return non_partitioned_args, generator return partitioning_function -def all_arguments(partition_with: PartitionFunction) -> PartitionFunction[NamedArguments]: +def all_arguments( + partition_with: PartitionFunction, +) -> Callable[[NamedArguments], Tuple[NamedArguments, PartitionGenerator[NamedArguments]]]: """ Applies a single partitioning function to all arguments. 
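(A minimal usage sketch for the updated `per_argument` API — adapted from the docstring above; illustrative only, not part of the patch. It assumes pandas and the `parfun` decorator are installed.)

    from typing import List

    import pandas as pd

    from parfun.combine.dataframe import df_concat
    from parfun.decorators import parfun
    from parfun.partition.api import per_argument
    from parfun.partition.collection import list_by_chunk
    from parfun.partition.dataframe import df_by_row

    # `df` is partitioned by rows and `xs` by chunks simultaneously (zip-like);
    # `constant` is forwarded unpartitioned to every partition.
    @parfun(split=per_argument(df=df_by_row, xs=list_by_chunk), combine_with=df_concat)
    def func(df: pd.DataFrame, xs: List, constant: int) -> pd.DataFrame:
        ...

Here `per_argument` returns exactly the callable type the new annotations describe: `(NamedArguments) -> (NamedArguments, PartitionGenerator[NamedArguments])`.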
diff --git a/parfun/partition/collection.py b/parfun/partition/collection.py index 5afee74..cf2c641 100644 --- a/parfun/partition/collection.py +++ b/parfun/partition/collection.py @@ -3,12 +3,12 @@ """ import logging -from typing import Any, Iterable, Tuple +from typing import Iterable, Tuple -from parfun.partition.object import PartitionGenerator +from parfun.partition.object import PartitionGenerator, PartitionType -def list_by_chunk(*iterables: Iterable) -> PartitionGenerator[Tuple[Any, ...]]: +def list_by_chunk(*iterables: Iterable[PartitionType]) -> PartitionGenerator[Tuple[Iterable[PartitionType], ...]]: """ Partition one or multiple iterables by chunks of identical sizes. @@ -41,7 +41,7 @@ def list_by_chunk(*iterables: Iterable) -> PartitionGenerator[Tuple[Any, ...]]: yield len(partition), tuple(zip(*partition)) -def lists_by_chunk(*iterables: Iterable) -> PartitionGenerator[Tuple[Any, ...]]: +def lists_by_chunk(*iterables: Iterable[PartitionType]) -> PartitionGenerator[Tuple[Iterable[PartitionType], ...]]: logging.warning( f"`{lists_by_chunk.__name__}` will be removed in a future version, use " + f"`{list_by_chunk.__name__}` instead." @@ -50,7 +50,9 @@ def lists_by_chunk(*iterables: Iterable) -> PartitionGenerator[Tuple[Any, ...]]: return list_by_chunk(*iterables) -def zip_partition_on_args(*iterable) -> PartitionGenerator[Tuple[Any, ...]]: +def zip_partition_on_args( + *iterable: Iterable[PartitionType], +) -> PartitionGenerator[Tuple[Iterable[PartitionType], ...]]: logging.warning( f"`{zip_partition_on_args.__name__}` will be removed in a future version, use " + f"`{list_by_chunk.__name__}` instead." diff --git a/parfun/partition/dataframe.py b/parfun/partition/dataframe.py index 7324228..39ce4c7 100644 --- a/parfun/partition/dataframe.py +++ b/parfun/partition/dataframe.py @@ -47,7 +47,7 @@ def df_by_row(*dfs: pd.DataFrame) -> PartitionGenerator[Tuple[pd.DataFrame, ...] __validate_dfs_parameter(*dfs) - chunk_size = yield + chunk_size = yield None def dfs_chunk(rng_start: int, rng_end: int) -> Tuple[pd.DataFrame, ...]: return tuple(df.iloc[rng_start:rng_end] for df in dfs) @@ -65,7 +65,7 @@ def dfs_chunk(rng_start: int, rng_end: int) -> Tuple[pd.DataFrame, ...]: yield total_size - range_start, dfs_chunk(range_start, total_size) -def df_by_group(*args, **kwargs) -> PartitionFunction: +def df_by_group(*args, **kwargs) -> PartitionFunction[pd.DataFrame]: """ Partitions one or multiple Pandas dataframes by groups of identical numbers of rows, similar to :py:func:`pandas.DataFrame.groupby`. @@ -97,23 +97,23 @@ def df_by_group(*args, **kwargs) -> PartitionFunction: def generator(*dfs: pd.DataFrame) -> PartitionGenerator[Tuple[pd.DataFrame, ...]]: __validate_dfs_parameter(*dfs) - groups: Iterable[Tuple[pd.DataFrame]] = zip( + groups: Iterable[Tuple[pd.DataFrame, ...]] = zip( *((group for _name, group in df.groupby(*args, **kwargs)) for df in dfs) ) it = iter(groups) - chunked_group = tuple([] for _ in range(0, len(dfs))) + chunked_group: Tuple[List[pd.DataFrame], ...] 
= tuple([] for _ in range(0, len(dfs))) chunked_group_size: int = 0 - target_chunk_size = yield + target_chunk_size = yield None def concat_chunked_group_dfs(chunked_group: Tuple[List[pd.DataFrame], ...]): return tuple(pd.concat(chunked_dfs) for chunked_dfs in chunked_group) while True: try: - group: Tuple[pd.DataFrame] = next(it) + group = next(it) assert isinstance(group, tuple) assert isinstance(group[0], pd.DataFrame) diff --git a/parfun/partition/nested.py b/parfun/partition/nested.py index 79cb049..a2d951b 100644 --- a/parfun/partition/nested.py +++ b/parfun/partition/nested.py @@ -6,7 +6,7 @@ def partition_nested( *columns_partitions: Tuple[Union[Tuple[str, ...], str], PartitionFunction[Tuple]] -) -> PartitionFunction[Dict[str, Any]]: +) -> Callable[..., PartitionGenerator[Dict[str, Any]]]: """ Creates a new partitioning function from a collection of nested partitioning functions that are individually applied to some of the input arguments. @@ -66,17 +66,17 @@ def generator(**kwargs) -> PartitionGenerator[Dict[str, Any]]: **_updated_partitioned_kwargs(kwargs, current_arg_names, partitioned_values) ), current_generator, - ) + ) # type: ignore[type-var, return-value] else: return partition_map( lambda *partitioned_values: _updated_partitioned_kwargs(kwargs, current_arg_names, partitioned_values), current_generator, - ) + ) # type: ignore[type-var, return-value] return generator def _updated_partitioned_kwargs( - kwargs: Dict[str, Any], arg_names: Tuple[str], partitioned_values: Any + kwargs: Dict[str, Any], arg_names: Tuple[str, ...], partitioned_values: Any ) -> Dict[str, Any]: return {**kwargs, **dict(zip(arg_names, partitioned_values))} diff --git a/parfun/partition/object.py b/parfun/partition/object.py index 20ef6f2..2c0885f 100644 --- a/parfun/partition/object.py +++ b/parfun/partition/object.py @@ -2,15 +2,17 @@ from parfun.object import PartitionType +SimplePartitionIterator = Generator[PartitionType, None, None] + SmartPartitionGenerator = Generator[Optional[Tuple[int, PartitionType]], int, None] -PartitionGenerator = Union[Generator[PartitionType, None, None], SmartPartitionGenerator[PartitionType]] +PartitionGenerator = Union[SimplePartitionIterator[PartitionType], SmartPartitionGenerator[PartitionType]] """ All partitioning functions must return a Python generator of this type. There are two ways of writing a partitioning functions: -* Use regular Python generators returning partitioned values: +* Use regular Python generators (preferred) or iterators, returning partitioned values: ..
code:: python @@ -47,4 +49,4 @@ def partition_list_by_chunks(values: List, constant: int) -> PartitionGenerator[ """ -PartitionFunction = Callable[[PartitionType], Tuple[PartitionType, PartitionGenerator[PartitionType]]] +PartitionFunction = Callable[..., PartitionGenerator[PartitionType]] diff --git a/parfun/partition/primitives.py b/parfun/partition/primitives.py index 7ed8764..9a5c73f 100644 --- a/parfun/partition/primitives.py +++ b/parfun/partition/primitives.py @@ -1,16 +1,16 @@ -from typing import Callable, Generator, Optional, Sized, Tuple, TypeVar +from typing import Callable, Optional, Sequence, Tuple, TypeVar, cast -from parfun.partition.object import PartitionGenerator, PartitionType, SmartPartitionGenerator +from parfun.partition.object import PartitionGenerator, PartitionType, SmartPartitionGenerator, SimplePartitionIterator -InputPartitionType = TypeVar("InputPartitionType") +InputPartitionType = TypeVar("InputPartitionType", bound=Tuple) OutputPartitionType = TypeVar("OutputPartitionType", bound=Tuple) def partition_map( - func: Callable[[InputPartitionType], OutputPartitionType], generator: PartitionGenerator[InputPartitionType] + func: Callable[..., OutputPartitionType], generator: PartitionGenerator[InputPartitionType] ) -> PartitionGenerator[OutputPartitionType]: """ - Same as Python's built-in ``map()``, but works on ``PartitionGenerator``s. + Same as Python's built-in ``map()``, but works on partition generators. .. code:: python @@ -24,19 +24,23 @@ def partition_map( """ try: - first_value = next(generator) + first_value = cast(Optional[InputPartitionType], next(generator)) if first_value is not None: # This is a regular generator + simple_generator = cast(SimplePartitionIterator[InputPartitionType], generator) + yield func(*first_value) while True: - yield func(*next(generator)) + yield func(*next(simple_generator)) else: + smart_generator = cast(SmartPartitionGenerator[InputPartitionType], generator) + requested_partition_size = yield None while True: - value = generator.send(requested_partition_size) + value = smart_generator.send(requested_partition_size) _validate_smart_partition_value(value) partition_size, partition = value @@ -72,21 +76,23 @@ def partition_flatmap( """ try: - first_value = next(generator) + first_value = cast(Optional[InputPartitionType], next(generator)) except StopIteration: return if first_value is not None: # The parent generator is a regular generator - yield from _partition_flatmap_regular_generator(func, first_value, generator) + simple_generator = cast(SimplePartitionIterator[InputPartitionType], generator) + yield from _partition_flatmap_regular_generator(func, first_value, simple_generator) else: - yield from _partition_flatmap_smart_generator(func, generator) + smart_generator = cast(SmartPartitionGenerator[InputPartitionType], generator) + yield from _partition_flatmap_smart_generator(func, smart_generator) def _partition_flatmap_regular_generator( func: Callable[[InputPartitionType], PartitionGenerator[OutputPartitionType]], first_value: InputPartitionType, - generator: PartitionGenerator[InputPartitionType], + generator: SimplePartitionIterator[InputPartitionType], ) -> PartitionGenerator[OutputPartitionType]: """ `partition_flatmap()` specialisation for parent generators that are regular Python generators. 
@@ -97,12 +103,13 @@ def _partition_flatmap_regular_generator( def iterate_nested_generator( nested_generator: PartitionGenerator[OutputPartitionType], requested_partition_size: Optional[int] = None - ) -> Generator: + ): try: - first_value = next(nested_generator) + first_value = cast(Optional[OutputPartitionType], next(nested_generator)) if first_value is not None: # This is a regular generator + nested_simple_generator = cast(SimplePartitionIterator[OutputPartitionType], nested_generator) if requested_partition_size is not None: raise ValueError( @@ -110,13 +117,15 @@ def iterate_nested_generator( ) yield first_value - yield from nested_generator + yield from nested_simple_generator else: + nested_smart_generator = cast(SmartPartitionGenerator[OutputPartitionType], nested_generator) + if requested_partition_size is None: # First nested call value. requested_partition_size = yield None while True: - value = nested_generator.send(requested_partition_size) + value = nested_smart_generator.send(requested_partition_size) _validate_smart_partition_value(value) partition_size, partition = value @@ -138,7 +147,7 @@ def iterate_nested_generator( def _partition_flatmap_smart_generator( func: Callable[[InputPartitionType], PartitionGenerator[OutputPartitionType]], - generator: PartitionGenerator[InputPartitionType], + generator: SmartPartitionGenerator[InputPartitionType], ) -> SmartPartitionGenerator[OutputPartitionType]: """ `partition_flatmap()` specialisation for parent generators that are smart generators. @@ -150,7 +159,7 @@ def iterate_nested_generator( nested_generator: PartitionGenerator[OutputPartitionType], requested_partition_size: int, parent_partition_size: int, - ) -> Generator: + ): total_size = 0 try: @@ -158,17 +167,21 @@ def iterate_nested_generator( if nested_value is not None: # This is a regular nested generator + nested_simple_generator = cast(SimplePartitionIterator[OutputPartitionType], nested_generator) + while True: total_size += 1 requested_partition_size = yield parent_partition_size, nested_value - nested_value = next(nested_generator) + nested_value = next(nested_simple_generator) else: # This is a smart nested generator + nested_smart_generator = cast(SmartPartitionGenerator[OutputPartitionType], nested_generator) + while True: nested_requested_partition_size = max(1, round(requested_partition_size / parent_partition_size)) - nested_value = nested_generator.send(nested_requested_partition_size) + nested_value = nested_smart_generator.send(nested_requested_partition_size) _validate_smart_partition_value(nested_value) nested_partition_size, nested_partition = nested_value @@ -243,7 +256,7 @@ def partition_zip(*generators: PartitionGenerator) -> PartitionGenerator[Tuple]: if not is_smart[i]: continue - value = generator.send(requested_partition_size) + value = cast(SmartPartitionGenerator, generator).send(requested_partition_size) _validate_partition_zip_smart_partition_value(value, partition_size) partition_size, first_values[i] = value @@ -260,7 +273,7 @@ def partition_zip(*generators: PartitionGenerator) -> PartitionGenerator[Tuple]: for i, generator in enumerate(generators): if is_smart[i]: - value = generator.send(requested_partition_size) + value = cast(SmartPartitionGenerator, generator).send(requested_partition_size) _validate_partition_zip_smart_partition_value(value, partition_size) partition_size, partition = value else: @@ -295,7 +308,7 @@ def _validate_partition_zip_smart_partition_value( def _validate_smart_partition_value(value): - if not 
isinstance(value, Sized) or len(value) != 2: + if not isinstance(value, Sequence) or len(value) != 2: raise ValueError("partition generator should yield a partition with its size.") partition_size, _ = value diff --git a/parfun/partition/utility.py b/parfun/partition/utility.py index 68b7585..bd30953 100644 --- a/parfun/partition/utility.py +++ b/parfun/partition/utility.py @@ -1,7 +1,7 @@ -from typing import Callable, Generator, Union +from typing import Callable, Generator, Optional, Union, cast from parfun.object import PartitionType -from parfun.partition.object import PartitionGenerator +from parfun.partition.object import PartitionGenerator, SmartPartitionGenerator, SimplePartitionIterator def with_partition_size( @@ -26,13 +26,17 @@ def with_partition_size( """ try: - first_value = next(generator) + first_value = cast(Optional[PartitionType], next(generator)) if first_value is not None: # This is a regular generator + simple_generator = cast(SimplePartitionIterator[PartitionType], generator) + yield first_value - yield from generator + yield from simple_generator else: + smart_generator = cast(SmartPartitionGenerator[PartitionType], generator) + while True: if isinstance(partition_size, int): current_partition_size = partition_size @@ -40,7 +44,7 @@ def with_partition_size( assert callable(partition_size) current_partition_size = partition_size() - value = generator.send(current_partition_size) + value = smart_generator.send(current_partition_size) if value is None or len(value) != 2: raise ValueError("partition generator should yield a partition with its size.") diff --git a/parfun/partition_size_estimator/linear_regression_estimator.py b/parfun/partition_size_estimator/linear_regression_estimator.py index 3b72489..c05a02f 100644 --- a/parfun/partition_size_estimator/linear_regression_estimator.py +++ b/parfun/partition_size_estimator/linear_regression_estimator.py @@ -5,7 +5,7 @@ import attrs import numpy as np -from attrs.validators import instance_of +from attrs.validators import instance_of, is_callable from sklearn.base import BaseEstimator from sklearn.linear_model import LinearRegression from sklearn.pipeline import Pipeline @@ -63,7 +63,7 @@ class LinearRegessionEstimator(PartitionSizeEstimator[LinearRegessionEstimate]): _run_traces: List[Tuple[int, int]] = attrs.field(init=False, factory=list) regressor_factory: Callable[[], BaseEstimator] = attrs.field( - validator=instance_of(Callable), default=lambda: LinearRegessionEstimator.default_regressor() + validator=is_callable(), default=lambda: LinearRegessionEstimator.default_regressor() ) _current_coefficients: Optional[LinearRegressionCoefficients] = attrs.field(default=None) diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..5026b5e --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,57 @@ +[build-system] +requires = ["setuptools", "setuptools-scm", "mypy", "black", "flake8", "pyproject-flake8"] +build-backend = "setuptools.build_meta" + +[project] +name = "parfun" +description = "Lightweight parallelisation library for Python" +requires-python = ">=3.8" +readme = { file = "README.md", content-type = "text/markdown" } +license = { text = "Apache 2.0" } +authors = [{ name = "Citi", email = "opensource@citi.com" }] +dynamic = ["dependencies", "version"] +classifiers = [ + "Programming Language :: Python :: 3", + "License :: OSI Approved :: Apache Software License", + "Intended Audience :: Developers", + "Operating System :: OS Independent", + "Topic :: System :: Distributed Computing", +] + 
+[project.urls] +Home = "https://github.com/Citi/parfun" +Issues = "https://github.com/Citi/parfun/issues" + +[project.optional-dependencies] +pandas = ["pandas"] +dask = ["dask", "distributed"] +scaler = ["scaler[graphblas]"] + +[tool.setuptools] +packages = ["parfun"] +include-package-data = true + +[tool.setuptools.dynamic] +dependencies = { file = "requirements.txt" } +version = { attr = "parfun.about.__version__" } + +[tool.mypy] +no_strict_optional = true +check_untyped_defs = true +ignore_missing_imports = true +exclude = [ + "^docs.*$", +] + +[tool.black] +line-length = 120 +skip-magic-trailing-comma = true + +[tool.flake8] +max-line-length = 120 +extend-ignore = "E203" +exclude = "venv312" + +[metadata] +long_description = { file = "README.md" } +long_description_content_type = "text/markdown" \ No newline at end of file diff --git a/setup.py b/setup.py deleted file mode 100644 index 89cb011..0000000 --- a/setup.py +++ /dev/null @@ -1,19 +0,0 @@ -from setuptools import find_packages, setup - -from parfun.about import __version__ - -with open("requirements.txt", "rt") as f: - requirements = [i.strip() for i in f.readlines()] - -setup( - name="parfun", - version=__version__, - packages=find_packages(exclude=("tests",)), - install_requires=requirements, - extras_require={"pandas": ["pandas"], "dask": ["dask", "distributed"], "scaler": ["scaler[graphblas]"]}, - url="", - license="", - author="Citi", - author_email="opensource@citi.com", - description="Parfun makes it easy to distribute the computation of Python functions using a map-reduce cluster.", -) diff --git a/tests/backend/mixins.py b/tests/backend/mixins.py index 04312da..23b8abb 100644 --- a/tests/backend/mixins.py +++ b/tests/backend/mixins.py @@ -1,13 +1,15 @@ import abc +import time +import timeit +import warnings +from collections import deque +from typing import Deque -from parfun.backend.mixins import BackendEngine -from tests.backend.utility import ( - is_backend_blocking, - is_backend_handling_exceptions, - is_backend_providing_speedup, - is_backend_supporting_nested_tasks, - is_task_duration_correct, -) +from parfun.backend.mixins import BackendEngine, ProfiledFuture +from parfun.functions import parallel_map +from parfun.profiler.functions import profile + +from tests.backend.utility import failure_task, nested_task, no_op_task, sleep class BackendEngineTestCase(metaclass=abc.ABCMeta): """ Validates the requirements of the ``BackendEngine`` interface. """ - # Remark: the class cannot be an instance of unittest.TestCase as the library will try to instance it and execute - # it as a test case. + # Remark: the class cannot be an instance of TestCase as unittest will try to instantiate it and execute it as a + # test case.
@abc.abstractmethod def backend(self) -> BackendEngine: raise NotImplementedError() @@ -27,16 +29,130 @@ def n_workers(self) -> int: raise NotImplementedError() def test_is_blocking(self): - is_backend_blocking(self, self.backend(), max_concurrency=self.n_workers()) + """Tests the backend `submit()` method and checks that it blocks once the number of concurrently submitted + tasks exceeds the backend's worker count.""" + + N_TASKS = self.n_workers() * 4 + DELAY = 0.5 + + n_concurrent_tasks = 0 + futures = [] + + def future_callback(_): + nonlocal n_concurrent_tasks + n_concurrent_tasks -= 1 + + with self.backend().session() as session: + for _ in range(0, N_TASKS): + n_concurrent_tasks += 1 + future = session.submit(sleep, DELAY) + + future.add_done_callback(future_callback) + + futures.append(future) + + self.assertLessEqual(n_concurrent_tasks, self.n_workers()) # type: ignore[attr-defined] + + for future in futures: + future.result() + + self.assertEqual(n_concurrent_tasks, 0) # type: ignore[attr-defined] def test_is_backend_handling_exceptions(self): - is_backend_handling_exceptions(self, self.backend()) + """Tests that the backend correctly reports exceptions through the returned future.""" + + N_TASKS = 10 + + futures = [] + + with self.backend().session() as session: + for i in range(0, N_TASKS): + must_fail = i == (N_TASKS - 1) + futures.append(session.submit(failure_task, must_fail=must_fail)) + + for i, future in enumerate(futures): + must_fail = i == (N_TASKS - 1) + + if must_fail: + self.assertRaises(Exception, future.result) # type: ignore[attr-defined] + else: + future.result() def test_is_backend_providing_speedup(self): - is_backend_providing_speedup(self, self.backend(), n_workers=self.n_workers()) + N_TASKS = 8 + MAX_OVERHEAD = 0.2 # 20% + TASK_DURATION = 10 + + if self.n_workers() > 0: + min_expected_duration = (N_TASKS * TASK_DURATION) / float(self.n_workers()) + else: + min_expected_duration = N_TASKS * TASK_DURATION + + with self.backend().session() as session: + duration = timeit.timeit( + lambda: list(parallel_map(sleep, (TASK_DURATION for _ in range(0, N_TASKS)), backend_session=session)), + number=1, + ) + + self.assertAlmostEqual( # type: ignore[attr-defined] + duration, min_expected_duration, delta=min_expected_duration * MAX_OVERHEAD + ) + self.assertGreaterEqual(duration, min_expected_duration) # type: ignore[attr-defined] def test_task_duration(self): - is_task_duration_correct(self, self.backend(), concurrency=self.n_workers()) + """ + Checks that the measured ``submit()`` task duration matches the value returned by the backend. + """ + + TOLERANCE = 0.25 # 25% + ITERATIONS = 1000 + + # Measures the total task duration and compares it to the task duration returned by the backend's interface. + # + # We take two time measurements, one for the current process (only CPU time), and one for all the concurrent + # processes (wall time). + + futures: Deque[ProfiledFuture] = deque() + total_task_duration = 0 + + with profile() as process_time, \ + profile(time.perf_counter_ns) as elapsed_time, \ + self.backend().session() as session: + i = 0 + current_concurrency = 0 + while i < ITERATIONS: + while current_concurrency <= self.n_workers(): + futures.append(session.submit(no_op_task)) + current_concurrency += 1 + i += 1 + + # Waits for oldest task to finish. + future = futures.popleft() + total_task_duration += future.duration() + + current_concurrency -= 1 + + # Waits for all futures to finish.
+ total_task_duration += sum(f.duration() for f in futures) + + measured_duration = process_time.value + elapsed_time.value * self.n_workers() + + delta = measured_duration * TOLERANCE + + if abs(measured_duration - total_task_duration) >= delta: + warnings.warn(f"Expected execution duration of {total_task_duration} ns, measured {measured_duration} ns.") def test_supports_nested_tasks(self): - is_backend_supporting_nested_tasks(self, self.backend()) + """Validates that the backend supports nested tasks if it reports it.""" + + if not self.backend().allows_nested_tasks(): + return + + with self.backend().session() as session: + self.assertEqual( # type: ignore[attr-defined] + session.submit(nested_task, self.backend(), must_fail=False).result(), None + ) + + with self.assertRaises(Exception): # type: ignore[attr-defined] + # Must propagate inner tasks' exceptions. + session.submit(nested_task, self.backend(), must_fail=True).result() diff --git a/tests/backend/test_backend_debug.py b/tests/backend/test_backend_debug.py deleted file mode 100644 index de32d9d..0000000 --- a/tests/backend/test_backend_debug.py +++ /dev/null @@ -1,82 +0,0 @@ -import logging -import unittest -from typing import Callable, Generator, Iterable, Tuple - -from parfun.decorators import parfun -from parfun.entry_point import set_parallel_backend - - -def factorial_partition_function( - start: int, end: int, chunk_size: int = 5 -) -> Generator[Tuple[int, Tuple[int, ...]], int, None]: - """ - A python generator to partition a range between - [start] and [end] using a chunk size of [chunk_size] - Return: - Tuple[int, Tuple[int, ...], - e.g, (chunk_size, (start, end)) - """ - while start <= end: - new_end = min(start + chunk_size - 1, end) - yield new_end - start + 1, (start, new_end) - start = new_end + 1 - - -def factorial_combine_function(values: Iterable[int]) -> int: - from functools import reduce - - return reduce(lambda x, y: x * y, values, 1) - - -@parfun( - partition_on=("start", "end"), partition_with=factorial_partition_function, combine_with=factorial_combine_function -) -def factorial_computing_using_loop(start: int, end: int, callback: Callable = lambda x: x) -> int: - """ - Get factorial computation between [start] and [end] - e.g., - >> factorial_computing_using_loop(1, 5) = 1*2*3*4*5 = 120 - """ - m = 1 - for i in range(start, end + 1): - m = m * i - return callback(m) - - -def factorial_computing_using_multiprocess(start: int, end: int) -> int: - from multiprocessing import Pool - - num_workers = 5 - # [(1, 9), (10, 18), (19, 27), (28, 36), (37, 45), (46, 50)] - partition_data = [data for _, data in factorial_partition_function(start, end, (end - start) // num_workers)] - with Pool(num_workers) as pool: - results = pool.starmap(factorial_computing_using_loop, partition_data) - return factorial_combine_function(results) - - -@parfun( - partition_on=("start", "end"), partition_with=factorial_partition_function, combine_with=factorial_combine_function -) -def factorial_computing_using_recursive(start: int, end: int) -> int: - if end == start: - return start - else: - return end * factorial_computing_using_recursive(start, end - 1) - - -def callback_func(x): - return x * 2 - - -class TestFactorialComputation(unittest.TestCase): - def test_loop_using_none_backend(self): - set_parallel_backend("none") - logging.info(f"Result: {factorial_computing_using_loop(1, 10)}") - - def test_loop_using_single_process_backend(self): - set_parallel_backend("local_single_process") - logging.info(f"Result:
{factorial_computing_using_loop(1, 10)}") - - def test_loop_using_multiprocess_backend(self): - set_parallel_backend("local_multiprocessing") - logging.info(f"Result: {factorial_computing_using_loop(1, 10)}") diff --git a/tests/backend/test_dask.py b/tests/backend/test_dask.py index ab8deb6..cfdf47a 100644 --- a/tests/backend/test_dask.py +++ b/tests/backend/test_dask.py @@ -1,10 +1,17 @@ import unittest -from parfun.backend.dask import DaskLocalClusterBackend, DaskRemoteClusterBackend +try: + from parfun.backend.dask import DaskLocalClusterBackend, DaskRemoteClusterBackend + dask_available = True +except ImportError: + dask_available = False +from parfun.backend.mixins import BackendEngine + from tests.backend.mixins import BackendEngineTestCase from tests.backend.utility import warmup_workers +@unittest.skipUnless(dask_available, "Dask is not installed") class TestDaskLocalBackend(unittest.TestCase, BackendEngineTestCase): N_WORKERS = 4 @@ -19,7 +26,7 @@ def tearDown(self) -> None: def n_workers(self) -> int: return TestDaskLocalBackend.N_WORKERS - def backend(self) -> DaskLocalClusterBackend: + def backend(self) -> BackendEngine: return self._backend def test_is_backend_dask_remote(self): diff --git a/tests/backend/test_scaler.py b/tests/backend/test_scaler.py index a26b605..d41204e 100644 --- a/tests/backend/test_scaler.py +++ b/tests/backend/test_scaler.py @@ -1,10 +1,17 @@ import unittest -from parfun.backend.scaler import ScalerLocalBackend, ScalerRemoteBackend +from parfun.backend.mixins import BackendEngine +try: + from parfun.backend.scaler import ScalerLocalBackend, ScalerRemoteBackend + scaler_installed = True +except ImportError: + scaler_installed = False + from tests.backend.mixins import BackendEngineTestCase from tests.backend.utility import warmup_workers +@unittest.skipUnless(scaler_installed, "Scaler backend not installed") class TestScalerBackend(unittest.TestCase, BackendEngineTestCase): N_WORKERS = 4 @@ -19,7 +26,7 @@ def tearDown(self) -> None: def n_workers(self) -> int: return TestScalerBackend.N_WORKERS - def backend(self) -> ScalerLocalBackend: + def backend(self) -> BackendEngine: return self._backend def test_is_backend_scaler_remote(self): diff --git a/tests/backend/utility.py b/tests/backend/utility.py index 181300d..4919605 100644 --- a/tests/backend/utility.py +++ b/tests/backend/utility.py @@ -1,166 +1,25 @@ import time -import timeit -import unittest -from collections import deque from concurrent.futures import wait from parfun.backend.mixins import BackendEngine -from parfun.functions import parallel_map -from parfun.profiler.functions import profile from parfun.profiler.object import TraceTime -# default time.sleep has no signatures, which will break scaler -def sleep(x): - time.sleep(x) - return - - -def is_backend_blocking(test_case: unittest.TestCase, backend: BackendEngine, max_concurrency: int = 1): - """Tests the backend `submit()` method and checks that it's blocking when exceeding `max_concurrency` concurrent - tasks.""" - - N_TASKS = max_concurrency * 4 - DELAY = 0.5 - - n_concurrent_tasks = 0 - futures = [] - - def future_callback(_): - nonlocal n_concurrent_tasks - n_concurrent_tasks -= 1 - - with backend.session() as session: - for _ in range(0, N_TASKS): - n_concurrent_tasks += 1 - future = session.submit(sleep, DELAY) - - future.add_done_callback(future_callback) - - futures.append(future) - - test_case.assertLessEqual(n_concurrent_tasks, max_concurrency) - - for future in futures: - future.result() - - 
test_case.assertEqual(n_concurrent_tasks, 0) - - -def is_backend_handling_exceptions(test_case: unittest.TestCase, backend: BackendEngine): - """Tests if the the backend correctly reports the exceptions to the returned value.""" - - N_TASKS = 10 - - futures = [] - - with backend.session() as session: - for i in range(0, N_TASKS): - must_fail = i == (N_TASKS - 1) - futures.append(session.submit(_failure_task, must_fail=must_fail)) - - for i, future in enumerate(futures): - must_fail = i == (N_TASKS - 1) - - if must_fail: - test_case.assertRaises(Exception, future.result) - else: - future.result() - - -def is_backend_providing_speedup( - test_case: unittest.TestCase, - backend: BackendEngine, - n_tasks: int = 8, - n_workers: int = 2, - max_overhead: float = 0.2, # 20% - task_duration: float = 10, -): - if n_workers > 0: - min_expected_duration = (n_tasks * task_duration) / float(n_workers) - else: - min_expected_duration = n_tasks * task_duration - - with backend.session() as session: - duration = timeit.timeit( - lambda: list(parallel_map(sleep, (task_duration for _ in range(0, n_tasks)), backend_session=session)), - number=1, - ) - - test_case.assertAlmostEqual(duration, min_expected_duration, delta=min_expected_duration * max_overhead) - test_case.assertGreaterEqual(duration, min_expected_duration) - - -def is_task_duration_correct( - test_case: unittest.TestCase, - backend: BackendEngine, - concurrency: int = 0, - tolerance: float = 0.25, # 25% - iterations: int = 1000, -): - """ - Checks if the measured ``submit()`` task duration matches the returned backend's value. - - :param concurrency the concurrent processes. If zero, assumes no worker processes and the backend executes in the - current thread. - :param tolerance the test tolerance as a fraction of the backend's overhead (default: 25%). - """ - - # Measures the total task duration and compares it to the task duration returned by the backend's interface. - # - # We take two time measurements, one for the current process (only CPU time), and one for all the concurrent - # processes (wall time). - - futures = deque() - total_task_duration = 0 - - with profile() as process_time, profile(time.perf_counter_ns) as eslaped_time, backend.session() as session: - i = 0 - current_concurrency = 0 - while i < iterations: - while current_concurrency <= concurrency: - futures.append(session.submit(_no_op_task)) - current_concurrency += 1 - i += 1 - - # Waits for oldest task to finish. - future = futures.popleft() - total_task_duration += future.duration() - - current_concurrency -= 1 - - # Waits for all futures to finish. - total_task_duration += sum(f.duration() for f in futures) - - measured_duration = process_time.value + eslaped_time.value * concurrency - - delta = measured_duration * tolerance - - test_case.assertAlmostEqual(measured_duration, total_task_duration, delta=delta) - - -def is_backend_supporting_nested_tasks(test_case: unittest.TestCase, backend: BackendEngine): - """Validates that the backend supports nested tasks if it reports it.""" - - if not backend.allows_nested_tasks(): - return - - with backend.session() as session: - test_case.assertEqual(session.submit(_nested_task, backend, must_fail=False).result(), None) +def failure_task(must_fail: bool, duration: TraceTime = 1_000_000): + """Raises an exception if `must_fail` is True.""" - with test_case.assertRaises(Exception): - # Must propagate inner tasks' exceptions. 
- session.submit(_nested_task, backend, must_fail=True).result() + no_op_task(duration) + if must_fail: + raise Exception("failure_task: task failure.") -def warmup_workers(backend: BackendEngine, n_workers: int): - """Makes sure the backend's workers are fully initialized by submitting a single task.""" +def nested_task(backend: BackendEngine, must_fail: bool, duration: TraceTime = 1_000_000): with backend.session() as session: - wait([session.submit(_no_op_task) for _ in range(0, n_workers)]) + return session.submit(failure_task, must_fail, duration).result() -def _no_op_task(duration: TraceTime = 1_000_000) -> None: +def no_op_task(duration: TraceTime = 1_000_000) -> None: starts_at = time.process_time_ns() while time.process_time_ns() - starts_at < duration: @@ -169,15 +28,14 @@ def _no_op_task(duration: TraceTime = 1_000_000) -> None: return None -def _failure_task(must_fail: bool, duration: TraceTime = 1_000_000): - """Raises an exception if `must_fail` is True.""" - - _no_op_task(duration) +def sleep(x): + """The built-in time.sleep has no signature, which would break Scaler.""" + time.sleep(x) + return - - if must_fail: - raise Exception("_failure_task: task failure.") +def warmup_workers(backend: BackendEngine, n_workers: int): + """Makes sure the backend's workers are fully initialized by submitting a single task.""" -def _nested_task(backend: BackendEngine, must_fail: bool, duration: TraceTime = 1_000_000): with backend.session() as session: - return session.submit(_failure_task, must_fail, duration).result() + wait([session.submit(no_op_task) for _ in range(0, n_workers)]) diff --git a/tests/kernel/test_function_signature.py b/tests/kernel/test_function_signature.py index 7494d72..ee3345a 100644 --- a/tests/kernel/test_function_signature.py +++ b/tests/kernel/test_function_signature.py @@ -1,4 +1,3 @@ -import inspect import unittest from collections import OrderedDict @@ -8,7 +7,7 @@ class TestFunctionSignature(unittest.TestCase): def test_assign(self): def function_1(arg_1, arg_2=None): - ... + pass signature = FunctionSignature.from_function(function_1) @@ -33,7 +32,7 @@ def function_1(arg_1, arg_2=None): self.assertEqual(assigned_args.var_args, tuple()) def function_2(arg_1, arg_2=None, *args, **kwargs): - ... + pass signature = FunctionSignature.from_function(function_2) @@ -48,27 +47,6 @@ def function_2(arg_1, arg_2=None, *args, **kwargs): self.assertEqual(assigned_args.kwargs, {"arg_4": 4}) self.assertEqual(assigned_args.var_args, (3,)) - # Positional only, positional or keyword, keyword only - def function_3(arg_1, /, arg_2, arg_3=0, *, arg_4=1, **kwargs): - ... - - signature = inspect.signature(function_3) - self.assertEqual(signature.parameters["arg_1"].kind, inspect.Parameter.POSITIONAL_ONLY) - self.assertEqual(signature.parameters["arg_2"].kind, inspect.Parameter.POSITIONAL_OR_KEYWORD) - self.assertEqual(signature.parameters["arg_3"].kind, inspect.Parameter.POSITIONAL_OR_KEYWORD) - self.assertEqual(signature.parameters["arg_4"].kind, inspect.Parameter.KEYWORD_ONLY) - self.assertEqual(signature.parameters["kwargs"].kind, inspect.Parameter.VAR_KEYWORD) - - # Positional only, positional or keyword, var positional, var keyword - def function_4(arg_1, /, arg_2=1, *args, **kwargs): - ...
-
-        signature = inspect.signature(function_4)
-        self.assertEqual(signature.parameters["arg_1"].kind, inspect.Parameter.POSITIONAL_ONLY)
-        self.assertEqual(signature.parameters["arg_2"].kind, inspect.Parameter.POSITIONAL_OR_KEYWORD)
-        self.assertEqual(signature.parameters["args"].kind, inspect.Parameter.VAR_POSITIONAL)
-        self.assertEqual(signature.parameters["kwargs"].kind, inspect.Parameter.VAR_KEYWORD)
-
 
 if __name__ == "__main__":
     unittest.main()
diff --git a/tests/kernel/test_parallel_function.py b/tests/kernel/test_parallel_function.py
index 8cae863..e43dd01 100644
--- a/tests/kernel/test_parallel_function.py
+++ b/tests/kernel/test_parallel_function.py
@@ -12,26 +12,36 @@ def test_validate_signature(self):
         # These are valid:
 
         ParallelFunction(
-            function=lambda x, y: x + y, partition_on=("x",), partition_with=lambda x: [(x,)], combine_with=sum
+            function=lambda x, y: x + y,  # type: ignore[misc, arg-type]
+            function_name="lambda",
+            partition_on=("x",),
+            partition_with=lambda x: [(x,)],  # type: ignore[arg-type, return-value]
+            combine_with=sum,
         )
 
         ParallelFunction(
-            function=lambda *args, **kwargs: tuple(),
+            function=lambda *args, **kwargs: tuple(),  # type: ignore[misc, arg-type]
+            function_name="lambda",
             partition_on=("x", "y"),
-            partition_with=lambda x, y: [(x, y)],
+            partition_with=lambda x, y: [(x, y)],  # type: ignore[arg-type, return-value]
             combine_with=sum,
         )
 
         with self.assertRaises(ValueError):
             ParallelFunction(
-                function=lambda x, y: x + y, partition_on=(), partition_with=lambda: [()], combine_with=sum
+                function=lambda x, y: x + y,  # type: ignore[misc, arg-type]
+                function_name="lambda",
+                partition_on=(),
+                partition_with=lambda: [()],  # type: ignore[arg-type, return-value]
+                combine_with=sum,
             )
 
         with self.assertRaises(ValueError):
             ParallelFunction(
-                function=lambda x, y: x + y,
-                partition_on=["x", "z"],
-                partition_with=lambda x, z: [(x, z)],
+                function=lambda x, y: x + y,  # type: ignore[misc, arg-type]
+                function_name="lambda",
+                partition_on=["x", "z"],  # type: ignore[arg-type]
+                partition_with=lambda x, z: [(x, z)],  # type: ignore[arg-type, return-value]
                 combine_with=sum,
             )
diff --git a/tests/partition/test_dataframe.py b/tests/partition/test_dataframe.py
index d76d4f8..ec543d4 100644
--- a/tests/partition/test_dataframe.py
+++ b/tests/partition/test_dataframe.py
@@ -1,6 +1,6 @@
 import math
 import unittest
-from typing import List
+from typing import List, cast
 
 try:
     import pandas as pd
@@ -8,6 +8,7 @@
     raise ImportError("Pandas dependency missing. Use `pip install 'parfun[pandas]'` to install Pandas.")
 
 from parfun.partition.dataframe import df_by_group, df_by_row
+from parfun.partition.object import SmartPartitionGenerator
 from parfun.partition.utility import with_partition_size
 from tests.test_helpers import random_df
@@ -57,7 +58,7 @@ def test_df_by_group(self):
 
         # Tests if the generator dynamically adapts to varying chunk size.
-        gen = df_by_group(by="category")(input_df)
+        gen = cast(SmartPartitionGenerator, df_by_group(by="category")(input_df))
 
         next(gen)
         partition_size, chunk = gen.send(1)
diff --git a/tests/partition/test_primitives.py b/tests/partition/test_primitives.py
index 8410b01..6ec0f5d 100644
--- a/tests/partition/test_primitives.py
+++ b/tests/partition/test_primitives.py
@@ -1,7 +1,7 @@
 import math
 import unittest
 from itertools import chain, repeat
-from typing import Generator, List, Tuple
+from typing import Generator, List, Tuple, cast
 
 try:
     import pandas as pd
@@ -10,6 +10,7 @@
 
 from parfun.partition.collection import list_by_chunk
 from parfun.partition.dataframe import df_by_group, df_by_row
+from parfun.partition.object import SimplePartitionIterator
 from parfun.partition.primitives import partition_flatmap, partition_map, partition_zip
 from parfun.partition.utility import with_partition_size
@@ -26,7 +27,7 @@ def test_partition_zip(self):
         gen_1 = list_by_chunk
         gen_2 = df_by_row
-        gen_3 = repeat(math.pi)
+        gen_3 = cast(SimplePartitionIterator, repeat(math.pi))
 
         ys = list(with_partition_size(partition_zip(gen_1(xs), gen_2(df), gen_3), partition_size=PARTITION_SIZE))
@@ -62,7 +63,7 @@ def test_partition_map(self):
 
         xs = list(range(0, N))
 
-        def mapped_function(partition: List) -> Tuple[List]:
+        def mapped_function(partition: List[int]) -> Tuple[List[int]]:
             return ([x * x for x in partition],)
 
         # Smart generators
diff --git a/tests/partition/test_utility.py b/tests/partition/test_utility.py
index 598ee23..6e16884 100644
--- a/tests/partition/test_utility.py
+++ b/tests/partition/test_utility.py
@@ -8,9 +8,9 @@ class TestPartitionUtility(unittest.TestCase):
     def test_with_partition_size(self):
         ls_1 = range(0, 5)
         ls_2 = range(10, 15)
-        values = with_partition_size(list_by_chunk(ls_1, ls_2), partition_size=2)
+        values = list(with_partition_size(list_by_chunk(ls_1, ls_2), partition_size=2))
 
-        self.assertListEqual(list(values), [((0, 1), (10, 11)), ((2, 3), (12, 13)), ((4,), (14,))])
+        self.assertListEqual(values, [((0, 1), (10, 11)), ((2, 3), (12, 13)), ((4,), (14,))])
 
 
 if __name__ == "__main__":
diff --git a/tests/test_decorators.py b/tests/test_decorators.py
index 687ffe1..4286dc0 100644
--- a/tests/test_decorators.py
+++ b/tests/test_decorators.py
@@ -13,7 +13,9 @@
 from parfun.combine.collection import list_concat
 from parfun.combine.dataframe import df_concat
 from parfun.decorators import parfun
-from parfun.entry_point import get_parallel_backend, set_parallel_backend, set_parallel_backend_context
+from parfun.entry_point import (
+    BACKEND_REGISTRY, get_parallel_backend, set_parallel_backend, set_parallel_backend_context
+)
 from parfun.partition.api import per_argument
 from parfun.partition.collection import list_by_chunk
 from parfun.partition.dataframe import df_by_row
@@ -101,6 +103,7 @@ def test_parallel_concurrent_generator(self):
         self.assertGreater(duration, expected_duration)
         self.assertAlmostEqual(duration, expected_duration, delta=expected_duration * 0.2)  # within 20% of expected
 
+    @unittest.skipUnless("scaler_local" in BACKEND_REGISTRY, "Scaler backend not installed")
     def test_parallel_nested_calls(self):
         """Makes sure that the decorator handles nested parallel function calls."""
@@ -162,9 +165,7 @@ def test_per_argument(self):
         self.assertTrue(sequential.equals(parallel))
 
 
-@parfun(
-    partition_on=("col1", "col2", "col3"), partition_with=list_by_chunk, combine_with=sum, fixed_partition_size=100
-)
+@parfun(partition_on=("col1", "col2", "col3"), partition_with=list_by_chunk, combine_with=sum,
+        fixed_partition_size=100)
 def _sum_horizontally(col1: Iterable[int], col2: Iterable[int], col3: Iterable[int], constant: int) -> int:
     result = 0
     for i in zip(col1, col2, col3):
@@ -187,7 +188,7 @@ def _calculate_some_df(a: List[int], b: List[float], constant_df: pd.DataFrame)
     return pd.concat(list_of_df)
 
 
-def _delayed_partition(values: Iterable[float]) -> PartitionGenerator[Tuple[float]]:
+def _delayed_partition(values: Iterable[float]) -> PartitionGenerator[Tuple[List[float]]]:
     yield None
     for i, v in enumerate(values):
         logging.debug(f"starts generating partition #{i}")
@@ -197,7 +198,7 @@ def _delayed_partition(values: Iterable[float]) -> PartitionGenerator[Tuple[floa
 
 def _delayed_combine(values: Iterable[float]) -> float:
-    result = 0
+    result = 0.0
     for i, v in enumerate(values):
         logging.debug(f"starts combining partition #{i}")
         time.sleep(_DELAY)
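
Note on the renamed test helpers: `no_op_task`, `failure_task`, and `nested_task` lose their leading underscores in this change, presumably so that other test modules (and pickling backends) can reference them by their public names. The following is a minimal, hypothetical sketch of how these helpers are meant to be driven against a backend; the `tests.backend.utility` import path and the `LocalMultiprocessingBackend(n_workers=2)` constructor are assumptions for illustration, not something this diff shows.

import unittest

# Assumed import paths; adjust to the actual test layout.
from parfun.backend.local_multiprocessing import LocalMultiprocessingBackend
from tests.backend.utility import failure_task, warmup_workers


class TestFailurePropagation(unittest.TestCase):
    def test_failure_task(self):
        backend = LocalMultiprocessingBackend(n_workers=2)  # assumed constructor
        warmup_workers(backend, n_workers=2)

        with backend.session() as session:
            # A task that does not fail completes and returns None.
            self.assertIsNone(session.submit(failure_task, False).result())

            # A failing task re-raises its exception from `.result()`.
            with self.assertRaises(Exception):
                session.submit(failure_task, True).result()


if __name__ == "__main__":
    unittest.main()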
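
For context on the `cast(SmartPartitionGenerator, ...)` change and the `next(gen)` / `gen.send(1)` calls in `test_df_by_group`: smart partition generators follow a two-step protocol in which the first `next()` yields a priming `None`, and every subsequent `send(requested_size)` yields an `(actual_partition_size, partition)` pair. The sketch below illustrates that protocol; it assumes `PartitionGenerator` is exported by `parfun.partition.object` alongside the classes imported above, and the generator itself is illustrative, not library code.

from typing import List, Tuple

from parfun.partition.object import PartitionGenerator  # assumed export


def chunked_squares(values: List[int]) -> PartitionGenerator[Tuple[List[int]]]:
    # Priming step: the first next() yields None, then waits for the first
    # requested partition size to arrive through send().
    requested_size = yield None

    while values:
        chunk, values = values[:requested_size], values[requested_size:]
        # Yield (actual_partition_size, partition), then receive the size
        # requested for the next partition.
        requested_size = yield len(chunk), ([x * x for x in chunk],)


gen = chunked_squares(list(range(10)))
next(gen)                      # priming call, returns None
size, partition = gen.send(3)  # size == 3, partition == ([0, 1, 4],)
size, partition = gen.send(2)  # size == 2, partition == ([9, 16],)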
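
Finally, the reformatted `@parfun` decorator above reads as: split the arguments named in `partition_on` into chunks with `partition_with`, run the function on each chunk in parallel, and fold the per-chunk results back together with `combine_with`. A small end-to-end sketch follows; the backend name `"local_multiprocessing"` and the exact `set_parallel_backend_context` call are assumptions inferred from the imports in `tests/test_decorators.py`, not something this diff confirms.

from typing import List

from parfun.decorators import parfun
from parfun.entry_point import set_parallel_backend_context
from parfun.partition.collection import list_by_chunk


@parfun(partition_on=("values",), partition_with=list_by_chunk, combine_with=sum, fixed_partition_size=100)
def total(values: List[int]) -> int:
    # Each task receives one fixed-size chunk of `values`; the per-chunk
    # sums are then combined with `sum`.
    return sum(values)


if __name__ == "__main__":
    # Assumed registry name; any key present in BACKEND_REGISTRY should work.
    with set_parallel_backend_context("local_multiprocessing"):
        print(total(list(range(1_000))))  # expected: 499500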