diff --git a/python_modules/dagster/dagster/_core/pipes/context.py b/python_modules/dagster/dagster/_core/pipes/context.py index 52b8c9e97d2b5..1f6227fd8f4b4 100644 --- a/python_modules/dagster/dagster/_core/pipes/context.py +++ b/python_modules/dagster/dagster/_core/pipes/context.py @@ -21,6 +21,7 @@ from typing_extensions import TypeAlias import dagster._check as check +from dagster import DagsterEvent from dagster._annotations import experimental, public from dagster._core.definitions.asset_check_result import AssetCheckResult from dagster._core.definitions.asset_check_spec import AssetCheckSeverity @@ -34,12 +35,15 @@ has_one_dimension_time_window_partitioning, ) from dagster._core.errors import DagsterPipesExecutionError +from dagster._core.events import EngineEventData from dagster._core.execution.context.compute import OpExecutionContext from dagster._core.execution.context.invocation import BoundOpExecutionContext +from dagster._utils.error import ExceptionInfo, serializable_error_info_from_exc_info if TYPE_CHECKING: from dagster._core.pipes.client import PipesMessageReader + PipesExecutionResult: TypeAlias = Union[MaterializeResult, AssetCheckResult] @@ -202,6 +206,18 @@ def _handle_log(self, message: str, level: str = "info") -> None: check.str_param(message, "message") self._context.log.log(level, message) + def report_pipes_framework_exception(self, origin: str, exc_info: ExceptionInfo): + # use an engine event to provide structured exception, this gives us an event with + # * the context of where the exception happened ([pipes]...) + # * the exception class, message, and stack trace as well as any chained exception + DagsterEvent.engine_event( + self._context.get_step_execution_context(), + f"[pipes] framework exception occurred in {origin}", + EngineEventData( + error=serializable_error_info_from_exc_info(exc_info), + ), + ) + @experimental @dataclass diff --git a/python_modules/dagster/dagster/_core/pipes/utils.py b/python_modules/dagster/dagster/_core/pipes/utils.py index 2100f09bb1def..3d2f94277c89b 100644 --- a/python_modules/dagster/dagster/_core/pipes/utils.py +++ b/python_modules/dagster/dagster/_core/pipes/utils.py @@ -174,9 +174,16 @@ def read_messages( thread.join() def _reader_thread(self, handler: "PipesMessageHandler", is_resource_complete: Event) -> None: - for line in tail_file(self._path, lambda: is_resource_complete.is_set()): - message = json.loads(line) - handler.handle_message(message) + try: + for line in tail_file(self._path, lambda: is_resource_complete.is_set()): + message = json.loads(line) + handler.handle_message(message) + except: + handler.report_pipes_framework_exception( + f"{self.__class__.__name__} reader thread", + sys.exc_info(), + ) + raise def no_messages_debug_text(self) -> str: return f"Attempted to read messages from file {self._path}." @@ -302,7 +309,7 @@ def read_messages( messages_thread.start() logs_thread = Thread( target=self._logs_thread, - args=(params, is_session_closed, messages_thread), + args=(handler, params, is_session_closed, messages_thread), daemon=True, ) logs_thread.start() @@ -337,23 +344,33 @@ def _messages_thread( params: PipesParams, is_session_closed: Event, ) -> None: - start_or_last_download = datetime.datetime.now() - while True: - now = datetime.datetime.now() - if (now - start_or_last_download).seconds > self.interval or is_session_closed.is_set(): - start_or_last_download = now - chunk = self.download_messages_chunk(self.counter, params) - if chunk: - for line in chunk.split("\n"): - message = json.loads(line) - handler.handle_message(message) - self.counter += 1 - elif is_session_closed.is_set(): - break - time.sleep(DEFAULT_SLEEP_INTERVAL) + try: + start_or_last_download = datetime.datetime.now() + while True: + now = datetime.datetime.now() + if ( + now - start_or_last_download + ).seconds > self.interval or is_session_closed.is_set(): + start_or_last_download = now + chunk = self.download_messages_chunk(self.counter, params) + if chunk: + for line in chunk.split("\n"): + message = json.loads(line) + handler.handle_message(message) + self.counter += 1 + elif is_session_closed.is_set(): + break + time.sleep(DEFAULT_SLEEP_INTERVAL) + except: + handler.report_pipes_framework_exception( + f"{self.__class__.__name__} messages thread", + sys.exc_info(), + ) + raise def _logs_thread( self, + handler: "PipesMessageHandler", params: PipesParams, is_session_closed: Event, messages_thread: Thread, @@ -407,7 +424,12 @@ def _logs_thread( # Wait for the external process to complete is_session_closed.wait() - + except: + handler.report_pipes_framework_exception( + f"{self.__class__.__name__} logs thread", + sys.exc_info(), + ) + raise finally: for log_reader in self.log_readers: if log_reader.is_running():