Skip to content

Commit

Permalink
feat: allow server/worker to run without telegram/ssh key
Browse files Browse the repository at this point in the history
  • Loading branch information
jiegec committed Mar 11, 2024
1 parent 3a004ff commit 82a45b4
Show file tree
Hide file tree
Showing 6 changed files with 123 additions and 91 deletions.
3 changes: 2 additions & 1 deletion server/src/bot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,8 @@ pub async fn answer(bot: Bot, msg: Message, cmd: Command, pool: DbPool) -> Respo
let secret = match ARGS.github_secret.as_ref() {
Some(s) => s,
None => {
bot.send_message(msg.chat.id, "GITHUB_SECRET is not set").await?;
bot.send_message(msg.chat.id, "GITHUB_SECRET is not set")
.await?;
return Ok(());
}
};
Expand Down
43 changes: 27 additions & 16 deletions server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,23 +20,31 @@ async fn main() -> anyhow::Result<()> {
let manager = ConnectionManager::<PgConnection>::new(&ARGS.database_url);
let pool = Pool::builder().test_on_check_out(true).build(manager)?;

let bot = Bot::from_env();
let mut handles = vec![];
let bot = if std::env::var("TELOXIDE_TOKEN").is_ok() {
tracing::info!("Starting telegram bot");
let bot = Bot::from_env();

let handler =
Update::filter_message().branch(dptree::entry().filter_command::<Command>().endpoint(
|bot: Bot, pool: DbPool, msg: Message, cmd: Command| async move {
answer(bot, msg, cmd, pool).await
},
));
let handler =
Update::filter_message().branch(dptree::entry().filter_command::<Command>().endpoint(
|bot: Bot, pool: DbPool, msg: Message, cmd: Command| async move {
answer(bot, msg, cmd, pool).await
},
));

let mut telegram = Dispatcher::builder(bot.clone(), handler)
// Pass the shared state to the handler as a dependency.
.dependencies(dptree::deps![pool.clone()])
.enable_ctrlc_handler()
.build();
let mut telegram = Dispatcher::builder(bot.clone(), handler)
// Pass the shared state to the handler as a dependency.
.dependencies(dptree::deps![pool.clone()])
.enable_ctrlc_handler()
.build();

tracing::info!("Starting http server");
handles.push(tokio::spawn(async move { telegram.dispatch().await }));
Some(bot)
} else {
None
};

tracing::info!("Starting http server");
// build our application with a route
let serve_dir = ServeDir::new("frontend/dist")
.not_found_service(ServeFile::new("frontend/dist/index.html"));
Expand All @@ -54,11 +62,14 @@ async fn main() -> anyhow::Result<()> {
.with_state(state)
.layer(tower_http::trace::TraceLayer::new_for_http());

tracing::debug!("listening on 127.0.0.1:3000");
let listener = tokio::net::TcpListener::bind("127.0.0.1:3000").await?;
tokio::spawn(async { axum::serve(listener, app).await.unwrap() });
handles.push(tokio::spawn(async {
axum::serve(listener, app).await.unwrap()
}));

telegram.dispatch().await;
for handle in handles {
handle.await?;
}

Ok(())
}
72 changes: 41 additions & 31 deletions server/src/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ pub async fn ping() -> &'static str {
#[derive(Clone)]
pub struct AppState {
pub pool: DbPool,
pub bot: Bot,
pub bot: Option<Bot>,
}

// learned from https://github.com/tokio-rs/axum/blob/main/examples/anyhow-error-response/src/main.rs
Expand Down Expand Up @@ -254,7 +254,7 @@ pub async fn handle_success_message(
job: &Job,
pipeline: &Pipeline,
req: &WorkerJobUpdateRequest,
bot: &Bot,
bot: &Option<Bot>,
retry: Option<u8>,
) -> HandleSuccessResult {
match &req.result {
Expand All @@ -270,23 +270,28 @@ pub async fn handle_success_message(
let success = *build_success && *pushpkg_success;

if pipeline.source == "telegram" {
let s = to_html_build_result(
&pipeline,
&job,
&job_ok,
&req.hostname,
&req.arch,
success,
);

if let Err(e) = bot
.send_message(ChatId(pipeline.telegram_user.unwrap()), &s)
.parse_mode(ParseMode::Html)
.disable_web_page_preview(true)
.await
{
error!("{}", e);
return update_retry(retry);
if let Some(bot) = bot {
let s = to_html_build_result(
&pipeline,
&job,
&job_ok,
&req.hostname,
&req.arch,
success,
);

if let Err(e) = bot
.send_message(ChatId(pipeline.telegram_user.unwrap()), &s)
.parse_mode(ParseMode::Html)
.disable_web_page_preview(true)
.await
{
error!("{}", e);
return update_retry(retry);
}
} else {
error!("Telegram bot not configured");
return HandleSuccessResult::DoNotRetry;
}
}

Expand Down Expand Up @@ -465,18 +470,23 @@ pub async fn handle_success_message(
}
JobResult::Error(error) => {
if pipeline.source == "telegram" {
if let Err(e) = bot
.send_message(
ChatId(pipeline.telegram_user.unwrap()),
format!(
"{}({}) build packages: {:?} Got Error: {}",
req.hostname, job.arch, pipeline.packages, error
),
)
.await
{
error!("{e}");
return update_retry(retry);
if let Some(bot) = bot {
if let Err(e) = bot
.send_message(
ChatId(pipeline.telegram_user.unwrap()),
format!(
"{}({}) build packages: {:?} Got Error: {}",
req.hostname, job.arch, pipeline.packages, error
),
)
.await
{
error!("{e}");
return update_retry(retry);
}
} else {
error!("Telegram bot not configured");
return HandleSuccessResult::DoNotRetry;
}
} else if pipeline.source == "github" {
let crab = match octocrab::Octocrab::builder()
Expand Down
88 changes: 49 additions & 39 deletions worker/src/build.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::Args;
use chrono::Local;
use common::{JobOk, WorkerJobUpdateRequest, WorkerPollResponse};
use common::{JobOk, WorkerJobUpdateRequest, WorkerPollRequest, WorkerPollResponse};
use log::{error, info, warn};
use std::{
path::Path,
Expand Down Expand Up @@ -203,20 +203,22 @@ async fn build(
}

if failed_package.is_none() {
pushpkg_success = run_logged_with_retry(
"pushpkg",
&[
"--host",
&args.rsync_host,
"-i",
&args.upload_ssh_key,
"maintainers",
&job.git_branch,
],
&output_path,
&mut logs,
)
.await?;
if let Some(upload_ssh_key) = &args.upload_ssh_key {
pushpkg_success = run_logged_with_retry(
"pushpkg",
&[
"--host",
&args.rsync_host,
"-i",
upload_ssh_key,
"maintainers",
&job.git_branch,
],
&output_path,
&mut logs,
)
.await?;
}
}
}
}
Expand All @@ -232,31 +234,33 @@ async fn build(
let path = format!("/tmp/{file_name}");
fs::write(&path, logs).await?;

let output = Command::new("scp")
.args([
"-i",
&args.upload_ssh_key,
&path,
"[email protected]:/buildit/logs",
])
.output()
.await?;

let log_url = if output.status.success() {
fs::remove_file(path).await?;
Some(format!("https://buildit.aosc.io/logs/{file_name}"))
} else {
error!("scp return error code: {:?}", output.status.code());
error!("`scp' stdout: {}", String::from_utf8_lossy(&output.stdout));
error!("`scp' stderr: {}", String::from_utf8_lossy(&output.stderr));
let mut log_url = None;
if let Some(upload_ssh_key) = &args.upload_ssh_key {
let output = Command::new("scp")
.args([
"-i",
&upload_ssh_key,
&path,
"[email protected]:/buildit/logs",
])
.output()
.await?;
if output.status.success() {
fs::remove_file(&path).await?;
log_url = Some(format!("https://buildit.aosc.io/logs/{file_name}"));
} else {
error!("scp return error code: {:?}", output.status.code());
error!("`scp' stdout: {}", String::from_utf8_lossy(&output.stdout));
error!("`scp' stderr: {}", String::from_utf8_lossy(&output.stderr));
};
}

if log_url.is_none() {
let dir = Path::new("./push_failed_logs");
let to = dir.join(file_name);
fs::create_dir_all(dir).await?;
fs::copy(path, to).await?;

None
};
fs::copy(&path, to).await?;
}

let result = WorkerJobUpdateRequest {
hostname: gethostname::gethostname().to_string_lossy().to_string(),
Expand All @@ -283,9 +287,15 @@ async fn build_worker_inner(args: &Args) -> anyhow::Result<()> {
info!("Receiving new messages");

let client = reqwest::Client::new();
let req = WorkerPollRequest {
hostname: gethostname::gethostname().to_string_lossy().to_string(),
arch: args.arch.clone(),
};

loop {
if let Some(job) = client
.post(format!("https://{}/api/worker/poll", args.server))
.post(format!("{}/api/worker/poll", args.server))
.json(&req)
.send()
.await?
.json::<Option<WorkerPollResponse>>()
Expand All @@ -298,15 +308,15 @@ async fn build_worker_inner(args: &Args) -> anyhow::Result<()> {
// post result
warn!("Finished to run job {:?} with result {:?}", job, result);
client
.post(format!("https://{}/api/worker/job_update", args.server))
.post(format!("{}/api/worker/job_update", args.server))
.json(&result)
.send()
.await?;
}
Err(err) => {
warn!("Failed to run job {:?} with err {:?}", job, err);
client
.post(format!("https://{}/api/worker/job_update", args.server))
.post(format!("{}/api/worker/job_update", args.server))
.json(&WorkerJobUpdateRequest {
hostname: gethostname::gethostname().to_string_lossy().to_string(),
arch: args.arch.clone(),
Expand Down
2 changes: 1 addition & 1 deletion worker/src/heartbeat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ pub async fn heartbeat_worker_inner(args: &Args) -> anyhow::Result<()> {
loop {
info!("Sending heartbeat");
client
.post(format!("https://{}/api/worker/heartbeat", args.server))
.post(format!("{}/api/worker/heartbeat", args.server))
.json(&WorkerHeartbeatRequest {
hostname: gethostname::gethostname().to_string_lossy().to_string(),
arch: args.arch.clone(),
Expand Down
6 changes: 3 additions & 3 deletions worker/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ pub mod heartbeat;
#[derive(Parser, Debug, Clone)]
#[command(author, version, about, long_about = None)]
pub struct Args {
/// buildit server hostname
#[arg(short = 'H', long, env = "BUILDIT_HOSTNAME")]
/// buildit server url e.g. https://buildit.aosc.io
#[arg(short = 'H', long, env = "BUILDIT_SERVER")]
pub server: String,

/// Architecture that can build
Expand All @@ -30,7 +30,7 @@ pub struct Args {

/// SSH key for repo uploading
#[arg(short = 's', long, env = "BUILDIT_SSH_KEY")]
pub upload_ssh_key: String,
pub upload_ssh_key: Option<String>,

/// rsync host (server)
#[arg(
Expand Down

0 comments on commit 82a45b4

Please sign in to comment.