Skip to content

Commit

Permalink
Merge pull request #231 from USEPA/state_ghg
Browse files Browse the repository at this point in the history
GHG datasets for states
  • Loading branch information
bl-young authored Jun 8, 2022
2 parents e9939c5 + 938041a commit e0b19aa
Show file tree
Hide file tree
Showing 27 changed files with 2,095 additions and 158 deletions.
1,117 changes: 1,117 additions & 0 deletions flowsa/data/activitytosectormapping/NAICS_Crosswalk_BEA_2012_Summary.csv

Large diffs are not rendered by default.

18 changes: 18 additions & 0 deletions flowsa/data/source_catalog.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,18 @@ EPA_GHGI:
sector-like_activities: False
activity_schema:
sector_aggregation_level: "aggregated"
EPA_StateGHGI:
class:
- Chemicals
sector-like_activities: False
activity_schema:
sector_aggregation_level: "aggregated"
EPA_SIT:
class:
- Chemicals
sector-like_activities: False
activity_schema:
sector_aggregation_level: "aggregated"
EPA_NEI_Nonpoint:
class:
- Chemicals
Expand Down Expand Up @@ -170,6 +182,12 @@ StatCan_LFS:
sector-like_activities: False
activity_schema:
sector_aggregation_level: "aggregated"
stateio:
class:
- Money
sector-like_activities: False #update to true once alternate activity_schema in place
# activity_schema: BEA_2012_Summary_Code
sector_aggregation_level: "disaggregated"
USDA_CoA_Cropland:
class:
- Land
Expand Down
48 changes: 20 additions & 28 deletions flowsa/data_source_scripts/BEA.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,28 +62,8 @@ def bea_use_detail_br_parse(*, year, **_):
f'_Detail_Use_PRO_BeforeRedef.csv'
df_raw = pd.read_csv(csv_load)

# first column is the commodity being consumed
df = df_raw.rename(columns={'Unnamed: 0': 'ActivityProducedBy'})

# use "melt" fxn to convert columns into rows
df = df.melt(id_vars=["ActivityProducedBy"],
var_name="ActivityConsumedBy",
value_name="FlowAmount")

df['Year'] = str(year)
# hardcode data
df['FlowName'] = "USD" + str(year)
df["Class"] = "Money"
df["FlowType"] = "TECHNOSPHERE_FLOW"
df['Description'] = 'BEA_2012_Detail_Code'
df = bea_detail_parse(df_raw, year)
df["SourceName"] = "BEA_Use_Detail_PRO_BeforeRedef"
df["Location"] = US_FIPS
df['LocationSystem'] = "FIPS_2015"
# original unit in million USD
df['FlowAmount'] = df['FlowAmount'] * 1000000
df["Unit"] = "USD"
df['DataReliability'] = 5 # tmp
df['DataCollection'] = 5 # tmp

return df

