Break out 'External Metrics' and 'Exporting Metrics' pages and add more details
Showing 3 changed files with 230 additions and 77 deletions.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,92 @@ | ||
--- | ||
title: External Metrics for Dagster Cloud Insights | ||
description: "Integrating external metrics with Dagster Insights." | ||
|
||
platform_type: "cloud" | ||
--- | ||
|
||
# External Metrics for Dagster Cloud Insights | ||
|
||
External metrics, such as Snowflake credits spent, can be integrated into the Dagster Insights UI. The [`dagster-cloud`](https://pypi.org/project/dagster-cloud/) package contains utilities for capturing and submitting external metrics about data operations to Dagster Cloud via an API. | ||
|
||
## How to enable Snowflake and dbt with Insights | ||
|
||
If you use dbt to materialize tables in Snowflake, you can use these instructions to integrate Snowflake metrics into the Insights UI. | ||
|
||
### Step 1 - Instrument your dbt asset definition | ||
|
||
You need `dagster-cloud` version 1.5.1 or newer. Instrument the Dagster `@dbt_assets` function with `dbt_with_snowflake_insights`. | ||
|
||
This wrapper passes through all the underlying events and additionally emits an `AssetObservation` for each materialization. Each observation contains the dbt invocation ID and unique ID, which are recorded in the Dagster event log. | ||
|
||
```python | ||
from dagster import AssetExecutionContext | ||
from dagster_dbt import dbt_assets | ||
from dagster_cloud.dagster_insights import dbt_with_snowflake_insights | ||
@dbt_assets(...) | ||
def my_asset(context: AssetExecutionContext): | ||
    # `dbt_resource` is assumed to be your existing dbt CLI resource. | ||
    # Typically you have a `yield from dbt_resource.cli(...)`. | ||
    # Wrap the original call with `dbt_with_snowflake_insights` as below. | ||
    dbt_cli_invocation = dbt_resource.cli(["build"], context=context) | ||
    yield from dbt_with_snowflake_insights(context, dbt_cli_invocation) | ||
``` | ||
|
||
### Step 2 - Update your dbt_project.yml | ||
|
||
Add the following to your `dbt_project.yml`: | ||
|
||
```yaml | ||
query-comment: | ||
comment: "snowflake_dagster_dbt_v1_opaque_id[[[{{ node.unique_id }}:{{ invocation_id }}]]]" | ||
append: true | ||
``` | ||
This adds a comment to each query recorded in the `query_history` table in Snowflake. The comment contains the dbt unique id and invocation id. Here `append: true` is important since Snowflake strips leading comments. | ||
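
To illustrate how this comment ties a Snowflake query back to a dbt node, here is a minimal sketch that extracts the two IDs from a query's text using the format configured above. The function name `parse_opaque_id`, the regular expression, and the example query are illustrative assumptions and not part of `dagster-cloud`.

```python
import re
from typing import Optional, Tuple

# Matches the comment format from the dbt_project.yml snippet:
# snowflake_dagster_dbt_v1_opaque_id[[[<unique_id>:<invocation_id>]]]
OPAQUE_ID_PATTERN = re.compile(
    r"snowflake_dagster_dbt_v1_opaque_id"
    r"\[\[\[(?P<unique_id>.+?):(?P<invocation_id>[^:\]]+)\]\]\]"
)


def parse_opaque_id(query_text: str) -> Optional[Tuple[str, str]]:
    """Return (unique_id, invocation_id) if the query carries the comment, else None."""
    match = OPAQUE_ID_PATTERN.search(query_text)
    if match is None:
        return None
    return match.group("unique_id"), match.group("invocation_id")


# Hypothetical query text; the comment is appended because of `append: true`.
example_query = (
    "select 1\n"
    "/* snowflake_dagster_dbt_v1_opaque_id"
    "[[[model.my_project.my_model:6f1c2d3e-0000-4000-8000-123456789abc]]] */"
)
print(parse_opaque_id(example_query))
```

Queries that were not issued by an instrumented dbt run carry no such comment, so the function returns `None` for them.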
|
||
### Step 3 - Create a metrics ingestion pipeline | ||
|
||
Create a Dagster pipeline that joins asset observation events with the Snowflake query history and calls the Dagster Cloud ingestion API. This needs a Snowflake resource that can query `query_history`. You can use a pre-defined pipeline as below: | ||
|
||
```python | ||
from datetime import date | ||
from dagster_snowflake import SnowflakeResource | ||
from dagster import Definitions, EnvVar | ||
from dagster_cloud.dagster_insights import ( | ||
    create_snowflake_insights_asset_and_schedule, | ||
) | ||
snowflake_insights_definitions = create_snowflake_insights_asset_and_schedule( | ||
    date(2023, 10, 5), | ||
    allow_partial_partitions=True, | ||
    dry_run=False, | ||
    snowflake_resource_key="snowflake_insights", | ||
) | ||
defs = Definitions( | ||
    assets=[..., *snowflake_insights_definitions.assets], | ||
    schedules=[..., snowflake_insights_definitions.schedule], | ||
    resources={ | ||
        ..., | ||
        "snowflake_insights": SnowflakeResource( | ||
            account=EnvVar("SNOWFLAKE_PURINA_ACCOUNT"), | ||
            user=EnvVar("SNOWFLAKE_PURINA_USER"), | ||
            password=EnvVar("SNOWFLAKE_PURINA_PASSWORD"), | ||
        ), | ||
    } | ||
) | ||
``` | ||
|
||
The `snowflake_resource_key` is the resource key of a `SnowflakeResource` that has access to the `query_history` table. Once the pipeline runs, Snowflake credits should be visible in the Insights tab: | ||
|
||
<Image | ||
alt="Snowflake credits in the Dagster UI" | ||
src="/images/dagster-cloud/insights/insights-snowflake.png" | ||
width={383} | ||
height={349} | ||
/> | ||
|
||
## Limitations | ||
|
||
Currently the following limitations are in effect for external metrics: | ||
|
||
1. _Size_: Up to 2 million individual data points can be added per month. | ||
2. _Retention_: External metrics are retained for 90 days only. |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,124 @@ | ||
--- | ||
title: Metrics Export | ||
description: "Exporting metrics from Dagster Cloud Insights." | ||
|
||
platform_type: "cloud" | ||
--- | ||
|
||
# Exporting Metrics from Dagster Cloud Insights | ||
|
||
Metrics available in [Dagster Cloud Insights](/dagster-cloud/insights) can be exported using a GraphQL API. | ||
|
||
## Example | ||
|
||
This example uses the [GraphQL Python Client](concepts/webserver/graphql-client) to export the Dagster Credits metric for all assets for the month of September 2023: | ||
|
||
```python | ||
from datetime import datetime | ||
from dagster_graphql import DagsterGraphQLClient | ||
|
||
ASSET_METRICS_QUERY = """ | ||
query AssetMetrics($metricName: String, $after: Float, $before: Float) { | ||
  reportingMetricsByAsset( | ||
    metricsSelector: { | ||
      metricName: $metricName | ||
      after: $after | ||
      before: $before | ||
      sortAggregationFunction: SUM | ||
      granularity: DAILY | ||
    } | ||
  ) { | ||
    __typename | ||
    ... on ReportingMetrics { | ||
      metrics { | ||
        values | ||
        entity { | ||
          ... on ReportingAsset { | ||
            assetKey { | ||
              path | ||
            } | ||
          } | ||
        } | ||
      } | ||
    } | ||
  } | ||
} | ||
""" | ||
|
||
|
||
def get_client(): | ||
    url = "YOUR_ORG.dagster.cloud/prod"  # Your deployment-scoped url | ||
    user_token = "YOUR_TOKEN"  # A token generated from Cloud Settings -> Tokens | ||
    return DagsterGraphQLClient(url, headers={"Dagster-Cloud-Api-Token": user_token}) | ||
|
||
|
||
if __name__ == "__main__": | ||
    client = get_client() | ||
    result = client._execute( | ||
        ASSET_METRICS_QUERY, | ||
        { | ||
            "metricName": "__dagster_dagster_credits", | ||
            "after": datetime(2023, 9, 1).timestamp(), | ||
            "before": datetime(2023, 10, 1).timestamp(), | ||
        }, | ||
    ) | ||
|
||
    for asset_series in result["reportingMetricsByAsset"]["metrics"]: | ||
        print("Asset key:", asset_series["entity"]["assetKey"]["path"]) | ||
        print("Daily values:", asset_series["values"]) | ||
|
||
``` | ||
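
The query selects `__typename` and only reads `metrics` on the `ReportingMetrics` type, so a response of any other type would have no `metrics` field. The following is a minimal sketch of checking the type before iterating. The helper name `extract_asset_series` and the hand-written `sample_result` are illustrative assumptions, and the fields of any error type are not shown here because they are not covered by this page.

```python
from typing import Any, Dict, List


def extract_asset_series(result: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Return the per-asset series, or raise if the response is not ReportingMetrics."""
    payload = result["reportingMetricsByAsset"]
    typename = payload.get("__typename")
    if typename != "ReportingMetrics":
        raise RuntimeError(f"Metrics query returned unexpected type: {typename}")
    return payload["metrics"]


# Hand-written response in the shape selected by the query (values are made up).
sample_result = {
    "reportingMetricsByAsset": {
        "__typename": "ReportingMetrics",
        "metrics": [
            {
                "values": [1.0, 2.0],
                "entity": {"assetKey": {"path": ["my_schema", "my_table"]}},
            }
        ],
    }
}

for series in extract_asset_series(sample_result):
    print("Asset key:", "/".join(series["entity"]["assetKey"]["path"]))
    print("Daily values:", series["values"])
```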
|
||
## Reference | ||
|
||
A condensed reference is provided below. For the full reference go to <https://YOUR_ORG.dagster.cloud/prod/graphql> and click on the Schema tab. | ||
|
||
The top level GraphQL queries available are: | ||
|
||
```graphql | ||
reportingMetricsByJob( | ||
metricsFilter: JobReportingMetricsFilter | ||
metricsSelector: ReportingMetricsSelector! | ||
): ReportingMetricsOrError! | ||
|
||
reportingMetricsByAsset( | ||
metricsFilter: AssetReportingMetricsFilter | ||
metricsSelector: ReportingMetricsSelector! | ||
): ReportingMetricsOrError! | ||
|
||
reportingMetricsByAssetGroup( | ||
metricsFilter: AssetGroupReportingMetricsFilter | ||
metricsSelector: ReportingMetricsSelector! | ||
): ReportingMetricsOrError! | ||
``` | ||
|
||
The `metricsSelector` specifies the metric name and time granularity. | ||
|
||
```graphql | ||
input ReportingMetricsSelector { | ||
after: Float # timestamp | ||
before: Float # timestamp | ||
metricName: String # see below for valid values | ||
granularity: ReportingMetricsGranularity | ||
} | ||
|
||
enum ReportingMetricsGranularity { | ||
DAILY | ||
WEEKLY | ||
MONTHLY | ||
} | ||
|
||
# The valid metric names are: | ||
# "__dagster_dagster_credits" | ||
# "__dagster_execution_time_ms" | ||
# "__dagster_materializations" | ||
# "__dagster_step_failures" | ||
# "__dagster_step_retries" | ||
# "__dagster_asset_check_errors" | ||
# "__dagster_asset_check_warnings" | ||
``` | ||
|
||
## Automation | ||
|
||
We do not recommend running frequent queries over large time windows, since these may download large amounts of data. After an initial data load, we recommend loading data daily for the most recent week or less. Note that the metrics data is currently computed once daily. |
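
As a rough sketch of the daily incremental load described above, the helper below builds query variables covering at most the most recent week. The function name `recent_window_variables` and the choice to align the window to UTC midnight are assumptions made for illustration. The resulting dictionary has the same keys as the variables passed to the query in the example.

```python
from datetime import datetime, timedelta, timezone
from typing import Dict, Optional, Union


def recent_window_variables(
    metric_name: str,
    days: int = 7,
    now: Optional[datetime] = None,
) -> Dict[str, Union[str, float]]:
    """Build variables for a window of `days` full days ending at the last UTC midnight."""
    if not 1 <= days <= 7:
        raise ValueError("days must be between 1 and 7 to keep the window small")
    now = now or datetime.now(timezone.utc)
    # Assumption: align to midnight so each daily run requests whole days only.
    end = now.replace(hour=0, minute=0, second=0, microsecond=0)
    start = end - timedelta(days=days)
    return {
        "metricName": metric_name,
        "after": start.timestamp(),
        "before": end.timestamp(),
    }


variables = recent_window_variables("__dagster_dagster_credits")
print(variables)
```

Scheduling this once per day matches the rate at which the metrics data is computed, so more frequent runs would be unlikely to return new values.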