move some docs to operate section
Signed-off-by: nikki everett <[email protected]>
neverett committed Dec 19, 2024
1 parent 84f7211 commit 03b919f
Showing 3 changed files with 259 additions and 1 deletion.
86 changes: 86 additions & 0 deletions docs/docs-beta/docs/guides/operate/io-managers.md
@@ -0,0 +1,86 @@
---
title: "Managing stored data with I/O managers"
---

I/O managers in Dagster allow you to keep the code for data processing separate from the code for reading and writing data. This reduces repetitive code and makes it easier to change where your data is stored.

In many Dagster pipelines, assets can be broken down into the following steps:

1. Reading data from a data store into memory
2. Applying an in-memory transform
3. Writing the transformed data back to a data store

For assets that follow this pattern, an I/O manager can streamline the code that handles reading and writing data to and from a source.
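Independent of Dagster, the three-step pattern above often looks like the following sketch (using Python's built-in `sqlite3` as a stand-in data store; the table names and schema are made up for illustration):

```python
import sqlite3

def clean_sales_data(db_path: str) -> None:
    con = sqlite3.connect(db_path)
    # 1. Read data from the data store into memory
    rows = con.execute("SELECT region, amount FROM raw_sales_data").fetchall()
    # 2. Apply an in-memory transform: drop rows with missing amounts
    cleaned = [(region, amount) for region, amount in rows if amount is not None]
    # 3. Write the transformed data back to the data store
    con.execute("CREATE TABLE IF NOT EXISTS clean_sales_data (region TEXT, amount REAL)")
    con.executemany("INSERT INTO clean_sales_data VALUES (?, ?)", cleaned)
    con.commit()
    con.close()
```

Steps 1 and 3 are the repetitive read/write code that an I/O manager can absorb, leaving only step 2 in the asset body.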

<details>
<summary>Prerequisites</summary>

To follow the steps in this guide, you'll need familiarity with:

- [Assets](/guides/build/assets-concepts/index.mdx)
- [Resources](/guides/build/configure/resources)
</details>

## Before you begin

**I/O managers aren't required to use Dagster, nor are they the best option in all scenarios.** If you find yourself writing the same code at the start and end of each asset to load and store data, an I/O manager may be useful. For example:

- You have assets that are stored in the same location and follow a consistent set of rules to determine the storage path
- You have assets that are stored differently in local, staging, and production environments
- You have assets that load upstream dependencies into memory to do the computation

**I/O managers may not be the best fit if:**

- You want to run SQL queries that create or update a table in a database
- Your pipeline manages I/O on its own by using other libraries/tools that write to storage
- Your assets won't fit in memory, such as a database table with billions of rows

As a general rule, if your pipeline becomes more complicated in order to use I/O managers, it's likely that I/O managers aren't a good fit. In these cases you should use `deps` to [define dependencies](/guides/build/assets-concepts/asset-dependencies).

## Using I/O managers in assets \{#io-in-assets}

Consider the following example, which contains assets that construct a DuckDB connection object, read data from an upstream table, apply some in-memory transform, and write the result to a new table in DuckDB:

<CodeExample filePath="guides/external-systems/assets-without-io-managers.py" language="python" />

Using an I/O manager would remove the code that reads and writes data from the assets themselves, instead delegating it to the I/O manager. The assets would be left only with the code that applies transformations or retrieves the initial CSV file.

<CodeExample filePath="guides/external-systems/assets-with-io-managers.py" language="python" />

To load upstream assets using an I/O manager, specify the asset as an input parameter to the asset function. In this example, the `DuckDBPandasIOManager` I/O manager will read the DuckDB table with the same name as the upstream asset (`raw_sales_data`) and pass the data to `clean_sales_data` as a Pandas DataFrame.

To store data using an I/O manager, return the data in the asset function. The returned data must be a valid type. This example uses Pandas DataFrames, which the `DuckDBPandasIOManager` will write to a DuckDB table with the same name as the asset.

Refer to the individual I/O manager documentation for details on valid types and how they store data.

## Swapping data stores \{#swap-data-stores}

With I/O managers, swapping data stores consists of changing the implementation of the I/O manager. The asset definitions, which only contain transformational logic, won't need to change.

In the following example, a Snowflake I/O manager replaces the DuckDB I/O manager.

<CodeExample filePath="guides/external-systems/assets-with-snowflake-io-manager.py" language="python" />

## Built-in I/O managers \{#built-in}

Dagster offers built-in library implementations for I/O managers for popular data stores and in-memory formats.

| Name | Description |
| ------------------------------------------------------------------------------------------ | ----------------------------------------------------------------------------- |
| <PyObject module="dagster" object="FilesystemIOManager" /> | Default I/O manager. Stores outputs as pickle files on the local file system. |
| <PyObject module="dagster" object="InMemoryIOManager" /> | Stores outputs in memory. Primarily useful for unit testing. |
| <PyObject module="dagster_aws.s3" object="S3PickleIOManager" /> | Stores outputs as pickle files in Amazon Web Services S3. |
| <PyObject module="dagster_azure.adls2" object="ConfigurablePickledObjectADLS2IOManager" /> | Stores outputs as pickle files in Azure ADLS2. |
| <PyObject module="dagster_gcp" object="GCSPickleIOManager" /> | Stores outputs as pickle files in Google Cloud Platform GCS. |
| <PyObject module="dagster_gcp_pandas" object="BigQueryPandasIOManager" /> | Stores Pandas DataFrame outputs in Google Cloud Platform BigQuery. |
| <PyObject module="dagster_gcp_pyspark" object="BigQueryPySparkIOManager" /> | Stores PySpark DataFrame outputs in Google Cloud Platform BigQuery. |
| <PyObject module="dagster_snowflake_pandas" object="SnowflakePandasIOManager" /> | Stores Pandas DataFrame outputs in Snowflake. |
| <PyObject module="dagster_snowflake_pyspark" object="SnowflakePySparkIOManager" /> | Stores PySpark DataFrame outputs in Snowflake. |
| <PyObject module="dagster_duckdb_pandas" object="DuckDBPandasIOManager" /> | Stores Pandas DataFrame outputs in DuckDB. |
| <PyObject module="dagster_duckdb_pyspark" object="DuckDBPySparkIOManager" /> | Stores PySpark DataFrame outputs in DuckDB. |
| <PyObject module="dagster_duckdb_polars" object="DuckDBPolarsIOManager" />                  | Stores Polars DataFrame outputs in DuckDB.                                      |

## Next steps

- Learn to [connect databases](/guides/build/configure/databases) with resources
- Learn to [connect APIs](/guides/build/configure/apis) with resources
172 changes: 172 additions & 0 deletions docs/docs-beta/docs/guides/operate/managing-concurrency.md
@@ -0,0 +1,172 @@
---
title: Managing concurrency of Dagster assets, jobs, and Dagster instances
sidebar_label: Managing concurrency
description: How to limit the number of concurrent runs for a job, asset, or Dagster instance.
sidebar_position: 900
---

You often want to control the number of concurrent runs for a Dagster job, a specific asset, or for a type of asset or job. Limiting concurrency in your data pipelines can help prevent performance problems and downtime.


<details>
<summary>Prerequisites</summary>

- Familiarity with [Assets](/guides/build/assets-concepts/index.mdx)
- Familiarity with [Jobs and Ops](/guides/build/ops-jobs)
</details>



## Limit how many jobs can be running at the same time


* In Dagster Core, add the following to your [dagster.yaml](/todo)
* In Dagster+, add the following to your [deployment settings](/dagster-plus/deployment/management/settings/deployment-settings)

```yaml
run_queue:
max_concurrent_runs: 15
```
<CodeExample filePath="guides/tbd/concurrency-global.py" language="python" title="Global concurrency limits" />

## Limit how many ops or assets can be running at the same time

You can control the number of assets or ops running concurrently within a job using the `config` argument of `dg.define_asset_job()` for asset jobs or `@dg.job()` for op jobs.

<Tabs>
<TabItem value="Assets" label="Asset job">
<CodeExample filePath="guides/tbd/concurrency-job-asset.py" language="python" title="Asset concurrency limits in a job" />

</TabItem>

<TabItem value="Ops" label="Op job">
<CodeExample filePath="guides/tbd/concurrency-job-op.py" language="python" title="Op concurrency limits in a job" />

</TabItem>
</Tabs>


## Limit how many of a certain type of op or asset can run across all runs

You can set a limit for all ops or assets that have a specific tag key or key-value pair. Ops or assets above that limit will be queued. Use `tag_concurrency_limits` in the job's config, either in Python or using the Launchpad in the Dagster UI.

For example, you might want to limit the number of ops or assets that are running with a key of `database` across all runs (to limit the load on that database).

:::warning
This feature is experimental and is only supported with Postgres/MySQL storage.
:::


```yaml
# dagster.yaml for Dagster Core; Deployment Settings for Dagster+
run_coordinator:
module: dagster.core.run_coordinator
class: QueuedRunCoordinator
config:
tag_concurrency_limits:
- key: "dagster/concurrency_key"
value: "database"
limit: 1
```

To specify a global concurrency limit using the CLI, use:

```shell
dagster instance concurrency set database 1
```

A default concurrency limit can be configured for the instance, for any concurrency keys that don't have an explicit limit set:

* Dagster+: Use the Dagster+ UI or the `dagster-cloud` CLI
* Dagster Open Source: Use your instance's `dagster.yaml`

To enable this default value, use `concurrency.default_op_concurrency_limit`. For example, the following would set the default concurrency value for the deployment to 1:
```yaml
concurrency:
default_op_concurrency_limit: 1
```

<Tabs>
<TabItem value="Asset Tag" label="Asset tag concurrency limits">
<CodeExample filePath="guides/tbd/concurrency-tag-key-asset.py" language="python" title="No more than 1 asset running with a tag of 'database' across all runs" />

</TabItem>
<TabItem value="Op Tag" label="Op tag concurrency limits">
<CodeExample filePath="guides/tbd/concurrency-tag-key-op.py" language="python" title="No more than 1 op running with a tag of 'database' across all runs" />

</TabItem>
</Tabs>

You can also limit concurrency for a tag within the job definition, for example to limit the number of specific assets running at the same time *within* that run.

<Tabs>
<TabItem value="Asset Tag with Job" label="Asset tag concurrency limits in a run">
<CodeExample filePath="guides/tbd/concurrency-tag-key-job-asset.py" language="python" title="No more than 1 asset running with a tag of 'database' within a run" />

</TabItem>
<TabItem value="Op Tag with Job" label="Op tag concurrency limits in a run">
<CodeExample filePath="guides/tbd/concurrency-tag-key-job-op.py" language="python" title="No more than 1 op running with a tag of 'database' within a run" />
</TabItem>
</Tabs>


## Override job level concurrency in the Launchpad

You can override the default job-level settings, such as the value of the `max_concurrent` key for a job, by launching a job in the Launchpad in the Dagster UI.


## Prevent runs from starting if another run is already occurring (advanced)

You can use a schedule or a sensor, together with Dagster's run status metadata, to start a run only when no other runs are currently in progress.

<CodeExample filePath="guides/tbd/concurrency-no-more-than-1-job.py" language="python" title="No more than 1 running job from a schedule" />


## Troubleshooting

When limiting concurrency, you might run into some issues until you get the configuration right.

### Runs going to STARTED status and skipping QUEUED

:::info
This only applies to Dagster Open Source.
:::

The `run_queue` key may not be set in your instance's settings. In the Dagster UI, navigate to **Deployment > Configuration** and verify that the `run_queue` key is set.

### Runs remaining in QUEUED status

The possible causes for runs remaining in `QUEUED` status depend on whether you're using Dagster+ or Dagster Open Source.

<Tabs>
<TabItem value="Dagster+" label="Dagster+">
If runs aren't being dequeued in Dagster+, the root causes could be:
* **If using a [hybrid deployment](/dagster-plus/deployment/deployment-types/hybrid)**, the agent serving the deployment may be down. In this situation, runs will be paused.
* **Dagster+ is experiencing downtime**. Check the [status page](https://dagstercloud.statuspage.io/) for the latest on potential outages.

</TabItem>
<TabItem value="Dagster Open Source" label="Dagster Open Source">
If runs aren't being dequeued in Dagster Open Source, the root cause is likely an issue with the Dagster daemon or the run queue configuration.

#### Troubleshoot the Dagster daemon

* **Verify the Dagster daemon is set up and running.** In the Dagster UI, navigate to **Deployment > Daemons** and verify that the daemon is running. The **Run queue** should also be running. If you used [dagster dev](/guides/deploy/deployment-options/running-local-ui-development) to start the Dagster UI, the daemon should have been started for you. If the daemon isn't running, proceed to the next step.
* **Verify the Dagster daemon can access the same storage as the Dagster webserver process.** Both the webserver process and the Dagster daemon should access the same storage, meaning they should use the same `dagster.yaml`. Locally, this means both processes should have the same `DAGSTER_HOME` environment variable set. If you used `dagster dev` to start the Dagster UI, both processes should already be using the same storage. Refer to the [Dagster Instance docs](/todo) for more information.

#### Troubleshoot the run queue configuration
If the daemon is running, runs may intentionally be left in the queue due to concurrency rules. To investigate:
* **Check the output logged from the daemon process**, as this will include skipped runs.
* **Check the `max_concurrent_runs` setting in your instance's `dagster.yaml`**. If set to 0, this will block the queue. You can check this setting in the Dagster UI by navigating to **Deployment > Configuration** and locating the `run_queue.max_concurrent_runs` setting. Refer to the section above on limiting how many jobs can run at the same time for more info.
* **Check the state of your run queue**. In some cases, the queue may be blocked by some number of in-progress runs. To view the status of your run queue, click **Runs** in the top navigation of the Dagster UI and then open the **Queued** and **In Progress** tabs.

If there are queued or in-progress runs blocking the queue, you can terminate them to allow other runs to proceed.
</TabItem>
</Tabs>


## Next steps

Read more in the [concurrency configuration reference](/todo).
2 changes: 1 addition & 1 deletion docs/docs-beta/docs/guides/operate/run-executors.md
@@ -1,5 +1,5 @@
---
title: Run executors
title: About run executors
sidebar_position: 100
unlisted: true
---
