dask.distributed does not work with grib files opened by earthkit #378

Open
malmans2 opened this issue May 10, 2024 · 2 comments
Labels
bug Something isn't working

Comments

@malmans2
Contributor

malmans2 commented May 10, 2024

What happened?

I think it's the same problem reported in #375, but I'm opening a new issue as I'm not 100% sure.
I'm unable to use dask.distributed with GRIB files opened via earthkit-data and chunked with dask.

I get this warning when I open the data with dask:

In  , overriding the default value (chunks=None) with chunks={} is not recommended.

Is that intentional? Are we not supposed to use dask with GRIB files?

What are the steps to reproduce the bug?

import earthkit.data
import dask.distributed

client = dask.distributed.Client()

collection_id = "reanalysis-era5-single-levels"
request = {
    "variable": "2t",
    "product_type": "reanalysis",
    "date": "2012-12-01",
    "time": "12:00",
}
earthkit_ds = earthkit.data.from_source("cds", collection_id, **request)
xr_ds = earthkit_ds.to_xarray(xarray_open_dataset_kwargs={"chunks": {}})
xr_ds.to_netcdf("test.nc")  # TypeError
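A possible workaround until serialisation is supported: since the failure happens when distributed pickles the task graph for transport to workers, forcing the write onto a local scheduler avoids the pickling step entirely. This is standard dask behaviour (`dask.config.set(scheduler=...)` takes precedence over a registered `Client`), but I have not verified it against earthkit 0.7.0. Minimal sketch with a plain dask array standing in for the earthkit-backed dataset:

```python
import dask
import dask.array as da

# Stand-in for the earthkit-backed, dask-chunked dataset.
x = da.ones((4, 4), chunks=(2, 2))

# Run the graph in-process on the synchronous scheduler, so it is never
# pickled for shipment to distributed workers. With the real dataset this
# would wrap the xr_ds.to_netcdf("test.nc") call instead of compute().
with dask.config.set(scheduler="synchronous"):
    total = float(x.sum().compute())

print(total)  # 16.0
```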

Version

0.7.0

Platform (OS and architecture)

Linux eqc-quality-tools.eqc.compute.cci1.ecmwf.int 5.14.0-362.8.1.el9_3.x86_64 #1 SMP PREEMPT_DYNAMIC Wed Nov 8 17:36:32 UTC 2023 x86_64 x86_64 x86_64 GNU/Linux

Relevant log output

---------------------------------------------------------------------------
KeyError                                  Traceback (most recent call last)
File /data/common/miniforge3/envs/wp3/lib/python3.11/site-packages/distributed/protocol/pickle.py:63, in dumps(x, buffer_callback, protocol)
     62 try:
---> 63     result = pickle.dumps(x, **dump_kwargs)
     64 except Exception:

File /data/common/miniforge3/envs/wp3/lib/python3.11/site-packages/earthkit/data/readers/grib/xarray.py:38, in IndexWrapperForCfGrib.__getstate__(self)
     37 def __getstate__(self):
---> 38     return dict(index=serialise_state(self.index), ignore_keys=self.ignore_keys)

File /data/common/miniforge3/envs/wp3/lib/python3.11/site-packages/earthkit/data/utils/serialise.py:26, in serialise_state(obj)
     25 LOG.info("serialise %s", fullname)
---> 26 return (fullname, SERIALISATION[fullname][0](obj))

KeyError: ('earthkit.data.readers.grib.reader', 'GRIBReader')

During handling of the above exception, another exception occurred:

KeyError                                  Traceback (most recent call last)
File /data/common/miniforge3/envs/wp3/lib/python3.11/site-packages/distributed/protocol/pickle.py:68, in dumps(x, buffer_callback, protocol)
     67 buffers.clear()
---> 68 pickler.dump(x)
     69 result = f.getvalue()

File /data/common/miniforge3/envs/wp3/lib/python3.11/site-packages/earthkit/data/readers/grib/xarray.py:38, in IndexWrapperForCfGrib.__getstate__(self)
     37 def __getstate__(self):
---> 38     return dict(index=serialise_state(self.index), ignore_keys=self.ignore_keys)

File /data/common/miniforge3/envs/wp3/lib/python3.11/site-packages/earthkit/data/utils/serialise.py:26, in serialise_state(obj)
     25 LOG.info("serialise %s", fullname)
---> 26 return (fullname, SERIALISATION[fullname][0](obj))

KeyError: ('earthkit.data.readers.grib.reader', 'GRIBReader')

During handling of the above exception, another exception occurred:

KeyError                                  Traceback (most recent call last)
File /data/common/miniforge3/envs/wp3/lib/python3.11/site-packages/distributed/protocol/serialize.py:363, in serialize(x, serializers, on_error, context, iterate_collection)
    362 try:
--> 363     header, frames = dumps(x, context=context) if wants_context else dumps(x)
    364     header["serializer"] = name

File /data/common/miniforge3/envs/wp3/lib/python3.11/site-packages/distributed/protocol/serialize.py:78, in pickle_dumps(x, context)
     76     writeable.append(not f.readonly)
---> 78 frames[0] = pickle.dumps(
     79     x,
     80     buffer_callback=buffer_callback,
     81     protocol=context.get("pickle-protocol", None) if context else None,
     82 )
     83 header = {
     84     "serializer": "pickle",
     85     "writeable": tuple(writeable),
     86 }

File /data/common/miniforge3/envs/wp3/lib/python3.11/site-packages/distributed/protocol/pickle.py:81, in dumps(x, buffer_callback, protocol)
     80     buffers.clear()
---> 81     result = cloudpickle.dumps(x, **dump_kwargs)
     82 except Exception:

File /data/common/miniforge3/envs/wp3/lib/python3.11/site-packages/cloudpickle/cloudpickle.py:1479, in dumps(obj, protocol, buffer_callback)
   1478 cp = Pickler(file, protocol=protocol, buffer_callback=buffer_callback)
-> 1479 cp.dump(obj)
   1480 return file.getvalue()

File /data/common/miniforge3/envs/wp3/lib/python3.11/site-packages/cloudpickle/cloudpickle.py:1245, in Pickler.dump(self, obj)
   1244 try:
-> 1245     return super().dump(obj)
   1246 except RuntimeError as e:

File /data/common/miniforge3/envs/wp3/lib/python3.11/site-packages/earthkit/data/readers/grib/xarray.py:38, in IndexWrapperForCfGrib.__getstate__(self)
     37 def __getstate__(self):
---> 38     return dict(index=serialise_state(self.index), ignore_keys=self.ignore_keys)

File /data/common/miniforge3/envs/wp3/lib/python3.11/site-packages/earthkit/data/utils/serialise.py:26, in serialise_state(obj)
     25 LOG.info("serialise %s", fullname)
---> 26 return (fullname, SERIALISATION[fullname][0](obj))

KeyError: ('earthkit.data.readers.grib.reader', 'GRIBReader')

The above exception was the direct cause of the following exception:

TypeError                                 Traceback (most recent call last)
Cell In[1], line 15
     13 earthkit_ds = earthkit.data.from_source("cds", collection_id, **request)
     14 xr_ds = earthkit_ds.to_xarray(xarray_open_dataset_kwargs={"chunks": {}})
---> 15 xr_ds.to_netcdf("test.nc")

File /data/common/miniforge3/envs/wp3/lib/python3.11/site-packages/xarray/core/dataset.py:2298, in Dataset.to_netcdf(self, path, mode, format, group, engine, encoding, unlimited_dims, compute, invalid_netcdf)
   2295     encoding = {}
   2296 from xarray.backends.api import to_netcdf
-> 2298 return to_netcdf(  # type: ignore  # mypy cannot resolve the overloads:(
   2299     self,
   2300     path,
   2301     mode=mode,
   2302     format=format,
   2303     group=group,
   2304     engine=engine,
   2305     encoding=encoding,
   2306     unlimited_dims=unlimited_dims,
   2307     compute=compute,
   2308     multifile=False,
   2309     invalid_netcdf=invalid_netcdf,
   2310 )

File /data/common/miniforge3/envs/wp3/lib/python3.11/site-packages/xarray/backends/api.py:1348, in to_netcdf(dataset, path_or_file, mode, format, group, engine, encoding, unlimited_dims, compute, multifile, invalid_netcdf)
   1345 if multifile:
   1346     return writer, store
-> 1348 writes = writer.sync(compute=compute)
   1350 if isinstance(target, BytesIO):
   1351     store.sync()

File /data/common/miniforge3/envs/wp3/lib/python3.11/site-packages/xarray/backends/common.py:297, in ArrayWriter.sync(self, compute, chunkmanager_store_kwargs)
    294 if chunkmanager_store_kwargs is None:
    295     chunkmanager_store_kwargs = {}
--> 297 delayed_store = chunkmanager.store(
    298     self.sources,
    299     self.targets,
    300     lock=self.lock,
    301     compute=compute,
    302     flush=True,
    303     regions=self.regions,
    304     **chunkmanager_store_kwargs,
    305 )
    306 self.sources = []
    307 self.targets = []

File /data/common/miniforge3/envs/wp3/lib/python3.11/site-packages/xarray/namedarray/daskmanager.py:249, in DaskManager.store(self, sources, targets, **kwargs)
    241 def store(
    242     self,
    243     sources: Any | Sequence[Any],
    244     targets: Any,
    245     **kwargs: Any,
    246 ) -> Any:
    247     from dask.array import store
--> 249     return store(
    250         sources=sources,
    251         targets=targets,
    252         **kwargs,
    253     )

File /data/common/miniforge3/envs/wp3/lib/python3.11/site-packages/dask/array/core.py:1236, in store(***failed resolving arguments***)
   1234 elif compute:
   1235     store_dsk = HighLevelGraph(layers, dependencies)
-> 1236     compute_as_if_collection(Array, store_dsk, map_keys, **kwargs)
   1237     return None
   1239 else:

File /data/common/miniforge3/envs/wp3/lib/python3.11/site-packages/dask/base.py:402, in compute_as_if_collection(cls, dsk, keys, scheduler, get, **kwargs)
    400 schedule = get_scheduler(scheduler=scheduler, cls=cls, get=get)
    401 dsk2 = optimization_function(cls)(dsk, keys, **kwargs)
--> 402 return schedule(dsk2, keys, **kwargs)

File /data/common/miniforge3/envs/wp3/lib/python3.11/site-packages/distributed/client.py:3259, in Client.get(self, dsk, keys, workers, allow_other_workers, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
   3186 def get(
   3187     self,
   3188     dsk,
   (...)
   3200     **kwargs,
   3201 ):
   3202     """Compute dask graph
   3203
   3204     Parameters
   (...)
   3257     Client.compute : Compute asynchronous collections
   3258     """
-> 3259     futures = self._graph_to_futures(
   3260         dsk,
   3261         keys=set(flatten([keys])),
   3262         workers=workers,
   3263         allow_other_workers=allow_other_workers,
   3264         resources=resources,
   3265         fifo_timeout=fifo_timeout,
   3266         retries=retries,
   3267         user_priority=priority,
   3268         actors=actors,
   3269     )
   3270     packed = pack_data(keys, futures)
   3271     if sync:

File /data/common/miniforge3/envs/wp3/lib/python3.11/site-packages/distributed/client.py:3155, in Client._graph_to_futures(self, dsk, keys, workers, allow_other_workers, internal_priority, user_priority, resources, retries, fifo_timeout, actors)
   3152 from distributed.protocol import serialize
   3153 from distributed.protocol.serialize import ToPickle
-> 3155 header, frames = serialize(ToPickle(dsk), on_error="raise")
   3157 pickled_size = sum(map(nbytes, [header] + frames))
   3158 if pickled_size > parse_bytes(
   3159     dask.config.get("distributed.admin.large-graph-warning-threshold")
   3160 ):

File /data/common/miniforge3/envs/wp3/lib/python3.11/site-packages/distributed/protocol/serialize.py:389, in serialize(x, serializers, on_error, context, iterate_collection)
    387     except Exception:
    388         raise TypeError(msg) from exc
--> 389     raise TypeError(msg, str_x) from exc
    390 else:  # pragma: nocover
    391     raise ValueError(f"{on_error=}; expected 'message' or 'raise'")

TypeError: ('Could not serialize object of type HighLevelGraph', '<ToPickle: HighLevelGraph with 1 layers.\n<dask.highlevelgraph.HighLevelGraph object at 0x7feabb21bf90>\n 0. 140646138612736\n>')

Accompanying data

No response

Organisation

B-Open / CADS-EQC

@malmans2 malmans2 added the bug Something isn't working label May 10, 2024
@sandorkertesz sandorkertesz self-assigned this May 13, 2024
@sandorkertesz
Collaborator

Thank you for reporting this issue. I looked into it and it does not seem to be related to #375.

@sandorkertesz
Collaborator

This failure occurs because serialisation has not yet been implemented for this type of GRIB data.
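For context, the traceback shows the mechanism: `IndexWrapperForCfGrib.__getstate__` looks up the object's class in a serialisation registry, and `GRIBReader` has no entry, so pickling raises `KeyError`, which distributed surfaces as "Could not serialize object of type HighLevelGraph". A stand-alone sketch of this failure mode (all names here are illustrative, not earthkit's actual API):

```python
import pickle

# Hypothetical registry mirroring earthkit's SERIALISATION table:
# (module, class name) -> (serialise, deserialise) callables.
SERIALISATION = {}


class UnregisteredReader:
    """Stand-in for GRIBReader: no entry in the registry above."""

    def __getstate__(self):
        fullname = (type(self).__module__, type(self).__qualname__)
        # Raises KeyError when the class was never registered, exactly
        # as serialise_state() does for GRIBReader in the traceback.
        return dict(index=(fullname, SERIALISATION[fullname][0](self)))


try:
    pickle.dumps(UnregisteredReader())
    failed = False
except KeyError:
    failed = True

print(failed)  # True
```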
