Merge pull request #113 from ONSdigital/RDRP-468_intram_region
Rdrp 468 intram region
dpurches authored Oct 30, 2023
2 parents a07088b + 3e4341a commit 4566157
Showing 7 changed files with 158 additions and 19 deletions.
14 changes: 14 additions & 0 deletions config/itl1_detailed_schema.toml
@@ -0,0 +1,14 @@
[ranking]
Deduced_Data_Type = "Int64"

["Area Code (ITL1)"]
Deduced_Data_Type = "str"

["Country or region (ITL1)"]
Deduced_Data_Type = "str"

[Notes]
Deduced_Data_Type = "str"

[ITL121CD]
Deduced_Data_Type = "str"
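
(For orientation, not part of the diff: a minimal sketch of how a mapper CSV could be checked against a TOML schema like the one above. The column names come from the schema itself; the use of tomllib, pandas and the CSV path are assumptions, not the project's actual validation code.)

import tomllib  # Python 3.11+; assumption, the project may use another TOML reader
import pandas as pd

with open("config/itl1_detailed_schema.toml", "rb") as f:
    schema = tomllib.load(f)

df = pd.read_csv("itl1_detailed.csv")  # hypothetical path to the mapper file

# Every table in the schema is expected to be a column in the mapper.
missing = [col for col in schema if col not in df.columns]
if missing:
    raise ValueError(f"Mapper is missing expected columns: {missing}")

# Cast each column to the deduced dtype recorded in the schema.
for col, meta in schema.items():
    df[col] = df[col].astype(meta["Deduced_Data_Type"])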
12 changes: 7 additions & 5 deletions src/developer_config.yaml
@@ -10,11 +10,12 @@ global:
output_auto_outliers: False
output_outlier_qa : False
output_estimation_qa: False
-output_imputation_qa: True
-output_short_form: True
-output_tau: True
-output_gb_sas: True
-output_intram_by_pg: True
+output_imputation_qa: False
+output_short_form: False
+output_tau: False
+output_gb_sas: False
+output_intram_by_pg: False
+output_intram_by_itl1: True
dev_test : False
network_or_hdfs: network #whether to load from hdfs or network (local Python)
load_from_feather: True
@@ -74,6 +75,7 @@ network_paths:
pg_num_alpha_path: 'R:/BERD Results System Development 2023/DAP_emulation/mappers/pg_num_alpha.csv'
sic_pg_alpha_path: 'R:/BERD Results System Development 2023/DAP_emulation/mappers/sic_pg_alpha.csv'
pg_detailed_path: "R:/BERD Results System Development 2023/DAP_emulation/mappers/pg_detailed.csv"
+itl1_detailed_path: "R:/BERD Results System Development 2023/DAP_emulation/mappers/itl1_detailed.csv"
schema_paths:
frozen_shortform_schema: "config/output_schemas/frozen_shortform_schema.toml"
tau_schema: "config/output_schemas/tau_schema.toml"
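
(Aside, not part of the diff: the new output_intram_by_itl1 flag is read through the same config dictionary the other outputs use, as the outputs_main.py hunk below shows. A minimal sketch of reading the flag standalone — the yaml.safe_load call and the file path are assumptions.)

import yaml

with open("src/developer_config.yaml") as f:  # path taken from the diff header above
    config = yaml.safe_load(f)

# Same access pattern as run_outputs uses further down.
if config["global"]["output_intram_by_itl1"]:
    print("Intram by ITL1 output is switched on")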
6 changes: 3 additions & 3 deletions src/outputs/gb_sas.py
@@ -19,7 +19,7 @@ def output_gb_sas(
run_id: int,
ultfoc_mapper: pd.DataFrame,
cora_mapper: pd.DataFrame,
-postcode_itl_mapper: pd.DataFrame,
+postcode_mapper: pd.DataFrame,
pg_alpha_num: pd.DataFrame,
):
"""Run the outputs module.
@@ -32,7 +32,7 @@ def output_gb_sas(
run_id (int): The current run id
ultfoc_mapper (pd.DataFrame): The ULTFOC mapper DataFrame.
cora_mapper (pd.DataFrame): used for adding cora "form_status" column
-postcode_itl_mapper (pd.DataFrame): maps the postcode to region code
+postcode_mapper (pd.DataFrame): maps the postcode to region code
pg_alpha_num (pd.DataFrame): mapper of numeric PG to alpha PG
"""
@@ -59,7 +59,7 @@ def output_gb_sas(
df = map_o.map_sizebands(df)

# Map the itl regions using the postcodes
-df = map_o.join_itl_regions(df, postcode_itl_mapper)
+df = map_o.join_itl_regions(df, postcode_mapper)

# Map q713 and q714 to numeric format
df = map_o.map_to_numeric(df)
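
(For readers who don't know map_o.join_itl_regions: its body is not part of this diff. Below is a hypothetical sketch of what a postcode-to-region join of this kind could look like, assuming the postcode mapper carries a cleaned postcode column named pcd2 — which appears in the staging changes further down — and a region code column named itl, the column intram_by_itl1.py later merges on; the response-side column name "postcode" is invented for illustration.)

import pandas as pd


def join_itl_regions_sketch(df: pd.DataFrame, postcode_mapper: pd.DataFrame) -> pd.DataFrame:
    """Hypothetical stand-in for map_o.join_itl_regions.

    Left-joins a region code onto the responses by postcode; the column
    names here are assumptions rather than the project's real ones.
    """
    mapper = postcode_mapper[["pcd2", "itl"]]
    return df.merge(mapper, how="left", left_on="postcode", right_on="pcd2")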
90 changes: 90 additions & 0 deletions src/outputs/intram_by_itl1.py
@@ -0,0 +1,90 @@
"""The main file for the Intram by PG output."""
import logging
import pandas as pd
from datetime import datetime
from typing import Callable, Dict, Any

from src.outputs.outputs_helpers import aggregate_output
import src.outputs.map_output_cols as map_o

OutputMainLogger = logging.getLogger(__name__)


def output_intram_by_itl1(
df: pd.DataFrame,
config: Dict[str, Any],
write_csv: Callable,
run_id: int,
postcode_mapper: pd.DataFrame,
itl_mapper: pd.DataFrame,
itl1_detailed: pd.DataFrame,
):
"""Run the outputs module.
Args:
df (pd.DataFrame): The dataset main with weights not applied
config (dict): The configuration settings.
write_csv (Callable): Function to write to a csv file.
This will be the hdfs or network version depending on settings.
run_id (int): The current run id
postcode_mapper (pd.DataFrame): Maps postcodes to regional codes
itl_mapper (pd.DataFrame): Maps regional codes to ITL Level 1
itl1_detailed (pd.DataFrame): Details of ITL Level 1
"""

NETWORK_OR_HDFS = config["global"]["network_or_hdfs"]
paths = config[f"{NETWORK_OR_HDFS}_paths"]
output_path = paths["output_path"]

# Join region code
df = map_o.join_itl_regions(df, postcode_mapper)

# Join itl level 1
reg_code = "LAU121CD"
itl_code = "ITL121CD"
itl1_mapper = itl_mapper[[reg_code, itl_code]]
df = df.merge(itl1_mapper, how="left", left_on="itl", right_on=reg_code)

# Group by ITL level 1 and aggregate intram
value_col = "211"
agg_method = "sum"
df_agg = aggregate_output(df, [itl_code], [value_col], agg_method)

# Create UK total
value_uk = df_agg[value_col].sum()
df_uk = pd.DataFrame({itl_code: ["TLA"], value_col: value_uk})

# Create England total
itls_eng = ["TLC", "TLD", "TLE", "TLF", "TLG", "TLH", "TLI", "TLJ", "TLK"]
df_eng = df_agg[df_agg[itl_code].isin(itls_eng)]
value_eng = df_eng[value_col].sum()
df_eng = pd.DataFrame({itl_code: ["TLB"], value_col: value_eng})

# Concatinate totals
df_agg = pd.concat([df_agg, df_uk])
df_agg = pd.concat([df_agg, df_eng])

# Merge with labels and ranks
df_merge = itl1_detailed.merge(
df_agg,
how="left",
left_on=itl_code,
right_on=itl_code)
df_merge[value_col] = df_merge[value_col].fillna(0)

# Sort by rank
df_merge = df_merge.sort_values("ranking", axis=0, ascending=True)

# Select and rename the correct columns
code = "Area Code (ITL1)"
detail = "Country or region (ITL1)"
notes = "Notes"
value_title = "2023"
df_merge = df_merge[[code, detail, value_col, notes]].rename(
columns={value_col: value_title})

# Outputting the CSV file with timestamp and run_id
tdate = datetime.now().strftime("%Y-%m-%d")
filename = f"output_intram_by_itl1_{tdate}_v{run_id}.csv"
write_csv(f"{output_path}/output_intram_by_itl1/{filename}", df_merge)
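
(Illustration, not part of the diff: with toy values TLC = 10, TLK = 5 and TLL (Wales) = 2, the England row TLB sums only the nine English ITL1 codes to 15, while the UK row TLA sums every region present to 17 — TLA is computed before the England row is appended, so nothing is double-counted. A self-contained sketch of that totals step, with the aggregated frame written out directly instead of coming from aggregate_output:)

import pandas as pd

itl_code, value_col = "ITL121CD", "211"

# Toy region-level intram totals; the values are made up.
df_agg = pd.DataFrame({itl_code: ["TLC", "TLK", "TLL"], value_col: [10.0, 5.0, 2.0]})

# UK total, labelled TLA as in the new module: sum over every region present.
df_uk = pd.DataFrame({itl_code: ["TLA"], value_col: [df_agg[value_col].sum()]})

# England total, labelled TLB: sum over the nine English ITL1 regions only.
itls_eng = ["TLC", "TLD", "TLE", "TLF", "TLG", "TLH", "TLI", "TLJ", "TLK"]
eng_total = df_agg.loc[df_agg[itl_code].isin(itls_eng), value_col].sum()
df_eng = pd.DataFrame({itl_code: ["TLB"], value_col: [eng_total]})

df_totals = pd.concat([df_agg, df_uk, df_eng], ignore_index=True)
print(df_totals)  # TLC 10, TLK 5, TLL 2, TLA 17, TLB 15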
29 changes: 24 additions & 5 deletions src/outputs/outputs_main.py
@@ -7,6 +7,7 @@
from src.outputs.tau import output_tau
from src.outputs.gb_sas import output_gb_sas
from src.outputs.intram_by_pg import output_intram_by_pg
+from src.outputs.intram_by_itl1 import output_intram_by_itl1

OutputMainLogger = logging.getLogger(__name__)

@@ -19,11 +20,13 @@ def run_outputs(
run_id: int,
ultfoc_mapper: pd.DataFrame,
cora_mapper: pd.DataFrame,
-postcode_itl_mapper: pd.DataFrame,
+postcode_mapper: pd.DataFrame,
+itl_mapper: pd.DataFrame,
pg_alpha_num: pd.DataFrame,
pg_num_alpha: pd.DataFrame,
sic_pg_alpha: pd.DataFrame,
pg_detailed: pd.DataFrame,
+itl1_detailed: pd.DataFrame,
):

"""Run the outputs module.
@@ -37,9 +40,11 @@
run_id (int): The current run id
ultfoc_mapper (pd.DataFrame): The ULTFOC mapper DataFrame.
cora_mapper (pd.DataFrame): used for adding cora "form_status" column
-postcode_itl_mapper (pd.DataFrame): Links postcode to region code
+postcode_mapper (pd.DataFrame): Links postcode to region code
+itl_mapper (pd.DataFrame): Links region to ITL codes
pg_alpha_num (pd.DataFrame): Maps alpha PG to numeric PG
pg_detailed (pd.DataFrame): Detailed descriptions of alpha PG groups
+itl1_detailed (pd.DataFrame): Detailed descriptions of ITL1 regions
"""
@@ -53,7 +58,7 @@
run_id,
ultfoc_mapper,
cora_mapper,
-postcode_itl_mapper,
+postcode_mapper,
)
OutputMainLogger.info("Finished short form output.")

@@ -67,7 +72,7 @@
run_id,
ultfoc_mapper,
cora_mapper,
-postcode_itl_mapper,
+postcode_mapper,
pg_alpha_num,
)
OutputMainLogger.info("Finished TAU output.")
@@ -82,7 +87,7 @@
run_id,
ultfoc_mapper,
cora_mapper,
-postcode_itl_mapper,
+postcode_mapper,
pg_alpha_num,
)
OutputMainLogger.info("Finished GB SAS output.")
@@ -98,3 +103,17 @@
pg_detailed,
)
OutputMainLogger.info("Finished Intram by PG output.")

+# Running Intram by ITL1
+if config["global"]["output_intram_by_itl1"]:
+OutputMainLogger.info("Starting Intram by ITL1 output...")
+output_intram_by_itl1(
+estimated_df,
+config,
+write_csv,
+run_id,
+postcode_mapper,
+itl_mapper,
+itl1_detailed,
+)
+OutputMainLogger.info("Finished Intram by ITL1 output.")
8 changes: 6 additions & 2 deletions src/pipeline.py
@@ -91,13 +91,15 @@ def run_pipeline(start, config_path):
secondary_full_responses,
manual_outliers,
ultfoc_mapper,
+itl_mapper,
cora_mapper,
cellno_df,
-postcode_itl_mapper,
+postcode_mapper,
pg_alpha_num,
pg_num_alpha,
sic_pg_alpha,
pg_detailed,
+itl1_detailed,
) = run_staging(
config,
check_file_exists,
@@ -154,11 +156,13 @@
run_id,
ultfoc_mapper,
cora_mapper,
-postcode_itl_mapper,
+postcode_mapper,
+itl_mapper,
pg_alpha_num,
pg_num_alpha,
sic_pg_alpha,
pg_detailed,
+itl1_detailed,
)

MainLogger.info("Finished All Output modules.")
18 changes: 14 additions & 4 deletions src/staging/staging_main.py
@@ -51,7 +51,7 @@ def run_staging(
ultfoc_mapper (pd.DataFrame): Foreign ownership mapper,
cora_mapper (pd.DataFrame): CORA status mapper,
cellno_df (pd.DataFrame): Cell numbers mapper,
-postcode_df (pd.DataFrame): Postcodes to Regional Code mapper,
+postcode_mapper (pd.DataFrame): Postcodes to Regional Code mapper,
pg_alpha_num (pd.DataFrame): Product group alpha to numeric mapper.
pg_num_alpha (pd.DataFrame): Product group numeric to alpha mapper.
sic_pg_alpha (pd.DataFrame): SIC code to product group alpha mapper.
@@ -207,8 +207,8 @@ def run_staging(
StagingMainLogger.info("Starting PostCode Validation")
postcode_masterlist = paths["postcode_masterlist"]
check_file_exists(postcode_masterlist)
-postcode_df = read_csv(postcode_masterlist)
-postcode_masterlist = postcode_df["pcd2"]
+postcode_mapper = read_csv(postcode_masterlist)
+postcode_masterlist = postcode_mapper["pcd2"]
invalid_df = val.validate_post_col(full_responses, postcode_masterlist, config)
StagingMainLogger.info("Saving Invalid Postcodes to File")
pcodes_folder = paths["postcode_path"]
@@ -316,6 +316,14 @@
val.validate_data_with_schema(pg_detailed, "./config/pg_detailed_schema.toml")
StagingMainLogger.info("PG detailed mapper File Loaded Successfully...")

+# Loading ITL1 detailed mapper
+StagingMainLogger.info("Loading ITL1 detailed mapper File...")
+itl1_detailed_path = paths["itl1_detailed_path"]
+check_file_exists(itl1_detailed_path)
+itl1_detailed = read_csv(itl1_detailed_path)
+val.validate_data_with_schema(itl1_detailed, "./config/itl1_detailed_schema.toml")
+StagingMainLogger.info("ITL1 detailed mapper File Loaded Successfully...")

# Output the staged BERD data for BaU testing when on local network.
if config["global"]["output_full_responses"]:
StagingMainLogger.info("Starting output of staged BERD data...")
@@ -332,11 +340,13 @@
secondary_full_responses,
manual_outliers,
ultfoc_mapper,
+itl_mapper,
cora_mapper,
cellno_df,
-postcode_df,
+postcode_mapper,
pg_alpha_num,
pg_num_alpha,
sic_pg_alpha,
pg_detailed,
+itl1_detailed,
)
