Skip to content

Commit

Permalink
Optimize WASM Rust SDK's body caching logic. (alibaba#1181)
Browse files Browse the repository at this point in the history
  • Loading branch information
007gzs authored Aug 7, 2024
1 parent 0a578c2 commit 980ffde
Showing 1 changed file with 50 additions and 41 deletions.
91 changes: 50 additions & 41 deletions plugins/wasm-rust/src/plugin_wrapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ pub trait RootContextWrapper<PluginConfig>: RootContext
where
PluginConfig: Default + DeserializeOwned + 'static + Clone,
{
// fn create_http_context(&self, _context_id: u32) -> Option<Box<dyn HttpContext>> {
fn create_http_context_use_wrapper(&self, _context_id: u32) -> Option<Box<dyn HttpContext>> {
// fn create_http_context(&self, context_id: u32) -> Option<Box<dyn HttpContext>> {
fn create_http_context_use_wrapper(&self, context_id: u32) -> Option<Box<dyn HttpContext>> {
// trait 继承没法重写 RootContext 的 create_http_context,先写个函数让上层调下吧
match self.create_http_context_wrapper(_context_id) {
match self.create_http_context_wrapper(context_id) {
Some(http_context) => Some(Box::new(PluginHttpWrapper::new(
self.rule_matcher(),
http_context,
Expand Down Expand Up @@ -58,11 +58,17 @@ pub trait HttpContextWrapper<PluginConfig>: HttpContext {
fn on_http_response_body_ok(&mut self, _res_body: &Bytes) -> Action {
Action::Continue
}
fn replace_http_request_body(&mut self, body: &[u8]) {
self.set_http_request_body(0, i32::MAX as usize, body)
}
fn replace_http_response_body(&mut self, body: &[u8]) {
self.set_http_response_body(0, i32::MAX as usize, body)
}
}
pub struct PluginHttpWrapper<PluginConfig> {
req_headers: MultiMap<String, String>,
req_body: Bytes,
res_body: Bytes,
req_body_len: usize,
res_body_len: usize,
config: Option<PluginConfig>,
rule_matcher: SharedRuleMatcher<PluginConfig>,
http_content: Box<dyn HttpContextWrapper<PluginConfig>>,
Expand All @@ -74,8 +80,8 @@ impl<PluginConfig> PluginHttpWrapper<PluginConfig> {
) -> Self {
PluginHttpWrapper {
req_headers: MultiMap::new(),
req_body: Bytes::new(),
res_body: Bytes::new(),
req_body_len: 0,
res_body_len: 0,
config: None,
rule_matcher: rule_matcher.clone(),
http_content,
Expand Down Expand Up @@ -123,12 +129,9 @@ impl<PluginConfig> HttpContext for PluginHttpWrapper<PluginConfig>
where
PluginConfig: Default + DeserializeOwned + Clone,
{
fn on_http_request_headers(&mut self, _num_headers: usize, _end_of_stream: bool) -> Action {
fn on_http_request_headers(&mut self, num_headers: usize, end_of_stream: bool) -> Action {
let binding = self.rule_matcher.borrow();
self.config = match binding.get_match_config() {
None => None,
Some(config) => Some(config.1.clone()),
};
self.config = binding.get_match_config().map(|config| config.1.clone());
for (k, v) in self.get_http_request_headers() {
self.req_headers.insert(k, v);
}
Expand All @@ -137,61 +140,67 @@ where
}
let ret = self
.http_content
.on_http_request_headers(_num_headers, _end_of_stream);
.on_http_request_headers(num_headers, end_of_stream);
if ret != Action::Continue {
return ret;
}
self.http_content
.on_http_request_headers_ok(&self.req_headers)
}

fn on_http_request_body(&mut self, _body_size: usize, _end_of_stream: bool) -> Action {
let mut ret = self
fn on_http_request_body(&mut self, body_size: usize, end_of_stream: bool) -> Action {
self.req_body_len += body_size;
if !end_of_stream {
return Action::Pause;
}
let ret = self
.http_content
.on_http_request_body(_body_size, _end_of_stream);
if !self.http_content.cache_request_body() {
.on_http_request_body(self.req_body_len, end_of_stream);
if ret != Action::Continue || !self.http_content.cache_request_body() {
return ret;
}
if _body_size > 0 {
if let Some(body) = self.get_http_request_body(0, _body_size) {
self.req_body.extend(body)
let mut req_body = Bytes::new();
if self.req_body_len > 0 {
if let Some(body) = self.get_http_request_body(0, self.req_body_len) {
req_body = body;
}
}
if _end_of_stream && ret == Action::Continue {
ret = self.http_content.on_http_request_body_ok(&self.req_body);
}
ret
self.http_content.on_http_request_body_ok(&req_body)
}

fn on_http_request_trailers(&mut self, _num_trailers: usize) -> Action {
self.http_content.on_http_request_trailers(_num_trailers)
fn on_http_request_trailers(&mut self, num_trailers: usize) -> Action {
self.http_content.on_http_request_trailers(num_trailers)
}

fn on_http_response_headers(&mut self, _num_headers: usize, _end_of_stream: bool) -> Action {
fn on_http_response_headers(&mut self, num_headers: usize, end_of_stream: bool) -> Action {
self.http_content
.on_http_response_headers(_num_headers, _end_of_stream)
.on_http_response_headers(num_headers, end_of_stream)
}

fn on_http_response_body(&mut self, _body_size: usize, _end_of_stream: bool) -> Action {
let mut ret = self
fn on_http_response_body(&mut self, body_size: usize, end_of_stream: bool) -> Action {
self.res_body_len += body_size;

if !end_of_stream {
return Action::Pause;
}
let ret = self
.http_content
.on_http_response_body(_body_size, _end_of_stream);
if !self.http_content.cache_response_body() {
.on_http_response_body(self.res_body_len, end_of_stream);
if ret != Action::Continue || !self.http_content.cache_response_body() {
return ret;
}
if _body_size > 0 {
if let Some(body) = self.get_http_response_body(0, _body_size) {
self.res_body.extend(body);

let mut res_body = Bytes::new();
if self.res_body_len > 0 {
if let Some(body) = self.get_http_response_body(0, self.res_body_len) {
res_body = body;
}
}
if _end_of_stream && ret == Action::Continue {
ret = self.http_content.on_http_response_body_ok(&self.res_body);
}
ret
self.http_content.on_http_response_body_ok(&res_body)
}

fn on_http_response_trailers(&mut self, _num_trailers: usize) -> Action {
self.http_content.on_http_response_trailers(_num_trailers)
fn on_http_response_trailers(&mut self, num_trailers: usize) -> Action {
self.http_content.on_http_response_trailers(num_trailers)
}

fn on_log(&mut self) {
Expand Down

0 comments on commit 980ffde

Please sign in to comment.