Skip to content

Commit

Permalink
start some tests
Browse files Browse the repository at this point in the history
  • Loading branch information
jamiedemaria committed Aug 6, 2024
1 parent dbd5e92 commit c6ac4ac
Show file tree
Hide file tree
Showing 2 changed files with 127 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
)
from dagster._core.definitions.selector import JobSubsetSelector
from dagster._core.errors import DagsterRunNotFoundError
from dagster._core.execution.backfill import PartitionBackfill
from dagster._core.instance import DagsterInstance
from dagster._core.storage.dagster_run import DagsterRunStatus, RunRecord, RunsFilter
from dagster._core.storage.event_log.base import AssetRecord
Expand Down Expand Up @@ -400,52 +399,6 @@ def _fetch_runs_not_in_backfill(
return runs[:limit]


def _spicy_window_safe_sort(
runs: Sequence[RunRecord], backfills: Sequence[PartitionBackfill], limit: Optional[int]
) -> Sequence[Union[RunRecord, PartitionBackfill]]:
"""Merges a list of runs and backfills into a single list ordered by creation time, but maintains
the relative order of each list to account for spicy window shenanigans.
For example, if two runs are created at t1 and t2, but the spicy window stores the t2 run before the
t1 run, we need to ensure that the order of those runs relative to each other is maintianed.
DB ID run time
n run2 t2
n+1 run1 t1
if we sorted just by creation time, run1 would come before run2. If run1 was the final run returned, the
user would pass run1 as the cursor for the next query, which means run2 would never get returned.
Instead we can do a special sort that ensures that run2 is always returned before run1 so that
all runs are returned to the user.
"""
runs_idx = 0
backfills_idx = 0
sorted_runs = []

for _ in range(limit or len(runs) + len(backfills)):
if (
runs[runs_idx].create_timestamp.timestamp()
> backfills[backfills_idx].backfill_timestamp
):
sorted_runs.append(runs[runs_idx])
runs_idx += 1
else:
sorted_runs.append(backfills[backfills_idx])
backfills_idx += 1

if limit and len(sorted_runs) == limit:
return sorted_runs
if runs_idx == len(runs):
sorted_runs.extend(backfills[backfills_idx:])
return sorted_runs[:limit]
if backfills_idx == len(backfills):
sorted_runs.extend(runs[runs_idx:])
return sorted_runs[:limit]

return sorted_runs


def get_mega_runs(
graphene_info: "ResolveInfo",
cursor: Optional[str] = None,
Expand All @@ -471,14 +424,13 @@ def get_mega_runs(

# order runs and backfills by create_time. typically we sort by storage id but that won't work here since
# they are different tables
# all_mega_runs = sorted(
# backfills + runs,
# key=lambda x: x.create_timestamp.timestamp()
# if isinstance(x, RunRecord)
# else x.backfill_timestamp,
# reverse=True,
# )
all_mega_runs = _spicy_window_safe_sort(runs=runs, backfills=backfills, limit=limit)
all_mega_runs = sorted(
backfills + runs,
key=lambda x: x.create_timestamp.timestamp()
if isinstance(x, RunRecord)
else x.backfill_timestamp,
reverse=True,
)
if limit:
all_mega_runs = all_mega_runs[:limit]

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
"""general test structure:
seed some runs and backfills
ensure that the get megaruns function returns the expected results
things to test
respects limit
ordering is correct
cursors are respected
runs in a backfill are ignored
no runs or backfills are created
not enough runs/backfills for limit
1 run and a bunch of backfills and vis versa.
"""

from dagster._core.execution.backfill import BulkActionStatus, PartitionBackfill
from dagster._core.storage.dagster_run import DagsterRun
from dagster._core.test_utils import create_run_for_test, freeze_time
from dagster._core.utils import make_new_backfill_id
from dagster._time import add_absolute_time, create_datetime
from dagster_graphql.test.utils import execute_dagster_graphql

from dagster_graphql_tests.graphql.graphql_context_test_suite import (
ExecutingGraphQLContextTestMatrix,
)

GET_MEGA_RUNS_QUERY = """
query MegaRunsQuery($cursor: String, $limit: Int!) {
megaRunsOrError(cursor: $cursor, limit: $limit) {
... on MegaRuns {
results {
runId
status
creationTime
startTime
endTime
jobName
assetSelection {
path
}
assetCheckSelection {
name
}
tags {
key
value
}
}
}
... on PythonError {
stack
message
}
}
}
"""


def _create_run(graphql_context) -> DagsterRun:
return create_run_for_test(
instance=graphql_context.instance,
)


def _create_run_for_backfill(graphql_context, backfill_id: str) -> DagsterRun:
return create_run_for_test(
instance=graphql_context.instance,
tags={
**DagsterRun.tags_for_backfill_id(backfill_id),
},
)


def _create_backfill(graphql_context, timestamp: float) -> str:
backfill = PartitionBackfill(
backfill_id=make_new_backfill_id(),
serialized_asset_backfill_data="foo", # the content of the backfill doesn't matter for testing fetching mega runs
status=BulkActionStatus.COMPLETED,
reexecution_steps=None,
tags=None,
backfill_timestamp=timestamp, # truncate to an integer to make the ordering deterministic since the runs db is in ints
from_failure=False,
)
graphql_context.instance.add_backfill(backfill)
return backfill.backfill_id


class TestMegaRuns(ExecutingGraphQLContextTestMatrix):
def test_get_mega_runs(self, graphql_context):
# seed some runs and backfills in an alternating order
expected_order = []
start_datetime = create_datetime(year=2022, month=1, day=1, hour=1)
for i in range(10):
with freeze_time(start_datetime):
run_id = _create_run(graphql_context).run_id
expected_order.append(run_id)
backfill_id = _create_backfill(
graphql_context, timestamp=start_datetime.timestamp() + 2
)
expected_order.append(backfill_id)

start_datetime = add_absolute_time(start_datetime, seconds=10)

result = execute_dagster_graphql(
graphql_context,
GET_MEGA_RUNS_QUERY,
variables={
"limit": 20,
"cursor": None,
},
)

assert not result.errors
assert result.data

assert result.data["megaRunsOrError"]["__typename"] == "LaunchBackfillSuccess"
assert len(result.data["megaRunsOrError"]["results"]) == 10
for res in result.data["megaRunsOrError"]["results"]:
assert res["runId"] == expected_order.pop(-1)

0 comments on commit c6ac4ac

Please sign in to comment.