From 35b2867adad9d9ecf3c6c76525d92f1c967ec44a Mon Sep 17 00:00:00 2001 From: Jiajie Chen Date: Sun, 10 Mar 2024 22:17:57 +0800 Subject: [PATCH] feat: reuse api in telegram bot code --- common/src/lib.rs | 2 +- server/src/api.rs | 27 ++-- server/src/bot.rs | 261 ++++++++-------------------------- server/src/formatter.rs | 9 +- server/src/github_webhooks.rs | 7 +- server/src/job.rs | 14 +- server/src/lib.rs | 6 + server/src/routes.rs | 20 ++- 8 files changed, 110 insertions(+), 236 deletions(-) diff --git a/common/src/lib.rs b/common/src/lib.rs index fd09cf2..f998166 100644 --- a/common/src/lib.rs +++ b/common/src/lib.rs @@ -37,7 +37,7 @@ pub enum JobSource { /// GitHub PR number Github(u64), /// Manual - Manual + Manual, } #[derive(Debug, Clone, Serialize, Deserialize)] diff --git a/server/src/api.rs b/server/src/api.rs index 38aad59..a318fe3 100644 --- a/server/src/api.rs +++ b/server/src/api.rs @@ -2,26 +2,24 @@ use crate::{ github::get_packages_from_pr, job::get_crab_github_installation, models::{NewJob, NewPipeline, Pipeline}, - ALL_ARCH, ARGS, + DbPool, ALL_ARCH, ARGS, }; use anyhow::anyhow; use anyhow::Context; use buildit_utils::github::{get_archs, update_abbs}; use common::JobSource; -use diesel::{ - r2d2::{ConnectionManager, Pool}, - PgConnection, RunQueryDsl, SelectableHelper, -}; +use diesel::{RunQueryDsl, SelectableHelper}; use tracing::warn; pub async fn pipeline_new( - pool: Pool>, + pool: DbPool, git_branch: &str, git_sha: Option<&str>, + github_pr: Option, packages: &str, archs: &str, source: &JobSource, -) -> anyhow::Result { +) -> anyhow::Result { // resolve branch name to commit hash if not specified let git_sha = match git_sha { Some(git_sha) => git_sha.to_string(), @@ -62,9 +60,9 @@ pub async fn pipeline_new( .context("Failed to get db connection from pool")?; use crate::schema::pipelines; let (source, github_pr, telegram_user) = match source { - JobSource::Telegram(id) => ("telegram", None, Some(id)), - JobSource::Github(id) => ("github", Some(id), None), - JobSource::Manual => ("manual", None, None), + JobSource::Telegram(id) => ("telegram", github_pr, Some(id)), + JobSource::Github(id) => ("github", Some(*id), None), + JobSource::Manual => ("manual", github_pr, None), }; let new_pipeline = NewPipeline { packages: packages.to_string(), @@ -73,7 +71,7 @@ pub async fn pipeline_new( git_sha: git_sha.clone(), creation_time: chrono::Utc::now(), source: source.to_string(), - github_pr: github_pr.map(|id| *id as i64), + github_pr: github_pr.map(|pr| pr as i64), telegram_user: telegram_user.map(|id| *id), }; let pipeline = diesel::insert_into(pipelines::table) @@ -132,14 +130,14 @@ pub async fn pipeline_new( .context("Failed to create job")?; } - Ok(pipeline.id) + Ok(pipeline) } pub async fn pipeline_new_pr( - pool: Pool>, + pool: DbPool, pr: u64, archs: Option<&str>, -) -> anyhow::Result { +) -> anyhow::Result { match octocrab::instance() .pulls("AOSC-Dev", "aosc-os-abbs") .get(pr) @@ -182,6 +180,7 @@ pub async fn pipeline_new_pr( pool, git_branch, Some(&git_sha), + Some(pr.number), &packages.join(","), &archs, &JobSource::Github(pr.number), diff --git a/server/src/bot.rs b/server/src/bot.rs index f8492ed..0aae872 100644 --- a/server/src/bot.rs +++ b/server/src/bot.rs @@ -1,12 +1,13 @@ use crate::{ + api::{pipeline_new, pipeline_new_pr}, formatter::{code_repr_string, to_html_new_job_summary}, - github::{get_github_token, get_packages_from_pr, login_github}, - job::{get_ready_message, send_build_request}, - ALL_ARCH, ARGS, WORKERS, + github::{get_github_token, login_github}, + job::get_ready_message, + DbPool, ALL_ARCH, ARGS, }; -use buildit_utils::github::{get_archs, update_abbs, OpenPRError, OpenPRRequest}; -use chrono::Local; -use common::{ensure_job_queue, JobSource}; +use buildit_utils::github::{get_archs, OpenPRError, OpenPRRequest}; + +use common::JobSource; use serde_json::Value; use std::borrow::Cow; @@ -15,7 +16,6 @@ use teloxide::{ types::{ChatAction, ParseMode}, utils::command::BotCommands, }; -use tokio::process; #[derive(BotCommands, Clone)] #[command( @@ -49,81 +49,6 @@ pub enum Command { Dickens(String), } -pub struct BuildRequest<'a> { - pub branch: &'a str, - pub packages: &'a [String], - pub archs: &'a [&'a str], - pub github_pr: Option, - pub sha: &'a str, -} - -async fn telegram_send_build_request( - bot: &Bot, - build_request: BuildRequest<'_>, - msg: &Message, - pool: deadpool_lapin::Pool, -) -> ResponseResult<()> { - let BuildRequest { - branch, - packages, - archs, - github_pr, - sha, - } = build_request; - - let archs = handle_archs_args(archs.to_vec()); - - let conn = match pool.get().await { - Ok(conn) => conn, - Err(err) => { - bot.send_message( - msg.chat.id, - format!("Failed to connect to message queue: {}", err), - ) - .await?; - return Ok(()); - } - }; - let channel = match conn.create_channel().await { - Ok(conn) => conn, - Err(err) => { - bot.send_message( - msg.chat.id, - format!("Failed to connect to create channel: {}", err), - ) - .await?; - return Ok(()); - } - }; - - match send_build_request( - branch, - packages, - &archs, - github_pr, - JobSource::Telegram(msg.chat.id.0), - sha, - &channel, - ) - .await - { - Ok(()) => { - bot.send_message( - msg.chat.id, - to_html_new_job_summary(branch, github_pr, &archs, packages), - ) - .parse_mode(ParseMode::Html) - .disable_web_page_preview(true) - .await?; - } - Err(err) => { - bot.send_message(msg.chat.id, format!("Failed to create job: {}", err)) - .await?; - } - } - Ok(()) -} - fn handle_archs_args(archs: Vec<&str>) -> Vec<&str> { let mut archs = archs; if archs.contains(&"mainline") { @@ -137,7 +62,9 @@ fn handle_archs_args(archs: Vec<&str>) -> Vec<&str> { archs } -async fn status(pool: deadpool_lapin::Pool) -> anyhow::Result { +async fn status(_pool: DbPool) -> anyhow::Result { + todo!() + /* let mut res = String::from("__*Queue Status*__\n\n"); let conn = pool.get().await?; let channel = conn.create_channel().await?; @@ -187,6 +114,7 @@ async fn status(pool: deadpool_lapin::Pool) -> anyhow::Result { } } Ok(res) + */ } pub async fn http_rabbitmq_api(api: &str, queue_name: String) -> anyhow::Result { @@ -202,12 +130,7 @@ pub async fn http_rabbitmq_api(api: &str, queue_name: String) -> anyhow::Result< Ok(res) } -pub async fn answer( - bot: Bot, - msg: Message, - cmd: Command, - pool: deadpool_lapin::Pool, -) -> ResponseResult<()> { +pub async fn answer(bot: Bot, msg: Message, cmd: Command, pool: DbPool) -> ResponseResult<()> { bot.send_chat_action(msg.chat.id, ChatAction::Typing) .await?; match cmd { @@ -249,93 +172,30 @@ pub async fn answer( } if parse_success { + let archs = if parts.len() == 1 { + None + } else { + Some(parts[1]) + }; for pr_number in pr_numbers { - match octocrab::instance() - .pulls("AOSC-Dev", "aosc-os-abbs") - .get(pr_number) - .await - { - Ok(pr) => { - // If the pull request has been merged, - // build and push packages based on stable - let (branch, sha) = if pr.merged_at.is_some() { - ( - "stable", - pr.merge_commit_sha - .as_ref() - .expect("merge_commit_sha should not be None"), - ) - } else { - (pr.head.ref_field.as_str(), &pr.head.sha) - }; - - if pr.head.repo.as_ref().and_then(|x| x.fork).unwrap_or(false) { - bot.send_message( - msg.chat.id, - "Failed to create job: Pull request is a fork", - ) - .await?; - return Ok(()); - } - - let path = &ARGS.abbs_path; - - if let Err(e) = update_abbs(branch, path).await { - bot.send_message(msg.chat.id, e.to_string()).await?; - } - - // find lines starting with #buildit - let packages = get_packages_from_pr(&pr); - if !packages.is_empty() { - let archs = if parts.len() == 1 { - let path = &ARGS.abbs_path; - - get_archs(path, &packages) - } else { - let archs = parts[1].split(',').collect(); - - for a in &archs { - if !ALL_ARCH.contains(a) && a != &"mainline" { - bot.send_message( - msg.chat.id, - format!("Architecture {a} is not supported."), - ) - .await?; - return Ok(()); - } - } - - archs - }; - - let build_request = BuildRequest { - branch, - packages: &packages, - archs: &archs, - github_pr: Some(pr_number), - sha, - }; - - telegram_send_build_request( - &bot, - build_request, - &msg, - pool.clone(), - ) - .await?; - } else { - bot.send_message(msg.chat.id, "Please list packages to build in pr info starting with '#buildit'.".to_string()) - .await?; - } - } - Err(err) => { - bot_send_message_handle_length( - &bot, - &msg, - &format!("Failed to get pr info: {err}."), + match pipeline_new_pr(pool.clone(), pr_number, archs).await { + Ok(pipeline) => { + bot.send_message( + msg.chat.id, + to_html_new_job_summary( + &pipeline.git_branch, + pipeline.github_pr.map(|n| n as u64), + &pipeline.archs.split(",").collect::>(), + &pipeline.packages.split(",").collect::>(), + ), ) + .parse_mode(ParseMode::Html) + .disable_web_page_preview(true) .await?; } + Err(err) => { + bot.send_message(msg.chat.id, format!("{err}")).await?; + } } } } @@ -343,33 +203,38 @@ pub async fn answer( Command::Build(arguments) => { let parts: Vec<&str> = arguments.split(' ').collect(); if parts.len() == 3 { - let branch = parts[0]; - let packages: Vec = parts[1].split(',').map(str::to_string).collect(); - let archs: Vec<&str> = parts[2].split(',').collect(); - - // resolve branch name to commit hash - let path = &ARGS.abbs_path; - - if let Err(e) = update_abbs(branch, path).await { - bot.send_message(msg.chat.id, format!("Failed to update ABBS tree: {e}")) - .await?; - } else { - let output = process::Command::new("git") - .arg("rev-parse") - .arg("HEAD") - .current_dir(path) - .output() + let git_branch = parts[0]; + let packages = parts[1]; + let archs = parts[2]; + + match pipeline_new( + pool, + git_branch, + None, + None, + packages, + archs, + &JobSource::Telegram(msg.chat.id.0), + ) + .await + { + Ok(pipeline) => { + bot.send_message( + msg.chat.id, + to_html_new_job_summary( + &pipeline.git_branch, + pipeline.github_pr.map(|n| n as u64), + &pipeline.archs.split(",").collect::>(), + &pipeline.packages.split(",").collect::>(), + ), + ) + .parse_mode(ParseMode::Html) + .disable_web_page_preview(true) .await?; - let sha = String::from_utf8_lossy(&output.stdout).trim().to_string(); - - let build_request = BuildRequest { - branch, - packages: &packages, - archs: &archs, - github_pr: None, - sha: &sha, - }; - telegram_send_build_request(&bot, build_request, &msg, pool.clone()).await?; + } + Err(err) => { + bot.send_message(msg.chat.id, format!("{err}")).await?; + } } return Ok(()); } diff --git a/server/src/formatter.rs b/server/src/formatter.rs index c1aad18..2288bff 100644 --- a/server/src/formatter.rs +++ b/server/src/formatter.rs @@ -9,7 +9,7 @@ pub fn to_html_new_job_summary( git_ref: &str, github_pr: Option, archs: &[&str], - packages: &[String], + packages: &[&str], ) -> String { format!( r#"New Job Summary @@ -117,7 +117,7 @@ pub fn code_repr_string(s: &str) -> String { #[test] fn test_format_html_new_job_summary() { - let s = to_html_new_job_summary("fd-9.0.0", Some(4992), &["amd64"], &["fd".to_string()]); + let s = to_html_new_job_summary("fd-9.0.0", Some(4992), &["amd64"], &["fd"]); assert_eq!(s, "New Job Summary\n\nGit reference: fd-9.0.0\nGitHub PR: #4992\nArchitecture(s): amd64\nPackage(s): fd") } @@ -130,7 +130,7 @@ fn test_format_html_build_result() { let job = JobOk { job: Job { packages: vec!["fd".to_string()], - git_ref: "fd-9.0.0".to_string(), + branch: "fd-9.0.0".to_string(), sha: "12345".to_string(), arch: "amd64".to_owned(), source: JobSource::Telegram(484493567), @@ -138,7 +138,9 @@ fn test_format_html_build_result() { noarch: false, enqueue_time: chrono::Utc .from_utc_datetime(&chrono::NaiveDateTime::from_timestamp_opt(61, 0).unwrap()), + github_check_run_id: None, }, + success: true, successful_packages: vec!["fd".to_string()], failed_package: None, skipped_packages: vec![], @@ -149,7 +151,6 @@ fn test_format_html_build_result() { pid: 54355, }, elapsed: Duration::from_secs_f64(888.85), - git_commit: Some("34acef168fc5ec454d3825fc864964951b130b49".to_string()), pushpkg_success: true, }; diff --git a/server/src/github_webhooks.rs b/server/src/github_webhooks.rs index 9162b56..b7cd08b 100644 --- a/server/src/github_webhooks.rs +++ b/server/src/github_webhooks.rs @@ -209,7 +209,12 @@ async fn handle_webhook_comment( create_github_comment(&crab, retry, num, &e.to_string()).await; } - let s = to_html_new_job_summary(branch, Some(num), &archs, &packages); + let s = to_html_new_job_summary( + branch, + Some(num), + &archs, + &packages.iter().map(|s| s.as_str()).collect::>(), + ); match send_build_request( branch, diff --git a/server/src/job.rs b/server/src/job.rs index d1f1873..f8a232b 100644 --- a/server/src/job.rs +++ b/server/src/job.rs @@ -1,9 +1,8 @@ use crate::{ - bot::http_rabbitmq_api, formatter::{to_html_build_result, to_markdown_build_result, FAILED, SUCCESS}, - ARGS, + DbPool, ARGS, }; -use anyhow::anyhow; + use buildit_utils::LOONGARCH64; use buildit_utils::{AMD64, ARM64, LOONGSON3, MIPS64R6EL, NOARCH, PPC64EL, RISCV64}; use common::{ensure_job_queue, Job, JobError, JobOk, JobResult, JobSource}; @@ -21,7 +20,7 @@ use octocrab::{ models::{CheckRunId, InstallationId}, Octocrab, }; -use std::fmt::Write; + use std::time::Duration; use teloxide::{prelude::*, types::ParseMode}; @@ -364,9 +363,11 @@ async fn handle_success_message( } pub async fn get_ready_message( - pool: deadpool_lapin::Pool, - archs: &[&str], + _pool: DbPool, + _archs: &[&str], ) -> anyhow::Result> { + todo!() + /* let mut res = vec![]; let conn = pool.get().await?; let channel = conn.create_channel().await?; @@ -419,6 +420,7 @@ pub async fn get_ready_message( } Ok(res) + */ } pub fn update_retry(retry: Option) -> HandleSuccessResult { diff --git a/server/src/lib.rs b/server/src/lib.rs index e78b5f5..63a4176 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -1,6 +1,10 @@ use chrono::{DateTime, Local}; use clap::Parser; use common::WorkerIdentifier; +use diesel::{ + r2d2::{ConnectionManager, Pool}, + PgConnection, +}; use once_cell::sync::Lazy; use std::{ collections::BTreeMap, @@ -19,6 +23,8 @@ pub mod models; pub mod routes; pub mod schema; +pub type DbPool = Pool>; + pub struct WorkerStatus { pub last_heartbeat: DateTime, pub git_commit: Option, diff --git a/server/src/routes.rs b/server/src/routes.rs index 5118b49..9152909 100644 --- a/server/src/routes.rs +++ b/server/src/routes.rs @@ -1,16 +1,11 @@ +use crate::{api, DbPool}; use axum::{ extract::{Json, State}, http::StatusCode, response::{IntoResponse, Response}, }; -use diesel::{ - r2d2::{ConnectionManager, Pool}, - PgConnection, -}; use serde::{Deserialize, Serialize}; -use crate::api; - pub async fn ping() -> &'static str { "PONG" } @@ -46,19 +41,20 @@ pub struct PipelineNewResponse { } pub async fn pipeline_new( - State(pool): State>>, + State(pool): State, Json(payload): Json, ) -> Result, AnyhowError> { - let pipeline_id = api::pipeline_new( + let pipeline = api::pipeline_new( pool, &payload.git_branch, None, + None, &payload.packages, &payload.archs, &common::JobSource::Manual, ) .await?; - Ok(Json(PipelineNewResponse { id: pipeline_id })) + Ok(Json(PipelineNewResponse { id: pipeline.id })) } #[derive(Deserialize)] @@ -68,10 +64,10 @@ pub struct PipelineNewPRRequest { } pub async fn pipeline_new_pr( - State(pool): State>>, + State(pool): State, Json(payload): Json, ) -> Result, AnyhowError> { - let pipeline_id = + let pipeline = api::pipeline_new_pr(pool, payload.pr, payload.archs.as_ref().map(|s| s.as_str())).await?; - Ok(Json(PipelineNewResponse { id: pipeline_id })) + Ok(Json(PipelineNewResponse { id: pipeline.id })) }