diff --git a/ansible_runner/__main__.py b/ansible_runner/__main__.py
index 17c8469c1..0873719ed 100644
--- a/ansible_runner/__main__.py
+++ b/ansible_runner/__main__.py
@@ -611,6 +611,15 @@ def main(sys_args=None):
             "Using this will also assure that the directory is deleted when the job finishes."
         )
     )
+    worker_subparser.add_argument(
+        "--keepalive-seconds",
+        dest="keepalive_seconds",
+        default=None,
+        type=int,
+        help=(
+            "Emit a synthetic keepalive event every N seconds of idle. (default=0, disabled)"
+        )
+    )
     process_subparser = subparser.add_parser(
         'process',
         help="Receive the output of remote ansible-runner work and distribute the results"
@@ -859,6 +868,7 @@ def main(sys_args=None):
         limit=vargs.get('limit'),
         streamer=streamer,
         suppress_env_files=vargs.get("suppress_env_files"),
+        keepalive_seconds=vargs.get("keepalive_seconds"),
     )
     try:
         res = run(**run_options)
@@ -887,3 +897,7 @@ def main(sys_args=None):
             return 0
     except OSError:
         return 1
+
+
+if __name__ == '__main__':
+    sys.exit(main())
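The CLI wiring above only threads the new value through to run(); the precedence it ends up with is decided in Worker.__init__, further down in this diff. A condensed sketch of that precedence, using an illustrative helper name that is not part of the ansible-runner API:

import os

def resolve_keepalive_seconds(explicit=None):
    # an explicit value (e.g. from --keepalive-seconds) wins outright
    if explicit is not None:
        return float(explicit)
    # otherwise fall back to the environment, with 0 (disabled) as the default
    return float(os.environ.get('ANSIBLE_RUNNER_KEEPALIVE_SECONDS', 0))

os.environ.pop('ANSIBLE_RUNNER_KEEPALIVE_SECONDS', None)  # keep the demo deterministic
assert resolve_keepalive_seconds(5) == 5.0
assert resolve_keepalive_seconds() == 0.0  # disabled when nothing is set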
diff --git a/ansible_runner/config/_base.py b/ansible_runner/config/_base.py
index 03abc870e..1c43da075 100644
--- a/ansible_runner/config/_base.py
+++ b/ansible_runner/config/_base.py
@@ -67,7 +67,7 @@ def __init__(self,
                  process_isolation=False, process_isolation_executable=None,
                  container_image=None, container_volume_mounts=None, container_options=None, container_workdir=None, container_auth_data=None,
                  ident=None, rotate_artifacts=0, timeout=None, ssh_key=None, quiet=False, json_mode=False,
-                 check_job_event_data=False, suppress_env_files=False):
+                 check_job_event_data=False, suppress_env_files=False, keepalive_seconds=None):
         # common params
         self.host_cwd = host_cwd
         self.envvars = envvars
@@ -95,6 +95,8 @@ def __init__(self,
         self.timeout = timeout
         self.check_job_event_data = check_job_event_data
         self.suppress_env_files = suppress_env_files
+        # ignore this for now since it's worker-specific and would just trip up old runners
+        # self.keepalive_seconds = keepalive_seconds

         # setup initial environment
         if private_data_dir:
diff --git a/ansible_runner/streaming.py b/ansible_runner/streaming.py
index 1a64be740..ff9622c05 100644
--- a/ansible_runner/streaming.py
+++ b/ansible_runner/streaming.py
@@ -1,3 +1,5 @@
+from __future__ import annotations  # allow newer type syntax until 3.10 is our minimum
+
 import codecs
 import json
 import os
@@ -6,10 +8,6 @@
 import tempfile
 import uuid
 import traceback
-try:
-    from collections.abc import Mapping
-except ImportError:
-    from collections import Mapping

 import ansible_runner
 from ansible_runner.exceptions import ConfigurationError
@@ -17,6 +15,9 @@
 import ansible_runner.plugins
 from ansible_runner.utils import register_for_cleanup
 from ansible_runner.utils.streaming import stream_dir, unstream_dir
+from collections.abc import Mapping
+from functools import wraps
+from threading import Event, RLock, Thread


 class UUIDEncoder(json.JSONEncoder):
@@ -38,6 +39,9 @@ def __init__(self, _output=None, **kwargs):
         self._output = _output
         self.private_data_dir = os.path.abspath(kwargs.pop('private_data_dir'))
         self.only_transmit_kwargs = kwargs.pop('only_transmit_kwargs', False)
+        if 'keepalive_seconds' in kwargs:
+            kwargs.pop('keepalive_seconds')  # don't confuse older runners with this Worker-only arg
+
         self.kwargs = kwargs

         self.status = "unstarted"
@@ -60,12 +64,22 @@ def run(self):
         return self.status, self.rc


-class Worker(object):
-    def __init__(self, _input=None, _output=None, **kwargs):
+class Worker:
+    def __init__(self, _input=None, _output=None, keepalive_seconds: float | None = None, **kwargs):
         if _input is None:
             _input = sys.stdin.buffer
         if _output is None:
             _output = sys.stdout.buffer
+
+        if keepalive_seconds is None:  # if we didn't get an explicit value, fall back to the envvar
+            # FIXME: emit/log a warning and silently continue if this value won't parse
+            keepalive_seconds = float(os.environ.get('ANSIBLE_RUNNER_KEEPALIVE_SECONDS', 0))
+
+        self._keepalive_interval_sec = keepalive_seconds
+        self._keepalive_thread: Thread | None = None
+        self._output_event = Event()
+        self._output_lock = RLock()
+
         self._input = _input
         self._output = _output
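The synchronization primitives initialized above drive a standard idle-detection pattern: a daemon thread waits on an Event with a timeout, and a timeout means no output happened for a full interval. A minimal, self-contained sketch of that pattern (the real loop, with its locking, is in the next hunk):

import time
from threading import Event, Thread

output_event = Event()

def idle_watcher(interval=0.5, ticks=3):
    for _ in range(ticks):
        if output_event.wait(timeout=interval):
            output_event.clear()  # output arrived in time; rearm and wait again
        else:
            print('idle: a keepalive would be emitted here')

watcher = Thread(target=idle_watcher, daemon=True)
watcher.start()
time.sleep(0.75)
output_event.set()  # simulate a completed write
watcher.join()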
@@ -81,6 +95,64 @@ def __init__(self, _input=None, _output=None, **kwargs):
         self.status = "unstarted"
         self.rc = None

+    def _begin_keepalive(self):
+        """Start the keepalive thread, at most once"""
+        if not self._keepalive_thread:
+            self._keepalive_thread = Thread(target=self._keepalive_loop, daemon=True)
+            self._keepalive_thread.start()
+
+    def _end_keepalive(self):
+        """Disable the keepalive interval and notify the keepalive thread to shut down"""
+        self._keepalive_interval_sec = 0
+        self._output_event.set()
+
+    def _keepalive_loop(self):
+        """Main loop for the keepalive injection thread; exits when the keepalive interval is <= 0"""
+        while self._keepalive_interval_sec > 0:
+            # block until output has occurred or the keepalive interval elapses
+            if self._output_event.wait(timeout=self._keepalive_interval_sec):
+                # output was sent before the keepalive timeout; reset the event and start waiting again
+                self._output_event.clear()
+                continue
+
+            # keepalive interval elapsed; try to send a keepalive...
+            # pre-acquire the output lock without blocking
+            if not self._output_lock.acquire(blocking=False):
+                # something else has the lock; output is imminent, so just skip this keepalive
+                # NB: a long-running operation under an event handler that's holding this lock but not actually moving
+                # output could theoretically block keepalives long enough to cause problems, but it's probably not
+                # worth the added locking hassle to be pedantic about it
+                continue
+
+            try:
+                # were keepalives recently disabled?
+                if self._keepalive_interval_sec <= 0:
+                    # we're probably shutting down; don't risk corrupting output by writing now, just bail out
+                    return
+                # output a keepalive event
+                # FIXME: this could be a lot smaller (even just `{}`) if a short-circuit discard was guaranteed in
+                # Processor or if other layers were more defensive about missing event keys and/or unknown dictionary
+                # values...
+                self.event_handler(dict(event='keepalive', counter=0, uuid=0))
+            finally:
+                # always release the output lock
+                self._output_lock.release()
+
+    def _synchronize_output_reset_keepalive(wrapped_method):
+        """
+        Utility decorator to synchronize event writes and flushes so a keepalive cannot land in the middle of an
+        in-flight event write, and to reset the keepalive interval once each write completes.
+        """
+        @wraps(wrapped_method)
+        def wrapper(self, *args, **kwargs):
+            with self._output_lock:
+                ret = wrapped_method(self, *args, **kwargs)
+            # signal the keepalive thread last, so the timeout restarts after the last write, not before the first
+            self._output_event.set()
+            return ret
+
+        return wrapper
+
     def update_paths(self, kwargs):
         if kwargs.get('envvars'):
             if 'ANSIBLE_ROLES_PATH' in kwargs['envvars']:
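On the wire, each event is a single JSON line (see the handlers in the next hunk), so the synthetic event built by self.event_handler(dict(event='keepalive', counter=0, uuid=0)) above serializes to a line a consumer can cheaply recognize and discard:

import json

keepalive = dict(event='keepalive', counter=0, uuid=0)
line = json.dumps(keepalive).encode('utf-8') + b'\n'
assert line == b'{"event": "keepalive", "counter": 0, "uuid": 0}\n'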
@@ -93,63 +165,72 @@ def update_paths(self, kwargs):
         return kwargs

     def run(self):
-        while True:
-            try:
-                line = self._input.readline()
-                data = json.loads(line)
-            except (json.decoder.JSONDecodeError, IOError):
-                self.status_handler({'status': 'error', 'job_explanation': 'Failed to JSON parse a line from transmit stream.'}, None)
-                self.finished_callback(None)  # send eof line
-                return self.status, self.rc
-
-            if 'kwargs' in data:
-                self.job_kwargs = self.update_paths(data['kwargs'])
-            elif 'zipfile' in data:
+        self._begin_keepalive()
+        try:
+            while True:
                 try:
-                    unstream_dir(self._input, data['zipfile'], self.private_data_dir)
-                except Exception:
-                    self.status_handler({
-                        'status': 'error',
-                        'job_explanation': 'Failed to extract private data directory on worker.',
-                        'result_traceback': traceback.format_exc()
-                    }, None)
+                    line = self._input.readline()
+                    data = json.loads(line)
+                except (json.decoder.JSONDecodeError, IOError):
+                    self.status_handler({'status': 'error', 'job_explanation': 'Failed to JSON parse a line from transmit stream.'}, None)
                     self.finished_callback(None)  # send eof line
                     return self.status, self.rc
-            elif 'eof' in data:
-                break
-
-        self.kwargs.update(self.job_kwargs)
-        self.kwargs['quiet'] = True
-        self.kwargs['suppress_ansible_output'] = True
-        self.kwargs['private_data_dir'] = self.private_data_dir
-        self.kwargs['status_handler'] = self.status_handler
-        self.kwargs['event_handler'] = self.event_handler
-        self.kwargs['artifacts_handler'] = self.artifacts_handler
-        self.kwargs['finished_callback'] = self.finished_callback
-
-        r = ansible_runner.interface.run(**self.kwargs)
-        self.status, self.rc = r.status, r.rc
-
-        # FIXME: do cleanup on the tempdir
+                if 'kwargs' in data:
+                    self.job_kwargs = self.update_paths(data['kwargs'])
+                elif 'zipfile' in data:
+                    try:
+                        unstream_dir(self._input, data['zipfile'], self.private_data_dir)
+                    except Exception:
+                        self.status_handler({
+                            'status': 'error',
+                            'job_explanation': 'Failed to extract private data directory on worker.',
+                            'result_traceback': traceback.format_exc()
+                        }, None)
+                        self.finished_callback(None)  # send eof line
+                        return self.status, self.rc
+                elif 'eof' in data:
+                    break
+
+            self.kwargs.update(self.job_kwargs)
+            self.kwargs['quiet'] = True
+            self.kwargs['suppress_ansible_output'] = True
+            self.kwargs['private_data_dir'] = self.private_data_dir
+            self.kwargs['status_handler'] = self.status_handler
+            self.kwargs['event_handler'] = self.event_handler
+            self.kwargs['artifacts_handler'] = self.artifacts_handler
+            self.kwargs['finished_callback'] = self.finished_callback
+
+            r = ansible_runner.interface.run(**self.kwargs)
+            self.status, self.rc = r.status, r.rc
+
+            # FIXME: do cleanup on the tempdir
+        finally:
+            self._end_keepalive()

         return self.status, self.rc

+    @_synchronize_output_reset_keepalive
     def status_handler(self, status_data, runner_config):
         self.status = status_data['status']
         self._output.write(json.dumps(status_data).encode('utf-8'))
         self._output.write(b'\n')
         self._output.flush()

+    @_synchronize_output_reset_keepalive
     def event_handler(self, event_data):
         self._output.write(json.dumps(event_data).encode('utf-8'))
         self._output.write(b'\n')
         self._output.flush()

+    @_synchronize_output_reset_keepalive
     def artifacts_handler(self, artifact_dir):
         stream_dir(artifact_dir, self._output)
         self._output.flush()

+    @_synchronize_output_reset_keepalive
     def finished_callback(self, runner_obj):
+        self._end_keepalive()  # ensure that we can't splat a keepalive event after the eof event
         self._output.write(json.dumps({'eof': True}).encode('utf-8'))
         self._output.write(b'\n')
         self._output.flush()
@@ -210,10 +291,18 @@ def status_callback(self, status_data):
         self.status_handler(status_data, runner_config=self.config)

     def event_callback(self, event_data):
+        # FIXME: this needs to be more defensive to not blow up on "malformed" events or new values it doesn't recognize
+        counter = event_data.get('counter')
+        uuid = event_data.get('uuid')
+
+        if not counter or not uuid:
+            # FIXME: log a warning about a malformed event?
+            return
+
         full_filename = os.path.join(self.artifact_dir,
                                      'job_events',
-                                     '{}-{}.json'.format(event_data['counter'],
-                                                         event_data['uuid']))
+                                     f'{counter}-{uuid}.json')
+
         if not self.quiet and 'stdout' in event_data:
             print(event_data['stdout'])
@@ -254,6 +343,9 @@ def run(self):
                 self.artifacts_callback(data)
             elif 'eof' in data:
                 break
+            elif data.get('event') == 'keepalive':
+                # just ignore keepalives
+                continue
             else:
                 self.event_callback(data)
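Taken together, the Worker now emits keepalives and the Processor drops them; any other consumer of this stream needs the same tolerance. A minimal sketch of a consumer loop mirroring the dispatch above (the handler stand-in is illustrative):

import io
import json

def consume(stream):
    for raw in stream:
        data = json.loads(raw)
        if data.get('event') == 'keepalive':
            continue  # synthetic idle traffic; safe to drop
        if 'eof' in data:
            break
        print('event:', data.get('event'))  # stand-in for real event handling

consume(io.BytesIO(b'{"event": "keepalive", "counter": 0, "uuid": 0}\n'
                   b'{"event": "playbook_on_start", "counter": 1, "uuid": "abc"}\n'
                   b'{"eof": true}\n'))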
diff --git a/test/integration/test_transmit_worker_process.py b/test/integration/test_transmit_worker_process.py
index e7e9097a7..f86e56513 100644
--- a/test/integration/test_transmit_worker_process.py
+++ b/test/integration/test_transmit_worker_process.py
@@ -1,8 +1,10 @@
+import base64
 import io
 import os
 import socket
 import concurrent.futures
 import time
+import threading

 import pytest
 import json
@@ -32,10 +34,8 @@ def get_job_kwargs(self, job_type):
         job_kwargs['envvars'] = dict(MY_ENV_VAR='bogus')
         return job_kwargs

-    def check_artifacts(self, process_dir, job_type):
-
-        assert set(os.listdir(process_dir)) == {'artifacts', }
-
+    @staticmethod
+    def get_stdout(process_dir):
         events_dir = os.path.join(process_dir, 'artifacts', 'job_events')
         events = []
         for file in os.listdir(events_dir):
@@ -44,7 +44,14 @@ def check_artifacts(self, process_dir, job_type):
                 continue
             content = f.read()
             events.append(json.loads(content))
-        stdout = '\n'.join(event['stdout'] for event in events)
+        return '\n'.join(event['stdout'] for event in events)
+
+    @staticmethod
+    def check_artifacts(process_dir, job_type):
+
+        assert set(os.listdir(process_dir)) == {'artifacts', }
+
+        stdout = TestStreamingUsage.get_stdout(process_dir)

         if job_type == 'run':
             assert 'Hello world!' in stdout
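The keepalive test in the following hunk validates more than event counts: for sub-second intervals it also checks that no keepalive bytes were spliced into the base64 zipfile payload, relying on the stream's framing, where a {"zipfile": N} control line is followed by a payload that must decode to exactly N bytes. In miniature (values are illustrative):

import base64
import json

payload = base64.b64encode(b'x' * 10)  # stand-in for a streamed artifact zip
header = json.dumps({'zipfile': 10})   # control line announcing the payload size
assert json.loads(header)['zipfile'] == len(base64.b64decode(payload))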
@@ -99,6 +106,91 @@ def test_remote_job_interface(self, tmp_path, project_fixtures, job_type):

         self.check_artifacts(str(process_dir), job_type)

+    @pytest.mark.parametrize("keepalive_setting", [
+        0,  # keepalive explicitly disabled, default
+        1,  # emit keepalives every 1s
+        0.000000001,  # emit keepalives on a ridiculously small interval to test for output corruption
+        None,  # default disable, test sets envvar for keepalives
+    ])
+    def test_keepalive_setting(self, tmp_path, project_fixtures, keepalive_setting):
+        verbosity = None
+        output_corruption_test_mode = 0 < (keepalive_setting or 0) < 1
+
+        if output_corruption_test_mode:
+            verbosity = 5
+            # FIXME: turn on debug output too just to really spam the thing
+
+        if keepalive_setting is None:
+            # test the envvar fallback
+            os.environ['ANSIBLE_RUNNER_KEEPALIVE_SECONDS'] = '1'
+        elif 'ANSIBLE_RUNNER_KEEPALIVE_SECONDS' in os.environ:
+            # don't allow this envvar to affect the test behavior
+            del os.environ['ANSIBLE_RUNNER_KEEPALIVE_SECONDS']
+
+        worker_dir = tmp_path / 'for_worker'
+        process_dir = tmp_path / 'for_process'
+        for dir in (worker_dir, process_dir):
+            dir.mkdir()
+
+        outgoing_buffer = io.BytesIO()
+        incoming_buffer = io.BytesIO()
+        for buffer in (outgoing_buffer, incoming_buffer):
+            buffer.name = 'foo'
+
+        status, rc = Transmitter(
+            _output=outgoing_buffer, private_data_dir=project_fixtures / 'sleep',
+            playbook='sleep.yml', extravars=dict(sleep_interval=2), verbosity=verbosity
+        ).run()
+        assert rc in (None, 0)
+        assert status == 'unstarted'
+        outgoing_buffer.seek(0)
+
+        worker_start_time = time.time()
+
+        worker = Worker(
+            _input=outgoing_buffer, _output=incoming_buffer, private_data_dir=worker_dir,
+            keepalive_seconds=keepalive_setting
+        )
+        worker.run()
+
+        assert time.time() - worker_start_time > 2.0  # task sleeps for 2 seconds
+        assert isinstance(worker._keepalive_thread, threading.Thread)  # we currently always create and start the thread
+        assert worker._keepalive_thread.daemon
+        worker._keepalive_thread.join(2)  # wait a couple of keepalive intervals to avoid exit race
+        assert not worker._keepalive_thread.is_alive()  # make sure it's dead
+
+        incoming_buffer.seek(0)
+        Processor(_input=incoming_buffer, private_data_dir=process_dir).run()
+
+        stdout = self.get_stdout(process_dir)
+        assert 'Sleep for a specified interval' in stdout
+        assert '"event": "keepalive"' not in stdout
+
+        incoming_data = incoming_buffer.getvalue().decode('utf-8')
+        if keepalive_setting == 0:
+            assert incoming_data.count('"event": "keepalive"') == 0
+        elif 0 < (keepalive_setting or 0) < 1:
+            # JSON-load every line to ensure no interleaved keepalive output corruption
+            line = None
+            try:
+                pending_payload_length = 0
+                for line in incoming_data.splitlines():
+                    if pending_payload_length:
+                        # decode and check length to validate that we didn't trash the payload
+                        # zap the mashed eof message from the end if present
+                        line = line.rsplit('{"eof": true}', 1)[0]  # FUTURE: change this to removesuffix for 3.9+
+                        assert pending_payload_length == len(base64.b64decode(line))
+                        pending_payload_length = 0  # back to normal
+                        continue
+
+                    data = json.loads(line)
+                    pending_payload_length = data.get('zipfile', 0)
+            except json.JSONDecodeError:
+                pytest.fail(f'unparseable JSON in output (likely corrupted by keepalive): {line}')
+        else:
+            # account for some wobble in the number of keepalives for artifact gather, etc.
+            assert 1 <= incoming_data.count('"event": "keepalive"') < 5
+
     @pytest.mark.parametrize("job_type", ['run', 'adhoc'])
     def test_remote_job_by_sockets(self, tmp_path, project_fixtures, job_type):
         """This test case is intended to be close to how the AWX use case works