Skip to content

Commit

Permalink
chore: add compression encoding for request/response
Browse files Browse the repository at this point in the history
  • Loading branch information
paomian committed May 17, 2024
1 parent d71b1ae commit fb9d1a1
Show file tree
Hide file tree
Showing 6 changed files with 156 additions and 37 deletions.
9 changes: 6 additions & 3 deletions examples/ingest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use greptimedb_ingester::api::v1::*;
use greptimedb_ingester::helpers::schema::*;
use greptimedb_ingester::helpers::values::*;
use greptimedb_ingester::{
ChannelConfig, ChannelManager, Client, ClientTlsOption, Database, DEFAULT_SCHEMA_NAME,
ChannelConfig, ChannelManager, ClientBuilder, ClientTlsOption, Database, DEFAULT_SCHEMA_NAME,
};

#[tokio::main]
Expand All @@ -31,14 +31,17 @@ async fn main() {
.map(|s| s == "1")
.unwrap_or(false);

let builder = ClientBuilder::default()
.peers(vec![&greptimedb_endpoint])
.compression(greptimedb_ingester::Compression::Gzip);
let grpc_client = if greptimedb_secure {
let channel_config = ChannelConfig::default().client_tls_config(ClientTlsOption::default());

let channel_manager = ChannelManager::with_tls_config(channel_config)
.expect("Failed to create channel manager");
Client::with_manager_and_urls(channel_manager, vec![&greptimedb_endpoint])
builder.channel_manager(channel_manager).build()
} else {
Client::with_urls(vec![&greptimedb_endpoint])
builder.build()
};

let client = Database::new_with_dbname(greptimedb_dbname, grpc_client);
Expand Down
6 changes: 4 additions & 2 deletions examples/stream_ingest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
use derive_new::new;

use greptimedb_ingester::api::v1::*;
use greptimedb_ingester::{Client, Database, DEFAULT_SCHEMA_NAME};
use greptimedb_ingester::{ClientBuilder, Database, DEFAULT_SCHEMA_NAME};

#[tokio::main]
async fn main() {
Expand All @@ -25,7 +25,9 @@ async fn main() {
let greptimedb_dbname =
std::env::var("GREPTIMEDB_DBNAME").unwrap_or_else(|_| DEFAULT_SCHEMA_NAME.to_owned());

let grpc_client = Client::with_urls(vec![&greptimedb_endpoint]);
let grpc_client = ClientBuilder::default()
.peers(vec![&greptimedb_endpoint])
.build();

let client = Database::new_with_dbname(greptimedb_dbname, grpc_client);

Expand Down
167 changes: 142 additions & 25 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use crate::api::v1::HealthCheckRequest;
use crate::channel_manager::ChannelManager;
use parking_lot::RwLock;
use snafu::OptionExt;
use tonic::codec::CompressionEncoding;
use tonic::transport::Channel;

use crate::load_balance::{LoadBalance, Loadbalancer};
Expand All @@ -36,22 +37,74 @@ pub struct Client {
inner: Arc<Inner>,
}

#[derive(Default)]
pub struct ClientBuilder {
channel_manager: ChannelManager,
load_balance: Loadbalancer,
compression: Compression,
peers: Vec<String>,
}

impl ClientBuilder {
pub fn channel_manager(mut self, channel_manager: ChannelManager) -> Self {
self.channel_manager = channel_manager;
self
}

pub fn load_balance(mut self, load_balance: Loadbalancer) -> Self {
self.load_balance = load_balance;
self
}

pub fn compression(mut self, compression: Compression) -> Self {
self.compression = compression;
self
}

pub fn peers<U, A>(mut self, peers: A) -> Self
where
U: AsRef<str>,
A: AsRef<[U]>,
{
self.peers = normailze_urls(peers);
self
}

pub fn build(self) -> Client {
let inner = InnerBuilder::default()
.channel_manager(self.channel_manager)
.load_balance(self.load_balance)
.compression(self.compression)
.peers(self.peers)
.build();
Client {
inner: Arc::new(inner),
}
}
}

#[derive(Debug, Clone)]
pub enum Compression {
Gzip,
Zstd,
Plain,
}

impl Default for Compression {
fn default() -> Self {
Self::Gzip
}
}

#[derive(Debug, Default)]
struct Inner {
channel_manager: ChannelManager,
peers: Arc<RwLock<Vec<String>>>,
load_balance: Loadbalancer,
compression: Compression,
}

impl Inner {
fn with_manager(channel_manager: ChannelManager) -> Self {
Self {
channel_manager,
peers: Default::default(),
load_balance: Default::default(),
}
}

fn set_peers(&self, peers: Vec<String>) {
let mut guard = self.peers.write();
*guard = peers;
Expand All @@ -63,51 +116,93 @@ impl Inner {
}
}

#[derive(Default)]
pub struct InnerBuilder {
channel_manager: ChannelManager,
load_balance: Loadbalancer,
compression: Compression,
peers: Arc<RwLock<Vec<String>>>,
}

impl InnerBuilder {
pub(self) fn channel_manager(mut self, channel_manager: ChannelManager) -> Self {
self.channel_manager = channel_manager;
self
}

pub(self) fn load_balance(mut self, load_balance: Loadbalancer) -> Self {
self.load_balance = load_balance;
self
}

pub(self) fn compression(mut self, compression: Compression) -> Self {
self.compression = compression;
self
}

pub(self) fn peers(mut self, peers: Vec<String>) -> Self {
self.peers = Arc::new(RwLock::new(peers));
self
}

pub(self) fn build(self) -> Inner {
Inner {
channel_manager: self.channel_manager,
load_balance: self.load_balance,
compression: self.compression,
peers: self.peers,
}
}
}

impl Client {
#[deprecated(since = "0.1.0", note = "use `ClientBuilder` instead of this method")]
pub fn new() -> Self {
Default::default()
}

#[deprecated(since = "0.1.0", note = "use `ClientBuilder` instead of this method")]
pub fn with_manager(channel_manager: ChannelManager) -> Self {
let inner = Arc::new(Inner::with_manager(channel_manager));
Self { inner }
let inner = InnerBuilder::default()
.channel_manager(channel_manager)
.build();
Self {
inner: Arc::new(inner),
}
}

#[deprecated(since = "0.1.0", note = "use `ClientBuilder` instead of this method")]
pub fn with_urls<U, A>(urls: A) -> Self
where
U: AsRef<str>,
A: AsRef<[U]>,
{
Self::with_manager_and_urls(ChannelManager::new(), urls)
ClientBuilder::default().peers(urls).build()
}

#[deprecated(since = "0.1.0", note = "use `ClientBuilder` instead of this method")]
pub fn with_manager_and_urls<U, A>(channel_manager: ChannelManager, urls: A) -> Self
where
U: AsRef<str>,
A: AsRef<[U]>,
{
let inner = Inner::with_manager(channel_manager);
let urls: Vec<String> = urls
.as_ref()
.iter()
.map(|peer| peer.as_ref().to_string())
.collect();
inner.set_peers(urls);
let inner = InnerBuilder::default()
.channel_manager(channel_manager)
.peers(normailze_urls(urls))
.build();

Self {
inner: Arc::new(inner),
}
}

#[deprecated(since = "0.1.0", note = "should be removed in the future")]
pub fn start<U, A>(&self, urls: A)
where
U: AsRef<str>,
A: AsRef<[U]>,
{
let urls: Vec<String> = urls
.as_ref()
.iter()
.map(|peer| peer.as_ref().to_string())
.collect();
let urls: Vec<String> = normailze_urls(urls);

self.inner.set_peers(urls);
}
Expand All @@ -127,8 +222,19 @@ impl Client {

pub(crate) fn make_database_client(&self) -> Result<DatabaseClient> {
let (_, channel) = self.find_channel()?;
let client =
GreptimeDatabaseClient::new(channel).max_decoding_message_size(MAX_MESSAGE_SIZE);
let mut client = GreptimeDatabaseClient::new(channel)
.max_decoding_message_size(MAX_MESSAGE_SIZE)
.accept_compressed(CompressionEncoding::Gzip)
.accept_compressed(CompressionEncoding::Zstd);
match self.inner.compression {
Compression::Gzip => {
client = client.send_compressed(CompressionEncoding::Gzip);
}
Compression::Zstd => {
client = client.send_compressed(CompressionEncoding::Zstd);
}
Compression::Plain => {}
}
Ok(DatabaseClient { inner: client })
}

Expand All @@ -140,6 +246,17 @@ impl Client {
}
}

fn normailze_urls<U, A>(urls: A) -> Vec<String>
where
U: AsRef<str>,
A: AsRef<[U]>,
{
urls.as_ref()
.iter()
.map(|peer| peer.as_ref().to_string())
.collect()
}

#[cfg(test)]
mod tests {
use std::collections::HashSet;
Expand Down
5 changes: 1 addition & 4 deletions src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ use crate::api::v1::{
use crate::stream_insert::StreamInserter;

use snafu::OptionExt;
use tonic::codec::CompressionEncoding;

use crate::error::IllegalDatabaseResponseSnafu;
use crate::{Client, Result};
Expand Down Expand Up @@ -116,11 +115,9 @@ impl Database {
}

async fn handle(&self, request: Request) -> Result<u32> {
let client = self.client.make_database_client()?.inner;
let mut client = self.client.make_database_client()?.inner;
let request = self.to_rpc_request(request);
let response = client
.send_compressed(CompressionEncoding::Zstd)
.accept_compressed(CompressionEncoding::Zstd)
.handle(request)
.await?
.into_inner()
Expand Down
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ pub mod load_balance;
mod stream_insert;

pub use self::channel_manager::{ChannelConfig, ChannelManager, ClientTlsOption};
pub use self::client::Client;
pub use self::client::{Client, ClientBuilder, Compression};
pub use self::database::Database;
pub use self::error::{Error, Result};
pub use self::stream_insert::StreamInserter;
Expand Down
4 changes: 2 additions & 2 deletions src/load_balance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ pub trait LoadBalance {
}

#[enum_dispatch(LoadBalance)]
#[derive(Debug)]
#[derive(Debug, Clone)]
pub enum Loadbalancer {
Random,
}
Expand All @@ -32,7 +32,7 @@ impl Default for Loadbalancer {
}
}

#[derive(Debug)]
#[derive(Debug, Copy, Clone)]
pub struct Random;

impl LoadBalance for Random {
Expand Down

0 comments on commit fb9d1a1

Please sign in to comment.