From eae9f82eb644dc1bc1ae8f8bb4522ab67b9e0555 Mon Sep 17 00:00:00 2001
From: Sarah Johnson
Date: Mon, 21 Oct 2024 07:55:54 +0100
Subject: [PATCH] WIP

---
 .gitignore                                 |   4 +-
 dpypelines/pipeline/configuration.py       |  30 +-
 dpypelines/pipeline/csvcubed_ingress_v1.py | 466 ++++++++++++++++++
 .../shared/transforms/sanity_check.py      |  24 +
 dpypelines/s3_tar_received.py              |   4 +-
 myscript.py                                |  33 +-
 6 files changed, 542 insertions(+), 19 deletions(-)
 create mode 100644 dpypelines/pipeline/csvcubed_ingress_v1.py

diff --git a/.gitignore b/.gitignore
index e4bd55cd..e95f317e 100644
--- a/.gitignore
+++ b/.gitignore
@@ -3,4 +3,6 @@ __pycache__
 .coverage
 behave_debug.py
 features/fixtures/data/
-data
\ No newline at end of file
+data
+output
+.venv
\ No newline at end of file
diff --git a/dpypelines/pipeline/configuration.py b/dpypelines/pipeline/configuration.py
index 065c8798..85710c95 100644
--- a/dpypelines/pipeline/configuration.py
+++ b/dpypelines/pipeline/configuration.py
@@ -1,10 +1,17 @@
+from pathlib import Path
 import re
 
+from dpypelines.pipeline.csvcubed_ingress_v1 import csvcubed_ingress_v1
 from dpypelines.pipeline.dataset_ingress_v1 import dataset_ingress_v1
 from dpypelines.pipeline.generic_file_ingress_v1 import generic_file_ingress_v1
-from dpypelines.pipeline.shared.transforms.sanity_check import sdmx_sanity_check_v1
+from dpypelines.pipeline.shared.transforms.sanity_check import (
+    csv_sanity_check_v1,
+    json_sanity_check_v1,
+    sdmx_sanity_check_v1,
+)
 from dpypelines.pipeline.shared.transforms.sdmx.v20 import sdmx_compact_2_0_prototype_1
 from dpypelines.pipeline.shared.transforms.sdmx.v21 import sdmx_generic_2_1_prototype_1
+from csvcubed.cli.buildcsvw.build import build_csvw
 
 # Set a regex pattern matching the `source_id` as `CONFIGURATION` dictionary key
 # All fields are required in order for a pipeline transform to run successfully
@@ -38,6 +45,27 @@
         "supplementary_distributions": {},
         "secondary_function": generic_file_ingress_v1,
     },
+    "^.*csvcubed$": {
+        "config_version": 1,
+        "transform": build_csvw,
+        "transform_inputs": {
+            "^dataset.csv$": csv_sanity_check_v1,
+            "^metadata.json$": json_sanity_check_v1,
+        },
+        "transform_kwargs": {
+            "output_directory": Path("output"),
+            "validation_errors_file_name": "validation_errors.json",
+        },
+        "required_files": [
+            {"matches": "^dataset.csv$"},
+            {"matches": "^metadata.json$"},
+        ],
+        "supplementary_distributions": [
+            {"matches": "^dataset.csv$"},
+            {"matches": "^metadata.json$"},
+        ],
+        "secondary_function": csvcubed_ingress_v1,
+    },
 }
diff --git a/dpypelines/pipeline/csvcubed_ingress_v1.py b/dpypelines/pipeline/csvcubed_ingress_v1.py
new file mode 100644
index 00000000..998abab9
--- /dev/null
+++ b/dpypelines/pipeline/csvcubed_ingress_v1.py
@@ -0,0 +1,466 @@
import os
import re
from pathlib import Path

from dpytools.http.upload.upload_service_client import UploadServiceClient
from dpytools.logging.logger import DpLogger
from dpytools.stores.directory.local import LocalDirectoryStore
from dpytools.utilities.utilities import str_to_bool

