Merge pull request #1536 from GSA/main
01/16/2025 Production Deploy
ccostino authored Jan 17, 2025
2 parents ff10aa9 + e9206c3 commit e1601b7
Showing 27 changed files with 424 additions and 60 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/daily_checks.yml
@@ -46,7 +46,7 @@ jobs:
       - name: Run scan
         run: bandit -r app/ -f txt -o /tmp/bandit-output.txt --confidence-level medium
       - name: Upload bandit artifact
-        uses: actions/upload-artifact@v3
+        uses: actions/upload-artifact@v4
         with:
           name: bandit-report
           path: /tmp/bandit-output.txt
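Note: the move off upload-artifact@v3 lines up with GitHub's deprecation of the v3 artifact actions, which were scheduled to stop working at the end of January 2025, shortly after this deploy.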
3 changes: 2 additions & 1 deletion Makefile
@@ -52,7 +52,8 @@ run-celery: ## Run celery, TODO remove purge for staging/prod
 	-A run_celery.notify_celery worker \
 	--pidfile="/tmp/celery.pid" \
 	--loglevel=INFO \
-	--concurrency=4
+	--pool=threads \
+	--concurrency=10


.PHONY: dead-code
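Note: with the worker switched to a threads pool, one Celery process now runs ten concurrent threads, each of which can hold its own database and HTTP connection; the larger SQLALCHEMY_POOL_SIZE and max_pool_connections values later in this diff appear to be sized for exactly that.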
16 changes: 15 additions & 1 deletion app/__init__.py
@@ -18,6 +18,7 @@
 from werkzeug.exceptions import HTTPException as WerkzeugHTTPException
 from werkzeug.local import LocalProxy

+from app import config
 from app.clients import NotificationProviderClients
 from app.clients.cloudwatch.aws_cloudwatch import AwsCloudwatchClient
 from app.clients.document_download import DocumentDownloadClient
@@ -58,15 +59,28 @@ class SQLAlchemy(_SQLAlchemy):

     def apply_driver_hacks(self, app, info, options):
         sa_url, options = super().apply_driver_hacks(app, info, options)

         if "connect_args" not in options:
             options["connect_args"] = {}
         options["connect_args"]["options"] = "-c statement_timeout={}".format(
             int(app.config["SQLALCHEMY_STATEMENT_TIMEOUT"]) * 1000
         )

         return (sa_url, options)


-db = SQLAlchemy()
+# Set db engine settings here for now.
+# They were not being set previously (despite environment variables with
+# appropriate-sounding names) and were defaulting to low values.
+db = SQLAlchemy(
+    engine_options={
+        "pool_size": config.Config.SQLALCHEMY_POOL_SIZE,
+        "max_overflow": 10,
+        "pool_timeout": config.Config.SQLALCHEMY_POOL_TIMEOUT,
+        "pool_recycle": config.Config.SQLALCHEMY_POOL_RECYCLE,
+        "pool_pre_ping": True,
+    }
+)
 migrate = Migrate()
 ma = Marshmallow()
 notify_celery = NotifyCelery()
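For anyone verifying that these engine options take effect, a minimal sketch in the spirit of the commented-out hint in process_delivery_receipts below; the helper name is an assumption, not part of the commit:

from flask import current_app

from app import db


def log_pool_status():
    # QueuePool exposes the configured size plus live usage counters.
    pool = db.engine.pool
    current_app.logger.info(
        "POOL SIZE %s CHECKED OUT %s OVERFLOW %s",
        pool.size(),
        pool.checkedout(),
        pool.overflow(),
    )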
68 changes: 64 additions & 4 deletions app/celery/scheduled_tasks.py
@@ -1,10 +1,11 @@
-from datetime import timedelta
+import json
+from datetime import datetime, timedelta

 from flask import current_app
 from sqlalchemy import between
 from sqlalchemy.exc import SQLAlchemyError

-from app import notify_celery, zendesk_client
+from app import notify_celery, redis_store, zendesk_client
 from app.celery.tasks import (
     get_recipient_csv_and_template_and_sender_id,
     process_incomplete_jobs,
@@ -24,6 +25,8 @@
     find_missing_row_for_job,
 )
 from app.dao.notifications_dao import (
+    dao_batch_insert_notifications,
+    dao_close_out_delivery_receipts,
     dao_update_delivery_receipts,
     notifications_not_yet_sent,
 )
@@ -33,7 +36,7 @@
 )
 from app.dao.users_dao import delete_codes_older_created_more_than_a_day_ago
 from app.enums import JobStatus, NotificationType
-from app.models import Job
+from app.models import Job, Notification
 from app.notifications.process_notifications import send_notification_to_queue
 from app.utils import utc_now
 from notifications_utils import aware_utcnow
@@ -242,6 +245,8 @@ def check_for_services_with_high_failure_rates_or_sending_to_tv_numbers():
     bind=True, max_retries=7, default_retry_delay=3600, name="process-delivery-receipts"
 )
 def process_delivery_receipts(self):
+    # If we need to check db settings do it here for convenience
+    # current_app.logger.info(f"POOL SIZE {app.db.engine.pool.size()}")
     """
     Every two minutes or so (see config.py) we run this task, which searches the last three
     minutes of logs for delivery receipts and batch updates the db with the results. The overlap
@@ -256,7 +261,7 @@ def process_delivery_receipts(self):

     cloudwatch = AwsCloudwatchClient()
     cloudwatch.init_app(current_app)
-    start_time = aware_utcnow() - timedelta(minutes=10)
+    start_time = aware_utcnow() - timedelta(minutes=3)
     end_time = aware_utcnow()
     delivered_receipts, failed_receipts = cloudwatch.check_delivery_receipts(
         start_time, end_time
@@ -278,3 +283,58 @@ def process_delivery_receipts(self):
         current_app.logger.error(
             "Failed to process delivery receipts after max retries"
         )
+
+
+@notify_celery.task(
+    bind=True, max_retries=2, default_retry_delay=3600, name="cleanup-delivery-receipts"
+)
+def cleanup_delivery_receipts(self):
+    dao_close_out_delivery_receipts()
+
+
+@notify_celery.task(bind=True, name="batch-insert-notifications")
+def batch_insert_notifications(self):
+    batch = []
+
+    # TODO We probably need some way to clear the list if
+    # things go haywire. A command?
+
+    # with redis_store.pipeline():
+    #     while redis_store.llen("message_queue") > 0:
+    #         redis_store.lpop("message_queue")
+    #     current_app.logger.info("EMPTY!")
+    #     return
+    current_len = redis_store.llen("message_queue")
+    with redis_store.pipeline():
+        # since this list is being fed by other processes, just grab what is
+        # available when this call is made and process that
+
+        count = 0
+        while count < current_len:
+            count = count + 1
+            notification_bytes = redis_store.lpop("message_queue")
+            notification_dict = json.loads(notification_bytes.decode("utf-8"))
+            notification_dict["status"] = notification_dict.pop("notification_status")
+            if not notification_dict.get("created_at"):
+                notification_dict["created_at"] = utc_now()
+            elif isinstance(notification_dict["created_at"], list):
+                notification_dict["created_at"] = notification_dict["created_at"][0]
+            notification = Notification(**notification_dict)
+            if notification is not None:
+                batch.append(notification)
+    try:
+        dao_batch_insert_notifications(batch)
+    except Exception:
+        current_app.logger.exception("Notification batch insert failed")
+        for n in batch:
+            # Use 'created_at' as a TTL so we don't retry infinitely
+            notification_time = n.created_at
+            if isinstance(notification_time, str):
+                notification_time = datetime.fromisoformat(n.created_at)
+            if notification_time < utc_now() - timedelta(seconds=50):
+                current_app.logger.warning(
+                    f"Abandoning stale data, could not write to db: {n.serialize_for_redis(n)}"
+                )
+                continue
+            else:
+                redis_store.rpush("message_queue", json.dumps(n.serialize_for_redis(n)))
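The task above is the consumer half of a Redis-backed batch pipeline. The producer half is not among the hunks shown here, but the retry branch implies the enqueue side looks roughly like this sketch (helper name assumed; serialize_for_redis is added to app/models.py further down):

import json

from app import redis_store


def queue_notification_for_batch_insert(notification):
    # Mirror the rpush in the retry branch: serialize the ORM object to a
    # plain dict, JSON-encode it, and append it to the shared list that
    # batch_insert_notifications drains every ten seconds.
    payload = notification.serialize_for_redis(notification)
    redis_store.rpush("message_queue", json.dumps(payload))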
2 changes: 1 addition & 1 deletion app/celery/tasks.py
@@ -256,7 +256,7 @@ def save_sms(self, service_id, notification_id, encrypted_notification, sender_i
         )
     )
     provider_tasks.deliver_sms.apply_async(
-        [str(saved_notification.id)], queue=QueueNames.SEND_SMS
+        [str(saved_notification.id)], queue=QueueNames.SEND_SMS, countdown=60
     )

     current_app.logger.debug(
3 changes: 1 addition & 2 deletions app/clients/__init__.py
@@ -13,8 +13,7 @@
"addressing_style": "virtual",
},
use_fips_endpoint=True,
# This is the default but just for doc sake
max_pool_connections=10,
max_pool_connections=50, # This should be equal or greater than our celery concurrency
)


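For context, max_pool_connections caps the urllib3 connection pool that botocore keeps per client. A standalone sketch of the same configuration; the config name, client, and region here are illustrative assumptions, not the repo's actual wiring:

import boto3
from botocore.config import Config

AWS_CLIENT_CONFIG = Config(
    s3={"addressing_style": "virtual"},
    use_fips_endpoint=True,
    # Sized so ten celery worker threads never contend for an HTTP connection.
    max_pool_connections=50,
)

sns_client = boto3.client("sns", region_name="us-gov-west-1", config=AWS_CLIENT_CONFIG)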
11 changes: 11 additions & 0 deletions app/commands.py
@@ -789,6 +789,17 @@ def _update_template(id, name, template_type, content, subject):
     db.session.commit()


+@notify_command(name="clear-redis-list")
+@click.option("-n", "--name_of_list", required=True)
+def clear_redis_list(name_of_list):
+    my_len_before = redis_store.llen(name_of_list)
+    redis_store.ltrim(name_of_list, 1, 0)
+    my_len_after = redis_store.llen(name_of_list)
+    current_app.logger.info(
+        f"Cleared redis list {name_of_list}. Before: {my_len_before}, after: {my_len_after}"
+    )

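This looks like the command contemplated by the TODO in batch_insert_notifications above. LTRIM with a start index (1) greater than the end index (0) leaves an empty list, so the single ltrim call clears it. Invocation is presumably along the lines of "flask command clear-redis-list -n message_queue", depending on how notify_command registers its CLI group.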

 @notify_command(name="update-templates")
 def update_templates():
     with open(current_app.config["CONFIG_FILES"] + "/templates.json") as f:
14 changes: 12 additions & 2 deletions app/config.py
@@ -81,7 +81,7 @@ class Config(object):
     SQLALCHEMY_DATABASE_URI = cloud_config.database_url
     SQLALCHEMY_RECORD_QUERIES = False
     SQLALCHEMY_TRACK_MODIFICATIONS = False
-    SQLALCHEMY_POOL_SIZE = int(getenv("SQLALCHEMY_POOL_SIZE", 5))
+    SQLALCHEMY_POOL_SIZE = int(getenv("SQLALCHEMY_POOL_SIZE", 40))
     SQLALCHEMY_POOL_TIMEOUT = 30
     SQLALCHEMY_POOL_RECYCLE = 300
     SQLALCHEMY_STATEMENT_TIMEOUT = 1200
@@ -200,7 +200,17 @@ class Config(object):
         },
         "process-delivery-receipts": {
             "task": "process-delivery-receipts",
-            "schedule": timedelta(minutes=8),
+            "schedule": timedelta(minutes=2),
             "options": {"queue": QueueNames.PERIODIC},
         },
+        "cleanup-delivery-receipts": {
+            "task": "cleanup-delivery-receipts",
+            "schedule": timedelta(minutes=82),
+            "options": {"queue": QueueNames.PERIODIC},
+        },
+        "batch-insert-notifications": {
+            "task": "batch-insert-notifications",
+            "schedule": 10.0,
+            "options": {"queue": QueueNames.PERIODIC},
+        },
         "expire-or-delete-invitations": {
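Celery beat accepts either a timedelta or a bare number of seconds, so "schedule": 10.0 runs batch-insert-notifications every ten seconds, frequent enough to keep pace with whatever is feeding the Redis list.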
2 changes: 1 addition & 1 deletion app/dao/jobs_dao.py
@@ -45,7 +45,7 @@ def dao_get_job_by_service_id_and_job_id(service_id, job_id):

 def dao_get_unfinished_jobs():
     stmt = select(Job).filter(Job.processing_finished.is_(None))
-    return db.session.execute(stmt).all()
+    return db.session.execute(stmt).scalars().all()


 def dao_get_jobs_by_service_id(
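The added .scalars() matters with SQLAlchemy's 2.0-style execution: execute(select(Job)) returns Row objects, so without it callers get one-element tuples rather than Job instances. A quick illustration, not from the commit:

from sqlalchemy import select

from app import db
from app.models import Job

rows = db.session.execute(select(Job)).all()
# rows[0] is a Row; the Job instance is at rows[0][0]

jobs = db.session.execute(select(Job)).scalars().all()
# jobs[0] is a Job instance, which is what callers of dao_get_unfinished_jobs expect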
63 changes: 62 additions & 1 deletion app/dao/notifications_dao.py
@@ -1,5 +1,7 @@
 import json
-from datetime import timedelta
+import os
+from datetime import datetime, timedelta
+from time import time

 from flask import current_app
 from sqlalchemy import (
@@ -94,6 +96,32 @@ def dao_create_notification(notification):
     # notify-api-1454 insert only if it doesn't exist
     if not dao_notification_exists(notification.id):
         db.session.add(notification)
+        # There have been issues with invites expiring.
+        # Ensure the created at value is set and debug.
+        if notification.notification_type == "email":
+            orig_time = notification.created_at
+            now_time = utc_now()
+            try:
+                diff_time = now_time - orig_time
+            except TypeError:
+                try:
+                    orig_time = datetime.strptime(orig_time, "%Y-%m-%dT%H:%M:%S.%fZ")
+                except ValueError:
+                    orig_time = datetime.strptime(orig_time, "%Y-%m-%d")
+                diff_time = now_time - orig_time
+            current_app.logger.error(
+                f"dao_create_notification orig created at: {orig_time} and now created at: {now_time}"
+            )
+            if diff_time.total_seconds() > 300:
+                current_app.logger.error(
+                    "Something is wrong with notification.created_at in email!"
+                )
+                if os.getenv("NOTIFY_ENVIRONMENT") not in ["test"]:
+                    notification.created_at = now_time
+                    dao_update_notification(notification)
+                    current_app.logger.error(
+                        f"Email notification created_at reset to {notification.created_at}"
+                    )


def country_records_delivery(phone_prefix):
@@ -727,6 +755,7 @@ def get_service_ids_with_notifications_on_date(notification_type, date):


 def dao_update_delivery_receipts(receipts, delivered):
+    start_time_millis = time() * 1000
     new_receipts = []
     for r in receipts:
         if isinstance(r, str):
Expand Down Expand Up @@ -773,3 +802,35 @@ def dao_update_delivery_receipts(receipts, delivered):
         )
         db.session.execute(stmt)
         db.session.commit()
+    elapsed_time = (time() * 1000) - start_time_millis
+    current_app.logger.info(
+        f"#loadtestperformance batch update query time: "
+        f"updated {len(receipts)} notifications in {elapsed_time} ms"
+    )


+def dao_close_out_delivery_receipts():
+    THREE_MINUTES_AGO = utc_now() - timedelta(minutes=3)
+    stmt = (
+        update(Notification)
+        .where(
+            Notification.status == NotificationStatus.PENDING,
+            Notification.sent_at < THREE_MINUTES_AGO,
+        )
+        .values(status=NotificationStatus.FAILED, provider_response="Technical Failure")
+    )
+    result = db.session.execute(stmt)
+
+    db.session.commit()
+    if result:
+        current_app.logger.info(
+            f"Marked {result.rowcount} notifications as technical failures"
+        )
+
+
+def dao_batch_insert_notifications(batch):
+    db.session.bulk_save_objects(batch)
+    db.session.commit()
+    current_app.logger.info(f"Batch inserted notifications: {len(batch)}")
+    return len(batch)
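bulk_save_objects is SQLAlchemy's speed-over-features path: it skips relationship cascades and does not refresh server-generated values back onto the objects, presumably acceptable here because each Notification arrives from Redis fully populated. Note also that one commit covers the whole batch, so a single bad row fails the lot and lands in the retry branch of batch_insert_notifications.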
40 changes: 38 additions & 2 deletions app/models.py
@@ -5,7 +5,7 @@
 from sqlalchemy import CheckConstraint, Index, UniqueConstraint
 from sqlalchemy.dialects.postgresql import JSON, JSONB, UUID
 from sqlalchemy.ext.associationproxy import association_proxy
-from sqlalchemy.ext.declarative import declared_attr
+from sqlalchemy.ext.declarative import DeclarativeMeta, declared_attr
 from sqlalchemy.orm import validates
 from sqlalchemy.orm.collections import attribute_mapped_collection
@@ -577,7 +577,16 @@ def get_inbound_number(self):
         return self.inbound_number.number

     def get_default_sms_sender(self):
-        default_sms_sender = [x for x in self.service_sms_senders if x.is_default]
+        # notify-api-1513 let's try a minimalistic fix
+        # to see if we can get the right numbers back
+        default_sms_sender = [
+            x
+            for x in self.service_sms_senders
+            if x.is_default and x.service_id == self.id
+        ]
+        current_app.logger.info(
+            f"#notify-api-1513 senders for service {self.name} are {self.service_sms_senders}"
+        )
         return default_sms_sender[0].sms_sender

     def get_default_reply_to_email_address(self):

@@ -1685,6 +1694,33 @@ def get_created_by_email_address(self):
         else:
             return None

+    def serialize_for_redis(self, obj):
+        if isinstance(obj.__class__, DeclarativeMeta):
+            fields = {}
+            for column in obj.__table__.columns:
+                if column.name == "notification_status":
+                    new_name = "status"
+                    value = getattr(obj, new_name)
+                elif column.name == "created_at":
+                    if isinstance(obj.created_at, str):
+                        value = obj.created_at
+                    else:
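+                        # NOTE: the trailing comma below makes this a
+                        # one-element tuple; after the JSON round trip it comes
+                        # back as a list, which batch_insert_notifications
+                        # unwraps via notification_dict["created_at"][0]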
+                        value = (obj.created_at.strftime("%Y-%m-%d %H:%M:%S"),)
+                elif column.name in ["sent_at", "completed_at"]:
+                    value = None
+                elif column.name.endswith("_id"):
+                    value = getattr(obj, column.name)
+                    value = str(value)
+                else:
+                    value = getattr(obj, column.name)
+                if column.name in ["message_id", "api_key_id"]:
+                    pass  # do nothing because we don't have the message id yet
+                else:
+                    fields[column.name] = value
+
+            return fields
+        raise ValueError("Provided object is not a SQLAlchemy instance")

     def serialize_for_csv(self):
         serialized = {
             "row_number": (
