Skip to content

Commit

Permalink
Move indexing from a process to a service.
Browse files Browse the repository at this point in the history
  • Loading branch information
YoEight committed Jul 1, 2024
1 parent 61f1ea3 commit e364ddb
Show file tree
Hide file tree
Showing 21 changed files with 344 additions and 313 deletions.
20 changes: 12 additions & 8 deletions geth-common/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ where
expected_revision: ExpectedRevision,
proposes: Vec<Propose>,
) -> eyre::Result<AppendStreamCompleted> {
self.append_stream(stream_id, expected_revision, proposes)
self.as_ref()
.append_stream(stream_id, expected_revision, proposes)
.await
}

Expand All @@ -86,7 +87,8 @@ where
revision: Revision<u64>,
max_count: u64,
) -> BoxStream<'static, eyre::Result<Record>> {
self.read_stream(stream_id, direction, revision, max_count)
self.as_ref()
.read_stream(stream_id, direction, revision, max_count)
.await
}

Expand All @@ -95,34 +97,36 @@ where
stream_id: &str,
start: Revision<u64>,
) -> BoxStream<'static, eyre::Result<SubscriptionEvent>> {
self.subscribe_to_stream(stream_id, start).await
self.as_ref().subscribe_to_stream(stream_id, start).await
}

async fn subscribe_to_process(
&self,
name: &str,
source_code: &str,
) -> BoxStream<'static, eyre::Result<SubscriptionEvent>> {
self.subscribe_to_process(name, source_code).await
self.as_ref().subscribe_to_process(name, source_code).await
}

async fn delete_stream(
&self,
stream_id: &str,
expected_revision: ExpectedRevision,
) -> eyre::Result<DeleteStreamCompleted> {
self.delete_stream(stream_id, expected_revision).await
self.as_ref()
.delete_stream(stream_id, expected_revision)
.await
}

async fn list_programs(&self) -> eyre::Result<Vec<ProgramSummary>> {
self.list_programs().await
self.as_ref().list_programs().await
}

async fn get_program(&self, id: Uuid) -> eyre::Result<ProgramObtained> {
self.get_program(id).await
self.as_ref().get_program(id).await
}

async fn kill_program(&self, id: Uuid) -> eyre::Result<ProgramKilled> {
self.kill_program(id).await
self.as_ref().kill_program(id).await
}
}
2 changes: 1 addition & 1 deletion geth-common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -564,7 +564,7 @@ impl From<operation_in::append_stream::Propose> for Propose {
}
}

#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct Record {
pub id: Uuid,
pub r#type: String,
Expand Down
2 changes: 1 addition & 1 deletion geth-domain/src/index/lsm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ use geth_mikoshi::wal::{WALRef, WriteAheadLog};
use crate::binary::events::Event;
use crate::index::block::BlockEntry;
use crate::index::mem_table::MemTable;
use crate::index::Merge;
use crate::index::ss_table::SsTable;
use crate::index::Merge;
use crate::parse_event_io;

pub const LSM_DEFAULT_MEM_TABLE_SIZE: usize = 4_096;
Expand Down
1 change: 1 addition & 0 deletions geth-domain/src/index/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pub use block::BlockEntry;
pub use lsm::{Lsm, LsmSettings};
pub use merge::Merge;

Expand Down
2 changes: 1 addition & 1 deletion geth-domain/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::binary::events::Events;

mod append_propose;
pub mod binary;
mod index;
pub mod index;
mod iter;

#[derive(Copy, Clone, Eq, PartialEq)]
Expand Down
33 changes: 6 additions & 27 deletions geth-mikoshi/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
use bytes::Bytes;
use chrono::{DateTime, Utc};
use tokio::sync::mpsc;
use uuid::Uuid;

use geth_common::{Position, Record};
use geth_common::Record;

pub use crate::storage::fs::FileSystemStorage;
pub use crate::storage::in_mem::InMemoryStorage;
Expand All @@ -13,19 +10,8 @@ pub mod hashing;
pub mod storage;
pub mod wal;

#[derive(Debug, Clone)]
pub struct Entry {
pub id: Uuid,
pub r#type: String,
pub stream_name: String,
pub revision: u64,
pub data: Bytes,
pub position: Position,
pub created: DateTime<Utc>,
}

pub struct MikoshiStream {
inner: mpsc::UnboundedReceiver<Entry>,
inner: mpsc::UnboundedReceiver<Record>,
}

