Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: set chat action as typing to wait task done #9

Merged
merged 1 commit into from
Jun 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ pub struct WorkerHeartbeatRequest {
pub disk_free_space_bytes: i64,
pub worker_secret: String,
pub performance: Option<i64>,
pub internet_connectivity: Option<bool>
pub internet_connectivity: Option<bool>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
Expand Down
2 changes: 1 addition & 1 deletion server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ serde = { version = "1.0.196", features = ["derive"] }
serde_json = "1.0.113"
teloxide = { version = "0.12.2", features = ["macros"] }
timeago = { version = "0.4.2", features = ["chrono"] }
tokio = { version = "1.36.0", features = ["macros", "rt-multi-thread", "process", "sync"] }
tokio = { version = "1.36.0", features = ["macros", "rt-multi-thread", "process", "sync", "time"] }
console = "0.15.8"
buildit-utils = { path = "../buildit-utils" }
jsonwebtoken = "9.2.0"
Expand Down
10 changes: 5 additions & 5 deletions server/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
use tracing::warn;

#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Copy, Clone, Serialize, Deserialize)]
pub enum JobSource {
/// Telegram user/group
Telegram(i64),
Expand All @@ -39,7 +39,7 @@ pub async fn pipeline_new(
github_pr: Option<u64>,
packages: &str,
archs: &str,
source: &JobSource,
source: JobSource,
) -> anyhow::Result<Pipeline> {
// sanitize archs arg
let mut archs: Vec<&str> = archs.split(',').collect();
Expand Down Expand Up @@ -135,7 +135,7 @@ pub async fn pipeline_new(
let creator_user_id = user.map(|user| user.id);
("telegram", github_pr, Some(id), creator_user_id)
}
JobSource::Github(id) => ("github", Some(*id), None, None),
JobSource::Github(id) => ("github", Some(id), None, None),
JobSource::Manual => ("manual", github_pr, None, None),
};
let new_pipeline = NewPipeline {
Expand All @@ -146,7 +146,7 @@ pub async fn pipeline_new(
creation_time: chrono::Utc::now(),
source: source.to_string(),
github_pr: github_pr.map(|pr| pr as i64),
telegram_user: telegram_user.copied(),
telegram_user: telegram_user,
creator_user_id: creator_user_id,
};
let pipeline = diesel::insert_into(pipelines::table)
Expand Down Expand Up @@ -218,7 +218,7 @@ pub async fn pipeline_new_pr(
pool: DbPool,
pr: u64,
archs: Option<&str>,
source: &JobSource,
source: JobSource,
) -> anyhow::Result<Pipeline> {
match octocrab::instance()
.pulls("AOSC-Dev", "aosc-os-abbs")
Expand Down
204 changes: 145 additions & 59 deletions server/src/bot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,22 @@ use rand::prelude::SliceRandom;
use rand::thread_rng;
use reqwest::ClientBuilder;
use serde::{Deserialize, Serialize};
use std::{borrow::Cow, fmt::Display};
use std::{
borrow::{Borrow, Cow},
fmt::Display,
future::Future,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
time::Duration,
};
use teloxide::{
prelude::*,
types::{ChatAction, ParseMode},
utils::command::BotCommands,
};
use tokio::time::sleep;
use tracing::{warn, Instrument};

#[derive(BotCommands, Clone, Debug)]
Expand Down Expand Up @@ -61,6 +71,36 @@ pub enum Command {
Roll,
}

async fn wait_with_send_typing<T, F: Future<Output = T>, B: Borrow<Bot>>(
f: F,
bot: B,
id: i64,
) -> T {
let is_done = Arc::new(AtomicBool::new(false));
let is_done_shared = is_done.clone();
let bot_shared: Bot = bot.borrow().clone();
tokio::spawn(async move {
loop {
if is_done_shared.load(Ordering::Relaxed) {
break;
}

bot_shared
.send_chat_action(ChatId(id), ChatAction::Typing)
.send()
.instrument(tracing::info_span!("send_chat_action"))
.await
.ok();

sleep(Duration::from_secs(5)).await;
}
});

let res = f.await;
is_done.store(true, Ordering::Relaxed);
res
}

fn handle_archs_args(archs: Vec<&str>) -> Vec<&str> {
let mut archs = archs;
if archs.contains(&"mainline") {
Expand Down Expand Up @@ -123,14 +163,18 @@ async fn pipeline_new_and_report(
archs: &str,
msg: &Message,
) -> ResponseResult<()> {
match pipeline_new(
pool,
git_branch,
None,
None,
packages,
archs,
&JobSource::Telegram(msg.chat.id.0),
match wait_with_send_typing(
pipeline_new(
pool,
git_branch,
None,
None,
packages,
archs,
JobSource::Telegram(msg.chat.id.0),
),
bot,
msg.chat.id.0,
)
.await
{
Expand All @@ -155,6 +199,7 @@ async fn pipeline_new_and_report(
.await?;
}
}

Ok(())
}

Expand Down Expand Up @@ -266,7 +311,13 @@ async fn create_pipeline_from_pr(
msg: &Message,
bot: &Bot,
) -> ResponseResult<()> {
match pipeline_new_pr(pool, pr_number, archs, &JobSource::Telegram(msg.chat.id.0)).await {
match wait_with_send_typing(
pipeline_new_pr(pool, pr_number, archs, JobSource::Telegram(msg.chat.id.0)),
bot,
msg.chat.id.0,
)
.await
{
Ok(pipeline) => {
bot.send_message(
msg.chat.id,
Expand Down Expand Up @@ -299,10 +350,6 @@ async fn create_pipeline_from_pr(

#[tracing::instrument(skip(bot, msg, pool))]
pub async fn answer(bot: Bot, msg: Message, cmd: Command, pool: DbPool) -> ResponseResult<()> {
bot.send_chat_action(msg.chat.id, ChatAction::Typing)
.send()
.instrument(tracing::info_span!("send_chat_action"))
.await?;
match cmd {
Command::Help => {
bot.send_message(msg.chat.id, Command::descriptions().to_string())
Expand Down Expand Up @@ -374,7 +421,7 @@ pub async fn answer(bot: Bot, msg: Message, cmd: Command, pool: DbPool) -> Respo
)
.await?;
}
Command::Status => match status(pool).await {
Command::Status => match wait_with_send_typing(status(pool), &bot, msg.chat.id.0).await {
Ok(status) => {
bot.send_message(msg.chat.id, status)
.parse_mode(ParseMode::MarkdownV2)
Expand Down Expand Up @@ -472,18 +519,22 @@ pub async fn answer(bot: Bot, msg: Message, cmd: Command, pool: DbPool) -> Respo
}
};

match buildit_utils::github::open_pr(
app_private_key,
&token,
id,
OpenPRRequest {
git_ref: parts[1].to_owned(),
abbs_path: ARGS.abbs_path.clone(),
packages: parts[2].to_owned(),
title: parts[0].to_string(),
tags: tags.clone(),
archs: archs.clone(),
},
match wait_with_send_typing(
buildit_utils::github::open_pr(
app_private_key,
&token,
id,
OpenPRRequest {
git_ref: parts[1].to_owned(),
abbs_path: ARGS.abbs_path.clone(),
packages: parts[2].to_owned(),
title: parts[0].to_string(),
tags: tags.clone(),
archs: archs.clone(),
},
),
&bot,
msg.chat.id.0,
)
.await
{
Expand Down Expand Up @@ -518,7 +569,8 @@ pub async fn answer(bot: Bot, msg: Message, cmd: Command, pool: DbPool) -> Respo
.await?;
return Ok(());
} else {
let resp = login_github(&msg, arguments).await;
let resp =
wait_with_send_typing(login_github(&msg, arguments), &bot, msg.chat.id.0).await;

match resp {
Ok(_) => bot.send_message(msg.chat.id, "Login successful!").await?,
Expand Down Expand Up @@ -551,7 +603,13 @@ pub async fn answer(bot: Bot, msg: Message, cmd: Command, pool: DbPool) -> Respo
};

// get topic of pr
match crab.pulls("AOSC-Dev", "aosc-os-abbs").get(pr_number).await {
match wait_with_send_typing(
crab.pulls("AOSC-Dev", "aosc-os-abbs").get(pr_number),
&bot,
msg.chat.id.0,
)
.await
{
Ok(pr) => match dickens::topic::report(
pr.head.ref_field.as_str(),
ARGS.local_repo.clone(),
Expand All @@ -560,10 +618,13 @@ pub async fn answer(bot: Bot, msg: Message, cmd: Command, pool: DbPool) -> Respo
{
Ok(report) => {
// post report as github comment
match crab
.issues("AOSC-Dev", "aosc-os-abbs")
.create_comment(pr_number, report)
.await
match wait_with_send_typing(
crab.issues("AOSC-Dev", "aosc-os-abbs")
.create_comment(pr_number, report),
&bot,
msg.chat.id.0,
)
.await
{
Ok(comment) => {
bot.send_message(
Expand Down Expand Up @@ -617,13 +678,18 @@ pub async fn answer(bot: Bot, msg: Message, cmd: Command, pool: DbPool) -> Respo
let arch = parts[0];
let ty = parts[1];
let client = reqwest::Client::new();
match client
.get(format!(
"https://aosc-packages.cth451.me/{}/{}/stable?type=json&page=all",
ty, arch
))
.send()
.await

match wait_with_send_typing(
client
.get(format!(
"https://aosc-packages.cth451.me/{}/{}/stable?type=json&page=all",
ty, arch
))
.send(),
&bot,
msg.chat.id.0,
)
.await
{
Ok(resp) => match resp.json::<QAResponse>().await {
Ok(qa) => {
Expand Down Expand Up @@ -668,22 +734,24 @@ pub async fn answer(bot: Bot, msg: Message, cmd: Command, pool: DbPool) -> Respo
.await?;
}
Command::Restart(arguments) => match str::parse::<i32>(&arguments) {
Ok(job_id) => match job_restart(pool, job_id).await {
Ok(new_job) => {
bot.send_message(
msg.chat.id,
truncate(&format!("Restarted as job #{}", new_job.id)),
)
.await?;
}
Err(err) => {
bot.send_message(
msg.chat.id,
truncate(&format!("Failed to restart job: {err:?}")),
)
.await?;
Ok(job_id) => {
match wait_with_send_typing(job_restart(pool, job_id), &bot, msg.chat.id.0).await {
Ok(new_job) => {
bot.send_message(
msg.chat.id,
truncate(&format!("Restarted as job #{}", new_job.id)),
)
.await?;
}
Err(err) => {
bot.send_message(
msg.chat.id,
truncate(&format!("Failed to restart job: {err:?}")),
)
.await?;
}
}
},
}
Err(err) => {
bot.send_message(msg.chat.id, truncate(&format!("Bad job ID: {err:?}")))
.await?;
Expand All @@ -708,7 +776,13 @@ pub async fn answer(bot: Bot, msg: Message, cmd: Command, pool: DbPool) -> Respo
}
};

let token = match get_github_token(&msg.chat.id, secret).await {
let token = match wait_with_send_typing(
get_github_token(&msg.chat.id, secret),
&bot,
msg.chat.id.0,
)
.await
{
Ok(s) => s.access_token,
Err(e) => {
bot.send_message(msg.chat.id, truncate(&format!("Got error: {e:?}")))
Expand All @@ -730,7 +804,13 @@ pub async fn answer(bot: Bot, msg: Message, cmd: Command, pool: DbPool) -> Respo
}
};

let user = match get_user(pool.clone(), msg.chat.id, token.clone()).await {
let user = match wait_with_send_typing(
get_user(pool.clone(), msg.chat.id, token.clone()),
&bot,
msg.chat.id.0,
)
.await
{
Ok(user) => user,
Err(err) => {
bot.send_message(
Expand All @@ -754,7 +834,13 @@ pub async fn answer(bot: Bot, msg: Message, cmd: Command, pool: DbPool) -> Respo
}
let coauthor = coauthor_parts.join(" ");

match find_update_and_update_checksum(&package, &ARGS.abbs_path, &coauthor).await {
match wait_with_send_typing(
find_update_and_update_checksum(&package, &ARGS.abbs_path, &coauthor),
&bot,
msg.chat.id.0,
)
.await
{
Ok(f) => {
match buildit_utils::github::open_pr(
app_private_key,
Expand Down Expand Up @@ -799,7 +885,7 @@ pub async fn answer(bot: Bot, msg: Message, cmd: Command, pool: DbPool) -> Respo
}
};
}
Command::Roll => match roll().await {
Command::Roll => match wait_with_send_typing(roll(), &bot, msg.chat.id.0).await {
Ok(pkgs) => {
let mut s = String::new();
for i in pkgs {
Expand Down
Loading
Loading