diff --git a/crates/sui-bridge/src/eth_syncer.rs b/crates/sui-bridge/src/eth_syncer.rs index 38e02e85eb8a0..e716f3746cabc 100644 --- a/crates/sui-bridge/src/eth_syncer.rs +++ b/crates/sui-bridge/src/eth_syncer.rs @@ -15,6 +15,8 @@ use std::sync::Arc; use tokio::sync::watch; use tokio::task::JoinHandle; use tokio::time::{self, Duration}; +use tokio_retry::strategy::{jitter, ExponentialBackoff}; +use tokio_retry::Retry; const ETH_EVENTS_CHANNEL_SIZE: usize = 1000; const FINALIZED_BLOCK_QUERY_INTERVAL: Duration = Duration::from_secs(2); @@ -138,29 +140,13 @@ where .send(events) .await .expect("All Eth event channel receivers are closed"); - tracing::info!( - contract_address=?contract_address, - "Observed {len} new events", - ); + tracing::info!(?contract_address, "Observed {len} new Eth events",); } start_block = new_finalized_block + 1; } } } -use tokio_retry::strategy::{jitter, ExponentialBackoff}; -use tokio_retry::Retry; - -#[macro_export] -macro_rules! retry_with_max_delay { - ($func:expr, $max_delay:expr) => {{ - let retry_strategy = ExponentialBackoff::from_millis(100) - .max_delay($max_delay) - .map(jitter); - Retry::spawn(retry_strategy, || $func).await - }}; -} - #[cfg(test)] mod tests { use std::{collections::HashSet, str::FromStr}; diff --git a/crates/sui-bridge/src/lib.rs b/crates/sui-bridge/src/lib.rs index 38f6323863a06..d0027134e7a97 100644 --- a/crates/sui-bridge/src/lib.rs +++ b/crates/sui-bridge/src/lib.rs @@ -9,9 +9,20 @@ pub mod events; pub mod handler; pub mod server; pub mod sui_client; +pub mod sui_syncer; #[cfg(test)] pub(crate) mod eth_mock_provider; #[cfg(test)] pub(crate) mod sui_mock_client; + +#[macro_export] +macro_rules! retry_with_max_delay { + ($func:expr, $max_delay:expr) => {{ + let retry_strategy = ExponentialBackoff::from_millis(100) + .max_delay($max_delay) + .map(jitter); + Retry::spawn(retry_strategy, || $func).await + }}; +} diff --git a/crates/sui-bridge/src/sui_client.rs b/crates/sui-bridge/src/sui_client.rs index 774b845effcab..f1224abf885c2 100644 --- a/crates/sui-bridge/src/sui_client.rs +++ b/crates/sui-bridge/src/sui_client.rs @@ -24,7 +24,7 @@ use tap::TapFallible; use crate::error::{BridgeError, BridgeResult}; use crate::events::SuiBridgeEvent; -pub(crate) struct SuiClient

