Skip to content

Commit

Permalink
[pipes] add report_component_exception (#17014)
Browse files Browse the repository at this point in the history
Establish a pattern in which message reader threads report their
exceptions to the handler.

## How I Tested These Changes

Manually tested with the S3 message reader without AWS auth configured.
Previously it would fail silently, but now the exception is reported as:

![Screenshot 2023-10-04 at 5 14 37
PM](https://github.com/dagster-io/dagster/assets/202219/1e60773a-b0e8-47fd-b51a-71b722df99ad)

Will add test coverage if we like this approach.
  • Loading branch information
alangenfeld authored Nov 1, 2023
1 parent 4061b3e commit e2f9872
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 19 deletions.
16 changes: 16 additions & 0 deletions python_modules/dagster/dagster/_core/pipes/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from typing_extensions import TypeAlias

import dagster._check as check
from dagster import DagsterEvent
from dagster._annotations import experimental, public
from dagster._core.definitions.asset_check_result import AssetCheckResult
from dagster._core.definitions.asset_check_spec import AssetCheckSeverity
Expand All @@ -34,12 +35,15 @@
has_one_dimension_time_window_partitioning,
)
from dagster._core.errors import DagsterPipesExecutionError
from dagster._core.events import EngineEventData
from dagster._core.execution.context.compute import OpExecutionContext
from dagster._core.execution.context.invocation import BoundOpExecutionContext
from dagster._utils.error import ExceptionInfo, serializable_error_info_from_exc_info

if TYPE_CHECKING:
from dagster._core.pipes.client import PipesMessageReader


PipesExecutionResult: TypeAlias = Union[MaterializeResult, AssetCheckResult]


Expand Down Expand Up @@ -202,6 +206,18 @@ def _handle_log(self, message: str, level: str = "info") -> None:
check.str_param(message, "message")
self._context.log.log(level, message)

def report_pipes_framework_exception(self, origin: str, exc_info: ExceptionInfo):
    """Emit an engine event describing an exception raised inside pipes framework code.

    Args:
        origin: Short description of where the exception happened; it is
            interpolated into the event message.
        exc_info: Exception info for the failure; it is converted with
            ``serializable_error_info_from_exc_info`` and attached to the event.
    """
    # An engine event carries the exception in structured form, so the event has:
    # * the context of where the exception happened ([pipes]...)
    # * the exception class, message, and stack trace as well as any chained exception
    step_context = self._context.get_step_execution_context()
    description = f"[pipes] framework exception occurred in {origin}"
    event_data = EngineEventData(
        error=serializable_error_info_from_exc_info(exc_info),
    )
    DagsterEvent.engine_event(step_context, description, event_data)


@experimental
@dataclass
Expand Down
60 changes: 41 additions & 19 deletions python_modules/dagster/dagster/_core/pipes/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,9 +174,16 @@ def read_messages(
thread.join()

def _reader_thread(self, handler: "PipesMessageHandler", is_resource_complete: Event) -> None:
for line in tail_file(self._path, lambda: is_resource_complete.is_set()):
message = json.loads(line)
handler.handle_message(message)
try:
for line in tail_file(self._path, lambda: is_resource_complete.is_set()):
message = json.loads(line)
handler.handle_message(message)
except:
handler.report_pipes_framework_exception(
f"{self.__class__.__name__} reader thread",
sys.exc_info(),
)
raise

def no_messages_debug_text(self) -> str:
return f"Attempted to read messages from file {self._path}."
Expand Down Expand Up @@ -302,7 +309,7 @@ def read_messages(
messages_thread.start()
logs_thread = Thread(
target=self._logs_thread,
args=(params, is_session_closed, messages_thread),
args=(handler, params, is_session_closed, messages_thread),
daemon=True,
)
logs_thread.start()
Expand Down Expand Up @@ -337,23 +344,33 @@ def _messages_thread(
params: PipesParams,
is_session_closed: Event,
) -> None:
start_or_last_download = datetime.datetime.now()
while True:
now = datetime.datetime.now()
if (now - start_or_last_download).seconds > self.interval or is_session_closed.is_set():
start_or_last_download = now
chunk = self.download_messages_chunk(self.counter, params)
if chunk:
for line in chunk.split("\n"):
message = json.loads(line)
handler.handle_message(message)
self.counter += 1
elif is_session_closed.is_set():
break
time.sleep(DEFAULT_SLEEP_INTERVAL)
try:
start_or_last_download = datetime.datetime.now()
while True:
now = datetime.datetime.now()
if (
now - start_or_last_download
).seconds > self.interval or is_session_closed.is_set():
start_or_last_download = now
chunk = self.download_messages_chunk(self.counter, params)
if chunk:
for line in chunk.split("\n"):
message = json.loads(line)
handler.handle_message(message)
self.counter += 1
elif is_session_closed.is_set():
break
time.sleep(DEFAULT_SLEEP_INTERVAL)
except:
handler.report_pipes_framework_exception(
f"{self.__class__.__name__} messages thread",
sys.exc_info(),
)
raise

def _logs_thread(
self,
handler: "PipesMessageHandler",
params: PipesParams,
is_session_closed: Event,
messages_thread: Thread,
Expand Down Expand Up @@ -407,7 +424,12 @@ def _logs_thread(

# Wait for the external process to complete
is_session_closed.wait()

except:
handler.report_pipes_framework_exception(
f"{self.__class__.__name__} logs thread",
sys.exc_info(),
)
raise
finally:
for log_reader in self.log_readers:
if log_reader.is_running():
Expand Down

0 comments on commit e2f9872

Please sign in to comment.