Skip to content

Commit

Permalink
graphene model
Browse files Browse the repository at this point in the history
  • Loading branch information
jamiedemaria committed Aug 2, 2024
1 parent 6669f73 commit 0c1dd35
Show file tree
Hide file tree
Showing 9 changed files with 319 additions and 32 deletions.

Large diffs are not rendered by default.

27 changes: 27 additions & 0 deletions js_modules/dagster-ui/packages/ui-core/src/graphql/schema.graphql

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

92 changes: 92 additions & 0 deletions js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from dagster._core.instance import DagsterInstance
from dagster._core.storage.dagster_run import DagsterRunStatus, RunRecord, RunsFilter
from dagster._core.storage.event_log.base import AssetRecord
from dagster._core.storage.tags import TagType, get_tag_type
from dagster._core.storage.tags import BACKFILL_ID_TAG, TagType, get_tag_type

from .external import ensure_valid_config, get_external_job_or_raise

Expand All @@ -33,7 +33,7 @@
from ..schema.execution import GrapheneExecutionPlan
from ..schema.logs.events import GrapheneRunStepStats
from ..schema.pipelines.config import GraphenePipelineConfigValidationValid
from ..schema.pipelines.pipeline import GrapheneEventConnection, GrapheneRun
from ..schema.pipelines.pipeline import GrapheneEventConnection, GrapheneMegaRun, GrapheneRun
from ..schema.pipelines.pipeline_run_stats import GrapheneRunStatsSnapshot
from ..schema.runs import GrapheneRunGroup, GrapheneRunTagKeys, GrapheneRunTags
from ..schema.util import ResolveInfo
Expand Down Expand Up @@ -379,3 +379,54 @@ def get_logs_for_run(
cursor=conn.cursor,
hasMore=conn.has_more,
)


def _fetch_runs_not_in_backfill(
instance, cursor: Optional[str], limit: Optional[int]
) -> Sequence[RunRecord]:
if limit is None:
new_runs = instance.get_run_records(cursor=cursor)
return [run for run in new_runs if run.dagster_run.tags.get(BACKFILL_ID_TAG) is None]

runs = []
while len(runs) < limit:
new_runs = instance.get_run_records(limit=limit, cursor=cursor)
if len(new_runs) == 0:
return runs
cursor = new_runs[-1].dagster_run.run_id
runs.extend([run for run in new_runs if run.dagster_run.tags.get(BACKFILL_ID_TAG) is None])

return runs[:limit]


def get_mega_runs(
graphene_info: "ResolveInfo",
cursor: Optional[str] = None,
limit: Optional[int] = None,
) -> Sequence["GrapheneMegaRun"]:
from ..schema.pipelines.pipeline import GrapheneMegaRun

check.opt_str_param(cursor, "cursor")
check.opt_int_param(limit, "limit")

instance = graphene_info.context.instance

run_cursor, backfill_cursor = cursor.split(";") if cursor else (None, None)

# get_backfills doesn't support many filters. Need to implement some filtering before we can
# add that as a param
backfills = instance.get_backfills(cursor=backfill_cursor, limit=limit)
runs = _fetch_runs_not_in_backfill(instance, cursor=run_cursor, limit=limit)

# order runs and backfills by create_time. typically we sort by storage id but that won't work here since
# they are different tables
all_mega_runs = sorted(
backfills + runs,
key=lambda x: x.create_timestamp.timestamp()
if isinstance(x, RunRecord)
else x.backfill_timestamp,
)
if limit:
all_mega_runs = all_mega_runs[:limit]

return [GrapheneMegaRun(record) for record in all_mega_runs]
14 changes: 14 additions & 0 deletions python_modules/dagster-graphql/dagster_graphql/schema/backfill.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
)
from dagster._core.workspace.permissions import Permissions

from dagster_graphql.schema.pipelines.status import GrapheneRunStatus

from ..implementation.fetch_partition_sets import (
partition_status_counts_from_run_partition_data,
partition_statuses_from_run_partition_data,
Expand Down Expand Up @@ -120,6 +122,18 @@ class GrapheneBulkActionStatus(graphene.Enum):
class Meta:
name = "BulkActionStatus"

def to_dagster_run_status(self) -> GrapheneRunStatus:
if self == GrapheneBulkActionStatus.REQUESTED:
return GrapheneRunStatus.STARTED
if self == GrapheneBulkActionStatus.COMPLETED:
return GrapheneRunStatus.SUCCESS
if self == GrapheneBulkActionStatus.FAILED:
return GrapheneRunStatus.FAILURE
if self == GrapheneBulkActionStatus.CANCELED:
return GrapheneRunStatus.CANCELED
if self == GrapheneBulkActionStatus.CANCELING:
return GrapheneRunStatus.CANCELING


class GrapheneAssetBackfillTargetPartitions(graphene.ObjectType):
class Meta:
Expand Down
Loading

0 comments on commit 0c1dd35

Please sign in to comment.