From f97f12bbdb8009c00ec756f6da56b1e4cf02b96d Mon Sep 17 00:00:00 2001 From: Lunar Date: Mon, 1 Feb 2021 14:49:11 +0100 Subject: [PATCH] WIP: add an example of a bidirectional gRPC call via tonic --- Cargo.toml | 1 + tonic-bidirectional/Cargo.toml | 17 +++++ tonic-bidirectional/build.rs | 8 ++ tonic-bidirectional/proto/echo.proto | 15 ++++ tonic-bidirectional/src/echo_rpc.rs | 42 +++++++++++ tonic-bidirectional/src/echo_service.rs | 96 ++++++++++++++++++++++++ tonic-bidirectional/src/grpc_api.rs | 13 ++++ tonic-bidirectional/src/main.rs | 97 +++++++++++++++++++++++++ 8 files changed, 289 insertions(+) create mode 100644 tonic-bidirectional/Cargo.toml create mode 100644 tonic-bidirectional/build.rs create mode 100644 tonic-bidirectional/proto/echo.proto create mode 100644 tonic-bidirectional/src/echo_rpc.rs create mode 100644 tonic-bidirectional/src/echo_service.rs create mode 100644 tonic-bidirectional/src/grpc_api.rs create mode 100644 tonic-bidirectional/src/main.rs diff --git a/Cargo.toml b/Cargo.toml index 9886e48d..b25c9969 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -46,6 +46,7 @@ members = [ "template_tinytemplate", "template_yarte", "todo", + "tonic-bidirectional", "udp-echo", "unix-socket", "web-cors/backend", diff --git a/tonic-bidirectional/Cargo.toml b/tonic-bidirectional/Cargo.toml new file mode 100644 index 00000000..cb71ed3d --- /dev/null +++ b/tonic-bidirectional/Cargo.toml @@ -0,0 +1,17 @@ +[package] +name = "tonic-bidirectional" +version = "0.1.0" +authors = ["Lunar"] +edition = "2018" + +[dependencies] +tonic = "0.4" +actix = "0.10" +actix-rt = "1.1" +env_logger = "0.8" +log = "0.4" +futures = "0.3" +prost = "0.7" + +[build-dependencies] +tonic-build = { version = "0.4" } \ No newline at end of file diff --git a/tonic-bidirectional/build.rs b/tonic-bidirectional/build.rs new file mode 100644 index 00000000..80ca8945 --- /dev/null +++ b/tonic-bidirectional/build.rs @@ -0,0 +1,8 @@ +fn main() -> Result<(), Box> { + tonic_build::configure() + .compile( + &["proto/echo.proto"], + &["proto"], + )?; + Ok(()) + } \ No newline at end of file diff --git a/tonic-bidirectional/proto/echo.proto b/tonic-bidirectional/proto/echo.proto new file mode 100644 index 00000000..3efab5f9 --- /dev/null +++ b/tonic-bidirectional/proto/echo.proto @@ -0,0 +1,15 @@ +syntax = "proto3"; + +package echo; + +service Echo { + rpc Echo(stream EchoRequest) returns (stream EchoReply) {} +} + +message EchoRequest { + string payload = 1; +} + +message EchoReply { + string payload = 1; +} \ No newline at end of file diff --git a/tonic-bidirectional/src/echo_rpc.rs b/tonic-bidirectional/src/echo_rpc.rs new file mode 100644 index 00000000..ce9177ea --- /dev/null +++ b/tonic-bidirectional/src/echo_rpc.rs @@ -0,0 +1,42 @@ +use actix::prelude::*; +use futures::channel::mpsc::Sender; +use log::info; + +use crate::EchoReceived; +use crate::grpc_api::{ + EchoRequest, + EchoReply, +}; + +pub struct EchoRpc { + addr: actix::prelude::Recipient, + tx: Sender, +} + +impl Actor for EchoRpc { + type Context = Context; +} + +impl Handler for EchoRpc { + type Result = (); + + fn handle(&mut self, msg: EchoRequest, ctx: &mut Context) { + if let Err(_) = self.tx.try_send(msg) { + info!("Sending echo request failed. Stopping."); + ctx.stop(); + } + } +} + +impl StreamHandler> for EchoRpc { + fn handle(&mut self, msg: Result, _: &mut Context) { + match msg { + Ok(msg) => { + self.addr.send(EchoReceived { payload: msg.payload }); + } + Err(status) => { + info!("Stream error: {}", status.message()); + } + } + } +} diff --git a/tonic-bidirectional/src/echo_service.rs b/tonic-bidirectional/src/echo_service.rs new file mode 100644 index 00000000..e3008fbb --- /dev/null +++ b/tonic-bidirectional/src/echo_service.rs @@ -0,0 +1,96 @@ +use actix::prelude::*; +use log::{info, error}; + +use crate::EchoReceived; +use crate::echo_rpc::EchoRpc; +use crate::grpc_api::echo_client::EchoClient; + +#[derive(Message)] +#[rtype(result = "Result, RunningEchoFailed>")] +pub struct RunEcho { + pub addr: actix::prelude::Recipient, +} + +#[derive(Debug)] +pub struct NotConnectedError; + +#[derive(Debug)] +pub struct RunningEchoFailed; + +pub struct EchoService { + endpoint: String, + client: Option>, +} + +impl EchoService { + pub fn new(endpoint: String) -> EchoService { + EchoService { + endpoint, + client: None, + } + } +} + +// EchoService should be responsible for connecting on startup and it'll have a RunEcho message + +impl Actor for EchoService { + type Context = Context; + + fn started(&mut self, ctx: &mut Context) { + EchoClient::connect(self.endpoint.clone()) + .into_actor(self) + .then(|res, act, ctx| { + match res { + Ok(echo_client) => { + act.client = Some(echo_client); + } + Err(err) => { + error!("Unable to connect to echo server {:?}", err); + ctx.stop(); + } + } + fut::ready(()) + }) + .wait(ctx) + } +} + +impl Handler for EchoService { + type Result = ResponseActFuture, RunningEchoFailed>>; + + fn handle(&mut self, msg: RunEcho, _ctx: &mut Context) -> Self::Result { + if let Some(mut client) = &mut self.client { + const OUTBOUND_CHANNEL_BUFFER: usize = 10; + let (tx, rx) = futures::channel::mpsc::channel(OUTBOUND_CHANNEL_BUFFER); + + info!("Sending echo RPC!"); + Box::pin( + client.echo(tonic::Request::new(rx)) + .into_actor(self) + .map(|res, _act, _ctx| { + match res { + Ok(inbound) => { + Ok(EchoRpc::create(|ctx| { + ctx.add_stream(inbound.into_inner()); + EchoRpc { + addr: msg.addr, + tx + } + })) + } + Err(_) => { + // XXX: This is not really useful + Err(RunningEchoFailed) + } + } + }) + ) + } else { + // XXX: do something smart about retrying. maybe ctx.stop()? + error!("Not connected to the echo server"); + Box::pin(fut::err(RunningEchoFailed)) + } + } +} + +impl Supervised for EchoService {} diff --git a/tonic-bidirectional/src/grpc_api.rs b/tonic-bidirectional/src/grpc_api.rs new file mode 100644 index 00000000..a84a7dd9 --- /dev/null +++ b/tonic-bidirectional/src/grpc_api.rs @@ -0,0 +1,13 @@ +use actix::prelude::{ + Message as ActixMessage, +}; + +tonic::include_proto!("echo"); + +impl ActixMessage for EchoRequest { + type Result = (); +} + +impl ActixMessage for EchoReply { + type Result = (); +} \ No newline at end of file diff --git a/tonic-bidirectional/src/main.rs b/tonic-bidirectional/src/main.rs new file mode 100644 index 00000000..9da5f164 --- /dev/null +++ b/tonic-bidirectional/src/main.rs @@ -0,0 +1,97 @@ +mod echo_service; +mod echo_rpc; +mod grpc_api; + +use actix::prelude::*; +use log::{error, info}; + +use crate::echo_service::{EchoService, RunEcho}; +use crate::echo_rpc::EchoRpc; + +#[derive(Message)] +#[rtype(result = "()")] +pub struct SendEcho { + pub payload: String, +} + +#[derive(Message)] +#[rtype(result = "()")] +pub struct EchoReceived { + pub payload: String, +} + +struct EchoSender { + service: Addr, + echo_rpc: Option>, +} + +impl EchoSender { + fn new(service: Addr) -> EchoSender { + EchoSender { + service, + echo_rpc: None, + } + } +} + +impl Actor for EchoSender { + type Context = Context; + + fn started(&mut self, ctx: &mut Context) { + self.service.send(RunEcho { addr: ctx.address().recipient() }) + .into_actor(self) + .map(|res, act, ctx| { + match res { + Ok(Ok(echo_rpc)) => { + act.echo_rpc = Some(echo_rpc); + } + _ => { + error!("Unable to start echo RPC"); + ctx.stop(); + } + } + }) + .wait(ctx) + } +} + +impl Handler for EchoSender { + type Result = (); + + fn handle(&mut self, msg: SendEcho, ctx: &mut Context) { + info!("Sending echo: {}", msg.payload); + match &self.echo_rpc { + Some(echo_rpc) => { + echo_rpc.do_send(grpc_api::EchoRequest { payload: msg.payload }); + } + None => { + // Maybe we could do something smart like trying to (re)connect here. + error!("Not connected!"); + ctx.stop(); + } + } + } +} + +impl Handler for EchoSender { + type Result = (); + + fn handle(&mut self, msg: EchoReceived, _: &mut Context) { + info!("EchoSender has just received: {}", msg.payload) + } +} + +const ENDPOINT: &str = "http://127.0.0.1:50051"; + +#[actix_rt::main] +async fn main() { + env_logger::init(); + + let service = EchoService::new(ENDPOINT.to_string()).start(); + let sender = EchoSender::new(service).start(); + sender.do_send(SendEcho { payload: "Alpha".to_string() }); + sender.do_send(SendEcho { payload: "Beta".to_string() }); + sender.do_send(SendEcho { payload: "Gamma".to_string() }); + + actix_rt::Arbiter::local_join().await; +}