impl MikoshiStream {
Expand All @@ -35,11 +21,11 @@ impl MikoshiStream {
Self { inner }
}

pub fn new(inner: mpsc::UnboundedReceiver<Entry>) -> Self {
pub fn new(inner: mpsc::UnboundedReceiver<Record>) -> Self {
Self { inner }
}

pub fn from_vec(entries: Vec<Entry>) -> Self {
pub fn from_vec(entries: Vec<Record>) -> Self {
let (sender, inner) = mpsc::unbounded_channel();

for entry in entries {
Expand All @@ -50,15 +36,8 @@ impl MikoshiStream {
}

pub async fn next(&mut self) -> eyre::Result<Option<Record>> {
if let Some(entry) = self.inner.recv().await {
return Ok(Some(Record {
id: entry.id,
stream_name: entry.stream_name,
position: entry.position,
revision: entry.revision,
data: entry.data,
r#type: entry.r#type,
}));
if let Some(record) = self.inner.recv().await {
return Ok(Some(record));
}

Ok(None)
Expand Down
6 changes: 3 additions & 3 deletions geth-mikoshi/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,15 @@ impl FileId {
Self::Chunk { num, version }
}

pub fn writer_chk() -> Self {
pub const fn writer_chk() -> Self {
Self::Checkpoint(Checkpoint::Writer)
}

pub fn index_chk() -> Self {
pub const fn index_chk() -> Self {
Self::Checkpoint(Checkpoint::Index)
}

pub fn index_global_chk() -> Self {
pub const fn index_global_chk() -> Self {
Self::Checkpoint(Checkpoint::IndexGlobal)
}
}
Expand Down
116 changes: 116 additions & 0 deletions geth-node/src/domain/index.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
use std::io;

use geth_common::{Direction, ExpectedRevision, IteratorIO, Revision};
use geth_domain::index::BlockEntry;
use geth_domain::Lsm;
use geth_mikoshi::hashing::mikoshi_hash;
use geth_mikoshi::storage::Storage;

pub type RevisionCache = moka::sync::Cache<String, u64>;

pub fn new_revision_cache() -> RevisionCache {
moka::sync::Cache::<String, u64>::builder()
.max_capacity(10_000)
.name("revision-cache")
.build()
}

#[derive(Clone)]
pub struct Index<S> {
lsm: Lsm<S>,
revision_cache: RevisionCache,
}

impl<S> Index<S>
where
S: Storage + Send + Sync + 'static,
{
pub fn new(lsm: Lsm<S>) -> Self {
Self {
lsm,
revision_cache: new_revision_cache(),
}
}

pub fn stream_current_revision(&self, stream_name: &str) -> io::Result<CurrentRevision> {
let stream_key = mikoshi_hash(stream_name);
let current_revision = if let Some(current) = self.revision_cache.get(stream_name) {
CurrentRevision::Revision(current)
} else {
let revision = self
.lsm
.highest_revision(stream_key)?
.map_or_else(|| CurrentRevision::NoStream, CurrentRevision::Revision);

if let CurrentRevision::Revision(rev) = revision {
self.revision_cache.insert(stream_name.to_string(), rev);
}

revision
};

Ok(current_revision)
}

pub fn cache_stream_revision(&self, stream_name: String, revision: u64) {
self.revision_cache.insert(stream_name, revision);
}

pub fn scan(
&self,
stream_name: &str,
direction: Direction,
starting: Revision<u64>,
count: usize,
) -> impl IteratorIO<Item = BlockEntry> + Send + Sync {
self.lsm
.scan(mikoshi_hash(stream_name), direction, starting, count)
}

pub fn storage(&self) -> &S {
self.lsm.storage()
}

pub fn register(&self, stream_hash: u64, revision: u64, position: u64) -> eyre::Result<()> {
self.lsm.put_single(stream_hash, revision, position)?;
Ok(())
}

pub fn register_multiple(
&self,
values: impl IntoIterator<Item = (u64, u64, u64)>,
) -> eyre::Result<()> {
self.lsm.put_values(values)?;
Ok(())
}
}

#[derive(Copy, Clone)]
pub enum CurrentRevision {
NoStream,
Revision(u64),
}

impl CurrentRevision {
pub fn next_revision(self) -> u64 {
match self {
CurrentRevision::NoStream => 0,
CurrentRevision::Revision(r) => r + 1,
}
}

pub fn as_expected(self) -> ExpectedRevision {
match self {
CurrentRevision::NoStream => ExpectedRevision::NoStream,
CurrentRevision::Revision(v) => ExpectedRevision::Revision(v),
}
}

pub fn is_deleted(&self) -> bool {
if let CurrentRevision::Revision(r) = self {
return *r == u64::MAX;
}

false
}
}
1 change: 1 addition & 0 deletions geth-node/src/domain/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pub mod index;
2 changes: 1 addition & 1 deletion geth-node/src/grpc/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use tonic::transport::{self, Server};

use geth_common::Client;
use geth_common::generated::next::protocol::protocol_server::ProtocolServer;
use geth_common::Client;

mod local;
mod protocol;
Expand Down
11 changes: 6 additions & 5 deletions geth-node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@ use geth_mikoshi::storage::FileSystemStorage;
use geth_mikoshi::wal::chunks::ChunkBasedWAL;
use geth_mikoshi::wal::WALRef;

use crate::domain::index::Index;
use crate::process::{InternalClient, Processes};

mod bus;
mod domain;
mod grpc;
pub mod messages;
mod names;
Expand All @@ -28,14 +30,13 @@ async fn main() -> eyre::Result<()> {
tracing::subscriber::set_global_default(subscriber).expect("setting default subscriber failed");

let storage = FileSystemStorage::new(PathBuf::from("./geth"))?;
let index = Lsm::load(LsmSettings::default(), storage.clone())?;

let lsm = Lsm::load(LsmSettings::default(), storage.clone())?;
let index = Index::new(lsm);
let wal = WALRef::new(ChunkBasedWAL::load(storage.clone())?);
index.rebuild(&wal)?;

let processes = Processes::new(wal, index.clone());
let sub_client = processes.subscriptions_client().clone();
let client = Arc::new(InternalClient::new(processes));
let services = services::start(client.clone(), index);
let services = services::start(client.clone(), index, sub_client);

select! {
server = grpc::start_server(client) => {
Expand Down
2 changes: 2 additions & 0 deletions geth-node/src/names.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@
pub mod streams {
pub static ALL: &'static str = "$all";
pub static GLOBALS: &'static str = "$globals";
pub static SYSTEM: &'static str = "$system";
}

pub mod types {
pub static STREAM_DELETED: &'static str = "$stream-deleted";
pub static EVENTS_WRITTEN: &'static str = "$events-written";
}
12 changes: 8 additions & 4 deletions geth-node/src/process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,22 @@ use uuid::Uuid;

use geth_common::{
AppendStreamCompleted, Client, DeleteStreamCompleted, Direction, ExpectedRevision,
GetProgramError, ProgramKilled, ProgramKillError, ProgramObtained, ProgramSummary, Propose,
GetProgramError, ProgramKillError, ProgramKilled, ProgramObtained, ProgramSummary, Propose,
Record, Revision, SubscriptionEvent,
};
use geth_domain::Lsm;
use geth_mikoshi::storage::Storage;
use geth_mikoshi::wal::{WALRef, WriteAheadLog};

use crate::bus::{
GetProgrammableSubscriptionStatsMsg, KillProgrammableSubscriptionMsg, SubscribeMsg,
};
use crate::domain::index::Index;
use crate::messages::{
AppendStream, DeleteStream, ProcessTarget, ReadStream, ReadStreamCompleted, StreamTarget,
SubscribeTo, SubscriptionRequestOutcome, SubscriptionTarget,
};
use crate::process::storage::StorageService;
use crate::process::subscriptions::SubscriptionsClient;
pub use crate::process::subscriptions::SubscriptionsClient;

pub mod storage;
mod subscriptions;
Expand Down Expand Up @@ -294,7 +294,7 @@ where
WAL: WriteAheadLog + Send + Sync + 'static,
S: Storage + Send + Sync + 'static,
{
pub fn new(wal: WALRef<WAL>, index: Lsm<S>) -> Self {
pub fn new(wal: WALRef<WAL>, index: Index<S>) -> Self {
let subscriptions = subscriptions::start();
let storage = StorageService::new(wal, index, subscriptions.clone());

Expand All @@ -303,4 +303,8 @@ where
subscriptions,
}
}

pub fn subscriptions_client(&self) -> &SubscriptionsClient {
&self.subscriptions
}
}
Loading

0 comments on commit e364ddb

Please sign in to comment.