From dfea3ee54af05e7f113a52bd1688cd03f7a8bbb6 Mon Sep 17 00:00:00 2001 From: Jiajie Chen Date: Thu, 14 Mar 2024 19:19:14 +0800 Subject: [PATCH] fix: rewrite code to use async with transaction --- server/src/routes/job.rs | 146 ++++++++++++++++++++++----------------- 1 file changed, 81 insertions(+), 65 deletions(-) diff --git a/server/src/routes/job.rs b/server/src/routes/job.rs index 5040b2d..83b34fa 100644 --- a/server/src/routes/job.rs +++ b/server/src/routes/job.rs @@ -3,9 +3,9 @@ use crate::models::{Job, NewJob, Pipeline}; use crate::routes::{AnyhowError, AppState}; use anyhow::{bail, Context}; use axum::extract::{Json, Query, State}; -use diesel::{Connection, ExpressionMethods, QueryDsl, RunQueryDsl}; -use octocrab::models::checks::CheckRunOutput; -use octocrab::models::CheckRunId; +use diesel::connection::{AnsiTransactionManager, TransactionManager}; +use diesel::r2d2::PoolTransactionManager; +use diesel::{Connection, ExpressionMethods, PgConnection, QueryDsl, RunQueryDsl}; use serde::{Deserialize, Serialize}; use tokio::runtime::Handle; use tracing::{error, warn}; @@ -170,6 +170,68 @@ pub struct JobRestartRequest { pub struct JobRestartResponse { job_id: i32, } +async fn job_restart_in_transaction( + payload: &JobRestartRequest, + conn: &mut PgConnection, +) -> anyhow::Result { + let job = crate::schema::jobs::dsl::jobs + .find(payload.job_id) + .get_result::(conn)?; + let pipeline = crate::schema::pipelines::dsl::pipelines + .find(job.pipeline_id) + .get_result::(conn)?; + + // job must be finished + if job.status != "finished" { + bail!("Cannot restart the job unless it is finished"); + } + + // create a new job + use crate::schema::jobs; + let mut new_job = NewJob { + pipeline_id: job.pipeline_id, + packages: job.packages, + arch: job.arch.clone(), + creation_time: chrono::Utc::now(), + status: "created".to_string(), + github_check_run_id: None, + }; + + // create new github check run if the restarted job has one + if job.github_check_run_id.is_some() { + // authenticate with github app + match get_crab_github_installation().await { + Ok(Some(crab)) => { + match crab + .checks("AOSC-Dev", "aosc-os-abbs") + .create_check_run(format!("buildit {}", job.arch), &pipeline.git_sha) + .status(octocrab::params::checks::CheckRunStatus::Queued) + .send() + .await + { + Ok(check_run) => { + new_job.github_check_run_id = Some(check_run.id.0 as i64); + } + Err(err) => { + warn!("Failed to create check run: {}", err); + } + } + } + Ok(None) => { + // github app unavailable + } + Err(err) => { + warn!("Failed to get installation token: {}", err); + } + } + } + + let new_job: Job = diesel::insert_into(jobs::table) + .values(&new_job) + .get_result(conn) + .context("Failed to create job")?; + Ok(new_job) +} pub async fn job_restart( State(AppState { pool, .. }): State, @@ -179,69 +241,23 @@ pub async fn job_restart( .get() .context("Failed to get db connection from pool")?; - let new_job = conn.transaction::(|conn| { - let job = crate::schema::jobs::dsl::jobs - .find(payload.job_id) - .get_result::(conn)?; - let pipeline = crate::schema::pipelines::dsl::pipelines - .find(job.pipeline_id) - .get_result::(conn)?; - - // job must be finished - if job.status != "finished" { - bail!("Cannot restart the job unless it is finished"); + // manually handle transaction, since we want to use async in transaction + PoolTransactionManager::::begin_transaction(&mut conn)?; + match job_restart_in_transaction(&payload, &mut conn).await { + Ok(new_job) => { + PoolTransactionManager::::commit_transaction(&mut conn)?; + return Ok(Json(JobRestartResponse { job_id: new_job.id })); } - - // create a new job - use crate::schema::jobs; - let mut new_job = NewJob { - pipeline_id: job.pipeline_id, - packages: job.packages, - arch: job.arch.clone(), - creation_time: chrono::Utc::now(), - status: "created".to_string(), - github_check_run_id: None, - }; - - // create new github check run if the restarted job has one - if job.github_check_run_id.is_some() { - new_job.github_check_run_id = Handle::current().block_on(async { - // authenticate with github app - match get_crab_github_installation().await { - Ok(Some(crab)) => { - match crab - .checks("AOSC-Dev", "aosc-os-abbs") - .create_check_run(format!("buildit {}", job.arch), &pipeline.git_sha) - .status(octocrab::params::checks::CheckRunStatus::Queued) - .send() - .await - { - Ok(check_run) => { - return Some(check_run.id.0 as i64); - } - Err(err) => { - warn!("Failed to create check run: {}", err); - } - } - } - Ok(None) => { - // github app unavailable - } - Err(err) => { - warn!("Failed to get installation token: {}", err); - } + Err(err) => { + match PoolTransactionManager::::rollback_transaction(&mut conn) + { + Ok(()) => { + return Err(err.into()); } - return None; - }); + Err(rollback_err) => { + return Err(err.context(rollback_err).into()); + } + } } - - let new_job: Job = diesel::insert_into(jobs::table) - .values(&new_job) - .get_result(conn) - .context("Failed to create job")?; - - Ok(new_job) - })?; - - Ok(Json(JobRestartResponse { job_id: new_job.id })) + } }