Skip to content

Commit

Permalink
Merge pull request #5804 from cylc/8.2.x-sync
Browse files Browse the repository at this point in the history
🤖 Merge 8.2.x-sync into master
  • Loading branch information
oliver-sanders authored Nov 13, 2023
2 parents 4b93b5a + e5339e4 commit 869edf8
Show file tree
Hide file tree
Showing 18 changed files with 1,225 additions and 260 deletions.
1 change: 1 addition & 0 deletions changes.d/5660.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Re-worked graph n-window algorithm for better efficiency.
3 changes: 3 additions & 0 deletions cylc/flow/data_messages.proto
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ message PbWorkflow {
optional bool pruned = 37;
optional int32 is_runahead_total = 38;
optional bool states_updated = 39;
optional int32 n_edge_distance = 40;
}

// Selected runtime fields
Expand Down Expand Up @@ -227,6 +228,7 @@ message PbTaskProxy {
optional bool is_runahead = 26;
optional bool flow_wait = 27;
optional PbRuntime runtime = 28;
optional int32 graph_depth = 29;
}

message PbFamily {
Expand Down Expand Up @@ -264,6 +266,7 @@ message PbFamilyProxy {
optional bool is_runahead = 19;
optional int32 is_runahead_total = 20;
optional PbRuntime runtime = 21;
optional int32 graph_depth = 22;
}

message PbEdge {
Expand Down
141 changes: 70 additions & 71 deletions cylc/flow/data_messages_pb2.py

Large diffs are not rendered by default.

669 changes: 486 additions & 183 deletions cylc/flow/data_store_mgr.py

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions cylc/flow/network/resolvers.py
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,10 @@ def node_filter(node, node_type, args, state):
args.get('maxdepth', -1) < 0
or node.depth <= args['maxdepth']
)
and (
args.get('graph_depth', -1) < 0
or node.graph_depth <= args['graph_depth']
)
# Now filter node against id arg lists
and (
not args.get('ids')
Expand Down
21 changes: 19 additions & 2 deletions cylc/flow/network/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ class SortArgs(InputObjectType):
'is_runahead': Boolean(),
'mindepth': Int(default_value=-1),
'maxdepth': Int(default_value=-1),
'graph_depth': Int(default_value=-1),
'sort': SortArgs(default_value=None),
}

Expand All @@ -218,6 +219,7 @@ class SortArgs(InputObjectType):
'is_runahead': Boolean(),
'mindepth': Int(default_value=-1),
'maxdepth': Int(default_value=-1),
'graph_depth': Int(default_value=-1),
'sort': SortArgs(default_value=None),
}

Expand All @@ -226,8 +228,6 @@ class SortArgs(InputObjectType):
'exids': graphene.List(ID, default_value=[]),
'states': graphene.List(String, default_value=[]),
'exstates': graphene.List(String, default_value=[]),
'mindepth': Int(default_value=-1),
'maxdepth': Int(default_value=-1),
'sort': SortArgs(default_value=None),
}

Expand Down Expand Up @@ -785,6 +785,12 @@ class Meta:
description='Any active workflow broadcasts.'
)
pruned = Boolean() # TODO: what is this? write description
n_edge_distance = Int(
description=sstrip('''
The maximum graph distance (n) from an active node
of the data-store graph window.
'''),
)


