Skip to content

Commit

Permalink
[bridge 4/n] SuiSyncer (#15201)
Browse files Browse the repository at this point in the history
## Description 

Implements `SuiSyncer` that periodically pulls from Sui to fetch Bridge
related events.

## Test Plan 

How did you test the new or updated feature?

---
If your changes are not user-facing and not a breaking change, you can
skip the following section. Otherwise, please indicate what changed, and
then add to the Release Notes section as highlighted during the release
process.

### Type of Change (Check all that apply)

- [ ] protocol change
- [ ] user-visible impact
- [ ] breaking change for a client SDKs
- [ ] breaking change for FNs (FN binary must upgrade)
- [ ] breaking change for validators or node operators (must upgrade
binaries)
- [ ] breaking change for on-chain data layout
- [ ] necessitate either a data wipe or data migration

### Release notes
Implements `SuiSyncer` that periodically pulls from Sui to fetch Bridge
related events.
  • Loading branch information
longbowlu authored and gdanezis committed Dec 15, 2023
1 parent aaaa4c4 commit fd1eaf9
Show file tree
Hide file tree
Showing 10 changed files with 274 additions and 51 deletions.
25 changes: 8 additions & 17 deletions crates/sui-bridge/src/eth_syncer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ use std::sync::Arc;
use tokio::sync::watch;
use tokio::task::JoinHandle;
use tokio::time::{self, Duration};
use tokio_retry::strategy::{jitter, ExponentialBackoff};
use tokio_retry::Retry;

const ETH_EVENTS_CHANNEL_SIZE: usize = 1000;
const FINALIZED_BLOCK_QUERY_INTERVAL: Duration = Duration::from_secs(2);
Expand Down Expand Up @@ -88,12 +90,16 @@ where
interval.set_missed_tick_behavior(time::MissedTickBehavior::Skip);
loop {
interval.tick().await;
// TODO reconsider panic, should we just log an error and continue the loop?
let new_value = retry_with_max_delay!(
eth_client.get_last_finalized_block_id(),
Duration::from_secs(600)
)
.expect("Failed to get last finalzied block from eth client after retry");
tracing::debug!("Last finalized block: {}", new_value);

// TODO add a metrics for the last finalized block

if new_value > last_block_number {
last_finalized_block_sender
.send(new_value)
Expand Down Expand Up @@ -127,6 +133,7 @@ where
);
continue;
}
// TODO reconsider panic, should we just log an error and continue the loop?
let events = retry_with_max_delay!(
eth_client.get_events_in_range(contract_address, start_block, new_finalized_block),
Duration::from_secs(600)
Expand All @@ -138,29 +145,13 @@ where
.send(events)
.await
.expect("All Eth event channel receivers are closed");
tracing::info!(
contract_address=?contract_address,
"Observed {len} new events",
);
tracing::info!(?contract_address, "Observed {len} new Eth events",);
}
start_block = new_finalized_block + 1;
}
}
}

use tokio_retry::strategy::{jitter, ExponentialBackoff};
use tokio_retry::Retry;

#[macro_export]
macro_rules! retry_with_max_delay {
($func:expr, $max_delay:expr) => {{
let retry_strategy = ExponentialBackoff::from_millis(100)
.max_delay($max_delay)
.map(jitter);
Retry::spawn(retry_strategy, || $func).await
}};
}

