From 0d31a3d65ebca3692271a518a1d12baecbeeed6d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miroslav=20Bajto=C5=A1?= Date: Tue, 2 May 2023 16:51:39 +0200 Subject: [PATCH 1/7] zinniad: persist "jobs completed" across restarts MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Miroslav Bajtoš --- Cargo.lock | 73 +++++++++++++++++++++++---- daemon/Cargo.toml | 2 + daemon/main.rs | 6 +++ daemon/state.rs | 99 +++++++++++++++++++++++++++++++++++++ daemon/station_reporter.rs | 59 ++++++++++++++++++++-- runtime/console_reporter.rs | 20 +++++--- 6 files changed, 238 insertions(+), 21 deletions(-) create mode 100644 daemon/state.rs diff --git a/Cargo.lock b/Cargo.lock index 80bd3891..037bb445 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -436,6 +436,17 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "debc29dde2e69f9e47506b525f639ed42300fc014a3e007832592448fa8e4599" +[[package]] +name = "atomicwrites" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c1163d9d7c51de51a2b79d6df5e8888d11e9df17c752ce4a285fb6ca1580734e" +dependencies = [ + "rustix 0.37.7", + "tempfile", + "windows-sys 0.48.0", +] + [[package]] name = "atty" version = "0.2.14" @@ -1507,6 +1518,17 @@ dependencies = [ "winapi", ] +[[package]] +name = "errno" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4bcfec3a70f97c962c307b2d2c56e358cf1d00b558d74262b5f929ee8cc7e73a" +dependencies = [ + "errno-dragonfly", + "libc", + "windows-sys 0.48.0", +] + [[package]] name = "errno-dragonfly" version = "0.1.2" @@ -2181,7 +2203,7 @@ checksum = "8687c819457e979cc940d09cb16e42a1bf70aa6b60a549de6d3a62a0ee90c69e" dependencies = [ "hermit-abi 0.3.1", "io-lifetimes", - "rustix", + "rustix 0.36.10", "windows-sys 0.45.0", ] @@ -2704,6 +2726,12 @@ version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f051f77a7c8e6957c0696eac88f26b0117e54f52d3fc682ab19397a8812846a4" +[[package]] +name = "linux-raw-sys" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b64f40e5e03e0d54f03845c8197d0291253cdbedfb1cb46b13c2c117554a9f4c" + [[package]] name = "lock_api" version = "0.4.9" @@ -3165,7 +3193,7 @@ dependencies = [ "cfg-if", "instant", "libc", - "redox_syscall", + "redox_syscall 0.2.16", "smallvec", "winapi", ] @@ -3178,7 +3206,7 @@ checksum = "9069cbb9f99e3a5083476ccb29ceb1de18b9118cafa53e90c9551235de2b9521" dependencies = [ "cfg-if", "libc", - "redox_syscall", + "redox_syscall 0.2.16", "smallvec", "windows-sys 0.45.0", ] @@ -3630,6 +3658,15 @@ dependencies = [ "bitflags", ] +[[package]] +name = "redox_syscall" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "567664f262709473930a4bf9e51bf2ebf3348f2e748ccc50dea20646858f8f29" +dependencies = [ + "bitflags", +] + [[package]] name = "regex" version = "1.8.1" @@ -3833,10 +3870,24 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2fe885c3a125aa45213b68cc1472a49880cb5923dc23f522ad2791b882228778" dependencies = [ "bitflags", - "errno", + "errno 0.2.8", + "io-lifetimes", + "libc", + "linux-raw-sys 0.1.4", + "windows-sys 0.45.0", +] + +[[package]] +name = "rustix" +version = "0.37.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2aae838e49b3d63e9274e1c01833cc8139d3fec468c3b84688c628f44b1ae11d" +dependencies = [ + "bitflags", + "errno 0.3.1", "io-lifetimes", "libc", - "linux-raw-sys", + "linux-raw-sys 0.3.6", "windows-sys 0.45.0", ] @@ -4354,15 +4405,15 @@ dependencies = [ [[package]] name = "tempfile" -version = "3.4.0" +version = "3.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "af18f7ae1acd354b992402e9ec5864359d693cd8a79dcbef59f76891701c1e95" +checksum = "b9fbec84f381d5795b08656e4912bec604d162bff9291d6189a78f4c8ab87998" dependencies = [ "cfg-if", "fastrand", - "redox_syscall", - "rustix", - "windows-sys 0.42.0", + "redox_syscall 0.3.5", + "rustix 0.37.7", + "windows-sys 0.45.0", ] [[package]] @@ -5693,12 +5744,14 @@ dependencies = [ name = "zinniad" version = "0.7.0" dependencies = [ + "atomicwrites", "clap", "env_logger", "log", "pretty_assertions", "serde", "serde_json", + "tempfile", "tokio", "zinnia_runtime", ] diff --git a/daemon/Cargo.toml b/daemon/Cargo.toml index 49fc0fb3..70bcb695 100644 --- a/daemon/Cargo.toml +++ b/daemon/Cargo.toml @@ -14,6 +14,7 @@ path = "main.rs" doc = false [dependencies] +atomicwrites = "0.4.1" clap = { version = "4.2.5", features = ["derive", "env"] } env_logger.workspace = true log.workspace = true @@ -24,3 +25,4 @@ zinnia_runtime = { workspace = true } [dev-dependencies] pretty_assertions = { workspace = true } +tempfile = "3.5.0" diff --git a/daemon/main.rs b/daemon/main.rs index d57ccce7..13e3d549 100644 --- a/daemon/main.rs +++ b/daemon/main.rs @@ -1,6 +1,8 @@ mod args; +mod state; mod station_reporter; +use std::path::PathBuf; use std::rc::Rc; use std::time::Duration; @@ -36,6 +38,9 @@ async fn run(config: CliArgs) -> Result<()> { )); } + let state_file = PathBuf::from(config.state_root).join("state.json"); + log::debug!("Using state file: {}", state_file.display()); + log_info_activity("Module Runtime started."); let file = &config.files[0]; @@ -56,6 +61,7 @@ async fn run(config: CliArgs) -> Result<()> { ), wallet_address: config.wallet_address, reporter: Rc::new(StationReporter::new( + state_file, Duration::from_millis(200), module_name.into(), )), diff --git a/daemon/state.rs b/daemon/state.rs new file mode 100644 index 00000000..be736505 --- /dev/null +++ b/daemon/state.rs @@ -0,0 +1,99 @@ +use atomicwrites::{AtomicFile, OverwriteBehavior}; +use serde::{Deserialize, Serialize}; +use std::io::Write; +use std::path::Path; + +#[derive(Serialize, Deserialize, Debug)] +pub struct State { + pub total_jobs_completed: u64, +} + +impl Default for State { + fn default() -> Self { + Self { + total_jobs_completed: 0, + } + } +} + +pub fn load(state_file: &Path) -> State { + log::debug!("Loading initial state from {}", state_file.display()); + match std::fs::read_to_string(state_file) { + Err(err) => { + if err.kind() != std::io::ErrorKind::NotFound { + log::warn!( + "Cannot read initial state from {}: {}", + state_file.display(), + err + ); + } + return State::default(); + } + Ok(data) => match serde_json::from_str::(&data) { + Err(err) => { + log::warn!( + "Cannot parse initial state from {}: {}", + state_file.display(), + err + ); + return State::default(); + } + Ok(state) => { + log::debug!("Initial state: {:?}", state); + return state; + } + }, + } +} + +pub fn store(state_file: &Path, state: &State) { + if let Some(parent) = state_file.parent() { + if let Err(err) = std::fs::create_dir_all(&parent) { + log::warn!( + "Cannot create state directory {}: {}", + parent.display(), + err + ); + return; + } + } + + let payload = match serde_json::to_string_pretty(&state) { + Err(err) => { + log::warn!("Cannot serialize state: {}", err); + return; + } + + Ok(payload) => payload, + }; + + let write_result = AtomicFile::new(state_file, OverwriteBehavior::AllowOverwrite) + .write(|f| f.write_all(payload.as_bytes())); + match write_result { + Err(err) => log::warn!("Cannot write state to {}: {}", state_file.display(), err), + Ok(()) => log::debug!("State stored in {}", state_file.display()), + } +} + +#[cfg(test)] +mod tests { + use super::*; + use pretty_assertions::assert_eq; + use tempfile; + use zinnia_runtime::anyhow::Result; + + #[test] + fn creates_missing_directories() -> Result<()> { + let state_dir = tempfile::tempdir()?; + let state_file = state_dir.path().join("subdir").join("state.json"); + store( + &state_file, + &State { + total_jobs_completed: 1, + }, + ); + let loaded = load(&state_file); + assert_eq!(loaded.total_jobs_completed, 1); + Ok(()) + } +} diff --git a/daemon/station_reporter.rs b/daemon/station_reporter.rs index 3761e433..14624152 100644 --- a/daemon/station_reporter.rs +++ b/daemon/station_reporter.rs @@ -1,28 +1,38 @@ use std::cell::RefCell; use std::io::{stderr, stdout, Write}; +use std::path::PathBuf; use std::time::Duration; use serde_json::{json, Map}; use zinnia_runtime::anyhow::Result; use zinnia_runtime::{JobCompletionTracker, LogLevel, Reporter}; +use crate::state::{self, State}; + /// StationReporter reports activities to stdout as ND-JSON stream and all Console logs to stderr pub struct StationReporter { tracker: RefCell, module_name: String, log_target: String, + state_file: PathBuf, } impl StationReporter { /// Create a new instance. /// /// `job_report_delay` specifies how often the information about new jobs is printed. - pub fn new(job_report_delay: Duration, module_name: String) -> Self { + pub fn new(state_file: PathBuf, job_report_delay: Duration, module_name: String) -> Self { let log_target = format!("module:{module_name}"); + let initial_job_count = state::load(&state_file).total_jobs_completed; + Self { - tracker: RefCell::new(JobCompletionTracker::new(job_report_delay)), + tracker: RefCell::new(JobCompletionTracker::new( + initial_job_count, + job_report_delay, + )), module_name, log_target, + state_file, } } @@ -36,7 +46,7 @@ impl StationReporter { let event = json!({ "type": "jobs-completed", "total": total, - "modules": modules, + "modules": modules, }); let _ = print_event(&event); @@ -110,8 +120,49 @@ impl Reporter for StationReporter { } fn job_completed(&self) { - self.tracker + let total_jobs_completed = self + .tracker .borrow_mut() .job_completed(|n| self.print_jobs_completed(n)); + + let state = State { + total_jobs_completed, + }; + state::store(&self.state_file, &state); + } +} + +#[cfg(test)] +mod tests { + use super::*; + use pretty_assertions::assert_eq; + use tempfile; + + const NO_DELAY: Duration = Duration::from_millis(0); + + #[test] + fn persists_job_counter() -> Result<()> { + let state_dir = tempfile::tempdir()?; + let state_file = state_dir.path().join("state.json"); + let reporter = StationReporter::new(state_file.clone(), NO_DELAY, "test".into()); + assert_eq!(reporter.tracker.borrow().counter(), 0, "initial count"); + + reporter.job_completed(); + println!("{:?}", reporter.tracker); + assert_eq!( + reporter.tracker.borrow().counter(), + 1, + "count after a job was completed" + ); + + let reporter = StationReporter::new(state_file, NO_DELAY, "test".into()); + println!("{:?}", reporter.tracker); + assert_eq!( + reporter.tracker.borrow().counter(), + 1, + "count after creating a new reporter" + ); + + Ok(()) } } diff --git a/runtime/console_reporter.rs b/runtime/console_reporter.rs index b5caf1ad..452128b4 100644 --- a/runtime/console_reporter.rs +++ b/runtime/console_reporter.rs @@ -16,25 +16,31 @@ pub struct JobCompletionTracker { } impl JobCompletionTracker { - pub fn new(delay: Duration) -> Self { + pub fn new(initial_value: u64, delay: Duration) -> Self { Self { delay, - counter: 0, + counter: initial_value, last_report: None, } } - pub fn job_completed(&mut self, log: F) { + pub fn counter(&self) -> u64 { + self.counter + } + + pub fn job_completed(&mut self, log: F) -> u64 { self.counter += 1; if let Some(last) = self.last_report { if last.0.elapsed() < self.delay { - return; + return self.counter; } } self.last_report.replace((Instant::now(), self.counter)); log(self.counter); + + self.counter } pub fn flush(&mut self, log: F) { @@ -63,7 +69,7 @@ impl ConsoleReporter { /// `job_report_delay` specifies how often the information about new jobs is printed. pub fn new(job_report_delay: Duration) -> Self { Self { - tracker: RefCell::new(JobCompletionTracker::new(job_report_delay)), + tracker: RefCell::new(JobCompletionTracker::new(0, job_report_delay)), } } @@ -145,7 +151,7 @@ mod tests { impl Default for JobCompletionTracker { fn default() -> Self { - Self::new(Duration::from_millis(1000)) + Self::new(0, Duration::from_millis(1000)) } } @@ -169,7 +175,7 @@ mod tests { #[test] fn tracker_prints_new_jobs_after_delay() { let mut reported = 0; - let mut tracker = JobCompletionTracker::new(Duration::from_millis(1)); + let mut tracker = JobCompletionTracker::new(0, Duration::from_millis(1)); tracker.job_completed(|x| reported = x); std::thread::sleep(Duration::from_millis(2)); tracker.job_completed(|x| reported = x); From b0ff2ff17bfaed2b78290d9aa9846320bd653e8e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miroslav=20Bajto=C5=A1?= Date: Tue, 2 May 2023 17:03:47 +0200 Subject: [PATCH 2/7] refactor: code cleanup in reporters MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Remove repetitive code to ignore print errors and move it into a shared helper fn. Signed-off-by: Miroslav Bajtoš --- daemon/station_reporter.rs | 26 +++++++++---------- runtime/console_reporter.rs | 51 +++++++++++++++++++++---------------- 2 files changed, 41 insertions(+), 36 deletions(-) diff --git a/daemon/station_reporter.rs b/daemon/station_reporter.rs index 14624152..5411e2db 100644 --- a/daemon/station_reporter.rs +++ b/daemon/station_reporter.rs @@ -49,15 +49,17 @@ impl StationReporter { "modules": modules, }); - let _ = print_event(&event); - // ^^^ We are ignoring errors because there isn't much to do in such case + print_event(&event); } } -fn print_event(data: &serde_json::Value) -> Result<()> { - writeln!(stdout(), "{data}")?; - stdout().flush()?; - Ok(()) +fn print_event(data: &serde_json::Value) { + writeln!(stdout(), "{data}") + .and_then(|_| stdout().flush()) + .unwrap_or_else(|err| { + // We are ignoring errors because there isn't much to do in such case + log::debug!("Cannot print event {}: {}", data, err); + }); } pub fn log_info_activity(msg: &str) { @@ -66,8 +68,7 @@ pub fn log_info_activity(msg: &str) { "module": serde_json::Value::Null, "message": msg, }); - let _ = print_event(&event); - // ^^^ We are ignoring errors because there isn't much to do in such case + print_event(&event); } #[allow(unused)] @@ -77,8 +78,7 @@ pub fn log_error_activity(msg: &str) { "module": serde_json::Value::Null, "message": msg, }); - let _ = print_event(&event); - // ^^^ We are ignoring errors because there isn't much to do in such case + print_event(&event); } impl Drop for StationReporter { @@ -105,8 +105,7 @@ impl Reporter for StationReporter { "module": self.module_name, "message": msg, }); - let _ = print_event(&event); - // ^^^ We are ignoring errors because there isn't much to do in such case + print_event(&event); } fn error_activity(&self, msg: &str) { @@ -115,8 +114,7 @@ impl Reporter for StationReporter { "module": self.module_name, "message": msg, }); - let _ = print_event(&event); - // ^^^ We are ignoring errors because there isn't much to do in such case + print_event(&event); } fn job_completed(&self) { diff --git a/runtime/console_reporter.rs b/runtime/console_reporter.rs index 452128b4..4efd3c20 100644 --- a/runtime/console_reporter.rs +++ b/runtime/console_reporter.rs @@ -75,27 +75,36 @@ impl ConsoleReporter { fn print_jobs_completed(&self, total: u64) { let msg = format!("Jobs completed: {total}"); - let _ = self.report("STATS", &msg, Color::Yellow); - // ^^^ We are ignoring errors because there isn't much to do in such case - } - - fn report(&self, scope: &str, msg: &str, color: Color) -> Result<()> { - if use_color() { - let mut spec = ColorSpec::new(); - // spec.set_fg(Some(color)).set_bold(true); - spec.set_fg(Some(color)); - let mut ansi_writer = Ansi::new(stdout()); - ansi_writer.set_color(&spec)?; - print_raw_report(&mut ansi_writer, scope, msg)?; - ansi_writer.reset()?; - } else { - print_raw_report(&mut stdout(), scope, msg)?; - } - stdout().flush()?; - Ok(()) + self.report("STATS", &msg, Color::Yellow); + } + + fn report(&self, scope: &str, msg: &str, color: Color) { + print_report(scope, msg, color).unwrap_or_else(|err| { + // We are ignoring errors because there isn't much to do in such case + log::debug!( + "Cannot report event [scope:{scope} color:{color:?}] {msg:?}: {}", + err + ) + }) } } +fn print_report(scope: &str, msg: &str, color: Color) -> Result<()> { + if use_color() { + let mut spec = ColorSpec::new(); + // spec.set_fg(Some(color)).set_bold(true); + spec.set_fg(Some(color)); + let mut ansi_writer = Ansi::new(stdout()); + ansi_writer.set_color(&spec)?; + print_raw_report(&mut ansi_writer, scope, msg)?; + ansi_writer.reset()?; + } else { + print_raw_report(&mut stdout(), scope, msg)?; + } + stdout().flush()?; + Ok(()) +} + fn print_raw_report(w: &mut W, scope: &str, msg: &str) -> std::io::Result<()> { // Important: activity messages do not include the final newline character writeln!(w, "[{} {scope:>5}] {msg}", now_str()) @@ -124,13 +133,11 @@ impl Reporter for ConsoleReporter { } fn info_activity(&self, msg: &str) { - let _ = self.report("INFO", msg, Color::Green); - // ^^^ We are ignoring errors because there isn't much to do in such case + self.report("INFO", msg, Color::Green); } fn error_activity(&self, msg: &str) { - let _ = self.report("ERROR", msg, Color::Red); - // ^^^ We are ignoring errors because there isn't much to do in such case + self.report("ERROR", msg, Color::Red); } fn job_completed(&self) { From 041ac1554e9ff5b17457fa91e44ddcdcff346dde Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miroslav=20Bajto=C5=A1?= Date: Wed, 3 May 2023 08:38:08 +0200 Subject: [PATCH 3/7] fixup! remove unecessary `return` keywords MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Miroslav Bajtoš --- daemon/state.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/daemon/state.rs b/daemon/state.rs index be736505..e240e35d 100644 --- a/daemon/state.rs +++ b/daemon/state.rs @@ -27,7 +27,7 @@ pub fn load(state_file: &Path) -> State { err ); } - return State::default(); + State::default() } Ok(data) => match serde_json::from_str::(&data) { Err(err) => { @@ -36,11 +36,11 @@ pub fn load(state_file: &Path) -> State { state_file.display(), err ); - return State::default(); + State::default() } Ok(state) => { log::debug!("Initial state: {:?}", state); - return state; + state } }, } From f2564108c2585685408d9a8540bd44f90d2f5f09 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miroslav=20Bajto=C5=A1?= Date: Wed, 3 May 2023 08:52:01 +0200 Subject: [PATCH 4/7] fixup! move state methods to struct impl MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Miroslav Bajtoš --- daemon/state.rs | 107 +++++++++++++++++++------------------ daemon/station_reporter.rs | 8 +-- 2 files changed, 58 insertions(+), 57 deletions(-) diff --git a/daemon/state.rs b/daemon/state.rs index e240e35d..9f541896 100644 --- a/daemon/state.rs +++ b/daemon/state.rs @@ -16,62 +16,65 @@ impl Default for State { } } -pub fn load(state_file: &Path) -> State { - log::debug!("Loading initial state from {}", state_file.display()); - match std::fs::read_to_string(state_file) { - Err(err) => { - if err.kind() != std::io::ErrorKind::NotFound { - log::warn!( - "Cannot read initial state from {}: {}", - state_file.display(), - err - ); +impl State { + pub fn load(state_file: &Path) -> Self { + log::debug!("Loading initial state from {}", state_file.display()); + match std::fs::read_to_string(state_file) { + Err(err) => { + if err.kind() != std::io::ErrorKind::NotFound { + log::warn!( + "Cannot read initial state from {}: {}", + state_file.display(), + err + ); + } + State::default() } - State::default() + Ok(data) => match serde_json::from_str::(&data) { + Err(err) => { + log::warn!( + "Cannot parse initial state from {}: {}", + state_file.display(), + err + ); + State::default() + } + Ok(state) => { + log::debug!("Initial state: {:?}", state); + state + } + }, } - Ok(data) => match serde_json::from_str::(&data) { - Err(err) => { + } + + pub fn store(&self, state_file: &Path) { + if let Some(parent) = state_file.parent() { + if let Err(err) = std::fs::create_dir_all(&parent) { log::warn!( - "Cannot parse initial state from {}: {}", - state_file.display(), + "Cannot create state directory {}: {}", + parent.display(), err ); - State::default() - } - Ok(state) => { - log::debug!("Initial state: {:?}", state); - state + return; } - }, - } -} - -pub fn store(state_file: &Path, state: &State) { - if let Some(parent) = state_file.parent() { - if let Err(err) = std::fs::create_dir_all(&parent) { - log::warn!( - "Cannot create state directory {}: {}", - parent.display(), - err - ); - return; } - } - let payload = match serde_json::to_string_pretty(&state) { - Err(err) => { - log::warn!("Cannot serialize state: {}", err); - return; - } + let payload = match serde_json::to_string_pretty(self) { + Err(err) => { + log::warn!("Cannot serialize state: {}", err); + return; + } + + Ok(payload) => payload, + }; - Ok(payload) => payload, - }; + let write_result = AtomicFile::new(state_file, OverwriteBehavior::AllowOverwrite) + .write(|f| f.write_all(payload.as_bytes())); - let write_result = AtomicFile::new(state_file, OverwriteBehavior::AllowOverwrite) - .write(|f| f.write_all(payload.as_bytes())); - match write_result { - Err(err) => log::warn!("Cannot write state to {}: {}", state_file.display(), err), - Ok(()) => log::debug!("State stored in {}", state_file.display()), + match write_result { + Err(err) => log::warn!("Cannot write state to {}: {}", state_file.display(), err), + Ok(()) => log::debug!("State stored in {}", state_file.display()), + } } } @@ -86,13 +89,11 @@ mod tests { fn creates_missing_directories() -> Result<()> { let state_dir = tempfile::tempdir()?; let state_file = state_dir.path().join("subdir").join("state.json"); - store( - &state_file, - &State { - total_jobs_completed: 1, - }, - ); - let loaded = load(&state_file); + let state = State { + total_jobs_completed: 1, + }; + state.store(&state_file); + let loaded = State::load(&state_file); assert_eq!(loaded.total_jobs_completed, 1); Ok(()) } diff --git a/daemon/station_reporter.rs b/daemon/station_reporter.rs index 5411e2db..5d2af51d 100644 --- a/daemon/station_reporter.rs +++ b/daemon/station_reporter.rs @@ -4,10 +4,9 @@ use std::path::PathBuf; use std::time::Duration; use serde_json::{json, Map}; -use zinnia_runtime::anyhow::Result; use zinnia_runtime::{JobCompletionTracker, LogLevel, Reporter}; -use crate::state::{self, State}; +use crate::state::State; /// StationReporter reports activities to stdout as ND-JSON stream and all Console logs to stderr pub struct StationReporter { @@ -23,7 +22,7 @@ impl StationReporter { /// `job_report_delay` specifies how often the information about new jobs is printed. pub fn new(state_file: PathBuf, job_report_delay: Duration, module_name: String) -> Self { let log_target = format!("module:{module_name}"); - let initial_job_count = state::load(&state_file).total_jobs_completed; + let initial_job_count = State::load(&state_file).total_jobs_completed; Self { tracker: RefCell::new(JobCompletionTracker::new( @@ -126,7 +125,7 @@ impl Reporter for StationReporter { let state = State { total_jobs_completed, }; - state::store(&self.state_file, &state); + state.store(&self.state_file); } } @@ -135,6 +134,7 @@ mod tests { use super::*; use pretty_assertions::assert_eq; use tempfile; + use zinnia_runtime::anyhow::Result; const NO_DELAY: Duration = Duration::from_millis(0); From 7e7d49a8e21966f4b89ba56af39c0d27699a18f2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miroslav=20Bajto=C5=A1?= Date: Wed, 3 May 2023 08:54:40 +0200 Subject: [PATCH 5/7] fixup! mkdir only on write errors MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Miroslav Bajtoš --- daemon/state.rs | 31 +++++++++++++++++++------------ 1 file changed, 19 insertions(+), 12 deletions(-) diff --git a/daemon/state.rs b/daemon/state.rs index 9f541896..17912f53 100644 --- a/daemon/state.rs +++ b/daemon/state.rs @@ -48,17 +48,6 @@ impl State { } pub fn store(&self, state_file: &Path) { - if let Some(parent) = state_file.parent() { - if let Err(err) = std::fs::create_dir_all(&parent) { - log::warn!( - "Cannot create state directory {}: {}", - parent.display(), - err - ); - return; - } - } - let payload = match serde_json::to_string_pretty(self) { Err(err) => { log::warn!("Cannot serialize state: {}", err); @@ -68,9 +57,27 @@ impl State { Ok(payload) => payload, }; - let write_result = AtomicFile::new(state_file, OverwriteBehavior::AllowOverwrite) + let mut write_result = AtomicFile::new(state_file, OverwriteBehavior::AllowOverwrite) .write(|f| f.write_all(payload.as_bytes())); + if let Err(atomicwrites::Error::Internal(err)) = &write_result { + if err.kind() == std::io::ErrorKind::NotFound { + if let Some(parent) = state_file.parent() { + if let Err(err) = std::fs::create_dir_all(&parent) { + log::warn!( + "Cannot create state directory {}: {}", + parent.display(), + err + ); + } else { + write_result = + AtomicFile::new(state_file, OverwriteBehavior::AllowOverwrite) + .write(|f| f.write_all(payload.as_bytes())); + } + } + } + } + match write_result { Err(err) => log::warn!("Cannot write state to {}: {}", state_file.display(), err), Ok(()) => log::debug!("State stored in {}", state_file.display()), From 6055e990fbc5ef158b25c6eaa5fd16cfb745b4eb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miroslav=20Bajto=C5=A1?= Date: Wed, 3 May 2023 15:23:09 +0200 Subject: [PATCH 6/7] fixup! rework error handling + fix clippy errors MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Miroslav Bajtoš --- daemon/state.rs | 103 ++++++++++++++++--------------------- daemon/station_reporter.rs | 16 ++++-- 2 files changed, 55 insertions(+), 64 deletions(-) diff --git a/daemon/state.rs b/daemon/state.rs index 17912f53..1b175d18 100644 --- a/daemon/state.rs +++ b/daemon/state.rs @@ -2,60 +2,40 @@ use atomicwrites::{AtomicFile, OverwriteBehavior}; use serde::{Deserialize, Serialize}; use std::io::Write; use std::path::Path; +use zinnia_runtime::anyhow::{self, Context, Result}; -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Default)] pub struct State { pub total_jobs_completed: u64, } -impl Default for State { - fn default() -> Self { - Self { - total_jobs_completed: 0, - } - } -} - impl State { - pub fn load(state_file: &Path) -> Self { + pub fn load(state_file: &Path) -> Result { log::debug!("Loading initial state from {}", state_file.display()); match std::fs::read_to_string(state_file) { - Err(err) => { - if err.kind() != std::io::ErrorKind::NotFound { - log::warn!( - "Cannot read initial state from {}: {}", - state_file.display(), - err - ); - } - State::default() - } - Ok(data) => match serde_json::from_str::(&data) { - Err(err) => { - log::warn!( - "Cannot parse initial state from {}: {}", - state_file.display(), - err - ); - State::default() - } - Ok(state) => { - log::debug!("Initial state: {:?}", state); - state + Err(err) => match err.kind() { + std::io::ErrorKind::NotFound => { + let state = State::default(); + log::debug!("State file not found, returning {state:?}"); + Ok(state) } + _ => Err(anyhow::Error::new(err).context(format!( + "Cannot load initial state from {}", + state_file.display() + ))), }, + Ok(data) => { + let state = serde_json::from_str::(&data).with_context(|| { + format!("Cannot parse initial state from {}", state_file.display()) + })?; + log::debug!("Loaded initial state: {state:?}"); + Ok(state) + } } } - pub fn store(&self, state_file: &Path) { - let payload = match serde_json::to_string_pretty(self) { - Err(err) => { - log::warn!("Cannot serialize state: {}", err); - return; - } - - Ok(payload) => payload, - }; + pub fn store(&self, state_file: &Path) -> Result<()> { + let payload = serde_json::to_string_pretty(self).context("Cannot serialize state")?; let mut write_result = AtomicFile::new(state_file, OverwriteBehavior::AllowOverwrite) .write(|f| f.write_all(payload.as_bytes())); @@ -63,25 +43,19 @@ impl State { if let Err(atomicwrites::Error::Internal(err)) = &write_result { if err.kind() == std::io::ErrorKind::NotFound { if let Some(parent) = state_file.parent() { - if let Err(err) = std::fs::create_dir_all(&parent) { - log::warn!( - "Cannot create state directory {}: {}", - parent.display(), - err - ); - } else { - write_result = - AtomicFile::new(state_file, OverwriteBehavior::AllowOverwrite) - .write(|f| f.write_all(payload.as_bytes())); - } + std::fs::create_dir_all(parent).with_context(|| { + format!("Cannot create state directory {}", parent.display(),) + })?; + write_result = AtomicFile::new(state_file, OverwriteBehavior::AllowOverwrite) + .write(|f| f.write_all(payload.as_bytes())); } } } - match write_result { - Err(err) => log::warn!("Cannot write state to {}: {}", state_file.display(), err), - Ok(()) => log::debug!("State stored in {}", state_file.display()), - } + write_result.with_context(|| format!("Cannot write state to {}", state_file.display()))?; + log::debug!("State stored in {}", state_file.display()); + + Ok(()) } } @@ -89,18 +63,27 @@ impl State { mod tests { use super::*; use pretty_assertions::assert_eq; - use tempfile; + use tempfile::tempdir; use zinnia_runtime::anyhow::Result; + #[test] + fn loads_empty_state() -> Result<()> { + let state_dir = tempdir()?; + let state_file = state_dir.path().join("state.json"); + let loaded = State::load(&state_file)?; + assert_eq!(loaded.total_jobs_completed, 0, "total_jobs_completed"); + Ok(()) + } + #[test] fn creates_missing_directories() -> Result<()> { - let state_dir = tempfile::tempdir()?; + let state_dir = tempdir()?; let state_file = state_dir.path().join("subdir").join("state.json"); let state = State { total_jobs_completed: 1, }; - state.store(&state_file); - let loaded = State::load(&state_file); + state.store(&state_file)?; + let loaded = State::load(&state_file)?; assert_eq!(loaded.total_jobs_completed, 1); Ok(()) } diff --git a/daemon/station_reporter.rs b/daemon/station_reporter.rs index 5d2af51d..69166519 100644 --- a/daemon/station_reporter.rs +++ b/daemon/station_reporter.rs @@ -22,7 +22,11 @@ impl StationReporter { /// `job_report_delay` specifies how often the information about new jobs is printed. pub fn new(state_file: PathBuf, job_report_delay: Duration, module_name: String) -> Self { let log_target = format!("module:{module_name}"); - let initial_job_count = State::load(&state_file).total_jobs_completed; + let initial_job_count = State::load(&state_file) + // NOTE(bajtos) We are intentionally calling unwrap() to crash the process in case + // it's not possible to read the state file or parse the content. + .unwrap() + .total_jobs_completed; Self { tracker: RefCell::new(JobCompletionTracker::new( @@ -125,7 +129,11 @@ impl Reporter for StationReporter { let state = State { total_jobs_completed, }; - state.store(&self.state_file); + state + .store(&self.state_file) + // NOTE(bajtos) We are intentionally calling unwrap() to crash the process in case + // we cannot store the state into the file. + .unwrap(); } } @@ -133,14 +141,14 @@ impl Reporter for StationReporter { mod tests { use super::*; use pretty_assertions::assert_eq; - use tempfile; + use tempfile::tempdir; use zinnia_runtime::anyhow::Result; const NO_DELAY: Duration = Duration::from_millis(0); #[test] fn persists_job_counter() -> Result<()> { - let state_dir = tempfile::tempdir()?; + let state_dir = tempdir()?; let state_file = state_dir.path().join("state.json"); let reporter = StationReporter::new(state_file.clone(), NO_DELAY, "test".into()); assert_eq!(reporter.tracker.borrow().counter(), 0, "initial count"); From 518f4c042d5dfbe0c09261d9cdc2e057c53a2f45 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miroslav=20Bajto=C5=A1?= Date: Wed, 3 May 2023 15:27:18 +0200 Subject: [PATCH 7/7] fixup! code cleanup MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Miroslav Bajtoš --- daemon/state.rs | 1 - daemon/station_reporter.rs | 2 -- 2 files changed, 3 deletions(-) diff --git a/daemon/state.rs b/daemon/state.rs index 1b175d18..26684147 100644 --- a/daemon/state.rs +++ b/daemon/state.rs @@ -54,7 +54,6 @@ impl State { write_result.with_context(|| format!("Cannot write state to {}", state_file.display()))?; log::debug!("State stored in {}", state_file.display()); - Ok(()) } } diff --git a/daemon/station_reporter.rs b/daemon/station_reporter.rs index 69166519..6d71d476 100644 --- a/daemon/station_reporter.rs +++ b/daemon/station_reporter.rs @@ -154,7 +154,6 @@ mod tests { assert_eq!(reporter.tracker.borrow().counter(), 0, "initial count"); reporter.job_completed(); - println!("{:?}", reporter.tracker); assert_eq!( reporter.tracker.borrow().counter(), 1, @@ -162,7 +161,6 @@ mod tests { ); let reporter = StationReporter::new(state_file, NO_DELAY, "test".into()); - println!("{:?}", reporter.tracker); assert_eq!( reporter.tracker.borrow().counter(), 1,