Skip to content

Commit

Permalink
Change scores API
Browse files Browse the repository at this point in the history
Signed-off-by: Joe Moorhouse <[email protected]>
  • Loading branch information
joemoorhouse committed Oct 16, 2023
1 parent 0e1da54 commit 5c9cdf9
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 76 deletions.
68 changes: 19 additions & 49 deletions src/physrisk/api/v1/impact_req_resp.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,43 +35,8 @@ class Category(int, Enum):
REDFLAG = 4


class RiskKey(BaseModel):
scenario_id: str
year: str


class RiskMeasureKey(RiskKey):
risk_measure_id: str = Field("", description="Identifier of the risk measure.")


class AssetsRiskScores(BaseModel):
"""Risk scores for a set of assets, with risk measures used to calculate the measures.
A single score may be derived from multiple risk measures in principle, the measures are identified
by the ScoreBasedMeasureDefinition corresponding to the asset.
In principle multiple measures may be used to compute the score, hence 'measures_0', 'measures_1' etc,
although no example yet.
"""

key: RiskKey
scores: List[int] = Field(None, description="Identifier for the risk measure.")
measures_0: List[float]
measures_1: Optional[List[float]]


class AssetRiskMeasures(BaseModel):
"""Risk measures for a set of assets."""

key: RiskMeasureKey
measures: List[float]


class AssetScoreModel(BaseModel):
asset_model_id: List[str]


