Skip to content

Commit

Permalink
fix: pr remediations
Browse files Browse the repository at this point in the history
  • Loading branch information
daniel-savu committed May 3, 2024
1 parent 2b2470d commit 8712087
Show file tree
Hide file tree
Showing 12 changed files with 60 additions and 58 deletions.
2 changes: 1 addition & 1 deletion rust/agents/relayer/src/msg/metadata/aggregation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ impl AggregationIsmMetadataBuilder {

#[async_trait]
impl MetadataBuilder for AggregationIsmMetadataBuilder {
#[instrument(err, skip(self))]
#[instrument(err, skip(self), ret)]
async fn build(
&self,
ism_address: H256,
Expand Down
3 changes: 2 additions & 1 deletion rust/agents/relayer/src/msg/metadata/base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ pub enum MetadataBuilderError {
MaxDepthExceeded(u32),
}

#[derive(Debug)]
pub struct IsmWithMetadataAndType {
pub ism: Box<dyn InterchainSecurityModule>,
pub metadata: Option<Vec<u8>>,
Expand Down Expand Up @@ -224,7 +225,7 @@ impl MessageMetadataBuilder {
}
}

#[instrument(err, skip(self), fields(destination_domain=self.destination_domain().name()))]
#[instrument(err, skip(self), fields(destination_domain=self.destination_domain().name()), ret)]
pub async fn build_ism_and_metadata(
&self,
ism_address: H256,
Expand Down
2 changes: 1 addition & 1 deletion rust/agents/relayer/src/msg/metadata/routing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ pub struct RoutingIsmMetadataBuilder {

#[async_trait]
impl MetadataBuilder for RoutingIsmMetadataBuilder {
#[instrument(err, skip(self))]
#[instrument(err, skip(self), ret)]
async fn build(
&self,
ism_address: H256,
Expand Down
18 changes: 7 additions & 11 deletions rust/agents/relayer/src/msg/pending_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,11 @@ use std::{

use async_trait::async_trait;
use derive_new::new;
use ethers::utils::hex;
use eyre::Result;
use hyperlane_base::{db::HyperlaneRocksDB, CoreMetrics};
use hyperlane_core::{
BatchItem, ChainCommunicationError, ChainResult, HyperlaneChain, HyperlaneDomain,
HyperlaneMessage, Mailbox, TryBatchAs, TxOutcome, TxSubmissionData, H256, U256,
HyperlaneMessage, Mailbox, MessageSubmissionData, TryBatchAs, TxOutcome, H256, U256,
};
use prometheus::{IntCounter, IntGauge};
use tracing::{debug, error, info, instrument, trace, warn};
Expand All @@ -22,7 +21,7 @@ use super::{
pending_operation::*,
};

const CONFIRM_DELAY: Duration = if cfg!(any(test, feature = "test-utils")) {
pub const CONFIRM_DELAY: Duration = if cfg!(any(test, feature = "test-utils")) {
// Wait 5 seconds after submitting the message before confirming in test mode
Duration::from_secs(5)
} else {
Expand Down Expand Up @@ -58,7 +57,7 @@ pub struct PendingMessage {
#[new(default)]
submitted: bool,
#[new(default)]
submission_data: Option<Box<TxSubmissionData>>,
submission_data: Option<Box<MessageSubmissionData>>,
#[new(default)]
num_retries: u32,
#[new(value = "Instant::now()")]
Expand Down Expand Up @@ -159,7 +158,7 @@ impl PendingOperation for PendingMessage {
if is_already_delivered {
debug!("Message has already been delivered, marking as submitted.");
self.submitted = true;
self.next_attempt_after = Some(Instant::now() + CONFIRM_DELAY);
self.set_next_attempt_after(CONFIRM_DELAY);
return PendingOperationResult::Confirm;
}

Expand Down Expand Up @@ -245,7 +244,7 @@ impl PendingOperation for PendingMessage {
}
}

self.submission_data = Some(Box::new(TxSubmissionData {
self.submission_data = Some(Box::new(MessageSubmissionData {
metadata,
gas_limit,
}));
Expand All @@ -259,7 +258,6 @@ impl PendingOperation for PendingMessage {
return;
}

debug!("Getting submission_data");
let state = self
.submission_data
.take()
Expand Down Expand Up @@ -291,7 +289,7 @@ impl PendingOperation for PendingMessage {
// Provider error; just try again later
// Note: this means that we are using `NotReady` for a retryable error case
self.inc_attempts();
self.on_reprepare()
PendingOperationResult::NotReady
});

if !self.is_ready() {
Expand All @@ -314,8 +312,6 @@ impl PendingOperation for PendingMessage {
submission=?self.submission_outcome,
"Message successfully processed"
);
self.submitted = true;
self.reset_attempts();
PendingOperationResult::Success
} else {
if let Some(outcome) = &self.submission_outcome {
Expand All @@ -329,7 +325,7 @@ impl PendingOperation for PendingMessage {
}
warn!(
tx_outcome=?self.submission_outcome,
message_id=hex::encode(self.message.id()),
message_id=?self.message.id(),
"Transaction attempting to process message either reverted or was reorged"
);
self.on_reprepare()
Expand Down
2 changes: 1 addition & 1 deletion rust/agents/relayer/src/msg/pending_operation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ pub enum PendingOperationResult {
Reprepare,
/// Do not attempt to run the operation again, forget about it
Drop,
/// todo
/// Send this message straight to the confirm queue
Confirm,
}

Expand Down
46 changes: 28 additions & 18 deletions rust/agents/relayer/src/msg/serial_submitter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use hyperlane_core::{
MpmcReceiver, TxOutcome,
};

use crate::msg::pending_message::CONFIRM_DELAY;
use crate::server::MessageRetryRequest;

use super::op_queue::{OpQueue, QueueOperation};
Expand Down Expand Up @@ -211,11 +212,10 @@ async fn prepare_task(
PendingOperationResult::Success => {
debug!(?op, "Operation prepared");
metrics.ops_prepared.inc();
// this send will pause this task if the submitter is not ready to accept yet
// TODO: push multiple messages at once
submit_queue.push(op).await;
}
PendingOperationResult::NotReady => {
// none of the operations are ready yet, so wait for a little bit
prepare_queue.push(op).await;
}
PendingOperationResult::Reprepare => {
Expand Down Expand Up @@ -246,7 +246,6 @@ async fn submit_task(
metrics: SerialSubmitterMetrics,
) {
let recv_limit = max_batch_size as usize;
// looping is not an issue because `recv_many` will sleep if the channel is empty
loop {
let mut batch = submit_queue.pop_many(recv_limit).await;

Expand All @@ -258,7 +257,7 @@ async fn submit_task(
}
std::cmp::Ordering::Equal => {
let op = batch.pop().unwrap();
submit_and_confirm_op(op, &mut confirm_queue, &metrics).await;
submit_single_operation(op, &mut confirm_queue, &metrics).await;
}
std::cmp::Ordering::Greater => {
OperationBatch::new(batch, domain.clone())
Expand All @@ -270,7 +269,7 @@ async fn submit_task(
}

#[instrument(skip(confirm_queue, metrics), ret, level = "debug")]
async fn submit_and_confirm_op(
async fn submit_single_operation(
mut op: QueueOperation,
confirm_queue: &mut OpQueue,
metrics: &SerialSubmitterMetrics,
Expand Down Expand Up @@ -309,7 +308,17 @@ async fn confirm_task(
metrics.clone(),
)
});
join_all(futures).await;
let op_results = join_all(futures).await;
if op_results.iter().all(|op| {
matches!(
op,
PendingOperationResult::NotReady | PendingOperationResult::Confirm
)
}) {
// None of the operations are ready, so wait for a little bit
// before checking again to prevent burning CPU
sleep(Duration::from_millis(500)).await;
}
}
}

Expand All @@ -319,19 +328,19 @@ async fn confirm_operation(
prepare_queue: OpQueue,
confirm_queue: OpQueue,
metrics: SerialSubmitterMetrics,
) {
) -> PendingOperationResult {
trace!(?op, "Confirming operation");
debug_assert_eq!(*op.destination_domain(), domain);

match op.confirm().await {
PendingOperationResult::Success | PendingOperationResult::Confirm => {
let operation_result = op.confirm().await;
match operation_result {
PendingOperationResult::Success => {
debug!(?op, "Operation confirmed");
metrics.ops_confirmed.inc();
}
PendingOperationResult::NotReady => {
// none of the operations are ready yet, so wait for a little bit
PendingOperationResult::NotReady | PendingOperationResult::Confirm => {
// TODO: push multiple messages at once
confirm_queue.push(op).await;
sleep(Duration::from_secs(5)).await;
}
PendingOperationResult::Reprepare => {
metrics.ops_failed.inc();
Expand All @@ -341,6 +350,7 @@ async fn confirm_operation(
metrics.ops_dropped.inc();
}
}
operation_result
}

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -390,9 +400,10 @@ impl OperationBatch {
Ok(outcome) => {
// TODO: use the `tx_outcome` with the total gas expenditure
// We'll need to proportionally set `used_gas` based on the tx_outcome, so it can be updated in the confirm step
// which means we need to add a `set_transaction_outcome` fn to `PendingOperation`, and maybe also `set_next_attempt_after(CONFIRM_DELAY);`
// which means we need to add a `set_transaction_outcome` fn to `PendingOperation`
info!(outcome=?outcome, batch_size=self.operations.len(), batch=?self.operations, "Submitted transaction batch");
for op in self.operations {
for mut op in self.operations {
op.set_next_attempt_after(CONFIRM_DELAY);
confirm_queue.push(op).await;
}
return;
Expand All @@ -414,24 +425,23 @@ impl OperationBatch {
.iter()
.map(|op| op.try_batch())
.collect::<ChainResult<Vec<BatchItem<HyperlaneMessage>>>>()?;

// We already assume that the relayer submits to a single mailbox per destination.
// So it's fine to use the first item in the batch to get the mailbox.

let Some(first_item) = batch.first() else {
return Err(ChainCommunicationError::BatchIsEmpty);
};
let mailbox = first_item.mailbox.clone();

// We use the estimated gas limit from the prior call to
// `process_estimate_costs` to avoid a second gas estimation.
let outcome = mailbox.process_batch(&batch).await?;
let outcome = first_item.mailbox.process_batch(&batch).await?;
metrics.ops_submitted.inc_by(self.operations.len() as u64);
Ok(outcome)
}

async fn submit_serially(self, confirm_queue: &mut OpQueue, metrics: &SerialSubmitterMetrics) {
for op in self.operations.into_iter() {
submit_and_confirm_op(op, confirm_queue, metrics).await;
submit_single_operation(op, confirm_queue, metrics).await;
}
}
}
2 changes: 1 addition & 1 deletion rust/chains/hyperlane-ethereum/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ pub struct TransactionOverrides {
pub max_priority_fee_per_gas: Option<U256>,
}

/// Ethereum transaction overrides.
/// Config for batching messages
#[derive(Debug, Clone, Default)]
pub struct MessageBatchConfig {
/// Optional Multicall3 contract address
Expand Down
18 changes: 7 additions & 11 deletions rust/chains/hyperlane-ethereum/src/contracts/mailbox.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use crate::interfaces::arbitrum_node_interface::ArbitrumNodeInterface;
use crate::interfaces::i_mailbox::{
IMailbox as EthereumMailboxInternal, ProcessCall, IMAILBOX_ABI,
};
use crate::tx::{call_with_lag, fill_tx_gas_params, report_tx, GAS_ESTIMATE_BUFFER};
use crate::tx::{call_with_lag, fill_tx_gas_params, report_tx, BATCH_GAS_LIMIT};
use crate::{BuildableWithProvider, ConnectionConf, EthereumProvider, TransactionOverrides};

use super::multicall::{self, build_multicall};
Expand Down Expand Up @@ -380,13 +380,12 @@ where
.map_err(|e| HyperlaneEthereumError::MulticallError(e.to_string()))?;
let contract_call_futures = messages
.iter()
.map(|batch| async {
.map(|batch_item| async {
// move ownership of the batch inside the closure
let batch = batch.clone();
self.process_contract_call(
&batch.data,
&batch.submission_data.metadata,
Some(batch.submission_data.gas_limit),
&batch_item.data,
&batch_item.submission_data.metadata,
Some(batch_item.submission_data.gas_limit),
)
.await
})
Expand All @@ -396,12 +395,9 @@ where
.into_iter()
.collect::<ChainResult<Vec<_>>>()?;

let batch_call = multicall::batch::<_, ()>(&mut multicall, contract_calls).await?;
let batch_call = multicall::batch::<_, ()>(&mut multicall, contract_calls);
let call = self
.add_gas_overrides(
batch_call,
GAS_ESTIMATE_BUFFER.checked_mul(80).map(Into::into),
)
.add_gas_overrides(batch_call, Some(BATCH_GAS_LIMIT.into()))
.await?;

let receipt = report_tx(call).await?;
Expand Down
11 changes: 4 additions & 7 deletions rust/chains/hyperlane-ethereum/src/contracts/multicall.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,7 @@ use std::sync::Arc;

use ethers::{abi::Detokenize, providers::Middleware};
use ethers_contract::{builders::ContractCall, Multicall, MulticallResult, MulticallVersion};
use hyperlane_core::{
utils::hex_or_base58_to_h256, ChainResult, HyperlaneDomain, HyperlaneProvider,
};
use hyperlane_core::{utils::hex_or_base58_to_h256, HyperlaneDomain, HyperlaneProvider};

use crate::{ConnectionConf, EthereumProvider};

Expand Down Expand Up @@ -36,17 +34,16 @@ pub async fn build_multicall<M: Middleware + 'static>(
Ok(multicall)
}

pub async fn batch<M: Middleware, D: Detokenize>(
pub fn batch<M: Middleware, D: Detokenize>(
multicall: &mut Multicall<M>,
calls: Vec<ContractCall<M, D>>,
) -> ChainResult<ContractCall<M, Vec<MulticallResult>>> {
) -> ContractCall<M, Vec<MulticallResult>> {
// clear any calls that were in the multicall beforehand
multicall.clear_calls();

calls.into_iter().for_each(|call| {
multicall.add_call(call, ALLOW_BATCH_FAILURES);
});

let res = multicall.as_aggregate_3_value();
Ok(res)
multicall.as_aggregate_3_value()
}
5 changes: 4 additions & 1 deletion rust/chains/hyperlane-ethereum/src/tx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@ use tracing::{error, info};
use crate::{Middleware, TransactionOverrides};

/// An amount of gas to add to the estimated gas
pub const GAS_ESTIMATE_BUFFER: u32 = 50000;
pub const GAS_ESTIMATE_BUFFER: u32 = 50_000;

/// The gas limit to temporarily hardcode when sending multicall operations
pub const BATCH_GAS_LIMIT: u32 = 4_000_000;

const PENDING_TRANSACTION_POLLING_INTERVAL: Duration = Duration::from_secs(2);

Expand Down
1 change: 0 additions & 1 deletion rust/hyperlane-core/src/traits/mailbox.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ pub trait Mailbox: HyperlaneContract + Send + Sync + Debug {
&self,
_messages: &[BatchItem<HyperlaneMessage>],
) -> ChainResult<TxOutcome> {
tracing::warn!("In Mailbox process_batch, returning BatchingFailed");
// Batching is not supported by default
Err(ChainCommunicationError::BatchingFailed)
}
Expand Down
Loading

0 comments on commit 8712087

Please sign in to comment.