Skip to content

Commit

Permalink
Merge pull request #1109 from emlys/bugfix/1105
Browse files Browse the repository at this point in the history
Refactor monthly quickflow function
  • Loading branch information
dcdenu4 authored Jul 31, 2023
2 parents b2788ec + 399f7b0 commit 8045806
Show file tree
Hide file tree
Showing 3 changed files with 253 additions and 110 deletions.
8 changes: 8 additions & 0 deletions HISTORY.rst
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,14 @@ Unreleased Changes
set to 0. The old behavior was not well documented and caused some
confusion when nodata pixels did not line up. It's safer not to fill in
unknown data. (`#1317 <https://github.com/natcap/invest/issues/1317>`_)
* Negative monthly quickflow values will now be set to 0. This is because
very small negative values occasionally result from valid data, but they
should be interpreted as 0.
(`#1318 <https://github.com/natcap/invest/issues/1318>`_)
* In the monthly quickflow calculation, QF_im will be set to 0 on any pixel
where s_i / a_im > 100. This is done to avoid overflow errors when
calculating edge cases where the result would round down to 0 anyway.
(`#1318 <https://github.com/natcap/invest/issues/1318>`_)
* Urban Flood Risk
* Fixed a bug where the model incorrectly raised an error if the
biophysical table contained a row of all 0s.
Expand Down
220 changes: 129 additions & 91 deletions src/natcap/invest/seasonal_water_yield/seasonal_water_yield.py
Original file line number Diff line number Diff line change
Expand Up @@ -622,7 +622,7 @@ def _execute(args):
# TypeError when n_workers is None.
n_workers = -1 # Synchronous mode.
task_graph = taskgraph.TaskGraph(
cache_dir, n_workers, reporting_interval=5.0)
cache_dir, n_workers, reporting_interval=5)

