From fb9d1a198c50da3951c462de1e08dca57d946906 Mon Sep 17 00:00:00 2001 From: paomian Date: Fri, 17 May 2024 12:31:52 +0800 Subject: [PATCH] chore: add compression encoding for request/response --- examples/ingest.rs | 9 +- examples/stream_ingest.rs | 6 +- src/client.rs | 167 ++++++++++++++++++++++++++++++++------ src/database.rs | 5 +- src/lib.rs | 2 +- src/load_balance.rs | 4 +- 6 files changed, 156 insertions(+), 37 deletions(-) diff --git a/examples/ingest.rs b/examples/ingest.rs index 4d2d89a..6803282 100644 --- a/examples/ingest.rs +++ b/examples/ingest.rs @@ -18,7 +18,7 @@ use greptimedb_ingester::api::v1::*; use greptimedb_ingester::helpers::schema::*; use greptimedb_ingester::helpers::values::*; use greptimedb_ingester::{ - ChannelConfig, ChannelManager, Client, ClientTlsOption, Database, DEFAULT_SCHEMA_NAME, + ChannelConfig, ChannelManager, ClientBuilder, ClientTlsOption, Database, DEFAULT_SCHEMA_NAME, }; #[tokio::main] @@ -31,14 +31,17 @@ async fn main() { .map(|s| s == "1") .unwrap_or(false); + let builder = ClientBuilder::default() + .peers(vec![&greptimedb_endpoint]) + .compression(greptimedb_ingester::Compression::Gzip); let grpc_client = if greptimedb_secure { let channel_config = ChannelConfig::default().client_tls_config(ClientTlsOption::default()); let channel_manager = ChannelManager::with_tls_config(channel_config) .expect("Failed to create channel manager"); - Client::with_manager_and_urls(channel_manager, vec![&greptimedb_endpoint]) + builder.channel_manager(channel_manager).build() } else { - Client::with_urls(vec![&greptimedb_endpoint]) + builder.build() }; let client = Database::new_with_dbname(greptimedb_dbname, grpc_client); diff --git a/examples/stream_ingest.rs b/examples/stream_ingest.rs index eb2fe8a..3cb7d8d 100644 --- a/examples/stream_ingest.rs +++ b/examples/stream_ingest.rs @@ -15,7 +15,7 @@ use derive_new::new; use greptimedb_ingester::api::v1::*; -use greptimedb_ingester::{Client, Database, DEFAULT_SCHEMA_NAME}; +use greptimedb_ingester::{ClientBuilder, Database, DEFAULT_SCHEMA_NAME}; #[tokio::main] async fn main() { @@ -25,7 +25,9 @@ async fn main() { let greptimedb_dbname = std::env::var("GREPTIMEDB_DBNAME").unwrap_or_else(|_| DEFAULT_SCHEMA_NAME.to_owned()); - let grpc_client = Client::with_urls(vec![&greptimedb_endpoint]); + let grpc_client = ClientBuilder::default() + .peers(vec![&greptimedb_endpoint]) + .build(); let client = Database::new_with_dbname(greptimedb_dbname, grpc_client); diff --git a/src/client.rs b/src/client.rs index a556427..79c2865 100644 --- a/src/client.rs +++ b/src/client.rs @@ -20,6 +20,7 @@ use crate::api::v1::HealthCheckRequest; use crate::channel_manager::ChannelManager; use parking_lot::RwLock; use snafu::OptionExt; +use tonic::codec::CompressionEncoding; use tonic::transport::Channel; use crate::load_balance::{LoadBalance, Loadbalancer}; @@ -36,22 +37,74 @@ pub struct Client { inner: Arc, } +#[derive(Default)] +pub struct ClientBuilder { + channel_manager: ChannelManager, + load_balance: Loadbalancer, + compression: Compression, + peers: Vec, +} + +impl ClientBuilder { + pub fn channel_manager(mut self, channel_manager: ChannelManager) -> Self { + self.channel_manager = channel_manager; + self + } + + pub fn load_balance(mut self, load_balance: Loadbalancer) -> Self { + self.load_balance = load_balance; + self + } + + pub fn compression(mut self, compression: Compression) -> Self { + self.compression = compression; + self + } + + pub fn peers(mut self, peers: A) -> Self + where + U: AsRef, + A: AsRef<[U]>, + { + self.peers = normailze_urls(peers); + self + } + + pub fn build(self) -> Client { + let inner = InnerBuilder::default() + .channel_manager(self.channel_manager) + .load_balance(self.load_balance) + .compression(self.compression) + .peers(self.peers) + .build(); + Client { + inner: Arc::new(inner), + } + } +} + +#[derive(Debug, Clone)] +pub enum Compression { + Gzip, + Zstd, + Plain, +} + +impl Default for Compression { + fn default() -> Self { + Self::Gzip + } +} + #[derive(Debug, Default)] struct Inner { channel_manager: ChannelManager, peers: Arc>>, load_balance: Loadbalancer, + compression: Compression, } impl Inner { - fn with_manager(channel_manager: ChannelManager) -> Self { - Self { - channel_manager, - peers: Default::default(), - load_balance: Default::default(), - } - } - fn set_peers(&self, peers: Vec) { let mut guard = self.peers.write(); *guard = peers; @@ -63,51 +116,93 @@ impl Inner { } } +#[derive(Default)] +pub struct InnerBuilder { + channel_manager: ChannelManager, + load_balance: Loadbalancer, + compression: Compression, + peers: Arc>>, +} + +impl InnerBuilder { + pub(self) fn channel_manager(mut self, channel_manager: ChannelManager) -> Self { + self.channel_manager = channel_manager; + self + } + + pub(self) fn load_balance(mut self, load_balance: Loadbalancer) -> Self { + self.load_balance = load_balance; + self + } + + pub(self) fn compression(mut self, compression: Compression) -> Self { + self.compression = compression; + self + } + + pub(self) fn peers(mut self, peers: Vec) -> Self { + self.peers = Arc::new(RwLock::new(peers)); + self + } + + pub(self) fn build(self) -> Inner { + Inner { + channel_manager: self.channel_manager, + load_balance: self.load_balance, + compression: self.compression, + peers: self.peers, + } + } +} + impl Client { + #[deprecated(since = "0.1.0", note = "use `ClientBuilder` instead of this method")] pub fn new() -> Self { Default::default() } + #[deprecated(since = "0.1.0", note = "use `ClientBuilder` instead of this method")] pub fn with_manager(channel_manager: ChannelManager) -> Self { - let inner = Arc::new(Inner::with_manager(channel_manager)); - Self { inner } + let inner = InnerBuilder::default() + .channel_manager(channel_manager) + .build(); + Self { + inner: Arc::new(inner), + } } + #[deprecated(since = "0.1.0", note = "use `ClientBuilder` instead of this method")] pub fn with_urls(urls: A) -> Self where U: AsRef, A: AsRef<[U]>, { - Self::with_manager_and_urls(ChannelManager::new(), urls) + ClientBuilder::default().peers(urls).build() } + #[deprecated(since = "0.1.0", note = "use `ClientBuilder` instead of this method")] pub fn with_manager_and_urls(channel_manager: ChannelManager, urls: A) -> Self where U: AsRef, A: AsRef<[U]>, { - let inner = Inner::with_manager(channel_manager); - let urls: Vec = urls - .as_ref() - .iter() - .map(|peer| peer.as_ref().to_string()) - .collect(); - inner.set_peers(urls); + let inner = InnerBuilder::default() + .channel_manager(channel_manager) + .peers(normailze_urls(urls)) + .build(); + Self { inner: Arc::new(inner), } } + #[deprecated(since = "0.1.0", note = "should be removed in the future")] pub fn start(&self, urls: A) where U: AsRef, A: AsRef<[U]>, { - let urls: Vec = urls - .as_ref() - .iter() - .map(|peer| peer.as_ref().to_string()) - .collect(); + let urls: Vec = normailze_urls(urls); self.inner.set_peers(urls); } @@ -127,8 +222,19 @@ impl Client { pub(crate) fn make_database_client(&self) -> Result { let (_, channel) = self.find_channel()?; - let client = - GreptimeDatabaseClient::new(channel).max_decoding_message_size(MAX_MESSAGE_SIZE); + let mut client = GreptimeDatabaseClient::new(channel) + .max_decoding_message_size(MAX_MESSAGE_SIZE) + .accept_compressed(CompressionEncoding::Gzip) + .accept_compressed(CompressionEncoding::Zstd); + match self.inner.compression { + Compression::Gzip => { + client = client.send_compressed(CompressionEncoding::Gzip); + } + Compression::Zstd => { + client = client.send_compressed(CompressionEncoding::Zstd); + } + Compression::Plain => {} + } Ok(DatabaseClient { inner: client }) } @@ -140,6 +246,17 @@ impl Client { } } +fn normailze_urls(urls: A) -> Vec +where + U: AsRef, + A: AsRef<[U]>, +{ + urls.as_ref() + .iter() + .map(|peer| peer.as_ref().to_string()) + .collect() +} + #[cfg(test)] mod tests { use std::collections::HashSet; diff --git a/src/database.rs b/src/database.rs index d035f0c..44bb13a 100644 --- a/src/database.rs +++ b/src/database.rs @@ -21,7 +21,6 @@ use crate::api::v1::{ use crate::stream_insert::StreamInserter; use snafu::OptionExt; -use tonic::codec::CompressionEncoding; use crate::error::IllegalDatabaseResponseSnafu; use crate::{Client, Result}; @@ -116,11 +115,9 @@ impl Database { } async fn handle(&self, request: Request) -> Result { - let client = self.client.make_database_client()?.inner; + let mut client = self.client.make_database_client()?.inner; let request = self.to_rpc_request(request); let response = client - .send_compressed(CompressionEncoding::Zstd) - .accept_compressed(CompressionEncoding::Zstd) .handle(request) .await? .into_inner() diff --git a/src/lib.rs b/src/lib.rs index 35b3722..dfa49e7 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -22,7 +22,7 @@ pub mod load_balance; mod stream_insert; pub use self::channel_manager::{ChannelConfig, ChannelManager, ClientTlsOption}; -pub use self::client::Client; +pub use self::client::{Client, ClientBuilder, Compression}; pub use self::database::Database; pub use self::error::{Error, Result}; pub use self::stream_insert::StreamInserter; diff --git a/src/load_balance.rs b/src/load_balance.rs index d283788..2350aa9 100644 --- a/src/load_balance.rs +++ b/src/load_balance.rs @@ -21,7 +21,7 @@ pub trait LoadBalance { } #[enum_dispatch(LoadBalance)] -#[derive(Debug)] +#[derive(Debug, Clone)] pub enum Loadbalancer { Random, } @@ -32,7 +32,7 @@ impl Default for Loadbalancer { } } -#[derive(Debug)] +#[derive(Debug, Copy, Clone)] pub struct Random; impl LoadBalance for Random {