Merge pull request #11 from statsig-io/09-17-performance_optimizations
Performance optimizations
ealui-statsig authored Sep 17, 2024
2 parents 212fb9b + 348e2b5 commit f821e79
Showing 11 changed files with 157 additions and 150 deletions.
26 changes: 24 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 2 additions & 0 deletions Cargo.toml
@@ -35,6 +35,8 @@ serde_with = "3.9.0"
 libflate = "2.1.0"
 serde_json = "1.0.120"
 once_cell = "1.19.0"
+parking_lot = "0.12.3"
+dashmap = "6.1.0"
 
 
 [build-dependencies]
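The two added crates carry most of this PR: parking_lot provides a synchronous RwLock whose guards are never .awaited, and dashmap provides a sharded concurrent map that replaces RwLock<HashMap<...>>. A minimal sketch of the two primitives, illustrative only and not code from this repo:

use std::sync::Arc;

use dashmap::DashMap;
use parking_lot::RwLock;

fn main() {
    // parking_lot::RwLock is synchronous: read() returns a guard directly,
    // so it can be used from non-async helpers and never yields to the executor.
    let config = Arc::new(RwLock::new(String::from("v1")));
    println!("config = {}", *config.read());

    // DashMap shards keys across many internal locks, so operations on
    // different keys do not contend on a single map-wide lock.
    let counts: DashMap<&str, u64> = DashMap::new();
    counts.insert("a", 1);
    if let Some(mut v) = counts.get_mut("a") {
        *v += 1;
    }
    println!("a = {}", *counts.get("a").unwrap());
}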
6 changes: 2 additions & 4 deletions src/datastore/caching/redis_cache.rs
@@ -1,8 +1,8 @@
 use std::{collections::HashMap, sync::Arc};
 
 use bb8_redis::{redis::AsyncCommands, RedisConnectionManager};
+use parking_lot::RwLock;
 use redis::aio::MultiplexedConnection;
-use tokio::sync::RwLock;
 
 use crate::{
     datastore::data_providers::DataProviderRequestResult,
@@ -91,11 +91,10 @@ impl RedisCache {
     }
 
     async fn hash_key(&self, key: &str) -> String {
-        if self.hash_cache.read().await.contains_key(key) {
+        if self.hash_cache.read().contains_key(key) {
             return self
                 .hash_cache
                 .read()
-                .await
                 .get(key)
                 .expect("Must have key")
                 .to_string();
@@ -106,7 +105,6 @@ impl RedisCache {
         let hashed_key = format!("{}::{:x}", self.key_prefix, Sha256::digest(key));
         self.hash_cache
             .write()
-            .await
             .insert(key.to_string(), hashed_key.clone());
         hashed_key
     }
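With parking_lot the hash_key path loses its .await points entirely, since lock guards are plain RAII values. It still takes the read lock twice (once for contains_key, once for get); below is a sketch of the same check-then-insert pattern collapsed to one lookup, with hypothetical field types rather than this file's exact ones. The race where two callers miss simultaneously and both compute the hash is benign, because both insert the same value:

use std::collections::HashMap;

use parking_lot::RwLock;
use sha2::{Digest, Sha256};

struct KeyHasher {
    key_prefix: String,
    hash_cache: RwLock<HashMap<String, String>>,
}

impl KeyHasher {
    fn hash_key(&self, key: &str) -> String {
        // Fast path: a single read lock, released at the end of the `if let`.
        if let Some(hashed) = self.hash_cache.read().get(key) {
            return hashed.clone();
        }
        // Slow path: hash outside any lock, then hold the write lock briefly.
        let hashed_key = format!("{}::{:x}", self.key_prefix, Sha256::digest(key));
        self.hash_cache
            .write()
            .insert(key.to_string(), hashed_key.clone());
        hashed_key
    }
}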
100 changes: 50 additions & 50 deletions src/datastore/config_spec_store.rs
@@ -7,11 +7,11 @@ use crate::observers::{
     EventStat, HttpDataProviderObserverTrait, OperationType, ProxyEvent, ProxyEventType,
 };
 use crate::servers::http_server::AuthorizedRequestContext;
-use std::collections::HashMap;
+use dashmap::DashMap;
 
 use chrono::Utc;
+use parking_lot::RwLock;
 use std::sync::Arc;
-use tokio::sync::RwLock;
 
 #[derive(Clone, Debug)]
 pub struct ConfigSpecForCompany {
@@ -20,7 +20,7 @@ pub struct ConfigSpecForCompany {
 }
 
 pub struct ConfigSpecStore {
-    store: Arc<RwLock<HashMap<AuthorizedRequestContext, Arc<RwLock<ConfigSpecForCompany>>>>>,
+    store: Arc<DashMap<AuthorizedRequestContext, Arc<RwLock<ConfigSpecForCompany>>>>,
     sdk_key_store: Arc<SdkKeyStore>,
     background_data_provider: Arc<BackgroundDataProvider>,
     no_update_payload: Arc<String>,
@@ -40,37 +40,39 @@ impl HttpDataProviderObserverTrait for ConfigSpecStore {
         lcut: u64,
         data: &Arc<String>,
     ) {
-        let record = self.store.read().await.get(request_context).cloned();
-        if (result == &DataProviderRequestResult::Error
-            || result == &DataProviderRequestResult::DataAvailable)
-            && record.is_none()
-        {
-            self.store.write().await.insert(
-                request_context.clone(),
-                Arc::new(RwLock::new(ConfigSpecForCompany {
-                    lcut,
-                    config: data.clone(),
-                })),
-            );
-        } else if result == &DataProviderRequestResult::DataAvailable {
-            let stored_lcut = match record {
-                Some(record) => record.read().await.lcut,
-                None => 0,
-            };
-
-            // If LCUT is not newer, then there is nothing to save
-            if stored_lcut >= lcut {
-                return;
-            // Lcut is newer than stored lcut, so update everything
-            } else {
-                let hm_r_lock = self.store.read().await;
-                let mut w_lock = hm_r_lock
-                    .get(request_context)
-                    .expect("Record must exist")
-                    .write()
-                    .await;
-                w_lock.lcut = lcut;
-                w_lock.config = data.clone();
-            };
+        let should_insert = result == &DataProviderRequestResult::Error
+            || (result == &DataProviderRequestResult::DataAvailable
+                && !self.store.contains_key(request_context));
+
+        if should_insert {
+            let rc = request_context.clone();
+            let new_data = Arc::new(RwLock::new(ConfigSpecForCompany {
+                lcut,
+                config: data.clone(),
+            }));
+            self.store.insert(rc, new_data);
+            return;
+        }
+
+        if result == &DataProviderRequestResult::DataAvailable {
+            let stored_lcut = self
+                .store
+                .get(request_context)
+                .map(|record| record.read().lcut)
+                .unwrap_or(0);
+
+            if lcut > stored_lcut {
+                let new_data = ConfigSpecForCompany {
+                    lcut,
+                    config: data.clone(),
+                };
+                if let Some(entry) = self.store.get_mut(request_context) {
+                    *entry.write() = new_data;
+                } else {
+                    let rc = request_context.clone();
+                    let wrapped_data = Arc::new(RwLock::new(new_data));
+                    self.store.insert(rc, wrapped_data);
+                }
 
                 ProxyEventObserver::publish_event(
                     ProxyEvent::new_with_rc(
@@ -85,16 +87,15 @@ impl HttpDataProviderObserverTrait for ConfigSpecStore {
                 )
                 .await;
             }
-        } else if result == &DataProviderRequestResult::Unauthorized && record.is_some() {
-            self.store.write().await.remove(request_context);
+        } else if result == &DataProviderRequestResult::Unauthorized {
+            self.store.remove(request_context);
         }
     }
 
     async fn get(&self, request_context: &AuthorizedRequestContext) -> Option<Arc<String>> {
-        match self.store.read().await.get(request_context) {
-            Some(record) => Some(record.read().await.config.clone()),
-            None => None,
-        }
+        self.store
+            .get(request_context)
+            .map(|record| record.read().config.clone())
     }
 }

@@ -104,7 +105,7 @@ impl ConfigSpecStore {
         background_data_provider: Arc<BackgroundDataProvider>,
     ) -> Self {
        ConfigSpecStore {
-            store: Arc::new(RwLock::new(HashMap::new())),
+            store: Arc::new(DashMap::new()),
             sdk_key_store,
             background_data_provider,
             no_update_payload: Arc::new("{\"has_updates\":false}".to_string()),
@@ -134,20 +135,19 @@ impl ConfigSpecStore {
         //
         // If the payload for sinceTime 0 is greater than since_time
         // then return the full payload.
-        let read_lock = self.store.read().await;
-        let record = read_lock.get(request_context);
+        let record = self.store.get(request_context).map(|r| r.clone());
 
         match record {
             Some(record) => {
-                if record.read().await.lcut > since_time {
-                    Some(Arc::clone(record))
+                // Move the read operation outside the lock
+                let lcut = record.read().lcut;
+                if lcut > since_time {
+                    Some(record)
                 } else {
-                    Some(Arc::new(
-                        ConfigSpecForCompany {
-                            lcut: since_time,
-                            config: self.no_update_payload.clone(),
-                        }
-                        .into(),
-                    ))
+                    Some(Arc::new(RwLock::new(ConfigSpecForCompany {
+                        lcut: since_time,
+                        config: Arc::clone(&self.no_update_payload),
+                    })))
                 }
             }
             None => {
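The structural change here: the outer tokio::sync::RwLock<HashMap<...>> becomes a DashMap, so no store-wide write lock remains, and each company record keeps its own (now synchronous) RwLock for in-place updates. A toy sketch of the resulting two-level locking, with a hypothetical Record type and string keys standing in for this file's real types:

use std::sync::Arc;

use dashmap::DashMap;
use parking_lot::RwLock;

struct Record {
    lcut: u64,
    config: Arc<String>,
}

fn main() {
    let store: Arc<DashMap<String, Arc<RwLock<Record>>>> = Arc::new(DashMap::new());

    store.insert(
        "sdk-key-1".into(),
        Arc::new(RwLock::new(Record { lcut: 100, config: Arc::new("v1".into()) })),
    );

    // Read path: a per-shard lock plus a per-record read lock, no global lock.
    if let Some(record) = store.get("sdk-key-1") {
        println!("lcut = {}", record.read().lcut);
    }

    // Write path mirrors the PR's "only update if lcut is newer" check.
    let (incoming_lcut, payload) = (200, Arc::new(String::from("v2")));
    if let Some(record) = store.get("sdk-key-1") {
        let mut guard = record.write();
        if incoming_lcut > guard.lcut {
            guard.lcut = incoming_lcut;
            guard.config = payload;
        }
    }
}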
78 changes: 39 additions & 39 deletions src/datastore/data_providers/background_data_provider.rs
@@ -3,9 +3,9 @@ use crate::servers::http_server::AuthorizedRequestContext;
 
 use super::request_builder::CachedRequestBuilders;
 use super::{http_data_provider::HttpDataProvider, DataProviderRequestResult, DataProviderTrait};
+use std::collections::hash_map::Entry;
 use std::{collections::HashMap, sync::Arc};
 
-
 use tokio::sync::RwLock;
 use tokio::time::{sleep, Duration, Instant};
@@ -23,40 +23,47 @@ pub async fn foreground_fetch(
     since_time: u64,
     sdk_key_store: Arc<SdkKeyStore>,
 ) {
-    let mut master_lock = bdp.foreground_fetch_lock.write().await;
-    let lock_ref: Arc<RwLock<Option<Instant>>> = match master_lock.contains_key(request_context) {
-        true => Arc::clone(
-            master_lock
-                .get(request_context)
-                .expect("validated existence"),
-        ),
-        false => {
-            let new_lock_ref: Arc<RwLock<Option<Instant>>> = Arc::new(RwLock::new(None));
-            master_lock.insert(request_context.clone(), Arc::clone(&new_lock_ref));
-            new_lock_ref
-        }
-    };
-    // Explicitly drop the master lock to allow other sdk keys to be processed
-    drop(master_lock);
+    let lock_ref: Arc<RwLock<Option<Instant>>> = {
+        let mut master_lock = bdp.foreground_fetch_lock.write().await;
+        match master_lock.entry(request_context.clone()) {
+            Entry::Occupied(entry) => Arc::clone(entry.get()),
+            Entry::Vacant(entry) => {
+                let new_lock_ref = Arc::new(RwLock::new(None));
+                entry.insert(Arc::clone(&new_lock_ref));
+                new_lock_ref
+            }
+        }
+    };
 
     // If already initialized, and we checked in the last minute
     // then return
-    let mut per_key_lock = lock_ref.write().await;
-    if let Some(init_time) = *per_key_lock {
-        if !sdk_key_store.has_key(request_context).await
-            && Instant::now().duration_since(init_time) < Duration::from_secs(60)
-        {
-            return;
-        }
-    }
-
-    // If we won the race, then initialize and set
-    // has initialized to true
-    *per_key_lock = Some(Instant::now());
-    let mut data = HashMap::new();
-    data.insert(request_context.clone(), since_time);
-    BackgroundDataProvider::impl_foreground_fetch(data.into_iter(), &bdp.http_data_prover, 1).await;
+    let should_fetch = {
+        let per_key_lock = lock_ref.read().await;
+        match *per_key_lock {
+            Some(init_time) => {
+                sdk_key_store.has_key(request_context).await
+                    || Instant::now().duration_since(init_time) >= Duration::from_secs(60)
+            }
+            None => true,
+        }
+    };
+
+    if should_fetch {
+        let mut per_key_lock = lock_ref.write().await;
+        // Double-check in case another thread updated while we were waiting for the write lock
+        if per_key_lock.is_none() || per_key_lock.unwrap().elapsed() >= Duration::from_secs(60) {
+            // Release the lock before the potentially long-running operation
+            *per_key_lock = Some(Instant::now());
+            drop(per_key_lock);
+
+            let mut data = HashMap::new();
+            data.insert(request_context.clone(), since_time);
+            BackgroundDataProvider::impl_foreground_fetch(
+                data.into_iter(),
+                &bdp.http_data_prover,
+                1,
+            )
+            .await;
+        }
+    }
 }
 
 impl BackgroundDataProvider {
@@ -98,11 +105,10 @@ impl BackgroundDataProvider {
         data_provider: &Arc<HttpDataProvider>,
         update_batch_size: u64,
     ) {
-        let mut join_handles = Vec::new();
+        let mut join_handles = Vec::with_capacity(update_batch_size as usize);
 
         for (request_context, lcut) in store_iter {
-            let request_builder =
-                CachedRequestBuilders::get_request_builder(&request_context.path).await;
+            let request_builder = CachedRequestBuilders::get_request_builder(&request_context.path);
             if !request_builder.should_make_request(&request_context).await {
                 continue;
             }
@@ -117,11 +123,6 @@ impl BackgroundDataProvider {
                 .data
                 .expect("If data is available, data must exist");
 
-            // [For DCS] Occassionally, origin will return the same payload multiple times
-            // This could happen from cache inconsistency. If this happens,
-            // rather than notifying, just skip updating anyone.
-            //
-            // This allows all listeners to not worry about doing double work
             if !request_context.use_lcut || lcut != data.1 {
                 request_builder
                     .get_observers()
@@ -154,8 +155,7 @@
 
             join_handles.push(join_handle);
             if join_handles.len() >= (update_batch_size as usize) {
-                futures::future::join_all(join_handles).await;
-                join_handles = Vec::new();
+                futures::future::join_all(join_handles.drain(..)).await;
             }
         }
         futures::future::join_all(join_handles).await;
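foreground_fetch now uses a double-checked throttle: a read lock answers "has someone fetched within the last minute?", and only when a fetch looks due does the caller take the write lock and re-validate, since another task may have claimed the fetch while it waited. A standalone sketch of that pattern with tokio primitives, simplified and with hypothetical names:

use std::sync::Arc;

use tokio::sync::RwLock;
use tokio::time::{Duration, Instant};

// Returns true if this caller won the right to run the expensive fetch.
async fn try_claim_fetch(slot: &Arc<RwLock<Option<Instant>>>, ttl: Duration) -> bool {
    // First check under the read lock: cheap, and many tasks can hold it at once.
    {
        let last = slot.read().await;
        if let Some(t) = *last {
            if t.elapsed() < ttl {
                return false; // fetched recently, nothing to do
            }
        }
    }
    // Second check under the write lock: re-validate, because another task
    // may have claimed the fetch while we waited for the guard.
    let mut last = slot.write().await;
    match *last {
        Some(t) if t.elapsed() < ttl => false,
        _ => {
            *last = Some(Instant::now());
            true // caller drops the guard, then runs the long fetch
        }
    }
}

#[tokio::main]
async fn main() {
    let slot = Arc::new(RwLock::new(None));
    assert!(try_claim_fetch(&slot, Duration::from_secs(60)).await);
    assert!(!try_claim_fetch(&slot, Duration::from_secs(60)).await);
}

The join_handles change in the same file is a smaller win of the same flavor: draining the vector between batches reuses its allocation instead of building a fresh Vec for every batch.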