Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor workflow status #920

Open
wants to merge 3 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 0 additions & 14 deletions beeflow/common/gdb/gdb_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,20 +30,6 @@ def execute_workflow(self):
Set the initial tasks' states to 'READY'.
"""

@abstractmethod
def pause_workflow(self):
"""Pause execution of a running workflow.

Set workflow from state 'RUNNING' to 'PAUSED'.
"""

@abstractmethod
def resume_workflow(self):
"""Resume execution of a paused workflow.

Set workflow state from 'PAUSED' to 'RUNNING'.
"""

@abstractmethod
def reset_workflow(self, new_id):
"""Reset the execution state of a stored workflow.
Expand Down
24 changes: 0 additions & 24 deletions beeflow/common/gdb/neo4j_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,30 +93,6 @@ def execute_workflow(self, workflow_id):
"""
self._write_transaction(tx.set_init_task_inputs, wf_id=workflow_id)
self._write_transaction(tx.set_runnable_tasks_to_ready, wf_id=workflow_id)
self._write_transaction(tx.set_workflow_state, state='RUNNING', wf_id=workflow_id)

def pause_workflow(self, workflow_id):
"""Pause execution of a running workflow in Neo4j.

Sets tasks with state 'RUNNING' to 'PAUSED'.

:param workflow_id: the workflow id
:type workflow_id: str

"""
with self._driver.session() as session:
session.write_transaction(tx.set_workflow_state, state='PAUSED', wf_id=workflow_id)

def resume_workflow(self, workflow_id):
"""Resume execution of a paused workflow in Neo4j.

Sets workflow state to 'RUNNING'

:param workflow_id: the workflow id
:type workflow_id: str
"""
with self._driver.session() as session:
session.write_transaction(tx.set_workflow_state, state='RUNNING', wf_id=workflow_id)

def reset_workflow(self, old_id, new_id):
"""Reset the execution state of an entire workflow.
Expand Down
8 changes: 0 additions & 8 deletions beeflow/common/wf_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,6 @@ def execute_workflow(self):
"""Begin execution of a BEE workflow."""
self._gdb_driver.execute_workflow(self._workflow_id)

def pause_workflow(self):
"""Pause the execution of a BEE workflow."""
self._gdb_driver.pause_workflow(self._workflow_id)

def resume_workflow(self):
"""Resume the execution of a paused BEE workflow."""
self._gdb_driver.resume_workflow(self._workflow_id)

def reset_workflow(self, workflow_id):
"""Reset the execution state and ID of a BEE workflow."""
self._gdb_driver.reset_workflow(self._workflow_id, workflow_id)
Expand Down
16 changes: 0 additions & 16 deletions beeflow/tests/mocks.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,6 @@ def __init__(self):
self._workflow_id = '42'
self._loaded = False

def pause_workflow(self):
"""Pause a workflow."""
return

def resume_workflow(self):
"""Resume a workflow."""
return

def reset_workflow(self, wf_id): #noqa
"""Reset a workflow."""
wf_id = 0 # noqa
Expand Down Expand Up @@ -149,14 +141,6 @@ def execute_workflow(self, workflow_id): #noqa not using parameter in mock
if self._is_ready(task_id):
self.task_states[task_id] = 'READY'

def pause_workflow(self, workflow_id): #noqa not using parameter in mock
"""Pause execution of a running workflow."""
self.workflow_state = 'PAUSED'

def resume_workflow(self, workflow_id): #noqa not using parameter in mock
"""Resume execution of a running workflow."""
self.workflow_state = 'RESUME'

def reset_workflow(self, old_id, new_id): #noqa not using parameter in mock
"""Reset the execution state and ID of a workflow."""
self.workflow = deepcopy(self.workflow)
Expand Down
34 changes: 0 additions & 34 deletions beeflow/tests/test_wf_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,40 +67,6 @@ def test_execute_workflow(self):
self.assertEqual("READY", self.wfi.get_task_state(tasks[0]))
self.assertEqual("RUNNING", self.wfi.get_workflow_state())

