Skip to content

Commit

Permalink
Bump version to 0.2.0 and remove deprecated example
Browse files Browse the repository at this point in the history
  • Loading branch information
korewaChino committed Dec 6, 2024
1 parent dfa3d26 commit 3206fa6
Show file tree
Hide file tree
Showing 10 changed files with 67 additions and 108 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion skystreamer-bin/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "skystreamer-bin"
version = "0.1.0"
version = "0.2.0"
edition = "2021"

authors = ["Cappy Ishihara <[email protected]>"]
Expand Down
49 changes: 0 additions & 49 deletions skystreamer-bin/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,55 +17,6 @@ pub struct Consumer {
exporter: Box<dyn exporter::Exporter>,
pub atproto_relay: String,
}
// #[derive(Debug)]
// pub struct TaskQueue {
// workers: Vec<tokio::task::JoinHandle<()>>,
// semaphore: Arc<tokio::sync::Semaphore>,
// }

// impl TaskQueue {
// pub fn new() -> Self {
// TaskQueue {
// workers: Vec::new(),
// semaphore: Arc::new(tokio::sync::Semaphore::new(16)),
// }
// }

// /// Add a new task on the queue from a tokio join handle
// /// Remove the task from the queue when it finishes
// pub fn add_task(&mut self, task: tokio::task::JoinHandle<()>) {
// self.workers.retain(|worker| !worker.is_finished());
// // tracing::info!("Running workers: {}", self.workers.len());
// let semaphore = self.semaphore.clone();
// let worker = tokio::spawn(async move {
// let _permit = semaphore.acquire().await;
// tracing::info!("Available permits: {}", semaphore.available_permits());
// tokio::join!(task).0.unwrap();
// // release permit when task is done
// });
// self.workers.push(worker);
// }

// pub fn handle_interrupt(&mut self) {
// for worker in self.workers.drain(..) {
// self.semaphore.clone().close();
// tracing::info!("Cancelling workers");
// worker.abort();
// }
// }
// }

// impl Default for TaskQueue {
// fn default() -> Self {
// Self::new()
// }
// }

// thread_local! {
// static LOCAL_THREAD_POOL: std::cell::RefCell<TaskQueue> = std::cell::RefCell::new(TaskQueue::new());
// }

// const GLOBAL_THREAD_POOL: OnceCell<ThreadPool> = OnceCell::new();

