Skip to content

Commit

Permalink
Merge pull request #29 from mrognlie/refactor
Browse files Browse the repository at this point in the history
Consolidate refactor
  • Loading branch information
bbardoczy authored Oct 11, 2021
2 parents 5ed6eb8 + 63a4d4d commit e6ed38e
Show file tree
Hide file tree
Showing 78 changed files with 4,648 additions and 6,026 deletions.
15 changes: 5 additions & 10 deletions src/sequence_jacobian/__init__.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,15 @@
"""Public-facing objects."""

from . import estimation, jacobian, nonlinear, utilities, devtools

from .models import rbc, krusell_smith, hank, two_asset
from . import estimation, utilities

from .blocks.simple_block import simple
from .blocks.het_block import het, hetoutput
from .blocks.het_block import het
from .blocks.solved_block import solved
from .blocks.combined_block import combine, create_model
from .blocks.support.simple_displacement import apply_function

from .visualization.draw_dag import draw_dag, draw_solved, inspect_solved

from .steady_state.drivers import steady_state
from .jacobian.drivers import get_G, get_H_U, get_impulse
from .nonlinear import td_solve
from .classes.steady_state_dict import SteadyStateDict
from .classes.impulse_dict import ImpulseDict
from .classes.jacobian_dict import JacobianDict

# Useful utilities for setting up HetBlocks
from .utilities.discretize import agrid, markov_rouwenhorst, markov_tauchen
Expand Down
25 changes: 11 additions & 14 deletions src/sequence_jacobian/blocks/auxiliary_blocks/jacobiandict_block.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,23 @@
"""A simple wrapper for JacobianDicts to be embedded in DAGs"""

from numbers import Real
from typing import Dict, Union, List

from ...primitives import Block, Array
from ...jacobian.classes import JacobianDict

from ..block import Block
from ...classes import ImpulseDict, JacobianDict

class JacobianDictBlock(JacobianDict, Block):
"""A wrapper for nested dicts/JacobianDicts passed directly into DAGs to ensure method compatibility"""
def __init__(self, nesteddict, outputs=None, inputs=None, name=None):
super().__init__(nesteddict, outputs=outputs, inputs=inputs, name=name)
Block.__init__(self)

def __repr__(self):
return f"<JacobianDictBlock outputs={self.outputs}, inputs={self.inputs}>"

def impulse_linear(self, ss: Dict[str, Union[Real, Array]],
exogenous: Dict[str, Array], **kwargs) -> Dict[str, Array]:
return self.jacobian(list(exogenous.keys())).apply(exogenous)
def _impulse_linear(self, ss, inputs, outputs, Js):
return ImpulseDict(self.jacobian(ss, list(inputs.keys()), outputs, inputs.T, Js).apply(inputs))

def jacobian(self, exogenous: List[str] = None, **kwargs) -> JacobianDict:
if exogenous is None:
return JacobianDict(self.nesteddict)
else:
return self[:, exogenous]
def _jacobian(self, ss, inputs, outputs, T):
if not inputs <= self.inputs:
raise KeyError(f'Asking JacobianDictBlock for {inputs - self.inputs}, which are among its inputs {self.inputs}')
if not outputs <= self.outputs:
raise KeyError(f'Asking JacobianDictBlock for {outputs - self.outputs}, which are among its outputs {self.outputs}')
return self[outputs, inputs]
344 changes: 344 additions & 0 deletions src/sequence_jacobian/blocks/block.py

Large diffs are not rendered by default.

165 changes: 74 additions & 91 deletions src/sequence_jacobian/blocks/combined_block.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,10 @@
"""CombinedBlock class and the combine function to generate it"""

from copy import deepcopy

from .support.impulse import ImpulseDict
from ..primitives import Block
from .. import utilities as utils
from ..blocks.auxiliary_blocks.jacobiandict_block import JacobianDictBlock
from ..steady_state.drivers import eval_block_ss
from ..steady_state.support import provide_solver_default
from ..jacobian.classes import JacobianDict
from ..steady_state.classes import SteadyStateDict
from .support.bijection import Bijection
from .block import Block
from .auxiliary_blocks.jacobiandict_block import JacobianDictBlock
from .support.parent import Parent
from ..classes import ImpulseDict, JacobianDict
from ..utilities.graph import DAG, find_intermediate_inputs


def combine(blocks, name="", model_alias=False):
Expand All @@ -22,32 +16,29 @@ def create_model(blocks, **kwargs):
return combine(blocks, model_alias=True, **kwargs)


