diff --git a/Cargo.toml b/Cargo.toml index 2b9e87a..f316c34 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -101,6 +101,7 @@ perf = ["pyro", "dhat"] criterion = { version = "0.5.1", features = ["html_reports"] } pretty_assertions = "1.4.0" tokio-test = "0.4.4" +tempfile = "3.10.1" [profile.release] codegen-units = 1 diff --git a/src/cache/file.rs b/src/cache/file.rs index 2e21092..3085cae 100644 --- a/src/cache/file.rs +++ b/src/cache/file.rs @@ -17,11 +17,13 @@ use super::{Error, Result}; use crate::util; use async_trait::async_trait; use std::path::Path; +use tinyufo::TinyUfo; use tokio::fs; use tracing::info; pub struct FileCache { directory: String, + cache: TinyUfo<String, CacheObject>, } pub fn new_file_cache(dir: &str) -> Result<FileCache> { @@ -32,12 +34,18 @@ pub fn new_file_cache(dir: &str) -> Result<FileCache> { } info!(dir, "new file cache"); - Ok(FileCache { directory: dir }) + Ok(FileCache { + directory: dir, + cache: TinyUfo::new(100, 100), + }) } #[async_trait] impl HttpCacheStorage for FileCache { async fn get(&self, key: &str) -> Option<CacheObject> { + if let Some(obj) = self.cache.get(&key.to_string()) { + return Some(obj); + } let file = Path::new(&self.directory).join(key); let Ok(buf) = fs::read(file).await else { return None; }; @@ -52,8 +60,9 @@ impl HttpCacheStorage for FileCache { &self, key: String, data: CacheObject, - _weight: u16, + weight: u16, ) -> Result<()> { + self.cache.put(key.clone(), data.clone(), weight); let buf: Vec<u8> = data.into(); let file = Path::new(&self.directory).join(key); fs::write(file, buf) .await @@ -62,6 +71,7 @@ Ok(()) } async fn remove(&self, key: &str) -> Result<Option<CacheObject>> { + // TODO remove from tinyufo let file = Path::new(&self.directory).join(key); fs::remove_file(file) .await @@ -69,3 +79,38 @@ Ok(None) } } + +#[cfg(test)] +mod tests { + use super::new_file_cache; + use crate::cache::http_cache::{CacheObject, HttpCacheStorage}; + use pretty_assertions::assert_eq; + use std::sync::Arc; + use 
tempfile::TempDir; + + #[tokio::test] + async fn test_file_cache() { + let dir = TempDir::new().unwrap(); + let dir = dir.into_path().to_string_lossy().to_string(); + let cache = new_file_cache(&dir).unwrap(); + let key = "key".to_string(); + let obj = CacheObject { + meta: (b"Hello".to_vec(), b"World".to_vec()), + body: Arc::new(b"Hello World!".to_vec()), + }; + let result = cache.get(&key).await; + assert_eq!(true, result.is_none()); + cache.put(key.clone(), obj.clone(), 1).await.unwrap(); + let result = cache.get(&key).await.unwrap(); + assert_eq!(obj, result); + + // empty tinyufo, get from file + let cache = new_file_cache(&dir).unwrap(); + let result = cache.get(&key).await.unwrap(); + assert_eq!(obj, result); + + cache.remove(&key).await.unwrap(); + let result = cache.get(&key).await; + assert_eq!(true, result.is_none()); + } +} diff --git a/src/cache/http_cache.rs b/src/cache/http_cache.rs index d3c0b74..74745ef 100644 --- a/src/cache/http_cache.rs +++ b/src/cache/http_cache.rs @@ -29,7 +29,7 @@ use std::sync::Arc; type BinaryMeta = (Vec<u8>, Vec<u8>); -#[derive(Debug, Clone, Default)] +#[derive(Debug, Clone, Default, PartialEq)] pub struct CacheObject { pub meta: BinaryMeta, pub body: Arc<Vec<u8>>, } @@ -198,6 +198,7 @@ impl HandleMiss for ObjectMissHandler { 1, ) .await?; + Ok(size) } } @@ -299,3 +300,57 @@ impl Storage for HttpCache { self } } + +#[cfg(test)] +mod tests { + use super::{CompleteHit, HttpCacheStorage, ObjectMissHandler}; + use crate::cache::tiny::new_tiny_ufo_cache; + use bytes::{Bytes, BytesMut}; + use pingora::cache::storage::{HitHandler, MissHandler}; + use pretty_assertions::assert_eq; + use std::sync::Arc; + + #[tokio::test] + async fn test_complete_hit() { + let body = Arc::new(b"Hello World!".to_vec()); + let size = body.len(); + let hit = CompleteHit { + body, + done: false, + range_start: 0, + range_end: size, + }; + let mut handle: HitHandler = Box::new(hit); + let body = handle.read_body().await.unwrap(); + assert_eq!(true, body.is_some()); + 
assert_eq!(b"Hello World!", body.unwrap().as_ref()); + + handle.seek(1, Some(size - 1)).unwrap(); + let body = handle.read_body().await.unwrap(); + assert_eq!(true, body.is_some()); + assert_eq!(b"ello World", body.unwrap().as_ref()); + } + + #[tokio::test] + async fn test_object_miss_handler() { + let key = "key"; + + let cache = Arc::new(new_tiny_ufo_cache(10, 10)); + let obj = ObjectMissHandler { + meta: (b"Hello".to_vec(), b"World".to_vec()), + body: BytesMut::new(), + key: key.to_string(), + cache: cache.clone(), + }; + let mut handle: MissHandler = Box::new(obj); + + handle + .write_body(Bytes::from_static(b"Hello World!"), true) + .await + .unwrap(); + handle.finish().await.unwrap(); + + let data = cache.get(key).await.unwrap(); + assert_eq!("Hello World!", std::str::from_utf8(&data.body).unwrap()); + } +} diff --git a/src/cache/tiny.rs b/src/cache/tiny.rs index 7d0f110..89be4b6 100644 --- a/src/cache/tiny.rs +++ b/src/cache/tiny.rs @@ -51,3 +51,25 @@ impl HttpCacheStorage for TinyUfoCache { Ok(()) } } + +#[cfg(test)] +mod tests { + use super::new_tiny_ufo_cache; + use crate::cache::http_cache::{CacheObject, HttpCacheStorage}; + use pretty_assertions::assert_eq; + use std::sync::Arc; + #[tokio::test] + async fn test_tiny_ufo_cache() { + let cache = new_tiny_ufo_cache(10, 10); + let key = "key".to_string(); + let obj = CacheObject { + meta: (b"Hello".to_vec(), b"World".to_vec()), + body: Arc::new(b"Hello World!".to_vec()), + }; + let result = cache.get(&key).await; + assert_eq!(true, result.is_none()); + cache.put(key.clone(), obj.clone(), 1).await.unwrap(); + let result = cache.get(&key).await.unwrap(); + assert_eq!(obj, result); + } +} diff --git a/src/http_extra/http_header.rs b/src/http_extra/http_header.rs index 871c43c..87cdf5a 100644 --- a/src/http_extra/http_header.rs +++ b/src/http_extra/http_header.rs @@ -101,8 +101,7 @@ pub fn convert_header_value( _ => { if buf.starts_with(b"$") { if let Ok(value) = std::env::var( - 
std::string::String::from_utf8_lossy(&buf[1..buf.len()]) - .to_string(), + std::str::from_utf8(&buf[1..buf.len()]).unwrap_or_default(), ) { return HeaderValue::from_str(&value).ok(); } @@ -110,9 +109,7 @@ pub fn convert_header_value( let mut value = BytesMut::with_capacity(20); value = ctx.append_value( value, - std::string::String::from_utf8_lossy(&buf[1..buf.len()]) - .to_string() - .as_str(), + std::str::from_utf8(&buf[1..buf.len()]).unwrap_or_default(), ); if !value.is_empty() { return HeaderValue::from_bytes(&value).ok(); diff --git a/src/plugin/cache.rs b/src/plugin/cache.rs index c1ba19b..3bbba53 100644 --- a/src/plugin/cache.rs +++ b/src/plugin/cache.rs @@ -140,7 +140,7 @@ impl TryFrom<&PluginConf> for Cache { predictor: value.contains_key("predictor"), lock: lock.as_secs().max(1) as u8, max_ttl, - max_file_size: max_file_size.as_u64().max(5 * 1024 * 1024) as usize, + max_file_size: max_file_size.as_u64().max(20 * 1024) as usize, namespace, headers, }; @@ -226,7 +226,7 @@ impl Plugin for Cache { } if !keys.is_empty() { let prefix = - std::string::String::from_utf8_lossy(&keys).to_string(); + std::str::from_utf8(&keys).unwrap_or_default().to_string(); debug!("Cache prefix: {prefix}"); ctx.cache_prefix = Some(prefix); } @@ -234,3 +234,77 @@ Ok(None) } } + +#[cfg(test)] +mod tests { + use super::Cache; + use crate::config::{PluginConf, PluginStep}; + use crate::plugin::Plugin; + use crate::state::State; + use pingora::proxy::Session; + use pretty_assertions::assert_eq; + use tokio_test::io::Builder; + + #[test] + fn test_cache_params() { + let params = Cache::try_from( + &toml::from_str::<PluginConf>( + r###"
eviction = true
headers = ["Accept-Encoding"]
lock = "2s"
max_file_size = "100kb"
predictor = true
max_ttl = "1m"
"###, + ) + .unwrap(), + ) + .unwrap(); + assert_eq!(true, params.eviction); + assert_eq!( + r#"Some(["Accept-Encoding"])"#, + format!("{:?}", params.headers) + ); + assert_eq!(2, params.lock); + assert_eq!(100 * 1000, 
params.max_file_size); + assert_eq!(60, params.max_ttl.unwrap().as_secs()); + assert_eq!(true, params.predictor); + } + #[tokio::test] + async fn test_cache() { + let cache = Cache::try_from( + &toml::from_str::<PluginConf>( + r###"
namespace = "pingap"
eviction = true
headers = ["Accept-Encoding"]
lock = "2s"
max_file_size = "100kb"
predictor = true
max_ttl = "1m"
"###, + ) + .unwrap(), + ) + .unwrap(); + + assert_eq!("request", cache.step().to_string()); + assert_eq!("cache", cache.category().to_string()); + + let headers = ["Accept-Encoding: gzip"].join("\r\n"); + let input_header = + format!("GET /vicanso/pingap?size=1 HTTP/1.1\r\n{headers}\r\n\r\n"); + let mock_io = Builder::new().read(input_header.as_bytes()).build(); + let mut session = Session::new_h1(Box::new(mock_io)); + session.read_request().await.unwrap(); + let mut ctx = State::default(); + cache + .handle_request(PluginStep::Request, &mut session, &mut ctx) + .await + .unwrap(); + assert_eq!("pingap:gzip:", ctx.cache_prefix.unwrap()); + assert_eq!(true, session.cache.enabled()); + assert_eq!(100 * 1000, cache.max_file_size); + } +} diff --git a/src/proxy/server.rs b/src/proxy/server.rs index a7b1707..1b40ee0 100644 --- a/src/proxy/server.rs +++ b/src/proxy/server.rs @@ -601,11 +601,13 @@ impl ProxyHttp for Server { session: &Session, ctx: &mut Self::CTX, ) -> pingora::Result<CacheKey> { - Ok(CacheKey::new( + let key = CacheKey::new( ctx.cache_prefix.clone().unwrap_or_default(), format!("{}", session.req_header().uri), "".to_string(), - )) + ); + debug!(key = format!("{key:?}"), "cache key callback"); + Ok(key) } fn response_cache_filter( diff --git a/src/state/ctx.rs b/src/state/ctx.rs index 12e4657..e481624 100644 --- a/src/state/ctx.rs +++ b/src/state/ctx.rs @@ -205,3 +205,126 @@ impl State { buf } } + +#[cfg(test)] +mod tests { + use super::State; + use crate::state::CompressionStat; + use crate::util; + use bytes::BytesMut; + use pretty_assertions::assert_eq; + use std::time::Duration; + + #[test] + fn 
test_state() { + let mut ctx = State { + ..Default::default() + }; + + assert_eq!( + b"false", + ctx.append_value(BytesMut::new(), "reused").as_ref() + ); + + ctx.reused = true; + assert_eq!( + b"true", + ctx.append_value(BytesMut::new(), "reused").as_ref() + ); + + ctx.upstream_address = "192.168.1.1:80".to_string(); + assert_eq!( + b"192.168.1.1:80", + ctx.append_value(BytesMut::new(), "upstream_addr").as_ref() + ); + + ctx.processing = 10; + assert_eq!( + b"10", + ctx.append_value(BytesMut::new(), "processing").as_ref() + ); + + ctx.upstream_connect_time = Some(1); + assert_eq!( + b"1ms", + ctx.append_value(BytesMut::new(), "upstream_connect_time") + .as_ref() + ); + + ctx.upstream_connected = Some(30); + assert_eq!( + b"30", + ctx.append_value(BytesMut::new(), "upstream_connected") + .as_ref() + ); + + ctx.upstream_processing_time = Some(2); + assert_eq!( + b"2ms", + ctx.append_value(BytesMut::new(), "upstream_processing_time") + .as_ref() + ); + + ctx.upstream_response_time = Some(3); + assert_eq!( + b"3ms", + ctx.append_value(BytesMut::new(), "upstream_response_time") + .as_ref() + ); + + ctx.location = "pingap".to_string(); + assert_eq!( + b"pingap", + ctx.append_value(BytesMut::new(), "location").as_ref() + ); + + ctx.connection_time = 4; + assert_eq!( + b"4ms", + ctx.append_value(BytesMut::new(), "connection_time") + .as_ref() + ); + + ctx.tls_version = Some("tls1.3".to_string()); + assert_eq!( + b"tls1.3", + ctx.append_value(BytesMut::new(), "tls_version").as_ref() + ); + + ctx.compression_stat = Some(CompressionStat { + in_bytes: 1024, + out_bytes: 500, + duration: Duration::from_millis(5), + }); + assert_eq!( + b"5ms", + ctx.append_value(BytesMut::new(), "compression_time") + .as_ref() + ); + assert_eq!( + b"2.0", + ctx.append_value(BytesMut::new(), "compression_ratio") + .as_ref() + ); + + ctx.cache_lookup_time = Some(6); + assert_eq!( + b"6ms", + ctx.append_value(BytesMut::new(), "cache_lookup_time") + .as_ref() + ); + + ctx.cache_lock_time = Some(7); 
+ assert_eq!( + b"7ms", + ctx.append_value(BytesMut::new(), "cache_lock_time") + .as_ref() + ); + + ctx.created_at = util::now().as_millis() as u64 - 1; + assert_eq!( + b"1ms", + ctx.append_value(BytesMut::new(), "service_time").as_ref() + ); + } +}