From 0833800179d8b11309767f4b90bd3b7545aa8ae3 Mon Sep 17 00:00:00 2001 From: Elizabeth Esswein Date: Tue, 28 May 2024 16:11:22 -0400 Subject: [PATCH] use events in MI tasks --- .../bpmn/serializer/default/process_spec.py | 8 + .../bpmn/specs/mixins/multiinstance_task.py | 215 ++++++++---------- .../camunda/specs/multiinstance_task.py | 4 +- 3 files changed, 107 insertions(+), 120 deletions(-) diff --git a/SpiffWorkflow/bpmn/serializer/default/process_spec.py b/SpiffWorkflow/bpmn/serializer/default/process_spec.py index c0f60fec..ec80fb26 100644 --- a/SpiffWorkflow/bpmn/serializer/default/process_spec.py +++ b/SpiffWorkflow/bpmn/serializer/default/process_spec.py @@ -18,6 +18,7 @@ # 02110-1301 USA from ..helpers.bpmn_converter import BpmnConverter +from SpiffWorkflow.bpmn.specs.mixins.multiinstance_task import LoopTask class BpmnProcessSpecConverter(BpmnConverter): @@ -71,6 +72,7 @@ def from_dict(self, dct): # Add messaging related stuff spec.correlation_keys = dct.pop('correlation_keys', {}) + loop_tasks = [] dct['task_specs'].pop('Root', None) for name, task_dict in dct['task_specs'].items(): # I hate this, but I need to pass in the workflow spec when I create the task. @@ -80,6 +82,12 @@ def from_dict(self, dct): task_spec = self.registry.restore(task_dict) if name == 'Start': spec.start = task_spec + if isinstance(task_spec, LoopTask): + loop_tasks.append(task_spec) self.restore_task_spec_extensions(task_dict, task_spec) + for task_spec in loop_tasks: + child_spec = spec.task_specs.get(task_spec.task_spec) + child_spec.completed_event.connect(task_spec.merge_child) + return spec diff --git a/SpiffWorkflow/bpmn/specs/mixins/multiinstance_task.py b/SpiffWorkflow/bpmn/specs/mixins/multiinstance_task.py index e4ad3b2c..795a3bd3 100644 --- a/SpiffWorkflow/bpmn/specs/mixins/multiinstance_task.py +++ b/SpiffWorkflow/bpmn/specs/mixins/multiinstance_task.py @@ -29,25 +29,6 @@ class LoopTask(BpmnTaskSpec): - def process_children(self, my_task): - """ - Handle any newly completed children and update merged tasks. - Returns a boolean indicating whether there is a child currently running - """ - merged = self._merged_children(my_task) - child_running = False - for child in self._instances(my_task): - if child.has_state(TaskState.FINISHED_MASK) and str(child.id) not in merged: - self.child_completed_action(my_task, child) - merged.append(str(child.id)) - elif not child.has_state(TaskState.FINISHED_MASK): - child_running = True - my_task.internal_data['merged'] = merged - return child_running - - def child_completed_action(self, my_task, child): - raise NotImplementedError - def _merged_children(self, my_task): return my_task.internal_data.get('merged', []) @@ -73,40 +54,37 @@ def task_info(self, my_task): return info def _update_hook(self, my_task): - - if my_task.state != TaskState.WAITING: - super()._update_hook(my_task) - - child_running = self.process_children(my_task) - if child_running: - # We're in the middle of an iteration; we're not done and we can't create a new task - return False - elif self.loop_complete(my_task): - # No children running and one of the completion conditions has been met; done + super()._update_hook(my_task) + if self.test_before and self.loop_complete(my_task): return True else: - # Execute again - if my_task.state != TaskState.WAITING: - my_task._set_state(TaskState.WAITING) - task_spec = my_task.workflow.spec.task_specs[self.task_spec] - child = my_task._add_child(task_spec, TaskState.WAITING) - child.triggered = True - child.data = deepcopy(my_task.data) - child.internal_data['iteration'] = len(self._merged_children(my_task)) - - def child_completed_action(self, my_task, child): + my_task._set_state(TaskState.STARTED) + my_task.internal_data['merged'] = [] + self.create_child(my_task) + + def create_child(self, my_task): + task_spec = my_task.workflow.spec.task_specs[self.task_spec] + if not task_spec.completed_event.is_connected(self.merge_child): + task_spec.completed_event.connect(self.merge_child) + child = my_task._add_child(task_spec, TaskState.WAITING) + child.triggered = True + child.internal_data['iteration'] = len(self._merged_children(my_task)) + child.task_spec._update(child) + + def merge_child(self, workflow, child): + my_task = child.parent DeepMerge.merge(my_task.data, child.data) + my_task.internal_data['merged'].append(str(child.id)) + if self.loop_complete(my_task): + my_task._set_state(TaskState.READY) + else: + self.create_child(my_task) def loop_complete(self, my_task): - merged = self._merged_children(my_task) - if not self.test_before and len(merged) == 0: - # "test before" isn't really compatible our execution model in a transparent way - # This guarantees that the task will run at least once if test_before is False - return False - else: - max_complete = self.maximum is not None and len(merged) >= self.maximum - cond_complete = self.condition is not None and my_task.workflow.script_engine.evaluate(my_task, self.condition) - return max_complete or cond_complete + merged = my_task.internal_data.get('merged', []) + max_complete = self.maximum is not None and len(merged) >= self.maximum + cond_complete = self.condition is not None and my_task.workflow.script_engine.evaluate(my_task, self.condition) + return max_complete or cond_complete class MultiInstanceTask(LoopTask): @@ -145,9 +123,9 @@ def task_info(self, my_task): info['instance_map'][str(value)] = str(task.id) return info - def child_completed_action(self, my_task, child): + def merge_child(self, workflow, child): """This merges child data into this task's data.""" - + my_task = child.parent if self.data_output is not None and self.output_item is not None: if not self.output_item.exists(child): self.raise_data_exception("Expected an output item", child) @@ -161,10 +139,13 @@ def child_completed_action(self, my_task, child): data_output.append(item) else: DeepMerge.merge(my_task.data, child.data) + my_task.internal_data['merged'].append(str(child.id)) def create_child(self, my_task, item, key_or_index=None): task_spec = my_task.workflow.spec.task_specs[self.task_spec] + if not task_spec.completed_event.is_connected(self.merge_child): + task_spec.completed_event.connect(self.merge_child) child = my_task._add_child(task_spec, TaskState.WAITING) child.triggered = True if self.input_item is not None and self.input_item.exists(my_task): @@ -181,8 +162,8 @@ def create_child(self, my_task, item, key_or_index=None): def check_completion_condition(self, my_task): - merged = self._merged_children(my_task) - if len(merged) > 0: + merged = my_task.internal_data.get('merged', []) + if len(merged) > 0 and self.condition is not None: last_child = [c for c in my_task.children if str(c.id) == merged[-1]][0] return my_task.workflow.script_engine.evaluate(last_child, self.condition) @@ -225,17 +206,23 @@ def raise_data_exception(self, message, my_task): class SequentialMultiInstanceTask(MultiInstanceTask): def _update_hook(self, my_task): - - if my_task.state != TaskState.WAITING: - super()._update_hook(my_task) - - child_running = self.process_children(my_task) - if child_running: - return False - if self.condition is not None and self.check_completion_condition(my_task): - return True + super()._update_hook(my_task) + my_task.internal_data['merged'] = [] + if self.data_input is not None: + input_data = self.data_input.get(my_task) + my_task.internal_data['remaining'] = self.init_remaining_items(my_task) + if self.data_output is not None: + self.init_data_output_with_input_data(my_task, input_data) + else: + my_task.internal_data['cardinality'] = my_task.workflow.script_engine.evaluate(my_task, self.cardinality) + my_task.internal_data['current'] = 0 + if self.data_output is not None: + self.init_data_output_with_cardinality(my_task) + self.add_next_child(my_task) + if not self.children_complete(my_task): + my_task._set_state(TaskState.STARTED) else: - return self.add_next_child(my_task) + return True def task_info(self, my_task): info = super().task_info(my_task) @@ -245,29 +232,17 @@ def task_info(self, my_task): return info def add_next_child(self, my_task): - if self.data_input is not None: key_or_index, item = self.get_next_input_item(my_task) else: key_or_index, item = self.get_next_index(my_task) - if item is not None: - if my_task.state != TaskState.WAITING: - my_task._set_state(TaskState.WAITING) self.create_child(my_task, item, key_or_index) - else: - return True def get_next_input_item(self, my_task): input_data = self.data_input.get(my_task) remaining = my_task.internal_data.get('remaining') - - if remaining is None: - remaining = self.init_remaining_items(my_task) - if self.data_output is not None: - self.init_data_output_with_input_data(my_task, input_data) - if len(remaining) > 0: if isinstance(input_data, (Mapping, Sequence)): # In this case, we want to preserve a key or index @@ -280,6 +255,25 @@ def get_next_input_item(self, my_task): else: return None, None + def get_next_index(self, my_task): + + current = my_task.internal_data.get('current') + cardinality = my_task.internal_data.get('cardinality') + if current < cardinality: + # If using loop cardinality, use the index as the "item" + my_task.internal_data['current'] = current + 1 + return None, current + else: + return None, None + + def merge_child(self, workflow, child): + super().merge_child(workflow, child) + my_task = child.parent + if self.children_complete(my_task) or self.check_completion_condition(my_task): + my_task._set_state(TaskState.READY) + else: + self.add_next_child(my_task) + def init_remaining_items(self, my_task): if not self.data_input.exists(my_task): @@ -300,43 +294,34 @@ def init_remaining_items(self, my_task): self.raise_data_exception("Multiinstance data input must be iterable", my_task) return remaining - def get_next_index(self, my_task): - - current = my_task.internal_data.get('current') - if current is None: - current = 0 - if self.data_output is not None: - self.init_data_output_with_cardinality(my_task) - - cardinality = my_task.internal_data.get('cardinality') - if cardinality is None: - # In case the evaluated expression changes during execution - cardinality = my_task.workflow.script_engine.evaluate(my_task, self.cardinality) - my_task.internal_data['cardinality'] = cardinality - - if current < cardinality: - # If using loop cardinalty, if a data input was specified, use the index as the "item" - my_task.internal_data['current'] = current + 1 - return None, current + def children_complete(self, my_task): + if self.data_input is not None: + return len(my_task.internal_data.get('remaining', [])) == 0 else: - return None, None + return my_task.internal_data.get('current', 0) == my_task.internal_data.get('cardinality', 0) class ParallelMultiInstanceTask(MultiInstanceTask): def _update_hook(self, my_task): - - if my_task.state != TaskState.WAITING: - super()._update_hook(my_task) - self.create_children(my_task) - - child_running = self.process_children(my_task) - if self.condition is not None and self.check_completion_condition(my_task): - for child in my_task.children: - if child.task_spec.name == self.task_spec and child.state != TaskState.COMPLETED: - child.cancel() + super()._update_hook(my_task) + my_task.internal_data['merged'] = [] + self.create_children(my_task) + # If the input collection or cardinalty is 0, there won't be any children to cause the task to become ready + if not self.children_complete(my_task): + my_task._set_state(TaskState.STARTED) + else: return True - return not child_running + + def merge_child(self, workflow, child): + super().merge_child(workflow, child) + my_task = child.parent + if self.check_completion_condition(my_task): + for child in self._instances(my_task): + child.cancel() + my_task._set_state(TaskState.READY) + elif self.children_complete(my_task): + my_task._set_state(TaskState.READY) def create_children(self, my_task): @@ -355,18 +340,14 @@ def create_children(self, my_task): cardinality = my_task.workflow.script_engine.evaluate(my_task, self.cardinality) children = ((idx, idx) for idx in range(cardinality)) - if not my_task.internal_data.get('started', False): - - if self.data_output is not None: - if self.data_input is not None: - self.init_data_output_with_input_data(my_task, self.data_input.get(my_task)) - else: - self.init_data_output_with_cardinality(my_task) + if self.data_output is not None: + if self.data_input is not None: + self.init_data_output_with_input_data(my_task, self.data_input.get(my_task)) + else: + self.init_data_output_with_cardinality(my_task) - my_task._set_state(TaskState.WAITING) - for key_or_index, item in children: - self.create_child(my_task, item, key_or_index) + for key_or_index, item in children: + self.create_child(my_task, item, key_or_index) - my_task.internal_data['started'] = True - else: - return len(self._merged_children(my_task)) == len(children) + def children_complete(self, my_task): + return all(c.state == TaskState.COMPLETED for c in self._instances(my_task)) \ No newline at end of file diff --git a/SpiffWorkflow/camunda/specs/multiinstance_task.py b/SpiffWorkflow/camunda/specs/multiinstance_task.py index 764d12f8..364e4bd8 100644 --- a/SpiffWorkflow/camunda/specs/multiinstance_task.py +++ b/SpiffWorkflow/camunda/specs/multiinstance_task.py @@ -64,7 +64,5 @@ def _update_hook(self, my_task): class ParallelMultiInstanceTask(BpmnParallelMITask): def _update_hook(self, my_task): - if not my_task.internal_data.get('started', False): - update_task_spec(my_task) - self.create_children(my_task) + update_task_spec(my_task) return super()._update_hook(my_task) \ No newline at end of file