Skip to content

Commit

Permalink
fix(rust): Add feature to Reduce to flush empty batches (#332)
Browse files Browse the repository at this point in the history
  • Loading branch information
untitaker authored Jan 19, 2024
1 parent 7f2eed8 commit f90eeff
Showing 1 changed file with 66 additions and 1 deletion.
67 changes: 66 additions & 1 deletion rust-arroyo/src/processing/strategies/reduce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ pub struct Reduce<T, TResult> {
message_carried_over: Option<Message<TResult>>,
commit_request_carried_over: Option<CommitRequest>,
compute_batch_size: fn(&T) -> usize,
flush_empty_batches: bool,
}

impl<T: Send + Sync, TResult: Clone + Send + Sync> ProcessingStrategy<T> for Reduce<T, TResult> {
Expand Down Expand Up @@ -146,9 +147,15 @@ impl<T, TResult: Clone> Reduce<T, TResult> {
message_carried_over: None,
commit_request_carried_over: None,
compute_batch_size,
flush_empty_batches: false,
}
}

/// Configures whether batches whose accumulated size is zero are still flushed.
///
/// A freshly constructed `Reduce` has this disabled, in which case `flush`
/// returns early whenever `batch_state.message_count == 0`. Passing `true`
/// removes that early return, so a batch is handed to the rest of `flush`
/// even when the `compute_batch_size` callback reported a size of zero for
/// every message in it.
///
/// This is a builder-style setter: it consumes the strategy and returns the
/// reconfigured value, so the result must be used.
#[must_use = "this consumes the strategy and returns the reconfigured value"]
pub fn flush_empty_batches(mut self, yes: bool) -> Self {
    self.flush_empty_batches = yes;
    self
}

fn flush(&mut self, force: bool) -> Result<(), InvalidMessage> {
// Try re-submitting the carried over message if there is one
if let Some(message) = self.message_carried_over.take() {
Expand All @@ -165,7 +172,7 @@ impl<T, TResult: Clone> Reduce<T, TResult> {
}
}

if self.batch_state.message_count == 0 {
if self.batch_state.message_count == 0 && !self.flush_empty_batches {
return Ok(());
}

Expand Down Expand Up @@ -406,4 +413,62 @@ mod tests {
// no batches were created
assert!(submitted_messages_clone.lock().unwrap().is_empty());
}

/// With `flush_empty_batches(true)`, a batch whose computed size is zero must
/// still be forwarded to the next step once the strategy is closed and joined.
#[test]
fn test_reduce_with_zero_batch_size_flush() {
    let submitted_messages = Arc::new(Mutex::new(Vec::new()));
    let submitted_messages_clone = submitted_messages.clone();

    let partition1 = Partition::new(Topic::new("test"), 0);

    let max_batch_size = 1;
    // Large enough that the time-based flush cannot trigger during the test.
    let max_batch_time = Duration::from_secs(100);

    let initial_value = Vec::new();
    let accumulator = Arc::new(|mut acc: Vec<u64>, value: u64| {
        acc.push(value);
        acc
    });
    // Every message reports a size of zero, so the batch never counts as
    // non-empty from the point of view of `message_count`.
    let compute_batch_size = |_: &_| -> usize { 0 };

    let next_step = NextStep {
        submitted: submitted_messages,
    };

    let mut strategy = Reduce::new(
        next_step,
        accumulator,
        initial_value,
        max_batch_size,
        max_batch_time,
        compute_batch_size,
    )
    .flush_empty_batches(true);

    for i in 0..3 {
        let msg = Message {
            inner_message: InnerMessage::BrokerMessage(BrokerMessage::new(
                i,
                partition1,
                i,
                chrono::Utc::now(),
            )),
        };
        strategy.submit(msg).unwrap();
        let _ = strategy.poll();
    }

    // Since every submitted value was reported as size 0, the counted batch
    // size is still zero after all submissions.
    assert_eq!(strategy.batch_state.message_count, 0);

    strategy.close();
    let _ = strategy.join(None);

    // The "empty" batch was flushed anyway: it is forwarded even though the
    // batch size callback claims it is of size zero.
    assert_eq!(
        *submitted_messages_clone.lock().unwrap(),
        vec![vec![0, 1, 2]]
    );
}
}

0 comments on commit f90eeff

Please sign in to comment.