diff --git a/rust-arroyo/examples/transform_and_produce.rs b/rust-arroyo/examples/transform_and_produce.rs index a44f188b..cbdfc2e0 100644 --- a/rust-arroyo/examples/transform_and_produce.rs +++ b/rust-arroyo/examples/transform_and_produce.rs @@ -14,12 +14,14 @@ use rust_arroyo::processing::strategies::produce::Produce; use rust_arroyo::processing::strategies::run_task::RunTask; use rust_arroyo::processing::strategies::run_task_in_threads::ConcurrencyConfig; use rust_arroyo::processing::strategies::{ - InvalidMessage, ProcessingStrategy, ProcessingStrategyFactory, + ProcessingStrategy, ProcessingStrategyFactory, SubmitError, }; use rust_arroyo::processing::StreamProcessor; use rust_arroyo::types::{Message, Topic, TopicOrPartition}; -fn reverse_string(message: Message) -> Result, InvalidMessage> { +fn reverse_string( + message: Message, +) -> Result, SubmitError> { let value = message.payload(); let payload = value.payload().unwrap(); let str_payload = std::str::from_utf8(payload).unwrap(); diff --git a/rust-arroyo/src/processing/strategies/commit_offsets.rs b/rust-arroyo/src/processing/strategies/commit_offsets.rs index 398fbfa9..65f5e882 100644 --- a/rust-arroyo/src/processing/strategies/commit_offsets.rs +++ b/rust-arroyo/src/processing/strategies/commit_offsets.rs @@ -95,7 +95,6 @@ mod tests { #[test] fn test_commit_offsets() { tracing_subscriber::fmt().with_test_writer().init(); - let updater = coarsetime::Updater::new(10).start().unwrap(); let partition1 = Partition::new(Topic::new("noop-commit"), 0); let partition2 = Partition::new(Topic::new("noop-commit"), 1); diff --git a/rust-arroyo/src/processing/strategies/run_task.rs b/rust-arroyo/src/processing/strategies/run_task.rs index 33d798ed..d97c0d8b 100644 --- a/rust-arroyo/src/processing/strategies/run_task.rs +++ b/rust-arroyo/src/processing/strategies/run_task.rs @@ -1,6 +1,6 @@ use crate::processing::strategies::{ - merge_commit_request, CommitRequest, InvalidMessage, MessageRejected, ProcessingStrategy, - StrategyError, SubmitError, + merge_commit_request, CommitRequest, MessageRejected, ProcessingStrategy, StrategyError, + SubmitError, }; use crate::types::Message; use std::time::Duration; @@ -26,7 +26,7 @@ impl RunTask { impl ProcessingStrategy for RunTask where TTransformed: Send + Sync, - F: FnMut(Message) -> Result, InvalidMessage> + F: FnMut(Message) -> Result, SubmitError> + Send + Sync + 'static, @@ -63,7 +63,7 @@ where return Err(SubmitError::MessageRejected(MessageRejected { message })); } - let next_message = (self.function)(message).map_err(SubmitError::InvalidMessage)?; + let next_message = (self.function)(message)?; match self.next_step.submit(next_message) { Err(SubmitError::MessageRejected(MessageRejected { @@ -103,7 +103,7 @@ mod tests { #[test] fn test_run_task() { - fn identity(value: Message) -> Result, InvalidMessage> { + fn identity(value: Message) -> Result, SubmitError> { Ok(value) }