Skip to content

Commit

Permalink
refactor: using tinyufo to improve performance of file cache
Browse files Browse the repository at this point in the history
  • Loading branch information
vicanso committed Jul 5, 2024
1 parent 9db645a commit 9b62fc8
Show file tree
Hide file tree
Showing 8 changed files with 331 additions and 12 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ perf = ["pyro", "dhat"]
criterion = { version = "0.5.1", features = ["html_reports"] }
pretty_assertions = "1.4.0"
tokio-test = "0.4.4"
tempfile = "3.10.1"

[profile.release]
codegen-units = 1
Expand Down
49 changes: 47 additions & 2 deletions src/cache/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@ use super::{Error, Result};
use crate::util;
use async_trait::async_trait;
use std::path::Path;
use tinyufo::TinyUfo;
use tokio::fs;
use tracing::info;

/// HTTP cache storage that persists each entry as a file and keeps an
/// in-memory TinyUfo cache in front of the file system.
pub struct FileCache {
/// Directory the cache files live in; a key is joined onto this path to
/// locate its file.
directory: String,
/// In-memory cache that `get` consults before reading from disk and that
/// `put` fills alongside the file write.
cache: TinyUfo<String, CacheObject>,
}

pub fn new_file_cache(dir: &str) -> Result<FileCache> {
Expand All @@ -32,12 +34,18 @@ pub fn new_file_cache(dir: &str) -> Result<FileCache> {
}
info!(dir, "new file cache");

Ok(FileCache { directory: dir })
Ok(FileCache {
directory: dir,
cache: TinyUfo::new(100, 100),
})
}

#[async_trait]
impl HttpCacheStorage for FileCache {
async fn get(&self, key: &str) -> Option<CacheObject> {
if let Some(obj) = self.cache.get(&key.to_string()) {
return Some(obj);
}
let file = Path::new(&self.directory).join(key);
let Ok(buf) = fs::read(file).await else {
return None;
Expand All @@ -52,8 +60,9 @@ impl HttpCacheStorage for FileCache {
&self,
key: String,
data: CacheObject,
_weight: u16,
weight: u16,
) -> Result<()> {
self.cache.put(key.clone(), data.clone(), weight);
let buf: Vec<u8> = data.into();
let file = Path::new(&self.directory).join(key);
fs::write(file, buf)
Expand All @@ -62,10 +71,46 @@ impl HttpCacheStorage for FileCache {
Ok(())
}
async fn remove(&self, key: &str) -> Result<Option<CacheObject>> {
// TODO remove from tinyufo
let file = Path::new(&self.directory).join(key);
fs::remove_file(file)
.await
.map_err(|e| Error::Io { source: e })?;
Ok(None)
}
}

#[cfg(test)]
mod tests {
    use super::new_file_cache;
    use crate::cache::http_cache::{CacheObject, HttpCacheStorage};
    use pretty_assertions::assert_eq;
    use std::sync::Arc;
    use tempfile::TempDir;

    /// Round-trips an object through the file cache: miss, put, hit from the
    /// in-memory layer, hit from disk through a fresh instance, then remove.
    #[tokio::test]
    async fn test_file_cache() {
        // Keep the `TempDir` guard alive for the whole test so the directory
        // is deleted when it is dropped. `into_path()` would persist the
        // directory and leave one behind on every test run.
        let tmp = TempDir::new().unwrap();
        let dir = tmp.path().to_string_lossy().to_string();

        let cache = new_file_cache(&dir).unwrap();
        let key = "key".to_string();
        let obj = CacheObject {
            meta: (b"Hello".to_vec(), b"World".to_vec()),
            body: Arc::new(b"Hello World!".to_vec()),
        };

        // Nothing has been stored yet.
        assert!(cache.get(&key).await.is_none());

        cache.put(key.clone(), obj.clone(), 1).await.unwrap();
        let result = cache.get(&key).await.unwrap();
        assert_eq!(obj, result);

        // A new instance starts with an empty in-memory cache, so this read
        // has to be served from the file written above.
        let cache = new_file_cache(&dir).unwrap();
        let result = cache.get(&key).await.unwrap();
        assert_eq!(obj, result);

        // After removal the entry can no longer be read.
        cache.remove(&key).await.unwrap();
        assert!(cache.get(&key).await.is_none());
    }
}
57 changes: 56 additions & 1 deletion src/cache/http_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use std::sync::Arc;

type BinaryMeta = (Vec<u8>, Vec<u8>);

#[derive(Debug, Clone, Default)]
#[derive(Debug, Clone, Default, PartialEq)]
pub struct CacheObject {
pub meta: BinaryMeta,
pub body: Arc<Vec<u8>>,
Expand Down Expand Up @@ -198,6 +198,7 @@ impl HandleMiss for ObjectMissHandler {
1,
)
.await?;

Ok(size)
}
}
Expand Down Expand Up @@ -299,3 +300,57 @@ impl Storage for HttpCache {
self
}
}

#[cfg(test)]
mod tests {
    use super::{CompleteHit, HttpCacheStorage, ObjectMissHandler};
    use crate::cache::tiny::new_tiny_ufo_cache;
    use bytes::{Bytes, BytesMut};
    use pingora::cache::storage::{HitHandler, MissHandler};
    use pretty_assertions::assert_eq;
    use std::sync::Arc;

