From 8809f372717bd11a446a78bd77797147a6baa4f1 Mon Sep 17 00:00:00 2001 From: Daniel Savu <23065004+daniel-savu@users.noreply.github.com> Date: Fri, 4 Oct 2024 18:13:50 +0100 Subject: [PATCH] feat: checkpoint syncer fraud flag (#4587) Upon detection of a merkle root mismatch between the local merkle tree (built from indexed events) and the onchain one, reorg diagnostic data is posted to checkpoint storage. This PR focuses on the submitter logic and leaves two things for a follow up PR: - adding logic to not spin up the validator if a fraud flag has been posted - integration testing with S3 and GCP I assume rolling out to validators will happen after the follow up PR, but running with logic from this PR will already mean the reorg diagnostic flag is posted to S3 (though not tested). --- rust/main/Cargo.lock | 2 + rust/main/Cargo.toml | 1 + rust/main/agents/relayer/Cargo.toml | 1 + rust/main/agents/relayer/src/msg/processor.rs | 15 +- rust/main/agents/validator/Cargo.toml | 3 + rust/main/agents/validator/src/submit.rs | 346 +++++++++++++++++- .../main/chains/hyperlane-ethereum/Cargo.toml | 4 + .../src/signer/singleton.rs | 8 + .../src/traits/checkpoint_syncer.rs | 8 +- .../hyperlane-base/src/types/gcs_storage.rs | 15 +- .../hyperlane-base/src/types/local_storage.rs | 23 +- .../hyperlane-base/src/types/s3_storage.rs | 21 +- rust/main/hyperlane-core/src/test_utils.rs | 14 +- rust/main/hyperlane-core/src/types/mod.rs | 2 + rust/main/hyperlane-core/src/types/reorg.rs | 20 + 15 files changed, 461 insertions(+), 22 deletions(-) create mode 100644 rust/main/hyperlane-core/src/types/reorg.rs diff --git a/rust/main/Cargo.lock b/rust/main/Cargo.lock index f58a90d40e..68f4a27323 100644 --- a/rust/main/Cargo.lock +++ b/rust/main/Cargo.lock @@ -10362,6 +10362,7 @@ version = "0.1.0" dependencies = [ "async-trait", "axum", + "chrono", "config", "console-subscriber", "derive-new", @@ -10376,6 +10377,7 @@ dependencies = [ "hyperlane-ethereum", "hyperlane-test", "k256 0.13.3", + "mockall", "prometheus", "reqwest", "serde", diff --git a/rust/main/Cargo.toml b/rust/main/Cargo.toml index 06619dcd48..aab9a72b94 100644 --- a/rust/main/Cargo.toml +++ b/rust/main/Cargo.toml @@ -40,6 +40,7 @@ borsh = "0.9" bs58 = "0.5.0" bytes = "1" clap = "4" +chrono = "*" color-eyre = "0.6" config = "0.13.3" console-subscriber = "0.2.0" diff --git a/rust/main/agents/relayer/Cargo.toml b/rust/main/agents/relayer/Cargo.toml index 3a7f5c6bb0..5a891d912c 100644 --- a/rust/main/agents/relayer/Cargo.toml +++ b/rust/main/agents/relayer/Cargo.toml @@ -58,6 +58,7 @@ mockall.workspace = true tokio-test.workspace = true hyperlane-test = { path = "../../hyperlane-test" } hyperlane-base = { path = "../../hyperlane-base", features = ["test-utils"] } +hyperlane-core = { path = "../../hyperlane-core", features = ["agent", "async", "test-utils"] } [features] default = ["color-eyre", "oneline-errors"] diff --git a/rust/main/agents/relayer/src/msg/processor.rs b/rust/main/agents/relayer/src/msg/processor.rs index a2f70f8c9c..43bd93c67b 100644 --- a/rust/main/agents/relayer/src/msg/processor.rs +++ b/rust/main/agents/relayer/src/msg/processor.rs @@ -404,8 +404,8 @@ mod test { settings::{ChainConf, ChainConnectionConf, Settings}, }; use hyperlane_core::{ - GasPaymentKey, InterchainGasPayment, InterchainGasPaymentMeta, MerkleTreeInsertion, - PendingOperationStatus, H256, + test_utils::dummy_domain, GasPaymentKey, InterchainGasPayment, InterchainGasPaymentMeta, + MerkleTreeInsertion, PendingOperationStatus, H256, }; use hyperlane_test::mocks::{MockMailboxContract, MockValidatorAnnounceContract}; use prometheus::{IntCounter, Registry}; @@ -537,17 +537,6 @@ mod test { } } - fn dummy_domain(domain_id: u32, name: &str) -> HyperlaneDomain { - let test_domain = HyperlaneDomain::new_test_domain(name); - HyperlaneDomain::Unknown { - domain_id, - domain_name: name.to_owned(), - domain_type: test_domain.domain_type(), - domain_protocol: test_domain.domain_protocol(), - domain_technical_stack: test_domain.domain_technical_stack(), - } - } - /// Only adds database entries to the pending message prefix if the message's /// retry count is greater than zero fn persist_retried_messages( diff --git a/rust/main/agents/validator/Cargo.toml b/rust/main/agents/validator/Cargo.toml index 7b3f1049c1..7228ad69cb 100644 --- a/rust/main/agents/validator/Cargo.toml +++ b/rust/main/agents/validator/Cargo.toml @@ -11,6 +11,7 @@ version.workspace = true [dependencies] async-trait.workspace = true axum.workspace = true +chrono.workspace = true config.workspace = true console-subscriber.workspace = true derive_more.workspace = true @@ -36,10 +37,12 @@ hyperlane-ethereum = { path = "../../chains/hyperlane-ethereum" } hyperlane-cosmos = { path = "../../chains/hyperlane-cosmos" } [dev-dependencies] +mockall.workspace = true tokio-test.workspace = true reqwest.workspace = true hyperlane-test = { path = "../../hyperlane-test" } k256.workspace = true +hyperlane-ethereum = { path = "../../chains/hyperlane-ethereum", features = ["test-utils"] } [features] default = ["color-eyre", "oneline-errors"] diff --git a/rust/main/agents/validator/src/submit.rs b/rust/main/agents/validator/src/submit.rs index 8b3ede08ee..94d5964673 100644 --- a/rust/main/agents/validator/src/submit.rs +++ b/rust/main/agents/validator/src/submit.rs @@ -3,18 +3,18 @@ use std::sync::Arc; use std::time::{Duration, Instant}; use std::vec; -use hyperlane_base::db::HyperlaneDb; -use hyperlane_core::rpc_clients::call_and_retry_indefinitely; -use hyperlane_core::{ChainResult, MerkleTreeHook}; use prometheus::IntGauge; use tokio::time::sleep; use tracing::{debug, error, info}; +use hyperlane_base::db::HyperlaneDb; use hyperlane_base::{CheckpointSyncer, CoreMetrics}; +use hyperlane_core::rpc_clients::call_and_retry_indefinitely; use hyperlane_core::{ accumulator::incremental::IncrementalMerkle, Checkpoint, CheckpointWithMessageId, HyperlaneChain, HyperlaneContract, HyperlaneDomain, HyperlaneSignerExt, }; +use hyperlane_core::{ChainResult, MerkleTreeHook, ReorgEvent}; use hyperlane_ethereum::SingletonSignerHandle; #[derive(Clone)] @@ -206,12 +206,32 @@ impl ValidatorSubmitter { // If the tree's checkpoint doesn't match the correctness checkpoint, something went wrong // and we bail loudly. if checkpoint != *correctness_checkpoint { + let reorg_event = ReorgEvent::new( + tree.root(), + correctness_checkpoint.root, + checkpoint.index, + chrono::Utc::now().timestamp() as u64, + self.reorg_period.map(|x| x.get()).unwrap_or(0), + ); error!( ?checkpoint, ?correctness_checkpoint, + ?reorg_event, "Incorrect tree root, something went wrong" ); - panic!("Incorrect tree root, something went wrong"); + + let mut panic_message = "Incorrect tree root, something went wrong.".to_owned(); + if let Err(e) = self + .checkpoint_syncer + .write_reorg_status(&reorg_event) + .await + { + panic_message.push_str(&format!( + " Reorg troubleshooting details couldn't be written to checkpoint storage: {}", + e + )); + } + panic!("{panic_message}"); } if !checkpoint_queue.is_empty() { @@ -313,3 +333,321 @@ impl ValidatorSubmitterMetrics { } } } + +#[cfg(test)] +mod test { + use super::*; + use async_trait::async_trait; + use eyre::Result; + use hyperlane_base::{ + db::{DbResult, HyperlaneDb, InterchainGasExpenditureData, InterchainGasPaymentData}, + AgentMetadata, + }; + use hyperlane_core::{ + test_utils::dummy_domain, GasPaymentKey, HyperlaneChain, HyperlaneContract, + HyperlaneDomain, HyperlaneMessage, HyperlaneProvider, InterchainGasPayment, + InterchainGasPaymentMeta, MerkleTreeHook, MerkleTreeInsertion, PendingOperationStatus, + ReorgEvent, SignedAnnouncement, SignedCheckpointWithMessageId, H160, H256, + }; + use prometheus::Registry; + use std::{fmt::Debug, sync::Arc, time::Duration}; + use tokio::sync::mpsc; + + mockall::mock! { + pub Db { + fn provider(&self) -> Box; + } + + impl Debug for Db { + fn fmt<'a>(&self, f: &mut std::fmt::Formatter<'a>) -> std::fmt::Result; + } + + impl HyperlaneDb for Db { + fn retrieve_highest_seen_message_nonce(&self) -> DbResult>; + fn retrieve_message_by_nonce(&self, nonce: u32) -> DbResult>; + fn retrieve_processed_by_nonce(&self, nonce: &u32) -> DbResult>; + fn domain(&self) -> &HyperlaneDomain; + fn store_message_id_by_nonce(&self, nonce: &u32, id: &H256) -> DbResult<()>; + fn retrieve_message_id_by_nonce(&self, nonce: &u32) -> DbResult>; + fn store_message_by_id(&self, id: &H256, message: &HyperlaneMessage) -> DbResult<()>; + fn retrieve_message_by_id(&self, id: &H256) -> DbResult>; + fn store_dispatched_block_number_by_nonce( + &self, + nonce: &u32, + block_number: &u64, + ) -> DbResult<()>; + fn retrieve_dispatched_block_number_by_nonce(&self, nonce: &u32) -> DbResult>; + fn store_processed_by_nonce(&self, nonce: &u32, processed: &bool) -> DbResult<()>; + fn store_processed_by_gas_payment_meta( + &self, + meta: &InterchainGasPaymentMeta, + processed: &bool, + ) -> DbResult<()>; + fn retrieve_processed_by_gas_payment_meta( + &self, + meta: &InterchainGasPaymentMeta, + ) -> DbResult>; + fn store_interchain_gas_expenditure_data_by_message_id( + &self, + message_id: &H256, + data: &InterchainGasExpenditureData, + ) -> DbResult<()>; + fn retrieve_interchain_gas_expenditure_data_by_message_id( + &self, + message_id: &H256, + ) -> DbResult>; + fn store_status_by_message_id( + &self, + message_id: &H256, + status: &PendingOperationStatus, + ) -> DbResult<()>; + fn retrieve_status_by_message_id( + &self, + message_id: &H256, + ) -> DbResult>; + fn store_interchain_gas_payment_data_by_gas_payment_key( + &self, + key: &GasPaymentKey, + data: &InterchainGasPaymentData, + ) -> DbResult<()>; + fn retrieve_interchain_gas_payment_data_by_gas_payment_key( + &self, + key: &GasPaymentKey, + ) -> DbResult>; + fn store_gas_payment_by_sequence( + &self, + sequence: &u32, + payment: &InterchainGasPayment, + ) -> DbResult<()>; + fn retrieve_gas_payment_by_sequence( + &self, + sequence: &u32, + ) -> DbResult>; + fn store_gas_payment_block_by_sequence( + &self, + sequence: &u32, + block_number: &u64, + ) -> DbResult<()>; + fn retrieve_gas_payment_block_by_sequence(&self, sequence: &u32) -> DbResult>; + fn store_pending_message_retry_count_by_message_id( + &self, + message_id: &H256, + count: &u32, + ) -> DbResult<()>; + fn retrieve_pending_message_retry_count_by_message_id( + &self, + message_id: &H256, + ) -> DbResult>; + fn store_merkle_tree_insertion_by_leaf_index( + &self, + leaf_index: &u32, + insertion: &MerkleTreeInsertion, + ) -> DbResult<()>; + fn retrieve_merkle_tree_insertion_by_leaf_index( + &self, + leaf_index: &u32, + ) -> DbResult>; + fn store_merkle_leaf_index_by_message_id( + &self, + message_id: &H256, + leaf_index: &u32, + ) -> DbResult<()>; + fn retrieve_merkle_leaf_index_by_message_id(&self, message_id: &H256) -> DbResult>; + fn store_merkle_tree_insertion_block_number_by_leaf_index( + &self, + leaf_index: &u32, + block_number: &u64, + ) -> DbResult<()>; + fn retrieve_merkle_tree_insertion_block_number_by_leaf_index( + &self, + leaf_index: &u32, + ) -> DbResult>; + fn store_highest_seen_message_nonce_number(&self, nonce: &u32) -> DbResult<()>; + fn retrieve_highest_seen_message_nonce_number(&self) -> DbResult>; + + } + } + + mockall::mock! { + pub MerkleTreeHook {} + + impl Debug for MerkleTreeHook { + fn fmt<'a>(&self, f: &mut std::fmt::Formatter<'a>) -> std::fmt::Result; + } + + impl HyperlaneChain for MerkleTreeHook { + fn domain(&self) -> &HyperlaneDomain; + fn provider(&self) -> Box; + } + + impl HyperlaneContract for MerkleTreeHook { + fn address(&self) -> H256; + } + + #[async_trait] + impl MerkleTreeHook for MerkleTreeHook { + async fn tree(&self, lag: Option) -> ChainResult; + async fn count(&self, lag: Option) -> ChainResult; + async fn latest_checkpoint(&self, lag: Option) -> ChainResult; + } + } + + mockall::mock! { + pub CheckpointSyncer {} + + impl Debug for CheckpointSyncer { + fn fmt<'a>(&self, f: &mut std::fmt::Formatter<'a>) -> std::fmt::Result; + } + + #[async_trait] + impl CheckpointSyncer for CheckpointSyncer { + async fn latest_index(&self) -> Result>; + async fn write_latest_index(&self, index: u32) -> Result<()>; + async fn update_latest_index(&self, index: u32) -> Result<()>; + async fn fetch_checkpoint(&self, index: u32) -> Result>; + async fn write_checkpoint( + &self, + signed_checkpoint: &SignedCheckpointWithMessageId, + ) -> Result<()>; + async fn write_metadata(&self, metadata: &AgentMetadata) -> Result<()>; + async fn write_announcement(&self, signed_announcement: &SignedAnnouncement) -> Result<()>; + fn announcement_location(&self) -> String; + async fn write_reorg_status(&self, reorg_event: &ReorgEvent) -> Result<()>; + async fn reorg_status(&self) -> Result>; + } + } + + fn dummy_metrics() -> ValidatorSubmitterMetrics { + let origin_domain = dummy_domain(0, "dummy_origin_domain"); + let core_metrics = CoreMetrics::new("dummy_relayer", 37582, Registry::new()).unwrap(); + ValidatorSubmitterMetrics::new(&core_metrics, &origin_domain) + } + + fn dummy_singleton_handle() -> SingletonSignerHandle { + SingletonSignerHandle::new(H160::from_low_u64_be(0), mpsc::unbounded_channel().0) + } + + fn reorg_event_is_correct( + reorg_event: &ReorgEvent, + expected_local_merkle_tree: &IncrementalMerkle, + mock_onchain_merkle_tree: &IncrementalMerkle, + unix_timestamp: u64, + expected_reorg_period: u64, + ) { + assert_eq!( + reorg_event.canonical_merkle_root, + mock_onchain_merkle_tree.root() + ); + assert_eq!( + reorg_event.local_merkle_root, + expected_local_merkle_tree.root() + ); + assert_eq!( + reorg_event.checkpoint_index, + expected_local_merkle_tree.index() + ); + // timestamp diff should be less than 1 second + let timestamp_diff = reorg_event.unix_timestamp as i64 - unix_timestamp as i64; + assert!(timestamp_diff.abs() < 1); + + assert_eq!(reorg_event.reorg_period, expected_reorg_period); + } + + #[tokio::test] + #[should_panic(expected = "Incorrect tree root, something went wrong.")] + async fn reorg_is_detected_and_persisted_to_checkpoint_storage() { + let unix_timestamp = chrono::Utc::now().timestamp() as u64; + let expected_reorg_period = 12; + + let pre_reorg_merke_insertions = vec![ + MerkleTreeInsertion::new(0, H256::random()), + MerkleTreeInsertion::new(1, H256::random()), + MerkleTreeInsertion::new(2, H256::random()), + ]; + let mut expected_local_merkle_tree = IncrementalMerkle::default(); + for insertion in pre_reorg_merke_insertions.iter() { + expected_local_merkle_tree.ingest(insertion.message_id()); + } + + // the last leaf is different post-reorg + let post_reorg_merkle_insertions = vec![ + pre_reorg_merke_insertions[0].clone(), + pre_reorg_merke_insertions[1].clone(), + MerkleTreeInsertion::new(2, H256::random()), + ]; + let mut mock_onchain_merkle_tree = IncrementalMerkle::default(); + for insertion in post_reorg_merkle_insertions.iter() { + mock_onchain_merkle_tree.ingest(insertion.message_id()); + } + + // assert the reorg resulted in different merkle tree roots + assert_ne!( + mock_onchain_merkle_tree.root(), + expected_local_merkle_tree.root() + ); + + // the db returns the pre-reorg merkle tree insertions + let mut db = MockDb::new(); + db.expect_retrieve_merkle_tree_insertion_by_leaf_index() + .returning(move |sequence| { + Ok(Some(pre_reorg_merke_insertions[*sequence as usize].clone())) + }); + + // boilerplate mocks + let mut mock_merkle_tree_hook = MockMerkleTreeHook::new(); + mock_merkle_tree_hook + .expect_address() + .returning(|| H256::from_low_u64_be(0)); + let dummy_domain = dummy_domain(0, "dummy_domain"); + mock_merkle_tree_hook + .expect_domain() + .return_const(dummy_domain.clone()); + + // expect the checkpoint syncer to post the reorg event to the checkpoint storage + // and not submit any checkpoints (this is checked implicitly, by not setting any `expect`s) + let mut mock_checkpoint_syncer = MockCheckpointSyncer::new(); + let mock_onchain_merkle_tree_clone = mock_onchain_merkle_tree.clone(); + mock_checkpoint_syncer + .expect_write_reorg_status() + .once() + .returning(move |reorg_event| { + // unit test correctness criteria + reorg_event_is_correct( + reorg_event, + &expected_local_merkle_tree, + &mock_onchain_merkle_tree_clone, + unix_timestamp, + expected_reorg_period, + ); + Ok(()) + }); + + // instantiate the validator submitter + let validator_submitter = ValidatorSubmitter::new( + Duration::from_secs(1), + expected_reorg_period, + Arc::new(mock_merkle_tree_hook), + dummy_singleton_handle(), + Arc::new(mock_checkpoint_syncer), + Arc::new(db), + dummy_metrics(), + ); + + // mock the correctness checkpoint response + let mock_onchain_checkpoint = Checkpoint { + root: mock_onchain_merkle_tree.root(), + index: mock_onchain_merkle_tree.index(), + merkle_tree_hook_address: H256::from_low_u64_be(0), + mailbox_domain: dummy_domain.id(), + }; + + // Start the submitter with an empty merkle tree, so it gets rebuilt from the db. + // A panic is expected here, as the merkle root inconsistency is a critical error that may indicate fraud. + validator_submitter + .submit_checkpoints_until_correctness_checkpoint( + &mut IncrementalMerkle::default(), + &mock_onchain_checkpoint, + ) + .await; + } +} diff --git a/rust/main/chains/hyperlane-ethereum/Cargo.toml b/rust/main/chains/hyperlane-ethereum/Cargo.toml index c096965c96..07e624e1e9 100644 --- a/rust/main/chains/hyperlane-ethereum/Cargo.toml +++ b/rust/main/chains/hyperlane-ethereum/Cargo.toml @@ -36,3 +36,7 @@ ethers-prometheus = { path = "../../ethers-prometheus", features = ["serde"] } [build-dependencies] abigen = { path = "../../utils/abigen", features = ["ethers"] } hyperlane-core = { path = "../../hyperlane-core", features = ["test-utils"] } + +[features] +default = [] +test-utils = [] \ No newline at end of file diff --git a/rust/main/chains/hyperlane-ethereum/src/signer/singleton.rs b/rust/main/chains/hyperlane-ethereum/src/signer/singleton.rs index 790c8e73a7..7925d1ec9c 100644 --- a/rust/main/chains/hyperlane-ethereum/src/signer/singleton.rs +++ b/rust/main/chains/hyperlane-ethereum/src/signer/singleton.rs @@ -38,6 +38,14 @@ pub struct SingletonSignerHandle { tx: mpsc::UnboundedSender, } +#[cfg(feature = "test-utils")] +impl SingletonSignerHandle { + /// Create a new handle for testing purposes + pub fn new(address: H160, tx: mpsc::UnboundedSender) -> Self { + Self { address, tx } + } +} + impl fmt::Debug for SingletonSignerHandle { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_tuple("SingletonSignerHandle") diff --git a/rust/main/hyperlane-base/src/traits/checkpoint_syncer.rs b/rust/main/hyperlane-base/src/traits/checkpoint_syncer.rs index 44828bcbea..0dc6a1e6b7 100644 --- a/rust/main/hyperlane-base/src/traits/checkpoint_syncer.rs +++ b/rust/main/hyperlane-base/src/traits/checkpoint_syncer.rs @@ -4,7 +4,7 @@ use async_trait::async_trait; use eyre::Result; use crate::AgentMetadata; -use hyperlane_core::{SignedAnnouncement, SignedCheckpointWithMessageId}; +use hyperlane_core::{ReorgEvent, SignedAnnouncement, SignedCheckpointWithMessageId}; /// A generic trait to read/write Checkpoints offchain #[async_trait] @@ -34,4 +34,10 @@ pub trait CheckpointSyncer: Debug + Send + Sync { async fn write_announcement(&self, signed_announcement: &SignedAnnouncement) -> Result<()>; /// Return the announcement storage location for this syncer fn announcement_location(&self) -> String; + /// If a bigger than expected reorg was detected on the validated chain, this flag can be set to inform + /// the validator agent to stop publishing checkpoints. Once any remediation is done, this flag can be reset + /// to resume operation. + async fn write_reorg_status(&self, reorg_event: &ReorgEvent) -> Result<()>; + /// Read the reorg status of the chain being validated + async fn reorg_status(&self) -> Result>; } diff --git a/rust/main/hyperlane-base/src/types/gcs_storage.rs b/rust/main/hyperlane-base/src/types/gcs_storage.rs index e54f653494..ba40ec2ca9 100644 --- a/rust/main/hyperlane-base/src/types/gcs_storage.rs +++ b/rust/main/hyperlane-base/src/types/gcs_storage.rs @@ -2,7 +2,7 @@ use crate::{AgentMetadata, CheckpointSyncer}; use async_trait::async_trait; use derive_new::new; use eyre::{bail, Result}; -use hyperlane_core::{SignedAnnouncement, SignedCheckpointWithMessageId}; +use hyperlane_core::{ReorgEvent, SignedAnnouncement, SignedCheckpointWithMessageId}; use std::fmt; use ya_gcp::{ storage::{ @@ -15,6 +15,7 @@ use ya_gcp::{ const LATEST_INDEX_KEY: &str = "gcsLatestIndexKey"; const METADATA_KEY: &str = "gcsMetadataKey"; const ANNOUNCEMENT_KEY: &str = "gcsAnnouncementKey"; +const REORG_FLAG_KEY: &str = "gcsReorgFlagKey"; /// Path to GCS users_secret file pub const GCS_USER_SECRET: &str = "GCS_USER_SECRET"; /// Path to GCS Service account key @@ -217,6 +218,18 @@ impl CheckpointSyncer for GcsStorageClient { fn announcement_location(&self) -> String { format!("gs://{}/{}", &self.bucket, ANNOUNCEMENT_KEY) } + + async fn write_reorg_status(&self, reorged_event: &ReorgEvent) -> Result<()> { + let serialized_metadata = serde_json::to_string_pretty(reorged_event)?; + self.inner + .insert_object(&self.bucket, REORG_FLAG_KEY, serialized_metadata) + .await?; + Ok(()) + } + + async fn reorg_status(&self) -> Result> { + Ok(None) + } } #[tokio::test] diff --git a/rust/main/hyperlane-base/src/types/local_storage.rs b/rust/main/hyperlane-base/src/types/local_storage.rs index 71047f2464..bb1ebf1239 100644 --- a/rust/main/hyperlane-base/src/types/local_storage.rs +++ b/rust/main/hyperlane-base/src/types/local_storage.rs @@ -4,7 +4,7 @@ use crate::traits::CheckpointSyncer; use crate::AgentMetadata; use async_trait::async_trait; use eyre::{Context, Result}; -use hyperlane_core::{SignedAnnouncement, SignedCheckpointWithMessageId}; +use hyperlane_core::{ReorgEvent, SignedAnnouncement, SignedCheckpointWithMessageId}; use prometheus::IntGauge; #[derive(Debug, Clone)] @@ -41,6 +41,10 @@ impl LocalStorage { self.path.join("announcement.json") } + fn reorg_flag_path(&self) -> PathBuf { + self.path.join("reorg_flag.json") + } + fn metadata_file_path(&self) -> PathBuf { self.path.join("metadata_latest.json") } @@ -116,4 +120,21 @@ impl CheckpointSyncer for LocalStorage { fn announcement_location(&self) -> String { format!("file://{}", self.path.to_str().unwrap()) } + + async fn write_reorg_status(&self, reorged_event: &ReorgEvent) -> Result<()> { + let serialized_reorg = serde_json::to_string_pretty(reorged_event)?; + let path = self.reorg_flag_path(); + tokio::fs::write(&path, &serialized_reorg) + .await + .with_context(|| format!("Writing reorg status to {path:?}"))?; + Ok(()) + } + + async fn reorg_status(&self) -> Result> { + let Ok(data) = tokio::fs::read(self.reorg_flag_path()).await else { + return Ok(None); + }; + let reorg = serde_json::from_slice(&data)?; + Ok(Some(reorg)) + } } diff --git a/rust/main/hyperlane-base/src/types/s3_storage.rs b/rust/main/hyperlane-base/src/types/s3_storage.rs index 4d03ff0406..ea04a1c4f9 100644 --- a/rust/main/hyperlane-base/src/types/s3_storage.rs +++ b/rust/main/hyperlane-base/src/types/s3_storage.rs @@ -4,7 +4,7 @@ use async_trait::async_trait; use derive_new::new; use eyre::{bail, Result}; use futures_util::TryStreamExt; -use hyperlane_core::{SignedAnnouncement, SignedCheckpointWithMessageId}; +use hyperlane_core::{ReorgEvent, SignedAnnouncement, SignedCheckpointWithMessageId}; use prometheus::IntGauge; use rusoto_core::{ credential::{Anonymous, AwsCredentials, StaticProvider}, @@ -145,6 +145,10 @@ impl S3Storage { fn announcement_key() -> String { "announcement.json".to_owned() } + + fn reorg_flag_key() -> String { + "reorg_flag.json".to_owned() + } } #[async_trait] @@ -216,4 +220,19 @@ impl CheckpointSyncer for S3Storage { } } } + + async fn write_reorg_status(&self, reorged_event: &ReorgEvent) -> Result<()> { + let serialized_reorg = serde_json::to_string(reorged_event)?; + self.write_to_bucket(S3Storage::reorg_flag_key(), &serialized_reorg) + .await?; + Ok(()) + } + + async fn reorg_status(&self) -> Result> { + self.anonymously_read_from_bucket(S3Storage::reorg_flag_key()) + .await? + .map(|data| serde_json::from_slice(&data)) + .transpose() + .map_err(Into::into) + } } diff --git a/rust/main/hyperlane-core/src/test_utils.rs b/rust/main/hyperlane-core/src/test_utils.rs index c24d704191..2425706805 100644 --- a/rust/main/hyperlane-core/src/test_utils.rs +++ b/rust/main/hyperlane-core/src/test_utils.rs @@ -3,7 +3,7 @@ use std::io::Read; use std::path::PathBuf; use crate::accumulator::merkle::Proof; -use crate::H256; +use crate::{HyperlaneDomain, H256}; /// Struct representing a single merkle test case #[derive(serde::Deserialize, serde::Serialize)] @@ -38,3 +38,15 @@ pub fn find_vector(final_component: &str) -> PathBuf { git_dir.join("vectors").join(final_component) } + +/// Create a dummy domain for testing purposes +pub fn dummy_domain(domain_id: u32, name: &str) -> HyperlaneDomain { + let test_domain = HyperlaneDomain::new_test_domain(name); + HyperlaneDomain::Unknown { + domain_id, + domain_name: name.to_owned(), + domain_type: test_domain.domain_type(), + domain_protocol: test_domain.domain_protocol(), + domain_technical_stack: test_domain.domain_technical_stack(), + } +} diff --git a/rust/main/hyperlane-core/src/types/mod.rs b/rust/main/hyperlane-core/src/types/mod.rs index 8987d9a861..f9d78b05a4 100644 --- a/rust/main/hyperlane-core/src/types/mod.rs +++ b/rust/main/hyperlane-core/src/types/mod.rs @@ -13,6 +13,7 @@ pub use indexing::*; pub use log_metadata::*; pub use merkle_tree::*; pub use message::*; +pub use reorg::*; pub use transaction::*; use crate::{Decode, Encode, HyperlaneProtocolError}; @@ -24,6 +25,7 @@ mod indexing; mod log_metadata; mod merkle_tree; mod message; +mod reorg; mod serialize; mod transaction; diff --git a/rust/main/hyperlane-core/src/types/reorg.rs b/rust/main/hyperlane-core/src/types/reorg.rs new file mode 100644 index 0000000000..00a8dedc12 --- /dev/null +++ b/rust/main/hyperlane-core/src/types/reorg.rs @@ -0,0 +1,20 @@ +use derive_new::new; +use serde::{Deserialize, Serialize}; + +use crate::H256; + +/// Details about a detected chain reorg, from an agent's perspective +#[derive(Debug, Clone, Serialize, Deserialize, new)] +pub struct ReorgEvent { + /// the merkle root built from this agent's indexed events + pub local_merkle_root: H256, + /// the onchain merkle root + pub canonical_merkle_root: H256, + /// the index of the checkpoint when the reorg was detected + /// (due to a mismatch between local and canonical merkle roots) + pub checkpoint_index: u32, + /// the timestamp when the reorg was detected, in seconds since the Unix epoch + pub unix_timestamp: u64, + /// the reorg period configured for the agent, in blocks + pub reorg_period: u64, +}