Skip to content

Commit

Permalink
Add possibility to extend end nodes
Browse files Browse the repository at this point in the history
  • Loading branch information
quentin on chickenita committed Aug 29, 2024
1 parent b483e34 commit be6fba6
Showing 1 changed file with 5 additions and 7 deletions.
12 changes: 5 additions & 7 deletions pypeman/channels.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
from pathlib import Path

from pypeman import message, msgstore, events
from pypeman.errors import PypemanConfigError
from pypeman.helpers.itertools import flatten
from pypeman.helpers.sleeper import Sleeper

Expand Down Expand Up @@ -441,7 +440,6 @@ async def subhandle(self, msg):
"""

result = await self.process(msg)

if self.next_node:
if isinstance(result, types.GeneratorType):
gene = result
Expand Down Expand Up @@ -616,7 +614,7 @@ def add_join_nodes(self, *end_nodes):
- SubChannel: no other endnodes will be launched (except final_nodes)
"""
if self.join_nodes:
raise PypemanConfigError(f"join_nodes already existing for channel {self.name}")
end_nodes = self.join_nodes.extend(end_nodes)
self.join_nodes = self._init_end_nodes(*end_nodes)

def add_fail_nodes(self, *end_nodes):
Expand All @@ -626,7 +624,7 @@ def add_fail_nodes(self, *end_nodes):
The first node take the entry message of the channel as input
"""
if self.fail_nodes:
raise PypemanConfigError(f"fail_nodes already existing for channel {self.name}")
end_nodes = self.fail_nodes.extend(end_nodes)
self.fail_nodes = self._init_end_nodes(*end_nodes)

def add_drop_nodes(self, *end_nodes):
Expand All @@ -635,7 +633,7 @@ def add_drop_nodes(self, *end_nodes):
The first node take the entry message of the channel as input
"""
if self.drop_nodes:
raise PypemanConfigError(f"drop_nodes already existing for channel {self.name}")
end_nodes = self.drop_nodes.extend(end_nodes)
self.drop_nodes = self._init_end_nodes(*end_nodes)

def add_reject_nodes(self, *end_nodes):
Expand All @@ -644,7 +642,7 @@ def add_reject_nodes(self, *end_nodes):
The first node take the entry message of the channel as input
"""
if self.reject_nodes:
raise PypemanConfigError(f"reject_nodes already existing for channel {self.name}")
end_nodes = self.reject_nodes.extend(end_nodes)
self.reject_nodes = self._init_end_nodes(*end_nodes)

def add_final_nodes(self, *end_nodes):
Expand All @@ -653,7 +651,7 @@ def add_final_nodes(self, *end_nodes):
The first node take the entry message of the channel as input (TODO: ask if it's ok)
"""
if self.final_nodes:
raise PypemanConfigError(f"final_nodes already existing for channel {self.name}")
end_nodes = self.final_nodes.extend(end_nodes)
self.final_nodes = self._init_end_nodes(*end_nodes)

# def _callback(self, future):
Expand Down

0 comments on commit be6fba6

Please sign in to comment.