diff --git a/ogjpn/demographics.py b/ogjpn/demographics.py index 47fbb60..1a51ae6 100644 --- a/ogjpn/demographics.py +++ b/ogjpn/demographics.py @@ -1,677 +1,208 @@ -""" -------------------------------------------------------------------------------- -Functions for generating demographic objects necessary for the OG-JPN model -This module contains the following functions: - get_un_fert_data() - get_un_mort_data() - get_wb_infmort_rate() - get_un_pop_data() - get_fert() - get_mort() - pop_rebin() - get_imm_resid() - immsolve() - get_pop_objs() - extrap_exp_3() - extrap_arctan_3() - ab_zero_eqs_exp_func() - b_zero_eq_artctan_func() -------------------------------------------------------------------------------- -""" # Import packages import os +import requests +from requests.exceptions import RequestException +import json import numpy as np -import pandas as pd import scipy.optimize as opt -from ogcore import parameter_plots as pp -from pandas_datareader import wb +import pandas as pd import matplotlib.pyplot as plt - +from matplotlib.pyplot import plot as pp +import urllib3 +import ssl + +class CustomHttpAdapter (requests.adapters.HTTPAdapter): + # "Transport adapter" that allows us to use custom ssl_context. 
+ + def __init__(self, ssl_context=None, **kwargs): + self.ssl_context = ssl_context + super().__init__(**kwargs) + + def init_poolmanager(self, connections, maxsize, block=False): + self.poolmanager = urllib3.poolmanager.PoolManager( + num_pools=connections, maxsize=maxsize, + block=block, ssl_context=self.ssl_context) + +def get_legacy_session(): + ctx = ssl.create_default_context(ssl.Purpose.SERVER_AUTH) + ctx.options |= 0x4 # OP_LEGACY_SERVER_CONNECT + session = requests.session() + session.mount('https://', CustomHttpAdapter(ctx)) + return session + +locations = 392 # UN code for JPN # create output director for figures CUR_PATH = os.path.split(os.path.abspath(__file__))[0] -DATA_DIR = os.path.join(CUR_PATH, "data", "demographic") OUTPUT_DIR = os.path.join(CUR_PATH, "OUTPUT", "Demographics") if os.access(OUTPUT_DIR, os.F_OK) is False: os.makedirs(OUTPUT_DIR) + """ ------------------------------------------------------------------------ Define functions ------------------------------------------------------------------------ """ - - -def get_un_fert_data( - country_id: str = "392", - start_year: int = 2021, - end_year: int = None, - download: bool = True, -) -> pd.DataFrame: +def get_un_data(indicators, locations, start_year, end_year): """ - Get UN fertility rate data for a country for some range of years (at least - one year) and by age. The country_id=392 is for Japan. These data - come from the United Nations Data Portal API for UN population data (see + This function retrieves data from the United Nations Data Portal API + for UN population data (see https://population.un.org/dataportal/about/dataapi) Args: - country_id (str): 3-digit country id (numerical) - start_year (int): beginning year of the data - end_year (int or None): end year of the data - download (bool): whether to download the data from the UN Data Portal. - If False, a path must be specified in the path_folder argument. 
- path_folder (None or str): string path to folder where data are stored + variable_code (str): variable code for UN data + country_id (str): country id for UN data + start_year (int): start year for UN data + end_year (int): end year for UN data Returns: - fert_rates_df (DataFrame): dataset with fertility rates by age - """ - if end_year is None: - end_year = start_year - # UN variable code for Population by 1-year age groups and sex - pop_code = "47" - # UN variable code for Fertility rates by age of mother (1-year) - fert_code = "68" - - if download: - pop_target = ( - "https://population.un.org/dataportalapi/api/v1/data/indicators/" - + pop_code - + "/locations/" - + country_id - + "/start/" - + str(start_year) - + "/end/" - + str(end_year) - + "?format=csv" - ) - fert_target = ( - "https://population.un.org/dataportalapi/api/v1/data/indicators/" - + fert_code - + "/locations/" - + country_id - + "/start/" - + str(start_year) - + "/end/" - + str(end_year) - + "?format=csv" - ) - else: - pop_target = os.path.join(DATA_DIR, "un_jpn_pop.csv") - fert_target = os.path.join(DATA_DIR, "un_jpn_fert.csv") - - # Convert .csv file to Pandas DataFrame - pop_df = pd.read_csv( - pop_target, - sep="|", - header=1, - usecols=["TimeLabel", "SexId", "Sex", "AgeStart", "Value"], - float_precision="round_trip", - ) - fert_rates_df = pd.read_csv( - fert_target, - sep="|", - header=1, - usecols=["TimeLabel", "AgeStart", "Value"], - float_precision="round_trip", - ) - - # Rename variables in the population and fertility rates data - pop_df.rename( - columns={ - "TimeLabel": "year", - "SexId": "sex_num", - "Sex": "sex_str", - "AgeStart": "age", - "Value": "pop", - }, - inplace=True, - ) - fert_rates_df.rename( - columns={ - "TimeLabel": "year", - "AgeStart": "age", - "Value": "births_p_1000f", - }, - inplace=True, - ) - - # Merge in the male and female population by age data - fert_rates_df = fert_rates_df.merge( - pop_df[["year", "age", "pop"]][pop_df["sex_num"] == 1], - how="left", 
- on=["year", "age"], - ) - fert_rates_df.rename(columns={"pop": "pop_male"}, inplace=True) - fert_rates_df = fert_rates_df.merge( - pop_df[["year", "age", "pop"]][pop_df["sex_num"] == 2], - how="left", - on=["year", "age"], - ) - fert_rates_df.rename(columns={"pop": "pop_female"}, inplace=True) - fert_rates_df["fert_rate"] = fert_rates_df["births_p_1000f"] / ( - 1000 * (1 + (fert_rates_df["pop_male"] / fert_rates_df["pop_female"])) - ) - fert_rates_df = fert_rates_df[ - ( - (fert_rates_df["year"] >= start_year) - & (fert_rates_df["year"] <= end_year) - ) - ] - - return fert_rates_df - - -def get_un_mort_data( - country_id: str = "392", - start_year: int = 2021, - end_year: int = None, - download: bool = True, -) -> pd.DataFrame: + df (Pandas DataFrame): DataFrame of UN data """ - Get UN mortality rate data for a country for some range of years (at least - one year) and by age, and get infant mortality rate data. The - country_id=392 is for Japan. These data come from the United Nations - Population Data Portal API for UN population data (see - https://population.un.org/dataportal/about/dataapi) - Args: - country_id (str): 3-digit country id (numerical) - start_year (int): beginning year of the data - end_year (int or None): end year of the data - download (bool): whether to download the data from the UN Data Portal. - If False, a path must be specified in the path_folder argument. - path_folder (None or str): string path to folder where data are stored - Returns: - infmort_rate_df (DataFrame): dataset with infant mortality rates by yr - mort_rates_df(DataFrame): dataset with mortality rates by age - """ - if end_year is None: - end_year = start_year - # UN variable code for Population by 1-year age groups and sex - pop_code = "47" - # # UN variable code for Age specific mortality rate - # mort_code = "80" - # We use deaths and population to compute mortality rates rather than the - # mortality rates data so that we have the option to have totpers<100. 
- # UN variable code for Deaths by 1-year age groups - deaths_code = "69" - # UN variable code for Age specific mortality rate - infmort_code = "22" - - if download: - pop_target = ( - "https://population.un.org/dataportalapi/api/v1/data/indicators/" - + pop_code - + "/locations/" - + country_id - + "/start/" - + str(start_year) - + "/end/" - + str(end_year) - + "?format=csv" - ) - infmort_target = ( - "https://population.un.org/dataportalapi/api/v1/data/indicators/" - + infmort_code - + "/locations/" - + country_id - + "/start/" - + str(start_year) - + "/end/" - + str(end_year) - + "?format=csv" - ) - deaths_target = ( - "https://population.un.org/dataportalapi/api/v1/data/indicators/" - + deaths_code - + "/locations/" - + country_id - + "/start/" - + str(start_year) - + "/end/" - + str(end_year) - + "?format=csv" - ) - else: - pop_target = os.path.join(DATA_DIR, "un_jpn_pop.csv") - infmort_target = os.path.join(DATA_DIR, "un_jpn_infmort.csv") - deaths_target = os.path.join(DATA_DIR, "un_jpn_deaths.csv") - - # Convert .csv file to Pandas DataFrame - pop_df = pd.read_csv( - pop_target, - sep="|", - header=1, - usecols=["TimeLabel", "SexId", "Sex", "AgeStart", "Value"], - float_precision="round_trip", - ) - infmort_rate_df = pd.read_csv( - infmort_target, - sep="|", - header=1, - usecols=["TimeLabel", "SexId", "Sex", "Value"], - float_precision="round_trip", - ) - deaths_df = pd.read_csv( - deaths_target, - sep="|", - header=1, - usecols=["TimeLabel", "SexId", "Sex", "AgeStart", "Value"], - float_precision="round_trip", - ) - - # Rename variables in the population and mortality rates data - pop_df.rename( - columns={ - "TimeLabel": "year", - "SexId": "sex_num", - "Sex": "sex_str", - "AgeStart": "age", - "Value": "pop", - }, - inplace=True, - ) - infmort_rate_df.rename( - columns={ - "TimeLabel": "year", - "SexId": "sex_num", - "Sex": "sex_str", - "Value": "inf_deaths_p_1000", - }, - inplace=True, - ) - deaths_df.rename( - columns={ - "TimeLabel": "year", - "SexId": 
"sex_num", - "Sex": "sex_str", - "AgeStart": "age", - "Value": "deaths", - }, - inplace=True, - ) - - # Merge in the male and female population by age data to the deaths_df - deaths_df = deaths_df.merge( - pop_df, - how="left", - on=["year", "sex_num", "sex_str", "age"], - ) - deaths_df["mort_rate"] = deaths_df["deaths"] / deaths_df["pop"] - deaths_df = deaths_df[ - ((deaths_df["year"] >= start_year) & (deaths_df["year"] <= end_year)) - ] - mort_rates_df = deaths_df.copy() - - # Clean the data - infmort_rate_df["infmort_rate"] = ( - infmort_rate_df["inf_deaths_p_1000"] / 1000 - ) - - infmort_rate_df = infmort_rate_df[ - ( - (infmort_rate_df["year"] >= start_year) - & (infmort_rate_df["year"] <= end_year) - ) - ] - - return infmort_rate_df, mort_rates_df - - -def get_wb_infmort_rate( - country: str = "JPN", - start_year: int = 2020, - end_year: int = None, - download: bool = True, -) -> np.float64: - """ - Get World Bank infant mortality rate measure from neonatal mortality rate - (deaths per 1,000 live births, divided by 1,0000) - https://data.worldbank.org/indicator/SH.DYN.NMRT - Args: - country (str): 3-digit country id (alphabetic) - start_year (int): beginning year of the data - end_year (int or None): end year of the data - download (bool): whether to download the data from the UN Data Portal. - If False, a path must be specified in the path_folder argument. 
- Returns: - wb_infmort_rate (float): neonatal infant mortality rate - """ - if end_year is None: - end_year = start_year - if download: - wb_infmort_rate = ( - wb.download( - indicator="SH.DYN.NMRT", - country=country, - start=start_year, - end=end_year, - ).squeeze() - / 1000 - ) - else: - # Hard code the infant mortality rate for Japan from the most - # recent year (2020) - wb_infmort_rate = 1.564 - - return wb_infmort_rate - - -def get_un_pop_data( - country_id: str = "392", - start_year: int = 2021, - end_year: int = None, - download: bool = True, -) -> pd.DataFrame: - """ - Get UN population data for a country for some range of years (at least - one year) and by age. The country_id=392 is for Japan. These data - come from the United Nations Data Portal API for UN population data (see - https://population.un.org/dataportal/about/dataapi) - Args: - country_id (str): 3-digit country id (numerical) - start_year (int): beginning year of the data - end_year (int): end year of the data - download (bool): whether to download the data from the UN Data Portal. - If False, a path must be specified in the path_folder argument. 
- path_folder (None or str): string path to folder where data are stored - Returns: - pop_df (DataFrame): dataset with total population by age - """ - if end_year is None: - end_year = start_year - # UN variable code for Population by 1-year age groups and sex - pop_code = "47" - - if download: - pop_target = ( - "https://population.un.org/dataportalapi/api/v1/data/indicators/" - + pop_code - + "/locations/" - + country_id - + "/start/" - + str(start_year) - + "/end/" - + str(end_year) - + "?format=csv" - ) - else: - pop_target = os.path.join(DATA_DIR, "un_jpn_pop.csv") - - # Convert .csv file to Pandas DataFrame - pop_df = pd.read_csv( - pop_target, - sep="|", - header=1, - usecols=["TimeLabel", "SexId", "Sex", "AgeStart", "Value"], - float_precision="round_trip", - ) - - # Rename variables in the population and fertility rates data - pop_df.rename( - columns={ - "TimeLabel": "year", - "SexId": "sex_num", - "Sex": "sex_str", - "AgeStart": "age", - "Value": "pop", - }, - inplace=True, - ) - - # Clean the data - pop_df = pop_df[ - ((pop_df["year"] >= start_year) & (pop_df["year"] <= end_year)) - ] - - return pop_df - - -def get_fert(totpers, start_year=2021, end_year=None, graph=False): + endpoint_url = 'https://population.un.org/dataportalapi/api/v1' + url = endpoint_url + '/data/indicators/' + str(indicators) + '/locations/' + str(locations) + '/start/' + str(start_year) + '/end/' + str(end_year) + '&pageSize=100' + #with ( + # get_legacy_session() as s, + # s.get(url) as r + #): + # j = r.json() + # print(j) + r = requests.get(url) + res = r.json() + try: + df = pd.DataFrame(res) + for page in range(2,99): + url = endpoint_url + '/data/indicators/' + str(indicators) + '/locations/' + str(locations) + '/start/' + str(start_year) + '/end/' + str(end_year) + '?pageNumber=' + page + '&pageSize=100' + r = requests.get(url) + res = r.json() + # Store the next page in a data frame + df_temp = pd.DataFrame(res) + # Append next page to the data frame + df = 
df.append(df_temp) + df.rename(columns=df.iloc[0], inplace = True) + df.drop(df.index[0], inplace = True) + except RequestException as e: + raise SystemExit(e) + return df + #assert len(df)==j['total'], "DataFrame observations do not match total number of records in response" + # keep just what is needed from data + df = df[df.variant == "Median"] + df = df[df.sex == "Both sexes"][["timeLabel", "ageLabel", "value"]] + df.rename({"timeLabel": "year", "ageLabel": "age"}, axis=1, inplace=True) + df.loc[df.age == "100+", "age"] = 100 + df.age = df.age.astype(int) + df.year = df.year.astype(int) + df = df[df.age <= 100] + return df + + +def get_fert(totpers=100, min_age=0, max_age=100, graph=False): """ This function generates a vector of fertility rates by model period age that corresponds to the fertility rate data by age in years. Args: totpers (int): total number of agent life periods (E+S), >= 3 - start_year (int): first year data to download - end_year (int or None): end year data to download + min_age (int): age in years at which agents are born, >= 0 + max_age (int): age in years at which agents die with certainty, + >= 4 graph (bool): =True if want graphical output Returns: fert_rates (Numpy array): fertility rates for each model period of life """ - if totpers > 100: - err_msg = "ERROR get_fert(): totpers must be <= 100." 
- raise ValueError(err_msg) - - # Get UN fertility rates for Japan for ages 15-49 - ages_15_49 = np.arange(15, 50) - fert_rates_15_49 = ( - get_un_fert_data( - start_year=start_year, end_year=end_year, download=False - )["fert_rate"] - .to_numpy() - .flatten() - ) - - # Extrapolate fertility rates for ages 1-14 and 50-100 using exponential - # function - ages_1_14 = np.arange(1, 15) - slope_15 = (fert_rates_15_49[1] - fert_rates_15_49[0]) / ( - ages_15_49[1] - ages_15_49[0] - ) - fert_rates_1_14 = extrap_exp_3( - ages_1_14, (15, fert_rates_15_49[0]), slope_15, (9, 0.0001), low=True - ) - ages_50_100 = np.arange(50, 101) - slope_49 = (fert_rates_15_49[-1] - fert_rates_15_49[-2]) / ( - ages_15_49[-1] - ages_15_49[-2] - ) - fert_rates_50_100 = extrap_exp_3( - ages_50_100, - (49, fert_rates_15_49[-1]), - slope_49, - (57, 0.0001), - low=False, + # Read UN data + df = get_un_data(indicators=68, locations=392, start_year=1990, end_year=2023) + # put in vector + fert_rates = df.value.values + # fill in with zeros for ages < 15 and > 49 + # NOTE: this assumes min_year < 15 and max_age > 49 + fert_rates = np.append(fert_rates, np.zeros(max_age - 49)) + fert_rates = np.append(np.zeros(15 - min_age), fert_rates) + # divide by 1000 because fertility rates are number of births per + # 1000 woman and we want births per person (might update to account + # from fraction men more correctly - below assumes 50/50 men and women) + fert_rates = fert_rates / 2000 + # Rebin data in the case that model period not equal to one calendar + # year + fert_rates = pop_rebin(fert_rates, totpers) + + # if graph: # need to fix plot function for new data output + # pp.plot_fert_rates(fert_rates, age_midp, totpers, min_age, max_age, + # fert_rates, fert_rates, output_dir=OUTPUT_DIR) + + output_dir = OUTPUT_DIR + # Using pyplot here until update to OG-Core mort rates plotting function + plt.plot( + np.arange(totpers), + fert_rates, ) - fert_rates_1_100 = np.hstack( - (fert_rates_1_14, 
fert_rates_15_49, fert_rates_50_100) + plt.xlabel(r"Age $s$") + plt.ylabel(r"Fertility rate") + plt.legend(loc="upper left") + plt.text( + -5, + -0.2, + "Source: United Nations Population Prospects.", + fontsize=9, ) - if totpers == 100: - fert_rates = fert_rates_1_100.copy() - ages = np.arange(1, 101) - elif totpers < 100: - # Create population weighted average fertility rates across bins - # Get population data for ages 1-100 - pop_df = get_un_pop_data( - start_year=start_year, end_year=end_year, download=False - ) - pop_1_100 = ( - pop_df[((pop_df["age"] < 100) & (pop_df["sex_num"] == 3))]["pop"] - .to_numpy() - .flatten() - ) - fert_rates = np.zeros(totpers) - len_subbins = len_subbins = np.float64(100 / totpers) - end_sub_bin = int(0) - end_pct = 0.0 - for i in range(totpers): - if end_pct < 1.0: - beg_sub_bin = int(end_sub_bin) - beg_pct = 1 - end_pct - elif end_pct == 1.0: - beg_sub_bin = 1 + int(end_sub_bin) - beg_pct = 1.0 - end_sub_bin = int((i + 1) * len_subbins) - if (i + 1) * len_subbins - end_sub_bin == 0.0: - end_sub_bin = end_sub_bin - 1 - end_pct = 1 - elif (i + 1) * len_subbins - end_sub_bin > 0.0: - end_pct = (i + 1) * len_subbins - end_sub_bin - fert_rates_sub = fert_rates_1_100[beg_sub_bin : end_sub_bin + 1] - pop_sub = pop_1_100[beg_sub_bin : end_sub_bin + 1] - pop_sub[0] = beg_pct * pop_sub[0] - pop_sub[-1] = end_pct * pop_sub[-1] - fert_rates[i] = ((fert_rates_sub * pop_sub) / pop_sub.sum()).sum() - ages = np.arange(1, totpers + 1) - - if graph: # Plot fertility rates - plt.plot(ages, fert_rates) - plt.scatter(ages, fert_rates, marker="d") - plt.xlabel(r"Age $s$") - plt.ylabel(r"Fertility rate $f_{s}$") - plt.text( - -0, - -0.023, - "Source: UN Population Data", - fontsize=9, - ) - plt.tight_layout(rect=(0, 0.035, 1, 1)) - output_path = os.path.join(OUTPUT_DIR, "fert_rates") - plt.savefig(output_path) - plt.close() + plt.tight_layout(rect=(0, 0.03, 1, 1)) + output_path = os.path.join(output_dir, "fert_rates") + plt.savefig(output_path) + 
plt.close() return fert_rates -def get_mort(totpers, start_year=2021, end_year=None, graph=False): +def get_mort(totpers=100, min_age=0, max_age=100, graph=True): """ This function generates a vector of mortality rates by model period - age. Source: UN Population Data portal. + age. Args: totpers (int): total number of agent life periods (E+S), >= 3 - start_year (int): first year data to download - end_year (int or None): end year data to download + min_age (int): age in years at which agents are born, >= 0 + max_age (int): age in years at which agents die with certainty, + >= 4 graph (bool): =True if want graphical output Returns: mort_rates (Numpy array) mortality rates that correspond to each period of life infmort_rate (scalar): infant mortality rate """ - if totpers > 100: - err_msg = "ERROR get_mort(): totpers must be <= 100." - raise ValueError(err_msg) - - # Get UN infant mortality and mortality rate data by age - un_infmort_rate_df, mort_rates_df = get_un_mort_data( - start_year=start_year, end_year=end_year, download=False - ) - un_infmort_rate = un_infmort_rate_df["infmort_rate"][ - un_infmort_rate_df["sex_num"] == 3 - ].to_numpy()[0] - - # Use World Bank infant mortality rate data (neonatal mortality rate) from - # World Bank World Development Indicators database - most_recent_wb_infmort_datayear = 2020 - if start_year > most_recent_wb_infmort_datayear: - wb_infmort_rate = get_wb_infmort_rate( - start_year=most_recent_wb_infmort_datayear, download=False - ) - else: - wb_infmort_rate = get_wb_infmort_rate( - start_year=start_year, download=False - ) - infmort_rate = wb_infmort_rate - if totpers == 100: - mort_rates = ( - mort_rates_df["mort_rate"][ - ( - (mort_rates_df["sex_num"] == 3) - & (mort_rates_df["age"] < 100) - ) - ] - .to_numpy() - .flatten() - ) - - elif totpers < 100: - # Create population weighted average mortality rates across bins - mort_rates = np.zeros(totpers) - len_subbins = np.float64(100 / totpers) - end_sub_bin = int(0) - end_pct = 
0.0 - deaths_0_99 = ( - mort_rates_df[ - ( - (mort_rates_df["sex_num"] == 3) - & (mort_rates_df["age"] < 100) - ) - ]["deaths"] - .to_numpy() - .flatten() - ) - pop_0_99 = ( - mort_rates_df[ - ( - (mort_rates_df["sex_num"] == 3) - & (mort_rates_df["age"] < 100) - ) - ]["pop"] - .to_numpy() - .flatten() - ) - deaths_pop_0_99 = mort_rates_df[ - ((mort_rates_df["sex_num"] == 3) & (mort_rates_df["age"] < 100)) - ][["age", "deaths", "pop"]] - for i in range(totpers): - if end_pct < 1.0: - beg_sub_bin = int(end_sub_bin) - beg_pct = 1 - end_pct - elif end_pct == 1.0: - beg_sub_bin = 1 + int(end_sub_bin) - beg_pct = 1.0 - end_sub_bin = int((i + 1) * len_subbins) - if (i + 1) * len_subbins - end_sub_bin == 0.0: - end_sub_bin = end_sub_bin - 1 - end_pct = 1 - elif (i + 1) * len_subbins - end_sub_bin > 0.0: - end_pct = (i + 1) * len_subbins - end_sub_bin - deaths_sub = deaths_0_99[beg_sub_bin : end_sub_bin + 1] - pop_sub = pop_0_99[beg_sub_bin : end_sub_bin + 1] - deaths_sub[0] = beg_pct * deaths_sub[0] - pop_sub[0] = beg_pct * pop_sub[0] - deaths_sub[-1] = end_pct * deaths_sub[-1] - pop_sub[-1] = end_pct * pop_sub[-1] - mort_rates[i] = deaths_sub.sum() / pop_sub.sum() - # Artificially set the mortality rate of the oldest age to 1. 
- orig_end_mort_rate = mort_rates[-1] - mort_rates[-1] = 1.0 - ages = np.arange(1, totpers + 1) + # Read UN data + df = get_un_data(indicators=80, locations=392, start_year=1990, end_year=2023) + # put in vector + mort_rates_data = df.value.values + # In UN data, mortality rates for 0 year olds are the infant + # mortality rates + infmort_rate = mort_rates_data[0] + # Rebin data in the case that model period not equal to one calendar + # year + mort_rates = pop_rebin(mort_rates_data[1:], totpers) + + # Mortality rate in last period is set to 1 + mort_rates[-1] = 1 if graph: - mort_rates_all = np.hstack((infmort_rate, mort_rates)) - mort_rates_all[-1] = orig_end_mort_rate - plt.plot(np.arange(0, totpers + 1), mort_rates_all) - plt.scatter( - 0, - infmort_rate, - c="green", - marker="d", - label="Infant mortality rate", - ) - plt.scatter( - ages, - np.hstack((mort_rates[:-1], orig_end_mort_rate)), - c="blue", - marker="d", - label="Mortality rates, model ages 1 to " + str(totpers), - ) - plt.scatter( - totpers, - 1.0, - c="red", - marker="d", - label="Artificial mortality limit, model age " + str(totpers), + output_dir = OUTPUT_DIR + # Using pyplot here until update to OG-Core mort rates plotting function + plt.plot( + df.age.values, + mort_rates_data, ) plt.xlabel(r"Age $s$") plt.ylabel(r"Mortality rate $\rho_{s}$") plt.legend(loc="upper left") plt.text( - 0, - -0.23, - "Source: UN Population Data", + -5, + -0.2, + "Source: United Nations Population Prospects.", fontsize=9, ) - plt.tight_layout(rect=(0, 0.035, 1, 1)) - output_path = os.path.join(OUTPUT_DIR, "mort_rates") - plt.savefig(output_path) - plt.close() + plt.tight_layout(rect=(0, 0.03, 1, 1)) + # Save or return figure + if output_dir: + output_path = os.path.join(output_dir, "mort_rates") + plt.savefig(output_path) + plt.close() + else: + plt.show() return mort_rates, infmort_rate @@ -690,121 +221,117 @@ def pop_rebin(curr_pop_dist, totpers_new): curr_pop_new (Numpy array): new population distribution over 
totpers (E+S) periods that approximates curr_pop_dist """ + # Number of periods in original data assert totpers_new >= 3 # Number of periods in original data totpers_orig = len(curr_pop_dist) if int(totpers_new) == totpers_orig: curr_pop_new = curr_pop_dist elif int(totpers_new) < totpers_orig: - curr_pop_new = np.zeros(totpers_new) - len_subbins = np.float64(totpers_orig / totpers_new) - end_sub_bin = int(0) - end_pct = 0.0 + num_sub_bins = float(10000) + curr_pop_sub = np.repeat( + np.float64(curr_pop_dist) / num_sub_bins, num_sub_bins + ) + len_subbins = (np.float64(totpers_orig * num_sub_bins)) / totpers_new + curr_pop_new = np.zeros(totpers_new, dtype=np.float64) + end_sub_bin = 0 for i in range(totpers_new): - if end_pct < 1.0: - beg_sub_bin = int(end_sub_bin) - beg_pct = 1 - end_pct - elif end_pct == 1.0: - beg_sub_bin = 1 + int(end_sub_bin) - beg_pct = 1.0 - end_sub_bin = int((i + 1) * len_subbins) - if (i + 1) * len_subbins - end_sub_bin == 0.0: - end_sub_bin = end_sub_bin - 1 - end_pct = 1 - elif (i + 1) * len_subbins - end_sub_bin > 0.0: - end_pct = (i + 1) * len_subbins - end_sub_bin - curr_pop_sub = curr_pop_dist[beg_sub_bin : end_sub_bin + 1] - curr_pop_sub[0] = beg_pct * curr_pop_sub[0] - curr_pop_sub[-1] = end_pct * curr_pop_sub[-1] - curr_pop_new[i] = curr_pop_sub.sum() + beg_sub_bin = int(end_sub_bin) + end_sub_bin = int(np.rint((i + 1) * len_subbins)) + curr_pop_new[i] = curr_pop_sub[beg_sub_bin:end_sub_bin].sum() + # Return curr_pop_new to single precision float (float32) + # datatype + curr_pop_new = np.float32(curr_pop_new) return curr_pop_new -def get_imm_resid(totpers, start_year=2021, end_year=None, graph=False): +def get_imm_rates(totpers=100, min_age=0, max_age=100): """ - Calculate immigration rates by age as a residual given population levels in - different periods, then output average calculated immigration rate. We have - to replace the first mortality rate in this function in order to adjust the - first implied immigration rate. 
Source: UN Population Data portal. + Calculate immigration rates by age as a residual given population + levels in different periods, then output average calculated + immigration rate. We have to replace the first mortality rate in + this function in order to adjust the first implied immigration rate Args: totpers (int): total number of agent life periods (E+S), >= 3 - start_year (int): first year data to download - end_year (int or None): end year data to download + min_age (int): age in years at which agents are born, >= 0 + max_age (int): age in years at which agents die with certainty, + >= 4 graph (bool): =True if want graphical output Returns: imm_rates (Numpy array):immigration rates that correspond to each period of life, length E+S """ - pop_df = get_un_pop_data(start_year=2019, end_year=2021, download=False) - pop_2019 = ( - pop_df["pop"][ - ( - (pop_df["sex_num"] == 3) - & (pop_df["year"] == 2019) - & (pop_df["age"] < 100) - ) - ] - .to_numpy() - .flatten() - ) - pop_2020 = ( - pop_df["pop"][ - ( - (pop_df["sex_num"] == 3) - & (pop_df["year"] == 2020) - & (pop_df["age"] < 100) - ) - ] - .to_numpy() - .flatten() - ) - pop_2021 = ( - pop_df["pop"][ - ( - (pop_df["sex_num"] == 3) - & (pop_df["year"] == 2021) - & (pop_df["age"] < 100) - ) - ] - .to_numpy() - .flatten() - ) - pop_2019_EpS = pop_rebin(pop_2019, totpers) - pop_2020_EpS = pop_rebin(pop_2020, totpers) - pop_2021_EpS = pop_rebin(pop_2021, totpers) - - fert_rates = get_fert(totpers, start_year=start_year, end_year=end_year) - mort_rates, infmort_rate = get_mort( - totpers, start_year=start_year, end_year=end_year - ) - - imm_rate_1_2020 = ( - pop_2021_EpS[0] - - (1 - infmort_rate) * (fert_rates * pop_2020_EpS).sum() - ) / pop_2020_EpS[0] - imm_rate_1_2019 = ( - pop_2020_EpS[0] - - (1 - infmort_rate) * (fert_rates * pop_2019_EpS).sum() - ) / pop_2019_EpS[0] - imm_rate_1 = (imm_rate_1_2020 + imm_rate_1_2019) / 2 - - imm_rates_sp1_2020 = ( - pop_2021_EpS[1:] - (1 - mort_rates[:-1]) * pop_2020_EpS[:-1] 
- ) / pop_2020_EpS[:-1] - imm_rates_sp1_2019 = ( - pop_2020_EpS[1:] - (1 - mort_rates[:-1]) * pop_2019_EpS[:-1] - ) / pop_2019_EpS[:-1] - imm_rates_sp1 = (imm_rates_sp1_2020 + imm_rates_sp1_2019) / 2 - imm_rates = np.hstack((imm_rate_1, imm_rates_sp1)) - if graph: - ages = np.arange(1, totpers + 1) - plt.plot(ages, imm_rates, label="Residual data") - plt.xlabel(r"Age $s$") - plt.ylabel(r"immigration rates $\i_s$") - output_path = os.path.join(OUTPUT_DIR, "imm_rates") - plt.savefig(output_path) - plt.close() + # Read UN data + start_year = 1990 + end_year = 2023 + num_years = end_year - start_year + 1 + df = get_un_data(indicators=47, locations=392, start_year=1990, end_year=2023) + + # separate pop dist by year and put into dictionary of arrays + pop_dict = {} + for t in range(num_years): + pop_dist = df[ + (df.year == start_year + t) & (df.age <= 100) & (df.age > 0) + ].value.values + pop_dict[t] = pop_rebin(pop_dist, totpers) + pop_dict[t] = pop_dist + + # Create num_years - 1 years of estimated immigration rates for youngest age + # individuals + imm_mat = np.zeros((num_years - 1, totpers)) + pop_list = [] + for t in range(num_years): + pop_list.append(pop_dict[t][0]) + pop11vec = np.array(pop_list[:-1]) + pop21vec = np.array(pop_list[1:]) + fert_rates = get_fert(totpers, min_age, max_age, False) + mort_rates, infmort_rate = get_mort(totpers, min_age, max_age, False) + newbornvec = np.dot( + fert_rates, np.vstack((pop_dict[0], pop_dict[1], pop_dict[2])).T + ) + imm_mat[:, 0] = (pop21vec - (1 - infmort_rate) * newbornvec) / pop11vec + # Estimate num_years - 1 years of immigration rates for all other-aged + # individuals + pop_mat_dict = {} + pop_mat_dict[0] = np.vstack( + (pop_dict[0][:-1], pop_dict[1][:-1], pop_dict[2][:-1]) + ) + pop_mat_dict[1] = np.vstack( + (pop_dict[0][1:], pop_dict[1][1:], pop_dict[2][1:]) + ) + pop_mat_dict[2] = np.vstack( + (pop_dict[1][1:], pop_dict[2][1:], pop_dict[3][1:]) + ) + mort_mat = np.tile(mort_rates[:-1], (num_years - 1, 1)) + 
imm_mat[:, 1:] = ( + pop_mat_dict[2] - (1 - mort_mat) * pop_mat_dict[0] + ) / pop_mat_dict[1] + # Final estimated immigration rates are the averages over 3 years + imm_rates = imm_mat.mean(axis=0) + # replace highest agg imm rate bc it doesn't make sense + # imm_rates[-1] = imm_rates[-2] + + output_dir = OUTPUT_DIR + # Using pyplot here until update to OG-Core mort rates plotting function + plt.plot( + np.arange(totpers), + imm_rates, + label="Estimated", + ) + plt.xlabel(r"Age $s$") + plt.ylabel(r"Immigration Rates") + plt.legend(loc="upper left") + plt.text( + -5, + -0.2, + "Source: United Nations Population Prospects.", + fontsize=9, + ) + plt.tight_layout(rect=(0, 0.03, 1, 1)) + output_path = os.path.join(output_dir, "imm_rates_w_un_data.png") + plt.savefig(output_path) + plt.close() return imm_rates @@ -815,19 +342,19 @@ def immsolve(imm_rates, *args): difference in two consecutive periods stationary population distributions. This vector of differences is the zero-function objective used to solve for the immigration rates vector, similar to - the original immigration rates vector from get_imm_resid(), that + the original immigration rates vector from get_imm_rates(), that sets the steady-state population distribution by age equal to the population distribution in period int(1.5*S) Args: imm_rates (Numpy array):immigration rates that correspond to each period of life, length E+S args (tuple): (fert_rates, mort_rates, infmort_rate, omega_cur, - g_n_ss) + g_n_SS) Returns: omega_errs (Numpy array): difference between omega_new and omega_cur_pct, length E+S """ - fert_rates, mort_rates, infmort_rate, omega_cur_lev, g_n_ss = args + fert_rates, mort_rates, infmort_rate, omega_cur_lev, g_n_SS = args omega_cur_pct = omega_cur_lev / omega_cur_lev.sum() totpers = len(fert_rates) OMEGA = np.zeros((totpers, totpers)) @@ -836,23 +363,35 @@ def immsolve(imm_rates, *args): ) OMEGA[1:, :-1] += np.diag(1 - mort_rates[:-1]) OMEGA[1:, 1:] += np.diag(imm_rates[1:]) - omega_new = 
np.dot(OMEGA, omega_cur_pct) / (1 + g_n_ss) + omega_new = np.dot(OMEGA, omega_cur_pct) / (1 + g_n_SS) omega_errs = omega_new - omega_cur_pct return omega_errs -def get_pop_objs(E, S, T, curr_year, GraphDiag=False): +def get_pop_objs( + E=20, + S=80, + T=320, + min_age=0, + max_age=100, + data_year=1990, + model_year=2023, + GraphDiag=True, +): """ - This function produces the demographics objects to be used in the OG-JPN - model package. + This function produces the demographics objects to be used in the + OG-JPN model package. Args: E (int): number of model periods in which agent is not economically active, >= 1 S (int): number of model periods in which agent is economically active, >= 3 T (int): number of periods to be simulated in TPI, > 2*S - curr_year (int): current year for which analysis will begin, + min_age (int): age in years at which agents are born, >= 0 + max_age (int): age in years at which agents die with certainty, + >= 4 + model_year (int): current year for which analysis will begin, >= 2016 GraphDiag (bool): =True if want graphical output and printed diagnostics @@ -861,7 +400,7 @@ def get_pop_objs(E, S, T, curr_year, GraphDiag=False): omega_path_S (Numpy array), time path of the population distribution from the current state to the steady-state, size T+S x S - g_n_ss (scalar): steady-state population growth rate + g_n_SS (scalar): steady-state population growth rate omega_SS (Numpy array): normalized steady-state population distribution, length S surv_rates (Numpy array): survival rates that correspond to @@ -871,75 +410,64 @@ def get_pop_objs(E, S, T, curr_year, GraphDiag=False): g_n_path (Numpy array): population growth rates over the time path, length T + S """ - assert curr_year >= 2021 - most_recent_data_year = 2021 - hardcode_start_year = min(curr_year, most_recent_data_year) - fert_rates = get_fert(E + S, start_year=hardcode_start_year) - mort_rates, infmort_rate = get_mort(E + S, start_year=hardcode_start_year) + #assert model_year >= 
2011 and model_year <= 2100 + #assert data_year >= 2011 and data_year <= 2100 + # need data year to be before model year to get omega_S_preTP + assert data_year < model_year + + # Get fertility, mortality, and immigration rates + # will be used to generate population distribution in future years + fert_rates = get_fert(E + S, min_age, max_age) + mort_rates, infmort_rate = get_mort(E + S, min_age, max_age) mort_rates_S = mort_rates[-S:] - imm_rates_orig = get_imm_resid(E + S, start_year=hardcode_start_year) + imm_rates_orig = get_imm_rates(E + S, min_age, max_age) OMEGA_orig = np.zeros((E + S, E + S)) - OMEGA_orig[0, :] = (1 - infmort_rate) * fert_rates + OMEGA_orig[0, :] = (1 - infmort_rate) * fert_rates + np.hstack( + (imm_rates_orig[0], np.zeros(E + S - 1)) + ) OMEGA_orig[1:, :-1] += np.diag(1 - mort_rates[:-1]) - OMEGA_orig += np.diag(imm_rates_orig) + OMEGA_orig[1:, 1:] += np.diag(imm_rates_orig[1:]) # Solve for steady-state population growth rate and steady-state # population distribution by age using eigenvalue and eigenvector # decomposition - eigvalues_orig, eigvectors_orig = np.linalg.eig(OMEGA_orig) - g_n_ss_orig = (eigvalues_orig[np.isreal(eigvalues_orig)].real).max() - 1 - eigvec_raw_orig = eigvectors_orig[ - :, (eigvalues_orig[np.isreal(eigvalues_orig)].real).argmax() + eigvalues, eigvectors = np.linalg.eig(OMEGA_orig) + g_n_SS = (eigvalues[np.isreal(eigvalues)].real).max() - 1 + eigvec_raw = eigvectors[ + :, (eigvalues[np.isreal(eigvalues)].real).argmax() ].real - omega_SS_orig = eigvec_raw_orig / eigvec_raw_orig.sum() + omega_SS_orig = eigvec_raw / eigvec_raw.sum() # Generate time path of the nonstationary population distribution omega_path_lev = np.zeros((E + S, T + S)) - pop_df = get_un_pop_data(start_year=2019, end_year=2021, download=False) - pop_2020 = ( - pop_df["pop"][ - ( - (pop_df["sex_num"] == 3) - & (pop_df["year"] == 2020) - & (pop_df["age"] < 100) - ) - ] - .to_numpy() - .flatten() - ) - pop_2021 = ( - pop_df["pop"][ - ( - 
(pop_df["sex_num"] == 3)
- & (pop_df["year"] == 2021)
- & (pop_df["age"] < 100)
- )
- ]
- .to_numpy()
- .flatten()
- )
+ pop_data = get_un_data("47", locations, start_year=data_year, end_year=data_year)
+ # TODO: allow one to read in multiple years of UN forecast then
+ # extrapolate from the end of that
+ pop_data_sample = pop_data[
+ (pop_data["age"] >= min_age - 1) & (pop_data["age"] <= max_age - 1)
+ ]
+ pop = pop_data_sample.value.values
+ # Generate the current population distribution given that E+S might
+ # be less than max_age-min_age+1
 age_per_EpS = np.arange(1, E + S + 1)
