diff --git a/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_runs.py b/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_runs.py
index 0db6ec2ccdb89..1201d576c5512 100644
--- a/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_runs.py
+++ b/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_runs.py
@@ -18,7 +18,6 @@
 )
 from dagster._core.definitions.selector import JobSubsetSelector
 from dagster._core.errors import DagsterRunNotFoundError
-from dagster._core.execution.backfill import PartitionBackfill
 from dagster._core.instance import DagsterInstance
 from dagster._core.storage.dagster_run import DagsterRunStatus, RunRecord, RunsFilter
 from dagster._core.storage.event_log.base import AssetRecord
@@ -400,52 +399,6 @@ def _fetch_runs_not_in_backfill(
     return runs[:limit]
 
 
-def _spicy_window_safe_sort(
-    runs: Sequence[RunRecord], backfills: Sequence[PartitionBackfill], limit: Optional[int]
-) -> Sequence[Union[RunRecord, PartitionBackfill]]:
-    """Merges a list of runs and backfills into a single list ordered by creation time, but maintains
-    the relative order of each list to account for spicy window shenanigans.
-
-    For example, if two runs are created at t1 and t2, but the spicy window stores the t2 run before the
-    t1 run, we need to ensure that the order of those runs relative to each other is maintianed.
-
-    DB ID   run     time
-    n       run2    t2
-    n+1     run1    t1
-
-    if we sorted just by creation time, run1 would come before run2. If run1 was the final run returned, the
-    user would pass run1 as the cursor for the next query, which means run2 would never get returned.
-
-    Instead we can do a special sort that ensures that run2 is always returned before run1 so that
-    all runs are returned to the user.
-    """
-    runs_idx = 0
-    backfills_idx = 0
-    sorted_runs = []
-
-    for _ in range(limit or len(runs) + len(backfills)):
-        if (
-            runs[runs_idx].create_timestamp.timestamp()
-            > backfills[backfills_idx].backfill_timestamp
-        ):
-            sorted_runs.append(runs[runs_idx])
-            runs_idx += 1
-        else:
-            sorted_runs.append(backfills[backfills_idx])
-            backfills_idx += 1
-
-        if limit and len(sorted_runs) == limit:
-            return sorted_runs
-        if runs_idx == len(runs):
-            sorted_runs.extend(backfills[backfills_idx:])
-            return sorted_runs[:limit]
-        if backfills_idx == len(backfills):
-            sorted_runs.extend(runs[runs_idx:])
-            return sorted_runs[:limit]
-
-    return sorted_runs
-
-
 def get_mega_runs(
     graphene_info: "ResolveInfo",
     cursor: Optional[str] = None,
@@ -471,14 +424,13 @@ def get_mega_runs(
 
     # order runs and backfills by create_time. typically we sort by storage id but that won't work here since
     # they are different tables
-    # all_mega_runs = sorted(
-    #     backfills + runs,
-    #     key=lambda x: x.create_timestamp.timestamp()
-    #     if isinstance(x, RunRecord)
-    #     else x.backfill_timestamp,
-    #     reverse=True,
-    # )
-    all_mega_runs = _spicy_window_safe_sort(runs=runs, backfills=backfills, limit=limit)
+    all_mega_runs = sorted(
+        backfills + runs,
+        key=lambda x: x.create_timestamp.timestamp()
+        if isinstance(x, RunRecord)
+        else x.backfill_timestamp,
+        reverse=True,
+    )
 
     if limit:
         all_mega_runs = all_mega_runs[:limit]
diff --git a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_mega_run.py b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_mega_run.py
new file mode 100644
index 0000000000000..12066569e856c
--- /dev/null
+++ b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_mega_run.py
@@ -0,0 +1,121 @@
+"""general test structure:
+seed some runs and backfills
+ensure that the get megaruns function returns the expected results
+
+things to test
+
+respects limit
+ordering is correct
+cursors are respected
+runs in a backfill are ignored
+no runs or backfills are created
+not enough runs/backfills for limit
+1 run and a bunch of backfills and vice versa.
+"""
+
+from dagster._core.execution.backfill import BulkActionStatus, PartitionBackfill
+from dagster._core.storage.dagster_run import DagsterRun
+from dagster._core.test_utils import create_run_for_test, freeze_time
+from dagster._core.utils import make_new_backfill_id
+from dagster._time import add_absolute_time, create_datetime
+from dagster_graphql.test.utils import execute_dagster_graphql
+
+from dagster_graphql_tests.graphql.graphql_context_test_suite import (
+    ExecutingGraphQLContextTestMatrix,
+)
+
+GET_MEGA_RUNS_QUERY = """
+query MegaRunsQuery($cursor: String, $limit: Int!) {
+  megaRunsOrError(cursor: $cursor, limit: $limit) {
+    __typename
+    ... on MegaRuns {
+      results {
+        runId
+        status
+        creationTime
+        startTime
+        endTime
+        jobName
+        assetSelection {
+          path
+        }
+        assetCheckSelection {
+          name
+        }
+        tags {
+          key
+          value
+        }
+      }
+    }
+    ... on PythonError {
+      stack
+      message
+    }
+  }
+}
+
+"""
+
+
+def _create_run(graphql_context) -> DagsterRun:
+    return create_run_for_test(
+        instance=graphql_context.instance,
+    )
+
+
+def _create_run_for_backfill(graphql_context, backfill_id: str) -> DagsterRun:
+    return create_run_for_test(
+        instance=graphql_context.instance,
+        tags={
+            **DagsterRun.tags_for_backfill_id(backfill_id),
+        },
+    )
+
+
+def _create_backfill(graphql_context, timestamp: float) -> str:
+    backfill = PartitionBackfill(
+        backfill_id=make_new_backfill_id(),
+        serialized_asset_backfill_data="foo",  # the content of the backfill doesn't matter for testing fetching mega runs
+        status=BulkActionStatus.COMPLETED,
+        reexecution_steps=None,
+        tags=None,
+        backfill_timestamp=timestamp,  # expected to be a whole number of seconds: the runs db stores timestamps as ints, so fractional values would make the ordering nondeterministic
+        from_failure=False,
+    )
+    graphql_context.instance.add_backfill(backfill)
+    return backfill.backfill_id
+
+
+class TestMegaRuns(ExecutingGraphQLContextTestMatrix):
+    def test_get_mega_runs(self, graphql_context):
+        # seed some runs and backfills in an alternating order
+        expected_order = []
+        start_datetime = create_datetime(year=2022, month=1, day=1, hour=1)
+        for _ in range(10):
+            with freeze_time(start_datetime):
+                run_id = _create_run(graphql_context).run_id
+                expected_order.append(run_id)
+                backfill_id = _create_backfill(
+                    graphql_context, timestamp=start_datetime.timestamp() + 2
+                )
+                expected_order.append(backfill_id)
+
+            start_datetime = add_absolute_time(start_datetime, seconds=10)
+
+        result = execute_dagster_graphql(
+            graphql_context,
+            GET_MEGA_RUNS_QUERY,
+            variables={
+                "limit": 20,
+                "cursor": None,
+            },
+        )
+
+        assert not result.errors
+        assert result.data
+
+        assert result.data["megaRunsOrError"]["__typename"] == "MegaRuns"
+        assert len(result.data["megaRunsOrError"]["results"]) == 20
+        for res in result.data["megaRunsOrError"]["results"]:
+            assert res["runId"] == expected_order.pop(-1)
diff --git a/python_modules/dagster/dagster/_core/storage/dagster_run.py b/python_modules/dagster/dagster/_core/storage/dagster_run.py
index b5e3505e937ba..b383692a2a03f 100644
--- a/python_modules/dagster/dagster/_core/storage/dagster_run.py
+++ b/python_modules/dagster/dagster/_core/storage/dagster_run.py
@@ -645,8 +645,8 @@ async def _batch_load(
 
         # this should be replaced with an async DB call
         records = instance.get_run_records(RunsFilter(run_ids=list(result_map.keys())))
 
-        for run_record in records:
-            result_map[run_record.dagster_run.run_id] = run_record
+        for record in records:
+            result_map[record.dagster_run.run_id] = record
 
         return result_map.values()