Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xtrigger efficiency fix. #5908

Merged
merged 7 commits into from
Jan 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changes.d/5908.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fixed bug causing redundant DB updates when many tasks depend on the same xtrigger.
2 changes: 2 additions & 0 deletions cylc/flow/rundb.py
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,8 @@ class CylcWorkflowDAO:
["prereq_output", {"is_primary_key": True}],
["satisfied"],
],
# The xtriggers table holds the function signature and result of
# already-satisfied (the scheduler no longer needs to call them).
TABLE_XTRIGGERS: [
["signature", {"is_primary_key": True}],
["results"],
Expand Down
32 changes: 9 additions & 23 deletions cylc/flow/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,7 @@
self.workflow,
user=self.owner,
broadcast_mgr=self.broadcast_mgr,
workflow_db_mgr=self.workflow_db_mgr,
data_store_mgr=self.data_store_mgr,
proc_pool=self.proc_pool,
workflow_run_dir=self.workflow_run_dir,
Expand Down Expand Up @@ -1705,14 +1706,8 @@
await self.process_command_queue()
self.proc_pool.process()

# Tasks in the main pool that are waiting but not queued must be
# waiting on external dependencies, i.e. xtriggers or ext_triggers.
# For these tasks, call any unsatisfied xtrigger functions, and
# queue tasks that have become ready. (Tasks do not appear in the
# main pool at all until all other-task deps are satisfied, and are
# queued immediately on release from runahead limiting if they are
# not waiting on external deps).
housekeep_xtriggers = False
# Unqueued tasks with satisfied prerequisites must be waiting on
# xtriggers or ext_triggers. Check these and queue tasks if ready.
for itask in self.pool.get_tasks():
if (
not itask.state(TASK_STATUS_WAITING)
Expand All @@ -1725,28 +1720,19 @@
itask.state.xtriggers
and not itask.state.xtriggers_all_satisfied()
):
# Call unsatisfied xtriggers if not already in-process.
# Results are returned asynchronously.
self.xtrigger_mgr.call_xtriggers_async(itask)
# Check for satisfied xtriggers, and queue if ready.
if self.xtrigger_mgr.check_xtriggers(
itask, self.workflow_db_mgr.put_xtriggers):
housekeep_xtriggers = True
if all(itask.is_ready_to_run()):
self.pool.queue_task(itask)

# Check for satisfied ext_triggers, and queue if ready.

if (
itask.state.external_triggers
and not itask.state.external_triggers_all_satisfied()
and self.broadcast_mgr.check_ext_triggers(
itask, self.ext_trigger_queue)
and all(itask.is_ready_to_run())
):
self.broadcast_mgr.check_ext_triggers(

Check warning on line 1729 in cylc/flow/scheduler.py

View check run for this annotation

Codecov / codecov/patch

cylc/flow/scheduler.py#L1729

Added line #L1729 was not covered by tests
itask, self.ext_trigger_queue)

if all(itask.is_ready_to_run()):
self.pool.queue_task(itask)

if housekeep_xtriggers:
# (Could do this periodically?)
if self.xtrigger_mgr.do_housekeeping:
self.xtrigger_mgr.housekeep(self.pool.get_tasks())

self.pool.set_expired_tasks()
Expand Down
1 change: 0 additions & 1 deletion cylc/flow/workflow_db_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,6 @@ def put_task_event_timers(self, task_events_mgr):

def put_xtriggers(self, sat_xtrig):
"""Put statements to update external triggers table."""
self.db_deletes_map[self.TABLE_XTRIGGERS].append({})
for sig, res in sat_xtrig.items():
self.db_inserts_map[self.TABLE_XTRIGGERS].append({
"signature": sig,
Expand Down
82 changes: 45 additions & 37 deletions cylc/flow/xtrigger_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,22 @@
import re
from copy import deepcopy
from time import time
from typing import Any, Dict, List, Optional, Tuple, Callable
from typing import Any, Dict, Optional, Tuple, TYPE_CHECKING

from cylc.flow import LOG
from cylc.flow.exceptions import XtriggerConfigError
import cylc.flow.flags
from cylc.flow.hostuserutil import get_user
from cylc.flow.subprocpool import get_func
from cylc.flow.xtriggers.wall_clock import wall_clock

from cylc.flow.subprocctx import SubFuncContext
from cylc.flow.broadcast_mgr import BroadcastMgr
from cylc.flow.data_store_mgr import DataStoreMgr
from cylc.flow.subprocpool import SubProcPool
from cylc.flow.task_proxy import TaskProxy
from cylc.flow.subprocpool import get_func
if TYPE_CHECKING:
from cylc.flow.broadcast_mgr import BroadcastMgr
from cylc.flow.data_store_mgr import DataStoreMgr
from cylc.flow.subprocctx import SubFuncContext
from cylc.flow.subprocpool import SubProcPool
from cylc.flow.task_proxy import TaskProxy
from cylc.flow.workflow_db_mgr import WorkflowDatabaseManager


class TemplateVariables(Enum):
Expand Down Expand Up @@ -185,6 +187,7 @@ class XtriggerManager:
Args:
workflow: workflow name
user: workflow owner
workflow_db_mgr: the DB Manager
broadcast_mgr: the Broadcast Manager
proc_pool: pool of Subprocesses
workflow_run_dir: workflow run directory
Expand All @@ -195,9 +198,10 @@ class XtriggerManager:
def __init__(
self,
workflow: str,
broadcast_mgr: BroadcastMgr,
data_store_mgr: DataStoreMgr,
proc_pool: SubProcPool,
broadcast_mgr: 'BroadcastMgr',
workflow_db_mgr: 'WorkflowDatabaseManager',
data_store_mgr: 'DataStoreMgr',
proc_pool: 'SubProcPool',
user: Optional[str] = None,
workflow_run_dir: Optional[str] = None,
workflow_share_dir: Optional[str] = None,
Expand Down Expand Up @@ -230,11 +234,15 @@ def __init__(
}

self.proc_pool = proc_pool
self.workflow_db_mgr = workflow_db_mgr
self.broadcast_mgr = broadcast_mgr
self.data_store_mgr = data_store_mgr
self.do_housekeeping = False

@staticmethod
def validate_xtrigger(label: str, fctx: SubFuncContext, fdir: str) -> None:
def validate_xtrigger(
label: str, fctx: 'SubFuncContext', fdir: str
) -> None:
"""Validate an Xtrigger function.

Args:
Expand Down Expand Up @@ -305,7 +313,7 @@ def validate_xtrigger(label: str, fctx: SubFuncContext, fdir: str) -> None:
f' {", ".join(t.value for t in deprecated_variables)}'
)

def add_trig(self, label: str, fctx: SubFuncContext, fdir: str) -> None:
def add_trig(self, label: str, fctx: 'SubFuncContext', fdir: str) -> None:
"""Add a new xtrigger function.

Check the xtrigger function exists here (e.g. during validation).
Expand Down Expand Up @@ -334,7 +342,7 @@ def load_xtrigger_for_restart(self, row_idx: int, row: Tuple[str, str]):
sig, results = row
self.sat_xtrig[sig] = json.loads(results)

def _get_xtrigs(self, itask: TaskProxy, unsat_only: bool = False,
def _get_xtrigs(self, itask: 'TaskProxy', unsat_only: bool = False,
sigs_only: bool = False):
"""(Internal helper method.)

Expand All @@ -361,7 +369,9 @@ def _get_xtrigs(self, itask: TaskProxy, unsat_only: bool = False,
res.append((label, sig, ctx, satisfied))
return res

def get_xtrig_ctx(self, itask: TaskProxy, label: str) -> SubFuncContext:
def get_xtrig_ctx(
self, itask: 'TaskProxy', label: str
) -> 'SubFuncContext':
"""Get a real function context from the template.

Args:
Expand Down Expand Up @@ -412,7 +422,7 @@ def get_xtrig_ctx(self, itask: TaskProxy, label: str) -> SubFuncContext:
ctx.update_command(self.workflow_run_dir)
return ctx

def call_xtriggers_async(self, itask: TaskProxy):
def call_xtriggers_async(self, itask: 'TaskProxy'):
"""Call itask's xtrigger functions via the process pool...

...if previous call not still in-process and retry period is up.
Expand All @@ -421,16 +431,23 @@ def call_xtriggers_async(self, itask: TaskProxy):
itask: task proxy to check.
"""
for label, sig, ctx, _ in self._get_xtrigs(itask, unsat_only=True):
# Special case: quick synchronous clock check:
if sig.startswith("wall_clock"):
# Special case: quick synchronous clock check.
if wall_clock(*ctx.func_args, **ctx.func_kwargs):
if sig in self.sat_xtrig:
# Already satisfied, just update the task
itask.state.xtriggers[label] = True
elif wall_clock(*ctx.func_args, **ctx.func_kwargs):
# Newly satisfied
itask.state.xtriggers[label] = True
self.sat_xtrig[sig] = {}
self.data_store_mgr.delta_task_xtrigger(sig, True)
self.workflow_db_mgr.put_xtriggers({sig: {}})
LOG.info('xtrigger satisfied: %s = %s', label, sig)
self.do_housekeeping = True
continue
# General case: potentially slow asynchronous function call.
if sig in self.sat_xtrig:
# Already satisfied, just update the task
if not itask.state.xtriggers[label]:
itask.state.xtriggers[label] = True
res = {}
Expand All @@ -445,6 +462,8 @@ def call_xtriggers_async(self, itask: TaskProxy):
xtrigger_env
)
continue

# Call the function to check the unsatisfied xtrigger.
if sig in self.active:
# Already waiting on this result.
continue
Expand All @@ -457,8 +476,10 @@ def call_xtriggers_async(self, itask: TaskProxy):
self.active.append(sig)
self.proc_pool.put_command(ctx, callback=self.callback)

def housekeep(self, itasks: List[TaskProxy]):
"""Delete satisfied xtriggers no longer needed by any task.
def housekeep(self, itasks):
"""Forget satisfied xtriggers no longer needed by any task.

Check self.do_housekeeping before calling this method.

Args:
itasks: list of all task proxies.
Expand All @@ -469,8 +490,9 @@ def housekeep(self, itasks: List[TaskProxy]):
for sig in list(self.sat_xtrig):
if sig not in all_xtrig:
del self.sat_xtrig[sig]
self.do_housekeeping = False

def callback(self, ctx: SubFuncContext):
def callback(self, ctx: 'SubFuncContext'):
"""Callback for asynchronous xtrigger functions.

Record satisfaction status and function results dict.
Expand All @@ -489,23 +511,9 @@ def callback(self, ctx: SubFuncContext):
return
LOG.debug('%s: returned %s', sig, results)
if satisfied:
# Newly satisfied
self.data_store_mgr.delta_task_xtrigger(sig, True)
self.workflow_db_mgr.put_xtriggers({sig: results})
oliver-sanders marked this conversation as resolved.
Show resolved Hide resolved
LOG.info('xtrigger satisfied: %s = %s', ctx.label, sig)
self.sat_xtrig[sig] = results

def check_xtriggers(
self,
itask: TaskProxy,
db_update_func: Callable[[dict], None]) -> bool:
"""Check if all of itasks' xtriggers have become satisfied.

Return True if satisfied, else False

Args:
itasks: task proxies to check
db_update_func: method to update xtriggers in the DB
"""
if itask.state.xtriggers_all_satisfied():
db_update_func(self.sat_xtrig)
return True
return False
self.do_housekeeping = True
2 changes: 2 additions & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,8 @@ tests =
pytest-asyncio>=0.17,!=0.23.*
pytest-cov>=2.8.0
pytest-xdist>=2
pytest-env>=0.6.2
pytest-mock>=3.7
pytest>=6
testfixtures>=6.11.0
towncrier>=23
Expand Down
1 change: 1 addition & 0 deletions tests/functional/xtriggers/02-persistence/flow.cylc
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
[scheduling]
initial cycle point = 2010
final cycle point = 2011
runahead limit = P0
[[xtriggers]]
x1 = faker(name="bob")
[[graph]]
Expand Down
54 changes: 54 additions & 0 deletions tests/integration/test_xtrigger_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
"""Tests for the behaviour of xtrigger manager.
"""

from pytest_mock import mocker

async def test_2_xtriggers(flow, start, scheduler, monkeypatch):
"""Test that if an itask has 2 wall_clock triggers with different
Expand Down Expand Up @@ -65,3 +66,56 @@ async def test_2_xtriggers(flow, start, scheduler, monkeypatch):
'clock_2': False,
'clock_3': False,
}


async def test_1_xtrigger_2_tasks(flow, start, scheduler, monkeypatch, mocker):
"""
If multiple tasks depend on the same satisfied xtrigger, the DB mgr method
put_xtriggers should only be called once - when the xtrigger gets satisfied.

See [GitHub #5908](https://github.com/cylc/cylc-flow/pull/5908)

"""
task_point = 1588636800 # 2020-05-05
ten_years_ahead = 1904169600 # 2030-05-05
monkeypatch.setattr(
'cylc.flow.xtriggers.wall_clock.time',
lambda: ten_years_ahead - 1
)
id_ = flow({
'scheduler': {
'allow implicit tasks': True
},
'scheduling': {
'initial cycle point': '2020-05-05',
'xtriggers': {
'clock_1': 'wall_clock()',
},
'graph': {
'R1': '@clock_1 => foo & bar'
}
}
})

schd = scheduler(id_)
spy = mocker.spy(schd.workflow_db_mgr, 'put_xtriggers')

async with start(schd):

# Call the clock trigger via its dependent tasks, to get it satisfied.
for task in schd.pool.get_tasks():
# (For clock triggers this is synchronous)
schd.xtrigger_mgr.call_xtriggers_async(task)

# It should now be satisfied.
assert task.state.xtriggers == {'clock_1': True}

# Check one put_xtriggers call only, not two.
assert spy.call_count == 1
oliver-sanders marked this conversation as resolved.
Show resolved Hide resolved

# Note on master prior to GH #5908 the call is made from the
# scheduler main loop when the two tasks become satisified,
# resulting in two calls to put_xtriggers. This test fails
# on master, but with call count 0 (not 2) because the main
# loop doesn't run in this test.

1 change: 1 addition & 0 deletions tests/unit/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,7 @@ def xtrigger_mgr() -> XtriggerManager:
workflow=workflow_name,
user=user,
proc_pool=Mock(put_command=lambda *a, **k: True),
workflow_db_mgr=Mock(housekeep=lambda *a, **k: True),
broadcast_mgr=Mock(put_broadcast=lambda *a, **k: True),
data_store_mgr=DataStoreMgr(
create_autospec(Scheduler, workflow=workflow_name, owner=user)
Expand Down
Loading
Loading