Skip to content

Commit

Permalink
[pipes] add report_component_exception
Browse files Browse the repository at this point in the history
  • Loading branch information
alangenfeld committed Oct 17, 2023
1 parent 571c27b commit 0d07e23
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 17 deletions.
14 changes: 14 additions & 0 deletions python_modules/dagster/dagster/_core/pipes/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from typing_extensions import TypeAlias

import dagster._check as check
from dagster import DagsterEvent
from dagster._annotations import experimental, public
from dagster._core.definitions.asset_check_result import AssetCheckResult
from dagster._core.definitions.asset_check_spec import AssetCheckSeverity
Expand All @@ -33,12 +34,15 @@
has_one_dimension_time_window_partitioning,
)
from dagster._core.errors import DagsterPipesExecutionError
from dagster._core.events import EngineEventData
from dagster._core.execution.context.compute import OpExecutionContext
from dagster._core.execution.context.invocation import BoundOpExecutionContext
from dagster._utils.error import ExceptionInfo, serializable_error_info_from_exc_info

if TYPE_CHECKING:
from dagster._core.pipes.client import PipesMessageReader


PipesExecutionResult: TypeAlias = Union[MaterializeResult, AssetCheckResult]


Expand Down Expand Up @@ -193,6 +197,16 @@ def _handle_log(self, message: str, level: str = "info") -> None:
check.str_param(message, "message")
self._context.log.log(level, message)

def report_pipes_framework_exception(self, origin: str, exc_info: ExceptionInfo):
    """Report an exception raised by pipes framework code as an engine event.

    An engine event is used so that the exception travels as structured error
    data attached to the event.

    Args:
        origin: Label for where the exception occurred; it is interpolated
            into the event message.
        exc_info: Exception info (e.g. the value of ``sys.exc_info()``) that is
            converted to serializable error info and attached to the event.
    """
    step_context = self._context.get_step_execution_context()
    description = f"[pipes] framework exception occurred in {origin}"
    event_data = EngineEventData(
        error=serializable_error_info_from_exc_info(exc_info),
    )
    DagsterEvent.engine_event(step_context, description, event_data)


@experimental
@dataclass
Expand Down
50 changes: 33 additions & 17 deletions python_modules/dagster/dagster/_core/pipes/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,9 +171,16 @@ def read_messages(
thread.join()

def _reader_thread(self, handler: "PipesMessageHandler", is_resource_complete: Event) -> None:
    """Tail the messages file and hand each decoded JSON line to ``handler``.

    Any exception raised while tailing, decoding, or handling a message is
    first reported through ``handler.report_pipes_framework_exception`` and
    then re-raised, so the failure is recorded before it propagates.

    Args:
        handler: Receives each decoded message via ``handle_message`` and any
            failure via ``report_pipes_framework_exception``.
        is_resource_complete: Event whose ``is_set`` is passed to ``tail_file``
            (presumably as its stop condition -- confirm against ``tail_file``).
    """
    try:
        # A single pass over the file: each line is one JSON-encoded message.
        for line in tail_file(self._path, is_resource_complete.is_set):
            handler.handle_message(json.loads(line))
    except BaseException:
        # Deliberately catches everything (same reach as a bare ``except:``):
        # the error is reported with its origin and then always re-raised.
        handler.report_pipes_framework_exception(
            f"{self.__class__.__name__} reader thread",
            sys.exc_info(),
        )
        raise

def no_messages_debug_text(self) -> str:
    """Return a debugging hint that names the file messages were read from."""
    debug_text = f"Attempted to read messages from file {self._path}."
    return debug_text
Expand Down Expand Up @@ -324,20 +331,29 @@ def _messages_thread(
params: PipesParams,
is_task_complete: Event,
) -> None:
start_or_last_download = datetime.datetime.now()
while True:
now = datetime.datetime.now()
if (now - start_or_last_download).seconds > self.interval or is_task_complete.is_set():
start_or_last_download = now
chunk = self.download_messages_chunk(self.counter, params)
if chunk:
for line in chunk.split("\n"):
message = json.loads(line)
handler.handle_message(message)
self.counter += 1
elif is_task_complete.is_set():
break
time.sleep(1)
try:
start_or_last_download = datetime.datetime.now()
while True:
now = datetime.datetime.now()
if (
now - start_or_last_download
).seconds > self.interval or is_task_complete.is_set():
start_or_last_download = now
chunk = self.download_messages_chunk(self.counter, params)
if chunk:
for line in chunk.split("\n"):
message = json.loads(line)
handler.handle_message(message)
self.counter += 1
elif is_task_complete.is_set():
break
time.sleep(1)
except:
handler.report_pipes_framework_exception(
f"{self.__class__.__name__} reader thread",
sys.exc_info(),
)
raise


class PipesBlobStoreStdioReader(ABC):
Expand Down

0 comments on commit 0d07e23

Please sign in to comment.