diff --git a/src/reports_updater.rs b/src/reports_updater.rs index 96ee756..9bebde2 100644 --- a/src/reports_updater.rs +++ b/src/reports_updater.rs @@ -24,7 +24,7 @@ use std::{ time::Instant, }; use tcn::{SignedReport, TemporaryContactNumber}; -use timer::Timer; +use timer::{Guard, Timer}; pub trait TcnMatcher { fn match_reports( @@ -217,7 +217,12 @@ where T: 'static + TcnDao, { tcn_batches_manager: Arc>>, - timer: Arc>, + _timer_data: TimerData +} + +struct TimerData { + _timer: Arc>, + _guard: Guard } impl ObservedTcnProcessorImpl @@ -225,27 +230,28 @@ where T: 'static + TcnDao, { pub fn new(tcn_batches_manager: TcnBatchesManager) -> ObservedTcnProcessorImpl { + let tcn_batches_manager = Arc::new(Mutex::new(tcn_batches_manager)); let instance = ObservedTcnProcessorImpl { - tcn_batches_manager: Arc::new(Mutex::new(tcn_batches_manager)), - timer: Arc::new(Mutex::new(Timer::new())), + tcn_batches_manager: tcn_batches_manager.clone(), + _timer_data: Self::schedule_process_batches(tcn_batches_manager) + }; - instance.schedule_process_batches(); instance } - fn schedule_process_batches(&self) { - let timer_mutex = self.timer.clone(); - let tcn_batches_manager = self.tcn_batches_manager.clone(); - thread::spawn(move || { - let timer = timer_mutex.lock().unwrap(); - timer.schedule_repeating(chrono::Duration::seconds(10), move || { + fn schedule_process_batches(tcn_batches_manager: Arc>>) -> TimerData { + let timer = Arc::new(Mutex::new(Timer::new())); + TimerData { + _timer: timer.clone(), + _guard: timer.clone().lock().unwrap().schedule_repeating(chrono::Duration::seconds(1), move || { debug!("Flushing TCN batches into database"); let tcn_batches_manager_res = tcn_batches_manager.lock(); let tcn_batches_manager = expect_log!(tcn_batches_manager_res, "error"); let flush_res = tcn_batches_manager.flush(); expect_log!(flush_res, "Couldn't flush TCNs"); }) - }); + + } } } @@ -372,6 +378,8 @@ impl TcnDao for TcnDaoImpl { // Overwrites if already exists fn save_batch(&self, observed_tcns: Vec) -> Result<(), ServicesError> { + debug!("Saving TCN batch: {:?}", observed_tcns); + self.db.transaction(|t| { for tcn in observed_tcns { let tcn_str = hex::encode(tcn.tcn.0);