Skip to content

Commit

Permalink
PR feedback WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
dynco-nym committed Nov 13, 2024
1 parent ac0e840 commit ed66ef0
Show file tree
Hide file tree
Showing 12 changed files with 140 additions and 63 deletions.
1 change: 0 additions & 1 deletion common/models/src/ns_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,4 @@ pub struct TestrunAssignment {
pub struct SubmitResults {
pub message: String,
pub signature: Signature,
pub public_key: PublicKey,
}
1 change: 0 additions & 1 deletion nym-node-status-agent/src/cli/generate_keypair.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use std::{fs::File, io::Write, path::Path};

use tracing::info;

pub(crate) fn generate_key_pair(path: impl AsRef<Path>) -> anyhow::Result<()> {
Expand Down
1 change: 0 additions & 1 deletion nym-node-status-agent/src/cli/run_probe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,5 @@ fn sign_message(key: PrivateKey, probe_outcome: String) -> SubmitResults {
SubmitResults {
message: probe_outcome,
signature,
public_key: key.public_key(),
}
}
2 changes: 2 additions & 0 deletions nym-node-status-api/migrations/001_assigned_agent.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
ALTER TABLE testruns
ADD COLUMN assigned_agent VARCHAR;
27 changes: 3 additions & 24 deletions nym-node-status-api/src/cli/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use clap::Parser;
use nym_bin_common::bin_info;
use nym_crypto::asymmetric::ed25519::PublicKey;
use reqwest::Url;
use std::{sync::OnceLock, time::Duration};

Expand Down Expand Up @@ -71,29 +70,9 @@ pub(crate) struct Cli {
#[arg(value_parser = parse_duration)]
pub(crate) testruns_refresh_interval: Duration,

#[clap(long, env = "NODE_STATUS_API_AGENT_KEY_LIST")]
#[arg(value_parser = parse_key_list)]
agent_key_list: KeyList,
}

impl Cli {
pub(crate) fn agent_key_list(&self) -> &Vec<PublicKey> {
&self.agent_key_list.0
}
}

// We need a list of keys from clap. But if we define CLI argument as Vec<T>,
// clap interprets that as type T which can be given as a CLI argument multiple
// times (so all of them are stored in a Vec<T>). Thus we wrap Vec in a newtype
// pattern to have a list of keys and make clap happy.
#[derive(Debug, Clone)]
struct KeyList(Vec<PublicKey>);

fn parse_key_list(arg: &str) -> anyhow::Result<KeyList> {
arg.split(',')
.map(|value| PublicKey::from_base58_string(value.trim()).map_err(anyhow::Error::from))
.collect::<anyhow::Result<Vec<_>>>()
.map(KeyList)
#[clap(env = "NODE_STATUS_API_AGENT_KEY_LIST")]
#[arg(value_delimiter = ',')]
pub(crate) agent_key_list: Vec<String>,
}

fn parse_duration(arg: &str) -> Result<std::time::Duration, std::num::ParseIntError> {
Expand Down
10 changes: 10 additions & 0 deletions nym-node-status-api/src/db/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::{
http::{self, models::SummaryHistory},
monitor::NumericalCheckedCast,
};
use nym_crypto::asymmetric::ed25519::PublicKey;
use nym_node_requests::api::v1::node::models::NodeDescription;
use serde::{Deserialize, Serialize};
use strum_macros::{EnumString, FromRepr};
Expand Down Expand Up @@ -309,6 +310,15 @@ pub struct TestRunDto {
pub timestamp_utc: i64,
pub ip_address: String,
pub log: String,
pub assigned_agent: Option<String>,
}

impl TestRunDto {
pub(crate) fn assigned_agent_key(&self) -> Option<PublicKey> {
(&self.assigned_agent)
.as_ref()
.and_then(|value| PublicKey::from_base58_string(value).ok())
}
}

#[derive(Debug, Clone, strum_macros::Display, EnumString, FromRepr, PartialEq)]
Expand Down
62 changes: 58 additions & 4 deletions nym-node-status-api/src/db/queries/testruns.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::{
};
use anyhow::Context;
use chrono::Duration;
use nym_crypto::asymmetric::ed25519::PublicKey;
use sqlx::{pool::PoolConnection, Sqlite};

pub(crate) async fn get_in_progress_testrun_by_id(
Expand All @@ -20,7 +21,8 @@ pub(crate) async fn get_in_progress_testrun_by_id(
status as "status!",
timestamp_utc as "timestamp_utc!",
ip_address as "ip_address!",
log as "log!"
log as "log!",
assigned_agent
FROM testruns
WHERE
id = ?
Expand All @@ -35,6 +37,35 @@ pub(crate) async fn get_in_progress_testrun_by_id(
.context(format!("Couldn't retrieve testrun {testrun_id}"))
}

pub(crate) async fn get_testruns_assigned_to_agent(
conn: &mut PoolConnection<Sqlite>,
agent_key: PublicKey,
) -> anyhow::Result<TestRunDto> {
let agent_key = agent_key.to_base58_string();
sqlx::query_as!(
TestRunDto,
r#"SELECT
id as "id!",
gateway_id as "gateway_id!",
status as "status!",
timestamp_utc as "timestamp_utc!",
ip_address as "ip_address!",
log as "log!",
assigned_agent
FROM testruns
WHERE
assigned_agent = ?
AND
status = ?
ORDER BY timestamp_utc"#,
agent_key,
TestRunStatus::InProgress as i64,
)
.fetch_one(conn.as_mut())
.await
.context(format!("No testruns in progress for agent {agent_key}"))
}

pub(crate) async fn update_testruns_older_than(db: &DbPool, age: Duration) -> anyhow::Result<u64> {
let mut conn = db.acquire().await?;
let previous_run = now_utc() - age;
Expand All @@ -44,7 +75,8 @@ pub(crate) async fn update_testruns_older_than(db: &DbPool, age: Duration) -> an
r#"UPDATE
testruns
SET
status = ?
status = ?,
assigned_agent = NULL
WHERE
status = ?
AND
Expand All @@ -69,13 +101,17 @@ pub(crate) async fn update_testruns_older_than(db: &DbPool, age: Duration) -> an
Ok(stale_testruns)
}

pub(crate) async fn get_oldest_testrun_and_make_it_pending(
pub(crate) async fn assign_oldest_testrun(
conn: &mut PoolConnection<Sqlite>,
agent_key: PublicKey,
) -> anyhow::Result<Option<TestrunAssignment>> {
let agent_key = agent_key.to_base58_string();
// find & mark as "In progress" in the same transaction to avoid race conditions
let returning = sqlx::query!(
r#"UPDATE testruns
SET status = ?
SET
status = ?,
assigned_agent = ?
WHERE rowid =
(
SELECT rowid
Expand All @@ -89,6 +125,7 @@ pub(crate) async fn get_oldest_testrun_and_make_it_pending(
gateway_id
"#,
TestRunStatus::InProgress as i64,
agent_key,
TestRunStatus::Queued as i64,
)
.fetch_optional(conn.as_mut())
Expand Down Expand Up @@ -134,6 +171,23 @@ pub(crate) async fn update_testrun_status(
Ok(())
}

pub(crate) async fn update_testrun_agent_key(
conn: &mut PoolConnection<Sqlite>,
testrun_id: i64,
agent_key: PublicKey,
) -> anyhow::Result<()> {
let agent_key = agent_key.to_base58_string();
sqlx::query!(
"UPDATE testruns SET assigned_agent = ? WHERE id = ?",
agent_key,
testrun_id
)
.execute(conn.as_mut())
.await?;

Ok(())
}

pub(crate) async fn update_gateway_last_probe_log(
conn: &mut PoolConnection<Sqlite>,
gateway_pk: i64,
Expand Down
85 changes: 57 additions & 28 deletions nym-node-status-api/src/http/api/testruns.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ async fn request_testrun(
.await
.map_err(HttpError::internal_with_logging)?;

return match db::queries::testruns::get_oldest_testrun_and_make_it_pending(&mut conn).await {
return match db::queries::testruns::assign_oldest_testrun(&mut conn, agent_pubkey).await {
Ok(res) => {
if let Some(testrun) = res {
tracing::info!(
Expand All @@ -66,67 +66,96 @@ async fn request_testrun(

#[tracing::instrument(level = "debug", skip_all)]
async fn submit_testrun(
Path(testrun_id): Path<i64>,
Path(submitted_testrun_id): Path<i64>,
State(state): State<AppState>,
Json(probe_results): Json<SubmitResults>,
) -> HttpResult<StatusCode> {
let agent_pubkey = authenticate_agent(&probe_results.public_key.to_base58_string(), &state)?;
agent_pubkey
.verify(&probe_results.message, &probe_results.signature)
.map_err(|_| {
tracing::warn!("Message verification failed, rejecting");
HttpError::unauthorized()
})?;

let db = state.db_pool();
let mut conn = db
.acquire()
.await
.map_err(HttpError::internal_with_logging)?;

let testrun = queries::testruns::get_in_progress_testrun_by_id(&mut conn, testrun_id)
.await
.map_err(|e| {
tracing::error!("{e}");
HttpError::not_found(testrun_id)
let submitted_testrun =
queries::testruns::get_in_progress_testrun_by_id(&mut conn, submitted_testrun_id)
.await
.map_err(|e| {
tracing::warn!("testrun_id {} not found: {}", submitted_testrun_id, e);
HttpError::not_found(submitted_testrun_id)
})?;
let agent_pubkey = submitted_testrun
.assigned_agent_key()
.ok_or_else(|| HttpError::unauthorized())?;

let assigned_testrun =
queries::testruns::get_testruns_assigned_to_agent(&mut conn, agent_pubkey)
.await
.map_err(|err| {
tracing::warn!("{err}");
HttpError::invalid_input("Invalid testrun submitted")
})?;
if submitted_testrun_id != assigned_testrun.id {
tracing::warn!(
"Agent {} submitted testrun {} but {} was expected",
agent_pubkey,
submitted_testrun_id,
assigned_testrun.id
);
return Err(HttpError::invalid_input("Invalid testrun submitted"));
}

agent_pubkey
.verify(&probe_results.message, &probe_results.signature)
.map_err(|_| {
tracing::warn!("Message verification failed, rejecting");
HttpError::unauthorized()
})?;

let gw_identity = db::queries::select_gateway_identity(&mut conn, testrun.gateway_id)
let gw_identity = db::queries::select_gateway_identity(&mut conn, assigned_testrun.gateway_id)
.await
.map_err(|_| {
// should never happen:
HttpError::internal_with_logging("No gateway found for testrun")
HttpError::internal_with_logging(format!(
"No gateway found for testrun {submitted_testrun_id}"
))
})?;
tracing::debug!(
"Agent {} submitted testrun {} for gateway {} ({} bytes)",
agent_pubkey,
testrun_id,
submitted_testrun_id,
gw_identity,
&probe_results.message.len(),
);

// TODO dz this should be part of a single transaction: commit after everything is done
queries::testruns::update_testrun_status(&mut conn, testrun_id, TestRunStatus::Complete)
.await
.map_err(HttpError::internal_with_logging)?;
queries::testruns::update_testrun_status(
&mut conn,
submitted_testrun_id,
TestRunStatus::Complete,
)
.await
.map_err(HttpError::internal_with_logging)?;
queries::testruns::update_gateway_last_probe_log(
&mut conn,
testrun.gateway_id,
assigned_testrun.gateway_id,
&probe_results.message,
)
.await
.map_err(HttpError::internal_with_logging)?;
let result = get_result_from_log(&probe_results.message);
queries::testruns::update_gateway_last_probe_result(&mut conn, testrun.gateway_id, &result)
.await
.map_err(HttpError::internal_with_logging)?;
queries::testruns::update_gateway_score(&mut conn, testrun.gateway_id)
queries::testruns::update_gateway_last_probe_result(
&mut conn,
assigned_testrun.gateway_id,
&result,
)
.await
.map_err(HttpError::internal_with_logging)?;
queries::testruns::update_gateway_score(&mut conn, assigned_testrun.gateway_id)
.await
.map_err(HttpError::internal_with_logging)?;

tracing::info!(
"✅ Testrun row_id {} for gateway {} complete",
testrun.id,
assigned_testrun.id,
gw_identity
);

Expand Down
2 changes: 1 addition & 1 deletion nym-node-status-api/src/http/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ impl HttpError {

pub(crate) fn unauthorized() -> Self {
Self {
message: serde_json::json!({"message": "Make sure your public key si registered with NS API"}).to_string(),
message: serde_json::json!({"message": "Make sure your public key is registered with NS API"}).to_string(),
status: axum::http::StatusCode::UNAUTHORIZED,
}
}
Expand Down
7 changes: 6 additions & 1 deletion nym-node-status-api/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use clap::Parser;
use nym_crypto::asymmetric::ed25519::PublicKey;
use nym_task::signal::wait_for_signal;

mod cli;
Expand All @@ -14,7 +15,11 @@ async fn main() -> anyhow::Result<()> {

let args = cli::Cli::parse();

let agent_key_list = args.agent_key_list();
let agent_key_list = args
.agent_key_list
.iter()
.map(|value| PublicKey::from_base58_string(value).map_err(anyhow::Error::from))
.collect::<anyhow::Result<Vec<_>>>()?;
tracing::info!("Registered {} agent keys", agent_key_list.len());

let connection_url = args.database_url.clone();
Expand Down
2 changes: 1 addition & 1 deletion nym-node-status-api/src/testruns/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ async fn run(pool: &DbPool) -> anyhow::Result<()> {
testruns_created += 1;
}
}
tracing::debug!("{} testruns queued in total", testruns_created);
tracing::info!("{} testruns queued in total", testruns_created);

Ok(())
}
Expand Down
3 changes: 2 additions & 1 deletion nym-node-status-api/src/testruns/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ pub(crate) async fn try_queue_testrun(
status as "status!",
timestamp_utc as "timestamp_utc!",
ip_address as "ip_address!",
log as "log!"
log as "log!",
assigned_agent
FROM testruns
WHERE gateway_id = ? AND status != 2
ORDER BY id DESC
Expand Down

0 comments on commit ed66ef0

Please sign in to comment.