Refactor pipeline interface #901

Merged · 8 commits · Apr 5, 2024
docs/architecture.md (3 additions, 4 deletions)

@@ -37,10 +37,9 @@ At a high level, Fondant consists of three main parts:
component type.


-* The `/pipeline` directory which contains the modules for implementing a Fondant pipeline.
-  * `pipeline.py`: Defines the `Pipeline` class which is used to define the pipeline graph and the
-    pipeline run. The
-    implemented class is then consumed by the compiler to compile to a specific pipeline runner.
+* The `/dataset` directory which contains the modules for implementing a Fondant pipeline.
+  * `dataset.py`: Defines the `Dataset` class which is used to define the graph. The
+    implemented class is then consumed by the compiler to compile to a specific runner.
This module also implements the
`ComponentOp` class which is used to define the component operation in the pipeline graph.
* `compiler.py`: Defines the `Compiler` class which is used to define the compiler that
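For orientation, since the hunk above is truncated: a minimal sketch of how the renamed modules fit together, assuming the `Dataset`, `ComponentOp`, and compiler classes behave as this diff describes (everything beyond the names shown in the diff is illustrative):

```python
# Sketch under assumptions from the diff above: dataset.py provides Dataset
# (and ComponentOp), compiler.py provides runner-specific compilers.
from fondant.dataset import Dataset

# Dataset.read(...) starts the graph; each apply() adds a ComponentOp node.
dataset = Dataset.read(
    "load_from_parquet",
    arguments={"dataset_uri": "path/to/dataset"},
)
dataset = dataset.apply("embed_text")
# A compiler implementation then turns this graph into a runner-specific spec.
```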
docs/components/component_spec.md (6 additions, 6 deletions)

@@ -102,7 +102,7 @@ If your dataset has a field called `custom_text` with type `string`, you can map

```python

-dataset = pipeline.read(...)
+dataset = Dataset.read(...)
dataset = dataset.apply(
    "example_component",
    consumes={
```
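The hunk is cut off here; for context, a complete call in the new style might look like the sketch below. The `text` field name is an assumption standing in for whatever field the component spec actually declares:

```python
# Sketch only: maps the component's (assumed) `text` input to the dataset's
# `custom_text` field, following the consumes-mapping pattern shown above.
dataset = Dataset.read(...)
dataset = dataset.apply(
    "example_component",
    consumes={
        "text": "custom_text",  # component field <- dataset field
    },
)
```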
@@ -127,7 +127,7 @@ so as follows:

```python

-dataset = pipeline.read(...)
+dataset = Dataset.read(...)
dataset = dataset.apply(
    "example_component",
    produces={
```
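Again the hunk is truncated; a complete `produces` mapping might look like this sketch (the `embedding` and `custom_embedding` names are assumptions for illustration):

```python
# Sketch only: writes the component's (assumed) `embedding` output to a
# custom `custom_embedding` field in the resulting dataset.
dataset = dataset.apply(
    "example_component",
    produces={
        "embedding": "custom_embedding",  # component field -> dataset field
    },
)
```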
@@ -165,7 +165,7 @@ in the component specification, so we will need to specify the schema of the
fields when defining the components

```python
-dataset = pipeline.read(
+dataset = Dataset.read(
    "load_from_csv",
    arguments={
        "dataset_uri": "path/to/dataset.csv",
```
@@ -196,7 +196,7 @@ by the next component. We can either load the `image` field:

```python

-dataset = pipeline.read(
+dataset = Dataset.read(
    "load_from_csv",
    arguments={
        "dataset_uri": "path/to/dataset.csv",
```
@@ -219,7 +219,7 @@ or the `embedding` field:

```python

-dataset = pipeline.read(
+dataset = Dataset.read(
    "load_from_csv",
    arguments={
        "dataset_uri": "path/to/dataset.csv",
```
@@ -268,7 +268,7 @@ These arguments are passed in when the component is instantiated.
If an argument is not explicitly provided, the default value will be used instead if available.

```python
-dataset = pipeline.read(
+dataset = Dataset.read(
    "custom_component",
    arguments={
        "custom_argument": "foo"
```
docs/components/lightweight_components.md (1 addition, 1 deletion)

@@ -53,7 +53,7 @@ pipeline = Pipeline(
base_path="./data",
)

-dataset = pipeline.read(
+dataset = Dataset.read(
ref=CreateData,
)

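For context, the `CreateData` referenced above would be a lightweight component defined elsewhere in the file. The sketch below follows Fondant's pre-refactor lightweight-component pattern; the `lightweight_component` import path and decorator signature are assumptions and may not match the refactored API exactly:

```python
import dask.dataframe as dd
import pandas as pd
import pyarrow as pa

from fondant.component import DaskLoadComponent
from fondant.pipeline import lightweight_component  # pre-refactor path; assumed

# Sketch only: a minimal load component producing a single integer column.
@lightweight_component(produces={"x": pa.int32()})
class CreateData(DaskLoadComponent):
    def load(self) -> dd.DataFrame:
        df = pd.DataFrame({"x": [1, 2, 3]})
        return dd.from_pandas(df, npartitions=1)
```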
docs/guides/build_a_simple_pipeline.md (6 additions, 6 deletions)

@@ -44,7 +44,7 @@ pipeline = Pipeline(

??? "View a detailed reference of the options accepted by the `Pipeline` class"

-::: fondant.pipeline.Pipeline.__init__
+::: fondant.dataset.Dataset.__init__
handler: python
options:
show_source: false
@@ -69,13 +69,13 @@ As a first step, we want to read data into our pipeline. In this case, we will load data
from the HuggingFace Hub. For this, we can use the reusable
[load_from_hf_hub](../components/hub.md#load_from_hugging_face_hub#description) component.

-We can read data into our pipeline using the `Pipeline.read()` method, which returns a (lazy)
+We can read data into our pipeline using the `Dataset.read()` method, which returns a (lazy)
`Dataset`.

```python
import pyarrow as pa

-dataset = pipeline.read(
+dataset = Dataset.read(
    "load_from_hf_hub",
    arguments={
        "dataset_name": "fondant-ai/fondant-cc-25m",
```
@@ -101,9 +101,9 @@ We provide three arguments to the `.read()` method:
defined in the component [documentation](../components/hub.md#load_from_hugging_face_hub#inputs_outputs) with
`additionalProperties: true` under the produces section.
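Putting these together, a complete call might look like the sketch below; the column names and types under `produces` are placeholders (enabled by `additionalProperties: true`), not the component's actual schema:

```python
import pyarrow as pa

# Sketch: dataset_name comes from the text above; the produces schema
# uses assumed column names purely for illustration.
dataset = Dataset.read(
    "load_from_hf_hub",
    arguments={
        "dataset_name": "fondant-ai/fondant-cc-25m",
    },
    produces={
        "alt_text": pa.string(),   # assumed column
        "image_url": pa.string(),  # assumed column
    },
)
```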

??? "View a detailed reference of the `Pipeline.read()` method"
??? "View a detailed reference of the `Dataset.read()` method"

-::: fondant.pipeline.Pipeline.read
+::: fondant.dataset.Dataset.read
handler: python
options:
show_source: false
@@ -171,7 +171,7 @@ english_images = images.apply(

??? "View a detailed reference of the `Dataset.apply()` method"

-::: fondant.pipeline.pipeline.Dataset.apply
+::: fondant.dataset.Dataset.apply
handler: python
options:
show_source: false
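The `english_images` snippet in the hunk header above is cut off; a full chained call might look like this sketch (the `filter_language` component name and its `language` argument are assumptions for illustration):

```python
# Sketch: keep only English entries by chaining another apply() call.
english_images = images.apply(
    "filter_language",
    arguments={"language": "en"},  # assumed argument name/value
)
```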
docs/guides/implement_custom_components.md (1 addition, 1 deletion)

@@ -33,7 +33,7 @@ pipeline = Pipeline(
base_path="./data"
)

-dataset = pipeline.read(
+dataset = Dataset.read(
"load_from_hf_hub",
arguments={
"dataset_name": "fondant-ai/fondant-cc-25m",
docs/pipeline.md (20 additions, 28 deletions)
@@ -1,20 +1,21 @@
-# Pipeline
+# Dataset

-A Fondant pipeline is a tool for building complex workflows by creating a Directed Acyclic Graph
-(DAG) of different components that need to be executed. With Fondant, you can use both reusable
-components and custom components, and chain them into a pipeline.
+A Fondant Dataset is a checkpoint in a Directed Acyclic Graph
+(DAG) of one or more components that need to be executed. With Fondant, you can use both reusable
+components and custom components, and chain them together.

-## Composing a pipeline
-
-Start by creating a `pipeline.py` file and adding the following code.
+[//]: # (TODO update this section once we have the workspace)
+## Composing a Pipeline
+
+Start by creating a `pipeline.py` file and adding the following code.
```python
-from fondant.pipeline import Pipeline
+from fondant.dataset import Dataset

+#dataset = Dataset.read(
+#  ..
+#)

-pipeline = Pipeline(
-    name="my-pipeline",
-    base_path="./data",
-)
```

We identify our pipeline with a name and provide a base path where the pipeline will store its
@@ -30,34 +31,25 @@ The base path can be:
* **A local directory**: only valid for the local runner, points to a local directory. This is
useful for local development.

!!! note "IMPORTANT"

Make sure the provided base_path already exists.

??? "View a detailed reference of the options accepted by the `Pipeline` class"

::: fondant.pipeline.Pipeline.__init__
handler: python
options:
show_source: false

### Adding a load component

-You can read data into your pipeline by using the `Pipeline.read()` method with a load component.
+You can read data into your pipeline by using the `Dataset.read()` method with a load component.

```python
-dataset = pipeline.read(
+dataset = Dataset.read(
"load_from_parquet",
arguments={
"dataset_uri": "path/to/dataset",
"n_rows_to_load": 100,
},
)
```
+[//]: # (TODO: Add example of init from manifest)

??? "View a detailed reference of the `Pipeline.read()` method"
??? "View a detailed reference of the `Dataset.read()` method"

-::: fondant.pipeline.Pipeline.read
+::: fondant.dataset.Dataset.read
handler: python
options:
show_source: false
@@ -68,7 +60,7 @@ graph. It returns a lazy `Dataset` instance which you can use to chain transformations.
### Adding transform components

```python
-from fondant.pipeline import Resources
+from fondant.dataset import Resources

dataset = dataset.apply(
    "embed_text",
```
@@ -90,7 +82,7 @@ can choose the type of GPU as well.
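For context, a complete version of the truncated `apply` call above might look like this sketch; the `Resources` parameter names are assumptions based on the GPU discussion in this hunk:

```python
from fondant.dataset import Resources  # new import path per this PR

# Sketch: request one GPU for the embed_text component; the parameter names
# (accelerator_number, accelerator_name) are assumptions for illustration.
dataset = dataset.apply(
    "embed_text",
    resources=Resources(
        accelerator_number=1,
        accelerator_name="GPU",
    ),
)
```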

??? "View a detailed reference of the `Dataset.apply()` method"

-::: fondant.pipeline.pipeline.Dataset.apply
+::: fondant.dataset.Dataset.apply
handler: python
options:
show_source: false
@@ -112,7 +104,7 @@ dataset = dataset.write(

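The `write` hunk is truncated; a complete call might look like this sketch (the `write_to_file` component and its `path` argument are assumptions for illustration):

```python
# Sketch: terminate the graph by writing the dataset out with a write component.
dataset = dataset.write(
    "write_to_file",
    arguments={"path": "/data/export"},  # assumed component and argument
)
```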
??? "View a detailed reference of the `Dataset.write()` method"

-::: fondant.pipeline.pipeline.Dataset.write
+::: fondant.dataset.Dataset.write
handler: python
options:
show_source: false
docs/runners/kfp.md (12 additions, 11 deletions)

@@ -16,39 +16,40 @@ Make sure to install Fondant with the Kubeflow runner extra.
pip install fondant[kfp]
```

-### Running a pipeline with Kubeflow
+### Materialize a dataset with Kubeflow

-You will need a Kubeflow cluster to run your pipeline on and specify the host of that cluster. More
+You will need a Kubeflow cluster to run your workflow on and specify the host of that cluster. More
info on setting up a Kubeflow pipelines deployment and the host path can be found in
the [kubeflow infrastructure documentation](kfp_infrastructure.md).

=== "Console"

```bash
-fondant run kubeflow <pipeline_ref> \
+fondant run kubeflow <dataset_ref> \
+  --working-dir $WORKING_DIR \
--host $KUBEFLOW_HOST
```

-The pipeline ref is a reference to a fondant pipeline (e.g. `pipeline.py`) where a pipeline instance
+The dataset ref is a reference to a fondant dataset (e.g. `pipeline.py`) where a dataset instance
exists.


=== "Python"

```python
-from fondant.pipeline.compiler import KubeFlowCompiler
-from fondant.pipeline.runner import KubeFlowRunner
+from fondant.dataset.compiler import KubeFlowCompiler
+from fondant.dataset.runner import KubeFlowRunner

compiler = KubeFlowCompiler()
-compiler.compile(pipeline=<pipeline_object>)
+compiler.compile(dataset=<dataset_object>)

runner = KubeFlowRunner(host=<kubeflow_host>)
-runner.run(input_spec=<path_to_compiled_spec>)
+runner.run(input_spec=<path_to_compiled_spec>, working_dir=<working_dir>)
```

-Once your pipeline is running you can monitor it using the Kubeflow UI.
+Once your workflow is running you can monitor it using the Kubeflow UI.

-#### Assigning custom resources to the pipeline
+#### Assigning custom resources to your run

Each component can optionally be constrained to run on particular node(s) using `node_pool_label`
and `node_pool_name`. You can find these under the Kubernetes labels of your cluster.
@@ -58,7 +59,7 @@ needs to
have an available GPU.

```python
-from fondant.pipeline import Resources
+from fondant.dataset import Resources

dataset = dataset.apply(
    "...",
```