diff --git a/example.py b/example.py index 632ee41b..4300fca9 100644 --- a/example.py +++ b/example.py @@ -1,3 +1,4 @@ +import asyncio from typing import Annotated from ape import chain @@ -11,6 +12,9 @@ # Do this first to initialize your app app = SilverbackApp() +# Cannot call `app.state` outside of an app function handler +# app.state.something # NOTE: raises AttributeError + # NOTE: Don't do any networking until after initializing app USDC = tokens["USDC"] YFI = tokens["YFI"] @@ -18,24 +22,37 @@ @app.on_startup() def app_startup(startup_state: StateSnapshot): - # NOTE: This is called just as the app is put into "run" state, - # and handled by the first available worker - # raise Exception # NOTE: Any exception raised on startup aborts immediately + # This is called just as the app is put into "run" state, + # and handled by the first available worker + + # Any exception raised on startup aborts immediately: + # raise Exception # NOTE: raises StartupFailure + + # This is a great place to set `app.state` values + app.state.logs_processed = 0 + # NOTE: Can put anything here, any python object works + return {"block_number": startup_state.last_block_seen} # Can handle some resource initialization for each worker, like LLMs or database connections class MyDB: def execute(self, query: str): - pass + pass # Handle query somehow... @app.on_worker_startup() -def worker_startup(state: TaskiqState): # NOTE: You need the type hint here +def worker_startup(worker_state: TaskiqState): # NOTE: You need the type hint to load worker state + # NOTE: Worker state is per-worker, not shared with other workers # NOTE: Can put anything here, any python object works - state.db = MyDB() - state.block_count = 0 - # raise Exception # NOTE: Any exception raised on worker startup aborts immediately + worker_state.db = MyDB() + worker_state.block_count = 0 + + # Any exception raised on worker startup aborts immediately: + # raise Exception # NOTE: raises StartupFailure + + # Cannot call `app.state` because it is not set up yet on worker startup functions + # app.state.something # NOTE: raises AttributeError # This is how we trigger off of new blocks @@ -57,6 +74,9 @@ def exec_event1(log): # NOTE: By default, if you have 3 tasks fail in a row, the app will shutdown itself raise ValueError("I don't like the number 3.") + # You can update state whenever you want + app.state.logs_processed += 1 + return {"amount": log.amount} @@ -67,18 +87,26 @@ async def exec_event2(log: ContractLog): # If you ever want the app to immediately shutdown under some scenario, raise this exception raise CircuitBreaker("Oopsie!") + # All `app.state` values are updated across all workers at the same time + app.state.logs_processed += 1 + # Do any other long running tasks... + await asyncio.sleep(5) return log.amount # A final job to execute on Silverback shutdown @app.on_shutdown() def app_shutdown(): - # raise Exception # NOTE: Any exception raised on shutdown is ignored + # NOTE: Any exception raised on worker shutdown is ignored: + # raise Exception return {"some_metric": 123} # Just in case you need to release some resources or something inside each worker @app.on_worker_shutdown() def worker_shutdown(state: TaskiqState): # NOTE: You need the type hint here + # This is a good time to release resources state.db = None - # raise Exception # NOTE: Any exception raised on worker shutdown is ignored + + # NOTE: Any exception raised on worker shutdown is ignored: + # raise Exception diff --git a/silverback/application.py b/silverback/application.py index 2691a98a..9b071442 100644 --- a/silverback/application.py +++ b/silverback/application.py @@ -1,4 +1,5 @@ import atexit +from collections import defaultdict from datetime import timedelta from typing import Any, Callable @@ -13,6 +14,7 @@ from .exceptions import ContainerTypeMismatchError, InvalidContainerTypeError from .settings import Settings +from .state import AppDatastore, StateSnapshot from .types import SilverbackID, TaskType @@ -33,6 +35,24 @@ class TaskData(BaseModel): # NOTE: Any other items here must have a default value +class SharedState(defaultdict): + def __init__(self): + # Any unknown key returns None + super().__init__(lambda: None) + + def __getattr__(self, attr): + try: + return super().__getattr__(attr) + except AttributeError: + return super().__getitem__(attr) + + def __setattr__(self, attr, val): + try: + super().__setattr__(attr, val) + except AttributeError: + super().__setitem__(attr, val) + + class SilverbackApp(ManagerAccessMixin): """ The application singleton. Must be initialized prior to use. @@ -113,6 +133,16 @@ def __init__(self, settings: Settings | None = None): TaskType.SYSTEM_USER_ALL_TASKDATA, self.__get_user_all_taskdata_handler ) + # TODO: Make backup optional and settings-driven + # TODO: Allow configuring backup class + self.datastore = AppDatastore() + self._load_snapshot = self.__register_system_task( + TaskType.SYSTEM_LOAD_SNAPSHOT, self.__load_snapshot_handler + ) + self._save_snapshot = self.__register_system_task( + TaskType.SYSTEM_SAVE_SNAPSHOT, self.__save_snapshot_handler + ) + def __register_system_task( self, task_type: TaskType, task_handler: Callable ) -> AsyncTaskiqDecoratedTask: @@ -142,6 +172,47 @@ def __get_user_taskdata_handler(self, task_type: TaskType) -> list[TaskData]: def __get_user_all_taskdata_handler(self) -> list[TaskData]: return [v for k, l in self.tasks.items() if str(k).startswith("user:") for v in l] + async def __load_snapshot_handler(self) -> StateSnapshot: + # NOTE: This is not networked in any way, nor thread-safe nor multi-process safe, + # but will be accessible across multiple workers in a single container + # NOTE: *DO NOT USE* in Runner, as it will not be updated by the app + self.state = SharedState() + # NOTE: attribute does not exist before this task is executed, + # ensuring no one uses it during worker startup + + if not (startup_state := await self.datastore.init(app_id=self.identifier)): + logger.warning("No state snapshot detected, using empty snapshot") + # TODO: Refactor to `None` by removing + self.state["system:last_block_seen"] = -1 + self.state["system:last_block_processed"] = -1 + startup_state = StateSnapshot( + # TODO: Migrate these to parameters (remove explicitly from state) + last_block_seen=-1, + last_block_processed=-1, + ) # Use empty snapshot + + return startup_state + + async def __save_snapshot_handler( + self, + last_block_seen: int | None = None, + last_block_processed: int | None = None, + ): + # Task that backups state before/after every non-system runtime task and at shutdown + if last_block_seen is not None: + self.state["system:last_block_seen"] = last_block_seen + + if last_block_processed is not None: + self.state["system:last_block_processed"] = last_block_processed + + snapshot = StateSnapshot( + # TODO: Migrate these to parameters (remove explicitly from state) + last_block_processed=self.state["system:last_block_seen"] or -1, + last_block_seen=self.state["system:last_block_processed"] or -1, + ) + + return await self.datastore.save(snapshot) + def broker_task_decorator( self, task_type: TaskType, diff --git a/silverback/runner.py b/silverback/runner.py index 1f511571..6a2a8c3d 100644 --- a/silverback/runner.py +++ b/silverback/runner.py @@ -14,7 +14,7 @@ from .application import SilverbackApp, SystemConfig, TaskData from .exceptions import Halt, NoTasksAvailableError, NoWebsocketAvailableError, StartupFailure from .recorder import BaseRecorder, TaskResult -from .state import AppDatastore, StateSnapshot +from .state import StateSnapshot from .subscriptions import SubscriptionType, Web3SubscriptionsManager from .types import TaskType from .utils import ( @@ -37,8 +37,6 @@ def __init__( ): self.app = app self.recorder = recorder - self.state = None - self.datastore = AppDatastore() self.max_exceptions = max_exceptions self.exceptions = 0 @@ -76,26 +74,12 @@ async def _checkpoint( last_block_processed: int | None = None, ): """Set latest checkpoint block number""" - assert self.state, f"{self.__class__.__name__}.run() not triggered." + if not self.save_snapshot_supported: + return # Can't support this feature - logger.debug( - ( - f"Checkpoint block [seen={self.state.last_block_seen}, " - f"procssed={self.state.last_block_processed}]" - ) - ) - - if last_block_seen: - self.state.last_block_seen = last_block_seen - if last_block_processed: - self.state.last_block_processed = last_block_processed - - if self.recorder: - try: - await self.datastore.set_state(self.state) - - except Exception as err: - logger.error(f"Error setting state: {err}") + task = await self.app._save_snapshot.kiq(last_block_seen, last_block_processed) + if (result := await task.wait_result()).is_err: + logger.error(f"Error saving snapshot: {result.error}") @abstractmethod async def _block_task(self, task_data: TaskData): @@ -106,7 +90,7 @@ async def _block_task(self, task_data: TaskData): @abstractmethod async def _event_task(self, task_data: TaskData): """ - handle an event handler task for the given contract event + Handle an event handler task for the given contract event """ async def run(self): @@ -148,20 +132,41 @@ async def run(self): f", available task types:\n- {system_tasks_str}" ) + # NOTE: Bypass snapshotting if unsupported + self.save_snapshot_supported = TaskType.SYSTEM_SAVE_SNAPSHOT in system_tasks + + # Load the snapshot (if available) + # NOTE: Add some additional handling to see if this feature is available in bot + if TaskType.SYSTEM_LOAD_SNAPSHOT not in system_tasks: + logger.warning( + "Silverback no longer supports runner-based snapshotting, " + "please upgrade your bot SDK version to latest." + ) + startup_state = StateSnapshot( + last_block_seen=-1, + last_block_processed=-1, + ) # Use empty snapshot + + elif ( + result := await run_taskiq_task_wait_result( + self._create_system_task_kicker(TaskType.SYSTEM_LOAD_SNAPSHOT) + ) + ).is_err: + raise StartupFailure(result.error) + + else: + startup_state = result.return_value + logger.debug(f"Startup state: {startup_state}") + # NOTE: State snapshot is immediately out of date after init + # NOTE: Do this for other system tasks because they may not be in older SDK versions # `if TaskType. not in system_tasks: raise StartupFailure(...)` # or handle accordingly by having default logic if it is not available - # Initialize recorder (if available) and fetch state if app has been run previously + # Initialize recorder (if available) if self.recorder: await self.recorder.init(app_id=self.app.identifier) - if startup_state := (await self.datastore.init(app_id=self.app.identifier)): - self.state = startup_state - - else: # use empty state - self.state = StateSnapshot(last_block_seen=-1, last_block_processed=-1) - # Execute Silverback startup task before we init the rest startup_taskdata_result = await run_taskiq_task_wait_result( self._create_system_task_kicker(TaskType.SYSTEM_USER_TASKDATA), TaskType.STARTUP @@ -176,7 +181,7 @@ async def run(self): ) startup_task_results = await run_taskiq_task_group_wait_results( - (task_handler for task_handler in startup_task_handlers), self.state + (task_handler for task_handler in startup_task_handlers), startup_state ) if any(result.is_err for result in startup_task_results): diff --git a/silverback/state.py b/silverback/state.py index ddabca78..412fc3f6 100644 --- a/silverback/state.py +++ b/silverback/state.py @@ -43,7 +43,6 @@ async def init(self, app_id: SilverbackID) -> StateSnapshot | None: Path.cwd() / ".silverback-sessions" / app_id.name / app_id.ecosystem / app_id.network ) data_folder.mkdir(parents=True, exist_ok=True) - self.state_backup_file = data_folder / "state.json" return ( @@ -53,12 +52,4 @@ async def init(self, app_id: SilverbackID) -> StateSnapshot | None: ) async def save(self, snapshot: StateSnapshot): - if self.state_backup_file.exists(): - old_snapshot = AppState.parse_file(self.snapshot_backup_file) - if old_snapshot.last_block_seen > snapshot.last_block_seen: - snapshot.last_block_seen = old_snapshot.last_block_seen - if old_snapshot.last_block_processed > snapshot.last_block_processed: - snapshot.last_block_processed = old_snapshot.last_block_processed - - snapshot.last_updated = utc_now() self.state_backup_file.write_text(snapshot.model_dump_json()) diff --git a/silverback/types.py b/silverback/types.py index e1584386..63ee4aea 100644 --- a/silverback/types.py +++ b/silverback/types.py @@ -16,6 +16,8 @@ class TaskType(str, Enum): SYSTEM_CONFIG = "system:config" SYSTEM_USER_TASKDATA = "system:user-taskdata" SYSTEM_USER_ALL_TASKDATA = "system:user-all-taskdata" + SYSTEM_LOAD_SNAPSHOT = "system:load-snapshot" + SYSTEM_SAVE_SNAPSHOT = "system:save-snapshot" # User-accessible Tasks STARTUP = "user:startup"