Skip to content

Commit

Permalink
Merge pull request #408 from sartography/feature/align-callbacks-with…
Browse files Browse the repository at this point in the history
…-state-transitions

rename events & run in hook functions
  • Loading branch information
essweine authored May 14, 2024
2 parents 651ed6c + 3478d5e commit ac2d8f3
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 32 deletions.
2 changes: 1 addition & 1 deletion SpiffWorkflow/specs/AcquireMutex.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ def _update_hook(self, my_task):
super()._update_hook(my_task)
mutex = my_task.workflow._get_mutex(self.mutex)
if mutex.testandset():
self.entered_event.emit(my_task.workflow, my_task)
self.update_event.emit(my_task.workflow, my_task)
return True
else:
my_task._set_state(TaskState.WAITING)
Expand Down
2 changes: 1 addition & 1 deletion SpiffWorkflow/specs/ThreadMerge.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ def _update_hook(self, my_task):
# completed, except for the first one, which should be READY.
for task in tasks:
if task == last_changed:
self.entered_event.emit(my_task.workflow, my_task)
self.update_event.emit(my_task.workflow, my_task)
task._ready()
else:
task._set_state(TaskState.COMPLETED)
Expand Down
27 changes: 11 additions & 16 deletions SpiffWorkflow/specs/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,12 +100,12 @@ def __init__(self, wf_spec, name, **kwargs):
self.lookahead = 2 # Maximum number of MAYBE predictions.

# Events.
self.entered_event = Event()
self.reached_event = Event()
self.update_event = Event()
self.ready_event = Event()
self.completed_event = Event()
self.error_event = Event()
self.cancelled_event = Event()
self.finished_event = Event()
self.run_event = Event()

self._wf_spec._add_notify(self)
self.data.update(self.defines)
Expand Down Expand Up @@ -257,7 +257,6 @@ def _update(self, my_task):
"""
if my_task.has_state(TaskState.PREDICTED_MASK):
self._predict(my_task)
self.entered_event.emit(my_task.workflow, my_task)
if self._update_hook(my_task):
my_task._ready()

Expand All @@ -268,6 +267,7 @@ def _update_hook(self, my_task):
Returning True will cause the task to go into READY.
"""
my_task._inherit_data()
self.update_event.emit(my_task.workflow, my_task)
return True

def _on_ready(self, my_task):
Expand All @@ -285,7 +285,6 @@ def _on_ready(self, my_task):

# Run task-specific code.
self._on_ready_hook(my_task)
self.reached_event.emit(my_task.workflow, my_task)

def _on_ready_hook(self, my_task):
"""
Expand All @@ -294,7 +293,7 @@ def _on_ready_hook(self, my_task):
:type my_task: Task
:param my_task: The associated task in the task tree.
"""
pass
self.ready_event.emit(my_task.workflow, my_task)

def _run(self, my_task):
"""
Expand All @@ -314,15 +313,11 @@ def _run(self, my_task):
try:
result = self._run_hook(my_task)
# Run user code, if any.
if self.ready_event.emit(my_task.workflow, my_task):
# Assign variables, if so requested.
for assignment in self.post_assign:
assignment.assign(my_task, my_task)

self.finished_event.emit(my_task.workflow, my_task)
for assignment in self.post_assign:
assignment.assign(my_task, my_task)
return result
except Exception as exc:
my_task._set_state(TaskState.ERROR)
my_task.error()
raise exc

def _run_hook(self, my_task):
Expand All @@ -332,6 +327,7 @@ def _run_hook(self, my_task):
:type my_task: Task
:param my_task: The associated task in the task tree.
"""
self.run_event.emit(my_task.workflow, my_task)
return True

def _on_cancel(self, my_task):
Expand Down Expand Up @@ -371,7 +367,6 @@ def _on_complete(self, my_task):
if not child.has_state(TaskState.FINISHED_MASK):
child.task_spec._update(child)
my_task.workflow._task_completed_notify(my_task)
self.completed_event.emit(my_task.workflow, my_task)

