Skip to content

Commit

Permalink
Fixes flushed runs index bug (#280)
Browse files Browse the repository at this point in the history
## Summary of changes

Bug fix.

In very particular circumstances a vector is accessed out of bounds.
When two runs are present in the run cache, and complete at the same
time the `flush` method will likely try to access the `run_cache` deque
out of bounds.

This bug was missed due to non-idiomatic code being used. It has been
replaced and a suitable test added to check this (albeit unlikely) use
case.

## Instruction for review/testing

General code review.

Tested on simulated data.
  • Loading branch information
Modularius authored Nov 26, 2024
1 parent e3aa0b5 commit 5367255
Showing 1 changed file with 47 additions and 17 deletions.
64 changes: 47 additions & 17 deletions nexus-writer/src/nexus/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,24 +174,19 @@ impl NexusEngine {

#[tracing::instrument(skip_all, level = "debug")]
pub(crate) fn flush(&mut self, delay: &Duration) {
// Get Indices of all completed run
let removed_indices: Vec<_> = self
.run_cache
.iter()
.enumerate()
.filter_map(|(index, run)| run.has_completed(delay).then_some(index))
.collect();

// Remove all runs found to be completed, and place them in self.run_move_cache
for index in removed_indices {
let run = self
.run_cache
.remove(index)
.expect("Index should be within bounds");
if let Err(e) = run.end_span() {
warn!("Run span drop failed {e}")
// Moves the runs into a new vector, then consumes it,
// directing completed runs to self.run_move_cache
// and incomplete ones back to self.run_cache
let temp: Vec<_> = self.run_cache.drain(..).collect();
for run in temp.into_iter() {
if run.has_completed(delay) {
if let Err(e) = run.end_span() {
warn!("Run span drop failed {e}")
}
self.run_move_cache.push(run);
} else {
self.run_cache.push_back(run);
}
self.run_move_cache.push(run);
}
}

Expand Down Expand Up @@ -409,6 +404,41 @@ mod test {
nexus.flush(&Duration::zero());
assert_eq!(nexus.cache_iter().len(), 0);
}

#[test]
fn two_runs_flushed() {
let mut nexus = NexusEngine::new(
None,
NexusSettings::default(),
NexusConfiguration::new(None),
);
let mut fbb = FlatBufferBuilder::new();

let ts_start: DateTime<Utc> = GpsTime::new(0, 1, 0, 0, 15, 0, 0, 0).try_into().unwrap();
let ts_end: DateTime<Utc> = GpsTime::new(0, 1, 0, 0, 17, 0, 0, 0).try_into().unwrap();

let start = create_start(&mut fbb, "TestRun1", ts_start.timestamp_millis() as u64).unwrap();
nexus.start_command(start).unwrap();

fbb.reset();
let stop = create_stop(&mut fbb, "TestRun1", ts_end.timestamp_millis() as u64).unwrap();
nexus.stop_command(stop).unwrap();

assert_eq!(nexus.cache_iter().len(), 1);

fbb.reset();
let start = create_start(&mut fbb, "TestRun2", ts_start.timestamp_millis() as u64).unwrap();
nexus.start_command(start).unwrap();

fbb.reset();
let stop = create_stop(&mut fbb, "TestRun2", ts_end.timestamp_millis() as u64).unwrap();
nexus.stop_command(stop).unwrap();

assert_eq!(nexus.cache_iter().len(), 2);

nexus.flush(&Duration::zero());
assert_eq!(nexus.cache_iter().len(), 0);
}
}

#[derive(Default, Debug)]
Expand Down

0 comments on commit 5367255

Please sign in to comment.