Merge pull request #1486 from GSA/notify-api-1465
switch to procedural approach for delivery receipts
ccostino authored Dec 23, 2024
2 parents 6077be4 + bb40c1d commit d5e1008
Showing 9 changed files with 10,230 additions and 329 deletions.
103 changes: 3 additions & 100 deletions app/celery/provider_tasks.py
@@ -1,107 +1,19 @@
import json
import os
from datetime import timedelta

from botocore.exceptions import ClientError
from flask import current_app
from sqlalchemy.orm.exc import NoResultFound

from app import aws_cloudwatch_client, notify_celery, redis_store
from app import notify_celery, redis_store
from app.clients.email import EmailClientNonRetryableException
from app.clients.email.aws_ses import AwsSesClientThrottlingSendRateException
from app.clients.sms import SmsClientResponseException
from app.config import Config, QueueNames
from app.dao import notifications_dao
from app.dao.notifications_dao import (
sanitize_successful_notification_by_id,
update_notification_status_by_id,
)
from app.dao.notifications_dao import update_notification_status_by_id
from app.delivery import send_to_providers
from app.enums import NotificationStatus
from app.exceptions import NotificationTechnicalFailureException
from app.utils import utc_now

# This is the amount of time to wait after sending an SMS message before we check the AWS logs for delivery
# receipts
DELIVERY_RECEIPT_DELAY_IN_SECONDS = 30


@notify_celery.task(
bind=True,
name="check_sms_delivery_receipt",
max_retries=48,
default_retry_delay=300,
)
def check_sms_delivery_receipt(self, message_id, notification_id, sent_at):
"""
This is called after deliver_sms to check the status of the message. This uses the same number of
retries and the same delay period as deliver_sms. In addition, this fires five minutes after
deliver_sms initially. So the idea is that most messages will succeed and show up in the logs quickly.
Other messages will resolve successfully after a retry or two. A few will fail, but it will take up to
4 hours to know for sure. The call to check_sms will raise an exception if neither a success nor a
failure appears in the cloudwatch logs, so this should keep retrying until the log appears, or until
we run out of retries.
"""
# TODO the localstack cloudwatch doesn't currently have our log groups. Possibly create them with awslocal?
if aws_cloudwatch_client.is_localstack():
status = "success"
provider_response = "this is a fake successful localstack sms message"
carrier = "unknown"
else:
try:
status, provider_response, carrier = aws_cloudwatch_client.check_sms(
message_id, notification_id, sent_at
)
except NotificationTechnicalFailureException as ntfe:
provider_response = "Unable to find carrier response -- still looking"
status = "pending"
carrier = ""
update_notification_status_by_id(
notification_id,
status,
carrier=carrier,
provider_response=provider_response,
)
raise self.retry(exc=ntfe)
except ClientError as err:
# Probably a ThrottlingException but could be something else
error_code = err.response["Error"]["Code"]
provider_response = (
f"{error_code} while checking sms receipt -- still looking"
)
status = "pending"
carrier = ""
update_notification_status_by_id(
notification_id,
status,
carrier=carrier,
provider_response=provider_response,
)
raise self.retry(exc=err)

if status == "success":
status = NotificationStatus.DELIVERED
elif status == "failure":
status = NotificationStatus.FAILED
# if status is not success or failure the client raised an exception and this method will retry

if status == NotificationStatus.DELIVERED:
sanitize_successful_notification_by_id(
notification_id, carrier=carrier, provider_response=provider_response
)
current_app.logger.info(
f"Sanitized notification {notification_id} that was successfully delivered"
)
else:
update_notification_status_by_id(
notification_id,
status,
carrier=carrier,
provider_response=provider_response,
)
current_app.logger.info(
f"Updated notification {notification_id} with response '{provider_response}'"
)


@notify_celery.task(
@@ -127,17 +39,8 @@ def deliver_sms(self, notification_id):
ansi_green + f"AUTHENTICATION CODE: {notification.content}" + ansi_reset
)
# Code branches off to send_to_providers.py
message_id = send_to_providers.send_sms_to_provider(notification)
send_to_providers.send_sms_to_provider(notification)

# DEPRECATED
# We have to put it in UTC. For other timezones, the delay
# will be ignored and it will fire immediately (although this probably only affects developer testing)
my_eta = utc_now() + timedelta(seconds=DELIVERY_RECEIPT_DELAY_IN_SECONDS)
check_sms_delivery_receipt.apply_async(
[message_id, notification_id, notification.created_at],
eta=my_eta,
queue=QueueNames.CHECK_SMS,
)
except Exception as e:
update_notification_status_by_id(
notification_id,
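For context on what was removed above: the old flow scheduled a follow-up Celery task per SMS, delayed by a short fixed eta and retried on a fixed interval (300 s, up to 48 times, roughly four hours) until a receipt showed up in the logs. A minimal, self-contained sketch of that per-message polling shape; the celery_app, fetch_receipt stub, and "check-sms" queue name are illustrative stand-ins, not the project's real objects:

from datetime import datetime, timedelta, timezone

from celery import Celery

celery_app = Celery("sketch", broker="memory://")  # illustrative app, not the project's


def fetch_receipt(message_id):
    # Hypothetical stand-in for the CloudWatch log lookup; returns None until a receipt exists.
    return None


@celery_app.task(bind=True, max_retries=48, default_retry_delay=300)
def check_receipt(self, message_id):
    # Poll for a delivery receipt; retry on a fixed five-minute interval until one appears
    # or retries run out (48 * 300 s, about four hours).
    receipt = fetch_receipt(message_id)
    if receipt is None:
        raise self.retry()
    return receipt


def send_and_check(message_id):
    # Schedule the first check a short, fixed delay after sending, as the removed code did.
    eta = datetime.now(timezone.utc) + timedelta(seconds=30)
    check_receipt.apply_async([message_id], eta=eta, queue="check-sms")

Each sent message carried its own polling task and retry chain; that per-message overhead is what the scheduled sweep in the next file replaces.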
49 changes: 48 additions & 1 deletion app/celery/scheduled_tasks.py
@@ -11,6 +11,7 @@
process_job,
process_row,
)
from app.clients.cloudwatch.aws_cloudwatch import AwsCloudwatchClient
from app.config import QueueNames
from app.dao.invited_org_user_dao import (
delete_org_invitations_created_more_than_two_days_ago,
@@ -22,7 +23,10 @@
find_jobs_with_missing_rows,
find_missing_row_for_job,
)
from app.dao.notifications_dao import notifications_not_yet_sent
from app.dao.notifications_dao import (
dao_update_delivery_receipts,
notifications_not_yet_sent,
)
from app.dao.services_dao import (
dao_find_services_sending_to_tv_numbers,
dao_find_services_with_high_failure_rates,
@@ -32,6 +36,7 @@
from app.models import Job
from app.notifications.process_notifications import send_notification_to_queue
from app.utils import utc_now
from notifications_utils import aware_utcnow
from notifications_utils.clients.zendesk.zendesk_client import NotifySupportTicket

MAX_NOTIFICATION_FAILS = 10000
@@ -231,3 +236,45 @@ def check_for_services_with_high_failure_rates_or_sending_to_tv_numbers():
technical_ticket=True,
)
zendesk_client.send_ticket_to_zendesk(ticket)


@notify_celery.task(
bind=True, max_retries=7, default_retry_delay=3600, name="process-delivery-receipts"
)
def process_delivery_receipts(self):
"""
Every eight minutes or so (see config.py) we run this task, which searches the last ten
minutes of logs for delivery receipts and batch-updates the db with the results. The overlap
is intentional: we don't mind re-updating things, as that is better than losing data.
We also set this to retry with exponential backoff in case of failure. The only way this would
fail is if, for example, the db went down or Redis filled up, causing the app to stop processing. But if
it does fail, we need to go back over those logs at some point when things are running again and process the results.
"""
try:
batch_size = 1000 # in theory with postgresql this could be 10k to 20k?

cloudwatch = AwsCloudwatchClient()
cloudwatch.init_app(current_app)
start_time = aware_utcnow() - timedelta(minutes=10)
end_time = aware_utcnow()
delivered_receipts, failed_receipts = cloudwatch.check_delivery_receipts(
start_time, end_time
)
delivered_receipts = list(delivered_receipts)
for i in range(0, len(delivered_receipts), batch_size):
batch = delivered_receipts[i : i + batch_size]
dao_update_delivery_receipts(batch, True)
failed_receipts = list(failed_receipts)
for i in range(0, len(failed_receipts), batch_size):
batch = failed_receipts[i : i + batch_size]
dao_update_delivery_receipts(batch, False)
except Exception as ex:
retry_count = self.request.retries
wait_time = 3600 * 2**retry_count
try:
raise self.retry(exc=ex, countdown=wait_time)
except self.MaxRetriesExceededError:
current_app.logger.error(
"Failed process delivery receipts after max retries"
)
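The sweep above boils down to two pieces: chunk the receipts into fixed-size batches so each db write stays bounded, and back off exponentially if the sweep itself fails. A minimal sketch of that arithmetic, independent of Celery and the real DAO; fetch_receipts and update_batch are placeholders for the CloudWatch client and dao_update_delivery_receipts:

from datetime import datetime, timedelta, timezone


def chunked(items, size):
    # Yield consecutive slices of at most `size` items, as the task does with batch_size=1000.
    for i in range(0, len(items), size):
        yield items[i : i + size]


def process_window(fetch_receipts, update_batch, batch_size=1000, window_minutes=10):
    # Sweep one window of logs and batch-update the results; the ten-minute window
    # deliberately overlaps the roughly eight-minute schedule so nothing is missed.
    end = datetime.now(timezone.utc)
    start = end - timedelta(minutes=window_minutes)
    delivered, failed = fetch_receipts(start, end)
    for batch in chunked(list(delivered), batch_size):
        update_batch(batch, True)
    for batch in chunked(list(failed), batch_size):
        update_batch(batch, False)


# Backoff used when the sweep fails: 3600 * 2**retries seconds, i.e. 1 h, 2 h, 4 h, ... over 7 retries.
backoff_schedule = [3600 * 2**n for n in range(7)]

The schedule that actually triggers process-delivery-receipts lives in config.py rather than in this diff, so the "every eight minutes or so" cadence in the docstring is taken on faith here; the overlap between that cadence and the ten-minute window means some receipts are written twice, which the task accepts as cheaper than losing data.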