[526] refactored pearson_parameters to reduce cognitive complexity
james.adams committed Aug 9, 2023
1 parent 6dc5934 commit ead6196
Showing 1 changed file: src/climate_indices/compute.py (161 additions, 76 deletions)
@@ -224,6 +224,37 @@ def _probability_of_zero(
return probabilities_of_zero


# +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
def reshape_values(values, periodicity):
if periodicity is Periodicity.monthly:
return utils.reshape_to_2d(values, 12)
elif periodicity is Periodicity.daily:
return utils.reshape_to_2d(values, 366)
else:
raise ValueError(f"Invalid periodicity argument: {periodicity}")

def validate_values_shape(values):
if len(values.shape) != 2 or values.shape[1] not in (12, 366):
_log_and_raise_shape_error(shape=values.shape)
return values.shape[1]

def adjust_calibration_years(data_start_year, data_end_year, calibration_start_year, calibration_end_year):
if (calibration_start_year < data_start_year) or (calibration_end_year > data_end_year):
return data_start_year, data_end_year
return calibration_start_year, calibration_end_year

def calculate_time_step_params(time_step_values):
number_of_zeros, number_of_non_missing = utils.count_zeros_and_non_missings(time_step_values)
if (number_of_non_missing - number_of_zeros) < 4:
return 0.0, 0.0, 0.0, 0.0

probability_of_zero = number_of_zeros / number_of_non_missing if number_of_zeros > 0 else 0.0

if (number_of_non_missing - number_of_zeros) > 3:
params = lmoments.fit(time_step_values)
return probability_of_zero, params["loc"], params["scale"], params["skew"]
return 0.0, 0.0, 0.0, 0.0
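
A minimal sketch of the extracted helpers in isolation, assuming the climate_indices package is importable and that adjust_calibration_years and calculate_time_step_params remain module-level functions in compute.py as this diff adds them; the example inputs are illustrative:

import numpy as np
from climate_indices import compute

# a calibration window that lies inside the period of record comes back unchanged
print(compute.adjust_calibration_years(1951, 2021, 1971, 2000))   # (1971, 2000)

# a window that spills outside the record is clamped to the full period of record
print(compute.adjust_calibration_years(1951, 2021, 1931, 2025))   # (1951, 2021)

# fewer than four non-missing, non-zero values short-circuits to all zeros,
# mirroring the original "bail out using zeros" branch
sparse = np.array([0.0, 0.0, np.nan, 5.0, 7.0, 1.0])
print(compute.calculate_time_step_params(sparse))                 # (0.0, 0.0, 0.0, 0.0)
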

def pearson_parameters(
values: np.ndarray,
data_start_year: int,
@@ -254,95 +285,149 @@ def pearson_parameters(
returned array 3: second Pearson Type III distribution parameter (scale)
returned array 4: third Pearson Type III distribution parameter (skew)
"""

# reshape precipitation values to (years, 12) for monthly,
# or to (years, 366) for daily
if periodicity is Periodicity.monthly:

values = utils.reshape_to_2d(values, 12)

elif periodicity is Periodicity.daily:

values = utils.reshape_to_2d(values, 366)

else:

raise ValueError("Invalid periodicity argument: %s" % periodicity)

# validate that the values array has shape: (years, 12) for monthly or (years, 366) for daily
if len(values.shape) != 2:
_log_and_raise_shape_error(shape=values.shape)

else:

time_steps_per_year = values.shape[1]
if time_steps_per_year not in (12, 366):
_log_and_raise_shape_error(shape=values.shape)

# determine the end year of the values array
values = reshape_values(values, periodicity)
time_steps_per_year = validate_values_shape(values)
data_end_year = data_start_year + values.shape[0]

# make sure that we have data within the full calibration period,
# otherwise use the full period of record
if (calibration_start_year < data_start_year) or \
(calibration_end_year > data_end_year):
calibration_start_year = data_start_year
calibration_end_year = data_end_year

# get the year axis indices corresponding to
# the calibration start and end years
calibration_start_year, calibration_end_year = adjust_calibration_years(data_start_year, data_end_year, calibration_start_year, calibration_end_year)
calibration_begin_index = calibration_start_year - data_start_year
calibration_end_index = (calibration_end_year - data_start_year) + 1

# get the values for the current calendar time step
# that fall within the calibration years period
calibration_values = values[calibration_begin_index:calibration_end_index, :]

# the values we'll compute and return
probabilities_of_zero = np.zeros((time_steps_per_year,))
locs = np.zeros((time_steps_per_year,))
scales = np.zeros((time_steps_per_year,))
skews = np.zeros((time_steps_per_year,))

# compute the probability of zero and Pearson
# parameters for each calendar time step
# TODO vectorize the below loop? create a @numba.vectorize() ufunc
# for application over the second axis
for time_step_index in range(time_steps_per_year):

# get the values for the current calendar time step
time_step_values = calibration_values[:, time_step_index]

# count the number of zeros and valid (non-missing/non-NaN) values
number_of_zeros, number_of_non_missing = \
utils.count_zeros_and_non_missings(time_step_values)

# make sure we have at least four values that are both non-missing (i.e. non-NaN)
# and non-zero, otherwise use the entire period of record
if (number_of_non_missing - number_of_zeros) < 4:

# we can't proceed, bail out using zeros
continue

# calculate the probability of zero for the calendar time step
probability_of_zero = 0.0
if number_of_zeros > 0:

probability_of_zero = number_of_zeros / number_of_non_missing

# get the estimated L-moments, if we have
# more than three non-missing/non-zero values
if (number_of_non_missing - number_of_zeros) > 3:

# get the Pearson Type III parameters for this time
# step's values within the calibration period
params = lmoments.fit(time_step_values)
probabilities_of_zero[time_step_index] = probability_of_zero
locs[time_step_index] = params["loc"]
scales[time_step_index] = params["scale"]
skews[time_step_index] = params["skew"]
prob, loc, scale, skew = calculate_time_step_params(time_step_values)
probabilities_of_zero[time_step_index] = prob
locs[time_step_index] = loc
scales[time_step_index] = scale
skews[time_step_index] = skew

return probabilities_of_zero, locs, scales, skews
# +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
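
An end-to-end sketch of calling the refactored function; the years and the synthetic gamma-distributed precipitation below are illustrative assumptions, not values from this commit:

import numpy as np
from climate_indices import compute

# seventy years of synthetic monthly precipitation as a flat 1-D series;
# pearson_parameters reshapes it to (years, 12) internally
rng = np.random.default_rng(42)
monthly_precip = rng.gamma(shape=2.0, scale=30.0, size=70 * 12)

probs_zero, locs, scales, skews = compute.pearson_parameters(
    values=monthly_precip,
    data_start_year=1951,
    calibration_start_year=1971,
    calibration_end_year=2000,
    periodicity=compute.Periodicity.monthly,
)

# each returned array has shape (12,): one probability of zero and one set of
# Pearson Type III loc/scale/skew parameters per calendar month
assert probs_zero.shape == locs.shape == scales.shape == skews.shape == (12,)
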


# def pearson_parameters_previous(
# values: np.ndarray,
# data_start_year: int,
# calibration_start_year: int,
# calibration_end_year: int,
# periodicity: Periodicity,
# ) -> (np.ndarray, np.ndarray, np.ndarray, np.ndarray):
# """
# This function computes the probability of zero and Pearson Type III
# distribution parameters corresponding to an array of values.
#
# :param values: 2-D array of values, with each row representing a year
# containing either 12 values corresponding to the calendar months of
# that year, or 366 values corresponding to the days of the year
# (with Feb. 29th being an average of the Feb. 28th and Mar. 1st values for
# non-leap years) and assuming that the first value of the array is
# January of the initial year for an input array of monthly values or
# Jan. 1st of initial year for an input array daily values
# :param data_start_year:
# :param calibration_start_year:
# :param calibration_end_year:
# :param periodicity: monthly or daily
# :return: four 1-D array of fitting values for the Pearson Type III
# distribution, with shape (12,) for monthly or (366,) for daily
#
# returned array 1: probability of zero
# returned array 2: first Pearson Type III distribution parameter (loc)
# returned array 3 :second Pearson Type III distribution parameter (scale)
# returned array 4: third Pearson Type III distribution parameter (skew)
# """
#
# # reshape precipitation values to (years, 12) for monthly,
# # or to (years, 366) for daily
# if periodicity is Periodicity.monthly:
#
# values = utils.reshape_to_2d(values, 12)
#
# elif periodicity is Periodicity.daily:
#
# values = utils.reshape_to_2d(values, 366)
#
# else:
#
# raise ValueError("Invalid periodicity argument: %s" % periodicity)
#
# # validate that the values array has shape: (years, 12) for monthly or (years, 366) for daily
# if len(values.shape) != 2:
# _log_and_raise_shape_error(shape=values.shape)
#
# else:
#
# time_steps_per_year = values.shape[1]
# if time_steps_per_year not in (12, 366):
# _log_and_raise_shape_error(shape=values.shape)
#
# # determine the end year of the values array
# data_end_year = data_start_year + values.shape[0]
#
# # make sure that we have data within the full calibration period,
# # otherwise use the full period of record
# if (calibration_start_year < data_start_year) or \
# (calibration_end_year > data_end_year):
# calibration_start_year = data_start_year
# calibration_end_year = data_end_year
#
# # get the year axis indices corresponding to
# # the calibration start and end years
# calibration_begin_index = calibration_start_year - data_start_year
# calibration_end_index = (calibration_end_year - data_start_year) + 1
#
# # get the values for the current calendar time step
# # that fall within the calibration years period
# calibration_values = values[calibration_begin_index:calibration_end_index, :]
#
# # the values we'll compute and return
# probabilities_of_zero = np.zeros((time_steps_per_year,))
# locs = np.zeros((time_steps_per_year,))
# scales = np.zeros((time_steps_per_year,))
# skews = np.zeros((time_steps_per_year,))
#
# # compute the probability of zero and Pearson
# # parameters for each calendar time step
# # TODO vectorize the below loop? create a @numba.vectorize() ufunc
# # for application over the second axis
# for time_step_index in range(time_steps_per_year):
#
# # get the values for the current calendar time step
# time_step_values = calibration_values[:, time_step_index]
#
# # count the number of zeros and valid (non-missing/non-NaN) values
# number_of_zeros, number_of_non_missing = \
# utils.count_zeros_and_non_missings(time_step_values)
#
# # make sure we have at least four values that are both non-missing (i.e. non-NaN)
# # and non-zero, otherwise use the entire period of record
# if (number_of_non_missing - number_of_zeros) < 4:
#
# # we can't proceed, bail out using zeros
# continue
#
# # calculate the probability of zero for the calendar time step
# probability_of_zero = 0.0
# if number_of_zeros > 0:
#
# probability_of_zero = number_of_zeros / number_of_non_missing
#
# # get the estimated L-moments, if we have
# # more than three non-missing/non-zero values
# if (number_of_non_missing - number_of_zeros) > 3:
#
# # get the Pearson Type III parameters for this time
# # step's values within the calibration period
# params = lmoments.fit(time_step_values)
# probabilities_of_zero[time_step_index] = probability_of_zero
# locs[time_step_index] = params["loc"]
# scales[time_step_index] = params["scale"]
# skews[time_step_index] = params["skew"]
#
# return probabilities_of_zero, locs, scales, skews


def _minimum_possible(