class CombinedBlock(Block):
class CombinedBlock(Block, Parent, DAG):
"""A combined `Block` object comprised of several `Block` objects, which topologically sorts them and provides
a set of partial and general equilibrium methods for evaluating their steady state, computes impulse responses,
and calculates Jacobians along the DAG"""
# To users: Do *not* manually change the attributes via assignment. Instantiating a
# CombinedBlock has some automated features that are inferred from initial instantiation but not from
# re-assignment of attributes post-instantiation.
def __init__(self, blocks, name="", model_alias=False):
def __init__(self, blocks, name="", model_alias=False, sorted_indices=None, intermediate_inputs=None):
super().__init__()

self._blocks_unsorted = [b if isinstance(b, Block) else JacobianDictBlock(b) for b in blocks]
self._sorted_indices = utils.graph.block_sort(blocks)
self._required = utils.graph.find_outputs_that_are_intermediate_inputs(blocks)
self.blocks = [self._blocks_unsorted[i] for i in self._sorted_indices]
self.M = Bijection({})
blocks_unsorted = [b if isinstance(b, Block) else JacobianDictBlock(b) for b in blocks]
DAG.__init__(self, blocks_unsorted)

# TODO: deprecate this, use DAG methods instead
self._required = find_intermediate_inputs(blocks) if intermediate_inputs is None else intermediate_inputs

if not name:
self.name = f"{self.blocks[0].name}_to_{self.blocks[-1].name}_combined"
else:
self.name = name

# Find all outputs (including those used as intermediary inputs)
self.outputs = set().union(*[block.outputs for block in self.blocks])

# Find all inputs that are *not* intermediary outputs
all_inputs = set().union(*[block.inputs for block in self.blocks])
self.inputs = all_inputs.difference(self.outputs)
# now that it has a name, do Parent initialization
Parent.__init__(self, blocks)

# If the create_model() is used instead of combine(), we will have __repr__ show this object as a 'Model'
self._model_alias = model_alias
Expand All @@ -58,81 +49,73 @@ def __repr__(self):
else:
return f"<CombinedBlock '{self.name}'>"

def _steady_state(self, calibration, helper_blocks=None, **kwargs):
def _steady_state(self, calibration, dissolve, **kwargs):
"""Evaluate a partial equilibrium steady state of the CombinedBlock given a `calibration`"""
if helper_blocks is None:
helper_blocks = []

topsorted = utils.graph.block_sort(self.blocks, calibration=calibration, helper_blocks=helper_blocks)
blocks_all = self.blocks + helper_blocks

ss_partial_eq_toplevel = deepcopy(calibration)
ss_partial_eq_internal = {}
for i in topsorted:
outputs = eval_block_ss(blocks_all[i], ss_partial_eq_toplevel, **kwargs)
ss_partial_eq_toplevel.update(outputs.toplevel)
if outputs.internal:
ss_partial_eq_internal.update(outputs.internal)
ss_partial_eq_internal = {self.name: ss_partial_eq_internal} if ss_partial_eq_internal else {}
return SteadyStateDict(ss_partial_eq_toplevel, internal=ss_partial_eq_internal)

def _impulse_nonlinear(self, ss, exogenous, **kwargs):
"""Calculate a partial equilibrium, non-linear impulse response to a set of `exogenous` shocks from
a steady state, `ss`"""
irf_nonlin_partial_eq = deepcopy(exogenous)

ss = calibration.copy()
for block in self.blocks:
input_args = {k: v for k, v in irf_nonlin_partial_eq.items() if k in block.inputs}
# TODO: make this inner_dissolve better, clumsy way to dispatch dissolve only to correct children
inner_dissolve = [k for k in dissolve if self.descendants[k] == block.name]
outputs = block.steady_state(ss, dissolve=inner_dissolve, **kwargs)
ss.update(outputs)

if input_args: # If this block is actually perturbed
irf_nonlin_partial_eq.update({k: v for k, v in block.impulse_nonlinear(ss, input_args, **kwargs)})
return ss

return ImpulseDict(irf_nonlin_partial_eq)
def _impulse_nonlinear(self, ss, inputs, outputs, internals, Js, options, ss_initial):
original_outputs = outputs
outputs = (outputs | self._required) - ss._vector_valued()

