From ed66ef0ba4b0fa4baced814301b4f1715c6e5721 Mon Sep 17 00:00:00 2001 From: dynco-nym <173912580+dynco-nym@users.noreply.github.com> Date: Thu, 14 Nov 2024 00:25:06 +0100 Subject: [PATCH] PR feedback WIP --- common/models/src/ns_api.rs | 1 - .../src/cli/generate_keypair.rs | 1 - nym-node-status-agent/src/cli/run_probe.rs | 1 - .../migrations/001_assigned_agent.sql | 2 + nym-node-status-api/src/cli/mod.rs | 27 +----- nym-node-status-api/src/db/models.rs | 10 +++ .../src/db/queries/testruns.rs | 62 +++++++++++++- nym-node-status-api/src/http/api/testruns.rs | 85 +++++++++++++------ nym-node-status-api/src/http/error.rs | 2 +- nym-node-status-api/src/main.rs | 7 +- nym-node-status-api/src/testruns/mod.rs | 2 +- nym-node-status-api/src/testruns/queue.rs | 3 +- 12 files changed, 140 insertions(+), 63 deletions(-) create mode 100644 nym-node-status-api/migrations/001_assigned_agent.sql diff --git a/common/models/src/ns_api.rs b/common/models/src/ns_api.rs index 263628eb40b..f509f345743 100644 --- a/common/models/src/ns_api.rs +++ b/common/models/src/ns_api.rs @@ -11,5 +11,4 @@ pub struct TestrunAssignment { pub struct SubmitResults { pub message: String, pub signature: Signature, - pub public_key: PublicKey, } diff --git a/nym-node-status-agent/src/cli/generate_keypair.rs b/nym-node-status-agent/src/cli/generate_keypair.rs index ba86db838db..e89bf3e8d51 100644 --- a/nym-node-status-agent/src/cli/generate_keypair.rs +++ b/nym-node-status-agent/src/cli/generate_keypair.rs @@ -1,5 +1,4 @@ use std::{fs::File, io::Write, path::Path}; - use tracing::info; pub(crate) fn generate_key_pair(path: impl AsRef) -> anyhow::Result<()> { diff --git a/nym-node-status-agent/src/cli/run_probe.rs b/nym-node-status-agent/src/cli/run_probe.rs index 3c8ce13509f..aee1116f4ac 100644 --- a/nym-node-status-agent/src/cli/run_probe.rs +++ b/nym-node-status-agent/src/cli/run_probe.rs @@ -114,6 +114,5 @@ fn sign_message(key: PrivateKey, probe_outcome: String) -> SubmitResults { SubmitResults { message: probe_outcome, signature, - public_key: key.public_key(), } } diff --git a/nym-node-status-api/migrations/001_assigned_agent.sql b/nym-node-status-api/migrations/001_assigned_agent.sql new file mode 100644 index 00000000000..96ec895924e --- /dev/null +++ b/nym-node-status-api/migrations/001_assigned_agent.sql @@ -0,0 +1,2 @@ +ALTER TABLE testruns +ADD COLUMN assigned_agent VARCHAR; diff --git a/nym-node-status-api/src/cli/mod.rs b/nym-node-status-api/src/cli/mod.rs index 0ca2b9bf48e..fa3d3c12bfd 100644 --- a/nym-node-status-api/src/cli/mod.rs +++ b/nym-node-status-api/src/cli/mod.rs @@ -1,6 +1,5 @@ use clap::Parser; use nym_bin_common::bin_info; -use nym_crypto::asymmetric::ed25519::PublicKey; use reqwest::Url; use std::{sync::OnceLock, time::Duration}; @@ -71,29 +70,9 @@ pub(crate) struct Cli { #[arg(value_parser = parse_duration)] pub(crate) testruns_refresh_interval: Duration, - #[clap(long, env = "NODE_STATUS_API_AGENT_KEY_LIST")] - #[arg(value_parser = parse_key_list)] - agent_key_list: KeyList, -} - -impl Cli { - pub(crate) fn agent_key_list(&self) -> &Vec { - &self.agent_key_list.0 - } -} - -// We need a list of keys from clap. But if we define CLI argument as Vec, -// clap interprets that as type T which can be given as a CLI argument multiple -// times (so all of them are stored in a Vec). Thus we wrap Vec in a newtype -// pattern to have a list of keys and make clap happy. -#[derive(Debug, Clone)] -struct KeyList(Vec); - -fn parse_key_list(arg: &str) -> anyhow::Result { - arg.split(',') - .map(|value| PublicKey::from_base58_string(value.trim()).map_err(anyhow::Error::from)) - .collect::>>() - .map(KeyList) + #[clap(env = "NODE_STATUS_API_AGENT_KEY_LIST")] + #[arg(value_delimiter = ',')] + pub(crate) agent_key_list: Vec, } fn parse_duration(arg: &str) -> Result { diff --git a/nym-node-status-api/src/db/models.rs b/nym-node-status-api/src/db/models.rs index a5511787f9a..0a82b30a407 100644 --- a/nym-node-status-api/src/db/models.rs +++ b/nym-node-status-api/src/db/models.rs @@ -2,6 +2,7 @@ use crate::{ http::{self, models::SummaryHistory}, monitor::NumericalCheckedCast, }; +use nym_crypto::asymmetric::ed25519::PublicKey; use nym_node_requests::api::v1::node::models::NodeDescription; use serde::{Deserialize, Serialize}; use strum_macros::{EnumString, FromRepr}; @@ -309,6 +310,15 @@ pub struct TestRunDto { pub timestamp_utc: i64, pub ip_address: String, pub log: String, + pub assigned_agent: Option, +} + +impl TestRunDto { + pub(crate) fn assigned_agent_key(&self) -> Option { + (&self.assigned_agent) + .as_ref() + .and_then(|value| PublicKey::from_base58_string(value).ok()) + } } #[derive(Debug, Clone, strum_macros::Display, EnumString, FromRepr, PartialEq)] diff --git a/nym-node-status-api/src/db/queries/testruns.rs b/nym-node-status-api/src/db/queries/testruns.rs index 91a3a86b139..1ae81df9316 100644 --- a/nym-node-status-api/src/db/queries/testruns.rs +++ b/nym-node-status-api/src/db/queries/testruns.rs @@ -6,6 +6,7 @@ use crate::{ }; use anyhow::Context; use chrono::Duration; +use nym_crypto::asymmetric::ed25519::PublicKey; use sqlx::{pool::PoolConnection, Sqlite}; pub(crate) async fn get_in_progress_testrun_by_id( @@ -20,7 +21,8 @@ pub(crate) async fn get_in_progress_testrun_by_id( status as "status!", timestamp_utc as "timestamp_utc!", ip_address as "ip_address!", - log as "log!" + log as "log!", + assigned_agent FROM testruns WHERE id = ? @@ -35,6 +37,35 @@ pub(crate) async fn get_in_progress_testrun_by_id( .context(format!("Couldn't retrieve testrun {testrun_id}")) } +pub(crate) async fn get_testruns_assigned_to_agent( + conn: &mut PoolConnection, + agent_key: PublicKey, +) -> anyhow::Result { + let agent_key = agent_key.to_base58_string(); + sqlx::query_as!( + TestRunDto, + r#"SELECT + id as "id!", + gateway_id as "gateway_id!", + status as "status!", + timestamp_utc as "timestamp_utc!", + ip_address as "ip_address!", + log as "log!", + assigned_agent + FROM testruns + WHERE + assigned_agent = ? + AND + status = ? + ORDER BY timestamp_utc"#, + agent_key, + TestRunStatus::InProgress as i64, + ) + .fetch_one(conn.as_mut()) + .await + .context(format!("No testruns in progress for agent {agent_key}")) +} + pub(crate) async fn update_testruns_older_than(db: &DbPool, age: Duration) -> anyhow::Result { let mut conn = db.acquire().await?; let previous_run = now_utc() - age; @@ -44,7 +75,8 @@ pub(crate) async fn update_testruns_older_than(db: &DbPool, age: Duration) -> an r#"UPDATE testruns SET - status = ? + status = ?, + assigned_agent = NULL WHERE status = ? AND @@ -69,13 +101,17 @@ pub(crate) async fn update_testruns_older_than(db: &DbPool, age: Duration) -> an Ok(stale_testruns) } -pub(crate) async fn get_oldest_testrun_and_make_it_pending( +pub(crate) async fn assign_oldest_testrun( conn: &mut PoolConnection, + agent_key: PublicKey, ) -> anyhow::Result> { + let agent_key = agent_key.to_base58_string(); // find & mark as "In progress" in the same transaction to avoid race conditions let returning = sqlx::query!( r#"UPDATE testruns - SET status = ? + SET + status = ?, + assigned_agent = ? WHERE rowid = ( SELECT rowid @@ -89,6 +125,7 @@ pub(crate) async fn get_oldest_testrun_and_make_it_pending( gateway_id "#, TestRunStatus::InProgress as i64, + agent_key, TestRunStatus::Queued as i64, ) .fetch_optional(conn.as_mut()) @@ -134,6 +171,23 @@ pub(crate) async fn update_testrun_status( Ok(()) } +pub(crate) async fn update_testrun_agent_key( + conn: &mut PoolConnection, + testrun_id: i64, + agent_key: PublicKey, +) -> anyhow::Result<()> { + let agent_key = agent_key.to_base58_string(); + sqlx::query!( + "UPDATE testruns SET assigned_agent = ? WHERE id = ?", + agent_key, + testrun_id + ) + .execute(conn.as_mut()) + .await?; + + Ok(()) +} + pub(crate) async fn update_gateway_last_probe_log( conn: &mut PoolConnection, gateway_pk: i64, diff --git a/nym-node-status-api/src/http/api/testruns.rs b/nym-node-status-api/src/http/api/testruns.rs index 02074f4f3b2..7f0ba1440e1 100644 --- a/nym-node-status-api/src/http/api/testruns.rs +++ b/nym-node-status-api/src/http/api/testruns.rs @@ -45,7 +45,7 @@ async fn request_testrun( .await .map_err(HttpError::internal_with_logging)?; - return match db::queries::testruns::get_oldest_testrun_and_make_it_pending(&mut conn).await { + return match db::queries::testruns::assign_oldest_testrun(&mut conn, agent_pubkey).await { Ok(res) => { if let Some(testrun) = res { tracing::info!( @@ -66,67 +66,96 @@ async fn request_testrun( #[tracing::instrument(level = "debug", skip_all)] async fn submit_testrun( - Path(testrun_id): Path, + Path(submitted_testrun_id): Path, State(state): State, Json(probe_results): Json, ) -> HttpResult { - let agent_pubkey = authenticate_agent(&probe_results.public_key.to_base58_string(), &state)?; - agent_pubkey - .verify(&probe_results.message, &probe_results.signature) - .map_err(|_| { - tracing::warn!("Message verification failed, rejecting"); - HttpError::unauthorized() - })?; - let db = state.db_pool(); let mut conn = db .acquire() .await .map_err(HttpError::internal_with_logging)?; - let testrun = queries::testruns::get_in_progress_testrun_by_id(&mut conn, testrun_id) - .await - .map_err(|e| { - tracing::error!("{e}"); - HttpError::not_found(testrun_id) + let submitted_testrun = + queries::testruns::get_in_progress_testrun_by_id(&mut conn, submitted_testrun_id) + .await + .map_err(|e| { + tracing::warn!("testrun_id {} not found: {}", submitted_testrun_id, e); + HttpError::not_found(submitted_testrun_id) + })?; + let agent_pubkey = submitted_testrun + .assigned_agent_key() + .ok_or_else(|| HttpError::unauthorized())?; + + let assigned_testrun = + queries::testruns::get_testruns_assigned_to_agent(&mut conn, agent_pubkey) + .await + .map_err(|err| { + tracing::warn!("{err}"); + HttpError::invalid_input("Invalid testrun submitted") + })?; + if submitted_testrun_id != assigned_testrun.id { + tracing::warn!( + "Agent {} submitted testrun {} but {} was expected", + agent_pubkey, + submitted_testrun_id, + assigned_testrun.id + ); + return Err(HttpError::invalid_input("Invalid testrun submitted")); + } + + agent_pubkey + .verify(&probe_results.message, &probe_results.signature) + .map_err(|_| { + tracing::warn!("Message verification failed, rejecting"); + HttpError::unauthorized() })?; - let gw_identity = db::queries::select_gateway_identity(&mut conn, testrun.gateway_id) + let gw_identity = db::queries::select_gateway_identity(&mut conn, assigned_testrun.gateway_id) .await .map_err(|_| { // should never happen: - HttpError::internal_with_logging("No gateway found for testrun") + HttpError::internal_with_logging(format!( + "No gateway found for testrun {submitted_testrun_id}" + )) })?; tracing::debug!( "Agent {} submitted testrun {} for gateway {} ({} bytes)", agent_pubkey, - testrun_id, + submitted_testrun_id, gw_identity, &probe_results.message.len(), ); - // TODO dz this should be part of a single transaction: commit after everything is done - queries::testruns::update_testrun_status(&mut conn, testrun_id, TestRunStatus::Complete) - .await - .map_err(HttpError::internal_with_logging)?; + queries::testruns::update_testrun_status( + &mut conn, + submitted_testrun_id, + TestRunStatus::Complete, + ) + .await + .map_err(HttpError::internal_with_logging)?; queries::testruns::update_gateway_last_probe_log( &mut conn, - testrun.gateway_id, + assigned_testrun.gateway_id, &probe_results.message, ) .await .map_err(HttpError::internal_with_logging)?; let result = get_result_from_log(&probe_results.message); - queries::testruns::update_gateway_last_probe_result(&mut conn, testrun.gateway_id, &result) - .await - .map_err(HttpError::internal_with_logging)?; - queries::testruns::update_gateway_score(&mut conn, testrun.gateway_id) + queries::testruns::update_gateway_last_probe_result( + &mut conn, + assigned_testrun.gateway_id, + &result, + ) + .await + .map_err(HttpError::internal_with_logging)?; + queries::testruns::update_gateway_score(&mut conn, assigned_testrun.gateway_id) .await .map_err(HttpError::internal_with_logging)?; tracing::info!( "✅ Testrun row_id {} for gateway {} complete", - testrun.id, + assigned_testrun.id, gw_identity ); diff --git a/nym-node-status-api/src/http/error.rs b/nym-node-status-api/src/http/error.rs index 021c993827b..4ce1ca1ce1b 100644 --- a/nym-node-status-api/src/http/error.rs +++ b/nym-node-status-api/src/http/error.rs @@ -17,7 +17,7 @@ impl HttpError { pub(crate) fn unauthorized() -> Self { Self { - message: serde_json::json!({"message": "Make sure your public key si registered with NS API"}).to_string(), + message: serde_json::json!({"message": "Make sure your public key is registered with NS API"}).to_string(), status: axum::http::StatusCode::UNAUTHORIZED, } } diff --git a/nym-node-status-api/src/main.rs b/nym-node-status-api/src/main.rs index facb4485b92..02e25a9d013 100644 --- a/nym-node-status-api/src/main.rs +++ b/nym-node-status-api/src/main.rs @@ -1,4 +1,5 @@ use clap::Parser; +use nym_crypto::asymmetric::ed25519::PublicKey; use nym_task::signal::wait_for_signal; mod cli; @@ -14,7 +15,11 @@ async fn main() -> anyhow::Result<()> { let args = cli::Cli::parse(); - let agent_key_list = args.agent_key_list(); + let agent_key_list = args + .agent_key_list + .iter() + .map(|value| PublicKey::from_base58_string(value).map_err(anyhow::Error::from)) + .collect::>>()?; tracing::info!("Registered {} agent keys", agent_key_list.len()); let connection_url = args.database_url.clone(); diff --git a/nym-node-status-api/src/testruns/mod.rs b/nym-node-status-api/src/testruns/mod.rs index 7ac916fc8e5..522060850a9 100644 --- a/nym-node-status-api/src/testruns/mod.rs +++ b/nym-node-status-api/src/testruns/mod.rs @@ -71,7 +71,7 @@ async fn run(pool: &DbPool) -> anyhow::Result<()> { testruns_created += 1; } } - tracing::debug!("{} testruns queued in total", testruns_created); + tracing::info!("{} testruns queued in total", testruns_created); Ok(()) } diff --git a/nym-node-status-api/src/testruns/queue.rs b/nym-node-status-api/src/testruns/queue.rs index 88804fff1b1..ec9676ed70f 100644 --- a/nym-node-status-api/src/testruns/queue.rs +++ b/nym-node-status-api/src/testruns/queue.rs @@ -55,7 +55,8 @@ pub(crate) async fn try_queue_testrun( status as "status!", timestamp_utc as "timestamp_utc!", ip_address as "ip_address!", - log as "log!" + log as "log!", + assigned_agent FROM testruns WHERE gateway_id = ? AND status != 2 ORDER BY id DESC