Merge pull request #1339 from GSA/faster_s3_downloads
improve report performance
terrazoon authored Oct 1, 2024
2 parents 816c8e9 + 544e7e6 commit 216528d
Showing 5 changed files with 85 additions and 24 deletions.
4 changes: 2 additions & 2 deletions .ds.baseline
@@ -209,7 +209,7 @@
"filename": "tests/app/aws/test_s3.py",
"hashed_secret": "67a74306b06d0c01624fe0d0249a570f4d093747",
"is_verified": false,
"line_number": 28,
"line_number": 29,
"is_secret": false
}
],
@@ -384,5 +384,5 @@
}
]
},
"generated_at": "2024-09-26T20:29:19Z"
"generated_at": "2024-09-27T16:42:53Z"
}
79 changes: 57 additions & 22 deletions app/aws/s3.py
@@ -1,6 +1,7 @@
import datetime
import re
import time
+from concurrent.futures import ThreadPoolExecutor
from multiprocessing import Manager

import botocore
@@ -132,33 +133,67 @@ def cleanup_old_s3_objects():
)


-def get_s3_files():
+def get_job_id_from_s3_object_key(key):
+    object_arr = key.split("/")
+    job_id = object_arr[1]  # get the job_id
+    job_id = job_id.replace(".csv", "")  # we just want the job_id
+    return job_id
+
+
+def read_s3_file(bucket_name, object_key, s3res):
+    """
+    This method runs during the 'regenerate job cache' task. In addition
+    to retrieving a job and putting it into the cache, it does some
+    pre-processing: it also caches the job's list of phone numbers and
+    its personalisation data. When a report needs to be regenerated, the
+    phone numbers can then be found in job_cache[<job_id>_phones] and the
+    personalisation in job_cache[<job_id>_personalisation], which in
+    theory should make report generation a lot faster. We are moving
+    processing from the front end, where the user experiences it as wait
+    time, to this back-end process.
+    """
+    try:
+        job_id = get_job_id_from_s3_object_key(object_key)
+        if job_cache.get(job_id) is None:
+            object = (
+                s3res.Object(bucket_name, object_key)
+                .get()["Body"]
+                .read()
+                .decode("utf-8")
+            )
+            set_job_cache(job_cache, job_id, object)
+            set_job_cache(job_cache, f"{job_id}_phones", extract_phones(object))
+            set_job_cache(
+                job_cache,
+                f"{job_id}_personalisation",
+                extract_personalisation(object),
+            )
+
+    except LookupError:
+        # Perhaps our key is not formatted as we expected; if so, skip it.
+        current_app.logger.exception("LookupError #notify-admin-1200")
+
+
+def get_s3_files():
+    """
+    Use a ThreadPoolExecutor to parallelise retrieval of the S3 csv
+    files, which speeds up job cache regeneration as we scale.
+    """
    bucket_name = current_app.config["CSV_UPLOAD_BUCKET"]["bucket"]
-    objects = list_s3_objects()
+    object_keys = list_s3_objects()

    s3res = get_s3_resource()
    current_app.logger.info(
        f"job_cache length before regen: {len(job_cache)} #notify-admin-1200"
    )
-    for object in objects:
-        # We put our csv files in the format "service-{service_id}-notify/{job_id}"
-        try:
-            object_arr = object.split("/")
-            job_id = object_arr[1]  # get the job_id
-            job_id = job_id.replace(".csv", "")  # we just want the job_id
-            if job_cache.get(job_id) is None:
-                object = (
-                    s3res.Object(bucket_name, object)
-                    .get()["Body"]
-                    .read()
-                    .decode("utf-8")
-                )
-                if "phone number" in object.lower():
-                    set_job_cache(job_cache, job_id, object)
-        except LookupError:
-            # perhaps our key is not formatted as we expected. If so skip it.
-            current_app.logger.exception("LookupError #notify-admin-1200")
+    try:
+        with ThreadPoolExecutor() as executor:
+            executor.map(lambda key: read_s3_file(bucket_name, key, s3res), object_keys)
+    except Exception:
+        current_app.logger.exception("Connection pool issue")

    current_app.logger.info(
        f"job_cache length after regen: {len(job_cache)} #notify-admin-1200"
@@ -363,7 +398,7 @@ def get_phone_number_from_s3(service_id, job_id, job_row_number):
)
return "Unavailable"

-    # If we look in the JOBS cache for the quick lookup dictionary of phones for a given job
+    # If we look in the job_cache for the quick lookup dictionary of phones for a given job
    # and that dictionary is not there, create it
    if job_cache.get(f"{job_id}_phones") is None:
        phones = extract_phones(job)
@@ -400,7 +435,7 @@ def get_personalisation_from_s3(service_id, job_id, job_row_number):
)
return {}

-    # If we look in the JOBS cache for the quick lookup dictionary of personalisations for a given job
+    # If we look in the job_cache for the quick lookup dictionary of personalisations for a given job
    # and that dictionary is not there, create it
    if job_cache.get(f"{job_id}_personalisation") is None:
        set_job_cache(
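
Taken together, the new path lists the object keys once, then fans the reads out across a thread pool whose workers share one Manager-backed dict. Below is a minimal, self-contained sketch of that pattern; the bucket layout and the read_one/warm_cache helpers are illustrative stand-ins, not the app's real code. One subtlety: executor.map is lazy, so the sketch wraps it in list() to force evaluation and surface worker exceptions, whereas the merged code relies on read_s3_file handling its own errors instead.

from concurrent.futures import ThreadPoolExecutor
from multiprocessing import Manager

import boto3

manager = Manager()
job_cache = manager.dict()  # shared by all worker threads


def read_one(bucket_name, key, s3res):
    # Hypothetical reader with the same shape as read_s3_file above.
    job_id = key.split("/")[1].replace(".csv", "")
    if job_cache.get(job_id) is None:
        body = s3res.Object(bucket_name, key).get()["Body"].read().decode("utf-8")
        job_cache[job_id] = body  # plus derived entries like f"{job_id}_phones"


def warm_cache(bucket_name, keys):
    s3res = boto3.resource("s3")
    # Exiting the context manager waits for all reads to finish.
    with ThreadPoolExecutor() as executor:
        # list() forces the lazy map, so worker exceptions surface here.
        list(executor.map(lambda k: read_one(bucket_name, k, s3res), keys))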
4 changes: 4 additions & 0 deletions app/clients/__init__.py
@@ -13,6 +13,10 @@
"addressing_style": "virtual",
},
use_fips_endpoint=True,
# This is the default but just for doc sake
# there may come a time when increasing this helps
# with job cache management.
max_pool_connections=10,
)


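
For reference, max_pool_connections is a botocore Config option (default 10) that caps the client's shared urllib3 connection pool. If more threads than that hit S3 at once, urllib3 logs "Connection pool is full, discarding connection" and churns extra connections, which is presumably why get_s3_files above logs a "Connection pool issue" on failure. A hedged sketch of sizing the pool to the worker count; MAX_WORKERS here is illustrative, not a value from the diff:

from concurrent.futures import ThreadPoolExecutor

import boto3
from botocore.config import Config

MAX_WORKERS = 10  # illustrative; keep >= the number of concurrent S3 callers

config = Config(
    max_pool_connections=MAX_WORKERS,  # botocore's default is 10
)
s3res = boto3.resource("s3", config=config)

# Each worker borrows a connection from the client's shared urllib3 pool;
# a pool smaller than the worker count causes warnings and connection churn.
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
    pass  # submit S3 reads here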
6 changes: 6 additions & 0 deletions notifications_utils/logging.py
@@ -39,6 +39,12 @@ def init_app(app):
    for logger_instance, handler in product(warning_loggers, handlers):
        logger_instance.addHandler(handler)
        logger_instance.setLevel(logging.WARNING)

+    # Suppress specific loggers to prevent leaking sensitive info
+    logging.getLogger("boto3").setLevel(logging.ERROR)
+    logging.getLogger("botocore").setLevel(logging.ERROR)
+    logging.getLogger("urllib3").setLevel(logging.ERROR)

    app.logger.info("Logging configured")


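
A quick standalone way to sanity-check the suppression: set the three library loggers to ERROR under a root handler configured at INFO, and confirm that only ERROR records from them get through. A sketch outside the app's init_app wiring:

import logging

logging.basicConfig(level=logging.INFO)

for name in ("boto3", "botocore", "urllib3"):
    logging.getLogger(name).setLevel(logging.ERROR)

logging.getLogger("botocore").info("suppressed, never reaches a handler")
logging.getLogger("botocore").error("still emitted")
logging.getLogger("myapp").info("application logging is unaffected")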
16 changes: 16 additions & 0 deletions tests/app/aws/test_s3.py
@@ -9,6 +9,7 @@
    cleanup_old_s3_objects,
    file_exists,
    get_job_from_s3,
+    get_job_id_from_s3_object_key,
    get_personalisation_from_s3,
    get_phone_number_from_s3,
    get_s3_file,
@@ -117,6 +118,21 @@ def test_get_phone_number_from_s3(
    assert phone_number == expected_phone_number


+@pytest.mark.parametrize(
+    "key, expected_job_id",
+    [
+        ("service-blahblahblah-notify/abcde.csv", "abcde"),
+        (
+            "service-x-notify/4c99f361-4ed7-49b1-bd6f-02fe0c807c53.csv",
+            "4c99f361-4ed7-49b1-bd6f-02fe0c807c53",
+        ),
+    ],
+)
+def test_get_job_id_from_s3_object_key(key, expected_job_id):
+    actual_job_id = get_job_id_from_s3_object_key(key)
+    assert actual_job_id == expected_job_id


def mock_s3_get_object_slowdown(*args, **kwargs):
    error_response = {
        "Error": {
