Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve simulator functionality #249

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions simulator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ serde.workspace = true
serde_json.workspace = true
supermusr-common.workspace = true
supermusr-streaming-types.workspace = true
thiserror.workspace = true
tokio.workspace = true
tracing.workspace = true

Expand Down
25 changes: 12 additions & 13 deletions simulator/src/integrated/build_messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,17 @@ pub(crate) fn build_trace_message(
digitizer_id: DigitizerId,
channels: &[Channel],
selection_mode: SelectionModeOptions,
) -> Option<()> {
) {
let channels = channels
.iter()
.map(|&channel| {
info_span!(target: "otel", "channel", channel = channel).in_scope(|| {
let trace = cache.extract_one(selection_mode);
tracing::Span::current().follows_from(trace.span().get().unwrap());
let voltage = Some(fbb.create_vector::<Intensity>(trace));

tracing::Span::current()
.follows_from(trace.span().get().expect("Span should be initialised"));
let voltage = Some(fbb.create_vector::<Intensity>(trace.get_intensities()));

cache.finish_one(selection_mode);
ChannelTrace::create(fbb, &ChannelTraceArgs { channel, voltage })
})
Expand All @@ -73,8 +76,6 @@ pub(crate) fn build_trace_message(
};
let message = DigitizerAnalogTraceMessage::create(fbb, &message);
finish_digitizer_analog_trace_message_buffer(fbb, message);

Some(())
}

pub(crate) fn build_digitiser_event_list_message(
Expand All @@ -84,7 +85,7 @@ pub(crate) fn build_digitiser_event_list_message(
digitizer_id: DigitizerId,
channels: &[Channel],
source_options: &SourceOptions,
) -> anyhow::Result<()> {
) {
let mut time = Vec::<Time>::new();
let mut voltage = Vec::<Intensity>::new();
let mut channel = Vec::<Channel>::new();
Expand All @@ -96,7 +97,8 @@ pub(crate) fn build_digitiser_event_list_message(
.zip(event_lists)
.for_each(|(c, event_list)| {
info_span!(target: "otel", "channel", channel = c).in_scope(|| {
tracing::Span::current().follows_from(event_list.span().get().unwrap());
tracing::Span::current()
.follows_from(event_list.span().get().expect("Span exists"));
event_list.pulses.iter().for_each(|p| {
time.push(p.time());
voltage.push(p.intensity());
Expand All @@ -119,8 +121,6 @@ pub(crate) fn build_digitiser_event_list_message(
};
let message = DigitizerEventListMessage::create(fbb, &message);
finish_digitizer_event_list_message_buffer(fbb, message);

Ok(())
}

pub(crate) fn build_aggregated_event_list_message(
Expand All @@ -129,7 +129,7 @@ pub(crate) fn build_aggregated_event_list_message(
metadata: &FrameMetadata,
channels: &[Channel],
source_options: &SourceOptions,
) -> anyhow::Result<()> {
) {
let mut time = Vec::<Time>::new();
let mut voltage = Vec::<Intensity>::new();
let mut channel = Vec::<Channel>::new();
Expand All @@ -141,7 +141,8 @@ pub(crate) fn build_aggregated_event_list_message(
.zip(event_lists)
.for_each(|(c, event_list)| {
info_span!(target: "otel", "channel", channel = c).in_scope(|| {
tracing::Span::current().follows_from(event_list.span().get().unwrap());
tracing::Span::current()
.follows_from(event_list.span().get().expect("Span exists"));
event_list.pulses.iter().for_each(|p| {
time.push(p.time());
voltage.push(p.intensity());
Expand All @@ -163,6 +164,4 @@ pub(crate) fn build_aggregated_event_list_message(
};
let message = FrameAssembledEventListMessage::create(fbb, &message);
finish_frame_assembled_event_list_message_buffer(fbb, message);

Ok(())
}
15 changes: 13 additions & 2 deletions simulator/src/integrated/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@ pub(crate) mod simulation_engine;

use crate::{Cli, Defined};
use rdkafka::producer::FutureProducer;
use simulation::Simulation;
use simulation::{Simulation, SimulationError};
use simulation_engine::{run_schedule, SimulationEngine, SimulationEngineExternals};
use std::fs::File;
use thiserror::Error;
use tokio::task::JoinSet;
use tracing::{error, trace};

Expand All @@ -23,12 +24,22 @@ pub(crate) struct Topics<'a> {
pub(crate) alarm: &'a str,
}

#[derive(Debug, Error)]
pub(crate) enum ConfiguredError {
#[error("Simulation Error: {0}")]
Simulation(#[from] SimulationError),
#[error("Json Error: {0}")]
Json(#[from] serde_json::Error),
#[error("File Error: {0}")]
IO(#[from] std::io::Error),
}

pub(crate) async fn run_configured_simulation(
use_otel: bool,
cli: &Cli,
producer: &FutureProducer,
defined: Defined,
) -> anyhow::Result<()> {
) -> Result<(), ConfiguredError> {
let Defined { file, .. } = defined;

let simulation: Simulation = serde_json::from_reader(File::open(file)?)?;
Expand Down
78 changes: 43 additions & 35 deletions simulator/src/integrated/send_messages.rs
Original file line number Diff line number Diff line change
@@ -1,26 +1,29 @@
use super::build_messages::{
build_aggregated_event_list_message, build_digitiser_event_list_message, build_trace_message,
};
use crate::{
integrated::{
build_messages::{
build_aggregated_event_list_message, build_digitiser_event_list_message,
build_trace_message,
},
simulation_elements::{
run_messages::{
SendAlarm, SendRunLogData, SendRunStart, SendRunStop, SendSampleEnvLog,
},
EventList, Trace,
},
simulation_engine::actions::{SelectionModeOptions, SourceOptions},
simulation_engine::SimulationEngineExternals,
simulation_engine::{
actions::{SelectionModeOptions, SourceOptions},
SimulationEngineExternals,
},
},
runs::{runlog, sample_environment},
runs::{runlog, sample_environment, RunCommandError},
};
use chrono::{DateTime, Utc};
use rdkafka::{
producer::{FutureProducer, FutureRecord},
util::Timeout,
Message,
};
use std::{collections::VecDeque, time::Duration};
use std::{collections::VecDeque, num::TryFromIntError, time::Duration};
use supermusr_common::{tracer::FutureRecordTracerExt, Channel, DigitizerId};
use supermusr_streaming_types::{
ecs_6s4t_run_stop_generated::{finish_run_stop_buffer, RunStop, RunStopArgs},
Expand All @@ -34,8 +37,19 @@ use supermusr_streaming_types::{
flatbuffers::FlatBufferBuilder,
FrameMetadata,
};
use thiserror::Error;
use tracing::{debug, debug_span, error, Span};

#[derive(Debug, Error)]
pub(crate) enum SendError {
#[error("Run Command Error: {0}")]
RunCommand(#[from] RunCommandError),
#[error("Int Conversion Error: {0}")]
TryFromInt(#[from] TryFromIntError),
#[error("Timestamp cannot be Converted to Nanos: {0}")]
TimestampToNanos(DateTime<Utc>),
}

struct SendMessageArgs<'a> {
use_otel: bool,
producer: FutureProducer,
Expand Down Expand Up @@ -84,29 +98,27 @@ async fn send_message(args: SendMessageArgs<'_>) {
};
}

fn get_time_since_epoch_ms(
timestamp: &DateTime<Utc>,
) -> anyhow::Result<u64, <i64 as TryInto<u64>>::Error> {
timestamp.timestamp_millis().try_into()
fn get_time_since_epoch_ms(timestamp: &DateTime<Utc>) -> Result<u64, SendError> {
Ok(timestamp.timestamp_millis().try_into()?)
}

fn get_time_since_epoch_ns(timestamp: &DateTime<Utc>) -> anyhow::Result<i64> {
fn get_time_since_epoch_ns(timestamp: &DateTime<Utc>) -> Result<i64, SendError> {
timestamp
.timestamp_nanos_opt()
.ok_or(anyhow::anyhow!("Invalid Run Log Timestamp {timestamp}"))
.ok_or(SendError::TimestampToNanos(*timestamp))
}

#[tracing::instrument(skip_all, target = "otel")]
pub(crate) fn send_run_start_command(
externals: &mut SimulationEngineExternals,
status: &SendRunStart,
timestamp: &DateTime<Utc>,
) -> anyhow::Result<()> {
) -> Result<(), SendError> {
let mut fbb = FlatBufferBuilder::new();
let run_start = RunStartArgs {
start_time: get_time_since_epoch_ms(timestamp)?,
run_name: Some(fbb.create_string(&status.name)),
instrument_name: Some(fbb.create_string(&status.instrument)),
run_name: Some(fbb.create_string(&status.name.value())),
instrument_name: Some(fbb.create_string(&status.instrument.value())),
..Default::default()
};
let message = RunStart::create(&mut fbb, &run_start);
Expand All @@ -130,11 +142,11 @@ pub(crate) fn send_run_stop_command(
externals: &mut SimulationEngineExternals,
status: &SendRunStop,
timestamp: &DateTime<Utc>,
) -> anyhow::Result<()> {
) -> Result<(), SendError> {
let mut fbb = FlatBufferBuilder::new();
let run_stop = RunStopArgs {
stop_time: get_time_since_epoch_ms(timestamp)?,
run_name: Some(fbb.create_string(&status.name)),
run_name: Some(fbb.create_string(&status.name.value())),
..Default::default()
};
let message = RunStop::create(&mut fbb, &run_stop);
Expand All @@ -158,12 +170,12 @@ pub(crate) fn send_run_log_command(
externals: &mut SimulationEngineExternals,
timestamp: &DateTime<Utc>,
status: &SendRunLogData,
) -> anyhow::Result<()> {
) -> Result<(), SendError> {
let value_type = status.value_type.clone().into();

let mut fbb = FlatBufferBuilder::new();
let run_log_args = f144_LogDataArgs {
source_name: Some(fbb.create_string(&status.source_name)),
source_name: Some(fbb.create_string(&status.source_name.value())),
timestamp: get_time_since_epoch_ns(timestamp)?,
value_type,
value: Some(runlog::make_value(&mut fbb, value_type, &status.value)?),
Expand All @@ -189,7 +201,7 @@ pub(crate) fn send_se_log_command(
externals: &mut SimulationEngineExternals,
timestamp: &DateTime<Utc>,
sample_env: &SendSampleEnvLog,
) -> anyhow::Result<()> {
) -> Result<(), SendError> {
let mut fbb = FlatBufferBuilder::new();

let timestamp_location = sample_env.location.clone().into();
Expand All @@ -214,7 +226,7 @@ pub(crate) fn send_se_log_command(
));

let se_log_args = se00_SampleEnvironmentDataArgs {
name: Some(fbb.create_string(&sample_env.name)),
name: Some(fbb.create_string(&sample_env.name.value())),
channel: sample_env.channel.unwrap_or(-1),
time_delta: sample_env.time_delta.unwrap_or(0.0),
timestamp_location,
Expand Down Expand Up @@ -245,11 +257,11 @@ pub(crate) fn send_alarm_command(
externals: &mut SimulationEngineExternals,
timestamp: &DateTime<Utc>,
alarm: &SendAlarm,
) -> anyhow::Result<()> {
) -> Result<(), SendError> {
let mut fbb = FlatBufferBuilder::new();
let severity = alarm.severity.clone().into();
let alarm_args = AlarmArgs {
source_name: Some(fbb.create_string(&alarm.source_name)),
source_name: Some(fbb.create_string(&alarm.source_name.value())),
timestamp: get_time_since_epoch_ns(timestamp)?,
severity,
message: Some(fbb.create_string(&alarm.message)),
Expand All @@ -271,15 +283,15 @@ pub(crate) fn send_alarm_command(
}

#[tracing::instrument(skip_all, target = "otel", fields(digitizer_id = digitizer_id))]
pub(crate) fn send_trace_message(
pub(crate) fn send_digitiser_trace_message(
externals: &mut SimulationEngineExternals,
sample_rate: u64,
cache: &mut VecDeque<Trace>,
metadata: &FrameMetadata,
digitizer_id: DigitizerId,
channels: &[Channel],
selection_mode: SelectionModeOptions,
) -> anyhow::Result<()> {
) {
let mut fbb = FlatBufferBuilder::new();

build_trace_message(
Expand All @@ -290,8 +302,7 @@ pub(crate) fn send_trace_message(
digitizer_id,
channels,
selection_mode,
)
.unwrap();
);

let send_args = SendMessageArgs::new(
externals.use_otel,
Expand All @@ -303,7 +314,6 @@ pub(crate) fn send_trace_message(
externals
.kafka_producer_thread_set
.spawn(send_message(send_args));
Ok(())
}

#[tracing::instrument(skip_all, target = "otel", fields(digitizer_id = digitizer_id))]
Expand All @@ -314,7 +324,7 @@ pub(crate) fn send_digitiser_event_list_message(
digitizer_id: DigitizerId,
channels: &[Channel],
source_options: &SourceOptions,
) -> anyhow::Result<()> {
) -> Result<(), SendError> {
let mut fbb = FlatBufferBuilder::new();

build_digitiser_event_list_message(
Expand All @@ -324,8 +334,7 @@ pub(crate) fn send_digitiser_event_list_message(
digitizer_id,
channels,
source_options,
)
.unwrap();
);

let send_args = SendMessageArgs::new(
externals.use_otel,
Expand All @@ -347,11 +356,10 @@ pub(crate) fn send_aggregated_frame_event_list_message(
metadata: &FrameMetadata,
channels: &[Channel],
source_options: &SourceOptions,
) -> anyhow::Result<()> {
) -> Result<(), SendError> {
let mut fbb = FlatBufferBuilder::new();

build_aggregated_event_list_message(&mut fbb, cache, metadata, channels, source_options)
.unwrap();
build_aggregated_event_list_message(&mut fbb, cache, metadata, channels, source_options);

let send_args = SendMessageArgs::new(
externals.use_otel,
Expand Down
Loading