Skip to content
This repository has been archived by the owner on Jun 21, 2024. It is now read-only.

cleanup and upgrade deps #20

Merged
merged 4 commits into from
Apr 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
545 changes: 292 additions & 253 deletions Cargo.lock

Large diffs are not rendered by default.

19 changes: 7 additions & 12 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,31 +10,27 @@ members = [
"hook-janitor",
]

# [profile.release]
# debug = 2 # https://www.polarsignals.com/docs/rust

[workspace.dependencies]
anyhow = "1.0"
assert-json-diff = "2.0.2"
async-trait = "0.1.74"
axum = { version = "0.7.1", features = ["http2"] }
axum-client-ip = "0.4.1"
base64 = "0.21.1"
axum = { version = "0.7.5", features = ["http2", "macros"] }
axum-client-ip = "0.6.0"
base64 = "0.22.0"
bytes = "1"
chrono = { version = "0.4" }
envconfig = "0.10.0"
eyre = "0.6.9"
flate2 = "1.0"
futures = { version = "0.3.29" }
governor = { version = "0.5.1", features = ["dashmap"] }
http = { version = "0.2" }
http = { version = "1.1.0" }
http-body-util = "0.1.0"
metrics = "0.22.0"
metrics-exporter-prometheus = "0.13.0"
metrics-exporter-prometheus = "0.14.0"
rand = "0.8.5"
rdkafka = { version = "0.36.0", features = ["cmake-build", "ssl", "tracing"] }
regex = "1.10.2"
reqwest = { version = "0.11" }
reqwest = { version = "0.12.3" }
serde = { version = "1.0", features = ["derive"] }
serde_derive = { version = "1.0" }
serde_json = { version = "1.0" }
Expand All @@ -57,8 +53,7 @@ time = { version = "0.3.20", features = [
] }
tokio = { version = "1.34.0", features = ["full"] }
tower = "0.4.13"
tower_governor = "0.0.4"
tower-http = { version = "0.4.0", features = ["cors", "trace"] }
tower-http = { version = "0.5.2", features = ["cors", "trace"] }
tracing = "0.1.40"
tracing-subscriber = "0.3.18"
url = { version = "2.5.0 " }
Expand Down
14 changes: 6 additions & 8 deletions capture-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,14 @@ version = "0.1.0"
edition = "2021"

[dependencies]
axum = { workspace = true }
capture = { path = "../capture" }
envconfig = { workspace = true }
opentelemetry = { version = "0.21.0", features = ["trace"]}
opentelemetry-otlp = "0.14.0"
opentelemetry_sdk = { version = "0.21.0", features = ["trace", "rt-tokio"] }
time = { workspace = true }
opentelemetry = { version = "0.22.0", features = ["trace"]}
opentelemetry-otlp = "0.15.0"
opentelemetry_sdk = { version = "0.22.1", features = ["trace", "rt-tokio"] }
tokio = { workspace = true }
tracing = { workspace = true }
tracing-opentelemetry = "0.22.0"
tracing-opentelemetry = "0.23.0"
tracing-subscriber = { workspace = true, features = ["env-filter"] }

[dev-dependencies]
Expand All @@ -23,5 +21,5 @@ futures = "0.3.29"
once_cell = "1.18.0"
rand = { workspace = true }
rdkafka = { workspace = true }
reqwest = "0.11.22"
serde_json = { workspace = true }
reqwest = { workspace = true }
serde_json = { workspace = true }
5 changes: 3 additions & 2 deletions capture-server/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use std::net::TcpListener;
use std::time::Duration;

use envconfig::Envconfig;
Expand Down Expand Up @@ -76,6 +75,8 @@ async fn main() {
.init();

// Open the TCP port and start the server
let listener = TcpListener::bind(config.address).unwrap();
let listener = tokio::net::TcpListener::bind(config.address)
.await
.expect("could not bind port");
serve(config, listener, shutdown()).await
}
17 changes: 9 additions & 8 deletions capture-server/tests/common.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#![allow(dead_code)]

use std::default::Default;
use std::net::{SocketAddr, TcpListener};
use std::net::SocketAddr;
use std::num::NonZeroU32;
use std::str::FromStr;
use std::string::ToString;
Expand All @@ -17,6 +17,7 @@ use rdkafka::config::{ClientConfig, FromClientConfig};
use rdkafka::consumer::{BaseConsumer, Consumer};
use rdkafka::util::Timeout;
use rdkafka::{Message, TopicPartitionList};
use tokio::net::TcpListener;
use tokio::sync::Notify;
use tokio::time::timeout;
use tracing::{debug, warn};
Expand Down Expand Up @@ -59,20 +60,20 @@ pub struct ServerHandle {
}

