Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Escape output and error in launchSubProcess #1990

Merged
merged 4 commits into from
Sep 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ warn_unused_configs = true

[[tool.mypy.overrides]]
module = [
"src.archivematicaCommon.lib.executeOrRunSubProcess",
"src.MCPClient.lib.client.*",
"src.MCPClient.lib.clientScripts.characterize_file",
"src.MCPClient.lib.clientScripts.has_packages",
Expand All @@ -64,6 +65,7 @@ module = [
"src.MCPClient.lib.clientScripts.policy_check",
"src.MCPClient.lib.clientScripts.transcribe_file",
"src.MCPClient.lib.clientScripts.validate_file",
"tests.archivematicaCommon.test_execute_functions",
"tests.dashboard.fpr.test_views",
"tests.MCPClient.conftest",
"tests.MCPClient.test_characterize_file",
Expand Down
74 changes: 50 additions & 24 deletions src/archivematicaCommon/lib/executeOrRunSubProcess.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,29 @@
import subprocess
import sys
import tempfile
from typing import Dict
from typing import List
from typing import Optional
from typing import Tuple
from typing import Union

from archivematicaFunctions import escape

Arguments = List[str]
Input = Union[str, bytes, io.IOBase]
Environment = Dict[str, str]
Command = Union[str, List[str]]
Result = Tuple[int, str, str]


def launchSubProcess(
command,
stdIn="",
printing=True,
arguments=None,
env_updates=None,
capture_output=False,
):
command: Command,
stdIn: Input = "",
printing: bool = True,
arguments: Optional[Arguments] = None,
env_updates: Optional[Environment] = None,
capture_output: bool = False,
) -> Result:
"""
Launches a subprocess using ``command``, where ``command`` is either:
a) a single string containing a commandline statement, or
Expand Down Expand Up @@ -89,7 +102,7 @@
stdin_pipe = subprocess.PIPE
communicate_input = stdIn
elif isinstance(stdIn, io.IOBase):
stdin_pipe = stdIn
stdin_pipe = stdIn.fileno()

Check warning on line 105 in src/archivematicaCommon/lib/executeOrRunSubProcess.py

View check run for this annotation

Codecov / codecov/patch

src/archivematicaCommon/lib/executeOrRunSubProcess.py#L105

Added line #L105 was not covered by tests
communicate_input = None
else:
raise Exception("stdIn must be a string or a file object")
Expand All @@ -103,8 +116,8 @@
env=my_env,
)
std_out, std_error = p.communicate(input=communicate_input)
stdOut = std_out.decode()
stdError = std_error.decode()
stdOut = escape(std_out)
stdError = escape(std_error)
else:
# Ignore the stdout of the subprocess, capturing only stderr
with open(os.devnull, "w") as devnull:
Expand All @@ -116,7 +129,7 @@
stderr=subprocess.PIPE,
)
__, std_error = p.communicate(input=communicate_input)
stdError = std_error.decode()
stdError = escape(std_error)
retcode = p.returncode
# If we are not capturing output and the subprocess has succeeded, set
# its stderr to the empty string.
Expand All @@ -139,8 +152,13 @@


def createAndRunScript(
text, stdIn="", printing=True, arguments=None, env_updates=None, capture_output=True
):
text: Command,
stdIn: Input = "",
printing: bool = True,
arguments: Optional[Arguments] = None,
env_updates: Optional[Environment] = None,
capture_output: bool = True,
) -> Result:
if arguments is None:
arguments = []
if env_updates is None:
Expand All @@ -150,7 +168,10 @@
encoding="utf-8", mode="wt", delete=False
) as tmpfile:
os.chmod(tmpfile.name, 0o770)
tmpfile.write(text)
if isinstance(text, str):
tmpfile.write(text)
else:
tmpfile.write(" ".join(text))

Check warning on line 174 in src/archivematicaCommon/lib/executeOrRunSubProcess.py

View check run for this annotation

Codecov / codecov/patch

src/archivematicaCommon/lib/executeOrRunSubProcess.py#L174

Added line #L174 was not covered by tests
tmpfile.close()
cmd = [tmpfile.name]
cmd.extend(arguments)
Expand All @@ -168,14 +189,14 @@


def executeOrRun(
type,
text,
stdIn="",
printing=True,
arguments=None,
env_updates=None,
capture_output=True,
):
type: str,
text: Command,
stdIn: Input = "",
printing: bool = True,
arguments: Optional[Arguments] = None,
env_updates: Optional[Environment] = None,
capture_output: bool = True,
) -> Result:
"""
Attempts to run the provided command on the shell, with the text of
"stdIn" passed as standard input if provided. The type parameter
Expand Down Expand Up @@ -220,7 +241,9 @@
capture_output=capture_output,
)
if type == "bashScript":
text = "#!/bin/bash\n" + text
if not isinstance(text, str):
raise ValueError("command must be a str")
text = f"#!/bin/bash\n{text}"

