v0.3.0 (#41)
* diffing kickoff

* v0.3.0 on the way

* Fix example. Renamings

* Fix initial bootstrapping && remove prints

* fmt & clippy

* Update example. Add drop for ReadReceiver to make sure all future wakers are consumed and called

* Move all blocking ops to spawned thread

* Redo async bridge API (see the usage sketch below)

* Name things properly, remove misleading comment, snapshots are explicit now

* Rebase, fmt & clippy
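
The reworked async bridge replaces the shared Arc<Mutex<Journal>> with per-use stream handles whose blocking journal IO runs on a spawned thread. A minimal usage sketch, pieced together from the updated example further down (the demo() wrapper and the placeholder bytes are illustrative; the handles returned by .spawn() are assumed to implement tokio's AsyncWrite/AsyncRead, which is what write_all and ReaderStream rely on in the example):

use futures::StreamExt;
use journal::{AsyncReadJournalStream, AsyncWriteJournalStream};
use tokio::io::AsyncWriteExt;

async fn demo() -> std::io::Result<()> {
    // Write half: bytes are handed off to a dedicated thread that performs the
    // blocking journal writes, so the tokio scheduler is never blocked on IO.
    let mut writer = AsyncWriteJournalStream::new("/tmp/journal").spawn();
    writer.write_all(b"...serialized snapshot stream...").await?;

    // Read half: stream the journal back out, skipping snapshots the peer
    // already has (0 = from the beginning), as an async stream of byte chunks.
    let reader = AsyncReadJournalStream::new("/tmp/journal", 0).spawn();
    let mut chunks = Box::pin(tokio_util::io::ReaderStream::new(reader));
    while let Some(chunk) = chunks.next().await {
        println!("read {} bytes", chunk?.len());
    }
    Ok(())
}
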
dmzmk authored Mar 10, 2023
1 parent ed2419c commit d243ef5
Showing 15 changed files with 424 additions and 288 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions examples/Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion examples/sync-backend/Cargo.toml
@@ -6,7 +6,7 @@ publish = false

[dependencies]
libsqlite-sys = { path = "../../libsqlite-sys" }
-journal = { path = "../../journal" }
+journal = { path = "../../journal", features = ["async"] }
serde_sqlite = { path = "../../serde_sqlite" }

tokio = { version = "1", features = ["full"] }
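
Enabling the async feature on the journal dependency is what exposes the AsyncReadJournalStream/AsyncWriteJournalStream types used by the reworked example below. A plausible sketch of how such a feature flag usually gates those types inside the crate (the async_bridge module name is an assumption, not something shown in this commit):

// journal/src/lib.rs (hypothetical layout): the tokio-based bridge is only
// compiled when the crate is built with features = ["async"].
#[cfg(feature = "async")]
mod async_bridge;
#[cfg(feature = "async")]
pub use async_bridge::{AsyncReadJournalStream, AsyncWriteJournalStream};
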
96 changes: 28 additions & 68 deletions examples/sync-backend/src/main.rs
@@ -2,14 +2,7 @@
//!
//! ** Strictly for the demo purposes only **
//! ** Known issues **:
-//! - It works only for single database and single client
-//! - There is no Sync procotol implemented here at all – direct stream of journal.
-//! - The server assumes the client sends only new snapshots so the local version is not checked, and it's
-//! possible to write the same snapshots multiple times.
-//! - No sanity checks that the client actually sends valid data not random garbage
-//! - Calling Journal::add_page directly is a hack and rewrites snapshot timestamps/numbers.
-//! - The Journal API doesn't allow to write headers directly (yet).
-//! - The Journal is experimental, it only supports blocking IO so the scheduler is blocked on the journal IO ops.
+//! - It works only for single database
//!
//! Run with
//!
@@ -19,21 +12,20 @@

use axum::{
extract::{BodyStream, Path, State, Query},
+http::StatusCode,
+body,
response,
routing::get,
Router, Server,
};
use futures::StreamExt;
-use journal::{Journal, Protocol, Stream};
-use serde_sqlite::de;
-use std::io::Read;
-use std::sync::Arc;
-use tokio::sync::Mutex;
+use journal::{Journal, AsyncReadJournalStream, AsyncWriteJournalStream};
+use tokio::io::AsyncWriteExt;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
use serde::Deserialize;

-fn to_error<T: std::fmt::Debug>(e: T) -> String {
-format!("{e:?}")
+fn to_error<T: std::fmt::Debug>(_e: T) -> StatusCode {
+StatusCode::INTERNAL_SERVER_ERROR
}

#[derive(Debug, Default, Deserialize)]
@@ -48,52 +40,27 @@ async fn post_snapshot(
State(state): State<AppState>,
Path(_domain): Path<String>,
mut stream: BodyStream,
-) -> Result<&'static str, String> {
-let mut whole_body = vec![];
+) -> Result<&'static str, StatusCode> {
+let mut write_stream = AsyncWriteJournalStream::new(state.journal_path).spawn();
while let Some(chunk) = stream.next().await {
-whole_body.extend(chunk.map_err(to_error)?);
-}
-if whole_body.is_empty() {
-return Ok("");
-}
-let mut whole_body = std::io::Cursor::new(whole_body);
-let mut journal = state.journal.lock().await;
-
-loop {
-match de::from_reader::<Protocol, _>(&mut whole_body) {
-Ok(Protocol::SnapshotHeader(snapshot_header)) => {
-journal.commit().map_err(to_error)?;
-journal.add_snapshot(&snapshot_header).map_err(to_error)?;
-tracing::info!("snapshot: {:?}", snapshot_header.id);
-}
-Ok(Protocol::PageHeader(page_header)) => {
-let mut page = vec![0; page_header.page_size as usize];
-whole_body.read_exact(page.as_mut_slice()).map_err(to_error)?;
-journal.add_page(&page_header, page.as_slice()).map_err(to_error)?;
-tracing::info!(" page: {:?}", page_header.page_num);
-},
-Ok(Protocol::EndOfStream(_)) => {
-journal.commit().map_err(to_error)?;
-tracing::info!("end of stream");
-break;
-},
-Err(e) => return Err(to_error(e)),
-}
-}
+let chunk = chunk.map_err(to_error)?;
+write_stream.write_all(&chunk).await.map_err(to_error)?;
+};
Ok("OK")
}

/// get latest known snapshot num
async fn head_snapshot(
State(state): State<AppState>,
Path(_domain): Path<String>,
-) -> Result<impl response::IntoResponse, String> {
-let journal = state.journal.lock().await;
-let snapshot_id = match journal.current_snapshot() {
-Some(v) => format!("{v}"),
-None => "".into(),
-};
-let headers = response::AppendHeaders([("x-snapshot-id", snapshot_id)]);
+) -> Result<impl response::IntoResponse, StatusCode> {
+let res = tokio::task::spawn_blocking(move ||{
+let journal = Journal::try_from(state.journal_path)
+.or_else(|_e| Journal::create(state.journal_path))?;
+Ok::<_, journal::Error>(journal.get_header().snapshot_counter)
+});
+let snapshot_id = res.await.map_err(to_error)?.map_err(to_error)?;
+let headers = response::AppendHeaders([("x-snapshot-id", snapshot_id.to_string())]);
Ok((headers, "head"))
}

@@ -102,30 +69,23 @@ async fn get_snapshot(
State(state): State<AppState>,
Path(_domain): Path<String>,
params: Option<Query<Params>>,
-) -> Result<impl response::IntoResponse, String> {
-let snapshot_id: u64 = params.unwrap_or_default().snapshot_id;
-let mut journal = state.journal.lock().await;
-let iter = journal
-.into_iter()
-.skip_snapshots(snapshot_id);
-let mut buf = vec![];
-Stream::new(iter).read_to_end(&mut buf).map_err(to_error)?;
-Ok(buf)
+) -> Result<impl response::IntoResponse, StatusCode> {
+let stream = AsyncReadJournalStream::new(
+state.journal_path,
+params.map(|p| p.snapshot_id).unwrap_or(0)
+).spawn();
+Ok(body::StreamBody::new(tokio_util::io::ReaderStream::new(stream)))
}

#[derive(Debug, Clone)]
struct AppState {
-journal: Arc<Mutex<journal::Journal>>,
+journal_path: &'static str
}

impl AppState {
fn new() -> Self {
-let journal_path = "/tmp/journal";
-let journal = Journal::try_from(journal_path)
-.or_else(|_e| Journal::create(journal_path))
-.unwrap();
Self {
-journal: Arc::new(Mutex::new(journal)),
+journal_path: "/tmp/journal"
}
}
}
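
One detail worth noting in head_snapshot above: the journal is opened inside tokio::task::spawn_blocking, so the double map_err flattens two failure layers, the JoinError from the blocking task and the journal::Error from opening or creating the journal. The same pattern as a standalone sketch (the u64 type of snapshot_counter is an assumption, and the 500 mapping mirrors to_error):

use axum::http::StatusCode;
use journal::Journal;

// Run the blocking journal IO on tokio's blocking pool and map both the join
// error and the journal error to a 500, mirroring head_snapshot above.
async fn snapshot_counter(path: &'static str) -> Result<u64, StatusCode> {
    let res = tokio::task::spawn_blocking(move || {
        let journal = Journal::try_from(path).or_else(|_e| Journal::create(path))?;
        Ok::<_, journal::Error>(journal.get_header().snapshot_counter)
    });
    res.await
        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)? // the blocking task failed to join
        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR) // the journal could not be opened or created
}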