Skip to content

Commit

Permalink
feat: pop multiple ops from prep queue at once
Browse files Browse the repository at this point in the history
  • Loading branch information
daniel-savu committed Apr 30, 2024
1 parent a3b8e87 commit ed973db
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 39 deletions.
33 changes: 24 additions & 9 deletions rust/agents/relayer/src/msg/op_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,30 @@ impl OpQueue {
}

/// Pop an element from the queue and update metrics
pub async fn pop(&mut self) -> Option<Reverse<QueueOperation>> {
pub async fn pop(&mut self) -> Option<QueueOperation> {
    // Delegate to `pop_many` with a limit of one and hand back its first element, if any.
    self.pop_many(1).await.into_iter().next()
}

/// Pop multiple elements at once from the queue and update metrics
pub async fn pop_many(&mut self, limit: usize) -> Vec<QueueOperation> {
self.process_retry_requests().await;
let op = self.queue.lock().await.pop();
op.map(|op| {
let mut queue = self.queue.lock().await;
let mut popped = vec![];
while let Some(Reverse(op)) = queue.pop() {
// even if the metric is decremented here, the operation may fail to process and be re-added to the queue.
// in those cases, the queue length will decrease to zero until the operation is re-added.
self.get_operation_metric(op.0.as_ref()).dec();
op
})
self.get_operation_metric(op.as_ref()).dec();
popped.push(op);
if popped.len() >= limit {
break;
}
}
popped
}

/// Number of operations currently held in the queue.
pub async fn len(&self) -> usize {
    // Acquire the queue lock first, then read the length through the guard.
    let guard = self.queue.lock().await;
    guard.len()
}

pub async fn process_retry_requests(&mut self) {
Expand Down Expand Up @@ -241,7 +256,7 @@ mod test {
// Pop elements from queue 1
let mut queue_1_popped = vec![];
while let Some(op) = op_queue_1.pop().await {
queue_1_popped.push(op.0);
queue_1_popped.push(op);
}

// The elements sent over the channel should be the first ones popped,
Expand All @@ -253,7 +268,7 @@ mod test {
// Pop elements from queue 2
let mut queue_2_popped = vec![];
while let Some(op) = op_queue_2.pop().await {
queue_2_popped.push(op.0);
queue_2_popped.push(op);
}

// The elements should be popped in the order they were pushed, because there was no retry request for them
Expand Down Expand Up @@ -300,7 +315,7 @@ mod test {
// Pop elements from queue
let mut popped = vec![];
while let Some(op) = op_queue.pop().await {
popped.push(op.0.id());
popped.push(op.id());
}

// First messages should be those to `destination_domain_2` - their exact order depends on
Expand Down
2 changes: 1 addition & 1 deletion rust/agents/relayer/src/msg/pending_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ impl PendingOperation for PendingMessage {
debug!("Getting submission_data");
let state = self
.submission_data
.clone()
.take()
.expect("Pending message must be prepared before it can be submitted");

// We use the estimated gas limit from the prior call to
Expand Down
54 changes: 28 additions & 26 deletions rust/agents/relayer/src/msg/serial_submitter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,8 @@ pub struct SerialSubmitter {
retry_rx: MpmcReceiver<MessageRetryRequest>,
/// Metrics for serial submitter.
metrics: SerialSubmitterMetrics,
/// Batch size for submitting messages
batch_size: Option<u32>,
/// Max batch size for submitting messages
max_batch_size: u32,
}

impl SerialSubmitter {
Expand All @@ -96,7 +96,7 @@ impl SerialSubmitter {
metrics,
rx: rx_prepare,
retry_rx,
batch_size,
max_batch_size,
} = self;
let prepare_queue = OpQueue::new(
metrics.submitter_queue_length.clone(),
Expand All @@ -109,11 +109,9 @@ impl SerialSubmitter {
retry_rx,
);

// This is a channel because we want to only have a small number of messages
// sitting ready to go at a time and this acts as a synchronization tool
// to slow down the preparation of messages when the submitter gets
// behind.
let (tx_submit, rx_submit) = mpsc::channel(batch_size.map(|b| b * 3).unwrap_or(1) as usize);
// This channel acts as a buffer to avoid holding too many prepared messages in memory.
// Use double the max batch size to increase chances of having full batches
let (tx_submit, rx_submit) = mpsc::channel((max_batch_size * 2) as usize);

let tasks = [
spawn(receive_task(
Expand All @@ -127,14 +125,13 @@ impl SerialSubmitter {
confirm_queue.clone(),
tx_submit,
metrics.clone(),
batch_size,
)),
spawn(submit_task(
domain.clone(),
rx_submit,
confirm_queue.clone(),
metrics.clone(),
batch_size,
max_batch_size,
)),
spawn(confirm_task(
domain.clone(),
Expand Down Expand Up @@ -177,23 +174,15 @@ async fn prepare_task(
confirm_queue: OpQueue,
tx_submit: mpsc::Sender<QueueOperation>,
metrics: SerialSubmitterMetrics,
batch_size: Option<u32>,
) {
let batch_size = batch_size.unwrap_or(1);
// Pop as many ops as we can then send to the channel
let ops_to_prepare = tx_submit.capacity();
loop {
// Pop messages here according to the configured batch.
let mut batch = vec![];
for _ in 0..batch_size {
let next = prepare_queue.pop().await;
if let Some(Reverse(op)) = next {
batch.push(op);
} else {
break;
}
}
let mut batch = prepare_queue.pop_many(ops_to_prepare).await;
if batch.is_empty() {
// queue is empty so give some time before checking again to prevent burning CPU
sleep(Duration::from_millis(200)).await;
sleep(Duration::from_millis(100)).await;
continue;
}
let mut task_prep_futures = vec![];
Expand All @@ -204,6 +193,16 @@ async fn prepare_task(
task_prep_futures.push(op.prepare());
}
let res = join_all(task_prep_futures).await;
let not_ready_count = res
.iter()
.filter(|r| {
matches!(
r,
PendingOperationResult::NotReady | PendingOperationResult::Reprepare
)
})
.count();
let batch_len = batch.len();
for (op, prepare_result) in batch.into_iter().zip(res.into_iter()) {
match prepare_result {
PendingOperationResult::Success => {
Expand All @@ -217,7 +216,6 @@ async fn prepare_task(
PendingOperationResult::NotReady => {
// none of the operations are ready yet, so wait for a little bit
prepare_queue.push(op).await;
sleep(Duration::from_millis(200)).await;
}
PendingOperationResult::Reprepare => {
metrics.ops_failed.inc();
Expand All @@ -232,6 +230,10 @@ async fn prepare_task(
}
}
}
if not_ready_count == batch_len {
// none of the operations are ready yet, so wait for a little bit
sleep(Duration::from_millis(500)).await;
}
}
}

Expand All @@ -241,9 +243,9 @@ async fn submit_task(
mut rx_submit: mpsc::Receiver<QueueOperation>,
mut confirm_queue: OpQueue,
metrics: SerialSubmitterMetrics,
batch_size: Option<u32>,
batch_size: u32,
) {
let recv_limit = batch_size.unwrap_or(1) as usize;
let recv_limit = batch_size as usize;
// looping is not an issue because `recv_many` will sleep if the channel is empty
loop {
let mut recv_buffer = vec![];
Expand Down Expand Up @@ -290,7 +292,7 @@ async fn confirm_task(
) {
loop {
// Pick the next message to try confirming.
let Some(Reverse(mut op)) = confirm_queue.pop().await else {
let Some(mut op) = confirm_queue.pop().await else {
sleep(Duration::from_secs(5)).await;
continue;
};
Expand Down
6 changes: 4 additions & 2 deletions rust/agents/relayer/src/relayer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -312,10 +312,12 @@ impl BaseAgent for Relayer {
dest_domain,
receive_channel,
mpmc_channel.receiver(),
// Default to submitting one message at a time if there is no batch config
self.core.settings.chains[dest_domain.name()]
.connection
.message_batch_config()
.map(|c| c.max_batch_size),
.map(|c| c.max_batch_size)
.unwrap_or(1),
),
);

Expand Down Expand Up @@ -454,7 +456,7 @@ impl Relayer {
destination: &HyperlaneDomain,
receiver: UnboundedReceiver<QueueOperation>,
retry_receiver_channel: MpmcReceiver<MessageRetryRequest>,
batch_size: Option<u32>,
batch_size: u32,
) -> Instrumented<JoinHandle<()>> {
let serial_submitter = SerialSubmitter::new(
destination.clone(),
Expand Down
4 changes: 3 additions & 1 deletion rust/utils/run-locally/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,19 +172,21 @@ fn main() -> ExitCode {
"CHAINS_TEST1_BATCHCONTRACTADDRESS",
multicall_address_string.clone(),
)
.hyp_env("CHAINS_TEST1_MAXBATCHSIZE", "3")
.hyp_env("CHAINS_TEST1_MAXBATCHSIZE", "5")
// by setting this as a quorum provider we will cause nonce errors when delivering to test2
// because the message will be sent to the node 3 times.
.hyp_env("CHAINS_TEST2_RPCCONSENSUSTYPE", "quorum")
.hyp_env(
"CHAINS_TEST2_BATCHCONTRACTADDRESS",
multicall_address_string.clone(),
)
.hyp_env("CHAINS_TEST2_MAXBATCHSIZE", "5")
.hyp_env("CHAINS_TEST3_CONNECTION_URL", "http://127.0.0.1:8545")
.hyp_env(
"CHAINS_TEST3_BATCHCONTRACTADDRESS",
multicall_address_string,
)
.hyp_env("CHAINS_TEST3_MAXBATCHSIZE", "5")
.hyp_env("METRICSPORT", "9092")
.hyp_env("DB", relayer_db.to_str().unwrap())
.hyp_env("CHAINS_TEST1_SIGNER_KEY", RELAYER_KEYS[0])
Expand Down

0 comments on commit ed973db

Please sign in to comment.