From 582355ab3c004ec179e61603c5c55352075dc4dc Mon Sep 17 00:00:00 2001
From: Anatoly Myachev
Date: Tue, 24 Oct 2023 00:25:41 +0200
Subject: [PATCH] dask fixes

Signed-off-by: Anatoly Myachev
---

Reviewer notes (commentary only; the headers and hunks below are unchanged):

* dataframe.py, `PandasOnDaskDataframe.reconnect(cls, address, attributes)`:
  calls `distributed.default_client()`; if that raises `ValueError`, it
  creates `distributed.Client(address)` instead (the in-code comment says
  this sets up `default_client` for the worker process). It then builds an
  instance with `cls.__new__(cls)` (so `__init__` is not called), copies
  `attributes` into the instance `__dict__`, and returns the instance.

* dataframe.py, `PandasOnDaskDataframe.__reduce__(self)`: reads the
  scheduler address from `default_client().scheduler_info()["address"]` and
  returns `(self.reconnect, (address, self.__dict__))`, i.e. `reconnect` is
  the callable used to rebuild the object from the address and the
  instance attributes.
  NOTE(review): unlike `reconnect`, `__reduce__` does not handle a
  `ValueError` from `default_client()`; presumably a default client always
  exists on the pickling side -- confirm.

* test_series.py: the marker on `test__reduce__` changes from
  `xfail(Engine.get() != "Ray", reason="reason")` to
  `skipif(Engine.get() == "Unidist", reason="Unidist doesn't work for now.")`.

 .../pandas_on_dask/dataframe/dataframe.py     | 21 +++++++++++++++++++
 modin/pandas/test/test_series.py              |  2 +-
 2 files changed, 22 insertions(+), 1 deletion(-)

diff --git a/modin/core/execution/dask/implementations/pandas_on_dask/dataframe/dataframe.py b/modin/core/execution/dask/implementations/pandas_on_dask/dataframe/dataframe.py
index 77c78e29f9c..bccc5c0152b 100644
--- a/modin/core/execution/dask/implementations/pandas_on_dask/dataframe/dataframe.py
+++ b/modin/core/execution/dask/implementations/pandas_on_dask/dataframe/dataframe.py
@@ -41,3 +41,24 @@ class PandasOnDaskDataframe(PandasDataframe):
     """
 
     _partition_mgr_cls = PandasOnDaskDataframePartitionManager
+
+    @classmethod
+    def reconnect(cls, address, attributes):
+        try:
+            from distributed import default_client
+
+            default_client()
+        except ValueError:
+            from distributed import Client
+
+            # setup `default_client` for worker process
+            _ = Client(address)
+        obj = cls.__new__(cls)
+        obj.__dict__.update(attributes)
+        return obj
+
+    def __reduce__(self):
+        from distributed import default_client
+
+        address = default_client().scheduler_info()["address"]
+        return self.reconnect, (address, self.__dict__)
diff --git a/modin/pandas/test/test_series.py b/modin/pandas/test/test_series.py
index 1dc0843c63f..b1930487952 100644
--- a/modin/pandas/test/test_series.py
+++ b/modin/pandas/test/test_series.py
@@ -4796,7 +4796,7 @@ def test_binary_numpy_universal_function_issue_6483():
     )
 
 
-@pytest.mark.xfail(Engine.get() != "Ray", reason="reason")
+@pytest.mark.skipif(Engine.get() == "Unidist", reason="Unidist doesn't work for now.")
 def test__reduce__():
     abbreviations = pd.Series(
         ["Major League Baseball", "National Basketball Association"],