def test_pause_workflow(self):
"""Test workflow execution pausing (set running tasks' states to 'PAUSED')."""
workflow_id = generate_workflow_id()
self.wfi.initialize_workflow(Workflow(
"test_workflow", None, None,
[InputParameter("test_input", "File", "input.txt")],
[OutputParameter("test_output", "File", "output.txt", "viz/output")],
workflow_id))
self._create_test_tasks(workflow_id)

self.wfi.execute_workflow()

self.wfi.pause_workflow()

# Workflow state should now be 'PAUSED'
self.assertEqual("PAUSED", self.wfi.get_workflow_state())

def test_resume_workflow(self):
"""Test workflow execution resuming (set paused tasks' states to 'RUNNING')."""
workflow_id = generate_workflow_id()
self.wfi.initialize_workflow(Workflow(
"test_workflow", None, None,
[InputParameter("test_input", "File", "input.txt")],
[OutputParameter("test_output", "File", "output.txt", "viz/output")],
workflow_id))
self._create_test_tasks(workflow_id)

self.wfi.execute_workflow()
self.wfi.pause_workflow()
self.wfi.resume_workflow()

# Workflow state should now be 'RESUME'
self.assertEqual("RESUME", self.wfi.get_workflow_state())

def test_reset_workflow(self):
"""Test workflow execution resetting (set all tasks to 'WAITING', delete metadata)."""
workflow_id = generate_workflow_id()
Expand Down
25 changes: 17 additions & 8 deletions beeflow/tests/test_wf_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,12 @@ def teardown_workflow():


@pytest.fixture()
def setup_teardown_workflow(teardown_workflow):
def setup_teardown_workflow(teardown_workflow, mocker, temp_db):
"""Set up and tear down for tests that use the workflow directory."""
mocker.patch('beeflow.wf_manager.resources.wf_utils.get_workflow_interface',
return_value=MockWFI())
mocker.patch('beeflow.wf_manager.resources.wf_utils.get_db_path', new=lambda: temp_db.db_file)
mocker.patch('beeflow.wf_manager.resources.wf_utils.connect_db', new=mock_connect_db)
wf_utils.create_workflow_dir(WF_ID)
wf_utils.create_wf_status(WF_ID)
yield
Expand Down Expand Up @@ -162,6 +166,7 @@ def test_start_workflow(client, mocker, temp_db):
mocker.patch('beeflow.wf_manager.resources.wf_utils.get_db_path', new=lambda: temp_db.db_file)
mocker.patch('beeflow.wf_manager.resources.wf_actions.db_path', temp_db.db_file)
mocker.patch('beeflow.common.wf_interface.WorkflowInterface.get_workflow_state', 'Waiting')
mocker.patch('beeflow.wf_manager.resources.wf_utils.read_wf_status', return_value='Waiting')
resp = client().post(f'/bee_wfm/v1/jobs/{WF_ID}')
assert resp.status_code == 200

Expand All @@ -170,7 +175,7 @@ def test_workflow_status(client, mocker, setup_teardown_workflow, temp_db):
"""Test getting workflow status."""
mocker.patch('beeflow.wf_manager.resources.wf_utils.get_workflow_interface',
return_value=MockWFI())
mocker.patch('beeflow.wf_manager.resources.wf_utils.get_db_path', temp_db.db_file)
mocker.patch('beeflow.wf_manager.resources.wf_utils.get_db_path', new=lambda: temp_db.db_file)
mocker.patch('beeflow.wf_manager.resources.wf_actions.db_path', temp_db.db_file)
wf_name = 'wf'
workdir = 'dir'
Expand All @@ -188,7 +193,7 @@ def test_cancel_workflow(client, mocker, setup_teardown_workflow, temp_db):
"""Test cancelling a workflow."""
mocker.patch('beeflow.wf_manager.resources.wf_utils.get_workflow_interface',
return_value=MockWFI())
mocker.patch('beeflow.wf_manager.resources.wf_utils.get_db_path', temp_db.db_file)
mocker.patch('beeflow.wf_manager.resources.wf_utils.get_db_path', new=lambda: temp_db.db_file)
mocker.patch('beeflow.wf_manager.resources.wf_actions.db_path', temp_db.db_file)

