
Commit

Revert the DB is_complete change - not needed.
hjoliver committed Mar 7, 2024
1 parent 7fd5d72 commit 0e51d8e
Showing 6 changed files with 113 additions and 133 deletions.
7 changes: 3 additions & 4 deletions cylc/flow/rundb.py
@@ -322,7 +322,6 @@ class CylcWorkflowDAO:
["status"],
["flow_wait", {"datatype": "INTEGER"}],
["is_manual_submit", {"datatype": "INTEGER"}],
["is_complete", {"datatype": "INTEGER"}]
],
TABLE_TASK_TIMEOUT_TIMERS: [
["cycle", {"is_primary_key": True}],
@@ -796,19 +795,19 @@ def select_prev_instances(
# Not an injection, simply putting the table name in the SQL query
# expression as a string constant local to this module.
stmt = ( # nosec
r"SELECT flow_nums,submit_num,flow_wait,is_complete FROM %(name)s"
r"SELECT flow_nums,submit_num,flow_wait,status FROM %(name)s"
r" WHERE name==? AND cycle==?"
) % {"name": self.TABLE_TASK_STATES}
ret = []
for flow_nums_str, submit_num, flow_wait, is_complete in (
for flow_nums_str, submit_num, flow_wait, status in (
self.connect().execute(stmt, (name, point,))
):
ret.append(
(
submit_num,
flow_wait == 1,
deserialise(flow_nums_str),
is_complete
status
)
)
return ret
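For context, a minimal sketch (not part of this commit) of how the rows now returned by select_prev_instances might be consumed. The tuple shape (submit_num, flow_wait, flow_nums, status) follows the code above; the helper name and defaults below are illustrative only.

# Illustrative only: summarise history rows from
# CylcWorkflowDAO.select_prev_instances(name, point) as revised above.
def summarise_history(rows, flow_nums):
    """Return (latest submit number, status recorded for a matching flow)."""
    latest_submit = max((snum for snum, _, _, _ in rows), default=0)
    status = "waiting"
    for _snum, _flow_wait, old_fnums, stat in rows:
        if flow_nums & old_fnums:  # flows overlap
            status = stat
    return latest_submit, status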
168 changes: 70 additions & 98 deletions cylc/flow/task_pool.py
@@ -212,6 +212,7 @@ def db_add_new_flow_rows(self, itask: TaskProxy) -> None:

def add_to_pool(self, itask) -> None:
"""Add a task to the pool."""

self.active_tasks.setdefault(itask.point, {})
self.active_tasks[itask.point][itask.identity] = itask
self.active_tasks_changed = True
@@ -1480,36 +1481,36 @@ def can_be_spawned(self, name: str, point: 'PointBase') -> bool:

return True

def _get_task_history(self, name, point, flow_nums):
"""Get history of previous submits for this task.
def _get_task_history(
self, name: str, point: 'PointBase', flow_nums: Set[int]
) -> Tuple[int, str, bool]:
"""Get history of previous submits for this task."""

"""
info = self.workflow_db_mgr.pri_dao.select_prev_instances(
name, str(point)
)
try:
submit_num = max(s[0] for s in info)
submit_num: int = max(s[0] for s in info)
except ValueError:
# never spawned before in any flow
submit_num = 0

prev_completed = 0 # did not complete in the flow
prev_flow_wait = False # did not wait in the flow
prev_status = TASK_STATUS_WAITING
prev_flow_wait = False

for _snum, f_wait, old_fnums, is_complete in info:
# is_complete: 0: False, 1: True, 2: unknown (8.2 back compat)
for _snum, f_wait, old_fnums, status in info:
if set.intersection(flow_nums, old_fnums):
# matching flows
prev_completed = is_complete
prev_status = status
prev_flow_wait = f_wait
if prev_completed:
if prev_status in TASK_STATUSES_FINAL:
# task finished
break

# Else continue: there may be multiple entries with flow
# overlap due to merges (they'll have the same snum and
# f_wait); keep going to find the complete one, if any.
# f_wait); keep going to find the finished one, if any.

return submit_num, prev_completed, prev_flow_wait
return submit_num, prev_status, prev_flow_wait

def _load_historical_outputs(self, itask):
"""Load a task's historical outputs from the DB."""
@@ -1532,75 +1533,57 @@ def spawn_task(
force: bool = False,
flow_wait: bool = False,
) -> Optional[TaskProxy]:
"""Spawn a task if not already completed for this flow, or if forced.
"""Return task proxy if not completed in this flow, or if forced.
If finished previously with flow wait, just try to spawn children.
This creates the task proxy but does not add it to the pool.
Note finished tasks may be incomplete, but we don't automatically
re-run incomplete tasks in the same flow.
If completed previously with flow wait, just try to spawn children.
For every task spawned, we need a DB lookup for submit number,
and flow-wait.
"""
if not self.can_be_spawned(name, point):
return None

submit_num, prev_completion, prev_flow_wait = (
submit_num, prev_status, prev_flow_wait = (
self._get_task_history(name, point, flow_nums)
)

if prev_completion == 2:
# BACK COMPAT - completion not recorded before 8.3.0
# This code block is for a very niche case: it's only
# used if a flow-wait task is encountered after restarting
# an 8.2 workflow with 8.3.

itask = self._get_task_proxy(
point,
self.config.get_taskdef(name),
flow_nums,
submit_num=submit_num,
flow_wait=flow_wait,
transient=True
)
if not itask:
return None

# update completed outputs from the DB
self._load_historical_outputs(itask)

prev_completed = itask.is_complete()
else:
prev_completed = prev_completion == 1

# If previously completed and children spawned there is nothing
# to do, unless forced.
if (
prev_completed and not prev_flow_wait
and not force
):
LOG.warning(
f"({point}/{name} already completed"
f" in {stringify_flow_nums(flow_nums, full=True)})"
)
return None

# If previously completed we just create a transient task proxy to use
# for spawning children, else (or if forced) run it again.
if force:
transient = False
else:
transient = prev_completed

itask = self._get_task_proxy(
itask = self._get_task_proxy_db_outputs(
point,
self.config.get_taskdef(name),
flow_nums,
status=prev_status,
submit_num=submit_num,
flow_wait=flow_wait,
transient=transient
)
if not itask:
if itask is None:
return None

if not transient:
if prev_status in TASK_STATUSES_FINAL:
# Task finished previously.
msg = f"[{point}/{name}:{prev_status}] already finished"
if itask.is_complete():
msg += " and completed"
itask.transient = True
else:
# revive as incomplete.
msg += " incomplete"

LOG.info(
f"{msg} {stringify_flow_nums(flow_nums, full=True)})"
)
if prev_flow_wait:
self._spawn_after_flow_wait(itask)

if itask.transient and not force:
return None

# (else not previously finished, so run it)

if not itask.transient:
if (name, point) in self.tasks_to_hold:
LOG.info(f"[{itask}] holding (as requested earlier)")
self.hold_active_task(itask)
@@ -1635,39 +1618,30 @@ def spawn_task(
for cycle, task, output in self.abs_outputs_done
])

if prev_flow_wait and prev_completed:
self._spawn_after_flow_wait(itask)
LOG.warning(
f"{itask.identity} already completed for flow"
f" {stringify_flow_nums(flow_nums, full=True)}"
)
return None

self.db_add_new_flow_rows(itask)
return itask

def _spawn_after_flow_wait(self, itask: TaskProxy) -> None:
LOG.info(
f"spawning children of {itask.identity} after flow wait"
)
LOG.info(f"[{itask}] spawning outputs after flow-wait")
self.spawn_on_all_outputs(itask, completed_only=True)
# update flow wait status in the DB
itask.flow_wait = False
# itask.flow_nums = orig_fnums
self.workflow_db_mgr.put_update_task_flow_wait(itask)
return None

def _get_task_proxy(
def _get_task_proxy_db_outputs(
self,
point: 'PointBase',
taskdef: 'TaskDef',
flow_nums: 'FlowNums',
status: str = TASK_STATUS_WAITING,
flow_wait: bool = False,
transient: bool = False,
is_manual_submit: bool = False,
submit_num: int = 0
submit_num: int = 0,
) -> Optional['TaskProxy']:
"""Spawn a task proxy and update its outputs from the DB. """
"""Spawn a task, update outputs from DB."""

if not self.can_be_spawned(taskdef.name, point):
return None
Expand All @@ -1677,29 +1651,25 @@ def _get_task_proxy(
taskdef,
point,
flow_nums,
status=status,
flow_wait=flow_wait,
submit_num=submit_num,
transient=transient,
is_manual_submit=is_manual_submit
)
if itask is None:
return None

if itask is not None:
# Update it with outputs that were already completed.
info = self.workflow_db_mgr.pri_dao.select_task_outputs(
itask.tdef.name, str(itask.point))
if not info:
self.db_add_new_flow_rows(itask)
spawn_kids = False
for outputs_str, fnums in info.items():
if flow_nums.intersection(fnums):
if itask.flow_wait:
spawn_kids = True
for msg in json.loads(outputs_str):
itask.state.outputs.set_completed_by_msg(msg)

if spawn_kids:
self._spawn_after_flow_wait(itask)

# Update it with outputs that were already completed.
info = self.workflow_db_mgr.pri_dao.select_task_outputs(
itask.tdef.name, str(itask.point))
if not info:
# (Note: still needed if the task has not run before)
self.db_add_new_flow_rows(itask)
for outputs_str, fnums in info.items():
if flow_nums.intersection(fnums):
for msg in json.loads(outputs_str):
itask.state.outputs.set_completed_by_msg(msg)
return itask

def _standardise_prereqs(self, prereqs: 'List[str]') -> 'List[Tokens]':
@@ -1817,8 +1787,10 @@ def set_prereqs_and_outputs(
self._set_prereqs_tdef(
point, tdef, prereqs, flow_nums, flow_wait)
else:
trans = self._get_task_proxy(
point, tdef, flow_nums, flow_wait, transient=True)
trans = self._get_task_proxy_db_outputs(
point, tdef, flow_nums,
flow_wait=flow_wait, transient=True
)
if trans is not None:
self._set_outputs_itask(trans, outputs)

@@ -2015,7 +1987,7 @@ def force_trigger_tasks(
if not self.can_be_spawned(name, point):
continue

submit_num, _, prev_fwait = self._get_task_history(
submit_num, _prev_status, prev_fwait = self._get_task_history(
name, point, flow_nums)

itask = TaskProxy(
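As a rough illustration (not the actual pool code), the decision spawn_task now makes from the _get_task_history result can be summarised as follows; it assumes TASK_STATUSES_FINAL is the set of final statuses imported from cylc.flow.task_state, as used in the diff above.

# Illustrative only: whether a spawn request should actually run the task
# again, given its recorded history (cf. spawn_task above).
from cylc.flow.task_state import TASK_STATUSES_FINAL

def should_rerun(prev_status, prev_flow_wait, force):
    """Return True if the task should run again, not just spawn children."""
    if prev_status not in TASK_STATUSES_FINAL:
        # Never finished in this flow: run it.
        return True
    # Finished before: re-run only if forced. If it previously waited at
    # a flow boundary (prev_flow_wait), children are spawned separately
    # via _spawn_after_flow_wait, as in the code above.
    return force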
42 changes: 38 additions & 4 deletions cylc/flow/task_proxy.py
@@ -39,10 +39,19 @@
from cylc.flow.flow_mgr import stringify_flow_nums
from cylc.flow.platforms import get_platform
from cylc.flow.task_action_timer import TimerFlags
from cylc.flow.task_outputs import (
TASK_OUTPUT_FAILED,
TASK_OUTPUT_EXPIRED,
TASK_OUTPUT_SUCCEEDED,
TASK_OUTPUT_SUBMIT_FAILED
)
from cylc.flow.task_state import (
TaskState,
TASK_STATUS_WAITING,
TASK_STATUS_EXPIRED
TASK_STATUS_EXPIRED,
TASK_STATUS_SUCCEEDED,
TASK_STATUS_SUBMIT_FAILED,
TASK_STATUS_FAILED
)
from cylc.flow.taskdef import generate_graph_children
from cylc.flow.wallclock import get_unix_time_from_time_string as str2time
@@ -154,7 +163,7 @@ class TaskProxy:
preparation has completed.
.transient:
This is a transient proxy - not to be added to the task pool, but
used e.g. to spawn children, or to get task-specific infomation.
used e.g. to spawn children, or to get task-specific information.
Args:
tdef: The definition object of this task.
@@ -300,7 +309,7 @@ def __str__(self) -> str:
"""
id_ = self.identity
if self.transient:
return f"{id_}{stringify_flow_nums(self.flow_nums)}(transient)"
return f"{id_}{stringify_flow_nums(self.flow_nums)}"
if not self.state(TASK_STATUS_WAITING, TASK_STATUS_EXPIRED):
id_ += f"/{self.submit_num:02d}"
return (
@@ -555,9 +564,34 @@ def clock_expire(self) -> bool:
return False
return True

def is_finished(self) -> bool:
"""Return True if a final state achieved."""
return (
self.state(
TASK_STATUS_EXPIRED,
TASK_STATUS_SUBMIT_FAILED,
TASK_STATUS_FAILED,
TASK_STATUS_SUCCEEDED
)
)

def is_complete(self) -> bool:
"""Return True if complete or expired, else False."""
"""Return True if complete or expired."""
return (
self.state(TASK_STATUS_EXPIRED)
or not self.state.outputs.is_incomplete()
)

def set_state_by_outputs(self) -> None:
"""Set state according to which final output is completed."""
for output in (
TASK_OUTPUT_EXPIRED, TASK_OUTPUT_SUBMIT_FAILED,
TASK_OUTPUT_FAILED, TASK_OUTPUT_SUCCEEDED
):
if self.state.outputs.is_completed(output, output):
# This assumes status and output strings are the same:
self.state_reset(
status=output,
silent=True, is_queued=False, is_runahead=False
)
break
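The comment in set_state_by_outputs above relies on final output names matching task status names. A quick check of that assumption (illustrative, not part of the commit; constants as imported in the diff above):

# Illustrative only: the output/status string equivalence assumed by
# set_state_by_outputs() above.
from cylc.flow.task_outputs import (
    TASK_OUTPUT_EXPIRED, TASK_OUTPUT_SUBMIT_FAILED,
    TASK_OUTPUT_FAILED, TASK_OUTPUT_SUCCEEDED,
)
from cylc.flow.task_state import (
    TASK_STATUS_EXPIRED, TASK_STATUS_SUBMIT_FAILED,
    TASK_STATUS_FAILED, TASK_STATUS_SUCCEEDED,
)

assert TASK_OUTPUT_EXPIRED == TASK_STATUS_EXPIRED
assert TASK_OUTPUT_SUBMIT_FAILED == TASK_STATUS_SUBMIT_FAILED
assert TASK_OUTPUT_FAILED == TASK_STATUS_FAILED
assert TASK_OUTPUT_SUCCEEDED == TASK_STATUS_SUCCEEDED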
