Skip to content

Commit

Permalink
Have Transmitter get values from RunnerConfig
Browse files Browse the repository at this point in the history
  • Loading branch information
Shrews committed Oct 17, 2024
1 parent 1121854 commit 368faa7
Show file tree
Hide file tree
Showing 6 changed files with 111 additions and 63 deletions.
36 changes: 22 additions & 14 deletions src/ansible_runner/config/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ class BaseExecutionMode(Enum):

# Metadata string values
class MetaValues(Enum):
STREAMABLE = 'streamable'
TRANSMIT = 'transmit'


@dataclass
Expand All @@ -82,38 +82,38 @@ class BaseConfig:
# No other config objects make use of positional parameters, so this should be fine.
#
# Example use case: RunnerConfig("/tmp/demo", playbook="main.yml", ...)
private_data_dir: str | None = field(metadata={MetaValues.STREAMABLE: False}, default=None)
private_data_dir: str | None = field(metadata={MetaValues.TRANSMIT: False}, default=None)

artifact_dir: str | None = field(metadata={MetaValues.STREAMABLE: False}, default=None)
artifact_dir: str | None = field(metadata={MetaValues.TRANSMIT: False}, default=None)
check_job_event_data: bool = False
container_auth_data: dict[str, str] | None = None
container_image: str = ""
container_image: str | None = None
container_options: list[str] | None = None
container_volume_mounts: list[str] | None = None
container_workdir: str | None = None
envvars: dict[str, Any] | None = None
fact_cache: str | None = field(metadata={MetaValues.STREAMABLE: False}, default=None)
fact_cache: str | None = field(metadata={MetaValues.TRANSMIT: False}, default=None)
fact_cache_type: str = 'jsonfile'
host_cwd: str | None = None
ident: str | None = field(metadata={MetaValues.STREAMABLE: False}, default=None)
ident: str | None = field(metadata={MetaValues.TRANSMIT: False}, default=None)
json_mode: bool = False
keepalive_seconds: int | None = field(metadata={MetaValues.STREAMABLE: False}, default=None)
keepalive_seconds: int | None = field(metadata={MetaValues.TRANSMIT: False}, default=None)
passwords: dict[str, str] | None = None
process_isolation: bool = False
process_isolation_executable: str = defaults.default_process_isolation_executable
project_dir: str | None = field(metadata={MetaValues.STREAMABLE: False}, default=None)
project_dir: str | None = field(metadata={MetaValues.TRANSMIT: False}, default=None)
quiet: bool = False
rotate_artifacts: int = 0
settings: dict | None = None
ssh_key: str | None = None
suppress_env_files: bool = False
timeout: int | None = None

event_handler: Callable[[dict], None] | None = field(metadata={MetaValues.STREAMABLE: False}, default=None)
status_handler: Callable[[dict, BaseConfig], bool] | None = field(metadata={MetaValues.STREAMABLE: False}, default=None)
artifacts_handler: Callable[[str], None] | None = field(metadata={MetaValues.STREAMABLE: False}, default=None)
cancel_callback: Callable[[], bool] | None = field(metadata={MetaValues.STREAMABLE: False}, default=None)
finished_callback: Callable[[BaseConfig], None] | None = field(metadata={MetaValues.STREAMABLE: False}, default=None)
event_handler: Callable[[dict], None] | None = field(metadata={MetaValues.TRANSMIT: False}, default=None)
status_handler: Callable[[dict, BaseConfig], bool] | None = field(metadata={MetaValues.TRANSMIT: False}, default=None)
artifacts_handler: Callable[[str], None] | None = field(metadata={MetaValues.TRANSMIT: False}, default=None)
cancel_callback: Callable[[], bool] | None = field(metadata={MetaValues.TRANSMIT: False}, default=None)
finished_callback: Callable[[BaseConfig], None] | None = field(metadata={MetaValues.TRANSMIT: False}, default=None)

_CONTAINER_ENGINES = ('docker', 'podman')

Expand All @@ -123,6 +123,8 @@ def __post_init__(self) -> None:
self.command: list[str] = []
self.registry_auth_path: str
self.container_name: str = "" # like other properties, not accurate until prepare is called
if self.container_image is None:
self.container_image = ''

