diff --git a/.github/workflows/pytest.yaml b/.github/workflows/pytest.yaml index c27c642790..4f75d11af0 100644 --- a/.github/workflows/pytest.yaml +++ b/.github/workflows/pytest.yaml @@ -7,7 +7,6 @@ on: branches: [ main ] env: - GAMS_VERSION: 25.1.1 # For setuptools-scm. With fetch --tags below, this ensures that enough # history is fetched to contain the latest tag, so that setuptools-scm can # generate the version number. Update: @@ -91,12 +90,10 @@ jobs: ${{ matrix.os }}-gams${{ env.GAMS_VERSION }}- ${{ matrix.os }}- - - name: Install GAMS - # Use the scripts from the checked-out ixmp repo - env: - CI_OS: ${{ matrix.os }} - run: ixmp/ci/install-gams.sh - shell: bash + - uses: iiasa/actions/setup-gams@main + with: + version: 25.1.1 + license: ${{ secrets.GAMS_LICENSE }} - name: Upgrade pip, wheel, setuptools-scm run: python -m pip install --upgrade pip wheel setuptools-scm diff --git a/doc/api/disutility.rst b/doc/api/disutility.rst new file mode 100644 index 0000000000..75edbde4b0 --- /dev/null +++ b/doc/api/disutility.rst @@ -0,0 +1,120 @@ +.. currentmodule:: message_ix_models.model.disutility + +Consumer disutility +******************* + +This module provides a generalized consumer disutility formulation, currently used by :mod:`message_data.model.transport`. +The formulation rests on the concept of “consumer groups”; each consumer group may have a distinct disutility associated with using the outputs of each technology. +A set of ‘pseudo-’/‘virtual’/non-physical “usage technologies” converts the outputs of the actual technologies into the commodities demanded by each group, while also requiring input of a costly “disutility” commodity. + + +Method & usage +============== + +Use this code by calling :func:`add`, which takes arguments that describe the concrete usage: + +Consumer groups + This is a list of :class:`.Code` objects describing the consumer groups. 
+ The list must be 1-dimensional, but can be composed (as in :mod:`message_data.model.transport`) from multiple dimensions. + +Technologies + This is a list of :class:`.Code` objects describing the technologies for which the consumers in the different groups experience disutility. + Each object must have 'input' and 'output' annotations (:attr:`.Code.anno`); each of these is a :class:`dict` with the keys 'commodity', 'level', and 'unit', describing the source or sink for the technology. + +Template + This is also a :class:`.Code` object, similar to those in ``technologies``; see below. + +The code creates a source technology for the “disutility” commodity. +The code does *not* perform the following step(s) needed to completely parametrize the formulation: + +- Set consumer group-specific ``demand`` parameter values for new commodities. +- Set the amounts of “disutility” commodities used as ``input`` to the new usage technologies. + +These must be parametrized based on the particular application. + +Detailed example +================ + +This example is similar to the one used in :func:`.test_disutility.test_minimal`: + +.. code-block:: python + + # Two consumer groups + groups = [Code(id="g0"), Code(id="g1")] + + # Two technologies, for which groups may have different disutilities. 
+ techs = [Code(id="t0"), Code(id="t1")] + + # Add generalized disutility formulation to some technologies + disutility.add( + scenario, + groups=groups, + technologies=techs, + + template=Code( + # Template for IDs of conversion technologies + id="usage of {technology} by {group}", + + # Templates for inputs of conversion technologies + input=dict( + # Technology-specific output commodity + commodity="output of {technology}", + level="useful", + unit="kg", + ), + + # Templates for outputs of conversion technologies + output=dict( + # Consumer-group–specific demand commodity + commodity="demand of group {group}", + level="useful", + unit="kg", + ), + ), + **options, + ) + + +:func:`add` uses :func:`get_spec` to generate a specification that adds the following: + +- For the set ``commodity``: + + - The single element “disutility”. + - One element per `technologies`, using the `template` “input” annotation, e.g. “output of t0” generated from ``output of {technology}`` and the id “t0”. + These **may** already be present in the `scenario`; if not, the spec causes them to be added. + - One element per `groups`, using the `template` “output” annotation, e.g. “demand of group g1” generated from ``demand of group {group}`` and the id “g1”. + These **may** already be present in the `scenario`; if not, the spec causes them to be added. + +- For the set ``technology``: + + - The single element “disutility source”. + - One element for each combination of disutility-affected technology (`technologies`) and consumer group (`groups`). + For example, “usage of t0 by g1” generated from ``usage of {technology} by {group}``, and the ids “t0” and “g1”. + +The spec is applied to the target scenario using :func:`.model.build.apply_spec`. +If the arguments produce a spec that is inconsistent with the target scenario, an exception will be raised at this point. 
+ + +Next, :func:`add` uses :func:`data_conversion` and :func:`data_source` to generate: + +- ``output`` and ``var_cost`` parameter data for “disutility source”. + This technology outputs the unitless commodity “disutility” at a cost of 1.0 per unit. + +- ``input`` and ``output`` parameter data for the new usage technologies. + For example, the new technology “usage of t0 by g1”… + + - …takes input from the *technology-specific* commodity “output of t0”. + - …takes input from the common commodity “disutility”, in an amount specific to group “g1”. + - …outputs to a *group-specific* commodity “demand of group g1”. + +Note that the `technologies` towards which the groups have disutility are assumed to already be configured to ``output`` to the corresponding commodities. +For example, the technology “t0” outputs to the commodity “output of t0”; the ``output`` values for this technology are **not** added/introduced by :func:`add`. + + +Code reference +============== + +See also :mod:`message_ix_models.tests.model.test_disutility`. + +.. automodule:: message_ix_models.model.disutility + :members: diff --git a/doc/api/model-build.rst b/doc/api/model-build.rst index 35444a631a..cc3d6367cb 100644 --- a/doc/api/model-build.rst +++ b/doc/api/model-build.rst @@ -37,7 +37,7 @@ The following modules use this workflow and can be examples for developing simil Code reference ============== -.. currentmodule:: message_data.model.build +.. currentmodule:: message_ix_models.model.build -.. automodule:: message_data.model.build +.. automodule:: message_ix_models.model.build :members: diff --git a/doc/api/util.rst b/doc/api/util.rst index f84f747790..e7206b775f 100644 --- a/doc/api/util.rst +++ b/doc/api/util.rst @@ -19,10 +19,18 @@ Commonly used: .. 
autosummary:: as_codes + broadcast + copy_column + ffill load_package_data load_private_data + make_io + make_matched_dfs + make_source_tech + merge_data package_data_path private_data_path + same_node ~context.Context ~scenarioinfo.ScenarioInfo diff --git a/doc/index.rst b/doc/index.rst index 830c41d083..3e64193ab0 100644 --- a/doc/index.rst +++ b/doc/index.rst @@ -23,6 +23,7 @@ These models are built in the `MESSAGEix framework ` api/model api/model-bare api/model-build + api/disutility api/project api/tools api/util diff --git a/doc/whatsnew.rst b/doc/whatsnew.rst index ce515ceb84..1bedb1e34a 100644 --- a/doc/whatsnew.rst +++ b/doc/whatsnew.rst @@ -1,8 +1,10 @@ What's new ********** -.. Next release -.. ============ +Next release +============ + +- Add :mod:`.model.disutility`, code for setting up structure and data for generalized consumer disutility (:pull:`13`) 2021.3.24 ========= diff --git a/message_ix_models/model/disutility.py b/message_ix_models/model/disutility.py new file mode 100644 index 0000000000..afe3c74d75 --- /dev/null +++ b/message_ix_models/model/disutility.py @@ -0,0 +1,236 @@ +import logging +from collections import defaultdict +from functools import partial +from itertools import product +from typing import Dict, List, Mapping, Sequence, Union + +import message_ix +import pandas as pd +from sdmx.model import Annotation, Code + +from message_ix_models import ScenarioInfo +from message_ix_models.model.build import apply_spec +from message_ix_models.util import ( + broadcast, + eval_anno, + make_io, + make_matched_dfs, + make_source_tech, + merge_data, + same_node, +) + +log = logging.getLogger(__name__) + +CodeLike = Union[str, Code] + + +def add( + scenario: message_ix.Scenario, + groups: Sequence[Code], + technologies: Sequence[Code], + template: Code, + **options, +) -> None: + """Add disutility formulation to `scenario`.""" + # Generate the spec given the configuration options + spec = get_spec(groups, technologies, template) + + # Apply 
spec and add data + apply_spec(scenario, spec, partial(get_data, spec=spec), **options) + + +def get_spec( + groups: Sequence[Code], + technologies: Sequence[Code], + template: Code, +) -> Dict[str, ScenarioInfo]: + """Get a spec for a disutility formulation. + + Parameters + ---------- + groups : list of Code + Identities of the consumer groups with distinct disutilities. + technologies : list of Code + The technologies to which the disutilities are applied. + template : .Code + + """ + require = ScenarioInfo() + remove = ScenarioInfo() + add = ScenarioInfo() + + require.set["technology"].extend(technologies) + + # Disutility commodity and source + add.set["commodity"] = [Code(id="disutility")] + add.set["technology"] = [Code(id="disutility source")] + + # Disutility is unitless + add.set["unit"].append("") + + # Add conversion technologies + for t, g in product(technologies, groups): + # String formatting arguments + fmt = dict(technology=t, group=g) + + # Format each field in the "input" and "output" annotations + input = {k: v.format(**fmt) for k, v in eval_anno(template, id="input").items()} + output = { + k: v.format(**fmt) for k, v in eval_anno(template, id="output").items() + } + + # - Format the ID string from the template + # - Copy the "output" annotation without modification + t_code = Code( + id=template.id.format(**fmt), + annotations=[ + Annotation(id="input", text=repr(input)), + Annotation(id="output", text=repr(output)), + ], + ) + + # "commodity" set elements to add + add.set["commodity"].extend([input["commodity"], output["commodity"]]) + + # "technology" set elements to add + t_code.annotations.append(Annotation(id="input", text=repr(input))) + add.set["technology"].append(t_code) + + # Deduplicate "commodity" set elements + add.set["commodity"] = sorted(map(str, set(add.set["commodity"]))) + + return dict(require=require, remove=remove, add=add) + + +def get_data(scenario, spec, **kwargs) -> Mapping[str, pd.DataFrame]: + """Get data for 
disutility formulation. + + Calls :meth:`data_conversion` and :meth:`data_source`. + + Parameters + ---------- + spec : dict + The output of :meth:`get_spec`. + """ + if len(kwargs): + log.warning(f"Ignore {repr(kwargs)}") + + info = ScenarioInfo(scenario) + + # Get conversion technology data + data = data_conversion(info, spec) + + # Get and append source data + merge_data(data, data_source(info, spec)) + + return data + + +def dp_for(col_name: str, info: ScenarioInfo) -> pd.Series: # pragma: no cover + """:meth:`pandas.DataFrame.assign` helper for ``duration_period``. + + Returns a callable to be passed to :meth:`pandas.DataFrame.assign`. The callable + takes a data frame as the first argument, and returns a :class:`pandas.Series` + based on the ``duration_period`` parameter in `info`, aligned to `col_name` in the + data frame. + + Currently (2021-04-07) unused. + """ + + def func(df): + return df.merge(info.par["duration_period"], left_on=col_name, right_on="year")[ + "value_y" + ] + + return func + + +def data_conversion(info, spec) -> Mapping[str, pd.DataFrame]: + """Generate input and output data for disutility conversion technologies.""" + common = dict( + mode="all", + year_vtg=info.Y, + year_act=info.Y, + # No subannual detail + time="year", + time_origin="year", + time_dest="year", + ) + + # Use the spec to retrieve information + technology = spec["add"].set["technology"] + + # Data to return + data0: Mapping[str, List[pd.DataFrame]] = defaultdict(list) + + # Loop over conversion technologies + for t in technology: + # Use the annotations on the technology Code to get information about the + # commodity, level, and unit + input = eval_anno(t, "input") + output = eval_anno(t, "output") + if None in (input, output): + if t.id == "disutility source": + continue # Data for this tech is from data_source() + else: # pragma: no cover + raise ValueError(t) # Error in user input + + # Make input and output data frames + i_o = make_io( + (input["commodity"], 
input["level"], input["unit"]), + (output["commodity"], output["level"], output["unit"]), + 1.0, + on="output", + technology=t.id, + **common, + ) + for par, df in i_o.items(): + # Broadcast across nodes + df = df.pipe(broadcast, node_loc=info.N[1:]).pipe(same_node) + + if par == "input": + # Add input of disutility + df = pd.concat( + [df, df.assign(commodity="disutility", unit="")], ignore_index=True + ) + + data0[par].append(df) + + # Concatenate to a single data frame per parameter + data = {par: pd.concat(dfs, ignore_index=True) for par, dfs in data0.items()} + + # Create data for capacity_factor + data.update(make_matched_dfs(base=data["input"], capacity_factor=1.0)) + + return data + + +def data_source(info, spec) -> Mapping[str, pd.DataFrame]: + """Generate data for a technology that emits the “disutility” commodity.""" + # List of input levels where disutility commodity must exist + levels = set() + for t in spec["add"].set["technology"]: + input = eval_anno(t, "input") + if input: + levels.add(input["level"]) + + log.info(f"Generate disutility on level(s): {repr(levels)}") + + # Use default capacity_factor = 1.0 + result = make_source_tech( + info, + common=dict( + commodity="disutility", + mode="all", + technology="disutility source", + time="year", + time_dest="year", + unit="-", + ), + output=1.0, + var_cost=1.0, + ) + result["output"] = result["output"].pipe(broadcast, level=sorted(levels)) + + return result diff --git a/message_ix_models/testing.py b/message_ix_models/testing.py index dc47615c53..558e6c338b 100644 --- a/message_ix_models/testing.py +++ b/message_ix_models/testing.py @@ -127,7 +127,8 @@ def assert_exit_0(self, *args, **kwargs): self.invoke(*args, **kwargs) if self.last_result.exit_code != 0: - raise self.last_result.exc_info[1] + # Re-raise the exception triggered within the CLI invocation + raise self.last_result.exc_info[1].__context__ return self.last_result @@ -157,8 +158,9 @@ def bare_res(request, context: Context, solved: bool = 
False) -> message_ix.Scen Parameters ---------- - request : .Request - The pytest :fixture:`pytest:request` fixture. + request : .Request or None + The pytest :fixture:`pytest:request` fixture. If provided the pytest test node + name is used for the scenario name of the returned Scenario. context : .Context Passed to :func:`.testing.bare_res`. solved : bool, optional @@ -188,5 +190,10 @@ def bare_res(request, context: Context, solved: bool = False) -> message_ix.Scen log.info("Solve") base.solve(solve_options=dict(lpmethod=4), quiet=True) - log.info(f"Clone to '{name}/{request.node.name}'") - return base.clone(scenario=request.node.name, keep_solution=solved) + try: + new_name = request.node.name + except AttributeError: + new_name = "baseline" + + log.info(f"Clone to '{name}/{new_name}'") + return base.clone(scenario=new_name, keep_solution=solved) diff --git a/message_ix_models/tests/model/test_disutility.py b/message_ix_models/tests/model/test_disutility.py new file mode 100644 index 0000000000..41f1cb7cec --- /dev/null +++ b/message_ix_models/tests/model/test_disutility.py @@ -0,0 +1,268 @@ +"""Tests of :mod:`.model.disutility`.""" +from itertools import product + +import pandas as pd +import pandas.testing as pdt +import pytest +from message_ix import make_df +from sdmx.model import Annotation, Code + +from message_ix_models import ScenarioInfo, testing +from message_ix_models.model import disutility +from message_ix_models.util import ( + add_par_data, + copy_column, + make_source_tech, + merge_data, +) + +# Common data and fixtures for test_minimal() and other tests + +COMMON = dict( + level="useful", + mode="all", + node_dest="R14_AFR", + node_loc="R14_AFR", + node_origin="R14_AFR", + node="R14_AFR", + time_dest="year", + time_origin="year", + time="year", + unit="kg", +) + + +@pytest.fixture +def groups(): + """Fixture: list of 2 consumer groups.""" + yield [Code(id="g0"), Code(id="g1")] + + +@pytest.fixture +def techs(): + """Fixture: list of 2 
technologies for which groups can have disutility.""" + yield [Code(id="t0"), Code(id="t1")] + + +@pytest.fixture +def template(): + """Fixture: :class:.`Code` with annotations, for :func:`.disutility.get_spec`.""" + # Template for inputs of conversion technologies, from a technology-specific + # commodity + input = dict(commodity="output of {technology}", level="useful", unit="kg") + + # Template for outputs of conversion technologies, to a group–specific demand + # commodity + output = dict(commodity="demand of group {group}", level="useful", unit="kg") + + # Code's ID is itself a template for IDs of conversion technologies + yield Code( + id="usage of {technology} by {group}", + annotations=[ + Annotation(id="input", text=repr(input)), + Annotation(id="output", text=repr(output)), + ], + ) + + +@pytest.fixture +def spec(groups, techs, template): + """Fixture: a prepared spec for the minimal test case.""" + yield disutility.get_spec(groups, techs, template) + + +@pytest.fixture +def scenario(request, test_context, techs): + """Fixture: a :class:`.Scenario` with technologies given by :func:`techs`.""" + s = testing.bare_res(request, test_context, solved=False) + s.check_out() + + s.add_set("technology", ["t0", "t1"]) + + s.commit("Test fixture for .model.disutility") + yield s + + +def test_add(scenario, groups, techs, template): + """:func:`.disutility.add` runs on the bare RES; the result solves.""" + disutility.add(scenario, groups, techs, template) + + # Scenario solves (no demand) + scenario.solve(quiet=True) + assert (scenario.var("ACT")["lvl"] == 0).all() + + +def minimal_test_data(scenario): + """Generate data for :func:`test_minimal`.""" + common = COMMON.copy() + common.pop("node_loc") + common.update(dict(mode="all")) + + data = dict() + + info = ScenarioInfo(scenario) + y0 = info.Y[0] + y1 = info.Y[1] + + # Output from t0 and t1 + for t in ("t0", "t1"): + common.update(dict(technology=t, commodity=f"output of {t}")) + merge_data(data, 
make_source_tech(info, common, output=1.0, var_cost=1.0)) + + # Disutility input for each combination of (tech) × (group) × (2 years) + input_data = pd.DataFrame( + [ + ["usage of t0 by g0", y0, 0.1], + ["usage of t0 by g0", y1, 0.1], + ["usage of t1 by g0", y0, 0.1], + ["usage of t1 by g0", y1, 0.1], + ["usage of t0 by g1", y0, 0.1], + ["usage of t0 by g1", y1, 0.1], + ["usage of t1 by g1", y0, 0.1], + ["usage of t1 by g1", y1, 0.1], + ], + columns=["technology", "year_vtg", "value"], + ) + data["input"] = make_df( + "input", **input_data, commodity="disutility", **COMMON + ).assign(node_origin=copy_column("node_loc"), year_act=copy_column("year_vtg")) + + # Demand + c, y = zip(*product(["demand of group g0", "demand of group g1"], [y0, y1])) + data["demand"] = make_df("demand", commodity=c, year=y, value=1.0, **COMMON) + + # Constraint on activity in the first period + t = sorted(input_data["technology"].unique()) + for bound in ("lo", "up"): + par = f"bound_activity_{bound}" + data[par] = make_df(par, value=0.5, technology=t, year_act=y0, **COMMON) + + # Constraint on activity growth + annual = (1.1 ** (1.0 / 5.0)) - 1.0 + for bound, factor in (("lo", -1.0), ("up", 1.0)): + par = f"growth_activity_{bound}" + data[par] = make_df( + par, value=factor * annual, technology=t, year_act=y1, **COMMON + ) + + return data, y0, y1 + + +def test_minimal(scenario, groups, techs, template): + """Expected results are generated from a minimal test case.""" + # Set up structure + disutility.add(scenario, groups, techs, template) + + # Add test-specific data + data, y0, y1 = minimal_test_data(scenario) + + scenario.check_out() + add_par_data(scenario, data) + scenario.commit("Disutility test 1") + + # commented: pre-solve debugging output + # for par in ("input", "output", "technical_lifetime", "var_cost"): + # scenario.par(par).to_csv(f"debug-{par}.csv") + + scenario.solve(quiet=True) + + # Helper function to retrieve ACT data and condense for inspection + def get_act(s): + 
result = ( + scenario.var("ACT") + .query("lvl > 0") + .drop(columns=["node_loc", "mode", "time", "mrg"]) + .sort_values(["year_vtg", "technology"]) + .reset_index(drop=True) + ) + # No "stray" activity of technologies beyond the vintage periods + pdt.assert_series_equal( + result["year_act"], result["year_vtg"], check_names=False + ) + result = result.drop(columns=["year_vtg"]).set_index(["technology", "year_act"]) + # Return the activity and its inter-period delta + return result, ( + result.xs(y1, level="year_act") - result.xs(y0, level="year_act") + ) + + # Post-solve debugging output TODO comment before merging + ACT1, ACT1_delta = get_act(scenario) + + # Increase the disutility of t0 for g0 in period y1 + data["input"].loc[1, "value"] = 0.2 + + # Re-solve + scenario.remove_solution() + scenario.check_out() + scenario.add_par("input", data["input"]) + scenario.commit("Disutility test 2") + scenario.solve(quiet=True) + + # Compare activity + ACT2, ACT2_delta = get_act(scenario) + + merged = ACT1.merge(ACT2, left_index=True, right_index=True) + merged["lvl_diff"] = merged["lvl_y"] - merged["lvl_x"] + + merged_delta = ACT1_delta.merge(ACT2_delta, left_index=True, right_index=True) + + # commented: for debugging + # print(merged, merged_delta) + + # Group g0 decreases usage of t0, and increases usage of t1, in period y1 vs. 
y0 + assert merged_delta.loc["usage of t0 by g0", "lvl_y"] < 0 + assert merged_delta.loc["usage of t1 by g0", "lvl_y"] > 0 + + # Group g0 usage of t0 is lower when the disutility is higher + assert merged.loc[("usage of t0 by g0", y1), "lvl_diff"] < 0 + # Group g0 usage of t1 is correspondingly higher + assert merged.loc[("usage of t1 by g0", y1), "lvl_diff"] > 0 + + +def test_data_conversion(scenario, spec): + """:func:`~.disutility.data_conversion` runs.""" + info = ScenarioInfo(scenario) + disutility.data_conversion(info, spec) + + +def test_data_source(scenario, spec): + """:func:`~.disutility.data_source` runs.""" + info = ScenarioInfo(scenario) + disutility.data_source(info, spec) + + +def test_get_data(scenario, spec): + """:func:`~.disutility.get_data` runs.""" + disutility.get_data(scenario, spec) + + +def test_get_spec(groups, techs, template): + """:func:`~.disutility.get_spec` runs and produces expected output.""" + spec = disutility.get_spec(groups, techs, template) + + # Spec requires the existence of the base technologies + assert {"technology"} == set(spec["require"].set.keys()) + assert techs == spec["require"].set["technology"] + + # Spec removes nothing + assert set() == set(spec["remove"].set.keys()) + + # Spec adds the "disutility" commodity; and adds (if not existing) the output + # commodities for t[01] and demand commodities for g[01] + assert { + "disutility", + "output of t0", + "output of t1", + "demand of group g0", + "demand of group g1", + } == set(map(str, spec["add"].set["commodity"])) + + # Spec adds the "disutility source" technology, and "usage of {tech} by {group}" + # for each tech × group, per the template + assert { + "disutility source", + "usage of t0 by g0", + "usage of t0 by g1", + "usage of t1 by g0", + "usage of t1 by g1", + } == set(map(str, spec["add"].set["technology"])) diff --git a/message_ix_models/tests/test_testing.py b/message_ix_models/tests/test_testing.py index 369de59ad0..a6d5ec05f7 100644 --- 
a/message_ix_models/tests/test_testing.py +++ b/message_ix_models/tests/test_testing.py @@ -1,6 +1,14 @@ +import click +import pytest + from message_ix_models.testing import bare_res +def test_bare_res_no_request(test_context): + """:func:`bare_res` works with `request` = :obj:`None`.""" + bare_res(None, test_context, solved=False) + + def test_bare_res_solved(request, test_context): """:func:`bare_res` works with `solve` = :obj:`True`. @@ -8,3 +16,8 @@ def test_bare_res_solved(request, test_context): test. """ bare_res(request, test_context, solved=True) + + +def test_cli_runner(mix_models_cli): + with pytest.raises(click.exceptions.UsageError, match="No such command 'foo'"): + mix_models_cli.assert_exit_0(["foo", "bar"]) diff --git a/message_ix_models/tests/test_util.py b/message_ix_models/tests/test_util.py index 426b54a03c..39ad4b4186 100644 --- a/message_ix_models/tests/test_util.py +++ b/message_ix_models/tests/test_util.py @@ -1,16 +1,24 @@ """Tests of :mod:`message_ix_models.util`.""" import logging +import re from pathlib import Path +import pandas as pd import pytest +from message_ix import make_df +from message_ix_models import ScenarioInfo from message_ix_models.util import ( MESSAGE_DATA_PATH, MESSAGE_MODELS_PATH, as_codes, + broadcast, + copy_column, + ffill, iter_parameters, load_package_data, load_private_data, + make_source_tech, package_data_path, private_data_path, ) @@ -28,6 +36,16 @@ def test_as_codes(): assert result[1] not in result[0].child +def test_broadcast(caplog): + # Debug message logged with length-0 values + with caplog.at_level(logging.DEBUG, logger="message_ix_models"): + broadcast(pd.DataFrame(columns=["foo", "bar"]), foo=[], bar=[]) + + assert "Don't broadcast over 'foo'; labels [] have length 0" in caplog.messages + + # TODO expand + + @pytest.mark.parametrize( "data", ( @@ -42,6 +60,41 @@ def test_as_codes_invalid(data): as_codes(data) +def test_copy_column(): + df = pd.DataFrame([[0, 1], [2, 3]], columns=["a", "b"]) + df = 
df.assign(c=copy_column("a"), d=4) + assert all(df["c"] == [0, 2]) + assert all(df["d"] == 4) + + +def test_ffill(): + years = list(range(6)) + + df = ( + make_df( + "fix_cost", + year_act=[0, 2, 4], + year_vtg=[0, 2, 4], + technology=["foo", "bar", "baz"], + unit="USD", + ) + .pipe(broadcast, node_loc=["A", "B", "C"]) + .assign(value=list(map(float, range(9)))) + ) + + # Function completes + result = ffill(df, "year_vtg", years, "year_act = year_vtg") + + assert 2 * len(df) == len(result) + assert years == sorted(result["year_vtg"].unique()) + + # Cannot ffill on "value" and "unit" dimensions + with pytest.raises(ValueError, match="value"): + ffill(df, "value", []) + + # TODO test some specific values + + def test_iter_parameters(test_context): """Parameters indexed by set 'node' can be retrieved.""" result = list(iter_parameters("node")) @@ -79,6 +132,46 @@ def test_load_private_data(*parts, suffix=None): load_private_data("sources.yaml") +def test_make_source_tech(): + info = ScenarioInfo() + info.set["node"] = ["World", "node0", "node1"] + info.set["year"] = [1, 2, 3] + + values = dict( + capacity_factor=1.0, + output=2.0, + var_cost=3.0, + technical_lifetime=4.0, + ) + common = dict( + commodity="commodity", + level="level", + mode="mode", + technology="technology", + time="time", + time_dest="time", + unit="unit", + ) + # Code runs + result = make_source_tech(info, common, **values) + # Result is dictionary with the expected keys + assert isinstance(result, dict) + assert set(result.keys()) == set(values.keys()) + + # "World" node does not appear in results + assert set(result["output"]["node_loc"].unique()) == set(info.N[1:]) + + for df in result.values(): + # Results have 2 nodes × 3 years + assert len(df) == 2 * 3 + # No empty values + assert not df.isna().any(None) + + del values["var_cost"] + with pytest.raises(ValueError, match=re.escape("needs values for {'var_cost'}")): + make_source_tech(info, common, **values) + + def test_package_data_path(*parts, 
suffix=None): assert MESSAGE_MODELS_PATH.joinpath("data", "foo", "bar") == package_data_path( "foo", "bar" diff --git a/message_ix_models/util/__init__.py b/message_ix_models/util/__init__.py index 265d618a29..d2e8bc608c 100644 --- a/message_ix_models/util/__init__.py +++ b/message_ix_models/util/__init__.py @@ -1,7 +1,8 @@ import logging +from collections import defaultdict from copy import copy from pathlib import Path -from typing import Any, Dict, List, Mapping, Optional, Union, cast +from typing import Any, Dict, List, Mapping, Optional, Sequence, Union, cast import message_ix import pandas as pd @@ -129,6 +130,49 @@ def as_codes(data: Union[List[str], Dict[str, Dict]]) -> List[Code]: return list(result.values()) +def broadcast(df, **kwargs): + """Fill missing data in `df` by broadcasting. + + Arguments + --------- + kwargs + Keys are dimensions. Values are labels along that dimension to fill. + """ + for dim, levels in kwargs.items(): + # Checks + assert df[dim].isna().all(), f"Dimension {dim} was not empty\n\n{df.head()}" + if len(levels) == 0: + log.debug( + f"Don't broadcast over {repr(dim)}; labels {levels} have length 0" + ) + continue + + # - Duplicate the data + # - Drop the existing column named 'dim' + # - Re-add the column from the constructed MultiIndex + # - Reindex for sequential row numbers + df = ( + pd.concat([df] * len(levels), keys=levels, names=[dim]) + .drop(dim, axis=1) + .reset_index(dim) + .reset_index(drop=True) + ) + return df + + +def copy_column(column_name): + """For use with :meth:`pandas.DataFrame.assign`. + + Examples + -------- + Modify `df` by filling the column 'baz' with the value ``3``, and copying the column + 'bar' into column 'foo'. + + >>> df.assign(foo=copy_column('bar'), baz=3) + """ + return lambda df: df[column_name] + + def eval_anno(obj: AnnotableArtefact, id: str): """Retrieve the annotation `id` from `obj`, run :func:`eval` on its contents. 
@@ -150,6 +194,48 @@ def eval_anno(obj: AnnotableArtefact, id: str): return value +def ffill( + df: pd.DataFrame, dim: str, values: Sequence[Union[str, Code]], expr: str = None +) -> pd.DataFrame: + """Forward-fill `df` on `dim` to cover `values`. + + Parameters + ---------- + df : .DataFrame + Data to fill forwards. + dim : str + Dimension to fill along. Must be a column in `df`. + values : list of str + Labels along `dim` that must be present in the returned data frame. + expr : str, optional + If provided, :meth:`.DataFrame.eval` is called. This can be used to assign one + column to another. For instance, if `dim` == "year_vtg" and `expr` is "year_act + = year_vtg", then forward filling is performed along the "year_vtg" dimension/ + column, and then the filled values are copied to the "year_act" column. + """ + if dim in ("value", "unit"): + raise ValueError(dim) + + # Mapping from (values existing in `df`) -> equal or greater members of `values` + mapping = defaultdict(set) + last_seen = None + for v in sorted(set(values) | set(df[dim].unique())): + if v in df[dim].unique(): + last_seen = v + mapping[last_seen].add(v) + + def _maybe_eval(df): + return df.eval(expr) if expr is not None else df + + dfs = [df] + for key, group_df in df.groupby(dim): + for new_label in sorted(mapping[key])[1:]: + # Duplicate the data; assign the new_label to `dim` + dfs.append(group_df.assign(**{dim: new_label}).pipe(_maybe_eval)) + + return pd.concat(dfs, ignore_index=True) + + def iter_parameters(set_name): """Iterate over MESSAGEix parameters with *set_name* as a dimension. @@ -261,6 +347,131 @@ def load_private_data(*parts: str) -> Mapping: # pragma: no cover (needs messag return _load(PRIVATE_DATA, MESSAGE_DATA_PATH / "data", *parts) +def make_io(src, dest, efficiency, on="input", **kwargs): + """Return input and output data frames for a 1-to-1 technology. + + Parameters + ---------- + src : tuple (str, str, str) + Input commodity, level, unit. 
+ dest : tuple (str, str, str) + Output commodity, level, unit. + efficiency : float + Conversion efficiency. + on : 'input' or 'output' + If 'input', `efficiency` applies to the input, and the output, thus the activity + level of the technology, is in dest[2] units. If 'output', the opposite. + kwargs + Passed to :func:`make_df`. + + Returns + ------- + dict (str -> pd.DataFrame) + Keys are 'input' and 'output'; values are data frames. + """ + return dict( + input=message_ix.make_df( + "input", + commodity=src[0], + level=src[1], + unit=src[2], + value=efficiency if on == "input" else 1.0, + **kwargs, + ), + output=message_ix.make_df( + "output", + commodity=dest[0], + level=dest[1], + unit=dest[2], + value=1.0 if on == "input" else efficiency, + **kwargs, + ), + ) + + +def make_matched_dfs(base, **par_value): + """Return data frames derived from *base* for multiple parameters. + + *par_values* maps from parameter names (e.g. 'fix_cost') to values. + make_matched_dfs returns a :class:`dict` of :class:`pandas.DataFrame`, one for each + parameter in *par_value*. The contents of *base* are used to populate the columns + of each data frame, and the values of *par_value* overwrite the 'value' column. + Duplicates—which occur when the target parameter has fewer dimensions than + *base*—are dropped. + + Examples + -------- + >>> input = make_df('input', ...) + >>> cf_tl = make_matched_dfs( + >>> input, + >>> capacity_factor=1, + >>> technical_lifetime=1, + >>> ) + """ + data = {col: v for col, v in base.iteritems() if col != "value"} + return { + par: message_ix.make_df(par, **data, value=value) + .drop_duplicates() + .reset_index(drop=True) + for par, value in par_value.items() + } + + +def make_source_tech(info, common, **values) -> Dict[str, pd.DataFrame]: + """Return parameter data for a ‘source’ technology. 
+ + The technology has no inputs; its output commodity and/or level are determined by + `common`; either single values, or :obj:`None` if the result will be + :meth:`~DataFrame.pipe`'d through :func:`broadcast`. + + Parameters + ---------- + info : ScenarioInfo + common : dict + Passed to :func:`make_df`. + **values + Values for 'capacity_factor' (optional; default 1.0), 'output', 'var_cost', and + optionally 'technical_lifetime'. + + Returns + ------- + dict + Suitable for :func:`add_par_data`. + """ + # Check arguments + values.setdefault("capacity_factor", 1.0) + missing = {"capacity_factor", "output", "var_cost"} - set(values.keys()) + if len(missing): + raise ValueError(f"make_source_tech() needs values for {repr(missing)}") + elif "technical_lifetime" not in values: + log.debug("No technical_lifetime for source technology") + + # Create data for "output" + result = dict( + output=message_ix.make_df( + "output", + value=values.pop("output"), + year_act=info.Y, + year_vtg=info.Y, + **common, + ) + .pipe(broadcast, node_loc=info.N[1:]) + .pipe(same_node) + ) + + # Add data for other parameters + result.update(make_matched_dfs(base=result["output"], **values)) + + return result + + +def merge_data(base, *others): + """Merge dictionaries of DataFrames together into `base`.""" + for other in others: + for par, df in other.items(): + base[par] = base[par].append(df) if par in base else df + + def package_data_path(*parts) -> Path: """Construct a path to a file under :file:`message_ix_models/data/`.""" return _make_path(MESSAGE_MODELS_PATH / "data", *parts) @@ -271,6 +482,12 @@ def private_data_path(*parts) -> Path: # pragma: no cover (needs message_data) return _make_path(cast(Path, MESSAGE_DATA_PATH) / "data", *parts) +def same_node(df): + """Fill 'node_origin'/'node_dest' in `df` from 'node_loc'.""" + cols = list(set(df.columns) & {"node_origin", "node_dest"}) + return df.assign(**{c: copy_column("node_loc") for c in cols}) + + def strip_par_data( scenario, 
set_name, value, dry_run=False, dump: Dict[str, pd.DataFrame] = None ): @@ -290,11 +507,12 @@ def strip_par_data( # Iterate over parameters with ≥1 dimensions indexed by `set_name` for par_name in iter_parameters(set_name): - if par_name not in par_list: - raise RuntimeError( # pragma: no cover + if par_name not in par_list: # pragma: no cover + log.warning( f"MESSAGEix parameter {repr(par_name)} missing in Scenario " f"{scenario.model}/{scenario.scenario}" ) + continue # Iterate over dimensions indexed by `set_name` for dim, _ in filter( diff --git a/message_ix_models/util/scenarioinfo.py b/message_ix_models/util/scenarioinfo.py index b347d8e865..b39cb27b29 100644 --- a/message_ix_models/util/scenarioinfo.py +++ b/message_ix_models/util/scenarioinfo.py @@ -64,6 +64,9 @@ def __init__(self, scenario=None): except AttributeError: continue # pd.DataFrame for ≥2-D set; don't convert + for name in ("duration_period",): + self.par[name] = scenario.par(name) + self.is_message_macro = "PRICE_COMMODITY" in scenario.par_list() # Computed once