From 9925183a2350f65dafb8caead1c7e8019cd35e6b Mon Sep 17 00:00:00 2001 From: aiooss-anssi Date: Tue, 6 Aug 2024 13:35:15 +0200 Subject: [PATCH] suricata-eve-sqlite-output: fix dropped events --- .../src/database.rs | 32 ++++++++++++++----- 1 file changed, 24 insertions(+), 8 deletions(-) diff --git a/suricata/suricata-eve-sqlite-output/src/database.rs b/suricata/suricata-eve-sqlite-output/src/database.rs index 20f562f..b6bdd32 100644 --- a/suricata/suricata-eve-sqlite-output/src/database.rs +++ b/suricata/suricata-eve-sqlite-output/src/database.rs @@ -103,6 +103,7 @@ pub struct Database { conn: rusqlite::Connection, rx: std::sync::mpsc::Receiver, count: usize, + count_inserted: usize, } impl Database { @@ -118,21 +119,32 @@ impl Database { .expect("Failed to set synchronous=off"); conn.execute_batch(include_str!("schema.sql")) .expect("Failed to initialize database schema"); - Ok(Self { conn, rx, count: 0 }) + Ok(Self { + conn, + rx, + count: 0, + count_inserted: 0, + }) } fn batch_write_events(&mut self) -> Result<(), rusqlite::Error> { - while self.rx.iter().peekable().peek().is_some() { + while let Ok(buf) = self.rx.recv() { let transaction = self.conn.transaction()?; - let n_insert: usize = self + + // Insert first event + self.count += 1; + self.count_inserted += write_event(&transaction, &buf)?; + + // Insert remaining events + let batch = self .rx .try_iter() .map(|buf| write_event(&transaction, &buf)) - .collect::, _>>()? - .iter() - .sum(); + .collect::, _>>()?; + self.count += batch.len(); + self.count_inserted += batch.iter().sum::(); + transaction.commit()?; - self.count += n_insert; } Ok(()) } @@ -142,6 +154,10 @@ impl Database { if let Err(err) = self.batch_write_events() { log::error!("Failed to write batch of events: {err:?}"); } - log::info!("Database thread finished: count={}", self.count); + log::info!( + "Database thread finished: count={} inserted={}", + self.count, + self.count_inserted + ); } }