---
title: "Integrating Databricks with Dagster Pipes | Dagster Docs"
description: ""
description: "Learn to integrate Dagster Pipes with Databricks to launch external code from Dagster assets."
---

# Integrating Databricks with Dagster Pipes

In this guide, we’ll show you how to use [Dagster Pipes](/guides/dagster-pipes) with Dagster’s Databricks integration to launch Databricks jobs. You can then send information such as structured metadata and logging back to Dagster, where it will be visible in the Dagster UI.

<Note>
<strong>Heads up!</strong> This guide focuses on using an out-of-the-box
Databricks resource. For further customization, use the{" "}
<a href="/guides/dagster-pipes/customizing-dagster-pipes-protocols">
<code>open_pipes_session</code> approach
</a>{" "}
instead.
</Note>

---

## Prerequisites

To use Dagster Pipes with Databricks, you’ll need:

- **To have the following libraries installed**:

  - **Dagster (`dagster`) and the Dagster UI (`dagster-webserver`).** Refer to the [Installation guide](/getting-started/install) for more info.
  - **The `dagster-databricks` library**:

    ```shell
    pip install dagster-databricks
    ```

- **In Databricks:**

  - **An existing Databricks workspace** with an existing project that’s deployed with a Databricks job. If you don’t have this, follow the [Databricks quickstart](https://docs.databricks.com/workflows/jobs/jobs-quickstart.html) to set one up.
  - **The following information about your Databricks workspace**:

    - `host` - The host address, starting with `https://`, of your Databricks workspace
    - `token` - A personal access token for the Databricks workspace. Refer to the Databricks API authentication documentation for more info about retrieving these values.

- **To have the `dagster-pipes` library available in the Databricks environment.** The `libraries` section of the task specification you’ll define in [Step 1.1](#step-11-define-a-dagster-asset) handles this by installing `dagster-pipes` on the job cluster.

---

## Step 1: Create a Databricks job in Dagster

In this step, you’ll create a Dagster asset that, when materialized, opens a Dagster Pipes session and launches a Databricks job that executes some external code.

### Step 1.1: Define a Dagster asset

1. Create a new file named `dagster_databricks_pipes.py` in your Dagster project.

2. Copy and paste the following imports into the file:

   ```python
   # dagster_databricks_pipes.py

   from dagster import AssetExecutionContext, Definitions, asset
   from dagster_databricks import PipesDatabricksClient
   from databricks.sdk.service import jobs
   ```

3. Next, you’ll define the asset. Copy and paste the following below the imports in `dagster_databricks_pipes.py`:

   ```python
   # dagster_databricks_pipes.py

   @asset
   def databricks_asset(
       context: AssetExecutionContext, pipes_databricks: PipesDatabricksClient
   ):
       task = jobs.SubmitTask.from_dict({
           "new_cluster": { ... },
           "libraries": [
               # must include dagster-pipes; Databricks installs the
               # latest version from PyPI
               {"pypi": {"package": "dagster-pipes"}},
           ],
           "task_key": "some-key",
           "spark_python_task": {
               "python_file": "dbfs:/existing_databricks_code.py",  # location of the target code file
               "source": jobs.Source.WORKSPACE,
           },
       })
   ```

Here’s what we did in this example:

- Created an asset named `databricks_asset`
- Provided <PyObject object="AssetExecutionContext" /> as the `context` argument to the asset. This object provides system information such as resources, config, and logging. We’ll come back to this a bit later in this section.
- Specified a resource for the asset to use, <PyObject module="dagster_databricks" object="PipesDatabricksClient" />. We’ll also come back to this in a little bit.
- Defined a Databricks `SubmitTask` specification in the asset body. When the asset is materialized, this specification will be passed to the Databricks resource (<PyObject module="dagster_databricks" object="PipesDatabricksClient" />), which will use it to launch a Databricks job. The specification includes the following fields:

  - `new_cluster` - The configuration of the cluster the job will run on, such as the Spark version, node type, and number of workers. See the sketch after this list for an example.

  - `spark_env_vars` - **Optional.** An object of key-value pairs specifying environment variables to inject into the launched Databricks job.

  - `libraries` - An object containing the libraries required to run the specified Python file (`spark_python_task.python_file`). This object must include `dagster-pipes`; for example:

    ```python
    "libraries": [
        {
            "pypi": {
                "package": "dagster-pipes"
            }
        },
    ],
    ...
    ```

  - `spark_python_task` - An object specifying the Python file the job should run, which contains the following properties:

    - `python_file` - The URI of the Python file to run, located in DBFS.
    - `source` - **Optional.** The location type of the Python file. In this example, `jobs.Source.WORKSPACE` indicates the file is retrieved from the Databricks workspace.

  These fields are part of the Databricks API rather than Dagster. Refer to the Databricks SDK and jobs API documentation for more info on specifying a Databricks task.

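For reference, here’s a sketch of what a `new_cluster` specification might look like. The values below are hypothetical placeholders; valid Spark versions and node types depend on your Databricks workspace and cloud provider:

```python
# Hypothetical values - adjust for your workspace
"new_cluster": {
    "spark_version": "12.2.x-scala2.12",
    "node_type_id": "i3.xlarge",
    "num_workers": 1,
    "spark_env_vars": {
        # optional environment variables to inject into the job
        "MY_ENV_VAR": "some_value",
    },
},
```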

### Step 1.2: Create the Databricks job

In this step, we’ll create the Databricks job and pass in the task specification, as well as our `context` data. The Databricks resource used by the asset contains a `run` method, which synchronously executes the Databricks job using the task specification you created in [the previous step](#step-11-define-a-dagster-asset).

When the job completes, Dagster Pipes will return a <PyObject module="dagster_pipes" object="PipesClientCompletedInvocation" /> object. This object contains a `get_results` method, which you can use to access the results reported by the Databricks job.

In the body of the `databricks_asset`, add the following:

```python
# dagster_databricks_pipes.py

return pipes_databricks.run(
    task=task,
    context=context,
).get_results()
```

Let’s take a look at what this code does:

- Submits the job to the Databricks resource (`pipes_databricks`) using its `run` method
- Provides the `task` specification and asset `context` to the Databricks job
- Uses `get_results` to access the results reported by the Databricks job, which the asset then returns

### Step 1.3: Define the Databricks Pipes resource

The [`dagster-databricks`](/\_apidocs/libraries/dagster-databricks) library provides a <PyObject module="dagster_databricks" object="PipesDatabricksClient" />, which is a pre-built Dagster resource that allows you to quickly get Pipes working with your Databricks workspace.

1. In the imports section of `dagster_databricks_pipes.py`, add the following:

   ```python
   # dagster_databricks_pipes.py

   import os

   from databricks.sdk import WorkspaceClient
   ```

2. Next, copy and paste the following to the bottom of `dagster_databricks_pipes.py`:

   ```python
   # dagster_databricks_pipes.py

   pipes_databricks_resource = PipesDatabricksClient(
       client=WorkspaceClient(
           # host and token are read from environment variables at definition time
           host=os.environ["DATABRICKS_HOST"],
           token=os.environ["DATABRICKS_TOKEN"],
       )
   )
   ```

In this example, we created a `pipes_databricks_resource`. The <PyObject module="dagster_databricks" object="PipesDatabricksClient" /> accepts a `client` argument, a `WorkspaceClient` that provides the information required to connect to a Databricks workspace. Here, the workspace’s host address and personal access token are read from the `DATABRICKS_HOST` and `DATABRICKS_TOKEN` environment variables.
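As an aside, the Databricks SDK can also resolve credentials on its own. A minimal sketch, assuming `DATABRICKS_HOST` and `DATABRICKS_TOKEN` are set in the environment where Dagster runs, or that a Databricks config profile is available:

```python
# Sketch only: relies on the Databricks SDK's default credential resolution,
# which reads DATABRICKS_HOST/DATABRICKS_TOKEN or a ~/.databrickscfg profile
from dagster_databricks import PipesDatabricksClient
from databricks.sdk import WorkspaceClient

pipes_databricks_resource = PipesDatabricksClient(client=WorkspaceClient())
```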

### Step 1.4: Update the Definitions object

To wrap up this section, you’ll add the asset and Databricks resource to your project’s code location via the <PyObject object="Definitions" /> object. This makes the resource available to other Dagster definitions in the project.

Copy and paste the following to the bottom of `dagster_databricks_pipes.py`:

```python
# dagster_databricks_pipes.py

defs = Definitions(
    assets=[databricks_asset],
    resources={"pipes_databricks": pipes_databricks_resource},
)
```

At this point, `dagster_databricks_pipes.py` should look like the following:

```python
# dagster_databricks_pipes.py

import os

from dagster import AssetExecutionContext, Definitions, asset
from dagster_databricks import PipesDatabricksClient
from databricks.sdk import WorkspaceClient
from databricks.sdk.service import jobs


@asset
def databricks_asset(
    context: AssetExecutionContext, pipes_databricks: PipesDatabricksClient
):
    task = jobs.SubmitTask.from_dict({
        "new_cluster": { ... },
        "libraries": [
            # must include dagster-pipes; Databricks installs the
            # latest version from PyPI
            {"pypi": {"package": "dagster-pipes"}},
        ],
        "task_key": "some-key",
        "spark_python_task": {
            "python_file": "dbfs:/existing_databricks_code.py",  # location of the target code file
            "source": jobs.Source.WORKSPACE,
        },
    })

    return pipes_databricks.run(
        task=task,  # task specification
        context=context,  # asset context
    ).get_results()


pipes_databricks_resource = PipesDatabricksClient(
    client=WorkspaceClient(
        host=os.environ["DATABRICKS_HOST"],
        token=os.environ["DATABRICKS_TOKEN"],
    )
)


defs = Definitions(
    assets=[databricks_asset],
    resources={"pipes_databricks": pipes_databricks_resource},
)
```

---

## Step 2: Modify the Databricks code

The next step is to modify the code in Databricks to work with Dagster Pipes. **Note:** The target file (`python_file`) you specified in [Step 1.1](#step-11-define-a-dagster-asset) must already exist in Databricks.

1. In the imports section of the file, add the following:

   ```python
   # existing_databricks_code.py

   from dagster_pipes import PipesDbfsContextLoader, PipesDbfsMessageWriter, open_dagster_pipes
   ```

2. Next, we need to open the Dagster Pipes connection, defining how Databricks will load the context data sent by Dagster and write messages back. Add the following code below the imports section and move the file’s existing logic inside the `with` block:

   ```python
   # existing_databricks_code.py

   with open_dagster_pipes(
       context_loader=PipesDbfsContextLoader(),
       message_writer=PipesDbfsMessageWriter(),
   ) as pipes:
       # existing code, moved inside the `with` block
       ...
   ```

The <PyObject module="dagster_pipes" object="open_dagster_pipes" /> call initializes Dagster Pipes and specifies the context loader and message writer to use. In this case, we’re using <PyObject module="dagster_pipes" object="PipesDbfsContextLoader" /> and <PyObject module="dagster_pipes" object="PipesDbfsMessageWriter" />, which we recommend for use with Databricks.

3. Finally, you can use the pipes context to send information back to Dagster, such as log messages and structured metadata about the materialized asset. A minimal sketch follows; the metric reported is hypothetical:
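   ```python
   # existing_databricks_code.py

   # inside the `with open_dagster_pipes(...) as pipes:` block:

   # stream a log message back to Dagster
   pipes.log.info("Databricks job started")

   # ... existing logic ...

   # report structured metadata on the materialized asset back to Dagster
   pipes.report_asset_materialization(
       metadata={"some_metric": {"raw_value": 2, "type": "int"}},
   )
   ```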

At this point, your code might look something like the following:

```python
# existing_databricks_code.py

from dagster_pipes import PipesDbfsContextLoader, PipesDbfsMessageWriter, open_dagster_pipes

with open_dagster_pipes(
    context_loader=PipesDbfsContextLoader(),
    message_writer=PipesDbfsMessageWriter(),
) as pipes:
    pipes.log.info("Databricks job started")

    # ... remaining code ...

    pipes.report_asset_materialization(
        metadata={"some_metric": {"raw_value": 2, "type": "int"}},
    )
```

---

## Step 3: Run the Databricks job from the Dagster UI

In this step, you’ll run the Databricks job you created in [Step 1.2](#step-12-create-the-databricks-job) from the Dagster UI.

1. In a new command line session, run the following to start the UI:

   ```shell
   dagster dev
   ```

2. Navigate to [localhost:3000](http://localhost:3000/), where you should see the UI: \[SCREENSHOT]

3. Click **Materialize** near the top right corner of the page.

4. Watch the run as it executes. When the Databricks job completes, the logs and metadata it reported back through Dagster Pipes will be visible in the Dagster UI.

---

## Related

<ArticleList>
<ArticleListItem
title="Dagster Pipes"
href="/guides/dagster-pipes"
></ArticleListItem>
<ArticleListItem
title="Understanding the Dagster Pipes process"
href="/guides/dagster-pipes/understanding-dagster-pipes"
></ArticleListItem>
<ArticleListItem
title="Customizing Dagster Pipes protocols"
href="/guides/dagster-pipes/customizing-dagster-pipes-protocols"
></ArticleListItem>
<ArticleListItem
title="dagster-databricks API reference"
href="/_apidocs/libraries/dagster-databricks"
></ArticleListItem>
</ArticleList>