Skip to content

Commit

Permalink
Emit periodic keepalive events from Worker (#1191) (#1201)
Browse files Browse the repository at this point in the history
* new CLI arg and envvar for `Worker` mode to optionally emit regular keepalive events; fixes issues with container runtimes that assume long-silent stdout == hung process

---------

Co-authored-by: Alan Rominger <[email protected]>
(cherry picked from commit fd9d67a)
  • Loading branch information
nitzmahone authored Mar 1, 2023
1 parent 22c4555 commit 334c607
Show file tree
Hide file tree
Showing 4 changed files with 248 additions and 48 deletions.
14 changes: 14 additions & 0 deletions ansible_runner/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -611,6 +611,15 @@ def main(sys_args=None):
"Using this will also assure that the directory is deleted when the job finishes."
)
)
worker_subparser.add_argument(
"--keepalive-seconds",
dest="keepalive_seconds",
default=None,
type=int,
help=(
"Emit a synthetic keepalive event every N seconds of idle. (default=0, disabled)"
)
)
process_subparser = subparser.add_parser(
'process',
help="Receive the output of remote ansible-runner work and distribute the results"
Expand Down Expand Up @@ -859,6 +868,7 @@ def main(sys_args=None):
limit=vargs.get('limit'),
streamer=streamer,
suppress_env_files=vargs.get("suppress_env_files"),
keepalive_seconds=vargs.get("keepalive_seconds"),
)
try:
res = run(**run_options)
Expand Down Expand Up @@ -887,3 +897,7 @@ def main(sys_args=None):
return 0
except OSError:
return 1


