Skip to content

Commit

Permalink
Always re-read journal header when adding new/existing snapshot (#27)
Browse files Browse the repository at this point in the history
* Always re-read journal header on new snapshot

* Add test

* Force threads to sleep
  • Loading branch information
dmzmk authored Feb 10, 2023
1 parent 0c993a7 commit 974a386
Show file tree
Hide file tree
Showing 2 changed files with 157 additions and 4 deletions.
18 changes: 15 additions & 3 deletions journal/src/journal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,20 +152,22 @@ impl<F: Read + Write + Seek> Journal<F> {
})
}

/// Initiate snapshot
/// Initiate new snapshot
///
/// * update journal header to correctly set up offset
/// * to initiate snapshot we seek to current end of the file (value stored in header)
/// * switch fd to buffered mode
/// * write snapshot header with current header counter number
pub fn new_snapshot(&mut self) -> Result<()> {
if self.page_count.is_some() {
return Ok(());
}
self.update_header()?;
let snapshot_header = SnapshotHeader::new(
self.header.snapshot_counter,
chrono::Utc::now().timestamp_micros(),
);
self.add_snapshot(&snapshot_header)
self.write_snapshot(&snapshot_header)
}

/// Add new sqlite page
Expand All @@ -180,8 +182,18 @@ impl<F: Read + Write + Seek> Journal<F> {
self.add_page(&page_header, page)
}

/// Add snapshot
/// Add existing snapshot
///
/// Re-syncs journal header
pub fn add_snapshot(&mut self, snapshot_header: &SnapshotHeader) -> Result<()> {
self.update_header()?;
self.write_snapshot(snapshot_header)
}