impl ServerHandle {
pub fn for_topic(topic: &EphemeralTopic) -> Self {
pub async fn for_topic(topic: &EphemeralTopic) -> Self {
let mut config = DEFAULT_CONFIG.clone();
config.kafka.kafka_topic = topic.topic_name().to_string();
Self::for_config(config)
Self::for_config(config).await
}
pub fn for_config(config: Config) -> Self {
let listener = TcpListener::bind("127.0.0.1:0").unwrap();
pub async fn for_config(config: Config) -> Self {
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
let notify = Arc::new(Notify::new());
let shutdown = notify.clone();

tokio::spawn(
async move { serve(config, listener, async { notify.notified().await }).await },
);
tokio::spawn(async move {
serve(config, listener, async move { notify.notified().await }).await
});
Self { addr, shutdown }
}

Expand Down
10 changes: 5 additions & 5 deletions capture-server/tests/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ async fn it_captures_one_event() -> Result<()> {
let token = random_string("token", 16);
let distinct_id = random_string("id", 16);
let topic = EphemeralTopic::new().await;
let server = ServerHandle::for_topic(&topic);
let server = ServerHandle::for_topic(&topic).await;

let event = json!({
"token": token,
Expand Down Expand Up @@ -44,7 +44,7 @@ async fn it_captures_a_batch() -> Result<()> {
let distinct_id2 = random_string("id", 16);

let topic = EphemeralTopic::new().await;
let server = ServerHandle::for_topic(&topic);
let server = ServerHandle::for_topic(&topic).await;

let event = json!([{
"token": token,
Expand Down Expand Up @@ -90,7 +90,7 @@ async fn it_overflows_events_on_burst() -> Result<()> {
config.overflow_burst_limit = NonZeroU32::new(2).unwrap();
config.overflow_per_second_limit = NonZeroU32::new(1).unwrap();

let server = ServerHandle::for_config(config);
let server = ServerHandle::for_config(config).await;

let event = json!([{
"token": token,
Expand Down Expand Up @@ -139,7 +139,7 @@ async fn it_does_not_overflow_team_with_different_ids() -> Result<()> {
config.overflow_burst_limit = NonZeroU32::new(1).unwrap();
config.overflow_per_second_limit = NonZeroU32::new(1).unwrap();

let server = ServerHandle::for_config(config);
let server = ServerHandle::for_config(config).await;

let event = json!([{
"token": token,
Expand Down Expand Up @@ -176,7 +176,7 @@ async fn it_trims_distinct_id() -> Result<()> {
let (trimmed_distinct_id2, _) = distinct_id2.split_at(200); // works because ascii chars

let topic = EphemeralTopic::new().await;
let server = ServerHandle::for_topic(&topic);
let server = ServerHandle::for_topic(&topic).await;

let event = json!([{
"token": token,
Expand Down
9 changes: 2 additions & 7 deletions capture/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,13 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
axum = { version = "0.6.15" } # TODO: Bring up to date with the workspace.
axum = { workspace = true }
axum-client-ip = { workspace = true }
tokio = { workspace = true }
tracing = { workspace = true }
tracing-subscriber = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
governor = { workspace = true }
tower_governor = { workspace = true }
time = { workspace = true }
tower-http = { workspace = true }
bytes = { workspace = true }
Expand All @@ -35,10 +33,7 @@ redis = { version = "0.23.3", features = [
"cluster-async",
] }
envconfig = { workspace = true }
dashmap = "5.5.3"

[dev-dependencies]
assert-json-diff = { workspace = true }
axum-test-helper = "0.2.0"
mockall = "0.11.2"
redis-test = "0.2.3"
axum-test-helper = { git = "https://github.com/orphan-rs/axum-test-helper.git" } # TODO: remove, directly use reqwest like capture-server tests
3 changes: 2 additions & 1 deletion capture/src/capture.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::sync::Arc;

use bytes::Bytes;

use axum::Json;
use axum::{debug_handler, Json};
// TODO: stream this instead
use axum::extract::{Query, State};
use axum::http::{HeaderMap, Method};
Expand Down Expand Up @@ -38,6 +38,7 @@ use crate::{
compression
)
)]
#[debug_handler]
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shows more informative compiler errors. Allowed me to pinpoint that the issue was InsecureClientIp needed an upgrade

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This macro has no effect when compiled with the release profile.

Nice

pub async fn event(
state: State<router::State>,
InsecureClientIp(ip): InsecureClientIp,
Expand Down
3 changes: 2 additions & 1 deletion capture/src/prometheus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

use std::time::Instant;

use axum::body::Body;
use axum::{extract::MatchedPath, http::Request, middleware::Next, response::IntoResponse};
use metrics::counter;
use metrics_exporter_prometheus::{Matcher, PrometheusBuilder, PrometheusHandle};
Expand Down Expand Up @@ -38,7 +39,7 @@ pub fn setup_metrics_recorder() -> PrometheusHandle {
/// Middleware to record some common HTTP metrics
/// Generic over B to allow for arbitrary body types (eg Vec<u8>, Streams, a deserialized thing, etc)
/// Someday tower-http might provide a metrics middleware: https://github.com/tower-rs/tower-http/issues/57
pub async fn track_metrics<B>(req: Request<B>, next: Next<B>) -> impl IntoResponse {
pub async fn track_metrics(req: Request<Body>, next: Next) -> impl IntoResponse {
let start = Instant::now();

let path = if let Some(matched_path) = req.extensions().get::<MatchedPath>() {
Expand Down
18 changes: 10 additions & 8 deletions capture/src/server.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use std::future::Future;
use std::net::{SocketAddr, TcpListener};
use std::net::SocketAddr;
use std::sync::Arc;

use time::Duration;
use tokio::net::TcpListener;

use crate::config::Config;
use crate::health::{ComponentStatus, HealthRegistry};
Expand All @@ -15,7 +16,7 @@ use crate::sinks::print::PrintSink;

pub async fn serve<F>(config: Config, listener: TcpListener, shutdown: F)
where
F: Future<Output = ()>,
F: Future<Output = ()> + Send + 'static,
{
let liveness = HealthRegistry::new("liveness");

Expand Down Expand Up @@ -80,10 +81,11 @@ where
// run our app with hyper
// `axum::Server` is a re-export of `hyper::Server`
tracing::info!("listening on {:?}", listener.local_addr().unwrap());
axum::Server::from_tcp(listener)
.unwrap()
.serve(app.into_make_service_with_connect_info::<SocketAddr>())
.with_graceful_shutdown(shutdown)
.await
.unwrap()
axum::serve(
listener,
app.into_make_service_with_connect_info::<SocketAddr>(),
)
.with_graceful_shutdown(shutdown)
.await
.unwrap()
}
1 change: 0 additions & 1 deletion hook-api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ eyre = { workspace = true }
hook-common = { path = "../hook-common" }
http-body-util = { workspace = true }
metrics = { workspace = true }
metrics-exporter-prometheus = { workspace = true }
serde = { workspace = true }
serde_derive = { workspace = true }
serde_json = { workspace = true }
Expand Down
2 changes: 0 additions & 2 deletions hook-common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,7 @@ http = { workspace = true }
metrics = { workspace = true }
metrics-exporter-prometheus = { workspace = true }
reqwest = { workspace = true }
regex = { workspace = true }
serde = { workspace = true }
serde_derive = { workspace = true }
serde_json = { workspace = true }
sqlx = { workspace = true }
time = { workspace = true }
Expand Down
6 changes: 0 additions & 6 deletions hook-janitor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,12 @@ envconfig = { workspace = true }
eyre = { workspace = true }
futures = { workspace = true }
hook-common = { path = "../hook-common" }
http-body-util = { workspace = true }
metrics = { workspace = true }
metrics-exporter-prometheus = { workspace = true }
rdkafka = { workspace = true }
serde = { workspace = true }
serde_derive = { workspace = true }
serde_json = { workspace = true }
sqlx = { workspace = true }
time = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }
tower = { workspace = true }
tracing = { workspace = true }
tracing-subscriber = { workspace = true }
url = { workspace = true }
4 changes: 1 addition & 3 deletions hook-worker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,9 @@ chrono = { workspace = true }
envconfig = { workspace = true }
futures = "0.3"
hook-common = { path = "../hook-common" }
http = { version = "0.2" }
http = { workspace = true }
metrics = { workspace = true }
reqwest = { workspace = true }
serde = { workspace = true }
serde_derive = { workspace = true }
sqlx = { workspace = true }
time = { workspace = true }
thiserror = { workspace = true }
Expand Down
Loading