Skip to content

Commit

Permalink
Logging tasks and local celery + flower (#789)
Browse files Browse the repository at this point in the history
* Logging tasks and local celery + flower

* flower

* Handler logger

* more logging

* format

* format

---------

Co-authored-by: abarolo <[email protected]>
  • Loading branch information
abarolo and abarolo authored May 10, 2024
1 parent 5d752e9 commit e3fe2ff
Show file tree
Hide file tree
Showing 9 changed files with 74 additions and 10 deletions.
3 changes: 3 additions & 0 deletions api/barrier_downloads/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ def generate_barrier_download_file(
barrier_download_id: str,
barrier_ids: List[str],
):
logger.info("Running generate_barrier_download_file() task")

import time

from django.db import connection
Expand All @@ -31,6 +33,7 @@ def generate_barrier_download_file(

@shared_task
def barrier_download_complete_notification(barrier_download_id: str):
    """Task wrapper around ``service.barrier_download_complete_notification``.

    Args:
        barrier_download_id: forwarded unchanged to the service call as the
            keyword argument of the same name.
    """
    logger.info("Running barrier_download_complete_notification() task")
    # The task holds no logic of its own; the service layer does the work.
    notify = service.barrier_download_complete_notification
    notify(barrier_download_id=barrier_download_id)
7 changes: 7 additions & 0 deletions api/barriers/signals/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,9 @@ def barrier_changed_after_published(sender, instance, **kwargs):


def related_barrier_update_embeddings(sender, instance, *args, **kwargs):
logger.info(
f"(Handler) Running related_barrier_update_embeddings() handler for {instance.pk}"
)
try:
current_barrier_object = sender.objects.get(pk=instance.pk)
except sender.DoesNotExist:
Expand All @@ -390,6 +393,10 @@ def related_barrier_update_embeddings(sender, instance, *args, **kwargs):
getattr(current_barrier_object, field) != getattr(instance, field)
for field in BARRIER_UPDATE_FIELDS
)
logger.info(
f"(Handler) Updating related barrier embeddings for {instance.pk}: {changed}"
)

if changed and not current_barrier_object.draft:
if not manager.manager:
manager.init()
Expand Down
5 changes: 3 additions & 2 deletions api/barriers/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ def auto_update_inactive_barrier_status():
Take a list of barriers with modified_on dates older than X months and X months
for each barrier, update their status to "Dormant" and "Archived" respectively
"""

logger.info("Running auto_update_inactive_barrier_status() task")
barriers_to_update = get_barriers_to_update_this_month()

for barrier in barriers_to_update["barriers_to_be_archived"]:
Expand All @@ -168,7 +168,7 @@ def send_auto_update_inactive_barrier_notification():
Get a list of barriers that will, in the next month, pass the threshold for automatic
status change to dormancy and archival. Send an email to relevant regional leads.
"""

logger.info("Running send_auto_update_inactive_barrier_notification() task")
# Get the barriers that are scheduled for auto-updating this month
barriers_to_update = get_barriers_to_update_this_month()

Expand Down Expand Up @@ -241,6 +241,7 @@ def send_barrier_inactivity_reminders():
Get list of all barriers with modified_on and activity_reminder_sent dates older than 6 months
For each barrier send a reminder notification to the barrier owner
"""
logger.info("Running send_barrier_inactivity_reminders() task")

threshold_dates = get_inactivty_threshold_dates()

Expand Down
2 changes: 2 additions & 0 deletions api/documents/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
@transaction.atomic
def delete_document(document_pk):
"""Handle document delete."""
logger.info(f"Deleting document {document_pk}")
try:
perform_delete_document(document_pk)
except Exception:
Expand All @@ -29,6 +30,7 @@ def virus_scan_document(document_pk: str):
Any errors are logged and sent to Sentry.
"""
logger.info(f"Virus scanning document {document_pk}")
with advisory_lock(f"av-scan-{document_pk}"):
document = get_document_by_pk(document_pk)
if document:
Expand Down
12 changes: 12 additions & 0 deletions api/related_barriers/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ def set_data(self, data: List[Dict]):
"""
Load data into memory.
"""
logger.info("(Related Barriers): set_data")
self.flush()
barrier_ids = [str(d["id"]) for d in data]
barrier_data = [d["barrier_corpus"] for d in data]
Expand All @@ -81,23 +82,28 @@ def set_data(self, data: List[Dict]):

@staticmethod
def flush():
    """Delete the cached embeddings and the cached barrier ids."""
    logger.info("(Related Barriers): flush cache")
    # Same deletion order as before: embeddings first, then barrier ids.
    for cache_key in (EMBEDDINGS_CACHE_KEY, BARRIER_IDS_CACHE_KEY):
        cache.delete(cache_key)

@staticmethod
def set_embeddings(embeddings):
    """Store ``embeddings`` in the cache under ``EMBEDDINGS_CACHE_KEY``."""
    logger.info("(Related Barriers): set_embeddings")
    # NOTE(review): timeout=None presumably means "never expire" (Django cache
    # semantics) - confirm `cache` is django.core.cache.
    no_expiry = None
    cache.set(EMBEDDINGS_CACHE_KEY, embeddings, timeout=no_expiry)

@staticmethod
def set_barrier_ids(barrier_ids):
    """Store ``barrier_ids`` in the cache under ``BARRIER_IDS_CACHE_KEY``.

    The log line names this method, matching the sibling accessors
    (``set_embeddings``, ``get_barrier_ids``, ...), so log output can be
    traced back to the call that produced it.
    """
    logger.info("(Related Barriers): set_barrier_ids")
    # NOTE(review): timeout=None presumably means "never expire" (Django cache
    # semantics) - confirm `cache` is django.core.cache.
    cache.set(BARRIER_IDS_CACHE_KEY, barrier_ids, timeout=None)

@staticmethod
def get_embeddings():
    """Return the value cached under ``EMBEDDINGS_CACHE_KEY``.

    Falls back to an empty list when the key is not in the cache.
    """
    logger.info("(Related Barriers): get_embeddings")
    cached_embeddings = cache.get(EMBEDDINGS_CACHE_KEY, [])
    return cached_embeddings

@staticmethod
def get_barrier_ids():
    """Return the value cached under ``BARRIER_IDS_CACHE_KEY``.

    Falls back to an empty list when the key is not in the cache.
    """
    logger.info("(Related Barriers): get_barrier_ids")
    cached_ids = cache.get(BARRIER_IDS_CACHE_KEY, [])
    return cached_ids

@property
Expand All @@ -106,6 +112,7 @@ def model(self):

@timing
def get_cosine_sim(self):
    """Return ``util.cos_sim`` of the cached embeddings against themselves."""
    logger.info("(Related Barriers): get_cosine_sim")
    # Fetch once and pass the same object as both arguments.
    cached = self.get_embeddings()
    return util.cos_sim(cached, cached)

Expand All @@ -117,6 +124,7 @@ def encode_barrier_corpus(self, barrier: BarrierEntry):
def add_barrier(
    self, barrier: BarrierEntry, barrier_ids: Optional[List[str]] = None
):
    """barrier_ids: optimisation flag to avoid multiple cache requests

    When ``barrier_ids`` is None the ids are fetched with
    ``self.get_barrier_ids()``; callers that already hold the list can pass
    it in to skip that lookup.
    """
    # The docstring must be the first statement in the body, otherwise
    # Python treats it as a discarded string expression and __doc__ is None.
    logger.info(f"(Related Barriers): add_barrier {barrier.id}")
    if barrier_ids is None:
        barrier_ids = self.get_barrier_ids()
Expand All @@ -131,6 +139,7 @@ def add_barrier(

@timing
def remove_barrier(self, barrier: BarrierEntry, barrier_ids=None):
logger.info(f"(Related Barriers): remove_barrier {barrier.id}")
embeddings = self.get_embeddings()
if not barrier_ids:
barrier_ids = self.get_barrier_ids()
Expand All @@ -151,6 +160,7 @@ def remove_barrier(self, barrier: BarrierEntry, barrier_ids=None):

@timing
def update_barrier(self, barrier: BarrierEntry):
logger.info(f"(Related Barriers): update_barrier {barrier.id}")
barrier_ids = manager.get_barrier_ids()
if barrier.id in barrier_ids:
self.remove_barrier(barrier, barrier_ids)
Expand All @@ -160,6 +170,7 @@ def update_barrier(self, barrier: BarrierEntry):
def get_similar_barriers(
self, barrier: BarrierEntry, similarity_threshold: float, quantity: int
):
logger.info(f"(Related Barriers): get_similar_barriers {barrier.id}")
barrier_ids = self.get_barrier_ids()

if not barrier_ids:
Expand Down Expand Up @@ -216,6 +227,7 @@ def init():

manager = RelatedBarrierManager()
# Only load data when no barrier ids are cached yet.
if not manager.get_barrier_ids():
    # Stray ")" dropped from the end of the log message.
    logger.info("(Related Barriers): Initialising")
    data = get_data()
    manager.set_data(data)

Expand Down
5 changes: 5 additions & 0 deletions api/related_barriers/views.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import logging

from django.shortcuts import get_object_or_404
from rest_framework.decorators import api_view
from rest_framework.response import Response
Expand All @@ -11,12 +13,15 @@
)
from api.related_barriers.serializers import BarrierRelatedListSerializer

logger = logging.getLogger(__name__)


@api_view(["GET"])
def related_barriers(request, pk) -> Response:
"""
Return a list of related barriers
"""
logger.info(f"Getting related barriers for {pk}")
barrier = get_object_or_404(Barrier, pk=pk)

if manager.manager is None:
Expand Down
2 changes: 2 additions & 0 deletions api/user/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ def get_saved_searches_for_notification(user):

@shared_task
def send_notification_emails():
logger.info("Running send_notification_emails() task")

User = get_user_model()
count = 0

Expand Down
8 changes: 0 additions & 8 deletions config/settings/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,6 @@

DJANGO_ENV = "local"

# Celery
# ----------------------------------------------------------------------------
# During local development all tasks will be executed synchronously,
# blocking the processes until the task returns
CELERY_TASK_ALWAYS_EAGER = True
BROKER_URL = "redis://redis:6379/2"
CELERY_RESULT_BACKEND = "redis://redis:6379/3"

LOGGING = DEFAULT_LOGGING # we don't care about comprehensive ECS logging for local

MIDDLEWARE.remove(
Expand Down
40 changes: 40 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -63,5 +63,45 @@ services:
networks:
- webnet

# Celery worker for local development: runs `celery worker` for the
# `config.celery` app at INFO log level, built from the local Dockerfile.
celery_worker:
build:
context: .
dockerfile: docker/local/Dockerfile
command: celery -A config.celery worker -l INFO
volumes:
# Mount the project directory into the container at /usr/src/app.
- .:/usr/src/app:delegated
env_file: docker-compose.env
depends_on:
- redis
- db
networks:
- webnet

# Celery beat for local development: runs `celery beat` for the
# `config.celery` app at INFO log level, built from the local Dockerfile.
celery_beat:
build:
context: .
dockerfile: docker/local/Dockerfile
command: celery -A config.celery beat -l INFO
volumes:
# Mount the project directory into the container at /usr/src/app.
- .:/usr/src/app:delegated
env_file: docker-compose.env
depends_on:
- redis
- db
networks:
- webnet

# Flower container (mher/flower image), published on host port 8777.
flower:
image: mher/flower
container_name: flower
command: celery flower
environment:
# NOTE(review): this points at redis db 1 - confirm it matches the broker
# URL the worker/beat services get from docker-compose.env, and that the
# image actually reads a variable named CELERY_BROKER.
- CELERY_BROKER=redis://redis:6379/1
- FLOWER_PORT=8777
ports:
- "8777:8777"
networks:
- webnet

networks:
webnet:

0 comments on commit e3fe2ff

Please sign in to comment.