Skip to content

Commit

Permalink
Allow protocol-specific indexing (#3544)
Browse files Browse the repository at this point in the history
### Description

- Generalizes contract syncer hashmaps to store trait objects instead of
concrete structs, since both `SequencedDataContractSync` and
`WatermarkContractSync` implement `ContractSyncer`
- The nice thing this enables is being able to instantiate a different
type of syncer for the same contract, for different chain protocols
(e.g. for IGP, we can use a sequenced syncer on sealevel and cosmwasm,
and a watermark syncer on EVM).
- The problem is that `ContractSyncer` is implemented on different trait
bounds on `SequencedDataContractSync` and `WatermarkContractSync`. Most
notably, the `SequencedDataContractSync` impl bounds the indexed type by
the `Sequenced` trait. This means that if a type is to be indexed by
more than one syncer strategy, it needs to implement the union of bounds
of those strategies
- **Blocker:** In the IGP, this means that `InterchainGasPayment` needs
to implement `Sequenced` and we have no obvious way of getting that just
from the type. Even if the sequence is derived from the type, it has to
be a sensible value, since indexing reliability depends on it.
- Similar to the point above, we now have `contract_syncs`, a single
function that instantiates contract syncs for all chains. Since
`contract_syncs` must be able to build a `ContractSyncer` trait obj for
all syncer types, it bounds both `T` (the indexed type) and `D` (the log
store type) with a union of bounds, so it may be too strict for some
types in the future.
- The scraper now uses forward-backward indexing instead of just forward
indexing, because that simplified the refactor - there's now a
convenience trait called `TryFromWithMetrics` that allows adding this
bound: `SequenceIndexer<T>: TryFromWithMetrics<ChainConf>,`. This isn't
really extendable if we use non-sequence indexers, but it's fine for now
since this is used everywhere
- replaces the macro-based contract syncer builder with generic fns

Still todo:
- add new `IndexingDecorator<T>` struct that can wrap
`InterchainGasPayment`, so we can use sequence-aware indexing for the
IGP
- remove sequence logic from the watermark cursor

### Drive-by changes

<!--
Are there any minor or drive-by changes also included?
-->

### Related issues

<!--
- Fixes #[issue number here]
-->

### Backward compatibility

<!--
Are these changes backward compatible? Are there any infrastructure
implications, e.g. changes that would prohibit deploying older commits
using this infra tooling?

Yes/No
-->

### Testing

<!--
What kind of testing have these changes undergone?

None/Manual/Unit Tests
-->
  • Loading branch information
daniel-savu authored Apr 17, 2024
1 parent aea79c6 commit f4626eb
Show file tree
Hide file tree
Showing 13 changed files with 803 additions and 589 deletions.
49 changes: 27 additions & 22 deletions rust/agents/relayer/src/relayer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,7 @@ use hyperlane_base::{
db::{HyperlaneRocksDB, DB},
metrics::{AgentMetrics, MetricsUpdater},
settings::ChainConf,
BaseAgent, ChainMetrics, ContractSyncMetrics, CoreMetrics, HyperlaneAgentCore,
SequencedDataContractSync, WatermarkContractSync,
BaseAgent, ChainMetrics, ContractSyncMetrics, ContractSyncer, CoreMetrics, HyperlaneAgentCore,
};
use hyperlane_core::{
HyperlaneDomain, HyperlaneMessage, InterchainGasPayment, MerkleTreeInsertion, MpmcChannel,
Expand Down Expand Up @@ -60,15 +59,14 @@ pub struct Relayer {
destination_chains: HashMap<HyperlaneDomain, ChainConf>,
#[as_ref]
core: HyperlaneAgentCore,
message_syncs: HashMap<HyperlaneDomain, Arc<SequencedDataContractSync<HyperlaneMessage>>>,
message_syncs: HashMap<HyperlaneDomain, Arc<dyn ContractSyncer<HyperlaneMessage>>>,
interchain_gas_payment_syncs:
HashMap<HyperlaneDomain, Arc<WatermarkContractSync<InterchainGasPayment>>>,
HashMap<HyperlaneDomain, Arc<dyn ContractSyncer<InterchainGasPayment>>>,
/// Context data for each (origin, destination) chain pair a message can be
/// sent between
msg_ctxs: HashMap<ContextKey, Arc<MessageContext>>,
prover_syncs: HashMap<HyperlaneDomain, Arc<RwLock<MerkleTreeBuilder>>>,
merkle_tree_hook_syncs:
HashMap<HyperlaneDomain, Arc<SequencedDataContractSync<MerkleTreeInsertion>>>,
merkle_tree_hook_syncs: HashMap<HyperlaneDomain, Arc<dyn ContractSyncer<MerkleTreeInsertion>>>,
dbs: HashMap<HyperlaneDomain, HyperlaneRocksDB>,
whitelist: Arc<MatchingList>,
blacklist: Arc<MatchingList>,
Expand Down Expand Up @@ -133,35 +131,46 @@ impl BaseAgent for Relayer {
let contract_sync_metrics = Arc::new(ContractSyncMetrics::new(&core_metrics));

let message_syncs = settings
.build_message_indexers(
.contract_syncs::<HyperlaneMessage, _>(
settings.origin_chains.iter(),
&core_metrics,
&contract_sync_metrics,
dbs.iter()
.map(|(d, db)| (d.clone(), Arc::new(db.clone()) as _))
.map(|(d, db)| (d.clone(), Arc::new(db.clone())))
.collect(),
)
.await?;
.await?
.into_iter()
.map(|(k, v)| (k, v as _))
.collect();

let interchain_gas_payment_syncs = settings
.build_interchain_gas_payment_indexers(
.contract_syncs::<InterchainGasPayment, _>(
settings.origin_chains.iter(),
&core_metrics,
&contract_sync_metrics,
dbs.iter()
.map(|(d, db)| (d.clone(), Arc::new(db.clone()) as _))
.map(|(d, db)| (d.clone(), Arc::new(db.clone())))
.collect(),
)
.await?;
.await?
.into_iter()
.map(|(k, v)| (k, v as _))
.collect();

let merkle_tree_hook_syncs = settings
.build_merkle_tree_hook_indexers(
.contract_syncs::<MerkleTreeInsertion, _>(
settings.origin_chains.iter(),
&core_metrics,
&contract_sync_metrics,
dbs.iter()
.map(|(d, db)| (d.clone(), Arc::new(db.clone()) as _))
.map(|(d, db)| (d.clone(), Arc::new(db.clone())))
.collect(),
)
.await?;
.await?
.into_iter()
.map(|(k, v)| (k, v as _))
.collect();

let whitelist = Arc::new(settings.whitelist);
let blacklist = Arc::new(settings.blacklist);
Expand Down Expand Up @@ -341,9 +350,7 @@ impl Relayer {
async fn run_message_sync(&self, origin: &HyperlaneDomain) -> Instrumented<JoinHandle<()>> {
let index_settings = self.as_ref().settings.chains[origin.name()].index_settings();
let contract_sync = self.message_syncs.get(origin).unwrap().clone();
let cursor = contract_sync
.forward_backward_message_sync_cursor(index_settings)
.await;
let cursor = contract_sync.cursor(index_settings).await;
tokio::spawn(async move {
contract_sync
.clone()
Expand All @@ -363,7 +370,7 @@ impl Relayer {
.get(origin)
.unwrap()
.clone();
let cursor = contract_sync.rate_limited_cursor(index_settings).await;
let cursor = contract_sync.cursor(index_settings).await;
tokio::spawn(async move { contract_sync.clone().sync("gas_payments", cursor).await })
.instrument(info_span!("ContractSync"))
}
Expand All @@ -374,9 +381,7 @@ impl Relayer {
) -> Instrumented<JoinHandle<()>> {
let index_settings = self.as_ref().settings.chains[origin.name()].index.clone();
let contract_sync = self.merkle_tree_hook_syncs.get(origin).unwrap().clone();
let cursor = contract_sync
.forward_backward_message_sync_cursor(index_settings)
.await;
let cursor = contract_sync.cursor(index_settings).await;
tokio::spawn(async move { contract_sync.clone().sync("merkle_tree_hook", cursor).await })
.instrument(info_span!("ContractSync"))
}
Expand Down
117 changes: 56 additions & 61 deletions rust/agents/scraper/src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ use derive_more::AsRef;
use futures::future::try_join_all;
use hyperlane_base::{
metrics::AgentMetrics, settings::IndexSettings, BaseAgent, ChainMetrics, ContractSyncMetrics,
CoreMetrics, HyperlaneAgentCore, MetricsUpdater,
ContractSyncer, CoreMetrics, HyperlaneAgentCore, MetricsUpdater,
};
use hyperlane_core::HyperlaneDomain;
use hyperlane_core::{Delivery, HyperlaneDomain, HyperlaneMessage, InterchainGasPayment};
use tokio::task::JoinHandle;
use tracing::{info_span, instrument::Instrumented, trace, Instrument};

Expand Down Expand Up @@ -173,41 +173,6 @@ impl Scraper {
}
}

/// Create a function to spawn task that syncs contract events
macro_rules! spawn_sync_task {
($name:ident, $cursor: ident, $label:literal) => {
async fn $name(
&self,
domain: HyperlaneDomain,
metrics: Arc<CoreMetrics>,
contract_sync_metrics: Arc<ContractSyncMetrics>,
db: HyperlaneSqlDb,
index_settings: IndexSettings,
) -> Instrumented<JoinHandle<()>> {
let sync = self
.as_ref()
.settings
.$name(
&domain,
&metrics.clone(),
&contract_sync_metrics.clone(),
Arc::new(db.clone()),
)
.await
.unwrap();
let cursor = sync
.$cursor(index_settings.clone())
.await;
tokio::spawn(async move {
sync
.sync($label, cursor)
.await
})
.instrument(info_span!("ChainContractSync", chain=%domain.name(), event=$label))
}
}
}

impl Scraper {
async fn build_message_indexer(
&self,
Expand All @@ -220,39 +185,69 @@ impl Scraper {
let sync = self
.as_ref()
.settings
.build_message_indexer(
.sequenced_contract_sync::<HyperlaneMessage, _>(
&domain,
&metrics.clone(),
&contract_sync_metrics.clone(),
Arc::new(db.clone()),
db.into(),
)
.await
.unwrap();
let latest_nonce = self
.scrapers
.get(&domain.id())
.unwrap()
.db
.last_message_nonce()
.await
.unwrap_or(None)
.unwrap_or(0);
let cursor = sync
.forward_message_sync_cursor(index_settings.clone(), latest_nonce.saturating_sub(1))
.await;
let cursor = sync.cursor(index_settings.clone()).await;
tokio::spawn(async move { sync.sync("message_dispatch", cursor).await }).instrument(
info_span!("ChainContractSync", chain=%domain.name(), event="message_dispatch"),
)
}

spawn_sync_task!(
build_delivery_indexer,
rate_limited_cursor,
"message_delivery"
);
spawn_sync_task!(
build_interchain_gas_payment_indexer,
rate_limited_cursor,
"gas_payment"
);
async fn build_delivery_indexer(
&self,
domain: HyperlaneDomain,
metrics: Arc<CoreMetrics>,
contract_sync_metrics: Arc<ContractSyncMetrics>,
db: HyperlaneSqlDb,
index_settings: IndexSettings,
) -> Instrumented<JoinHandle<()>> {
let sync = self
.as_ref()
.settings
.watermark_contract_sync::<Delivery, _>(
&domain,
&metrics.clone(),
&contract_sync_metrics.clone(),
Arc::new(db.clone()) as _,
)
.await
.unwrap();

let label = "message_delivery";
let cursor = sync.cursor(index_settings.clone()).await;
tokio::spawn(async move { sync.sync(label, cursor).await })
.instrument(info_span!("ChainContractSync", chain=%domain.name(), event=label))
}

async fn build_interchain_gas_payment_indexer(
&self,
domain: HyperlaneDomain,
metrics: Arc<CoreMetrics>,
contract_sync_metrics: Arc<ContractSyncMetrics>,
db: HyperlaneSqlDb,
index_settings: IndexSettings,
) -> Instrumented<JoinHandle<()>> {
let sync = self
.as_ref()
.settings
.watermark_contract_sync::<InterchainGasPayment, _>(
&domain,
&metrics.clone(),
&contract_sync_metrics.clone(),
Arc::new(db.clone()),
)
.await
.unwrap();

let label = "gas_payment";
let cursor = sync.cursor(index_settings.clone()).await;
tokio::spawn(async move { sync.sync(label, cursor).await })
.instrument(info_span!("ChainContractSync", chain=%domain.name(), event=label))
}
}
13 changes: 5 additions & 8 deletions rust/agents/validator/src/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use hyperlane_base::{
db::{HyperlaneRocksDB, DB},
metrics::AgentMetrics,
settings::ChainConf,
BaseAgent, ChainMetrics, CheckpointSyncer, ContractSyncMetrics, CoreMetrics,
BaseAgent, ChainMetrics, CheckpointSyncer, ContractSyncMetrics, ContractSyncer, CoreMetrics,
HyperlaneAgentCore, MetricsUpdater, SequencedDataContractSync,
};

Expand Down Expand Up @@ -97,14 +97,13 @@ impl BaseAgent for Validator {
let contract_sync_metrics = Arc::new(ContractSyncMetrics::new(&metrics));

let merkle_tree_hook_sync = settings
.build_merkle_tree_hook_indexer(
.sequenced_contract_sync::<MerkleTreeInsertion, _>(
&settings.origin_chain,
&metrics,
&contract_sync_metrics,
Arc::new(msg_db.clone()),
msg_db.clone().into(),
)
.await?
.into();
.await?;

Ok(Self {
origin_chain: settings.origin_chain,
Expand Down Expand Up @@ -208,9 +207,7 @@ impl Validator {
let index_settings =
self.as_ref().settings.chains[self.origin_chain.name()].index_settings();
let contract_sync = self.merkle_tree_hook_sync.clone();
let cursor = contract_sync
.forward_backward_message_sync_cursor(index_settings)
.await;
let cursor = contract_sync.cursor(index_settings).await;
tokio::spawn(async move {
contract_sync.clone().sync("merkle_tree_hook", cursor).await;
})
Expand Down
Loading

0 comments on commit f4626eb

Please sign in to comment.