Skip to content
This repository has been archived by the owner on Jun 21, 2024. It is now read-only.

feat(flags): Basic flags service #31

Merged
merged 11 commits into from
May 7, 2024
48 changes: 36 additions & 12 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ resolver = "2"
members = [
"capture",
"common/health",
"feature-flags",
"hook-api",
"hook-common",
"hook-janitor",
Expand Down Expand Up @@ -49,7 +50,7 @@ opentelemetry-otlp = "0.15.0"
opentelemetry_sdk = { version = "0.22.1", features = ["trace", "rt-tokio"] }
rand = "0.8.5"
rdkafka = { version = "0.36.0", features = ["cmake-build", "ssl", "tracing"] }
reqwest = { version = "0.12.3", features = ["stream"] }
reqwest = { version = "0.12.3", features = ["json", "stream"] }
serde = { version = "1.0", features = ["derive"] }
serde_derive = { version = "1.0" }
serde_json = { version = "1.0" }
Expand Down
35 changes: 35 additions & 0 deletions feature-flags/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
[package]
name = "feature-flags"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
anyhow = { workspace = true }
async-trait = { workspace = true }
axum = { workspace = true }
axum-client-ip = { workspace = true }
envconfig = { workspace = true }
tokio = { workspace = true }
tracing = { workspace = true }
tracing-subscriber = { workspace = true, features = ["env-filter"] }
bytes = { workspace = true }
rand = { workspace = true }
# redis is pinned here rather than inherited from the workspace; the tokio-comp
# feature provides the async connection used by src/redis.rs, plus cluster support.
redis = { version = "0.23.3", features = [
"tokio-comp",
"cluster",
"cluster-async",
] }
serde = { workspace = true }
serde_json = { workspace = true }
thiserror = { workspace = true }

[lints]
workspace = true

# Test-only dependencies (integration tests / mocks).
[dev-dependencies]
assert-json-diff = { workspace = true }
once_cell = "1.18.0"
reqwest = { workspace = true }

58 changes: 58 additions & 0 deletions feature-flags/src/api.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
use std::collections::HashMap;

use axum::http::StatusCode;
use axum::response::{IntoResponse, Response};
use serde::{Deserialize, Serialize};
use thiserror::Error;

/// Numeric response codes for the flags API; only `Ok` (= 1) is defined so far.
#[derive(Debug, PartialEq, Eq, Deserialize, Serialize)]
pub enum FlagsResponseCode {
Ok = 1,
}

/// Response body for a flags request; serialized with camelCase field names
/// to match the shape of the existing endpoint.
#[derive(Debug, PartialEq, Eq, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct FlagsResponse {
/// True when flag evaluation hit an internal error (per the review thread
/// below, typically a DB error); flags that could be computed are still returned.
pub error_while_computing_flags: bool,
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should consider making this a string and renaming the field to just error

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

.. to give client more info about the error?

This would usually only be a db error, so internal, not very useful to expose to clients.

For now I want the same shape as the existing endpoint, so will keep this as is

// TODO: better typing here, support bool responses
/// Flag key mapped to its evaluated value; string-typed only for now (see TODO).
pub feature_flags: HashMap<String, String>,
}