    /// Reads a complete hit once in full, then again after seeking to a
    /// narrower range.
    #[tokio::test]
    async fn test_complete_hit() {
        let body = Arc::new(b"Hello World!".to_vec());
        let size = body.len();
        let hit = CompleteHit {
            body,
            done: false,
            range_start: 0,
            range_end: size,
        };
        let mut handle: HitHandler = Box::new(hit);

        // The first read covers the whole range [0, size).
        let body = handle.read_body().await.unwrap();
        assert!(body.is_some());
        assert_eq!(b"Hello World!", body.unwrap().as_ref());

        // After seeking, the next read yields the body without its first and
        // last byte.
        handle.seek(1, Some(size - 1)).unwrap();
        let body = handle.read_body().await.unwrap();
        assert!(body.is_some());
        assert_eq!(b"ello World", body.unwrap().as_ref());
    }

    /// Writes a body through the miss handler and checks that it can be read
    /// back from the backing cache under the same key.
    #[tokio::test]
    async fn test_object_miss_handler() {
        let key = "key";

        let cache = Arc::new(new_tiny_ufo_cache(10, 10));
        let obj = ObjectMissHandler {
            meta: (b"Hello".to_vec(), b"World".to_vec()),
            body: BytesMut::new(),
            key: key.to_string(),
            cache: cache.clone(),
        };
        let mut handle: MissHandler = Box::new(obj);

        handle
            .write_body(Bytes::from_static(b"Hello World!"), true)
            .await
            .unwrap();
        handle.finish().await.unwrap();

        let data = cache.get(key).await.unwrap();
        assert_eq!("Hello World!", std::str::from_utf8(&data.body).unwrap());
    }
}
22 changes: 22 additions & 0 deletions src/cache/tiny.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,25 @@ impl HttpCacheStorage for TinyUfoCache {
Ok(())
}
}

#[cfg(test)]
mod tests {
    use super::new_tiny_ufo_cache;
    use crate::cache::http_cache::{CacheObject, HttpCacheStorage};
    use pretty_assertions::assert_eq;
    use std::sync::Arc;

