Skip to content

Commit

Permalink
wip add testing
Browse files Browse the repository at this point in the history
  • Loading branch information
jamiedemaria committed Sep 25, 2023
1 parent 3fdb7bc commit 1d157c6
Show file tree
Hide file tree
Showing 3 changed files with 269 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,11 @@ def name(self):
def has_context_arg(self) -> bool:
return is_context_provided(get_function_params(self.decorated_fn))

def get_context_arg(self) -> Parameter:
if self.has_context_arg():
return get_function_params(self.decorated_fn)[0]
check.failed("Requested context arg on function that does not have one")

@lru_cache(maxsize=1)
def _get_function_params(self) -> Sequence[Parameter]:
return get_function_params(self.decorated_fn)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,11 @@
OutputManagerRequirement,
ResourceRequirement,
)
from dagster._core.errors import DagsterInvalidInvocationError, DagsterInvariantViolationError
from dagster._core.errors import (
DagsterInvalidDefinitionError,
DagsterInvalidInvocationError,
DagsterInvariantViolationError,
)
from dagster._core.types.dagster_type import DagsterType, DagsterTypeKind
from dagster._utils import IHasInternalInit
from dagster._utils.warnings import normalize_renamed_param
Expand Down Expand Up @@ -143,9 +147,11 @@ def __init__(
exclude_nothing=True,
)
self._compute_fn = compute_fn
_validate_context_type_hint(self._compute_fn.decorated_fn)
else:
resolved_input_defs = input_defs
self._compute_fn = check.callable_param(compute_fn, "compute_fn")
_validate_context_type_hint(self._compute_fn)