# ignore this for now since it's worker-specific and would just trip up old runners
# self.keepalive_seconds = keepalive_seconds
Expand All @@ -147,7 +149,9 @@ def __post_init__(self) -> None:

if self.ident is None:
self.ident = str(uuid4())
self.ident_set_by_user = False
else:
self.ident_set_by_user = True
self.ident = str(self.ident)

self.artifact_dir = os.path.join(self.artifact_dir, self.ident)
Expand Down Expand Up @@ -185,12 +189,13 @@ def prepare_env(self, runner_mode: str = 'pexpect') -> None:
Manages reading environment metadata files under ``private_data_dir`` and merging/updating
with existing values so the :py:class:`ansible_runner.runner.Runner` object can read and use them easily
"""

if self.ident is None:
raise ConfigurationError("ident value cannot be None")
if self.artifact_dir is None:
raise ConfigurationError("artifact_dir value cannot be None")

self.artifact_dir = os.path.join(self.artifact_dir, self.ident)

self.runner_mode = runner_mode
try:
if self.settings and isinstance(self.settings, dict):
Expand Down Expand Up @@ -520,6 +525,9 @@ def wrap_args_for_containerization(self,
if self.private_data_dir is None:
raise ConfigurationError("private_data_dir value cannot be None")

if self.container_image is None:
raise ConfigurationError("container_image value cannot be None")

new_args = [self.process_isolation_executable]
new_args.extend(['run', '--rm'])

Expand Down
32 changes: 28 additions & 4 deletions src/ansible_runner/config/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,18 +152,42 @@ def extra_vars(self, value):
def streamable_attributes(self) -> dict[str, Any]:
"""Get the set of streamable attributes that have a value that is different from the default.
The field metadata indicates if the attribute is streamable. By default, an attribute
The field metadata indicates if the attribute is streamable from Transmit. By default, an attribute
is considered streamable (must be explicitly disabled).
:return: A dict of attribute names and their values.
"""
retval = {}
for field_obj in fields(self):
if field_obj.metadata and not field_obj.metadata.get(MetaValues.STREAMABLE, True):
if field_obj.metadata and not field_obj.metadata.get(MetaValues.TRANSMIT, True):
continue
current_value = getattr(self, field_obj.name)
if not field_obj.default == current_value:
retval[field_obj.name] = current_value

if field_obj.default == current_value:
continue

# Treat an empty current value (e.g., {} or "") as the same as a default of None to prevent
# streaming unnecessary empty values.
if field_obj.default is None and current_value in ({}, "", []):
continue

retval[field_obj.name] = current_value

return retval

def all_non_default_attributes(self) -> dict[str, Any]:
"""Get all values that have been set differently from their default values.
:return: A dict of attribute names and their values.
"""
retval = {}
for field_obj in fields(self):
current_value = getattr(self, field_obj.name)
if field_obj.default == current_value:
continue
if field_obj.default is None and current_value in ({}, "", []):
continue
retval[field_obj.name] = current_value
return retval

def prepare(self):
Expand Down
9 changes: 3 additions & 6 deletions src/ansible_runner/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import sys
import threading
import logging
from dataclasses import asdict

from ansible_runner import output
from ansible_runner._internal._dump_artifacts import dump_artifacts
Expand Down Expand Up @@ -90,18 +89,16 @@ def init_runner(
config.cancel_callback = signal_handler()

if streamer == 'transmit':
kwargs = asdict(config)
stream_transmitter = Transmitter(only_transmit_kwargs, _output=_output, **kwargs)
stream_transmitter = Transmitter(config, only_transmit_kwargs, _output=_output)
return stream_transmitter

if streamer == 'worker':
kwargs = asdict(config)
kwargs = config.all_non_default_attributes()
stream_worker = Worker(_input=_input, _output=_output, **kwargs)
return stream_worker

if streamer == 'process':
kwargs = asdict(config)
stream_processor = Processor(_input=_input, **kwargs)
stream_processor = Processor(config, _input=_input)
return stream_processor

if config.process_isolation:
Expand Down
44 changes: 18 additions & 26 deletions src/ansible_runner/streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from typing import BinaryIO

import ansible_runner
from ansible_runner.config.runner import RunnerConfig
from ansible_runner.exceptions import ConfigurationError
from ansible_runner.loader import ArtifactLoader
import ansible_runner.plugins
Expand All @@ -38,16 +39,14 @@ def __init__(self, settings):


class Transmitter:
def __init__(self, only_transmit_kwargs: bool, _output: BinaryIO | None, **kwargs):
def __init__(self, config: RunnerConfig, only_transmit_kwargs: bool = False, _output: BinaryIO | None = None):
if _output is None:
_output = sys.stdout.buffer
self._output = _output
self.private_data_dir = os.path.abspath(kwargs['private_data_dir'])
self.private_data_dir = os.path.abspath(config.private_data_dir) if config.private_data_dir else ""
self.only_transmit_kwargs = only_transmit_kwargs
if 'keepalive_seconds' in kwargs:
kwargs.pop('keepalive_seconds') # don't confuse older runners with this Worker-only arg

self.kwargs = kwargs
self.kwargs = config.streamable_attributes()

self.status = "unstarted"
self.rc = None
Expand Down Expand Up @@ -251,43 +250,36 @@ def finished_callback(self, runner_obj):


class Processor:
def __init__(self, _input=None, status_handler=None, event_handler=None,
artifacts_handler=None, cancel_callback=None, finished_callback=None, **kwargs):
def __init__(self, config: RunnerConfig, _input: BinaryIO | None = None, quiet: bool = False):
if _input is None:
_input = sys.stdin.buffer
self._input = _input

self.quiet = kwargs.get('quiet')
self.quiet = quiet

private_data_dir = kwargs.get('private_data_dir')
if private_data_dir is None:
private_data_dir = tempfile.mkdtemp()
self.private_data_dir = private_data_dir
self.private_data_dir: str = config.private_data_dir or ''
self._loader = ArtifactLoader(self.private_data_dir)

settings = kwargs.get('settings')
settings = config.settings
if settings is None:
try:
settings = self._loader.load_file('env/settings', Mapping)
except ConfigurationError:
settings = {}
self.config = MockConfig(settings)

if kwargs.get('artifact_dir'):
self.artifact_dir = os.path.abspath(kwargs.get('artifact_dir'))
else:
project_artifacts = os.path.abspath(os.path.join(self.private_data_dir, 'artifacts'))
if ident := kwargs.get('ident'):
self.artifact_dir = os.path.join(project_artifacts, str(ident))
else:
self.artifact_dir = project_artifacts
self.artifact_dir = config.artifact_dir
if not config.ident_set_by_user:
# If an ident value was not explicitly supplied, for some reason, we don't bother with
# using a subdir named with the ident value.
self.artifact_dir, _ = os.path.split(self.artifact_dir)

self.status_handler = status_handler
self.event_handler = event_handler
self.artifacts_handler = artifacts_handler
self.status_handler = config.status_handler
self.event_handler = config.event_handler
self.artifacts_handler = config.artifacts_handler

self.cancel_callback = cancel_callback # FIXME: unused
self.finished_callback = finished_callback
self.cancel_callback = config.cancel_callback # FIXME: unused
self.finished_callback = config.finished_callback

self.status = "unstarted"
self.rc = None
Expand Down
38 changes: 25 additions & 13 deletions test/integration/test_transmit_worker_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@

import pytest

from ansible_runner import run
from ansible_runner.config.runner import RunnerConfig
from ansible_runner.exceptions import ConfigurationError
from ansible_runner.streaming import Transmitter, Worker, Processor

import ansible_runner.interface # AWX import pattern
Expand Down Expand Up @@ -74,7 +75,8 @@ def test_remote_job_interface(self, tmp_path, project_fixtures, job_type):
outgoing_buffer_file.touch()
outgoing_buffer = outgoing_buffer_file.open('b+r')

transmitter = Transmitter(_output=outgoing_buffer, private_data_dir=transmit_dir, **job_kwargs)
config = RunnerConfig(private_data_dir=transmit_dir, **job_kwargs)
transmitter = Transmitter(config, _output=outgoing_buffer)

for key, value in job_kwargs.items():
assert transmitter.kwargs.get(key, '') == value
Expand Down Expand Up @@ -141,10 +143,15 @@ def test_keepalive_setting(self, tmp_path, project_fixtures, keepalive_setting):
for buffer in (outgoing_buffer, incoming_buffer):
buffer.name = 'foo'

config = RunnerConfig(private_data_dir=project_fixtures / 'sleep',
playbook='sleep.yml',
extravars={'sleep_interval': 2},
verbosity=verbosity)

status, rc = Transmitter(
config,
only_transmit_kwargs=False,
_output=outgoing_buffer, private_data_dir=project_fixtures / 'sleep',
playbook='sleep.yml', extravars={'sleep_interval': 2}, verbosity=verbosity
_output=outgoing_buffer,
).run()
assert rc in (None, 0)
assert status == 'unstarted'
Expand Down Expand Up @@ -257,6 +264,9 @@ def process_method(results_sockfile_read):
break
time.sleep(0.05) # additionally, AWX calls cancel_callback()

res = process_future.result()
assert res.status == 'successful'

for s in (
transmit_socket_write, transmit_socket_read, results_socket_write, results_socket_read,
transmit_socket_write_file, transmit_socket_read_file, results_socket_write_file, results_socket_read_file,
Expand Down Expand Up @@ -292,7 +302,7 @@ def test_process_isolation_executable_not_exist(self, tmp_path, mocker):
**job_kwargs,
)

# valide process_isolation kwargs are passed to transmitter
# valid process_isolation kwargs are passed to transmitter
assert transmitter.kwargs['process_isolation'] == job_kwargs['process_isolation']
assert transmitter.kwargs['process_isolation_executable'] == job_kwargs['process_isolation_executable']

Expand Down Expand Up @@ -330,7 +340,7 @@ def test_process_isolation_executable_not_exist(self, tmp_path, mocker):
_output=incoming_buffer,
private_data_dir=worker_dir,
)
assert exc.value.code == 1
assert exc.value.code == 1
outgoing_buffer.close()
incoming_buffer.close()

Expand All @@ -341,8 +351,10 @@ def transmit_stream(project_fixtures, tmp_path):
outgoing_buffer.touch()

transmit_dir = project_fixtures / 'debug'
config = RunnerConfig(private_data_dir=transmit_dir, playbook='debug.yml')

with outgoing_buffer.open('wb') as f:
transmitter = Transmitter(only_transmit_kwargs=False, _output=f, private_data_dir=transmit_dir, playbook='debug.yml')
transmitter = Transmitter(config, only_transmit_kwargs=False, _output=f)
status, rc = transmitter.run()

assert rc in (None, 0)
Expand Down Expand Up @@ -448,15 +460,15 @@ def test_missing_private_dir_transmit():
outgoing_buffer = io.BytesIO()

# Transmit
with pytest.raises(ValueError) as excinfo:
run(
with pytest.raises(ConfigurationError) as excinfo:
ansible_runner.interface.run(
streamer='transmit',
_output=outgoing_buffer,
private_data_dir='/foo/bar/baz',
playbook='debug.yml',
)

assert "private_data_dir path is either invalid or does not exist" in str(excinfo.value)
assert "Unable to create private_data_dir /foo/bar/baz" in str(excinfo.value)


def test_garbage_private_dir_worker(tmp_path):
Expand All @@ -467,7 +479,7 @@ def test_garbage_private_dir_worker(tmp_path):
outgoing_buffer = io.BytesIO()

# Worker
run(
ansible_runner.interface.run(
streamer='worker',
_input=incoming_buffer,
_output=outgoing_buffer,
Expand All @@ -488,7 +500,7 @@ def test_unparsable_line_worker(tmp_path):
outgoing_buffer = io.BytesIO()

# Worker
run(
ansible_runner.interface.run(
streamer='worker',
_input=incoming_buffer,
_output=outgoing_buffer,
Expand All @@ -512,7 +524,7 @@ def status_receiver(status_data, runner_config): # pylint: disable=W0613
assert 'not-json-data with extra garbage:ffffffffff' in status_data['job_explanation']
assert len(status_data['job_explanation']) < 2000

run(
ansible_runner.interface.run(
streamer='process',
_input=incoming_buffer,
private_data_dir=process_dir,
Expand Down
Loading

0 comments on commit 368faa7

Please sign in to comment.