Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add rust async functions #483

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions c/opendht.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,24 @@ void dht_runner_bootstrap(dht_runner* r, const char* host, const char* service)
runner->bootstrap(host);
}

void dht_runner_bootstrap2(dht_runner* r, struct sockaddr *addrs[], socklen_t addrs_len[], dht_done_cb done_cb, void* cb_user_data) {
auto runner = reinterpret_cast<dht::DhtRunner*>(r);

std::vector<dht::SockAddr> sa;

size_t i = 0;
while(addrs != nullptr && addrs[i] != nullptr) {
sa.push_back(dht::SockAddr(addrs[i], addrs_len[i]));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

emplace_back

i++;
}

auto cb = [done_cb, cb_user_data](bool success) {
done_cb(success, cb_user_data);
};

runner->bootstrap(sa, cb);
}

void dht_runner_get(dht_runner* r, const dht_infohash* h, dht_get_cb cb, dht_done_cb done_cb, void* cb_user_data) {
auto runner = reinterpret_cast<dht::DhtRunner*>(r);
auto hash = reinterpret_cast<const dht::InfoHash*>(h);
Expand Down
2 changes: 2 additions & 0 deletions c/opendht_c.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ OPENDHT_C_PUBLIC const char* dht_value_get_user_type(const dht_value* data);
typedef bool (*dht_get_cb)(const dht_value* value, void* user_data);
typedef bool (*dht_value_cb)(const dht_value* value, bool expired, void* user_data);
typedef void (*dht_done_cb)(bool ok, void* user_data);
typedef void (*dht_bootstrap_cb)(bool ok);
typedef void (*dht_shutdown_cb)(void* user_data);

struct OPENDHT_C_PUBLIC dht_op_token;
Expand Down Expand Up @@ -139,6 +140,7 @@ OPENDHT_C_PUBLIC void dht_runner_run(dht_runner* runner, in_port_t port);
OPENDHT_C_PUBLIC void dht_runner_run_config(dht_runner* runner, in_port_t port, const dht_runner_config* config);
OPENDHT_C_PUBLIC void dht_runner_ping(dht_runner* runner, struct sockaddr* addr, socklen_t addr_len);
OPENDHT_C_PUBLIC void dht_runner_bootstrap(dht_runner* runner, const char* host, const char* service);
OPENDHT_C_PUBLIC void dht_runner_bootstrap2(dht_runner* r, struct sockaddr *addrs[], socklen_t addrs_len[], dht_done_cb done_cb, void* cb_user_data);
OPENDHT_C_PUBLIC void dht_runner_get(dht_runner* runner, const dht_infohash* hash, dht_get_cb cb, dht_done_cb done_cb, void* cb_user_data);
OPENDHT_C_PUBLIC dht_op_token* dht_runner_listen(dht_runner* runner, const dht_infohash* hash, dht_value_cb cb, dht_shutdown_cb done_cb, void* cb_user_data);
OPENDHT_C_PUBLIC void dht_runner_cancel_listen(dht_runner* runner, const dht_infohash* hash, dht_op_token* token);
Expand Down
3 changes: 2 additions & 1 deletion rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,5 @@ edition = "2018"

[dependencies]
libc="0.2.0"
os_socketaddr="0.1.0"
os_socketaddr="0.1.0"
futures="0.3.4"
69 changes: 69 additions & 0 deletions rust/examples/dhtnode_async.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Copyright (C) 2014-2020 Savoir-faire Linux Inc.
* Author: Sébastien Blin <[email protected]>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/

extern crate opendht;
use std::{thread, time};

use opendht::{ InfoHash, DhtRunner, DhtRunnerConfig, Value };
// use opendht::crypto::*;
use futures::prelude::*;

fn main() {
println!("{}", InfoHash::random());
println!("{}", InfoHash::new());
println!("{}", InfoHash::new().is_zero());
println!("{}", InfoHash::get("alice"));
println!("{}", InfoHash::get("alice").is_zero());


let mut dht = DhtRunner::new();
let /*mut*/ config = DhtRunnerConfig::new();
//// If you want to inject a certificate, uncomment the following lines and previous mut.
//// Note: you can generate a certificate with
//// openssl req -x509 -newkey rsa:4096 -sha256 -days 3650 -nodes -keyout example.key -out example.crt -subj /CN=example.com
//let cert = DhtCertificate::import("example.crt").ok().expect("Invalid cert file");
//let pk = PrivateKey::import("example.key", "");
//config.set_identity(cert, pk);
dht.run_config(1412, config);
use std::net::ToSocketAddrs;
let addrs = "bootstrap.jami.net:4222".to_socket_addrs().unwrap();

futures::executor::block_on(async {
let r = dht.bootstrap_async(addrs).await;

println!("Current node id: {}", dht.node_id());

let mut stream = dht.get_async(&InfoHash::get("bob"));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Imho, we can do like reqwest crate. Async by default and a sync Dht builder if someone needs to do some sync operations. So we can have DhtRunner::new(); and DhtRunner::blocking::new(), then .get() for both versions (https://docs.rs/reqwest/0.10.1/reqwest/blocking/index.html vs https://docs.rs/reqwest/0.10.1/reqwest/)

If you want I can take some time to modify this :). I think adding _async everywhere is not the best way


while let Ok(Some(value)) = stream.try_next().await {
println!("GOT: VALUE - value: {}", value);
}

dht.put_async(&InfoHash::get("bob"), Value::new("hi!"), false).await;

println!("Start listening /foo (sleep 10s)");
let mut stream = dht.listen_async(&InfoHash::get("foo"));
let one_min = time::Duration::from_secs(10);
thread::sleep(one_min);
while let Some((v, expired)) = stream.next().await {
println!("LISTEN: DONE CB - v: {} - expired: {}", v, expired);
}
});

println!("Public ips: {:#?}", dht.public_addresses());
}
152 changes: 150 additions & 2 deletions rust/src/dhtrunner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ use std::ptr;
pub use crate::ffi::*;
use std::net::SocketAddr;
use os_socketaddr::OsSocketAddr;
use futures::prelude::*;
use futures::channel::mpsc;

impl DhtRunnerConfig {

Expand Down Expand Up @@ -114,13 +116,49 @@ extern fn get_handler_cb(v: *mut Value, ptr: *mut c_void) -> bool {
}
}

extern fn get_async_handler_cb(v: *mut Value, ptr: *mut c_void) -> bool {
if ptr.is_null() {
return true;
}
let f = unsafe {
let tx = ptr as *mut mpsc::UnboundedSender<Option<std::io::Result<Box<Value>>>>;
(*tx).send(Some(Ok((*v).boxed())))
};
futures::executor::block_on(f).is_ok()
}

extern fn done_handler_cb(ok: bool, ptr: *mut c_void) {
unsafe {
let handler = Box::from_raw(ptr as *mut GetHandler);
(*handler.done_cb)(ok)
}
}

extern fn done_async_handler_cb(ok: bool, ptr: *mut c_void) {
if ptr.is_null() {
return;
}

let mut tx = unsafe {
let ptr = ptr as *mut mpsc::UnboundedSender<Option<std::io::Result<Box<Value>>>>;
Box::from_raw(ptr)
};
let item = if ok { None } else { Some(Err(std::io::Error::new(std::io::ErrorKind::Other, "get failed"))) };
let _ = futures::executor::block_on((*tx).send(item));
}

extern fn bootstrap_done_async_handler_cb(ok: bool, ptr: *mut c_void) {
if ptr.is_null() {
return;
}

let tx = unsafe {
let ptr = ptr as *mut futures::channel::oneshot::Sender<bool>;
Box::from_raw(ptr)
};
let _ = (*tx).send(ok);
}

struct PutHandler<'a>
{
done_cb: &'a mut(dyn FnMut(bool))
Expand Down Expand Up @@ -165,6 +203,14 @@ extern fn listen_handler_done(ptr: *mut c_void) {
}
}

struct VerboseDrop<'a, T>(T, &'a str);

