diff --git a/main.py b/main.py index d8b5d07..3721aed 100644 --- a/main.py +++ b/main.py @@ -1,80 +1,22 @@ """ETL script for raw Epi/HAI CRE alert docx.""" import io -import sys -from pathlib import Path -import boto3 as boto3 import dateutil.parser as dparser import pandas as pd -from awsglue.context import GlueContext -from awsglue.utils import getResolvedOptions +from capepy.aws.glue import EtlJob from docx import Document -from pyspark.sql import SparkSession -# for our purposes here, the spark and glue context are only (currently) needed -# to get the logger. -spark_ctx = SparkSession.builder.getOrCreate() # pyright: ignore -glue_ctx = GlueContext(spark_ctx) -logger = glue_ctx.get_logger() - -# TODO: -# - add error handling for the format of the document being incorrect -# - figure out how we want to name and namespace clean files (e.g. will we -# take the object key we're given, strip the extension and replace it with -# one for the new format, or will we do something else) -# - see what we can extract out of here to be useful for other ETLs. imagine -# we'd have a few different things that could be made into a reusable -# package - -parameters = getResolvedOptions( - sys.argv, - [ - "RAW_BUCKET_NAME", - "ALERT_OBJ_KEY", - "CLEAN_BUCKET_NAME", - ], -) - -raw_bucket_name = parameters["RAW_BUCKET_NAME"] -alert_obj_key = parameters["ALERT_OBJ_KEY"] -clean_bucket_name = parameters["CLEAN_BUCKET_NAME"] +etl_job = EtlJob() # NOTE: for now we'll take the alert object key and change out the file # extension for the clean data (leaving all namespacing and such). this # will probably need to change -clean_obj_key = str(Path(alert_obj_key).with_suffix(".csv")) - -# NOTE: May need some creds here -s3_client = boto3.client("s3") - -# try to get the docx object from S3 and handle any error that would keep us -# from continuing. -response = s3_client.get_object(Bucket=raw_bucket_name, Key=alert_obj_key) - -status = response.get("ResponseMetadata", {}).get("HTTPStatusCode") - -if status != 200: - err = ( - f"ERROR - Could not get object {alert_obj_key} from bucket " - f"{raw_bucket_name}. ETL Cannot continue." - ) - - logger.error(err) - - # NOTE: need to properly handle exception stuff here, and we probably want - # this going somewhere very visible (e.g. SNS topic or a perpetual log - # as someone will need to be made aware) - raise Exception(err) - -logger.info(f"Obtained object {alert_obj_key} from bucket {raw_bucket_name}.") - -# handle the document itself... +clean_obj_key = etl_job.parameters["OBJECT_KEY"].replace(".docx", ".csv") # the response should contain a StreamingBody object that needs to be converted # to a file like object to make the docx library happy -f = io.BytesIO(response.get("Body").read()) -document = Document(f) +document = Document(io.BytesIO(etl_job.get_raw_file())) # NOTE: this document is assumed to contain a single table that needs to be # processed and nothing else. The file consists of: @@ -125,27 +67,4 @@ # write out the transformed data with io.StringIO() as csv_buff: interim.to_csv(csv_buff, index=False) - - response = s3_client.put_object( - Bucket=clean_bucket_name, Key=clean_obj_key, Body=csv_buff.getvalue() - ) - - status = response.get("ResponseMetadata", {}).get("HTTPStatusCode") - - if status != 200: - err = ( - f"ERROR - Could not write transformed data object {clean_obj_key} " - f"to bucket {clean_bucket_name}. ETL Cannot continue." - ) - - logger.error(err) - - # NOTE: need to properly handle exception stuff here, and we probably - # want this going somewhere very visible (e.g. SNS topic or a - # perpetual log as someone will need to be made aware) - raise Exception(err) - - logger.info( - f"Transformed {raw_bucket_name}/{alert_obj_key} and wrote result " - f"to {clean_bucket_name}/{clean_obj_key}" - ) + etl_job.write_clean_file(csv_buff.getvalue(), clean_obj_key) diff --git a/requirements.txt b/requirements.txt index fe0f945..972b427 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,6 @@ aws-glue-libs @ git+https://github.com/awslabs/aws-glue-libs@9d8293962e6ffc607e5dc328e246f40b24010fa8 boto3==1.34.103 +capepy>=1.0.0,<2.0.0 openpyxl==3.1.2 pandas==2.2.2 pyspark==3.5.1