# from dpypelines.pipeline.shared.email_template_message import (
#     file_not_found_email,
#     supplementary_distribution_not_found_email,
# )
# from dpypelines.pipeline.shared.notification import (
#     BasePipelineNotifier,
#     notifier_from_env_var_webhook,
# )
from dpypelines.pipeline.shared.pipelineconfig.matching import get_matching_pattern
from dpypelines.pipeline.shared.pipelineconfig.transform import get_transform_details

# from dpypelines.pipeline.shared.utils import get_email_client,
get_submitter_email + +logger = DpLogger("data-ingress-pipelines") + + +def csvcubed_ingress_v1(files_dir: str, pipeline_config: dict): + """ + Version 1 of the csvcubed ingress pipeline. + + Args: + files_dir (str): Path to the directory where the input files for this pipeline are located. + pipeline_config (dict): Dictionary of configuration details required to run the pipeline (determined by dataset id) + + Raises: + Exception: If any unexpected error occurs. + """ + # Create notifier from webhook env var + # try: + # de_notifier: BasePipelineNotifier = notifier_from_env_var_webhook( + # "DE_SLACK_WEBHOOK" + # ) + # logger.info("Notifier created", data={"notifier": de_notifier}) + # except Exception as err: + # logger.error("Error occurred when creating notifier", err) + # raise err + + try: + local_store = LocalDirectoryStore(files_dir) + files_in_directory = local_store.get_file_names() + logger.info( + "Local data store created", + data={ + "local_store": local_store, + "local_store_dir": files_dir, + "files_in_directory": files_in_directory, + }, + ) + except Exception as err: + logger.error( + "Error occurred when creating local data store from files directory", + err, + data={"local_store_dir": files_dir}, + ) + # de_notifier.failure() + raise err + + try: + manifest_dict = local_store.get_lone_matching_json_as_dict("manifest.json") + logger.info( + "Got manifest.json dict output", + data={"manifest_dict": manifest_dict}, + ) + except Exception as err: + logger.error("Error occurred when getting manifest_dict", err) + # de_notifier.failure() + raise err + + # try: + # submitter_email = get_submitter_email(manifest_dict) + # logger.info( + # "Got submitter email", + # data={"submitter_email": submitter_email}, + # ) + # except Exception as err: + # logger.error( + # "Error occurred when getting submitter email", + # err, + # data={"manifest_dict": manifest_dict}, + # ) + # # de_notifier.failure() + # raise err + + # # Create email client from env var + # try: + # email_client = get_email_client() + # logger.info("Created email client", data={"email_client": email_client}) + # except Exception as err: + # logger.error("Error occurred when creating email client", err) + # # de_notifier.failure() + # raise err + + skip_data_upload = os.environ.get("SKIP_DATA_UPLOAD", False) + + if skip_data_upload is not False: + try: + skip_data_upload = str_to_bool(skip_data_upload) + except Exception as err: + logger.error( + "Unable to cast SKIP_DATA_UPLOAD to boolean", + err, + data={"value": skip_data_upload}, + ) + # de_notifier.failure() + raise err + + # Get Upload Service URL from environment variable + if skip_data_upload is False: + try: + upload_url = os.environ.get("UPLOAD_SERVICE_URL", None) + assert ( + upload_url is not None + ), "UPLOAD_SERVICE_URL environment variable not set" + logger.info("Got Upload Service URL", data={"upload_url": upload_url}) + except Exception as err: + logger.error("Error occurred when getting Upload Service URL", err) + # de_notifier.failure() + raise err + + # Extract the patterns for required files from the pipeline configuration + try: + required_file_patterns = get_matching_pattern(pipeline_config, "required_files") + logger.info( + "Required file patterns retrieved from pipeline config", + data={ + "required_file_patterns": required_file_patterns, + "pipeline_config": pipeline_config, + }, + ) + except Exception as err: + logger.error( + "Error occurred when getting required file patterns", + err, + data={ + "pipeline_config": pipeline_config, + }, + ) 
        # de_notifier.failure()
        raise err

    # Check for the existence of each required file
    for required_file in required_file_patterns:
        try:
            if not local_store.has_lone_file_matching(required_file):
                try:
                    raise FileNotFoundError(
                        f"Could not find file matching pattern {required_file}"
                    )
                except FileNotFoundError as err:
                    # email_content = file_not_found_email(required_file)
                    # email_client.send(
                    #     submitter_email, email_content.subject, email_content.message
                    # )
                    logger.error(
                        "Error occurred when looking for required file",
                        err,
                        data={"required_file": required_file},
                    )
                    # de_notifier.failure()
                    raise err
        except Exception as err:
            logger.error(
                "Error occurred when looking for required file",
                err,
                data={
                    "required_file": required_file,
                    "required_file_patterns": required_file_patterns,
                    "files_in_directory": files_in_directory,
                    "pipeline_config": pipeline_config,
                },
            )
            # de_notifier.failure()
            raise err

    # Extract the patterns for supplementary distributions from the pipeline configuration
    try:
        supp_dist_patterns = get_matching_pattern(
            pipeline_config, "supplementary_distributions"
        )
        logger.info(
            "Supplementary distribution patterns retrieved from pipeline config",
            data={"supplementary_distribution_patterns": supp_dist_patterns},
        )
    except Exception as err:
        files_in_directory = local_store.get_file_names()
        logger.error(
            "Error occurred when getting supplementary distribution patterns",
            err,
            data={"pipeline_config": pipeline_config},
        )
        # de_notifier.failure()
        raise err

    # Check for the existence of each supplementary distribution
    for supp_dist_pattern in supp_dist_patterns:
        try:
            if not local_store.has_lone_file_matching(supp_dist_pattern):
                # Catch a trivial raise as we need the stack trace of the
                # error for the logger, so it needs to be a raised error.
