Skip to content

Commit

Permalink
Escape output and error in launchSubProcess
Browse files Browse the repository at this point in the history
  • Loading branch information
replaceafill authored Sep 12, 2024
1 parent 0212db1 commit a78868e
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 27 deletions.
2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ warn_unused_configs = true

[[tool.mypy.overrides]]
module = [
"src.archivematicaCommon.lib.executeOrRunSubProcess",
"src.MCPClient.lib.client.*",
"src.MCPClient.lib.clientScripts.characterize_file",
"src.MCPClient.lib.clientScripts.has_packages",
Expand All @@ -64,6 +65,7 @@ module = [
"src.MCPClient.lib.clientScripts.policy_check",
"src.MCPClient.lib.clientScripts.transcribe_file",
"src.MCPClient.lib.clientScripts.validate_file",
"tests.archivematicaCommon.test_execute_functions",
"tests.dashboard.fpr.test_views",
"tests.MCPClient.conftest",
"tests.MCPClient.test_characterize_file",
Expand Down
74 changes: 50 additions & 24 deletions src/archivematicaCommon/lib/executeOrRunSubProcess.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,29 @@
import subprocess
import sys
import tempfile
from typing import Dict
from typing import List
from typing import Optional
from typing import Tuple
from typing import Union

from archivematicaFunctions import escape

Arguments = List[str]
Input = Union[str, bytes, io.IOBase]
Environment = Dict[str, str]
Command = Union[str, List[str]]
Result = Tuple[int, str, str]


def launchSubProcess(
command,
stdIn="",
printing=True,
arguments=None,
env_updates=None,
capture_output=False,
):
command: Command,
stdIn: Input = "",
printing: bool = True,
arguments: Optional[Arguments] = None,
env_updates: Optional[Environment] = None,
capture_output: bool = False,
) -> Result:
"""
Launches a subprocess using ``command``, where ``command`` is either:
a) a single string containing a commandline statement, or
Expand Down Expand Up @@ -89,7 +102,7 @@ def launchSubProcess(
stdin_pipe = subprocess.PIPE
communicate_input = stdIn
elif isinstance(stdIn, io.IOBase):
stdin_pipe = stdIn
stdin_pipe = stdIn.fileno()
communicate_input = None
else:
raise Exception("stdIn must be a string or a file object")
Expand All @@ -103,8 +116,8 @@ def launchSubProcess(
env=my_env,
)
std_out, std_error = p.communicate(input=communicate_input)
stdOut = std_out.decode()
stdError = std_error.decode()
stdOut = escape(std_out)
stdError = escape(std_error)
else:
# Ignore the stdout of the subprocess, capturing only stderr
with open(os.devnull, "w") as devnull:
Expand All @@ -116,7 +129,7 @@ def launchSubProcess(
stderr=subprocess.PIPE,
)
__, std_error = p.communicate(input=communicate_input)
stdError = std_error.decode()
stdError = escape(std_error)
retcode = p.returncode
# If we are not capturing output and the subprocess has succeeded, set
# its stderr to the empty string.
Expand All @@ -139,8 +152,13 @@ def launchSubProcess(


def createAndRunScript(
text, stdIn="", printing=True, arguments=None, env_updates=None, capture_output=True
):
text: Command,
stdIn: Input = "",
printing: bool = True,
arguments: Optional[Arguments] = None,
env_updates: Optional[Environment] = None,
capture_output: bool = True,
) -> Result:
if arguments is None:
arguments = []
if env_updates is None:
Expand All @@ -150,7 +168,10 @@ def createAndRunScript(
encoding="utf-8", mode="wt", delete=False
) as tmpfile:
os.chmod(tmpfile.name, 0o770)
tmpfile.write(text)
if isinstance(text, str):
tmpfile.write(text)
else:
tmpfile.write(" ".join(text))
tmpfile.close()
cmd = [tmpfile.name]
cmd.extend(arguments)
Expand All @@ -168,14 +189,14 @@ def createAndRunScript(


def executeOrRun(
type,
text,
stdIn="",
printing=True,
arguments=None,
env_updates=None,
capture_output=True,
):
type: str,
text: Command,
stdIn: Input = "",
printing: bool = True,
arguments: Optional[Arguments] = None,
env_updates: Optional[Environment] = None,
capture_output: bool = True,
) -> Result:
"""
Attempts to run the provided command on the shell, with the text of
"stdIn" passed as standard input if provided. The type parameter
Expand Down Expand Up @@ -220,7 +241,9 @@ def executeOrRun(
capture_output=capture_output,
)
if type == "bashScript":
text = "#!/bin/bash\n" + text
if not isinstance(text, str):
raise ValueError("command must be a str")
text = f"#!/bin/bash\n{text}"
return createAndRunScript(
text,
stdIn=stdIn,
Expand All @@ -230,7 +253,9 @@ def executeOrRun(
capture_output=capture_output,
)
if type == "pythonScript":
text = "#!/usr/bin/env python\n" + text
if not isinstance(text, str):
raise ValueError("command must be a str")
text = f"#!/usr/bin/env python\n{text}"
return createAndRunScript(
text,
stdIn=stdIn,
Expand All @@ -248,3 +273,4 @@ def executeOrRun(
env_updates=env_updates,
capture_output=capture_output,
)
raise ValueError(f"unknown type {type}")
30 changes: 27 additions & 3 deletions tests/archivematicaCommon/test_execute_functions.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
import pathlib
import shlex
import tempfile
from typing import Generator
from unittest.mock import ANY
from unittest.mock import Mock
from unittest.mock import patch

import executeOrRunSubProcess as execsub
import pytest


def test_capture_output():
def test_capture_output() -> None:
"""Tests behaviour of capture_output when executing sub processes."""

# Test that stdout and stderr are not captured by default
Expand Down Expand Up @@ -61,7 +64,7 @@ def test_capture_output():


@pytest.fixture
def temp_path(tmp_path):
def temp_path(tmp_path: pathlib.Path) -> Generator[str, None, None]:
"""Creates custom temp path, yields the value, and resets to original value."""

original_tempdir = tempfile.tempdir
Expand All @@ -73,7 +76,9 @@ def temp_path(tmp_path):


@patch("executeOrRunSubProcess.launchSubProcess")
def test_createAndRunScript_creates_tmpfile_in_custom_dir(launchSubProcess, temp_path):
def test_createAndRunScript_creates_tmpfile_in_custom_dir(
launchSubProcess: Mock, temp_path: str
) -> None:
"""Tests execution of launchSubProcess when executing createAndRunScript."""

script_content = "#!/bin/bash\necho 'Script output'\nexit 0"
Expand All @@ -89,3 +94,22 @@ def test_createAndRunScript_creates_tmpfile_in_custom_dir(launchSubProcess, temp
)
args, _ = launchSubProcess.call_args
assert args[0][0].startswith(temp_path)


@patch("subprocess.Popen")
def test_launchSubProcess_replaces_non_utf8_output_with_replacement_characters(
popen: Mock,
) -> None:
communicate_return_code = 0
communicate_output = b"Output \xae"
communicate_error = b"Error \xae"
popen.return_value = Mock(
returncode=communicate_return_code,
**{"communicate.return_value": (communicate_output, communicate_error)},
)

code, stdout, stderr = execsub.launchSubProcess("mycommand", capture_output=True)

assert code == communicate_return_code
assert stdout == communicate_output.decode(errors="replace")
assert stderr == communicate_error.decode(errors="replace")

0 comments on commit a78868e

Please sign in to comment.