/// Errors that can occur while handling a flags request. Each variant is
/// mapped to an HTTP status code by the `IntoResponse` impl in this module.
#[derive(Error, Debug)]
pub enum FlagError {
// Request body could not be decoded (e.g. bad encoding/compression).
#[error("failed to decode request: {0}")]
RequestDecodingError(String),
// Request body decoded but failed JSON deserialization.
#[error("failed to parse request: {0}")]
RequestParsingError(#[from] serde_json::Error),

// distinct_id was present but empty.
#[error("Empty distinct_id in request")]
EmptyDistinctId,
// distinct_id was absent entirely.
#[error("No distinct_id in request")]
MissingDistinctId,

// No API token supplied with the request.
#[error("No api_key in request")]
NoTokenError,
// A token was supplied but did not validate.
#[error("API key is not valid")]
TokenValidationError,

#[error("rate limited")]
RateLimited,
}

impl IntoResponse for FlagError {
    /// Converts the error into an HTTP response: 400 for malformed requests,
    /// 401 for token problems, 429 for rate limiting. The body is the error's
    /// `Display` text in every case.
    fn into_response(self) -> Response {
        let status = match self {
            FlagError::RequestDecodingError(_)
            | FlagError::RequestParsingError(_)
            | FlagError::EmptyDistinctId
            | FlagError::MissingDistinctId => StatusCode::BAD_REQUEST,

            FlagError::NoTokenError | FlagError::TokenValidationError => StatusCode::UNAUTHORIZED,

            FlagError::RateLimited => StatusCode::TOO_MANY_REQUESTS,
        };

        (status, self.to_string()).into_response()
    }
}
24 changes: 24 additions & 0 deletions feature-flags/src/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
use std::net::SocketAddr;

use envconfig::Envconfig;

/// Service configuration, populated from environment variables via `envconfig`.
/// Defaults point at the local development/test stack.
#[derive(Envconfig, Clone)]
pub struct Config {
// Port 0 lets the OS pick a free port (useful for tests).
#[envconfig(default = "127.0.0.1:0")]
pub address: SocketAddr,

// Writer (primary) Postgres connection string.
#[envconfig(default = "postgres://posthog:posthog@localhost:15432/test_database")]
pub write_database_url: String,

// Reader (replica) Postgres connection string; defaults to the same DB locally.
#[envconfig(default = "postgres://posthog:posthog@localhost:15432/test_database")]
pub read_database_url: String,

// NOTE(review): not referenced by the code shown in this diff — confirm usage.
#[envconfig(default = "1024")]
pub max_concurrent_jobs: usize,

// Upper bound on the Postgres connection pool size.
#[envconfig(default = "100")]
pub max_pg_connections: u32,

#[envconfig(default = "redis://localhost:6379/")]
pub redis_url: String,
}
7 changes: 7 additions & 0 deletions feature-flags/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
//! Feature-flags service: HTTP API types, configuration, storage access,
//! and the axum server wiring.
pub mod api; // response and error types for the flags endpoint
pub mod config; // envconfig-driven service configuration
pub mod redis; // async redis wrapper plus a test mock
pub mod router;
pub mod server;
pub mod v0_endpoint;
pub mod v0_request;
39 changes: 39 additions & 0 deletions feature-flags/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
use envconfig::Envconfig;
use tokio::signal;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;
use tracing_subscriber::{EnvFilter, Layer};

use feature_flags::config::Config;
use feature_flags::server::serve;

/// Completes once the process receives SIGTERM or SIGINT, logging before it
/// returns. Passed to `serve` as the server's graceful-shutdown future.
async fn shutdown() {
    let mut sigterm = signal::unix::signal(signal::unix::SignalKind::terminate())
        .expect("failed to register SIGTERM handler");

    let mut sigint = signal::unix::signal(signal::unix::SignalKind::interrupt())
        .expect("failed to register SIGINT handler");

    // Resolve on whichever signal arrives first.
    tokio::select! {
        _ = sigterm.recv() => (),
        _ = sigint.recv() => (),
    }

    tracing::info!("Shutting down gracefully...");
}

#[tokio::main]
async fn main() {
    // Configuration comes entirely from the environment; bail out early if invalid.
    let config = Config::init_from_env().expect("Invalid configuration:");

    // Minimal logging for now: stdout, level taken from RUST_LOG (default=ERROR).
    let fmt_layer = tracing_subscriber::fmt::layer().with_filter(EnvFilter::from_default_env());
    tracing_subscriber::registry().with(fmt_layer).init();

    // Bind the configured address, then run the server until shutdown() resolves.
    let listener = tokio::net::TcpListener::bind(config.address)
        .await
        .expect("could not bind port");

    serve(config, listener, shutdown()).await
}
77 changes: 77 additions & 0 deletions feature-flags/src/redis.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
use std::time::Duration;

use anyhow::Result;
use async_trait::async_trait;
use redis::AsyncCommands;
use tokio::time::timeout;

// average for all commands is <10ms, check grafana
// NOTE(review): this value is milliseconds — call sites must use
// Duration::from_millis, not from_secs.
const REDIS_TIMEOUT_MILLISECS: u64 = 10;

/// A simple redis wrapper
/// Copied from capture/src/redis.rs.
/// TODO: Modify this to support hincrby, get, and set commands.

#[async_trait]
pub trait Client {
// A very simplified wrapper, but works for our usage
async fn zrangebyscore(&self, k: String, min: String, max: String) -> Result<Vec<String>>;
}

/// Thin wrapper over `redis::Client` that implements the `Client` trait.
pub struct RedisClient {
client: redis::Client,
}

impl RedisClient {
    /// Builds a client for the given connection string (e.g. "redis://host:port/").
    /// Returns an error if the address is not a valid redis connection string.
    pub fn new(addr: String) -> Result<RedisClient> {
        Ok(RedisClient {
            client: redis::Client::open(addr)?,
        })
    }
}

#[async_trait]
impl Client for RedisClient {
    /// Runs ZRANGEBYSCORE against the server, bounded by the command timeout.
    ///
    /// Errors if the connection cannot be established, the command fails, or
    /// the timeout elapses first.
    async fn zrangebyscore(&self, k: String, min: String, max: String) -> Result<Vec<String>> {
        let mut conn = self.client.get_async_connection().await?;

        let results = conn.zrangebyscore(k, min, max);
        // Fix: REDIS_TIMEOUT_MILLISECS is a millisecond value (see its name and
        // the "<10ms" comment), but the original wrapped it in
        // Duration::from_secs, producing a 10-second timeout instead of 10ms.
        let fut = timeout(Duration::from_millis(REDIS_TIMEOUT_MILLISECS), results).await?;

        Ok(fut?)
    }
}

// TODO: Find if there's a better way around this.
/// In-memory stand-in for `RedisClient` used by tests; returns canned results
/// instead of talking to a real server. Cloneable so the builder-style setter
/// below can hand back copies.
#[derive(Clone)]
pub struct MockRedisClient {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By default, all cargo tests in a module run concurrently, so you indeed want isolation. Mocking in rust is hard, so I recommend using the real data stores from the compose stack:

