-
Notifications
You must be signed in to change notification settings - Fork 0
feat(flags): Do token validation and extract distinct id #41
Changes from 5 commits
a6d7309
d0e9bc0
327074c
838dd2c
ad04232
756c62c
2ca642a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,20 +2,38 @@ use std::time::Duration; | |
|
||
use anyhow::Result; | ||
use async_trait::async_trait; | ||
use redis::AsyncCommands; | ||
use redis::{AsyncCommands, RedisError}; | ||
use thiserror::Error; | ||
use tokio::time::timeout; | ||
|
||
// average for all commands is <10ms, check grafana | ||
const REDIS_TIMEOUT_MILLISECS: u64 = 10; | ||
|
||
#[derive(Error, Debug)] | ||
pub enum CustomRedisError { | ||
#[error("Not found in redis")] | ||
NotFound, | ||
|
||
#[error("Pickle error: {0}")] | ||
PickleError(#[from] serde_pickle::Error), | ||
|
||
#[error("Redis error: {0}")] | ||
Other(#[from] RedisError), | ||
|
||
#[error("Timeout error")] | ||
Timeout(#[from] tokio::time::error::Elapsed), | ||
} | ||
/// A simple redis wrapper | ||
/// Copied from capture/src/redis.rs. | ||
/// TODO: Modify this to support hincrby, get, and set commands. | ||
/// TODO: Modify this to support hincrby | ||
|
||
#[async_trait] | ||
pub trait Client { | ||
// A very simplified wrapper, but works for our usage | ||
async fn zrangebyscore(&self, k: String, min: String, max: String) -> Result<Vec<String>>; | ||
|
||
async fn get(&self, k: String) -> Result<String, CustomRedisError>; | ||
async fn set(&self, k: String, v: String) -> Result<()>; | ||
} | ||
|
||
pub struct RedisClient { | ||
|
@@ -40,38 +58,44 @@ impl Client for RedisClient { | |
|
||
Ok(fut?) | ||
} | ||
} | ||
|
||
// TODO: Find if there's a better way around this. | ||
#[derive(Clone)] | ||
pub struct MockRedisClient { | ||
zrangebyscore_ret: Vec<String>, | ||
} | ||
// TODO: Ask Xavier if there's a better way to handle this. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. looking for suggestions here There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe you could downcast from anyhow? something like impl From<anyhow::Error> for FlagError {
fn from(error: anyhow::Error) -> Self {
match error.downcast::<CustomRedisError>() {
Ok(CustomRedisError::NotFound) => FlagError::NotFound,
Err(_) => FlagError::InternalServerError,
}
}
} from what I could understand you would like to error when redis returns empty for a key and the other errors you bubble up to the api handler without the need to customise each one, and then let the "FlagError into response impl" assign the http status based on what your logic implies. what I did in the past was something like this https://github.com/thiagovarela/rust-minijinja-htmx/blob/main/server/src/error.rs#L16-L47, without thiserror, and manually implementing whatever specialised error I'd get. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ooh nice, thank you @thiagovarela , this is what I was missing! - implementing the From trait here, and downcasting (what I meant by reverse-coerce lol). I think I want to handle these errors a little differently, so I'll keep the CustomRedisError, but good to know this is how I'd do it implicitly. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
As we probably will want to extract the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nice, thanks! Yeah, agree, I didn't want to just return FlagErrors from redis. I feel a lot more comfortable doing this now that the type system will protect me from unknown thrown errors. |
||
// The problem: I want to match on the error type from this function, and do appropriate things like 400 or 500 response. | ||
// Buuut, if I use anyhow::Error, I can't reverse-coerce into a NotFound or serde_pickle::Error. | ||
// Thus, I need to create a custom error enum of all possible errors + my own custom not found, so I can match on it. | ||
// Is this the canonical way? | ||
async fn get(&self, k: String) -> Result<String, CustomRedisError> { | ||
let mut conn = self.client.get_async_connection().await?; | ||
|
||
impl MockRedisClient { | ||
pub fn new() -> MockRedisClient { | ||
MockRedisClient { | ||
zrangebyscore_ret: Vec::new(), | ||
let results = conn.get(k);
// REDIS_TIMEOUT_MILLISECS is a millisecond count, so the Duration must be
// built with `from_millis`; `from_secs` would turn the 10 ms budget into 10 s.
let fut: Result<Vec<u8>, RedisError> =
    timeout(Duration::from_millis(REDIS_TIMEOUT_MILLISECS), results).await?;
|
||
// return NotFound error when empty or not found | ||
if match &fut { | ||
Ok(v) => v.is_empty(), | ||
Err(_) => false, | ||
} { | ||
return Err(CustomRedisError::NotFound); | ||
} | ||
} | ||
|
||
pub fn zrangebyscore_ret(&mut self, ret: Vec<String>) -> Self { | ||
self.zrangebyscore_ret = ret; | ||
// TRICKY: We serialise data to json, then django pickles it. | ||
// Here we deserialize the bytes using serde_pickle, to get the json string. | ||
let string_response: String = serde_pickle::from_slice(&fut?, Default::default())?; | ||
|
||
self.clone() | ||
Ok(string_response) | ||
} | ||
} | ||
|
||
impl Default for MockRedisClient { | ||
fn default() -> Self { | ||
Self::new() | ||
} | ||
} | ||
async fn set(&self, k: String, v: String) -> Result<()> { | ||
// TRICKY: We serialise data to json, then django pickles it. | ||
// Here we serialize the json string to bytes using serde_pickle. | ||
let bytes = serde_pickle::to_vec(&v, Default::default())?; | ||
|
||
#[async_trait] | ||
impl Client for MockRedisClient { | ||
// A very simplified wrapper, but works for our usage | ||
async fn zrangebyscore(&self, _k: String, _min: String, _max: String) -> Result<Vec<String>> { | ||
Ok(self.zrangebyscore_ret.clone()) | ||
let mut conn = self.client.get_async_connection().await?; | ||
|
||
let results = conn.set(k, bytes);
// REDIS_TIMEOUT_MILLISECS is expressed in milliseconds; `from_millis` keeps
// the unit consistent (`from_secs` would wait 1000x longer than intended).
let fut = timeout(Duration::from_millis(REDIS_TIMEOUT_MILLISECS), results).await?;
|
||
Ok(fut?) | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,139 @@ | ||
use serde::{Deserialize, Serialize}; | ||
use std::sync::Arc; | ||
use tracing::instrument; | ||
|
||
use crate::{ | ||
api::FlagError, | ||
redis::{Client, CustomRedisError}, | ||
}; | ||
|
||
// TRICKY: This cache data is coming from django-redis. If it ever goes out of sync, we'll bork. | ||
// TODO: Add integration tests across repos to ensure this doesn't happen. | ||
pub const TEAM_TOKEN_CACHE_PREFIX: &str = "posthog:1:team_token:"; | ||
|
||
#[derive(Debug, Deserialize, Serialize)] | ||
pub struct Team { | ||
pub id: i64, | ||
pub name: String, | ||
pub api_token: String, | ||
} | ||
|
||
impl Team { | ||
/// Validates a token, and returns a team if it exists. | ||
|
||
#[instrument(skip_all)] | ||
pub async fn from_redis( | ||
client: Arc<dyn Client + Send + Sync>, | ||
token: String, | ||
) -> Result<Team, FlagError> { | ||
// TODO: Instead of failing here, i.e. if not in redis, fallback to pg | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: since that dataset is relatively small, an in-process LRU cache would be very useful. We do have it in ingestion's There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. oof tricky, how do you handle invalidations in this case? Team config options / api tokens would get updated out of sync via django. Or is the TTL here so low that it doesn't make a difference in practice 👀 . Either way, good idea with the low ttl, will look into this! There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The plugin-server cache has a 2 minute TTL, so tokens are still accepted 2 minutes after rotation, which is good enough in my book. Negative lookups (token is not valid) are cached for 5 minutes too, as it's less probable. |
||
let serialized_team = client | ||
.get(format!("{TEAM_TOKEN_CACHE_PREFIX}{}", token)) | ||
.await | ||
.map_err(|e| match e { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit for another PR: this |
||
CustomRedisError::NotFound => FlagError::TokenValidationError, | ||
CustomRedisError::PickleError(_) => { | ||
tracing::error!("failed to fetch data: {}", e); | ||
FlagError::DataParsingError | ||
} | ||
_ => { | ||
tracing::error!("Unknown redis error: {}", e); | ||
FlagError::RedisUnavailable | ||
} | ||
})?; | ||
|
||
let team: Team = serde_json::from_str(&serialized_team).map_err(|e| { | ||
tracing::error!("failed to parse data to team: {}", e); | ||
FlagError::DataParsingError | ||
})?; | ||
|
||
Ok(team) | ||
} | ||
} | ||
|
||
#[cfg(test)] | ||
mod tests { | ||
use rand::Rng; | ||
use redis::AsyncCommands; | ||
|
||
use super::*; | ||
use crate::{ | ||
team, | ||
test_utils::{insert_new_team_in_redis, random_string, setup_redis_client}, | ||
}; | ||
|
||
#[tokio::test] | ||
async fn test_fetch_team_from_redis() { | ||
let client = setup_redis_client(None); | ||
|
||
let team = insert_new_team_in_redis(client.clone()).await.unwrap(); | ||
|
||
let target_token = team.api_token; | ||
|
||
let team_from_redis = Team::from_redis(client.clone(), target_token.clone()) | ||
.await | ||
.unwrap(); | ||
assert_eq!(team_from_redis.api_token, target_token); | ||
assert_eq!(team_from_redis.id, team.id); | ||
} | ||
|
||
#[tokio::test] | ||
async fn test_fetch_invalid_team_from_redis() { | ||
let client = setup_redis_client(None); | ||
|
||
match Team::from_redis(client.clone(), "banana".to_string()).await { | ||
Err(FlagError::TokenValidationError) => (), | ||
_ => panic!("Expected TokenValidationError"), | ||
}; | ||
} | ||
|
||
#[tokio::test] | ||
async fn test_cant_connect_to_redis_error_is_not_token_validation_error() { | ||
let client = setup_redis_client(Some("redis://localhost:1111/".to_string())); | ||
|
||
match Team::from_redis(client.clone(), "banana".to_string()).await { | ||
Err(FlagError::RedisUnavailable) => (), | ||
_ => panic!("Expected RedisUnavailable"), | ||
}; | ||
} | ||
|
||
#[tokio::test] | ||
async fn test_corrupted_data_in_redis_is_handled() { | ||
// TODO: Extend this test with fallback to pg | ||
let id = rand::thread_rng().gen_range(0..10_000_000); | ||
let token = random_string("phc_", 12); | ||
let team = Team { | ||
id, | ||
name: "team".to_string(), | ||
api_token: token, | ||
}; | ||
let serialized_team = serde_json::to_string(&team).expect("Failed to serialise team"); | ||
|
||
// manually insert non-pickled data in redis | ||
let client = | ||
redis::Client::open("redis://localhost:6379/").expect("Failed to create redis client"); | ||
let mut conn = client | ||
.get_async_connection() | ||
.await | ||
.expect("Failed to get redis connection"); | ||
conn.set::<String, String, ()>( | ||
format!( | ||
"{}{}", | ||
team::TEAM_TOKEN_CACHE_PREFIX, | ||
team.api_token.clone() | ||
), | ||
serialized_team, | ||
) | ||
.await | ||
.expect("Failed to write data to redis"); | ||
|
||
// now get client connection for data | ||
let client = setup_redis_client(None); | ||
|
||
match Team::from_redis(client.clone(), team.api_token.clone()).await { | ||
Err(FlagError::DataParsingError) => (), | ||
Err(other) => panic!("Expected DataParsingError, got {:?}", other), | ||
Ok(_) => panic!("Expected DataParsingError"), | ||
}; | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
looking for suggestions here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Probably not worth adding too much complexity for the time being. Once we have enough helpers that are generic enough, they could be moved to a separate common crate.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
works, sounds good!