diff --git a/.ds.baseline b/.ds.baseline
index 34faf15cc..1c279e018 100644
--- a/.ds.baseline
+++ b/.ds.baseline
@@ -209,7 +209,7 @@
         "filename": "tests/app/aws/test_s3.py",
         "hashed_secret": "67a74306b06d0c01624fe0d0249a570f4d093747",
         "is_verified": false,
-        "line_number": 28,
+        "line_number": 29,
         "is_secret": false
       }
     ],
@@ -384,5 +384,5 @@
       }
     ]
   },
-  "generated_at": "2024-09-26T20:29:19Z"
+  "generated_at": "2024-09-27T16:42:53Z"
 }
diff --git a/app/aws/s3.py b/app/aws/s3.py
index cfe46a3b3..256800bf9 100644
--- a/app/aws/s3.py
+++ b/app/aws/s3.py
@@ -1,6 +1,7 @@
 import datetime
 import re
 import time
+from concurrent.futures import ThreadPoolExecutor
 from multiprocessing import Manager
 
 import botocore
@@ -132,33 +133,67 @@ def cleanup_old_s3_objects():
     )
 
 
-def get_s3_files():
+def get_job_id_from_s3_object_key(key):
+    object_arr = key.split("/")
+    job_id = object_arr[1]  # get the job_id
+    job_id = job_id.replace(".csv", "")  # we just want the job_id
+    return job_id
+
+
+def read_s3_file(bucket_name, object_key, s3res):
+    """
+    This method runs during the 'regenerate job cache' task.
+    Note that in addition to retrieving the jobs and putting them
+    into the cache, this method also does some pre-processing by
+    putting a list of all phone numbers into the cache as well.
+
+    This means that when a report needs to be regenerated, it can
+    find the phone numbers in the cache through job_cache[f"{job_id}_phones"]
+    and the personalisation through job_cache[f"{job_id}_personalisation"],
+    which in theory should make report generation a lot faster.
+
+    We are moving this processing from the front end, where the user
+    would experience it as wait time, to this back end task.
+    """
+    try:
+        job_id = get_job_id_from_s3_object_key(object_key)
+        if job_cache.get(job_id) is None:
+            object = (
+                s3res.Object(bucket_name, object_key)
+                .get()["Body"]
+                .read()
+                .decode("utf-8")
+            )
+            set_job_cache(job_cache, job_id, object)
+            set_job_cache(job_cache, f"{job_id}_phones", extract_phones(object))
+            set_job_cache(
+                job_cache,
+                f"{job_id}_personalisation",
+                extract_personalisation(object),
+            )
+
+    except LookupError:
+        # Perhaps our key is not formatted as we expected; if so, skip it.
+        current_app.logger.exception("LookupError #notify-admin-1200")
+
+def get_s3_files():
+    """
+    We use a ThreadPoolExecutor here to speed up the retrieval of S3
+    CSV files as the number of jobs scales.
+    """
     bucket_name = current_app.config["CSV_UPLOAD_BUCKET"]["bucket"]
-    objects = list_s3_objects()
+    object_keys = list_s3_objects()
     s3res = get_s3_resource()
     current_app.logger.info(
         f"job_cache length before regen: {len(job_cache)} #notify-admin-1200"
     )
-    for object in objects:
-        # We put our csv files in the format "service-{service_id}-notify/{job_id}"
-        try:
-            object_arr = object.split("/")
-            job_id = object_arr[1]  # get the job_id
-            job_id = job_id.replace(".csv", "")  # we just want the job_id
-            if job_cache.get(job_id) is None:
-                object = (
-                    s3res.Object(bucket_name, object)
-                    .get()["Body"]
-                    .read()
-                    .decode("utf-8")
-                )
-                if "phone number" in object.lower():
-                    set_job_cache(job_cache, job_id, object)
-        except LookupError:
-            # perhaps our key is not formatted as we expected. If so skip it.
-            current_app.logger.exception("LookupError #notify-admin-1200")
+    try:
+        with ThreadPoolExecutor() as executor:
+            executor.map(lambda key: read_s3_file(bucket_name, key, s3res), object_keys)
+    except Exception:
+        current_app.logger.exception("Connection pool issue")
 
     current_app.logger.info(
         f"job_cache length after regen: {len(job_cache)} #notify-admin-1200"
     )
@@ -363,7 +398,7 @@ def get_phone_number_from_s3(service_id, job_id, job_row_number):
         )
         return "Unavailable"
 
-    # If we look in the JOBS cache for the quick lookup dictionary of phones for a given job
+    # If we look in the job_cache for the quick lookup dictionary of phones for a given job
     # and that dictionary is not there, create it
     if job_cache.get(f"{job_id}_phones") is None:
         phones = extract_phones(job)
@@ -400,7 +435,7 @@ def get_personalisation_from_s3(service_id, job_id, job_row_number):
         )
         return {}
 
-    # If we look in the JOBS cache for the quick lookup dictionary of personalisations for a given job
+    # If we look in the job_cache for the quick lookup dictionary of personalisations for a given job
     # and that dictionary is not there, create it
     if job_cache.get(f"{job_id}_personalisation") is None:
         set_job_cache(
diff --git a/app/clients/__init__.py b/app/clients/__init__.py
index 9c1f4af68..19b719c1c 100644
--- a/app/clients/__init__.py
+++ b/app/clients/__init__.py
@@ -13,6 +13,10 @@
         "addressing_style": "virtual",
     },
     use_fips_endpoint=True,
+    # This is the default, noted here for documentation's sake;
+    # there may come a time when increasing it helps
+    # with job cache management.
+    max_pool_connections=10,
 )
diff --git a/notifications_utils/logging.py b/notifications_utils/logging.py
index 3ec092b8c..dc55ae653 100644
--- a/notifications_utils/logging.py
+++ b/notifications_utils/logging.py
@@ -39,6 +39,12 @@ def init_app(app):
     for logger_instance, handler in product(warning_loggers, handlers):
         logger_instance.addHandler(handler)
         logger_instance.setLevel(logging.WARNING)
+
+    # Suppress noisy third-party loggers to prevent leaking sensitive info
+    logging.getLogger("boto3").setLevel(logging.ERROR)
+    logging.getLogger("botocore").setLevel(logging.ERROR)
+    logging.getLogger("urllib3").setLevel(logging.ERROR)
+
     app.logger.info("Logging configured")
diff --git a/tests/app/aws/test_s3.py b/tests/app/aws/test_s3.py
index 9cdadc354..8e3863d5c 100644
--- a/tests/app/aws/test_s3.py
+++ b/tests/app/aws/test_s3.py
@@ -9,6 +9,7 @@
     cleanup_old_s3_objects,
     file_exists,
     get_job_from_s3,
+    get_job_id_from_s3_object_key,
     get_personalisation_from_s3,
     get_phone_number_from_s3,
     get_s3_file,
@@ -117,6 +118,21 @@ def test_get_phone_number_from_s3(
     assert phone_number == expected_phone_number
 
 
+@pytest.mark.parametrize(
+    "key, expected_job_id",
+    [
+        ("service-blahblahblah-notify/abcde.csv", "abcde"),
+        (
+            "service-x-notify/4c99f361-4ed7-49b1-bd6f-02fe0c807c53.csv",
+            "4c99f361-4ed7-49b1-bd6f-02fe0c807c53",
+        ),
+    ],
+)
+def test_get_job_id_from_s3_object_key(key, expected_job_id):
+    actual_job_id = get_job_id_from_s3_object_key(key)
+    assert actual_job_id == expected_job_id
+
+
 def mock_s3_get_object_slowdown(*args, **kwargs):
     error_response = {
         "Error": {
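
Reviewer note (not part of the diff): the three changes above are related. A bare `ThreadPoolExecutor()` defaults to `min(32, os.cpu_count() + 4)` workers, while `max_pool_connections=10` (botocore's default, now made explicit in `app/clients/__init__.py`) caps the shared urllib3 pool at 10 connections, so surplus workers will discard pooled connections and urllib3 will emit "Connection pool is full" warnings, which is presumably one reason the urllib3 logger is quieted in `logging.py`. Below is a minimal sketch of pinning the worker count to the pool size; `POOL_SIZE` and `fetch_job_csvs` are hypothetical names for illustration, not part of the PR.

```python
from concurrent.futures import ThreadPoolExecutor

import boto3
from botocore.client import Config

# Hypothetical constant mirroring max_pool_connections in app/clients/__init__.py
POOL_SIZE = 10

s3res = boto3.resource("s3", config=Config(max_pool_connections=POOL_SIZE))


def fetch_job_csvs(bucket_name, object_keys):
    """Sketch: read many S3 objects concurrently without oversubscribing
    botocore's shared urllib3 connection pool."""

    def read_one(key):
        # Same retrieval shape as read_s3_file in the PR
        return s3res.Object(bucket_name, key).get()["Body"].read().decode("utf-8")

    # Pinning max_workers to the pool size means no thread ever has to
    # discard a pooled connection; ThreadPoolExecutor() alone would
    # default to min(32, os.cpu_count() + 4) workers.
    with ThreadPoolExecutor(max_workers=POOL_SIZE) as executor:
        # Materializing the results also re-raises any worker exception here.
        return list(executor.map(read_one, object_keys))
```

One further caveat on the `try/except Exception` around `executor.map` in `get_s3_files`: `Executor.map` returns a lazy iterator, and exceptions raised inside a worker are only re-raised when its result is consumed. Since the PR never iterates the returned iterator, errors other than the `LookupError` handled inside `read_s3_file` are silently dropped, and the "Connection pool issue" branch will rarely fire; consuming the results (as `list(...)` does in the sketch) would surface them.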