Merge pull request #234 from USEPA/v1.2.2_release
V1.2.2 release
bl-young authored Jun 15, 2022
2 parents 06bb019 + d865063 commit cc0e5cb
Showing 45 changed files with 2,306 additions and 259 deletions.
17 changes: 11 additions & 6 deletions flowsa/__init__.py
@@ -54,25 +54,30 @@ def getFlowByActivity(datasource, year, flowclass=None, geographic_level=None,
     fba = load_preprocessed_output(fba_meta, paths)
     # If that didn't work, try to download a remote version of FBA
     if fba is None and download_FBA_if_missing:
-        log.info('%s %s not found in %s, downloading from remote source',
-                 datasource, str(year), fbaoutputpath)
+        log.info(f'{datasource} {str(year)} not found in {fbaoutputpath}, '
+                 'downloading from remote source')
         download_from_remote(fba_meta, paths)
         fba = load_preprocessed_output(fba_meta, paths)
     # If that didn't work or wasn't allowed, try to construct the FBA
     if fba is None:
-        log.info('%s %s not found in %s, running functions to generate FBA',
-                 datasource, str(year), fbaoutputpath)
+        log.info(f'{datasource} {str(year)} not found in {fbaoutputpath}, '
+                 'running functions to generate FBA')
         # Generate the fba
         flowsa.flowbyactivity.main(year=year, source=datasource)
         # Now load the fba
         fba = load_preprocessed_output(fba_meta, paths)
     # If none of the above worked, log an error message
     if fba is None:
-        log.error('getFlowByActivity failed, FBA not found')
+        raise flowsa.exceptions.FBANotAvailableError(method=datasource,
+                                                     year=year)
     # Otherwise (that is, if one of the above methods successfuly loaded the
     # FBA), log it.
     else:
-        log.info('Loaded %s %s from %s', datasource, str(year), fbaoutputpath)
+        log.info(f'Loaded {datasource} {str(year)} from {fbaoutputpath}')
+
+    if len(fba) == 0:
+        raise flowsa.exceptions.FBANotAvailableError(
+            message=f"Error generating {datasource} for {str(year)}")

     # Address optional parameters
     if flowclass is not None:
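The failure mode here is worth noting: `getFlowByActivity` now raises `FBANotAvailableError` when an FBA cannot be loaded locally, pulled from the remote source, or generated, or when the generated table comes back empty, instead of logging an error and returning `None`. A minimal sketch of calling the public API under the new behavior (the source name and year are illustrative):

```python
import flowsa
import flowsa.exceptions

try:
    # 'BLS_QCEW' and 2017 are illustrative; any registered FBA source works
    fba = flowsa.getFlowByActivity('BLS_QCEW', 2017,
                                   download_FBA_if_missing=True)
except flowsa.exceptions.FBANotAvailableError as err:
    # raised when the FBA is missing locally and remotely, cannot be
    # generated, or the generated table is empty
    print(err)
```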
10 changes: 5 additions & 5 deletions flowsa/common.py
@@ -12,6 +12,7 @@
 from dotenv import load_dotenv
 from esupy.processed_data_mgmt import create_paths_if_missing
 import flowsa.flowsa_yaml as flowsa_yaml
+import flowsa.exceptions
 from flowsa.schema import flow_by_activity_fields, flow_by_sector_fields, \
     flow_by_sector_collapsed_fields, flow_by_activity_mapped_fields, \
     flow_by_activity_wsec_fields, flow_by_activity_mapped_wsec_fields, \
@@ -51,8 +52,7 @@ def load_api_key(api_source):
     load_dotenv(f'{MODULEPATH}API_Keys.env', verbose=True)
     key = os.getenv(api_source)
     if key is None:
-        log.error(f"Key file {api_source} not found. See github wiki for help "
-                  "https://github.com/USEPA/flowsa/wiki/Using-FLOWSA#api-keys")
+        raise flowsa.exceptions.APIError(api_source=api_source)
     return key


@@ -131,9 +131,9 @@ def load_yaml_dict(filename, flowbytype=None, filepath=None):
     try:
         with open(yaml_path, 'r') as f:
             config = flowsa_yaml.load(f, filepath)
-    except IOError:
-        log.error(f'{flowbytype} method file not found')
-        raise
+    except FileNotFoundError:
+        raise flowsa.exceptions.FlowsaMethodNotFoundError(
+            method_type=flowbytype, method=filename)
     return config
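Both helpers now fail fast with typed exceptions instead of logging and returning quietly. A sketch of catching them, assuming the usual `'FBS'` value for `flowbytype`; the key name and method name below are hypothetical:

```python
import flowsa.exceptions
from flowsa.common import load_api_key, load_yaml_dict

try:
    key = load_api_key('Census')  # hypothetical API key name
except flowsa.exceptions.APIError:
    key = None  # e.g. prompt the user to register a key

try:
    config = load_yaml_dict('Water_national_2015', flowbytype='FBS')
except flowsa.exceptions.FlowsaMethodNotFoundError:
    config = None  # hypothetical method name not found on disk
```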
1,117 changes: 1,117 additions & 0 deletions flowsa/data/activitytosectormapping/NAICS_Crosswalk_BEA_2012_Summary.csv

Large diffs are not rendered by default.

18 changes: 18 additions & 0 deletions flowsa/data/source_catalog.yaml
@@ -121,6 +121,18 @@ EPA_GHGI:
   sector-like_activities: False
   activity_schema:
   sector_aggregation_level: "aggregated"
+EPA_StateGHGI:
+  class:
+    - Chemicals
+  sector-like_activities: False
+  activity_schema:
+  sector_aggregation_level: "aggregated"
+EPA_SIT:
+  class:
+    - Chemicals
+  sector-like_activities: False
+  activity_schema:
+  sector_aggregation_level: "aggregated"
 EPA_NEI_Nonpoint:
   class:
     - Chemicals
@@ -170,6 +182,12 @@ StatCan_LFS:
   sector-like_activities: False
   activity_schema:
   sector_aggregation_level: "aggregated"
+stateio:
+  class:
+    - Money
+  sector-like_activities: False  # update to true once alternate activity_schema in place
+  # activity_schema: BEA_2012_Summary_Code
+  sector_aggregation_level: "disaggregated"
 USDA_CoA_Cropland:
   class:
     - Land
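The three new catalog entries are plain data consumed elsewhere in flowsa. A sketch of inspecting them, under the assumption that `load_yaml_dict('source_catalog')` resolves the bundled `flowsa/data/source_catalog.yaml` the way it does for other packaged YAML files:

```python
from flowsa.common import load_yaml_dict

# assumption: 'source_catalog' maps to flowsa/data/source_catalog.yaml
catalog = load_yaml_dict('source_catalog')
for source in ('EPA_StateGHGI', 'EPA_SIT', 'stateio'):  # new this release
    entry = catalog.get(source, {})
    print(source, entry.get('class'), entry.get('sector_aggregation_level'))
```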
48 changes: 20 additions & 28 deletions flowsa/data_source_scripts/BEA.py
@@ -62,28 +62,8 @@ def bea_use_detail_br_parse(*, year, **_):
                f'_Detail_Use_PRO_BeforeRedef.csv'
     df_raw = pd.read_csv(csv_load)

-    # first column is the commodity being consumed
-    df = df_raw.rename(columns={'Unnamed: 0': 'ActivityProducedBy'})
-
-    # use "melt" fxn to convert colummns into rows
-    df = df.melt(id_vars=["ActivityProducedBy"],
-                 var_name="ActivityConsumedBy",
-                 value_name="FlowAmount")
-
-    df['Year'] = str(year)
-    # hardcode data
-    df['FlowName'] = "USD" + str(year)
-    df["Class"] = "Money"
-    df["FlowType"] = "TECHNOSPHERE_FLOW"
-    df['Description'] = 'BEA_2012_Detail_Code'
+    df = bea_detail_parse(df_raw, year)
     df["SourceName"] = "BEA_Use_Detail_PRO_BeforeRedef"
     df["Location"] = US_FIPS
-    df['LocationSystem'] = "FIPS_2015"
-    # original unit in million USD
-    df['FlowAmount'] = df['FlowAmount'] * 1000000
-    df["Unit"] = "USD"
-    df['DataReliability'] = 5  # tmp
-    df['DataCollection'] = 5  # tmp

     return df
@@ -96,10 +76,17 @@ def bea_make_detail_br_parse(*, year, **_):
         flowbyactivity specifications
     """
     # Read directly into a pandas df
-    df_raw = pd.read_csv(externaldatapath + "BEA_" + str(year) +
-                         "_Detail_Make_BeforeRedef.csv")
+    csv_load = f'{externaldatapath}BEA_{str(year)}' \
+               f'_Detail_Make_BeforeRedef.csv'
+    df_raw = pd.read_csv(csv_load)

-    # first column is the industry
+    df = bea_detail_parse(df_raw, year)
+    df["SourceName"] = "BEA_Make_Detail_BeforeRedef"
+
+    return df
+
+
+def bea_detail_parse(df_raw, year):
     df = df_raw.rename(columns={'Unnamed: 0': 'ActivityProducedBy'})

     # use "melt" fxn to convert colummns into rows
@@ -109,19 +96,17 @@ def bea_make_detail_br_parse(*, year, **_):

     df['Year'] = str(year)
     # hardcode data
-    df['FlowName'] = "USD" + str(year)
+    df['FlowName'] = f"USD{str(year)}"
     df["Class"] = "Money"
     df["FlowType"] = "TECHNOSPHERE_FLOW"
     df['Description'] = 'BEA_2012_Detail_Code'
-    df["SourceName"] = "BEA_Make_Detail_BeforeRedef"
     df["Location"] = US_FIPS
     df['LocationSystem'] = "FIPS_2015"
     # original unit in million USD
     df['FlowAmount'] = df['FlowAmount'] * 1000000
     df["Unit"] = "USD"
     df['DataReliability'] = 5  # tmp
     df['DataCollection'] = 5  # tmp

     return df
@@ -187,9 +172,16 @@ def subset_and_allocate_BEA_table(df, attr, **_):
     """
     Temporary function to mimic use of 2nd helper allocation dataset
     """
+
     df = subset_BEA_table(df, attr)
     v = {'geoscale_to_use': 'national'}
     method2 = {'target_sector_source': 'NAICS_2012_Code'}
+
+    import importlib
+    fxn = getattr(importlib.import_module(
+        'flowsa.data_source_scripts.BLS_QCEW'),
+        "bls_clean_allocation_fba_w_sec")
+
     attr2 = {"helper_source": "BLS_QCEW",
              "helper_method": "proportional",
              "helper_source_class": "Employment",
@@ -200,7 +192,7 @@ def subset_and_allocate_BEA_table(df, attr, **_):
                                  "Number of employees, Private"],
              "helper_from_scale": "national",
              "allocation_from_scale": "national",
-             "clean_helper_fba_wsec": "bls_clean_allocation_fba_w_sec"}
+             "clean_helper_fba_wsec": fxn}
     df2 = allocation_helper(df, attr2, method2, v, False)
     # Drop remaining rows with no sectors e.g. T001 and other final demands
     df2 = df2.dropna(subset=['SectorConsumedBy']).reset_index(drop=True)
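Passing the resolved function object (rather than its name as a string) means `allocation_helper` no longer has to look the callable up itself, and deferring the import to call time sidesteps a circular dependency between the data source scripts. A standalone sketch of the `importlib` pattern used above:

```python
import importlib

def resolve_function(module_path, function_name):
    # import the module only when the callable is actually needed, so
    # sibling modules that reference each other do not import-cycle
    module = importlib.import_module(module_path)
    return getattr(module, function_name)

# the names below mirror the diff; any module/function pair would work
fxn = resolve_function('flowsa.data_source_scripts.BLS_QCEW',
                       'bls_clean_allocation_fba_w_sec')
```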
14 changes: 7 additions & 7 deletions flowsa/data_source_scripts/BLS_QCEW.py
@@ -136,21 +136,21 @@ def bls_qcew_parse(*, df_list, year, **_):
     return df2


-def clean_bls_qcew_fba_for_employment_sat_table(fba_df, **kwargs):
+def clean_bls_qcew_fba_for_employment_sat_table(fba, **_):
     """
     When creating the employment satellite table for use in useeior,
-    modify the flow name to match prior methodology for mapping/impact factors
+    modify the flow name to match prior methodology for mapping/impact factors.
     clean_fba_df_fxn
-    :param fba_df: df, flowbyactivity
-    :param kwargs: dictionary, can include attr, a dictionary of parameters
-        in the FBA method yaml
+    :param fba: df, flowbyactivity
     :return: df, flowbyactivity, with modified flow names
     """

     # rename flowname value
     for c in ['FlowName', 'Flowable']:
-        fba_df[c] = fba_df[c].str.replace('Number of employees', 'Jobs')
+        fba[c] = fba[c].str.replace('Number of employees', 'Jobs')

-    return fba_df
+    return fba


 def bls_clean_allocation_fba_w_sec(df_w_sec, **kwargs):
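The `(fba, **_)` signature is the convention this release moves the cleanup hooks toward: the FBA DataFrame comes first and `**_` swallows whatever keyword context the framework passes. A self-contained sketch of the renamed function's effect on an illustrative frame:

```python
import pandas as pd

def clean_fba(fba, **_):
    # '**_' absorbs attr/method kwargs that the caller may supply
    for c in ['FlowName', 'Flowable']:
        fba[c] = fba[c].str.replace('Number of employees', 'Jobs')
    return fba

df = pd.DataFrame({'FlowName': ['Number of employees, Private'],
                   'Flowable': ['Number of employees, Private']})
print(clean_fba(df, attr={}))  # both columns now read 'Jobs, Private'
```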
15 changes: 10 additions & 5 deletions flowsa/data_source_scripts/CalRecycle_WasteCharacterization.py
@@ -19,7 +19,7 @@
 from flowsa.settings import externaldatapath
 from flowsa.sectormapping import get_fba_allocation_subset, \
     add_sectors_to_flowbyactivity
-from flowsa.dataclean import replace_strings_with_NoneType
+from flowsa.dataclean import replace_strings_with_NoneType, standardize_units


@@ -109,16 +109,17 @@ def calR_parse(*, year, **_):
     return output


-def keep_generated_quantity(fba, **kwargs):
+def keep_generated_quantity(fba, **_):
     """
     Function to clean CalRecycles FBA to remove quantities not
     assigned as Generated
     :param fba: df, FBA format
-    :param kwargs: dictionary, can include attr, a dictionary of parameters in
-        the FBA method yaml
     :return: df, modified CalRecycles FBA
     """
-    fba = fba[fba['Description'] == 'Generated']
+    fba = fba[fba['Description'] == 'Generated'].reset_index(drop=True)
+    # if no mapping performed, still update units
+    if 'tons' in fba['Unit'].values:
+        fba = standardize_units(fba)
     return fba


@@ -146,6 +147,10 @@ def apply_tons_per_employee_per_year_to_states(fbs, method, **_):

     # Calculate tons per employee per year per material and sector in CA
     bls_CA = bls[bls['Location'] == '06000']  # California
+    # aggregate all employment prior to generating tpepy
+    bls_CA = (bls_CA.groupby(['Location', 'Year', 'SectorProducedBy'])
+              .agg({'Employees': 'sum'})
+              .reset_index())
     tpepy = fbs.merge(bls_CA, how='inner')
     tpepy['TPEPY'] = np.divide(tpepy['FlowAmount'], tpepy['Employees'],
                                out=np.zeros_like(tpepy['Employees']),
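Aggregating the BLS employment to one total per location, year, and sector before the merge keeps the tons-per-employee ratio from being computed against duplicated rows. The division itself is zero-safe; a sketch with illustrative numbers:

```python
import numpy as np
import pandas as pd

# illustrative frame: FlowAmount in tons, Employees possibly zero
tpepy = pd.DataFrame({'FlowAmount': [120.0, 30.0],
                      'Employees': [60.0, 0.0]})
# rows with zero employees get 0 instead of inf or NaN
tpepy['TPEPY'] = np.divide(tpepy['FlowAmount'], tpepy['Employees'],
                           out=np.zeros_like(tpepy['Employees']),
                           where=tpepy['Employees'] != 0)
print(tpepy['TPEPY'].tolist())  # [2.0, 0.0]
```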
21 changes: 10 additions & 11 deletions flowsa/data_source_scripts/Census_CBP.py
@@ -34,11 +34,7 @@ def Census_CBP_URL_helper(*, build_url, year, **_):
     # This is only for years 2010 and 2011. This is done because the State
     # query that gets all counties returns too many results and errors out.
     if year in ['2010', '2011']:
-        if year == '2011':
-            fips_year = '2010'
-        else:
-            fips_year = '2010'
-        county_fips_df = get_county_FIPS(fips_year)
+        county_fips_df = get_county_FIPS('2010')
         county_fips = county_fips_df.FIPS
         for d in county_fips:
             url = build_url
@@ -82,16 +78,15 @@ def Census_CBP_URL_helper(*, build_url, year, **_):
             urls_census.append(url)
     else:
         FIPS_2 = get_all_state_FIPS_2()['FIPS_2']
-        for c in FIPS_2:
+        for state in FIPS_2:
             url = build_url
-            url = url.replace("__stateFIPS__", c)
+            url = url.replace("__stateFIPS__", state)
             # specified NAICS code year depends on year of data
-            if year in ['2017']:
+            if year in ['2017', '2018', '2019', '2020']:
                 url = url.replace("__NAICS__", "NAICS2017")
-                url = url.replace("__countyFIPS__", "*")
-            if year in ['2012', '2013', '2014', '2015', '2016']:
+            elif year in ['2012', '2013', '2014', '2015', '2016']:
                 url = url.replace("__NAICS__", "NAICS2012")
-                url = url.replace("__countyFIPS__", "*")
+            url = url.replace("__countyFIPS__", "*")
             urls_census.append(url)

     return urls_census
@@ -152,6 +147,10 @@ def census_cbp_parse(*, df_list, year, **_):
                      value_name="FlowAmount")
     # specify unit based on flowname
     df['Unit'] = np.where(df["FlowName"] == 'Annual payroll', "USD", "p")
+    # Payroll in units of thousand USD
+    df['FlowAmount'] = np.where(df["FlowName"] == 'Annual payroll',
+                                df['FlowAmount'] * 1000,
+                                df['FlowAmount'])
     # specify class
     df.loc[df['FlowName'] == 'Number of employees', 'Class'] = 'Employment'
     df.loc[df['FlowName'] == 'Number of establishments', 'Class'] = 'Other'
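CBP reports annual payroll in thousands of dollars, so the new block scales only those rows to whole USD and leaves the employee and establishment counts untouched. A self-contained sketch of the `np.where` pattern:

```python
import numpy as np
import pandas as pd

df = pd.DataFrame({'FlowName': ['Annual payroll', 'Number of employees'],
                   'FlowAmount': [250.0, 40.0]})  # payroll in thousand USD
df['Unit'] = np.where(df['FlowName'] == 'Annual payroll', 'USD', 'p')
df['FlowAmount'] = np.where(df['FlowName'] == 'Annual payroll',
                            df['FlowAmount'] * 1000,  # thousand USD -> USD
                            df['FlowAmount'])
print(df)  # payroll row becomes 250000.0 USD; the count row is unchanged
```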
6 changes: 3 additions & 3 deletions flowsa/data_source_scripts/EIA_CBECS_Land.py
@@ -233,15 +233,15 @@ def standardize_eia_cbecs_land_activity_names(df, column_to_standardize):
     return df


-def cbecs_land_fba_cleanup(fba_load):
+def cbecs_land_fba_cleanup(fba, **_):
     """
     Clean up the land fba for use in allocation
-    :param fba_load: df, eia cbecs land flowbyactivity format
+    :param fba: df, eia cbecs land flowbyactivity format
     :return: df, flowbyactivity with modified values
     """

     # estimate floor space using number of floors
-    fba = calculate_floorspace_based_on_number_of_floors(fba_load)
+    fba = calculate_floorspace_based_on_number_of_floors(fba)

     # calculate the land area in addition to building footprint
     fba1 = calculate_total_facility_land_area(fba)
6 changes: 3 additions & 3 deletions flowsa/data_source_scripts/EIA_MECS.py
@@ -436,7 +436,7 @@ def eia_mecs_energy_clean_allocation_fba_w_sec(
     return df2


-def mecs_land_fba_cleanup(fba):
+def mecs_land_fba_cleanup(fba, **_):
     """
     Modify the EIA MECS Land FBA
     :param fba: df, EIA MECS Land FBA format
@@ -452,15 +452,15 @@ def mecs_land_fba_cleanup(fba, **_):
     return fba


-def mecs_land_fba_cleanup_for_land_2012_fbs(fba):
+def mecs_land_fba_cleanup_for_land_2012_fbs(fba, **_):
     """
     The 'land_national_2012' FlowBySector uses MECS 2014 data, set
     MECS year to 2012
     :param fba: df, EIA MECS Land, FBA format
     :return: df, EIA MECS Land FBA modified
     """

-    fba = mecs_land_fba_cleanup(fba)
+    fba = mecs_land_fba_cleanup(fba=fba)

     # reset the EIA MECS Land year from 2014 to 2012 to match
     # the USDA ERS MLU year
15 changes: 10 additions & 5 deletions flowsa/data_source_scripts/EPA_CDDPath.py
@@ -14,6 +14,7 @@
 from flowsa.location import US_FIPS
 from flowsa.settings import externaldatapath
 from flowsa.flowbyfunctions import assign_fips_location_system
+from flowsa.dataclean import standardize_units


 # Read pdf into list of DataFrame
@@ -95,17 +96,21 @@ def combine_cdd_path(*, resp, **_):
     return df


-def assign_wood_to_engineering(df):
+def assign_wood_to_engineering(fba, **_):
     """clean_fba_df_fxn that reclassifies Wood from 'Other' to
     'Other - Wood' so that its mapping can be adjusted to only use
     237990/Heavy engineering NAICS according to method in Meyer et al. 2020
-    :param df: df, FBA of CDDPath
+    :param fba: df, FBA of CDDPath
     :return: df, CDDPath FBA with wood reassigned
     """

     # Update wood to a new activity for improved mapping
-    df.loc[((df.FlowName == 'Wood') &
-            (df.ActivityProducedBy == 'Other')),
+    fba.loc[((fba.FlowName == 'Wood') &
+             (fba.ActivityProducedBy == 'Other')),
             'ActivityProducedBy'] = 'Other - Wood'

-    return df
+    # if no mapping performed, still update units
+    if 'short tons' in fba['Unit'].values:
+        fba = standardize_units(fba)
+
+    return fba
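A self-contained sketch of the reclassification on an illustrative frame; the trailing `standardize_units` call is omitted here because the unit conversion itself lives in flowsa's `dataclean` helper:

```python
import pandas as pd

fba = pd.DataFrame({'FlowName': ['Wood', 'Concrete'],
                    'ActivityProducedBy': ['Other', 'Other'],
                    'Unit': ['short tons', 'short tons']})
# move Wood out of the generic 'Other' activity so its sector mapping
# can target 237990 / heavy engineering NAICS
fba.loc[(fba.FlowName == 'Wood') &
        (fba.ActivityProducedBy == 'Other'),
        'ActivityProducedBy'] = 'Other - Wood'
print(fba.ActivityProducedBy.tolist())  # ['Other - Wood', 'Other']
```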