Skip to content

Commit

Permalink
More comments
Browse files Browse the repository at this point in the history
  • Loading branch information
afsalthaj committed Jan 9, 2025
1 parent d74ae47 commit b70a7c5
Showing 1 changed file with 74 additions and 77 deletions.
151 changes: 74 additions & 77 deletions integration-tests/tests/fork.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,7 @@ inherit_test_dep!(EnvBasedTestDependencies);
#[test]
#[tracing::instrument]
#[timeout(120000)]
async fn fork_interrupted_worker_to_completion(
deps: &EnvBasedTestDependencies,
_tracing: &Tracing,
) {
async fn fork_interrupted_worker(deps: &EnvBasedTestDependencies, _tracing: &Tracing) {
let response = Arc::new(Mutex::new("initial".to_string()));
let host_http_port = 8586;

Expand Down Expand Up @@ -98,7 +95,7 @@ async fn fork_interrupted_worker_to_completion(
#[tracing::instrument]
#[flaky(5)]
#[timeout(120000)]
async fn fork_running_worker_to_completion(deps: &EnvBasedTestDependencies, _tracing: &Tracing) {
async fn fork_running_worker_1(deps: &EnvBasedTestDependencies, _tracing: &Tracing) {
let response = Arc::new(Mutex::new("initial".to_string()));
let host_http_port = 8587;
let http_server = run_http_server(&response, host_http_port);
Expand Down Expand Up @@ -171,12 +168,12 @@ async fn fork_running_worker_to_completion(deps: &EnvBasedTestDependencies, _tra
#[test]
#[tracing::instrument]
#[timeout(120000)]
async fn fork_idle_worker(deps: &EnvBasedTestDependencies, _tracing: &Tracing) {
async fn fork_running_worker_2(deps: &EnvBasedTestDependencies, _tracing: &Tracing) {
let component_id = deps.store_component("shopping-cart").await;

let source_worker_id = WorkerId {
component_id: component_id.clone(),
worker_name: "baz".to_string(),
worker_name: "sc".to_string(),
};

let _ = deps
Expand All @@ -187,19 +184,6 @@ async fn fork_idle_worker(deps: &EnvBasedTestDependencies, _tracing: &Tracing) {
)
.await;

let _ = deps
.invoke_and_await(
&source_worker_id,
"golem:it/api.{add-item}",
vec![Value::Record(vec![
Value::String("G1001".to_string()),
Value::String("Golem Cloud Subscription 1y".to_string()),
Value::F32(999999.0),
Value::U32(1),
])],
)
.await;

let _ = deps
.invoke_and_await(
&source_worker_id,
Expand All @@ -215,76 +199,55 @@ async fn fork_idle_worker(deps: &EnvBasedTestDependencies, _tracing: &Tracing) {

let target_worker_id = WorkerId {
component_id: component_id.clone(),
worker_name: "forked-baz".to_string(),
worker_name: "forked-sc".to_string(),
};

let source_oplog = deps.get_oplog(&source_worker_id, OplogIndex::INITIAL).await;

let oplog_index_of_function_completed_g1001 = OplogIndex::from_u64(11);
let oplog_index_of_function_invoked: OplogIndex = OplogIndex::from_u64(3);

// Minus 1 as oplog index starts from 1
let log_record = source_oplog
.get(u64::from(oplog_index_of_function_completed_g1001) as usize - 1)
.get(u64::from(oplog_index_of_function_invoked) as usize - 1)
.expect("Expect at least one entry in source oplog");

assert!(matches!(
log_record,
PublicOplogEntry::ExportedFunctionCompleted(_)
PublicOplogEntry::ExportedFunctionInvoked(_)
));

let _ = deps
.fork_worker(
&source_worker_id,
&target_worker_id,
oplog_index_of_function_completed_g1001,
)
.await;

//Invoking G1002 again in forked worker
let _ = deps
.invoke_and_await(
&target_worker_id,
"golem:it/api.{add-item}",
vec![Value::Record(vec![
Value::String("G1002".to_string()),
Value::String("Mud Golem".to_string()),
Value::F32(11.0),
Value::U32(10),
])],
oplog_index_of_function_invoked,
)
.await;

let _ = deps
.invoke_and_await(
&target_worker_id,
"golem:it/api.{update-item-quantity}",
vec![Value::String("G1002".to_string()), Value::U32(20)],
)
.await;
deps.wait_for_status(
&target_worker_id,
WorkerStatus::Idle,
Duration::from_secs(10),
)
.await;

let result1 = deps
.search_oplog(&target_worker_id, "G1002 AND NOT pending")
.await;
let result2 = deps
.search_oplog(&target_worker_id, "G1001 AND NOT pending")
let total_cart_initialisation = deps
.search_oplog(&target_worker_id, "initialize-cart AND NOT pending")
.await;

assert_eq!(result1.len(), 4); // two invocations for G1002 and two log messages preceded
assert_eq!(result2.len(), 2); // two invocations for G1001 which was in the original source oplog
// Since the fork point was before the completion, it re-intitialises making the total initialisation
// records 2 along with the new log in target worker.
assert_eq!(total_cart_initialisation.len(), 2);
}

#[test]
#[tracing::instrument]
#[timeout(120000)]
async fn fork_worker_before_completion_of_function(
deps: &EnvBasedTestDependencies,
_tracing: &Tracing,
) {
async fn fork_idle_worker(deps: &EnvBasedTestDependencies, _tracing: &Tracing) {
let component_id = deps.store_component("shopping-cart").await;

let source_worker_id = WorkerId {
component_id: component_id.clone(),
worker_name: "sc".to_string(),
worker_name: "baz".to_string(),
};

let _ = deps
Expand All @@ -295,6 +258,19 @@ async fn fork_worker_before_completion_of_function(
)
.await;

let _ = deps
.invoke_and_await(
&source_worker_id,
"golem:it/api.{add-item}",
vec![Value::Record(vec![
Value::String("G1001".to_string()),
Value::String("Golem Cloud Subscription 1y".to_string()),
Value::F32(999999.0),
Value::U32(1),
])],
)
.await;

let _ = deps
.invoke_and_await(
&source_worker_id,
Expand All @@ -310,44 +286,62 @@ async fn fork_worker_before_completion_of_function(

let target_worker_id = WorkerId {
component_id: component_id.clone(),
worker_name: "forked-sc".to_string(),
worker_name: "forked-baz".to_string(),
};

let source_oplog = deps.get_oplog(&source_worker_id, OplogIndex::INITIAL).await;

let oplog_index_of_function_invoked: OplogIndex = OplogIndex::from_u64(3);
let oplog_index_of_function_completed_g1001 = OplogIndex::from_u64(11);

// Minus 1 as oplog index starts from 1
let log_record = source_oplog
.get(u64::from(oplog_index_of_function_invoked) as usize - 1)
.get(u64::from(oplog_index_of_function_completed_g1001) as usize - 1)
.expect("Expect at least one entry in source oplog");

assert!(matches!(
log_record,
PublicOplogEntry::ExportedFunctionInvoked(_)
PublicOplogEntry::ExportedFunctionCompleted(_)
));

let _ = deps
.fork_worker(
&source_worker_id,
&target_worker_id,
oplog_index_of_function_invoked,
oplog_index_of_function_completed_g1001,
)
.await;

deps.wait_for_status(
&target_worker_id,
WorkerStatus::Idle,
Duration::from_secs(10),
)
.await;
//Invoking G1002 again in forked worker
let _ = deps
.invoke_and_await(
&target_worker_id,
"golem:it/api.{add-item}",
vec![Value::Record(vec![
Value::String("G1002".to_string()),
Value::String("Mud Golem".to_string()),
Value::F32(11.0),
Value::U32(10),
])],
)
.await;

let total_cart_initialisation = deps
.search_oplog(&target_worker_id, "initialize-cart AND NOT pending")
let _ = deps
.invoke_and_await(
&target_worker_id,
"golem:it/api.{update-item-quantity}",
vec![Value::String("G1002".to_string()), Value::U32(20)],
)
.await;

// Since the fork point was before the completion, it re-intitialises making the total initialisation
// records 2 along with the new log in target worker.
assert_eq!(total_cart_initialisation.len(), 2);
let result1 = deps
.search_oplog(&target_worker_id, "G1002 AND NOT pending")
.await;
let result2 = deps
.search_oplog(&target_worker_id, "G1001 AND NOT pending")
.await;

assert_eq!(result1.len(), 4); // two invocations for G1002 and two log messages preceded
assert_eq!(result2.len(), 2); // two invocations for G1001 which was in the original source oplog
}

#[test]
Expand Down Expand Up @@ -437,7 +431,7 @@ async fn fork_worker_with_invalid_oplog_index_cut_off(
#[test]
#[tracing::instrument]
#[timeout(120000)]
async fn fork_worker_4(deps: &EnvBasedTestDependencies, _tracing: &Tracing) {
async fn fork_invalid_worker(deps: &EnvBasedTestDependencies, _tracing: &Tracing) {
let component_id = deps.store_component("shopping-cart").await;

let source_worker_id = WorkerId {
Expand All @@ -463,10 +457,13 @@ async fn fork_worker_4(deps: &EnvBasedTestDependencies, _tracing: &Tracing) {
assert!(error.contains("WorkerNotFound"));
}

// Divergence possibility is mainly respect to environment variables referring to worker-ids.
// Fork shouldn't change the original environment variable values of the source worker
// stored in oplog until cut off
#[test]
#[tracing::instrument]
#[timeout(120000)]
async fn fork_worker_no_divergence_until_fork_point(
async fn fork_worker_ensures_zero_divergence_until_cut_off(
deps: &EnvBasedTestDependencies,
_tracing: &Tracing,
) {
Expand Down

0 comments on commit b70a7c5

Please sign in to comment.