+ try: + raise FileNotFoundError( + f"No file found matching pattern {supp_dist_pattern}" + ) + except FileNotFoundError as err: + # email_content = supplementary_distribution_not_found_email( + # supp_dist_pattern + # ) + # email_client.send( + # submitter_email, email_content.subject, email_content.message + # ) + logger.error( + "Error occurred when looking for supplementary distribution", + err, + data={ + "supplementary_distribution": supp_dist_pattern, + }, + ) + # de_notifier.failure() + raise err + except Exception as err: + logger.error( + "Error occurred when looking for supplementary distribution", + err, + data={ + "supplementary_distribution": supp_dist_pattern, + "supplementary_distribution_patterns": supp_dist_patterns, + "files_in_directory": files_in_directory, + "pipeline_config": pipeline_config, + }, + ) + # de_notifier.failure() + raise err + + # Get the transform inputs from the pipeline_config and run the specified sanity checker for it + input_file_paths = [] + try: + transform_inputs = get_transform_details(pipeline_config, "transform_inputs") + logger.info("Got transform inputs", data={"transform_inputs": transform_inputs}) + except Exception as err: + logger.error( + "Error when getting transform inputs from pipeline config", + err, + data={"pipeline_config": pipeline_config}, + ) + + for pattern, sanity_checker in transform_inputs.items(): + try: + input_file_path: Path = local_store.get_pathlike_of_file_matching(pattern) + logger.info( + "Got input file that matches pattern.", + data={ + "input_file_path": input_file_path, + "pattern": pattern, + "files_in_directory": files_in_directory, + }, + ) + except Exception as err: + logger.error( + "Error occurred when looking for file matching pattern", + err, + data={ + "pattern": pattern, + "files_in_directory": files_in_directory, + "pipeline_config": pipeline_config, + }, + ) + + # de_notifier.failure() + raise err + + try: + sanity_checker(input_file_path) + logger.info( + "Sanity check run on input file path.", + data={ + "sanity_checker": sanity_checker, + "input_file_path": input_file_path, + }, + ) + except Exception as err: + logger.error( + "Error occurred when running sanity checker on input file path.", + err, + data={ + "input_file_path": input_file_path, + "files_in_directory": files_in_directory, + "pipeline_config": pipeline_config, + }, + ) + + # de_notifier.failure() + raise err + + input_file_paths.append(input_file_path) + + # Get the transform function from pipeline config + try: + transform_function = get_transform_details(pipeline_config, "transform") + logger.info( + "Got transform function from pipeline config", + data={ + "transform_function": transform_function, + "input_file_paths": input_file_paths, + }, + ) + except Exception as err: + logger.error( + "Error occurred when getting transform function from pipeline config", + err, + data={"pipeline_config": pipeline_config}, + ) + # Get transform keyword arguments (kwargs) from pipeline config + try: + transform_kwargs = get_transform_details(pipeline_config, "transform_kwargs") + logger.info( + "Got transform kwargs from pipeline config", + data={"transform_kwargs": transform_kwargs}, + ) + except Exception as err: + logger.error( + "Error occurred when getting transform kwargs from pipeline config", + err, + data={"pipeline_config": pipeline_config}, + ) + + try: + cube, validation_errors = transform_function( + *input_file_paths, **transform_kwargs + ) + logger.info( + "Successfully ran transform function", + data={ + 
"transform_function": transform_function, + "input_file_paths": input_file_paths, + "transform_kwargs": transform_kwargs, + "cube": cube, + "validation_errors": validation_errors, + }, + ) + + except Exception as err: + logger.error( + "Error occurred when running transform function", + err, + data={ + "transform_function": transform_function, + "input_file_paths": input_file_paths, + "transform_kwargs": transform_kwargs, + "pipeline_config": pipeline_config, + }, + ) + # de_notifier.failure() + raise err + + # TODO - validate the metadata once we have a schema for it. + + # TODO - validate the csv once we know what we're validating + + # Allow DE's to skip the upload to s3 part of the pipeline while + # developing code locally. + + logger.info( + "skip_data_upload set from SKIP_DATA_UPLOAD env var", + data={"value": skip_data_upload}, + ) + + if skip_data_upload is not True: + + # Upload output files to Upload Service + try: + # Create UploadClient from upload_url + upload_client = UploadServiceClient(upload_url) + logger.info( + "UploadClient created from upload_url", data={"upload_url": upload_url} + ) + except Exception as err: + logger.error( + "Error creating UploadClient", err, data={"upload_url": upload_url} + ) + # de_notifier.failure() + raise err + + # try: + # # Upload CSV to Upload Service + # upload_client.upload_new_csv(csv_path) + # logger.info( + # "Uploaded CSV to Upload Service", + # data={ + # "csv_path": csv_path, + # "upload_url": upload_url, + # }, + # ) + # except Exception as err: + # logger.error( + # "Error uploading CSV file to Upload Service", + # err, + # data={ + # "csv_path": csv_path, + # "upload_url": upload_url, + # }, + # ) + # # de_notifier.failure() + # raise err + + # # Check for supplementary distributions to upload + # if supp_dist_patterns: + # # Get list of all files in local store + # all_files = local_store.get_file_names() + # logger.info("Got all files in local store", data={"files": all_files}) + # for supp_dist_pattern in supp_dist_patterns: + # # Get supplementary distribution filename matching pattern from local store + # supp_dist_matching_files = [ + # f for f in all_files if re.search(supp_dist_pattern, f) + # ] + # assert ( + # len(supp_dist_matching_files) == 1 + # ), f"Error finding file matching pattern {supp_dist_pattern}: matching files are {supp_dist_matching_files}" + + # # Create a directory to save supplementary distribution + # supp_dist_path = local_store.get_pathlike_of_file_matching( + # supp_dist_pattern + # ) + # logger.info( + # "Got supplementary distribution", + # data={ + # "supplementary_distribution": supp_dist_path, + # "file_extension": supp_dist_path.suffix, + # }, + # ) + # # If the supplementary distribution is an XML file, upload to the Upload Service + # if supp_dist_path.suffix == ".xml": + # try: + # upload_client.upload_new_sdmx(supp_dist_path) + # logger.info( + # "Uploaded supplementary distribution", + # data={ + # "supplementary_distribution": supp_dist_path, + # "upload_url": upload_url, + # }, + # ) + # except Exception as err: + # logger.error( + # "Error uploading SDMX file to Upload Service", + # err, + # data={ + # "supplementary_distribution": supp_dist_path, + # "upload_url": upload_url, + # }, + # ) + # # de_notifier.failure() + # raise err + # else: + # raise NotImplementedError( + # f"Uploading files of type {supp_dist_path.suffix} not supported." 
        #                 )

    # de_notifier.success()
