Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pastebin example #62

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions pastebin/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
[package]
name = "pastebin"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
3 changes: 3 additions & 0 deletions pastebin/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
fn main() {
println!("Hello, world!");
}
19 changes: 19 additions & 0 deletions watchdir/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
[package]
name = "iroh-watchdir"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
anyhow = "1.0.83"
bytes = "1.6.0"
crossbeam-channel = "0.5.12"
futures = "0.3.30"
iroh = "0.16.2"
jwalk = "0.8.1"
notify = "6.1.1"
tokio = "1.37.0"

[dev-dependencies]
tempfile = "3.10.1"
65 changes: 65 additions & 0 deletions watchdir/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
use std::str::FromStr;

use anyhow::Result;
use iroh::docs::DocTicket;

mod sync;

use crate::sync::async_watch;

/// Async, futures channel based event watching
#[tokio::main]
async fn main() -> Result<()> {
let path = std::env::args()
.nth(1)
.expect("Argument 1 needs to be a path");

let mut doc_ticket = None;
if let Some(ticket) = std::env::args().nth(2) {
let ticket = iroh::docs::DocTicket::from_str(ticket.as_str())?;
doc_ticket = Some(ticket);
}
println!("watching {}", path);

let path = std::path::PathBuf::from(&path);
let iroh_path = path.join(".iroh");
let node = iroh::node::Node::persistent(iroh_path)
.await?
.spawn()
.await?;

let doc = open_or_create_document(&node, doc_ticket).await?;
let author = node.authors.create().await?;

tokio::spawn(async_watch(path, doc, author)).await??;

Ok(())
}

async fn open_or_create_document(
node: &iroh::client::MemIroh,
ticket: Option<DocTicket>,
) -> anyhow::Result<iroh::client::MemDoc> {
let doc = match ticket {
Some(ticket) => {
println!("importing ticket: {:?}", ticket);
node.docs.import(ticket).await?
}
None => {
let doc = node.docs.create().await?;
let ticket = doc
.share(
iroh::client::docs::ShareMode::Write,
iroh::base::node_addr::AddrInfoOptions::Relay,
)
.await?;
println!(
"created new doc {:?} write ticket:\n{:?}",
doc.id(),
ticket.to_string()
);
doc
}
};
Ok(doc)
}
295 changes: 295 additions & 0 deletions watchdir/src/sync.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,295 @@
use std::{
path::{Component, Path, PathBuf},
sync::mpsc::channel,
};

use anyhow::Result;
use futures::StreamExt;
use iroh::docs::AuthorId;
use iroh::{client::MemDoc as Doc, docs::store::Query};
use jwalk::WalkDir;
use notify::{Config, Event, FsEventWatcher, RecommendedWatcher, RecursiveMode, Watcher};

// reconcile performs a full, one-time sync of the filesystem with the doc
async fn reconcile(doc: Doc, base_path: &str, author: AuthorId) -> Result<()> {
// walk the filesystem
for entry in WalkDir::new(base_path).sort(true) {
let path = entry?.path();
if path.is_file() {
let relative = path.strip_prefix(base_path)?;
let key = canonicalized_path_to_bytes(relative, true)?;

let query = Query::single_latest_per_key()
.key_exact(key.clone())
.build();
match doc.get_one(query).await? {
Some(_) => {
// doc.update_file(path).await?;
// TODO - compare hashes and update if necessary
}
None => {
// doc.create_file(path).await?;
let abs_path = path.canonicalize()?;
doc.import_file(author, key, &abs_path, true)
.await?
.finish()
.await?;
}
}
}
}

// walk the document
let query = Query::single_latest_per_key().build();
let mut res = doc.get_many(query).await?;
while let Some(entry) = res.next().await {
let entry = entry?;
let key = String::from_utf8_lossy(entry.key()).to_string();
let path = PathBuf::from(base_path).join(key);
// TODO - add an empty entry check
if !path.exists() {
// doc.delete(key).await?;
doc.export_file(entry, path, iroh::blobs::store::ExportMode::TryReference)
.await?;
}
}

Ok(())
}

// update applies incremental changes to the doc from the filesystem
pub(crate) async fn update(_doc: Doc, _event: Event) -> Result<()> {
todo!("update");
// let mut doc = Doc::from(doc);
// let mut path = path;
// let mut events = watchdir::watch(path.clone()).await?;
// while let Some(event) = events.next().await {
// match event {
// watchdir::Event::Create(path) => {
// let path = path.strip_prefix(&path).unwrap();
// let path = path.to_str().unwrap();
// doc.create(path).await?;
// }
// watchdir::Event::Delete(path) => {
// let path = path.strip_prefix(&path).unwrap();
// let path = path.to_str().unwrap();
// doc.delete(path).await?;
// }
// watchdir::Event::Modify(path) => {
// let path = path.strip_prefix(&path).unwrap();
// let path = path.to_str().unwrap();
// doc.modify(path).await?;
// }
// watchdir::Event::Rename(old, new) => {
// let old = old.strip_prefix(&path).unwrap();
// let old = old.to_str().unwrap();
// let new = new.strip_prefix(&path).unwrap();
// let new = new.to_str().unwrap();
// doc.rename(old, new).await?;
// }
// }
// }
// Ok(())
}

