Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement Persistent Jobs Service #211

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ members = [
"crates/bonsaidb-server",
"crates/bonsaidb-keystorage-s3",
"crates/bonsaidb-utils",
"crates/bonsaidb-jobs",
"examples/*",
"book/book-examples",
"xtask",
Expand Down
26 changes: 26 additions & 0 deletions crates/bonsaidb-jobs/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
[package]
name = "bonsaidb-jobs"
version = "0.4.0"
authors = ["Jonathan Johnson <[email protected]>"]
edition = "2021"
description = "Persistent job queueing and scheduling for BonsaiDb."
repository = "https://github.com/khonsulabs/bonsaidb"
license = "MIT OR Apache-2.0"
keywords = ["bonsaidb", "job", "queue"]
categories = ["database", "asynchronous"]
readme = "./README.md"
homepage = "https://bonsaidb.io/"
rust-version = "1.58"

[dependencies]
bonsaidb-core = { version = "0.4.0", path = "../bonsaidb-core" }
serde = { version = "1", features = ["derive"] }
thiserror = "1"
tokio = { version = "=1.16.1", default-features = false, features = ["sync"] }
flume = "0.10"

[dev-dependencies]
tokio = { version = "=1.16.1", features = ["full"] }
bonsaidb-core = { version = "0.4.0", path = "../bonsaidb-core", features = [
"test-util",
] }
66 changes: 66 additions & 0 deletions crates/bonsaidb-jobs/src/fifo.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
use bonsaidb_core::{
connection::Connection, document::CollectionDocument, keyvalue::KeyValue, pubsub::PubSub,
};

use crate::{
orchestrator::{Backend, Strategy},
queue::{self, QueueId},
schema,
};

/// Configuration for a [`PriorityFifo`] worker: an ordered list of tiers,
/// where earlier tiers are considered before later ones.
pub struct Config {
    /// Tiers in descending priority order (index 0 is highest priority).
    pub tiers: Vec<JobTier>,
}

/// A single priority tier: the set of queues that share one priority level.
pub struct JobTier(pub Vec<QueueId>);

/// A dequeueing [`Strategy`] that honors tier priority first and, within a
/// tier, dequeues jobs in first-in, first-out order across the tier's queues.
pub struct PriorityFifo;

impl Strategy for PriorityFifo {
    type WorkerConfig = Config;
    type Worker = Config;

    /// Resolves every queue id in the worker's tiers against the database so
    /// later dequeue operations can use the resolved ids directly.
    fn initialize_worker<Database: Connection + PubSub + KeyValue>(
        &mut self,
        mut config: Self::WorkerConfig,
        backend: &mut Backend<Database>,
    ) -> Result<Self::Worker, queue::Error> {
        for tier in &mut config.tiers {
            for queue in &mut tier.0 {
                queue.resolve(backend.database())?;
            }
        }
        Ok(config)
    }

    /// Returns the next job for `worker`, honoring tier priority: the first
    /// tier containing any queued job wins, and within that tier the job
    /// with the *earliest* `enqueued_at` (the one waiting longest) is popped.
    fn dequeue_for_worker<Database: Connection + PubSub + KeyValue>(
        &mut self,
        worker: &Self::WorkerConfig,
        backend: &mut Backend<Database>,
    ) -> Result<Option<CollectionDocument<schema::Job>>, queue::Error> {
        for tier in &worker.tiers {
            // Inspect the front job of every non-empty queue in this tier and
            // pick the queue whose front job was enqueued first.
            if let Some((queue_with_oldest_job, _)) = tier
                .0
                .iter()
                .filter_map(|q| {
                    backend
                        .queue(q.as_id().unwrap())
                        .and_then(|jobs| jobs.front().map(|j| (q, j.clone())))
                })
                // Fix: `min_by` selects the earliest timestamp. The previous
                // `max_by` dequeued the *newest* front job, inverting the
                // FIFO ordering this strategy is named for.
                .min_by(|(_, q1_front), (_, q2_front)| {
                    q1_front
                        .contents
                        .enqueued_at
                        .cmp(&q2_front.contents.enqueued_at)
                })
            {
                return Ok(backend
                    .queue(queue_with_oldest_job.as_id().unwrap())
                    .unwrap()
                    .pop_front());
            }
        }

        Ok(None)
    }
}
291 changes: 291 additions & 0 deletions crates/bonsaidb-jobs/src/job.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,291 @@
use std::{fmt::Display, marker::PhantomData, sync::Arc};

use bonsaidb_core::{
actionable::async_trait,
arc_bytes::serde::Bytes,
connection::Connection,
document::CollectionDocument,
keyvalue::Timestamp,
pubsub::{PubSub, Subscriber},
schema::{Schematic, SerializedCollection},
transmog::{Format, OwnedDeserializer},
};
use serde::{Deserialize, Serialize};
use tokio::sync::watch;

use crate::{
orchestrator::{job_topic, Orchestrator, WorkerId},
schema,
};

/// Registers the job-related collections this crate requires on `schematic`.
pub(crate) fn define_collections(schematic: &mut Schematic) -> Result<(), bonsaidb_core::Error> {
    schematic.define_collection::<schema::Job>()
}

/// A typed handle to a stored job document.
///
/// `Q` records the payload type so the job's result can be decoded with
/// `Q::Format`; it is only a marker (`PhantomData`) and is not stored.
pub struct Job<Q: Queueable>(CollectionDocument<schema::Job>, PhantomData<Q>);

impl<Q: Queueable> From<CollectionDocument<schema::Job>> for Job<Q> {
    fn from(doc: CollectionDocument<schema::Job>) -> Self {
        Self(doc, PhantomData)
    }
}

impl<Q: Queueable> Job<Q> {
    /// Refreshes this handle's document from `database`.
    ///
    /// Returns `Ok(true)` if the document still exists and was reloaded, or
    /// `Ok(false)` if it has been deleted.
    pub fn update<Database: Connection>(
        &mut self,
        database: &Database,
    ) -> Result<bool, bonsaidb_core::Error> {
        match schema::Job::get(self.0.header.id, database)? {
            Some(doc) => {
                self.0 = doc;
                Ok(true)
            }
            None => Ok(false),
        }
    }

    /// Blocks until this job has a result and returns it.
    ///
    /// Subscribes to the job's PubSub topic *before* re-reading the
    /// document, so a completion published between the read and the
    /// subscription cannot be missed. If receiving from the subscriber
    /// fails, the loop restarts with a fresh subscriber.
    pub fn wait_for_result<Database: Connection + PubSub>(
        &mut self,
        database: &Database,
    ) -> JobResult<Q> {
        loop {
            let subscriber = database.create_subscriber()?;
            subscriber.subscribe_to(&job_topic(self.0.header.id))?;
            // Check that the job hasn't completed before we could create the subscriber
            self.update(database)?;
            return if let Some(result) = &self.0.contents.result {
                // Already finished: decode the stored result bytes.
                <Q::Format as OwnedDeserializer<JobResult<Q>>>::deserialize_owned(
                    &Q::format(),
                    result,
                )
            } else {
                // Wait for the subscriber to be notified
                match subscriber.receiver().receive() {
                    Ok(message) => {
                        // The completion notification carries the serialized
                        // result as its payload.
                        <Q::Format as OwnedDeserializer<JobResult<Q>>>::deserialize_owned(
                            &Q::format(),
                            &message.payload,
                        )
                    }
                    Err(_) => continue,
                }
            }
            .map_err(|err| bonsaidb_core::Error::Serialization(err.to_string()))?;
        }
    }

    /// Returns the most recently fetched progress for this job.
    #[must_use]
    pub fn progress(&self) -> &Progress {
        &self.0.contents.progress
    }

    /// Returns when this job was enqueued.
    #[must_use]
    pub fn enqueued_at(&self) -> Timestamp {
        self.0.contents.enqueued_at
    }

    /// Returns when this job was cancelled, if it has been.
    #[must_use]
    pub fn cancelled_at(&self) -> Option<Timestamp> {
        self.0.contents.cancelled_at
    }
}

// The outcome of a job: `Ok(Some(output))` on success, `Ok(None)` when no
// output was produced, or `Err` on failure.
#[allow(type_alias_bounds)]
type JobResult<Q: Queueable> = Result<Option<Q::Output>, Q::Error>;

/// A payload type that can be enqueued as a job.
#[async_trait]
pub trait Queueable: Sized + Send + Sync + std::fmt::Debug {
    /// The serialization format used for both the payload and the result.
    type Format: bonsaidb_core::transmog::OwnedDeserializer<Self>
        + bonsaidb_core::transmog::OwnedDeserializer<Result<Option<Self::Output>, Self::Error>>;
    /// The value a successful job yields.
    type Output: Send + Sync;
    /// The error a job can fail with; must absorb database errors via `From`.
    type Error: From<bonsaidb_core::Error> + Send + Sync;

    /// Returns the format instance used to (de)serialize this type.
    fn format() -> Self::Format;
}

/// Executes jobs of a single payload type.
#[async_trait]
pub trait Executor {
    /// The payload type this executor handles.
    type Job: Queueable;

    /// Runs `job`, reporting progress through `progress`.
    async fn execute(
        &mut self,
        job: Self::Job,
        progress: &mut ProgressReporter,
    ) -> Result<<Self::Job as Queueable>::Output, <Self::Job as Queueable>::Error>;

    /// Deserializes `job`'s payload, drives [`Executor::execute`] while
    /// forwarding progress updates to `orchestrator`, then reports the
    /// serialized final result.
    async fn execute_with_progress<Database: Connection + PubSub>(
        &mut self,
        worker_id: WorkerId,
        job: &mut CollectionDocument<schema::Job>,
        orchestrator: &Orchestrator,
    ) -> Result<Option<<Self::Job as Queueable>::Output>, <Self::Job as Queueable>::Error> {
        let (mut executor_handle, mut job_handle) = ProgressReporter::new();
        // NOTE(review): a corrupt payload panics here rather than failing the
        // job — confirm this is intended.
        let payload = Self::Job::format()
            .deserialize_owned(&job.contents.payload)
            .unwrap();
        let mut task = self.execute(payload, &mut executor_handle);

        // Poll the job future and the progress channel concurrently until
        // the job finishes.
        let result = loop {
            tokio::select! {
                output = &mut task => break output.map(Some),
                // TODO have timeout to report to orchestrator with progress
                progress = job_handle.receiver.changed() => {
                    progress.unwrap();
                    // TODO throttle progress changes
                    let progress = job_handle.receiver.borrow_and_update().clone();
                    // TODO properly handle errors. They shouldn't kill the
                    // worker, as the job could complete and communication could
                    // be restored.
                    // Relay the orchestrator's cancellation timestamp (if any)
                    // back to the executor's reporter.
                    drop(job_handle.cancel.send(orchestrator.report_progress(worker_id, progress).await.unwrap()));
                }
            }
        };

        // Serialize the full `Result` so waiters can decode success or error.
        let result_bytes = Bytes::from(
            <Self::Job as Queueable>::format()
                .serialize(&result)
                .map_err(|err| bonsaidb_core::Error::Serialization(err.to_string()))?,
        );
        // TODO error handling
        orchestrator
            .complete_job(worker_id, result_bytes)
            .await
            .unwrap();

        result
    }
}

/// A snapshot of a job's progress.
#[derive(Default, Clone, Eq, PartialEq, Debug, Serialize, Deserialize)]
pub struct Progress {
    /// When this snapshot was produced.
    pub updated_at: Timestamp,
    /// An optional human-readable status message.
    pub message: Option<Arc<String>>,
    /// The step currently being executed.
    pub step: ProgressStep,
    /// The total number of steps in the job.
    pub total_steps: u64,
}

/// Progress information about one step of a job.
#[derive(Default, Clone, Eq, PartialEq, Debug, Serialize, Deserialize)]
pub struct ProgressStep {
    /// An optional display name for this step.
    pub name: Option<Arc<String>>,
    /// The index of this step.
    pub index: u64,
    /// How complete this step is.
    pub completion: StepCompletion,
}

/// How complete a single step is.
// NOTE(review): `Indeterminite` is a typo for "Indeterminate", but the
// variant name is part of the public, serde-serialized API — renaming it
// would break callers and stored data, so it is left as-is.
#[derive(Clone, Eq, PartialEq, Debug, Serialize, Deserialize)]
pub enum StepCompletion {
    /// No completion estimate is available.
    Indeterminite,
    /// Completion fraction scaled into a `u8` (see
    /// `ProgressReporter::set_step_percent_complete` for the scaling).
    Percent(u8),
    /// `index` of `total_steps` sub-steps are complete.
    Count { index: u64, total_steps: u64 },
    /// The step is finished.
    Complete,
}

impl Default for StepCompletion {
    fn default() -> Self {
        Self::Indeterminite
    }
}

/// Executor-side handle for publishing progress and observing cancellation.
///
/// Created as a pair with a [`ProgressReceiver`]; setters only publish when
/// the value actually changes.
#[derive(Debug)]
pub struct ProgressReporter {
    // Last value published; used to suppress duplicate sends.
    current: Progress,
    // Publishes progress snapshots to the paired receiver.
    sender: watch::Sender<Progress>,
    // Receives the cancellation timestamp, if the job is cancelled.
    cancel: watch::Receiver<Option<Timestamp>>,
}

/// Worker-loop-side counterpart of [`ProgressReporter`].
struct ProgressReceiver {
    // Receives progress snapshots from the paired reporter.
    receiver: watch::Receiver<Progress>,
    // Sends the cancellation timestamp to the paired reporter.
    cancel: watch::Sender<Option<Timestamp>>,
}

impl ProgressReporter {
    /// Creates a connected reporter/receiver pair backed by two `watch`
    /// channels: one carrying progress snapshots, one carrying the
    /// cancellation timestamp.
    fn new() -> (Self, ProgressReceiver) {
        let (sender, receiver) = watch::channel(Progress::default());
        let (cancel_sender, cancel_receiver) = watch::channel(None);
        (
            Self {
                sender,
                cancel: cancel_receiver,
                current: Progress::default(),
            },
            ProgressReceiver {
                receiver,
                cancel: cancel_sender,
            },
        )
    }

    /// Publishes the current snapshot to the paired receiver.
    ///
    /// # Panics
    ///
    /// Panics if the receiver has been dropped — the same behavior as the
    /// inline `send(...).unwrap()` calls this helper replaces.
    fn publish(&self) {
        self.sender.send(self.current.clone()).unwrap();
    }

    /// Returns the timestamp at which this job was cancelled, if it was.
    pub fn cancelled_at(&mut self) -> Option<Timestamp> {
        *self.cancel.borrow_and_update()
    }

    /// Sets the status message, publishing only if it changed.
    pub fn set_message(&mut self, message: impl Display) {
        let message = message.to_string();
        if self.current.message.as_deref() != Some(&message) {
            self.current.message = Some(Arc::new(message));
            self.publish();
        }
    }

    /// Clears the status message, publishing only if one was set.
    pub fn clear_message(&mut self) {
        if self.current.message.is_some() {
            self.current.message = None;
            self.publish();
        }
    }

    /// Sets the total number of steps, publishing only on change.
    pub fn set_total_steps(&mut self, steps: u64) {
        if self.current.total_steps != steps {
            self.current.total_steps = steps;
            self.publish();
        }
    }

    /// Moves to step `step`, clearing the step name and resetting completion
    /// to indeterminate. Publishes only when the index changes.
    pub fn set_step(&mut self, step: u64) {
        if self.current.step.index != step {
            self.current.step.index = step;
            self.current.step.name = None;
            self.current.step.completion = StepCompletion::Indeterminite;
            self.publish();
        }
    }

    /// Moves to step `step` with a display name, resetting completion to
    /// indeterminate.
    ///
    /// Fix: the guard now also fires when only the *name* differs. The
    /// previous check compared just the index, so naming the step whose
    /// index was already current — notably index 0, which
    /// `Progress::default()` starts at — silently dropped the name and
    /// published nothing.
    pub fn set_step_with_name(&mut self, step: u64, name: impl Display) {
        let name = name.to_string();
        if self.current.step.index != step || self.current.step.name.as_deref() != Some(&name) {
            self.current.step.index = step;
            self.current.step.name = Some(Arc::new(name));
            self.current.step.completion = StepCompletion::Indeterminite;
            self.publish();
        }
    }

    /// Sets the current step's completion, publishing only on change.
    pub fn set_step_completion(&mut self, completion: StepCompletion) {
        if self.current.step.completion != completion {
            self.current.step.completion = completion;
            self.publish();
        }
    }

    /// Sets the current step's completion from a fraction in `0.0..=1.0`.
    ///
    /// The fraction is scaled by 256 and floored; an input of exactly 1.0
    /// yields 255 via Rust's saturating float-to-int cast.
    pub fn set_step_percent_complete(&mut self, percent: f32) {
        #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
        let percent = StepCompletion::Percent((percent.clamp(0., 1.) * 256.).floor() as u8);
        if self.current.step.completion != percent {
            self.current.step.completion = percent;
            self.publish();
        }
    }

    /// Sets the current step's completion to `index` of `total_steps`
    /// sub-steps, publishing only on change.
    pub fn set_step_progress(&mut self, index: u64, total_steps: u64) {
        let progress = StepCompletion::Count { index, total_steps };
        if self.current.step.completion != progress {
            self.current.step.completion = progress;
            self.publish();
        }
    }

    /// Marks the current step complete, publishing only on change.
    pub fn complete_step(&mut self) {
        if self.current.step.completion != StepCompletion::Complete {
            self.current.step.completion = StepCompletion::Complete;
            self.publish();
        }
    }
}
Loading