Check warning on line 246 in src/archivematicaCommon/lib/executeOrRunSubProcess.py

View check run for this annotation

Codecov / codecov/patch

src/archivematicaCommon/lib/executeOrRunSubProcess.py#L245-L246

Added lines #L245 - L246 were not covered by tests
return createAndRunScript(
text,
stdIn=stdIn,
Expand All @@ -230,7 +253,9 @@
capture_output=capture_output,
)
if type == "pythonScript":
text = "#!/usr/bin/env python\n" + text
if not isinstance(text, str):
raise ValueError("command must be a str")
text = f"#!/usr/bin/env python\n{text}"

Check warning on line 258 in src/archivematicaCommon/lib/executeOrRunSubProcess.py

View check run for this annotation

Codecov / codecov/patch

src/archivematicaCommon/lib/executeOrRunSubProcess.py#L257-L258

Added lines #L257 - L258 were not covered by tests
return createAndRunScript(
text,
stdIn=stdIn,
Expand All @@ -248,3 +273,4 @@
env_updates=env_updates,
capture_output=capture_output,
)
raise ValueError(f"unknown type {type}")
30 changes: 27 additions & 3 deletions tests/archivematicaCommon/test_execute_functions.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
import pathlib
import shlex
import tempfile
from typing import Generator
from unittest.mock import ANY
from unittest.mock import Mock
from unittest.mock import patch

import executeOrRunSubProcess as execsub
import pytest


def test_capture_output():
def test_capture_output() -> None:
"""Tests behaviour of capture_output when executing sub processes."""

# Test that stdout and stderr are not captured by default
Expand Down Expand Up @@ -61,7 +64,7 @@ def test_capture_output():


@pytest.fixture
def temp_path(tmp_path):
def temp_path(tmp_path: pathlib.Path) -> Generator[str, None, None]:
"""Creates custom temp path, yields the value, and resets to original value."""

original_tempdir = tempfile.tempdir
Expand All @@ -73,7 +76,9 @@ def temp_path(tmp_path):


@patch("executeOrRunSubProcess.launchSubProcess")
def test_createAndRunScript_creates_tmpfile_in_custom_dir(launchSubProcess, temp_path):
def test_createAndRunScript_creates_tmpfile_in_custom_dir(
launchSubProcess: Mock, temp_path: str
) -> None:
"""Tests execution of launchSubProcess when executing createAndRunScript."""

script_content = "#!/bin/bash\necho 'Script output'\nexit 0"
Expand All @@ -89,3 +94,22 @@ def test_createAndRunScript_creates_tmpfile_in_custom_dir(launchSubProcess, temp
)
args, _ = launchSubProcess.call_args
assert args[0][0].startswith(temp_path)


@patch("subprocess.Popen")
def test_launchSubProcess_replaces_non_utf8_output_with_replacement_characters(
popen: Mock,
) -> None:
communicate_return_code = 0
communicate_output = b"Output \xae"
communicate_error = b"Error \xae"
popen.return_value = Mock(
returncode=communicate_return_code,
**{"communicate.return_value": (communicate_output, communicate_error)},
)

code, stdout, stderr = execsub.launchSubProcess("mycommand", capture_output=True)

assert code == communicate_return_code
assert stdout == communicate_output.decode(errors="replace")
assert stderr == communicate_error.decode(errors="replace")