Skip to content

Commit

Permalink
update logic and testing
Browse files Browse the repository at this point in the history
  • Loading branch information
jamiedemaria committed Sep 18, 2023
1 parent d811182 commit 45fe573
Show file tree
Hide file tree
Showing 2 changed files with 163 additions and 125 deletions.
50 changes: 23 additions & 27 deletions python_modules/dagster/dagster/_core/execution/context/compute.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from abc import ABC, ABCMeta, abstractmethod
from inspect import _empty as EmptyAnnotation
from typing import (
AbstractSet,
Any,
Expand Down Expand Up @@ -1496,42 +1497,37 @@ def build_execution_context(
step type annotation result
asset AssetExecutionContext AssetExecutionContext
asset OpExecutionContext AssetExecutionContext - with deprecation warning
asset OpExecutionContext OpExecutionContext
asset None AssetExecutionContext
op AssetExecutionContext Error
op AssetExecutionContext AssetExecutionContext
op OpExecutionContext OpExecutionContext
op None OpExecutionContext
"""
is_sda_step = step_context.is_sda_step
is_asset_context = False
is_op_context = False

context_annotation = EmptyAnnotation
compute_fn = step_context.op_def._compute_fn # noqa: SLF001
if isinstance(compute_fn, DecoratedOpFunction) and compute_fn.has_context_arg():
compute_fn = (
compute_fn
if isinstance(compute_fn, DecoratedOpFunction)
else DecoratedOpFunction(compute_fn)
)
if compute_fn.has_context_arg():
context_param = compute_fn.get_context_arg()
is_asset_context = context_param.annotation is AssetExecutionContext
is_op_context = context_param.annotation is OpExecutionContext

if is_asset_context and not is_sda_step:
raise DagsterInvalidDefinitionError(
"When executed in jobs, the op context should be annotated with OpExecutionContext, not"
" AssetExecutionContext."
)

if is_op_context and is_sda_step:
deprecation_warning(
"Contexts with type annotation OpExecutionContext for @assets, @multi_assets,"
" @graph_asset, and @graph_multi_asset.",
"1.7.0",
additional_warn_text="Please annotate the context with AssetExecutionContext",
stacklevel=1,
)
context_annotation = context_param.annotation

op_context = OpExecutionContext(step_context)

# TODO - determine special casing for graph backed assets

if is_sda_step:
if context_annotation is EmptyAnnotation:
# if no type hint has been given, default to AssetExecutionContext for sda steps and
# OpExecutionContext for non sda steps
return AssetExecutionContext(op_context) if is_sda_step else op_context
if context_annotation is AssetExecutionContext:
return AssetExecutionContext(op_context)
return op_context
if context_annotation is OpExecutionContext:
return op_context

raise DagsterInvalidDefinitionError(
f"Cannot annotate `context` parameter with type {context_annotation}. `context` must be"
" annotated with AssetExecutionContext, OpExecutionContext, or left blank."
)
Original file line number Diff line number Diff line change
Expand Up @@ -33,169 +33,211 @@ def foo():
assert foo.execute_in_process().success


def test_correct_context_provided_no_type_hints():
# asset
def test_context_provided_to_asset():
@asset
def the_asset(context):
def no_annotation(context):
assert isinstance(context, AssetExecutionContext)

materialize([the_asset])
materialize([no_annotation])

# ops, jobs
@op
def the_op(context):
@asset
def asset_annotation(context: AssetExecutionContext):
assert isinstance(context, AssetExecutionContext)

materialize([asset_annotation])

@asset
def op_annotation(context: OpExecutionContext):
assert isinstance(context, OpExecutionContext)
# AssetExecutionContext is an instance of OpExecutionContext, so add this additional check
assert not isinstance(context, AssetExecutionContext)

@job
def the_job():
the_op()
materialize([op_annotation])

assert the_job.execute_in_process().success

# multi_asset
@multi_asset(outs={"out1": AssetOut(dagster_type=None), "out2": AssetOut(dagster_type=None)})
def the_multi_asset(context):
assert isinstance(context, AssetExecutionContext)
return None, None
def test_context_provided_to_op():
@op
def no_annotation(context):
assert isinstance(context, OpExecutionContext)
# AssetExecutionContext is an instance of OpExecutionContext, so add this additional check
assert not isinstance(context, AssetExecutionContext)

@job
def no_annotation_job():
no_annotation()

materialize([the_multi_asset])
assert no_annotation_job.execute_in_process().success

# graph backed asset
@op
def the_asset_op(context):
def asset_annotation(context: AssetExecutionContext):
assert isinstance(context, AssetExecutionContext)

@graph_asset
def the_graph_asset():
return the_asset_op()

materialize([the_graph_asset])

# graph backed multi asset
@graph_multi_asset(
outs={"out1": AssetOut(dagster_type=None), "out2": AssetOut(dagster_type=None)}
)
def the_graph_multi_asset():
return the_asset_op(), the_asset_op()
@job
def asset_annotation_job():
asset_annotation()

materialize([the_graph_multi_asset])
assert asset_annotation_job.execute_in_process().success

# job created using Definitions classes, not decorators
def plain_python(context, *args):
@op
def op_annotation(context: OpExecutionContext):
assert isinstance(context, OpExecutionContext)
# AssetExecutionContext is an instance of OpExecutionContext, so add this additional check
assert not isinstance(context, AssetExecutionContext)
yield Output(1)

no_decorator_op = OpDefinition(compute_fn=plain_python, name="no_decorator_op")
no_decorator_graph = GraphDefinition(name="no_decorator_graph", node_defs=[no_decorator_op])
@job
def op_annotation_job():
op_annotation()

no_decorator_graph.to_job(name="no_decorator_job").execute_in_process()
assert op_annotation_job.execute_in_process().success


def test_correct_context_provided_with_expected_type_hints():
# asset
@asset
def the_asset(context: AssetExecutionContext):
def test_context_provided_to_multi_asset():
@multi_asset(outs={"out1": AssetOut(dagster_type=None), "out2": AssetOut(dagster_type=None)})
def no_annotation(context):
assert isinstance(context, AssetExecutionContext)
return None, None

materialize([the_asset])
materialize([no_annotation])

# ops, jobs
@op
def the_op(context: OpExecutionContext):
@multi_asset(outs={"out1": AssetOut(dagster_type=None), "out2": AssetOut(dagster_type=None)})
def asset_annotation(context: AssetExecutionContext):
assert isinstance(context, AssetExecutionContext)
return None, None

materialize([asset_annotation])

@multi_asset(outs={"out1": AssetOut(dagster_type=None), "out2": AssetOut(dagster_type=None)})
def op_annotation(context: OpExecutionContext):
assert isinstance(context, OpExecutionContext)
# AssetExecutionContext is an instance of OpExecutionContext, so add this additional check
assert not isinstance(context, AssetExecutionContext)
return None, None

@job
def the_job():
the_op()
materialize([op_annotation])

assert the_job.execute_in_process().success

# multi_asset
@multi_asset(outs={"out1": AssetOut(dagster_type=None), "out2": AssetOut(dagster_type=None)})
def the_multi_asset(context: AssetExecutionContext):
def test_context_provided_to_graph_asset():
@op
def no_annotation_op(context):
assert isinstance(context, OpExecutionContext)
# AssetExecutionContext is an instance of OpExecutionContext, so add this additional check
assert not isinstance(
context, AssetExecutionContext
) # fails, context is an AssetExecutionContext in current impl

@graph_asset
def no_annotation_asset():
return no_annotation_op()

materialize([no_annotation_asset])

@op
def asset_annotation_op(context: AssetExecutionContext):
assert isinstance(context, AssetExecutionContext)
return None, None

materialize([the_multi_asset])
@graph_asset
def asset_annotation_asset():
return asset_annotation_op()

materialize([asset_annotation_asset])

# job created using Definitions classes, not decorators
def plain_python(context: OpExecutionContext, *args):
@op
def op_annotation_op(context: OpExecutionContext):
assert isinstance(context, OpExecutionContext)
# AssetExecutionContext is an instance of OpExecutionContext, so add this additional check
assert not isinstance(context, AssetExecutionContext)
yield Output(1)

no_decorator_op = OpDefinition(compute_fn=plain_python, name="no_decorator_op")
no_decorator_graph = GraphDefinition(name="no_decorator_graph", node_defs=[no_decorator_op])

no_decorator_graph.to_job(name="no_decorator_job").execute_in_process()
@graph_asset
def op_annotation_asset():
return op_annotation_op()

materialize([op_annotation_asset])

def test_graph_asset_with_op_context():
# TODO - this test fails right now. How do we want to handle this case?
# If we want to provide an OpExecutionContext to this op, then we need a way
# to determine if the asset is a graph-backed asset rather than an @asset or @multi_asset so that we
# can special case this behavior
#
# weird edge case:
# an op is used in both a job and a graph backed asset. This would mean in the job it would get an
# OpExecutionContext, but in the graph backed asset it would get an AssetExecutionContext. Once we
# deprecate the op methods from AssetExecutionContext this will be a problem since a method like
# describe_op would be accessed as context.describe_op in the job and context.op_execution_context.describe_op
# in the graph backed asset

def test_context_provided_to_graph_multi_asset():
@op
def the_op(context: OpExecutionContext):
def no_annotation_op(context):
assert isinstance(context, OpExecutionContext)
# AssetExecutionContext is an instance of OpExecutionContext, so add this additional check
assert not isinstance(
context, AssetExecutionContext
) # fails, context is an AssetExecutionContext in current impl

@graph_asset
def the_graph_asset():
return the_op()

materialize([the_graph_asset])

@graph_multi_asset(
outs={"out1": AssetOut(dagster_type=None), "out2": AssetOut(dagster_type=None)}
)
def the_graph_multi_asset():
return the_op(), the_op()

materialize([the_graph_multi_asset])
def no_annotation_asset():
return no_annotation_op(), no_annotation_op()

materialize([no_annotation_asset])

def test_graph_asset_with_asset_context():
@op
def the_op(context: AssetExecutionContext):
def asset_annotation_op(context: AssetExecutionContext):
assert isinstance(context, AssetExecutionContext)

@graph_asset
def the_graph_asset():
return the_op()
@graph_multi_asset(
outs={"out1": AssetOut(dagster_type=None), "out2": AssetOut(dagster_type=None)}
)
def asset_annotation_asset():
return asset_annotation_op(), asset_annotation_op()

materialize([the_graph_asset])
materialize([asset_annotation_asset])

@op
def op_annotation_op(context: OpExecutionContext):
assert isinstance(context, OpExecutionContext)
# AssetExecutionContext is an instance of OpExecutionContext, so add this additional check
assert not isinstance(context, AssetExecutionContext)

@graph_multi_asset(
outs={"out1": AssetOut(dagster_type=None), "out2": AssetOut(dagster_type=None)}
)
def the_graph_multi_asset():
return the_op(), the_op()
def op_annotation_asset():
return op_annotation_op(), op_annotation_op()

materialize([op_annotation_asset])


def test_context_provided_to_plain_python():
# tests a job created using Definitions classes, not decorators

def no_annotation(context, *args):
assert isinstance(context, OpExecutionContext)
# AssetExecutionContext is an instance of OpExecutionContext, so add this additional check
assert not isinstance(context, AssetExecutionContext)
yield Output(1)

no_annotation_op = OpDefinition(compute_fn=no_annotation, name="no_annotation_op")
no_annotation_graph = GraphDefinition(name="no_annotation_graph", node_defs=[no_annotation_op])

no_annotation_graph.to_job(name="no_annotation_job").execute_in_process()

def asset_annotation(context: AssetExecutionContext, *args):
assert isinstance(context, AssetExecutionContext)
yield Output(1)

asset_annotation_op = OpDefinition(compute_fn=asset_annotation, name="asset_annotation_op")
asset_annotation_graph = GraphDefinition(
name="asset_annotation_graph", node_defs=[asset_annotation_op]
)

asset_annotation_graph.to_job(name="asset_annotation_job").execute_in_process()

def op_annotation(context: OpExecutionContext, *args):
assert isinstance(context, OpExecutionContext)
# AssetExecutionContext is an instance of OpExecutionContext, so add this additional check
assert not isinstance(context, AssetExecutionContext)
yield Output(1)

op_annotation_op = OpDefinition(compute_fn=op_annotation, name="op_annotation_op")
op_annotation_graph = GraphDefinition(name="op_annotation_graph", node_defs=[op_annotation_op])

materialize([the_graph_multi_asset])
op_annotation_graph.to_job(name="op_annotation_job").execute_in_process()


def test_error_on_context_type_mismatch():
def test_error_on_invalid_context_annotation():
@op
def the_op(context: AssetExecutionContext):
def the_op(context: int):
pass

@job
Expand All @@ -204,6 +246,6 @@ def the_job():

with pytest.raises(
DagsterInvalidDefinitionError,
match="When executed in jobs, the op context should be annotated with OpExecutionContext",
match="must be annotated with AssetExecutionContext, OpExecutionContext, or left blank",
):
assert the_job.execute_in_process().success
assert the_job.execute_in_process()

0 comments on commit 45fe573

Please sign in to comment.