code_version = normalize_renamed_param(
code_version,
Expand Down Expand Up @@ -504,3 +510,23 @@ def _resolve_output_defs_from_outs(
)

return output_defs


def _validate_context_type_hint(fn):
from inspect import _empty as EmptyAnnotation

from dagster._core.decorator_utils import get_function_params
from dagster._core.definitions.decorators.op_decorator import is_context_provided
from dagster._core.execution.context.compute import AssetExecutionContext, OpExecutionContext

params = get_function_params(fn)
if is_context_provided(params):
if (
params[0].annotation is not AssetExecutionContext
and params[0].annotation is not OpExecutionContext
and params[0].annotation is not EmptyAnnotation
):
raise DagsterInvalidDefinitionError(
f"Cannot annotate `context` parameter with type {params[0].annotation}. `context`"
" must be annotated with AssetExecutionContext, OpExecutionContext, or left blank."
)
Original file line number Diff line number Diff line change
@@ -1,7 +1,18 @@
import dagster._check as check
from dagster import OpExecutionContext, job, op
import pytest
from dagster import OpExecutionContext, asset, job, materialize, op
from dagster._core.definitions.asset_out import AssetOut
from dagster._core.definitions.decorators.asset_decorator import (
graph_asset,
graph_multi_asset,
multi_asset,
)
from dagster._core.definitions.events import Output
from dagster._core.definitions.graph_definition import GraphDefinition
from dagster._core.definitions.job_definition import JobDefinition
from dagster._core.definitions.op_definition import OpDefinition
from dagster._core.errors import DagsterInvalidDefinitionError
from dagster._core.execution.context.compute import AssetExecutionContext
from dagster._core.storage.dagster_run import DagsterRun


Expand All @@ -20,3 +31,228 @@ def foo():
ctx_op()

assert foo.execute_in_process().success


def test_context_provided_to_asset():
@asset
def no_annotation(context):
assert isinstance(context, AssetExecutionContext)

materialize([no_annotation])

@asset
def asset_annotation(context: AssetExecutionContext):
assert isinstance(context, AssetExecutionContext)

materialize([asset_annotation])

@asset
def op_annotation(context: OpExecutionContext):
assert isinstance(context, OpExecutionContext)
# AssetExecutionContext is an instance of OpExecutionContext, so add this additional check
assert not isinstance(context, AssetExecutionContext)

materialize([op_annotation])


def test_context_provided_to_op():
@op
def no_annotation(context):
assert isinstance(context, OpExecutionContext)
# AssetExecutionContext is an instance of OpExecutionContext, so add this additional check
assert not isinstance(context, AssetExecutionContext)

@job
def no_annotation_job():
no_annotation()

assert no_annotation_job.execute_in_process().success

@op
def asset_annotation(context: AssetExecutionContext):
assert isinstance(context, AssetExecutionContext)

@job
def asset_annotation_job():
asset_annotation()

with pytest.raises(
DagsterInvalidDefinitionError,
match="Cannot annotate @op `context` parameter with type AssetExecutionContext",
):
asset_annotation_job.execute_in_process()

@op
def op_annotation(context: OpExecutionContext):
assert isinstance(context, OpExecutionContext)
# AssetExecutionContext is an instance of OpExecutionContext, so add this additional check
assert not isinstance(context, AssetExecutionContext)

@job
def op_annotation_job():
op_annotation()

assert op_annotation_job.execute_in_process().success


def test_context_provided_to_multi_asset():
@multi_asset(outs={"out1": AssetOut(dagster_type=None), "out2": AssetOut(dagster_type=None)})
def no_annotation(context):
assert isinstance(context, AssetExecutionContext)
return None, None

materialize([no_annotation])

@multi_asset(outs={"out1": AssetOut(dagster_type=None), "out2": AssetOut(dagster_type=None)})
def asset_annotation(context: AssetExecutionContext):
assert isinstance(context, AssetExecutionContext)
return None, None

materialize([asset_annotation])

@multi_asset(outs={"out1": AssetOut(dagster_type=None), "out2": AssetOut(dagster_type=None)})
def op_annotation(context: OpExecutionContext):
assert isinstance(context, OpExecutionContext)
# AssetExecutionContext is an instance of OpExecutionContext, so add this additional check
assert not isinstance(context, AssetExecutionContext)
return None, None

materialize([op_annotation])


def test_context_provided_to_graph_asset():
@op
def no_annotation_op(context):
assert isinstance(context, OpExecutionContext)
# AssetExecutionContext is an instance of OpExecutionContext, so add this additional check
assert not isinstance(context, AssetExecutionContext)

@graph_asset
def no_annotation_asset():
return no_annotation_op()

materialize([no_annotation_asset])

@op
def asset_annotation_op(context: AssetExecutionContext):
assert isinstance(context, AssetExecutionContext)

@graph_asset
def asset_annotation_asset():
return asset_annotation_op()

materialize([asset_annotation_asset])

@op
def op_annotation_op(context: OpExecutionContext):
assert isinstance(context, OpExecutionContext)
# AssetExecutionContext is an instance of OpExecutionContext, so add this additional check
assert not isinstance(context, AssetExecutionContext)

@graph_asset
def op_annotation_asset():
return op_annotation_op()

materialize([op_annotation_asset])


def test_context_provided_to_graph_multi_asset():
@op
def no_annotation_op(context):
assert isinstance(context, OpExecutionContext)
# AssetExecutionContext is an instance of OpExecutionContext, so add this additional check
assert not isinstance(context, AssetExecutionContext)

@graph_multi_asset(
outs={"out1": AssetOut(dagster_type=None), "out2": AssetOut(dagster_type=None)}
)
def no_annotation_asset():
return no_annotation_op(), no_annotation_op()

materialize([no_annotation_asset])

@op
def asset_annotation_op(context: AssetExecutionContext):
assert isinstance(context, AssetExecutionContext)

@graph_multi_asset(
outs={"out1": AssetOut(dagster_type=None), "out2": AssetOut(dagster_type=None)}
)
def asset_annotation_asset():
return asset_annotation_op(), asset_annotation_op()

materialize([asset_annotation_asset])

@op
def op_annotation_op(context: OpExecutionContext):
assert isinstance(context, OpExecutionContext)
# AssetExecutionContext is an instance of OpExecutionContext, so add this additional check
assert not isinstance(context, AssetExecutionContext)

@graph_multi_asset(
outs={"out1": AssetOut(dagster_type=None), "out2": AssetOut(dagster_type=None)}
)
def op_annotation_asset():
return op_annotation_op(), op_annotation_op()

materialize([op_annotation_asset])


def test_context_provided_to_plain_python():
# tests a job created using Definitions classes, not decorators

def no_annotation(context, *args):
assert isinstance(context, OpExecutionContext)
# AssetExecutionContext is an instance of OpExecutionContext, so add this additional check
assert not isinstance(context, AssetExecutionContext)
yield Output(1)

no_annotation_op = OpDefinition(compute_fn=no_annotation, name="no_annotation_op")
no_annotation_graph = GraphDefinition(name="no_annotation_graph", node_defs=[no_annotation_op])

no_annotation_graph.to_job(name="no_annotation_job").execute_in_process()

def asset_annotation(context: AssetExecutionContext, *args):
assert False, "Test should error during context creation"

asset_annotation_op = OpDefinition(compute_fn=asset_annotation, name="asset_annotation_op")
asset_annotation_graph = GraphDefinition(
name="asset_annotation_graph", node_defs=[asset_annotation_op]
)

with pytest.raises(
DagsterInvalidDefinitionError,
match="Cannot annotate @op `context` parameter with type AssetExecutionContext",
):
asset_annotation_graph.to_job(name="asset_annotation_job").execute_in_process()

def op_annotation(context: OpExecutionContext, *args):
assert isinstance(context, OpExecutionContext)
# AssetExecutionContext is an instance of OpExecutionContext, so add this additional check
assert not isinstance(context, AssetExecutionContext)
yield Output(1)

op_annotation_op = OpDefinition(compute_fn=op_annotation, name="op_annotation_op")
op_annotation_graph = GraphDefinition(name="op_annotation_graph", node_defs=[op_annotation_op])

op_annotation_graph.to_job(name="op_annotation_job").execute_in_process()


def test_error_on_invalid_context_annotation():
with pytest.raises(
DagsterInvalidDefinitionError,
match="must be annotated with AssetExecutionContext, OpExecutionContext, or left blank",
):

@op
def the_op(context: int):
pass

with pytest.raises(
DagsterInvalidDefinitionError,
match="must be annotated with AssetExecutionContext, OpExecutionContext, or left blank",
):

@asset
def the_asset(context: int):
pass

0 comments on commit 1d157c6

Please sign in to comment.