Create separate class for metadata (#372)
PR that creates a metadata class; this will make it easier to implement
#368 (this was originally part of #325 but was split out to make it easier
to review). A short usage sketch of the new class follows the change list below.

A few other notable changes:

- The `run_id` now has an identical format across both runners
(name_timestamp); we no longer need the KFP uid, since it was only used
to store the native output artifacts.
- The `safe_component_name` logic has been moved from the local runner to the
component spec, so it no longer has to be plugged in everywhere.
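
For context, a minimal sketch of how the new `Metadata` class round-trips
between compiler and executor (the class itself appears in the
`src/fondant/manifest.py` diff below; all values here are illustrative):

```python
import json

from fondant.manifest import Metadata

# Compiler side: build the metadata for one component of a run.
metadata = Metadata(
    pipeline_name="my_pipeline",
    base_path="gs://bucket",
    run_id="my_pipeline-20230822143015",  # shared name_timestamp format
    component_id="load_component",
)
metadata_json = metadata.to_json()  # passed to the component as --metadata

# Executor side: parse the JSON back into a typed Metadata instance.
metadata = Metadata.from_dict(json.loads(metadata_json))
assert metadata.run_id.startswith(metadata.pipeline_name)
```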

---------

Co-authored-by: Georges Lorré <[email protected]>
PhilippeMoussalli and GeorgesLorre authored Aug 22, 2023
1 parent ce048d9 commit ea50d75
Showing 29 changed files with 563 additions and 379 deletions.
3 changes: 2 additions & 1 deletion docs/manifest.md
@@ -33,8 +33,9 @@ the data is stored and references to the pipeline and component that were used t
```json
{
"metadata": {
"pipeline_name": "pipeline_name",
"base_path": "gs://bucket",
"run_id": "12345",
"run_id": "pipeline_name_12345",
"component_id": "67890"
}
}
72 changes: 28 additions & 44 deletions src/fondant/compiler.py
@@ -1,4 +1,3 @@
-import datetime
import json
import logging
import typing as t
@@ -8,6 +7,7 @@

import yaml

+from fondant.manifest import Metadata
from fondant.pipeline import Pipeline

logger = logging.getLogger(__name__)
@@ -37,19 +37,6 @@ class DockerVolume:
target: str


-@dataclass
-class MetaData:
-    """Dataclass representing the metadata arguments of a pipeline.
-    Args:
-        run_id: identifier of the current pipeline run
-        base_path: the base path used to store the artifacts.
-    """
-
-    run_id: str
-    base_path: str


class DockerCompiler(Compiler):
"""Compiler that creates a docker-compose spec from a pipeline."""

@@ -90,13 +77,6 @@ def ignore_aliases(self, data):

logger.info(f"Successfully compiled to {output_path}")

-    @staticmethod
-    def _safe_component_name(component_name: str) -> str:
-        """Transform a component name to a docker-compose friendly one.
-        eg: `Component A` -> `component_a`.
-        """
-        return component_name.replace(" ", "_").lower()

@staticmethod
def _patch_path(base_path: str) -> t.Tuple[str, t.Optional[DockerVolume]]:
"""Helper that checks if the base_path is local or remote,
@@ -132,29 +112,33 @@ def _generate_spec(
"""Generate a docker-compose spec as a python dictionary,
loops over the pipeline graph to create services and their dependencies.
"""
-        timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
         path, volume = self._patch_path(base_path=pipeline.base_path)
-        run_id = f"{pipeline.name}-{timestamp}"
-        metadata = MetaData(run_id=run_id, base_path=path)
+        run_id = pipeline.get_run_id()

services = {}

+        pipeline.validate(run_id=run_id)

for component_name, component in pipeline._graph.items():
+            metadata = Metadata(
+                pipeline_name=pipeline.name,
+                run_id=run_id,
+                base_path=path,
+                component_id=component_name,
+            )

logger.info(f"Compiling service for {component_name}")
-            safe_component_name = self._safe_component_name(component_name)

component_op = component["fondant_component_op"]

# add metadata argument to command
command = ["--metadata", json.dumps(asdict(metadata))]
command = ["--metadata", metadata.to_json()]

# add in and out manifest paths to command
command.extend(
[
"--output_manifest_path",
f"{path}/{safe_component_name}/manifest.json",
f"{path}/{component_name}/manifest.json",
],
)

@@ -169,15 +153,14 @@ def _generate_spec(
depends_on = {}
if component["dependencies"]:
for dependency in component["dependencies"]:
-                    safe_dependency = self._safe_component_name(dependency)
-                    depends_on[safe_dependency] = {
+                    depends_on[dependency] = {
"condition": "service_completed_successfully",
}
# there is only an input manifest if the component has dependencies
command.extend(
[
"--input_manifest_path",
f"{path}/{safe_dependency}/manifest.json",
f"{path}/{dependency}/manifest.json",
],
)

@@ -187,7 +170,7 @@
if extra_volumes:
volumes.extend(extra_volumes)

-            services[safe_component_name] = {
+            services[component_name] = {
"command": command,
"depends_on": depends_on,
"volumes": volumes,
@@ -197,14 +180,12 @@
logger.info(
f"Found Dockerfile for {component_name}, adding build step.",
)
services[safe_component_name]["build"] = {
services[component_name]["build"] = {
"context": str(component_op.component_dir),
"args": build_args,
}
else:
-                services[safe_component_name][
-                    "image"
-                ] = component_op.component_spec.image
+                services[component_name]["image"] = component_op.component_spec.image
return {
"name": pipeline.name,
"version": "3.8",
@@ -242,12 +223,21 @@ def compile(
pipeline: the pipeline to compile
output_path: the path where to save the Kubeflow pipeline spec
"""
+        run_id = pipeline.get_run_id()

@self.kfp.dsl.pipeline(name=pipeline.name, description=pipeline.description)
def kfp_pipeline():
previous_component_task = None
manifest_path = ""
-            for component_name, component in self.pipeline._graph.items():
+            for component_name, component in pipeline._graph.items():
+                metadata = Metadata(
+                    pipeline_name=pipeline.name,
+                    run_id=run_id,
+                    base_path=pipeline.base_path,
+                    component_id=component_name,
+                )

logger.info(f"Compiling service for {component_name}")

component_op = component["fondant_component_op"]
@@ -259,16 +249,10 @@ def kfp_pipeline():
# Execute the Kubeflow component and pass in the output manifest path from
# the previous component.
component_args = component_op.arguments
-                metadata = json.dumps(
-                    {
-                        "base_path": self.pipeline.base_path,
-                        "run_id": "{{workflow.name}}",
-                    },
-                )

component_task = kubeflow_component_op(
input_manifest_path=manifest_path,
-                    metadata=metadata,
+                    metadata=metadata.to_json(),
**component_args,
)
# Set optional configurations
@@ -287,7 +271,7 @@ def kfp_pipeline():
previous_component_task = component_task

self.pipeline = pipeline
self.pipeline.validate(run_id="{{workflow.name}}")
self.pipeline.validate(run_id=run_id)
logger.info(f"Compiling {self.pipeline.name} to {output_path}")

self.kfp.compiler.Compiler().compile(kfp_pipeline, output_path) # type: ignore
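
Since both compilers now derive the run id from the pipeline itself, compiling
for either runner embeds the same `Metadata` blob per component. A minimal
usage sketch, assuming the `Pipeline(pipeline_name=..., base_path=...)`
constructor and that `DockerCompiler` exposes a `compile(pipeline, output_path)`
method similar to the one documented above (all names illustrative):

```python
from fondant.compiler import DockerCompiler
from fondant.pipeline import Pipeline

# Illustrative pipeline; name and base_path are placeholders.
pipeline = Pipeline(pipeline_name="my_pipeline", base_path="gs://bucket")

# Each generated docker-compose service gets a --metadata JSON argument
# carrying pipeline_name, base_path, run_id, and component_id.
DockerCompiler().compile(pipeline=pipeline, output_path="docker-compose.yml")
```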
9 changes: 5 additions & 4 deletions src/fondant/executor.py
@@ -24,7 +24,7 @@
)
from fondant.component_spec import Argument, ComponentSpec, kubeflow2python_type
from fondant.data_io import DaskDataLoader, DaskDataWriter
-from fondant.manifest import Manifest
+from fondant.manifest import Manifest, Metadata
from fondant.schema import validate_partition_number

logger = logging.getLogger(__name__)
@@ -46,7 +46,7 @@ def __init__(
self.spec = spec
self.input_manifest_path = input_manifest_path
self.output_manifest_path = output_manifest_path
-        self.metadata = metadata
+        self.metadata = Metadata.from_dict(metadata)
self.user_arguments = user_arguments
self.input_partition_rows = input_partition_rows

@@ -239,8 +239,9 @@ def optional_fondant_arguments() -> t.List[str]:
def _load_or_create_manifest(self) -> Manifest:
component_id = self.spec.name.lower().replace(" ", "_")
return Manifest.create(
base_path=self.metadata["base_path"],
run_id=self.metadata["run_id"],
pipeline_name=self.metadata.pipeline_name,
base_path=self.metadata.base_path,
run_id=self.metadata.run_id,
component_id=component_id,
)
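
A hedged sketch of what `_load_or_create_manifest` now builds from the typed
metadata (the component spec name and metadata values are illustrative;
`Manifest.create` itself is shown in the `src/fondant/manifest.py` diff below):

```python
from fondant.manifest import Manifest, Metadata

metadata = Metadata(
    pipeline_name="my_pipeline",
    base_path="gs://bucket",
    run_id="my_pipeline-20230822143015",
    component_id="load_component",
)

# Same normalization the executor applies to the component spec name.
component_id = "Load component".lower().replace(" ", "_")  # -> "load_component"

manifest = Manifest.create(
    pipeline_name=metadata.pipeline_name,
    base_path=metadata.base_path,
    run_id=metadata.run_id,
    component_id=component_id,
)
```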

48 changes: 42 additions & 6 deletions src/fondant/manifest.py
@@ -4,6 +4,7 @@
import pkgutil
import types
import typing as t
+from dataclasses import asdict, dataclass
from pathlib import Path

import jsonschema.exceptions
@@ -70,6 +71,26 @@ def fields(self) -> t.Dict[str, Field]:
}


+@dataclass
+class Metadata:
+    """Class representing the Metadata of the manifest."""
+
+    base_path: str
+    pipeline_name: str
+    run_id: str
+    component_id: str
+
+    def to_dict(self):
+        return asdict(self)
+
+    def to_json(self):
+        return json.dumps(self.to_dict())
+
+    @classmethod
+    def from_dict(cls, data_dict):
+        return cls(**data_dict)


class Manifest:
"""
Class representing a Fondant manifest.
@@ -112,20 +133,31 @@ def retrieve_from_filesystem(uri: str) -> Resource:
raise InvalidManifest.create_from(e)

@classmethod
-    def create(cls, *, base_path: str, run_id: str, component_id: str) -> "Manifest":
+    def create(
+        cls,
+        *,
+        pipeline_name: str,
+        base_path: str,
+        run_id: str,
+        component_id: str,
+    ) -> "Manifest":
"""Create an empty manifest.
Args:
+            pipeline_name: the name of the pipeline
base_path: The base path of the manifest
run_id: The id of the current pipeline run
component_id: The id of the current component being executed
"""
+        metadata = Metadata(
+            pipeline_name=pipeline_name,
+            base_path=base_path,
+            run_id=run_id,
+            component_id=component_id,
+        )

specification = {
"metadata": {
"base_path": base_path,
"run_id": run_id,
"component_id": component_id,
},
"metadata": metadata.to_dict(),
"index": {"location": f"/index/{run_id}/{component_id}"},
"subsets": {},
}
@@ -166,6 +198,10 @@ def run_id(self) -> str:
def component_id(self) -> str:
return self.metadata["component_id"]

+    @property
+    def pipeline_name(self) -> str:
+        return self.metadata["pipeline_name"]

@property
def index(self) -> Index:
return Index(self._specification["index"], base_path=self.base_path)
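
Tying the manifest changes together, a small sketch of `Manifest.create` with
the new signature; the values mirror the updated `docs/manifest.md` example
above:

```python
from fondant.manifest import Manifest

manifest = Manifest.create(
    pipeline_name="pipeline_name",
    base_path="gs://bucket",
    run_id="pipeline_name_12345",
    component_id="67890",
)

assert manifest.pipeline_name == "pipeline_name"  # new typed accessor
assert manifest.run_id == "pipeline_name_12345"
```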
23 changes: 15 additions & 8 deletions src/fondant/pipeline.py
@@ -1,4 +1,5 @@
"""This module defines classes to represent a Fondant Pipeline."""
+import datetime
import hashlib
import json
import logging
@@ -57,12 +58,13 @@ def __init__(
node_pool_name: t.Optional[str] = None,
) -> None:
self.component_dir = Path(component_dir)
-        self.input_partition_rows = input_partition_rows
-        self.arguments = self._set_arguments(arguments)

self.component_spec = ComponentSpec.from_file(
self.component_dir / self.COMPONENT_SPEC_NAME,
)
+        self.name = self.component_spec.name.replace(" ", "_").lower()
+        self.input_partition_rows = input_partition_rows
+        self.arguments = self._set_arguments(arguments)

self.arguments.setdefault("component_spec", self.component_spec.specification)

self.number_of_gpus = number_of_gpus
@@ -238,11 +240,9 @@ def add_op(
msg,
)

-        dependencies_names = [
-            dependency.component_spec.name for dependency in dependencies
-        ]
+        dependencies_names = [dependency.name for dependency in dependencies]

-        self._graph[task.component_spec.name] = {
+        self._graph[task.name] = {
"fondant_component_op": task,
"dependencies": dependencies_names,
}
@@ -279,11 +279,17 @@ def _validate_pipeline_name(pipeline_name: str) -> str:
raise InvalidPipelineDefinition(msg)
return pipeline_name

+    def get_run_id(self) -> str:
+        """Get a unique run ID for the pipeline."""
+        timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
+        return f"{self.name}-{timestamp}"

def validate(self, run_id: str):
"""Sort and run validation on the pipeline definition.
Args:
-            run_id (str, optional): run identifier. Defaults to None.
+            run_id: run identifier
"""
self.sort_graph()
self._validate_pipeline_definition(run_id)
Expand All @@ -310,6 +316,7 @@ def _validate_pipeline_definition(self, run_id: str):

# Create initial manifest
manifest = Manifest.create(
+            pipeline_name=self.name,
base_path=self.base_path,
run_id=run_id,
component_id=load_component_name,
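
The run id that both runners now share comes from `get_run_id` above;
reproduced as a standalone sketch (the pipeline name is illustrative):

```python
import datetime

# Equivalent to Pipeline.get_run_id() from the diff above.
name = "my_pipeline"  # illustrative pipeline name
timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
run_id = f"{name}-{timestamp}"  # e.g. "my_pipeline-20230822143015"
```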
4 changes: 4 additions & 0 deletions src/fondant/schemas/manifest.json
@@ -9,6 +9,9 @@
"type": "string",
"format": "uri"
},
"pipeline_name": {
"type": "string"
},
"run_id": {
"type": "string"
},
@@ -18,6 +21,7 @@
},
"required": [
"base_path",
"pipeline_name",
"run_id",
"component_id"
]
3 changes: 2 additions & 1 deletion tests/example_data/manifest.json
@@ -1,7 +1,8 @@
{
"metadata": {
"pipeline_name": "test_pipeline",
"base_path": "tests/example_data/subsets_input",
"run_id": "12345",
"run_id": "test_pipeline_12345",
"component_id": "67890"
},
"index": {