
Commit

Merge pull request #901 from ml6team/feature/refactore-pipeline-interface

Refactor pipeline interface to dataset first interface.
mrchtr authored Apr 5, 2024
2 parents e62e1b0 + a6561be commit d9b5496
Showing 56 changed files with 1,286 additions and 994 deletions.
7 changes: 3 additions & 4 deletions docs/architecture.md
@@ -37,10 +37,9 @@ At a high level, Fondant consists of three main parts:
component type.


-* The `/pipeline` directory which contains the modules for implementing a Fondant pipeline.
-* `pipeline.py`: Defines the `Pipeline` class which is used to define the pipeline graph and the
-pipeline run. The
-implemented class is then consumed by the compiler to compile to a specific pipeline runner.
+* The `/dataset` directory which contains the modules for implementing a Fondant pipeline.
+* `dataset.py`: Defines the `Dataset` class which is used to define the graph. The
+implemented class is then consumed by the compiler to compile to a specific runner.
This module also implements the
`ComponentOp` class which is used to define the component operation in the pipeline graph.
* `compiler.py`: Defines the `Compiler` class which is used to define the compiler that
12 changes: 6 additions & 6 deletions docs/components/component_spec.md
@@ -102,7 +102,7 @@ If your dataset has a field called `custom_text` with type `string`, you can map

```python
-dataset = pipeline.read(...)
+dataset = Dataset.read(...)
dataset = dataset.apply(
"example_component",
consumes={
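
For reference, a complete version of the new-style call might look as follows — a sketch assuming the component spec declares a `text` field, which is mapped onto the dataset's `custom_text` field:

```python
dataset = dataset.apply(
    "example_component",
    consumes={
        # map the component's `text` input onto the dataset's `custom_text` field
        "text": "custom_text",
    },
)
```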
@@ -127,7 +127,7 @@ so as follows:

```python
-dataset = pipeline.read(...)
+dataset = Dataset.read(...)
dataset = dataset.apply(
"example_component",
produces={
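
The `produces` mapping works symmetrically; a sketch of the complete call, again assuming a `text` field in the component spec:

```python
dataset = dataset.apply(
    "example_component",
    produces={
        # store the component's `text` output under the dataset field `custom_text`
        "text": "custom_text",
    },
)
```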
@@ -165,7 +165,7 @@ in the component specification, so we will need to specify the schema of the
fields when defining the components

```python
-dataset = pipeline.read(
+dataset = Dataset.read(
"load_from_csv",
arguments={
"dataset_uri": "path/to/dataset.csv",
@@ -196,7 +196,7 @@ by the next component. We can either load the `image` field:

```python
-dataset = pipeline.read(
+dataset = Dataset.read(
"load_from_csv",
arguments={
"dataset_uri": "path/to/dataset.csv",
@@ -219,7 +219,7 @@ or the `embedding` field:

```python
-dataset = pipeline.read(
+dataset = Dataset.read(
"load_from_csv",
arguments={
"dataset_uri": "path/to/dataset.csv",
@@ -268,7 +268,7 @@ These arguments are passed in when the component is instantiated.
If an argument is not explicitly provided, the default value will be used instead if available.

```python
-dataset = pipeline.read(
+dataset = Dataset.read(
"custom_component",
arguments={
"custom_argument": "foo"
2 changes: 1 addition & 1 deletion docs/components/lightweight_components.md
@@ -53,7 +53,7 @@ pipeline = Pipeline(
base_path="./data",
)

-dataset = pipeline.read(
+dataset = Dataset.read(
ref=CreateData,
)

12 changes: 6 additions & 6 deletions docs/guides/build_a_simple_pipeline.md
@@ -44,7 +44,7 @@ pipeline = Pipeline(

??? "View a detailed reference of the options accepted by the `Pipeline` class"

-::: fondant.pipeline.Pipeline.__init__
+::: fondant.dataset.Dataset.__init__
handler: python
options:
show_source: false
@@ -69,13 +69,13 @@ As a first step, we want to read data into our pipeline. In this case, we will load data
from the HuggingFace Hub. For this, we can use the reusable
[load_from_hf_hub](../components/hub.md#load_from_hugging_face_hub#description) component.

-We can read data into our pipeline using the `Pipeline.read()` method, which returns a (lazy)
+We can read data into our pipeline using the `Dataset.read()` method, which returns a (lazy)
`Dataset`.

```python
import pyarrow as pa

-dataset = pipeline.read(
+dataset = Dataset.read(
"load_from_hf_hub",
arguments={
"dataset_name": "fondant-ai/fondant-cc-25m",
@@ -101,9 +101,9 @@ We provide three arguments to the `.read()` method:
defined in the component [documentation](../components/hub.md#load_from_hugging_face_hub#inputs_outputs) with
`additionalProperties: true` under the produces section.
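
Putting the three arguments together, a sketch of such a read step — the produced field names and types are illustrative, not the dataset's actual schema:

```python
import pyarrow as pa

dataset = Dataset.read(
    "load_from_hf_hub",
    arguments={
        "dataset_name": "fondant-ai/fondant-cc-25m",
    },
    # declare the fields to load, since the component spec keeps its
    # outputs generic with `additionalProperties: true`
    produces={
        "alt_text": pa.string(),
        "image_url": pa.string(),
    },
)
```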

??? "View a detailed reference of the `Pipeline.read()` method"
??? "View a detailed reference of the `Dataset.read()` method"

-::: fondant.pipeline.Pipeline.read
+::: fondant.dataset.Dataset.read
handler: python
options:
show_source: false
@@ -171,7 +171,7 @@ english_images = images.apply(

??? "View a detailed reference of the `Dataset.apply()` method"

-::: fondant.pipeline.pipeline.Dataset.apply
+::: fondant.dataset.Dataset.apply
handler: python
options:
show_source: false
2 changes: 1 addition & 1 deletion docs/guides/implement_custom_components.md
@@ -33,7 +33,7 @@ pipeline = Pipeline(
base_path="./data"
)

-dataset = pipeline.read(
+dataset = Dataset.read(
"load_from_hf_hub",
arguments={
"dataset_name": "fondant-ai/fondant-cc-25m",
48 changes: 20 additions & 28 deletions docs/pipeline.md
@@ -1,20 +1,21 @@
-# Pipeline
+# Dataset

-A Fondant pipeline is a tool for building complex workflows by creating a Directed Acyclic Graph
-(DAG) of different components that need to be executed. With Fondant, you can use both reusable
-components and custom components, and chain them into a pipeline.
+A Fondant Dataset is a checkpoint in a Directed Acyclic Graph
+(DAG) of one or more different components that need to be executed. With Fondant, you can use both reusable
+components and custom components, and chain them together.

-## Composing a pipeline
-
-Start by creating a `pipeline.py` file and adding the following code.
+[//]: # (TODO update this section once we have the workspace)
+## Composing a Pipeline
+
+Start by creating a `pipeline.py` file and adding the following code.
```python
-from fondant.pipeline import Pipeline
+from fondant.dataset import Dataset

+#dataset = Dataset.read(
+# ..
+#)

-pipeline = Pipeline(
-name="my-pipeline",
-base_path="./data",
-)
```

We identify our pipeline with a name and provide a base path where the pipeline will store its
@@ -30,34 +31,25 @@ The base path can be:
* **A local directory**: only valid for the local runner, points to a local directory. This is
useful for local development.

-!!! note "IMPORTANT"

-Make sure the provided base_path already exists.

-??? "View a detailed reference of the options accepted by the `Pipeline` class"

-::: fondant.pipeline.Pipeline.__init__
-handler: python
-options:
-show_source: false

### Adding a load component

-You can read data into your pipeline by using the `Pipeline.read()` method with a load component.
+You can read data into your pipeline by using the `Dataset.read()` method with a load component.

```python
-dataset = pipeline.read(
+dataset = Dataset.read(
"load_from_parquet",
arguments={
"dataset_uri": "path/to/dataset",
"n_rows_to_load": 100,
},
)
```
+[//]: # (TODO: Add example of init from manifest)

??? "View a detailed reference of the `Pipeline.read()` method"
??? "View a detailed reference of the `Dataset.read()` method"

-::: fondant.pipeline.Pipeline.read
+::: fondant.dataset.Dataset.read
handler: python
options:
show_source: false
Expand All @@ -68,7 +60,7 @@ graph. It returns a lazy `Dataset` instance which you can use to chain transform
### Adding transform components

```python
-from fondant.pipeline import Resources
+from fondant.dataset import Resources

dataset = dataset.apply(
"embed_text",
@@ -90,7 +82,7 @@ can choose the type of GPU as well.

??? "View a detailed reference of the `Dataset.apply()` method"

-::: fondant.pipeline.pipeline.Dataset.apply
+::: fondant.dataset.Dataset.apply
handler: python
options:
show_source: false
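
For reference, a complete transform step with resources might look like this — a sketch; the accelerator values are illustrative:

```python
from fondant.dataset import Resources

dataset = dataset.apply(
    "embed_text",
    resources=Resources(
        # request a single GPU for this component
        accelerator_number=1,
        accelerator_name="GPU",
    ),
)
```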
@@ -112,7 +104,7 @@ dataset = dataset.write(

??? "View a detailed reference of the `Dataset.write()` method"

-::: fondant.pipeline.pipeline.Dataset.write
+::: fondant.dataset.Dataset.write
handler: python
options:
show_source: false
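
A sketch of a complete write step, assuming the reusable `write_to_file` component and an illustrative output path:

```python
dataset = dataset.write(
    "write_to_file",
    arguments={
        # illustrative output location
        "path": "/data/export",
    },
)
```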
23 changes: 12 additions & 11 deletions docs/runners/kfp.md
@@ -16,39 +16,40 @@ Make sure to install Fondant with the Kubeflow runner extra.
pip install fondant[kfp]
```

-### Running a pipeline with Kubeflow
+### Materialize a dataset with Kubeflow

-You will need a Kubeflow cluster to run your pipeline on and specify the host of that cluster. More
+You will need a Kubeflow cluster to run your workflow on and specify the host of that cluster. More
info on setting up a Kubeflow pipelines deployment and the host path can be found in
the [kubeflow infrastructure documentation](kfp_infrastructure.md).

=== "Console"

```bash
-fondant run kubeflow <pipeline_ref> \
+fondant run kubeflow <dataset_ref> \
+--working-dir $WORKING_DIR \
--host $KUBEFLOW_HOST
```

-The pipeline ref is a reference to a fondant pipeline (e.g. `pipeline.py`) where a pipeline instance
+The dataset ref is a reference to a fondant dataset (e.g. `pipeline.py`) where a dataset instance
exists.


=== "Python"

```python
-from fondant.pipeline.compiler import KubeFlowCompiler
-from fondant.pipeline.runner import KubeFlowRunner
+from fondant.dataset.compiler import KubeFlowCompiler
+from fondant.dataset.runner import KubeFlowRunner

compiler= KubeFlowCompiler()
-compiler.compile(pipeline=<pipeline_object>)
+compiler.compile(dataset=<dataset_object>)

runner = KubeFlowRunner(host=<kubeflow_host>)
-runner.run(input_spec=<path_to_compiled_spec>)
+runner.run(input_spec=<path_to_compiled_spec>, working_dir=<working_dir>)
```

-Once your pipeline is running you can monitor it using the Kubeflow UI.
+Once your workflow is running you can monitor it using the Kubeflow UI.

-#### Assigning custom resources to the pipeline
+#### Assigning custom resources to your run

Each component can optionally be constrained to run on particular node(s) using `node_pool_label`
and `node_pool_name`. You can find these under the Kubernetes labels of your cluster.
@@ -58,7 +59,7 @@ needs to
have an available GPU.

```python
-from fondant.pipeline import Resources
+from fondant.dataset import Resources

dataset = dataset.apply(
"...",
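
A sketch of the complete call with node pool constraints — the label and pool name are illustrative and cluster-specific:

```python
from fondant.dataset import Resources

dataset = dataset.apply(
    "...",
    resources=Resources(
        # pin the component to a specific node pool
        node_pool_label="node_pool",
        node_pool_name="model-inference-pool",
        # and request a GPU available on that pool
        accelerator_number=1,
        accelerator_name="GPU",
    ),
)
```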
