diff --git a/rust-arroyo/src/processing/strategies/reduce.rs b/rust-arroyo/src/processing/strategies/reduce.rs
index 1ce00125..e9d8fe64 100644
--- a/rust-arroyo/src/processing/strategies/reduce.rs
+++ b/rust-arroyo/src/processing/strategies/reduce.rs
@@ -60,6 +60,7 @@ pub struct Reduce {
     message_carried_over: Option>,
     commit_request_carried_over: Option,
     compute_batch_size: fn(&T) -> usize,
+    flush_empty_batches: bool,
 }
 
 impl ProcessingStrategy for Reduce {
@@ -146,9 +147,18 @@ impl Reduce {
             message_carried_over: None,
             commit_request_carried_over: None,
             compute_batch_size,
+            flush_empty_batches: false,
         }
     }
 
+    /// Sets whether `flush` should still proceed when the current batch's
+    /// `message_count` is zero. The flag defaults to `false`, in which case
+    /// `flush` returns early for such batches.
+    pub fn flush_empty_batches(mut self, yes: bool) -> Self {
+        self.flush_empty_batches = yes;
+        self
+    }
+
     fn flush(&mut self, force: bool) -> Result<(), InvalidMessage> {
         // Try re-submitting the carried over message if there is one
         if let Some(message) = self.message_carried_over.take() {
@@ -165,7 +175,7 @@ impl Reduce {
             }
         }
 
-        if self.batch_state.message_count == 0 {
+        if self.batch_state.message_count == 0 && !self.flush_empty_batches {
             return Ok(());
         }
 
@@ -406,4 +416,63 @@ mod tests {
         // no batches were created
         assert!(submitted_messages_clone.lock().unwrap().is_empty());
     }
+
+    #[test]
+    fn test_reduce_with_zero_batch_size_flush() {
+        let submitted_messages = Arc::new(Mutex::new(Vec::new()));
+        let submitted_messages_clone = submitted_messages.clone();
+
+        let partition1 = Partition::new(Topic::new("test"), 0);
+
+        let max_batch_size = 1;
+        let max_batch_time = Duration::from_secs(100);
+
+        let initial_value = Vec::new();
+        let accumulator = Arc::new(|mut acc: Vec<u64>, value: u64| {
+            acc.push(value);
+            acc
+        });
+        let compute_batch_size = |_: &_| -> usize { 0 };
+
+        let next_step = NextStep {
+            submitted: submitted_messages,
+        };
+
+        let mut strategy = Reduce::new(
+            next_step,
+            accumulator,
+            initial_value,
+            max_batch_size,
+            max_batch_time,
+            compute_batch_size,
+        )
+        .flush_empty_batches(true);
+
+        for i in 0..3 {
+            let msg = Message {
+                inner_message: InnerMessage::BrokerMessage(BrokerMessage::new(
+                    i,
+                    partition1,
+                    i,
+                    chrono::Utc::now(),
+                )),
+            };
+            strategy.submit(msg).unwrap();
+            let _ = strategy.poll();
+        }
+
+        // since all submitted values had length 0, do not forward any messages to the next step
+        // until timeout (which will not happen as part of this test)
+        assert_eq!(strategy.batch_state.message_count, 0);
+
+        strategy.close();
+        let _ = strategy.join(None);
+
+        // "empty" batch was created -- flushed even though the batch size callback claims it is of
+        // size zero
+        assert_eq!(
+            *submitted_messages_clone.lock().unwrap(),
+            vec![vec![0, 1, 2]]
+        );
+    }
 }