Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

api-1391: Rewriting the celery retry logic #1471

Draft
wants to merge 8 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 16 additions & 14 deletions app/celery/process_ses_receipts_tasks.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
from datetime import timedelta

import iso8601
from celery.exceptions import Retry
from flask import current_app, json
from sqlalchemy.orm.exc import NoResultFound

Expand All @@ -26,7 +25,12 @@


@notify_celery.task(
bind=True, name="process-ses-result", max_retries=5, default_retry_delay=300
bind=True,
name="process-ses-result",
autoretry_for=(Exception,),
# throws=(Exception,), # Been attempted, did nothing.
max_retries=5,
default_retry_delay=300,
)
def process_ses_results(self, response):
try:
Expand Down Expand Up @@ -65,7 +69,7 @@ def process_ses_results(self, response):
f"Callback may have arrived before notification was"
f"persisted to the DB. Adding task to retry queue"
)
self.retry(queue=QueueNames.RETRY)
raise
else:
current_app.logger.warning(
f"Notification not found for reference: {reference} "
Expand Down Expand Up @@ -95,27 +99,25 @@ def process_ses_results(self, response):

if not aws_response_dict["success"]:
current_app.logger.info(
"SES delivery failed: notification id {} and reference {} has error found. Status {}".format(
notification.id, reference, aws_response_dict["message"]
)
f"SES delivery failed: notification id {notification.id} and reference "
f"{reference} has error found. Status {aws_response_dict['message']}"
)
else:
current_app.logger.info(
"SES callback return status of {} for notification: {}".format(
notification_status, notification.id
)
f"SES callback return status of {notification_status} "
f"for notification: {notification.id}"
)

check_and_queue_callback_task(notification)

return True

except Retry:
raise

except Exception:
except Exception as e:
print("Exception REACHED")
print(type(e))
print(e)
current_app.logger.exception("Error processing SES results")
self.retry(queue=QueueNames.RETRY)
raise


def determine_notification_bounce_type(ses_message):
Expand Down
130 changes: 88 additions & 42 deletions app/celery/provider_tasks.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import json
import logging
import os
from datetime import timedelta
from functools import wraps

from botocore.exceptions import ClientError
from flask import current_app
Expand Down Expand Up @@ -31,6 +33,7 @@
name="check_sms_delivery_receipt",
max_retries=48,
default_retry_delay=300,
autoretry_for=(NotificationTechnicalFailureException, ClientError),
)
def check_sms_delivery_receipt(self, message_id, notification_id, sent_at):
"""
Expand All @@ -52,7 +55,7 @@ def check_sms_delivery_receipt(self, message_id, notification_id, sent_at):
status, provider_response, carrier = aws_cloudwatch_client.check_sms(
message_id, notification_id, sent_at
)
except NotificationTechnicalFailureException as ntfe:
except NotificationTechnicalFailureException:
provider_response = "Unable to find carrier response -- still looking"
status = "pending"
carrier = ""
Expand All @@ -62,7 +65,7 @@ def check_sms_delivery_receipt(self, message_id, notification_id, sent_at):
carrier=carrier,
provider_response=provider_response,
)
raise self.retry(exc=ntfe)
raise
except ClientError as err:
# Probably a ThrottlingException but could be something else
error_code = err.response["Error"]["Code"]
Expand All @@ -77,7 +80,7 @@ def check_sms_delivery_receipt(self, message_id, notification_id, sent_at):
carrier=carrier,
provider_response=provider_response,
)
raise self.retry(exc=err)
raise

if status == "success":
status = NotificationStatus.DELIVERED
Expand All @@ -104,8 +107,40 @@ def check_sms_delivery_receipt(self, message_id, notification_id, sent_at):
)


def _deliver_sms_task_handler(cls):
"""Handle the max retries exceeded error case for delivering sms notifications."""

func = cls.__call__

@wraps(func)
def deliver_sms_task_wrapper(self, notification_id):
try:
return func(self, notification_id)
except self.MaxRetriesExceededError:
message = (
"RETRY FAILED: Max retries reached. The task send_sms_to_provider failed for notification {}. "
"Notification has been updated to technical-failure".format(
notification_id
)
)
update_notification_status_by_id(
notification_id,
NotificationStatus.TECHNICAL_FAILURE,
)
raise NotificationTechnicalFailureException(message)

cls.__call__ = deliver_sms_task_wrapper

return cls


@_deliver_sms_task_handler
@notify_celery.task(
bind=True, name="deliver_sms", max_retries=48, default_retry_delay=300
bind=True,
name="deliver_sms",
max_retries=48,
default_retry_delay=300,
autoretry_for=(Exception,),
)
def deliver_sms(self, notification_id):
"""Branch off to the final step in delivering the notification to sns and get delivery receipts."""
Expand Down Expand Up @@ -141,36 +176,57 @@ def deliver_sms(self, notification_id):
notification_id,
NotificationStatus.TEMPORARY_FAILURE,
)

if isinstance(e, SmsClientResponseException):
current_app.logger.warning(
"SMS notification delivery for id: {} failed".format(notification_id),
)
log_lvl = logging.WARNING
log_exc_info = False
else:
current_app.logger.exception(
"SMS notification delivery for id: {} failed".format(notification_id),
)
log_lvl = logging.ERROR
log_exc_info = True

current_app.logger.log(
level=log_lvl,
msg=f"SMS notification delivery for id: {notification_id} failed",
exc_info=log_exc_info,
)

raise


def _deliver_email_task_handler(cls):
"""Handle the max retries exceeded error case for delivering email notifications."""

func = cls.__call__

@wraps(func)
def deliver_email_task_wrapper(self, notification_id):
try:
if self.request.retries == 0:
self.retry(queue=QueueNames.RETRY, countdown=0)
else:
self.retry(queue=QueueNames.RETRY)
return func(self, notification_id)
except self.MaxRetriesExceededError:
message = (
"RETRY FAILED: Max retries reached. The task send_sms_to_provider failed for notification {}. "
"Notification has been updated to technical-failure".format(
notification_id
)
"RETRY FAILED: Max retries reached. "
f"The task send_email_to_provider failed for notification {notification_id}. "
"Notification has been updated to technical-failure"
)
update_notification_status_by_id(
notification_id,
NotificationStatus.TECHNICAL_FAILURE,
)
raise NotificationTechnicalFailureException(message)

cls.__call__ = deliver_email_task_wrapper

return cls


@_deliver_email_task_handler
@notify_celery.task(
bind=True, name="deliver_email", max_retries=48, default_retry_delay=30
bind=True,
name="deliver_email",
max_retries=48,
default_retry_delay=30,
autoretry_for=(Exception,),
dont_autoretry_for=(EmailClientNonRetryableException,),
)
def deliver_email(self, notification_id):
try:
Expand All @@ -191,29 +247,19 @@ def deliver_email(self, notification_id):
send_to_providers.send_email_to_provider(notification)
except EmailClientNonRetryableException:
current_app.logger.exception(f"Email notification {notification_id} failed")
update_notification_status_by_id(notification_id, "technical-failure")
update_notification_status_by_id(
notification_id,
NotificationStatus.TECHNICAL_FAILURE,
)
raise
except Exception as e:
try:
if isinstance(e, AwsSesClientThrottlingSendRateException):
current_app.logger.warning(
f"RETRY: Email notification {notification_id} was rate limited by SES"
)
else:
current_app.logger.exception(
f"RETRY: Email notification {notification_id} failed"
)

self.retry(queue=QueueNames.RETRY)
except self.MaxRetriesExceededError:
message = (
"RETRY FAILED: Max retries reached. "
"The task send_email_to_provider failed for notification {}. "
"Notification has been updated to technical-failure".format(
notification_id
)
if isinstance(e, AwsSesClientThrottlingSendRateException):
current_app.logger.warning(
f"RETRY: Email notification {notification_id} was rate limited by SES"
)
update_notification_status_by_id(
notification_id,
NotificationStatus.TECHNICAL_FAILURE,
else:
current_app.logger.exception(
f"RETRY: Email notification {notification_id} failed"
)
raise NotificationTechnicalFailureException(message)

raise
Loading
Loading