Skip to content

Commit

Permalink
feat: add worker assignment
Browse files Browse the repository at this point in the history
  • Loading branch information
jiegec committed Mar 11, 2024
1 parent c1920a4 commit 9a5f667
Show file tree
Hide file tree
Showing 5 changed files with 25 additions and 10 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
-- This file should undo anything in `up.sql`
ALTER TABLE jobs DROP CONSTRAINT assigned_worker;
ALTER TABLE jobs DROP COLUMN assigned_worker_id;
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
-- Your SQL goes here
ALTER TABLE jobs ADD COLUMN assigned_worker_id INT;
ALTER TABLE jobs ADD CONSTRAINT assigned_worker FOREIGN KEY(assigned_worker_id) REFERENCES workers(id);
1 change: 1 addition & 0 deletions server/src/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ pub struct Job {
pub finish_time: Option<chrono::DateTime<chrono::Utc>>,
pub error_message: Option<String>,
pub elapsed_secs: Option<i64>,
pub assigned_worker_id: Option<i32>,
}

#[derive(Insertable)]
Expand Down
20 changes: 15 additions & 5 deletions server/src/routes.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
use std::time::Duration;

use crate::{
api,
models::{Job, NewWorker, Pipeline},
models::{Job, NewWorker, Pipeline, Worker},
DbPool,
};
use anyhow::Context;
Expand Down Expand Up @@ -146,14 +144,25 @@ pub async fn worker_poll(
use crate::schema::jobs::dsl::*;
match jobs
.filter(status.eq("created"))
.filter(arch.eq(payload.arch))
.filter(arch.eq(&payload.arch))
.first::<Job>(conn)
.optional()?
{
Some(job) => {
// find worker id
let worker = crate::schema::workers::dsl::workers
.filter(crate::schema::workers::dsl::hostname.eq(&payload.hostname))
.filter(crate::schema::workers::dsl::arch.eq(&payload.arch))
.first::<Worker>(conn)?;

// remove if already allocated to the worker
diesel::update(jobs.filter(assigned_worker_id.eq(worker.id)))
.set((status.eq("created"), assigned_worker_id.eq(None::<i32>)))
.execute(conn)?;

// allocate to the worker
diesel::update(&job)
.set(status.eq("assigned"))
.set((status.eq("assigned"), assigned_worker_id.eq(worker.id)))
.execute(conn)?;

// get pipeline the job belongs to
Expand Down Expand Up @@ -234,6 +243,7 @@ pub async fn worker_job_update(
log_url.eq(res.log_url),
finish_time.eq(chrono::Utc::now()),
elapsed_secs.eq(res.elapsed_secs),
assigned_worker_id.eq(None::<i32>),
))
.execute(&mut conn)?;
}
Expand Down
8 changes: 3 additions & 5 deletions server/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ diesel::table! {
finish_time -> Nullable<Timestamptz>,
error_message -> Nullable<Text>,
elapsed_secs -> Nullable<Int8>,
assigned_worker_id -> Nullable<Int4>,
}
}

Expand Down Expand Up @@ -48,9 +49,6 @@ diesel::table! {
}

diesel::joinable!(jobs -> pipelines (pipeline_id));
diesel::joinable!(jobs -> workers (assigned_worker_id));

diesel::allow_tables_to_appear_in_same_query!(
jobs,
pipelines,
workers,
);
diesel::allow_tables_to_appear_in_same_query!(jobs, pipelines, workers,);

0 comments on commit 9a5f667

Please sign in to comment.