Skip to content

Commit

Permalink
Merge branch 'main' into documentation
Browse files Browse the repository at this point in the history
  • Loading branch information
cooperq committed Jun 6, 2024
2 parents 0d0cad2 + dfd8138 commit f261940
Show file tree
Hide file tree
Showing 18 changed files with 593 additions and 276 deletions.
11 changes: 7 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 7 additions & 1 deletion bin/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,11 @@ edition = "2021"

[[bin]]
name = "rayhunter-daemon"
path = "src/main.rs"
path = "src/daemon.rs"

[[bin]]
name = "rayhunter-check"
path = "src/check.rs"

[dependencies]
rayhunter = { path = "../lib" }
Expand All @@ -25,3 +29,5 @@ tempdir = "0.3.7"
chrono = { version = "0.4.31", features = ["serde"] }
tokio-stream = "0.1.14"
futures = "0.3.30"
clap = { version = "4.5.2", features = ["derive"] }
serde_json = "1.0.114"
31 changes: 31 additions & 0 deletions bin/src/check.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
use std::{future, path::PathBuf, pin::pin};
use rayhunter::{analysis::analyzer::Harness, diag::DataType, qmdl::QmdlReader};
use tokio::fs::File;
use clap::Parser;
use futures::TryStreamExt;

#[derive(Parser, Debug)]
#[command(version, about)]
struct Args {
#[arg(short, long)]
qmdl_path: PathBuf,
}

#[tokio::main]
async fn main() {
env_logger::init();
let args = Args::parse();

let mut harness = Harness::new_with_all_analyzers();

let qmdl_file = File::open(args.qmdl_path).await.expect("failed to open QMDL file");
let file_size = qmdl_file.metadata().await.expect("failed to get QMDL file metadata").len();
let mut qmdl_reader = QmdlReader::new(qmdl_file, Some(file_size as usize));
let mut qmdl_stream = pin!(qmdl_reader.as_stream()
.try_filter(|container| future::ready(container.data_type == DataType::UserSpace)));
println!("{}\n", serde_json::to_string(&harness.get_metadata()).expect("failed to serialize report metadata"));
while let Some(container) = qmdl_stream.try_next().await.expect("failed getting QMDL container") {
let row = harness.analyze_qmdl_messages(container);
println!("{}\n", serde_json::to_string(&row).expect("failed to serialize row"));
}
}
19 changes: 10 additions & 9 deletions bin/src/main.rs → bin/src/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@ mod diag;

use crate::config::{parse_config, parse_args};
use crate::diag::run_diag_read_thread;
use crate::qmdl_store::QmdlStore;
use crate::qmdl_store::RecordingStore;
use crate::server::{ServerState, get_qmdl, serve_static};
use crate::pcap::get_pcap;
use crate::stats::get_system_stats;
use crate::error::RayhunterError;

