Skip to content

Commit

Permalink
use events in MI tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
essweine committed May 29, 2024
1 parent 54f80ea commit 0833800
Show file tree
Hide file tree
Showing 3 changed files with 107 additions and 120 deletions.
8 changes: 8 additions & 0 deletions SpiffWorkflow/bpmn/serializer/default/process_spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
# 02110-1301 USA

from ..helpers.bpmn_converter import BpmnConverter
from SpiffWorkflow.bpmn.specs.mixins.multiinstance_task import LoopTask


class BpmnProcessSpecConverter(BpmnConverter):
Expand Down Expand Up @@ -71,6 +72,7 @@ def from_dict(self, dct):
# Add messaging related stuff
spec.correlation_keys = dct.pop('correlation_keys', {})

loop_tasks = []
dct['task_specs'].pop('Root', None)
for name, task_dict in dct['task_specs'].items():
# I hate this, but I need to pass in the workflow spec when I create the task.
Expand All @@ -80,6 +82,12 @@ def from_dict(self, dct):
task_spec = self.registry.restore(task_dict)
if name == 'Start':
spec.start = task_spec
if isinstance(task_spec, LoopTask):
loop_tasks.append(task_spec)
self.restore_task_spec_extensions(task_dict, task_spec)

for task_spec in loop_tasks:
child_spec = spec.task_specs.get(task_spec.task_spec)
child_spec.completed_event.connect(task_spec.merge_child)

return spec
215 changes: 98 additions & 117 deletions SpiffWorkflow/bpmn/specs/mixins/multiinstance_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,25 +29,6 @@

class LoopTask(BpmnTaskSpec):

def process_children(self, my_task):
"""
Handle any newly completed children and update merged tasks.
Returns a boolean indicating whether there is a child currently running
"""
merged = self._merged_children(my_task)
child_running = False
for child in self._instances(my_task):
if child.has_state(TaskState.FINISHED_MASK) and str(child.id) not in merged:
self.child_completed_action(my_task, child)
merged.append(str(child.id))
elif not child.has_state(TaskState.FINISHED_MASK):
child_running = True
my_task.internal_data['merged'] = merged
return child_running

def child_completed_action(self, my_task, child):
raise NotImplementedError

def _merged_children(self, my_task):
return my_task.internal_data.get('merged', [])

Expand All @@ -73,40 +54,37 @@ def task_info(self, my_task):
return info

def _update_hook(self, my_task):

if my_task.state != TaskState.WAITING:
super()._update_hook(my_task)

child_running = self.process_children(my_task)
if child_running:
# We're in the middle of an iteration; we're not done and we can't create a new task
return False
elif self.loop_complete(my_task):
# No children running and one of the completion conditions has been met; done
super()._update_hook(my_task)
if self.test_before and self.loop_complete(my_task):
return True
else:
# Execute again
if my_task.state != TaskState.WAITING:
my_task._set_state(TaskState.WAITING)
task_spec = my_task.workflow.spec.task_specs[self.task_spec]
child = my_task._add_child(task_spec, TaskState.WAITING)
child.triggered = True
child.data = deepcopy(my_task.data)
child.internal_data['iteration'] = len(self._merged_children(my_task))

def child_completed_action(self, my_task, child):
my_task._set_state(TaskState.STARTED)
my_task.internal_data['merged'] = []
self.create_child(my_task)

def create_child(self, my_task):
task_spec = my_task.workflow.spec.task_specs[self.task_spec]
if not task_spec.completed_event.is_connected(self.merge_child):
task_spec.completed_event.connect(self.merge_child)
child = my_task._add_child(task_spec, TaskState.WAITING)
child.triggered = True
child.internal_data['iteration'] = len(self._merged_children(my_task))
child.task_spec._update(child)

def merge_child(self, workflow, child):
my_task = child.parent
DeepMerge.merge(my_task.data, child.data)
my_task.internal_data['merged'].append(str(child.id))
if self.loop_complete(my_task):
my_task._set_state(TaskState.READY)
else:
self.create_child(my_task)

