Skip to content

Commit

Permalink
fix: rewrite code to use async with transaction
Browse files Browse the repository at this point in the history
  • Loading branch information
jiegec committed Mar 14, 2024
1 parent fe1accb commit dfea3ee
Showing 1 changed file with 81 additions and 65 deletions.
146 changes: 81 additions & 65 deletions server/src/routes/job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ use crate::models::{Job, NewJob, Pipeline};
use crate::routes::{AnyhowError, AppState};
use anyhow::{bail, Context};
use axum::extract::{Json, Query, State};
use diesel::{Connection, ExpressionMethods, QueryDsl, RunQueryDsl};
use octocrab::models::checks::CheckRunOutput;
use octocrab::models::CheckRunId;
use diesel::connection::{AnsiTransactionManager, TransactionManager};
use diesel::r2d2::PoolTransactionManager;
use diesel::{Connection, ExpressionMethods, PgConnection, QueryDsl, RunQueryDsl};
use serde::{Deserialize, Serialize};
use tokio::runtime::Handle;

Check warning on line 10 in server/src/routes/job.rs

View workflow job for this annotation

GitHub Actions / build

unused import: `tokio::runtime::Handle`
use tracing::{error, warn};

Check warning on line 11 in server/src/routes/job.rs

View workflow job for this annotation

GitHub Actions / build

unused import: `error`
Expand Down Expand Up @@ -170,6 +170,68 @@ pub struct JobRestartRequest {
pub struct JobRestartResponse {
job_id: i32,
}
async fn job_restart_in_transaction(
payload: &JobRestartRequest,
conn: &mut PgConnection,
) -> anyhow::Result<Job> {
let job = crate::schema::jobs::dsl::jobs
.find(payload.job_id)
.get_result::<Job>(conn)?;
let pipeline = crate::schema::pipelines::dsl::pipelines
.find(job.pipeline_id)
.get_result::<Pipeline>(conn)?;

// job must be finished
if job.status != "finished" {
bail!("Cannot restart the job unless it is finished");
}

// create a new job
use crate::schema::jobs;
let mut new_job = NewJob {
pipeline_id: job.pipeline_id,
packages: job.packages,
arch: job.arch.clone(),
creation_time: chrono::Utc::now(),
status: "created".to_string(),
github_check_run_id: None,
};

// create new github check run if the restarted job has one
if job.github_check_run_id.is_some() {
// authenticate with github app
match get_crab_github_installation().await {
Ok(Some(crab)) => {
match crab
.checks("AOSC-Dev", "aosc-os-abbs")
.create_check_run(format!("buildit {}", job.arch), &pipeline.git_sha)
.status(octocrab::params::checks::CheckRunStatus::Queued)
.send()
.await
{
Ok(check_run) => {
new_job.github_check_run_id = Some(check_run.id.0 as i64);
}
Err(err) => {
warn!("Failed to create check run: {}", err);
}
}
}
Ok(None) => {
// github app unavailable
}
Err(err) => {
warn!("Failed to get installation token: {}", err);
}
}
}

let new_job: Job = diesel::insert_into(jobs::table)
.values(&new_job)
.get_result(conn)
.context("Failed to create job")?;
Ok(new_job)
}

pub async fn job_restart(
State(AppState { pool, .. }): State<AppState>,
Expand All @@ -179,69 +241,23 @@ pub async fn job_restart(
.get()
.context("Failed to get db connection from pool")?;

let new_job = conn.transaction::<Job, anyhow::Error, _>(|conn| {
let job = crate::schema::jobs::dsl::jobs
.find(payload.job_id)
.get_result::<Job>(conn)?;
let pipeline = crate::schema::pipelines::dsl::pipelines
.find(job.pipeline_id)
.get_result::<Pipeline>(conn)?;

// job must be finished
if job.status != "finished" {
bail!("Cannot restart the job unless it is finished");
// manually handle transaction, since we want to use async in transaction
PoolTransactionManager::<AnsiTransactionManager>::begin_transaction(&mut conn)?;
match job_restart_in_transaction(&payload, &mut conn).await {
Ok(new_job) => {
PoolTransactionManager::<AnsiTransactionManager>::commit_transaction(&mut conn)?;
return Ok(Json(JobRestartResponse { job_id: new_job.id }));
}

// create a new job
use crate::schema::jobs;
let mut new_job = NewJob {
pipeline_id: job.pipeline_id,
packages: job.packages,
arch: job.arch.clone(),
creation_time: chrono::Utc::now(),
status: "created".to_string(),
github_check_run_id: None,
};

// create new github check run if the restarted job has one
if job.github_check_run_id.is_some() {
new_job.github_check_run_id = Handle::current().block_on(async {
// authenticate with github app
match get_crab_github_installation().await {
Ok(Some(crab)) => {
match crab
.checks("AOSC-Dev", "aosc-os-abbs")
.create_check_run(format!("buildit {}", job.arch), &pipeline.git_sha)
.status(octocrab::params::checks::CheckRunStatus::Queued)
.send()
.await
{
Ok(check_run) => {
return Some(check_run.id.0 as i64);
}
Err(err) => {
warn!("Failed to create check run: {}", err);
}
}
}
Ok(None) => {
// github app unavailable
}
Err(err) => {
warn!("Failed to get installation token: {}", err);
}
Err(err) => {
match PoolTransactionManager::<AnsiTransactionManager>::rollback_transaction(&mut conn)
{
Ok(()) => {
return Err(err.into());
}
return None;
});
Err(rollback_err) => {
return Err(err.context(rollback_err).into());
}
}
}

let new_job: Job = diesel::insert_into(jobs::table)
.values(&new_job)
.get_result(conn)
.context("Failed to create job")?;

Ok(new_job)
})?;

Ok(Json(JobRestartResponse { job_id: new_job.id }))
}
}

0 comments on commit dfea3ee

Please sign in to comment.