Skip to content

Commit

Permalink
Fix error on op selection (#17233)
Browse files Browse the repository at this point in the history
With
https://github.com/dagster-io/dagster/pull/16640/files#diff-3a0d9c2e7d488c33f5be6fdf25298f9b03277b1bc73417d53e0a51688d32e8b6,
we started passing assetCheckSelection: [] for op jobs. This was
untested, and hit this invariant because unlike for assets where [] gets
turned in to None, we maintain a difference between None and [] with
asset checks

This is ripe for rethinking
  • Loading branch information
johannkm authored Oct 16, 2023
1 parent 70514f3 commit 4ecfe81
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 1 deletion.
4 changes: 4 additions & 0 deletions python_modules/dagster-graphql/dagster_graphql/test/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,12 +195,16 @@ def infer_pipeline_selector(
graphql_context: WorkspaceRequestContext,
pipeline_name: str,
op_selection: Optional[Sequence[str]] = None,
asset_selection: Optional[Sequence[GqlAssetKey]] = None,
asset_check_selection: Optional[Sequence[GqlAssetCheckHandle]] = None,
) -> Selector:
selector = infer_repository_selector(graphql_context)
selector.update(
{
"pipelineName": pipeline_name,
"solidSelection": op_selection,
"assetSelection": asset_selection,
"assetCheckSelection": asset_check_selection,
}
)
return selector
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -687,6 +687,33 @@ def test_nested_graph_op_selection_and_config(self, graphql_context: WorkspaceRe
assert step_did_not_run(logs, "plus_one")
assert step_did_not_run(logs, "subgraph.op_2")

def test_nested_graph_op_selection_and_config_with_non_null_asset_and_check_selection(
self, graphql_context: WorkspaceRequestContext
):
selector = infer_pipeline_selector(
graphql_context,
"nested_job",
["subgraph.adder", "subgraph.op_1"],
asset_selection=[],
asset_check_selection=[],
)
run_config = {"ops": {"subgraph": {"ops": {"adder": {"inputs": {"num2": 20}}}}}}
result = sync_execute_get_run_log_data(
context=graphql_context,
variables={
"executionParams": {
"selector": selector,
"runConfigData": run_config,
}
},
)
logs = result["messages"]
assert isinstance(logs, list)
assert step_did_succeed(logs, "subgraph.adder")
assert step_did_succeed(logs, "subgraph.op_1")
assert step_did_not_run(logs, "plus_one")
assert step_did_not_run(logs, "subgraph.op_2")

def test_memoization_job(self, graphql_context: WorkspaceRequestContext):
selector = infer_pipeline_selector(graphql_context, "memoization_job")
run_config = {}
Expand Down
6 changes: 5 additions & 1 deletion python_modules/dagster/dagster/_core/instance/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1461,7 +1461,11 @@ def create_run(
check.opt_set_param(asset_selection, "asset_selection", of_type=AssetKey)
check.opt_set_param(asset_check_selection, "asset_check_selection", of_type=AssetCheckKey)

if asset_selection is not None or asset_check_selection is not None:
# asset_selection will always be None on an op job, but asset_check_selection may be
# None or []. This is because [] and None are different for asset checks: None means
# include all asset checks on selected assets, while [] means include no asset checks.
# In an op job (which has no asset checks), these two are equivalent.
if asset_selection is not None or asset_check_selection:
check.invariant(
op_selection is None,
"Cannot pass op_selection with either of asset_selection or asset_check_selection",
Expand Down

0 comments on commit 4ecfe81

Please sign in to comment.