Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: implemented submit chunking for RFD #0010 #737

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions hipcheck-common/proto/hipcheck/v1/hipcheck.proto
Original file line number Diff line number Diff line change
Expand Up @@ -192,8 +192,11 @@ enum QueryState {
// Something has gone wrong.
QUERY_STATE_UNSPECIFIED = 0;

// We are submitting a new query.
QUERY_STATE_SUBMIT = 1;
// We are sending a query to a plugin and we are expecting to need to send more chunks
QUERY_STATE_SUBMIT_IN_PROGRESS = 4;

// We are completed submitting a new query.
QUERY_STATE_SUBMIT_COMPLETE = 1;

// We are replying to a query and expect more chunks.
QUERY_STATE_REPLY_IN_PROGRESS = 2;
Expand Down
97 changes: 65 additions & 32 deletions hipcheck-common/src/chunk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,20 @@ fn estimate_size(msg: &PluginQuery) -> usize {
}

pub fn chunk_with_size(msg: PluginQuery, max_est_size: usize) -> Result<Vec<PluginQuery>> {
// Chunking only does something on response objects, mostly because
// we don't have a state to represent "SubmitInProgress"
if msg.state == QueryState::Submit as i32 {
return Ok(vec![msg]);
}
// in_progress_state - the state the PluginQuery is in for all queries in the resulting Vec,
// EXCEPT the last one
//
// completion_state - the state the PluginQuery is in if it is the last chunked message
let (in_progress_state, completion_state) = match msg.state() {
// if the message gets chunked, then it must either be a reply or submission that is in process
QueryState::Unspecified => return Err(anyhow!("msg in Unspecified query state")),
QueryState::SubmitInProgress | QueryState::SubmitComplete => {
(QueryState::SubmitInProgress, QueryState::SubmitComplete)
}
QueryState::ReplyInProgress | QueryState::ReplyComplete => {
(QueryState::ReplyInProgress, QueryState::ReplyComplete)
}
};

let mut out: Vec<PluginQuery> = vec![];
let mut base: PluginQuery = msg;
Expand All @@ -58,9 +67,9 @@ pub fn chunk_with_size(msg: PluginQuery, max_est_size: usize) -> Result<Vec<Plug
// For this loop, we want to take at most MAX_SIZE bytes because that's
// all that can fit in a PluginQuery
let mut remaining = max_est_size;
let mut query = PluginQuery {
let mut chunked_query = PluginQuery {
id: base.id,
state: QueryState::ReplyInProgress as i32,
state: in_progress_state as i32,
publisher_name: base.publisher_name.clone(),
plugin_name: base.plugin_name.clone(),
query_name: base.query_name.clone(),
Expand All @@ -71,15 +80,15 @@ pub fn chunk_with_size(msg: PluginQuery, max_est_size: usize) -> Result<Vec<Plug

if remaining > 0 && base.key.bytes().len() > 0 {
// steal from key
query.key = drain_at_most_n_bytes(&mut base.key, remaining)?;
remaining -= query.key.bytes().len();
chunked_query.key = drain_at_most_n_bytes(&mut base.key, remaining)?;
remaining -= chunked_query.key.bytes().len();
made_progress = true;
}

if remaining > 0 && base.output.bytes().len() > 0 {
// steal from output
query.output = drain_at_most_n_bytes(&mut base.output, remaining)?;
remaining -= query.output.bytes().len();
chunked_query.output = drain_at_most_n_bytes(&mut base.output, remaining)?;
remaining -= chunked_query.output.bytes().len();
made_progress = true;
}

Expand All @@ -96,7 +105,7 @@ pub fn chunk_with_size(msg: PluginQuery, max_est_size: usize) -> Result<Vec<Plug
} else if c_bytes <= remaining {
// steal this concern
let concern = base.concern.swap_remove(i);
query.concern.push(concern);
chunked_query.concern.push(concern);
remaining -= c_bytes;
made_progress = true;
}
Expand All @@ -106,9 +115,14 @@ pub fn chunk_with_size(msg: PluginQuery, max_est_size: usize) -> Result<Vec<Plug
l -= 1;
}

out.push(query);
out.push(chunked_query);
}
out.push(base);

// ensure the last message in the chunked messages is set to the appropriate Complete state
if let Some(last) = out.last_mut() {
last.state = completion_state as i32;
}
Ok(out)
}

