Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: into_stream #30

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 17 additions & 2 deletions sea-streamer-redis/src/consumer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@ mod future;
mod node;
mod options;
mod shard;
mod stream;

use cluster::*;
use future::StreamFuture;
pub use future::{NextFuture, StreamFuture as RedisMessageStream};
use node::*;
pub use options::*;
use shard::*;
pub use stream::*;

use flume::{bounded, unbounded, Receiver, Sender};
use std::{fmt::Debug, future::Future, sync::Arc, time::Duration};
Expand Down Expand Up @@ -209,10 +211,14 @@ impl RedisConsumer {
}
}

#[inline]
fn auto_ack(&self, header: &MessageHeader) -> RedisResult<()> {
Self::auto_ack_static(&self.handle, header)
}

fn auto_ack_static(handle: &Sender<CtrlMsg>, header: &MessageHeader) -> RedisResult<()> {
// unbounded, so never blocks
if self
.handle
if handle
.try_send(CtrlMsg::Ack(
(header.stream_key().clone(), *header.shard_id()),
get_message_id(header),
Expand Down Expand Up @@ -268,6 +274,15 @@ impl RedisConsumer {
}
Ok(())
}

/// Consume this consumer, converting it into a [`Stream`] of messages.
///
/// Moves the consumer's config, message receiver and control handle into a
/// `RedisMessStream`. `read` starts out `false`, so (in non-pre-fetch mode)
/// the stream sends its first `CtrlMsg::Read` lazily on the first poll.
pub fn into_stream<'a>(self) -> RedisMessStream<'a> {
RedisMessStream {
config: self.config,
stream: self.receiver.into_stream(),
handle: self.handle,
read: false,
}
}
}

pub(crate) async fn create_consumer(
Expand Down
59 changes: 59 additions & 0 deletions sea-streamer-redis/src/consumer/stream.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
use super::{ConsumerConfig, CtrlMsg, RedisConsumer};
use crate::{RedisErr, RedisResult};
use flume::{r#async::RecvStream, Sender};
use sea_streamer_types::{export::futures::Stream, SharedMessage, StreamErr};
use std::{fmt::Debug, pin::Pin, task::Poll};

/// A [`Stream`] of messages, created by [`RedisConsumer::into_stream`].
pub struct RedisMessStream<'a> {
/// Consumer configuration; `pre_fetch` and `auto_ack` are consulted on each poll.
pub(super) config: ConsumerConfig,
/// Receiving end of the consumer's message channel.
pub(super) stream: RecvStream<'a, RedisResult<SharedMessage>>,
/// Control channel back to the consumer (`Read` / `Ack` / `Unread`).
pub(super) handle: Sender<CtrlMsg>,
/// Whether a `CtrlMsg::Read` is outstanding with no message yielded yet.
pub(super) read: bool,
}

impl<'a> Debug for RedisMessStream<'a> {
    /// Opaque representation: inner channels are not meaningfully printable,
    /// so only the type name is emitted.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("RedisMessStream")
    }
}

// logic must mirror that of sea-streamer-redis/src/consumer/future.rs

impl<'a> Stream for RedisMessStream<'a> {
type Item = RedisResult<SharedMessage>;

fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<Option<Self::Item>> {
use std::task::Poll::{Pending, Ready};
// In non-pre-fetch mode, request one read per yielded message: arm the
// `read` flag and send `CtrlMsg::Read` once; it is only re-armed after
// a message has actually been yielded below.
if !self.read && !self.config.pre_fetch {
self.read = true;
self.handle.try_send(CtrlMsg::Read).ok();
}
match Pin::new(&mut self.stream).poll_next(cx) {
Ready(res) => match res {
Some(Ok(msg)) => {
// Auto-ack before yielding; if the control channel send
// fails, the consumer side is gone.
if self.config.auto_ack
&& RedisConsumer::auto_ack_static(&self.handle, msg.header()).is_err()
{
return Ready(Some(Err(StreamErr::Backend(RedisErr::ConsumerDied))));
}
// Message delivered: clear the flag so the next poll can
// request another read.
self.read = false;
Ready(Some(Ok(msg)))
}
Some(Err(err)) => Ready(Some(Err(err))),
// Channel closed: surface as `ConsumerDied` instead of ending
// the stream silently with `None`.
None => Ready(Some(Err(StreamErr::Backend(RedisErr::ConsumerDied)))),
},
Pending => Pending,
}
}
}

impl<'a> Drop for RedisMessStream<'a> {
    /// Return an unconsumed read credit on teardown.
    ///
    /// If a `CtrlMsg::Read` was issued but no message was yielded before the
    /// stream was dropped, notify the consumer via `CtrlMsg::Unread`. A send
    /// failure only means the consumer is already gone, which is fine here.
    fn drop(&mut self) {
        if !self.read {
            return;
        }
        let _ = self.handle.try_send(CtrlMsg::Unread);
    }
}
121 changes: 119 additions & 2 deletions sea-streamer-redis/tests/realtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use util::*;
#[cfg(feature = "test")]
#[cfg_attr(feature = "runtime-tokio", tokio::test)]
#[cfg_attr(feature = "runtime-async-std", async_std::test)]
async fn main() -> anyhow::Result<()> {
async fn realtime_1() -> anyhow::Result<()> {
use sea_streamer_redis::{
AutoStreamReset, RedisConnectOptions, RedisConsumerOptions, RedisStreamer,
};
Expand All @@ -17,7 +17,7 @@ async fn main() -> anyhow::Result<()> {
};
use std::time::Duration;

const TEST: &str = "realtime";
const TEST: &str = "realtime_1";
env_logger::init();
test(false).await?;
test(true).await?;
Expand Down Expand Up @@ -135,3 +135,120 @@ async fn main() -> anyhow::Result<()> {

Ok(())
}

#[cfg(feature = "test")]
#[cfg_attr(feature = "runtime-tokio", tokio::test)]
#[cfg_attr(feature = "runtime-async-std", async_std::test)]
async fn realtime_2() -> anyhow::Result<()> {
    use sea_streamer_redis::{
        AutoStreamReset, RedisConnectOptions, RedisConsumerOptions, RedisResult, RedisStreamer,
    };
    use sea_streamer_runtime::sleep;
    use sea_streamer_types::{
        export::futures::{Stream, StreamExt},
        Buffer, ConsumerMode, ConsumerOptions, Message, Producer, ShardId, SharedMessage,
        StreamKey, Streamer, StreamerUri, Timestamp,
    };
    use std::time::Duration;

    const TEST: &str = "realtime_2";
    // `env_logger::init()` panics if a logger is already installed. Rust tests
    // in this binary share one process and `realtime_1` also initializes the
    // logger, so use the fallible variant and ignore the "already set" error.
    env_logger::try_init().ok();
    test(false).await?;

    async fn test(enable_cluster: bool) -> anyhow::Result<()> {
        println!("Enable Cluster = {enable_cluster} ...");

        let mut options = RedisConnectOptions::default();
        options.set_enable_cluster(enable_cluster);
        let streamer = RedisStreamer::connect(
            std::env::var("BROKERS_URL")
                .unwrap_or_else(|_| "redis://localhost".to_owned())
                .parse::<StreamerUri>()
                .unwrap(),
            options,
        )
        .await?;
        println!("Connect Streamer ... ok");

        // Unique, timestamped stream key so reruns don't see stale messages.
        let now = Timestamp::now_utc();
        let stream_key = StreamKey::new(format!(
            "{}-{}a",
            TEST,
            now.unix_timestamp_nanos() / 1_000_000
        ))?;
        let zero = ShardId::new(0);

        let mut producer = streamer.create_generic_producer(Default::default()).await?;

        println!("Producing 0..5 ...");
        let mut sequence = 0;
        for i in 0..5 {
            let message = format!("{i}");
            let receipt = producer.send_to(&stream_key, message)?.await?;
            assert_eq!(receipt.stream_key(), &stream_key);
            // should always increase
            assert!(receipt.sequence() > &sequence);
            sequence = *receipt.sequence();
            assert_eq!(receipt.shard_id(), &zero);
        }

        let mut options = RedisConsumerOptions::new(ConsumerMode::RealTime);
        options.set_auto_stream_reset(AutoStreamReset::Latest);

        // `Latest` consumer: must only see messages produced after it started.
        let mut half = streamer
            .create_consumer(&[stream_key.clone()], options.clone())
            .await?
            .into_stream();

        // Why do we have to wait? We want consumer to have started reading
        // before producing any messages. While after `create` returns the consumer
        // is ready (connection opened), there is still a small delay to send an `XREAD`
        // operation to the server.
        sleep(Duration::from_millis(5)).await;

        println!("Producing 5..10 ...");
        for i in 5..10 {
            let message = format!("{i}");
            producer.send_to(&stream_key, message)?;
        }

        println!("Flush producer ...");
        producer.flush().await?;

        // `Earliest` consumer: must replay the stream from the very beginning.
        options.set_auto_stream_reset(AutoStreamReset::Earliest);
        let mut full = streamer
            .create_consumer(&[stream_key.clone()], options)
            .await?
            .into_stream();

        let seq = stream_n(&mut half, 5).await?;
        assert_eq!(seq, [5, 6, 7, 8, 9]);
        println!("Stream latest ... ok");

        let seq = stream_n(&mut full, 10).await?;
        assert_eq!(seq, [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
        println!("Stream all ... ok");

        println!("End test case.");
        Ok(())
    }

    /// Pull exactly `num` messages from `stream` and parse each payload as a
    /// `usize`. Panics if the stream ends before `num` messages arrive.
    async fn stream_n<S: Stream<Item = RedisResult<SharedMessage>> + std::marker::Unpin>(
        stream: &mut S,
        num: usize,
    ) -> anyhow::Result<Vec<usize>> {
        let mut numbers = Vec::new();
        for _ in 0..num {
            match stream.next().await {
                Some(mess) => {
                    let mess = mess?;
                    numbers.push(mess.message().as_str().unwrap().parse::<usize>().unwrap());
                }
                None => panic!("Stream ended?"),
            }
        }
        Ok(numbers)
    }

    Ok(())
}
Loading