Skip to content

Commit

Permalink
Log outputs, not task messages, for unmatched prerequisites
Browse files Browse the repository at this point in the history
  • Loading branch information
hjoliver committed Mar 7, 2024
1 parent 0e51d8e commit 59c92b6
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 24 deletions.
32 changes: 23 additions & 9 deletions cylc/flow/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -1672,9 +1672,15 @@ def _get_task_proxy_db_outputs(
itask.state.outputs.set_completed_by_msg(msg)
return itask

def _standardise_prereqs(self, prereqs: 'List[str]') -> 'List[Tokens]':
"""Convert prerequisites to task output messages."""
_prereqs = []
def _standardise_prereqs(
self, prereqs: 'List[str]'
) -> 'Dict[Tokens, str]':
"""Convert prerequisites to a map of task messages: outputs.
(So satsify_me logs failures)
"""
_prereqs = {}
for prereq in prereqs:
pre = Tokens(prereq, relative=True)
# add implicit "succeeded"; convert "succeed" to "succeeded" etc.
Expand All @@ -1690,12 +1696,13 @@ def _standardise_prereqs(self, prereqs: 'List[str]') -> 'List[Tokens]':
LOG.warning(
f"output {pre.relative_id_with_selectors} not found")
continue
_prereqs.append(
_prereqs[
pre.duplicate(
task_sel=msg,
cycle=standardise_point_string(pre['cycle'])
)
)
] = prereq

return _prereqs

def _standardise_outputs(
Expand Down Expand Up @@ -1839,11 +1846,18 @@ def _set_prereqs_itask(
if prereqs == ["all"]:
itask.state.set_all_satisfied()
else:
if not itask.satisfy_me(
self._standardise_prereqs(prereqs)
):
# Attempt to set the given presrequisites.
# Log any that aren't valid for the task.
presus = self._standardise_prereqs(prereqs)
unmatched = itask.satisfy_me(list(presus.keys()))
for task_msg in unmatched:
LOG.warning(
f"{itask.identity} does not depend on"
f' "{presus[task_msg]}"'
)
if len(unmatched) == len(prereqs):
# No prereqs matched.
return False

if (
self.runahead_limit_point is not None
and itask.point <= self.runahead_limit_point
Expand Down
23 changes: 9 additions & 14 deletions cylc/flow/task_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
Counter as TypingCounter,
Dict,
List,
Iterable,
Optional,
Set,
TYPE_CHECKING,
Expand Down Expand Up @@ -536,23 +535,19 @@ def state_reset(

return False

def satisfy_me(self, outputs: 'Iterable[Tokens]') -> bool:
"""Try to satisfy my prerequisites with given outputs.
def satisfy_me(
self, task_messages: 'List[Tokens]'
) -> 'Set[Tokens]':
"""Try to satisfy my prerequisites with given output messages.
The output strings are of the form "cycle/task:message"
Log a warning for outputs that I don't depend on.
The task output messages are of the form "cycle/task:message"
Log a warning for messages that I don't depend on.
Return True if any match, else False.
Return a set of unmatched task messages.
"""
used = self.state.satisfy_me(outputs)
for output in set(outputs) - used:
# Note this logs the task message not the output.
LOG.warning(
f"{self.identity} does not depend on"
f' "{output.relative_id_with_selectors}"'
)
return bool(used)
used = self.state.satisfy_me(task_messages)
return set(task_messages) - used

def clock_expire(self) -> bool:
"""Return True if clock expire time is up, else False."""
Expand Down
2 changes: 1 addition & 1 deletion tests/integration/test_task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -1352,7 +1352,7 @@ async def test_set_prereqs(
schd.pool.set_prereqs_and_outputs(
["20400101T0000Z/qux"], None, ["20400101T0000Z/foo:a"], ['all'])
assert log_filter(
log, contains='20400101T0000Z/qux does not depend on "20400101T0000Z/foo:drugs and money"')
log, contains='20400101T0000Z/qux does not depend on "20400101T0000Z/foo:a"')

# it should not add 20400101T0000Z/qux to the pool
assert (
Expand Down

0 comments on commit 59c92b6

Please sign in to comment.