From 7df0d056e68f7e35b0f02d476ece9a7b08e9c031 Mon Sep 17 00:00:00 2001 From: Eugene Batalov Date: Thu, 2 Jan 2025 16:18:26 +0000 Subject: [PATCH] Fix error handling and log spams in Executor * Generate "customer error" if failed to deserialize graph or function in Initialize RPC. ** These errors are not logged as they are in customer owned code. ** These errors are added to function's stderr so customer can see them. * Fix the test for missing module. It check anything because g.run never raises. * Use short formatting for exceptions in prod (non-dev) mode so only short backtraces are present in logs without fully rendered local vars in the backtraces and etc. This reduces log spam. * Revert previous quickfixes that suppress logging of exception details in Executor. Testing: make check make test --- .../function_executor/function_executor.py | 15 +++++- .../function_executor/single_task_runner.py | 44 +++++++++-------- python-sdk/indexify/executor/task_runner.py | 4 +- .../function_executor_service.py | 20 ++++---- .../proto/function_executor.proto | 1 + .../proto/function_executor_pb2.py | 48 +++++++++---------- .../proto/function_executor_pb2.pyi | 8 +++- python-sdk/indexify/logging.py | 2 +- .../test_graph.py | 17 ++++--- 9 files changed, 90 insertions(+), 69 deletions(-) diff --git a/python-sdk/indexify/executor/function_executor/function_executor.py b/python-sdk/indexify/executor/function_executor/function_executor.py index c692bf449..6497b019d 100644 --- a/python-sdk/indexify/executor/function_executor/function_executor.py +++ b/python-sdk/indexify/executor/function_executor/function_executor.py @@ -23,6 +23,10 @@ ) +class CustomerError(RuntimeError): + pass + + class FunctionExecutor: """Executor side class supporting a running FunctionExecutorServer. @@ -50,7 +54,10 @@ async def initialize( base_url: str, config_path: Optional[str], ): - """Creates and initializes a FunctionExecutorServer and all resources associated with it.""" + """Creates and initializes a FunctionExecutorServer and all resources associated with it. + + Raises CustomerError if the server failed to initialize due to an error in customer owned code or data. + Raises an Exception if an internal error occured.""" try: self._server = await self._server_factory.create( config=config, logger=self._logger @@ -129,5 +136,9 @@ async def _initialize_server( stub: FunctionExecutorStub, initialize_request: InitializeRequest ): initialize_response: InitializeResponse = await stub.initialize(initialize_request) - if not initialize_response.success: + if initialize_response.success: + return + if initialize_response.HasField("customer_error"): + raise CustomerError(initialize_response.customer_error) + else: raise Exception("initialize RPC failed at function executor server") diff --git a/python-sdk/indexify/executor/function_executor/single_task_runner.py b/python-sdk/indexify/executor/function_executor/single_task_runner.py index 8769c6e8b..e48a57b27 100644 --- a/python-sdk/indexify/executor/function_executor/single_task_runner.py +++ b/python-sdk/indexify/executor/function_executor/single_task_runner.py @@ -12,7 +12,7 @@ ) from ..api_objects import Task -from .function_executor import FunctionExecutor +from .function_executor import CustomerError, FunctionExecutor from .function_executor_state import FunctionExecutorState from .server.function_executor_server_factory import ( FunctionExecutorServerConfiguration, @@ -50,7 +50,14 @@ async def run(self) -> TaskOutput: self._state.check_locked() if self._state.function_executor is None: - self._state.function_executor = await self._create_function_executor() + try: + await self._create_function_executor() + except CustomerError as e: + return TaskOutput( + task=self._task_input.task, + stderr=str(e), + success=False, + ) return await self._run() @@ -58,31 +65,28 @@ async def _create_function_executor(self) -> FunctionExecutor: function_executor: FunctionExecutor = FunctionExecutor( server_factory=self._factory, logger=self._logger ) - try: - config: FunctionExecutorServerConfiguration = ( - FunctionExecutorServerConfiguration( - image_uri=self._task_input.task.image_uri, - ) - ) - initialize_request: InitializeRequest = InitializeRequest( - namespace=self._task_input.task.namespace, - graph_name=self._task_input.task.compute_graph, - graph_version=self._task_input.task.graph_version, - function_name=self._task_input.task.compute_fn, - graph=self._task_input.graph, + config: FunctionExecutorServerConfiguration = ( + FunctionExecutorServerConfiguration( + image_uri=self._task_input.task.image_uri, ) + ) + initialize_request: InitializeRequest = InitializeRequest( + namespace=self._task_input.task.namespace, + graph_name=self._task_input.task.compute_graph, + graph_version=self._task_input.task.graph_version, + function_name=self._task_input.task.compute_fn, + graph=self._task_input.graph, + ) + + try: await function_executor.initialize( config=config, initialize_request=initialize_request, base_url=self._base_url, config_path=self._config_path, ) - return function_executor - except Exception as e: - self._logger.error( - f"failed to initialize function executor: {e.details()}", - # exc_info=e.details(), - ) + self._state.function_executor = function_executor + except Exception: await function_executor.destroy() raise diff --git a/python-sdk/indexify/executor/task_runner.py b/python-sdk/indexify/executor/task_runner.py index adb1896e6..08aa6b93b 100644 --- a/python-sdk/indexify/executor/task_runner.py +++ b/python-sdk/indexify/executor/task_runner.py @@ -33,8 +33,8 @@ async def run(self, task_input: TaskInput, logger: Any) -> TaskOutput: return await self._run(task_input, logger) except Exception as e: logger.error( - f"failed running the task: {e.details()}", - # exc_info=e.debug_error_string(), + "failed running the task:", + exc_info=e, ) return TaskOutput.internal_error(task_input.task) diff --git a/python-sdk/indexify/function_executor/function_executor_service.py b/python-sdk/indexify/function_executor/function_executor_service.py index 314a534a5..c78c1d2a3 100644 --- a/python-sdk/indexify/function_executor/function_executor_service.py +++ b/python-sdk/indexify/function_executor/function_executor_service.py @@ -55,24 +55,22 @@ def initialize( # implementing smart caching in customer code. E.g. load a model into GPU only once and # share the model's file descriptor between all tasks or download function configuration # only once. - graph_serializer = get_serializer(request.graph.content_type) - - try: - graph = graph_serializer.deserialize(request.graph.bytes) - except Exception as e: - self._logger.error(f"Caught exception {e}") - return InitializeResponse(success=False) - - self._function = graph_serializer.deserialize(graph[request.function_name]) - self._logger = self._logger.bind( namespace=request.namespace, graph_name=request.graph_name, graph_version=str(request.graph_version), function_name=request.function_name, ) - self._logger.info("initialized function executor service") + graph_serializer = get_serializer(request.graph.content_type) + try: + # Process user controlled input in a try-except block to not treat errors here as our + # internal platform errors. + graph = graph_serializer.deserialize(request.graph.bytes) + self._function = graph_serializer.deserialize(graph[request.function_name]) + except Exception as e: + return InitializeResponse(success=False, customer_error=str(e)) + self._logger.info("initialized function executor service") return InitializeResponse(success=True) def initialize_invocation_state_server( diff --git a/python-sdk/indexify/function_executor/proto/function_executor.proto b/python-sdk/indexify/function_executor/proto/function_executor.proto index 84c68084f..1cb210626 100644 --- a/python-sdk/indexify/function_executor/proto/function_executor.proto +++ b/python-sdk/indexify/function_executor/proto/function_executor.proto @@ -31,6 +31,7 @@ message InitializeRequest { message InitializeResponse { optional bool success = 1; + optional string customer_error = 2; } message SetInvocationStateRequest { diff --git a/python-sdk/indexify/function_executor/proto/function_executor_pb2.py b/python-sdk/indexify/function_executor/proto/function_executor_pb2.py index 354f3ebea..187277f21 100644 --- a/python-sdk/indexify/function_executor/proto/function_executor_pb2.py +++ b/python-sdk/indexify/function_executor/proto/function_executor_pb2.py @@ -24,7 +24,7 @@ DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile( - b'\n8indexify/function_executor/proto/function_executor.proto\x12\x19\x66unction_executor_service"i\n\x10SerializedObject\x12\x0f\n\x05\x62ytes\x18\x01 \x01(\x0cH\x00\x12\x10\n\x06string\x18\x02 \x01(\tH\x00\x12\x19\n\x0c\x63ontent_type\x18\x03 \x01(\tH\x01\x88\x01\x01\x42\x06\n\x04\x64\x61taB\x0f\n\r_content_type"\x88\x02\n\x11InitializeRequest\x12\x16\n\tnamespace\x18\x01 \x01(\tH\x00\x88\x01\x01\x12\x17\n\ngraph_name\x18\x02 \x01(\tH\x01\x88\x01\x01\x12\x1a\n\rgraph_version\x18\x03 \x01(\x05H\x02\x88\x01\x01\x12\x1a\n\rfunction_name\x18\x05 \x01(\tH\x03\x88\x01\x01\x12?\n\x05graph\x18\x07 \x01(\x0b\x32+.function_executor_service.SerializedObjectH\x04\x88\x01\x01\x42\x0c\n\n_namespaceB\r\n\x0b_graph_nameB\x10\n\x0e_graph_versionB\x10\n\x0e_function_nameB\x08\n\x06_graph"6\n\x12InitializeResponse\x12\x14\n\x07success\x18\x01 \x01(\x08H\x00\x88\x01\x01\x42\n\n\x08_success"\x80\x01\n\x19SetInvocationStateRequest\x12\x10\n\x03key\x18\x01 \x01(\tH\x00\x88\x01\x01\x12?\n\x05value\x18\x02 \x01(\x0b\x32+.function_executor_service.SerializedObjectH\x01\x88\x01\x01\x42\x06\n\x04_keyB\x08\n\x06_value"\x1c\n\x1aSetInvocationStateResponse"5\n\x19GetInvocationStateRequest\x12\x10\n\x03key\x18\x01 \x01(\tH\x00\x88\x01\x01\x42\x06\n\x04_key"\x81\x01\n\x1aGetInvocationStateResponse\x12\x10\n\x03key\x18\x01 \x01(\tH\x00\x88\x01\x01\x12?\n\x05value\x18\x02 \x01(\x0b\x32+.function_executor_service.SerializedObjectH\x01\x88\x01\x01\x42\x06\n\x04_keyB\x08\n\x06_value"\xf7\x01\n\x16InvocationStateRequest\x12\x17\n\nrequest_id\x18\x01 \x01(\tH\x01\x88\x01\x01\x12\x14\n\x07task_id\x18\x02 \x01(\tH\x02\x88\x01\x01\x12\x43\n\x03set\x18\x03 \x01(\x0b\x32\x34.function_executor_service.SetInvocationStateRequestH\x00\x12\x43\n\x03get\x18\x04 \x01(\x0b\x32\x34.function_executor_service.GetInvocationStateRequestH\x00\x42\t\n\x07requestB\r\n\x0b_request_idB\n\n\x08_task_id"\xfb\x01\n\x17InvocationStateResponse\x12\x17\n\nrequest_id\x18\x01 \x01(\tH\x01\x88\x01\x01\x12\x14\n\x07success\x18\x02 \x01(\x08H\x02\x88\x01\x01\x12\x44\n\x03set\x18\x03 \x01(\x0b\x32\x35.function_executor_service.SetInvocationStateResponseH\x00\x12\x44\n\x03get\x18\x04 \x01(\x0b\x32\x35.function_executor_service.GetInvocationStateResponseH\x00\x42\n\n\x08responseB\r\n\x0b_request_idB\n\n\x08_success"N\n\x0e\x46unctionOutput\x12<\n\x07outputs\x18\x01 \x03(\x0b\x32+.function_executor_service.SerializedObject"\x1d\n\x0cRouterOutput\x12\r\n\x05\x65\x64ges\x18\x01 \x03(\t"\xb0\x02\n\x0eRunTaskRequest\x12 \n\x13graph_invocation_id\x18\x04 \x01(\tH\x00\x88\x01\x01\x12\x14\n\x07task_id\x18\x06 \x01(\tH\x01\x88\x01\x01\x12H\n\x0e\x66unction_input\x18\t \x01(\x0b\x32+.function_executor_service.SerializedObjectH\x02\x88\x01\x01\x12M\n\x13\x66unction_init_value\x18\n \x01(\x0b\x32+.function_executor_service.SerializedObjectH\x03\x88\x01\x01\x42\x16\n\x14_graph_invocation_idB\n\n\x08_task_idB\x11\n\x0f_function_inputB\x16\n\x14_function_init_value"\xf1\x02\n\x0fRunTaskResponse\x12\x14\n\x07task_id\x18\x01 \x01(\tH\x00\x88\x01\x01\x12G\n\x0f\x66unction_output\x18\x02 \x01(\x0b\x32).function_executor_service.FunctionOutputH\x01\x88\x01\x01\x12\x43\n\rrouter_output\x18\x03 \x01(\x0b\x32\'.function_executor_service.RouterOutputH\x02\x88\x01\x01\x12\x13\n\x06stdout\x18\x04 \x01(\tH\x03\x88\x01\x01\x12\x13\n\x06stderr\x18\x05 \x01(\tH\x04\x88\x01\x01\x12\x17\n\nis_reducer\x18\x06 \x01(\x08H\x05\x88\x01\x01\x12\x14\n\x07success\x18\x07 \x01(\x08H\x06\x88\x01\x01\x42\n\n\x08_task_idB\x12\n\x10_function_outputB\x10\n\x0e_router_outputB\t\n\x07_stdoutB\t\n\x07_stderrB\r\n\x0b_is_reducerB\n\n\x08_success2\xf2\x02\n\x10\x46unctionExecutor\x12i\n\ninitialize\x12,.function_executor_service.InitializeRequest\x1a-.function_executor_service.InitializeResponse\x12\x8f\x01\n"initialize_invocation_state_server\x12\x32.function_executor_service.InvocationStateResponse\x1a\x31.function_executor_service.InvocationStateRequest(\x01\x30\x01\x12\x61\n\x08run_task\x12).function_executor_service.RunTaskRequest\x1a*.function_executor_service.RunTaskResponseb\x06proto3' + b'\n8indexify/function_executor/proto/function_executor.proto\x12\x19\x66unction_executor_service"i\n\x10SerializedObject\x12\x0f\n\x05\x62ytes\x18\x01 \x01(\x0cH\x00\x12\x10\n\x06string\x18\x02 \x01(\tH\x00\x12\x19\n\x0c\x63ontent_type\x18\x03 \x01(\tH\x01\x88\x01\x01\x42\x06\n\x04\x64\x61taB\x0f\n\r_content_type"\x88\x02\n\x11InitializeRequest\x12\x16\n\tnamespace\x18\x01 \x01(\tH\x00\x88\x01\x01\x12\x17\n\ngraph_name\x18\x02 \x01(\tH\x01\x88\x01\x01\x12\x1a\n\rgraph_version\x18\x03 \x01(\x05H\x02\x88\x01\x01\x12\x1a\n\rfunction_name\x18\x05 \x01(\tH\x03\x88\x01\x01\x12?\n\x05graph\x18\x07 \x01(\x0b\x32+.function_executor_service.SerializedObjectH\x04\x88\x01\x01\x42\x0c\n\n_namespaceB\r\n\x0b_graph_nameB\x10\n\x0e_graph_versionB\x10\n\x0e_function_nameB\x08\n\x06_graph"f\n\x12InitializeResponse\x12\x14\n\x07success\x18\x01 \x01(\x08H\x00\x88\x01\x01\x12\x1b\n\x0e\x63ustomer_error\x18\x02 \x01(\tH\x01\x88\x01\x01\x42\n\n\x08_successB\x11\n\x0f_customer_error"\x80\x01\n\x19SetInvocationStateRequest\x12\x10\n\x03key\x18\x01 \x01(\tH\x00\x88\x01\x01\x12?\n\x05value\x18\x02 \x01(\x0b\x32+.function_executor_service.SerializedObjectH\x01\x88\x01\x01\x42\x06\n\x04_keyB\x08\n\x06_value"\x1c\n\x1aSetInvocationStateResponse"5\n\x19GetInvocationStateRequest\x12\x10\n\x03key\x18\x01 \x01(\tH\x00\x88\x01\x01\x42\x06\n\x04_key"\x81\x01\n\x1aGetInvocationStateResponse\x12\x10\n\x03key\x18\x01 \x01(\tH\x00\x88\x01\x01\x12?\n\x05value\x18\x02 \x01(\x0b\x32+.function_executor_service.SerializedObjectH\x01\x88\x01\x01\x42\x06\n\x04_keyB\x08\n\x06_value"\xf7\x01\n\x16InvocationStateRequest\x12\x17\n\nrequest_id\x18\x01 \x01(\tH\x01\x88\x01\x01\x12\x14\n\x07task_id\x18\x02 \x01(\tH\x02\x88\x01\x01\x12\x43\n\x03set\x18\x03 \x01(\x0b\x32\x34.function_executor_service.SetInvocationStateRequestH\x00\x12\x43\n\x03get\x18\x04 \x01(\x0b\x32\x34.function_executor_service.GetInvocationStateRequestH\x00\x42\t\n\x07requestB\r\n\x0b_request_idB\n\n\x08_task_id"\xfb\x01\n\x17InvocationStateResponse\x12\x17\n\nrequest_id\x18\x01 \x01(\tH\x01\x88\x01\x01\x12\x14\n\x07success\x18\x02 \x01(\x08H\x02\x88\x01\x01\x12\x44\n\x03set\x18\x03 \x01(\x0b\x32\x35.function_executor_service.SetInvocationStateResponseH\x00\x12\x44\n\x03get\x18\x04 \x01(\x0b\x32\x35.function_executor_service.GetInvocationStateResponseH\x00\x42\n\n\x08responseB\r\n\x0b_request_idB\n\n\x08_success"N\n\x0e\x46unctionOutput\x12<\n\x07outputs\x18\x01 \x03(\x0b\x32+.function_executor_service.SerializedObject"\x1d\n\x0cRouterOutput\x12\r\n\x05\x65\x64ges\x18\x01 \x03(\t"\xb0\x02\n\x0eRunTaskRequest\x12 \n\x13graph_invocation_id\x18\x04 \x01(\tH\x00\x88\x01\x01\x12\x14\n\x07task_id\x18\x06 \x01(\tH\x01\x88\x01\x01\x12H\n\x0e\x66unction_input\x18\t \x01(\x0b\x32+.function_executor_service.SerializedObjectH\x02\x88\x01\x01\x12M\n\x13\x66unction_init_value\x18\n \x01(\x0b\x32+.function_executor_service.SerializedObjectH\x03\x88\x01\x01\x42\x16\n\x14_graph_invocation_idB\n\n\x08_task_idB\x11\n\x0f_function_inputB\x16\n\x14_function_init_value"\xf1\x02\n\x0fRunTaskResponse\x12\x14\n\x07task_id\x18\x01 \x01(\tH\x00\x88\x01\x01\x12G\n\x0f\x66unction_output\x18\x02 \x01(\x0b\x32).function_executor_service.FunctionOutputH\x01\x88\x01\x01\x12\x43\n\rrouter_output\x18\x03 \x01(\x0b\x32\'.function_executor_service.RouterOutputH\x02\x88\x01\x01\x12\x13\n\x06stdout\x18\x04 \x01(\tH\x03\x88\x01\x01\x12\x13\n\x06stderr\x18\x05 \x01(\tH\x04\x88\x01\x01\x12\x17\n\nis_reducer\x18\x06 \x01(\x08H\x05\x88\x01\x01\x12\x14\n\x07success\x18\x07 \x01(\x08H\x06\x88\x01\x01\x42\n\n\x08_task_idB\x12\n\x10_function_outputB\x10\n\x0e_router_outputB\t\n\x07_stdoutB\t\n\x07_stderrB\r\n\x0b_is_reducerB\n\n\x08_success2\xf2\x02\n\x10\x46unctionExecutor\x12i\n\ninitialize\x12,.function_executor_service.InitializeRequest\x1a-.function_executor_service.InitializeResponse\x12\x8f\x01\n"initialize_invocation_state_server\x12\x32.function_executor_service.InvocationStateResponse\x1a\x31.function_executor_service.InvocationStateRequest(\x01\x30\x01\x12\x61\n\x08run_task\x12).function_executor_service.RunTaskRequest\x1a*.function_executor_service.RunTaskResponseb\x06proto3' ) _globals = globals() @@ -39,27 +39,27 @@ _globals["_INITIALIZEREQUEST"]._serialized_start = 195 _globals["_INITIALIZEREQUEST"]._serialized_end = 459 _globals["_INITIALIZERESPONSE"]._serialized_start = 461 - _globals["_INITIALIZERESPONSE"]._serialized_end = 515 - _globals["_SETINVOCATIONSTATEREQUEST"]._serialized_start = 518 - _globals["_SETINVOCATIONSTATEREQUEST"]._serialized_end = 646 - _globals["_SETINVOCATIONSTATERESPONSE"]._serialized_start = 648 - _globals["_SETINVOCATIONSTATERESPONSE"]._serialized_end = 676 - _globals["_GETINVOCATIONSTATEREQUEST"]._serialized_start = 678 - _globals["_GETINVOCATIONSTATEREQUEST"]._serialized_end = 731 - _globals["_GETINVOCATIONSTATERESPONSE"]._serialized_start = 734 - _globals["_GETINVOCATIONSTATERESPONSE"]._serialized_end = 863 - _globals["_INVOCATIONSTATEREQUEST"]._serialized_start = 866 - _globals["_INVOCATIONSTATEREQUEST"]._serialized_end = 1113 - _globals["_INVOCATIONSTATERESPONSE"]._serialized_start = 1116 - _globals["_INVOCATIONSTATERESPONSE"]._serialized_end = 1367 - _globals["_FUNCTIONOUTPUT"]._serialized_start = 1369 - _globals["_FUNCTIONOUTPUT"]._serialized_end = 1447 - _globals["_ROUTEROUTPUT"]._serialized_start = 1449 - _globals["_ROUTEROUTPUT"]._serialized_end = 1478 - _globals["_RUNTASKREQUEST"]._serialized_start = 1481 - _globals["_RUNTASKREQUEST"]._serialized_end = 1785 - _globals["_RUNTASKRESPONSE"]._serialized_start = 1788 - _globals["_RUNTASKRESPONSE"]._serialized_end = 2157 - _globals["_FUNCTIONEXECUTOR"]._serialized_start = 2160 - _globals["_FUNCTIONEXECUTOR"]._serialized_end = 2530 + _globals["_INITIALIZERESPONSE"]._serialized_end = 563 + _globals["_SETINVOCATIONSTATEREQUEST"]._serialized_start = 566 + _globals["_SETINVOCATIONSTATEREQUEST"]._serialized_end = 694 + _globals["_SETINVOCATIONSTATERESPONSE"]._serialized_start = 696 + _globals["_SETINVOCATIONSTATERESPONSE"]._serialized_end = 724 + _globals["_GETINVOCATIONSTATEREQUEST"]._serialized_start = 726 + _globals["_GETINVOCATIONSTATEREQUEST"]._serialized_end = 779 + _globals["_GETINVOCATIONSTATERESPONSE"]._serialized_start = 782 + _globals["_GETINVOCATIONSTATERESPONSE"]._serialized_end = 911 + _globals["_INVOCATIONSTATEREQUEST"]._serialized_start = 914 + _globals["_INVOCATIONSTATEREQUEST"]._serialized_end = 1161 + _globals["_INVOCATIONSTATERESPONSE"]._serialized_start = 1164 + _globals["_INVOCATIONSTATERESPONSE"]._serialized_end = 1415 + _globals["_FUNCTIONOUTPUT"]._serialized_start = 1417 + _globals["_FUNCTIONOUTPUT"]._serialized_end = 1495 + _globals["_ROUTEROUTPUT"]._serialized_start = 1497 + _globals["_ROUTEROUTPUT"]._serialized_end = 1526 + _globals["_RUNTASKREQUEST"]._serialized_start = 1529 + _globals["_RUNTASKREQUEST"]._serialized_end = 1833 + _globals["_RUNTASKRESPONSE"]._serialized_start = 1836 + _globals["_RUNTASKRESPONSE"]._serialized_end = 2205 + _globals["_FUNCTIONEXECUTOR"]._serialized_start = 2208 + _globals["_FUNCTIONEXECUTOR"]._serialized_end = 2578 # @@protoc_insertion_point(module_scope) diff --git a/python-sdk/indexify/function_executor/proto/function_executor_pb2.pyi b/python-sdk/indexify/function_executor/proto/function_executor_pb2.pyi index 320632b4c..272cf9086 100644 --- a/python-sdk/indexify/function_executor/proto/function_executor_pb2.pyi +++ b/python-sdk/indexify/function_executor/proto/function_executor_pb2.pyi @@ -47,10 +47,14 @@ class InitializeRequest(_message.Message): ) -> None: ... class InitializeResponse(_message.Message): - __slots__ = ("success",) + __slots__ = ("success", "customer_error") SUCCESS_FIELD_NUMBER: _ClassVar[int] + CUSTOMER_ERROR_FIELD_NUMBER: _ClassVar[int] success: bool - def __init__(self, success: bool = ...) -> None: ... + customer_error: str + def __init__( + self, success: bool = ..., customer_error: _Optional[str] = ... + ) -> None: ... class SetInvocationStateRequest(_message.Message): __slots__ = ("key", "value") diff --git a/python-sdk/indexify/logging.py b/python-sdk/indexify/logging.py index 64de90028..13b9dc409 100644 --- a/python-sdk/indexify/logging.py +++ b/python-sdk/indexify/logging.py @@ -26,7 +26,7 @@ def configure_logging_early(): def configure_production_logging(): processors = [ - structlog.processors.dict_tracebacks, + structlog.processors.format_exc_info, structlog.processors.JSONRenderer(), ] structlog.configure(processors=processors) diff --git a/python-sdk/tests/broken_graph_module_error_test/test_graph.py b/python-sdk/tests/broken_graph_module_error_test/test_graph.py index def27497b..518070b38 100644 --- a/python-sdk/tests/broken_graph_module_error_test/test_graph.py +++ b/python-sdk/tests/broken_graph_module_error_test/test_graph.py @@ -1,12 +1,11 @@ -import sys +import io import unittest +from contextlib import redirect_stdout from extractors import extractor_a, extractor_c from indexify import RemoteGraph -from indexify.functions_sdk.data_objects import File from indexify.functions_sdk.graph import Graph -from indexify.functions_sdk.indexify_functions import indexify_function def create_broken_graph(): @@ -25,13 +24,17 @@ def test_broken_graph(self): g = create_broken_graph() g = RemoteGraph.deploy(g=g) - self.assertRaises( - Exception, + # We don't have a public SDK API to read a function's stderr + # so we rely on internal SDK behavior where it prints a failed function's + # stderr to the current stdout. + func_stdout: io.StringIO = io.StringIO() + with redirect_stdout(func_stdout): g.run( block_until_done=True, a=10, - ), - ) + ) + # Use regex because rich formatting characters are present in the output. + self.assertRegex(func_stdout.getvalue(), r"No module named.*'first_p_dep'") if __name__ == "__main__":