Break out 'External Metrics' and 'Exporting Metrics' pages and add more details
shalabhc committed Oct 9, 2023
1 parent 4f1aa1a commit e798a11
Showing 3 changed files with 230 additions and 77 deletions.
91 changes: 14 additions & 77 deletions docs/content/dagster-cloud/insights.mdx
@@ -23,85 +23,22 @@ width={771}
height={536}
/>

The Insights page shows a list of metrics in the left panel. For each metric, the daily, weekly, or monthly aggregated values are shown in a graph in the main panel. As of October 2023, the metrics are updated once a day. The following metrics are available:

- **Dagster credits**: Dagster credit cost associated with computing this object. Dagster credits are charged for every step that is run, and for every asset that is materialized.
- **Compute duration**: Time spent computing steps. For jobs which run steps in parallel, the compute duration may be longer than the wall clock time it takes for the run to complete.
- **Retry compute**: "Time spent computing steps, including time spent retrying failed steps. For jobs which run steps in parallel, the compute duration may be longer than the wall clock time it takes for the run to complete.
- **Step retries**: Number of times steps were retried when computing this object.
- **Observations**: Number of asset observations associated with computing this object.
- **Materializations**: Number of asset materializations associated with computing this object.
- **Step failures**: Number of times steps failed when computing this object. Note that steps which retry and succeed are not tallied in this metric.
- **Asset check warnings**: Number of asset checks that produced warnings.
- **Asset check errors**: Number of asset checks that produced errors.

## External metrics

External metrics such as Snowflake credits spent can be integrated in the Dagster Insights UI. See [External metrics](/dagster-cloud/insights/external-metrics) for details.

## Exporting metrics


---
Dagster Cloud Insights metrics can be exported using a GraphQL API. See [Exporting metrics](/dagster-cloud/insights/metrics-export) for details.
92 changes: 92 additions & 0 deletions docs/content/dagster-cloud/insights/external-metrics.mdx
@@ -0,0 +1,92 @@
---
title: External Metrics for Dagster Cloud Insights
description: "Integrating external metrics with Dagster Insights."

platform_type: "cloud"
---

# External Metrics for Dagster Cloud Insights

External metrics such as Snowflake credits spent can be integrated in the Dagster Insights UI. The [`dagster-cloud`](https://pypi.org/project/dagster-cloud/) package contains utilities for capturing and submitting external metrics about data operations to Dagster Cloud via an API.

## How to enable Snowflake and dbt with Insights

If you use dbt to materialize tables in Snowflake, you can use these instructions to integrate Snowflake metrics into the Insights UI.

### Step 1 - Instrument your dbt asset definition

You need `dagster-cloud` version 1.5.1 or newer. Instrument your Dagster `@dbt_assets` function with `dbt_with_snowflake_insights`.

The wrapper passes through all the underlying events and additionally emits an `AssetObservation` for each materialization. Each observation contains the dbt invocation id and unique id, which are recorded in the Dagster event log.

```python
from dagster import AssetExecutionContext
from dagster_dbt import DbtCliResource, dbt_assets
from dagster_cloud.dagster_insights import dbt_with_snowflake_insights


@dbt_assets(...)
def my_asset(context: AssetExecutionContext, dbt_resource: DbtCliResource):
    # Typically you have a `yield from dbt_resource.cli(...)`.
    # Wrap the original call with `dbt_with_snowflake_insights` as below.
    dbt_cli_invocation = dbt_resource.cli(["build"], context=context)
    yield from dbt_with_snowflake_insights(context, dbt_cli_invocation)
```
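For reference, here is a minimal sketch of wiring this asset into a code location. The `dbt_project` directory and the `dbt_resource` resource key are assumptions about your project layout, not requirements of the API; the resource key simply has to match the parameter name on the asset function.

```python
from pathlib import Path

from dagster import Definitions
from dagster_dbt import DbtCliResource

# Hypothetical dbt project directory; adjust to your layout.
DBT_PROJECT_DIR = Path(__file__).parent / "dbt_project"

defs = Definitions(
    assets=[my_asset],
    # The resource key must match the parameter name on the asset function.
    resources={"dbt_resource": DbtCliResource(project_dir=str(DBT_PROJECT_DIR))},
)
```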

### Step 2 - Update your dbt_project.yml

Add the following to your `dbt_project.yml`:

```yaml
query-comment:
  comment: "snowflake_dagster_dbt_v1_opaque_id[[[{{ node.unique_id }}:{{ invocation_id }}]]]"
  append: true
```
This adds a comment to each query recorded in the `query_history` table in Snowflake. The comment contains the dbt unique id and invocation id. The `append: true` setting is important because Snowflake strips leading comments.
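To confirm the instrumentation, you can look for the opaque ID comment in Snowflake's query history. Below is a minimal sketch, assuming the `snowflake-connector-python` package and access to the `SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY` view; the environment variable names are placeholders.

```python
import os

import snowflake.connector

# Credentials are placeholders; use whatever secret management you already have.
conn = snowflake.connector.connect(
    account=os.environ["SNOWFLAKE_ACCOUNT"],
    user=os.environ["SNOWFLAKE_USER"],
    password=os.environ["SNOWFLAKE_PASSWORD"],
)

# Look for recent queries carrying the opaque ID comment appended by dbt.
cursor = conn.cursor()
cursor.execute(
    """
    SELECT query_id, query_text
    FROM snowflake.account_usage.query_history
    WHERE query_text ILIKE '%snowflake_dagster_dbt_v1_opaque_id%'
    ORDER BY start_time DESC
    LIMIT 10
    """
)
for query_id, query_text in cursor.fetchall():
    print(query_id, query_text[-120:])  # the comment is appended at the end of the query
```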

### Step 3 - Create a metrics ingestion pipeline

Create a Dagster pipeline that joins the asset observation events with the Snowflake query history and calls the Dagster Cloud ingestion API. This requires a Snowflake resource that can query the `query_history` table. You can use the predefined pipeline shown below:

```python
from datetime import date

from dagster_snowflake import SnowflakeResource
from dagster import Definitions, EnvVar
from dagster_cloud.dagster_insights import (
    create_snowflake_insights_asset_and_schedule,
)

snowflake_insights_definitions = create_snowflake_insights_asset_and_schedule(
    date(2023, 10, 5),
    allow_partial_partitions=True,
    dry_run=False,
    snowflake_resource_key="snowflake_insights",
)

defs = Definitions(
    assets=[..., *snowflake_insights_definitions.assets],
    schedules=[..., snowflake_insights_definitions.schedule],
    resources={
        ...,  # your other resources
        "snowflake_insights": SnowflakeResource(
            account=EnvVar("SNOWFLAKE_PURINA_ACCOUNT"),
            user=EnvVar("SNOWFLAKE_PURINA_USER"),
            password=EnvVar("SNOWFLAKE_PURINA_PASSWORD"),
        ),
    },
)
```

The `snowflake_resource_key` names a `SnowflakeResource` that has access to the `query_history` table. Once the pipeline runs, Snowflake credits should be visible in the Insights tab:

<Image
alt="Snowflake credits in the Dagster UI"
src="/images/dagster-cloud/insights/insights-snowflake.png"
width={383}
height={349}
/>

## Limitations

Currently the following limitations are in effect for external metrics:

1. _Size_: Up to 2 million individual data points can be added per month.
2. _Retention_: External metrics are retained for 90 days only.
124 changes: 124 additions & 0 deletions docs/content/dagster-cloud/insights/metrics-export.mdx
@@ -0,0 +1,124 @@
---
title: Metrics Export
description: "Exporting metrics from Dagster Cloud Insights."

platform_type: "cloud"
---

# Exporting Metrics from Dagster Cloud Insights

Metrics available in [Dagster Cloud Insights](/dagster-cloud/insights) can be exported using a GraphQL API.

## Example

This example uses the [GraphQL Python Client](concepts/webserver/graphql-client) to export the Dagster credits metric for all assets for the month of September 2023:

```python
from datetime import datetime

from dagster_graphql import DagsterGraphQLClient

ASSET_METRICS_QUERY = """
query AssetMetrics($metricName: String, $after: Float, $before: Float) {
  reportingMetricsByAsset(
    metricsSelector: {
      metricName: $metricName
      after: $after
      before: $before
      sortAggregationFunction: SUM
      granularity: DAILY
    }
  ) {
    __typename
    ... on ReportingMetrics {
      metrics {
        values
        entity {
          ... on ReportingAsset {
            assetKey {
              path
            }
          }
        }
      }
    }
  }
}
"""


def get_client():
    url = "YOUR_ORG.dagster.cloud/prod"  # Your deployment-scoped url
    user_token = "YOUR_TOKEN"  # A token generated from Cloud Settings -> Tokens
    return DagsterGraphQLClient(url, headers={"Dagster-Cloud-Api-Token": user_token})


if __name__ == "__main__":
    client = get_client()
    result = client._execute(
        ASSET_METRICS_QUERY,
        {
            "metricName": "__dagster_dagster_credits",
            "after": datetime(2023, 9, 1).timestamp(),
            "before": datetime(2023, 10, 1).timestamp(),
        },
    )

    for asset_series in result["reportingMetricsByAsset"]["metrics"]:
        print("Asset key:", asset_series["entity"]["assetKey"]["path"])
        print("Daily values:", asset_series["values"])
```

## Reference

A condensed reference is provided below. For the full reference go to <https://YOUR_ORG.dagster.cloud/prod/graphql> and click on the Schema tab.

The top-level GraphQL queries available are:

```graphql
reportingMetricsByJob(
metricsFilter: JobReportingMetricsFilter
metricsSelector: ReportingMetricsSelector!
): ReportingMetricsOrError!

reportingMetricsByAsset(
metricsFilter: AssetReportingMetricsFilter
metricsSelector: ReportingMetricsSelector!
): ReportingMetricsOrError!

reportingMetricsByAssetGroup(
metricsFilter: AssetGroupReportingMetricsFilter
metricsSelector: ReportingMetricsSelector!
): ReportingMetricsOrError!
```

The `metricsSelector` specifies the metric name, the time window, and the time granularity:

```graphql
input ReportingMetricsSelector {
after: Float # timestamp
before: Float # timestamp
metricName: String # see below for valid values
granularity: ReportingMetricsGranularity
}

enum ReportingMetricsGranularity {
DAILY
WEEKLY
MONTHLY
}

# The valid metric names are:
# "__dagster_dagster_credits"
# "__dagster_execution_time_ms"
# "__dagster_materializations"
# "__dagster_step_failures"
# "__dagster_step_retries"
# "__dagster_asset_check_errors"
# "__dagster_asset_check_warnings"
```
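
If you prefer not to use the Python client, the same queries can be issued as plain GraphQL-over-HTTP requests. Below is a minimal sketch using the `requests` package; the endpoint URL and token are the placeholders from the earlier example, the query is the `reportingMetricsByJob` entry point from the reference above, and the exact response shape should be verified against the schema.

```python
from datetime import datetime

import requests

GRAPHQL_URL = "https://YOUR_ORG.dagster.cloud/prod/graphql"  # deployment-scoped endpoint
API_TOKEN = "YOUR_TOKEN"  # a token generated from Cloud Settings -> Tokens

JOB_METRICS_QUERY = """
query JobMetrics($metricName: String, $after: Float, $before: Float) {
  reportingMetricsByJob(
    metricsSelector: {
      metricName: $metricName
      after: $after
      before: $before
      granularity: DAILY
    }
  ) {
    ... on ReportingMetrics {
      metrics {
        values
      }
    }
  }
}
"""

response = requests.post(
    GRAPHQL_URL,
    json={
        "query": JOB_METRICS_QUERY,
        "variables": {
            "metricName": "__dagster_execution_time_ms",
            "after": datetime(2023, 9, 1).timestamp(),
            "before": datetime(2023, 10, 1).timestamp(),
        },
    },
    headers={"Dagster-Cloud-Api-Token": API_TOKEN},
)
response.raise_for_status()
print(response.json())
```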

## Automation

We do not recommend frequent queries over large time windows, since they may download large amounts of data. After an initial data load, we recommend loading data daily for the most recent week or less. Note that the metrics data is currently computed once a day.
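
As one way to follow this guidance, here is a minimal sketch of a daily export that reuses `get_client` and `ASSET_METRICS_QUERY` from the example above and only requests the trailing seven days. The CSV output path and column layout are assumptions for illustration, not part of the API.

```python
import csv
from datetime import datetime, timedelta

# Assumes get_client() and ASSET_METRICS_QUERY are defined (or imported) as in the example above.


def export_last_week(csv_path: str = "dagster_credits_last_week.csv") -> None:
    client = get_client()
    now = datetime.now()
    result = client._execute(
        ASSET_METRICS_QUERY,
        {
            "metricName": "__dagster_dagster_credits",
            "after": (now - timedelta(days=7)).timestamp(),
            "before": now.timestamp(),
        },
    )

    # Write one row per asset with its list of daily values.
    with open(csv_path, "w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(["asset_key", "daily_values"])
        for asset_series in result["reportingMetricsByAsset"]["metrics"]:
            writer.writerow(
                [
                    "/".join(asset_series["entity"]["assetKey"]["path"]),
                    asset_series["values"],
                ]
            )


if __name__ == "__main__":
    export_last_week()
```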
