From 538a2851ac31f63f67ab5bdc383857b64f7d3377 Mon Sep 17 00:00:00 2001 From: Ben Pankow Date: Wed, 15 May 2024 14:14:00 -0700 Subject: [PATCH] [4/n] add python api for replacing local file references with source control links (#21675) ## Summary Adds the `link_to_source_control` utility fn which converts all local source code reference metadata in the passed assets to references to source control. These local paths are mapped to the path in source control using a user-passed local git root. The path from the git root to the file locally is used as the filepath within source control. For example: ```python @asset def my_asset() -> pd.DataFrame: ... @asset def my_other_asset() -> pd.DataFrame: ... defs = Definitions( assets=link_to_source_control( with_source_code_references([my_asset, my_other_asset]), source_control_url="https://github.com/dagster-io/dagster", source_control_branch="master", repository_root_absolute_path=file_relative_path(__file__, "../../"), ) ) ``` A stacked PR will introduce a utility method that will wrap both `link_to_source_control` and `with_source_code_references` and will decide whether to link to source control based on whether the definitions are being loaded in a cloud context. ## Test Plan Unit tests. --- .../dagster_graphql/implementation/events.py | 2 + python_modules/dagster/dagster/__init__.py | 1 - .../_core/definitions/metadata/__init__.py | 2 + .../_core/definitions/metadata/source_code.py | 159 +++++++++++++++--- .../asset_package/__init__.py | 2 +- .../test_asset_defs_source_metadata.py | 100 ++++++++--- 6 files changed, 221 insertions(+), 45 deletions(-) diff --git a/python_modules/dagster-graphql/dagster_graphql/implementation/events.py b/python_modules/dagster-graphql/dagster_graphql/implementation/events.py index baef6c8e788a5..f30c5aab7b9df 100644 --- a/python_modules/dagster-graphql/dagster_graphql/implementation/events.py +++ b/python_modules/dagster-graphql/dagster_graphql/implementation/events.py @@ -28,6 +28,7 @@ MetadataValue, TableColumnLineageMetadataValue, ) +from dagster._core.definitions.metadata.source_code import LocalFileCodeReference from dagster._core.events import ( DagsterEventType, HandledOutputData, @@ -165,6 +166,7 @@ def iterate_metadata_entries(metadata: Mapping[str, MetadataValue]) -> Iterator[ label=reference.label, ) for reference in value.code_references + if isinstance(reference, LocalFileCodeReference) ], ) elif isinstance(value, TableMetadataValue): diff --git a/python_modules/dagster/dagster/__init__.py b/python_modules/dagster/dagster/__init__.py index a92508ced6792..815419683e08d 100644 --- a/python_modules/dagster/dagster/__init__.py +++ b/python_modules/dagster/dagster/__init__.py @@ -285,7 +285,6 @@ TextMetadataValue as TextMetadataValue, TimestampMetadataValue as TimestampMetadataValue, UrlMetadataValue as UrlMetadataValue, - with_source_code_references as with_source_code_references, ) from dagster._core.definitions.metadata.table import ( TableColumn as TableColumn, diff --git a/python_modules/dagster/dagster/_core/definitions/metadata/__init__.py b/python_modules/dagster/dagster/_core/definitions/metadata/__init__.py index f90840859142b..dbbbe4e69f01e 100644 --- a/python_modules/dagster/dagster/_core/definitions/metadata/__init__.py +++ b/python_modules/dagster/dagster/_core/definitions/metadata/__init__.py @@ -62,6 +62,8 @@ CodeReferencesMetadataSet as CodeReferencesMetadataSet, CodeReferencesMetadataValue as CodeReferencesMetadataValue, LocalFileCodeReference as LocalFileCodeReference, + UrlCodeReference as UrlCodeReference, + link_to_source_control as link_to_source_control, with_source_code_references as with_source_code_references, ) from .table import ( # re-exported diff --git a/python_modules/dagster/dagster/_core/definitions/metadata/source_code.py b/python_modules/dagster/dagster/_core/definitions/metadata/source_code.py index 41118b4284ee9..b9a2cba4547fa 100644 --- a/python_modules/dagster/dagster/_core/definitions/metadata/source_code.py +++ b/python_modules/dagster/dagster/_core/definitions/metadata/source_code.py @@ -1,5 +1,6 @@ import inspect import os +from pathlib import Path from typing import ( TYPE_CHECKING, Any, @@ -10,8 +11,6 @@ Union, ) -import pydantic - import dagster._check as check from dagster._annotations import experimental from dagster._model import DagsterModel @@ -40,6 +39,17 @@ class LocalFileCodeReference(DagsterModel): label: Optional[str] = None +@experimental +@whitelist_for_serdes +class UrlCodeReference(DagsterModel): + """Represents a source location which points at a URL, for example + in source control. + """ + + url: str + label: Optional[str] = None + + @experimental @whitelist_for_serdes class CodeReferencesMetadataValue(DagsterModel, MetadataValue["CodeReferencesMetadataValue"]): @@ -48,19 +58,19 @@ class CodeReferencesMetadataValue(DagsterModel, MetadataValue["CodeReferencesMet asset is defined. Attributes: - sources (List[LocalFileCodeReference]): + sources (List[Union[LocalFileCodeReference, SourceControlCodeReference]]): A list of code references for the asset, such as file locations or references to source control. """ - code_references: List[LocalFileCodeReference] + code_references: List[Union[LocalFileCodeReference, UrlCodeReference]] @property def value(self) -> "CodeReferencesMetadataValue": return self -def source_path_from_fn(fn: Callable[..., Any]) -> Optional[LocalFileCodeReference]: +def local_source_path_from_fn(fn: Callable[..., Any]) -> Optional[LocalFileCodeReference]: cwd = os.getcwd() origin_file = os.path.abspath(os.path.join(cwd, inspect.getsourcefile(fn))) # type: ignore @@ -75,7 +85,7 @@ class CodeReferencesMetadataSet(NamespacedMetadataSet): source code for the asset can be found. """ - code_references: CodeReferencesMetadataValue + code_references: Optional[CodeReferencesMetadataValue] = None @classmethod def namespace(cls) -> str: @@ -101,24 +111,25 @@ def _with_code_source_single_definition( if isinstance(assets_def.op.compute_fn, DecoratedOpFunction) else assets_def.op.compute_fn ) - source_path = source_path_from_fn(base_fn) + source_path = local_source_path_from_fn(base_fn) if source_path: sources = [source_path] for key in assets_def.keys: - # defer to any existing metadata - sources_for_asset = [*sources] - try: - existing_source_code_metadata = CodeReferencesMetadataSet.extract( - metadata_by_key.get(key, {}) - ) - sources_for_asset = [ - *existing_source_code_metadata.code_references.code_references, - *sources, - ] - except pydantic.ValidationError: - pass + # merge with any existing metadata + existing_source_code_metadata = CodeReferencesMetadataSet.extract( + metadata_by_key.get(key, {}) + ) + existing_code_references = ( + existing_source_code_metadata.code_references.code_references + if existing_source_code_metadata.code_references + else [] + ) + sources_for_asset: List[Union[LocalFileCodeReference, UrlCodeReference]] = [ + *existing_code_references, + *sources, + ] metadata_by_key[key] = { **metadata_by_key.get(key, {}), @@ -130,11 +141,119 @@ def _with_code_source_single_definition( return assets_def.with_attributes(metadata_by_key=metadata_by_key) +def convert_local_path_to_source_control_path( + base_source_control_url: str, + repository_root_absolute_path: str, + local_path: LocalFileCodeReference, +) -> UrlCodeReference: + source_file_from_repo_root = os.path.relpath( + local_path.file_path, repository_root_absolute_path + ) + + return UrlCodeReference( + url=f"{base_source_control_url}/{source_file_from_repo_root}#L{local_path.line_number}", + label=local_path.label, + ) + + +def _convert_local_path_to_source_control_path_single_definition( + base_source_control_url: str, + repository_root_absolute_path: str, + assets_def: Union["AssetsDefinition", "SourceAsset", "CacheableAssetsDefinition"], +) -> Union["AssetsDefinition", "SourceAsset", "CacheableAssetsDefinition"]: + from dagster._core.definitions.assets import AssetsDefinition + + # SourceAsset doesn't have an op definition to point to - cacheable assets + # will be supported eventually but are a bit trickier + if not isinstance(assets_def, AssetsDefinition): + return assets_def + + metadata_by_key = dict(assets_def.metadata_by_key) or {} + + for key in assets_def.keys: + existing_source_code_metadata = CodeReferencesMetadataSet.extract( + metadata_by_key.get(key, {}) + ) + if not existing_source_code_metadata.code_references: + continue + + sources_for_asset: List[Union[LocalFileCodeReference, UrlCodeReference]] = [ + convert_local_path_to_source_control_path( + base_source_control_url, + repository_root_absolute_path, + source, + ) + if isinstance(source, LocalFileCodeReference) + else source + for source in existing_source_code_metadata.code_references.code_references + ] + + metadata_by_key[key] = { + **metadata_by_key.get(key, {}), + **CodeReferencesMetadataSet( + code_references=CodeReferencesMetadataValue(code_references=sources_for_asset) + ), + } + + return assets_def.with_attributes(metadata_by_key=metadata_by_key) + + +def _build_github_url(url: str, branch: str) -> str: + return f"{url}/tree/{branch}" + + +def _build_gitlab_url(url: str, branch: str) -> str: + return f"{url}/-/tree/{branch}" + + +@experimental +def link_to_source_control( + assets_defs: Sequence[Union["AssetsDefinition", "SourceAsset", "CacheableAssetsDefinition"]], + source_control_url: str, + source_control_branch: str, + repository_root_absolute_path: Union[Path, str], +) -> Sequence[Union["AssetsDefinition", "SourceAsset", "CacheableAssetsDefinition"]]: + """Wrapper function which converts local file path code references to source control URLs + based on the provided source control URL and branch. + + Args: + assets_defs (Sequence[Union[AssetsDefinition, SourceAsset, CacheableAssetsDefinition]]): + The asset definitions to which source control metadata should be attached. + Only assets with local file code references (such as those created by + `with_source_code_references`) will be converted. + source_control_url (str): The base URL for the source control system. For example, + "https://github.com/dagster-io/dagster". + source_control_branch (str): The branch in the source control system, such as "master". + repository_root_absolute_path (Union[Path, str]): The absolute path to the root of the + repository on disk. This is used to calculate the relative path to the source file + from the repository root and append it to the source control URL. + """ + if "gitlab.com" in source_control_url: + source_control_url = _build_gitlab_url(source_control_url, source_control_branch) + elif "github.com" in source_control_url: + source_control_url = _build_github_url(source_control_url, source_control_branch) + else: + raise ValueError( + "Invalid `source_control_url`." + " Only GitHub and GitLab are supported for linking to source control at this time." + ) + + return [ + _convert_local_path_to_source_control_path_single_definition( + base_source_control_url=source_control_url, + repository_root_absolute_path=str(repository_root_absolute_path), + assets_def=assets_def, + ) + for assets_def in assets_defs + ] + + @experimental def with_source_code_references( assets_defs: Sequence[Union["AssetsDefinition", "SourceAsset", "CacheableAssetsDefinition"]], ) -> Sequence[Union["AssetsDefinition", "SourceAsset", "CacheableAssetsDefinition"]]: - """Wrapper function which attaches source code metadata to the provided asset definitions. + """Wrapper function which attaches local code reference metadata to the provided asset definitions. + This points to the filepath and line number where the asset body is defined. Args: assets_defs (Sequence[Union[AssetsDefinition, SourceAsset, CacheableAssetsDefinition]]): diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/asset_package/__init__.py b/python_modules/dagster/dagster_tests/asset_defs_tests/asset_package/__init__.py index 0c14133ec993e..9eaf0d44dad77 100644 --- a/python_modules/dagster/dagster_tests/asset_defs_tests/asset_package/__init__.py +++ b/python_modules/dagster/dagster_tests/asset_defs_tests/asset_package/__init__.py @@ -13,7 +13,7 @@ def make_list_of_assets(): def james_brown(): pass - @asset + @asset(metadata={"foo": "bar"}) def fats_domino(): pass diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/test_asset_defs_source_metadata.py b/python_modules/dagster/dagster_tests/asset_defs_tests/test_asset_defs_source_metadata.py index 9ed8c5a08b9c3..b4adf8ed62fca 100644 --- a/python_modules/dagster/dagster_tests/asset_defs_tests/test_asset_defs_source_metadata.py +++ b/python_modules/dagster/dagster_tests/asset_defs_tests/test_asset_defs_source_metadata.py @@ -4,10 +4,34 @@ from dagster import AssetsDefinition, load_assets_from_modules from dagster._core.definitions.metadata import ( LocalFileCodeReference, + UrlCodeReference, + link_to_source_control, with_source_code_references, ) from dagster._utils import file_relative_path +# path of the `dagster` package on the filesystem +DAGSTER_PACKAGE_PATH = os.path.normpath(file_relative_path(__file__, "../../")) +GIT_ROOT_PATH = os.path.normpath(os.path.join(DAGSTER_PACKAGE_PATH, "../../")) + +# path of the current file relative to the `dagster` package root +PATH_IN_PACKAGE = "/dagster_tests/asset_defs_tests/" + +# {path to module}:{path to file relative to module root}:{line number} +EXPECTED_ORIGINS = { + "james_brown": DAGSTER_PACKAGE_PATH + PATH_IN_PACKAGE + "asset_package/__init__.py:12", + "chuck_berry": ( + DAGSTER_PACKAGE_PATH + PATH_IN_PACKAGE + "asset_package/module_with_assets.py:16" + ), + "little_richard": (DAGSTER_PACKAGE_PATH + PATH_IN_PACKAGE + "asset_package/__init__.py:4"), + "fats_domino": DAGSTER_PACKAGE_PATH + PATH_IN_PACKAGE + "asset_package/__init__.py:16", + "miles_davis": ( + DAGSTER_PACKAGE_PATH + + PATH_IN_PACKAGE + + "asset_package/asset_subpackage/another_module_with_assets.py:6" + ), +} + def test_asset_code_origins() -> None: from dagster_tests.asset_defs_tests import asset_package @@ -28,33 +52,12 @@ def test_asset_code_origins() -> None: collection_with_source_metadata = with_source_code_references(collection) - # path of the `dagster` module on the filesystem - dagster_module_path = os.path.normpath(file_relative_path(__file__, "../../")) - - # path of the current file relative to the `dagster` module root - path_in_module = "/dagster_tests/asset_defs_tests/" - - # {path to module}:{path to file relative to module root}:{line number} - expected_origins = { - "james_brown": dagster_module_path + path_in_module + "asset_package/__init__.py:12", - "chuck_berry": ( - dagster_module_path + path_in_module + "asset_package/module_with_assets.py:16" - ), - "little_richard": (dagster_module_path + path_in_module + "asset_package/__init__.py:4"), - "fats_domino": dagster_module_path + path_in_module + "asset_package/__init__.py:16", - "miles_davis": ( - dagster_module_path - + path_in_module - + "asset_package/asset_subpackage/another_module_with_assets.py:6" - ), - } - for asset in collection_with_source_metadata: if isinstance(asset, AssetsDefinition): op_name = asset.op.name - assert op_name in expected_origins, f"Missing expected origin for op {op_name}" + assert op_name in EXPECTED_ORIGINS, f"Missing expected origin for op {op_name}" - expected_file_path, expected_line_number = expected_origins[op_name].split(":") + expected_file_path, expected_line_number = EXPECTED_ORIGINS[op_name].split(":") for key in asset.keys: assert "dagster/code_references" in asset.metadata_by_key[key] @@ -83,3 +86,54 @@ def test_asset_code_origins() -> None: assert meta.file_path == expected_file_path assert meta.line_number == int(expected_line_number) + + +def test_asset_code_origins_source_control() -> None: + from dagster_tests.asset_defs_tests import asset_package + + from .asset_package import module_with_assets + + collection = load_assets_from_modules([asset_package, module_with_assets]) + + for asset in collection: + if isinstance(asset, AssetsDefinition): + for key in asset.keys: + # `chuck_berry` is the only asset with source code metadata manually + # attached to it + if asset.op.name == "chuck_berry": + assert "dagster/code_references" in asset.metadata_by_key[key] + else: + assert "dagster/code_references" not in asset.metadata_by_key[key] + + collection_with_source_metadata = with_source_code_references(collection) + collection_with_source_control_metadata = link_to_source_control( + collection_with_source_metadata, + source_control_url="https://github.com/dagster-io/dagster", + source_control_branch="master", + repository_root_absolute_path=GIT_ROOT_PATH, + ) + + for asset in collection_with_source_control_metadata: + if isinstance(asset, AssetsDefinition): + op_name = asset.op.name + assert op_name in EXPECTED_ORIGINS, f"Missing expected origin for op {op_name}" + + expected_file_path, expected_line_number = EXPECTED_ORIGINS[op_name].split(":") + + for key in asset.keys: + assert "dagster/code_references" in asset.metadata_by_key[key] + + assert isinstance( + asset.metadata_by_key[key]["dagster/code_references"].code_references[-1], + UrlCodeReference, + ) + meta = cast( + UrlCodeReference, + asset.metadata_by_key[key]["dagster/code_references"].code_references[-1], + ) + + assert meta.url == ( + "https://github.com/dagster-io/dagster/tree/master/python_modules/dagster" + + (expected_file_path[len(DAGSTER_PACKAGE_PATH) :]) + + f"#L{expected_line_number}" + )