diff --git a/common/src/lib.rs b/common/src/lib.rs index 1e528b2..0dcf94c 100644 --- a/common/src/lib.rs +++ b/common/src/lib.rs @@ -28,7 +28,7 @@ pub struct WorkerHeartbeatRequest { pub disk_free_space_bytes: i64, pub worker_secret: String, pub performance: Option, - pub internet_connectivity: Option + pub internet_connectivity: Option, } #[derive(Debug, Clone, Serialize, Deserialize)] diff --git a/server/Cargo.toml b/server/Cargo.toml index e316d33..7225692 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -18,7 +18,7 @@ serde = { version = "1.0.196", features = ["derive"] } serde_json = "1.0.113" teloxide = { version = "0.12.2", features = ["macros"] } timeago = { version = "0.4.2", features = ["chrono"] } -tokio = { version = "1.36.0", features = ["macros", "rt-multi-thread", "process", "sync"] } +tokio = { version = "1.36.0", features = ["macros", "rt-multi-thread", "process", "sync", "time"] } console = "0.15.8" buildit-utils = { path = "../buildit-utils" } jsonwebtoken = "9.2.0" diff --git a/server/src/api.rs b/server/src/api.rs index beca0ea..87f52a3 100644 --- a/server/src/api.rs +++ b/server/src/api.rs @@ -21,7 +21,7 @@ use serde::{Deserialize, Serialize}; use std::collections::BTreeMap; use tracing::warn; -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Copy, Clone, Serialize, Deserialize)] pub enum JobSource { /// Telegram user/group Telegram(i64), @@ -59,7 +59,7 @@ pub async fn pipeline_new( github_pr: Option, packages: &str, archs: &str, - source: &JobSource, + source: JobSource, skip_git_fetch: bool, ) -> anyhow::Result { // sanitize archs arg @@ -156,7 +156,7 @@ pub async fn pipeline_new( let creator_user_id = user.map(|user| user.id); ("telegram", github_pr, Some(id), creator_user_id) } - JobSource::Github(id) => ("github", Some(*id), None, None), + JobSource::Github(id) => ("github", Some(id), None, None), JobSource::Manual => ("manual", github_pr, None, None), }; let new_pipeline = NewPipeline { @@ -167,7 +167,7 @@ pub async fn pipeline_new( creation_time: chrono::Utc::now(), source: source.to_string(), github_pr: github_pr.map(|pr| pr as i64), - telegram_user: telegram_user.copied(), + telegram_user: telegram_user, creator_user_id: creator_user_id, }; let pipeline = diesel::insert_into(pipelines::table) @@ -240,7 +240,7 @@ pub async fn pipeline_new_pr( pool: DbPool, pr: u64, archs: Option<&str>, - source: &JobSource, + source: JobSource, ) -> anyhow::Result { match octocrab::instance() .pulls("AOSC-Dev", "aosc-os-abbs") diff --git a/server/src/bot.rs b/server/src/bot.rs index e9726ea..efa6dcf 100644 --- a/server/src/bot.rs +++ b/server/src/bot.rs @@ -13,12 +13,22 @@ use rand::prelude::SliceRandom; use rand::thread_rng; use reqwest::ClientBuilder; use serde::{Deserialize, Serialize}; -use std::{borrow::Cow, fmt::Display}; +use std::{ + borrow::{Borrow, Cow}, + fmt::Display, + future::Future, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, + time::Duration, +}; use teloxide::{ prelude::*, types::{ChatAction, ParseMode}, utils::command::BotCommands, }; +use tokio::time::sleep; use tracing::{warn, Instrument}; #[derive(BotCommands, Clone, Debug)] @@ -61,6 +71,36 @@ pub enum Command { Roll, } +async fn wait_with_send_typing, B: Borrow>( + f: F, + bot: B, + id: i64, +) -> T { + let is_done = Arc::new(AtomicBool::new(false)); + let is_done_shared = is_done.clone(); + let bot_shared: Bot = bot.borrow().clone(); + tokio::spawn(async move { + loop { + if is_done_shared.load(Ordering::Relaxed) { + break; + } + + bot_shared + .send_chat_action(ChatId(id), ChatAction::Typing) + .send() + .instrument(tracing::info_span!("send_chat_action")) + .await + .ok(); + + sleep(Duration::from_secs(5)).await; + } + }); + + let res = f.await; + is_done.store(true, Ordering::Relaxed); + res +} + fn handle_archs_args(archs: Vec<&str>) -> Vec<&str> { let mut archs = archs; if archs.contains(&"mainline") { @@ -123,15 +163,19 @@ async fn pipeline_new_and_report( archs: &str, msg: &Message, ) -> ResponseResult<()> { - match pipeline_new( - pool, - git_branch, - None, - None, - packages, - archs, - &JobSource::Telegram(msg.chat.id.0), - false, + match wait_with_send_typing( + pipeline_new( + pool, + git_branch, + None, + None, + packages, + archs, + JobSource::Telegram(msg.chat.id.0), + false, + ), + bot, + msg.chat.id.0, ) .await { @@ -156,6 +200,7 @@ async fn pipeline_new_and_report( .await?; } } + Ok(()) } @@ -267,7 +312,13 @@ async fn create_pipeline_from_pr( msg: &Message, bot: &Bot, ) -> ResponseResult<()> { - match pipeline_new_pr(pool, pr_number, archs, &JobSource::Telegram(msg.chat.id.0)).await { + match wait_with_send_typing( + pipeline_new_pr(pool, pr_number, archs, JobSource::Telegram(msg.chat.id.0)), + bot, + msg.chat.id.0, + ) + .await + { Ok(pipeline) => { bot.send_message( msg.chat.id, @@ -300,10 +351,6 @@ async fn create_pipeline_from_pr( #[tracing::instrument(skip(bot, msg, pool))] pub async fn answer(bot: Bot, msg: Message, cmd: Command, pool: DbPool) -> ResponseResult<()> { - bot.send_chat_action(msg.chat.id, ChatAction::Typing) - .send() - .instrument(tracing::info_span!("send_chat_action")) - .await?; match cmd { Command::Help => { bot.send_message(msg.chat.id, Command::descriptions().to_string()) @@ -375,7 +422,7 @@ pub async fn answer(bot: Bot, msg: Message, cmd: Command, pool: DbPool) -> Respo ) .await?; } - Command::Status => match status(pool).await { + Command::Status => match wait_with_send_typing(status(pool), &bot, msg.chat.id.0).await { Ok(status) => { bot.send_message(msg.chat.id, status) .parse_mode(ParseMode::MarkdownV2) @@ -473,18 +520,22 @@ pub async fn answer(bot: Bot, msg: Message, cmd: Command, pool: DbPool) -> Respo } }; - match buildit_utils::github::open_pr( - app_private_key, - &token, - id, - OpenPRRequest { - git_ref: parts[1].to_owned(), - abbs_path: ARGS.abbs_path.clone(), - packages: parts[2].to_owned(), - title: parts[0].to_string(), - tags: tags.clone(), - archs: archs.clone(), - }, + match wait_with_send_typing( + buildit_utils::github::open_pr( + app_private_key, + &token, + id, + OpenPRRequest { + git_ref: parts[1].to_owned(), + abbs_path: ARGS.abbs_path.clone(), + packages: parts[2].to_owned(), + title: parts[0].to_string(), + tags: tags.clone(), + archs: archs.clone(), + }, + ), + &bot, + msg.chat.id.0, ) .await { @@ -519,7 +570,8 @@ pub async fn answer(bot: Bot, msg: Message, cmd: Command, pool: DbPool) -> Respo .await?; return Ok(()); } else { - let resp = login_github(&msg, arguments).await; + let resp = + wait_with_send_typing(login_github(&msg, arguments), &bot, msg.chat.id.0).await; match resp { Ok(_) => bot.send_message(msg.chat.id, "Login successful!").await?, @@ -552,7 +604,13 @@ pub async fn answer(bot: Bot, msg: Message, cmd: Command, pool: DbPool) -> Respo }; // get topic of pr - match crab.pulls("AOSC-Dev", "aosc-os-abbs").get(pr_number).await { + match wait_with_send_typing( + crab.pulls("AOSC-Dev", "aosc-os-abbs").get(pr_number), + &bot, + msg.chat.id.0, + ) + .await + { Ok(pr) => match dickens::topic::report( pr.head.ref_field.as_str(), ARGS.local_repo.clone(), @@ -561,10 +619,13 @@ pub async fn answer(bot: Bot, msg: Message, cmd: Command, pool: DbPool) -> Respo { Ok(report) => { // post report as github comment - match crab - .issues("AOSC-Dev", "aosc-os-abbs") - .create_comment(pr_number, report) - .await + match wait_with_send_typing( + crab.issues("AOSC-Dev", "aosc-os-abbs") + .create_comment(pr_number, report), + &bot, + msg.chat.id.0, + ) + .await { Ok(comment) => { bot.send_message( @@ -618,13 +679,18 @@ pub async fn answer(bot: Bot, msg: Message, cmd: Command, pool: DbPool) -> Respo let arch = parts[0]; let ty = parts[1]; let client = reqwest::Client::new(); - match client - .get(format!( - "https://aosc-packages.cth451.me/{}/{}/stable?type=json&page=all", - ty, arch - )) - .send() - .await + + match wait_with_send_typing( + client + .get(format!( + "https://aosc-packages.cth451.me/{}/{}/stable?type=json&page=all", + ty, arch + )) + .send(), + &bot, + msg.chat.id.0, + ) + .await { Ok(resp) => match resp.json::().await { Ok(qa) => { @@ -669,22 +735,24 @@ pub async fn answer(bot: Bot, msg: Message, cmd: Command, pool: DbPool) -> Respo .await?; } Command::Restart(arguments) => match str::parse::(&arguments) { - Ok(job_id) => match job_restart(pool, job_id).await { - Ok(new_job) => { - bot.send_message( - msg.chat.id, - truncate(&format!("Restarted as job #{}", new_job.id)), - ) - .await?; - } - Err(err) => { - bot.send_message( - msg.chat.id, - truncate(&format!("Failed to restart job: {err:?}")), - ) - .await?; + Ok(job_id) => { + match wait_with_send_typing(job_restart(pool, job_id), &bot, msg.chat.id.0).await { + Ok(new_job) => { + bot.send_message( + msg.chat.id, + truncate(&format!("Restarted as job #{}", new_job.id)), + ) + .await?; + } + Err(err) => { + bot.send_message( + msg.chat.id, + truncate(&format!("Failed to restart job: {err:?}")), + ) + .await?; + } } - }, + } Err(err) => { bot.send_message(msg.chat.id, truncate(&format!("Bad job ID: {err:?}"))) .await?; @@ -709,7 +777,13 @@ pub async fn answer(bot: Bot, msg: Message, cmd: Command, pool: DbPool) -> Respo } }; - let token = match get_github_token(&msg.chat.id, secret).await { + let token = match wait_with_send_typing( + get_github_token(&msg.chat.id, secret), + &bot, + msg.chat.id.0, + ) + .await + { Ok(s) => s.access_token, Err(e) => { bot.send_message(msg.chat.id, truncate(&format!("Got error: {e:?}"))) @@ -731,7 +805,13 @@ pub async fn answer(bot: Bot, msg: Message, cmd: Command, pool: DbPool) -> Respo } }; - let user = match get_user(pool.clone(), msg.chat.id, token.clone()).await { + let user = match wait_with_send_typing( + get_user(pool.clone(), msg.chat.id, token.clone()), + &bot, + msg.chat.id.0, + ) + .await + { Ok(user) => user, Err(err) => { bot.send_message( @@ -755,7 +835,13 @@ pub async fn answer(bot: Bot, msg: Message, cmd: Command, pool: DbPool) -> Respo } let coauthor = coauthor_parts.join(" "); - match find_update_and_update_checksum(&package, &ARGS.abbs_path, &coauthor).await { + match wait_with_send_typing( + find_update_and_update_checksum(&package, &ARGS.abbs_path, &coauthor), + &bot, + msg.chat.id.0, + ) + .await + { Ok(f) => { match buildit_utils::github::open_pr( app_private_key, @@ -800,7 +886,7 @@ pub async fn answer(bot: Bot, msg: Message, cmd: Command, pool: DbPool) -> Respo } }; } - Command::Roll => match roll().await { + Command::Roll => match wait_with_send_typing(roll(), &bot, msg.chat.id.0).await { Ok(pkgs) => { let mut s = String::new(); for i in pkgs { diff --git a/server/src/routes/pipeline.rs b/server/src/routes/pipeline.rs index f851b4d..c08c1b2 100644 --- a/server/src/routes/pipeline.rs +++ b/server/src/routes/pipeline.rs @@ -36,7 +36,7 @@ pub async fn pipeline_new( None, &payload.packages, &payload.archs, - &JobSource::Manual, + JobSource::Manual, false, ) .await?; @@ -57,7 +57,7 @@ pub async fn pipeline_new_pr( pool, payload.pr, payload.archs.as_deref(), - &JobSource::Manual, + JobSource::Manual, ) .await?; Ok(Json(PipelineNewResponse { id: pipeline.id }))