[dagster-sigma] Implement materializing workbooks (#25433)
## Summary

Introduces an asset definitions builder that enables materializing Sigma
workbooks. When the generated asset is materialized, it runs each scheduled
materialization for the provided workbook.
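
A sketch of the intended wiring (assuming a `SigmaOrganization` resource object named `resource`, bound to the resource key `"sigma"`; the full snippet appears at the end of this diff):

```python
from dagster_sigma import build_materialize_workbook_assets_definition, load_sigma_asset_specs

# Wrap each workbook spec that has materialization schedules in a
# materializable asset definition; leave other specs as plain external assets.
sigma_assets = [
    build_materialize_workbook_assets_definition("sigma", spec)
    if spec.metadata.get("dagster_sigma/materialization_schedules")
    else spec
    for spec in load_sigma_asset_specs(resource)
]
```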


## How I Tested These Changes

Unit tests; also tested against a live Sigma instance.
benpankow authored Dec 13, 2024
1 parent 813660f commit 72c7f5e
Showing 8 changed files with 296 additions and 7 deletions.
@@ -1,5 +1,8 @@
from dagster._core.libraries import DagsterLibraryRegistry

from dagster_sigma.assets import (
    build_materialize_workbook_assets_definition as build_materialize_workbook_assets_definition,
)
from dagster_sigma.resource import (
    SigmaBaseUrl as SigmaBaseUrl,
    SigmaFilter as SigmaFilter,
38 changes: 38 additions & 0 deletions python_modules/libraries/dagster-sigma/dagster_sigma/assets.py
@@ -0,0 +1,38 @@
from typing import cast

from dagster import AssetExecutionContext, AssetsDefinition, AssetSpec, multi_asset
from dagster._annotations import experimental


@experimental
def build_materialize_workbook_assets_definition(
    resource_key: str,
    spec: AssetSpec,
) -> AssetsDefinition:
    """Returns an AssetsDefinition which will, when materialized,
    run all materialization schedules for the targeted Sigma workbook.

    Note that this will not update portions of a workbook which are not
    assigned to a materialization schedule.

    For more information, see
    https://help.sigmacomputing.com/docs/materialization#create-materializations-in-workbooks

    Args:
        resource_key (str): The resource key to use for the Sigma resource.
        spec (AssetSpec): The asset spec of the Sigma workbook.

    Returns:
        AssetsDefinition: The AssetsDefinition which rebuilds a Sigma workbook.
    """
    from dagster_sigma import SigmaOrganization

    @multi_asset(
        name=f"sigma_materialize_{spec.key.to_python_identifier()}",
        specs=[spec],
        required_resource_keys={resource_key},
    )
    def asset_fn(context: AssetExecutionContext):
        sigma = cast(SigmaOrganization, getattr(context.resources, resource_key))
        yield from sigma.run_materializations_for_workbook(spec)

    return asset_fn
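
Note the deferred `SigmaOrganization` import inside the builder, presumably to avoid a circular import with `resource.py`. Calling the builder directly looks like this (sketch; `workbook_spec` is assumed to be a workbook `AssetSpec` produced by `load_sigma_asset_specs`):

```python
workbook_asset = build_materialize_workbook_assets_definition(
    resource_key="sigma",  # must match the key the SigmaOrganization resource is bound under
    spec=workbook_spec,
)
```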
127 changes: 127 additions & 0 deletions python_modules/libraries/dagster-sigma/dagster_sigma/resource.py
@@ -1,6 +1,8 @@
import asyncio
import contextlib
import enum
import os
import time
import urllib.parse
import warnings
from collections import defaultdict
@@ -30,6 +32,7 @@
from dagster._core.definitions.asset_spec import AssetSpec
from dagster._core.definitions.definitions_class import Definitions
from dagster._core.definitions.definitions_load_context import StateBackedDefinitionsLoader
from dagster._core.definitions.events import AssetMaterialization
from dagster._core.definitions.repository_definition.repository_definition import RepositoryLoadData
from dagster._record import IHaveNew, record_custom
from dagster._serdes.serdes import deserialize_value
@@ -45,6 +48,7 @@
    SigmaOrganizationData,
    SigmaTable,
    SigmaWorkbook,
    SigmaWorkbookMetadataSet,
    _inode_from_url,
)

@@ -53,6 +57,12 @@
logger = get_dagster_logger("dagster_sigma")


class SigmaMaterializationStatus(str, enum.Enum):
    PENDING = "pending"
    BUILDING = "building"
    READY = "ready"


@record_custom
class SigmaFilter(IHaveNew):
"""Filters the set of Sigma objects to fetch.
@@ -162,6 +172,30 @@ async def _fetch_json_async(
        response.raise_for_status()
        return await response.json()

    def _fetch_json(
        self,
        endpoint: str,
        method: str = "GET",
        query_params: Optional[Dict[str, Any]] = None,
        json: Optional[Dict[str, Any]] = None,
    ) -> Dict[str, Any]:
        url = f"{self.base_url}/v2/{endpoint}"
        if query_params:
            url = f"{url}?{urllib.parse.urlencode(query_params)}"

        response = requests.request(
            method=method,
            url=url,
            headers={
                "Accept": "application/json",
                "Authorization": f"Bearer {self.api_token}",
                **SIGMA_PARTNER_ID_TAG,
            },
            json=json,
        )
        response.raise_for_status()
        return response.json()
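
    # _fetch_json above is the synchronous counterpart to _fetch_json_async,
    # used by the materialization polling below, which runs inside synchronous
    # asset execution. Example: _fetch_json("workbooks", query_params={"limit": 50})
    # issues GET {base_url}/v2/workbooks?limit=50.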

    async def _fetch_json_async_paginated_entries(
        self, endpoint: str, query_params: Optional[Dict[str, Any]] = None, limit: int = 1000
    ) -> List[Dict[str, Any]]:
@@ -255,6 +289,86 @@ def try_except_http_warn(self, should_catch: bool, msg: str) -> Iterator[None]:
            else:
                raise

    def _begin_workbook_materialization(self, workbook_id: str, sheet_id: str) -> str:
        output = self._fetch_json(
            f"workbooks/{workbook_id}/materializations",
            method="POST",
            json={"sheetId": sheet_id},
        )
        return output["materializationId"]

    def _fetch_materialization_status(
        self, workbook_id: str, materialization_id: str
    ) -> Dict[str, Any]:
        return self._fetch_json(f"workbooks/{workbook_id}/materializations/{materialization_id}")

    def _run_materializations_for_workbook(
        self, workbook_id: str, sheet_ids: AbstractSet[str]
    ) -> None:
        materialization_id_to_sheet = dict(
            zip(
                [
                    self._begin_workbook_materialization(workbook_id, sheet_id)
                    for sheet_id in sheet_ids
                ],
                sheet_ids,
            )
        )
        remaining_materializations = set(materialization_id_to_sheet.keys())

        successful_sheets = set()
        failed_sheets = set()

        while remaining_materializations:
            materialization_statuses = [
                self._fetch_materialization_status(workbook_id, materialization_id)
                for materialization_id in remaining_materializations
            ]
            for status in materialization_statuses:
                if status["status"] not in (
                    SigmaMaterializationStatus.PENDING,
                    SigmaMaterializationStatus.BUILDING,
                ):
                    remaining_materializations.remove(status["materializationId"])
                    if status["status"] == SigmaMaterializationStatus.READY:
                        successful_sheets.add(
                            materialization_id_to_sheet[status["materializationId"]]
                        )
                    else:
                        failed_sheets.add(
                            materialization_id_to_sheet[status["materializationId"]]
                        )

            time.sleep(5)

        if failed_sheets:
            if successful_sheets:
                raise Exception(
                    f"Materializations for sheets {', '.join(failed_sheets)} failed for workbook {workbook_id}"
                    f", materializations for sheets {', '.join(successful_sheets)} succeeded."
                )
            else:
                raise Exception(
                    f"Materializations for sheets {', '.join(failed_sheets)} failed for workbook {workbook_id}"
                )
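
    # Polling above runs at a fixed 5-second interval with no timeout; any
    # terminal status other than "ready" counts the sheet as failed, and a
    # single exception summarizes failed (and any succeeded) sheets.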

    def run_materializations_for_workbook(
        self, workbook_spec: AssetSpec
    ) -> Iterator[AssetMaterialization]:
        """Runs all scheduled materializations for a workbook.

        See https://help.sigmacomputing.com/docs/materialization#create-materializations-in-workbooks
        for more information.
        """
        metadata = SigmaWorkbookMetadataSet.extract(workbook_spec.metadata)
        workbook_id = metadata.workbook_id
        materialization_schedules = check.is_list(
            check.not_none(metadata.materialization_schedules).value
        )

        materialization_sheets = {schedule["sheetId"] for schedule in materialization_schedules}

        self._run_materializations_for_workbook(workbook_id, materialization_sheets)
        yield AssetMaterialization(asset_key=workbook_spec.key)

    @cached_method
    async def _fetch_dataset_upstreams_by_inode(
        self, sigma_filter: SigmaFilter
@@ -386,6 +500,14 @@ async def build_member_id_to_email_mapping(self) -> Mapping[str, str]:
        members = (await self._fetch_json_async("members", query_params={"limit": 500}))["entries"]
        return {member["memberId"]: member["email"] for member in members}

    @cached_method
    async def _fetch_materialization_schedules_for_workbook(
        self, workbook_id: str
    ) -> List[Dict[str, Any]]:
        return await self._fetch_json_async_paginated_entries(
            f"workbooks/{workbook_id}/materialization-schedules"
        )
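
    # Schedules are fetched once per workbook (note @cached_method) while specs
    # are loaded, then embedded into the workbook's asset metadata so the
    # generated asset knows which sheets to materialize at run time.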

    async def load_workbook_data(self, raw_workbook_data: Dict[str, Any]) -> SigmaWorkbook:
        dataset_deps = set()
        direct_table_deps = set()
@@ -430,12 +552,17 @@ async def safe_fetch_lineage_for_element(
if item.get("type") == "table":
direct_table_deps.add(item["nodeId"])

        materialization_schedules = await self._fetch_materialization_schedules_for_workbook(
            raw_workbook_data["workbookId"]
        )

        return SigmaWorkbook(
            properties=raw_workbook_data,
            datasets=dataset_deps,
            direct_table_deps=direct_table_deps,
            owner_email=None,
            lineage=lineages,
            materialization_schedules=materialization_schedules,
        )

    @cached_method
44 changes: 37 additions & 7 deletions python_modules/libraries/dagster-sigma/dagster_sigma/translator.py
@@ -3,7 +3,12 @@

from dagster import AssetKey, AssetSpec, MetadataValue, TableSchema
from dagster._annotations import deprecated
from dagster._core.definitions.metadata.metadata_set import TableMetadataSet
from dagster._core.definitions.metadata.metadata_set import NamespacedMetadataSet, TableMetadataSet
from dagster._core.definitions.metadata.metadata_value import (
    JsonMetadataValue,
    TimestampMetadataValue,
    UrlMetadataValue,
)
from dagster._core.definitions.metadata.table import TableColumn
from dagster._record import record
from dagster._serdes.serdes import whitelist_for_serdes
@@ -26,6 +31,20 @@ def _inode_from_url(url: str) -> str:
    return f'inode-{url.split("/")[-1]}'


class SigmaWorkbookMetadataSet(NamespacedMetadataSet):
    web_url: Optional[UrlMetadataValue]
    version: Optional[int]
    created_at: Optional[TimestampMetadataValue]
    properties: Optional[JsonMetadataValue]
    lineage: Optional[JsonMetadataValue]
    materialization_schedules: Optional[JsonMetadataValue] = None
    workbook_id: str

    @classmethod
    def namespace(cls) -> str:
        return "dagster_sigma"


@whitelist_for_serdes
@record
class SigmaWorkbook:
@@ -40,6 +59,7 @@ class SigmaWorkbook:
    datasets: AbstractSet[str]
    direct_table_deps: AbstractSet[str]
    owner_email: Optional[str]
    materialization_schedules: Optional[List[Dict[str, Any]]]


@whitelist_for_serdes
@@ -110,13 +130,23 @@ def get_asset_spec(self, data: Union[SigmaDataset, SigmaWorkbook]) -> AssetSpec:
"""Get the AssetSpec for a Sigma object, such as a workbook or dataset."""
if isinstance(data, SigmaWorkbook):
metadata = {
"dagster_sigma/web_url": MetadataValue.url(data.properties["url"]),
"dagster_sigma/version": data.properties["latestVersion"],
"dagster_sigma/created_at": MetadataValue.timestamp(
isoparse(data.properties["createdAt"])
**SigmaWorkbookMetadataSet(
web_url=MetadataValue.url(data.properties["url"]),
version=data.properties["latestVersion"],
created_at=MetadataValue.timestamp(isoparse(data.properties["createdAt"])),
properties=MetadataValue.json(data.properties),
lineage=MetadataValue.json(data.lineage),
workbook_id=data.properties["workbookId"],
**(
{
"materialization_schedules": MetadataValue.json(
data.materialization_schedules
)
}
if data.materialization_schedules
else {}
),
),
"dagster_sigma/properties": MetadataValue.json(data.properties),
"dagster_sigma/lineage": MetadataValue.json(data.lineage),
}
datasets = [self._context.get_datasets_by_inode()[inode] for inode in data.datasets]
tables = [
@@ -105,6 +105,29 @@ def lineage_warn_fixture(responses: aioresponses) -> None:
    )


@pytest.fixture(name="sigma_materialization")
def sigma_materialization_fixture(responses: aioresponses) -> None:
# Trigger materialization, check status, check status again
request_responses.add(
method=request_responses.POST,
url=f"{SigmaBaseUrl.AWS_US.value}/v2/workbooks/4ea60fe9-f487-43b0-aa7a-3ef43ca3a90e/materializations",
status=200,
body=json.dumps({"materializationId": "foobar"}),
)
request_responses.add(
method=request_responses.GET,
url=f"{SigmaBaseUrl.AWS_US.value}/v2/workbooks/4ea60fe9-f487-43b0-aa7a-3ef43ca3a90e/materializations/foobar",
status=200,
body=json.dumps({"materializationId": "foobar", "status": "pending"}),
)
request_responses.add(
method=request_responses.GET,
url=f"{SigmaBaseUrl.AWS_US.value}/v2/workbooks/4ea60fe9-f487-43b0-aa7a-3ef43ca3a90e/materializations/foobar",
status=200,
body=json.dumps({"materializationId": "foobar", "status": "ready"}),
)
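
# The two GET stubs above are returned in registration order on successive
# polls, so the materialization reads as "pending" on the first status check
# and "ready" on the second, exercising the polling loop end to end.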


@pytest.fixture(name="sigma_sample_data")
def sigma_sample_data_fixture(responses: aioresponses) -> None:
    # Single workbook, dataset
@@ -133,6 +156,12 @@ def sigma_sample_data_fixture(responses: aioresponses) -> None:
body=json.dumps(_build_paginated_response([{"pageId": "qwMyyHBCuC", "name": "Page 1"}])),
status=200,
)
    responses.add(
        method=hdrs.METH_GET,
        url="https://aws-api.sigmacomputing.com/v2/workbooks/4ea60fe9-f487-43b0-aa7a-3ef43ca3a90e/materialization-schedules?limit=1000",
        body=json.dumps(_build_paginated_response([{"sheetId": "qwMyyHBCuC"}])),
        status=200,
    )
    elements = [
        {
            "elementId": "_MuHPbskp0",
@@ -0,0 +1,38 @@
from dagster import EnvVar, asset, define_asset_job
from dagster._core.definitions.definitions_class import Definitions
from dagster._utils.env import environ
from dagster_sigma import (
    SigmaBaseUrl,
    SigmaOrganization,
    build_materialize_workbook_assets_definition,
    load_sigma_asset_specs,
)

fake_client_id = "fake_client_id"
fake_client_secret = "fake_client_secret"

with environ({"SIGMA_CLIENT_ID": fake_client_id, "SIGMA_CLIENT_SECRET": fake_client_secret}):
    fake_token = "fake_token"
    resource = SigmaOrganization(
        base_url=SigmaBaseUrl.AWS_US,
        client_id=EnvVar("SIGMA_CLIENT_ID"),
        client_secret=EnvVar("SIGMA_CLIENT_SECRET"),
    )

    @asset
    def my_materializable_asset():
        pass

    sigma_specs = load_sigma_asset_specs(resource)
    sigma_assets = [
        build_materialize_workbook_assets_definition("sigma", spec)
        if spec.metadata.get("dagster_sigma/materialization_schedules")
        else spec
        for spec in sigma_specs
    ]

    defs = Definitions(
        assets=[my_materializable_asset, *sigma_assets],
        jobs=[define_asset_job("all_asset_job")],
        resources={"sigma": resource},
    )
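
Workbooks without materialization schedules stay plain `AssetSpec`s; only specs whose metadata carries `dagster_sigma/materialization_schedules` are wrapped. A hypothetical in-process launch of the job above (with real credentials this would trigger live Sigma materializations):

```python
result = defs.get_job_def("all_asset_job").execute_in_process()
assert result.success
```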