- pop_2020_EpS = pop_rebin(pop_2020, E + S)
- pop_2021_EpS = pop_rebin(pop_2021, E + S)
- pop_2021_pct = pop_2021_EpS / pop_2021_EpS.sum()
- # Age most recent population data to the current year of analysis
- pop_curr = pop_2020_EpS.copy()
- # pop_past = pop_2020_EpS.copy()
- if curr_year == most_recent_data_year:
- pop_past = pop_curr.copy()
- pop_curr = np.dot(OMEGA_orig, pop_past)
- g_n_curr = (pop_curr[-S:].sum() - pop_past[-S:].sum()) / pop_past[
+ pop_EpS = pop_rebin(pop, E + S)
+ pop_pct = pop_EpS / pop_EpS.sum()
+ # Age the data to the model year
+ pop_curr = pop_EpS.copy()
+ pop_next = np.dot(OMEGA_orig, pop_curr)
+ g_n_curr = (pop_next[-S:].sum() - pop_curr[-S:].sum()) / pop_curr[
+ -S:
+ ].sum() # g_n in data year
+ pop_past = pop_curr
+ for per in range(model_year - data_year):
+ pop_next = np.dot(OMEGA_orig, pop_curr)
+ g_n_curr = (pop_next[-S:].sum() - pop_curr[-S:].sum()) / pop_curr[
 -S:
 ].sum()
- omega_path_lev[:, 0] = pop_curr
- elif curr_year > most_recent_data_year:
- for per in range(curr_year - most_recent_data_year):
- pop_past = pop_curr.copy()
- pop_curr = np.dot(OMEGA_orig, pop_past)
- g_n_curr = (pop_curr[-S:].sum() - pop_past[-S:].sum()) / pop_past[
- -S:
- ].sum()
- omega_path_lev[:, 0] = pop_curr
+ pop_past = pop_curr
+ pop_curr = pop_next
+ # Generate time path of the population distribution
+ omega_path_lev[:, 0] = pop_curr.copy()
 for per in range(1, T + S):
 pop_next
= np.dot(OMEGA_orig, pop_curr) omega_path_lev[:, per] = pop_next.copy() @@ -956,7 +484,7 @@ def get_pop_objs(E, S, T, curr_year, GraphDiag=False): mort_rates, infmort_rate, omega_path_lev[:, fixper], - g_n_ss_orig, + g_n_SS, ) imm_fulloutput = opt.fsolve( immsolve, @@ -979,16 +507,8 @@ def get_pop_objs(E, S, T, curr_year, GraphDiag=False): omega_path_lev[-S:, 1:].sum(axis=0) - omega_path_lev[-S:, :-1].sum(axis=0) ) / omega_path_lev[-S:, :-1].sum(axis=0) - # Compute adjusted population growth rate - OMEGA2 = np.zeros((E + S, E + S)) - OMEGA2[0, :] = (1 - infmort_rate) * fert_rates - OMEGA2[1:, :-1] += np.diag(1 - mort_rates[:-1]) - OMEGA2 += np.diag(imm_rates_adj) - eigvalues2, eigvectors2 = np.linalg.eig(OMEGA2) - g_n_ss_adj = (eigvalues2[np.isreal(eigvalues2)].real).max() - 1 - g_n_ss = g_n_ss_adj.copy() - # g_n_path[fixper + 1 :] = g_n_ss - omega_S_preTP = pop_past[-S:] / pop_past[-S:].sum() + g_n_path[fixper + 1 :] = g_n_SS + omega_S_preTP = (pop_past.copy()[-S:]) / (pop_past.copy()[-S:].sum()) imm_rates_mat = np.hstack( ( np.tile(np.reshape(imm_rates_orig[E:], (S, 1)), (1, fixper)), @@ -1027,7 +547,7 @@ def get_pop_objs(E, S, T, curr_year, GraphDiag=False): omegaSSvTmaxdiff = np.absolute(omega_SS_orig - omega_SSfx).max() if omegaSSvTmaxdiff > 0.0003: print( - "POP. WARNING: The maximimum absolute difference " + "POP. 
WARNING: The maximum absolute difference "
 + "between any two corresponding points in the original"
 + " and adjusted steady-state population "
 + "distributions is"
