Skip to content
This repository has been archived by the owner on Jun 21, 2024. It is now read-only.

feat(flags): Do token validation and extract distinct id #41

Merged
merged 7 commits into from
May 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 34 additions & 5 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions feature-flags/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ redis = { version = "0.23.3", features = [
serde = { workspace = true }
serde_json = { workspace = true }
thiserror = { workspace = true }
serde-pickle = { version = "1.1.1"}

[lints]
workspace = true
Expand Down
9 changes: 9 additions & 0 deletions feature-flags/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@ pub enum FlagError {

#[error("rate limited")]
RateLimited,

#[error("failed to parse redis cache data")]
DataParsingError,
#[error("redis unavailable")]
RedisUnavailable,
}

impl IntoResponse for FlagError {
Expand All @@ -52,6 +57,10 @@ impl IntoResponse for FlagError {
}

FlagError::RateLimited => (StatusCode::TOO_MANY_REQUESTS, self.to_string()),

FlagError::DataParsingError | FlagError::RedisUnavailable => {
(StatusCode::SERVICE_UNAVAILABLE, self.to_string())
}
}
.into_response()
}
Expand Down
2 changes: 1 addition & 1 deletion feature-flags/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use envconfig::Envconfig;

#[derive(Envconfig, Clone)]
pub struct Config {
#[envconfig(default = "127.0.0.1:0")]
#[envconfig(default = "127.0.0.1:3001")]
pub address: SocketAddr,

#[envconfig(default = "postgres://posthog:posthog@localhost:15432/test_database")]
Expand Down
9 changes: 9 additions & 0 deletions feature-flags/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,14 @@ pub mod config;
pub mod redis;
pub mod router;
pub mod server;
pub mod team;
pub mod v0_endpoint;
pub mod v0_request;

// Test modules don't need to be compiled with main binary
// #[cfg(test)]
// TODO: To use in integration tests, we need to compile with binary
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looking for suggestions here

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably not worth adding too much complexity for the time being. Once we have enough helpers that are generic enough, they could be moved to a separate common crate.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

works, sounds good!

// or make it a separate feature using cfg(feature = "integration-tests")
// and then use this feature only in tests.
// For now, ok to just include in binary
pub mod test_utils;
73 changes: 46 additions & 27 deletions feature-flags/src/redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,38 @@ use std::time::Duration;

use anyhow::Result;
use async_trait::async_trait;
use redis::AsyncCommands;
use redis::{AsyncCommands, RedisError};
use thiserror::Error;
use tokio::time::timeout;

// average for all commands is <10ms, check grafana
const REDIS_TIMEOUT_MILLISECS: u64 = 10;

#[derive(Error, Debug)]
pub enum CustomRedisError {
#[error("Not found in redis")]
NotFound,

#[error("Pickle error: {0}")]
PickleError(#[from] serde_pickle::Error),

#[error("Redis error: {0}")]
Other(#[from] RedisError),

#[error("Timeout error")]
Timeout(#[from] tokio::time::error::Elapsed),
}
/// A simple redis wrapper
/// Copied from capture/src/redis.rs.
/// TODO: Modify this to support hincrby, get, and set commands.
/// TODO: Modify this to support hincrby

#[async_trait]
pub trait Client {
// A very simplified wrapper, but works for our usage
async fn zrangebyscore(&self, k: String, min: String, max: String) -> Result<Vec<String>>;

async fn get(&self, k: String) -> Result<String, CustomRedisError>;
async fn set(&self, k: String, v: String) -> Result<()>;
}

pub struct RedisClient {
Expand All @@ -40,38 +58,39 @@ impl Client for RedisClient {

Ok(fut?)
}
}

// TODO: Find if there's a better way around this.
#[derive(Clone)]
pub struct MockRedisClient {
zrangebyscore_ret: Vec<String>,
}
async fn get(&self, k: String) -> Result<String, CustomRedisError> {
let mut conn = self.client.get_async_connection().await?;

impl MockRedisClient {
pub fn new() -> MockRedisClient {
MockRedisClient {
zrangebyscore_ret: Vec::new(),
let results = conn.get(k);
let fut: Result<Vec<u8>, RedisError> =
timeout(Duration::from_secs(REDIS_TIMEOUT_MILLISECS), results).await?;

// return NotFound error when empty or not found
if match &fut {
Ok(v) => v.is_empty(),
Err(_) => false,
} {
return Err(CustomRedisError::NotFound);
}
}

pub fn zrangebyscore_ret(&mut self, ret: Vec<String>) -> Self {
self.zrangebyscore_ret = ret;
// TRICKY: We serialise data to json, then django pickles it.
// Here we deserialize the bytes using serde_pickle, to get the json string.
let string_response: String = serde_pickle::from_slice(&fut?, Default::default())?;

self.clone()
Ok(string_response)
}
}

impl Default for MockRedisClient {
fn default() -> Self {
Self::new()
}
}
async fn set(&self, k: String, v: String) -> Result<()> {
// TRICKY: We serialise data to json, then django pickles it.
// Here we serialize the json string to bytes using serde_pickle.
let bytes = serde_pickle::to_vec(&v, Default::default())?;

#[async_trait]
impl Client for MockRedisClient {
// A very simplified wrapper, but works for our usage
async fn zrangebyscore(&self, _k: String, _min: String, _max: String) -> Result<Vec<String>> {
Ok(self.zrangebyscore_ret.clone())
let mut conn = self.client.get_async_connection().await?;

let results = conn.set(k, bytes);
let fut = timeout(Duration::from_secs(REDIS_TIMEOUT_MILLISECS), results).await?;

Ok(fut?)
}
}
139 changes: 139 additions & 0 deletions feature-flags/src/team.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use tracing::instrument;

use crate::{
api::FlagError,
redis::{Client, CustomRedisError},
};

// TRICKY: This cache data is coming from django-redis. If it ever goes out of sync, we'll bork.
// TODO: Add integration tests across repos to ensure this doesn't happen.
pub const TEAM_TOKEN_CACHE_PREFIX: &str = "posthog:1:team_token:";

#[derive(Debug, Deserialize, Serialize)]
pub struct Team {
pub id: i64,
pub name: String,
pub api_token: String,
}

impl Team {
/// Validates a token, and returns a team if it exists.

#[instrument(skip_all)]
pub async fn from_redis(
client: Arc<dyn Client + Send + Sync>,
token: String,
) -> Result<Team, FlagError> {
// TODO: Instead of failing here, i.e. if not in redis, fallback to pg
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: since that dataset is relatively small, an in-process LRU cache would be very useful. We do have it in ingestion's TeamManager for example. Can be added later for sure, but happy to help there.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oof tricky, how do you handle invalidations in this case? Team config options / api tokens would get updated out of sync via django. Or is the TTL here so low that it doesn't make a difference in practice 👀 .

Either way, good idea with the low ttl, will look into this!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The plugin-server cache has a 2 minute TTL, so tokens are still accepted 2 minutes after rotation, which is good enough in my book. Negative lookups (token is not valid) are cached for 5 minutes too, as it's less probable.

let serialized_team = client
.get(format!("{TEAM_TOKEN_CACHE_PREFIX}{}", token))
.await
.map_err(|e| match e {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit for another PR: this map_err could be removed if you implement From<CustomRedisError> for FlagError

CustomRedisError::NotFound => FlagError::TokenValidationError,
CustomRedisError::PickleError(_) => {
tracing::error!("failed to fetch data: {}", e);
FlagError::DataParsingError
}
_ => {
tracing::error!("Unknown redis error: {}", e);
FlagError::RedisUnavailable
}
})?;

let team: Team = serde_json::from_str(&serialized_team).map_err(|e| {
tracing::error!("failed to parse data to team: {}", e);
FlagError::DataParsingError
})?;

Ok(team)
}
}

#[cfg(test)]
mod tests {
use rand::Rng;
use redis::AsyncCommands;

use super::*;
use crate::{
team,
test_utils::{insert_new_team_in_redis, random_string, setup_redis_client},
};

#[tokio::test]
async fn test_fetch_team_from_redis() {
let client = setup_redis_client(None);

let team = insert_new_team_in_redis(client.clone()).await.unwrap();

let target_token = team.api_token;

let team_from_redis = Team::from_redis(client.clone(), target_token.clone())
.await
.unwrap();
assert_eq!(team_from_redis.api_token, target_token);
assert_eq!(team_from_redis.id, team.id);
}

#[tokio::test]
async fn test_fetch_invalid_team_from_redis() {
let client = setup_redis_client(None);

match Team::from_redis(client.clone(), "banana".to_string()).await {
Err(FlagError::TokenValidationError) => (),
_ => panic!("Expected TokenValidationError"),
};
}

#[tokio::test]
async fn test_cant_connect_to_redis_error_is_not_token_validation_error() {
let client = setup_redis_client(Some("redis://localhost:1111/".to_string()));

match Team::from_redis(client.clone(), "banana".to_string()).await {
Err(FlagError::RedisUnavailable) => (),
_ => panic!("Expected RedisUnavailable"),
};
}

#[tokio::test]
async fn test_corrupted_data_in_redis_is_handled() {
// TODO: Extend this test with fallback to pg
let id = rand::thread_rng().gen_range(0..10_000_000);
let token = random_string("phc_", 12);
let team = Team {
id,
name: "team".to_string(),
api_token: token,
};
let serialized_team = serde_json::to_string(&team).expect("Failed to serialise team");

// manually insert non-pickled data in redis
let client =
redis::Client::open("redis://localhost:6379/").expect("Failed to create redis client");
let mut conn = client
.get_async_connection()
.await
.expect("Failed to get redis connection");
conn.set::<String, String, ()>(
format!(
"{}{}",
team::TEAM_TOKEN_CACHE_PREFIX,
team.api_token.clone()
),
serialized_team,
)
.await
.expect("Failed to write data to redis");

// now get client connection for data
let client = setup_redis_client(None);

match Team::from_redis(client.clone(), team.api_token.clone()).await {
Err(FlagError::DataParsingError) => (),
Err(other) => panic!("Expected DataParsingError, got {:?}", other),
Ok(_) => panic!("Expected DataParsingError"),
};
}
}
Loading
Loading