From 10b362d8f7469630ff637e86a24bcda935019dce Mon Sep 17 00:00:00 2001 From: Modularius Date: Mon, 9 Dec 2024 14:08:49 +0000 Subject: [PATCH] Resume Run Functionality (#273) ## Summary of changes Added behaviour for the Nexus Writer to resume a run if it was interrupted by the component going down. - ~~A file with extension ".partial_run" is added to the local path when a run_start is received. This file is removed when the nexus file is transferred to the archive (signalling a successful run).~~ - When the nexus writer starts, any ~~".partial_run"~~ ".nxs" files in the local path are read and runs re-created (as ~~.partial_run~~ .nxs files indicate that a previous instance of nexus writer had failed). ## Instruction for review/testing General code review. Tool was tested with simulated data. --- Cargo.lock | 1 + Cargo.toml | 1 + nexus-writer/Cargo.toml | 1 + nexus-writer/src/main.rs | 3 +- nexus-writer/src/nexus/engine.rs | 62 ++++++++--- nexus-writer/src/nexus/hdf5_file/run_file.rs | 108 +++++++++---------- nexus-writer/src/nexus/run.rs | 80 +++++++------- nexus-writer/src/nexus/run_parameters.rs | 13 ++- 8 files changed, 151 insertions(+), 118 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a5c768ea..64aa0408 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1726,6 +1726,7 @@ dependencies = [ "anyhow", "chrono", "clap", + "glob", "hdf5", "metrics", "metrics-exporter-prometheus", diff --git a/Cargo.toml b/Cargo.toml index 421d3770..4b05d9d8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,6 +27,7 @@ chrono = { version = "0.4.38", features = ["serde"] } clap = { version = "4.5", features = ["derive", "env"] } crossterm = "0.26.1" flatbuffers = "22.12.6" +glob = "0.3.1" hdf5 = "0.8.1" itertools = "0.13.0" lazy_static = "1.5.0" diff --git a/nexus-writer/Cargo.toml b/nexus-writer/Cargo.toml index 6094a961..775e9961 100644 --- a/nexus-writer/Cargo.toml +++ b/nexus-writer/Cargo.toml @@ -8,6 +8,7 @@ edition.workspace = true anyhow.workspace = true chrono.workspace = 
true clap.workspace = true +glob.workspace = true hdf5.workspace = true metrics.workspace = true metrics-exporter-prometheus.workspace = true diff --git a/nexus-writer/src/main.rs b/nexus-writer/src/main.rs index d1a67060..41cde6e7 100644 --- a/nexus-writer/src/main.rs +++ b/nexus-writer/src/main.rs @@ -167,6 +167,7 @@ async fn main() -> anyhow::Result<()> { nexus_settings, nexus_configuration, ); + nexus_engine.resume_partial_runs()?; let mut nexus_write_interval = tokio::time::interval(time::Duration::from_millis(args.cache_poll_interval_ms)); @@ -389,7 +390,7 @@ fn process_run_stop_message(nexus_engine: &mut NexusEngine, payload: &[u8]) { } Err(e) => { let _guard = warn_span!( - "RunStop Error.", + "RunStop Error", run_name = data.run_name(), stop_time = data.stop_time(), ) diff --git a/nexus-writer/src/nexus/engine.rs b/nexus-writer/src/nexus/engine.rs index 368a667c..abc04f90 100644 --- a/nexus-writer/src/nexus/engine.rs +++ b/nexus-writer/src/nexus/engine.rs @@ -1,9 +1,11 @@ use super::{Run, RunParameters}; use chrono::{DateTime, Duration, Utc}; +use glob::glob; #[cfg(test)] use std::collections::vec_deque; use std::{ collections::VecDeque, + ffi::OsStr, path::{Path, PathBuf}, }; use supermusr_common::spanned::SpannedAggregator; @@ -13,10 +15,10 @@ use supermusr_streaming_types::{ ecs_f144_logdata_generated::f144_LogData, ecs_pl72_run_start_generated::RunStart, ecs_se00_data_generated::se00_SampleEnvironmentData, }; -use tracing::warn; +use tracing::{info_span, warn}; pub(crate) struct NexusEngine { - filename: Option, + local_path: Option, run_cache: VecDeque, run_number: u32, nexus_settings: NexusSettings, @@ -27,12 +29,12 @@ pub(crate) struct NexusEngine { impl NexusEngine { #[tracing::instrument(skip_all)] pub(crate) fn new( - filename: Option<&Path>, + local_path: Option<&Path>, nexus_settings: NexusSettings, nexus_configuration: NexusConfiguration, ) -> Self { Self { - filename: filename.map(ToOwned::to_owned), + local_path: 
local_path.map(ToOwned::to_owned), run_cache: Default::default(), run_number: 0, nexus_settings, @@ -41,6 +43,36 @@ impl NexusEngine { } } + pub(crate) fn resume_partial_runs(&mut self) -> anyhow::Result<()> { + if let Some(local_path) = &self.local_path { + let local_path_str = local_path.as_os_str().to_str().ok_or_else(|| { + anyhow::anyhow!("Cannot convert local path to string: {0:?}", local_path) + })?; + + for filename in glob(&format!("{local_path_str}/*.nxs"))? { + let filename = filename?; + let filename_str = + filename + .file_stem() + .and_then(OsStr::to_str) + .ok_or_else(|| { + anyhow::anyhow!("Cannot convert filename to string: {0:?}", filename) + })?; + let mut run = info_span!( + "Partial Run Found", + path = local_path_str, + file_name = filename_str + ) + .in_scope(|| Run::resume_partial_run(local_path, filename_str))?; + if let Err(e) = run.span_init() { + warn!("Run span initiation failed {e}") + } + self.run_cache.push_back(run); + } + } + Ok(()) + } + #[cfg(test)] fn cache_iter(&self) -> vec_deque::Iter<'_, Run> { self.run_cache.iter() @@ -61,7 +93,7 @@ impl NexusEngine { .iter_mut() .find(|run| run.is_message_timestamp_valid(&timestamp)) { - run.push_selogdata(self.filename.as_deref(), data, &self.nexus_settings)?; + run.push_selogdata(self.local_path.as_deref(), data, &self.nexus_settings)?; Ok(Some(run)) } else { warn!("No run found for selogdata message with timestamp: {timestamp}"); @@ -77,7 +109,7 @@ impl NexusEngine { .iter_mut() .find(|run| run.is_message_timestamp_valid(&timestamp)) { - run.push_logdata_to_run(self.filename.as_deref(), data, &self.nexus_settings)?; + run.push_logdata_to_run(self.local_path.as_deref(), data, &self.nexus_settings)?; Ok(Some(run)) } else { warn!("No run found for logdata message with timestamp: {timestamp}"); @@ -93,7 +125,7 @@ impl NexusEngine { .iter_mut() .find(|run| run.is_message_timestamp_valid(&timestamp)) { - run.push_alarm_to_run(self.filename.as_deref(), data)?; +
run.push_alarm_to_run(self.local_path.as_deref(), data)?; Ok(Some(run)) } else { warn!("No run found for alarm message with timestamp: {timestamp}"); @@ -110,7 +142,7 @@ impl NexusEngine { } let mut run = Run::new_run( - self.filename.as_deref(), + self.local_path.as_deref(), RunParameters::new(data, self.run_number)?, &self.nexus_settings, &self.nexus_configuration, @@ -134,7 +166,7 @@ impl NexusEngine { .back_mut() .expect("run_cache::back_mut should exist") .abort_run( - self.filename.as_deref(), + self.local_path.as_deref(), data.start_time(), &self.nexus_settings, )?; @@ -144,7 +176,7 @@ impl NexusEngine { #[tracing::instrument(skip_all)] pub(crate) fn stop_command(&mut self, data: RunStop<'_>) -> anyhow::Result<&Run> { if let Some(last_run) = self.run_cache.back_mut() { - last_run.set_stop_if_valid(self.filename.as_deref(), data)?; + last_run.set_stop_if_valid(self.local_path.as_deref(), data)?; Ok(last_run) } else { @@ -178,7 +210,7 @@ impl NexusEngine { .iter_mut() .find(|run| run.is_message_timestamp_valid(&timestamp)) { - run.push_message(self.filename.as_deref(), message)?; + run.push_message(self.local_path.as_deref(), message)?; Some(run) } else { warn!("No run found for message with timestamp: {timestamp}"); @@ -211,12 +243,12 @@ impl NexusEngine { /// Afterwhich the runs are dropped. 
#[tracing::instrument(skip_all, level = "debug")] pub(crate) async fn flush_move_cache(&mut self) { - if let Some((file_name, archive_name)) = Option::zip( - self.filename.as_ref(), - self.nexus_settings.archive_path.as_ref(), + if let Some((local_path, archive_path)) = Option::zip( + self.local_path.as_deref(), + self.nexus_settings.archive_path.as_deref(), ) { for run in self.run_move_cache.iter() { - match run.move_to_archive(file_name, archive_name) { + match run.move_to_archive(local_path, archive_path) { Ok(move_to_archive) => move_to_archive.await, Err(e) => warn!("Error Moving to Archive {e}"), } diff --git a/nexus-writer/src/nexus/hdf5_file/run_file.rs b/nexus-writer/src/nexus/hdf5_file/run_file.rs index 802c3d8b..44b56ace 100644 --- a/nexus-writer/src/nexus/hdf5_file/run_file.rs +++ b/nexus-writer/src/nexus/hdf5_file/run_file.rs @@ -4,10 +4,12 @@ use super::{ }; use crate::nexus::{ hdf5_file::run_file_components::{RunLog, SeLog}, - nexus_class as NX, NexusConfiguration, NexusSettings, RunParameters, DATETIME_FORMAT, + nexus_class as NX, + run_parameters::RunStopParameters, + NexusConfiguration, NexusSettings, RunParameters, DATETIME_FORMAT, }; -use chrono::{DateTime, Duration, Utc}; -use hdf5::{types::VarLenUnicode, Dataset, File}; +use chrono::{DateTime, Utc}; +use hdf5::{types::VarLenUnicode, Dataset, File, H5Type}; use std::{fs::create_dir_all, path::Path}; use supermusr_streaming_types::{ aev2_frame_assembled_event_v2_generated::FrameAssembledEventListMessage, @@ -49,17 +51,12 @@ pub(crate) struct RunFile { impl RunFile { #[tracing::instrument(skip_all, err(level = "warn"))] pub(crate) fn new_runfile( - filename: &Path, + path: &Path, run_name: &str, nexus_settings: &NexusSettings, ) -> anyhow::Result { - create_dir_all(filename)?; - let filename = { - let mut filename = filename.to_owned(); - filename.push(run_name); - filename.set_extension("nxs"); - filename - }; + create_dir_all(path)?; + let filename = RunParameters::get_hdf5_filename(path, 
run_name); debug!("File save begin. File: {0}.", filename.display()); let file = File::create(filename)?; @@ -138,13 +135,8 @@ impl RunFile { } #[tracing::instrument(skip_all, err(level = "warn"))] - pub(crate) fn open_runfile(filename: &Path, run_name: &str) -> anyhow::Result { - let filename = { - let mut filename = filename.to_owned(); - filename.push(run_name); - filename.set_extension("nxs"); - filename - }; + pub(crate) fn open_runfile(local_path: &Path, run_name: &str) -> anyhow::Result { + let filename = RunParameters::get_hdf5_filename(local_path, run_name); debug!("File open begin. File: {0}.", filename.display()); let file = File::open_rw(filename)?; @@ -229,6 +221,7 @@ impl RunFile { let start_time = parameters.collect_from.format(DATETIME_FORMAT).to_string(); set_string_to(&self.start_time, &start_time)?; + set_string_to(&self.end_time, "")?; set_string_to(&self.name, &parameters.run_name)?; set_string_to(&self.title, "")?; @@ -254,43 +247,6 @@ impl RunFile { Ok(()) } - #[tracing::instrument(skip_all, level = "trace", err(level = "warn"))] - pub(crate) fn ensure_end_time_is_set( - &mut self, - parameters: &RunParameters, - message: &FrameAssembledEventListMessage, - ) -> anyhow::Result<()> { - let end_time = { - if let Some(run_stop_parameters) = &parameters.run_stop_parameters { - run_stop_parameters.collect_until - } else { - let time = message - .time() - .ok_or(anyhow::anyhow!("Event time missing."))?; - - let ms = if time.is_empty() { - 0 - } else { - time.get(time.len() - 1).div_ceil(1_000_000).into() - }; - - let duration = Duration::try_milliseconds(ms) - .ok_or(anyhow::anyhow!("Invalid duration {ms}ms."))?; - - let timestamp: DateTime = (*message - .metadata() - .timestamp() - .ok_or(anyhow::anyhow!("Message timestamp missing."))?) - .try_into()?; - - timestamp - .checked_add_signed(duration) - .ok_or(anyhow::anyhow!("Unable to add {duration} to {timestamp}"))? 
- } - }; - self.set_end_time(&end_time) - } - #[tracing::instrument(skip_all, level = "trace", err(level = "warn"))] pub(crate) fn push_logdata_to_runfile( &mut self, @@ -318,12 +274,46 @@ impl RunFile { #[tracing::instrument(skip_all, level = "trace", err(level = "warn"))] pub(crate) fn push_message_to_runfile( &mut self, - parameters: &RunParameters, message: &FrameAssembledEventListMessage, ) -> anyhow::Result<()> { - self.lists.push_message_to_event_runfile(message)?; - self.ensure_end_time_is_set(parameters, message)?; - Ok(()) + self.lists.push_message_to_event_runfile(message) + } + + fn try_read_scalar(dataset: &Dataset) -> anyhow::Result { + if dataset.storage_size() != 0 { + if dataset.is_scalar() { + Ok(dataset.read_scalar::()?) + } else { + anyhow::bail!("{} is not a scalar", dataset.name()) + } + } else { + anyhow::bail!("{} is not allocated", dataset.name()) + } + } + + #[tracing::instrument(skip_all, level = "trace", err(level = "warn"))] + pub(crate) fn extract_run_parameters(&self) -> anyhow::Result { + let collect_from: DateTime = + Self::try_read_scalar::(&self.start_time)?.parse()?; + let run_name = Self::try_read_scalar::(&self.name)?.into(); + let run_number = Self::try_read_scalar::(&self.run_number)?; + let num_periods = Self::try_read_scalar::(&self.period_number)?; + let instrument_name = Self::try_read_scalar::(&self.instrument_name)?.into(); + let run_stop_parameters = Self::try_read_scalar::(&self.end_time)? 
+ .parse() + .map(|collect_until| RunStopParameters { + collect_until, + last_modified: Utc::now(), + }) + .ok(); + Ok(RunParameters { + collect_from, + run_stop_parameters, + num_periods, + run_name, + run_number, + instrument_name, + }) } #[tracing::instrument(skip_all, level = "trace", err(level = "warn"))] diff --git a/nexus-writer/src/nexus/run.rs b/nexus-writer/src/nexus/run.rs index 4c5cbd98..aee0dc6b 100644 --- a/nexus-writer/src/nexus/run.rs +++ b/nexus-writer/src/nexus/run.rs @@ -12,26 +12,34 @@ use tracing::{info, info_span, warn, Span}; pub(crate) struct Run { span: SpanOnce, parameters: RunParameters, - num_frames: usize, } impl Run { #[tracing::instrument(skip_all, level = "debug", err(level = "warn"))] pub(crate) fn new_run( - filename: Option<&Path>, + local_path: Option<&Path>, parameters: RunParameters, nexus_settings: &NexusSettings, nexus_configuration: &NexusConfiguration, ) -> anyhow::Result { - if let Some(filename) = filename { - let mut hdf5 = RunFile::new_runfile(filename, &parameters.run_name, nexus_settings)?; + if let Some(local_path) = local_path { + let mut hdf5 = RunFile::new_runfile(local_path, &parameters.run_name, nexus_settings)?; hdf5.init(&parameters, nexus_configuration)?; hdf5.close()?; } + + Ok(Self { + span: Default::default(), + parameters, + }) + } + + pub(crate) fn resume_partial_run(local_path: &Path, filename: &str) -> anyhow::Result { + let run = RunFile::open_runfile(local_path, filename)?; + let parameters = run.extract_run_parameters()?; Ok(Self { span: Default::default(), parameters, - num_frames: usize::default(), }) } @@ -42,22 +50,14 @@ impl Run { #[tracing::instrument(skip_all, level = "info")] pub(crate) fn move_to_archive( &self, - file_name: &Path, + local_name: &Path, archive_name: &Path, ) -> io::Result> { create_dir_all(archive_name)?; - let from_path = { - let mut filename = file_name.to_owned(); - filename.push(&self.parameters.run_name); - filename.set_extension("nxs"); - filename - }; - let to_path = { - let mut 
filename = archive_name.to_owned(); - filename.push(&self.parameters.run_name); - filename.set_extension("nxs"); - filename - }; + + let from_path = RunParameters::get_hdf5_filename(local_name, &self.parameters.run_name); + let to_path = RunParameters::get_hdf5_filename(archive_name, &self.parameters.run_name); + let span = tracing::Span::current(); let future = async move { info_span!(parent: &span, "move-async").in_scope(|| { @@ -76,12 +76,12 @@ impl Run { #[tracing::instrument(skip_all, level = "debug", err(level = "warn"))] pub(crate) fn push_logdata_to_run( &mut self, - filename: Option<&Path>, + local_path: Option<&Path>, logdata: &f144_LogData, nexus_settings: &NexusSettings, ) -> anyhow::Result<()> { - if let Some(filename) = filename { - let mut hdf5 = RunFile::open_runfile(filename, &self.parameters.run_name)?; + if let Some(local_path) = local_path { + let mut hdf5 = RunFile::open_runfile(local_path, &self.parameters.run_name)?; hdf5.push_logdata_to_runfile(logdata, nexus_settings)?; hdf5.close()?; } @@ -93,11 +93,11 @@ impl Run { #[tracing::instrument(skip_all, level = "debug", err(level = "warn"))] pub(crate) fn push_alarm_to_run( &mut self, - filename: Option<&Path>, + local_path: Option<&Path>, alarm: Alarm, ) -> anyhow::Result<()> { - if let Some(filename) = filename { - let mut hdf5 = RunFile::open_runfile(filename, &self.parameters.run_name)?; + if let Some(local_path) = local_path { + let mut hdf5 = RunFile::open_runfile(local_path, &self.parameters.run_name)?; hdf5.push_alarm_to_runfile(alarm)?; hdf5.close()?; } @@ -109,12 +109,12 @@ impl Run { #[tracing::instrument(skip_all, level = "debug", err(level = "warn"))] pub(crate) fn push_selogdata( &mut self, - filename: Option<&Path>, + local_path: Option<&Path>, logdata: se00_SampleEnvironmentData, nexus_settings: &NexusSettings, ) -> anyhow::Result<()> { - if let Some(filename) = filename { - let mut hdf5 = RunFile::open_runfile(filename, &self.parameters.run_name)?; + if let Some(local_path) = 
local_path { + let mut hdf5 = RunFile::open_runfile(local_path, &self.parameters.run_name)?; hdf5.push_selogdata(logdata, nexus_settings)?; hdf5.close()?; } @@ -126,16 +126,15 @@ impl Run { #[tracing::instrument(skip_all, level = "debug", err(level = "warn"))] pub(crate) fn push_message( &mut self, - filename: Option<&Path>, + local_path: Option<&Path>, message: &FrameAssembledEventListMessage, ) -> anyhow::Result<()> { - if let Some(filename) = filename { - let mut hdf5 = RunFile::open_runfile(filename, &self.parameters.run_name)?; - hdf5.push_message_to_runfile(&self.parameters, message)?; + if let Some(local_path) = local_path { + let mut hdf5 = RunFile::open_runfile(local_path, &self.parameters.run_name)?; + hdf5.push_message_to_runfile(message)?; hdf5.close()?; } - self.num_frames += 1; self.parameters.update_last_modified(); Ok(()) } @@ -151,20 +150,20 @@ impl Run { pub(crate) fn set_stop_if_valid( &mut self, - filename: Option<&Path>, + local_path: Option<&Path>, data: RunStop<'_>, ) -> anyhow::Result<()> { self.parameters.set_stop_if_valid(data)?; - if let Some(filename) = filename { - let mut hdf5 = RunFile::open_runfile(filename, &self.parameters.run_name)?; + if let Some(local_path) = local_path { + let mut hdf5 = RunFile::open_runfile(local_path, &self.parameters.run_name)?; hdf5.set_end_time( &self .parameters .run_stop_parameters .as_ref() - .expect("RunStopParameters exists") // This never panics + .expect("RunStopParameters should exist, this should never happen") .collect_until, )?; hdf5.close()?; @@ -174,20 +173,20 @@ impl Run { pub(crate) fn abort_run( &mut self, - filename: Option<&Path>, + local_path: Option<&Path>, absolute_stop_time_ms: u64, nexus_settings: &NexusSettings, ) -> anyhow::Result<()> { self.parameters.set_aborted_run(absolute_stop_time_ms)?; - if let Some(filename) = filename { - let mut hdf5 = RunFile::open_runfile(filename, &self.parameters.run_name)?; + if let Some(local_path) = local_path { + let mut hdf5 = 
RunFile::open_runfile(local_path, &self.parameters.run_name)?; let collect_until = self .parameters .run_stop_parameters .as_ref() - .expect("RunStopParameters should exists") // This never panics + .expect("RunStopParameters should exist, this should never happen") .collect_until; hdf5.set_end_time(&collect_until)?; @@ -251,7 +250,6 @@ impl SpannedAggregator for Run { } fn end_span(&self) -> Result<(), SpanOnceError> { - //let span_once = ;//.take().expect("SpanOnce should be takeable"); self.span() .get()? .record("run_has_run_stop", self.has_run_stop()); diff --git a/nexus-writer/src/nexus/run_parameters.rs b/nexus-writer/src/nexus/run_parameters.rs index 70ed5285..1a5e4526 100644 --- a/nexus-writer/src/nexus/run_parameters.rs +++ b/nexus-writer/src/nexus/run_parameters.rs @@ -1,15 +1,17 @@ +use std::path::{Path, PathBuf}; + use chrono::{DateTime, Utc}; use supermusr_streaming_types::{ ecs_6s4t_run_stop_generated::RunStop, ecs_pl72_run_start_generated::RunStart, }; -#[derive(Default, Debug)] +#[derive(Default, Debug, Clone)] pub(crate) struct RunStopParameters { pub(crate) collect_until: DateTime, pub(crate) last_modified: DateTime, } -#[derive(Debug)] +#[derive(Debug, Clone)] pub(crate) struct RunParameters { pub(crate) collect_from: DateTime, pub(crate) run_stop_parameters: Option, @@ -103,4 +105,11 @@ impl RunParameters { params.last_modified = Utc::now(); } } + + pub(crate) fn get_hdf5_filename(path: &Path, run_name: &str) -> PathBuf { + let mut path = path.to_owned(); + path.push(run_name); + path.set_extension("nxs"); + path + } }