diff --git a/dpypelines/pipeline/shared/transforms/sanity_check.py b/dpypelines/pipeline/shared/transforms/sanity_check.py
index 695cb915..c0c04fe6 100644
--- a/dpypelines/pipeline/shared/transforms/sanity_check.py
+++ b/dpypelines/pipeline/shared/transforms/sanity_check.py
@@ -1,4 +1,6 @@
+import json
 from pathlib import Path
+import pandas as pd
 
 
 def sdmx_sanity_check_v1(sdmx_file: Path):
@@ -13,3 +15,25 @@ def sdmx_sanity_check_v1(sdmx_file: Path):
             f.read()
     except Exception as err:
         raise Exception(f"Failed to read in xml - {sdmx_file}") from err
+
+
+def csv_sanity_check_v1(csv_file: Path):
+    """
+    Sanity check that the received file is actually a csv file.
+    """
+    assert csv_file.suffix == ".csv", "Invalid csv file"
+
+    try:
+        pd.read_csv(csv_file)
+    except Exception as err:
+        raise Exception(f"Failed to read csv file {csv_file}") from err
+
+
+def json_sanity_check_v1(json_file: Path):
+    assert json_file.suffix == ".json"
+
+    try:
+        with open(json_file, "r") as f:
+            json.load(f)
+    except Exception as err:
+        raise Exception(f"Failed to read json file {json_file}") from err
diff --git a/dpypelines/s3_tar_received.py b/dpypelines/s3_tar_received.py
index c0d08b06..39e82fa2 100644
--- a/dpypelines/s3_tar_received.py
+++ b/dpypelines/s3_tar_received.py
@@ -114,7 +114,7 @@ def start(s3_object_name: str):
             err,
             data={"source_id": source_id},
         )
