Skip to content

Commit

Permalink
Handle docker compose errors (#769)
Browse files Browse the repository at this point in the history
PR that handles errors when docker compose faces an issue, we now a
raise a valid python error with the error command that is being thrown.
This make it more clear for the user that something went off with the
pipeline.

@janvanlooy
  • Loading branch information
PhilippeMoussalli authored Jan 11, 2024
1 parent c67f7a1 commit 16888c8
Show file tree
Hide file tree
Showing 4 changed files with 84 additions and 19 deletions.
4 changes: 4 additions & 0 deletions src/fondant/core/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,7 @@ class InvalidTypeSchema(ValidationError, FondantException):

class UnsupportedTypeAnnotation(FondantException):
"""Thrown when an unsupported type annotation is encountered during type inference."""


class PipelineRunError(ValidationError, FondantException):
"""Thrown when a pipeline run results in an error."""
16 changes: 11 additions & 5 deletions src/fondant/pipeline/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import yaml

from fondant.core.exceptions import PipelineRunError
from fondant.pipeline import Pipeline
from fondant.pipeline.compiler import (
DockerCompiler,
Expand Down Expand Up @@ -38,15 +39,23 @@ def _run(self, input_spec: str, *args, **kwargs):
"--pull",
"always",
"--remove-orphans",
"--abort-on-container-exit",
]

print("Starting pipeline run...")

# copy the current environment with the DOCKER_DEFAULT_PLATFORM argument
subprocess.call( # nosec
output = subprocess.run( # nosec
cmd,
env=dict(os.environ, DOCKER_DEFAULT_PLATFORM="linux/amd64"),
capture_output=True,
encoding="utf8",
)

if output.returncode != 0:
msg = f"Command failed with error: '{output.stderr}'"
raise PipelineRunError(msg)

print("Finished pipeline run.")

def run(
Expand All @@ -55,12 +64,11 @@ def run(
*,
extra_volumes: t.Union[t.Optional[list], t.Optional[str]] = None,
build_args: t.Optional[t.List[str]] = None,
) -> None:
):
"""Run a pipeline, either from a compiled docker-compose spec or from a fondant pipeline.
Args:
input: the pipeline to compile or a path to a already compiled docker-compose spec
output_path: the path where to save the docker-compose spec
extra_volumes: a list of extra volumes (using the Short syntax:
https://docs.docker.com/compose/compose-file/05-services/#short-syntax-5)
to mount in the docker-compose spec.
Expand Down Expand Up @@ -258,8 +266,6 @@ def run(
pipeline_name: the name of the pipeline to create
role_arn: the Amazon Resource Name role to use for the processing steps,
if none provided the `sagemaker.get_execution_role()` role will be used.
instance_type: the instance type to use for the processing steps
(see: https://aws.amazon.com/ec2/instance-types/ for options).
"""
if isinstance(input, Pipeline):
os.makedirs(".fondant", exist_ok=True)
Expand Down
43 changes: 37 additions & 6 deletions tests/pipeline/test_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from unittest import mock

import pytest
from fondant.core.exceptions import PipelineRunError
from fondant.pipeline import Pipeline
from fondant.pipeline.runner import (
DockerRunner,
Expand All @@ -22,11 +23,23 @@
)


def test_docker_runner():
@pytest.fixture()
def mock_subprocess_run():
def _mock_subprocess_run(*args, **kwargs):
class MockCompletedProcess:
returncode = 0

return MockCompletedProcess()

return _mock_subprocess_run


def test_docker_runner(mock_subprocess_run):
"""Test that the docker runner while mocking subprocess.call."""
with mock.patch("subprocess.call") as mock_call:
with mock.patch("subprocess.run") as mock_run:
mock_run.side_effect = mock_subprocess_run
DockerRunner().run("some/path")
mock_call.assert_called_once_with(
mock_run.assert_called_once_with(
[
"docker",
"compose",
Expand All @@ -37,15 +50,19 @@ def test_docker_runner():
"--pull",
"always",
"--remove-orphans",
"--abort-on-container-exit",
],
env=dict(os.environ, DOCKER_DEFAULT_PLATFORM="linux/amd64"),
capture_output=True,
encoding="utf8",
)


def test_docker_runner_from_pipeline():
with mock.patch("subprocess.call") as mock_call:
def test_docker_runner_from_pipeline(mock_subprocess_run):
with mock.patch("subprocess.run") as mock_run:
mock_run.side_effect = mock_subprocess_run
DockerRunner().run(PIPELINE)
mock_call.assert_called_once_with(
mock_run.assert_called_once_with(
[
"docker",
"compose",
Expand All @@ -56,11 +73,25 @@ def test_docker_runner_from_pipeline():
"--pull",
"always",
"--remove-orphans",
"--abort-on-container-exit",
],
env=dict(os.environ, DOCKER_DEFAULT_PLATFORM="linux/amd64"),
capture_output=True,
encoding="utf8",
)


def test_invalid_docker_run():
"""Test that the docker runner throws the correct error."""
spec_path = "some/path"
resolved_spec_path = str(Path(spec_path).resolve())
with pytest.raises(
PipelineRunError,
match=f"stat {resolved_spec_path}: no such file or directory",
):
DockerRunner().run(spec_path)


class MockKfpClient:
def __init__(self, host):
self.host = host
Expand Down
40 changes: 32 additions & 8 deletions tests/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,17 @@ def load(self):
pass


@pytest.fixture()
def mock_subprocess_run():
def _mock_subprocess_run(*args, **kwargs):
class MockCompletedProcess:
returncode = 0

return MockCompletedProcess()

return _mock_subprocess_run


@pytest.mark.parametrize("command", commands)
def test_basic_invocation(command):
"""Test that the CLI (sub)commands can be invoked without errors."""
Expand Down Expand Up @@ -262,7 +273,7 @@ def test_sagemaker_compile(tmp_path_factory):
)


def test_local_run():
def test_local_run(mock_subprocess_run):
"""Test that the run command works with different arguments."""
args = argparse.Namespace(
local=True,
Expand All @@ -275,9 +286,11 @@ def test_local_run():
extra_volumes=[],
build_arg=[],
)
with patch("subprocess.call") as mock_call:

with patch("subprocess.run") as mock_run:
mock_run.side_effect = mock_subprocess_run
run_local(args)
mock_call.assert_called_once_with(
mock_run.assert_called_once_with(
[
"docker",
"compose",
Expand All @@ -288,11 +301,15 @@ def test_local_run():
"--pull",
"always",
"--remove-orphans",
"--abort-on-container-exit",
],
env=dict(os.environ, DOCKER_DEFAULT_PLATFORM="linux/amd64"),
capture_output=True,
encoding="utf8",
)

with patch("subprocess.call") as mock_call:
with patch("subprocess.run") as mock_run:
mock_run.side_effect = mock_subprocess_run
args1 = argparse.Namespace(
local=True,
vertex=False,
Expand All @@ -306,7 +323,7 @@ def test_local_run():
credentials=None,
)
run_local(args1)
mock_call.assert_called_once_with(
mock_run.assert_called_once_with(
[
"docker",
"compose",
Expand All @@ -317,12 +334,15 @@ def test_local_run():
"--pull",
"always",
"--remove-orphans",
"--abort-on-container-exit",
],
env=dict(os.environ, DOCKER_DEFAULT_PLATFORM="linux/amd64"),
capture_output=True,
encoding="utf8",
)


def test_local_run_cloud_credentials():
def test_local_run_cloud_credentials(mock_subprocess_run):
namespace_creds_kwargs = [
{"auth_gcp": True, "auth_azure": False, "auth_aws": False},
{"auth_gcp": False, "auth_azure": True, "auth_aws": False},
Expand All @@ -333,8 +353,10 @@ def test_local_run_cloud_credentials():
with patch(
"fondant.pipeline.compiler.DockerCompiler.compile",
) as mock_compiler, patch(
"subprocess.call",
"subprocess.run",
) as mock_runner:
mock_runner.side_effect = mock_subprocess_run

args = argparse.Namespace(
local=True,
vertex=False,
Expand All @@ -360,7 +382,6 @@ def test_local_run_cloud_credentials():
output_path=".fondant/compose.yaml",
build_args=[],
)

mock_runner.assert_called_once_with(
[
"docker",
Expand All @@ -372,8 +393,11 @@ def test_local_run_cloud_credentials():
"--pull",
"always",
"--remove-orphans",
"--abort-on-container-exit",
],
env=dict(os.environ, DOCKER_DEFAULT_PLATFORM="linux/amd64"),
capture_output=True,
encoding="utf8",
)


Expand Down

0 comments on commit 16888c8

Please sign in to comment.