    /// Checks that a key misses before `put` and returns the stored object
    /// afterwards.
    #[tokio::test]
    async fn test_tiny_ufo_cache() {
        let cache = new_tiny_ufo_cache(10, 10);
        let key = "key".to_string();
        let obj = CacheObject {
            meta: (b"Hello".to_vec(), b"World".to_vec()),
            body: Arc::new(b"Hello World!".to_vec()),
        };

        // Nothing has been stored yet.
        assert!(cache.get(&key).await.is_none());

        cache.put(key.clone(), obj.clone(), 1).await.unwrap();
        let result = cache.get(&key).await.unwrap();
        assert_eq!(obj, result);
    }
}
7 changes: 2 additions & 5 deletions src/http_extra/http_header.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,18 +101,15 @@ pub fn convert_header_value(
_ => {
if buf.starts_with(b"$") {
if let Ok(value) = std::env::var(
std::string::String::from_utf8_lossy(&buf[1..buf.len()])
.to_string(),
std::str::from_utf8(&buf[1..buf.len()]).unwrap_or_default(),
) {
return HeaderValue::from_str(&value).ok();
}
} else if buf.starts_with(b":") {
let mut value = BytesMut::with_capacity(20);
value = ctx.append_value(
value,
std::string::String::from_utf8_lossy(&buf[1..buf.len()])
.to_string()
.as_str(),
std::str::from_utf8(&buf[1..buf.len()]).unwrap_or_default(),
);
if !value.is_empty() {
return HeaderValue::from_bytes(&value).ok();
Expand Down
78 changes: 76 additions & 2 deletions src/plugin/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ impl TryFrom<&PluginConf> for Cache {
predictor: value.contains_key("predictor"),
lock: lock.as_secs().max(1) as u8,
max_ttl,
max_file_size: max_file_size.as_u64().max(5 * 1024 * 1024) as usize,
max_file_size: max_file_size.as_u64().max(20 * 1024) as usize,
namespace,
headers,
};
Expand Down Expand Up @@ -226,11 +226,85 @@ impl Plugin for Cache {
}
if !keys.is_empty() {
let prefix =
std::string::String::from_utf8_lossy(&keys).to_string();
std::str::from_utf8(&keys).unwrap_or_default().to_string();
debug!("Cache prefix: {prefix}");
ctx.cache_prefix = Some(prefix);
}

Ok(None)
}
}

#[cfg(test)]
mod tests {
    use super::Cache;
    use crate::config::{PluginConf, PluginStep};
    use crate::plugin::Plugin;
    use crate::state::State;
    use pingora::proxy::Session;
    use pretty_assertions::assert_eq;
    use tokio_test::io::Builder;

    /// Checks that every field of the plugin configuration is parsed into
    /// the expected `Cache` parameter.
    #[test]
    fn test_cache_params() {
        let params = Cache::try_from(
            &toml::from_str::<PluginConf>(
                r###"
eviction = true
headers = ["Accept-Encoding"]
lock = "2s"
max_file_size = "100kb"
predictor = true
max_ttl = "1m"
"###,
            )
            .unwrap(),
        )
        .unwrap();

        assert!(params.eviction);
        assert_eq!(
            r#"Some(["Accept-Encoding"])"#,
            format!("{:?}", params.headers)
        );
        assert_eq!(2, params.lock);
        assert_eq!(100 * 1000, params.max_file_size);
        assert_eq!(60, params.max_ttl.unwrap().as_secs());
        assert!(params.predictor);
    }

    /// Runs the plugin against a mocked HTTP/1.1 request and checks the
    /// cache prefix it derives and that caching gets enabled on the session.
    #[tokio::test]
    async fn test_cache() {
        let cache = Cache::try_from(
            &toml::from_str::<PluginConf>(
                r###"
namespace = "pingap"
eviction = true
headers = ["Accept-Encoding"]
lock = "2s"
max_file_size = "100kb"
predictor = true
max_ttl = "1m"
"###,
            )
            .unwrap(),
        )
        .unwrap();

        assert_eq!("request", cache.step().to_string());
        assert_eq!("cache", cache.category().to_string());

        let headers = ["Accept-Encoding: gzip"].join("\r\n");
        let input_header =
            format!("GET /vicanso/pingap?size=1 HTTP/1.1\r\n{headers}\r\n\r\n");
        let mock_io = Builder::new().read(input_header.as_bytes()).build();
        let mut session = Session::new_h1(Box::new(mock_io));
        session.read_request().await.unwrap();

        let mut ctx = State::default();
        cache
            .handle_request(PluginStep::Request, &mut session, &mut ctx)
            .await
            .unwrap();

        // The expected prefix combines the namespace with the value of the
        // configured `Accept-Encoding` header.
        assert_eq!("pingap:gzip:", ctx.cache_prefix.unwrap());
        assert!(session.cache.enabled());
        assert_eq!(100 * 1000, cache.max_file_size);
    }
}
6 changes: 4 additions & 2 deletions src/proxy/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -601,11 +601,13 @@ impl ProxyHttp for Server {
session: &Session,
ctx: &mut Self::CTX,
) -> pingora::Result<CacheKey> {
Ok(CacheKey::new(
let key = CacheKey::new(
ctx.cache_prefix.clone().unwrap_or_default(),
format!("{}", session.req_header().uri),
"".to_string(),
))
);
debug!(key = format!("{key:?}"), "cache key callback");
Ok(key)
}

fn response_cache_filter(
Expand Down
Loading

0 comments on commit 9b62fc8

Please sign in to comment.