Skip to content

Commit

Permalink
Merge branch 'listen-task-bot-set-typing'
Browse files Browse the repository at this point in the history
  • Loading branch information
jiegec committed Jun 12, 2024
2 parents bbe81ae + e88442b commit 1181b3a
Show file tree
Hide file tree
Showing 5 changed files with 155 additions and 69 deletions.
2 changes: 1 addition & 1 deletion common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ pub struct WorkerHeartbeatRequest {
pub disk_free_space_bytes: i64,
pub worker_secret: String,
pub performance: Option<i64>,
pub internet_connectivity: Option<bool>
pub internet_connectivity: Option<bool>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
Expand Down
2 changes: 1 addition & 1 deletion server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ serde = { version = "1.0.196", features = ["derive"] }
serde_json = "1.0.113"
teloxide = { version = "0.12.2", features = ["macros"] }
timeago = { version = "0.4.2", features = ["chrono"] }
tokio = { version = "1.36.0", features = ["macros", "rt-multi-thread", "process", "sync"] }
tokio = { version = "1.36.0", features = ["macros", "rt-multi-thread", "process", "sync", "time"] }
console = "0.15.8"
buildit-utils = { path = "../buildit-utils" }
jsonwebtoken = "9.2.0"
Expand Down
10 changes: 5 additions & 5 deletions server/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
use tracing::warn;

#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Copy, Clone, Serialize, Deserialize)]
pub enum JobSource {
/// Telegram user/group
Telegram(i64),
Expand Down Expand Up @@ -59,7 +59,7 @@ pub async fn pipeline_new(
github_pr: Option<u64>,
packages: &str,
archs: &str,
source: &JobSource,
source: JobSource,
skip_git_fetch: bool,
) -> anyhow::Result<Pipeline> {
// sanitize archs arg
Expand Down Expand Up @@ -156,7 +156,7 @@ pub async fn pipeline_new(
let creator_user_id = user.map(|user| user.id);
("telegram", github_pr, Some(id), creator_user_id)
}
JobSource::Github(id) => ("github", Some(*id), None, None),
JobSource::Github(id) => ("github", Some(id), None, None),
JobSource::Manual => ("manual", github_pr, None, None),
};
let new_pipeline = NewPipeline {
Expand All @@ -167,7 +167,7 @@ pub async fn pipeline_new(
creation_time: chrono::Utc::now(),
source: source.to_string(),
github_pr: github_pr.map(|pr| pr as i64),
telegram_user: telegram_user.copied(),
telegram_user: telegram_user,
creator_user_id: creator_user_id,
};
let pipeline = diesel::insert_into(pipelines::table)
Expand Down Expand Up @@ -240,7 +240,7 @@ pub async fn pipeline_new_pr(
pool: DbPool,
pr: u64,
archs: Option<&str>,
source: &JobSource,
source: JobSource,
) -> anyhow::Result<Pipeline> {
match octocrab::instance()
.pulls("AOSC-Dev", "aosc-os-abbs")
Expand Down
206 changes: 146 additions & 60 deletions server/src/bot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,22 @@ use rand::prelude::SliceRandom;
use rand::thread_rng;
use reqwest::ClientBuilder;
use serde::{Deserialize, Serialize};
use std::{borrow::Cow, fmt::Display};
use std::{
borrow::{Borrow, Cow},
fmt::Display,
future::Future,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
time::Duration,
};
use teloxide::{
prelude::*,
types::{ChatAction, ParseMode},
utils::command::BotCommands,
};
use tokio::time::sleep;
use tracing::{warn, Instrument};

#[derive(BotCommands, Clone, Debug)]
Expand Down Expand Up @@ -61,6 +71,36 @@ pub enum Command {
Roll,
}

async fn wait_with_send_typing<T, F: Future<Output = T>, B: Borrow<Bot>>(
f: F,
bot: B,
id: i64,
) -> T {
let is_done = Arc::new(AtomicBool::new(false));
let is_done_shared = is_done.clone();
let bot_shared: Bot = bot.borrow().clone();
tokio::spawn(async move {
loop {
if is_done_shared.load(Ordering::Relaxed) {
break;
}

bot_shared
.send_chat_action(ChatId(id), ChatAction::Typing)
.send()
.instrument(tracing::info_span!("send_chat_action"))
.await
.ok();

sleep(Duration::from_secs(5)).await;
}
});

let res = f.await;
is_done.store(true, Ordering::Relaxed);
res
}

fn handle_archs_args(archs: Vec<&str>) -> Vec<&str> {
let mut archs = archs;
if archs.contains(&"mainline") {
Expand Down Expand Up @@ -123,15 +163,19 @@ async fn pipeline_new_and_report(
archs: &str,
msg: &Message,
) -> ResponseResult<()> {
match pipeline_new(
pool,
git_branch,
None,
None,
packages,
archs,
&JobSource::Telegram(msg.chat.id.0),
false,
match wait_with_send_typing(
pipeline_new(
pool,
git_branch,
None,
None,
packages,
archs,
JobSource::Telegram(msg.chat.id.0),
false,
),
bot,
msg.chat.id.0,
)
.await
{
Expand All @@ -156,6 +200,7 @@ async fn pipeline_new_and_report(
.await?;
}
}

Ok(())
}

Expand Down Expand Up @@ -267,7 +312,13 @@ async fn create_pipeline_from_pr(
msg: &Message,
bot: &Bot,
) -> ResponseResult<()> {
match pipeline_new_pr(pool, pr_number, archs, &JobSource::Telegram(msg.chat.id.0)).await {
match wait_with_send_typing(
pipeline_new_pr(pool, pr_number, archs, JobSource::Telegram(msg.chat.id.0)),
bot,
msg.chat.id.0,
)
.await
{
Ok(pipeline) => {
bot.send_message(
msg.chat.id,
Expand Down Expand Up @@ -300,10 +351,6 @@ async fn create_pipeline_from_pr(

#[tracing::instrument(skip(bot, msg, pool))]
pub async fn answer(bot: Bot, msg: Message, cmd: Command, pool: DbPool) -> ResponseResult<()> {
bot.send_chat_action(msg.chat.id, ChatAction::Typing)
.send()
.instrument(tracing::info_span!("send_chat_action"))
.await?;
match cmd {
Command::Help => {
bot.send_message(msg.chat.id, Command::descriptions().to_string())
Expand Down Expand Up @@ -375,7 +422,7 @@ pub async fn answer(bot: Bot, msg: Message, cmd: Command, pool: DbPool) -> Respo
)
.await?;
}
Command::Status => match status(pool).await {
Command::Status => match wait_with_send_typing(status(pool), &bot, msg.chat.id.0).await {
Ok(status) => {
bot.send_message(msg.chat.id, status)
.parse_mode(ParseMode::MarkdownV2)
Expand Down Expand Up @@ -473,18 +520,22 @@ pub async fn answer(bot: Bot, msg: Message, cmd: Command, pool: DbPool) -> Respo
}
};

match buildit_utils::github::open_pr(
app_private_key,
&token,
id,
OpenPRRequest {
git_ref: parts[1].to_owned(),
abbs_path: ARGS.abbs_path.clone(),
packages: parts[2].to_owned(),
title: parts[0].to_string(),
tags: tags.clone(),
archs: archs.clone(),
},
match wait_with_send_typing(
buildit_utils::github::open_pr(
app_private_key,
&token,
id,
OpenPRRequest {
git_ref: parts[1].to_owned(),
abbs_path: ARGS.abbs_path.clone(),
packages: parts[2].to_owned(),
title: parts[0].to_string(),
tags: tags.clone(),
archs: archs.clone(),
},
),
&bot,
msg.chat.id.0,
)
.await
{
Expand Down Expand Up @@ -519,7 +570,8 @@ pub async fn answer(bot: Bot, msg: Message, cmd: Command, pool: DbPool) -> Respo
.await?;
return Ok(());
} else {
let resp = login_github(&msg, arguments).await;
let resp =
wait_with_send_typing(login_github(&msg, arguments), &bot, msg.chat.id.0).await;

match resp {
Ok(_) => bot.send_message(msg.chat.id, "Login successful!").await?,
Expand Down Expand Up @@ -552,7 +604,13 @@ pub async fn answer(bot: Bot, msg: Message, cmd: Command, pool: DbPool) -> Respo
};

// get topic of pr
match crab.pulls("AOSC-Dev", "aosc-os-abbs").get(pr_number).await {
match wait_with_send_typing(
crab.pulls("AOSC-Dev", "aosc-os-abbs").get(pr_number),
&bot,
msg.chat.id.0,
)
.await
{
Ok(pr) => match dickens::topic::report(
pr.head.ref_field.as_str(),
ARGS.local_repo.clone(),
Expand All @@ -561,10 +619,13 @@ pub async fn answer(bot: Bot, msg: Message, cmd: Command, pool: DbPool) -> Respo
{
Ok(report) => {
// post report as github comment
match crab
.issues("AOSC-Dev", "aosc-os-abbs")
.create_comment(pr_number, report)
.await
match wait_with_send_typing(
crab.issues("AOSC-Dev", "aosc-os-abbs")
.create_comment(pr_number, report),
&bot,
msg.chat.id.0,
)
.await
{
Ok(comment) => {
bot.send_message(
Expand Down Expand Up @@ -618,13 +679,18 @@ pub async fn answer(bot: Bot, msg: Message, cmd: Command, pool: DbPool) -> Respo
let arch = parts[0];
let ty = parts[1];
let client = reqwest::Client::new();
match client
.get(format!(
"https://aosc-packages.cth451.me/{}/{}/stable?type=json&page=all",
ty, arch
))
.send()
.await

match wait_with_send_typing(
client
.get(format!(
"https://aosc-packages.cth451.me/{}/{}/stable?type=json&page=all",
ty, arch
))
.send(),
&bot,
msg.chat.id.0,
)
.await
{
Ok(resp) => match resp.json::<QAResponse>().await {
Ok(qa) => {
Expand Down Expand Up @@ -669,22 +735,24 @@ pub async fn answer(bot: Bot, msg: Message, cmd: Command, pool: DbPool) -> Respo
.await?;
}
Command::Restart(arguments) => match str::parse::<i32>(&arguments) {
Ok(job_id) => match job_restart(pool, job_id).await {
Ok(new_job) => {
bot.send_message(
msg.chat.id,
truncate(&format!("Restarted as job #{}", new_job.id)),
)
.await?;
}
Err(err) => {
bot.send_message(
msg.chat.id,
truncate(&format!("Failed to restart job: {err:?}")),
)
.await?;
Ok(job_id) => {
match wait_with_send_typing(job_restart(pool, job_id), &bot, msg.chat.id.0).await {
Ok(new_job) => {
bot.send_message(
msg.chat.id,
truncate(&format!("Restarted as job #{}", new_job.id)),
)
.await?;
}
Err(err) => {
bot.send_message(
msg.chat.id,
truncate(&format!("Failed to restart job: {err:?}")),
)
.await?;
}
}
},
}
Err(err) => {
bot.send_message(msg.chat.id, truncate(&format!("Bad job ID: {err:?}")))
.await?;
Expand All @@ -709,7 +777,13 @@ pub async fn answer(bot: Bot, msg: Message, cmd: Command, pool: DbPool) -> Respo
}
};

let token = match get_github_token(&msg.chat.id, secret).await {
let token = match wait_with_send_typing(
get_github_token(&msg.chat.id, secret),
&bot,
msg.chat.id.0,
)
.await
{
Ok(s) => s.access_token,
Err(e) => {
bot.send_message(msg.chat.id, truncate(&format!("Got error: {e:?}")))
Expand All @@ -731,7 +805,13 @@ pub async fn answer(bot: Bot, msg: Message, cmd: Command, pool: DbPool) -> Respo
}
};

let user = match get_user(pool.clone(), msg.chat.id, token.clone()).await {
let user = match wait_with_send_typing(
get_user(pool.clone(), msg.chat.id, token.clone()),
&bot,
msg.chat.id.0,
)
.await
{
Ok(user) => user,
Err(err) => {
bot.send_message(
Expand All @@ -755,7 +835,13 @@ pub async fn answer(bot: Bot, msg: Message, cmd: Command, pool: DbPool) -> Respo
}
let coauthor = coauthor_parts.join(" ");

match find_update_and_update_checksum(&package, &ARGS.abbs_path, &coauthor).await {
match wait_with_send_typing(
find_update_and_update_checksum(&package, &ARGS.abbs_path, &coauthor),
&bot,
msg.chat.id.0,
)
.await
{
Ok(f) => {
match buildit_utils::github::open_pr(
app_private_key,
Expand Down Expand Up @@ -800,7 +886,7 @@ pub async fn answer(bot: Bot, msg: Message, cmd: Command, pool: DbPool) -> Respo
}
};
}
Command::Roll => match roll().await {
Command::Roll => match wait_with_send_typing(roll(), &bot, msg.chat.id.0).await {
Ok(pkgs) => {
let mut s = String::new();
for i in pkgs {
Expand Down
Loading

0 comments on commit 1181b3a

Please sign in to comment.