Skip to content

Commit

Permalink
Upgrade to tonic 4.0, tokio 1.5
Browse files Browse the repository at this point in the history
  • Loading branch information
de-sh committed May 7, 2021
1 parent 9996b05 commit 186540b
Show file tree
Hide file tree
Showing 5 changed files with 21 additions and 17 deletions.
12 changes: 6 additions & 6 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@ homepage = "https://vyuham.github.io/dstore"
edition = "2018"

[dependencies]
tokio = { version = "0.2", features = ["macros", "sync", "stream", "time"] }
futures-core = "0.3"
futures-util = "0.3"
tokio = { version = "1.5", features = ["full"] }
tokio-stream = { version = "0.1", features = ["net"] }
futures = { version = "0.3", default-features = false, features = ["alloc"] }
bytes = "1"
tonic = "0.3"
prost = "0.6"
tonic = "0.4"
prost = "0.7"

[build-dependencies]
tonic-build = "0.3.0"
tonic-build = "0.4"
2 changes: 1 addition & 1 deletion examples/global.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::error::Error;
/// Start Global server on defined IP:PORT address
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
let addr = "127.0.0.1:50051".parse().unwrap();
let addr = "[::1]:50051".parse().unwrap();
println!("Dstore server listening on {}", addr);
Global::start_server(addr).await
}
12 changes: 7 additions & 5 deletions examples/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,21 @@ impl REPL {
}

/// Runs the Command Line Interface REPL
async fn run(&mut self) {
async fn run(&mut self) -> Result<(), Box<dyn Error>> {
print!("dstore v0.1.0 (uid: {})\nThis is an experimental database, do contribute to further developments at https://github.com/vyuham/dstore. \nUse `.exit` to exit the repl\ndb > ", self.local.lock().await.addr);
io::stdout().flush().expect("Error");
for cmd in stdin().lock().lines() {
match cmd {
Ok(cmd) => {
self.parse_input(cmd.trim().to_string()).await;
self.parse_input(cmd.trim().to_string()).await?;
}
Err(_) => eprint!("Error in reading command, exiting REPL."),
}
print!("db > ");
io::stdout().flush().expect("Error");
}

Ok(())
}

/// Convert REPL input into actionable commands
Expand Down Expand Up @@ -97,12 +99,12 @@ impl REPL {
async fn main() -> Result<(), Box<dyn Error>> {
// Create a Local with a certain UID, connected to Global on defined address.
// Store reference counted pointer for future use
let global_addr = "127.0.0.1:50051".to_string();
let local_addr = "127.0.0.1:50052".to_string(); // UID for Local
let global_addr = "[::1]:50051".to_string();
let local_addr = "[::1]:50052".to_string(); // UID for Local
let local_store = Local::new(global_addr, local_addr).await?;

// Create REPL interface with reference counted pointer to Local
REPL::new(local_store).await.run().await;
REPL::new(local_store).await.run().await?;

Ok(())
}
10 changes: 6 additions & 4 deletions src/global.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
use bytes::Bytes;
use futures_util::StreamExt;
use futures::StreamExt;
use std::{
collections::{HashMap, VecDeque},
net::SocketAddr,
str,
sync::{Arc, Mutex},
};
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tonic::{transport::Server, Request, Response, Status};

use crate::{
Expand Down Expand Up @@ -124,15 +125,15 @@ impl Dstore for Global {
}

/// Type to allow streaming og VALUE via RPC
type PullFileStream = mpsc::Receiver<Result<Byte, Status>>;
type PullFileStream = ReceiverStream<Result<Byte, Status>>;

/// RPC that streams VALUE associated with KEY, if it exist on Global
async fn pull_file(
&self,
args: Request<Byte>,
) -> Result<Response<Self::PullFileStream>, Status> {
// Create a double ended channel for transporting VALUE packets processed within thread
let (mut tx, rx) = mpsc::channel(4);
let (tx, rx) = mpsc::channel(4);
let db = self.db.clone();

// Spawn thread to manage partitioning of a large VALUE into packet frames
Expand All @@ -153,7 +154,7 @@ impl Dstore for Global {
}
});

Ok(Response::new(rx))
Ok(Response::new(ReceiverStream::new(rx)))
}

/// RPC to remove KEY mappings on Global and add KEY to invalidate queues of Locals in cluster
Expand Down Expand Up @@ -196,3 +197,4 @@ impl Dstore for Global {
}
}
}

2 changes: 1 addition & 1 deletion src/local.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use bytes::Bytes;
use futures_util::{stream, StreamExt};
use futures::{stream, StreamExt};
use std::{collections::HashMap, error::Error, sync::Arc};
use tokio::{
sync::Mutex,
Expand Down

0 comments on commit 186540b

Please sign in to comment.