From 368faa7e73912553fccf4d252a064333d8095470 Mon Sep 17 00:00:00 2001 From: David Shrewsbury Date: Thu, 17 Oct 2024 12:47:24 -0400 Subject: [PATCH] Have Transmitter get values from RunnerConfig --- src/ansible_runner/config/_base.py | 36 +++++++++------ src/ansible_runner/config/runner.py | 32 ++++++++++++-- src/ansible_runner/interface.py | 9 ++-- src/ansible_runner/streaming.py | 44 ++++++++----------- .../test_transmit_worker_process.py | 38 ++++++++++------ test/unit/config/test_runner.py | 15 +++++++ 6 files changed, 111 insertions(+), 63 deletions(-) diff --git a/src/ansible_runner/config/_base.py b/src/ansible_runner/config/_base.py index c6e6a819..d03d0815 100644 --- a/src/ansible_runner/config/_base.py +++ b/src/ansible_runner/config/_base.py @@ -64,7 +64,7 @@ class BaseExecutionMode(Enum): # Metadata string values class MetaValues(Enum): - STREAMABLE = 'streamable' + TRANSMIT = 'transmit' @dataclass @@ -82,26 +82,26 @@ class BaseConfig: # No other config objects make use of positional parameters, so this should be fine. # # Example use case: RunnerConfig("/tmp/demo", playbook="main.yml", ...) - private_data_dir: str | None = field(metadata={MetaValues.STREAMABLE: False}, default=None) + private_data_dir: str | None = field(metadata={MetaValues.TRANSMIT: False}, default=None) - artifact_dir: str | None = field(metadata={MetaValues.STREAMABLE: False}, default=None) + artifact_dir: str | None = field(metadata={MetaValues.TRANSMIT: False}, default=None) check_job_event_data: bool = False container_auth_data: dict[str, str] | None = None - container_image: str = "" + container_image: str | None = None container_options: list[str] | None = None container_volume_mounts: list[str] | None = None container_workdir: str | None = None envvars: dict[str, Any] | None = None - fact_cache: str | None = field(metadata={MetaValues.STREAMABLE: False}, default=None) + fact_cache: str | None = field(metadata={MetaValues.TRANSMIT: False}, default=None) fact_cache_type: str = 'jsonfile' host_cwd: str | None = None - ident: str | None = field(metadata={MetaValues.STREAMABLE: False}, default=None) + ident: str | None = field(metadata={MetaValues.TRANSMIT: False}, default=None) json_mode: bool = False - keepalive_seconds: int | None = field(metadata={MetaValues.STREAMABLE: False}, default=None) + keepalive_seconds: int | None = field(metadata={MetaValues.TRANSMIT: False}, default=None) passwords: dict[str, str] | None = None process_isolation: bool = False process_isolation_executable: str = defaults.default_process_isolation_executable - project_dir: str | None = field(metadata={MetaValues.STREAMABLE: False}, default=None) + project_dir: str | None = field(metadata={MetaValues.TRANSMIT: False}, default=None) quiet: bool = False rotate_artifacts: int = 0 settings: dict | None = None @@ -109,11 +109,11 @@ class BaseConfig: suppress_env_files: bool = False timeout: int | None = None - event_handler: Callable[[dict], None] | None = field(metadata={MetaValues.STREAMABLE: False}, default=None) - status_handler: Callable[[dict, BaseConfig], bool] | None = field(metadata={MetaValues.STREAMABLE: False}, default=None) - artifacts_handler: Callable[[str], None] | None = field(metadata={MetaValues.STREAMABLE: False}, default=None) - cancel_callback: Callable[[], bool] | None = field(metadata={MetaValues.STREAMABLE: False}, default=None) - finished_callback: Callable[[BaseConfig], None] | None = field(metadata={MetaValues.STREAMABLE: False}, default=None) + event_handler: Callable[[dict], None] | None = field(metadata={MetaValues.TRANSMIT: False}, default=None) + status_handler: Callable[[dict, BaseConfig], bool] | None = field(metadata={MetaValues.TRANSMIT: False}, default=None) + artifacts_handler: Callable[[str], None] | None = field(metadata={MetaValues.TRANSMIT: False}, default=None) + cancel_callback: Callable[[], bool] | None = field(metadata={MetaValues.TRANSMIT: False}, default=None) + finished_callback: Callable[[BaseConfig], None] | None = field(metadata={MetaValues.TRANSMIT: False}, default=None) _CONTAINER_ENGINES = ('docker', 'podman') @@ -123,6 +123,8 @@ def __post_init__(self) -> None: self.command: list[str] = [] self.registry_auth_path: str self.container_name: str = "" # like other properties, not accurate until prepare is called + if self.container_image is None: + self.container_image = '' # ignore this for now since it's worker-specific and would just trip up old runners # self.keepalive_seconds = keepalive_seconds @@ -147,7 +149,9 @@ def __post_init__(self) -> None: if self.ident is None: self.ident = str(uuid4()) + self.ident_set_by_user = False else: + self.ident_set_by_user = True self.ident = str(self.ident) self.artifact_dir = os.path.join(self.artifact_dir, self.ident) @@ -185,12 +189,13 @@ def prepare_env(self, runner_mode: str = 'pexpect') -> None: Manages reading environment metadata files under ``private_data_dir`` and merging/updating with existing values so the :py:class:`ansible_runner.runner.Runner` object can read and use them easily """ - if self.ident is None: raise ConfigurationError("ident value cannot be None") if self.artifact_dir is None: raise ConfigurationError("artifact_dir value cannot be None") + self.artifact_dir = os.path.join(self.artifact_dir, self.ident) + self.runner_mode = runner_mode try: if self.settings and isinstance(self.settings, dict): @@ -520,6 +525,9 @@ def wrap_args_for_containerization(self, if self.private_data_dir is None: raise ConfigurationError("private_data_dir value cannot be None") + if self.container_image is None: + raise ConfigurationError("container_image value cannot be None") + new_args = [self.process_isolation_executable] new_args.extend(['run', '--rm']) diff --git a/src/ansible_runner/config/runner.py b/src/ansible_runner/config/runner.py index 3430bd54..5bc33a28 100644 --- a/src/ansible_runner/config/runner.py +++ b/src/ansible_runner/config/runner.py @@ -152,18 +152,42 @@ def extra_vars(self, value): def streamable_attributes(self) -> dict[str, Any]: """Get the set of streamable attributes that have a value that is different from the default. - The field metadata indicates if the attribute is streamable. By default, an attribute + The field metadata indicates if the attribute is streamable from Transmit. By default, an attribute is considered streamable (must be explicitly disabled). :return: A dict of attribute names and their values. """ retval = {} for field_obj in fields(self): - if field_obj.metadata and not field_obj.metadata.get(MetaValues.STREAMABLE, True): + if field_obj.metadata and not field_obj.metadata.get(MetaValues.TRANSMIT, True): continue current_value = getattr(self, field_obj.name) - if not field_obj.default == current_value: - retval[field_obj.name] = current_value + + if field_obj.default == current_value: + continue + + # Treat an empty current value (e.g., {} or "") as the same as a default of None to prevent + # streaming unnecessary empty values. + if field_obj.default is None and current_value in ({}, "", []): + continue + + retval[field_obj.name] = current_value + + return retval + + def all_non_default_attributes(self) -> dict[str, Any]: + """Get all values that have been set differently from their default values. + + :return: A dict of attribute names and their values. + """ + retval = {} + for field_obj in fields(self): + current_value = getattr(self, field_obj.name) + if field_obj.default == current_value: + continue + if field_obj.default is None and current_value in ({}, "", []): + continue + retval[field_obj.name] = current_value return retval def prepare(self): diff --git a/src/ansible_runner/interface.py b/src/ansible_runner/interface.py index 2df978b4..df2fbe00 100644 --- a/src/ansible_runner/interface.py +++ b/src/ansible_runner/interface.py @@ -25,7 +25,6 @@ import sys import threading import logging -from dataclasses import asdict from ansible_runner import output from ansible_runner._internal._dump_artifacts import dump_artifacts @@ -90,18 +89,16 @@ def init_runner( config.cancel_callback = signal_handler() if streamer == 'transmit': - kwargs = asdict(config) - stream_transmitter = Transmitter(only_transmit_kwargs, _output=_output, **kwargs) + stream_transmitter = Transmitter(config, only_transmit_kwargs, _output=_output) return stream_transmitter if streamer == 'worker': - kwargs = asdict(config) + kwargs = config.all_non_default_attributes() stream_worker = Worker(_input=_input, _output=_output, **kwargs) return stream_worker if streamer == 'process': - kwargs = asdict(config) - stream_processor = Processor(_input=_input, **kwargs) + stream_processor = Processor(config, _input=_input) return stream_processor if config.process_isolation: diff --git a/src/ansible_runner/streaming.py b/src/ansible_runner/streaming.py index d27d8bc6..15296054 100644 --- a/src/ansible_runner/streaming.py +++ b/src/ansible_runner/streaming.py @@ -15,6 +15,7 @@ from typing import BinaryIO import ansible_runner +from ansible_runner.config.runner import RunnerConfig from ansible_runner.exceptions import ConfigurationError from ansible_runner.loader import ArtifactLoader import ansible_runner.plugins @@ -38,16 +39,14 @@ def __init__(self, settings): class Transmitter: - def __init__(self, only_transmit_kwargs: bool, _output: BinaryIO | None, **kwargs): + def __init__(self, config: RunnerConfig, only_transmit_kwargs: bool = False, _output: BinaryIO | None = None): if _output is None: _output = sys.stdout.buffer self._output = _output - self.private_data_dir = os.path.abspath(kwargs['private_data_dir']) + self.private_data_dir = os.path.abspath(config.private_data_dir) if config.private_data_dir else "" self.only_transmit_kwargs = only_transmit_kwargs - if 'keepalive_seconds' in kwargs: - kwargs.pop('keepalive_seconds') # don't confuse older runners with this Worker-only arg - self.kwargs = kwargs + self.kwargs = config.streamable_attributes() self.status = "unstarted" self.rc = None @@ -251,21 +250,17 @@ def finished_callback(self, runner_obj): class Processor: - def __init__(self, _input=None, status_handler=None, event_handler=None, - artifacts_handler=None, cancel_callback=None, finished_callback=None, **kwargs): + def __init__(self, config: RunnerConfig, _input: BinaryIO | None = None, quiet: bool = False): if _input is None: _input = sys.stdin.buffer self._input = _input - self.quiet = kwargs.get('quiet') + self.quiet = quiet - private_data_dir = kwargs.get('private_data_dir') - if private_data_dir is None: - private_data_dir = tempfile.mkdtemp() - self.private_data_dir = private_data_dir + self.private_data_dir: str = config.private_data_dir or '' self._loader = ArtifactLoader(self.private_data_dir) - settings = kwargs.get('settings') + settings = config.settings if settings is None: try: settings = self._loader.load_file('env/settings', Mapping) @@ -273,21 +268,18 @@ def __init__(self, _input=None, status_handler=None, event_handler=None, settings = {} self.config = MockConfig(settings) - if kwargs.get('artifact_dir'): - self.artifact_dir = os.path.abspath(kwargs.get('artifact_dir')) - else: - project_artifacts = os.path.abspath(os.path.join(self.private_data_dir, 'artifacts')) - if ident := kwargs.get('ident'): - self.artifact_dir = os.path.join(project_artifacts, str(ident)) - else: - self.artifact_dir = project_artifacts + self.artifact_dir = config.artifact_dir + if not config.ident_set_by_user: + # If an ident value was not explicitly supplied, for some reason, we don't bother with + # using a subdir named with the ident value. + self.artifact_dir, _ = os.path.split(self.artifact_dir) - self.status_handler = status_handler - self.event_handler = event_handler - self.artifacts_handler = artifacts_handler + self.status_handler = config.status_handler + self.event_handler = config.event_handler + self.artifacts_handler = config.artifacts_handler - self.cancel_callback = cancel_callback # FIXME: unused - self.finished_callback = finished_callback + self.cancel_callback = config.cancel_callback # FIXME: unused + self.finished_callback = config.finished_callback self.status = "unstarted" self.rc = None diff --git a/test/integration/test_transmit_worker_process.py b/test/integration/test_transmit_worker_process.py index 187b111e..532ff088 100644 --- a/test/integration/test_transmit_worker_process.py +++ b/test/integration/test_transmit_worker_process.py @@ -9,7 +9,8 @@ import pytest -from ansible_runner import run +from ansible_runner.config.runner import RunnerConfig +from ansible_runner.exceptions import ConfigurationError from ansible_runner.streaming import Transmitter, Worker, Processor import ansible_runner.interface # AWX import pattern @@ -74,7 +75,8 @@ def test_remote_job_interface(self, tmp_path, project_fixtures, job_type): outgoing_buffer_file.touch() outgoing_buffer = outgoing_buffer_file.open('b+r') - transmitter = Transmitter(_output=outgoing_buffer, private_data_dir=transmit_dir, **job_kwargs) + config = RunnerConfig(private_data_dir=transmit_dir, **job_kwargs) + transmitter = Transmitter(config, _output=outgoing_buffer) for key, value in job_kwargs.items(): assert transmitter.kwargs.get(key, '') == value @@ -141,10 +143,15 @@ def test_keepalive_setting(self, tmp_path, project_fixtures, keepalive_setting): for buffer in (outgoing_buffer, incoming_buffer): buffer.name = 'foo' + config = RunnerConfig(private_data_dir=project_fixtures / 'sleep', + playbook='sleep.yml', + extravars={'sleep_interval': 2}, + verbosity=verbosity) + status, rc = Transmitter( + config, only_transmit_kwargs=False, - _output=outgoing_buffer, private_data_dir=project_fixtures / 'sleep', - playbook='sleep.yml', extravars={'sleep_interval': 2}, verbosity=verbosity + _output=outgoing_buffer, ).run() assert rc in (None, 0) assert status == 'unstarted' @@ -257,6 +264,9 @@ def process_method(results_sockfile_read): break time.sleep(0.05) # additionally, AWX calls cancel_callback() + res = process_future.result() + assert res.status == 'successful' + for s in ( transmit_socket_write, transmit_socket_read, results_socket_write, results_socket_read, transmit_socket_write_file, transmit_socket_read_file, results_socket_write_file, results_socket_read_file, @@ -292,7 +302,7 @@ def test_process_isolation_executable_not_exist(self, tmp_path, mocker): **job_kwargs, ) - # valide process_isolation kwargs are passed to transmitter + # valid process_isolation kwargs are passed to transmitter assert transmitter.kwargs['process_isolation'] == job_kwargs['process_isolation'] assert transmitter.kwargs['process_isolation_executable'] == job_kwargs['process_isolation_executable'] @@ -330,7 +340,7 @@ def test_process_isolation_executable_not_exist(self, tmp_path, mocker): _output=incoming_buffer, private_data_dir=worker_dir, ) - assert exc.value.code == 1 + assert exc.value.code == 1 outgoing_buffer.close() incoming_buffer.close() @@ -341,8 +351,10 @@ def transmit_stream(project_fixtures, tmp_path): outgoing_buffer.touch() transmit_dir = project_fixtures / 'debug' + config = RunnerConfig(private_data_dir=transmit_dir, playbook='debug.yml') + with outgoing_buffer.open('wb') as f: - transmitter = Transmitter(only_transmit_kwargs=False, _output=f, private_data_dir=transmit_dir, playbook='debug.yml') + transmitter = Transmitter(config, only_transmit_kwargs=False, _output=f) status, rc = transmitter.run() assert rc in (None, 0) @@ -448,15 +460,15 @@ def test_missing_private_dir_transmit(): outgoing_buffer = io.BytesIO() # Transmit - with pytest.raises(ValueError) as excinfo: - run( + with pytest.raises(ConfigurationError) as excinfo: + ansible_runner.interface.run( streamer='transmit', _output=outgoing_buffer, private_data_dir='/foo/bar/baz', playbook='debug.yml', ) - assert "private_data_dir path is either invalid or does not exist" in str(excinfo.value) + assert "Unable to create private_data_dir /foo/bar/baz" in str(excinfo.value) def test_garbage_private_dir_worker(tmp_path): @@ -467,7 +479,7 @@ def test_garbage_private_dir_worker(tmp_path): outgoing_buffer = io.BytesIO() # Worker - run( + ansible_runner.interface.run( streamer='worker', _input=incoming_buffer, _output=outgoing_buffer, @@ -488,7 +500,7 @@ def test_unparsable_line_worker(tmp_path): outgoing_buffer = io.BytesIO() # Worker - run( + ansible_runner.interface.run( streamer='worker', _input=incoming_buffer, _output=outgoing_buffer, @@ -512,7 +524,7 @@ def status_receiver(status_data, runner_config): # pylint: disable=W0613 assert 'not-json-data with extra garbage:ffffffffff' in status_data['job_explanation'] assert len(status_data['job_explanation']) < 2000 - run( + ansible_runner.interface.run( streamer='process', _input=incoming_buffer, private_data_dir=process_dir, diff --git a/test/unit/config/test_runner.py b/test/unit/config/test_runner.py index 8b060e92..68708919 100644 --- a/test/unit/config/test_runner.py +++ b/test/unit/config/test_runner.py @@ -749,11 +749,26 @@ def test_streamable_attributes_non_default(tmp_path): keepalive_seconds=10, host_pattern="hostA,", json_mode=True, + playbook=[], verbosity=3) # Don't expect private_data_dir or keepalive_seconds since they are not streamable. + # Don't expect playbook since it is an empty value. assert rc.streamable_attributes() == { "host_pattern": "hostA,", "json_mode": True, "verbosity": 3, } + + +def test_all_non_default_attributes(): + """Test not non-default values are returned. + + Note that we expect certain values from an empty config to be returned because they + are set during initialization. + """ + rc = RunnerConfig(ident="my-ident") + retval = rc.all_non_default_attributes() + assert len(retval) == 5 + for key in ('ident', 'artifact_dir', 'project_dir', 'private_data_dir', 'fact_cache'): + assert key in retval