Skip to content

Commit

Permalink
[4/n] add python api for replacing local file references with source …
Browse files Browse the repository at this point in the history
…control links
  • Loading branch information
benpankow committed May 7, 2024
1 parent 7b3ff2a commit 3e2b861
Show file tree
Hide file tree
Showing 3 changed files with 178 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@
CodeReferencesMetadataSet as CodeReferencesMetadataSet,
CodeReferencesMetadataValue as CodeReferencesMetadataValue,
LocalFileCodeReference as LocalFileCodeReference,
SourceControlCodeReference as SourceControlCodeReference,
link_to_source_control as link_to_source_control,
with_source_code_references as with_source_code_references,
)
from .table import ( # re-exported
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import inspect
import os
from pathlib import Path
from typing import (
TYPE_CHECKING,
Any,
Expand Down Expand Up @@ -40,6 +41,16 @@ class LocalFileCodeReference(DagsterModel):
label: Optional[str] = None


@experimental
@whitelist_for_serdes
class SourceControlCodeReference(DagsterModel):
    """A code reference that points at a file in source control through a URL.

    Attributes:
        source_control_url (str): URL of the referenced file in source control.
        line_number (int): Line number associated with the reference.
        label (Optional[str]): Optional label for the reference.
    """

    source_control_url: str
    line_number: int
    label: Optional[str] = None


@experimental
@whitelist_for_serdes
class CodeReferencesMetadataValue(DagsterModel, MetadataValue["CodeReferencesMetadataValue"]):
Expand All @@ -48,19 +59,19 @@ class CodeReferencesMetadataValue(DagsterModel, MetadataValue["CodeReferencesMet
asset is defined.
Attributes:
sources (List[LocalFileCodeReference]):
sources (List[Union[LocalFileCodeReference, SourceControlCodeReference]]):
A list of code references for the asset, such as file locations or
references to source control.
"""

code_references: List[LocalFileCodeReference]
code_references: List[Union[LocalFileCodeReference, SourceControlCodeReference]]

@property
def value(self) -> "CodeReferencesMetadataValue":
return self


def source_path_from_fn(fn: Callable[..., Any]) -> Optional[LocalFileCodeReference]:
def local_source_path_from_fn(fn: Callable[..., Any]) -> Optional[LocalFileCodeReference]:
cwd = os.getcwd()

origin_file = os.path.abspath(os.path.join(cwd, inspect.getsourcefile(fn))) # type: ignore
Expand Down Expand Up @@ -101,7 +112,7 @@ def _with_code_source_single_definition(
if isinstance(assets_def.op.compute_fn, DecoratedOpFunction)
else assets_def.op.compute_fn
)
source_path = source_path_from_fn(base_fn)
source_path = local_source_path_from_fn(base_fn)

if source_path:
sources = [source_path]
Expand All @@ -113,7 +124,9 @@ def _with_code_source_single_definition(
existing_source_code_metadata = CodeReferencesMetadataSet.extract(
metadata_by_key.get(key, {})
)
sources_for_asset = [
sources_for_asset: List[
Union[LocalFileCodeReference, SourceControlCodeReference]
] = [
*existing_source_code_metadata.code_references.code_references,
*sources,
]
Expand All @@ -130,6 +143,87 @@ def _with_code_source_single_definition(
return assets_def.with_attributes(metadata_by_key=metadata_by_key)


def convert_local_path_to_source_control_path(
    base_source_control_url: str,
    repository_root_absolute_path: str,
    local_path: LocalFileCodeReference,
) -> SourceControlCodeReference:
    """Convert a local file code reference into a source control code reference.

    The file path of ``local_path`` is made relative to the repository root and
    appended to ``base_source_control_url``. The line number and label are
    carried over unchanged.

    Args:
        base_source_control_url (str): URL prefix that the repository-relative
            file path is appended to.
        repository_root_absolute_path (str): Path of the repository root on the
            local filesystem.
        local_path (LocalFileCodeReference): The local reference to convert.

    Returns:
        SourceControlCodeReference: The equivalent reference expressed as a URL.
    """
    source_file_from_repo_root = os.path.relpath(
        local_path.file_path, repository_root_absolute_path
    )
    # `os.path.relpath` uses the platform separator (backslash on Windows), but
    # URL paths always use forward slashes.
    url_path = source_file_from_repo_root.replace(os.sep, "/")
    # Avoid a doubled slash when the caller's base URL already ends with one.
    url_base = base_source_control_url.rstrip("/")

    return SourceControlCodeReference(
        source_control_url=f"{url_base}/{url_path}",
        line_number=local_path.line_number,
        label=local_path.label,
    )


def _convert_local_path_to_source_control_path_single_definition(
    base_source_control_url: str,
    repository_root_absolute_path: str,
    assets_def: Union["AssetsDefinition", "SourceAsset", "CacheableAssetsDefinition"],
) -> Union["AssetsDefinition", "SourceAsset", "CacheableAssetsDefinition"]:
    """Replace local file code references on one definition with source control ones.

    Anything that is not an ``AssetsDefinition`` is returned untouched. For each
    asset key, every ``LocalFileCodeReference`` in the existing code reference
    metadata is converted; other reference types are kept as they are. Keys whose
    metadata fails pydantic validation are left unchanged.
    """
    from dagster._core.definitions.assets import AssetsDefinition

    # SourceAsset doesn't have an op definition to point to - cacheable assets
    # will be supported eventually but are a bit trickier
    if not isinstance(assets_def, AssetsDefinition):
        return assets_def

    updated_metadata = dict(assets_def.metadata_by_key)

    for asset_key in assets_def.keys:
        try:
            extracted = CodeReferencesMetadataSet.extract(updated_metadata.get(asset_key, {}))

            converted: List[Union[LocalFileCodeReference, SourceControlCodeReference]] = []
            for reference in extracted.code_references.code_references:
                if isinstance(reference, LocalFileCodeReference):
                    reference = convert_local_path_to_source_control_path(
                        base_source_control_url,
                        repository_root_absolute_path,
                        reference,
                    )
                converted.append(reference)

            replacement_entries = CodeReferencesMetadataSet(
                code_references=CodeReferencesMetadataValue(code_references=converted)
            )
            updated_metadata[asset_key] = {
                **updated_metadata.get(asset_key, {}),
                **replacement_entries,
            }
        except pydantic.ValidationError:
            # Best effort: leave this key's metadata as it was.
            continue

    return assets_def.with_attributes(metadata_by_key=updated_metadata)


def _build_github_url(url: str, branch: str) -> str:
return f"{url}/tree/{branch}"


@experimental
def link_to_source_control(
    assets_defs: Sequence[Union["AssetsDefinition", "SourceAsset", "CacheableAssetsDefinition"]],
    source_control_url: str,
    source_control_branch: str,
    repository_root_absolute_path: Union[Path, str],
) -> Sequence[Union["AssetsDefinition", "SourceAsset", "CacheableAssetsDefinition"]]:
    """Replace local file code references on the given definitions with source control links.

    Args:
        assets_defs: The definitions whose code reference metadata should be converted.
        source_control_url: Base URL of the repository. When it contains
            ``github.com/``, the branch is appended as a GitHub tree URL;
            any other URL is used as given.
        source_control_branch: Branch name used when building the GitHub tree URL.
        repository_root_absolute_path: Path of the repository root on the local
            filesystem; local file paths are made relative to it.

    Returns:
        A new list with one converted definition per input definition, in order.
    """
    points_at_github = bool(source_control_url) and "github.com/" in source_control_url
    if points_at_github:
        source_control_url = _build_github_url(source_control_url, source_control_branch)

    repo_root = str(repository_root_absolute_path)

    linked_defs = []
    for assets_def in assets_defs:
        linked_defs.append(
            _convert_local_path_to_source_control_path_single_definition(
                base_source_control_url=source_control_url,
                repository_root_absolute_path=repo_root,
                assets_def=assets_def,
            )
        )
    return linked_defs


@experimental
def with_source_code_references(
assets_defs: Sequence[Union["AssetsDefinition", "SourceAsset", "CacheableAssetsDefinition"]],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,34 @@
from dagster import AssetsDefinition, load_assets_from_modules
from dagster._core.definitions.metadata import (
LocalFileCodeReference,
SourceControlCodeReference,
link_to_source_control,
with_source_code_references,
)
from dagster._utils import file_relative_path

# path of the `dagster` package on the filesystem
DAGSTER_PACKAGE_PATH = os.path.normpath(file_relative_path(__file__, "../../"))
# two directories above the `dagster` package; passed as the repository root
GIT_ROOT_PATH = os.path.normpath(os.path.join(DAGSTER_PACKAGE_PATH, "../../"))

# directory of the current file relative to the `dagster` package root
PATH_IN_PACKAGE = "/dagster_tests/asset_defs_tests/"

# op name -> "{absolute path of the defining file}:{line number}"
EXPECTED_ORIGINS = {
    op_name: DAGSTER_PACKAGE_PATH + PATH_IN_PACKAGE + location
    for op_name, location in (
        ("james_brown", "asset_package/__init__.py:12"),
        ("chuck_berry", "asset_package/module_with_assets.py:16"),
        ("little_richard", "asset_package/__init__.py:4"),
        ("fats_domino", "asset_package/__init__.py:16"),
        ("miles_davis", "asset_package/asset_subpackage/another_module_with_assets.py:6"),
    )
}


def test_asset_code_origins() -> None:
from dagster_tests.asset_defs_tests import asset_package
Expand All @@ -28,33 +52,12 @@ def test_asset_code_origins() -> None:

collection_with_source_metadata = with_source_code_references(collection)

# path of the `dagster` module on the filesystem
dagster_module_path = os.path.normpath(file_relative_path(__file__, "../../"))

# path of the current file relative to the `dagster` module root
path_in_module = "/dagster_tests/asset_defs_tests/"

# {path to module}:{path to file relative to module root}:{line number}
expected_origins = {
"james_brown": dagster_module_path + path_in_module + "asset_package/__init__.py:12",
"chuck_berry": (
dagster_module_path + path_in_module + "asset_package/module_with_assets.py:16"
),
"little_richard": (dagster_module_path + path_in_module + "asset_package/__init__.py:4"),
"fats_domino": dagster_module_path + path_in_module + "asset_package/__init__.py:16",
"miles_davis": (
dagster_module_path
+ path_in_module
+ "asset_package/asset_subpackage/another_module_with_assets.py:6"
),
}

for asset in collection_with_source_metadata:
if isinstance(asset, AssetsDefinition):
op_name = asset.op.name
assert op_name in expected_origins, f"Missing expected origin for op {op_name}"
assert op_name in EXPECTED_ORIGINS, f"Missing expected origin for op {op_name}"

expected_file_path, expected_line_number = expected_origins[op_name].split(":")
expected_file_path, expected_line_number = EXPECTED_ORIGINS[op_name].split(":")

for key in asset.keys:
assert "dagster/code_references" in asset.metadata_by_key[key]
Expand Down Expand Up @@ -83,3 +86,54 @@ def test_asset_code_origins() -> None:

assert meta.file_path == expected_file_path
assert meta.line_number == int(expected_line_number)


def test_asset_code_origins_source_control() -> None:
    """Check that local code references are rewritten into GitHub source control links."""
    from dagster_tests.asset_defs_tests import asset_package

    from .asset_package import module_with_assets

    collection = load_assets_from_modules([asset_package, module_with_assets])

    for asset in collection:
        if not isinstance(asset, AssetsDefinition):
            continue
        for key in asset.keys:
            # `chuck_berry` is the only asset with source code metadata manually
            # attached to it
            if asset.op.name == "chuck_berry":
                assert "dagster/code_references" in asset.metadata_by_key[key]
            else:
                assert "dagster/code_references" not in asset.metadata_by_key[key]

    collection_with_source_metadata = with_source_code_references(collection)
    collection_with_source_control_metadata = link_to_source_control(
        collection_with_source_metadata,
        source_control_url="https://github.com/dagster-io/dagster",
        source_control_branch="master",
        repository_root_absolute_path=GIT_ROOT_PATH,
    )

    for asset in collection_with_source_control_metadata:
        if not isinstance(asset, AssetsDefinition):
            continue

        op_name = asset.op.name
        assert op_name in EXPECTED_ORIGINS, f"Missing expected origin for op {op_name}"

        expected_file_path, expected_line_number = EXPECTED_ORIGINS[op_name].split(":")
        # the expected URL is the GitHub tree prefix plus the file's path inside the package
        path_within_package = expected_file_path[len(DAGSTER_PACKAGE_PATH) :]

        for key in asset.keys:
            assert "dagster/code_references" in asset.metadata_by_key[key]

            # the converted reference is expected to be the last entry
            last_reference = asset.metadata_by_key[key]["dagster/code_references"].code_references[
                -1
            ]
            assert isinstance(last_reference, SourceControlCodeReference)
            meta = cast(SourceControlCodeReference, last_reference)

            assert meta.source_control_url == (
                "https://github.com/dagster-io/dagster/tree/master/python_modules/dagster"
                + path_within_package
            )
            assert meta.line_number == int(expected_line_number)

0 comments on commit 3e2b861

Please sign in to comment.