def loop_complete(self, my_task):
merged = self._merged_children(my_task)
if not self.test_before and len(merged) == 0:
# "test before" isn't really compatible our execution model in a transparent way
# This guarantees that the task will run at least once if test_before is False
return False
else:
max_complete = self.maximum is not None and len(merged) >= self.maximum
cond_complete = self.condition is not None and my_task.workflow.script_engine.evaluate(my_task, self.condition)
return max_complete or cond_complete
merged = my_task.internal_data.get('merged', [])
max_complete = self.maximum is not None and len(merged) >= self.maximum
cond_complete = self.condition is not None and my_task.workflow.script_engine.evaluate(my_task, self.condition)
return max_complete or cond_complete


class MultiInstanceTask(LoopTask):
Expand Down Expand Up @@ -145,9 +123,9 @@ def task_info(self, my_task):
info['instance_map'][str(value)] = str(task.id)
return info

def child_completed_action(self, my_task, child):
def merge_child(self, workflow, child):
"""This merges child data into this task's data."""

my_task = child.parent
if self.data_output is not None and self.output_item is not None:
if not self.output_item.exists(child):
self.raise_data_exception("Expected an output item", child)
Expand All @@ -161,10 +139,13 @@ def child_completed_action(self, my_task, child):
data_output.append(item)
else:
DeepMerge.merge(my_task.data, child.data)
my_task.internal_data['merged'].append(str(child.id))

def create_child(self, my_task, item, key_or_index=None):

task_spec = my_task.workflow.spec.task_specs[self.task_spec]
if not task_spec.completed_event.is_connected(self.merge_child):
task_spec.completed_event.connect(self.merge_child)
child = my_task._add_child(task_spec, TaskState.WAITING)
child.triggered = True
if self.input_item is not None and self.input_item.exists(my_task):
Expand All @@ -181,8 +162,8 @@ def create_child(self, my_task, item, key_or_index=None):

def check_completion_condition(self, my_task):

merged = self._merged_children(my_task)
if len(merged) > 0:
merged = my_task.internal_data.get('merged', [])
if len(merged) > 0 and self.condition is not None:
last_child = [c for c in my_task.children if str(c.id) == merged[-1]][0]
return my_task.workflow.script_engine.evaluate(last_child, self.condition)

Expand Down Expand Up @@ -225,17 +206,23 @@ def raise_data_exception(self, message, my_task):
class SequentialMultiInstanceTask(MultiInstanceTask):

def _update_hook(self, my_task):

if my_task.state != TaskState.WAITING:
super()._update_hook(my_task)

child_running = self.process_children(my_task)
if child_running:
return False
if self.condition is not None and self.check_completion_condition(my_task):
return True
super()._update_hook(my_task)
my_task.internal_data['merged'] = []
if self.data_input is not None:
input_data = self.data_input.get(my_task)
my_task.internal_data['remaining'] = self.init_remaining_items(my_task)
if self.data_output is not None:
self.init_data_output_with_input_data(my_task, input_data)
else:
my_task.internal_data['cardinality'] = my_task.workflow.script_engine.evaluate(my_task, self.cardinality)
my_task.internal_data['current'] = 0
if self.data_output is not None:
self.init_data_output_with_cardinality(my_task)
self.add_next_child(my_task)
if not self.children_complete(my_task):
my_task._set_state(TaskState.STARTED)
else:
return self.add_next_child(my_task)
return True

def task_info(self, my_task):
info = super().task_info(my_task)
Expand All @@ -245,29 +232,17 @@ def task_info(self, my_task):
return info

def add_next_child(self, my_task):

if self.data_input is not None:
key_or_index, item = self.get_next_input_item(my_task)
else:
key_or_index, item = self.get_next_index(my_task)

if item is not None:
if my_task.state != TaskState.WAITING:
my_task._set_state(TaskState.WAITING)
self.create_child(my_task, item, key_or_index)
else:
return True

def get_next_input_item(self, my_task):

input_data = self.data_input.get(my_task)
remaining = my_task.internal_data.get('remaining')

if remaining is None:
remaining = self.init_remaining_items(my_task)
if self.data_output is not None:
self.init_data_output_with_input_data(my_task, input_data)

if len(remaining) > 0:
if isinstance(input_data, (Mapping, Sequence)):
# In this case, we want to preserve a key or index
Expand All @@ -280,6 +255,25 @@ def get_next_input_item(self, my_task):
else:
return None, None

