Skip to content

Commit

Permalink
refactor parallel execution, add parallel walker tests
Browse files Browse the repository at this point in the history
  • Loading branch information
chrissimpkins committed Dec 17, 2023
1 parent 2bd40ef commit 722fca2
Show file tree
Hide file tree
Showing 3 changed files with 203 additions and 158 deletions.
58 changes: 58 additions & 0 deletions src/lib/stdstreams.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,11 @@
use std::io::Write;
use std::path::{Path, MAIN_SEPARATOR_STR};
use std::sync::OnceLock;

use crate::args::Args;
use colored::*;

static COLORED_SEPARATOR_STR: OnceLock<ColoredString> = OnceLock::new();

#[inline(always)]
pub fn write_stdout<T, U>(filesize: T, filepath: U) -> Result<(), std::io::Error>
Expand All @@ -9,3 +16,54 @@ where
writeln!(std::io::stdout(), "{}\t{}", filesize, filepath)?;
Ok(())
}

/// Formats and writes one `size<TAB>path` record to standard output.
///
/// The size column is chosen as follows:
/// - `args.metric_units`: `metric_size_formatter(*filesize)`, right-aligned
///   to 9 columns;
/// - otherwise `args.binary_units`: `binary_size_formatter(*filesize)`,
///   right-aligned to 10 columns;
/// - otherwise the raw `filesize` value.
///
/// When `args.color` is set, the parent directory portion of `filepath` and
/// the separator joining it to the file name are colored blue, and the file
/// name itself is left uncolored. The separator is only inserted when the
/// parent is non-empty and does not already end in a path separator, so
/// `foo.txt` is not rendered as `/foo.txt` and `/foo.txt` is not rendered as
/// `//foo.txt`.
///
/// # Errors
///
/// Returns the `std::io::Error` produced by `write_stdout`.
#[inline(always)]
pub fn format_print_file(
    args: &Args,
    filesize: &u64,
    filepath: &Path,
    metric_size_formatter: impl Fn(u64) -> String,
    binary_size_formatter: impl Fn(u64) -> String,
) -> Result<(), std::io::Error> {
    if args.color {
        let fmt_filepath = match (filepath.parent(), filepath.file_name()) {
            (Some(ppath), Some(fpath)) => {
                let parent = ppath.to_string_lossy();
                let name = fpath.to_string_lossy();
                if parent.is_empty() {
                    // Bare relative file name: there is no directory part to
                    // color and no separator to add.
                    name.into_owned()
                } else if parent.ends_with(std::path::is_separator) {
                    // Parent is a root such as `/`: it already carries the
                    // separator, so adding another would double it.
                    format!("{}{}", parent.blue(), name)
                } else {
                    format!(
                        "{}{}{}",
                        parent.blue(),
                        COLORED_SEPARATOR_STR.get_or_init(|| MAIN_SEPARATOR_STR.blue()),
                        name
                    )
                }
            }
            // No final file-name component (e.g. the path ends in `..`):
            // the whole path is directory-like, so color all of it instead
            // of dropping the last component.
            (Some(_), None) => filepath.to_string_lossy().blue().to_string(),
            // No parent at all (root or empty path): print the path as-is
            // rather than an empty string.
            (None, _) => filepath.display().to_string(),
        };

        if args.metric_units {
            write_stdout(
                format!("{:>9}", metric_size_formatter(*filesize)),
                &fmt_filepath,
            )
        } else if args.binary_units {
            write_stdout(
                format!("{:>10}", binary_size_formatter(*filesize)),
                &fmt_filepath,
            )
        } else {
            write_stdout(filesize, &fmt_filepath)
        }
    } else if args.metric_units {
        write_stdout(
            format!("{:>9}", metric_size_formatter(*filesize)),
            filepath.display(),
        )
    } else if args.binary_units {
        write_stdout(
            format!("{:>10}", binary_size_formatter(*filesize)),
            filepath.display(),
        )
    } else {
        write_stdout(filesize, filepath.display())
    }
}
193 changes: 137 additions & 56 deletions src/lib/walk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use anyhow::{Error, Result};
use ignore::{overrides::OverrideBuilder, WalkBuilder};

use crate::args::Args;
use crate::stdstreams::format_print_file;

