Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make sure that an append is indexed before returning a successful result. #8

Merged
merged 1 commit into from
Jul 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions geth-engine/src/names.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,5 @@ pub mod streams {
pub mod types {
pub static STREAM_DELETED: &str = "$stream-deleted";
pub static EVENTS_WRITTEN: &str = "$events-written";
pub static EVENTS_INDEXED: &str = "$events-indexed";
}
13 changes: 13 additions & 0 deletions geth-engine/src/process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,16 @@ use crate::messages::{
};
use crate::process::storage::StorageService;
pub use crate::process::subscriptions::SubscriptionsClient;
use crate::process::write_request_manager::WriteRequestManagerClient;

pub mod storage;
mod subscriptions;
mod write_request_manager;

pub struct InternalClient<WAL, S: Storage> {
storage: StorageService<WAL, S>,
subscriptions: SubscriptionsClient,
write_request_manager_client: WriteRequestManagerClient,
}

impl<WAL, S> InternalClient<WAL, S>
Expand All @@ -40,6 +43,7 @@ where
Self {
storage: processes.storage,
subscriptions: processes.subscriptions,
write_request_manager_client: processes.write_request_manager_client,
}
}
}
Expand All @@ -65,6 +69,12 @@ where
})
.await?;

if let AppendStreamCompleted::Success(ref result) = outcome {
self.write_request_manager_client
.wait_until_indexing_reach(result.position.raw())
.await?;
}

Ok(outcome)
}

Expand Down Expand Up @@ -290,6 +300,7 @@ where
{
storage: StorageService<WAL, S>,
subscriptions: SubscriptionsClient,
write_request_manager_client: WriteRequestManagerClient,
}

impl<WAL, S> Processes<WAL, S>
Expand All @@ -299,11 +310,13 @@ where
{
pub fn new(wal: WALRef<WAL>, index: Index<S>) -> Self {
let subscriptions = subscriptions::start();
let write_request_manager_client = write_request_manager::start(subscriptions.clone());
let storage = StorageService::new(wal, index, subscriptions.clone());

Self {
storage,
subscriptions,
write_request_manager_client,
}
}

Expand Down
149 changes: 149 additions & 0 deletions geth-engine/src/process/write_request_manager.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
use std::collections::BTreeMap;

use eyre::Context;
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
use tokio::sync::oneshot;

use geth_common::Record;

use crate::bus::SubscribeMsg;
use crate::messages::{StreamTarget, SubscribeTo, SubscriptionRequestOutcome, SubscriptionTarget};
use crate::names;
use crate::process::SubscriptionsClient;

pub struct Request {
pub position: u64,
pub callback: oneshot::Sender<()>,
}

pub enum Cmd {
Apply(Record),
Register(Request),
}

#[derive(Clone)]
pub struct WriteRequestManagerClient {
inner: UnboundedSender<Cmd>,
}

impl WriteRequestManagerClient {
pub async fn wait_until_indexing_reach(&self, position: u64) -> eyre::Result<()> {
let (callback, recv) = oneshot::channel();

self.inner
.send(Cmd::Register(Request { position, callback }))
.wrap_err("write-request manager is unreachable")?;

recv.await
.wrap_err("unexpected error when waiting append request to be indexed")?;

Ok(())
}
}

pub fn start(sub_client: SubscriptionsClient) -> WriteRequestManagerClient {
let (inner, mailbox) = unbounded_channel();

tokio::spawn(indexing_subscription(sub_client, inner.clone()));
tokio::spawn(manager_process(mailbox));

WriteRequestManagerClient { inner }
}

#[derive(Default)]
struct WriteRequestManager {
position: u64,
inner: BTreeMap<u64, Request>,
}

impl WriteRequestManager {
fn collect_keys(&self, up_to: u64) -> Vec<u64> {
self.inner.range(..=up_to).map(|(k, _)| *k).collect()
}

pub fn handle(&mut self, cmd: Cmd) {
match cmd {
Cmd::Apply(r) => self.apply(r),
Cmd::Register(req) => self.register(req),
}
}

fn apply(&mut self, record: Record) {
if record.stream_name != names::streams::SYSTEM
|| record.r#type != names::types::EVENTS_INDEXED
{
return;
}

self.position = record.revision;

for key in self.collect_keys(record.revision) {
if let Some(req) = self.inner.remove(&key) {
let _ = req.callback.send(());
}
}
}

fn register(&mut self, request: Request) {
if self.position >= request.position {
let _ = request.callback.send(());
return;
}

self.inner.insert(request.position, request);
}
}

async fn indexing_subscription(
sub_client: SubscriptionsClient,
sender: UnboundedSender<Cmd>,
) -> eyre::Result<()> {
let (mail, resp) = oneshot::channel();

sub_client.subscribe(SubscribeMsg {
payload: SubscribeTo {
target: SubscriptionTarget::Stream(StreamTarget {
parent: None,
stream_name: names::streams::SYSTEM.to_string(),
}),
},
mail,
})?;

let confirm = resp
.await
.wrap_err("write-request manager is unable to subscribe to $system")?;

let mut stream = match confirm.outcome {
SubscriptionRequestOutcome::Success(s) => s,
SubscriptionRequestOutcome::Failure(e) => {
tracing::error!(
"write-request manager is unable to subscribe to $system: {}",
e
);

return Err(e);
}
};

while let Some(record) = stream.next().await? {
if sender.send(Cmd::Apply(record)).is_err() {
tracing::error!("unable to communicate with write-request manager process");
break;
}
}

tracing::warn!("write-request manager subscription to $system exited");

Ok(())
}

async fn manager_process(mut mailbox: UnboundedReceiver<Cmd>) {
let mut mgr = WriteRequestManager::default();

while let Some(cmd) = mailbox.recv().await {
mgr.handle(cmd);
}

tracing::warn!("write-request manager process exited")
}
15 changes: 13 additions & 2 deletions geth-engine/src/services/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::io;
use bytes::{Buf, Bytes};
use futures::TryStreamExt;

use geth_common::{Client, Direction, Record, Revision, SubscriptionEvent};
use geth_common::{Client, Direction, Position, Record, Revision, SubscriptionEvent};
use geth_mikoshi::hashing::mikoshi_hash;
use geth_mikoshi::storage::{FileId, Storage};

Expand All @@ -20,7 +20,7 @@ where
C: Client,
S: Storage + Send + Sync + 'static,
{
let mut internal = Internal::new(index, sub_client)?;
let mut internal = Internal::new(index, sub_client.clone())?;

let starting = if internal.index_pos == 0 {
Revision::Start
Expand Down Expand Up @@ -67,9 +67,20 @@ where
)
.await;

let mut position = 0;
while let Some(record) = stream.try_next().await? {
position = record.position.raw();
internal.index_record(record)?;
}

sub_client.event_committed(Record {
id: Default::default(),
r#type: names::types::EVENTS_INDEXED.to_string(),
stream_name: names::streams::SYSTEM.to_string(),
position: Position(position),
revision: position,
data: Default::default(),
})?
}
}
}
Expand Down