Merge pull request #12 from statsig-io/10-17-sync_to_1.1.8
Sync Repo to 1.1.8
ealui-statsig authored Oct 18, 2024
2 parents ccbd890 + 59b2ba0 commit 4fd4cbe
Showing 46 changed files with 1,799 additions and 509 deletions.
389 changes: 165 additions & 224 deletions Cargo.lock

Large diffs are not rendered by default.

7 changes: 5 additions & 2 deletions Cargo.toml
@@ -8,12 +8,12 @@ rust-version = "1.75.0"

[dependencies]
clap = { version = "4.5.17", features = ["derive"] }
tonic = "0.12.2"
tonic = { version = "0.12.3", features = ["tls"] }
protobuf = "3.5.1"
prost = "0.13.2"
rocket = { version = "0.5.1", features = ["json"] }
tokio = { version = "1.40.0", features = ["full"] }
reqwest = { version = "0.12.7" }
reqwest = { version = "0.12.8" }
futures = "0.3.30"
envy = "0.4.2"
serde = { version = "1.0.210", features = ["derive"] }
@@ -40,6 +40,9 @@ dashmap = "6.1.0"
memchr = "2.7.4"
smallvec = "1.13.2"
fxhash = "0.2.1"
tokio-util = "0.7.12"
bytes = "1.7.2"
flate2 = "1.0.34"


[build-dependencies]
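The new `bytes` and `flate2` dependencies back the payload changes further down: cached configs move from `Arc<str>` to a `Bytes`-based `ResponsePayload`, and gzip bodies are compressed and decompressed with `flate2`. A minimal sketch of why `Bytes` suits a payload fanned out to several caches (illustrative values only, not code from this PR):

```rust
use bytes::Bytes;

fn main() {
    // `Bytes` is a reference-counted, immutable buffer: cloning it
    // copies a pointer and bumps a refcount, never the payload itself.
    let payload = Bytes::from("{\"has_updates\":false}".to_string());
    let for_cache = payload.clone(); // O(1), no data copy
    let for_stream = payload.clone(); // O(1), no data copy

    assert_eq!(for_cache, for_stream);
    println!("{} bytes shared by 3 handles", payload.len());
}
```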
2 changes: 1 addition & 1 deletion Dockerfile
@@ -33,5 +33,5 @@ WORKDIR /app
COPY ./.cargo ./.cargo
COPY ./Rocket.toml ./Rocket.toml
COPY --from=builder /statsig_forward_proxy/target/release/server /usr/local/bin/statsig_forward_proxy

ENV ROCKET_ENV=prod
ENTRYPOINT [ "statsig_forward_proxy" ]
1 change: 1 addition & 0 deletions README.md
@@ -108,3 +108,4 @@ curl -X GET 'http://0.0.0.0:8000/v1/download_config_specs/<INSERT_SDK_KEY>.json
Besides using the flag force_gcp_profiling_enabled, if you provide your Statsig server SDK key as an environment
variable (STATSIG_SERVER_SDK_KEY), you can also configure a feature gate called enable_gcp_profiler_for_sfp
to dynamically enable and disable GCP profiling.

6 changes: 5 additions & 1 deletion Rocket.toml
@@ -1,3 +1,7 @@
[global]
address = "0.0.0.0"
limits = { json = "5 MiB", string = "5 MiB" }
limits = { json = "5 MiB", string = "5 MiB" }
log_level = "critical"
workers = 128
keep_alive=15
ident="statsig-forward-proxy"
26 changes: 20 additions & 6 deletions src/client.rs
@@ -1,7 +1,11 @@
use std::fs;

use chrono::Local;
use statsig_forward_proxy::statsig_forward_proxy_client::StatsigForwardProxyClient;
use statsig_forward_proxy::ConfigSpecRequest;

use tonic::transport::{Certificate, Channel, ClientTlsConfig};

pub mod statsig_forward_proxy {
tonic::include_proto!("statsig_forward_proxy");
}
@@ -15,16 +19,26 @@ fn last_500_char(spec: &String) -> String {

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut client = StatsigForwardProxyClient::connect("http://0.0.0.0:50051")
.await?
// 16mb -- default is 4mb
.max_decoding_message_size(16777216);
let pem = std::fs::read_to_string("./x509_test_certs/root/certs/root_ca.crt")?;
let cert = fs::read_to_string("./x509_test_certs/intermediate/certs/client.crt")?;
let key = fs::read_to_string("./x509_test_certs/intermediate/private/client.key")?;
let client_tls_id = tonic::transport::Identity::from_pem(cert.as_bytes(), key.as_bytes());
let ca = Certificate::from_pem(pem);
let tls = ClientTlsConfig::new()
.ca_certificate(ca)
.identity(client_tls_id);
let channel = Channel::from_static("http://0.0.0.0:50051")
.tls_config(tls)?
.connect()
.await?;

let mut client = StatsigForwardProxyClient::new(channel).max_decoding_message_size(20870203);

// Non-Streaming
for version in 0..3 {
let request = tonic::Request::new(ConfigSpecRequest {
since_time: Some(1234),
sdk_key: "1234".into(),
sdk_key: "secret-1234".into(),
version: Some(version),
});
let response: tonic::Response<statsig_forward_proxy::ConfigSpecResponse> =
@@ -42,7 +56,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Streaming
let request = tonic::Request::new(ConfigSpecRequest {
since_time: Some(1234),
sdk_key: "1234".into(),
sdk_key: "secret-1234".into(),
version: Some(2),
});
let response = client.stream_config_spec(request).await?;
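The rewritten client performs mutual TLS: it trusts the test root CA and presents its own certificate/key pair. For that handshake to succeed, the gRPC server has to be configured with a matching `ServerTlsConfig`; that wiring is outside this file, so the following is only a sketch of the server-side counterpart, assuming certificate paths that mirror the client's `x509_test_certs` layout:

```rust
use tonic::transport::{Certificate, Identity, ServerTlsConfig};

// Hypothetical helper; the real server wiring is not shown in this diff.
fn build_server_tls() -> std::io::Result<ServerTlsConfig> {
    let cert = std::fs::read_to_string("./x509_test_certs/intermediate/certs/server.crt")?;
    let key = std::fs::read_to_string("./x509_test_certs/intermediate/private/server.key")?;
    let ca = std::fs::read_to_string("./x509_test_certs/root/certs/root_ca.crt")?;

    Ok(ServerTlsConfig::new()
        // Certificate the server presents to connecting clients.
        .identity(Identity::from_pem(cert, key))
        // Root CA used to verify client certificates (mutual TLS).
        .client_ca_root(Certificate::from_pem(ca)))
}
```

This would be applied with `Server::builder().tls_config(build_server_tls()?)?` before the generated proxy service is registered.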
10 changes: 7 additions & 3 deletions src/datastore/caching/disabled_cache.rs
@@ -1,5 +1,6 @@
use crate::{
datastore::data_providers::DataProviderRequestResult, observers::HttpDataProviderObserverTrait,
datastore::data_providers::{http_data_provider::ResponsePayload, DataProviderRequestResult},
observers::HttpDataProviderObserverTrait,
servers::authorized_request_context::AuthorizedRequestContext,
};

@@ -20,12 +21,15 @@ impl HttpDataProviderObserverTrait for DisabledCache {
_result: &DataProviderRequestResult,
_request_context: &Arc<AuthorizedRequestContext>,
_lcut: u64,
_data: &Arc<str>,
_data: &Arc<ResponsePayload>,
) {
/* noop */
}

async fn get(&self, _request_context: &Arc<AuthorizedRequestContext>) -> Option<Arc<str>> {
async fn get(
&self,
_request_context: &Arc<AuthorizedRequestContext>,
) -> Option<Arc<ResponsePayload>> {
return None;
}
}
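Every cache implementation now moves `Arc<ResponsePayload>` through the observer trait instead of `Arc<str>`, so a payload carries its content encoding alongside the raw bytes. The struct itself is defined in `http_data_provider` and is not part of this diff; from the construction sites in `redis_cache.rs` below, its shape can be inferred as roughly:

```rust
use std::sync::Arc;
use bytes::Bytes;

// Inferred from its uses in this PR; the real definition lives in
// src/datastore/data_providers/http_data_provider.rs.
#[derive(Debug)]
pub struct ResponsePayload {
    // Content encoding of `data`: Some("gzip"), or None for plain text.
    pub encoding: Arc<Option<String>>,
    // Raw response body; `Bytes` keeps clones cheap.
    pub data: Arc<Bytes>,
}
```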
10 changes: 7 additions & 3 deletions src/datastore/caching/in_memory_cache.rs
@@ -1,6 +1,7 @@
use crate::{
datastore::{
config_spec_store::ConfigSpecForCompany, data_providers::DataProviderRequestResult,
config_spec_store::ConfigSpecForCompany,
data_providers::{http_data_provider::ResponsePayload, DataProviderRequestResult},
},
observers::{
proxy_event_observer::ProxyEventObserver, EventStat, HttpDataProviderObserverTrait,
@@ -39,7 +40,7 @@ impl HttpDataProviderObserverTrait for InMemoryCache {
result: &DataProviderRequestResult,
request_context: &Arc<AuthorizedRequestContext>,
lcut: u64,
data: &Arc<str>,
data: &Arc<ResponsePayload>,
) {
let storage_key = request_context.to_string();
if result == &DataProviderRequestResult::DataAvailable {
@@ -95,7 +96,10 @@ impl HttpDataProviderObserverTrait for InMemoryCache {
}
}

async fn get(&self, request_context: &Arc<AuthorizedRequestContext>) -> Option<Arc<str>> {
async fn get(
&self,
request_context: &Arc<AuthorizedRequestContext>,
) -> Option<Arc<ResponsePayload>> {
ProxyEventObserver::publish_event(
ProxyEvent::new_with_rc(ProxyEventType::InMemoryCacheReadSucceed, request_context)
.with_stat(EventStat {
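After this change, both trait methods deal in `Arc<ResponsePayload>`. A sketch of what the updated `HttpDataProviderObserverTrait` plausibly looks like; the trait definition is not in this diff, and the update hook's name and the `async_trait` attribute are assumptions inferred from the impl blocks:

```rust
use std::sync::Arc;

use async_trait::async_trait;

use crate::datastore::data_providers::{
    http_data_provider::ResponsePayload, DataProviderRequestResult,
};
use crate::servers::authorized_request_context::AuthorizedRequestContext;

#[async_trait]
pub trait HttpDataProviderObserverTrait {
    // Name assumed; only the parameter list is visible in the hunks above.
    async fn update(
        &self,
        result: &DataProviderRequestResult,
        request_context: &Arc<AuthorizedRequestContext>,
        lcut: u64,
        data: &Arc<ResponsePayload>,
    );

    async fn get(
        &self,
        request_context: &Arc<AuthorizedRequestContext>,
    ) -> Option<Arc<ResponsePayload>>;
}
```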
104 changes: 86 additions & 18 deletions src/datastore/caching/redis_cache.rs
@@ -1,11 +1,15 @@
use std::{collections::HashMap, sync::Arc};
use std::{
collections::HashMap,
io::{Cursor, Read, Write},
sync::Arc,
};

use bb8_redis::{redis::AsyncCommands, RedisConnectionManager};
use parking_lot::RwLock;
use redis::aio::MultiplexedConnection;

use crate::{
datastore::data_providers::DataProviderRequestResult,
datastore::data_providers::{http_data_provider::ResponsePayload, DataProviderRequestResult},
observers::{
proxy_event_observer::ProxyEventObserver, HttpDataProviderObserverTrait, ProxyEvent,
ProxyEventType,
@@ -17,6 +21,8 @@ use crate::observers::EventStat;
use crate::observers::OperationType;

use bb8_redis::redis::RedisError;
use bytes::Bytes;
use flate2::{read::GzDecoder, write::GzEncoder, Compression};
use serde::Deserialize;
use sha2::{Digest, Sha256};

@@ -122,20 +128,21 @@ impl HttpDataProviderObserverTrait for RedisCache {
result: &DataProviderRequestResult,
request_context: &Arc<AuthorizedRequestContext>,
lcut: u64,
data: &Arc<str>,
data: &Arc<ResponsePayload>,
) {
// TODO: This will be a problem if we start using DCS v2 with the forward
// proxy because the redis data adapter currently has no way
// to differentiate between the DCS v1 and DCS v2.
//
// So for now, to keep functionality, continue using just
// the sdk key.
let redis_key = self.hash_key(&request_context.sdk_key).await;

if result == &DataProviderRequestResult::DataAvailable {
let connection: Result<
bb8::PooledConnection<RedisConnectionManager>,
bb8::RunError<RedisError>,
> = self.connection.get().await;
// TODO: This will be a problem if we start using DCS v2 with the forward
// proxy because the redis data adapter currently has no way
// to differentiate between the DCS v1 and DCS v2.
//
// So for now, to keep functionality, continue using just
// the sdk key.
let redis_key = self.hash_key(&request_context.sdk_key).await;
match connection {
Ok(mut conn) => {
let mut pipe = redis::pipe();
@@ -178,9 +185,38 @@ impl HttpDataProviderObserverTrait for RedisCache {
};

if should_update {
let data_to_write = match request_context.use_gzip
&& data.encoding.as_deref() == Some("gzip")
{
true => {
let mut decoder = GzDecoder::new(Cursor::new(&**data.data));
let mut decompressed = Vec::new();
match decoder.read_to_end(&mut decompressed) {
Ok(_) => decompressed,
Err(e) => {
ProxyEventObserver::publish_event(
ProxyEvent::new_with_rc(
ProxyEventType::RedisCacheWriteFailed,
request_context,
)
.with_stat(EventStat {
operation_type: OperationType::IncrByValue,
value: 1,
}),
);
eprintln!("Failed to decode gzipped data before writing to redis: {:?}", e);
return;
}
}
}
false => data.data.to_vec(),
};

// We currently only support writing data to redis as plain_text
match pipe
.hset(&redis_key, "encoding", "plain_text")
.hset(&redis_key, "lcut", lcut)
.hset(&redis_key, "config", data.to_string())
.hset(&redis_key, "config", data_to_write)
.expire(&redis_key, self.redis_cache_ttl_in_s)
.expire(REDIS_LEADER_KEY, self.leader_key_ttl)
.query_async::<MultiplexedConnection, ()>(&mut *conn)
@@ -246,7 +282,6 @@ impl HttpDataProviderObserverTrait for RedisCache {
&& self.clear_external_datastore_on_unauthorized
{
let connection = self.connection.get().await;
let redis_key = self.hash_key(&request_context.to_string()).await;
match connection {
Ok(mut conn) => match conn.del(&redis_key).await {
Ok(()) => {
@@ -295,16 +330,24 @@ impl HttpDataProviderObserverTrait for RedisCache {
}
}

async fn get(&self, request_context: &Arc<AuthorizedRequestContext>) -> Option<Arc<str>> {
async fn get(
&self,
request_context: &Arc<AuthorizedRequestContext>,
) -> Option<Arc<ResponsePayload>> {
let connection = self.connection.get().await;
let redis_key = self.hash_key(&request_context.sdk_key).await;
match connection {
Ok(mut conn) => {
let res: Result<Vec<String>, RedisError> = conn
.hget(self.hash_key(&request_context.to_string()).await, "config")
let mut pipe = redis::pipe();
let res = pipe
.hget(redis_key.clone(), "encoding")
.hget(redis_key, "config")
.query_async::<MultiplexedConnection, (Option<String>, Vec<u8>)>(&mut *conn)
.await;

match res {
Ok(data) => {
if data.is_empty() {
Ok(payload) => {
if payload.1.is_empty() {
ProxyEventObserver::publish_event(
ProxyEvent::new_with_rc(
ProxyEventType::RedisCacheReadMiss,
Expand All @@ -327,7 +370,32 @@ impl HttpDataProviderObserverTrait for RedisCache {
value: 1,
}),
);
Some(Arc::from(data.first().expect("Must have data").to_owned()))

match request_context.use_gzip
&& payload.0.is_some_and(|encoding| encoding == "plain_text")
{
true => {
let mut compressed = Vec::new();
let mut encoder =
GzEncoder::new(&mut compressed, Compression::best());
if let Err(e) = encoder.write_all(&payload.1) {
eprintln!("Failed to gzip data from redis: {:?}", e);
return None;
}
if let Err(e) = encoder.finish() {
eprintln!("Failed to gzip data from redis: {:?}", e);
return None;
}
Some(Arc::new(ResponsePayload {
encoding: Arc::new(Some("gzip".to_string())),
data: Arc::new(Bytes::from(compressed)),
}))
}
false => Some(Arc::new(ResponsePayload {
encoding: Arc::new(None),
data: Arc::new(Bytes::from(payload.1)),
})),
}
}
}
Err(e) => {
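The write path above decompresses gzip payloads before persisting, since Redis only stores plain text, and the read path re-compresses for request contexts that accept gzip. A self-contained round-trip of those two `flate2` code paths, using a made-up payload:

```rust
use std::io::{Cursor, Read, Write};

use flate2::{read::GzDecoder, write::GzEncoder, Compression};

fn main() -> std::io::Result<()> {
    let original = br#"{"has_updates":false}"#;

    // Read path: compress plain text for clients that requested gzip.
    let mut compressed = Vec::new();
    let mut encoder = GzEncoder::new(&mut compressed, Compression::best());
    encoder.write_all(original)?;
    encoder.finish()?;

    // Write path: decompress a gzip body before storing it as plain text.
    let mut decoder = GzDecoder::new(Cursor::new(&compressed));
    let mut decompressed = Vec::new();
    decoder.read_to_end(&mut decompressed)?;

    assert_eq!(original.as_slice(), decompressed.as_slice());
    Ok(())
}
```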
22 changes: 16 additions & 6 deletions src/datastore/config_spec_store.rs
@@ -1,4 +1,5 @@
use super::data_providers::background_data_provider::{foreground_fetch, BackgroundDataProvider};
use super::data_providers::http_data_provider::ResponsePayload;
use super::data_providers::DataProviderRequestResult;
use super::sdk_key_store::SdkKeyStore;

@@ -7,15 +8,18 @@ use crate::observers::{
EventStat, HttpDataProviderObserverTrait, OperationType, ProxyEvent, ProxyEventType,
};
use crate::servers::authorized_request_context::AuthorizedRequestContext;
use dashmap::DashMap;
use bytes::Bytes;

use chrono::Utc;

use std::sync::Arc;

use dashmap::DashMap;

#[derive(Clone, Debug)]
pub struct ConfigSpecForCompany {
pub lcut: u64,
pub config: Arc<str>,
pub config: Arc<ResponsePayload>,
}

pub struct ConfigSpecStore {
@@ -37,7 +41,7 @@ impl HttpDataProviderObserverTrait for ConfigSpecStore {
result: &DataProviderRequestResult,
request_context: &Arc<AuthorizedRequestContext>,
lcut: u64,
data: &Arc<str>,
data: &Arc<ResponsePayload>,
) {
let should_insert = result == &DataProviderRequestResult::Error
|| (result == &DataProviderRequestResult::DataAvailable
@@ -81,14 +85,17 @@ impl HttpDataProviderObserverTrait for ConfigSpecStore {
}),
);
})
.or_insert_with(|| new_data);
.or_insert(new_data);
}
} else if result == &DataProviderRequestResult::Unauthorized {
self.store.remove(request_context);
}
}

async fn get(&self, request_context: &Arc<AuthorizedRequestContext>) -> Option<Arc<str>> {
async fn get(
&self,
request_context: &Arc<AuthorizedRequestContext>,
) -> Option<Arc<ResponsePayload>> {
self.store
.get(request_context)
.map(|record| record.config.clone())
@@ -106,7 +113,10 @@ impl ConfigSpecStore {
background_data_provider,
no_update_config_spec: Arc::new(ConfigSpecForCompany {
lcut: 0,
config: Arc::from("{\"has_updates\":false}".to_string()),
config: Arc::new(ResponsePayload {
encoding: Arc::new(None),
data: Arc::new(Bytes::from("{\"has_updates\":false}".to_string())),
}),
}),
}
}
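One small cleanup above: `.or_insert_with(|| new_data)` became `.or_insert(new_data)`. The closure form defers construction until the entry is actually vacant, which only pays off when building the value is expensive; here `new_data` is already built before the entry call, so passing it directly says the same thing more plainly. A sketch of the distinction with a hypothetical map:

```rust
use dashmap::DashMap;

// Stand-in for a costly computation.
fn expensive() -> u64 {
    42
}

fn main() {
    let store: DashMap<&'static str, u64> = DashMap::new();

    // Value already built: `or_insert` moves it in if the slot is vacant.
    let ready = 42;
    store.entry("a").or_insert(ready);

    // Value costly to build: `or_insert_with` only runs the closure
    // when the slot is actually vacant.
    store.entry("b").or_insert_with(expensive);

    assert_eq!(*store.get("a").unwrap(), 42);
}
```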
