Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Resume Run Functionality #273

Merged
merged 13 commits into from
Dec 9, 2024
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions nexus-writer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ metrics.workspace = true
metrics-exporter-prometheus.workspace = true
ndarray.workspace = true
rdkafka.workspace = true
serde.workspace = true
serde_json.workspace = true
supermusr-common.workspace = true
supermusr-streaming-types.workspace = true
tokio.workspace = true
Expand Down
1 change: 1 addition & 0 deletions nexus-writer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ async fn main() -> anyhow::Result<()> {
nexus_settings,
nexus_configuration,
);
nexus_engine.detect_partial_run()?;

let mut nexus_write_interval =
tokio::time::interval(time::Duration::from_millis(args.cache_poll_interval_ms));
Expand Down
42 changes: 42 additions & 0 deletions nexus-writer/src/nexus/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,48 @@ impl NexusEngine {
}
}

fn get_files_in_dir(dir_path: &Path, ext: &str) -> anyhow::Result<Vec<String>> {
Modularius marked this conversation as resolved.
Show resolved Hide resolved
let vec = std::fs::read_dir(dir_path)?
.flatten()
.flat_map(|entry| {
if match entry.file_type() {
Ok(file_type) => file_type.is_file(),
Err(e) => return Some(Err(e)),
} {
let path = entry.path();
if path.extension().is_some_and(|path_ext| path_ext == ext) {
path.file_stem()
.and_then(|stem| stem.to_os_string().into_string().ok())
.map(Ok)
} else {
None
}
} else {
None
}
})
.collect::<std::io::Result<Vec<_>>>()?;
Ok(vec)
}

pub(crate) fn detect_partial_run(&mut self) -> anyhow::Result<()> {
if let Some(local_path) = &self.filename {
for filename in Self::get_files_in_dir(local_path, "partial_run")? {
if let Some(mut partial_run) =
RunParameters::detect_partial_run(local_path, &filename)?
{
partial_run.update_last_modified();
let mut run = Run::resume_partial_run(self.filename.as_deref(), partial_run)?;
if let Err(e) = run.span_init() {
warn!("Run span initiation failed {e}")
}
self.run_cache.push_back(run);
}
}
}
Ok(())
}

#[cfg(test)]
fn cache_iter(&self) -> vec_deque::Iter<'_, Run> {
self.run_cache.iter()
Expand Down
11 changes: 3 additions & 8 deletions nexus-writer/src/nexus/hdf5_file/run_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,17 +49,12 @@ pub(crate) struct RunFile {
impl RunFile {
#[tracing::instrument(skip_all, err(level = "warn"))]
pub(crate) fn new_runfile(
filename: &Path,
path: &Path,
run_name: &str,
nexus_settings: &NexusSettings,
) -> anyhow::Result<Self> {
create_dir_all(filename)?;
let filename = {
let mut filename = filename.to_owned();
filename.push(run_name);
filename.set_extension("nxs");
filename
};
create_dir_all(path)?;
let filename = RunParameters::get_hdf5_path_buf(path, run_name);
debug!("File save begin. File: {0}.", filename.display());

let file = File::create(filename)?;
Expand Down
35 changes: 23 additions & 12 deletions nexus-writer/src/nexus/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,22 @@ impl Run {
let mut hdf5 = RunFile::new_runfile(filename, &parameters.run_name, nexus_settings)?;
hdf5.init(&parameters, nexus_configuration)?;
hdf5.close()?;

parameters.save_partial_run(filename)?;
}
Ok(Self {
span: Default::default(),
parameters,
num_frames: usize::default(),
})
}

pub(crate) fn resume_partial_run(
filename: Option<&Path>,
parameters: RunParameters,
) -> anyhow::Result<Self> {
if let Some(filename) = filename {
parameters.save_partial_run(filename)?;
}
Ok(Self {
span: Default::default(),
Expand All @@ -46,18 +62,10 @@ impl Run {
archive_name: &Path,
) -> io::Result<impl Future<Output = ()>> {
create_dir_all(archive_name)?;
let from_path = {
let mut filename = file_name.to_owned();
filename.push(&self.parameters.run_name);
filename.set_extension("nxs");
filename
};
let to_path = {
let mut filename = archive_name.to_owned();
filename.push(&self.parameters.run_name);
filename.set_extension("nxs");
filename
};
let from_path = RunParameters::get_hdf5_path_buf(file_name, &self.parameters.run_name);
let to_path = RunParameters::get_hdf5_path_buf(archive_name, &self.parameters.run_name);
let partial_run_path =
RunParameters::get_partial_path_buf(file_name, &self.parameters.run_name);
let span = tracing::Span::current();
let future = async move {
info_span!(parent: &span, "move-async").in_scope(|| {
Expand All @@ -68,6 +76,9 @@ impl Run {
if let Err(e) = std::fs::remove_file(from_path) {
warn!("Error removing temporary file: {e}");
}
if let Err(e) = std::fs::remove_file(partial_run_path) {
warn!("Error removing partial_run file: {e}");
}
});
};
Ok(future)
Expand Down
43 changes: 41 additions & 2 deletions nexus-writer/src/nexus/run_parameters.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,21 @@
use std::{
fs::File,
path::{Path, PathBuf},
};

use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use supermusr_streaming_types::{
ecs_6s4t_run_stop_generated::RunStop, ecs_pl72_run_start_generated::RunStart,
};

#[derive(Default, Debug)]
#[derive(Default, Debug, Clone, Serialize, Deserialize)]
pub(crate) struct RunStopParameters {
pub(crate) collect_until: DateTime<Utc>,
pub(crate) last_modified: DateTime<Utc>,
}

#[derive(Debug)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) struct RunParameters {
pub(crate) collect_from: DateTime<Utc>,
pub(crate) run_stop_parameters: Option<RunStopParameters>,
Expand Down Expand Up @@ -86,4 +92,37 @@ impl RunParameters {
params.last_modified = Utc::now();
}
}

pub(crate) fn get_hdf5_path_buf(path: &Path, run_name: &str) -> PathBuf {
Modularius marked this conversation as resolved.
Show resolved Hide resolved
let mut path = path.to_owned();
path.push(run_name);
path.set_extension("nxs");
path
}

pub(crate) fn get_partial_path_buf(path: &Path, run_name: &str) -> PathBuf {
let mut path = path.to_owned();
path.push(run_name);
path.set_extension("partial_run");
path
}

pub(crate) fn save_partial_run(&self, path: &Path) -> anyhow::Result<()> {
let path_buf = Self::get_partial_path_buf(path, &self.run_name);
let file = File::create(path_buf.as_path())?;
serde_json::to_writer(file, &self)?;
Ok(())
}

pub(crate) fn detect_partial_run(path: &Path, filename: &str) -> anyhow::Result<Option<Self>> {
let path_buf = Self::get_partial_path_buf(path, filename);
if path_buf.as_path().exists() {
let file = File::open(path_buf.as_path())?;
let run_parameters: RunParameters = serde_json::from_reader(file)?;
std::fs::remove_file(path_buf.as_path())?;
Ok(Some(run_parameters))
} else {
Ok(None)
}
}
}