Skip to content

Commit

Permalink
change return type of runtask
Browse files Browse the repository at this point in the history
  • Loading branch information
untitaker committed Oct 17, 2024
1 parent c475f0c commit 45eef29
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 8 deletions.
6 changes: 4 additions & 2 deletions rust-arroyo/examples/transform_and_produce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,14 @@ use rust_arroyo::processing::strategies::produce::Produce;
use rust_arroyo::processing::strategies::run_task::RunTask;
use rust_arroyo::processing::strategies::run_task_in_threads::ConcurrencyConfig;
use rust_arroyo::processing::strategies::{
InvalidMessage, ProcessingStrategy, ProcessingStrategyFactory,
ProcessingStrategy, ProcessingStrategyFactory, SubmitError,
};
use rust_arroyo::processing::StreamProcessor;
use rust_arroyo::types::{Message, Topic, TopicOrPartition};

fn reverse_string(message: Message<KafkaPayload>) -> Result<Message<KafkaPayload>, InvalidMessage> {
fn reverse_string(
message: Message<KafkaPayload>,
) -> Result<Message<KafkaPayload>, SubmitError<KafkaPayload>> {
let value = message.payload();
let payload = value.payload().unwrap();
let str_payload = std::str::from_utf8(payload).unwrap();
Expand Down
1 change: 0 additions & 1 deletion rust-arroyo/src/processing/strategies/commit_offsets.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,6 @@ mod tests {
#[test]
fn test_commit_offsets() {
tracing_subscriber::fmt().with_test_writer().init();
let updater = coarsetime::Updater::new(10).start().unwrap();

let partition1 = Partition::new(Topic::new("noop-commit"), 0);
let partition2 = Partition::new(Topic::new("noop-commit"), 1);
Expand Down
10 changes: 5 additions & 5 deletions rust-arroyo/src/processing/strategies/run_task.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::processing::strategies::{
merge_commit_request, CommitRequest, InvalidMessage, MessageRejected, ProcessingStrategy,
StrategyError, SubmitError,
merge_commit_request, CommitRequest, MessageRejected, ProcessingStrategy, StrategyError,
SubmitError,
};
use crate::types::Message;
use std::time::Duration;
Expand All @@ -26,7 +26,7 @@ impl<TTransformed, F, N> RunTask<TTransformed, F, N> {
impl<TPayload, TTransformed, F, N> ProcessingStrategy<TPayload> for RunTask<TTransformed, F, N>
where
TTransformed: Send + Sync,
F: FnMut(Message<TPayload>) -> Result<Message<TTransformed>, InvalidMessage>
F: FnMut(Message<TPayload>) -> Result<Message<TTransformed>, SubmitError<TPayload>>
+ Send
+ Sync
+ 'static,
Expand Down Expand Up @@ -63,7 +63,7 @@ where
return Err(SubmitError::MessageRejected(MessageRejected { message }));
}

let next_message = (self.function)(message).map_err(SubmitError::InvalidMessage)?;
let next_message = (self.function)(message)?;

match self.next_step.submit(next_message) {
Err(SubmitError::MessageRejected(MessageRejected {
Expand Down Expand Up @@ -103,7 +103,7 @@ mod tests {

#[test]
fn test_run_task() {
fn identity(value: Message<String>) -> Result<Message<String>, InvalidMessage> {
fn identity(value: Message<String>) -> Result<Message<String>, SubmitError<String>> {
Ok(value)
}

Expand Down

0 comments on commit 45eef29

Please sign in to comment.