Skip to content
This repository has been archived by the owner on Sep 21, 2024. It is now read-only.

Commit

Permalink
Initial Job Worker revamp.
Browse files Browse the repository at this point in the history
* Decouple worker tasks to run independently of gateway. Fixes #720.
* Reduce GatewayManager queries in the gateway handlers on request.
  Fixes #723.
  • Loading branch information
jsantell committed Jan 31, 2024
1 parent a6602b0 commit b8f9f6b
Show file tree
Hide file tree
Showing 42 changed files with 2,062 additions and 1,790 deletions.
11 changes: 9 additions & 2 deletions rust/noosphere-cli/src/native/commands/serve.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,16 @@ pub async fn serve(
.author()
.did()
.await?;
let manager = SingleTenantGatewayManager::new(sphere_context, counterpart.clone()).await?;
let manager = SingleTenantGatewayManager::new(
sphere_context,
counterpart.clone(),
ipfs_api,
name_resolver_api,
cors_origin,
)
.await?;

let gateway = Gateway::new(manager, ipfs_api, name_resolver_api, cors_origin)?;
let gateway = Gateway::new(manager)?;

info!(
r#"A geist is summoned to manage local sphere {}
Expand Down
12 changes: 9 additions & 3 deletions rust/noosphere-cli/src/native/helpers/workspace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,17 @@ async fn start_gateway_for_workspace(
let client_sphere_identity = client_sphere_identity.to_owned();
let ns_url = ns_url.clone();
let ipfs_url = ipfs_url.clone();
let manager =
SingleTenantGatewayManager::new(gateway_sphere_context, client_sphere_identity).await?;
let manager = SingleTenantGatewayManager::new(
gateway_sphere_context,
client_sphere_identity,
ipfs_url,
ns_url,
None,
)
.await?;

let join_handle = tokio::spawn(async move {
let gateway = Gateway::new(manager, ipfs_url, ns_url, None).unwrap();
let gateway = Gateway::new(manager).unwrap();
gateway.start(gateway_listener).await.unwrap()
});

Expand Down
19 changes: 19 additions & 0 deletions rust/noosphere-gateway/src/context_resolver.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
use anyhow::Result;
use async_trait::async_trait;
use noosphere_core::{context::HasMutableSphereContext, data::Did};
use noosphere_storage::Storage;

#[cfg(doc)]
use noosphere_core::context::SphereContext;

/// Returns a [SphereContext] given a [Did]. For gateways, this
/// `did` is the gateway identity, the author of the sphere context.
#[async_trait]
pub trait ContextResolver<C, S>: Clone + Send + Sync
where
C: HasMutableSphereContext<S>,
S: Storage + 'static,
{
/// Get a [SphereContext] that is mapped from `did`.
async fn get_context(&self, did: &Did) -> Result<C>;
}
80 changes: 50 additions & 30 deletions rust/noosphere-gateway/src/extractors/authority.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
use crate::{
extractors::{map_bad_request, GatewayScope},
GatewayManager,
};
use anyhow::Result;
use async_trait::async_trait;
use axum::{
Expand All @@ -10,14 +14,11 @@ use axum_extra::{
};
use noosphere_core::{
api::headers::{self as noosphere_headers},
authority::{SphereAbility, SphereReference, SPHERE_SEMANTICS},
authority::{generate_capability, SphereAbility, SPHERE_SEMANTICS},
context::HasMutableSphereContext,
data::Did,
};
use noosphere_storage::Storage;
use ucan::capability::CapabilityView;

use crate::extractors::map_bad_request;
use std::{marker::PhantomData, sync::Arc};

/// Represents the scope of a gateway request's authorization and sphere
/// access.
Expand All @@ -26,29 +27,32 @@ use crate::extractors::map_bad_request;
/// represented by its `ucan` headers. Any request handler can use a [GatewayAuthority]
/// to test if a required capability is satisfied by the authorization
/// presented by the maker of the request.
pub struct GatewayAuthority {
pub struct GatewayAuthority<M, C, S> {
bearer: Bearer,
ucans: noosphere_headers::Ucan,
manager: Arc<M>,
sphere_context_marker: PhantomData<C>,
storage_marker: PhantomData<S>,
}

impl GatewayAuthority {
pub async fn try_authorize<C, S>(
impl<M, C, S> GatewayAuthority<M, C, S>
where
M: GatewayManager<C, S> + 'static,
C: HasMutableSphereContext<S>,
S: Storage + 'static,
{
pub async fn try_authorize(
&self,
sphere_context: &mut C,
counterpart: &Did,
capability: &CapabilityView<SphereReference, SphereAbility>,
) -> Result<(), StatusCode>
where
C: HasMutableSphereContext<S>,
S: Storage + 'static,
{
let db = {
let sphere_context: C::SphereContext = sphere_context
.sphere_context()
.await
.map_err(map_bad_request)?;
sphere_context.db().clone()
};
gateway_scope: &GatewayScope<C, S>,
required_ability: SphereAbility,
) -> Result<C, StatusCode> {
let counterpart_str = gateway_scope.counterpart.as_str();
let capability = generate_capability(counterpart_str, required_ability);
let db = self
.manager
.ucan_store()
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;

let proof_chain = self
.ucans
Expand All @@ -60,11 +64,15 @@ impl GatewayAuthority {

for capability_info in capability_infos {
trace!("Checking capability: {:?}", capability_info.capability);
if capability_info.originators.contains(counterpart.as_str())
&& capability_info.capability.enables(capability)
if capability_info.originators.contains(counterpart_str)
&& capability_info.capability.enables(&capability)
{
debug!("Authorized!");
return Ok(());
return self
.manager
.sphere_context(&gateway_scope.gateway_identity)
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR);
}
}

Expand All @@ -73,13 +81,18 @@ impl GatewayAuthority {
}

#[async_trait]
impl<State> FromRequestParts<State> for GatewayAuthority
impl<M, C, S> FromRequestParts<Arc<M>> for GatewayAuthority<M, C, S>
where
State: Send + Sync,
M: GatewayManager<C, S> + 'static,
C: HasMutableSphereContext<S>,
S: Storage + 'static,
{
type Rejection = StatusCode;

async fn from_request_parts(parts: &mut Parts, state: &State) -> Result<Self, Self::Rejection> {
async fn from_request_parts(
parts: &mut Parts,
state: &Arc<M>,
) -> Result<Self, Self::Rejection> {
let TypedHeader(Authorization(bearer)) =
TypedHeader::<Authorization<Bearer>>::from_request_parts(parts, state)
.await
Expand All @@ -91,6 +104,13 @@ where
.map_err(map_bad_request)?
.0;

Ok(GatewayAuthority { bearer, ucans })
let manager = state.to_owned();
Ok(GatewayAuthority {
bearer,
ucans,
manager,
sphere_context_marker: PhantomData,
storage_marker: PhantomData,
})
}
}
2 changes: 0 additions & 2 deletions rust/noosphere-gateway/src/extractors/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,10 @@
mod authority;
mod cbor;
mod scope;
mod sphere_extractor;

pub use authority::*;
pub use cbor::*;
pub use scope::*;
pub use sphere_extractor::*;

pub(crate) fn map_bad_request<E: std::fmt::Debug>(error: E) -> axum::http::StatusCode {
tracing::error!("{:?}", error);
Expand Down
27 changes: 13 additions & 14 deletions rust/noosphere-gateway/src/extractors/scope.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,31 @@ use noosphere_storage::Storage;
#[cfg(doc)]
use noosphere_core::context::SphereContext;

use super::map_bad_request;

/// Represents the scope of a gateway request as a counterpart [Did],
/// and the corresponding managed sphere's author/device key,
/// the gateway identity.
///
/// Extracting a [GatewayScope] is efficient, and does not open
/// a [SphereContext].
#[derive(Clone)]
pub struct GatewayScope<C, S> {
pub counterpart: Did,
pub gateway_identity: Did,
sphere_context_marker: PhantomData<C>,
storage_marker: PhantomData<S>,
}

impl<C, S> GatewayScope<C, S> {
pub fn new(gateway_identity: Did, counterpart: Did) -> Self {
Self {
gateway_identity,
counterpart,
sphere_context_marker: PhantomData,
storage_marker: PhantomData,
}
}
}

#[async_trait]
impl<M, C, S> FromRequestParts<Arc<M>> for GatewayScope<C, S>
where
Expand All @@ -41,17 +51,6 @@ where
parts: &mut Parts,
state: &Arc<M>,
) -> Result<Self, Self::Rejection> {
let counterpart = state.extract_counterpart(parts).await?;
let gateway_identity = state
.get_gateway_identity(&counterpart)
.await
.map_err(map_bad_request)?;

Ok(GatewayScope {
counterpart,
gateway_identity,
sphere_context_marker: PhantomData,
storage_marker: PhantomData,
})
state.gateway_scope(parts).await
}
}
63 changes: 0 additions & 63 deletions rust/noosphere-gateway/src/extractors/sphere_extractor.rs

This file was deleted.

Loading

0 comments on commit b8f9f6b

Please sign in to comment.