Skip to content

Commit

Permalink
deprecate PostStream, implmement generic handler for unsupported types
Browse files Browse the repository at this point in the history
  • Loading branch information
korewaChino committed Dec 6, 2024
1 parent 1c45739 commit ef7e295
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 6 deletions.
4 changes: 2 additions & 2 deletions skystreamer/examples/stream_raw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ pub async fn main() -> Result<(), Box<dyn std::error::Error>> {

// println!("{:?}", record);

if let skystreamer::types::commit::Record::Other(_) = record {
println!("{:?}", record);
if let skystreamer::types::commit::Record::Other(val) = record {
println!("{:?}", val);
}
}

Expand Down
6 changes: 4 additions & 2 deletions skystreamer/src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@
use crate::types::{commit, Post};
use crate::Result;
use futures::StreamExt;
// a commit turns into a block of Posts
// is there a way to turn a block of posts into a stream of individual posts?

#[deprecated(note = "Please use [`EventStream`] instead")]
pub struct PostStream {
// inner: Box<dyn futures::Stream<Item = Post> + Unpin + Send>,
subscription: crate::RepoSubscription,
}

#[allow(deprecated)]
impl PostStream {
pub async fn new(inner: crate::RepoSubscription) -> Self {
PostStream {
Expand Down Expand Up @@ -50,6 +50,8 @@ impl PostStream {

/// A stream of every event from the firehose.
/// Replaces the old [`PostStream`] type.
///
/// This stream will yield every single event from the firehose,
pub struct EventStream {
subscription: crate::RepoSubscription,
}
Expand Down
9 changes: 7 additions & 2 deletions skystreamer/src/types/commit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ pub enum Record {
Repost(Box<RepostEvent>),
ListItem(Box<ListItemEvent>),
Profile(Box<Profile>),
Other(String),
// Other(Box<serde::de::value::>),
Other((Operation, Box<serde_json::Value>)),
}

impl Record {
Expand Down Expand Up @@ -105,7 +106,11 @@ impl Record {

other => {
tracing::trace!("Unhandled operation: {:?}", other);
records.push(Record::Other(format!("{:?}", other)));
// todo: some kind of generic Serde value?
records.push(Record::Other(*Box::new((
other.clone(),
serde_ipld_dagcbor::from_reader(&mut item.as_slice())?,
))));
}
}

Expand Down

0 comments on commit ef7e295

Please sign in to comment.