Skip to content

Commit

Permalink
Merge pull request #34 from bytewax/examples-docs
Browse files Browse the repository at this point in the history
example for motherduck
  • Loading branch information
lfunderburk authored Nov 19, 2024
2 parents 782ae0f + d7c71e5 commit b4c7ea0
Show file tree
Hide file tree
Showing 6 changed files with 202 additions and 17 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -87,3 +87,4 @@ pyrightconfig.json
# Devenv stuff
.devenv.flake.nix
devenv.*
examples/.env
106 changes: 105 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,111 @@

## bytewax-duckdb

* TODO: Add project documentation
Bytewax sink and operator for DuckDB and MotherDuck

## Installation

We've made installation as easy as Py by making it pip-installable:

```bash
pip install bytewax-duckdb
```

This will also install the latest DuckDB and Bytewax modules.

## Storing data to DuckDB in batches through a Bytewax dataflow

When working with this integration in Bytewax, you can use it to process data in batch and write data to a target database or file in a structured way. However, there’s one essential assumption you need to know, the sink expects data in a specific tuple format, structured as:

```python
("key", List[Dict])
```
Where

`"key"`: The first element is a string identifier for the batch. Think of this as a “batch ID” that helps to organize and keep track of which group of entries belong together. Every batch you send to the sink has a unique key or identifier.

`List[Dict]`: The second element is a list of dictionaries. Each dictionary represents an individual data record, with each key-value pair in the dictionary representing fields and their corresponding values.

Together, the tuple tells the sink: “Here is a batch of data, labeled with a specific key, and this batch contains multiple data entries.”

This format is designed to let the sink write data efficiently in batches, rather than handling each entry one-by-one. By grouping data entries together with an identifier, the sink can:

* Optimize Writing: Batching data reduces the frequency of writes to the database or file, which can dramatically improve performance, especially when processing high volumes of data.

* Ensure Atomicity: By writing a batch as a single unit, we minimize the risk of partial writes, ensuring either the whole batch is written or none at all. This is especially important for maintaining data integrity.

Here is an example for a local DuckDB file.

```python
import bytewax.duckdb.operators as duck_op
import bytewax.operators as op
from bytewax.dataflow import Dataflow
from bytewax.testing import TestingSource, run_main

flow = Dataflow("duckdb")


def create_dict(value: int) -> Tuple[str, Dict[str, Union[int, str]]]:
return ("1", {"id": value, "name": "Alice"})


inp = op.input("inp", flow, TestingSource(range(50)))
dict_stream = op.map("dict", inp, create_dict)

duck_op.output(
"out",
dict_stream,
"sample.duckdb",
"example_table",
"CREATE TABLE IF NOT EXISTS example_table (id INTEGER, name TEXT)",
)

run_main(flow)
```

