@experimental
def build_materialize_workbook_assets_definition(
    resource_key: str,
    spec: AssetSpec,
) -> AssetsDefinition:
    """Build an executable asset that triggers Sigma materializations for one workbook.

    Materializing the returned definition runs every materialization schedule
    attached to the targeted Sigma workbook. Parts of the workbook that are not
    assigned to a materialization schedule are not refreshed.

    For more information, see
    https://help.sigmacomputing.com/docs/materialization#create-materializations-in-workbooks

    Args:
        resource_key (str): Key under which the Sigma resource is registered.
        spec (AssetSpec): Asset spec describing the Sigma workbook.

    Returns:
        AssetsDefinition: A single-spec multi-asset which rebuilds the workbook.
    """
    # Imported lazily: the package __init__ imports this module, so a
    # module-level import would presumably be circular.
    from dagster_sigma import SigmaOrganization

    op_name = f"sigma_materialize_{spec.key.to_python_identifier()}"

    @multi_asset(name=op_name, specs=[spec], required_resource_keys={resource_key})
    def asset_fn(context: AssetExecutionContext):
        # The resource is looked up dynamically because its key is caller-supplied.
        sigma_resource = getattr(context.resources, resource_key)
        organization = cast(SigmaOrganization, sigma_resource)
        yield from organization.run_materializations_for_workbook(spec)

    return asset_fn
def _fetch_json(
    self,
    endpoint: str,
    method: str = "GET",
    query_params: Optional[Dict[str, Any]] = None,
    json: Optional[Dict[str, Any]] = None,
    timeout: float = 60.0,
) -> Dict[str, Any]:
    """Synchronously call a ``/v2/`` endpoint of the Sigma API and decode the JSON reply.

    Args:
        endpoint (str): Path below ``{base_url}/v2/``, without a leading slash.
        method (str): HTTP method to use.
        query_params (Optional[Dict[str, Any]]): Values url-encoded into the query string.
        json (Optional[Dict[str, Any]]): Payload sent as the JSON request body.
        timeout (float): Seconds to wait for the server before giving up.

    Returns:
        Dict[str, Any]: The decoded JSON response body.

    Raises:
        requests.HTTPError: If the response carries an error status code.
    """
    url = f"{self.base_url}/v2/{endpoint}"
    if query_params:
        url = f"{url}?{urllib.parse.urlencode(query_params)}"

    response = requests.request(
        method=method,
        url=url,
        headers={
            "Accept": "application/json",
            "Authorization": f"Bearer {self.api_token}",
            **SIGMA_PARTNER_ID_TAG,
        },
        json=json,
        # requests applies no timeout by default; without one a stalled
        # connection would block the calling run indefinitely.
        timeout=timeout,
    )
    response.raise_for_status()
    return response.json()


def _begin_workbook_materialization(self, workbook_id: str, sheet_id: str) -> str:
    """Start a materialization of one sheet and return the new materialization id."""
    output = self._fetch_json(
        f"workbooks/{workbook_id}/materializations",
        method="POST",
        json={"sheetId": sheet_id},
    )
    return output["materializationId"]


def _fetch_materialization_status(
    self, workbook_id: str, materialization_id: str
) -> Dict[str, Any]:
    """Fetch the status payload of a single materialization.

    The poller reads the ``"status"`` key of the returned mapping.
    """
    return self._fetch_json(f"workbooks/{workbook_id}/materializations/{materialization_id}")


def _run_materializations_for_workbook(
    self,
    workbook_id: str,
    sheet_ids: AbstractSet[str],
    poll_interval: float = 5.0,
) -> None:
    """Start a materialization for every sheet and block until all of them finish.

    Args:
        workbook_id (str): Id of the workbook owning the sheets.
        sheet_ids (AbstractSet[str]): Sheets to materialize; an empty set is a no-op.
        poll_interval (float): Seconds to wait between polling rounds.

    Raises:
        Exception: If any materialization ends in a state other than ``ready``.
    """
    sheet_by_materialization_id = {
        self._begin_workbook_materialization(workbook_id, sheet_id): sheet_id
        for sheet_id in sheet_ids
    }
    remaining_materializations = set(sheet_by_materialization_id)

    successful_sheets = set()
    failed_sheets = set()
    in_progress = (SigmaMaterializationStatus.PENDING, SigmaMaterializationStatus.BUILDING)

    while remaining_materializations:
        # Iterate over a snapshot because finished ids are removed inside the loop.
        for materialization_id in list(remaining_materializations):
            state = self._fetch_materialization_status(workbook_id, materialization_id)["status"]
            if state in in_progress:
                continue

            # Key on the id we requested instead of trusting the response to echo it.
            remaining_materializations.discard(materialization_id)
            sheet_id = sheet_by_materialization_id[materialization_id]
            if state == SigmaMaterializationStatus.READY:
                successful_sheets.add(sheet_id)
            else:
                failed_sheets.add(sheet_id)

        # Only wait when there is something left to poll; sleeping after the
        # final round would just delay the caller.
        if remaining_materializations:
            time.sleep(poll_interval)

    if failed_sheets:
        # Sort so the message is stable regardless of set iteration order.
        message = (
            f"Materializations for sheets {', '.join(sorted(failed_sheets))}"
            f" failed for workbook {workbook_id}"
        )
        if successful_sheets:
            message += (
                f", materializations for sheets {', '.join(sorted(successful_sheets))} succeeded."
            )
        raise Exception(message)


