Skip to content

Commit

Permalink
added example
Browse files Browse the repository at this point in the history
  • Loading branch information
davidvaldez89d committed Oct 12, 2023
1 parent 15b1d30 commit 11f9a3e
Show file tree
Hide file tree
Showing 20 changed files with 920 additions and 21 deletions.
138 changes: 138 additions & 0 deletions GETTING_STARTED.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
# Getting started guide

The Prefect-dbt-flow library allows you to seamlessly integrate dbt workflows into Prefect. This usage guide will walk you through the steps required to create and manage a Prefect flow for your dbt project.

## Example guide
This guide will walk you through setting up and running a sample Prefect-dbt-flow using Docker Compose. Follow these steps to get started:

### 1. Clone this repository
Clone the Prefect-dbt-flow repository and navigate to the example directory.
```bash
git clone [email protected]:datarootsio/prefect-dbt-flow.git
cd prefect-dbt-flow/example/jaffle_shop
```

### 2. Install Docker Compose
Ensure that you have Docker Compose installed on your system. If you haven't already installed it, refer to the [Docker Compose Installation Guide](https://docs.docker.com/compose/install/) for instructions.

### 3. Start the Docker Container
Start the Docker container by running the following command. This command will launch three services defined in the docker-compose file:
- A PostgreSQL database,
- A Prefect server accessible at: `http://0.0.0.0:4200/`,
- A CLI environment with all the required components installed.
```bash
docker compose up -d
```

### 4. Access the cli service
To access the CLI service, use the following command:
```bash
docker compose run cli
```

### 5. Run the Prefect flow
Inside the CLI environment, run the Prefect-dbt-flow using the following command:
```bash
python my_prefect_dbt_flow
```
This command will execute the Prefect flow and print its status to the terminal.

### 6. View the reseults
To view the results and monitor the flow, follow these steps:

- Open a web browser and go to `http://0.0.0.0:4200/`.
- In the Prefect Server interface, click on the flow run. It should have a name similar to `adjective-animal`.
- From there, you can explore the dbt job DAG and its associated logs.

With these steps, you can set up and run a Prefect-dbt-flow and monitor its progress through the Prefect Server interface.

## Installation
Before using Prefect-dbt-flow, you need to install the library. You can do this using pip:
```shell
pip install prefect-dbt-flow
```
You can install an specific version of **Prefect** if you need to:
```shell
pip install prefect==2.13.5
```

## Creating a Prefect Flow
To get started, you'll need to create a Prefect flow that incorporates your dbt project. Here's a step-by-step guide:
1. **Import the Required Modules:**
Start by importing the necessary modules from prefect_dbt_flow:
```python
from prefect_dbt_flow import dbt_flow
```
2. **Define the Prefect Flow:**
Create a Prefect flow by initializing a `dbtFlow.dbt_flow` object. You can configure it with your dbt project, profile, and additional options:
* **project**: A DbtProject object representing the dbt project configuration.
* **profile**: A DbtProfile object representing the dbt profile configuration.
* **dag_options**: A DbtDagOptions object to specify dbt DAG configurations.
* **flow_kwargs**: A dictionary of Prefect flow arguments.
Here's a basic example of how to use dbt_flow():
```python
my_flow = dbtFlow.dbt_flow(
project=dbtFlow.DbtProject(
name="my_flow",
project_dir="path_to/dbt_project",
profiles_dir="path_to/dbt_profiles",
),
profile=dbtFlow.DbtProfile(
target="dev",
),
dag_options=dbtFlow.DbtDagOptions(
run_test_after_model=True,
),
)
```
With this basic setup, you have created a Prefect flow that manages your dbt project. When you run the script, Prefect will execute the dbt tasks defined in your project.
3. **Run the Flow:**
To execute the Prefect flow, add the following code block:
```python
if __name__ == "__main__":
my_flow()
```
4. **Start the prefect server**
You will need to start prefect before the run
```shell
prefect server start
```
You can check up the dashoard at `http://0.0.0.0:4200`
5. **Running the Prefect Flow:**
To run the Prefect flow, simply execute your Python script:
```shell
python my_prefect_dbt_flow.py
```
Make sure you are in the correct directory or provide the full path to your script. Prefect will execute the dbt tasks defined in your flow, providing orchestration and monitoring capabilities.
6. **See the run**
You will be able to see the results of the run on the prefect dashboard at `http://0.0.0.0:4200`

## Advanced Configuration
In the previous section, you configured your dbt project within the Prefect flow. Here's how you can customize the configuration further:

### Dbt Project Configuration:
You specified the name, project directory, and profiles directory when creating the DbtProject object. Adjust these values to match your dbt project's setup.
- `DbtProject`: Represents your dbt project configuration.
- `name`: Name of the dbt project.
- `project_dir`: Path to the directory containing the project.yml configuration file.
- `profiles_dir`: Path to the directory containing the profiles.yml file.

