Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Resume Run Functionality #273

Merged
merged 13 commits into from
Dec 9, 2024
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ chrono = { version = "0.4.38", features = ["serde"] }
clap = { version = "4.5", features = ["derive", "env"] }
crossterm = "0.26.1"
flatbuffers = "22.12.6"
glob = "0.3.1"
hdf5 = "0.8.1"
itertools = "0.12.1"
lazy_static = "1.5.0"
Expand Down
1 change: 1 addition & 0 deletions nexus-writer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ edition.workspace = true
anyhow.workspace = true
chrono.workspace = true
clap.workspace = true
glob.workspace = true
hdf5.workspace = true
metrics.workspace = true
metrics-exporter-prometheus.workspace = true
Expand Down
3 changes: 2 additions & 1 deletion nexus-writer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ async fn main() -> anyhow::Result<()> {
nexus_settings,
nexus_configuration,
);
nexus_engine.resume_partial_runs()?;

let mut nexus_write_interval =
tokio::time::interval(time::Duration::from_millis(args.cache_poll_interval_ms));
Expand Down Expand Up @@ -389,7 +390,7 @@ fn process_run_stop_message(nexus_engine: &mut NexusEngine, payload: &[u8]) {
}
Err(e) => {
let _guard = warn_span!(
"RunStop Error.",
"RunStop Error",
run_name = data.run_name(),
stop_time = data.stop_time(),
)
Expand Down
62 changes: 47 additions & 15 deletions nexus-writer/src/nexus/engine.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use super::{Run, RunParameters};
use chrono::{DateTime, Duration, Utc};
use glob::glob;
#[cfg(test)]
use std::collections::vec_deque;
use std::{
collections::VecDeque,
ffi::OsStr,
path::{Path, PathBuf},
};
use supermusr_common::spanned::SpannedAggregator;
Expand All @@ -13,10 +15,10 @@ use supermusr_streaming_types::{
ecs_f144_logdata_generated::f144_LogData, ecs_pl72_run_start_generated::RunStart,
ecs_se00_data_generated::se00_SampleEnvironmentData,
};
use tracing::warn;
use tracing::{info_span, warn};

pub(crate) struct NexusEngine {
filename: Option<PathBuf>,
local_path: Option<PathBuf>,
run_cache: VecDeque<Run>,
run_number: u32,
nexus_settings: NexusSettings,
Expand All @@ -27,12 +29,12 @@ pub(crate) struct NexusEngine {
impl NexusEngine {
#[tracing::instrument(skip_all)]
pub(crate) fn new(
filename: Option<&Path>,
local_path: Option<&Path>,
nexus_settings: NexusSettings,
nexus_configuration: NexusConfiguration,
) -> Self {
Self {
filename: filename.map(ToOwned::to_owned),
local_path: local_path.map(ToOwned::to_owned),
run_cache: Default::default(),
run_number: 0,
nexus_settings,
Expand All @@ -41,6 +43,36 @@ impl NexusEngine {
}
}

pub(crate) fn resume_partial_runs(&mut self) -> anyhow::Result<()> {
if let Some(local_path) = &self.local_path {
let local_path_str = local_path.as_os_str().to_str().ok_or_else(|| {
anyhow::anyhow!("Cannot convert local path to string: {0:?}", local_path)
})?;

for filename in glob(&format!("{local_path_str}/*.nxs"))? {
let filename = filename?;
let filename_str =
filename
.file_stem()
.and_then(OsStr::to_str)
.ok_or_else(|| {
anyhow::anyhow!("Cannot convert filename to string: {0:?}", filename)
})?;
let mut run = info_span!(
"Partial Run Found",
path = local_path_str,
file_name = filename_str
)
.in_scope(|| Run::resume_partial_run(local_path, filename_str))?;
if let Err(e) = run.span_init() {
warn!("Run span initiation failed {e}")
}
self.run_cache.push_back(run);
}
}
Ok(())
}

#[cfg(test)]
fn cache_iter(&self) -> vec_deque::Iter<'_, Run> {
self.run_cache.iter()
Expand All @@ -61,7 +93,7 @@ impl NexusEngine {
.iter_mut()
.find(|run| run.is_message_timestamp_valid(&timestamp))
{
run.push_selogdata(self.filename.as_deref(), data, &self.nexus_settings)?;
run.push_selogdata(self.local_path.as_deref(), data, &self.nexus_settings)?;
Ok(Some(run))
} else {
warn!("No run found for selogdata message with timestamp: {timestamp}");
Expand All @@ -77,7 +109,7 @@ impl NexusEngine {
.iter_mut()
.find(|run| run.is_message_timestamp_valid(&timestamp))
{
run.push_logdata_to_run(self.filename.as_deref(), data, &self.nexus_settings)?;
run.push_logdata_to_run(self.local_path.as_deref(), data, &self.nexus_settings)?;
Ok(Some(run))
} else {
warn!("No run found for logdata message with timestamp: {timestamp}");
Expand All @@ -93,7 +125,7 @@ impl NexusEngine {
.iter_mut()
.find(|run| run.is_message_timestamp_valid(&timestamp))
{
run.push_alarm_to_run(self.filename.as_deref(), data)?;
run.push_alarm_to_run(self.local_path.as_deref(), data)?;
Ok(Some(run))
} else {
warn!("No run found for alarm message with timestamp: {timestamp}");
Expand All @@ -110,7 +142,7 @@ impl NexusEngine {
}

let mut run = Run::new_run(
self.filename.as_deref(),
self.local_path.as_deref(),
RunParameters::new(data, self.run_number)?,
&self.nexus_settings,
&self.nexus_configuration,
Expand All @@ -134,7 +166,7 @@ impl NexusEngine {
.back_mut()
.expect("run_cache::back_mut should exist")
.abort_run(
self.filename.as_deref(),
self.local_path.as_deref(),
data.start_time(),
&self.nexus_settings,
)?;
Expand All @@ -144,7 +176,7 @@ impl NexusEngine {
#[tracing::instrument(skip_all)]
pub(crate) fn stop_command(&mut self, data: RunStop<'_>) -> anyhow::Result<&Run> {
if let Some(last_run) = self.run_cache.back_mut() {
last_run.set_stop_if_valid(self.filename.as_deref(), data)?;
last_run.set_stop_if_valid(self.local_path.as_deref(), data)?;

Ok(last_run)
} else {
Expand Down Expand Up @@ -178,7 +210,7 @@ impl NexusEngine {
.iter_mut()
.find(|run| run.is_message_timestamp_valid(&timestamp))
{
run.push_message(self.filename.as_deref(), message)?;
run.push_message(self.local_path.as_deref(), message)?;
Some(run)
} else {
warn!("No run found for message with timestamp: {timestamp}");
Expand Down Expand Up @@ -211,12 +243,12 @@ impl NexusEngine {
/// Afterwhich the runs are dropped.
#[tracing::instrument(skip_all, level = "debug")]
pub(crate) async fn flush_move_cache(&mut self) {
if let Some((file_name, archive_name)) = Option::zip(
self.filename.as_ref(),
self.nexus_settings.archive_path.as_ref(),
if let Some((local_path, archive_path)) = Option::zip(
self.local_path.as_deref(),
self.nexus_settings.archive_path.as_deref(),
) {
for run in self.run_move_cache.iter() {
match run.move_to_archive(file_name, archive_name) {
match run.move_to_archive(local_path, archive_path) {
Ok(move_to_archive) => move_to_archive.await,
Err(e) => warn!("Error Moving to Archive {e}"),
}
Expand Down
108 changes: 49 additions & 59 deletions nexus-writer/src/nexus/hdf5_file/run_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@ use super::{
};
use crate::nexus::{
hdf5_file::run_file_components::{RunLog, SeLog},
nexus_class as NX, NexusConfiguration, NexusSettings, RunParameters, DATETIME_FORMAT,
nexus_class as NX,
run_parameters::RunStopParameters,
NexusConfiguration, NexusSettings, RunParameters, DATETIME_FORMAT,
};
use chrono::{DateTime, Duration, Utc};
use hdf5::{types::VarLenUnicode, Dataset, File};
use chrono::{DateTime, Utc};
use hdf5::{types::VarLenUnicode, Dataset, File, H5Type};
use std::{fs::create_dir_all, path::Path};
use supermusr_streaming_types::{
aev2_frame_assembled_event_v2_generated::FrameAssembledEventListMessage,
Expand Down Expand Up @@ -49,17 +51,12 @@ pub(crate) struct RunFile {
impl RunFile {
#[tracing::instrument(skip_all, err(level = "warn"))]
pub(crate) fn new_runfile(
filename: &Path,
path: &Path,
run_name: &str,
nexus_settings: &NexusSettings,
) -> anyhow::Result<Self> {
create_dir_all(filename)?;
let filename = {
let mut filename = filename.to_owned();
filename.push(run_name);
filename.set_extension("nxs");
filename
};
create_dir_all(path)?;
let filename = RunParameters::get_hdf5_filename(path, run_name);
debug!("File save begin. File: {0}.", filename.display());

let file = File::create(filename)?;
Expand Down Expand Up @@ -138,13 +135,8 @@ impl RunFile {
}

#[tracing::instrument(skip_all, err(level = "warn"))]
pub(crate) fn open_runfile(filename: &Path, run_name: &str) -> anyhow::Result<Self> {
let filename = {
let mut filename = filename.to_owned();
filename.push(run_name);
filename.set_extension("nxs");
filename
};
pub(crate) fn open_runfile(local_path: &Path, run_name: &str) -> anyhow::Result<Self> {
let filename = RunParameters::get_hdf5_filename(local_path, run_name);
debug!("File open begin. File: {0}.", filename.display());

let file = File::open_rw(filename)?;
Expand Down Expand Up @@ -229,6 +221,7 @@ impl RunFile {
let start_time = parameters.collect_from.format(DATETIME_FORMAT).to_string();

set_string_to(&self.start_time, &start_time)?;
set_string_to(&self.end_time, "")?;

set_string_to(&self.name, &parameters.run_name)?;
set_string_to(&self.title, "")?;
Expand All @@ -254,43 +247,6 @@ impl RunFile {
Ok(())
}

#[tracing::instrument(skip_all, level = "trace", err(level = "warn"))]
pub(crate) fn ensure_end_time_is_set(
&mut self,
parameters: &RunParameters,
message: &FrameAssembledEventListMessage,
) -> anyhow::Result<()> {
let end_time = {
if let Some(run_stop_parameters) = &parameters.run_stop_parameters {
run_stop_parameters.collect_until
} else {
let time = message
.time()
.ok_or(anyhow::anyhow!("Event time missing."))?;

let ms = if time.is_empty() {
0
} else {
time.get(time.len() - 1).div_ceil(1_000_000).into()
};

let duration = Duration::try_milliseconds(ms)
.ok_or(anyhow::anyhow!("Invalid duration {ms}ms."))?;

let timestamp: DateTime<Utc> = (*message
.metadata()
.timestamp()
.ok_or(anyhow::anyhow!("Message timestamp missing."))?)
.try_into()?;

timestamp
.checked_add_signed(duration)
.ok_or(anyhow::anyhow!("Unable to add {duration} to {timestamp}"))?
}
};
self.set_end_time(&end_time)
}

#[tracing::instrument(skip_all, level = "trace", err(level = "warn"))]
pub(crate) fn push_logdata_to_runfile(
&mut self,
Expand Down Expand Up @@ -318,12 +274,46 @@ impl RunFile {
#[tracing::instrument(skip_all, level = "trace", err(level = "warn"))]
pub(crate) fn push_message_to_runfile(
&mut self,
parameters: &RunParameters,
message: &FrameAssembledEventListMessage,
) -> anyhow::Result<()> {
self.lists.push_message_to_event_runfile(message)?;
self.ensure_end_time_is_set(parameters, message)?;
Ok(())
self.lists.push_message_to_event_runfile(message)
}

fn try_read_scalar<T: H5Type>(dataset: &Dataset) -> anyhow::Result<T> {
if dataset.storage_size() != 0 {
if dataset.is_scalar() {
Ok(dataset.read_scalar::<T>()?)
} else {
anyhow::bail!("{} is not a scalar", dataset.name())
}
} else {
anyhow::bail!("{} is not allocated", dataset.name())
}
}

#[tracing::instrument(skip_all, level = "trace", err(level = "warn"))]
pub(crate) fn extract_run_parameters(&self) -> anyhow::Result<RunParameters> {
let collect_from: DateTime<Utc> =
Self::try_read_scalar::<VarLenUnicode>(&self.start_time)?.parse()?;
let run_name = Self::try_read_scalar::<VarLenUnicode>(&self.name)?.into();
let run_number = Self::try_read_scalar::<u32>(&self.run_number)?;
let num_periods = Self::try_read_scalar::<u32>(&self.period_number)?;
let instrument_name = Self::try_read_scalar::<VarLenUnicode>(&self.instrument_name)?.into();
let run_stop_parameters = Self::try_read_scalar::<VarLenUnicode>(&self.end_time)?
.parse()
.map(|collect_until| RunStopParameters {
collect_until,
last_modified: Utc::now(),
})
.ok();
Ok(RunParameters {
collect_from,
run_stop_parameters,
num_periods,
run_name,
run_number,
instrument_name,
})
}

#[tracing::instrument(skip_all, level = "trace", err(level = "warn"))]
Expand Down
Loading
Loading