Skip to content

Commit

Permalink
Merge pull request #233 from uktrade/LTD-4652-Restrict-concurrent-sen…
Browse files Browse the repository at this point in the history
…ding-of-emails

LTD-4652: Manage concurrent SMTP connections using a lock
  • Loading branch information
saruniitr authored Feb 19, 2024
2 parents 2aff0a3 + b3f57bf commit 558eba9
Show file tree
Hide file tree
Showing 11 changed files with 427 additions and 100 deletions.
2 changes: 2 additions & 0 deletions conf/celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
app.autodiscover_tasks(related_name="celery_tasks")
app.autodiscover_tasks(["core"], related_name="celery_tasks")

# Also allow messages that are serialized/deserialized using pickle
app.conf.accept_content = ["json", "pickle"]

# Define any regular scheduled tasks

Expand Down
8 changes: 8 additions & 0 deletions conf/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,14 @@ def _build_redis_url(base_url, db_number, **query_args):
CELERY_BROKER_URL = _build_redis_url(REDIS_BASE_URL, REDIS_CELERY_DB, **url_args)
CELERY_RESULT_BACKEND = CELERY_BROKER_URL


CACHES = {
"default": {
"BACKEND": "django.core.cache.backends.redis.RedisCache",
"LOCATION": REDIS_BASE_URL,
}
}

CELERY_TASK_ALWAYS_EAGER = env.bool("CELERY_TASK_ALWAYS_EAGER", False)
CELERY_TASK_STORE_EAGER_RESULT = env.bool("CELERY_TASK_STORE_EAGER_RESULT", False)
CELERY_TASK_SEND_SENT_EVENT = env.bool("CELERY_TASK_SEND_SENT_EVENT", True)
178 changes: 147 additions & 31 deletions mail/celery_tasks.py
Original file line number Diff line number Diff line change
@@ -1,27 +1,32 @@
import time
import urllib.parse
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText

from smtplib import SMTPException
from typing import List, MutableMapping, Tuple

from celery import Task, shared_task
from celery.utils.log import get_task_logger
from contextlib import contextmanager
from django.conf import settings
from django.core.cache import cache
from django.db import transaction
from django.utils import timezone
from rest_framework.status import HTTP_207_MULTI_STATUS, HTTP_208_ALREADY_REPORTED

from mail import requests as mail_requests
from mail.enums import ReceptionStatusEnum, SourceEnum
from mail.libraries.builders import build_licence_data_mail
from mail.libraries.builders import build_email_message, build_licence_data_mail, build_licence_rejected_email_message
from mail.libraries.data_processors import build_request_mail_message_dto
from mail.libraries.routing_controller import check_and_route_emails, send, update_mail
from mail.libraries.email_message_dto import EmailMessageDto
from mail.libraries.lite_to_edifact_converter import EdifactValidationError
from mail.libraries.routing_controller import check_and_route_emails, update_mail
from mail.libraries.usage_data_decomposition import build_json_payload_from_data_blocks, split_edi_data_by_id
from mail.models import LicenceIdMapping, LicencePayload, Mail, UsageData
from mail.servers import smtp_send

logger = get_task_logger(__name__)


