Skip to content

Commit

Permalink
Merge pull request #327 from ONSdigital/RDRP-990_remap_itl
Browse files Browse the repository at this point in the history
RDRP 990 remap itl cols after imputation
  • Loading branch information
JenCheshire authored Sep 10, 2024
2 parents 94f54ff + ca0f3c5 commit ff89f11
Show file tree
Hide file tree
Showing 20 changed files with 211 additions and 161 deletions.
3 changes: 3 additions & 0 deletions pytest.ini
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,6 @@
junit_family=xunit1
addopts = -vv -rs
testpaths = "./tests"
filterwarnings =
ignore::DeprecationWarning

2 changes: 1 addition & 1 deletion src/_version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "1.1.6"
__version__ = "1.1.7"
74 changes: 37 additions & 37 deletions src/freezing/freezing_apply_changes.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ def apply_freezing(
check_file_exists: Callable,
read_csv: Callable,
run_id: int,
freezing_logger: logging.Logger,
FreezingLogger: logging.Logger,
) -> pd.DataFrame:
"""Read user-edited freezing files and apply them to the main snapshot.
Args:
Expand All @@ -25,15 +25,15 @@ def apply_freezing(
read_csv (callable): Function to read a csv file. This will be the hdfs or
network version depending on settings.
run_id (int): The run id for this run.
freezing_logger (logging.Logger): The logger to log to.
FreezingLogger (logging.Logger): The logger to log to.
Returns:
constructed_df (pd.DataFrame): As main_df but with records amended and added
from the freezing files.
"""
# Prepare filepaths to read from
network_or_hdfs = config["global"]["network_or_hdfs"]
paths = config[f"{network_or_hdfs}_paths"]
platform = config["global"]["platform"]
paths = config[f"{platform}_paths"]
amendments_filepath = paths["freezing_amendments_path"]
additions_filepath = paths["freezing_additions_path"]

