diff --git a/geth-engine/src/names.rs b/geth-engine/src/names.rs index a52142d..f77eba2 100644 --- a/geth-engine/src/names.rs +++ b/geth-engine/src/names.rs @@ -8,4 +8,5 @@ pub mod streams { pub mod types { pub static STREAM_DELETED: &str = "$stream-deleted"; pub static EVENTS_WRITTEN: &str = "$events-written"; + pub static EVENTS_INDEXED: &str = "$events-indexed"; } diff --git a/geth-engine/src/process.rs b/geth-engine/src/process.rs index b38fc4f..bf85d77 100644 --- a/geth-engine/src/process.rs +++ b/geth-engine/src/process.rs @@ -22,13 +22,16 @@ use crate::messages::{ }; use crate::process::storage::StorageService; pub use crate::process::subscriptions::SubscriptionsClient; +use crate::process::write_request_manager::WriteRequestManagerClient; pub mod storage; mod subscriptions; +mod write_request_manager; pub struct InternalClient { storage: StorageService, subscriptions: SubscriptionsClient, + write_request_manager_client: WriteRequestManagerClient, } impl InternalClient @@ -40,6 +43,7 @@ where Self { storage: processes.storage, subscriptions: processes.subscriptions, + write_request_manager_client: processes.write_request_manager_client, } } } @@ -65,6 +69,12 @@ where }) .await?; + if let AppendStreamCompleted::Success(ref result) = outcome { + self.write_request_manager_client + .wait_until_indexing_reach(result.position.raw()) + .await?; + } + Ok(outcome) } @@ -290,6 +300,7 @@ where { storage: StorageService, subscriptions: SubscriptionsClient, + write_request_manager_client: WriteRequestManagerClient, } impl Processes @@ -299,11 +310,13 @@ where { pub fn new(wal: WALRef, index: Index) -> Self { let subscriptions = subscriptions::start(); + let write_request_manager_client = write_request_manager::start(subscriptions.clone()); let storage = StorageService::new(wal, index, subscriptions.clone()); Self { storage, subscriptions, + write_request_manager_client, } } diff --git a/geth-engine/src/process/write_request_manager.rs b/geth-engine/src/process/write_request_manager.rs new file mode 100644 index 0000000..cc73abd --- /dev/null +++ b/geth-engine/src/process/write_request_manager.rs @@ -0,0 +1,149 @@ +use std::collections::BTreeMap; + +use eyre::Context; +use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; +use tokio::sync::oneshot; + +use geth_common::Record; + +use crate::bus::SubscribeMsg; +use crate::messages::{StreamTarget, SubscribeTo, SubscriptionRequestOutcome, SubscriptionTarget}; +use crate::names; +use crate::process::SubscriptionsClient; + +pub struct Request { + pub position: u64, + pub callback: oneshot::Sender<()>, +} + +pub enum Cmd { + Apply(Record), + Register(Request), +} + +#[derive(Clone)] +pub struct WriteRequestManagerClient { + inner: UnboundedSender, +} + +impl WriteRequestManagerClient { + pub async fn wait_until_indexing_reach(&self, position: u64) -> eyre::Result<()> { + let (callback, recv) = oneshot::channel(); + + self.inner + .send(Cmd::Register(Request { position, callback })) + .wrap_err("write-request manager is unreachable")?; + + recv.await + .wrap_err("unexpected error when waiting append request to be indexed")?; + + Ok(()) + } +} + +pub fn start(sub_client: SubscriptionsClient) -> WriteRequestManagerClient { + let (inner, mailbox) = unbounded_channel(); + + tokio::spawn(indexing_subscription(sub_client, inner.clone())); + tokio::spawn(manager_process(mailbox)); + + WriteRequestManagerClient { inner } +} + +#[derive(Default)] +struct WriteRequestManager { + position: u64, + inner: BTreeMap, +} + +impl WriteRequestManager { + fn collect_keys(&self, up_to: u64) -> Vec { + self.inner.range(..=up_to).map(|(k, _)| *k).collect() + } + + pub fn handle(&mut self, cmd: Cmd) { + match cmd { + Cmd::Apply(r) => self.apply(r), + Cmd::Register(req) => self.register(req), + } + } + + fn apply(&mut self, record: Record) { + if record.stream_name != names::streams::SYSTEM + || record.r#type != names::types::EVENTS_INDEXED + { + return; + } + + self.position = record.revision; + + for key in self.collect_keys(record.revision) { + if let Some(req) = self.inner.remove(&key) { + let _ = req.callback.send(()); + } + } + } + + fn register(&mut self, request: Request) { + if self.position >= request.position { + let _ = request.callback.send(()); + return; + } + + self.inner.insert(request.position, request); + } +} + +async fn indexing_subscription( + sub_client: SubscriptionsClient, + sender: UnboundedSender, +) -> eyre::Result<()> { + let (mail, resp) = oneshot::channel(); + + sub_client.subscribe(SubscribeMsg { + payload: SubscribeTo { + target: SubscriptionTarget::Stream(StreamTarget { + parent: None, + stream_name: names::streams::SYSTEM.to_string(), + }), + }, + mail, + })?; + + let confirm = resp + .await + .wrap_err("write-request manager is unable to subscribe to $system")?; + + let mut stream = match confirm.outcome { + SubscriptionRequestOutcome::Success(s) => s, + SubscriptionRequestOutcome::Failure(e) => { + tracing::error!( + "write-request manager is unable to subscribe to $system: {}", + e + ); + + return Err(e); + } + }; + + while let Some(record) = stream.next().await? { + if sender.send(Cmd::Apply(record)).is_err() { + tracing::error!("unable to communicate with write-request manager process"); + break; + } + } + + tracing::warn!("write-request manager subscription to $system exited"); + + Ok(()) +} + +async fn manager_process(mut mailbox: UnboundedReceiver) { + let mut mgr = WriteRequestManager::default(); + + while let Some(cmd) = mailbox.recv().await { + mgr.handle(cmd); + } + + tracing::warn!("write-request manager process exited") +} diff --git a/geth-engine/src/services/index.rs b/geth-engine/src/services/index.rs index 31cbbbd..b034ca8 100644 --- a/geth-engine/src/services/index.rs +++ b/geth-engine/src/services/index.rs @@ -3,7 +3,7 @@ use std::io; use bytes::{Buf, Bytes}; use futures::TryStreamExt; -use geth_common::{Client, Direction, Record, Revision, SubscriptionEvent}; +use geth_common::{Client, Direction, Position, Record, Revision, SubscriptionEvent}; use geth_mikoshi::hashing::mikoshi_hash; use geth_mikoshi::storage::{FileId, Storage}; @@ -20,7 +20,7 @@ where C: Client, S: Storage + Send + Sync + 'static, { - let mut internal = Internal::new(index, sub_client)?; + let mut internal = Internal::new(index, sub_client.clone())?; let starting = if internal.index_pos == 0 { Revision::Start @@ -67,9 +67,20 @@ where ) .await; + let mut position = 0; while let Some(record) = stream.try_next().await? { + position = record.position.raw(); internal.index_record(record)?; } + + sub_client.event_committed(Record { + id: Default::default(), + r#type: names::types::EVENTS_INDEXED.to_string(), + stream_name: names::streams::SYSTEM.to_string(), + position: Position(position), + revision: position, + data: Default::default(), + })? } } }