From 87d07468983f846346850892730a7acf51c16915 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johannes=20G=C3=BCtschow?= Date: Thu, 28 Nov 2024 12:26:44 +0100 Subject: [PATCH] local trend filling strategy finished. tests almost finished --- primap2/csg/_strategies/gaps.py | 30 ++++---- primap2/csg/_strategies/local_trends.py | 91 +++++++++++++++++-------- primap2/tests/csg/test_gaps.py | 34 +++++++-- primap2/tests/csg/test_strategies.py | 88 ++++++++++++------------ 4 files changed, 153 insertions(+), 90 deletions(-) diff --git a/primap2/csg/_strategies/gaps.py b/primap2/csg/_strategies/gaps.py index 69f57d8..e02b731 100644 --- a/primap2/csg/_strategies/gaps.py +++ b/primap2/csg/_strategies/gaps.py @@ -201,13 +201,13 @@ def calculate_boundary_trend_with_fallback( gap=gap, fit_params=fit_params, ) - if not all(trend_ts): + if any(np.isnan(trend_ts)): trend_ts = calculate_boundary_trend( ts, gap=gap, fit_params=fit_params.get_fallback(), ) - if not all(trend_ts): + if any(np.isnan(trend_ts)): logger.info( f"Not enough values to calculate fit for ts and gap:" f"{gap.type}, [{gap.left}:{gap.right}].\n" @@ -276,14 +276,14 @@ def calculate_boundary_trend( else: raise ValueError(f"Unknown gap type: {gap.type}") - return [left, right] + return np.array([left, right]) def calculate_right_boundary_trend( ts: xr.DataArray, boundary: np.datetime64, fit_params: FitParameters, -) -> float | None: +) -> float: """ Replace right boundary point by trend value @@ -330,14 +330,14 @@ def calculate_right_boundary_trend( f"{fit_params.log_string(fallback=False)}" f"Timeseries info: {timeseries_coord_repr(ts)}" ) - return None + return np.nan def calculate_left_boundary_trend( ts: xr.DataArray, boundary: np.datetime64, fit_params: FitParameters, -) -> float | None: +) -> float: """ Replace left boundary point by trend value @@ -385,7 +385,7 @@ def calculate_left_boundary_trend( f"{fit_params.log_string(fallback=False)}" f"Timeseries info: {timeseries_coord_repr(ts)}" ) - return None + return np.nan def calculate_scaling_factor( @@ -424,9 +424,9 @@ def calculate_scaling_factor( gap=gap, fit_params=fit_params, ) - if not all(trend_ts): + if any(np.isnan(trend_ts)): # logging has been done already - return None + return np.array([np.nan, np.nan]) # trend values for fill_ts trend_fill = calculate_boundary_trend_with_fallback( @@ -434,17 +434,17 @@ def calculate_scaling_factor( gap=gap, fit_params=fit_params, ) - if not all(trend_fill): + if any(np.isnan(trend_fill)): # logging has been done already - return None + return np.array([np.nan, np.nan]) factor = np.divide(trend_ts, trend_fill) - if not all(factor): - # we have some nan values which have to come from division by zero + if any(np.isnan(factor)) or any(np.isinf(factor)): + # we have some nan or inf values which have to come from division by zero # we fill them with 0 in case the trend values are zero as well - nan_mask_factor = np.isnan(factor) + nan_mask_factor = np.isnan(factor) | np.isinf(factor) zero_mask_ts = trend_ts == 0 - factor[nan_mask_factor and zero_mask_ts] = trend_ts[nan_mask_factor and zero_mask_ts] + factor[nan_mask_factor & zero_mask_ts] = trend_ts[nan_mask_factor & zero_mask_ts] return factor diff --git a/primap2/csg/_strategies/local_trends.py b/primap2/csg/_strategies/local_trends.py index f989986..6caef90 100644 --- a/primap2/csg/_strategies/local_trends.py +++ b/primap2/csg/_strategies/local_trends.py @@ -1,3 +1,4 @@ +import numpy as np import xarray as xr from attrs import frozen @@ -143,24 +144,28 @@ def fill( filled how. """ filled_mask = ts.isnull() & ~fill_ts.isnull() - time_filled = filled_mask["time"][filled_mask].to_numpy() + time_fillable = filled_mask["time"][filled_mask].to_numpy() # overlap = ts.notnull() & fill_ts.notnull() # commented for linting to pass - if time_filled.any(): - # TODO implement boundary and gap filling + if time_fillable.any(): any_filled = False + time_filled = np.array([], dtype=np.datetime64) description = ( f"filled with local trend matched data from {fill_ts_repr}. " f"The following gaps have been filled:" ) gaps = get_gaps(ts) + filled_ts = ts.copy() for gap in gaps: + gap_description = ( + f" gap {np.datetime_as_string(gap.left, unit='h')}" + f" - {np.datetime_as_string(gap.right, unit='h')}:" + ) # check if we have information for the specific gap filled_mask_gap = filled_mask.pr.loc[gap.get_date_slice()] time_filled_gap = filled_mask_gap["time"][filled_mask_gap].to_numpy() if time_filled_gap.any(): - any_filled = True # get factor factor = calculate_scaling_factor( ts=ts, @@ -168,40 +173,68 @@ def fill( gap=gap, fit_params=self.fit_params, ) - if any(factor < 0) and not self.allow_negative: - print("not implemented") - # TODO: fallback and log message if that fails as well - # multiply fill_ts by factor + # check if positive or negative allowed. if true proceed, if false + # use fallback - # fill nans (in the gap only) - filled_ts = fill_gap(ts=ts, fill_ts=fill_ts, gap=gap, factor=factor) + # it would be more consistent to handle the negative value fallback in + # calculate_scaling_factor as well. It comes with the drawback that + # it can't be controlled from the filling function and that we have + # to deal with different return values here - # update description - if factor[0] == factor[1]: - description = ( - description + f" gap {gap.left!s}-{gap.right!s} for times " - f"{time_filled_gap} using factor {factor[0]};" + if any(factor < 0) and not self.allow_negative: + factor = calculate_scaling_factor( + ts=ts, + fill_ts=fill_ts, + gap=gap, + fit_params=self.fit_params.get_fallback(), ) - else: - description = ( - description + f" gap {gap.left!s}-{gap.right!s} for times " - f"{time_filled_gap} using factors {factor[0]} " - f"and {factor[1]};" + gap_description = ( + gap_description + f" negative scaling factor - use fallback degree " + f"{self.fit_params.fallback_degree}" ) - # e = fill_ts[overlap.data].data - # e_ref = ts[overlap.data].data - # a0 = [1] # start with 1 as scaling factor - # res = least_squares(self._factor_mult, a0, jac=self._jac, args=(e, e_ref)) - # - # fill_ts_h = fill_ts * res["x"][0] - # filled_ts = xr.core.ops.fillna(ts, fill_ts_h, join="exact") + if any(factor < 0) and not self.allow_negative: + # negative with fallback. fail to fill gap + gap_description = ( + gap_description + + " negative scaling after fallback - failed to fill gap;" + ) + else: + if any(np.isnan(factor)): + # fail because no factor can be calculated + gap_description = ( + gap_description + " scaling factor is nan - failed to fill gap;" + ) + else: + any_filled = True + time_filled = np.concatenate((time_filled, time_filled_gap)) + + # fill nans (in the gap only) + filled_ts = fill_gap( + ts=filled_ts, fill_ts=fill_ts, gap=gap, factor=factor + ) + + if factor[0] == factor[1]: + gap_description = ( + gap_description + f" filled for times " + f"{np.datetime_as_string(time_filled_gap, unit='h')} " + f"using factor {factor[0]};" + ) + else: + gap_description = ( + gap_description + f" filled for times " + f"{np.datetime_as_string(time_filled_gap, unit='h')} " + f"using factors {factor[0]} and {factor[1]};" + ) + + # update description + description = description + gap_description + if any_filled: descriptions = [ primap2.ProcessingStepDescription( time=time_filled, description=description, - # Factor={res['x'][0]:0.3f}", function=self.type, source=fill_ts_repr, ) @@ -216,7 +249,7 @@ def fill( filled_ts = ts descriptions = [ primap2.ProcessingStepDescription( - time=time_filled, + time=np.array([], dtype=np.datetime64), description=f"no additional data in {fill_ts_repr}", function=self.type, source=fill_ts_repr, diff --git a/primap2/tests/csg/test_gaps.py b/primap2/tests/csg/test_gaps.py index 33846fa..20eadcd 100644 --- a/primap2/tests/csg/test_gaps.py +++ b/primap2/tests/csg/test_gaps.py @@ -237,7 +237,7 @@ def test_calculate_left_boundary_trend(test_ts, fit_params_linear, fit_params_co trend_value = calculate_left_boundary_trend( test_ts, boundary=gaps[1].left, fit_params=fit_params_linear ) - assert trend_value is None + assert np.isnan(trend_value) log_str = ( "Not enough values to calculate fit for left boundary at " @@ -290,7 +290,7 @@ def test_calculate_right_boundary_trend(test_ts, fit_params_linear, fit_params_c boundary=gaps[0].right, fit_params=fit_params_linear, ) - assert trend_value is None + assert np.isnan(trend_value) log_str = ( "Not enough values to calculate fit for right boundary at " @@ -397,13 +397,39 @@ def test_calculate_boundary_trend_with_fallback(test_ts, fit_params_linear): assert np.allclose(trend_values, expected_constant, rtol=1e-04) -def test_calculate_scaling_factor(test_ts, fill_ts, fit_params_linear): +def test_calculate_scaling_factor(test_ts, fill_ts, fit_params_linear, caplog): gaps = get_gaps(test_ts) - # TODO: special cases, fallback etc + factor = calculate_scaling_factor( ts=test_ts, fill_ts=fill_ts, fit_params=fit_params_linear, gap=gaps[2] ) assert np.allclose([0.33333, 1], factor) + # as fallback will be used we also check for the log message (not strictly necessary + # to test his as it's raise by a different function) + assert ( + "Not enough values to calculate fit for right boundary " + "at 1973-01-01T00:00:00.000000000.\nfit_degree: 1, trend_length: 10, " + "trend_length_unit: YS, min_trend_points: 5.\n" + "Timeseries info: {'category': 'test'}" in caplog.text + ) + + # zero for fill_ts only result should be nan + fill_ts.pr.loc[{"time": pd.date_range("1956-01-01", "1966-01-01", freq="YS")}] = ( + fill_ts.pr.loc[{"time": pd.date_range("1956-01-01", "1966-01-01", freq="YS")}] * 0 + ) + factor = calculate_scaling_factor( + ts=test_ts, fill_ts=fill_ts, fit_params=fit_params_linear, gap=gaps[0] + ) + assert all(np.isinf(factor)) + + # zero for both ts: result should be 0 + test_ts.pr.loc[{"time": pd.date_range("1956-01-01", "1966-01-01", freq="YS")}] = ( + test_ts.pr.loc[{"time": pd.date_range("1956-01-01", "1966-01-01", freq="YS")}] * 0 + ) + factor = calculate_scaling_factor( + ts=test_ts, fill_ts=fill_ts, fit_params=fit_params_linear, gap=gaps[0] + ) + assert np.allclose(factor, [0, 0]) def test_fill_gap(test_ts, fill_ts, expected_ts, fit_params_linear): diff --git a/primap2/tests/csg/test_strategies.py b/primap2/tests/csg/test_strategies.py index 55881ca..d1ff270 100644 --- a/primap2/tests/csg/test_strategies.py +++ b/primap2/tests/csg/test_strategies.py @@ -121,7 +121,6 @@ def test_localTrends_strategy(): fill_ts = get_single_ts(data=2.0) # nothing to fill - fit_params = FitParameters( trend_length=10, min_trend_points=5, @@ -149,51 +148,56 @@ def test_localTrends_strategy(): assert ( result_descriptions[0].description == "filled with local trend matched data from B. " "The following gaps have been filled: " - "gap 1850-01-01T00:00:00.000000000-1850-01-01T00:00:00.000000000 for " - "times ['1850-01-01T00:00:00.000000000'] using factor 0.5;" + "gap 1850-01-01T00 - 1850-01-01T00: filled for " + "times ['1850-01-01T00'] using factor 0.5;" ) + ts[20:22] = np.nan + result_ts, result_descriptions = primap2.csg.LocalTrendsStrategy(fit_params=fit_params).fill( + ts=ts, fill_ts=fill_ts, fill_ts_repr="B" + ) + xr.testing.assert_allclose(get_single_ts(data=1.0), result_ts) + assert len(result_descriptions) == 1 + assert all( + result_descriptions[0].time == np.array(["1850", "1870", "1871"], dtype=np.datetime64) + ) + assert ( + result_descriptions[0].description == "filled with local trend matched data from B. " + "The following gaps have been filled: " + "gap 1850-01-01T00 - 1850-01-01T00: filled for " + "times ['1850-01-01T00'] using factor 0.5; " + "gap 1870-01-01T00 - 1871-01-01T00: filled for " + "times ['1870-01-01T00' '1871-01-01T00'] using factor 0.5;" + ) + + fill_ts[23:] = fill_ts[23:] * -1 + result_ts, result_descriptions = primap2.csg.LocalTrendsStrategy(fit_params=fit_params).fill( + ts=ts, fill_ts=fill_ts, fill_ts_repr="B" + ) + expected_ts = ts.copy() + expected_ts[0] = 1 + xr.testing.assert_allclose(expected_ts, result_ts) + assert all(result_descriptions[0].time == np.array(["1850"], dtype=np.datetime64)) + assert ( + result_descriptions[0].description == "filled with local trend matched data from B. " + "The following gaps have been filled: " + "gap 1850-01-01T00 - 1850-01-01T00: filled for " + "times ['1850-01-01T00'] using factor 0.5; " + "gap 1870-01-01T00 - 1871-01-01T00: negative scaling factor - " + "use fallback degree 0 negative scaling after fallback - " + "failed to fill gap;" + ) + + ts[1:5] = np.nan + fill_ts[5:] = np.nan + with pytest.raises(StrategyUnableToProcess): + primap2.csg.LocalTrendsStrategy(fit_params=fit_params).fill( + ts=ts, fill_ts=fill_ts, fill_ts_repr="B" + ) + # TODO: to test - # no gap filled despite data present: error raised - # middle gap filled? (maybe testing the function is enough) + # all fails and fallbacks # fallback if negative - # description when multiple gaps are filled - # description for unfilled gaps? currently there is none - - # # allow_shift = True - # ts[1:5] = 0.5 - # fill_ts[0:5] = 1.5 - # result_ts, result_descriptions = primap2.csg.GlobalLSStrategy(allow_shift=True).fill( - # ts=ts, fill_ts=fill_ts, fill_ts_repr="B" - # ) - # - # expected_ts = ts.copy(deep=True) - # expected_ts[0] = 0.5 - # xr.testing.assert_allclose(expected_ts, result_ts) - # assert len(result_descriptions) == 1 - # assert result_descriptions[0].time == np.array(["1850"], dtype=np.datetime64) - # assert ( - # result_descriptions[0].description == - # "filled with least squares matched data from B. " - # "a*x+b with a=1.000, b=-1.000" - # ) - # - # # error for negative emissions - # ts[1:5] = 0.5 - # fill_ts[1:5] = 1.5 - # fill_ts[0] = 0.1 - # with pytest.raises(StrategyUnableToProcess): - # result_ts, result_descriptions = primap2.csg.GlobalLSStrategy( - # allow_shift=True, allow_negative=False - # ).fill(ts=ts, fill_ts=fill_ts, fill_ts_repr="B") - # - # # error for no overlap - # ts[1:5] = np.nan - # fill_ts[5:] = np.nan - # with pytest.raises(StrategyUnableToProcess): - # result_ts, result_descriptions = primap2.csg.GlobalLSStrategy( - # allow_shift=True, allow_negative=False - # ).fill(ts=ts, fill_ts=fill_ts, fill_ts_repr="B") # general assert "source" not in result_ts.coords.keys()