Commit

Merge branch 'dagster-io:master' into deltalake

PedramNavid authored Nov 1, 2023
2 parents 377576b + be07285 commit bd31b68
Showing 140 changed files with 17,452 additions and 11,240 deletions.
4 changes: 4 additions & 0 deletions docs/content/_navigation.json
@@ -21,6 +21,10 @@
"title": "Project files",
"path": "/getting-started/project-file-reference"
},
{
"title": "Getting help",
"path": "/getting-started/getting-help"
},
{
"title": "Telemetry",
"path": "/getting-started/telemetry"
53 changes: 53 additions & 0 deletions docs/content/concepts/assets/asset-checks.mdx
@@ -158,6 +158,59 @@ defs = Definitions(

There are a variety of types supported via the <PyObject object="MetadataValue" /> class. You can view the metadata on the **Checks** tab of the **Asset details** page.
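
For example, a check can attach both a count and a rendered query preview alongside the pass/fail result. Here is a minimal sketch (the empty `null_rows` result and the query string are illustrative, not from the docs):

```python
from dagster import AssetCheckResult, MetadataValue, asset, asset_check


@asset
def orders():
    ...


@asset_check(asset=orders)
def orders_id_has_no_nulls():
    # Hypothetical result of a null-check query against your database.
    null_rows = []
    return AssetCheckResult(
        passed=len(null_rows) == 0,
        metadata={
            "num_null_rows": MetadataValue.int(len(null_rows)),
            "query": MetadataValue.md("`select * from orders where order_id is null`"),
        },
    )
```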

### Asset check factories

If you want to define many similar checks, you can use the factory pattern. Here's an example factory that accepts a list of SQL statements and turns them into asset checks.

```python file=/concepts/assets/asset_checks/factory.py
from typing import Mapping, Sequence

from dagster import AssetCheckResult, Definitions, asset, asset_check


@asset
def orders():
    ...


@asset
def items():
    ...


def make_check(check_blob: Mapping[str, str]):
    # Build each check in its own function so the closure captures this blob's
    # values, rather than late-binding to the loop variable in make_checks.
    @asset_check(name=check_blob["name"], asset=check_blob["asset"])
    def _check(context):
        db_connection = ...
        rows = db_connection.execute(check_blob["sql"])
        return AssetCheckResult(
            passed=len(rows) == 0, metadata={"num_rows": len(rows)}
        )

    return _check


def make_checks(check_blobs: Sequence[Mapping[str, str]]):
    return [make_check(check_blob) for check_blob in check_blobs]


check_blobs = [
    {
        "name": "orders_id_has_no_nulls",
        "asset": "orders",
        "sql": "select * from orders where order_id is null",
    },
    {
        "name": "items_id_has_no_nulls",
        "asset": "items",
        "sql": "select * from items where item_id is null",
    },
]

defs = Definitions(assets=[orders, items], asset_checks=make_checks(check_blobs))
```
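
The same pattern extends to other check parameters. Below is a sketch of a variant factory that also sets a per-check severity; the `make_check_with_severity` name and the optional `"severity"` blob key are illustrative, not part of the snippet above:

```python
from dagster import AssetCheckResult, AssetCheckSeverity, asset_check


def make_check_with_severity(check_blob):
    # "severity" is a hypothetical blob key; fall back to ERROR if absent.
    severity = AssetCheckSeverity(check_blob.get("severity", "ERROR"))

    @asset_check(name=check_blob["name"], asset=check_blob["asset"])
    def _check(context):
        db_connection = ...
        rows = db_connection.execute(check_blob["sql"])
        return AssetCheckResult(
            passed=len(rows) == 0,
            severity=severity,
            metadata={"num_rows": len(rows)},
        )

    return _check
```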

---

## Executing checks
80 changes: 80 additions & 0 deletions docs/content/getting-started/getting-help.mdx
@@ -0,0 +1,80 @@
---
title: Getting Help | Dagster
description: Have questions about how to use Dagster? Hit a bug? Have a feature request? This page includes tips on what to do.
---

# Getting Help

Have questions about how to use Dagster? Hit a bug? Have a feature request? The Dagster Labs team and the Dagster community make a best-effort attempt to respond on a few different platforms.

---

## Searching for answers

Search engines like Google generally do a good job of surfacing relevant Dagster docs, GitHub Discussions, and GitHub Issues, so we recommend starting there.

Unfortunately, ChatGPT is not currently a great resource for answering questions about Dagster, because the corpus it was trained on doesn’t include Dagster’s latest APIs.

---

## Bugs and feature requests

We use [GitHub Issues](https://github.com/dagster-io/dagster/issues) to track all bugs and feature requests from the community, and we welcome issues submitted by users.

If you find a GitHub issue that seems related to the issue you're experiencing but you're not sure, don't be shy about asking on that issue. We can redirect it to a different issue if necessary.

### Tips for filing issues

- For bugs, include the minimum-sized working code snippet that reproduces your issue (see the sketch after this list).
- If your GitHub issue includes a code snippet, you can add syntax highlighting by specifying the language at the top of the block:

```python
from dagster import ...
...
```
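
To illustrate the first tip: a minimal repro for an asset-related bug might define a single asset and materialize it in-process. This is a sketch, not a prescribed template; replace the body with the smallest logic that shows your failing behavior:

```python
from dagster import asset, materialize


@asset
def my_asset():
    # Replace with the smallest logic that reproduces the issue.
    return 1


result = materialize([my_asset])
assert result.success
```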

---

## Questions

If you're trying to find out the best way to use Dagster for your use case, there are a couple of places to get help.

### GitHub Discussions: the preferred place to ask questions

[GitHub Discussions](https://github.com/dagster-io/dagster/discussions) is the main Q&A site for Dagster. For questions whose answers might be useful to others (which is most questions), GitHub Discussions is the best place to ask, because answered discussions show up when others Google the same questions.

**Tip:** If your GitHub discussion includes a code snippet, add syntax highlighting by specifying the language at the top of the block:

```python
from dagster import ...

...
```

### Dagster Slack: the real-time gathering place for the community

We strongly encourage you to join [Dagster's Slack](https://dagster.io/slack), as it's the main real-time gathering place for the community. However, if you want a question or issue to be seen by the Dagster team, the best place to post it is on GitHub.

Check out the #faq-read-me-before-posting channel for more info.

### Asking digestible questions

The less time and effort it takes for someone to digest your question, the more likely it will get answered. The easiest questions to answer include enough information to be clear and specific, but don't require the reader to understand details that aren't relevant.

Below is an example of a well-phrased question. It's brief enough to be quickly digestible, but specific enough to be complete:

> Is it possible to set up Dagster to automatically update an entire downstream table every time that a specific partition of an upstream table is updated? We have a table that's partitioned by customer, and we have a specific analysis that we want to do on a specific customer.

Below is another example of a well-phrased question. It's the same as the question above, but also includes background context. As long as the background context doesn't obscure the core question, it's really helpful to include, because it allows the reader to understand the question more deeply and make broader suggestions:

> Is it possible to set up Dagster to automatically update an entire downstream table every time that a specific partition of an upstream table is updated? We have a table that's partitioned by customer, and we have a specific analysis that we want to do on a specific customer.
>
> The background here is that we've set up Fivetran to pull in our per-customer Salesforce tables into one big Snowflake table. We have about 500 customers, and their data usually gets pulled in daily, though it varies by customer. For about 20 of these customers, we have custom logic, implemented in SQL with dbt, that performs analyses that are specific to those customers. We want those analyses to run automatically when customer data is updated, so that our sales representatives can discover red flags when they check their Looker dashboards.

Below is an example of a question that doesn't provide enough detail for the reader to answer it:

> How do I use tables with specific upstream partitions?

Below is an example of a question that's difficult to digest. It requires the reader to understand an entire data pipeline to answer the specific question:

> We've set up Fivetran to pull in our per-customer Salesforce tables into one big Snowflake table. We have about 500 customers, and their data usually gets pulled in daily, though it varies by customer. For about 20 of these customers, we have custom logic, implemented in SQL with dbt, that performs analyses that are specific to those customers. We want those analyses to run automatically when customer data is updated, so that our sales representatives can discover red flags when they check their Looker dashboards. How can I get this working with Dagster?
46 changes: 46 additions & 0 deletions examples/docs_snippets/docs_snippets/concepts/assets/asset_checks/factory.py
@@ -0,0 +1,46 @@
from typing import Mapping, Sequence

from dagster import AssetCheckResult, Definitions, asset, asset_check


@asset
def orders():
    ...


@asset
def items():
    ...


def make_check(check_blob: Mapping[str, str]):
    # Build each check in its own function so the closure captures this blob's
    # values, rather than late-binding to the loop variable in make_checks.
    @asset_check(name=check_blob["name"], asset=check_blob["asset"])
    def _check(context):
        db_connection = ...
        rows = db_connection.execute(check_blob["sql"])
        return AssetCheckResult(
            passed=len(rows) == 0, metadata={"num_rows": len(rows)}
        )

    return _check


def make_checks(check_blobs: Sequence[Mapping[str, str]]):
    return [make_check(check_blob) for check_blob in check_blobs]


check_blobs = [
    {
        "name": "orders_id_has_no_nulls",
        "asset": "orders",
        "sql": "select * from orders where order_id is null",
    },
    {
        "name": "items_id_has_no_nulls",
        "asset": "items",
        "sql": "select * from items where item_id is null",
    },
]

defs = Definitions(assets=[orders, items], asset_checks=make_checks(check_blobs))
@@ -1,17 +1,34 @@
import pytest

from dagster._core.definitions.asset_check_spec import AssetCheckKey
from dagster._core.definitions.events import AssetKey
from docs_snippets.concepts.assets.asset_checks.asset_with_check import (
    defs as asset_with_check_defs,
)
from docs_snippets.concepts.assets.asset_checks.factory import check_blobs, make_checks
from docs_snippets.concepts.assets.asset_checks.metadata import defs as metadata_defs
from docs_snippets.concepts.assets.asset_checks.orders_check import defs as orders_defs
from docs_snippets.concepts.assets.asset_checks.severity import defs as severity_defs


@pytest.mark.parametrize(
-    "defs", [orders_defs, asset_with_check_defs, severity_defs, metadata_defs]
+    "defs",
+    [orders_defs, asset_with_check_defs, severity_defs, metadata_defs],
)
def test_execute(defs):
    job_def = defs.get_implicit_global_asset_job_def()
    result = job_def.execute_in_process()
    assert result.success


def test_factory():
    assert [c.spec.key for c in make_checks(check_blobs)] == [
        AssetCheckKey(
            AssetKey(["orders"]),
            "orders_id_has_no_nulls",
        ),
        AssetCheckKey(
            AssetKey(["items"]),
            "items_id_has_no_nulls",
        ),
    ]
2 changes: 1 addition & 1 deletion helm/dagster/templates/NOTES.txt
@@ -1,7 +1,7 @@
{{- $_ := include "dagster.backcompat" . | mustFromJson -}}
Launched. You can access the Dagster webserver/UI by running the following commands:

-  export DAGSTER_WEBSERVER_POD_NAME=$(kubectl get pods --namespace {{ .Release.Namespace }} -l "app.kubernetes.io/name={{ include "dagster.name" . }},app.kubernetes.io/instance={{ .Release.Name }},component=webserver" -o jsonpath="{.items[0].metadata.name}")
+  export DAGSTER_WEBSERVER_POD_NAME=$(kubectl get pods --namespace {{ .Release.Namespace }} -l "app.kubernetes.io/name={{ include "dagster.name" . }},app.kubernetes.io/instance={{ .Release.Name }},component={{ include "dagster.webserver.componentName" . }}" -o jsonpath="{.items[0].metadata.name}")
  echo "Visit http://127.0.0.1:8080 to open the Dagster UI"
  kubectl --namespace {{ .Release.Namespace }} port-forward $DAGSTER_WEBSERVER_POD_NAME 8080:{{ $_.Values.dagsterWebserver.service.port }}

2 changes: 1 addition & 1 deletion helm/dagster/values.yaml
@@ -1033,7 +1033,7 @@ dagsterDaemon:
    tag: ~
    pullPolicy: Always

-  heartbeatTolerance: 300
+  heartbeatTolerance: 1800

  runCoordinator:
    # Whether or not to enable the run queue (or some other custom run coordinator). See
@@ -1,7 +1,6 @@
import time

-import pendulum
import pytest
from dagster import DagsterInvariantViolationError
from dagster._core.test_utils import instance_for_test
from dagster._core.workspace.load_target import EmptyWorkspaceTarget
@@ -148,7 +147,7 @@ def run_loop_error(_, _ctx, _shutdown_event):
    time.sleep(0.5)


-def test_transient_heartbeat_failure(mocker):
+def test_transient_heartbeat_failure(mocker, caplog):
    with instance_for_test() as instance:
        mocker.patch(
            "dagster.daemon.controller.get_daemon_statuses",
@@ -168,11 +167,19 @@ def test_transient_heartbeat_failure(mocker):

        time.sleep(2 * heartbeat_tolerance_seconds)

-        with pytest.raises(
-            Exception,
-            match="Stopped dagster-daemon process due to thread heartbeat failure",
-        ):
-            controller.check_daemon_heartbeats()
+        assert not any(
+            "The following threads have not sent heartbeats in more than 5 seconds"
+            in str(record)
+            for record in caplog.records
+        )
+
+        controller.check_daemon_heartbeats()
+
+        assert any(
+            "The following threads have not sent heartbeats in more than 5 seconds"
+            in str(record)
+            for record in caplog.records
+        )


def test_error_daemon(monkeypatch):
4 changes: 3 additions & 1 deletion js_modules/dagster-ui/packages/app-oss/src/pages/index.tsx
@@ -39,6 +39,8 @@ if (process.env.NODE_ENV === 'development' && typeof window !== 'undefined') {
    }
    // eslint-disable-next-line @typescript-eslint/ban-ts-comment
    // @ts-ignore
-    return originalError(...args);
+    const err = originalError(...args);
+    Object.setPrototypeOf(err, window.Error.prototype);
+    return err;
  };
}
@@ -32,6 +32,5 @@ export const calculateMiddleTruncation = (
      end = mid - 1;
    }
  }
-
  return `${text.slice(0, end)}${text.slice(-end)}`;
};
@@ -347,8 +347,31 @@ async function _batchedQueryAssets(
    doNextFetch(pollRate);
  } catch (e) {
    console.error(e);
-    // Retry fetching in 5 seconds if theres a network error
-    setTimeout(doNextFetch, 5000);
+
+    if ((e as any)?.message?.includes('500')) {
+      // Mark these assets as fetched so that we don't retry them until after the
+      // poll interval, rather than retrying them immediately. A failed fetch is
+      // likely a timeout from a too-expensive query; an immediate retry is no more
+      // likely to succeed and would add load to the database.
+      const fetchedTime = Date.now();
+      assetKeys.forEach((key) => {
+        lastFetchedOrRequested[tokenForAssetKey(key)] = {
+          fetched: fetchedTime,
+        };
+      });
+    } else {
+      // If it's not a timeout from the backend, keep retrying instead of moving on.
+      assetKeys.forEach((key) => {
+        delete lastFetchedOrRequested[tokenForAssetKey(key)];
+      });
+    }
+
+    setTimeout(
+      () => {
+        doNextFetch(pollRate);
+      },
+      // If the poll rate is faster than 5 seconds, use that instead.
+      Math.min(pollRate, 5000),
+    );
  }
}
