Skip to content

Commit

Permalink
Add create_new_database_for_flows_with_missing_top_level_context
Browse files Browse the repository at this point in the history
  • Loading branch information
cmutel committed Sep 2, 2024
1 parent 7364f91 commit e773ca5
Show file tree
Hide file tree
Showing 2 changed files with 166 additions and 15 deletions.
107 changes: 94 additions & 13 deletions bw2io/importers/base_lci.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
ParameterizedExchange,
ProjectParameter,
)
import randonneur as rd
import randonneur as rn

from ..errors import NonuniqueCode, StrategyError, WrongDatabase
from ..export.excel import write_lci_matching
Expand All @@ -25,6 +25,7 @@
link_iterable_by_fields,
link_technosphere_based_on_name_unit_location,
link_technosphere_by_activity_hash,
match_against_only_available_in_given_context_tree,
match_against_top_level_context,
normalize_units,
strip_biosphere_exc_locations,
Expand All @@ -33,6 +34,30 @@
from .base import ImportBase


# Keys that describe an exchange (edge) rather than the flow it points to.
# `_reformat_biosphere_exc_as_new_node` drops these when it copies an exchange
# dict into the dict for a new node. Both spellings of the uncertainty type
# key ("uncertainty type" and "uncertainty_type") are listed.
EXCHANGE_SPECIFIC_KEYS = (
"amount",
"functional",
"loc",
"maximum",
"minimum",
"output",
"scale",
"shape",
"temporal_distribution",
"uncertainty type",
"uncertainty_type",
)


def _reformat_biosphere_exc_as_new_node(exc: dict, db_name: str) -> dict:
    """Build the data dict for a new biosphere node from an unlinked exchange.

    Every key of ``exc`` that is not listed in ``EXCHANGE_SPECIFIC_KEYS`` is
    copied over. The ``type``, ``exchanges``, ``database`` and ``code`` entries
    are then set (overriding any value copied from ``exc``): the node gets the
    default biosphere node type, no exchanges, the database ``db_name``, and a
    code computed by ``activity_hash`` from the full, unfiltered exchange.

    ``exc`` itself is not modified.
    """
    # Start from the flow-level attributes only.
    node_data = {
        key: value
        for key, value in exc.items()
        if key not in EXCHANGE_SPECIFIC_KEYS
    }
    # Then stamp the node-specific entries on top of whatever was copied.
    node_data.update(
        {
            "type": labels.biosphere_node_default,
            "exchanges": [],
            "database": db_name,
            "code": activity_hash(exc),
        }
    )
    return node_data


class LCIImporter(ImportBase):
"""Base class for format-specific importers.
Expand Down Expand Up @@ -399,8 +424,8 @@ def match_database(
def match_database_against_top_level_context(
self,
other_db_name: str,
fields: Optional[List[str]] = None,
kinds: Optional[List[str]] = None,
fields: List[str] = ["name", "unit", "categories"],
kinds: List[str] = labels.biosphere_edge_types,
# randonneur_transformations: Optional[list] = None
) -> None:
"""
Expand Down Expand Up @@ -461,23 +486,79 @@ def match_database_against_only_available_in_given_context_tree(
)
)

def create_new_database_for_flows_with_missing_top_level_context(
    self,
    target_db_name: str,
    placeholder_db_name: str,
    fields: List[str] = ["name", "unit", "categories"],
    kinds: List[str] = labels.biosphere_edge_types,
) -> None:
    """
    Create proxy datasets for flows which have corresponding flows in another database, but
    not with the given top-level context.

    In other words, if we are trying to match `{'name': 'foo', 'categories': ['foo']}`, and
    our corresponding database only has `{'name': 'foo', 'categories': ['bar']}`, then we can
    create a placeholder dataset in a new database, as no amount of category manipulation will
    result in a match in the given target database.

    Parameters
    ----------
    target_db_name
        Name of the existing database whose nodes are matched against.
    placeholder_db_name
        Name of the database that receives the placeholder nodes. It is registered
        (with ``self.format`` and an explanatory comment) if it does not exist yet.
    fields
        Fields used for matching. Must include ``"categories"``; only the first element of
        ``categories`` (the top-level context) takes part in the comparison. The default
        list is never mutated.
    kinds
        Exchange ``type`` values which are considered.

    Raises
    ------
    StrategyError
        If ``target_db_name`` is not a known database, or ``fields`` lacks ``"categories"``.

    Notes
    -----
    Only exchanges in ``self.data`` that have no ``input`` yet, whose ``type`` is in
    ``kinds`` and which have non-empty ``categories`` are considered. Each one that gets a
    placeholder node has its ``input`` set to that node's key. Exchanges that hash to the
    same node code share a single placeholder node.
    """

    def get_key(
        obj: dict, fields: List[str], include_categories: bool = True
    ) -> tuple:
        # Identity of a flow: the requested field values, optionally followed by the
        # top-level context (first element of `categories`).
        return tuple(
            [obj.get(field) for field in fields]
            + ([tuple(obj["categories"])[0]] if include_categories else [])
        )

    if target_db_name not in databases:
        raise StrategyError(f"Can't find target database {target_db_name}")
    if "categories" not in fields:
        raise StrategyError("`fields` must include `categories`")

    placeholder = Database(placeholder_db_name)
    if placeholder_db_name not in databases:
        placeholder.register(
            format=self.format,
            comment=f"Database for unlinked biosphere flows with wrong top-level context from {self.db_name}. Generated by `bw2io` method `create_new_database_for_flows_with_missing_top_level_context`",
        )

    ffields = [field for field in fields if field != "categories"]

    # One pass over the target database collects both views of each node:
    # with the top-level context (an exact match is possible) and without it
    # (the flow exists in *some* context).
    keys_with_context = set()
    keys_without_context = set()
    for obj in Database(target_db_name):
        if not obj.get("categories"):
            continue
        keys_with_context.add(get_key(obj, ffields))
        keys_without_context.add(get_key(obj, ffields, False))

    # node code -> node key for placeholders created during this call, so that the
    # same flow appearing in several exchanges is only created once.
    created = {}

    for ds in self.data:
        for exc in ds.get("exchanges", []):
            if "input" in exc or exc.get("type") not in kinds:
                continue
            if not exc.get("categories"):
                # No top-level context to compare; `get_key` would fail on
                # missing or empty categories.
                continue
            if get_key(exc, ffields) in keys_with_context:
                # Matchable in the target database as-is.
                continue
            if get_key(exc, ffields, False) not in keys_without_context:
                # Flow is unknown in the target database in any context.
                continue

            node_data = _reformat_biosphere_exc_as_new_node(
                exc, placeholder_db_name
            )
            code = node_data["code"]
            if code not in created:
                node = placeholder.new_node(**node_data)
                node.save()
                created[code] = node.key
            exc["input"] = created[code]

