
Basic support for broadcast messages #66

Open · wants to merge 6 commits into main

Conversation

Contributor

@cc-a cc-a commented Sep 11, 2024

Description

Provides the simplest possible implementation for broadcast messages that I could come up with. A more sophisticated version will be possible after #7 is completed. Adds:

  • A web endpoint where messages can be deposited - currently saved into a global variable but will change to user sessions (i.e. database) once we have users...
  • A script - kafka_consumer.py - that listens for messages sent by Kafka and uploads them to the web endpoint (a rough sketch of this flow follows the list).
  • Basic display of messages on the index page.
  • Some supporting configuration changes.
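
For orientation, here is a minimal sketch of what such a consumer script can look like, assuming the kafka-python and requests packages; the topic name, broker address and endpoint URL are placeholders, not the values used in this PR:

import requests
from kafka import KafkaConsumer

# Subscribe to the broker's control topic (placeholder names).
consumer = KafkaConsumer(
    "control.example.process_manager",
    bootstrap_servers="localhost:9092",
)

# Relay each message body to the web endpoint for display.
for record in consumer:
    requests.post(
        "http://localhost:8000/message/",
        data={"message": record.value.decode("utf-8")},
    )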

Working with Kafka is a bit fiddly so let me know how you get on with the instructions added in the README.

No tests yet, as I don't think they're useful for something so hacky.

I'm not aiming to have this merged by the end of the day so take your time.

Fixes #11 and #10

Type of change

  • Documentation (non-breaking change that adds or improves the documentation)
  • New feature (non-breaking change which adds functionality)
  • Optimization (non-breaking, back-end change that speeds up the code)
  • Bug fix (non-breaking change which fixes an issue)
  • Breaking change (whatever its nature)

Key checklist

  • All tests pass (e.g. python -m pytest)
  • The documentation builds and looks OK (e.g. python -m sphinx -b html docs docs/build)
  • Pre-commit hooks run successfully (e.g. pre-commit run --all-files)

Further checks

  • Code is commented, particularly in hard-to-understand areas
  • Tests added or an issue has been opened to tackle that in the future. (Indicate issue here: # (issue))

@cc-a cc-a linked an issue Sep 16, 2024 that may be closed by this pull request
Contributor

@AdrianDAlessandro AdrianDAlessandro left a comment


I'm happy that everything looks sensible. But I was not able to run it:

$ poetry run drunc-unified-shell --log-level debug ./data/process-manager-pocket-kafka.json
[17:21:19] DEBUG    "unified_shell": Parent's PID: 36084 | This PID: 66041                                                                                                                                                 shell.py:41
Starting process manager with configuration file://./data/process-manager-pocket-kafka.json
Process Process-1:
Traceback (most recent call last):
  File "/Users/adalessa/.pyenv/versions/3.11.1/lib/python3.11/multiprocessing/process.py", line 314, in _bootstrap
    self.run()
  File "/Users/adalessa/.pyenv/versions/3.11.1/lib/python3.11/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/Users/adalessa/Projects/QwikApp/QwikApp_web/.venv/lib/python3.11/site-packages/drunc/process_manager/interface/process_manager.py", line 15, in run_pm
    parent_death_pact() # If the parent dies (for example unified shell), we die too
    ^^^^^^^^^^^^^^^^^^^
  File "/Users/adalessa/Projects/QwikApp/QwikApp_web/.venv/lib/python3.11/site-packages/drunc/utils/utils.py", line 183, in parent_death_pact
    assert sys.platform == 'linux', \
           ^^^^^^^^^^^^^^^^^^^^^^^
AssertionError: this fn only works on Linux right now

dune_processes/settings/settings.py (resolved)
main/templates/main/index.html (resolved)
# extreme hackiness suitable only for demonstration purposes
# will replace this with per-user session storage - once we've added auth
MESSAGES: list[str] = []
"""Broadcast messages to display to the user."""
Contributor


Commenting as a reminder to open this as an issue to address later

Collaborator


Add a TODO label

A NO_CONTENT response.
"""
MESSAGES.append(request.POST["message"])
return HttpResponse(status=HTTPStatus.NO_CONTENT)
Contributor


I see how this is hacky. Is the plan to add a messages field to the user model?

Also, is there a desire to make this a REST API using the rest_framework?

Collaborator


Why would a REST API endpoint be best here?

Contributor Author


Whilst we could store messages under the user (or another model), I think the ephemeral nature of the data makes storage in user sessions a better fit - https://docs.djangoproject.com/en/5.1/topics/http/sessions/. Ultimately this does store data in the database, but it provides a neat way of making sure we only collect messages when a user needs to see them.

No desire to use rest_framework as it's a bit of a sledgehammer. After thinking about it more, I don't think we actually need an endpoint: if we write the Kafka consumer as a custom django-admin command, it can populate data into sessions directly (see the sketch below).
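
For illustration, a rough sketch of that idea, assuming the database session backend; the "messages" session key and the example message are invented for the sketch, not taken from this PR:

from django.contrib.sessions.backends.db import SessionStore
from django.contrib.sessions.models import Session
from django.core.management.base import BaseCommand


class Command(BaseCommand):
    help = "Push a broadcast message into every active session."

    def handle(self, *args, **options):
        message = "example broadcast"  # in practice, read from the Kafka consumer
        for session in Session.objects.all():
            # Decode the stored session, append the message and re-encode.
            data = session.get_decoded()
            data.setdefault("messages", []).append(message)
            session.session_data = SessionStore().encode(data)
            session.save()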

Contributor


Why is NO_CONTENT returned here? Is it the same in general for a successful POST?

data/process-manager-pocket-kafka.json (resolved)
scripts/kafka_consumer.py (resolved)
Collaborator

@dalonsoa dalonsoa left a comment


OK, so I managed to get it working in a VM with Ubuntu and Python 3.11.

All good, but I had to invert steps 1 and 2 for it to work: drunc refused to work with that configuration unless Kafka was up and running first.

I get a lot of scary-looking error messages in drunc as well. See for example the following (but more nicely formatted). Not sure if that is normal:

[11:07:25] INFO    "kafka.coordinator": Elected group leader -- performing partition assignments using range (base.py:521)
           INFO    "kafka.coordinator": Successfully joined group drunc-stdout-broadcasthandler-dalonsoa-2024-09-18-10-56-22-unssf with generation 3 (base.py:335)
           INFO    "kafka.consumer.subscription_state": Updated partition assignment: [TopicPartition(topic='control.dalonsoa.process_manager', partition=0)] (subscription_state.py:257)
           INFO    "kafka.coordinator.consumer": Setting newly assigned partitions {TopicPartition(topic='control.dalonsoa.process_manager', partition=0)} for group drunc-stdout-broadcasthandler-dalonsoa-2024-09-18-10-56-22-unssf (consumer.py:245)
[11:08:14] WARNING "kafka.coordinator": Heartbeat session expired, marking coordinator dead (base.py:985)
           WARNING "kafka.coordinator": Marking the coordinator dead (node coordinator-0) for group drunc-stdout-broadcasthandler-dalonsoa-2024-09-18-10-56-22-unssf: Heartbeat session expired. (base.py:715)
           INFO    "kafka.cluster": Group coordinator for drunc-stdout-broadcasthandler-dalonsoa-2024-09-18-10-56-22-unssf is BrokerMetadata(nodeId='coordinator-0', host='192.168.1.22', port=30092, rack=None) (cluster.py:371)
           INFO    "kafka.coordinator": Discovered coordinator coordinator-0 for group drunc-stdout-broadcasthandler-dalonsoa-2024-09-18-10-56-22-unssf (base.py:693)
           WARNING "kafka.coordinator.consumer": Auto offset commit failed for group drunc-stdout-broadcasthandler-dalonsoa-2024-09-18-10-56-22-unssf: CommitFailedError: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max_poll_interval_ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the rebalance timeout with max_poll_interval_ms, or by reducing the maximum size of batches returned in poll() with max_poll_records. (consumer.py:794)
           ERROR   "kafka.coordinator.consumer": Offset commit failed: This is likely to cause duplicate message delivery (consumer.py:539)
           Traceback (most recent call last):
             File "/home/dalonsoa/.cache/pypoetry/virtualenvs/dune-processes-gWVRMber-py3.11/lib/python3.11/site-packages/kafka/coordinator/consumer.py", line 528, in _maybe_auto_commit_offsets_sync
               self.commit_offsets_sync(self._subscription.all_consumed_offsets())
             File "/home/dalonsoa/.cache/pypoetry/virtualenvs/dune-processes-gWVRMber-py3.11/lib/python3.11/site-packages/kafka/coordinator/consumer.py", line 521, in commit_offsets_sync
               raise future.exception # pylint: disable-msg=raising-bad-type
               ^^^^^^^^^^^^^^^^^^^^^^
           kafka.errors.CommitFailedError: CommitFailedError: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max_poll_interval_ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the rebalance timeout with max_poll_interval_ms, or by reducing the maximum size of batches returned in poll() with max_poll_records.
           INFO    "kafka.coordinator.consumer": Revoking previously assigned partitions {TopicPartition(topic='control.dalonsoa.process_manager', partition=0)} for group drunc-stdout-broadcasthandler-dalonsoa-2024-09-18-10-56-22-unssf (consumer.py:348)
           INFO    "kafka.coordinator": (Re-)joining group drunc-stdout-broadcasthandler-dalonsoa-2024-09-18-10-56-22-unssf (base.py:450)
[11:08:18] INFO    "kafka.coordinator": Elected group leader -- performing partition assignments using range (base.py:521)
           INFO    "kafka.coordinator": Successfully joined group drunc-stdout-broadcasthandler-dalonsoa-2024-09-18-10-56-22-unssf with generation 5 (base.py:335)
           INFO    "kafka.consumer.subscription_state": Updated partition assignment: [TopicPartition(topic='control.dalonsoa.process_manager', partition=0)] (subscription_state.py:257)
           INFO    "kafka.coordinator.consumer": Setting newly assigned partitions {TopicPartition(topic='control.dalonsoa.process_manager', partition=0)} for group drunc-stdout-broadcasthandler-dalonsoa-2024-09-18-10-56-22-unssf (consumer.py:245)
[11:10:31] WARNING "kafka.coordinator": Heartbeat session expired, marking coordinator dead (base.py:985)
           WARNING "kafka.coordinator": Marking the coordinator dead (node coordinator-0) for group drunc-stdout-broadcasthandler-dalonsoa-2024-09-18-10-56-22-unssf: Heartbeat session expired. (base.py:715)
[11:10:34] INFO    "kafka.cluster": Group coordinator for drunc-stdout-broadcasthandler-dalonsoa-2024-09-18-10-56-22-unssf is BrokerMetadata(nodeId='coordinator-0', host='192.168.1.22', port=30092, rack=None) (cluster.py:371)
           INFO    "kafka.coordinator": Discovered coordinator coordinator-0 for group drunc-stdout-broadcasthandler-dalonsoa-2024-09-18-10-56-22-unssf (base.py:693)

README.md (outdated, resolved)
data/process-manager-pocket-kafka.json (resolved)
docker-compose.yml (resolved)
# extreme hackiness suitable only for demonstration purposes
# will replace this with per-user session storage - once we've added auth
MESSAGES: list[str] = []
"""Broadcast messages to display to the user."""
Collaborator


Add a TODO label

Comment on lines +217 to +218
@require_POST
@csrf_exempt
Collaborator


What do these two do?

Contributor Author


require_POST makes it so that the view only accepts POST requests. csrf_exempt removes Django's cross-site request forgery protection, which isn't relevant here.
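
Pieced together from the fragments quoted in this review, the decorated view looks roughly like this; the view name is not visible in the diff, so the one used here is illustrative:

from http import HTTPStatus

from django.http import HttpRequest, HttpResponse
from django.views.decorators.csrf import csrf_exempt
from django.views.decorators.http import require_POST


@require_POST  # anything other than POST gets a 405 Method Not Allowed
@csrf_exempt  # skip CSRF checks; the poster is a script, not a browser form
def message(request: HttpRequest) -> HttpResponse:
    """Store a posted broadcast message and return a NO_CONTENT response."""
    # MESSAGES is the module-level list shown earlier in this review.
    MESSAGES.append(request.POST["message"])
    return HttpResponse(status=HTTPStatus.NO_CONTENT)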

A NO_CONTENT response.
"""
MESSAGES.append(request.POST["message"])
return HttpResponse(status=HTTPStatus.NO_CONTENT)
Collaborator


Why would a REST API endpoint be best here?

Collaborator


So, is this all there is from the point of view of the consumer, or is it just a minimal version to get going? Should this be running as some sort of daemon rather than as a script?

Contributor Author


As mentioned in a previous comment, I suggest we turn this into a manage.py command. It could probably also use a pass for robustness.

@cc-a
Contributor Author

cc-a commented Sep 18, 2024

OK, so I managed to get it working in a VM with Ubuntu and Python 3.11.

All good, but I had to invert steps 1 and 2 for it to work: drunc refused to work with that configuration unless Kafka was up and running first.

I get a lot of scary-looking error messages in drunc as well. See for example the following (but more nicely formatted). Not sure if that is normal:

Well done for persevering. So long as it worked, I wouldn't worry about the messages.

Contributor

@jamesturner246 jamesturner246 left a comment


This all makes sense once we get Kafka running. LGTM 👍

My only minor comment is about default values for environment variables. But then I know nothing about best practice in web apps. Cheers for the help earlier.

dune_processes/settings/settings.py (resolved)
A NO_CONTENT response.
"""
MESSAGES.append(request.POST["message"])
return HttpResponse(status=HTTPStatus.NO_CONTENT)
Contributor


Why is NO_CONTENT returned here? Is it the same in general for a successful POST?

@jamesturner246
Contributor

I did have one extra question actually: what's the difference between Docker Compose and Kubernetes, and when/why would you prefer one over the other?