def _on_complete_hook(self, my_task):
"""
Expand All @@ -382,14 +377,14 @@ def _on_complete_hook(self, my_task):
:rtype: bool
:returns: True on success, False otherwise.
"""
pass
self.completed_event.emit(my_task.workflow, my_task)

def _on_error(self, my_task):
self._on_error_hook(my_task)

def _on_error_hook(self, my_task):
"""Can be overridden for task specific error handling"""
pass
self.error_event.emit(my_task.workflow, my_task)

@abstractmethod
def serialize(self, serializer, **kwargs):
Expand Down
8 changes: 4 additions & 4 deletions tests/SpiffWorkflow/bpmn/events/CallActivityEscalationTest.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
__author__ = '[email protected]'


def on_reached_cb(workflow, task, completed_set):
def on_ready_cb(workflow, task, completed_set):
# In workflows that load a subworkflow, the newly loaded children
# will not have on_reached_cb() assigned. By using this function, we
# re-assign the function in every step, thus making sure that new
Expand All @@ -24,9 +24,9 @@ def on_complete_cb(workflow, task, completed_set):


def track_task(task_spec, completed_set):
if task_spec.reached_event.is_connected(on_reached_cb):
task_spec.reached_event.disconnect(on_reached_cb)
task_spec.reached_event.connect(on_reached_cb, completed_set)
if task_spec.ready_event.is_connected(on_ready_cb):
task_spec.ready_event.disconnect(on_ready_cb)
task_spec.ready_event.connect(on_ready_cb, completed_set)
if task_spec.completed_event.is_connected(on_complete_cb):
task_spec.completed_event.disconnect(on_complete_cb)
task_spec.completed_event.connect(on_complete_cb, completed_set)
Expand Down
20 changes: 10 additions & 10 deletions tests/SpiffWorkflow/core/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from SpiffWorkflow.specs import SubWorkflow


def on_reached_cb(workflow, task, taken_path):
def on_ready_cb(workflow, task, taken_path):
reached_key = "%s_reached" % str(task.task_spec.name)
n_reached = task.get_data(reached_key, 0) + 1
task.set_data(**{reached_key: n_reached,
Expand Down Expand Up @@ -44,31 +44,31 @@ def on_complete_cb(workflow, task, taken_path):
indent = ' ' * task.depth
taken_path.append('%s%s' % (indent, task.task_spec.name))
# In workflows that load a subworkflow, the newly loaded children
# will not have on_reached_cb() assigned. By using this function, we
# will not have on_ready_cb() assigned. By using this function, we
# re-assign the function in every step, thus making sure that new
# children also call on_reached_cb().
# children also call on_ready_cb().
for child in task.children:
track_task(child.task_spec, taken_path)
return True

def on_entered_cb(workflow, task, taken_path):
def on_update_cb(workflow, task, taken_path):
for child in task.children:
track_task(child.task_spec, taken_path)
return True

def track_task(task_spec, taken_path):
# Disconnecting and reconnecting makes absolutely no sense but inexplicably these tests break
# if just connected based on a check that they're not
if task_spec.reached_event.is_connected(on_reached_cb):
task_spec.reached_event.disconnect(on_reached_cb)
task_spec.reached_event.connect(on_reached_cb, taken_path)
if task_spec.ready_event.is_connected(on_ready_cb):
task_spec.ready_event.disconnect(on_ready_cb)
task_spec.ready_event.connect(on_ready_cb, taken_path)
if task_spec.completed_event.is_connected(on_complete_cb):
task_spec.completed_event.disconnect(on_complete_cb)
task_spec.completed_event.connect(on_complete_cb, taken_path)
if isinstance(task_spec, SubWorkflow):
if task_spec.entered_event.is_connected(on_entered_cb):
task_spec.entered_event.disconnect(on_entered_cb)
task_spec.entered_event.connect(on_entered_cb, taken_path)
if task_spec.update_event.is_connected(on_update_cb):
task_spec.update_event.disconnect(on_update_cb)
task_spec.update_event.connect(on_update_cb, taken_path)

def track_workflow(wf_spec, taken_path=None):
if taken_path is None:
Expand Down

0 comments on commit ac2d8f3

Please sign in to comment.