Skip to content

Commit

Permalink
feat: add the # of messages prioritized in the queue to the http resp… (
Browse files Browse the repository at this point in the history
#5043)

### Description

Add a mpsc channel between relayer and message retry handler, so message
retry handler knows the retry results from the relayer.

`POST /message_retry` endpoint now returns a JSON with details on the
retry request:

```rust
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct MessageRetryResponse {
    /// ID of the retry request
    pub uuid: String,
    /// how many pending operations were processed
    pub processed: usize,
    /// how many of the pending operations matched the retry request pattern
    pub matched: u64,
}
```

### Drive-by changes

<!--
Are there any minor or drive-by changes also included?
-->

### Related issues

Fixes #4059

### Backward compatibility

<!--
Are these changes backward compatible? Are there any infrastructure
implications, e.g. changes that would prohibit deploying older commits
using this infra tooling?

Yes/No
-->

### Testing

Updated unit tests to return a response so the handler is not blocked
  • Loading branch information
kamiyaa authored Jan 6, 2025
1 parent 99c91d8 commit c5d4bd6
Show file tree
Hide file tree
Showing 13 changed files with 403 additions and 86 deletions.
31 changes: 24 additions & 7 deletions rust/main/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions rust/main/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ typetag = "0.2"
uint = "0.9.5"
ureq = { version = "2.4", default-features = false }
url = "2.3"
uuid = { version = "1.11.0", features = ["v4"] }
walkdir = "2"
warp = "0.3"
which = "4.3"
Expand Down
3 changes: 3 additions & 0 deletions rust/main/agents/relayer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ tokio-metrics.workspace = true
tracing-futures.workspace = true
tracing.workspace = true
typetag.workspace = true
uuid.workspace = true