def create_new_biosphere(self, biosphere_name: str):
"""Create new biosphere database from unlinked biosphere flows in ``self.data``"""
if biosphere_name in databases:
raise ValueError(f"{biosphere_name} database already exists")

def reformat(exc):
return exc | {
"type": labels.biosphere_node_default,
"exchanges": [],
"database": biosphere_name,
"code": activity_hash(exc),
}

bio_data = {
(flow["database"], flow["code"]): flow
for flow in [
reformat(exc)
_reformat_biosphere_exc_as_new_node(exc, biosphere_name)
for ds in self.data
for exc in ds.get("exchanges", [])
if exc["type"] in labels.biosphere_edge_types and not exc.get("input")
Expand Down Expand Up @@ -579,7 +660,7 @@ def randonneur(
edges_label="exchanges",
verbose=verbose,
case_sensitive=case_sensitive,
)
),
)

def migrate(self, migration_name):
Expand Down
74 changes: 72 additions & 2 deletions tests/base_lci_importer.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@

import numpy as np
import pytest
from bw2data import Database
from bw2data import Database, databases
from bw2data.parameters import *
from bw2data.tests import bw2test

from bw2io import ExcelImporter
from bw2io.errors import StrategyError
from bw2io.errors import NonuniqueCode, WrongDatabase
from bw2io.importers.base_lci import LCIImporter

Expand Down Expand Up @@ -695,3 +695,73 @@ def test_delete_pe_update_still_deletes():
== 1
)
assert ParameterizedExchange.get(group="h").formula == "6 + 7"


@bw2test
def test_create_new_database_for_flows_with_missing_top_level_context_new_database():
    """Check error handling and placeholder creation for a fresh placeholder database.

    Three unlinked "custom" exchanges are imported: one whose flow exists in the target
    database under a different top-level context, one whose name does not exist there at
    all, and one whose top-level context matches. Only the first should get a placeholder.
    """

    def make_exchange(name, categories):
        # All exchanges in this test differ only by name and categories.
        return {
            "type": "custom",
            "name": name,
            "extra": True,
            "unit": "b",
            "categories": categories,
        }

    exchanges = [
        make_exchange("a", ("c", "d")),
        make_exchange("wrong", ("c", "d")),
        make_exchange("a", ("e", "c")),
    ]
    importer = LCIImporter("testcase")
    importer.data = [{"exchanges": exchanges}]
    create = importer.create_new_database_for_flows_with_missing_top_level_context

    # Target database does not exist yet.
    with pytest.raises(StrategyError):
        create(
            "missing",
            "placeholder",
        )

    matchable_data = {
        ("matchable", "a"): {
            "name": "a",
            "unit": "b",
            "extra": True,
            "categories": ("e", "f"),
        }
    }
    Database("matchable").write(matchable_data)

    # `fields` without "categories" is rejected.
    with pytest.raises(StrategyError):
        create("matchable", "placeholder", fields=["name", "unit"])

    create(
        "matchable",
        "placeholder",
        fields=["categories", "name", "unit", "extra"],
        kinds=["custom"],
    )

    assert "placeholder" in databases
    placeholder = Database("placeholder")
    assert len(placeholder) == 1

    placeholder_node = next(iter(placeholder))
    expected_attributes = {"name": "a", "unit": "b", "categories": ("c", "d")}
    for label, expected in expected_attributes.items():
        assert placeholder_node[label] == expected

    first, *others = importer.data[0]["exchanges"]
    assert first["input"] == placeholder_node.key
    assert all("input" not in exc for exc in others)

0 comments on commit e773ca5

Please sign in to comment.