clean up and refactor of pyremo.preproc (#130)
* some clean up

* removed old test

* update docstrings

* extracted xarray pyintorg interface

* more clean up

* Update whats_new.rst

* clean up

* some cleanup for vertical coordinates

* fix

* update file output pattern
larsbuntemeyer authored Aug 17, 2023
1 parent e52ac27 commit 78d034e
Showing 13 changed files with 786 additions and 1,974 deletions.
docs/whats_new.rst (3 changes: 2 additions & 1 deletion)
@@ -21,7 +21,8 @@ Internal Changes
 Breaking Changes
 ~~~~~~~~~~~~~~~~
 
-- Refactoring of :py:meth:`preproc.ERA5` for new DKRZ data pool conventions (:pull:`129`).
+- Refactoring of :py:meth:`preproc.ERA5` for new DKRZ data pool conventions (:pull:`129`).
+- Refactoring and clean up of preprocessing module (:pull:`130`).
 
 v0.5.1 (24 May 2023)
 --------------------
pyremo/conventions.py (6 changes: 3 additions & 3 deletions)
@@ -30,13 +30,13 @@ def output_pattern(
     """
     wild = "*"
-    if middle is None:
-        middle = ""
+    # if middle is None:
+    #     middle = ""
     if expid is None:
         expid = wild
     else:
         expid = f"{int(expid):06d}"
-    if type in ["e", "n", "p"]:
+    if type in ["e", "n", "p"] and middle is None:
        if code is None:
            middle = "_c*_"
        else:
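The behavioral change: a caller-supplied middle part is no longer overwritten, and the "_c*_" default is only derived when middle is None. A minimal sketch of that logic (a hypothetical simplification; the code-number formatting and the returned pattern layout are assumptions, not taken from this diff):

def output_pattern_sketch(type, expid=None, code=None, middle=None):
    # hypothetical reduction of pyremo.conventions.output_pattern
    wild = "*"
    expid = wild if expid is None else f"{int(expid):06d}"
    if type in ["e", "n", "p"] and middle is None:
        # the default middle is derived only when none was given
        middle = "_c*_" if code is None else f"_c{int(code):03d}_"
    return f"{type}{expid}{middle or ''}{wild}"

output_pattern_sketch("e", expid=56)  # -> 'e000056_c*_*'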
pyremo/preproc/__init__.py (2 changes: 1 addition & 1 deletion)
@@ -1,9 +1,9 @@
 # flake8: noqa
 
+from ._remap import remap, remap_remo, to_netcdf, to_tar
 from .cf import get_gfile, to_cfdatetime
 from .core import gfile
 from .era5 import ERA5
-from .remap import remap, remap_remo, to_netcdf, to_tar
 
 # from .remap_new import Remapper
 from .utils import write_forcing_file
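Because _remap re-exports the same names, package-level imports keep working; only imports of the old module path break. A quick sketch:

# unchanged for downstream code: names are re-exported in __init__.py
from pyremo.preproc import remap, remap_remo, to_netcdf, to_tar

# this would now fail: the module was renamed to _remap
# from pyremo.preproc.remap import remap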
pyremo/preproc/cf.py (115 changes: 45 additions & 70 deletions)
@@ -14,11 +14,9 @@
 import xarray as xr
 from cdo import Cdo
 
-# from .constants import lev_i
-from .core import check_lev, convert_units, get_vc2, horizontal_dims, open_mfdataset
+from .core import check_lev, convert_units, get_vc, horizontal_dims, open_mfdataset
 
 cdo_exe = "cdo"
-default_catalog = "/work/ik1017/Catalogs/dkrz_cmip6_disk.csv.gz"
 
 cdo_datetime_format = "%Y-%m-%dT%H:%M:%S"
@@ -38,7 +36,6 @@ def get_min_max_time(filename):
         return np.nan, np.nan
 
 
-# @dask.delayed
 def get_times_from_files(files):
     result = {}
     for f in files:
@@ -76,6 +73,13 @@ def create_df(data):
 
 
 def create_catalog(**args):
+    """Scan directories for files and add time min and time max
+
+    This function is supposed to scan directories for global model input files.
+    It will create a catalog that contains the path to the file and the
+    min and max times of that file.
+
+    """
     files = {}
     print(args)
     for v, d in args.items():
@@ -93,7 +97,6 @@ def to_datetime(time):
 
 
 def to_cfdatetime(time, calendar="standard"):
-    # time = str(int(time))
     try:
         return xr.cftime_range(start=time, end=time, calendar=calendar)[0]
     except Exception:
@@ -124,34 +127,20 @@ def cdo_call(self, options="", op="", input="", output="temp", print_command=True):
 
 
 class CFModelSelector:
-    def __init__(self, df=None, calendar="standard", **kwargs):
-        if df is None:
-            df = pd.read_csv(default_catalog)
+    """CF Model Selector
+
+    The CF model selector class simply selects files by a certain timestep.
+    """
+
+    def __init__(self, df, calendar="standard", **kwargs):
         df = df.copy()
         if kwargs:
             df = search_df(df, **kwargs)
         self.calendar = calendar
         self.tempfiles = []
         self.df = self._update_time(df)
 
-    def __repr__(self):
-        return repr(self._group())
-
-    def _repr_html_(self):
-        return self._group()._repr_html_()
-
-    def _group(self):
-        groups = ["source_id", "member_id", "experiment_id", "table_id"]
-        return self.df.groupby(groups)[
-            [
-                "variable_id",
-                # "member_id",
-                "institution_id",
-                # "table_id",
-                "activity_id",
-            ]
-        ].agg(["unique"])
-
     def _update_time(self, df):
         df["time_min"] = df["time_min"].apply(to_cfdatetime, calendar=self.calendar)
         df["time_max"] = df["time_max"].apply(to_cfdatetime, calendar=self.calendar)
@@ -161,17 +150,12 @@ def get_file(self, datetime=None, **kwargs):
         sel = get_var_by_time(self.df, datetime=datetime, **kwargs)
         if len(sel.index) > 1:
             return list(sel.path)
-            # raise Exception("file selection is not unique")
         if sel.empty:
             raise Exception("no file found: {}, date: {}".format(kwargs, datetime))
         return sel.iloc[0].path
 
-    # def _cdo_call(self, options="", op="", input="", output="temp", print_command=True):
-    #     cdo = Cdo(tempdir=self.scratch)
-    #     getattr(cdo, op)(options=options, input=input)
-
 
-def gfile(ds, ref_ds=None, tos=None, time_range=None, attrs=None, use_cftime=True):
+def gfile(ds, ref_ds=None, attrs=None, use_cftime=True):
     """Creates a global dataset ready for preprocessing.
 
     This function creates a homogenized global dataset. If neccessary,
@@ -188,12 +172,6 @@ def gfile(ds, ref_ds=None, tos=None, time_range=None, attrs=None, use_cftime=True):
     coordinates and the global attributes. If ``ref_ds=None``, ``ta`` from
     the input dataset is used as a reference.
-    tos : xarray.Dataset
-        Sea surface dataset.
-    time_rage :
-        The common time range from the input and sst that should be used.
     attrs:
         Global attributes for the output dataset. If ``attrs=None``, the global
         attributes from ``ref_ds`` are used.
@@ -205,19 +183,13 @@ def gfile(ds, ref_ds=None, tos=None, time_range=None, attrs=None, use_cftime=True):
     """
     if isinstance(ds, dict):
-        ds = open_datasets(ds, ref_ds, time_range)
-        if time_range is None:
-            time_range = ds.time
+        ds = open_datasets(ds, ref_ds)
     else:
         ds = ds.copy()
-        if time_range is None:
-            time_range = ds.time
-    ds = ds.sel(time=time_range)
-    ds["akgm"], ds["bkgm"] = get_vc2(ds)
+    ds["akgm"], ds["bkgm"] = get_vc(ds)
     ds = check_lev(ds)
-    # if tos is not None:
-    #     ds["tos"] = map_sst(tos, ds.sel(time=time_range))
-    # ds = ds.rename({"lev": lev_i})
 
+    # ensure correct units
     ds = convert_units(ds)
     if "sftlf" in ds:
         ds["sftlf"] = np.around(ds.sftlf)
@@ -251,8 +223,8 @@ def __init__(self, df=None, calendar=None, scratch=None, **kwargs):
             self.scratch = os.path.join(os.environ["SCRATCH"], ".cf-selector")
         except Exception:
             pass
-        # self.cdo = Cdo(tempdir=self.scratch)
-        self.regridder = None
 
+        self.tos_regridder = None
+
     def get_files(self, variables, datetime, **kwargs):
         datetime = to_cfdatetime(datetime, self.calendar)
@@ -266,6 +238,8 @@ def extract_timestep(self, datetime=None, **kwargs):
         pass
 
     def extract_dynamic_timesteps(self, datetime=None, **kwargs):
+        """Extract timesteps for a certain date from input files."""
+
         datetime = to_cfdatetime(datetime, self.calendar)
         files = self.get_files(self.dynamics, datetime=datetime, **kwargs)
         if datetime is None:
@@ -289,11 +263,12 @@ def extract_data(self, datetime, **kwargs):
                 for var, f in self.get_files(self.fx, datetime=None, **kwargs).items()
             }
         )
-        # files.update(self.get_files(self.sst, datetime=datetime, **kwargs))
+
         return xr.merge(files.values(), compat="override", join="override")
-        # return files
 
     def get_sst(self, datetime, atmo_grid=None):
+        """Extract and interpolate SST in space and time."""
+
         datetime = to_cfdatetime(datetime, self.calendar)
         times = get_sst_times(datetime)
         files = {
@@ -319,33 +294,34 @@ def get_sst(self, datetime, atmo_grid=None):
         return self.regrid_to_atmosphere(sst_da.squeeze(drop=True), atmo_grid)
 
     def regrid_to_atmosphere(self, da, atmo_grid):
+        """Regrid tos to atmoshperic grid."""
         import xesmf as xe
 
         attrs = da.attrs
         atmo_grid = atmo_grid.copy()
         atmo_grid["mask"] = ~(atmo_grid.sftlf > 0).squeeze(drop=True)
-        # if self.regridder is None:
+
         ds = da.to_dataset()
-        ds["mask"] = ~ds.tos.isnull().squeeze(drop=True)
-        print("creating regridder")
-        self.regridder = xe.Regridder(
-            ds, atmo_grid, method="nearest_s2d", extrap_method="nearest_s2d"
-        )
-        da = self.regridder(ds.tos)
-        da.attrs = attrs
-        return da
+
+        if not self.tos_regridder:
+            ds["mask"] = ~ds.tos.isnull().squeeze(drop=True)
+            print("creating tos regridder")
+            self.tos_regridder = xe.Regridder(
+                ds, atmo_grid, method="nearest_s2d", extrap_method="nearest_s2d"
+            )
+
+        out = self.tos_regridder(da)
+        out.attrs = attrs
+        return out
 
     def gfile(self, datetime, sst=True, **kwargs):
+        """Creates a gfile from CF input data."""
         gds = self.extract_data(datetime=datetime, **kwargs)
         if sst is True:
             gds[self.sst] = self.get_sst(datetime, gds)
         if "variable_id" in gds.attrs:
             del gds.attrs["variable_id"]
         return gfile(gds)
-        # gds = convert_units(gds)
-        # if "sftlf" in gds:
-        #     gds["sftlf"] = np.around(gds.sftlf)
-        # return gds
 
 
 def get_sst_times(dt):
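The regridder is now cached on the instance, so the expensive xesmf weight computation runs once and later calls reuse it. The pattern in isolation (a sketch, not pyremo API; it assumes every input arrives on the same source grid):

import xesmf as xe

class TosRegridderCache:
    def __init__(self):
        self.tos_regridder = None  # built lazily on first use

    def regrid(self, ds, atmo_grid):
        if self.tos_regridder is None:
            # first call: compute and cache the regridding weights
            self.tos_regridder = xe.Regridder(
                ds, atmo_grid, method="nearest_s2d", extrap_method="nearest_s2d"
            )
        # later calls reuse the cached weights
        return self.tos_regridder(ds.tos)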
@@ -363,7 +339,7 @@ def get_sst_times(dt):
     )
 
 
-def open_datasets(datasets, ref_ds=None, time_range=None):
+def open_datasets(datasets, ref_ds=None):
     """Creates a virtual gfile"""
     if ref_ds is None:
         try:
@@ -372,26 +348,25 @@
             raise Exception("ta is required in the datasets dict if no ref_ds is given")
     lon, lat = horizontal_dims(ref_ds)
     # ak_bnds, bk_bnds = get_ab_bnds(ref_ds)
-    if time_range is None:
-        time_range = ref_ds.time
     dsets = []
     for var, f in datasets.items():
         try:
             da = open_mfdataset(f, chunks={"time": 1})[var]
-            da = da.sel(time=time_range)
         except Exception:
             da = open_mfdataset(f, chunks={})[var]
         if "vertical" in da.cf:
             da = check_lev(da)
         dsets.append(da)
-    dsets += list(get_vc2(ref_ds))
+    dsets += list(get_vc(ref_ds))
     return xr.merge(dsets, compat="override", join="override")
 
 
 def get_gfile(scratch=None, **kwargs):
     if "df" in kwargs:
         df = kwargs["df"]
     else:
+        # create a file catalog containing files and their
+        # start and end times.
         files = create_catalog(**kwargs)
         data = dask.compute(files)
         df = create_df(data[0])
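Taken together, the catalog branch of get_gfile boils down to this flow (directory keywords and paths are hypothetical; per its new docstring, create_catalog maps variable names to directories to scan):

import dask

from pyremo.preproc.cf import CFModelSelector, create_catalog, create_df

files = create_catalog(ta="/pool/ta/", ua="/pool/ua/")  # scan directories
data = dask.compute(files)  # resolve delayed time_min/time_max lookups
df = create_df(data[0])  # catalog with path, time_min, time_max columns
sel = CFModelSelector(df)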
pyremo/preproc/constants.py (10 changes: 10 additions & 0 deletions)
@@ -2,3 +2,13 @@
 lev = "lev"
 lev_input = "lev_input"
 lev_gm = lev_input
+
+# variables that should have a mask with fill values
+fillvars = ["TSW", "SEAICE", "TSI"]
+
+
+class const:
+    """constants used for unit conversion"""
+
+    grav_const = 9.806805923
+    absolute_zero = 273.5
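For context, the new constants could back conversions like these (hypothetical helpers, not part of this commit):

from pyremo.preproc.constants import const, fillvars

def celsius_to_kelvin(t):
    # offset as defined on const (the module uses 273.5)
    return t + const.absolute_zero

def geopotential_to_height(phi):
    # geopotential (m2 s-2) to geopotential height (m)
    return phi / const.grav_const

print(fillvars)  # ['TSW', 'SEAICE', 'TSI'] get a fill-value mask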