LOGGER.info('Building file registry')
file_registry = utils.build_file_registry(
Expand Down Expand Up @@ -770,14 +770,13 @@ def _execute(args):
climate_zone_rain_events_month = dict([
(cz_id, cz_rain_events_lookup[cz_id][month_label]) for
cz_id in cz_rain_events_lookup])
n_events_nodata = -1
n_events_task = task_graph.add_task(
func=utils.reclassify_raster,
args=(
(file_registry['cz_aligned_raster_path'], 1),
climate_zone_rain_events_month,
file_registry['n_events_path_list'][month_id],
gdal.GDT_Float32, n_events_nodata,
gdal.GDT_Float32, TARGET_NODATA,
reclass_error_details),
target_path_list=[
file_registry['n_events_path_list'][month_id]],
Expand Down Expand Up @@ -827,8 +826,6 @@ def _execute(args):
func=_calculate_monthly_quick_flow,
args=(
file_registry['precip_path_aligned_list'][month_index],
file_registry['lulc_aligned_path'],
file_registry['cn_path'],
file_registry['n_events_path_list'][month_index],
file_registry['stream_path'],
file_registry['si_path'],
Expand Down Expand Up @@ -858,13 +855,12 @@ def _execute(args):
kc_lookup = dict([
(lucode, biophysical_table[lucode]['kc_%d' % (month_index+1)])
for lucode in biophysical_table])
kc_nodata = -1 # a reasonable nodata value
kc_task = task_graph.add_task(
func=utils.reclassify_raster,
args=(
(file_registry['lulc_aligned_path'], 1), kc_lookup,
file_registry['kc_path_list'][month_index],
gdal.GDT_Float32, kc_nodata, reclass_error_details),
gdal.GDT_Float32, TARGET_NODATA, reclass_error_details),
target_path_list=[file_registry['kc_path_list'][month_index]],
dependent_task_list=[align_task],
task_name='classify kc month %d' % month_index)
Expand Down Expand Up @@ -978,7 +974,7 @@ def _calculate_vri(l_path, target_vri_path):
None.
"""
qb_sum = 0.0
qb_sum = 0
qb_valid_count = 0
l_nodata = pygeoprocessing.get_raster_info(l_path)['nodata'][0]

Expand Down Expand Up @@ -1039,109 +1035,154 @@ def qfi_sum_op(*qf_values):
qfi_sum_op, target_qf_path, gdal.GDT_Float32, qf_nodata)


def _calculate_monthly_quick_flow(
precip_path, lulc_raster_path, cn_path, n_events_raster_path,
stream_path, si_path, qf_monthly_path):
def _calculate_monthly_quick_flow(precip_path, n_events_path, stream_path,
si_path, qf_monthly_path):
"""Calculate quick flow for a month.
Args:
precip_path (string): path to file that correspond to monthly
precipitation
lulc_raster_path (string): path to landcover raster
cn_path (string): path to curve number raster
n_events_raster_path (string): a path to a raster where each pixel
precip_path (string): path to monthly precipitation raster
n_events_path (string): a path to a raster where each pixel
indicates the number of rain events.
stream_path (string): path to stream mask raster where 1 indicates a
stream pixel, 0 is a non-stream but otherwise valid area from the
original DEM, and nodata indicates areas outside the valid DEM.
si_path (string): path to raster that has potential maximum retention
qf_monthly_path_list (list of string): list of paths to output monthly
rasters.
qf_monthly_path (string): path to output monthly QF raster.
Returns:
None
"""
si_nodata = pygeoprocessing.get_raster_info(si_path)['nodata'][0]

qf_nodata = -1
p_nodata = pygeoprocessing.get_raster_info(precip_path)['nodata'][0]
n_events_nodata = pygeoprocessing.get_raster_info(
n_events_raster_path)['nodata'][0]
n_nodata = pygeoprocessing.get_raster_info(n_events_path)['nodata'][0]
stream_nodata = pygeoprocessing.get_raster_info(stream_path)['nodata'][0]
si_nodata = pygeoprocessing.get_raster_info(si_path)['nodata'][0]

def qf_op(p_im, s_i, n_events, stream_array):
def qf_op(p_im, s_i, n_m, stream):
"""Calculate quick flow as in Eq [1] in user's guide.
Args:
p_im (numpy.array): precipitation at pixel i on month m
s_i (numpy.array): factor that is 1000/CN_i - 10
(Equation 1b from user's guide)
n_events (numpy.array): number of rain events on the pixel
stream_mask (numpy.array): 1 if stream, otherwise not a stream
pixel.
n_m (numpy.array): number of rain events on pixel i in month m
stream (numpy.array): 1 if stream, otherwise not a stream pixel.
Returns:
quick flow (numpy.array)
"""
# s_i is an intermediate output which will always have a defined
# nodata value
valid_mask = ((p_im != 0.0) &
(stream_array != 1) &
(n_events > 0) &
~utils.array_equals_nodata(s_i, si_nodata))
if p_nodata is not None:
valid_mask &= ~utils.array_equals_nodata(p_im, p_nodata)
if n_events_nodata is not None:
valid_mask &= ~utils.array_equals_nodata(n_events, n_events_nodata)
# stream_nodata is the only input that carry over nodata values from
valid_p_mask = ~utils.array_equals_nodata(p_im, p_nodata)
valid_n_mask = ~utils.array_equals_nodata(n_m, n_nodata)
# precip mask: both p_im and n_m are defined and greater than 0
precip_mask = valid_p_mask & valid_n_mask & (p_im > 0) & (n_m > 0)
stream_mask = stream == 1
# stream_nodata is the only input that carries over nodata values from
# the aligned DEM.
if stream_nodata is not None:
valid_mask &= ~utils.array_equals_nodata(
stream_array, stream_nodata)

valid_n_events = n_events[valid_mask]
valid_si = s_i[valid_mask]

valid_mask = (
valid_p_mask &
valid_n_mask &
~utils.array_equals_nodata(stream, stream_nodata) &
~utils.array_equals_nodata(s_i, si_nodata))

# QF is defined in terms of three cases:
#
# 1. Where there is no precipitation, QF = 0
# (even if stream or s_i is undefined)
#
# 2. Where there is precipitation and we're on a stream, QF = P
# (even if s_i is undefined)
#
# 3. Where there is precipitation and we're not on a stream, use the
# quickflow equation (only if all four inputs are defined):
# QF_im = 25.4 * n_m * (
# (a_im - s_i) * exp(-0.2 * s_i / a_im) +
# s_i^2 / a_im * exp(0.8 * s_i / a_im) * E1(s_i / a_im)
# )
#
# When evaluating the QF equation, there are a few edge cases:
#
# 3a. Where s_i = 0, you get NaN and a warning from numpy because
# E1(0 / a_im) = infinity. In this case, per conversation with
# Rafa, the final term of the equation should evaluate to 0, and
# the equation can be simplified to QF_im = P_im
# (which makes sense because if s_i = 0, no water is retained).
#
# Solution: Preemptively set QF_im equal to P_im where s_i = 0 in
# order to avoid calculations with infinity.
#
# 3b. When the ratio s_i / a_im becomes large, QF approaches 0.
# [NOTE: I don't know how to prove this mathematically, but it
# holds true when I tested with reasonable values of s_i and a_im].
# The exp() term becomes very large, while the E1() term becomes
# very small.
#
# Per conversation with Rafa and Lisa, large s_i / a_im ratios
# shouldn't happen often with real world data. But if they did, it
# would be a situation where there is very little precipitation
# spread out over relatively many rain events and the soil is very
# absorbent, so logically, QF should be effectively zero.
#
# To avoid overflow, we set a threshold of 100 for the s_i / a_im
# ratio. Where s_i / a_im > 100, we set QF to 0. 100 was chosen
# because it's a nice whole number that gets us close to the
# float32 max without surpassing it (exp(0.8*100) = 5e34). When
# s_i / a_im = 100, the actual result of the QF equation is on the
# order of 1e-6, so it should be rounded down to 0 anyway.
#
# 3c. Otherwise, evaluate the QF equation as usual.
#
# 3d. With certain inputs [for example: n_m = 10, CN = 50, p_im = 30],
# it's possible that the QF equation evaluates to a very small
# negative value. Per conversation with Lisa and Rafa, this is an
# edge case that the equation was not designed for. Negative QF
# doesn't make sense, so we set any negative QF values to 0.

# qf_im is the quickflow at pixel i on month m
qf_im = numpy.full(p_im.shape, TARGET_NODATA, dtype=numpy.float32)

# case 1: where there is no precipitation
qf_im[~precip_mask] = 0

# case 2: where there is precipitation and we're on a stream
qf_im[precip_mask & stream_mask] = p_im[precip_mask & stream_mask]

# case 3: where there is precipitation and we're not on a stream
case_3_mask = valid_mask & precip_mask & ~stream_mask

# for consistent indexing, make a_im the same shape as the other
# arrays even though we only use it in case 3
a_im = numpy.full(p_im.shape, numpy.nan, dtype=numpy.float32)
# a_im is the mean rain depth on a rainy day at pixel i on month m
# the 25.4 converts inches to mm since Si is in inches
a_im = numpy.empty(valid_n_events.shape)
a_im = p_im[valid_mask] / (valid_n_events * 25.4)
qf_im = numpy.empty(p_im.shape)
qf_im[:] = qf_nodata

# Precompute the last two terms in quickflow so we can handle a
# numerical instability when s_i is large and/or a_im is small
# on large valid_si/a_im this number will be zero and the latter
# exponent will also be zero because of a divide by zero. rather than
# raise that numerical warning, just handle it manually
E1 = scipy.special.expn(1, valid_si / a_im)
E1[valid_si == 0] = 0
nonzero_e1_mask = E1 != 0
exp_result = numpy.zeros(valid_si.shape)
exp_result[nonzero_e1_mask] = numpy.exp(
(0.8 * valid_si[nonzero_e1_mask]) / a_im[nonzero_e1_mask] +
numpy.log(E1[nonzero_e1_mask]))

# qf_im is the quickflow at pixel i on month m Eq. [1]
qf_im[valid_mask] = (25.4 * valid_n_events * (
(a_im - valid_si) * numpy.exp(-0.2 * valid_si / a_im) +
valid_si ** 2 / a_im * exp_result))

# if precip is 0, then QF should be zero
qf_im[(p_im == 0) | (n_events == 0)] = 0.0
# if we're on a stream, set quickflow to the precipitation
valid_stream_precip_mask = stream_array == 1
if p_nodata is not None:
valid_stream_precip_mask &= ~utils.array_equals_nodata(
p_im, p_nodata)
qf_im[valid_stream_precip_mask] = p_im[valid_stream_precip_mask]
# the 25.4 converts inches to mm since s_i is in inches
a_im[case_3_mask] = p_im[case_3_mask] / (n_m[case_3_mask] * 25.4)

# case 3a: when s_i = 0, qf = p
case_3a_mask = case_3_mask & (s_i == 0)
qf_im[case_3a_mask] = p_im[case_3a_mask]

# case 3b: set quickflow to 0 when the s_i/a_im ratio is too large
case_3b_mask = case_3_mask & (s_i / a_im > 100)
qf_im[case_3b_mask] = 0

# case 3c: evaluate the equation as usual
case_3c_mask = case_3_mask & ~(case_3a_mask | case_3b_mask)
qf_im[case_3c_mask] = (
25.4 * n_m[case_3c_mask] * (
((a_im[case_3c_mask] - s_i[case_3c_mask]) *
numpy.exp(-0.2 * s_i[case_3c_mask] / a_im[case_3c_mask])) +
(s_i[case_3c_mask] ** 2 / a_im[case_3c_mask] *
numpy.exp(0.8 * s_i[case_3c_mask] / a_im[case_3c_mask]) *
scipy.special.exp1(s_i[case_3c_mask] / a_im[case_3c_mask]))
)
)

# case 3d: set any negative values to 0
qf_im[valid_mask & (qf_im < 0)] = 0

return qf_im

pygeoprocessing.raster_calculator(
[(path, 1) for path in [
precip_path, si_path, n_events_raster_path, stream_path]], qf_op,
qf_monthly_path, gdal.GDT_Float32, qf_nodata)
precip_path, si_path, n_events_path, stream_path]],
qf_op, qf_monthly_path, gdal.GDT_Float32, TARGET_NODATA)


def _calculate_curve_number_raster(
Expand Down Expand Up @@ -1172,7 +1213,6 @@ def _calculate_curve_number_raster(
4: 'cn_d',
}
# curve numbers are always positive so -1 a good nodata choice
cn_nodata = -1
lulc_to_soil = {}
lulc_nodata = pygeoprocessing.get_raster_info(
lulc_raster_path)['nodata'][0]
Expand All @@ -1195,7 +1235,7 @@ def _calculate_curve_number_raster(
else:
# handle the lulc nodata with cn nodata
lulc_to_soil[soil_id]['lulc_values'].append(lulc_nodata)
lulc_to_soil[soil_id]['cn_values'].append(cn_nodata)
lulc_to_soil[soil_id]['cn_values'].append(TARGET_NODATA)

# Making the landcover array a float32 in case the user provides a
# float landcover map like Kate did.
Expand All @@ -1213,7 +1253,7 @@ def _calculate_curve_number_raster(
def cn_op(lulc_array, soil_group_array):
"""Map lulc code and soil to a curve number."""
cn_result = numpy.empty(lulc_array.shape)
cn_result[:] = cn_nodata
cn_result[:] = TARGET_NODATA

# if lulc_array value not in lulc_to_soil[soil_group_id]['lulc_values']
# then numpy.digitize will not bin properly and cause an IndexError
Expand Down Expand Up @@ -1252,10 +1292,9 @@ def cn_op(lulc_array, soil_group_array):
cn_result[current_soil_mask] = cn_values[current_soil_mask]
return cn_result

cn_nodata = -1
pygeoprocessing.raster_calculator(
[(lulc_raster_path, 1), (soil_group_path, 1)], cn_op, cn_path,
gdal.GDT_Float32, cn_nodata)
gdal.GDT_Float32, TARGET_NODATA)


def _calculate_si_raster(cn_path, stream_path, si_path):
Expand All @@ -1269,7 +1308,6 @@ def _calculate_si_raster(cn_path, stream_path, si_path):
Returns:
None
"""
si_nodata = -1
cn_nodata = pygeoprocessing.get_raster_info(cn_path)['nodata'][0]

def si_op(ci_factor, stream_mask):
Expand All @@ -1278,17 +1316,17 @@ def si_op(ci_factor, stream_mask):
~utils.array_equals_nodata(ci_factor, cn_nodata) &
(ci_factor > 0))
si_array = numpy.empty(ci_factor.shape)
si_array[:] = si_nodata
si_array[:] = TARGET_NODATA
# multiply by the stream mask != 1 so we get 0s on the stream and
# unaffected results everywhere else
si_array[valid_mask] = (
(1000.0 / ci_factor[valid_mask] - 10) * (
(1000 / ci_factor[valid_mask] - 10) * (
stream_mask[valid_mask] != 1))
return si_array

pygeoprocessing.raster_calculator(
[(cn_path, 1), (stream_path, 1)], si_op, si_path, gdal.GDT_Float32,
si_nodata)
TARGET_NODATA)


def _aggregate_recharge(
Expand Down Expand Up @@ -1350,7 +1388,7 @@ def _aggregate_recharge(
"no coverage for polygon %s", ', '.join(
[str(poly_feat.GetField(_)) for _ in range(
poly_feat.GetFieldCount())]))
value = 0.0
value = 0
elif op_type == 'sum':
value = aggregate_stats[poly_index]['sum']
poly_feat.SetField(aggregate_field_id, float(value))
Expand Down
Loading

0 comments on commit 8045806

Please sign in to comment.