From 2c5f58805bbc1f553a723088f6df3d53c7522dae Mon Sep 17 00:00:00 2001 From: Modularius Date: Wed, 11 Dec 2024 12:26:57 +0000 Subject: [PATCH 1/3] Close hdf5 file when resuming --- nexus-writer/src/nexus/run.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/nexus-writer/src/nexus/run.rs b/nexus-writer/src/nexus/run.rs index aee0dc6b..7062e80f 100644 --- a/nexus-writer/src/nexus/run.rs +++ b/nexus-writer/src/nexus/run.rs @@ -37,6 +37,8 @@ impl Run { pub(crate) fn resume_partial_run(local_path: &Path, filename: &str) -> anyhow::Result { let run = RunFile::open_runfile(local_path, filename)?; let parameters = run.extract_run_parameters()?; + run.close()?; + Ok(Self { span: Default::default(), parameters, From eadbd86eeaa99b11418fd7653b7de7114f45f996 Mon Sep 17 00:00:00 2001 From: Modularius Date: Wed, 11 Dec 2024 13:03:36 +0000 Subject: [PATCH 2/3] Ensure hdf5 file is closed upon error --- nexus-writer/src/nexus/hdf5_file/run_file.rs | 146 ++++++++++++------- 1 file changed, 91 insertions(+), 55 deletions(-) diff --git a/nexus-writer/src/nexus/hdf5_file/run_file.rs b/nexus-writer/src/nexus/hdf5_file/run_file.rs index 44b56ace..191516b9 100644 --- a/nexus-writer/src/nexus/hdf5_file/run_file.rs +++ b/nexus-writer/src/nexus/hdf5_file/run_file.rs @@ -17,10 +17,15 @@ use supermusr_streaming_types::{ ecs_se00_data_generated::se00_SampleEnvironmentData, }; use tracing::debug; + #[derive(Debug)] pub(crate) struct RunFile { file: File, + contents: RunFileContents, +} +#[derive(Debug)] +struct RunFileContents { idf_version: Dataset, definition: Dataset, program_name: Dataset, @@ -48,26 +53,20 @@ pub(crate) struct RunFile { lists: EventRun, } -impl RunFile { +impl RunFileContents { #[tracing::instrument(skip_all, err(level = "warn"))] - pub(crate) fn new_runfile( - path: &Path, - run_name: &str, + pub(crate) fn populate_new_runfile( + file: &File, nexus_settings: &NexusSettings, ) -> anyhow::Result { - create_dir_all(path)?; - let filename = RunParameters::get_hdf5_filename(path, run_name); - debug!("File save begin. File: {0}.", filename.display()); - - let file = File::create(filename)?; - set_group_nx_class(&file, NX::ROOT)?; + set_group_nx_class(file, NX::ROOT)?; - add_attribute_to(&file, "HDF5_version", "1.14.3")?; // Can this be taken directly from the nix package; - add_attribute_to(&file, "NeXus_version", "")?; // Where does this come from? - add_attribute_to(&file, "file_name", &file.filename())?; // This should be absolutized at some point - add_attribute_to(&file, "file_time", Utc::now().to_string().as_str())?; // This should be formatted, the nanoseconds are overkill. + add_attribute_to(file, "HDF5_version", "1.14.3")?; // Can this be taken directly from the nix package; + add_attribute_to(file, "NeXus_version", "")?; // Where does this come from? + add_attribute_to(file, "file_name", &file.filename())?; // This should be absolutized at some point + add_attribute_to(file, "file_time", Utc::now().to_string().as_str())?; // This should be formatted, the nanoseconds are overkill. - let entry = add_new_group_to(&file, "raw_data_1", NX::ENTRY)?; + let entry = add_new_group_to(file, "raw_data_1", NX::ENTRY)?; let idf_version = entry.new_dataset::().create("IDF_version")?; let definition = entry.new_dataset::().create("definition")?; @@ -112,7 +111,6 @@ impl RunFile { let lists = EventRun::new_event_runfile(&entry, nexus_settings)?; Ok(Self { - file, idf_version, start_time, end_time, @@ -133,14 +131,8 @@ impl RunFile { experiment_identifier, }) } - - #[tracing::instrument(skip_all, err(level = "warn"))] - pub(crate) fn open_runfile(local_path: &Path, run_name: &str) -> anyhow::Result { - let filename = RunParameters::get_hdf5_filename(local_path, run_name); - debug!("File open begin. File: {0}.", filename.display()); - - let file = File::open_rw(filename)?; - + + fn populate_open_runfile(file: &File) -> anyhow::Result { let entry = file.group("raw_data_1")?; let idf_version = entry.dataset("IDF_version")?; @@ -176,7 +168,6 @@ impl RunFile { let lists = EventRun::open_event_runfile(&entry)?; Ok(Self { - file, idf_version, start_time, end_time, @@ -197,6 +188,50 @@ impl RunFile { experiment_identifier, }) } +} + +impl RunFile { + #[tracing::instrument(skip_all, err(level = "warn"))] + pub(crate) fn new_runfile( + path: &Path, + run_name: &str, + nexus_settings: &NexusSettings, + ) -> anyhow::Result { + create_dir_all(path)?; + let filename = RunParameters::get_hdf5_filename(path, run_name); + debug!("File save begin. File: {0}.", filename.display()); + + let file = File::create(filename)?; + match RunFileContents::populate_new_runfile(&file, nexus_settings) { + Ok(contents) => Ok(Self{ + file, + contents + }), + Err(e) => { + file.close()?; + Err(e) + } + } + } + + #[tracing::instrument(skip_all, err(level = "warn"))] + pub(crate) fn open_runfile(local_path: &Path, run_name: &str) -> anyhow::Result { + let filename = RunParameters::get_hdf5_filename(local_path, run_name); + debug!("File open begin. File: {0}.", filename.display()); + + let file = File::open_rw(filename)?; + match RunFileContents::populate_open_runfile(&file) { + Ok(contents) => Ok(Self{ + file, + contents + }), + Err(e) => { + file.close()?; + Err(e) + }, + } + } + #[tracing::instrument(skip_all, level = "trace", err(level = "warn"))] pub(crate) fn init( @@ -204,38 +239,39 @@ impl RunFile { parameters: &RunParameters, nexus_configuration: &NexusConfiguration, ) -> anyhow::Result<()> { - self.idf_version.write_scalar(&2)?; - self.run_number.write_scalar(¶meters.run_number)?; + + self.contents.idf_version.write_scalar(&2)?; + self.contents.run_number.write_scalar(¶meters.run_number)?; - set_string_to(&self.definition, "muonTD")?; - set_string_to(&self.experiment_identifier, "")?; + set_string_to(&self.contents.definition, "muonTD")?; + set_string_to(&self.contents.experiment_identifier, "")?; - set_string_to(&self.program_name, "SuperMuSR Data Pipeline Nexus Writer")?; - add_attribute_to(&self.program_name, "version", "1.0")?; + set_string_to(&self.contents.program_name, "SuperMuSR Data Pipeline Nexus Writer")?; + add_attribute_to(&self.contents.program_name, "version", "1.0")?; add_attribute_to( - &self.program_name, + &self.contents.program_name, "configuration", &nexus_configuration.configuration, )?; let start_time = parameters.collect_from.format(DATETIME_FORMAT).to_string(); - set_string_to(&self.start_time, &start_time)?; - set_string_to(&self.end_time, "")?; + set_string_to(&self.contents.start_time, &start_time)?; + set_string_to(&self.contents.end_time, "")?; - set_string_to(&self.name, ¶meters.run_name)?; - set_string_to(&self.title, "")?; + set_string_to(&self.contents.name, ¶meters.run_name)?; + set_string_to(&self.contents.title, "")?; - set_string_to(&self.instrument_name, ¶meters.instrument_name)?; + set_string_to(&self.contents.instrument_name, ¶meters.instrument_name)?; - self.period_number.write_scalar(¶meters.num_periods)?; - set_slice_to(&self.period_type, &vec![1; parameters.num_periods as usize])?; + self.contents.period_number.write_scalar(¶meters.num_periods)?; + set_slice_to(&self.contents.period_type, &vec![1; parameters.num_periods as usize])?; - set_string_to(&self.source_name, "MuSR")?; - set_string_to(&self.source_type, "")?; - set_string_to(&self.source_probe, "")?; + set_string_to(&self.contents.source_name, "MuSR")?; + set_string_to(&self.contents.source_type, "")?; + set_string_to(&self.contents.source_probe, "")?; - self.lists.init(¶meters.collect_from)?; + self.contents.lists.init(¶meters.collect_from)?; Ok(()) } @@ -243,7 +279,7 @@ impl RunFile { pub(crate) fn set_end_time(&mut self, end_time: &DateTime) -> anyhow::Result<()> { let end_time = end_time.format(DATETIME_FORMAT).to_string(); - set_string_to(&self.end_time, &end_time)?; + set_string_to(&self.contents.end_time, &end_time)?; Ok(()) } @@ -253,12 +289,12 @@ impl RunFile { logdata: &f144_LogData, nexus_settings: &NexusSettings, ) -> anyhow::Result<()> { - self.logs.push_logdata_to_runlog(logdata, nexus_settings) + self.contents.logs.push_logdata_to_runlog(logdata, nexus_settings) } #[tracing::instrument(skip_all, level = "trace", err(level = "warn"))] pub(crate) fn push_alarm_to_runfile(&mut self, alarm: Alarm) -> anyhow::Result<()> { - self.selogs.push_alarm_to_selog(alarm) + self.contents.selogs.push_alarm_to_selog(alarm) } #[tracing::instrument(skip_all, level = "trace", err(level = "warn"))] @@ -267,7 +303,7 @@ impl RunFile { selogdata: se00_SampleEnvironmentData, nexus_settings: &NexusSettings, ) -> anyhow::Result<()> { - self.selogs + self.contents.selogs .push_selogdata_to_selog(&selogdata, nexus_settings) } @@ -276,7 +312,7 @@ impl RunFile { &mut self, message: &FrameAssembledEventListMessage, ) -> anyhow::Result<()> { - self.lists.push_message_to_event_runfile(message) + self.contents.lists.push_message_to_event_runfile(message) } fn try_read_scalar(dataset: &Dataset) -> anyhow::Result { @@ -294,12 +330,12 @@ impl RunFile { #[tracing::instrument(skip_all, level = "trace", err(level = "warn"))] pub(crate) fn extract_run_parameters(&self) -> anyhow::Result { let collect_from: DateTime = - Self::try_read_scalar::(&self.start_time)?.parse()?; - let run_name = Self::try_read_scalar::(&self.name)?.into(); - let run_number = Self::try_read_scalar::(&self.run_number)?; - let num_periods = Self::try_read_scalar::(&self.period_number)?; - let instrument_name = Self::try_read_scalar::(&self.instrument_name)?.into(); - let run_stop_parameters = Self::try_read_scalar::(&self.end_time)? + Self::try_read_scalar::(&self.contents.start_time)?.parse()?; + let run_name = Self::try_read_scalar::(&self.contents.name)?.into(); + let run_number = Self::try_read_scalar::(&self.contents.run_number)?; + let num_periods = Self::try_read_scalar::(&self.contents.period_number)?; + let instrument_name = Self::try_read_scalar::(&self.contents.instrument_name)?.into(); + let run_stop_parameters = Self::try_read_scalar::(&self.contents.end_time)? .parse() .map(|collect_until| RunStopParameters { collect_until, @@ -322,7 +358,7 @@ impl RunFile { stop_time: i32, nexus_settings: &NexusSettings, ) -> anyhow::Result<()> { - self.logs + self.contents.logs .set_aborted_run_warning(stop_time, nexus_settings)?; Ok(()) } From a1df62c41acefe157bb968abac9711481d8f4813 Mon Sep 17 00:00:00 2001 From: Modularius Date: Wed, 11 Dec 2024 14:04:08 +0000 Subject: [PATCH 3/3] Formatting --- nexus-writer/src/nexus/hdf5_file/run_file.rs | 47 +++++++++++--------- 1 file changed, 27 insertions(+), 20 deletions(-) diff --git a/nexus-writer/src/nexus/hdf5_file/run_file.rs b/nexus-writer/src/nexus/hdf5_file/run_file.rs index 191516b9..c953ed84 100644 --- a/nexus-writer/src/nexus/hdf5_file/run_file.rs +++ b/nexus-writer/src/nexus/hdf5_file/run_file.rs @@ -131,7 +131,7 @@ impl RunFileContents { experiment_identifier, }) } - + fn populate_open_runfile(file: &File) -> anyhow::Result { let entry = file.group("raw_data_1")?; @@ -203,10 +203,7 @@ impl RunFile { let file = File::create(filename)?; match RunFileContents::populate_new_runfile(&file, nexus_settings) { - Ok(contents) => Ok(Self{ - file, - contents - }), + Ok(contents) => Ok(Self { file, contents }), Err(e) => { file.close()?; Err(e) @@ -221,32 +218,32 @@ impl RunFile { let file = File::open_rw(filename)?; match RunFileContents::populate_open_runfile(&file) { - Ok(contents) => Ok(Self{ - file, - contents - }), + Ok(contents) => Ok(Self { file, contents }), Err(e) => { file.close()?; Err(e) - }, + } } } - #[tracing::instrument(skip_all, level = "trace", err(level = "warn"))] pub(crate) fn init( &mut self, parameters: &RunParameters, nexus_configuration: &NexusConfiguration, ) -> anyhow::Result<()> { - self.contents.idf_version.write_scalar(&2)?; - self.contents.run_number.write_scalar(¶meters.run_number)?; + self.contents + .run_number + .write_scalar(¶meters.run_number)?; set_string_to(&self.contents.definition, "muonTD")?; set_string_to(&self.contents.experiment_identifier, "")?; - set_string_to(&self.contents.program_name, "SuperMuSR Data Pipeline Nexus Writer")?; + set_string_to( + &self.contents.program_name, + "SuperMuSR Data Pipeline Nexus Writer", + )?; add_attribute_to(&self.contents.program_name, "version", "1.0")?; add_attribute_to( &self.contents.program_name, @@ -264,8 +261,13 @@ impl RunFile { set_string_to(&self.contents.instrument_name, ¶meters.instrument_name)?; - self.contents.period_number.write_scalar(¶meters.num_periods)?; - set_slice_to(&self.contents.period_type, &vec![1; parameters.num_periods as usize])?; + self.contents + .period_number + .write_scalar(¶meters.num_periods)?; + set_slice_to( + &self.contents.period_type, + &vec![1; parameters.num_periods as usize], + )?; set_string_to(&self.contents.source_name, "MuSR")?; set_string_to(&self.contents.source_type, "")?; @@ -289,7 +291,9 @@ impl RunFile { logdata: &f144_LogData, nexus_settings: &NexusSettings, ) -> anyhow::Result<()> { - self.contents.logs.push_logdata_to_runlog(logdata, nexus_settings) + self.contents + .logs + .push_logdata_to_runlog(logdata, nexus_settings) } #[tracing::instrument(skip_all, level = "trace", err(level = "warn"))] @@ -303,7 +307,8 @@ impl RunFile { selogdata: se00_SampleEnvironmentData, nexus_settings: &NexusSettings, ) -> anyhow::Result<()> { - self.contents.selogs + self.contents + .selogs .push_selogdata_to_selog(&selogdata, nexus_settings) } @@ -334,7 +339,8 @@ impl RunFile { let run_name = Self::try_read_scalar::(&self.contents.name)?.into(); let run_number = Self::try_read_scalar::(&self.contents.run_number)?; let num_periods = Self::try_read_scalar::(&self.contents.period_number)?; - let instrument_name = Self::try_read_scalar::(&self.contents.instrument_name)?.into(); + let instrument_name = + Self::try_read_scalar::(&self.contents.instrument_name)?.into(); let run_stop_parameters = Self::try_read_scalar::(&self.contents.end_time)? .parse() .map(|collect_until| RunStopParameters { @@ -358,7 +364,8 @@ impl RunFile { stop_time: i32, nexus_settings: &NexusSettings, ) -> anyhow::Result<()> { - self.contents.logs + self.contents + .logs .set_aborted_run_warning(stop_time, nexus_settings)?; Ok(()) }