Skip to content

Commit

Permalink
Retrieve path to the hazard event data source (#288)
Browse files Browse the repository at this point in the history
---------

Signed-off-by: Floriane Perrin de Brichambaut <[email protected]>
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
  • Loading branch information
Floflow and pre-commit-ci[bot] authored Jun 14, 2024
1 parent 6758c32 commit cbea297
Show file tree
Hide file tree
Showing 18 changed files with 113 additions and 59 deletions.
1 change: 1 addition & 0 deletions src/physrisk/api/v1/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ class HazardEventDistrib(BaseModel):

intensity_bin_edges: np.ndarray = Field(default_factory=lambda: np.zeros(10), description="")
probabilities: np.ndarray = Field(default_factory=lambda: np.zeros(10), description="")
path: List[str] = Field([], description="Path to the hazard indicator data source.")

class Config:
arbitrary_types_allowed = True
Expand Down
1 change: 1 addition & 0 deletions src/physrisk/api/v1/exposure_req_resp.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ class AssetExposureRequest(BaseModel):
class Exposure(BaseModel):
category: str
value: Optional[float]
path: str = Field("unknown", description="Path to the hazard indicator data source.")


class AssetExposure(BaseModel):
Expand Down
1 change: 1 addition & 0 deletions src/physrisk/api/v1/impact_req_resp.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ class AcuteHazardCalculationDetails(BaseModel):
hazard_exceedance: ExceedanceCurve
hazard_distribution: Distribution
vulnerability_distribution: VulnerabilityDistrib
hazard_path: List[str] = Field("unknown", description="Path to the hazard indicator data source.")


class ImpactKey(BaseModel):
Expand Down
6 changes: 4 additions & 2 deletions src/physrisk/data/hazard_data_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ def get_intensity_curves(
curves: numpy array of intensity (no. coordinate pairs, no. return periods).
return_periods: return periods in years.
units: units.
path: path to the hazard indicator data source.
"""

path = self._get_source_path(indicator_id=indicator_id, scenario=scenario, year=year, hint=hint)
Expand All @@ -118,7 +119,7 @@ def get_intensity_curves(
],
self._interpolation,
) # type: ignore
return curves, return_periods, units
return curves, return_periods, units, path


class ChronicHazardDataProvider(HazardDataProvider):
Expand Down Expand Up @@ -157,8 +158,9 @@ def get_parameters(
parameters: numpy array of parameters.
defns: numpy array defining the parameters (e.g. provides thresholds).
units: units of the parameters.
path: path to the hazard indicator data source.
"""

path = self._get_source_path(indicator_id=indicator_id, scenario=scenario, year=year, hint=hint)
parameters, defns, units = self._reader.get_curves(path, longitudes, latitudes, self._interpolation)
return parameters, defns, units
return parameters, defns, units, path
10 changes: 6 additions & 4 deletions src/physrisk/data/pregenerated_hazard_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,9 @@ def _get_hazard_events_batch(
longitudes = [req.longitude for req in batch]
latitudes = [req.latitude for req in batch]
if hazard_type.kind == HazardKind.acute: # type: ignore
intensities, return_periods, units = self.acute_hazard_data_providers[hazard_type].get_intensity_curves(
intensities, return_periods, units, path = self.acute_hazard_data_providers[
hazard_type
].get_intensity_curves(
longitudes,
latitudes,
indicator_id=indicator_id,
Expand All @@ -99,15 +101,15 @@ def _get_hazard_events_batch(
valid_periods, valid_intensities = return_periods[valid], intensities[i, :][valid]
if len(valid_periods) == 0:
valid_periods, valid_intensities = np.array([100]), np.array([0])
responses[req] = HazardEventDataResponse(valid_periods, valid_intensities, units)
responses[req] = HazardEventDataResponse(valid_periods, valid_intensities, units, path)
elif hazard_type.kind == HazardKind.chronic: # type: ignore
parameters, defns, units = self.chronic_hazard_data_providers[hazard_type].get_parameters(
parameters, defns, units, path = self.chronic_hazard_data_providers[hazard_type].get_parameters(
longitudes, latitudes, indicator_id=indicator_id, scenario=scenario, year=year, hint=hint
)

for i, req in enumerate(batch):
valid = ~np.isnan(parameters[i, :])
responses[req] = HazardParameterDataResponse(parameters[i, :][valid], defns[valid], units)
responses[req] = HazardParameterDataResponse(parameters[i, :][valid], defns[valid], units, path)
except Exception as err:
# e.g. the requested data is unavailable
for i, req in enumerate(batch):
Expand Down
12 changes: 7 additions & 5 deletions src/physrisk/kernel/exposure.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,14 @@ class Bounds:

@dataclass
class AssetExposureResult:
hazard_categories: Dict[type, Tuple[Category, float]]
hazard_categories: Dict[type, Tuple[Category, float, str]]


class ExposureMeasure(DataRequester):
@abstractmethod
def get_exposures(
self, asset: Asset, data_responses: Iterable[HazardDataResponse]
) -> Dict[type, Tuple[Category, float]]: ...
) -> Dict[type, Tuple[Category, float, str]]: ...


class JupterExposureMeasure(ExposureMeasure):
Expand All @@ -72,21 +72,23 @@ def get_data_requests(self, asset: Asset, *, scenario: str, year: int) -> Iterab
]

def get_exposures(self, asset: Asset, data_responses: Iterable[HazardDataResponse]):
result: Dict[type, Tuple[Category, float]] = {}
result: Dict[type, Tuple[Category, float, str]] = {}
for (k, v), resp in zip(self.exposure_bins.items(), data_responses):
if isinstance(resp, HazardParameterDataResponse):
param = resp.parameter
hazard_path = resp.path
elif isinstance(resp, HazardEventDataResponse):
if len(resp.intensities) > 1:
raise ValueError("single-value curve expected")
param = resp.intensities[0]
hazard_path = resp.path
(hazard_type, _) = k
(lower_bounds, categories) = v
if math.isnan(param):
result[hazard_type] = (Category.NODATA, float(param))
result[hazard_type] = (Category.NODATA, float(param), hazard_path)
else:
index = np.searchsorted(lower_bounds, param, side="right") - 1
result[hazard_type] = (categories[index], float(param))
result[hazard_type] = (categories[index], float(param), hazard_path)
return result

def get_exposure_bins(self):
Expand Down
18 changes: 14 additions & 4 deletions src/physrisk/kernel/hazard_event_distrib.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,21 +9,27 @@ class HazardEventDistrib:
"""Intensity distribution of a hazard event (e.g. inundation depth, wind speed etc),
specific to an asset -- that is, at the location of the asset."""

__slots__ = ["__event_type", "__intensity_bins", "__prob", "__exceedance"]
__slots__ = ["__event_type", "__intensity_bins", "__prob", "__path", "__exceedance"]

def __init__(
self, event_type: type, intensity_bins: Union[List[float], np.ndarray], prob: Union[List[float], np.ndarray]
self,
event_type: type,
intensity_bins: Union[List[float], np.ndarray],
prob: Union[List[float], np.ndarray],
path: List[str],
):
"""Create a new asset event distribution.
Args:
event_type: type of event
intensity_bins: non-decreasing intensity bin edges.
intensity_bins: non-decreasing intensity bin edgess.
e.g. bin edges [1.0, 1.5, 2.0] imply two bins: 1.0 < i <= 1.5, 1.5 < i <= 2.0
prob: (annual) probability of occurrence for each intensity bin with size [len(intensity_bins) - 1]
prob: (annual) probability of occurrence for each intensity bin with size [len(intensity_bins) - 1].
path: path to the hazard indicator data source.
"""
self.__event_type = event_type
self.__intensity_bins = np.array(intensity_bins)
self.__prob = np.array(prob)
self.__path = path

def intensity_bins(self):
return zip(self.__intensity_bins[0:-1], self.__intensity_bins[1:])
Expand All @@ -38,3 +44,7 @@ def intensity_bin_edges(self) -> np.ndarray:
@property
def prob(self) -> np.ndarray:
return self.__prob

@property
def path(self) -> List[str]:
return self.__path
16 changes: 14 additions & 2 deletions src/physrisk/kernel/hazard_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,23 +71,33 @@ def __init__(self, err: Exception):
class HazardEventDataResponse(HazardDataResponse):
"""Response to HazardDataRequest for acute hazards."""

def __init__(self, return_periods: np.ndarray, intensities: np.ndarray, units: str = "default"):
def __init__(
self, return_periods: np.ndarray, intensities: np.ndarray, units: str = "default", path: str = "unknown"
):
"""Create HazardEventDataResponse.
Args:
return_periods: return periods in years.
intensities: hazard event intensity for each return period, or set of hazard event intensities corresponding to different events. # noqa: E501
path: path to the hazard indicator data source.
"""

self.return_periods = return_periods
self.intensities = intensities
self.units = sys.intern(units)
self.path = sys.intern(path)


class HazardParameterDataResponse(HazardDataResponse):
"""Response to HazardDataRequest."""

def __init__(self, parameters: np.ndarray, param_defns: np.ndarray = np.empty([]), units: str = "default"):
def __init__(
self,
parameters: np.ndarray,
param_defns: np.ndarray = np.empty([]),
units: str = "default",
path: str = "unknown",
):
"""Create HazardParameterDataResponse. In general the chronic parameters are an array of values.
For example, a chronic hazard may be the number of days per year with average temperature
above :math:`x' degrees for :math:`x' in [25, 30, 35, 40]°C. In this case the param_defns would
Expand All @@ -98,10 +108,12 @@ def __init__(self, parameters: np.ndarray, param_defns: np.ndarray = np.empty([]
Args:
parameters (np.ndarray): Chronic hazard parameter values.
param_defns (np.ndarray): Chronic hazard parameter definitions.
path: path to the hazard indicator data source.
"""
self.parameters = parameters
self.param_defns = param_defns
self.units = sys.intern(units)
self.path = sys.intern(path)

@property
def parameter(self) -> float:
Expand Down
9 changes: 8 additions & 1 deletion src/physrisk/kernel/impact_distrib.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,25 +14,28 @@ class ImpactType(Enum):
class ImpactDistrib:
"""Impact distributions specific to an asset."""

__slots__ = ["__hazard_type", "__impact_bins", "__prob", "impact_type"]
__slots__ = ["__hazard_type", "__impact_bins", "__prob", "impact_type", "__path"]

def __init__(
self,
hazard_type: type,
impact_bins: Union[List[float], np.ndarray],
prob: Union[List[float], np.ndarray],
path: List[str],
impact_type: ImpactType = ImpactType.damage,
):
"""Create a new asset event distribution.
Args:
event_type: type of event
impact_bins: non-decreasing impact bin bounds
prob: probabilities with size [len(intensity_bins) - 1]
path: path to the hazard indicator data source
"""
self.__hazard_type = hazard_type
self.__impact_bins = np.array(impact_bins)
self.impact_type = impact_type
self.__prob = np.array(prob)
self.__path = path

def impact_bins_explicit(self):
return zip(self.__impact_bins[0:-1], self.__impact_bins[1:])
Expand Down Expand Up @@ -74,6 +77,10 @@ def impact_bins(self) -> np.ndarray:
def prob(self) -> np.ndarray:
return self.__prob

@property
def path(self) -> List[str]:
return self.__path


class EmptyImpactDistrib(ImpactDistrib):
def __init__(self):
Expand Down
10 changes: 7 additions & 3 deletions src/physrisk/kernel/vulnerability_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,11 @@ def get_impact_details(
impact_prob = vulnerability_dist.prob_matrix.T @ event_dist.prob
return (
ImpactDistrib(
vulnerability_dist.event_type, vulnerability_dist.impact_bins, impact_prob, impact_type=self.impact_type
vulnerability_dist.event_type,
vulnerability_dist.impact_bins,
impact_prob,
event_dist.path,
impact_type=self.impact_type,
),
vulnerability_dist,
event_dist,
Expand Down Expand Up @@ -225,7 +229,7 @@ def get_distributions(
self.get_impact_curve(intensity_bin_centres, asset).to_prob_matrix(self.impact_bin_edges),
)

event = HazardEventDistrib(self.hazard_type, intensity_bin_edges, probs)
event = HazardEventDistrib(self.hazard_type, intensity_bin_edges, probs, [event_data.path])
return vul, event

@abstractmethod
Expand Down Expand Up @@ -311,5 +315,5 @@ def get_distributions(
vul = VulnerabilityDistrib(
type(self.hazard_type), intensity_bin_edges, impact_bins_edges, np.eye(len(impact_bins_edges) - 1)
)
event = HazardEventDistrib(type(self.hazard_type), intensity_bin_edges, probs)
event = HazardEventDistrib(type(self.hazard_type), intensity_bin_edges, probs, [event_data.path])
return vul, event
4 changes: 3 additions & 1 deletion src/physrisk/requests.py
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,8 @@ def _get_asset_exposures(
AssetExposure(
asset_id="",
exposures=dict(
(t.__name__, Exposure(category=c.name, value=v)) for (t, (c, v)) in r.hazard_categories.items()
(t.__name__, Exposure(category=c.name, value=v, path=p))
for (t, (c, v, p)) in r.hazard_categories.items()
),
)
for (a, r) in results.items()
Expand Down Expand Up @@ -382,6 +383,7 @@ def compile_asset_impacts(impacts: Dict[ImpactKey, AssetImpactResult], assets: L
),
hazard_distribution=Distribution(bin_edges=v.event.intensity_bin_edges, probabilities=v.event.prob),
vulnerability_distribution=vulnerability_distribution,
hazard_path=v.impact.path,
)
else:
calc_details = None
Expand Down
19 changes: 14 additions & 5 deletions src/physrisk/vulnerability_models/chronic_heat_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,9 @@ def get_impact(self, asset: Asset, data_responses: List[HazardDataResponse]) ->
fraction_loss_mean = (delta_dd_above_mean * self.time_lost_per_degree_day) / hours_worked
fraction_loss_std = (delta_dd_above_mean * self.time_lost_per_degree_day_se) / hours_worked

return get_impact_distrib(fraction_loss_mean, fraction_loss_std, ChronicHeat, ImpactType.disruption)
return get_impact_distrib(
fraction_loss_mean, fraction_loss_std, ChronicHeat, [scenario_dd_above_mean.path], ImpactType.disruption
)


class ChronicHeatWBGTGZNModel(ChronicHeatGZNModel):
Expand Down Expand Up @@ -168,6 +170,8 @@ def get_impact(self, asset: Asset, data_responses: List[HazardDataResponse]) ->
assert isinstance(asset, IndustrialActivity)
wbgt_responses = [cast(HazardParameterDataResponse, r) for r in data_responses[2:]]

hazard_paths = [cast(HazardParameterDataResponse, r).path for r in data_responses]

baseline_dd_above_mean = cast(HazardParameterDataResponse, data_responses[0])
scenario_dd_above_mean = cast(HazardParameterDataResponse, data_responses[1])

Expand Down Expand Up @@ -232,7 +236,7 @@ def get_impact(self, asset: Asset, data_responses: List[HazardDataResponse]) ->

total_work_loss_delta: float = baseline_work_ability - scenario_work_ability

return get_impact_distrib(total_work_loss_delta, std_delta, self.hazard_type, self.impact_type)
return get_impact_distrib(total_work_loss_delta, std_delta, self.hazard_type, hazard_paths, self.impact_type)


def two_variable_joint_variance(ex, varx, ey, vary):
Expand All @@ -243,15 +247,20 @@ def two_variable_joint_variance(ex, varx, ey, vary):


def get_impact_distrib(
fraction_loss_mean: float, fraction_loss_std: float, hazard_type: type, impact_type: ImpactType
fraction_loss_mean: float,
fraction_loss_std: float,
hazard_type: type,
hazard_paths: List[str],
impact_type: ImpactType,
) -> ImpactDistrib:
"""Calculate impact (disruption) of asset based on the hazard data returned.
Args:
fraction_loss_mean: mean of the impact distribution
fraction_loss_std: standard deviation of the impact distribution
hazard_type: Hazard Type.
hazard_type: Hazard Type.get_impact_distrib
impact_type: Impact Type.
hazard_paths: path to the hazard indicator data source.
Returns:
Probability distribution of impacts.
Expand Down Expand Up @@ -282,4 +291,4 @@ def get_impact_distrib(
else:
probs[0] = probs[0] + prob_differential

return ImpactDistrib(hazard_type, impact_bins, probs, impact_type)
return ImpactDistrib(hazard_type, impact_bins, probs, hazard_paths, impact_type)
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ def get_distributions(self, asset: Asset, event_data_responses: Iterable[HazardD
probs_protected = np.where(depth_bins[1:] <= protection_depth, 0.0, 1.0)

vul = VulnerabilityDistrib(RiverineInundation, depth_bins, impact_bins, np.diag(probs_protected))
event = HazardEventDistrib(RiverineInundation, depth_bins, probs)
event = HazardEventDistrib(RiverineInundation, depth_bins, probs, [future.path])

return vul, event

Expand Down
2 changes: 1 addition & 1 deletion src/physrisk/vulnerability_models/real_estate_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -232,4 +232,4 @@ def get_impact(self, asset: Asset, data_responses: List[HazardDataResponse]) ->
heat_transfer = deg_days * self._default_transfer_coeff * 24 / 1000 # kWh of heat removed from asset
annual_electricity = heat_transfer / self._default_cooling_cop # kWh of electricity required for heat removal
# this is non-probabilistic model: probability of 1 of electricity use
return ImpactDistrib(ChronicHeat, [annual_electricity, annual_electricity], [1])
return ImpactDistrib(ChronicHeat, [annual_electricity, annual_electricity], [1], [data.path])
Loading

0 comments on commit cbea297

Please sign in to comment.