{ +pub struct SuiClient

{ inner: P, } @@ -84,10 +84,7 @@ where let mut is_first_page = true; let mut all_events: Vec = vec![]; loop { - let events = self - .inner - .query_events(filter.clone(), cursor.clone()) - .await?; + let events = self.inner.query_events(filter.clone(), cursor).await?; if events.data.is_empty() { return Ok(Page { data: all_events, @@ -97,7 +94,7 @@ where } // unwrap safe: we just checked data is not empty - let new_cursor = events.data.last().unwrap().id.clone(); + let new_cursor = events.data.last().unwrap().id; // Now check if we need to query more events for the sake of // paginating in transaction granularity @@ -328,7 +325,7 @@ mod tests { let event_1 = SuiEvent::random_for_testing(); let events_page_1 = EventPage { data: vec![event_1.clone()], - next_cursor: Some(event_1.id.clone()), + next_cursor: Some(event_1.id), has_next_page: true, }; mock_client.add_event_response( @@ -346,10 +343,10 @@ mod tests { event_2.id.event_seq = event_1.id.event_seq + 1; let events_page_2 = EventPage { data: vec![event_2.clone()], - next_cursor: Some(event_2.id.clone()), + next_cursor: Some(event_2.id), has_next_page: true, }; - mock_client.add_event_response(package, module.clone(), event_1.id.clone(), events_page_2); + mock_client.add_event_response(package, module.clone(), event_1.id, events_page_2); // page 3 (event 3, event 4, different tx_digest) let mut event_3 = SuiEvent::random_for_testing(); event_3.id.tx_digest = event_2.id.tx_digest; @@ -358,10 +355,10 @@ mod tests { assert_ne!(event_3.id.tx_digest, event_4.id.tx_digest); let events_page_3 = EventPage { data: vec![event_3.clone(), event_4.clone()], - next_cursor: Some(event_4.id.clone()), + next_cursor: Some(event_4.id), has_next_page: true, }; - mock_client.add_event_response(package, module.clone(), event_2.id.clone(), events_page_3); + mock_client.add_event_response(package, module.clone(), event_2.id, events_page_3); let page: Page = sui_client .query_events_by_module(package, module.clone(), cursor) .await @@ -390,12 +387,12 @@ mod tests { // second page assert_eq!( mock_client.pop_front_past_event_query_params().unwrap(), - (package, module.clone(), event_1.id.clone()) + (package, module.clone(), event_1.id) ); // third page assert_eq!( mock_client.pop_front_past_event_query_params().unwrap(), - (package, module.clone(), event_2.id.clone()) + (package, module.clone(), event_2.id) ); // no more assert_eq!(mock_client.pop_front_past_event_query_params(), None); @@ -403,10 +400,10 @@ mod tests { // Case 4, modify page 3 in case 3 to return event_4 only let events_page_3 = EventPage { data: vec![event_4.clone()], - next_cursor: Some(event_4.id.clone()), + next_cursor: Some(event_4.id), has_next_page: true, }; - mock_client.add_event_response(package, module.clone(), event_2.id.clone(), events_page_3); + mock_client.add_event_response(package, module.clone(), event_2.id, events_page_3); let page: Page = sui_client .query_events_by_module(package, module.clone(), cursor) .await @@ -434,12 +431,12 @@ mod tests { // second page assert_eq!( mock_client.pop_front_past_event_query_params().unwrap(), - (package, module.clone(), event_1.id.clone()) + (package, module.clone(), event_1.id) ); // third page assert_eq!( mock_client.pop_front_past_event_query_params().unwrap(), - (package, module.clone(), event_2.id.clone()) + (package, module.clone(), event_2.id) ); // no more assert_eq!(mock_client.pop_front_past_event_query_params(), None); @@ -447,10 +444,10 @@ mod tests { // Case 5, modify page 2 in case 3 to mark has_next_page as false let events_page_2 = EventPage { data: vec![event_2.clone()], - next_cursor: Some(event_2.id.clone()), + next_cursor: Some(event_2.id), has_next_page: false, }; - mock_client.add_event_response(package, module.clone(), event_1.id.clone(), events_page_2); + mock_client.add_event_response(package, module.clone(), event_1.id, events_page_2); let page: Page = sui_client .query_events_by_module(package, module.clone(), cursor) .await @@ -478,7 +475,7 @@ mod tests { // second page assert_eq!( mock_client.pop_front_past_event_query_params().unwrap(), - (package, module.clone(), event_1.id.clone()) + (package, module.clone(), event_1.id) ); // no more assert_eq!(mock_client.pop_front_past_event_query_params(), None); diff --git a/crates/sui-bridge/src/sui_mock_client.rs b/crates/sui-bridge/src/sui_mock_client.rs index a3b6b726af197..48edd2705dff4 100644 --- a/crates/sui-bridge/src/sui_mock_client.rs +++ b/crates/sui-bridge/src/sui_mock_client.rs @@ -78,10 +78,10 @@ impl SuiClientInner for SuiMockClient { self.past_event_query_params.lock().unwrap().push_back(( package, module.clone(), - cursor.clone(), + cursor, )); Ok(events - .get(&(package, module.clone(), cursor.clone())) + .get(&(package, module.clone(), cursor)) .cloned() .unwrap_or_else(|| { panic!( diff --git a/crates/sui-bridge/src/sui_syncer.rs b/crates/sui-bridge/src/sui_syncer.rs index 2ab3a9f636a76..c49778170c986 100644 --- a/crates/sui-bridge/src/sui_syncer.rs +++ b/crates/sui-bridge/src/sui_syncer.rs @@ -1,14 +1,237 @@ // Copyright (c) Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 -//! The Suisyncer module is responsible for synchronizing Events emitted on Sui blockchain from -//! concerned bridge packages. Each package is associated with a cursor, and the syncer will -//! only query from that cursor onwards. It's likely that SuiSyncer only tracks one package. +//! The SuiSyncer module is responsible for synchronizing Events emitted on Sui blockchain from +//! concerned bridge packages. +use crate::{ + error::BridgeResult, + retry_with_max_delay, + sui_client::{SuiClient, SuiClientInner}, +}; +use mysten_metrics::spawn_logged_monitored_task; +use std::{collections::HashMap, sync::Arc}; +use sui_json_rpc_types::SuiEvent; +use sui_types::{ + base_types::ObjectID, digests::TransactionDigest, Identifier, SUI_SYSTEM_PACKAGE_ID, +}; +use tokio::{ + task::JoinHandle, + time::{self, Duration}, +}; +use tokio_retry::strategy::{jitter, ExponentialBackoff}; +use tokio_retry::Retry; +// TODO: use the right package id +const PACKAGE_ID: ObjectID = SUI_SYSTEM_PACKAGE_ID; +const SUI_EVENTS_CHANNEL_SIZE: usize = 1000; +/// Map from contract address to their start block. +pub type SuiTargetModules = HashMap; -pub struct EthSyncer

{ - eth_client: Arc, - contract_addresses: EthTargetAddresses, +pub struct SuiSyncer { + sui_client: Arc>, + // The last transaction that the syncer has fully processed. + // Syncer will resume post this transaction (i.e. exclusive), when it starts. + cursors: SuiTargetModules, +} + +impl SuiSyncer +where + C: SuiClientInner + 'static, +{ + pub fn new(sui_client: Arc>, cursors: SuiTargetModules) -> Self { + Self { + sui_client, + cursors, + } + } + + pub async fn run( + self, + query_interval: Duration, + ) -> BridgeResult<( + Vec>, + mysten_metrics::metered_channel::Receiver>, + )> { + let (events_tx, events_rx) = mysten_metrics::metered_channel::channel( + SUI_EVENTS_CHANNEL_SIZE, + &mysten_metrics::get_metrics() + .unwrap() + .channels + .with_label_values(&["sui_events_queue"]), + ); + + let mut task_handles = vec![]; + for (module, cursor) in self.cursors { + let events_rx_clone = events_tx.clone(); + let sui_client_clone = self.sui_client.clone(); + task_handles.push(spawn_logged_monitored_task!( + Self::run_event_listening_task( + module, + cursor, + events_rx_clone, + sui_client_clone, + query_interval + ) + )); + } + Ok((task_handles, events_rx)) + } + + async fn run_event_listening_task( + // The module where interested events are defined. + // Moudle is always of bridge package 0x9. + module: Identifier, + mut next_cursor: TransactionDigest, + events_sender: mysten_metrics::metered_channel::Sender>, + sui_client: Arc>, + query_interval: Duration, + ) { + tracing::info!( + ?module, + ?next_cursor, + "Starting sui events listening task from tx_digest {next_cursor}" + ); + let mut interval = time::interval(query_interval); + interval.set_missed_tick_behavior(time::MissedTickBehavior::Skip); + loop { + interval.tick().await; + let events = retry_with_max_delay!( + sui_client.query_events_by_module(PACKAGE_ID, module.clone(), next_cursor), + Duration::from_secs(10) + ) + .expect("Failed to query events from sui client after retry"); + + let len = events.data.len(); + if len != 0 { + events_sender + .send(events.data) + .await + .expect("All Sui event channel receivers are closed"); + // Unwrap: `query_events_by_module` always returns Some `next_cursor` + // If the events list is empty, `next_cursor` will be the same as `start_tx_digest` + next_cursor = events.next_cursor.unwrap(); + tracing::info!(?module, ?next_cursor, "Observed {len} new Sui events"); + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + use crate::{sui_client::SuiClient, sui_mock_client::SuiMockClient}; + use prometheus::Registry; + use sui_json_rpc_types::EventPage; + use sui_types::{digests::TransactionDigest, event::EventID, Identifier}; + use tokio::time::timeout; + + #[tokio::test] + async fn test_sui_syncer_basic() -> anyhow::Result<()> { + telemetry_subscribers::init_for_testing(); + let registry = Registry::new(); + mysten_metrics::init_metrics(®istry); + + let mock = SuiMockClient::default(); + let client = Arc::new(SuiClient::new_for_testing(mock.clone())); + let module_foo = Identifier::new("Foo").unwrap(); + let module_bar = Identifier::new("Bar").unwrap(); + let empty_events = EventPage::empty(); + let cursor = TransactionDigest::random(); + add_event_response(&mock, module_foo.clone(), cursor, empty_events.clone()); + add_event_response(&mock, module_bar.clone(), cursor, empty_events.clone()); + + let target_modules = HashMap::from_iter(vec![ + (module_foo.clone(), cursor), + (module_bar.clone(), cursor), + ]); + let interval = Duration::from_millis(200); + let (_handles, mut events_rx) = SuiSyncer::new(client, target_modules) + .run(interval) + .await + .unwrap(); + + // Initially there are no events + assert_no_more_events(interval, &mut events_rx).await; + + // Module Foo has new events + let event_1: SuiEvent = SuiEvent::random_for_testing(); + let module_foo_events_1: sui_json_rpc_types::Page = EventPage { + data: vec![event_1.clone(), event_1.clone()], + next_cursor: None, + has_next_page: false, + }; + add_event_response( + &mock, + module_foo.clone(), + event_1.id.tx_digest, + empty_events.clone(), + ); + add_event_response( + &mock, + module_foo.clone(), + cursor, + module_foo_events_1.clone(), + ); + + let received_events = events_rx.recv().await.unwrap(); + assert_eq!(received_events.len(), 2); + assert_eq!(received_events[0].id, event_1.id); + assert_eq!(received_events[1].id, event_1.id); + // No more + assert_no_more_events(interval, &mut events_rx).await; + + // Module Bar has new events + let event_2: SuiEvent = SuiEvent::random_for_testing(); + let module_bar_events_1 = EventPage { + data: vec![event_2.clone()], + next_cursor: None, + has_next_page: false, + }; + add_event_response( + &mock, + module_bar.clone(), + event_2.id.tx_digest, + empty_events.clone(), + ); + + add_event_response(&mock, module_bar.clone(), cursor, module_bar_events_1); + + let received_events = events_rx.recv().await.unwrap(); + assert_eq!(received_events.len(), 1); + assert_eq!(received_events[0].id, event_2.id); + // No more + assert_no_more_events(interval, &mut events_rx).await; + + Ok(()) + } + + async fn assert_no_more_events( + interval: Duration, + events_rx: &mut mysten_metrics::metered_channel::Receiver>, + ) { + match timeout(interval * 2, events_rx.recv()).await { + Err(_e) => (), + other => panic!("Should have timed out, but got: {:?}", other), + }; + } + + fn add_event_response( + mock: &SuiMockClient, + module: Identifier, + cursor: TransactionDigest, + events: EventPage, + ) { + mock.add_event_response( + PACKAGE_ID, + module.clone(), + EventID { + tx_digest: cursor, + event_seq: u16::MAX as u64, + }, + events.clone(), + ); + } } diff --git a/crates/sui-indexer/src/apis/indexer_api_v2.rs b/crates/sui-indexer/src/apis/indexer_api_v2.rs index 01138e8a87302..cacc65b1e1716 100644 --- a/crates/sui-indexer/src/apis/indexer_api_v2.rs +++ b/crates/sui-indexer/src/apis/indexer_api_v2.rs @@ -190,7 +190,7 @@ impl IndexerApiServer for IndexerApiV2 { let has_next_page = results.len() > limit; results.truncate(limit); - let next_cursor = results.last().map(|o| o.id.clone()); + let next_cursor = results.last().map(|o| o.id); Ok(Page { data: results, next_cursor, diff --git a/crates/sui-indexer/src/indexer_reader.rs b/crates/sui-indexer/src/indexer_reader.rs index 173d311a92a8a..9e753f1c1ef75 100644 --- a/crates/sui-indexer/src/indexer_reader.rs +++ b/crates/sui-indexer/src/indexer_reader.rs @@ -1052,7 +1052,7 @@ impl IndexerReader { limit: usize, descending_order: bool, ) -> IndexerResult> { - let (tx_seq, event_seq) = if let Some(cursor) = cursor.clone() { + let (tx_seq, event_seq) = if let Some(cursor) = cursor { let EventID { tx_digest, event_seq, diff --git a/crates/sui-indexer/src/store/pg_indexer_store.rs b/crates/sui-indexer/src/store/pg_indexer_store.rs index 36ff4d8192840..ae8f1ddc15552 100644 --- a/crates/sui-indexer/src/store/pg_indexer_store.rs +++ b/crates/sui-indexer/src/store/pg_indexer_store.rs @@ -463,7 +463,7 @@ impl PgIndexerStore { page_limit -= 1; let has_next_page = sui_event_vec.len() > page_limit; sui_event_vec.truncate(page_limit); - let next_cursor = sui_event_vec.last().map(|e| e.id.clone()); + let next_cursor = sui_event_vec.last().map(|e| e.id); Ok(EventPage { data: sui_event_vec, next_cursor, diff --git a/crates/sui-json-rpc/src/indexer_api.rs b/crates/sui-json-rpc/src/indexer_api.rs index 655ef7a85ea5e..9a600025b03d6 100644 --- a/crates/sui-json-rpc/src/indexer_api.rs +++ b/crates/sui-json-rpc/src/indexer_api.rs @@ -257,7 +257,7 @@ impl IndexerApiServer for IndexerApi { .query_events( &self.transaction_kv_store, query, - cursor.clone(), + cursor, limit + 1, descending, ) @@ -265,7 +265,7 @@ impl IndexerApiServer for IndexerApi { .map_err(Error::from)?; let has_next_page = data.len() > limit; data.truncate(limit); - let next_cursor = data.last().map_or(cursor, |e| Some(e.id.clone())); + let next_cursor = data.last().map_or(cursor, |e| Some(e.id)); self.metrics .query_events_result_size .report(data.len() as u64); diff --git a/crates/sui-types/src/event.rs b/crates/sui-types/src/event.rs index 35af980a9ba03..b505442d55bc5 100755 --- a/crates/sui-types/src/event.rs +++ b/crates/sui-types/src/event.rs @@ -41,7 +41,7 @@ pub struct EventEnvelope { /// Unique ID of a Sui Event, the ID is a combination of tx seq number and event seq number, /// the ID is local to this particular fullnode and will be different from other fullnode. #[serde_as] -#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema, Hash)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, JsonSchema, Hash)] #[serde(rename_all = "camelCase")] pub struct EventID { pub tx_digest: TransactionDigest,