def run_materializations_for_workbook(
    self, workbook_spec: AssetSpec
) -> Iterator[AssetMaterialization]:
    """Runs all scheduled materializations for a workbook.

    The workbook id and its materialization schedules are read from the
    ``SigmaWorkbookMetadataSet`` metadata on the spec. One materialization is
    started per distinct ``sheetId`` found in the schedules.

    See https://help.sigmacomputing.com/docs/materialization#create-materializations-in-workbooks
    for more information.

    Args:
        workbook_spec (AssetSpec): Spec of the Sigma workbook to rebuild.

    Yields:
        AssetMaterialization: A single event for the workbook's asset key, emitted
            once every materialization has succeeded.
    """
    metadata = SigmaWorkbookMetadataSet.extract(workbook_spec.metadata)
    workbook_id = metadata.workbook_id
    materialization_schedules = check.is_list(
        check.not_none(metadata.materialization_schedules).value
    )

    # Several schedules may target the same sheet; a set materializes each once.
    materialization_sheets = {schedule["sheetId"] for schedule in materialization_schedules}

    self._run_materializations_for_workbook(workbook_id, materialization_sheets)
    yield AssetMaterialization(asset_key=workbook_spec.key)


@cached_method
async def _fetch_materialization_schedules_for_workbook(
    self, workbook_id: str
) -> List[Dict[str, Any]]:
    """Fetch every materialization schedule entry configured for a workbook."""
    return await self._fetch_json_async_paginated_entries(
        f"workbooks/{workbook_id}/materialization-schedules"
    )
class SigmaWorkbookMetadataSet(NamespacedMetadataSet):
    """Typed accessor for the workbook metadata stored under the ``dagster_sigma`` namespace.

    The translator builds an instance of this set when creating a workbook's
    AssetSpec, and the resource extracts it again from a spec's metadata to
    find the workbook id and its materialization schedules.
    """

    # Link to the workbook, built from the workbook's "url" property.
    web_url: Optional[UrlMetadataValue]
    # The workbook's "latestVersion" property.
    version: Optional[int]
    # Parsed from the workbook's ISO-formatted "createdAt" property.
    created_at: Optional[TimestampMetadataValue]
    # Raw workbook properties as returned by the API.
    properties: Optional[JsonMetadataValue]
    # Lineage information collected for the workbook.
    lineage: Optional[JsonMetadataValue]
    # Only populated when the workbook has at least one materialization schedule.
    materialization_schedules: Optional[JsonMetadataValue] = None
    # The workbook's "workbookId" property.
    workbook_id: str

    @classmethod
    def namespace(cls) -> str:
        """Return the prefix under which these metadata keys are stored."""
        return "dagster_sigma"
@pytest.fixture(name="sigma_materialization")
def sigma_materialization_fixture(responses: aioresponses) -> None:
    """Register the mocked HTTP calls for one workbook materialization.

    Mocks are added in the order the client is expected to issue the calls:
    trigger the materialization, check its status (still pending), then check
    the status again (ready).
    """
    materializations_url = (
        f"{SigmaBaseUrl.AWS_US.value}/v2/workbooks/"
        "4ea60fe9-f487-43b0-aa7a-3ef43ca3a90e/materializations"
    )

    # Trigger materialization
    request_responses.add(
        method=request_responses.POST,
        url=materializations_url,
        status=200,
        body=json.dumps({"materializationId": "foobar"}),
    )

    # Check status, check status again
    for reported_status in ("pending", "ready"):
        request_responses.add(
            method=request_responses.GET,
            url=f"{materializations_url}/foobar",
            status=200,
            body=json.dumps({"materializationId": "foobar", "status": reported_status}),
        )
@responses.activate
def test_materialize_workbook(
    sigma_auth_token: str, sigma_sample_data: None, sigma_materialization: None
) -> None:
    """Load the sample definitions and check that the Sigma workbook asset materializes."""
    definitions_file = Path(__file__).parent / "materialize_workbook.py"

    with instance_for_test() as _instance:
        # first, we resolve the repository to generate our cached metadata
        pointer = CodePointer.from_python_file(str(definitions_file), "defs", None)
        repository_def = initialize_repository_def_from_pointer(pointer)

        workbook_key = AssetKey(["Sample_Workbook"])
        workbook_asset = repository_def.assets_defs_by_key[workbook_key]
        assert workbook_asset.is_materializable

        # materialize the workbook
        fake_credentials = {"SIGMA_CLIENT_ID": "fake", "SIGMA_CLIENT_SECRET": "fake"}
        with environ(fake_credentials):
            result = materialize([workbook_asset], raise_on_error=False)
            assert result.success
b/python_modules/libraries/dagster-sigma/dagster_sigma_tests/test_translator.py index d847ce0074c9b..a32aa3977518e 100644 --- a/python_modules/libraries/dagster-sigma/dagster_sigma_tests/test_translator.py +++ b/python_modules/libraries/dagster-sigma/dagster_sigma_tests/test_translator.py @@ -28,6 +28,7 @@ def test_workbook_translation() -> None: owner_email="ben@dagsterlabs.com", direct_table_deps={SAMPLE_TABLE_INODE}, lineage=[], + materialization_schedules=None, ) sample_dataset = SigmaDataset(properties=SAMPLE_DATASET_DATA, columns=set(), inputs=set())