Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Decouple broker and coordinator interface #6675

Draft
wants to merge 31 commits into
base: main
Choose a base branch
from

Conversation

unkcpz
Copy link
Member

@unkcpz unkcpz commented Dec 20, 2024

  • all FIXME notes resolved
  • MockCoordinator -> InMemoryCoordinator and an exmaple of submit without rmq.
  • runner with dedicated thread (need more experiments)
  • Which change in plumpy/rmq-out resolve the test_disconnect timeout?
  • BroadcastFilter absorbed into add_broadcast_subscriber API.
  • broker and coordinator are duplicate abstraction, remove broker this should be done after, in this change, it bring coordinator layer first to not break the broker abstraction introduced (which is not real abstraction and relatively useless).
  • Move all kiwipy/rmq modules into broker/rabbitmq and make aiida-core not directly (but through broker/coordinator interface) dep on it.

@unkcpz unkcpz requested a review from agoscinski as a code owner December 20, 2024 15:46
@unkcpz unkcpz marked this pull request as draft December 20, 2024 15:46
@unkcpz unkcpz force-pushed the rmq-out-small-step-try branch 3 times, most recently from cff69d3 to b1f446a Compare December 20, 2024 17:42
@unkcpz unkcpz force-pushed the rmq-out-small-step-try branch from afc0925 to 28cdb1c Compare December 21, 2024 02:08
@unkcpz unkcpz force-pushed the rmq-out-small-step-try branch 3 times, most recently from e613a3b to 5d59e6a Compare December 21, 2024 11:09
@unkcpz unkcpz force-pushed the rmq-out-small-step-try branch from c6c6352 to c769906 Compare December 21, 2024 13:52
@unkcpz unkcpz force-pushed the rmq-out-small-step-try branch from abaf3e9 to 03f7a5b Compare December 27, 2024 00:36
@unkcpz unkcpz force-pushed the rmq-out-small-step-try branch from ca1dd09 to 02a939e Compare December 27, 2024 01:39
@unkcpz unkcpz force-pushed the rmq-out-small-step-try branch from fe92318 to 329c51c Compare December 27, 2024 13:29
unkcpz and others added 3 commits December 27, 2024 14:30
When calling add_rpc_subscriber and add_task_subscriber, the event loop
of caller may from random event loop. But the target event loop is the
runner one. Therefore it requires to pass the loop to the broker when
creating the runner and runner's broker.
Instead of as a method of runner which is not needed and confuse.
@unkcpz unkcpz force-pushed the rmq-out-small-step-try branch from 9583218 to 69db549 Compare December 28, 2024 00:35
@unkcpz unkcpz force-pushed the rmq-out-small-step-try branch from a08471c to 5746ae8 Compare December 29, 2024 00:44
@unkcpz unkcpz force-pushed the rmq-out-small-step-try branch from 5572660 to d62816e Compare December 29, 2024 22:59
@unkcpz unkcpz force-pushed the rmq-out-small-step-try branch from ccf4af2 to d62816e Compare December 30, 2024 00:15
@unkcpz unkcpz force-pushed the rmq-out-small-step-try branch from bf35126 to f2ec982 Compare January 10, 2025 20:28
unkcpz added a commit to unkcpz/plumpy that referenced this pull request Jan 17, 2025
The refactoring is targeting to decouple the dependencies of using kiwipy+rmq as the communicator for the process control. 
By forming a `Coordinator` protocol contract, the different type of rmq/kiwipy related codes are removed out from plumpy logic. The new contract also pave the way to make it clearly show how a new type coordinator can be implemented (future examples will be the `tatzelwurm` a task broker that has scheduler support and file based task broker require no background service). 
For the prototype of how a coordinator should look like, the `MockCoordinator` in `tests/utils` is the coordinator that store things in memory, and can serve as the lightweight ephemeral daemon without persistent functionality.

Another major change here is hand write the resolver of future by mimic how tho asyncio does for wrapping `concurrent.futures.Future` into `asyncio.Future`. I use the same way to convert `asyncio.Future` into `concurent.futures.Future` (which is the `kiwipy.Future` as alias). 

- move the `aio_pika` import lazily by moving the rmq exceptions to `rmq` module, this can increase the performance of `import aiida; aiida.orm`.
- ~~`CancellableAction` using composite to behave as a Future like object.~~
- use `asyncio.Future` in favor of alias `plumpy.Future` and 
- use `concurrent.futures.Future` instead of alias `kiwipy.Future`.
- Hand write `_chain` and `_copy_future` since we can not just rely on the API of asyncio that is not exposed.
- Forming the `coordinator/Communicator` protocol.
- Just forming the `coordinator/Coordinator` protocol and wrap rmq/communicator as a coordinator that not require changs in kiwipy.
- Mock the coordinator for unit test.
- test against aiida-core see what need to be changed there and improve here. (aiidateam/aiida-core#6675)
- The API for plumpy process can be more compact instead of using kiwipy/rmq "subscriber" concept. (how to replace rpc pattern??)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant