Skip to content

Commit

Permalink
Merge pull request #291 from mhcomm/dvl/quentin/add_init_nodes
Browse files Browse the repository at this point in the history
add init nodes that are called before channel subhandle
  • Loading branch information
quentinql authored Aug 29, 2024
2 parents 859b52e + ff727bd commit 33c9a33
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 1 deletion.
12 changes: 12 additions & 0 deletions pypeman/channels.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ def __init__(self, name=None, parent_channel=None, loop=None, message_store_fact
self.drop_nodes = None
self.reject_nodes = None
self.final_nodes = None
self.init_nodes = None
self.wait_subchans = wait_subchans
self.raise_dropped = False

Expand Down Expand Up @@ -375,6 +376,8 @@ async def handle(self, msg):
async with self.lock:
self.status = BaseChannel.PROCESSING
try:
if self.init_nodes:
msg = await self.init_nodes[0].handle(msg.copy())
result = await self.subhandle(msg.copy())
await self.message_store.change_message_state(msg_store_id, message.Message.PROCESSED)
msg.chan_rslt = result
Expand Down Expand Up @@ -593,6 +596,15 @@ def _init_end_nodes(self, *end_nodes):
previous_node = node
return end_nodes

def add_init_nodes(self, *nodes):
"""
Add nodes that will be launched at the start of the channel before all
processing nodes
"""
if self.init_nodes:
nodes = self.init_nodes.extend(nodes)
self.init_nodes = self._init_end_nodes(*nodes)

def add_join_nodes(self, *end_nodes):
"""
Add nodes that will be launched only after a successful channel process
Expand Down
1 change: 0 additions & 1 deletion pypeman/nodes.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,6 @@ async def handle(self, msg):
result = await self.async_run(msg)
else:
result = self.run(msg)

self.processed += 1

if isinstance(result, asyncio.Future):
Expand Down
29 changes: 29 additions & 0 deletions pypeman/tests/test_channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import tempfile
import time

from functools import partial

from hl7.client import MLLPClient

from pathlib import Path
Expand Down Expand Up @@ -39,6 +41,11 @@ def raise_exc(msg):
raise Exception()


def return_text(msg, text):
msg.payload = text
return msg


class ChannelsTests(TestCase):
def clean_loop(self):
# Useful to execute future callbacks # TODO: remove ?
Expand Down Expand Up @@ -234,6 +241,28 @@ def test_final_nodes(self):
vars(self.clean_msg(msg1)), vars(self.clean_msg(endnode_input)),
"Channel final_nodes don't takes event msg in input")

def test_init_nodes(self):
""" Whether BaseChannel init_nodes is working """
chan1 = BaseChannel(name="test_channel_init_clbk", loop=self.loop)
initouttext = "inittxt"

n1 = TstNode(name="n1")
initnode = TstNode(name="initnode")
n1._reset_test()
initnode._reset_test()
initnode.mock(output=partial(return_text, text=initouttext))
chan1.add_init_nodes(initnode)
chan1.add(n1)
msg1 = generate_msg(message_content="startmsg")
self.start_channels()

self.loop.run_until_complete(chan1.handle(msg1))

n1_input = n1.last_input()
self.assertEqual(
n1_input.payload, initouttext,
"Channel init_nodes doesn't seem to work")

def test_multiple_callbacks(self):
"""
Whether BaseChannel all endnodes are working at same time
Expand Down

0 comments on commit 33c9a33

Please sign in to comment.