Expand All @@ -96,10 +76,17 @@ def bea_make_detail_br_parse(*, year, **_):
flowbyactivity specifications
"""
# Read directly into a pandas df
df_raw = pd.read_csv(externaldatapath + "BEA_" + str(year) +
"_Detail_Make_BeforeRedef.csv")
csv_load = f'{externaldatapath}BEA_{str(year)}' \
f'_Detail_Make_BeforeRedef.csv'
df_raw = pd.read_csv(csv_load)

df = bea_detail_parse(df_raw, year)
df["SourceName"] = "BEA_Make_Detail_BeforeRedef"

# first column is the industry
return df


def bea_detail_parse(df_raw, year):
df = df_raw.rename(columns={'Unnamed: 0': 'ActivityProducedBy'})

# use "melt" fxn to convert columns into rows
Expand All @@ -109,19 +96,17 @@ def bea_make_detail_br_parse(*, year, **_):

df['Year'] = str(year)
# hardcode data
df['FlowName'] = "USD" + str(year)
df['FlowName'] = f"USD{str(year)}"
df["Class"] = "Money"
df["FlowType"] = "TECHNOSPHERE_FLOW"
df['Description'] = 'BEA_2012_Detail_Code'
df["SourceName"] = "BEA_Make_Detail_BeforeRedef"
df["Location"] = US_FIPS
df['LocationSystem'] = "FIPS_2015"
# original unit in million USD
df['FlowAmount'] = df['FlowAmount'] * 1000000
df["Unit"] = "USD"
df['DataReliability'] = 5 # tmp
df['DataCollection'] = 5 # tmp

return df


Expand Down Expand Up @@ -187,9 +172,16 @@ def subset_and_allocate_BEA_table(df, attr, **_):
"""
Temporary function to mimic use of 2nd helper allocation dataset
"""

df = subset_BEA_table(df, attr)
v = {'geoscale_to_use': 'national'}
method2 = {'target_sector_source': 'NAICS_2012_Code'}

import importlib
fxn = getattr(importlib.import_module(
'flowsa.data_source_scripts.BLS_QCEW'),
"bls_clean_allocation_fba_w_sec")

attr2 = {"helper_source": "BLS_QCEW",
"helper_method": "proportional",
"helper_source_class": "Employment",
Expand All @@ -200,7 +192,7 @@ def subset_and_allocate_BEA_table(df, attr, **_):
"Number of employees, Private"],
"helper_from_scale": "national",
"allocation_from_scale": "national",
"clean_helper_fba_wsec": "bls_clean_allocation_fba_w_sec"}
"clean_helper_fba_wsec": fxn}
df2 = allocation_helper(df, attr2, method2, v, False)
# Drop remaining rows with no sectors e.g. T001 and other final demands
df2 = df2.dropna(subset=['SectorConsumedBy']).reset_index(drop=True)
Expand Down
14 changes: 7 additions & 7 deletions flowsa/data_source_scripts/BLS_QCEW.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,21 +136,21 @@ def bls_qcew_parse(*, df_list, year, **_):
return df2


def clean_bls_qcew_fba_for_employment_sat_table(fba, **_):
    """
    When creating the employment satellite table for use in useeior,
    modify the flow name to match prior methodology for mapping/impact
    factors.
    clean_fba_df_fxn
    :param fba: df, flowbyactivity
    :return: df, flowbyactivity, with modified flow names
    """
    # rename flowname value in both the pre- and post-mapping name columns
    # so either representation matches the prior methodology
    for c in ['FlowName', 'Flowable']:
        # literal substring replacement, e.g.
        # "Number of employees, Private" -> "Jobs, Private"
        fba[c] = fba[c].str.replace('Number of employees', 'Jobs')

    return fba


def bls_clean_allocation_fba_w_sec(df_w_sec, **kwargs):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,13 +109,11 @@ def calR_parse(*, year, **_):
return output


def keep_generated_quantity(fba, **kwargs):
def keep_generated_quantity(fba, **_):
"""
Function to clean CalRecycles FBA to remove quantities not
assigned as Generated
:param fba: df, FBA format
:param kwargs: dictionary, can include attr, a dictionary of parameters in
the FBA method yaml
:return: df, modified CalRecycles FBA
"""
fba = fba[fba['Description'] == 'Generated'].reset_index(drop=True)
Expand Down
6 changes: 3 additions & 3 deletions flowsa/data_source_scripts/EIA_CBECS_Land.py
Original file line number Diff line number Diff line change
Expand Up @@ -233,15 +233,15 @@ def standardize_eia_cbecs_land_activity_names(df, column_to_standardize):
return df


def cbecs_land_fba_cleanup(fba_load):
def cbecs_land_fba_cleanup(fba, **_):
"""
Clean up the land fba for use in allocation
:param fba_load: df, eia cbecs land flowbyactivity format
:param fba: df, eia cbecs land flowbyactivity format
:return: df, flowbyactivity with modified values
"""

# estimate floor space using number of floors
fba = calculate_floorspace_based_on_number_of_floors(fba_load)
fba = calculate_floorspace_based_on_number_of_floors(fba)

# calculate the land area in addition to building footprint
fba1 = calculate_total_facility_land_area(fba)
Expand Down
6 changes: 3 additions & 3 deletions flowsa/data_source_scripts/EIA_MECS.py
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,7 @@ def eia_mecs_energy_clean_allocation_fba_w_sec(
return df2


def mecs_land_fba_cleanup(fba):
def mecs_land_fba_cleanup(fba, **_):
"""
Modify the EIA MECS Land FBA
:param fba: df, EIA MECS Land FBA format
Expand All @@ -452,15 +452,15 @@ def mecs_land_fba_cleanup(fba):
return fba


def mecs_land_fba_cleanup_for_land_2012_fbs(fba):
def mecs_land_fba_cleanup_for_land_2012_fbs(fba, **_):
"""
The 'land_national_2012' FlowBySector uses MECS 2014 data, set
MECS year to 2012
:param fba: df, EIA MECS Land, FBA format
:return: df, EIA MECS Land FBA modified
"""

fba = mecs_land_fba_cleanup(fba)
fba = mecs_land_fba_cleanup(fba=fba)

# reset the EIA MECS Land year from 2014 to 2012 to match
# the USDA ERS MLU year
Expand Down
14 changes: 7 additions & 7 deletions flowsa/data_source_scripts/EPA_CDDPath.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,21 +96,21 @@ def combine_cdd_path(*, resp, **_):
return df


def assign_wood_to_engineering(fba, **_):
    """clean_fba_df_fxn that reclassifies Wood from 'Other' to
    'Other - Wood' so that its mapping can be adjusted to only use
    237990/Heavy engineering NAICS according to method in Meyer et al. 2020
    :param fba: df, FBA of CDDPath
    :return: df, CDDPath FBA with wood reassigned
    """
    # Update wood to a new activity for improved mapping; only rows that are
    # both Wood flows and produced by the generic 'Other' activity move
    fba.loc[((fba.FlowName == 'Wood') &
             (fba.ActivityProducedBy == 'Other')),
            'ActivityProducedBy'] = 'Other - Wood'

    # if no mapping performed, still update units
    # (standardize_units is only invoked when unconverted units remain)
    if 'short tons' in fba['Unit'].values:
        fba = standardize_units(fba)

    return fba
64 changes: 41 additions & 23 deletions flowsa/data_source_scripts/EPA_GHGI.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@
from flowsa.dataclean import replace_NoneType_with_empty_cells
from flowsa.settings import log, externaldatapath
from flowsa.schema import flow_by_activity_fields
from flowsa.common import load_yaml_dict
from flowsa.data_source_scripts import EIA_MECS


SECTOR_DICT = {'Res.': 'Residential',
'Comm.': 'Commercial',
'Ind.': 'Industrial',
Expand Down Expand Up @@ -635,18 +637,35 @@ def get_manufacturing_energy_ratios(year):
'Natural Gas': 'Natural Gas',
}

# TODO make this year dynamic
def closest_value(input_list, input_value):
difference = lambda input_list : abs(input_list - input_value)
return min(input_list, key=difference)

mecs_year = closest_value(load_yaml_dict('EIA_MECS_Energy',
flowbytype='FBA').get('years'),
year)

# Filter MECS for total national energy consumption for manufacturing sectors
mecs = load_fba_w_standardized_units(datasource='EIA_MECS_Energy',
year=year,
year=mecs_year,
flowclass='Energy')
mecs = mecs.loc[(mecs['ActivityConsumedBy'] == '31-33') &
(mecs['Location'] == '00000')].reset_index(drop=True)
mecs = EIA_MECS.mecs_energy_fba_cleanup(mecs, None)