wf_name = 'wf'
Expand All @@ -208,7 +213,7 @@ def test_remove_workflow(client, mocker, setup_teardown_workflow, temp_db):
"""Test removing a workflow."""
mocker.patch('beeflow.wf_manager.resources.wf_utils.get_workflow_interface',
return_value=MockWFI())
mocker.patch('beeflow.wf_manager.resources.wf_utils.get_db_path', temp_db.db_file)
mocker.patch('beeflow.wf_manager.resources.wf_utils.get_db_path', new=lambda: temp_db.db_file)
mocker.patch('beeflow.wf_manager.resources.wf_actions.db_path', temp_db.db_file)

wf_name = 'wf'
Expand All @@ -230,10 +235,12 @@ def test_pause_workflow(client, mocker, setup_teardown_workflow, temp_db):
return_value=MockWFI())
mocker.patch('beeflow.tests.mocks.MockWFI.get_workflow_state',
return_value='RUNNING')
mocker.patch('beeflow.wf_manager.resources.wf_utils.get_db_path', temp_db.db_file)
mocker.patch('beeflow.wf_manager.resources.wf_utils.connect_db', new=mock_connect_db)
mocker.patch('beeflow.wf_manager.resources.wf_utils.get_db_path', new=lambda: temp_db.db_file)
mocker.patch('beeflow.wf_manager.resources.wf_actions.db_path', temp_db.db_file)
mocker.patch('beeflow.wf_manager.resources.wf_utils.read_wf_status', return_value='RUNNING')

wf_utils.update_wf_status(WF_ID, 'Running')
wf_utils.update_wf_status(WF_ID, 'RUNNING')
request = {'option': 'pause'}
resp = client().patch(f'/bee_wfm/v1/jobs/{WF_ID}', json=request)
assert resp.json['status'] == 'Workflow Paused'
Expand All @@ -246,13 +253,15 @@ def test_resume_workflow(client, mocker, setup_teardown_workflow, temp_db):
return_value=MockWFI())
mocker.patch('beeflow.tests.mocks.MockWFI.get_workflow_state',
return_value='PAUSED')
mocker.patch('beeflow.wf_manager.resources.wf_utils.connect_db', new=mock_connect_db)
mocker.patch('beeflow.wf_manager.resources.wf_utils.submit_tasks_tm', return_value=None)
mocker.patch('beeflow.wf_manager.resources.wf_utils.submit_tasks_scheduler', return_value=None)
mocker.patch('beeflow.wf_manager.resources.wf_utils.update_wf_status', return_value=None)
mocker.patch('beeflow.wf_manager.resources.wf_utils.get_db_path', temp_db.db_file)
mocker.patch('beeflow.wf_manager.resources.wf_utils.get_db_path', new=lambda: temp_db.db_file)
mocker.patch('beeflow.wf_manager.resources.wf_actions.db_path', temp_db.db_file)
mocker.patch('beeflow.wf_manager.resources.wf_utils.read_wf_status', return_value='PAUSED')

wf_utils.update_wf_status(WF_ID, 'Paused')
wf_utils.update_wf_status(WF_ID, 'PAUSED')
request = {'option': 'resume'}
resp = client().patch(f'/bee_wfm/v1/jobs/{WF_ID}', json=request)
assert resp.json['status'] == 'Workflow Resumed'
Expand Down
16 changes: 3 additions & 13 deletions beeflow/wf_manager/resources/wf_actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,7 @@ def __init__(self):

def post(self, wf_id):
"""Start workflow. Send ready tasks to the task manager."""
db = connect_db(wfm_db, db_path)
if wf_utils.start_workflow(wf_id):
db.workflows.update_workflow_state(wf_id, 'Running')
resp = make_response(jsonify(msg='Started workflow!', status='ok'), 200)
else:
resp_body = jsonify(msg='Cannot start workflow it is {state.lower()}.', status='ok')
Expand All @@ -46,7 +44,7 @@ def get(wf_id):

