diff --git a/.github/workflows/build-docs-revamp.yml b/.github/workflows/build-docs-revamp.yml index 7d63950f0ae47..4af9f18c06fe0 100644 --- a/.github/workflows/build-docs-revamp.yml +++ b/.github/workflows/build-docs-revamp.yml @@ -4,7 +4,6 @@ on: paths: - docs/docs-next - .github/workflows/build-docs-revamp.yml - push: branches: - docs/revamp @@ -13,7 +12,6 @@ on: - .github/workflows/build-docs-revamp.yml concurrency: - # Cancel in-progress runs on same branch group: ${{ github.workflow}}-${{github.ref}} cancel-in-progress: true diff --git a/docs/docs-next/docs/concepts/sensors.md b/docs/docs-next/docs/concepts/sensors.md new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/docs/docs-next/docs/guides/automation.md b/docs/docs-next/docs/guides/automation.md index 099aa8ea65642..43a33ac23c798 100644 --- a/docs/docs-next/docs/guides/automation.md +++ b/docs/docs-next/docs/guides/automation.md @@ -6,9 +6,9 @@ last_update: author: Pedram Navid --- -# How To Automate Pipelines in Dagster - -Automation is key to building reliable, efficient data pipelines. This guide covers the main ways to automate processes in Dagster, helping you choose the right method for your needs. +Automation is key to building reliable, efficient data pipelines. +This guide provides a simplified overview of the main ways to automate processes in Dagster, +helping you choose the right method for your needs. You will find links to more detailed guides for each method below. ## What You'll Learn @@ -30,105 +30,49 @@ Before continuing, you should be familiar with: Dagster offers several ways to automate pipeline execution: -1. Schedules - Run jobs at specified times -2. Sensors - Trigger runs based on events -3. Declarative Automation - Automatically materialize assets based on conditions -4. Asset Sensors - Trigger jobs when specific assets materialize +1. [Schedules](#schedules) - Run jobs at specified times +2. [Sensors](#sensors) - Trigger runs based on events +3. [Asset Sensors](#asset-sensors) - Trigger jobs when specific assets materialize Let's look at each method in more detail. ## Schedules Schedules allow you to run jobs at specified times, like "every Monday at 9 AM" or "daily at midnight." -A schedule combines a selection of assets, known as a [Job](/concepts/ops-jobs), and a cron expression in order to define when the job should be run. +A schedule combines a selection of assets, known as a [Job](/concepts/ops-jobs), and a [cron expression](https://en.wikipedia.org/wiki/Cron) +in order to define when the job should be run. + To make creating cron expressions easier, you can use an online tool like [Crontab Guru](https://crontab.guru/). ### When to use Schedules - You need to run jobs at regular intervals -- You want a basic time-based automation method - -### Basic Schedule Example - -```python -from dagster import ScheduleDefinition, define_asset_job - -# A job is a selection of assets that are grouped together for execution -daily_refresh_job = define_asset_job("daily_refresh", selection=["customer_data", "sales_report"]) +- You want basic time-based automation -# Create a schedule that runs the job daily at midnight -daily_schedule = ScheduleDefinition( - job=daily_refresh_job, - cron_schedule="0 0 * * *" # Runs at midnight daily -) -``` +For examples of how to create schedules, see the [How-To Use Schedules](/guides/automation/schedules) guide. -View more detailed examples of schedules in the [How-To Use Schedules](/guides/automation/schedules) -and read more about how Schedules work in [About Schedules](/concepts/schedules). +For more information about how Schedules work, see the [About Schedules](/concepts/schedules) concept page. ## Sensors Sensors allow you to trigger runs based on events or conditions, like a new file arriving or an external system status change. -A sensor requires that you define a function that will +Like schedules, sensors operate on a selection of assets, known as [Jobs](/concepts/ops-jobs) and can either start a pipeline +through a Run or log a reason for not starting a pipeline using a SkipReason. + +However, unlike schedules, sensors are triggered by events that you define. +You must provide a function that the sensor will use to determine if it should trigger a run. ### When to use Sensors - You need event-driven automation - You want to react to changes in external systems -### Basic Sensor Example - -```python -from dagster import RunRequest, SensorDefinition, sensor - -@asset -def my_asset(): - ... - -my_job = define_asset_job("my_job", selection=[my_asset]) - -def check_for_new_files() -> List[str]: - return ["file1", "file2"] +For more examples of how to create sensors, see the [How-To Use Sensors](/guides/automation/sensors) guide. -@sensor(job=my_job) -def new_file_sensor(): - new_files = check_for_new_files() - if new_files: - yield RunRequest(run_key=f"filename") +For more information about how Sensors work, see the [About Sensors](/concepts/sensors) concept page. -``` - -## 3. Declarative Automation - -Declarative Automation allows you to automatically materialize assets when specified criteria are met, without needing to define explicit jobs. - -### When to use Declarative Automation - -- You're working primarily with assets -- You want a simpler, more declarative approach to automation - -### Basic Declarative Automation Example - -```python -from dagster import asset, AutoMaterializePolicy, AutoMaterializeRule - -@asset( - auto_materialize_policy=AutoMaterializePolicy( - rules=[ - # Materialize if upstream assets have changed - AutoMaterializeRule.materialize_on_parent_updated(), - # Materialize daily at 2 AM - AutoMaterializeRule.materialize_on_cron("0 2 * * *"), - ] - ) -) -def my_asset(): - # Asset computation logic here - pass -``` - -## 4. Asset Sensors +## Asset Sensors Asset Sensors trigger jobs when specified assets are materialized, allowing you to create dependencies between jobs or code locations. @@ -137,15 +81,8 @@ Asset Sensors trigger jobs when specified assets are materialized, allowing you - You need to trigger jobs based on asset materializations - You want to create dependencies between different jobs or code locations -### Basic Asset Sensor Example - -```python -from dagster import AssetSensor, RunRequest, asset_sensor +For more examples of how to create asset sensors, see the [How-To Use Asset Sensors](/guides/automation/asset-sensors) guide. -@asset_sensor(asset_key=["raw_data"], job=process_raw_data_job) -def raw_data_sensor(context): - yield RunRequest(run_key=context.cursor) -``` ## Choosing the Right Automation Method @@ -171,4 +108,4 @@ Use this table to help guide your decision: - Explore [complex sensor examples] - TODO ADD LINK - Dive into [Declarative Automation best practices] - TODO ADD LINK -By understanding and effectively using these automation methods, you can build robust, efficient data pipelines that respond to your specific needs and constraints. \ No newline at end of file +By understanding and effectively using these automation methods, you can build more efficient data pipelines that respond to your specific needs and constraints. \ No newline at end of file diff --git a/docs/docs-next/docs/guides/automation/asset-sensors.md b/docs/docs-next/docs/guides/automation/asset-sensors.md new file mode 100644 index 0000000000000..1d928c3cc59bc --- /dev/null +++ b/docs/docs-next/docs/guides/automation/asset-sensors.md @@ -0,0 +1,10 @@ +--- +title: Asset Sensors +sidebar_position: 50 +--- + +### Basic Asset Sensor Example + + + +This Asset Sensor will trigger a run of `my_job` whenever the `asset_to_watch` asset is materialized. diff --git a/docs/docs-next/docs/guides/automation/creating-dynamic-pipelines-based-on-external-data.md b/docs/docs-next/docs/guides/automation/creating-dynamic-pipelines-based-on-external-data.md index 6aa876233cae2..98576d9d32794 100644 --- a/docs/docs-next/docs/guides/automation/creating-dynamic-pipelines-based-on-external-data.md +++ b/docs/docs-next/docs/guides/automation/creating-dynamic-pipelines-based-on-external-data.md @@ -1,6 +1,6 @@ --- title: "Creating dynamic pipelines based on external data" -sidebar_position: 3 +sidebar_position: 30 --- -# Creating dynamic pipelines based on external data \ No newline at end of file +# Creating dynamic pipelines based on external data diff --git a/docs/docs-next/docs/guides/automation/running-pipelines-on-a-schedule.md b/docs/docs-next/docs/guides/automation/running-pipelines-on-a-schedule.md deleted file mode 100644 index c99994d5c9a78..0000000000000 --- a/docs/docs-next/docs/guides/automation/running-pipelines-on-a-schedule.md +++ /dev/null @@ -1,6 +0,0 @@ ---- -title: "Scheduling runs" -sidebar_position: 1 ---- - -# Scheduling runs \ No newline at end of file diff --git a/docs/docs-next/docs/guides/automation/schedules.md b/docs/docs-next/docs/guides/automation/schedules.md index e69de29bb2d1d..db5105529916e 100644 --- a/docs/docs-next/docs/guides/automation/schedules.md +++ b/docs/docs-next/docs/guides/automation/schedules.md @@ -0,0 +1,74 @@ +--- +title: "Scheduling pipelines" +sidebar_label: "Running pipelines on a schedule" +sidebar_position: 10 +--- + +## Basic Schedule Example + +A basic schedule is defined by a `JobDefinition` and a `cron_schedule` using the `ScheduleDefinition` class. + + + +## How to Set Custom Timezones + +By default, schedules without a timezone will run in UTC. If you want to run a schedule in a different timezone, you can +set the `timezone` parameter. + +```python +ecommerce_schedule = ScheduleDefinition( + job=ecommerce_job, + cron_schedule="15 5 * * 1-5", +timezone="America/Los_Angeles", +) +``` + +## How to Create Partitioned Schedules + +If you have a partitioned asset and job, you can create a schedule from the partition using `build_schedule_from_partitioned_job`. +The schedule will execute as the same cadence specified by the partition definition. + +```python +from dagster import ( + asset, + build_schedule_from_partitioned_job, + define_asset_job, + DailyPartitionsDefinition, +) + +daily_partition = DailyPartitionsDefinition(start_date="2024-05-20") + + +@asset(partitions_def=daily_partition) +def daily_asset(): ... + +partitioned_asset_job = define_asset_job("partitioned_job", selection=[daily_asset]) + +# highlight-start +# This partition will run daily +asset_partitioned_schedule = build_schedule_from_partitioned_job( + partitioned_asset_job, +) +# highlight-end + +``` + +If you have a partitioned job, you can create a schedule from the partition using `build_schedule_from_partitioned_job`. + +```python +from dagster import build_schedule_from_partitioned_job, job + + +@job(config=partitioned_config) +def partitioned_op_job(): ... + +# highlight-start +partitioned_op_schedule = build_schedule_from_partitioned_job( + partitioned_op_job, +) +# highlight-end +``` + +--- + +For more information about how Schedules work, see the [About Schedules](/concepts/schedules) concept page. diff --git a/docs/docs-next/docs/guides/automation/sensors.md b/docs/docs-next/docs/guides/automation/sensors.md new file mode 100644 index 0000000000000..fcaa64aa5be7c --- /dev/null +++ b/docs/docs-next/docs/guides/automation/sensors.md @@ -0,0 +1,18 @@ +--- +title: Sensor Examples +--- + +### Basic Sensor Example + +This example includes a `check_for_new_files` function that simulates finding new files. In a real scenario, this function would check an actual system or directory. + +The sensor runs every 5 seconds. If it finds new files, it starts a run of `my_job`. If not, it skips the run and logs "No new files found" in the Dagster UI. + + + +:::tip + +By default, sensors aren't enabled when first deployed to a Dagster instance. +Click "Automation" in the top navigation to find and enable a sensor. + +::: diff --git a/docs/docs-next/docs/guides/automation/simple-asset-sensor-example.py b/docs/docs-next/docs/guides/automation/simple-asset-sensor-example.py new file mode 100644 index 0000000000000..3432bb604d190 --- /dev/null +++ b/docs/docs-next/docs/guides/automation/simple-asset-sensor-example.py @@ -0,0 +1,36 @@ +from dagster import ( + AssetExecutionContext, + AssetKey, + Definitions, + RunRequest, + asset, + asset_sensor, + define_asset_job, +) + + +@asset +def asset_to_watch(context: AssetExecutionContext): + context.log.info("Asset to watch") + + +@asset +def asset_to_trigger(context: AssetExecutionContext): + context.log.info("Asset to trigger") + + +my_job = define_asset_job("my_job", [asset_to_trigger]) + + +# highlight-start +@asset_sensor(asset_key=AssetKey("asset_to_watch"), job_name="my_job") +def my_asset_sensor(): + yield RunRequest() + # highlight-end + + +defs = Definitions( + assets=[asset_to_watch, asset_to_trigger], + jobs=[my_job], + sensors=[my_asset_sensor], +) diff --git a/docs/docs-next/docs/guides/automation/simple-schedule-example.py b/docs/docs-next/docs/guides/automation/simple-schedule-example.py new file mode 100644 index 0000000000000..33ec3e6b791e4 --- /dev/null +++ b/docs/docs-next/docs/guides/automation/simple-schedule-example.py @@ -0,0 +1,25 @@ +from dagster import Definitions, ScheduleDefinition, asset, define_asset_job + + +@asset +def customer_data(): ... + + +@asset +def sales_report(): ... + + +daily_refresh_job = define_asset_job("daily_refresh", selection=["customer_data", "sales_report"]) + +# highlight-start +daily_schedule = ScheduleDefinition( + job=daily_refresh_job, + cron_schedule="0 0 * * *", # Runs at midnight daily +) +# highlight-end + +defs = Definitions( + assets=[customer_data, sales_report], + jobs=[daily_refresh_job], + schedules=[daily_schedule], +) diff --git a/docs/docs-next/docs/guides/automation/simple-sensor-example.py b/docs/docs-next/docs/guides/automation/simple-sensor-example.py index 1f7210d32984c..e6011a6d4ecea 100644 --- a/docs/docs-next/docs/guides/automation/simple-sensor-example.py +++ b/docs/docs-next/docs/guides/automation/simple-sensor-example.py @@ -1,6 +1,15 @@ +import random from typing import List -from dagster import AssetExecutionContext, Definitions, RunRequest, asset, define_asset_job, sensor +from dagster import ( + AssetExecutionContext, + Definitions, + RunRequest, + SkipReason, + asset, + define_asset_job, + sensor, +) @asset @@ -11,15 +20,22 @@ def my_asset(context: AssetExecutionContext): my_job = define_asset_job("my_job", selection=[my_asset]) +# highlight-start def check_for_new_files() -> List[str]: - return ["file1", "file2"] + if random.random() > 0.5: + return ["file1", "file2"] + return [] -@sensor(target=my_job) +@sensor(target=my_job, minimum_interval_seconds=5) def new_file_sensor(): new_files = check_for_new_files() - for filename in new_files: - yield RunRequest(run_key=filename) + if new_files: + for filename in new_files: + yield RunRequest(run_key=filename) + else: + yield SkipReason("No new files found") + # highlight-end defs = Definitions(assets=[my_asset], jobs=[my_job], sensors=[new_file_sensor]) diff --git a/docs/docs-next/docs/guides/automation/triggering-pipeline-runs-using-events.md b/docs/docs-next/docs/guides/automation/triggering-pipeline-runs-using-events.md index a22694860e3fa..17826f1c536e4 100644 --- a/docs/docs-next/docs/guides/automation/triggering-pipeline-runs-using-events.md +++ b/docs/docs-next/docs/guides/automation/triggering-pipeline-runs-using-events.md @@ -1,6 +1,4 @@ --- -title: "Triggering runs using events" -sidebar_position: 2 +title: "Creating event-based pipelines" +sidebar_position: 20 --- - -# Triggering pipeline runs using events \ No newline at end of file diff --git a/docs/vale/styles/Dagster/section-heading-sentence-case.yml b/docs/vale/styles/Dagster/section-heading-sentence-case.yml new file mode 100644 index 0000000000000..93244ad05e7c6 --- /dev/null +++ b/docs/vale/styles/Dagster/section-heading-sentence-case.yml @@ -0,0 +1,5 @@ +extends: capitalization +message: "'%s' should be in sentence case" +level: error +scope: heading +match: $sentence diff --git a/docs/vale/styles/config/vocabularies/Dagster/accept.txt b/docs/vale/styles/config/vocabularies/Dagster/accept.txt index 9ee817464ef1a..ca940b3243c3c 100644 --- a/docs/vale/styles/config/vocabularies/Dagster/accept.txt +++ b/docs/vale/styles/config/vocabularies/Dagster/accept.txt @@ -11,3 +11,5 @@ dataframe dataframes DataFrame cron +materializations +webserver \ No newline at end of file