Skip to content

Commit

Permalink
Improve simulator functionality (#249)
Browse files Browse the repository at this point in the history
## Summary of changes

1. Changed `unwrap`s to `expect`s in `build_messages.rs`
2. Changed `send_trace_message` to `send_digitiser_trace_message` and
integrated edits in `send_messages.rs`, `simulation.rs`, `engine.rs` and
others.
3. Added `Trace` struct for channel traces in `event_lists.rs` and
refactored
4. Added `IntConstant` and `TextConstant` types
5. Added `EnsureDelayMs` action
6. Improved error handling

## Instruction for review/testing

General code review as the simulator was successfully tested in the
pipeline.
  • Loading branch information
Modularius authored Sep 10, 2024
1 parent 70d5e60 commit 88e4cc0
Show file tree
Hide file tree
Showing 19 changed files with 678 additions and 492 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions simulator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ serde.workspace = true
serde_json.workspace = true
supermusr-common.workspace = true
supermusr-streaming-types.workspace = true
thiserror.workspace = true
tokio.workspace = true
tracing.workspace = true

Expand Down
25 changes: 12 additions & 13 deletions simulator/src/integrated/build_messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,17 @@ pub(crate) fn build_trace_message(
digitizer_id: DigitizerId,
channels: &[Channel],
selection_mode: SelectionModeOptions,
) -> Option<()> {
) {
let channels = channels
.iter()
.map(|&channel| {
info_span!(target: "otel", "channel", channel = channel).in_scope(|| {
let trace = cache.extract_one(selection_mode);
tracing::Span::current().follows_from(trace.span().get().unwrap());
let voltage = Some(fbb.create_vector::<Intensity>(trace));

tracing::Span::current()
.follows_from(trace.span().get().expect("Span should be initialised"));
let voltage = Some(fbb.create_vector::<Intensity>(trace.get_intensities()));

cache.finish_one(selection_mode);
ChannelTrace::create(fbb, &ChannelTraceArgs { channel, voltage })
})
Expand All @@ -73,8 +76,6 @@ pub(crate) fn build_trace_message(
};
let message = DigitizerAnalogTraceMessage::create(fbb, &message);
finish_digitizer_analog_trace_message_buffer(fbb, message);

Some(())
}

pub(crate) fn build_digitiser_event_list_message(
Expand All @@ -84,7 +85,7 @@ pub(crate) fn build_digitiser_event_list_message(
digitizer_id: DigitizerId,
channels: &[Channel],
source_options: &SourceOptions,
) -> anyhow::Result<()> {
) {
let mut time = Vec::<Time>::new();
let mut voltage = Vec::<Intensity>::new();
let mut channel = Vec::<Channel>::new();
Expand All @@ -96,7 +97,8 @@ pub(crate) fn build_digitiser_event_list_message(
.zip(event_lists)
.for_each(|(c, event_list)| {
info_span!(target: "otel", "channel", channel = c).in_scope(|| {
tracing::Span::current().follows_from(event_list.span().get().unwrap());
tracing::Span::current()
.follows_from(event_list.span().get().expect("Span exists"));
event_list.pulses.iter().for_each(|p| {
time.push(p.time());
voltage.push(p.intensity());
Expand All @@ -119,8 +121,6 @@ pub(crate) fn build_digitiser_event_list_message(
};
let message = DigitizerEventListMessage::create(fbb, &message);
finish_digitizer_event_list_message_buffer(fbb, message);

Ok(())
}

pub(crate) fn build_aggregated_event_list_message(
Expand All @@ -129,7 +129,7 @@ pub(crate) fn build_aggregated_event_list_message(
metadata: &FrameMetadata,
channels: &[Channel],
source_options: &SourceOptions,
) -> anyhow::Result<()> {
) {
let mut time = Vec::<Time>::new();
let mut voltage = Vec::<Intensity>::new();
let mut channel = Vec::<Channel>::new();
Expand All @@ -141,7 +141,8 @@ pub(crate) fn build_aggregated_event_list_message(
.zip(event_lists)
.for_each(|(c, event_list)| {
info_span!(target: "otel", "channel", channel = c).in_scope(|| {
tracing::Span::current().follows_from(event_list.span().get().unwrap());
tracing::Span::current()
.follows_from(event_list.span().get().expect("Span exists"));
event_list.pulses.iter().for_each(|p| {
time.push(p.time());
voltage.push(p.intensity());
Expand All @@ -163,6 +164,4 @@ pub(crate) fn build_aggregated_event_list_message(
};
let message = FrameAssembledEventListMessage::create(fbb, &message);
finish_frame_assembled_event_list_message_buffer(fbb, message);

Ok(())
}
15 changes: 13 additions & 2 deletions simulator/src/integrated/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@ pub(crate) mod simulation_engine;

use crate::{Cli, Defined};
use rdkafka::producer::FutureProducer;
use simulation::Simulation;
use simulation::{Simulation, SimulationError};
use simulation_engine::{run_schedule, SimulationEngine, SimulationEngineExternals};
use std::fs::File;
use thiserror::Error;
use tokio::task::JoinSet;
use tracing::{error, trace};

Expand All @@ -23,12 +24,22 @@ pub(crate) struct Topics<'a> {
pub(crate) alarm: &'a str,
}

#[derive(Debug, Error)]
pub(crate) enum ConfiguredError {
#[error("Simulation Error: {0}")]
Simulation(#[from] SimulationError),
#[error("Json Error: {0}")]
Json(#[from] serde_json::Error),
#[error("File Error: {0}")]
IO(#[from] std::io::Error),
}

pub(crate) async fn run_configured_simulation(
use_otel: bool,
cli: &Cli,
producer: &FutureProducer,
defined: Defined,
) -> anyhow::Result<()> {
) -> Result<(), ConfiguredError> {
let Defined { file, .. } = defined;

let simulation: Simulation = serde_json::from_reader(File::open(file)?)?;
Expand Down
78 changes: 43 additions & 35 deletions simulator/src/integrated/send_messages.rs
Original file line number Diff line number Diff line change
@@ -1,26 +1,29 @@
use super::build_messages::{
build_aggregated_event_list_message, build_digitiser_event_list_message, build_trace_message,
};
use crate::{
integrated::{
build_messages::{
build_aggregated_event_list_message, build_digitiser_event_list_message,
build_trace_message,
},
simulation_elements::{
run_messages::{
SendAlarm, SendRunLogData, SendRunStart, SendRunStop, SendSampleEnvLog,
},
EventList, Trace,
},
simulation_engine::actions::{SelectionModeOptions, SourceOptions},
simulation_engine::SimulationEngineExternals,
simulation_engine::{
actions::{SelectionModeOptions, SourceOptions},
SimulationEngineExternals,
},
},
runs::{runlog, sample_environment},
runs::{runlog, sample_environment, RunCommandError},
};
use chrono::{DateTime, Utc};
use rdkafka::{
producer::{FutureProducer, FutureRecord},
util::Timeout,
Message,
};
use std::{collections::VecDeque, time::Duration};
use std::{collections::VecDeque, num::TryFromIntError, time::Duration};
use supermusr_common::{tracer::FutureRecordTracerExt, Channel, DigitizerId};
use supermusr_streaming_types::{
ecs_6s4t_run_stop_generated::{finish_run_stop_buffer, RunStop, RunStopArgs},
Expand All @@ -34,8 +37,19 @@ use supermusr_streaming_types::{
flatbuffers::FlatBufferBuilder,
FrameMetadata,
};
use thiserror::Error;
use tracing::{debug, debug_span, error, Span};

#[derive(Debug, Error)]
pub(crate) enum SendError {
#[error("Run Command Error: {0}")]
RunCommand(#[from] RunCommandError),
#[error("Int Conversion Error: {0}")]
TryFromInt(#[from] TryFromIntError),
#[error("Timestamp cannot be Converted to Nanos: {0}")]
TimestampToNanos(DateTime<Utc>),
}

struct SendMessageArgs<'a> {
use_otel: bool,
producer: FutureProducer,
Expand Down Expand Up @@ -84,29 +98,27 @@ async fn send_message(args: SendMessageArgs<'_>) {
};
}

fn get_time_since_epoch_ms(
timestamp: &DateTime<Utc>,
) -> anyhow::Result<u64, <i64 as TryInto<u64>>::Error> {
timestamp.timestamp_millis().try_into()
fn get_time_since_epoch_ms(timestamp: &DateTime<Utc>) -> Result<u64, SendError> {
Ok(timestamp.timestamp_millis().try_into()?)
}

fn get_time_since_epoch_ns(timestamp: &DateTime<Utc>) -> anyhow::Result<i64> {
fn get_time_since_epoch_ns(timestamp: &DateTime<Utc>) -> Result<i64, SendError> {
timestamp
.timestamp_nanos_opt()
.ok_or(anyhow::anyhow!("Invalid Run Log Timestamp {timestamp}"))
.ok_or(SendError::TimestampToNanos(*timestamp))
}

#[tracing::instrument(skip_all, target = "otel")]
pub(crate) fn send_run_start_command(
externals: &mut SimulationEngineExternals,
status: &SendRunStart,
timestamp: &DateTime<Utc>,
) -> anyhow::Result<()> {
) -> Result<(), SendError> {
let mut fbb = FlatBufferBuilder::new();
let run_start = RunStartArgs {
start_time: get_time_since_epoch_ms(timestamp)?,
run_name: Some(fbb.create_string(&status.name)),
instrument_name: Some(fbb.create_string(&status.instrument)),
run_name: Some(fbb.create_string(&status.name.value())),
instrument_name: Some(fbb.create_string(&status.instrument.value())),
..Default::default()
};
let message = RunStart::create(&mut fbb, &run_start);
Expand All @@ -130,11 +142,11 @@ pub(crate) fn send_run_stop_command(
externals: &mut SimulationEngineExternals,
status: &SendRunStop,
timestamp: &DateTime<Utc>,
) -> anyhow::Result<()> {
) -> Result<(), SendError> {
let mut fbb = FlatBufferBuilder::new();
let run_stop = RunStopArgs {
stop_time: get_time_since_epoch_ms(timestamp)?,
run_name: Some(fbb.create_string(&status.name)),
run_name: Some(fbb.create_string(&status.name.value())),
..Default::default()
};
let message = RunStop::create(&mut fbb, &run_stop);
Expand All @@ -158,12 +170,12 @@ pub(crate) fn send_run_log_command(
externals: &mut SimulationEngineExternals,
timestamp: &DateTime<Utc>,
status: &SendRunLogData,
) -> anyhow::Result<()> {
) -> Result<(), SendError> {
let value_type = status.value_type.clone().into();

let mut fbb = FlatBufferBuilder::new();
let run_log_args = f144_LogDataArgs {
source_name: Some(fbb.create_string(&status.source_name)),
source_name: Some(fbb.create_string(&status.source_name.value())),
timestamp: get_time_since_epoch_ns(timestamp)?,
value_type,
value: Some(runlog::make_value(&mut fbb, value_type, &status.value)?),
Expand All @@ -189,7 +201,7 @@ pub(crate) fn send_se_log_command(
externals: &mut SimulationEngineExternals,
timestamp: &DateTime<Utc>,
sample_env: &SendSampleEnvLog,
) -> anyhow::Result<()> {
) -> Result<(), SendError> {
let mut fbb = FlatBufferBuilder::new();

let timestamp_location = sample_env.location.clone().into();
Expand All @@ -214,7 +226,7 @@ pub(crate) fn send_se_log_command(
));

let se_log_args = se00_SampleEnvironmentDataArgs {
name: Some(fbb.create_string(&sample_env.name)),
name: Some(fbb.create_string(&sample_env.name.value())),
channel: sample_env.channel.unwrap_or(-1),
time_delta: sample_env.time_delta.unwrap_or(0.0),
timestamp_location,
Expand Down Expand Up @@ -245,11 +257,11 @@ pub(crate) fn send_alarm_command(
externals: &mut SimulationEngineExternals,
timestamp: &DateTime<Utc>,
alarm: &SendAlarm,
) -> anyhow::Result<()> {
) -> Result<(), SendError> {
let mut fbb = FlatBufferBuilder::new();
let severity = alarm.severity.clone().into();
let alarm_args = AlarmArgs {
source_name: Some(fbb.create_string(&alarm.source_name)),
source_name: Some(fbb.create_string(&alarm.source_name.value())),
timestamp: get_time_since_epoch_ns(timestamp)?,
severity,
message: Some(fbb.create_string(&alarm.message)),
Expand All @@ -271,15 +283,15 @@ pub(crate) fn send_alarm_command(
}

#[tracing::instrument(skip_all, target = "otel", fields(digitizer_id = digitizer_id))]
pub(crate) fn send_trace_message(
pub(crate) fn send_digitiser_trace_message(
externals: &mut SimulationEngineExternals,
sample_rate: u64,
cache: &mut VecDeque<Trace>,
metadata: &FrameMetadata,
digitizer_id: DigitizerId,
channels: &[Channel],
selection_mode: SelectionModeOptions,
) -> anyhow::Result<()> {
) {
let mut fbb = FlatBufferBuilder::new();

build_trace_message(
Expand All @@ -290,8 +302,7 @@ pub(crate) fn send_trace_message(
digitizer_id,
channels,
selection_mode,
)
.unwrap();
);

let send_args = SendMessageArgs::new(
externals.use_otel,
Expand All @@ -303,7 +314,6 @@ pub(crate) fn send_trace_message(
externals
.kafka_producer_thread_set
.spawn(send_message(send_args));
Ok(())
}

#[tracing::instrument(skip_all, target = "otel", fields(digitizer_id = digitizer_id))]
Expand All @@ -314,7 +324,7 @@ pub(crate) fn send_digitiser_event_list_message(
digitizer_id: DigitizerId,
channels: &[Channel],
source_options: &SourceOptions,
) -> anyhow::Result<()> {
) -> Result<(), SendError> {
let mut fbb = FlatBufferBuilder::new();

build_digitiser_event_list_message(
Expand All @@ -324,8 +334,7 @@ pub(crate) fn send_digitiser_event_list_message(
digitizer_id,
channels,
source_options,
)
.unwrap();
);

let send_args = SendMessageArgs::new(
externals.use_otel,
Expand All @@ -347,11 +356,10 @@ pub(crate) fn send_aggregated_frame_event_list_message(
metadata: &FrameMetadata,
channels: &[Channel],
source_options: &SourceOptions,
) -> anyhow::Result<()> {
) -> Result<(), SendError> {
let mut fbb = FlatBufferBuilder::new();

build_aggregated_event_list_message(&mut fbb, cache, metadata, channels, source_options)
.unwrap();
build_aggregated_event_list_message(&mut fbb, cache, metadata, channels, source_options);

let send_args = SendMessageArgs::new(
externals.use_otel,
Expand Down
Loading

0 comments on commit 88e4cc0

Please sign in to comment.