From 5e94f6eca1d75507e044a77c77ccbbbc8a1b02c2 Mon Sep 17 00:00:00 2001 From: Jonathan Johnson Date: Wed, 30 Mar 2022 08:51:15 -0700 Subject: [PATCH] WIP --- Cargo.toml | 1 + crates/bonsaidb-jobs/Cargo.toml | 26 ++ crates/bonsaidb-jobs/src/fifo.rs | 66 ++++ crates/bonsaidb-jobs/src/job.rs | 291 ++++++++++++++++++ crates/bonsaidb-jobs/src/lib.rs | 30 ++ crates/bonsaidb-jobs/src/orchestrator.rs | 364 +++++++++++++++++++++++ crates/bonsaidb-jobs/src/queue.rs | 250 ++++++++++++++++ crates/bonsaidb-jobs/src/schema/job.rs | 36 +++ crates/bonsaidb-jobs/src/schema/mod.rs | 3 + crates/bonsaidb-jobs/src/schema/queue.rs | 46 +++ 10 files changed, 1113 insertions(+) create mode 100644 crates/bonsaidb-jobs/Cargo.toml create mode 100644 crates/bonsaidb-jobs/src/fifo.rs create mode 100644 crates/bonsaidb-jobs/src/job.rs create mode 100644 crates/bonsaidb-jobs/src/lib.rs create mode 100644 crates/bonsaidb-jobs/src/orchestrator.rs create mode 100644 crates/bonsaidb-jobs/src/queue.rs create mode 100644 crates/bonsaidb-jobs/src/schema/job.rs create mode 100644 crates/bonsaidb-jobs/src/schema/mod.rs create mode 100644 crates/bonsaidb-jobs/src/schema/queue.rs diff --git a/Cargo.toml b/Cargo.toml index 8c3dac612b3..7cfadf72f8f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,6 +9,7 @@ members = [ "crates/bonsaidb-server", "crates/bonsaidb-keystorage-s3", "crates/bonsaidb-utils", + "crates/bonsaidb-jobs", "examples/*", "book/book-examples", "xtask", diff --git a/crates/bonsaidb-jobs/Cargo.toml b/crates/bonsaidb-jobs/Cargo.toml new file mode 100644 index 00000000000..18f563a5361 --- /dev/null +++ b/crates/bonsaidb-jobs/Cargo.toml @@ -0,0 +1,26 @@ +[package] +name = "bonsaidb-jobs" +version = "0.4.0" +authors = ["Jonathan Johnson "] +edition = "2021" +description = "Persistent job queueing and scheduling for BonsaiDb." +repository = "https://github.com/khonsulabs/bonsaidb" +license = "MIT OR Apache-2.0" +keywords = ["bonsaidb", "job", "queue"] +categories = ["config"] +readme = "./README.md" +homepage = "https://bonsaidb.io/" +rust-version = "1.58" + +[dependencies] +bonsaidb-core = { version = "0.4.0", path = "../bonsaidb-core" } +serde = { version = "1", features = ["derive"] } +thiserror = "1" +tokio = { version = "=1.16.1", default-features = false, features = ["sync"] } +flume = "0.10" + +[dev-dependencies] +tokio = { version = "=1.16.1", features = ["full"] } +bonsaidb-core = { version = "0.4.0", path = "../bonsaidb-core", features = [ + "test-util", +] } diff --git a/crates/bonsaidb-jobs/src/fifo.rs b/crates/bonsaidb-jobs/src/fifo.rs new file mode 100644 index 00000000000..0269bcdf437 --- /dev/null +++ b/crates/bonsaidb-jobs/src/fifo.rs @@ -0,0 +1,66 @@ +use bonsaidb_core::{ + connection::Connection, document::CollectionDocument, keyvalue::KeyValue, pubsub::PubSub, +}; + +use crate::{ + orchestrator::{Backend, Strategy}, + queue::{self, QueueId}, + schema, +}; + +pub struct Config { + pub tiers: Vec, +} + +pub struct JobTier(pub Vec); + +pub struct PriorityFifo; + +impl Strategy for PriorityFifo { + type WorkerConfig = Config; + type Worker = Config; + + fn initialize_worker( + &mut self, + mut config: Self::WorkerConfig, + backend: &mut Backend, + ) -> Result { + for tier in &mut config.tiers { + for queue in &mut tier.0 { + queue.resolve(backend.database())?; + } + } + Ok(config) + } + + fn dequeue_for_worker( + &mut self, + worker: &Self::WorkerConfig, + backend: &mut Backend, + ) -> Result>, queue::Error> { + for tier in &worker.tiers { + if let Some((queue_with_oldest_job, _)) = tier + .0 + .iter() + .filter_map(|q| { + backend + .queue(q.as_id().unwrap()) + .and_then(|jobs| jobs.front().map(|j| (q, j.clone()))) + }) + .max_by(|(_, q1_front), (_, q2_front)| { + q1_front + .contents + .enqueued_at + .cmp(&q2_front.contents.enqueued_at) + }) + { + return Ok(backend + .queue(queue_with_oldest_job.as_id().unwrap()) + .unwrap() + .pop_front()); + } + } + + Ok(None) + } +} diff --git a/crates/bonsaidb-jobs/src/job.rs b/crates/bonsaidb-jobs/src/job.rs new file mode 100644 index 00000000000..2b9eee3702a --- /dev/null +++ b/crates/bonsaidb-jobs/src/job.rs @@ -0,0 +1,291 @@ +use std::{fmt::Display, marker::PhantomData, sync::Arc}; + +use bonsaidb_core::{ + actionable::async_trait, + arc_bytes::serde::Bytes, + connection::Connection, + document::CollectionDocument, + keyvalue::Timestamp, + pubsub::{PubSub, Subscriber}, + schema::{Schematic, SerializedCollection}, + transmog::{Format, OwnedDeserializer}, +}; +use serde::{Deserialize, Serialize}; +use tokio::sync::watch; + +use crate::{ + orchestrator::{job_topic, Orchestrator, WorkerId}, + schema, +}; + +pub(crate) fn define_collections(schematic: &mut Schematic) -> Result<(), bonsaidb_core::Error> { + schematic.define_collection::() +} + +pub struct Job(CollectionDocument, PhantomData); + +impl From> for Job { + fn from(doc: CollectionDocument) -> Self { + Self(doc, PhantomData) + } +} + +impl Job { + pub fn update( + &mut self, + database: &Database, + ) -> Result { + match schema::Job::get(self.0.header.id, database)? { + Some(doc) => { + self.0 = doc; + Ok(true) + } + None => Ok(false), + } + } + + pub fn wait_for_result( + &mut self, + database: &Database, + ) -> JobResult { + loop { + let subscriber = database.create_subscriber()?; + subscriber.subscribe_to(&job_topic(self.0.header.id))?; + // Check that the job hasn't completed before we could create the subscriber + self.update(database)?; + return if let Some(result) = &self.0.contents.result { + >>::deserialize_owned( + &Q::format(), + result, + ) + } else { + // Wait for the subscriber to be notified + match subscriber.receiver().receive() { + Ok(message) => { + >>::deserialize_owned( + &Q::format(), + &message.payload, + ) + } + Err(_) => continue, + } + } + .map_err(|err| bonsaidb_core::Error::Serialization(err.to_string()))?; + } + } + + #[must_use] + pub fn progress(&self) -> &Progress { + &self.0.contents.progress + } + + #[must_use] + pub fn enqueued_at(&self) -> Timestamp { + self.0.contents.enqueued_at + } + + #[must_use] + pub fn cancelled_at(&self) -> Option { + self.0.contents.cancelled_at + } +} + +#[allow(type_alias_bounds)] +type JobResult = Result, Q::Error>; + +#[async_trait] +pub trait Queueable: Sized + Send + Sync + std::fmt::Debug { + type Format: bonsaidb_core::transmog::OwnedDeserializer + + bonsaidb_core::transmog::OwnedDeserializer, Self::Error>>; + type Output: Send + Sync; + type Error: From + Send + Sync; + + fn format() -> Self::Format; +} + +#[async_trait] +pub trait Executor { + type Job: Queueable; + + async fn execute( + &mut self, + job: Self::Job, + progress: &mut ProgressReporter, + ) -> Result<::Output, ::Error>; + + async fn execute_with_progress( + &mut self, + worker_id: WorkerId, + job: &mut CollectionDocument, + orchestrator: &Orchestrator, + ) -> Result::Output>, ::Error> { + let (mut executor_handle, mut job_handle) = ProgressReporter::new(); + let payload = Self::Job::format() + .deserialize_owned(&job.contents.payload) + .unwrap(); + let mut task = self.execute(payload, &mut executor_handle); + + let result = loop { + tokio::select! { + output = &mut task => break output.map(Some), + // TODO have timeout to report to orchestrator with progress + progress = job_handle.receiver.changed() => { + progress.unwrap(); + // TODO throttle progress changes + let progress = job_handle.receiver.borrow_and_update().clone(); + // TODO properly handle errors. They shouldn't kill the + // worker, as the job could complete and communication could + // be restored. + drop(job_handle.cancel.send(orchestrator.report_progress(worker_id, progress).await.unwrap())); + } + } + }; + + let result_bytes = Bytes::from( + ::format() + .serialize(&result) + .map_err(|err| bonsaidb_core::Error::Serialization(err.to_string()))?, + ); + // TODO error handling + orchestrator + .complete_job(worker_id, result_bytes) + .await + .unwrap(); + + result + } +} + +#[derive(Default, Clone, Eq, PartialEq, Debug, Serialize, Deserialize)] +pub struct Progress { + pub updated_at: Timestamp, + pub message: Option>, + pub step: ProgressStep, + pub total_steps: u64, +} + +#[derive(Default, Clone, Eq, PartialEq, Debug, Serialize, Deserialize)] +pub struct ProgressStep { + pub name: Option>, + pub index: u64, + pub completion: StepCompletion, +} + +#[derive(Clone, Eq, PartialEq, Debug, Serialize, Deserialize)] +pub enum StepCompletion { + Indeterminite, + Percent(u8), + Count { index: u64, total_steps: u64 }, + Complete, +} + +impl Default for StepCompletion { + fn default() -> Self { + Self::Indeterminite + } +} + +#[derive(Debug)] +pub struct ProgressReporter { + current: Progress, + sender: watch::Sender, + cancel: watch::Receiver>, +} + +struct ProgressReceiver { + receiver: watch::Receiver, + cancel: watch::Sender>, +} + +impl ProgressReporter { + fn new() -> (Self, ProgressReceiver) { + let (sender, receiver) = watch::channel(Progress::default()); + let (cancel_sender, cancel_receiver) = watch::channel(None); + ( + Self { + sender, + cancel: cancel_receiver, + current: Progress::default(), + }, + ProgressReceiver { + receiver, + cancel: cancel_sender, + }, + ) + } + + pub fn cancelled_at(&mut self) -> Option { + *self.cancel.borrow_and_update() + } + + pub fn set_message(&mut self, message: impl Display) { + let message = message.to_string(); + if self.current.message.as_deref() != Some(&message) { + self.current.message = Some(Arc::new(message)); + self.sender.send(self.current.clone()).unwrap(); + } + } + + pub fn clear_message(&mut self) { + if self.current.message.is_some() { + self.current.message = None; + self.sender.send(self.current.clone()).unwrap(); + } + } + + pub fn set_total_steps(&mut self, steps: u64) { + if self.current.total_steps != steps { + self.current.total_steps = steps; + self.sender.send(self.current.clone()).unwrap(); + } + } + + pub fn set_step(&mut self, step: u64) { + if self.current.step.index != step { + self.current.step.index = step; + self.current.step.name = None; + self.current.step.completion = StepCompletion::Indeterminite; + self.sender.send(self.current.clone()).unwrap(); + } + } + + pub fn set_step_with_name(&mut self, step: u64, name: impl Display) { + if self.current.step.index != step { + self.current.step.index = step; + self.current.step.name = Some(Arc::new(name.to_string())); + self.current.step.completion = StepCompletion::Indeterminite; + self.sender.send(self.current.clone()).unwrap(); + } + } + + pub fn set_step_completion(&mut self, completion: StepCompletion) { + if self.current.step.completion != completion { + self.current.step.completion = completion; + self.sender.send(self.current.clone()).unwrap(); + } + } + + pub fn set_step_percent_complete(&mut self, percent: f32) { + #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)] + let percent = StepCompletion::Percent((percent.clamp(0., 1.) * 256.).floor() as u8); + if self.current.step.completion != percent { + self.current.step.completion = percent; + self.sender.send(self.current.clone()).unwrap(); + } + } + + pub fn set_step_progress(&mut self, index: u64, total_steps: u64) { + let progress = StepCompletion::Count { index, total_steps }; + if self.current.step.completion != progress { + self.current.step.completion = progress; + self.sender.send(self.current.clone()).unwrap(); + } + } + + pub fn complete_step(&mut self) { + if self.current.step.completion != StepCompletion::Complete { + self.current.step.completion = StepCompletion::Complete; + self.sender.send(self.current.clone()).unwrap(); + } + } +} diff --git a/crates/bonsaidb-jobs/src/lib.rs b/crates/bonsaidb-jobs/src/lib.rs new file mode 100644 index 00000000000..ab119c9b2c1 --- /dev/null +++ b/crates/bonsaidb-jobs/src/lib.rs @@ -0,0 +1,30 @@ +#![forbid(unsafe_code)] +#![warn( + clippy::cargo, + // missing_docs, + // clippy::missing_docs_in_private_items, + clippy::pedantic, + future_incompatible, + rust_2018_idioms, +)] +#![allow( + clippy::missing_panics_doc, + clippy::missing_errors_doc, // TODO clippy::missing_errors_doc + // clippy::option_if_let_else, + clippy::module_name_repetitions, +)] + +use bonsaidb_core::schema::Schematic; + +pub fn define_collections(schematic: &mut Schematic) -> Result<(), bonsaidb_core::Error> { + queue::define_collections(schematic)?; + job::define_collections(schematic)?; + + Ok(()) +} + +pub mod fifo; +pub mod job; +pub mod orchestrator; +pub mod queue; +mod schema; diff --git a/crates/bonsaidb-jobs/src/orchestrator.rs b/crates/bonsaidb-jobs/src/orchestrator.rs new file mode 100644 index 00000000000..db28bc500ce --- /dev/null +++ b/crates/bonsaidb-jobs/src/orchestrator.rs @@ -0,0 +1,364 @@ +use std::collections::{HashMap, VecDeque}; + +use bonsaidb_core::{ + arc_bytes::serde::Bytes, + connection::Connection, + document::CollectionDocument, + keyvalue::{KeyValue, Timestamp}, + pubsub::PubSub, + schema::SerializedCollection, + transmog::Format, +}; +use serde::{Deserialize, Serialize}; +use tokio::sync::oneshot; + +use crate::{ + fifo::PriorityFifo, + job::{Job, Progress, Queueable}, + queue::{self, IdResolver, QueueId, QueueName}, + schema::{self, job::PendingJobs, Queue}, +}; + +#[derive(Clone, Debug)] +pub struct Orchestrator +where + S: Strategy, +{ + sender: flume::Sender>, +} + +impl Orchestrator +where + S: Strategy, +{ + pub fn spawn(database: Database, strategy: S) -> Self + where + Database: Connection + PubSub + KeyValue + 'static, + { + let (sender, receiver) = flume::unbounded(); + std::thread::Builder::new() + .name(String::from("jobs-orchestrator")) + .spawn(move || ExecutingBackend::run(receiver, database, strategy)) + .unwrap(); + Self { sender } + } + + pub async fn enqueue + Send, Payload: Queueable>( + &self, + queue: Queue, + job: &Payload, + ) -> Result, queue::Error> { + let bytes = Payload::format() + .serialize(job) + .map_err(|err| bonsaidb_core::Error::Serialization(err.to_string()))?; + let (sender, receiver) = oneshot::channel(); + self.sender.send(Command::Enqueue { + queue: queue.into(), + payload: Bytes::from(bytes), + output: sender, + })?; + let job = receiver.await??; + + Ok(Job::from(job)) + } + + pub async fn register_worker(&self, config: S::WorkerConfig) -> Result { + let (sender, receiver) = oneshot::channel(); + self.sender.send(Command::RegisterWorker { + config, + output: sender, + })?; + receiver.await? + } + + pub async fn report_progress( + &self, + worker: WorkerId, + progress: Progress, + ) -> Result, queue::Error> { + let (sender, receiver) = oneshot::channel(); + self.sender.send(Command::JobProgress { + worker, + progress, + output: sender, + })?; + Ok(receiver.await?) + } + + pub async fn complete_job(&self, worker: WorkerId, result: Bytes) -> Result<(), queue::Error> { + let (sender, receiver) = oneshot::channel(); + self.sender.send(Command::JobComplete { + worker, + job_result: result, + output: sender, + })?; + Ok(receiver.await?) + } +} + +enum Command { + Enqueue { + queue: QueueId, + payload: Bytes, + output: oneshot::Sender, queue::Error>>, + }, + RegisterWorker { + config: S::WorkerConfig, + output: oneshot::Sender>, + }, + JobProgress { + worker: WorkerId, + progress: Progress, + output: oneshot::Sender>, + }, + JobComplete { + worker: WorkerId, + job_result: Bytes, + output: oneshot::Sender<()>, + }, +} + +pub struct ExecutingBackend +where + Database: Connection + PubSub + KeyValue, + S: Strategy, +{ + receiver: flume::Receiver>, + backend: Backend, + strategy: S, + workers: HashMap>, + last_worker_id: u64, +} + +pub struct Backend +where + Database: Connection + PubSub + KeyValue, +{ + database: Database, + queues_by_name: HashMap>, + queues: HashMap>>, +} + +impl Backend +where + Database: Connection + PubSub + KeyValue, +{ + pub fn database(&self) -> &Database { + &self.database + } + + pub fn queue(&mut self, queue: u64) -> Option<&mut VecDeque>> { + self.queues.get_mut(&queue) + } + + pub fn queue_by_name( + &mut self, + queue: &QueueName, + ) -> Option<&mut VecDeque>> { + let id = self.queues_by_name.get(queue)?.header.id; + self.queue(id) + } +} + +impl ExecutingBackend +where + Database: Connection + PubSub + KeyValue, + S: Strategy, +{ + fn run( + receiver: flume::Receiver>, + database: Database, + strategy: S, + ) -> Result<(), queue::Error> { + let mut queues_by_name = HashMap::new(); + let mut queues = HashMap::new(); + + for queue in Queue::all(&database).query()? { + queues_by_name.insert(queue.contents.name.clone(), queue); + } + + for (_, job) in database + .view::() + .query_with_collection_docs()? + .documents + { + let queue = queues + .entry(job.contents.queue_id) + .or_insert_with(VecDeque::default); + queue.push_back(job); + } + + Self { + receiver, + strategy, + backend: Backend { + database, + queues_by_name, + queues, + }, + workers: HashMap::new(), + last_worker_id: 0, + } + .orchestrate() + } + + fn orchestrate(&mut self) -> Result<(), queue::Error> { + while let Ok(command) = self.receiver.recv() { + match command { + Command::Enqueue { + queue, + payload, + output: result, + } => { + drop(result.send(self.enqueue(&queue, payload))); + } + Command::RegisterWorker { + config, + output: result, + } => { + drop(result.send(self.register_worker(config))); + } + Command::JobProgress { + worker, + progress, + output: result, + } => { + if let Some(worker) = self.workers.get_mut(&worker.0) { + if let Some(job) = &mut worker.current_job { + loop { + job.contents.progress = progress.clone(); + match job.update(&self.backend.database) { + Ok(()) => {} + Err(bonsaidb_core::Error::DocumentConflict(..)) => { + if let Some(updated_job) = + schema::Job::get(job.header.id, &self.backend.database)? + { + // Try updating the progress again + *job = updated_job; + } else { + break; + } + } + Err(other) => return Err(queue::Error::from(other)), + } + } + let _ = result.send(job.contents.cancelled_at); + } + } + } + Command::JobComplete { + worker, + job_result, + output, + } => { + if let Some(worker) = self.workers.get_mut(&worker.0) { + if let Some(job) = &mut worker.current_job { + job.contents.result = Some(job_result.clone()); + job.contents.returned_at = Some(Timestamp::now()); + job.update(&self.backend.database)?; + + self.backend.database.publish_bytes( + job_topic(job.header.id).into_bytes(), + job_result.0, + )?; + } + } + let _ = output.send(()); + } + } + } + Ok(()) + } + + fn enqueue( + &mut self, + queue: &QueueId, + payload: Bytes, + ) -> Result, queue::Error> { + let queue_id = self.backend.database.resolve(&queue)?; + let job = schema::Job { + queue_id, + payload, + enqueued_at: Timestamp::now(), + progress: Progress::default(), + result: None, + returned_at: None, + cancelled_at: None, + } + .push_into(&self.backend.database)?; + let entries = self + .backend + .queues + .entry(job.contents.queue_id) + .or_default(); + let insert_at = match entries.binary_search_by(|existing_job| { + existing_job + .contents + .enqueued_at + .cmp(&job.contents.enqueued_at) + }) { + Ok(index) | Err(index) => index, + }; + entries.insert(insert_at, job.clone()); + Ok(job) + } + + fn register_worker(&mut self, config: S::WorkerConfig) -> Result { + self.last_worker_id += 1; + self.workers.insert( + self.last_worker_id, + Worker { + current_job: None, + last_seen: Timestamp::now(), + strategy_worker: self.strategy.initialize_worker(config, &mut self.backend)?, + }, + ); + Ok(WorkerId(self.last_worker_id)) + } +} + +impl IdResolver for Backend +where + Database: Connection + PubSub + KeyValue, +{ + fn resolve(&self, id: &QueueId) -> Result { + match id { + QueueId::Id(id) => Ok(*id), + QueueId::Name(name) => self + .queues_by_name + .get(name) + .map(|q| q.header.id) + .ok_or(queue::Error::NotFound), + } + } +} + +pub trait Strategy: Sized + Send + Sync + 'static { + type WorkerConfig: Send + Sync; + type Worker: Send + Sync; + + fn initialize_worker( + &mut self, + config: Self::WorkerConfig, + backend: &mut Backend, + ) -> Result; + + fn dequeue_for_worker( + &mut self, + worker: &Self::Worker, + backend: &mut Backend, + ) -> Result>, queue::Error>; +} + +#[derive(Clone, Copy, Debug, Serialize, Deserialize, Eq, PartialEq, Hash)] +pub struct WorkerId(pub(crate) u64); + +pub struct Worker { + current_job: Option>, + strategy_worker: W, + last_seen: Timestamp, +} + +pub(crate) fn job_topic(id: u64) -> String { + format!("BONSIADB_JOB_{}_RESULT", id) +} diff --git a/crates/bonsaidb-jobs/src/queue.rs b/crates/bonsaidb-jobs/src/queue.rs new file mode 100644 index 00000000000..0b7b66e9988 --- /dev/null +++ b/crates/bonsaidb-jobs/src/queue.rs @@ -0,0 +1,250 @@ +use std::fmt::Display; + +use bonsaidb_core::{ + async_trait::async_trait, + connection::Connection, + document::CollectionDocument, + schema::{Authority, CollectionName, InsertError, SchemaName, Schematic, SerializedCollection}, +}; +use serde::{Deserialize, Serialize}; + +use crate::schema::{ + self, + queue::{ByOwnerAndName, ViewExt}, +}; + +pub(crate) fn define_collections(schematic: &mut Schematic) -> Result<(), bonsaidb_core::Error> { + schematic.define_collection::() +} + +pub struct Queue(CollectionDocument); + +impl Queue { + pub async fn find< + Owner: Into + Send, + Name: Into + Send, + Database: Connection, + >( + owner: Owner, + name: Name, + database: Database, + ) -> Result, bonsaidb_core::Error> { + let owner = owner.into(); + let name = name.into(); + let existing = database + .view::() + .find_queue(&owner, &name) + .query_with_collection_docs()?; + Ok(existing + .documents + .into_iter() + .next() + .map(|(_, doc)| Self(doc))) + } + + pub async fn create< + Owner: Into + Send, + Name: Into + Send, + Database: Connection, + >( + owner: Owner, + name: Name, + database: Database, + ) -> Result { + schema::Queue { + name: QueueName::new(owner, name), + } + .push_into(&database) + .map(Self) + .map_err(|err| err.error) + } + + #[must_use] + pub const fn id(&self) -> u64 { + self.0.header.id + } + + #[must_use] + pub const fn name(&self) -> &QueueName { + &self.0.contents.name + } +} + +#[derive(Clone, Debug, Serialize, Deserialize, Hash, Eq, PartialEq)] +pub enum QueueOwner { + Collection(CollectionName), + Authority(Authority), + Schema(SchemaName), + Backend, +} + +impl Display for QueueOwner { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + QueueOwner::Collection(collection) => write!(f, "collection.{}", collection), + QueueOwner::Authority(authority) => write!(f, "authority.{}", authority), + QueueOwner::Schema(schema) => write!(f, "schema.{}", schema), + QueueOwner::Backend => f.write_str("backend"), + } + } +} + +impl From for QueueOwner { + fn from(name: CollectionName) -> Self { + Self::Collection(name) + } +} + +impl From for QueueOwner { + fn from(name: Authority) -> Self { + Self::Authority(name) + } +} + +impl From for QueueOwner { + fn from(name: SchemaName) -> Self { + Self::Schema(name) + } +} + +#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)] +pub struct QueueName { + pub owner: QueueOwner, + pub name: String, +} + +impl QueueName { + pub fn new + Send, Name: Into + Send>( + owner: Owner, + name: Name, + ) -> Self { + Self { + owner: owner.into(), + name: name.into(), + } + } + + #[must_use] + pub fn format(owner: &QueueOwner, name: &str) -> String { + let mut string = String::new(); + Self::format_into(owner, name, &mut string).unwrap(); + string + } + + pub fn format_into( + owner: &QueueOwner, + name: &str, + mut writer: impl std::fmt::Write, + ) -> Result<(), std::fmt::Error> { + write!(writer, "{}.{}", owner, name) + } +} + +impl Display for QueueName { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}.{}", self.owner, self.name) + } +} + +#[derive(Clone, Debug, Eq, PartialEq)] +pub enum QueueId { + Id(u64), + Name(QueueName), +} + +impl QueueId { + pub fn resolve(&mut self, resolver: &Resolver) -> Result { + let id = resolver.resolve(self)?; + *self = Self::Id(id); + Ok(id) + } + + #[must_use] + pub fn as_id(&self) -> Option { + if let Self::Id(id) = self { + Some(*id) + } else { + None + } + } +} + +pub trait IdResolver { + fn resolve(&self, id: &QueueId) -> Result; +} + +#[async_trait] +impl IdResolver for Database +where + Database: Connection, +{ + fn resolve(&self, id: &QueueId) -> Result { + match id { + QueueId::Id(id) => Ok(*id), + QueueId::Name(name) => { + let existing = self + .view::() + .find_queue(&name.owner, &name.name) + .query()?; + Ok(existing + .into_iter() + .next() + .map(|mapping| mapping.source.id.deserialize()) + .transpose()? + .ok_or(Error::NotFound)?) + } + } + } +} + +impl From for QueueId { + fn from(id: u64) -> Self { + Self::Id(id) + } +} + +impl<'a> From<&'a Queue> for QueueId { + fn from(queue: &'a Queue) -> Self { + Self::Name(queue.name().clone()) + } +} + +impl From for QueueId { + fn from(name: QueueName) -> Self { + Self::Name(name) + } +} + +#[derive(thiserror::Error, Debug)] +pub enum Error { + #[error("queue not found")] + NotFound, + #[error("database error: {0}")] + Database(#[from] bonsaidb_core::Error), + #[error("internal communication failure")] + InternalCommunication, +} + +impl From for Error { + fn from(_: tokio::sync::oneshot::error::RecvError) -> Self { + Self::InternalCommunication + } +} + +impl From> for Error { + fn from(_: flume::SendError) -> Self { + Self::InternalCommunication + } +} + +impl From for Error { + fn from(_: flume::RecvError) -> Self { + Self::InternalCommunication + } +} + +impl From> for Error { + fn from(err: InsertError) -> Self { + Self::Database(err.error) + } +} diff --git a/crates/bonsaidb-jobs/src/schema/job.rs b/crates/bonsaidb-jobs/src/schema/job.rs new file mode 100644 index 00000000000..f967e7be5a3 --- /dev/null +++ b/crates/bonsaidb-jobs/src/schema/job.rs @@ -0,0 +1,36 @@ +use bonsaidb_core::{ + arc_bytes::serde::Bytes, + document::{CollectionDocument, Emit}, + keyvalue::Timestamp, + schema::{Collection, CollectionViewSchema, View, ViewMapResult}, +}; +use serde::{Deserialize, Serialize}; + +use crate::job::Progress; + +#[derive(Collection, Clone, Debug, Serialize, Deserialize)] +#[collection(name = "jobs", authority = "bonsaidb", core = bonsaidb_core)] +pub struct Job { + pub queue_id: u64, + pub payload: Bytes, + pub enqueued_at: Timestamp, + pub progress: Progress, + pub returned_at: Option, + pub cancelled_at: Option, + pub result: Option, +} + +#[derive(View, Debug, Clone)] +#[view(name = "pending", key = Timestamp, collection = Job, core = bonsaidb_core)] +pub struct PendingJobs; + +impl CollectionViewSchema for PendingJobs { + type View = Self; + + fn map( + &self, + document: CollectionDocument<::Collection>, + ) -> ViewMapResult { + document.header.emit_key(document.contents.enqueued_at) + } +} diff --git a/crates/bonsaidb-jobs/src/schema/mod.rs b/crates/bonsaidb-jobs/src/schema/mod.rs new file mode 100644 index 00000000000..94bb4ed5c09 --- /dev/null +++ b/crates/bonsaidb-jobs/src/schema/mod.rs @@ -0,0 +1,3 @@ +pub mod job; +pub mod queue; +pub use self::{job::Job, queue::Queue}; diff --git a/crates/bonsaidb-jobs/src/schema/queue.rs b/crates/bonsaidb-jobs/src/schema/queue.rs new file mode 100644 index 00000000000..61a0f8c38a4 --- /dev/null +++ b/crates/bonsaidb-jobs/src/schema/queue.rs @@ -0,0 +1,46 @@ +use bonsaidb_core::{ + connection::{self, Connection}, + document::{CollectionDocument, Emit}, + schema::{Collection, CollectionViewSchema, View, ViewMapResult}, +}; +use serde::{Deserialize, Serialize}; + +use crate::queue::{QueueName, QueueOwner}; + +#[derive(Collection, Debug, Serialize, Deserialize)] +#[collection(name = "queues", authority = "bonsaidb", core = bonsaidb_core)] +pub struct Queue { + pub name: QueueName, +} + +#[derive(View, Debug, Clone)] +#[view(name = "by-name", collection = Queue, key = String, core = bonsaidb_core)] +pub struct ByOwnerAndName; + +impl CollectionViewSchema for ByOwnerAndName { + type View = Self; + + fn unique(&self) -> bool { + true + } + + fn map( + &self, + document: CollectionDocument<::Collection>, + ) -> ViewMapResult { + document.header.emit_key(document.contents.name.to_string()) + } +} + +pub trait ViewExt: Sized { + fn find_queue(self, owner: &QueueOwner, name: &str) -> Self; +} + +impl<'a, Cn> ViewExt for connection::View<'a, Cn, ByOwnerAndName, String> +where + Cn: Connection, +{ + fn find_queue(self, owner: &QueueOwner, name: &str) -> Self { + self.with_key(QueueName::format(owner, name)) + } +}