first pass to generalize
aulemahal committed Sep 6, 2024
1 parent d61182c commit d556d95
Showing 2 changed files with 132 additions and 121 deletions.
121 changes: 1 addition & 120 deletions xclim/indices/_threshold.py
@@ -8,12 +8,11 @@
import xarray

from xclim.core import DayOfYearStr, Quantified
from xclim.core.calendar import doy_from_string, get_calendar, parse_offset
from xclim.core.calendar import doy_from_string, get_calendar
from xclim.core.missing import at_least_n_valid
from xclim.core.units import (
    convert_units_to,
    declare_units,
    ensure_cf_units,
    pint2cfunits,
    rate2amount,
    str2pint,
@@ -55,7 +54,6 @@
"first_day_temperature_above",
"first_day_temperature_below",
"first_snowfall",
"freezing_rain_events",
"frost_free_season_end",
"frost_free_season_length",
"frost_free_season_start",
@@ -2624,123 +2622,6 @@ def wetdays_prop(
    return fwd


@declare_units(pr="[precipitation]", thresh="[precipitation]")
def freezing_rain_events(
    pr: xarray.DataArray,
    thresh: Quantified = "1 kg m-2 d-1",
    window_start: int = 3,
    window_stop: int = 3,
    freq: str | None = None,
) -> xarray.Dataset:
    r"""Freezing rain events.

    Parameters
    ----------
    pr : xarray.DataArray
        Freezing precipitation (`prfr`).
    thresh : Quantified
        Threshold that must be exceeded to be considered an event.
    window_start : int
        Number of time steps above the threshold required to start an event.
    window_stop : int
        Number of time steps below the threshold required to stop an event.
    freq : str
        Resampling frequency.

    Returns
    -------
    xarray.Dataset
    """
    freq = xarray.infer_freq(pr.time)
    mag, units, _, _ = parse_offset(freq)
    # Condition to respect for `window_start` time steps to start a run
    thresh = convert_units_to(thresh, pr)
    da_start = pr >= thresh
    da_stop = (1 - da_start).astype(bool)

    # Get the basic blocks to work with: our runs with holes and the lengths of those runs.
    # Series of ones indicating where we have continuous runs of freezing rain with pauses
    # not exceeding `window_stop`
    runs = rl.runs_with_holes(da_start, window_start, da_stop, window_stop)

    # Compute the length of freezing rain events
    # int16 is safe enough, run lengths are bounded by the size of the time axis
    ds = rl.rle(runs).to_dataset(name="run_lengths")
    ds["run_lengths"] = ds.run_lengths.astype(np.int16)
    ds.run_lengths.attrs["units"] = ""

    # Time duration where the precipitation threshold is exceeded during an event
    # (duration of the complete run minus the duration of the holes in the run)
    ds["precipitation_duration"] = (
        rl._cumsum_reset(
            da_start.where(runs == 1), index="first", reset_on_zero=False
        ).astype(np.int16)
        * mag
    )
    ds["precipitation_duration"].attrs["units"] = units

    # Cumulated precipitation in a given freezing rain event
    pram = rate2amount(pr)
    ds["cumulative_precipitation"] = rl._cumsum_reset(
        pram.where(runs == 1), index="first", reset_on_zero=False
    )
    ds["cumulative_precipitation"].attrs["units"] = pram.units

    # Keep time as a variable, it will be used to keep the start of events
    ds["start"] = ds["time"].broadcast_like(ds)
    # Convert the starts to integers for the filtering, a time object won't do.
    # Since earlier conversions already needed a time object, this is a safe assumption.
    time_min = ds.start.min()
    ds["start"] = (ds.start - time_min).astype("timedelta64[s]").astype(int)

    # Filter events: reduce the time dimension
    def _filter_events(da, rl, max_event_number):
        out = np.full(max_event_number, np.nan)
        events_start = da[rl > 0]
        out[: len(events_start)] = events_start
        return out

    max_event_number = int(np.ceil(pr.time.size / (window_start + window_stop)))
    v_attrs = {v: ds[v].attrs for v in ds.data_vars}
    ds = xarray.apply_ufunc(
        _filter_events,
        ds,
        ds.run_lengths,
        input_core_dims=[["time"], ["time"]],
        output_core_dims=[["event"]],
        kwargs=dict(max_event_number=max_event_number),
        output_sizes={"event": max_event_number},
        dask="parallelized",
        vectorize=True,
    ).assign_attrs(ds.attrs)

    ds["event"] = np.arange(1, ds.event.size + 1)
    for v in ds.data_vars:
        ds[v].attrs = v_attrs[v]

    # Convert the start back to a time
    ds["start"] = time_min.astype("datetime64[ns]") + ds["start"].astype(
        "timedelta64[ns]"
    )

    # Other indices that could be computed entirely outside of the function, no further input needed

    # number of events
    ds["number_of_events"] = (ds["run_lengths"] > 0).sum(dim="event").astype(np.int16)
    ds.number_of_events.attrs["units"] = ""

    # mean rate of precipitation during an event
    ds["rate"] = ds["cumulative_precipitation"] / ds["precipitation_duration"]
    units = (
        f"{ds['cumulative_precipitation'].units}/{ds['precipitation_duration'].units}"
    )
    ds["rate"].attrs["units"] = ensure_cf_units(units)

    ds.attrs["units"] = ""
    return ds


@declare_units(tasmin="[temperature]", thresh="[temperature]")
def maximum_consecutive_frost_days(
    tasmin: xarray.DataArray,
132 changes: 131 additions & 1 deletion xclim/indices/generic.py
@@ -18,12 +18,18 @@
from xarray.coding.cftime_offsets import _MONTH_ABBREVIATIONS # noqa

from xclim.core import DayOfYearStr, Quantified
from xclim.core.calendar import doy_to_days_since, get_calendar, select_time
from xclim.core.calendar import (
    doy_to_days_since,
    get_calendar,
    parse_offset,
    select_time,
)
from xclim.core.units import (
    convert_units_to,
    declare_relative_units,
    infer_context,
    pint2cfunits,
    rate2amount,
    str2pint,
    to_agg_units,
)
@@ -1474,3 +1480,127 @@ def detrend(
    trend = xr.polyval(ds[dim], coeff.polyfit_coefficients)
    with xr.set_options(keep_attrs=True):
        return ds - trend


@declare_relative_units(thresh="<data>")
def thresholded_events(
    data: xr.DataArray,
    thresh: Quantified = "1 kg m-2 d-1",
    window_start: int = 3,
    window_stop: int = 3,
    freq: str | None = None,
    data_is_rate: bool = False,
) -> xr.Dataset:
    r"""Thresholded events.

    Splits the time series into events, i.e. runs of at least `window_start`
    time steps over the threshold that end after `window_stop` steps under it,
    and computes basic statistics for each event.

    Parameters
    ----------
    data : xr.DataArray
        Variable.
    thresh : Quantified
        Threshold that must be exceeded to be considered an event.
    window_start : int
        Number of time steps above the threshold required to start an event.
    window_stop : int
        Number of time steps below the threshold required to stop an event.
    freq : str, optional
        Frequency of the time coordinate. Inferred from the data if not given.
    data_is_rate : bool
        True if the data is a rate that needs to be converted to an amount
        when computing an accumulation.

    Returns
    -------
    xr.Dataset
        Dataset with an `event` dimension giving, for each event, its run length,
        its duration above the threshold, its accumulation and its start time.
    """
    freq = freq or xr.infer_freq(data.time)
    mag, units, _, _ = parse_offset(freq)
    # Condition to respect for `window_start` time steps to start a run
    thresh = convert_units_to(thresh, data)
    da_start = data >= thresh
    da_stop = ~da_start
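    # `da_start` flags steps that can start or sustain an event; `da_stop` flags steps counting towards its end.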

    # Get the basic blocks to work with: our runs with holes and the lengths of those runs.
    # Series of ones indicating where we have continuous runs of above-threshold values
    # with pauses not exceeding `window_stop`
    runs = rl.runs_with_holes(da_start, window_start, da_stop, window_stop)
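    # `runs` is 1 at every step belonging to an event, including the short sub-threshold holes,
    # which is why in-event steps are selected with `.where(runs == 1)` below.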

    # Compute the length of each event
    # int16 is safe enough, run lengths are bounded by the size of the time axis
    ds = rl.rle(runs).astype(np.int16).to_dataset(name="run_lengths")
    ds.run_lengths.attrs["units"] = ""

    # Time duration where the threshold is exceeded during an event
    # (duration of the complete run minus the duration of the holes in the run)
    ds["effective_duration"] = (
        rl._cumsum_reset(
            da_start.where(runs == 1), index="first", reset_on_zero=False
        ).astype(np.int16)
        * mag
    )
    ds["effective_duration"].attrs["units"] = units

    # Data accumulated over each event
    if data_is_rate:
        dataam = rate2amount(data)
    else:
        dataam = data
    ds["event_accumulation"] = rl._cumsum_reset(
        dataam.where(runs == 1), index="first", reset_on_zero=False
    )
    ds["event_accumulation"].attrs["units"] = dataam.units

    # Keep time as a variable, it will be used to keep the start of events
    ds["start"] = ds["time"].broadcast_like(ds)
    # Convert the starts to integers (seconds since the earliest time) for the filtering,
    # a time object won't do. Earlier conversions already needed a time object, so this is safe.
    time_min = ds.start.min()
    ds["start"] = (ds.start - time_min).astype("timedelta64[s]").astype(int)

    # Filter events: reduce the time dimension, keeping one value per detected event
    def _filter_events(da, rl, max_event_number):
        out = np.full(max_event_number, np.nan)
        events_start = da[rl > 0]
        out[: len(events_start)] = events_start
        return out

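    # Upper bound on the number of events: each event needs at least `window_start`
    # steps above the threshold plus `window_stop` steps below it to terminate.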
    max_event_number = int(np.ceil(data.time.size / (window_start + window_stop)))
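    # `apply_ufunc` drops variable attributes by default; save them here and restore them after the reduction.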
    v_attrs = {v: ds[v].attrs for v in ds.data_vars}
    ds = xr.apply_ufunc(
        _filter_events,
        ds,
        ds.run_lengths,
        input_core_dims=[["time"], ["time"]],
        output_core_dims=[["event"]],
        kwargs=dict(max_event_number=max_event_number),
        dask_gufunc_kwargs=dict(output_sizes={"event": max_event_number}),
        dask="parallelized",
        vectorize=True,
    ).assign_attrs(ds.attrs)

ds["event"] = np.arange(1, ds.event.size + 1)
for v in ds.data_vars:
ds[v].attrs = v_attrs[v]

    # Convert the start back to a time
    # TODO: fix for non-standard calendars
    ds["start"] = time_min.astype("datetime64[ns]") + ds["start"].astype(
        "timedelta64[ns]"
    )
    return ds

# # Other indices that could be computed entirely outside of this function, no further input needed
# # number of events
# ds["number_of_events"] = (ds["run_lengths"] > 0).sum(dim="event").astype(np.int16)
# ds.number_of_events.attrs["units"] = ""

# # mean rate during an event
# ds["rate"] = ds["event_accumulation"] / ds["effective_duration"]
# units = f"{ds['event_accumulation'].units}/{ds['effective_duration'].units}"
# ds["rate"].attrs["units"] = ensure_cf_units(units)

# ds.attrs["units"] = ""
# return ds
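
For reference, a minimal usage sketch of the new helper on synthetic data (the series, threshold and window sizes below are illustrative, not part of the commit):

import pandas as pd
import xarray as xr

from xclim.indices.generic import thresholded_events

# Ten days of synthetic precipitation rates with two bursts above 1 kg m-2 d-1
pr = xr.DataArray(
    [0.0, 2.0, 3.0, 0.5, 2.0, 0.0, 0.0, 0.0, 4.0, 4.0],
    dims="time",
    coords={"time": pd.date_range("2000-01-01", periods=10, freq="D")},
    attrs={"units": "kg m-2 d-1"},
)

# An event starts after 2 steps at or above the threshold and stops after 3 steps below it
events = thresholded_events(
    pr, thresh="1 kg m-2 d-1", window_start=2, window_stop=3, data_is_rate=True
)

# One row per event: run length, effective duration, accumulation and start time
print(events.run_lengths.values)
print(events.start.values)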
