Skip to content

Commit

Permalink
Speed up equality checks for build_dbt_asset_selection asset selectio…
Browse files Browse the repository at this point in the history
…ns (#25434)

Current implementation does a full equality check on the full manifest.
Scope it down to just the "metadata" part of the manifest, which is much
smaller and faster to compare.

## How I Tested These Changes
New tests
Speed scope importing code like this and see a marked improvement:
```
dbt_job_6 = define_asset_job(
    name="dbt_6",
    selection=build_dbt_asset_selection(dbt_assets=[dbt_project_assets]),
)


@sensor(job=dbt_job_6)
def my_sensor():
    pass
```

## Changelog
[dagster-dbt] Performance improvements when loading definitions using
`build_dbt_asset_selection`.
  • Loading branch information
gibsondan authored Oct 22, 2024
1 parent 3a4f8da commit ebee8eb
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -648,6 +648,15 @@ def to_serializable_asset_selection(self, asset_graph: BaseAssetGraph) -> "Asset
)
)

def __eq__(self, other):
if not isinstance(other, OperandListAssetSelection):
return False

num_operands = len(self.operands)
return len(other.operands) == num_operands and all(
self.operands[i] == other.operands[i] for i in range(num_operands)
)

def needs_parentheses_when_operand(self) -> bool:
return True

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,25 @@ class DbtManifestAssetSelection(AssetSelection, arbitrary_types_allowed=True):
dagster_dbt_translator: DagsterDbtTranslator
exclude: str

def __eq__(self, other):
if not isinstance(other, DbtManifestAssetSelection):
return False

self_metadata = self.manifest.get("metadata")
other_metadata = other.manifest.get("metadata")

if not self_metadata or not other_metadata:
return super().__eq__(other)

# Compare metadata only since it uniquely identifies the manifest and the
# full manifest dictionary can be large
return (
self_metadata == other_metadata
and self.select == other.select
and self.dagster_dbt_translator == other.dagster_dbt_translator
and self.exclude == other.exclude
)

@classmethod
def build(
cls,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
import copy
import os
from pathlib import Path
from typing import Any, Dict, Optional, Set
from typing import Any, Dict, Optional, Set, cast
from unittest import mock

import pytest
from dagster._core.definitions.asset_graph import AssetGraph
from dagster._core.definitions.asset_selection import AndAssetSelection
from dagster._core.definitions.events import AssetKey
from dagster_dbt import build_dbt_asset_selection
from dagster_dbt.asset_decorator import dbt_assets
from dagster_dbt.dbt_manifest_asset_selection import DbtManifestAssetSelection


@pytest.mark.parametrize(
Expand Down Expand Up @@ -240,3 +244,54 @@ def my_dbt_assets(): ...
selected_asset_keys = asset_selection.resolve(all_assets=asset_graph)

assert selected_asset_keys == expected_asset_keys


def test_dbt_asset_selection_equality(
test_jaffle_shop_manifest_path: Path, test_jaffle_shop_manifest: Dict[str, Any]
) -> None:
for manifest_param in [
test_jaffle_shop_manifest,
test_jaffle_shop_manifest_path,
os.fspath(test_jaffle_shop_manifest_path),
]:

@dbt_assets(manifest=manifest_param)
def my_dbt_assets(): ...

asset_selection = build_dbt_asset_selection([my_dbt_assets], dbt_select="fqn:*")

assert asset_selection == asset_selection

assert asset_selection != build_dbt_asset_selection(
[my_dbt_assets], dbt_select="new_select"
)

dbt_manifest_asset_selection = cast(AndAssetSelection, asset_selection).operands[0]

assert isinstance(dbt_manifest_asset_selection, DbtManifestAssetSelection)

altered_manifest = copy.deepcopy(dbt_manifest_asset_selection.manifest)
altered_manifest["metadata"]["project_id"] = 12345

assert dbt_manifest_asset_selection != dbt_manifest_asset_selection.model_copy(
update={"manifest": altered_manifest}
)

assert dbt_manifest_asset_selection != dbt_manifest_asset_selection.model_copy(
update={"select": "other_select"}
)

assert dbt_manifest_asset_selection != dbt_manifest_asset_selection.model_copy(
update={"dagster_dbt_translator": mock.MagicMock()}
)

assert dbt_manifest_asset_selection != dbt_manifest_asset_selection.model_copy(
update={"exclude": "other_exclude"}
)

# changing non-metadata fields does not affect equality
altered_nodes_manifest = dict(copy.deepcopy(dbt_manifest_asset_selection.manifest))
altered_nodes_manifest["nodes"] = []
assert dbt_manifest_asset_selection == dbt_manifest_asset_selection.model_copy(
update={"manifest": altered_nodes_manifest}
)

0 comments on commit ebee8eb

Please sign in to comment.