Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rename events & run in hook functions #408

Merged
merged 1 commit into from
May 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion SpiffWorkflow/specs/AcquireMutex.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ def _update_hook(self, my_task):
super()._update_hook(my_task)
mutex = my_task.workflow._get_mutex(self.mutex)
if mutex.testandset():
self.entered_event.emit(my_task.workflow, my_task)
self.update_event.emit(my_task.workflow, my_task)
return True
else:
my_task._set_state(TaskState.WAITING)
Expand Down
2 changes: 1 addition & 1 deletion SpiffWorkflow/specs/ThreadMerge.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ def _update_hook(self, my_task):
# completed, except for the first one, which should be READY.
for task in tasks:
if task == last_changed:
self.entered_event.emit(my_task.workflow, my_task)
self.update_event.emit(my_task.workflow, my_task)
task._ready()
else:
task._set_state(TaskState.COMPLETED)
Expand Down
27 changes: 11 additions & 16 deletions SpiffWorkflow/specs/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,12 +100,12 @@ def __init__(self, wf_spec, name, **kwargs):
self.lookahead = 2 # Maximum number of MAYBE predictions.

# Events.
self.entered_event = Event()
self.reached_event = Event()
self.update_event = Event()
self.ready_event = Event()
self.completed_event = Event()
self.error_event = Event()
self.cancelled_event = Event()
self.finished_event = Event()
self.run_event = Event()

self._wf_spec._add_notify(self)
self.data.update(self.defines)
Expand Down Expand Up @@ -257,7 +257,6 @@ def _update(self, my_task):
"""
if my_task.has_state(TaskState.PREDICTED_MASK):
self._predict(my_task)
self.entered_event.emit(my_task.workflow, my_task)
if self._update_hook(my_task):
my_task._ready()

Expand All @@ -268,6 +267,7 @@ def _update_hook(self, my_task):
Returning True will cause the task to go into READY.
"""
my_task._inherit_data()
self.update_event.emit(my_task.workflow, my_task)
return True

def _on_ready(self, my_task):
Expand All @@ -285,7 +285,6 @@ def _on_ready(self, my_task):

# Run task-specific code.
self._on_ready_hook(my_task)
self.reached_event.emit(my_task.workflow, my_task)

def _on_ready_hook(self, my_task):
"""
Expand All @@ -294,7 +293,7 @@ def _on_ready_hook(self, my_task):
:type my_task: Task
:param my_task: The associated task in the task tree.
"""
pass
self.ready_event.emit(my_task.workflow, my_task)

def _run(self, my_task):
"""
Expand All @@ -314,15 +313,11 @@ def _run(self, my_task):
try:
result = self._run_hook(my_task)
# Run user code, if any.
if self.ready_event.emit(my_task.workflow, my_task):
# Assign variables, if so requested.
for assignment in self.post_assign:
assignment.assign(my_task, my_task)

self.finished_event.emit(my_task.workflow, my_task)
for assignment in self.post_assign:
assignment.assign(my_task, my_task)
return result
except Exception as exc:
my_task._set_state(TaskState.ERROR)
my_task.error()
raise exc

def _run_hook(self, my_task):
Expand All @@ -332,6 +327,7 @@ def _run_hook(self, my_task):
:type my_task: Task
:param my_task: The associated task in the task tree.
"""
self.run_event.emit(my_task.workflow, my_task)
return True

def _on_cancel(self, my_task):
Expand Down Expand Up @@ -371,7 +367,6 @@ def _on_complete(self, my_task):
if not child.has_state(TaskState.FINISHED_MASK):
child.task_spec._update(child)
my_task.workflow._task_completed_notify(my_task)
self.completed_event.emit(my_task.workflow, my_task)

def _on_complete_hook(self, my_task):
"""
Expand All @@ -382,14 +377,14 @@ def _on_complete_hook(self, my_task):
:rtype: bool
:returns: True on success, False otherwise.
"""
pass
self.completed_event.emit(my_task.workflow, my_task)

