Skip to content

Commit

Permalink
progress on combine
Browse files Browse the repository at this point in the history
  • Loading branch information
spjuhel committed Sep 26, 2024
1 parent 41a09d8 commit bebfcb6
Showing 1 changed file with 117 additions and 34 deletions.
151 changes: 117 additions & 34 deletions climada_petals/engine/supplychain/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,14 @@
Define the SupplyChain class.
"""

from __future__ import (
annotations,
)
from functools import reduce
import operator # See https://stackoverflow.com/questions/33533148/how-do-i-type-hint-a-method-with-the-type-of-the-enclosing-class

# Replace by `from typing import Self` when python 3.11+ is the min required version.

__all__ = ["SupplyChain"]

import logging
Expand Down Expand Up @@ -51,7 +59,7 @@
download_mriot,
translate_exp_to_regions,
translate_exp_to_sectors,
translate_reg_impact_to_mriot_regions
translate_reg_impact_to_mriot_regions,
)

LOGGER = logging.getLogger(__name__)
Expand All @@ -62,6 +70,7 @@
VA_NAME = "value added"
"""Index name for value added"""


class DirectShock:
"""DirectShock class.
Expand All @@ -87,81 +96,155 @@ class DirectShock:
"""
def __init__(self, mriot, exposed_assets, impacted_assets) -> None:

def __init__(
self,
mriot: pymrio.IOSystem,
exposed_assets: pd.Series,
impacted_assets: pd.DataFrame,
shock_name :str = "unnamed"
) -> None:
self.mriot_sectors = mriot.get_sectors()
self.mriot_regions = mriot.get_regions()
self.mriot_name = mriot.name
self.shock_name = shock_name
self.exposed_assets = exposed_assets
self.impacted_assets = impacted_assets
self.shock = 1 - (impacted_assets / exposed_assets)
self.impacted_assets = impacted_assets # indexed should be named "event_id".

# constructor without mriot (for combine)?

@property
def shock(self):
return 1 - (self.impacted_assets / self.exposed_assets)

@classmethod
def from_exp_imp(cls,
mriot: pymrio.IOSystem,
exposure: Exposures,
impact: Impact,
affected_sectors: Iterable[str] | dict[str,float] | Literal["all"],
impact_distribution: dict[str,float]|pd.Series|None,
exp_value_col: str = "value",
):
def from_exp_imp(
cls,
mriot: pymrio.IOSystem,
exposure: Exposures,
impact: Impact,
shock_name: str,
affected_sectors: Iterable[str] | dict[str, float] | Literal["all"],
impact_distribution: dict[str, float] | pd.Series | None,
exp_value_col: str = "value",
):

mriot_type = get_mriot_type(mriot)
exp = translate_exp_to_regions(exposure, mriot_type=mriot_type)
exposed_assets = translate_exp_to_sectors(exp, affected_sectors=affected_sectors,mriot=mriot,value_col=exp_value_col)
return cls.from_assets_imp(mriot, exposed_assets, impact, affected_sectors, impact_distribution)
exposed_assets = translate_exp_to_sectors(
exp, affected_sectors=affected_sectors, mriot=mriot, value_col=exp_value_col
)
return cls.from_assets_imp(
mriot, exposed_assets, impact, shock_name, affected_sectors, impact_distribution
)

@classmethod
def from_assets_imp(cls,
mriot: pymrio.IOSystem,
exposed_assets: pd.Series,
impact: Impact,
affected_sectors: Iterable[str] | dict[str,float] | Literal["all"],
impact_distribution: dict[str,float]|pd.Series|None,
):
def from_assets_imp(
cls,
mriot: pymrio.IOSystem,
exposed_assets: pd.Series,
impact: Impact,
shock_name: str,
affected_sectors: Iterable[str] | dict[str, float] | Literal["all"],
impact_distribution: dict[str, float] | pd.Series | None,
):

mriot_type = get_mriot_type(mriot)
impacted_assets = impact.impact_at_reg()
impacted_assets = translate_reg_impact_to_mriot_regions(impacted_assets, mriot_type)
impacted_assets = translate_reg_impact_to_mriot_regions(
impacted_assets, mriot_type
)

if impact_distribution is None:
# Default uses production distribution across sectors, region.
impact_distribution = mriot.x.loc[pd.IndexSlice[impacted_assets.columns, affected_sectors], "indout"].groupby(level=0).transform(lambda x: x / x.sum())
impact_distribution = (
mriot.x.loc[
pd.IndexSlice[impacted_assets.columns, affected_sectors], "indout"
]
.groupby(level=0)
.transform(lambda x: x / x.sum())
)

if isinstance(impact_distribution, dict):
impact_distribution = pd.Series(impact_distribution)

if not isinstance(impact_distribution, pd.Series):
raise ValueError(f"impact_distribution could not be converted to a Series")

impacted_assets = distribute_reg_impact_to_sectors(impacted_assets, impact_distribution)
impacted_assets = distribute_reg_impact_to_sectors(
impacted_assets, impact_distribution
)

return cls(mriot, exposed_assets, impacted_assets)
return cls(mriot, exposed_assets, impacted_assets, shock_name)

@classmethod
def combine(cls, direct_shocks: list[DirectShock], kind:Literal["merge", "concat"] = "merge"):
# 1. Check that MRIOT name and assets are the same
cls._check_compatible(direct_shocks)
# 2. concat the impacted assets

# New name ? optional in arg ?

if kind == "merge":
imp_assets = [shock.impacted_assets for shock in direct_shocks]
if not all([set(imp_assets[0].columns) == set(df.columns) for df in imp_assets]):
raise ValueError("Provide DirectShock do not all share the same columns (i.e., region-sector), merging shock is not possible.")
if not all([set(imp_assets[0].index) == set(df.index) for df in imp_assets]):
raise ValueError("Provide DirectShock do not all share the same index (i.e., event_id), merging shocks is not possible.")
LOGGER.info("Merging direct shocks together. The resulting direct shock per event and per each region,sector will be the sum of the different direct shocks")
merged_imp_assets = reduce(lambda left, right: left.combine(right,sum), imp_assets)
return cls()

# concat case
# add a shock name index / keep original event_id
#

@staticmethod
def _check_compatible(direct_shocks: list[DirectShock]) -> None:
def _check_mriot_name(mriot_names:list[str]) -> bool:
return all(name == mriot_names[0] for name in mriot_names)

def _check_assets(assets: list[pd.Series]) -> bool:
return all( [pd.Series.equals(asset,assets[0]) for asset in assets] )

mriot_names, assets = zip(
*[(shock.mriot_name, shock.exposed_assets) for shock in direct_shocks]
)
if not _check_mriot_name(mriot_names):
raise ValueError("DirectShocks do not all have the same mriot_name.")
if not _check_assets(assets):
raise ValueError("DirectShocks do not all have the same exposed_assets.")

class IndirectCost:
def __init__(self, direct_shock: DirectShock, mriot:pymrio.IOSystem) -> None:

self.direct_shock=direct_shock
class IndirectCostModel:
def __init__(self, mriot: pymrio.IOSystem) -> None:
self.mriot = mriot

def shock_model_with(self, direct_shock: DirectShock | list[DirectShock]):
if isinstance(direct_shock, DirectShock):
direct_shock = [direct_shock]

def calc(self,
method:str):
def calc(self, method: str):
if method == "leontief":
demand = self.mriot.Y.sum(1)
shock = self.direct_shock.shock.reindex(columns=demand.index, fill_value=1.0)
shock = self.direct_shock.shock.reindex(
columns=demand.index, fill_value=1.0
)
degraded_demand = shock * demand

res = []
for event_id in shock.index:
changed_production = pymrio.calc_x_from_L(self.mriot.L, degraded_demand.loc[event_id])["indout"]
changed_production = pymrio.calc_x_from_L(
self.mriot.L, degraded_demand.loc[event_id]
)["indout"]
# If production was already at 0, sometimes new production becomes negative.
# This should not be significant.
changed_production.loc[changed_production < 0] = 0.0
res.append(changed_production)

return pd.DataFrame(res, index=shock.index)


class SupplyChain:
"""SupplyChain class.
Expand Down Expand Up @@ -233,7 +316,7 @@ def __init__(self, mriot):
self.supchain_imp = dict()

@classmethod
def from_mriot(
def init_mriot(
cls, mriot_type, mriot_year, mriot_dir=MRIOT_DIRECTORY, del_downloads=True
):
"""Download, parse and read WIOD16, EXIOBASE3, or OECD21 Multi-Regional
Expand Down Expand Up @@ -324,7 +407,7 @@ def from_mriot(
)
mriot.meta.change_meta("name", f"{mriot_type}-{mriot_year}")

return cls(mriot=mriot)
return mriot

def calc_shock_to_sectors(
self, exposure, impact, impacted_secs=None, shock_factor=None
Expand Down

0 comments on commit bebfcb6

Please sign in to comment.