@@ -1067,20 +587,28 @@ def get_pop_objs(E, S, T, curr_year, GraphDiag=False):
 # Test whether the steady-state growth rates implied by the
 # adjusted OMEGA matrix equals the steady-state growth rate of
 # the original OMEGA matrix
- if np.max(np.absolute(g_n_ss_adj - g_n_ss_orig)) > 10 ** (-8):
+ OMEGA2 = np.zeros((E + S, E + S))
+ OMEGA2[0, :] = (1 - infmort_rate) * fert_rates + np.hstack(
+ (imm_rates_adj[0], np.zeros(E + S - 1))
+ )
+ OMEGA2[1:, :-1] += np.diag(1 - mort_rates[:-1])
+ OMEGA2[1:, 1:] += np.diag(imm_rates_adj[1:])
+ eigvalues2, eigvectors2 = np.linalg.eig(OMEGA2)
+ g_n_SS_adj = (eigvalues2[np.isreal(eigvalues2)].real).max() - 1
+ if np.max(np.absolute(g_n_SS_adj - g_n_SS)) > 10 ** (-8):
 print(
 "FAILURE: The steady-state population growth rate"
 + " from adjusted OMEGA is different (diff is "
- + str(g_n_ss_adj - g_n_ss_orig)
+ + str(g_n_SS_adj - g_n_SS)
 + ") than the steady-"
 + "state population growth rate from the original"
 + " OMEGA."
 )
