feat: add pipeline and worker status
jiegec committed Mar 11, 2024
1 parent 9a5f667 commit c0ff819
Showing 5 changed files with 105 additions and 49 deletions.
71 changes: 69 additions & 2 deletions server/src/api.rs
@@ -1,14 +1,17 @@
use std::collections::BTreeMap;

use crate::{
github::get_packages_from_pr,
job::get_crab_github_installation,
models::{NewJob, NewPipeline, Pipeline},
models::{NewJob, NewPipeline, Pipeline, Worker},
DbPool, ALL_ARCH, ARGS,
};
use anyhow::anyhow;
use anyhow::Context;
use buildit_utils::github::{get_archs, update_abbs};
use common::JobSource;
use diesel::{RunQueryDsl, SelectableHelper};
use diesel::{dsl::count, ExpressionMethods, QueryDsl, RunQueryDsl, SelectableHelper};
use serde::Serialize;
use tracing::warn;

pub async fn pipeline_new(
@@ -197,3 +200,67 @@ pub async fn pipeline_new_pr(
}
}
}

#[derive(Serialize)]
pub struct PipelineStatus {
pub arch: String,
pub pending: u64,
pub running: u64,
pub available_servers: u64,
}

pub async fn pipeline_status(pool: DbPool) -> anyhow::Result<Vec<PipelineStatus>> {
let mut conn = pool
.get()
.context("Failed to get db connection from pool")?;
// find pending/running jobs
let pending: BTreeMap<String, i64> = crate::schema::jobs::dsl::jobs
.filter(crate::schema::jobs::dsl::status.eq("created"))
.group_by(crate::schema::jobs::dsl::arch)
.select((
crate::schema::jobs::dsl::arch,
count(crate::schema::jobs::dsl::id),
))
.load::<(String, i64)>(&mut conn)?
.into_iter()
.collect();
let running: BTreeMap<String, i64> = crate::schema::jobs::dsl::jobs
.filter(crate::schema::jobs::dsl::status.eq("assigned"))
.group_by(crate::schema::jobs::dsl::arch)
.select((
crate::schema::jobs::dsl::arch,
count(crate::schema::jobs::dsl::id),
))
.load::<(String, i64)>(&mut conn)?
.into_iter()
.collect();

use crate::schema::workers::dsl::*;
let available_servers: BTreeMap<String, i64> = workers
.group_by(arch)
.select((arch, count(id)))
.load::<(String, i64)>(&mut conn)?
.into_iter()
.collect();

let mut res = vec![];
for a in ALL_ARCH {
res.push(PipelineStatus {
arch: a.to_string(),
pending: *pending.get(*a).unwrap_or(&0) as u64,
running: *running.get(*a).unwrap_or(&0) as u64,
available_servers: *available_servers.get(*a).unwrap_or(&0) as u64,
});
}

Ok(res)
}

pub async fn worker_status(pool: DbPool) -> anyhow::Result<Vec<Worker>> {
let mut conn = pool
.get()
.context("Failed to get db connection from pool")?;

let workers = crate::schema::workers::dsl::workers.load::<Worker>(&mut conn)?;
Ok(workers)
}
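For reference, a minimal client-side sketch of consuming the new `GET /api/pipeline/status` endpoint. This is not code from the repository: the base URL is hypothetical, the dependencies (`reqwest` with the `json` feature, `tokio`, `anyhow`, `serde`) are assumptions, and the mirrored struct simply restates the `PipelineStatus` fields from the diff above.

```rust
use serde::Deserialize;

// Client-side mirror of the server's PipelineStatus; field names follow the
// struct added in api.rs above.
#[derive(Debug, Deserialize)]
struct PipelineStatus {
    arch: String,
    pending: u64,
    running: u64,
    available_servers: u64,
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Hypothetical base URL; point this at an actual BuildIt deployment.
    let statuses: Vec<PipelineStatus> =
        reqwest::get("http://localhost:3000/api/pipeline/status")
            .await?
            .json()
            .await?;

    for s in &statuses {
        println!(
            "{}: {} pending, {} running, {} server(s)",
            s.arch, s.pending, s.running, s.available_servers
        );
    }
    Ok(())
}
```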
65 changes: 20 additions & 45 deletions server/src/bot.rs
@@ -1,12 +1,13 @@
use crate::{
api::{pipeline_new, pipeline_new_pr},
api::{pipeline_new, pipeline_new_pr, pipeline_status, worker_status},
formatter::{code_repr_string, to_html_new_job_summary},
github::{get_github_token, login_github},
job::get_ready_message,
DbPool, ALL_ARCH, ARGS,
};
use buildit_utils::github::{get_archs, OpenPRError, OpenPRRequest};

use chrono::Local;
use common::JobSource;

use serde_json::Value;
@@ -62,59 +63,33 @@ fn handle_archs_args(archs: Vec<&str>) -> Vec<&str> {
archs
}

async fn status(_pool: DbPool) -> anyhow::Result<String> {
todo!()
/*
async fn status(pool: DbPool) -> anyhow::Result<String> {
let mut res = String::from("__*Queue Status*__\n\n");
let conn = pool.get().await?;
let channel = conn.create_channel().await?;
for arch in ALL_ARCH {
let queue_name = format!("job-{}", arch);
let queue = ensure_job_queue(&queue_name, &channel).await?;
// read unacknowledged job count
let mut unacknowledged_str = String::new();
if let Some(api) = &ARGS.rabbitmq_queue_api {
let res = http_rabbitmq_api(api, queue_name).await?;
if let Some(unacknowledged) = res
.as_object()
.and_then(|m| m.get("messages_unacknowledged"))
.and_then(|v| v.as_i64())
{
unacknowledged_str = format!("{} job\\(s\\) running, ", unacknowledged);
}
}

for status in pipeline_status(pool.clone()).await? {
res += &format!(
"*{}*: {}{} jobs\\(s\\) pending, {} available server\\(s\\)\n",
teloxide::utils::markdown::escape(arch),
unacknowledged_str,
queue.message_count(),
queue.consumer_count()
"*{}*: {} jobs \\(s\\) pending, {} jobs\\(s\\) running, {} available server\\(s\\)\n",
teloxide::utils::markdown::escape(&status.arch),
status.pending,
status.running,
status.available_servers
);
}

res += "\n__*Server Status*__\n\n";
let fmt = timeago::Formatter::new();
if let Ok(lock) = WORKERS.lock() {
for (identifier, status) in lock.iter() {
res += &teloxide::utils::markdown::escape(&format!(
"{} ({}{}, {} core(s), {} memory): Online as of {}\n",
identifier.hostname,
identifier.arch,
match &status.git_commit {
Some(git_commit) => format!(" {}", git_commit),
None => String::new(),
},
status.logical_cores,
size::Size::from_bytes(status.memory_bytes),
fmt.convert_chrono(status.last_heartbeat, Local::now())
));
}
for status in worker_status(pool).await? {
res += &teloxide::utils::markdown::escape(&format!(
"{} ({} {}, {} core(s), {} memory): Online as of {}\n",
status.hostname,
status.arch,
status.git_commit,
status.logical_cores,
size::Size::from_bytes(status.memory_bytes),
fmt.convert_chrono(status.last_heartbeat_time, Local::now())
));
}
Ok(res)
*/
}

pub async fn http_rabbitmq_api(api: &str, queue_name: String) -> anyhow::Result<Value> {
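Illustrative only: with the queue and worker data now read from the database, one reply from the reworked `status` command might render roughly like the constant below once Telegram interprets the MarkdownV2 escapes. The architectures, counts, hostname, commit hash, and timestamps are made up; only the layout follows the format strings above.

```rust
// Hypothetical rendered output of status(); every value here is invented for
// illustration, the line layout mirrors the format strings in bot.rs above.
const SAMPLE_STATUS_REPLY: &str = "\
Queue Status

amd64: 3 job(s) pending, 1 job(s) running, 2 available server(s)
arm64: 0 job(s) pending, 0 job(s) running, 1 available server(s)

Server Status

worker-amd64-01 (amd64 abcdef0, 16 core(s), 64 GiB memory): Online as of 2 minutes ago
";
```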
3 changes: 3 additions & 0 deletions server/src/main.rs
@@ -5,6 +5,7 @@ use diesel::r2d2::ConnectionManager;
use diesel::r2d2::Pool;
use server::routes::{ping, pipeline_new_pr, worker_job_update, worker_poll};
use server::routes::{pipeline_new, worker_heartbeat};
use server::routes::{pipeline_status, worker_status};
use server::ARGS;
use tower_http::services::{ServeDir, ServeFile};

@@ -26,9 +27,11 @@ async fn main() -> anyhow::Result<()> {
.route("/api/ping", get(ping))
.route("/api/pipeline/new", post(pipeline_new))
.route("/api/pipeline/new_pr", post(pipeline_new_pr))
.route("/api/pipeline/status", get(pipeline_status))
.route("/api/worker/heartbeat", post(worker_heartbeat))
.route("/api/worker/poll", post(worker_poll))
.route("/api/worker/job_update", post(worker_job_update))
.route("/api/worker/status", get(worker_status))
.fallback_service(serve_dir)
.with_state(pool)
.layer(tower_http::trace::TraceLayer::new_for_http());
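Because both new routes are plain `get` handlers that only need the shared `DbPool` state, they can be smoke-tested against the assembled router with tower's `oneshot` helper. This is a sketch, not repository code: `build_test_app` and `test_pool` are hypothetical placeholders for the router construction in `main.rs` and a pool pointing at a test database.

```rust
use axum::{
    body::Body,
    http::{Request, StatusCode},
};
use tower::ServiceExt; // brings `oneshot` into scope

// Hypothetical smoke test; build_test_app()/test_pool() stand in for the
// router construction in main.rs and a database pool for a test database.
#[tokio::test]
async fn pipeline_status_route_responds() {
    let app = build_test_app(test_pool());

    let response = app
        .oneshot(
            Request::builder()
                .uri("/api/pipeline/status")
                .body(Body::empty())
                .unwrap(),
        )
        .await
        .unwrap();

    assert_eq!(response.status(), StatusCode::OK);
}
```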
3 changes: 2 additions & 1 deletion server/src/models.rs
@@ -1,4 +1,5 @@
use diesel::prelude::*;
use serde::Serialize;

#[derive(Queryable, Selectable)]
#[diesel(table_name = crate::schema::pipelines)]
@@ -65,7 +66,7 @@ pub struct NewJob {
pub github_check_run_id: Option<i64>,
}

#[derive(Queryable, Selectable)]
#[derive(Queryable, Selectable, Serialize)]
#[diesel(table_name = crate::schema::workers)]
#[diesel(check_for_backend(diesel::pg::Pg))]
pub struct Worker {
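Deriving `Serialize` is what lets the new `/api/worker/status` handler return `Json<Vec<Worker>>`. A rough sketch of the resulting JSON shape follows; the field names are inferred from how `Worker` is used in bot.rs above, the values are invented, and the real schema may define more columns.

```rust
use serde_json::json;

fn main() {
    // Illustrative only: approximately the JSON one serialized Worker row
    // produces once `Serialize` is derived. Field names are inferred from
    // bot.rs above; values are made up.
    let example_worker = json!({
        "hostname": "worker-amd64-01",
        "arch": "amd64",
        "git_commit": "abcdef0",
        "logical_cores": 16,
        "memory_bytes": 68719476736_i64,
        "last_heartbeat_time": "2024-03-11T12:00:00+08:00"
    });
    println!("{}", serde_json::to_string_pretty(&example_worker).unwrap());
}
```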
12 changes: 11 additions & 1 deletion server/src/routes.rs
@@ -1,5 +1,5 @@
use crate::{
api,
api::{self, PipelineStatus},
models::{Job, NewWorker, Pipeline, Worker},
DbPool,
};
@@ -255,3 +255,13 @@ pub async fn worker_job_update(
}
Ok(())
}

pub async fn pipeline_status(
State(pool): State<DbPool>,
) -> Result<Json<Vec<PipelineStatus>>, AnyhowError> {
Ok(Json(api::pipeline_status(pool).await?))
}

pub async fn worker_status(State(pool): State<DbPool>) -> Result<Json<Vec<Worker>>, AnyhowError> {
Ok(Json(api::worker_status(pool).await?))
}
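Both handlers lean on the crate's `AnyhowError` wrapper so that `?` can propagate any `anyhow::Error` out of an axum handler. Its definition is not part of this diff; a typical adapter of this kind, in the spirit of axum's "anyhow error response" example, looks roughly like the sketch below and may differ from the repository's actual implementation.

```rust
use axum::{
    http::StatusCode,
    response::{IntoResponse, Response},
};

// Sketch of an anyhow-to-axum adapter; the AnyhowError used by routes.rs
// lives elsewhere in this crate and may differ in detail.
pub struct AnyhowError(anyhow::Error);

impl IntoResponse for AnyhowError {
    fn into_response(self) -> Response {
        (
            StatusCode::INTERNAL_SERVER_ERROR,
            format!("Internal error: {:?}", self.0),
        )
            .into_response()
    }
}

// Lets `?` convert anything that implements Into<anyhow::Error>.
impl<E> From<E> for AnyhowError
where
    E: Into<anyhow::Error>,
{
    fn from(err: E) -> Self {
        Self(err.into())
    }
}
```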