-        notifier.failure(source_id=source_id)
+        notifier.failure()
         raise err
 
     # Get the path to the directory
@@ -137,5 +137,5 @@ def start(s3_object_name: str):
             err,
             data={"pipeline_config": pipeline_config, "files_dir": files_dir},
         )
-        notifier.failure(source_id=source_id)
+        notifier.failure()
         raise err
diff --git a/myscript.py b/myscript.py
index 32801428..f7be5e2b 100644
--- a/myscript.py
+++ b/myscript.py
@@ -1,18 +1,21 @@
-#from dpypelines import s3_tar_received
+import os
+from csvcubed.cli.buildcsvw.build import build_csvw
+from csvcubed.cli.inspectcsvw.inspect import inspect
+from pathlib import Path
 
-#s3_tar_received.start('joes-bucket-will-be-deleted/config-no-options.tar')
+from dpypelines.pipeline.configuration import get_pipeline_config
+from dpypelines.pipeline.csvcubed_ingress_v1 import csvcubed_ingress_v1
+from dpypelines.s3_tar_received import start
 
-import sys
-from behave.__main__ import run_behave
-from behave.configuration import Configuration
+# inspect(Path("out/4g-coverage.csv-metadata.json"))
+# pipeline_config = get_pipeline_config("4g_coverage_csvcubed")
+# csvcubed_ingress_v1("data", pipeline_config)
 
