diff --git a/.gitignore b/.gitignore index f6ec28d..4526876 100644 --- a/.gitignore +++ b/.gitignore @@ -2,7 +2,8 @@ __pycache__/ .ipynb_checkpoints/ .DS_Store -.vscode +.vscode/ +.pytest_cache/ *.zip diff --git a/src/sequence_jacobian/blocks/combined_block.py b/src/sequence_jacobian/blocks/combined_block.py index 3835b1f..08af6c2 100644 --- a/src/sequence_jacobian/blocks/combined_block.py +++ b/src/sequence_jacobian/blocks/combined_block.py @@ -9,6 +9,8 @@ from ..steady_state.drivers import eval_block_ss from ..steady_state.support import provide_solver_default from ..jacobian.classes import JacobianDict +from ..steady_state.classes import SteadyStateDict +from .support.bijection import Bijection def combine(blocks, name="", model_alias=False): @@ -33,6 +35,7 @@ def __init__(self, blocks, name="", model_alias=False): self._sorted_indices = utils.graph.block_sort(blocks) self._required = utils.graph.find_outputs_that_are_intermediate_inputs(blocks) self.blocks = [self._blocks_unsorted[i] for i in self._sorted_indices] + self.M = Bijection({}) if not name: self.name = f"{self.blocks[0].name}_to_{self.blocks[-1].name}_combined" @@ -55,7 +58,7 @@ def __repr__(self): else: return f"" - def steady_state(self, calibration, helper_blocks=None, **kwargs): + def _steady_state(self, calibration, helper_blocks=None, **kwargs): """Evaluate a partial equilibrium steady state of the CombinedBlock given a `calibration`""" if helper_blocks is None: helper_blocks = [] @@ -63,12 +66,17 @@ def steady_state(self, calibration, helper_blocks=None, **kwargs): topsorted = utils.graph.block_sort(self.blocks, calibration=calibration, helper_blocks=helper_blocks) blocks_all = self.blocks + helper_blocks - ss_partial_eq = deepcopy(calibration) + ss_partial_eq_toplevel = deepcopy(calibration) + ss_partial_eq_internal = {} for i in topsorted: - ss_partial_eq.update(eval_block_ss(blocks_all[i], ss_partial_eq, **kwargs)) - return ss_partial_eq - - def impulse_nonlinear(self, ss, exogenous, **kwargs): + outputs = eval_block_ss(blocks_all[i], ss_partial_eq_toplevel, **kwargs) + ss_partial_eq_toplevel.update(outputs.toplevel) + if outputs.internal: + ss_partial_eq_internal.update(outputs.internal) + ss_partial_eq_internal = {self.name: ss_partial_eq_internal} if ss_partial_eq_internal else {} + return SteadyStateDict(ss_partial_eq_toplevel, internal=ss_partial_eq_internal) + + def _impulse_nonlinear(self, ss, exogenous, **kwargs): """Calculate a partial equilibrium, non-linear impulse response to a set of `exogenous` shocks from a steady state, `ss`""" irf_nonlin_partial_eq = deepcopy(exogenous) @@ -80,7 +88,7 @@ def impulse_nonlinear(self, ss, exogenous, **kwargs): return ImpulseDict(irf_nonlin_partial_eq) - def impulse_linear(self, ss, exogenous, T=None, Js=None): + def _impulse_linear(self, ss, exogenous, T=None, Js=None): """Calculate a partial equilibrium, linear impulse response to a set of `exogenous` shocks from a steady_state, `ss`""" irf_lin_partial_eq = deepcopy(exogenous) @@ -92,7 +100,7 @@ def impulse_linear(self, ss, exogenous, T=None, Js=None): return ImpulseDict(irf_lin_partial_eq) - def jacobian(self, ss, exogenous=None, T=None, outputs=None, Js=None): + def _jacobian(self, ss, exogenous=None, T=None, outputs=None, Js=None): """Calculate a partial equilibrium Jacobian with respect to a set of `exogenous` shocks at a steady state, `ss`""" if exogenous is None: @@ -102,7 +110,7 @@ def jacobian(self, ss, exogenous=None, T=None, outputs=None, Js=None): kwargs = {"exogenous": exogenous, "T": T, "outputs": 
outputs, "Js": Js} for i, block in enumerate(self.blocks): - curlyJ = block.jacobian(ss, **{k: kwargs[k] for k in utils.misc.input_kwarg_list(block.jacobian) if k in kwargs}).complete() + curlyJ = block._jacobian(ss, **{k: kwargs[k] for k in utils.misc.input_kwarg_list(block._jacobian) if k in kwargs}).complete() # If we want specific list of outputs, restrict curlyJ to that before continuing curlyJ = curlyJ[[k for k in curlyJ.outputs if k in outputs or k in self._required]] diff --git a/src/sequence_jacobian/blocks/discont_block.py b/src/sequence_jacobian/blocks/discont_block.py new file mode 100644 index 0000000..091acda --- /dev/null +++ b/src/sequence_jacobian/blocks/discont_block.py @@ -0,0 +1,948 @@ +import warnings +import copy +import numpy as np + +from .support.impulse import ImpulseDict +from ..primitives import Block +from .. import utilities as utils +from ..steady_state.classes import SteadyStateDict +from ..jacobian.classes import JacobianDict +from ..utilities.misc import verify_saved_jacobian +from .support.bijection import Bijection + + +def discont(exogenous, policy, disc_policy, backward, backward_init=None): + def decorator(back_step_fun): + return DiscontBlock(back_step_fun, exogenous, policy, disc_policy, backward, backward_init=backward_init) + return decorator + + +class DiscontBlock(Block): + """Part 1: Initializer for DiscontBlock, intended to be called via @hetdc() decorator on backward step function. + + IMPORTANT: All `policy` and non-aggregate output variables of this HetBlock need to be *lower-case*, since + the methods that compute steady state, transitional dynamics, and Jacobians for HetBlocks automatically handle + aggregation of non-aggregate outputs across the distribution and return aggregates as upper-case equivalents + of the `policy` and non-aggregate output variables specified in the backward step function. + """ + + def __init__(self, back_step_fun, exogenous, policy, disc_policy, backward, backward_init=None): + """Construct HetBlock from backward iteration function. + + Parameters + ---------- + back_step_fun : function + backward iteration function + exogenous : str + name of Markov transition matrix for exogenous variable + (now only single allowed for simplicity; use Kronecker product for more) + policy : str or sequence of str + names of policy variables of endogenous, continuous state variables + e.g. assets 'a', must be returned by function + disc_policy: str + name of policy function for discrete choices (probabilities) + backward : str or sequence of str + variables that together comprise the 'v' that we use for iterating backward + must appear both as outputs and as arguments + + It is assumed that every output of the function (except possibly backward), including policy, + will be on a grid of dimension 1 + len(policy), where the first dimension is the exogenous + variable and then the remaining dimensions are each of the continuous policy variables, in the + same order they are listed in 'policy'. + + The Markov transition matrix between the current and future period and backward iteration + variables should appear in the backward iteration function with '_p' subscripts ("prime") to + indicate that they come from the next period. + + Currently, we only support up to two policy variables. + """ + self.name = back_step_fun.__name__ + self.M = Bijection({}) + + # self.back_step_fun is one iteration of the backward step function pertaining to a given HetBlock. + # i.e. 
the function pertaining to equation (14) in the paper: v_t = curlyV(v_{t+1}, X_t) + self.back_step_fun = back_step_fun + + # self.back_step_outputs and self.back_step_inputs are all of the output and input arguments of + # self.back_step_fun, the variables used in the backward iteration, + # which generally include value and/or policy functions. + self.back_step_output_list = utils.misc.output_list(back_step_fun) + self.back_step_outputs = set(self.back_step_output_list) + self.back_step_inputs = set(utils.misc.input_list(back_step_fun)) + + # See the docstring of HetBlock for details on the attributes directly below + self.disc_policy = disc_policy + self.policy = policy + self.exogenous, self.back_iter_vars = (utils.misc.make_tuple(x) for x in (exogenous, backward)) + + # self.inputs_to_be_primed indicates all variables that enter into self.back_step_fun whose name has "_p" + # (read as prime). Because it's the case that the initial dict of input arguments for self.back_step_fun + # contains the names of these variables that omit the "_p", we need to swap the key from the unprimed to + # the primed key name, such that self.back_step_fun will properly call those variables. + # e.g. the key "Va" will become "Va_p", associated to the same value. + self.inputs_to_be_primed = set(self.exogenous) | set(self.back_iter_vars) + + # self.non_back_iter_outputs are all of the outputs from self.back_step_fun excluding the backward + # iteration variables themselves. + self.non_back_iter_outputs = self.back_step_outputs - set(self.back_iter_vars) - set(self.disc_policy) + + # self.outputs and self.inputs are the *aggregate* outputs and inputs of this HetBlock, which are used + # in utils.graph.block_sort to topologically sort blocks along the DAG + # according to their aggregate outputs and inputs. + self.outputs = {o.capitalize() for o in self.non_back_iter_outputs} + self.inputs = self.back_step_inputs - {k + '_p' for k in self.back_iter_vars} + for ex in self.exogenous: + self.inputs.remove(ex + '_p') + self.inputs.add(ex) + + # A HetBlock can have heterogeneous inputs and heterogeneous outputs, henceforth `hetinput` and `hetoutput`. + # See docstring for methods `add_hetinput` and `add_hetoutput` for more details. 
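+ # start without a hetinput; one can be attached later via add_hetinput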
+ self.hetinput = None + self.hetinput_inputs = set() + self.hetinput_outputs = set() + self.hetinput_outputs_order = tuple() + + # start without a hetoutput + self.hetoutput = None + self.hetoutput_inputs = set() + self.hetoutput_outputs = set() + self.hetoutput_outputs_order = tuple() + + # The set of variables that will be wrapped in a separate namespace for this HetBlock + # as opposed to being available at the top level + self.internal = utils.misc.smart_set(self.back_step_outputs) | utils.misc.smart_set(self.exogenous) | {"D"} + + if len(self.policy) > 1: + raise ValueError(f"More than one continuous states in {back_step_fun.__name__}, not yet supported") + + # Checking that the various inputs/outputs attributes are correctly set + for ex in self.exogenous: + if ex + '_p' not in self.back_step_inputs: + raise ValueError(f"Markov matrix '{ex}_p' not included as argument in {back_step_fun.__name__}") + + if self.policy not in self.back_step_outputs: + raise ValueError(f"Policy '{self.policy}' not included as output in {back_step_fun.__name__}") + if self.policy[0].isupper(): + raise ValueError(f"Policy '{self.policy}' is uppercase in {back_step_fun.__name__}, which is not allowed") + + for back in self.back_iter_vars: + if back + '_p' not in self.back_step_inputs: + raise ValueError(f"Backward variable '{back}_p' not included as argument in {back_step_fun.__name__}") + + if back not in self.back_step_outputs: + raise ValueError(f"Backward variable '{back}' not included as output in {back_step_fun.__name__}") + + for out in self.non_back_iter_outputs: + if out[0].isupper(): + raise ValueError("Output '{out}' is uppercase in {back_step_fun.__name__}, which is not allowed") + + # Add the backward iteration initializer function (the initial guesses for self.back_iter_vars) + if backward_init is None: + # TODO: Think about implementing some "automated way" of providing + # an initial guess for the backward iteration. + self.backward_init = backward_init + else: + self.backward_init = backward_init + + # note: should do more input checking to ensure certain choices not made: 'D' not input, etc. + + def __repr__(self): + """Nice string representation of HetBlock for printing to console""" + if self.hetinput is not None: + if self.hetoutput is not None: + return f"" + else: + return f"" + else: + return f"" + + '''Part 2: high-level routines, with first three called analogously to SimpleBlock counterparts + - steady_state : do backward and forward iteration until convergence to get complete steady state + - impulse_nonlinear : do backward and forward iteration up to T to compute dynamics given some shocks + - impulse_linear : apply jacobians to compute linearized dynamics given some shocks + - jacobian : compute jacobians of outputs with respect to shocked inputs, using fake news algorithm + + - add_hetinput : add a hetinput to the HetBlock that first processes inputs through function hetinput + - add_hetoutput: add a hetoutput to the HetBlock that is computed after the entire ss computation, or after + each backward iteration step in td, jacobian is not computed for these! + ''' + + + def _steady_state(self, calibration, backward_tol=1E-8, backward_maxit=5000, + forward_tol=1E-10, forward_maxit=100_000, hetoutput=False): + """Evaluate steady state HetBlock using keyword args for all inputs. Analog to SimpleBlock.ss. 
+ + Parameters + ---------- + backward_tol : [optional] float + in backward iteration, max abs diff between policy in consecutive steps needed for convergence + backward_maxit : [optional] int + maximum number of backward iterations, if 'backward_tol' not reached by then, raise error + forward_tol : [optional] float + in forward iteration, max abs diff between dist in consecutive steps needed for convergence + forward_maxit : [optional] int + maximum number of forward iterations, if 'forward_tol' not reached by then, raise error + + kwargs : dict + The following inputs are required as keyword arguments, which show up in 'kwargs': + - The exogenous Markov matrix, e.g. Pi=... if self.exogenous=='Pi' + - A seed for each backward variable, e.g. Va=... and Vb=... if self.back_iter_vars==('Va','Vb') + - A grid for each policy variable, e.g. a_grid=... and b_grid=... if self.policy==('a','b') + - All other inputs to the backward iteration function self.back_step_fun, except _p added to + for self.exogenous and self.back_iter_vars, for which the method uses steady-state values. + If there is a self.hetinput, then we need the inputs to that, not to self.back_step_fun. + + Other inputs in 'kwargs' are optional: + - A seed for the distribution: D=... + - If no seed for the distribution is provided, a seed for the invariant distribution + of the Markov process, e.g. Pi_seed=... if self.exogenous=='Pi' + + Returns + ---------- + ss : dict, contains + - ss inputs of self.back_step_fun and (if present) self.hetinput + - ss outputs of self.back_step_fun + - ss distribution 'D' + - ss aggregates (in uppercase) for all outputs of self.back_step_fun except self.back_iter_vars + """ + + ss = copy.deepcopy(calibration) + + # extract information from calibration + grid = calibration[self.policy + '_grid'] + D_seed = calibration.get('D', None) + + # run backward iteration + sspol = self.policy_ss(calibration, tol=backward_tol, maxit=backward_maxit) + ss.update(sspol) + + # run forward iteration + D = self.dist_ss(sspol, grid, forward_tol, forward_maxit, D_seed) + ss.update({"D": D}) + + # aggregate all outputs other than backward variables on grid, capitalize + aggregates = {o.capitalize(): np.vdot(D, sspol[o]) for o in self.non_back_iter_outputs} + ss.update(aggregates) + + if hetoutput and self.hetoutput is not None: + hetoutputs = self.hetoutput.evaluate(ss) + aggregate_hetoutputs = self.hetoutput.aggregate(hetoutputs, D, ss, mode="ss") + else: + hetoutputs = {} + aggregate_hetoutputs = {} + ss.update({**hetoutputs, **aggregate_hetoutputs}) + + return SteadyStateDict(ss, internal=self) + + def _impulse_nonlinear(self, ss, exogenous, returnindividual=False, grid_paths=None): + """Evaluate transitional dynamics for DiscontBlock given dynamic paths for inputs in exogenous, + assuming that we start and end in steady state ss, and that all inputs not specified in + exogenous are constant at their ss values. Analog to SimpleBlock.td. 
+ + Parameters + ---------- + ss : SteadyStateDict + all steady-state info, intended to be from .ss() + exogenous : dict of {str : array(T, ...)} + all time-varying inputs here (in deviations), with first dimension being time + this must have same length T for all entries (all outputs will be calculated up to T) + returnindividual : [optional] bool + return distribution and full outputs on grid + grid_paths: [optional] dict of {str: array(T, Number of grid points)} + time-varying grids for policies + + Returns + ---------- + td : dict + if returnindividual = False, time paths for aggregates (uppercase) for all outputs + of self.back_step_fun except self.back_iter_vars + if returnindividual = True, additionally time paths for distribution and for all outputs + of self.back_Step_fun on the full grid + """ + # infer T from exogenous, check that all shocks have same length + shock_lengths = [x.shape[0] for x in exogenous.values()] + if shock_lengths[1:] != shock_lengths[:-1]: + raise ValueError('Not all shocks in kwargs (exogenous) are same length!') + T = shock_lengths[0] + + # copy from ss info + D, P = ss.internal[self.name]['D'], ss.internal[self.name][self.disc_policy] + + # construct grids for policy variables either from the steady state grid if the grid is meant to be + # non-time-varying or from the provided `grid_path` if the grid is meant to be time-varying. + if grid_paths is not None and self.policy in grid_paths: + grid = grid_paths[self.policy] + use_ss_grid = False + else: + grid = ss[self.policy + "_grid"] + use_ss_grid = True + # sspol_i, sspol_pi = utils.interpolate_coord_robust(grid, ss[self.policy]) + + # allocate empty arrays to store result, assume all like D + individual_paths = {k: np.empty((T,) + D.shape) for k in self.non_back_iter_outputs} + hetoutput_paths = {k: np.empty((T,) + D.shape) for k in self.hetoutput_outputs} + P_path = np.empty((T,) + P.shape) + + # obtain full path of multidimensional inputs + multidim_inputs = {k: np.empty((T,) + ss.internal[self.name][k].shape) for k in self.hetinput_outputs_order} + if self.hetinput is not None: + indict = dict(ss.items()) + for t in range(T): + indict.update({k: ss[k] + v[t, ...] for k, v in exogenous.items()}) + hetout = dict(zip(self.hetinput_outputs_order, + self.hetinput(**{k: indict[k] for k in self.hetinput_inputs}))) + for k in self.hetinput_outputs_order: + multidim_inputs[k][t, ...] = hetout[k] + + # backward iteration + backdict = dict(ss.items()) + backdict.update(copy.deepcopy(ss.internal[self.name])) + for t in reversed(range(T)): + # be careful: if you include vars from self.back_iter_vars in exogenous, agents will use them! + backdict.update({k: ss[k] + v[t, ...] for k, v in exogenous.items()}) + + # add in multidimensional inputs EXCEPT exogenous state transitions (at lead 0) + backdict.update({k: ss.internal[self.name][k] + v[t, ...] for k, v in multidim_inputs.items() if k not in self.exogenous}) + + # add in multidimensional inputs FOR exogenous state transitions (at lead 1) + if t < T - 1: + backdict.update({k: ss.internal[self.name][k] + v[t+1, ...] 
for k, v in multidim_inputs.items() if k in self.exogenous}) + + # step back + individual = {k: v for k, v in zip(self.back_step_output_list, + self.back_step_fun(**self.make_inputs(backdict)))} + + # update backward variables + backdict.update({k: individual[k] for k in self.back_iter_vars}) + + # compute hetoutputs + if self.hetoutput is not None: + hetoutput = self.hetoutput.evaluate(backdict) + for k in self.hetoutput_outputs: + hetoutput_paths[k][t, ...] = hetoutput[k] + + # save individual outputs of interest + P_path[t, ...] = individual[self.disc_policy] + for k in self.non_back_iter_outputs: + individual_paths[k][t, ...] = individual[k] + + # forward iteration + # initial markov matrix (may have been shocked) + Pi_path = [[multidim_inputs[k][0, ...] if k in self.hetinput_outputs_order else ss[k] for k in self.exogenous]] + + # on impact: assets are predetermined, but Pi could be shocked, and P can change + D_path = np.empty((T,) + D.shape) + if use_ss_grid: + grid_var = grid + else: + grid_var = grid[0, ...] + sspol_i, sspol_pi = utils.interpolate.interpolate_coord_robust(grid_var, ss.internal[self.name][self.policy]) + D_path[0, ...] = self.forward_step(D, P_path[0, ...], Pi_path[0], sspol_i, sspol_pi) + for t in range(T-1): + # have to interpolate policy separately for each t to get sparse transition matrices + if not use_ss_grid: + grid_var = grid[t, ...] + pol_i, pol_pi = utils.interpolate.interpolate_coord_robust(grid_var, individual_paths[self.policy][t, ...]) + + # update exogenous Markov matrices + Pi = [multidim_inputs[k][t+1, ...] if k in self.hetinput_outputs_order else ss[k] for k in self.exogenous] + Pi_path.append(Pi) + + # step forward + D_path[t+1, ...] = self.forward_step(D_path[t, ...], P_path[t+1, ...], Pi, pol_i, pol_pi) + + # obtain aggregates of all outputs, made uppercase + aggregates = {o.capitalize(): utils.optimized_routines.fast_aggregate(D_path, individual_paths[o]) + for o in self.non_back_iter_outputs} + if self.hetoutput: + aggregate_hetoutputs = self.hetoutput.aggregate(hetoutput_paths, D_path, backdict, mode="td") + else: + aggregate_hetoutputs = {} + + # return either this, or also include distributional information + if returnindividual: + return ImpulseDict({**aggregates, **aggregate_hetoutputs, **individual_paths, **multidim_inputs, + **hetoutput_paths, 'D': D_path, 'P_path': P_path, 'Pi_path': Pi_path}) - ss + else: + return ImpulseDict({**aggregates, **aggregate_hetoutputs}) - ss + + def _impulse_linear(self, ss, exogenous, T=None, Js=None, **kwargs): + # infer T from exogenous, check that all shocks have same length + shock_lengths = [x.shape[0] for x in exogenous.values()] + if shock_lengths[1:] != shock_lengths[:-1]: + raise ValueError('Not all shocks in kwargs (exogenous) are same length!') + T = shock_lengths[0] + + return ImpulseDict(self.jacobian(ss, list(exogenous.keys()), T=T, Js=Js, **kwargs).apply(exogenous)) + + def _jacobian(self, ss, exogenous=None, T=300, outputs=None, Js=None, h=1E-4): + """Assemble nested dict of Jacobians of agg outputs vs. inputs, using fake news algorithm. 
+ + Parameters + ---------- + ss : dict, + all steady-state info, intended to be from .ss() + T : [optional] int + number of time periods for T*T Jacobian + exogenous : list of str + names of input variables to differentiate wrt (main cost scales with # of inputs) + outputs : list of str + names of output variables to get derivatives of, if not provided assume all outputs of + self.back_step_fun except self.back_iter_vars + h : [optional] float + h for numerical differentiation of backward iteration + Js : [optional] dict of {str: JacobianDict}} + supply saved Jacobians + + Returns + ------- + J : dict of {str: dict of {str: array(T,T)}} + J[o][i] for output o and input i gives T*T Jacobian of o with respect to i + """ + # The default set of outputs are all outputs of the backward iteration function + # except for the backward iteration variables themselves + if exogenous is None: + exogenous = list(self.inputs) + if outputs is None: + outputs = self.non_back_iter_outputs + + relevant_shocks = [i for i in self.back_step_inputs | self.hetinput_inputs if i in exogenous] + + # if we supply Jacobians, use them if possible, warn if they cannot be used + if Js is not None: + outputs_cap = [o.capitalize() for o in outputs] + if verify_saved_jacobian(self.name, Js, outputs_cap, relevant_shocks, T): + return Js[self.name] + + # step 0: preliminary processing of steady state + (ssin_dict, ssout_list, ss_for_hetinput, sspol_i, sspol_pi, sspol_space, D0, D2, Pi, P) = self.jac_prelim(ss) + + # step 1 of fake news algorithm + # compute curlyY and curlyD (backward iteration) for each input i + dYs, dDs, dD_ps, dD_direct = {}, {}, {}, {} + for i in relevant_shocks: + dYs[i], dDs[i], dD_ps[i], dD_direct[i] = self.backward_iteration_fakenews(i, outputs, ssin_dict, ssout_list, + ss.internal[self.name]['D'], + D0, D2, P, Pi, sspol_i, sspol_pi, + sspol_space, T, h, + ss_for_hetinput) + + # step 2 of fake news algorithm + # compute prediction vectors curlyP (forward iteration) for each outcome o + curlyPs = {} + for o in outputs: + curlyPs[o] = self.forward_iteration_fakenews(ss.internal[self.name][o], Pi, P, sspol_i, sspol_pi, T) + + # steps 3-4 of fake news algorithm + # make fake news matrix and Jacobian for each outcome-input pair + F, J = {}, {} + for o in outputs: + for i in relevant_shocks: + if o.capitalize() not in F: + F[o.capitalize()] = {} + if o.capitalize() not in J: + J[o.capitalize()] = {} + F[o.capitalize()][i] = DiscontBlock.build_F(dYs[i][o], dD_ps[i], curlyPs[o], dD_direct[i], dDs[i]) + J[o.capitalize()][i] = DiscontBlock.J_from_F(F[o.capitalize()][i]) + + return JacobianDict(J, name=self.name) + + def add_hetinput(self, hetinput, overwrite=False, verbose=True): + """Add a hetinput to this HetBlock. Any call to self.back_step_fun will first process + inputs through the hetinput function. + + A `hetinput` is any non-scalar-valued input argument provided to the HetBlock's backward iteration function, + self.back_step_fun, which is of the same dimensions as the distribution of agents in the HetBlock over + the relevant idiosyncratic state variables, generally referred to as `D`. e.g. The one asset HANK model + example provided in the models directory of sequence_jacobian has a hetinput `T`, which is skill-specific + transfers. 
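+
+ For illustration only (the names below are hypothetical, not part of this block), a hetinput for
+ skill-specific transfers might map aggregates and the skill grid into a grid-shaped array:
+
+ def transfers(pi_e, Div, Tax, e_grid):
+ # allocate dividends and taxes across skill types in proportion to e_grid
+ T = (Div - Tax) / np.sum(pi_e * e_grid) * e_grid
+ return T
+
+ Attaching it via add_hetinput makes pi_e, Div, Tax, and e_grid inputs of the block and makes the
+ resulting array T available to the backward step function.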
+ """ + if self.hetinput is not None and overwrite is False: + raise ValueError('Trying to attach hetinput when one already exists!') + else: + if verbose: + if self.hetinput is not None and overwrite is True: + print(f"Overwriting current hetinput, {self.hetinput.__name__} with new hetinput," + f" {hetinput.__name__}!") + else: + print(f"Added hetinput {hetinput.__name__} to the {self.back_step_fun.__name__} HetBlock") + + self.hetinput = hetinput + self.hetinput_inputs = set(utils.misc.input_list(hetinput)) + self.hetinput_outputs = set(utils.misc.output_list(hetinput)) + self.hetinput_outputs_order = utils.misc.output_list(hetinput) + + # modify inputs to include hetinput's additional inputs, remove outputs + self.inputs |= self.hetinput_inputs + self.inputs -= self.hetinput_outputs + + self.internal |= self.hetinput_outputs + + def add_hetoutput(self, hetoutput, overwrite=False, verbose=True): + """Add a hetoutput to this HetBlock. Any call to self.back_step_fun will first process + inputs through the hetoutput function. + + A `hetoutput` is any *non-scalar-value* output that the user might desire to be calculated from + the output arguments of the HetBlock's backward iteration function. Importantly, as of now the `hetoutput` + cannot be a function of time displaced values of the HetBlock's outputs but rather must be able to + be calculated from the outputs statically. e.g. The two asset HANK model example provided in the models + directory of sequence_jacobian has a hetoutput, `chi`, the adjustment costs for any initial level of assets + `a`, to any new level of assets `a'`. + """ + if self.hetoutput is not None and overwrite is False: + raise ValueError('Trying to attach hetoutput when one already exists!') + else: + if verbose: + if self.hetoutput is not None and overwrite is True: + print(f"Overwriting current hetoutput, {self.hetoutput.name} with new hetoutput," + f" {hetoutput.name}!") + else: + print(f"Added hetoutput {hetoutput.name} to the {self.back_step_fun.__name__} HetBlock") + + self.hetoutput = hetoutput + self.hetoutput_inputs = set(hetoutput.input_list) + self.hetoutput_outputs = set(hetoutput.output_list) + self.hetoutput_outputs_order = hetoutput.output_list + + # Modify the HetBlock's inputs to include additional inputs required for computing both the hetoutput + # and aggregating the hetoutput, but do not include: + # 1) objects computed within the HetBlock's backward iteration that enter into the hetoutput computation + # 2) objects computed within hetoutput that enter into hetoutput's aggregation (self.hetoutput.outputs) + # 3) D, the cross-sectional distribution of agents, which is used in the hetoutput aggregation + # but is computed after the backward iteration + self.inputs |= (self.hetoutput_inputs - self.hetinput_outputs - self.back_step_outputs - self.hetoutput_outputs - set("D")) + # Modify the HetBlock's outputs to include the aggregated hetoutputs + self.outputs |= set([o.capitalize() for o in self.hetoutput_outputs]) + + self.internal |= self.hetoutput_outputs + + '''Part 3: components of ss(): + - policy_ss : backward iteration to get steady-state policies and other outcomes + - dist_ss : forward iteration to get steady-state distribution and compute aggregates + ''' + + def policy_ss(self, ssin, tol=1E-8, maxit=5000): + """Find steady-state policies and backward variables through backward iteration until convergence. 
+ + Parameters + ---------- + ssin : dict + all steady-state inputs to back_step_fun, including seed values for backward variables + tol : [optional] float + max diff between consecutive iterations of policy variables needed for convergence + maxit : [optional] int + maximum number of iterations, if 'tol' not reached by then, raise error + + Returns + ---------- + sspol : dict + all steady-state outputs of backward iteration, combined with inputs to backward iteration + """ + + # find initial values for backward iteration and account for hetinputs + original_ssin = ssin + ssin = self.make_inputs(ssin) + + old = {} + for it in range(maxit): + try: + # run and store results of backward iteration, which come as tuple, in dict + sspol = {k: v for k, v in zip(self.back_step_output_list, self.back_step_fun(**ssin))} + except KeyError as e: + print(f'Missing input {e} to {self.back_step_fun.__name__}!') + raise + + # only check convergence every 10 iterations for efficiency + if it % 10 == 1 and all(utils.optimized_routines.within_tolerance(sspol[k], old[k], tol) + for k in self.back_iter_vars): + break + + # update 'old' for comparison during next iteration, prepare 'ssin' as input for next iteration + old.update({k: sspol[k] for k in self.back_iter_vars}) + ssin.update({k + '_p': sspol[k] for k in self.back_iter_vars}) + else: + raise ValueError(f'No convergence of policy functions after {maxit} backward iterations!') + + # want to record inputs in ssin, but remove _p, add in hetinput inputs if there + for k in self.inputs_to_be_primed: + ssin[k] = ssin[k + '_p'] + del ssin[k + '_p'] + if self.hetinput is not None: + for k in self.hetinput_inputs: + if k in original_ssin: + ssin[k] = original_ssin[k] + return {**ssin, **sspol} + + def dist_ss(self, sspol, grid, tol=1E-10, maxit=100_000, D_seed=None): + """Find steady-state distribution through forward iteration until convergence. 
+ + Parameters + ---------- + sspol : dict + steady-state policies on grid for all policy variables in self.policy + grid : dict + grids for all policy variables in self.policy + tol : [optional] float + absolute tolerance for max diff between consecutive iterations for distribution + maxit : [optional] int + maximum number of iterations, if 'tol' not reached by then, raise error + D_seed : [optional] array + initial seed for overall distribution + + Returns + ---------- + D : array + steady-state distribution + """ + # extract transition matrix for exogenous states + Pi = [sspol[k] for k in self.exogenous] + P = sspol[self.disc_policy] + + # first obtain initial distribution D + if D_seed is None: + # initialize at uniform distribution + D = np.ones_like(sspol[self.policy]) / sspol[self.policy].size + else: + D = D_seed + + # obtain interpolated policy rule for each dimension of endogenous policy + sspol_i, sspol_pi = utils.interpolate.interpolate_coord_robust(grid, sspol[self.policy]) + + # iterate until convergence by tol, or maxit + for it in range(maxit): + Dnew = self.forward_step(D, P, Pi, sspol_i, sspol_pi) + + # only check convergence every 10 iterations for efficiency + if it % 10 == 0 and utils.optimized_routines.within_tolerance(D, Dnew, tol): + break + D = Dnew + else: + raise ValueError(f'No convergence after {maxit} forward iterations!') + + return D + + '''Part 4: components of jac(), corresponding to *4 steps of fake news algorithm* in paper + - Step 1: backward_step_fakenews and backward_iteration_fakenews to get curlyYs and curlyDs + - Step 2: forward_iteration_fakenews to get curlyPs + - Step 3: build_F to get fake news matrix from curlyYs, curlyDs, curlyPs + - Step 4: J_from_F to get Jacobian from fake news matrix + ''' + + def shock_timing(self, input_shocked, D0, Pi_ss, P, ss_for_hetinput, h): + """Figure out the details of how the scalar shock feeds into back_step_fun. + + Main complication: shocks to Pi transmit via hetinput with a lead of 1. + """ + if self.hetinput is not None and input_shocked in self.hetinput_inputs: + # if input_shocked is an input to hetinput, take numerical diff to get response + din_dict = dict(zip(self.hetinput_outputs_order, + utils.differentiate.numerical_diff_symmetric(self.hetinput, ss_for_hetinput, + {input_shocked: 1}, h))) + + if all(k not in din_dict.keys() for k in self.exogenous): + # if Pi is not generated by hetinput, no work to be done + lead = 0 + dD3_direct = None + elif all(np.count_nonzero(din_dict[k]) == 0 for k in self.exogenous if k in din_dict): + # if Pi is generated by hetinput but input_shocked does not affect it, replace Pi with Pi_p + lead = 0 + dD3_direct = None + for k in self.exogenous: + if k in din_dict.keys(): + din_dict[k + '_p'] = din_dict.pop(k) + else: + # if Pi is generated by hetinput and input_shocked affects it, replace that with Pi_p at lead 1 + lead = 1 + Pi = [din_dict[k] if k in din_dict else Pi_ss[num] for num, k in enumerate(self.exogenous)] + dD2_direct = utils.forward_step.forward_step_exo(D0, Pi) + dD3_direct = utils.forward_step.forward_step_dpol(dD2_direct, P) + for k in self.exogenous: + if k in din_dict.keys(): + din_dict[k + '_p'] = din_dict.pop(k) + else: + # if input_shocked feeds directly into back_step_fun with lead 0, no work to be done + lead = 0 + din_dict = {input_shocked: 1} + dD3_direct = None + + return din_dict, lead, dD3_direct + + def backward_step_fakenews(self, din_dict, output_list, ssin_dict, ssout_list, + Dss, D2, P, Pi, sspol_i, sspol_pi, sspol_space, h=1E-4): + # 1. 
shock perturbs outputs + shocked_outputs = {k: v for k, v in zip(self.back_step_output_list, + utils.differentiate.numerical_diff(self.back_step_fun, + ssin_dict, din_dict, h, + ssout_list))} + dV = {k: shocked_outputs[k] for k in self.back_iter_vars} + + # 2. which affects the distribution tomorrow via the savings policy + pol_pi_shock = -shocked_outputs[self.policy] / sspol_space + if "delev_exante" in din_dict: + # include an additional term to account for the effect of a deleveraging shock affecting the grid + dx = np.zeros_like(sspol_pi) + dx[sspol_i == 0] = 1. + add_term = sspol_pi * dx / sspol_space + pol_pi_shock += add_term + dD3_p = self.forward_step_shock(Dss, sspol_i, pol_pi_shock, Pi, P) + + # 3. and the distribution today (and Dmid tomorrow) via the discrete choice + P_shock = shocked_outputs[self.disc_policy] + dD3 = utils.forward_step.forward_step_dpol(D2, P_shock) # s[0], z[0], a[-1] + + # 4. and the aggregate outcomes today (ignoring dD and dD_direct) + dY = {k: np.vdot(Dss, shocked_outputs[k]) for k in output_list} + + return dV, dD3, dD3_p, dY + + def backward_iteration_fakenews(self, input_shocked, output_list, ssin_dict, ssout_list, Dss, D0, D2, P, Pi, + sspol_i, sspol_pi, sspol_space, T, h=1E-4, ss_for_hetinput=None): + """Iterate policy steps backward T times for a single shock.""" + # map name of shocked input into a perturbation of the inputs of back_step_fun + din_dict, lead, dD_direct = self.shock_timing(input_shocked, D0, Pi.copy(), P, ss_for_hetinput, h) + + # contemporaneous response to unit scalar shock + dV, dD, dD_p, dY = self.backward_step_fakenews(din_dict, output_list, ssin_dict, ssout_list, + Dss, D2, P, Pi, sspol_i, sspol_pi, sspol_space, h=h) + + # infer dimensions from this and initialize empty arrays + dDs = np.empty((T,) + dD.shape) + dD_ps = np.empty((T,) + dD_p.shape) + dYs = {k: np.empty(T) for k in dY.keys()} + + # fill in current effect of shock (be careful to handle lead = 1) + dDs[:lead, ...], dD_ps[:lead, ...] = 0, 0 + dDs[lead, ...], dD_ps[lead, ...] = dD, dD_p + for k in dY.keys(): + dYs[k][:lead] = 0 + dYs[k][lead] = dY[k] + + # fill in anticipation effects + for t in range(lead + 1, T): + dV, dDs[t, ...], dD_ps[t, ...], dY = self.backward_step_fakenews({k + '_p': + v for k, v in dV.items()}, + output_list, ssin_dict, ssout_list, + Dss, D2, P, Pi, sspol_i, sspol_pi, + sspol_space, h) + + for k in dY.keys(): + dYs[k][t] = dY[k] + + return dYs, dDs, dD_ps, dD_direct + + def forward_iteration_fakenews(self, o_ss, Pi, P, pol_i_ss, pol_pi_ss, T): + """Iterate transpose forward T steps to get full set of curlyPs for a given outcome. + + Note we depart from definition in paper by applying the demeaning operator in addition to Lambda + at each step. This does not affect products with curlyD (which are the only way curlyPs enter + Jacobian) since perturbations to distribution always have mean zero. It has numerical benefits + since curlyPs now go to zero for high t (used in paper in proof of Proposition 1). + """ + curlyPs = np.empty((T,) + o_ss.shape) + curlyPs[0, ...] = utils.misc.demean(o_ss) + for t in range(1, T): + curlyPs[t, ...] 
= utils.misc.demean(self.forward_step_transpose(curlyPs[t - 1, ...], + P, Pi, pol_i_ss, pol_pi_ss)) + return curlyPs + + @staticmethod + def build_F(dYs, dD_ps, curlyPs, dD_direct, dDs): + T = dYs.shape[0] + F = np.empty((T, T)) + + # standard effect + F[0, :] = dYs + F[1:, :] = curlyPs[:T-1, ...].reshape((T-1, -1)) @ dD_ps.reshape((T, -1)).T + + # contemporaneous effect via discrete choice + if dDs is not None: + F += curlyPs.reshape((T, -1)) @ dDs.reshape((T, -1)).T + + # direct effect of shock + if dD_direct is not None: + F[:, 0] += curlyPs.reshape((T, -1)) @ dD_direct.ravel() + + return F + + @staticmethod + def J_from_F(F): + J = F.copy() + for t in range(1, J.shape[1]): + J[1:, t] += J[:-1, t - 1] + return J + + '''Part 5: helpers for .jac: preliminary processing''' + + def jac_prelim(self, ss): + """Helper that does preliminary processing of steady state for fake news algorithm. + + Parameters + ---------- + ss : dict, all steady-state info, intended to be from .ss() + + Returns + ---------- + ssin_dict : dict, ss vals of exactly the inputs needed by self.back_step_fun for backward step + D0 : array (nS, nZ, nA), distribution over s[-1], z[-1], a[-1] + ssout_list : tuple, what self.back_step_fun returns when given ssin_dict (not exactly the same + as steady-state numerically since SS convergence was to some tolerance threshold) + ss_for_hetinput : dict, ss vals of exactly the inputs needed by self.hetinput (if it exists) + sspol_i : dict, indices on lower bracketing gridpoint for all in self.policy + sspol_pi : dict, weights on lower bracketing gridpoint for all in self.policy + sspol_space : dict, space between lower and upper bracketing gridpoints for all in self.policy + """ + # preliminary a: obtain ss inputs and other info, run once to get baseline for numerical differentiation + ssin_dict = self.make_inputs(ss) + ssout_list = self.back_step_fun(**ssin_dict) + + ss_for_hetinput = None + if self.hetinput is not None: + ss_for_hetinput = {k: ss[k] for k in self.hetinput_inputs if k in ss} + + # preliminary b: get sparse representations of policy rules and distance between neighboring policy gridpoints + grid = ss[self.policy + '_grid'] + sspol_i, sspol_pi = utils.interpolate.interpolate_coord_robust(grid, ss.internal[self.name][self.policy]) + sspol_space = grid[sspol_i + 1] - grid[sspol_i] + + # preliminary c: get end-of-period distribution, need it when Pi is shocked + Pi = [ss.internal[self.name][k] for k in self.exogenous] + D = ss.internal[self.name]['D'] + D0 = utils.forward_step.forward_step_cpol(D, sspol_i, sspol_pi) + D2 = utils.forward_step.forward_step_exo(D0, Pi) + + Pss = ss.internal[self.name][self.disc_policy] + + toreturn = (ssin_dict, ssout_list, ss_for_hetinput, sspol_i, sspol_pi, sspol_space, D0, D2, Pi, Pss) + + return toreturn + + '''Part 6: helper to extract inputs and potentially process them through hetinput''' + + def make_inputs(self, back_step_inputs_dict): + """Extract from back_step_inputs_dict exactly the inputs needed for self.back_step_fun, + process stuff through self.hetinput first if it's there. + """ + input_dict = copy.deepcopy(back_step_inputs_dict) + + # TODO: This make_inputs function needs to be revisited since it creates inputs both for initial steady + # state computation as well as for Jacobian/impulse evaluation for HetBlocks, + # where in the former the hetinputs and value function have yet to be computed, + # whereas in the latter they have already been computed + # and hence do not need to be recomputed. 
There may be room to clean this function up a bit. + if isinstance(back_step_inputs_dict, SteadyStateDict): + input_dict = copy.deepcopy(back_step_inputs_dict.toplevel) + input_dict.update({k: v for k, v in back_step_inputs_dict.internal[self.name].items()}) + else: + # If this HetBlock has a hetinput, then we need to compute the outputs of the hetinput first and include + # them as inputs for self.back_step_fun + if self.hetinput is not None: + outputs_as_tuple = utils.misc.make_tuple(self.hetinput(**{k: input_dict[k] + for k in self.hetinput_inputs if k in input_dict})) + input_dict.update(dict(zip(self.hetinput_outputs_order, outputs_as_tuple))) + + # Check if there are entries in indict corresponding to self.inputs_to_be_primed. + # In particular, we are interested in knowing if an initial value + # for the backward iteration variable has been provided. + # If it has not been provided, then use self.backward_init to calculate the initial values. + if not self.inputs_to_be_primed.issubset(set(input_dict.keys())): + initial_value_input_args = [input_dict[arg_name] for arg_name in utils.misc.input_list(self.backward_init)] + input_dict.update(zip(utils.misc.output_list(self.backward_init), + utils.misc.make_tuple(self.backward_init(*initial_value_input_args)))) + + for i_p in self.inputs_to_be_primed: + input_dict[i_p + "_p"] = input_dict[i_p] + del input_dict[i_p] + + try: + return {k: input_dict[k] for k in self.back_step_inputs if k in input_dict} + except KeyError as e: + print(f'Missing backward variable or Markov matrix {e} for {self.back_step_fun.__name__}!') + raise + + '''Part 7: routines to do forward steps of different kinds, all wrap functions in utils''' + + + def forward_step(self, D3_prev, P, Pi, a_i, a_pi): + """Update distribution from (s[0], z[0], a[-1]) to (s[1], z[1], a[0])""" + # update with continuous policy of last period + D4 = utils.forward_step.forward_step_cpol(D3_prev, a_i, a_pi) + + # update with exogenous shocks today + D2 = utils.forward_step.forward_step_exo(D4, Pi) + + # update with discrete choice today + D3 = utils.forward_step.forward_step_dpol(D2, P) + return D3 + + def forward_step_shock(self, D0, pol_i, pol_pi_shock, Pi, P): + """Forward_step linearized wrt pol_pi.""" + D4 = utils.forward_step.forward_step_cpol_shock(D0, pol_i, pol_pi_shock) + D2 = utils.forward_step.forward_step_exo(D4, Pi) + D3 = utils.forward_step.forward_step_dpol(D2, P) + return D3 + + def forward_step_transpose(self, D, P, Pi, a_i, a_pi): + """Transpose of forward_step.""" + D1 = np.einsum('sza,xsza->xza', D, P) + D2 = np.einsum('xpa,zp->xza', D1, Pi[1]) + D3 = np.einsum('xza,sx->sza', D2, Pi[0]) + D4 = utils.forward_step.forward_step_cpol_transpose(D3, a_i, a_pi) + return D4 + + +def hetoutput(custom_aggregation=None): + def decorator(f): + return HetOutput(f, custom_aggregation=custom_aggregation) + return decorator + + +class HetOutput: + def __init__(self, f, custom_aggregation=None): + self.name = f.__name__ + self.f = f + self.eval_input_list = utils.misc.input_list(f) + + self.custom_aggregation = custom_aggregation + self.agg_input_list = [] if custom_aggregation is None else utils.misc.input_list(custom_aggregation) + + # We are distinguishing between the eval_input_list and agg_input_list because custom aggregation may require + # certain arguments that are not required for simply evaluating the hetoutput + self.input_list = list(set(self.eval_input_list).union(set(self.agg_input_list))) + self.output_list = utils.misc.output_list(f) + + def evaluate(self, arg_dict): 
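+ """Evaluate the hetoutput function f on the entries of arg_dict named by eval_input_list, returning a dict keyed by output_list."""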
+ hetoutputs = dict(zip(self.output_list, utils.misc.make_tuple(self.f(*[arg_dict[i] for i + in self.eval_input_list])))) + return hetoutputs + + def aggregate(self, hetoutputs, D, custom_aggregation_args, mode="ss"): + if self.custom_aggregation is not None: + hetoutputs_w_std_aggregation = list(set(self.output_list) - + set([utils.misc.uncapitalize(o) for o + in utils.misc.output_list(self.custom_aggregation)])) + hetoutputs_w_custom_aggregation = list(set(self.output_list) - set(hetoutputs_w_std_aggregation)) + else: + hetoutputs_w_std_aggregation = self.output_list + hetoutputs_w_custom_aggregation = [] + + # TODO: May need to check if this works properly for td + if self.custom_aggregation is not None: + hetoutputs_w_custom_aggregation_args = dict(zip(hetoutputs_w_custom_aggregation, + [hetoutputs[i] for i in hetoutputs_w_custom_aggregation])) + custom_agg_inputs = {"D": D, **hetoutputs_w_custom_aggregation_args, **custom_aggregation_args} + custom_aggregates = dict(zip([o.capitalize() for o in hetoutputs_w_custom_aggregation], + utils.misc.make_tuple(self.custom_aggregation(*[custom_agg_inputs[i] for i + in self.agg_input_list])))) + else: + custom_aggregates = {} + + if mode == "ss": + std_aggregates = {o.capitalize(): np.vdot(D, hetoutputs[o]) for o in hetoutputs_w_std_aggregation} + elif mode == "td": + std_aggregates = {o.capitalize(): utils.optimized_routines.fast_aggregate(D, hetoutputs[o]) + for o in hetoutputs_w_std_aggregation} + else: + raise RuntimeError(f"Mode {mode} is not supported in HetOutput aggregation. Choose either 'ss' or 'td'") + + return {**std_aggregates, **custom_aggregates} diff --git a/src/sequence_jacobian/blocks/het_block.py b/src/sequence_jacobian/blocks/het_block.py index 259d4d8..2af7d55 100644 --- a/src/sequence_jacobian/blocks/het_block.py +++ b/src/sequence_jacobian/blocks/het_block.py @@ -3,10 +3,12 @@ import numpy as np from .support.impulse import ImpulseDict +from .support.bijection import Bijection from ..primitives import Block from .. import utilities as utils from ..steady_state.classes import SteadyStateDict from ..jacobian.classes import JacobianDict +from .support.bijection import Bijection from ..devtools.deprecate import rename_output_list_to_outputs from ..utilities.misc import verify_saved_jacobian @@ -55,6 +57,7 @@ def __init__(self, back_step_fun, exogenous, policy, backward, backward_init=Non Currently, we only support up to two policy variables. """ self.name = back_step_fun.__name__ + self.M = Bijection({}) # self.back_step_fun is one iteration of the backward step function pertaining to a given HetBlock. # i.e. 
the function pertaining to equation (14) in the paper: v_t = curlyV(v_{t+1}, X_t) @@ -145,44 +148,26 @@ def __repr__(self): """Nice string representation of HetBlock for printing to console""" if self.hetinput is not None: if self.hetoutput is not None: - return f"" else: - return f"" + return f"" else: - return f"" + return f"" '''Part 2: high-level routines, with first three called analogously to SimpleBlock counterparts - - ss : do backward and forward iteration until convergence to get complete steady state - - td : do backward and forward iteration up to T to compute dynamics given some shocks - - jac : compute jacobians of outputs with respect to shocked inputs, using fake news algorithm - - ajac : compute asymptotic columns of jacobians output by jac, also using fake news algorithm + - steady_state : do backward and forward iteration until convergence to get complete steady state + - impulse_nonlinear : do backward and forward iteration up to T to compute dynamics given some shocks + - impulse_linear : apply jacobians to compute linearized dynamics given some shocks + - jacobian : compute jacobians of outputs with respect to shocked inputs, using fake news algorithm - add_hetinput : add a hetinput to the HetBlock that first processes inputs through function hetinput - add_hetoutput: add a hetoutput to the HetBlock that is computed after the entire ss computation, or after - each backward iteration step in td + each backward iteration step in td, jacobian is not computed for these! ''' - # TODO: Deprecated methods, to be removed! - def ss(self, **kwargs): - warnings.warn("This method has been deprecated. Please invoke by calling .steady_state", DeprecationWarning) - return self.steady_state(kwargs) - - def td(self, ss, **kwargs): - warnings.warn("This method has been deprecated. Please invoke by calling .impulse_nonlinear", - DeprecationWarning) - return self.impulse_nonlinear(ss, **kwargs) - - def jac(self, ss, shock_list=None, T=None, **kwargs): - if shock_list is None: - shock_list = list(self.inputs) - warnings.warn("This method has been deprecated. Please invoke by calling .jacobian.\n" - "Also, note that the kwarg `shock_list` in .jacobian has been renamed to `shocked_vars`", - DeprecationWarning) - return self.jacobian(ss, shock_list, T, **kwargs) - - def steady_state(self, calibration, backward_tol=1E-8, backward_maxit=5000, - forward_tol=1E-10, forward_maxit=100_000, hetoutput=False): + def _steady_state(self, calibration, backward_tol=1E-8, backward_maxit=5000, + forward_tol=1E-10, forward_maxit=100_000): """Evaluate steady state HetBlock using keyword args for all inputs. Analog to SimpleBlock.ss. 
Parameters @@ -239,7 +224,7 @@ def steady_state(self, calibration, backward_tol=1E-8, backward_maxit=5000, aggregates = {o.capitalize(): np.vdot(D, sspol[o]) for o in self.non_back_iter_outputs} ss.update(aggregates) - if hetoutput and self.hetoutput is not None: + if self.hetoutput is not None: hetoutputs = self.hetoutput.evaluate(ss) aggregate_hetoutputs = self.hetoutput.aggregate(hetoutputs, D, ss, mode="ss") else: @@ -249,7 +234,7 @@ def steady_state(self, calibration, backward_tol=1E-8, backward_maxit=5000, return SteadyStateDict(ss, internal=self) - def impulse_nonlinear(self, ss, exogenous, monotonic=False, returnindividual=False, grid_paths=None): + def _impulse_nonlinear(self, ss, exogenous, monotonic=False, returnindividual=False, grid_paths=None): """Evaluate transitional dynamics for HetBlock given dynamic paths for inputs in kwargs, assuming that we start and end in steady state ss, and that all inputs not specified in kwargs are constant at their ss values. Analog to SimpleBlock.td. @@ -360,7 +345,7 @@ def impulse_nonlinear(self, ss, exogenous, monotonic=False, returnindividual=Fal else: return ImpulseDict({**aggregates, **aggregate_hetoutputs}) - ss - def impulse_linear(self, ss, exogenous, T=None, Js=None, **kwargs): + def _impulse_linear(self, ss, exogenous, T=None, Js=None, **kwargs): # infer T from exogenous, check that all shocks have same length shock_lengths = [x.shape[0] for x in exogenous.values()] if shock_lengths[1:] != shock_lengths[:-1]: @@ -369,7 +354,7 @@ def impulse_linear(self, ss, exogenous, T=None, Js=None, **kwargs): return ImpulseDict(self.jacobian(ss, list(exogenous.keys()), T=T, Js=Js, **kwargs).apply(exogenous)) - def jacobian(self, ss, exogenous=None, T=300, outputs=None, output_list=None, Js=None, h=1E-4): + def _jacobian(self, ss, exogenous=None, T=300, outputs=None, output_list=None, Js=None, h=1E-4): """Assemble nested dict of Jacobians of agg outputs vs. inputs, using fake news algorithm. Parameters diff --git a/src/sequence_jacobian/blocks/simple_block.py b/src/sequence_jacobian/blocks/simple_block.py index df43848..ae6ee2b 100644 --- a/src/sequence_jacobian/blocks/simple_block.py +++ b/src/sequence_jacobian/blocks/simple_block.py @@ -6,6 +6,7 @@ from .support.simple_displacement import ignore, Displace, AccumulatedDerivative from .support.impulse import ImpulseDict +from .support.bijection import Bijection from ..primitives import Block from ..steady_state.classes import SteadyStateDict from ..jacobian.classes import JacobianDict, SimpleSparse, ZeroMatrix @@ -40,35 +41,18 @@ def __init__(self, f): self.output_list = misc.output_list(f) self.inputs = set(self.input_list) self.outputs = set(self.output_list) + self.M = Bijection({}) def __repr__(self): - return f"" - - # TODO: Deprecated methods, to be removed! - def ss(self, **kwargs): - warnings.warn("This method has been deprecated. Please invoke by calling .steady_state", DeprecationWarning) - return self.steady_state(kwargs) - - def td(self, ss, **kwargs): - warnings.warn("This method has been deprecated. Please invoke by calling .impulse_nonlinear", - DeprecationWarning) - return self.impulse_nonlinear(ss, exogenous=kwargs) - - def jac(self, ss, T=None, shock_list=None): - if shock_list is None: - shock_list = [] - warnings.warn("This method has been deprecated. 
Please invoke by calling .jacobian.\n" - "Also, note that the kwarg `shock_list` in .jacobian has been renamed to `shocked_vars`", - DeprecationWarning) - return self.jacobian(ss, exogenous=shock_list, T=T) - - def steady_state(self, calibration): + return f"" + + def _steady_state(self, calibration): input_args = {k: ignore(v) for k, v in calibration.items() if k in misc.input_list(self.f)} output_vars = [misc.numeric_primitive(o) for o in self.f(**input_args)] if len(self.output_list) > 1 else [ misc.numeric_primitive(self.f(**input_args))] return SteadyStateDict({**calibration, **dict(zip(self.output_list, output_vars))}) - def impulse_nonlinear(self, ss, exogenous): + def _impulse_nonlinear(self, ss, exogenous): input_args = {} for k, v in exogenous.items(): if np.isscalar(v): @@ -81,10 +65,10 @@ def impulse_nonlinear(self, ss, exogenous): return ImpulseDict(make_impulse_uniform_length(self.f(**input_args), self.output_list)) - ss - def impulse_linear(self, ss, exogenous, T=None, Js=None): + def _impulse_linear(self, ss, exogenous, T=None, Js=None): return ImpulseDict(self.jacobian(ss, exogenous=list(exogenous.keys()), T=T, Js=Js).apply(exogenous)) - def jacobian(self, ss, exogenous=None, T=None, Js=None): + def _jacobian(self, ss, exogenous=None, T=None, Js=None): """Assemble nested dict of Jacobians Parameters diff --git a/src/sequence_jacobian/blocks/solved_block.py b/src/sequence_jacobian/blocks/solved_block.py index fc05109..f2e1217 100644 --- a/src/sequence_jacobian/blocks/solved_block.py +++ b/src/sequence_jacobian/blocks/solved_block.py @@ -3,6 +3,7 @@ from ..primitives import Block from ..blocks.simple_block import simple from ..utilities import graph +from .support.bijection import Bijection from ..jacobian.classes import JacobianDict @@ -41,6 +42,7 @@ class SolvedBlock(Block): def __init__(self, blocks, name, unknowns, targets, solver=None, solver_kwargs={}): # Store the actual blocks in ._blocks_unsorted, and use .blocks_w_helpers and .blocks to index from there. self._blocks_unsorted = blocks + self.M = Bijection({}) # don't inherit membrane from parent blocks (think more about this later) # Upon instantiation, we only have enough information to conduct a sort ignoring HelperBlocks # since we need a `calibration` to resolve cyclic dependencies when including HelperBlocks in a topological sort @@ -68,26 +70,8 @@ def __init__(self, blocks, name, unknowns, targets, solver=None, solver_kwargs={ def __repr__(self): return f"" - # TODO: Deprecated methods, to be removed! - def ss(self, **kwargs): - warnings.warn("This method has been deprecated. Please invoke by calling .steady_state", DeprecationWarning) - return self.steady_state(kwargs) - - def td(self, ss, **kwargs): - warnings.warn("This method has been deprecated. Please invoke by calling .impulse_nonlinear", - DeprecationWarning) - return self.impulse_nonlinear(ss, **kwargs) - - def jac(self, ss, T=None, shock_list=None, **kwargs): - if shock_list is None: - shock_list = [] - warnings.warn("This method has been deprecated. 
Please invoke by calling .jacobian.\n" - "Also, note that the kwarg `shock_list` in .jacobian has been renamed to `shocked_vars`", - DeprecationWarning) - return self.jacobian(ss, shock_list, T, **kwargs) - - def steady_state(self, calibration, unknowns=None, helper_blocks=None, solver=None, - consistency_check=False, ttol=1e-9, ctol=1e-9, verbose=False): + def _steady_state(self, calibration, unknowns=None, helper_blocks=None, solver=None, + consistency_check=False, ttol=1e-9, ctol=1e-9, verbose=False): # If this is the first time invoking steady_state/solve_steady_state, cache the sorted indices # accounting for HelperBlocks if self._sorted_indices_w_helpers is None: @@ -105,18 +89,18 @@ def steady_state(self, calibration, unknowns=None, helper_blocks=None, solver=No return super().solve_steady_state(calibration, unknowns, self.targets, solver=solver, consistency_check=consistency_check, ttol=ttol, ctol=ctol, verbose=verbose) - def impulse_nonlinear(self, ss, exogenous=None, monotonic=False, Js=None, returnindividual=False, verbose=False): + def _impulse_nonlinear(self, ss, exogenous=None, monotonic=False, Js=None, returnindividual=False, verbose=False): return super().solve_impulse_nonlinear(ss, exogenous=exogenous, unknowns=list(self.unknowns.keys()), Js=Js, targets=self.targets if isinstance(self.targets, list) else list(self.targets.keys()), monotonic=monotonic, returnindividual=returnindividual, verbose=verbose) - def impulse_linear(self, ss, exogenous, T=None, Js=None): + def _impulse_linear(self, ss, exogenous, T=None, Js=None): return super().solve_impulse_linear(ss, exogenous=exogenous, unknowns=list(self.unknowns.keys()), targets=self.targets if isinstance(self.targets, list) else list(self.targets.keys()), T=T, Js=Js) - def jacobian(self, ss, exogenous=None, T=300, outputs=None, Js=None): + def _jacobian(self, ss, exogenous=None, T=300, outputs=None, Js=None): if exogenous is None: exogenous = list(self.inputs) if outputs is None: diff --git a/src/sequence_jacobian/blocks/support/bijection.py b/src/sequence_jacobian/blocks/support/bijection.py new file mode 100644 index 0000000..834dac8 --- /dev/null +++ b/src/sequence_jacobian/blocks/support/bijection.py @@ -0,0 +1,58 @@ +class Bijection: + def __init__(self, map): + # identity always implicit, remove if there explicitly + self.map = {k: v for k, v in map.items() if k != v} + invmap = {} + for k, v in map.items(): + if v in invmap: + raise ValueError(f'Duplicate value {v}, for keys {invmap[v]} and {k}') + invmap[v] = k + self.invmap = invmap + + @property + def inv(self): + invmap = Bijection.__new__(Bijection) # better way to do this? 
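+ # bypass __init__ (and its duplicate-value check); the maps are already validated, so just swap them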
+ invmap.map = self.invmap + invmap.invmap = self.map + return invmap + + def __repr__(self): + return f'Bijection({repr(self.map)})' + + def __getitem__(self, k): + return self.map.get(k, k) + + def __matmul__(self, x): + if isinstance(x, Bijection): + # compose self: v -> u with x: w -> v + # assume everything missing in either is the identity + M = {} + for v, u in self.map.items(): + w = x.invmap.get(v, v) + M[w] = u + for w, v in x.map.items(): + if v not in self.map: + M[w] = v + return Bijection(M) + elif isinstance(x, dict): + return {self[k]: v for k, v in x.items()} + elif isinstance(x, list): + return [self[k] for k in x] + elif isinstance(x, set): + return {self[k] for k in x} + elif isinstance(x, tuple): + return tuple(self[k] for k in x) + else: + return NotImplemented + + def __rmatmul__(self, x): + if isinstance(x, dict): + return {self[k]: v for k, v in x.items()} + elif isinstance(x, list): + return [self[k] for k in x] + elif isinstance(x, set): + return {self[k] for k in x} + elif isinstance(x, tuple): + return tuple(self[k] for k in x) + else: + return NotImplemented \ No newline at end of file diff --git a/src/sequence_jacobian/blocks/support/impulse.py index 4be8f93..345363f 100644 --- a/src/sequence_jacobian/blocks/support/impulse.py +++ b/src/sequence_jacobian/blocks/support/impulse.py @@ -1,8 +1,10 @@ """ImpulseDict class for manipulating impulse responses.""" import numpy as np +from copy import deepcopy from ...steady_state.classes import SteadyStateDict +from .bijection import Bijection class ImpulseDict: @@ -33,37 +35,61 @@ def __getitem__(self, item): if isinstance(item, str): # Case 1: ImpulseDict['C'] returns array return self.impulse[item] - if isinstance(item, list): + elif isinstance(item, list): # Case 2: ImpulseDict[['C']] or ImpulseDict[['C', 'Y']] return smaller ImpulseDicts return type(self)({k: self.impulse[k] for k in item}) + else: + raise ValueError("Use ImpulseDict['X'] to return an array or ImpulseDict[['X']] to return a smaller ImpulseDict.") def __add__(self, other): if isinstance(other, (float, int)): return type(self)({k: v + other for k, v in self.impulse.items()}) - if isinstance(other, SteadyStateDict): + elif isinstance(other, SteadyStateDict): return type(self)({k: v + other[k] for k, v in self.impulse.items()}) + else: + raise NotImplementedError('Only a number or a SteadyStateDict can be added to an ImpulseDict.') def __sub__(self, other): if isinstance(other, (float, int)): return type(self)({k: v - other for k, v in self.impulse.items()}) - if isinstance(other, (SteadyStateDict, ImpulseDict)): + elif isinstance(other, (SteadyStateDict, ImpulseDict)): return type(self)({k: v - other[k] for k, v in self.impulse.items()}) + else: + raise NotImplementedError('Only a number or a SteadyStateDict can be subtracted from an ImpulseDict.') def __mul__(self, other): if isinstance(other, (float, int)): return type(self)({k: v * other for k, v in self.impulse.items()}) - if isinstance(other, SteadyStateDict): + elif isinstance(other, SteadyStateDict): return type(self)({k: v * other[k] for k, v in self.impulse.items()}) + else: + raise NotImplementedError('An ImpulseDict can only be multiplied by a number or a SteadyStateDict.') def __rmul__(self, other): if isinstance(other, (float, int)): return type(self)({k: v * other for k, v in self.impulse.items()}) - if isinstance(other, SteadyStateDict): + elif isinstance(other, SteadyStateDict): return type(self)({k: v * other[k] for k, v in self.impulse.items()}) + else:
+ raise NotImplementedError('An ImpulseDict can only be multiplied by a number or a SteadyStateDict.') def __truediv__(self, other): if isinstance(other, (float, int)): return type(self)({k: v / other for k, v in self.impulse.items()}) # ImpulseDict[['C', 'Y']] / ss[['C', 'Y']]: matches steady states; don't divide by zero - if isinstance(other, SteadyStateDict): + elif isinstance(other, SteadyStateDict): return type(self)({k: v / other[k] if not np.isclose(other[k], 0) else v for k, v in self.impulse.items()}) + else: + raise NotImplementedError('An ImpulseDict can only be divided by a number or a SteadyStateDict.') + + def __matmul__(self, x): + # remap keys in toplevel + if isinstance(x, Bijection): + new = deepcopy(self) + new.impulse = x @ self.impulse + return new + else: + return NotImplemented + + def __rmatmul__(self, x): + return self.__matmul__(x) diff --git a/src/sequence_jacobian/jacobian/classes.py index b522f7d..d5e1cdc 100644 --- a/src/sequence_jacobian/jacobian/classes.py +++ b/src/sequence_jacobian/jacobian/classes.py @@ -5,6 +5,7 @@ import numpy as np from . import support +from ..blocks.support.bijection import Bijection class Jacobian(metaclass=ABCMeta): @@ -385,9 +386,18 @@ def addinputs(self): def __matmul__(self, x): if isinstance(x, JacobianDict): return self.compose(x) + elif isinstance(x, Bijection): + nesteddict = x @ self.nesteddict + for o in nesteddict.keys(): + nesteddict[o] = x @ nesteddict[o] + return JacobianDict(nesteddict, inputs=x @ self.inputs, outputs=x @ self.outputs) else: return self.apply(x) + def __rmatmul__(self, x): + if isinstance(x, Bijection): + return JacobianDict(x @ self.nesteddict, inputs=x @ self.inputs, outputs=x @ self.outputs) + def __bool__(self): return bool(self.outputs) and bool(self.inputs) diff --git a/src/sequence_jacobian/models/hank.py index 8ee9576..a38df0e 100644 --- a/src/sequence_jacobian/models/hank.py +++ b/src/sequence_jacobian/models/hank.py @@ -190,9 +190,8 @@ def hank_ss(beta_guess=0.986, vphi_guess=0.8, r=0.005, eis=0.5, frisch=0.5, mu=1 Tax = r * B T = transfers(pi_e, Div, Tax, e_grid) - # initialize guess for policy function iteration - coh = (1 + r) * a_grid[np.newaxis, :] + w * e_grid[:, np.newaxis] + T[:, np.newaxis] - Va = (1 + r) * (0.1 * coh) ** (-1 / eis) + calibration = {'Pi': Pi, 'a_grid': a_grid, 'e_grid': e_grid, 'pi_e': pi_e, 'w': w, 'r': r, 'eis': eis, 'Div': Div, + 'Tax': Tax, 'frisch': frisch} # residual function def res(x): @@ -200,16 +199,18 @@ def res(x): # precompute constrained c and n which don't depend on Va if beta_loc > 0.999 / (1 + r) or vphi_loc < 0.001: raise ValueError('Clearly invalid inputs') - out = household.ss(Va=Va, Pi=Pi, a_grid=a_grid, e_grid=e_grid, pi_e=pi_e, w=w, r=r, beta=beta_loc, - eis=eis, Div=Div, Tax=Tax, frisch=frisch, vphi=vphi_loc) + + calibration['beta'], calibration['vphi'] = beta_loc, vphi_loc + out = household.steady_state(calibration) + return np.array([out['A'] - B, out['N_e'] - 1]) # solve for beta, vphi (beta, vphi), _ = utils.solvers.broyden_solver(res, np.array([beta_guess, vphi_guess]), verbose=False) + calibration['beta'], calibration['vphi'] = beta, vphi # extra evaluation for reporting - ss = household.ss(Va=Va, Pi=Pi, a_grid=a_grid, e_grid=e_grid, pi_e=pi_e, w=w, r=r, beta=beta, eis=eis, - Div=Div, Tax=Tax, frisch=frisch, vphi=vphi) + ss = household.steady_state(calibration) # check Walras's law goods_mkt = 1 - ss['C'] diff --git a/src/sequence_jacobian/models/krusell_smith.py
b/src/sequence_jacobian/models/krusell_smith.py index 82e4559..fddd2cd 100644 --- a/src/sequence_jacobian/models/krusell_smith.py +++ b/src/sequence_jacobian/models/krusell_smith.py @@ -4,6 +4,7 @@ from .. import utilities as utils from ..blocks.simple_block import simple from ..blocks.het_block import het +from ..steady_state.classes import SteadyStateDict '''Part 1: HA block''' @@ -77,14 +78,12 @@ def asset_state_vars(amax, nA): @simple -def firm_steady_state_solution(r, delta, alpha): +def firm_steady_state_solution(r, Y, L, delta, alpha): rk = r + delta - Z = (rk / alpha) ** alpha # normalize so that Y=1 - K = (alpha * Z / rk) ** (1 / (1 - alpha)) - Y = Z * K ** alpha - w = (1 - alpha) * Z * (alpha * Z / rk) ** (alpha / (1 - alpha)) - - return Z, K, Y, w + w = (1 - alpha) * Y / L + K = alpha * Y / rk + Z = Y / K ** alpha / L ** (1 - alpha) + return w, K, Z '''Part 3: Steady state''' @@ -104,22 +103,35 @@ def ks_ss(lb=0.98, ub=0.999, r=0.01, eis=1, delta=0.025, alpha=0.11, rho=0.966, Y = Z * K ** alpha w = (1 - alpha) * Z * (alpha * Z / rk) ** (alpha / (1 - alpha)) - # figure out initializer - coh = (1 + r) * a_grid[np.newaxis, :] + w * e_grid[:, np.newaxis] - Va = (1 + r) * (0.1 * coh) ** (-1 / eis) + calibration = {'Pi': Pi, 'a_grid': a_grid, 'e_grid': e_grid, 'r': r, 'w': w, 'eis': eis} # solve for beta consistent with this beta_min = lb / (1 + r) beta_max = ub / (1 + r) - beta, sol = opt.brentq(lambda bet: household.ss(Pi=Pi, a_grid=a_grid, e_grid=e_grid, r=r, w=w, beta=bet, eis=eis, - Va=Va)['A'] - K, beta_min, beta_max, full_output=True) + def res(beta_loc): + calibration['beta'] = beta_loc + return household.steady_state(calibration)['A'] - K + + beta, sol = opt.brentq(res, beta_min, beta_max, full_output=True) + calibration['beta'] = beta + if not sol.converged: raise ValueError('Steady-state solver did not converge.') # extra evaluation to report variables - ss = household.ss(Pi=Pi, a_grid=a_grid, e_grid=e_grid, r=r, w=w, beta=beta, eis=eis, Va=Va) - ss.update({'Pi': Pi, 'Z': Z, 'K': K, 'L': 1, 'Y': Y, 'alpha': alpha, 'delta': delta, + ss = household.steady_state(calibration) + ss.update({'Z': Z, 'K': K, 'L': 1.0, 'Y': Y, 'alpha': alpha, 'delta': delta, 'Pi': Pi, 'goods_mkt': Y - ss['C'] - delta * K, 'nA': nA, 'amax': amax, 'sigma': sigma, - 'rho': rho, 'nS': nS, 'asset_mkt': ss["A"] - K}) + 'rho': rho, 'nS': nS, 'asset_mkt': ss['A'] - K}) return ss + + +'''Part 4: Permanent beta heterogeneity''' + + +@simple +def aggregate(A_patient, A_impatient, C_patient, C_impatient, mass_patient): + C = mass_patient * C_patient + (1 - mass_patient) * C_impatient + A = mass_patient * A_patient + (1 - mass_patient) * A_impatient + return C, A diff --git a/src/sequence_jacobian/models/two_asset.py b/src/sequence_jacobian/models/two_asset.py index 403bdbe..9948f1c 100644 --- a/src/sequence_jacobian/models/two_asset.py +++ b/src/sequence_jacobian/models/two_asset.py @@ -359,26 +359,27 @@ def two_asset_ss(beta_guess=0.976, chi1_guess=6.5, r=0.0125, tot_wealth=14, K=10 rb = r - omega # figure out initializer - z_grid = income(e_grid, tax, w, 1) - Va = (0.6 + 1.1 * b_grid[:, np.newaxis] + a_grid) ** (-1 / eis) * np.ones((z_grid.shape[0], 1, 1)) - Vb = (0.5 + b_grid[:, np.newaxis] + 1.2 * a_grid) ** (-1 / eis) * np.ones((z_grid.shape[0], 1, 1)) + calibration = {'Pi': Pi, 'a_grid': a_grid, 'b_grid': b_grid, 'e_grid': e_grid, 'k_grid': k_grid, + 'N': 1.0, 'tax': tax, 'w': w, 'eis': eis, 'rb': rb, 'ra': ra, 'chi0': chi0, 'chi2': chi2} # residual function def res(x): beta_loc, chi1_loc = x + if 
beta_loc > 0.999 / (1 + r) or chi1_loc < 0.5: raise ValueError('Clearly invalid inputs') - out = household.ss(Va=Va, Vb=Vb, Pi=Pi, a_grid=a_grid, b_grid=b_grid, N=1, tax=tax, w=w, e_grid=e_grid, - k_grid=k_grid, beta=beta_loc, eis=eis, rb=rb, ra=ra, chi0=chi0, chi1=chi1_loc, chi2=chi2) + + calibration['beta'], calibration['chi1'] = beta_loc, chi1_loc + out = household.steady_state(calibration) asset_mkt = out['A'] + out['B'] - p - Bg return np.array([asset_mkt, out['B'] - Bh]) # solve for beta, vphi, omega (beta, chi1), _ = utils.solvers.broyden_solver(res, np.array([beta_guess, chi1_guess]), verbose=verbose) + calibration['beta'], calibration['chi1'] = beta, chi1 - # extra evaluation to report variables - ss = household.ss(Va=Va, Vb=Vb, Pi=Pi, a_grid=a_grid, b_grid=b_grid, N=1, tax=tax, w=w, e_grid=e_grid, - k_grid=k_grid, beta=beta, eis=eis, rb=rb, ra=ra, chi0=chi0, chi1=chi1, chi2=chi2) + # extra evaluation for reporting + ss = household.steady_state(calibration) # other things of interest vphi = (1 - tax) * w * ss['U'] / muw diff --git a/src/sequence_jacobian/primitives.py b/src/sequence_jacobian/primitives.py index 15a2def..042e1c1 100644 --- a/src/sequence_jacobian/primitives.py +++ b/src/sequence_jacobian/primitives.py @@ -4,14 +4,16 @@ from abc import ABCMeta as NativeABCMeta from numbers import Real from typing import Any, Dict, Union, Tuple, Optional, List +from copy import deepcopy -from .steady_state.drivers import steady_state +from .steady_state.drivers import steady_state as ss from .steady_state.support import provide_solver_default from .nonlinear import td_solve from .jacobian.drivers import get_impulse, get_G from .steady_state.classes import SteadyStateDict from .jacobian.classes import JacobianDict from .blocks.support.impulse import ImpulseDict +from .blocks.support.bijection import Bijection # Basic types Array = Any @@ -68,23 +70,27 @@ def inputs(self): def outputs(self): pass - # Typing information is purely to inform future user-developed `Block` sub-classes to enforce a canonical - # input and output argument structure - def steady_state(self, calibration: Dict[str, Union[Real, Array]], - **kwargs) -> SteadyStateDict: - raise NotImplementedError(f'{type(self)} does not implement .steady_state()') + def steady_state(self, calibration: SteadyStateDict, **kwargs) -> SteadyStateDict: + """Evaluate a partial equilibrium steady state of Block given a `calibration`.""" + return self.M @ self._steady_state(self.M.inv @ calibration, **kwargs) - def impulse_nonlinear(self, ss: Dict[str, Union[Real, Array]], + def impulse_nonlinear(self, ss: SteadyStateDict, exogenous: Dict[str, Array], **kwargs) -> ImpulseDict: - raise NotImplementedError(f'{type(self)} does not implement .impulse_nonlinear()') + """Calculate a partial equilibrium, non-linear impulse response to a set of `exogenous` shocks + from a steady state `ss`.""" + return self.M @ self._impulse_nonlinear(self.M.inv @ ss, self.M.inv @ exogenous, **kwargs) - def impulse_linear(self, ss: Dict[str, Union[Real, Array]], + def impulse_linear(self, ss: SteadyStateDict, exogenous: Dict[str, Array], **kwargs) -> ImpulseDict: - raise NotImplementedError(f'{type(self)} does not implement .impulse_linear()') + """Calculate a partial equilibrium, linear impulse response to a set of `exogenous` shocks + from a steady state `ss`.""" + return self.M @ self._impulse_linear(self.M.inv @ ss, self.M.inv @ exogenous, **kwargs) - def jacobian(self, ss: Dict[str, Union[Real, Array]], exogenous: List[str] = None, - T: int = None, **kwargs) 
-> JacobianDict: - raise NotImplementedError(f'{type(self)} does not implement .jacobian()') + def jacobian(self, ss: SteadyStateDict, + exogenous: Dict[str, Array], + T: Optional[int] = None, **kwargs) -> JacobianDict: + """Calculate a partial equilibrium Jacobian with respect to a set of `exogenous` shocks at a steady state `ss`.""" + return self.M @ self._jacobian(self.M.inv @ ss, self.M.inv @ exogenous, T=T, **kwargs) def solve_steady_state(self, calibration: Dict[str, Union[Real, Array]], unknowns: Dict[str, Union[Real, Tuple[Real, Real]]], @@ -95,7 +101,7 @@ def solve_steady_state(self, calibration: Dict[str, Union[Real, Array]], the target conditions that must hold in general equilibrium""" blocks = self.blocks if hasattr(self, "blocks") else [self] solver = solver if solver else provide_solver_default(unknowns) - return steady_state(blocks, calibration, unknowns, targets, solver=solver, **kwargs) + return ss(blocks, calibration, unknowns, targets, solver=solver, **kwargs) def solve_impulse_nonlinear(self, ss: Dict[str, Union[Real, Array]], exogenous: Dict[str, Array], @@ -135,3 +141,21 @@ def solve_jacobian(self, ss: Dict[str, Union[Real, Array]], variables to be solved for and the target conditions that must hold in general equilibrium""" blocks = self.blocks if hasattr(self, "blocks") else [self] return get_G(blocks, exogenous, unknowns, targets, T=T, ss=ss, Js=Js, **kwargs) + + def remap(self, map): + other = deepcopy(self) + other.M = self.M @ Bijection(map) + other.inputs = other.M @ self.inputs + other.outputs = other.M @ self.outputs + if hasattr(self, 'input_list'): + other.input_list = other.M @ self.input_list + if hasattr(self, 'output_list'): + other.output_list = other.M @ self.output_list + if hasattr(self, 'non_back_iter_outputs'): + other.non_back_iter_outputs = other.M @ self.non_back_iter_outputs + return other + + def rename(self, name): + renamed = deepcopy(self) + renamed.name = name + return renamed diff --git a/src/sequence_jacobian/steady_state/classes.py index cb8238f..c281b74 100644 --- a/src/sequence_jacobian/steady_state/classes.py +++ b/src/sequence_jacobian/steady_state/classes.py @@ -3,6 +3,7 @@ from copy import deepcopy from ..utilities.misc import dict_diff +from ..blocks.support.bijection import Bijection class SteadyStateDict: @@ -32,6 +33,18 @@ def __getitem__(self, k): def __setitem__(self, k, v): self.toplevel[k] = v + def __matmul__(self, x): + # remap keys in toplevel + if isinstance(x, Bijection): + new = deepcopy(self) + new.toplevel = x @ self.toplevel + return new + else: + return NotImplemented + + def __rmatmul__(self, x): + return self.__matmul__(x) + def keys(self): return self.toplevel.keys() diff --git a/src/sequence_jacobian/steady_state/drivers.py index dbe6e9c..c105270 100644 --- a/src/sequence_jacobian/steady_state/drivers.py +++ b/src/sequence_jacobian/steady_state/drivers.py @@ -145,20 +145,25 @@ def residual(targets_dict, unknown_keys, unknown_values, include_helpers=True, c def eval_block_ss(block, calibration, toplevel_unknowns=None, dissolve=None, **kwargs): """Evaluate the .ss method of a block, given a dictionary of potential arguments""" if toplevel_unknowns is None: - toplevel_unknowns = {} - block_unknowns_in_toplevel_unknowns = set(block.unknowns.keys()).issubset(set(toplevel_unknowns)) if hasattr(block, "unknowns") else False + toplevel_unknowns = [] + if dissolve is None: + dissolve = [] # Add the block's internal variables as inputs, if
the block has an internal attribute - input_arg_dict = {**calibration.toplevel, **calibration.internal[block.name]} if block.name in calibration.internal else calibration.toplevel + if isinstance(calibration, dict): + input_arg_dict = deepcopy(calibration) + else: + input_arg_dict = {**calibration.toplevel, **calibration.internal[block.name]} if block.name in calibration.internal else calibration.toplevel # Bypass the behavior for SolvedBlocks to numerically solve for their unknowns and simply evaluate them # at the provided set of unknowns if included in dissolve. - valid_input_kwargs = misc.input_kwarg_list(block.steady_state) + valid_input_kwargs = misc.input_kwarg_list(block._steady_state) + block_unknowns_in_toplevel_unknowns = set(block.unknowns.keys()).issubset(set(toplevel_unknowns)) if hasattr(block, "unknowns") else False input_kwarg_dict = {k: v for k, v in kwargs.items() if k in valid_input_kwargs} if block in dissolve and "solver" in valid_input_kwargs: input_kwarg_dict["solver"] = "solved" input_kwarg_dict["unknowns"] = {k: v for k, v in calibration.items() if k in block.unknowns} - elif block.name not in dissolve and block_unknowns_in_toplevel_unknowns: + elif block not in dissolve and block_unknowns_in_toplevel_unknowns: raise RuntimeError(f"The block '{block.name}' is not in the kwarg `dissolve` but its unknowns," f" {set(block.unknowns.keys())} are a subset of the top-level unknowns," f" {set(toplevel_unknowns)}.\n" diff --git a/src/sequence_jacobian/utilities/forward_step.py b/src/sequence_jacobian/utilities/forward_step.py index a80b7bf..50d96db 100644 --- a/src/sequence_jacobian/utilities/forward_step.py +++ b/src/sequence_jacobian/utilities/forward_step.py @@ -171,3 +171,73 @@ def forward_step_transpose_endo_2d(D, x_i, y_i, x_pi, y_pi): (1-alpha) * beta * D[iz, ixp+1, iyp] + (1-alpha) * (1-beta) * D[iz, ixp+1, iyp+1]) return Dnew + + +''' +For HetDC block. 
+ +D0 : s[-1], z[-1], a[-1] (end of last period) +D1 : x[0], z[-1], a[-1] (employment shock) +D2 : x[0], z[0], a[-1] (productivity shock) +D3 : s[0], z[0], a[-1] (discrete choice) REFERENCE STAGE +D4 : s[0], z[0], a[0] (cont choice) + +''' + + +def forward_step_exo(D0, Pi): + """Update distribution s[-1], z[-1], a[-1] to x[0], z[0], a[-1].""" + D1 = np.einsum('sza,sx->xza', D0, Pi[0]) # s[-1] -> x[0] + D2 = np.einsum('xza,zp->xpa', D1, Pi[1]) # z[-1] -> z[0] + return D2 + + +def forward_step_dpol(D2, P): + """Update distribution x[0], z[0], a[-1] to s[0], z[0], a[-1].""" + D3 = np.einsum('xza,xsza->sza', D2, P) + return D3 + + +@njit +def forward_step_cpol(D3, a_i, a_pi): + """Update distribution s[0], z[0], a[-1] to s[0], z[0], a[0].""" + nS, nZ, nA = D3.shape + D4 = np.zeros_like(D3) + for iw in range(nS): + for iz in range(nZ): + for ia in range(nA): + i = a_i[iw, iz, ia] + pi = a_pi[iw, iz, ia] + d = D3[iw, iz, ia] + D4[iw, iz, i] += d * pi + D4[iw, iz, i+1] += d * (1 - pi) + return D4 + + +@njit +def forward_step_cpol_shock(D3, a_i_ss, a_pi_shock): + """forward_step_cpol linearized wrt a_pi""" + nS, nZ, nA = D3.shape + dD4 = np.zeros_like(D3) + for iw in range(nS): + for iz in range(nZ): + for ia in range(nA): + i = a_i_ss[iw, iz, ia] + dshock = a_pi_shock[iw, iz, ia] * D3[iw, iz, ia] + dD4[iw, iz, i] += dshock + dD4[iw, iz, i + 1] -= dshock + return dD4 + + +@njit +def forward_step_cpol_transpose(D3, a_i, a_pi): + """Transpose of forward_step_cpol""" + nS, nZ, nA = D3.shape + D4 = np.zeros_like(D3) + for iw in range(nS): + for iz in range(nZ): + for ia in range(nA): + i = a_i[iw, iz, ia] + pi = a_pi[iw, iz, ia] + D4[iw, iz, ia] = pi * D3[iw, iz, i] + (1 - pi) * D3[iw, iz, i + 1] + return D4 diff --git a/src/sequence_jacobian/utilities/misc.py b/src/sequence_jacobian/utilities/misc.py index cf47e7b..8ff9ff0 100644 --- a/src/sequence_jacobian/utilities/misc.py +++ b/src/sequence_jacobian/utilities/misc.py @@ -173,3 +173,40 @@ def verify_saved_jacobian(block_name, Js, outputs, inputs, T): return False return True + + +'''Tools for taste shocks used in discrete choice problems''' + + +def choice_prob(vfun, lam): + """ + Logit choice probability of choosing along first axis. 
+ + Parameters + ---------- + vfun : array(Ns, Nz, Na): discrete choice specific value function + lam : float, scale of taste shock + + Returns + ------- + prob : array (Ns, Nz, nA): choice probability + """ + # rescale values for numeric robustness + vmax = np.max(vfun, axis=0) + vfun_norm = vfun - vmax + + # apply formula (could be njitted in separate function) + P = np.exp(vfun_norm / lam) / np.sum(np.exp(vfun_norm / lam), axis=0) + return P + + +def logsum(vfun, lam): + """Logsum formula for expected continuation value.""" + + # rescale values for numeric robustness + vmax = np.max(vfun, axis=0) + vfun_norm = vfun - vmax + + # apply formula (could be njitted in separate function) + VE = vmax + lam * np.log(np.sum(np.exp(vfun_norm / lam), axis=0)) + return VE diff --git a/tests/base/test_jacobian.py b/tests/base/test_jacobian.py index e097bb9..6bd4a1d 100644 --- a/tests/base/test_jacobian.py +++ b/tests/base/test_jacobian.py @@ -15,8 +15,8 @@ def test_ks_jac(krusell_smith_dag): G2 = ks_model.solve_jacobian(ss, exogenous, unknowns, targets, T=T) # Manually calculate the general equilibrium Jacobian - J_firm = firm.jac(ss, shock_list=['K', 'Z']) - J_ha = household.jac(ss, T=T, shock_list=['r', 'w']) + J_firm = firm.jacobian(ss, exogenous=['K', 'Z']) + J_ha = household.jacobian(ss, T=T, exogenous=['r', 'w']) J_curlyK_K = J_ha['A']['r'] @ J_firm['r']['K'] + J_ha['A']['w'] @ J_firm['w']['K'] J_curlyK_Z = J_ha['A']['r'] @ J_firm['r']['Z'] + J_ha['A']['w'] @ J_firm['w']['Z'] J_curlyK = {'curlyK': {'K': J_curlyK_K, 'Z': J_curlyK_Z}} @@ -62,8 +62,8 @@ def test_fake_news_v_actual(one_asset_hank_dag): household = hank_model._blocks_unsorted[0] T = 40 - shock_list = ['w', 'r', 'Div', 'Tax'] - Js = household.jac(ss, shock_list, T) + exogenous = ['w', 'r', 'Div', 'Tax'] + Js = household.jacobian(ss, exogenous, T) output_list = household.non_back_iter_outputs # Preliminary processing of the steady state @@ -72,7 +72,7 @@ def test_fake_news_v_actual(one_asset_hank_dag): # Step 1 of fake news algorithm: backward iteration h = 1E-4 curlyYs, curlyDs = {}, {} - for i in shock_list: + for i in exogenous: curlyYs[i], curlyDs[i] = household.backward_iteration_fakenews(i, output_list, ssin_dict, ssout_list, ss.internal["household"]['D'], Pi.T.copy(), sspol_i, sspol_pi, sspol_space, @@ -94,7 +94,7 @@ def test_fake_news_v_actual(one_asset_hank_dag): # Step 3 of fake news algorithm: combine everything to make the fake news matrix for each output-input pair Fs = {o.capitalize(): {} for o in output_list} for o in output_list: - for i in shock_list: + for i in exogenous: F = np.empty((T,T)) F[0, ...] = curlyYs[i][o] F[1:, ...] 
= curlyPs[o].reshape(T-1, -1) @ curlyDs[i].reshape(T, -1).T @@ -109,7 +109,7 @@ def test_fake_news_v_actual(one_asset_hank_dag): Js_original = Js Js = {o.capitalize(): {} for o in output_list} for o in output_list: - for i in shock_list: + for i in exogenous: # implement recursion (30): start with J=F and accumulate terms along diagonal J = Fs[o.capitalize()][i].copy() for t in range(1, J.shape[1]): @@ -117,7 +117,7 @@ def test_fake_news_v_actual(one_asset_hank_dag): Js[o.capitalize()][i] = J for o in output_list: - for i in shock_list: + for i in exogenous: assert np.array_equal(Js[o.capitalize()][i], Js_original[o.capitalize()][i]) @@ -126,23 +126,23 @@ def test_fake_news_v_direct_method(one_asset_hank_dag): household = hank_model._blocks_unsorted[0] T = 40 - shock_list = 'r' + exogenous = ['r'] output_list = household.non_back_iter_outputs h = 1E-4 - Js = household.jac(ss, shock_list, T) - Js_direct = {o.capitalize(): {i: np.empty((T, T)) for i in shock_list} for o in output_list} + Js = household.jacobian(ss, exogenous, T) + Js_direct = {o.capitalize(): {i: np.empty((T, T)) for i in exogenous} for o in output_list} # run td once without any shocks to get paths to subtract against # (better than subtracting by ss since ss not exact) # monotonic=True lets us know there is monotonicity of policy rule, makes TD run faster - # .td requires at least one input 'shock', so we put in steady-state w - td_noshock = household.td(ss, exogenous={"w": np.zeros(T)}, monotonic=True) + # .impulse_nonlinear requires at least one input 'shock', so we put in steady-state w + td_noshock = household.impulse_nonlinear(ss, exogenous={'w': np.zeros(T)}, monotonic=True) - for i in shock_list: + for i in exogenous: # simulate with respect to a shock at each date up to T for t in range(T): - td_out = household.td(ss, exogenous={i: h * (np.arange(T) == t)}) + td_out = household.impulse_nonlinear(ss, exogenous={i: h * (np.arange(T) == t)}) # store results as column t of J[o][i] for each outcome o for o in output_list: diff --git a/tests/base/test_public_classes.py b/tests/base/test_public_classes.py index 57a86e8..ceee3d3 100644 --- a/tests/base/test_public_classes.py +++ b/tests/base/test_public_classes.py @@ -1,10 +1,12 @@ """Test public-facing classes""" import numpy as np +import pytest from sequence_jacobian import het from sequence_jacobian.steady_state.classes import SteadyStateDict from sequence_jacobian.blocks.support.impulse import ImpulseDict +from sequence_jacobian.blocks.support.bijection import Bijection def test_steadystatedict(): @@ -72,3 +74,25 @@ def test_impulsedict(krusell_smith_dag): dC1 = 100 * ir_lin['C'] / ss['C'] dC2 = 100 * ir_lin[['C']] / ss assert np.allclose(dC1, dC2['C']) + + +def test_bijection(): + # generate and invert + mymap = Bijection({'a': 'a1', 'b': 'b1'}) + mymapinv = mymap.inv + assert mymap['a'] == 'a1' and mymap['b'] == 'b1' + assert mymapinv['a1'] == 'a' and mymapinv['b1'] == 'b' + + # duplicate keys rejected + with pytest.raises(ValueError): + Bijection({'a': 'a1', 'b': 'a1'}) + + # composition with another bijection (flows backwards) + mymap2 = Bijection({'a1': 'a2'}) + assert (mymap2 @ mymap)['a'] == 'a2' + + # composition with SteadyStateDict + ss = SteadyStateDict({'a': 2.0, 'b': 1.0}, internal={}) + ss_remapped = ss @ mymap + assert isinstance(ss_remapped, SteadyStateDict) + assert ss_remapped['a1'] == ss['a'] and ss_remapped['b1'] == ss['b'] diff --git a/tests/base/test_simple_block.py b/tests/base/test_simple_block.py index 6610dba..9c6fda4 100644 --- 
a/tests/base/test_simple_block.py +++ b/tests/base/test_simple_block.py @@ -38,15 +38,15 @@ def test_block_consistency(block, ss): """Make sure ss, td, and jac methods are all consistent with each other. Requires that all inputs of simple block allow calculating Jacobians""" # get ss output - ss_results = block.ss(**ss) + ss_results = block.steady_state(ss) # now if we put in constant inputs, td should give us the same! - td_results = block.td(ss_results, **{k: np.zeros(20) for k in ss.keys()}) + td_results = block.impulse_nonlinear(ss_results, exogenous={k: np.zeros(20) for k in ss.keys()}) for k, v in td_results.impulse.items(): assert np.all(v == 0) # now get the Jacobian - J = block.jac(ss, shock_list=block.input_list) + J = block.jacobian(ss, exogenous=block.input_list) # now perturb the steady state by small random vectors # and verify that the second-order numerical derivative implied by .td @@ -54,8 +54,8 @@ def test_block_consistency(block, ss): h = 1E-5 all_shocks = {i: np.random.rand(10) for i in block.input_list} - td_up = block.td(ss_results, **{i: h*shock for i, shock in all_shocks.items()}) - td_dn = block.td(ss_results, **{i: -h*shock for i, shock in all_shocks.items()}) + td_up = block.impulse_nonlinear(ss_results, exogenous={i: h*shock for i, shock in all_shocks.items()}) + td_dn = block.impulse_nonlinear(ss_results, exogenous={i: -h*shock for i, shock in all_shocks.items()}) linear_impulses = {o: (td_up.impulse[o] - td_dn.impulse[o])/(2*h) for o in td_up.impulse} linear_impulses_from_jac = {o: sum(J[o][i] @ all_shocks[i] for i in all_shocks if i in J[o]) for o in td_up.impulse} diff --git a/tests/base/test_steady_state.py b/tests/base/test_steady_state.py index 0a4c372..2a412e1 100644 --- a/tests/base/test_steady_state.py +++ b/tests/base/test_steady_state.py @@ -35,3 +35,9 @@ def test_two_asset_steady_state(two_asset_hank_dag): assert set(ss.keys()) == set(ss_ref.keys()) for k in ss.keys(): assert np.all(np.isclose(ss[k], ss_ref[k])) + + +def test_remap_steady_state(ks_remapped_dag): + _, _, _, _, ss = ks_remapped_dag + assert ss['beta_impatient'] < ss['beta_patient'] + assert ss['A_impatient'] < ss['A_patient'] diff --git a/tests/base/test_two_asset.py b/tests/base/test_two_asset.py index 1454e5c..94ce995 100644 --- a/tests/base/test_two_asset.py +++ b/tests/base/test_two_asset.py @@ -36,13 +36,11 @@ def hank_ss_singlerun(beta=0.976, r=0.0125, tot_wealth=14, K=10, delta=0.02, Bg= rb = r - omega # figure out initializer - z_grid = two_asset.income(e_grid, tax, w, 1) - Va = (0.6 + 1.1 * b_grid[:, np.newaxis] + a_grid) ** (-1 / eis) * np.ones((z_grid.shape[0], 1, 1)) - Vb = (0.5 + b_grid[:, np.newaxis] + 1.2 * a_grid) ** (-1 / eis) * np.ones((z_grid.shape[0], 1, 1)) + calibration = {'Pi': Pi, 'a_grid': a_grid, 'b_grid': b_grid, 'e_grid': e_grid, 'k_grid': k_grid, + 'beta': beta, 'N': 1.0, 'tax': tax, 'w': w, 'eis': eis, 'rb': rb, 'ra': ra, + 'chi0': chi0, 'chi1': chi1, 'chi2': chi2} - out = two_asset.household.ss(Va=Va, Vb=Vb, Pi=Pi, a_grid=a_grid, b_grid=b_grid, - N=1, tax=tax, w=w, e_grid=e_grid, k_grid=k_grid, beta=beta, - eis=eis, rb=rb, ra=ra, chi0=chi0, chi1=chi1, chi2=chi2) + out = two_asset.household.steady_state(calibration) return out['A'], out['B'], out['U'] diff --git a/tests/conftest.py b/tests/conftest.py index ab85a96..274d7d4 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -98,3 +98,30 @@ def two_asset_hank_dag(): targets = ["asset_mkt", "fisher", "wnkpc"] return two_asset_model, exogenous, unknowns, targets, ss + + 
+@pytest.fixture(scope='session') +def ks_remapped_dag(): + # Create 2 versions of the household block using `remap` + to_map = ['beta', *krusell_smith.household.outputs] + hh_patient = krusell_smith.household.remap({k: k + '_patient' for k in to_map}).rename('hh_patient') + hh_impatient = krusell_smith.household.remap({k: k + '_impatient' for k in to_map}).rename('hh_impatient') + blocks = [hh_patient, hh_impatient, krusell_smith.firm, krusell_smith.mkt_clearing, krusell_smith.income_state_vars, + krusell_smith.asset_state_vars, krusell_smith.aggregate] + ks_remapped = create_model(blocks, name='Krusell-Smith') + + # Steady State + calibration = {'eis': 1., 'delta': 0.025, 'alpha': 0.3, 'rho': 0.966, 'sigma': 0.5, 'L': 1.0, + 'nS': 3, 'nA': 100, 'amax': 1000, 'beta_impatient': 0.985, 'mass_patient': 0.5} + unknowns_ss = {'beta_patient': (0.98 / 1.01, 0.999 / 1.01), 'Z': 0.5, 'K': 8.} + targets_ss = {'asset_mkt': 0., 'Y': 1., 'r': 0.01} + helper_blocks = [krusell_smith.firm_steady_state_solution] + ss = ks_remapped.solve_steady_state(calibration, unknowns_ss, targets_ss, solver='brentq', + helper_blocks=helper_blocks, helper_targets=['Y', 'r']) + + # Transitional Dynamics/Jacobian Calculation + exogenous = ["Z"] + unknowns = ["K"] + targets = ["asset_mkt"] + + return ks_remapped, exogenous, unknowns, targets, ss
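Usage sketch (not part of the patch) for the Bijection remapping machinery introduced above in blocks/support/bijection.py and used by Block.remap/rename; the patient/impatient names are illustrative, and the behavior shown follows the Bijection methods and the test_bijection test in this patch:

from sequence_jacobian.blocks.support.bijection import Bijection

# forward map: names not in the map pass through as the identity
m = Bijection({'beta': 'beta_patient', 'A': 'A_patient', 'C': 'C_patient'})
assert m['beta'] == 'beta_patient' and m['r'] == 'r'

# .inv flips the direction; @ remaps dicts, lists, sets, and tuples key by key
assert m.inv['A_patient'] == 'A'
assert m @ ['beta', 'r'] == ['beta_patient', 'r']

# composition flows right to left: apply m first, then the outer map
m2 = Bijection({'beta_patient': 'beta_P'})
assert (m2 @ m)['beta'] == 'beta_P'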
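Sanity sketch (not part of the patch) for the taste-shock helpers added to utilities/misc.py: both choice_prob and logsum subtract the maximum value along the choice axis before exponentiating, so they are safe from overflow; the toy array shapes below are illustrative only.

import numpy as np
from sequence_jacobian.utilities.misc import choice_prob, logsum

vfun = np.random.randn(2, 3, 4)   # (discrete choice, productivity, assets)
lam = 0.1

P = choice_prob(vfun, lam)
assert np.allclose(P.sum(axis=0), 1.0)                 # logit probabilities sum to one over choices
assert np.all(logsum(vfun, lam) >= vfun.max(axis=0))   # expected value weakly exceeds the best choice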