feat: Add read-data-subset to CheckOptions; allow to check given trees (#262)

Adds `Repository::check_with_trees`, which allows running the tree and data
checks only for the given trees.

Moreover, `CheckOptions` now has a `read_data_subset` option, which allows
reading only parts of the pack files.

Warming up of pack files for hot/cold repositories has also been added.

closes rustic-rs/rustic#251

---------

Co-authored-by: simonsan <[email protected]>
aawsome and simonsan authored Sep 30, 2024
1 parent a37bec4 commit ff13bef
Showing 9 changed files with 874 additions and 55 deletions.
280 changes: 231 additions & 49 deletions crates/core/src/commands/check.rs
@@ -1,30 +1,121 @@
//! `check` subcommand
use std::collections::HashMap;
use std::{
collections::{BTreeSet, HashMap},
str::FromStr,
};

use bytes::Bytes;
use bytesize::ByteSize;
use derive_setters::Setters;
use itertools::Itertools;
use log::{debug, error, warn};
use rayon::prelude::{IntoParallelIterator, ParallelBridge, ParallelIterator};
use rand::{prelude::SliceRandom, thread_rng, Rng};
use rayon::prelude::{IntoParallelIterator, ParallelIterator};
use zstd::stream::decode_all;

use crate::{
backend::{cache::Cache, decrypt::DecryptReadBackend, node::NodeType, FileType, ReadBackend},
blob::{tree::TreeStreamerOnce, BlobId, BlobType},
crypto::hasher::hash,
error::{RusticErrorKind, RusticResult},
error::{CommandErrorKind, RusticErrorKind, RusticResult},
id::Id,
index::{
binarysorted::{IndexCollector, IndexType},
GlobalIndex, ReadGlobalIndex,
},
progress::{Progress, ProgressBars},
repofile::{
packfile::PackId, IndexFile, IndexPack, PackHeader, PackHeaderLength, PackHeaderRef,
SnapshotFile,
},
repository::{Open, Repository},
TreeId,
};

#[derive(Clone, Copy, Debug, Default)]
/// Options to specify which subset of packs will be read
pub enum ReadSubsetOption {
#[default]
/// Read all pack files
All,
/// Read a random subset of pack files with (approximately) the given percentage of total size
Percentage(f64),
/// Read a random subset of pack files with (approximately) the given size
Size(u64),
/// Read a subset of packfiles based on Ids: Using (1,n) .. (n,n) in separate runs will cover all pack files
IdSubSet((u32, u32)),
}

impl ReadSubsetOption {
fn apply(self, packs: impl IntoIterator<Item = IndexPack>) -> Vec<IndexPack> {
self.apply_with_rng(packs, &mut thread_rng())
}

fn apply_with_rng(
self,
packs: impl IntoIterator<Item = IndexPack>,
rng: &mut impl Rng,
) -> Vec<IndexPack> {
fn id_matches_n_m(id: &Id, n: u32, m: u32) -> bool {
id.as_u32() % m == n % m
}

let mut total_size: u64 = 0;
let mut packs: Vec<_> = packs
.into_iter()
.inspect(|p| total_size += u64::from(p.pack_size()))
.collect();

// Apply read-subset option
if let Some(mut size) = match self {
Self::All => None,
// we need some casts to compute percentage...
#[allow(clippy::cast_possible_truncation)]
#[allow(clippy::cast_precision_loss)]
#[allow(clippy::cast_sign_loss)]
Self::Percentage(p) => Some((total_size as f64 * p / 100.0) as u64),
Self::Size(s) => Some(s),
Self::IdSubSet((n, m)) => {
packs.retain(|p| id_matches_n_m(&p.id, n, m));
None
}
} {
// random subset of given size is required
packs.shuffle(rng);
packs.retain(|p| {
let p_size = u64::from(p.pack_size());
if size > p_size {
size = size.saturating_sub(p_size);
true
} else {
false
}
});
}
packs
}
}

impl FromStr for ReadSubsetOption {
type Err = CommandErrorKind;
fn from_str(s: &str) -> Result<Self, Self::Err> {
let result = if s == "all" {
Self::All
} else if let Some(p) = s.strip_suffix('%') {
// try to read percentage
Self::Percentage(p.parse()?)
} else if let Some((n, m)) = s.split_once('/') {
// try to read n/m
Self::IdSubSet((n.parse()?, m.parse()?))
} else {
Self::Size(
ByteSize::from_str(s)
.map_err(CommandErrorKind::FromByteSizeParser)?
.as_u64(),
)
};
Ok(result)
}
}
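
For reference, these are the string forms the parser above accepts (a minimal
sketch; the bare `parse` calls assume `ReadSubsetOption` is in scope as in
this module):

let all: ReadSubsetOption = "all".parse().unwrap();    // read every pack file
let pct: ReadSubsetOption = "10%".parse().unwrap();    // random packs, ~10% of total pack size
let nth: ReadSubsetOption = "2/5".parse().unwrap();    // packs whose id % 5 == 2; runs 1/5 ..= 5/5 cover all packs
let cap: ReadSubsetOption = "500MiB".parse().unwrap(); // random packs totalling at most 500 MiB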

#[cfg_attr(feature = "clap", derive(clap::Parser))]
#[derive(Clone, Copy, Debug, Default, Setters)]
#[setters(into)]
@@ -34,9 +125,13 @@ pub struct CheckOptions {
#[cfg_attr(feature = "clap", clap(long, conflicts_with = "no_cache"))]
pub trust_cache: bool,

/// Read all data blobs
/// Also read and check pack files
#[cfg_attr(feature = "clap", clap(long))]
pub read_data: bool,

/// Read and check pack files
#[cfg_attr(feature = "clap", clap(long, default_value = "all"))]
pub read_data_subset: ReadSubsetOption,
}

impl CheckOptions {
@@ -54,7 +149,11 @@ impl CheckOptions {
/// # Errors
///
/// If the repository is corrupted
pub(crate) fn run<P: ProgressBars, S: Open>(self, repo: &Repository<P, S>) -> RusticResult<()> {
pub(crate) fn run<P: ProgressBars, S: Open>(
self,
repo: &Repository<P, S>,
trees: Vec<TreeId>,
) -> RusticResult<()> {
let be = repo.dbe();
let cache = repo.cache();
let hot_be = &repo.be_hot;
@@ -105,37 +204,32 @@ impl CheckOptions {
}
}

let total_pack_size: u64 = index_collector
.data_packs()
.iter()
.map(|(_, size)| u64::from(*size))
.sum::<u64>()
+ index_collector
.tree_packs()
.iter()
.map(|(_, size)| u64::from(*size))
.sum::<u64>();

let index_be = GlobalIndex::new_from_index(index_collector.into_index());

check_snapshots(be, &index_be, pb)?;
let packs = check_trees(be, &index_be, trees, pb)?;

if self.read_data {
let packs = index_be
.into_index()
.into_iter()
.filter(|p| packs.contains(&p.id));

let packs = self.read_data_subset.apply(packs);

repo.warm_up_wait(packs.iter().map(|pack| pack.id))?;

let total_pack_size = packs.iter().map(|pack| u64::from(pack.pack_size())).sum();
let p = pb.progress_bytes("reading pack data...");
p.set_length(total_pack_size);

index_be
.into_index()
.into_iter()
.par_bridge()
.for_each(|pack| {
let id = pack.id;
let data = be.read_full(FileType::Pack, &id).unwrap();
match check_pack(be, pack, data, &p) {
Ok(()) => {}
Err(err) => error!("Error reading pack {id} : {err}",),
}
});
packs.into_par_iter().for_each(|pack| {
let id = pack.id;
let data = be.read_full(FileType::Pack, &id).unwrap();
match check_pack(be, pack, data, &p) {
Ok(()) => {}
Err(err) => error!("Error reading pack {id} : {err}",),
}
});
p.finish();
}
Ok(())
@@ -375,19 +469,13 @@ fn check_packs_list(be: &impl ReadBackend, mut packs: HashMap<PackId, u32>) -> R
/// # Errors
///
/// If a snapshot or tree is missing or has a different size
fn check_snapshots(
fn check_trees(
be: &impl DecryptReadBackend,
index: &impl ReadGlobalIndex,
snap_trees: Vec<TreeId>,
pb: &impl ProgressBars,
) -> RusticResult<()> {
let p = pb.progress_counter("reading snapshots...");
let snap_trees: Vec<_> = be
.stream_all::<SnapshotFile>(&p)?
.iter()
.map_ok(|(_, snap)| snap.tree)
.try_collect()?;
p.finish();

) -> RusticResult<BTreeSet<PackId>> {
let mut packs = BTreeSet::new();
let p = pb.progress_counter("checking trees...");
let mut tree_streamer = TreeStreamerOnce::new(be, index, snap_trees, p)?;
while let Some(item) = tree_streamer.next().transpose()? {
@@ -404,12 +492,17 @@ fn check_snapshots(
error!("file {:?} blob {} has null ID", path.join(node.name()), i);
}

if !index.has_data(id) {
error!(
"file {:?} blob {} is missing in index",
path.join(node.name()),
id
);
match index.get_data(id) {
None => {
error!(
"file {:?} blob {} is missing in index",
path.join(node.name()),
id
);
}
Some(entry) => {
_ = packs.insert(entry.pack);
}
}
}
},
@@ -423,7 +516,18 @@ fn check_snapshots(
Some(tree) if tree.is_null() => {
error!("dir {:?} subtree has null ID", path.join(node.name()));
}
_ => {} // subtree is ok
Some(id) => match index.get_tree(&id) {
None => {
error!(
"dir {:?} subtree blob {} is missing in index",
path.join(node.name()),
id
);
}
Some(entry) => {
_ = packs.insert(entry.pack);
}
}, // subtree is ok
}
}

@@ -432,7 +536,7 @@ fn check_snapshots(
}
}

Ok(())
Ok(packs)
}

/// Check if a pack is valid
@@ -519,3 +623,81 @@ fn check_pack(

Ok(())
}

#[cfg(test)]
mod tests {
use super::*;
use insta::assert_ron_snapshot;
use rand::{rngs::StdRng, Rng, SeedableRng};
use rstest::{fixture, rstest};

const PACK_SIZE: u32 = 100_000_000;

#[fixture]
fn rng() -> StdRng {
StdRng::seed_from_u64(5)
}
fn test_packs(rng: &mut impl Rng) -> Vec<IndexPack> {
(0..500)
.map(|_| IndexPack {
id: PackId::from(Id::random_from_rng(rng)),
blobs: Vec::new(),
time: None,
size: Some(rng.gen_range(0..PACK_SIZE)),
})
.collect()
}

#[rstest]
#[case("all")]
#[case("5/12")]
#[case("5%")]
#[case("250MiB")]
fn test_read_subset(mut rng: StdRng, #[case] s: &str) {
let size =
|packs: &[IndexPack]| -> u64 { packs.iter().map(|p| u64::from(p.pack_size())).sum() };

let test_packs = test_packs(&mut rng);
let total_size = size(&test_packs);

let subset: ReadSubsetOption = s.parse().unwrap();
let packs = subset.apply_with_rng(test_packs, &mut rng);
let test_size = size(&packs);

match subset {
ReadSubsetOption::All => assert_eq!(test_size, total_size),
#[allow(clippy::cast_possible_truncation)]
#[allow(clippy::cast_precision_loss)]
#[allow(clippy::cast_sign_loss)]
ReadSubsetOption::Percentage(s) => assert!(test_size <= (total_size as f64 * s) as u64),
ReadSubsetOption::Size(size) => {
assert!(test_size <= size && size <= test_size + u64::from(PACK_SIZE));
}
ReadSubsetOption::IdSubSet(_) => {}
};

let ids: Vec<_> = packs.iter().map(|pack| (pack.id, pack.size)).collect();
assert_ron_snapshot!(s, ids);
}

#[test]
fn test_read_subset_n_m() {
let test_packs = test_packs(&mut thread_rng());
let mut all_packs: BTreeSet<_> = test_packs.iter().map(|pack| pack.id).collect();

let mut run_with = |s: &str| {
let subset: ReadSubsetOption = s.parse().unwrap();
let packs = subset.apply(test_packs.clone());
for pack in packs {
assert!(all_packs.remove(&pack.id));
}
};

run_with("1/5");
run_with("2/5");
run_with("3/5");
run_with("4/5");
run_with("5/5");

assert!(all_packs.is_empty());
}
}
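
As a usage note, the doc comment on `IdSubSet` suggests spreading a full data
check over separate runs; a hypothetical sketch of that pattern (the loop and
option wiring here are illustrative assumptions, not code from this diff):

// Hypothetical: verify all pack data across five separate runs,
// each covering a disjoint fifth of the packs.
for n in 1..=5 {
    let opts = CheckOptions::default()
        .read_data(true)
        .read_data_subset(format!("{n}/5").parse::<ReadSubsetOption>().unwrap());
    // ... run the check with `opts`, as in `check_with_trees` above
}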
@@ -0,0 +1,18 @@
---
source: crates/core/src/commands/check.rs
expression: ids
---
[
(Id("50ec36b55a9b35de779a2757571d2e4b16f6d1ca3ff73ade120901262c2b265d"), Some(25543042)),
(Id("d07f392b1dfebfdf50bb10a4a7857b543cbe246f84549af8902dfde9737fd425"), Some(16489215)),
(Id("f879520cb3e653a0c77dcc37dcd813a71ec7d972a4a6185b13f2ed240c6e72e1"), Some(3342498)),
(Id("8ba2cc8cf44054f35be413f12ba83969deb01f1c44660822cee1b960d69a7526"), Some(46887261)),
(Id("e6bae2c9c6b9d8b8a72b45590ffc6c8e034083d6a8180877f6d270537a1ac214"), Some(47685315)),
(Id("f77e206f69693ae3490de38ce00f5e89ae7db4808b770c60e07d64815ee0478d"), Some(9217773)),
(Id("20b51c8c49aff07d7063c76a863cbdcea845989ef79d4a3f8ff599687eaebe48"), Some(83677501)),
(Id("9630b7b1e6329e7c28eb0eeb4e0df36bbf45acf3ba5de4a0403b77e47216857a"), Some(24144078)),
(Id("33e053041d2de235e03cc219a8b8300d8f1e35ee034c45f4613ea782d5e672f2"), Some(2254122)),
(Id("7583c1099bf604771a03af7627f4122a59da07db7358484b8543e881a7939b3f"), Some(68829)),
(Id("a94f61701a165181c6940584ca0cd2c2355e5e1eb65a3a295fc4d1c02fa81138"), Some(2347337)),
(Id("5ae22a813d32049b56ac2760a5a34b8f66e30b5232f66bb8eae420c7022197e8"), Some(452910)),
]
