Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TROPOMI updates including pressure calculation #184

Merged
merged 19 commits into from
Sep 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 12 additions & 5 deletions monetio/models/_wrfchem_mm.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,17 +88,16 @@ def open_mfdataset(
var_list.append("PSFC")
# need to calculate surface pressure and dp and optionally dz here.

# Additional defaults for satellite analysis
var_list.append("zstag")

var_wrf_list = []
for var in var_list:
if var == "pres": # Insert special versions.
var_wrf = getvar(
wrflist, var, timeidx=ALL_TIMES, method="cat", squeeze=False, units="Pa"
)
elif var == "height":
var_wrf = getvar(
wrflist, var, timeidx=ALL_TIMES, method="cat", squeeze=False, units="m"
)
elif var == "height_agl":
elif var in {"height", "height_agl", "zstag"}:
var_wrf = getvar(
wrflist, var, timeidx=ALL_TIMES, method="cat", squeeze=False, units="m"
)
Expand All @@ -108,6 +107,13 @@ def open_mfdataset(

dset = xr.merge(var_wrf_list)

if not surf_only_nc:
# Compute layer thickness
dset["dz"] = (
dset["zstag"].diff("bottom_top_stag").swap_dims({"bottom_top_stag": "bottom_top"})
)
dset["dz"] = dset["dz"].assign_attrs({"units": "m", "long_name": "layer thickness"})

# Add global attributes needed
a_truelat1 = extract_global_attrs(wrflist[0], "TRUELAT1")
a_truelat2 = extract_global_attrs(wrflist[0], "TRUELAT2")
Expand Down Expand Up @@ -203,6 +209,7 @@ def open_mfdataset(
"temp": "temperature_k",
"height": "alt_msl_m_mid",
"height_agl": "alt_agl_m_mid",
"dz": "dz_m",
"PSFC": "surfpres_pa",
"pressure": "pres_pa_mid",
}
Expand Down
10 changes: 5 additions & 5 deletions monetio/sat/_modis_l2_mm.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,12 @@
def read_dataset(fname, variable_dict):
"""
Parameters
__________
----------
fname : str
Input file path.

Returns
_______
-------
xarray.Dataset
"""
from monetio.sat.hdfio import hdf_close, hdf_list, hdf_open, hdf_read
Expand Down Expand Up @@ -52,7 +52,7 @@ def read_dataset(fname, variable_dict):
def apply_quality_flag(ds):
"""
Parameters
__________
----------
ds : xarray.Dataset
"""
if "quality_flag" in ds.attrs:
Expand All @@ -69,12 +69,12 @@ def apply_quality_flag(ds):
def read_mfdataset(fnames, variable_dict, debug=False):
"""
Parameters
__________
----------
fnames : str
Regular expression for input file paths.

Returns
_______
-------
xarray.Dataset
"""
if debug:
Expand Down
190 changes: 146 additions & 44 deletions monetio/sat/_tropomi_l2_no2_mm.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,15 @@
"""Read TROPOMI L2 NO2 data."""
"""Read TROPOMI L2 NO2 data.

TROPOspheric Monitoring Instrument (TROPOMI) instrument.

http://www.tropomi.eu
https://sentinels.copernicus.eu/web/sentinel/missions/sentinel-5p
"""

import logging
import os
import sys
import warnings
from collections import OrderedDict
from datetime import datetime
from glob import glob
Expand Down Expand Up @@ -54,45 +61,120 @@ def _open_one_dataset(fname, variable_dict):
ds["scan_time"] = ds["time"] + dtime
ds["scan_time"].attrs.update({"long_name": "scan time"})
ds = ds.set_coords(["lon", "lat", "time", "scan_time"])
ds.attrs["reference_time_string"] = ref_time_val.astype(datetime).strftime(r"%Y%m%d")
ds.attrs["reference_time_string"] = ref_time_val.astype(datetime).strftime(r"%Y-%m-%d")

def get_extra(varname_, *, dct_=None, default_group="PRODUCT"):
"""Get non-varname variables."""
if dct_ is None:
dct_ = variable_dict.get(varname_, {})
group_name = dct_.get("group", default_group)
if isinstance(group_name, list):
group_name = group_name[0]
return _get_values(dso[group_name][varname_], dct_)

for varname, dct in variable_dict.items():
print(f"- {varname}")
if dct is None or (isinstance(dct, str) and dct in {"None", "NONE"}):
dct = {}

if varname == "preslev":
# Compute layered pressure and tropopause pressure

itrop_ = get_extra("tm5_tropopause_layer_index", dct_=dct)
itrop_[itrop_ == 0] = 1 # Avoid trop in surface layer
itrop = xr.DataArray(itrop_, dims=("y", "x"))
a = xr.DataArray(data=get_extra("tm5_constant_a", dct_=dct), dims=("z", "v"))
b = xr.DataArray(data=get_extra("tm5_constant_b", dct_=dct), dims=("z", "v"))
psfc = xr.DataArray(
data=get_extra(
"surface_pressure",
default_group="PRODUCT/SUPPORT_DATA/INPUT_DATA",
dct_=dct,
),
dims=("y", "x"),
)

# Mid-layer pressure
assert a.sizes["v"] == 2, "base and top"
p = ((a.isel(v=0) + b.isel(v=0) * psfc) + (a.isel(v=1) + b.isel(v=1) * psfc)) / 2
zmoon marked this conversation as resolved.
Show resolved Hide resolved
ds["preslev"] = p
ds["preslev"].attrs.update({"long_name": "mid-layer pressure", "units": "Pa"})

# Tropopause pressure
ptrop = xr.full_like(itrop, np.nan, dtype=ds["preslev"].dtype)
for i in np.unique(itrop):
if np.isnan(i):
continue
ptrop = xr.where(itrop == i, p.isel(z=int(i)), ptrop)
ds["troppres"] = ptrop
ds["troppres"].attrs.update({"long_name": "tropopause pressure", "units": "Pa"})

elif varname in {"latitude_bounds", "longitude_bounds"}:
group_name = dct.get("group", "PRODUCT/SUPPORT_DATA/GEOLOCATIONS")
values = _get_values(dso[group_name][varname], dct)
assert values.shape[-1] == 4
for i in range(4):
ds[f"{varname}_{i}"] = (
("y", "x"),
values[:, :, i],
{"long_name": f"{varname} {i}"},
)

else:
group_name = dct.get("group", "PRODUCT")
var = dso[group_name][varname]
values = _get_values(var, dct)

if values.ndim == 2:
dims = ("y", "x")
elif values.ndim == 3:
dims = ("y", "x", "z")
else:
raise ValueError(f"unexpected ndim ({varname}): {values.ndim}")

ds[varname] = (
dims,
values,
{"long_name": var.long_name, "units": var.units},
)

for varname in variable_dict:
print(varname)
values_var = dso.groups["PRODUCT"][varname]
values = values_var[:].squeeze()
if "quality_flag_min" in dct:
ds.attrs["quality_flag"] = varname
ds.attrs["quality_thresh_min"] = dct["quality_flag_min"]
ds.attrs["var_applied"] = dct.get("var_applied", [])

fv = values_var.getncattr("_FillValue")
if fv is not None:
values[:][values[:] == fv] = np.nan
dso.close()

if "fillvalue" in variable_dict[varname]:
fillvalue = variable_dict[varname]["fillvalue"]
values[:][values[:] == fillvalue] = np.nan
return ds

if "scale" in variable_dict[varname]:
values[:] = variable_dict[varname]["scale"] * values[:]

if "minimum" in variable_dict[varname]:
minimum = variable_dict[varname]["minimum"]
values[:][values[:] < minimum] = np.nan
def _get_values(var, dct):
"""Take netCDF4 Variable, squeeze, tweak values based on user-provided attribute dict,
and return NumPy array."""

if "maximum" in variable_dict[varname]:
maximum = variable_dict[varname]["maximum"]
values[:][values[:] > maximum] = np.nan
values = var[:].squeeze()

ds[varname] = (
("y", "x"),
values,
{"long_name": values_var.long_name, "units": values_var.units},
)
if np.ma.is_masked(values):
logging.info(f"{var.name} already masked")

if "quality_flag_min" in variable_dict[varname]:
ds.attrs["quality_flag"] = varname
ds.attrs["quality_thresh_min"] = variable_dict[varname]["quality_flag_min"]
scale = dct.get("scale")
if scale is not None:
values *= float(scale)

dso.close()
# Note netcdf4-python masks based on nc _FillValue attr automatically
fv = dct.get("fillvalue")
if fv is not None:
values = np.ma.masked_values(values, fv, atol=0, copy=False)

return ds
minimum = dct.get("minimum")
if minimum is not None:
values = np.ma.masked_less(values, minimum, copy=False)

maximum = dct.get("maximum")
if maximum is not None:
values = np.ma.masked_greater(values, maximum, copy=False)

return values


def apply_quality_flag(ds):
    """Mask variables based on the dataset's quality-flag settings.

    Parameters
    ----------
    ds : xarray.Dataset
        Modified in place. If ``ds.attrs["quality_flag"]`` is set, then for each
        variable named in ``ds.attrs["var_applied"]``, points where the quality
        flag is <= ``ds.attrs["quality_thresh_min"]`` are set to NaN.
    """
    if "quality_flag" in ds.attrs:
        quality_flag = ds[ds.attrs["quality_flag"]]  # quality flag variable (float)
        quality_thresh_min = ds.attrs["quality_thresh_min"]

        # Apply the quality thresh minimum to selected variables
        for varname in ds.attrs["var_applied"]:
            logging.debug(f"applying quality flag to {varname}")
            values = ds[varname].values
            values[quality_flag <= quality_thresh_min] = np.nan


def open_dataset(fnames, variable_dict, debug=False):

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we change the function name to `read_trpdataset`? Just to be consistent with the driver, which has been incorporated into MELODIES MONET.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added it as an alias: 626b95c

In general, we're trying to keep API's for the different model/sat readers somewhat consistent, with the same function names etc.

"""
"""Open one or more TROPOMI L2 NO2 files.

Parameters
----------
fnames : str
Expand All @@ -126,6 +207,14 @@ def open_dataset(fnames, variable_dict, debug=False):
that, if provided, will be used when processing the data
(``fillvalue``, ``scale``, ``maximum``, ``minimum``, ``quality_flag_min``).
A variable's attribute dict is allowed to be empty (``{}``).
For ``longitude_bounds`` and ``latitude_bounds``,
variables ``longitude_bounds_{1..4}`` and/or ``latitude_bounds_{1..4}`` are created.
For ``preslev``, you can specify attributes for the variables used to calculate pressure
(``tm5_tropopause_layer_index``, ``tm5_constant_a``, ``tm5_constant_b``, ``surface_pressure``),
and variables ``preslev`` (mid-layer pressure)
and ``troppres`` (tropopause pressure) are created.
For any variable,
a non-default group name can be specified with the key ``group`` (use ``/`` for nesting).
Or, instead, you can pass a single variable name as a string
or a sequence of variable names.
debug : bool
Expand All @@ -134,7 +223,8 @@ def open_dataset(fnames, variable_dict, debug=False):
Returns
-------
OrderedDict
Dict mapping reference time string (date) to :class:`xarray.Dataset` of the granule.
Dict mapping reference time string (date, YYYY-MM-DD)
to a list of :class:`xarray.Dataset` granules.
"""
if debug:
logging_level = logging.DEBUG
Expand All @@ -155,17 +245,29 @@ def open_dataset(fnames, variable_dict, debug=False):
envvar = subpath.replace("$", "")
envval = os.getenv(envvar)
if envval is None:
print("environment variable not defined: " + subpath)
exit(1)
raise RuntimeError(f"environment variable {envvar!r} not defined: " + subpath)
else:
fnames = fnames.replace(subpath, envval)

print(fnames)
files = sorted(glob(fnames))
granules = OrderedDict()
for file in files:
granule = _open_one_dataset(file, variable_dict)
apply_quality_flag(granule)
granules[granule.attrs["reference_time_string"]] = granule
key = granule.attrs["reference_time_string"]
if key in granules:
granules[key].append(granule)
else:
granules[key] = [granule]

return granules


def read_trpdataset(*args, **kwargs):
    """Alias for :func:`open_dataset`."""
    deprecation_msg = (
        "read_trpdataset is an alias for open_dataset and may be removed in the future"
    )
    warnings.warn(deprecation_msg, FutureWarning, stacklevel=2)
    return open_dataset(*args, **kwargs)
Loading