Skip to content

Commit

Permalink
outputs: handle tasks that have been removed from the config
Browse files Browse the repository at this point in the history
  • Loading branch information
oliver-sanders committed Apr 8, 2024
1 parent 103e1ac commit 05e6f82
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 3 deletions.
23 changes: 22 additions & 1 deletion cylc/flow/task_outputs.py
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,21 @@ def get_optional_outputs(
}


# Completion expression which treats the outputs as complete as soon as any
# one of the final task outputs (succeeded, failed, submit-failed, expired)
# has been received.
FINAL_OUTPUT_COMPLETION = ' or '.join(
    trigger_to_completion_variable(final_output)
    for final_output in (
        TASK_OUTPUT_SUCCEEDED,
        TASK_OUTPUT_FAILED,
        TASK_OUTPUT_SUBMIT_FAILED,
        TASK_OUTPUT_EXPIRED,
    )
)


class TaskOutputs:
"""Represents a collection of outputs for a task.
Expand Down Expand Up @@ -371,8 +386,14 @@ def __iter__(self) -> Iterator[Tuple[str, str, bool]]:

def is_complete(self) -> bool:
"""Return True if the outputs are complete."""
# NOTE: If a task has been removed from the workflow via restart /
# reload, then it is possible for the completion expression to be blank
# (empty string). In this case, we consider the task outputs to be
# complete when any final output has been generated.
# See https://github.com/cylc/cylc-flow/pull/5067
expr = self._completion_expression or FINAL_OUTPUT_COMPLETION
return CompletionEvaluator(
self._completion_expression,
expr,
**{
self._message_to_compvar[message]: completed
for message, completed in self._completed.items()
Expand Down
71 changes: 69 additions & 2 deletions tests/integration/test_optional_outputs.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,26 +29,28 @@
from cylc.flow.cycling.integer import IntegerPoint
from cylc.flow.cycling.iso8601 import ISO8601Point
from cylc.flow.network.resolvers import TaskMsg
from cylc.flow.scheduler import Scheduler
from cylc.flow.task_events_mgr import (
TaskEventsManager,
)
from cylc.flow.task_outputs import (
TASK_OUTPUTS,
TASK_OUTPUT_EXPIRED,
TASK_OUTPUT_FAILED,
TASK_OUTPUT_FINISHED,
TASK_OUTPUT_SUCCEEDED,
get_completion_expression,
)
from cylc.flow.task_state import (
TASK_STATUSES_ACTIVE,
TASK_STATUS_EXPIRED,
TASK_STATUS_PREPARING,
TASK_STATUS_RUNNING,
TASK_STATUS_WAITING,
TASK_STATUSES_ACTIVE,
)

if TYPE_CHECKING:
from cylc.flow.task_proxy import TaskProxy
from cylc.flow.scheduler import Scheduler


def reset_outputs(itask: 'TaskProxy'):
Expand Down Expand Up @@ -441,3 +443,68 @@ async def test_clock_expiry(
# the third task should *not* be expired (it was a manual submit)
assert not three.state(TASK_STATUS_EXPIRED)
assert not three.state.outputs.is_message_complete(TASK_OUTPUT_EXPIRED)


async def test_removed_taskdef(
    flow,
    scheduler,
    start,
):
    """It should handle tasks being removed from the config.

    If the definition of an active task is removed from the config by
    restart / reload, then we must provide a fallback completion expression,
    otherwise the expression will be blank (task has no required or optional
    outputs).

    The fallback is to consider the outputs complete if *any* final output is
    received. Since the task has been removed from the workflow its outputs
    should be inconsequential.

    See: https://github.com/cylc/cylc-flow/issues/5057
    """
    # initial config: the graph contains both tasks "a" and "z"
    id_ = flow({
        'scheduling': {
            'graph': {
                'R1': 'a & z'
            }
        }
    })

    # start the workflow and mark the tasks as running
    schd: 'Scheduler' = scheduler(id_)
    async with start(schd):
        for itask in schd.pool.get_tasks():
            itask.state_reset(TASK_STATUS_RUNNING)
            # while a task is still in the config, its completion expression
            # is the expected non-blank value
            assert itask.state.outputs._completion_expression == 'succeeded'

    # remove the task "z" from the config
    # (the same workflow ID is re-used so that the new config replaces the
    # old one rather than creating a separate workflow)
    id_ = flow({
        'scheduling': {
            'graph': {
                'R1': 'a'
            }
        }
    }, id_=id_)

    # restart the workflow
    schd: 'Scheduler' = scheduler(id_)
    async with start(schd):
        # 1/a:
        # * is still in the config
        # * should still have a sensible completion expression
        # * its outputs should be incomplete if the task fails
        a_1 = schd.pool.get_task(IntegerPoint('1'), 'a')
        assert a_1
        assert a_1.state.outputs._completion_expression == 'succeeded'
        a_1.state.outputs.set_message_complete(TASK_OUTPUT_FAILED)
        assert not a_1.is_complete()

        # 1/z:
        # * is no longer in the config
        # * should have a blank completion expression
        # * its outputs should be completed by any final output
        z_1 = schd.pool.get_task(IntegerPoint('1'), 'z')
        assert z_1
        assert z_1.state.outputs._completion_expression == ''
        # "failed" is a final output, so it alone should satisfy the fallback
        z_1.state.outputs.set_message_complete(TASK_OUTPUT_FAILED)
        assert z_1.is_complete()

0 comments on commit 05e6f82

Please sign in to comment.