def _impulse_linear(self, ss, exogenous, T=None, Js=None):
"""Calculate a partial equilibrium, linear impulse response to a set of `exogenous` shocks from
a steady_state, `ss`"""
irf_lin_partial_eq = deepcopy(exogenous)
impulses = inputs.copy()
for block in self.blocks:
input_args = {k: v for k, v in irf_lin_partial_eq.items() if k in block.inputs}
input_args = {k: v for k, v in impulses.items() if k in block.inputs}

if input_args or ss_initial is not None:
# If this block is actually perturbed, or we start from different initial ss
# TODO: be more selective about ss_initial here - did any inputs change that matter for this one block?
impulses.update(block.impulse_nonlinear(ss, input_args, outputs & block.outputs, internals, Js, options, ss_initial))

return ImpulseDict({k: impulses.toplevel[k] for k in original_outputs if k in impulses.toplevel}, impulses.internals, impulses.T)

def _impulse_linear(self, ss, inputs, outputs, Js, options):
original_outputs = outputs
outputs = (outputs | self._required) - ss._vector_valued()

impulses = inputs.copy()
for block in self.blocks:
input_args = {k: v for k, v in impulses.items() if k in block.inputs}

if input_args: # If this block is actually perturbed
irf_lin_partial_eq.update({k: v for k, v in block.impulse_linear(ss, input_args, T=T, Js=Js)})

return ImpulseDict(irf_lin_partial_eq)

def _jacobian(self, ss, exogenous=None, T=None, outputs=None, Js=None):
"""Calculate a partial equilibrium Jacobian with respect to a set of `exogenous` shocks at
a steady state, `ss`"""
if exogenous is None:
exogenous = list(self.inputs)
if outputs is None:
outputs = self.outputs
kwargs = {"exogenous": exogenous, "T": T, "outputs": outputs, "Js": Js}

for i, block in enumerate(self.blocks):
curlyJ = block._jacobian(ss, **{k: kwargs[k] for k in utils.misc.input_kwarg_list(block._jacobian) if k in kwargs}).complete()

# If we want specific list of outputs, restrict curlyJ to that before continuing
curlyJ = curlyJ[[k for k in curlyJ.outputs if k in outputs or k in self._required]]
if i == 0:
J_partial_eq = curlyJ.compose(JacobianDict.identity(exogenous))
else:
J_partial_eq.update(curlyJ.compose(J_partial_eq))

return J_partial_eq

def solve_steady_state(self, calibration, unknowns, targets, solver=None, helper_blocks=None,
sort_blocks=False, **kwargs):
"""Evaluate a general equilibrium steady state of the CombinedBlock given a `calibration`
and a set of `unknowns` and `targets` corresponding to the endogenous variables to be solved for and
the target conditions that must hold in general equilibrium"""
if solver is None:
solver = provide_solver_default(unknowns)
if helper_blocks and sort_blocks is False:
sort_blocks = True

return super().solve_steady_state(calibration, unknowns, targets, solver=solver,
helper_blocks=helper_blocks, sort_blocks=sort_blocks, **kwargs)
impulses.update(block.impulse_linear(ss, input_args, outputs & block.outputs, Js, options))

return ImpulseDict({k: impulses.toplevel[k] for k in original_outputs if k in impulses.toplevel}, T=impulses.T)

def _partial_jacobians(self, ss, inputs, outputs, T, Js, options):
vector_valued = ss._vector_valued()
inputs = (inputs | self._required) - vector_valued
outputs = (outputs | self._required) - vector_valued

curlyJs = {}
for block in self.blocks:
curlyJ = block.partial_jacobians(ss, inputs & block.inputs, outputs & block.outputs, T, Js, options)
curlyJs.update(curlyJ)

return curlyJs

def _jacobian(self, ss, inputs, outputs, T, Js, options):
Js = self._partial_jacobians(ss, inputs, outputs, T, Js, options)

original_outputs = outputs
total_Js = JacobianDict.identity(inputs)

# TODO: horrible, redoing work from partial_jacobians, also need more efficient sifting of intermediates!
vector_valued = ss._vector_valued()
inputs = (inputs | self._required) - vector_valued
outputs = (outputs | self._required) - vector_valued
for block in self.blocks:
J = block.jacobian(ss, inputs & block.inputs, outputs & block.outputs, T, Js, options)
total_Js.update(J @ total_Js)

return total_Js[original_outputs, :]


# Useful type aliases
Expand Down
Loading

0 comments on commit e6ed38e

Please sign in to comment.