From 2d5aee90b89b95ca9a223598ad54dcab8568996a Mon Sep 17 00:00:00 2001 From: Chris Tsang Date: Wed, 7 Aug 2024 21:35:43 +0100 Subject: [PATCH] Stream join components --- Cargo.toml | 1 + sea-streamer-fuse/Cargo.toml | 26 +++ sea-streamer-fuse/src/lib.rs | 289 ++++++++++++++++++++++++++++++ sea-streamer-types/src/message.rs | 1 + 4 files changed, 317 insertions(+) create mode 100644 sea-streamer-fuse/Cargo.toml create mode 100644 sea-streamer-fuse/src/lib.rs diff --git a/Cargo.toml b/Cargo.toml index 7ccb738..9bc10e9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,6 +5,7 @@ members = [ "examples/price-feed", "benchmark", "sea-streamer-file", + "sea-streamer-fuse", "sea-streamer-kafka", "sea-streamer-redis", "sea-streamer-redis/redis-streams-dump", diff --git a/sea-streamer-fuse/Cargo.toml b/sea-streamer-fuse/Cargo.toml new file mode 100644 index 0000000..34e6846 --- /dev/null +++ b/sea-streamer-fuse/Cargo.toml @@ -0,0 +1,26 @@ +[package] +name = "sea-streamer-fuse" +version = "0.5.0" +authors = ["Chris Tsang "] +edition = "2021" +description = "Stream processing toolbox" +license = "MIT OR Apache-2.0" +documentation = "https://docs.rs/sea-streamer-fuse" +repository = "https://github.com/SeaQL/sea-streamer" +categories = ["concurrency"] +keywords = ["async", "stream", "stream-processing"] +rust-version = "1.60" + +[package.metadata.docs.rs] +all-features = true +rustdoc-args = ["--cfg", "docsrs"] + +[dependencies] +thiserror = { version = "1", default-features = false } +pin-project = { version = "1.1" } + +sea-streamer-types = { version = "0.5", path = "../sea-streamer-types" } + +[dev-dependencies] +tokio = { version = "1", features = ["full"] } +sea-streamer-socket = { version = "0.5", path = "../sea-streamer-socket" } diff --git a/sea-streamer-fuse/src/lib.rs b/sea-streamer-fuse/src/lib.rs new file mode 100644 index 0000000..020545e --- /dev/null +++ b/sea-streamer-fuse/src/lib.rs @@ -0,0 +1,289 @@ +use pin_project::pin_project; +use sea_streamer_types::{export::futures::Stream, Message, StreamKey}; +use std::{ + collections::{BTreeMap, VecDeque}, + pin::Pin, + task::Poll, +}; + +type Keys = BTreeMap>; + +/// Join two streams, but reorder messages by timestamp. +/// Since a stream can potentially infinite and the streams in the Stream cannot be known priori, +/// the internal buffer can potentially grow infinite. +/// +/// `align()` must be called manually to specify which streams to be aligned. +/// +/// Messages within each stream are assumed to be causal. +/// +/// A typical use would be to join two streams from different sources, each with a different update frequency. +/// Messages from the fast stream will be buffered, until a message from the slow stream arrives. +/// +/// ```ignore +/// fast | (1) (2) (3) (4) (5) +/// slow | (2) (6) +/// ``` +/// +/// In the example above, messages 1, 2 from fast will be buffered, until 2 from the slow stream arrives. +/// Likewise, messages 3, 4, 5 will be buffered until 6 arrives. +/// +/// If two messages has the same timestamp, the order will be determined by the alphabetic order of the stream keys. +#[pin_project] +pub struct StreamJoin +where + S: Stream>, + M: Message, + E: std::error::Error, +{ + #[pin] + fused: S, + keys: Keys, + ended: bool, +} + +impl StreamJoin +where + S: Stream>, + M: Message, + E: std::error::Error, +{ + /// Takes an already multiplexed stream + pub fn fused(fused: S) -> Self { + Self { + fused, + keys: Default::default(), + ended: false, + } + } + + /// Add a stream key that needs to be joined. You can call this multiple times + pub fn align(&mut self, stream_key: StreamKey) { + self.keys.insert(stream_key, Default::default()); + } + + fn next(keys: &mut Keys) -> Option { + let mut min_key = None; + let mut min_ts = None; + for (k, ms) in keys.iter() { + if let Some(m) = ms.front() { + let m_ts = m.timestamp(); + if min_ts.is_none() || m_ts < min_ts.unwrap() { + min_ts = Some(m_ts); + min_key = Some(k.clone()); + } + } + } + if let Some(min_key) = min_key { + Some( + keys.get_mut(&min_key) + .unwrap() + .pop_front() + .expect("Checked above"), + ) + } else { + // all streams ended + None + } + } +} + +impl Stream for StreamJoin +where + S: Stream>, + M: Message, + E: std::error::Error, +{ + type Item = Result; + + fn poll_next( + self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll> { + let mut this = self.project(); + while !*this.ended { + match this.fused.as_mut().poll_next(cx) { + Poll::Ready(Some(Ok(mes))) => { + let key = mes.stream_key(); + this.keys.entry(key).or_default().push_back(mes); + } + Poll::Ready(Some(Err(err))) => { + *this.ended = true; + return Poll::Ready(Some(Err(err))); + } + Poll::Ready(None) => { + *this.ended = true; + break; + } + Poll::Pending => return Poll::Pending, + } + if !this.keys.values().any(|ms| ms.is_empty()) { + // if none of the streams are empty + break; + } + } + Poll::Ready(Self::next(this.keys).map(Ok)) + } +} + +#[cfg(test)] +mod test { + use super::*; + use sea_streamer_socket::{BackendErr, SeaMessage, SeaMessageStream}; + use sea_streamer_types::{ + export::futures::{self, TryStreamExt}, + MessageHeader, OwnedMessage, StreamErr, Timestamp, + }; + + // just to see if this compiles + #[allow(dead_code)] + fn wrap<'a>( + s: SeaMessageStream<'a>, + ) -> StreamJoin, SeaMessage<'a>, StreamErr> { + StreamJoin::fused(s) + } + + fn make_seq(key: StreamKey, items: &[u64]) -> Vec> { + items + .iter() + .copied() + .map(|i| { + Ok(OwnedMessage::new( + MessageHeader::new( + key.clone(), + Default::default(), + i, + Timestamp::from_unix_timestamp(i as i64).unwrap(), + ), + Vec::new(), + )) + }) + .collect() + } + + fn compare(messages: Vec, expected: &[(&str, u64)]) { + assert_eq!(messages.len(), expected.len()); + for (i, m) in messages.iter().enumerate() { + assert_eq!(m.stream_key().name(), expected[i].0); + assert_eq!(m.sequence(), expected[i].1); + } + } + + #[tokio::test] + async fn test_mux_streams_2() { + let a = StreamKey::new("a").unwrap(); + let b = StreamKey::new("b").unwrap(); + let stream = futures::stream::iter( + make_seq(a.clone(), &[1, 3, 5, 7, 9]) + .into_iter() + .chain(make_seq(b.clone(), &[2, 4, 6, 8, 10]).into_iter()), + ); + let mut join = StreamJoin::fused(stream); + join.align(a); + join.align(b); + let messages: Vec<_> = join.try_collect().await.unwrap(); + compare( + messages, + &[ + ("a", 1), + ("b", 2), + ("a", 3), + ("b", 4), + ("a", 5), + ("b", 6), + ("a", 7), + ("b", 8), + ("a", 9), + ("b", 10), + ], + ); + } + + #[tokio::test] + async fn test_mux_streams_2_2() { + let a = StreamKey::new("a").unwrap(); + let b = StreamKey::new("b").unwrap(); + let stream = futures::stream::iter( + make_seq(a.clone(), &[1, 2, 5, 8, 9]) + .into_iter() + .chain(make_seq(b.clone(), &[3, 4, 6, 7, 10]).into_iter()), + ); + let mut join = StreamJoin::fused(stream); + join.align(a); + join.align(b); + let messages: Vec<_> = join.try_collect().await.unwrap(); + compare( + messages, + &[ + ("a", 1), + ("a", 2), + ("b", 3), + ("b", 4), + ("a", 5), + ("b", 6), + ("b", 7), + ("a", 8), + ("a", 9), + ("b", 10), + ], + ); + } + + #[tokio::test] + async fn test_mux_streams_3() { + let a = StreamKey::new("a").unwrap(); + let b = StreamKey::new("b").unwrap(); + let c = StreamKey::new("c").unwrap(); + let stream = futures::stream::iter( + make_seq(a.clone(), &[1, 3, 5, 7, 9]) + .into_iter() + .chain(make_seq(c.clone(), &[5]).into_iter()) + .chain(make_seq(b.clone(), &[2, 4, 6, 8, 10]).into_iter()), + ); + let mut join = StreamJoin::fused(stream); + join.align(a); + join.align(b); + join.align(c); + let messages: Vec<_> = join.try_collect().await.unwrap(); + compare( + messages, + &[ + ("a", 1), + ("b", 2), + ("a", 3), + ("b", 4), + ("a", 5), + ("c", 5), + ("b", 6), + ("a", 7), + ("b", 8), + ("a", 9), + ("b", 10), + ], + ); + } + + #[tokio::test] + async fn test_mux_streams_4() { + let a = StreamKey::new("a").unwrap(); + let b = StreamKey::new("b").unwrap(); + let c = StreamKey::new("c").unwrap(); + let d = StreamKey::new("d").unwrap(); + let stream = futures::stream::iter( + make_seq(a.clone(), &[1, 3]) + .into_iter() + .chain(make_seq(d.clone(), &[5]).into_iter()) + .chain(make_seq(b.clone(), &[2, 4]).into_iter()) + .chain(make_seq(c.clone(), &[3]).into_iter()), + ); + let mut join = StreamJoin::fused(stream); + join.align(a); + join.align(b); + join.align(c); + join.align(d); + let messages: Vec<_> = join.try_collect().await.unwrap(); + compare( + messages, + &[("a", 1), ("b", 2), ("a", 3), ("c", 3), ("b", 4), ("d", 5)], + ); + } +} diff --git a/sea-streamer-types/src/message.rs b/sea-streamer-types/src/message.rs index 1156d5a..da11967 100644 --- a/sea-streamer-types/src/message.rs +++ b/sea-streamer-types/src/message.rs @@ -93,6 +93,7 @@ pub trait Message: Send { ) } + /// tuple to uniquely identify a message fn identifier(&self) -> (StreamKey, ShardId, SeqNo) { (self.stream_key(), self.shard_id(), self.sequence()) }