From 6622306704ad0a73422443a79463dbe4e6d62920 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miroslav=20Bajto=C5=A1?= Date: Wed, 3 May 2023 15:49:07 +0200 Subject: [PATCH] feat: persist job counter across restarts (#198) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * zinniad: persist "jobs completed" across restarts * refactor: code cleanup in reporters --------- Signed-off-by: Miroslav Bajtoš --- Cargo.lock | 73 +++++++++++++++++++++++++---- daemon/Cargo.toml | 2 + daemon/main.rs | 6 +++ daemon/state.rs | 89 +++++++++++++++++++++++++++++++++++ daemon/station_reporter.rs | 93 +++++++++++++++++++++++++++++-------- runtime/console_reporter.rs | 71 ++++++++++++++++------------ 6 files changed, 276 insertions(+), 58 deletions(-) create mode 100644 daemon/state.rs diff --git a/Cargo.lock b/Cargo.lock index 6f75f5ae..8ead95b0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -436,6 +436,17 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "debc29dde2e69f9e47506b525f639ed42300fc014a3e007832592448fa8e4599" +[[package]] +name = "atomicwrites" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c1163d9d7c51de51a2b79d6df5e8888d11e9df17c752ce4a285fb6ca1580734e" +dependencies = [ + "rustix 0.37.7", + "tempfile", + "windows-sys 0.48.0", +] + [[package]] name = "atty" version = "0.2.14" @@ -1507,6 +1518,17 @@ dependencies = [ "winapi", ] +[[package]] +name = "errno" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4bcfec3a70f97c962c307b2d2c56e358cf1d00b558d74262b5f929ee8cc7e73a" +dependencies = [ + "errno-dragonfly", + "libc", + "windows-sys 0.48.0", +] + [[package]] name = "errno-dragonfly" version = "0.1.2" @@ -2181,7 +2203,7 @@ checksum = "8687c819457e979cc940d09cb16e42a1bf70aa6b60a549de6d3a62a0ee90c69e" dependencies = [ "hermit-abi 0.3.1", "io-lifetimes", - "rustix", + "rustix 0.36.10", "windows-sys 0.45.0", ] @@ -2704,6 +2726,12 @@ version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f051f77a7c8e6957c0696eac88f26b0117e54f52d3fc682ab19397a8812846a4" +[[package]] +name = "linux-raw-sys" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b64f40e5e03e0d54f03845c8197d0291253cdbedfb1cb46b13c2c117554a9f4c" + [[package]] name = "lock_api" version = "0.4.9" @@ -3165,7 +3193,7 @@ dependencies = [ "cfg-if", "instant", "libc", - "redox_syscall", + "redox_syscall 0.2.16", "smallvec", "winapi", ] @@ -3178,7 +3206,7 @@ checksum = "9069cbb9f99e3a5083476ccb29ceb1de18b9118cafa53e90c9551235de2b9521" dependencies = [ "cfg-if", "libc", - "redox_syscall", + "redox_syscall 0.2.16", "smallvec", "windows-sys 0.45.0", ] @@ -3630,6 +3658,15 @@ dependencies = [ "bitflags", ] +[[package]] +name = "redox_syscall" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "567664f262709473930a4bf9e51bf2ebf3348f2e748ccc50dea20646858f8f29" +dependencies = [ + "bitflags", +] + [[package]] name = "regex" version = "1.8.1" @@ -3833,10 +3870,24 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2fe885c3a125aa45213b68cc1472a49880cb5923dc23f522ad2791b882228778" dependencies = [ "bitflags", - "errno", + "errno 0.2.8", + "io-lifetimes", + "libc", + "linux-raw-sys 0.1.4", + "windows-sys 0.45.0", +] + +[[package]] +name = "rustix" +version = "0.37.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2aae838e49b3d63e9274e1c01833cc8139d3fec468c3b84688c628f44b1ae11d" +dependencies = [ + "bitflags", + "errno 0.3.1", "io-lifetimes", "libc", - "linux-raw-sys", + "linux-raw-sys 0.3.6", "windows-sys 0.45.0", ] @@ -4354,15 +4405,15 @@ dependencies = [ [[package]] name = "tempfile" -version = "3.4.0" +version = "3.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "af18f7ae1acd354b992402e9ec5864359d693cd8a79dcbef59f76891701c1e95" +checksum = "b9fbec84f381d5795b08656e4912bec604d162bff9291d6189a78f4c8ab87998" dependencies = [ "cfg-if", "fastrand", - "redox_syscall", - "rustix", - "windows-sys 0.42.0", + "redox_syscall 0.3.5", + "rustix 0.37.7", + "windows-sys 0.45.0", ] [[package]] @@ -5693,12 +5744,14 @@ dependencies = [ name = "zinniad" version = "0.7.0" dependencies = [ + "atomicwrites", "clap", "env_logger", "log", "pretty_assertions", "serde", "serde_json", + "tempfile", "tokio", "zinnia_runtime", ] diff --git a/daemon/Cargo.toml b/daemon/Cargo.toml index eda40717..802ca2f8 100644 --- a/daemon/Cargo.toml +++ b/daemon/Cargo.toml @@ -14,6 +14,7 @@ path = "main.rs" doc = false [dependencies] +atomicwrites = "0.4.1" clap = { version = "4.2.7", features = ["derive", "env"] } env_logger.workspace = true log.workspace = true @@ -24,3 +25,4 @@ zinnia_runtime = { workspace = true } [dev-dependencies] pretty_assertions = { workspace = true } +tempfile = "3.5.0" diff --git a/daemon/main.rs b/daemon/main.rs index d57ccce7..13e3d549 100644 --- a/daemon/main.rs +++ b/daemon/main.rs @@ -1,6 +1,8 @@ mod args; +mod state; mod station_reporter; +use std::path::PathBuf; use std::rc::Rc; use std::time::Duration; @@ -36,6 +38,9 @@ async fn run(config: CliArgs) -> Result<()> { )); } + let state_file = PathBuf::from(config.state_root).join("state.json"); + log::debug!("Using state file: {}", state_file.display()); + log_info_activity("Module Runtime started."); let file = &config.files[0]; @@ -56,6 +61,7 @@ async fn run(config: CliArgs) -> Result<()> { ), wallet_address: config.wallet_address, reporter: Rc::new(StationReporter::new( + state_file, Duration::from_millis(200), module_name.into(), )), diff --git a/daemon/state.rs b/daemon/state.rs new file mode 100644 index 00000000..26684147 --- /dev/null +++ b/daemon/state.rs @@ -0,0 +1,89 @@ +use atomicwrites::{AtomicFile, OverwriteBehavior}; +use serde::{Deserialize, Serialize}; +use std::io::Write; +use std::path::Path; +use zinnia_runtime::anyhow::{self, Context, Result}; + +#[derive(Serialize, Deserialize, Debug, Default)] +pub struct State { + pub total_jobs_completed: u64, +} + +impl State { + pub fn load(state_file: &Path) -> Result { + log::debug!("Loading initial state from {}", state_file.display()); + match std::fs::read_to_string(state_file) { + Err(err) => match err.kind() { + std::io::ErrorKind::NotFound => { + let state = State::default(); + log::debug!("State file not found, returning {state:?}"); + Ok(state) + } + _ => Err(anyhow::Error::new(err).context(format!( + "Cannot load initial state from {}", + state_file.display() + ))), + }, + Ok(data) => { + let state = serde_json::from_str::(&data).with_context(|| { + format!("Cannot parse initial state from {}", state_file.display()) + })?; + log::debug!("Loaded initial state: {state:?}"); + Ok(state) + } + } + } + + pub fn store(&self, state_file: &Path) -> Result<()> { + let payload = serde_json::to_string_pretty(self).context("Cannot serialize state")?; + + let mut write_result = AtomicFile::new(state_file, OverwriteBehavior::AllowOverwrite) + .write(|f| f.write_all(payload.as_bytes())); + + if let Err(atomicwrites::Error::Internal(err)) = &write_result { + if err.kind() == std::io::ErrorKind::NotFound { + if let Some(parent) = state_file.parent() { + std::fs::create_dir_all(parent).with_context(|| { + format!("Cannot create state directory {}", parent.display(),) + })?; + write_result = AtomicFile::new(state_file, OverwriteBehavior::AllowOverwrite) + .write(|f| f.write_all(payload.as_bytes())); + } + } + } + + write_result.with_context(|| format!("Cannot write state to {}", state_file.display()))?; + log::debug!("State stored in {}", state_file.display()); + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use pretty_assertions::assert_eq; + use tempfile::tempdir; + use zinnia_runtime::anyhow::Result; + + #[test] + fn loads_empty_state() -> Result<()> { + let state_dir = tempdir()?; + let state_file = state_dir.path().join("state.json"); + let loaded = State::load(&state_file)?; + assert_eq!(loaded.total_jobs_completed, 0, "total_jobs_completed"); + Ok(()) + } + + #[test] + fn creates_missing_directories() -> Result<()> { + let state_dir = tempdir()?; + let state_file = state_dir.path().join("subdir").join("state.json"); + let state = State { + total_jobs_completed: 1, + }; + state.store(&state_file)?; + let loaded = State::load(&state_file)?; + assert_eq!(loaded.total_jobs_completed, 1); + Ok(()) + } +} diff --git a/daemon/station_reporter.rs b/daemon/station_reporter.rs index 3761e433..6d71d476 100644 --- a/daemon/station_reporter.rs +++ b/daemon/station_reporter.rs @@ -1,28 +1,41 @@ use std::cell::RefCell; use std::io::{stderr, stdout, Write}; +use std::path::PathBuf; use std::time::Duration; use serde_json::{json, Map}; -use zinnia_runtime::anyhow::Result; use zinnia_runtime::{JobCompletionTracker, LogLevel, Reporter}; +use crate::state::State; + /// StationReporter reports activities to stdout as ND-JSON stream and all Console logs to stderr pub struct StationReporter { tracker: RefCell, module_name: String, log_target: String, + state_file: PathBuf, } impl StationReporter { /// Create a new instance. /// /// `job_report_delay` specifies how often the information about new jobs is printed. - pub fn new(job_report_delay: Duration, module_name: String) -> Self { + pub fn new(state_file: PathBuf, job_report_delay: Duration, module_name: String) -> Self { let log_target = format!("module:{module_name}"); + let initial_job_count = State::load(&state_file) + // NOTE(bajtos) We are intentionally calling unwrap() to crash the process in case + // it's not possible to read the state file or parse the content. + .unwrap() + .total_jobs_completed; + Self { - tracker: RefCell::new(JobCompletionTracker::new(job_report_delay)), + tracker: RefCell::new(JobCompletionTracker::new( + initial_job_count, + job_report_delay, + )), module_name, log_target, + state_file, } } @@ -36,18 +49,20 @@ impl StationReporter { let event = json!({ "type": "jobs-completed", "total": total, - "modules": modules, + "modules": modules, }); - let _ = print_event(&event); - // ^^^ We are ignoring errors because there isn't much to do in such case + print_event(&event); } } -fn print_event(data: &serde_json::Value) -> Result<()> { - writeln!(stdout(), "{data}")?; - stdout().flush()?; - Ok(()) +fn print_event(data: &serde_json::Value) { + writeln!(stdout(), "{data}") + .and_then(|_| stdout().flush()) + .unwrap_or_else(|err| { + // We are ignoring errors because there isn't much to do in such case + log::debug!("Cannot print event {}: {}", data, err); + }); } pub fn log_info_activity(msg: &str) { @@ -56,8 +71,7 @@ pub fn log_info_activity(msg: &str) { "module": serde_json::Value::Null, "message": msg, }); - let _ = print_event(&event); - // ^^^ We are ignoring errors because there isn't much to do in such case + print_event(&event); } #[allow(unused)] @@ -67,8 +81,7 @@ pub fn log_error_activity(msg: &str) { "module": serde_json::Value::Null, "message": msg, }); - let _ = print_event(&event); - // ^^^ We are ignoring errors because there isn't much to do in such case + print_event(&event); } impl Drop for StationReporter { @@ -95,8 +108,7 @@ impl Reporter for StationReporter { "module": self.module_name, "message": msg, }); - let _ = print_event(&event); - // ^^^ We are ignoring errors because there isn't much to do in such case + print_event(&event); } fn error_activity(&self, msg: &str) { @@ -105,13 +117,56 @@ impl Reporter for StationReporter { "module": self.module_name, "message": msg, }); - let _ = print_event(&event); - // ^^^ We are ignoring errors because there isn't much to do in such case + print_event(&event); } fn job_completed(&self) { - self.tracker + let total_jobs_completed = self + .tracker .borrow_mut() .job_completed(|n| self.print_jobs_completed(n)); + + let state = State { + total_jobs_completed, + }; + state + .store(&self.state_file) + // NOTE(bajtos) We are intentionally calling unwrap() to crash the process in case + // we cannot store the state into the file. + .unwrap(); + } +} + +#[cfg(test)] +mod tests { + use super::*; + use pretty_assertions::assert_eq; + use tempfile::tempdir; + use zinnia_runtime::anyhow::Result; + + const NO_DELAY: Duration = Duration::from_millis(0); + + #[test] + fn persists_job_counter() -> Result<()> { + let state_dir = tempdir()?; + let state_file = state_dir.path().join("state.json"); + let reporter = StationReporter::new(state_file.clone(), NO_DELAY, "test".into()); + assert_eq!(reporter.tracker.borrow().counter(), 0, "initial count"); + + reporter.job_completed(); + assert_eq!( + reporter.tracker.borrow().counter(), + 1, + "count after a job was completed" + ); + + let reporter = StationReporter::new(state_file, NO_DELAY, "test".into()); + assert_eq!( + reporter.tracker.borrow().counter(), + 1, + "count after creating a new reporter" + ); + + Ok(()) } } diff --git a/runtime/console_reporter.rs b/runtime/console_reporter.rs index b5caf1ad..4efd3c20 100644 --- a/runtime/console_reporter.rs +++ b/runtime/console_reporter.rs @@ -16,25 +16,31 @@ pub struct JobCompletionTracker { } impl JobCompletionTracker { - pub fn new(delay: Duration) -> Self { + pub fn new(initial_value: u64, delay: Duration) -> Self { Self { delay, - counter: 0, + counter: initial_value, last_report: None, } } - pub fn job_completed(&mut self, log: F) { + pub fn counter(&self) -> u64 { + self.counter + } + + pub fn job_completed(&mut self, log: F) -> u64 { self.counter += 1; if let Some(last) = self.last_report { if last.0.elapsed() < self.delay { - return; + return self.counter; } } self.last_report.replace((Instant::now(), self.counter)); log(self.counter); + + self.counter } pub fn flush(&mut self, log: F) { @@ -63,31 +69,40 @@ impl ConsoleReporter { /// `job_report_delay` specifies how often the information about new jobs is printed. pub fn new(job_report_delay: Duration) -> Self { Self { - tracker: RefCell::new(JobCompletionTracker::new(job_report_delay)), + tracker: RefCell::new(JobCompletionTracker::new(0, job_report_delay)), } } fn print_jobs_completed(&self, total: u64) { let msg = format!("Jobs completed: {total}"); - let _ = self.report("STATS", &msg, Color::Yellow); - // ^^^ We are ignoring errors because there isn't much to do in such case - } - - fn report(&self, scope: &str, msg: &str, color: Color) -> Result<()> { - if use_color() { - let mut spec = ColorSpec::new(); - // spec.set_fg(Some(color)).set_bold(true); - spec.set_fg(Some(color)); - let mut ansi_writer = Ansi::new(stdout()); - ansi_writer.set_color(&spec)?; - print_raw_report(&mut ansi_writer, scope, msg)?; - ansi_writer.reset()?; - } else { - print_raw_report(&mut stdout(), scope, msg)?; - } - stdout().flush()?; - Ok(()) + self.report("STATS", &msg, Color::Yellow); } + + fn report(&self, scope: &str, msg: &str, color: Color) { + print_report(scope, msg, color).unwrap_or_else(|err| { + // We are ignoring errors because there isn't much to do in such case + log::debug!( + "Cannot report event [scope:{scope} color:{color:?}] {msg:?}: {}", + err + ) + }) + } +} + +fn print_report(scope: &str, msg: &str, color: Color) -> Result<()> { + if use_color() { + let mut spec = ColorSpec::new(); + // spec.set_fg(Some(color)).set_bold(true); + spec.set_fg(Some(color)); + let mut ansi_writer = Ansi::new(stdout()); + ansi_writer.set_color(&spec)?; + print_raw_report(&mut ansi_writer, scope, msg)?; + ansi_writer.reset()?; + } else { + print_raw_report(&mut stdout(), scope, msg)?; + } + stdout().flush()?; + Ok(()) } fn print_raw_report(w: &mut W, scope: &str, msg: &str) -> std::io::Result<()> { @@ -118,13 +133,11 @@ impl Reporter for ConsoleReporter { } fn info_activity(&self, msg: &str) { - let _ = self.report("INFO", msg, Color::Green); - // ^^^ We are ignoring errors because there isn't much to do in such case + self.report("INFO", msg, Color::Green); } fn error_activity(&self, msg: &str) { - let _ = self.report("ERROR", msg, Color::Red); - // ^^^ We are ignoring errors because there isn't much to do in such case + self.report("ERROR", msg, Color::Red); } fn job_completed(&self) { @@ -145,7 +158,7 @@ mod tests { impl Default for JobCompletionTracker { fn default() -> Self { - Self::new(Duration::from_millis(1000)) + Self::new(0, Duration::from_millis(1000)) } } @@ -169,7 +182,7 @@ mod tests { #[test] fn tracker_prints_new_jobs_after_delay() { let mut reported = 0; - let mut tracker = JobCompletionTracker::new(Duration::from_millis(1)); + let mut tracker = JobCompletionTracker::new(0, Duration::from_millis(1)); tracker.job_completed(|x| reported = x); std::thread::sleep(Duration::from_millis(2)); tracker.job_completed(|x| reported = x);