pub struct Walker {
walker: ignore::Walk,
Expand Down Expand Up @@ -77,7 +78,7 @@ impl Iterator for FileWalker {
#[inline(always)]
fn next(&mut self) -> Option<Self::Item> {
match self.walker.by_ref().find(|entry| match entry {
// filter on file paths only, exclude directory paths
// filter on file paths only, exclude all directory paths
Ok(entry) => entry.path().is_file(),
Err(_) => false,
}) {
Expand Down Expand Up @@ -125,17 +126,77 @@ impl ParallelWalker {
walker: walker.build_parallel(),
})
}

/// Walks the file tree in parallel and prints one `size<TAB>path` record per
/// regular file via `format_print_file`.
///
/// Directory entries are skipped. Errors are not returned to the caller:
/// they are reported on standard error and stop the walk, and this method
/// then returns `Ok(())`.
///
/// - An entry that cannot be read, or whose metadata cannot be read, is
///   reported and the walk is stopped.
/// - A failed write to standard output stops the walk. A broken pipe (the
///   reader went away, e.g. `| head`) stops it silently, because every later
///   write would fail the same way; any other write error is reported first.
///
/// The formatter closures are shared by reference across the walker's
/// worker threads, hence the `Send + Sync` bounds.
pub fn print_files(
    self,
    args: &Args,
    metric_size_formatter: impl Fn(u64) -> String + Send + std::marker::Sync,
    binary_size_formatter: impl Fn(u64) -> String + Send + std::marker::Sync,
) -> Result<()> {
    self.walker.run(|| {
        Box::new(|entry| {
            let entry = match entry {
                Ok(entry) => entry,
                Err(e) => {
                    eprintln!("Error reading entry: {}", e);
                    return ignore::WalkState::Quit;
                }
            };

            // filter on file paths only, exclude all directory paths
            if !entry.path().is_file() {
                return ignore::WalkState::Continue;
            }

            let metadata = match entry.metadata() {
                Ok(metadata) => metadata,
                Err(e) => {
                    eprintln!("Error reading metadata: {}", e);
                    return ignore::WalkState::Quit;
                }
            };

            match format_print_file(
                args,
                &metadata.len(),
                entry.path(),
                &metric_size_formatter,
                &binary_size_formatter,
            ) {
                Ok(()) => ignore::WalkState::Continue,
                Err(err) => {
                    // A broken pipe is an expected way for the consumer to
                    // end the stream, so it is not reported; either way
                    // there is no point in walking further.
                    if err.kind() != std::io::ErrorKind::BrokenPipe {
                        eprintln!("Error printing to standard output: {}", err);
                    }
                    ignore::WalkState::Quit
                }
            }
        })
    });
    Ok(())
}
}

#[cfg(test)]
mod tests {
use super::*;
// use ignore::{DirEntry, WalkParallel, WalkState};
use ignore::{DirEntry, WalkParallel, WalkState};
use pretty_assertions::assert_eq;
use std::fs::File;
use std::io::Write;
use std::path::Path;
// use std::sync::{Arc, Mutex};
use std::sync::{Arc, Mutex};
use tempfile::TempDir;

fn mk_args(
Expand Down Expand Up @@ -190,23 +251,6 @@ mod tests {
paths
}

// fn walk_collect(prefix: &Path, args: &Args) -> Result<Vec<String>> {
// let mut paths = vec![];
// for result in Walker::new(args)? {
// let dirent = match result {
// Err(_) => continue,
// Ok(dirent) => dirent,
// };
// let path = dirent.path().strip_prefix(prefix).unwrap();
// if path.as_os_str().is_empty() {
// continue;
// }
// paths.push(normalize_path(path.to_str().unwrap()));
// }

// Ok(paths)
// }

fn walk_file_collect(prefix: &Path, args: &Args) -> Result<Vec<String>> {
let mut paths = vec![];
for result in FileWalker::new(args)? {
Expand Down Expand Up @@ -241,38 +285,38 @@ mod tests {
Ok(paths)
}

// fn walk_collect_parallel(prefix: &Path, args: &Args) -> Result<Vec<String>> {
// let mut paths = vec![];
// for dirent in walk_collect_entries_parallel(ParallelWalker::new(args)?.walker) {
// let path = dirent.path().strip_prefix(prefix).unwrap();
// if path.as_os_str().is_empty() {
// continue;
// }
// paths.push(normalize_path(path.to_str().unwrap()));
// }
// // sort the paths before returning in order
// // in order to be able to test. This represents
// // an artificial order in the results, but we will
// // still be able to confirm we have complete results
// paths.sort();
// Ok(paths)
// }
/// Runs a `ParallelWalker` built from `args` and returns the visited paths
/// relative to `prefix`, each passed through `normalize_path`.
///
/// The entry whose relative path is empty (the prefix itself) is dropped.
/// The parallel walker yields entries in no fixed order, so the result is
/// sorted to give tests a deterministic sequence to compare against while
/// still confirming that the set of results is complete.
fn walk_collect_parallel(prefix: &Path, args: &Args) -> Result<Vec<String>> {
    let entries = walk_collect_entries_parallel(ParallelWalker::new(args)?.walker);
    let mut relative_paths: Vec<String> = entries
        .iter()
        .map(|dirent| dirent.path().strip_prefix(prefix).unwrap())
        .filter(|relative| !relative.as_os_str().is_empty())
        .map(|relative| normalize_path(relative.to_str().unwrap()))
        .collect();
    relative_paths.sort();
    Ok(relative_paths)
}

// fn walk_collect_entries_parallel(par_walker: WalkParallel) -> Vec<DirEntry> {
// let dirents = Arc::new(Mutex::new(vec![]));
// par_walker.run(|| {
// let dirents = dirents.clone();
// Box::new(move |result| {
// if let Ok(dirent) = result {
// dirents.lock().unwrap().push(dirent);
// }
// WalkState::Continue
// })
// });

// let dirents = dirents.lock().unwrap();
// dirents.to_vec()
// }
fn walk_collect_entries_parallel(par_walker: WalkParallel) -> Vec<DirEntry> {
let dirents = Arc::new(Mutex::new(vec![]));
par_walker.run(|| {
let dirents = dirents.clone();
Box::new(move |result| {
if let Ok(dirent) = result {
dirents.lock().unwrap().push(dirent);
}
WalkState::Continue
})
});

let dirents = dirents.lock().unwrap();
dirents.to_vec()
}

// fn assert_paths_sequential(prefix: &Path, args: &Args, expected: &[&str]) -> Result<()> {
// let got = walk_collect(prefix, args)?;
Expand All @@ -296,11 +340,11 @@ mod tests {
Ok(())
}

// fn assert_paths_parallel(prefix: &Path, args: &Args, expected: &[&str]) -> Result<()> {
// let got = walk_collect_parallel(prefix, args)?;
// assert_eq!(got, mkpaths(expected), "parallel");
// Ok(())
// }
/// Asserts that the sorted output of the parallel walker for `args`,
/// relative to `prefix`, equals `mkpaths(expected)`.
///
/// A failure to build the walker is propagated as an error; a mismatch
/// panics with the label "parallel".
fn assert_paths_parallel_sorted(prefix: &Path, args: &Args, expected: &[&str]) -> Result<()> {
    let actual = walk_collect_parallel(prefix, args)?;
    let wanted = mkpaths(expected);
    assert_eq!(actual, wanted, "parallel");
    Ok(())
}

// ==================
// Default execution
Expand Down Expand Up @@ -334,6 +378,24 @@ mod tests {
],
)?;

assert_paths_parallel_sorted(
td.path(),
&args,
&[
"a",
"a/b",
"a/b/ack.js",
"a/b/c",
"a/b/foo.txt",
"a/b/zip.py",
"a/b/zoo.py",
"a/b/zoo.txt",
"y",
"y/z",
"y/z/foo.md",
],
)?;

Ok(())
}

Expand Down Expand Up @@ -406,6 +468,25 @@ mod tests {
],
)?;

assert_paths_parallel_sorted(
td.path(),
&args,
&[
"a",
"a/b",
"a/b/.hide.txt", // here is the hidden file
"a/b/ack.js",
"a/b/c",
"a/b/foo.txt",
"a/b/zip.py",
"a/b/zoo.py",
"a/b/zoo.txt",
"y",
"y/z",
"y/z/foo.md",
],
)?;

Ok(())
}

Expand Down
Loading

0 comments on commit 722fca2

Please sign in to comment.