feat: serve the service
Signed-off-by: Frost Ming <[email protected]>
frostming committed Oct 24, 2023
1 parent 2911853 commit 05b76ae
Showing 10 changed files with 403 additions and 130 deletions.
16 changes: 11 additions & 5 deletions src/bentoml/_internal/runner/strategy.py
@@ -7,7 +7,13 @@

 from ..resource import get_resource
 from ..resource import system_resources
-from .runnable import Runnable
+
+if t.TYPE_CHECKING:
+    from bentoml_io import Servable
+
+    from .runnable import Runnable
+
+    ServableType = t.Union[type[Servable], type[Runnable]]

 logger = logging.getLogger(__name__)

@@ -17,7 +23,7 @@ class Strategy(abc.ABC):
     @abc.abstractmethod
     def get_worker_count(
         cls,
-        runnable_class: t.Type[Runnable],
+        runnable_class: ServableType,
         resource_request: dict[str, t.Any] | None,
         workers_per_resource: int | float,
     ) -> int:
@@ -27,7 +33,7 @@ def get_worker_count(
     @abc.abstractmethod
     def get_worker_env(
         cls,
-        runnable_class: t.Type[Runnable],
+        runnable_class: ServableType,
         resource_request: dict[str, t.Any] | None,
         workers_per_resource: int | float,
         worker_index: int,
@@ -60,7 +66,7 @@ class DefaultStrategy(Strategy):
     @classmethod
     def get_worker_count(
         cls,
-        runnable_class: t.Type[Runnable],
+        runnable_class: ServableType,
         resource_request: dict[str, t.Any] | None,
         workers_per_resource: int | float,
     ) -> int:
@@ -103,7 +109,7 @@ def get_worker_count(
     @classmethod
     def get_worker_env(
         cls,
-        runnable_class: t.Type[Runnable],
+        runnable_class: ServableType,
        resource_request: dict[str, t.Any] | None,
         workers_per_resource: int | float,
         worker_index: int,
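With the ServableType alias above, both strategy hooks now accept either a Runnable or a Servable class. A minimal sketch of calling the default strategy directly (not part of this commit; EchoRunnable and the resource values are illustrative assumptions):

import bentoml
from bentoml._internal.runner.strategy import DefaultStrategy

class EchoRunnable(bentoml.Runnable):
    # Hypothetical runnable, defined only to have something to pass in.
    SUPPORTED_RESOURCES = ("cpu",)
    SUPPORTS_CPU_MULTI_THREADING = False

# Ask the strategy how many workers to spawn for a 2-CPU request,
# and what environment each worker should receive.
workers = DefaultStrategy.get_worker_count(EchoRunnable, {"cpu": 2}, 1)
envs = [
    DefaultStrategy.get_worker_env(EchoRunnable, {"cpu": 2}, 1, i)
    for i in range(workers)
]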
2 changes: 1 addition & 1 deletion src/bentoml/serve.py
@@ -460,7 +460,7 @@ def serve_http_production(
             name="api_server",
             args=api_server_args,
             working_dir=working_dir,
-            numprocesses=svc.required_workers_num or api_workers,
+            numprocesses=api_workers,
             close_child_stdin=close_child_stdin,
         )
     )
3 changes: 3 additions & 0 deletions src/bentoml_io/__init__.py
@@ -1,5 +1,8 @@
 from ._pydantic import add_custom_preparers
 from .api import api as api
+from .servable import Servable as Servable
+from .server import APIService as APIService
+from .server import Service as Service

 add_custom_preparers()
 del add_custom_preparers
2 changes: 1 addition & 1 deletion src/bentoml_io/client/local.py
@@ -56,6 +56,6 @@ def call(self, name: str, *args: t.Any, **kwargs: t.Any) -> t.Any:
         meth = getattr(self.servable, name)
         if inspect.isgeneratorfunction(meth):
             return sync_gen_to_async(meth(*args, **kwargs))
-        elif not is_async_callable(meth):
+        elif not is_async_callable(meth) and not inspect.isasyncgenfunction(meth):
             return run_in_threadpool(meth, *args, **kwargs)
         return meth(*args, **kwargs)
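The extra condition matters because an async generator function is not a coroutine function, so a plain "is this async?" check alone would send it to the thread pool. A standalone illustration (not from this commit) using only the standard library:

import inspect

async def stream():
    yield 1

print(inspect.iscoroutinefunction(stream))  # False: an async generator is not a coroutine
print(inspect.isasyncgenfunction(stream))   # True: the case the new check catches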
2 changes: 1 addition & 1 deletion src/bentoml_io/models.py
@@ -146,7 +146,7 @@ def from_output(cls, func: t.Callable[..., t.Any]) -> type[IODescriptor]:


 def ensure_io_descriptor(output_type: type) -> type[IODescriptor]:
-    if issubclass(output_type, BaseModel):
+    if inspect.isclass(output_type) and issubclass(output_type, BaseModel):
         if not issubclass(output_type, IODescriptor):

             class Output(output_type, IOMixin):
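The inspect.isclass guard is needed because issubclass raises TypeError when the output annotation is not a plain class, such as a parametrized generic. A minimal illustration (not from this commit; the list[int] annotation is an assumed example):

import inspect
from pydantic import BaseModel

output_type = list[int]              # e.g. an endpoint annotated to return list[int]
print(inspect.isclass(output_type))  # False, so the BaseModel branch is skipped
# Without the guard, issubclass(list[int], BaseModel) raises
# TypeError: issubclass() arg 1 must be a class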
5 changes: 0 additions & 5 deletions src/bentoml_io/servable.py
@@ -8,11 +8,6 @@
 from .client.base import AbstractClient


-class DependencySpec(t.NamedTuple):
-    connect_string: str | None
-    servable_cls: type[Servable]
-
-
 class Servable:
     __servable_methods__: dict[str, APIMethod[..., t.Any]] = {}
     # User defined attributes
1 change: 1 addition & 0 deletions src/bentoml_io/server/__init__.py
@@ -5,4 +5,5 @@
 A reference implementation of serving a BentoML service.
 This will be eventually migrated to Rust.
 """
+from .service import APIService as APIService
 from .service import Service as Service
83 changes: 64 additions & 19 deletions src/bentoml_io/server/service.py
@@ -13,6 +13,8 @@

 from bentoml._internal.configuration.containers import BentoMLContainer
 from bentoml._internal.context import ServiceContext
+from bentoml._internal.runner.strategy import DefaultStrategy
+from bentoml._internal.runner.strategy import Strategy
 from bentoml.exceptions import BentoMLException

 from ..client import ClientManager
@@ -27,33 +29,28 @@
 HookF_ctx = t.TypeVar("HookF_ctx", bound=ContextFunc)


-@attrs.define
+@attrs.define(slots=False)
 class Service:
     servable_cls: type[Servable]
-    args: tuple[t.Any, ...] = attrs.field(default=(), repr=False)
-    kwargs: dict[str, t.Any] = attrs.field(factory=dict, repr=False)
+    args: tuple[t.Any, ...] = attrs.field(default=())
+    kwargs: dict[str, t.Any] = attrs.field(factory=dict)
     dependencies: list[Service] = attrs.field(factory=list)
+    strategy: type[Strategy] = attrs.field(default=DefaultStrategy, kw_only=True)
     mount_apps: list[tuple[ext.ASGIApp, str, str]] = attrs.field(
-        factory=list, init=False, repr=False
+        factory=list, init=False
     )
     middlewares: list[tuple[type[ext.AsgiMiddleware], dict[str, t.Any]]] = attrs.field(
-        factory=list, init=False, repr=False
-    )
-    startup_hooks: list[LifecycleHook] = attrs.field(
-        factory=list, init=False, repr=False
-    )
-    shutdown_hooks: list[LifecycleHook] = attrs.field(
-        factory=list, init=False, repr=False
+        factory=list, init=False
     )
+    startup_hooks: list[LifecycleHook] = attrs.field(factory=list, init=False)
+    shutdown_hooks: list[LifecycleHook] = attrs.field(factory=list, init=False)
     # service context
-    context: ServiceContext = attrs.field(
-        init=False, factory=ServiceContext, repr=False
-    )
+    context: ServiceContext = attrs.field(init=False, factory=ServiceContext)
     # import info
-    _caller_module: str = attrs.field(init=False, repr=False)
-    _working_dir: str = attrs.field(init=False, repr=False, factory=os.getcwd)
-    _servable: Servable | None = attrs.field(init=False, repr=False, default=None)
-    _client_manager: ClientManager = attrs.field(init=False, repr=False)
+    _caller_module: str = attrs.field(init=False)
+    _working_dir: str = attrs.field(init=False, factory=os.getcwd)
+    _servable: Servable | None = attrs.field(init=False, default=None)
+    _client_manager: ClientManager = attrs.field(init=False)

     def __attrs_post_init__(self):
         self.startup_hooks.append(self.init_servable)  # type: ignore
@@ -64,6 +61,9 @@ async def cleanup() -> None:

         self.shutdown_hooks.append(cleanup)

+    def __repr__(self) -> str:
+        return f"<{self.__class__.__name__} name={self.name}>"
+
     @_caller_module.default
     def get_caller_module(self) -> str:
         if __name__ == "__main__":
@@ -152,6 +152,35 @@ def on_shutdown(self, func: HookF_ctx) -> HookF_ctx:
         self.shutdown_hooks.append(partial(func, self.context))
         return func

+    @cached_property
+    def config(self) -> dict[str, t.Any]:
+        config = BentoMLContainer.runners_config.get()
+        if self.name in config:
+            return config[self.name]
+        return config
+
+    @property
+    def worker_count(self) -> int:
+        config = self.config
+        return self.strategy.get_worker_count(
+            self.servable_cls,
+            config["resources"],
+            config.get("workers_per_resource", 1),
+        )
+
+    @property
+    def worker_env_map(self) -> list[dict[str, t.Any]]:
+        config = self.config
+        return [
+            self.strategy.get_worker_env(
+                self.servable_cls,
+                config["resources"],
+                config.get("workers_per_resource", 1),
+                i,
+            )
+            for i in range(self.worker_count)
+        ]
+
     @inject
     def serve_http(
         self,
@@ -174,10 +203,14 @@ def serve_http(
         development_mode: bool = False,
         reload: bool = False,
     ) -> None:
+        from bentoml._internal.log import configure_logging
+
         from .serving import serve_http_production

+        configure_logging()
+
         if working_dir is None:
-            working_dir = os.getcwd()
+            working_dir = self._working_dir
         serve_http_production(
             self,
             working_dir=working_dir,
@@ -197,3 +230,15 @@ def serve_http(
             development_mode=development_mode,
             reload=reload,
         )
+
+
+class APIService(Service):
+    """A service that doesn't scale on requested resources"""
+
+    @property
+    def worker_count(self) -> int:
+        return BentoMLContainer.api_server_workers.get()
+
+    @property
+    def worker_env_map(self) -> list[dict[str, t.Any]]:
+        return []
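Taken together, the strategy field drives worker_count and worker_env_map from the runner configuration, while APIService ignores resource requests and uses the configured API-server worker count. A rough usage sketch (not part of this diff; the Echo servable and its bare @api method are assumptions about the new bentoml_io API):

from bentoml_io import Servable, Service, api
from bentoml._internal.runner.strategy import DefaultStrategy

class Echo(Servable):
    # Hypothetical servable with a single API method.
    @api
    def echo(self, text: str) -> str:
        return text

svc = Service(Echo, strategy=DefaultStrategy)  # strategy is keyword-only
print(svc.worker_count)    # worker count derived from the runners config via the strategy
print(svc.worker_env_map)  # one environment dict per worker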