diff --git a/sea-streamer-file/src/streamer.rs b/sea-streamer-file/src/streamer.rs index 2354db0..aa83e13 100644 --- a/sea-streamer-file/src/streamer.rs +++ b/sea-streamer-file/src/streamer.rs @@ -86,11 +86,7 @@ impl StreamerTrait for FileStreamer { /// First check whether the file exists. /// If not, depending on the options, either create it, or error. - async fn connect(streamer: S, options: Self::ConnectOptions) -> FileResult - where - S: Into + Send, - { - let uri = streamer.into(); + async fn connect(uri: StreamerUri, options: Self::ConnectOptions) -> FileResult { if uri.nodes().is_empty() { return Err(StreamErr::StreamUrlErr(StreamUrlErr::ZeroNode)); } diff --git a/sea-streamer-kafka/src/streamer.rs b/sea-streamer-kafka/src/streamer.rs index 0d5cf75..47317fb 100644 --- a/sea-streamer-kafka/src/streamer.rs +++ b/sea-streamer-kafka/src/streamer.rs @@ -204,11 +204,7 @@ impl Streamer for KafkaStreamer { type ConsumerOptions = KafkaConsumerOptions; type ProducerOptions = KafkaProducerOptions; - async fn connect(streamer: S, options: Self::ConnectOptions) -> KafkaResult - where - S: Into + Send, - { - let uri = streamer.into(); + async fn connect(uri: StreamerUri, options: Self::ConnectOptions) -> KafkaResult { let admin = create_admin(&uri, &options).map_err(StreamErr::Backend)?; let timeout = options.timeout().unwrap_or(Duration::from_secs(1)); spawn_blocking(move || admin.inner().fetch_cluster_id(timeout)) diff --git a/sea-streamer-kafka/tests/consumer.rs b/sea-streamer-kafka/tests/consumer.rs index c46f394..ca8f138 100644 --- a/sea-streamer-kafka/tests/consumer.rs +++ b/sea-streamer-kafka/tests/consumer.rs @@ -9,13 +9,13 @@ async fn main() -> anyhow::Result<()> { use sea_streamer_kafka::{AutoOffsetReset, KafkaConsumer, KafkaConsumerOptions, KafkaStreamer}; use sea_streamer_types::{ export::futures::StreamExt, Buffer, Consumer, ConsumerMode, ConsumerOptions, Message, - Producer, SeqPos, ShardId, StreamKey, Streamer, StreamerUri, Timestamp, + Producer, SeqPos, ShardId, StreamKey, Streamer, Timestamp, }; let streamer = KafkaStreamer::connect( std::env::var("BROKERS_URL") .unwrap_or_else(|_| "localhost:9092".to_owned()) - .parse::() + .parse() .unwrap(), Default::default(), ) diff --git a/sea-streamer-redis/src/streamer.rs b/sea-streamer-redis/src/streamer.rs index 80fd97b..4e7862d 100644 --- a/sea-streamer-redis/src/streamer.rs +++ b/sea-streamer-redis/src/streamer.rs @@ -34,11 +34,7 @@ impl Streamer for RedisStreamer { type ConsumerOptions = RedisConsumerOptions; type ProducerOptions = RedisProducerOptions; - async fn connect(streamer: S, options: Self::ConnectOptions) -> RedisResult - where - S: Into + Send, - { - let uri = streamer.into(); + async fn connect(uri: StreamerUri, options: Self::ConnectOptions) -> RedisResult { if uri.protocol().is_none() { return Err(StreamErr::StreamUrlErr(StreamUrlErr::ProtocolRequired)); } diff --git a/sea-streamer-redis/tests/consumer-group.rs b/sea-streamer-redis/tests/consumer-group.rs index cbb0328..79f8197 100644 --- a/sea-streamer-redis/tests/consumer-group.rs +++ b/sea-streamer-redis/tests/consumer-group.rs @@ -13,8 +13,7 @@ async fn consumer_group() -> anyhow::Result<()> { AutoStreamReset, RedisConnectOptions, RedisConsumerOptions, RedisStreamer, }; use sea_streamer_types::{ - Consumer, ConsumerMode, ConsumerOptions, Producer, StreamKey, Streamer, StreamerUri, - Timestamp, + Consumer, ConsumerMode, ConsumerOptions, Producer, StreamKey, Streamer, Timestamp, }; const TEST: &str = "group-1"; @@ -30,7 +29,7 @@ async fn consumer_group() -> anyhow::Result<()> { let streamer = RedisStreamer::connect( std::env::var("BROKERS_URL") .unwrap_or_else(|_| "redis://localhost".to_owned()) - .parse::() + .parse() .unwrap(), options, ) diff --git a/sea-streamer-redis/tests/load-balanced.rs b/sea-streamer-redis/tests/load-balanced.rs index 1bfa441..42260e5 100644 --- a/sea-streamer-redis/tests/load-balanced.rs +++ b/sea-streamer-redis/tests/load-balanced.rs @@ -23,7 +23,7 @@ async fn load_balance() -> anyhow::Result<()> { use sea_streamer_runtime::{sleep, spawn_task}; use sea_streamer_types::{ export::futures::stream::StreamExt, Buffer, Consumer, ConsumerMode, ConsumerOptions, - Message, Producer, StreamKey, Streamer, StreamerUri, Timestamp, + Message, Producer, StreamKey, Streamer, Timestamp, }; use std::time::Duration; @@ -39,7 +39,7 @@ async fn load_balance() -> anyhow::Result<()> { let streamer = RedisStreamer::connect( std::env::var("BROKERS_URL") .unwrap_or_else(|_| "redis://localhost".to_owned()) - .parse::() + .parse() .unwrap(), options, ) @@ -176,8 +176,7 @@ async fn failover() -> anyhow::Result<()> { AutoCommit, AutoStreamReset, RedisConnectOptions, RedisConsumerOptions, RedisStreamer, }; use sea_streamer_types::{ - ConsumerGroup, ConsumerMode, ConsumerOptions, Producer, StreamKey, Streamer, StreamerUri, - Timestamp, + ConsumerGroup, ConsumerMode, ConsumerOptions, Producer, StreamKey, Streamer, Timestamp, }; use std::time::Duration; @@ -193,7 +192,7 @@ async fn failover() -> anyhow::Result<()> { let streamer = RedisStreamer::connect( std::env::var("BROKERS_URL") .unwrap_or_else(|_| "redis://localhost".to_owned()) - .parse::() + .parse() .unwrap(), options, ) diff --git a/sea-streamer-redis/tests/realtime.rs b/sea-streamer-redis/tests/realtime.rs index e91f0c8..d1442cd 100644 --- a/sea-streamer-redis/tests/realtime.rs +++ b/sea-streamer-redis/tests/realtime.rs @@ -12,8 +12,7 @@ async fn main() -> anyhow::Result<()> { }; use sea_streamer_runtime::sleep; use sea_streamer_types::{ - ConsumerMode, ConsumerOptions, Producer, ShardId, StreamKey, Streamer, StreamerUri, - Timestamp, + ConsumerMode, ConsumerOptions, Producer, ShardId, StreamKey, Streamer, Timestamp, }; use std::time::Duration; @@ -30,7 +29,7 @@ async fn main() -> anyhow::Result<()> { let streamer = RedisStreamer::connect( std::env::var("BROKERS_URL") .unwrap_or_else(|_| "redis://localhost".to_owned()) - .parse::() + .parse() .unwrap(), options, ) diff --git a/sea-streamer-redis/tests/resumable.rs b/sea-streamer-redis/tests/resumable.rs index b368795..8d4d729 100644 --- a/sea-streamer-redis/tests/resumable.rs +++ b/sea-streamer-redis/tests/resumable.rs @@ -16,7 +16,7 @@ async fn immediate_and_delayed() -> anyhow::Result<()> { use sea_streamer_runtime::timeout; use sea_streamer_types::{ Consumer, ConsumerGroup, ConsumerId, ConsumerMode, ConsumerOptions, Producer, ShardId, - StreamKey, Streamer, StreamerUri, Timestamp, + StreamKey, Streamer, Timestamp, }; use std::time::Duration; @@ -37,7 +37,7 @@ async fn immediate_and_delayed() -> anyhow::Result<()> { let streamer = RedisStreamer::connect( std::env::var("BROKERS_URL") .unwrap_or_else(|_| "redis://localhost".to_owned()) - .parse::() + .parse() .unwrap(), options, ) @@ -126,7 +126,7 @@ async fn rolling_and_disabled() -> anyhow::Result<()> { }; use sea_streamer_types::{ export::futures::StreamExt, Buffer, Consumer, ConsumerGroup, ConsumerId, ConsumerMode, - ConsumerOptions, Message, Producer, StreamKey, Streamer, StreamerUri, Timestamp, + ConsumerOptions, Message, Producer, StreamKey, Streamer, Timestamp, }; use std::time::Duration; @@ -147,7 +147,7 @@ async fn rolling_and_disabled() -> anyhow::Result<()> { let streamer = RedisStreamer::connect( std::env::var("BROKERS_URL") .unwrap_or_else(|_| "redis://localhost".to_owned()) - .parse::() + .parse() .unwrap(), options, ) diff --git a/sea-streamer-redis/tests/seek-rewind.rs b/sea-streamer-redis/tests/seek-rewind.rs index 8a02aaa..3a5d5d9 100644 --- a/sea-streamer-redis/tests/seek-rewind.rs +++ b/sea-streamer-redis/tests/seek-rewind.rs @@ -12,8 +12,7 @@ async fn main() -> anyhow::Result<()> { }; use sea_streamer_runtime::{sleep, timeout}; use sea_streamer_types::{ - Consumer, ConsumerMode, ConsumerOptions, Producer, SeqPos, StreamKey, Streamer, - StreamerUri, Timestamp, + Consumer, ConsumerMode, ConsumerOptions, Producer, SeqPos, StreamKey, Streamer, Timestamp, }; use std::time::Duration; @@ -35,7 +34,7 @@ async fn main() -> anyhow::Result<()> { let streamer = RedisStreamer::connect( std::env::var("BROKERS_URL") .unwrap_or_else(|_| "redis://localhost".to_owned()) - .parse::() + .parse() .unwrap(), options, ) diff --git a/sea-streamer-redis/tests/sharding.rs b/sea-streamer-redis/tests/sharding.rs index c47c883..030a25b 100644 --- a/sea-streamer-redis/tests/sharding.rs +++ b/sea-streamer-redis/tests/sharding.rs @@ -13,7 +13,7 @@ async fn main() -> anyhow::Result<()> { }; use sea_streamer_runtime::sleep; use sea_streamer_types::{ - ConsumerMode, ConsumerOptions, Producer, StreamKey, Streamer, StreamerUri, Timestamp, + ConsumerMode, ConsumerOptions, Producer, StreamKey, Streamer, Timestamp, }; use std::time::Duration; @@ -31,7 +31,7 @@ async fn main() -> anyhow::Result<()> { let streamer = RedisStreamer::connect( std::env::var("BROKERS_URL") .unwrap_or_else(|_| "redis://localhost".to_owned()) - .parse::() + .parse() .unwrap(), options, ) diff --git a/sea-streamer-socket/src/streamer.rs b/sea-streamer-socket/src/streamer.rs index 9e2c898..0525400 100644 --- a/sea-streamer-socket/src/streamer.rs +++ b/sea-streamer-socket/src/streamer.rs @@ -153,11 +153,7 @@ impl Streamer for SeaStreamer { type ConsumerOptions = SeaConsumerOptions; type ProducerOptions = SeaProducerOptions; - async fn connect(streamer: S, options: Self::ConnectOptions) -> SeaResult - where - S: Into + Send, - { - let uri = streamer.into(); + async fn connect(uri: StreamerUri, options: Self::ConnectOptions) -> SeaResult { let backend = match uri.protocol() { Some(protocol) => match protocol { #[cfg(feature = "backend-kafka")] diff --git a/sea-streamer-stdio/src/streamer.rs b/sea-streamer-stdio/src/streamer.rs index 0b2a61c..80f4ac9 100644 --- a/sea-streamer-stdio/src/streamer.rs +++ b/sea-streamer-stdio/src/streamer.rs @@ -37,10 +37,7 @@ impl StreamerTrait for StdioStreamer { type ProducerOptions = StdioProducerOptions; /// Nothing will happen until you create a producer/consumer - async fn connect(_: S, options: Self::ConnectOptions) -> StdioResult - where - S: Into + Send, - { + async fn connect(_: StreamerUri, options: Self::ConnectOptions) -> StdioResult { let StdioConnectOptions { loopback } = options; Ok(StdioStreamer { loopback }) } diff --git a/sea-streamer-types/src/streamer.rs b/sea-streamer-types/src/streamer.rs index 892f90e..b1bcbe0 100644 --- a/sea-streamer-types/src/streamer.rs +++ b/sea-streamer-types/src/streamer.rs @@ -48,12 +48,10 @@ pub trait Streamer: Sized { type ProducerOptions: ProducerOptions; /// Establish a connection to the streaming server. - fn connect( - streamer: S, + fn connect( + streamer: StreamerUri, options: Self::ConnectOptions, - ) -> impl Future> + Send - where - S: Into + Send; + ) -> impl Future> + Send; /// Flush and disconnect from the streaming server. fn disconnect(self) -> impl Future> + Send; @@ -100,20 +98,6 @@ impl Display for StreamerUri { } } -impl From for StreamerUri { - fn from(value: Url) -> Self { - Self { nodes: vec![value] } - } -} - -impl FromIterator for StreamerUri { - fn from_iter>(iter: T) -> Self { - Self { - nodes: iter.into_iter().collect(), - } - } -} - impl StreamerUri { pub fn zero() -> Self { Self { nodes: Vec::new() } @@ -363,23 +347,6 @@ mod test { assert_eq!(uri.nodes(), &["file:///path/to/hi".parse().unwrap()]); } - #[test] - fn test_into_streamer_uri() { - let url: Url = "proto://sea-ql.org:1234".parse().unwrap(); - let uri: StreamerUri = url.clone().into(); - assert!(uri.nodes.len() == 1); - assert_eq!(url, uri.nodes.first().unwrap().clone()); - - let urls: [Url; 3] = [ - "proto://sea-ql.org:1".parse().unwrap(), - "proto://sea-ql.org:2".parse().unwrap(), - "proto://sea-ql.org:3".parse().unwrap(), - ]; - let uri: StreamerUri = StreamerUri::from_iter(urls.clone().into_iter()); - assert!(uri.nodes.len() == 3); - assert!(uri.nodes.iter().eq(urls.iter())); - } - #[test] fn test_parse_stream_url_err() { use crate::StreamKeyErr;