hyperlane-core = { path = "../../hyperlane-core", features = [
"agent",
Expand All @@ -53,12 +54,14 @@ hyperlane-base = { path = "../../hyperlane-base", features = ["test-utils"] }
hyperlane-ethereum = { path = "../../chains/hyperlane-ethereum" }

[dev-dependencies]
axum = { workspace = true, features = ["macros"] }
once_cell.workspace = true
mockall.workspace = true
tokio-test.workspace = true
hyperlane-test = { path = "../../hyperlane-test" }
hyperlane-base = { path = "../../hyperlane-base", features = ["test-utils"] }
hyperlane-core = { path = "../../hyperlane-core", features = ["agent", "async", "test-utils"] }
tracing-test.workspace = true

[features]
default = ["color-eyre", "oneline-errors"]
Expand Down
3 changes: 2 additions & 1 deletion rust/main/agents/relayer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ mod msg;
mod processor;
mod prover;
mod relayer;
mod server;
mod settings;

pub mod server;

pub use msg::GAS_EXPENDITURE_LOG_MESSAGE;
pub use relayer::*;
83 changes: 71 additions & 12 deletions rust/main/agents/relayer/src/msg/op_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use prometheus::{IntGauge, IntGaugeVec};
use tokio::sync::{broadcast::Receiver, Mutex};
use tracing::{debug, info, instrument};

use crate::settings::matching_list::MatchingList;
use crate::server::{MessageRetryRequest, MessageRetryResponse};

pub type OperationPriorityQueue = Arc<Mutex<BinaryHeap<Reverse<QueueOperation>>>>;

Expand All @@ -16,7 +16,7 @@ pub type OperationPriorityQueue = Arc<Mutex<BinaryHeap<Reverse<QueueOperation>>>
pub struct OpQueue {
metrics: IntGaugeVec,
queue_metrics_label: String,
retry_rx: Arc<Mutex<Receiver<MatchingList>>>,
retry_receiver: Arc<Mutex<Receiver<MessageRetryRequest>>>,
#[new(default)]
pub queue: OperationPriorityQueue,
}
Expand Down Expand Up @@ -72,27 +72,67 @@ impl OpQueue {
// The other consideration is whether to put the channel receiver in the OpQueue or in a dedicated task
// that also holds an Arc to the Mutex. For simplicity, we'll put it in the OpQueue for now.
let mut message_retry_requests = vec![];
while let Ok(message_id) = self.retry_rx.lock().await.try_recv() {
message_retry_requests.push(message_id);

while let Ok(retry_request) = self.retry_receiver.lock().await.try_recv() {
let uuid = retry_request.uuid.clone();
message_retry_requests.push((
retry_request,
MessageRetryResponse {
uuid,
evaluated: 0,
matched: 0,
},
));
}

if message_retry_requests.is_empty() {
return;
}

let mut queue = self.queue.lock().await;
let queue_length = queue.len();

let mut reprioritized_queue: BinaryHeap<_> = queue
.drain()
.map(|Reverse(mut op)| {
if message_retry_requests.iter().any(|r| r.op_matches(&op)) {
let matched_requests: Vec<_> = message_retry_requests
.iter_mut()
.filter_map(|(retry_req, retry_response)| {
// update retry metrics
if retry_req.pattern.op_matches(&op) {
debug!(uuid = retry_req.uuid, "Matched request");
retry_response.matched += 1;
Some(retry_req.uuid.clone())
} else {
None
}
})
.collect();

if !matched_requests.is_empty() {
info!(
operation = %op,
queue_label = %self.queue_metrics_label,
"Retrying OpQueue operation"
);
op.reset_attempts()
op.reset_attempts();
}
Reverse(op)
})
.collect();

for (retry_req, mut retry_response) in message_retry_requests {
retry_response.evaluated = queue_length;
tracing::debug!(
uuid = retry_response.uuid,
evaluated = retry_response.evaluated,
matched = retry_response.matched,
"Sending relayer retry response back"
);
if let Err(err) = retry_req.transmitter.send(retry_response).await {
tracing::error!(?err, "Failed to send retry response");
}
}
queue.append(&mut reprioritized_queue);
}

Expand All @@ -115,7 +155,10 @@ impl OpQueue {

#[cfg(test)]
pub mod test {
use crate::{server::ENDPOINT_MESSAGES_QUEUE_SIZE, settings::matching_list::MatchingList};

use super::*;

use hyperlane_core::{
HyperlaneDomain, HyperlaneDomainProtocol, HyperlaneDomainTechnicalStack,
HyperlaneDomainType, HyperlaneMessage, KnownHyperlaneDomain, PendingOperationResult,
Expand All @@ -127,7 +170,7 @@ pub mod test {
str::FromStr,
time::{Duration, Instant},
};
use tokio::sync;
use tokio::sync::{self, mpsc};

#[derive(Debug, Clone, Serialize)]
pub struct MockPendingOperation {
Expand Down Expand Up @@ -320,6 +363,7 @@ pub mod test {
async fn test_multiple_op_queues_message_id() {
let (metrics, queue_metrics_label) = dummy_metrics_and_label();
let broadcaster = sync::broadcast::Sender::new(100);

let mut op_queue_1 = OpQueue::new(
metrics.clone(),
queue_metrics_label.clone(),
Expand Down Expand Up @@ -364,12 +408,22 @@ pub mod test {
.await;
}

let (transmitter, _receiver) = mpsc::channel(ENDPOINT_MESSAGES_QUEUE_SIZE);

// Retry by message ids
broadcaster
.send(MatchingList::with_message_id(op_ids[1]))
.send(MessageRetryRequest {
uuid: "0e92ace7-ba5d-4a1f-8501-51b6d9d500cf".to_string(),
pattern: MatchingList::with_message_id(op_ids[1]),
transmitter: transmitter.clone(),
})
.unwrap();
broadcaster
.send(MatchingList::with_message_id(op_ids[2]))
.send(MessageRetryRequest {
uuid: "59400966-e7fa-4fb9-9372-9a671d4392c3".to_string(),
pattern: MatchingList::with_message_id(op_ids[2]),
transmitter,
})
.unwrap();

// Pop elements from queue 1
Expand Down Expand Up @@ -399,6 +453,7 @@ pub mod test {
async fn test_destination_domain() {
let (metrics, queue_metrics_label) = dummy_metrics_and_label();
let broadcaster = sync::broadcast::Sender::new(100);

let mut op_queue = OpQueue::new(
metrics.clone(),
queue_metrics_label.clone(),
Expand All @@ -425,11 +480,15 @@ pub mod test {
.await;
}

let (transmitter, _receiver) = mpsc::channel(ENDPOINT_MESSAGES_QUEUE_SIZE);

// Retry by domain
broadcaster
.send(MatchingList::with_destination_domain(
destination_domain_2.id(),
))
.send(MessageRetryRequest {
uuid: "a5b39473-7cc5-48a1-8bed-565454ba1037".to_string(),
pattern: MatchingList::with_destination_domain(destination_domain_2.id()),
transmitter,
})
.unwrap();

// Pop elements from queue
Expand Down
4 changes: 2 additions & 2 deletions rust/main/agents/relayer/src/msg/op_submitter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use hyperlane_core::{
};

use crate::msg::pending_message::CONFIRM_DELAY;
use crate::settings::matching_list::MatchingList;
use crate::server::MessageRetryRequest;

use super::op_queue::OpQueue;
use super::op_queue::OperationPriorityQueue;
Expand Down Expand Up @@ -105,7 +105,7 @@ impl SerialSubmitter {
pub fn new(
domain: HyperlaneDomain,
rx: mpsc::UnboundedReceiver<QueueOperation>,
retry_op_transmitter: Sender<MatchingList>,
retry_op_transmitter: Sender<MessageRetryRequest>,
metrics: SerialSubmitterMetrics,
max_batch_size: u32,
task_monitor: TaskMonitor,
Expand Down
5 changes: 3 additions & 2 deletions rust/main/agents/relayer/src/relayer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -318,10 +318,11 @@ impl BaseAgent for Relayer {
}));
tasks.push(console_server.instrument(info_span!("Tokio console server")));
}
let sender = BroadcastSender::<MatchingList>::new(ENDPOINT_MESSAGES_QUEUE_SIZE);
let sender = BroadcastSender::new(ENDPOINT_MESSAGES_QUEUE_SIZE);
// send channels by destination chain
let mut send_channels = HashMap::with_capacity(self.destination_chains.len());
let mut prep_queues = HashMap::with_capacity(self.destination_chains.len());

for (dest_domain, dest_conf) in &self.destination_chains {
let (send_channel, receive_channel) = mpsc::unbounded_channel::<QueueOperation>();
send_channels.insert(dest_domain.id(), send_channel);
Expand Down Expand Up @@ -385,7 +386,7 @@ impl BaseAgent for Relayer {
);
}
// run server
let custom_routes = relayer_server::Server::new()
let custom_routes = relayer_server::Server::new(self.origin_chains.len())
.with_op_retry(sender.clone())
.with_message_queue(prep_queues)
.routes();
Expand Down
1 change: 1 addition & 0 deletions rust/main/agents/relayer/src/server/list_messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ mod tests {
fn setup_test_server() -> (SocketAddr, OperationPriorityQueue) {
let (metrics, queue_metrics_label) = dummy_metrics_and_label();
let broadcaster = sync::broadcast::Sender::new(100);

let op_queue = OpQueue::new(
metrics.clone(),
queue_metrics_label.clone(),
Expand Down
Loading

0 comments on commit c5d4bd6

Please sign in to comment.