Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Basic support for broadcast messages #66

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,3 +53,25 @@ docker compose exec app python scripts/talk_to_process_manager.py
```

Take the servers down with `docker compose down`

### Working with Kafka

Due to the complexities of containerising Kafka it is not possible to use the standard
Docker Compose setup. Instead when working with functionality that requires Kafka it is
necessary to run the individual components manually.

1. Start Kafka - See [Running drunc with pocket kafka].

1. Start the drunc shell:
`poetry run drunc-unified-shell --log-level debug ./data/process-manager-pocket-kafka.json`

1. Start the application server:
`poetry run python manage.py runserver`

1. Start the Kafka consumer:
`poetry run python scripts/kafka_consumer.py`

From here you should be able to see broadcast messages displayed at the top of the index
page on every refresh.

[Running drunc with pocket kafka]: https://github.com/DUNE-DAQ/drunc/wiki/Running-drunc-with-pocket-kafka
5 changes: 5 additions & 0 deletions data/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Data files

## process-manager-pocket-kafka.json

Process manager configuration file for use in local development with Kafka.
13 changes: 13 additions & 0 deletions data/process-manager-pocket-kafka.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
{
"type": "ssh",
"name": "SSHProcessManager",
"command_address": "0.0.0.0:10054",
"authoriser": {
"type": "dummy"
},
"broadcaster": {
"type": "kafka",
"kafka_address": "127.0.0.1:30092",
cc-a marked this conversation as resolved.
Show resolved Hide resolved
"publish_timeout": 2
}
}
2 changes: 2 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ services:
volumes:
- .:/usr/src/app
- db:/usr/src/app/db
environment:
- PROCESS_MANAGER_URL=drunc:10054
cc-a marked this conversation as resolved.
Show resolved Hide resolved
drunc:
build: ./drunc_docker_service/
command:
Expand Down
3 changes: 3 additions & 0 deletions dune_processes/settings/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
https://docs.djangoproject.com/en/5.1/ref/settings/
"""

import os
from pathlib import Path

# Build paths inside the project like this: BASE_DIR / 'subdir'.
Expand Down Expand Up @@ -133,3 +134,5 @@

INSTALLED_APPS += ["django_bootstrap5"]
DJANGO_TABLES2_TEMPLATE = "django_tables2/bootstrap5.html"

PROCESS_MANAGER_URL = os.getenv("PROCESS_MANAGER_URL", "localhost:10054")
cc-a marked this conversation as resolved.
Show resolved Hide resolved
12 changes: 12 additions & 0 deletions main/templates/main/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,21 @@
{% block title %}Home{% endblock title %}

{% block content %}

<div class="col">
{% if messages %}
<h2>Messages</h2>
<div style="white-space: pre-wrap;">
cc-a marked this conversation as resolved.
Show resolved Hide resolved
{% for message in messages %}
{{ message }}
{% endfor %}
</div>
<hr class="solid">
{% endif %}

<a href="{% url 'main:boot_process' %}" class="btn btn-primary">Boot</a>
{% render_table table %}
</div>


{% endblock content %}
1 change: 1 addition & 0 deletions main/urls.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,5 @@
path("flush/<uuid:uuid>", views.flush_process, name="flush"),
path("logs/<uuid:uuid>", views.logs, name="logs"),
path("boot_process/", views.BootProcessView.as_view(), name="boot_process"),
path("message/", views.deposit_message, name="message"),
]
32 changes: 30 additions & 2 deletions main/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,15 @@
import asyncio
import uuid
from enum import Enum
from http import HTTPStatus

import django_tables2
from django.conf import settings
from django.http import HttpRequest, HttpResponse, HttpResponseRedirect
from django.shortcuts import render
from django.urls import reverse, reverse_lazy
from django.views.decorators.csrf import csrf_exempt
from django.views.decorators.http import require_POST
from django.views.generic.edit import FormView
from drunc.process_manager.process_manager_driver import ProcessManagerDriver
from drunc.utils.shell_utils import DecodedResponse, create_dummy_token_from_uname
Expand All @@ -22,11 +26,18 @@
from .forms import BootProcessForm
from .tables import ProcessTable

# extreme hackiness suitable only for demonstration purposes
# TODO: replace this with per-user session storage - once we've added auth
MESSAGES: list[str] = []
"""Broadcast messages to display to the user."""
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Commenting as a reminder to open this as an issue to address later

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a TODO label



def get_process_manager_driver() -> ProcessManagerDriver:
"""Get a ProcessManagerDriver instance."""
token = create_dummy_token_from_uname()
return ProcessManagerDriver("drunc:10054", token=token, aio_channel=True)
return ProcessManagerDriver(
settings.PROCESS_MANAGER_URL, token=token, aio_channel=True
)


async def get_session_info() -> ProcessInstanceList:
Expand Down Expand Up @@ -63,8 +74,10 @@ def index(request: HttpRequest) -> HttpResponse:
table_configurator = django_tables2.RequestConfig(request)
table_configurator.configure(table)

context = {"table": table}
global MESSAGES
MESSAGES, messages = [], MESSAGES

context = {"table": table, "messages": messages}
return render(request=request, context=context, template_name="main/index.html")


Expand Down Expand Up @@ -199,3 +212,18 @@ def form_valid(self, form: BootProcessForm) -> HttpResponse:
"""
asyncio.run(_boot_process("root", form.cleaned_data))
return super().form_valid(form)


@require_POST
@csrf_exempt
Comment on lines +217 to +218
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What these two do?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

require_POST makes it so that the view only accepts POST requests. csrf_exempt removes django's cross site request forgery protection which isn't relevant here.

def deposit_message(request: HttpRequest) -> HttpResponse:
"""Upload point for broadcast messages for display to end user.

Args:
request: the triggering request.

Returns:
A NO_CONTENT response.
"""
MESSAGES.append(request.POST["message"])
return HttpResponse(status=HTTPStatus.NO_CONTENT)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see how this is hacky. Is the plan to add a messages field to the user model?

Also, is there a desire to make this a REST API using the rest_framework?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why would a REST API endpoint be best here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Whilst we could store messages under the user (or other model) I think the ephemeral nature of the data makes storage in user sessions a better fit - https://docs.djangoproject.com/en/5.1/topics/http/sessions/. Ultimately this does store data in the database but it provides a need way of making sure we only collect messages when a user needs to see them.

No desire to use rest_framework as it's a bit of a sledge hammer. After thinking about it more I don't think we actually need an endpoint if we write the Kafka consumer as a custom django-admin command it can populate data into sessions directly.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is NO_CONTENT returned here? Is it the same in general for a successful POST?

2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ module = "tests.*"
disallow_untyped_defs = false

[[tool.mypy.overrides]]
module = ["druncschema.*", "drunc.*", "django_tables2.*"]
module = ["druncschema.*", "drunc.*", "django_tables2.*", "kafka.*"]
ignore_missing_imports = true

[tool.django-stubs]
Expand Down
33 changes: 33 additions & 0 deletions scripts/kafka_consumer.py
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, is this all it is from the point of view of the consumer or it is just a minimal version to get going? Should this be running as some sort of daemon rather than a script?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As mentioned in a previous comment I suggest we turn this into a manage.py command. Could probably use a pass for robustness.

Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
"""Example client that consumes messages from Kafka and sends them to the web app."""

import os
from urllib.parse import urlencode
from urllib.request import Request, urlopen

from druncschema.broadcast_pb2 import BroadcastMessage
from kafka import KafkaConsumer

KAFKA_URL = os.getenv("KAFKA_URL", "127.0.0.1:30092")
SERVER_URL = os.getenv("SERVER_URL", "http://localhost:8000")


def main() -> None:
"""Listen for Kafka messages and process them indefinitely."""
consumer = KafkaConsumer(bootstrap_servers=[KAFKA_URL])
consumer.subscribe(pattern="control.*.process_manager")

print("Listening for messages from Kafka.")
while True:
for messages in consumer.poll(timeout_ms=500).values():
for message in messages:
print(f"Message received: {message}")
bm = BroadcastMessage()
bm.ParseFromString(message.value)
cc-a marked this conversation as resolved.
Show resolved Hide resolved

data = urlencode(dict(message=bm.data.value))
request = Request(f"{SERVER_URL}/message/", data=data.encode())
urlopen(request)


if __name__ == "__main__":
main()