Skip to content

Commit

Permalink
chore: use HyperlaneDb trait in validator submitter
Browse files Browse the repository at this point in the history
  • Loading branch information
daniel-savu committed Oct 2, 2024
1 parent 9cffef5 commit b0a6f1a
Show file tree
Hide file tree
Showing 11 changed files with 521 additions and 172 deletions.
2 changes: 1 addition & 1 deletion rust/main/agents/relayer/src/merkle_tree/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::{
use async_trait::async_trait;
use derive_new::new;
use eyre::Result;
use hyperlane_base::db::{HyperlaneDB, HyperlaneRocksDB};
use hyperlane_base::db::{HyperlaneDb, HyperlaneRocksDB};
use hyperlane_core::{HyperlaneDomain, MerkleTreeInsertion};
use prometheus::IntGauge;
use tokio::sync::RwLock;
Expand Down
2 changes: 1 addition & 1 deletion rust/main/agents/relayer/src/msg/metadata/base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use crate::{
use async_trait::async_trait;
use derive_new::new;
use eyre::{Context, Result};
use hyperlane_base::db::{HyperlaneDB, HyperlaneRocksDB};
use hyperlane_base::db::{HyperlaneDb, HyperlaneRocksDB};
use hyperlane_base::{
settings::{ChainConf, CheckpointSyncerConf},
CheckpointSyncer, CoreMetrics, MultisigCheckpointSyncer,
Expand Down
2 changes: 1 addition & 1 deletion rust/main/agents/relayer/src/msg/pending_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use async_trait::async_trait;
use derive_new::new;
use eyre::Result;
use hyperlane_base::{
db::{HyperlaneDB, HyperlaneRocksDB},
db::{HyperlaneDb, HyperlaneRocksDB},
CoreMetrics,
};
use hyperlane_core::{
Expand Down
163 changes: 156 additions & 7 deletions rust/main/agents/relayer/src/msg/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use derive_new::new;
use ethers::utils::hex;
use eyre::Result;
use hyperlane_base::{
db::{HyperlaneDB, HyperlaneRocksDB},
db::{HyperlaneDb, HyperlaneRocksDB},
CoreMetrics,
};
use hyperlane_core::{HyperlaneDomain, HyperlaneMessage, QueueOperation};
Expand Down Expand Up @@ -52,7 +52,7 @@ struct ForwardBackwardIterator {

impl ForwardBackwardIterator {
#[instrument(skip(db), ret)]
fn new(db: Arc<dyn HyperlaneDB>) -> Self {
fn new(db: Arc<dyn HyperlaneDb>) -> Self {
let high_nonce = db.retrieve_highest_seen_message_nonce().ok().flatten();
let domain = db.domain().name().to_owned();
let high_nonce_iter = DirectionalNonceIterator::new(
Expand Down Expand Up @@ -125,7 +125,7 @@ enum NonceDirection {
struct DirectionalNonceIterator {
nonce: Option<u32>,
direction: NonceDirection,
db: Arc<dyn HyperlaneDB>,
db: Arc<dyn HyperlaneDb>,
domain_name: String,
}

Expand Down Expand Up @@ -329,7 +329,7 @@ impl MessageProcessor {
send_channels,
destination_ctxs,
metric_app_contexts,
nonce_iterator: ForwardBackwardIterator::new(Arc::new(db) as Arc<dyn HyperlaneDB>),
nonce_iterator: ForwardBackwardIterator::new(Arc::new(db) as Arc<dyn HyperlaneDb>),
}
}

Expand Down Expand Up @@ -397,9 +397,16 @@ mod test {

use super::*;
use hyperlane_base::{
db::{test_utils, DbResult, HyperlaneRocksDB},
db::{
test_utils, DbResult, HyperlaneRocksDB, InterchainGasExpenditureData,
InterchainGasPaymentData,
},
settings::{ChainConf, ChainConnectionConf, Settings},
};
use hyperlane_core::{
GasPaymentKey, InterchainGasPayment, InterchainGasPaymentMeta, MerkleTreeInsertion,
PendingOperationStatus, H256,
};
use hyperlane_test::mocks::{MockMailboxContract, MockValidatorAnnounceContract};
use prometheus::{IntCounter, Registry};
use tokio::{
Expand Down Expand Up @@ -594,11 +601,153 @@ mod test {
fn fmt<'a>(&self, f: &mut std::fmt::Formatter<'a>) -> std::fmt::Result;
}

impl HyperlaneDB for Db {
impl HyperlaneDb for Db {
/// Retrieve the nonce of the highest processed message we're aware of
fn retrieve_highest_seen_message_nonce(&self) -> DbResult<Option<u32>>;

/// Retrieve a message by its nonce
fn retrieve_message_by_nonce(&self, nonce: u32) -> DbResult<Option<HyperlaneMessage>>;
fn retrieve_processed_by_nonce(&self, nonce: u32) -> DbResult<Option<bool>>;

/// Retrieve whether a message has been processed
fn retrieve_processed_by_nonce(&self, nonce: &u32) -> DbResult<Option<bool>>;

/// Get the origin domain of the database
fn domain(&self) -> &HyperlaneDomain;

fn store_message_id_by_nonce(&self, nonce: &u32, id: &H256) -> DbResult<()>;

fn retrieve_message_id_by_nonce(&self, nonce: &u32) -> DbResult<Option<H256>>;

fn store_message_by_id(&self, id: &H256, message: &HyperlaneMessage) -> DbResult<()>;

fn retrieve_message_by_id(&self, id: &H256) -> DbResult<Option<HyperlaneMessage>>;

fn store_dispatched_block_number_by_nonce(
&self,
nonce: &u32,
block_number: &u64,
) -> DbResult<()>;

fn retrieve_dispatched_block_number_by_nonce(&self, nonce: &u32) -> DbResult<Option<u64>>;

/// Store whether a message was processed by its nonce
fn store_processed_by_nonce(&self, nonce: &u32, processed: &bool) -> DbResult<()>;

fn store_processed_by_gas_payment_meta(
&self,
meta: &InterchainGasPaymentMeta,
processed: &bool,
) -> DbResult<()>;

fn retrieve_processed_by_gas_payment_meta(
&self,
meta: &InterchainGasPaymentMeta,
) -> DbResult<Option<bool>>;

fn store_interchain_gas_expenditure_data_by_message_id(
&self,
message_id: &H256,
data: &InterchainGasExpenditureData,
) -> DbResult<()>;

fn retrieve_interchain_gas_expenditure_data_by_message_id(
&self,
message_id: &H256,
) -> DbResult<Option<InterchainGasExpenditureData>>;

/// Store the status of an operation by its message id
fn store_status_by_message_id(
&self,
message_id: &H256,
status: &PendingOperationStatus,
) -> DbResult<()>;

/// Retrieve the status of an operation by its message id
fn retrieve_status_by_message_id(
&self,
message_id: &H256,
) -> DbResult<Option<PendingOperationStatus>>;

fn store_interchain_gas_payment_data_by_gas_payment_key(
&self,
key: &GasPaymentKey,
data: &InterchainGasPaymentData,
) -> DbResult<()>;

fn retrieve_interchain_gas_payment_data_by_gas_payment_key(
&self,
key: &GasPaymentKey,
) -> DbResult<Option<InterchainGasPaymentData>>;

fn store_gas_payment_by_sequence(
&self,
sequence: &u32,
payment: &InterchainGasPayment,
) -> DbResult<()>;

fn retrieve_gas_payment_by_sequence(
&self,
sequence: &u32,
) -> DbResult<Option<InterchainGasPayment>>;

fn store_gas_payment_block_by_sequence(
&self,
sequence: &u32,
block_number: &u64,
) -> DbResult<()>;

fn retrieve_gas_payment_block_by_sequence(&self, sequence: &u32) -> DbResult<Option<u64>>;

/// Store the retry count for a pending message by its message id
fn store_pending_message_retry_count_by_message_id(
&self,
message_id: &H256,
count: &u32,
) -> DbResult<()>;

/// Retrieve the retry count for a pending message by its message id
fn retrieve_pending_message_retry_count_by_message_id(
&self,
message_id: &H256,
) -> DbResult<Option<u32>>;

fn store_merkle_tree_insertion_by_leaf_index(
&self,
leaf_index: &u32,
insertion: &MerkleTreeInsertion,
) -> DbResult<()>;

/// Retrieve the merkle tree insertion event by its leaf index
fn retrieve_merkle_tree_insertion_by_leaf_index(
&self,
leaf_index: &u32,
) -> DbResult<Option<MerkleTreeInsertion>>;

fn store_merkle_leaf_index_by_message_id(
&self,
message_id: &H256,
leaf_index: &u32,
) -> DbResult<()>;

/// Retrieve the merkle leaf index of a message in the merkle tree
fn retrieve_merkle_leaf_index_by_message_id(&self, message_id: &H256) -> DbResult<Option<u32>>;

fn store_merkle_tree_insertion_block_number_by_leaf_index(
&self,
leaf_index: &u32,
block_number: &u64,
) -> DbResult<()>;

fn retrieve_merkle_tree_insertion_block_number_by_leaf_index(
&self,
leaf_index: &u32,
) -> DbResult<Option<u64>>;

fn store_highest_seen_message_nonce_number(&self, nonce: &u32) -> DbResult<()>;

/// Retrieve the nonce of the highest processed message we're aware of
fn retrieve_highest_seen_message_nonce_number(&self) -> DbResult<Option<u32>>;

}
}

Expand Down
11 changes: 6 additions & 5 deletions rust/main/agents/validator/src/submit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@ use std::sync::Arc;
use std::time::{Duration, Instant};
use std::vec;

use hyperlane_base::db::HyperlaneDb;
use hyperlane_core::rpc_clients::call_and_retry_indefinitely;
use hyperlane_core::{ChainResult, MerkleTreeHook};
use prometheus::IntGauge;
use tokio::time::sleep;
use tracing::{debug, error, info};

use hyperlane_base::{db::HyperlaneRocksDB, CheckpointSyncer, CoreMetrics};
use hyperlane_base::{CheckpointSyncer, CoreMetrics};
use hyperlane_core::{
accumulator::incremental::IncrementalMerkle, Checkpoint, CheckpointWithMessageId,
HyperlaneChain, HyperlaneContract, HyperlaneDomain, HyperlaneSignerExt,
Expand All @@ -23,7 +24,7 @@ pub(crate) struct ValidatorSubmitter {
signer: SingletonSignerHandle,
merkle_tree_hook: Arc<dyn MerkleTreeHook>,
checkpoint_syncer: Arc<dyn CheckpointSyncer>,
message_db: HyperlaneRocksDB,
db: Arc<dyn HyperlaneDb>,
metrics: ValidatorSubmitterMetrics,
}

Expand All @@ -34,7 +35,7 @@ impl ValidatorSubmitter {
merkle_tree_hook: Arc<dyn MerkleTreeHook>,
signer: SingletonSignerHandle,
checkpoint_syncer: Arc<dyn CheckpointSyncer>,
message_db: HyperlaneRocksDB,
db: Arc<dyn HyperlaneDb>,
metrics: ValidatorSubmitterMetrics,
) -> Self {
Self {
Expand All @@ -43,7 +44,7 @@ impl ValidatorSubmitter {
merkle_tree_hook,
signer,
checkpoint_syncer,
message_db,
db,
metrics,
}
}
Expand Down Expand Up @@ -159,7 +160,7 @@ impl ValidatorSubmitter {
// and convert the correctness_checkpoint.index to a count by adding 1.
while tree.count() as u32 <= correctness_checkpoint.index {
if let Some(insertion) = self
.message_db
.db
.retrieve_merkle_tree_insertion_by_leaf_index(&(tree.count() as u32))
.unwrap_or_else(|err| {
panic!(
Expand Down
4 changes: 2 additions & 2 deletions rust/main/agents/validator/src/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use tokio::{task::JoinHandle, time::sleep};
use tracing::{error, info, info_span, instrument::Instrumented, warn, Instrument};

use hyperlane_base::{
db::{HyperlaneRocksDB, DB},
db::{HyperlaneDb, HyperlaneRocksDB, DB},
metrics::AgentMetrics,
settings::ChainConf,
AgentMetadata, BaseAgent, ChainMetrics, CheckpointSyncer, ContractSyncMetrics, ContractSyncer,
Expand Down Expand Up @@ -241,7 +241,7 @@ impl Validator {
self.merkle_tree_hook.clone(),
self.signer.clone(),
self.checkpoint_syncer.clone(),
self.db.clone(),
Arc::new(self.db.clone()) as Arc<dyn HyperlaneDb>,
ValidatorSubmitterMetrics::new(&self.core.metrics, &self.origin_chain),
);

Expand Down
Loading

0 comments on commit b0a6f1a

Please sign in to comment.