Skip to content

Commit

Permalink
[du] updating based on API changes from 1dot7
Browse files Browse the repository at this point in the history
  • Loading branch information
tacastillo committed May 8, 2024
1 parent 594292b commit dd0024e
Show file tree
Hide file tree
Showing 9 changed files with 33 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ Let’s add metadata to the `taxi_trips_file` asset to demonstrate further. This
The raw parquet files for the taxi trips dataset. Sourced from the NYC Open Data portal.
"""

partition_date_str = context.asset_partition_key_for_output()
partition_date_str = context.partition_key
month_to_fetch = partition_date_str[:-3]

raw_trips = requests.get(
Expand Down Expand Up @@ -85,7 +85,7 @@ Let’s add metadata to the `taxi_trips_file` asset to demonstrate further. This
The raw parquet files for the taxi trips dataset. Sourced from the NYC Open Data portal.
"""

partition_date_str = context.asset_partition_key_for_output()
partition_date_str = context.partition_key
month_to_fetch = partition_date_str[:-3]

raw_trips = requests.get(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ The job you built should look similar to the following code. Click **View answer
```python {% obfuscated="true" %}
from dagster import define_asset_job, AssetSelection

trips_by_week = AssetSelection.keys(["trips_by_week"])
trips_by_week = AssetSelection.assets(["trips_by_week"])

weekly_update_job = define_asset_job(
name="weekly_update_job",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ As you might have noticed while defining assets and resources, Dagster’s best
To select only the assets you want to include, you’ll use the `AssetSelection` class. This class lets you look up and reference assets across your code location. In particular, there will be two methods that you’ll be using:

- `AssetSelection.all()` gives you a list of all asset definitions in the code location
- `AssetSelection.keys([<string>, ...])` which gives you a list of assets that match the asset keys provided
- `AssetSelection.assets([<string>, ...])` which gives you a list of assets that match the asset keys provided

For more info on asset selection, refer to the [asset selection syntax guide in the Dagster docs](https://docs.dagster.io/concepts/assets/asset-selection-syntax).

Expand All @@ -47,7 +47,7 @@ For more info on asset selection, refer to the [asset selection syntax guide in
```python
from dagster import AssetSelection

trips_by_week = AssetSelection.keys("trips_by_week")
trips_by_week = AssetSelection.assets("trips_by_week")
```

This uses the `AssetSelection` utility to reference a single asset, `trips_by_week`. We’ll isolate this specifically because we won’t want to run it with the rest of our pipeline and it should be run more frequently.
Expand All @@ -74,7 +74,7 @@ Your final code in `jobs/__init__.py` should look like the following:
```python
from dagster import AssetSelection, define_asset_job

trips_by_week = AssetSelection.keys("trips_by_week")
trips_by_week = AssetSelection.assets("trips_by_week")

trip_update_job = define_asset_job(
name="trip_update_job",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,35 +43,38 @@ To add the partition to the asset:
)
```

3. In Dagster, the `context` argument provides you with metadata about the current materialization. To access it, include it as the first argument in the asset definition function:
3. In Dagster, the `context` argument provides you with metadata about the current materialization. To access it, include it as the first argument in the asset definition function. You can enable typehinting for this by importing `AssetExecutionContext` from `dagster` and adding it to the function signature. For example, the updated asset definition should look like this:

```python
from dagster import asset, AssetExecutionContext


@asset(
partitions_def=monthly_partition
)
def taxi_trips_file(context):
def taxi_trips_file(context: AssetExecutionContext):
```

**Note**: The `context` argument isn’t specific to partitions. However, this is the first time you've used it in Dagster University. The `context` argument provides information about how Dagster is running and materializing your asset. For example, you can use it to find out which partition Dagster is materializing, which job triggered the materialization, or what metadata was attached to its previous materializations.

4. In the original asset code, the logic was hard-coded to specifically fetch data for March 2023 (`'2023-03'`). Use the `context` argument’s `asset_partition_key_for_output()` method to dynamically fetch a specific partition’s month of data:
4. In the original asset code, the logic was hard-coded to specifically fetch data for March 2023 (`'2023-03'`). Use the `context` argument’s `partition_key` property to dynamically fetch a specific partition’s month of data:

```python
@asset(
partitions_def=monthly_partition
)
def taxi_trips_file(context):
partition_date_str = context.asset_partition_key_for_output()
partition_date_str = context.partition_key
```

5. In the NYC OpenData source system, the taxi trip files are structured in a `YYYY-MM` format. However, `context.asset_partition_key_for_output()` supplies the materializing partition’s date as a string in the `YYYY-MM-DD` format. Slice the string to make it match the format expected by our source system and replace our existing declaration of the `month_to_fetch` variable:
5. In the NYC OpenData source system, the taxi trip files are structured in a `YYYY-MM` format. However, `context.partition_key` supplies the materializing partition’s date as a string in the `YYYY-MM-DD` format. Slice the string to make it match the format expected by our source system and replace our existing declaration of the `month_to_fetch` variable:

```python
@asset(
partitions_def=monthly_partition
)
def taxi_trips_file(context):
partition_date_str = context.asset_partition_key_for_output()
partition_date_str = context.partition_key
month_to_fetch = partition_date_str[:-3]
```

Expand All @@ -88,7 +91,7 @@ def taxi_trips_file(context):
The raw parquet files for the taxi trips dataset. Sourced from the NYC Open Data portal.
"""

partition_date_str = context.asset_partition_key_for_output()
partition_date_str = context.partition_key
month_to_fetch = partition_date_str[:-3]

raw_trips = requests.get(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,19 +36,20 @@ The updated asset should look similar to the following code. Click **View answer
**If there are differences**, compare what you wrote to the asset below and change them, as this asset will be used as-is in future lessons.

```python {% obfuscated="true" %}
from dagster import asset
from dagster import asset, AssetExecutionContext
from dagster_duckdb import DuckDBResource
from ..partitions import monthly_partitions

@asset(
deps=["taxi_trips_file"],
partitions_def=monthly_partition,
)
def taxi_trips(context, database: DuckDBResource):
def taxi_trips(context: AssetExecutionContext, database: DuckDBResource):
"""
The raw taxi trips dataset, loaded into a DuckDB database, partitioned by month.
"""

partition_date_str = context.asset_partition_key_for_output()
partition_date_str = context.partition_key
month_to_fetch = partition_date_str[:-3]

query = f"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@ from ..partitions import weekly_partition
deps=["taxi_trips"],
partitions_def=weekly_partition
)
def trips_by_week(context, database: DuckDBResource):
def trips_by_week(context: AssetExecutionContext, database: DuckDBResource):
"""
The number of trips per week, aggregated by week.
"""

