diff --git a/.gitignore b/.gitignore
index 67b6e250..1ddbbd42 100644
--- a/.gitignore
+++ b/.gitignore
@@ -4,7 +4,9 @@
 ._.DS_Store
 
 docs/api/
-
+tests/data/MOP*.he5
+tests/data/TROPOMI*.nc
+tests/data/OMPS-*.h5
 
 # Default GitHub .gitignore for Python below:
 
diff --git a/environment-dev.yml b/environment-dev.yml
index bafb5845..ca94caff 100644
--- a/environment-dev.yml
+++ b/environment-dev.yml
@@ -16,6 +16,7 @@ dependencies:
   #
   # optional
   - h5netcdf
+  - h5py
   - joblib
   - lxml
   - pyhdf!=0.11.3
@@ -23,6 +24,7 @@ dependencies:
   - timezonefinder
   #
   # test
+  - filelock
   - pytest
   - pytest-xdist
   #
diff --git a/monetio/models/_wrfchem_mm.py b/monetio/models/_wrfchem_mm.py
index 9b11d7f7..dc0d6cdb 100644
--- a/monetio/models/_wrfchem_mm.py
+++ b/monetio/models/_wrfchem_mm.py
@@ -189,7 +189,12 @@ def open_mfdataset(
     if "pm25_om" in list_calc_sum:
         dset = add_lazy_om_pm25(dset, dict_sum)
 
-    dset = dset.reset_coords(["XTIME", "datetime"], drop=True)
+    # Fix to work with WRF-Chem files that lack the XTIME coordinate
+    if "XTIME" in list(dset.coords):
+        dset = dset.reset_coords(["XTIME", "datetime"], drop=True)
+    else:
+        dset = dset.reset_coords("datetime", drop=True)
+
     if not surf_only_nc:
         # Reset more variables
         dset = dset.rename(
diff --git a/monetio/sat/__init__.py b/monetio/sat/__init__.py
index 29f96114..2ceb4de9 100644
--- a/monetio/sat/__init__.py
+++ b/monetio/sat/__init__.py
@@ -1,7 +1,10 @@
 from . import (
     _gridded_eos_mm,
     _modis_l2_mm,
+    _mopitt_l3_mm,
+    _omps_l3_mm,
     _omps_nadir_mm,
+    _tropomi_l2_no2_mm,
     goes,
     modis_ornl,
     nesdis_edr_viirs,
@@ -12,12 +15,15 @@
 __all__ = [
     "_gridded_eos_mm",
     "_modis_l2_mm",
+    "_mopitt_l3_mm",
+    "_omps_l3_mm",
     "_omps_nadir_mm",
+    "_tropomi_l2_no2_mm",
+    "goes",
+    "modis_ornl",
     "nesdis_edr_viirs",
     "nesdis_eps_viirs",
     "nesdis_frp",
-    "modis_ornl",
-    "goes",
 ]
 
 __name__ = "sat"
diff --git a/monetio/sat/_modis_l2_mm.py b/monetio/sat/_modis_l2_mm.py
index f1ce98b8..ce9a4d56 100644
--- a/monetio/sat/_modis_l2_mm.py
+++ b/monetio/sat/_modis_l2_mm.py
@@ -79,9 +79,7 @@ def read_mfdataset(fnames, variable_dict, debug=False):
     """
     if debug:
         logging_level = logging.DEBUG
-    else:
-        logging_level = logging.INFO
-    logging.basicConfig(stream=sys.stdout, level=logging_level)
+        logging.basicConfig(stream=sys.stdout, level=logging_level)
 
     files = sorted(glob(fnames))
diff --git a/monetio/sat/_mopitt_l3_mm.py b/monetio/sat/_mopitt_l3_mm.py
new file mode 100644
index 00000000..a0290bf7
--- /dev/null
+++ b/monetio/sat/_mopitt_l3_mm.py
@@ -0,0 +1,263 @@
+"""MOPITT gridded data file reader.
+
+History:
+
+- updated 2024-02 meb
+  * read multiple variables into a Dataset instead of individual variables
+  * add functions to combine profiles and surface as in rrb's code
+- updated 2023-08 rrb
+  * Added units
+- updated 2022-10 rrb
+  * Dataset instead of DataArray
+- created 2021-12 rrb
+"""
+import glob
+from pathlib import Path
+
+import pandas as pd
+import xarray as xr
+
+
+def get_start_time(filename):
+    """Read the start time from a MOPITT Level 3 HDF file.
+
+    Parameters
+    ----------
+    filename : str
+        Path to the file.
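+        The file's ``StartTime`` attribute (seconds since 1993-01-01)
+        is converted to a timestamp; ``pandas.NaT`` is returned if the
+        attribute is missing.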
+
+    Returns
+    -------
+    pandas.Timestamp or pandas.NaT
+    """
+    import h5py
+
+    structure = "/HDFEOS/ADDITIONAL/FILE_ATTRIBUTES"
+
+    inFile = h5py.File(filename, "r")
+
+    grp = inFile[structure]
+    k = grp.attrs
+    startTimeBytes = k.get("StartTime", default=None)  # one-element float array
+    if startTimeBytes is None:
+        startTime = pd.NaT
+    else:
+        startTime = pd.to_datetime(
+            startTimeBytes[0],
+            unit="s",
+            origin="1993-01-01 00:00:00",
+        )
+
+    inFile.close()
+
+    return startTime
+
+
+def load_variable(filename, varname):
+    """Open a MOPITT gridded HDF file and load a single variable.
+    Missing data are masked (set to ``np.nan``).
+
+    Parameters
+    ----------
+    filename : str
+        Path to the file.
+    varname : str
+        The variable to load from the MOPITT file.
+
+    Returns
+    -------
+    xarray.Dataset
+    """
+    import h5py
+
+    ds = xr.Dataset()
+
+    # Load the dimensions
+    he5_load = h5py.File(filename, mode="r")
+    lat = he5_load["/HDFEOS/GRIDS/MOP03/Data Fields/Latitude"][:]
+    lon = he5_load["/HDFEOS/GRIDS/MOP03/Data Fields/Longitude"][:]
+    alt = he5_load["/HDFEOS/GRIDS/MOP03/Data Fields/Pressure2"][:]
+    alt_short = he5_load["/HDFEOS/GRIDS/MOP03/Data Fields/Pressure"][:]
+
+    # 2D or 3D variables to choose from
+    variable_dict = {
+        "column": "/HDFEOS/GRIDS/MOP03/Data Fields/RetrievedCOTotalColumnDay",
+        "apriori_col": "/HDFEOS/GRIDS/MOP03/Data Fields/APrioriCOTotalColumnDay",
+        "apriori_surf": "/HDFEOS/GRIDS/MOP03/Data Fields/APrioriCOSurfaceMixingRatioDay",
+        "pressure_surf": "/HDFEOS/GRIDS/MOP03/Data Fields/SurfacePressureDay",
+        "ak_col": "/HDFEOS/GRIDS/MOP03/Data Fields/TotalColumnAveragingKernelDay",
+        "apriori_prof": "/HDFEOS/GRIDS/MOP03/Data Fields/APrioriCOMixingRatioProfileDay",
+    }
+    if varname not in variable_dict:
+        raise ValueError(f"Variable {varname!r} not in {sorted(variable_dict)}.")
+    data_loaded = he5_load[variable_dict[varname]][:]
+
+    he5_load.close()
+
+    # Create xarray DataArray
+    if varname == "column":
+        ds[varname] = xr.DataArray(
+            data_loaded,
+            dims=["lon", "lat"],
+            coords=[lon, lat],
+            attrs={
+                "long_name": "Retrieved CO Total Column",
+                "units": "molec/cm^2",
+            },
+        )
+    elif varname in {"apriori_col", "apriori_surf", "pressure_surf"}:
+        ds[varname] = xr.DataArray(data_loaded, dims=["lon", "lat"], coords=[lon, lat])
+    elif varname == "ak_col":
+        ds[varname] = xr.DataArray(
+            data_loaded,
+            dims=["lon", "lat", "alt"],
+            coords=[lon, lat, alt],
+            attrs={
+                "long_name": "Total Column Averaging Kernel",
+                "units": "mol/(cm^2 log(VMR))",
+            },
+        )
+    elif varname == "apriori_prof":
+        ds[varname] = xr.DataArray(
+            data_loaded,
+            dims=["lon", "lat", "alt"],
+            coords=[lon, lat, alt_short],
+            attrs={
+                "long_name": "A Priori CO Mixing Ratio Profile",
+                "units": "ppbv",
+            },
+        )
+    else:
+        raise AssertionError(f"Variable {varname!r} in variable dict but not accounted for.")
+
+    # missing value -> nan
+    ds[varname] = ds[varname].where(ds[varname] != -9999.0)
+
+    return ds
+
+
+def _add_pressure_variables(dataset):
+    """Set up the 3-D pressure array.
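+
+    The 10 retrieval levels (1000 to 100 hPa) are broadcast to every
+    grid cell, the bottom level is replaced by the floating surface
+    pressure, levels below the true surface are filled, and the edge
+    pressures are converted to layer-center pressures.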
+
+    Parameters
+    ----------
+    dataset : xarray.Dataset
+        Should have the 3-D averaging kernel field and surface pressure field.
+
+    Returns
+    -------
+    xarray.Dataset
+    """
+    import numpy as np
+
+    # Broadcast the 10 levels (1000 to 100 hPa), repeated everywhere
+    dummy, press_dummy_arr = xr.broadcast(dataset["ak_col"], dataset["ak_col"].alt)
+    # Replace the 1000 hPa level with the actual surface pressure
+    dataset["pressure"] = press_dummy_arr.copy()
+    dataset["pressure"][:, :, :, 9] = dataset["pressure_surf"].values
+
+    # Correct for where MOPITT surface pressure <900 hPa:
+    # difference between layer pressure and surface pressure
+    diff = xr.full_like(dataset["pressure"], np.nan)
+    diff[:, :, :, 0] = 1000
+    diff[:, :, :, 1:] = (
+        dataset["pressure_surf"].values[:, :, :, None] - dataset["pressure"][:, :, :, :9].values
+    )
+    # add fill values below true surface
+    dataset["pressure"] = dataset["pressure"].where(diff > 0)
+    # replace lowest pressure with surface pressure; broadcast happens in background
+    dataset["pressure"].values = (
+        dataset["pressure_surf"].where((diff > 0) & (diff < 100), dataset["pressure"]).values
+    )
+
+    # Center pressure
+    dummy = dataset["pressure"].copy()
+    dummy[:, :, :, 0] = 87.0
+    for z in range(1, 10):
+        dummy[:, :, :, z] = (
+            dataset["pressure"][:, :, :, z]
+            - (dataset["pressure"][:, :, :, z] - dataset["pressure"][:, :, :, z - 1]) / 2
+        )
+    dataset["pressure"] = dummy
+
+    return dataset
+
+
+def _combine_apriori(dataset):
+    """Combine the MOPITT surface and profile a priori values.
+
+    MOPITT surface values are stored separately from the profile values
+    because of the floating surface pressure, so for the smoothing
+    calculations the surface and profile need to be combined.
+
+    Parameters
+    ----------
+    dataset : xarray.Dataset
+
+    Returns
+    -------
+    xarray.Dataset
+    """
+    import numpy as np
+
+    dataset["apriori_prof"][:, :, :, -1] = dataset["apriori_surf"].values
+
+    # As with pressure, correct for where MOPITT surface pressure <900 hPa:
+    # difference between layer pressure and surface pressure
+    diff = xr.full_like(dataset["pressure"], np.nan)
+    diff[:, :, :, 0] = 1000
+    diff[:, :, :, 1:] = (
+        dataset["pressure_surf"].values[:, :, :, None] - dataset["pressure"][:, :, :, :9].values
+    )
+    # add fill values below true surface
+    dataset["apriori_prof"] = dataset["apriori_prof"].where(diff > 0)
+    # replace lowest value with the surface value; broadcast happens in background
+    dataset["apriori_prof"].values = (
+        dataset["apriori_surf"].where((diff > 0) & (diff < 100), dataset["apriori_prof"]).values
+    )
+
+    return dataset
+
+
+def open_dataset(files, varnames):
+    """Open MOPITT Level 3 data for the given variable(s), looping over files.
+
+    Parameters
+    ----------
+    files : str or Path or list
+        Input file path(s).
+        If :class:`str`, shell-style wildcards (e.g. ``*``) will be expanded.
+    varnames : str or list of str
+        The variable(s) to load from the MOPITT file.
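+        Requesting both ``'apriori_prof'`` and ``'pressure_surf'``
+        (together with ``'ak_col'`` and ``'apriori_surf'``) additionally
+        derives a 3-D ``pressure`` variable and merges the surface
+        a priori into the profile.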
+
+    Returns
+    -------
+    xarray.Dataset
+    """
+    if isinstance(files, str):
+        filelist = sorted(glob.glob(files, recursive=False))
+    elif isinstance(files, Path):
+        filelist = [files]
+    else:
+        filelist = files  # assume list
+
+    if isinstance(varnames, str):
+        varnames = [varnames]
+
+    datasets = []
+    for filename in filelist:
+        print(filename)
+        file_varset = []
+        for varname in varnames:
+            data = load_variable(filename, varname)
+            time = get_start_time(filename)
+            data = data.expand_dims(axis=0, time=[time])
+            file_varset.append(data)
+
+        # merge variables for this file into a single dataset
+        data = xr.merge(file_varset)
+        if "apriori_prof" in varnames and "pressure_surf" in varnames:
+            # add 3-D pressure field
+            data = _add_pressure_variables(data)
+            # combine surface and rest of profile into a single variable
+            data = _combine_apriori(data)
+
+        datasets.append(data)
+
+    return xr.concat(datasets, dim="time")
diff --git a/monetio/sat/_omps_l3_mm.py b/monetio/sat/_omps_l3_mm.py
new file mode 100644
index 00000000..75b94d62
--- /dev/null
+++ b/monetio/sat/_omps_l3_mm.py
@@ -0,0 +1,82 @@
+"""Read NASA Suomi NPP OMPS Level 3 Nadir Mapper TO3 files."""
+
+
+def open_dataset(files):
+    """Open OMPS nadir mapper Total Column Ozone L3 files.
+
+    Parameters
+    ----------
+    files : str or Path or list
+        Input file path(s).
+        If :class:`str`, shell-style wildcards (e.g. ``*``) will be expanded.
+
+    Returns
+    -------
+    xarray.Dataset
+    """
+    from glob import glob
+    from pathlib import Path
+
+    import xarray as xr
+
+    if isinstance(files, str):
+        filelist = sorted(glob(files, recursive=False))
+    elif isinstance(files, Path):
+        filelist = [files]
+    else:
+        filelist = files  # assume list
+
+    datasets = []
+    for filename in filelist:
+        ds = _open_one_dataset(filename)
+        datasets.append(ds)
+
+    return xr.concat(datasets, dim="time")
+
+
+def _open_one_dataset(fname):
+    """Read a locally stored NASA Suomi NPP OMPS Level 3 Nadir Mapper TO3 file.
+
+    Parameters
+    ----------
+    fname : str
+        Local path to the h5 file.
+
+    Returns
+    -------
+    xarray.Dataset
+    """
+    import h5py
+    import numpy as np
+    import pandas as pd
+    import xarray as xr
+
+    with h5py.File(fname, "r") as f:
+        # Info: https://snpp-omps.gesdisc.eosdis.nasa.gov/data/SNPP_OMPS_Level3/OMPS_NPP_NMTO3_L3_DAILY.2/doc/README.OMPS_NPP_NMTO3_L3_DAILY.2.pdf
+        lat = f["Latitude"][:]
+        lon = f["Longitude"][:]
+        column = f["ColumnAmountO3"][:]
+        cloud_fraction = f["RadiativeCloudFraction"][:]
+        time = pd.to_datetime(f.attrs.get("Date").decode("UTF-8"), format=r"%Y-%m-%d")
+
+    # Remove cloudy scenes and points with no data (e.g. polar dark zone)
+    column[(column < 0)] = np.nan
+    column[(cloud_fraction > 0.3)] = np.nan
+    lon_2d, lat_2d = np.meshgrid(lon, lat)
+
+    ds = xr.Dataset(
+        {
+            "ozone_column": (
+                ("y", "x"),
+                column,
+                {"long_name": "total column ozone amount", "units": "DU"},
+            ),
+        },
+        coords={
+            "longitude": (("y", "x"), lon_2d, {"long_name": "longitude", "units": "degree_east"}),
+            "latitude": (("y", "x"), lat_2d, {"long_name": "latitude", "units": "degree_north"}),
+            "time": ((), time),
+        },
+    )
+
+    return ds
diff --git a/monetio/sat/_tropomi_l2_no2_mm.py b/monetio/sat/_tropomi_l2_no2_mm.py
new file mode 100644
index 00000000..031d6ebd
--- /dev/null
+++ b/monetio/sat/_tropomi_l2_no2_mm.py
@@ -0,0 +1,171 @@
+"""Read TROPOMI L2 NO2 data."""
+
+import logging
+import os
+import sys
+from collections import OrderedDict
+from datetime import datetime
+from glob import glob
+from pathlib import Path
+
+import numpy as np
+import xarray as xr
+from cftime import num2date
+from netCDF4 import Dataset
+
+
+def _open_one_dataset(fname, variable_dict):
+    """Read one TROPOMI L2 NO2 granule.
+
+    Parameters
+    ----------
+    fname : str
+        Input file path.
+    variable_dict : dict
+
+    Returns
+    -------
+    xarray.Dataset
+    """
+    print("reading " + fname)
+
+    ds = xr.Dataset()
+
+    dso = Dataset(fname, "r")
+
+    lon_var = dso.groups["PRODUCT"]["longitude"]
+    lat_var = dso.groups["PRODUCT"]["latitude"]
+
+    ref_time_var = dso.groups["PRODUCT"]["time"]
+    ref_time_val = np.datetime64(num2date(ref_time_var[:].item(), ref_time_var.units))
+    dtime_var = dso.groups["PRODUCT"]["delta_time"]
+    dtime = xr.DataArray(dtime_var[:].squeeze(), dims=("y",)).astype("timedelta64[ms]")
+
+    ds["lon"] = (
+        ("y", "x"),
+        lon_var[:].squeeze(),
+        {"long_name": lon_var.long_name, "units": lon_var.units},
+    )
+    ds["lat"] = (
+        ("y", "x"),
+        lat_var[:].squeeze(),
+        {"long_name": lat_var.long_name, "units": lat_var.units},
+    )
+    ds["time"] = ((), ref_time_val, {"long_name": "reference time"})
+    ds["scan_time"] = ds["time"] + dtime
+    ds["scan_time"].attrs.update({"long_name": "scan time"})
+    ds = ds.set_coords(["lon", "lat", "time", "scan_time"])
+    ds.attrs["reference_time_string"] = ref_time_val.astype(datetime).strftime(r"%Y%m%d")
+
+    for varname in variable_dict:
+        print(varname)
+        values_var = dso.groups["PRODUCT"][varname]
+        values = values_var[:].squeeze()
+
+        fv = values_var.getncattr("_FillValue")
+        if fv is not None:
+            values[:][values[:] == fv] = np.nan
+
+        if "fillvalue" in variable_dict[varname]:
+            fillvalue = variable_dict[varname]["fillvalue"]
+            values[:][values[:] == fillvalue] = np.nan
+
+        if "scale" in variable_dict[varname]:
+            values[:] = variable_dict[varname]["scale"] * values[:]
+
+        if "minimum" in variable_dict[varname]:
+            minimum = variable_dict[varname]["minimum"]
+            values[:][values[:] < minimum] = np.nan
+
+        if "maximum" in variable_dict[varname]:
+            maximum = variable_dict[varname]["maximum"]
+            values[:][values[:] > maximum] = np.nan
+
+        ds[varname] = (
+            ("y", "x"),
+            values,
+            {"long_name": values_var.long_name, "units": values_var.units},
+        )
+
+        if "quality_flag_min" in variable_dict[varname]:
+            ds.attrs["quality_flag"] = varname
+            ds.attrs["quality_thresh_min"] = variable_dict[varname]["quality_flag_min"]
+
+    dso.close()
+
+    return ds
+
+
+def apply_quality_flag(ds):
+    """Mask variables in place based on quality flag.
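+
+    All variables other than the flag variable itself are set to NaN
+    wherever the flag (dataset attribute ``quality_flag``) is at or
+    below the ``quality_thresh_min`` attribute; both attributes are set
+    by ``_open_one_dataset``.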
+
+    Parameters
+    ----------
+    ds : xarray.Dataset
+    """
+    if "quality_flag" in ds.attrs:
+        quality_flag = ds[ds.attrs["quality_flag"]]
+        quality_thresh_min = ds.attrs["quality_thresh_min"]
+
+        # Apply the quality thresh minimum to all variables in ds
+        for varname in ds:
+            if varname != ds.attrs["quality_flag"]:
+                logging.debug(varname)
+                values = ds[varname].values
+                values[quality_flag <= quality_thresh_min] = np.nan
+
+
+def open_dataset(fnames, variable_dict, debug=False):
+    """Open one or more TROPOMI L2 NO2 granules.
+
+    Parameters
+    ----------
+    fnames : str
+        Glob expression for input file paths.
+    variable_dict : dict
+        Mapping of variable names to a dict of attributes
+        that, if provided, will be used when processing the data
+        (``fillvalue``, ``scale``, ``maximum``, ``minimum``, ``quality_flag_min``).
+        A variable's attribute dict is allowed to be empty (``{}``).
+        Or, instead, you can pass a single variable name as a string
+        or a sequence of variable names.
+    debug : bool
+        Set logging level to debug.
+
+    Returns
+    -------
+    OrderedDict
+        Dict mapping reference time string (date) to :class:`xarray.Dataset` of the granule.
+    """
+    if debug:
+        logging_level = logging.DEBUG
+        logging.basicConfig(stream=sys.stdout, level=logging_level)
+
+    if isinstance(fnames, Path):
+        fnames = fnames.as_posix()
+
+    if isinstance(variable_dict, str):
+        variable_dict = {variable_dict: {}}
+    elif isinstance(variable_dict, dict):
+        pass
+    else:  # Assume sequence
+        variable_dict = {varname: {} for varname in variable_dict}
+
+    # Expand environment variables in the path
+    for subpath in fnames.split("/"):
+        if "$" in subpath:
+            envvar = subpath.replace("$", "")
+            envval = os.getenv(envvar)
+            if envval is None:
+                print("environment variable not defined: " + subpath)
+                sys.exit(1)
+            else:
+                fnames = fnames.replace(subpath, envval)
+
+    print(fnames)
+    files = sorted(glob(fnames))
+    granules = OrderedDict()
+    for file in files:
+        granule = _open_one_dataset(file, variable_dict)
+        apply_quality_flag(granule)
+        granules[granule.attrs["reference_time_string"]] = granule
+
+    return granules
diff --git a/tests/test_mopitt_l3.py b/tests/test_mopitt_l3.py
new file mode 100644
index 00000000..ce2d7746
--- /dev/null
+++ b/tests/test_mopitt_l3.py
@@ -0,0 +1,73 @@
+import shutil
+import warnings
+from pathlib import Path
+
+import pandas as pd
+import pytest
+from filelock import FileLock
+
+from monetio.sat._mopitt_l3_mm import get_start_time, load_variable, open_dataset
+
+HERE = Path(__file__).parent
+
+
+def retrieve_test_file():
+    fn = "MOP03JM-201701-L3V95.9.3.he5"
+
+    # Download to tests/data if not already present
+    p = HERE / "data" / fn
+    if not p.is_file():
+        warnings.warn(f"Downloading test file {fn} for MOPITT L3 test")
+        import requests
+
+        r = requests.get(
+            "https://csl.noaa.gov/groups/csl4/modeldata/melodies-monet/data/"
+            f"example_observation_data/satellite/{fn}",
+            stream=True,
+        )
+        r.raise_for_status()
+        with open(p, "wb") as f:
+            f.write(r.content)
+
+    return p
+
+
+@pytest.fixture(scope="session")
+def test_file_path(tmp_path_factory, worker_id):
+    if worker_id == "master":
+        # Not executing with multiple workers;
+        # let pytest's fixture caching do its job
+        return retrieve_test_file()
+
+    # Get the temp directory shared by all workers
+    root_tmp_dir = tmp_path_factory.getbasetemp().parent
+
+    # Copy to the shared test location
+    p_test = root_tmp_dir / "mopitt_l3_test.he5"
+    with FileLock(p_test.as_posix() + ".lock"):
+        if p_test.is_file():
+            return p_test
+        else:
+            p = retrieve_test_file()
+            shutil.copy(p, p_test)
+            return p_test
+
+
+def test_get_start_time(test_file_path):
+    t = get_start_time(test_file_path)
+    assert t.floor("D") == pd.Timestamp("2017-01-01")
+
+
+def test_load_variable(test_file_path):
+    ds = load_variable(test_file_path, "column")
+    assert set(ds.coords) == {"lon", "lat"}
+    assert set(ds) == {"column"}
+    assert ds.column.mean() > 0
+
+
+def test_open_dataset(test_file_path):
+    ds = open_dataset(test_file_path, "column")
+    assert set(ds.coords) == {"time", "lat", "lon"}
+    assert set(ds) == {"column"}
+    assert ds.column.mean() > 0
+    assert (ds.time.dt.floor("D") == pd.Timestamp("2017-01-01")).all()
diff --git a/tests/test_omps_l3.py b/tests/test_omps_l3.py
new file mode 100644
index 00000000..c19ae48d
--- /dev/null
+++ b/tests/test_omps_l3.py
@@ -0,0 +1,82 @@
+import shutil
+import warnings
+from pathlib import Path
+
+import pandas as pd
+import pytest
+from filelock import FileLock
+
+from monetio.sat._omps_l3_mm import open_dataset
+
+HERE = Path(__file__).parent
+
+FNS = [
+    "OMPS-TO3-L3-DAILY_v2.1_20190905.h5",
+    "OMPS-TO3-L3-DAILY_v2.1_20190906.h5",
+]
+
+
+def retrieve_test_file(i):
+    fn = FNS[i]
+
+    # Download to tests/data if not already present
+    p = HERE / "data" / fn
+    if not p.is_file():
+        warnings.warn(f"Downloading test file {fn} for OMPS L3 test")
+        import requests
+
+        r = requests.get(
+            "https://csl.noaa.gov/groups/csl4/modeldata/melodies-monet/data/"
+            f"example_observation_data/satellite/{fn}",
+            stream=True,
+        )
+        r.raise_for_status()
+        with open(p, "wb") as f:
+            f.write(r.content)
+
+    return p
+
+
+@pytest.fixture(scope="session")
+def test_file_paths(tmp_path_factory, worker_id):
+    if worker_id == "master":
+        # Not executing with multiple workers;
+        # let pytest's fixture caching do its job
+        return [retrieve_test_file(i) for i in range(len(FNS))]
+
+    # Get the temp directory shared by all workers
+    root_tmp_dir = tmp_path_factory.getbasetemp().parent
+
+    # Copy to the shared test location
+    p_tests = []
+    for i in range(len(FNS)):
+        p_test = root_tmp_dir / f"omps_l3_test_{i}.h5"
+        with FileLock(p_test.as_posix() + ".lock"):
+            if not p_test.is_file():
+                p = retrieve_test_file(i)
+                shutil.copy(p, p_test)
+        p_tests.append(p_test)
+
+    return p_tests
+
+
+def test_open_dataset(test_file_paths):
+    ps = test_file_paths
+    ds = open_dataset(ps)
+
+    assert ds.sizes["time"] == len(ps) == 2
+    assert ds.sizes["x"] == 360
+    assert ds.sizes["y"] == 180
+
+    assert ds.time.isel(time=0) == pd.Timestamp("2019-09-05")
+    assert ds.time.isel(time=1) == pd.Timestamp("2019-09-06")
+    assert ds.longitude.dims == ds.latitude.dims == ("y", "x")
+    assert ds.longitude.min() == -179.5
+    assert ds.longitude.max() == 179.5
+    assert ds.latitude.min() == -89.5
+    assert ds.latitude.max() == 89.5
+
+    assert set(ds.data_vars) == {"ozone_column"}
+    assert 150 < ds.ozone_column.mean() < 400
+    assert ds.ozone_column.min() > 100
+    assert ds.ozone_column.max() < 600
diff --git a/tests/test_tropomi_l2.py b/tests/test_tropomi_l2.py
new file mode 100644
index 00000000..8beff561
--- /dev/null
+++ b/tests/test_tropomi_l2.py
@@ -0,0 +1,74 @@
+import shutil
+import warnings
+from pathlib import Path
+
+import pandas as pd
+import pytest
+from filelock import FileLock
+
+from monetio.sat._tropomi_l2_no2_mm import open_dataset
+
+HERE = Path(__file__).parent
+
+
+def retrieve_test_file():
+    fn = "TROPOMI-L2-NO2-20190715.nc"
+
+    # Download to tests/data if not already present
+    p = HERE / "data" / fn
+    if not p.is_file():
+        warnings.warn(f"Downloading test file {fn} for TROPOMI L2 test")
+        import requests
+
+        r = requests.get(
"https://csl.noaa.gov/groups/csl4/modeldata/melodies-monet/data/" + f"example_observation_data/satellite/{fn}", + stream=True, + ) + r.raise_for_status() + with open(p, "wb") as f: + f.write(r.content) + + return p + + +@pytest.fixture(scope="session") +def test_file_path(tmp_path_factory, worker_id): + if worker_id == "master": + # Not executing with multiple workers; + # let pytest's fixture caching do its job + return retrieve_test_file() + + # Get the temp directory shared by all workers + root_tmp_dir = tmp_path_factory.getbasetemp().parent + + # Copy to the shared test location + p_test = root_tmp_dir / "tropomi_l2_test.he5" + with FileLock(p_test.as_posix() + ".lock"): + if p_test.is_file(): + return p_test + else: + p = retrieve_test_file() + shutil.copy(p, p_test) + return p_test + + +def test_open_dataset(test_file_path): + vn = "nitrogendioxide_tropospheric_column" # mol m-2 + t_ref = pd.Timestamp("2019-07-15") + + ds = open_dataset(test_file_path, vn)[t_ref.strftime(r"%Y%m%d")] + + assert set(ds.coords) == {"time", "lat", "lon", "scan_time"} + assert set(ds) == {vn} + + assert 0 < ds[vn].mean() < 2e-4 + assert ds[vn].max() < 1e-3 + assert ds[vn].min() < 0 + + assert ds.time.ndim == 0 + assert pd.Timestamp(ds.time.values) == t_ref + assert (ds.scan_time.dt.floor("D") == t_ref).all() + + ds2 = open_dataset(test_file_path, {vn: {"minimum": 1e-9}})[t_ref.strftime(r"%Y%m%d")] + assert ds2[vn].min() >= 1e-9