diff --git a/server/migrations/2024-03-10-162811_add_assigned_worker_id_to_jobs/down.sql b/server/migrations/2024-03-10-162811_add_assigned_worker_id_to_jobs/down.sql new file mode 100644 index 0000000..ec1fdef --- /dev/null +++ b/server/migrations/2024-03-10-162811_add_assigned_worker_id_to_jobs/down.sql @@ -0,0 +1,3 @@ +-- This file should undo anything in `up.sql` +ALTER TABLE jobs DROP CONSTRAINT assigned_worker; +ALTER TABLE jobs DROP COLUMN assigned_worker_id; diff --git a/server/migrations/2024-03-10-162811_add_assigned_worker_id_to_jobs/up.sql b/server/migrations/2024-03-10-162811_add_assigned_worker_id_to_jobs/up.sql new file mode 100644 index 0000000..92502ed --- /dev/null +++ b/server/migrations/2024-03-10-162811_add_assigned_worker_id_to_jobs/up.sql @@ -0,0 +1,3 @@ +-- Your SQL goes here +ALTER TABLE jobs ADD COLUMN assigned_worker_id INT; +ALTER TABLE jobs ADD CONSTRAINT assigned_worker FOREIGN KEY(assigned_worker_id) REFERENCES workers(id); diff --git a/server/src/models.rs b/server/src/models.rs index 566d8eb..f6e7570 100644 --- a/server/src/models.rs +++ b/server/src/models.rs @@ -50,6 +50,7 @@ pub struct Job { pub finish_time: Option>, pub error_message: Option, pub elapsed_secs: Option, + pub assigned_worker_id: Option, } #[derive(Insertable)] diff --git a/server/src/routes.rs b/server/src/routes.rs index 5c67ab4..ebaf058 100644 --- a/server/src/routes.rs +++ b/server/src/routes.rs @@ -1,8 +1,6 @@ -use std::time::Duration; - use crate::{ api, - models::{Job, NewWorker, Pipeline}, + models::{Job, NewWorker, Pipeline, Worker}, DbPool, }; use anyhow::Context; @@ -146,14 +144,25 @@ pub async fn worker_poll( use crate::schema::jobs::dsl::*; match jobs .filter(status.eq("created")) - .filter(arch.eq(payload.arch)) + .filter(arch.eq(&payload.arch)) .first::(conn) .optional()? { Some(job) => { + // find worker id + let worker = crate::schema::workers::dsl::workers + .filter(crate::schema::workers::dsl::hostname.eq(&payload.hostname)) + .filter(crate::schema::workers::dsl::arch.eq(&payload.arch)) + .first::(conn)?; + + // remove if already allocated to the worker + diesel::update(jobs.filter(assigned_worker_id.eq(worker.id))) + .set((status.eq("created"), assigned_worker_id.eq(None::))) + .execute(conn)?; + // allocate to the worker diesel::update(&job) - .set(status.eq("assigned")) + .set((status.eq("assigned"), assigned_worker_id.eq(worker.id))) .execute(conn)?; // get pipeline the job belongs to @@ -234,6 +243,7 @@ pub async fn worker_job_update( log_url.eq(res.log_url), finish_time.eq(chrono::Utc::now()), elapsed_secs.eq(res.elapsed_secs), + assigned_worker_id.eq(None::), )) .execute(&mut conn)?; } diff --git a/server/src/schema.rs b/server/src/schema.rs index 552daef..39a0937 100644 --- a/server/src/schema.rs +++ b/server/src/schema.rs @@ -18,6 +18,7 @@ diesel::table! { finish_time -> Nullable, error_message -> Nullable, elapsed_secs -> Nullable, + assigned_worker_id -> Nullable, } } @@ -48,9 +49,6 @@ diesel::table! { } diesel::joinable!(jobs -> pipelines (pipeline_id)); +diesel::joinable!(jobs -> workers (assigned_worker_id)); -diesel::allow_tables_to_appear_in_same_query!( - jobs, - pipelines, - workers, -); +diesel::allow_tables_to_appear_in_same_query!(jobs, pipelines, workers,);