Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

consider task dependencies on shutdown #1110

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 39 additions & 24 deletions kopf/_core/reactor/running.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ async def operator(
It is efficiently `spawn_tasks` + `run_tasks` with some safety.
"""
existing_tasks = await aiotasks.all_tasks()
operator_tasks = await spawn_tasks(
system_tasks, root_tasks = await spawn_tasks(
lifecycle=lifecycle,
indexers=indexers,
registry=registry,
Expand All @@ -135,7 +135,7 @@ async def operator(
memo=memo,
_command=_command,
)
await run_tasks(operator_tasks, ignored=existing_tasks)
await run_tasks(system_tasks, root_tasks, ignored=existing_tasks)


async def spawn_tasks(
Expand Down Expand Up @@ -196,7 +196,8 @@ async def spawn_tasks(
signal_flag: aiotasks.Future = asyncio.Future()
started_flag: asyncio.Event = asyncio.Event()
operator_paused = aiotoggles.ToggleSet(any)
tasks: MutableSequence[aiotasks.Task] = []
root_tasks: MutableSequence[aiotasks.Task] = []
system_tasks: MutableSequence[aiotasks.Task] = []

# Map kwargs into the settings object.
settings.peering.clusterwide = clusterwide
Expand All @@ -219,20 +220,21 @@ async def spawn_tasks(
posting.settings_var.set(settings)

# A few common background forever-running infrastructural tasks (irregular root tasks).
tasks.append(asyncio.create_task(
system_tasks.append(aiotasks.create_task(
name="stop-flag checker",
coro=_stop_flag_checker(
signal_flag=signal_flag,
stop_flag=stop_flag)))
tasks.append(asyncio.create_task(
system_tasks.append(aiotasks.create_task(
name="ultimate termination",
coro=_ultimate_termination(
settings=settings,
stop_flag=stop_flag)))
tasks.append(asyncio.create_task(
system_tasks.append(aiotasks.create_task(
name="startup/cleanup activities",
coro=_startup_cleanup_activities(
root_tasks=tasks, # used as a "live" view, populated later.
system_tasks=system_tasks, # used as a "live" view, populated later.
root_tasks=root_tasks, # used as a "live" view, populated later.
ready_flag=ready_flag,
started_flag=started_flag,
registry=registry,
Expand All @@ -242,15 +244,15 @@ async def spawn_tasks(
memo=memo))) # to purge & finalize the caches in the end.

# Kill all the daemons gracefully when the operator exits (so that they are not "hung").
tasks.append(aiotasks.create_guarded_task(
system_tasks.append(aiotasks.create_guarded_task(
name="daemon killer", flag=started_flag, logger=logger,
coro=daemons.daemon_killer(
settings=settings,
memories=memories,
operator_paused=operator_paused)))

# Keeping the credentials fresh and valid via the authentication handlers on demand.
tasks.append(aiotasks.create_guarded_task(
system_tasks.append(aiotasks.create_guarded_task(
name="credentials retriever", flag=started_flag, logger=logger,
coro=activities.authenticator(
registry=registry,
Expand All @@ -261,7 +263,7 @@ async def spawn_tasks(

# K8s-event posting. Events are queued in-memory and posted in the background.
# NB: currently, it is a global task, but can be made per-resource or per-object.
tasks.append(aiotasks.create_guarded_task(
root_tasks.append(aiotasks.create_guarded_task(
name="poster of events", flag=started_flag, logger=logger,
coro=posting.poster(
settings=settings,
Expand All @@ -270,7 +272,7 @@ async def spawn_tasks(

# Liveness probing -- so that Kubernetes would know that the operator is alive.
if liveness_endpoint:
tasks.append(aiotasks.create_guarded_task(
root_tasks.append(aiotasks.create_guarded_task(
name="health reporter", flag=started_flag, logger=logger,
coro=probing.health_reporter(
registry=registry,
Expand All @@ -282,19 +284,19 @@ async def spawn_tasks(
# Admission webhooks run as either a server or a tunnel or a fixed config.
# The webhook manager automatically adjusts the cluster configuration at runtime.
container: aiovalues.Container[reviews.WebhookClientConfig] = aiovalues.Container()
tasks.append(aiotasks.create_guarded_task(
root_tasks.append(aiotasks.create_guarded_task(
name="admission insights chain", flag=started_flag, logger=logger,
coro=aiobindings.condition_chain(
source=insights.revised, target=container.changed)))
tasks.append(aiotasks.create_guarded_task(
root_tasks.append(aiotasks.create_guarded_task(
name="admission validating configuration manager", flag=started_flag, logger=logger,
coro=admission.validating_configuration_manager(
container=container, settings=settings, registry=registry, insights=insights)))
tasks.append(aiotasks.create_guarded_task(
root_tasks.append(aiotasks.create_guarded_task(
name="admission mutating configuration manager", flag=started_flag, logger=logger,
coro=admission.mutating_configuration_manager(
container=container, settings=settings, registry=registry, insights=insights)))
tasks.append(aiotasks.create_guarded_task(
root_tasks.append(aiotasks.create_guarded_task(
name="admission webhook server", flag=started_flag, logger=logger,
coro=admission.admission_webhook_server(
container=container, settings=settings, registry=registry, insights=insights,
Expand All @@ -305,13 +307,13 @@ async def spawn_tasks(

# Permanent observation of what resource kinds and namespaces are available in the cluster.
# Spawn and cancel dimensional tasks as they come and go; dimensions = resources x namespaces.
tasks.append(aiotasks.create_guarded_task(
root_tasks.append(aiotasks.create_guarded_task(
name="resource observer", flag=started_flag, logger=logger,
coro=observation.resource_observer(
insights=insights,
registry=registry,
settings=settings)))
tasks.append(aiotasks.create_guarded_task(
root_tasks.append(aiotasks.create_guarded_task(
name="namespace observer", flag=started_flag, logger=logger,
coro=observation.namespace_observer(
clusterwide=clusterwide,
Expand All @@ -322,11 +324,11 @@ async def spawn_tasks(
# Explicit command is a hack for the CLI to run coroutines in an operator-like environment.
# If not specified, then use the normal resource processing. It is not exposed publicly (yet).
if _command is not None:
tasks.append(aiotasks.create_guarded_task(
root_tasks.append(aiotasks.create_guarded_task(
name="the command", flag=started_flag, logger=logger, finishable=True,
coro=_command))
else:
tasks.append(aiotasks.create_guarded_task(
root_tasks.append(aiotasks.create_guarded_task(
name="multidimensional multitasker", flag=started_flag, logger=logger,
coro=orchestration.ochestrator(
settings=settings,
Expand Down Expand Up @@ -357,10 +359,11 @@ async def spawn_tasks(
else:
logger.warning("OS signals are ignored: running not in the main thread.")

return tasks
return system_tasks, root_tasks


async def run_tasks(
system_tasks: Collection[aiotasks.Task],
root_tasks: Collection[aiotasks.Task],
*,
ignored: Collection[aiotasks.Task] = frozenset(),
Expand Down Expand Up @@ -391,16 +394,25 @@ async def run_tasks(
# If the operator is cancelled, propagate the cancellation to all the sub-tasks.
# There is no graceful period: cancel as soon as possible, but allow them to finish.
try:
root_done, root_pending = await aiotasks.wait(root_tasks, return_when=asyncio.FIRST_COMPLETED)
# Start both system and root tasks together.
tasks = system_tasks + root_tasks
tasks_done, tasks_pending = await aiotasks.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
except asyncio.CancelledError:
# First stop regular root tasks.
await aiotasks.stop(root_tasks, title="Root", logger=logger, cancelled=True, interval=10)
# Then stop system tasks on which root tasks depend.
await aiotasks.stop(system_tasks, title="System", logger=logger, cancelled=True, interval=10)
hung_tasks = await aiotasks.all_tasks(ignored=ignored)
await aiotasks.stop(hung_tasks, title="Hung", logger=logger, cancelled=True, interval=1)
raise

# If the operator is intact, but one of the root tasks has exited (successfully or not),
# cancel all the remaining root tasks, and gracefully exit other spawned sub-tasks.
root_pending = [task for task in tasks_pending if task not in system_tasks]
root_cancelled, _ = await aiotasks.stop(root_pending, title="Root", logger=logger)
system_pending = [task for task in tasks_pending if task not in root_tasks]
system_cancelled, _ = await aiotasks.stop(system_pending, title="System", logger=logger)
tasks_cancelled = root_cancelled | system_cancelled

# After the root tasks are all gone, cancel any spawned sub-tasks (e.g. handlers).
# If the operator is cancelled, propagate the cancellation to all the sub-tasks.
Expand All @@ -416,7 +428,7 @@ async def run_tasks(
hung_cancelled, _ = await aiotasks.stop(hung_pending, title="Hung", logger=logger, interval=1)

# If succeeded or if cancellation is silenced, re-raise from failed tasks (if any).
await aiotasks.reraise(root_done | root_cancelled | hung_done | hung_cancelled)
await aiotasks.reraise(tasks_done | tasks_cancelled | hung_done | hung_cancelled)


async def _stop_flag_checker(
Expand Down Expand Up @@ -477,6 +489,7 @@ async def _ultimate_termination(


async def _startup_cleanup_activities(
system_tasks: Sequence[aiotasks.Task], # mutated externally!
root_tasks: Sequence[aiotasks.Task], # mutated externally!
ready_flag: Optional[aioadapters.Flag],
started_flag: asyncio.Event,
Expand Down Expand Up @@ -528,8 +541,10 @@ async def _startup_cleanup_activities(
# Beware: on explicit operator cancellation, there is no graceful period at all.
try:
current_task = asyncio.current_task()
awaited_tasks = {task for task in root_tasks if task is not current_task}
await aiotasks.wait(awaited_tasks)
awaited_root_tasks = {task for task in root_tasks if task is not current_task}
await aiotasks.wait(awaited_root_tasks)
awaited_system_tasks = {task for task in system_tasks if task is not current_task}
await aiotasks.wait(awaited_system_tasks)
except asyncio.CancelledError:
logger.warning("Cleanup activity is not executed at all due to cancellation.")
raise
Expand Down