**Important**
To connect to a MotherDuck instance, ensure to [create an account](https://app.motherduck.com/?auth_flow) and [generate a token](https://motherduck.com/docs/key-tasks/authenticating-and-connecting-to-motherduck/authenticating-to-motherduck/#creating-an-access-token). You can store this token into your environment variables.

```python
import os
import random
from typing import Dict, Tuple, Union

# Save the token in an environment variable
md_token = os.getenv("MOTHERDUCK_TOKEN")

# Initialize the dataflow
flow = Dataflow("duckdb-names-cities")

# Define sample data for names and locations
names = ["Alice", "Bob", "Charlie", "Diana", "Eve"]
locations = ["New York", "Los Angeles", "Chicago", "Houston", "Phoenix"]


# Function to create a dictionary with more varied data
def create_dict(value: int) -> Tuple[str, Dict[str, Union[int, str]]]:
name = random.choice(names)
age = random.randint(20, 60) # Random age between 20 and 60
location = random.choice(locations)
return ("batch_1", {"id": value, "name": name, "age": age, "location": location})


# Generate input data
inp = op.input("inp", flow, TestingSource(range(50)))
dict_stream = op.map("dict", inp, create_dict)
db_path = f"md:my_db?motherduck_token={md_token}"
# Output the data to DuckDB, creating a table with multiple columns
duck_op.output(
"out",
dict_stream,
db_path,
"names_cities",
"CREATE TABLE IF NOT EXISTS names_cities (id INTEGER, name TEXT, age INTEGER, location TEXT)",
)

# Run the dataflow
run_main(flow)
```

## Setting up the project for development

Expand Down
55 changes: 55 additions & 0 deletions examples/motherduck_dataflow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
from pathlib import Path
from typing import Dict, Tuple, Union

import bytewax.duckdb.operators as duck_op
from bytewax.duckdb import DuckDBSink
import bytewax.operators as op
from bytewax.dataflow import Dataflow
from bytewax.testing import TestingSource, run_main

import os
from dotenv import load_dotenv

load_dotenv(".env")

md_token = os.getenv("MOTHERDUCK_TOKEN")
db_path = f"md:my_db?motherduck_token={md_token}"

import bytewax.operators as op
from bytewax.dataflow import Dataflow
from bytewax.testing import run_main, TestingSource
import random
from typing import Dict, Tuple, Union

# Initialize the dataflow
flow = Dataflow("duckdb-names-cities")

# Define sample data for names and locations
names = ["Alice", "Bob", "Charlie", "Diana", "Eve"]
locations = ["New York", "Los Angeles", "Chicago", "Houston", "Phoenix"]


# Function to create a dictionary with more varied data
def create_dict(value: int) -> Tuple[str, Dict[str, Union[int, str]]]:
name = random.choice(names)
age = random.randint(20, 60) # Random age between 20 and 60
location = random.choice(locations)
# The following marks batch '1' to be written to MotherDuck
return ("batch_1", {"id": value, "name": name, "age": age, "location": location})


# Generate input data
inp = op.input("inp", flow, TestingSource(range(50)))
dict_stream = op.map("dict", inp, create_dict)

# Output the data to DuckDB, creating a table with multiple columns
duck_op.output(
"out",
dict_stream,
db_path,
"names_cities",
"CREATE TABLE IF NOT EXISTS names_cities (id INTEGER, name TEXT, age INTEGER, location TEXT)",
)

# Run the dataflow
run_main(flow)
4 changes: 2 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[project]
name = "bytewax-duckdb"
version = "0.1.21"
description = "Bytewax custom sink for DuckDB"
version = "0.1.22"
description = "Bytewax custom sink for DuckDB and MotherDuck"
readme = "README.md"
requires-python = ">=3.9"
license = {file = "LICENSE.md"}
Expand Down
46 changes: 32 additions & 14 deletions pytests/test_sink.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,33 +39,38 @@ def create_table_sql() -> str:
return "CREATE TABLE test_table (id INTEGER, name TEXT)"


def test_duckdb_sink(db_path: str, table_name: str) -> None:
def test_duckdb_sink(db_path: Path, table_name: str) -> None:
"""Test that DuckDBSink writes all items correctly."""
flow = Dataflow("duckdb")

def create_dict(value: int) -> Tuple[str, List[Dict[str, Union[int, str]]]]:
return (str(value), [{"id": value, "name": "Alice"}])
return (str(value), [{"id": value, "name": f"Name_{value}"}])

inp = op.input("inp", flow, TestingSource(range(100)))
dict_stream = op.map("dict", inp, create_dict)

op.output(
"out",
dict_stream,
DuckDBSink(
db_path,
str(db_path), # Convert Path to string
table_name,
f"CREATE TABLE {table_name} (id INTEGER, name TEXT)",
f"CREATE TABLE IF NOT EXISTS {table_name} (id INTEGER, name TEXT)",
),
)
run_main(flow)
conn = duckdb.connect(db_path)
assert conn.sql(f"SELECT COUNT(*) from {table_name}").fetchall() == [(100,)]

conn = duckdb.connect(str(db_path)) # Convert Path to string
result = conn.execute(f"SELECT COUNT(*) FROM {table_name}").fetchone()
assert result == (100,)


def test_duckdb_operator(db_path: str, table_name: str) -> None:
def test_duckdb_operator(db_path: Path, table_name: str) -> None:
"""Test that the DuckDB operator appends data correctly across runs."""
flow = Dataflow("duckdb")

def create_dict(value: int) -> Tuple[str, Dict[str, Union[int, str]]]:
return (str(value), {"id": value, "name": "Alice"})
return (str(value), {"id": value, "name": f"Name_{value}"})

inp = op.input("inp", flow, TestingSource(range(100)))
dict_stream = op.map("dict", inp, create_dict)
Expand All @@ -74,17 +79,30 @@ def create_dict(value: int) -> Tuple[str, Dict[str, Union[int, str]]]:
duck_op.output(
"out",
dict_stream,
db_path,
str(db_path), # Convert Path to string
table_name,
f"CREATE TABLE IF NOT EXISTS {table_name} (id INTEGER, name TEXT)",
)

# First run
run_main(flow)
conn = duckdb.connect(str(db_path)) # Convert Path to string
first_result = conn.execute(f"SELECT COUNT(*) FROM {table_name}").fetchone()
assert first_result == (100,)

# Connect to the database and verify the results
conn = duckdb.connect(db_path)
assert conn.sql(f"SELECT COUNT(*) FROM {table_name}").fetchall() == [(100,)]
# Second run: reinitialize the dataflow and append
flow = Dataflow("duckdb")
inp = op.input("inp", flow, TestingSource(range(100)))
dict_stream = op.map("dict", inp, create_dict)

# Run the flow a second time and check if data is appended correctly
duck_op.output(
"out",
dict_stream,
str(db_path), # Convert Path to string
table_name,
f"CREATE TABLE IF NOT EXISTS {table_name} (id INTEGER, name TEXT)",
)
run_main(flow)
assert conn.sql(f"SELECT COUNT(*) FROM {table_name}").fetchall() == [(200,)]

second_result = conn.execute(f"SELECT COUNT(*) FROM {table_name}").fetchone()
assert second_result == (200,)
7 changes: 7 additions & 0 deletions src/bytewax/duckdb/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,11 @@ def __init__(
) -> None:
"""Initialize the DuckDB or MotherDuck connection, and create tables if needed.
Note: To connect to a MotherDuck instance, ensure to:
1. Create an account https://app.motherduck.com/?auth_flow=signup
2. Generate a token
https://motherduck.com/docs/key-tasks/authenticating-and-connecting-to-motherduck/authenticating-to-motherduck/)
Args:
db_path (str): Path to the DuckDB database file or MotherDuck
connection string.
Expand All @@ -114,6 +119,8 @@ def __init__(
resume_state (None): Unused, as this sink does not perform recovery.
"""
self.table_name = table_name
# Ensure db_path is a string
db_path = str(db_path) # Convert to string if it's a Path object
parsed_db_path = urlparse(db_path)
path = parsed_db_path.path
config = dict(parse_qsl(parsed_db_path.query))
Expand Down

0 comments on commit b4c7ea0

Please sign in to comment.