wip docs
prha committed Jan 16, 2025
1 parent 9fa0bba commit c15ed10
Showing 5 changed files with 84 additions and 117 deletions.
116 changes: 49 additions & 67 deletions docs/docs-beta/docs/guides/operate/managing-concurrency.md
@@ -14,106 +14,88 @@ This article assumes familiarity with [assets](/guides/build/assets/) and [jobs]

:::

## Limit how many jobs can be running at the same time

## Limit the number of total runs that can be in progress at the same time

* In Dagster Core, add the following to your [dagster.yaml](/guides/deploy/dagster-yaml)
* In Dagster+, add the following to your [deployment settings](/dagster-plus/deployment/management/settings/deployment-settings)

```yaml
run_queue:
  max_concurrent_runs: 15
concurrency:
  runs:
    max_concurrent_runs: 15
```
## Limit the number of runs that can be in progress for a set of ops
<CodeExample filePath="guides/tbd/concurrency-global.py" language="python" title="Global concurrency limits" />
## Limit how many ops or assets can be running at the same time
You can control the number of assets or ops that run concurrently within a job using the `config` argument of `dg.define_asset_job()` for assets or `@dg.job` for ops.

<Tabs>
<TabItem value="Assets" label="Asset job">
<CodeExample filePath="guides/tbd/concurrency-job-asset.py" language="python" title="Asset concurrency limits in a job" />
You can assign assets and ops to concurrency pools, which let you limit the number of in-progress runs containing those assets or ops. First, assign your asset or op to a concurrency pool using the `pool` keyword argument.

</TabItem>
<CodeExample filePath="guides/tbd/concurrency-pool-api.py" language="python" title="Specifying pools on assets and ops" />

<TabItem value="Ops" label="Op job">
<CodeExample filePath="guides/tbd/concurrency-job-op.py" language="python" title="Op concurrency limits in a job" />
Once you have assigned your assets and ops to a concurrency pool, you can configure a pool limit for that pool in your deployment by using the Dagster UI or by using the Dagster CLI.

Check failure on line 34 in docs/docs-beta/docs/guides/operate/managing-concurrency.md (GitHub Actions / runner / vale, reported by reviewdog): [Vale.Terms] Use 'cli' instead of 'CLI'. Column 179, severity ERROR.

</TabItem>
</Tabs>
To specify a limit for the pool "database" using the UI, navigate to the `Deployments` &rarr; `Concurrency` settings page and click the `Add pool limit` button:

Need screenshot here

## Limit how many of a certain type of op or asset can run across all runs
To specify a limit for the pool "database" using the CLI, use:

Check failure on line 40 in docs/docs-beta/docs/guides/operate/managing-concurrency.md (GitHub Actions / runner / vale, reported by reviewdog): [Vale.Terms] Use 'cli' instead of 'CLI'. Column 54, severity ERROR.

You can set a limit for all ops or assets with a specific tag key or key-value pair. Ops or assets above that limit will be queued. Use `tag_concurrency_limits` in the job's config, either in Python or using the Launchpad in the Dagster UI.
```
dagster instance concurrency set database 1
```

For example, you might want to limit the number of ops or assets that are running with a key of `database` across all runs (to limit the load on that database).
### Setting a default limit for concurrency pools

:::warning
This feature is experimental and is only supported with Postgres/MySQL storage.
:::
Instead of adding a limit for every individual pool, you can specify a default limit for pools in your deployment settings.

* In Dagster Core, add the following to your [dagster.yaml](/todo)
* In Dagster+, add the following to your [deployment settings](/dagster-plus/deployment/deployment-settings)

```yaml
# dagster.yaml for Dagster Core; Deployment Settings for Dagster+
run_coordinator:
  module: dagster.core.run_coordinator
  class: QueuedRunCoordinator
  config:
    tag_concurrency_limits:
      - key: "dagster/concurrency_key"
        value: "database"
        limit: 1
concurrency:
  pools:
    default_limit: 1
```

To specify a global concurrency limit using the CLI, use:

```
dagster instance concurrency set database 1
```

A default concurrency limit can be configured for the instance, for any concurrency keys that don't have an explicit limit set:
## Limit the number of runs by run tag

* Dagster+: Use the Dagster+ UI or the dagster-cloud CLI
* Dagster Open Source: Use your instance's dagster.yaml
You can also limit the number of in-progress runs by run tag. This is useful for limiting sets of runs independent of which assets or ops they are executing. For example, you might want to limit the number of in-progress runs for a particular schedule. Or, you might want to limit the number of in-progress runs for all backfills.

Check warning on line 62 in docs/docs-beta/docs/guides/operate/managing-concurrency.md (GitHub Actions / runner / vale, reported by reviewdog): [Dagster.sentence-length-variety] This paragraph has a stdev less than 2. Column 1, severity WARNING.

To enable this default value, use `concurrency.default_op_concurrency_limit`. For example, the following would set the default concurrency value for the deployment to 1:
```yaml
concurrency:
  default_op_concurrency_limit: 1
  runs:
    tag_concurrency_limits:
      - key: "dagster/sensor_name"
        value: "my_cool_sensor"
        limit: 5
      - key: "dagster/backfill"
        limit: 10
```
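
The way a list of tag limits gates a queued run can be sketched in plain Python. This is only an illustrative model of the rule described above, not Dagster's implementation: `can_start`, `_matches`, and the dictionaries of run tags are hypothetical names introduced for this sketch, and it assumes a rule without a `value` matches any run that carries the tag key.

```python
def _matches(tags, key, value=None):
    """A run matches a rule if it carries the tag key (and the value, when one is given)."""
    return key in tags and (value is None or tags[key] == value)


def can_start(candidate_tags, in_progress_runs, tag_limits):
    """Return True if no rule that applies to the candidate run is already at its limit."""
    for rule in tag_limits:
        key, value, limit = rule["key"], rule.get("value"), rule["limit"]
        if not _matches(candidate_tags, key, value):
            continue  # this rule does not apply to the candidate run
        running = sum(1 for tags in in_progress_runs if _matches(tags, key, value))
        if running >= limit:
            return False
    return True


# Mirrors the YAML above: 5 runs for one sensor, 10 runs across all backfills.
tag_limits = [
    {"key": "dagster/sensor_name", "value": "my_cool_sensor", "limit": 5},
    {"key": "dagster/backfill", "limit": 10},
]
in_progress_runs = [{"dagster/sensor_name": "my_cool_sensor"} for _ in range(5)]

same_sensor = can_start({"dagster/sensor_name": "my_cool_sensor"}, in_progress_runs, tag_limits)
other_sensor = can_start({"dagster/sensor_name": "other_sensor"}, in_progress_runs, tag_limits)
print(same_sensor, other_sensor)  # False True
```

In this model, a sixth run from `my_cool_sensor` stays queued, while a run from a different sensor is unaffected by the limit.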

<Tabs>
<TabItem value="Asset Tag" label="Asset tag concurrency limits">
<CodeExample filePath="guides/tbd/concurrency-tag-key-asset.py" language="python" title="No more than 1 asset running with a tag of 'database' across all runs" />

</TabItem>
<TabItem value="Op Tag" label="Op tag concurrency limits">
<CodeExample filePath="guides/tbd/concurrency-tag-key-op.py" language="python" title="No more than 1 op running with a tag of 'database' across all runs" />

</TabItem>
</Tabs>
### Limiting runs by unique tag value

You can also limit concurrency for a tag within the job definition, for example to limit the number of specific assets running at the same time *within* that run.
To apply a separate limit to each unique value of a run tag, use `applyLimitPerUniqueValue`. For example, instead of limiting the number of backfill runs across all backfills, you may want to limit the number of runs for each backfill in progress:

<Tabs>
<TabItem value="Asset Tag with Job" label="Asset tag concurrency limits in a run">
<CodeExample filePath="guides/tbd/concurrency-tag-key-job-asset.py" language="python" title="No more than 1 asset running with a tag of 'database' within a run" />

</TabItem>
<TabItem value="Op Tag with Job" label="Op tag concurrency limits in a run">
<CodeExample filePath="guides/tbd/concurrency-tag-key-job-op.py" language="python" title="No more than 1 op running with a tag of 'database' within a run" />
</TabItem>
</Tabs>
```yaml
concurrency:
  run_tag_limits:
    - key: "dagster/backfill"
      value:
        applyLimitPerUniqueValue: true
      limit: 10
```
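
The difference a per-value limit makes can be sketched in plain Python. This is an illustrative model under stated assumptions, not Dagster's implementation: `can_start_per_value` and the sample backfill IDs are hypothetical, and the sketch assumes runs without the tag are not limited by the rule.

```python
from collections import Counter


def can_start_per_value(candidate_tags, in_progress_runs, key, limit):
    """Apply `limit` separately to each distinct value of the tag `key`."""
    value = candidate_tags.get(key)
    if value is None:
        return True  # the rule only applies to runs that carry the tag
    running_by_value = Counter(tags[key] for tags in in_progress_runs if key in tags)
    return running_by_value[value] < limit


# Ten runs in progress for backfill_a, two for backfill_b.
in_progress_runs = [{"dagster/backfill": "backfill_a"} for _ in range(10)]
in_progress_runs += [{"dagster/backfill": "backfill_b"} for _ in range(2)]

a_ok = can_start_per_value({"dagster/backfill": "backfill_a"}, in_progress_runs, "dagster/backfill", 10)
b_ok = can_start_per_value({"dagster/backfill": "backfill_b"}, in_progress_runs, "dagster/backfill", 10)
print(a_ok, b_ok)  # False True
```

With a single shared limit of 10, the twelve in-progress runs would block both backfills. Counting per value lets `backfill_b` keep launching runs while `backfill_a` is at its limit.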

## [Advanced] Limit the number of assets/ops actively in execution across a large set of runs

## Override job level concurrency in the Launchpad
For deployments with complex jobs containing many ops, blocking entire runs for a small number of concurrency-limited ops may be too coarse-grained for your requirements. Instead of enforcing concurrency limits at the run level, you can have Dagster apply the limit at the individual op/asset execution level. This means that if one run completes its materialization of a pool's asset, a materialization of another pool asset in a different run may begin even if the first run is still in progress.

You can override the default job-level settings, such as the value of the `max_concurrent` key for a job, by launching a job in the Launchpad in the Dagster UI.
You can set the granularity of the concurrency limit enforcement to be at the op level instead of at the run level:

Need screenshot here
```yaml
concurrency:
  pools:
    granularity: op
```
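
Op-level granularity can be pictured as slot claiming, sketched here in plain Python. This is a simplified model and not Dagster's implementation: the `PoolSlots` class, the run IDs, and the `load_table` step name are all hypothetical. It assumes a slot is held by a single (run, step) pair and is released when that step finishes, not when the whole run finishes.

```python
class PoolSlots:
    """Tracks which (run, step) pairs currently hold a slot in one pool."""

    def __init__(self, limit):
        self.limit = limit
        self.holders = set()

    def try_claim(self, holder):
        """Claim a slot if one is free; return whether the holder has a slot."""
        if holder in self.holders:
            return True
        if len(self.holders) >= self.limit:
            return False
        self.holders.add(holder)
        return True

    def release(self, holder):
        """Free the slot when the step finishes, even if its run keeps going."""
        self.holders.discard(holder)


database = PoolSlots(limit=1)
first = database.try_claim(("run_a", "load_table"))   # run_a takes the only slot
second = database.try_claim(("run_b", "load_table"))  # run_b has to wait
database.release(("run_a", "load_table"))             # step done, run_a still in progress
third = database.try_claim(("run_b", "load_table"))   # run_b can proceed now
print(first, second, third)  # True False True
```

Under run-level granularity, the equivalent holder would be the run itself, so `run_b` would wait until `run_a` finished entirely.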

## Prevent runs from starting if another run is already occurring (advanced)

@@ -157,7 +139,7 @@ The possible causes for runs remaining in `QUEUED` status depend on whether you'
**Troubleshoot the run queue configuration**
If the daemon is running, runs may intentionally be left in the queue due to concurrency rules. To investigate:
* **Check the output logged from the daemon process**, as this will include skipped runs.
* **Check the max_concurrent_runs setting in your instance's dagster.yaml**. If set to 0, this may block the queue. You can check this setting in the Dagster UI by navigating to Deployment > Configuration and locating the run_queue.max_concurrent_runs setting. Refer to the Limiting overall runs section for more info.
* **Check the max_concurrent_runs setting in your instance's dagster.yaml**. If set to 0, this may block the queue. You can check this setting in the Dagster UI by navigating to Deployment > Configuration and locating the concurrency.runs.max_concurrent_runs setting. Refer to the Limiting overall runs section for more info.

Check failure on line 142 in docs/docs-beta/docs/guides/operate/managing-concurrency.md (GitHub Actions / runner / vale, reported by reviewdog): [Vale.Spelling] Did you really mean 'max_concurrent_runs'? Column 19, severity ERROR.

Check failure on line 142 in docs/docs-beta/docs/guides/operate/managing-concurrency.md (GitHub Actions / runner / vale, reported by reviewdog): [Dagster.spelling] Is 'max_concurrent_runs' spelled correctly? Column 19, severity ERROR.
* **Check the state of your run queue**. In some cases, the queue may be blocked by some number of in-progress runs. To view the status of your run queue, click **Runs** in the top navigation of the Dagster UI and then open the **Queued** and **In Progress** tabs.

If there are queued or in-progress runs blocking the queue, you can terminate them to allow other runs to proceed.
@@ -3,23 +3,17 @@
import dagster as dg


@dg.asset
@dg.asset(concurrency_group="foo")
def first_asset(context: dg.AssetExecutionContext):
    # sleep so that the asset takes some time to execute
    time.sleep(20)
    context.log.info("First asset executing")


@dg.asset
def second_asset_that_waits(context: dg.AssetExecutionContext):
@dg.asset(concurrency_group="foo")
def second_asset(context: dg.AssetExecutionContext):
    time.sleep(20)
    context.log.info("Second asset executing")


my_job = dg.define_asset_job("my_job", [first_asset, second_asset_that_waits])


defs = dg.Definitions(
    assets=[first_asset, second_asset_that_waits],
    jobs=[my_job],
)
defs = dg.Definitions(assets=[first_asset, second_asset])

This file was deleted.

@@ -0,0 +1,28 @@
import time

import dagster as dg


@dg.asset(pool="foo")
def my_asset():
    pass


@dg.op(pool="bar")
def my_op():
    pass


@dg.op(pool="barbar")
def my_downstream_op(inp):
    return inp


@dg.graph_asset
def my_graph_asset():
    return my_downstream_op(my_op())


defs = dg.Definitions(
    assets=[my_asset, my_graph_asset],
)
@@ -1,2 +1,3 @@
run_queue:
  max_concurrent_runs: 1
concurrency:
  runs:
    max_concurrent_runs: 15