/// Write snapshot to journal
///
/// This function assumes journal header is up to date
fn write_snapshot(&mut self, snapshot_header: &SnapshotHeader) -> Result<()> {
if snapshot_header.id != self.header.snapshot_counter {
return Err(Error::OutOfOrderSnapshot {
snapshot_id: snapshot_header.id,
Expand Down
143 changes: 142 additions & 1 deletion journal/tests/journal_tests.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use block::Block;
use journal::{Header, Journal, Protocol, Stream};
use quickcheck::{quickcheck, Arbitrary, Gen, TestResult};
use std::io::{Cursor, Read, Write};
use std::io::{Cursor, Read, Seek, SeekFrom, Write};
use std::sync::{Arc, Mutex};
use std::time::Duration;

#[test]
fn test_journal_not_exists() {
Expand Down Expand Up @@ -312,3 +314,142 @@ fn test_journal_rebuild_from_stream() {
}
quickcheck(check as fn(Vec<TestSnapshot>, XorShift));
}

/// In-memory cursor that can be cloned and shared across threads.
///
/// All clones point at the same underlying `Cursor<Vec<u8>>`, so bytes
/// written (and seeks performed) through one handle are observable through
/// every other handle. Each I/O operation holds the internal mutex only for
/// its own duration.
///
/// `Clone` is derived: cloning bumps the `Arc` refcount and shares the
/// buffer — it does NOT copy the data (same behavior as the previous
/// hand-written impl, which `#[derive(Clone)]` makes redundant).
#[derive(Debug, Clone)]
struct ShareableCursor {
    // Shared, mutex-protected buffer + position.
    cur: Arc<Mutex<Cursor<Vec<u8>>>>,
}

impl ShareableCursor {
    /// Create a new, empty shared cursor positioned at offset 0.
    fn new() -> Self {
        Self {
            cur: Arc::new(Mutex::new(Cursor::new(Vec::new()))),
        }
    }
}

impl Read for ShareableCursor {
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        // `unwrap` on the lock: a poisoned mutex means another test thread
        // already panicked, so propagating the panic here is acceptable.
        self.cur.lock().unwrap().read(buf)
    }
}

impl Write for ShareableCursor {
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        self.cur.lock().unwrap().write(buf)
    }

    fn flush(&mut self) -> std::io::Result<()> {
        self.cur.lock().unwrap().flush()
    }
}

impl Seek for ShareableCursor {
    fn seek(&mut self, pos: SeekFrom) -> std::io::Result<u64> {
        self.cur.lock().unwrap().seek(pos)
    }
}

// Test journal ability to work concurrently on same underlying IO resource
//
// Two `Journal` instances share one `ShareableCursor`. The behavior under
// test (per this commit) is that `new_snapshot`/`add_snapshot` re-read the
// journal header before writing, so each journal picks up the other's
// commits instead of clobbering them.
#[test]
fn test_journal_concurrent_updates() {
    fn check(size: usize, prng: XorShift) -> TestResult {
        // limit max number of snapshots
        let size = (size % 1000).max(1);
        let file = ShareableCursor::new();

        // Both journals operate on clones of the same in-memory file.
        let journal_1 = &mut Journal::new(Header::default(), file.clone(), None).unwrap();
        let journal_2 = &mut Journal::new(Header::default(), file, None).unwrap();
        // Serializes each page-write + commit pair so the two writers
        // interleave at snapshot granularity, never mid-snapshot.
        let lock = Mutex::new(());

        // One page per snapshot, with strictly increasing page sizes
        // (so differing contents would be detectable below).
        let snapshots = (0..size).map(|s| vec![0; s + 1]).collect::<Vec<Vec<u8>>>();
        let (s1, s2) = snapshots.as_slice().split_at(snapshots.len() / 2);

        // Shared PRNG drives the random backoffs that vary interleaving.
        let prng = Mutex::new(prng);

        // test concurrent snapshot creation
        std::thread::scope(|s| {
            s.spawn(|| {
                s1.iter().for_each(|page| {
                    let guard = lock.lock().unwrap();
                    journal_1
                        .new_page(page.len() as u64, page.as_slice())
                        .unwrap();
                    journal_1.commit().unwrap();
                    drop(guard);
                    // Random 0-9 us sleep to force different thread schedules.
                    std::thread::sleep(Duration::from_micros(prng.lock().unwrap().next() % 10));
                });
            });
            s.spawn(|| {
                s2.iter().for_each(|page| {
                    let guard = lock.lock().unwrap();
                    journal_2
                        .new_page(page.len() as u64, page.as_slice())
                        .unwrap();
                    journal_2.commit().unwrap();
                    drop(guard);
                    std::thread::sleep(Duration::from_micros(prng.lock().unwrap().next() % 10));
                });
            });
        });

        // After the writers finish, both journals must read back identical
        // contents from the shared file.
        assert!(journal_1
            .into_iter()
            .zip(journal_2.into_iter())
            .all(|(left, right)| left.unwrap() == right.unwrap()));

        // test concurrent snapshot addition
        // Replay journal_1 into two fresh journals that again share one
        // file, this time via the existing-snapshot path
        // (`add_snapshot` + `add_page`).
        let file_re = ShareableCursor::new();
        let journal_1_re = &mut Journal::new(Header::default(), file_re.clone(), None).unwrap();
        let journal_2_re = &mut Journal::new(Header::default(), file_re, None).unwrap();

        // Shared work queue: both threads pull from one iterator, so each
        // snapshot is replayed by exactly one of the two journals.
        let iter = Mutex::new(journal_1.into_iter());
        std::thread::scope(|s| {
            s.spawn(|| loop {
                let mut i = iter.lock().unwrap();
                if let Some(res) = i.next() {
                    let (snapshot_h, page_h, page) = res.unwrap();
                    journal_1_re.add_snapshot(&snapshot_h).unwrap();
                    journal_1_re.add_page(&page_h, page.as_slice()).unwrap();
                    journal_1_re.commit().unwrap();
                } else {
                    break;
                }
                // Release the iterator lock before sleeping so the other
                // thread can grab the next snapshot meanwhile.
                drop(i);
                std::thread::sleep(Duration::from_micros(prng.lock().unwrap().next() % 10));
            });
            s.spawn(|| loop {
                let mut i = iter.lock().unwrap();
                if let Some(res) = i.next() {
                    let (snapshot_h, page_h, page) = res.unwrap();
                    journal_2_re.add_snapshot(&snapshot_h).unwrap();
                    journal_2_re.add_page(&page_h, page.as_slice()).unwrap();
                    journal_2_re.commit().unwrap();
                } else {
                    break;
                }
                drop(i);
                std::thread::sleep(Duration::from_micros(prng.lock().unwrap().next() % 10));
            });
        });
        // The rebuilt journals must match the source journal and each other.
        assert!(journal_1
            .into_iter()
            .zip(journal_1_re.into_iter())
            .all(|(left, right)| left.unwrap() == right.unwrap()));
        assert!(journal_1_re
            .into_iter()
            .zip(journal_2_re.into_iter())
            .all(|(left, right)| left.unwrap() == right.unwrap()));

        TestResult::from_bool(true)
    }
    quickcheck(check as fn(usize, XorShift) -> TestResult)
}

0 comments on commit 974a386

Please sign in to comment.