Commit

Merge pull request #5715 from dwsutherland/reload-delta-flag
Fix data-store reloaded flag
hjoliver authored Oct 19, 2023
2 parents 07a330c + c808238 commit 38cedcd
Showing 3 changed files with 48 additions and 36 deletions.
43 changes: 29 additions & 14 deletions cylc/flow/data_store_mgr.py
@@ -531,21 +531,23 @@ def initiate_data_model(self, reloaded=False):
self.generate_definition_elements()

# Update workflow statuses and totals (assume needed)
self.update_workflow()
self.update_workflow(True)

# Apply current deltas
self.batch_deltas()
self.apply_delta_batch()
# Clear deltas after application
self.clear_delta_store()
self.clear_delta_batch()

if not reloaded:
# Gather this batch of deltas for publish
self.apply_delta_checksum()
self.publish_deltas = self.get_publish_deltas()
# Gather the store as batch of deltas for publishing
self.batch_deltas(True)
self.apply_delta_checksum()
self.publish_deltas = self.get_publish_deltas()

self.updates_pending = False

# Clear deltas after application and publishing
self.clear_delta_store()
# Clear second batch after publishing
self.clear_delta_batch()

def generate_definition_elements(self):
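Note: the ordering change in the hunk above means the whole store is now re-gathered as a batch of deltas for publishing (batch_deltas(True)) on both fresh starts and reloads, and that second batch is cleared only after it has been handed off. As a rough illustration of what "gather the store as a batch of deltas" means, here is a minimal, hypothetical sketch using plain dicts rather than Cylc's protobuf messages or real method signatures:

    def batch_store_as_added_deltas(store):
        """Re-emit every element in the store as an 'added' delta.

        Hypothetical stand-in for batch_deltas(True): a subscriber that
        receives this batch gets a complete picture of the data store.
        """
        return {
            topic: {'added': list(elements.values())}
            for topic, elements in store.items()
        }


    example_store = {
        'workflow': {'wid': {'id': 'wid', 'status': 'running', 'reloaded': True}},
        'task_proxies': {'wid//1/foo': {'id': 'wid//1/foo', 'state': 'waiting'}},
    }
    print(batch_store_as_added_deltas(example_store))
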
@@ -563,6 +565,8 @@ def generate_definition_elements(self):
workflow.id = self.workflow_id
workflow.last_updated = update_time
workflow.stamp = f'{workflow.id}@{workflow.last_updated}'
# Treat play/restart as hard reload of definition.
workflow.reloaded = True

graph = workflow.edges
graph.leaves[:] = config.leaves
@@ -1494,7 +1498,7 @@ def insert_db_job(self, row_idx, row):
tp_delta.jobs.append(j_id)
self.updates_pending = True

def update_data_structure(self, reloaded=False):
def update_data_structure(self):
"""Workflow batch updates in the data structure."""
# load database history for flagged nodes
self.apply_task_proxy_db_history()
@@ -1521,11 +1525,7 @@ def update_data_structure(self, reloaded=False):
# Apply all deltas
self.apply_delta_batch()

if reloaded:
self.clear_delta_batch()
self.batch_deltas(reloaded=True)

if self.updates_pending or reloaded:
if self.updates_pending:
self.apply_delta_checksum()
# Gather this batch of deltas for publish
self.publish_deltas = self.get_publish_deltas()
@@ -1536,6 +1536,18 @@ def update_data_structure(self, reloaded=False):
self.clear_delta_batch()
self.clear_delta_store()

def update_workflow_states(self):
"""Batch workflow state updates."""

# update the workflow state in the data store
self.update_workflow()

# push out update deltas
self.batch_deltas()
self.apply_delta_batch()
self.apply_delta_checksum()
self.publish_deltas = self.get_publish_deltas()

def prune_data_store(self):
"""Remove flagged nodes and edges not in the set of active paths."""

@@ -1808,7 +1820,7 @@ def set_graph_window_extent(self, n_edge_distance):
self.next_n_edge_distance = n_edge_distance
self.updates_pending = True

def update_workflow(self):
def update_workflow(self, reloaded=False):
"""Update workflow element status and state totals."""
# Create new message and copy existing message content
data = self.data[self.workflow_id]
@@ -1864,6 +1876,9 @@ def update_workflow(self):
w_delta.status_msg = status_msg
delta_set = True

if reloaded is not w_data.reloaded:
w_delta.reloaded = reloaded

if self.schd.pool.main_pool:
pool_points = set(self.schd.pool.main_pool)
oldest_point = str(min(pool_points))
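Note: the reloaded parameter added to update_workflow is only written into the delta when it differs from the stored value, so subscribers see the flag flip exactly once per play, restart or reload. A self-contained sketch of that change-detection idea, using plain dataclasses as hypothetical stand-ins for Cylc's protobuf workflow message and delta:

    from dataclasses import dataclass
    from typing import Optional


    @dataclass
    class WorkflowData:
        """Stand-in for the stored workflow element (assumption)."""
        status: str = 'running'
        reloaded: bool = False


    @dataclass
    class WorkflowDelta:
        """Stand-in for the per-update workflow delta (assumption)."""
        status: Optional[str] = None
        reloaded: Optional[bool] = None


    def make_workflow_delta(w_data, reloaded=False):
        """Set `reloaded` in the delta only when it differs from the store."""
        w_delta = WorkflowDelta()
        if reloaded is not w_data.reloaded:
            # Emit the flag only when it actually changes, mirroring the
            # `if reloaded is not w_data.reloaded` check in the hunk above.
            w_delta.reloaded = reloaded
        return w_delta


    if __name__ == '__main__':
        store = WorkflowData()
        print(make_workflow_delta(store, reloaded=True))   # flag emitted
        store.reloaded = True
        print(make_workflow_delta(store, reloaded=True))   # unchanged, not emitted
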
3 changes: 3 additions & 0 deletions cylc/flow/network/resolvers.py
@@ -568,6 +568,9 @@ async def subscribe_delta(self, root, info, args):
workflow_id=w_id)
delta_store[DELTA_ADDED] = (
self.data_store_mgr.data[w_id])
delta_store[DELTA_ADDED][
WORKFLOW
].reloaded = True
deltas_queue.put(
(w_id, 'initial_burst', delta_store))
elif w_id in self.delta_store[sub_id]:
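Note: for a brand-new subscription the resolver now forces the workflow element of the initial burst to reloaded = True, so the client treats the burst as a full refresh rather than merging it into stale state. A minimal, runnable sketch of that idea; the constants, queue shape and SimpleNamespace element are assumptions, not Cylc's real types:

    from queue import Queue
    from types import SimpleNamespace

    DELTA_ADDED = 'added'    # assumed stand-ins for Cylc's constants
    WORKFLOW = 'workflow'


    def queue_initial_burst(w_id, data, deltas_queue):
        """Queue the full store for a new subscriber, flagged as a reload."""
        delta_store = {DELTA_ADDED: data[w_id]}
        # Mark the burst as reloaded so the client replaces, rather than
        # merges, its local copy of the workflow.
        delta_store[DELTA_ADDED][WORKFLOW].reloaded = True
        deltas_queue.put((w_id, 'initial_burst', delta_store))


    if __name__ == '__main__':
        q = Queue()
        data = {'~user/wf': {WORKFLOW: SimpleNamespace(reloaded=False)}}
        queue_initial_burst('~user/wf', data, q)
        print(q.get())
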
38 changes: 16 additions & 22 deletions cylc/flow/scheduler.py
@@ -1160,7 +1160,7 @@ async def command_reload_workflow(self) -> None:
self._update_workflow_state()

# Re-initialise data model on reload
self.data_store_mgr.initiate_data_model(reloaded=True)
self.data_store_mgr.initiate_data_model(self.is_reloaded)

# Reset the remote init map to trigger fresh file installation
self.task_job_mgr.task_remote_mgr.remote_init_map.clear()
@@ -1548,7 +1548,7 @@ async def workflow_shutdown(self):

# Is the workflow ready to shut down now?
if self.pool.can_stop(self.stop_mode):
await self.update_data_structure(self.is_reloaded)
await self.update_data_structure()
self.proc_pool.close()
if self.stop_mode != StopMode.REQUEST_NOW_NOW:
# Wait for process pool to complete,
@@ -1767,7 +1767,7 @@ async def main_loop(self) -> None:

if has_updated or self.data_store_mgr.updates_pending:
# Update the datastore.
await self.update_data_structure(self.is_reloaded)
await self.update_data_structure()

if has_updated:
if not self.is_reloaded:
@@ -1838,37 +1838,31 @@ def _update_workflow_state(self):
A cut-down version of update_data_structure which only considers
workflow state changes e.g. status, status message, state totals, etc.
"""
# Publish any existing before potentially creating more
self._publish_deltas()
# update the workflow state in the data store
self.data_store_mgr.update_workflow()

# push out update deltas
self.data_store_mgr.batch_deltas()
self.data_store_mgr.apply_delta_batch()
self.data_store_mgr.apply_delta_checksum()
self.data_store_mgr.publish_deltas = (
self.data_store_mgr.get_publish_deltas()
)
self.server.publish_queue.put(
self.data_store_mgr.publish_deltas)

# Non-async sleep - yield to other threads rather
# than event loop
sleep(0)
self.data_store_mgr.update_workflow_states()
self._publish_deltas()

async def update_data_structure(self, reloaded: bool = False):
"""Update DB, UIS, Summary data elements"""
# Publish any existing before potentially creating more
self._publish_deltas()
# Collect/apply data store updates/deltas
self.data_store_mgr.update_data_structure(reloaded=reloaded)
# Publish updates:
self.data_store_mgr.update_data_structure()
self._publish_deltas()
# Database update
self.workflow_db_mgr.put_task_pool(self.pool)

def _publish_deltas(self):
"""Publish pending deltas."""
if self.data_store_mgr.publish_pending:
self.data_store_mgr.publish_pending = False
self.server.publish_queue.put(
self.data_store_mgr.publish_deltas)
# Non-async sleep - yield to other threads rather
# than event loop
sleep(0)
# Database update
self.workflow_db_mgr.put_task_pool(self.pool)

def check_workflow_timers(self):
"""Check timers, and abort or run event handlers as configured."""
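Note: on the scheduler side, publishing is now funnelled through _publish_deltas, which is called both before a data-store update (to flush anything already pending) and after it (to push the fresh batch). A hedged, stand-alone sketch of that guard-and-flush pattern; the PublishGate class and build_deltas callback are assumptions, only the flag and queue names mirror the hunk above:

    from queue import Queue
    from time import sleep


    class PublishGate:
        """Only flush deltas that are actually pending (assumed stand-in)."""

        def __init__(self):
            self.publish_queue = Queue()
            self.publish_pending = False
            self.publish_deltas = []

        def flush(self):
            if self.publish_pending:
                self.publish_pending = False
                self.publish_queue.put(self.publish_deltas)
                # Non-async sleep: yield to other threads rather than to
                # the event loop, as in the scheduler code above.
                sleep(0)


    def update_cycle(gate, build_deltas):
        """Publish pending deltas, build a new batch, publish again."""
        gate.flush()                       # flush anything already pending
        gate.publish_deltas = build_deltas()
        gate.publish_pending = True
        gate.flush()                       # push the fresh batch


    if __name__ == '__main__':
        gate = PublishGate()
        update_cycle(gate, lambda: ['delta-1', 'delta-2'])
        print(gate.publish_queue.get())
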
