From c0ff81968ba51f93669f34fc88065eb63ad4ea3f Mon Sep 17 00:00:00 2001 From: Jiajie Chen Date: Mon, 11 Mar 2024 09:51:05 +0800 Subject: [PATCH] feat: add pipeline and worker status --- server/src/api.rs | 71 ++++++++++++++++++++++++++++++++++++++++++-- server/src/bot.rs | 65 +++++++++++++--------------------------- server/src/main.rs | 3 ++ server/src/models.rs | 3 +- server/src/routes.rs | 12 +++++++- 5 files changed, 105 insertions(+), 49 deletions(-) diff --git a/server/src/api.rs b/server/src/api.rs index a318fe3..984eae2 100644 --- a/server/src/api.rs +++ b/server/src/api.rs @@ -1,14 +1,17 @@ +use std::collections::BTreeMap; + use crate::{ github::get_packages_from_pr, job::get_crab_github_installation, - models::{NewJob, NewPipeline, Pipeline}, + models::{NewJob, NewPipeline, Pipeline, Worker}, DbPool, ALL_ARCH, ARGS, }; use anyhow::anyhow; use anyhow::Context; use buildit_utils::github::{get_archs, update_abbs}; use common::JobSource; -use diesel::{RunQueryDsl, SelectableHelper}; +use diesel::{dsl::count, ExpressionMethods, QueryDsl, RunQueryDsl, SelectableHelper}; +use serde::Serialize; use tracing::warn; pub async fn pipeline_new( @@ -197,3 +200,67 @@ pub async fn pipeline_new_pr( } } } + +#[derive(Serialize)] +pub struct PipelineStatus { + pub arch: String, + pub pending: u64, + pub running: u64, + pub available_servers: u64, +} + +pub async fn pipeline_status(pool: DbPool) -> anyhow::Result> { + let mut conn = pool + .get() + .context("Failed to get db connection from pool")?; + // find pending/running jobs + let pending: BTreeMap = crate::schema::jobs::dsl::jobs + .filter(crate::schema::jobs::dsl::status.eq("created")) + .group_by(crate::schema::jobs::dsl::arch) + .select(( + crate::schema::jobs::dsl::arch, + count(crate::schema::jobs::dsl::id), + )) + .load::<(String, i64)>(&mut conn)? + .into_iter() + .collect(); + let running: BTreeMap = crate::schema::jobs::dsl::jobs + .filter(crate::schema::jobs::dsl::status.eq("assigned")) + .group_by(crate::schema::jobs::dsl::arch) + .select(( + crate::schema::jobs::dsl::arch, + count(crate::schema::jobs::dsl::id), + )) + .load::<(String, i64)>(&mut conn)? + .into_iter() + .collect(); + + use crate::schema::workers::dsl::*; + let available_servers: BTreeMap = workers + .group_by(arch) + .select((arch, count(id))) + .load::<(String, i64)>(&mut conn)? + .into_iter() + .collect(); + + let mut res = vec![]; + for a in ALL_ARCH { + res.push(PipelineStatus { + arch: a.to_string(), + pending: *pending.get(*a).unwrap_or(&0) as u64, + running: *running.get(*a).unwrap_or(&0) as u64, + available_servers: *available_servers.get(*a).unwrap_or(&0) as u64, + }); + } + + Ok(res) +} + +pub async fn worker_status(pool: DbPool) -> anyhow::Result> { + let mut conn = pool + .get() + .context("Failed to get db connection from pool")?; + + let workers = crate::schema::workers::dsl::workers.load::(&mut conn)?; + Ok(workers) +} diff --git a/server/src/bot.rs b/server/src/bot.rs index 0aae872..4dc44be 100644 --- a/server/src/bot.rs +++ b/server/src/bot.rs @@ -1,5 +1,5 @@ use crate::{ - api::{pipeline_new, pipeline_new_pr}, + api::{pipeline_new, pipeline_new_pr, pipeline_status, worker_status}, formatter::{code_repr_string, to_html_new_job_summary}, github::{get_github_token, login_github}, job::get_ready_message, @@ -7,6 +7,7 @@ use crate::{ }; use buildit_utils::github::{get_archs, OpenPRError, OpenPRRequest}; +use chrono::Local; use common::JobSource; use serde_json::Value; @@ -62,59 +63,33 @@ fn handle_archs_args(archs: Vec<&str>) -> Vec<&str> { archs } -async fn status(_pool: DbPool) -> anyhow::Result { - todo!() - /* +async fn status(pool: DbPool) -> anyhow::Result { let mut res = String::from("__*Queue Status*__\n\n"); - let conn = pool.get().await?; - let channel = conn.create_channel().await?; - - for arch in ALL_ARCH { - let queue_name = format!("job-{}", arch); - - let queue = ensure_job_queue(&queue_name, &channel).await?; - - // read unacknowledged job count - let mut unacknowledged_str = String::new(); - if let Some(api) = &ARGS.rabbitmq_queue_api { - let res = http_rabbitmq_api(api, queue_name).await?; - if let Some(unacknowledged) = res - .as_object() - .and_then(|m| m.get("messages_unacknowledged")) - .and_then(|v| v.as_i64()) - { - unacknowledged_str = format!("{} job\\(s\\) running, ", unacknowledged); - } - } + + for status in pipeline_status(pool.clone()).await? { res += &format!( - "*{}*: {}{} jobs\\(s\\) pending, {} available server\\(s\\)\n", - teloxide::utils::markdown::escape(arch), - unacknowledged_str, - queue.message_count(), - queue.consumer_count() + "*{}*: {} jobs \\(s\\) pending, {} jobs\\(s\\) running, {} available server\\(s\\)\n", + teloxide::utils::markdown::escape(&status.arch), + status.pending, + status.running, + status.available_servers ); } res += "\n__*Server Status*__\n\n"; let fmt = timeago::Formatter::new(); - if let Ok(lock) = WORKERS.lock() { - for (identifier, status) in lock.iter() { - res += &teloxide::utils::markdown::escape(&format!( - "{} ({}{}, {} core(s), {} memory): Online as of {}\n", - identifier.hostname, - identifier.arch, - match &status.git_commit { - Some(git_commit) => format!(" {}", git_commit), - None => String::new(), - }, - status.logical_cores, - size::Size::from_bytes(status.memory_bytes), - fmt.convert_chrono(status.last_heartbeat, Local::now()) - )); - } + for status in worker_status(pool).await? { + res += &teloxide::utils::markdown::escape(&format!( + "{} ({} {}, {} core(s), {} memory): Online as of {}\n", + status.hostname, + status.arch, + status.git_commit, + status.logical_cores, + size::Size::from_bytes(status.memory_bytes), + fmt.convert_chrono(status.last_heartbeat_time, Local::now()) + )); } Ok(res) - */ } pub async fn http_rabbitmq_api(api: &str, queue_name: String) -> anyhow::Result { diff --git a/server/src/main.rs b/server/src/main.rs index 22d2fcb..db369e3 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -5,6 +5,7 @@ use diesel::r2d2::ConnectionManager; use diesel::r2d2::Pool; use server::routes::{ping, pipeline_new_pr, worker_job_update, worker_poll}; use server::routes::{pipeline_new, worker_heartbeat}; +use server::routes::{pipeline_status, worker_status}; use server::ARGS; use tower_http::services::{ServeDir, ServeFile}; @@ -26,9 +27,11 @@ async fn main() -> anyhow::Result<()> { .route("/api/ping", get(ping)) .route("/api/pipeline/new", post(pipeline_new)) .route("/api/pipeline/new_pr", post(pipeline_new_pr)) + .route("/api/pipeline/status", get(pipeline_status)) .route("/api/worker/heartbeat", post(worker_heartbeat)) .route("/api/worker/poll", post(worker_poll)) .route("/api/worker/job_update", post(worker_job_update)) + .route("/api/worker/status", get(worker_status)) .fallback_service(serve_dir) .with_state(pool) .layer(tower_http::trace::TraceLayer::new_for_http()); diff --git a/server/src/models.rs b/server/src/models.rs index f6e7570..58b26dc 100644 --- a/server/src/models.rs +++ b/server/src/models.rs @@ -1,4 +1,5 @@ use diesel::prelude::*; +use serde::Serialize; #[derive(Queryable, Selectable)] #[diesel(table_name = crate::schema::pipelines)] @@ -65,7 +66,7 @@ pub struct NewJob { pub github_check_run_id: Option, } -#[derive(Queryable, Selectable)] +#[derive(Queryable, Selectable, Serialize)] #[diesel(table_name = crate::schema::workers)] #[diesel(check_for_backend(diesel::pg::Pg))] pub struct Worker { diff --git a/server/src/routes.rs b/server/src/routes.rs index ebaf058..c82d96a 100644 --- a/server/src/routes.rs +++ b/server/src/routes.rs @@ -1,5 +1,5 @@ use crate::{ - api, + api::{self, PipelineStatus}, models::{Job, NewWorker, Pipeline, Worker}, DbPool, }; @@ -255,3 +255,13 @@ pub async fn worker_job_update( } Ok(()) } + +pub async fn pipeline_status( + State(pool): State, +) -> Result>, AnyhowError> { + Ok(Json(api::pipeline_status(pool).await?)) +} + +pub async fn worker_status(State(pool): State) -> Result>, AnyhowError> { + Ok(Json(api::worker_status(pool).await?)) +}