Skip to content

Commit

Permalink
Document update and refactor (#303)
Browse files Browse the repository at this point in the history
* Doc update; standard protection handling

Signed-off-by: Joe Moorhouse <[email protected]>

* Allow empty environment variables for inventory reader

Signed-off-by: Joe Moorhouse <[email protected]>

* Tidy

Signed-off-by: Joe Moorhouse <[email protected]>

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

---------

Signed-off-by: Joe Moorhouse <[email protected]>
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
  • Loading branch information
joemoorhouse and pre-commit-ci[bot] authored Jun 14, 2024
1 parent 50e4e31 commit 4f1769a
Show file tree
Hide file tree
Showing 9 changed files with 432 additions and 161 deletions.
11 changes: 11 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -96,3 +96,14 @@ target/
src/test/data/coords.json
credentials.env
.pdm-python

# Latex
methodology/PhysicalRiskMethodology.aux
methodology/PhysicalRiskMethodology.blg
methodology/PhysicalRiskMethodology.fdb_latexmk
methodology/PhysicalRiskMethodology.fls
methodology/PhysicalRiskMethodology.glo
methodology/PhysicalRiskMethodology.glsdefs
methodology/PhysicalRiskMethodology.ist
methodology/PhysicalRiskMethodology.out
methodology/PhysicalRiskMethodology.synctex.gz
43 changes: 43 additions & 0 deletions docs/user-guide.rst
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,46 @@ Physic comprises:
* Financial models that use the impacts calculated by the :code:`VulnerabilityModels` to calculate risk measures and scores.

:code:`VulnerabilityModels` request hazard indicators using an :code:`indicator_id` (e.g. 'flood_depth' for inundation, 'max_speed' for wind). It is the responsibility of the :code:`HazardModel` to select the source of the hazard indicator data.

Note that units of the quantity are provided to the :code:`VulnerabilityModel` by the :code:`HazardModel`.

Hazard indicator data sets
-------------------------
The :code:`HazardModel` retrieves hazard indicators in a number of ways and can be made composite in order to combine different ways of accessing the data. At time of writing the common cases are that:

1. Hazard indicator data is stored in `Zarr <https://zarr.readthedocs.io/en/stable/>`_ format (in an arbitrary Zarr store, although S3 is a popular choice).
2. Hazard indicator data is retrieved via call to an external API. This is mainly used when combining commercial data to the public-domain.

In case 1, hazard indicators are stored as three dimensional arrays. The array is ordered :math:`(z, y, x)` where :math:`y` is the spatial :math:`y` coordinate, :math:`x` is the spatial :math:`x` coordinate and :math:`z` is an *index* coordinate. The *index* takes on different meanings according to the type of data being stored.

Indicators can be either:

* Acute (A): the data comprises a set of hazard intensities for different return periods. In this case *index* refers to the different return periods.
* Parametric (P): the data comprises a set of parameters. Here *index* refers to the different parameters. The parameters may be single values, or *index* might refer to a set of thresholds. Parametric indicators are used for chronic hazards.

As mentioned above, :code:`VulnerabilityModels` only specify the identifier of the hazard indicator that is required, as well as the climate scenario ID and the year of the future projection. This means that hazard indicator ID uniquely defines the data. For example, a vulnerability model requesting 'flood depth' could have data returned from a variety of data sets, depending on how the :code:`HazardModel` is configured. But

+-----------------------+-------------------------------+---------------------------------------+
| Hazard class | Indicator ID (type) | Description |
+=======================+===============================+=======================================+
| CoastalInundation, | flood_depth (A) | Flood depth (m) for available |
| PluvialInundation, | | return periods. This is unprotected |
| RiverineInundation | | depth. |
| +-------------------------------+---------------------------------------+
| | sop (P) | Standard of protection |
| | | (as return period in years). |
+-----------------------+-------------------------------+---------------------------------------+
| Fire | fire_probability (P) | Annual probability that location |
| | | is in a wildfire zone. |
+-----------------------+-------------------------------+---------------------------------------+
| Heat | mean_degree_days/above/index | Mean mean-temperature degree days per |
| | (P) | year above a set of temperature |
| | | threshold indices. |
+-----------------------+-------------------------------+---------------------------------------+
| Drought | months/spei/12m/below/index | Mean months per year where the 12 |
| | (P) | month SPEI index is below a set of |
| | | indices. |
+-----------------------+-------------------------------+---------------------------------------+
| Wind | max_speed | Maximum 1 minute sustained wind speed |
| | (A) | for available return periods. |
+-----------------------+-------------------------------+---------------------------------------+
4 changes: 2 additions & 2 deletions src/physrisk/api/v1/common.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Dict, List, Optional
from typing import Dict, List, Optional, Union

import numpy as np
from pydantic import BaseModel, Field
Expand Down Expand Up @@ -83,7 +83,7 @@ class IntensityCurve(BaseModel):
return_periods: Optional[List[float]] = Field(
[], description="[Deprecated] Return period in years in the case of an acute hazard."
)
index_values: Optional[List[float]] = Field(
index_values: Optional[Union[List[float], List[str]]] = Field(
[],
description="Set of index values. \
This is return period in years in the case of an acute hazard or \
Expand Down
103 changes: 28 additions & 75 deletions src/physrisk/data/hazard_data_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,32 +44,24 @@ def __init__(
zarr_reader: Optional[ZarrReader] = None,
interpolation: Optional[str] = "floor",
):
"""Create an EventProvider.
"""Provides hazard data.
Args:
get_source_path: provides the path to the hazard event data source depending on year/scenario/model.
get_source_path (SourcePath): Provides the source path mappings.
store (Optional[MutableMapping], optional): Zarr store instance. Defaults to None.
zarr_reader (Optional[ZarrReader], optional): ZarrReader instance. Defaults to None.
interpolation (Optional[str], optional): Interpolation type. Defaults to "floor".
Raises:
ValueError: If interpolation not in permitted list.
"""
self._get_source_path = get_source_path
self._reader = zarr_reader if zarr_reader is not None else ZarrReader(store=store)
if interpolation not in ["floor", "linear", "max", "min"]:
raise ValueError("interpolation must be 'floor', 'linear', 'max' or 'min'")
self._interpolation = interpolation


class AcuteHazardDataProvider(HazardDataProvider):
"""Provides hazard event intensities for a single Hazard (type of hazard event)."""

def __init__(
self,
get_source_path: SourcePath,
*,
store: Optional[MutableMapping] = None,
zarr_reader: Optional[ZarrReader] = None,
interpolation: Optional[str] = "floor",
):
super().__init__(get_source_path, store=store, zarr_reader=zarr_reader, interpolation=interpolation)

def get_intensity_curves(
def get_data(
self,
longitudes: List[float],
latitudes: List[float],
Expand All @@ -80,32 +72,37 @@ def get_intensity_curves(
hint: Optional[HazardDataHint] = None,
buffer: Optional[int] = None,
):
"""Get intensity curve for each latitude and longitude coordinate pair.
"""Returns data for set of latitude and longitudes.
Args:
longitudes: list of longitudes.
latitudes: list of latitudes.
model: model identifier.
scenario: identifier of scenario, e.g. rcp8p5 (RCP 8.5).
year: projection year, e.g. 2080.
buffer: delimitation of the area for the hazard data expressed in metres (within [0,1000]).
longitudes (List[float]): List of longitudes.
latitudes (List[float]): List of latitudes.
indicator_id (str): Hazard Indicator ID.
scenario (str): Identifier of scenario, e.g. ssp585 (SSP 585), rcp8p5 (RCP 8.5).
year (int): Projection year, e.g. 2080.
hint (Optional[HazardDataHint], optional): Hint. Defaults to None.
buffer (Optional[int], optional): Buffer around each point
expressed in metres (within [0, 1000]). Defaults to None.
Raises:
Exception: _description_
Returns:
curves: numpy array of intensity (no. coordinate pairs, no. return periods).
return_periods: return periods in years.
units: units.
path: path to the hazard indicator data source.
values (np.ndarray): Hazard indicator values.
indices (np.ndarray): Index values.
units (str). Units
path: Path to the hazard indicator data source.
"""

path = self._get_source_path(indicator_id=indicator_id, scenario=scenario, year=year, hint=hint)
if buffer is None:
curves, return_periods, units = self._reader.get_curves(
values, indices, units = self._reader.get_curves(
path, longitudes, latitudes, self._interpolation
) # type: ignore
else:
if buffer < 0 or 1000 < buffer:
raise Exception("The buffer must be an integer between 0 and 1000 metres.")
curves, return_periods, units = self._reader.get_max_curves(
values, indices, units = self._reader.get_max_curves(
path,
[
(
Expand All @@ -119,48 +116,4 @@ def get_intensity_curves(
],
self._interpolation,
) # type: ignore
return curves, return_periods, units, path


class ChronicHazardDataProvider(HazardDataProvider):
"""Provides hazard parameters for a single type of chronic hazard."""

def __init__(
self,
get_source_path: SourcePath,
*,
store: Optional[MutableMapping] = None,
zarr_reader: Optional[ZarrReader] = None,
interpolation: Optional[str] = "floor",
):
super().__init__(get_source_path, store=store, zarr_reader=zarr_reader, interpolation=interpolation)

def get_parameters(
self,
longitudes: List[float],
latitudes: List[float],
*,
indicator_id: str,
scenario: str,
year: int,
hint: Optional[HazardDataHint] = None,
):
"""Get hazard parameters for each latitude and longitude coordinate pair.
Args:
longitudes: list of longitudes.
latitudes: list of latitudes.
model: model identifier.
scenario: identifier of scenario, e.g. rcp8p5 (RCP 8.5).
year: projection year, e.g. 2080.
Returns:
parameters: numpy array of parameters.
defns: numpy array defining the parameters (e.g. provides thresholds).
units: units of the parameters.
path: path to the hazard indicator data source.
"""

path = self._get_source_path(indicator_id=indicator_id, scenario=scenario, year=year, hint=hint)
parameters, defns, units = self._reader.get_curves(path, longitudes, latitudes, self._interpolation)
return parameters, defns, units, path
return values, indices, units, path
8 changes: 4 additions & 4 deletions src/physrisk/data/inventory_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ class InventoryReader:
# environment variable names:
__access_key = "OSC_S3_ACCESS_KEY"
__secret_key = "OSC_S3_SECRET_KEY"
__S3_bucket = "OSC_S3_BUCKET" # e.g. redhat-osc-physical-landing-647521352890
__S3_bucket = "OSC_S3_BUCKET"

def __init__(
self,
Expand All @@ -35,11 +35,11 @@ def __init__(
fs (Optional[AbstractFileSystem], optional): AbstractFileSystem. Defaults to None in which case S3FileSystem will be created. # noqa: E501
"""
if fs is None:
access_key = get_env(self.__access_key, None)
secret_key = get_env(self.__secret_key, None)
access_key = get_env(self.__access_key, "")
secret_key = get_env(self.__secret_key, "")
fs = s3fs.S3FileSystem(anon=False, key=access_key, secret=secret_key)

bucket = get_env(self.__S3_bucket, "redhat-osc-physical-landing-647521352890")
bucket = get_env(self.__S3_bucket, "physrisk-hazard-indicators")
self._base_path = bucket if base_path is None else base_path
self._fs = fs

Expand Down
44 changes: 13 additions & 31 deletions src/physrisk/data/pregenerated_hazard_model.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import concurrent.futures
from collections import defaultdict
from typing import Dict, List, Mapping, MutableMapping, Optional, cast
from typing import Dict, List, Mapping, MutableMapping, Optional, Type

import numpy as np

from physrisk.data.zarr_reader import ZarrReader
from physrisk.kernel.hazards import Hazard, HazardKind
from physrisk.kernel.hazards import Hazard, IndicatorData, indicator_data

from ..kernel.hazard_model import (
HazardDataFailedResponse,
Expand All @@ -15,26 +15,17 @@
HazardModel,
HazardParameterDataResponse,
)
from .hazard_data_provider import AcuteHazardDataProvider, ChronicHazardDataProvider, HazardDataProvider, SourcePath
from .hazard_data_provider import HazardDataProvider, SourcePath


class PregeneratedHazardModel(HazardModel):
"""Hazard event model that processes requests using EventProviders."""

def __init__(
self,
hazard_data_providers: Dict[type, HazardDataProvider],
hazard_data_providers: Dict[Type[Hazard], HazardDataProvider],
):
self.acute_hazard_data_providers = dict(
(k, cast(AcuteHazardDataProvider, v))
for (k, v) in hazard_data_providers.items()
if Hazard.kind(k) == HazardKind.acute
)
self.chronic_hazard_data_providers = dict(
(k, cast(ChronicHazardDataProvider, v))
for (k, v) in hazard_data_providers.items()
if Hazard.kind(k) == HazardKind.chronic
)
self.hazard_data_providers = hazard_data_providers

def get_hazard_events( # noqa: C901
self, requests: List[HazardDataRequest]
Expand Down Expand Up @@ -83,10 +74,8 @@ def _get_hazard_events_batch(
)
longitudes = [req.longitude for req in batch]
latitudes = [req.latitude for req in batch]
if hazard_type.kind == HazardKind.acute: # type: ignore
intensities, return_periods, units, path = self.acute_hazard_data_providers[
hazard_type
].get_intensity_curves(
if indicator_data(hazard_type, indicator_id) == IndicatorData.EVENT:
intensities, return_periods, units, path = self.hazard_data_providers[hazard_type].get_data(
longitudes,
latitudes,
indicator_id=indicator_id,
Expand All @@ -102,8 +91,8 @@ def _get_hazard_events_batch(
if len(valid_periods) == 0:
valid_periods, valid_intensities = np.array([100]), np.array([0])
responses[req] = HazardEventDataResponse(valid_periods, valid_intensities, units, path)
elif hazard_type.kind == HazardKind.chronic: # type: ignore
parameters, defns, units, path = self.chronic_hazard_data_providers[hazard_type].get_parameters(
else: # type: ignore
parameters, defns, units, path = self.hazard_data_providers[hazard_type].get_data(
longitudes, latitudes, indicator_id=indicator_id, scenario=scenario, year=year, hint=hint
)

Expand All @@ -121,7 +110,7 @@ class ZarrHazardModel(PregeneratedHazardModel):
def __init__(
self,
*,
source_paths: Dict[type, SourcePath],
source_paths: Dict[Type[Hazard], SourcePath],
reader: Optional[ZarrReader] = None,
store=None,
interpolation="floor",
Expand All @@ -130,15 +119,8 @@ def __init__(
zarr_reader = ZarrReader(store=store) if reader is None else reader

super().__init__(
dict(
(
t,
(
AcuteHazardDataProvider(sp, zarr_reader=zarr_reader, interpolation=interpolation)
if Hazard.kind(t) == HazardKind.acute
else ChronicHazardDataProvider(sp, zarr_reader=zarr_reader, interpolation=interpolation)
),
)
{
t: HazardDataProvider(sp, zarr_reader=zarr_reader, interpolation=interpolation)
for t, sp in source_paths.items()
)
}
)
Loading

0 comments on commit 4f1769a

Please sign in to comment.