Skip to content

Commit

Permalink
Make sure that an append is indexed before returning a successful result
Browse files Browse the repository at this point in the history
to the user.
  • Loading branch information
YoEight committed Jul 7, 2024
1 parent d8567be commit 92604db
Show file tree
Hide file tree
Showing 4 changed files with 184 additions and 2 deletions.
1 change: 1 addition & 0 deletions geth-engine/src/names.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,5 @@ pub mod streams {
pub mod types {
pub static STREAM_DELETED: &str = "$stream-deleted";
pub static EVENTS_WRITTEN: &str = "$events-written";
pub static EVENTS_INDEXED: &str = "$events-indexed";
}
13 changes: 13 additions & 0 deletions geth-engine/src/process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,16 @@ use crate::messages::{
};
use crate::process::storage::StorageService;
pub use crate::process::subscriptions::SubscriptionsClient;
use crate::process::write_request_manager::WriteRequestManagerClient;

pub mod storage;
mod subscriptions;
mod write_request_manager;

pub struct InternalClient<WAL, S: Storage> {
storage: StorageService<WAL, S>,
subscriptions: SubscriptionsClient,
write_request_manager_client: WriteRequestManagerClient,
}

impl<WAL, S> InternalClient<WAL, S>
Expand All @@ -40,6 +43,7 @@ where
Self {
storage: processes.storage,
subscriptions: processes.subscriptions,
write_request_manager_client: processes.write_request_manager_client,
}
}
}
Expand All @@ -65,6 +69,12 @@ where
})
.await?;

if let AppendStreamCompleted::Success(ref result) = outcome {
self.write_request_manager_client
.wait_until_indexing_reach(result.position.raw())
.await?;
}

Ok(outcome)
}

Expand Down Expand Up @@ -290,6 +300,7 @@ where
{
storage: StorageService<WAL, S>,
subscriptions: SubscriptionsClient,
write_request_manager_client: WriteRequestManagerClient,
}

impl<WAL, S> Processes<WAL, S>
Expand All @@ -299,11 +310,13 @@ where
{
pub fn new(wal: WALRef<WAL>, index: Index<S>) -> Self {
let subscriptions = subscriptions::start();
let write_request_manager_client = write_request_manager::start(subscriptions.clone());
let storage = StorageService::new(wal, index, subscriptions.clone());

Self {
storage,
subscriptions,
write_request_manager_client,
}
}

Expand Down
157 changes: 157 additions & 0 deletions geth-engine/src/process/write_request_manager.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
use std::collections::BTreeMap;

use eyre::Context;
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
use tokio::sync::oneshot;

use geth_common::Record;

use crate::bus::SubscribeMsg;
use crate::messages::{StreamTarget, SubscribeTo, SubscriptionRequestOutcome, SubscriptionTarget};
use crate::names;
use crate::process::SubscriptionsClient;

pub struct Request {
pub position: u64,
pub callback: oneshot::Sender<()>,
}

pub enum Cmd {
Apply(Record),
Register(Request),
}

#[derive(Clone)]
pub struct WriteRequestManagerClient {
inner: UnboundedSender<Cmd>,
}

impl WriteRequestManagerClient {
pub async fn wait_until_indexing_reach(&self, position: u64) -> eyre::Result<()> {
let (callback, recv) = oneshot::channel();

self.inner
.send(Cmd::Register(Request { position, callback }))
.wrap_err("write-request manager is unreachable")?;

recv.await
.wrap_err("unexpected error when waiting append request to be indexed")?;

Ok(())
}
}

pub fn start(sub_client: SubscriptionsClient) -> WriteRequestManagerClient {
let (inner, mailbox) = unbounded_channel();

tokio::spawn(indexing_subscription(sub_client, inner.clone()));
tokio::spawn(manager_process(mailbox));

WriteRequestManagerClient { inner }
}

struct WriteRequestManager {
position: u64,
inner: BTreeMap<u64, Request>,
}

impl Default for WriteRequestManager {
fn default() -> Self {
Self {
position: 0,
inner: Default::default(),
}
}
}

impl WriteRequestManager {
fn collect_keys(&self, up_to: u64) -> Vec<u64> {
self.inner.range(..=up_to).map(|(k, _)| *k).collect()
}

pub fn handle(&mut self, cmd: Cmd) {
match cmd {
Cmd::Apply(r) => self.apply(r),
Cmd::Register(req) => self.register(req),
}
}

fn apply(&mut self, record: Record) {
if record.stream_name != names::streams::SYSTEM
|| record.r#type != names::types::EVENTS_INDEXED
{
return;
}

self.position = record.revision;

for key in self.collect_keys(record.revision) {
if let Some(req) = self.inner.remove(&key) {
let _ = req.callback.send(());
}
}
}

fn register(&mut self, request: Request) {
if self.position >= request.position {
let _ = request.callback.send(());
return;
}

self.inner.insert(request.position, request);
}
}

async fn indexing_subscription(
sub_client: SubscriptionsClient,
sender: UnboundedSender<Cmd>,
) -> eyre::Result<()> {
let (mail, resp) = oneshot::channel();

sub_client.subscribe(SubscribeMsg {
payload: SubscribeTo {
target: SubscriptionTarget::Stream(StreamTarget {
parent: None,
stream_name: names::streams::SYSTEM.to_string(),
}),
},
mail,
})?;

let confirm = resp
.await
.wrap_err("write-request manager is unable to subscribe to $system")?;

let mut stream = match confirm.outcome {
SubscriptionRequestOutcome::Success(s) => s,
SubscriptionRequestOutcome::Failure(e) => {
tracing::error!(
"write-request manager is unable to subscribe to $system: {}",
e
);

return Err(e);
}
};

while let Some(record) = stream.next().await? {
if sender.send(Cmd::Apply(record)).is_err() {
tracing::error!("unable to communicate with write-request manager process");
break;
}
}

tracing::warn!("write-request manager subscription to $system exited");

Ok(())
}

async fn manager_process(mut mailbox: UnboundedReceiver<Cmd>) {
let mut mgr = WriteRequestManager::default();

while let Some(cmd) = mailbox.recv().await {
mgr.handle(cmd);
}

tracing::warn!("write-request manager process exited")
}
15 changes: 13 additions & 2 deletions geth-engine/src/services/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::io;
use bytes::{Buf, Bytes};
use futures::TryStreamExt;

use geth_common::{Client, Direction, Record, Revision, SubscriptionEvent};
use geth_common::{Client, Direction, Position, Record, Revision, SubscriptionEvent};
use geth_mikoshi::hashing::mikoshi_hash;
use geth_mikoshi::storage::{FileId, Storage};

Expand All @@ -20,7 +20,7 @@ where
C: Client,
S: Storage + Send + Sync + 'static,
{
let mut internal = Internal::new(index, sub_client)?;
let mut internal = Internal::new(index, sub_client.clone())?;

let starting = if internal.index_pos == 0 {
Revision::Start
Expand Down Expand Up @@ -67,9 +67,20 @@ where
)
.await;

let mut position = 0;
while let Some(record) = stream.try_next().await? {
position = record.position.raw();
internal.index_record(record)?;
}

sub_client.event_committed(Record {
id: Default::default(),
r#type: names::types::EVENTS_INDEXED.to_string(),
stream_name: names::streams::SYSTEM.to_string(),
position: Position(position),
revision: position,
data: Default::default(),
})?
}
}
}
Expand Down

0 comments on commit 92604db

Please sign in to comment.