diff --git a/Cargo.lock b/Cargo.lock index 5b38212..9b521ff 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4191,7 +4191,7 @@ checksum = "38b58827f4464d87d377d175e90bf58eb00fd8716ff0a62f80356b5e61555d0d" [[package]] name = "skystreamer" -version = "0.1.1" +version = "0.2.0" dependencies = [ "atrium-api", "chrono", @@ -4212,7 +4212,7 @@ dependencies = [ [[package]] name = "skystreamer-bin" -version = "0.1.0" +version = "0.2.0" dependencies = [ "async-trait", "atrium-api", diff --git a/skystreamer-bin/Cargo.toml b/skystreamer-bin/Cargo.toml index 91bf8c5..e854828 100644 --- a/skystreamer-bin/Cargo.toml +++ b/skystreamer-bin/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "skystreamer-bin" -version = "0.1.0" +version = "0.2.0" edition = "2021" authors = ["Cappy Ishihara "] diff --git a/skystreamer-bin/src/main.rs b/skystreamer-bin/src/main.rs index 2ce7a85..978e92c 100644 --- a/skystreamer-bin/src/main.rs +++ b/skystreamer-bin/src/main.rs @@ -17,55 +17,6 @@ pub struct Consumer { exporter: Box, pub atproto_relay: String, } -// #[derive(Debug)] -// pub struct TaskQueue { -// workers: Vec>, -// semaphore: Arc, -// } - -// impl TaskQueue { -// pub fn new() -> Self { -// TaskQueue { -// workers: Vec::new(), -// semaphore: Arc::new(tokio::sync::Semaphore::new(16)), -// } -// } - -// /// Add a new task on the queue from a tokio join handle -// /// Remove the task from the queue when it finishes -// pub fn add_task(&mut self, task: tokio::task::JoinHandle<()>) { -// self.workers.retain(|worker| !worker.is_finished()); -// // tracing::info!("Running workers: {}", self.workers.len()); -// let semaphore = self.semaphore.clone(); -// let worker = tokio::spawn(async move { -// let _permit = semaphore.acquire().await; -// tracing::info!("Available permits: {}", semaphore.available_permits()); -// tokio::join!(task).0.unwrap(); -// // release permit when task is done -// }); -// self.workers.push(worker); -// } - -// pub fn handle_interrupt(&mut self) { -// for worker in self.workers.drain(..) { -// self.semaphore.clone().close(); -// tracing::info!("Cancelling workers"); -// worker.abort(); -// } -// } -// } - -// impl Default for TaskQueue { -// fn default() -> Self { -// Self::new() -// } -// } - -// thread_local! { -// static LOCAL_THREAD_POOL: std::cell::RefCell = std::cell::RefCell::new(TaskQueue::new()); -// } - -// const GLOBAL_THREAD_POOL: OnceCell = OnceCell::new(); impl Consumer { pub fn new(exporter: Box, relay: &str) -> Self { diff --git a/skystreamer/Cargo.toml b/skystreamer/Cargo.toml index e3f3bec..ec5cece 100644 --- a/skystreamer/Cargo.toml +++ b/skystreamer/Cargo.toml @@ -1,10 +1,10 @@ [package] name = "skystreamer" -version = "0.1.1" +version = "0.2.0" edition = "2021" authors = ["Cappy Ishihara "] -description = "AT Firehose collector for Bluesky" +description = "Idiomatic Rust library for the AT Firehose streaming API" license = "MIT" repository = "https://github.com/FyraLabs/skystreamer" categories = [ diff --git a/skystreamer/examples/stream.rs b/skystreamer/examples/stream.rs index 435b26d..01ee199 100644 --- a/skystreamer/examples/stream.rs +++ b/skystreamer/examples/stream.rs @@ -1,6 +1,6 @@ // use color_eyre::Result; -use futures::StreamExt; -use skystreamer::{stream::PostStream, RepoSubscription}; +use futures::{pin_mut, StreamExt}; +use skystreamer::{stream::EventStream, RepoSubscription}; #[tokio::main] pub async fn main() -> Result<(), Box> { @@ -8,18 +8,17 @@ pub async fn main() -> Result<(), Box> { let subscription = RepoSubscription::new("bsky.network").await.unwrap(); // let subscription = repo; - // Wrap in PostStream - let post_stream = PostStream::new(subscription); - let mut post_stream = post_stream.await; - let stream = post_stream.stream().await?; + let mut binding = EventStream::new(subscription); + let event_stream = binding.stream().await?; - // Pin the stream before processing - futures::pin_mut!(stream); + // let commit_stream = subscription.stream_commits().await; + pin_mut!(event_stream); - // Process posts as they arrive - // should be Result - while let Some(post) = stream.next().await { - println!("{:?}", post); + while let Some(record) = event_stream.next().await { + // stream unknown record types + if let skystreamer::types::commit::Record::Other(val) = record { + println!("{:?}", val); + } } Ok(()) diff --git a/skystreamer/examples/stream_raw.rs b/skystreamer/examples/stream_raw.rs deleted file mode 100644 index 90c3f24..0000000 --- a/skystreamer/examples/stream_raw.rs +++ /dev/null @@ -1,34 +0,0 @@ -// use color_eyre::Result; -use futures::{pin_mut, StreamExt}; -use skystreamer::{stream::EventStream, RepoSubscription}; - -#[tokio::main] -pub async fn main() -> Result<(), Box> { - // Create subscription to bsky.network - let subscription = RepoSubscription::new("bsky.network").await.unwrap(); - // let subscription = repo; - - let mut binding = EventStream::new(subscription); - let event_stream = binding.stream().await?; - - // let commit_stream = subscription.stream_commits().await; - pin_mut!(event_stream); - - while let Some(record) = event_stream.next().await { - // let c = Commit::from(&commit.unwrap()); - - // let a = c.extract_records().await; - - // if !a.is_empty() { - // println!("{:?}", a); - // } - - // println!("{:?}", record); - - if let skystreamer::types::commit::Record::Other(val) = record { - println!("{:?}", val); - } - } - - Ok(()) -} diff --git a/skystreamer/src/stream.rs b/skystreamer/src/stream.rs index 0a882c0..e8b2279 100644 --- a/skystreamer/src/stream.rs +++ b/skystreamer/src/stream.rs @@ -7,7 +7,10 @@ use crate::types::{commit, Post}; use crate::Result; use futures::StreamExt; -#[deprecated(note = "Please use [`EventStream`] instead")] +#[deprecated( + note = "Please use [`skystreamer::stream::EventStream`] instead as it provides a more generic interface.", + since = "0.2.0" +)] pub struct PostStream { // inner: Box + Unpin + Send>, subscription: crate::RepoSubscription, @@ -48,10 +51,27 @@ impl PostStream { } } -/// A stream of every event from the firehose. -/// Replaces the old [`PostStream`] type. -/// -/// This stream will yield every single event from the firehose, +/// A helper for streaming events from the Firehose. +/// +/// This struct wraps a [`crate::RepoSubscription`] and provides a stream of [`commit::Record`]s, +/// which can be used to export a [`futures::Stream`]-compatible stream of [`commit::Record`]s. +/// +/// # Example +/// ```no_run +/// use futures::{pin_mut, StreamExt}; +/// use skystreamer::{stream::EventStream, RepoSubscription}; +/// +/// let subscription = RepoSubscription::new("bsky.network").await.unwrap(); +/// let mut binding = EventStream::new(subscription); +/// let event_stream = binding.stream().await?; +/// +/// pin_mut!(event_stream); +/// // let's stream the data from the firehose! +/// while let Some(record) = event_stream.next().await { +/// // Outputs every known item in the stream +/// println!("{:?}", record); +/// } +/// ``` pub struct EventStream { subscription: crate::RepoSubscription, } @@ -85,3 +105,24 @@ impl EventStream { Ok(stream) } } + + +/// Simple helper function to create an [`EventStream`] from a domain directly. +/// +/// ```no_run +/// use futures::{pin_mut, StreamExt}; +/// use skystreamer::{stream::event_stream}; +/// +/// let mut event_stream = event_stream("bsky.network").await.unwrap(); +/// let stream = event_stream.stream().await.unwrap(); +/// +/// pin_mut!(stream); +/// +/// while let Some(record) = stream.next().await { +/// // do something with your data here +/// } +/// ``` +pub async fn event_stream(domain: &str) -> EventStream { + let subscription = crate::RepoSubscription::new(domain).await.unwrap(); + EventStream::new(subscription) +} diff --git a/skystreamer/src/types/feed.rs b/skystreamer/src/types/feed.rs index 9995460..eb1b052 100644 --- a/skystreamer/src/types/feed.rs +++ b/skystreamer/src/types/feed.rs @@ -9,7 +9,6 @@ use chrono::{DateTime, FixedOffset}; use cid::Cid; use serde::{Deserialize, Serialize}; - /// An event where someone likes a post #[derive(Debug, Clone, Serialize, Deserialize)] pub struct LikeEvent { diff --git a/skystreamer/src/types/operation.rs b/skystreamer/src/types/operation.rs index 1190877..806e7b1 100644 --- a/skystreamer/src/types/operation.rs +++ b/skystreamer/src/types/operation.rs @@ -1,4 +1,8 @@ -use atrium_api::{app::bsky, com::atproto::sync::subscribe_repos::RepoOp, types::{CidLink, Collection}}; +use atrium_api::{ + app::bsky, + com::atproto::sync::subscribe_repos::RepoOp, + types::{CidLink, Collection}, +}; #[derive(Debug, Clone, PartialEq, Eq)] pub enum Operation { diff --git a/skystreamer/src/util.rs b/skystreamer/src/util.rs index 7f0979b..79d96dd 100644 --- a/skystreamer/src/util.rs +++ b/skystreamer/src/util.rs @@ -2,7 +2,6 @@ use atrium_api::types::string::{Cid as ACid, Datetime}; use chrono::{DateTime, FixedOffset}; use cid::Cid; - #[inline] pub fn datetime_to_chrono(dt: &Datetime) -> DateTime { DateTime::parse_from_rfc3339(dt.as_str()).unwrap()