def _on_error(self, my_task):
self._on_error_hook(my_task)

def _on_error_hook(self, my_task):
"""Can be overridden for task specific error handling"""
pass
self.error_event.emit(my_task.workflow, my_task)

@abstractmethod
def serialize(self, serializer, **kwargs):
Expand Down
8 changes: 4 additions & 4 deletions tests/SpiffWorkflow/bpmn/events/CallActivityEscalationTest.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
__author__ = '[email protected]'


def on_reached_cb(workflow, task, completed_set):
def on_ready_cb(workflow, task, completed_set):
# In workflows that load a subworkflow, the newly loaded children
# will not have on_reached_cb() assigned. By using this function, we
# re-assign the function in every step, thus making sure that new
Expand All @@ -24,9 +24,9 @@ def on_complete_cb(workflow, task, completed_set):


def track_task(task_spec, completed_set):
if task_spec.reached_event.is_connected(on_reached_cb):
task_spec.reached_event.disconnect(on_reached_cb)
task_spec.reached_event.connect(on_reached_cb, completed_set)
if task_spec.ready_event.is_connected(on_ready_cb):
task_spec.ready_event.disconnect(on_ready_cb)
task_spec.ready_event.connect(on_ready_cb, completed_set)
if task_spec.completed_event.is_connected(on_complete_cb):
task_spec.completed_event.disconnect(on_complete_cb)
task_spec.completed_event.connect(on_complete_cb, completed_set)
Expand Down
20 changes: 10 additions & 10 deletions tests/SpiffWorkflow/core/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from SpiffWorkflow.specs import SubWorkflow


def on_reached_cb(workflow, task, taken_path):
def on_ready_cb(workflow, task, taken_path):
reached_key = "%s_reached" % str(task.task_spec.name)
n_reached = task.get_data(reached_key, 0) + 1
task.set_data(**{reached_key: n_reached,
Expand Down Expand Up @@ -44,31 +44,31 @@ def on_complete_cb(workflow, task, taken_path):
indent = ' ' * task.depth
taken_path.append('%s%s' % (indent, task.task_spec.name))
# In workflows that load a subworkflow, the newly loaded children
# will not have on_reached_cb() assigned. By using this function, we
# will not have on_ready_cb() assigned. By using this function, we
# re-assign the function in every step, thus making sure that new
# children also call on_reached_cb().
# children also call on_ready_cb().
for child in task.children:
track_task(child.task_spec, taken_path)
return True

def on_entered_cb(workflow, task, taken_path):
def on_update_cb(workflow, task, taken_path):
for child in task.children:
track_task(child.task_spec, taken_path)
return True

def track_task(task_spec, taken_path):
# Disconnecting and reconnecting makes absolutely no sense but inexplicably these tests break
# if just connected based on a check that they're not
if task_spec.reached_event.is_connected(on_reached_cb):
task_spec.reached_event.disconnect(on_reached_cb)
task_spec.reached_event.connect(on_reached_cb, taken_path)
if task_spec.ready_event.is_connected(on_ready_cb):
task_spec.ready_event.disconnect(on_ready_cb)
task_spec.ready_event.connect(on_ready_cb, taken_path)
if task_spec.completed_event.is_connected(on_complete_cb):
task_spec.completed_event.disconnect(on_complete_cb)
task_spec.completed_event.connect(on_complete_cb, taken_path)
if isinstance(task_spec, SubWorkflow):
if task_spec.entered_event.is_connected(on_entered_cb):
task_spec.entered_event.disconnect(on_entered_cb)
task_spec.entered_event.connect(on_entered_cb, taken_path)
if task_spec.update_event.is_connected(on_update_cb):
task_spec.update_event.disconnect(on_update_cb)
task_spec.update_event.connect(on_update_cb, taken_path)

def track_workflow(wf_spec, taken_path=None):
if taken_path is None:
Expand Down
Loading