From 709de963d9c9d614b7c929434dd5e810be8ecb9b Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Tue, 5 Dec 2023 19:57:30 -0500 Subject: [PATCH] its all soup --- .../_core/execution/context/compute.py | 25 ++++++++++--------- .../_core/execution/plan/compute_generator.py | 2 ++ 2 files changed, 15 insertions(+), 12 deletions(-) diff --git a/python_modules/dagster/dagster/_core/execution/context/compute.py b/python_modules/dagster/dagster/_core/execution/context/compute.py index d2fbdd8f02cfc..e530009d2523e 100644 --- a/python_modules/dagster/dagster/_core/execution/context/compute.py +++ b/python_modules/dagster/dagster/_core/execution/context/compute.py @@ -99,15 +99,8 @@ def consume_events(self) -> Iterator[DagsterEvent]: def has_events(self) -> bool: return bool(self._events) - def log_event(self, event: UserEvent, step_execution_context: StepExecutionContext) -> None: - if isinstance(event, AssetMaterialization): - self._events.append(DagsterEvent.asset_materialization(step_execution_context, event)) - elif isinstance(event, AssetObservation): - self._events.append(DagsterEvent.asset_observation(step_execution_context, event)) - elif isinstance(event, ExpectationResult): - self._events.append(DagsterEvent.step_expectation_result(step_execution_context, event)) - else: - check.failed(f"Unexpected event {event}") + def log_event(self, event: DagsterEvent): + self._events.append(event) @property def requires_typed_event_stream(self) -> bool: @@ -549,9 +542,17 @@ def log_event(self, event: UserEvent) -> None: def log_materialization(context): context.log_event(AssetMaterialization("foo")) """ - self.execution_properties.log_event( - event=event, step_execution_context=self._step_execution_context - ) + if isinstance(event, AssetMaterialization): + dagster_event = DagsterEvent.asset_materialization(self._step_execution_context, event) + elif isinstance(event, AssetObservation): + dagster_event = DagsterEvent.asset_observation(self._step_execution_context, event) + elif isinstance(event, ExpectationResult): + dagster_event = DagsterEvent.step_expectation_result( + self._step_execution_context, event + ) + else: + check.failed(f"Unexpected event {event}") + self.execution_properties.log_event(event=dagster_event) @public def add_output_metadata( diff --git a/python_modules/dagster/dagster/_core/execution/plan/compute_generator.py b/python_modules/dagster/dagster/_core/execution/plan/compute_generator.py index e9c83fe3c86ad..4cf2a837c683a 100644 --- a/python_modules/dagster/dagster/_core/execution/plan/compute_generator.py +++ b/python_modules/dagster/dagster/_core/execution/plan/compute_generator.py @@ -114,6 +114,8 @@ def invoke_compute_fn( config_arg_cls: Optional[Type[Config]], resource_args: Optional[Dict[str, str]] = None, ) -> Any: + # TODO - this is a possible execution pathway for both direct invocation and normal execution. Need to figure + # out the implications for the context args_to_pass = {**kwargs} if config_arg_cls: # config_arg_cls is either a Config class or a primitive type