Freezing rain events #1778 (Draft)
wants to merge 7 commits into base: main

Changes from 5 commits
6 changes: 6 additions & 0 deletions CHANGES.rst
@@ -6,10 +6,16 @@ v0.50.0 (unreleased)
--------------------
Contributors to this version: Trevor James Smith (:user:`Zeitsperre`), Éric Dupuis (:user:`coxipi`).

New indicators
^^^^^^^^^^^^^^
* New indicator ``freezing_rain_events`` gives statistics about freezing rain sequences. (:pull:`1778`).


New features and enhancements
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
* New properties: Bivariate Spell Length (``xclim.sdba.properties.bivariate_spell_length``), generalized spell lengths with an argument for `window`, and specific spell lengths with `window` fixed to 1 (``xclim.sdba.properties.threshold_count``, ``xclim.sdba.properties.bivariate_threshold_count``). (:pull:`1758`).
* New option `normalize` in ``sdba.measures.taylordiagram`` to obtain normalized Taylor diagrams, where standard deviations are divided by the standard deviation of the reference. (:pull:`1764`).
* ``xclim.indices.run_length.runs_with_holes`` accepts a condition that must be met for a run to start and a second condition that must be met for the run to stop. (:pull:`1778`).

Breaking changes
^^^^^^^^^^^^^^^^
28 changes: 27 additions & 1 deletion tests/test_indices.py
@@ -23,7 +23,7 @@
from xclim import indices as xci
from xclim.core.calendar import percentile_doy
from xclim.core.options import set_options
-from xclim.core.units import ValidationError, convert_units_to, units
+from xclim.core.units import ValidationError, convert_units_to, rate2amount, units

K2C = 273.15

@@ -338,6 +338,32 @@ def test_bedd(self, method, end_date, deg_days, max_deg_days):
if method == "icclim":
np.testing.assert_array_equal(bedd, bedd_high_lat)

    def test_freezing_rain_events(self, open_dataset):
        times = pd.date_range("1950-01-01", "1950-01-31", freq="D")
        da = xr.DataArray(
            np.zeros(len(times)),
            dims=["time"],
            coords={"time": times},
            attrs={"units": "kg m-2 s-1"},
        )

        # Two sequences separated by 3 days, which will be split with window_stop = 3
        da[0:10] = 1
        da[13:20] = 1
        out = xci.freezing_rain_events(
            da, thresh="0.5 kg m-2 s-1", window_start=3, window_stop=3
        )
        assert (out.run_lengths.values[0:2] == [10, 7]).all()

        # Two sequences separated by 2 days, which will form a single large event
        da[12] = 1
        out = xci.freezing_rain_events(
            da, thresh="0.5 kg m-2 s-1", window_start=3, window_stop=3
        )
        pram = rate2amount(da)
        assert out.run_lengths.values[0] == 20
        assert (out.cumulative_precipitation - pram.sum()).sum() == 0

    def test_cool_night_index(self, open_dataset):
        ds = open_dataset("cmip5/tas_Amon_CanESM2_rcp85_r1i1p1_200701-200712.nc")
        ds = ds.rename(dict(tas="tasmin"))
1 change: 1 addition & 0 deletions xclim/core/units.py
@@ -409,6 +409,7 @@ def cf_conversion(
FREQ_UNITS = {
    "D": "d",
    "W": "week",
    "h": "h",
}
"""
Resampling frequency units for :py:func:`xclim.core.units.infer_sampling_units`.
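With the new ``"h"`` entry, hourly data now maps to a valid time unit. A minimal sketch of the expected behaviour, assuming ``infer_sampling_units`` keeps returning a ``(multiplier, units)`` pair:

import pandas as pd
import xarray as xr
from xclim.core.units import infer_sampling_units

hourly = xr.DataArray(
    [0.0] * 24,
    dims="time",
    coords={"time": pd.date_range("2000-01-01", periods=24, freq="h")},
)
m, u = infer_sampling_units(hourly)  # the inferred frequency "h" now resolves via FREQ_UNITS
# m == 1, u == "h"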
121 changes: 120 additions & 1 deletion xclim/indices/_threshold.py
@@ -6,11 +6,12 @@
import numpy as np
import xarray

-from xclim.core.calendar import doy_from_string, get_calendar, select_time
+from xclim.core.calendar import doy_from_string, get_calendar, parse_offset, select_time
from xclim.core.missing import at_least_n_valid
from xclim.core.units import (
    convert_units_to,
    declare_units,
    ensure_cf_units,
    pint2cfunits,
    rate2amount,
    str2pint,
@@ -52,6 +53,7 @@
"first_day_temperature_above",
"first_day_temperature_below",
"first_snowfall",
"freezing_rain_events",
"frost_free_season_end",
"frost_free_season_length",
"frost_free_season_start",
@@ -2654,6 +2656,123 @@ def wetdays_prop(
    return fwd


@declare_units(pr="[precipitation]", thresh="[precipitation]")
def freezing_rain_events(
    pr: xarray.DataArray,
    thresh: Quantified = "1 kg m-2 d-1",
    window_start: int = 3,
    window_stop: int = 3,
    freq: str | None = None,
) -> xarray.Dataset:
    r"""Freezing rain events.

    Parameters
    ----------
    pr : xarray.DataArray
        Freezing precipitation (`prfr`).
    thresh : Quantified
        Threshold that must be exceeded to be considered an event.
    window_start : int
        Number of time steps above the threshold required to start an event.
    window_stop : int
        Number of time steps below the threshold required to stop an event.
    freq : str, optional
        Resampling frequency.

    Returns
    -------
    xarray.Dataset
        Dataset describing each event: its length (``run_lengths``), duration of precipitation
        (``precipitation_duration``), accumulated precipitation (``cumulative_precipitation``),
        start time (``start``), mean precipitation rate (``rate``), and the total number of
        events (``number_of_events``).
    """
    # NOTE: `freq` is not used yet in this draft; the time step is inferred from the data.
    inferred_freq = xarray.infer_freq(pr.time)
    mag, units, _, _ = parse_offset(inferred_freq)
    # Condition to respect for `window_start` time steps to start a run
    thresh = convert_units_to(thresh, pr)
    da_start = pr >= thresh
    da_stop = ~da_start

    # Get basic blocks to work with, our runs with holes and the lengths of those runs.
    # Series of ones indicating where we have continuous runs of freezing rain with pauses
    # not exceeding `window_stop`.
    runs = rl.runs_with_holes(da_start, window_start, da_stop, window_stop)

    # Compute the length of freezing rain events
    # (int16 should be safe enough for event lengths)
    ds = rl.rle(runs).to_dataset(name="run_lengths")
    ds["run_lengths"] = ds.run_lengths.astype(np.int16)
    ds.run_lengths.attrs["units"] = ""

    # Time duration where the precipitation threshold is exceeded during an event
    # (duration of complete run - duration of holes in the run)
    ds["precipitation_duration"] = (
        rl._cumsum_reset(
            da_start.where(runs == 1), index="first", reset_on_zero=False
        ).astype(np.int16)
        * mag
    )
    ds["precipitation_duration"].attrs["units"] = units

    # Accumulated precipitation in a given freezing rain event
    pram = rate2amount(pr)
    ds["cumulative_precipitation"] = rl._cumsum_reset(
        pram.where(runs == 1), index="first", reset_on_zero=False
    )
    ds["cumulative_precipitation"].attrs["units"] = pram.units

    # Keep time as a variable; it will be used to keep the start of events.
    ds["start"] = ds["time"].broadcast_like(ds)
    # `start` must be converted to an integer for the filtering below: datetime objects
    # won't pass through `apply_ufunc`. Earlier steps already require a datetime
    # coordinate, so assuming one here is safe.
    time_min = ds.start.min()
    ds["start"] = (ds.start - time_min).astype("timedelta64[s]").astype(int)

    # Filter events: reduce the time dimension
    def _filter_events(da, rl, max_event_number):
        out = np.full(max_event_number, np.NaN)
        events_start = da[rl > 0]
        out[: len(events_start)] = events_start
        return out

    max_event_number = int(np.ceil(pr.time.size / (window_start + window_stop)))
    v_attrs = {v: ds[v].attrs for v in ds.data_vars}
    ds = xarray.apply_ufunc(
        _filter_events,
        ds,
        ds.run_lengths,
        input_core_dims=[["time"], ["time"]],
        output_core_dims=[["event"]],
        kwargs=dict(max_event_number=max_event_number),
        output_sizes={"event": max_event_number},
        dask="parallelized",
        vectorize=True,
    ).assign_attrs(ds.attrs)

[Review comment — Contributor Author]
This change filters the events and reorders this dimension. It requires `xr.apply_ufunc` and a contiguous time dimension, though I don't think the previous implementation would have worked without a contiguous time dimension anyway.

However! The rest of the code uses calls to `run_length`, and I believe that part can be done with chunks over the time dimension. Another approach could be to keep all time coordinates at first and allow, for instance, yearly time chunks, then save the output. In a second pass, the events would be filtered using the `apply_ufunc` above.
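A rough sketch of that two-pass idea (the file name and chunking choices are illustrative, not part of this PR):

# Pass 1: run-length computations, which the comment above suggests tolerate time chunks.
runs = rl.runs_with_holes(da_start, window_start, da_stop, window_stop)
raw = rl.rle(runs).to_dataset(name="run_lengths")
raw.to_netcdf("events_raw.nc")  # hypothetical intermediate output

# Pass 2: reload with a contiguous time dimension, then filter events
# with the `apply_ufunc` call above.
raw = xarray.open_dataset("events_raw.nc", chunks={"time": -1})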

ds["event"] = np.arange(1, ds.event.size + 1)
for v in ds.data_vars:
ds[v].attrs = v_attrs[v]

# convert back start to a time
ds["start"] = time_min.astype("datetime64[ns]") + ds["start"].astype(
"timedelta64[ns]"
)

    # Other statistics that could be computed entirely outside of this function;
    # no further input is needed.

    # Number of events
    ds["number_of_events"] = (ds["run_lengths"] > 0).sum(dim="event").astype(np.int16)
    ds.number_of_events.attrs["units"] = ""

    # Mean rate of precipitation during each event
    ds["rate"] = ds["cumulative_precipitation"] / ds["precipitation_duration"]
    units = (
        f"{ds['cumulative_precipitation'].units}/{ds['precipitation_duration'].units}"
    )
    ds["rate"].attrs["units"] = ensure_cf_units(units)

    ds.attrs["units"] = ""
    return ds
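For orientation, a minimal usage sketch of the new index (``prfr`` stands for a freezing precipitation flux, as in the docstring; the call mirrors the test above):

out = freezing_rain_events(prfr, thresh="1 kg m-2 d-1", window_start=3, window_stop=3)
out["number_of_events"]     # events detected at each location
out["start"].isel(event=0)  # start time of the first event
out["rate"]                 # mean precipitation rate during each event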


@declare_units(tasmin="[temperature]", thresh="[temperature]")
def maximum_consecutive_frost_days(
    tasmin: xarray.DataArray,
34 changes: 28 additions & 6 deletions xclim/indices/run_length.py
@@ -118,10 +118,11 @@ def resample_and_rl(
    return out


-def _cumsum_reset_on_zero(
+def _cumsum_reset(
    da: xr.DataArray,
    dim: str = "time",
    index: str = "last",
    reset_on_zero: bool = True,
) -> xr.DataArray:
    """Compute the cumulative sum for each series of numbers separated by zero.

@@ -134,6 +135,9 @@ def _cumsum_reset_on_zero(
    index : {'first', 'last'}
        If 'first', the largest value of the cumulative sum is indexed with the first element in the run.
        If 'last' (default), with the last element in the run.
    reset_on_zero : bool
        If True, the cumulative sum is reset on each zero value of `da`. Otherwise, the cumulative sum resets
        on NaNs. Default is True.

    Returns
    -------
@@ -145,7 +149,10 @@

    # Example: da == 100110111 -> cs_s == 100120123
    cs = da.cumsum(dim=dim)  # cumulative sum e.g. 111233456
    cond = da == 0 if reset_on_zero else da.isnull()  # reset condition
    cs2 = cs.where(cond)  # keep only numbers at positions of zeroes e.g. N11NN3NNN (default)
    cs2[{dim: 0}] = 0  # put a zero in front e.g. 011NN3NNN
    cs2 = cs2.ffill(dim=dim)  # e.g. 011113333
    out = cs - cs2
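A quick illustration of the reset logic (a sketch, not part of the diff), following the example in the comments above:

da = xr.DataArray([1, 0, 0, 1, 1, 0, 1, 1, 1], dims="time")
_cumsum_reset(da)                 # -> [1, 0, 0, 1, 2, 0, 1, 2, 3]
_cumsum_reset(da, index="first")  # -> [1, 0, 0, 2, 1, 0, 3, 2, 1]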
@@ -186,7 +193,7 @@ def rle(
        da = da[{dim: slice(None, None, -1)}]

    # Get cumulative sum for each series of 1, e.g. da == 100110111 -> cs_s == 100120123
-    cs_s = _cumsum_reset_on_zero(da, dim)
+    cs_s = _cumsum_reset(da, dim)

    # Keep total length of each series (and also keep 0's), e.g. 100120123 -> 100N20NN3
    # Keep numbers with a 0 to the right and also the last number
@@ -495,7 +502,7 @@ def find_boundary_run(runs, position):

    else:
        # _cumsum_reset() is an intermediate step in rle, which is sufficient here
-        d = _cumsum_reset_on_zero(da, dim=dim, index=position)
+        d = _cumsum_reset(da, dim=dim, index=position)
        d = xr.where(d >= window, 1, 0)
        # for "first" run, return "first" element in the run (and conversely for "last" run)
        if freq is not None:
@@ -744,8 +751,8 @@ def extract_events(
    da_start = da_start.astype(int).fillna(0)
    da_stop = da_stop.astype(int).fillna(0)

-    start_runs = _cumsum_reset_on_zero(da_start, dim=dim, index="first")
-    stop_runs = _cumsum_reset_on_zero(da_stop, dim=dim, index="first")
+    start_runs = _cumsum_reset(da_start, dim=dim, index="first")
+    stop_runs = _cumsum_reset(da_stop, dim=dim, index="first")
    start_positions = xr.where(start_runs >= window_start, 1, np.NaN)
    stop_positions = xr.where(stop_runs >= window_stop, 0, np.NaN)

@@ -755,6 +762,21 @@
    return runs


def runs_with_holes(
    da_start: xr.DataArray,
    window_start: int,
    da_stop: xr.DataArray,
    window_stop: int,
    dim: str = "time",
) -> xr.DataArray:
    """Extract runs that start when `da_start` is True for `window_start` time steps,
    and stop when `da_stop` is True for `window_stop` time steps, holes included.
    """
    da_start = da_start.astype(int).fillna(0)
    da_stop = da_stop.astype(int).fillna(0)

    start_runs = _cumsum_reset(da_start, dim=dim, index="first")
    stop_runs = _cumsum_reset(da_stop, dim=dim, index="first")
    start_positions = xr.where(start_runs >= window_start, 1, np.NaN)
    stop_positions = xr.where(stop_runs >= window_stop, 0, np.NaN)

    # start positions (1) are f-filled until a stop position (0) is met
    runs = stop_positions.combine_first(start_positions).ffill(dim=dim).fillna(0)
    return runs
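A small worked example with `window_start=2` and `window_stop=2` (illustrative values, not from the test suite): a one-step hole inside a run is bridged, and the run ends at the first step of the stopping window.

wet = xr.DataArray([1, 1, 0, 1, 0, 0, 0, 1], dims="time")
runs_with_holes(wet == 1, 2, wet == 0, 2)
# -> [1, 1, 1, 1, 0, 0, 0, 0]: the hole at step 2 is bridged, the run
#    stops at step 4 (first of two dry steps), and the single wet step
#    at the end is too short to start a new run.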


def season(
    da: xr.DataArray,
    window: int,