feat: add new notification dispatcher mechanism #975

Open · wants to merge 32 commits into base: stable
Changes from 31 commits
Commits
78d398b
feat: add new notification dispatcher mechanism
linuxbandit Apr 13, 2023
67693ff
feat: add retry if template not found
linuxbandit Apr 14, 2023
5f9f437
chore: add docker setup
linuxbandit Apr 17, 2023
ba33f79
chore: declare queue only once
linuxbandit Apr 19, 2023
b0cffbe
feat: make persistent queue and message
linuxbandit Apr 19, 2023
d9c689d
chore: add explicit exchange as per pattern
linuxbandit Apr 19, 2023
ce89fb2
chore: make exchange persistent and add route key
linuxbandit Apr 19, 2023
5019cb2
feat: add full templates tester, add templates
linuxbandit Apr 19, 2023
ff69624
chore: optimise docker image building; add volumes
linuxbandit Apr 19, 2023
ae97e74
chore: correct core templates
linuxbandit Apr 19, 2023
4a0fddf
feat: add SU templates. Also add macros.jinja2
linuxbandit Apr 19, 2023
cb9c695
chore: remove comment
linuxbandit Apr 19, 2023
bae8aa9
chore: add all templates for SU, working
linuxbandit Apr 21, 2023
415f653
chore: add events templates
linuxbandit Apr 21, 2023
a7b9b1f
chore: add stuff for statutory
linuxbandit Apr 24, 2023
74ee32e
chore: add last templates
linuxbandit Apr 25, 2023
ef03dd6
chore: add checker for thorough testing
linuxbandit Apr 25, 2023
de2f6fc
docs: add dockerisation
linuxbandit Apr 25, 2023
1d63fda
chore: forgot another update
linuxbandit Apr 25, 2023
c435822
chore: add quickstart
linuxbandit Apr 25, 2023
f35b247
fix: correct make test
linuxbandit Apr 25, 2023
26a277b
chore: add extra checks for errors during render
linuxbandit Apr 25, 2023
262580b
feat: add requeue with a delay
linuxbandit Jul 7, 2023
a174fa9
chore: correct template, add comments
linuxbandit Aug 2, 2023
c0da07b
feat: requeue with dynamic delay
linuxbandit Aug 2, 2023
bdf6d32
chore: make USER non-root
linuxbandit Aug 5, 2023
fcc5950
chore: differentiate between environments
linuxbandit Sep 20, 2023
7a90520
chore: add everything into a main to be modular
linuxbandit Sep 20, 2023
526bcd8
chore: add loglevels
linuxbandit Sep 20, 2023
c405a67
chore: increase clarity of incremental retry block
linuxbandit Sep 21, 2023
083b1ad
feat: add notification system
linuxbandit Sep 21, 2023
89e4f47
chore: apply suggestions from code review for templates
linuxbandit Sep 30, 2023
8 changes: 8 additions & 0 deletions .env.example
@@ -35,6 +35,11 @@ SENDGRID_SMTP_HOST=smtp.sendgrid.net
SENDGRID_USER=
SENDGRID_PW=5ecr3t

ETHEREAL_SMTP_HOST=smtp.ethereal.email
ETHEREAL_PORT=587
ETHEREAL_USER=
ETHEREAL_PW=

ELASTIC_HOST=https://example.com:9243
ELASTIC_USER=5ecr3t
ELASTIC_PASSWORD=5ecr3t
@@ -78,6 +83,8 @@ SUBDOMAIN_SURVEY=survey.
SUBDOMAIN_UPMONITOR=ciao.
SUBDOMAIN_STATPING=ping.
SUBDOMAIN_APIDOCS=apidocs.
SUBDOMAIN_RABBITMQ=rabbit.
SUBDOMAIN_MAILHOG=mailhog.
SUBDOMAIN_REDISADMIN=redis.

# Paths in the filesystem
@@ -94,6 +101,7 @@ PATH_KNOWLEDGE=knowledge/docker/
PATH_PASS_MANAGER=pass-manager/docker/
PATH_MONITOR=monitor/docker/
PATH_GSUITE_WRAPPER=gsuite-wrapper/docker/
PATH_DISPATCHER=dispatcher/docker/
PATH_OMS_ELASTIC=oms-elastic/docker/

# Other variables
17 changes: 17 additions & 0 deletions dispatcher/Makefile
@@ -0,0 +1,17 @@

default: build-dev

build-dev:
	BASE_URL=appserver.vgr SUBDOMAIN_RABBITMQ=rabbit. SUBDOMAIN_MAILHOG=mailhog. PATH_DISPATCHER=./ COMPOSE_PROJECT_NAME=myaegee docker-compose -f ./docker/docker-compose.yml -f ./docker/docker-compose.dev.yml up -d

build:
	BASE_URL=appserver.vgr SUBDOMAIN_RABBITMQ=rabbit. COMPOSE_PROJECT_NAME=myaegee docker-compose -f ./docker/docker-compose.yml up -d

down:
	COMPOSE_PROJECT_NAME=myaegee docker-compose -f ./docker/docker-compose.yml -f ./docker/docker-compose.dev.yml down

test: build-dev
	@sleep 5; sed -i "s/'172.18.0.X'/$$(docker inspect myaegee_rabbit_1 | jq .[0].NetworkSettings.Networks.OMS.IPAddress)/" helpers/send.py ; cd helpers && python send.py

rabbit:
	@DOCKER_BUILDKIT=0 docker build -t aegee/rabbit:latest -f docker/Dockerfile.rabbit . && docker push aegee/rabbit
58 changes: 58 additions & 0 deletions dispatcher/README.md
@@ -0,0 +1,58 @@
# Dispatcher

Polls RabbitMQ and takes action (sends mail).

## How to run it
### Pre-requisites
In the Vagrant appserver VM, the following packages are installed globally:
1. faker
1. jinja2
1. pika

Otherwise use poetry to install dependencies and launch the virtual environment.

### Launching it and testing it
In a console, run `python3 dispatcher/main.py`.
In another console, run `python3 helpers/send.py`.
Check the sent emails on `appserver.test:8025`. RabbitMQ's stats can also be checked on `appserver.test:8080`.

This setup is now dockerised. Using Docker is useful for its built-in DNS (i.e. the IP address of the rabbit host does not have to be hardcoded).
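A minimal sketch of what a test publisher like `helpers/send.py` might send (the exchange `eml` and routing key `mail` match what `main.py` declares; the template name, addresses and parameters below are made up for illustration):

```python
# Hypothetical test publisher. The payload fields mirror what main.py reads
# (template, parameters, subject, from, reply_to, to); values are illustrative.
import json
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host="rabbit"))
channel = connection.channel()

message = {
    "template": "welcome",            # hypothetical template name
    "parameters": {"name": "Erika"},
    "subject": "Welcome!",
    "from": "noreply@example.org",
    "reply_to": "board@example.org",
    "to": "erika@example.org",
}

channel.basic_publish(
    exchange="eml",                   # bound to queue 'email' with routing key 'mail' in main.py
    routing_key="mail",
    body=json.dumps(message),
    properties=pika.BasicProperties(delivery_mode=pika.spec.PERSISTENT_DELIVERY_MODE),
)
connection.close()
```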

### Rationale
We do not need a web service for this, only a worker. The only drawback of doing it this way is that it cannot be scaled out, unless precautions are taken for acknowledging messages (pika should already give this out of the box).
To add templates, one can simply work on the filesystem: since the template file is read from disk at the time a message is received, there is effectively a hot-reload mechanism ready to be used.

We do not need a web service, because there is nothing that has to be driven from the outside.
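As a rough illustration of the hot-reload point above (a sketch relying on Jinja2's default `auto_reload` behaviour, which re-checks the template file on disk when it is looked up):

```python
# Sketch: the template is looked up per message, so edits to the .jinja2 file
# on disk are picked up without restarting the worker (Jinja2 auto_reload default).
from jinja2 import Environment, FileSystemLoader

tpl_environment = Environment(loader=FileSystemLoader("../templates/"))

def render_for_message(msg: dict) -> str:
    # get_template() re-reads the file if it changed since the last render
    template = tpl_environment.get_template(f"{msg['template']}.jinja2")
    return template.render(msg.get("parameters", {}))
```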

## Queues

Current queues:
1. email

Queues envisioned:
1. email
1. telegram (see the sketch after this list)
1. slack (If EBs have enabled it)
1. webgui (handled by vue, NOT by this program)
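If the telegram queue is added later, it would presumably follow the same declare/bind pattern `main.py` uses for the email queue; a hypothetical sketch (the exchange, queue, and routing-key names below are made up):

```python
# Hypothetical: a future 'telegram' queue, mirroring the email queue setup.
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host="rabbit"))
channel = connection.channel()

channel.exchange_declare(exchange="tg", exchange_type="direct", durable=True)
channel.queue_declare(queue="telegram", durable=True)
channel.queue_bind(exchange="tg", queue="telegram", routing_key="telegram")
```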

## TODOs and next steps
Roughly in order:

1. [x] (not on this project): run core with the email as 'inserting in the queue' instead of 'API request to mailer'
1. [?] (not on this project): run core with the email as 'exchange' instead of 'inserting in the queue'
1. [x] include traefik configuration to have the mailhog and rabbit on a subdomain instead of `domain:port`
1. [x] When RabbitMQ quits or crashes it will forget the queues and messages unless you tell it not to: we need to mark both the queue and messages as durable
1. [ ] Add auto-retry (DLQ). Rabbit is smart and does not let a message be processed again unless it is forced to: https://devcorner.digitalpress.blog/rabbitmq-retries-the-new-full-story/
1. [ ] add the telegram queue
Review comment (Member):
Do we want to have users interact with the Telegram queue initially already or is it first only for us to get monitoring alerts?

Reply (Member Author):
in which way interact?
yes at first i was thinking of using it for us, to see how it scales in small numbers

Reply (Member):
Interact was not the correct word. I just meant receiving messages

Reply (Member Author):
I'd say at the beginning is for us

1. investigate the massmailer queue: a queue which picks up every message and builds a "bcc" list so that only one email is sent? (danger: the queue needs something like batch ack..) OR is it not feasible at all, because a "mass" mailer is still a "personalised" mailer?

1. why do we even have a `<title>` (which is dynamic)? why not use the subject directly? (re: the body of the email)
1. rename the template extension from Jinja2 to jinja
1. make the templates list be read from the filesystem (for dynamic tests)



https://www.rabbitmq.com/publishers.html#unroutable


Each consumer (subscription) has an identifier called a consumer tag. It can be used to unsubscribe from messages. Consumer tags are just strings.
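A minimal pika sketch of that (the queue name matches the one declared in `main.py`; the callback is illustrative):

```python
# basic_consume() returns the consumer tag; basic_cancel() uses it to
# unsubscribe this consumer without closing the channel.
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host="rabbit"))
channel = connection.channel()

def on_message(ch, method, properties, body):
    print(body)
    ch.basic_ack(delivery_tag=method.delivery_tag)

consumer_tag = channel.basic_consume(queue="email",
                                     on_message_callback=on_message,
                                     auto_ack=False)
# ... later, to stop receiving messages from this subscription:
channel.basic_cancel(consumer_tag)
```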
Empty file.
283 changes: 283 additions & 0 deletions dispatcher/dispatcher/main.py
@@ -0,0 +1,283 @@
import pika
import json
import smtplib
from email.message import EmailMessage
from jinja2 import Environment, FileSystemLoader, exceptions
import os
import sys
import logging
from notify import slack_alert

"""
continuously polls(*) the email queue and renders+sends the template on every acked message
(*) = waits for the queue to push a message onto the app
"""

ONCALL_HANDLER = "@grasshopper"

def connect_to_smtp():
    global smtpObj

    EMAIL_HOST = 'mailhog'
    EMAIL_PORT = 1025
    EMAIL_ADDRESS = None
    EMAIL_PASSWORD = None

    if env == 'production':
        EMAIL_HOST = os.environ.get("EMAIL_HOST")
        EMAIL_PORT = os.environ.get("EMAIL_PORT")
        EMAIL_ADDRESS = os.environ.get("EMAIL_ADDRESS")
        EMAIL_PASSWORD = os.environ.get("EMAIL_PASSWORD")

    try:
        smtpObj = smtplib.SMTP(EMAIL_HOST, EMAIL_PORT)

        if env == 'production':
            # we have to upgrade the connection and login
            # (At least with ethereal.email.. didn't try with gmail!) #TODO
            smtpObj.starttls()
            smtpObj.login(EMAIL_ADDRESS, EMAIL_PASSWORD)
        logging.info(" -> Connected")
    except smtplib.SMTPConnectError:
        logging.error("Could not connect to the SMTP server.")
    except smtplib.SMTPAuthenticationError:
        logging.error("Failed to authenticate with given credentials.")
    except Exception as e:
        logging.error(f"Could not connect to SMTP server for generic reason: {e}")

def requeue_wait(ch, method, properties, body, reason):
    REQUEUE_DELAY_DURATIONS = [
        5 * 60000,        # 5 mins
        50 * 60000,       # 50 mins
        5*60 * 60000,     # 5 hrs
        5*60*10 * 60000,  # 50 hrs
        5*60*20 * 60000,  # 100 hrs
    ]

    # default to 0 when the headers exist but carry no x-delay yet
    current_delay = properties.headers.get("x-delay", 0) if properties.headers else 0
    try:
        index = REQUEUE_DELAY_DURATIONS.index(int(current_delay))
    except ValueError:
        index = -1

    next_index = index + 1

    if next_index >= len(REQUEUE_DELAY_DURATIONS):
        logging.warning('Max retry time hit, dropping message')
        slack_alert(f"Time over, a message was dropped ({reason})", submessage=":poop:")
        ch.basic_ack(delivery_tag=method.delivery_tag)
        return

    wait = REQUEUE_DELAY_DURATIONS[next_index]
    retry_message = f'Retry attempt {next_index + 1}/{len(REQUEUE_DELAY_DURATIONS)} will happen in {int(wait/1000)} sec'
    logging.info(retry_message)
    last_chance = ''
    if next_index + 1 == len(REQUEUE_DELAY_DURATIONS):
        last_chance = f'-- LAST ATTEMPT TO FIX: within {int(wait/1000)} sec' + f' {ONCALL_HANDLER}'
        logging.error(last_chance)
    slack_alert(f"A template is missing! ({reason})",
                submessage=retry_message + " " + last_chance
                )

    headers = {
        'reason': reason,
        'x-delay': wait,
    }
    prop = pika.BasicProperties(
        headers=headers,
        delivery_mode=pika.spec.PERSISTENT_DELIVERY_MODE,
    )
    channel.basic_publish(exchange='wait_exchange',
                          routing_key='wait',
                          body=body,
                          properties=prop)  # NOTE it completely ignores the previous properties (and it's fine)
    ch.basic_ack(delivery_tag=method.delivery_tag)

def send_email(ch, method, properties, body):
    """
    Callback for the NORMAL MESSAGE
    Output: send an email
    OR
    Output: Wait-exchange
    """
    msg = json.loads(body)

    try:
        template = tpl_environment.get_template(f"{msg['template']}.jinja2")
    except exceptions.TemplateNotFound:
        logging.error(f"Template {msg['template']}.jinja2 not found")
        # NOTE: this is a requeuable message
        requeue_wait(ch, method, properties, body, reason=f"template_not_found-{msg['template']}")
        return

    try:
        rendered = template.render(msg['parameters'], altro=msg['subject'])
    except exceptions.UndefinedError as e:
        logging.error(f"Error in rendering: some parameter is undefined (error: {e}; message: {msg})")
        # NOTE: this is a NON-requeuable message
        requeue_wait(ch, method, properties, body, reason="parameter_undefined")
        return
    except exceptions.TemplateNotFound:
        logging.error(f"A sub-template in {msg['template']}.jinja2 was not found")
        # NOTE: this is a requeuable message
        requeue_wait(ch, method, properties, body, reason=f"subtemplate_not_found-{msg['template']}")
        return

    try:
        email = EmailMessage()
        email.set_content(rendered, subtype='html')
        email['From'] = msg['from']
        email['Reply-To'] = msg['reply_to']
        email['To'] = msg['to']
        email['Subject'] = msg['subject']
        smtpObj.send_message(email)
        ch.basic_ack(delivery_tag=method.delivery_tag)
    except smtplib.SMTPServerDisconnected:
        logging.error("Server unexpectedly disconnected. Attempting to reconnect")
        connect_to_smtp()
    except smtplib.SMTPResponseException as e:
        logging.error(f"SMTP error occurred: {e.smtp_code} - {e.smtp_error}")
    except Exception as e:
        logging.error(f"An unexpected error occurred: {e}")

def process_dead_letter_messages(ch, method, properties, body):
    """
    Callback for the ERROR MESSAGE
    Output: none yet. I don't expect for messages to fall here, I keep the DLQ for safety

    @see https://stackoverflow.com/a/58500336
    "The way to do this is not to use NACK at all but to generate and return a 'new' message
    (which is simply the current message you are handling, but adding new headers to it).
    It appears that a NACK is basically doing this anyway according to the AMQP spec."
    """
    REQUEUE_DELAY_DURATIONS = [
        5 * 60000,        # 5 mins
        50 * 60000,       # 50 mins
        5*60 * 60000,     # 5 hrs
        5*60*10 * 60000,  # 50 hrs
        5*60*20 * 60000,  # 100 hrs
    ]  # TODO: why is this here again?
    wait_for = REQUEUE_DELAY_DURATIONS[-1]

    logging.error("For some reason there's the DLQ handler that was triggered!")
    slack_alert("For some reason there's the DLQ handler that was triggered!")

    headers = {
        'x-delay': wait_for,
    }
    fullheaders = {**properties.headers, **headers}
    prop = pika.BasicProperties(
        headers=fullheaders,
        delivery_mode=pika.spec.PERSISTENT_DELIVERY_MODE,
    )
    channel.basic_publish(exchange='wait_exchange',
                          routing_key='wait',
                          body=body,
                          properties=prop)

    ch.basic_ack(delivery_tag=method.delivery_tag)

def process_requeue(ch, method, properties, body):
    """
    Callback for the WAITING MESSAGES
    Output: Requeue on normal exchange (if error about missing template)
    OR
    Output: Remove (if unfixable error)
    """

    if (properties.headers["reason"] == 'parameter_undefined'):
        logging.warning('Impossible to fix error, dropping message')
        # TODO output something/notify to leave a trail for better debugging on what was missing
        ch.basic_ack(delivery_tag=method.delivery_tag)
        return

    channel.basic_publish(exchange='eml',
                          routing_key='mail',
                          body=body,
                          properties=pika.BasicProperties(
                              headers=properties.headers,  # propagation to avoid endless loop
                              delivery_mode=pika.spec.PERSISTENT_DELIVERY_MODE,
                          ))
    ch.basic_ack(delivery_tag=method.delivery_tag)

def main():
    global smtpObj
    global tpl_environment
    global env
    global channel

    # Configure logging for this app, and remove "info" of pika
    logging.basicConfig(level=logging.INFO)
    logging.getLogger('pika').setLevel(logging.WARNING)

    tpl_environment = Environment(loader=FileSystemLoader("../templates/"))
    env = os.environ.get("ENV") or 'development'

    RABBIT_HOST = 'rabbit'
    connection = pika.BlockingConnection(pika.ConnectionParameters(RABBIT_HOST))
    channel = connection.channel()

    channel.exchange_declare(exchange='eml',
                             exchange_type='direct',
                             durable=True)
    channel.queue_declare(queue='email',
                          arguments={
                              'x-dead-letter-exchange': "dead_letter_exchange",
                              'x-dead-letter-routing-key': "dead_letter_routing_key",
                              'x-death-header': True,
                          },
                          durable=True)
    channel.queue_bind(exchange='eml',
                       queue='email',
                       routing_key='mail')

    # channel.basic_qos(prefetch_count=1)  # TODO: notice that with this enabled, an error processing a message will BLOCK the others from being processed

    channel.exchange_declare(exchange="dead_letter_exchange",
                             exchange_type='direct',
                             durable=True)
    channel.queue_declare(queue='error_queue',
                          durable=True)
    channel.queue_bind(exchange='dead_letter_exchange',
                       queue='error_queue',
                       routing_key='dead_letter_routing_key')

    # NOTE: the 'x-delayed-message' exchange type is provided by RabbitMQ's delayed-message-exchange plugin
    channel.exchange_declare(exchange="wait_exchange",
                             exchange_type='x-delayed-message',
                             durable=True,
                             arguments={"x-delayed-type": "direct"}
                             )
    channel.queue_declare(queue='requeue_queue',
                          durable=True)
    channel.queue_bind(exchange='wait_exchange',
                       queue='requeue_queue',
                       routing_key='wait')

    channel.basic_consume(queue='email',
                          auto_ack=False,
                          on_message_callback=send_email)

    channel.basic_consume(queue='error_queue',
                          auto_ack=False,
                          on_message_callback=process_dead_letter_messages)

    channel.basic_consume(queue='requeue_queue',
                          auto_ack=False,
                          on_message_callback=process_requeue)

    logging.info(' [*] Connecting to smtp')
    connect_to_smtp()
    logging.info(' [*] Waiting for messages. To exit press CTRL+C')
    channel.start_consuming()

if __name__ == '__main__':
    try:
        main()
    except KeyboardInterrupt:
        logging.error('Interrupted')
        smtpObj.quit()
        try:
            sys.exit(0)
        except SystemExit:
            os._exit(0)
