Skip to content

Commit

Permalink
feat: persist job counter across restarts (#198)
Browse files Browse the repository at this point in the history
* zinniad: persist "jobs completed" across restarts
* refactor: code cleanup in reporters

---------

Signed-off-by: Miroslav Bajtoš <[email protected]>
  • Loading branch information
bajtos authored May 3, 2023
1 parent 0fb5b3c commit 6622306
Show file tree
Hide file tree
Showing 6 changed files with 276 additions and 58 deletions.
73 changes: 63 additions & 10 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions daemon/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ path = "main.rs"
doc = false

[dependencies]
atomicwrites = "0.4.1"
clap = { version = "4.2.7", features = ["derive", "env"] }
env_logger.workspace = true
log.workspace = true
Expand All @@ -24,3 +25,4 @@ zinnia_runtime = { workspace = true }

[dev-dependencies]
pretty_assertions = { workspace = true }
tempfile = "3.5.0"
6 changes: 6 additions & 0 deletions daemon/main.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
mod args;
mod state;
mod station_reporter;

use std::path::PathBuf;
use std::rc::Rc;
use std::time::Duration;

Expand Down Expand Up @@ -36,6 +38,9 @@ async fn run(config: CliArgs) -> Result<()> {
));
}

let state_file = PathBuf::from(config.state_root).join("state.json");
log::debug!("Using state file: {}", state_file.display());

log_info_activity("Module Runtime started.");

let file = &config.files[0];
Expand All @@ -56,6 +61,7 @@ async fn run(config: CliArgs) -> Result<()> {
),
wallet_address: config.wallet_address,
reporter: Rc::new(StationReporter::new(
state_file,
Duration::from_millis(200),
module_name.into(),
)),
Expand Down
89 changes: 89 additions & 0 deletions daemon/state.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
use atomicwrites::{AtomicFile, OverwriteBehavior};
use serde::{Deserialize, Serialize};
use std::io::Write;
use std::path::Path;
use zinnia_runtime::anyhow::{self, Context, Result};

#[derive(Serialize, Deserialize, Debug, Default)]
pub struct State {
pub total_jobs_completed: u64,
}

impl State {
pub fn load(state_file: &Path) -> Result<Self> {
log::debug!("Loading initial state from {}", state_file.display());
match std::fs::read_to_string(state_file) {
Err(err) => match err.kind() {
std::io::ErrorKind::NotFound => {
let state = State::default();
log::debug!("State file not found, returning {state:?}");
Ok(state)
}
_ => Err(anyhow::Error::new(err).context(format!(
"Cannot load initial state from {}",
state_file.display()
))),
},
Ok(data) => {
let state = serde_json::from_str::<State>(&data).with_context(|| {
format!("Cannot parse initial state from {}", state_file.display())
})?;
log::debug!("Loaded initial state: {state:?}");
Ok(state)
}
}
}

pub fn store(&self, state_file: &Path) -> Result<()> {
let payload = serde_json::to_string_pretty(self).context("Cannot serialize state")?;

let mut write_result = AtomicFile::new(state_file, OverwriteBehavior::AllowOverwrite)
.write(|f| f.write_all(payload.as_bytes()));

if let Err(atomicwrites::Error::Internal(err)) = &write_result {
if err.kind() == std::io::ErrorKind::NotFound {
if let Some(parent) = state_file.parent() {
std::fs::create_dir_all(parent).with_context(|| {
format!("Cannot create state directory {}", parent.display(),)
})?;
write_result = AtomicFile::new(state_file, OverwriteBehavior::AllowOverwrite)
.write(|f| f.write_all(payload.as_bytes()));
}
}
}

write_result.with_context(|| format!("Cannot write state to {}", state_file.display()))?;
log::debug!("State stored in {}", state_file.display());
Ok(())
}
}

#[cfg(test)]
mod tests {
use super::*;
use pretty_assertions::assert_eq;
use tempfile::tempdir;
use zinnia_runtime::anyhow::Result;

#[test]
fn loads_empty_state() -> Result<()> {
let state_dir = tempdir()?;
let state_file = state_dir.path().join("state.json");
let loaded = State::load(&state_file)?;
assert_eq!(loaded.total_jobs_completed, 0, "total_jobs_completed");
Ok(())
}

#[test]
fn creates_missing_directories() -> Result<()> {
let state_dir = tempdir()?;
let state_file = state_dir.path().join("subdir").join("state.json");
let state = State {
total_jobs_completed: 1,
};
state.store(&state_file)?;
let loaded = State::load(&state_file)?;
assert_eq!(loaded.total_jobs_completed, 1);
Ok(())
}
}
Loading

0 comments on commit 6622306

Please sign in to comment.