From 17b51e3922311285edbedc3787625c2f4864ad7e Mon Sep 17 00:00:00 2001 From: Daniel Gibson Date: Mon, 29 Jul 2024 17:08:29 -0700 Subject: [PATCH 1/2] Add ASSET_MATERIALIZATION_FAILURE event test fixes --- .../types/AssetRunLogObserver.types.ts | 1 + .../ui-core/src/graphql/schema.graphql | 14 +++++ .../packages/ui-core/src/graphql/types.ts | 46 +++++++++++++++ .../src/runs/LogsRowStructuredContent.tsx | 1 + .../packages/ui-core/src/runs/filterLogs.tsx | 5 +- .../src/runs/types/LogsProvider.types.ts | 27 +++++++++ .../ui-core/src/runs/types/LogsRow.types.ts | 19 +++++++ ...LogsScrollingTableMessageFragment.types.ts | 10 ++++ .../src/runs/types/RunFragments.types.ts | 10 ++++ .../runs/types/RunMetadataProvider.types.ts | 8 +++ .../dagster_graphql/implementation/events.py | 3 + .../implementation/fetch_runs.py | 2 +- .../dagster_graphql/schema/logs/events.py | 15 +++++ .../dagster/dagster/_core/event_api.py | 1 + .../dagster/dagster/_core/events/__init__.py | 57 +++++++++++++++++++ .../dagster/_core/instance/__init__.py | 3 +- .../dagster/_core/storage/event_log/base.py | 14 ++++- .../_core/storage/event_log/sql_event_log.py | 4 ++ .../_core/storage/partition_status_cache.py | 2 +- .../storage_tests/utils/event_log_storage.py | 53 ++++++++++++++++- 20 files changed, 287 insertions(+), 8 deletions(-) diff --git a/js_modules/dagster-ui/packages/ui-core/src/asset-graph/types/AssetRunLogObserver.types.ts b/js_modules/dagster-ui/packages/ui-core/src/asset-graph/types/AssetRunLogObserver.types.ts index 0cfb4d236de78..1c3feb62a7e04 100644 --- a/js_modules/dagster-ui/packages/ui-core/src/asset-graph/types/AssetRunLogObserver.types.ts +++ b/js_modules/dagster-ui/packages/ui-core/src/asset-graph/types/AssetRunLogObserver.types.ts @@ -24,6 +24,7 @@ export type AssetLiveRunLogsSubscription = { }; } | {__typename: 'AssetCheckEvaluationPlannedEvent'} + | {__typename: 'AssetMaterializationFailureEvent'} | { __typename: 'AssetMaterializationPlannedEvent'; assetKey: {__typename: 'AssetKey'; path: Array} | null; diff --git a/js_modules/dagster-ui/packages/ui-core/src/graphql/schema.graphql b/js_modules/dagster-ui/packages/ui-core/src/graphql/schema.graphql index aa7daa1095141..36d9e412e1a05 100644 --- a/js_modules/dagster-ui/packages/ui-core/src/graphql/schema.graphql +++ b/js_modules/dagster-ui/packages/ui-core/src/graphql/schema.graphql @@ -49,6 +49,7 @@ enum DagsterEventType { STEP_EXPECTATION_RESULT ASSET_CHECK_EVALUATION_PLANNED ASSET_CHECK_EVALUATION + ASSET_MATERIALIZATION_FAILURE RUN_ENQUEUED RUN_DEQUEUED RUN_STARTING @@ -1196,6 +1197,7 @@ union DagsterRunEvent = | AlertSuccessEvent | AlertFailureEvent | AssetMaterializationPlannedEvent + | AssetMaterializationFailureEvent | AssetCheckEvaluationPlannedEvent | AssetCheckEvaluationEvent @@ -1249,6 +1251,18 @@ type AlertFailureEvent implements MessageEvent & RunEvent { pipelineName: String! } +type AssetMaterializationFailureEvent implements MessageEvent & RunEvent { + runId: String! + message: String! + timestamp: String! + level: LogLevel! + stepKey: String + solidHandleID: String + eventType: DagsterEventType + pipelineName: String! + assetKey: AssetKey +} + type AssetCheckEvaluationPlannedEvent implements MessageEvent & StepEvent { runId: String! message: String! diff --git a/js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts b/js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts index 21e30296eeef2..f207223d844a1 100644 --- a/js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts +++ b/js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts @@ -392,6 +392,20 @@ export type AssetLineageInfo = { partitions: Array; }; +export type AssetMaterializationFailureEvent = MessageEvent & + RunEvent & { + __typename: 'AssetMaterializationFailureEvent'; + assetKey: Maybe; + eventType: Maybe; + level: LogLevel; + message: Scalars['String']['output']; + pipelineName: Scalars['String']['output']; + runId: Scalars['String']['output']; + solidHandleID: Maybe; + stepKey: Maybe; + timestamp: Scalars['String']['output']; + }; + export type AssetMaterializationPlannedEvent = MessageEvent & RunEvent & { __typename: 'AssetMaterializationPlannedEvent'; @@ -927,6 +941,7 @@ export enum DagsterEventType { ASSET_CHECK_EVALUATION = 'ASSET_CHECK_EVALUATION', ASSET_CHECK_EVALUATION_PLANNED = 'ASSET_CHECK_EVALUATION_PLANNED', ASSET_MATERIALIZATION = 'ASSET_MATERIALIZATION', + ASSET_MATERIALIZATION_FAILURE = 'ASSET_MATERIALIZATION_FAILURE', ASSET_MATERIALIZATION_PLANNED = 'ASSET_MATERIALIZATION_PLANNED', ASSET_OBSERVATION = 'ASSET_OBSERVATION', ASSET_STORE_OPERATION = 'ASSET_STORE_OPERATION', @@ -982,6 +997,7 @@ export type DagsterRunEvent = | AlertSuccessEvent | AssetCheckEvaluationEvent | AssetCheckEvaluationPlannedEvent + | AssetMaterializationFailureEvent | AssetMaterializationPlannedEvent | EngineEvent | ExecutionStepFailureEvent @@ -6303,6 +6319,36 @@ export const buildAssetLineageInfo = ( }; }; +export const buildAssetMaterializationFailureEvent = ( + overrides?: Partial, + _relationshipsToOmit: Set = new Set(), +): {__typename: 'AssetMaterializationFailureEvent'} & AssetMaterializationFailureEvent => { + const relationshipsToOmit: Set = new Set(_relationshipsToOmit); + relationshipsToOmit.add('AssetMaterializationFailureEvent'); + return { + __typename: 'AssetMaterializationFailureEvent', + assetKey: + overrides && overrides.hasOwnProperty('assetKey') + ? overrides.assetKey! + : relationshipsToOmit.has('AssetKey') + ? ({} as AssetKey) + : buildAssetKey({}, relationshipsToOmit), + eventType: + overrides && overrides.hasOwnProperty('eventType') + ? overrides.eventType! + : DagsterEventType.ALERT_FAILURE, + level: overrides && overrides.hasOwnProperty('level') ? overrides.level! : LogLevel.CRITICAL, + message: overrides && overrides.hasOwnProperty('message') ? overrides.message! : 'quia', + pipelineName: + overrides && overrides.hasOwnProperty('pipelineName') ? overrides.pipelineName! : 'et', + runId: overrides && overrides.hasOwnProperty('runId') ? overrides.runId! : 'et', + solidHandleID: + overrides && overrides.hasOwnProperty('solidHandleID') ? overrides.solidHandleID! : 'sequi', + stepKey: overrides && overrides.hasOwnProperty('stepKey') ? overrides.stepKey! : 'natus', + timestamp: overrides && overrides.hasOwnProperty('timestamp') ? overrides.timestamp! : 'rerum', + }; +}; + export const buildAssetMaterializationPlannedEvent = ( overrides?: Partial, _relationshipsToOmit: Set = new Set(), diff --git a/js_modules/dagster-ui/packages/ui-core/src/runs/LogsRowStructuredContent.tsx b/js_modules/dagster-ui/packages/ui-core/src/runs/LogsRowStructuredContent.tsx index 0bd03c08bdf52..b3c558ab2f851 100644 --- a/js_modules/dagster-ui/packages/ui-core/src/runs/LogsRowStructuredContent.tsx +++ b/js_modules/dagster-ui/packages/ui-core/src/runs/LogsRowStructuredContent.tsx @@ -152,6 +152,7 @@ export const LogsRowStructuredContent = ({node, metadata}: IStructuredContentPro /> ); case 'AssetMaterializationPlannedEvent': + case 'AssetMaterializationFailureEvent': return ; case 'ObjectStoreOperationEvent': return ( diff --git a/js_modules/dagster-ui/packages/ui-core/src/runs/filterLogs.tsx b/js_modules/dagster-ui/packages/ui-core/src/runs/filterLogs.tsx index 368fadca1b641..98fce4eb602a2 100644 --- a/js_modules/dagster-ui/packages/ui-core/src/runs/filterLogs.tsx +++ b/js_modules/dagster-ui/packages/ui-core/src/runs/filterLogs.tsx @@ -6,11 +6,12 @@ import {weakmapMemoize} from '../app/Util'; export function filterLogs(logs: LogsProviderLogs, filter: LogFilter, filterStepKeys: string[]) { const filteredNodes = logs.allNodes.filter((node) => { - // These events are used to determine which assets a run will materialize and are not intended + // These events are used for internal bookkeeping and are not intended // to be displayed in the Dagster UI. Pagination is offset based, so we remove these logs client-side. if ( node.__typename === 'AssetMaterializationPlannedEvent' || - node.__typename === 'AssetCheckEvaluationPlannedEvent' + node.__typename === 'AssetCheckEvaluationPlannedEvent' || + node.__typename === 'AssetMaterializationFailureEvent' ) { return false; } diff --git a/js_modules/dagster-ui/packages/ui-core/src/runs/types/LogsProvider.types.ts b/js_modules/dagster-ui/packages/ui-core/src/runs/types/LogsProvider.types.ts index ddeb35b447620..243fc49f59e48 100644 --- a/js_modules/dagster-ui/packages/ui-core/src/runs/types/LogsProvider.types.ts +++ b/js_modules/dagster-ui/packages/ui-core/src/runs/types/LogsProvider.types.ts @@ -244,6 +244,15 @@ export type PipelineRunLogsSubscription = { stepKey: string | null; eventType: Types.DagsterEventType | null; } + | { + __typename: 'AssetMaterializationFailureEvent'; + runId: string; + message: string; + timestamp: string; + level: Types.LogLevel; + stepKey: string | null; + eventType: Types.DagsterEventType | null; + } | { __typename: 'AssetMaterializationPlannedEvent'; runId: string; @@ -3581,6 +3590,15 @@ export type RunLogsSubscriptionSuccessFragment = { stepKey: string | null; eventType: Types.DagsterEventType | null; } + | { + __typename: 'AssetMaterializationFailureEvent'; + runId: string; + message: string; + timestamp: string; + level: Types.LogLevel; + stepKey: string | null; + eventType: Types.DagsterEventType | null; + } | { __typename: 'AssetMaterializationPlannedEvent'; runId: string; @@ -6869,6 +6887,15 @@ export type RunLogsQuery = { stepKey: string | null; eventType: Types.DagsterEventType | null; } + | { + __typename: 'AssetMaterializationFailureEvent'; + runId: string; + message: string; + timestamp: string; + level: Types.LogLevel; + stepKey: string | null; + eventType: Types.DagsterEventType | null; + } | { __typename: 'AssetMaterializationPlannedEvent'; runId: string; diff --git a/js_modules/dagster-ui/packages/ui-core/src/runs/types/LogsRow.types.ts b/js_modules/dagster-ui/packages/ui-core/src/runs/types/LogsRow.types.ts index d1026405118db..69814c91aae55 100644 --- a/js_modules/dagster-ui/packages/ui-core/src/runs/types/LogsRow.types.ts +++ b/js_modules/dagster-ui/packages/ui-core/src/runs/types/LogsRow.types.ts @@ -208,6 +208,15 @@ export type LogsRowStructuredFragment_AssetCheckEvaluationPlannedEvent = { stepKey: string | null; }; +export type LogsRowStructuredFragment_AssetMaterializationFailureEvent = { + __typename: 'AssetMaterializationFailureEvent'; + message: string; + eventType: Types.DagsterEventType | null; + timestamp: string; + level: Types.LogLevel; + stepKey: string | null; +}; + export type LogsRowStructuredFragment_AssetMaterializationPlannedEvent = { __typename: 'AssetMaterializationPlannedEvent'; message: string; @@ -3010,6 +3019,7 @@ export type LogsRowStructuredFragment = | LogsRowStructuredFragment_AlertSuccessEvent | LogsRowStructuredFragment_AssetCheckEvaluationEvent | LogsRowStructuredFragment_AssetCheckEvaluationPlannedEvent + | LogsRowStructuredFragment_AssetMaterializationFailureEvent | LogsRowStructuredFragment_AssetMaterializationPlannedEvent | LogsRowStructuredFragment_EngineEvent | LogsRowStructuredFragment_ExecutionStepFailureEvent @@ -3085,6 +3095,14 @@ export type LogsRowUnstructuredFragment_AssetCheckEvaluationPlannedEvent = { stepKey: string | null; }; +export type LogsRowUnstructuredFragment_AssetMaterializationFailureEvent = { + __typename: 'AssetMaterializationFailureEvent'; + message: string; + timestamp: string; + level: Types.LogLevel; + stepKey: string | null; +}; + export type LogsRowUnstructuredFragment_AssetMaterializationPlannedEvent = { __typename: 'AssetMaterializationPlannedEvent'; message: string; @@ -3363,6 +3381,7 @@ export type LogsRowUnstructuredFragment = | LogsRowUnstructuredFragment_AlertSuccessEvent | LogsRowUnstructuredFragment_AssetCheckEvaluationEvent | LogsRowUnstructuredFragment_AssetCheckEvaluationPlannedEvent + | LogsRowUnstructuredFragment_AssetMaterializationFailureEvent | LogsRowUnstructuredFragment_AssetMaterializationPlannedEvent | LogsRowUnstructuredFragment_EngineEvent | LogsRowUnstructuredFragment_ExecutionStepFailureEvent diff --git a/js_modules/dagster-ui/packages/ui-core/src/runs/types/LogsScrollingTableMessageFragment.types.ts b/js_modules/dagster-ui/packages/ui-core/src/runs/types/LogsScrollingTableMessageFragment.types.ts index 67474ff2cd23e..c431e0ac69c7e 100644 --- a/js_modules/dagster-ui/packages/ui-core/src/runs/types/LogsScrollingTableMessageFragment.types.ts +++ b/js_modules/dagster-ui/packages/ui-core/src/runs/types/LogsScrollingTableMessageFragment.types.ts @@ -208,6 +208,15 @@ export type LogsScrollingTableMessageFragment_AssetCheckEvaluationPlannedEvent = stepKey: string | null; }; +export type LogsScrollingTableMessageFragment_AssetMaterializationFailureEvent = { + __typename: 'AssetMaterializationFailureEvent'; + message: string; + eventType: Types.DagsterEventType | null; + timestamp: string; + level: Types.LogLevel; + stepKey: string | null; +}; + export type LogsScrollingTableMessageFragment_AssetMaterializationPlannedEvent = { __typename: 'AssetMaterializationPlannedEvent'; message: string; @@ -3010,6 +3019,7 @@ export type LogsScrollingTableMessageFragment = | LogsScrollingTableMessageFragment_AlertSuccessEvent | LogsScrollingTableMessageFragment_AssetCheckEvaluationEvent | LogsScrollingTableMessageFragment_AssetCheckEvaluationPlannedEvent + | LogsScrollingTableMessageFragment_AssetMaterializationFailureEvent | LogsScrollingTableMessageFragment_AssetMaterializationPlannedEvent | LogsScrollingTableMessageFragment_EngineEvent | LogsScrollingTableMessageFragment_ExecutionStepFailureEvent diff --git a/js_modules/dagster-ui/packages/ui-core/src/runs/types/RunFragments.types.ts b/js_modules/dagster-ui/packages/ui-core/src/runs/types/RunFragments.types.ts index 4e9d85715451c..1cf0eff291f1c 100644 --- a/js_modules/dagster-ui/packages/ui-core/src/runs/types/RunFragments.types.ts +++ b/js_modules/dagster-ui/packages/ui-core/src/runs/types/RunFragments.types.ts @@ -265,6 +265,15 @@ export type RunDagsterRunEventFragment_AssetCheckEvaluationPlannedEvent = { eventType: Types.DagsterEventType | null; }; +export type RunDagsterRunEventFragment_AssetMaterializationFailureEvent = { + __typename: 'AssetMaterializationFailureEvent'; + message: string; + timestamp: string; + level: Types.LogLevel; + stepKey: string | null; + eventType: Types.DagsterEventType | null; +}; + export type RunDagsterRunEventFragment_AssetMaterializationPlannedEvent = { __typename: 'AssetMaterializationPlannedEvent'; message: string; @@ -3068,6 +3077,7 @@ export type RunDagsterRunEventFragment = | RunDagsterRunEventFragment_AlertSuccessEvent | RunDagsterRunEventFragment_AssetCheckEvaluationEvent | RunDagsterRunEventFragment_AssetCheckEvaluationPlannedEvent + | RunDagsterRunEventFragment_AssetMaterializationFailureEvent | RunDagsterRunEventFragment_AssetMaterializationPlannedEvent | RunDagsterRunEventFragment_EngineEvent | RunDagsterRunEventFragment_ExecutionStepFailureEvent diff --git a/js_modules/dagster-ui/packages/ui-core/src/runs/types/RunMetadataProvider.types.ts b/js_modules/dagster-ui/packages/ui-core/src/runs/types/RunMetadataProvider.types.ts index cfa1418a0d4d3..d69e8151ba728 100644 --- a/js_modules/dagster-ui/packages/ui-core/src/runs/types/RunMetadataProvider.types.ts +++ b/js_modules/dagster-ui/packages/ui-core/src/runs/types/RunMetadataProvider.types.ts @@ -37,6 +37,13 @@ export type RunMetadataProviderMessageFragment_AssetCheckEvaluationPlannedEvent stepKey: string | null; }; +export type RunMetadataProviderMessageFragment_AssetMaterializationFailureEvent = { + __typename: 'AssetMaterializationFailureEvent'; + message: string; + timestamp: string; + stepKey: string | null; +}; + export type RunMetadataProviderMessageFragment_AssetMaterializationPlannedEvent = { __typename: 'AssetMaterializationPlannedEvent'; message: string; @@ -452,6 +459,7 @@ export type RunMetadataProviderMessageFragment = | RunMetadataProviderMessageFragment_AlertSuccessEvent | RunMetadataProviderMessageFragment_AssetCheckEvaluationEvent | RunMetadataProviderMessageFragment_AssetCheckEvaluationPlannedEvent + | RunMetadataProviderMessageFragment_AssetMaterializationFailureEvent | RunMetadataProviderMessageFragment_AssetMaterializationPlannedEvent | RunMetadataProviderMessageFragment_EngineEvent | RunMetadataProviderMessageFragment_ExecutionStepFailureEvent diff --git a/python_modules/dagster-graphql/dagster_graphql/implementation/events.py b/python_modules/dagster-graphql/dagster_graphql/implementation/events.py index 077d4f8e2141c..10fff39b56474 100644 --- a/python_modules/dagster-graphql/dagster_graphql/implementation/events.py +++ b/python_modules/dagster-graphql/dagster_graphql/implementation/events.py @@ -222,6 +222,7 @@ def from_dagster_event_record(event_record: EventLogEntry, pipeline_name: str) - GrapheneAlertSuccessEvent, GrapheneAssetCheckEvaluationEvent, GrapheneAssetCheckEvaluationPlannedEvent, + GrapheneAssetMaterializationFailureEvent, GrapheneAssetMaterializationPlannedEvent, GrapheneEngineEvent, GrapheneExecutionStepFailureEvent, @@ -303,6 +304,8 @@ def from_dagster_event_record(event_record: EventLogEntry, pipeline_name: str) - ) elif dagster_event.event_type == DagsterEventType.ASSET_MATERIALIZATION_PLANNED: return GrapheneAssetMaterializationPlannedEvent(event=event_record) + elif dagster_event.event_type == DagsterEventType.ASSET_MATERIALIZATION_FAILURE: + return GrapheneAssetMaterializationFailureEvent(event=event_record) elif dagster_event.event_type == DagsterEventType.STEP_EXPECTATION_RESULT: data = cast(StepExpectationResultData, dagster_event.event_specific_data) return GrapheneStepExpectationResultEvent( diff --git a/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_runs.py b/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_runs.py index 3e553dc5c617d..d0fd05bfc9a44 100644 --- a/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_runs.py +++ b/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_runs.py @@ -166,7 +166,7 @@ def get_run_ids( def _get_latest_planned_run_id(instance: DagsterInstance, asset_record: AssetRecord): - if instance.event_log_storage.asset_records_have_last_planned_materialization_storage_id: + if instance.event_log_storage.asset_records_have_planned_and_failed_materializations: return asset_record.asset_entry.last_planned_materialization_run_id else: planned_info = instance.get_latest_planned_materialization_info( diff --git a/python_modules/dagster-graphql/dagster_graphql/schema/logs/events.py b/python_modules/dagster-graphql/dagster_graphql/schema/logs/events.py index 497719226efdb..ad26ed9a1cd9d 100644 --- a/python_modules/dagster-graphql/dagster_graphql/schema/logs/events.py +++ b/python_modules/dagster-graphql/dagster_graphql/schema/logs/events.py @@ -405,6 +405,20 @@ def __init__(self, event): ) +class GrapheneAssetMaterializationFailureEvent(graphene.ObjectType): + assetKey = graphene.Field(GrapheneAssetKey) + + class Meta: + name = "AssetMaterializationFailureEvent" + interfaces = (GrapheneMessageEvent, GrapheneRunEvent) + + def __init__(self, event: EventLogEntry): + self._event = event + + def resolve_assetKey(self, _graphene_info: ResolveInfo): + return self._event.get_dagster_event().asset_materialization_failure_data.asset_key + + class GrapheneAssetMaterializationPlannedEvent(graphene.ObjectType): assetKey = graphene.Field(GrapheneAssetKey) runOrError = graphene.NonNull("dagster_graphql.schema.pipelines.pipeline.GrapheneRunOrError") @@ -586,6 +600,7 @@ class Meta: GrapheneAlertSuccessEvent, GrapheneAlertFailureEvent, GrapheneAssetMaterializationPlannedEvent, + GrapheneAssetMaterializationFailureEvent, GrapheneAssetCheckEvaluationPlannedEvent, GrapheneAssetCheckEvaluationEvent, ) diff --git a/python_modules/dagster/dagster/_core/event_api.py b/python_modules/dagster/dagster/_core/event_api.py index c964603a5f230..bf88549a254cf 100644 --- a/python_modules/dagster/dagster/_core/event_api.py +++ b/python_modules/dagster/dagster/_core/event_api.py @@ -85,6 +85,7 @@ class RunShardedEventsCursor(NamedTuple): DagsterEventType.ASSET_MATERIALIZATION, DagsterEventType.ASSET_OBSERVATION, DagsterEventType.ASSET_MATERIALIZATION_PLANNED, + DagsterEventType.ASSET_MATERIALIZATION_FAILURE, ] EventCursor: TypeAlias = Union[int, RunShardedEventsCursor] diff --git a/python_modules/dagster/dagster/_core/events/__init__.py b/python_modules/dagster/dagster/_core/events/__init__.py index c632e2bce12ab..6f42078b5851b 100644 --- a/python_modules/dagster/dagster/_core/events/__init__.py +++ b/python_modules/dagster/dagster/_core/events/__init__.py @@ -81,6 +81,7 @@ "AssetMaterializationPlannedData", "AssetCheckEvaluation", "AssetCheckEvaluationPlanned", + "AssetMaterializationFailureData", ] @@ -115,6 +116,8 @@ class DagsterEventType(str, Enum): ASSET_CHECK_EVALUATION_PLANNED = "ASSET_CHECK_EVALUATION_PLANNED" ASSET_CHECK_EVALUATION = "ASSET_CHECK_EVALUATION" + ASSET_MATERIALIZATION_FAILURE = "ASSET_MATERIALIZATION_FAILURE" + # We want to display RUN_* events in the Dagster UI and in our LogManager output, but in order to # support backcompat for our storage layer, we need to keep the persisted value to be strings # of the form "PIPELINE_*". We may have user code that pass in the DagsterEventType @@ -241,12 +244,14 @@ class DagsterEventType(str, Enum): BATCH_WRITABLE_EVENTS = { DagsterEventType.ASSET_MATERIALIZATION, DagsterEventType.ASSET_OBSERVATION, + DagsterEventType.ASSET_MATERIALIZATION_FAILURE, } ASSET_EVENTS = { DagsterEventType.ASSET_MATERIALIZATION, DagsterEventType.ASSET_OBSERVATION, DagsterEventType.ASSET_MATERIALIZATION_PLANNED, + DagsterEventType.ASSET_MATERIALIZATION_FAILURE, } ASSET_CHECK_EVENTS = { @@ -717,6 +722,8 @@ def asset_key(self) -> Optional[AssetKey]: return self.asset_observation_data.asset_observation.asset_key elif self.event_type == DagsterEventType.ASSET_MATERIALIZATION_PLANNED: return self.asset_materialization_planned_data.asset_key + elif self.event_type == DagsterEventType.ASSET_MATERIALIZATION_FAILURE: + return self.asset_materialization_failure_data.asset_key else: return None @@ -733,6 +740,8 @@ def partition(self) -> Optional[str]: return self.asset_observation_data.asset_observation.partition elif self.event_type == DagsterEventType.ASSET_MATERIALIZATION_PLANNED: return self.asset_materialization_planned_data.partition + elif self.event_type == DagsterEventType.ASSET_MATERIALIZATION_FAILURE: + return self.asset_materialization_failure_data.partition else: return None @@ -788,6 +797,15 @@ def asset_materialization_planned_data(self) -> "AssetMaterializationPlannedData ) return cast(AssetMaterializationPlannedData, self.event_specific_data) + @property + def asset_materialization_failure_data(self) -> "AssetMaterializationFailureData": + _assert_type( + "asset_materialization_failure", + DagsterEventType.ASSET_MATERIALIZATION_FAILURE, + self.event_type, + ) + return cast(AssetMaterializationFailureData, self.event_specific_data) + @property def asset_check_planned_data(self) -> "AssetCheckEvaluationPlanned": _assert_type( @@ -967,6 +985,20 @@ def step_start_event(step_context: IStepContext) -> "DagsterEvent": message=f'Started execution of step "{step_context.step.key}".', ) + @staticmethod + def build_asset_materialization_failure_event( + job_name: str, + step_key: str, + asset_materialization_failure_data: "AssetMaterializationFailureData", + ) -> "DagsterEvent": + return DagsterEvent( + event_type_value=DagsterEventType.ASSET_MATERIALIZATION_FAILURE.value, + job_name=job_name, + message="", + event_specific_data=asset_materialization_failure_data, + step_key=step_key, + ) + @staticmethod def step_restarted_event(step_context: IStepContext, previous_attempts: int) -> "DagsterEvent": return DagsterEvent.from_step( @@ -1546,6 +1578,31 @@ def __new__(cls, asset_observation: AssetObservation): ) +@whitelist_for_serdes +class AssetMaterializationFailureData( + NamedTuple( + "_AssetMaterializationFailureData", + [ + ("asset_key", AssetKey), + ("partition", Optional[str]), + ("error", Optional[SerializableErrorInfo]), + ], + ) +): + def __new__( + cls, + asset_key: AssetKey, + partition: Optional[str], + error: Optional[SerializableErrorInfo] = None, + ): + return super(AssetMaterializationFailureData, cls).__new__( + cls, + asset_key=check.inst_param(asset_key, "asset_key", AssetKey), + partition=check.opt_str_param(partition, "partition"), + error=check.opt_inst_param(error, "error", SerializableErrorInfo), + ) + + @whitelist_for_serdes class StepMaterializationData( NamedTuple( diff --git a/python_modules/dagster/dagster/_core/instance/__init__.py b/python_modules/dagster/dagster/_core/instance/__init__.py index ab9a797043fdf..1ea822e1cdc0b 100644 --- a/python_modules/dagster/dagster/_core/instance/__init__.py +++ b/python_modules/dagster/dagster/_core/instance/__init__.py @@ -2493,6 +2493,7 @@ def report_dagster_event( dagster_event: "DagsterEvent", run_id: str, log_level: Union[str, int] = logging.INFO, + batch_metadata: Optional["DagsterEventBatchMetadata"] = None, ) -> None: """Takes a DagsterEvent and stores it in persistent storage for the corresponding DagsterRun.""" from dagster._core.events.log import EventLogEntry @@ -2507,7 +2508,7 @@ def report_dagster_event( step_key=dagster_event.step_key, dagster_event=dagster_event, ) - self.handle_new_event(event_record) + self.handle_new_event(event_record, batch_metadata=batch_metadata) def report_run_canceling(self, run: DagsterRun, message: Optional[str] = None): from dagster._core.events import DagsterEvent, DagsterEventType diff --git a/python_modules/dagster/dagster/_core/storage/event_log/base.py b/python_modules/dagster/dagster/_core/storage/event_log/base.py index 0e7694f6d42a4..d7daa33845111 100644 --- a/python_modules/dagster/dagster/_core/storage/event_log/base.py +++ b/python_modules/dagster/dagster/_core/storage/event_log/base.py @@ -60,11 +60,13 @@ class AssetEntry( ("last_run_id", Optional[str]), ("asset_details", Optional[AssetDetails]), ("cached_status", Optional["AssetStatusCacheValue"]), - # This is an optional field which can be used for more performant last observation + # The below are optional fields which can be used for more performant # queries if the underlying storage supports it ("last_observation_record", Optional[EventLogRecord]), ("last_planned_materialization_storage_id", Optional[int]), ("last_planned_materialization_run_id", Optional[str]), + ("last_materialization_failure_storage_id", Optional[int]), + ("last_materialization_failure_run_id", Optional[str]), ], ) ): @@ -78,6 +80,8 @@ def __new__( last_observation_record: Optional[EventLogRecord] = None, last_planned_materialization_storage_id: Optional[int] = None, last_planned_materialization_run_id: Optional[str] = None, + last_materialization_failure_storage_id: Optional[int] = None, + last_materialization_failure_run_id: Optional[str] = None, ): from dagster._core.storage.partition_status_cache import AssetStatusCacheValue @@ -103,6 +107,12 @@ def __new__( last_planned_materialization_run_id, "last_planned_materialization_run_id", ), + last_materialization_failure_storage_id=check.opt_int_param( + last_materialization_failure_storage_id, "last_materialization_failure_storage_id" + ), + last_materialization_failure_run_id=check.opt_str_param( + last_materialization_failure_run_id, "last_materialization_failure_run_id" + ), ) @property @@ -331,7 +341,7 @@ def get_asset_check_summary_records( pass @property - def asset_records_have_last_planned_materialization_storage_id(self) -> bool: + def asset_records_have_planned_and_failed_materializations(self) -> bool: return False @abstractmethod diff --git a/python_modules/dagster/dagster/_core/storage/event_log/sql_event_log.py b/python_modules/dagster/dagster/_core/storage/event_log/sql_event_log.py index ade3fbeeab4c9..7cf46dcf2e971 100644 --- a/python_modules/dagster/dagster/_core/storage/event_log/sql_event_log.py +++ b/python_modules/dagster/dagster/_core/storage/event_log/sql_event_log.py @@ -248,6 +248,10 @@ def store_asset_event(self, event: EventLogEntry, event_id: int): # https://github.com/dagster-io/dagster/issues/3945 values = self._get_asset_entry_values(event, event_id, self.has_asset_key_index_cols()) + + if not values: + return + insert_statement = AssetKeyTable.insert().values( asset_key=event.dagster_event.asset_key.to_string(), **values ) diff --git a/python_modules/dagster/dagster/_core/storage/partition_status_cache.py b/python_modules/dagster/dagster/_core/storage/partition_status_cache.py index 1e924b23a1b84..d14e109bc44f8 100644 --- a/python_modules/dagster/dagster/_core/storage/partition_status_cache.py +++ b/python_modules/dagster/dagster/_core/storage/partition_status_cache.py @@ -226,7 +226,7 @@ def get_validated_partition_keys( def get_last_planned_storage_id( instance: DagsterInstance, asset_key: AssetKey, asset_record: Optional["AssetRecord"] ) -> int: - if instance.event_log_storage.asset_records_have_last_planned_materialization_storage_id: + if instance.event_log_storage.asset_records_have_planned_and_failed_materializations: return ( (asset_record.asset_entry.last_planned_materialization_storage_id or 0) if asset_record diff --git a/python_modules/dagster/dagster_tests/storage_tests/utils/event_log_storage.py b/python_modules/dagster/dagster_tests/storage_tests/utils/event_log_storage.py index 760a7ccf2c247..56cb84a07228a 100644 --- a/python_modules/dagster/dagster_tests/storage_tests/utils/event_log_storage.py +++ b/python_modules/dagster/dagster_tests/storage_tests/utils/event_log_storage.py @@ -5,6 +5,7 @@ import string import sys import time +from collections import defaultdict from concurrent.futures import ThreadPoolExecutor from contextlib import ExitStack, contextmanager from typing import List, Optional, Sequence, Tuple, cast @@ -67,13 +68,16 @@ from dagster._core.event_api import EventLogCursor, EventRecordsResult, RunStatusChangeRecordsFilter from dagster._core.events import ( EVENT_TYPE_TO_PIPELINE_RUN_STATUS, + AssetMaterializationFailureData, AssetMaterializationPlannedData, AssetObservationData, DagsterEvent, + DagsterEventBatchMetadata, DagsterEventType, EngineEventData, StepExpectationResultData, StepMaterializationData, + generate_event_batch_id, ) from dagster._core.events.log import EventLogEntry, construct_event_logger from dagster._core.execution.api import execute_run @@ -2743,6 +2747,53 @@ def _store_partition_event(asset_key, partition) -> int: latest_storage_ids["p1"] = _store_partition_event(a, "p1") _assert_storage_matches(latest_storage_ids) + def test_batch_write_asset_materialization_failures(self, storage, instance): + a = AssetKey(["a"]) + run_id = make_new_run_id() + + partitions = list(string.ascii_uppercase) + + failed_partitions = {"step_a": set(partitions[:10]), "step_b": set(partitions[10:15])} + + asset_materialization_failure_events: List[List[DagsterEvent]] = [ + [ + DagsterEvent.build_asset_materialization_failure_event( + job_name="my_fake_job", + step_key=step_key, + asset_materialization_failure_data=AssetMaterializationFailureData( + asset_key=a, partition=partition, error=None + ), + ) + for partition in partitions + ] + for step_key, partitions in failed_partitions.items() + ] + + asset_materialization_failure_events = [ + event for events in asset_materialization_failure_events for event in events + ] + + batch_id = generate_event_batch_id() + last_index = len(asset_materialization_failure_events) - 1 + + for i, asset_event in enumerate(asset_materialization_failure_events): + batch_metadata = DagsterEventBatchMetadata(batch_id, i == last_index) + instance.report_dagster_event(asset_event, run_id, batch_metadata=batch_metadata) + + failure_records = instance.get_records_for_run( + run_id=run_id, of_type=DagsterEventType.ASSET_MATERIALIZATION_FAILURE + ).records + + assert len(failure_records) == 15 + + stored_partitions_by_step_key = defaultdict(set) + for record in failure_records: + stored_partitions_by_step_key[record.event_log_entry.step_key].add( + record.event_log_entry.dagster_event.asset_materialization_failure_data.partition + ) + + assert stored_partitions_by_step_key == failed_partitions + @pytest.mark.parametrize( "dagster_event_type", [DagsterEventType.ASSET_OBSERVATION, DagsterEventType.ASSET_MATERIALIZATION], @@ -4006,7 +4057,7 @@ def second_asset(my_asset): ascending=False, ).records[0] - if storage.asset_records_have_last_planned_materialization_storage_id: + if storage.asset_records_have_planned_and_failed_materializations: assert ( asset_entry.last_planned_materialization_storage_id == materialization_planned_record.storage_id From f06448d501f719bb64f352324b4befddf8b420a6 Mon Sep 17 00:00:00 2001 From: Daniel Gibson Date: Wed, 14 Aug 2024 14:57:25 -0400 Subject: [PATCH 2/2] new event types --- .../types/AssetRunLogObserver.types.ts | 3 +- .../src/graphql/possibleTypes.generated.json | 2 +- .../ui-core/src/graphql/schema.graphql | 20 ++- .../packages/ui-core/src/graphql/types.ts | 145 ++++++++++++------ .../src/runs/LogsRowStructuredContent.tsx | 3 +- .../packages/ui-core/src/runs/filterLogs.tsx | 3 +- .../src/runs/types/LogsProvider.types.ts | 81 ++++++---- .../ui-core/src/runs/types/LogsRow.types.ts | 57 ++++--- ...LogsScrollingTableMessageFragment.types.ts | 30 ++-- .../src/runs/types/RunFragments.types.ts | 30 ++-- .../runs/types/RunMetadataProvider.types.ts | 24 ++- .../dagster_graphql/implementation/events.py | 9 +- .../implementation/fetch_runs.py | 2 +- .../dagster_graphql/schema/logs/events.py | 23 ++- .../dagster/dagster/_core/event_api.py | 3 +- .../dagster/dagster/_core/events/__init__.py | 108 ++++++++++--- .../dagster/_core/storage/event_log/base.py | 32 ++-- .../_core/storage/partition_status_cache.py | 2 +- .../storage_tests/utils/event_log_storage.py | 69 +++++++-- 19 files changed, 465 insertions(+), 181 deletions(-) diff --git a/js_modules/dagster-ui/packages/ui-core/src/asset-graph/types/AssetRunLogObserver.types.ts b/js_modules/dagster-ui/packages/ui-core/src/asset-graph/types/AssetRunLogObserver.types.ts index 1c3feb62a7e04..7ca16f88fedbd 100644 --- a/js_modules/dagster-ui/packages/ui-core/src/asset-graph/types/AssetRunLogObserver.types.ts +++ b/js_modules/dagster-ui/packages/ui-core/src/asset-graph/types/AssetRunLogObserver.types.ts @@ -24,7 +24,6 @@ export type AssetLiveRunLogsSubscription = { }; } | {__typename: 'AssetCheckEvaluationPlannedEvent'} - | {__typename: 'AssetMaterializationFailureEvent'} | { __typename: 'AssetMaterializationPlannedEvent'; assetKey: {__typename: 'AssetKey'; path: Array} | null; @@ -54,6 +53,8 @@ export type AssetLiveRunLogsSubscription = { __typename: 'ObservationEvent'; assetKey: {__typename: 'AssetKey'; path: Array} | null; } + | {__typename: 'PlannedAssetMaterializationFailureEvent'} + | {__typename: 'PlannedAssetMaterializationSkippedEvent'} | {__typename: 'ResourceInitFailureEvent'} | {__typename: 'ResourceInitStartedEvent'} | {__typename: 'ResourceInitSuccessEvent'} diff --git a/js_modules/dagster-ui/packages/ui-core/src/graphql/possibleTypes.generated.json b/js_modules/dagster-ui/packages/ui-core/src/graphql/possibleTypes.generated.json index 1a546c8f5f932..23cb4505ef200 100644 --- a/js_modules/dagster-ui/packages/ui-core/src/graphql/possibleTypes.generated.json +++ b/js_modules/dagster-ui/packages/ui-core/src/graphql/possibleTypes.generated.json @@ -1 +1 @@ -{"DisplayableEvent":["EngineEvent","ExecutionStepOutputEvent","ExpectationResult","FailureMetadata","HandledOutputEvent","LoadedInputEvent","ObjectStoreOperationResult","ResourceInitFailureEvent","ResourceInitStartedEvent","ResourceInitSuccessEvent","StepWorkerStartedEvent","StepWorkerStartingEvent","MaterializationEvent","ObservationEvent","TypeCheck"],"MarkerEvent":["EngineEvent","ResourceInitFailureEvent","ResourceInitStartedEvent","ResourceInitSuccessEvent","StepWorkerStartedEvent","StepWorkerStartingEvent"],"ErrorEvent":["EngineEvent","ExecutionStepFailureEvent","ExecutionStepUpForRetryEvent","HookErroredEvent","RunCanceledEvent","RunFailureEvent","ResourceInitFailureEvent"],"MessageEvent":["EngineEvent","ExecutionStepFailureEvent","ExecutionStepInputEvent","ExecutionStepOutputEvent","ExecutionStepRestartEvent","ExecutionStepSkippedEvent","ExecutionStepStartEvent","ExecutionStepSuccessEvent","ExecutionStepUpForRetryEvent","HandledOutputEvent","HookCompletedEvent","HookErroredEvent","HookSkippedEvent","LoadedInputEvent","LogMessageEvent","ObjectStoreOperationEvent","RunCanceledEvent","RunCancelingEvent","RunDequeuedEvent","RunEnqueuedEvent","RunFailureEvent","ResourceInitFailureEvent","ResourceInitStartedEvent","ResourceInitSuccessEvent","RunStartEvent","RunStartingEvent","RunSuccessEvent","StepExpectationResultEvent","StepWorkerStartedEvent","StepWorkerStartingEvent","MaterializationEvent","ObservationEvent","AssetMaterializationPlannedEvent","LogsCapturedEvent","AlertStartEvent","AlertSuccessEvent","AlertFailureEvent","AssetCheckEvaluationPlannedEvent","AssetCheckEvaluationEvent"],"RunEvent":["RunCanceledEvent","RunCancelingEvent","RunDequeuedEvent","RunEnqueuedEvent","RunFailureEvent","RunStartEvent","RunStartingEvent","RunSuccessEvent","AssetMaterializationPlannedEvent","AlertStartEvent","AlertSuccessEvent","AlertFailureEvent"],"PipelineRunStepStats":["RunStepStats"],"StepEvent":["EngineEvent","ExecutionStepFailureEvent","ExecutionStepInputEvent","ExecutionStepOutputEvent","ExecutionStepRestartEvent","ExecutionStepSkippedEvent","ExecutionStepStartEvent","ExecutionStepSuccessEvent","ExecutionStepUpForRetryEvent","HandledOutputEvent","HookCompletedEvent","HookErroredEvent","HookSkippedEvent","LoadedInputEvent","ObjectStoreOperationEvent","ResourceInitFailureEvent","ResourceInitStartedEvent","ResourceInitSuccessEvent","StepExpectationResultEvent","StepWorkerStartedEvent","StepWorkerStartingEvent","MaterializationEvent","ObservationEvent","AssetCheckEvaluationPlannedEvent","AssetCheckEvaluationEvent"],"AssetOwner":["UserAssetOwner","TeamAssetOwner"],"AssetPartitionStatuses":["DefaultPartitionStatuses","MultiPartitionStatuses","TimePartitionStatuses"],"PartitionStatus1D":["TimePartitionStatuses","DefaultPartitionStatuses"],"AssetChecksOrError":["AssetChecks","AssetCheckNeedsMigrationError","AssetCheckNeedsUserCodeUpgrade","AssetCheckNeedsAgentUpgradeError"],"Instigator":["Schedule","Sensor"],"EvaluationStackEntry":["EvaluationStackListItemEntry","EvaluationStackPathEntry","EvaluationStackMapKeyEntry","EvaluationStackMapValueEntry"],"IPipelineSnapshot":["Pipeline","PipelineSnapshot","Job"],"PipelineConfigValidationError":["FieldNotDefinedConfigError","FieldsNotDefinedConfigError","MissingFieldConfigError","MissingFieldsConfigError","RuntimeMismatchConfigError","SelectorTypeConfigError"],"PipelineConfigValidationInvalid":["RunConfigValidationInvalid"],"PipelineConfigValidationResult":["InvalidSubsetError","PipelineConfigValidationValid","RunConfigValidationInvalid","PipelineNotFoundError","PythonError"],"PipelineReference":["PipelineSnapshot","UnknownPipeline"],"PipelineRun":["Run"],"DagsterRunEvent":["ExecutionStepFailureEvent","ExecutionStepInputEvent","ExecutionStepOutputEvent","ExecutionStepSkippedEvent","ExecutionStepStartEvent","ExecutionStepSuccessEvent","ExecutionStepUpForRetryEvent","ExecutionStepRestartEvent","LogMessageEvent","ResourceInitFailureEvent","ResourceInitStartedEvent","ResourceInitSuccessEvent","RunFailureEvent","RunStartEvent","RunEnqueuedEvent","RunDequeuedEvent","RunStartingEvent","RunCancelingEvent","RunCanceledEvent","RunSuccessEvent","StepWorkerStartedEvent","StepWorkerStartingEvent","HandledOutputEvent","LoadedInputEvent","LogsCapturedEvent","ObjectStoreOperationEvent","StepExpectationResultEvent","MaterializationEvent","ObservationEvent","EngineEvent","HookCompletedEvent","HookSkippedEvent","HookErroredEvent","AlertStartEvent","AlertSuccessEvent","AlertFailureEvent","AssetMaterializationPlannedEvent","AssetCheckEvaluationPlannedEvent","AssetCheckEvaluationEvent"],"PipelineRunLogsSubscriptionPayload":["PipelineRunLogsSubscriptionSuccess","PipelineRunLogsSubscriptionFailure"],"RunOrError":["Run","RunNotFoundError","PythonError"],"PipelineRunStatsSnapshot":["RunStatsSnapshot"],"RunStatsSnapshotOrError":["RunStatsSnapshot","PythonError"],"PipelineSnapshotOrError":["PipelineNotFoundError","PipelineSnapshot","PipelineSnapshotNotFoundError","PythonError"],"RunsFeedEntry":["Run","PartitionBackfill"],"AssetOrError":["Asset","AssetNotFoundError"],"AssetsOrError":["AssetConnection","PythonError"],"DeletePipelineRunResult":["DeletePipelineRunSuccess","UnauthorizedError","PythonError","RunNotFoundError"],"ExecutionPlanOrError":["ExecutionPlan","RunConfigValidationInvalid","PipelineNotFoundError","InvalidSubsetError","PythonError"],"PipelineOrError":["Pipeline","PipelineNotFoundError","InvalidSubsetError","PythonError"],"ReloadRepositoryLocationMutationResult":["WorkspaceLocationEntry","ReloadNotSupported","RepositoryLocationNotFound","UnauthorizedError","PythonError"],"RepositoryLocationOrLoadError":["RepositoryLocation","PythonError"],"ReloadWorkspaceMutationResult":["Workspace","UnauthorizedError","PythonError"],"ShutdownRepositoryLocationMutationResult":["ShutdownRepositoryLocationSuccess","RepositoryLocationNotFound","UnauthorizedError","PythonError"],"TerminatePipelineExecutionFailure":["TerminateRunFailure"],"TerminatePipelineExecutionSuccess":["TerminateRunSuccess"],"TerminateRunResult":["TerminateRunSuccess","TerminateRunFailure","RunNotFoundError","UnauthorizedError","PythonError"],"ScheduleMutationResult":["PythonError","UnauthorizedError","ScheduleStateResult"],"ScheduleOrError":["Schedule","ScheduleNotFoundError","PythonError"],"SchedulerOrError":["Scheduler","SchedulerNotDefinedError","PythonError"],"SchedulesOrError":["Schedules","RepositoryNotFoundError","PythonError"],"ScheduleTickSpecificData":["ScheduleTickSuccessData","ScheduleTickFailureData"],"LaunchBackfillResult":["LaunchBackfillSuccess","PartitionSetNotFoundError","InvalidStepError","InvalidOutputError","RunConfigValidationInvalid","PipelineNotFoundError","RunConflict","UnauthorizedError","PythonError","InvalidSubsetError","PresetNotFoundError","ConflictingExecutionParamsError","NoModeProvidedError"],"ConfigTypeOrError":["EnumConfigType","CompositeConfigType","RegularConfigType","PipelineNotFoundError","ConfigTypeNotFoundError","PythonError"],"ConfigType":["ArrayConfigType","CompositeConfigType","EnumConfigType","NullableConfigType","RegularConfigType","ScalarUnionConfigType","MapConfigType"],"WrappingConfigType":["ArrayConfigType","NullableConfigType"],"DagsterType":["ListDagsterType","NullableDagsterType","RegularDagsterType"],"DagsterTypeOrError":["RegularDagsterType","PipelineNotFoundError","DagsterTypeNotFoundError","PythonError"],"WrappingDagsterType":["ListDagsterType","NullableDagsterType"],"Error":["AssetCheckNeedsMigrationError","AssetCheckNeedsUserCodeUpgrade","AssetCheckNeedsAgentUpgradeError","AssetNotFoundError","ConflictingExecutionParamsError","ConfigTypeNotFoundError","DagsterTypeNotFoundError","InvalidPipelineRunsFilterError","InvalidSubsetError","ModeNotFoundError","NoModeProvidedError","PartitionSetNotFoundError","PipelineNotFoundError","RunConflict","PipelineSnapshotNotFoundError","PresetNotFoundError","PythonError","ErrorChainLink","UnauthorizedError","ReloadNotSupported","RepositoryLocationNotFound","RepositoryNotFoundError","ResourceNotFoundError","RunGroupNotFoundError","RunNotFoundError","ScheduleNotFoundError","SchedulerNotDefinedError","SensorNotFoundError","UnsupportedOperationError","DuplicateDynamicPartitionError","InstigationStateNotFoundError","SolidStepStatusUnavailableError","GraphNotFoundError","BackfillNotFoundError","PartitionSubsetDeserializationError","AutoMaterializeAssetEvaluationNeedsMigrationError"],"PipelineRunConflict":["RunConflict"],"PipelineRunNotFoundError":["RunNotFoundError"],"RepositoriesOrError":["RepositoryConnection","RepositoryNotFoundError","PythonError"],"RepositoryOrError":["PythonError","Repository","RepositoryNotFoundError"],"WorkspaceLocationEntryOrError":["WorkspaceLocationEntry","PythonError"],"InstigationTypeSpecificData":["SensorData","ScheduleData"],"InstigationStateOrError":["InstigationState","InstigationStateNotFoundError","PythonError"],"InstigationStatesOrError":["InstigationStates","PythonError"],"MetadataEntry":["TableColumnLineageMetadataEntry","TableSchemaMetadataEntry","TableMetadataEntry","FloatMetadataEntry","IntMetadataEntry","JsonMetadataEntry","BoolMetadataEntry","MarkdownMetadataEntry","PathMetadataEntry","NotebookMetadataEntry","PythonArtifactMetadataEntry","TextMetadataEntry","UrlMetadataEntry","PipelineRunMetadataEntry","AssetMetadataEntry","JobMetadataEntry","CodeReferencesMetadataEntry","NullMetadataEntry","TimestampMetadataEntry"],"SourceLocation":["LocalFileCodeReference","UrlCodeReference"],"PartitionRunConfigOrError":["PartitionRunConfig","PythonError"],"AssetBackfillStatus":["AssetPartitionsStatusCounts","UnpartitionedAssetStatus"],"PartitionSetOrError":["PartitionSet","PartitionSetNotFoundError","PythonError"],"PartitionSetsOrError":["PartitionSets","PipelineNotFoundError","PythonError"],"PartitionsOrError":["Partitions","PythonError"],"PartitionStatusesOrError":["PartitionStatuses","PythonError"],"PartitionTagsOrError":["PartitionTags","PythonError"],"RunConfigSchemaOrError":["RunConfigSchema","PipelineNotFoundError","InvalidSubsetError","ModeNotFoundError","PythonError"],"LaunchRunResult":["LaunchRunSuccess","InvalidStepError","InvalidOutputError","RunConfigValidationInvalid","PipelineNotFoundError","RunConflict","UnauthorizedError","PythonError","InvalidSubsetError","PresetNotFoundError","ConflictingExecutionParamsError","NoModeProvidedError"],"LaunchRunReexecutionResult":["LaunchRunSuccess","InvalidStepError","InvalidOutputError","RunConfigValidationInvalid","PipelineNotFoundError","RunConflict","UnauthorizedError","PythonError","InvalidSubsetError","PresetNotFoundError","ConflictingExecutionParamsError","NoModeProvidedError"],"LaunchPipelineRunSuccess":["LaunchRunSuccess"],"RunsOrError":["Runs","InvalidPipelineRunsFilterError","PythonError"],"PipelineRuns":["Runs"],"RunGroupOrError":["RunGroup","RunGroupNotFoundError","PythonError"],"SensorOrError":["Sensor","SensorNotFoundError","UnauthorizedError","PythonError"],"SensorsOrError":["Sensors","RepositoryNotFoundError","PythonError"],"StopSensorMutationResultOrError":["StopSensorMutationResult","UnauthorizedError","PythonError"],"ISolidDefinition":["CompositeSolidDefinition","SolidDefinition"],"SolidContainer":["Pipeline","PipelineSnapshot","Job","CompositeSolidDefinition","Graph"],"SolidStepStatsOrError":["SolidStepStatsConnection","SolidStepStatusUnavailableError"],"WorkspaceOrError":["Workspace","PythonError"],"WorkspaceLocationStatusEntriesOrError":["WorkspaceLocationStatusEntries","PythonError"],"GraphOrError":["Graph","GraphNotFoundError","PythonError"],"ResourceDetailsOrError":["ResourceDetails","ResourceNotFoundError","PythonError"],"ResourcesOrError":["ResourceDetailsList","RepositoryNotFoundError","PythonError"],"EnvVarWithConsumersOrError":["EnvVarWithConsumersList","PythonError"],"RunsFeedConnectionOrError":["RunsFeedConnection","PythonError"],"RunTagKeysOrError":["PythonError","RunTagKeys"],"RunTagsOrError":["PythonError","RunTags"],"RunIdsOrError":["RunIds","InvalidPipelineRunsFilterError","PythonError"],"AssetNodeOrError":["AssetNode","AssetNotFoundError"],"PartitionBackfillOrError":["PartitionBackfill","BackfillNotFoundError","PythonError"],"PartitionBackfillsOrError":["PartitionBackfills","PythonError"],"EventConnectionOrError":["EventConnection","RunNotFoundError","PythonError"],"AutoMaterializeAssetEvaluationRecordsOrError":["AutoMaterializeAssetEvaluationRecords","AutoMaterializeAssetEvaluationNeedsMigrationError"],"PartitionKeysOrError":["PartitionKeys","PartitionSubsetDeserializationError"],"AutoMaterializeRuleEvaluationData":["TextRuleEvaluationData","ParentMaterializedRuleEvaluationData","WaitingOnKeysRuleEvaluationData"],"AssetConditionEvaluationNode":["UnpartitionedAssetConditionEvaluationNode","PartitionedAssetConditionEvaluationNode","SpecificPartitionAssetConditionEvaluationNode"],"AssetConditionEvaluationRecordsOrError":["AssetConditionEvaluationRecords","AutoMaterializeAssetEvaluationNeedsMigrationError"],"SensorDryRunResult":["PythonError","SensorNotFoundError","DryRunInstigationTick"],"ScheduleDryRunResult":["DryRunInstigationTick","PythonError","ScheduleNotFoundError"],"TerminateRunsResultOrError":["TerminateRunsResult","PythonError"],"AssetWipeMutationResult":["AssetNotFoundError","UnauthorizedError","PythonError","UnsupportedOperationError","AssetWipeSuccess"],"ReportRunlessAssetEventsResult":["UnauthorizedError","PythonError","ReportRunlessAssetEventsSuccess"],"ResumeBackfillResult":["ResumeBackfillSuccess","UnauthorizedError","PythonError"],"CancelBackfillResult":["CancelBackfillSuccess","UnauthorizedError","PythonError"],"LogTelemetryMutationResult":["LogTelemetrySuccess","PythonError"],"AddDynamicPartitionResult":["AddDynamicPartitionSuccess","UnauthorizedError","PythonError","DuplicateDynamicPartitionError"],"DeleteDynamicPartitionsResult":["DeleteDynamicPartitionsSuccess","UnauthorizedError","PythonError"]} \ No newline at end of file +{"DisplayableEvent":["EngineEvent","ExecutionStepOutputEvent","ExpectationResult","FailureMetadata","HandledOutputEvent","LoadedInputEvent","ObjectStoreOperationResult","ResourceInitFailureEvent","ResourceInitStartedEvent","ResourceInitSuccessEvent","StepWorkerStartedEvent","StepWorkerStartingEvent","MaterializationEvent","ObservationEvent","TypeCheck"],"MarkerEvent":["EngineEvent","ResourceInitFailureEvent","ResourceInitStartedEvent","ResourceInitSuccessEvent","StepWorkerStartedEvent","StepWorkerStartingEvent"],"ErrorEvent":["EngineEvent","ExecutionStepFailureEvent","ExecutionStepUpForRetryEvent","HookErroredEvent","RunCanceledEvent","RunFailureEvent","ResourceInitFailureEvent"],"MessageEvent":["EngineEvent","ExecutionStepFailureEvent","ExecutionStepInputEvent","ExecutionStepOutputEvent","ExecutionStepRestartEvent","ExecutionStepSkippedEvent","ExecutionStepStartEvent","ExecutionStepSuccessEvent","ExecutionStepUpForRetryEvent","HandledOutputEvent","HookCompletedEvent","HookErroredEvent","HookSkippedEvent","LoadedInputEvent","LogMessageEvent","ObjectStoreOperationEvent","RunCanceledEvent","RunCancelingEvent","RunDequeuedEvent","RunEnqueuedEvent","RunFailureEvent","ResourceInitFailureEvent","ResourceInitStartedEvent","ResourceInitSuccessEvent","RunStartEvent","RunStartingEvent","RunSuccessEvent","StepExpectationResultEvent","StepWorkerStartedEvent","StepWorkerStartingEvent","MaterializationEvent","ObservationEvent","AssetMaterializationPlannedEvent","LogsCapturedEvent","AlertStartEvent","AlertSuccessEvent","AlertFailureEvent","PlannedAssetMaterializationFailureEvent","PlannedAssetMaterializationSkippedEvent","AssetCheckEvaluationPlannedEvent","AssetCheckEvaluationEvent"],"RunEvent":["RunCanceledEvent","RunCancelingEvent","RunDequeuedEvent","RunEnqueuedEvent","RunFailureEvent","RunStartEvent","RunStartingEvent","RunSuccessEvent","AssetMaterializationPlannedEvent","AlertStartEvent","AlertSuccessEvent","AlertFailureEvent","PlannedAssetMaterializationFailureEvent","PlannedAssetMaterializationSkippedEvent"],"PipelineRunStepStats":["RunStepStats"],"StepEvent":["EngineEvent","ExecutionStepFailureEvent","ExecutionStepInputEvent","ExecutionStepOutputEvent","ExecutionStepRestartEvent","ExecutionStepSkippedEvent","ExecutionStepStartEvent","ExecutionStepSuccessEvent","ExecutionStepUpForRetryEvent","HandledOutputEvent","HookCompletedEvent","HookErroredEvent","HookSkippedEvent","LoadedInputEvent","ObjectStoreOperationEvent","ResourceInitFailureEvent","ResourceInitStartedEvent","ResourceInitSuccessEvent","StepExpectationResultEvent","StepWorkerStartedEvent","StepWorkerStartingEvent","MaterializationEvent","ObservationEvent","AssetCheckEvaluationPlannedEvent","AssetCheckEvaluationEvent"],"AssetOwner":["UserAssetOwner","TeamAssetOwner"],"AssetPartitionStatuses":["DefaultPartitionStatuses","MultiPartitionStatuses","TimePartitionStatuses"],"PartitionStatus1D":["TimePartitionStatuses","DefaultPartitionStatuses"],"AssetChecksOrError":["AssetChecks","AssetCheckNeedsMigrationError","AssetCheckNeedsUserCodeUpgrade","AssetCheckNeedsAgentUpgradeError"],"Instigator":["Schedule","Sensor"],"EvaluationStackEntry":["EvaluationStackListItemEntry","EvaluationStackPathEntry","EvaluationStackMapKeyEntry","EvaluationStackMapValueEntry"],"IPipelineSnapshot":["Pipeline","PipelineSnapshot","Job"],"PipelineConfigValidationError":["FieldNotDefinedConfigError","FieldsNotDefinedConfigError","MissingFieldConfigError","MissingFieldsConfigError","RuntimeMismatchConfigError","SelectorTypeConfigError"],"PipelineConfigValidationInvalid":["RunConfigValidationInvalid"],"PipelineConfigValidationResult":["InvalidSubsetError","PipelineConfigValidationValid","RunConfigValidationInvalid","PipelineNotFoundError","PythonError"],"PipelineReference":["PipelineSnapshot","UnknownPipeline"],"PipelineRun":["Run"],"DagsterRunEvent":["ExecutionStepFailureEvent","ExecutionStepInputEvent","ExecutionStepOutputEvent","ExecutionStepSkippedEvent","ExecutionStepStartEvent","ExecutionStepSuccessEvent","ExecutionStepUpForRetryEvent","ExecutionStepRestartEvent","LogMessageEvent","ResourceInitFailureEvent","ResourceInitStartedEvent","ResourceInitSuccessEvent","RunFailureEvent","RunStartEvent","RunEnqueuedEvent","RunDequeuedEvent","RunStartingEvent","RunCancelingEvent","RunCanceledEvent","RunSuccessEvent","StepWorkerStartedEvent","StepWorkerStartingEvent","HandledOutputEvent","LoadedInputEvent","LogsCapturedEvent","ObjectStoreOperationEvent","StepExpectationResultEvent","MaterializationEvent","ObservationEvent","EngineEvent","HookCompletedEvent","HookSkippedEvent","HookErroredEvent","AlertStartEvent","AlertSuccessEvent","AlertFailureEvent","AssetMaterializationPlannedEvent","PlannedAssetMaterializationFailureEvent","PlannedAssetMaterializationSkippedEvent","AssetCheckEvaluationPlannedEvent","AssetCheckEvaluationEvent"],"PipelineRunLogsSubscriptionPayload":["PipelineRunLogsSubscriptionSuccess","PipelineRunLogsSubscriptionFailure"],"RunOrError":["Run","RunNotFoundError","PythonError"],"PipelineRunStatsSnapshot":["RunStatsSnapshot"],"RunStatsSnapshotOrError":["RunStatsSnapshot","PythonError"],"PipelineSnapshotOrError":["PipelineNotFoundError","PipelineSnapshot","PipelineSnapshotNotFoundError","PythonError"],"RunsFeedEntry":["Run","PartitionBackfill"],"AssetOrError":["Asset","AssetNotFoundError"],"AssetsOrError":["AssetConnection","PythonError"],"DeletePipelineRunResult":["DeletePipelineRunSuccess","UnauthorizedError","PythonError","RunNotFoundError"],"ExecutionPlanOrError":["ExecutionPlan","RunConfigValidationInvalid","PipelineNotFoundError","InvalidSubsetError","PythonError"],"PipelineOrError":["Pipeline","PipelineNotFoundError","InvalidSubsetError","PythonError"],"ReloadRepositoryLocationMutationResult":["WorkspaceLocationEntry","ReloadNotSupported","RepositoryLocationNotFound","UnauthorizedError","PythonError"],"RepositoryLocationOrLoadError":["RepositoryLocation","PythonError"],"ReloadWorkspaceMutationResult":["Workspace","UnauthorizedError","PythonError"],"ShutdownRepositoryLocationMutationResult":["ShutdownRepositoryLocationSuccess","RepositoryLocationNotFound","UnauthorizedError","PythonError"],"TerminatePipelineExecutionFailure":["TerminateRunFailure"],"TerminatePipelineExecutionSuccess":["TerminateRunSuccess"],"TerminateRunResult":["TerminateRunSuccess","TerminateRunFailure","RunNotFoundError","UnauthorizedError","PythonError"],"ScheduleMutationResult":["PythonError","UnauthorizedError","ScheduleStateResult"],"ScheduleOrError":["Schedule","ScheduleNotFoundError","PythonError"],"SchedulerOrError":["Scheduler","SchedulerNotDefinedError","PythonError"],"SchedulesOrError":["Schedules","RepositoryNotFoundError","PythonError"],"ScheduleTickSpecificData":["ScheduleTickSuccessData","ScheduleTickFailureData"],"LaunchBackfillResult":["LaunchBackfillSuccess","PartitionSetNotFoundError","InvalidStepError","InvalidOutputError","RunConfigValidationInvalid","PipelineNotFoundError","RunConflict","UnauthorizedError","PythonError","InvalidSubsetError","PresetNotFoundError","ConflictingExecutionParamsError","NoModeProvidedError"],"ConfigTypeOrError":["EnumConfigType","CompositeConfigType","RegularConfigType","PipelineNotFoundError","ConfigTypeNotFoundError","PythonError"],"ConfigType":["ArrayConfigType","CompositeConfigType","EnumConfigType","NullableConfigType","RegularConfigType","ScalarUnionConfigType","MapConfigType"],"WrappingConfigType":["ArrayConfigType","NullableConfigType"],"DagsterType":["ListDagsterType","NullableDagsterType","RegularDagsterType"],"DagsterTypeOrError":["RegularDagsterType","PipelineNotFoundError","DagsterTypeNotFoundError","PythonError"],"WrappingDagsterType":["ListDagsterType","NullableDagsterType"],"Error":["AssetCheckNeedsMigrationError","AssetCheckNeedsUserCodeUpgrade","AssetCheckNeedsAgentUpgradeError","AssetNotFoundError","ConflictingExecutionParamsError","ConfigTypeNotFoundError","DagsterTypeNotFoundError","InvalidPipelineRunsFilterError","InvalidSubsetError","ModeNotFoundError","NoModeProvidedError","PartitionSetNotFoundError","PipelineNotFoundError","RunConflict","PipelineSnapshotNotFoundError","PresetNotFoundError","PythonError","ErrorChainLink","UnauthorizedError","ReloadNotSupported","RepositoryLocationNotFound","RepositoryNotFoundError","ResourceNotFoundError","RunGroupNotFoundError","RunNotFoundError","ScheduleNotFoundError","SchedulerNotDefinedError","SensorNotFoundError","UnsupportedOperationError","DuplicateDynamicPartitionError","InstigationStateNotFoundError","SolidStepStatusUnavailableError","GraphNotFoundError","BackfillNotFoundError","PartitionSubsetDeserializationError","AutoMaterializeAssetEvaluationNeedsMigrationError"],"PipelineRunConflict":["RunConflict"],"PipelineRunNotFoundError":["RunNotFoundError"],"RepositoriesOrError":["RepositoryConnection","RepositoryNotFoundError","PythonError"],"RepositoryOrError":["PythonError","Repository","RepositoryNotFoundError"],"WorkspaceLocationEntryOrError":["WorkspaceLocationEntry","PythonError"],"InstigationTypeSpecificData":["SensorData","ScheduleData"],"InstigationStateOrError":["InstigationState","InstigationStateNotFoundError","PythonError"],"InstigationStatesOrError":["InstigationStates","PythonError"],"MetadataEntry":["TableColumnLineageMetadataEntry","TableSchemaMetadataEntry","TableMetadataEntry","FloatMetadataEntry","IntMetadataEntry","JsonMetadataEntry","BoolMetadataEntry","MarkdownMetadataEntry","PathMetadataEntry","NotebookMetadataEntry","PythonArtifactMetadataEntry","TextMetadataEntry","UrlMetadataEntry","PipelineRunMetadataEntry","AssetMetadataEntry","JobMetadataEntry","CodeReferencesMetadataEntry","NullMetadataEntry","TimestampMetadataEntry"],"SourceLocation":["LocalFileCodeReference","UrlCodeReference"],"PartitionRunConfigOrError":["PartitionRunConfig","PythonError"],"AssetBackfillStatus":["AssetPartitionsStatusCounts","UnpartitionedAssetStatus"],"PartitionSetOrError":["PartitionSet","PartitionSetNotFoundError","PythonError"],"PartitionSetsOrError":["PartitionSets","PipelineNotFoundError","PythonError"],"PartitionsOrError":["Partitions","PythonError"],"PartitionStatusesOrError":["PartitionStatuses","PythonError"],"PartitionTagsOrError":["PartitionTags","PythonError"],"RunConfigSchemaOrError":["RunConfigSchema","PipelineNotFoundError","InvalidSubsetError","ModeNotFoundError","PythonError"],"LaunchRunResult":["LaunchRunSuccess","InvalidStepError","InvalidOutputError","RunConfigValidationInvalid","PipelineNotFoundError","RunConflict","UnauthorizedError","PythonError","InvalidSubsetError","PresetNotFoundError","ConflictingExecutionParamsError","NoModeProvidedError"],"LaunchRunReexecutionResult":["LaunchRunSuccess","InvalidStepError","InvalidOutputError","RunConfigValidationInvalid","PipelineNotFoundError","RunConflict","UnauthorizedError","PythonError","InvalidSubsetError","PresetNotFoundError","ConflictingExecutionParamsError","NoModeProvidedError"],"LaunchPipelineRunSuccess":["LaunchRunSuccess"],"RunsOrError":["Runs","InvalidPipelineRunsFilterError","PythonError"],"PipelineRuns":["Runs"],"RunGroupOrError":["RunGroup","RunGroupNotFoundError","PythonError"],"SensorOrError":["Sensor","SensorNotFoundError","UnauthorizedError","PythonError"],"SensorsOrError":["Sensors","RepositoryNotFoundError","PythonError"],"StopSensorMutationResultOrError":["StopSensorMutationResult","UnauthorizedError","PythonError"],"ISolidDefinition":["CompositeSolidDefinition","SolidDefinition"],"SolidContainer":["Pipeline","PipelineSnapshot","Job","CompositeSolidDefinition","Graph"],"SolidStepStatsOrError":["SolidStepStatsConnection","SolidStepStatusUnavailableError"],"WorkspaceOrError":["Workspace","PythonError"],"WorkspaceLocationStatusEntriesOrError":["WorkspaceLocationStatusEntries","PythonError"],"GraphOrError":["Graph","GraphNotFoundError","PythonError"],"ResourceDetailsOrError":["ResourceDetails","ResourceNotFoundError","PythonError"],"ResourcesOrError":["ResourceDetailsList","RepositoryNotFoundError","PythonError"],"EnvVarWithConsumersOrError":["EnvVarWithConsumersList","PythonError"],"RunsFeedConnectionOrError":["RunsFeedConnection","PythonError"],"RunTagKeysOrError":["PythonError","RunTagKeys"],"RunTagsOrError":["PythonError","RunTags"],"RunIdsOrError":["RunIds","InvalidPipelineRunsFilterError","PythonError"],"AssetNodeOrError":["AssetNode","AssetNotFoundError"],"PartitionBackfillOrError":["PartitionBackfill","BackfillNotFoundError","PythonError"],"PartitionBackfillsOrError":["PartitionBackfills","PythonError"],"EventConnectionOrError":["EventConnection","RunNotFoundError","PythonError"],"AutoMaterializeAssetEvaluationRecordsOrError":["AutoMaterializeAssetEvaluationRecords","AutoMaterializeAssetEvaluationNeedsMigrationError"],"PartitionKeysOrError":["PartitionKeys","PartitionSubsetDeserializationError"],"AutoMaterializeRuleEvaluationData":["TextRuleEvaluationData","ParentMaterializedRuleEvaluationData","WaitingOnKeysRuleEvaluationData"],"AssetConditionEvaluationNode":["UnpartitionedAssetConditionEvaluationNode","PartitionedAssetConditionEvaluationNode","SpecificPartitionAssetConditionEvaluationNode"],"AssetConditionEvaluationRecordsOrError":["AssetConditionEvaluationRecords","AutoMaterializeAssetEvaluationNeedsMigrationError"],"SensorDryRunResult":["PythonError","SensorNotFoundError","DryRunInstigationTick"],"ScheduleDryRunResult":["DryRunInstigationTick","PythonError","ScheduleNotFoundError"],"TerminateRunsResultOrError":["TerminateRunsResult","PythonError"],"AssetWipeMutationResult":["AssetNotFoundError","UnauthorizedError","PythonError","UnsupportedOperationError","AssetWipeSuccess"],"ReportRunlessAssetEventsResult":["UnauthorizedError","PythonError","ReportRunlessAssetEventsSuccess"],"ResumeBackfillResult":["ResumeBackfillSuccess","UnauthorizedError","PythonError"],"CancelBackfillResult":["CancelBackfillSuccess","UnauthorizedError","PythonError"],"LogTelemetryMutationResult":["LogTelemetrySuccess","PythonError"],"AddDynamicPartitionResult":["AddDynamicPartitionSuccess","UnauthorizedError","PythonError","DuplicateDynamicPartitionError"],"DeleteDynamicPartitionsResult":["DeleteDynamicPartitionsSuccess","UnauthorizedError","PythonError"]} \ No newline at end of file diff --git a/js_modules/dagster-ui/packages/ui-core/src/graphql/schema.graphql b/js_modules/dagster-ui/packages/ui-core/src/graphql/schema.graphql index 36d9e412e1a05..91a75533a2953 100644 --- a/js_modules/dagster-ui/packages/ui-core/src/graphql/schema.graphql +++ b/js_modules/dagster-ui/packages/ui-core/src/graphql/schema.graphql @@ -45,11 +45,12 @@ enum DagsterEventType { STEP_RESTARTED ASSET_MATERIALIZATION ASSET_MATERIALIZATION_PLANNED + PLANNED_ASSET_MATERIALIZATION_FAILURE + PLANNED_ASSET_MATERIALIZATION_SKIPPED ASSET_OBSERVATION STEP_EXPECTATION_RESULT ASSET_CHECK_EVALUATION_PLANNED ASSET_CHECK_EVALUATION - ASSET_MATERIALIZATION_FAILURE RUN_ENQUEUED RUN_DEQUEUED RUN_STARTING @@ -1197,7 +1198,8 @@ union DagsterRunEvent = | AlertSuccessEvent | AlertFailureEvent | AssetMaterializationPlannedEvent - | AssetMaterializationFailureEvent + | PlannedAssetMaterializationFailureEvent + | PlannedAssetMaterializationSkippedEvent | AssetCheckEvaluationPlannedEvent | AssetCheckEvaluationEvent @@ -1251,7 +1253,19 @@ type AlertFailureEvent implements MessageEvent & RunEvent { pipelineName: String! } -type AssetMaterializationFailureEvent implements MessageEvent & RunEvent { +type PlannedAssetMaterializationFailureEvent implements MessageEvent & RunEvent { + runId: String! + message: String! + timestamp: String! + level: LogLevel! + stepKey: String + solidHandleID: String + eventType: DagsterEventType + pipelineName: String! + assetKey: AssetKey +} + +type PlannedAssetMaterializationSkippedEvent implements MessageEvent & RunEvent { runId: String! message: String! timestamp: String! diff --git a/js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts b/js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts index f207223d844a1..4b6e6fb58a89d 100644 --- a/js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts +++ b/js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts @@ -392,20 +392,6 @@ export type AssetLineageInfo = { partitions: Array; }; -export type AssetMaterializationFailureEvent = MessageEvent & - RunEvent & { - __typename: 'AssetMaterializationFailureEvent'; - assetKey: Maybe; - eventType: Maybe; - level: LogLevel; - message: Scalars['String']['output']; - pipelineName: Scalars['String']['output']; - runId: Scalars['String']['output']; - solidHandleID: Maybe; - stepKey: Maybe; - timestamp: Scalars['String']['output']; - }; - export type AssetMaterializationPlannedEvent = MessageEvent & RunEvent & { __typename: 'AssetMaterializationPlannedEvent'; @@ -941,7 +927,6 @@ export enum DagsterEventType { ASSET_CHECK_EVALUATION = 'ASSET_CHECK_EVALUATION', ASSET_CHECK_EVALUATION_PLANNED = 'ASSET_CHECK_EVALUATION_PLANNED', ASSET_MATERIALIZATION = 'ASSET_MATERIALIZATION', - ASSET_MATERIALIZATION_FAILURE = 'ASSET_MATERIALIZATION_FAILURE', ASSET_MATERIALIZATION_PLANNED = 'ASSET_MATERIALIZATION_PLANNED', ASSET_OBSERVATION = 'ASSET_OBSERVATION', ASSET_STORE_OPERATION = 'ASSET_STORE_OPERATION', @@ -961,6 +946,8 @@ export enum DagsterEventType { PIPELINE_START = 'PIPELINE_START', PIPELINE_STARTING = 'PIPELINE_STARTING', PIPELINE_SUCCESS = 'PIPELINE_SUCCESS', + PLANNED_ASSET_MATERIALIZATION_FAILURE = 'PLANNED_ASSET_MATERIALIZATION_FAILURE', + PLANNED_ASSET_MATERIALIZATION_SKIPPED = 'PLANNED_ASSET_MATERIALIZATION_SKIPPED', RESOURCE_INIT_FAILURE = 'RESOURCE_INIT_FAILURE', RESOURCE_INIT_STARTED = 'RESOURCE_INIT_STARTED', RESOURCE_INIT_SUCCESS = 'RESOURCE_INIT_SUCCESS', @@ -997,7 +984,6 @@ export type DagsterRunEvent = | AlertSuccessEvent | AssetCheckEvaluationEvent | AssetCheckEvaluationPlannedEvent - | AssetMaterializationFailureEvent | AssetMaterializationPlannedEvent | EngineEvent | ExecutionStepFailureEvent @@ -1018,6 +1004,8 @@ export type DagsterRunEvent = | MaterializationEvent | ObjectStoreOperationEvent | ObservationEvent + | PlannedAssetMaterializationFailureEvent + | PlannedAssetMaterializationSkippedEvent | ResourceInitFailureEvent | ResourceInitStartedEvent | ResourceInitSuccessEvent @@ -3639,6 +3627,34 @@ export type PipelineTagAndValues = { values: Array; }; +export type PlannedAssetMaterializationFailureEvent = MessageEvent & + RunEvent & { + __typename: 'PlannedAssetMaterializationFailureEvent'; + assetKey: Maybe; + eventType: Maybe; + level: LogLevel; + message: Scalars['String']['output']; + pipelineName: Scalars['String']['output']; + runId: Scalars['String']['output']; + solidHandleID: Maybe; + stepKey: Maybe; + timestamp: Scalars['String']['output']; + }; + +export type PlannedAssetMaterializationSkippedEvent = MessageEvent & + RunEvent & { + __typename: 'PlannedAssetMaterializationSkippedEvent'; + assetKey: Maybe; + eventType: Maybe; + level: LogLevel; + message: Scalars['String']['output']; + pipelineName: Scalars['String']['output']; + runId: Scalars['String']['output']; + solidHandleID: Maybe; + stepKey: Maybe; + timestamp: Scalars['String']['output']; + }; + export type PresetNotFoundError = Error & { __typename: 'PresetNotFoundError'; message: Scalars['String']['output']; @@ -6319,36 +6335,6 @@ export const buildAssetLineageInfo = ( }; }; -export const buildAssetMaterializationFailureEvent = ( - overrides?: Partial, - _relationshipsToOmit: Set = new Set(), -): {__typename: 'AssetMaterializationFailureEvent'} & AssetMaterializationFailureEvent => { - const relationshipsToOmit: Set = new Set(_relationshipsToOmit); - relationshipsToOmit.add('AssetMaterializationFailureEvent'); - return { - __typename: 'AssetMaterializationFailureEvent', - assetKey: - overrides && overrides.hasOwnProperty('assetKey') - ? overrides.assetKey! - : relationshipsToOmit.has('AssetKey') - ? ({} as AssetKey) - : buildAssetKey({}, relationshipsToOmit), - eventType: - overrides && overrides.hasOwnProperty('eventType') - ? overrides.eventType! - : DagsterEventType.ALERT_FAILURE, - level: overrides && overrides.hasOwnProperty('level') ? overrides.level! : LogLevel.CRITICAL, - message: overrides && overrides.hasOwnProperty('message') ? overrides.message! : 'quia', - pipelineName: - overrides && overrides.hasOwnProperty('pipelineName') ? overrides.pipelineName! : 'et', - runId: overrides && overrides.hasOwnProperty('runId') ? overrides.runId! : 'et', - solidHandleID: - overrides && overrides.hasOwnProperty('solidHandleID') ? overrides.solidHandleID! : 'sequi', - stepKey: overrides && overrides.hasOwnProperty('stepKey') ? overrides.stepKey! : 'natus', - timestamp: overrides && overrides.hasOwnProperty('timestamp') ? overrides.timestamp! : 'rerum', - }; -}; - export const buildAssetMaterializationPlannedEvent = ( overrides?: Partial, _relationshipsToOmit: Set = new Set(), @@ -11724,6 +11710,73 @@ export const buildPipelineTagAndValues = ( }; }; +export const buildPlannedAssetMaterializationFailureEvent = ( + overrides?: Partial, + _relationshipsToOmit: Set = new Set(), +): { + __typename: 'PlannedAssetMaterializationFailureEvent'; +} & PlannedAssetMaterializationFailureEvent => { + const relationshipsToOmit: Set = new Set(_relationshipsToOmit); + relationshipsToOmit.add('PlannedAssetMaterializationFailureEvent'); + return { + __typename: 'PlannedAssetMaterializationFailureEvent', + assetKey: + overrides && overrides.hasOwnProperty('assetKey') + ? overrides.assetKey! + : relationshipsToOmit.has('AssetKey') + ? ({} as AssetKey) + : buildAssetKey({}, relationshipsToOmit), + eventType: + overrides && overrides.hasOwnProperty('eventType') + ? overrides.eventType! + : DagsterEventType.ALERT_FAILURE, + level: overrides && overrides.hasOwnProperty('level') ? overrides.level! : LogLevel.CRITICAL, + message: overrides && overrides.hasOwnProperty('message') ? overrides.message! : 'perferendis', + pipelineName: + overrides && overrides.hasOwnProperty('pipelineName') + ? overrides.pipelineName! + : 'perspiciatis', + runId: overrides && overrides.hasOwnProperty('runId') ? overrides.runId! : 'aliquid', + solidHandleID: + overrides && overrides.hasOwnProperty('solidHandleID') ? overrides.solidHandleID! : 'quas', + stepKey: overrides && overrides.hasOwnProperty('stepKey') ? overrides.stepKey! : 'laborum', + timestamp: overrides && overrides.hasOwnProperty('timestamp') ? overrides.timestamp! : 'illo', + }; +}; + +export const buildPlannedAssetMaterializationSkippedEvent = ( + overrides?: Partial, + _relationshipsToOmit: Set = new Set(), +): { + __typename: 'PlannedAssetMaterializationSkippedEvent'; +} & PlannedAssetMaterializationSkippedEvent => { + const relationshipsToOmit: Set = new Set(_relationshipsToOmit); + relationshipsToOmit.add('PlannedAssetMaterializationSkippedEvent'); + return { + __typename: 'PlannedAssetMaterializationSkippedEvent', + assetKey: + overrides && overrides.hasOwnProperty('assetKey') + ? overrides.assetKey! + : relationshipsToOmit.has('AssetKey') + ? ({} as AssetKey) + : buildAssetKey({}, relationshipsToOmit), + eventType: + overrides && overrides.hasOwnProperty('eventType') + ? overrides.eventType! + : DagsterEventType.ALERT_FAILURE, + level: overrides && overrides.hasOwnProperty('level') ? overrides.level! : LogLevel.CRITICAL, + message: overrides && overrides.hasOwnProperty('message') ? overrides.message! : 'culpa', + pipelineName: + overrides && overrides.hasOwnProperty('pipelineName') ? overrides.pipelineName! : 'quas', + runId: overrides && overrides.hasOwnProperty('runId') ? overrides.runId! : 'molestias', + solidHandleID: + overrides && overrides.hasOwnProperty('solidHandleID') ? overrides.solidHandleID! : 'et', + stepKey: overrides && overrides.hasOwnProperty('stepKey') ? overrides.stepKey! : 'aut', + timestamp: + overrides && overrides.hasOwnProperty('timestamp') ? overrides.timestamp! : 'aperiam', + }; +}; + export const buildPresetNotFoundError = ( overrides?: Partial, _relationshipsToOmit: Set = new Set(), diff --git a/js_modules/dagster-ui/packages/ui-core/src/runs/LogsRowStructuredContent.tsx b/js_modules/dagster-ui/packages/ui-core/src/runs/LogsRowStructuredContent.tsx index b3c558ab2f851..9acfc94ef267b 100644 --- a/js_modules/dagster-ui/packages/ui-core/src/runs/LogsRowStructuredContent.tsx +++ b/js_modules/dagster-ui/packages/ui-core/src/runs/LogsRowStructuredContent.tsx @@ -152,7 +152,8 @@ export const LogsRowStructuredContent = ({node, metadata}: IStructuredContentPro /> ); case 'AssetMaterializationPlannedEvent': - case 'AssetMaterializationFailureEvent': + case 'PlannedAssetMaterializationFailureEvent': + case 'PlannedAssetMaterializationSkippedEvent': return ; case 'ObjectStoreOperationEvent': return ( diff --git a/js_modules/dagster-ui/packages/ui-core/src/runs/filterLogs.tsx b/js_modules/dagster-ui/packages/ui-core/src/runs/filterLogs.tsx index 98fce4eb602a2..a73a3710a1d51 100644 --- a/js_modules/dagster-ui/packages/ui-core/src/runs/filterLogs.tsx +++ b/js_modules/dagster-ui/packages/ui-core/src/runs/filterLogs.tsx @@ -11,7 +11,8 @@ export function filterLogs(logs: LogsProviderLogs, filter: LogFilter, filterStep if ( node.__typename === 'AssetMaterializationPlannedEvent' || node.__typename === 'AssetCheckEvaluationPlannedEvent' || - node.__typename === 'AssetMaterializationFailureEvent' + node.__typename === 'PlannedAssetMaterializationFailureEvent' || + node.__typename === 'PlannedAssetMaterializationSkippedEvent' ) { return false; } diff --git a/js_modules/dagster-ui/packages/ui-core/src/runs/types/LogsProvider.types.ts b/js_modules/dagster-ui/packages/ui-core/src/runs/types/LogsProvider.types.ts index 243fc49f59e48..81f97cc4fe4b7 100644 --- a/js_modules/dagster-ui/packages/ui-core/src/runs/types/LogsProvider.types.ts +++ b/js_modules/dagster-ui/packages/ui-core/src/runs/types/LogsProvider.types.ts @@ -244,15 +244,6 @@ export type PipelineRunLogsSubscription = { stepKey: string | null; eventType: Types.DagsterEventType | null; } - | { - __typename: 'AssetMaterializationFailureEvent'; - runId: string; - message: string; - timestamp: string; - level: Types.LogLevel; - stepKey: string | null; - eventType: Types.DagsterEventType | null; - } | { __typename: 'AssetMaterializationPlannedEvent'; runId: string; @@ -2185,6 +2176,24 @@ export type PipelineRunLogsSubscription = { >; assetKey: {__typename: 'AssetKey'; path: Array} | null; } + | { + __typename: 'PlannedAssetMaterializationFailureEvent'; + runId: string; + message: string; + timestamp: string; + level: Types.LogLevel; + stepKey: string | null; + eventType: Types.DagsterEventType | null; + } + | { + __typename: 'PlannedAssetMaterializationSkippedEvent'; + runId: string; + message: string; + timestamp: string; + level: Types.LogLevel; + stepKey: string | null; + eventType: Types.DagsterEventType | null; + } | { __typename: 'ResourceInitFailureEvent'; runId: string; @@ -3590,15 +3599,6 @@ export type RunLogsSubscriptionSuccessFragment = { stepKey: string | null; eventType: Types.DagsterEventType | null; } - | { - __typename: 'AssetMaterializationFailureEvent'; - runId: string; - message: string; - timestamp: string; - level: Types.LogLevel; - stepKey: string | null; - eventType: Types.DagsterEventType | null; - } | { __typename: 'AssetMaterializationPlannedEvent'; runId: string; @@ -5489,6 +5489,24 @@ export type RunLogsSubscriptionSuccessFragment = { >; assetKey: {__typename: 'AssetKey'; path: Array} | null; } + | { + __typename: 'PlannedAssetMaterializationFailureEvent'; + runId: string; + message: string; + timestamp: string; + level: Types.LogLevel; + stepKey: string | null; + eventType: Types.DagsterEventType | null; + } + | { + __typename: 'PlannedAssetMaterializationSkippedEvent'; + runId: string; + message: string; + timestamp: string; + level: Types.LogLevel; + stepKey: string | null; + eventType: Types.DagsterEventType | null; + } | { __typename: 'ResourceInitFailureEvent'; runId: string; @@ -6887,15 +6905,6 @@ export type RunLogsQuery = { stepKey: string | null; eventType: Types.DagsterEventType | null; } - | { - __typename: 'AssetMaterializationFailureEvent'; - runId: string; - message: string; - timestamp: string; - level: Types.LogLevel; - stepKey: string | null; - eventType: Types.DagsterEventType | null; - } | { __typename: 'AssetMaterializationPlannedEvent'; runId: string; @@ -8828,6 +8837,24 @@ export type RunLogsQuery = { >; assetKey: {__typename: 'AssetKey'; path: Array} | null; } + | { + __typename: 'PlannedAssetMaterializationFailureEvent'; + runId: string; + message: string; + timestamp: string; + level: Types.LogLevel; + stepKey: string | null; + eventType: Types.DagsterEventType | null; + } + | { + __typename: 'PlannedAssetMaterializationSkippedEvent'; + runId: string; + message: string; + timestamp: string; + level: Types.LogLevel; + stepKey: string | null; + eventType: Types.DagsterEventType | null; + } | { __typename: 'ResourceInitFailureEvent'; runId: string; diff --git a/js_modules/dagster-ui/packages/ui-core/src/runs/types/LogsRow.types.ts b/js_modules/dagster-ui/packages/ui-core/src/runs/types/LogsRow.types.ts index 69814c91aae55..4ac8cf8761909 100644 --- a/js_modules/dagster-ui/packages/ui-core/src/runs/types/LogsRow.types.ts +++ b/js_modules/dagster-ui/packages/ui-core/src/runs/types/LogsRow.types.ts @@ -208,15 +208,6 @@ export type LogsRowStructuredFragment_AssetCheckEvaluationPlannedEvent = { stepKey: string | null; }; -export type LogsRowStructuredFragment_AssetMaterializationFailureEvent = { - __typename: 'AssetMaterializationFailureEvent'; - message: string; - eventType: Types.DagsterEventType | null; - timestamp: string; - level: Types.LogLevel; - stepKey: string | null; -}; - export type LogsRowStructuredFragment_AssetMaterializationPlannedEvent = { __typename: 'AssetMaterializationPlannedEvent'; message: string; @@ -1956,6 +1947,24 @@ export type LogsRowStructuredFragment_ObservationEvent = { assetKey: {__typename: 'AssetKey'; path: Array} | null; }; +export type LogsRowStructuredFragment_PlannedAssetMaterializationFailureEvent = { + __typename: 'PlannedAssetMaterializationFailureEvent'; + message: string; + eventType: Types.DagsterEventType | null; + timestamp: string; + level: Types.LogLevel; + stepKey: string | null; +}; + +export type LogsRowStructuredFragment_PlannedAssetMaterializationSkippedEvent = { + __typename: 'PlannedAssetMaterializationSkippedEvent'; + message: string; + eventType: Types.DagsterEventType | null; + timestamp: string; + level: Types.LogLevel; + stepKey: string | null; +}; + export type LogsRowStructuredFragment_ResourceInitFailureEvent = { __typename: 'ResourceInitFailureEvent'; message: string; @@ -3019,7 +3028,6 @@ export type LogsRowStructuredFragment = | LogsRowStructuredFragment_AlertSuccessEvent | LogsRowStructuredFragment_AssetCheckEvaluationEvent | LogsRowStructuredFragment_AssetCheckEvaluationPlannedEvent - | LogsRowStructuredFragment_AssetMaterializationFailureEvent | LogsRowStructuredFragment_AssetMaterializationPlannedEvent | LogsRowStructuredFragment_EngineEvent | LogsRowStructuredFragment_ExecutionStepFailureEvent @@ -3040,6 +3048,8 @@ export type LogsRowStructuredFragment = | LogsRowStructuredFragment_MaterializationEvent | LogsRowStructuredFragment_ObjectStoreOperationEvent | LogsRowStructuredFragment_ObservationEvent + | LogsRowStructuredFragment_PlannedAssetMaterializationFailureEvent + | LogsRowStructuredFragment_PlannedAssetMaterializationSkippedEvent | LogsRowStructuredFragment_ResourceInitFailureEvent | LogsRowStructuredFragment_ResourceInitStartedEvent | LogsRowStructuredFragment_ResourceInitSuccessEvent @@ -3095,14 +3105,6 @@ export type LogsRowUnstructuredFragment_AssetCheckEvaluationPlannedEvent = { stepKey: string | null; }; -export type LogsRowUnstructuredFragment_AssetMaterializationFailureEvent = { - __typename: 'AssetMaterializationFailureEvent'; - message: string; - timestamp: string; - level: Types.LogLevel; - stepKey: string | null; -}; - export type LogsRowUnstructuredFragment_AssetMaterializationPlannedEvent = { __typename: 'AssetMaterializationPlannedEvent'; message: string; @@ -3263,6 +3265,22 @@ export type LogsRowUnstructuredFragment_ObservationEvent = { stepKey: string | null; }; +export type LogsRowUnstructuredFragment_PlannedAssetMaterializationFailureEvent = { + __typename: 'PlannedAssetMaterializationFailureEvent'; + message: string; + timestamp: string; + level: Types.LogLevel; + stepKey: string | null; +}; + +export type LogsRowUnstructuredFragment_PlannedAssetMaterializationSkippedEvent = { + __typename: 'PlannedAssetMaterializationSkippedEvent'; + message: string; + timestamp: string; + level: Types.LogLevel; + stepKey: string | null; +}; + export type LogsRowUnstructuredFragment_ResourceInitFailureEvent = { __typename: 'ResourceInitFailureEvent'; message: string; @@ -3381,7 +3399,6 @@ export type LogsRowUnstructuredFragment = | LogsRowUnstructuredFragment_AlertSuccessEvent | LogsRowUnstructuredFragment_AssetCheckEvaluationEvent | LogsRowUnstructuredFragment_AssetCheckEvaluationPlannedEvent - | LogsRowUnstructuredFragment_AssetMaterializationFailureEvent | LogsRowUnstructuredFragment_AssetMaterializationPlannedEvent | LogsRowUnstructuredFragment_EngineEvent | LogsRowUnstructuredFragment_ExecutionStepFailureEvent @@ -3402,6 +3419,8 @@ export type LogsRowUnstructuredFragment = | LogsRowUnstructuredFragment_MaterializationEvent | LogsRowUnstructuredFragment_ObjectStoreOperationEvent | LogsRowUnstructuredFragment_ObservationEvent + | LogsRowUnstructuredFragment_PlannedAssetMaterializationFailureEvent + | LogsRowUnstructuredFragment_PlannedAssetMaterializationSkippedEvent | LogsRowUnstructuredFragment_ResourceInitFailureEvent | LogsRowUnstructuredFragment_ResourceInitStartedEvent | LogsRowUnstructuredFragment_ResourceInitSuccessEvent diff --git a/js_modules/dagster-ui/packages/ui-core/src/runs/types/LogsScrollingTableMessageFragment.types.ts b/js_modules/dagster-ui/packages/ui-core/src/runs/types/LogsScrollingTableMessageFragment.types.ts index c431e0ac69c7e..4780106eca0e6 100644 --- a/js_modules/dagster-ui/packages/ui-core/src/runs/types/LogsScrollingTableMessageFragment.types.ts +++ b/js_modules/dagster-ui/packages/ui-core/src/runs/types/LogsScrollingTableMessageFragment.types.ts @@ -208,15 +208,6 @@ export type LogsScrollingTableMessageFragment_AssetCheckEvaluationPlannedEvent = stepKey: string | null; }; -export type LogsScrollingTableMessageFragment_AssetMaterializationFailureEvent = { - __typename: 'AssetMaterializationFailureEvent'; - message: string; - eventType: Types.DagsterEventType | null; - timestamp: string; - level: Types.LogLevel; - stepKey: string | null; -}; - export type LogsScrollingTableMessageFragment_AssetMaterializationPlannedEvent = { __typename: 'AssetMaterializationPlannedEvent'; message: string; @@ -1956,6 +1947,24 @@ export type LogsScrollingTableMessageFragment_ObservationEvent = { assetKey: {__typename: 'AssetKey'; path: Array} | null; }; +export type LogsScrollingTableMessageFragment_PlannedAssetMaterializationFailureEvent = { + __typename: 'PlannedAssetMaterializationFailureEvent'; + message: string; + eventType: Types.DagsterEventType | null; + timestamp: string; + level: Types.LogLevel; + stepKey: string | null; +}; + +export type LogsScrollingTableMessageFragment_PlannedAssetMaterializationSkippedEvent = { + __typename: 'PlannedAssetMaterializationSkippedEvent'; + message: string; + eventType: Types.DagsterEventType | null; + timestamp: string; + level: Types.LogLevel; + stepKey: string | null; +}; + export type LogsScrollingTableMessageFragment_ResourceInitFailureEvent = { __typename: 'ResourceInitFailureEvent'; message: string; @@ -3019,7 +3028,6 @@ export type LogsScrollingTableMessageFragment = | LogsScrollingTableMessageFragment_AlertSuccessEvent | LogsScrollingTableMessageFragment_AssetCheckEvaluationEvent | LogsScrollingTableMessageFragment_AssetCheckEvaluationPlannedEvent - | LogsScrollingTableMessageFragment_AssetMaterializationFailureEvent | LogsScrollingTableMessageFragment_AssetMaterializationPlannedEvent | LogsScrollingTableMessageFragment_EngineEvent | LogsScrollingTableMessageFragment_ExecutionStepFailureEvent @@ -3040,6 +3048,8 @@ export type LogsScrollingTableMessageFragment = | LogsScrollingTableMessageFragment_MaterializationEvent | LogsScrollingTableMessageFragment_ObjectStoreOperationEvent | LogsScrollingTableMessageFragment_ObservationEvent + | LogsScrollingTableMessageFragment_PlannedAssetMaterializationFailureEvent + | LogsScrollingTableMessageFragment_PlannedAssetMaterializationSkippedEvent | LogsScrollingTableMessageFragment_ResourceInitFailureEvent | LogsScrollingTableMessageFragment_ResourceInitStartedEvent | LogsScrollingTableMessageFragment_ResourceInitSuccessEvent diff --git a/js_modules/dagster-ui/packages/ui-core/src/runs/types/RunFragments.types.ts b/js_modules/dagster-ui/packages/ui-core/src/runs/types/RunFragments.types.ts index 1cf0eff291f1c..2e44fbc98412b 100644 --- a/js_modules/dagster-ui/packages/ui-core/src/runs/types/RunFragments.types.ts +++ b/js_modules/dagster-ui/packages/ui-core/src/runs/types/RunFragments.types.ts @@ -265,15 +265,6 @@ export type RunDagsterRunEventFragment_AssetCheckEvaluationPlannedEvent = { eventType: Types.DagsterEventType | null; }; -export type RunDagsterRunEventFragment_AssetMaterializationFailureEvent = { - __typename: 'AssetMaterializationFailureEvent'; - message: string; - timestamp: string; - level: Types.LogLevel; - stepKey: string | null; - eventType: Types.DagsterEventType | null; -}; - export type RunDagsterRunEventFragment_AssetMaterializationPlannedEvent = { __typename: 'AssetMaterializationPlannedEvent'; message: string; @@ -2014,6 +2005,24 @@ export type RunDagsterRunEventFragment_ObservationEvent = { assetKey: {__typename: 'AssetKey'; path: Array} | null; }; +export type RunDagsterRunEventFragment_PlannedAssetMaterializationFailureEvent = { + __typename: 'PlannedAssetMaterializationFailureEvent'; + message: string; + timestamp: string; + level: Types.LogLevel; + stepKey: string | null; + eventType: Types.DagsterEventType | null; +}; + +export type RunDagsterRunEventFragment_PlannedAssetMaterializationSkippedEvent = { + __typename: 'PlannedAssetMaterializationSkippedEvent'; + message: string; + timestamp: string; + level: Types.LogLevel; + stepKey: string | null; + eventType: Types.DagsterEventType | null; +}; + export type RunDagsterRunEventFragment_ResourceInitFailureEvent = { __typename: 'ResourceInitFailureEvent'; message: string; @@ -3077,7 +3086,6 @@ export type RunDagsterRunEventFragment = | RunDagsterRunEventFragment_AlertSuccessEvent | RunDagsterRunEventFragment_AssetCheckEvaluationEvent | RunDagsterRunEventFragment_AssetCheckEvaluationPlannedEvent - | RunDagsterRunEventFragment_AssetMaterializationFailureEvent | RunDagsterRunEventFragment_AssetMaterializationPlannedEvent | RunDagsterRunEventFragment_EngineEvent | RunDagsterRunEventFragment_ExecutionStepFailureEvent @@ -3098,6 +3106,8 @@ export type RunDagsterRunEventFragment = | RunDagsterRunEventFragment_MaterializationEvent | RunDagsterRunEventFragment_ObjectStoreOperationEvent | RunDagsterRunEventFragment_ObservationEvent + | RunDagsterRunEventFragment_PlannedAssetMaterializationFailureEvent + | RunDagsterRunEventFragment_PlannedAssetMaterializationSkippedEvent | RunDagsterRunEventFragment_ResourceInitFailureEvent | RunDagsterRunEventFragment_ResourceInitStartedEvent | RunDagsterRunEventFragment_ResourceInitSuccessEvent diff --git a/js_modules/dagster-ui/packages/ui-core/src/runs/types/RunMetadataProvider.types.ts b/js_modules/dagster-ui/packages/ui-core/src/runs/types/RunMetadataProvider.types.ts index d69e8151ba728..3ee33d7bad2ba 100644 --- a/js_modules/dagster-ui/packages/ui-core/src/runs/types/RunMetadataProvider.types.ts +++ b/js_modules/dagster-ui/packages/ui-core/src/runs/types/RunMetadataProvider.types.ts @@ -37,13 +37,6 @@ export type RunMetadataProviderMessageFragment_AssetCheckEvaluationPlannedEvent stepKey: string | null; }; -export type RunMetadataProviderMessageFragment_AssetMaterializationFailureEvent = { - __typename: 'AssetMaterializationFailureEvent'; - message: string; - timestamp: string; - stepKey: string | null; -}; - export type RunMetadataProviderMessageFragment_AssetMaterializationPlannedEvent = { __typename: 'AssetMaterializationPlannedEvent'; message: string; @@ -345,6 +338,20 @@ export type RunMetadataProviderMessageFragment_ObservationEvent = { stepKey: string | null; }; +export type RunMetadataProviderMessageFragment_PlannedAssetMaterializationFailureEvent = { + __typename: 'PlannedAssetMaterializationFailureEvent'; + message: string; + timestamp: string; + stepKey: string | null; +}; + +export type RunMetadataProviderMessageFragment_PlannedAssetMaterializationSkippedEvent = { + __typename: 'PlannedAssetMaterializationSkippedEvent'; + message: string; + timestamp: string; + stepKey: string | null; +}; + export type RunMetadataProviderMessageFragment_ResourceInitFailureEvent = { __typename: 'ResourceInitFailureEvent'; message: string; @@ -459,7 +466,6 @@ export type RunMetadataProviderMessageFragment = | RunMetadataProviderMessageFragment_AlertSuccessEvent | RunMetadataProviderMessageFragment_AssetCheckEvaluationEvent | RunMetadataProviderMessageFragment_AssetCheckEvaluationPlannedEvent - | RunMetadataProviderMessageFragment_AssetMaterializationFailureEvent | RunMetadataProviderMessageFragment_AssetMaterializationPlannedEvent | RunMetadataProviderMessageFragment_EngineEvent | RunMetadataProviderMessageFragment_ExecutionStepFailureEvent @@ -480,6 +486,8 @@ export type RunMetadataProviderMessageFragment = | RunMetadataProviderMessageFragment_MaterializationEvent | RunMetadataProviderMessageFragment_ObjectStoreOperationEvent | RunMetadataProviderMessageFragment_ObservationEvent + | RunMetadataProviderMessageFragment_PlannedAssetMaterializationFailureEvent + | RunMetadataProviderMessageFragment_PlannedAssetMaterializationSkippedEvent | RunMetadataProviderMessageFragment_ResourceInitFailureEvent | RunMetadataProviderMessageFragment_ResourceInitStartedEvent | RunMetadataProviderMessageFragment_ResourceInitSuccessEvent diff --git a/python_modules/dagster-graphql/dagster_graphql/implementation/events.py b/python_modules/dagster-graphql/dagster_graphql/implementation/events.py index 10fff39b56474..97cd64c237dd4 100644 --- a/python_modules/dagster-graphql/dagster_graphql/implementation/events.py +++ b/python_modules/dagster-graphql/dagster_graphql/implementation/events.py @@ -222,7 +222,6 @@ def from_dagster_event_record(event_record: EventLogEntry, pipeline_name: str) - GrapheneAlertSuccessEvent, GrapheneAssetCheckEvaluationEvent, GrapheneAssetCheckEvaluationPlannedEvent, - GrapheneAssetMaterializationFailureEvent, GrapheneAssetMaterializationPlannedEvent, GrapheneEngineEvent, GrapheneExecutionStepFailureEvent, @@ -242,6 +241,8 @@ def from_dagster_event_record(event_record: EventLogEntry, pipeline_name: str) - GrapheneMaterializationEvent, GrapheneObjectStoreOperationEvent, GrapheneObservationEvent, + GraphenePlannedAssetMaterializationFailureEvent, + GraphenePlannedAssetMaterializationSkippedEvent, GrapheneResourceInitFailureEvent, GrapheneResourceInitStartedEvent, GrapheneResourceInitSuccessEvent, @@ -304,8 +305,10 @@ def from_dagster_event_record(event_record: EventLogEntry, pipeline_name: str) - ) elif dagster_event.event_type == DagsterEventType.ASSET_MATERIALIZATION_PLANNED: return GrapheneAssetMaterializationPlannedEvent(event=event_record) - elif dagster_event.event_type == DagsterEventType.ASSET_MATERIALIZATION_FAILURE: - return GrapheneAssetMaterializationFailureEvent(event=event_record) + elif dagster_event.event_type == DagsterEventType.PLANNED_ASSET_MATERIALIZATION_FAILURE: + return GraphenePlannedAssetMaterializationFailureEvent(event=event_record) + elif dagster_event.event_type == DagsterEventType.PLANNED_ASSET_MATERIALIZATION_SKIPPED: + return GraphenePlannedAssetMaterializationSkippedEvent(event=event_record) elif dagster_event.event_type == DagsterEventType.STEP_EXPECTATION_RESULT: data = cast(StepExpectationResultData, dagster_event.event_specific_data) return GrapheneStepExpectationResultEvent( diff --git a/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_runs.py b/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_runs.py index d0fd05bfc9a44..2222b9b171fca 100644 --- a/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_runs.py +++ b/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_runs.py @@ -166,7 +166,7 @@ def get_run_ids( def _get_latest_planned_run_id(instance: DagsterInstance, asset_record: AssetRecord): - if instance.event_log_storage.asset_records_have_planned_and_failed_materializations: + if instance.event_log_storage.asset_records_have_planned_materializations: return asset_record.asset_entry.last_planned_materialization_run_id else: planned_info = instance.get_latest_planned_materialization_info( diff --git a/python_modules/dagster-graphql/dagster_graphql/schema/logs/events.py b/python_modules/dagster-graphql/dagster_graphql/schema/logs/events.py index ad26ed9a1cd9d..36d88e7a77d0a 100644 --- a/python_modules/dagster-graphql/dagster_graphql/schema/logs/events.py +++ b/python_modules/dagster-graphql/dagster_graphql/schema/logs/events.py @@ -405,18 +405,32 @@ def __init__(self, event): ) -class GrapheneAssetMaterializationFailureEvent(graphene.ObjectType): +class GraphenePlannedAssetMaterializationFailureEvent(graphene.ObjectType): assetKey = graphene.Field(GrapheneAssetKey) class Meta: - name = "AssetMaterializationFailureEvent" + name = "PlannedAssetMaterializationFailureEvent" interfaces = (GrapheneMessageEvent, GrapheneRunEvent) def __init__(self, event: EventLogEntry): self._event = event def resolve_assetKey(self, _graphene_info: ResolveInfo): - return self._event.get_dagster_event().asset_materialization_failure_data.asset_key + return self._event.get_dagster_event().planned_asset_materialization_failure_data.asset_key + + +class GraphenePlannedAssetMaterializationSkippedEvent(graphene.ObjectType): + assetKey = graphene.Field(GrapheneAssetKey) + + class Meta: + name = "PlannedAssetMaterializationSkippedEvent" + interfaces = (GrapheneMessageEvent, GrapheneRunEvent) + + def __init__(self, event: EventLogEntry): + self._event = event + + def resolve_assetKey(self, _graphene_info: ResolveInfo): + return self._event.get_dagster_event().planned_asset_materialization_failure_data.asset_key class GrapheneAssetMaterializationPlannedEvent(graphene.ObjectType): @@ -600,7 +614,8 @@ class Meta: GrapheneAlertSuccessEvent, GrapheneAlertFailureEvent, GrapheneAssetMaterializationPlannedEvent, - GrapheneAssetMaterializationFailureEvent, + GraphenePlannedAssetMaterializationFailureEvent, + GraphenePlannedAssetMaterializationSkippedEvent, GrapheneAssetCheckEvaluationPlannedEvent, GrapheneAssetCheckEvaluationEvent, ) diff --git a/python_modules/dagster/dagster/_core/event_api.py b/python_modules/dagster/dagster/_core/event_api.py index bf88549a254cf..b618561da76a1 100644 --- a/python_modules/dagster/dagster/_core/event_api.py +++ b/python_modules/dagster/dagster/_core/event_api.py @@ -85,7 +85,8 @@ class RunShardedEventsCursor(NamedTuple): DagsterEventType.ASSET_MATERIALIZATION, DagsterEventType.ASSET_OBSERVATION, DagsterEventType.ASSET_MATERIALIZATION_PLANNED, - DagsterEventType.ASSET_MATERIALIZATION_FAILURE, + DagsterEventType.PLANNED_ASSET_MATERIALIZATION_FAILURE, + DagsterEventType.PLANNED_ASSET_MATERIALIZATION_SKIPPED, ] EventCursor: TypeAlias = Union[int, RunShardedEventsCursor] diff --git a/python_modules/dagster/dagster/_core/events/__init__.py b/python_modules/dagster/dagster/_core/events/__init__.py index 6f42078b5851b..8faf8a5c636cc 100644 --- a/python_modules/dagster/dagster/_core/events/__init__.py +++ b/python_modules/dagster/dagster/_core/events/__init__.py @@ -81,7 +81,8 @@ "AssetMaterializationPlannedData", "AssetCheckEvaluation", "AssetCheckEvaluationPlanned", - "AssetMaterializationFailureData", + "PlannedAssetMaterializationFailureData", + "PlannedAssetMaterializationSkippedData", ] @@ -111,13 +112,15 @@ class DagsterEventType(str, Enum): ASSET_MATERIALIZATION = "ASSET_MATERIALIZATION" ASSET_MATERIALIZATION_PLANNED = "ASSET_MATERIALIZATION_PLANNED" + + PLANNED_ASSET_MATERIALIZATION_FAILURE = "PLANNED_ASSET_MATERIALIZATION_FAILURE" + PLANNED_ASSET_MATERIALIZATION_SKIPPED = "PLANNED_ASSET_MATERIALIZATION_SKIPPED" + ASSET_OBSERVATION = "ASSET_OBSERVATION" STEP_EXPECTATION_RESULT = "STEP_EXPECTATION_RESULT" ASSET_CHECK_EVALUATION_PLANNED = "ASSET_CHECK_EVALUATION_PLANNED" ASSET_CHECK_EVALUATION = "ASSET_CHECK_EVALUATION" - ASSET_MATERIALIZATION_FAILURE = "ASSET_MATERIALIZATION_FAILURE" - # We want to display RUN_* events in the Dagster UI and in our LogManager output, but in order to # support backcompat for our storage layer, we need to keep the persisted value to be strings # of the form "PIPELINE_*". We may have user code that pass in the DagsterEventType @@ -244,14 +247,16 @@ class DagsterEventType(str, Enum): BATCH_WRITABLE_EVENTS = { DagsterEventType.ASSET_MATERIALIZATION, DagsterEventType.ASSET_OBSERVATION, - DagsterEventType.ASSET_MATERIALIZATION_FAILURE, + DagsterEventType.PLANNED_ASSET_MATERIALIZATION_FAILURE, + DagsterEventType.PLANNED_ASSET_MATERIALIZATION_SKIPPED, } ASSET_EVENTS = { DagsterEventType.ASSET_MATERIALIZATION, DagsterEventType.ASSET_OBSERVATION, DagsterEventType.ASSET_MATERIALIZATION_PLANNED, - DagsterEventType.ASSET_MATERIALIZATION_FAILURE, + DagsterEventType.PLANNED_ASSET_MATERIALIZATION_FAILURE, + DagsterEventType.PLANNED_ASSET_MATERIALIZATION_SKIPPED, } ASSET_CHECK_EVENTS = { @@ -722,8 +727,10 @@ def asset_key(self) -> Optional[AssetKey]: return self.asset_observation_data.asset_observation.asset_key elif self.event_type == DagsterEventType.ASSET_MATERIALIZATION_PLANNED: return self.asset_materialization_planned_data.asset_key - elif self.event_type == DagsterEventType.ASSET_MATERIALIZATION_FAILURE: - return self.asset_materialization_failure_data.asset_key + elif self.event_type == DagsterEventType.PLANNED_ASSET_MATERIALIZATION_FAILURE: + return self.planned_asset_materialization_failure_data.asset_key + elif self.event_type == DagsterEventType.PLANNED_ASSET_MATERIALIZATION_SKIPPED: + return self.planned_asset_materialization_skipped_data.asset_key else: return None @@ -740,8 +747,10 @@ def partition(self) -> Optional[str]: return self.asset_observation_data.asset_observation.partition elif self.event_type == DagsterEventType.ASSET_MATERIALIZATION_PLANNED: return self.asset_materialization_planned_data.partition - elif self.event_type == DagsterEventType.ASSET_MATERIALIZATION_FAILURE: - return self.asset_materialization_failure_data.partition + elif self.event_type == DagsterEventType.PLANNED_ASSET_MATERIALIZATION_FAILURE: + return self.planned_asset_materialization_failure_data.partition + elif self.event_type == DagsterEventType.PLANNED_ASSET_MATERIALIZATION_SKIPPED: + return self.planned_asset_materialization_skipped_data.partition else: return None @@ -798,13 +807,26 @@ def asset_materialization_planned_data(self) -> "AssetMaterializationPlannedData return cast(AssetMaterializationPlannedData, self.event_specific_data) @property - def asset_materialization_failure_data(self) -> "AssetMaterializationFailureData": + def planned_asset_materialization_failure_data( + self, + ) -> "PlannedAssetMaterializationFailureData": _assert_type( - "asset_materialization_failure", - DagsterEventType.ASSET_MATERIALIZATION_FAILURE, + "planned_asset_materialization_failure", + DagsterEventType.PLANNED_ASSET_MATERIALIZATION_FAILURE, self.event_type, ) - return cast(AssetMaterializationFailureData, self.event_specific_data) + return cast(PlannedAssetMaterializationFailureData, self.event_specific_data) + + @property + def planned_asset_materialization_skipped_data( + self, + ) -> "PlannedAssetMaterializationSkippedData": + _assert_type( + "planned_asset_materialization_skipped", + DagsterEventType.PLANNED_ASSET_MATERIALIZATION_SKIPPED, + self.event_type, + ) + return cast(PlannedAssetMaterializationSkippedData, self.event_specific_data) @property def asset_check_planned_data(self) -> "AssetCheckEvaluationPlanned": @@ -986,16 +1008,30 @@ def step_start_event(step_context: IStepContext) -> "DagsterEvent": ) @staticmethod - def build_asset_materialization_failure_event( + def build_planned_asset_materialization_failure_event( + job_name: str, + step_key: str, + planned_asset_materialization_failure_data: "PlannedAssetMaterializationFailureData", + ) -> "DagsterEvent": + return DagsterEvent( + event_type_value=DagsterEventType.PLANNED_ASSET_MATERIALIZATION_FAILURE.value, + job_name=job_name, + message="", + event_specific_data=planned_asset_materialization_failure_data, + step_key=step_key, + ) + + @staticmethod + def build_planned_asset_materialization_skipped_event( job_name: str, step_key: str, - asset_materialization_failure_data: "AssetMaterializationFailureData", + planned_asset_materialization_skipped_data: "PlannedAssetMaterializationSkippedData", ) -> "DagsterEvent": return DagsterEvent( - event_type_value=DagsterEventType.ASSET_MATERIALIZATION_FAILURE.value, + event_type_value=DagsterEventType.PLANNED_ASSET_MATERIALIZATION_SKIPPED.value, job_name=job_name, message="", - event_specific_data=asset_materialization_failure_data, + event_specific_data=planned_asset_materialization_skipped_data, step_key=step_key, ) @@ -1579,7 +1615,7 @@ def __new__(cls, asset_observation: AssetObservation): @whitelist_for_serdes -class AssetMaterializationFailureData( +class PlannedAssetMaterializationFailureData( NamedTuple( "_AssetMaterializationFailureData", [ @@ -1595,7 +1631,7 @@ def __new__( partition: Optional[str], error: Optional[SerializableErrorInfo] = None, ): - return super(AssetMaterializationFailureData, cls).__new__( + return super(PlannedAssetMaterializationFailureData, cls).__new__( cls, asset_key=check.inst_param(asset_key, "asset_key", AssetKey), partition=check.opt_str_param(partition, "partition"), @@ -1603,6 +1639,40 @@ def __new__( ) +@whitelist_for_serdes +class PlannedAssetMaterializationSkipReason(Enum): + # The run was deleted after it was planned but before the materialization could happen + RUN_DELETED = "RUN_DELETED" + # Will add more concrete skip reasons as we add code that emits skip events + + +@whitelist_for_serdes +class PlannedAssetMaterializationSkippedData( + NamedTuple( + "_PlannedAssetMaterializationSkippedData", + [ + ("asset_key", AssetKey), + ("partition", Optional[str]), + ("skip_reason", Optional[PlannedAssetMaterializationSkipReason]), + ], + ) +): + def __new__( + cls, + asset_key: AssetKey, + partition: Optional[str], + skip_reason: Optional[PlannedAssetMaterializationSkipReason] = None, + ): + return super(PlannedAssetMaterializationSkippedData, cls).__new__( + cls, + asset_key=check.inst_param(asset_key, "asset_key", AssetKey), + partition=check.opt_str_param(partition, "partition"), + skip_reason=check.opt_inst_param( + skip_reason, "skip_reason", PlannedAssetMaterializationSkipReason + ), + ) + + @whitelist_for_serdes class StepMaterializationData( NamedTuple( diff --git a/python_modules/dagster/dagster/_core/storage/event_log/base.py b/python_modules/dagster/dagster/_core/storage/event_log/base.py index d7daa33845111..fcae855c2e975 100644 --- a/python_modules/dagster/dagster/_core/storage/event_log/base.py +++ b/python_modules/dagster/dagster/_core/storage/event_log/base.py @@ -65,8 +65,10 @@ class AssetEntry( ("last_observation_record", Optional[EventLogRecord]), ("last_planned_materialization_storage_id", Optional[int]), ("last_planned_materialization_run_id", Optional[str]), - ("last_materialization_failure_storage_id", Optional[int]), - ("last_materialization_failure_run_id", Optional[str]), + ("last_planned_materialization_failure_storage_id", Optional[int]), + ("last_planned_materialization_failure_run_id", Optional[str]), + ("last_planned_materialization_skipped_storage_id", Optional[int]), + ("last_planned_materialization_skipped_run_id", Optional[str]), ], ) ): @@ -80,8 +82,10 @@ def __new__( last_observation_record: Optional[EventLogRecord] = None, last_planned_materialization_storage_id: Optional[int] = None, last_planned_materialization_run_id: Optional[str] = None, - last_materialization_failure_storage_id: Optional[int] = None, - last_materialization_failure_run_id: Optional[str] = None, + last_planned_materialization_failure_storage_id: Optional[int] = None, + last_planned_materialization_failure_run_id: Optional[str] = None, + last_planned_materialization_skipped_storage_id: Optional[int] = None, + last_planned_materialization_skipped_run_id: Optional[str] = None, ): from dagster._core.storage.partition_status_cache import AssetStatusCacheValue @@ -107,11 +111,21 @@ def __new__( last_planned_materialization_run_id, "last_planned_materialization_run_id", ), - last_materialization_failure_storage_id=check.opt_int_param( - last_materialization_failure_storage_id, "last_materialization_failure_storage_id" + last_planned_materialization_failure_storage_id=check.opt_int_param( + last_planned_materialization_failure_storage_id, + "last_planned_materialization_failure_storage_id", ), - last_materialization_failure_run_id=check.opt_str_param( - last_materialization_failure_run_id, "last_materialization_failure_run_id" + last_planned_materialization_failure_run_id=check.opt_str_param( + last_planned_materialization_failure_run_id, + "last_planned_materialization_failure_run_id", + ), + last_planned_materialization_skipped_storage_id=check.opt_int_param( + last_planned_materialization_skipped_storage_id, + "last_planned_materialization_skipped_storage_id", + ), + last_planned_materialization_skipped_run_id=check.opt_str_param( + last_planned_materialization_skipped_run_id, + "last_planned_materialization_skipped_run_id", ), ) @@ -341,7 +355,7 @@ def get_asset_check_summary_records( pass @property - def asset_records_have_planned_and_failed_materializations(self) -> bool: + def asset_records_have_planned_materializations(self) -> bool: return False @abstractmethod diff --git a/python_modules/dagster/dagster/_core/storage/partition_status_cache.py b/python_modules/dagster/dagster/_core/storage/partition_status_cache.py index d14e109bc44f8..4229fa253db25 100644 --- a/python_modules/dagster/dagster/_core/storage/partition_status_cache.py +++ b/python_modules/dagster/dagster/_core/storage/partition_status_cache.py @@ -226,7 +226,7 @@ def get_validated_partition_keys( def get_last_planned_storage_id( instance: DagsterInstance, asset_key: AssetKey, asset_record: Optional["AssetRecord"] ) -> int: - if instance.event_log_storage.asset_records_have_planned_and_failed_materializations: + if instance.event_log_storage.asset_records_have_planned_materializations: return ( (asset_record.asset_entry.last_planned_materialization_storage_id or 0) if asset_record diff --git a/python_modules/dagster/dagster_tests/storage_tests/utils/event_log_storage.py b/python_modules/dagster/dagster_tests/storage_tests/utils/event_log_storage.py index 56cb84a07228a..06be2ea161af8 100644 --- a/python_modules/dagster/dagster_tests/storage_tests/utils/event_log_storage.py +++ b/python_modules/dagster/dagster_tests/storage_tests/utils/event_log_storage.py @@ -68,13 +68,14 @@ from dagster._core.event_api import EventLogCursor, EventRecordsResult, RunStatusChangeRecordsFilter from dagster._core.events import ( EVENT_TYPE_TO_PIPELINE_RUN_STATUS, - AssetMaterializationFailureData, AssetMaterializationPlannedData, AssetObservationData, DagsterEvent, DagsterEventBatchMetadata, DagsterEventType, EngineEventData, + PlannedAssetMaterializationFailureData, + PlannedAssetMaterializationSkippedData, StepExpectationResultData, StepMaterializationData, generate_event_batch_id, @@ -2747,20 +2748,21 @@ def _store_partition_event(asset_key, partition) -> int: latest_storage_ids["p1"] = _store_partition_event(a, "p1") _assert_storage_matches(latest_storage_ids) - def test_batch_write_asset_materialization_failures(self, storage, instance): + def test_batch_write_planned_asset_materialization_failures_and_skips(self, storage, instance): a = AssetKey(["a"]) run_id = make_new_run_id() partitions = list(string.ascii_uppercase) failed_partitions = {"step_a": set(partitions[:10]), "step_b": set(partitions[10:15])} + skipped_partitions = {"step_a": set(partitions[10:15]), "step_b": set(partitions[15:20])} - asset_materialization_failure_events: List[List[DagsterEvent]] = [ + failure_events_by_step: List[List[DagsterEvent]] = [ [ - DagsterEvent.build_asset_materialization_failure_event( + DagsterEvent.build_planned_asset_materialization_failure_event( job_name="my_fake_job", step_key=step_key, - asset_materialization_failure_data=AssetMaterializationFailureData( + planned_asset_materialization_failure_data=PlannedAssetMaterializationFailureData( asset_key=a, partition=partition, error=None ), ) @@ -2769,30 +2771,65 @@ def test_batch_write_asset_materialization_failures(self, storage, instance): for step_key, partitions in failed_partitions.items() ] - asset_materialization_failure_events = [ - event for events in asset_materialization_failure_events for event in events + failure_events = [event for events in failure_events_by_step for event in events] + + batch_id = generate_event_batch_id() + last_index = len(failure_events) - 1 + + for i, failure_event in enumerate(failure_events): + batch_metadata = DagsterEventBatchMetadata(batch_id, i == last_index) + instance.report_dagster_event(failure_event, run_id, batch_metadata=batch_metadata) + + skip_events_by_step: List[List[DagsterEvent]] = [ + [ + DagsterEvent.build_planned_asset_materialization_skipped_event( + job_name="my_fake_job", + step_key=step_key, + planned_asset_materialization_skipped_data=PlannedAssetMaterializationSkippedData( + asset_key=a, partition=partition + ), + ) + for partition in partitions + ] + for step_key, partitions in skipped_partitions.items() ] + skip_events = [event for events in skip_events_by_step for event in events] + batch_id = generate_event_batch_id() - last_index = len(asset_materialization_failure_events) - 1 + last_index = len(failure_events) - 1 - for i, asset_event in enumerate(asset_materialization_failure_events): + for i, skip_event in enumerate(skip_events): batch_metadata = DagsterEventBatchMetadata(batch_id, i == last_index) - instance.report_dagster_event(asset_event, run_id, batch_metadata=batch_metadata) + instance.report_dagster_event(skip_event, run_id, batch_metadata=batch_metadata) failure_records = instance.get_records_for_run( - run_id=run_id, of_type=DagsterEventType.ASSET_MATERIALIZATION_FAILURE + run_id=run_id, of_type=DagsterEventType.PLANNED_ASSET_MATERIALIZATION_FAILURE ).records assert len(failure_records) == 15 - stored_partitions_by_step_key = defaultdict(set) + failed_partitions_by_step_key = defaultdict(set) for record in failure_records: - stored_partitions_by_step_key[record.event_log_entry.step_key].add( - record.event_log_entry.dagster_event.asset_materialization_failure_data.partition + failed_partitions_by_step_key[record.event_log_entry.step_key].add( + record.event_log_entry.dagster_event.planned_asset_materialization_failure_data.partition + ) + + assert failed_partitions_by_step_key == failed_partitions + + skip_records = instance.get_records_for_run( + run_id=run_id, of_type=DagsterEventType.PLANNED_ASSET_MATERIALIZATION_SKIPPED + ).records + + assert len(skip_records) == 10 + + skipped_partitions_by_step_key = defaultdict(set) + for record in skip_records: + skipped_partitions_by_step_key[record.event_log_entry.step_key].add( + record.event_log_entry.dagster_event.planned_asset_materialization_skipped_data.partition ) - assert stored_partitions_by_step_key == failed_partitions + assert skipped_partitions_by_step_key == skipped_partitions @pytest.mark.parametrize( "dagster_event_type", @@ -4057,7 +4094,7 @@ def second_asset(my_asset): ascending=False, ).records[0] - if storage.asset_records_have_planned_and_failed_materializations: + if storage.asset_records_have_planned_materializations: assert ( asset_entry.last_planned_materialization_storage_id == materialization_planned_record.storage_id