Expand All @@ -43,38 +43,38 @@ def apply_freezing(

# If each file exists, read it and call the function to apply them
if not (amendments_exist or additions_exist):
freezing_logger.info("No amendments or additions to apply, skipping...")
FreezingLogger.info("No amendments or additions to apply, skipping...")
return main_df

# apply amendments
if amendments_exist:
amendments_df = read_csv(amendments_filepath)
if amendments_df.empty:
freezing_logger.warning(
FreezingLogger.warning(
f"Amendments file ({amendments_filepath}) is empty, skipping..."
)
else:
main_df = apply_amendments(
main_df,
main_df,
amendments_df,
run_id,
freezing_logger,
run_id,
FreezingLogger,
)

# apply additions
if additions_exist:
additions_df = read_csv(additions_filepath)
if additions_df.empty:
freezing_logger.warning(
FreezingLogger.warning(
f"Additions file {additions_filepath} is empty, skipping..."
)
else:
additions_df["instance"] = additions_df["instance"].astype("Int64")
main_df = apply_additions(
main_df,
additions_df,
run_id,
freezing_logger
main_df,
additions_df,
run_id,
FreezingLogger
)

return main_df
Expand Down Expand Up @@ -116,7 +116,7 @@ def validate_all_refinst_in_frozen(
df2 (pd.DataFrame): The amendments/additions df.
Returns:
bool: Whether all ref/inst are in the dataframe.
bool: Whether all ref/inst are in the dataframe.
"""
frozen_copy = frozen_df.copy()
frozen_copy["refinst"] = (
Expand All @@ -133,26 +133,26 @@ def validate_all_refinst_in_frozen(
def validate_amendments_df(
frozen_df: pd.DataFrame,
amendments_df: pd.DataFrame,
freezing_logger: logging.Logger,
FreezingLogger: logging.Logger,
) -> bool:
"""Validate the amendments df.
Args:
frozen_df (pd.DataFrame): The frozen csv df.
amendments_df (pd.DataFrame): The amendments df.
freezing_logger (logging.Logger): The logger to log to.
FreezingLogger (logging.Logger): The logger to log to.
Returns:
bool: Whether or not the amendments df is valid.
"""
# check that all ref/inst combs are in staged frozen data
freezing_logger.info(
FreezingLogger.info(
"Checking if all ref/inst in the amendments df are present in the frozen"
" data..."
)
present = validate_all_refinst_in_frozen(frozen_df, amendments_df)
if not present:
freezing_logger.info(
if not present:
FreezingLogger.info(
"Not all reference/instance combinations found within the amendments"
" file are present in the snapshot."
)
Expand All @@ -162,27 +162,27 @@ def validate_amendments_df(
def validate_additions_df(
frozen_df: pd.DataFrame,
additions_df: pd.DataFrame,
freezing_logger: logging.Logger,
FreezingLogger: logging.Logger,
) -> None:
"""Validate the additions df.
Args:
frozen_df (pd.DataFrame): The frozen csv df.
additions_df (pd.DataFrame): The additions df.
freezing_logger (logging.Logger): The logger to log to.
FreezingLogger (logging.Logger): The logger to log to.
Returns:
bool: Whether or not the additions df is valid.
"""
# check that all ref/inst combs are not staged frozen data
freezing_logger.info(
FreezingLogger.info(
"Checking if all ref/inst in the amendments df are missing from the frozen"
" data..."
)

any_present = validate_any_refinst_in_frozen(frozen_df, additions_df)
if any_present:
freezing_logger.info(
if any_present:
FreezingLogger.info(
"Some reference/instance combinations from the additions file are "
"present in the frozen data."
)
Expand All @@ -194,21 +194,21 @@ def apply_amendments(
main_df: pd.DataFrame,
amendments_df: pd.DataFrame,
run_id: int,
freezing_logger: logging.Logger
FreezingLogger: logging.Logger
) -> pd.DataFrame:
"""Apply amendments to the main snapshot.
Args:
main_df (pd.DataFrame): The main snapshot.
amendments_df (pd.DataFrame): The amendments to apply.
run_id (int): The current run id.
freezing_logger (logging.Logger): The logger.
FreezingLogger (logging.Logger): The logger.
Returns:
amended_df (pd.DataFrame): The main snapshot with amendments applied.
"""
if not validate_amendments_df(main_df, amendments_df, freezing_logger):
freezing_logger.info("Skipping amendments since the amendments csv is invalid...")
if not validate_amendments_df(main_df, amendments_df, FreezingLogger):
FreezingLogger.info("Skipping amendments since the amendments csv is invalid...")
return main_df

changes_refs = amendments_df[
Expand All @@ -220,7 +220,7 @@ def apply_amendments(
]

if accepted_amendments_df.shape[0] == 0:
freezing_logger.info(
FreezingLogger.info(
"Amendments file contained no records marked for inclusion"
)
return main_df
Expand All @@ -242,7 +242,7 @@ def apply_amendments(
main_df = main_df[~main_df.reference.isin(changes_refs)]
# add amended records to main df
amended_df = pd.concat([main_df, accepted_amendments_df])
freezing_logger.info(
FreezingLogger.info(
f"{accepted_amendments_df.shape[0]} records amended during freezing"
)
return amended_df
Expand All @@ -252,21 +252,21 @@ def apply_additions(
main_df: pd.DataFrame,
additions_df: pd.DataFrame,
run_id: int,
freezing_logger: logging.Logger
FreezingLogger: logging.Logger
) -> pd.DataFrame:
"""Apply additions to the main snapshot.
Args:
main_df (pd.DataFrame): The main snapshot.
additions_df (pd.DataFrame): The additions to apply.
run_id (int): The current run id.
freezing_logger (logging.Logger): The logger.
FreezingLogger (logging.Logger): The logger.
Returns:
added_df (pd.DataFrame): The main snapshot with additions applied.
"""
if not validate_additions_df(main_df, additions_df, freezing_logger):
freezing_logger.info("Skipping additions since the additions csv is invalid...")
if not validate_additions_df(main_df, additions_df, FreezingLogger):
FreezingLogger.info("Skipping additions since the additions csv is invalid...")
return main_df
# Drop records where accept_changes is False and if any remain, add them to main df
changes_refs = additions_df[
Expand All @@ -280,10 +280,10 @@ def apply_additions(
if accepted_additions_df.shape[0] > 0:
accepted_additions_df = _add_last_frozen_column(accepted_additions_df, run_id)
added_df = pd.concat([main_df, accepted_additions_df], ignore_index=True)
freezing_logger.info(
FreezingLogger.info(
f"{accepted_additions_df.shape[0]} records added during freezing"
)
else:
freezing_logger.info("Additions file contained no records marked for inclusion")
FreezingLogger.info("Additions file contained no records marked for inclusion")
return main_df
return added_df
11 changes: 8 additions & 3 deletions src/freezing/freezing_compare.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@
import pandas as pd
from typing import Callable

FreezingLogger = logging.getLogger(__name__)

def get_amendments(
frozen_csv: pd.DataFrame,
updated_snapshot: pd.DataFrame,
FreezingLogger: logging.Logger
) -> pd.DataFrame:
"""Get amended records from updated snapshot.
Expand All @@ -19,6 +19,7 @@ def get_amendments(
frozen_csv (pd.DataFrame): The staged and validated frozen data.
updated_snapshot (pd.DataFrame): The staged and validated updated
snapshot data.
FreezingLogger (logging.Logger): The logger to log to.
Returns:
amendments_df (pd.DataFrame): The records that have changed.
Expand Down Expand Up @@ -100,13 +101,14 @@ def get_amendments(

return amendments_df
else:
freezing_logger.info("No amendments found.")
FreezingLogger.info("No amendments found.")
return None


def get_additions(
frozen_csv: pd.DataFrame,
updated_snapshot: pd.DataFrame,
FreezingLogger: logging.Logger
) -> pd.DataFrame:
"""Get added records from the updated snapshot.
Expand All @@ -115,6 +117,7 @@ def get_additions(
Args:
frozen_csv (pd.DataFrame): The staged and validated frozen data.
updated_snapshot (pd.DataFrame): The staged and validated updated snapshot data.
FreezingLogger (logging.Logger): The logger to log to.
Returns:
additions_df (pd.DataFrame): The new records identified in the updated snapshot data.
Expand Down Expand Up @@ -146,7 +149,7 @@ def get_additions(
additions_df["accept_changes"] = False
return additions_df
else:
freezing_logger.info("No additions found.")
FreezingLogger.info("No additions found.")
return None


Expand All @@ -156,6 +159,7 @@ def output_freezing_files(
config: dict,
write_csv: Callable,
run_id: int,
FreezingLogger: logging.Logger
) -> bool:
"""Save CSVs of amendments and additions for user approval.
Expand All @@ -166,6 +170,7 @@ def output_freezing_files(
write_csv (callable): Function to write to a csv file. This will be the
hdfs or network version depending on settings.
run_id (int): The run id for this run.
FreezingLogger (logging.Logger): The logger to log to.
Returns:
bool: True if the files were written successfully.
Expand Down
6 changes: 3 additions & 3 deletions src/freezing/freezing_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,10 @@ def run_freezing(
frozen_data_for_comparison = frozen_data_for_comparison.convert_dtypes()

# Use the updated snapshot to generate freezing files for the next run
additions_df = get_additions(frozen_data_for_comparison, updated_snapshot)
amendments_df = get_amendments(frozen_data_for_comparison, updated_snapshot)
additions_df = get_additions(frozen_data_for_comparison, updated_snapshot, FreezingLogger)
amendments_df = get_amendments(frozen_data_for_comparison, updated_snapshot, FreezingLogger)
output_freezing_files(
amendments_df, additions_df, config, write_csv, run_id
amendments_df, additions_df, config, write_csv, run_id, FreezingLogger
)
prepared_frozen_data = snapshot_df.copy()

Expand Down
3 changes: 2 additions & 1 deletion src/imputation/MoR.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,8 @@ def carry_forwards(df, backdata, impute_vars):
df.loc[match_cond, var] = df.loc[match_cond, f"{var}_prev"]

# Update the postcodes_harmonised column from the updated column 601
df.loc[match_cond, "postcodes_harmonised"] = df.loc[match_cond, "601"]
pc_update_cond = match_cond & df["601"].notnull()
df.loc[pc_update_cond, "postcodes_harmonised"] = df.loc[match_cond, "601"]

# Update the imputation classes based on the new 200 and 201 values
df = create_imp_class_col(df, "200", "201")
Expand Down
5 changes: 1 addition & 4 deletions src/imputation/imputation_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -363,10 +363,7 @@ def calculate_totals(df):
return df


def tidy_imputation_dataframe(
df: pd.DataFrame,
to_impute_cols: List,
) -> pd.DataFrame:
def tidy_imputation_dataframe(df: pd.DataFrame, to_impute_cols: List) -> pd.DataFrame:
"""Update cols with imputed values and remove rows and columns no longer needed.
Args:
Expand Down
Loading

0 comments on commit ff89f11

Please sign in to comment.