From 5a9b867d7a5468a25797e2605c702dccded0ba38 Mon Sep 17 00:00:00 2001 From: Cliff Hill Date: Tue, 5 Nov 2024 17:21:14 -0500 Subject: [PATCH 1/4] Sticking the job cache into the flask app's config, in an effort to improve/fix things. Signed-off-by: Cliff Hill --- app/__init__.py | 4 ++++ app/aws/s3.py | 42 ++++++++++++++++++++++++---------------- tests/app/aws/test_s3.py | 15 ++++++-------- 3 files changed, 35 insertions(+), 26 deletions(-) diff --git a/app/__init__.py b/app/__init__.py index 380964b53..16ffbd5a9 100644 --- a/app/__init__.py +++ b/app/__init__.py @@ -4,6 +4,7 @@ import time import uuid from contextlib import contextmanager +from multiprocessing import Manager from time import monotonic from celery import Celery, Task, current_task @@ -119,6 +120,9 @@ def create_app(application): redis_store.init_app(application) document_download_client.init_app(application) + manager = Manager() + application.config["job_cache"] = manager.dict() + register_blueprint(application) # avoid circular imports by importing this file later diff --git a/app/aws/s3.py b/app/aws/s3.py index 44785cf98..3309edfcd 100644 --- a/app/aws/s3.py +++ b/app/aws/s3.py @@ -2,7 +2,6 @@ import re import time from concurrent.futures import ThreadPoolExecutor -from multiprocessing import Manager import botocore from boto3 import Session @@ -16,8 +15,6 @@ # Temporarily extend cache to 7 days ttl = 60 * 60 * 24 * 7 -manager = Manager() -job_cache = manager.dict() # Global variable @@ -25,11 +22,23 @@ s3_resource = None -def set_job_cache(job_cache, key, value): +def set_job_cache(key, value): + job_cache = current_app.config["job_cache"] job_cache[key] = (value, time.time() + 8 * 24 * 60 * 60) +def get_job_cache(key): + job_cache = current_app.config["job_cache"] + return job_cache.get(key) + + +def len_job_cache(): + job_cache = current_app.config["job_cache"] + return len(job_cache) + + def clean_cache(): + job_cache = current_app.config["job_cache"] current_time = time.time() keys_to_delete = [] for key, (_, expiry_time) in job_cache.items(): @@ -162,17 +171,16 @@ def read_s3_file(bucket_name, object_key, s3res): """ try: job_id = get_job_id_from_s3_object_key(object_key) - if job_cache.get(job_id) is None: + if get_job_cache(job_id) is None: object = ( s3res.Object(bucket_name, object_key) .get()["Body"] .read() .decode("utf-8") ) - set_job_cache(job_cache, job_id, object) - set_job_cache(job_cache, f"{job_id}_phones", extract_phones(object)) + set_job_cache(job_id, object) + set_job_cache(f"{job_id}_phones", extract_phones(object)) set_job_cache( - job_cache, f"{job_id}_personalisation", extract_personalisation(object), ) @@ -192,7 +200,7 @@ def get_s3_files(): s3res = get_s3_resource() current_app.logger.info( - f"job_cache length before regen: {len(job_cache)} #notify-admin-1200" + f"job_cache length before regen: {len_job_cache()} #notify-admin-1200" ) try: with ThreadPoolExecutor() as executor: @@ -201,7 +209,7 @@ def get_s3_files(): current_app.logger.exception("Connection pool issue") current_app.logger.info( - f"job_cache length after regen: {len(job_cache)} #notify-admin-1200" + f"job_cache length after regen: {len_job_cache()} #notify-admin-1200" ) @@ -424,12 +432,12 @@ def extract_personalisation(job): def get_phone_number_from_s3(service_id, job_id, job_row_number): - job = job_cache.get(job_id) + job = get_job_cache(job_id) if job is None: current_app.logger.info(f"job {job_id} was not in the cache") job = get_job_from_s3(service_id, job_id) # Even if it is None, put it here to avoid KeyErrors - set_job_cache(job_cache, job_id, job) + set_job_cache(job_id, job) else: # skip expiration date from cache, we don't need it here job = job[0] @@ -441,7 +449,7 @@ def get_phone_number_from_s3(service_id, job_id, job_row_number): return "Unavailable" phones = extract_phones(job) - set_job_cache(job_cache, f"{job_id}_phones", phones) + set_job_cache(f"{job_id}_phones", phones) # If we can find the quick dictionary, use it phone_to_return = phones[job_row_number] @@ -458,12 +466,12 @@ def get_personalisation_from_s3(service_id, job_id, job_row_number): # We don't want to constantly pull down a job from s3 every time we need the personalisation. # At the same time we don't want to store it in redis or the db # So this is a little recycling mechanism to reduce the number of downloads. - job = job_cache.get(job_id) + job = get_job_cache(job_id) if job is None: current_app.logger.info(f"job {job_id} was not in the cache") job = get_job_from_s3(service_id, job_id) # Even if it is None, put it here to avoid KeyErrors - set_job_cache(job_cache, job_id, job) + set_job_cache(job_id, job) else: # skip expiration date from cache, we don't need it here job = job[0] @@ -478,9 +486,9 @@ def get_personalisation_from_s3(service_id, job_id, job_row_number): ) return {} - set_job_cache(job_cache, f"{job_id}_personalisation", extract_personalisation(job)) + set_job_cache(f"{job_id}_personalisation", extract_personalisation(job)) - return job_cache.get(f"{job_id}_personalisation")[0].get(job_row_number) + return get_job_cache(f"{job_id}_personalisation")[0].get(job_row_number) def get_job_metadata_from_s3(service_id, job_id): diff --git a/tests/app/aws/test_s3.py b/tests/app/aws/test_s3.py index 6efe55fe2..5795f3bba 100644 --- a/tests/app/aws/test_s3.py +++ b/tests/app/aws/test_s3.py @@ -70,7 +70,7 @@ def test_cleanup_old_s3_objects(mocker): mock_remove_csv_object.assert_called_once_with("A") -def test_read_s3_file_success(mocker): +def test_read_s3_file_success(client, mocker): mock_s3res = MagicMock() mock_extract_personalisation = mocker.patch("app.aws.s3.extract_personalisation") mock_extract_phones = mocker.patch("app.aws.s3.extract_phones") @@ -89,16 +89,13 @@ def test_read_s3_file_success(mocker): mock_extract_phones.return_value = ["1234567890"] mock_extract_personalisation.return_value = {"name": "John Doe"} - global job_cache - job_cache = {} - read_s3_file(bucket_name, object_key, mock_s3res) mock_get_job_id.assert_called_once_with(object_key) mock_s3res.Object.assert_called_once_with(bucket_name, object_key) expected_calls = [ - call(ANY, job_id, file_content), - call(ANY, f"{job_id}_phones", ["1234567890"]), - call(ANY, f"{job_id}_personalisation", {"name": "John Doe"}), + call(job_id, file_content), + call(f"{job_id}_phones", ["1234567890"]), + call(f"{job_id}_personalisation", {"name": "John Doe"}), ] mock_set_job_cache.assert_has_calls(expected_calls, any_order=True) @@ -380,9 +377,9 @@ def test_file_exists_false(notify_api, mocker): get_s3_mock.assert_called_once() -def test_get_s3_files_success(notify_api, mocker): +def test_get_s3_files_success(client, mocker): mock_current_app = mocker.patch("app.aws.s3.current_app") - mock_current_app.config = {"CSV_UPLOAD_BUCKET": {"bucket": "test-bucket"}} + mock_current_app.config = {"CSV_UPLOAD_BUCKET": {"bucket": "test-bucket"}, "job_cache": {}} mock_thread_pool_executor = mocker.patch("app.aws.s3.ThreadPoolExecutor") mock_read_s3_file = mocker.patch("app.aws.s3.read_s3_file") mock_list_s3_objects = mocker.patch("app.aws.s3.list_s3_objects") From 50754d92c4aa7ccd64d1fd42fb3164df6412c583 Mon Sep 17 00:00:00 2001 From: Cliff Hill Date: Tue, 5 Nov 2024 17:25:58 -0500 Subject: [PATCH 2/4] Flake8 stuff. Signed-off-by: Cliff Hill --- tests/app/aws/test_s3.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/tests/app/aws/test_s3.py b/tests/app/aws/test_s3.py index 5795f3bba..e4a9c1c07 100644 --- a/tests/app/aws/test_s3.py +++ b/tests/app/aws/test_s3.py @@ -1,7 +1,7 @@ import os from datetime import timedelta from os import getenv -from unittest.mock import ANY, MagicMock, Mock, call, patch +from unittest.mock import MagicMock, Mock, call, patch import botocore import pytest @@ -379,7 +379,10 @@ def test_file_exists_false(notify_api, mocker): def test_get_s3_files_success(client, mocker): mock_current_app = mocker.patch("app.aws.s3.current_app") - mock_current_app.config = {"CSV_UPLOAD_BUCKET": {"bucket": "test-bucket"}, "job_cache": {}} + mock_current_app.config = { + "CSV_UPLOAD_BUCKET": {"bucket": "test-bucket"}, + "job_cache": {}, + } mock_thread_pool_executor = mocker.patch("app.aws.s3.ThreadPoolExecutor") mock_read_s3_file = mocker.patch("app.aws.s3.read_s3_file") mock_list_s3_objects = mocker.patch("app.aws.s3.list_s3_objects") From ec8ee67ee9bf743042e5e060fbdb4dc042d5bc99 Mon Sep 17 00:00:00 2001 From: Cliff Hill Date: Tue, 5 Nov 2024 17:33:30 -0500 Subject: [PATCH 3/4] Logging job_cache actions, to see what's going on. Signed-off-by: Cliff Hill --- app/aws/s3.py | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/app/aws/s3.py b/app/aws/s3.py index 3309edfcd..f83b9059d 100644 --- a/app/aws/s3.py +++ b/app/aws/s3.py @@ -23,18 +23,26 @@ def set_job_cache(key, value): + current_app.logger.info(f"Setting {key} in the job_cache.") job_cache = current_app.config["job_cache"] job_cache[key] = (value, time.time() + 8 * 24 * 60 * 60) def get_job_cache(key): job_cache = current_app.config["job_cache"] - return job_cache.get(key) + ret = job_cache.get(key) + if ret is None: + current_app.logger.warning(f"Could not find {key} in the job_cache.") + else: + current_app.logger.info(f"Got {key} from job_cache.") + return ret def len_job_cache(): job_cache = current_app.config["job_cache"] - return len(job_cache) + ret = len(job_cache) + current_app.logger.info(f"Length of job_cache is {ret}") + return ret def clean_cache(): @@ -45,6 +53,9 @@ def clean_cache(): if expiry_time < current_time: keys_to_delete.append(key) + current_app.logger.info( + f"Deleting the following keys from the job_cache: {keys_to_delete}" + ) for key in keys_to_delete: del job_cache[key] From e2956e28182f5406c2e81914a99372e77786bedc Mon Sep 17 00:00:00 2001 From: Carlo Costino Date: Tue, 5 Nov 2024 21:55:14 -0500 Subject: [PATCH 4/4] Bump production app memory to 3 GB This changeset bumps our production app memory for the API to 3 GB available in anticipation of the shift of the job cache being managed by the application itself instead of a worker process. Signed-off-by: Carlo Costino --- deploy-config/production.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/deploy-config/production.yml b/deploy-config/production.yml index f593f63a2..f09f9a73d 100644 --- a/deploy-config/production.yml +++ b/deploy-config/production.yml @@ -1,6 +1,6 @@ env: production web_instances: 2 -web_memory: 2G +web_memory: 3G worker_instances: 4 worker_memory: 3G scheduler_memory: 256M