# Send Usage Figures to LITE API
def get_lite_api_url():
"""The URL for the licence usage callback, from the LITE_API_URL setting.
Expand Down Expand Up @@ -75,38 +80,106 @@ def _log_error(message, lite_usage_data_id):

MAX_ATTEMPTS = 3
RETRY_BACKOFF = 180
LOCK_EXPIRE = 60 * 10 # secs (10 min)
CELERY_SEND_LICENCE_UPDATES_TASK_NAME = "mail.celery_tasks.send_licence_details_to_hmrc"
CELERY_MANAGE_INBOX_TASK_NAME = "mail.celery_tasks.manage_inbox"


# Notify Users of Rejected Mail
class SMTPConnectionBusy(SMTPException):
pass


@contextmanager
def cache_lock(lock_id):
timeout_at = time.monotonic() + LOCK_EXPIRE - 3
# cache.add fails if the key already exists.
# return True if lock is acquired, False otherwise
status = cache.add(lock_id, "lock_acquired", LOCK_EXPIRE)
try:
yield status
finally:
if time.monotonic() < timeout_at and status:
cache.delete(lock_id)


class SendEmailBaseTask(Task):
def on_failure(self, exc, task_id, args, kwargs, einfo):
message = """
Maximum attempts for send_email_task exceeded - the task has failed and needs manual inspection.
Args: %s
""" % (
args,
)

# Log the final failure message
logger.critical(message)


@shared_task(
autoretry_for=(SMTPException,),
autoretry_for=(SMTPConnectionBusy, SMTPException),
max_retries=MAX_ATTEMPTS,
retry_backoff=RETRY_BACKOFF,
base=SendEmailBaseTask,
)
def send_email_task(message):
"""
Main purpose of this task is to send email.
We use SMTP to send emails. As we process messages we have a requirement to
send emails from multiple places, because of this we may open multiple SMTP connections.
This results in error if the number of concurrent connections exceed maximum allowed value.
To manage this reliably we are restricting access to this shared resource using a lock.
This is achieved by deferring all email sending functionality to this task.
Before sending email it first tries to acquire a lock.
- If there are no active connections then it acquires lock and sends email.
In some cases we need to update state which is handled in subtask linked to this task.
- If there is active connection (lock acquisition fails) then it raises an exception
which triggers a retry.
If all retries fail then manual intervention may be required (unlikely)
"""

global_lock_id = "global_send_email_lock"

with cache_lock(global_lock_id) as lock_acquired:
if not lock_acquired:
logger.exception("Another SMTP connection is active, will be retried after backing off")
raise SMTPConnectionBusy()

logger.info("Lock acquired, proceeding to send email")

try:
smtp_send(message)
except SMTPException:
logger.exception("An unexpected error occurred when sending email -> %s")
raise

logger.info("Email sent successfully")


# Notify Users of Rejected Mail
@shared_task
def notify_users_of_rejected_licences(mail_id, mail_response_subject):
"""If a reply is received with rejected licences this task notifies users of the rejection"""

logger.info("Notifying users of rejected licences found in mail with subject %s", mail_response_subject)

try:
multipart_msg = MIMEMultipart()
multipart_msg["From"] = settings.EMAIL_USER
multipart_msg["To"] = ",".join(settings.NOTIFY_USERS)
multipart_msg["Subject"] = "Licence rejected by HMRC"
body = MIMEText(f"Mail (Id: {mail_id}) with subject {mail_response_subject} has rejected licences")
multipart_msg.attach(body)

smtp_send(multipart_msg)

except SMTPException:
logger.exception(
"An unexpected error occurred when notifying users of rejected licences, Mail Id: %s, subject: %s",
mail_id,
mail_response_subject,
)
raise
message_dto = EmailMessageDto(
run_number=None,
sender=settings.EMAIL_USER,
receiver=",".join(settings.NOTIFY_USERS),
date=timezone.now(),
subject="Licence rejected by HMRC",
body=f"Mail (Id: {mail_id}) with subject {mail_response_subject} has rejected licences",
attachment=None,
raw_data=None,
)
message = build_licence_rejected_email_message(message_dto)

send_email_task.apply_async(
args=(message,),
serializer="pickle",
)

logger.info("Successfully notified users of rejected licences found in mail with subject %s", mail_response_subject)

Expand All @@ -125,10 +198,52 @@ def on_failure(self, exc, task_id, args, kwargs, einfo):
logger.error(message)


@shared_task
def finalise_sending_spire_licence_details(mail_id, message_dto):
"""Subtask that performs follow-up tasks after completing the primary purpose
of sending an email"""

mail = Mail.objects.get(id=mail_id)
message_dto = EmailMessageDto(*message_dto)

update_mail(mail, message_dto)


@shared_task
def finalise_sending_lite_licence_details(mail_id, message_dto, licence_payload_ids):
"""Subtask that performs follow-up tasks after completing the primary purpose
of sending an email"""

mail = Mail.objects.get(id=mail_id)
message_dto = EmailMessageDto(*message_dto)

update_mail(mail, message_dto)

licence_payloads = LicencePayload.objects.filter(id__in=licence_payload_ids, is_processed=False)
references = [item.reference for item in licence_payloads]

licence_payloads.update(is_processed=True)

logger.info("Licence payloads with references %s marked as processed", references)


class SendLicenceDetailsBaseTask(Task):
def on_failure(self, exc, task_id, args, kwargs, einfo):
message = """
Maximum attempts for send_licence_details_to_hmrc task exceeded - the task has failed and needs manual inspection.
Args: %s
""" % (
args,
)
logger.error(message)


@shared_task(
autoretry_for=(SMTPException,),
autoretry_for=(EdifactValidationError,),
max_retries=MAX_ATTEMPTS,
retry_backoff=RETRY_BACKOFF,
base=SendLicenceDetailsBaseTask,
)
def send_licence_details_to_hmrc():
"""Sends LITE issued licence details to HMRC"""
Expand Down Expand Up @@ -162,14 +277,15 @@ def send_licence_details_to_hmrc():
"Created licenceData mail with subject %s for licences [%s]", mail_dto.subject, licence_references
)

send(mail_dto)
update_mail(mail, mail_dto)

# Mark the payloads as processed
licences.update(is_processed=True)
logger.info("Licence references [%s] marked as processed", licence_references)
message = build_email_message(mail_dto)
licence_payload_ids = [str(licence.id) for licence in licences]
send_email_task.apply_async(
args=(message,),
serializer="pickle",
link=finalise_sending_lite_licence_details.si(mail.id, mail_dto, licence_payload_ids),
)

except SMTPException:
except EdifactValidationError:
logger.exception("An unexpected error occurred when sending LITE licence updates to HMRC -> %s")
raise

Expand Down
14 changes: 14 additions & 0 deletions mail/libraries/builders.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import datetime
import json
import logging

from email.mime.application import MIMEApplication
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
Expand Down Expand Up @@ -274,6 +275,19 @@ def build_email_message(email_message_dto: EmailMessageDto) -> MIMEMultipart:
return multipart_msg


def build_licence_rejected_email_message(email_message_dto: EmailMessageDto) -> MIMEMultipart:
logger.info("Building licences rejected notification email message")
multipart_msg = MIMEMultipart()
multipart_msg["From"] = settings.EMAIL_USER
multipart_msg["To"] = ",".join(settings.NOTIFY_USERS)
multipart_msg["Subject"] = email_message_dto.subject
multipart_msg["name"] = email_message_dto.subject
body = MIMEText(email_message_dto.body)
multipart_msg.attach(body)
logger.info("Message headers: %s", multipart_msg.items())
return multipart_msg


def _validate_dto(email_message_dto):
if email_message_dto is None:
raise TypeError("None email_message_dto received!")
Expand Down
13 changes: 10 additions & 3 deletions mail/libraries/routing_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,8 @@ def send(email_message_dto: EmailMessageDto):


def _collect_and_send(mail: Mail):
from mail.celery_tasks import send_email_task, finalise_sending_spire_licence_details

logger.info("Sending Mail [%s] of extract type %s", mail.id, mail.extract_type)

message_to_send_dto = to_email_message_dto_from(mail)
Expand All @@ -185,11 +187,16 @@ def _collect_and_send(mail: Mail):

if message_to_send_dto:
if message_to_send_dto.receiver != SourceEnum.LITE and message_to_send_dto.subject:
send(message_to_send_dto)
update_mail(mail, message_to_send_dto)
message = build_email_message(message_to_send_dto)
# Schedule a task to send email
send_email_task.apply_async(
args=(message,),
serializer="pickle",
link=finalise_sending_spire_licence_details.si(mail.id, message_to_send_dto),
)

logger.info(
"Mail [%s] routed from [%s] to [%s] with subject %s",
"Scheduled sending of mail [%s] from [%s] to [%s] with subject %s",
mail.id,
message_to_send_dto.sender,
message_to_send_dto.receiver,
Expand Down
Loading

0 comments on commit 558eba9

Please sign in to comment.