Skip to content

Commit

Permalink
docs: update automated docs [SBK-285] (#37)
Browse files Browse the repository at this point in the history
* docs: refactor CircuitBreaker docs

* docs: add docstrings for main application class

* docs: add docstring for base Silverback exception

* docs: add docstring for `BaseRunner.run` method

* refactor: apply suggestions from code review

Co-authored-by: antazoey <[email protected]>

* fix: use Halt exception instead of SilverbackException base

* refactor: add `Error` to less descriptive error classes

---------

Co-authored-by: antazoey <[email protected]>
  • Loading branch information
fubuloubu and antazoey authored Sep 20, 2023
1 parent 8b22872 commit 02a3559
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 14 deletions.
72 changes: 66 additions & 6 deletions silverback/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,30 @@
from ape.utils import ManagerAccessMixin
from taskiq import AsyncTaskiqDecoratedTask, TaskiqEvents

from .exceptions import DuplicateHandler, InvalidContainerType
from .exceptions import DuplicateHandlerError, InvalidContainerTypeError
from .settings import Settings


class SilverbackApp(ManagerAccessMixin):
"""
The application singleton. Must be initialized prior to use.
Usage example::
from silverback import SilverbackApp
app = SilverbackApp()
... # Connection has been initialized, can call broker methods e.g. `app.on_(...)`
"""

def __init__(self, settings: Optional[Settings] = None):
"""
Create app
Args:
settings (Optional[~:class:`silverback.settings.Settings`]): Settings override.
Defaults to environment settings.
"""
if not settings:
settings = Settings()
Expand Down Expand Up @@ -59,22 +75,50 @@ def __init__(self, settings: Optional[Settings] = None):

def on_startup(self) -> Callable:
"""
Code to execute on startup / restart after an error.
Code to execute on worker startup / restart after an error.
Usage example::
@app.on_startup()
def do_something_on_startup(state):
... # Can provision resources, or add things to `state`.
"""
return self.broker.on_event(TaskiqEvents.WORKER_STARTUP)

def on_shutdown(self) -> Callable:
"""
Code to execute on normal shutdown.
Code to execute on normal worker shutdown.
Usage example::
@app.on_shutdown()
def do_something_on_shutdown(state):
... # Update some external service, perhaps using information from `state`.
"""
return self.broker.on_event(TaskiqEvents.WORKER_SHUTDOWN)

def get_block_handler(self) -> Optional[AsyncTaskiqDecoratedTask]:
"""
Get access to the handler for `block` events.
Returns:
Optional[AsyncTaskiqDecoratedTask]: Returns decorated task, if one has been created.
"""
return self.broker.available_tasks.get("block")

def get_event_handler(
self, event_target: AddressType, event_name: str
) -> Optional[AsyncTaskiqDecoratedTask]:
"""
Get access to the handler for `<event_target>:<event_name>` events.
Args:
event_target (AddressType): The contract address of the target.
event_name: (str): The name of the event emitted by ``event_target``.
Returns:
Optional[AsyncTaskiqDecoratedTask]: Returns decorated task, if one has been created.
"""
return self.broker.available_tasks.get(f"{event_target}/event/{event_name}")

def on_(
Expand All @@ -83,9 +127,23 @@ def on_(
new_block_timeout: Optional[int] = None,
start_block: Optional[int] = None,
):
"""
Create task to handle events created by `container`.
Args:
container: (Union[BlockContainer, ContractEvent]): The event source to watch.
new_block_timeout: (Optional[int]): Override for block timeoui that is acceptable.
Defaults to whatever the app's settings are for default polling timeout are.
start_block (Optional[int]): block number to start processing events from.
Defaults to whatever the latest block is.
Raises:
:class:`~silverback.exceptions.InvalidContainerTypeError`:
If the type of `container` is not configurable for the app.
"""
if isinstance(container, BlockContainer):
if self.get_block_handler():
raise DuplicateHandler("block")
raise DuplicateHandlerError("block")

if new_block_timeout is not None:
if "_blocks_" in self.poll_settings:
Expand All @@ -105,7 +163,9 @@ def on_(
container.contract, ContractInstance
):
if self.get_event_handler(container.contract.address, container.abi.name):
raise DuplicateHandler(f"event {container.contract.address}:{container.abi.name}")
raise DuplicateHandlerError(
f"event {container.contract.address}:{container.abi.name}"
)

key = container.contract.address
if container.contract.address in self.contract_events:
Expand All @@ -131,4 +191,4 @@ def on_(

# TODO: Support account transaction polling
# TODO: Support mempool polling
raise InvalidContainerType(container)
raise InvalidContainerTypeError(container)
10 changes: 5 additions & 5 deletions silverback/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,25 +8,25 @@ class ImportFromStringError(Exception):
pass


class DuplicateHandler(Exception):
class DuplicateHandlerError(Exception):
def __init__(self, handler_type: str):
super().__init__(f"Only one handler allowed for: {handler_type}")


class InvalidContainerType(Exception):
class InvalidContainerTypeError(Exception):
def __init__(self, container: Any):
super().__init__(f"Invalid container type: {container.__class__}")


class NoWebsocketAvailable(Exception):
class NoWebsocketAvailableError(Exception):
def __init__(self):
super().__init__(
"Attempted to a use WebsocketRunner without a websocket-compatible provider."
)


class SilverbackException(ApeException):
pass
"""Base Exception for any Silverback runtime faults."""


class Halt(SilverbackException):
Expand All @@ -35,7 +35,7 @@ def __init__(self):


class CircuitBreaker(SilverbackException):
"""Custom exception (created by user) that should trigger a shutdown."""
"""Custom exception (created by user) that will trigger an application shutdown."""

def __init__(self, message: str):
logger.error(message)
Expand Down
15 changes: 12 additions & 3 deletions silverback/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from taskiq import AsyncTaskiqDecoratedTask, TaskiqResult

from .application import SilverbackApp
from .exceptions import Halt, NoWebsocketAvailable, SilverbackException
from .exceptions import Halt, NoWebsocketAvailableError
from .subscriptions import SubscriptionType, Web3SubscriptionsManager
from .utils import async_wrap_iter

Expand Down Expand Up @@ -46,6 +46,15 @@ async def _event_task(
"""

async def run(self):
"""
Run the task broker client for the assembled ``SilverbackApp`` application.
Will listen for events against the connected provider (using `ManagerAccessMixin` context),
and process them by kicking events over to the configured broker.
Raises:
:class:`~silverback.exceptions.Halt`: If there are no configured tasks to execute.
"""
await self.app.broker.startup()

if block_handler := self.app.get_block_handler():
Expand All @@ -59,7 +68,7 @@ async def run(self):
tasks.append(self._event_task(contract_event, event_handler))

if len(tasks) == 0:
raise SilverbackException("No tasks to execute")
raise Halt("No tasks to execute")

await asyncio.gather(*tasks)

Expand All @@ -77,7 +86,7 @@ def __init__(self, app: SilverbackApp, *args, **kwargs):

# Check for websocket support
if not (ws_uri := app.chain_manager.provider.ws_uri):
raise NoWebsocketAvailable()
raise NoWebsocketAvailableError()

self.ws_uri = ws_uri

Expand Down

0 comments on commit 02a3559

Please sign in to comment.