# TODO dynamically change the table imported here based on year
ghgi = load_fba_w_standardized_units(datasource='EPA_GHGI_T_A_14',
year=2016,
# Identify the GHGI table that matches EIA_MECS
for t, v in (load_yaml_dict('EPA_GHGI', 'FBA')
.get('Annex').get('Annex').items()):
if ((v.get('class') == 'Energy')
& ('Energy Consumption Data' in v.get('desc'))
& (v.get('year') == str(mecs_year))):
table = f"EPA_GHGI_T_{t.replace('-', '_')}"
break
else:
log.error('unable to identify corresponding GHGI table')

ghgi = load_fba_w_standardized_units(datasource=table,
year=mecs_year,
flowclass='Energy')
ghgi = ghgi[ghgi['ActivityConsumedBy']=='Industrial'].reset_index(drop=True)

Expand All @@ -661,17 +680,15 @@ def get_manufacturing_energy_ratios(year):
return pct_dict


def allocate_industrial_combustion(df):
def allocate_industrial_combustion(fba, source_dict, **_):
"""
Split industrial combustion emissions into two buckets to be further allocated.
clean_fba_df_fxn. Calculate the percentage of fuel consumption captured in
EIA MECS relative to EPA GHGI. Create new activities to distinguish those
which use EIA MECS as allocation source and those that use alternate source.
"""
# TODO make this year dynamic
year = 2014
pct_dict = get_manufacturing_energy_ratios(year)
pct_dict = get_manufacturing_energy_ratios(source_dict.get('year'))

# activities reflect flows in A_14 and 3_8 and 3_9
activities_to_split = {'Industrial Other Coal Industrial': 'Coal',
Expand All @@ -680,29 +697,30 @@ def allocate_industrial_combustion(df):
'Natural gas industrial': 'Natural Gas'}

for activity, fuel in activities_to_split.items():
df_subset = df.loc[df['ActivityProducedBy'] == activity].reset_index(drop=True)
df_subset = fba.loc[fba['ActivityProducedBy'] == activity].reset_index(drop=True)
if len(df_subset) == 0:
continue
df_subset['FlowAmount'] = df_subset['FlowAmount'] * pct_dict[fuel]
df_subset['ActivityProducedBy'] = f"{activity} - Manufacturing"
df.loc[df['ActivityProducedBy'] == activity,
'FlowAmount'] = df['FlowAmount'] * (1-pct_dict[fuel])
df = pd.concat([df, df_subset], ignore_index=True)
fba.loc[fba['ActivityProducedBy'] == activity,
'FlowAmount'] = fba['FlowAmount'] * (1-pct_dict[fuel])
fba = pd.concat([fba, df_subset], ignore_index=True)

return df
return fba


def split_HFCs_by_type(df):
"""Speciates HFCs and PFCs for all activities based on T_4_99."""
def split_HFCs_by_type(fba, **_):
"""Speciates HFCs and PFCs for all activities based on T_4_99.
clean_fba_before_mapping_df_fxn"""
splits = load_fba_w_standardized_units(datasource='EPA_GHGI_T_4_99',
year=df['Year'][0])
year=fba['Year'][0])
splits['pct'] = splits['FlowAmount'] / splits['FlowAmount'].sum()
splits = splits[['FlowName', 'pct']]

speciated_df = df.apply(lambda x: [p * x['FlowAmount'] for p in splits['pct']],
speciated_df = fba.apply(lambda x: [p * x['FlowAmount'] for p in splits['pct']],
axis=1, result_type='expand')
speciated_df.columns = splits['FlowName']
speciated_df = pd.concat([df, speciated_df], axis=1)
speciated_df = pd.concat([fba, speciated_df], axis=1)
speciated_df = speciated_df.melt(id_vars=flow_by_activity_fields.keys(),
var_name='Flow')
speciated_df['FlowName'] = speciated_df['Flow']
Expand Down Expand Up @@ -782,20 +800,20 @@ def split_HFC_foams(df):
return df


def clean_HFC_fba(fba, **_):
    """Adjust HFC emissions for improved parsing.
    clean_fba_before_mapping_df_fxn used in EPA_GHGI_T_4_101.
    :param fba: df, FBA of HFC emissions
    :return: df, FBA with transport emissions removed, residential share
        allocated, foams split out, and HFCs speciated by type
    """
    # order matters: remove transport first so later splits operate on
    # the remaining (non-transport) emissions only
    df = subtract_HFC_transport_emissions(fba)
    df = allocate_HFC_to_residential(df)
    df = split_HFC_foams(df)
    df = split_HFCs_by_type(df)
    return df


def remove_HFC_kt(fba, **_):
    """Remove records of emissions in kt, data are also provided in MMT CO2e.
    clean_fba_before_mapping_df_fxn used in EPA_GHGI_T_4_50.
    :param fba: df, FBA of HFC emissions with duplicate records in kt
        and MMT CO2e
    :return: df, FBA restricted to rows whose Unit is not 'kt'
    """
    # keep only the MMT CO2e records; kt rows duplicate the same emissions
    return fba.loc[fba['Unit'] != 'kt']


def adjust_transport_activities(df, **_):
Expand Down
Loading

0 comments on commit e0b19aa

Please sign in to comment.