From 0e65170ac8160160d5e6978d512a7f0f89fcc9c2 Mon Sep 17 00:00:00 2001 From: Anomit Ghosh Date: Wed, 6 Sep 2023 12:45:41 +0530 Subject: [PATCH] Snapshotter identity check (#46) * feat: do not proceed with init script if snapshotter is not active * feat: snapshotter active status check at runtime * build image for testnet_pretask * chore: ping reporting service every 30 secs * feat+chore: won't process further epochs is snapshotters is disabled, reduce default rpc polling interval to 10 seconds * chore: made PROTOCOL_STATE_CONTRACT and PROST_RPC URL mandatory, remove default values --------- Co-authored-by: Akshay Dahiya Co-authored-by: Swaroop Hegde --- .github/workflows/docker-publish.yml | 5 +-- init_processes.sh | 7 ++++ snapshotter/core_api.py | 19 +++++++++-- snapshotter/process_hub_core.py | 5 ++- snapshotter/processor_distributor.py | 21 ++++++++++-- snapshotter/snapshotter_id_ping.py | 43 +++++++++++++++++++++++++ snapshotter/system_event_detector.py | 11 +++++++ snapshotter/utils/models/data_models.py | 5 +++ snapshotter/utils/redis/redis_keys.py | 2 ++ snapshotter_autofill.sh | 27 ++++++++-------- 10 files changed, 124 insertions(+), 21 deletions(-) create mode 100644 snapshotter/snapshotter_id_ping.py diff --git a/.github/workflows/docker-publish.yml b/.github/workflows/docker-publish.yml index e54300bc..9b5261c8 100644 --- a/.github/workflows/docker-publish.yml +++ b/.github/workflows/docker-publish.yml @@ -9,7 +9,7 @@ on: release: types: ['published'] push: - branches: [ "dockerify" ] + branches: [ "dockerify", "testnet_pretask" ] # Publish semver tags as releases. tags: [ 'v*.*.*' ] pull_request: @@ -25,7 +25,8 @@ env: jobs: build: - runs-on: ubuntu-latest + runs-on: + group: larger-runners permissions: contents: read packages: write diff --git a/init_processes.sh b/init_processes.sh index b22e4231..feadd06e 100755 --- a/init_processes.sh +++ b/init_processes.sh @@ -1,5 +1,12 @@ #!/bin/bash +poetry run python -m snapshotter.snapshotter_id_ping +ret_status=$? + +if [ $ret_status -ne 0 ]; then + echo "Snapshotter identity check failed on protocol smart contract" + exit 1 +fi echo 'starting processes...'; pm2 start pm2.config.js diff --git a/snapshotter/core_api.py b/snapshotter/core_api.py index c5dbf13d..35ab22e5 100644 --- a/snapshotter/core_api.py +++ b/snapshotter/core_api.py @@ -36,6 +36,7 @@ from snapshotter.utils.redis.redis_keys import epoch_id_project_to_state_mapping from snapshotter.utils.redis.redis_keys import epoch_process_report_cached_key from snapshotter.utils.redis.redis_keys import project_last_finalized_epoch_key +from snapshotter.utils.redis.redis_keys import active_status_key from snapshotter.utils.rpc import RpcHelper @@ -97,11 +98,23 @@ async def startup_boilerplate(): await app.state.ipfs_singleton.init_sessions() app.state.ipfs_reader_client = app.state.ipfs_singleton._ipfs_read_client -# Health check endpoint that returns 200 OK - +# Health check endpoint @app.get('/health') -async def health_check(): +async def health_check( + request: Request, + response: Response, +): + redis_conn: aioredis.Redis = request.app.state.redis_pool + _ = await redis_conn.get(active_status_key) + if _: + active_status = bool(int(_)) + if not active_status: + response.status_code = 503 + return { + 'status': 'error', + 'message': 'Snapshotter is not active', + } return {'status': 'OK'} # get current epoch diff --git a/snapshotter/process_hub_core.py b/snapshotter/process_hub_core.py index f33d0958..923524d6 100644 --- a/snapshotter/process_hub_core.py +++ b/snapshotter/process_hub_core.py @@ -2,6 +2,7 @@ import json import os import threading +import time from urllib.parse import urljoin import uuid from multiprocessing import Process @@ -65,6 +66,7 @@ def __init__(self, name, **kwargs): keepalive_expiry=300, ), ) + self._last_reporting_service_ping = 0 self._thread_shutdown_event = threading.Event() self._shutdown_initiated = False @@ -263,7 +265,7 @@ def internal_state_reporter(self, redis_conn: redis.Redis = None): name=f'powerloom:snapshotter:{settings.namespace}:{settings.instance_id}:Processes', mapping=proc_id_map, ) - if settings.reporting.service_url: + if settings.reporting.service_url and int(time.time()) - self._last_reporting_service_ping >= 30: try: self._httpx_client.post( url=urljoin(settings.reporting.service_url, '/ping'), @@ -276,6 +278,7 @@ def internal_state_reporter(self, redis_conn: redis.Redis = None): self._logger.error( 'Error while pinging reporting service: {}', e, ) + self._last_reporting_service_ping = int(time.time()) self._logger.error( ( 'Caught thread shutdown notification event. Deleting process' diff --git a/snapshotter/processor_distributor.py b/snapshotter/processor_distributor.py index c7d3974b..b7790088 100644 --- a/snapshotter/processor_distributor.py +++ b/snapshotter/processor_distributor.py @@ -41,6 +41,7 @@ from snapshotter.utils.models.data_models import PreloaderAsyncFutureDetails from snapshotter.utils.models.data_models import SnapshotterStates from snapshotter.utils.models.data_models import SnapshotterStateUpdate +from snapshotter.utils.models.data_models import SnapshottersUpdatedEvent from snapshotter.utils.models.message_models import EpochBase from snapshotter.utils.models.message_models import PayloadCommitFinalizedMessage from snapshotter.utils.models.message_models import PowerloomCalculateAggregateMessage @@ -50,6 +51,7 @@ from snapshotter.utils.models.message_models import PowerloomSnapshotSubmittedMessage from snapshotter.utils.models.settings_model import AggregateOn from snapshotter.utils.redis.redis_conn import RedisPoolCache +from snapshotter.utils.redis.redis_keys import active_status_key from snapshotter.utils.redis.redis_keys import epoch_id_epoch_released_key from snapshotter.utils.redis.redis_keys import epoch_id_project_to_state_mapping from snapshotter.utils.redis.redis_keys import project_finalized_data_zset @@ -662,7 +664,15 @@ async def _on_rabbitmq_message(self, message: IncomingMessage): int(time.time()), ) asyncio.ensure_future(self._cleanup_older_epoch_status(_.epochId)) - await self._epoch_release_processor(message) + + _ = await self._redis_conn.get(active_status_key) + if _: + active_status = bool(int(_)) + if not active_status: + self._logger.error('System is not active, ignoring released Epoch') + else: + await self._epoch_release_processor(message) + elif message_type == 'SnapshotSubmitted': await self._distribute_callbacks_aggregate( message, @@ -674,7 +684,14 @@ async def _on_rabbitmq_message(self, message: IncomingMessage): ) elif message_type == 'ProjectsUpdated': await self._update_all_projects(message) - + elif message_type == 'SnapshottersUpdated': + msg_cast = SnapshottersUpdatedEvent.parse_raw(message.body) + if msg_cast.snapshotterAddress == settings.instance_id: + if self._redis_conn: + await self._redis_conn.set( + active_status_key, + int(msg_cast.allowed), + ) else: self._logger.error( ( diff --git a/snapshotter/snapshotter_id_ping.py b/snapshotter/snapshotter_id_ping.py new file mode 100644 index 00000000..9455a348 --- /dev/null +++ b/snapshotter/snapshotter_id_ping.py @@ -0,0 +1,43 @@ +import asyncio +import sys +from web3 import Web3 +from snapshotter.auth.helpers.redis_conn import RedisPoolCache +from snapshotter.settings.config import settings +from snapshotter.utils.file_utils import read_json_file +from snapshotter.utils.redis.redis_keys import active_status_key +from snapshotter.utils.rpc import RpcHelper + + +async def main(): + aioredis_pool = RedisPoolCache(pool_size=1000) + await aioredis_pool.populate() + redis_conn = aioredis_pool._aioredis_pool + anchor_rpc = RpcHelper(settings.anchor_chain_rpc) + protocol_abi = read_json_file(settings.protocol_state.abi) + protocol_state_contract = anchor_rpc.get_current_node()['web3_client'].eth.contract( + address=Web3.toChecksumAddress( + settings.protocol_state.address, + ), + abi=protocol_abi, + ) + snapshotters_arr_query = await anchor_rpc.web3_call( + [ + protocol_state_contract.functions.getAllSnapshotters(), + ], + redis_conn + ) + allowed_snapshotters = snapshotters_arr_query[0] + if settings.instance_id in allowed_snapshotters: + print('Snapshotting allowed...') + await redis_conn.set( + active_status_key, + int(True) + ) + sys.exit(0) + else: + print('Snapshotting not allowed...') + sys.exit(1) + + +if __name__ == '__main__': + asyncio.run(main()) diff --git a/snapshotter/system_event_detector.py b/snapshotter/system_event_detector.py index 53e24ab1..fdace168 100644 --- a/snapshotter/system_event_detector.py +++ b/snapshotter/system_event_detector.py @@ -21,6 +21,7 @@ from snapshotter.utils.models.data_models import EventBase from snapshotter.utils.models.data_models import ProjectsUpdatedEvent from snapshotter.utils.models.data_models import SnapshotFinalizedEvent +from snapshotter.utils.models.data_models import SnapshottersUpdatedEvent from snapshotter.utils.rabbitmq_helpers import RabbitmqThreadedSelectLoopInteractor from snapshotter.utils.redis.redis_conn import RedisPoolCache from snapshotter.utils.redis.redis_keys import event_detector_last_processed_block @@ -119,12 +120,15 @@ def __init__(self, name, **kwargs): 'EpochReleased': self.contract.events.EpochReleased._get_event_abi(), 'SnapshotFinalized': self.contract.events.SnapshotFinalized._get_event_abi(), 'ProjectsUpdated': self.contract.events.ProjectsUpdated._get_event_abi(), + 'SnapshottersUpdated': self.contract.events.SnapshottersUpdated._get_event_abi(), } EVENT_SIGS = { 'EpochReleased': 'EpochReleased(uint256,uint256,uint256,uint256)', 'SnapshotFinalized': 'SnapshotFinalized(uint256,uint256,string,string,uint256)', 'ProjectsUpdated': 'ProjectsUpdated(string,bool,uint256)', + 'SnapshottersUpdated': 'SnapshottersUpdated(address,bool)', + } self.event_sig, self.event_abi = get_event_sig_and_abi( @@ -187,6 +191,13 @@ async def get_events(self, from_block: int, to_block: int): timestamp=int(time.time()), ) events.append((log.event, event)) + elif log.event == 'SnapshottersUpdated': + event = SnapshottersUpdatedEvent( + snapshotterAddress=log.args.snapshotterAddress, + allowed=log.args.allowed, + timestamp=int(time.time()), + ) + events.append((log.event, event)) self._logger.info('Events: {}', events) return events diff --git a/snapshotter/utils/models/data_models.py b/snapshotter/utils/models/data_models.py index 39ddda03..dc7c7711 100644 --- a/snapshotter/utils/models/data_models.py +++ b/snapshotter/utils/models/data_models.py @@ -108,6 +108,11 @@ class ProjectsUpdatedEvent(EventBase): enableEpochId: int +class SnapshottersUpdatedEvent(EventBase): + snapshotterAddress: str + allowed: bool + + class SnapshotSubmittedEvent(EventBase): snapshotCid: str epochId: int diff --git a/snapshotter/utils/redis/redis_keys.py b/snapshotter/utils/redis/redis_keys.py index 90bff2ce..27f04ec7 100644 --- a/snapshotter/utils/redis/redis_keys.py +++ b/snapshotter/utils/redis/redis_keys.py @@ -36,6 +36,8 @@ snapshot_submission_window_key = 'snapshotSubmissionWindow' +active_status_key = f'snapshotterActiveStatus:{settings.namespace}' + # project finalzed data zset diff --git a/snapshotter_autofill.sh b/snapshotter_autofill.sh index 5e1e0b92..635e9736 100755 --- a/snapshotter_autofill.sh +++ b/snapshotter_autofill.sh @@ -15,21 +15,24 @@ if [ -z "$SIGNER_ACCOUNT_ADDRESS" ]; then exit 1; fi +if [ -z "$PROST_RPC_URL" ]; then + echo "PROST_RPC_URL not found, please set this in your .env!"; + exit 1; +fi + +if [ -z "$PROTOCOL_STATE_CONTRACT" ]; then + echo "PROTOCOL_STATE_CONTRACT not found, please set this in your .env!"; + exit 1; +fi + echo "Found SOURCE RPC URL ${SOURCE_RPC_URL}"; echo "Found SIGNER ACCOUNT ADDRESS ${SIGNER_ACCOUNT_ADDRESS}"; -if [ "$PROST_RPC_URL" ]; then - echo "Found PROST_RPC_URL ${PROST_RPC_URL}"; -fi - if [ "$IPFS_URL" ]; then echo "Found IPFS_URL ${IPFS_URL}"; fi -if [ "$PROTOCOL_STATE_CONTRACT" ]; then - echo "Found PROTOCOL_STATE_CONTRACT ${PROTOCOL_STATE_CONTRACT}"; -fi if [ "$SLACK_REPORTING_URL" ]; then echo "Found SLACK_REPORTING_URL ${SLACK_REPORTING_URL}"; @@ -46,13 +49,11 @@ cp config/auth_settings.example.json config/auth_settings.json cp config/settings.example.json config/settings.json export namespace=UNISWAPV2 -export prost_rpc_url="${PROST_RPC_URL:-https://rpc-prost1b.powerloom.io}" export ipfs_url="${IPFS_URL:-/dns/ipfs/tcp/5001}" export ipfs_api_key="${IPFS_API_KEY:-}" export ipfs_api_secret="${IPFS_API_SECRET:-}" -export protocol_state_contract="${PROTOCOL_STATE_CONTRACT:-0x102Af943b34FAC403a6ACB8e463f44bE164aa942}" export slack_reporting_url="${SLACK_REPORTING_URL:-}" export powerloom_reporting_url="${POWERLOOM_REPORTING_URL:-}" @@ -65,10 +66,10 @@ if [ -z "$IPFS_URL" ]; then fi echo "Using Namespace: ${namespace}" -echo "Using Prost RPC URL: ${prost_rpc_url}" +echo "Using Prost RPC URL: ${PROST_RPC_URL}" echo "Using IPFS URL: ${ipfs_url}" echo "Using IPFS API KEY: ${ipfs_api_key}" -echo "Using protocol state contract: ${protocol_state_contract}" +echo "Using protocol state contract: ${PROTOCOL_STATE_CONTRACT}" echo "Using slack reporting url: ${slack_reporting_url}" echo "Using powerloom reporting url: ${powerloom_reporting_url}" @@ -78,7 +79,7 @@ sed -i'.backup' "s#account-address#$SIGNER_ACCOUNT_ADDRESS#" config/settings.jso sed -i'.backup' "s#https://rpc-url#$SOURCE_RPC_URL#" config/settings.json -sed -i'.backup' "s#https://prost-rpc-url#$prost_rpc_url#" config/settings.json +sed -i'.backup' "s#https://prost-rpc-url#$PROST_RPC_URL#" config/settings.json sed -i'.backup' "s#ipfs-writer-url#$ipfs_url#" config/settings.json sed -i'.backup' "s#ipfs-writer-key#$ipfs_api_key#" config/settings.json @@ -88,7 +89,7 @@ sed -i'.backup' "s#ipfs-reader-url#$ipfs_url#" config/settings.json sed -i'.backup' "s#ipfs-reader-key#$ipfs_api_key#" config/settings.json sed -i'.backup' "s#ipfs-reader-secret#$ipfs_api_secret#" config/settings.json -sed -i'.backup' "s#protocol-state-contract#$protocol_state_contract#" config/settings.json +sed -i'.backup' "s#protocol-state-contract#$PROTOCOL_STATE_CONTRACT#" config/settings.json sed -i'.backup' "s#https://slack-reporting-url#$slack_reporting_url#" config/settings.json