From 966df3e11d7e57a0119d3cfa0517f9f16d7e512b Mon Sep 17 00:00:00 2001 From: Modularius Date: Thu, 21 Nov 2024 11:53:19 +0000 Subject: [PATCH 01/11] Added run_parameters changes --- Cargo.lock | 2 ++ nexus-writer/Cargo.toml | 2 ++ nexus-writer/src/nexus/run_parameters.rs | 46 ++++++++++++++++++++++-- 3 files changed, 48 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b16e29c1..67c83d43 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1740,6 +1740,8 @@ dependencies = [ "metrics-exporter-prometheus", "ndarray", "rdkafka", + "serde", + "serde_json", "supermusr-common", "supermusr-streaming-types", "tokio", diff --git a/nexus-writer/Cargo.toml b/nexus-writer/Cargo.toml index 6094a961..ee4f42a2 100644 --- a/nexus-writer/Cargo.toml +++ b/nexus-writer/Cargo.toml @@ -13,6 +13,8 @@ metrics.workspace = true metrics-exporter-prometheus.workspace = true ndarray.workspace = true rdkafka.workspace = true +serde.workspace = true +serde_json.workspace = true supermusr-common.workspace = true supermusr-streaming-types.workspace = true tokio.workspace = true diff --git a/nexus-writer/src/nexus/run_parameters.rs b/nexus-writer/src/nexus/run_parameters.rs index 0de89cf9..d025858b 100644 --- a/nexus-writer/src/nexus/run_parameters.rs +++ b/nexus-writer/src/nexus/run_parameters.rs @@ -1,15 +1,18 @@ +use std::{ffi::OsStr, fs::File, path::{Path, PathBuf}}; + use chrono::{DateTime, Utc}; +use serde::{Serialize, Deserialize}; use supermusr_streaming_types::{ ecs_6s4t_run_stop_generated::RunStop, ecs_pl72_run_start_generated::RunStart, }; -#[derive(Default, Debug)] +#[derive(Default, Debug, Clone, Serialize, Deserialize)] pub(crate) struct RunStopParameters { pub(crate) collect_until: DateTime, pub(crate) last_modified: DateTime, } -#[derive(Debug)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub(crate) struct RunParameters { pub(crate) collect_from: DateTime, pub(crate) run_stop_parameters: Option, @@ -86,4 +89,43 @@ impl RunParameters { params.last_modified = Utc::now(); } } + + pub(crate) fn get_hdf5_path_buf( + path: &Path, + run_name: &str, + ) -> PathBuf { + let mut path = path.to_owned(); + path.push(run_name); + path.set_extension("nxs"); + path + } + + fn get_partial_path_buf( + path: &Path, + run_name: &str, + ) -> PathBuf { + let mut path = path.to_owned(); + path.push(run_name); + path.set_extension("partial_run"); + path + } + + pub(crate) fn save_partial_run(&self, path: &Path) -> anyhow::Result<()> { + let path_buf = Self::get_partial_path_buf(path, &self.run_name); + let mut file = File::create(path_buf.as_path())?; + serde_json::to_writer(file, &self)?; + Ok(()) + } + + pub(crate) fn detect_partial_run(path: &Path, filename: &str) -> anyhow::Result> { + let path_buf = Self::get_partial_path_buf(path, filename); + if path_buf.as_path().exists() { + let file = File::open(path_buf.as_path())?; + let run_parameters : RunParameters = serde_json::from_reader(file)?; + std::fs::remove_file(path_buf.as_path())?; + Ok(Some(run_parameters)) + } else { + Ok(None) + } + } } From af2b0e5b7a64909dd88ede8a1983e3085b3c5b27 Mon Sep 17 00:00:00 2001 From: Modularius Date: Thu, 21 Nov 2024 11:55:48 +0000 Subject: [PATCH 02/11] Condensing repeated code --- nexus-writer/src/nexus/hdf5_file/run_file.rs | 11 +++-------- nexus-writer/src/nexus/run.rs | 14 ++------------ 2 files changed, 5 insertions(+), 20 deletions(-) diff --git a/nexus-writer/src/nexus/hdf5_file/run_file.rs b/nexus-writer/src/nexus/hdf5_file/run_file.rs index 27c231d2..4d087959 100644 --- a/nexus-writer/src/nexus/hdf5_file/run_file.rs +++ b/nexus-writer/src/nexus/hdf5_file/run_file.rs @@ -49,17 +49,12 @@ pub(crate) struct RunFile { impl RunFile { #[tracing::instrument(skip_all, err(level = "warn"))] pub(crate) fn new_runfile( - filename: &Path, + path: &Path, run_name: &str, nexus_settings: &NexusSettings, ) -> anyhow::Result { - create_dir_all(filename)?; - let filename = { - let mut filename = filename.to_owned(); - filename.push(run_name); - filename.set_extension("nxs"); - filename - }; + create_dir_all(path)?; + let filename = RunParameters::get_hdf5_path_buf(path, run_name); debug!("File save begin. File: {0}.", filename.display()); let file = File::create(filename)?; diff --git a/nexus-writer/src/nexus/run.rs b/nexus-writer/src/nexus/run.rs index 050f906c..98982f5e 100644 --- a/nexus-writer/src/nexus/run.rs +++ b/nexus-writer/src/nexus/run.rs @@ -46,18 +46,8 @@ impl Run { archive_name: &Path, ) -> io::Result> { create_dir_all(archive_name)?; - let from_path = { - let mut filename = file_name.to_owned(); - filename.push(&self.parameters.run_name); - filename.set_extension("nxs"); - filename - }; - let to_path = { - let mut filename = archive_name.to_owned(); - filename.push(&self.parameters.run_name); - filename.set_extension("nxs"); - filename - }; + let from_path = RunParameters::get_hdf5_path_buf(file_name, &self.parameters.run_name); + let to_path = RunParameters::get_hdf5_path_buf(archive_name, &self.parameters.run_name); let span = tracing::Span::current(); let future = async move { info_span!(parent: &span, "move-async").in_scope(|| { From db1a3586b1e7a108baeea9775796126575bde9fa Mon Sep 17 00:00:00 2001 From: Modularius Date: Thu, 21 Nov 2024 11:56:07 +0000 Subject: [PATCH 03/11] Formatting and implementation --- nexus-writer/src/main.rs | 1 + nexus-writer/src/nexus/engine.rs | 45 ++++++++++++++++++++++++ nexus-writer/src/nexus/run_parameters.rs | 24 ++++++------- 3 files changed, 57 insertions(+), 13 deletions(-) diff --git a/nexus-writer/src/main.rs b/nexus-writer/src/main.rs index 6bf3b081..ac42b16c 100644 --- a/nexus-writer/src/main.rs +++ b/nexus-writer/src/main.rs @@ -162,6 +162,7 @@ async fn main() -> anyhow::Result<()> { nexus_settings, nexus_configuration, ); + nexus_engine.detect_partial_run()?; let mut nexus_write_interval = tokio::time::interval(time::Duration::from_millis(args.cache_poll_interval_ms)); diff --git a/nexus-writer/src/nexus/engine.rs b/nexus-writer/src/nexus/engine.rs index 095bcc43..3c9a7c39 100644 --- a/nexus-writer/src/nexus/engine.rs +++ b/nexus-writer/src/nexus/engine.rs @@ -41,6 +41,51 @@ impl NexusEngine { } } + fn get_files_in_dir(dir_path: &Path, ext: &str) -> anyhow::Result> { + let vec = std::fs::read_dir(dir_path)? + .flatten() + .flat_map(|entry| { + if match entry.file_type() { + Ok(file_type) => file_type.is_file(), + Err(e) => return Some(Err(e)), + } { + let path = entry.path(); + if path.extension().is_some_and(|path_ext| path_ext == ext) { + path.file_stem() + .and_then(|stem| stem.to_os_string().into_string().ok()) + .map(|stem| Ok(stem)) + } else { + None + } + } else { + None + } + }) + .collect::>>()?; + Ok(vec) + } + + pub(crate) fn detect_partial_run(&mut self) -> anyhow::Result<()> { + if let Some(local_path) = &self.filename { + for filename in Self::get_files_in_dir(local_path, "partial_run")? { + if let Some(partial_run) = RunParameters::detect_partial_run(local_path, &filename)? + { + let mut run = Run::new_run( + self.filename.as_deref(), + partial_run, + &self.nexus_settings, + &self.nexus_configuration, + )?; + if let Err(e) = run.span_init() { + warn!("Run span initiation failed {e}") + } + self.run_cache.push_back(run); + } + } + } + Ok(()) + } + #[cfg(test)] fn cache_iter(&self) -> vec_deque::Iter<'_, Run> { self.run_cache.iter() diff --git a/nexus-writer/src/nexus/run_parameters.rs b/nexus-writer/src/nexus/run_parameters.rs index d025858b..a83397bf 100644 --- a/nexus-writer/src/nexus/run_parameters.rs +++ b/nexus-writer/src/nexus/run_parameters.rs @@ -1,7 +1,11 @@ -use std::{ffi::OsStr, fs::File, path::{Path, PathBuf}}; +use std::{ + ffi::OsStr, + fs::File, + path::{Path, PathBuf}, +}; use chrono::{DateTime, Utc}; -use serde::{Serialize, Deserialize}; +use serde::{Deserialize, Serialize}; use supermusr_streaming_types::{ ecs_6s4t_run_stop_generated::RunStop, ecs_pl72_run_start_generated::RunStart, }; @@ -89,21 +93,15 @@ impl RunParameters { params.last_modified = Utc::now(); } } - - pub(crate) fn get_hdf5_path_buf( - path: &Path, - run_name: &str, - ) -> PathBuf { + + pub(crate) fn get_hdf5_path_buf(path: &Path, run_name: &str) -> PathBuf { let mut path = path.to_owned(); path.push(run_name); path.set_extension("nxs"); path } - - fn get_partial_path_buf( - path: &Path, - run_name: &str, - ) -> PathBuf { + + fn get_partial_path_buf(path: &Path, run_name: &str) -> PathBuf { let mut path = path.to_owned(); path.push(run_name); path.set_extension("partial_run"); @@ -121,7 +119,7 @@ impl RunParameters { let path_buf = Self::get_partial_path_buf(path, filename); if path_buf.as_path().exists() { let file = File::open(path_buf.as_path())?; - let run_parameters : RunParameters = serde_json::from_reader(file)?; + let run_parameters: RunParameters = serde_json::from_reader(file)?; std::fs::remove_file(path_buf.as_path())?; Ok(Some(run_parameters)) } else { From e548bfa56b6faca7037331ffc4bee9d994c46d55 Mon Sep 17 00:00:00 2001 From: Modularius Date: Thu, 21 Nov 2024 13:29:57 +0000 Subject: [PATCH 04/11] Tidying --- nexus-writer/src/nexus/run_parameters.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/nexus-writer/src/nexus/run_parameters.rs b/nexus-writer/src/nexus/run_parameters.rs index a83397bf..ec1a2553 100644 --- a/nexus-writer/src/nexus/run_parameters.rs +++ b/nexus-writer/src/nexus/run_parameters.rs @@ -1,5 +1,4 @@ use std::{ - ffi::OsStr, fs::File, path::{Path, PathBuf}, }; @@ -101,7 +100,7 @@ impl RunParameters { path } - fn get_partial_path_buf(path: &Path, run_name: &str) -> PathBuf { + pub(crate) fn get_partial_path_buf(path: &Path, run_name: &str) -> PathBuf { let mut path = path.to_owned(); path.push(run_name); path.set_extension("partial_run"); @@ -110,7 +109,7 @@ impl RunParameters { pub(crate) fn save_partial_run(&self, path: &Path) -> anyhow::Result<()> { let path_buf = Self::get_partial_path_buf(path, &self.run_name); - let mut file = File::create(path_buf.as_path())?; + let file = File::create(path_buf.as_path())?; serde_json::to_writer(file, &self)?; Ok(()) } From 50550c3c90e29509581035260f9df6ae031018ff Mon Sep 17 00:00:00 2001 From: Modularius Date: Thu, 21 Nov 2024 13:30:35 +0000 Subject: [PATCH 05/11] Resume partial run --- nexus-writer/src/nexus/engine.rs | 9 ++++----- nexus-writer/src/nexus/run.rs | 20 ++++++++++++++++++++ 2 files changed, 24 insertions(+), 5 deletions(-) diff --git a/nexus-writer/src/nexus/engine.rs b/nexus-writer/src/nexus/engine.rs index 3c9a7c39..bff95cf2 100644 --- a/nexus-writer/src/nexus/engine.rs +++ b/nexus-writer/src/nexus/engine.rs @@ -68,13 +68,12 @@ impl NexusEngine { pub(crate) fn detect_partial_run(&mut self) -> anyhow::Result<()> { if let Some(local_path) = &self.filename { for filename in Self::get_files_in_dir(local_path, "partial_run")? { - if let Some(partial_run) = RunParameters::detect_partial_run(local_path, &filename)? + if let Some(mut partial_run) = RunParameters::detect_partial_run(local_path, &filename)? { - let mut run = Run::new_run( + partial_run.update_last_modified(); + let mut run = Run::resume_partial_run( self.filename.as_deref(), - partial_run, - &self.nexus_settings, - &self.nexus_configuration, + partial_run )?; if let Err(e) = run.span_init() { warn!("Run span initiation failed {e}") diff --git a/nexus-writer/src/nexus/run.rs b/nexus-writer/src/nexus/run.rs index 98982f5e..f6510282 100644 --- a/nexus-writer/src/nexus/run.rs +++ b/nexus-writer/src/nexus/run.rs @@ -27,6 +27,22 @@ impl Run { let mut hdf5 = RunFile::new_runfile(filename, ¶meters.run_name, nexus_settings)?; hdf5.init(¶meters, nexus_configuration)?; hdf5.close()?; + + parameters.save_partial_run(filename)?; + } + Ok(Self { + span: Default::default(), + parameters, + num_frames: usize::default(), + }) + } + + pub(crate) fn resume_partial_run( + filename: Option<&Path>, + parameters: RunParameters + ) -> anyhow::Result { + if let Some(filename) = filename { + parameters.save_partial_run(filename)?; } Ok(Self { span: Default::default(), @@ -48,6 +64,7 @@ impl Run { create_dir_all(archive_name)?; let from_path = RunParameters::get_hdf5_path_buf(file_name, &self.parameters.run_name); let to_path = RunParameters::get_hdf5_path_buf(archive_name, &self.parameters.run_name); + let partial_run_path = RunParameters::get_partial_path_buf(file_name, &self.parameters.run_name); let span = tracing::Span::current(); let future = async move { info_span!(parent: &span, "move-async").in_scope(|| { @@ -58,6 +75,9 @@ impl Run { if let Err(e) = std::fs::remove_file(from_path) { warn!("Error removing temporary file: {e}"); } + if let Err(e) = std::fs::remove_file(partial_run_path) { + warn!("Error removing partial_run file: {e}"); + } }); }; Ok(future) From c90fa03b8d4bfee747248501e2c807fc7d407fb4 Mon Sep 17 00:00:00 2001 From: Modularius Date: Thu, 21 Nov 2024 14:15:23 +0000 Subject: [PATCH 06/11] Formatting --- nexus-writer/src/nexus/engine.rs | 10 ++++------ nexus-writer/src/nexus/run.rs | 5 +++-- 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/nexus-writer/src/nexus/engine.rs b/nexus-writer/src/nexus/engine.rs index bff95cf2..a4e1b47b 100644 --- a/nexus-writer/src/nexus/engine.rs +++ b/nexus-writer/src/nexus/engine.rs @@ -53,7 +53,7 @@ impl NexusEngine { if path.extension().is_some_and(|path_ext| path_ext == ext) { path.file_stem() .and_then(|stem| stem.to_os_string().into_string().ok()) - .map(|stem| Ok(stem)) + .map(Ok) } else { None } @@ -68,13 +68,11 @@ impl NexusEngine { pub(crate) fn detect_partial_run(&mut self) -> anyhow::Result<()> { if let Some(local_path) = &self.filename { for filename in Self::get_files_in_dir(local_path, "partial_run")? { - if let Some(mut partial_run) = RunParameters::detect_partial_run(local_path, &filename)? + if let Some(mut partial_run) = + RunParameters::detect_partial_run(local_path, &filename)? { partial_run.update_last_modified(); - let mut run = Run::resume_partial_run( - self.filename.as_deref(), - partial_run - )?; + let mut run = Run::resume_partial_run(self.filename.as_deref(), partial_run)?; if let Err(e) = run.span_init() { warn!("Run span initiation failed {e}") } diff --git a/nexus-writer/src/nexus/run.rs b/nexus-writer/src/nexus/run.rs index f6510282..ce772ce6 100644 --- a/nexus-writer/src/nexus/run.rs +++ b/nexus-writer/src/nexus/run.rs @@ -39,7 +39,7 @@ impl Run { pub(crate) fn resume_partial_run( filename: Option<&Path>, - parameters: RunParameters + parameters: RunParameters, ) -> anyhow::Result { if let Some(filename) = filename { parameters.save_partial_run(filename)?; @@ -64,7 +64,8 @@ impl Run { create_dir_all(archive_name)?; let from_path = RunParameters::get_hdf5_path_buf(file_name, &self.parameters.run_name); let to_path = RunParameters::get_hdf5_path_buf(archive_name, &self.parameters.run_name); - let partial_run_path = RunParameters::get_partial_path_buf(file_name, &self.parameters.run_name); + let partial_run_path = + RunParameters::get_partial_path_buf(file_name, &self.parameters.run_name); let span = tracing::Span::current(); let future = async move { info_span!(parent: &span, "move-async").in_scope(|| { From c0021bd9c402989225ebdfa7e94d81345d88278d Mon Sep 17 00:00:00 2001 From: Modularius Date: Wed, 27 Nov 2024 21:10:48 +0000 Subject: [PATCH 07/11] Begin encapsulate partial_run into nexus file --- nexus-writer/src/nexus/engine.rs | 2 +- nexus-writer/src/nexus/hdf5_file/run_file.rs | 108 ++++++++++++------- nexus-writer/src/nexus/run.rs | 2 +- nexus-writer/src/nexus/run_parameters.rs | 3 + 4 files changed, 72 insertions(+), 43 deletions(-) diff --git a/nexus-writer/src/nexus/engine.rs b/nexus-writer/src/nexus/engine.rs index ba4ebd5f..6a7871ef 100644 --- a/nexus-writer/src/nexus/engine.rs +++ b/nexus-writer/src/nexus/engine.rs @@ -67,7 +67,7 @@ impl NexusEngine { pub(crate) fn detect_partial_run(&mut self) -> anyhow::Result<()> { if let Some(local_path) = &self.filename { - for filename in Self::get_files_in_dir(local_path, "partial_run")? { + for filename in Self::get_files_in_dir(local_path, "nxs")? { if let Some(mut partial_run) = RunParameters::detect_partial_run(local_path, &filename)? { diff --git a/nexus-writer/src/nexus/hdf5_file/run_file.rs b/nexus-writer/src/nexus/hdf5_file/run_file.rs index 4d087959..93aab273 100644 --- a/nexus-writer/src/nexus/hdf5_file/run_file.rs +++ b/nexus-writer/src/nexus/hdf5_file/run_file.rs @@ -4,7 +4,9 @@ use super::{ }; use crate::nexus::{ hdf5_file::run_file_components::{RunLog, SeLog}, - nexus_class as NX, NexusConfiguration, NexusSettings, RunParameters, DATETIME_FORMAT, + nexus_class as NX, + run_parameters::RunStopParameters, + NexusConfiguration, NexusSettings, RunParameters, DATETIME_FORMAT, }; use chrono::{DateTime, Duration, Utc}; use hdf5::{types::VarLenUnicode, Dataset, File}; @@ -248,44 +250,44 @@ impl RunFile { set_string_to(&self.end_time, &end_time)?; Ok(()) } - - #[tracing::instrument(skip_all, level = "trace", err(level = "warn"))] - pub(crate) fn ensure_end_time_is_set( - &mut self, - parameters: &RunParameters, - message: &FrameAssembledEventListMessage, - ) -> anyhow::Result<()> { - let end_time = { - if let Some(run_stop_parameters) = ¶meters.run_stop_parameters { - run_stop_parameters.collect_until - } else { - let time = message - .time() - .ok_or(anyhow::anyhow!("Event time missing."))?; - - let ms = if time.is_empty() { - 0 - } else { - time.get(time.len() - 1).div_ceil(1_000_000).into() - }; - - let duration = Duration::try_milliseconds(ms) - .ok_or(anyhow::anyhow!("Invalid duration {ms}ms."))?; - - let timestamp: DateTime = (*message - .metadata() - .timestamp() - .ok_or(anyhow::anyhow!("Message timestamp missing."))?) - .try_into()?; - - timestamp - .checked_add_signed(duration) - .ok_or(anyhow::anyhow!("Unable to add {duration} to {timestamp}"))? - } - }; - self.set_end_time(&end_time) - } - + /* + #[tracing::instrument(skip_all, level = "trace", err(level = "warn"))] + pub(crate) fn ensure_end_time_is_set( + &mut self, + parameters: &RunParameters, + message: &FrameAssembledEventListMessage, + ) -> anyhow::Result<()> { + let end_time = { + if let Some(run_stop_parameters) = ¶meters.run_stop_parameters { + run_stop_parameters.collect_until + } else { + let time = message + .time() + .ok_or(anyhow::anyhow!("Event time missing."))?; + + let ms = if time.is_empty() { + 0 + } else { + time.get(time.len() - 1).div_ceil(1_000_000).into() + }; + + let duration = Duration::try_milliseconds(ms) + .ok_or(anyhow::anyhow!("Invalid duration {ms}ms."))?; + + let timestamp: DateTime = (*message + .metadata() + .timestamp() + .ok_or(anyhow::anyhow!("Message timestamp missing."))?) + .try_into()?; + + timestamp + .checked_add_signed(duration) + .ok_or(anyhow::anyhow!("Unable to add {duration} to {timestamp}"))? + } + }; + self.set_end_time(&end_time) + } + */ #[tracing::instrument(skip_all, level = "trace", err(level = "warn"))] pub(crate) fn push_logdata_to_runfile( &mut self, @@ -313,13 +315,37 @@ impl RunFile { #[tracing::instrument(skip_all, level = "trace", err(level = "warn"))] pub(crate) fn push_message_to_runfile( &mut self, - parameters: &RunParameters, + //parameters: &RunParameters, message: &FrameAssembledEventListMessage, ) -> anyhow::Result<()> { self.lists.push_message_to_event_runfile(message)?; - self.ensure_end_time_is_set(parameters, message)?; + //self.ensure_end_time_is_set(parameters, message)?; Ok(()) } + #[tracing::instrument(skip_all, level = "trace", err(level = "warn"))] + pub(crate) fn extract_run_parameters(&self) -> anyhow::Result { + let collect_from: DateTime = + String::from(self.start_time.read_scalar::()?).parse()?; + let run_name = self.name.read_scalar::()?.into(); + let run_number = self.run_number.read_scalar::()?; + let num_periods = self.period_number.read_scalar::()?; + let instrument_name = self.instrument_name.read_scalar::()?.into(); + let run_stop_parameters = String::from(self.end_time.read_scalar::()?) + .parse() + .ok() + .map(|collect_until| RunStopParameters { + collect_until, + last_modified: Utc::now(), + }); + Ok(RunParameters { + collect_from, + run_stop_parameters, + num_periods, + run_name, + run_number, + instrument_name, + }) + } #[tracing::instrument(skip_all, level = "trace", err(level = "warn"))] pub(crate) fn close(self) -> anyhow::Result<()> { diff --git a/nexus-writer/src/nexus/run.rs b/nexus-writer/src/nexus/run.rs index ce772ce6..8be4343e 100644 --- a/nexus-writer/src/nexus/run.rs +++ b/nexus-writer/src/nexus/run.rs @@ -142,7 +142,7 @@ impl Run { ) -> anyhow::Result<()> { if let Some(filename) = filename { let mut hdf5 = RunFile::open_runfile(filename, &self.parameters.run_name)?; - hdf5.push_message_to_runfile(&self.parameters, message)?; + hdf5.push_message_to_runfile(message)?; //&self.parameters, hdf5.close()?; } diff --git a/nexus-writer/src/nexus/run_parameters.rs b/nexus-writer/src/nexus/run_parameters.rs index ec1a2553..8fc351cc 100644 --- a/nexus-writer/src/nexus/run_parameters.rs +++ b/nexus-writer/src/nexus/run_parameters.rs @@ -9,6 +9,8 @@ use supermusr_streaming_types::{ ecs_6s4t_run_stop_generated::RunStop, ecs_pl72_run_start_generated::RunStart, }; +use super::hdf5_file::RunFile; + #[derive(Default, Debug, Clone, Serialize, Deserialize)] pub(crate) struct RunStopParameters { pub(crate) collect_until: DateTime, @@ -115,6 +117,7 @@ impl RunParameters { } pub(crate) fn detect_partial_run(path: &Path, filename: &str) -> anyhow::Result> { + let run_file = RunFile::open_runfile(path, filename)?; let path_buf = Self::get_partial_path_buf(path, filename); if path_buf.as_path().exists() { let file = File::open(path_buf.as_path())?; From 78ad764374f732c7564ac7037c076b7945801944 Mon Sep 17 00:00:00 2001 From: Modularius Date: Wed, 27 Nov 2024 21:54:19 +0000 Subject: [PATCH 08/11] Finished modifications --- Cargo.lock | 2 - nexus-writer/Cargo.toml | 2 - nexus-writer/Test1.nxs | Bin 0 -> 13504 bytes nexus-writer/TestRun1.nxs | Bin 0 -> 13504 bytes nexus-writer/src/main.rs | 4 +- nexus-writer/src/nexus/engine.rs | 62 +++++++---------- nexus-writer/src/nexus/hdf5_file/run_file.rs | 3 +- nexus-writer/src/nexus/run.rs | 66 ++++++++----------- nexus-writer/src/nexus/run_parameters.rs | 34 +++++----- 9 files changed, 71 insertions(+), 102 deletions(-) create mode 100644 nexus-writer/Test1.nxs create mode 100644 nexus-writer/TestRun1.nxs diff --git a/Cargo.lock b/Cargo.lock index 67c83d43..b16e29c1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1740,8 +1740,6 @@ dependencies = [ "metrics-exporter-prometheus", "ndarray", "rdkafka", - "serde", - "serde_json", "supermusr-common", "supermusr-streaming-types", "tokio", diff --git a/nexus-writer/Cargo.toml b/nexus-writer/Cargo.toml index ee4f42a2..6094a961 100644 --- a/nexus-writer/Cargo.toml +++ b/nexus-writer/Cargo.toml @@ -13,8 +13,6 @@ metrics.workspace = true metrics-exporter-prometheus.workspace = true ndarray.workspace = true rdkafka.workspace = true -serde.workspace = true -serde_json.workspace = true supermusr-common.workspace = true supermusr-streaming-types.workspace = true tokio.workspace = true diff --git a/nexus-writer/Test1.nxs b/nexus-writer/Test1.nxs new file mode 100644 index 0000000000000000000000000000000000000000..d6a58f225b2c96652dc3d337495d5b6c95274714 GIT binary patch literal 13504 zcmeHN%}(1u5FQ681_7c{4}t!LwyJ0=C`p4#Ikn=asZ;@KspMNo8v~ZWL5^wZt;h7n zu}6*^^9UR{@(6td-k{yxnTcboj;R6?klll4JUbrmex4bx*YmS5zc|=G+7GgsCUnEF zV)^ig+EE@Qe9rxA#27?tL=RKQ7|;u{FHQ1cvY+Wf!699^`e9)KgeKc(N|!YZm<@qu zIthWo!kZO2UyCNq{&Y-!##nfzZ^d%SFBP*WVZVIM3jaxOlD|g%dH@Q_k^y~i3rtwD zieI)$)he4GF${I3j3yiddM=NrS1OV-OIcuVP}vh48TlaK`V zhH!kgwOeg6m}F0yV|y}G1AG0--nz487v0jftrU~T*VkmPJIC$*qxK*1Aon8?QO2=L>;_9-ZnGf788wg|p(o|3R7$Cu^rxiD;6EU|qoUNs z;eeS-P0_Eq8-7(7X+2!t&p)``T5pHC55`*S6{!2qd?-M zdaUuMsA#q}qkFF@0$e9y(fs%;!r}lRIhP1=ycF8RgV!_BIBs`6qv{99nPsTyyc&K` z(S4%ylW@O2I~xL+S0T3k>}*!F)^$Z7npgS#TBq+eo;T;y6&9QmeK)_@R3!>fw$~lk z@u>8GfsD!RMdpKT8ZVDHH~Dp`G1bjsrBtUIc!>X$sf7!-KgZn^0U zfZr3o<8R5D(r^_Q09I{@`>QIgDsE;8z4|%L`=ZSxt*7=xEuC!qyzxvQI|YGg-1B&f2GpuBSF>pNyZgre{OoFN8#c_y7O^ literal 0 HcmV?d00001 diff --git a/nexus-writer/TestRun1.nxs b/nexus-writer/TestRun1.nxs new file mode 100644 index 0000000000000000000000000000000000000000..47b3a5581b05de512db005eca541ae842a6357bd GIT binary patch literal 13504 zcmeHN%}(1u5FQ681_7c{4?zDyTUE3bl!TxHr&jzll`24#O1_1(F<=QC4Uv41>?b==a6}ibuPrWu&}92e>9U#uvnJ3? zJ0Xx=e6uR&Ytf|HpH9fn7z;1|HCM>{`CJMm?3b@u;XmnZ^4G9m_d!-!GN1?UfCKDXZJ-SN23$PUpVZUdbuCQZ>%rEvZEIBqViv z12{h0Dwi4z#@UnR*q#j4y1iayZ`0Yab8dduR*LcC>ua#r8D-CR&TVhB!Csf(4e0o1 zo~bCH0k);SXqaabk$J`%LA@*zIDLXlh0Zm(*$8l+*`PqJ(sa!UWTO16I)?r1XHj~; z9tQQuJaC>llffa{Ao*=Q?pn_?Do3FC+VSoD%6m0cdhOG+#akPj$ZKA)=mV!&wvn1h zWhSOTL|w4a@I}7Bwr-dFwX&O3XKimRELnid#K@$@gr} zDHcc!X9LV>su9o#XaqC@8Uc-fMnEH=5zq)|1T+E~f$NXJhvk(8L+qUK_wfmR!1F#p zct7I$&-ij?U_h7Jf5d~_k3>Wn#}2U@EP1)jf)FRwK)QsUlqV7irE1cjk`9Cafbfor zQWu9qX09|vzv`^{Rd%@faCtxf@NRRxJ?cIjZLXK4?$Za&^{#qvyk89||B2equltu$ z6VC&YuKO2l{MCT?S>vuEqwxqnm3TbbEAPbP(O&Vw{}uLLpx+#fBW?A+f_l5rqCt|V z{~eIV`)wnMJ?iHdgh=C|@dzRiJk$OW!~@rzCLEA>l71Mf#kG(PJ;jchF{Iy-K=lzZ z5#)&E*(5S-Z~G;lSE$~E%tG}ZT@XQLNPc`jLV?7GM6J*~!ufoI^bV55M34%}j~_=U zkT|X$YwS5Hn$6AV-W!Sl*GX73KmLlaH~>h_B|;o8*%tBO^-MU9TV2mI)+fnO(|I-c zpu+n^L(hW!`s{oNU|xmD`m^&{(ONeZfpA{s_iL5D+j!obPghxRLG<0iQbUy}K*8R0 zT*s#>RDeCNxaH+{Yo(u}nkwg(cR$%44#3`*byTwYD(4i$(b;rVSIN(Ney(2NX}g7n zGXQ>1_>R9LYf6JvTmV?LCGM}Pv`V;{CG@HnH1Eqc6Stn)7qxV<@$<$reQXy5!g0^* xsU!NXU>rEsIQ}ncJr(|rX>~oN{Ft0s)>_7|r`l_uGQ6JJrhPJg&YGSNfj`pgg*N~I literal 0 HcmV?d00001 diff --git a/nexus-writer/src/main.rs b/nexus-writer/src/main.rs index 82db7e00..42d1386d 100644 --- a/nexus-writer/src/main.rs +++ b/nexus-writer/src/main.rs @@ -163,11 +163,11 @@ async fn main() -> anyhow::Result<()> { let nexus_configuration = NexusConfiguration::new(args.configuration_options); let mut nexus_engine = NexusEngine::new( - Some(args.file_name.as_path()), + args.file_name.as_path(), nexus_settings, nexus_configuration, ); - nexus_engine.detect_partial_run()?; + nexus_engine.detect_partial_runs()?; let mut nexus_write_interval = tokio::time::interval(time::Duration::from_millis(args.cache_poll_interval_ms)); diff --git a/nexus-writer/src/nexus/engine.rs b/nexus-writer/src/nexus/engine.rs index 654ef967..9fec0a21 100644 --- a/nexus-writer/src/nexus/engine.rs +++ b/nexus-writer/src/nexus/engine.rs @@ -16,7 +16,7 @@ use supermusr_streaming_types::{ use tracing::warn; pub(crate) struct NexusEngine { - filename: Option, + local_path: PathBuf, run_cache: VecDeque, run_number: u32, nexus_settings: NexusSettings, @@ -27,12 +27,12 @@ pub(crate) struct NexusEngine { impl NexusEngine { #[tracing::instrument(skip_all)] pub(crate) fn new( - filename: Option<&Path>, + local_path: &Path, nexus_settings: NexusSettings, nexus_configuration: NexusConfiguration, ) -> Self { Self { - filename: filename.map(ToOwned::to_owned), + local_path: local_path.to_owned(), run_cache: Default::default(), run_number: 0, nexus_settings, @@ -65,20 +65,13 @@ impl NexusEngine { Ok(vec) } - pub(crate) fn detect_partial_run(&mut self) -> anyhow::Result<()> { - if let Some(local_path) = &self.filename { - for filename in Self::get_files_in_dir(local_path, "nxs")? { - if let Some(mut partial_run) = - RunParameters::detect_partial_run(local_path, &filename)? - { - partial_run.update_last_modified(); - let mut run = Run::resume_partial_run(self.filename.as_deref(), partial_run)?; - if let Err(e) = run.span_init() { - warn!("Run span initiation failed {e}") - } - self.run_cache.push_back(run); - } + pub(crate) fn detect_partial_runs(&mut self) -> anyhow::Result<()> { + for filename in Self::get_files_in_dir(&self.local_path, "nxs")? { + let mut run = Run::resume_partial_run(&self.local_path, &filename)?; + if let Err(e) = run.span_init() { + warn!("Run span initiation failed {e}") } + self.run_cache.push_back(run); } Ok(()) } @@ -103,7 +96,7 @@ impl NexusEngine { .iter_mut() .find(|run| run.is_message_timestamp_valid(×tamp)) { - run.push_selogdata(self.filename.as_deref(), data, &self.nexus_settings)?; + run.push_selogdata(&self.local_path, data, &self.nexus_settings)?; Ok(Some(run)) } else { warn!("No run found for selogdata message with timestamp: {timestamp}"); @@ -119,7 +112,7 @@ impl NexusEngine { .iter_mut() .find(|run| run.is_message_timestamp_valid(×tamp)) { - run.push_logdata_to_run(self.filename.as_deref(), data, &self.nexus_settings)?; + run.push_logdata_to_run(&self.local_path, data, &self.nexus_settings)?; Ok(Some(run)) } else { warn!("No run found for logdata message with timestamp: {timestamp}"); @@ -135,7 +128,7 @@ impl NexusEngine { .iter_mut() .find(|run| run.is_message_timestamp_valid(×tamp)) { - run.push_alarm_to_run(self.filename.as_deref(), data)?; + run.push_alarm_to_run(&self.local_path, data)?; Ok(Some(run)) } else { warn!("No run found for alarm message with timestamp: {timestamp}"); @@ -152,7 +145,7 @@ impl NexusEngine { } let mut run = Run::new_run( - self.filename.as_deref(), + &self.local_path, RunParameters::new(data, self.run_number)?, &self.nexus_settings, &self.nexus_configuration, @@ -175,18 +168,14 @@ impl NexusEngine { self.run_cache .back_mut() .expect("run_cache::back_mut should exist") - .abort_run( - self.filename.as_deref(), - data.start_time(), - &self.nexus_settings, - )?; + .abort_run(&self.local_path, data.start_time(), &self.nexus_settings)?; Ok(()) } #[tracing::instrument(skip_all)] pub(crate) fn stop_command(&mut self, data: RunStop<'_>) -> anyhow::Result<&Run> { if let Some(last_run) = self.run_cache.back_mut() { - last_run.set_stop_if_valid(self.filename.as_deref(), data)?; + last_run.set_stop_if_valid(&self.local_path, data)?; Ok(last_run) } else { @@ -220,7 +209,7 @@ impl NexusEngine { .iter_mut() .find(|run| run.is_message_timestamp_valid(×tamp)) { - run.push_message(self.filename.as_deref(), message)?; + run.push_message(&self.local_path, message)?; Some(run) } else { warn!("No run found for message with timestamp: {timestamp}"); @@ -253,12 +242,9 @@ impl NexusEngine { /// Afterwhich the runs are dropped. #[tracing::instrument(skip_all, level = "debug")] pub(crate) async fn flush_move_cache(&mut self) { - if let Some((file_name, archive_name)) = Option::zip( - self.filename.as_ref(), - self.nexus_settings.archive_path.as_ref(), - ) { + if let Some(archive_name) = self.nexus_settings.archive_path.as_ref() { for run in self.run_move_cache.iter() { - match run.move_to_archive(file_name, archive_name) { + match run.move_to_archive(&self.local_path, archive_name) { Ok(move_to_archive) => move_to_archive.await, Err(e) => warn!("Error Moving to Archive {e}"), } @@ -270,6 +256,8 @@ impl NexusEngine { #[cfg(test)] mod test { + use std::path::PathBuf; + use crate::nexus::{NexusConfiguration, NexusSettings}; use super::NexusEngine; @@ -349,7 +337,7 @@ mod test { #[test] fn empty_run() { let mut nexus = NexusEngine::new( - None, + &PathBuf::new(), NexusSettings::default(), NexusConfiguration::new(None), ); @@ -397,7 +385,7 @@ mod test { #[test] fn no_run_start() { let mut nexus = NexusEngine::new( - None, + &PathBuf::new(), NexusSettings::default(), NexusConfiguration::new(None), ); @@ -410,7 +398,7 @@ mod test { #[test] fn no_run_stop() { let mut nexus = NexusEngine::new( - None, + &PathBuf::new(), NexusSettings::default(), NexusConfiguration::new(None), ); @@ -429,7 +417,7 @@ mod test { #[test] fn frame_messages_correct() { let mut nexus = NexusEngine::new( - None, + &PathBuf::new(), NexusSettings::default(), NexusConfiguration::new(None), ); @@ -467,7 +455,7 @@ mod test { #[test] fn two_runs_flushed() { let mut nexus = NexusEngine::new( - None, + &PathBuf::new(), NexusSettings::default(), NexusConfiguration::new(None), ); diff --git a/nexus-writer/src/nexus/hdf5_file/run_file.rs b/nexus-writer/src/nexus/hdf5_file/run_file.rs index 4de9279c..4fb2c1e0 100644 --- a/nexus-writer/src/nexus/hdf5_file/run_file.rs +++ b/nexus-writer/src/nexus/hdf5_file/run_file.rs @@ -8,7 +8,7 @@ use crate::nexus::{ run_parameters::RunStopParameters, NexusConfiguration, NexusSettings, RunParameters, DATETIME_FORMAT, }; -use chrono::{DateTime, Duration, Utc}; +use chrono::{DateTime, Utc}; use hdf5::{types::VarLenUnicode, Dataset, File}; use std::{fs::create_dir_all, path::Path}; use supermusr_streaming_types::{ @@ -322,6 +322,7 @@ impl RunFile { //self.ensure_end_time_is_set(parameters, message)?; Ok(()) } + #[tracing::instrument(skip_all, level = "trace", err(level = "warn"))] pub(crate) fn extract_run_parameters(&self) -> anyhow::Result { let collect_from: DateTime = diff --git a/nexus-writer/src/nexus/run.rs b/nexus-writer/src/nexus/run.rs index ad9b09d2..ea7ce71a 100644 --- a/nexus-writer/src/nexus/run.rs +++ b/nexus-writer/src/nexus/run.rs @@ -12,42 +12,34 @@ use tracing::{info, info_span, warn, Span}; pub(crate) struct Run { span: SpanOnce, parameters: RunParameters, - num_frames: usize, } impl Run { #[tracing::instrument(skip_all, level = "debug", err(level = "warn"))] pub(crate) fn new_run( - filename: Option<&Path>, + local_path: &Path, parameters: RunParameters, nexus_settings: &NexusSettings, nexus_configuration: &NexusConfiguration, ) -> anyhow::Result { - if let Some(filename) = filename { - let mut hdf5 = RunFile::new_runfile(filename, ¶meters.run_name, nexus_settings)?; + if local_path.exists() { + let mut hdf5 = RunFile::new_runfile(local_path, ¶meters.run_name, nexus_settings)?; hdf5.init(¶meters, nexus_configuration)?; hdf5.close()?; - - parameters.save_partial_run(filename)?; } + Ok(Self { span: Default::default(), parameters, - num_frames: usize::default(), }) } - pub(crate) fn resume_partial_run( - filename: Option<&Path>, - parameters: RunParameters, - ) -> anyhow::Result { - if let Some(filename) = filename { - parameters.save_partial_run(filename)?; - } + pub(crate) fn resume_partial_run(local_path: &Path, filename: &str) -> anyhow::Result { + let run = RunFile::open_runfile(local_path, filename)?; + let parameters = run.extract_run_parameters()?; Ok(Self { span: Default::default(), parameters, - num_frames: usize::default(), }) } @@ -62,10 +54,10 @@ impl Run { archive_name: &Path, ) -> io::Result> { create_dir_all(archive_name)?; + let from_path = RunParameters::get_hdf5_path_buf(file_name, &self.parameters.run_name); let to_path = RunParameters::get_hdf5_path_buf(archive_name, &self.parameters.run_name); - let partial_run_path = - RunParameters::get_partial_path_buf(file_name, &self.parameters.run_name); + let span = tracing::Span::current(); let future = async move { info_span!(parent: &span, "move-async").in_scope(|| { @@ -76,9 +68,6 @@ impl Run { if let Err(e) = std::fs::remove_file(from_path) { warn!("Error removing temporary file: {e}"); } - if let Err(e) = std::fs::remove_file(partial_run_path) { - warn!("Error removing partial_run file: {e}"); - } }); }; Ok(future) @@ -87,12 +76,12 @@ impl Run { #[tracing::instrument(skip_all, level = "debug", err(level = "warn"))] pub(crate) fn push_logdata_to_run( &mut self, - filename: Option<&Path>, + local_path: &Path, logdata: &f144_LogData, nexus_settings: &NexusSettings, ) -> anyhow::Result<()> { - if let Some(filename) = filename { - let mut hdf5 = RunFile::open_runfile(filename, &self.parameters.run_name)?; + if local_path.exists() { + let mut hdf5 = RunFile::open_runfile(local_path, &self.parameters.run_name)?; hdf5.push_logdata_to_runfile(logdata, nexus_settings)?; hdf5.close()?; } @@ -104,11 +93,11 @@ impl Run { #[tracing::instrument(skip_all, level = "debug", err(level = "warn"))] pub(crate) fn push_alarm_to_run( &mut self, - filename: Option<&Path>, + local_path: &Path, alarm: Alarm, ) -> anyhow::Result<()> { - if let Some(filename) = filename { - let mut hdf5 = RunFile::open_runfile(filename, &self.parameters.run_name)?; + if local_path.exists() { + let mut hdf5 = RunFile::open_runfile(local_path, &self.parameters.run_name)?; hdf5.push_alarm_to_runfile(alarm)?; hdf5.close()?; } @@ -120,12 +109,12 @@ impl Run { #[tracing::instrument(skip_all, level = "debug", err(level = "warn"))] pub(crate) fn push_selogdata( &mut self, - filename: Option<&Path>, + local_path: &Path, logdata: se00_SampleEnvironmentData, nexus_settings: &NexusSettings, ) -> anyhow::Result<()> { - if let Some(filename) = filename { - let mut hdf5 = RunFile::open_runfile(filename, &self.parameters.run_name)?; + if local_path.exists() { + let mut hdf5 = RunFile::open_runfile(local_path, &self.parameters.run_name)?; hdf5.push_selogdata(logdata, nexus_settings)?; hdf5.close()?; } @@ -137,16 +126,15 @@ impl Run { #[tracing::instrument(skip_all, level = "debug", err(level = "warn"))] pub(crate) fn push_message( &mut self, - filename: Option<&Path>, + local_path: &Path, message: &FrameAssembledEventListMessage, ) -> anyhow::Result<()> { - if let Some(filename) = filename { - let mut hdf5 = RunFile::open_runfile(filename, &self.parameters.run_name)?; + if local_path.exists() { + let mut hdf5 = RunFile::open_runfile(local_path, &self.parameters.run_name)?; hdf5.push_message_to_runfile(message)?; //&self.parameters, hdf5.close()?; } - self.num_frames += 1; self.parameters.update_last_modified(); Ok(()) } @@ -162,13 +150,13 @@ impl Run { pub(crate) fn set_stop_if_valid( &mut self, - filename: Option<&Path>, + local_path: &Path, data: RunStop<'_>, ) -> anyhow::Result<()> { self.parameters.set_stop_if_valid(data)?; - if let Some(filename) = filename { - let mut hdf5 = RunFile::open_runfile(filename, &self.parameters.run_name)?; + if local_path.exists() { + let mut hdf5 = RunFile::open_runfile(local_path, &self.parameters.run_name)?; hdf5.set_end_time( &self @@ -185,14 +173,14 @@ impl Run { pub(crate) fn abort_run( &mut self, - filename: Option<&Path>, + local_path: &Path, absolute_stop_time_ms: u64, nexus_settings: &NexusSettings, ) -> anyhow::Result<()> { self.parameters.set_aborted_run(absolute_stop_time_ms)?; - if let Some(filename) = filename { - let mut hdf5 = RunFile::open_runfile(filename, &self.parameters.run_name)?; + if local_path.exists() { + let mut hdf5 = RunFile::open_runfile(local_path, &self.parameters.run_name)?; let collect_until = self .parameters diff --git a/nexus-writer/src/nexus/run_parameters.rs b/nexus-writer/src/nexus/run_parameters.rs index 471aa543..bc91a97c 100644 --- a/nexus-writer/src/nexus/run_parameters.rs +++ b/nexus-writer/src/nexus/run_parameters.rs @@ -1,23 +1,17 @@ -use std::{ - fs::File, - path::{Path, PathBuf}, -}; +use std::path::{Path, PathBuf}; use chrono::{DateTime, Utc}; -use serde::{Deserialize, Serialize}; use supermusr_streaming_types::{ ecs_6s4t_run_stop_generated::RunStop, ecs_pl72_run_start_generated::RunStart, }; -use super::hdf5_file::RunFile; - -#[derive(Default, Debug, Clone, Serialize, Deserialize)] +#[derive(Default, Debug, Clone)] pub(crate) struct RunStopParameters { pub(crate) collect_until: DateTime, pub(crate) last_modified: DateTime, } -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone)] pub(crate) struct RunParameters { pub(crate) collect_from: DateTime, pub(crate) run_stop_parameters: Option, @@ -118,21 +112,23 @@ impl RunParameters { path.set_extension("nxs"); path } - - pub(crate) fn get_partial_path_buf(path: &Path, run_name: &str) -> PathBuf { - let mut path = path.to_owned(); - path.push(run_name); - path.set_extension("partial_run"); - path - } - + /* + pub(crate) fn get_partial_path_buf(path: &Path, run_name: &str) -> PathBuf { + let mut path = path.to_owned(); + path.push(run_name); + path.set_extension("partial_run"); + path + } + */ + /* pub(crate) fn save_partial_run(&self, path: &Path) -> anyhow::Result<()> { let path_buf = Self::get_partial_path_buf(path, &self.run_name); let file = File::create(path_buf.as_path())?; serde_json::to_writer(file, &self)?; Ok(()) } - + */ + /* pub(crate) fn detect_partial_run(path: &Path, filename: &str) -> anyhow::Result> { let run_file = RunFile::open_runfile(path, filename)?; let path_buf = Self::get_partial_path_buf(path, filename); @@ -144,5 +140,5 @@ impl RunParameters { } else { Ok(None) } - } + } */ } From e007a6f8b429fa682754718fb216a4579b601730 Mon Sep 17 00:00:00 2001 From: Modularius Date: Thu, 28 Nov 2024 01:01:38 +0000 Subject: [PATCH 09/11] Restore optional local_path --- nexus-writer/src/main.rs | 6 +- nexus-writer/src/nexus/engine.rs | 60 +++++++++++--------- nexus-writer/src/nexus/hdf5_file/run_file.rs | 9 +-- nexus-writer/src/nexus/run.rs | 32 +++++------ 4 files changed, 55 insertions(+), 52 deletions(-) diff --git a/nexus-writer/src/main.rs b/nexus-writer/src/main.rs index 42d1386d..41cde6e7 100644 --- a/nexus-writer/src/main.rs +++ b/nexus-writer/src/main.rs @@ -163,11 +163,11 @@ async fn main() -> anyhow::Result<()> { let nexus_configuration = NexusConfiguration::new(args.configuration_options); let mut nexus_engine = NexusEngine::new( - args.file_name.as_path(), + Some(args.file_name.as_path()), nexus_settings, nexus_configuration, ); - nexus_engine.detect_partial_runs()?; + nexus_engine.resume_partial_runs()?; let mut nexus_write_interval = tokio::time::interval(time::Duration::from_millis(args.cache_poll_interval_ms)); @@ -390,7 +390,7 @@ fn process_run_stop_message(nexus_engine: &mut NexusEngine, payload: &[u8]) { } Err(e) => { let _guard = warn_span!( - "RunStop Error.", + "RunStop Error", run_name = data.run_name(), stop_time = data.stop_time(), ) diff --git a/nexus-writer/src/nexus/engine.rs b/nexus-writer/src/nexus/engine.rs index 9fec0a21..1ff9822f 100644 --- a/nexus-writer/src/nexus/engine.rs +++ b/nexus-writer/src/nexus/engine.rs @@ -13,10 +13,10 @@ use supermusr_streaming_types::{ ecs_f144_logdata_generated::f144_LogData, ecs_pl72_run_start_generated::RunStart, ecs_se00_data_generated::se00_SampleEnvironmentData, }; -use tracing::warn; +use tracing::{info_span, warn}; pub(crate) struct NexusEngine { - local_path: PathBuf, + local_path: Option, run_cache: VecDeque, run_number: u32, nexus_settings: NexusSettings, @@ -27,12 +27,12 @@ pub(crate) struct NexusEngine { impl NexusEngine { #[tracing::instrument(skip_all)] pub(crate) fn new( - local_path: &Path, + local_path: Option<&Path>, nexus_settings: NexusSettings, nexus_configuration: NexusConfiguration, ) -> Self { Self { - local_path: local_path.to_owned(), + local_path: local_path.map(ToOwned::to_owned), run_cache: Default::default(), run_number: 0, nexus_settings, @@ -65,13 +65,16 @@ impl NexusEngine { Ok(vec) } - pub(crate) fn detect_partial_runs(&mut self) -> anyhow::Result<()> { - for filename in Self::get_files_in_dir(&self.local_path, "nxs")? { - let mut run = Run::resume_partial_run(&self.local_path, &filename)?; - if let Err(e) = run.span_init() { - warn!("Run span initiation failed {e}") + pub(crate) fn resume_partial_runs(&mut self) -> anyhow::Result<()> { + if let Some(local_path) = &self.local_path { + for filename in Self::get_files_in_dir(local_path, "nxs")? { + let mut run = info_span!("Partial Run Found", path = filename) + .in_scope(|| Run::resume_partial_run(local_path, &filename))?; + if let Err(e) = run.span_init() { + warn!("Run span initiation failed {e}") + } + self.run_cache.push_back(run); } - self.run_cache.push_back(run); } Ok(()) } @@ -96,7 +99,7 @@ impl NexusEngine { .iter_mut() .find(|run| run.is_message_timestamp_valid(×tamp)) { - run.push_selogdata(&self.local_path, data, &self.nexus_settings)?; + run.push_selogdata(self.local_path.as_deref(), data, &self.nexus_settings)?; Ok(Some(run)) } else { warn!("No run found for selogdata message with timestamp: {timestamp}"); @@ -112,7 +115,7 @@ impl NexusEngine { .iter_mut() .find(|run| run.is_message_timestamp_valid(×tamp)) { - run.push_logdata_to_run(&self.local_path, data, &self.nexus_settings)?; + run.push_logdata_to_run(self.local_path.as_deref(), data, &self.nexus_settings)?; Ok(Some(run)) } else { warn!("No run found for logdata message with timestamp: {timestamp}"); @@ -128,7 +131,7 @@ impl NexusEngine { .iter_mut() .find(|run| run.is_message_timestamp_valid(×tamp)) { - run.push_alarm_to_run(&self.local_path, data)?; + run.push_alarm_to_run(self.local_path.as_deref(), data)?; Ok(Some(run)) } else { warn!("No run found for alarm message with timestamp: {timestamp}"); @@ -145,7 +148,7 @@ impl NexusEngine { } let mut run = Run::new_run( - &self.local_path, + self.local_path.as_deref(), RunParameters::new(data, self.run_number)?, &self.nexus_settings, &self.nexus_configuration, @@ -168,14 +171,18 @@ impl NexusEngine { self.run_cache .back_mut() .expect("run_cache::back_mut should exist") - .abort_run(&self.local_path, data.start_time(), &self.nexus_settings)?; + .abort_run( + self.local_path.as_deref(), + data.start_time(), + &self.nexus_settings, + )?; Ok(()) } #[tracing::instrument(skip_all)] pub(crate) fn stop_command(&mut self, data: RunStop<'_>) -> anyhow::Result<&Run> { if let Some(last_run) = self.run_cache.back_mut() { - last_run.set_stop_if_valid(&self.local_path, data)?; + last_run.set_stop_if_valid(self.local_path.as_deref(), data)?; Ok(last_run) } else { @@ -209,7 +216,7 @@ impl NexusEngine { .iter_mut() .find(|run| run.is_message_timestamp_valid(×tamp)) { - run.push_message(&self.local_path, message)?; + run.push_message(self.local_path.as_deref(), message)?; Some(run) } else { warn!("No run found for message with timestamp: {timestamp}"); @@ -242,9 +249,12 @@ impl NexusEngine { /// Afterwhich the runs are dropped. #[tracing::instrument(skip_all, level = "debug")] pub(crate) async fn flush_move_cache(&mut self) { - if let Some(archive_name) = self.nexus_settings.archive_path.as_ref() { + if let Some((local_path, archive_path)) = Option::zip( + self.local_path.as_deref(), + self.nexus_settings.archive_path.as_ref(), + ) { for run in self.run_move_cache.iter() { - match run.move_to_archive(&self.local_path, archive_name) { + match run.move_to_archive(local_path, archive_path) { Ok(move_to_archive) => move_to_archive.await, Err(e) => warn!("Error Moving to Archive {e}"), } @@ -256,8 +266,6 @@ impl NexusEngine { #[cfg(test)] mod test { - use std::path::PathBuf; - use crate::nexus::{NexusConfiguration, NexusSettings}; use super::NexusEngine; @@ -337,7 +345,7 @@ mod test { #[test] fn empty_run() { let mut nexus = NexusEngine::new( - &PathBuf::new(), + None, NexusSettings::default(), NexusConfiguration::new(None), ); @@ -385,7 +393,7 @@ mod test { #[test] fn no_run_start() { let mut nexus = NexusEngine::new( - &PathBuf::new(), + None, NexusSettings::default(), NexusConfiguration::new(None), ); @@ -398,7 +406,7 @@ mod test { #[test] fn no_run_stop() { let mut nexus = NexusEngine::new( - &PathBuf::new(), + None, NexusSettings::default(), NexusConfiguration::new(None), ); @@ -417,7 +425,7 @@ mod test { #[test] fn frame_messages_correct() { let mut nexus = NexusEngine::new( - &PathBuf::new(), + None, NexusSettings::default(), NexusConfiguration::new(None), ); @@ -455,7 +463,7 @@ mod test { #[test] fn two_runs_flushed() { let mut nexus = NexusEngine::new( - &PathBuf::new(), + None, NexusSettings::default(), NexusConfiguration::new(None), ); diff --git a/nexus-writer/src/nexus/hdf5_file/run_file.rs b/nexus-writer/src/nexus/hdf5_file/run_file.rs index 4fb2c1e0..0e04cff3 100644 --- a/nexus-writer/src/nexus/hdf5_file/run_file.rs +++ b/nexus-writer/src/nexus/hdf5_file/run_file.rs @@ -135,13 +135,8 @@ impl RunFile { } #[tracing::instrument(skip_all, err(level = "warn"))] - pub(crate) fn open_runfile(filename: &Path, run_name: &str) -> anyhow::Result { - let filename = { - let mut filename = filename.to_owned(); - filename.push(run_name); - filename.set_extension("nxs"); - filename - }; + pub(crate) fn open_runfile(local_path: &Path, run_name: &str) -> anyhow::Result { + let filename = RunParameters::get_hdf5_path_buf(local_path, run_name); debug!("File open begin. File: {0}.", filename.display()); let file = File::open_rw(filename)?; diff --git a/nexus-writer/src/nexus/run.rs b/nexus-writer/src/nexus/run.rs index ea7ce71a..1528bc5c 100644 --- a/nexus-writer/src/nexus/run.rs +++ b/nexus-writer/src/nexus/run.rs @@ -17,12 +17,12 @@ pub(crate) struct Run { impl Run { #[tracing::instrument(skip_all, level = "debug", err(level = "warn"))] pub(crate) fn new_run( - local_path: &Path, + local_path: Option<&Path>, parameters: RunParameters, nexus_settings: &NexusSettings, nexus_configuration: &NexusConfiguration, ) -> anyhow::Result { - if local_path.exists() { + if let Some(local_path) = local_path { let mut hdf5 = RunFile::new_runfile(local_path, ¶meters.run_name, nexus_settings)?; hdf5.init(¶meters, nexus_configuration)?; hdf5.close()?; @@ -50,12 +50,12 @@ impl Run { #[tracing::instrument(skip_all, level = "info")] pub(crate) fn move_to_archive( &self, - file_name: &Path, + local_name: &Path, archive_name: &Path, ) -> io::Result> { create_dir_all(archive_name)?; - let from_path = RunParameters::get_hdf5_path_buf(file_name, &self.parameters.run_name); + let from_path = RunParameters::get_hdf5_path_buf(local_name, &self.parameters.run_name); let to_path = RunParameters::get_hdf5_path_buf(archive_name, &self.parameters.run_name); let span = tracing::Span::current(); @@ -76,11 +76,11 @@ impl Run { #[tracing::instrument(skip_all, level = "debug", err(level = "warn"))] pub(crate) fn push_logdata_to_run( &mut self, - local_path: &Path, + local_path: Option<&Path>, logdata: &f144_LogData, nexus_settings: &NexusSettings, ) -> anyhow::Result<()> { - if local_path.exists() { + if let Some(local_path) = local_path { let mut hdf5 = RunFile::open_runfile(local_path, &self.parameters.run_name)?; hdf5.push_logdata_to_runfile(logdata, nexus_settings)?; hdf5.close()?; @@ -93,10 +93,10 @@ impl Run { #[tracing::instrument(skip_all, level = "debug", err(level = "warn"))] pub(crate) fn push_alarm_to_run( &mut self, - local_path: &Path, + local_path: Option<&Path>, alarm: Alarm, ) -> anyhow::Result<()> { - if local_path.exists() { + if let Some(local_path) = local_path { let mut hdf5 = RunFile::open_runfile(local_path, &self.parameters.run_name)?; hdf5.push_alarm_to_runfile(alarm)?; hdf5.close()?; @@ -109,11 +109,11 @@ impl Run { #[tracing::instrument(skip_all, level = "debug", err(level = "warn"))] pub(crate) fn push_selogdata( &mut self, - local_path: &Path, + local_path: Option<&Path>, logdata: se00_SampleEnvironmentData, nexus_settings: &NexusSettings, ) -> anyhow::Result<()> { - if local_path.exists() { + if let Some(local_path) = local_path { let mut hdf5 = RunFile::open_runfile(local_path, &self.parameters.run_name)?; hdf5.push_selogdata(logdata, nexus_settings)?; hdf5.close()?; @@ -126,10 +126,10 @@ impl Run { #[tracing::instrument(skip_all, level = "debug", err(level = "warn"))] pub(crate) fn push_message( &mut self, - local_path: &Path, + local_path: Option<&Path>, message: &FrameAssembledEventListMessage, ) -> anyhow::Result<()> { - if local_path.exists() { + if let Some(local_path) = local_path { let mut hdf5 = RunFile::open_runfile(local_path, &self.parameters.run_name)?; hdf5.push_message_to_runfile(message)?; //&self.parameters, hdf5.close()?; @@ -150,12 +150,12 @@ impl Run { pub(crate) fn set_stop_if_valid( &mut self, - local_path: &Path, + local_path: Option<&Path>, data: RunStop<'_>, ) -> anyhow::Result<()> { self.parameters.set_stop_if_valid(data)?; - if local_path.exists() { + if let Some(local_path) = local_path { let mut hdf5 = RunFile::open_runfile(local_path, &self.parameters.run_name)?; hdf5.set_end_time( @@ -173,13 +173,13 @@ impl Run { pub(crate) fn abort_run( &mut self, - local_path: &Path, + local_path: Option<&Path>, absolute_stop_time_ms: u64, nexus_settings: &NexusSettings, ) -> anyhow::Result<()> { self.parameters.set_aborted_run(absolute_stop_time_ms)?; - if local_path.exists() { + if let Some(local_path) = local_path { let mut hdf5 = RunFile::open_runfile(local_path, &self.parameters.run_name)?; let collect_until = self From 7ab38e6c9e9a7adb90a57e798255155a8c01ab74 Mon Sep 17 00:00:00 2001 From: Modularius Date: Fri, 29 Nov 2024 00:16:25 +0000 Subject: [PATCH 10/11] Fixed bug in hdf5, check storage space first --- nexus-writer/src/nexus/engine.rs | 2 +- nexus-writer/src/nexus/hdf5_file/run_file.rs | 75 +++++++------------- nexus-writer/src/nexus/run_parameters.rs | 29 -------- 3 files changed, 25 insertions(+), 81 deletions(-) diff --git a/nexus-writer/src/nexus/engine.rs b/nexus-writer/src/nexus/engine.rs index 1ff9822f..1bcfb29e 100644 --- a/nexus-writer/src/nexus/engine.rs +++ b/nexus-writer/src/nexus/engine.rs @@ -251,7 +251,7 @@ impl NexusEngine { pub(crate) async fn flush_move_cache(&mut self) { if let Some((local_path, archive_path)) = Option::zip( self.local_path.as_deref(), - self.nexus_settings.archive_path.as_ref(), + self.nexus_settings.archive_path.as_deref(), ) { for run in self.run_move_cache.iter() { match run.move_to_archive(local_path, archive_path) { diff --git a/nexus-writer/src/nexus/hdf5_file/run_file.rs b/nexus-writer/src/nexus/hdf5_file/run_file.rs index 0e04cff3..9cc62dac 100644 --- a/nexus-writer/src/nexus/hdf5_file/run_file.rs +++ b/nexus-writer/src/nexus/hdf5_file/run_file.rs @@ -9,7 +9,7 @@ use crate::nexus::{ NexusConfiguration, NexusSettings, RunParameters, DATETIME_FORMAT, }; use chrono::{DateTime, Utc}; -use hdf5::{types::VarLenUnicode, Dataset, File}; +use hdf5::{types::VarLenUnicode, Dataset, File, H5Type}; use std::{fs::create_dir_all, path::Path}; use supermusr_streaming_types::{ aev2_frame_assembled_event_v2_generated::FrameAssembledEventListMessage, @@ -221,6 +221,7 @@ impl RunFile { let start_time = parameters.collect_from.format(DATETIME_FORMAT).to_string(); set_string_to(&self.start_time, &start_time)?; + set_string_to(&self.end_time, "")?; set_string_to(&self.name, ¶meters.run_name)?; set_string_to(&self.title, "")?; @@ -245,44 +246,7 @@ impl RunFile { set_string_to(&self.end_time, &end_time)?; Ok(()) } - /* - #[tracing::instrument(skip_all, level = "trace", err(level = "warn"))] - pub(crate) fn ensure_end_time_is_set( - &mut self, - parameters: &RunParameters, - message: &FrameAssembledEventListMessage, - ) -> anyhow::Result<()> { - let end_time = { - if let Some(run_stop_parameters) = ¶meters.run_stop_parameters { - run_stop_parameters.collect_until - } else { - let time = message - .time() - .ok_or(anyhow::anyhow!("Event time missing."))?; - - let ms = if time.is_empty() { - 0 - } else { - time.get(time.len() - 1).div_ceil(1_000_000).into() - }; - - let duration = Duration::try_milliseconds(ms) - .ok_or(anyhow::anyhow!("Invalid duration {ms}ms."))?; - - let timestamp: DateTime = (*message - .metadata() - .timestamp() - .ok_or(anyhow::anyhow!("Message timestamp missing."))?) - .try_into()?; - - timestamp - .checked_add_signed(duration) - .ok_or(anyhow::anyhow!("Unable to add {duration} to {timestamp}"))? - } - }; - self.set_end_time(&end_time) - } - */ + #[tracing::instrument(skip_all, level = "trace", err(level = "warn"))] pub(crate) fn push_logdata_to_runfile( &mut self, @@ -310,29 +274,38 @@ impl RunFile { #[tracing::instrument(skip_all, level = "trace", err(level = "warn"))] pub(crate) fn push_message_to_runfile( &mut self, - //parameters: &RunParameters, message: &FrameAssembledEventListMessage, ) -> anyhow::Result<()> { - self.lists.push_message_to_event_runfile(message)?; - //self.ensure_end_time_is_set(parameters, message)?; - Ok(()) + self.lists.push_message_to_event_runfile(message) + } + + fn try_read_scalar(dataset: &Dataset) -> anyhow::Result { + if dataset.storage_size() != 0 { + if dataset.is_scalar() { + Ok(dataset.read_scalar::()?) + } else { + anyhow::bail!("{} is not a scalar", dataset.name()) + } + } else { + anyhow::bail!("{} is not allocated", dataset.name()) + } } #[tracing::instrument(skip_all, level = "trace", err(level = "warn"))] pub(crate) fn extract_run_parameters(&self) -> anyhow::Result { let collect_from: DateTime = - String::from(self.start_time.read_scalar::()?).parse()?; - let run_name = self.name.read_scalar::()?.into(); - let run_number = self.run_number.read_scalar::()?; - let num_periods = self.period_number.read_scalar::()?; - let instrument_name = self.instrument_name.read_scalar::()?.into(); - let run_stop_parameters = String::from(self.end_time.read_scalar::()?) + Self::try_read_scalar::(&self.start_time)?.parse()?; + let run_name = Self::try_read_scalar::(&self.name)?.into(); + let run_number = Self::try_read_scalar::(&self.run_number)?; + let num_periods = Self::try_read_scalar::(&self.period_number)?; + let instrument_name = Self::try_read_scalar::(&self.instrument_name)?.into(); + let run_stop_parameters = Self::try_read_scalar::(&self.end_time)? .parse() - .ok() .map(|collect_until| RunStopParameters { collect_until, last_modified: Utc::now(), - }); + }) + .ok(); Ok(RunParameters { collect_from, run_stop_parameters, diff --git a/nexus-writer/src/nexus/run_parameters.rs b/nexus-writer/src/nexus/run_parameters.rs index bc91a97c..8752d542 100644 --- a/nexus-writer/src/nexus/run_parameters.rs +++ b/nexus-writer/src/nexus/run_parameters.rs @@ -112,33 +112,4 @@ impl RunParameters { path.set_extension("nxs"); path } - /* - pub(crate) fn get_partial_path_buf(path: &Path, run_name: &str) -> PathBuf { - let mut path = path.to_owned(); - path.push(run_name); - path.set_extension("partial_run"); - path - } - */ - /* - pub(crate) fn save_partial_run(&self, path: &Path) -> anyhow::Result<()> { - let path_buf = Self::get_partial_path_buf(path, &self.run_name); - let file = File::create(path_buf.as_path())?; - serde_json::to_writer(file, &self)?; - Ok(()) - } - */ - /* - pub(crate) fn detect_partial_run(path: &Path, filename: &str) -> anyhow::Result> { - let run_file = RunFile::open_runfile(path, filename)?; - let path_buf = Self::get_partial_path_buf(path, filename); - if path_buf.as_path().exists() { - let file = File::open(path_buf.as_path())?; - let run_parameters: RunParameters = serde_json::from_reader(file)?; - std::fs::remove_file(path_buf.as_path())?; - Ok(Some(run_parameters)) - } else { - Ok(None) - } - } */ } From 252612912ca7924a48dcc701d12470eef32ce9e7 Mon Sep 17 00:00:00 2001 From: Modularius Date: Mon, 9 Dec 2024 13:57:12 +0000 Subject: [PATCH 11/11] Implemented changes --- Cargo.lock | 1 + Cargo.toml | 1 + nexus-writer/Cargo.toml | 1 + nexus-writer/Test1.nxs | Bin 13504 -> 0 bytes nexus-writer/TestRun1.nxs | Bin 13504 -> 0 bytes nexus-writer/src/nexus/engine.rs | 48 ++++++++----------- nexus-writer/src/nexus/hdf5_file/run_file.rs | 4 +- nexus-writer/src/nexus/run.rs | 11 ++--- nexus-writer/src/nexus/run_parameters.rs | 2 +- 9 files changed, 32 insertions(+), 36 deletions(-) delete mode 100644 nexus-writer/Test1.nxs delete mode 100644 nexus-writer/TestRun1.nxs diff --git a/Cargo.lock b/Cargo.lock index b16e29c1..9e79185f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1735,6 +1735,7 @@ dependencies = [ "anyhow", "chrono", "clap", + "glob", "hdf5", "metrics", "metrics-exporter-prometheus", diff --git a/Cargo.toml b/Cargo.toml index 0d40ce9b..fe1e3543 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,6 +27,7 @@ chrono = { version = "0.4.38", features = ["serde"] } clap = { version = "4.5", features = ["derive", "env"] } crossterm = "0.26.1" flatbuffers = "22.12.6" +glob = "0.3.1" hdf5 = "0.8.1" itertools = "0.12.1" lazy_static = "1.5.0" diff --git a/nexus-writer/Cargo.toml b/nexus-writer/Cargo.toml index 6094a961..775e9961 100644 --- a/nexus-writer/Cargo.toml +++ b/nexus-writer/Cargo.toml @@ -8,6 +8,7 @@ edition.workspace = true anyhow.workspace = true chrono.workspace = true clap.workspace = true +glob.workspace = true hdf5.workspace = true metrics.workspace = true metrics-exporter-prometheus.workspace = true diff --git a/nexus-writer/Test1.nxs b/nexus-writer/Test1.nxs deleted file mode 100644 index d6a58f225b2c96652dc3d337495d5b6c95274714..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 13504 zcmeHN%}(1u5FQ681_7c{4}t!LwyJ0=C`p4#Ikn=asZ;@KspMNo8v~ZWL5^wZt;h7n zu}6*^^9UR{@(6td-k{yxnTcboj;R6?klll4JUbrmex4bx*YmS5zc|=G+7GgsCUnEF zV)^ig+EE@Qe9rxA#27?tL=RKQ7|;u{FHQ1cvY+Wf!699^`e9)KgeKc(N|!YZm<@qu zIthWo!kZO2UyCNq{&Y-!##nfzZ^d%SFBP*WVZVIM3jaxOlD|g%dH@Q_k^y~i3rtwD zieI)$)he4GF${I3j3yiddM=NrS1OV-OIcuVP}vh48TlaK`V zhH!kgwOeg6m}F0yV|y}G1AG0--nz487v0jftrU~T*VkmPJIC$*qxK*1Aon8?QO2=L>;_9-ZnGf788wg|p(o|3R7$Cu^rxiD;6EU|qoUNs z;eeS-P0_Eq8-7(7X+2!t&p)``T5pHC55`*S6{!2qd?-M zdaUuMsA#q}qkFF@0$e9y(fs%;!r}lRIhP1=ycF8RgV!_BIBs`6qv{99nPsTyyc&K` z(S4%ylW@O2I~xL+S0T3k>}*!F)^$Z7npgS#TBq+eo;T;y6&9QmeK)_@R3!>fw$~lk z@u>8GfsD!RMdpKT8ZVDHH~Dp`G1bjsrBtUIc!>X$sf7!-KgZn^0U zfZr3o<8R5D(r^_Q09I{@`>QIgDsE;8z4|%L`=ZSxt*7=xEuC!qyzxvQI|YGg-1B&f2GpuBSF>pNyZgre{OoFN8#c_y7O^ diff --git a/nexus-writer/TestRun1.nxs b/nexus-writer/TestRun1.nxs deleted file mode 100644 index 47b3a5581b05de512db005eca541ae842a6357bd..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 13504 zcmeHN%}(1u5FQ681_7c{4?zDyTUE3bl!TxHr&jzll`24#O1_1(F<=QC4Uv41>?b==a6}ibuPrWu&}92e>9U#uvnJ3? zJ0Xx=e6uR&Ytf|HpH9fn7z;1|HCM>{`CJMm?3b@u;XmnZ^4G9m_d!-!GN1?UfCKDXZJ-SN23$PUpVZUdbuCQZ>%rEvZEIBqViv z12{h0Dwi4z#@UnR*q#j4y1iayZ`0Yab8dduR*LcC>ua#r8D-CR&TVhB!Csf(4e0o1 zo~bCH0k);SXqaabk$J`%LA@*zIDLXlh0Zm(*$8l+*`PqJ(sa!UWTO16I)?r1XHj~; z9tQQuJaC>llffa{Ao*=Q?pn_?Do3FC+VSoD%6m0cdhOG+#akPj$ZKA)=mV!&wvn1h zWhSOTL|w4a@I}7Bwr-dFwX&O3XKimRELnid#K@$@gr} zDHcc!X9LV>su9o#XaqC@8Uc-fMnEH=5zq)|1T+E~f$NXJhvk(8L+qUK_wfmR!1F#p zct7I$&-ij?U_h7Jf5d~_k3>Wn#}2U@EP1)jf)FRwK)QsUlqV7irE1cjk`9Cafbfor zQWu9qX09|vzv`^{Rd%@faCtxf@NRRxJ?cIjZLXK4?$Za&^{#qvyk89||B2equltu$ z6VC&YuKO2l{MCT?S>vuEqwxqnm3TbbEAPbP(O&Vw{}uLLpx+#fBW?A+f_l5rqCt|V z{~eIV`)wnMJ?iHdgh=C|@dzRiJk$OW!~@rzCLEA>l71Mf#kG(PJ;jchF{Iy-K=lzZ z5#)&E*(5S-Z~G;lSE$~E%tG}ZT@XQLNPc`jLV?7GM6J*~!ufoI^bV55M34%}j~_=U zkT|X$YwS5Hn$6AV-W!Sl*GX73KmLlaH~>h_B|;o8*%tBO^-MU9TV2mI)+fnO(|I-c zpu+n^L(hW!`s{oNU|xmD`m^&{(ONeZfpA{s_iL5D+j!obPghxRLG<0iQbUy}K*8R0 zT*s#>RDeCNxaH+{Yo(u}nkwg(cR$%44#3`*byTwYD(4i$(b;rVSIN(Ney(2NX}g7n zGXQ>1_>R9LYf6JvTmV?LCGM}Pv`V;{CG@HnH1Eqc6Stn)7qxV<@$<$reQXy5!g0^* xsU!NXU>rEsIQ}ncJr(|rX>~oN{Ft0s)>_7|r`l_uGQ6JJrhPJg&YGSNfj`pgg*N~I diff --git a/nexus-writer/src/nexus/engine.rs b/nexus-writer/src/nexus/engine.rs index 1bcfb29e..abc04f90 100644 --- a/nexus-writer/src/nexus/engine.rs +++ b/nexus-writer/src/nexus/engine.rs @@ -1,9 +1,11 @@ use super::{Run, RunParameters}; use chrono::{DateTime, Duration, Utc}; +use glob::glob; #[cfg(test)] use std::collections::vec_deque; use std::{ collections::VecDeque, + ffi::OsStr, path::{Path, PathBuf}, }; use supermusr_common::spanned::SpannedAggregator; @@ -41,35 +43,27 @@ impl NexusEngine { } } - fn get_files_in_dir(dir_path: &Path, ext: &str) -> anyhow::Result> { - let vec = std::fs::read_dir(dir_path)? - .flatten() - .flat_map(|entry| { - if match entry.file_type() { - Ok(file_type) => file_type.is_file(), - Err(e) => return Some(Err(e)), - } { - let path = entry.path(); - if path.extension().is_some_and(|path_ext| path_ext == ext) { - path.file_stem() - .and_then(|stem| stem.to_os_string().into_string().ok()) - .map(Ok) - } else { - None - } - } else { - None - } - }) - .collect::>>()?; - Ok(vec) - } - pub(crate) fn resume_partial_runs(&mut self) -> anyhow::Result<()> { if let Some(local_path) = &self.local_path { - for filename in Self::get_files_in_dir(local_path, "nxs")? { - let mut run = info_span!("Partial Run Found", path = filename) - .in_scope(|| Run::resume_partial_run(local_path, &filename))?; + let local_path_str = local_path.as_os_str().to_str().ok_or_else(|| { + anyhow::anyhow!("Cannot convert local path to string: {0:?}", local_path) + })?; + + for filename in glob(&format!("{local_path_str}/*.nxs"))? { + let filename = filename?; + let filename_str = + filename + .file_stem() + .and_then(OsStr::to_str) + .ok_or_else(|| { + anyhow::anyhow!("Cannot convert filename to string: {0:?}", filename) + })?; + let mut run = info_span!( + "Partial Run Found", + path = local_path_str, + file_name = filename_str + ) + .in_scope(|| Run::resume_partial_run(local_path, filename_str))?; if let Err(e) = run.span_init() { warn!("Run span initiation failed {e}") } diff --git a/nexus-writer/src/nexus/hdf5_file/run_file.rs b/nexus-writer/src/nexus/hdf5_file/run_file.rs index 9cc62dac..44b56ace 100644 --- a/nexus-writer/src/nexus/hdf5_file/run_file.rs +++ b/nexus-writer/src/nexus/hdf5_file/run_file.rs @@ -56,7 +56,7 @@ impl RunFile { nexus_settings: &NexusSettings, ) -> anyhow::Result { create_dir_all(path)?; - let filename = RunParameters::get_hdf5_path_buf(path, run_name); + let filename = RunParameters::get_hdf5_filename(path, run_name); debug!("File save begin. File: {0}.", filename.display()); let file = File::create(filename)?; @@ -136,7 +136,7 @@ impl RunFile { #[tracing::instrument(skip_all, err(level = "warn"))] pub(crate) fn open_runfile(local_path: &Path, run_name: &str) -> anyhow::Result { - let filename = RunParameters::get_hdf5_path_buf(local_path, run_name); + let filename = RunParameters::get_hdf5_filename(local_path, run_name); debug!("File open begin. File: {0}.", filename.display()); let file = File::open_rw(filename)?; diff --git a/nexus-writer/src/nexus/run.rs b/nexus-writer/src/nexus/run.rs index 1528bc5c..aee0dc6b 100644 --- a/nexus-writer/src/nexus/run.rs +++ b/nexus-writer/src/nexus/run.rs @@ -55,8 +55,8 @@ impl Run { ) -> io::Result> { create_dir_all(archive_name)?; - let from_path = RunParameters::get_hdf5_path_buf(local_name, &self.parameters.run_name); - let to_path = RunParameters::get_hdf5_path_buf(archive_name, &self.parameters.run_name); + let from_path = RunParameters::get_hdf5_filename(local_name, &self.parameters.run_name); + let to_path = RunParameters::get_hdf5_filename(archive_name, &self.parameters.run_name); let span = tracing::Span::current(); let future = async move { @@ -131,7 +131,7 @@ impl Run { ) -> anyhow::Result<()> { if let Some(local_path) = local_path { let mut hdf5 = RunFile::open_runfile(local_path, &self.parameters.run_name)?; - hdf5.push_message_to_runfile(message)?; //&self.parameters, + hdf5.push_message_to_runfile(message)?; hdf5.close()?; } @@ -163,7 +163,7 @@ impl Run { .parameters .run_stop_parameters .as_ref() - .expect("RunStopParameters exists") // This never panics + .expect("RunStopParameters should exist, this should never happen") .collect_until, )?; hdf5.close()?; @@ -186,7 +186,7 @@ impl Run { .parameters .run_stop_parameters .as_ref() - .expect("RunStopParameters should exists") // This never panics + .expect("RunStopParameters should exist, this should never happen") .collect_until; hdf5.set_end_time(&collect_until)?; @@ -250,7 +250,6 @@ impl SpannedAggregator for Run { } fn end_span(&self) -> Result<(), SpanOnceError> { - //let span_once = ;//.take().expect("SpanOnce should be takeable"); self.span() .get()? .record("run_has_run_stop", self.has_run_stop()); diff --git a/nexus-writer/src/nexus/run_parameters.rs b/nexus-writer/src/nexus/run_parameters.rs index 8752d542..1a5e4526 100644 --- a/nexus-writer/src/nexus/run_parameters.rs +++ b/nexus-writer/src/nexus/run_parameters.rs @@ -106,7 +106,7 @@ impl RunParameters { } } - pub(crate) fn get_hdf5_path_buf(path: &Path, run_name: &str) -> PathBuf { + pub(crate) fn get_hdf5_filename(path: &Path, run_name: &str) -> PathBuf { let mut path = path.to_owned(); path.push(run_name); path.set_extension("nxs");