
Commit

store one copy of a data object in the workflow data of the process where it was defined
essweine committed Mar 12, 2024
1 parent 32c00e4 commit 9c74ab2
Showing 14 changed files with 66 additions and 35 deletions.
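
In rough terms, the change below gives every process its own data objects dict (stored under data['data_objects']) and resolves a data object by walking parent_workflow up to the process whose spec defines it, so there is only ever one copy. A minimal self-contained sketch of that pattern, not SpiffWorkflow code; only the names spec.data_objects, parent_workflow, and the data_objects property follow the diff, everything else is illustrative:

# Toy sketch of the storage model in this commit; not SpiffWorkflow code.
# Attribute names (spec.data_objects, parent_workflow, data_objects) follow the diff;
# the classes themselves are stand-ins.

class ToySpec:
    def __init__(self, data_object_ids):
        # bpmn ids of the data objects defined by this process
        self.data_objects = set(data_object_ids)

class ToyWorkflow:
    def __init__(self, spec, parent_workflow=None):
        self.spec = spec
        self.parent_workflow = parent_workflow
        # a process only gets a 'data_objects' dict if it defines data objects
        self.data = {'data_objects': {}} if spec.data_objects else {}

    @property
    def data_objects(self):
        return self.data.get('data_objects', {})

def defining_workflow(wf, bpmn_id):
    """Walk up the parent chain to the process whose spec defines bpmn_id."""
    while wf is not None and bpmn_id not in wf.spec.data_objects:
        wf = wf.parent_workflow
    return wf

top = ToyWorkflow(ToySpec({'obj_1'}))
sub = ToyWorkflow(ToySpec(set()), parent_workflow=top)

# A write from the subprocess lands in the single copy owned by the top process.
defining_workflow(sub, 'obj_1').data_objects['obj_1'] = 'hello'
assert top.data_objects['obj_1'] == 'hello'
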
7 changes: 2 additions & 5 deletions SpiffWorkflow/bpmn/parser/ProcessParser.py
@@ -39,7 +39,6 @@ def __init__(self, p, node, nsmap, data_stores, filename=None, lane=None):
:param node: the XML node for the process
:param data_stores: map of ids to data store implementations
:param filename: the source BPMN filename (optional)
:param doc_xpath: an xpath evaluator for the document (optional)
:param lane: the lane of a subprocess (optional)
"""
super().__init__(node, nsmap, filename=filename, lane=lane)
@@ -48,7 +47,7 @@ def __init__(self, p, node, nsmap, data_stores, filename=None, lane=None):
self.spec = None
self.process_executable = node.get('isExecutable', 'true') == 'true'
self.data_stores = data_stores
self.inherited_data_objects = {}
self.parent = None

def get_name(self):
"""
@@ -118,8 +117,6 @@ def _parse(self):
raise ValidationException(f"Process {self.bpmn_id} is not executable.", node=self.node, file_name=self.filename)
self.spec = BpmnProcessSpec(name=self.bpmn_id, description=self.get_name(), filename=self.filename)

self.spec.data_objects.update(self.inherited_data_objects)

# Get the data objects
for obj in self.xpath('./bpmn:dataObject'):
data_object = self.parse_data_object(obj)
@@ -147,7 +144,7 @@ def _parse(self):
split_task.inputs = [self.spec.start]

def parse_data_object(self, obj):
return DataObject(obj.get('id'), obj.get('name'))
return self.create_data_spec(obj, DataObject)

def get_spec(self):
"""
20 changes: 16 additions & 4 deletions SpiffWorkflow/bpmn/parser/node_parser.py
@@ -82,8 +82,9 @@ def parse_incoming_data_references(self):
specs = []
for name in self.xpath('./bpmn:dataInputAssociation/bpmn:sourceRef'):
ref = first(self.doc_xpath(f".//bpmn:dataObjectReference[@id='{name.text}']"))
if ref is not None and ref.get('dataObjectRef') in self.process_parser.spec.data_objects:
specs.append(self.process_parser.spec.data_objects[ref.get('dataObjectRef')])
data_obj = self._resolve_data_object_ref(ref)
if data_obj is not None:
specs.append(data_obj)
else:
ref = first(self.doc_xpath(f".//bpmn:dataStoreReference[@id='{name.text}']"))
if ref is not None and ref.get('dataStoreRef') in self.process_parser.data_stores:
@@ -96,8 +97,9 @@ def parse_outgoing_data_references(self):
specs = []
for name in self.xpath('./bpmn:dataOutputAssociation/bpmn:targetRef'):
ref = first(self.doc_xpath(f".//bpmn:dataObjectReference[@id='{name.text}']"))
if ref is not None and ref.get('dataObjectRef') in self.process_parser.spec.data_objects:
specs.append(self.process_parser.spec.data_objects[ref.get('dataObjectRef')])
data_obj = self._resolve_data_object_ref(ref)
if data_obj is not None:
specs.append(data_obj)
else:
ref = first(self.doc_xpath(f".//bpmn:dataStoreReference[@id='{name.text}']"))
if ref is not None and ref.get('dataStoreRef') in self.process_parser.data_stores:
@@ -124,6 +126,16 @@ def parse_io_spec(self):
outputs.append(data_refs[ref.text])
return BpmnIoSpecification(inputs, outputs)

def _resolve_data_object_ref(self, ref):
if ref is not None:
current = self.process_parser
while current is not None:
data_obj = current.spec.data_objects.get(ref.get('dataObjectRef'))
if data_obj is None:
current = current.parent
else:
return data_obj

def create_data_spec(self, item, cls):
return cls(item.attrib.get('id'), item.attrib.get('name'))

2 changes: 1 addition & 1 deletion SpiffWorkflow/bpmn/parser/task_parsers.py
@@ -75,7 +75,7 @@ def get_subprocess_spec(task_parser):
spec_id = task_parser.node.get('id')
# This parser makes me want to cry
spec_parser = task_parser.process_parser.parser.process_parsers[spec_id]
spec_parser.inherited_data_objects.update(task_parser.process_parser.spec.data_objects)
spec_parser.parent = task_parser.process_parser
return spec_id

@staticmethod
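
The parse-time counterpart of the same idea: instead of copying inherited data objects into the subprocess parser, the call-activity parser above only records spec_parser.parent, and _resolve_data_object_ref in node_parser.py walks that chain. A hedged sketch of the walk with toy parser objects; only the parent and spec.data_objects names come from the diff:

# Illustrative only: resolving a dataObjectRef by climbing the parser parent chain,
# mirroring spec_parser.parent / _resolve_data_object_ref from this commit.
from types import SimpleNamespace

def make_parser(data_objects, parent=None):
    return SimpleNamespace(spec=SimpleNamespace(data_objects=data_objects), parent=parent)

def resolve_data_object(process_parser, data_object_id):
    current = process_parser
    while current is not None:
        data_obj = current.spec.data_objects.get(data_object_id)
        if data_obj is not None:
            return data_obj
        current = current.parent  # keep climbing toward the top-level process parser
    return None                   # unknown id: the caller falls back to data stores

top = make_parser({'obj_1': 'DataObject(obj_1)'})   # top-level process defines obj_1
sub = make_parser({}, parent=top)                   # subprocess parser linked to it
assert resolve_data_object(sub, 'obj_1') == 'DataObject(obj_1)'
assert resolve_data_object(sub, 'missing') is None
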
2 changes: 1 addition & 1 deletion SpiffWorkflow/bpmn/specs/bpmn_task_spec.py
@@ -12,7 +12,7 @@ def __init__(self, *args):
super(_BpmnCondition, self).__init__(*args)

def _matches(self, task):
return task.workflow.script_engine.evaluate(task, self.args[0], external_context=task.workflow.data)
return task.workflow.script_engine.evaluate(task, self.args[0], external_context=task.workflow.data_objects)


class BpmnIoSpecification:
19 changes: 16 additions & 3 deletions SpiffWorkflow/bpmn/specs/data_spec.py
@@ -62,18 +62,31 @@ class DataObject(BpmnDataSpecification):

def get(self, my_task):
"""Copy a value form the workflow data to the task data."""
if self.bpmn_id not in my_task.workflow.data:

# Find the spec where the data object is defined and put it there
wf = my_task.workflow
while wf is not None and self.bpmn_id not in wf.spec.data_objects:
wf = wf.parent_workflow

if wf is None or self.bpmn_id not in wf.data_objects:
message = f"The data object could not be read; '{self.bpmn_id}' does not exist in the process."
raise WorkflowDataException(message, my_task, data_input=self)
my_task.data[self.bpmn_id] = deepcopy(my_task.workflow.data[self.bpmn_id])

my_task.data[self.bpmn_id] = deepcopy(wf.data_objects[self.bpmn_id])
data_log.info(f'Read workflow variable {self.bpmn_id}', extra=my_task.log_info())

def set(self, my_task):
"""Copy a value from the task data to the workflow data"""

if self.bpmn_id not in my_task.data:
message = f"A data object could not be set; '{self.bpmn_id}' not exist in the task."
raise WorkflowDataException(message, my_task, data_output=self)
my_task.workflow.data[self.bpmn_id] = deepcopy(my_task.data[self.bpmn_id])

wf = my_task.workflow
while wf is not None and self.bpmn_id not in wf.spec.data_objects:
wf = wf.parent_workflow

wf.data_objects[self.bpmn_id] = deepcopy(my_task.data[self.bpmn_id])
del my_task.data[self.bpmn_id]
data_log.info(f'Set workflow variable {self.bpmn_id}', extra=my_task.log_info())

2 changes: 1 addition & 1 deletion SpiffWorkflow/bpmn/specs/event_definitions/conditional.py
@@ -9,6 +9,6 @@ def __init__(self, expression, **kwargs):

def has_fired(self, my_task):
my_task._set_internal_data(
has_fired=my_task.workflow.script_engine.evaluate(my_task, self.expression, external_context=my_task.workflow.data)
has_fired=my_task.workflow.script_engine.evaluate(my_task, self.expression, external_context=my_task.workflow.data_objects)
)
return my_task._get_internal_data('has_fired', False)
6 changes: 0 additions & 6 deletions SpiffWorkflow/bpmn/specs/mixins/subworkflow_task.py
@@ -58,12 +58,6 @@ def _on_cancel(self, my_task):
subworkflow.cancel()

def copy_data(self, my_task, subworkflow):
# There is only one copy of any given data object, so it should be updated immediately
# Doing this is actually a little problematic, because it gives parent processes access to
# data objects defined in subprocesses.
# But our data management is already hopelessly messed up and in dire needs of reconsideration
if len(subworkflow.spec.data_objects) > 0:
subworkflow.data = my_task.workflow.data
start = subworkflow.get_next_task(spec_name='Start')
start.set_data(**my_task.data)

14 changes: 13 additions & 1 deletion SpiffWorkflow/bpmn/util/subworkflow.py
@@ -20,7 +20,19 @@
from SpiffWorkflow import Workflow
from SpiffWorkflow.exceptions import TaskNotFoundException

class BpmnSubWorkflow(Workflow):
class BpmnBaseWorkflow(Workflow):

def __init__(self, spec, **kwargs):
super().__init__(spec, **kwargs)
if len(spec.data_objects) > 0:
self.data['data_objects'] = {}

@property
def data_objects(self):
return self.data.get('data_objects', {})


class BpmnSubWorkflow(BpmnBaseWorkflow):

def __init__(self, spec, parent_task_id, top_workflow, **kwargs):
super().__init__(spec, **kwargs)
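
One practical consequence of the data_objects property above, which the test changes further down rely on: values are now read and seeded through workflow.data_objects rather than workflow.data. Note that data.get('data_objects', {}) only returns the persistent dict when the spec declares data objects; otherwise it hands back a fresh throwaway dict and writes are silently lost. A small hedged sketch of that distinction, using a toy stand-in rather than BpmnBaseWorkflow itself:

# Illustrative caveat for the data_objects property; toy stand-in, not the real class.

class ToyBaseWorkflow:
    def __init__(self, spec_has_data_objects):
        # mirrors BpmnBaseWorkflow.__init__: the backing dict exists only when the
        # spec declares at least one data object
        self.data = {'data_objects': {}} if spec_has_data_objects else {}

    @property
    def data_objects(self):
        return self.data.get('data_objects', {})

with_objects = ToyBaseWorkflow(True)
with_objects.data_objects['task_a_done'] = False    # persists, as in ConditionalEventTest below
assert with_objects.data_objects == {'task_a_done': False}

without_objects = ToyBaseWorkflow(False)
without_objects.data_objects['task_a_done'] = False  # lands in a temporary dict and is lost
assert without_objects.data_objects == {}
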
5 changes: 2 additions & 3 deletions SpiffWorkflow/bpmn/workflow.py
@@ -19,7 +19,6 @@

from SpiffWorkflow.task import Task
from SpiffWorkflow.util.task import TaskState
from SpiffWorkflow.workflow import Workflow
from SpiffWorkflow.exceptions import WorkflowException

from SpiffWorkflow.bpmn.specs.mixins.events.event_types import CatchingEvent
@@ -28,13 +27,13 @@

from SpiffWorkflow.bpmn.specs.control import BoundaryEventSplit

from SpiffWorkflow.bpmn.util.subworkflow import BpmnSubWorkflow
from SpiffWorkflow.bpmn.util.subworkflow import BpmnBaseWorkflow, BpmnSubWorkflow
from SpiffWorkflow.bpmn.util.task import BpmnTaskIterator

from .script_engine.python_engine import PythonScriptEngine


class BpmnWorkflow(Workflow):
class BpmnWorkflow(BpmnBaseWorkflow):
"""
The engine that executes a BPMN workflow. This specialises the standard
Spiff Workflow class with a few extra methods and attributes.
8 changes: 4 additions & 4 deletions tests/SpiffWorkflow/bpmn/DataObjectTest.py
@@ -26,7 +26,7 @@ def testMissingDataInput(self):

# Remove the data before advancing
ready_tasks = self.get_ready_user_tasks()
self.workflow.data.pop('obj_1')
self.workflow.data_objects.pop('obj_1')
with self.assertRaises(WorkflowDataException) as exc:
ready_tasks[0].run()
self.assertEqual(exc.data_output.name, 'obj_1')
@@ -51,7 +51,7 @@ def actual_test(self, save_restore):
ready_tasks[0].run()
# After task completion, obj_1 should be copied out of the task into the workflow
self.assertNotIn('obj_1', ready_tasks[0].data)
self.assertIn('obj_1', self.workflow.data)
self.assertIn('obj_1', self.workflow.data_objects)

if save_restore:
self.save_restore()
@@ -70,7 +70,7 @@ def actual_test(self, save_restore):
# We did not set an output data reference so obj_1 should remain unchanged in the workflow data
# and be removed from the task data
self.assertNotIn('obj_1', ready_tasks[0].data)
self.assertEqual(self.workflow.data['obj_1'], 'hello')
self.assertEqual(self.workflow.data_objects['obj_1'], 'hello')

if save_restore:
self.save_restore()
@@ -86,7 +86,7 @@ def actual_test(self, save_restore):
# It was copied out
self.assertNotIn('obj_1', sp.data)
# The update should persist in the main process
self.assertEqual(self.workflow.data['obj_1'], 'hello again')
self.assertEqual(self.workflow.data_objects['obj_1'], 'hello again')

class DataObjectGatewayTest(BpmnWorkflowTestCase):

4 changes: 2 additions & 2 deletions tests/SpiffWorkflow/bpmn/events/ConditionalEventTest.py
@@ -9,7 +9,7 @@ def testIntermediateEvent(self):
spec, subprocesses = self.load_workflow_spec('conditional_event.bpmn', 'intermediate')
self.workflow = BpmnWorkflow(spec, subprocesses)
# I don't want to complicate the diagram with extra tasks just for initializing this value
self.workflow.data['task_a_done'] = False
self.workflow.data_objects['task_a_done'] = False
self.workflow.do_engine_steps()
b = self.workflow.get_next_task(spec_name='task_b')
b.run()
@@ -29,7 +29,7 @@
def testBoundaryEvent(self):
spec, subprocesses = self.load_workflow_spec('conditional_event.bpmn', 'boundary')
self.workflow = BpmnWorkflow(spec, subprocesses)
self.workflow.data['task_c_done'] = False
self.workflow.data_objects['task_c_done'] = False
self.workflow.do_engine_steps()
c = self.workflow.get_next_task(spec_name='task_c')
c.data['task_c_done'] = True
8 changes: 7 additions & 1 deletion tests/SpiffWorkflow/spiff/PrescriptPostscriptTest.py
@@ -20,11 +20,17 @@ def testCallActivitySaveRestore(self):
self.call_activity_test(True)

def testDataObject(self):
self.test_data_object()

def testDataObjectSaveRestore(self):
self.test_data_object(True)

def test_data_object(self, save_restore=False):

spec, subprocesses = self.load_workflow_spec('prescript_postscript_data_object.bpmn', 'Process_1')
self.workflow = BpmnWorkflow(spec, subprocesses)
# Set a on the workflow and b in the first task.
self.workflow.data['a'] = 1
self.workflow.data_objects['a'] = 1
self.set_process_data({'b': 2})
ready_tasks = self.workflow.get_tasks(state=TaskState.READY)
# This executes the same script as task_test
3 changes: 1 addition & 2 deletions tests/SpiffWorkflow/spiff/ServiceTaskVariableTest.py
@@ -35,8 +35,7 @@ def setUp(self):
global assertEqual
assertEqual = self.assertEqual

spec, subprocesses = self.load_workflow_spec('service_task_variable.bpmn',
'Process_bd2e724555')
spec, subprocesses = self.load_workflow_spec('service_task_variable.bpmn', 'Process_bd2e724555')
self.script_engine = ExampleCustomScriptEngine()
self.workflow = BpmnWorkflow(spec, subprocesses, script_engine=self.script_engine)

1 change: 0 additions & 1 deletion tests/SpiffWorkflow/spiff/data/service_task_variable.bpmn
@@ -22,7 +22,6 @@
<bpmn:incoming>Flow_1boxww6</bpmn:incoming>
<bpmn:outgoing>Flow_1h9lfz7</bpmn:outgoing>
</bpmn:serviceTask>
<bpmn:dataObject id="DataObject_0zzgapz" />
<bpmn:sequenceFlow id="Flow_1boxww6" sourceRef="Activity_1shyq8p" targetRef="Activity_0xhr131" />
<bpmn:scriptTask id="Activity_1shyq8p" name="Set Employee ID">
<bpmn:incoming>Flow_1tqygmt</bpmn:incoming>
