Skip to content

Commit

Permalink
Test and fixes for idempotence mode
Browse files Browse the repository at this point in the history
  • Loading branch information
vigoo committed Mar 28, 2024
1 parent c090187 commit 62815f5
Show file tree
Hide file tree
Showing 9 changed files with 286 additions and 14 deletions.
10 changes: 1 addition & 9 deletions golem-common/src/model/oplog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ impl OplogEntry {
let deserialized_array: Vec<Vec<u8>> = serde_json::from_slice(payload)
.unwrap_or_else(|err| {
panic!(
"Failed to deserialize oplog payload: {:?}: {err}",
"Failed to deserialize oplog payload for {function_name}: {:?}: {err}",
std::str::from_utf8(payload).unwrap_or("???")
)
});
Expand Down Expand Up @@ -359,9 +359,7 @@ mod tests {

#[test]
fn oplog_entry_imported_function_invoked_payload_roundtrip() {
let timestamp = Timestamp::now_utc();
let entry = OplogEntry::imported_function_invoked(
timestamp,
"function_name".to_string(),
&("example payload".to_string()),
WrappedFunctionType::ReadLocal,
Expand Down Expand Up @@ -396,8 +394,6 @@ mod tests {

#[test]
fn oplog_entry_exported_function_invoked_payload_roundtrip() {
let timestamp = Timestamp::now_utc();

let val1 = Val {
val: Some(val::Val::Result(Box::new(ValResult {
discriminant: 0,
Expand All @@ -407,7 +403,6 @@ mod tests {
}))),
};
let entry = OplogEntry::exported_function_invoked(
timestamp,
"function_name".to_string(),
&vec![val1.clone()],
Some(InvocationKey {
Expand Down Expand Up @@ -459,8 +454,6 @@ mod tests {

#[test]
fn oplog_entry_exported_function_completed_roundtrip() {
let timestamp = Timestamp::now_utc();

let val1 = Val {
val: Some(val::Val::Result(Box::new(ValResult {
discriminant: 0,
Expand All @@ -474,7 +467,6 @@ mod tests {
};

let entry = OplogEntry::exported_function_completed(
timestamp,
&vec![val1.clone(), val2.clone()],
1_000_000_000,
)
Expand Down
2 changes: 1 addition & 1 deletion golem-worker-executor-base/src/durable_host/golem/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ impl<Ctx: WorkerCtx> golem::api::host::Host for DurableWorkerCtx<Ctx> {
|_ctx| {
Box::pin(async move {
let uuid = Uuid::new_v4();
Ok::<Uuid, GolemError>(uuid.into())
Ok::<Uuid, GolemError>(uuid)
})
},
|_ctx, uuid: &Uuid| Ok(uuid.as_u64_pair()),
Expand Down
4 changes: 3 additions & 1 deletion golem-worker-executor-base/src/durable_host/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -833,7 +833,9 @@ impl<Ctx: WorkerCtx> PrivateDurableWorkerState<Ctx> {
let end_index = self
.lookup_oplog_entry(begin_index, OplogEntry::is_end_remote_write)
.await;
if end_index == None {
if end_index.is_none() {
// Must switch to live mode before failing to be able to commit an Error entry
self.oplog_idx = self.oplog_size;
Err(GolemError::runtime(
"Non-idempotent remote write operation was not completed, cannot retry",
))
Expand Down
4 changes: 3 additions & 1 deletion golem-worker-executor-base/src/durable_host/serialized.rs
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,9 @@ impl From<GolemError> for SerializableError {

impl From<&GolemError> for SerializableError {
fn from(value: &GolemError) -> Self {
Self::Golem { error: value.clone() }
Self::Golem {
error: value.clone(),
}
}
}

Expand Down
167 changes: 167 additions & 0 deletions golem-worker-executor-base/tests/transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,3 +252,170 @@ async fn atomic_region() {

check!(events == vec!["1", "2", "1", "2", "1", "2", "3", "4", "5", "5", "5", "6"]);
}

#[tokio::test]
#[tracing::instrument]
async fn idempotence_on() {
let context = common::TestContext::new();
let mut executor = common::start(&context).await.unwrap();

let host_http_port = context.host_http_port();

let events = Arc::new(Mutex::new(Vec::new()));
let events_clone = events.clone();

let http_server = tokio::spawn(async move {
let call_count_per_step = Arc::new(Mutex::new(HashMap::<u64, u64>::new()));
let route = warp::path("step")
.and(warp::path::param())
.and(warp::get())
.map(move |step: u64| {
let mut steps = call_count_per_step.lock().unwrap();
let step_count = steps.entry(step).and_modify(|e| *e += 1).or_insert(0);

println!("step: {step} occurrence {step_count}");

match &step_count {
0 => Response::builder()
.status(StatusCode::OK)
.body(Body::from("true"))
.unwrap(),
_ => Response::builder()
.status(StatusCode::OK)
.body(Body::from("false"))
.unwrap(),
}
})
.or(warp::path("side-effect")
.and(warp::post())
.and(warp::body::bytes())
.map(move |body: Bytes| {
let body = String::from_utf8(body.to_vec()).unwrap();
info!("received POST message: {body}");
events_clone.lock().unwrap().push(body.clone());
Response::builder()
.status(StatusCode::OK)
.body("OK")
.unwrap()
}));

warp::serve(route)
.run(
format!("0.0.0.0:{}", host_http_port)
.parse::<SocketAddr>()
.unwrap(),
)
.await;
});

let template_id = executor.store_template(Path::new("../test-templates/runtime-service.wasm"));

let mut env = HashMap::new();
env.insert("PORT".to_string(), context.host_http_port().to_string());

let worker_id = executor
.try_start_worker_versioned(&template_id, 0, "idempotence-flag", vec![], env)
.await
.unwrap();

let _ = executor
.invoke_and_await(
&worker_id,
"golem:it/api/idempotence-flag",
vec![common::val_bool(true)],
)
.await
.unwrap();

drop(executor);
http_server.abort();

let events = events.lock().unwrap().clone();
println!("events:\n - {}", events.join("\n - "));

check!(events == vec!["1", "1"]);
}

#[tokio::test]
#[tracing::instrument]
async fn idempotence_off() {
let context = common::TestContext::new();
let mut executor = common::start(&context).await.unwrap();

let host_http_port = context.host_http_port();

let events = Arc::new(Mutex::new(Vec::new()));
let events_clone = events.clone();

let http_server = tokio::spawn(async move {
let call_count_per_step = Arc::new(Mutex::new(HashMap::<u64, u64>::new()));
let route = warp::path("step")
.and(warp::path::param())
.and(warp::get())
.map(move |step: u64| {
let mut steps = call_count_per_step.lock().unwrap();
let step_count = steps.entry(step).and_modify(|e| *e += 1).or_insert(0);

println!("step: {step} occurrence {step_count}");

match &step_count {
0 => Response::builder()
.status(StatusCode::OK)
.body(Body::from("true"))
.unwrap(),
_ => Response::builder()
.status(StatusCode::OK)
.body(Body::from("false"))
.unwrap(),
}
})
.or(warp::path("side-effect")
.and(warp::post())
.and(warp::body::bytes())
.map(move |body: Bytes| {
let body = String::from_utf8(body.to_vec()).unwrap();
info!("received POST message: {body}");
events_clone.lock().unwrap().push(body.clone());
Response::builder()
.status(StatusCode::OK)
.body("OK")
.unwrap()
}));

warp::serve(route)
.run(
format!("0.0.0.0:{}", host_http_port)
.parse::<SocketAddr>()
.unwrap(),
)
.await;
});

let template_id = executor.store_template(Path::new("../test-templates/runtime-service.wasm"));

let mut env = HashMap::new();
env.insert("PORT".to_string(), context.host_http_port().to_string());

let worker_id = executor
.try_start_worker_versioned(&template_id, 0, "idempotence-flag", vec![], env)
.await
.unwrap();

let result = executor
.invoke_and_await(
&worker_id,
"golem:it/api/idempotence-flag",
vec![common::val_bool(false)],
)
.await;

drop(executor);
http_server.abort();

let events = events.lock().unwrap().clone();
println!("events:\n - {}", events.join("\n - "));
println!("result: {:?}", result);

check!(events == vec!["1"]);
check!(result.is_err());
}
Binary file modified test-templates/runtime-service.wasm
Binary file not shown.
7 changes: 6 additions & 1 deletion test-templates/runtime-service/cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,9 @@ path = "wit"
"golem:api" = { path = "wit/deps/golem" }
"golem:rpc" = { path = "wit/deps/wasm-rpc" }
"wasi:clocks" = { path = "wit/deps/clocks" }
"wasi:io" = { path = "wit/deps/io" }
"wasi:io" = { path = "wit/deps/io" }
"wasi:http" = { path = "wit/deps/http" }
"wasi:random" = { path = "wit/deps/random" }
"wasi:cli" = { path = "wit/deps/cli" }
"wasi:filesystem" = { path = "wit/deps/filesystem" }
"wasi:sockets" = { path = "wit/deps/sockets" }
100 changes: 99 additions & 1 deletion test-templates/runtime-service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ mod bindings;
use reqwest::{Client, Response};
use crate::bindings::exports::golem::it::api::Guest;
use crate::bindings::golem::api::host::*;
use crate::bindings::wasi;
use crate::bindings::wasi::io::streams::StreamError;

struct Component;

Expand Down Expand Up @@ -106,6 +108,28 @@ impl Guest for Component {

remote_side_effect("6"); // only performed once
}

fn idempotence_flag(enabled: bool) {
let original = get_idempotence_mode();
if original != enabled {
set_idempotence_mode(enabled);
println!("Changed idempotence mode from {original} to {enabled}");
}

let future_response = send_remote_side_effect("1");

let begin = mark_begin_operation();
let decision = remote_call(1); // will return false on the 2nd call
if decision {
panic!("crash 1");
}
mark_end_operation(begin);

let incoming_response = get_incoming_response(&future_response);
let body = read_body(&incoming_response);

println!("Received response from remote side-effect: {} {}", incoming_response.status(), String::from_utf8(body).unwrap());
}
}

fn remote_call(param: u64) -> bool {
Expand Down Expand Up @@ -145,4 +169,78 @@ fn remote_side_effect(message: &str) {
let status = response.status();

println!("Received {status}");
}
}

// Using the low-level wasi-http API to initiate a remote call without reading the response
// useful for failing in the middle of an ongoing request to test idempotence modes
fn send_remote_side_effect(message: &str) -> wasi::http::types::FutureIncomingResponse {
let port = std::env::var("PORT").unwrap_or("9999".to_string());

let headers = wasi::http::types::Fields::new();
let request = wasi::http::types::OutgoingRequest::new(headers);
request.set_method(&wasi::http::types::Method::Post).unwrap();
request.set_path_with_query(Some("/side-effect")).unwrap();
request.set_scheme(Some(&wasi::http::types::Scheme::Http)).unwrap();
request.set_authority(Some(&format!("localhost:{port}"))).unwrap();

let request_body = request.body().unwrap();
let request_body_stream = request_body.write().unwrap();
request_body_stream.write(message.as_bytes()).unwrap();
drop(request_body_stream);
wasi::http::types::OutgoingBody::finish(request_body, None).unwrap();

let options = wasi::http::types::RequestOptions::new();
options.set_connect_timeout(Some(5000000000)).unwrap(); // 5s
options.set_first_byte_timeout(Some(5000000000)).unwrap(); // 5s
options.set_between_bytes_timeout(Some(5000000000)).unwrap(); // 5s

let future_incoming_response = wasi::http::outgoing_handler::handle(request, Some(options)).unwrap();

future_incoming_response
}

fn get_incoming_response(
future_incoming_response: &wasi::http::types::FutureIncomingResponse,
) -> wasi::http::types::IncomingResponse {
let incoming_response = match future_incoming_response.get() {
Some(Ok(Ok(incoming_response))) => {
println!("Got incoming response");
incoming_response
}
Some(Ok(Err(err))) => {
println!("Returned with error code: {err:?}");
panic!("Error: {:?}", err)
}
Some(Err(err)) => {
println!("Returned with error: {err:?}");
panic!("Error: {:?}", err)
}
None => {
println!("No incoming response yet, polling");
let pollable = future_incoming_response.subscribe();
let _ = wasi::io::poll::poll(&[&pollable]);
get_incoming_response(future_incoming_response)
}
};
incoming_response
}

fn read_body(incoming_response: &wasi::http::types::IncomingResponse) -> Vec<u8> {
let response_body = incoming_response.consume().unwrap();
let response_body_stream = response_body.stream().unwrap();
let mut body = Vec::new();

let mut eof = false;
while !eof {
match response_body_stream.read(u64::MAX) {
Ok(mut body_chunk) => {
body.append(&mut body_chunk);
}
Err(StreamError::Closed) => {
eof = true;
}
Err(err) => panic!("Error: {:?}", err),
}
}
body
}
Loading

0 comments on commit 62815f5

Please sign in to comment.