Skip to content

Commit

Permalink
[docs] [pipes] - Add Databricks integration guide (#17114)
Browse files Browse the repository at this point in the history
## Summary & Motivation

This PR adds a guide for integrating Dagster Pipes with Databricks.

TODO/?s:

- [x] Finish descriptions of `SubmitTask` spec
- [x] Finish UI section
- [x] Check in code examples
- [x] Check on some of the `PyObjects` - may be out of sync due to
changes in libraries
- [x] Add info about sending data back to Dagster (Step 2)

## How I Tested These Changes

eyes, bk

---------

Co-authored-by: Sean Mackesey <[email protected]>
  • Loading branch information
erinkcochran87 and smackesey committed Nov 7, 2023
1 parent d57b3ad commit 9e08133
Show file tree
Hide file tree
Showing 18 changed files with 770 additions and 257 deletions.
5 changes: 5 additions & 0 deletions docs/content/_navigation.json
Original file line number Diff line number Diff line change
Expand Up @@ -876,6 +876,11 @@
}
]
},
{
"title": "Dagster Pipes + Databricks",
"path": "/guides/dagster-pipes/databricks"

},
{
"title": "Details and customization",
"path": "/guides/dagster-pipes/dagster-pipes-details-and-customization"
Expand Down
391 changes: 391 additions & 0 deletions docs/content/guides/dagster-pipes/databricks.mdx

Large diffs are not rendered by default.

This file was deleted.

Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
6 changes: 6 additions & 0 deletions docs/screenshots/guides/dagster-pipes/databricks.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
- id: run.png
workspace: examples/docs_snippets/docs_snippets/legacy/dagster_pandas_guide/workspace.yaml
steps:
- materialize the `databricks_asset`
- go to the Run page for the launched run

Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
# start_databricks_asset
### dagster_databricks_pipes.py

import os
import sys

from dagster_databricks import PipesDatabricksClient

from dagster import AssetExecutionContext, Definitions, EnvVar, asset
from databricks.sdk import WorkspaceClient
from databricks.sdk.service import jobs


@asset
def databricks_asset(
context: AssetExecutionContext, pipes_databricks: PipesDatabricksClient
):
task = jobs.SubmitTask.from_dict(
{
# The cluster settings below are somewhat arbitrary. Dagster Pipes is
# not dependent on a specific spark version, node type, or number of
# workers.
"new_cluster": {
"spark_version": "12.2.x-scala2.12",
"node_type_id": "i3.xlarge",
"num_workers": 0,
"cluster_log_conf": {
"dbfs": {"destination": "dbfs:/cluster-logs-dir-noexist"},
},
},
"libraries": [
# Include the latest published version of dagster-pipes on PyPI
# in the task environment
{"pypi": {"package": "dagster-pipes"}},
],
"task_key": "some-key",
"spark_python_task": {
"python_file": "dbfs:/my_python_script.py", # location of target code file
"source": jobs.Source.WORKSPACE,
},
}
)

print("This will be forwarded back to Dagster stdout") # noqa: T201
print("This will be forwarded back to Dagster stderr", file=sys.stderr) # noqa: T201

extras = {"some_parameter": 100}

return pipes_databricks.run(
task=task,
context=context,
extras=extras,
).get_materialize_result()


# end_databricks_asset

# start_definitions

pipes_databricks_resource = PipesDatabricksClient(
client=WorkspaceClient(
host=os.getenv("DATABRICKS_HOST"),
token=os.getenv("DATABRICKS_TOKEN"),
)
)

defs = Definitions(
assets=[databricks_asset], resources={"pipes_databricks": pipes_databricks_resource}
)
# end_definitions
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
import os
import sys

from dagster_databricks import PipesDbfsContextInjector, PipesDbfsMessageReader
from dagster_databricks.pipes import PipesDbfsLogReader

from dagster import AssetExecutionContext, asset, open_pipes_session
from databricks.sdk import WorkspaceClient


@asset
def databricks_asset(context: AssetExecutionContext):
client = WorkspaceClient(
host=os.environ["DATABRICKS_HOST"],
token=os.environ["DATABRICKS_TOKEN"],
)

# Arbitrary json-serializable data you want access to from the `PipesContext`
# in the Databricks runtime. Assume `sample_rate` is a parameter used by
# the target job's business logic.
extras = {"sample_rate": 1.0}

# Sets up Pipes communications channels
with open_pipes_session(
context=context,
extras=extras,
context_injector=PipesDbfsContextInjector(client=client),
message_reader=PipesDbfsMessageReader(
client=client,
# These log readers are optional. If you provide them, then you must set the
# `new_cluster.cluster_log_conf.dbfs.destination` field in the job you submit to a valid
# DBFS path. This will configure Databricks to write stdout/stderr to the specified
# location every 5 minutes. Dagster will poll this location and forward the
# stdout/stderr logs every time they are updated to the orchestration process
# stdout/stderr.
log_readers=[
PipesDbfsLogReader(
client=client, remote_log_name="stdout", target_stream=sys.stdout
),
PipesDbfsLogReader(
client=client, remote_log_name="stderr", target_stream=sys.stderr
),
],
),
) as pipes_session:
##### Option (1)
# NON-STREAMING. Just pass the necessary environment variables down.
# During execution, all reported materializations are buffered on the
# `pipes_session`. Yield them all after Databricks execution is finished.

# Dict[str, str] with environment variables containing Pipes comms info.
env_vars = pipes_session.get_bootstrap_env_vars()

# Some function that handles launching/monitoring of the Databricks job.
# It must ensure that the `env_vars` are set on the executing cluster.
custom_databricks_launch_code(env_vars) # type: ignore # noqa: F821

##### Option (2)
# STREAMING. Pass `pipes_session` down. During execution, you can yield any
# asset materializations that have been reported by calling `
# pipes_session.get_results()` as often as you like. `get_results` returns
# an iterator that your custom code can `yield from` to forward the
# results back to the materialize function. Note you will need to extract
# the env vars by calling `pipes_session.get_pipes_bootstrap_env_vars()`,
# and launch the Databricks job in the same way as with (1).

# The function should return an `Iterator[MaterializeResult]`.
yield from custom_databricks_launch_code(pipes_session) # type: ignore # noqa: F821

# With either option (1) or (2), this is required to yield any remaining
# buffered results.
yield from pipes_session.get_results()
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
### dbfs:/my_python_script.py

# `dagster_pipes` must be available in the databricks python environment
from dagster_pipes import (
PipesDbfsContextLoader,
PipesDbfsMessageWriter,
open_dagster_pipes,
)

# Sets up communication channels and downloads the context data sent from Dagster.
# Note that while other `context_loader` and `message_writer` settings are
# possible, it is recommended to use `PipesDbfsContextLoader` and
# `PipesDbfsMessageWriter` for Databricks.
with open_dagster_pipes(
context_loader=PipesDbfsContextLoader(),
message_writer=PipesDbfsMessageWriter(),
) as pipes:
# Access the `extras` dict passed when launching the job from Dagster.
some_parameter_value = pipes.get_extra("some_parameter")

# Stream log message back to Dagster
pipes.log.info(f"Using some_parameter value: {some_parameter_value}")

# ... your code that computes and persists the asset

# Stream asset materialization metadata and data version back to Dagster.
# This should be called after you've computed and stored the asset value. We
# omit the asset key here because there is only one asset in scope, but for
# multi-assets you can pass an `asset_key` parameter.
pipes.report_asset_materialization(
metadata={
"some_metric": {"raw_value": some_parameter_value + 1, "type": "int"}
},
data_version="alpha",
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
from dagster_pipes import (
PipesDbfsContextLoader,
PipesDbfsMessageWriter,
open_dagster_pipes,
)

# ... existing code

if __name__ == "__main__":
with open_dagster_pipes(
context_loader=PipesDbfsContextLoader(),
message_writer=PipesDbfsMessageWriter(),
) as pipes:
# ... existing logic
pipes.report_asset_materialization(
asset_key="foo",
metadata={"some_key": "some_value"},
data_version="alpha",
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import importlib.util
import os
import re

import pytest
from dagster_databricks._test_utils import (
databricks_client,
temp_dbfs_script,
upload_dagster_pipes_whl,
)

IS_BUILDKITE = os.getenv("BUILDKITE") is not None

# If we even try to import the sample code in an environment without Databricks credentials (BK),
# we'll get an error.
if not IS_BUILDKITE:
from dagster._core.definitions.events import AssetKey
from docs_snippets.guides.dagster.dagster_pipes.databricks.databricks_asset_client import (
databricks_asset,
defs as databricks_asset_defs,
)

def _get_databricks_script_path():
db_script_spec = importlib.util.find_spec(
"docs_snippets.guides.dagster.dagster_pipes.databricks.databricks_script"
)
assert db_script_spec and db_script_spec.origin
return db_script_spec.origin

def test_databricks_asset(databricks_client, capsys):
script_file = _get_databricks_script_path()
# with upload_dagster_pipes_whl(databricks_client) as dagster_pipes_whl_path:
with temp_dbfs_script(
databricks_client,
script_file=script_file,
dbfs_path="dbfs:/my_python_script.py",
) as script_file:
job_def = databricks_asset_defs.get_implicit_job_def_for_assets(
[AssetKey("databricks_asset")],
)
assert job_def
result = job_def.execute_in_process()
assert result.success

mats = result.asset_materializations_for_node(databricks_asset.op.name)
assert mats[0].metadata["some_metric"].value == 101
captured = capsys.readouterr()
assert re.search(
r"This will be forwarded back to Dagster stdout\n",
captured.out,
re.MULTILINE,
)
assert re.search(
r"This will be forwarded back to Dagster stderr\n",
captured.err,
re.MULTILINE,
)
1 change: 1 addition & 0 deletions examples/docs_snippets/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
"dagster-celery",
"dagster-dbt",
"dagster-dask",
"dagster-databricks",
"dagster-duckdb",
"dagster-duckdb-pandas",
"dagster-embedded-elt",
Expand Down
3 changes: 3 additions & 0 deletions examples/docs_snippets/tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ deps =
-e ../../python_modules/libraries/dagster-airflow
-e ../../python_modules/libraries/dagster-aws
-e ../../python_modules/libraries/dagster-celery
-e ../../python_modules/libraries/dagster-databricks
-e ../../python_modules/libraries/dagster-dbt
-e ../../python_modules/libraries/dagster-dask
-e ../../python_modules/libraries/dagster-duckdb
Expand All @@ -24,9 +25,11 @@ deps =
-e ../../python_modules/libraries/dagster-k8s
-e ../../python_modules/libraries/dagster-pandas
-e ../../python_modules/libraries/dagster-postgres
-e ../../python_modules/libraries/dagster-pyspark
-e ../../python_modules/libraries/dagster-slack
-e ../../python_modules/libraries/dagster-gcp-pandas
-e ../../python_modules/libraries/dagster-gcp-pyspark
-e ../../python_modules/libraries/dagster-spark
-e ../../python_modules/libraries/dagster-snowflake
-e ../../python_modules/libraries/dagster-snowflake-pandas
-e ../../python_modules/libraries/dagster-snowflake-pyspark
Expand Down
Loading

1 comment on commit 9e08133

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Deploy preview for dagster-docs ready!

✅ Preview
https://dagster-docs-okaflfjbw-elementl.vercel.app

Built with commit 9e08133.
This pull request is being automatically deployed with vercel-action

Please sign in to comment.