Skip to content

Commit

Permalink
Use impl Future instead of async-trait (#20)
Browse files Browse the repository at this point in the history
* refactor: use impl future instead of async-trait

* chore(deps): remove asyn-trait
  • Loading branch information
negezor authored Apr 13, 2024
1 parent ba703b7 commit f0023d3
Show file tree
Hide file tree
Showing 20 changed files with 50 additions and 82 deletions.
6 changes: 1 addition & 5 deletions sea-streamer-file/src/consumer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,7 @@ pub use future::StreamFuture as FileMessageStream;

use flume::{r#async::RecvFut, Receiver, Sender, TrySendError};
use sea_streamer_types::{
export::{
async_trait,
futures::{Future, FutureExt},
},
export::futures::{Future, FutureExt},
Consumer as ConsumerTrait, SeqPos, ShardId, SharedMessage, StreamErr, StreamKey, Timestamp,
};

Expand Down Expand Up @@ -66,7 +63,6 @@ impl Drop for FileConsumer {
}
}

#[async_trait]
impl ConsumerTrait for FileConsumer {
type Error = FileErr;
type Message<'a> = SharedMessage;
Expand Down
6 changes: 2 additions & 4 deletions sea-streamer-file/src/producer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,8 @@ use std::{fmt::Debug, future::Future};

use crate::{Bytes, FileErr, FileId, FileResult};
use sea_streamer_types::{
export::{async_trait, futures::FutureExt},
Buffer, MessageHeader, Producer as ProducerTrait, ShardId, StreamErr, StreamKey, StreamResult,
Timestamp,
export::futures::FutureExt, Buffer, MessageHeader, Producer as ProducerTrait, ShardId,
StreamErr, StreamKey, StreamResult, Timestamp,
};

pub(crate) use backend::{end_producer, new_producer};
Expand Down Expand Up @@ -75,7 +74,6 @@ impl Debug for SendFuture {
}
}

#[async_trait]
impl ProducerTrait for FileProducer {
type Error = FileErr;
type SendFuture = SendFuture;
Expand Down
3 changes: 1 addition & 2 deletions sea-streamer-file/src/streamer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::{
DEFAULT_PREFETCH_MESSAGE,
};
use sea_streamer_types::{
export::async_trait, ConnectOptions as ConnectOptionsTrait, ConsumerGroup, ConsumerMode,
ConnectOptions as ConnectOptionsTrait, ConsumerGroup, ConsumerMode,
ConsumerOptions as ConsumerOptionsTrait, ProducerOptions as ProducerOptionsTrait, StreamErr,
StreamKey, StreamUrlErr, Streamer as StreamerTrait, StreamerUri,
};
Expand Down Expand Up @@ -74,7 +74,6 @@ pub enum ConfigErr {
InvalidBeaconInterval,
}

#[async_trait]
impl StreamerTrait for FileStreamer {
type Error = FileErr;
type Producer = FileProducer;
Expand Down
12 changes: 4 additions & 8 deletions sea-streamer-kafka/src/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,10 @@ use sea_streamer_runtime::spawn_blocking;
use std::{collections::HashSet, fmt::Debug, time::Duration};

use sea_streamer_types::{
export::{
async_trait,
futures::{
future::Map,
stream::{Map as StreamMap, StreamFuture},
FutureExt, StreamExt,
},
export::futures::{
future::Map,
stream::{Map as StreamMap, StreamFuture},
FutureExt, StreamExt,
},
runtime_error, Consumer as ConsumerTrait, ConsumerGroup, ConsumerMode, ConsumerOptions,
Message, Payload, SeqNo, SeqPos, ShardId, StreamErr, StreamKey, StreamerUri, Timestamp,
Expand Down Expand Up @@ -251,7 +248,6 @@ impl std::fmt::Debug for KafkaConsumer {
}
}

#[async_trait]
impl ConsumerTrait for KafkaConsumer {
type Error = KafkaErr;
type Message<'a> = KafkaMessage<'a>;
Expand Down
6 changes: 2 additions & 4 deletions sea-streamer-kafka/src/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,8 @@ use rdkafka::{
pub use rdkafka::{consumer::ConsumerGroupMetadata, producer::FutureRecord, TopicPartitionList};
use sea_streamer_runtime::spawn_blocking;
use sea_streamer_types::{
export::{async_trait, futures::FutureExt},
runtime_error, Buffer, MessageHeader, Producer, ProducerOptions, ShardId, StreamErr, StreamKey,
StreamResult, StreamerUri, Timestamp,
export::futures::FutureExt, runtime_error, Buffer, MessageHeader, Producer, ProducerOptions,
ShardId, StreamErr, StreamKey, StreamResult, StreamerUri, Timestamp,
};

#[derive(Clone)]
Expand Down Expand Up @@ -80,7 +79,6 @@ impl Default for CompressionType {
}
}

#[async_trait]
impl Producer for KafkaProducer {
type Error = KafkaErr;
type SendFuture = SendFuture;
Expand Down
5 changes: 2 additions & 3 deletions sea-streamer-kafka/src/streamer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ use std::{

use sea_streamer_runtime::spawn_blocking;
use sea_streamer_types::{
export::async_trait, runtime_error, ConnectOptions, ConsumerGroup, ConsumerMode,
ConsumerOptions, StreamErr, StreamKey, Streamer, StreamerUri,
runtime_error, ConnectOptions, ConsumerGroup, ConsumerMode, ConsumerOptions, StreamErr,
StreamKey, Streamer, StreamerUri,
};

use crate::{
Expand Down Expand Up @@ -196,7 +196,6 @@ impl_into_string!(BaseOptionKey);
impl_into_string!(SecurityProtocol);
impl_into_string!(SaslMechanism);

#[async_trait]
impl Streamer for KafkaStreamer {
type Error = KafkaErr;
type Producer = KafkaProducer;
Expand Down
8 changes: 3 additions & 5 deletions sea-streamer-redis/src/consumer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,9 @@ use crate::{
};
use sea_streamer_runtime::{spawn_task, timeout};
use sea_streamer_types::{
export::{async_trait, futures::FutureExt},
Buffer, ConnectOptions, Consumer, ConsumerGroup, ConsumerId, ConsumerMode, ConsumerOptions,
Message, MessageHeader, SeqNo, SeqPos, ShardId, SharedMessage, StreamErr, StreamKey, Timestamp,
SEA_STREAMER_INTERNAL,
export::futures::FutureExt, Buffer, ConnectOptions, Consumer, ConsumerGroup, ConsumerId,
ConsumerMode, ConsumerOptions, Message, MessageHeader, SeqNo, SeqPos, ShardId, SharedMessage,
StreamErr, StreamKey, Timestamp, SEA_STREAMER_INTERNAL,
};

#[derive(Debug)]
Expand Down Expand Up @@ -77,7 +76,6 @@ pub mod constants {
pub const HEARTBEAT: Duration = Duration::from_secs(10);
}

#[async_trait]
impl Consumer for RedisConsumer {
type Error = RedisErr;
type Message<'a> = SharedMessage;
Expand Down
6 changes: 2 additions & 4 deletions sea-streamer-redis/src/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,8 @@ use crate::{
};
use sea_streamer_runtime::{sleep, spawn_task};
use sea_streamer_types::{
export::{async_trait, futures::FutureExt},
Buffer, MessageHeader, Producer, ProducerOptions, ShardId, StreamErr, StreamKey, Timestamp,
SEA_STREAMER_INTERNAL,
export::futures::FutureExt, Buffer, MessageHeader, Producer, ProducerOptions, ShardId,
StreamErr, StreamKey, Timestamp, SEA_STREAMER_INTERNAL,
};

const MAX_RETRY: usize = 100;
Expand Down Expand Up @@ -87,7 +86,6 @@ pub struct RoundRobinSharder {
state: u32,
}

#[async_trait]
impl Producer for RedisProducer {
type Error = RedisErr;
type SendFuture = SendFuture;
Expand Down
3 changes: 1 addition & 2 deletions sea-streamer-redis/src/streamer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::{
RedisProducer, RedisProducerOptions, RedisResult, REDIS_PORT,
};
use sea_streamer_types::{
export::async_trait, ConnectOptions, StreamErr, StreamKey, StreamUrlErr, Streamer, StreamerUri,
ConnectOptions, StreamErr, StreamKey, StreamUrlErr, Streamer, StreamerUri,
};

#[derive(Debug, Clone)]
Expand All @@ -26,7 +26,6 @@ pub struct RedisConnectOptions {
disable_hostname_verification: bool,
}

#[async_trait]
impl Streamer for RedisStreamer {
type Error = RedisErr;
type Producer = RedisProducer;
Expand Down
6 changes: 1 addition & 5 deletions sea-streamer-socket/src/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,7 @@ use sea_streamer_stdio::StdioConsumer;

use crate::{map_err, Backend, BackendErr, SeaMessage, SeaResult, SeaStreamerBackend};
use sea_streamer_types::{
export::{
async_trait,
futures::{FutureExt, Stream},
},
export::futures::{FutureExt, Stream},
Consumer, SeqPos, ShardId, StreamKey, StreamResult, Timestamp,
};
use std::{fmt::Debug, future::Future, pin::Pin, task::Poll};
Expand Down Expand Up @@ -201,7 +198,6 @@ impl SeaStreamerBackend for SeaConsumer {
}
}

#[async_trait]
impl Consumer for SeaConsumer {
type Error = BackendErr;
type Message<'a> = SeaMessage<'a>;
Expand Down
4 changes: 1 addition & 3 deletions sea-streamer-socket/src/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@ use sea_streamer_stdio::StdioProducer;

use crate::{map_err, Backend, BackendErr, SeaResult, SeaStreamerBackend};
use sea_streamer_types::{
export::{async_trait, futures::FutureExt},
Buffer, Producer, Receipt, StreamKey, StreamResult,
export::futures::FutureExt, Buffer, Producer, Receipt, StreamKey, StreamResult,
};
use std::{future::Future, pin::Pin, task::Poll};

Expand Down Expand Up @@ -157,7 +156,6 @@ pub enum SendFuture {
File(sea_streamer_file::SendFuture),
}

#[async_trait]
impl Producer for SeaProducer {
type Error = BackendErr;

Expand Down
3 changes: 1 addition & 2 deletions sea-streamer-socket/src/streamer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use sea_streamer_redis::RedisStreamer;
#[cfg(feature = "backend-stdio")]
use sea_streamer_stdio::StdioStreamer;

use sea_streamer_types::{export::async_trait, StreamErr, StreamKey, Streamer, StreamerUri};
use sea_streamer_types::{StreamErr, StreamKey, Streamer, StreamerUri};

use crate::{
map_err, Backend, BackendErr, SeaConnectOptions, SeaConsumer, SeaConsumerBackend,
Expand Down Expand Up @@ -145,7 +145,6 @@ impl SeaStreamerBackend for SeaStreamer {
}
}

#[async_trait]
impl Streamer for SeaStreamer {
type Error = BackendErr;
type Producer = SeaProducer;
Expand Down
6 changes: 1 addition & 5 deletions sea-streamer-stdio/src/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,7 @@ use flume::{
use std::sync::Mutex;

use sea_streamer_types::{
export::{
async_trait,
futures::{future::MapErr, stream::Map as StreamMap, StreamExt, TryFutureExt},
},
export::futures::{future::MapErr, stream::Map as StreamMap, StreamExt, TryFutureExt},
Consumer as ConsumerTrait, ConsumerGroup, SeqPos, ShardId, SharedMessage, StreamErr, StreamKey,
Timestamp,
};
Expand Down Expand Up @@ -118,7 +115,6 @@ impl Drop for StdioConsumer {
}
}

#[async_trait]
impl ConsumerTrait for StdioConsumer {
type Error = StdioErr;
type Message<'a> = SharedMessage;
Expand Down
6 changes: 2 additions & 4 deletions sea-streamer-stdio/src/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,8 @@ use flume::{bounded, r#async::RecvFut, unbounded, Sender};
use std::{collections::HashMap, fmt::Debug, future::Future, sync::Mutex};

use sea_streamer_types::{
export::{async_trait, futures::FutureExt},
Buffer, Message, MessageHeader, Producer as ProducerTrait, Receipt, SeqNo, ShardId,
SharedMessage, StreamErr, StreamKey, StreamResult, Timestamp,
export::futures::FutureExt, Buffer, Message, MessageHeader, Producer as ProducerTrait, Receipt,
SeqNo, ShardId, SharedMessage, StreamErr, StreamKey, StreamResult, Timestamp,
};

use crate::{PartialHeader, StdioErr, StdioResult, BROADCAST, TIMESTAMP_FORMAT};
Expand Down Expand Up @@ -174,7 +173,6 @@ impl Debug for SendFuture {
}
}

#[async_trait]
impl ProducerTrait for StdioProducer {
type Error = StdioErr;
type SendFuture = SendFuture;
Expand Down
3 changes: 1 addition & 2 deletions sea-streamer-stdio/src/streamer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::{
consumer, create_consumer, producer, StdioConsumer, StdioErr, StdioProducer, StdioResult,
};
use sea_streamer_types::{
export::async_trait, ConnectOptions as ConnectOptionsTrait, ConsumerGroup, ConsumerMode,
ConnectOptions as ConnectOptionsTrait, ConsumerGroup, ConsumerMode,
ConsumerOptions as ConsumerOptionsTrait, ProducerOptions as ProducerOptionsTrait, StreamErr,
StreamKey, Streamer as StreamerTrait, StreamerUri,
};
Expand All @@ -28,7 +28,6 @@ pub struct StdioConsumerOptions {
#[derive(Debug, Default, Clone)]
pub struct StdioProducerOptions {}

#[async_trait]
impl StreamerTrait for StdioStreamer {
type Error = StdioErr;
type Producer = StdioProducer;
Expand Down
1 change: 0 additions & 1 deletion sea-streamer-types/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ all-features = true
rustdoc-args = ["--cfg", "docsrs"]

[dependencies]
async-trait = { version = "0.1" }
futures = { version = "0.3", default-features = false, features = ["std", "alloc", "async-await"] }
thiserror = { version = "1", default-features = false }
time = { version = "0.3", default-features = false, features = ["std", "macros", "formatting"] }
Expand Down
10 changes: 6 additions & 4 deletions sea-streamer-types/src/consumer.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use crate::{Message, SeqPos, ShardId, StreamKey, StreamResult, Timestamp};
use async_trait::async_trait;
use futures::{Future, Stream};

#[derive(Debug, Copy, Clone, PartialEq, Eq)]
Expand Down Expand Up @@ -50,7 +49,6 @@ pub trait ConsumerOptions: Default + Clone + Send {
) -> StreamResult<&mut Self, Self::Error>;
}

#[async_trait]
/// Common interface of consumers, to be implemented by all backends.
pub trait Consumer: Sized + Send + Sync {
type Error: std::error::Error;
Expand All @@ -69,12 +67,16 @@ pub trait Consumer: Sized + Send + Sync {
/// with a timestamp later than `to`.
///
/// If the consumer is not already assigned, shard ZERO will be used.
async fn seek(&mut self, to: Timestamp) -> StreamResult<(), Self::Error>;
fn seek(&mut self, to: Timestamp)
-> impl Future<Output = StreamResult<(), Self::Error>> + Send;

/// Rewind all streams to a particular sequence number.
///
/// If the consumer is not already assigned, shard ZERO will be used.
async fn rewind(&mut self, offset: SeqPos) -> StreamResult<(), Self::Error>;
fn rewind(
&mut self,
offset: SeqPos,
) -> impl Future<Output = StreamResult<(), Self::Error>> + Send;

/// Assign this consumer to a particular shard. Can be called multiple times to assign
/// to multiple shards. Returns error `StreamKeyNotFound` if the stream is not currently subscribed.
Expand Down
1 change: 0 additions & 1 deletion sea-streamer-types/src/export.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
pub use async_trait::async_trait;
pub use futures;
pub use time;
pub use url;
6 changes: 2 additions & 4 deletions sea-streamer-types/src/producer.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use async_trait::async_trait;
use futures::Future;

use crate::{Buffer, MessageHeader, StreamKey, StreamResult};
Expand All @@ -9,7 +8,6 @@ pub trait ProducerOptions: Default + Clone + Send {}
/// Delivery receipt.
pub type Receipt = MessageHeader;

#[async_trait]
/// Common interface of producers, to be implemented by all backends.
pub trait Producer: Clone + Send + Sync {
type Error: std::error::Error;
Expand All @@ -32,10 +30,10 @@ pub trait Producer: Clone + Send + Sync {
}

/// End this producer, only after flushing all it's pending messages.
async fn end(self) -> StreamResult<(), Self::Error>;
fn end(self) -> impl Future<Output = StreamResult<(), Self::Error>> + Send;

/// Flush all pending messages.
async fn flush(&mut self) -> StreamResult<(), Self::Error>;
fn flush(&mut self) -> impl Future<Output = StreamResult<(), Self::Error>> + Send;

/// Lock this producer to a particular stream. This function can only be called once.
/// Subsequent calls should return `StreamErr::AlreadyAnchored` error.
Expand Down
Loading

0 comments on commit f0023d3

Please sign in to comment.