  • for Redis, my recommendation is to have the RedisClient have a configurable key prefix (passed as a Option<String>) that it adds on all commands. This way, we can generate random prefixes and clean them on Drop. Happy to pair on that.
  • for PG, you should be able to get enough isolation by generating new teams?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I don't even need to go that far for redis, since all redis keys I care about are scoped by team tokens anyway, so generating a new team ought to be sufficient for both redis & pg.

Will dive into this more once I actually connect - but I do like your idea of not using these mocks - just copied this for now to get a feel for how capture does it, but indeed not too happy with these mocks right now.

// Canned result returned by every zrangebyscore call.
zrangebyscore_ret: Vec<String>,
}

impl MockRedisClient {
pub fn new() -> MockRedisClient {
MockRedisClient {
zrangebyscore_ret: Vec::new(),
}
}

pub fn zrangebyscore_ret(&mut self, ret: Vec<String>) -> Self {
self.zrangebyscore_ret = ret;

self.clone()
}
}

impl Default for MockRedisClient {
fn default() -> Self {
Self::new()
}
}

#[async_trait]
impl Client for MockRedisClient {
    /// Ignores the key and score bounds entirely; always returns a clone of
    /// the canned result configured via `zrangebyscore_ret`.
    async fn zrangebyscore(&self, _k: String, _min: String, _max: String) -> Result<Vec<String>> {
        let canned = self.zrangebyscore_ret.clone();
        Ok(canned)
    }
}
Loading
Loading