impl<'a, T> VerboseDrop<'a, T> {
fn drop(&mut self) {
println!("{}", self.1);
}
}

impl DhtRunner {
pub fn new() -> Box<DhtRunner> {
unsafe {
Expand All @@ -187,11 +233,30 @@ impl DhtRunner {
pub fn bootstrap(&mut self, host: &str, service: u16) {
unsafe {
dht_runner_bootstrap(&mut *self,
CString::new(host).unwrap().as_ptr(),
CString::new(service.to_string()).unwrap().as_ptr())
CString::new(host).unwrap().as_ptr(),
CString::new(service.to_string()).unwrap().as_ptr())
}
}

pub async fn bootstrap_async<A: Iterator<Item=SocketAddr>>(&mut self, addrs: A) -> std::io::Result<bool> {
let socks: Vec<OsSocketAddr> = addrs.map(|a| a.into()).collect();
let sizes: Vec<libc::socklen_t> = socks.iter().map(|s| s.len()).collect();

let (tx, rx) = futures::channel::oneshot::channel();

let tx = Box::new(tx);
let tx = Box::into_raw(tx) as *mut c_void;

unsafe {
dht_runner_bootstrap2(&mut *self, socks.as_ptr() as *const *const _,
sizes.as_ptr() as *const *const _, bootstrap_done_async_handler_cb as *mut c_void, tx);
}

let success = rx.await.expect("bootstrap_async() sender was dropped unexpectedly");

Ok(success)
}

pub fn node_id(&self) -> InfoHash {
unsafe {
dht_runner_get_node_id(&*self)
Expand All @@ -217,6 +282,19 @@ impl DhtRunner {
}
}

pub fn get_async(&mut self, h: &InfoHash)
-> impl TryStream<Ok=Box<Value>, Error=std::io::Error> + Unpin {
let (tx, rx) = mpsc::unbounded();
let tx = Box::new(tx);
let tx = Box::into_raw(tx) as *mut c_void;

unsafe {
dht_runner_get(&mut *self, h, get_async_handler_cb, done_async_handler_cb, tx)
}
rx.take_while(|item: &Option<_>| futures::future::ready(item.is_some()))
.filter_map(|item| futures::future::ready(item))
}

pub fn put<'a>(&mut self, h: &InfoHash, v: Box<Value>,
done_cb: &'a mut(dyn FnMut(bool)), permanent: bool) {
let handler = Box::new(PutHandler {
Expand All @@ -228,6 +306,21 @@ impl DhtRunner {
}
}

pub async fn put_async(&mut self, h: &InfoHash, v: Box<Value>, permanent: bool) -> bool {
let (tx, rx) = futures::channel::oneshot::channel();
let mut tx = Some(tx);

let mut done_cb = move |success| {
if let Some(tx) = tx.take() {
tx.send(success).expect("put_async() receiver was dropped unexpectedly");
}
};

self.put(h, v, &mut done_cb, permanent);

rx.await.expect("put_async() sender was dropped unexpectedly")
}

pub fn put_signed<'a>(&mut self, h: &InfoHash, v: Box<Value>,
done_cb: &'a mut(dyn FnMut(bool)), permanent: bool) {
let handler = Box::new(PutHandler {
Expand All @@ -239,6 +332,21 @@ impl DhtRunner {
}
}

pub async fn put_signed_async(&mut self, h: &InfoHash, v: Box<Value>, permanent: bool) -> bool {
let (tx, rx) = futures::channel::oneshot::channel();
let mut tx = Some(tx);

let mut done_cb = move |success| {
if let Some(tx) = tx.take() {
tx.send(success).expect("put_signed_async() receiver was dropped unexpectedly");
}
};

self.put_signed(h, v, &mut done_cb, permanent);

rx.await.expect("put_signed_async() sender was dropped unexpectedly")
}

pub fn put_encrypted<'a>(&mut self, h: &InfoHash, to: &InfoHash, v: Box<Value>,
done_cb: &'a mut(dyn FnMut(bool)), permanent: bool) {
let handler = Box::new(PutHandler {
Expand All @@ -250,6 +358,22 @@ impl DhtRunner {
}
}

pub async fn put_encrypted_async(&mut self, h: &InfoHash, to: &InfoHash, v: Box<Value>,
permanent: bool) -> bool {
let (tx, rx) = futures::channel::oneshot::channel();
let mut tx = Some(tx);

let mut done_cb = move |success| {
if let Some(tx) = tx.take() {
tx.send(success).expect("put_encrypted_async() receiver was dropped unexpectedly");
}
};

self.put_encrypted(h, to, v, &mut done_cb, permanent);

rx.await.expect("put_encrypted_async() sender was dropped unexpectedly")
}

pub fn cancel_put<'a>(&mut self, h: &InfoHash, vid: u64) {
unsafe {
dht_runner_cancel_put(&mut *self, h, vid)
Expand All @@ -267,6 +391,20 @@ impl DhtRunner {
}
}

pub fn listen_async(&mut self, h: &InfoHash)
-> impl Stream<Item=(Box<Value>, bool)> + Unpin
{
let (mut tx, rx) = mpsc::unbounded();

let mut value_cb = move |v, expired| {
futures::executor::block_on(tx.send((v, expired))).is_ok()
};

let _token = self.listen(h, &mut value_cb);

return Box::pin(rx);
}

pub fn cancel_listen(&mut self, h: &InfoHash, token: Box<OpToken>) {
unsafe {
dht_runner_cancel_listen(&mut *self, h, &*token)
Expand All @@ -282,6 +420,16 @@ impl DhtRunner {
}
}

pub async fn shutdown_async<'a>(&'a mut self) -> bool {
let (tx, rx) = futures::channel::oneshot::channel();
let tx = Box::new(tx);
let ptr = Box::into_raw(tx) as *mut c_void;

self.shutdown(done_async_handler_cb, ptr);

rx.await.expect("shutdown_async() sender was dropped unexpectedly")
}

pub fn public_addresses(&self) -> Vec<SocketAddr> {
let mut result = Vec::new();
unsafe {
Expand Down
3 changes: 3 additions & 0 deletions rust/src/ffi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,9 @@ extern {
pub fn dht_runner_run(dht: *mut DhtRunner, port: in_port_t);
pub fn dht_runner_run_config(dht: *mut DhtRunner, port: in_port_t, config: *const DhtRunnerConfig);
pub fn dht_runner_bootstrap(dht: *mut DhtRunner, host: *const c_char, service: *const c_char);
pub fn dht_runner_bootstrap2(dht: *mut DhtRunner, addrs: *const *const OsSocketAddr,
addr_lens: *const *const libc::socklen_t,
done_cb: *mut c_void, cb_user_data: *mut c_void);
pub fn dht_runner_get(dht: *mut DhtRunner, h: *const InfoHash,
get_cb: extern fn(*mut Value, *mut c_void) -> bool,
done_cb: extern fn(bool, *mut c_void),
Expand Down