class RiskMeasureDefinition(BaseModel):
measure_id: str = Field(None, description="Identifier for the risk measure.")
measure_index: int = Field(None, description="Identifier for the risk measure.")
label: str = Field(
"<short description of the measure, e.g. fractional loss for 1-in-100 year event.",
description="Value of the score.",
Expand Down Expand Up @@ -103,26 +68,31 @@ def __hash__(self):
return id(self)


class HazardRiskMeasures(BaseModel):
"""Risk measures for one particular type of hazard"""

class RiskMeasureKey(BaseModel):
hazard_type: str
scores_for_assets: Optional[List[AssetsRiskScores]] = Field(
[], description="Risk scores for the set of assets for different scenarios and years."
)
measures_for_assets: Optional[List[AssetRiskMeasures]] = Field(
[], description="Risk measures for the set of assets for different scenarios and years."
)
score_measure_ids_for_assets: Optional[List[str]] = Field(
None, description="Identifiers of the score-based risk measures used for each asset."
)
scenario_id: str
year: str
measure_id: str


class RiskMeasuresForAssets(BaseModel):
key: RiskMeasureKey
scores: List[int] = Field(None, description="Identifier for the risk measure.")
measures_0: List[float]
measures_1: Optional[List[float]]


class ScoreBasedRiskMeasureSetDefinition(BaseModel):
measure_set_id: str
asset_measure_ids_for_hazard: Dict[str, List[str]]
score_definitions: Optional[Dict[str, ScoreBasedRiskMeasureDefinition]]


class RiskMeasures(BaseModel):
"""Risk measures"""

hazard_risk_measures: List[HazardRiskMeasures]
score_definitions: Optional[Dict[str, ScoreBasedRiskMeasureDefinition]]
measures_for_assets: List[RiskMeasuresForAssets]
score_based_measure_set_defn: ScoreBasedRiskMeasureSetDefinition
measures_definitions: Optional[List[RiskMeasureDefinition]]


Expand Down
37 changes: 17 additions & 20 deletions src/physrisk/requests.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,11 @@
AssetLevelImpact,
Assets,
AssetSingleImpact,
AssetsRiskScores,
HazardRiskMeasures,
RiskKey,
RiskMeasureKey,
RiskMeasures,
RiskMeasuresForAssets,
ScoreBasedRiskMeasureDefinition,
ScoreBasedRiskMeasureSetDefinition,
)
from .data.image_creator import ImageCreator
from .data.inventory import EmbeddedInventory, Inventory
Expand Down Expand Up @@ -352,15 +352,15 @@ def _create_risk_measures(
RiskMeasures: Output for writing to JSON.
"""
hazard_types = all_hazards()
hazard_risk_measures = []
measure_set_id = "measure_set_1"
measures_for_assets: List[RiskMeasuresForAssets] = []
for hazard_type in hazard_types:
# assets_risk_scores contains the hazard scores for each scenario and year
assets_risk_scores: List[AssetsRiskScores] = []
# if all asset measure IDs are empty
for scenario_id in scenarios:
for year in years:
# we calculate and tag results for each scenario, year and hazard
score_key = RiskKey(scenario_id=scenario_id, year=str(year))
score_key = RiskMeasureKey(
hazard_type=hazard_type.__name__, scenario_id=scenario_id, year=str(year), measure_id=measure_set_id
)
scores = [-1] * len(assets)
measures_0 = [float("nan")] * len(assets)
for i, asset in enumerate(assets):
Expand All @@ -370,20 +370,17 @@ def _create_risk_measures(
if measure is not None:
scores[i] = measure.score
measures_0[i] = measure.measure_0
assets_risk_scores.append(
AssetsRiskScores(key=score_key, scores=scores, measures_0=measures_0, measures_1=None)
measures_for_assets.append(
RiskMeasuresForAssets(key=score_key, scores=scores, measures_0=measures_0, measures_1=None)
)
hazard_risk_measures.append(
HazardRiskMeasures(
hazard_type=hazard_type.__name__,
scores_for_assets=assets_risk_scores,
measures_for_assets=None,
score_measure_ids_for_assets=measure_ids_for_asset[hazard_type],
)
)
return RiskMeasures(
hazard_risk_measures=hazard_risk_measures,
score_based_measure_set_defn = ScoreBasedRiskMeasureSetDefinition(
measure_set_id=measure_set_id,
asset_measure_ids_for_hazard={k.__name__: v for k, v in measure_ids_for_asset.items()},
score_definitions={v: k for (k, v) in definitions.items()},
)
return RiskMeasures(
measures_for_assets=measures_for_assets,
score_based_measure_set_defn=score_based_measure_set_defn,
measures_definitions=None,
)

Expand Down
28 changes: 21 additions & 7 deletions src/test/risk_models/test_risk_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@

import numpy as np

from physrisk.api.v1.impact_req_resp import RiskMeasureKey
from physrisk.data.pregenerated_hazard_model import ZarrHazardModel
from physrisk.hazard_models.core_hazards import get_default_source_paths
from physrisk.kernel.assets import RealEstateAsset
from physrisk.kernel.calculation import get_default_vulnerability_models
from physrisk.kernel.hazards import CoastalInundation, RiverineInundation
from physrisk.kernel.risk import AssetLevelRiskModel
from physrisk.kernel.risk import AssetLevelRiskModel, MeasureKey
from physrisk.requests import _create_risk_measures
from physrisk.risk_models.risk_models import RealEstateToyRiskMeasures

Expand Down Expand Up @@ -51,7 +52,7 @@ def sp_coastal(scenario, year):

assets = [
RealEstateAsset(TestData.latitudes[0], TestData.longitudes[0], location="Asia", type="Buildings/Industrial")
for i in range(100)
for i in range(2)
]

model = AssetLevelRiskModel(
Expand All @@ -60,9 +61,22 @@ def sp_coastal(scenario, year):
measure_ids_for_asset, definitions = model.populate_measure_definitions(assets)
_, measures = model.calculate_risk_measures(assets, prosp_scens=scenarios, years=years)

risk_measures = _create_risk_measures(measures, measure_ids_for_asset, definitions, assets, scenarios, years)

#with open("test.json", "w") as f:
# f.write(risk_measures.model_dump_json())
# how to get a score using the MeasureKey
measure = measures[MeasureKey(assets[0], scenarios[0], years[0], RiverineInundation)]
score = measure.score
measure_0 = measure.measure_0

# print(measures[MeasureKey(assets[0], scenarios[0], years[0], RiverineInundation)])
# packing up the risk measures, e.g. for JSON transmission:
risk_measures = _create_risk_measures(measures, measure_ids_for_asset, definitions, assets, scenarios, years)
# we still have a key, but no asset:
key = RiskMeasureKey(
hazard_type="RiverineInundation",
scenario_id=scenarios[0],
year=str(years[0]),
measure_id=risk_measures.score_based_measure_set_defn.measure_set_id,
)
item = next(m for m in risk_measures.measures_for_assets if m.key == key)
score2 = item.scores[0]
measure_0_2 = item.measures_0[0]
assert score == score2
assert measure_0 == measure_0_2

0 comments on commit 5c9cdf9

Please sign in to comment.