Skip to content

Commit

Permalink
feat: UDF container should crash on exceptions (#160)
Browse files Browse the repository at this point in the history
Signed-off-by: Sidhant Kohli <[email protected]>
  • Loading branch information
kohlisid authored May 3, 2024
1 parent cb37054 commit 06a6fdc
Show file tree
Hide file tree
Showing 34 changed files with 774 additions and 551 deletions.
797 changes: 428 additions & 369 deletions poetry.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion pynumaflow/mapper/multiproc_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ def handler(self, keys: list[str], datum: Datum) -> Messages:
# Setting the max value to 2 * CPU count
# Used for multiproc server
self._process_count = min(server_count, 2 * _PROCESS_COUNT)
self.servicer = SyncMapServicer(handler=mapper_instance)
self.servicer = SyncMapServicer(handler=mapper_instance, multiproc=True)

def start(self) -> None:
"""
Expand Down
16 changes: 9 additions & 7 deletions pynumaflow/mapper/servicer/async_servicer.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import grpc
from google.protobuf import empty_pb2 as _empty_pb2

from pynumaflow.mapper._dtypes import Datum
from pynumaflow.mapper._dtypes import MapAsyncHandlerCallable, MapSyncCallable
from pynumaflow.proto.mapper import map_pb2, map_pb2_grpc
from pynumaflow.shared.server import exit_on_error
from pynumaflow.types import NumaflowServicerContext
from pynumaflow._constants import _LOGGER

Expand Down Expand Up @@ -40,22 +40,24 @@ async def MapFn(
watermark=request.watermark.ToDatetime(),
headers=dict(request.headers),
),
context,
)
except Exception as e:
context.set_code(grpc.StatusCode.UNKNOWN)
context.set_details(str(e))
return map_pb2.MapResponse(results=[])
except BaseException as e:
_LOGGER.critical("UDFError, re-raising the error", exc_info=True)
exit_on_error(context, repr(e))
return

return map_pb2.MapResponse(results=res)

async def __invoke_map(self, keys: list[str], req: Datum):
async def __invoke_map(self, keys: list[str], req: Datum, context: NumaflowServicerContext):
"""
Invokes the user defined function.
"""
try:
msgs = await self.__map_handler(keys, req)
except Exception as err:
except BaseException as err:
_LOGGER.critical("UDFError, re-raising the error", exc_info=True)
exit_on_error(context, repr(err))
raise err
datums = []
for msg in msgs:
Expand Down
9 changes: 4 additions & 5 deletions pynumaflow/mapper/servicer/sync_servicer.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,10 @@ class SyncMapServicer(map_pb2_grpc.MapServicer):
Provides the functionality for the required rpc methods.
"""

def __init__(
self,
handler: MapSyncCallable,
):
def __init__(self, handler: MapSyncCallable, multiproc: bool = False):
self.__map_handler: MapSyncCallable = handler
# This indicates whether the grpc server attached is multiproc or not
self.multiproc = multiproc

def MapFn(
self, request: map_pb2.MapRequest, context: NumaflowServicerContext
Expand All @@ -26,7 +25,7 @@ def MapFn(
Applies a function to each datum element.
The pascal case function name comes from the proto map_pb2_grpc.py file.
"""
return _map_fn_util(self.__map_handler, request, context)
return _map_fn_util(self.__map_handler, request, context, self.multiproc)

def IsReady(
self, request: _empty_pb2.Empty, context: NumaflowServicerContext
Expand Down
15 changes: 9 additions & 6 deletions pynumaflow/mapper/servicer/utils.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
import grpc
from pynumaflow.mapper._dtypes import MapSyncCallable

from pynumaflow.mapper._dtypes import Datum
from pynumaflow.proto.mapper import map_pb2
from pynumaflow.shared.server import exit_on_error
from pynumaflow.types import NumaflowServicerContext
from pynumaflow._constants import _LOGGER


def _map_fn_util(
__map_handler: MapSyncCallable, request: map_pb2.MapRequest, context: NumaflowServicerContext
__map_handler: MapSyncCallable,
request: map_pb2.MapRequest,
context: NumaflowServicerContext,
multiproc: bool,
) -> map_pb2.MapResponse:
# proto repeated field(keys) is of type google._upb._message.RepeatedScalarContainer
# we need to explicitly convert it to list
Expand All @@ -23,11 +26,11 @@ def _map_fn_util(
headers=dict(request.headers),
),
)
except Exception as err:
except BaseException as err:
_LOGGER.critical("UDFError, re-raising the error", exc_info=True)
context.set_code(grpc.StatusCode.UNKNOWN)
context.set_details(str(err))
return map_pb2.MapResponse(results=[])
# Terminate the current server process due to exception
exit_on_error(context, repr(err), multiproc)
return

datums = []

Expand Down
36 changes: 23 additions & 13 deletions pynumaflow/mapstreamer/servicer/async_servicer.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from pynumaflow.mapstreamer import Datum
from pynumaflow.mapstreamer._dtypes import MapStreamCallable
from pynumaflow.proto.mapstreamer import mapstream_pb2_grpc, mapstream_pb2
from pynumaflow.shared.server import exit_on_error
from pynumaflow.types import NumaflowServicerContext
from pynumaflow._constants import _LOGGER

Expand Down Expand Up @@ -33,26 +34,35 @@ async def MapStreamFn(
The pascal case function name comes from the proto mapstream_pb2_grpc.py file.
"""

async for res in self.__invoke_map_stream(
list(request.keys),
Datum(
keys=list(request.keys),
value=request.value,
event_time=request.event_time.ToDatetime(),
watermark=request.watermark.ToDatetime(),
headers=dict(request.headers),
),
):
yield mapstream_pb2.MapStreamResponse(result=res)
try:
async for res in self.__invoke_map_stream(
list(request.keys),
Datum(
keys=list(request.keys),
value=request.value,
event_time=request.event_time.ToDatetime(),
watermark=request.watermark.ToDatetime(),
headers=dict(request.headers),
),
context,
):
yield mapstream_pb2.MapStreamResponse(result=res)
except BaseException as err:
_LOGGER.critical("UDFError, re-raising the error", exc_info=True)
exit_on_error(context, repr(err))
return

async def __invoke_map_stream(self, keys: list[str], req: Datum):
async def __invoke_map_stream(
self, keys: list[str], req: Datum, context: NumaflowServicerContext
):
try:
async for msg in self.__map_stream_handler(keys, req):
yield mapstream_pb2.MapStreamResponse.Result(
keys=msg.keys, value=msg.value, tags=msg.tags
)
except Exception as err:
except BaseException as err:
_LOGGER.critical("UDFError, re-raising the error", exc_info=True)
exit_on_error(context, repr(err))
raise err

async def IsReady(
Expand Down
33 changes: 22 additions & 11 deletions pynumaflow/reducer/servicer/async_servicer.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
from collections.abc import AsyncIterable
from typing import Union

Expand All @@ -14,6 +15,7 @@
WindowOperation,
)
from pynumaflow.reducer.servicer.task_manager import TaskManager
from pynumaflow.shared.server import exit_on_error
from pynumaflow.types import NumaflowServicerContext


Expand Down Expand Up @@ -86,7 +88,7 @@ async def ReduceFn(
# Create a task manager instance
# The task manager is used to manage lifecycle of the tasks
# required for the reduce operation.
task_manager = TaskManager(handler=self.__reduce_handler)
task_manager = TaskManager(handler=self.__reduce_handler, context=context)

# Start iterating through the request iterator and create tasks
# based on the operation type received.
Expand All @@ -101,11 +103,14 @@ async def ReduceFn(
# append the task data to the existing task
# if the task does not exist, it will create a new task
await task_manager.append_task(request)
except Exception as e:
except BaseException as e:
_LOGGER.critical("Reduce Error", exc_info=True)
context.set_code(grpc.StatusCode.UNKNOWN)
context.set_details(e.__str__())
raise e
# Send a context abort signal for the rpc, this is required for numa container to get
# the correct grpc error
await asyncio.gather(
context.abort(grpc.StatusCode.UNKNOWN, details=repr(e)), return_exceptions=True
)
exit_on_error(err=repr(e), parent=False, context=context, update_context=False)

# send EOF to all the tasks once the request iterator is exhausted
# This will signal the tasks to stop reading the data on their
Expand All @@ -120,20 +125,26 @@ async def ReduceFn(
for task in res:
fut = task.future
await fut

# For each message in the task result, yield the response
for msg in fut.result():
yield reduce_pb2.ReduceResponse(result=msg, window=task.window)
if fut.result():
for msg in fut.result():
yield reduce_pb2.ReduceResponse(result=msg, window=task.window)

# For each window processed by the ReduceFn send an EOF response
# We send one EOF per window
current_window = task_manager.get_unique_windows()
for window in current_window.values():
# yield the EOF response once the task is completed for a keyed window
yield reduce_pb2.ReduceResponse(window=window, EOF=True)
except Exception as e:
context.set_code(grpc.StatusCode.UNKNOWN)
context.set_details(e.__str__())
raise e
except BaseException as e:
_LOGGER.critical("Reduce Error", exc_info=True)
# Send a context abort signal for the rpc, this is required for numa container to get
# the correct grpc error
await asyncio.gather(
context.abort(grpc.StatusCode.UNKNOWN, details=repr(e)), return_exceptions=True
)
exit_on_error(err=repr(e), parent=False, context=context, update_context=False)

async def IsReady(
self, request: _empty_pb2.Empty, context: NumaflowServicerContext
Expand Down
23 changes: 20 additions & 3 deletions pynumaflow/reducer/servicer/task_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
from typing import Union
from collections.abc import AsyncIterable

import grpc

from pynumaflow.exceptions import UDFError
from pynumaflow.proto.reducer import reduce_pb2
from pynumaflow.shared.asynciter import NonBlockingIterator
Expand All @@ -20,6 +22,8 @@
ReduceAsyncCallable,
ReduceWindow,
)
from pynumaflow.shared.server import exit_on_error
from pynumaflow.types import NumaflowServicerContext


def build_unique_key_name(keys, window):
Expand All @@ -46,7 +50,11 @@ class TaskManager:
It is created whenever a new reduce operation is requested.
"""

def __init__(self, handler: Union[ReduceAsyncCallable, _ReduceBuilderClass]):
def __init__(
self,
handler: Union[ReduceAsyncCallable, _ReduceBuilderClass],
context: NumaflowServicerContext,
):
# A dictionary to store the task information
self.tasks = {}
# Collection for storing strong references to all running tasks.
Expand All @@ -55,6 +63,8 @@ def __init__(self, handler: Union[ReduceAsyncCallable, _ReduceBuilderClass]):
self.background_tasks = set()
# Handler for the reduce operation
self.__reduce_handler = handler
# Servicer context from grpc, required to abort if error occurs
self.context = context

def get_tasks(self):
"""
Expand Down Expand Up @@ -158,9 +168,16 @@ async def __invoke_reduce(
new_instance = self.__reduce_handler.create()
try:
msgs = await new_instance(keys, request_iterator, md)
except Exception as err:
except BaseException as err:
_LOGGER.critical("UDFError, re-raising the error", exc_info=True)
raise err
# Send a context abort signal for the rpc, this is required for numa container to get
# the correct grpc error
await asyncio.gather(
self.context.abort(grpc.StatusCode.UNKNOWN, details=repr(err)),
return_exceptions=True,
)
exit_on_error(err=repr(err), parent=False, context=self.context, update_context=False)
return

datum_responses = []
for msg in msgs:
Expand Down
3 changes: 1 addition & 2 deletions pynumaflow/reducestreamer/async_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,8 +183,7 @@ async def aexec(self):
# Create a new async server instance and add the servicer to it
server = grpc.aio.server()
server.add_insecure_port(self.sock_path)
reduce_servicer = self.servicer
reduce_pb2_grpc.add_ReduceServicer_to_server(reduce_servicer, server)
reduce_pb2_grpc.add_ReduceServicer_to_server(self.servicer, server)
await start_async_server(
server, self.sock_path, self.max_threads, self._server_options, self.server_info_file
)
37 changes: 26 additions & 11 deletions pynumaflow/reducestreamer/servicer/async_servicer.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
ReduceRequest,
)
from pynumaflow.reducestreamer.servicer.task_manager import TaskManager
from pynumaflow.shared.server import exit_on_error
from pynumaflow.types import NumaflowServicerContext


Expand Down Expand Up @@ -44,13 +45,12 @@ def get_exception_traceback_str(exc) -> str:
return file.getvalue().rstrip()


async def handle_error(context: NumaflowServicerContext, e: Exception):
def handle_error(context: NumaflowServicerContext, e: BaseException):
trace = get_exception_traceback_str(e)
_LOGGER.critical(trace)
_LOGGER.critical(e.__str__())
context.set_code(grpc.StatusCode.UNKNOWN)
context.set_details(e.__str__())
return context


class AsyncReduceStreamServicer(reduce_pb2_grpc.ReduceServicer):
Expand Down Expand Up @@ -111,22 +111,37 @@ async def ReduceFn(
try:
async for msg in consumer:
# If the message is an exception, we raise the exception
if isinstance(msg, Exception):
await handle_error(context, msg)
raise msg
if isinstance(msg, BaseException):
handle_error(context, msg)
await asyncio.gather(
context.abort(grpc.StatusCode.UNKNOWN, details=repr(msg)),
return_exceptions=True,
)
exit_on_error(
err=repr(msg), parent=False, context=context, update_context=False
)
return
# Send window EOF response or Window result response
# back to the client
else:
yield msg
except Exception as e:
await handle_error(context, e)
raise e
except BaseException as e:
handle_error(context, e)
await asyncio.gather(
context.abort(grpc.StatusCode.UNKNOWN, details=repr(e)), return_exceptions=True
)
exit_on_error(err=repr(e), parent=False, context=context, update_context=False)
return
# Wait for the process_input_stream task to finish for a clean exit
try:
await producer
except Exception as e:
await handle_error(context)
raise e
except BaseException as e:
handle_error(context, e)
await asyncio.gather(
context.abort(grpc.StatusCode.UNKNOWN, details=repr(e)), return_exceptions=True
)
exit_on_error(err=repr(e), parent=False, context=context, update_context=False)
return

async def IsReady(
self, request: _empty_pb2.Empty, context: NumaflowServicerContext
Expand Down
Loading

0 comments on commit 06a6fdc

Please sign in to comment.