diff --git a/unseen/eva.py b/unseen/eva.py index f0e57fc..46da1a0 100644 --- a/unseen/eva.py +++ b/unseen/eva.py @@ -4,8 +4,9 @@ import numpy as np from scipy.optimize import minimize from scipy.stats import genextreme, goodness_of_fit +from scipy.stats.distributions import chi2 import warnings -from xarray import apply_ufunc +from xarray import apply_ufunc, DataArray def event_in_context(data, threshold, direction): @@ -45,196 +46,259 @@ def event_in_context(data, threshold, direction): return n_events, n_population, return_period, percentile +def fit_stationary_gev(x, user_estimates=[], generate_estimates=False): + """Estimate stationary shape, location and scale parameters. + + Parameters + ---------- + x : array_like + Data to use in estimating the distribution parameters + user_estimates : list, optional + Initial guess of the shape, loc and scale parameters + generate_estimates : bool, optional + Generate initial parameter guesses using a data subset + + Returns + ------- + shape, loc, scale : float + GEV parameters + """ + + if any(user_estimates): + shape, loc, scale = user_estimates + shape, loc, scale = genextreme.fit(x, shape, loc=loc, scale=scale) + + elif generate_estimates: + # Generate initial estimates using a data subset (useful for large datasets) + shape, loc, scale = genextreme.fit(x[::2]) + shape, loc, scale = genextreme.fit(x, shape, loc=loc, scale=scale) + else: + shape, loc, scale = genextreme.fit(x) + return shape, loc, scale + + +def penalised_sum(x): + """Sum finite values in a 1D array and add non-finite penalties. + + This is a utility function used when evaluating the negative + log-likelihood for a distribution and an array of samples. + Adapted/stolen from: + https://github.com/scipy/scipy/blob/v1.11.3/scipy/stats/_distn_infrastructure.py + """ + finite_x = np.isfinite(x) + bad_count = finite_x.size - np.count_nonzero(finite_x) + + total = np.sum(x[finite_x]) + penalty = bad_count * np.log(np.finfo(float).max) * 100 + + return total + penalty + + +def nllf(theta, x, covariate=None): + """Penalised negative log-likelihood function. + + Parameters + ---------- + theta : tuple of floats + Distribution parameters (stationary or non-stationary) + x : array_like + Data to use in estimating the distribution parameters + covariate : array_like, optional + Covariate used to estimate nonstationary parameters + + Returns + ------- + total : float + The penalised NLLF summed over all values of x + + Notes + ----- + This is modified version of `scipy.stats.genextreme.fit` for fitting extreme + value distribution parameters, in which the location and scale parameters + can vary linearly with a covariate. + The log-likelihood equations are based on Méndez et al. (2007). + It is suitable for stationary or nonstationary distributions: + - theta = shape, loc, scale + - theta = shape, loc, loc1, scale, scale1 + The nonstationary parameters are returned if `theta` incudes the location + and scale trend parameters. + A large finite penalty (instead of infinity) is applied for observations + beyond the support of the distribution. + The NLLF is not finite when the shape is nonzero and Z is negative because + the PDF is zero (i.e., ``log(0)=inf)``). + """ + if len(theta) == 5: + # Nonstationary GEV parameters + shape, loc0, loc1, scale0, scale1 = theta + loc = loc0 + loc1 * covariate + scale = scale0 + scale1 * covariate + + else: + # Stationary GEV parameters + shape, loc, scale = theta + + s = (x - loc) / scale + + # Calculate the NLLF (type 1 or types 2-3 extreme value distributions) + # Type I extreme value distributions (Gumbel) + if shape == 0: + valid = scale > 0 + L = np.log(scale, where=valid) + s + np.exp(-s) + + # Types II & III extreme value distributions (Fréchet and Weibull) + else: + Z = 1 + shape * s + # The log-likelihood function is finite when the shape and Z are positive + valid = np.isfinite(Z) & (Z > 0) & (scale > 0) + L = ( + np.log(scale, where=valid) + + (1 + 1 / shape) * np.log(Z, where=valid) + + np.power(Z, -1 / shape, where=valid) + ) + + L = np.where(valid, L, np.inf) + + # Sum function along all axes (where finite) & add penalty for each infinite element + total = penalised_sum(L) + return total + + def fit_gev( data, - user_estimates=None, - loc1=0, - scale1=0, - covariate="time", core_dim="time", stationary=True, - check_fit=True, - check_relative_fit=None, + covariate=None, + loc1=0, + scale1=0, + test_fit_goodness=False, + relative_fit_test=None, alpha=0.05, + user_estimates=[], generate_estimates=False, method="Nelder-Mead", ): - """Estimate parameters for stationary or nonstationary data distributions. + """Estimate stationary or nonstationary GEV distribution parameters. Parameters ---------- - data : numpy.ndarray - One-dimensional data array to fit - user estimates: list, default None - Initial estimates of the shape, loc and scale parameters - loc1, scale1 : float, default 0 - Initial estimates of the location and scale trend parameters - covariate : array-like or string, default "time" - A non-stationary covariate array or coordinate name (non-stationary only). - core_dim : str, default "time" - Name of time/sample dimension in 'data' - stationary : bool, default True - Fit as a stationary GEV using "scipy.stats.genextremes.fit" - check_fit : bool, default False - Test goodness of fit and attempt retry - check_relative_fit : (None, "aic", "bic", "lrt"), default "aic". - Method to test relative fit of stationary and non-stationary models. - The trend paramters are set to zero if the stationary fit is better (non-stationary only) - alpha : float, default 0.05 - Goodness of fit p-value threshold - generate_estimates : bool, default False + data : array_like + Data to use in estimating the distribution parameters + core_dim : str, optional + Name of time/sample dimension in `data`. Default: "time". + stationary : bool, optional + Fit as a stationary GEV using `fit_stationary_gev`. Default: True. + covariate : array_like or str, optional + A nonstationary covariate array or coordinate name + loc1, scale1 : float or None, optional + Initial guess of trend parameters. If None, the trend is fixed at zero. + test_fit_goodness : bool, optional + Test goodness of fit and attempt retry. Default False. + relative_fit_test : {None, 'lrt', 'aic', 'bic'}, optional + Method to test relative fit of stationary and nonstationary models. + The trend parameters are set to zero if the stationary fit is better. + alpha : float, optional + Goodness of fit p-value threshold. Default 0.05. + user estimates: list, optional + Initial guess of the shape, loc and scale parameters + generate_estimates : bool, optional Generate initial parameter guesses using a data subset - method : str, default "Nelder-Mead" - Optimisation method for non-stationary fit (non-stationary only) + method : str, optional + Optimization method for nonstationary fit {'Nelder-Mead', 'L-BFGS-B', + 'TNC', 'SLSQP', 'Powell', 'trust-constr', 'COBYLA'}. Returns ------- - theta : tuple - Shape, location and scale parameters (and loc1, scale1 if applicable) + theta : xr.DataArray + The GEV distribution parameters with the same dimensions as `data` + (excluding `core_dim`) and a new dimension `theta`: + If stationary, theta = (shape, loc, scale). + If nonstationary, theta = (shape, loc0, loc1, scale0, scale1). Notes ----- - - For stationary data the shape, location and scale parameters are - estimated using 'scipy.stats.genextremes.fit()'. - - For non-stationary data, the linear location and scale trend - parameters are estimated using a penalised negative log-likelihood + For stationary data the shape, location and scale parameters are + estimated using `gev_stationary_fit`. + For nonstationary data, the linear location and scale trend + parameters are estimated using a penalized negative log-likelihood function with initial estimates based on the stationary fit. - - The distribution fit is considered good if the p-value is above - alpha (i.e., accept the null hypothesis). - - If the parameters fail the goodness of fit test, it will attempt - to fit the data again by generating an initial guess, if that - hasn't already been tried. - - If data is a stacked forecast ensemble, the covariate will need to be + The distribution fit is considered good if the p-value is above + `alpha` (i.e., accept the null hypothesis). Otherwise, it retry the fit + without `user_estimates` and with `generating_estimates`. + If data is a stacked forecast ensemble, the covariate may need to be stacked in the same way. """ kwargs = locals() # Function inputs - def fit_stationary_gev(data, user_estimates, generate_estimates): - """Estimate stationary shape, location and scale parameters.""" - if user_estimates: - shape, loc, scale = user_estimates - shape, loc, scale = genextreme.fit(data, shape, loc=loc, scale=scale) - - elif generate_estimates: - # Generate initial estimates using a data subset (useful for large datasets). - shape, loc, scale = genextreme.fit(data[::2]) - shape, loc, scale = genextreme.fit(data, shape, loc=loc, scale=scale) - else: - shape, loc, scale = genextreme.fit(data) - return shape, loc, scale - - def penalised_sum(x): - """Sum finite values in a 1D array and add nonfinite penalties. - - This is a utility function used when evaluating the negative - loglikelihood for a distribution and an array of samples. - Adapted/stolen from: - https://github.com/scipy/scipy/blob/v1.11.3/scipy/stats/_distn_infrastructure.py - """ - finite_x = np.isfinite(x) - bad_count = finite_x.size - np.count_nonzero(finite_x) - - total = np.sum(x[finite_x]) - penalty = bad_count * np.log(np.finfo(float).max) * 100 - - return total + penalty - - def nllf(theta, data, covariate=None): - """Penalised negative log-likelihood function. - - Parameters - ---------- - theta : tuple of floats - Distribution parameters (can be stationary or non-stationary) - or - data, covariate: array-like - Data to fit and covariate (e.g., timesteps). - - Returns - ------- - total : float - The penalised NLLF summed over all values of x. - - Notes - ----- - - A modified version of scipy.stats.genextremes.fit for fitting - extreme value distribution parameters, in which the location and - scale parameters can vary linearly with a covariate. - - Suitable for stationary or non-stationary distributions: - - theta = (shape, loc, scale) - - theta = (shape, loc, loc1, scale, scale1) - - The non-stationary parameters are returned if the input theta - incudes the varying location and scale parameters. - - A large finite penalty (instead of infinity) is applied for - observations beyond the support of the distribution. - - The NLLF is not finite when the shape is nonzero and Z is - negative because the PDF is zero (i.e., log(0)=inf)). - """ - if len(theta) == 5: - # Non-stationary GEV parameters. - shape, loc0, loc1, scale0, scale1 = theta - loc = loc0 + loc1 * covariate - scale = scale0 + scale1 * covariate - - else: - # Stationary GEV parameters. - shape, loc, scale = theta - - s = (data - loc) / scale + def _format_covariate(data, covariate, stationary, core_dim): + """Format or generate covariate .""" + if not stationary: + if isinstance(covariate, str): + # Select coordinate in data + covariate = data[covariate] + elif covariate is None: + # Guess covariate + if core_dim in data: + covariate = data[core_dim] + else: + covariate = np.arange(data.shape[0]) - # Calculate the NLLF (type 1 or types 2-3 extreme value distributions). - if shape == 0: - f = np.log(scale) + s + np.exp(-s) + if covariate.dtype.kind not in set("buifc"): + # Convert dates to numbers + covariate = date2num(covariate) + if not isinstance(covariate, DataArray): + # Convert to DataArray with the same core_dim as data + covariate = DataArray(covariate, dims=[core_dim]) else: - Z = 1 + shape * s - # NLLF where data is supported by the parameters (see notes). - f = np.where( - Z > 0, - np.log(scale) - + (1 + 1 / shape) * np.ma.log(Z) - + np.ma.power(Z, -1 / shape), - np.inf, - ) + covariate = 0 # Dummy covariate for apply_ufunc - f = np.where(scale > 0, f, np.inf) # Scale parameter must be positive. - - # Sum function along all axes (where finite) & add penalty for each infinite element. - total = penalised_sum(f) - return total + return covariate def _fit( data, + covariate, + core_dim, user_estimates, + generate_estimates, loc1, scale1, - covariate, - core_dim, stationary, - check_fit, - check_relative_fit, + test_fit_goodness, + relative_fit_test, alpha, - generate_estimates, method, ): """Estimate distribution parameters.""" - if not np.isfinite(data).all(): - warnings.warn("The data contains non-finite values.") - # Return NaNs if any input data is not finite + if np.all(~np.isfinite(data)): + # Return NaNs if all input data is infinite n = 3 if stationary else 5 return np.array([np.nan] * n) - # Use genextremes to get stationary distribution parameters + if np.isnan(data).any(): + # Mask NaNs in data + mask = np.isfinite(data) + data = data[mask] + if not stationary: + covariate = covariate[mask] + + # Use genextreme to get stationary distribution parameters theta = fit_stationary_gev(data, user_estimates, generate_estimates) if not stationary: - # Use genextremes fit as initial guesses (scipy.stats reverses sign of shape) + # Use genextreme as initial guesses shape, loc, scale = theta - theta_i = -shape, loc, loc1, scale, scale1 + # Temporarily reverse shape sign (scipy uses different sign convention) + theta_i = [-shape, loc, loc1, scale, scale1] # Optimisation bounds (scale parameter must be non-negative) - bounds = [(None, None)] * 5 - bounds[3] = (0, None) + bounds = [(None, None)] * len(theta_i) + bounds[3] = (0, None) # Positive scale parameter + if loc1 is None: + theta_i[2] = 0 + bounds[2] = (0, 0) # Only allow trend in scale + if scale1 is None: + theta_i[4] = 0 + bounds[4] = (0, 0) # Only allow trend in location # Minimise the negative log-likelihood function to get optimal theta res = minimize( @@ -246,22 +310,20 @@ def _fit( ) theta = res.x - if check_relative_fit is not None: - # Test relative fit of stationary and non-stationary models + if isinstance(relative_fit_test, str): + # Test relative fit of stationary and nonstationary models # Negative log likelihood using genextreme parameters - ll_stationary = nllf([-shape, loc, scale], data) - ll_nonstationary = res.fun + L1 = nllf([-shape, loc, scale], data) + L2 = res.fun result = check_gev_relative_fit( - data, - [ll_stationary, ll_nonstationary], - test=check_relative_fit, + data, L1, L2, test=relative_fit_test, alpha=alpha ) if result is False: warnings.warn( - f"{check_relative_fit} test failed. Returning stationary parameters." + f"{relative_fit_test} test failed. Returning stationary parameters." ) - # Return genextremes parameters with no trend + # Return stationary parameters (genextreme.fit output) with no trend theta = [shape, loc, 0, scale, 0] # Reverse shape sign for consistency with scipy.stats results @@ -269,151 +331,157 @@ def _fit( theta = np.array([i for i in theta], dtype="float64") - if check_fit and stationary: - pvalue = check_gev_fit(data, theta, core_dim=kwargs["core_dim"]) + if test_fit_goodness and stationary: + pvalue = check_gev_fit(data, theta, core_dim=core_dim) # Accept null distribution of the Anderson-darling test (same distribution) if np.all(pvalue < alpha): - if not kwargs["generate_estimates"]: + if any(kwargs["user_estimates"]): + warnings.warn("GEV fit failed. Retrying without user_estimates.") + kwargs["user_estimates"] = [None, None, None] + theta = _fit(data, covariate, **kwargs) + elif not kwargs["generate_estimates"]: warnings.warn( - "Data fit failed. Retrying with 'generate_estimates=True'." + "GEV fit failed. Retrying with generate_estimates=True." ) kwargs["generate_estimates"] = True # Also breaks loop - theta = fit(data, **kwargs) + theta = _fit(data, covariate, **kwargs) else: # Return NaNs theta = theta * np.nan warnings.warn("Data fit failed.") return theta - def fit(data, **kwargs): - """xarray.apply_ufunc wrapper for _fit.""" - stationary = kwargs["stationary"] - core_dim = kwargs["core_dim"] - - # Expected output of theta - n = 3 if stationary else 5 - - theta = apply_ufunc( - _fit, - data, - input_core_dims=[[core_dim]], - output_core_dims=[["theta"]], - vectorize=True, - dask="parallelized", - kwargs=kwargs, - output_dtypes=["float64"], - dask_gufunc_kwargs=dict(output_sizes={"theta": n}), - ) - return theta - data = kwargs.pop("data") + covariate = kwargs.pop("covariate") + covariate = _format_covariate(data, covariate, stationary, core_dim) - # Format or generate covariate - if not stationary: - if isinstance(covariate, str): - # Select coordinate in data - covariate = data[covariate] - elif covariate is None: - # Guess covariate - if core_dim in data: - covariate = data[core_dim] - else: - covariate = np.arange(data.shape[0]) - - if covariate.dtype.kind not in set("buifc"): - # Convert dates to numbers - covariate = date2num(covariate) + # Input core dimensions + if hasattr(covariate, core_dim): + # Covariate has the same core dimension as data + input_core_dims = [[core_dim], [core_dim]] + else: + # Covariate is a 1D array + input_core_dims = [[core_dim], []] - kwargs["covariate"] = covariate # Update kw dict + # Expected output of theta + n = 3 if stationary else 5 # Fit data to distribution parameters - theta = fit(data, **kwargs) + theta = apply_ufunc( + _fit, + data, + covariate, + input_core_dims=input_core_dims, + output_core_dims=[["theta"]], + vectorize=True, + dask="parallelized", + kwargs=kwargs, + output_dtypes=["float64"], + dask_gufunc_kwargs=dict(output_sizes={"theta": n}), + ) - # Return a tuple of scalars instead of a 1D data array + # Format output if len(data.shape) == 1: + # Return a tuple of scalars instead of a data array theta = np.array([i for i in theta], dtype="float64") + + if isinstance(theta, DataArray): + if stationary: + coords = ["shape", "loc", "scale"] + else: + coords = ["shape", "loc0", "loc1", "scale0", "scale1"] + theta.coords["theta"] = coords return theta -def check_gev_fit(data, theta, core_dim="time"): - """Test stationary distribution goodness of fit. +def check_gev_fit(data, params, core_dim="time", **kwargs): + """Test stationary GEV distribution goodness of fit. Parameters ---------- - data, x: array-like - Data and covariate to fit. - theta : tuple of floats - Shape, location and scale parameters. - core_dim : str, default is "time" - Data dimension to test over. + data: array_like + Data used to estimate the distribution parameters + params : tuple of floats + Shape, location and scale parameters + core_dim : str, optional + Data dimension to test over + kwargs : dict, optional + Additional keyword arguments to pass to `goodness_of_fit`. Returns ------- pvalue : scipy.stats._fit.GoodnessOfFitResult.pvalue - Goodness of fit pvalue. + Goodness of fit p-value """ - def _goodness_of_fit(data, theta): - """Test goodness of fit.""" + def _goodness_of_fit(data, params, **kwargs): + """Test GEV goodness of fit.""" # Stationary parameters - shape, loc, scale = theta + shape, loc, scale = params res = goodness_of_fit( genextreme, data, known_params=dict(c=shape, loc=loc, scale=scale), + **kwargs, ) return res.pvalue pvalue = apply_ufunc( _goodness_of_fit, data, - theta, + params, input_core_dims=[[core_dim], ["theta"]], vectorize=True, + kwargs=kwargs, dask="parallelized", dask_gufunc_kwargs=dict(meta=(np.ndarray(1, float),)), ) return pvalue -def check_gev_relative_fit(data, nll, test): - """Test relative fit of stationary and non-stationary distribution parameters. - https://doi.org/10.1016/j.jhydrol.2017.02.005 +def check_gev_relative_fit(data, L1, L2, test, alpha=0.05): + """Test relative fit of stationary and nonstationary GEV distribution. Parameters ---------- - data : array-like - Data to fit. - ll : list - Negative log-likelihood of the stationary and non-stationary models. - test : {'aic', 'bic', 'lrt'} - Method to test relative fit of stationary and non-stationary models + data : array_like + Data to use in estimating the distribution parameters + L1, L2 : float + Negative log-likelihood of the stationary and nonstationary model + test : {"aic", "bic", "lrt"} + Method to test relative fit of stationary and nonstationary models Returns ------- result : bool - True if the non-stationary model is better + If True, the nonstationary model is better + + Notes + ----- + For more information on the tests see: + Kim, H., Kim, S., Shin, H., & Heo, J. (2017). Appropriate model selection + methods for nonstationary generalized extreme value models. Journal of + Hydrology, 547, 557-574. https://doi.org/10.1016/j.jhydrol.2017.02.005 """ - result = False - if test == "lrt": - # Calculate the likelihood ratio test statistic (only valid for nested models) - d = -2 * (nll[1] - nll[0]) - if d > 1: - result = True - - elif test == "aic": - # Calculate the Alkaike Information Criterion (AIC) for each model - aic = [(2 * k) + (2 * ll) for ll, k in zip(nll, [3, 5])] - if aic[0] > aic[1]: - result = True - - elif test == "bic": - # Calculate the Bayesian Information Criterion (BIC) - bic = [k * np.log(len(data)) + (2 * ll) for ll, k in zip(nll, [3, 5])] - if bic[0] > bic[1]: - result = True + + dof = [3, 5] # Degrees of freedom of each model + + if test.casefold() == "lrt": + # Likelihood ratio test statistic + LR = -2 * (L2 - L1) + result = chi2.sf(LR, dof[1] - dof[0]) <= alpha + + elif test.casefold() == "aic": + # Akaike Information Criterion (AIC) + aic = [(2 * k) + (2 * n) for n, k in zip([L1, L2], dof)] + result = aic[0] > aic[1] + + elif test.casefold() == "bic": + # Bayesian Information Criterion (BIC) + bic = [k * np.log(len(data)) + (2 * n) for n, k in zip([L1, L2], dof)] + result = bic[0] > bic[1] return result diff --git a/unseen/tests/test_eva.py b/unseen/tests/test_eva.py index 86a611c..1396172 100644 --- a/unseen/tests/test_eva.py +++ b/unseen/tests/test_eva.py @@ -4,7 +4,7 @@ import numpy as np import numpy.testing as npt from scipy.stats import genextreme -import xarray as xr +from xarray import cftime_range, DataArray from unseen.eva import fit_gev @@ -14,9 +14,9 @@ def example_da_gev_1d(): """An example 1D GEV DataArray and distribution parameters.""" - time = xr.cftime_range(start="2000-01-01", periods=1500, freq="D") + time = cftime_range(start="2000-01-01", periods=1500, freq="D") - # Shape, location and scale parameters. + # Shape, location and scale parameters np.random.seed(0) shape = np.random.uniform() loc = np.random.uniform(-10, 10) @@ -24,7 +24,7 @@ def example_da_gev_1d(): theta = shape, loc, scale rvs = genextreme.rvs(shape, loc=loc, scale=scale, size=(time.size), random_state=0) - data = xr.DataArray(rvs, coords=[time], dims=["time"]) + data = DataArray(rvs, coords=[time], dims=["time"]) return data, theta @@ -37,11 +37,11 @@ def example_da_gev_1d_dask(): def example_da_gev_3d(): """An example 3D GEV DataArray and distribution parameters.""" - time = xr.cftime_range(start="2000-01-01", periods=1500, freq="D") + time = cftime_range(start="2000-01-01", periods=1500, freq="D") lat = np.arange(2) lon = np.arange(2) - # Shape, location and scale parameters. + # Shape, location and scale parameters size = (len(lat), len(lon)) np.random.seed(0) shape = np.random.uniform(size=size) @@ -56,7 +56,7 @@ def example_da_gev_3d(): size=(len(time), len(lat), len(lon)), random_state=0, ) - data = xr.DataArray(rvs, coords=[time, lat, lon], dims=["time", "lat", "lon"]) + data = DataArray(rvs, coords=[time, lat, lon], dims=["time", "lat", "lon"]) return data, theta @@ -69,7 +69,7 @@ def example_da_gev_3d_dask(): def add_example_gev_trend(data): trend = np.arange(data.time.size) * 2.5 / data.time.size - trend = xr.DataArray(trend, coords={"time": data.time}) + trend = DataArray(trend, coords={"time": data.time}) return data + trend @@ -77,7 +77,7 @@ def example_da_gev_forecast(): """Create example stacked forecast dataArray.""" ensemble = np.arange(3) lead_time = np.arange(5) - init_date = xr.cftime_range(start="2000-01-01", periods=24, freq="MS") + init_date = cftime_range(start="2000-01-01", periods=24, freq="MS") time = [ init_date.shift(i, freq="MS")[: len(lead_time)] for i in range(len(init_date)) ] @@ -96,7 +96,7 @@ def example_da_gev_forecast(): size=(len(ensemble), len(init_date), len(lead_time)), random_state=0, ) - data = xr.DataArray( + data = DataArray( rvs, coords=[ensemble, init_date, lead_time], dims=["ensemble", "init_date", "lead_time"], @@ -109,8 +109,25 @@ def example_da_gev_forecast(): def test_fit_gev_1d(): """Run stationary fit using 1D array & check results.""" data, theta_i = example_da_gev_1d() - theta = fit_gev(data, stationary=True, core_dim="time") - # Check fitted params match params used to create data. + theta = fit_gev(data, stationary=True) + # Check fitted params match params used to create data + npt.assert_allclose(theta, theta_i, rtol=rtol) + + +def test_fit_gev_1d_user_estimates(): + """Run stationary fit using 1D array & user_estimates.""" + data, theta_i = example_da_gev_1d() + user_estimates = list(theta_i) + theta = fit_gev(data, stationary=True, user_estimates=user_estimates) + # Check fitted params match params used to create data + npt.assert_allclose(theta, theta_i, rtol=rtol) + + +def test_fit_gev_1d_goodness(): + """Run stationary fit using 1D array & fit_goodness_test.""" + data, theta_i = example_da_gev_1d() + theta = fit_gev(data, stationary=True, test_fit_goodness=True) + # Check fitted params match params used to create data npt.assert_allclose(theta, theta_i, rtol=rtol) @@ -118,8 +135,8 @@ def test_fit_gev_1d_numpy(): """Run stationary fit using 1D np.ndarray & check results.""" data, theta_i = example_da_gev_1d() data = data.values - theta = fit_gev(data, stationary=True, core_dim=None) - # Check fitted params match params used to create data. + theta = fit_gev(data, stationary=True) + # Check fitted params match params used to create data npt.assert_allclose(theta, theta_i, rtol=rtol) @@ -127,7 +144,7 @@ def test_fit_gev_1d_dask(): """Run stationary fit using 1D dask array & check results.""" data, theta_i = example_da_gev_1d_dask() theta = fit_gev(data, stationary=True, core_dim="time") - # Check fitted params match params used to create data. + # Check fitted params match params used to create data npt.assert_allclose(theta, theta_i, rtol=rtol) @@ -135,7 +152,7 @@ def test_fit_gev_3d(): """Run stationary fit using 3D array & check results.""" data, theta_i = example_da_gev_3d() theta = fit_gev(data, stationary=True, core_dim="time") - # Check fitted params match params used to create data. + # Check fitted params match params used to create data npt.assert_allclose(theta, theta_i, rtol=rtol) @@ -143,7 +160,7 @@ def test_fit_gev_3d_dask(): """Run stationary fit using 3D dask array & check results.""" data, theta_i = example_da_gev_3d_dask() theta = fit_gev(data, stationary=True, core_dim="time") - # Check fitted params match params used to create data. + # Check fitted params match params used to create data npt.assert_allclose(theta, theta_i, rtol=rtol) @@ -159,48 +176,84 @@ def test_fit_ns_gev_1d(): core_dim="time", covariate=covariate, ) - assert np.all(theta[2] > 0) # Positive trend in mean + assert np.all(theta[2] > 0) # Positive trend in location + + +def test_fit_ns_gev_1d_loc_only(): + """Run non-stationary fit using 1D array (location parameter only).""" + data, _ = example_da_gev_1d() + data = add_example_gev_trend(data) + covariate = np.arange(data.time.size, dtype=int) + + theta = fit_gev( + data, + stationary=False, + core_dim="time", + loc1=0, + scale1=None, + covariate=covariate, + ) + assert np.all(theta[2] > 0) # Positive trend in location + assert np.all(theta[4] == 0) # No trend in scale + + +def test_fit_ns_gev_1d_scale_only(): + """Run non-stationary fit using 1D array (scale parameter only).""" + data, _ = example_da_gev_1d() + data = add_example_gev_trend(data) + covariate = np.arange(data.time.size, dtype=int) + + theta = fit_gev( + data, + stationary=False, + core_dim="time", + loc1=None, + scale1=0, + covariate=covariate, + ) + assert np.all(theta[2] == 0) # No trend in location + assert np.all(theta[4] != 0) # Nonzero trend in scale def test_fit_ns_gev_1d_dask(): """Run non-stationary fit using 1D dask array & check results.""" data, _ = example_da_gev_1d_dask() - # Add a positive linear trend. + # Add a positive linear trend data = add_example_gev_trend(data) covariate = np.arange(data.time.size, dtype=int) theta = fit_gev(data, stationary=False, covariate=covariate, core_dim="time") - assert np.all(theta[2] > 0) # Positive trend in mean + assert np.all(theta[2] > 0) # Positive trend in location def test_fit_ns_gev_3d(): """Run non-stationary fit using 3D array & check results.""" data, _ = example_da_gev_3d() - # Add a positive linear trend. + # Add a positive linear trend data = add_example_gev_trend(data) covariate = np.arange(data.time.size, dtype=int) theta = fit_gev(data, stationary=False, covariate=covariate, core_dim="time") - assert np.all(theta.isel(theta=2) > 0) # Positive trend in mean + assert np.all(theta.isel(theta=2) > 0) # Positive trend in location def test_fit_ns_gev_3d_dask(): """Run non-stationary fit using 3D dask array & check results.""" data, _ = example_da_gev_3d_dask() - # Add a positive linear trend. + # Add a positive linear trend data = add_example_gev_trend(data) covariate = np.arange(data.time.size, dtype=int) theta = fit_gev(data, stationary=False, covariate=covariate, core_dim="time") - assert np.all(theta.isel(theta=2) > 0) # Positive trend in mean + assert np.all(theta.isel(theta=2) > 0) # Positive trend in location def test_fit_ns_gev_forecast(): """Run non-stationary fit using stacked forecast dataArray & check results.""" data, _ = example_da_gev_forecast() - # Convert times to numerical timesteps. - covariate = xr.DataArray(date2num(data.time), coords={"sample": data.sample}) + # Convert times to numerical timesteps + covariate = DataArray(date2num(data.time), coords={"sample": data.sample}) # Add a positive linear trend trend = covariate / 1e2 data = data + trend data = data.sortby(data.time) covariate = covariate.sortby(data.time) theta = fit_gev(data, stationary=False, covariate=covariate, core_dim="sample") - assert np.all(theta[2] > 0) # Positive trend in mean + assert np.all(theta[2] > 0) # Positive trend in location