diff --git a/python_modules/dagster/dagster/_core/pipes/context.py b/python_modules/dagster/dagster/_core/pipes/context.py index 332aa0afc0d98..270532a3febb4 100644 --- a/python_modules/dagster/dagster/_core/pipes/context.py +++ b/python_modules/dagster/dagster/_core/pipes/context.py @@ -20,6 +20,7 @@ from typing_extensions import TypeAlias import dagster._check as check +from dagster import DagsterEvent from dagster._annotations import experimental, public from dagster._core.definitions.asset_check_result import AssetCheckResult from dagster._core.definitions.asset_check_spec import AssetCheckSeverity @@ -33,12 +34,15 @@ has_one_dimension_time_window_partitioning, ) from dagster._core.errors import DagsterPipesExecutionError +from dagster._core.events import EngineEventData from dagster._core.execution.context.compute import OpExecutionContext from dagster._core.execution.context.invocation import BoundOpExecutionContext +from dagster._utils.error import ExceptionInfo, serializable_error_info_from_exc_info if TYPE_CHECKING: from dagster._core.pipes.client import PipesMessageReader + PipesExecutionResult: TypeAlias = Union[MaterializeResult, AssetCheckResult] @@ -193,6 +197,16 @@ def _handle_log(self, message: str, level: str = "info") -> None: check.str_param(message, "message") self._context.log.log(level, message) + def report_pipes_framework_exception(self, origin: str, exc_info: ExceptionInfo): + # report as an engine event to provide structured exception data + DagsterEvent.engine_event( + self._context.get_step_execution_context(), + f"[pipes] framework exception occurred in {origin}", + EngineEventData( + error=serializable_error_info_from_exc_info(exc_info), + ), + ) + @experimental @dataclass diff --git a/python_modules/dagster/dagster/_core/pipes/utils.py b/python_modules/dagster/dagster/_core/pipes/utils.py index 866b8b005fd2f..1bc2af908067d 100644 --- a/python_modules/dagster/dagster/_core/pipes/utils.py +++ b/python_modules/dagster/dagster/_core/pipes/utils.py @@ -171,9 +171,16 @@ def read_messages( thread.join() def _reader_thread(self, handler: "PipesMessageHandler", is_resource_complete: Event) -> None: - for line in tail_file(self._path, lambda: is_resource_complete.is_set()): - message = json.loads(line) - handler.handle_message(message) + try: + for line in tail_file(self._path, lambda: is_resource_complete.is_set()): + message = json.loads(line) + handler.handle_message(message) + except: + handler.report_pipes_framework_exception( + f"{self.__class__.__name__} reader thread", + sys.exc_info(), + ) + raise def no_messages_debug_text(self) -> str: return f"Attempted to read messages from file {self._path}." @@ -324,20 +331,29 @@ def _messages_thread( params: PipesParams, is_task_complete: Event, ) -> None: - start_or_last_download = datetime.datetime.now() - while True: - now = datetime.datetime.now() - if (now - start_or_last_download).seconds > self.interval or is_task_complete.is_set(): - start_or_last_download = now - chunk = self.download_messages_chunk(self.counter, params) - if chunk: - for line in chunk.split("\n"): - message = json.loads(line) - handler.handle_message(message) - self.counter += 1 - elif is_task_complete.is_set(): - break - time.sleep(1) + try: + start_or_last_download = datetime.datetime.now() + while True: + now = datetime.datetime.now() + if ( + now - start_or_last_download + ).seconds > self.interval or is_task_complete.is_set(): + start_or_last_download = now + chunk = self.download_messages_chunk(self.counter, params) + if chunk: + for line in chunk.split("\n"): + message = json.loads(line) + handler.handle_message(message) + self.counter += 1 + elif is_task_complete.is_set(): + break + time.sleep(1) + except: + handler.report_pipes_framework_exception( + f"{self.__class__.__name__} reader thread", + sys.exc_info(), + ) + raise class PipesBlobStoreStdioReader(ABC):