diff --git a/sea-streamer-redis/src/consumer/mod.rs b/sea-streamer-redis/src/consumer/mod.rs index 468b79a..4741aec 100644 --- a/sea-streamer-redis/src/consumer/mod.rs +++ b/sea-streamer-redis/src/consumer/mod.rs @@ -3,6 +3,7 @@ mod future; mod node; mod options; mod shard; +mod stream; use cluster::*; use future::StreamFuture; @@ -10,6 +11,7 @@ pub use future::{NextFuture, StreamFuture as RedisMessageStream}; use node::*; pub use options::*; use shard::*; +pub use stream::*; use flume::{bounded, unbounded, Receiver, Sender}; use std::{fmt::Debug, future::Future, sync::Arc, time::Duration}; @@ -209,10 +211,14 @@ impl RedisConsumer { } } + #[inline] fn auto_ack(&self, header: &MessageHeader) -> RedisResult<()> { + Self::auto_ack_static(&self.handle, header) + } + + fn auto_ack_static(handle: &Sender, header: &MessageHeader) -> RedisResult<()> { // unbounded, so never blocks - if self - .handle + if handle .try_send(CtrlMsg::Ack( (header.stream_key().clone(), *header.shard_id()), get_message_id(header), @@ -268,6 +274,15 @@ impl RedisConsumer { } Ok(()) } + + pub fn into_stream<'a>(self) -> RedisMessStream<'a> { + RedisMessStream { + config: self.config, + stream: self.receiver.into_stream(), + handle: self.handle, + read: false, + } + } } pub(crate) async fn create_consumer( diff --git a/sea-streamer-redis/src/consumer/stream.rs b/sea-streamer-redis/src/consumer/stream.rs new file mode 100644 index 0000000..e18fb57 --- /dev/null +++ b/sea-streamer-redis/src/consumer/stream.rs @@ -0,0 +1,59 @@ +use super::{ConsumerConfig, CtrlMsg, RedisConsumer}; +use crate::{RedisErr, RedisResult}; +use flume::{r#async::RecvStream, Sender}; +use sea_streamer_types::{export::futures::Stream, SharedMessage, StreamErr}; +use std::{fmt::Debug, pin::Pin, task::Poll}; + +pub struct RedisMessStream<'a> { + pub(super) config: ConsumerConfig, + pub(super) stream: RecvStream<'a, RedisResult>, + pub(super) handle: Sender, + pub(super) read: bool, +} + +impl<'a> Debug for RedisMessStream<'a> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("RedisMessStream").finish() + } +} + +// logic must mirror that of sea-streamer-redis/src/consumer/future.rs + +impl<'a> Stream for RedisMessStream<'a> { + type Item = RedisResult; + + fn poll_next( + mut self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll> { + use std::task::Poll::{Pending, Ready}; + if !self.read && !self.config.pre_fetch { + self.read = true; + self.handle.try_send(CtrlMsg::Read).ok(); + } + match Pin::new(&mut self.stream).poll_next(cx) { + Ready(res) => match res { + Some(Ok(msg)) => { + if self.config.auto_ack + && RedisConsumer::auto_ack_static(&self.handle, msg.header()).is_err() + { + return Ready(Some(Err(StreamErr::Backend(RedisErr::ConsumerDied)))); + } + self.read = false; + Ready(Some(Ok(msg))) + } + Some(Err(err)) => Ready(Some(Err(err))), + None => Ready(Some(Err(StreamErr::Backend(RedisErr::ConsumerDied)))), + }, + Pending => Pending, + } + } +} + +impl<'a> Drop for RedisMessStream<'a> { + fn drop(&mut self) { + if self.read { + self.handle.try_send(CtrlMsg::Unread).ok(); + } + } +} diff --git a/sea-streamer-redis/tests/realtime.rs b/sea-streamer-redis/tests/realtime.rs index e91f0c8..2c60d0c 100644 --- a/sea-streamer-redis/tests/realtime.rs +++ b/sea-streamer-redis/tests/realtime.rs @@ -6,7 +6,7 @@ use util::*; #[cfg(feature = "test")] #[cfg_attr(feature = "runtime-tokio", tokio::test)] #[cfg_attr(feature = "runtime-async-std", async_std::test)] -async fn main() -> anyhow::Result<()> { +async fn realtime_1() -> anyhow::Result<()> { use sea_streamer_redis::{ AutoStreamReset, RedisConnectOptions, RedisConsumerOptions, RedisStreamer, }; @@ -17,7 +17,7 @@ async fn main() -> anyhow::Result<()> { }; use std::time::Duration; - const TEST: &str = "realtime"; + const TEST: &str = "realtime_1"; env_logger::init(); test(false).await?; test(true).await?; @@ -135,3 +135,120 @@ async fn main() -> anyhow::Result<()> { Ok(()) } + +#[cfg(feature = "test")] +#[cfg_attr(feature = "runtime-tokio", tokio::test)] +#[cfg_attr(feature = "runtime-async-std", async_std::test)] +async fn realtime_2() -> anyhow::Result<()> { + use sea_streamer_redis::{ + AutoStreamReset, RedisConnectOptions, RedisConsumerOptions, RedisResult, RedisStreamer, + }; + use sea_streamer_runtime::sleep; + use sea_streamer_types::{ + export::futures::{Stream, StreamExt}, + Buffer, ConsumerMode, ConsumerOptions, Message, Producer, ShardId, SharedMessage, + StreamKey, Streamer, StreamerUri, Timestamp, + }; + use std::time::Duration; + + const TEST: &str = "realtime_2"; + env_logger::init(); + test(false).await?; + + async fn test(enable_cluster: bool) -> anyhow::Result<()> { + println!("Enable Cluster = {enable_cluster} ..."); + + let mut options = RedisConnectOptions::default(); + options.set_enable_cluster(enable_cluster); + let streamer = RedisStreamer::connect( + std::env::var("BROKERS_URL") + .unwrap_or_else(|_| "redis://localhost".to_owned()) + .parse::() + .unwrap(), + options, + ) + .await?; + println!("Connect Streamer ... ok"); + + let now = Timestamp::now_utc(); + let stream_key = StreamKey::new(format!( + "{}-{}a", + TEST, + now.unix_timestamp_nanos() / 1_000_000 + ))?; + let zero = ShardId::new(0); + + let mut producer = streamer.create_generic_producer(Default::default()).await?; + + println!("Producing 0..5 ..."); + let mut sequence = 0; + for i in 0..5 { + let message = format!("{i}"); + let receipt = producer.send_to(&stream_key, message)?.await?; + assert_eq!(receipt.stream_key(), &stream_key); + // should always increase + assert!(receipt.sequence() > &sequence); + sequence = *receipt.sequence(); + assert_eq!(receipt.shard_id(), &zero); + } + + let mut options = RedisConsumerOptions::new(ConsumerMode::RealTime); + options.set_auto_stream_reset(AutoStreamReset::Latest); + + let mut half = streamer + .create_consumer(&[stream_key.clone()], options.clone()) + .await? + .into_stream(); + + // Why do we have to wait? We want consumer to have started reading + // before producing any messages. While after `create` returns the consumer + // is ready (connection opened), there is still a small delay to send an `XREAD` + // operation to the server. + sleep(Duration::from_millis(5)).await; + + println!("Producing 5..10 ..."); + for i in 5..10 { + let message = format!("{i}"); + producer.send_to(&stream_key, message)?; + } + + println!("Flush producer ..."); + producer.flush().await?; + + options.set_auto_stream_reset(AutoStreamReset::Earliest); + let mut full = streamer + .create_consumer(&[stream_key.clone()], options) + .await? + .into_stream(); + + let seq = stream_n(&mut half, 5).await?; + assert_eq!(seq, [5, 6, 7, 8, 9]); + println!("Stream latest ... ok"); + + let seq = stream_n(&mut full, 10).await?; + assert_eq!(seq, [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]); + println!("Stream all ... ok"); + + println!("End test case."); + Ok(()) + } + + async fn stream_n> + std::marker::Unpin>( + stream: &mut S, + num: usize, + ) -> anyhow::Result> { + let mut numbers = Vec::new(); + for _ in 0..num { + match stream.next().await { + Some(mess) => { + let mess = mess?; + numbers.push(mess.message().as_str().unwrap().parse::().unwrap()); + } + None => panic!("Stream ended?"), + } + } + Ok(numbers) + } + + Ok(()) +}