-if __name__ == "__main__":
-    # args = sys.argv[1:] if len(sys.argv) > 1 else []
-    args = [
-        "--verbose",
-        "features/dataset_ingress.feature", # Feature file path
-        "-n",
-        "Generic ingress runs without errors", # Scenario text
-    ]
-    configuration = Configuration(args)
-    sys.exit(run_behave(configuration))
\ No newline at end of file
+os.environ["DISABLE_NOTIFICATIONS"] = "true"
+os.environ["AWS_PROFILE"] = "dp-sandbox"
+# os.environ["SKIP_DATA_UPLOAD"] = "true"
+os.environ["SERVICE_TOKEN_FOR_UPLOAD"] = (
"eyJraWQiOiJqeFlva3pnVER5UVVNb1VTM0c0ODNoa0VjY3hFSklKdCtHVjAraHVSRUpBPSIsImFsZyI6IlJTMjU2In0.eyJzdWIiOiI4ODkwYjY4NC02NzU4LTRiN2YtODFkOS1jYjkyYTMyODJiZjMiLCJjb2duaXRvOmdyb3VwcyI6WyJyb2xlLXB1Ymxpc2hlciJdLCJpc3MiOiJodHRwczpcL1wvY29nbml0by1pZHAuZXUtd2VzdC0yLmFtYXpvbmF3cy5jb21cL2V1LXdlc3QtMl9XU0Q5RWNBc3ciLCJjbGllbnRfaWQiOiI0ZXZsOTFnNHRzNWlzbXVkaHJjYmI0ZGFvYyIsIm9yaWdpbl9qdGkiOiJkZGRlYTA5YS1mNzc2LTRhMzYtYTU3ZS02ZDkyOTJhYzk0YzAiLCJldmVudF9pZCI6Ijk0ZjMxNTRiLWU5YjctNDkzNS04NmUyLWIxMjY1YWExN2IxNCIsInRva2VuX3VzZSI6ImFjY2VzcyIsInNjb3BlIjoiYXdzLmNvZ25pdG8uc2lnbmluLnVzZXIuYWRtaW4iLCJhdXRoX3RpbWUiOjE3MjkyNDA1NzUsImV4cCI6MTcyOTI0MTQ3NSwiaWF0IjoxNzI5MjQwNTc1LCJqdGkiOiI0MDFmOGU3Ny04OGJlLTQxNzAtYTJhZi04ZDhjNzVlZGIyOWIiLCJ1c2VybmFtZSI6IjdkNTEzOWJjLTUyZGEtNDgxZi1hOTY0LTQ5MTkwYzRkZWNmMyJ9.qwnbsG0cYasPkI-mmnmLSwObyY5rNRnj4PGeSy8Img21jYhkkzZQjtUqw2TN0q_j0zlXJ1XuPwHyn8s4-a4AhYeiKeIaVzLD3edjlmd3hjIzSozK5XQ8MrOcae8LJggVCJ2OCP0eGL3EsBd_eBH2K6QOlZa5LgZFukqcXqsu0H9v80s2gTIy55jDs9gz1XOyL_YaQAd7e9LPPoszTBLNqPuBmxkhJ01vxMEinbU1ooPZ1WuzNjhxnMyzmg6RSlxreq-wsqcn-95bnJqpWfuBosDsHT8183O-7sfle5eXpiANzKgesRXTBv1YPa49_z_NVRit7e21QUwYv1EZ8nP3_w" +) +os.environ["UPLOAD_SERVICE_URL"] = "http://localhost:11850/upload-new" +start("dp-sandbox-ingest-submission-bucket/csvcubed.tar")