From 51cd0cc5aa2a64cec5ab625590b9a621bdcc5fa6 Mon Sep 17 00:00:00 2001 From: Christopher Cave-Ayland Date: Tue, 10 Sep 2024 18:22:07 +0100 Subject: [PATCH 1/6] Add ability to upload and display broadcast messages to users --- main/templates/main/index.html | 12 ++++++++++++ main/urls.py | 1 + main/views.py | 27 ++++++++++++++++++++++++++- pyproject.toml | 2 +- scripts/kafka_consumer.py | 33 +++++++++++++++++++++++++++++++++ 5 files changed, 73 insertions(+), 2 deletions(-) create mode 100644 scripts/kafka_consumer.py diff --git a/main/templates/main/index.html b/main/templates/main/index.html index 1ec7155..cc4d60b 100644 --- a/main/templates/main/index.html +++ b/main/templates/main/index.html @@ -4,9 +4,21 @@ {% block title %}Home{% endblock title %} {% block content %} +
+ {% if messages %} +

Messages

+
+ {% for message in messages %} + {{ message }} + {% endfor %} +
+
+ {% endif %} + Boot {% render_table table %}
+ {% endblock content %} diff --git a/main/urls.py b/main/urls.py index 2703ba6..0833051 100644 --- a/main/urls.py +++ b/main/urls.py @@ -12,4 +12,5 @@ path("flush/", views.flush_process, name="flush"), path("logs/", views.logs, name="logs"), path("boot_process/", views.BootProcessView.as_view(), name="boot_process"), + path("message/", views.deposit_message, name="message"), ] diff --git a/main/views.py b/main/views.py index 3f9c7ae..176e0e9 100644 --- a/main/views.py +++ b/main/views.py @@ -3,11 +3,14 @@ import asyncio import uuid from enum import Enum +from http import HTTPStatus import django_tables2 from django.http import HttpRequest, HttpResponse, HttpResponseRedirect from django.shortcuts import render from django.urls import reverse, reverse_lazy +from django.views.decorators.csrf import csrf_exempt +from django.views.decorators.http import require_POST from django.views.generic.edit import FormView from drunc.process_manager.process_manager_driver import ProcessManagerDriver from drunc.utils.shell_utils import DecodedResponse, create_dummy_token_from_uname @@ -22,6 +25,11 @@ from .forms import BootProcessForm from .tables import ProcessTable +# extreme hackiness suitable only for demonstration purposes +# will replace this with per-user session storage - once we've added auth +MESSAGES: list[str] = [] +"""Broadcast messages to display to the user.""" + def get_process_manager_driver() -> ProcessManagerDriver: """Get a ProcessManagerDriver instance.""" @@ -63,8 +71,10 @@ def index(request: HttpRequest) -> HttpResponse: table_configurator = django_tables2.RequestConfig(request) table_configurator.configure(table) - context = {"table": table} + global MESSAGES + MESSAGES, messages = [], MESSAGES + context = {"table": table, "messages": messages} return render(request=request, context=context, template_name="main/index.html") @@ -199,3 +209,18 @@ def form_valid(self, form: BootProcessForm) -> HttpResponse: """ asyncio.run(_boot_process("root", form.cleaned_data)) return super().form_valid(form) + + +@require_POST +@csrf_exempt +def deposit_message(request: HttpRequest) -> HttpResponse: + """Upload point for broadcast messages for display to end user. + + Args: + request: the triggering request. + + Returns: + A NO_CONTENT response. + """ + MESSAGES.append(request.POST["message"]) + return HttpResponse(status=HTTPStatus.NO_CONTENT) diff --git a/pyproject.toml b/pyproject.toml index bdea211..1c7ea79 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -45,7 +45,7 @@ module = "tests.*" disallow_untyped_defs = false [[tool.mypy.overrides]] -module = ["druncschema.*", "drunc.*", "django_tables2.*"] +module = ["druncschema.*", "drunc.*", "django_tables2.*", "kafka.*"] ignore_missing_imports = true [tool.django-stubs] diff --git a/scripts/kafka_consumer.py b/scripts/kafka_consumer.py new file mode 100644 index 0000000..50816e1 --- /dev/null +++ b/scripts/kafka_consumer.py @@ -0,0 +1,33 @@ +"""Example client that consumes messages from Kafka and sends them to the web app.""" + +import os +from urllib.parse import urlencode +from urllib.request import Request, urlopen + +from druncschema.broadcast_pb2 import BroadcastMessage +from kafka import KafkaConsumer + +KAFKA_URL = os.getenv("KAFKA_URL", "127.0.0.1:30092") +SERVER_URL = os.getenv("SERVER_URL", "http://localhost:8000") + + +def main() -> None: + """Listen for Kafka messages and process them indefinitely.""" + consumer = KafkaConsumer(bootstrap_servers=[KAFKA_URL]) + consumer.subscribe(pattern="control.*.process_manager") + + print("Listening for messages from Kafka.") + while True: + for messages in consumer.poll(timeout_ms=500).values(): + for message in messages: + print(f"Message received: {message}") + bm = BroadcastMessage() + bm.ParseFromString(message.value) + + data = urlencode(dict(message=bm.data.value)) + request = Request(f"{SERVER_URL}/message/", data=data.encode()) + urlopen(request) + + +if __name__ == "__main__": + main() From 2913cb421af13b747699aee3ad3abfe120ca7ce4 Mon Sep 17 00:00:00 2001 From: Christopher Cave-Ayland Date: Wed, 11 Sep 2024 14:16:19 +0100 Subject: [PATCH 2/6] Make server locations configurable --- docker-compose.yml | 2 ++ dune_processes/settings/settings.py | 3 +++ main/views.py | 5 ++++- 3 files changed, 9 insertions(+), 1 deletion(-) diff --git a/docker-compose.yml b/docker-compose.yml index a830ff0..9e1fc82 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -12,6 +12,8 @@ services: volumes: - .:/usr/src/app - db:/usr/src/app/db + environment: + - PROCESS_MANAGER_URL=drunc:10054 drunc: build: ./drunc_docker_service/ command: diff --git a/dune_processes/settings/settings.py b/dune_processes/settings/settings.py index 98764f5..803bdc4 100644 --- a/dune_processes/settings/settings.py +++ b/dune_processes/settings/settings.py @@ -9,6 +9,7 @@ https://docs.djangoproject.com/en/5.1/ref/settings/ """ +import os from pathlib import Path # Build paths inside the project like this: BASE_DIR / 'subdir'. @@ -133,3 +134,5 @@ INSTALLED_APPS += ["django_bootstrap5"] DJANGO_TABLES2_TEMPLATE = "django_tables2/bootstrap5.html" + +PROCESS_MANAGER_URL = os.getenv("PROCESS_MANAGER_URL", "localhost:10054") diff --git a/main/views.py b/main/views.py index 176e0e9..273e6d1 100644 --- a/main/views.py +++ b/main/views.py @@ -6,6 +6,7 @@ from http import HTTPStatus import django_tables2 +from django.conf import settings from django.http import HttpRequest, HttpResponse, HttpResponseRedirect from django.shortcuts import render from django.urls import reverse, reverse_lazy @@ -34,7 +35,9 @@ def get_process_manager_driver() -> ProcessManagerDriver: """Get a ProcessManagerDriver instance.""" token = create_dummy_token_from_uname() - return ProcessManagerDriver("drunc:10054", token=token, aio_channel=True) + return ProcessManagerDriver( + settings.PROCESS_MANAGER_URL, token=token, aio_channel=True + ) async def get_session_info() -> ProcessInstanceList: From 8437dbaf1b73f7fdd3279d9cbecba90a713178f5 Mon Sep 17 00:00:00 2001 From: Christopher Cave-Ayland Date: Wed, 11 Sep 2024 14:38:59 +0100 Subject: [PATCH 3/6] Add instructions for developing with Kafka --- README.md | 22 ++++++++++++++++++++++ data/process-manager-pocket-kafka.json | 13 +++++++++++++ 2 files changed, 35 insertions(+) create mode 100644 data/process-manager-pocket-kafka.json diff --git a/README.md b/README.md index 0c4d26a..75e8398 100644 --- a/README.md +++ b/README.md @@ -53,3 +53,25 @@ docker compose exec app python scripts/talk_to_process_manager.py ``` Take the servers down with `docker compose down` + +### Working with Kafka + +Due to the complexities of containerising Kafka it is not possible to use the standard +Docker Compose setup. Instead when working with functionality that requires Kafka it is +necessary to run the individual components manually. + +1. Start the drunc shell: + `poetry run drunc-unified-shell --log-level debug ./data/process-manager-pocket-kafka.json` + +1. Start Kafka - See [Running drunc with pocket kafka]. + +1. Start the application server: + `poetry run python manage.py runserver` + +1. Start the Kafka consumer: + `poetry run python scripts/kafka_consumer.py` + +From here you should be able to see broadcast messages displayed at the top of the index +page on every refresh. + +[Running drunc with pocket kafka]: https://github.com/DUNE-DAQ/drunc/wiki/Running-drunc-with-pocket-kafka diff --git a/data/process-manager-pocket-kafka.json b/data/process-manager-pocket-kafka.json new file mode 100644 index 0000000..5432175 --- /dev/null +++ b/data/process-manager-pocket-kafka.json @@ -0,0 +1,13 @@ +{ + "type": "ssh", + "name": "SSHProcessManager", + "command_address": "0.0.0.0:10054", + "authoriser": { + "type": "dummy" + }, + "broadcaster": { + "type": "kafka", + "kafka_address": "127.0.0.1:30092", + "publish_timeout": 2 + } +} From c0d66ab2d02f7d19f17e20c36cf66ee52c520331 Mon Sep 17 00:00:00 2001 From: Christopher Cave-Ayland Date: Wed, 18 Sep 2024 16:54:18 +0100 Subject: [PATCH 4/6] Add todo for message storage global --- main/views.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/main/views.py b/main/views.py index 273e6d1..13e62b6 100644 --- a/main/views.py +++ b/main/views.py @@ -27,7 +27,7 @@ from .tables import ProcessTable # extreme hackiness suitable only for demonstration purposes -# will replace this with per-user session storage - once we've added auth +# TODO: replace this with per-user session storage - once we've added auth MESSAGES: list[str] = [] """Broadcast messages to display to the user.""" From 8987b95d9fb1a8341d89410880141c131f35e968 Mon Sep 17 00:00:00 2001 From: Christopher Cave-Ayland Date: Wed, 18 Sep 2024 17:07:22 +0100 Subject: [PATCH 5/6] Add readme to data directory --- data/README.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 data/README.md diff --git a/data/README.md b/data/README.md new file mode 100644 index 0000000..4c59e5b --- /dev/null +++ b/data/README.md @@ -0,0 +1,5 @@ +# Data files + +## process-manager-pocket-kafka.json + +Process manager configuration file for use in local development with Kafka. From 10f05e901b1bf0f247e55a48d263483a44730d4c Mon Sep 17 00:00:00 2001 From: Christopher Cave-Ayland Date: Wed, 18 Sep 2024 17:10:36 +0100 Subject: [PATCH 6/6] Update instructions to start kafka first --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 75e8398..53834e4 100644 --- a/README.md +++ b/README.md @@ -60,11 +60,11 @@ Due to the complexities of containerising Kafka it is not possible to use the st Docker Compose setup. Instead when working with functionality that requires Kafka it is necessary to run the individual components manually. +1. Start Kafka - See [Running drunc with pocket kafka]. + 1. Start the drunc shell: `poetry run drunc-unified-shell --log-level debug ./data/process-manager-pocket-kafka.json` -1. Start Kafka - See [Running drunc with pocket kafka]. - 1. Start the application server: `poetry run python manage.py runserver`