Skip to content

Commit

Permalink
local trend filling strategy finished. tests almost finished
Browse files Browse the repository at this point in the history
  • Loading branch information
JGuetschow committed Nov 28, 2024
1 parent 10155ae commit 87d0746
Show file tree
Hide file tree
Showing 4 changed files with 153 additions and 90 deletions.
30 changes: 15 additions & 15 deletions primap2/csg/_strategies/gaps.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,13 +201,13 @@ def calculate_boundary_trend_with_fallback(
gap=gap,
fit_params=fit_params,
)
if not all(trend_ts):
if any(np.isnan(trend_ts)):
trend_ts = calculate_boundary_trend(
ts,
gap=gap,
fit_params=fit_params.get_fallback(),
)
if not all(trend_ts):
if any(np.isnan(trend_ts)):
logger.info(
f"Not enough values to calculate fit for ts and gap:"
f"{gap.type}, [{gap.left}:{gap.right}].\n"
Expand Down Expand Up @@ -276,14 +276,14 @@ def calculate_boundary_trend(
else:
raise ValueError(f"Unknown gap type: {gap.type}")

return [left, right]
return np.array([left, right])


def calculate_right_boundary_trend(
ts: xr.DataArray,
boundary: np.datetime64,
fit_params: FitParameters,
) -> float | None:
) -> float:
"""
Replace right boundary point by trend value
Expand Down Expand Up @@ -330,14 +330,14 @@ def calculate_right_boundary_trend(
f"{fit_params.log_string(fallback=False)}"
f"Timeseries info: {timeseries_coord_repr(ts)}"
)
return None
return np.nan


def calculate_left_boundary_trend(
ts: xr.DataArray,
boundary: np.datetime64,
fit_params: FitParameters,
) -> float | None:
) -> float:
"""
Replace left boundary point by trend value
Expand Down Expand Up @@ -385,7 +385,7 @@ def calculate_left_boundary_trend(
f"{fit_params.log_string(fallback=False)}"
f"Timeseries info: {timeseries_coord_repr(ts)}"
)
return None
return np.nan


def calculate_scaling_factor(
Expand Down Expand Up @@ -424,27 +424,27 @@ def calculate_scaling_factor(
gap=gap,
fit_params=fit_params,
)
if not all(trend_ts):
if any(np.isnan(trend_ts)):
# logging has been done already
return None
return np.array([np.nan, np.nan])

# trend values for fill_ts
trend_fill = calculate_boundary_trend_with_fallback(
fill_ts,
gap=gap,
fit_params=fit_params,
)
if not all(trend_fill):
if any(np.isnan(trend_fill)):
# logging has been done already
return None
return np.array([np.nan, np.nan])

factor = np.divide(trend_ts, trend_fill)
if not all(factor):
# we have some nan values which have to come from division by zero
if any(np.isnan(factor)) or any(np.isinf(factor)):
# we have some nan or inf values which have to come from division by zero
# we fill them with 0 in case the trend values are zero as well
nan_mask_factor = np.isnan(factor)
nan_mask_factor = np.isnan(factor) | np.isinf(factor)
zero_mask_ts = trend_ts == 0
factor[nan_mask_factor and zero_mask_ts] = trend_ts[nan_mask_factor and zero_mask_ts]
factor[nan_mask_factor & zero_mask_ts] = trend_ts[nan_mask_factor & zero_mask_ts]

return factor

Expand Down
91 changes: 62 additions & 29 deletions primap2/csg/_strategies/local_trends.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import numpy as np
import xarray as xr
from attrs import frozen

Expand Down Expand Up @@ -143,65 +144,97 @@ def fill(
filled how.
"""
filled_mask = ts.isnull() & ~fill_ts.isnull()
time_filled = filled_mask["time"][filled_mask].to_numpy()
time_fillable = filled_mask["time"][filled_mask].to_numpy()
# overlap = ts.notnull() & fill_ts.notnull() # commented for linting to pass

if time_filled.any():
# TODO implement boundary and gap filling
if time_fillable.any():
any_filled = False
time_filled = np.array([], dtype=np.datetime64)
description = (
f"filled with local trend matched data from {fill_ts_repr}. "
f"The following gaps have been filled:"
)
gaps = get_gaps(ts)
filled_ts = ts.copy()
for gap in gaps:
gap_description = (
f" gap {np.datetime_as_string(gap.left, unit='h')}"
f" - {np.datetime_as_string(gap.right, unit='h')}:"
)
# check if we have information for the specific gap
filled_mask_gap = filled_mask.pr.loc[gap.get_date_slice()]
time_filled_gap = filled_mask_gap["time"][filled_mask_gap].to_numpy()

if time_filled_gap.any():
any_filled = True
# get factor
factor = calculate_scaling_factor(
ts=ts,
fill_ts=fill_ts,
gap=gap,
fit_params=self.fit_params,
)
if any(factor < 0) and not self.allow_negative:
print("not implemented")
# TODO: fallback and log message if that fails as well
# multiply fill_ts by factor
# check if positive or negative allowed. if true proceed, if false
# use fallback

# fill nans (in the gap only)
filled_ts = fill_gap(ts=ts, fill_ts=fill_ts, gap=gap, factor=factor)
# it would be more consistent to handle the negative value fallback in
# calculate_scaling_factor as well. It comes with the drawback that
# it can't be controlled from the filling function and that we have
# to deal with different return values here

# update description
if factor[0] == factor[1]:
description = (
description + f" gap {gap.left!s}-{gap.right!s} for times "
f"{time_filled_gap} using factor {factor[0]};"
if any(factor < 0) and not self.allow_negative:
factor = calculate_scaling_factor(
ts=ts,
fill_ts=fill_ts,
gap=gap,
fit_params=self.fit_params.get_fallback(),
)
else:
description = (
description + f" gap {gap.left!s}-{gap.right!s} for times "
f"{time_filled_gap} using factors {factor[0]} "
f"and {factor[1]};"
gap_description = (
gap_description + f" negative scaling factor - use fallback degree "
f"{self.fit_params.fallback_degree}"
)

# e = fill_ts[overlap.data].data
# e_ref = ts[overlap.data].data
# a0 = [1] # start with 1 as scaling factor
# res = least_squares(self._factor_mult, a0, jac=self._jac, args=(e, e_ref))
#
# fill_ts_h = fill_ts * res["x"][0]
# filled_ts = xr.core.ops.fillna(ts, fill_ts_h, join="exact")
if any(factor < 0) and not self.allow_negative:
# negative with fallback. fail to fill gap
gap_description = (
gap_description
+ " negative scaling after fallback - failed to fill gap;"
)
else:
if any(np.isnan(factor)):
# fail because no factor can be calculated
gap_description = (
gap_description + " scaling factor is nan - failed to fill gap;"
)
else:
any_filled = True
time_filled = np.concatenate((time_filled, time_filled_gap))

# fill nans (in the gap only)
filled_ts = fill_gap(
ts=filled_ts, fill_ts=fill_ts, gap=gap, factor=factor
)

if factor[0] == factor[1]:
gap_description = (
gap_description + f" filled for times "
f"{np.datetime_as_string(time_filled_gap, unit='h')} "
f"using factor {factor[0]};"
)
else:
gap_description = (
gap_description + f" filled for times "
f"{np.datetime_as_string(time_filled_gap, unit='h')} "
f"using factors {factor[0]} and {factor[1]};"
)

# update description
description = description + gap_description

if any_filled:
descriptions = [
primap2.ProcessingStepDescription(
time=time_filled,
description=description,
# Factor={res['x'][0]:0.3f}",
function=self.type,
source=fill_ts_repr,
)
Expand All @@ -216,7 +249,7 @@ def fill(
filled_ts = ts
descriptions = [
primap2.ProcessingStepDescription(
time=time_filled,
time=np.array([], dtype=np.datetime64),
description=f"no additional data in {fill_ts_repr}",
function=self.type,
source=fill_ts_repr,
Expand Down
34 changes: 30 additions & 4 deletions primap2/tests/csg/test_gaps.py
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ def test_calculate_left_boundary_trend(test_ts, fit_params_linear, fit_params_co
trend_value = calculate_left_boundary_trend(
test_ts, boundary=gaps[1].left, fit_params=fit_params_linear
)
assert trend_value is None
assert np.isnan(trend_value)

log_str = (
"Not enough values to calculate fit for left boundary at "
Expand Down Expand Up @@ -290,7 +290,7 @@ def test_calculate_right_boundary_trend(test_ts, fit_params_linear, fit_params_c
boundary=gaps[0].right,
fit_params=fit_params_linear,
)
assert trend_value is None
assert np.isnan(trend_value)

log_str = (
"Not enough values to calculate fit for right boundary at "
Expand Down Expand Up @@ -397,13 +397,39 @@ def test_calculate_boundary_trend_with_fallback(test_ts, fit_params_linear):
assert np.allclose(trend_values, expected_constant, rtol=1e-04)


def test_calculate_scaling_factor(test_ts, fill_ts, fit_params_linear):
def test_calculate_scaling_factor(test_ts, fill_ts, fit_params_linear, caplog):
gaps = get_gaps(test_ts)
# TODO: special cases, fallback etc

factor = calculate_scaling_factor(
ts=test_ts, fill_ts=fill_ts, fit_params=fit_params_linear, gap=gaps[2]
)
assert np.allclose([0.33333, 1], factor)
# as fallback will be used we also check for the log message (not strictly necessary
# to test his as it's raise by a different function)
assert (
"Not enough values to calculate fit for right boundary "
"at 1973-01-01T00:00:00.000000000.\nfit_degree: 1, trend_length: 10, "
"trend_length_unit: YS, min_trend_points: 5.\n"
"Timeseries info: {'category': 'test'}" in caplog.text
)

# zero for fill_ts only result should be nan
fill_ts.pr.loc[{"time": pd.date_range("1956-01-01", "1966-01-01", freq="YS")}] = (
fill_ts.pr.loc[{"time": pd.date_range("1956-01-01", "1966-01-01", freq="YS")}] * 0
)
factor = calculate_scaling_factor(
ts=test_ts, fill_ts=fill_ts, fit_params=fit_params_linear, gap=gaps[0]
)
assert all(np.isinf(factor))

# zero for both ts: result should be 0
test_ts.pr.loc[{"time": pd.date_range("1956-01-01", "1966-01-01", freq="YS")}] = (
test_ts.pr.loc[{"time": pd.date_range("1956-01-01", "1966-01-01", freq="YS")}] * 0
)
factor = calculate_scaling_factor(
ts=test_ts, fill_ts=fill_ts, fit_params=fit_params_linear, gap=gaps[0]
)
assert np.allclose(factor, [0, 0])


def test_fill_gap(test_ts, fill_ts, expected_ts, fit_params_linear):
Expand Down
Loading

0 comments on commit 87d0746

Please sign in to comment.