Skip to content

Commit

Permalink
fix endnodes for condition channels that launch ever if the condition…
Browse files Browse the repository at this point in the history
… is false
  • Loading branch information
quentin on chickenita committed Jul 19, 2023
1 parent a70747e commit dbf3d72
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 20 deletions.
9 changes: 6 additions & 3 deletions pypeman/channels.py
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ def _reset_test(self):
if self.final_nodes:
for node in self.final_nodes:
node._reset_test()
if self.final_nodes:
if self.fail_nodes:
for node in self.fail_nodes:
node._reset_test()

Expand Down Expand Up @@ -362,7 +362,6 @@ async def handle(self, msg):

if self.status in [BaseChannel.STOPPED, BaseChannel.STOPPING]:
raise ChannelStopped("Channel is stopped so you can't send message.")

self.logger.info("chan %s handle %s", str(self), str(msg))
has_callback = hasattr(self, "_callback")
setattr(msg, "chan_rslt", None)
Expand Down Expand Up @@ -749,8 +748,12 @@ def test_condition(self, msg):
return self.condition

async def subhandle(self, msg):
result = await self.process(msg)
return result

async def handle(self, msg):
if self.test_condition(msg):
result = await self.process(msg)
result = await super().handle(msg)
else:
if self.next_node:
result = await self.next_node.handle(msg)
Expand Down
1 change: 0 additions & 1 deletion pypeman/nodes.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,6 @@ async def handle(self, msg):
# TODO : Make sure exceptions are well raised (does not happen if i.e 1/0 here atm)
if self.store_input_as:
msg.add_context(self.store_input_as, msg)

if self.passthrough:
old_msg = msg.copy()
# Allow process as coroutine function
Expand Down
46 changes: 30 additions & 16 deletions pypeman/tests/test_channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -358,35 +358,49 @@ def test_condchan_endnodes(self):
chan1.add_fail_nodes(chan1_endfail)
chan1.add_join_nodes(chan1_endok)
chan1.add_final_nodes(chan1_endfinal)

condchan_end = TstNode(name="condchan_end")
condchan.add_final_nodes(condchan_end)

chan1._reset_test()

# Test Msg don't enter in exc subchan
startmsg = generate_msg(message_content="startmsg")
self.start_channels()
self.loop.run_until_complete(chan1.handle(startmsg))
self.assertTrue(
chan1_endok.processed,
"chan1 ok_endnodes not called")
self.assertTrue(
chan1_endfinal.processed,
"chan1 final_endnodes not called")
self.assertFalse(
chan1_endfail.processed,
self.assertEqual(
chan1_endok.processed, 1,
"chan1 ok_endnodes not called or called multiple times")
self.assertEqual(
chan1_endfinal.processed, 1,
"chan1 final_endnodes not called or called multiple times")
self.assertEqual(
chan1_endfail.processed, 0,
"chan1 fail_callback called when nobody ask to him")
self.assertEqual(
chan1_endfail.processed, 0,
"chan1 fail_callback called when nobody ask to him")
self.assertEqual(
condchan_end.processed, 0,
"condchan1 final_callback called when nobody ask to him")

chan1._reset_test()
n3exc.mock(output=raise_exc)
excmsg = generate_msg(message_content="exc")
with self.assertRaises(Exception):
self.loop.run_until_complete(chan1.handle(excmsg))
self.assertTrue(
chan1_endfail.processed,
"chan1 fail_endnodes not called")
self.assertTrue(
chan1_endfinal.processed,
"chan1 final_endnodes not called")
self.assertFalse(
chan1_endok.processed,
self.assertEqual(
chan1_endfail.processed, 1,
"chan1 fail_endnodes not called or called multiple times")
self.assertEqual(
chan1_endfinal.processed, 1,
"chan1 final_endnodes not called or called multiple times")
self.assertEqual(
chan1_endok.processed, 0,
"chan1 ok_callback called when nobody ask to him")
self.assertEqual(
condchan_end.processed, 1,
"condchan1 final_callback not called or called multiple times")

def test_casecondchan_endnodes(self):
"""
Expand Down

0 comments on commit dbf3d72

Please sign in to comment.