diff --git a/rust/agents/relayer/src/msg/metadata/aggregation.rs b/rust/agents/relayer/src/msg/metadata/aggregation.rs index fed501b622..ec5301c72b 100644 --- a/rust/agents/relayer/src/msg/metadata/aggregation.rs +++ b/rust/agents/relayer/src/msg/metadata/aggregation.rs @@ -117,7 +117,7 @@ impl AggregationIsmMetadataBuilder { #[async_trait] impl MetadataBuilder for AggregationIsmMetadataBuilder { - #[instrument(err, skip(self))] + #[instrument(err, skip(self), ret)] async fn build( &self, ism_address: H256, diff --git a/rust/agents/relayer/src/msg/metadata/base.rs b/rust/agents/relayer/src/msg/metadata/base.rs index 9fb65902e6..74449d10ee 100644 --- a/rust/agents/relayer/src/msg/metadata/base.rs +++ b/rust/agents/relayer/src/msg/metadata/base.rs @@ -41,6 +41,7 @@ pub enum MetadataBuilderError { MaxDepthExceeded(u32), } +#[derive(Debug)] pub struct IsmWithMetadataAndType { pub ism: Box, pub metadata: Option>, @@ -224,7 +225,7 @@ impl MessageMetadataBuilder { } } - #[instrument(err, skip(self), fields(destination_domain=self.destination_domain().name()))] + #[instrument(err, skip(self), fields(destination_domain=self.destination_domain().name()), ret)] pub async fn build_ism_and_metadata( &self, ism_address: H256, diff --git a/rust/agents/relayer/src/msg/metadata/routing.rs b/rust/agents/relayer/src/msg/metadata/routing.rs index c51cd69baf..c16fbc2a2d 100644 --- a/rust/agents/relayer/src/msg/metadata/routing.rs +++ b/rust/agents/relayer/src/msg/metadata/routing.rs @@ -14,7 +14,7 @@ pub struct RoutingIsmMetadataBuilder { #[async_trait] impl MetadataBuilder for RoutingIsmMetadataBuilder { - #[instrument(err, skip(self))] + #[instrument(err, skip(self), ret)] async fn build( &self, ism_address: H256, diff --git a/rust/agents/relayer/src/msg/pending_message.rs b/rust/agents/relayer/src/msg/pending_message.rs index 7d0c4072e3..1bfabeded1 100644 --- a/rust/agents/relayer/src/msg/pending_message.rs +++ b/rust/agents/relayer/src/msg/pending_message.rs @@ -6,12 +6,11 @@ use std::{ use async_trait::async_trait; use derive_new::new; -use ethers::utils::hex; use eyre::Result; use hyperlane_base::{db::HyperlaneRocksDB, CoreMetrics}; use hyperlane_core::{ BatchItem, ChainCommunicationError, ChainResult, HyperlaneChain, HyperlaneDomain, - HyperlaneMessage, Mailbox, TryBatchAs, TxOutcome, TxSubmissionData, H256, U256, + HyperlaneMessage, Mailbox, MessageSubmissionData, TryBatchAs, TxOutcome, H256, U256, }; use prometheus::{IntCounter, IntGauge}; use tracing::{debug, error, info, instrument, trace, warn}; @@ -22,7 +21,7 @@ use super::{ pending_operation::*, }; -const CONFIRM_DELAY: Duration = if cfg!(any(test, feature = "test-utils")) { +pub const CONFIRM_DELAY: Duration = if cfg!(any(test, feature = "test-utils")) { // Wait 5 seconds after submitting the message before confirming in test mode Duration::from_secs(5) } else { @@ -58,7 +57,7 @@ pub struct PendingMessage { #[new(default)] submitted: bool, #[new(default)] - submission_data: Option>, + submission_data: Option>, #[new(default)] num_retries: u32, #[new(value = "Instant::now()")] @@ -159,7 +158,7 @@ impl PendingOperation for PendingMessage { if is_already_delivered { debug!("Message has already been delivered, marking as submitted."); self.submitted = true; - self.next_attempt_after = Some(Instant::now() + CONFIRM_DELAY); + self.set_next_attempt_after(CONFIRM_DELAY); return PendingOperationResult::Confirm; } @@ -245,7 +244,7 @@ impl PendingOperation for PendingMessage { } } - self.submission_data = Some(Box::new(TxSubmissionData { + self.submission_data = Some(Box::new(MessageSubmissionData { metadata, gas_limit, })); @@ -259,7 +258,6 @@ impl PendingOperation for PendingMessage { return; } - debug!("Getting submission_data"); let state = self .submission_data .take() @@ -291,7 +289,7 @@ impl PendingOperation for PendingMessage { // Provider error; just try again later // Note: this means that we are using `NotReady` for a retryable error case self.inc_attempts(); - self.on_reprepare() + PendingOperationResult::NotReady }); if !self.is_ready() { @@ -314,8 +312,6 @@ impl PendingOperation for PendingMessage { submission=?self.submission_outcome, "Message successfully processed" ); - self.submitted = true; - self.reset_attempts(); PendingOperationResult::Success } else { if let Some(outcome) = &self.submission_outcome { @@ -329,7 +325,7 @@ impl PendingOperation for PendingMessage { } warn!( tx_outcome=?self.submission_outcome, - message_id=hex::encode(self.message.id()), + message_id=?self.message.id(), "Transaction attempting to process message either reverted or was reorged" ); self.on_reprepare() diff --git a/rust/agents/relayer/src/msg/pending_operation.rs b/rust/agents/relayer/src/msg/pending_operation.rs index 4e4f10f808..206e062e2a 100644 --- a/rust/agents/relayer/src/msg/pending_operation.rs +++ b/rust/agents/relayer/src/msg/pending_operation.rs @@ -148,7 +148,7 @@ pub enum PendingOperationResult { Reprepare, /// Do not attempt to run the operation again, forget about it Drop, - /// todo + /// Send this message straight to the confirm queue Confirm, } diff --git a/rust/agents/relayer/src/msg/serial_submitter.rs b/rust/agents/relayer/src/msg/serial_submitter.rs index 87f96a993c..5cb85d34ff 100644 --- a/rust/agents/relayer/src/msg/serial_submitter.rs +++ b/rust/agents/relayer/src/msg/serial_submitter.rs @@ -17,6 +17,7 @@ use hyperlane_core::{ MpmcReceiver, TxOutcome, }; +use crate::msg::pending_message::CONFIRM_DELAY; use crate::server::MessageRetryRequest; use super::op_queue::{OpQueue, QueueOperation}; @@ -211,11 +212,10 @@ async fn prepare_task( PendingOperationResult::Success => { debug!(?op, "Operation prepared"); metrics.ops_prepared.inc(); - // this send will pause this task if the submitter is not ready to accept yet + // TODO: push multiple messages at once submit_queue.push(op).await; } PendingOperationResult::NotReady => { - // none of the operations are ready yet, so wait for a little bit prepare_queue.push(op).await; } PendingOperationResult::Reprepare => { @@ -246,7 +246,6 @@ async fn submit_task( metrics: SerialSubmitterMetrics, ) { let recv_limit = max_batch_size as usize; - // looping is not an issue because `recv_many` will sleep if the channel is empty loop { let mut batch = submit_queue.pop_many(recv_limit).await; @@ -258,7 +257,7 @@ async fn submit_task( } std::cmp::Ordering::Equal => { let op = batch.pop().unwrap(); - submit_and_confirm_op(op, &mut confirm_queue, &metrics).await; + submit_single_operation(op, &mut confirm_queue, &metrics).await; } std::cmp::Ordering::Greater => { OperationBatch::new(batch, domain.clone()) @@ -270,7 +269,7 @@ async fn submit_task( } #[instrument(skip(confirm_queue, metrics), ret, level = "debug")] -async fn submit_and_confirm_op( +async fn submit_single_operation( mut op: QueueOperation, confirm_queue: &mut OpQueue, metrics: &SerialSubmitterMetrics, @@ -309,7 +308,17 @@ async fn confirm_task( metrics.clone(), ) }); - join_all(futures).await; + let op_results = join_all(futures).await; + if op_results.iter().all(|op| { + matches!( + op, + PendingOperationResult::NotReady | PendingOperationResult::Confirm + ) + }) { + // None of the operations are ready, so wait for a little bit + // before checking again to prevent burning CPU + sleep(Duration::from_millis(500)).await; + } } } @@ -319,19 +328,19 @@ async fn confirm_operation( prepare_queue: OpQueue, confirm_queue: OpQueue, metrics: SerialSubmitterMetrics, -) { +) -> PendingOperationResult { trace!(?op, "Confirming operation"); debug_assert_eq!(*op.destination_domain(), domain); - match op.confirm().await { - PendingOperationResult::Success | PendingOperationResult::Confirm => { + let operation_result = op.confirm().await; + match operation_result { + PendingOperationResult::Success => { debug!(?op, "Operation confirmed"); metrics.ops_confirmed.inc(); } - PendingOperationResult::NotReady => { - // none of the operations are ready yet, so wait for a little bit + PendingOperationResult::NotReady | PendingOperationResult::Confirm => { + // TODO: push multiple messages at once confirm_queue.push(op).await; - sleep(Duration::from_secs(5)).await; } PendingOperationResult::Reprepare => { metrics.ops_failed.inc(); @@ -341,6 +350,7 @@ async fn confirm_operation( metrics.ops_dropped.inc(); } } + operation_result } #[derive(Debug, Clone)] @@ -390,9 +400,10 @@ impl OperationBatch { Ok(outcome) => { // TODO: use the `tx_outcome` with the total gas expenditure // We'll need to proportionally set `used_gas` based on the tx_outcome, so it can be updated in the confirm step - // which means we need to add a `set_transaction_outcome` fn to `PendingOperation`, and maybe also `set_next_attempt_after(CONFIRM_DELAY);` + // which means we need to add a `set_transaction_outcome` fn to `PendingOperation` info!(outcome=?outcome, batch_size=self.operations.len(), batch=?self.operations, "Submitted transaction batch"); - for op in self.operations { + for mut op in self.operations { + op.set_next_attempt_after(CONFIRM_DELAY); confirm_queue.push(op).await; } return; @@ -414,24 +425,23 @@ impl OperationBatch { .iter() .map(|op| op.try_batch()) .collect::>>>()?; + // We already assume that the relayer submits to a single mailbox per destination. // So it's fine to use the first item in the batch to get the mailbox. - let Some(first_item) = batch.first() else { return Err(ChainCommunicationError::BatchIsEmpty); }; - let mailbox = first_item.mailbox.clone(); // We use the estimated gas limit from the prior call to // `process_estimate_costs` to avoid a second gas estimation. - let outcome = mailbox.process_batch(&batch).await?; + let outcome = first_item.mailbox.process_batch(&batch).await?; metrics.ops_submitted.inc_by(self.operations.len() as u64); Ok(outcome) } async fn submit_serially(self, confirm_queue: &mut OpQueue, metrics: &SerialSubmitterMetrics) { for op in self.operations.into_iter() { - submit_and_confirm_op(op, confirm_queue, metrics).await; + submit_single_operation(op, confirm_queue, metrics).await; } } } diff --git a/rust/chains/hyperlane-ethereum/src/config.rs b/rust/chains/hyperlane-ethereum/src/config.rs index 5778c88c77..e56d968dc9 100644 --- a/rust/chains/hyperlane-ethereum/src/config.rs +++ b/rust/chains/hyperlane-ethereum/src/config.rs @@ -52,7 +52,7 @@ pub struct TransactionOverrides { pub max_priority_fee_per_gas: Option, } -/// Ethereum transaction overrides. +/// Config for batching messages #[derive(Debug, Clone, Default)] pub struct MessageBatchConfig { /// Optional Multicall3 contract address diff --git a/rust/chains/hyperlane-ethereum/src/contracts/mailbox.rs b/rust/chains/hyperlane-ethereum/src/contracts/mailbox.rs index 71f4b07345..5718dd1df8 100644 --- a/rust/chains/hyperlane-ethereum/src/contracts/mailbox.rs +++ b/rust/chains/hyperlane-ethereum/src/contracts/mailbox.rs @@ -26,7 +26,7 @@ use crate::interfaces::arbitrum_node_interface::ArbitrumNodeInterface; use crate::interfaces::i_mailbox::{ IMailbox as EthereumMailboxInternal, ProcessCall, IMAILBOX_ABI, }; -use crate::tx::{call_with_lag, fill_tx_gas_params, report_tx, GAS_ESTIMATE_BUFFER}; +use crate::tx::{call_with_lag, fill_tx_gas_params, report_tx, BATCH_GAS_LIMIT}; use crate::{BuildableWithProvider, ConnectionConf, EthereumProvider, TransactionOverrides}; use super::multicall::{self, build_multicall}; @@ -380,13 +380,12 @@ where .map_err(|e| HyperlaneEthereumError::MulticallError(e.to_string()))?; let contract_call_futures = messages .iter() - .map(|batch| async { + .map(|batch_item| async { // move ownership of the batch inside the closure - let batch = batch.clone(); self.process_contract_call( - &batch.data, - &batch.submission_data.metadata, - Some(batch.submission_data.gas_limit), + &batch_item.data, + &batch_item.submission_data.metadata, + Some(batch_item.submission_data.gas_limit), ) .await }) @@ -396,12 +395,9 @@ where .into_iter() .collect::>>()?; - let batch_call = multicall::batch::<_, ()>(&mut multicall, contract_calls).await?; + let batch_call = multicall::batch::<_, ()>(&mut multicall, contract_calls); let call = self - .add_gas_overrides( - batch_call, - GAS_ESTIMATE_BUFFER.checked_mul(80).map(Into::into), - ) + .add_gas_overrides(batch_call, Some(BATCH_GAS_LIMIT.into())) .await?; let receipt = report_tx(call).await?; diff --git a/rust/chains/hyperlane-ethereum/src/contracts/multicall.rs b/rust/chains/hyperlane-ethereum/src/contracts/multicall.rs index 0590515c54..6f2c279216 100644 --- a/rust/chains/hyperlane-ethereum/src/contracts/multicall.rs +++ b/rust/chains/hyperlane-ethereum/src/contracts/multicall.rs @@ -2,9 +2,7 @@ use std::sync::Arc; use ethers::{abi::Detokenize, providers::Middleware}; use ethers_contract::{builders::ContractCall, Multicall, MulticallResult, MulticallVersion}; -use hyperlane_core::{ - utils::hex_or_base58_to_h256, ChainResult, HyperlaneDomain, HyperlaneProvider, -}; +use hyperlane_core::{utils::hex_or_base58_to_h256, HyperlaneDomain, HyperlaneProvider}; use crate::{ConnectionConf, EthereumProvider}; @@ -36,10 +34,10 @@ pub async fn build_multicall( Ok(multicall) } -pub async fn batch( +pub fn batch( multicall: &mut Multicall, calls: Vec>, -) -> ChainResult>> { +) -> ContractCall> { // clear any calls that were in the multicall beforehand multicall.clear_calls(); @@ -47,6 +45,5 @@ pub async fn batch( multicall.add_call(call, ALLOW_BATCH_FAILURES); }); - let res = multicall.as_aggregate_3_value(); - Ok(res) + multicall.as_aggregate_3_value() } diff --git a/rust/chains/hyperlane-ethereum/src/tx.rs b/rust/chains/hyperlane-ethereum/src/tx.rs index a277a26147..4ebd35ac35 100644 --- a/rust/chains/hyperlane-ethereum/src/tx.rs +++ b/rust/chains/hyperlane-ethereum/src/tx.rs @@ -22,7 +22,10 @@ use tracing::{error, info}; use crate::{Middleware, TransactionOverrides}; /// An amount of gas to add to the estimated gas -pub const GAS_ESTIMATE_BUFFER: u32 = 50000; +pub const GAS_ESTIMATE_BUFFER: u32 = 50_000; + +/// The gas limit to temporarily hardcode when sending multicall operations +pub const BATCH_GAS_LIMIT: u32 = 4_000_000; const PENDING_TRANSACTION_POLLING_INTERVAL: Duration = Duration::from_secs(2); diff --git a/rust/hyperlane-core/src/traits/mailbox.rs b/rust/hyperlane-core/src/traits/mailbox.rs index 2accc2c0cf..fbd9dd5bc0 100644 --- a/rust/hyperlane-core/src/traits/mailbox.rs +++ b/rust/hyperlane-core/src/traits/mailbox.rs @@ -45,7 +45,6 @@ pub trait Mailbox: HyperlaneContract + Send + Sync + Debug { &self, _messages: &[BatchItem], ) -> ChainResult { - tracing::warn!("In Mailbox process_batch, returning BatchingFailed"); // Batching is not supported by default Err(ChainCommunicationError::BatchingFailed) } diff --git a/rust/hyperlane-core/src/types/transaction.rs b/rust/hyperlane-core/src/types/transaction.rs index 3ef5526fa3..3f6085d659 100644 --- a/rust/hyperlane-core/src/types/transaction.rs +++ b/rust/hyperlane-core/src/types/transaction.rs @@ -5,8 +5,9 @@ use derive_new::new; /// State for the next submission attempt generated by a prepare call. #[derive(Clone, Debug)] -pub struct TxSubmissionData { - /// Transaction metadata +pub struct MessageSubmissionData { + /// Transaction metadata - currently only applies to Messages, so this field can be made optional or generic if other + /// operations are submitted in the future. pub metadata: Vec, /// Gas limit for the transaction pub gas_limit: U256, @@ -18,7 +19,7 @@ pub struct BatchItem { /// The data to be submitted pub data: T, /// Data to do with this transaction submission - pub submission_data: TxSubmissionData, + pub submission_data: MessageSubmissionData, /// The mailbox to send the result to /// TODO: turn this into a `destination contract` object when we batch more than just messages pub mailbox: Arc, @@ -29,7 +30,6 @@ pub struct BatchItem { pub trait TryBatchAs { /// Try to convert the item into a batch item fn try_batch(&self) -> ChainResult> { - tracing::warn!("In TryBatchAs, returning BatchingFailed"); Err(crate::ChainCommunicationError::BatchingFailed) } }