diff --git a/src/fondant/core/exceptions.py b/src/fondant/core/exceptions.py index 4143f389..5560b9aa 100644 --- a/src/fondant/core/exceptions.py +++ b/src/fondant/core/exceptions.py @@ -25,3 +25,7 @@ class InvalidTypeSchema(ValidationError, FondantException): class UnsupportedTypeAnnotation(FondantException): """Thrown when an unsupported type annotation is encountered during type inference.""" + + +class PipelineRunError(ValidationError, FondantException): + """Thrown when a pipeline run results in an error.""" diff --git a/src/fondant/pipeline/runner.py b/src/fondant/pipeline/runner.py index 1b5d4491..62b158c1 100644 --- a/src/fondant/pipeline/runner.py +++ b/src/fondant/pipeline/runner.py @@ -6,6 +6,7 @@ import yaml +from fondant.core.exceptions import PipelineRunError from fondant.pipeline import Pipeline from fondant.pipeline.compiler import ( DockerCompiler, @@ -38,15 +39,23 @@ def _run(self, input_spec: str, *args, **kwargs): "--pull", "always", "--remove-orphans", + "--abort-on-container-exit", ] print("Starting pipeline run...") # copy the current environment with the DOCKER_DEFAULT_PLATFORM argument - subprocess.call( # nosec + output = subprocess.run( # nosec cmd, env=dict(os.environ, DOCKER_DEFAULT_PLATFORM="linux/amd64"), + capture_output=True, + encoding="utf8", ) + + if output.returncode != 0: + msg = f"Command failed with error: '{output.stderr}'" + raise PipelineRunError(msg) + print("Finished pipeline run.") def run( @@ -55,12 +64,11 @@ def run( *, extra_volumes: t.Union[t.Optional[list], t.Optional[str]] = None, build_args: t.Optional[t.List[str]] = None, - ) -> None: + ): """Run a pipeline, either from a compiled docker-compose spec or from a fondant pipeline. Args: input: the pipeline to compile or a path to a already compiled docker-compose spec - output_path: the path where to save the docker-compose spec extra_volumes: a list of extra volumes (using the Short syntax: https://docs.docker.com/compose/compose-file/05-services/#short-syntax-5) to mount in the docker-compose spec. @@ -258,8 +266,6 @@ def run( pipeline_name: the name of the pipeline to create role_arn: the Amazon Resource Name role to use for the processing steps, if none provided the `sagemaker.get_execution_role()` role will be used. - instance_type: the instance type to use for the processing steps - (see: https://aws.amazon.com/ec2/instance-types/ for options). """ if isinstance(input, Pipeline): os.makedirs(".fondant", exist_ok=True) diff --git a/tests/pipeline/test_runner.py b/tests/pipeline/test_runner.py index a59f63c8..be7c736a 100644 --- a/tests/pipeline/test_runner.py +++ b/tests/pipeline/test_runner.py @@ -5,6 +5,7 @@ from unittest import mock import pytest +from fondant.core.exceptions import PipelineRunError from fondant.pipeline import Pipeline from fondant.pipeline.runner import ( DockerRunner, @@ -22,11 +23,23 @@ ) -def test_docker_runner(): +@pytest.fixture() +def mock_subprocess_run(): + def _mock_subprocess_run(*args, **kwargs): + class MockCompletedProcess: + returncode = 0 + + return MockCompletedProcess() + + return _mock_subprocess_run + + +def test_docker_runner(mock_subprocess_run): """Test that the docker runner while mocking subprocess.call.""" - with mock.patch("subprocess.call") as mock_call: + with mock.patch("subprocess.run") as mock_run: + mock_run.side_effect = mock_subprocess_run DockerRunner().run("some/path") - mock_call.assert_called_once_with( + mock_run.assert_called_once_with( [ "docker", "compose", @@ -37,15 +50,19 @@ def test_docker_runner(): "--pull", "always", "--remove-orphans", + "--abort-on-container-exit", ], env=dict(os.environ, DOCKER_DEFAULT_PLATFORM="linux/amd64"), + capture_output=True, + encoding="utf8", ) -def test_docker_runner_from_pipeline(): - with mock.patch("subprocess.call") as mock_call: +def test_docker_runner_from_pipeline(mock_subprocess_run): + with mock.patch("subprocess.run") as mock_run: + mock_run.side_effect = mock_subprocess_run DockerRunner().run(PIPELINE) - mock_call.assert_called_once_with( + mock_run.assert_called_once_with( [ "docker", "compose", @@ -56,11 +73,25 @@ def test_docker_runner_from_pipeline(): "--pull", "always", "--remove-orphans", + "--abort-on-container-exit", ], env=dict(os.environ, DOCKER_DEFAULT_PLATFORM="linux/amd64"), + capture_output=True, + encoding="utf8", ) +def test_invalid_docker_run(): + """Test that the docker runner throws the correct error.""" + spec_path = "some/path" + resolved_spec_path = str(Path(spec_path).resolve()) + with pytest.raises( + PipelineRunError, + match=f"stat {resolved_spec_path}: no such file or directory", + ): + DockerRunner().run(spec_path) + + class MockKfpClient: def __init__(self, host): self.host = host diff --git a/tests/test_cli.py b/tests/test_cli.py index 48130982..23a4e2d0 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -44,6 +44,17 @@ def load(self): pass +@pytest.fixture() +def mock_subprocess_run(): + def _mock_subprocess_run(*args, **kwargs): + class MockCompletedProcess: + returncode = 0 + + return MockCompletedProcess() + + return _mock_subprocess_run + + @pytest.mark.parametrize("command", commands) def test_basic_invocation(command): """Test that the CLI (sub)commands can be invoked without errors.""" @@ -262,7 +273,7 @@ def test_sagemaker_compile(tmp_path_factory): ) -def test_local_run(): +def test_local_run(mock_subprocess_run): """Test that the run command works with different arguments.""" args = argparse.Namespace( local=True, @@ -275,9 +286,11 @@ def test_local_run(): extra_volumes=[], build_arg=[], ) - with patch("subprocess.call") as mock_call: + + with patch("subprocess.run") as mock_run: + mock_run.side_effect = mock_subprocess_run run_local(args) - mock_call.assert_called_once_with( + mock_run.assert_called_once_with( [ "docker", "compose", @@ -288,11 +301,15 @@ def test_local_run(): "--pull", "always", "--remove-orphans", + "--abort-on-container-exit", ], env=dict(os.environ, DOCKER_DEFAULT_PLATFORM="linux/amd64"), + capture_output=True, + encoding="utf8", ) - with patch("subprocess.call") as mock_call: + with patch("subprocess.run") as mock_run: + mock_run.side_effect = mock_subprocess_run args1 = argparse.Namespace( local=True, vertex=False, @@ -306,7 +323,7 @@ def test_local_run(): credentials=None, ) run_local(args1) - mock_call.assert_called_once_with( + mock_run.assert_called_once_with( [ "docker", "compose", @@ -317,12 +334,15 @@ def test_local_run(): "--pull", "always", "--remove-orphans", + "--abort-on-container-exit", ], env=dict(os.environ, DOCKER_DEFAULT_PLATFORM="linux/amd64"), + capture_output=True, + encoding="utf8", ) -def test_local_run_cloud_credentials(): +def test_local_run_cloud_credentials(mock_subprocess_run): namespace_creds_kwargs = [ {"auth_gcp": True, "auth_azure": False, "auth_aws": False}, {"auth_gcp": False, "auth_azure": True, "auth_aws": False}, @@ -333,8 +353,10 @@ def test_local_run_cloud_credentials(): with patch( "fondant.pipeline.compiler.DockerCompiler.compile", ) as mock_compiler, patch( - "subprocess.call", + "subprocess.run", ) as mock_runner: + mock_runner.side_effect = mock_subprocess_run + args = argparse.Namespace( local=True, vertex=False, @@ -360,7 +382,6 @@ def test_local_run_cloud_credentials(): output_path=".fondant/compose.yaml", build_args=[], ) - mock_runner.assert_called_once_with( [ "docker", @@ -372,8 +393,11 @@ def test_local_run_cloud_credentials(): "--pull", "always", "--remove-orphans", + "--abort-on-container-exit", ], env=dict(os.environ, DOCKER_DEFAULT_PLATFORM="linux/amd64"), + capture_output=True, + encoding="utf8", )