Skip to content

Commit

Permalink
Changelog:
Browse files Browse the repository at this point in the history
- Add hostname metadata to grpc stream to improve debugging
- Fix a problem that could cause the primary to never change if a process is terminated before the expiry is set
- Optimize get_id_lists so that we don't always write to Redis
- Add support for specifying the maximum number of concurrent streams for the server
- Implement graceful shutdown for the gRPC stream
  • Loading branch information
ealui-statsig committed May 31, 2024
1 parent 725d617 commit 4ac22be
Show file tree
Hide file tree
Showing 7 changed files with 178 additions and 93 deletions.
31 changes: 23 additions & 8 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,29 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
since_time: Some(1234),
sdk_key: "1234".into(),
});
let mut stream = client.stream_config_spec(request).await?.into_inner();
while let Some(value) = stream.message().await? {
println!(
"STREAMING={:?}, CURRENT TIME={}",
value.last_updated,
Local::now()
);
}
let response = client.stream_config_spec(request).await?;
println!("Metadata={:?}", response.metadata());
let mut stream = response.into_inner();

loop {
match stream.message().await {
Ok(Some(value)) => {
println!(
"STREAMING={:?}, CURRENT TIME={}",
value.last_updated,
Local::now()
);
}
Ok(None) => {
println!("STREAMING DONE");
break;
}
Err(e) => {
println!("CURRENT TIME={}", Local::now());
println!("Error={:?}", e);
break;
}
}
}
Ok(())
}
25 changes: 20 additions & 5 deletions src/datastore/caching/redis_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,26 +121,41 @@ impl HttpDataProviderObserverTrait for RedisCache {
path: &str,
) {
if result == &DataProviderRequestResult::DataAvailable {
let connection = self.connection.get().await;
let connection: Result<
bb8::PooledConnection<RedisConnectionManager>,
bb8::RunError<RedisError>,
> = self.connection.get().await;
let redis_key = self.hash_key(key).await;
match connection {
Ok(mut conn) => {
let mut pipe = redis::pipe();
pipe.atomic();
let should_update = match pipe
.ttl(REDIS_LEADER_KEY)
.set_nx(REDIS_LEADER_KEY, self.uuid.clone())
.get(REDIS_LEADER_KEY)
.hget(&redis_key, "lcut")
.query_async::<MultiplexedConnection, (i32, String, Option<String>)>(
.query_async::<MultiplexedConnection, (i32, i32, String, Option<String>)>(
&mut *conn,
)
.await
{
Ok(query_result) => {
let is_leader = query_result.1 == self.uuid;
if self.check_lcut && query_result.2.is_some() {
let is_leader = query_result.2 == self.uuid;

// In case there was a crash without cleaning up the leader key,
// validate on startup and set the expiry if needed. This is best
// effort, so we don't check the result
if query_result.0 == -1 && !is_leader {
pipe.expire::<&str>(REDIS_LEADER_KEY, self.leader_key_ttl)
.query_async::<MultiplexedConnection, i32>(&mut *conn)
.await
.ok();
}

if self.check_lcut && query_result.3.is_some() {
let should_update =
query_result.2.expect("exists").parse().unwrap_or(0) < lcut;
query_result.3.expect("exists").parse().unwrap_or(0) < lcut;
is_leader && should_update
} else {
is_leader
Expand Down
3 changes: 1 addition & 2 deletions src/datastore/data_providers/http_data_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,7 @@ impl DataProviderTrait for HttpDataProvider {
Err(_err) => -2,
};
if err_msg.is_empty() {
// TODO: This should be more robust
if body == "{\"has_updates\":false}" {
if !request_builder.is_an_update(&body, key).await {
ProxyEventObserver::publish_event(
ProxyEvent::new(ProxyEventType::HttpDataProviderNoData, key.to_string())
.with_path(request_builder.get_path())
Expand Down
26 changes: 25 additions & 1 deletion src/datastore/data_providers/request_builder.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use async_trait::async_trait;
use std::{sync::Arc, time::Duration};
use sha2::{Digest, Sha256};
use std::{collections::HashMap, sync::Arc, time::Duration};
use tokio::{sync::RwLock, time::Instant};

use crate::{
Expand All @@ -18,6 +19,7 @@ pub trait RequestBuilderTrait: Send + Sync + 'static {
lcut: u64,
) -> Result<reqwest::Response, reqwest::Error>;
fn get_path(&self) -> String;
async fn is_an_update(&self, body: &str, sdk_key: &str) -> bool;
fn get_observers(&self) -> Arc<HttpDataProviderObserver>;
fn get_backup_cache(&self) -> Arc<dyn HttpDataProviderObserverTrait + Sync + Send>;
fn get_sdk_key_store(&self) -> Arc<SdkKeyStore>;
Expand Down Expand Up @@ -71,6 +73,11 @@ impl RequestBuilderTrait for DcsRequestBuilder {
"/v1/download_config_specs".to_string()
}

/// Treat any response other than the literal "no updates" payload as new data.
/// TODO: This should be more robust
async fn is_an_update(&self, body: &str, _sdk_key: &str) -> bool {
    body != "{\"has_updates\":false}"
}

fn get_observers(&self) -> Arc<HttpDataProviderObserver> {
Arc::clone(&self.http_observers)
}
Expand All @@ -97,6 +104,7 @@ pub struct IdlistRequestBuilder {
pub backup_cache: Arc<dyn HttpDataProviderObserverTrait + Sync + Send>,
pub sdk_key_store: Arc<SdkKeyStore>,
last_request: RwLock<Instant>,
last_response_hash: RwLock<HashMap<String, String>>,
}

impl IdlistRequestBuilder {
Expand All @@ -110,6 +118,7 @@ impl IdlistRequestBuilder {
backup_cache,
sdk_key_store,
last_request: RwLock::new(Instant::now()),
last_response_hash: RwLock::new(HashMap::new()),
}
}
}
Expand All @@ -134,6 +143,21 @@ impl RequestBuilderTrait for IdlistRequestBuilder {
"/v1/get_id_lists".to_string()
}

/// Report whether `body` differs from the last response seen for `sdk_key`.
///
/// Stores only a SHA-256 digest per key, so repeated identical id-list
/// payloads are detected without retaining the full body. The write lock is
/// taken up front because the common "changed" path must insert the new hash.
async fn is_an_update(&self, body: &str, sdk_key: &str) -> bool {
    let new_hash = format!("{:x}", Sha256::digest(body));
    let mut hashes = self.last_response_hash.write().await;
    // A key we have never seen counts as an update.
    let changed = match hashes.get(sdk_key) {
        Some(previous) => *previous != new_hash,
        None => true,
    };
    if changed {
        hashes.insert(sdk_key.to_string(), new_hash);
    }
    changed
}

fn get_observers(&self) -> Arc<HttpDataProviderObserver> {
Arc::clone(&self.http_observers)
}
Expand Down
28 changes: 13 additions & 15 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use datastore::{
data_providers::{background_data_provider, http_data_provider},
sdk_key_store,
};
use futures::join;
use loggers::datadog_logger;
use loggers::debug_logger;
use observers::http_data_provider_observer::HttpDataProviderObserver;
Expand Down Expand Up @@ -49,6 +50,8 @@ struct Cli {
redis_leader_key_ttl: i64,
#[clap(long, action)]
force_gcp_profiling_enabled: bool,
#[clap(short, long, default_value = "500")]
grpc_max_concurrent_streams: u32,
}

#[derive(Deserialize, Debug)]
Expand Down Expand Up @@ -258,32 +261,27 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {

match cli.mode {
TransportMode::Grpc => {
servers::grpc_server::GrpcServer::start_server(config_spec_store, config_spec_observer)
.await?
servers::grpc_server::GrpcServer::start_server(
cli.grpc_max_concurrent_streams,
config_spec_store,
config_spec_observer,
)
.await?
}
TransportMode::Http => {
servers::http_server::HttpServer::start_server(config_spec_store, id_list_store).await?
}
TransportMode::GrpcAndHttp => {
let grpc_server = servers::grpc_server::GrpcServer::start_server(
cli.grpc_max_concurrent_streams,
config_spec_store.clone(),
config_spec_observer.clone(),
);
let http_server =
servers::http_server::HttpServer::start_server(config_spec_store, id_list_store);

tokio::select! {
res = grpc_server => {
if let Err(err) = res {
eprintln!("gRPC server failed: {}, terminating server...", err);
}
}
res = http_server => {
if let Err(err) = res {
eprintln!("HTTP server failed: {}, terminating server...", err);
}
}
}
join!(async { grpc_server.await.ok() }, async {
http_server.await.ok()
},);
}
}
Ok(())
Expand Down
Loading

0 comments on commit 4ac22be

Please sign in to comment.