impl Consumer {
pub fn new(exporter: Box<dyn exporter::Exporter>, relay: &str) -> Self {
Expand Down
4 changes: 2 additions & 2 deletions skystreamer/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
[package]
name = "skystreamer"
version = "0.1.1"
version = "0.2.0"
edition = "2021"

authors = ["Cappy Ishihara <[email protected]>"]
description = "AT Firehose collector for Bluesky"
description = "Idiomatic Rust library for the AT Firehose streaming API"
license = "MIT"
repository = "https://github.com/FyraLabs/skystreamer"
categories = [
Expand Down
23 changes: 11 additions & 12 deletions skystreamer/examples/stream.rs
Original file line number Diff line number Diff line change
@@ -1,25 +1,24 @@
// use color_eyre::Result;
use futures::StreamExt;
use skystreamer::{stream::PostStream, RepoSubscription};
use futures::{pin_mut, StreamExt};
use skystreamer::{stream::EventStream, RepoSubscription};

#[tokio::main]
pub async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Create subscription to bsky.network
let subscription = RepoSubscription::new("bsky.network").await.unwrap();
// let subscription = repo;

// Wrap in PostStream
let post_stream = PostStream::new(subscription);
let mut post_stream = post_stream.await;
let stream = post_stream.stream().await?;
let mut binding = EventStream::new(subscription);
let event_stream = binding.stream().await?;

// Pin the stream before processing
futures::pin_mut!(stream);
// let commit_stream = subscription.stream_commits().await;
pin_mut!(event_stream);

// Process posts as they arrive
// should be Result<Post, Error>
while let Some(post) = stream.next().await {
println!("{:?}", post);
while let Some(record) = event_stream.next().await {
// stream unknown record types
if let skystreamer::types::commit::Record::Other(val) = record {
println!("{:?}", val);
}
}

Ok(())
Expand Down
34 changes: 0 additions & 34 deletions skystreamer/examples/stream_raw.rs

This file was deleted.

51 changes: 46 additions & 5 deletions skystreamer/src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@ use crate::types::{commit, Post};
use crate::Result;
use futures::StreamExt;

#[deprecated(note = "Please use [`EventStream`] instead")]
#[deprecated(
note = "Please use [`skystreamer::stream::EventStream`] instead as it provides a more generic interface.",
since = "0.2.0"
)]
pub struct PostStream {
// inner: Box<dyn futures::Stream<Item = Post> + Unpin + Send>,
subscription: crate::RepoSubscription,
Expand Down Expand Up @@ -48,10 +51,27 @@ impl PostStream {
}
}

/// A stream of every event from the firehose.
/// Replaces the old [`PostStream`] type.
///
/// This stream will yield every single event from the firehose,
/// A helper for streaming events from the Firehose.
///
/// This struct wraps a [`crate::RepoSubscription`] and provides a stream of [`commit::Record`]s,
/// which can be used to export a [`futures::Stream`]-compatible stream of [`commit::Record`]s.
///
/// # Example
/// ```no_run
/// use futures::{pin_mut, StreamExt};
/// use skystreamer::{stream::EventStream, RepoSubscription};
///
/// let subscription = RepoSubscription::new("bsky.network").await.unwrap();
/// let mut binding = EventStream::new(subscription);
/// let event_stream = binding.stream().await?;
///
/// pin_mut!(event_stream);
/// // let's stream the data from the firehose!
/// while let Some(record) = event_stream.next().await {
/// // Outputs every known item in the stream
/// println!("{:?}", record);
/// }
/// ```
pub struct EventStream {
subscription: crate::RepoSubscription,
}
Expand Down Expand Up @@ -85,3 +105,24 @@ impl EventStream {
Ok(stream)
}
}


/// Simple helper function to create an [`EventStream`] from a domain directly.
///
/// ```no_run
/// use futures::{pin_mut, StreamExt};
/// use skystreamer::{stream::event_stream};
///
/// let mut event_stream = event_stream("bsky.network").await.unwrap();
/// let stream = event_stream.stream().await.unwrap();
///
/// pin_mut!(stream);
///
/// while let Some(record) = stream.next().await {
/// // do something with your data here
/// }
/// ```
pub async fn event_stream(domain: &str) -> EventStream {
let subscription = crate::RepoSubscription::new(domain).await.unwrap();
EventStream::new(subscription)
}
1 change: 0 additions & 1 deletion skystreamer/src/types/feed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use chrono::{DateTime, FixedOffset};
use cid::Cid;
use serde::{Deserialize, Serialize};


/// An event where someone likes a post
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LikeEvent {
Expand Down
6 changes: 5 additions & 1 deletion skystreamer/src/types/operation.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
use atrium_api::{app::bsky, com::atproto::sync::subscribe_repos::RepoOp, types::{CidLink, Collection}};
use atrium_api::{
app::bsky,
com::atproto::sync::subscribe_repos::RepoOp,
types::{CidLink, Collection},
};

#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Operation {
Expand Down
1 change: 0 additions & 1 deletion skystreamer/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use atrium_api::types::string::{Cid as ACid, Datetime};
use chrono::{DateTime, FixedOffset};
use cid::Cid;


#[inline]
pub fn datetime_to_chrono(dt: &Datetime) -> DateTime<FixedOffset> {
DateTime::parse_from_rfc3339(dt.as_str()).unwrap()
Expand Down

0 comments on commit 3206fa6

Please sign in to comment.