From 840123fedf60d7746e894f88d0e1246e024c5538 Mon Sep 17 00:00:00 2001
From: Anne Griffith
Date: Mon, 9 Sep 2024 15:22:48 +0100
Subject: [PATCH 01/16] 990 itl_mapper GB only

---
 pytest.ini                                 |  3 +++
 src/imputation/imputation_main.py          | 31 ++++++++++++----------
 src/mapping/itl_mapping.py                 | 29 ++++++++++----------
 src/mapping/mapping_helpers.py             |  4 ++-
 src/mapping/mapping_main.py                |  7 +++--
 src/pipeline.py                            |  2 +-
 tests/test_mapping/test_itl_mapping.py     | 31 ++--------------------
 tests/test_mapping/test_mapping_helpers.py | 20 +++++++++++---
 8 files changed, 61 insertions(+), 66 deletions(-)

diff --git a/pytest.ini b/pytest.ini
index 10412e12a..81813b693 100644
--- a/pytest.ini
+++ b/pytest.ini
@@ -2,3 +2,6 @@
 junit_family=xunit1
 addopts = -vv -rs
 testpaths = "./tests"
+filterwarnings =
+    ignore::DeprecationWarning
+
diff --git a/src/imputation/imputation_main.py b/src/imputation/imputation_main.py
index 596ff38cd..7e4c60d3d 100644
--- a/src/imputation/imputation_main.py
+++ b/src/imputation/imputation_main.py
@@ -10,12 +10,11 @@
 from src.staging.validation import load_schema
 from src.imputation.apportionment import run_apportionment
 from src.imputation.short_to_long import run_short_to_long
-
-# from src.imputation.MoR import run_mor
 from src.imputation.sf_expansion import run_sf_expansion
 from src.imputation import manual_imputation as mimp
 from src.imputation.MoR import run_mor
 from src.construction.construction_main import run_construction
+from src.mapping.itl_mapping import join_itl_regions
 from src.outputs.outputs_helpers import create_output_df
 
 
@@ -135,6 +134,22 @@ def run_imputation(
         ["reference", "instance"], ascending=[True, True]
     ).reset_index(drop=True)
 
+    ImputationMainLogger.info("Finished Imputation calculation.")
+
+    # run postcode construction now that imputation is complete
+    run_postcode_construction = config["global"]["run_postcode_construction"]
+    if run_postcode_construction:
+        imputed_df = run_construction(
+            imputed_df,
+            config,
+            rd_file_exists,
+            rd_read_csv,
+            is_run_postcode_construction = True,
+        )
+
+    # Re-calculate itl mapping now that imputation and postcode construction are done.
+    imputed_df = join_itl_regions(full_responses, postcode_mapper, itl_mapper, config)
+
     # Output QA files
     tdate = datetime.now().strftime("%y-%m-%d")
     survey_year = config["years"]["survey_year"]
@@ -160,18 +175,6 @@ def run_imputation(
     write_csv(os.path.join(qa_path, links_filename), links_df)
     write_csv(os.path.join(qa_path, trimmed_counts_filename), trim_counts_qa)
 
-    ImputationMainLogger.info("Finished Imputation calculation.")
-
-    run_postcode_construction = config["global"]["run_postcode_construction"]
-    if run_postcode_construction:
-        imputed_df = run_construction(
-            imputed_df,
-            config,
-            rd_file_exists,
-            rd_read_csv,
-            is_run_postcode_construction = True,
-        )
-
     # remove rows and columns no longer needed from the imputed dataframe
     imputed_df = hlp.tidy_imputation_dataframe(
         imputed_df,
diff --git a/src/mapping/itl_mapping.py b/src/mapping/itl_mapping.py
index 8c152a8d0..a5f6c140c 100644
--- a/src/mapping/itl_mapping.py
+++ b/src/mapping/itl_mapping.py
@@ -6,28 +6,33 @@
 
 
 def join_itl_regions(
-    responses: Tuple[pd.DataFrame, pd.DataFrame],
+    df: pd.DataFrame,
     postcode_mapper: pd.DataFrame,
     itl_mapper: pd.DataFrame,
     config: dict,
     pc_col: str = "postcodes_harmonised",
 ):
     """Joins the itl regions onto the full dataframe using the mapper provided.
+
+    First, the itl column is added to the dataframe by joining the postcode_mapper.
+    Then the itl mapper is joined to add the region columns.
 
     Args:
-        responses (Tuple[pd.DataFrame, pd.DataFrame]): The GB & NI responses dataframes
+        df (pd.DataFrame): The BERD responses dataframe
         postcode_mapper (pd.DataFrame): Mapper containing postcodes and regions
         itl_mapper (pd.DataFrame): Mapper containing ITL regions
         config (dict): Pipeline configuration settings
         pc_col (str, optional): The column name for the postcodes.
 
     Returns:
-        Tuple(gb_df: pd.DataFrame, ni_df: pd.DataFrame): dfs with the ITL regions joined
+        pd.DataFrame: the responses dataframe with the ITL regions joined
+
+    Unit Test:
+        See [test_itl_mapping](./tests/test_mapping/test_itl_mapping.py)
     """
-    gb_df, ni_df = responses
-    # first create itl column
+    # first create itl column
     postcode_mapper = postcode_mapper.rename(columns={"pcd2": pc_col})
-    gb_df = join_with_null_check(gb_df, postcode_mapper, "postcode mapper", pc_col)
+    df = join_with_null_check(df, postcode_mapper, "postcode mapper", pc_col)
 
     # next join the itl mapper to add the region columns
     gb_itl_col = config["mappers"]["gb_itl"]
@@ -35,12 +40,6 @@ def join_itl_regions(
     itl_mapper = itl_mapper[geo_cols].rename(columns={gb_itl_col: "itl"})
 
     # TODO: remove the "warn" parameter when the ITL mapper is fixed
-    gb_df = join_with_null_check(gb_df, itl_mapper, "itl mapper", "itl", warn=True)
-
-    pd.set_option("display.max_columns", None)
-    pd.set_option("display.max_rows", None)
-    # if the ni_df is not empty, add the itl column to it
-    if not ni_df.empty:
-        ni_df["itl"] = config["mappers"]["ni_itl"]
+    df = join_with_null_check(df, itl_mapper, "itl mapper", "itl", warn=True)
 
-    return gb_df, ni_df
+    return df
diff --git a/src/mapping/mapping_helpers.py b/src/mapping/mapping_helpers.py
index 9aea99dc8..ddd44334c 100644
--- a/src/mapping/mapping_helpers.py
+++ b/src/mapping/mapping_helpers.py
@@ -198,12 +198,13 @@ def update_ref_list(full_df: pd.DataFrame, ref_list_df: pd.DataFrame) -> pd.DataFrame:
     return df
 
 
-def create_additional_ni_cols(ni_full_responses: pd.DataFrame) -> pd.DataFrame:
+def create_additional_ni_cols(ni_full_responses: pd.DataFrame, config: dict) -> pd.DataFrame:
     """
     Create additional columns for Northern Ireland data.
 
     Args:
         df (pd.DataFrame): The main DataFrame.
+        config (dict): The pipeline configuration settings.
 
     Returns:
         pd.DataFrame: The DataFrame with additional columns.
@@ -214,5 +215,6 @@ def create_additional_ni_cols(ni_full_responses: pd.DataFrame) -> pd.DataFrame:
     ni_full_responses["form_status"] = 600
     ni_full_responses["602"] = 100.0
     ni_full_responses["formtype"] = "0003"
+    ni_full_responses["itl"] = config["mappers"]["ni_itl"]
 
     return ni_full_responses
diff --git a/src/mapping/mapping_main.py b/src/mapping/mapping_main.py
index 7e948eb38..bf07f2299 100644
--- a/src/mapping/mapping_main.py
+++ b/src/mapping/mapping_main.py
@@ -102,11 +102,14 @@ def run_mapping(
     responses = run_pg_conversion(responses, pg_num_alpha, sic_pg_num)
     responses = join_fgn_ownership(responses, ultfoc_mapper)
     responses = validate_join_cellno_mapper(responses, cellno_df, config)
-    responses = join_itl_regions(responses, postcode_mapper, itl_mapper, config)
 
     # unpack the responses
     full_responses, ni_full_responses = responses
 
+    # Join the ITL regions mapper to the BERD full_responses dataframe
+    full_responses = join_itl_regions(full_responses, postcode_mapper, itl_mapper, config)
+
+    # Process the NI full responses if they exist
     if not ni_full_responses.empty:
         ni_full_responses = hlp.create_additional_ni_cols(ni_full_responses)
 
@@ -134,4 +137,4 @@
     MappingMainLogger.info("Finished Mapping NI QA calculation.")
 
     # return mapped_df
-    return (full_responses, ni_full_responses)
+    return (full_responses, ni_full_responses, itl_mapper)
diff --git a/src/pipeline.py b/src/pipeline.py
index f48be45e1..c89742a25 100644
--- a/src/pipeline.py
+++ b/src/pipeline.py
@@ -152,7 +152,7 @@ def run_pipeline(user_config_path, dev_config_path):
 
     # Mapping module
     MainLogger.info("Starting Mapping...")
-    (mapped_df, ni_full_responses) = run_mapping(
+    (mapped_df, ni_full_responses, itl_mapper) = run_mapping(
         full_responses,
         ni_df,
         postcode_mapper,
diff --git a/tests/test_mapping/test_itl_mapping.py b/tests/test_mapping/test_itl_mapping.py
index db4de2a5c..9802450cf 100644
--- a/tests/test_mapping/test_itl_mapping.py
+++ b/tests/test_mapping/test_itl_mapping.py
@@ -38,14 +38,6 @@ def gb_input_data(self):
         df = pd.DataFrame(data=data, columns=columns)
         return df
 
-    @pytest.fixture(scope="function")
-    def ni_input_data(self) -> pd.DataFrame:
-        """UK input data for output_intram_by_itl tests."""
-        columns = ["formtype", "211"]
-        data = [["0003", 213.0], ["0003", 25.0], ["0003", 75.0], ["0003", 167.0]]
-        df = pd.DataFrame(data=data, columns=columns)
-        return df
-
     @pytest.fixture(scope="function")
     def postcode_mapper(self) -> pd.DataFrame:
         """Postcode mapper for output_intram_by_itl tests."""
@@ -109,37 +101,18 @@ def expected_gb_output(self):
         expected_output = pd.DataFrame(data=data, columns=columns)
         return expected_output
 
-    @pytest.fixture(scope="function")
-    def expected_ni_output(self):
-        """Expected output for join_itl_regions tests."""
-        columns = ["formtype", "211", "itl"]
-        data = [
-            ["0003", 213.0, "N92000002"],
-            ["0003", 25.0, "N92000002"],
-            ["0003", 75.0, "N92000002"],
-            ["0003", 167.0, "N92000002"],
-        ]
-        expected_output = pd.DataFrame(data=data, columns=columns)
-        return expected_output
-
     def test_join_itl_regions(
         self,
         gb_input_data,
-        ni_input_data,
         postcode_mapper,
         itl_mapper,
         expected_gb_output,
-        expected_ni_output,
         config
     ):
         """General tests for join_itl_regions."""
         input_data = (gb_input_data)
         gb_output = join_itl_regions(input_data, postcode_mapper, itl_mapper, config)
 
         assert gb_output.equals(
             expected_gb_output
         ), "join_itl_regions not behaving as expected."
-
-        assert ni_output.equals(
-            expected_ni_output
-        ), "join_itl_regions not behaving as expected."
diff --git a/tests/test_mapping/test_mapping_helpers.py b/tests/test_mapping/test_mapping_helpers.py
index fc3035d83..67bdfe9a4 100644
--- a/tests/test_mapping/test_mapping_helpers.py
+++ b/tests/test_mapping/test_mapping_helpers.py
@@ -214,6 +214,16 @@ def test_update_ref_list_raises(self, full_input_df, ref_list_input):
 
 class TestCreateAdditionalNiCols(object):
     """Tests for create_additional_ni_cols function."""
+    def config(self) -> dict:
+        """A dummy config for running join_itl_regions tests."""
+        config = {
+            "mappers": {
+                "geo_cols": ["ITL221CD", "ITL221NM", "ITL121CD", "ITL121NM"],
+                "gb_itl": "LAU121CD",
+                "ni_itl": "N92000002",
+            }
+        }
+        return config
 
     def test_create_additional_ni_cols(self):
         """Test create_additional_ni_cols function."""
@@ -235,16 +245,18 @@ def test_create_additional_ni_cols(self):
             "form_status",
             "602",
             "formtype",
+            "itl",
         ]
         expected_data = [
-            [1, 10, 1, "Yes", 600, 100.0, "0003"],
-            [2, 20, 1, "Yes", 600, 100.0, "0003"],
-            [3, 30, 1, "Yes", 600, 100.0, "0003"],
+            [1, 10, 1, "Yes", 600, 100.0, "0003", "N92000002"],
+            [2, 20, 1, "Yes", 600, 100.0, "0003", "N92000002"],
+            [3, 30, 1, "Yes", 600, 100.0, "0003", "N92000002"],
         ]
         expected_df = pd.DataFrame(data=expected_data, columns=expected_columns)
 
         # Call the function
-        output_df = create_additional_ni_cols(df)
+        config = self.config()
+        output_df = create_additional_ni_cols(df, config)
 
         # Check if the output matches the expected DataFrame
         assert output_df.equals(
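
[Annotation] For readers following the refactor, the GB-only mapping that PATCH 01 introduces is a two-step left join: postcode -> "itl" via the postcode mapper, then "itl" -> region columns via the ITL mapper. A minimal sketch of the idea with toy data; the column names ("pcd2", "LAU121CD", the geo_cols list) come from the pipeline config in the real code, and join_with_null_check is assumed to be a thin wrapper around a pandas merge plus a null check:

    import pandas as pd

    responses = pd.DataFrame(
        {"reference": [1, 2], "postcodes_harmonised": ["NP10 8XG", "SW1P 4DF"]}
    )
    postcode_mapper = pd.DataFrame(
        {"pcd2": ["NP10 8XG", "SW1P 4DF"], "itl": ["W06000020", "E09000033"]}
    )
    itl_mapper = pd.DataFrame(
        {"LAU121CD": ["W06000020", "E09000033"], "ITL121CD": ["TLL", "TLI"]}
    )

    # Step 1: add the itl column by joining the postcode mapper on the postcode.
    postcode_mapper = postcode_mapper.rename(columns={"pcd2": "postcodes_harmonised"})
    df = responses.merge(postcode_mapper, on="postcodes_harmonised", how="left")

    # Step 2: join the itl mapper to add the region columns.
    geo_cols = ["LAU121CD", "ITL121CD"]
    itl_mapper = itl_mapper[geo_cols].rename(columns={"LAU121CD": "itl"})
    df = df.merge(itl_mapper, on="itl", how="left")
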
From 4ddfd420db3d99e1640a86c558c427d048085eb9 Mon Sep 17 00:00:00 2001
From: Anne Griffith
Date: Mon, 9 Sep 2024 17:07:23 +0100
Subject: [PATCH 02/16] 990 re-mapping of itl cols after imputation

---
 src/imputation/imputation_main.py  | 29 ++++++++-------------
 src/mapping/mapping_main.py        |  2 +-
 src/pipeline.py                    | 40 ++++++++++++++++++++++++++++--
 src/staging/postcode_validation.py |  8 +++---
 src/staging/staging_helpers.py     |  9 +++----
 src/user_config.yaml               |  6 ++---
 6 files changed, 61 insertions(+), 33 deletions(-)

diff --git a/src/imputation/imputation_main.py b/src/imputation/imputation_main.py
index 7e4c60d3d..ed9e33d1b 100644
--- a/src/imputation/imputation_main.py
+++ b/src/imputation/imputation_main.py
@@ -28,8 +28,6 @@ def run_imputation(
     config: Dict[str, Any],
     write_csv: Callable,
     run_id: int,
-    rd_file_exists: Callable,
-    rd_read_csv: Callable,
 ) -> pd.DataFrame:
     """Run all the processes for the imputation module.
 
@@ -51,8 +49,6 @@ def run_imputation(
         config (dict): the configuration settings.
         write_csv (Callable): function to write a dataframe to a csv file
         run_id (int): unique identifier for the run
-        rd_file_exists (Callable): function to check if a file exists
-        rd_read_csv (Callable): function to read a csv file
 
     Returns:
         pd.DataFrame: dataframe with the imputed columns updated
@@ -136,20 +132,6 @@ def run_imputation(
 
     ImputationMainLogger.info("Finished Imputation calculation.")
 
-    # run postcode construction now that imputation is complete
-    run_postcode_construction = config["global"]["run_postcode_construction"]
-    if run_postcode_construction:
-        imputed_df = run_construction(
-            imputed_df,
-            config,
-            rd_file_exists,
-            rd_read_csv,
-            is_run_postcode_construction = True,
-        )
-
-    # Re-calculate itl mapping now that imputation and postcode construction are done.
-    imputed_df = join_itl_regions(full_responses, postcode_mapper, itl_mapper, config)
-
     # Output QA files
     tdate = datetime.now().strftime("%y-%m-%d")
     survey_year = config["years"]["survey_year"]
@@ -181,6 +163,17 @@ def run_imputation(
     write_csv(os.path.join(qa_path, links_filename), links_df)
     write_csv(os.path.join(qa_path, trimmed_counts_filename), trim_counts_qa)
 
+    # run postcode construction now that imputation is complete
+    run_postcode_construction = config["global"]["run_postcode_construction"]
+    if run_postcode_construction:
+        imputed_df = run_construction(
+            imputed_df,
+            config,
+            rd_file_exists,
+            rd_read_csv,
+            is_run_postcode_construction = True,
+        )
+
     # optionally output backdata for imputation
     if config["global"]["output_backdata"]:
         ImputationMainLogger.info("Outputting backdata for imputation.")
diff --git a/src/mapping/mapping_main.py b/src/mapping/mapping_main.py
index bf07f2299..130010c9b 100644
--- a/src/mapping/mapping_main.py
+++ b/src/mapping/mapping_main.py
@@ -111,7 +111,7 @@ def run_mapping(
 
     # Process the NI full responses if they exist
     if not ni_full_responses.empty:
-        ni_full_responses = hlp.create_additional_ni_cols(ni_full_responses)
+        ni_full_responses = hlp.create_additional_ni_cols(ni_full_responses, config)
 
     # output QA files
     qa_path = config["mapping_paths"]["qa_path"]
diff --git a/src/pipeline.py b/src/pipeline.py
index c89742a25..5c855c47e 100644
--- a/src/pipeline.py
+++ b/src/pipeline.py
@@ -10,10 +10,12 @@
 from src.utils.wrappers import logger_creator
 from src.utils.path_helpers import filename_validation
 from src.staging.staging_main import run_staging
+from src.staging.postcode_validation import run_full_postcode_process
 from src.freezing.freezing_main import run_freezing
 from src.northern_ireland.ni_main import run_ni
 from src.construction.construction_main import run_construction
 from src.mapping.mapping_main import run_mapping
+from src.mapping.itl_mapping import join_itl_regions
 from src.imputation.imputation_main import run_imputation  # noqa
 from src.outlier_detection.outlier_main import run_outliers
 from src.estimation.estimation_main import run_estimation
@@ -173,11 +175,45 @@ def run_pipeline(user_config_path, dev_config_path):
         config,
         mods.rd_write_csv,
         run_id,
-        mods.rd_file_exists,
-        mods.rd_read_csv,
     )
     MainLogger.info("Finished Imputation...")
 
+    # Perform postcode construction now imputation is complete
+    #TODO: could we have a script in the construction module that runs the postcode
+    # construction and then does the validation and itl mapping?
+    run_postcode_construction = config["global"]["run_postcode_construction"]
+    if run_postcode_construction:
+        imputed_df = run_construction(
+            imputed_df,
+            config,
+            mods.rd_file_exists,
+            mods.rd_read_csv,
+            is_run_postcode_construction = True,
+        )
+    # validate the constructed and imputed postcodes
+    imputed_df, invalid_df= run_full_postcode_process(
+        imputed_df,
+        postcode_mapper,
+        config,
+    )
+    # There shouldn't be invalid postcodes at this stage, if there are they will be
+    # printed to the screen
+    if not invalid_df.empty:
+        MainLogger.warning(
+            f"Invalid postcodes found in the imputed dataframe: {invalid_df}"
+        )
+    # re-calculate the itl columns based on imputed and constructed columns
+    geo_cols = config["mappers"]["geo_cols"]
+    imputed_df = imputed_df.copy().drop(["itl"] + geo_cols, axis=1)
+    imputed_df = join_itl_regions(
+        imputed_df,
+        postcode_mapper,
+        itl_mapper,
+        config,
+        pc_col="postcodes_harmonised",
+    )
+    #TODO: up to here in postcodes construction script in construction module
+
     # Outlier detection module
     MainLogger.info("Starting Outlier Detection...")
     outliered_responses_df = run_outliers(
diff --git a/src/staging/postcode_validation.py b/src/staging/postcode_validation.py
index 4d57b7719..fc563f55d 100644
--- a/src/staging/postcode_validation.py
+++ b/src/staging/postcode_validation.py
@@ -304,7 +304,7 @@ def update_full_responses(
 @time_logger_wrap
 @exception_wrap
 def run_full_postcode_process(
-    df: pd.DataFrame, postcode_masterlist: pd.DataFrame, config: dict
+    df: pd.DataFrame, postcode_mapper: pd.DataFrame, config: dict
 ):
     """This function creates the postcodes_harmonised column containing
     valid UK postcodes only.
@@ -322,8 +322,8 @@ def run_full_postcode_process(
     Args:
         df (pd.DataFrame): The DataFrame containing the postcodes.
-        postcode_masterlist (pd.DataFrame): The dataframe containing the correct
-            postcodes to check against
+        postcode_mapper (pd.DataFrame): The dataframe containing the correct
+            postcodes to check against
         config (dict): The postcode settings from the config settings
 
     Returns:
@@ -332,6 +332,8 @@ def run_full_postcode_process(
     """
     if not isinstance(df, pd.DataFrame):
         raise TypeError(f"The dataframe you are attempting to validate is {type(df)}")
+
+    postcode_masterlist = postcode_mapper["pcd2"]
 
     # Create new column and fill with "601" and the nulls with "referencepostcode"
     df["postcodes_harmonised"] = df["601"].fillna(df["referencepostcode"])
diff --git a/src/staging/staging_helpers.py b/src/staging/staging_helpers.py
index 4be1e89b1..364d40d65 100644
--- a/src/staging/staging_helpers.py
+++ b/src/staging/staging_helpers.py
@@ -261,8 +261,6 @@ def stage_validate_harmonise_postcodes(
     1. Loads a master list of postcodes from a CSV file.
     2. Validates the postcode column in the full_responses DataFrame against
         the master list.
-    2. Validates the postcode column in the full_responses DataFrame against
-        the master list.
     3. Writes any invalid postcodes to a CSV file.
     4. Returns the original DataFrame and the master list of postcodes.
@@ -290,11 +288,10 @@ def stage_validate_harmonise_postcodes(
     postcode_mapper = config["mapping_paths"]["postcode_mapper"]
     check_file_exists(postcode_mapper, raise_error=True)
     postcode_mapper = read_csv(postcode_mapper)
-    postcode_masterlist = postcode_mapper["pcd2"]
-
+
     # Validate the postcode column in the full_responses DataFrame
     full_responses, invalid_df = pcval.run_full_postcode_process(
-        full_responses, postcode_masterlist, config
+        full_responses, postcode_mapper, config
     )
 
     # Log the saving of invalid postcodes to a file
@@ -312,7 +309,7 @@ def stage_validate_harmonise_postcodes(
     # Log the end of postcode validation
     StagingHelperLogger.info("Finished PostCode Validation")
 
-    return full_responses, postcode_mapper
+    return full_responses, postcode_mapper,
 
 
 def filter_pnp_data(full_responses):
diff --git a/src/user_config.yaml b/src/user_config.yaml
index 7047fd769..dc97012c6 100644
--- a/src/user_config.yaml
+++ b/src/user_config.yaml
@@ -26,11 +26,11 @@ global:
     output_ni_full_responses: False
     output_mapping_qa: False
     output_mapping_ni_qa: False
-    output_imputation_qa: False
+    output_imputation_qa: True
     output_auto_outliers: False
     output_outlier_qa : False
-    output_estimation_qa: False
-    output_apportionment_qa: False
+    output_estimation_qa: True
+    output_apportionment_qa: True
     # Final output settings
     output_long_form: False
     output_short_form: False

From a4a99d2291b4e0ab447fca43bcc378531b1fedd1 Mon Sep 17 00:00:00 2001
From: Anne Griffith
Date: Mon, 9 Sep 2024 17:38:17 +0100
Subject: [PATCH 03/16] fixes to postcode validation tests

---
 .../test_staging/test_postcode_validation.py | 28 +++++++++++++------
 1 file changed, 20 insertions(+), 8 deletions(-)

diff --git a/tests/test_staging/test_postcode_validation.py b/tests/test_staging/test_postcode_validation.py
index 35ada6b7c..3cf7accde 100644
--- a/tests/test_staging/test_postcode_validation.py
+++ b/tests/test_staging/test_postcode_validation.py
@@ -72,12 +72,24 @@ def mock_get_masterlist(postcode_masterlist):
     return pd.Series(["NP10 8XG", "SW1P 4DF", "PO15 5RR"])
 
 
+@pytest.fixture
+def postcode_mapper():
+    # Return a mock postcode_mapper dataframe
+    pc_mapper = pd.DataFrame(
+        {
+            "pcd2": ["NP10 8XG", "SW1P 4DF", "PO15 5RR"],
+            "itl": ["W06000020", "E09000033", "E07000086"],  # Mock ITL regions
+        }
+    )
+    return pc_mapper
+
+
 # Test case for run_full_postcode_process
-def test_run_full_postcode_process(test_data_df, monkeypatch, caplog):
+def test_run_full_postcode_process(test_data_df, postcode_mapper, caplog):
     # Monkeypatch the get_masterlist function to use the mock implementation
-    monkeypatch.setattr(
-        "src.staging.postcode_validation.get_masterlist", mock_get_masterlist
-    )
+    # monkeypatch.setattr(
+    #     "src.staging.postcode_validation.get_masterlist", mock_get_masterlist
+    # )
 
     # Make a fake path to the masterlist
     fake_path = "path/to/missing_masterlist.csv"
@@ -85,7 +97,7 @@ def test_run_full_postcode_process(test_data_df, postcode_mapper, caplog):
     config = generate_config(True)
 
     # Call the function under test
-    run_full_postcode_process(test_data_df, fake_path, config)
+    run_full_postcode_process(test_data_df, postcode_mapper, config)
 
     # Using caplog to check the logged warning messages
     if config["global"]["postcode_csv_check"]:
@@ -109,7 +121,7 @@ def test_run_full_postcode_process(test_data_df, postcode_mapper, caplog):
             "postcodes_harmonised": ["NP10 8XG", "PO15 5RR", "SW1P 4DF"],
         }
     )
-    df, df_result = run_full_postcode_process(df_valid, fake_path, config)
+    df, df_result = run_full_postcode_process(df_valid, postcode_mapper, config)
     exp_output1 = pd.DataFrame(
         columns=[
             "reference",
@@ -136,7 +148,7 @@ def test_run_full_postcode_process(test_data_df, postcode_mapper, caplog):
             "postcodes_harmonised": ["EFG 456", "HIJ 789"],
         }
     )
-    run_full_postcode_process(df_invalid, fake_path, config)
+    run_full_postcode_process(df_invalid, postcode_mapper, config)
     assert (
         "Total list of unique invalid postcodes found: ['EFG 456', 'HIJ 789']"
         in caplog.text
@@ -144,7 +156,7 @@ def test_run_full_postcode_process(test_data_df, postcode_mapper, caplog):
 
     # Mixed valid and invalid postcodes - as is in the test_data
-    run_full_postcode_process(test_data_df, fake_path, config)
+    run_full_postcode_process(test_data_df, postcode_mapper, config)
     if config["global"]["postcode_csv_check"]:
         assert (
             "Total list of unique invalid postcodes found: ['KL1M 2NO', 'HIJ 789']"
From 6bd20eb0fbf2ffaeb3b06a71d05816a012707fb0 Mon Sep 17 00:00:00 2001
From: Anne Griffith
Date: Tue, 10 Sep 2024 10:34:55 +0100
Subject: [PATCH 04/16] 990 move postcode revalidation to helpers

---
 src/imputation/imputation_main.py | 11 --------
 src/pipeline.py                   | 28 ++++----------------
 src/utils/helpers.py              | 45 +++++++++++++++++++++++++++----
 3 files changed, 45 insertions(+), 39 deletions(-)

diff --git a/src/imputation/imputation_main.py b/src/imputation/imputation_main.py
index ed9e33d1b..3e659b49d 100644
--- a/src/imputation/imputation_main.py
+++ b/src/imputation/imputation_main.py
@@ -163,17 +163,6 @@ def run_imputation(
         to_impute_cols,
     )
 
-    # run postcode construction now that imputation is complete
-    run_postcode_construction = config["global"]["run_postcode_construction"]
-    if run_postcode_construction:
-        imputed_df = run_construction(
-            imputed_df,
-            config,
-            rd_file_exists,
-            rd_read_csv,
-            is_run_postcode_construction = True,
-        )
-
     # optionally output backdata for imputation
     if config["global"]["output_backdata"]:
         ImputationMainLogger.info("Outputting backdata for imputation.")
diff --git a/src/pipeline.py b/src/pipeline.py
index 5c855c47e..8ea47bfa4 100644
--- a/src/pipeline.py
+++ b/src/pipeline.py
@@ -10,12 +10,11 @@
 from src.utils.wrappers import logger_creator
 from src.utils.path_helpers import filename_validation
 from src.staging.staging_main import run_staging
-from src.staging.postcode_validation import run_full_postcode_process
+from src.utils.helpers import validate_updated_postcodes
 from src.freezing.freezing_main import run_freezing
 from src.northern_ireland.ni_main import run_ni
 from src.construction.construction_main import run_construction
 from src.mapping.mapping_main import run_mapping
-from src.mapping.itl_mapping import join_itl_regions
 from src.imputation.imputation_main import run_imputation  # noqa
 from src.outlier_detection.outlier_main import run_outliers
 from src.estimation.estimation_main import run_estimation
@@ -178,38 +177,20 @@ def run_pipeline(user_config_path, dev_config_path):
     MainLogger.info("Finished Imputation...")
 
     # Perform postcode construction now imputation is complete
-    #TODO: could we have a script in the construction module that runs the postcode
-    # construction and then does the validation and itl mapping?
     run_postcode_construction = config["global"]["run_postcode_construction"]
     if run_postcode_construction:
         imputed_df = run_construction(
             imputed_df,
             config,
             mods.rd_file_exists,
             mods.rd_read_csv,
             is_run_postcode_construction = True,
         )
-    # validate the constructed and imputed postcodes
-    imputed_df, invalid_df= run_full_postcode_process(
-        imputed_df,
-        postcode_mapper,
-        config,
-    )
-    # There shouldn't be invalid postcodes at this stage, if there are they will be
-    # printed to the screen
-    if not invalid_df.empty:
-        MainLogger.warning(
-            f"Invalid postcodes found in the imputed dataframe: {invalid_df}"
-        )
-    # re-calculate the itl columns based on imputed and constructed columns
-    geo_cols = config["mappers"]["geo_cols"]
-    imputed_df = imputed_df.copy().drop(["itl"] + geo_cols, axis=1)
-    imputed_df = join_itl_regions(
-        imputed_df,
-        postcode_mapper,
-        itl_mapper,
-        config,
-        pc_col="postcodes_harmonised",
-    )
-    #TODO: up to here in postcodes construction script in construction module
+
+    imputed_df = validate_updated_postcodes(
+        imputed_df,
+        postcode_mapper,
+        itl_mapper,
+        config,
+        MainLogger,
+    )
 
     # Outlier detection module
     MainLogger.info("Starting Outlier Detection...")
diff --git a/src/utils/helpers.py b/src/utils/helpers.py
index 1b9db9328..bbd46465e 100644
--- a/src/utils/helpers.py
+++ b/src/utils/helpers.py
@@ -1,12 +1,14 @@
-"""Define helper functions that wrap regularly-used functions."""
-
-from typing import Union
-
-import pandas as pd
+"""Define helper functions to be used throughout the pipeline."""
 import yaml
 import toml
+import logging
+import pandas as pd
 
+from typing import Union
 from src.utils.defence import type_defence
+from src.staging.postcode_validation import run_full_postcode_process
+from src.mapping.itl_mapping import join_itl_regions
 
 # Define paths
 user_config_path = "config/userconfig.toml"
@@ -113,6 +115,39 @@ def values_in_column(
     return result
 
 
+def validate_updated_postcodes(
+    df: pd.DataFrame,
+    postcode_mapper: pd.DataFrame,
+    itl_mapper: pd.DataFrame,
+    config: dict,
+    MainLogger: logging.Logger
+    ) -> pd.DataFrame:
+
+    # validate the constructed and imputed postcodes
+    df, invalid_df= run_full_postcode_process(
+        df,
+        postcode_mapper,
+        config,
+    )
+    # There shouldn't be invalid postcodes at this stage, if there are they will be
+    # printed to the screen
+    if not invalid_df.empty:
+        MainLogger.warning(
+            f"Invalid postcodes found in the imputed dataframe: {invalid_df}"
+        )
+    # re-calculate the itl columns based on imputed and constructed columns
+    geo_cols = config["mappers"]["geo_cols"]
+    df = df.copy().drop(["itl"] + geo_cols, axis=1)
+    df = join_itl_regions(
+        df,
+        postcode_mapper,
+        itl_mapper,
+        config,
+        pc_col="postcodes_harmonised",
+    )
+    return df
+
+
 def tree_to_list(
     tree: dict, path_list: list = [], prefix: str = ""
 ) -> list:
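
[Annotation] The helper extracted in PATCH 04 follows a drop-then-rejoin pattern: discard the columns previously derived from postcodes, then recompute them from the mappers so imputed or constructed postcodes pick up fresh ITL values. Sketched in isolation below; this is a paraphrase of the committed helper rather than a drop-in replacement (join_itl_regions is the function defined in src/mapping/itl_mapping.py, and geo_cols comes from config):

    import pandas as pd
    from src.mapping.itl_mapping import join_itl_regions

    def remap_itl(df: pd.DataFrame, postcode_mapper, itl_mapper, config) -> pd.DataFrame:
        # Drop the stale derived columns first so the re-join cannot collide
        # with them (pandas would otherwise emit _x/_y suffixed duplicates).
        geo_cols = config["mappers"]["geo_cols"]
        df = df.copy().drop(["itl"] + geo_cols, axis=1)
        return join_itl_regions(df, postcode_mapper, itl_mapper, config)
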
From 533de6e84629e1b236d80d9217e73f3b0aa2b6a9 Mon Sep 17 00:00:00 2001
From: Anne Griffith
Date: Tue, 10 Sep 2024 11:43:07 +0100
Subject: [PATCH 05/16] 990 full postcode validation after imputation

---
 src/imputation/imputation_helpers.py |  9 +++------
 src/imputation/imputation_main.py    |  5 +----
 src/pipeline.py                      |  1 -
 src/user_config.yaml                 |  2 +-
 src/utils/helpers.py                 | 30 ++++++++++++++--------------
 5 files changed, 20 insertions(+), 27 deletions(-)

diff --git a/src/imputation/imputation_helpers.py b/src/imputation/imputation_helpers.py
index 835ac9edc..e77b95a4d 100644
--- a/src/imputation/imputation_helpers.py
+++ b/src/imputation/imputation_helpers.py
@@ -363,10 +363,7 @@ def calculate_totals(df):
     return df
 
 
-def tidy_imputation_dataframe(
-    df: pd.DataFrame,
-    to_impute_cols: List,
-) -> pd.DataFrame:
+def tidy_imputation_dataframe(df: pd.DataFrame, to_impute_cols: List) -> pd.DataFrame:
     """Update cols with imputed values and remove rows and columns no longer needed.
 
     Args:
@@ -387,8 +384,8 @@ def tidy_imputation_dataframe(df: pd.DataFrame, to_impute_cols: List) -> pd.DataFrame:
         col
         for col in df.columns
         if (
-            col.endswith("prev")
-            | col.endswith("imputed")
+            # col.endswith("prev")
+            col.endswith("imputed")
             | col.endswith("link")
             | col.endswith("sf_exp_grouping")
             | col.endswith("trim")
diff --git a/src/imputation/imputation_main.py b/src/imputation/imputation_main.py
index 3e659b49d..88adcdcde 100644
--- a/src/imputation/imputation_main.py
+++ b/src/imputation/imputation_main.py
@@ -158,10 +158,7 @@ def run_imputation(
 
     # remove rows and columns no longer needed from the imputed dataframe
-    imputed_df = hlp.tidy_imputation_dataframe(
-        imputed_df,
-        to_impute_cols,
-    )
+    imputed_df = hlp.tidy_imputation_dataframe(imputed_df, to_impute_cols)
 
     # optionally output backdata for imputation
     if config["global"]["output_backdata"]:
diff --git a/src/pipeline.py b/src/pipeline.py
index 8ea47bfa4..b94e2dc1f 100644
--- a/src/pipeline.py
+++ b/src/pipeline.py
@@ -193,7 +193,6 @@ def run_pipeline(user_config_path, dev_config_path):
         postcode_mapper,
         itl_mapper,
         config,
-        MainLogger,
     )
 
     # Outlier detection module
diff --git a/src/user_config.yaml b/src/user_config.yaml
index dc97012c6..b6e36784b 100644
--- a/src/user_config.yaml
+++ b/src/user_config.yaml
@@ -26,7 +26,7 @@ global:
     output_ni_full_responses: False
     output_mapping_qa: False
     output_mapping_ni_qa: False
-    output_imputation_qa: True
+    output_imputation_qa: False
     output_auto_outliers: False
     output_outlier_qa : False
     output_estimation_qa: True
diff --git a/src/utils/helpers.py b/src/utils/helpers.py
index bbd46465e..f4615573b 100644
--- a/src/utils/helpers.py
+++ b/src/utils/helpers.py
@@ -1,13 +1,12 @@
 """Define helper functions to be used throughout the pipeline."""
 import yaml
 import toml
-import logging
 import pandas as pd
 
 from typing import Union
 from src.utils.defence import type_defence
-from src.staging.postcode_validation import run_full_postcode_process
+from src.staging.postcode_validation import format_postcodes, run_full_postcode_process
 from src.mapping.itl_mapping import join_itl_regions
 
 # Define paths
@@ -115,21 +114,22 @@ def values_in_column(
 def validate_updated_postcodes(
     df: pd.DataFrame,
     postcode_mapper: pd.DataFrame,
     itl_mapper: pd.DataFrame,
     config: dict,
-    MainLogger: logging.Logger
     ) -> pd.DataFrame:
+    """Update the postcodes_harmonised column and re-map the itl columns.
 
-    # validate the constructed and imputed postcodes
-    df, invalid_df= run_full_postcode_process(
-        df,
-        postcode_mapper,
-        config,
-    )
-    # There shouldn't be invalid postcodes at this stage, if there are they will be
-    # printed to the screen
-    if not invalid_df.empty:
-        MainLogger.warning(
-            f"Invalid postcodes found in the imputed dataframe: {invalid_df}"
-        )
+    Args:
+        df (pd.DataFrame): The full responses dataframe.
+        postcode_mapper (pd.DataFrame): The postcode mapper dataframe mapping to itl.
+        itl_mapper (pd.DataFrame): The ITL mapper dataframe mapping to ITL regions.
+        config (dict): The pipeline configuration settings.
+
+    Returns:
+        pd.DataFrame: The updated full responses dataframe with the postcodes_harmonised
+            column updated and the itl columns re-mapped.
+    """
+    df, invalid_postcodes = run_full_postcode_process(df, postcode_mapper, config)
+    #TODO: output invalid postcodes after imputation.
+
     # re-calculate the itl columns based on imputed and constructed columns
     geo_cols = config["mappers"]["geo_cols"]
     df = df.copy().drop(["itl"] + geo_cols, axis=1)
     df = join_itl_regions(
         df,
         postcode_mapper,
         itl_mapper,
         config,
         pc_col="postcodes_harmonised",
     )
     return df

From 21a1370f37f9cbc3137c373884aa0f01362a359 Mon Sep 17 00:00:00 2001
From: Anne Griffith
Date: Tue, 10 Sep 2024 11:51:38 +0100
Subject: [PATCH 06/16] 990 remove _prev cols after imputation

---
 src/imputation/imputation_helpers.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/src/imputation/imputation_helpers.py b/src/imputation/imputation_helpers.py
index e77b95a4d..41be0a2ef 100644
--- a/src/imputation/imputation_helpers.py
+++ b/src/imputation/imputation_helpers.py
@@ -384,8 +384,8 @@ def tidy_imputation_dataframe(df: pd.DataFrame, to_impute_cols: List) -> pd.DataFrame:
         col
         for col in df.columns
         if (
-            # col.endswith("prev")
-            col.endswith("imputed")
+            col.endswith("prev")
+            | col.endswith("imputed")
             | col.endswith("link")
             | col.endswith("sf_exp_grouping")
             | col.endswith("trim")

From bc688ce4590d1eb3154df295e4bc45d37c835c69 Mon Sep 17 00:00:00 2001
From: Anne Griffith
Date: Tue, 10 Sep 2024 12:39:40 +0100
Subject: [PATCH 07/16] only update imputed postcodes

---
 src/utils/helpers.py | 15 +++++++++++----
 1 file changed, 11 insertions(+), 4 deletions(-)

diff --git a/src/utils/helpers.py b/src/utils/helpers.py
index f4615573b..80596e840 100644
--- a/src/utils/helpers.py
+++ b/src/utils/helpers.py
@@ -132,19 +132,26 @@ def validate_updated_postcodes(
     """
-    df, invalid_postcodes = run_full_postcode_process(df, postcode_mapper, config)
-    #TODO: output invalid postcodes after imputation.
+    # filter out records that have been constructed or imputed with backdata
+    mask = df["imp_marker"].isin(["CF", "MoR", "constructed"])
+    filtered_df = df.copy().loc[mask]
+    filtered_df, invalid_postcodes = run_full_postcode_process(filtered_df, postcode_mapper, config)
+    #TODO: output invalid postcodes after imputation.
 
     # re-calculate the itl columns based on imputed and constructed columns
     geo_cols = config["mappers"]["geo_cols"]
-    df = df.copy().drop(["itl"] + geo_cols, axis=1)
-    df = join_itl_regions(
-        df,
+    filtered_df = filtered_df.copy().drop(["itl"] + geo_cols, axis=1)
+    filtered_df = join_itl_regions(
+        filtered_df,
         postcode_mapper,
         itl_mapper,
         config,
         pc_col="postcodes_harmonised",
     )
+
+    filtered_df = filtered_df[list(df.columns)]
+
+    df = pd.concat([df.loc[~mask], filtered_df])
     return df

From 146e81c72e191946dc9fe5550cd83599101708e1 Mon Sep 17 00:00:00 2001
From: Anne Griffith
Date: Tue, 10 Sep 2024 13:29:36 +0100
Subject: [PATCH 08/16] 990 removed pc validation again after imputation

---
 src/utils/helpers.py | 33 ++++++++++++---------------------
 1 file changed, 12 insertions(+), 21 deletions(-)

diff --git a/src/utils/helpers.py b/src/utils/helpers.py
index 80596e840..c0ec4299f 100644
--- a/src/utils/helpers.py
+++ b/src/utils/helpers.py
@@ -6,7 +6,6 @@
 from typing import Union
 from src.utils.defence import type_defence
-from src.staging.postcode_validation import format_postcodes, run_full_postcode_process
 from src.mapping.itl_mapping import join_itl_regions
 
 # Define paths
@@ -91,9 +90,7 @@ def convert_formtype(formtype_value: str) -> str:
 def values_in_column(
-    df: pd.DataFrame,
-    col_name: str,
-    values: Union[list, pd.Series]
+    df: pd.DataFrame, col_name: str, values: Union[list, pd.Series]
 ) -> bool:
     """Determine whether a list of values are all present in a dataframe column.
 
@@ -115,13 +112,13 @@ def values_in_column(
 
 def validate_updated_postcodes(
-    df: pd.DataFrame,
-    postcode_mapper: pd.DataFrame,
-    itl_mapper: pd.DataFrame,
-    config: dict,
-    ) -> pd.DataFrame:
+    df: pd.DataFrame,
+    postcode_mapper: pd.DataFrame,
+    itl_mapper: pd.DataFrame,
+    config: dict,
+) -> pd.DataFrame:
     """Update the postcodes_harmonised column and re-map the itl columns.
-    
+
     Args:
         df (pd.DataFrame): The full responses dataframe.
         postcode_mapper (pd.DataFrame): The postcode mapper dataframe mapping to itl.
         itl_mapper (pd.DataFrame): The ITL mapper dataframe mapping to ITL regions.
         config (dict): The pipeline configuration settings.
 
     Returns:
         pd.DataFrame: The updated full responses dataframe with the postcodes_harmonised
             column updated and the itl columns re-mapped.
     """
     # filter out records that have been constructed or imputed with backdata
     mask = df["imp_marker"].isin(["CF", "MoR", "constructed"])
     filtered_df = df.copy().loc[mask]
-    filtered_df, invalid_postcodes = run_full_postcode_process(filtered_df, postcode_mapper, config)
-    #TODO: output invalid postcodes after imputation.
 
     # re-calculate the itl columns based on imputed and constructed columns
-    geo_cols = config["mappers"]["geo_cols"] 
+    geo_cols = config["mappers"]["geo_cols"]
     filtered_df = filtered_df.copy().drop(["itl"] + geo_cols, axis=1)
     filtered_df = join_itl_regions(
         filtered_df,
         postcode_mapper,
         itl_mapper,
         config,
         pc_col="postcodes_harmonised",
-        )
+    )
 
     filtered_df = filtered_df[list(df.columns)]
 
     df = pd.concat([df.loc[~mask], filtered_df])
     return df
 
 
-def tree_to_list(
-    tree: dict, path_list: list = [], prefix: str = ""
-) -> list:
+def tree_to_list(tree: dict, path_list: list = [], prefix: str = "") -> list:
     """
     Convert a dictionary of paths to a list.
 
-    This function converts a directory tree that is provided as a dictionary to a
+    This function converts a directory tree that is provided as a dictionary to a
     list of full paths. This is done recursively, so the number of tiers is not
     pre-defined. Returns a list of absolute directory paths.
     Directory and subdirectory names must be the keys in the dictionary.
@@ -191,7 +184,7 @@ def tree_to_list(tree: dict, path_list: list = [], prefix: str = "") -> list:
         path_list (list): A list of full paths that is populated when the function
             runs. Must be empty when you call the function.
         prefix (str): The common prefix. It should start with the platform-
-            specific root, such as "R:/dap_emulation" or "dapsen/workspace_zone_res_dev",
+            specific root, such as "R:/dap_emulation" or "dapsen/workspace_zone_res_dev"
             followed by the year_surveys. Do not add a forward slash at the end.
 
     Returns:
@@ -203,12 +196,10 @@ def tree_to_list(tree: dict, path_list: list = [], prefix: str = "") -> list:
 
     # Input must be a dictionary of dictionaries or an empty dictionary
     if isinstance(tree, dict):
-
         # The recursive iteration will proceed if the current tree is not empty.
         # The recursive iterations will stop once we reach the lowest level
         # indicated by an empty dictionary.
         if tree:
-
             # For a non-empty dictionary, iterating through all top-level keys.
             for key in tree:
                 if prefix == "":

From b3059f91ee323f13bdaef7a3a478275ed12032eb Mon Sep 17 00:00:00 2001
From: Anne Griffith
Date: Tue, 10 Sep 2024 14:37:43 +0100
Subject: [PATCH 09/16] update postcodes_harmonised after carry forward

---
 src/imputation/MoR.py | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/src/imputation/MoR.py b/src/imputation/MoR.py
index 7b98890ff..acbc9ead6 100644
--- a/src/imputation/MoR.py
+++ b/src/imputation/MoR.py
@@ -137,6 +137,10 @@ def carry_forwards(df, backdata, impute_vars):
 
     # Update the postcodes_harmonised column from the updated column 601
     df.loc[match_cond, "postcodes_harmonised"] = df.loc[match_cond, "601"]
+    # fill nulls with "referencepostcode" as we do when we first construct the column
+    df["postcodes_harmonised"] = df["postcodes_harmonised"].fillna(
+        df["referencepostcode"]
+    )
 
     # Update the imputation classes based on the new 200 and 201 values
     df = create_imp_class_col(df, "200", "201")
""" # Prepare filepaths to read from - network_or_hdfs = config["global"]["network_or_hdfs"] - paths = config[f"{network_or_hdfs}_paths"] + platform = config["global"]["platform"] + paths = config[f"{platform}_paths"] amendments_filepath = paths["freezing_amendments_path"] additions_filepath = paths["freezing_additions_path"] @@ -43,38 +43,38 @@ def apply_freezing( # If each file exists, read it and call the function to apply them if not (amendments_exist or additions_exist): - freezing_logger.info("No amendments or additions to apply, skipping...") + FreezingLogger.info("No amendments or additions to apply, skipping...") return main_df # apply amendments if amendments_exist: amendments_df = read_csv(amendments_filepath) if amendments_df.empty: - freezing_logger.warning( + FreezingLogger.warning( f"Amendments file ({amendments_filepath}) is empty, skipping..." ) else: main_df = apply_amendments( - main_df, + main_df, amendments_df, - run_id, - freezing_logger, + run_id, + FreezingLogger, ) # apply additions if additions_exist: additions_df = read_csv(additions_filepath) if additions_df.empty: - freezing_logger.warning( + FreezingLogger.warning( f"Additions file {additions_filepath} is empty, skipping..." ) else: additions_df["instance"] = additions_df["instance"].astype("Int64") main_df = apply_additions( - main_df, - additions_df, - run_id, - freezing_logger + main_df, + additions_df, + run_id, + FreezingLogger ) return main_df @@ -116,7 +116,7 @@ def validate_all_refinst_in_frozen( df2 (pd.DataFrame): The ammendments/additions df. Returns: - bool: Whether all ref/inst are in the dataframe. + bool: Whether all ref/inst are in the dataframe. """ frozen_copy = frozen_df.copy() frozen_copy["refinst"] = ( @@ -133,26 +133,26 @@ def validate_all_refinst_in_frozen( def validate_amendments_df( frozen_df: pd.DataFrame, amendments_df: pd.DataFrame, - freezing_logger: logging.Logger, + FreezingLogger: logging.Logger, ) -> bool: """Validate the amendments df. Args: frozen_df (pd.DataFrame): The frozen csv df. amendments_df (pd.DataFrame): The amendments df. - freezing_logger (logging.Logger): The logger to log to. + FreezingLogger (logging.Logger): The logger to log to. Returns: bool: Whether or not the amendments df is valid. """ # check that all ref/inst combs are in staged frozen data - freezing_logger.info( + FreezingLogger.info( "Checking if all ref/inst in the amendments df are present in the frozen" " data..." ) present = validate_all_refinst_in_frozen(frozen_df, amendments_df) - if not present: - freezing_logger.info( + if not present: + FreezingLogger.info( "Not all reference/instance combinations found within the amendments" " file are present in the snapshot." ) @@ -162,27 +162,27 @@ def validate_amendments_df( def validate_additions_df( frozen_df: pd.DataFrame, additions_df: pd.DataFrame, - freezing_logger: logging.Logger, + FreezingLogger: logging.Logger, ) -> None: """Validate the additions df. Args: frozen_df (pd.DataFrame): The frozen csv df. additions_df (pd.DataFrame): The additions df. - freezing_logger (logging.Logger): The logger to log to. + FreezingLogger (logging.Logger): The logger to log to. Returns: bool: Whether or not the additions df is valid. """ # check that all ref/inst combs are not staged frozen data - freezing_logger.info( + FreezingLogger.info( "Checking if all ref/inst in the amendments df are missing from the frozen" " data..." 
     )
     any_present = validate_any_refinst_in_frozen(frozen_df, additions_df)
-    if any_present: 
-        freezing_logger.info(
+    if any_present:
+        FreezingLogger.info(
             "Some reference/instance combinations from the additions file are "
             "present in the frozen data."
         )
@@ -194,7 +194,7 @@ def apply_amendments(
     main_df: pd.DataFrame,
     amendments_df: pd.DataFrame,
     run_id: int,
-    freezing_logger: logging.Logger
+    FreezingLogger: logging.Logger
 ) -> pd.DataFrame:
     """Apply amendments to the main snapshot.
 
@@ -202,13 +202,13 @@ def apply_amendments(
         main_df (pd.DataFrame): The main snapshot.
         amendments_df (pd.DataFrame): The amendments to apply.
         run_id (int): The current run id.
-        freezing_logger (logging.Logger): The logger.
+        FreezingLogger (logging.Logger): The logger.
 
     Returns:
         amended_df (pd.DataFrame): The main snapshot with amendments applied.
     """
-    if not validate_amendments_df(main_df, amendments_df, freezing_logger):
-        freezing_logger.info("Skipping amendments since the amendments csv is invalid...")
+    if not validate_amendments_df(main_df, amendments_df, FreezingLogger):
+        FreezingLogger.info("Skipping amendments since the amendments csv is invalid...")
         return main_df
 
     changes_refs = amendments_df[
@@ -220,7 +220,7 @@ def apply_amendments(
     ]
 
     if accepted_amendments_df.shape[0] == 0:
-        freezing_logger.info(
+        FreezingLogger.info(
             "Amendments file contained no records marked for inclusion"
         )
         return main_df
@@ -242,7 +242,7 @@ def apply_amendments(
     main_df = main_df[~main_df.reference.isin(changes_refs)]
     # add amended records to main df
     amended_df = pd.concat([main_df, accepted_amendments_df])
-    freezing_logger.info(
+    FreezingLogger.info(
         f"{accepted_amendments_df.shape[0]} records amended during freezing"
     )
     return amended_df
@@ -252,7 +252,7 @@ def apply_additions(
     main_df: pd.DataFrame,
     additions_df: pd.DataFrame,
     run_id: int,
-    freezing_logger: logging.Logger
+    FreezingLogger: logging.Logger
 ) -> pd.DataFrame:
     """Apply additions to the main snapshot.
 
@@ -260,13 +260,13 @@ def apply_additions(
         main_df (pd.DataFrame): The main snapshot.
         additions_df (pd.DataFrame): The additions to apply.
         run_id (int): The current run id.
-        freezing_logger (logging.Logger): The logger.
+        FreezingLogger (logging.Logger): The logger.
 
     Returns:
         added_df (pd.DataFrame): The main snapshot with additions applied.
""" - if not validate_additions_df(main_df, additions_df, freezing_logger): - freezing_logger.info("Skipping additions since the additions csv is invalid...") + if not validate_additions_df(main_df, additions_df, FreezingLogger): + FreezingLogger.info("Skipping additions since the additions csv is invalid...") return main_df # Drop records where accept_changes is False and if any remain, add them to main df changes_refs = additions_df[ @@ -280,10 +280,10 @@ def apply_additions( if accepted_additions_df.shape[0] > 0: accepted_additions_df = _add_last_frozen_column(accepted_additions_df, run_id) added_df = pd.concat([main_df, accepted_additions_df], ignore_index=True) - freezing_logger.info( + FreezingLogger.info( f"{accepted_additions_df.shape[0]} records added during freezing" ) else: - freezing_logger.info("Additions file contained no records marked for inclusion") + FreezingLogger.info("Additions file contained no records marked for inclusion") return main_df return added_df diff --git a/src/freezing/freezing_compare.py b/src/freezing/freezing_compare.py index 18b326d57..2612a6f97 100644 --- a/src/freezing/freezing_compare.py +++ b/src/freezing/freezing_compare.py @@ -4,11 +4,11 @@ import pandas as pd from typing import Callable -FreezingLogger = logging.getLogger(__name__) def get_amendments( frozen_csv: pd.DataFrame, updated_snapshot: pd.DataFrame, + FreezingLogger: logging.Logger ) -> pd.DataFrame: """Get amended records from updated snapshot. @@ -19,6 +19,7 @@ def get_amendments( frozen_csv (pd.DataFrame): The staged and validated frozen data. updated_snapshot (pd.DataFrame): The staged and validated updated snapshot data. + FreezingLogger (logging.Logger): The logger to log to. Returns: amendments_df (pd.DataFrame): The records that have changed. @@ -100,13 +101,14 @@ def get_amendments( return amendments_df else: - freezing_logger.info("No amendments found.") + FreezingLogger.info("No amendments found.") return None def get_additions( frozen_csv: pd.DataFrame, updated_snapshot: pd.DataFrame, + FreezingLogger: logging.Logger ) -> pd.DataFrame: """Get added records from the updated snapshot. @@ -115,6 +117,7 @@ def get_additions( Args: frozen_csv (pd.DataFrame): The staged and validated frozen data. updated_snapshot (pd.DataFrame): The staged and validated updated snapshot data. + FreezingLogger (logging.Logger): The logger to log to. Returns: additions_df (pd.DataFrame): The new records identified in the updated snapshot data. @@ -146,7 +149,7 @@ def get_additions( additions_df["accept_changes"] = False return additions_df else: - freezing_logger.info("No additions found.") + FreezingLogger.info("No additions found.") return None @@ -156,6 +159,7 @@ def output_freezing_files( config: dict, write_csv: Callable, run_id: int, + FreezingLogger: logging.Logger ) -> bool: """Save CSVs of amendments and additions for user approval. @@ -166,6 +170,7 @@ def output_freezing_files( write_csv (callable): Function to write to a csv file. This will be the hdfs or network version depending on settings. run_id (int): The run id for this run. + FreezingLogger (logging.Logger): The logger to log to. Returns: bool: True if the files were written successfully. 
From df25c12ac2551052c5691e3fc7491bfc695a6595 Mon Sep 17 00:00:00 2001
From: Cheshire
Date: Tue, 10 Sep 2024 14:57:33 +0100
Subject: [PATCH 11/16] RDRP-990: fix freezing tests

---
 tests/test_freezing/test_freezing_compare.py | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/tests/test_freezing/test_freezing_compare.py b/tests/test_freezing/test_freezing_compare.py
index 9e9ff7711..8f245479d 100644
--- a/tests/test_freezing/test_freezing_compare.py
+++ b/tests/test_freezing/test_freezing_compare.py
@@ -2,9 +2,12 @@
 
 import pandas as pd
 from pandas._testing import assert_frame_equal
+import logging
 
 from src.freezing.freezing_compare import get_amendments, get_additions
 
+# create a test logger to pass to functions
+test_logger = logging.getLogger(__name__)
 
 class TestGetAmendments:
     """Tests for get_amendments()."""
@@ -89,7 +92,7 @@ def test_get_amendments(self):
 
         # Run the function
         result = get_amendments(
-            input_frozen_df, input_amendments_df
+            input_frozen_df, input_amendments_df, test_logger
         )
         expected_outcome_df = expected_outcome_df[result.columns]
 
@@ -157,7 +160,7 @@ def test_get_additions(self):
 
         # Run the function
         result = get_additions(
-            input_frozen_df, input_additions_df
+            input_frozen_df, input_additions_df, test_logger
         )
 
         # Check the output

From 57b510213f25a71a43e944a16eabebe35548c0dd Mon Sep 17 00:00:00 2001
From: Anne Griffith
Date: Tue, 10 Sep 2024 14:58:48 +0100
Subject: [PATCH 12/16] 990 update MoR test

---
 tests/test_imputation/test_MoR.py | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/tests/test_imputation/test_MoR.py b/tests/test_imputation/test_MoR.py
index f57471d54..756711c96 100644
--- a/tests/test_imputation/test_MoR.py
+++ b/tests/test_imputation/test_MoR.py
@@ -23,6 +23,7 @@ def input_lf_mor_df(self) -> pd.DataFrame:
         fpath = os.path.join("tests/data/imputation/lf_mor_input_anon.csv")
         df = pd.read_csv(fpath)
         df = df.astype({"reference": "Int64", "instance": "Int64"})
+        df["referencepostcode"] = pd.NA
         return df
 
     @pytest.fixture(scope="function")
@@ -86,6 +87,7 @@ def input_sf_mor_df(self) -> pd.DataFrame:
         fpath = os.path.join("tests/data/imputation/sf_mor_input_anon.csv")
         df = pd.read_csv(fpath)
         df = df.astype({"reference": "Int64", "instance": "Int64"})
+        df["referencepostcode"] = pd.NA
         return df
 
     @pytest.fixture(scope="function")

From c7f9754446107f7cb75cf6e27ae960a801984c9b Mon Sep 17 00:00:00 2001
From: Anne Griffith
Date: Tue, 10 Sep 2024 15:47:53 +0100
Subject: [PATCH 13/16] 990 only update 601 if not null

---
 src/imputation/MoR.py      | 7 ++-----
 src/mapping/itl_mapping.py | 9 +++++----
 src/utils/helpers.py       | 1 +
 3 files changed, 8 insertions(+), 9 deletions(-)
diff --git a/src/imputation/MoR.py b/src/imputation/MoR.py
index acbc9ead6..6ea280503 100644
--- a/src/imputation/MoR.py
+++ b/src/imputation/MoR.py
@@ -136,11 +136,8 @@ def carry_forwards(df, backdata, impute_vars):
         df.loc[match_cond, var] = df.loc[match_cond, f"{var}_prev"]
 
     # Update the postcodes_harmonised column from the updated column 601
-    df.loc[match_cond, "postcodes_harmonised"] = df.loc[match_cond, "601"]
-    # fill nulls with "referencepostcode" as we do when we first construct the column
-    df["postcodes_harmonised"] = df["postcodes_harmonised"].fillna(
-        df["referencepostcode"]
-    )
+    pc_update_cond = match_cond & df["601"].notnull()
+    df.loc[pc_update_cond, "postcodes_harmonised"] = df.loc[match_cond, "601"]
 
     # Update the imputation classes based on the new 200 and 201 values
     df = create_imp_class_col(df, "200", "201")
diff --git a/src/mapping/itl_mapping.py b/src/mapping/itl_mapping.py
index a5f6c140c..522ea4cfd 100644
--- a/src/mapping/itl_mapping.py
+++ b/src/mapping/itl_mapping.py
@@ -1,6 +1,5 @@
 """Code to join the ITL regions onto the full dataframe using the mapper provided."""
 import pandas as pd
-from typing import Tuple
 
 from src.mapping.mapping_helpers import join_with_null_check
 
@@ -11,7 +10,8 @@ def join_itl_regions(
     itl_mapper: pd.DataFrame,
     config: dict,
     pc_col: str = "postcodes_harmonised",
-):
+    warn_only: bool = False,
+) -> pd.DataFrame:
     """Joins the itl regions onto the full dataframe using the mapper provided.
 
     First, the itl column is added to the dataframe by joining the postcode_mapper.
     Then the itl mapper is joined to add the region columns.
 
     Args:
         df (pd.DataFrame): The BERD responses dataframe
         postcode_mapper (pd.DataFrame): Mapper containing postcodes and regions
         itl_mapper (pd.DataFrame): Mapper containing ITL regions
         config (dict): Pipeline configuration settings
         pc_col (str, optional): The column name for the postcodes.
+        nullcheck (bool, optional): Whether to perform null checks on the mappers.
 
     Returns:
         pd.DataFrame: the responses dataframe with the ITL regions joined
 
     Unit Test:
         See [test_itl_mapping](./tests/test_mapping/test_itl_mapping.py)
     """
-    # first create itl column 
+    # first create itl column
     postcode_mapper = postcode_mapper.rename(columns={"pcd2": pc_col})
-    df = join_with_null_check(df, postcode_mapper, "postcode mapper", pc_col)
+    df = join_with_null_check(df, postcode_mapper, "postcode mapper", pc_col, warn_only)
 
     # next join the itl mapper to add the region columns
     gb_itl_col = config["mappers"]["gb_itl"]
diff --git a/src/utils/helpers.py b/src/utils/helpers.py
index c0ec4299f..65d77c957 100644
--- a/src/utils/helpers.py
+++ b/src/utils/helpers.py
@@ -142,6 +142,7 @@ def validate_updated_postcodes(
         itl_mapper,
         config,
         pc_col="postcodes_harmonised",
+        warn_only=True,
     )
 
     filtered_df = filtered_df[list(df.columns)]

From f2fae31f2310cbc8303d9acd79028405efd9d590 Mon Sep 17 00:00:00 2001
From: Anne Griffith
Date: Tue, 10 Sep 2024 15:49:52 +0100
Subject: [PATCH 14/16] update version to 1.1.7

---
 src/_version.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/_version.py b/src/_version.py
index 1436d8fe8..bf7882637 100644
--- a/src/_version.py
+++ b/src/_version.py
@@ -1 +1 @@
-__version__ = "1.1.6"
+__version__ = "1.1.7"

From 2dfa6d4f79ccc9ca3df5f300d845da0a60e590a9 Mon Sep 17 00:00:00 2001
From: Anne Griffith
Date: Tue, 10 Sep 2024 17:12:09 +0100
Subject: [PATCH 15/16] 990 include constructed postcodes in itl mapping

---
 src/user_config.yaml | 4 ++--
 src/utils/helpers.py | 7 ++++++-
 2 files changed, 8 insertions(+), 3 deletions(-)

diff --git a/src/user_config.yaml b/src/user_config.yaml
index b6e36784b..7047fd769 100644
--- a/src/user_config.yaml
+++ b/src/user_config.yaml
@@ -29,8 +29,8 @@ global:
     output_imputation_qa: False
     output_auto_outliers: False
     output_outlier_qa : False
-    output_estimation_qa: True
-    output_apportionment_qa: True
+    output_estimation_qa: False
+    output_apportionment_qa: False
     # Final output settings
     output_long_form: False
     output_short_form: False
diff --git a/src/utils/helpers.py b/src/utils/helpers.py
index 65d77c957..b0ec45771 100644
--- a/src/utils/helpers.py
+++ b/src/utils/helpers.py
@@ -130,7 +130,12 @@ def validate_updated_postcodes(
             column updated and the itl columns re-mapped.
     """
     # filter out records that have been constructed or imputed with backdata
-    mask = df["imp_marker"].isin(["CF", "MoR", "constructed"])
+    imp_marker_mask = df["imp_marker"].isin(["CF", "MoR", "constructed"])
+    if "is_constructed" in df.columns:
+        constructed_mask = df["is_constructed"].isin([True])
+        mask = imp_marker_mask | constructed_mask
+    else:
+        mask = imp_marker_mask
     filtered_df = df.copy().loc[mask]
 
     # re-calculate the itl columns based on imputed and constructed columns

From ca0f3c517b134a989888711987a68bc8e292c89f Mon Sep 17 00:00:00 2001
From: Anne Griffith
Date: Tue, 10 Sep 2024 17:37:49 +0100
Subject: [PATCH 16/16] 990 update docstring

---
 src/mapping/itl_mapping.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/mapping/itl_mapping.py b/src/mapping/itl_mapping.py
index 522ea4cfd..1b2921eae 100644
--- a/src/mapping/itl_mapping.py
+++ b/src/mapping/itl_mapping.py
@@ -23,7 +23,7 @@ def join_itl_regions(
         itl_mapper (pd.DataFrame): Mapper containing ITL regions
         config (dict): Pipeline configuration settings
         pc_col (str, optional): The column name for the postcodes.
-        nullcheck (bool, optional): Whether to perform null checks on the mappers.
+        warn_only (bool, optional): Whether to warn only rather than error on nulls.
 
     Returns:
         pd.DataFrame: the responses dataframe with the ITL regions joined
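
[Annotation] Taken together, the postcode re-mapping settles into the slice-update-concat idiom that PATCH 15 completes: select only the records whose postcodes may have changed since staging, re-derive their columns, and stitch the frame back together. A condensed sketch that mirrors src/utils/helpers.py as of these patches; the re-join itself is elided:

    import pandas as pd

    def refresh_changed_rows(df: pd.DataFrame) -> pd.DataFrame:
        # Records imputed or constructed may carry new postcodes.
        mask = df["imp_marker"].isin(["CF", "MoR", "constructed"])
        if "is_constructed" in df.columns:
            mask = mask | df["is_constructed"].isin([True])
        filtered_df = df.copy().loc[mask]
        # ... drop and re-join the itl/geo columns for filtered_df here ...
        filtered_df = filtered_df[list(df.columns)]  # restore column order
        return pd.concat([df.loc[~mask], filtered_df])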