use axum::response::Redirect;
use diag::{DiagDeviceCtrlMessage, start_recording, stop_recording};
use diag::{get_analysis_report, start_recording, stop_recording, DiagDeviceCtrlMessage};
use log::{info, error};
use rayhunter::diag_device::DiagDevice;
use axum::routing::{get, post};
Expand All @@ -35,14 +35,14 @@ use std::sync::Arc;
async fn run_server(
task_tracker: &TaskTracker,
config: &config::Config,
qmdl_store_lock: Arc<RwLock<QmdlStore>>,
qmdl_store_lock: Arc<RwLock<RecordingStore>>,
server_shutdown_rx: oneshot::Receiver<()>,
diag_device_sender: Sender<DiagDeviceCtrlMessage>
) -> JoinHandle<()> {
let state = Arc::new(ServerState {
qmdl_store_lock,
diag_device_ctrl_sender: diag_device_sender,
readonly_mode: config.readonly_mode,
readonly_mode: config.readonly_mode
});

let app = Router::new()
Expand All @@ -52,6 +52,7 @@ async fn run_server(
.route("/api/qmdl-manifest", get(get_qmdl_manifest))
.route("/api/start-recording", post(start_recording))
.route("/api/stop-recording", post(stop_recording))
.route("/api/analysis-report", get(get_analysis_report))
.route("/", get(|| async { Redirect::permanent("/index.html") }))
.route("/*path", get(serve_static))
.with_state(state);
Expand All @@ -72,10 +73,10 @@ async fn server_shutdown_signal(server_shutdown_rx: oneshot::Receiver<()>) {

// Loads a QmdlStore if one exists, and if not, only create one if we're not in
// readonly mode.
async fn init_qmdl_store(config: &config::Config) -> Result<QmdlStore, RayhunterError> {
match (QmdlStore::exists(&config.qmdl_store_path).await?, config.readonly_mode) {
(true, _) => Ok(QmdlStore::load(&config.qmdl_store_path).await?),
(false, false) => Ok(QmdlStore::create(&config.qmdl_store_path).await?),
async fn init_qmdl_store(config: &config::Config) -> Result<RecordingStore, RayhunterError> {
match (RecordingStore::exists(&config.qmdl_store_path).await?, config.readonly_mode) {
(true, _) => Ok(RecordingStore::load(&config.qmdl_store_path).await?),
(false, false) => Ok(RecordingStore::create(&config.qmdl_store_path).await?),
(false, true) => Err(RayhunterError::NoStoreReadonlyMode(config.qmdl_store_path.clone())),
}
}
Expand All @@ -87,7 +88,7 @@ fn run_ctrl_c_thread(
task_tracker: &TaskTracker,
diag_device_sender: Sender<DiagDeviceCtrlMessage>,
server_shutdown_tx: oneshot::Sender<()>,
qmdl_store_lock: Arc<RwLock<QmdlStore>>
qmdl_store_lock: Arc<RwLock<RecordingStore>>
) -> JoinHandle<Result<(), RayhunterError>> {
task_tracker.spawn(async move {
match tokio::signal::ctrl_c().await {
Expand Down
134 changes: 119 additions & 15 deletions bin/src/diag.rs
Original file line number Diff line number Diff line change
@@ -1,44 +1,122 @@
use std::pin::pin;
use std::sync::Arc;

use axum::body::Body;
use axum::extract::State;
use axum::http::header::CONTENT_TYPE;
use axum::http::StatusCode;
use rayhunter::diag::DataType;
use axum::response::{IntoResponse, Response};
use rayhunter::analysis::analyzer::Harness;
use rayhunter::diag::{DataType, MessagesContainer};
use rayhunter::diag_device::DiagDevice;
use serde::Serialize;
use tokio::sync::RwLock;
use tokio::sync::mpsc::Receiver;
use rayhunter::qmdl::QmdlWriter;
use log::{debug, error, info};
use tokio::fs::File;
use tokio::io::{BufWriter, AsyncWriteExt};
use tokio_util::io::ReaderStream;
use tokio_util::task::TaskTracker;
use futures::{StreamExt, TryStreamExt};

use crate::qmdl_store::QmdlStore;
use crate::qmdl_store::RecordingStore;
use crate::server::ServerState;

pub enum DiagDeviceCtrlMessage {
StopRecording,
StartRecording(QmdlWriter<File>),
StartRecording((QmdlWriter<File>, File)),
Exit,
}

pub fn run_diag_read_thread(task_tracker: &TaskTracker, mut dev: DiagDevice, mut qmdl_file_rx: Receiver<DiagDeviceCtrlMessage>, qmdl_store_lock: Arc<RwLock<QmdlStore>>) {
struct AnalysisWriter {
writer: BufWriter<File>,
harness: Harness,
bytes_written: usize,
}

// We write our analysis results to a file immediately to minimize the amount of
// state Rayhunter has to keep track of in memory. The analysis file's format is
// Newline Delimited JSON
// (https://docs.mulesoft.com/dataweave/latest/dataweave-formats-ndjson), which
// lets us simply append new rows to the end without parsing the entire JSON
// object beforehand.
impl AnalysisWriter {
pub async fn new(file: File) -> Result<Self, std::io::Error> {
let mut result = Self {
writer: BufWriter::new(file),
harness: Harness::new_with_all_analyzers(),
bytes_written: 0,
};
let metadata = result.harness.get_metadata();
result.write(&metadata).await?;
Ok(result)
}

// Runs the analysis harness on the given container, serializing the results
// to the analysis file and returning the file's new length.
pub async fn analyze(&mut self, container: MessagesContainer) -> Result<usize, std::io::Error> {
let row = self.harness.analyze_qmdl_messages(container);
if !row.is_empty() {
self.write(&row).await?;
}
Ok(self.bytes_written)
}

async fn write<T: Serialize>(&mut self, value: &T) -> Result<(), std::io::Error> {
let mut value_str = serde_json::to_string(value).unwrap();
value_str.push('\n');
self.bytes_written += value_str.len();
self.writer.write_all(value_str.as_bytes()).await?;
self.writer.flush().await?;
Ok(())
}

// Flushes any pending I/O to disk before dropping the writer
pub async fn close(mut self) -> Result<(), std::io::Error> {
self.writer.flush().await?;
Ok(())
}
}

pub fn run_diag_read_thread(
task_tracker: &TaskTracker,
mut dev: DiagDevice,
mut qmdl_file_rx: Receiver<DiagDeviceCtrlMessage>,
qmdl_store_lock: Arc<RwLock<RecordingStore>>
) {
task_tracker.spawn(async move {
let initial_file = qmdl_store_lock.write().await.new_entry().await.expect("failed creating QMDL file entry");
let mut qmdl_writer: Option<QmdlWriter<File>> = Some(QmdlWriter::new(initial_file));
let (initial_qmdl_file, initial_analysis_file) = qmdl_store_lock.write().await.new_entry().await.expect("failed creating QMDL file entry");
let mut maybe_qmdl_writer: Option<QmdlWriter<File>> = Some(QmdlWriter::new(initial_qmdl_file));
let mut diag_stream = pin!(dev.as_stream().into_stream());
let mut maybe_analysis_writer = Some(AnalysisWriter::new(initial_analysis_file).await
.expect("failed to create analysis writer"));
loop {
tokio::select! {
msg = qmdl_file_rx.recv() => {
match msg {
Some(DiagDeviceCtrlMessage::StartRecording(new_writer)) => {
qmdl_writer = Some(new_writer);
Some(DiagDeviceCtrlMessage::StartRecording((new_writer, new_analysis_file))) => {
maybe_qmdl_writer = Some(new_writer);
if let Some(analysis_writer) = maybe_analysis_writer {
analysis_writer.close().await.expect("failed to close analysis writer");
}
maybe_analysis_writer = Some(AnalysisWriter::new(new_analysis_file).await
.expect("failed to write to analysis file"));
},
Some(DiagDeviceCtrlMessage::StopRecording) => {
maybe_qmdl_writer = None;
if let Some(analysis_writer) = maybe_analysis_writer {
analysis_writer.close().await.expect("failed to close analysis writer");
}
maybe_analysis_writer = None;
},
Some(DiagDeviceCtrlMessage::StopRecording) => qmdl_writer = None,
// None means all the Senders have been dropped, so it's
// time to go
Some(DiagDeviceCtrlMessage::Exit) | None => {
info!("Diag reader thread exiting...");
if let Some(analysis_writer) = maybe_analysis_writer {
analysis_writer.close().await.expect("failed to close analysis writer");
}
return Ok(())
},
}
Expand All @@ -52,17 +130,26 @@ pub fn run_diag_read_thread(task_tracker: &TaskTracker, mut dev: DiagDevice, mut
}
// keep track of how many bytes were written to the QMDL file so we can read
// a valid block of data from it in the HTTP server
if let Some(writer) = qmdl_writer.as_mut() {
writer.write_container(&container).await.expect("failed to write to QMDL writer");
debug!("total QMDL bytes written: {}, updating manifest...", writer.total_written);
if let Some(qmdl_writer) = maybe_qmdl_writer.as_mut() {
qmdl_writer.write_container(&container).await.expect("failed to write to QMDL writer");
debug!("total QMDL bytes written: {}, updating manifest...", qmdl_writer.total_written);
let mut qmdl_store = qmdl_store_lock.write().await;
let index = qmdl_store.current_entry.expect("DiagDevice had qmdl_writer, but QmdlStore didn't have current entry???");
qmdl_store.update_entry(index, writer.total_written).await
qmdl_store.update_entry_qmdl_size(index, qmdl_writer.total_written).await
.expect("failed to update qmdl file size");
debug!("done!");
} else {
debug!("no qmdl_writer set, continuing...");
}

if let Some(analysis_writer) = maybe_analysis_writer.as_mut() {
let analysis_file_len = analysis_writer.analyze(container).await
.expect("failed to analyze container");
let mut qmdl_store = qmdl_store_lock.write().await;
let index = qmdl_store.current_entry.expect("DiagDevice had qmdl_writer, but QmdlStore didn't have current entry???");
qmdl_store.update_entry_analysis_size(index, analysis_file_len as usize).await
.expect("failed to update analysis file size");
}
},
Err(err) => {
error!("error reading diag device: {}", err);
Expand All @@ -80,10 +167,10 @@ pub async fn start_recording(State(state): State<Arc<ServerState>>) -> Result<(S
return Err((StatusCode::FORBIDDEN, "server is in readonly mode".to_string()));
}
let mut qmdl_store = state.qmdl_store_lock.write().await;
let qmdl_file = qmdl_store.new_entry().await
let (qmdl_file, analysis_file) = qmdl_store.new_entry().await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("couldn't create new qmdl entry: {}", e)))?;
let qmdl_writer = QmdlWriter::new(qmdl_file);
state.diag_device_ctrl_sender.send(DiagDeviceCtrlMessage::StartRecording(qmdl_writer)).await
state.diag_device_ctrl_sender.send(DiagDeviceCtrlMessage::StartRecording((qmdl_writer, analysis_file))).await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("couldn't send stop recording message: {}", e)))?;
Ok((StatusCode::ACCEPTED, "ok".to_string()))
}
Expand All @@ -99,3 +186,20 @@ pub async fn stop_recording(State(state): State<Arc<ServerState>>) -> Result<(St
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("couldn't send stop recording message: {}", e)))?;
Ok((StatusCode::ACCEPTED, "ok".to_string()))
}

pub async fn get_analysis_report(State(state): State<Arc<ServerState>>) -> Result<Response, (StatusCode, String)> {
let qmdl_store = state.qmdl_store_lock.read().await;
let Some(entry) = qmdl_store.get_current_entry() else {
return Err((
StatusCode::SERVICE_UNAVAILABLE,
"No QMDL data's being recorded to analyze, try starting a new recording!".to_string()
));
};
let analysis_file = qmdl_store.open_entry_analysis(entry).await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("{:?}", e)))?;
let analysis_stream = ReaderStream::new(analysis_file);

let headers = [(CONTENT_TYPE, "application/x-ndjson")];
let body = Body::from_stream(analysis_stream);
Ok((headers, body).into_response())
}
4 changes: 2 additions & 2 deletions bin/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use thiserror::Error;
use rayhunter::diag_device::DiagDeviceError;

use crate::qmdl_store::QmdlStoreError;
use crate::qmdl_store::RecordingStoreError;

#[derive(Error, Debug)]
pub enum RayhunterError{
Expand All @@ -12,7 +12,7 @@ pub enum RayhunterError{
#[error("Tokio error: {0}")]
TokioError(#[from] tokio::io::Error),
#[error("QmdlStore error: {0}")]
QmdlStoreError(#[from] QmdlStoreError),
QmdlStoreError(#[from] RecordingStoreError),
#[error("No QMDL store found at path {0}, but can't create a new one due to readonly mode")]
NoStoreReadonlyMode(String),
}
Loading

0 comments on commit f261940

Please sign in to comment.