Skip to content

Commit

Permalink
fix: add concurrency limiter (#24413)
Browse files Browse the repository at this point in the history
  • Loading branch information
oliverb123 authored Aug 16, 2024
1 parent e03bc32 commit 24a1e22
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 9 deletions.
2 changes: 1 addition & 1 deletion rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ time = { version = "0.3.36", features = [
] }
thiserror = { version = "1.0" }
tokio = { version = "1.34.0", features = ["full"] }
tower = "0.4.13"
tower = { version = "0.4.13", features = ["default", "limit"] }
tower-http = { version = "0.5.2", features = ["cors", "limit", "trace"] }
tracing = "0.1.40"
tracing-opentelemetry = "0.23.0"
Expand Down
3 changes: 3 additions & 0 deletions rust/hook-api/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ pub struct Config {
#[envconfig(default = "5000000")]
pub max_body_size: usize,

#[envconfig(default = "100")]
pub concurrency_limit: usize,

#[envconfig(default = "false")]
pub hog_mode: bool,
}
Expand Down
8 changes: 7 additions & 1 deletion rust/hook-api/src/handlers/app.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use std::convert::Infallible;

use axum::{routing, Router};
use tower::limit::ConcurrencyLimitLayer;
use tower_http::limit::RequestBodyLimitLayer;

use hook_common::pgqueue::PgQueue;
Expand All @@ -10,6 +13,7 @@ pub fn add_routes(
pg_pool: PgQueue,
hog_mode: bool,
max_body_size: usize,
concurrency_limit: usize,
) -> Router {
let router = router
.route("/", routing::get(index))
Expand All @@ -21,13 +25,15 @@ pub fn add_routes(
"/hoghook",
routing::post(webhook::post_hoghook)
.with_state(pg_pool)
.layer::<_, Infallible>(ConcurrencyLimitLayer::new(concurrency_limit))
.layer(RequestBodyLimitLayer::new(max_body_size)),
)
} else {
router.route(
"/webhook",
routing::post(webhook::post_webhook)
.with_state(pg_pool)
.layer::<_, Infallible>(ConcurrencyLimitLayer::new(concurrency_limit))
.layer(RequestBodyLimitLayer::new(max_body_size)),
)
}
Expand All @@ -54,7 +60,7 @@ mod tests {
let pg_queue = PgQueue::new_from_pool("test_index", db).await;
let hog_mode = false;

let app = add_routes(Router::new(), pg_queue, hog_mode, 1_000_000);
let app = add_routes(Router::new(), pg_queue, hog_mode, 1_000_000, 10);

let response = app
.oneshot(Request::builder().uri("/").body(Body::empty()).unwrap())
Expand Down
57 changes: 50 additions & 7 deletions rust/hook-api/src/handlers/webhook.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,13 +243,20 @@ mod tests {
use crate::handlers::app::add_routes;

const MAX_BODY_SIZE: usize = 1_000_000;
const CONCURRENCY_LIMIT: usize = 10;

#[sqlx::test(migrations = "../migrations")]
async fn webhook_success(db: PgPool) {
let pg_queue = PgQueue::new_from_pool("test_index", db).await;
let hog_mode = false;

let app = add_routes(Router::new(), pg_queue, hog_mode, MAX_BODY_SIZE);
let app = add_routes(
Router::new(),
pg_queue,
hog_mode,
MAX_BODY_SIZE,
CONCURRENCY_LIMIT,
);

let mut headers = collections::HashMap::new();
headers.insert("Content-Type".to_owned(), "application/json".to_owned());
Expand Down Expand Up @@ -292,7 +299,13 @@ mod tests {
let pg_queue = PgQueue::new_from_pool("test_index", db).await;
let hog_mode = false;

let app = add_routes(Router::new(), pg_queue, hog_mode, MAX_BODY_SIZE);
let app = add_routes(
Router::new(),
pg_queue,
hog_mode,
MAX_BODY_SIZE,
CONCURRENCY_LIMIT,
);

let response = app
.oneshot(
Expand Down Expand Up @@ -330,7 +343,13 @@ mod tests {
let pg_queue = PgQueue::new_from_pool("test_index", db).await;
let hog_mode = false;

let app = add_routes(Router::new(), pg_queue, hog_mode, MAX_BODY_SIZE);
let app = add_routes(
Router::new(),
pg_queue,
hog_mode,
MAX_BODY_SIZE,
CONCURRENCY_LIMIT,
);

let response = app
.oneshot(
Expand All @@ -352,7 +371,13 @@ mod tests {
let pg_queue = PgQueue::new_from_pool("test_index", db).await;
let hog_mode = false;

let app = add_routes(Router::new(), pg_queue, hog_mode, MAX_BODY_SIZE);
let app = add_routes(
Router::new(),
pg_queue,
hog_mode,
MAX_BODY_SIZE,
CONCURRENCY_LIMIT,
);

let response = app
.oneshot(
Expand All @@ -374,7 +399,13 @@ mod tests {
let pg_queue = PgQueue::new_from_pool("test_index", db).await;
let hog_mode = false;

let app = add_routes(Router::new(), pg_queue, hog_mode, MAX_BODY_SIZE);
let app = add_routes(
Router::new(),
pg_queue,
hog_mode,
MAX_BODY_SIZE,
CONCURRENCY_LIMIT,
);

let bytes: Vec<u8> = vec![b'a'; MAX_BODY_SIZE + 1];
let long_string = String::from_utf8_lossy(&bytes);
Expand Down Expand Up @@ -422,7 +453,13 @@ mod tests {
let pg_queue = PgQueue::new_from_pool("test_index", db.clone()).await;
let hog_mode = true;

let app = add_routes(Router::new(), pg_queue, hog_mode, MAX_BODY_SIZE);
let app = add_routes(
Router::new(),
pg_queue,
hog_mode,
MAX_BODY_SIZE,
CONCURRENCY_LIMIT,
);

let valid_payloads = vec![
(
Expand Down Expand Up @@ -507,7 +544,13 @@ mod tests {
let pg_queue = PgQueue::new_from_pool("test_index", db.clone()).await;
let hog_mode = true;

let app = add_routes(Router::new(), pg_queue, hog_mode, MAX_BODY_SIZE);
let app = add_routes(
Router::new(),
pg_queue,
hog_mode,
MAX_BODY_SIZE,
CONCURRENCY_LIMIT,
);

let invalid_payloads = vec![
r#"{}"#,
Expand Down
1 change: 1 addition & 0 deletions rust/hook-api/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ async fn main() {
pg_queue,
config.hog_mode,
config.max_body_size,
config.concurrency_limit,
);
let app = setup_metrics_routes(app);

Expand Down

0 comments on commit 24a1e22

Please sign in to comment.