Skip to content

Commit

Permalink
feat: delta api implementation (#626)
Browse files Browse the repository at this point in the history
* feat: git status

* Update

* Update

* Update

* Finish test

* fix: undo deadlock hold on value() from dashmap

* Fix

* Fix

* Fix

* Fix

* Fix

* Fix

* Fix

* Fix

* Add flag

* Fix

---------

Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
Co-authored-by: Christopher Kolstad <[email protected]>
  • Loading branch information
3 people authored Jan 10, 2025
1 parent ccf4114 commit cb95f1a
Show file tree
Hide file tree
Showing 17 changed files with 454 additions and 28 deletions.
3 changes: 3 additions & 0 deletions server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -116,3 +116,6 @@ tracing-test = "0.2.5"

[build-dependencies]
shadow-rs = "0.37.0"

[features]
delta = []
2 changes: 1 addition & 1 deletion server/src/auth/token_validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use dashmap::DashMap;
use tracing::trace;
use unleash_types::Upsert;

use crate::http::feature_refresher::FeatureRefresher;
use crate::http::refresher::feature_refresher::FeatureRefresher;
use crate::http::unleash_client::UnleashClient;
use crate::persistence::EdgePersistence;
use crate::types::{
Expand Down
6 changes: 4 additions & 2 deletions server/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use unleash_yggdrasil::EngineState;

use crate::cli::RedisMode;
use crate::feature_cache::FeatureCache;
use crate::http::feature_refresher::{FeatureRefreshConfig, FeatureRefresherMode};
use crate::http::refresher::feature_refresher::{FeatureRefreshConfig, FeatureRefresherMode};
use crate::http::unleash_client::{new_reqwest_client, ClientMetaInformation};
use crate::offline::offline_hotload::{load_bootstrap, load_offline_engine_cache};
use crate::persistence::file::FilePersister;
Expand All @@ -23,7 +23,7 @@ use crate::{
auth::token_validator::TokenValidator,
cli::{CliArgs, EdgeArgs, EdgeMode, OfflineArgs},
error::EdgeError,
http::{feature_refresher::FeatureRefresher, unleash_client::UnleashClient},
http::{refresher::feature_refresher::FeatureRefresher, unleash_client::UnleashClient},
types::{EdgeResult, EdgeToken, TokenType},
};

Expand Down Expand Up @@ -270,6 +270,7 @@ async fn build_edge(
Duration::seconds(args.features_refresh_interval_seconds as i64),
refresher_mode,
client_meta_information,
args.delta,
);
let feature_refresher = Arc::new(FeatureRefresher::new(
unleash_client,
Expand Down Expand Up @@ -385,6 +386,7 @@ mod tests {
prometheus_password: None,
prometheus_username: None,
streaming: false,
delta: false,
};

let result = build_edge(
Expand Down
4 changes: 4 additions & 0 deletions server/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,10 @@ pub struct EdgeArgs {
#[clap(long, env, default_value_t = false, requires = "strict")]
pub streaming: bool,

/// If set to true. Edge connects to upstream using delta polling instead of normal polling. This is experimental feature and might and change.
#[clap(long, env, default_value_t = false, conflicts_with = "streaming")]
pub delta: bool,

/// Sets a remote write url for prometheus metrics, if this is set, prometheus metrics will be written upstream
#[clap(long, env)]
pub prometheus_remote_write_url: Option<String>,
Expand Down
3 changes: 2 additions & 1 deletion server/src/client_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::filters::{
filter_client_features, name_match_filter, name_prefix_filter, project_filter, FeatureFilterSet,
};
use crate::http::broadcaster::Broadcaster;
use crate::http::feature_refresher::FeatureRefresher;
use crate::http::refresher::feature_refresher::FeatureRefresher;
use crate::metrics::client_metrics::MetricsCache;
use crate::tokens::cache_key;
use crate::types::{
Expand Down Expand Up @@ -1012,6 +1012,7 @@ mod tests {
strict: false,
streaming: false,
client_meta_information: ClientMetaInformation::test_config(),
delta: false,
});
let token_validator = Arc::new(TokenValidator {
unleash_client: unleash_client.clone(),
Expand Down
19 changes: 18 additions & 1 deletion server/src/feature_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use unleash_types::{
client_features::{ClientFeature, ClientFeatures, Segment},
Deduplicate,
};

use unleash_types::client_features::ClientFeaturesDelta;
use crate::types::EdgeToken;

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -67,6 +67,23 @@ impl FeatureCache {
self.send_full_update(key);
}

pub fn apply_delta(&self, key: String, delta: &ClientFeaturesDelta) {
let client_features = ClientFeatures {
version : 2,
features : delta.updated.clone(),
segments: delta.segments.clone(),
query: None,
meta: None,
};
self.features
.entry(key.clone())
.and_modify(|existing_features| {
existing_features.modify_in_place(delta);
})
.or_insert(client_features);
self.send_full_update(key);
}

pub fn is_empty(&self) -> bool {
self.features.is_empty()
}
Expand Down
2 changes: 1 addition & 1 deletion server/src/http/background_send_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use crate::{
metrics::client_metrics::{size_of_batch, MetricsCache},
};

use super::feature_refresher::FeatureRefresher;
use super::refresher::feature_refresher::FeatureRefresher;

lazy_static! {
pub static ref METRICS_UPSTREAM_HTTP_ERRORS: IntGaugeVec = register_int_gauge_vec!(
Expand Down
2 changes: 1 addition & 1 deletion server/src/http/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#[cfg(not(tarpaulin_include))]
pub mod background_send_metrics;
pub mod broadcaster;
pub mod feature_refresher;
pub(crate) mod headers;
pub mod unleash_client;
pub mod refresher;
271 changes: 271 additions & 0 deletions server/src/http/refresher/delta_refresher.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,271 @@
use actix_web::http::header::EntityTag;
use reqwest::StatusCode;
use tracing::{debug, info, warn};
use unleash_types::client_features::{ClientFeaturesDelta};
use unleash_yggdrasil::EngineState;

use crate::error::{EdgeError, FeatureError};
use crate::types::{ClientFeaturesDeltaResponse, ClientFeaturesRequest, EdgeToken, TokenRefresh};
use crate::http::refresher::feature_refresher::FeatureRefresher;
use crate::tokens::cache_key;

impl FeatureRefresher {
async fn handle_client_features_delta_updated(
&self,
refresh_token: &EdgeToken,
delta: ClientFeaturesDelta,
etag: Option<EntityTag>,
) {
debug!("Got updated client features delta. Updating features with {etag:?}");
let key = cache_key(refresh_token);
self.features_cache.apply_delta(key.clone(), &delta);
self.update_last_refresh(
refresh_token,
etag,
self.features_cache.get(&key).unwrap().features.len(),
);
self.engine_cache
.entry(key.clone())
.and_modify(|engine| {
engine.take_delta(&delta);
})
.or_insert_with(|| {
let mut new_state = EngineState::default();

let warnings = new_state.take_delta(&delta);
if let Some(warnings) = warnings {
warn!("The following toggle failed to compile and will be defaulted to off: {warnings:?}");
};
new_state
});
}

pub async fn refresh_single_delta(&self, refresh: TokenRefresh) {
let features_result = self
.unleash_client
.get_client_features_delta(ClientFeaturesRequest {
api_key: refresh.token.token.clone(),
etag: refresh.etag,
})
.await;
match features_result {
Ok(delta_response) => match delta_response {
ClientFeaturesDeltaResponse::NoUpdate(tag) => {
debug!("No update needed. Will update last check time with {tag}");
self.update_last_check(&refresh.token.clone());
}
ClientFeaturesDeltaResponse::Updated(features, etag) => {
self.handle_client_features_delta_updated(&refresh.token, features, etag)
.await
}
},
Err(e) => {
match e {
EdgeError::ClientFeaturesFetchError(fe) => {
match fe {
FeatureError::Retriable(status_code) => match status_code {
StatusCode::INTERNAL_SERVER_ERROR
| StatusCode::BAD_GATEWAY
| StatusCode::SERVICE_UNAVAILABLE
| StatusCode::GATEWAY_TIMEOUT => {
info!("Upstream is having some problems, increasing my waiting period");
self.backoff(&refresh.token);
}
StatusCode::TOO_MANY_REQUESTS => {
info!("Got told that upstream is receiving too many requests");
self.backoff(&refresh.token);
}
_ => {
info!("Couldn't refresh features, but will retry next go")
}
},
FeatureError::AccessDenied => {
info!("Token used to fetch features was Forbidden, will remove from list of refresh tasks");
self.tokens_to_refresh.remove(&refresh.token.token);
if !self.tokens_to_refresh.iter().any(|e| {
e.value().token.environment == refresh.token.environment
}) {
let cache_key = cache_key(&refresh.token);
// No tokens left that access the environment of our current refresh. Deleting client features and engine cache
self.features_cache.remove(&cache_key);
self.engine_cache.remove(&cache_key);
}
}
FeatureError::NotFound => {
info!("Had a bad URL when trying to fetch features. Increasing waiting period for the token before trying again");
self.backoff(&refresh.token);
}
}
}
EdgeError::ClientCacheError => {
info!("Couldn't refresh features, but will retry next go")
}
_ => info!("Couldn't refresh features: {e:?}. Will retry next pass"),
}
}
}
}
}


#[cfg(feature = "delta")]
#[cfg(test)]
mod tests {
use actix_http::header::IF_NONE_MATCH;
use actix_http::HttpService;
use actix_http_test::{test_server, TestServer};
use actix_service::map_config;
use actix_web::dev::AppConfig;
use actix_web::http::header::{ETag, EntityTag};
use actix_web::{web, App, HttpRequest, HttpResponse};
use chrono::Duration;
use dashmap::DashMap;
use std::sync::Arc;
use crate::feature_cache::FeatureCache;
use crate::http::refresher::feature_refresher::FeatureRefresher;
use crate::http::unleash_client::{ClientMetaInformation, UnleashClient};
use crate::types::EdgeToken;
use unleash_types::client_features::{
ClientFeature, ClientFeatures, ClientFeaturesDelta, Constraint, Operator, Segment,
};
use unleash_yggdrasil::EngineState;

#[actix_web::test]
#[tracing_test::traced_test]
async fn test_delta() {
let srv = test_features_server().await;
let unleash_client = Arc::new(UnleashClient::new(srv.url("/").as_str(), None).unwrap());
let features_cache: Arc<FeatureCache> = Arc::new(FeatureCache::default());
let engine_cache: Arc<DashMap<String, EngineState>> = Arc::new(DashMap::default());

let feature_refresher = Arc::new(FeatureRefresher {
unleash_client: unleash_client.clone(),
tokens_to_refresh: Arc::new(Default::default()),
features_cache: features_cache.clone(),
engine_cache: engine_cache.clone(),
refresh_interval: Duration::seconds(6000),
persistence: None,
strict: false,
streaming: false,
delta: true,
client_meta_information:ClientMetaInformation::test_config(),
});
let features = ClientFeatures {
version: 2,
features: vec![],
segments: None,
query: None,
meta: None,
};
let initial_features = features.modify_and_copy(&revision(1));
let final_features = initial_features.modify_and_copy(&revision(2));
let token =
EdgeToken::try_from("*:development.abcdefghijklmnopqrstuvwxyz".to_string()).unwrap();
feature_refresher
.register_token_for_refresh(token.clone(), None)
.await;
feature_refresher.refresh_features().await;
let refreshed_features = features_cache
.get(&cache_key(&token))
.unwrap()
.value()
.clone();
assert_eq!(refreshed_features, initial_features);

let token_refresh = feature_refresher
.tokens_to_refresh
.get(&token.token)
.unwrap()
.clone();
feature_refresher.refresh_single_delta(token_refresh).await;
let refreshed_features = features_cache
.get(&cache_key(&token))
.unwrap()
.value()
.clone();
assert_eq!(refreshed_features, final_features);
}

fn cache_key(token: &EdgeToken) -> String {
token
.environment
.clone()
.unwrap_or_else(|| token.token.clone())
}

fn revision(revision_id: u32) -> ClientFeaturesDelta {
match revision_id {
1 => ClientFeaturesDelta {
updated: vec![
ClientFeature {
name: "test1".into(),
feature_type: Some("release".into()),
..Default::default()
},
ClientFeature {
name: "test2".into(),
feature_type: Some("release".into()),
..Default::default()
},
],
removed: vec![],
segments: Some(vec![Segment {
id: 1,
constraints: vec![Constraint {
context_name: "userId".into(),
operator: Operator::In,
case_insensitive: false,
inverted: false,
values: Some(vec!["7".into()]),
value: None,
}],
}]),
revision_id: 1,
},
_ => ClientFeaturesDelta {
updated: vec![ClientFeature {
name: "test1".into(),
feature_type: Some("release".into()),
..Default::default()
}],
removed: vec!["test2".to_string()],
segments: None,
revision_id: 2,
},
}
}

async fn return_client_features_delta(etag_header: Option<String>) -> HttpResponse {
match etag_header {
Some(value) => match value.as_str() {
"\"1\"" => HttpResponse::Ok()
.insert_header(ETag(EntityTag::new_strong("2".to_string())))
.json(revision(2)),
"\"2\"" => HttpResponse::NotModified().finish(),
_ => HttpResponse::NotModified().finish(),
},
None => HttpResponse::Ok()
.insert_header(ETag(EntityTag::new_strong("1".to_string())))
.json(revision(1)),
}
}

async fn test_features_server() -> TestServer {
test_server(move || {
HttpService::new(map_config(
App::new().service(web::resource("/api/client/delta").route(web::get().to(
|req: HttpRequest| {
let etag_header = req
.headers()
.get(IF_NONE_MATCH)
.and_then(|h| h.to_str().ok());
return_client_features_delta(etag_header.map(|s| s.to_string()))
},
))),
|_| AppConfig::default(),
))
.tcp()
})
.await
}
}
Loading

0 comments on commit cb95f1a

Please sign in to comment.