Skip to content

Commit

Permalink
Create local artifact directory if it does not exist (#847)
Browse files Browse the repository at this point in the history
This bothers me already for some time. Change the mounting of the local
base path in the docker compile. Now a local base path is created.
  • Loading branch information
mrchtr authored Feb 9, 2024
1 parent 7ec04dc commit ac23862
Show file tree
Hide file tree
Showing 5 changed files with 94 additions and 25 deletions.
1 change: 0 additions & 1 deletion examples/sample_pipeline/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
from fondant.pipeline import Pipeline, lightweight_component

BASE_PATH = Path("./.artifacts").resolve()
BASE_PATH.mkdir(parents=True, exist_ok=True)

# Define pipeline
pipeline = Pipeline(name="dummy-pipeline", base_path=str(BASE_PATH))
Expand Down
53 changes: 38 additions & 15 deletions src/fondant/pipeline/compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from pathlib import Path

import yaml
from fsspec.registry import known_implementations

from fondant.core.component_spec import ComponentSpec
from fondant.core.exceptions import InvalidPipelineDefinition
Expand Down Expand Up @@ -137,23 +138,45 @@ def _patch_path(base_path: str) -> t.Tuple[str, t.Optional[DockerVolume]]:
if local it patches the base_path and prepares a bind mount
Returns a tuple containing the path and volume.
"""

def is_remote_path(path: Path) -> bool:
"""Check if the path is remote."""
_path = str(path)
prefixes = set(known_implementations.keys()) - {"local", "file"}
return any(_path.startswith(prefix) for prefix in prefixes)

def resolve_local_base_path(base_path: Path) -> Path:
"""Resolve local base path and create base directory if it no exist."""
p_base_path = base_path.resolve()
try:
if p_base_path.exists():
logger.info(
f"Base path found on local system, setting up {base_path} as mount volume",
)
else:
p_base_path.mkdir(parents=True, exist_ok=True)
logger.info(
f"Base path not found on local system, created base path and setting up "
f"{base_path} as mount volume",
)
except Exception as e:
msg = f"Unable to create and mount local base path. {e}"
raise ValueError(msg)

return p_base_path

p_base_path = Path(base_path)
# check if base path is an existing local folder
if p_base_path.exists():
logger.info(
f"Base path found on local system, setting up {base_path} as mount volume",
)
p_base_path = p_base_path.resolve()
volume = DockerVolume(
type="bind",
source=str(p_base_path),
target=f"/{p_base_path.stem}",
)
path = f"/{p_base_path.stem}"
else:
if is_remote_path(p_base_path):
logger.info(f"Base path {base_path} is remote")
volume = None
path = base_path
return base_path, None

p_base_path = resolve_local_base_path(p_base_path)
volume = DockerVolume(
type="bind",
source=str(p_base_path),
target=f"/{p_base_path.stem}",
)
path = f"/{p_base_path.stem}"
return path, volume

def _generate_spec(
Expand Down
42 changes: 41 additions & 1 deletion tests/pipeline/test_compiler.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import datetime
import json
import os
import re
import subprocess
import sys
import tempfile
Expand Down Expand Up @@ -168,6 +169,7 @@ def test_docker_compiler(setup_pipeline, tmp_path_factory):
example_dir, pipeline, _ = setup_pipeline
compiler = DockerCompiler()
with tmp_path_factory.mktemp("temp") as fn:
pipeline.base_path = str(fn)
output_path = str(fn / "docker-compose.yml")
compiler.compile(pipeline=pipeline, output_path=output_path, build_args=[])
pipeline_configs = DockerComposeConfigs.from_spec(output_path)
Expand Down Expand Up @@ -336,6 +338,7 @@ def test_docker_configuration(tmp_path_factory):

compiler = DockerCompiler()
with tmp_path_factory.mktemp("temp") as fn:
pipeline.base_path = str(fn)
output_path = str(fn / "docker-compose.yaml")
compiler.compile(pipeline=pipeline, output_path=output_path)
pipeline_configs = DockerComposeConfigs.from_spec(output_path)
Expand Down Expand Up @@ -363,7 +366,10 @@ def test_invalid_docker_configuration(tmp_path_factory):
)

compiler = DockerCompiler()
with pytest.raises(InvalidPipelineDefinition):
with tmp_path_factory.mktemp("temp") as fn, pytest.raises( # noqa PT012
InvalidPipelineDefinition,
):
pipeline.base_path = str(fn)
compiler.compile(pipeline=pipeline, output_path="kubeflow_pipeline.yml")


Expand Down Expand Up @@ -663,6 +669,7 @@ def test_caching_dependency_docker(tmp_path_factory):
)

with tmp_path_factory.mktemp("temp") as fn:
pipeline.base_path = str(fn)
output_path = str(fn / "docker-compose.yml")
compiler.compile(pipeline=pipeline, output_path=output_path, build_args=[])
pipeline_configs = DockerComposeConfigs.from_spec(output_path)
Expand Down Expand Up @@ -844,3 +851,36 @@ def test_sagemaker_base_path_validator():

# valid
compiler.validate_base_path("s3://foo/bar")


@pytest.mark.usefixtures("_freeze_time")
def test_docker_compiler_create_local_base_path(setup_pipeline, tmp_path_factory):
"""Test compiling a pipeline to docker-compose."""
example_dir, pipeline, _ = setup_pipeline
compiler = DockerCompiler()
with tmp_path_factory.mktemp("temp") as fn:
pipeline.base_path = str(fn) + "/my-artifacts"
output_path = str(fn / "docker-compose.yml")
compiler.compile(pipeline=pipeline, output_path=output_path, build_args=[])
assert Path(pipeline.base_path).exists()


@pytest.mark.usefixtures("_freeze_time")
def test_docker_compiler_create_local_base_path_propagate_exception(
setup_pipeline,
tmp_path_factory,
):
"""Test compiling a pipeline to docker-compose."""
example_dir, pipeline, _ = setup_pipeline
compiler = DockerCompiler()
msg = re.escape(
"Unable to create and mount local base path. ",
)

with tmp_path_factory.mktemp("temp") as fn, pytest.raises( # noqa PT012
ValueError,
match=msg,
):
pipeline.base_path = "/my-artifacts"
output_path = str(fn / "docker-compose.yml")
compiler.compile(pipeline=pipeline, output_path=output_path, build_args=[])
13 changes: 10 additions & 3 deletions tests/pipeline/test_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,16 @@ def test_docker_runner(mock_docker_installation):
)


def test_docker_runner_from_pipeline(mock_docker_installation):
with mock.patch("subprocess.call") as mock_call:
DockerRunner().run(PIPELINE)
def test_docker_runner_from_pipeline(mock_docker_installation, tmp_path_factory):
with mock.patch("subprocess.call") as mock_call, tmp_path_factory.mktemp(
"temp",
) as fn:
pipeline = Pipeline(
name="testpipeline",
description="description of the test pipeline",
base_path=str(fn),
)
DockerRunner().run(pipeline)
mock_call.assert_called_once_with(
[
"docker",
Expand Down
10 changes: 5 additions & 5 deletions tests/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ def test_local_run(mock_docker_installation):
"""Test that the run command works with different arguments."""
args = argparse.Namespace(
local=True,
ref="some/path",
ref=__name__,
output_path=None,
auth_provider=None,
credentials=None,
Expand All @@ -284,7 +284,7 @@ def test_local_run(mock_docker_installation):
"docker",
"compose",
"-f",
"some/path",
".fondant/compose.yaml",
"up",
"--build",
"--pull",
Expand Down Expand Up @@ -372,7 +372,7 @@ def test_kfp_run(tmp_path_factory):
local=False,
vertex=False,
output_path=None,
ref="some/path",
ref=__name__,
host=None,
)
with pytest.raises(
Expand All @@ -386,7 +386,7 @@ def test_kfp_run(tmp_path_factory):
local=False,
output_path=None,
host="localhost",
ref="some/path",
ref=__name__,
)
run_kfp(args)
mock_runner.assert_called_once_with(host="localhost")
Expand Down Expand Up @@ -419,7 +419,7 @@ def test_vertex_run(tmp_path_factory):
project_id="project-123",
service_account=None,
network=None,
ref="some/path",
ref=__name__,
)
run_vertex(args)
mock_runner.assert_called_once_with(
Expand Down

0 comments on commit ac23862

Please sign in to comment.