diff --git a/silverback/application.py b/silverback/application.py index c72ce896..f97e4904 100644 --- a/silverback/application.py +++ b/silverback/application.py @@ -14,7 +14,7 @@ from .exceptions import ContainerTypeMismatchError, InvalidContainerTypeError from .settings import Settings -from .state import AppDatastore, StateSnapshot +from .state import StateSnapshot from .types import SilverbackID, TaskType @@ -161,15 +161,11 @@ def __init__(self, settings: Settings | None = None): self._get_user_all_taskdata = self.__register_system_task( TaskType.SYSTEM_USER_ALL_TASKDATA, self.__get_user_all_taskdata_handler ) - - # TODO: Make backup optional and settings-driven - # TODO: Allow configuring backup class - self.datastore = AppDatastore() self._load_snapshot = self.__register_system_task( TaskType.SYSTEM_LOAD_SNAPSHOT, self.__load_snapshot_handler ) - self._save_snapshot = self.__register_system_task( - TaskType.SYSTEM_SAVE_SNAPSHOT, self.__save_snapshot_handler + self._create_snapshot = self.__register_system_task( + TaskType.SYSTEM_CREATE_SNAPSHOT, self.__create_snapshot_handler ) def __register_system_task( @@ -201,45 +197,34 @@ def __get_user_taskdata_handler(self, task_type: TaskType) -> list[TaskData]: def __get_user_all_taskdata_handler(self) -> list[TaskData]: return [v for k, l in self.tasks.items() if str(k).startswith("user:") for v in l] - async def __load_snapshot_handler(self) -> StateSnapshot: + async def __load_snapshot_handler(self, startup_state: StateSnapshot): # NOTE: *DO NOT USE* in Runner, as it will not be updated by the app self.state = SharedState() # NOTE: attribute does not exist before this task is executed, # ensuring no one uses it during worker startup - if not (startup_state := await self.datastore.init(app_id=self.identifier)): - logger.warning("No state snapshot detected, using empty snapshot") - # TODO: Refactor to `None` by removing - self.state["system:last_block_seen"] = -1 - self.state["system:last_block_processed"] = -1 - startup_state = StateSnapshot( - # TODO: Migrate these to parameters (remove explicitly from state) - last_block_seen=-1, - last_block_processed=-1, - ) # Use empty snapshot - - return startup_state + self.state["system:last_block_seen"] = startup_state.last_block_seen + self.state["system:last_block_processed"] = startup_state.last_block_processed + # TODO: Load user custom state (should not start with `system:`) - async def __save_snapshot_handler( + async def __create_snapshot_handler( self, last_block_seen: int | None = None, last_block_processed: int | None = None, ): - # Task that backups state before/after every non-system runtime task and at shutdown + # Task that updates state checkpoints before/after every non-system runtime task/at shutdown if last_block_seen is not None: self.state["system:last_block_seen"] = last_block_seen if last_block_processed is not None: self.state["system:last_block_processed"] = last_block_processed - snapshot = StateSnapshot( + return StateSnapshot( # TODO: Migrate these to parameters (remove explicitly from state) - last_block_processed=self.state["system:last_block_seen"] or -1, - last_block_seen=self.state["system:last_block_processed"] or -1, + last_block_seen=self.state.get("system:last_block_seen", -1), + last_block_processed=self.state.get("system:last_block_processed", -1), ) - return await self.datastore.save(snapshot) - def broker_task_decorator( self, task_type: TaskType, diff --git a/silverback/runner.py b/silverback/runner.py index b9009a5a..339aac41 100644 --- a/silverback/runner.py +++ b/silverback/runner.py @@ -14,7 +14,7 @@ from .application import SilverbackApp, SystemConfig, TaskData from .exceptions import Halt, NoTasksAvailableError, NoWebsocketAvailableError, StartupFailure from .recorder import BaseRecorder, TaskResult -from .state import StateSnapshot +from .state import AppDatastore, StateSnapshot from .subscriptions import SubscriptionType, Web3SubscriptionsManager from .types import TaskType from .utils import ( @@ -36,6 +36,10 @@ def __init__( **kwargs, ): self.app = app + + # TODO: Make datastore optional and settings-driven + # TODO: Allow configuring datastore class + self.datastore = AppDatastore() self.recorder = recorder self.max_exceptions = max_exceptions @@ -74,12 +78,14 @@ async def _checkpoint( last_block_processed: int | None = None, ): """Set latest checkpoint block number""" - if not self.save_snapshot_supported: + if not self._snapshotting_supported: return # Can't support this feature - task = await self.app._save_snapshot.kiq(last_block_seen, last_block_processed) + task = await self.app._create_snapshot.kiq(last_block_seen, last_block_processed) if (result := await task.wait_result()).is_err: logger.error(f"Error saving snapshot: {result.error}") + else: + await self.datastore.save(result.return_value) @abstractmethod async def _block_task(self, task_data: TaskData): @@ -133,32 +139,39 @@ async def run(self): ) # NOTE: Bypass snapshotting if unsupported - self.save_snapshot_supported = TaskType.SYSTEM_SAVE_SNAPSHOT in system_tasks + self._snapshotting_supported = TaskType.SYSTEM_CREATE_SNAPSHOT in system_tasks # Load the snapshot (if available) # NOTE: Add some additional handling to see if this feature is available in bot if TaskType.SYSTEM_LOAD_SNAPSHOT not in system_tasks: logger.warning( "Silverback no longer supports runner-based snapshotting, " - "please upgrade your bot SDK version to latest." + "please upgrade your bot SDK version to latest to use snapshots." ) startup_state = StateSnapshot( last_block_seen=-1, last_block_processed=-1, ) # Use empty snapshot - elif ( + elif not (startup_state := await self.datastore.init(app_id=self.app.identifier)): + logger.warning("No state snapshot detected, using empty snapshot") + startup_state = StateSnapshot( + # TODO: Migrate these to parameters (remove explicitly from state) + last_block_seen=-1, + last_block_processed=-1, + ) # Use empty snapshot + + logger.debug(f"Startup state: {startup_state}") + # NOTE: State snapshot is immediately out of date after init + + # Send startup state to app + if ( result := await run_taskiq_task_wait_result( - self._create_system_task_kicker(TaskType.SYSTEM_LOAD_SNAPSHOT) + self._create_system_task_kicker(TaskType.SYSTEM_LOAD_SNAPSHOT), startup_state ) ).is_err: raise StartupFailure(result.error) - else: - startup_state = result.return_value - logger.debug(f"Startup state: {startup_state}") - # NOTE: State snapshot is immediately out of date after init - # NOTE: Do this for other system tasks because they may not be in older SDK versions # `if TaskType. not in system_tasks: raise StartupFailure(...)` # or handle accordingly by having default logic if it is not available @@ -274,7 +287,11 @@ async def run(self): # NOTE: No need to handle results otherwise - await self.app.broker.shutdown() + if self._snapshotting_supported: + # Do one last checkpoint to save a snapshot of final state + await self._checkpoint() + + await self.app.broker.shutdown() # Release broker class WebsocketRunner(BaseRunner, ManagerAccessMixin): diff --git a/silverback/types.py b/silverback/types.py index 44d6645d..68be6721 100644 --- a/silverback/types.py +++ b/silverback/types.py @@ -17,7 +17,7 @@ class TaskType(str, Enum): SYSTEM_USER_TASKDATA = "system:user-taskdata" SYSTEM_USER_ALL_TASKDATA = "system:user-all-taskdata" SYSTEM_LOAD_SNAPSHOT = "system:load-snapshot" - SYSTEM_SAVE_SNAPSHOT = "system:save-snapshot" + SYSTEM_CREATE_SNAPSHOT = "system:create-snapshot" # User-accessible Tasks STARTUP = "user:startup"