This repository has been archived by the owner on Sep 21, 2024. It is now read-only.

chore: Update noosphere_gateway::worker_queue documentation, reduce
indirection on cloneable interfaces, and remove some temporary TODOs and cleanups.
jsantell committed Feb 27, 2024
1 parent 5f120de commit 59fb09a
Showing 22 changed files with 177 additions and 152 deletions.
2 changes: 1 addition & 1 deletion rust/noosphere-gateway/src/gateway_manager.rs
@@ -18,7 +18,7 @@ where
     S: Storage + 'static,
 {
     /// Type of [JobClient] for this [GatewayManager].
-    type JobClient: JobClient;
+    type JobClient: JobClient + Clone;
 
     /// Returns the [JobClient] for the gateway.
     fn job_client(&self) -> Self::JobClient;
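
The net effect is that cloneability moves from the `JobClient` trait itself to the concrete client type chosen by each `GatewayManager`. Below is a minimal sketch of how an implementation might satisfy the new bound; the `BufferJobClient` type and the trimmed-down stand-in types are hypothetical, not part of this commit:

```rust
use anyhow::Result;
use std::sync::{Arc, Mutex};

// Stand-ins for the crate's types, reduced for illustration.
pub enum GatewayJob {
    IpfsSyndication {
        identity: String,
        name_publish_on_success: Option<String>,
    },
}

pub trait JobClient: Send + Sync {
    fn submit(&self, job: GatewayJob) -> Result<()>;
}

pub trait GatewayManager {
    /// The concrete client opts into `Clone` here; the `JobClient` trait
    /// itself no longer requires it (see the jobs/client.rs change below).
    type JobClient: JobClient + Clone;

    fn job_client(&self) -> Self::JobClient;
}

// A hypothetical client that is cheap to clone because it shares its
// backing queue behind an `Arc`.
#[derive(Clone, Default)]
pub struct BufferJobClient {
    jobs: Arc<Mutex<Vec<GatewayJob>>>,
}

impl JobClient for BufferJobClient {
    fn submit(&self, job: GatewayJob) -> Result<()> {
        self.jobs.lock().unwrap().push(job);
        Ok(())
    }
}
```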
8 changes: 2 additions & 6 deletions rust/noosphere-gateway/src/handlers/v0alpha1/push.rs
@@ -82,10 +82,7 @@ where
         let (next_version, new_blocks) = self.update_gateway_sphere().await?;
 
         // These steps are order-independent
-        let _ = tokio::join!(
-            self.notify_name_resolver(),
-            self.notify_ipfs_syndicator(next_version)
-        );
+        let _ = tokio::join!(self.notify_name_resolver(), self.notify_ipfs_syndicator());
 
         Ok(PushResponse::Accepted {
             new_tip: next_version,
@@ -315,7 +312,7 @@ where
     }
 
     /// Request that new history be syndicated to IPFS
-    async fn notify_ipfs_syndicator(&self, next_version: Link<MemoIpld>) -> Result<()> {
+    async fn notify_ipfs_syndicator(&self) -> Result<()> {
         let name_publish_on_success = if let Some(name_record) = &self.request_body.name_record {
             Some(LinkRecord::try_from(name_record)?)
         } else {
@@ -327,7 +324,6 @@ where
         // have added it to the gateway.
         if let Err(error) = self.job_runner_client.submit(GatewayJob::IpfsSyndication {
             identity: self.gateway_scope.counterpart.to_owned(),
-            revision: Some(next_version),
             name_publish_on_success,
         }) {
             warn!("Failed to queue IPFS syndication job: {}", error);
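
Both notifications are order-independent, so the handler awaits them concurrently and deliberately discards their results; each notifier logs its own failures instead of aborting the push. A minimal sketch of the pattern, with stub notifiers standing in for the real ones:

```rust
use anyhow::Result;

// Stubs standing in for the gateway's two notifiers.
async fn notify_name_resolver() -> Result<()> {
    Ok(())
}

async fn notify_ipfs_syndicator() -> Result<()> {
    Ok(())
}

async fn after_push() {
    // `tokio::join!` drives both futures to completion concurrently on the
    // current task; the results are discarded because each notifier is
    // expected to log its own errors.
    let _ = tokio::join!(notify_name_resolver(), notify_ipfs_syndicator());
}
```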
9 changes: 2 additions & 7 deletions rust/noosphere-gateway/src/handlers/v0alpha2/push.rs
@@ -95,7 +95,7 @@ where
         // These steps are order-independent
         let _ = tokio::join!(
             self.notify_name_resolver(),
-            self.notify_ipfs_syndicator(next_version, &push_body)
+            self.notify_ipfs_syndicator(&push_body)
         );
 
         let roots = vec![next_version.into()];
@@ -341,11 +341,7 @@ where
     }
 
     /// Request that new history be syndicated to IPFS
-    async fn notify_ipfs_syndicator(
-        &self,
-        next_version: Link<MemoIpld>,
-        push_body: &PushBody,
-    ) -> Result<()> {
+    async fn notify_ipfs_syndicator(&self, push_body: &PushBody) -> Result<()> {
         debug!("Notifying syndication worker of new blocks...");
 
         let name_publish_on_success = if let Some(name_record) = &push_body.name_record {
@@ -359,7 +355,6 @@
         // have added it to the gateway.
         if let Err(error) = self.job_runner_client.submit(GatewayJob::IpfsSyndication {
             identity: self.gateway_scope.counterpart.to_owned(),
-            revision: Some(next_version),
             name_publish_on_success,
         }) {
             warn!("Failed to queue IPFS syndication job: {}", error);
2 changes: 1 addition & 1 deletion rust/noosphere-gateway/src/jobs/client.rs
@@ -3,7 +3,7 @@ use anyhow::Result;
 
 /// [JobClient] allows a gateway or other service
 /// to submit jobs to be processed.
-pub trait JobClient: Clone + Send + Sync {
+pub trait JobClient: Send + Sync {
     /// Submit a [GatewayJob] to be processed.
     fn submit(&self, job: GatewayJob) -> Result<()>;
 }
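
Dropping `Clone` here is what makes the `+ Clone` bound on `GatewayManager::JobClient` necessary, and it buys object safety: `Clone` requires `Sized`, so a `Clone`-bounded trait cannot be used as a trait object. Without it, `JobClient` can be type-erased, as in this sketch (reusing the stand-in types from the earlier sketch):

```rust
use anyhow::Result;
use std::sync::Arc;

// With `Clone` gone from the supertraits, `dyn JobClient` is legal, so
// callers can hold a type-erased, shareable client.
fn submit_all(client: &Arc<dyn JobClient>, jobs: Vec<GatewayJob>) -> Result<()> {
    for job in jobs {
        client.submit(job)?;
    }
    Ok(())
}
```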
7 changes: 0 additions & 7 deletions rust/noosphere-gateway/src/jobs/job.rs
@@ -15,13 +15,6 @@ pub enum GatewayJob {
     IpfsSyndication {
         /// Counterpart sphere associated with this job.
         identity: Did,
-        /// The revision of the sphere to discover the _counterpart_ sphere
-        /// from; the counterpart sphere's revision will need to be derived using
-        /// this checkpoint in local sphere history.
-        /// If not specified, uses the latest local revision.
-        ///
-        /// @TODO this looks currently unused, confirm
-        revision: Option<Link<MemoIpld>>,
         /// If provided, queues up a subsequent [GatewayJob::NameSystemPublish]
         /// job to run upon success with the provided link record.
         name_publish_on_success: Option<LinkRecord>,
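
With the unused `revision` checkpoint removed, an IPFS syndication job is fully described by the counterpart identity plus an optional follow-up link record, and syndication always proceeds from the latest local revision. A hypothetical submission after this change, again using the stand-in types from the first sketch (in the real crate `identity` is a `Did` and the optional field a `LinkRecord`):

```rust
use anyhow::Result;

// Hypothetical helper showing the slimmed-down job shape.
fn queue_syndication(client: &dyn JobClient, counterpart: String) -> Result<()> {
    client.submit(GatewayJob::IpfsSyndication {
        identity: counterpart,
        // No follow-up NameSystemPublish job in this sketch.
        name_publish_on_success: None,
    })
}
```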
6 changes: 3 additions & 3 deletions rust/noosphere-gateway/src/jobs/job_context.rs
@@ -1,4 +1,4 @@
-use crate::ContextResolver;
+use crate::SphereContextResolver;
 use noosphere_core::context::HasMutableSphereContext;
 use noosphere_ipfs::IpfsClient;
 use noosphere_ns::NameResolver;
@@ -11,7 +11,7 @@ use std::marker::PhantomData;
 pub struct GatewayJobContext<R, C, S, N, I>
 where
     Self: Send,
-    R: ContextResolver<C, S>,
+    R: SphereContextResolver<C, S>,
     C: HasMutableSphereContext<S>,
     S: Storage + 'static,
     N: NameResolver + Clone,
@@ -29,7 +29,7 @@ where
 
 impl<R, C, S, N, I> GatewayJobContext<R, C, S, N, I>
 where
-    R: ContextResolver<C, S>,
+    R: SphereContextResolver<C, S>,
     C: HasMutableSphereContext<S>,
     S: Storage + 'static,
     N: NameResolver + Clone,
10 changes: 4 additions & 6 deletions rust/noosphere-gateway/src/jobs/job_processor.rs
@@ -1,6 +1,6 @@
 use crate::{
     jobs::{processors::*, worker_queue::Processor, GatewayJob, GatewayJobContext},
-    ContextResolver,
+    SphereContextResolver,
 };
 use anyhow::Result;
 use async_trait::async_trait;
@@ -14,7 +14,7 @@ use std::marker::PhantomData;
 #[derive(Clone)]
 pub struct GatewayJobProcessor<R, C, S, N, I>
 where
-    R: ContextResolver<C, S>,
+    R: SphereContextResolver<C, S>,
     C: HasMutableSphereContext<S>,
     S: Storage + 'static,
     N: NameResolver + Clone,
@@ -30,7 +30,7 @@ where
 #[async_trait]
 impl<R, C, S, N, I> Processor for GatewayJobProcessor<R, C, S, N, I>
 where
-    R: ContextResolver<C, S> + 'static,
+    R: SphereContextResolver<C, S> + 'static,
     C: HasMutableSphereContext<S> + 'static,
     S: Storage + 'static,
     N: NameResolver + Clone + 'static,
@@ -50,7 +50,7 @@ pub async fn process_job<R, C, S, N, I>(
     job: GatewayJob,
 ) -> Result<Option<GatewayJob>>
 where
-    R: ContextResolver<C, S>,
+    R: SphereContextResolver<C, S>,
     C: HasMutableSphereContext<S>,
     S: Storage + 'static,
     N: NameResolver + Clone + 'static,
@@ -62,12 +62,10 @@ where
         }
         GatewayJob::IpfsSyndication {
             identity,
-            revision,
             name_publish_on_success,
         } => {
             syndicate_to_ipfs(
                 context.context_resolver.get_context(&identity).await?,
-                revision,
                 context.ipfs_client,
                 name_publish_on_success,
             )
6 changes: 1 addition & 5 deletions rust/noosphere-gateway/src/jobs/processors/syndication.rs
@@ -55,12 +55,11 @@ impl SyndicationCheckpoint {
 // Force full re-syndicate every 90 days
 const MAX_SYNDICATION_CHECKPOINT_LIFETIME: Duration = Duration::from_secs(60 * 60 * 24 * 90);
 
-/// Syndicate content to IPFS for given `context` since `revision`,
+/// Syndicate content to IPFS for given `context`,
 /// optionally publishing a provided [LinkRecord] on success.
 #[instrument(skip(context, ipfs_client, name_publish_on_success))]
 pub async fn syndicate_to_ipfs<C, S, I>(
     context: C,
-    revision: Option<Link<MemoIpld>>,
     ipfs_client: I,
     name_publish_on_success: Option<LinkRecord>,
 ) -> Result<Option<GatewayJob>>
@@ -69,8 +68,6 @@ where
     S: Storage + 'static,
     I: IpfsClient + 'static,
 {
-    let version_str = revision.map_or_else(|| "latest".into(), |link| link.cid.to_string());
-    debug!("Attempting to syndicate version DAG {version_str} to IPFS");
     let kubo_identity = ipfs_client
         .server_identity()
         .await
@@ -268,7 +265,6 @@ mod tests {
         debug!("Sending syndication job...");
         syndicate_to_ipfs(
             gateway_sphere_context.clone(),
-            Some(version.clone()),
             local_kubo_client.clone(),
             None,
         )
9 changes: 5 additions & 4 deletions rust/noosphere-gateway/src/jobs/worker_queue/builder.rs
@@ -3,6 +3,7 @@ use anyhow::{anyhow, Result};
 use std::time::Duration;
 
 /// Builder helper for [WorkerQueue].
+/// Uses the same defaults as [WorkerQueue].
 pub struct WorkerQueueBuilder<P: Processor> {
     worker_count: usize,
     retries: Option<usize>,
@@ -18,8 +19,8 @@ where
     pub fn new() -> Self {
         Self {
             worker_count: 1,
-            retries: Some(1),
-            timeout: Some(Duration::from_secs(60 * 3)),
+            retries: None,
+            timeout: None,
             context: None,
         }
     }
@@ -57,12 +58,12 @@
             return Err(anyhow!("context must be provided."));
         }
 
-        Ok(WorkerQueue::spawn(
+        WorkerQueue::spawn(
             self.worker_count,
             self.context.unwrap(),
             self.retries,
             self.timeout,
-        ))
+        )
     }
 }
 
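
With these defaults, retry and timeout behavior is strictly opt-in, and `build()` now forwards the `Result` from `WorkerQueue::spawn` instead of wrapping the call in `Ok(..)`, so spawn failures surface to the caller. A usage sketch modeled on the tests in worker_queue/mod.rs; `MyProcessor`, `context`, and `job` are hypothetical:

```rust
use std::time::Duration;

// Hypothetical configuration: retries and timeouts must now be requested
// explicitly, otherwise the queue uses the WorkerQueue defaults.
let queue = WorkerQueueBuilder::<MyProcessor>::new()
    .with_worker_count(4)
    .with_retries(2)                       // opt in: retry failed jobs twice
    .with_timeout(Duration::from_secs(30)) // opt in: abort long-running jobs
    .with_context(context.clone())
    .build()?;                             // spawn failures now surface here

queue.submit(job)?;
```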
43 changes: 34 additions & 9 deletions rust/noosphere-gateway/src/jobs/worker_queue/mod.rs
@@ -1,10 +1,29 @@
-//! Contains a generic worker queue in service of Noosphere
-//! Gateway job processing.
+//! Queue and distribute work among threads.
+//!
+//! [WorkerQueue] spawns an orchestrator thread that
+//! distributes work among long-lived worker threads.
+//!
+//! The [WorkerQueue] [Processor] describes the type
+//! of job requests ([Processor::Job]) and how the work
+//! is performed. Processors report job completion as
+//! [anyhow::Result<Option<Processor::Job>>], and
+//! an error results in the job being rescheduled to run
+//! again if a retry limit was configured.
+//! If an `Ok(Some(job))` value is returned, the new job
+//! will be put in the queue for subsequent processing
+//! as a way to schedule composite jobs.
+//!
+//! A timeout can also be configured, where jobs are aborted
+//! if a job reaches the timeout, and retried if under a
+//! configured retry limit.
+//!
+//! All processing and threads are terminated upon dropping
+//! the [WorkerQueue] handle.
 
 mod builder;
 mod orchestrator;
 mod processor;
 mod queue;
 mod queue_thread;
 mod worker;
 
 pub use builder::*;
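
The tests below exercise this contract. As a compact illustration, a processor along the following lines would schedule a composite follow-up by returning `Ok(Some(..))` and trigger a retry by returning `Err(..)`; note that the exact `Processor` trait signature is an assumption inferred from these docs and the tests, not copied from the crate:

```rust
use anyhow::Result;
use async_trait::async_trait;

// Hypothetical job type: fetching schedules a follow-up indexing job.
#[derive(Clone, Debug)]
enum MyJob {
    Fetch(String),
    Index(String),
}

#[derive(Clone)]
struct MyProcessor;

#[async_trait]
impl Processor for MyProcessor {
    type Context = ();
    type Job = MyJob;

    async fn process(_context: Self::Context, job: Self::Job) -> Result<Option<Self::Job>> {
        match job {
            // Ok(Some(..)) feeds a new job back into the queue: a composite job.
            MyJob::Fetch(url) => Ok(Some(MyJob::Index(url))),
            // Ok(None) is plain completion; an Err(..) here would be retried
            // when a retry limit is configured.
            MyJob::Index(doc) => {
                println!("indexing {doc}");
                Ok(None)
            }
        }
    }
}
```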
@@ -16,6 +35,7 @@ mod tests {
     use super::*;
     use anyhow::{anyhow, Result};
     use async_trait::async_trait;
+    use noosphere_core::tracing::initialize_tracing;
     use std::{sync::Arc, time::Duration};
     use tokio::sync::Mutex;
 
@@ -88,7 +108,8 @@
     }
 
     #[tokio::test]
-    async fn test_worker_queue_simple() -> Result<()> {
+    async fn it_processes_trivial_jobs() -> Result<()> {
+        initialize_tracing(None);
         let context: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(vec![]));
 
         let queue = WorkerQueueBuilder::<TestProcessor>::new()
@@ -113,7 +134,8 @@
     }
 
     #[tokio::test]
-    async fn test_worker_queue_subsequent_job() -> Result<()> {
+    async fn it_processes_jobs_queued_from_other_jobs() -> Result<()> {
+        initialize_tracing(None);
         let context: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(vec![]));
 
         let queue = WorkerQueueBuilder::<TestProcessor>::new()
@@ -140,7 +162,8 @@
     }
 
     #[tokio::test]
-    async fn test_worker_queue_retries() -> Result<()> {
+    async fn it_retries_jobs_upon_failure() -> Result<()> {
+        initialize_tracing(None);
         let context: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(vec![]));
 
         let queue = WorkerQueueBuilder::<TestProcessor>::new()
@@ -161,7 +184,8 @@
             queue.submit(job.to_owned())?;
         }
 
-        // We expect 2 additional `WillFail` jobs due to retries.
+        // We expect 3 additional `WillFail` jobs due to retries.
+        jobs.push(TestJob::WillFail("expectedfailure".into()));
         jobs.push(TestJob::WillFail("expectedfailure".into()));
         jobs.push(TestJob::WillFail("expectedfailure".into()));
 
@@ -170,13 +194,14 @@
     }
 
     #[tokio::test]
-    async fn test_worker_queue_timeouts() -> Result<()> {
+    async fn it_retries_jobs_that_timeout() -> Result<()> {
+        initialize_tracing(None);
         let context: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(vec![]));
 
         let queue = WorkerQueueBuilder::<TestProcessor>::new()
             .with_worker_count(1)
             .with_timeout(Duration::from_secs(1))
-            .with_retries(2)
+            .with_retries(1)
             .with_context(context.clone())
             .build()?;
 
(Diffs for the remaining 12 changed files not shown.)

