remove references to ops from schedule docs
sryza committed Jun 1, 2024
1 parent 1fe387d commit 8101460
Showing 12 changed files with 145 additions and 202 deletions.
@@ -35,7 +35,7 @@ This parameter accepts any [`tz` timezone](https://en.wikipedia.org/wiki/List_of

```python file=concepts/partitions_schedules_sensors/schedules/schedules.py startafter=start_timezone endbefore=end_timezone
my_timezone_schedule = ScheduleDefinition(
job=my_job, cron_schedule="0 9 * * *", execution_timezone="America/Los_Angeles"
job=asset_job, cron_schedule="0 9 * * *", execution_timezone="America/Los_Angeles"
)
```

47 changes: 20 additions & 27 deletions docs/content/concepts/automation/schedules/examples.mdx
@@ -21,14 +21,14 @@ The following examples demonstrate how to define some basic schedules.
<TabGroup>
<TabItem name="Using ScheduleDefinition">

This example demonstrates how to define a schedule using <PyObject object="ScheduleDefinition" /> that will run a job every day at midnight. While this example uses [op jobs](/concepts/ops-jobs-graphs/jobs) (<PyObject object="job" decorator />), the same approach will work with [asset jobs](/concepts/assets/asset-jobs) (<PyObject object="define_asset_job" />).
This example demonstrates how to define a schedule using <PyObject object="ScheduleDefinition" /> that will run a job every day at midnight.

```python file=concepts/partitions_schedules_sensors/schedules/schedules.py startafter=start_basic_schedule endbefore=end_basic_schedule
@job
def my_job(): ...
```python file=concepts/partitions_schedules_sensors/schedules/schedules.py startafter=start_basic_asset_schedule endbefore=end_basic_asset_schedule
from dagster import AssetSelection, ScheduleDefinition, define_asset_job

asset_job = define_asset_job("asset_job", AssetSelection.groups("some_asset_group"))

basic_schedule = ScheduleDefinition(job=my_job, cron_schedule="0 0 * * *")
basic_schedule = ScheduleDefinition(job=asset_job, cron_schedule="0 0 * * *")
```

<table
@@ -202,7 +202,7 @@ def basic_schedule(): ...
This example demonstrates how to emit log messages from a schedule during its evaluation function. These logs will be visible in the UI when you inspect a tick in the schedule's tick history.

```python file=concepts/partitions_schedules_sensors/schedules/schedules.py startafter=start_schedule_logging endbefore=end_schedule_logging
@schedule(job=my_job, cron_schedule="* * * * *")
@schedule(job=asset_job, cron_schedule="* * * * *")
def logs_then_skips(context):
context.log.info("Logging from a schedule!")
return SkipReason("Nothing to do")
@@ -276,6 +276,7 @@ from dagster import (
job,
RunRequest,
RunConfig,
define_asset_job,
Definitions,
)
from datetime import datetime
@@ -287,8 +288,9 @@ class DateFormatter(ConfigurableResource):
def strftime(self, dt: datetime) -> str:
return dt.strftime(self.format)

@job
def process_data(): ...
process_data = define_asset_job(
"process_data",
)

@schedule(job=process_data, cron_schedule="* * * * *")
def process_data_schedule(
@@ -387,14 +389,12 @@ defs = Definitions(
This example demonstrates how to use run config to vary the behavior of a job based on its scheduled run time.

```python file=concepts/partitions_schedules_sensors/schedules/schedules.py startafter=start_run_config_schedule endbefore=end_run_config_schedule
@op(config_schema={"scheduled_date": str})
def configurable_op(context: OpExecutionContext):
@asset(config_schema={"scheduled_date": str})
def configurable_asset(context: OpExecutionContext):
context.log.info(context.op_config["scheduled_date"])


@job
def configurable_job():
configurable_op()
configurable_job = define_asset_job("configurable_job", [configurable_asset])


@schedule(job=configurable_job, cron_schedule="0 0 * * *")
@@ -403,7 +403,9 @@ def configurable_job_schedule(context: ScheduleEvaluationContext):
return RunRequest(
run_key=None,
run_config={
"ops": {"configurable_op": {"config": {"scheduled_date": scheduled_date}}}
"ops": {
"configurable_asset": {"config": {"scheduled_date": scheduled_date}}
}
},
tags={"date": scheduled_date},
)
@@ -456,7 +458,7 @@ This example demonstrates how to customize the timezone a schedule executes in.

```python file=concepts/partitions_schedules_sensors/schedules/schedules.py startafter=start_timezone endbefore=end_timezone
my_timezone_schedule = ScheduleDefinition(
job=my_job, cron_schedule="0 9 * * *", execution_timezone="America/Los_Angeles"
job=asset_job, cron_schedule="0 9 * * *", execution_timezone="America/Los_Angeles"
)
```

@@ -542,7 +544,7 @@ def daily_asset(): ...
partitioned_asset_job = define_asset_job("partitioned_job", selection=[daily_asset])


asset_partitioned_schedule = build_schedule_from_partitioned_job(
partitioned_asset_schedule = build_schedule_from_partitioned_job(
partitioned_asset_job,
)
```
@@ -616,16 +618,7 @@ asset_partitioned_schedule = build_schedule_from_partitioned_job(
This example demonstrates how to construct a schedule for a time-partitioned op job using <PyObject object="build_schedule_from_partitioned_job"/>.

```python file=/concepts/partitions_schedules_sensors/schedule_from_partitions.py startafter=start_marker endbefore=end_marker
from dagster import build_schedule_from_partitioned_job, job


@job(config=partitioned_config)
def partitioned_op_job(): ...


partitioned_op_schedule = build_schedule_from_partitioned_job(
partitioned_op_job,
)
No match for startAfter value "start_marker"
```

<table
@@ -693,7 +686,7 @@ from dagster import schedule, RunRequest
@schedule(cron_schedule="0 0 * * *", job=continent_job)
def continent_schedule():
for c in CONTINENTS:
yield RunRequest(run_key=c, partition_key=c)
yield RunRequest(partition_key=c)
```

<table
@@ -29,15 +29,6 @@ To follow this guide, you need to be familiar with:

For jobs partitioned by time, you can use the <PyObject object="build_schedule_from_partitioned_job"/> to construct a schedule for the job. The schedule's interval will match the spacing of the partitions in the job. For example, if you have a daily partitioned job that fills in a date partition of a table each time it runs, you likely want to run that job every day.

Refer to the following tabs for examples of asset and op-based jobs using <PyObject object="build_schedule_from_partitioned_job"/> to construct schedules:

<TabGroup>
<TabItem name="Asset jobs">

#### Asset jobs

Asset jobs are defined using <PyObject object="define_asset_job" />. In this example, we created an asset job named `partitioned_job` and then constructed `asset_partitioned_schedule` by using <PyObject object="build_schedule_from_partitioned_job"/>:

```python file=/concepts/partitions_schedules_sensors/schedule_from_partitions.py startafter=start_partitioned_asset_schedule endbefore=end_partitioned_asset_schedule
from dagster import (
asset,
@@ -56,34 +47,11 @@ def daily_asset(): ...
partitioned_asset_job = define_asset_job("partitioned_job", selection=[daily_asset])


asset_partitioned_schedule = build_schedule_from_partitioned_job(
partitioned_asset_schedule = build_schedule_from_partitioned_job(
partitioned_asset_job,
)
```
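
For the asset, job, and schedule above to be picked up by Dagster, they also need to be included in a `Definitions` object. A minimal sketch, not part of this diff, reusing the names from the snippet above:

```python
# Sketch (assumption, not part of this diff): register the asset, job, and
# schedule so Dagster loads them from this code location.
from dagster import Definitions

defs = Definitions(
    assets=[daily_asset],
    jobs=[partitioned_asset_job],
    schedules=[partitioned_asset_schedule],
)
```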

</TabItem>
<TabItem name="Op jobs">

#### Op jobs

Op jobs are defined using the <PyObject object="job" decorator />. In this example, we created a partitioned job named `partitioned_op_job` and then constructed `partitioned_op_schedule` using <PyObject object="build_schedule_from_partitioned_job"/>:

```python file=/concepts/partitions_schedules_sensors/schedule_from_partitions.py startafter=start_marker endbefore=end_marker
from dagster import build_schedule_from_partitioned_job, job


@job(config=partitioned_config)
def partitioned_op_job(): ...


partitioned_op_schedule = build_schedule_from_partitioned_job(
partitioned_op_job,
)
```

</TabItem>
</TabGroup>

### Customizing schedule timing

The `minute_of_hour`, `hour_of_day`, `day_of_week`, and `day_of_month` parameters of `build_schedule_from_partitioned_job` can be used to control the timing of the schedule.
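
For instance, assuming the daily-partitioned `partitioned_asset_job` defined earlier in this guide, a sketch (hypothetical name, not part of this diff) of a schedule that runs each day's partition at 9:30 AM rather than midnight:

```python
# Sketch (assumption): run the daily partition at 9:30 AM instead of midnight.
morning_partitioned_schedule = build_schedule_from_partitioned_job(
    partitioned_asset_job,
    hour_of_day=9,
    minute_of_hour=30,
)
```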
18 changes: 10 additions & 8 deletions docs/content/concepts/automation/schedules/testing.mdx
@@ -52,14 +52,12 @@ To test a function decorated by the <PyObject object="schedule" decorator /> dec
Let's say we want to test the `configurable_job_schedule` in this example:

```python file=concepts/partitions_schedules_sensors/schedules/schedules.py startafter=start_run_config_schedule endbefore=end_run_config_schedule
@op(config_schema={"scheduled_date": str})
def configurable_op(context: OpExecutionContext):
@asset(config_schema={"scheduled_date": str})
def configurable_asset(context: OpExecutionContext):
context.log.info(context.op_config["scheduled_date"])


@job
def configurable_job():
configurable_op()
configurable_job = define_asset_job("configurable_job", [configurable_asset])


@schedule(job=configurable_job, cron_schedule="0 0 * * *")
@@ -68,7 +66,9 @@ def configurable_job_schedule(context: ScheduleEvaluationContext):
return RunRequest(
run_key=None,
run_config={
"ops": {"configurable_op": {"config": {"scheduled_date": scheduled_date}}}
"ops": {
"configurable_asset": {"config": {"scheduled_date": scheduled_date}}
}
},
tags={"date": scheduled_date},
)
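

# --- Sketch (not part of this diff): one way to test this schedule. ---
# Assumes the elided body formats scheduled_execution_time as "%Y-%m-%d";
# the test name and expected date below are illustrative only.
from datetime import datetime

from dagster import build_schedule_context


def test_configurable_job_schedule():
    # Build an evaluation context with a fixed scheduled execution time.
    context = build_schedule_context(
        scheduled_execution_time=datetime(2024, 1, 1)
    )
    # Invoking the schedule definition directly returns the RunRequest it produces.
    run_request = configurable_job_schedule(context)
    assert (
        run_request.run_config["ops"]["configurable_asset"]["config"]["scheduled_date"]
        == "2024-01-01"
    )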
@@ -104,6 +104,7 @@ from dagster import (
job,
RunRequest,
RunConfig,
define_asset_job,
Definitions,
)
from datetime import datetime
@@ -115,8 +116,9 @@ class DateFormatter(ConfigurableResource):
def strftime(self, dt: datetime) -> str:
return dt.strftime(self.format)

@job
def process_data(): ...
process_data = define_asset_job(
"process_data",
)

@schedule(job=process_data, cron_schedule="* * * * *")
def process_data_schedule(
@@ -119,7 +119,16 @@ def partitioned_op_job():
Not all jobs are partitioned by time. For example, the following job is partitioned by continent:

```python file=/concepts/partitions_schedules_sensors/static_partitioned_job.py
from dagster import Config, OpExecutionContext, job, op, static_partitioned_config
from dagster import (
AssetExecutionContext,
Config,
StaticPartitionsDefinition,
asset,
define_asset_job,
job,
op,
static_partitioned_config,
)

CONTINENTS = [
"Africa",
@@ -132,23 +141,12 @@ CONTINENTS = [
]


@static_partitioned_config(partition_keys=CONTINENTS)
def continent_config(partition_key: str):
return {"ops": {"continent_op": {"config": {"continent_name": partition_key}}}}
@asset(partitions_def=StaticPartitionsDefinition(CONTINENTS))
def continents_info(context: AssetExecutionContext):
context.log.info(f"continent name: {context.partition_key}")


class ContinentOpConfig(Config):
continent_name: str


@op
def continent_op(context: OpExecutionContext, config: ContinentOpConfig):
context.log.info(config.continent_name)


@job(config=continent_config)
def continent_job():
continent_op()
continent_job = define_asset_job("continent_job", [continents_info])
```
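
To try a single partition of this job locally (a sketch, not part of this diff), you can materialize the asset with an explicit partition key:

```python
# Sketch (assumption): materialize one static partition in-process.
from dagster import materialize

result = materialize([continents_info], partition_key="Africa")
assert result.success
```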

---