From f5fcafab57ab140efb5c2b363faafbc768e84242 Mon Sep 17 00:00:00 2001 From: Daniel Savu <23065004+daniel-savu@users.noreply.github.com> Date: Wed, 8 Jan 2025 18:28:27 +0000 Subject: [PATCH] =?UTF-8?q?Revert=20"feat:=20add=20the=20#=20of=20messages?= =?UTF-8?q?=20prioritized=20in=20the=20queue=20to=20the=20http=20resp?= =?UTF-8?q?=E2=80=A6=20(#5043)"?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This reverts commit c5d4bd60384062ea66f3cf23ca2df952b0b6d5b0. --- rust/main/Cargo.lock | 31 +- rust/main/Cargo.toml | 1 - rust/main/agents/relayer/Cargo.toml | 3 - rust/main/agents/relayer/src/lib.rs | 3 +- rust/main/agents/relayer/src/msg/op_queue.rs | 83 +---- .../agents/relayer/src/msg/op_submitter.rs | 4 +- rust/main/agents/relayer/src/relayer.rs | 5 +- .../relayer/src/server/list_messages.rs | 1 - .../relayer/src/server/message_retry.rs | 286 ++++-------------- rust/main/agents/relayer/src/server/mod.rs | 11 +- rust/main/utils/run-locally/Cargo.toml | 1 - rust/main/utils/run-locally/src/main.rs | 5 - rust/main/utils/run-locally/src/server.rs | 55 ---- 13 files changed, 86 insertions(+), 403 deletions(-) delete mode 100644 rust/main/utils/run-locally/src/server.rs diff --git a/rust/main/Cargo.lock b/rust/main/Cargo.lock index 47aec03422..4f728ab69f 100644 --- a/rust/main/Cargo.lock +++ b/rust/main/Cargo.lock @@ -512,7 +512,6 @@ checksum = "3b829e4e32b91e643de6eafe82b1d90675f5874230191a4ffbc1b336dec4d6bf" dependencies = [ "async-trait", "axum-core", - "axum-macros", "bitflags 1.3.2", "bytes", "futures-util", @@ -554,18 +553,6 @@ dependencies = [ "tower-service", ] -[[package]] -name = "axum-macros" -version = "0.3.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cdca6a10ecad987bda04e95606ef85a5417dcaac1a78455242d72e031e2b6b62" -dependencies = [ - "heck 0.4.1", - "proc-macro2 1.0.86", - "quote 1.0.37", - "syn 2.0.77", -] - [[package]] name = "backtrace" version = "0.3.71" @@ -7028,9 +7015,7 @@ dependencies = [ "tokio-test", "tracing", "tracing-futures", - "tracing-test", "typetag", - "uuid 1.11.0", ] [[package]] @@ -7165,7 +7150,7 @@ dependencies = [ "rkyv_derive", "seahash", "tinyvec", - "uuid 1.11.0", + "uuid 1.10.0", ] [[package]] @@ -7265,7 +7250,6 @@ dependencies = [ "once_cell", "regex", "relayer", - "reqwest", "ripemd", "serde", "serde_json", @@ -7765,7 +7749,7 @@ dependencies = [ "time", "tracing", "url", - "uuid 1.11.0", + "uuid 1.10.0", ] [[package]] @@ -7826,7 +7810,7 @@ dependencies = [ "sea-query-derive", "serde_json", "time", - "uuid 1.11.0", + "uuid 1.10.0", ] [[package]] @@ -7842,7 +7826,7 @@ dependencies = [ "serde_json", "sqlx", "time", - "uuid 1.11.0", + "uuid 1.10.0", ] [[package]] @@ -9142,7 +9126,7 @@ dependencies = [ "time", "tokio-stream", "url", - "uuid 1.11.0", + "uuid 1.10.0", "whoami", ] @@ -10379,11 +10363,10 @@ dependencies = [ [[package]] name = "uuid" -version = "1.11.0" +version = "1.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f8c5f0a0af699448548ad1a2fbf920fb4bee257eae39953ba95cb84891a0446a" +checksum = "81dfa00651efa65069b0b6b651f4aaa31ba9e3c3ce0137aaad053604ee7e0314" dependencies = [ - "getrandom 0.2.15", "serde", ] diff --git a/rust/main/Cargo.toml b/rust/main/Cargo.toml index d9fee1663f..8b1a9b58e0 100644 --- a/rust/main/Cargo.toml +++ b/rust/main/Cargo.toml @@ -153,7 +153,6 @@ typetag = "0.2" uint = "0.9.5" ureq = { version = "2.4", default-features = false } url = "2.3" -uuid = { version = "1.11.0", features = ["v4"] } walkdir = "2" warp = "0.3" which = "4.3" diff --git a/rust/main/agents/relayer/Cargo.toml b/rust/main/agents/relayer/Cargo.toml index fcd89da771..5a891d912c 100644 --- a/rust/main/agents/relayer/Cargo.toml +++ b/rust/main/agents/relayer/Cargo.toml @@ -44,7 +44,6 @@ tokio-metrics.workspace = true tracing-futures.workspace = true tracing.workspace = true typetag.workspace = true -uuid.workspace = true hyperlane-core = { path = "../../hyperlane-core", features = [ "agent", @@ -54,14 +53,12 @@ hyperlane-base = { path = "../../hyperlane-base", features = ["test-utils"] } hyperlane-ethereum = { path = "../../chains/hyperlane-ethereum" } [dev-dependencies] -axum = { workspace = true, features = ["macros"] } once_cell.workspace = true mockall.workspace = true tokio-test.workspace = true hyperlane-test = { path = "../../hyperlane-test" } hyperlane-base = { path = "../../hyperlane-base", features = ["test-utils"] } hyperlane-core = { path = "../../hyperlane-core", features = ["agent", "async", "test-utils"] } -tracing-test.workspace = true [features] default = ["color-eyre", "oneline-errors"] diff --git a/rust/main/agents/relayer/src/lib.rs b/rust/main/agents/relayer/src/lib.rs index 9a6e1e4147..62b896d628 100644 --- a/rust/main/agents/relayer/src/lib.rs +++ b/rust/main/agents/relayer/src/lib.rs @@ -3,9 +3,8 @@ mod msg; mod processor; mod prover; mod relayer; +mod server; mod settings; -pub mod server; - pub use msg::GAS_EXPENDITURE_LOG_MESSAGE; pub use relayer::*; diff --git a/rust/main/agents/relayer/src/msg/op_queue.rs b/rust/main/agents/relayer/src/msg/op_queue.rs index d0640bc2a3..99c9dde39f 100644 --- a/rust/main/agents/relayer/src/msg/op_queue.rs +++ b/rust/main/agents/relayer/src/msg/op_queue.rs @@ -6,7 +6,7 @@ use prometheus::{IntGauge, IntGaugeVec}; use tokio::sync::{broadcast::Receiver, Mutex}; use tracing::{debug, info, instrument}; -use crate::server::{MessageRetryRequest, MessageRetryResponse}; +use crate::settings::matching_list::MatchingList; pub type OperationPriorityQueue = Arc>>>; @@ -16,7 +16,7 @@ pub type OperationPriorityQueue = Arc>> pub struct OpQueue { metrics: IntGaugeVec, queue_metrics_label: String, - retry_receiver: Arc>>, + retry_rx: Arc>>, #[new(default)] pub queue: OperationPriorityQueue, } @@ -72,67 +72,27 @@ impl OpQueue { // The other consideration is whether to put the channel receiver in the OpQueue or in a dedicated task // that also holds an Arc to the Mutex. For simplicity, we'll put it in the OpQueue for now. let mut message_retry_requests = vec![]; - - while let Ok(retry_request) = self.retry_receiver.lock().await.try_recv() { - let uuid = retry_request.uuid.clone(); - message_retry_requests.push(( - retry_request, - MessageRetryResponse { - uuid, - evaluated: 0, - matched: 0, - }, - )); + while let Ok(message_id) = self.retry_rx.lock().await.try_recv() { + message_retry_requests.push(message_id); } - if message_retry_requests.is_empty() { return; } - let mut queue = self.queue.lock().await; - let queue_length = queue.len(); - let mut reprioritized_queue: BinaryHeap<_> = queue .drain() .map(|Reverse(mut op)| { - let matched_requests: Vec<_> = message_retry_requests - .iter_mut() - .filter_map(|(retry_req, retry_response)| { - // update retry metrics - if retry_req.pattern.op_matches(&op) { - debug!(uuid = retry_req.uuid, "Matched request"); - retry_response.matched += 1; - Some(retry_req.uuid.clone()) - } else { - None - } - }) - .collect(); - - if !matched_requests.is_empty() { + if message_retry_requests.iter().any(|r| r.op_matches(&op)) { info!( operation = %op, queue_label = %self.queue_metrics_label, "Retrying OpQueue operation" ); - op.reset_attempts(); + op.reset_attempts() } Reverse(op) }) .collect(); - - for (retry_req, mut retry_response) in message_retry_requests { - retry_response.evaluated = queue_length; - tracing::debug!( - uuid = retry_response.uuid, - evaluated = retry_response.evaluated, - matched = retry_response.matched, - "Sending relayer retry response back" - ); - if let Err(err) = retry_req.transmitter.send(retry_response).await { - tracing::error!(?err, "Failed to send retry response"); - } - } queue.append(&mut reprioritized_queue); } @@ -155,10 +115,7 @@ impl OpQueue { #[cfg(test)] pub mod test { - use crate::{server::ENDPOINT_MESSAGES_QUEUE_SIZE, settings::matching_list::MatchingList}; - use super::*; - use hyperlane_core::{ HyperlaneDomain, HyperlaneDomainProtocol, HyperlaneDomainTechnicalStack, HyperlaneDomainType, HyperlaneMessage, KnownHyperlaneDomain, PendingOperationResult, @@ -170,7 +127,7 @@ pub mod test { str::FromStr, time::{Duration, Instant}, }; - use tokio::sync::{self, mpsc}; + use tokio::sync; #[derive(Debug, Clone, Serialize)] pub struct MockPendingOperation { @@ -363,7 +320,6 @@ pub mod test { async fn test_multiple_op_queues_message_id() { let (metrics, queue_metrics_label) = dummy_metrics_and_label(); let broadcaster = sync::broadcast::Sender::new(100); - let mut op_queue_1 = OpQueue::new( metrics.clone(), queue_metrics_label.clone(), @@ -408,22 +364,12 @@ pub mod test { .await; } - let (transmitter, _receiver) = mpsc::channel(ENDPOINT_MESSAGES_QUEUE_SIZE); - // Retry by message ids broadcaster - .send(MessageRetryRequest { - uuid: "0e92ace7-ba5d-4a1f-8501-51b6d9d500cf".to_string(), - pattern: MatchingList::with_message_id(op_ids[1]), - transmitter: transmitter.clone(), - }) + .send(MatchingList::with_message_id(op_ids[1])) .unwrap(); broadcaster - .send(MessageRetryRequest { - uuid: "59400966-e7fa-4fb9-9372-9a671d4392c3".to_string(), - pattern: MatchingList::with_message_id(op_ids[2]), - transmitter, - }) + .send(MatchingList::with_message_id(op_ids[2])) .unwrap(); // Pop elements from queue 1 @@ -453,7 +399,6 @@ pub mod test { async fn test_destination_domain() { let (metrics, queue_metrics_label) = dummy_metrics_and_label(); let broadcaster = sync::broadcast::Sender::new(100); - let mut op_queue = OpQueue::new( metrics.clone(), queue_metrics_label.clone(), @@ -480,15 +425,11 @@ pub mod test { .await; } - let (transmitter, _receiver) = mpsc::channel(ENDPOINT_MESSAGES_QUEUE_SIZE); - // Retry by domain broadcaster - .send(MessageRetryRequest { - uuid: "a5b39473-7cc5-48a1-8bed-565454ba1037".to_string(), - pattern: MatchingList::with_destination_domain(destination_domain_2.id()), - transmitter, - }) + .send(MatchingList::with_destination_domain( + destination_domain_2.id(), + )) .unwrap(); // Pop elements from queue diff --git a/rust/main/agents/relayer/src/msg/op_submitter.rs b/rust/main/agents/relayer/src/msg/op_submitter.rs index f35a991c45..c1e295a24a 100644 --- a/rust/main/agents/relayer/src/msg/op_submitter.rs +++ b/rust/main/agents/relayer/src/msg/op_submitter.rs @@ -32,7 +32,7 @@ use hyperlane_core::{ }; use crate::msg::pending_message::CONFIRM_DELAY; -use crate::server::MessageRetryRequest; +use crate::settings::matching_list::MatchingList; use super::op_queue::OpQueue; use super::op_queue::OperationPriorityQueue; @@ -105,7 +105,7 @@ impl SerialSubmitter { pub fn new( domain: HyperlaneDomain, rx: mpsc::UnboundedReceiver, - retry_op_transmitter: Sender, + retry_op_transmitter: Sender, metrics: SerialSubmitterMetrics, max_batch_size: u32, task_monitor: TaskMonitor, diff --git a/rust/main/agents/relayer/src/relayer.rs b/rust/main/agents/relayer/src/relayer.rs index 4c3d8d33bb..b1f013b6ae 100644 --- a/rust/main/agents/relayer/src/relayer.rs +++ b/rust/main/agents/relayer/src/relayer.rs @@ -318,11 +318,10 @@ impl BaseAgent for Relayer { })); tasks.push(console_server.instrument(info_span!("Tokio console server"))); } - let sender = BroadcastSender::new(ENDPOINT_MESSAGES_QUEUE_SIZE); + let sender = BroadcastSender::::new(ENDPOINT_MESSAGES_QUEUE_SIZE); // send channels by destination chain let mut send_channels = HashMap::with_capacity(self.destination_chains.len()); let mut prep_queues = HashMap::with_capacity(self.destination_chains.len()); - for (dest_domain, dest_conf) in &self.destination_chains { let (send_channel, receive_channel) = mpsc::unbounded_channel::(); send_channels.insert(dest_domain.id(), send_channel); @@ -386,7 +385,7 @@ impl BaseAgent for Relayer { ); } // run server - let custom_routes = relayer_server::Server::new(self.origin_chains.len()) + let custom_routes = relayer_server::Server::new() .with_op_retry(sender.clone()) .with_message_queue(prep_queues) .routes(); diff --git a/rust/main/agents/relayer/src/server/list_messages.rs b/rust/main/agents/relayer/src/server/list_messages.rs index f6c92ba088..e21f39a5df 100644 --- a/rust/main/agents/relayer/src/server/list_messages.rs +++ b/rust/main/agents/relayer/src/server/list_messages.rs @@ -97,7 +97,6 @@ mod tests { fn setup_test_server() -> (SocketAddr, OperationPriorityQueue) { let (metrics, queue_metrics_label) = dummy_metrics_and_label(); let broadcaster = sync::broadcast::Sender::new(100); - let op_queue = OpQueue::new( metrics.clone(), queue_metrics_label.clone(), diff --git a/rust/main/agents/relayer/src/server/message_retry.rs b/rust/main/agents/relayer/src/server/message_retry.rs index 255614b43e..6d160355a5 100644 --- a/rust/main/agents/relayer/src/server/message_retry.rs +++ b/rust/main/agents/relayer/src/server/message_retry.rs @@ -1,89 +1,32 @@ use crate::settings::matching_list::MatchingList; - use axum::{extract::State, routing, Json, Router}; - use derive_new::new; -use serde::{Deserialize, Serialize}; -use tokio::sync::{broadcast::Sender, mpsc}; +use tokio::sync::broadcast::Sender; const MESSAGE_RETRY_API_BASE: &str = "/message_retry"; -#[derive(Clone, Debug, new)] +#[derive(new, Clone)] pub struct MessageRetryApi { - retry_request_transmitter: Sender, - relayer_chains: usize, -} - -#[derive(Clone, Debug)] -pub struct MessageRetryRequest { - pub uuid: String, - pub pattern: MatchingList, - pub transmitter: mpsc::Sender, -} - -#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)] -pub struct MessageRetryResponse { - /// ID of the retry request - pub uuid: String, - /// how many pending operations were evaluated - pub evaluated: usize, - /// how many of the pending operations matched the retry request pattern - pub matched: u64, + tx: Sender, } async fn retry_message( - State(state): State, + State(tx): State>, Json(retry_req_payload): Json, -) -> Result, String> { - let uuid = uuid::Uuid::new_v4(); - let uuid_string = uuid.to_string(); - - tracing::debug!(uuid = uuid_string, "Sending message retry request"); - - // This channel is only created to service this single - // retry request so we're expecting a single response - // from each transmitter end, hence we are using a channel of size 1 - let (transmitter, mut receiver) = mpsc::channel(state.relayer_chains); - state - .retry_request_transmitter - .send(MessageRetryRequest { - uuid: uuid_string.clone(), - pattern: retry_req_payload, - transmitter, - }) - .map_err(|err| { - // Technically it's bad practice to print the error message to the user, but - // this endpoint is for debugging purposes only. - format!("Failed to send retry request to the queue: {}", err) - })?; - - let mut resp = MessageRetryResponse { - uuid: uuid_string, - evaluated: 0, - matched: 0, - }; - - // Wait for responses from relayer - tracing::debug!(uuid = resp.uuid, "Waiting for response from relayer"); - while let Some(relayer_resp) = receiver.recv().await { - tracing::debug!( - uuid = resp.uuid, - evaluated = resp.evaluated, - matched = resp.matched, - "Submitter response to retry request" - ); - resp.evaluated += relayer_resp.evaluated; - resp.matched += relayer_resp.matched; +) -> String { + match tx.send(retry_req_payload) { + Ok(_) => "Moved message(s) to the front of the queue".to_string(), + // Technically it's bad practice to print the error message to the user, but + // this endpoint is for debugging purposes only. + Err(err) => format!("Failed to send retry request to the queue: {}", err), } - - Ok(Json(resp)) } impl MessageRetryApi { pub fn router(&self) -> Router { Router::new() .route("/", routing::post(retry_message)) - .with_state(self.clone()) + .with_state(self.tx.clone()) } pub fn get_route(&self) -> (&'static str, Router) { @@ -98,21 +41,13 @@ mod tests { use super::*; use axum::http::StatusCode; use hyperlane_core::{HyperlaneMessage, QueueOperation}; - use serde::de::DeserializeOwned; use serde_json::json; use std::net::SocketAddr; use tokio::sync::broadcast::{Receiver, Sender}; - #[derive(Debug)] - struct TestServerSetup { - pub socket_address: SocketAddr, - pub retry_req_rx: Receiver, - } - - fn setup_test_server() -> TestServerSetup { - let broadcast_tx = Sender::new(ENDPOINT_MESSAGES_QUEUE_SIZE); - - let message_retry_api = MessageRetryApi::new(broadcast_tx.clone(), 10); + fn setup_test_server() -> (SocketAddr, Receiver) { + let broadcast_tx = Sender::::new(ENDPOINT_MESSAGES_QUEUE_SIZE); + let message_retry_api = MessageRetryApi::new(broadcast_tx.clone()); let (path, retry_router) = message_retry_api.get_route(); let app = Router::new().nest(path, retry_router); @@ -123,51 +58,12 @@ mod tests { let addr = server.local_addr(); tokio::spawn(server); - let retry_req_rx = broadcast_tx.subscribe(); - - TestServerSetup { - socket_address: addr, - retry_req_rx, - } - } - - async fn send_retry_responses_future( - mut retry_request_receiver: Receiver, - pending_operations: Vec, - metrics: Vec<(usize, u64)>, - ) { - if let Ok(req) = retry_request_receiver.recv().await { - for (op, (evaluated, matched)) in pending_operations.iter().zip(metrics) { - // Check that the list received by the server matches the pending operation - assert!(req.pattern.op_matches(&op)); - let resp = MessageRetryResponse { - uuid: req.uuid.clone(), - evaluated, - matched, - }; - req.transmitter.send(resp).await.unwrap(); - } - } + (addr, broadcast_tx.subscribe()) } - async fn parse_response_to_json(response: reqwest::Response) -> T { - let resp_body = response - .text() - .await - .expect("Failed to parse response body"); - let resp_json: T = - serde_json::from_str(&resp_body).expect("Failed to deserialize response body"); - resp_json - } - - #[tracing_test::traced_test] #[tokio::test] async fn test_message_id_retry() { - let TestServerSetup { - socket_address: addr, - retry_req_rx, - .. - } = setup_test_server(); + let (addr, mut rx) = setup_test_server(); let client = reqwest::Client::new(); // Create a random message with a random message ID @@ -179,37 +75,25 @@ mod tests { } ]); - // spawn a task to respond to message retry request - let respond_task = send_retry_responses_future( - retry_req_rx, - vec![Box::new(pending_operation.clone()) as QueueOperation], - vec![(1, 1)], - ); - // Send a POST request to the server let response = client .post(format!("http://{}{}", addr, MESSAGE_RETRY_API_BASE)) .json(&matching_list_body) // Set the request body - .send(); - - let (_t1, response_res) = tokio::join!(respond_task, response); + .send() + .await + .unwrap(); - let response = response_res.unwrap(); // Check that the response status code is OK assert_eq!(response.status(), StatusCode::OK); - let resp_json: MessageRetryResponse = parse_response_to_json(response).await; - assert_eq!(resp_json.evaluated, 1); - assert_eq!(resp_json.matched, 1); + let list = rx.try_recv().unwrap(); + // Check that the list received by the server matches the pending operation + assert!(list.op_matches(&(Box::new(pending_operation) as QueueOperation))); } #[tokio::test] async fn test_destination_domain_retry() { - let TestServerSetup { - socket_address: addr, - retry_req_rx, - .. - } = setup_test_server(); + let (addr, mut rx) = setup_test_server(); let client = reqwest::Client::new(); let message = HyperlaneMessage { @@ -224,37 +108,25 @@ mod tests { } ]); - // spawn a task to respond to message retry request - let respond_task = send_retry_responses_future( - retry_req_rx, - vec![Box::new(pending_operation.clone()) as QueueOperation], - vec![(1, 1)], - ); - // Send a POST request to the server let response = client .post(format!("http://{}{}", addr, MESSAGE_RETRY_API_BASE)) .json(&matching_list_body) // Set the request body - .send(); - - let (_t1, response_res) = tokio::join!(respond_task, response); + .send() + .await + .unwrap(); - let response = response_res.unwrap(); // Check that the response status code is OK assert_eq!(response.status(), StatusCode::OK); - let resp_json: MessageRetryResponse = parse_response_to_json(response).await; - assert_eq!(resp_json.evaluated, 1); - assert_eq!(resp_json.matched, 1); + let list = rx.try_recv().unwrap(); + // Check that the list received by the server matches the pending operation + assert!(list.op_matches(&(Box::new(pending_operation) as QueueOperation))); } #[tokio::test] async fn test_origin_domain_retry() { - let TestServerSetup { - socket_address: addr, - retry_req_rx, - .. - } = setup_test_server(); + let (addr, mut rx) = setup_test_server(); let client = reqwest::Client::new(); let message = HyperlaneMessage { @@ -269,37 +141,25 @@ mod tests { } ]); - // spawn a task to respond to message retry request - let respond_task = send_retry_responses_future( - retry_req_rx, - vec![Box::new(pending_operation.clone()) as QueueOperation], - vec![(1, 1)], - ); - // Send a POST request to the server let response = client .post(format!("http://{}{}", addr, MESSAGE_RETRY_API_BASE)) .json(&matching_list_body) // Set the request body - .send(); - - let (_t1, response_res) = tokio::join!(respond_task, response); + .send() + .await + .unwrap(); - let response = response_res.unwrap(); // Check that the response status code is OK assert_eq!(response.status(), StatusCode::OK); - let resp_json: MessageRetryResponse = parse_response_to_json(response).await; - assert_eq!(resp_json.evaluated, 1); - assert_eq!(resp_json.matched, 1); + let list = rx.try_recv().unwrap(); + // Check that the list received by the server matches the pending operation + assert!(list.op_matches(&(Box::new(pending_operation) as QueueOperation))); } #[tokio::test] async fn test_sender_address_retry() { - let TestServerSetup { - socket_address: addr, - retry_req_rx, - .. - } = setup_test_server(); + let (addr, mut rx) = setup_test_server(); let client = reqwest::Client::new(); let message = HyperlaneMessage::default(); @@ -310,37 +170,25 @@ mod tests { } ]); - // spawn a task to respond to message retry request - let respond_task = send_retry_responses_future( - retry_req_rx, - vec![Box::new(pending_operation.clone()) as QueueOperation], - vec![(1, 1)], - ); - // Send a POST request to the server let response = client .post(format!("http://{}{}", addr, MESSAGE_RETRY_API_BASE)) .json(&matching_list_body) // Set the request body - .send(); - - let (_t1, response_res) = tokio::join!(respond_task, response); + .send() + .await + .unwrap(); - let response = response_res.unwrap(); // Check that the response status code is OK assert_eq!(response.status(), StatusCode::OK); - let resp_json: MessageRetryResponse = parse_response_to_json(response).await; - assert_eq!(resp_json.evaluated, 1); - assert_eq!(resp_json.matched, 1); + let list = rx.try_recv().unwrap(); + // Check that the list received by the server matches the pending operation + assert!(list.op_matches(&(Box::new(pending_operation) as QueueOperation))); } #[tokio::test] async fn test_recipient_address_retry() { - let TestServerSetup { - socket_address: addr, - retry_req_rx, - .. - } = setup_test_server(); + let (addr, mut rx) = setup_test_server(); let client = reqwest::Client::new(); let message = HyperlaneMessage::default(); @@ -351,37 +199,25 @@ mod tests { } ]); - // spawn a task to respond to message retry request - let respond_task = send_retry_responses_future( - retry_req_rx, - vec![Box::new(pending_operation.clone()) as QueueOperation], - vec![(1, 1)], - ); - // Send a POST request to the server let response = client .post(format!("http://{}{}", addr, MESSAGE_RETRY_API_BASE)) .json(&matching_list_body) // Set the request body - .send(); - - let (_t1, response_res) = tokio::join!(respond_task, response); + .send() + .await + .unwrap(); - let response = response_res.unwrap(); // Check that the response status code is OK assert_eq!(response.status(), StatusCode::OK); - let resp_json: MessageRetryResponse = parse_response_to_json(response).await; - assert_eq!(resp_json.evaluated, 1); - assert_eq!(resp_json.matched, 1); + let list = rx.try_recv().unwrap(); + // Check that the list received by the server matches the pending operation + assert!(list.op_matches(&(Box::new(pending_operation) as QueueOperation))); } #[tokio::test] async fn test_multiple_retry() { - let TestServerSetup { - socket_address: addr, - retry_req_rx, - .. - } = setup_test_server(); + let (addr, mut rx) = setup_test_server(); let client = reqwest::Client::new(); let message = HyperlaneMessage { @@ -402,27 +238,19 @@ mod tests { } ]); - // spawn a task to respond to message retry request - let respond_task = send_retry_responses_future( - retry_req_rx, - vec![Box::new(pending_operation.clone()) as QueueOperation], - vec![(1, 1)], - ); - // Send a POST request to the server let response = client .post(format!("http://{}{}", addr, MESSAGE_RETRY_API_BASE)) .json(&matching_list_body) // Set the request body - .send(); - - let (_t1, response_res) = tokio::join!(respond_task, response); + .send() + .await + .unwrap(); - let response = response_res.unwrap(); // Check that the response status code is OK assert_eq!(response.status(), StatusCode::OK); - let resp_json: MessageRetryResponse = parse_response_to_json(response).await; - assert_eq!(resp_json.evaluated, 1); - assert_eq!(resp_json.matched, 1); + let list = rx.try_recv().unwrap(); + // Check that the list received by the server matches the pending operation + assert!(list.op_matches(&(Box::new(pending_operation) as QueueOperation))); } } diff --git a/rust/main/agents/relayer/src/server/mod.rs b/rust/main/agents/relayer/src/server/mod.rs index 8cc49a3f25..083f8d94d2 100644 --- a/rust/main/agents/relayer/src/server/mod.rs +++ b/rust/main/agents/relayer/src/server/mod.rs @@ -3,7 +3,7 @@ use derive_new::new; use std::collections::HashMap; use tokio::sync::broadcast::Sender; -use crate::msg::op_queue::OperationPriorityQueue; +use crate::{msg::op_queue::OperationPriorityQueue, settings::matching_list::MatchingList}; pub const ENDPOINT_MESSAGES_QUEUE_SIZE: usize = 100; @@ -15,15 +15,14 @@ mod message_retry; #[derive(new)] pub struct Server { - relayer_chains: usize, #[new(default)] - retry_transmitter: Option>, + retry_transmitter: Option>, #[new(default)] op_queues: Option>, } impl Server { - pub fn with_op_retry(mut self, transmitter: Sender) -> Self { + pub fn with_op_retry(mut self, transmitter: Sender) -> Self { self.retry_transmitter = Some(transmitter); self } @@ -37,8 +36,8 @@ impl Server { /// Can be extended with additional routes and feature flags to enable/disable individually. pub fn routes(self) -> Vec<(&'static str, Router)> { let mut routes = vec![]; - if let Some(tx) = self.retry_transmitter { - routes.push(MessageRetryApi::new(tx, self.relayer_chains).get_route()); + if let Some(retry_transmitter) = self.retry_transmitter { + routes.push(MessageRetryApi::new(retry_transmitter).get_route()); } if let Some(op_queues) = self.op_queues { routes.push(ListOperationsApi::new(op_queues).get_route()); diff --git a/rust/main/utils/run-locally/Cargo.toml b/rust/main/utils/run-locally/Cargo.toml index a994324687..9dedae9cea 100644 --- a/rust/main/utils/run-locally/Cargo.toml +++ b/rust/main/utils/run-locally/Cargo.toml @@ -14,7 +14,6 @@ hyperlane-cosmos = { path = "../../chains/hyperlane-cosmos" } toml_edit.workspace = true k256.workspace = true jobserver.workspace = true -reqwest.workspace = true ripemd.workspace = true sha2.workspace = true serde.workspace = true diff --git a/rust/main/utils/run-locally/src/main.rs b/rust/main/utils/run-locally/src/main.rs index 8855a08d86..4686c15446 100644 --- a/rust/main/utils/run-locally/src/main.rs +++ b/rust/main/utils/run-locally/src/main.rs @@ -51,7 +51,6 @@ mod invariants; mod logging; mod metrics; mod program; -mod server; mod solana; mod utils; @@ -484,10 +483,6 @@ fn main() -> ExitCode { // give things a chance to fully start. sleep(Duration::from_secs(10)); - // test retry request - let resp = server::run_retry_request().expect("Failed to process retry request"); - assert!(resp.matched > 0); - if !post_startup_invariants(&checkpoints_dirs) { log!("Failure: Post startup invariants are not met"); return report_test_result(true); diff --git a/rust/main/utils/run-locally/src/server.rs b/rust/main/utils/run-locally/src/server.rs deleted file mode 100644 index 4df7df78f0..0000000000 --- a/rust/main/utils/run-locally/src/server.rs +++ /dev/null @@ -1,55 +0,0 @@ -use std::io; - -use reqwest::Url; - -use relayer::server::MessageRetryResponse; - -use crate::RELAYER_METRICS_PORT; - -/// create tokio runtime to send a retry request to -/// relayer to retry all existing messages in the queues -pub fn run_retry_request() -> io::Result { - let runtime = tokio::runtime::Builder::new_current_thread() - .enable_all() - .build(); - runtime - .unwrap() - .block_on(async { call_retry_request().await }) -} - -/// sends a request to relayer to retry all existing messages -/// in the queues -async fn call_retry_request() -> io::Result { - let client = reqwest::Client::new(); - - let url = Url::parse(&format!( - "http://0.0.0.0:{RELAYER_METRICS_PORT}/message_retry" - )) - .map_err(|err| { - eprintln!("Failed to parse url: {err}"); - io::Error::new(io::ErrorKind::InvalidInput, err.to_string()) - })?; - - let body = vec![serde_json::json!({ - "message_id": "*" - })]; - let retry_response = client.post(url).json(&body).send().await.map_err(|err| { - eprintln!("Failed to send request: {err}"); - io::Error::new(io::ErrorKind::InvalidData, err.to_string()) - })?; - - let response_text = retry_response.text().await.map_err(|err| { - eprintln!("Failed to parse response body: {err}"); - io::Error::new(io::ErrorKind::InvalidData, err.to_string()) - })?; - - println!("Retry Request Response: {:?}", response_text); - - let response_json: MessageRetryResponse = - serde_json::from_str(&response_text).map_err(|err| { - eprintln!("Failed to parse response body to json: {err}"); - io::Error::new(io::ErrorKind::InvalidData, err.to_string()) - })?; - - Ok(response_json) -}