From 7f6a8d467cecc47dc3bd7127e25d44c292fb698b Mon Sep 17 00:00:00 2001 From: Gerrit Holl Date: Fri, 14 Jun 2024 10:02:49 +0200 Subject: [PATCH 01/23] Add test to reproduce GH 2815 --- satpy/tests/reader_tests/test_netcdf_utils.py | 37 +++++++++++++++++++ 1 file changed, 37 insertions(+) diff --git a/satpy/tests/reader_tests/test_netcdf_utils.py b/satpy/tests/reader_tests/test_netcdf_utils.py index 5e0bcc44a1..f48141c635 100644 --- a/satpy/tests/reader_tests/test_netcdf_utils.py +++ b/satpy/tests/reader_tests/test_netcdf_utils.py @@ -392,3 +392,40 @@ def test_get_data_as_xarray_scalar_h5netcdf(tmp_path): res = get_data_as_xarray(fid["test_data"]) np.testing.assert_equal(res.data, np.array(data)) assert res.attrs == NC_ATTRS + + +@pytest.fixture() +def dummy_nc(tmp_path): + """Fixture to create a dummy NetCDF file and return its path.""" + import xarray as xr + + fn = tmp_path / "sjaunja.nc" + ds = xr.Dataset(data_vars={"kaitum": (["x"], np.arange(10))}) + ds.to_netcdf(fn) + return fn + + +def test_caching_distributed(dummy_nc): + """Test that the distributed scheduler works with file handle caching. + + This is a test for GitHub issue 2815. + """ + from dask.distributed import Client + + from satpy.readers.netcdf_utils import NetCDF4FileHandler + + fh = NetCDF4FileHandler(dummy_nc, {}, {}, cache_handle=True) + + Client() + + def doubler(x): + return x * 2 + + # As documented in GH issue 2815, using dask distributed with the file + # handle cacher might fail in non-trivial ways, such as giving incorrect + # results. Testing map_blocks is one way to reproduce the problem + # reliably, even though the problem also manifests itself (in different + # ways) without map_blocks. 
+ + dask_doubler = fh["kaitum"].map_blocks(doubler) + dask_doubler.compute() From 6d31c20ab3d914711eda2506083053b9fcf071ba Mon Sep 17 00:00:00 2001 From: Gerrit Holl Date: Fri, 14 Jun 2024 11:10:52 +0200 Subject: [PATCH 02/23] make sure distributed client is local --- satpy/tests/reader_tests/test_netcdf_utils.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/satpy/tests/reader_tests/test_netcdf_utils.py b/satpy/tests/reader_tests/test_netcdf_utils.py index f48141c635..c381d37d4a 100644 --- a/satpy/tests/reader_tests/test_netcdf_utils.py +++ b/satpy/tests/reader_tests/test_netcdf_utils.py @@ -416,8 +416,6 @@ def test_caching_distributed(dummy_nc): fh = NetCDF4FileHandler(dummy_nc, {}, {}, cache_handle=True) - Client() - def doubler(x): return x * 2 @@ -427,5 +425,7 @@ def doubler(x): # reliably, even though the problem also manifests itself (in different # ways) without map_blocks. - dask_doubler = fh["kaitum"].map_blocks(doubler) - dask_doubler.compute() + + with Client(): + dask_doubler = fh["kaitum"].map_blocks(doubler) + dask_doubler.compute() From 1e26d1a1c1846e0d2a03d41466bb59aae89fc974 Mon Sep 17 00:00:00 2001 From: Gerrit Holl Date: Fri, 14 Jun 2024 11:32:10 +0200 Subject: [PATCH 03/23] Start utility function for distributed friendly Start work on a utility function to get a dask array from a dataset variable in a way that is friendly to dask.distributed. 
--- satpy/readers/utils.py | 30 +++++++++++++++++++++++ satpy/tests/reader_tests/test_utils.py | 34 ++++++++++++++++++++++++++ 2 files changed, 64 insertions(+) diff --git a/satpy/readers/utils.py b/satpy/readers/utils.py index c1bf7c7497..72bc26bf30 100644 --- a/satpy/readers/utils.py +++ b/satpy/readers/utils.py @@ -29,6 +29,7 @@ from shutil import which from subprocess import PIPE, Popen # nosec +import dask.array as da import numpy as np import pyproj import xarray as xr @@ -474,3 +475,32 @@ def remove_earthsun_distance_correction(reflectance, utc_date=None): with xr.set_options(keep_attrs=True): reflectance = reflectance / reflectance.dtype.type(sun_earth_dist * sun_earth_dist) return reflectance + + +def get_distributed_friendly_dask_array(manager, varname): + """Construct a dask array from a variable for dask distributed. + + When we construct a dask array using da.array and use that to create an + xarray dataarray, the result is not serialisable and dask graphs using + this dataarray cannot be computed when the dask distributed scheduler + is in use. To circumvent this problem, xarray provides the + CachingFileManager. See GH#2815 for more information. + + Args: + manager (xarray.backends.CachingFileManager): + Instance of xarray.backends.CachingFileManager encapsulating the + dataset to be read. + varname (str): + Name of the variable. 
+ """ + def get_chunk(block_info): + with manager.acquire_context() as nc: + loc = block_info[None]["array-location"][0] + rv = nc[varname][loc[0]:loc[1]] + return rv + + return da.map_blocks( + get_chunk, + chunks=(10,), + meta=np.array([]), + dtype="i4") diff --git a/satpy/tests/reader_tests/test_utils.py b/satpy/tests/reader_tests/test_utils.py index ba43688b76..3a0d85baed 100644 --- a/satpy/tests/reader_tests/test_utils.py +++ b/satpy/tests/reader_tests/test_utils.py @@ -512,3 +512,37 @@ def test_generic_open_binary(tmp_path, data, filename, mode): read_binary_data = f.read() assert read_binary_data == dummy_data + + +@pytest.fixture() +def dummy_nc(tmp_path): + """Fixture to create a dummy NetCDF file and return its path.""" + import xarray as xr + + fn = tmp_path / "sjaunja.nc" + ds = xr.Dataset(data_vars={"kaitum": (["x"], np.arange(10, dtype="i4"))}) + ds.to_netcdf(fn) + return fn + + +def test_get_distributed_friendly_dask_array(dummy_nc): + """Test getting a dask distributed friendly dask array.""" + import netCDF4 + from dask.distributed import Client + from xarray.backends import CachingFileManager + + cfm = CachingFileManager(netCDF4.Dataset, dummy_nc, mode="r") + arr = hf.get_distributed_friendly_dask_array(cfm, "kaitum") + + # As documented in GH issue 2815, using dask distributed with the file + # handle cacher might fail in non-trivial ways, such as giving incorrect + # results. Testing map_blocks is one way to reproduce the problem + # reliably, even though the problem also manifests itself (in different + # ways) without map_blocks. + + def doubler(x): + return x * 2 + + with Client(): + dask_doubler = arr.map_blocks(doubler) + dask_doubler.compute() From be40c5ba5b1210317e95c3a6680ec7c61d81d057 Mon Sep 17 00:00:00 2001 From: Gerrit Holl Date: Fri, 14 Jun 2024 14:11:47 +0200 Subject: [PATCH 04/23] Parameterise test and simplify implementation For the distributed-friendly dask array helper, parameterise the test to cover more cases. 
Simplify the implementation. --- satpy/readers/utils.py | 24 ++++++++++++------- satpy/tests/reader_tests/test_utils.py | 33 +++++++++++++++----------- 2 files changed, 35 insertions(+), 22 deletions(-) diff --git a/satpy/readers/utils.py b/satpy/readers/utils.py index 72bc26bf30..e39b46c0e6 100644 --- a/satpy/readers/utils.py +++ b/satpy/readers/utils.py @@ -477,7 +477,7 @@ def remove_earthsun_distance_correction(reflectance, utc_date=None): return reflectance -def get_distributed_friendly_dask_array(manager, varname): +def get_distributed_friendly_dask_array(manager, varname, chunks=None): """Construct a dask array from a variable for dask distributed. When we construct a dask array using da.array and use that to create an @@ -486,21 +486,29 @@ def get_distributed_friendly_dask_array(manager, varname): is in use. To circumvent this problem, xarray provides the CachingFileManager. See GH#2815 for more information. + Should have at least one dimension. + + Example:: + + >>> import NetCDF4 + >>> from xarray.backends import CachingFileManager + >>> cfm = CachingFileManager(NetCDF4.Dataset, fn, mode="r") + >>> arr = get_distributed_friendly_dask_array(cfm, "my_var") + Args: manager (xarray.backends.CachingFileManager): Instance of xarray.backends.CachingFileManager encapsulating the dataset to be read. varname (str): Name of the variable. + chunks (tuple or None, optional): + Chunks to use when creating the dask array. 
""" - def get_chunk(block_info): + def get_chunk(): with manager.acquire_context() as nc: - loc = block_info[None]["array-location"][0] - rv = nc[varname][loc[0]:loc[1]] - return rv + return nc[varname][:] return da.map_blocks( get_chunk, - chunks=(10,), - meta=np.array([]), - dtype="i4") + chunks=chunks, + meta=np.array([])) diff --git a/satpy/tests/reader_tests/test_utils.py b/satpy/tests/reader_tests/test_utils.py index 3a0d85baed..1b8af70e10 100644 --- a/satpy/tests/reader_tests/test_utils.py +++ b/satpy/tests/reader_tests/test_utils.py @@ -514,24 +514,23 @@ def test_generic_open_binary(tmp_path, data, filename, mode): assert read_binary_data == dummy_data -@pytest.fixture() -def dummy_nc(tmp_path): - """Fixture to create a dummy NetCDF file and return its path.""" - import xarray as xr - - fn = tmp_path / "sjaunja.nc" - ds = xr.Dataset(data_vars={"kaitum": (["x"], np.arange(10, dtype="i4"))}) - ds.to_netcdf(fn) - return fn - - -def test_get_distributed_friendly_dask_array(dummy_nc): +@pytest.mark.parametrize("shape", [(2,), (2, 3), (2, 3, 4)]) +@pytest.mark.parametrize("dtype", ["i4", "f4", "f8"]) +def test_get_distributed_friendly_dask_array(tmp_path, shape, dtype): """Test getting a dask distributed friendly dask array.""" import netCDF4 from dask.distributed import Client from xarray.backends import CachingFileManager - cfm = CachingFileManager(netCDF4.Dataset, dummy_nc, mode="r") + fn = tmp_path / "sjaunja.nc" + ds = xr.Dataset( + data_vars={ + "kaitum": (["x", "y", "z"][:len(shape)], + np.arange(np.prod(shape), + dtype=dtype).reshape(shape))}) + ds.to_netcdf(fn) + + cfm = CachingFileManager(netCDF4.Dataset, fn, mode="r") arr = hf.get_distributed_friendly_dask_array(cfm, "kaitum") # As documented in GH issue 2815, using dask distributed with the file @@ -543,6 +542,12 @@ def test_get_distributed_friendly_dask_array(dummy_nc): def doubler(x): return x * 2 + # FIXME: setting up the client is slow, taking more than one second — + # consider putting it in a 
class-scoped fixture and putting this test + # in a class (so it is still shared between parameterised runs) with Client(): dask_doubler = arr.map_blocks(doubler) - dask_doubler.compute() + res = dask_doubler.compute() + assert res.shape == shape + assert res.dtype == dtype + np.testing.assert_array_equal(res, np.arange(np.prod(shape)).reshape(shape)*2) From cbd00f0439e09432bc4555458b7045693f827c71 Mon Sep 17 00:00:00 2001 From: Gerrit Holl Date: Fri, 14 Jun 2024 14:40:42 +0200 Subject: [PATCH 05/23] Force shape and dtype. First working prototype. We need to force the shape and the dtype when getting the dask-distributed-friendly xarray-dataarray. Seems to have a first working prototype now. --- satpy/readers/netcdf_utils.py | 38 ++++++++++++++++---------- satpy/readers/utils.py | 7 +++-- satpy/tests/reader_tests/test_utils.py | 9 ++++-- 3 files changed, 35 insertions(+), 19 deletions(-) diff --git a/satpy/readers/netcdf_utils.py b/satpy/readers/netcdf_utils.py index c8b8a3f85f..a6ca0bccef 100644 --- a/satpy/readers/netcdf_utils.py +++ b/satpy/readers/netcdf_utils.py @@ -19,14 +19,13 @@ import logging -import dask.array as da import netCDF4 import numpy as np import xarray as xr from satpy.readers import open_file_or_filename from satpy.readers.file_handlers import BaseFileHandler -from satpy.readers.utils import np2str +from satpy.readers.utils import get_distributed_friendly_dask_array, np2str from satpy.utils import get_legacy_chunk_size LOG = logging.getLogger(__name__) @@ -85,10 +84,12 @@ class NetCDF4FileHandler(BaseFileHandler): xarray_kwargs (dict): Addition arguments to `xarray.open_dataset` cache_var_size (int): Cache variables smaller than this size. cache_handle (bool): Keep files open for lifetime of filehandler. + Uses xarray.backends.CachingFileManager, which uses a least + recently used cache. 
""" - file_handle = None + manager = None def __init__(self, filename, filename_info, filetype_info, auto_maskandscale=False, xarray_kwargs=None, @@ -118,7 +119,8 @@ def __init__(self, filename, filename_info, filetype_info, self.collect_cache_vars(cache_var_size) if cache_handle: - self.file_handle = file_handle + self.manager = xr.backends.CachingFileManager( + netCDF4.Dataset, self.filename, mode="r") else: file_handle.close() @@ -196,9 +198,9 @@ def _get_required_variable_names(listed_variables, variable_name_replacements): def __del__(self): """Delete the file handler.""" - if self.file_handle is not None: + if self.manager is not None: try: - self.file_handle.close() + self.manager.close() except RuntimeError: # presumably closed already pass @@ -289,8 +291,8 @@ def _get_variable(self, key, val): group, key = parts else: group = None - if self.file_handle is not None: - val = self._get_var_from_filehandle(group, key) + if self.manager is not None: + val = self._get_var_from_manager(group, key) else: val = self._get_var_from_xr(group, key) return val @@ -319,18 +321,26 @@ def _get_var_from_xr(self, group, key): val.load() return val - def _get_var_from_filehandle(self, group, key): + def _get_var_from_manager(self, group, key): # Not getting coordinates as this is more work, therefore more # overhead, and those are not used downstream. 
+ with self.manager.acquire_context() as ds: + if group is not None: + v = ds[group][key] + else: + v = ds[key] if group is None: - g = self.file_handle + dv = get_distributed_friendly_dask_array( + self.manager, key, + chunks=v.shape, dtype=v.dtype) else: - g = self.file_handle[group] - v = g[key] + dv = get_distributed_friendly_dask_array( + self.manager, key, group=group, + chunks=v.shape, dtype=v.dtype) attrs = self._get_object_attrs(v) x = xr.DataArray( - da.from_array(v), dims=v.dimensions, attrs=attrs, - name=v.name) + dv, + dims=v.dimensions, attrs=attrs, name=v.name) return x def __contains__(self, item): diff --git a/satpy/readers/utils.py b/satpy/readers/utils.py index e39b46c0e6..97c8b2f4ca 100644 --- a/satpy/readers/utils.py +++ b/satpy/readers/utils.py @@ -477,7 +477,7 @@ def remove_earthsun_distance_correction(reflectance, utc_date=None): return reflectance -def get_distributed_friendly_dask_array(manager, varname, chunks=None): +def get_distributed_friendly_dask_array(manager, varname, chunks, dtype): """Construct a dask array from a variable for dask distributed. When we construct a dask array using da.array and use that to create an @@ -501,8 +501,10 @@ def get_distributed_friendly_dask_array(manager, varname, chunks=None): dataset to be read. varname (str): Name of the variable. - chunks (tuple or None, optional): + chunks (tuple): Chunks to use when creating the dask array. + dtype (dtype): + What dtype to use. 
""" def get_chunk(): with manager.acquire_context() as nc: @@ -511,4 +513,5 @@ def get_chunk(): return da.map_blocks( get_chunk, chunks=chunks, + dtype=dtype, meta=np.array([])) diff --git a/satpy/tests/reader_tests/test_utils.py b/satpy/tests/reader_tests/test_utils.py index 1b8af70e10..876922477a 100644 --- a/satpy/tests/reader_tests/test_utils.py +++ b/satpy/tests/reader_tests/test_utils.py @@ -531,7 +531,8 @@ def test_get_distributed_friendly_dask_array(tmp_path, shape, dtype): ds.to_netcdf(fn) cfm = CachingFileManager(netCDF4.Dataset, fn, mode="r") - arr = hf.get_distributed_friendly_dask_array(cfm, "kaitum") + arr = hf.get_distributed_friendly_dask_array(cfm, "kaitum", + chunks=shape, dtype=dtype) # As documented in GH issue 2815, using dask distributed with the file # handle cacher might fail in non-trivial ways, such as giving incorrect @@ -548,6 +549,8 @@ def doubler(x): with Client(): dask_doubler = arr.map_blocks(doubler) res = dask_doubler.compute() - assert res.shape == shape - assert res.dtype == dtype + assert shape == dask_doubler.shape # we will need dtype before compute + assert shape == res.shape + assert dtype == dask_doubler.dtype + assert dtype == res.dtype np.testing.assert_array_equal(res, np.arange(np.prod(shape)).reshape(shape)*2) From af4ee66a1c424f3d6747026dfab95bd98c74172e Mon Sep 17 00:00:00 2001 From: Gerrit Holl Date: Thu, 20 Jun 2024 09:24:16 +0200 Subject: [PATCH 06/23] Add group support and speed up tests Add group support for getting a dask distributed friendly dask array. Speed up the related tests by sharing the dask distributed client setup and breakdown. 
--- satpy/readers/utils.py | 7 +- satpy/tests/reader_tests/test_utils.py | 92 +++++++++++++++----------- 2 files changed, 59 insertions(+), 40 deletions(-) diff --git a/satpy/readers/utils.py b/satpy/readers/utils.py index 97c8b2f4ca..10de2b364c 100644 --- a/satpy/readers/utils.py +++ b/satpy/readers/utils.py @@ -477,7 +477,8 @@ def remove_earthsun_distance_correction(reflectance, utc_date=None): return reflectance -def get_distributed_friendly_dask_array(manager, varname, chunks, dtype): +def get_distributed_friendly_dask_array(manager, varname, chunks, dtype, + group="/"): """Construct a dask array from a variable for dask distributed. When we construct a dask array using da.array and use that to create an @@ -505,10 +506,12 @@ def get_distributed_friendly_dask_array(manager, varname, chunks, dtype): Chunks to use when creating the dask array. dtype (dtype): What dtype to use. + group (str): + What group to read the variable from. """ def get_chunk(): with manager.acquire_context() as nc: - return nc[varname][:] + return nc["/".join([group, varname])][:] return da.map_blocks( get_chunk, diff --git a/satpy/tests/reader_tests/test_utils.py b/satpy/tests/reader_tests/test_utils.py index 876922477a..2de91e6d4b 100644 --- a/satpy/tests/reader_tests/test_utils.py +++ b/satpy/tests/reader_tests/test_utils.py @@ -514,43 +514,59 @@ def test_generic_open_binary(tmp_path, data, filename, mode): assert read_binary_data == dummy_data -@pytest.mark.parametrize("shape", [(2,), (2, 3), (2, 3, 4)]) -@pytest.mark.parametrize("dtype", ["i4", "f4", "f8"]) -def test_get_distributed_friendly_dask_array(tmp_path, shape, dtype): - """Test getting a dask distributed friendly dask array.""" - import netCDF4 - from dask.distributed import Client - from xarray.backends import CachingFileManager - - fn = tmp_path / "sjaunja.nc" - ds = xr.Dataset( - data_vars={ - "kaitum": (["x", "y", "z"][:len(shape)], - np.arange(np.prod(shape), - dtype=dtype).reshape(shape))}) - ds.to_netcdf(fn) - - cfm = 
CachingFileManager(netCDF4.Dataset, fn, mode="r") - arr = hf.get_distributed_friendly_dask_array(cfm, "kaitum", - chunks=shape, dtype=dtype) - - # As documented in GH issue 2815, using dask distributed with the file - # handle cacher might fail in non-trivial ways, such as giving incorrect - # results. Testing map_blocks is one way to reproduce the problem - # reliably, even though the problem also manifests itself (in different - # ways) without map_blocks. - - def doubler(x): - return x * 2 - - # FIXME: setting up the client is slow, taking more than one second — - # consider putting it in a class-scoped fixture and putting this test - # in a class (so it is still shared between parameterised runs) - with Client(): +class TestDistributed: + """Distributed-related tests. + + Distributed-related tests are grouped so that they can share a class-scoped + fixture setting up the distributed client, as this setup is relatively + slow. + """ + + @pytest.fixture(scope="class") + def dask_dist_client(self): + """Set up and close a dask distributed client.""" + from dask.distributed import Client + cl = Client() + yield cl + cl.close() + + + @pytest.mark.parametrize("shape", [(2,), (2, 3), (2, 3, 4)]) + @pytest.mark.parametrize("dtype", ["i4", "f4", "f8"]) + @pytest.mark.parametrize("grp", ["/", "/in/a/group"]) + def test_get_distributed_friendly_dask_array(self, tmp_path, dask_dist_client, shape, dtype, grp): + """Test getting a dask distributed friendly dask array.""" + import netCDF4 + from xarray.backends import CachingFileManager + + fn = tmp_path / "sjaunja.nc" + ds = xr.Dataset( + data_vars={ + "kaitum": (["x", "y", "z"][:len(shape)], + np.arange(np.prod(shape), + dtype=dtype).reshape(shape))}) + ds.to_netcdf(fn, group=grp) + + cfm = CachingFileManager(netCDF4.Dataset, fn, mode="r") + arr = hf.get_distributed_friendly_dask_array(cfm, "kaitum", + chunks=shape, dtype=dtype, + group=grp) + + # As documented in GH issue 2815, using dask distributed with the file + # 
handle cacher might fail in non-trivial ways, such as giving incorrect
+        # results. Testing map_blocks is one way to reproduce the problem
+        # reliably, even though the problem also manifests itself (in different
+        # ways) without map_blocks.
+
+        def doubler(x):
+            return x * 2
+
         dask_doubler = arr.map_blocks(doubler)
         res = dask_doubler.compute()
+        # test before and after computation, as to confirm we have the correct
+        # shape and dtype and that computing doesn't change them
+        assert shape == dask_doubler.shape
+        assert shape == res.shape
+        assert dtype == dask_doubler.dtype
+        assert dtype == res.dtype
+        np.testing.assert_array_equal(res, np.arange(np.prod(shape)).reshape(shape)*2)

From dad3b1418dc971962596fa2ce6ff98138ccfc87a Mon Sep 17 00:00:00 2001
From: Gerrit Holl <gerrit.holl@dwd.de>
Date: Thu, 20 Jun 2024 09:50:57 +0200
Subject: [PATCH 07/23] Add partial backward-compatibility for file handle

Add partial backward compatibility for accessing the file handle
attribute when using caching with a NetCDF4FileHandler base class.

Backward compatibility is not 100%.  Deleting the FileHandler closes the
manager and therefore the ``file_handle`` property, however, when
accessing the ``file_handle`` property after deleting the
``FileHandler``, it is reopened.  Therefore, calling ``__del__()``
manually and then accessing ``fh.file_handle`` will now return an open
file (was a closed file).  This should not happen in any sane use
scenario.
--- satpy/readers/netcdf_utils.py | 11 +++++++++++ satpy/tests/reader_tests/test_netcdf_utils.py | 2 -- 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/satpy/readers/netcdf_utils.py b/satpy/readers/netcdf_utils.py index a6ca0bccef..e4ddf5ccf7 100644 --- a/satpy/readers/netcdf_utils.py +++ b/satpy/readers/netcdf_utils.py @@ -18,6 +18,7 @@ """Helpers for reading netcdf-based files.""" import logging +import warnings import netCDF4 import numpy as np @@ -127,6 +128,16 @@ def __init__(self, filename, filename_info, filetype_info, def _get_file_handle(self): return netCDF4.Dataset(self.filename, "r") + @property + def file_handle(self): + """Backward-compatible way for file handle caching.""" + warnings.warn( + "attribute .file_handle is deprecated, use .manager instead", + DeprecationWarning) + if self.manager is None: + return None + return self.manager.acquire() + @staticmethod def _set_file_handle_auto_maskandscale(file_handle, auto_maskandscale): if hasattr(file_handle, "set_auto_maskandscale"): diff --git a/satpy/tests/reader_tests/test_netcdf_utils.py b/satpy/tests/reader_tests/test_netcdf_utils.py index c381d37d4a..6ff2b81b7f 100644 --- a/satpy/tests/reader_tests/test_netcdf_utils.py +++ b/satpy/tests/reader_tests/test_netcdf_utils.py @@ -226,8 +226,6 @@ def test_caching(self): np.testing.assert_array_equal( h["ds2_f"], np.arange(10. * 100).reshape((10, 100))) - h.__del__() - assert not h.file_handle.isopen() def test_filenotfound(self): """Test that error is raised when file not found.""" From fc58ca41c2b8a991151ec94f7fa87aebe7789b0b Mon Sep 17 00:00:00 2001 From: Gerrit Holl Date: Thu, 20 Jun 2024 11:17:11 +0200 Subject: [PATCH 08/23] Respect auto_maskandscale with new caching With the new dask-distributed-friendly caching, make sure we are respecting auto_maskandscale and are not applying scale factors twice. 
--- satpy/readers/netcdf_utils.py | 8 ++++++-- satpy/readers/utils.py | 9 ++++++++- 2 files changed, 14 insertions(+), 3 deletions(-) diff --git a/satpy/readers/netcdf_utils.py b/satpy/readers/netcdf_utils.py index e4ddf5ccf7..d66d89b6a5 100644 --- a/satpy/readers/netcdf_utils.py +++ b/satpy/readers/netcdf_utils.py @@ -101,6 +101,7 @@ def __init__(self, filename, filename_info, filetype_info, self.file_content = {} self.cached_file_content = {} self._use_h5netcdf = False + self._auto_maskandscale = auto_maskandscale try: file_handle = self._get_file_handle() except IOError: @@ -335,6 +336,7 @@ def _get_var_from_xr(self, group, key): def _get_var_from_manager(self, group, key): # Not getting coordinates as this is more work, therefore more # overhead, and those are not used downstream. + with self.manager.acquire_context() as ds: if group is not None: v = ds[group][key] @@ -343,11 +345,13 @@ def _get_var_from_manager(self, group, key): if group is None: dv = get_distributed_friendly_dask_array( self.manager, key, - chunks=v.shape, dtype=v.dtype) + chunks=v.shape, dtype=v.dtype, + auto_maskandscale=self._auto_maskandscale) else: dv = get_distributed_friendly_dask_array( self.manager, key, group=group, - chunks=v.shape, dtype=v.dtype) + chunks=v.shape, dtype=v.dtype, + auto_maskandscale=self._auto_maskandscale) attrs = self._get_object_attrs(v) x = xr.DataArray( dv, diff --git a/satpy/readers/utils.py b/satpy/readers/utils.py index 10de2b364c..e59bf8d7ac 100644 --- a/satpy/readers/utils.py +++ b/satpy/readers/utils.py @@ -478,7 +478,7 @@ def remove_earthsun_distance_correction(reflectance, utc_date=None): def get_distributed_friendly_dask_array(manager, varname, chunks, dtype, - group="/"): + group="/", auto_maskandscale=None): """Construct a dask array from a variable for dask distributed. 
When we construct a dask array using da.array and use that to create an @@ -508,9 +508,16 @@ def get_distributed_friendly_dask_array(manager, varname, chunks, dtype, What dtype to use. group (str): What group to read the variable from. + auto_maskandscale (bool, optional): + Apply automatic masking and scaling. This will only + work if CachingFileManager.acquire returns a handler with a + method set_auto_maskandscale, such as is the case for + NetCDF4.Dataset. """ def get_chunk(): with manager.acquire_context() as nc: + if auto_maskandscale is not None: + nc.set_auto_maskandscale(auto_maskandscale) return nc["/".join([group, varname])][:] return da.map_blocks( From 09c821ad19fff8adc72a1b2f34aa113737d9472f Mon Sep 17 00:00:00 2001 From: Gerrit Holl Date: Thu, 20 Jun 2024 12:23:13 +0200 Subject: [PATCH 09/23] Remove needless except block Remove a dead code except block that should never be reached. --- satpy/readers/netcdf_utils.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/satpy/readers/netcdf_utils.py b/satpy/readers/netcdf_utils.py index d66d89b6a5..4645d9b11f 100644 --- a/satpy/readers/netcdf_utils.py +++ b/satpy/readers/netcdf_utils.py @@ -211,10 +211,7 @@ def _get_required_variable_names(listed_variables, variable_name_replacements): def __del__(self): """Delete the file handler.""" if self.manager is not None: - try: - self.manager.close() - except RuntimeError: # presumably closed already - pass + self.manager.close() def _collect_global_attrs(self, obj): """Collect all the global attributes for the provided file object.""" From 4f9c5edfdd568597d06fd628ba3dc56b90f2c4e8 Mon Sep 17 00:00:00 2001 From: Gerrit Holl Date: Thu, 20 Jun 2024 14:19:34 +0200 Subject: [PATCH 10/23] Test refactoring Migrate TestNetCDF4FileHandler from unittest.TestCase to a regular class. Use a pytest fixture for the temporary NetCDF file. 
--- satpy/tests/reader_tests/test_netcdf_utils.py | 39 +++++++++---------- 1 file changed, 18 insertions(+), 21 deletions(-) diff --git a/satpy/tests/reader_tests/test_netcdf_utils.py b/satpy/tests/reader_tests/test_netcdf_utils.py index 6ff2b81b7f..99a5e7cdb5 100644 --- a/satpy/tests/reader_tests/test_netcdf_utils.py +++ b/satpy/tests/reader_tests/test_netcdf_utils.py @@ -18,7 +18,6 @@ """Module for testing the satpy.readers.netcdf_utils module.""" import os -import unittest import numpy as np import pytest @@ -71,13 +70,15 @@ def get_test_content(self, filename, filename_info, filetype_info): raise NotImplementedError("Fake File Handler subclass must implement 'get_test_content'") -class TestNetCDF4FileHandler(unittest.TestCase): +class TestNetCDF4FileHandler: """Test NetCDF4 File Handler Utility class.""" - def setUp(self): + @pytest.fixture() + def dummy_nc_file(self, tmp_path): """Create a test NetCDF4 file.""" from netCDF4 import Dataset - with Dataset("test.nc", "w") as nc: + fn = tmp_path / "test.nc" + with Dataset(fn, "w") as nc: # Create dimensions nc.createDimension("rows", 10) nc.createDimension("cols", 100) @@ -116,17 +117,14 @@ def setUp(self): d.test_attr_str = "test_string" d.test_attr_int = 0 d.test_attr_float = 1.2 + return fn - def tearDown(self): - """Remove the previously created test file.""" - os.remove("test.nc") - - def test_all_basic(self): + def test_all_basic(self, dummy_nc_file): """Test everything about the NetCDF4 class.""" import xarray as xr from satpy.readers.netcdf_utils import NetCDF4FileHandler - file_handler = NetCDF4FileHandler("test.nc", {}, {}) + file_handler = NetCDF4FileHandler(dummy_nc_file, {}, {}) assert file_handler["/dimension/rows"] == 10 assert file_handler["/dimension/cols"] == 100 @@ -165,7 +163,7 @@ def test_all_basic(self): assert file_handler.file_handle is None assert file_handler["ds2_sc"] == 42 - def test_listed_variables(self): + def test_listed_variables(self, dummy_nc_file): """Test that only listed 
variables/attributes area collected.""" from satpy.readers.netcdf_utils import NetCDF4FileHandler @@ -175,12 +173,12 @@ def test_listed_variables(self): "attr/test_attr_str", ] } - file_handler = NetCDF4FileHandler("test.nc", {}, filetype_info) + file_handler = NetCDF4FileHandler(dummy_nc_file, {}, filetype_info) assert len(file_handler.file_content) == 2 assert "test_group/attr/test_attr_str" in file_handler.file_content assert "attr/test_attr_str" in file_handler.file_content - def test_listed_variables_with_composing(self): + def test_listed_variables_with_composing(self, dummy_nc_file): """Test that composing for listed variables is performed.""" from satpy.readers.netcdf_utils import NetCDF4FileHandler @@ -199,7 +197,7 @@ def test_listed_variables_with_composing(self): ], } } - file_handler = NetCDF4FileHandler("test.nc", {}, filetype_info) + file_handler = NetCDF4FileHandler(dummy_nc_file, {}, filetype_info) assert len(file_handler.file_content) == 3 assert "test_group/ds1_f/attr/test_attr_str" in file_handler.file_content assert "test_group/ds1_i/attr/test_attr_str" in file_handler.file_content @@ -208,10 +206,10 @@ def test_listed_variables_with_composing(self): assert not any("another_parameter" in var for var in file_handler.file_content) assert "test_group/attr/test_attr_str" in file_handler.file_content - def test_caching(self): + def test_caching(self, dummy_nc_file): """Test that caching works as intended.""" from satpy.readers.netcdf_utils import NetCDF4FileHandler - h = NetCDF4FileHandler("test.nc", {}, {}, cache_var_size=1000, + h = NetCDF4FileHandler(dummy_nc_file, {}, {}, cache_var_size=1000, cache_handle=True) assert h.file_handle is not None assert h.file_handle.isopen() @@ -234,21 +232,21 @@ def test_filenotfound(self): with pytest.raises(IOError, match=".*No such file or directory.*"): NetCDF4FileHandler("/thisfiledoesnotexist.nc", {}, {}) - def test_get_and_cache_npxr_is_xr(self): + def test_get_and_cache_npxr_is_xr(self, dummy_nc_file): 
"""Test that get_and_cache_npxr() returns xr.DataArray.""" import xarray as xr from satpy.readers.netcdf_utils import NetCDF4FileHandler - file_handler = NetCDF4FileHandler("test.nc", {}, {}, cache_handle=True) + file_handler = NetCDF4FileHandler(dummy_nc_file, {}, {}, cache_handle=True) data = file_handler.get_and_cache_npxr("test_group/ds1_f") assert isinstance(data, xr.DataArray) - def test_get_and_cache_npxr_data_is_cached(self): + def test_get_and_cache_npxr_data_is_cached(self, dummy_nc_file): """Test that the data are cached when get_and_cache_npxr() is called.""" from satpy.readers.netcdf_utils import NetCDF4FileHandler - file_handler = NetCDF4FileHandler("test.nc", {}, {}, cache_handle=True) + file_handler = NetCDF4FileHandler(dummy_nc_file, {}, {}, cache_handle=True) data = file_handler.get_and_cache_npxr("test_group/ds1_f") # Delete the dataset from the file content dict, it should be available from the cache @@ -262,7 +260,6 @@ class TestNetCDF4FsspecFileHandler: def test_default_to_netcdf4_lib(self): """Test that the NetCDF4 backend is used by default.""" - import os import tempfile import h5py From ec76fa6ed5c654b470e5207c98e69141cf7d2660 Mon Sep 17 00:00:00 2001 From: Gerrit Holl Date: Thu, 20 Jun 2024 14:50:56 +0200 Subject: [PATCH 11/23] Broaden test match string for test_filenotfound Broaden the string that is matched against in TestNetCDF4FileHandler.test_filenotfound. On Linux and MacOS the expected failure gives "No such file or directory". On Windows it gives "Invalid file format". 
--- satpy/tests/reader_tests/test_netcdf_utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/satpy/tests/reader_tests/test_netcdf_utils.py b/satpy/tests/reader_tests/test_netcdf_utils.py index 99a5e7cdb5..3c2bff4229 100644 --- a/satpy/tests/reader_tests/test_netcdf_utils.py +++ b/satpy/tests/reader_tests/test_netcdf_utils.py @@ -229,7 +229,7 @@ def test_filenotfound(self): """Test that error is raised when file not found.""" from satpy.readers.netcdf_utils import NetCDF4FileHandler - with pytest.raises(IOError, match=".*No such file or directory.*"): + with pytest.raises(IOError, match=".* file .*"): NetCDF4FileHandler("/thisfiledoesnotexist.nc", {}, {}) def test_get_and_cache_npxr_is_xr(self, dummy_nc_file): From 06d8811eaa82c9286909cb55d1420d081f21b797 Mon Sep 17 00:00:00 2001 From: Gerrit Holl Date: Wed, 24 Jul 2024 11:37:25 +0200 Subject: [PATCH 12/23] fix docstring example spelling Fix the spelling in the docstring example using netCDF4. Co-authored-by: David Hoese --- satpy/readers/utils.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/satpy/readers/utils.py b/satpy/readers/utils.py index e59bf8d7ac..487e10fa3b 100644 --- a/satpy/readers/utils.py +++ b/satpy/readers/utils.py @@ -491,9 +491,9 @@ def get_distributed_friendly_dask_array(manager, varname, chunks, dtype, Example:: - >>> import NetCDF4 + >>> import netCDF4 >>> from xarray.backends import CachingFileManager - >>> cfm = CachingFileManager(NetCDF4.Dataset, fn, mode="r") + >>> cfm = CachingFileManager(netCDF4.Dataset, fn, mode="r") >>> arr = get_distributed_friendly_dask_array(cfm, "my_var") Args: From aaf91b9ac53abb6a5c4c3f132b63ad57b6583bdb Mon Sep 17 00:00:00 2001 From: Gerrit Holl Date: Wed, 24 Jul 2024 16:23:53 +0200 Subject: [PATCH 13/23] Prevent unexpected type promotion in unit test Add a workaround to prevent an unexpected type promotion in the unit test for dask distributed friendly dask arrays. 
--- satpy/tests/reader_tests/test_utils.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/satpy/tests/reader_tests/test_utils.py b/satpy/tests/reader_tests/test_utils.py index 2de91e6d4b..08501f9f14 100644 --- a/satpy/tests/reader_tests/test_utils.py +++ b/satpy/tests/reader_tests/test_utils.py @@ -559,9 +559,10 @@ def test_get_distributed_friendly_dask_array(self, tmp_path, dask_dist_client, s # ways) without map_blocks. def doubler(x): - return x * 2 + # with a workaround for https://github.com/numpy/numpy/issues/27029 + return x * x.dtype.type(2) - dask_doubler = arr.map_blocks(doubler) + dask_doubler = arr.map_blocks(doubler, dtype=arr.dtype) res = dask_doubler.compute() # test before and after computation, as to confirm we have the correct # shape and dtype and that computing doesn't change them From a2ad42f972c42c1a3cde795d4c142ad2629ee1d4 Mon Sep 17 00:00:00 2001 From: Gerrit Holl Date: Wed, 24 Jul 2024 16:58:11 +0200 Subject: [PATCH 14/23] Use block info getting a dd-friendly da When getting a dask-distributed friendly dask array from a NetCDF file using the CachingFileManager, use the information provided in block_info on the array location in case we are not reading the entire variable. --- satpy/readers/utils.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/satpy/readers/utils.py b/satpy/readers/utils.py index 487e10fa3b..04ced56ee9 100644 --- a/satpy/readers/utils.py +++ b/satpy/readers/utils.py @@ -514,11 +514,13 @@ def get_distributed_friendly_dask_array(manager, varname, chunks, dtype, method set_auto_maskandscale, such as is the case for NetCDF4.Dataset.
""" - def get_chunk(): + def get_chunk(block_info=None): + arrloc = block_info[None]["array-location"] with manager.acquire_context() as nc: if auto_maskandscale is not None: nc.set_auto_maskandscale(auto_maskandscale) - return nc["/".join([group, varname])][:] + var = nc["/".join([group, varname])] + return var[tuple(slice(*x) for x in arrloc)] return da.map_blocks( get_chunk, From 9126bbe69e1d996f59e11a27e271b9e1c17e7310 Mon Sep 17 00:00:00 2001 From: Gerrit Holl Date: Thu, 25 Jul 2024 09:59:57 +0200 Subject: [PATCH 15/23] Rename to serialisable and remove group argument Rename get_distributed_friendly_dask_array to get_serialisable_dask_array and remove the group argument, moving the responsibility for handlings groups to the caller. --- satpy/readers/netcdf_utils.py | 8 ++++---- satpy/readers/utils.py | 12 +++++------- satpy/tests/reader_tests/test_utils.py | 7 +++---- 3 files changed, 12 insertions(+), 15 deletions(-) diff --git a/satpy/readers/netcdf_utils.py b/satpy/readers/netcdf_utils.py index 4645d9b11f..7dbf6f45d6 100644 --- a/satpy/readers/netcdf_utils.py +++ b/satpy/readers/netcdf_utils.py @@ -26,7 +26,7 @@ from satpy.readers import open_file_or_filename from satpy.readers.file_handlers import BaseFileHandler -from satpy.readers.utils import get_distributed_friendly_dask_array, np2str +from satpy.readers.utils import get_serialisable_dask_array, np2str from satpy.utils import get_legacy_chunk_size LOG = logging.getLogger(__name__) @@ -340,13 +340,13 @@ def _get_var_from_manager(self, group, key): else: v = ds[key] if group is None: - dv = get_distributed_friendly_dask_array( + dv = get_serialisable_dask_array( self.manager, key, chunks=v.shape, dtype=v.dtype, auto_maskandscale=self._auto_maskandscale) else: - dv = get_distributed_friendly_dask_array( - self.manager, key, group=group, + dv = get_serialisable_dask_array( + self.manager, "/".join([group, key]), chunks=v.shape, dtype=v.dtype, auto_maskandscale=self._auto_maskandscale) attrs = 
self._get_object_attrs(v) diff --git a/satpy/readers/utils.py b/satpy/readers/utils.py index 04ced56ee9..4ce1ac795c 100644 --- a/satpy/readers/utils.py +++ b/satpy/readers/utils.py @@ -477,9 +477,9 @@ def remove_earthsun_distance_correction(reflectance, utc_date=None): return reflectance -def get_distributed_friendly_dask_array(manager, varname, chunks, dtype, - group="/", auto_maskandscale=None): - """Construct a dask array from a variable for dask distributed. +def get_serialisable_dask_array(manager, varname, chunks, dtype, + auto_maskandscale=None): + """Construct a serialisable dask array from a variable. When we construct a dask array using da.array and use that to create an xarray dataarray, the result is not serialisable and dask graphs using @@ -501,13 +501,11 @@ def get_distributed_friendly_dask_array(manager, varname, chunks, dtype, Instance of xarray.backends.CachingFileManager encapsulating the dataset to be read. varname (str): - Name of the variable. + Name of the variable (possibly including a group path). chunks (tuple): Chunks to use when creating the dask array. dtype (dtype): What dtype to use. - group (str): - What group to read the variable from. auto_maskandscale (bool, optional): Apply automatic masking and scaling. 
This will only work if CachingFileManager.acquire returns a handler with a @@ -519,7 +517,7 @@ def get_chunk(block_info=None): with manager.acquire_context() as nc: if auto_maskandscale is not None: nc.set_auto_maskandscale(auto_maskandscale) - var = nc["/".join([group, varname])] + var = nc[varname] #"/".join([group, varname])] return var[tuple(slice(*x) for x in arrloc)] return da.map_blocks( diff --git a/satpy/tests/reader_tests/test_utils.py b/satpy/tests/reader_tests/test_utils.py index 08501f9f14..13663ddb3a 100644 --- a/satpy/tests/reader_tests/test_utils.py +++ b/satpy/tests/reader_tests/test_utils.py @@ -534,7 +534,7 @@ def dask_dist_client(self): @pytest.mark.parametrize("shape", [(2,), (2, 3), (2, 3, 4)]) @pytest.mark.parametrize("dtype", ["i4", "f4", "f8"]) @pytest.mark.parametrize("grp", ["/", "/in/a/group"]) - def test_get_distributed_friendly_dask_array(self, tmp_path, dask_dist_client, shape, dtype, grp): + def test_get_serialisable_dask_array(self, tmp_path, dask_dist_client, shape, dtype, grp): """Test getting a dask distributed friendly dask array.""" import netCDF4 from xarray.backends import CachingFileManager @@ -548,9 +548,8 @@ def test_get_distributed_friendly_dask_array(self, tmp_path, dask_dist_client, s ds.to_netcdf(fn, group=grp) cfm = CachingFileManager(netCDF4.Dataset, fn, mode="r") - arr = hf.get_distributed_friendly_dask_array(cfm, "kaitum", - chunks=shape, dtype=dtype, - group=grp) + arr = hf.get_serialisable_dask_array(cfm, "/".join([grp, "kaitum"]), + chunks=shape, dtype=dtype) # As documented in GH issue 2815, using dask distributed with the file # handle cacher might fail in non-trivial ways, such as giving incorrect From 5e576f91853ec24702f0d8872cbc7608d1262906 Mon Sep 17 00:00:00 2001 From: Gerrit Holl Date: Thu, 25 Jul 2024 11:00:14 +0200 Subject: [PATCH 16/23] Use wrapper class for auto_maskandscale --- satpy/readers/netcdf_utils.py | 26 +++++++++++++++++++++++++- satpy/readers/utils.py | 14 +++----------- 
satpy/tests/reader_tests/test_utils.py | 2 +- 3 files changed, 29 insertions(+), 13 deletions(-) diff --git a/satpy/readers/netcdf_utils.py b/satpy/readers/netcdf_utils.py index 7dbf6f45d6..722f69d137 100644 --- a/satpy/readers/netcdf_utils.py +++ b/satpy/readers/netcdf_utils.py @@ -17,6 +17,7 @@ # satpy. If not, see . """Helpers for reading netcdf-based files.""" +import functools import logging import warnings @@ -122,7 +123,9 @@ def __init__(self, filename, filename_info, filetype_info, if cache_handle: self.manager = xr.backends.CachingFileManager( - netCDF4.Dataset, self.filename, mode="r") + functools.partial(_NCDatasetWrapper, + auto_maskandscale=auto_maskandscale), + self.filename, mode="r") else: file_handle.close() @@ -465,3 +468,24 @@ def _get_attr(self, obj, key): if self._use_h5netcdf: return obj.attrs[key] return super()._get_attr(obj, key) + +class _NCDatasetWrapper(netCDF4.Dataset): + """Wrap netcdf4.Dataset setting auto_maskandscale globally. + + Helper class that wraps netcdf4.Dataset while setting extra parameters. + By encapsulating this in a helper class, we can + pass it to CachingFileManager directly. Currently sets + auto_maskandscale globally (for all variables). + """ + + def __init__(self, *args, auto_maskandscale=False, **kwargs): + """Initialise object.""" + super().__init__(*args, **kwargs) + self._set_extra_settings(auto_maskandscale=auto_maskandscale) + + def _set_extra_settings(self, auto_maskandscale): + """Set our own custom settings. + + Currently only applies set_auto_maskandscale. 
+ """ + self.set_auto_maskandscale(auto_maskandscale) diff --git a/satpy/readers/utils.py b/satpy/readers/utils.py index 4ce1ac795c..b7f630b117 100644 --- a/satpy/readers/utils.py +++ b/satpy/readers/utils.py @@ -477,8 +477,7 @@ def remove_earthsun_distance_correction(reflectance, utc_date=None): return reflectance -def get_serialisable_dask_array(manager, varname, chunks, dtype, - auto_maskandscale=None): +def get_serialisable_dask_array(manager, varname, chunks, dtype): """Construct a serialisable dask array from a variable. When we construct a dask array using da.array and use that to create an @@ -494,7 +493,7 @@ def get_serialisable_dask_array(manager, varname, chunks, dtype, >>> import netCDF4 >>> from xarray.backends import CachingFileManager >>> cfm = CachingFileManager(netCDF4.Dataset, fn, mode="r") - >>> arr = get_distributed_friendly_dask_array(cfm, "my_var") + >>> arr = get_serialisable_dask_array(cfm, "my_var") Args: manager (xarray.backends.CachingFileManager): @@ -506,18 +505,11 @@ def get_serialisable_dask_array(manager, varname, chunks, dtype, Chunks to use when creating the dask array. dtype (dtype): What dtype to use. - auto_maskandscale (bool, optional): - Apply automatic masking and scaling. This will only - work if CachingFileManager.acquire returns a handler with a - method set_auto_maskandscale, such as is the case for - NetCDF4.Dataset. 
""" def get_chunk(block_info=None): arrloc = block_info[None]["array-location"] with manager.acquire_context() as nc: - if auto_maskandscale is not None: - nc.set_auto_maskandscale(auto_maskandscale) - var = nc[varname] #"/".join([group, varname])] + var = nc[varname] return var[tuple(slice(*x) for x in arrloc)] return da.map_blocks( diff --git a/satpy/tests/reader_tests/test_utils.py b/satpy/tests/reader_tests/test_utils.py index 13663ddb3a..2445e51f75 100644 --- a/satpy/tests/reader_tests/test_utils.py +++ b/satpy/tests/reader_tests/test_utils.py @@ -535,7 +535,7 @@ def dask_dist_client(self): @pytest.mark.parametrize("dtype", ["i4", "f4", "f8"]) @pytest.mark.parametrize("grp", ["/", "/in/a/group"]) def test_get_serialisable_dask_array(self, tmp_path, dask_dist_client, shape, dtype, grp): - """Test getting a dask distributed friendly dask array.""" + """Test getting a dask distributed friendly serialisable dask array.""" import netCDF4 from xarray.backends import CachingFileManager From 63e75073c98a751e7a935a9faa80f1ed40fc4605 Mon Sep 17 00:00:00 2001 From: Gerrit Holl Date: Thu, 25 Jul 2024 11:05:31 +0200 Subject: [PATCH 17/23] GB -> US spelling Pytroll uses US spelling. Rename serializable to serialisable. Remove removed keyword argument from call. 
--- satpy/readers/netcdf_utils.py | 12 +++++------- satpy/readers/utils.py | 8 ++++---- 2 files changed, 9 insertions(+), 11 deletions(-) diff --git a/satpy/readers/netcdf_utils.py b/satpy/readers/netcdf_utils.py index 722f69d137..e9ae2d24db 100644 --- a/satpy/readers/netcdf_utils.py +++ b/satpy/readers/netcdf_utils.py @@ -27,7 +27,7 @@ from satpy.readers import open_file_or_filename from satpy.readers.file_handlers import BaseFileHandler -from satpy.readers.utils import get_serialisable_dask_array, np2str +from satpy.readers.utils import get_serializable_dask_array, np2str from satpy.utils import get_legacy_chunk_size LOG = logging.getLogger(__name__) @@ -343,15 +343,13 @@ def _get_var_from_manager(self, group, key): else: v = ds[key] if group is None: - dv = get_serialisable_dask_array( + dv = get_serializable_dask_array( self.manager, key, - chunks=v.shape, dtype=v.dtype, - auto_maskandscale=self._auto_maskandscale) + chunks=v.shape, dtype=v.dtype) else: - dv = get_serialisable_dask_array( + dv = get_serializable_dask_array( self.manager, "/".join([group, key]), - chunks=v.shape, dtype=v.dtype, - auto_maskandscale=self._auto_maskandscale) + chunks=v.shape, dtype=v.dtype) attrs = self._get_object_attrs(v) x = xr.DataArray( dv, diff --git a/satpy/readers/utils.py b/satpy/readers/utils.py index b7f630b117..6e1b3dc0ae 100644 --- a/satpy/readers/utils.py +++ b/satpy/readers/utils.py @@ -477,11 +477,11 @@ def remove_earthsun_distance_correction(reflectance, utc_date=None): return reflectance -def get_serialisable_dask_array(manager, varname, chunks, dtype): - """Construct a serialisable dask array from a variable. +def get_serializable_dask_array(manager, varname, chunks, dtype): + """Construct a serializable dask array from a variable. 
When we construct a dask array using da.array and use that to create an - xarray dataarray, the result is not serialisable and dask graphs using + xarray dataarray, the result is not serializable and dask graphs using this dataarray cannot be computed when the dask distributed scheduler is in use. To circumvent this problem, xarray provides the CachingFileManager. See GH#2815 for more information. @@ -493,7 +493,7 @@ def get_serialisable_dask_array(manager, varname, chunks, dtype): >>> import netCDF4 >>> from xarray.backends import CachingFileManager >>> cfm = CachingFileManager(netCDF4.Dataset, fn, mode="r") - >>> arr = get_serialisable_dask_array(cfm, "my_var") + >>> arr = get_serializable_dask_array(cfm, "my_var") Args: manager (xarray.backends.CachingFileManager): From ea0459593c4e1f5b1c8d08bd52bc215207959bbe Mon Sep 17 00:00:00 2001 From: Gerrit Holl Date: Thu, 25 Jul 2024 11:11:53 +0200 Subject: [PATCH 18/23] Ensure meta dtype Ensure that the meta we pass to map_blocks also has the right dtype. Not sure if this is necessary when map_blocks already has the right dtype, but it can't hurt. --- satpy/readers/utils.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/satpy/readers/utils.py b/satpy/readers/utils.py index 6e1b3dc0ae..0c02db8ae3 100644 --- a/satpy/readers/utils.py +++ b/satpy/readers/utils.py @@ -497,7 +497,7 @@ def get_serializable_dask_array(manager, varname, chunks, dtype): Args: manager (xarray.backends.CachingFileManager): - Instance of xarray.backends.CachingFileManager encapsulating the + Instance of :class:`~xarray.backends.CachingFileManager` encapsulating the dataset to be read. varname (str): Name of the variable (possibly including a group path). 
@@ -516,4 +516,4 @@ def get_chunk(block_info=None): get_chunk, chunks=chunks, dtype=dtype, - meta=np.array([])) + meta=np.array([], dtype=dtype)) From fde3896fc50f0a7420bc7bd28f3e038025e9a953 Mon Sep 17 00:00:00 2001 From: Gerrit Holl Date: Thu, 25 Jul 2024 13:15:01 +0200 Subject: [PATCH 19/23] Fix spelling in test --- satpy/tests/reader_tests/test_utils.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/satpy/tests/reader_tests/test_utils.py b/satpy/tests/reader_tests/test_utils.py index 2485598826..40a872db29 100644 --- a/satpy/tests/reader_tests/test_utils.py +++ b/satpy/tests/reader_tests/test_utils.py @@ -534,7 +534,7 @@ def dask_dist_client(self): @pytest.mark.parametrize("shape", [(2,), (2, 3), (2, 3, 4)]) @pytest.mark.parametrize("dtype", ["i4", "f4", "f8"]) @pytest.mark.parametrize("grp", ["/", "/in/a/group"]) - def test_get_serialisable_dask_array(self, tmp_path, dask_dist_client, shape, dtype, grp): + def test_get_serializable_dask_array(self, tmp_path, dask_dist_client, shape, dtype, grp): """Test getting a dask distributed friendly serialisable dask array.""" import netCDF4 from xarray.backends import CachingFileManager @@ -548,7 +548,7 @@ def test_get_serialisable_dask_array(self, tmp_path, dask_dist_client, shape, dt ds.to_netcdf(fn, group=grp) cfm = CachingFileManager(netCDF4.Dataset, fn, mode="r") - arr = hf.get_serialisable_dask_array(cfm, "/".join([grp, "kaitum"]), + arr = hf.get_serializable_dask_array(cfm, "/".join([grp, "kaitum"]), chunks=shape, dtype=dtype) # As documented in GH issue 2815, using dask distributed with the file From 5b137e8e7df8da7ca3223c7ae8cb5730b71d4123 Mon Sep 17 00:00:00 2001 From: Gerrit Holl Date: Fri, 26 Jul 2024 11:05:00 +0200 Subject: [PATCH 20/23] Clarify docstring --- satpy/readers/utils.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/satpy/readers/utils.py b/satpy/readers/utils.py index 80794dad80..4766a1c897 100644 --- a/satpy/readers/utils.py +++ 
b/satpy/readers/utils.py @@ -480,11 +480,11 @@ def remove_earthsun_distance_correction(reflectance, utc_date=None): def get_serializable_dask_array(manager, varname, chunks, dtype): """Construct a serializable dask array from a variable. - When we construct a dask array using da.array and use that to create an - xarray dataarray, the result is not serializable and dask graphs using - this dataarray cannot be computed when the dask distributed scheduler - is in use. To circumvent this problem, xarray provides the - CachingFileManager. See GH#2815 for more information. + When we construct a dask array using da.array from a file, and use + that to create an xarray dataarray, the result is not serializable + and dask graphs using this dataarray cannot be computed when the dask + distributed scheduler is in use. To circumvent this problem, xarray + provides the CachingFileManager. See GH#2815 for more information. Should have at least one dimension. @@ -492,8 +492,8 @@ def get_serializable_dask_array(manager, varname, chunks, dtype): >>> import netCDF4 >>> from xarray.backends import CachingFileManager - >>> cfm = CachingFileManager(netCDF4.Dataset, fn, mode="r") - >>> arr = get_serializable_dask_array(cfm, "my_var") + >>> cfm = CachingFileManager(netCDF4.Dataset, filename, mode="r") + >>> arr = get_serializable_dask_array(cfm, "my_var", 1024, "f4") Args: manager (xarray.backends.CachingFileManager): From c2b153332036c9e26452f7230ede67d47712e415 Mon Sep 17 00:00:00 2001 From: Gerrit Holl Date: Fri, 26 Jul 2024 12:42:32 +0200 Subject: [PATCH 21/23] Use cache already in scene creation When caching, make sure we use the CachingFileManager already upon scene creation and not only by the time we are loading. 
--- satpy/readers/netcdf_utils.py | 30 ++++++++++++++++-------------- 1 file changed, 16 insertions(+), 14 deletions(-) diff --git a/satpy/readers/netcdf_utils.py b/satpy/readers/netcdf_utils.py index e9ae2d24db..1e912be592 100644 --- a/satpy/readers/netcdf_utils.py +++ b/satpy/readers/netcdf_utils.py @@ -103,15 +103,22 @@ def __init__(self, filename, filename_info, filetype_info, self.cached_file_content = {} self._use_h5netcdf = False self._auto_maskandscale = auto_maskandscale - try: - file_handle = self._get_file_handle() - except IOError: - LOG.exception( - "Failed reading file %s. Possibly corrupted file", self.filename) - raise + if cache_handle: + self.manager = xr.backends.CachingFileManager( + functools.partial(_NCDatasetWrapper, + auto_maskandscale=auto_maskandscale), + self.filename, mode="r") + file_handle = self.manager.acquire() + else: + try: + file_handle = self._get_file_handle() + except IOError: + LOG.exception( + "Failed reading file %s. Possibly corrupted file", self.filename) + raise - self._set_file_handle_auto_maskandscale(file_handle, auto_maskandscale) - self._set_xarray_kwargs(xarray_kwargs, auto_maskandscale) + self._set_file_handle_auto_maskandscale(file_handle, auto_maskandscale) + self._set_xarray_kwargs(xarray_kwargs, auto_maskandscale) listed_variables = filetype_info.get("required_netcdf_variables") if listed_variables: @@ -121,12 +128,7 @@ def __init__(self, filename, filename_info, filetype_info, self.collect_dimensions("", file_handle) self.collect_cache_vars(cache_var_size) - if cache_handle: - self.manager = xr.backends.CachingFileManager( - functools.partial(_NCDatasetWrapper, - auto_maskandscale=auto_maskandscale), - self.filename, mode="r") - else: + if not cache_handle: file_handle.close() def _get_file_handle(self): From 9fce5a76416327ccf53d46f9c87facb144909599 Mon Sep 17 00:00:00 2001 From: Gerrit Holl Date: Fri, 26 Jul 2024 18:04:40 +0200 Subject: [PATCH 22/23] Use helper function rather than subclass Don't subclass 
netCDF4.Dataset, rather just return an instance from a helper function. Seems good enough and gets rid of the weird error messages upon exit. --- satpy/readers/netcdf_utils.py | 23 +++++++---------------- 1 file changed, 7 insertions(+), 16 deletions(-) diff --git a/satpy/readers/netcdf_utils.py b/satpy/readers/netcdf_utils.py index 1e912be592..3f31cf5ee9 100644 --- a/satpy/readers/netcdf_utils.py +++ b/satpy/readers/netcdf_utils.py @@ -105,7 +105,7 @@ def __init__(self, filename, filename_info, filetype_info, self._auto_maskandscale = auto_maskandscale if cache_handle: self.manager = xr.backends.CachingFileManager( - functools.partial(_NCDatasetWrapper, + functools.partial(_nc_dataset_wrapper, auto_maskandscale=auto_maskandscale), self.filename, mode="r") file_handle = self.manager.acquire() @@ -469,23 +469,14 @@ def _get_attr(self, obj, key): return obj.attrs[key] return super()._get_attr(obj, key) -class _NCDatasetWrapper(netCDF4.Dataset): +def _nc_dataset_wrapper(*args, auto_maskandscale, **kwargs): """Wrap netcdf4.Dataset setting auto_maskandscale globally. - Helper class that wraps netcdf4.Dataset while setting extra parameters. - By encapsulating this in a helper class, we can + Helper function that wraps netcdf4.Dataset while setting extra parameters. + By encapsulating this in a helper function, we can pass it to CachingFileManager directly. Currently sets auto_maskandscale globally (for all variables). """ - - def __init__(self, *args, auto_maskandscale=False, **kwargs): - """Initialise object.""" - super().__init__(*args, **kwargs) - self._set_extra_settings(auto_maskandscale=auto_maskandscale) - - def _set_extra_settings(self, auto_maskandscale): - """Set our own custom settings. - - Currently only applies set_auto_maskandscale. 
- """ - self.set_auto_maskandscale(auto_maskandscale) + nc = netCDF4.Dataset(*args, **kwargs) + nc.set_auto_maskandscale(auto_maskandscale) + return nc From 4993b657be3034ad2996eef2a074e21d144ed6cf Mon Sep 17 00:00:00 2001 From: Gerrit Holl Date: Fri, 26 Jul 2024 18:16:42 +0200 Subject: [PATCH 23/23] restore non-cached group retrieval Some readers read entire groups; this needs xarray kwargs to be set even if caching is used. --- satpy/readers/netcdf_utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/satpy/readers/netcdf_utils.py b/satpy/readers/netcdf_utils.py index 3f31cf5ee9..59bf1829d2 100644 --- a/satpy/readers/netcdf_utils.py +++ b/satpy/readers/netcdf_utils.py @@ -118,7 +118,7 @@ def __init__(self, filename, filename_info, filetype_info, raise self._set_file_handle_auto_maskandscale(file_handle, auto_maskandscale) - self._set_xarray_kwargs(xarray_kwargs, auto_maskandscale) + self._set_xarray_kwargs(xarray_kwargs, auto_maskandscale) listed_variables = filetype_info.get("required_netcdf_variables") if listed_variables: