Skip to content

Commit

Permalink
Merge pull request #24 from mrognlie/develop-bence
Browse files Browse the repository at this point in the history
DisContBlock & Remap
  • Loading branch information
bbardoczy authored Jul 26, 2021
2 parents 82e1659 + a1b8851 commit 5ed6eb8
Show file tree
Hide file tree
Showing 23 changed files with 1,392 additions and 170 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
__pycache__/
.ipynb_checkpoints/
.DS_Store
.vscode
.vscode/
.pytest_cache/

*.zip

Expand Down
26 changes: 17 additions & 9 deletions src/sequence_jacobian/blocks/combined_block.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
from ..steady_state.drivers import eval_block_ss
from ..steady_state.support import provide_solver_default
from ..jacobian.classes import JacobianDict
from ..steady_state.classes import SteadyStateDict
from .support.bijection import Bijection


def combine(blocks, name="", model_alias=False):
Expand All @@ -33,6 +35,7 @@ def __init__(self, blocks, name="", model_alias=False):
self._sorted_indices = utils.graph.block_sort(blocks)
self._required = utils.graph.find_outputs_that_are_intermediate_inputs(blocks)
self.blocks = [self._blocks_unsorted[i] for i in self._sorted_indices]
self.M = Bijection({})

if not name:
self.name = f"{self.blocks[0].name}_to_{self.blocks[-1].name}_combined"
Expand All @@ -55,20 +58,25 @@ def __repr__(self):
else:
return f"<CombinedBlock '{self.name}'>"

def steady_state(self, calibration, helper_blocks=None, **kwargs):
def _steady_state(self, calibration, helper_blocks=None, **kwargs):
"""Evaluate a partial equilibrium steady state of the CombinedBlock given a `calibration`"""
if helper_blocks is None:
helper_blocks = []

topsorted = utils.graph.block_sort(self.blocks, calibration=calibration, helper_blocks=helper_blocks)
blocks_all = self.blocks + helper_blocks

ss_partial_eq = deepcopy(calibration)
ss_partial_eq_toplevel = deepcopy(calibration)
ss_partial_eq_internal = {}
for i in topsorted:
ss_partial_eq.update(eval_block_ss(blocks_all[i], ss_partial_eq, **kwargs))
return ss_partial_eq

def impulse_nonlinear(self, ss, exogenous, **kwargs):
outputs = eval_block_ss(blocks_all[i], ss_partial_eq_toplevel, **kwargs)
ss_partial_eq_toplevel.update(outputs.toplevel)
if outputs.internal:
ss_partial_eq_internal.update(outputs.internal)
ss_partial_eq_internal = {self.name: ss_partial_eq_internal} if ss_partial_eq_internal else {}
return SteadyStateDict(ss_partial_eq_toplevel, internal=ss_partial_eq_internal)

def _impulse_nonlinear(self, ss, exogenous, **kwargs):
"""Calculate a partial equilibrium, non-linear impulse response to a set of `exogenous` shocks from
a steady state, `ss`"""
irf_nonlin_partial_eq = deepcopy(exogenous)
Expand All @@ -80,7 +88,7 @@ def impulse_nonlinear(self, ss, exogenous, **kwargs):

return ImpulseDict(irf_nonlin_partial_eq)

def impulse_linear(self, ss, exogenous, T=None, Js=None):
def _impulse_linear(self, ss, exogenous, T=None, Js=None):
"""Calculate a partial equilibrium, linear impulse response to a set of `exogenous` shocks from
a steady_state, `ss`"""
irf_lin_partial_eq = deepcopy(exogenous)
Expand All @@ -92,7 +100,7 @@ def impulse_linear(self, ss, exogenous, T=None, Js=None):

return ImpulseDict(irf_lin_partial_eq)

def jacobian(self, ss, exogenous=None, T=None, outputs=None, Js=None):
def _jacobian(self, ss, exogenous=None, T=None, outputs=None, Js=None):
"""Calculate a partial equilibrium Jacobian with respect to a set of `exogenous` shocks at
a steady state, `ss`"""
if exogenous is None:
Expand All @@ -102,7 +110,7 @@ def jacobian(self, ss, exogenous=None, T=None, outputs=None, Js=None):
kwargs = {"exogenous": exogenous, "T": T, "outputs": outputs, "Js": Js}

for i, block in enumerate(self.blocks):
curlyJ = block.jacobian(ss, **{k: kwargs[k] for k in utils.misc.input_kwarg_list(block.jacobian) if k in kwargs}).complete()
curlyJ = block._jacobian(ss, **{k: kwargs[k] for k in utils.misc.input_kwarg_list(block._jacobian) if k in kwargs}).complete()

# If we want specific list of outputs, restrict curlyJ to that before continuing
curlyJ = curlyJ[[k for k in curlyJ.outputs if k in outputs or k in self._required]]
Expand Down
948 changes: 948 additions & 0 deletions src/sequence_jacobian/blocks/discont_block.py

Large diffs are not rendered by default.

49 changes: 17 additions & 32 deletions src/sequence_jacobian/blocks/het_block.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@
import numpy as np

from .support.impulse import ImpulseDict
from .support.bijection import Bijection
from ..primitives import Block
from .. import utilities as utils
from ..steady_state.classes import SteadyStateDict
from ..jacobian.classes import JacobianDict
from .support.bijection import Bijection
from ..devtools.deprecate import rename_output_list_to_outputs
from ..utilities.misc import verify_saved_jacobian

Expand Down Expand Up @@ -55,6 +57,7 @@ def __init__(self, back_step_fun, exogenous, policy, backward, backward_init=Non
Currently, we only support up to two policy variables.
"""
self.name = back_step_fun.__name__
self.M = Bijection({})

# self.back_step_fun is one iteration of the backward step function pertaining to a given HetBlock.
# i.e. the function pertaining to equation (14) in the paper: v_t = curlyV(v_{t+1}, X_t)
Expand Down Expand Up @@ -145,44 +148,26 @@ def __repr__(self):
"""Nice string representation of HetBlock for printing to console"""
if self.hetinput is not None:
if self.hetoutput is not None:
return f"<HetBlock '{self.back_step_fun.__name__}' with hetinput '{self.hetinput.__name__}'" \
return f"<HetBlock '{self.name}' with hetinput '{self.hetinput.__name__}'" \
f" and with hetoutput `{self.hetoutput.name}'>"
else:
return f"<HetBlock '{self.back_step_fun.__name__}' with hetinput '{self.hetinput.__name__}'>"
return f"<HetBlock '{self.name}' with hetinput '{self.hetinput.__name__}'>"
else:
return f"<HetBlock '{self.back_step_fun.__name__}'>"
return f"<HetBlock '{self.name}'>"

'''Part 2: high-level routines, with first three called analogously to SimpleBlock counterparts
- ss : do backward and forward iteration until convergence to get complete steady state
- td : do backward and forward iteration up to T to compute dynamics given some shocks
- jac : compute jacobians of outputs with respect to shocked inputs, using fake news algorithm
- ajac : compute asymptotic columns of jacobians output by jac, also using fake news algorithm
- steady_state : do backward and forward iteration until convergence to get complete steady state
- impulse_nonlinear : do backward and forward iteration up to T to compute dynamics given some shocks
- impulse_linear : apply jacobians to compute linearized dynamics given some shocks
- jacobian : compute jacobians of outputs with respect to shocked inputs, using fake news algorithm
- add_hetinput : add a hetinput to the HetBlock that first processes inputs through function hetinput
- add_hetoutput: add a hetoutput to the HetBlock that is computed after the entire ss computation, or after
each backward iteration step in td
each backward iteration step in td, jacobian is not computed for these!
'''

# TODO: Deprecated methods, to be removed!
def ss(self, **kwargs):
warnings.warn("This method has been deprecated. Please invoke by calling .steady_state", DeprecationWarning)
return self.steady_state(kwargs)

def td(self, ss, **kwargs):
warnings.warn("This method has been deprecated. Please invoke by calling .impulse_nonlinear",
DeprecationWarning)
return self.impulse_nonlinear(ss, **kwargs)

def jac(self, ss, shock_list=None, T=None, **kwargs):
if shock_list is None:
shock_list = list(self.inputs)
warnings.warn("This method has been deprecated. Please invoke by calling .jacobian.\n"
"Also, note that the kwarg `shock_list` in .jacobian has been renamed to `shocked_vars`",
DeprecationWarning)
return self.jacobian(ss, shock_list, T, **kwargs)

def steady_state(self, calibration, backward_tol=1E-8, backward_maxit=5000,
forward_tol=1E-10, forward_maxit=100_000, hetoutput=False):
def _steady_state(self, calibration, backward_tol=1E-8, backward_maxit=5000,
forward_tol=1E-10, forward_maxit=100_000):
"""Evaluate steady state HetBlock using keyword args for all inputs. Analog to SimpleBlock.ss.
Parameters
Expand Down Expand Up @@ -239,7 +224,7 @@ def steady_state(self, calibration, backward_tol=1E-8, backward_maxit=5000,
aggregates = {o.capitalize(): np.vdot(D, sspol[o]) for o in self.non_back_iter_outputs}
ss.update(aggregates)

if hetoutput and self.hetoutput is not None:
if self.hetoutput is not None:
hetoutputs = self.hetoutput.evaluate(ss)
aggregate_hetoutputs = self.hetoutput.aggregate(hetoutputs, D, ss, mode="ss")
else:
Expand All @@ -249,7 +234,7 @@ def steady_state(self, calibration, backward_tol=1E-8, backward_maxit=5000,

return SteadyStateDict(ss, internal=self)

def impulse_nonlinear(self, ss, exogenous, monotonic=False, returnindividual=False, grid_paths=None):
def _impulse_nonlinear(self, ss, exogenous, monotonic=False, returnindividual=False, grid_paths=None):
"""Evaluate transitional dynamics for HetBlock given dynamic paths for inputs in kwargs,
assuming that we start and end in steady state ss, and that all inputs not specified in
kwargs are constant at their ss values. Analog to SimpleBlock.td.
Expand Down Expand Up @@ -360,7 +345,7 @@ def impulse_nonlinear(self, ss, exogenous, monotonic=False, returnindividual=Fal
else:
return ImpulseDict({**aggregates, **aggregate_hetoutputs}) - ss

def impulse_linear(self, ss, exogenous, T=None, Js=None, **kwargs):
def _impulse_linear(self, ss, exogenous, T=None, Js=None, **kwargs):
# infer T from exogenous, check that all shocks have same length
shock_lengths = [x.shape[0] for x in exogenous.values()]
if shock_lengths[1:] != shock_lengths[:-1]:
Expand All @@ -369,7 +354,7 @@ def impulse_linear(self, ss, exogenous, T=None, Js=None, **kwargs):

return ImpulseDict(self.jacobian(ss, list(exogenous.keys()), T=T, Js=Js, **kwargs).apply(exogenous))

def jacobian(self, ss, exogenous=None, T=300, outputs=None, output_list=None, Js=None, h=1E-4):
def _jacobian(self, ss, exogenous=None, T=300, outputs=None, output_list=None, Js=None, h=1E-4):
"""Assemble nested dict of Jacobians of agg outputs vs. inputs, using fake news algorithm.
Parameters
Expand Down
32 changes: 8 additions & 24 deletions src/sequence_jacobian/blocks/simple_block.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

from .support.simple_displacement import ignore, Displace, AccumulatedDerivative
from .support.impulse import ImpulseDict
from .support.bijection import Bijection
from ..primitives import Block
from ..steady_state.classes import SteadyStateDict
from ..jacobian.classes import JacobianDict, SimpleSparse, ZeroMatrix
Expand Down Expand Up @@ -40,35 +41,18 @@ def __init__(self, f):
self.output_list = misc.output_list(f)
self.inputs = set(self.input_list)
self.outputs = set(self.output_list)
self.M = Bijection({})

def __repr__(self):
return f"<SimpleBlock '{self.f.__name__}'>"

# TODO: Deprecated methods, to be removed!
def ss(self, **kwargs):
warnings.warn("This method has been deprecated. Please invoke by calling .steady_state", DeprecationWarning)
return self.steady_state(kwargs)

def td(self, ss, **kwargs):
warnings.warn("This method has been deprecated. Please invoke by calling .impulse_nonlinear",
DeprecationWarning)
return self.impulse_nonlinear(ss, exogenous=kwargs)

def jac(self, ss, T=None, shock_list=None):
if shock_list is None:
shock_list = []
warnings.warn("This method has been deprecated. Please invoke by calling .jacobian.\n"
"Also, note that the kwarg `shock_list` in .jacobian has been renamed to `shocked_vars`",
DeprecationWarning)
return self.jacobian(ss, exogenous=shock_list, T=T)

def steady_state(self, calibration):
return f"<SimpleBlock '{self.name}'>"

def _steady_state(self, calibration):
input_args = {k: ignore(v) for k, v in calibration.items() if k in misc.input_list(self.f)}
output_vars = [misc.numeric_primitive(o) for o in self.f(**input_args)] if len(self.output_list) > 1 else [
misc.numeric_primitive(self.f(**input_args))]
return SteadyStateDict({**calibration, **dict(zip(self.output_list, output_vars))})

def impulse_nonlinear(self, ss, exogenous):
def _impulse_nonlinear(self, ss, exogenous):
input_args = {}
for k, v in exogenous.items():
if np.isscalar(v):
Expand All @@ -81,10 +65,10 @@ def impulse_nonlinear(self, ss, exogenous):

return ImpulseDict(make_impulse_uniform_length(self.f(**input_args), self.output_list)) - ss

def impulse_linear(self, ss, exogenous, T=None, Js=None):
def _impulse_linear(self, ss, exogenous, T=None, Js=None):
return ImpulseDict(self.jacobian(ss, exogenous=list(exogenous.keys()), T=T, Js=Js).apply(exogenous))

def jacobian(self, ss, exogenous=None, T=None, Js=None):
def _jacobian(self, ss, exogenous=None, T=None, Js=None):
"""Assemble nested dict of Jacobians
Parameters
Expand Down
30 changes: 7 additions & 23 deletions src/sequence_jacobian/blocks/solved_block.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from ..primitives import Block
from ..blocks.simple_block import simple
from ..utilities import graph
from .support.bijection import Bijection

from ..jacobian.classes import JacobianDict

Expand Down Expand Up @@ -41,6 +42,7 @@ class SolvedBlock(Block):
def __init__(self, blocks, name, unknowns, targets, solver=None, solver_kwargs={}):
# Store the actual blocks in ._blocks_unsorted, and use .blocks_w_helpers and .blocks to index from there.
self._blocks_unsorted = blocks
self.M = Bijection({}) # don't inherit membrane from parent blocks (think more about this later)

# Upon instantiation, we only have enough information to conduct a sort ignoring HelperBlocks
# since we need a `calibration` to resolve cyclic dependencies when including HelperBlocks in a topological sort
Expand Down Expand Up @@ -68,26 +70,8 @@ def __init__(self, blocks, name, unknowns, targets, solver=None, solver_kwargs={
def __repr__(self):
return f"<SolvedBlock '{self.name}'>"

# TODO: Deprecated methods, to be removed!
def ss(self, **kwargs):
warnings.warn("This method has been deprecated. Please invoke by calling .steady_state", DeprecationWarning)
return self.steady_state(kwargs)

def td(self, ss, **kwargs):
warnings.warn("This method has been deprecated. Please invoke by calling .impulse_nonlinear",
DeprecationWarning)
return self.impulse_nonlinear(ss, **kwargs)

def jac(self, ss, T=None, shock_list=None, **kwargs):
if shock_list is None:
shock_list = []
warnings.warn("This method has been deprecated. Please invoke by calling .jacobian.\n"
"Also, note that the kwarg `shock_list` in .jacobian has been renamed to `shocked_vars`",
DeprecationWarning)
return self.jacobian(ss, shock_list, T, **kwargs)

def steady_state(self, calibration, unknowns=None, helper_blocks=None, solver=None,
consistency_check=False, ttol=1e-9, ctol=1e-9, verbose=False):
def _steady_state(self, calibration, unknowns=None, helper_blocks=None, solver=None,
consistency_check=False, ttol=1e-9, ctol=1e-9, verbose=False):
# If this is the first time invoking steady_state/solve_steady_state, cache the sorted indices
# accounting for HelperBlocks
if self._sorted_indices_w_helpers is None:
Expand All @@ -105,18 +89,18 @@ def steady_state(self, calibration, unknowns=None, helper_blocks=None, solver=No
return super().solve_steady_state(calibration, unknowns, self.targets, solver=solver,
consistency_check=consistency_check, ttol=ttol, ctol=ctol, verbose=verbose)

def impulse_nonlinear(self, ss, exogenous=None, monotonic=False, Js=None, returnindividual=False, verbose=False):
def _impulse_nonlinear(self, ss, exogenous=None, monotonic=False, Js=None, returnindividual=False, verbose=False):
return super().solve_impulse_nonlinear(ss, exogenous=exogenous,
unknowns=list(self.unknowns.keys()), Js=Js,
targets=self.targets if isinstance(self.targets, list) else list(self.targets.keys()),
monotonic=monotonic, returnindividual=returnindividual, verbose=verbose)

def impulse_linear(self, ss, exogenous, T=None, Js=None):
def _impulse_linear(self, ss, exogenous, T=None, Js=None):
return super().solve_impulse_linear(ss, exogenous=exogenous, unknowns=list(self.unknowns.keys()),
targets=self.targets if isinstance(self.targets, list) else list(self.targets.keys()),
T=T, Js=Js)

def jacobian(self, ss, exogenous=None, T=300, outputs=None, Js=None):
def _jacobian(self, ss, exogenous=None, T=300, outputs=None, Js=None):
if exogenous is None:
exogenous = list(self.inputs)
if outputs is None:
Expand Down
58 changes: 58 additions & 0 deletions src/sequence_jacobian/blocks/support/bijection.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
class Bijection:
def __init__(self, map):
# identity always implicit, remove if there explicitly
self.map = {k: v for k, v in map.items() if k != v}
invmap = {}
for k, v in map.items():
if v in invmap:
raise ValueError(f'Duplicate value {v}, for keys {invmap[v]} and {k}')
invmap[v] = k
self.invmap = invmap

@property
def inv(self):
invmap = Bijection.__new__(Bijection) # better way to do this?
invmap.map = self.invmap
invmap.invmap = self.map
return invmap

def __repr__(self):
return f'Bijection({repr(self.map)})'

def __getitem__(self, k):
return self.map.get(k, k)

def __matmul__(self, x):
if isinstance(x, Bijection):
# compose self: v -> u with x: w -> v
# assume everything missing in either is the identity
M = {}
for v, u in self.map.items():
w = x.invmap.get(v, v)
M[w] = u
for w, v in x.map.items():
if v not in self.map:
M[w] = v
return Bijection(M)
elif isinstance(x, dict):
return {self[k]: v for k, v in x.items()}
elif isinstance(x, list):
return [self[k] for k in x]
elif isinstance(x, set):
return {self[k] for k in x}
elif isinstance(x, tuple):
return tuple(self[k] for k in x)
else:
return NotImplemented

def __rmatmul__(self, x):
if isinstance(x, dict):
return {self[k]: v for k, v in x.items()}
elif isinstance(x, list):
return [self[k] for k in x]
elif isinstance(x, set):
return {self[k] for k in x}
elif isinstance(x, tuple):
return tuple(self[k] for k in x)
else:
return NotImplemented
Loading

0 comments on commit 5ed6eb8

Please sign in to comment.