diff --git a/Dockerfile b/Dockerfile index ea2b809..1c0a4a0 100644 --- a/Dockerfile +++ b/Dockerfile @@ -26,12 +26,30 @@ COPY ./api-interface-definitions ./api-interface-definitions RUN rm ./target/release/deps/server* RUN cargo build --release -FROM amd64/rust:1.75-slim +FROM nginx:alpine -# copy the build artifact from the build stage -WORKDIR /app -COPY ./.cargo ./.cargo -COPY ./Rocket.toml ./Rocket.toml +# Copy Nginx configuration +COPY nginx.conf /etc/nginx/nginx.conf + +# Copy the build artifact from the build stage COPY --from=builder /statsig_forward_proxy/target/release/server /usr/local/bin/statsig_forward_proxy + +# Copy other necessary files +COPY ./.cargo /app/.cargo +COPY ./Rocket.toml /app/Rocket.toml + +# Set working directory +WORKDIR /app + +# Set environment variable ENV ROCKET_ENV=prod -ENTRYPOINT [ "statsig_forward_proxy" ] + +# Expose port 8000 for Nginx and 8001 for the proxy application +EXPOSE 8000 8001 + +# Create an entrypoint script +COPY entrypoint.sh /entrypoint.sh +RUN chmod +x /entrypoint.sh + +# Use ENTRYPOINT to run the script +ENTRYPOINT ["/entrypoint.sh"] diff --git a/README.md b/README.md index a4701be..f92c1a3 100644 --- a/README.md +++ b/README.md @@ -21,26 +21,51 @@ Usage: server [OPTIONS] Arguments: [possible values: grpc-and-http, grpc, http] - [possible values: disabled, local, redis] + [possible values: disabled, redis] Options: + --double-write-cache-for-legacy-key + --datadog-logging + --statsd-logging + --statsig-logging + --debug-logging - -m, --maximum-concurrent-sdk-keys [default: 1000] - -p, --polling-interval-in-s [default: 10] - -u, --update-batch-size [default: 64] - -r, --redis-leader-key-ttl [default: 70] - --redis-cache-ttl-in-s [default: 86400] + + -m, --maximum-concurrent-sdk-keys + [default: 1000] + -p, --polling-interval-in-s + [default: 10] + -u, --update-batch-size + [default: 64] + -r, --redis-leader-key-ttl + [default: 70] + --redis-cache-ttl-in-s + [default: 86400] + --log-event-process-queue-size + [default: 20000] --force-gcp-profiling-enabled - -g, --grpc-max-concurrent-streams [default: 500] - --clear-external-datastore-on-unauthorized + + -g, --grpc-max-concurrent-streams + [default: 500] + --clear-datastore-on-unauthorized + --x509-server-cert-path + --x509-server-key-path + --x509-client-cert-path - -h, --help Print help + + --enforce-tls + [default: true] [possible values: true, false] + --enforce-mtls + + -h, --help + Print help -V, --version + Print version ``` 1. MODE: This can be configured as grpc or http or grpc-and-http to allow for easy migrations. 2. CACHE: This is for the backup cache. Local uses in process memory to cache backup values, while redis, @@ -53,11 +78,12 @@ Additional logging parameters we support: ``` --debug-logging: This enables state tracking logging that is emitted for various events within the proxy. It will also emit useful data such as latency for some events. +--double-write-cache-for-legacy-key: Starting in version 2.x.x, we use a new key schema in preparation for SDKs managing all key creation in external caches. This flag allows you to migrate gracefully by double-writing to the old key as well. --statsig-logging: Send events to Statsig to monitor performance and behaviour of proxy. --statsd-logging: This will emit the same metrics and events using statsd. --datadog-logging: Same as statsd logging, but uses distribution metrics instead of timing --force_gcp_profiling_enabled: Enable gcp cloud profiler by force.
---clear-external-datastore-on-unauthorized: When a 401/403 is received, clear external caches (such as redis). Noting that this is a potential reliability trade off. +--clear-datastore-on-unauthorized: When a 401/403 is received, clear external caches (such as redis) as well as internal caches. Note that this is a potential reliability trade-off. ``` # Configuration diff --git a/Rocket.toml b/Rocket.toml index 2b7f0b3..63f2a1c 100644 --- a/Rocket.toml +++ b/Rocket.toml @@ -1,5 +1,6 @@ [global] -address = "0.0.0.0" +address = "127.0.0.1" +port = 8001 limits = { json = "5 MiB", string = "5 MiB" } log_level = "critical" workers = 128 diff --git a/entrypoint.sh b/entrypoint.sh new file mode 100644 index 0000000..4936410 --- /dev/null +++ b/entrypoint.sh @@ -0,0 +1,8 @@ +#!/bin/sh +set -e + +# Start Nginx in the background, redirecting output to /dev/null +nginx -g 'daemon off; error_log /dev/stderr error;' > /dev/null 2>&1 & + +# Execute the main application +exec statsig_forward_proxy "$@" diff --git a/nginx.conf b/nginx.conf new file mode 100644 index 0000000..f638c64 --- /dev/null +++ b/nginx.conf @@ -0,0 +1,47 @@ +worker_processes auto; +worker_cpu_affinity auto; + +events { + worker_connections 1024; + multi_accept on; + use epoll; +} + +http { + proxy_cache_path /dev/shm/nginx levels=1:2 keys_zone=download_cache:10m max_size=1024m inactive=1m use_temp_path=off; + + server { + listen 8000; + server_name localhost; + + # Disable caching for /v1/log_event + location /v1/log_event { + proxy_pass http://127.0.0.1:8001; + proxy_set_header Host $host; + proxy_set_header X-Real-IP $remote_addr; + proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; + proxy_set_header X-Forwarded-Proto $scheme; + + # Disable caching for this location + proxy_cache off; + } + + # Cache everything else + location / { + proxy_pass http://127.0.0.1:8001; + proxy_set_header Host $host; + proxy_set_header X-Real-IP $remote_addr; + proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; + proxy_set_header X-Forwarded-Proto $scheme; + + # Enable caching + proxy_cache download_cache; + proxy_cache_methods GET POST; + proxy_cache_key "$request_method$request_uri$http_statsig_api_key$http_accept_encoding$request_body"; + proxy_cache_use_stale updating error timeout http_500 http_502 http_503 http_504; + proxy_cache_background_update on; + proxy_cache_valid 200 5s; + proxy_cache_lock on; + } + } +} diff --git a/src/datastore/caching/mod.rs b/src/datastore/caching/mod.rs index fed0b18..a439ab5 100644 --- a/src/datastore/caching/mod.rs +++ b/src/datastore/caching/mod.rs @@ -1,3 +1,2 @@ pub mod disabled_cache; -pub mod in_memory_cache; pub mod redis_cache; diff --git a/src/datastore/caching/redis_cache.rs b/src/datastore/caching/redis_cache.rs index 039f249..6666f2b 100644 --- a/src/datastore/caching/redis_cache.rs +++ b/src/datastore/caching/redis_cache.rs @@ -33,8 +33,8 @@ pub struct RedisCache { uuid: String, leader_key_ttl: i64, check_lcut: bool, - clear_external_datastore_on_unauthorized: bool, redis_cache_ttl_in_s: i64, + double_write_cache_for_legacy_key: bool, } #[derive(Deserialize, Debug, Clone)] pub struct RedisEnvConfig { @@ -50,14 +50,146 @@ pub struct RedisEnvConfig { const REDIS_LEADER_KEY: &str = "statsig_forward_proxy::leader"; +use async_trait::async_trait; +#[async_trait] +impl HttpDataProviderObserverTrait for RedisCache { + fn force_notifier_to_wait_for_update(&self) -> bool { + false + } + + async fn update( + &self, + result: &DataProviderRequestResult, + request_context: &Arc, + lcut: u64, + data: &Arc, + ) { +
self.update_impl( + self.get_redis_key(request_context).await, + result, + request_context, + lcut, + data, + ) + .await; + + if self.double_write_cache_for_legacy_key { + self.update_impl( + format!( + "{}::{}", + self.key_prefix, + self.hash_key(&request_context.sdk_key).await + ), + result, + request_context, + lcut, + data, + ) + .await; + } + } + + async fn get( + &self, + request_context: &Arc, + ) -> Option> { + let connection = self.connection.get().await; + let redis_key = self.get_redis_key(request_context).await; + match connection { + Ok(mut conn) => { + let res: Result, RedisError> = conn.hget(redis_key, "config").await; + match res { + Ok(data) => { + if data.is_empty() { + ProxyEventObserver::publish_event( + ProxyEvent::new_with_rc( + ProxyEventType::RedisCacheReadMiss, + request_context, + ) + .with_stat(EventStat { + operation_type: OperationType::IncrByValue, + value: 1, + }), + ); + None + } else { + ProxyEventObserver::publish_event( + ProxyEvent::new_with_rc( + ProxyEventType::RedisCacheReadSucceed, + request_context, + ) + .with_stat(EventStat { + operation_type: OperationType::IncrByValue, + value: 1, + }), + ); + + // Only decompress before writing for legacy use case + match request_context.use_gzip && self.double_write_cache_for_legacy_key + { + true => { + let mut compressed = Vec::new(); + let mut encoder = + GzEncoder::new(&mut compressed, Compression::best()); + if let Err(e) = encoder.write_all(&data) { + eprintln!("Failed to gzip data from redis: {:?}", e); + return None; + } + if let Err(e) = encoder.finish() { + eprintln!("Failed to gzip data from redis: {:?}", e); + return None; + } + Some(Arc::new(ResponsePayload { + encoding: Arc::new(Some("gzip".to_string())), + data: Arc::from(Bytes::from(compressed)), + })) + } + false => Some(Arc::new(ResponsePayload { + encoding: Arc::new(None), + data: Arc::from(Bytes::from(data)), + })), + } + } + } + Err(e) => { + ProxyEventObserver::publish_event( + ProxyEvent::new_with_rc( + ProxyEventType::RedisCacheReadFailed, + request_context, + ) + .with_stat(EventStat { + operation_type: OperationType::IncrByValue, + value: 1, + }), + ); + eprintln!("Failed to get key from redis: {:?}", e); + None + } + } + } + Err(e) => { + ProxyEventObserver::publish_event( + ProxyEvent::new_with_rc(ProxyEventType::RedisCacheReadFailed, request_context) + .with_stat(EventStat { + operation_type: OperationType::IncrByValue, + value: 1, + }), + ); + eprintln!("Failed to get connection to redis: {:?}", e); + None + } + } + } +} + impl RedisCache { pub async fn new( key_prefix: String, leader_key_ttl: i64, uuid: &str, check_lcut: bool, - clear_external_datastore_on_unauthorized: bool, redis_cache_ttl_in_s: i64, + double_write_cache_for_legacy_key: bool, ) -> Self { let config = envy::from_env::().expect("Malformed config"); let protocol = match config.redis_tls.is_some_and(|x| x) { @@ -91,11 +223,20 @@ impl RedisCache { uuid: uuid.to_string(), leader_key_ttl, check_lcut, - clear_external_datastore_on_unauthorized, redis_cache_ttl_in_s, + double_write_cache_for_legacy_key, } } + async fn get_redis_key(&self, request_context: &Arc) -> String { + format!( + "statsig|{}|{}|{}", + request_context.path, + request_context.use_gzip, + self.hash_key(&request_context.sdk_key).await + ) + } + async fn hash_key(&self, key: &str) -> String { if self.hash_cache.read().contains_key(key) { return self @@ -108,36 +249,21 @@ impl RedisCache { // Hash key so that we aren't loading a bunch of sdk keys // into memory - let hashed_key = format!("{}::{:x}", 
self.key_prefix, Sha256::digest(key)); + let hashed_key = format!("{:x}", Sha256::digest(key)); self.hash_cache .write() .insert(key.to_string(), hashed_key.clone()); hashed_key } -} - -use async_trait::async_trait; -#[async_trait] -impl HttpDataProviderObserverTrait for RedisCache { - fn force_notifier_to_wait_for_update(&self) -> bool { - false - } - async fn update( + async fn update_impl( &self, + redis_key: String, result: &DataProviderRequestResult, request_context: &Arc, lcut: u64, data: &Arc, ) { - // TODO: This will be a problem if we start using DCS v2 with the forward - // proxy because the redis data adapter currently has no way - // to differentiate between the DCS v1 and DCS v2. - // - // So for now, to keep functionality, continue using just - // the sdk key. - let redis_key = self.hash_key(&request_context.sdk_key).await; - if result == &DataProviderRequestResult::DataAvailable { let connection: Result< bb8::PooledConnection, @@ -185,8 +311,9 @@ impl HttpDataProviderObserverTrait for RedisCache { }; if should_update { + // Only decompress before writing for legacy use case let data_to_write = match request_context.use_gzip - && data.encoding.as_deref() == Some("gzip") + && self.double_write_cache_for_legacy_key { true => { let mut decoder = GzDecoder::new(Cursor::new(&**data.data)); @@ -278,9 +405,7 @@ impl HttpDataProviderObserverTrait for RedisCache { ); } } - } else if result == &DataProviderRequestResult::Unauthorized - && self.clear_external_datastore_on_unauthorized - { + } else if result == &DataProviderRequestResult::Unauthorized { let connection = self.connection.get().await; match connection { Ok(mut conn) => match conn.del(&redis_key).await { @@ -329,102 +454,4 @@ impl HttpDataProviderObserverTrait for RedisCache { } } } - - async fn get( - &self, - request_context: &Arc, - ) -> Option> { - let connection = self.connection.get().await; - let redis_key = self.hash_key(&request_context.sdk_key).await; - match connection { - Ok(mut conn) => { - let mut pipe = redis::pipe(); - let res = pipe - .hget(redis_key.clone(), "encoding") - .hget(redis_key, "config") - .query_async::, Vec)>(&mut *conn) - .await; - - match res { - Ok(payload) => { - if payload.1.is_empty() { - ProxyEventObserver::publish_event( - ProxyEvent::new_with_rc( - ProxyEventType::RedisCacheReadMiss, - request_context, - ) - .with_stat(EventStat { - operation_type: OperationType::IncrByValue, - value: 1, - }), - ); - None - } else { - ProxyEventObserver::publish_event( - ProxyEvent::new_with_rc( - ProxyEventType::RedisCacheReadSucceed, - request_context, - ) - .with_stat(EventStat { - operation_type: OperationType::IncrByValue, - value: 1, - }), - ); - - match request_context.use_gzip - && payload.0.is_some_and(|encoding| encoding == "plain_text") - { - true => { - let mut compressed = Vec::new(); - let mut encoder = - GzEncoder::new(&mut compressed, Compression::best()); - if let Err(e) = encoder.write_all(&payload.1) { - eprintln!("Failed to gzip data from redis: {:?}", e); - return None; - } - if let Err(e) = encoder.finish() { - eprintln!("Failed to gzip data from redis: {:?}", e); - return None; - } - Some(Arc::new(ResponsePayload { - encoding: Arc::new(Some("gzip".to_string())), - data: Arc::new(Bytes::from(compressed)), - })) - } - false => Some(Arc::new(ResponsePayload { - encoding: Arc::new(None), - data: Arc::new(Bytes::from(payload.1)), - })), - } - } - } - Err(e) => { - ProxyEventObserver::publish_event( - ProxyEvent::new_with_rc( - ProxyEventType::RedisCacheReadFailed, - request_context, - 
) - .with_stat(EventStat { - operation_type: OperationType::IncrByValue, - value: 1, - }), - ); - eprintln!("Failed to get key from redis: {:?}", e); - None - } - } - } - Err(e) => { - ProxyEventObserver::publish_event( - ProxyEvent::new_with_rc(ProxyEventType::RedisCacheReadFailed, request_context) - .with_stat(EventStat { - operation_type: OperationType::IncrByValue, - value: 1, - }), - ); - eprintln!("Failed to get connection to redis: {:?}", e); - None - } - } - } } diff --git a/src/datastore/config_spec_store.rs b/src/datastore/config_spec_store.rs index be5285a..1bc6aef 100644 --- a/src/datastore/config_spec_store.rs +++ b/src/datastore/config_spec_store.rs @@ -129,7 +129,16 @@ impl ConfigSpecStore { if !self.sdk_key_store.has_key(request_context) { // Since it's a cache-miss, just fill with a full payload // and check if we should return no update manually - foreground_fetch(self.background_data_provider.clone(), request_context, 0).await; + foreground_fetch( + self.background_data_provider.clone(), + request_context, + 0, + // Since it's a cache-miss, it doesn't matter what we do + // if we receive a 4xx, so no point clearing any + // caches + false, + ) + .await; } // TODO: Since we use peek as an optimization diff --git a/src/datastore/data_providers/background_data_provider.rs b/src/datastore/data_providers/background_data_provider.rs index 2870b03..d396f31 100644 --- a/src/datastore/data_providers/background_data_provider.rs +++ b/src/datastore/data_providers/background_data_provider.rs @@ -5,8 +5,7 @@ use crate::GRACEFUL_SHUTDOWN_TOKEN; use super::http_data_provider::ResponsePayload; use super::request_builder::{CachedRequestBuilders, RequestBuilderTrait}; use super::{http_data_provider::HttpDataProvider, DataProviderRequestResult, DataProviderTrait}; -use std::collections::hash_map::Entry; -use std::{collections::HashMap, sync::Arc}; +use std::sync::Arc; use bytes::Bytes; use tokio::runtime::Handle; @@ -14,31 +13,28 @@ use tokio::sync::RwLock; use tokio::time::timeout; use tokio::time::{Duration, Instant}; +use dashmap::DashMap; + pub struct BackgroundDataProvider { http_data_prover: Arc, polling_interval_in_s: u64, update_batch_size: u64, sdk_key_store: Arc, - foreground_fetch_lock: - RwLock, Arc>>>>, + foreground_fetch_lock: DashMap, Arc>>>, + clear_datastore_on_unauthorized: bool, } pub async fn foreground_fetch( bdp: Arc, request_context: &Arc, since_time: u64, + clear_datastore_on_unauthorized: bool, ) { - let lock_ref: Arc>> = { - let mut master_lock = bdp.foreground_fetch_lock.write().await; - match master_lock.entry(Arc::clone(request_context)) { - Entry::Occupied(entry) => Arc::clone(entry.get()), - Entry::Vacant(entry) => { - let new_lock_ref = Arc::new(RwLock::new(None)); - entry.insert(Arc::clone(&new_lock_ref)); - new_lock_ref - } - } - }; + let lock_ref: Arc>> = bdp + .foreground_fetch_lock + .entry(Arc::clone(request_context)) + .or_insert_with(|| Arc::new(RwLock::new(None))) + .clone(); let should_fetch = { let per_key_lock = lock_ref.read().await; @@ -57,6 +53,7 @@ pub async fn foreground_fetch( vec![(Arc::clone(request_context), since_time)], &bdp.http_data_prover, 1, + clear_datastore_on_unauthorized, ) .await; } @@ -79,13 +76,15 @@ impl BackgroundDataProvider { polling_interval_in_s: u64, update_batch_size: u64, sdk_key_store: Arc, + clear_datastore_on_unauthorized: bool, ) -> Self { BackgroundDataProvider { http_data_prover: data_provider, polling_interval_in_s, update_batch_size, - foreground_fetch_lock: RwLock::new(HashMap::new()), + 
foreground_fetch_lock: DashMap::new(), sdk_key_store, + clear_datastore_on_unauthorized, } } @@ -95,13 +94,15 @@ impl BackgroundDataProvider { let polling_interval_in_s = self.polling_interval_in_s; let sdk_key_store = Arc::clone(&self.sdk_key_store); let graceful_shutdown_token = GRACEFUL_SHUTDOWN_TOKEN.clone(); + let clear_datastore_on_unauthorized = self.clear_datastore_on_unauthorized; rocket::tokio::task::spawn_blocking(move || { Handle::current().block_on(async move { loop { BackgroundDataProvider::impl_foreground_fetch( - sdk_key_store.get_registered_store().await, + sdk_key_store.get_registered_store(), &shared_data_provider, batch_size, + clear_datastore_on_unauthorized, ) .await; @@ -122,6 +123,7 @@ impl BackgroundDataProvider { store_iter: Vec<(Arc, u64)>, data_provider: &Arc, update_batch_size: u64, + clear_datastore_on_unauthorized: bool, ) { match reqwest::Client::builder() .timeout(Duration::from_secs(30)) @@ -151,6 +153,7 @@ impl BackgroundDataProvider { &request_context, lcut, &client_clone, + clear_datastore_on_unauthorized, ), ) .await @@ -189,6 +192,7 @@ impl BackgroundDataProvider { request_context: &Arc, lcut: u64, http_client: &reqwest::Client, + clear_datastore_on_unauthorized: bool, ) { let dp_result = data_provider .get(http_client, &request_builder, request_context, lcut) @@ -226,17 +230,19 @@ impl BackgroundDataProvider { } } DataProviderRequestResult::Unauthorized => { - Self::notify_observers( - &request_builder, - &dp_result.result, - request_context, - lcut, - &(Arc::new(ResponsePayload { - encoding: Arc::new(None), - data: Arc::new(Bytes::new()), - })), - ) - .await; + if clear_datastore_on_unauthorized { + Self::notify_observers( + &request_builder, + &dp_result.result, + request_context, + lcut, + &(Arc::new(ResponsePayload { + encoding: Arc::new(None), + data: Arc::new(Bytes::new()), + })), + ) + .await; + } } _ => {} } diff --git a/src/datastore/data_providers/request_builder.rs b/src/datastore/data_providers/request_builder.rs index 0508313..ea1a35c 100644 --- a/src/datastore/data_providers/request_builder.rs +++ b/src/datastore/data_providers/request_builder.rs @@ -1,9 +1,8 @@ use async_trait::async_trait; use parking_lot::RwLock; -use sha2::{Digest, Sha256}; -use std::{collections::HashMap, sync::Arc, time::Duration}; -use tokio::time::Instant; +use std::{collections::HashMap, sync::Arc}; +use tokio::time::Duration; use crate::{ observers::{ @@ -151,98 +150,3 @@ impl RequestBuilderTrait for DcsRequestBuilder { true } } - -pub struct IdlistRequestBuilder { - pub http_observers: Arc, - pub backup_cache: Arc, - last_request_by_key: RwLock>, - last_response_hash: RwLock>, -} - -impl IdlistRequestBuilder { - pub fn new( - http_observers: Arc, - backup_cache: Arc, - ) -> IdlistRequestBuilder { - IdlistRequestBuilder { - http_observers, - backup_cache, - last_request_by_key: RwLock::new(HashMap::new()), - last_response_hash: RwLock::new(HashMap::new()), - } - } -} - -#[async_trait] -impl RequestBuilderTrait for IdlistRequestBuilder { - async fn make_request( - &self, - http_client: &reqwest::Client, - request_context: &Arc, - _lcut: u64, - ) -> Result { - match http_client - .post("https://api.statsig.com/v1/get_id_lists".to_string()) - .header("statsig-api-key", request_context.sdk_key.clone()) - .body("{}".to_string()) - .send() - .await - { - Ok(response) => { - let status_code = response.status().as_u16(); - // If unauthorized, remove key from last response hash such that - // we will reload data into memory if for some reason the key is - // 
re-authorized - if status_code == 401 || status_code == 403 { - self.last_response_hash - .write() - .remove(&request_context.sdk_key); - } - Ok(response) - } - Err(e) => Err(e), - } - } - - async fn is_an_update(&self, body: &str, sdk_key: &str) -> bool { - let hash = format!("{:x}", Sha256::digest(body)); - let mut wlock = self.last_response_hash.write(); - let mut is_an_update = true; - if let Some(old_hash) = wlock.get(sdk_key) { - is_an_update = hash != *old_hash; - } - - if is_an_update { - wlock.insert(sdk_key.to_string(), hash); - } - - is_an_update - } - - fn get_observers(&self) -> Arc { - Arc::clone(&self.http_observers) - } - - fn get_backup_cache(&self) -> Arc { - Arc::clone(&self.backup_cache) - } - - async fn should_make_request(&self, rc: &Arc) -> bool { - let mut wlock = self.last_request_by_key.write(); - let key = rc.to_string(); - match wlock.get_mut(&key) { - Some(last_request) => { - if last_request.elapsed() > Duration::from_secs(60) { - wlock.insert(key, Instant::now()); - return true; - } - - return false; - } - None => { - wlock.insert(key, Instant::now()); - return true; - } - } - } -} diff --git a/src/datastore/mod.rs b/src/datastore/mod.rs index f16cb6a..fd9bd36 100644 --- a/src/datastore/mod.rs +++ b/src/datastore/mod.rs @@ -1,6 +1,5 @@ pub mod caching; pub mod config_spec_store; pub mod data_providers; -pub mod get_id_list_store; pub mod log_event_store; pub mod sdk_key_store; diff --git a/src/datastore/sdk_key_store.rs b/src/datastore/sdk_key_store.rs index a698755..d4e2ee7 100644 --- a/src/datastore/sdk_key_store.rs +++ b/src/datastore/sdk_key_store.rs @@ -70,7 +70,7 @@ impl SdkKeyStore { self.keystore.read().contains_key(request_context) } - pub async fn get_registered_store(&self) -> Vec<(Arc, u64)> { + pub fn get_registered_store(&self) -> Vec<(Arc, u64)> { self.keystore .read() .iter() diff --git a/src/server.rs b/src/server.rs index 4bc8521..16593ee 100644 --- a/src/server.rs +++ b/src/server.rs @@ -7,11 +7,9 @@ use datastore::caching::disabled_cache; use datastore::config_spec_store::ConfigSpecStore; use datastore::data_providers::request_builder::CachedRequestBuilders; use datastore::data_providers::request_builder::DcsRequestBuilder; -use datastore::data_providers::request_builder::IdlistRequestBuilder; -use datastore::get_id_list_store::GetIdListStore; use datastore::log_event_store::LogEventStore; use datastore::{ - caching::{in_memory_cache, redis_cache}, + caching::redis_cache, data_providers::{background_data_provider, http_data_provider}, sdk_key_store, }; @@ -52,6 +50,8 @@ pub struct Cli { mode: TransportMode, #[arg(value_enum)] cache: CacheMode, + #[clap(long, action)] + double_write_cache_for_legacy_key: bool, // Deprecated: Same as statsd logging #[clap(long, action)] datadog_logging: bool, @@ -77,12 +77,14 @@ pub struct Cli { force_gcp_profiling_enabled: bool, #[clap(short, long, default_value = "500")] grpc_max_concurrent_streams: u32, - // If you set this flag, you do not need an external process - // to clean up the external datastore if your key becomes unauthorized. - // The downside is that if there are issues with auth upstream, you might create - // inavailability of the config. + // By default, this configuration is disabled, which ensures that if there are + // any issues with authorization on Statsig's end, you still have a payload + // to serve.
+ // + // This means that if you delete a key, its entry will not be cleared from + // internal or external caches until the service is restarted. #[clap(long, action)] - clear_external_datastore_on_unauthorized: bool, + clear_datastore_on_unauthorized: bool, // Authorization and TLS Configuration: #[clap(long, default_value = None)] @@ -115,7 +117,6 @@ enum TransportMode { #[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, ValueEnum)] enum CacheMode { Disabled, - Local, Redis, } @@ -127,8 +128,8 @@ async fn try_initialize_statsig_sdk_and_profiling(cli: &Cli, config: &Configurat // to decide whether or not to enable GCP Profiling api_for_download_config_specs: match cli.mode { TransportMode::Grpc => "https://api.statsigcdn.com/v1".to_string(), - TransportMode::Http => "http://0.0.0.0:8000/v1".to_string(), - TransportMode::GrpcAndHttp => "http://0.0.0.0:8000/v1".to_string(), + TransportMode::Http => "http://127.0.0.1:8001/v1".to_string(), + TransportMode::GrpcAndHttp => "http://127.0.0.1:8001/v1".to_string(), }, disable_user_agent_support: true, ..StatsigOptions::default() @@ -219,29 +220,6 @@ async fn create_config_spec_store( config_spec_store } -async fn create_id_list_store( - _overrides: &ConfigurationAndOverrides, - background_data_provider: Arc, - idlist_observer: Arc, - shared_cache: &Arc, - sdk_key_store: &Arc, -) -> Arc { - let idlist_request_builder = Arc::new(IdlistRequestBuilder::new( - Arc::clone(&idlist_observer), - Arc::clone(shared_cache), - )); - CachedRequestBuilders::add_request_builder("/v1/get_id_lists/", idlist_request_builder); - let id_list_store = Arc::new(datastore::get_id_list_store::GetIdListStore::new( - sdk_key_store.clone(), - background_data_provider.clone(), - )); - idlist_observer.add_observer(sdk_key_store.clone()).await; - idlist_observer.add_observer(id_list_store.clone()).await; - idlist_observer.add_observer(Arc::clone(shared_cache)).await; - - id_list_store -} - async fn create_log_event_store( http_client: reqwest::Client, config: &ConfigurationAndOverrides, @@ -286,43 +264,23 @@ async fn main() -> Result<(), Box> { cli.polling_interval_in_s, cli.update_batch_size, Arc::clone(&sdk_key_store), + cli.clear_datastore_on_unauthorized, )); let cache_uuid = Uuid::new_v4().to_string(); let config_spec_cache: Arc = match cli.cache { - CacheMode::Local => Arc::new(in_memory_cache::InMemoryCache::new( - cli.maximum_concurrent_sdk_keys, - )), CacheMode::Redis => Arc::new( redis_cache::RedisCache::new( "statsig".to_string(), cli.redis_leader_key_ttl, &cache_uuid, true, /* check lcut */ - cli.clear_external_datastore_on_unauthorized, - cli.redis_cache_ttl_in_s, - ) - .await, - ), - CacheMode::Disabled => Arc::new(disabled_cache::DisabledCache::default()), - }; - let id_list_cache: Arc = match cli.cache { - CacheMode::Local => Arc::new(in_memory_cache::InMemoryCache::new( - cli.maximum_concurrent_sdk_keys, - )), - CacheMode::Redis => Arc::new( - redis_cache::RedisCache::new( - "statsig_id_list".to_string(), - cli.redis_leader_key_ttl, - &cache_uuid, - false, /* check lcut */ - cli.clear_external_datastore_on_unauthorized, cli.redis_cache_ttl_in_s, + cli.double_write_cache_for_legacy_key, ) .await, ), CacheMode::Disabled => Arc::new(disabled_cache::DisabledCache::default()), }; - let config_spec_observer = Arc::new(HttpDataProviderObserver::new()); let config_spec_store = create_config_spec_store( &cli, @@ -333,15 +291,6 @@ async fn main() -> Result<(), Box> { &sdk_key_store, ) .await; - let idlist_observer = Arc::new(HttpDataProviderObserver::new()); - let id_list_store =
create_id_list_store( - &overrides, - Arc::clone(&background_data_provider), - Arc::clone(&idlist_observer), - &id_list_cache, - &sdk_key_store, - ) - .await; background_data_provider.start_background_thread().await; let rc_cache = Arc::new(AuthorizedRequestContextCache::new()); // Default buffer size is 20000 messages @@ -366,7 +315,6 @@ async fn main() -> Result<(), Box> { servers::http_server::HttpServer::start_server( &cli, config_spec_store, - id_list_store, log_event_store, rc_cache, ) @@ -382,7 +330,6 @@ async fn main() -> Result<(), Box> { let http_server = servers::http_server::HttpServer::start_server( &cli, config_spec_store, - id_list_store, log_event_store, rc_cache, ); diff --git a/src/servers/http_server.rs b/src/servers/http_server.rs index fa01aa1..0b7d3e3 100644 --- a/src/servers/http_server.rs +++ b/src/servers/http_server.rs @@ -2,14 +2,12 @@ use std::collections::HashMap; use std::io::Cursor; use std::sync::Arc; -use crate::datastore::data_providers::http_data_provider::ResponsePayload; -use crate::datastore::get_id_list_store::GetIdListStore; - use crate::datastore::log_event_store::LogEventStore; use crate::datatypes::gzip_data::LoggedBodyJSON; use crate::datatypes::log_event::LogEventRequest; use crate::datatypes::log_event::LogEventResponse; +use crate::http_data_provider::ResponsePayload; use crate::observers::EventStat; use crate::observers::OperationType; use crate::observers::{ProxyEvent, ProxyEventType}; @@ -168,17 +166,6 @@ async fn post_download_config_specs( } } -#[post("/get_id_lists")] -async fn post_get_id_lists( - get_id_list_store: &State>, - authorized_rc: AuthorizedRequestContextWrapper, -) -> RequestPayloads { - match get_id_list_store.get_id_lists(&authorized_rc.inner()).await { - Some(data) => RequestPayloads::Plain(Arc::clone(&data.idlists.data)), - None => RequestPayloads::Unauthorized(), - } -} - #[post("/log_event", data = "")] async fn post_log_event( log_event_store: &State>, @@ -211,7 +198,6 @@ impl HttpServer { pub async fn start_server( cli: &Cli, config_spec_store: Arc, - id_list_store: Arc, log_event_store: Arc, rc_cache: Arc, ) -> Result<(), Box> { @@ -223,7 +209,6 @@ impl HttpServer { routes![ get_download_config_specs, post_download_config_specs, - post_get_id_lists, post_log_event, http_apis::healthchecks::startup, http_apis::healthchecks::ready, @@ -235,7 +220,6 @@ impl HttpServer { routes![get_download_config_specs, post_download_config_specs], ) .manage(config_spec_store) - .manage(id_list_store) .manage(log_event_store) .manage(rc_cache) .manage(sdk_key_cache)
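A note on the Redis key schema for reviewers: the sketch below is not part of the change itself, but contrasts the new key produced by `get_redis_key` with the legacy key that `--double-write-cache-for-legacy-key` keeps writing. Only the format strings mirror the diff above; the sample path and SDK key are hypothetical.

```rust
use sha2::{Digest, Sha256}; // assumes sha2 = "0.10"

// New schema (see get_redis_key): statsig|{path}|{use_gzip}|{sha256(sdk_key)}
fn new_redis_key(path: &str, use_gzip: bool, sdk_key: &str) -> String {
    format!("statsig|{}|{}|{:x}", path, use_gzip, Sha256::digest(sdk_key))
}

// Legacy schema still written when double-writing: {key_prefix}::{sha256(sdk_key)}
fn legacy_redis_key(key_prefix: &str, sdk_key: &str) -> String {
    format!("{}::{:x}", key_prefix, Sha256::digest(sdk_key))
}

fn main() {
    // Hypothetical values, for illustration only.
    let sdk_key = "secret-example-key";
    println!("{}", new_redis_key("/v1/download_config_specs", true, sdk_key));
    println!("{}", legacy_redis_key("statsig", sdk_key));
}
```

Because the new key includes the request path and the gzip flag, a single SDK key now maps to multiple cache entries, which is why the legacy double-write exists as a migration bridge.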
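Similarly, the `foreground_fetch_lock` change swaps the `RwLock`-wrapped `HashMap` for a `DashMap` so the per-request-context lock can be created atomically with `entry().or_insert_with()` instead of taking a map-wide write lock. A minimal sketch of that pattern, assuming a plain `String` key in place of the real `Arc`-wrapped request context:

```rust
use std::sync::Arc;

use dashmap::DashMap; // assumes dashmap = "5"
use tokio::sync::RwLock;
use tokio::time::Instant;

// String stands in for the Arc-wrapped AuthorizedRequestContext used in the diff.
type Locks = DashMap<String, Arc<RwLock<Option<Instant>>>>;

// Get (or lazily create) the per-key lock without locking the whole map for writes.
fn lock_for_key(locks: &Locks, key: &str) -> Arc<RwLock<Option<Instant>>> {
    locks
        .entry(key.to_string())
        .or_insert_with(|| Arc::new(RwLock::new(None)))
        .clone()
}

#[tokio::main]
async fn main() {
    let locks: Locks = DashMap::new();
    let lock = lock_for_key(&locks, "sdk-key-example");
    // Record when the last foreground fetch happened for this key.
    *lock.write().await = Some(Instant::now());
}
```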