#[cfg(test)]
mod tests {
use std::{collections::HashSet, str::FromStr};
Expand Down
11 changes: 11 additions & 0 deletions crates/sui-bridge/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,20 @@ pub mod events;
pub mod handler;
pub mod server;
pub mod sui_client;
pub mod sui_syncer;

#[cfg(test)]
pub(crate) mod eth_mock_provider;

#[cfg(test)]
pub(crate) mod sui_mock_client;

#[macro_export]
macro_rules! retry_with_max_delay {
($func:expr, $max_delay:expr) => {{
let retry_strategy = ExponentialBackoff::from_millis(100)
.max_delay($max_delay)
.map(jitter);
Retry::spawn(retry_strategy, || $func).await
}};
}
37 changes: 17 additions & 20 deletions crates/sui-bridge/src/sui_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use tap::TapFallible;
use crate::error::{BridgeError, BridgeResult};
use crate::events::SuiBridgeEvent;

pub(crate) struct SuiClient<P> {
pub struct SuiClient<P> {
inner: P,
}

Expand Down Expand Up @@ -84,10 +84,7 @@ where
let mut is_first_page = true;
let mut all_events: Vec<sui_json_rpc_types::SuiEvent> = vec![];
loop {
let events = self
.inner
.query_events(filter.clone(), cursor.clone())
.await?;
let events = self.inner.query_events(filter.clone(), cursor).await?;
if events.data.is_empty() {
return Ok(Page {
data: all_events,
Expand All @@ -97,7 +94,7 @@ where
}

// unwrap safe: we just checked data is not empty
let new_cursor = events.data.last().unwrap().id.clone();
let new_cursor = events.data.last().unwrap().id;

// Now check if we need to query more events for the sake of
// paginating in transaction granularity
Expand Down Expand Up @@ -328,7 +325,7 @@ mod tests {
let event_1 = SuiEvent::random_for_testing();
let events_page_1 = EventPage {
data: vec![event_1.clone()],
next_cursor: Some(event_1.id.clone()),
next_cursor: Some(event_1.id),
has_next_page: true,
};
mock_client.add_event_response(
Expand All @@ -346,10 +343,10 @@ mod tests {
event_2.id.event_seq = event_1.id.event_seq + 1;
let events_page_2 = EventPage {
data: vec![event_2.clone()],
next_cursor: Some(event_2.id.clone()),
next_cursor: Some(event_2.id),
has_next_page: true,
};
mock_client.add_event_response(package, module.clone(), event_1.id.clone(), events_page_2);
mock_client.add_event_response(package, module.clone(), event_1.id, events_page_2);
// page 3 (event 3, event 4, different tx_digest)
let mut event_3 = SuiEvent::random_for_testing();
event_3.id.tx_digest = event_2.id.tx_digest;
Expand All @@ -358,10 +355,10 @@ mod tests {
assert_ne!(event_3.id.tx_digest, event_4.id.tx_digest);
let events_page_3 = EventPage {
data: vec![event_3.clone(), event_4.clone()],
next_cursor: Some(event_4.id.clone()),
next_cursor: Some(event_4.id),
has_next_page: true,
};
mock_client.add_event_response(package, module.clone(), event_2.id.clone(), events_page_3);
mock_client.add_event_response(package, module.clone(), event_2.id, events_page_3);
let page: Page<SuiEvent, TransactionDigest> = sui_client
.query_events_by_module(package, module.clone(), cursor)
.await
Expand Down Expand Up @@ -390,23 +387,23 @@ mod tests {
// second page
assert_eq!(
mock_client.pop_front_past_event_query_params().unwrap(),
(package, module.clone(), event_1.id.clone())
(package, module.clone(), event_1.id)
);
// third page
assert_eq!(
mock_client.pop_front_past_event_query_params().unwrap(),
(package, module.clone(), event_2.id.clone())
(package, module.clone(), event_2.id)
);
// no more
assert_eq!(mock_client.pop_front_past_event_query_params(), None);

// Case 4, modify page 3 in case 3 to return event_4 only
let events_page_3 = EventPage {
data: vec![event_4.clone()],
next_cursor: Some(event_4.id.clone()),
next_cursor: Some(event_4.id),
has_next_page: true,
};
mock_client.add_event_response(package, module.clone(), event_2.id.clone(), events_page_3);
mock_client.add_event_response(package, module.clone(), event_2.id, events_page_3);
let page: Page<SuiEvent, TransactionDigest> = sui_client
.query_events_by_module(package, module.clone(), cursor)
.await
Expand Down Expand Up @@ -434,23 +431,23 @@ mod tests {
// second page
assert_eq!(
mock_client.pop_front_past_event_query_params().unwrap(),
(package, module.clone(), event_1.id.clone())
(package, module.clone(), event_1.id)
);
// third page
assert_eq!(
mock_client.pop_front_past_event_query_params().unwrap(),
(package, module.clone(), event_2.id.clone())
(package, module.clone(), event_2.id)
);
// no more
assert_eq!(mock_client.pop_front_past_event_query_params(), None);

// Case 5, modify page 2 in case 3 to mark has_next_page as false
let events_page_2 = EventPage {
data: vec![event_2.clone()],
next_cursor: Some(event_2.id.clone()),
next_cursor: Some(event_2.id),
has_next_page: false,
};
mock_client.add_event_response(package, module.clone(), event_1.id.clone(), events_page_2);
mock_client.add_event_response(package, module.clone(), event_1.id, events_page_2);
let page: Page<SuiEvent, TransactionDigest> = sui_client
.query_events_by_module(package, module.clone(), cursor)
.await
Expand Down Expand Up @@ -478,7 +475,7 @@ mod tests {
// second page
assert_eq!(
mock_client.pop_front_past_event_query_params().unwrap(),
(package, module.clone(), event_1.id.clone())
(package, module.clone(), event_1.id)
);
// no more
assert_eq!(mock_client.pop_front_past_event_query_params(), None);
Expand Down
4 changes: 2 additions & 2 deletions crates/sui-bridge/src/sui_mock_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,10 @@ impl SuiClientInner for SuiMockClient {
self.past_event_query_params.lock().unwrap().push_back((
package,
module.clone(),
cursor.clone(),
cursor,
));
Ok(events
.get(&(package, module.clone(), cursor.clone()))
.get(&(package, module.clone(), cursor))
.cloned()
.unwrap_or_else(|| {
panic!(
Expand Down
Loading

0 comments on commit fd1eaf9

Please sign in to comment.