Skip to content

Commit

Permalink
Internalizing jobs and renaming it to tasks
Browse files Browse the repository at this point in the history
This commit is in preparation of the scheduled jobs system. My original
goal was to reuse this code, but as I thought about how I was going to
implement it, it will be built atop the higher-level features of PubSub
and Collections/KeyValue to allow persistence. So this internalizes it,
and implements a simple task system for handling incoming requests in
the server separately.
  • Loading branch information
ecton committed Feb 20, 2022
1 parent 8a532ce commit 543777e
Show file tree
Hide file tree
Showing 17 changed files with 123 additions and 144 deletions.
9 changes: 9 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,15 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## Unreleased

### Breaking Changes

- `bonsaidb::local::jobs` is now private. It used to be a separate, public crate
in the PliantDb days. After thinking about the job scheduler more, this
initial implementation is better suited for the internal task management than
the higher-level jobs system. As such, it has been internalized.

## v0.2.0

### Breaking Changes
Expand Down
3 changes: 1 addition & 2 deletions crates/bonsaidb-local/src/database/keyvalue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,7 @@ use tokio::{

use crate::{
config::KeyValuePersistence,
jobs::{Job, Keyed},
tasks::Task,
tasks::{Job, Keyed, Task},
Database, Error,
};

Expand Down
11 changes: 0 additions & 11 deletions crates/bonsaidb-local/src/jobs/mod.rs

This file was deleted.

60 changes: 0 additions & 60 deletions crates/bonsaidb-local/src/jobs/task.rs

This file was deleted.

1 change: 0 additions & 1 deletion crates/bonsaidb-local/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ pub mod cli;
pub mod config;
mod database;
mod error;
pub mod jobs;
mod open_trees;
mod storage;
mod tasks;
Expand Down
3 changes: 1 addition & 2 deletions crates/bonsaidb-local/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,7 @@ use crate::vault::{self, LocalVaultKeyStorage, Vault};
use crate::{
config::{KeyValuePersistence, StorageConfiguration},
database::Context,
jobs::manager::Manager,
tasks::TaskManager,
tasks::{manager::Manager, TaskManager},
Database, Error,
};

Expand Down
15 changes: 11 additions & 4 deletions crates/bonsaidb-local/src/tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,22 @@ use bonsaidb_utils::{fast_async_read, fast_async_write};

use crate::{
database::{keyvalue::ExpirationLoader, Database},
jobs::{manager::Manager, task::Handle},
tasks::compactor::Compactor,
tasks::{compactor::Compactor, handle::Handle, manager::Manager},
views::{
integrity_scanner::{IntegrityScan, IntegrityScanner, OptionalViewMapHandle},
mapper::{Map, Mapper},
},
Error,
};

/// Types related to defining [`Job`]s.
pub mod handle;
/// Types related to the job [`Manager`](manager::Manager).
pub mod manager;
mod traits;

pub use self::traits::{Job, Keyed};

mod compactor;
mod task;

Expand Down Expand Up @@ -125,7 +132,7 @@ impl TaskManager {
&self,
view: &dyn view::Serialized,
database: &Database,
) -> Result<Option<Handle<OptionalViewMapHandle, Error, Task>>, crate::Error> {
) -> Result<Option<Handle<OptionalViewMapHandle, Error>>, crate::Error> {
let view_name = view.view_name();
if !self
.view_integrity_checked(
Expand Down Expand Up @@ -186,7 +193,7 @@ impl TaskManager {
pub async fn spawn_key_value_expiration_loader(
&self,
database: &crate::Database,
) -> Option<Handle<(), Error, Task>> {
) -> Option<Handle<(), Error>> {
if self.key_value_expiration_loaded(&database.data.name).await {
None
} else {
Expand Down
3 changes: 1 addition & 2 deletions crates/bonsaidb-local/src/tasks/compactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@ use nebari::tree::{Root, Unversioned, Versioned};

use crate::{
database::{document_tree_name, keyvalue::KEY_TREE},
jobs::{Job, Keyed},
tasks::Task,
tasks::{Job, Keyed, Task},
views::{
view_document_map_tree_name, view_entries_tree_name, view_invalidated_docs_tree_name,
view_omitted_docs_tree_name, view_versions_tree_name,
Expand Down
49 changes: 49 additions & 0 deletions crates/bonsaidb-local/src/tasks/handle.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
use std::{fmt::Debug, sync::Arc};

use tokio::sync::oneshot;

/// he `Id` of an executing task.
#[derive(Debug, Hash, Eq, PartialEq, Clone, Copy)]
pub struct Id(pub(crate) u64);

/// References a background task.
#[derive(Debug)]
pub struct Handle<T, E> {
/// The task's id.
pub id: Id,

pub(crate) receiver: oneshot::Receiver<Result<T, Arc<E>>>,
}

impl<T, E> Handle<T, E>
where
T: Send + Sync + 'static,
E: Send + Sync + 'static,
{
/// Waits for the job to complete and returns the result.
///
/// # Errors
///
/// Returns an error if the job is cancelled.
pub async fn receive(
self,
) -> Result<Result<T, Arc<E>>, tokio::sync::oneshot::error::RecvError> {
self.receiver.await
}

// /// Tries to receive the status of the job. If available, it is returned.
// /// This function will not block.
// ///
// /// # Errors
// ///
// /// Returns an error if the job isn't complete.
// ///
// /// * [`TryRecvError::Disconnected`](flume::TryRecvError::Disconnected): The job has been cancelled.
// /// * [`TryRecvError::Empty`](flume::TryRecvError::Empty): The job has not completed yet.
#[cfg(test)]
pub fn try_receive(
&mut self,
) -> Result<Result<T, Arc<E>>, tokio::sync::oneshot::error::TryRecvError> {
self.receiver.try_recv()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ use async_lock::RwLock;
use bonsaidb_utils::{fast_async_read, fast_async_write};
use derive_where::derive_where;

use crate::jobs::{
task::{Handle, Id},
use crate::tasks::{
handle::{Handle, Id},
Job, Keyed,
};

Expand All @@ -30,7 +30,8 @@ where
{
/// Pushes a `job` into the queue. Pushing the same job definition twice
/// will yield two tasks in the queue.
pub async fn enqueue<J: Job + 'static>(&self, job: J) -> Handle<J::Output, J::Error, Key> {
#[cfg(test)]
pub async fn enqueue<J: Job + 'static>(&self, job: J) -> Handle<J::Output, J::Error> {
let mut jobs = fast_async_write!(self.jobs);
jobs.enqueue(job, None, self.clone())
}
Expand All @@ -42,7 +43,7 @@ where
pub async fn lookup_or_enqueue<J: Keyed<Key>>(
&self,
job: J,
) -> Handle<<J as Job>::Output, <J as Job>::Error, Key> {
) -> Handle<<J as Job>::Output, <J as Job>::Error> {
let mut jobs = fast_async_write!(self.jobs);
jobs.lookup_or_enqueue(job, self.clone())
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ use std::{any::Any, collections::HashMap, fmt::Debug, sync::Arc};
use flume::{Receiver, Sender};
use tokio::sync::oneshot;

use crate::jobs::{
use crate::tasks::{
handle::{Handle, Id},
manager::{ManagedJob, Manager},
task::{Handle, Id},
traits::Executable,
Job, Keyed,
};
Expand Down Expand Up @@ -60,45 +60,40 @@ where
job: J,
key: Option<Key>,
manager: Manager<Key>,
) -> Handle<J::Output, J::Error, Key> {
) -> Handle<J::Output, J::Error> {
self.last_task_id = self.last_task_id.wrapping_add(1);
let id = Id(self.last_task_id);
self.queuer
.send(Box::new(ManagedJob {
id,
job,
manager,
key,
manager: manager.clone(),
}))
.unwrap();

self.create_new_task_handle(id, manager)
self.create_new_task_handle(id)
}

pub fn create_new_task_handle<T: Send + Sync + 'static, E: Send + Sync + 'static>(
&mut self,
id: Id,
manager: Manager<Key>,
) -> Handle<T, E, Key> {
) -> Handle<T, E> {
let (sender, receiver) = oneshot::channel();
let senders = self.result_senders.entry(id).or_insert_with(Vec::default);
senders.push(Box::new(Some(sender)));

Handle {
id,
manager,
receiver,
}
Handle { id, receiver }
}

pub fn lookup_or_enqueue<J: Keyed<Key>>(
&mut self,
job: J,
manager: Manager<Key>,
) -> Handle<<J as Job>::Output, <J as Job>::Error, Key> {
) -> Handle<<J as Job>::Output, <J as Job>::Error> {
let key = job.key();
if let Some(&id) = self.keyed_jobs.get(&key) {
self.create_new_task_handle(id, manager)
self.create_new_task_handle(id)
} else {
let handle = self.enqueue(job, Some(key.clone()), manager);
self.keyed_jobs.insert(key, handle.id);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::fmt::Debug;

use async_trait::async_trait;

use crate::jobs::{manager::Manager, task::Id, traits::Executable, Job};
use crate::tasks::{handle::Id, manager::Manager, traits::Executable, Job};

#[derive(Debug)]
pub struct ManagedJob<J, Key> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::{convert::Infallible, fmt::Debug, hash::Hash};
use async_trait::async_trait;

use super::Manager;
use crate::jobs::{Job, Keyed};
use crate::tasks::{Job, Keyed};

#[derive(Debug)]
struct Echo<T>(T);
Expand Down Expand Up @@ -51,7 +51,7 @@ async fn keyed_simple() -> Result<(), tokio::sync::oneshot::error::RecvError> {
let handle2 = manager.lookup_or_enqueue(Echo(1)).await;
// Tests that they received the same job id
assert_eq!(handle.id, handle2.id);
let mut handle3 = handle.clone().await;
let mut handle3 = manager.lookup_or_enqueue(Echo(1)).await;
assert_eq!(handle3.id, handle.id);

manager.spawn_worker();
Expand Down
File renamed without changes.
5 changes: 2 additions & 3 deletions crates/bonsaidb-local/src/views/integrity_scanner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@ use super::{
};
use crate::{
database::{document_tree_name, Database},
jobs::{task, Job, Keyed},
tasks::Task,
tasks::{handle::Handle, Job, Keyed, Task},
Error,
};

Expand All @@ -38,7 +37,7 @@ pub struct IntegrityScan {
pub view_name: ViewName,
}

pub type OptionalViewMapHandle = Option<Arc<Mutex<Option<task::Handle<u64, Error, Task>>>>>;
pub type OptionalViewMapHandle = Option<Arc<Mutex<Option<Handle<u64, Error>>>>>;

#[async_trait]
impl Job for IntegrityScanner {
Expand Down
3 changes: 1 addition & 2 deletions crates/bonsaidb-local/src/views/mapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,7 @@ use serde::{Deserialize, Serialize};

use crate::{
database::{deserialize_document, document_tree_name, Database},
jobs::{Job, Keyed},
tasks::Task,
tasks::{Job, Keyed, Task},
views::{
view_document_map_tree_name, view_entries_tree_name, view_invalidated_docs_tree_name,
view_omitted_docs_tree_name, EntryMapping, ViewEntry,
Expand Down
Loading

0 comments on commit 543777e

Please sign in to comment.