Skip to content

Commit

Permalink
add example with custom blueprints
Browse files Browse the repository at this point in the history
  • Loading branch information
sryza committed Jun 21, 2024
1 parent 5c7c42e commit 9ffbc7f
Show file tree
Hide file tree
Showing 7 changed files with 77 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from .definitions import (
defs as defs,
loader as loader,
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
type: curl_asset
table_name: cereals
csv_url: https://raw.githubusercontent.com/dagster-io/dagster/master/docs/next/public/assets/cereal.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
type: curl_asset
table_name: customers
csv_url: https://raw.githubusercontent.com/dagster-io/dagster/master/docs/next/public/assets/customers.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import tempfile
from pathlib import Path
from subprocess import Popen
from typing import Literal

from dagster import asset
from dagster_blueprints import YamlBlueprintsLoader
from dagster_blueprints.blueprint import Blueprint, BlueprintDefinitions


def local_csv_to_snowflake(table_name: str, csv_path: Path) -> None:
"""Left as an exercise to the reader."""


def curl_csv_to_snowflake(table_name: str, csv_url: str) -> None:
with tempfile.TemporaryDirectory() as tmp_dir:
path = Path(tmp_dir) / "file.csv"
command = ["curl", "-o", str(path), csv_url]
process = Popen(command)
process.wait()

if process.returncode != 0:
raise ValueError(f"External execution process failed with code {process.returncode}")

local_csv_to_snowflake(table_name, path)


class CurlCsvSnowflakeAssetBlueprint(Blueprint):
"""Blueprint for an asset in Snowflake, populated from a CSV file on the internet."""

type: Literal["curl_asset"]
csv_url: str
table_name: str

def build_defs(self) -> BlueprintDefinitions:
@asset(key=self.table_name)
def _asset():
curl_csv_to_snowflake(table_name=self.table_name, csv_url=self.csv_url)

return BlueprintDefinitions(assets=[_asset])


loader = YamlBlueprintsLoader(
per_file_blueprint_type=CurlCsvSnowflakeAssetBlueprint,
path=Path(__file__).parent / "curl_assets",
)
defs = loader.load_defs()
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
[build-system]
requires = ["setuptools"]
build-backend = "setuptools.build_meta"

[tool.dagster]
module_name = "custom_blueprints"
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
[metadata]
name = custom_blueprints
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
from setuptools import find_packages, setup

setup(
name="custom-blueprints",
packages=find_packages(exclude=["custom-blueprints"]),
install_requires=[
"dagster",
"dagster-blueprints",
"dagster-webserver",
],
extras_require={"dev": ["pytest"]},
)

0 comments on commit 9ffbc7f

Please sign in to comment.