def get_next_index(self, my_task):

current = my_task.internal_data.get('current')
cardinality = my_task.internal_data.get('cardinality')
if current < cardinality:
# If using loop cardinality, use the index as the "item"
my_task.internal_data['current'] = current + 1
return None, current
else:
return None, None

def merge_child(self, workflow, child):
super().merge_child(workflow, child)
my_task = child.parent
if self.children_complete(my_task) or self.check_completion_condition(my_task):
my_task._set_state(TaskState.READY)
else:
self.add_next_child(my_task)

def init_remaining_items(self, my_task):

if not self.data_input.exists(my_task):
Expand All @@ -300,43 +294,34 @@ def init_remaining_items(self, my_task):
self.raise_data_exception("Multiinstance data input must be iterable", my_task)
return remaining

def get_next_index(self, my_task):

current = my_task.internal_data.get('current')
if current is None:
current = 0
if self.data_output is not None:
self.init_data_output_with_cardinality(my_task)

cardinality = my_task.internal_data.get('cardinality')
if cardinality is None:
# In case the evaluated expression changes during execution
cardinality = my_task.workflow.script_engine.evaluate(my_task, self.cardinality)
my_task.internal_data['cardinality'] = cardinality

if current < cardinality:
# If using loop cardinalty, if a data input was specified, use the index as the "item"
my_task.internal_data['current'] = current + 1
return None, current
def children_complete(self, my_task):
if self.data_input is not None:
return len(my_task.internal_data.get('remaining', [])) == 0
else:
return None, None
return my_task.internal_data.get('current', 0) == my_task.internal_data.get('cardinality', 0)


class ParallelMultiInstanceTask(MultiInstanceTask):

def _update_hook(self, my_task):

if my_task.state != TaskState.WAITING:
super()._update_hook(my_task)
self.create_children(my_task)

child_running = self.process_children(my_task)
if self.condition is not None and self.check_completion_condition(my_task):
for child in my_task.children:
if child.task_spec.name == self.task_spec and child.state != TaskState.COMPLETED:
child.cancel()
super()._update_hook(my_task)
my_task.internal_data['merged'] = []
self.create_children(my_task)
# If the input collection or cardinalty is 0, there won't be any children to cause the task to become ready
if not self.children_complete(my_task):
my_task._set_state(TaskState.STARTED)
else:
return True
return not child_running

def merge_child(self, workflow, child):
super().merge_child(workflow, child)
my_task = child.parent
if self.check_completion_condition(my_task):
for child in self._instances(my_task):
child.cancel()
my_task._set_state(TaskState.READY)
elif self.children_complete(my_task):
my_task._set_state(TaskState.READY)

def create_children(self, my_task):

Expand All @@ -355,18 +340,14 @@ def create_children(self, my_task):
cardinality = my_task.workflow.script_engine.evaluate(my_task, self.cardinality)
children = ((idx, idx) for idx in range(cardinality))

if not my_task.internal_data.get('started', False):

if self.data_output is not None:
if self.data_input is not None:
self.init_data_output_with_input_data(my_task, self.data_input.get(my_task))
else:
self.init_data_output_with_cardinality(my_task)
if self.data_output is not None:
if self.data_input is not None:
self.init_data_output_with_input_data(my_task, self.data_input.get(my_task))
else:
self.init_data_output_with_cardinality(my_task)

my_task._set_state(TaskState.WAITING)
for key_or_index, item in children:
self.create_child(my_task, item, key_or_index)
for key_or_index, item in children:
self.create_child(my_task, item, key_or_index)

my_task.internal_data['started'] = True
else:
return len(self._merged_children(my_task)) == len(children)
def children_complete(self, my_task):
return all(c.state == TaskState.COMPLETED for c in self._instances(my_task))
4 changes: 1 addition & 3 deletions SpiffWorkflow/camunda/specs/multiinstance_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,5 @@ def _update_hook(self, my_task):
class ParallelMultiInstanceTask(BpmnParallelMITask):

def _update_hook(self, my_task):
if not my_task.internal_data.get('started', False):
update_task_spec(my_task)
self.create_children(my_task)
update_task_spec(my_task)
return super()._update_hook(my_task)

0 comments on commit 0833800

Please sign in to comment.