Expand Down Expand Up @@ -162,7 +176,9 @@ impl QuerySynthesizer {
.map_err(|_| Error::UnspecifiedQueryState)?;
match state {
QueryState::Unspecified => return Err(Error::UnspecifiedQueryState),
QueryState::Submit => return Err(Error::ReceivedSubmitWhenExpectingReplyChunk),
QueryState::SubmitInProgress | QueryState::SubmitComplete => {
return Err(Error::ReceivedSubmitWhenExpectingReplyChunk)
}
Comment on lines +179 to +181
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe the QuerySynthesizer needs to be modified as well to support de-chunking chunked Submit queries.

QueryState::ReplyInProgress | QueryState::ReplyComplete => {
if state == QueryState::ReplyComplete {
raw.state = QueryState::ReplyComplete.into();
Expand Down Expand Up @@ -209,23 +225,40 @@ mod test {

#[test]
fn test_chunking() {
let query = PluginQuery {
id: 0,
state: QueryState::ReplyComplete as i32,
publisher_name: "".to_owned(),
plugin_name: "".to_owned(),
query_name: "".to_owned(),
// This key will cause the chunk not to occur on a char boundary
key: "aこれは実験です".to_owned(),
output: "".to_owned(),
concern: vec!["< 10".to_owned(), "0123456789".to_owned()],
};
let res = match chunk_with_size(query, 10) {
Ok(r) => r,
Err(e) => {
panic!("{e}");
}
};
assert_eq!(res.len(), 4);
// test both reply and submission chunking
let states = [
(QueryState::SubmitInProgress, QueryState::SubmitComplete),
(QueryState::ReplyInProgress, QueryState::ReplyComplete),
];

for (intermediate_state, final_state) in states.into_iter() {
let query = PluginQuery {
id: 0,
state: final_state as i32,
publisher_name: "".to_owned(),
plugin_name: "".to_owned(),
query_name: "".to_owned(),
// This key will cause the chunk not to occur on a char boundary
key: "aこれは実験です".to_owned(),
output: "".to_owned(),
concern: vec!["< 10".to_owned(), "0123456789".to_owned()],
};
let res = match chunk_with_size(query, 10) {
Ok(r) => r,
Err(e) => {
panic!("{e}");
}
};
// ensure first 3 are ...InProgress
assert_eq!(
res.iter()
.filter(|x| x.state() == intermediate_state)
.count(),
3
);
// ensure last one is ...Complete
assert_eq!(res.last().unwrap().state(), final_state);
assert_eq!(res.len(), 4);
}
Comment on lines +259 to +262
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To ensure the above QuerySynthesizer is implemented correctly, perhaps there should be an additional stage to this test that tries to reassemble the original message from the fragments.

}
}
4 changes: 4 additions & 0 deletions hipcheck-common/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ pub enum Error {
#[error("unexpected ReplyInProgress state for query")]
UnexpectedReplyInProgress,

/// The `PluginEngine` received a message with the unexpected status `RequestInProgress`
#[error("unexpected RequestInProgress state for query")]
UnexpectedRequestInProgress,

/// The `PluginEngine` received a message with a request-type status when it expected a reply
#[error("remote sent QuerySubmit when reply chunk expected")]
ReceivedSubmitWhenExpectingReplyChunk,
Expand Down
5 changes: 3 additions & 2 deletions hipcheck-common/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ impl TryFrom<QueryState> for QueryDirection {
fn try_from(value: QueryState) -> Result<Self, Self::Error> {
match value {
QueryState::Unspecified => Err(Error::UnspecifiedQueryState),
QueryState::Submit => Ok(QueryDirection::Request),
QueryState::SubmitInProgress => Err(Error::UnexpectedRequestInProgress),
QueryState::SubmitComplete => Ok(QueryDirection::Request),
QueryState::ReplyInProgress => Err(Error::UnexpectedReplyInProgress),
QueryState::ReplyComplete => Ok(QueryDirection::Response),
}
Expand All @@ -39,7 +40,7 @@ impl TryFrom<QueryState> for QueryDirection {
impl From<QueryDirection> for QueryState {
fn from(value: QueryDirection) -> Self {
match value {
QueryDirection::Request => QueryState::Submit,
QueryDirection::Request => QueryState::SubmitComplete,
QueryDirection::Response => QueryState::ReplyComplete,
}
}
Expand Down
1 change: 1 addition & 0 deletions sdk/rust/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ impl From<hipcheck_common::error::Error> for Error {
use hipcheck_common::error::Error::*;
match value {
UnspecifiedQueryState => Error::UnspecifiedQueryState,
UnexpectedRequestInProgress => Error::UnexpectedReplyInProgress,
patrickjcasey marked this conversation as resolved.
Show resolved Hide resolved
UnexpectedReplyInProgress => Error::UnexpectedReplyInProgress,
ReceivedSubmitWhenExpectingReplyChunk => Error::ReceivedSubmitWhenExpectingReplyChunk,
MoreAfterQueryComplete { id } => Error::MoreAfterQueryComplete { id },
Expand Down
2 changes: 1 addition & 1 deletion sdk/rust/src/plugin_engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -431,7 +431,7 @@ impl HcSessionSocket {
return Ok(HandleAction::ForwardMsgToExistingSession(tx));
}

if query.state() == QueryState::Submit {
if [QueryState::SubmitInProgress, QueryState::SubmitComplete].contains(&query.state()) {
return Ok(HandleAction::CreateSession);
}

Expand Down
2 changes: 1 addition & 1 deletion xtask/src/task/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,6 @@ pub mod buf;
pub mod changelog;
pub mod check;
pub mod ci;
pub mod manifest;
pub mod rfd;
pub mod site;
pub mod manifest;
Loading