From e773ca514d61168ec0e68390ed82e01e2db5c831 Mon Sep 17 00:00:00 2001 From: Chris Mutel Date: Mon, 2 Sep 2024 08:36:08 +0200 Subject: [PATCH] Add `create_new_database_for_flows_with_missing_top_level_context` --- bw2io/importers/base_lci.py | 107 +++++++++++++++++++++++++++++++----- tests/base_lci_importer.py | 74 ++++++++++++++++++++++++- 2 files changed, 166 insertions(+), 15 deletions(-) diff --git a/bw2io/importers/base_lci.py b/bw2io/importers/base_lci.py index 44e4f0da..f1244ee4 100644 --- a/bw2io/importers/base_lci.py +++ b/bw2io/importers/base_lci.py @@ -13,7 +13,7 @@ ParameterizedExchange, ProjectParameter, ) -import randonneur as rd +import randonneur as rn from ..errors import NonuniqueCode, StrategyError, WrongDatabase from ..export.excel import write_lci_matching @@ -25,6 +25,7 @@ link_iterable_by_fields, link_technosphere_based_on_name_unit_location, link_technosphere_by_activity_hash, + match_against_only_available_in_given_context_tree, match_against_top_level_context, normalize_units, strip_biosphere_exc_locations, @@ -33,6 +34,30 @@ from .base import ImportBase +EXCHANGE_SPECIFIC_KEYS = ( + "amount", + "functional", + "loc", + "maximum", + "minimum", + "output", + "scale", + "shape", + "temporal_distribution", + "uncertainty type", + "uncertainty_type", +) + + +def _reformat_biosphere_exc_as_new_node(exc: dict, db_name: str) -> dict: + return {k: v for k, v in exc.items() if k not in EXCHANGE_SPECIFIC_KEYS} | { + "type": labels.biosphere_node_default, + "exchanges": [], + "database": db_name, + "code": activity_hash(exc), + } + + class LCIImporter(ImportBase): """Base class for format-specific importers. @@ -399,8 +424,8 @@ def match_database( def match_database_against_top_level_context( self, other_db_name: str, - fields: Optional[List[str]] = None, - kinds: Optional[List[str]] = None, + fields: List[str] = ["name", "unit", "categories"], + kinds: List[str] = labels.biosphere_edge_types, # randonneur_transformations: Optional[list] = None ) -> None: """ @@ -461,23 +486,79 @@ def match_database_against_only_available_in_given_context_tree( ) ) + def create_new_database_for_flows_with_missing_top_level_context( + self, + target_db_name: str, + placeholder_db_name: str, + fields: List[str] = ["name", "unit", "categories"], + kinds: List[str] = labels.biosphere_edge_types, + ) -> None: + """ + Create proxy datasets for flows who have corresponding flows in another database, but not + with the given top-level context. + + In other words, if we are trying to match `{'name': 'foo', 'categories': ['foo']}`, and + our corresponding database only has `{'name': 'foo', 'categories': ['bar']}`, then we can + create a placeholder dataset in a new database, as no amount of category manipulation will + result in a match in the given target database. + """ + + def get_key( + obj: dict, fields: List[str], include_categories: bool = True + ) -> tuple: + return tuple( + [obj.get(field) for field in fields] + + ([tuple(obj["categories"])[0]] if include_categories else []) + ) + + if target_db_name not in databases: + raise StrategyError(f"Can't find target database {target_db_name}") + if "categories" not in fields: + raise StrategyError("`fields` must include `categories`") + + placeholder = Database(placeholder_db_name) + if placeholder_db_name not in databases: + placeholder.register( + format=self.format, + comment=f"Database for unlinked biosphere flows with wrong top-level context from {self.db_name}. Generated by `bw2io` method `create_new_database_for_flows_with_missing_top_level_context`", + ) + + ffields = [field for field in fields if field != "categories"] + mapping = { + get_key(obj, ffields): obj.key + for obj in Database(target_db_name) + if obj.get("categories") + } + existence = { + get_key(obj, ffields, False) + for obj in Database(target_db_name) + if obj.get("categories") + } + + for ds in self.data: + for exc in filter( + lambda x: "input" not in x and x.get("type") in kinds, + ds.get("exchanges", []), + ): + if ( + get_key(exc, ffields) not in mapping + and get_key(exc, ffields, False) in existence + ): + node = placeholder.new_node( + **_reformat_biosphere_exc_as_new_node(exc, placeholder_db_name) + ) + node.save() + exc["input"] = node.key + def create_new_biosphere(self, biosphere_name: str): """Create new biosphere database from unlinked biosphere flows in ``self.data``""" if biosphere_name in databases: raise ValueError(f"{biosphere_name} database already exists") - def reformat(exc): - return exc | { - "type": labels.biosphere_node_default, - "exchanges": [], - "database": biosphere_name, - "code": activity_hash(exc), - } - bio_data = { (flow["database"], flow["code"]): flow for flow in [ - reformat(exc) + _reformat_biosphere_exc_as_new_node(exc, biosphere_name) for ds in self.data for exc in ds.get("exchanges", []) if exc["type"] in labels.biosphere_edge_types and not exc.get("input") @@ -579,7 +660,7 @@ def randonneur( edges_label="exchanges", verbose=verbose, case_sensitive=case_sensitive, - ) + ), ) def migrate(self, migration_name): diff --git a/tests/base_lci_importer.py b/tests/base_lci_importer.py index f6dfd526..0edb024f 100644 --- a/tests/base_lci_importer.py +++ b/tests/base_lci_importer.py @@ -2,11 +2,11 @@ import numpy as np import pytest -from bw2data import Database +from bw2data import Database, databases from bw2data.parameters import * from bw2data.tests import bw2test -from bw2io import ExcelImporter +from bw2io.errors import StrategyError from bw2io.errors import NonuniqueCode, WrongDatabase from bw2io.importers.base_lci import LCIImporter @@ -695,3 +695,73 @@ def test_delete_pe_update_still_deletes(): == 1 ) assert ParameterizedExchange.get(group="h").formula == "6 + 7" + + +@bw2test +def test_create_new_database_for_flows_with_missing_top_level_context_new_database(): + importer = LCIImporter("testcase") + importer.data = [ + { + "exchanges": [ + { + "type": "custom", + "name": "a", + "extra": True, + "unit": "b", + "categories": ("c", "d"), + }, + { + "type": "custom", + "name": "wrong", + "extra": True, + "unit": "b", + "categories": ("c", "d"), + }, + { + "type": "custom", + "name": "a", + "extra": True, + "unit": "b", + "categories": ("e", "c"), + }, + ] + } + ] + + with pytest.raises(StrategyError): + importer.create_new_database_for_flows_with_missing_top_level_context( + "missing", + "placeholder", + ) + + Database("matchable").write( + { + ("matchable", "a"): { + "name": "a", + "unit": "b", + "extra": True, + "categories": ("e", "f"), + } + } + ) + + with pytest.raises(StrategyError): + importer.create_new_database_for_flows_with_missing_top_level_context( + "matchable", "placeholder", fields=["name", "unit"] + ) + + importer.create_new_database_for_flows_with_missing_top_level_context( + "matchable", + "placeholder", + fields=["categories", "name", "unit", "extra"], + kinds=["custom"], + ) + assert "placeholder" in databases + placeholder = Database("placeholder") + assert len(placeholder) == 1 + placeholder_node = list(placeholder)[0] + assert placeholder_node["name"] == "a" + assert placeholder_node["unit"] == "b" + assert placeholder_node["categories"] == ("c", "d") + assert importer.data[0]["exchanges"][0]["input"] == placeholder_node.key + assert not any("input" in exc for exc in importer.data[0]["exchanges"][1:])