class RuntimeSetting(ObjectType):
Expand Down Expand Up @@ -1067,6 +1073,11 @@ class Meta:
depth = Int(
description='The family inheritance depth',
)
graph_depth = Int(
description=sstrip('''
The n-window graph edge depth from closest active task(s).
'''),
)
job_submits = Int(
description='The number of job submissions for this task instance.',
)
Expand Down Expand Up @@ -1217,6 +1228,12 @@ class Meta:
is_runahead = Boolean()
is_runahead_total = Int()
depth = Int()
graph_depth = Int(
description=sstrip('''
The n-window graph edge smallest child task/family depth
from closest active task(s).
'''),
)
child_tasks = graphene.List(
TaskProxy,
description="""Descendant task proxies.""",
Expand Down
3 changes: 2 additions & 1 deletion cylc/flow/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -840,13 +840,14 @@ def get_tasks_by_point(self) -> 'Dict[PointBase, List[TaskProxy]]':

return point_itasks

def get_task(self, point, name):
def get_task(self, point, name) -> Optional[TaskProxy]:
"""Retrieve a task from the pool."""
rel_id = f'{point}/{name}'
for pool in (self.main_pool, self.hidden_pool):
tasks = pool.get(point)
if tasks and rel_id in tasks:
return tasks[rel_id]
return None

def _get_hidden_task_by_id(self, id_: str) -> Optional[TaskProxy]:
"""Return runahead pool task by ID if it exists, or None."""
Expand Down
5 changes: 4 additions & 1 deletion cylc/flow/task_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,10 @@ def __init__(
self.state = TaskState(tdef, self.point, status, is_held)

# Determine graph children of this task (for spawning).
self.graph_children = generate_graph_children(tdef, self.point)
if data_mode:
self.graph_children = {}
else:
self.graph_children = generate_graph_children(tdef, self.point)

def __repr__(self) -> str:
return f"<{self.__class__.__name__} '{self.tokens}'>"
Expand Down
4 changes: 4 additions & 0 deletions mypy.ini
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,7 @@ exclude= cylc/flow/etc/tutorial/.*
# Suppress the following messages:
# By default the bodies of untyped functions are not checked, consider using --check-untyped-defs
disable_error_code = annotation-unchecked

# For some reason, couldn't exclude this with the exclude directive above
[mypy-cylc.flow.data_messages_pb2]
ignore_errors = True
2 changes: 2 additions & 0 deletions tests/functional/cylc-show/06-past-present-future/flow.cylc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
cylc stop --now --max-polls=10 --interval=1 $CYLC_WORKFLOW_ID
false
else
# Allow time for c submission => running
sleep 2
cylc show "$CYLC_WORKFLOW_ID//1/b" >> $CYLC_WORKFLOW_RUN_DIR/show-b.txt
cylc show "$CYLC_WORKFLOW_ID//1/c" >> $CYLC_WORKFLOW_RUN_DIR/show-c.txt
cylc show "$CYLC_WORKFLOW_ID//1/d" >> $CYLC_WORKFLOW_RUN_DIR/show-d.txt
Expand Down
2 changes: 2 additions & 0 deletions tests/functional/graphql/01-workflow.t
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ query {
oldestActiveCyclePoint
reloaded
runMode
nEdgeDistance
stateTotals
workflowLogDir
timeZoneInfo {
Expand Down Expand Up @@ -96,6 +97,7 @@ cmp_json "${TEST_NAME}-out" "${TEST_NAME_BASE}-workflows.stdout" << __HERE__
"oldestActiveCyclePoint": "20210101T0000Z",
"reloaded": false,
"runMode": "live",
"nEdgeDistance": 1,
"stateTotals": {
"waiting": 1,
"expired": 0,
Expand Down
4 changes: 2 additions & 2 deletions tests/functional/graphql/03-is-held-arg.t
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,14 @@ query {
workflows {
name
isHeldTotal
taskProxies(isHeld: true) {
taskProxies(isHeld: true, graphDepth: 1) {
id
jobs {
submittedTime
startedTime
}
}
familyProxies(exids: [\"*/root\"], isHeld: true) {
familyProxies(exids: [\"*/root\"], isHeld: true, graphDepth: 1) {
id
}
}
Expand Down
66 changes: 66 additions & 0 deletions tests/functional/n-window/01-past-present-future.t
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
#!/usr/bin/env bash
# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#-------------------------------------------------------------------------------

# Test window size using graphql and cylc-show for all tasks.
#
# The workflow under test (01-past-present-future/flow.cylc) is the linear
# chain 'a => b => c => d => e'. Task 'a' sends a setGraphWindowExtent
# mutation with nEdgeDistance: 2, and task 'c' appends `cylc show` output for
# each of a..e to show-<task>.txt in the workflow run directory. The checks
# below compare expected fragments against those files.

# Source the shared test helpers that live alongside this script.
. "$(dirname "$0")/test_header"

# Checks in this script: validate + run + five show-file comparisons.
# NOTE(review): assumes each run_ok / workflow_run_ok / contains_ok call
# counts as one test -- confirm against test_header.
set_test_number 7

# The workflow source directory shares its name with this test.
install_workflow "${TEST_NAME_BASE}" "${TEST_NAME_BASE}"

TEST_NAME="${TEST_NAME_BASE}-validate"
run_ok "${TEST_NAME}" cylc validate "${WORKFLOW_NAME}"

TEST_NAME="${TEST_NAME_BASE}-run"
# 'a => b => c => d => e', 'a' sets window size to 2, 'c' uses cylc show on all.
workflow_run_ok "${TEST_NAME}" cylc play --no-detach --debug "${WORKFLOW_NAME}"

# 'a' is two edges upstream of 'c': expected to be reported as succeeded
# with no prerequisites.
TEST_NAME="${TEST_NAME_BASE}-show-a.past"
contains_ok "$WORKFLOW_RUN_DIR/show-a.txt" <<__END__
state: succeeded
prerequisites: (None)
__END__

# 'b' is one edge upstream of 'c': succeeded, and prerequisites are
# reported as not applicable for a past task.
TEST_NAME="${TEST_NAME_BASE}-show-b.past"
contains_ok "$WORKFLOW_RUN_DIR/show-b.txt" <<__END__
state: succeeded
prerequisites: (n/a for past tasks)
__END__

# 'c' is the task running `cylc show`; its prerequisite on 1/b carries a '+'
# marker (presumably "satisfied", the counterpart of the '-' legend shown).
TEST_NAME="${TEST_NAME_BASE}-show-c.present"
contains_ok "${WORKFLOW_RUN_DIR}/show-c.txt" <<__END__
prerequisites: ('-': not satisfied)
+ 1/b succeeded
__END__

# 'd' is one edge downstream of 'c': still waiting on 1/c.
TEST_NAME="${TEST_NAME_BASE}-show-d.future"
contains_ok "${WORKFLOW_RUN_DIR}/show-d.txt" <<__END__
state: waiting
prerequisites: ('-': not satisfied)
- 1/c succeeded
__END__

# 'e' is two edges downstream of 'c': still waiting on 1/d.
TEST_NAME="${TEST_NAME_BASE}-show-e.future"
contains_ok "${WORKFLOW_RUN_DIR}/show-e.txt" <<__END__
state: waiting
prerequisites: ('-': not satisfied)
- 1/d succeeded
__END__

# Clean up (helper presumably provided by test_header).
purge
41 changes: 41 additions & 0 deletions tests/functional/n-window/01-past-present-future/flow.cylc
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
[scheduler]
allow implicit tasks = True
[[events]]
inactivity timeout = PT1M
abort on inactivity timeout = True
[scheduling]
[[graph]]
R1 = """
a => b => c => d => e
"""
[runtime]
[[a]]
script = """
set +e
read -r -d '' gqlDoc <<_DOC_
{"request_string": "
mutation {
setGraphWindowExtent (
workflows: [\"${CYLC_WORKFLOW_ID}\"],
nEdgeDistance: 2) {
result
}
}",
"variables": null}
_DOC_
echo "${gqlDoc}"
cylc client "$CYLC_WORKFLOW_ID" graphql < <(echo ${gqlDoc}) 2>/dev/null
set -e
"""
[[c]]
script = """
cylc show "$CYLC_WORKFLOW_ID//1/a" >> $CYLC_WORKFLOW_RUN_DIR/show-a.txt
cylc show "$CYLC_WORKFLOW_ID//1/b" >> $CYLC_WORKFLOW_RUN_DIR/show-b.txt
cylc show "$CYLC_WORKFLOW_ID//1/c" >> $CYLC_WORKFLOW_RUN_DIR/show-c.txt
cylc show "$CYLC_WORKFLOW_ID//1/d" >> $CYLC_WORKFLOW_RUN_DIR/show-d.txt
cylc show "$CYLC_WORKFLOW_ID//1/e" >> $CYLC_WORKFLOW_RUN_DIR/show-e.txt
"""
56 changes: 56 additions & 0 deletions tests/functional/n-window/02-big-window.t
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
#!/usr/bin/env bash
# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#-------------------------------------------------------------------------------

# Test large window size using graphql and find tasks in window.
# This helps coverage by exercising most of the no-rewalk mechanics.
#
# The workflow under test (02-big-window/flow.cylc) has a main chain
# 'a => b => c => d => e => f => g => h' plus a side branch
# 'b => i => j => f'. Task 'a' sends a setGraphWindowExtent mutation with
# nEdgeDistance: 5, task 'c' appends `cylc show` output for a, j and h to
# show-<task>.txt in the workflow run directory, task 'i' sleeps to slow the
# side branch, and task 'f' re-triggers 1/b.

# Source the shared test helpers that live alongside this script.
. "$(dirname "$0")/test_header"

# Checks in this script: validate + run + three show-file comparisons.
# NOTE(review): assumes each run_ok / workflow_run_ok / contains_ok call
# counts as one test -- confirm against test_header.
set_test_number 5

# The workflow source directory shares its name with this test.
install_workflow "${TEST_NAME_BASE}" "${TEST_NAME_BASE}"

TEST_NAME="${TEST_NAME_BASE}-validate"
run_ok "${TEST_NAME}" cylc validate "${WORKFLOW_NAME}"

TEST_NAME="${TEST_NAME_BASE}-run"
# 'a => b => . . . f => g => h', 'a' sets window size to 5,
# 'b => i => j => f', 'c' finds 'a', 'j', 'h'
workflow_run_ok "${TEST_NAME}" cylc play --no-detach --debug "${WORKFLOW_NAME}"

# 'a' is two edges upstream of 'c': expected to be reported as succeeded
# with no prerequisites.
TEST_NAME="${TEST_NAME_BASE}-show-a.past"
contains_ok "$WORKFLOW_RUN_DIR/show-a.txt" <<__END__
state: succeeded
prerequisites: (None)
__END__

# 'j' sits on the side branch behind the slow task 'i': still waiting on 1/i.
TEST_NAME="${TEST_NAME_BASE}-show-j.parallel"
contains_ok "${WORKFLOW_RUN_DIR}/show-j.txt" <<__END__
state: waiting
prerequisites: ('-': not satisfied)
- 1/i succeeded
__END__

# 'h' is five edges downstream of 'c' (d, e, f, g, h): still waiting on 1/g.
TEST_NAME="${TEST_NAME_BASE}-show-h.future"
contains_ok "${WORKFLOW_RUN_DIR}/show-h.txt" <<__END__
state: waiting
prerequisites: ('-': not satisfied)
- 1/g succeeded
__END__

# Clean up (helper presumably provided by test_header).
purge
52 changes: 52 additions & 0 deletions tests/functional/n-window/02-big-window/flow.cylc
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
[scheduler]
allow implicit tasks = True
[[events]]
inactivity timeout = PT1M
abort on inactivity timeout = True
[scheduling]
[[graph]]
R1 = """
a => b => c => d => e => f => g => h
b => i => j => f
"""
[runtime]
[[a]]
script = """
set +e
read -r -d '' gqlDoc <<_DOC_
{"request_string": "
mutation {
setGraphWindowExtent (
workflows: [\"${CYLC_WORKFLOW_ID}\"],
nEdgeDistance: 5) {
result
}
}",
"variables": null}
_DOC_
echo "${gqlDoc}"
cylc client "$CYLC_WORKFLOW_ID" graphql < <(echo ${gqlDoc}) 2>/dev/null
set -e
"""
[[c]]
script = """
cylc show "$CYLC_WORKFLOW_ID//1/a" >> $CYLC_WORKFLOW_RUN_DIR/show-a.txt
cylc show "$CYLC_WORKFLOW_ID//1/j" >> $CYLC_WORKFLOW_RUN_DIR/show-j.txt
cylc show "$CYLC_WORKFLOW_ID//1/h" >> $CYLC_WORKFLOW_RUN_DIR/show-h.txt
"""

[[i]]
script = """
# Slow 2nd branch down
sleep 5
"""

[[f]]
script = """
# test re-trigger of old point
cylc trigger "$CYLC_WORKFLOW_ID//1/b"
"""
1 change: 1 addition & 0 deletions tests/functional/n-window/test_header
Loading

0 comments on commit 869edf8

Please sign in to comment.