### Dbt Profile Configuration:
The DbtProfile object allows you to set the target profile for your dbt project. This profile should match the configuration in your dbt profiles.yml file.
- `DbtProfile`: Represents the dbt profile configuration.
- `target`: Specify the dbt target (e.g., "dev" or "prod").

### Dag Options:
The DbtDagOptions object lets you define various options for your dbt workflow. In the provided example, we set run_test_after_model to True, indicating that dbt tests should run after each dbt model.
- `DbtDagOptions`: Allows you to specify dbt DAG configurations.
- `select`: Specify a dbt module to include in the run.
- `exclude`: Specify a dbt module to exclude in the run.
- `run_test_after_model`: Set this to True to run tests after running models.

### Prefect flow configuration
Prefect-dbt-flow integrates with Prefect's monitoring and error handling capabilities. You can use Prefect features like scheduling, notifications, and task retries to monitor and manage your dbt flows effectively. You can pass this additional Prefect flow configuration options using a dictionary into: `flow_kwargs`.

For more information on these features, consult the [Prefect documentation.](https://docs.prefect.io/2.10.12/api-ref/prefect/flows/#prefect.flows.flow)

## Conclusion
Prefect-dbt-flow simplifies the orchestration and management of dbt workflows within a Prefect flow. By following the steps in this guide, you can easily create and execute data pipelines that incorporate dbt projects. Be aware of breaking changes as this library is actively developed, and consult the changelog for updates. Happy data engineering! :rocket:
74 changes: 55 additions & 19 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,28 +1,58 @@
![dataroots.png](https://dataroots.io/assets/logo/logo-rainbow.png)
[![maintained by dataroots](https://img.shields.io/badge/maintained%20by-dataroots-%2300b189)](https://dataroots.io)
<p align="center">
<a href="https://datarootsio.github.io/prefect-dbt-flow"><img alt="logo" src="https://dataroots.io/assets/logo/logo-rainbow.png"></a>
</p>
<p align="center">
<a href="https://dataroots.io"><img alt="Maintained by dataroots" src="https://dataroots.io/maintained-rnd.svg" /></a>
<a href="https://pypi.org/project/prefect-dbt-flow/"><img alt="Python versions" src="https://img.shields.io/pypi/pyversions/prefect-dbt-flow" /></a>
<a href="https://pypi.org/project/prefect-dbt-flow/"><img alt="PiPy" src="https://img.shields.io/pypi/v/prefect-dbt-flow" /></a>
<a href="https://pepy.tech/project/prefect-dbt-flow"><img alt="Downloads" src="https://pepy.tech/badge/prefect-dbt-flow" /></a>
<a href="https://github.com/psf/black"><img alt="Code style: black" src="https://img.shields.io/badge/code%20style-black-000000.svg" /></a>
<a href="http://mypy-lang.org/"><img alt="Mypy checked" src="https://img.shields.io/badge/mypy-checked-1f5082.svg" /></a>
<!-- <a href="https://pepy.tech/project/prefect-dbt-flow"><img alt="Codecov" src="https://codecov.io/github/datarootsio/databooks/main/graph/badge.svg" /></a>
<a href="https://github.com/datarootsio/databooks/actions"><img alt="test" src="https://github.com/datarootsio/databooks/actions/workflows/test.yml/badge.svg" /></a> -->
</p>

# prefect-dbt-flow
Welcome to the prefect-dbt-flow integration repository! This project aims to provide a seamless integration for simplifying the execution of dbt workflows using Prefect.
Prefect-dbt-flow is a Python library that enables Prefect to convert dbt workflows into independent tasks within a Prefect flow. This integration simplifies the orchestration and execution of dbt models and tests using Prefect, allowing you to build robust data pipelines and monitor your dbt projects efficiently.

## Requirements
Before you get started, make sure you have the following prerequisites installed on your system:
**Active Development Notice:** Prefect-dbt-flow is actively under development and may not be ready for production use. We advise users to be aware of potential breaking changes as the library evolves. Please check the changelog for updates.

- python
- prefect
- dbt
## Table of Contents
- [Introduction](#introduction)
- [Why Use Prefect-dbt-flow?](#why-use-prefect-dbt-flow)
- [How to Install](#how-to-install)
- [Basic Usage](#basic-usage)
- [Inspiration](#inspiration)
- [License](#license)

## Installation
``` bash
pip install prefect-dbt-flow
```
## Introduction
Prefect-dbt-flow is a tool designed to streamline the integration of dbt workflows into Prefect. dbt is an immensely popular tool for building and testing data transformation models, and Prefect is a versatile workflow management system. This integration brings together the best of both worlds, empowering data engineers and analysts to create robust data pipelines.

## Usage
## Why Use Prefect-dbt-flow?
### Simplified Orchestration
With Prefect-dbt-flow, you can orchestrate your dbt workflows with ease. Define and manage your dbt projects and models as Prefect tasks, creating a seamless pipeline for data transformation.

### Create a flow
[Simplified Orchestration]()

``` python
import prefect_dbt_flow as dbtFlow
### Monitoring and Error Handling
Prefect provides extensive monitoring capabilities and error handling. Now, you can gain deep insights into the execution of your dbt workflows and take immediate action in case of issues.

[Monitoring and Error Handling]()

### Workflow Consistency
Ensure your dbt workflows run consistently by managing them through Prefect. This consistency is crucial for maintaining data quality and reliability.

[Workflow Consistency]()

## How to Install
You can install Prefect-dbt-flow via pip:
```shell
pip install prefect-dbt-flow
```
## Basic Usage
Here's an example of how to use Prefect-dbt-flow to create a Prefect flow for your dbt project:
```python
import prefect_dbt_flow as dbtFlow
my_flow = dbtFlow.dbt_flow(
project=dbtFlow.DbtProject(
name="my_flow",
Expand All @@ -36,10 +66,16 @@ my_flow = dbtFlow.dbt_flow(
run_test_after_model=True,
),
)

if __name__ == "__main__":
my_flow()
```
For more information consult the [Getting started guide](GETTING_STARTED.md)

## Inspiration
Prefect-dbt-flow draws inspiration from various projects in the data engineering and workflow orchestration space, including:
- cosmos by astronomer
- anna-geller => prefect-dataplatform
- dbt + Dagster

## License
This project is licensed under the MIT License.
# License
This project is licensed under the MIT License. You are free to use, modify, and distribute this software as per the terms of the license. If you find this project helpful, please consider giving it a star on GitHub.
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ services:
image: postgres:15.2-alpine
restart: always
environment:
- POSTGRES_USER=postgres
- POSTGRES_USER=admin
- POSTGRES_PASSWORD=pass123
- POSTGRES_DB=data
expose:
Expand All @@ -23,4 +23,17 @@ services:
- PREFECT_SERVER_API_HOST=0.0.0.0
- PREFECT_API_DATABASE_CONNECTION_URL=sqlite+aiosqlite:////opt/prefect/prefect.db
ports:
- 4200:4200
- 4200:4200

cli:
build: .
entrypoint: "bash"
working_dir: "/root/flows"
volumes:
- "./jaffle_shop:/root/flows"
environment:
- PREFECT_API_URL=http://server:4200/api

networks:
default:
name: prefect-dbt-network
26 changes: 26 additions & 0 deletions examples/jaffle_shop/jaffle_shop/dbt_project.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
name: 'example_jaffle_shop'

config-version: 2
version: '0.1'

profile: 'example_jaffle_shop'

model-paths: ["models"]
seed-paths: ["seeds"]
test-paths: ["tests"]
analysis-paths: ["analysis"]
macro-paths: ["macros"]

target-path: "target"
clean-targets:
- "target"
- "dbt_modules"
- "logs"

require-dbt-version: [">=1.0.0", "<2.0.0"]

models:
example_jaffle_shop:
materialized: table
staging:
materialized: view
69 changes: 69 additions & 0 deletions examples/jaffle_shop/jaffle_shop/models/customers.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
with customers as (

select * from {{ ref('stg_customers') }}

),

orders as (

select * from {{ ref('stg_orders') }}

),

payments as (

select * from {{ ref('stg_payments') }}

),

customer_orders as (

select
customer_id,

min(order_date) as first_order,
max(order_date) as most_recent_order,
count(order_id) as number_of_orders
from orders

group by customer_id

),

customer_payments as (

select
orders.customer_id,
sum(amount) as total_amount

from payments

left join orders on
payments.order_id = orders.order_id

group by orders.customer_id

),

final as (

select
customers.customer_id,
customers.first_name,
customers.last_name,
customer_orders.first_order,
customer_orders.most_recent_order,
customer_orders.number_of_orders,
customer_payments.total_amount as customer_lifetime_value

from customers

left join customer_orders
on customers.customer_id = customer_orders.customer_id

left join customer_payments
on customers.customer_id = customer_payments.customer_id

)

select * from final
14 changes: 14 additions & 0 deletions examples/jaffle_shop/jaffle_shop/models/docs.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
{% docs orders_status %}

Orders can be one of the following statuses:

| status | description |
|----------------|------------------------------------------------------------------------------------------------------------------------|
| placed | The order has been placed but has not yet left the warehouse |
| shipped | The order has ben shipped to the customer and is currently in transit |
| completed | The order has been received by the customer |
| return_pending | The customer has indicated that they would like to return the order, but it has not yet been received at the warehouse |
| returned | The order has been returned by the customer and received at the warehouse |


{% enddocs %}
Loading

0 comments on commit 11f9a3e

Please sign in to comment.