if __name__ == '__main__':
sys.exit(main())
4 changes: 3 additions & 1 deletion ansible_runner/config/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ def __init__(self,
process_isolation=False, process_isolation_executable=None,
container_image=None, container_volume_mounts=None, container_options=None, container_workdir=None, container_auth_data=None,
ident=None, rotate_artifacts=0, timeout=None, ssh_key=None, quiet=False, json_mode=False,
check_job_event_data=False, suppress_env_files=False):
check_job_event_data=False, suppress_env_files=False, keepalive_seconds=None):
# common params
self.host_cwd = host_cwd
self.envvars = envvars
Expand Down Expand Up @@ -95,6 +95,8 @@ def __init__(self,
self.timeout = timeout
self.check_job_event_data = check_job_event_data
self.suppress_env_files = suppress_env_files
# ignore this for now since it's worker-specific and would just trip up old runners
# self.keepalive_seconds = keepalive_seconds

# setup initial environment
if private_data_dir:
Expand Down
176 changes: 134 additions & 42 deletions ansible_runner/streaming.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from __future__ import annotations # allow newer type syntax until 3.10 is our minimum

import codecs
import json
import os
Expand All @@ -6,17 +8,16 @@
import tempfile
import uuid
import traceback
try:
from collections.abc import Mapping
except ImportError:
from collections import Mapping

import ansible_runner
from ansible_runner.exceptions import ConfigurationError
from ansible_runner.loader import ArtifactLoader
import ansible_runner.plugins
from ansible_runner.utils import register_for_cleanup
from ansible_runner.utils.streaming import stream_dir, unstream_dir
from collections.abc import Mapping
from functools import wraps
from threading import Event, RLock, Thread


class UUIDEncoder(json.JSONEncoder):
Expand All @@ -38,6 +39,9 @@ def __init__(self, _output=None, **kwargs):
self._output = _output
self.private_data_dir = os.path.abspath(kwargs.pop('private_data_dir'))
self.only_transmit_kwargs = kwargs.pop('only_transmit_kwargs', False)
if 'keepalive_seconds' in kwargs:
kwargs.pop('keepalive_seconds') # don't confuse older runners with this Worker-only arg

self.kwargs = kwargs

self.status = "unstarted"
Expand All @@ -60,12 +64,22 @@ def run(self):
return self.status, self.rc


class Worker(object):
def __init__(self, _input=None, _output=None, **kwargs):
class Worker:
def __init__(self, _input=None, _output=None, keepalive_seconds: float | None = None, **kwargs):
if _input is None:
_input = sys.stdin.buffer
if _output is None:
_output = sys.stdout.buffer

if keepalive_seconds is None: # if we didn't get an explicit int value, fall back to envvar
# FIXME: emit/log a warning and silently continue if this value won't parse
keepalive_seconds = float(os.environ.get('ANSIBLE_RUNNER_KEEPALIVE_SECONDS', 0))

self._keepalive_interval_sec = keepalive_seconds
self._keepalive_thread: Thread | None = None
self._output_event = Event()
self._output_lock = RLock()

self._input = _input
self._output = _output

Expand All @@ -81,6 +95,64 @@ def __init__(self, _input=None, _output=None, **kwargs):
self.status = "unstarted"
self.rc = None

def _begin_keepalive(self):
"""Starts a keepalive thread at most once"""
if not self._keepalive_thread:
self._keepalive_thread = Thread(target=self._keepalive_loop, daemon=True)
self._keepalive_thread.start()

def _end_keepalive(self):
"""Disable the keepalive interval and notify the keepalive thread to shut down"""
self._keepalive_interval_sec = 0
self._output_event.set()

def _keepalive_loop(self):
"""Main loop for keepalive injection thread; exits when keepalive interval is <= 0"""
while self._keepalive_interval_sec > 0:
# block until output has occurred or keepalive interval elapses
if self._output_event.wait(timeout=self._keepalive_interval_sec):
# output was sent before keepalive timeout; reset the event and start waiting again
self._output_event.clear()
continue

# keepalive interval elapsed; try to send a keepalive...
# pre-acquire the output lock without blocking
if not self._output_lock.acquire(blocking=False):
# something else has the lock; output is imminent, so just skip this keepalive
# NB: a long-running operation under an event handler that's holding this lock but not actually moving
# output could theoretically block keepalives long enough to cause problems, but it's probably not
# worth the added locking hassle to be pedantic about it
continue

try:
# were keepalives recently disabled?
if self._keepalive_interval_sec <= 0:
# we're probably shutting down; don't risk corrupting output by writing now, just bail out
return
# output a keepalive event
# FIXME: this could be a lot smaller (even just `{}`) if a short-circuit discard was guaranteed in
# Processor or if other layers were more defensive about missing event keys and/or unknown dictionary
# values...
self.event_handler(dict(event='keepalive', counter=0, uuid=0))
finally:
# always release the output lock (
self._output_lock.release()

def _synchronize_output_reset_keepalive(wrapped_method):
"""
Utility decorator to synchronize event writes and flushes to avoid keepalives splatting in the middle of
mid-write events, and reset keepalive interval on write completion.
"""
@wraps(wrapped_method)
def wrapper(self, *args, **kwargs):
with self._output_lock:
ret = wrapped_method(self, *args, **kwargs)
# signal the keepalive thread last, so the timeout restarts after the last write, not before the first
self._output_event.set()
return ret

return wrapper

def update_paths(self, kwargs):
if kwargs.get('envvars'):
if 'ANSIBLE_ROLES_PATH' in kwargs['envvars']:
Expand All @@ -93,63 +165,72 @@ def update_paths(self, kwargs):
return kwargs

def run(self):
while True:
try:
line = self._input.readline()
data = json.loads(line)
except (json.decoder.JSONDecodeError, IOError):
self.status_handler({'status': 'error', 'job_explanation': 'Failed to JSON parse a line from transmit stream.'}, None)
self.finished_callback(None) # send eof line
return self.status, self.rc

if 'kwargs' in data:
self.job_kwargs = self.update_paths(data['kwargs'])
elif 'zipfile' in data:
self._begin_keepalive()
try:
while True:
try:
unstream_dir(self._input, data['zipfile'], self.private_data_dir)
except Exception:
self.status_handler({
'status': 'error',
'job_explanation': 'Failed to extract private data directory on worker.',
'result_traceback': traceback.format_exc()
}, None)
line = self._input.readline()
data = json.loads(line)
except (json.decoder.JSONDecodeError, IOError):
self.status_handler({'status': 'error', 'job_explanation': 'Failed to JSON parse a line from transmit stream.'}, None)
self.finished_callback(None) # send eof line
return self.status, self.rc
elif 'eof' in data:
break

self.kwargs.update(self.job_kwargs)
self.kwargs['quiet'] = True
self.kwargs['suppress_ansible_output'] = True
self.kwargs['private_data_dir'] = self.private_data_dir
self.kwargs['status_handler'] = self.status_handler
self.kwargs['event_handler'] = self.event_handler
self.kwargs['artifacts_handler'] = self.artifacts_handler
self.kwargs['finished_callback'] = self.finished_callback

r = ansible_runner.interface.run(**self.kwargs)
self.status, self.rc = r.status, r.rc

# FIXME: do cleanup on the tempdir
if 'kwargs' in data:
self.job_kwargs = self.update_paths(data['kwargs'])
elif 'zipfile' in data:
try:
unstream_dir(self._input, data['zipfile'], self.private_data_dir)
except Exception:
self.status_handler({
'status': 'error',
'job_explanation': 'Failed to extract private data directory on worker.',
'result_traceback': traceback.format_exc()
}, None)
self.finished_callback(None) # send eof line
return self.status, self.rc
elif 'eof' in data:
break

self.kwargs.update(self.job_kwargs)
self.kwargs['quiet'] = True
self.kwargs['suppress_ansible_output'] = True
self.kwargs['private_data_dir'] = self.private_data_dir
self.kwargs['status_handler'] = self.status_handler
self.kwargs['event_handler'] = self.event_handler
self.kwargs['artifacts_handler'] = self.artifacts_handler
self.kwargs['finished_callback'] = self.finished_callback

r = ansible_runner.interface.run(**self.kwargs)
self.status, self.rc = r.status, r.rc

# FIXME: do cleanup on the tempdir
finally:
self._end_keepalive()

return self.status, self.rc

@_synchronize_output_reset_keepalive
def status_handler(self, status_data, runner_config):
self.status = status_data['status']
self._output.write(json.dumps(status_data).encode('utf-8'))
self._output.write(b'\n')
self._output.flush()

@_synchronize_output_reset_keepalive
def event_handler(self, event_data):
self._output.write(json.dumps(event_data).encode('utf-8'))
self._output.write(b'\n')
self._output.flush()

@_synchronize_output_reset_keepalive
def artifacts_handler(self, artifact_dir):
stream_dir(artifact_dir, self._output)
self._output.flush()

@_synchronize_output_reset_keepalive
def finished_callback(self, runner_obj):
self._end_keepalive() # ensure that we can't splat a keepalive event after the eof event
self._output.write(json.dumps({'eof': True}).encode('utf-8'))
self._output.write(b'\n')
self._output.flush()
Expand Down Expand Up @@ -210,10 +291,18 @@ def status_callback(self, status_data):
self.status_handler(status_data, runner_config=self.config)

def event_callback(self, event_data):
# FIXME: this needs to be more defensive to not blow up on "malformed" events or new values it doesn't recognize
counter = event_data.get('counter')
uuid = event_data.get('uuid')

if not counter or not uuid:
# FIXME: log a warning about a malformed event?
return

full_filename = os.path.join(self.artifact_dir,
'job_events',
'{}-{}.json'.format(event_data['counter'],
event_data['uuid']))
f'{counter}-{uuid}.json')

if not self.quiet and 'stdout' in event_data:
print(event_data['stdout'])

Expand Down Expand Up @@ -254,6 +343,9 @@ def run(self):
self.artifacts_callback(data)
elif 'eof' in data:
break
elif data.get('event') == 'keepalive':
# just ignore keepalives
continue
else:
self.event_callback(data)

Expand Down
Loading

0 comments on commit 334c607

Please sign in to comment.