- elif np.max(np.absolute(g_n_ss_adj - g_n_ss_orig)) <= 10 ** (-8):
+ elif np.max(np.absolute(g_n_SS_adj - g_n_SS)) <= 10 ** (-8):
 print(
 "SUCCESS: The steady-state population growth rate"
 + " from adjusted OMEGA is close to (diff is "
- + str(g_n_ss_adj - g_n_ss_orig)
+ + str(g_n_SS_adj - g_n_SS)
 + ") the steady-"
 + "state population growth rate from the original"
 + " OMEGA."
@@ -1125,20 +653,20 @@ def get_pop_objs(E, S, T, curr_year, GraphDiag=False): ) pp.plot_population_path( age_per_EpS, - pop_2021_pct, + pop_pct, omega_path_lev, omega_SSfx, - curr_year, + model_year, E, S, output_dir=OUTPUT_DIR, ) - # return omega_path_S, g_n_ss, omega_SSfx, survival rates, + # return omega_path_S, g_n_SS, omega_SSfx, survival rates, # mort_rates_S, and g_n_path pop_dict = { "omega": omega_path_S.T, - "g_n_ss": g_n_ss, + "g_n_ss": g_n_SS, "omega_SS": omega_SSfx[-S:] / omega_SSfx[-S:].sum(), "surv_rate": 1 - mort_rates_S, "rho": mort_rates_S, @@ -1147,246 +675,4 @@ def get_pop_objs(E, S, T, curr_year, GraphDiag=False): "omega_S_preTP": omega_S_preTP, } - return pop_dict - - -def extrap_exp_3( - x_vals, con_val: tuple, con_slope: float, eps_val: tuple, low: bool = True -): - """ - This function fits a smooth exponential extrapolation to either the low end - of data or the high end of data, both of which are monotonically - asymptoting to zero. For the exponential function extrapolation on both - ends of the distribution, we use the function: - f(x) = e ** (a * (x ** 2) + b * x + c) - s.t. (i) f(x_con) = y_con - (ii) f'(x_con) = con_slope - (iii) f'(x_eps) = eps_low (>0) or eps_high (<0) - Args: - x_vals (array_like): array of x values to be extrapolated - con_val (tuple): (x, y) coordinate at which the function must connect - to the data - con_slope (float): slope of the data at the connecting value - eps_val (tuple): (x, y) coordinate at which the function must be close - to zero - low (bool): If True, the function is fit to the low end of the data. - If False, the function is fit to the high end of the data. - Returns: - y_vals (array_like): extrapolated y values corresponding to x values - """ - if low: - if con_slope <= 0: - err_msg = ( - "ERROR extrap_exp_3: con_slope must be positive if " - + "extrapolating to the low end of the data." 
- ) - raise ValueError(err_msg) - else: - if con_slope >= 0: - err_msg = ( - "ERROR extrap_exp_3: con_slope must be negative if " - + "extrapolating to the high end of the data." - ) - raise ValueError(err_msg) - - eps_slope_low = 0.0001 - eps_slope_high = -eps_slope_low - - # Unpack the coordinates - x_con, y_con = con_val - x_eps, y_eps = eps_val - - # check if linear extrapolation intersects zero beyond x_eps - lin_y_intercept = y_con - con_slope * x_con - x_intercept = -lin_y_intercept / con_slope - if low: - lin_extrap_overshoot = x_intercept < x_eps - else: - lin_extrap_overshoot = x_intercept > x_eps - if lin_extrap_overshoot: - # Estimate an arctangent function to fit the data - print( - "WARNING: extrap_exp_3: Linear extrapolation overshoots " - + "furthest value. Using arctangent function instead." - ) - y_vals = extrap_arctan_3(x_vals, con_slope, x_con, y_con, x_eps, low) - else: - # Estimate an exponential function to fit the data - if low: - params = [con_slope, x_con, y_con, x_eps, eps_slope_low] - else: - params = [con_slope, x_con, y_con, x_eps, eps_slope_high] - a_guess = 0.1 - b_guess = 0.1 - ab_guess = np.array([a_guess, b_guess]) - solution = opt.root( - ab_zero_eqs_exp_func, ab_guess, args=params, method="lm" - ) - if not solution.success: - err_msg = ( - "ERROR extrap_exp_3: Root finder failed in " - + "ab_zero_eqs_exp_func." 
- ) - raise ValueError(err_msg) - a, b = solution.x - if low: - # a = np.log(con_slope / eps_low) / (x_con - x_eps) - y_pos_ind = x_vals >= x_eps - else: - # a = np.log(con_slope / eps_high) / (x_con - x_eps) - y_pos_ind = x_vals <= x_eps - # b = np.log(con_slope / (a * np.exp(a * x_con))) - # c = y_con - np.exp(a * x_con + b) - c = np.log(y_con) - a * (x_con**2) - b * x_con - - len_x_vals = len(x_vals) - len_y_pos_ind = y_pos_ind.sum() - if low: - y_vals = np.hstack( - ( - np.zeros(len_x_vals - len_y_pos_ind), - np.exp( - a * (x_vals[y_pos_ind] ** 2) - + b * x_vals[y_pos_ind] - + c - ), - ) - ) - else: - y_vals = np.hstack( - ( - np.exp( - a * (x_vals[y_pos_ind] ** 2) - + b * x_vals[y_pos_ind] - + c - ), - np.zeros(len_x_vals - len_y_pos_ind), - ) - ) - - return y_vals - - -def extrap_arctan_3( - x_vals, con_slope: float, x_con, y_con, x_eps, low: bool = True -): - """ - This function fits an arctangent function to extrapolate data that - monotonically decrease to zero and start with small absolute slope, then - absolute slope increases, then absolute slope decreases to zero. The - arctangent function is the following with the three following identifying - conditions: - if extrapolating to the low end of the data: - f(x) = (a / pi) * arctan(b * x + c) + (a / 2) s.t. a, b > 0 - where f'(x) = (a * b) / (pi * (1 + (b * x + c)^2)) - if extrapolating to the high end of the data: - f(x) = (-a / pi) * arctan(b * x + c) + (a / 2) s.t. a, b > 0 - where f'(x) = (-a * b) / (pi * (1 + (b * x + c)^2)) - s.t. (i) f(x_con) = y_con - and (ii) f'(x_con) = con_slope - and (iii) b * (2/3 * x_con + 1/3 * x_eps) + c = 0 - The solution to this problem can be reduced to finding the root of a - univariate equation in the parameter b. 
- Args: - x_vals (array_like): array of x values to be extrapolated - con_slope (float): slope of the data at the connecting value - x_con (float): x value at which the function must connect to the data - y_con (float): y value at which the function must connect to the data - x_eps (float): x value at which the function must be close to zero - low (bool): If True, the function is fit to the low end of the data. - If False, the function is fit to the high end of the data. - Returns: - y_vals (array_like): extrapolated y values corresponding to x values - """ - y_vals = np.zeros_like(x_vals) - - # Solve for the parameter b - params = [con_slope, x_con, y_con, x_eps, low] - b_guess = 20.0 - solution = opt.root(b_zero_eq_arctan_func, b_guess, args=params) - if not solution.success: - err_msg = ( - "ERROR extrap_arctan_3: Root finder failed in " - + "b_zero_eq_arctan_func." - ) - raise ValueError(err_msg) - b = solution.x - - len_x_vals = len(x_vals) - - if low: - a = y_con / ( - (1 / np.pi) * np.arctan((b / 3) * (x_con - x_eps)) + (1 / 2) - ) - c = -b * ((2 / 3) * x_con + (1 / 3) * x_eps) - y_pos_ind = x_vals >= x_eps - len_y_pos_ind = y_pos_ind.sum() - y_vals = np.hstack( - ( - np.zeros(len_x_vals - len_y_pos_ind), - (a / np.pi) * np.arctan(b * x_vals[y_pos_ind] + c) + (a / 2), - ) - ) - else: - a = y_con / ( - (-1 / np.pi) * np.arctan((b / 3) * (x_con - x_eps)) + (1 / 2) - ) - c = -b * ((2 / 3) * x_con + (1 / 3) * x_eps) - y_pos_ind = x_vals <= x_eps - len_y_pos_ind = y_pos_ind.sum() - y_vals = np.hstack( - ( - (-a / np.pi) * np.arctan(b * x_vals[y_pos_ind] + c) + (a / 2), - np.zeros(len_x_vals - len_y_pos_ind), - ) - ) - - return y_vals - - -def ab_zero_eqs_exp_func(ab_vals, params): - """ " - This function returns a vector of error values for the two zero equations - in terms of parameters a and b for given values of a and b. 
- """ - con_slope, x_con, y_con, x_eps, eps = params - a, b = ab_vals - - c = np.log(y_con) - a * (x_con**2) - b * x_con - error_1 = (2 * a * x_con + b) * np.exp( - a * (x_con**2) + b * x_con + c - ) - con_slope - error_2 = (2 * a * x_eps + b) * np.exp( - a * (x_eps**2) + b * x_eps + c - ) - eps - - error_vec = np.array([error_1, error_2]) - - return error_vec - - -def b_zero_eq_arctan_func(b, params): - """ " - This function returns a scalar error value of the univariate error function - in parameter b for given values of b. - """ - con_slope, x_con, y_con, x_eps, low = params - - if low: - a = y_con / ( - (1 / np.pi) * np.arctan((b / 3) * (x_con - x_eps)) + (1 / 2) - ) - a_other = ( - con_slope * np.pi * (1 + ((b / 3) ** 2) * ((x_con - x_eps) ** 2)) - ) / b - error_val = a_other - a - else: - a = y_con / ( - (-1 / np.pi) * np.arctan((b / 3) * (x_con - x_eps)) + (1 / 2) - ) - a_other = ( - -con_slope * np.pi * (1 + ((b / 3) ** 2) * ((x_con - x_eps) ** 2)) - ) / b - error_val = a_other - a - - return error_val \ No newline at end of file + return pop_dict \ No newline at end of file