From 8f69e603feeda4152c105d3b01e4507eaf9e4c61 Mon Sep 17 00:00:00 2001 From: Joe Moorhouse <5102656+joemoorhouse@users.noreply.github.com> Date: Wed, 19 Jun 2024 23:06:56 +0100 Subject: [PATCH] Support hazard-only score-based risk measures; expose multiple measures types to API (#306) --- src/physrisk/api/v1/impact_req_resp.py | 1 + src/physrisk/container.py | 3 + src/physrisk/data/zarr_reader.py | 4 +- src/physrisk/kernel/calculation.py | 22 +++++-- src/physrisk/kernel/risk.py | 23 ++++--- src/physrisk/requests.py | 16 +++-- .../risk_models/generic_risk_model.py | 3 +- src/physrisk/risk_models/risk_models.py | 8 ++- .../vulnerability_models/example_models.py | 64 ++++++++++++++++++- tests/kernel/exposure_test.py | 2 + tests/risk_models/risk_models_test.py | 58 ++++++++++++++--- 11 files changed, 171 insertions(+), 33 deletions(-) diff --git a/src/physrisk/api/v1/impact_req_resp.py b/src/physrisk/api/v1/impact_req_resp.py index 11784525..bb5e753c 100644 --- a/src/physrisk/api/v1/impact_req_resp.py +++ b/src/physrisk/api/v1/impact_req_resp.py @@ -21,6 +21,7 @@ class AssetImpactRequest(BaseModel): include_asset_level: bool = Field(True, description="If true, include asset-level impacts.") include_measures: bool = Field(False, description="If true, include calculation of risk measures.") include_calc_details: bool = Field(True, description="If true, include impact calculation details.") + use_case_id: str = Field("", description="Identifier for 'use case' used in the risk measures calculation.") provider_max_requests: Dict[str, int] = Field( {}, description="The maximum permitted number of requests \ diff --git a/src/physrisk/container.py b/src/physrisk/container.py index e5945950..36fcc257 100644 --- a/src/physrisk/container.py +++ b/src/physrisk/container.py @@ -58,6 +58,8 @@ class Container(containers.DeclarativeContainer): hazard_model_factory = providers.Factory(ZarrHazardModelFactory, reader=zarr_reader, source_paths=source_paths) + measures_factory = providers.Factory(calc.DefaultMeasuresFactory) + vulnerability_models_factory = providers.Factory(DictBasedVulnerabilityModelsFactory) requester = providers.Singleton( @@ -68,4 +70,5 @@ class Container(containers.DeclarativeContainer): inventory_reader=inventory_reader, reader=zarr_reader, colormaps=colormaps, + measures_factory=measures_factory, ) diff --git a/src/physrisk/data/zarr_reader.py b/src/physrisk/data/zarr_reader.py index 336fd5d6..38e340f4 100644 --- a/src/physrisk/data/zarr_reader.py +++ b/src/physrisk/data/zarr_reader.py @@ -60,8 +60,8 @@ def all_data(self, set_id: str): @classmethod def create_s3_zarr_store(cls, get_env: Callable[[str, Optional[str]], str] = get_env): - access_key = get_env(cls.__access_key, None) - secret_key = get_env(cls.__secret_key, None) + access_key = get_env(cls.__access_key, "") + secret_key = get_env(cls.__secret_key, "") s3_bucket = get_env(cls.__S3_bucket, "physrisk-hazard-indicators") zarr_path = get_env(cls.__zarr_path, "hazard/hazard.zarr") diff --git a/src/physrisk/kernel/calculation.py b/src/physrisk/kernel/calculation.py index feac3cff..4714379f 100644 --- a/src/physrisk/kernel/calculation.py +++ b/src/physrisk/kernel/calculation.py @@ -1,11 +1,15 @@ -from typing import Dict, Sequence +from typing import Dict, Sequence, Type from physrisk.data.pregenerated_hazard_model import ZarrHazardModel from physrisk.hazard_models.core_hazards import get_default_source_paths -from physrisk.kernel.risk import RiskMeasureCalculator +from physrisk.kernel.hazards import Fire +from physrisk.kernel.impact_distrib import ImpactType +from physrisk.kernel.risk import RiskMeasureCalculator, RiskMeasuresFactory +from physrisk.risk_models.generic_risk_model import GenericScoreBasedRiskMeasures from physrisk.risk_models.risk_models import RealEstateToyRiskMeasures from physrisk.vulnerability_models import power_generating_asset_models as pgam from physrisk.vulnerability_models.chronic_heat_models import ChronicHeatGZNModel +from physrisk.vulnerability_models.example_models import PlaceholderVulnerabilityModel from physrisk.vulnerability_models.real_estate_models import ( CoolingModel, GenericTropicalCycloneModel, @@ -41,7 +45,10 @@ def get_default_vulnerability_models() -> Dict[type, Sequence[VulnerabilityModel RealEstateCoastalInundationModel(), RealEstateRiverineInundationModel(), GenericTropicalCycloneModel(), - CoolingModel(), + PlaceholderVulnerabilityModel("fire_probability", Fire, ImpactType.damage), + # PlaceholderVulnerabilityModel("days/above/35c", ChronicHeat, ImpactType.damage), + # PlaceholderVulnerabilityModel("days/above/5cm", Hail, ImpactType.damage), + # PlaceholderVulnerabilityModel("months/spei3m/below/-2", Drought, ImpactType.damage), ], PowerGeneratingAsset: [pgam.InundationModel()], RealEstateAsset: [ @@ -63,6 +70,13 @@ def get_default_vulnerability_models() -> Dict[type, Sequence[VulnerabilityModel } -def get_default_risk_measure_calculators() -> Dict[type, RiskMeasureCalculator]: +def get_default_risk_measure_calculators() -> Dict[Type[Asset], RiskMeasureCalculator]: """For asset-level risk measure, define the measure calculators to use.""" return {RealEstateAsset: RealEstateToyRiskMeasures()} + + +class DefaultMeasuresFactory(RiskMeasuresFactory): + def calculators(self, use_case_id: str) -> Dict[Type[Asset], RiskMeasureCalculator]: + if use_case_id == "generic": + return {Asset: GenericScoreBasedRiskMeasures()} + return get_default_risk_measure_calculators() diff --git a/src/physrisk/kernel/risk.py b/src/physrisk/kernel/risk.py index 9c0142ea..3f65bc1f 100644 --- a/src/physrisk/kernel/risk.py +++ b/src/physrisk/kernel/risk.py @@ -7,7 +7,6 @@ from physrisk.kernel.hazard_model import HazardModel from physrisk.kernel.hazards import Hazard, all_hazards from physrisk.kernel.impact import AssetImpactResult, ImpactKey, calculate_impacts -from physrisk.kernel.impact_distrib import EmptyImpactDistrib from physrisk.kernel.vulnerability_model import VulnerabilityModels # from asyncio import ALL_COMPLETED @@ -87,13 +86,20 @@ class Measure: class RiskMeasureCalculator(Protocol): - def calc_measure(self, hazard_type: type, base_impact: AssetImpactResult, impact: AssetImpactResult) -> Measure: ... + def calc_measure( + self, hazard_type: Type[Hazard], base_impact: AssetImpactResult, impact: AssetImpactResult + ) -> Optional[Measure]: ... - def get_definition(self, hazard_type: type) -> ScoreBasedRiskMeasureDefinition: ... + def get_definition(self, hazard_type: Type[Hazard]) -> ScoreBasedRiskMeasureDefinition: ... def supported_hazards(self) -> Set[type]: ... +class RiskMeasuresFactory(Protocol): + def calculators(self, use_case_id: str) -> Dict[Type[Asset], RiskMeasureCalculator]: + pass + + class AssetLevelRiskModel(RiskModel): def __init__( self, @@ -123,7 +129,10 @@ def populate_measure_definitions( # the identifiers of the score-based risk measures used for each asset, for each hazard type measure_ids_for_hazard: Dict[Type[Hazard], List[str]] = {} # one - calcs_by_asset = [self._measure_calculators.get(type(asset), None) for asset in assets] + calcs_by_asset = [ + self._measure_calculators.get(type(asset), self._measure_calculators.get(Asset, None)) for asset in assets + ] + # match to specific asset and if no match then use the generic calculator assigned to Asset used_calcs = {c for c in calcs_by_asset if c is not None} # get all measures measure_id_lookup = { @@ -167,9 +176,7 @@ def calculate_risk_measures(self, assets: Sequence[Asset], prosp_scens: Sequence prosp_impact = impacts.get( ImpactKey(asset=asset, hazard_type=hazard_type, scenario=prosp_scen, key_year=year) ) - if not isinstance(base_impact.impact, EmptyImpactDistrib) and not isinstance( - prosp_impact.impact, EmptyImpactDistrib - ): - risk_ind = measure_calc.calc_measure(hazard_type, base_impact, prosp_impact) + risk_ind = measure_calc.calc_measure(hazard_type, base_impact, prosp_impact) + if risk_ind is not None: measures[MeasureKey(asset, prosp_scen, year, hazard_type)] = risk_ind return impacts, measures diff --git a/src/physrisk/requests.py b/src/physrisk/requests.py index 315be729..6cd93e39 100644 --- a/src/physrisk/requests.py +++ b/src/physrisk/requests.py @@ -19,7 +19,7 @@ from physrisk.kernel.hazards import all_hazards from physrisk.kernel.impact import AssetImpactResult, ImpactKey # , ImpactKey from physrisk.kernel.impact_distrib import EmptyImpactDistrib -from physrisk.kernel.risk import AssetLevelRiskModel, Measure, MeasureKey +from physrisk.kernel.risk import AssetLevelRiskModel, Measure, MeasureKey, RiskMeasureCalculator, RiskMeasuresFactory from physrisk.kernel.vulnerability_model import ( DictBasedVulnerabilityModels, VulnerabilityModels, @@ -74,9 +74,11 @@ def __init__( inventory_reader: InventoryReader, reader: ZarrReader, colormaps: Colormaps, + measures_factory: RiskMeasuresFactory, ): self.colormaps = colormaps self.hazard_model_factory = hazard_model_factory + self.measures_factory = measures_factory self.vulnerability_models_factory = vulnerability_models_factory self.inventory = inventory self.inventory_reader = inventory_reader @@ -109,7 +111,10 @@ def get(self, *, request_id, request_dict): interpolation=request.calc_settings.hazard_interp, provider_max_requests=request.provider_max_requests ) vulnerability_models = self.vulnerability_models_factory.vulnerability_models() - return dumps(_get_asset_impacts(request, hazard_model, vulnerability_models).model_dump()) + measure_calculators = self.measures_factory.calculators(request.use_case_id) + return dumps( + _get_asset_impacts(request, hazard_model, vulnerability_models, measure_calculators).model_dump() + ) elif request_id == "get_example_portfolios": return dumps(_get_example_portfolios()) else: @@ -320,6 +325,7 @@ def _get_asset_impacts( request: AssetImpactRequest, hazard_model: HazardModel, vulnerability_models: Optional[VulnerabilityModels] = None, + measure_calculators: Optional[Dict[Type[Asset], RiskMeasureCalculator]] = None, assets: Optional[List[Asset]] = None, ): vulnerability_models = ( @@ -330,8 +336,10 @@ def _get_asset_impacts( # we keep API definition of asset separate from internal Asset class; convert by reflection # based on asset_class: _assets = create_assets(request.assets, assets) - measure_calcs = calc.get_default_risk_measure_calculators() - risk_model = AssetLevelRiskModel(hazard_model, vulnerability_models, measure_calcs) + measure_calculators = ( + calc.get_default_risk_measure_calculators() if measure_calculators is None else measure_calculators + ) + risk_model = AssetLevelRiskModel(hazard_model, vulnerability_models, measure_calculators) scenarios = [request.scenario] if request.scenarios is None or len(request.scenarios) == 0 else request.scenarios years = [request.year] if request.years is None or len(request.years) == 0 else request.years diff --git a/src/physrisk/risk_models/generic_risk_model.py b/src/physrisk/risk_models/generic_risk_model.py index 39718946..0a96b4ab 100644 --- a/src/physrisk/risk_models/generic_risk_model.py +++ b/src/physrisk/risk_models/generic_risk_model.py @@ -115,6 +115,7 @@ def calc_measure( # in general we want to use the impact distribution, but in certain circumstances we can use # the underlying hazard data some care is needed given that vulnerability models are interchangeable # (what if the vulnerability model used does not make use of the hazard indicator we require?) + (lower_bounds, categories, bounds) = self._bounds_lookup[hazard_type] if isinstance(bounds[0], HazardIndicatorBounds): assert impact_res.hazard_data is not None @@ -146,4 +147,4 @@ def get_definition(self, hazard_type: Type[Hazard]): return self._definition_lookup.get(hazard_type, None) def supported_hazards(self) -> Set[type]: - return set([Wind]) # RiverineInundation, CoastalInundation, + return set([Wind]) # Fire, RiverineInundation, CoastalInundation, diff --git a/src/physrisk/risk_models/risk_models.py b/src/physrisk/risk_models/risk_models.py index deb1cbda..01723ad4 100644 --- a/src/physrisk/risk_models/risk_models.py +++ b/src/physrisk/risk_models/risk_models.py @@ -1,5 +1,5 @@ from enum import Enum -from typing import Callable, Set, Type +from typing import Callable, Optional, Set, Type from physrisk.api.v1.impact_req_resp import ( Category, @@ -9,7 +9,7 @@ ) from physrisk.kernel.hazards import ChronicHeat, CoastalInundation, Hazard, RiverineInundation, Wind from physrisk.kernel.impact import AssetImpactResult -from physrisk.kernel.impact_distrib import ImpactDistrib +from physrisk.kernel.impact_distrib import EmptyImpactDistrib, ImpactDistrib from physrisk.kernel.risk import Measure, RiskMeasureCalculator @@ -162,7 +162,9 @@ def _cooling_description(self, category: Category): def calc_measure( self, hazard_type: Type[Hazard], base_impact_res: AssetImpactResult, impact_res: AssetImpactResult - ) -> Measure: + ) -> Optional[Measure]: + if isinstance(base_impact_res.impact, EmptyImpactDistrib) or isinstance(impact_res.impact, EmptyImpactDistrib): + return None if hazard_type == ChronicHeat: return self.calc_measure_cooling(hazard_type, base_impact_res.impact, impact_res.impact) else: diff --git a/src/physrisk/vulnerability_models/example_models.py b/src/physrisk/vulnerability_models/example_models.py index d4edc16a..4817ed75 100644 --- a/src/physrisk/vulnerability_models/example_models.py +++ b/src/physrisk/vulnerability_models/example_models.py @@ -1,8 +1,14 @@ +from typing import Iterable + import numpy as np -from ..kernel.impact_distrib import ImpactType +from physrisk.kernel.assets import Asset +from physrisk.kernel.hazard_model import HazardDataRequest, HazardDataResponse +from physrisk.kernel.hazards import Fire + +from ..kernel.impact_distrib import EmptyImpactDistrib, ImpactDistrib, ImpactType from ..kernel.vulnerability_matrix_provider import VulnMatrixProvider -from ..kernel.vulnerability_model import VulnerabilityModel, checked_beta_distrib +from ..kernel.vulnerability_model import VulnerabilityModel, VulnerabilityModelBase, checked_beta_distrib class ExampleCdfBasedVulnerabilityModel(VulnerabilityModel): @@ -25,3 +31,57 @@ def get_impact_curve(self, intensities, asset): return VulnMatrixProvider( intensities, impact_cdfs=[checked_beta_distrib(m, s) for m, s in zip(impact_means, impact_stddevs)] ) + + +class PlaceholderVulnerabilityModel(VulnerabilityModelBase): + """Placeholder vulnerability models are for when an impact or hazard indicator value + is only intended to be used in a score-based risk measure. + """ + + def __init__(self, indicator_id: str, hazard_type: type, impact_type: ImpactType): + super().__init__(indicator_id=indicator_id, hazard_type=hazard_type, impact_type=impact_type) + + def get_data_requests(self, asset: Asset, *, scenario: str, year: int): + return HazardDataRequest( + self.hazard_type, + asset.longitude, + asset.latitude, + scenario=scenario, + year=year, + indicator_id=self.indicator_id, + ) + + def get_impact(self, asset: Asset, data_responses: Iterable[HazardDataResponse]) -> ImpactDistrib: + return EmptyImpactDistrib() + + +class PlaceholderFireModel(PlaceholderVulnerabilityModel): + def __init__(self): + """This model has two functions: + 1) Demonstrate how a wildfire model can be added in such a way that it combines as expected + with other acute and chronic risks. + 2) Provide a vulnerability function that can supply a reference impact distribution. + This can be used with score-based measures. + + The hazard indicator used is annual probability of wildfire. This is the probability that + in a year there is a wildfire such that the wildfire zone includes the asset location. + 'Wildfire' is taken to be a major fire event, one with the potential to damage or disrupt the asset. + Wildfire zone refers to the affected area, i.e. into which the wildfire propagates. + + Typically fire models are calculated using a certain pixel size and provide the probability that + the wildfire propagates into the pixel containing the asset. For large pixel sizes, clearly this is + not the same as the probability that the asset is within the wildfire zone. + """ + ... + + super().__init__( + indicator_id="fire_probability", + hazard_type=Fire, + impact_type=ImpactType.damage, + ) + + def get_impact(self, asset: Asset, data_responses: Iterable[HazardDataResponse]) -> ImpactDistrib: + # params = data_responses + # assert isinstance(data_response, HazardParameterDataResponse) + # TODO add model with stochastic impact + return EmptyImpactDistrib() diff --git a/tests/kernel/exposure_test.py b/tests/kernel/exposure_test.py index de150dbd..8c726a5d 100644 --- a/tests/kernel/exposure_test.py +++ b/tests/kernel/exposure_test.py @@ -11,6 +11,7 @@ from physrisk.data.zarr_reader import ZarrReader from physrisk.hazard_models.core_hazards import get_default_source_paths from physrisk.kernel.assets import Asset +from physrisk.kernel.calculation import DefaultMeasuresFactory from physrisk.kernel.exposure import Category, JupterExposureMeasure, calculate_exposures from physrisk.kernel.hazards import ChronicHeat, CombinedInundation, Drought, Fire, Hail, Wind from physrisk.requests import Requester @@ -30,6 +31,7 @@ def test_jupiter_exposure_service(self): inventory_reader=InventoryReader(fs=local.LocalFileSystem(), base_path=""), reader=ZarrReader(store=store), colormaps=inventory.colormaps(), + measures_factory=DefaultMeasuresFactory, ) assets_api = physrisk.api.v1.common.Assets( items=[ diff --git a/tests/risk_models/risk_models_test.py b/tests/risk_models/risk_models_test.py index bb68c3e6..5ad82608 100644 --- a/tests/risk_models/risk_models_test.py +++ b/tests/risk_models/risk_models_test.py @@ -1,21 +1,28 @@ """ Test asset impact calculations.""" -from typing import Sequence +from typing import Dict, Sequence +import fsspec.implementations.local as local import numpy as np +from dependency_injector import providers from physrisk import requests -from physrisk.api.v1.impact_req_resp import RiskMeasureKey, RiskMeasuresHelper +from physrisk.api.v1.impact_req_resp import AssetImpactResponse, RiskMeasureKey, RiskMeasuresHelper +from physrisk.container import Container +from physrisk.data.inventory_reader import InventoryReader from physrisk.data.pregenerated_hazard_model import ZarrHazardModel +from physrisk.data.zarr_reader import ZarrReader from physrisk.hazard_models.core_hazards import get_default_source_paths from physrisk.kernel.assets import Asset, RealEstateAsset from physrisk.kernel.calculation import get_default_vulnerability_models -from physrisk.kernel.hazards import ChronicHeat, CoastalInundation, RiverineInundation, Wind +from physrisk.kernel.hazard_model import HazardModelFactory +from physrisk.kernel.hazards import ChronicHeat, CoastalInundation, Fire, RiverineInundation, Wind from physrisk.kernel.risk import AssetLevelRiskModel, MeasureKey from physrisk.kernel.vulnerability_model import DictBasedVulnerabilityModels from physrisk.requests import _create_risk_measures from physrisk.risk_models.generic_risk_model import GenericScoreBasedRiskMeasures from physrisk.risk_models.risk_models import RealEstateToyRiskMeasures +from tests.api.container_test import TestContainer from ..base_test import TestWithCredentials from ..data.hazard_model_store_test import TestData, ZarrStoreMocker, inundation_return_periods @@ -104,6 +111,9 @@ def sp_wind(scenario, year): def sp_heat(scenario, year): return source_paths[ChronicHeat](indicator_id="mean_degree_days/above/index", scenario=scenario, year=year) + def sp_fire(scenario, year): + return source_paths[Fire](indicator_id="fire_probability", scenario=scenario, year=year) + mocker = ZarrStoreMocker() return_periods = inundation_return_periods() flood_histo_curve = np.array([0.0596, 0.333, 0.505, 0.715, 0.864, 1.003, 1.149, 1.163, 1.163]) @@ -147,6 +157,20 @@ def sp_heat(scenario, year): TestData.temperature_thresholds, TestData.degree_days_above_index_2, ) + mocker.add_curves_global( + sp_fire("historical", -1), + TestData.longitudes, + TestData.latitudes, + [0], + [0.15], + ) + mocker.add_curves_global( + sp_fire("rcp8p5", 2050), + TestData.longitudes, + TestData.latitudes, + [0], + [0.2], + ) return ZarrHazardModel(source_paths=get_default_source_paths(), store=mocker.store) @@ -167,12 +191,28 @@ def test_via_requests(self): "scenarios": scenarios, } - request = requests.AssetImpactRequest(**request_dict) - response = requests._get_asset_impacts( - request, - hazard_model, - vulnerability_models=DictBasedVulnerabilityModels(get_default_vulnerability_models()), - ) + # request = requests.AssetImpactRequest(**request_dict) + + container = Container() + + class TestHazardModelFactory(HazardModelFactory): + def hazard_model(self, interpolation: str = "floor", provider_max_requests: Dict[str, int] = ...): + return hazard_model + + container.override_providers(hazard_model_factory=providers.Factory(TestHazardModelFactory)) + container.override_providers(config=providers.Configuration(default={"zarr_sources": ["embedded"]})) + container.override_providers(inventory_reader=None) + container.override_providers(zarr_reader=None) + + requester = container.requester() + res = requester.get(request_id="get_asset_impact", request_dict=request_dict) + response = AssetImpactResponse.model_validate_json(res) + + # response = requests._get_asset_impacts( + # request, + # hazard_model, + # vulnerability_models=DictBasedVulnerabilityModels(get_default_vulnerability_models()), + # ) res = next( ma for ma in response.risk_measures.measures_for_assets if ma.key.hazard_type == "RiverineInundation" )