From 9c1f27bf27841bce59a368e15014d2393459261f Mon Sep 17 00:00:00 2001 From: Jan-Erik Rediger Date: Fri, 24 Mar 2023 13:34:58 +0100 Subject: [PATCH] Introduce a recovery strategy as a replacement for set_discard_if_corrupted The user can now choose between discarding corrupted data, moving it out of the way (into another file) and starting with an empty database or bubbling up the error. Fixes #234 --- src/backend/common.rs | 13 ++++ src/backend/impl_lmdb/environment.rs | 4 +- src/backend/impl_safe/environment.rs | 49 +++++++++----- src/backend/traits.rs | 5 +- tests/manager.rs | 98 +++++++++++++++++++++++++++- 5 files changed, 149 insertions(+), 20 deletions(-) diff --git a/src/backend/common.rs b/src/backend/common.rs index bea3839d..89b091b2 100644 --- a/src/backend/common.rs +++ b/src/backend/common.rs @@ -42,3 +42,16 @@ pub enum WriteFlags { APPEND, APPEND_DUP, } + +/// Strategy to use when corrupted data is detected while opening a database. +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub enum RecoveryStrategy { + /// Bubble up the error on detecting a corrupted data file. The default. + Error, + + /// Discard the corrupted data and start with an empty database. + Discard, + + /// Move the corrupted data file to `$file.corrupt` and start with an empty database. + Rename, +} diff --git a/src/backend/impl_lmdb/environment.rs b/src/backend/impl_lmdb/environment.rs index e528fad0..a8a72ae8 100644 --- a/src/backend/impl_lmdb/environment.rs +++ b/src/backend/impl_lmdb/environment.rs @@ -19,6 +19,7 @@ use super::{ DatabaseFlagsImpl, DatabaseImpl, EnvironmentFlagsImpl, ErrorImpl, InfoImpl, RoTransactionImpl, RwTransactionImpl, StatImpl, }; +use crate::backend::common::RecoveryStrategy; use crate::backend::traits::{ BackendEnvironment, BackendEnvironmentBuilder, BackendInfo, BackendIter, BackendRoCursor, BackendRoCursorTransaction, BackendStat, @@ -86,7 +87,8 @@ impl<'b> BackendEnvironmentBuilder<'b> for EnvironmentBuilderImpl { self } - fn set_discard_if_corrupted(&mut self, _discard_if_corrupted: bool) -> &mut Self { + /// **UNIMPLEMENTED.** Will panic at runtime. + fn set_corruption_recovery_strategy(&mut self, _strategy: RecoveryStrategy) -> &mut Self { // Unfortunately, when opening a database, LMDB doesn't handle all the ways it could have // been corrupted. Prefer using the `SafeMode` backend if this is important. unimplemented!(); diff --git a/src/backend/impl_safe/environment.rs b/src/backend/impl_safe/environment.rs index 17cfb87f..f9c44cf6 100644 --- a/src/backend/impl_safe/environment.rs +++ b/src/backend/impl_safe/environment.rs @@ -24,9 +24,11 @@ use super::{ database::Database, DatabaseFlagsImpl, DatabaseImpl, EnvironmentFlagsImpl, ErrorImpl, InfoImpl, RoTransactionImpl, RwTransactionImpl, StatImpl, }; +use crate::backend::common::RecoveryStrategy; use crate::backend::traits::{BackendEnvironment, BackendEnvironmentBuilder}; const DEFAULT_DB_FILENAME: &str = "data.safe.bin"; +const DEFAULT_CORRUPT_DB_EXTENSION: &str = "bin.corrupt"; type DatabaseArena = Arena; type DatabaseNameMap = HashMap, DatabaseImpl>; @@ -38,7 +40,7 @@ pub struct EnvironmentBuilderImpl { max_dbs: Option, map_size: Option, make_dir_if_needed: bool, - discard_if_corrupted: bool, + corruption_recovery_strategy: RecoveryStrategy, } impl<'b> BackendEnvironmentBuilder<'b> for EnvironmentBuilderImpl { @@ -53,7 +55,7 @@ impl<'b> BackendEnvironmentBuilder<'b> for EnvironmentBuilderImpl { max_dbs: None, map_size: None, make_dir_if_needed: false, - discard_if_corrupted: false, + corruption_recovery_strategy: RecoveryStrategy::Error, } } @@ -85,8 +87,8 @@ impl<'b> BackendEnvironmentBuilder<'b> for EnvironmentBuilderImpl { self } - fn set_discard_if_corrupted(&mut self, discard_if_corrupted: bool) -> &mut Self { - self.discard_if_corrupted = discard_if_corrupted; + fn set_corruption_recovery_strategy(&mut self, strategy: RecoveryStrategy) -> &mut Self { + self.corruption_recovery_strategy = strategy; self } @@ -106,7 +108,7 @@ impl<'b> BackendEnvironmentBuilder<'b> for EnvironmentBuilderImpl { self.max_dbs, self.map_size, )?; - env.read_from_disk(self.discard_if_corrupted)?; + env.read_from_disk(self.corruption_recovery_strategy)?; Ok(env) } } @@ -152,16 +154,32 @@ impl EnvironmentImpl { Ok(bincode::serialize(&data)?) } - fn deserialize( - bytes: &[u8], - discard_if_corrupted: bool, + fn load( + path: &Path, + strategy: RecoveryStrategy, ) -> Result<(DatabaseArena, DatabaseNameMap), ErrorImpl> { + let bytes = fs::read(path)?; + + match Self::deserialize(&bytes) { + Ok((arena, name_map)) => Ok((arena, name_map)), + Err(err) => match strategy { + RecoveryStrategy::Error => Err(err), + RecoveryStrategy::Discard => Ok((DatabaseArena::new(), HashMap::new())), + RecoveryStrategy::Rename => { + let corrupted_path = path.with_extension(DEFAULT_CORRUPT_DB_EXTENSION); + fs::rename(path, corrupted_path)?; + + Ok((DatabaseArena::new(), HashMap::new())) + } + }, + } + } + + fn deserialize(bytes: &[u8]) -> Result<(DatabaseArena, DatabaseNameMap), ErrorImpl> { let mut arena = DatabaseArena::new(); let mut name_map = HashMap::new(); - let data: HashMap<_, _> = match bincode::deserialize(bytes) { - Err(_) if discard_if_corrupted => Ok(HashMap::new()), - result => result, - }?; + let data: HashMap<_, _> = bincode::deserialize(bytes)?; + for (name, db) in data { name_map.insert(name, DatabaseImpl(arena.alloc(db))); } @@ -199,7 +217,7 @@ impl EnvironmentImpl { }) } - pub(crate) fn read_from_disk(&mut self, discard_if_corrupted: bool) -> Result<(), ErrorImpl> { + pub(crate) fn read_from_disk(&mut self, strategy: RecoveryStrategy) -> Result<(), ErrorImpl> { let mut path = Cow::from(&self.path); if fs::metadata(&path)?.is_dir() { path.to_mut().push(DEFAULT_DB_FILENAME); @@ -207,7 +225,7 @@ impl EnvironmentImpl { if fs::metadata(&path).is_err() { return Ok(()); }; - let (arena, name_map) = Self::deserialize(&fs::read(&path)?, discard_if_corrupted)?; + let (arena, name_map) = Self::load(&path, strategy)?; self.dbs = RwLock::new(EnvironmentDbs { arena, name_map }); Ok(()) } @@ -272,7 +290,8 @@ impl<'e> BackendEnvironment<'e> for EnvironmentImpl { // TOOD: don't reallocate `name`. let key = name.map(String::from); let mut dbs = self.dbs.write().map_err(|_| ErrorImpl::EnvPoisonError)?; - if dbs.name_map.keys().filter_map(|k| k.as_ref()).count() >= self.max_dbs && name.is_some() { + if dbs.name_map.keys().filter_map(|k| k.as_ref()).count() >= self.max_dbs && name.is_some() + { return Err(ErrorImpl::DbsFull); } let parts = EnvironmentDbsRefMut::from(dbs.deref_mut()); diff --git a/src/backend/traits.rs b/src/backend/traits.rs index 1dda15a8..ac5d1e9e 100644 --- a/src/backend/traits.rs +++ b/src/backend/traits.rs @@ -14,7 +14,7 @@ use std::{ }; use crate::{ - backend::common::{DatabaseFlags, EnvironmentFlags, WriteFlags}, + backend::common::{DatabaseFlags, EnvironmentFlags, RecoveryStrategy, WriteFlags}, error::StoreError, }; @@ -83,7 +83,8 @@ pub trait BackendEnvironmentBuilder<'b>: Debug + Eq + PartialEq + Copy + Clone { fn set_make_dir_if_needed(&mut self, make_dir_if_needed: bool) -> &mut Self; - fn set_discard_if_corrupted(&mut self, discard_if_corrupted: bool) -> &mut Self; + /// Set the corruption recovery strategy. See [`RecoveryStrategy`] for details. + fn set_corruption_recovery_strategy(&mut self, strategy: RecoveryStrategy) -> &mut Self; fn open(&self, path: &Path) -> Result; } diff --git a/tests/manager.rs b/tests/manager.rs index fe6b6cd0..78b47717 100644 --- a/tests/manager.rs +++ b/tests/manager.rs @@ -15,7 +15,7 @@ use tempfile::Builder; #[cfg(feature = "lmdb")] use rkv::backend::{Lmdb, LmdbEnvironment}; use rkv::{ - backend::{BackendEnvironmentBuilder, SafeMode, SafeModeEnvironment}, + backend::{BackendEnvironmentBuilder, RecoveryStrategy, SafeMode, SafeModeEnvironment}, CloseOptions, Rkv, StoreOptions, Value, }; @@ -247,7 +247,7 @@ fn test_safe_mode_corrupt_while_open_1() { // But we can use a builder and pass `discard_if_corrupted` to deal with it. let mut builder = Rkv::environment_builder::(); - builder.set_discard_if_corrupted(true); + builder.set_corruption_recovery_strategy(RecoveryStrategy::Discard); manager .get_or_create_from_builder(root.path(), builder, Rkv::from_builder::) .expect("created"); @@ -378,3 +378,97 @@ fn test_safe_mode_corrupt_while_open_2() { Some(Value::Str("byé, yöu")) ); } + +/// Test how the manager can discard corrupted databases, while moving the corrupted one aside for +/// later inspection. +#[test] +fn test_safe_mode_corrupt_while_open_3() { + type Manager = rkv::Manager; + + let root = Builder::new() + .prefix("test_safe_mode_corrupt_while_open_3") + .tempdir() + .expect("tempdir"); + fs::create_dir_all(root.path()).expect("dir created"); + + // Verify it was flushed to disk. + let mut safebin = root.path().to_path_buf(); + safebin.push("data.safe.bin"); + + // Oops, corruption. + fs::write(&safebin, "bogus").expect("dbfile corrupted"); + assert!(safebin.exists()); + + // Create environment. + let mut manager = Manager::singleton().write().unwrap(); + + // Recreating environment fails. + manager + .get_or_create(root.path(), Rkv::new::) + .expect_err("not created"); + assert!(manager.get(root.path()).expect("success").is_none()); + + // But we can use a builder and pass `RecoveryStrategy::Rename` to deal with it. + let mut builder = Rkv::environment_builder::(); + builder.set_corruption_recovery_strategy(RecoveryStrategy::Rename); + manager + .get_or_create_from_builder(root.path(), builder, Rkv::from_builder::) + .expect("created"); + assert!(manager.get(root.path()).expect("success").is_some()); + + // Database file was moved out of the way. + assert!(!safebin.exists()); + + let mut corruptbin = root.path().to_path_buf(); + corruptbin.push("data.safe.bin.corrupt"); + assert!(corruptbin.exists()); + + let shared_env = manager + .get_or_create(root.path(), Rkv::new::) + .expect("created"); + let env = shared_env.read().unwrap(); + + // Writing still works. + let store = env + .open_single("store", StoreOptions::create()) + .expect("opened"); + + // Nothing to be read. + let reader = env.read().expect("reader"); + assert_eq!(store.get(&reader, "foo").expect("read"), None); + + // We can write. + let mut writer = env.write().expect("writer"); + store + .put(&mut writer, "foo", &Value::I64(5678)) + .expect("wrote"); + writer.commit().expect("committed"); + env.sync(true).expect("synced"); + + // Database file exists. + assert!(safebin.exists()); + + // Close everything. + drop(env); + drop(shared_env); + manager + .try_close(root.path(), CloseOptions::default()) + .expect("closed without deleting"); + assert!(manager.get(root.path()).expect("success").is_none()); + + // Recreate environment. + let shared_env = manager + .get_or_create(root.path(), Rkv::new::) + .expect("created"); + let env = shared_env.read().unwrap(); + + // Verify that the dbfile is not corrupted. + let store = env + .open_single("store", StoreOptions::default()) + .expect("opened"); + let reader = env.read().expect("reader"); + assert_eq!( + store.get(&reader, "foo").expect("read"), + Some(Value::I64(5678)) + ); +}