for task in tasks:
tasks_status.append((task.id, task.name, task.state))
wf_status = db.workflows.get_workflow_state(wf_id)
wf_status = wf_utils.read_wf_status(wf_id)

resp = make_response(jsonify(tasks_status=tasks_status,
wf_status=wf_status, status='ok'), 200)
Expand All @@ -58,11 +56,8 @@ def delete(self, wf_id):
option = self.reqparse.parse_args()['option']
db = connect_db(wfm_db, db_path)
if option == "cancel":
wfi = wf_utils.get_workflow_interface(wf_id)
# Remove all tasks currently in the database
wfi.set_workflow_state('Cancelled')
wf_utils.update_wf_status(wf_id, 'Cancelled')
db.workflows.update_workflow_state(wf_id, 'Cancelled')
log.info(f"Workflow {wf_id} cancelled")
resp = make_response(jsonify(status='Cancelled'), 202)
elif option == "remove":
Expand All @@ -79,25 +74,20 @@ def delete(self, wf_id):

def patch(self, wf_id):
"""Pause or resume workflow."""
db = connect_db(wfm_db, db_path)
self.reqparse.add_argument('option', type=str, location='json')
option = self.reqparse.parse_args()['option']

wfi = wf_utils.get_workflow_interface(wf_id)
log.info('Pausing/resuming workflow')
wf_state = wfi.get_workflow_state()
wf_state = wf_utils.read_wf_status(wf_id)
if option == 'pause' and wf_state in ('RUNNING', 'INITIALIZING'):
wfi.pause_workflow()
wf_utils.update_wf_status(wf_id, 'Paused')
db.workflows.update_workflow_state(wf_id, 'Paused')
log.info(f"Workflow {wf_id} Paused")
resp = make_response(jsonify(status='Workflow Paused'), 200)
elif option == 'resume' and wf_state == 'PAUSED':
wfi.resume_workflow()
wf_utils.update_wf_status(wf_id, 'Running')
tasks = wfi.get_ready_tasks()
wf_utils.schedule_submit_tasks(wf_id, tasks)
wf_utils.update_wf_status(wf_id, 'Running')
db.workflows.update_workflow_state(wf_id, 'Running')
log.info(f"Workflow {wf_id} Resumed")
resp = make_response(jsonify(status='Workflow Resumed'), 200)
else:
Expand Down
12 changes: 7 additions & 5 deletions beeflow/wf_manager/resources/wf_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,12 @@ def update_wf_status(wf_id, status_msg):
with open(status_path, 'w', encoding="utf8") as status:
status.write(status_msg)

db = connect_db(wfm_db, get_db_path())
db.workflows.update_workflow_state(wf_id, status_msg)

wfi = get_workflow_interface(wf_id)
wfi.set_workflow_state(status_msg)


def read_wf_status(wf_id):
"""Read workflow status metadata file."""
Expand Down Expand Up @@ -288,26 +294,22 @@ def setup_workflow(wf_id, wf_name, wf_dir, wf_workdir, no_start, workflow=None,
db.workflows.add_task(task.id, wf_id, task.name, "WAITING")

update_wf_status(wf_id, 'Waiting')
db.workflows.update_workflow_state(wf_id, 'Waiting')
if no_start:
log.info('Not starting workflow, as requested')
else:
log.info('Starting workflow')
db.workflows.update_workflow_state(wf_id, 'Running')
start_workflow(wf_id)


def start_workflow(wf_id):
"""Attempt to start the workflow, returning True if successful."""
db = connect_db(wfm_db, get_db_path())
wfi = get_workflow_interface(wf_id)
state = wfi.get_workflow_state()
state = read_wf_status(wf_id)
if state in ('RUNNING', 'PAUSED', 'COMPLETED'):
return False
wfi.execute_workflow()
tasks = wfi.get_ready_tasks()
schedule_submit_tasks(wf_id, tasks)
wf_id = wfi.workflow_id
update_wf_status(wf_id, 'Running')
db.workflows.update_workflow_state(wf_id, 'Running')
return True
Loading