Commit

simplifying

deankarn committed Oct 8, 2023
1 parent f56a70f commit fbd1b3b

Showing 5 changed files with 268 additions and 54 deletions.
2 changes: 1 addition & 1 deletion relay-backend-postgres/src/lib.rs
@@ -1,4 +1,4 @@
mod migrations;
mod postgres;

pub use postgres::PgStore;
pub use postgres::{EnqueueMode, NewJob, PgStore};
200 changes: 199 additions & 1 deletion relay-backend-postgres/src/postgres.rs
@@ -1,7 +1,7 @@
#![allow(clippy::cast_possible_truncation)]
use crate::migrations::{run_migrations, Migration};
use async_trait::async_trait;
use chrono::{TimeZone, Utc};
use chrono::{DateTime, TimeZone, Utc};
use deadpool_postgres::{
ClientWrapper, GenericClient, Hook, HookError, Manager, ManagerConfig, Pool, PoolError,
RecyclingMethod,
@@ -24,6 +24,7 @@ use tokio_postgres::{Config as PostgresConfig, Row};
use tokio_stream::{Stream, StreamExt};
use tracing::{debug, warn};

// TODO: Update column `data` -> `payload` to keep terminology consistent.
const MIGRATIONS: [Migration; 2] = [
Migration::new(
"1678464484380_initialize.sql",
@@ -38,6 +39,44 @@ const MIGRATIONS: [Migration; 2] = [
/// Is the Postgres backend Job type.
pub type Job = CoreJob<Vec<u8>, Vec<u8>>;

/// Determines the behaviour of the enqueue function when a Job with the same ID already exists.
pub enum EnqueueMode {
/// Ensures the Job is unique by Job ID and returns a `JobExists` error if the Job already exists.
Unique,
/// Silently does nothing if the Job already exists.
Ignore,
/// Replaces the existing Job with the new one and immediately takes it out of in-flight status.
Replace,
}

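/// A new Job to enqueue, borrowing its ID, queue, payload, and state for the lifetime `'a`.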
pub struct NewJob<'a> {
/// The unique Job ID, which CAN also be used to ensure the Job is a singleton.
pub id: &'a str,

/// Is used to differentiate different job types that can be picked up by job runners.
pub queue: &'a str,

/// Denotes the duration, in seconds, after a Job has started processing or since the last
/// heartbeat request occurred before considering the Job failed and being put back into the
/// queue.
pub timeout: i32,

/// Determines how many times the Job can be retried, due to timeouts, before being considered
/// permanently failed. Infinite retries are supported by using a negative number, e.g. -1.
pub max_retries: Option<i32>,

/// The raw JSON payload that the job runner will receive.
pub payload: &'a [u8],

/// The raw JSON state associated with the Job, if any.
pub state: Option<&'a [u8]>,

/// With this you can optionally schedule/set a Job to be run only at a specific time in the
/// future. This option should mainly be used for one-time jobs and scheduled jobs that have
/// the option of being self-perpetuated in combination with the reschedule endpoint.
pub run_at: Option<DateTime<Utc>>,
}
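
As a usage sketch (a hypothetical caller; `store` is assumed to be an already-connected `PgStore`, and the ID, queue, and payload values are illustrative — `Result`, `Error`, and `debug!` are the items already imported at the top of this file), constructing a `NewJob` and handling the duplicate error surfaced by `EnqueueMode::Unique` could look like this:

```rust
async fn enqueue_report(store: &PgStore) -> Result<()> {
    let job = NewJob {
        id: "report-2023-10-08", // doubles as a singleton key
        queue: "reports",
        timeout: 30,              // seconds before an in-flight Job times out
        max_retries: Some(3),     // None maps to -1, i.e. infinite retries
        payload: br#"{"day":"2023-10-08"}"#,
        state: None,
        run_at: None,             // None enqueues the Job to run immediately
    };

    match store.enqueue_new(EnqueueMode::Unique, &[job]).await {
        // A duplicate ID is expected for a singleton; anything else bubbles up.
        Err(Error::JobExists { job_id, queue }) => {
            debug!("job {job_id} already exists on queue {queue}");
            Ok(())
        }
        other => other,
    }
}
```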

/// Postgres backing store
pub struct PgStore {
pool: Pool,
@@ -155,6 +194,165 @@ impl PgStore {
}
Ok(Self { pool })
}

/// Creates a batch of Jobs to be processed in a single write transaction.
///
/// NOTES: Conflict behaviour is governed by `mode`: `EnqueueMode::Unique` returns a
/// `JobExists` error for the first duplicate Job ID encountered, aborting the entire
/// transaction; `EnqueueMode::Ignore` silently drops conflicting records using
/// `ON CONFLICT DO NOTHING`; and `EnqueueMode::Replace` overwrites the existing Job,
/// taking it out of in-flight status.
///
/// # Errors
///
/// Will return `Err` if there are any communication issues with the backend Postgres DB.
#[tracing::instrument(name = "pg_enqueue", level = "debug", skip_all, fields(jobs = jobs.len()))]
pub async fn enqueue_new<'a>(&self, mode: EnqueueMode, jobs: &[NewJob<'a>]) -> Result<()> {
let mut client = self.pool.get().await.map_err(|e| Error::Backend {
message: e.to_string(),
is_retryable: is_retryable_pool(e),
})?;

let transaction = client.transaction().await.map_err(|e| Error::Backend {
message: e.to_string(),
is_retryable: is_retryable(e),
})?;

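// The VALUES list below reuses parameters on purpose: $4 seeds both max_retries and
// retries_remaining, and $7 stamps updated_at and created_at with the same timestamp.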
let stmt = match mode {
EnqueueMode::Unique => transaction
.prepare_cached(
r#"INSERT INTO jobs (
id,
queue,
timeout,
max_retries,
retries_remaining,
data,
state,
updated_at,
created_at,
run_at
)
VALUES ($1, $2, $3, $4, $4, $5, $6, $7, $7, $8)"#,
)
.await
.map_err(|e| Error::Backend {
message: e.to_string(),
is_retryable: is_retryable(e),
})?,
EnqueueMode::Ignore => transaction
.prepare_cached(
r#"INSERT INTO jobs (
id,
queue,
timeout,
max_retries,
retries_remaining,
data,
state,
updated_at,
created_at,
run_at
)
VALUES ($1, $2, $3, $4, $4, $5, $6, $7, $7, $8)
ON CONFLICT DO NOTHING"#,
)
.await
.map_err(|e| Error::Backend {
message: e.to_string(),
is_retryable: is_retryable(e),
})?,
EnqueueMode::Replace => transaction
.prepare_cached(
r#"INSERT INTO jobs (
id,
queue,
timeout,
max_retries,
retries_remaining,
data,
state,
updated_at,
created_at,
run_at
)
VALUES ($1, $2, $3, $4, $4, $5, $6, $7, $7, $8)
ON CONFLICT (queue, id) DO UPDATE SET
timeout = EXCLUDED.timeout,
max_retries = EXCLUDED.max_retries,
retries_remaining = EXCLUDED.max_retries,
data = EXCLUDED.data,
state = EXCLUDED.state,
updated_at = EXCLUDED.updated_at,
run_at = EXCLUDED.run_at,
in_flight = false"#,
)
.await
.map_err(|e| Error::Backend {
message: e.to_string(),
is_retryable: is_retryable(e),
})?,
};

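// Tally enqueued jobs per queue so the "enqueued" counter is emitted once per queue,
// and only after the transaction commits successfully.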
let mut counts = HashMap::new();

for job in jobs {
let now = Utc::now().naive_utc();
let run_at = if let Some(run_at) = job.run_at {
run_at.naive_utc()
} else {
now
};

transaction
.execute(
&stmt,
&[
&job.id,
&job.queue,
&Interval::from_duration(chrono::Duration::seconds(i64::from(job.timeout))),
&job.max_retries.unwrap_or(-1),
&job.payload,
&job.state,
&now,
&run_at,
],
)
.await
.map_err(|e| {
if let Some(&SqlState::UNIQUE_VIOLATION) = e.code() {
Error::JobExists {
job_id: job.id.to_string(),
queue: job.queue.to_string(),
}
} else {
Error::Backend {
message: e.to_string(),
is_retryable: is_retryable(e),
}
}
})?;

match counts.entry(job.queue) {
Entry::Occupied(mut o) => *o.get_mut() += 1,
Entry::Vacant(v) => {
v.insert(1);
}
};
}

transaction.commit().await.map_err(|e| Error::Backend {
message: e.to_string(),
is_retryable: is_retryable(e),
})?;

for (queue, count) in counts {
counter!("enqueued", count, "queue" => queue.to_string());
}

debug!("enqueued jobs");
Ok(())
}
}

#[async_trait]
3 changes: 1 addition & 2 deletions relay-frontend-http/Cargo.toml
@@ -23,9 +23,8 @@ async-channel.workspace = true
axum = { version = "0.6.20", default-features = false, features = ["http1", "http2", "json","matched-path","original-uri","query","tokio","tower-log"] }
tower-http = { version = "0.4.4", features = ["trace"] }
uuid = { workspace = true, features = ["v4"] }

relay-backend-postgres = { version = "*", path="../relay-backend-postgres"}

[dev-dependencies]
portpicker = "0.1.1"
relay-backend-postgres = { version = "*", path="../relay-backend-postgres"}
chrono.workspace = true
114 changes: 65 additions & 49 deletions relay-frontend-http/src/server.rs
@@ -5,6 +5,7 @@ use axum::response::{IntoResponse, Response};
use axum::routing::{delete, get, head, patch, post, put};
use axum::{Json, Router};
use metrics::increment_counter;
use relay_backend_postgres::{EnqueueMode, NewJob, PgStore};
use relay_core::{Backend, Error, Job};
use serde::{Deserialize, Serialize};
use serde_json::value::RawValue;
@@ -97,53 +98,59 @@ pub struct Server;
// }
// }

#[derive(Deserialize)]
struct Jobs(Vec<Job<Box<RawValue>, Box<RawValue>>>);

impl From<Jobs> for Vec<Job<Vec<u8>, Vec<u8>>> {
fn from(value: Jobs) -> Self {
value
.0
.into_iter()
.map(|j| Job {
id: j.id,
queue: j.queue,
timeout: j.timeout,
max_retries: j.max_retries,
payload: j.payload.get().as_bytes().to_vec(),
state: j.state.map(|s| s.get().as_bytes().to_vec()),
run_at: j.run_at,
updated_at: j.updated_at,
})
.collect()
}
}
// #[derive(Deserialize)]
// struct Jobs(Vec<Job<Box<RawValue>, Box<RawValue>>>);
//
// impl From<Jobs> for Vec<Job<Vec<u8>, Vec<u8>>> {
// fn from(value: Jobs) -> Self {
// value
// .0
// .into_iter()
// .map(|j| Job {
// id: j.id,
// queue: j.queue,
// timeout: j.timeout,
// max_retries: j.max_retries,
// payload: j.payload.get().as_bytes().to_vec(),
// state: j.state.map(|s| s.get().as_bytes().to_vec()),
// run_at: j.run_at,
// updated_at: j.updated_at,
// })
// .collect()
// }
// }

#[tracing::instrument(name = "http_enqueue", level = "debug", skip_all)]
async fn enqueue<BE>(State(state): State<Arc<BE>>, jobs: Json<Jobs>) -> Response
where
BE: Backend<Vec<u8>, Vec<u8>>,
{
async fn enqueue(
State(state): State<Arc<PgStore>>,
jobs: Json<Vec<Job<Box<RawValue>, Box<RawValue>>>>,
) -> Response {
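// An illustrative request body for this endpoint (values are made up; `payload` and
// `state` are raw JSON forwarded to the backend untouched, and exact optional-field
// handling depends on relay_core::Job's serde derives):
//
// [{"id": "report-2023-10-08", "queue": "reports", "timeout": 30,
//   "max_retries": -1, "payload": {"day": "2023-10-08"}, "run_at": null}]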
increment_counter!("http_request", "endpoint" => "enqueue");

let input: Vec<Job<Vec<u8>, Vec<u8>>> = jobs.0.into();
// let input: Vec<Job<Vec<u8>, Vec<u8>>> = jobs
// .0
// .0
// .into_iter()
// .map(|j| Job {
// id: j.id,
// queue: j.queue,
// timeout: j.timeout,
// max_retries: j.max_retries,
// payload: j.payload.get().as_bytes().to_vec(),
// state: j.state.map(|s| s.get().as_bytes().to_vec()),
// run_at: j.run_at,
// updated_at: j.updated_at,
// })
// .collect();

if let Err(e) = state.enqueue(&input).await {
// let input: Vec<Job<Vec<u8>, Vec<u8>>> = jobs.0.into();
let input: Vec<NewJob> = jobs
.0
.iter()
.map(|j| NewJob {
id: &j.id,
queue: &j.queue,
timeout: j.timeout,
max_retries: if j.max_retries == -1 {
// temporary until client updated
None
} else {
Some(j.max_retries)
},
payload: j.payload.get().as_bytes(),
state: j.state.as_ref().map(|s| s.get().as_bytes()),
run_at: j.run_at,
})
.collect();

if let Err(e) = state
.enqueue_new(EnqueueMode::Unique, &input)
.await
{
increment_counter!("errors", "endpoint" => "enqueue", "type" => e.error_type());
match e {
Error::Backend { .. } => {
@@ -313,9 +320,8 @@ impl Server {
/// Will panic if the reaper async thread fails, which can only happen if the timer and channel
/// both die.
#[inline]
pub async fn run<BE, F>(backend: Arc<BE>, addr: &str, shutdown: F) -> anyhow::Result<()>
pub async fn run<F>(backend: Arc<PgStore>, addr: &str, shutdown: F) -> anyhow::Result<()>
where
BE: Backend<Vec<u8>, Vec<u8>> + Send + Sync + 'static,
F: Future<Output = ()>,
{
let app = Server::init_app(backend);
@@ -328,10 +334,20 @@ impl Server {
Ok(())
}
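
For completeness, a hypothetical wiring of the server to the Postgres backend (the `PgStore::new` constructor name, DSN, and bind address are assumptions, not part of this commit — use whatever constructor `PgStore` actually exposes):

```rust
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Connect the backing store and hand it to the HTTP frontend.
    let store = Arc::new(PgStore::new("postgres://localhost/relay").await?);
    Server::run(store, "0.0.0.0:8080", async {
        // Resolving this future (e.g. on Ctrl-C) begins graceful shutdown.
        tokio::signal::ctrl_c().await.ok();
    })
    .await
}
```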

pub(crate) fn init_app<BE>(backend: Arc<BE>) -> Router
where
BE: Backend<Vec<u8>, Vec<u8>> + Send + Sync + 'static,
{
pub(crate) fn init_app(backend: Arc<PgStore>) -> Router {
// TODO: Can in-flight be replaced with the run_id, where a NULL run_id means in-flight = false, otherwise true?
//
// POST /v1/queues/jobs - accept optional query param for mode of operation?
// - POST is interesting as it has different modes of operation:
// - (Default behaviour) Unique enqueue, error on duplicate (first one encountered, entire transaction aborted).
// - Do nothing if already exists.
// - Replace if already exists - This is exactly like PUT/reschedule except queue and job id can't be changed.
//
// GET /v1/queues/:queue/jobs/:id
// HEAD /v1/queues/:queue/jobs/:id
// PUT /v1/queues/:queue/jobs/:id - Accepts entire Job, allowing rescheduling into different queue even if desired.
// DELETE /v1/queues/:queue/jobs/:id
// PATCH /v1/queues/:queue/jobs/:id - updates state + updated_at + expires_at only
Router::new()
.route("/v1/queues/jobs", post(enqueue))
// .route("/v1/queues/jobs", put(reschedule))