Commit: merge from main
Kenneth Kehl committed Jan 14, 2025
2 parents 99d9db2 + 67d03dd commit fdf6158
Showing 12 changed files with 121 additions and 15 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/daily_checks.yml
@@ -46,7 +46,7 @@ jobs:
- name: Run scan
run: bandit -r app/ -f txt -o /tmp/bandit-output.txt --confidence-level medium
- name: Upload bandit artifact
uses: actions/upload-artifact@v3
uses: actions/upload-artifact@v4
with:
name: bandit-report
path: /tmp/bandit-output.txt
3 changes: 2 additions & 1 deletion Makefile
@@ -52,7 +52,8 @@ run-celery: ## Run celery, TODO remove purge for staging/prod
-A run_celery.notify_celery worker \
--pidfile="/tmp/celery.pid" \
--loglevel=INFO \
--concurrency=4
--pool=threads \
--concurrency=10


.PHONY: dead-code
16 changes: 15 additions & 1 deletion app/__init__.py
@@ -18,6 +18,7 @@
from werkzeug.exceptions import HTTPException as WerkzeugHTTPException
from werkzeug.local import LocalProxy

from app import config
from app.clients import NotificationProviderClients
from app.clients.cloudwatch.aws_cloudwatch import AwsCloudwatchClient
from app.clients.document_download import DocumentDownloadClient
@@ -58,15 +59,28 @@ class SQLAlchemy(_SQLAlchemy):

def apply_driver_hacks(self, app, info, options):
sa_url, options = super().apply_driver_hacks(app, info, options)

if "connect_args" not in options:
options["connect_args"] = {}
options["connect_args"]["options"] = "-c statement_timeout={}".format(
int(app.config["SQLALCHEMY_STATEMENT_TIMEOUT"]) * 1000
)

return (sa_url, options)


db = SQLAlchemy()
# Set db engine settings here for now.
# They were not being set previously (despite environment variables with
# appropriate-sounding names) and were defaulting to low values.
db = SQLAlchemy(
engine_options={
"pool_size": config.Config.SQLALCHEMY_POOL_SIZE,
"max_overflow": 10,
"pool_timeout": config.Config.SQLALCHEMY_POOL_TIMEOUT,
"pool_recycle": config.Config.SQLALCHEMY_POOL_RECYCLE,
"pool_pre_ping": True,
}
)
migrate = Migrate()
ma = Marshmallow()
notify_celery = NotifyCelery()
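As a quick way to confirm these engine options take effect at runtime, a minimal sketch (assuming an active Flask app context; `log_pool_status` is a hypothetical helper, not part of this commit):

```python
# Hypothetical helper to confirm the pool settings above took effect.
# QueuePool exposes size() and status(); status() also reports how many
# connections are checked in/out and the current overflow.
from flask import current_app

from app import db


def log_pool_status():
    current_app.logger.info("pool size: %s", db.engine.pool.size())
    current_app.logger.info("pool status: %s", db.engine.pool.status())
```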
18 changes: 14 additions & 4 deletions app/celery/scheduled_tasks.py
@@ -24,6 +24,7 @@
find_missing_row_for_job,
)
from app.dao.notifications_dao import (
dao_close_out_delivery_receipts,
dao_update_delivery_receipts,
notifications_not_yet_sent,
)
@@ -130,9 +131,9 @@ def check_job_status():
)
)

jobs_not_complete_after_allotted_time = (
db.session.execute(jobs_not_completed_after_allotted_time).all()
)
jobs_not_complete_after_allotted_time = db.session.execute(
jobs_not_completed_after_allotted_time
).all()

# temporarily mark them as ERROR so that they don't get picked up by future check_job_status tasks
# if they haven't been re-processed in time.
@@ -247,6 +248,8 @@ def check_for_services_with_high_failure_rates_or_sending_to_tv_numbers():
bind=True, max_retries=7, default_retry_delay=3600, name="process-delivery-receipts"
)
def process_delivery_receipts(self):
# If we need to check db settings, this is a convenient place to do it:
# current_app.logger.info(f"POOL SIZE {app.db.engine.pool.size()}")
"""
Every two minutes or so (see config.py) we run this task, which searches the last three
minutes of logs for delivery receipts and batch updates the db with the results. The overlap
@@ -261,7 +264,7 @@ def process_delivery_receipts(self):

cloudwatch = AwsCloudwatchClient()
cloudwatch.init_app(current_app)
start_time = aware_utcnow() - timedelta(minutes=10)
start_time = aware_utcnow() - timedelta(minutes=3)
end_time = aware_utcnow()
delivered_receipts, failed_receipts = cloudwatch.check_delivery_receipts(
start_time, end_time
@@ -283,3 +286,10 @@ def process_delivery_receipts(self):
current_app.logger.error(
"Failed to process delivery receipts after max retries"
)


@notify_celery.task(
bind=True, max_retries=2, default_retry_delay=3600, name="cleanup-delivery-receipts"
)
def cleanup_delivery_receipts(self):
dao_close_out_delivery_receipts()
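As a sanity check on the new timings, a standalone sketch (not code from this commit) showing that consecutive runs still overlap, so receipts landing on a window boundary are seen at least once:

```python
# Sketch: with a 2-minute schedule and a 3-minute lookback (values from
# config.py in this commit), each run re-reads the last minute of the
# previous window, so boundary receipts are not missed.
from datetime import timedelta

SCHEDULE = timedelta(minutes=2)  # beat schedule for process-delivery-receipts
LOOKBACK = timedelta(minutes=3)  # window queried by the task

overlap = LOOKBACK - SCHEDULE
assert overlap > timedelta(0), "windows would leave gaps between runs"
print(f"each run re-reads {overlap} of the previous window")
```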
3 changes: 1 addition & 2 deletions app/clients/__init__.py
@@ -13,8 +13,7 @@
"addressing_style": "virtual",
},
use_fips_endpoint=True,
# This is the default but just for doc sake
max_pool_connections=10,
max_pool_connections=50,  # This should be equal to or greater than our celery concurrency
)


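For context, a minimal sketch of how this setting is typically used (the boto3 client below is illustrative, not from this commit): `max_pool_connections` caps the HTTP connection pool of a shared botocore client, so it must not be smaller than the number of concurrent worker threads.

```python
# Sketch: size the botocore connection pool to match worker concurrency.
# With --pool=threads --concurrency=10, up to ten tasks may call AWS at
# once through a shared client; 50 connections leaves comfortable headroom.
import boto3
from botocore.config import Config

client_config = Config(
    use_fips_endpoint=True,
    max_pool_connections=50,
)

s3 = boto3.client("s3", config=client_config)  # illustrative client
```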
9 changes: 7 additions & 2 deletions app/config.py
@@ -81,7 +81,7 @@ class Config(object):
SQLALCHEMY_DATABASE_URI = cloud_config.database_url
SQLALCHEMY_RECORD_QUERIES = False
SQLALCHEMY_TRACK_MODIFICATIONS = False
SQLALCHEMY_POOL_SIZE = int(getenv("SQLALCHEMY_POOL_SIZE", 5))
SQLALCHEMY_POOL_SIZE = int(getenv("SQLALCHEMY_POOL_SIZE", 40))
SQLALCHEMY_POOL_TIMEOUT = 30
SQLALCHEMY_POOL_RECYCLE = 300
SQLALCHEMY_STATEMENT_TIMEOUT = 1200
@@ -200,7 +200,12 @@ class Config(object):
},
"process-delivery-receipts": {
"task": "process-delivery-receipts",
"schedule": timedelta(minutes=8),
"schedule": timedelta(minutes=2),
"options": {"queue": QueueNames.PERIODIC},
},
"cleanup-delivery-receipts": {
"task": "cleanup-delivery-receipts",
"schedule": timedelta(minutes=82),
"options": {"queue": QueueNames.PERIODIC},
},
"expire-or-delete-invitations": {
5 changes: 3 additions & 2 deletions app/dao/jobs_dao.py
@@ -44,8 +44,9 @@ def dao_get_job_by_service_id_and_job_id(service_id, job_id):


def dao_get_unfinished_jobs():
stmt = select(Job).where(Job.processing_finished.is_(None))
return db.session.execute(stmt).all()

stmt = select(Job).filter(Job.processing_finished.is_(None))
return db.session.execute(stmt).scalars().all()


def dao_get_jobs_by_service_id(
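The switch to `.scalars()` changes the return shape, which is easy to trip over; a standalone sketch of the difference (illustration only, not from this commit):

```python
# Sketch: Result.all() yields Row tuples, while scalars().all() yields
# the ORM objects directly (SQLAlchemy 1.4+ select() style).
from sqlalchemy import select

from app import db
from app.models import Job

stmt = select(Job).where(Job.processing_finished.is_(None))

rows = db.session.execute(stmt).all()            # list of Row; rows[0][0] is a Job
jobs = db.session.execute(stmt).scalars().all()  # list of Job; jobs[0] is a Job
```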
26 changes: 26 additions & 0 deletions app/dao/notifications_dao.py
@@ -1,5 +1,6 @@
import json
from datetime import timedelta
from time import time

from flask import current_app
from sqlalchemy import (
@@ -745,6 +746,7 @@ def get_service_ids_with_notifications_on_date(notification_type, date):


def dao_update_delivery_receipts(receipts, delivered):
start_time_millis = time() * 1000
new_receipts = []
for r in receipts:
if isinstance(r, str):
@@ -791,3 +793,27 @@ def dao_update_delivery_receipts(receipts, delivered):
)
db.session.execute(stmt)
db.session.commit()
elapsed_time = (time() * 1000) - start_time_millis
current_app.logger.info(
f"#loadtestperformance batch update query time: \
updated {len(receipts)} notification in {elapsed_time} ms"
)
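One design note: `time()` reads the wall clock, which can jump if the system clock adjusts; a hypothetical variant (not part of this commit) using a monotonic clock is safer for elapsed-time measurement:

```python
# Sketch: monotonic() never moves backwards, so elapsed-time deltas are
# reliable even if the system clock is adjusted mid-measurement.
from time import monotonic

start = monotonic()
...  # the batch update work measured above
elapsed_ms = (monotonic() - start) * 1000
```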


def dao_close_out_delivery_receipts():
THREE_DAYS_AGO = utc_now() - timedelta(days=3)
stmt = (
update(Notification)
.where(
Notification.status == NotificationStatus.PENDING,
Notification.sent_at < THREE_DAYS_AGO,
)
.values(status=NotificationStatus.FAILED, provider_response="Technical Failure")
)
result = db.session.execute(stmt)

db.session.commit()
if result.rowcount:
current_app.logger.info(
f"Marked {result.rowcount} notifications as technical failures"
)
11 changes: 10 additions & 1 deletion app/models.py
@@ -577,7 +577,16 @@ def get_inbound_number(self):
return self.inbound_number.number

def get_default_sms_sender(self):
default_sms_sender = [x for x in self.service_sms_senders if x.is_default]
# notify-api-1513: minimal fix to make sure we only return senders
# that actually belong to this service
default_sms_sender = [
x
for x in self.service_sms_senders
if x.is_default and x.service_id == self.id
]
current_app.logger.info(
f"#notify-api-1513 senders for service {self.name} are {self.service_sms_senders}"
)
return default_sms_sender[0].sms_sender

def get_default_reply_to_email_address(self):
23 changes: 23 additions & 0 deletions docs/adrs/0010-adr-celery-pool-support-best-practice.md
@@ -0,0 +1,23 @@
# Make best use of celery worker pools

Status: N/A
Date: N/A

### Context
Our API application started with Celery's default pool type, `prefork`, and a concurrency of 4. We continuously encountered instability, which we initially attributed to a resource leak, so we added the configuration `worker-max-tasks-per-child=500`, a recommended best practice. When we ran a load test of 25,000 simulated messages, however, we continued to see stability issues, culminating in a crash of the app after 4 hours that required a restage. After running `cf app notify-api-production` and observing that `cpu entitlement` was off the charts at 10,000% to 12,000% for the workers, and after doing some further reading, we concluded that `prefork` is probably not the best pool type for this application.

The problem with `prefork` is that each process tends to hang onto the CPU allocated to it even when idle. Our application is not computationally intensive: it largely consists of downloading strings from S3, parsing them, and sending them out as SMS messages. Based on the determination that our app is likely I/O bound, we elected to run an experiment, changing the pool type to `threads` and increasing concurrency to 10. The expectation is that both memory and CPU usage will decrease and the app will remain available.
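For illustration, the same pool choice expressed in Celery's Python configuration rather than CLI flags (this repo sets the flags on the command line in the Makefile and manifest.yml; the app name below is hypothetical):

```python
# Sketch: worker_pool, worker_concurrency and worker_prefetch_multiplier
# are standard Celery settings mirroring the CLI flags discussed above.
from celery import Celery

celery_app = Celery("notify")  # hypothetical app name

celery_app.conf.worker_pool = "threads"         # thread pool suits I/O-bound work
celery_app.conf.worker_concurrency = 10         # threads per worker process
celery_app.conf.worker_prefetch_multiplier = 2  # matches manifest.yml
```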

### Decision

### Consequences

### Author
@kenkehl

### Stakeholders
@ccostino
@stvnrlly

### Next Steps
- Run an after-hours load test with production configured to --pool=threads and --concurrency=10 (concurrency can be cautiously increased once we know it works)
2 changes: 1 addition & 1 deletion manifest.yml
@@ -26,7 +26,7 @@ applications:
- type: worker
instances: ((worker_instances))
memory: ((worker_memory))
command: newrelic-admin run-program celery -A run_celery.notify_celery worker --loglevel=INFO --concurrency=4
command: newrelic-admin run-program celery -A run_celery.notify_celery worker --loglevel=INFO --pool=threads --concurrency=10 --prefetch-multiplier=2
- type: scheduler
instances: 1
memory: ((scheduler_memory))
18 changes: 18 additions & 0 deletions tests/app/dao/notification_dao/test_notification_dao.py
@@ -11,6 +11,7 @@

from app import db
from app.dao.notifications_dao import (
dao_close_out_delivery_receipts,
dao_create_notification,
dao_delete_notifications_by_id,
dao_get_last_notification_added_for_job_id,
@@ -2030,6 +2031,23 @@ def test_update_delivery_receipts(mocker):
assert "provider_response" in kwargs


def test_close_out_delivery_receipts(mocker):
mock_session = mocker.patch("app.dao.notifications_dao.db.session")
mock_update = MagicMock()
mock_where = MagicMock()
mock_values = MagicMock()
mock_update.where.return_value = mock_where
mock_where.values.return_value = mock_values

mock_session.execute.return_value = MagicMock(rowcount=3)
with patch("app.dao.notifications_dao.update", return_value=mock_update):
dao_close_out_delivery_receipts()
mock_update.where.assert_called_once()
mock_where.values.assert_called_once()
mock_session.execute.assert_called_once_with(mock_values)
mock_session.commit.assert_called_once()


@pytest.mark.parametrize(
"created_at_utc,date_to_check,expected_count",
[