period_to_fetch = context.asset_partition_key_for_output()
period_to_fetch = context.partition_key

# get all trips for the week
query = f"""
Expand Down Expand Up @@ -76,7 +76,7 @@ def trips_by_week(context, database: DuckDBResource):
from dagster import define_asset_job, AssetSelection
from ..partitions import weekly_partition

trips_by_week = AssetSelection.keys("trips_by_week")
trips_by_week = AssetSelection.assets("trips_by_week")

weekly_update_job = define_asset_job(
name="weekly_update_job",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ Currently, `trip_update_job` in `jobs/__init__.py` should look like this:
```python
trip_update_job = define_asset_job(
name="trip_update_job",
selection=AssetSelection.all() - AssetSelection.keys(["trips_by_week"]),
selection=AssetSelection.all() - AssetSelection.assets(["trips_by_week"]),
)
```

Expand All @@ -42,6 +42,6 @@ from ..partitions import monthly_partition
trip_update_job = define_asset_job(
name="trip_update_job",
partitions_def=monthly_partition, # partitions added here
selection=AssetSelection.all() - AssetSelection.keys(["trips_by_week"])
selection=AssetSelection.all() - AssetSelection.assets(["trips_by_week"])
)
```
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,13 @@ Now that cursors have been explained, let’s start writing the sensor.
- The `json` standard library will be used to read the request’s JSON files as needed
- `adhoc_request_job` is used to specify that the sensor will create runs from this job

3. To define a sensor, create a new function definition that takes `context` as a parameter. Your code should look like the snippet below:
3. To define a sensor, create a new function definition that takes `context` as a parameter. Similar to how your asset definitions had a context argument of type `AssetExecutionContext`, sensor definitions also have a similar `SensorEvaluationContext` to provide information and metadata about the currently running sensor. Your code should look like the snippet below:

```python
from dagster import sensor, SensorEvaluationContext

@sensor
def adhoc_request_sensor(context):
def adhoc_request_sensor(context: SensorEvaluationContext):
```

4. Annotate the function with the `@sensor` decorator and pass `adhoc_request_job` as an argument for the job parameter. At this point, your code should look like this:
Expand All @@ -72,7 +74,7 @@ Now that cursors have been explained, let’s start writing the sensor.
@sensor(
job=adhoc_request_job
)
def adhoc_request_sensor(context):
def adhoc_request_sensor(context: SensorEvaluationContext):
```

5. Let’s fill out the function’s body. Create a variable that resolves to the `data/requests` directory, which is the directory the sensor will observe:
Expand All @@ -81,7 +83,7 @@ Now that cursors have been explained, let’s start writing the sensor.
@sensor(
job=adhoc_request_job
)
def adhoc_request_sensor(context):
def adhoc_request_sensor(context: SensorEvaluationContext):
PATH_TO_REQUESTS = os.path.join(os.path.dirname(__file__), "../../", "data/requests")
```

Expand Down Expand Up @@ -167,7 +169,7 @@ from ..jobs import adhoc_request_job
@sensor(
job=adhoc_request_job
)
def adhoc_request_sensor(context):
def adhoc_request_sensor(context: SensorEvaluationContext):
PATH_TO_REQUESTS = os.path.join(os.path.dirname(__file__), "../../", "data/requests")

previous_state = json.loads(context.cursor) if context.cursor else {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ Next, you’ll create a job that materializes the new `adhoc_request` asset. Thi
Navigate to the `jobs/__init__.py` file and add the following lines to create a job for your ad-hoc requests

```python
adhoc_request = AssetSelection.keys(["adhoc_request"])
adhoc_request = AssetSelection.assets(["adhoc_request"])

adhoc_request_job = define_asset_job(
name="adhoc_request_job",
Expand Down

0 comments on commit dd0024e

Please sign in to comment.