async fn async_watcher() -> Result<(
FsEventWatcher,
std::sync::mpsc::Receiver<notify::Result<Event>>,
)> {
// let (tx, rx) = tokio::sync::mpsc::channel(1000);
let (tx, rx) = channel();
let tx_2 = tx.clone();
// // use the pollwatcher and set a callback for the scanning events
// let watcher = PollWatcher::with_initial_scan(
// move |watch_event| {
// let tx_2 = tx_2.clone();
// // tx_2.blocking_send(Message::Event(watch_event)).unwrap();
// tokio::task::spawn(async move {
// println!("watch_event: {:?}", &watch_event);
// tx_2.send(Message::Event(watch_event)).unwrap();
// });
// },
// Config::default(),
// move |scan_event| {
// // tx_3.blocking_send(Message::Scan(scan_event));
// let tx_3 = tx_3.clone();
// println!("scan_event: {:?}", &scan_event);
// tokio::task::spawn(async move {
// if let Err(err) = tx_3.send(Message::Scan(scan_event)) {
// println!("send error: {:?}", err);
// }
// });
// },
// )?;

// let (tx, rx) = std::sync::mpsc::channel();
// let (mut tx, rx) = tokio::sync::mpsc::channel(1);

// let tx_c = tx.clone();
// // use the pollwatcher and set a callback for the scanning events
// let mut watcher = PollWatcher::with_initial_scan(
// move |watch_event| {
// (|| async {
// tx_c.send(Message::Event(watch_event)).await.unwrap();
// });
// // tokio::task::spawn_blocking(move || async {
// // tx_c.send(Message::Event(watch_event)).await.unwrap();
// // });
// },
// Config::default(),
// move |scan_event| {
// tokio::task::block_in_place(|| async {
// tx.send(Message::Scan(scan_event)).await.unwrap();
// });
// },
// )?;

// // Automatically select the best implementation for your platform.
// // You can also access each implementation directly e.g. INotifyWatcher.
let watcher = RecommendedWatcher::new(
move |res| {
let tx = tx_2.clone();
tokio::task::spawn(async move {
tx.send(res).unwrap();
});
},
Config::default(),
)?;

Ok((watcher, rx))
}

pub async fn async_watch<P: AsRef<Path>>(
path: P,
doc: iroh::client::MemDoc,
author: iroh::docs::AuthorId,
) -> anyhow::Result<()> {
let (mut watcher, rx) = async_watcher().await?;

// start with a reconciliation
reconcile(doc.clone(), path.as_ref().to_str().unwrap(), author).await?;

// Add a path to be watched. All files and directories at that path and
// below will be monitored for changes.
watcher.watch(path.as_ref(), RecursiveMode::Recursive)?;

loop {
let res = rx.recv()?;
match res {
Ok(event) => {
println!("event: {:?}", event);
// update(doc.clone(), event).await?;
println!("event: {:?}", event);
for path in event.paths {
let path = path.canonicalize()?;

// skip .iroh directory
if path
.components()
.any(|c| c == Component::Normal(".iroh".as_ref()))
{
continue;
}

if path.is_file() {
// let key = canonicalized_path_to_bytes(&path, true)?;
let key = bytes::Bytes::from(path.display().to_string());
doc.import_file(author, key, &path, true)
.await?
.finish()
.await?;
}
}
}
Err(e) => {
println!("watch error: {:?}", e);
break;
}
}
}

Ok(())
}

/// This function converts an already canonicalized path to a string.
///
/// If `must_be_relative` is true, the function will fail if any component of the path is
/// `Component::RootDir`
///
/// This function will also fail if the path is non canonical, i.e. contains
/// `..` or `.`, or if the path components contain any windows or unix path
/// separators.
fn canonicalized_path_to_bytes(
path: impl AsRef<Path>,
must_be_relative: bool,
) -> anyhow::Result<bytes::Bytes> {
let mut path_str = String::new();
let parts = path
.as_ref()
.components()
.filter_map(|c| match c {
Component::Normal(x) => {
let c = match x.to_str() {
Some(c) => c,
None => return Some(Err(anyhow::anyhow!("invalid character in path"))),
};

if !c.contains('/') && !c.contains('\\') {
Some(Ok(c))
} else {
Some(Err(anyhow::anyhow!("invalid path component {:?}", c)))
}
}
Component::RootDir => {
if must_be_relative {
Some(Err(anyhow::anyhow!("invalid path component {:?}", c)))
} else {
path_str.push('/');
None
}
}
_ => Some(Err(anyhow::anyhow!("invalid path component {:?}", c))),
})
.collect::<anyhow::Result<Vec<_>>>()?;
let parts = parts.join("/");
path_str.push_str(&parts);
let path_bytes = bytes::Bytes::from(path_str);
Ok(path_bytes)
}

#[cfg(test)]
mod tests {
use super::*;

#[tokio::test]
async fn test_reconcile() -> Result<()> {
// TODO: Add test cases for the reconcile function
let dir = tempfile::tempdir()?;
let node = iroh::node::Node::memory().spawn().await?;
let doc = node.docs.create().await?;
let author = node.authors.create().await?;

doc.set_bytes(author, "test/from/doc.txt", bytes::Bytes::from("hello"))
.await?;

tokio::fs::write(dir.path().join("local_path"), b"hello from fs").await?;
reconcile(doc.clone(), dir.path().to_str().unwrap(), author).await?;

assert_doc_keys(doc.clone(), vec!["local_path", "test/from/doc.txt"]).await?;

Ok(())
}

async fn assert_doc_keys(doc: Doc, keys: Vec<&str>) -> Result<()> {
let query = Query::single_latest_per_key().build();
let mut res = doc.get_many(query).await?;
let mut doc_keys = vec![];
while let Some(entry) = res.next().await {
let entry = entry?;
let key = String::from_utf8_lossy(entry.key()).to_string();
doc_keys.push(key);
}
assert_eq!(doc_keys, keys);
Ok(())
}
}
Loading