From 8806ffcb9601cb03129fe5771e576fc9b2e4a7dd Mon Sep 17 00:00:00 2001 From: Daniel Vigovszky Date: Fri, 22 Mar 2024 14:23:24 +0100 Subject: [PATCH] Interrupt oplog entry (#290) * Interrupt oplog entry * Do not write Suspend entry after invocations --- golem-cli/tests/cli.rs | 5 +- golem-cli/tests/sharding.rs | 29 ++++---- golem-common/src/model/oplog.rs | 16 +++-- .../src/durable_host/mod.rs | 66 +++++++------------ golem-worker-executor-base/src/grpc.rs | 28 +++++--- .../src/services/recovery.rs | 12 ++-- golem-worker-executor-base/src/worker.rs | 53 ++++++++++++--- golem-worker-executor-base/src/workerctx.rs | 7 +- golem-worker-executor-base/tests/api.rs | 55 +++++++++++++++- .../tests/common/mod.rs | 6 +- golem-worker-executor-base/tests/lib.rs | 11 +++- golem-worker-executor/src/context.rs | 8 +-- 12 files changed, 193 insertions(+), 103 deletions(-) diff --git a/golem-cli/tests/cli.rs b/golem-cli/tests/cli.rs index 17c755ccd..baa53682c 100644 --- a/golem-cli/tests/cli.rs +++ b/golem-cli/tests/cli.rs @@ -24,10 +24,7 @@ impl CliConfig { } pub trait Cli { - fn run<'a, T: DeserializeOwned, S: AsRef + Debug>( - &self, - args: &[S], - ) -> Result; + fn run + Debug>(&self, args: &[S]) -> Result; fn run_json + Debug>(&self, args: &[S]) -> Result; fn run_unit + Debug>(&self, args: &[S]) -> Result<(), Failed>; fn run_stdout + Debug>(&self, args: &[S]) -> Result; diff --git a/golem-cli/tests/sharding.rs b/golem-cli/tests/sharding.rs index 1cdfaf77c..2de6806f0 100644 --- a/golem-cli/tests/sharding.rs +++ b/golem-cli/tests/sharding.rs @@ -87,24 +87,21 @@ fn start_shard(context: &mut Context) { let mut rng = thread_rng(); ids.shuffle(&mut rng); - match ids.first() { - Some(id) => { - match WorkerExecutor::start( - context.docker, - *id, - &context.env, - &context.redis.info(), - &context.golem_worker_service.info(), - &context.golem_template_service.info(), - &context.shard_manager.as_ref().unwrap().info(), - ) { - Ok(we) => context.worker_executors.worker_executors.push(we), - Err(e) => { - println!("Failed to start worker: {e:?}"); - } + if let Some(id) = ids.first() { + match WorkerExecutor::start( + context.docker, + *id, + &context.env, + &context.redis.info(), + &context.golem_worker_service.info(), + &context.golem_template_service.info(), + &context.shard_manager.as_ref().unwrap().info(), + ) { + Ok(we) => context.worker_executors.worker_executors.push(we), + Err(e) => { + println!("Failed to start worker: {e:?}"); } } - None => {} } } diff --git a/golem-common/src/model/oplog.rs b/golem-common/src/model/oplog.rs index bd4111548..b7049338e 100644 --- a/golem-common/src/model/oplog.rs +++ b/golem-common/src/model/oplog.rs @@ -46,11 +46,6 @@ pub enum OplogEntry { Suspend { timestamp: Timestamp }, /// Worker failed Error { timestamp: Timestamp }, - /// Debug log entry - Debug { - timestamp: Timestamp, - message: String, - }, /// Marker entry added when get-oplog-index is called from the worker, to make the jumping behavior /// more predictable. NoOp { timestamp: Timestamp }, @@ -62,6 +57,9 @@ pub enum OplogEntry { timestamp: Timestamp, jump: OplogRegion, }, + /// Indicates that the worker has been interrupted at this point. + /// Only used to recompute the worker's (cached) status, has no effect on execution. 
+ Interrupted { timestamp: Timestamp }, } impl OplogEntry { @@ -202,6 +200,14 @@ impl OplogEntry { pub fn nop(timestamp: Timestamp) -> OplogEntry { OplogEntry::NoOp { timestamp } } + + /// True if the oplog entry is a "hint" that should be skipped during replay + pub fn is_hint(&self) -> bool { + matches!( + self, + OplogEntry::Suspend { .. } | OplogEntry::Error { .. } | OplogEntry::Interrupted { .. } + ) + } } #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Encode, Decode)] diff --git a/golem-worker-executor-base/src/durable_host/mod.rs b/golem-worker-executor-base/src/durable_host/mod.rs index 4ebc5b105..468c247d8 100644 --- a/golem-worker-executor-base/src/durable_host/mod.rs +++ b/golem-worker-executor-base/src/durable_host/mod.rs @@ -50,7 +50,7 @@ use golem_common::model::oplog::{OplogEntry, WrappedFunctionType}; use golem_common::model::regions::{DeletedRegions, OplogRegion}; use golem_common::model::{ AccountId, CallingConvention, InvocationKey, PromiseId, Timestamp, VersionedWorkerId, WorkerId, - WorkerMetadata, WorkerStatus, + WorkerMetadata, WorkerStatus, WorkerStatusRecord, }; use golem_wasm_rpc::wasmtime::ResourceStore; use golem_wasm_rpc::{Uri, Value}; @@ -738,8 +738,9 @@ impl InvocationHooks for DurableWorkerCtx { let is_interrupt = is_interrupt(error); let is_suspend = is_suspend(error); let is_jump = is_jump(error); + let is_failure = !is_interrupt && !is_suspend && !is_jump; - if is_live_after && !is_interrupt && !is_suspend && !is_jump { + if is_live_after && is_failure { self.set_oplog_entry(OplogEntry::Error { timestamp: Timestamp::now_utc(), }) @@ -748,6 +749,16 @@ impl InvocationHooks for DurableWorkerCtx { self.commit_oplog().await; } + if is_live_after && is_interrupt { + // appending an Interrupted hint to the end of the oplog, which can be used to determine worker status + self.set_oplog_entry(OplogEntry::Interrupted { + timestamp: Timestamp::now_utc(), + }) + .await; + + self.commit_oplog().await; + } + Ok(()) } @@ -812,10 +823,6 @@ impl InvocationHooks for DurableWorkerCtx { }); self.set_oplog_entry(oplog_entry).await; - self.set_oplog_entry(OplogEntry::Suspend { - timestamp: Timestamp::now_utc(), - }) - .await; self.commit_oplog().await; } else { let response = self.get_oplog_entry_exported_function_completed().await?; @@ -871,7 +878,8 @@ impl> ExternalOperations for Dur worker_id: &WorkerId, status: WorkerStatus, ) -> Result<(), GolemError> { - let latest_status = calculate_last_known_status(this, worker_id).await?; + let metadata = this.worker_service().get(worker_id).await; + let latest_status = calculate_last_known_status(this, worker_id, &metadata).await?; this.worker_service() .update_status( worker_id, @@ -890,26 +898,12 @@ impl> ExternalOperations for Dur trailing_error_count(this, worker_id).await } - async fn get_assumed_worker_status + Send + Sync>( + async fn compute_latest_worker_status + Send + Sync>( this: &T, worker_id: &WorkerId, metadata: &Option, - ) -> WorkerStatus { - let last_oplog_idx = last_oplog_idx(this, worker_id).await; - let worker_status = match metadata { - Some(metadata) => { - if metadata.last_known_status.oplog_idx == last_oplog_idx { - debug!("get_assumed_instance_status for {worker_id}: stored last oplog idx matches current one, using stored status"); - metadata.last_known_status.status.clone() - } else { - debug!("get_assumed_instance_status for {worker_id}: stored last oplog idx ({}) does not match current one ({last_oplog_idx}), using stored status", metadata.last_known_status.oplog_idx); - 
WorkerStatus::Running - } - } - None => WorkerStatus::Idle, - }; - debug!("get_assumed_instance_status for {worker_id}: assuming status {worker_status:?}"); - worker_status + ) -> Result { + calculate_last_known_status(this, worker_id, metadata).await } async fn prepare_instance( @@ -1024,10 +1018,6 @@ impl> ExternalOperations for Dur } } -async fn last_oplog_idx(this: &T, worker_id: &WorkerId) -> u64 { - this.oplog_service().get_size(worker_id).await -} - async fn trailing_error_count(this: &T, worker_id: &WorkerId) -> u64 { let mut idx = this.oplog_service().get_size(worker_id).await; let mut count = 0; @@ -1154,9 +1144,7 @@ impl PrivateDurableWorkerState { OplogEntry::NoOp { .. } => { break Ok(()); } - OplogEntry::Suspend { .. } => (), - OplogEntry::Error { .. } => (), - OplogEntry::Debug { message, .. } => debug!("Debug: {}", message), + entry if entry.is_hint() => {} _ => { break Err(GolemError::unexpected_oplog_entry( "NoOp", @@ -1185,9 +1173,7 @@ impl PrivateDurableWorkerState { }) .unwrap()); } - OplogEntry::Suspend { .. } => (), - OplogEntry::Error { .. } => (), - OplogEntry::Debug { message, .. } => debug!("Debug: {}", message), + entry if entry.is_hint() => {} _ => { break Err(GolemError::unexpected_oplog_entry( "ImportedFunctionInvoked", @@ -1237,9 +1223,7 @@ impl PrivateDurableWorkerState { calling_convention.clone(), ))); } - OplogEntry::Suspend { .. } => (), - OplogEntry::Error { .. } => (), - OplogEntry::Debug { message, .. } => debug!("Debug: {}", message), + entry if entry.is_hint() => {} _ => { break Err(GolemError::unexpected_oplog_entry( "ExportedFunctionInvoked", @@ -1274,8 +1258,7 @@ impl PrivateDurableWorkerState { .collect(); break Ok(Some(response)); } - OplogEntry::Suspend { .. } => (), - OplogEntry::Debug { message, .. } => debug!("Debug: {}", message), + entry if entry.is_hint() => {} _ => { break Err(GolemError::unexpected_oplog_entry( "ExportedFunctionCompleted", @@ -1289,7 +1272,7 @@ impl PrivateDurableWorkerState { } } - /// Consumes Suspend and Error entries which are hints for the server to decide whether to + /// Consumes Suspend, Error and Interrupt entries which are hints for the server to decide whether to /// keep workers in memory or allow them to rerun etc., but contain no actionable information /// during replay async fn consume_hint_entries(&mut self) { @@ -1297,8 +1280,7 @@ impl PrivateDurableWorkerState { if self.is_replay() { let oplog_entry = self.get_oplog_entry().await; match oplog_entry { - OplogEntry::Suspend { .. } => (), - OplogEntry::Error { .. } => (), + entry if entry.is_hint() => {} _ => { self.oplog_idx -= 1; break; diff --git a/golem-worker-executor-base/src/grpc.rs b/golem-worker-executor-base/src/grpc.rs index 4a621839e..bbe1f9448 100644 --- a/golem-worker-executor-base/src/grpc.rs +++ b/golem-worker-executor-base/src/grpc.rs @@ -172,7 +172,9 @@ impl + UsesAllDeps + Send + Sync + worker_id: &common_model::WorkerId, metadata: &Option, ) -> Result { - let worker_status = Ctx::get_assumed_worker_status(self, worker_id, metadata).await; + let worker_status = Ctx::compute_latest_worker_status(self, worker_id, metadata) + .await? + .status; match worker_status { WorkerStatus::Failed => Err(GolemError::PreviousInvocationFailed), @@ -270,7 +272,9 @@ impl + UsesAllDeps + Send + Sync + .ok_or(GolemError::worker_not_found(worker_id.clone()))?; let worker_status = - Ctx::get_assumed_worker_status(self, &worker_id, &Some(metadata.clone())).await; + Ctx::compute_latest_worker_status(self, &worker_id, &Some(metadata.clone())) + .await? 
+ .status; let should_activate = match worker_status { WorkerStatus::Interrupted | WorkerStatus::Running @@ -307,8 +311,9 @@ impl + UsesAllDeps + Send + Sync + self.validate_worker_id(&worker_id)?; let metadata = self.worker_service().get(&worker_id).await; - - let worker_status = Ctx::get_assumed_worker_status(self, &worker_id, &metadata).await; + let worker_status = Ctx::compute_latest_worker_status(self, &worker_id, &metadata) + .await? + .status; let metadata = metadata.ok_or(GolemError::invalid_request("Worker not found"))?; let should_interrupt = match worker_status { @@ -374,7 +379,9 @@ impl + UsesAllDeps + Send + Sync + let worker_id = common_model::WorkerId::from_proto(worker_id); let metadata = self.worker_service().get(&worker_id).await; - let worker_status = Ctx::get_assumed_worker_status(self, &worker_id, &metadata).await; + let worker_status = Ctx::compute_latest_worker_status(self, &worker_id, &metadata) + .await? + .status; if metadata.is_none() { // Worker does not exist, we still check if it is in the list active workers due to some inconsistency @@ -452,7 +459,9 @@ impl + UsesAllDeps + Send + Sync + let metadata = self.worker_service().get(&worker_id).await; self.validate_worker_status(&worker_id, &metadata).await?; - let worker_status = Ctx::get_assumed_worker_status(self, &worker_id, &metadata).await; + let worker_status = Ctx::compute_latest_worker_status(self, &worker_id, &metadata) + .await? + .status; match worker_status { WorkerStatus::Suspended | WorkerStatus::Interrupted => { @@ -624,6 +633,10 @@ impl + UsesAllDeps + Send + Sync + .await .ok_or(GolemError::worker_not_found(worker_id.clone()))?; + let latest_status = + Ctx::compute_latest_worker_status(self, &worker_id, &Some(metadata.clone())) + .await? + .status; let retry_count = Ctx::get_worker_retry_count(self, &worker_id).await as i32; Ok(golem::worker::WorkerMetadata { @@ -632,8 +645,7 @@ impl + UsesAllDeps + Send + Sync + env: HashMap::from_iter(metadata.env.iter().cloned()), account_id: Some(metadata.account_id.into()), template_version: metadata.worker_id.template_version, - status: Into::::into(metadata.last_known_status.status) - .into(), + status: Into::::into(latest_status).into(), retry_count, }) } diff --git a/golem-worker-executor-base/src/services/recovery.rs b/golem-worker-executor-base/src/services/recovery.rs index 014c79958..04ab6acf3 100644 --- a/golem-worker-executor-base/src/services/recovery.rs +++ b/golem-worker-executor-base/src/services/recovery.rs @@ -392,8 +392,10 @@ impl RecoveryManagementDefault { async fn is_marked_as_interrupted(&self, worker_id: &WorkerId) -> bool { let worker_metadata = self.worker_service().get(worker_id).await; - let worker_status = Ctx::get_assumed_worker_status(self, worker_id, &worker_metadata).await; - worker_status == WorkerStatus::Interrupted + Ctx::compute_latest_worker_status(self, worker_id, &worker_metadata) + .await + .map(|s| s.status == WorkerStatus::Interrupted) + .unwrap_or(false) } } @@ -522,7 +524,7 @@ mod tests { use bytes::Bytes; use golem_common::model::{ AccountId, CallingConvention, InvocationKey, TemplateId, VersionedWorkerId, WorkerId, - WorkerMetadata, WorkerStatus, + WorkerMetadata, WorkerStatus, WorkerStatusRecord, }; use golem_wasm_rpc::wasmtime::ResourceStore; use golem_wasm_rpc::{Uri, Value}; @@ -693,11 +695,11 @@ mod tests { unimplemented!() } - async fn get_assumed_worker_status + Send + Sync>( + async fn compute_latest_worker_status + Send + Sync>( _this: &T, _worker_id: &WorkerId, _metadata: &Option, - ) -> WorkerStatus { + 
) -> Result { unimplemented!() } diff --git a/golem-worker-executor-base/src/worker.rs b/golem-worker-executor-base/src/worker.rs index 8587f30e2..9426022d1 100644 --- a/golem-worker-executor-base/src/worker.rs +++ b/golem-worker-executor-base/src/worker.rs @@ -111,6 +111,7 @@ impl Worker { template_version, }; + let previous_metadata = this.worker_service().get(&worker_id).await; let worker_metadata = WorkerMetadata { worker_id: versioned_worker_id.clone(), args: worker_args.clone(), @@ -119,6 +120,7 @@ impl Worker { last_known_status: calculate_last_known_status( this, &versioned_worker_id.worker_id, + &previous_metadata, ) .await?, }; @@ -656,14 +658,13 @@ where pub async fn calculate_last_known_status( this: &T, worker_id: &WorkerId, + metadata: &Option, ) -> Result where T: HasOplogService + HasWorkerService, { - let last_known = this - .worker_service() - .get(worker_id) - .await + let last_known = metadata + .as_ref() .map(|metadata| metadata.last_known_status.clone()) .unwrap_or_default(); @@ -691,12 +692,44 @@ where } } -fn calculate_latest_worker_status( - _initial: &WorkerStatus, - _entries: &[OplogEntry], -) -> WorkerStatus { - // TODO: this is currently not 100% accurate because we don't have an Interrupted oplog entry, see https://github.com/golemcloud/golem/issues/239 - WorkerStatus::Running +fn calculate_latest_worker_status(initial: &WorkerStatus, entries: &[OplogEntry]) -> WorkerStatus { + let mut result = initial.clone(); + for entry in entries { + match entry { + OplogEntry::ImportedFunctionInvoked { .. } => { + result = WorkerStatus::Running; + } + OplogEntry::ExportedFunctionInvoked { .. } => { + result = WorkerStatus::Running; + } + OplogEntry::ExportedFunctionCompleted { .. } => { + result = WorkerStatus::Idle; + } + OplogEntry::CreatePromise { .. } => { + result = WorkerStatus::Running; + } + OplogEntry::CompletePromise { .. } => { + result = WorkerStatus::Running; + } + OplogEntry::Suspend { .. } => { + result = WorkerStatus::Suspended; + } + OplogEntry::Error { .. } => { + // TODO: currently we cannot compute Failed and Exit statuses + result = WorkerStatus::Retrying; + } + OplogEntry::NoOp { .. } => { + result = WorkerStatus::Running; + } + OplogEntry::Jump { .. } => { + result = WorkerStatus::Running; + } + OplogEntry::Interrupted { .. } => { + result = WorkerStatus::Interrupted; + } + } + } + result } fn calculate_deleted_regions(initial: DeletedRegions, entries: &[OplogEntry]) -> DeletedRegions { diff --git a/golem-worker-executor-base/src/workerctx.rs b/golem-worker-executor-base/src/workerctx.rs index c0d028c13..a05442ea3 100644 --- a/golem-worker-executor-base/src/workerctx.rs +++ b/golem-worker-executor-base/src/workerctx.rs @@ -19,7 +19,7 @@ use async_trait::async_trait; use bytes::Bytes; use golem_common::model::{ AccountId, CallingConvention, InvocationKey, VersionedWorkerId, WorkerId, WorkerMetadata, - WorkerStatus, + WorkerStatus, WorkerStatusRecord, }; use golem_wasm_rpc::wasmtime::ResourceStore; use golem_wasm_rpc::Value; @@ -288,6 +288,7 @@ pub trait ExternalOperations { status: WorkerStatus, ) -> Result<(), GolemError>; + // TODO: move this to WorkerStatusRecord /// Gets how many times the worker has been retried to recover from an error. 
async fn get_worker_retry_count + Send + Sync>( this: &T, @@ -295,11 +296,11 @@ pub trait ExternalOperations { ) -> u64; /// Gets a best-effort current worker status without activating the worker - async fn get_assumed_worker_status + Send + Sync>( + async fn compute_latest_worker_status + Send + Sync>( this: &T, worker_id: &WorkerId, metadata: &Option, - ) -> WorkerStatus; + ) -> Result; /// Prepares a wasmtime instance after it has been created, but before it can be invoked. /// This can be used to restore the previous state of the worker but by general it can be no-op. diff --git a/golem-worker-executor-base/tests/api.rs b/golem-worker-executor-base/tests/api.rs index d740cb253..7999bdc8f 100644 --- a/golem-worker-executor-base/tests/api.rs +++ b/golem-worker-executor-base/tests/api.rs @@ -1,4 +1,4 @@ -use crate::common; +use crate::{common, REDIS}; use std::collections::HashMap; use std::env; use std::io::Write; @@ -10,6 +10,7 @@ use std::time::Duration; use assert2::check; use http_02::{Response, StatusCode}; +use redis::Commands; use golem_api_grpc::proto::golem::worker::{worker_execution_error, LogEvent, TemplateParseFailed}; use golem_api_grpc::proto::golem::workerexecutor::CompletePromiseRequest; @@ -1634,3 +1635,55 @@ async fn counter_resource_test_1() { )])]) ); } + +#[tokio::test] +async fn reconstruct_interrupted_state() { + let context = common::TestContext::new(); + let mut executor = common::start(&context).await.unwrap(); + + let template_id = executor.store_template(Path::new("../test-templates/interruption.wasm")); + let worker_id = executor.start_worker(&template_id, "interruption-1").await; + + let mut executor_clone = executor.async_clone().await; + let worker_id_clone = worker_id.clone(); + let fiber = tokio::spawn(async move { + executor_clone + .invoke_and_await(&worker_id_clone, "run", vec![]) + .await + }); + + tokio::time::sleep(Duration::from_secs(2)).await; + + let _ = executor.interrupt(&worker_id).await; + let result = fiber.await.unwrap(); + + // Explicitly deleting the status information from Redis to check if it can be + // reconstructed from Redis + + let mut redis = REDIS.get_connection(); + let _: () = redis + .del(format!( + "{}instance:status:{}", + context.redis_prefix(), + worker_id.to_redis_key() + )) + .unwrap(); + debug!("Deleted status information from Redis"); + + let status = executor + .get_worker_metadata(&worker_id) + .await + .unwrap() + .last_known_status + .status; + + drop(executor); + + check!(result.is_err()); + check!(result + .err() + .unwrap() + .to_string() + .contains("Interrupted via the Golem API")); + check!(status == WorkerStatus::Interrupted); +} diff --git a/golem-worker-executor-base/tests/common/mod.rs b/golem-worker-executor-base/tests/common/mod.rs index f232fad19..882176c8a 100644 --- a/golem-worker-executor-base/tests/common/mod.rs +++ b/golem-worker-executor-base/tests/common/mod.rs @@ -1084,12 +1084,12 @@ impl ExternalOperations for TestWorkerCtx { DurableWorkerCtx::::get_worker_retry_count(this, worker_id).await } - async fn get_assumed_worker_status + Send + Sync>( + async fn compute_latest_worker_status + Send + Sync>( this: &T, worker_id: &WorkerId, metadata: &Option, - ) -> WorkerStatus { - DurableWorkerCtx::::get_assumed_worker_status(this, worker_id, metadata) + ) -> Result { + DurableWorkerCtx::::compute_latest_worker_status(this, worker_id, metadata) .await } diff --git a/golem-worker-executor-base/tests/lib.rs b/golem-worker-executor-base/tests/lib.rs index 50d44d136..24bffea62 100644 --- 
a/golem-worker-executor-base/tests/lib.rs +++ b/golem-worker-executor-base/tests/lib.rs @@ -33,6 +33,7 @@ struct Redis { impl Redis { pub fn new() -> Self { + let host = "localhost".to_string(); let port = 6379; let child = Command::new("redis-server") .arg("--port") @@ -45,7 +46,7 @@ impl Redis { .expect("Failed to spawn redis server"); let start = Instant::now(); - let mut client = redis::Client::open("redis://localhost:6379").unwrap(); + let mut client = redis::Client::open(format!("redis://{host}:{port}")).unwrap(); loop { let result: RedisResult> = client.keys("*"); if result.is_ok() { @@ -58,7 +59,7 @@ impl Redis { } Self { - host: "localhost".to_string(), + host, port, child: Some(child), valid: AtomicBool::new(true), @@ -77,6 +78,12 @@ impl Redis { let _ = child.kill(); } } + + pub fn get_connection(&self) -> redis::Connection { + self.assert_valid(); + let client = redis::Client::open(format!("redis://{}:{}", self.host, self.port)).unwrap(); + client.get_connection().unwrap() + } } impl Drop for Redis { diff --git a/golem-worker-executor/src/context.rs b/golem-worker-executor/src/context.rs index a8aabbe34..ff57c8409 100644 --- a/golem-worker-executor/src/context.rs +++ b/golem-worker-executor/src/context.rs @@ -20,7 +20,7 @@ use anyhow::Error; use async_trait::async_trait; use golem_common::model::{ AccountId, CallingConvention, InvocationKey, VersionedWorkerId, WorkerId, WorkerMetadata, - WorkerStatus, + WorkerStatus, WorkerStatusRecord, }; use golem_wasm_rpc::wasmtime::ResourceStore; use golem_wasm_rpc::{Uri, Value}; @@ -101,12 +101,12 @@ impl ExternalOperations for Context { DurableWorkerCtx::::get_worker_retry_count(this, worker_id).await } - async fn get_assumed_worker_status + Send + Sync>( + async fn compute_latest_worker_status + Send + Sync>( this: &T, worker_id: &WorkerId, metadata: &Option, - ) -> WorkerStatus { - DurableWorkerCtx::::get_assumed_worker_status(this, worker_id, metadata).await + ) -> Result { + DurableWorkerCtx::::compute_latest_worker_status(this, worker_id, metadata).await } async fn prepare_instance(
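
Note on the new entry: Suspend, Error and the new Interrupted variants are "hint" entries — they only feed the cached worker status and carry no actionable information during replay, which is why the replay loops above match `entry if entry.is_hint() => {}`. The following is a minimal, self-contained sketch of that pattern; the enum and the skip_hints helper are simplified stand-ins for the real OplogEntry::is_hint and consume_hint_entries, not the executor's actual types:

    // Reduced stand-in for golem_common::model::oplog::OplogEntry; only what the sketch needs.
    #[derive(Debug, Clone)]
    enum OplogEntry {
        NoOp,
        Suspend,
        Error,
        Interrupted,
        ExportedFunctionCompleted { response: String },
    }

    impl OplogEntry {
        /// True if the entry is a "hint": it influences the cached worker status but
        /// is skipped during replay.
        fn is_hint(&self) -> bool {
            matches!(
                self,
                OplogEntry::Suspend | OplogEntry::Error | OplogEntry::Interrupted
            )
        }
    }

    /// Replay helper in the spirit of consume_hint_entries: advance the replay cursor
    /// past any hint entries so the next entry read is an actionable one.
    fn skip_hints(entries: &[OplogEntry], mut idx: usize) -> usize {
        while idx < entries.len() && entries[idx].is_hint() {
            idx += 1;
        }
        idx
    }

    fn main() {
        let oplog = vec![
            OplogEntry::NoOp,
            OplogEntry::Suspend,
            OplogEntry::Interrupted,
            OplogEntry::ExportedFunctionCompleted { response: "ok".into() },
        ];
        // Starting after the NoOp, both hints are skipped and replay lands on the
        // ExportedFunctionCompleted entry.
        let next = skip_hints(&oplog, 1);
        println!("next actionable entry: {:?}", oplog[next]);
    }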
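
When is the entry written? Only on the live side of an invocation that ends with an interrupt; real failures still append an Error entry, and Suspend is no longer written after completed invocations. A rough sketch of that branching follows, with illustrative names (entry_for_trap, TrapKind) that are not the executor's real API:

    #[derive(Debug)]
    enum OplogEntry {
        Error,
        Interrupted,
    }

    #[derive(Debug, Clone, Copy)]
    enum TrapKind {
        Failure,
        Interrupt,
        Suspend,
        Jump,
    }

    /// Which entry (if any) the invocation-failure hook appends to the oplog.
    fn entry_for_trap(is_live_after: bool, kind: TrapKind) -> Option<OplogEntry> {
        if !is_live_after {
            // During replay nothing is written; the entries are already in the oplog.
            return None;
        }
        match kind {
            TrapKind::Failure => Some(OplogEntry::Error),
            TrapKind::Interrupt => Some(OplogEntry::Interrupted),
            TrapKind::Suspend | TrapKind::Jump => None,
        }
    }

    fn main() {
        println!("{:?}", entry_for_trap(true, TrapKind::Interrupt));  // Some(Interrupted)
        println!("{:?}", entry_for_trap(false, TrapKind::Interrupt)); // None
    }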
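
How the status is recomputed: calculate_last_known_status starts from the last cached WorkerStatusRecord and folds the oplog entries written since it, which is why the new Interrupted entry makes the Interrupted status reconstructible even when the cached record is lost (the reconstruct_interrupted_state test deletes it from Redis and checks exactly this). A reduced sketch of that fold, using stand-in types rather than golem_common::model and omitting deleted-region tracking:

    #[derive(Debug, Clone, PartialEq)]
    enum WorkerStatus {
        Idle,
        Running,
        Suspended,
        Retrying,
        Interrupted,
    }

    #[derive(Debug)]
    enum OplogEntry {
        ExportedFunctionInvoked,
        ExportedFunctionCompleted,
        Suspend,
        Error,
        Interrupted,
    }

    /// Fold the entries written since the last cached status into the latest status.
    fn calculate_latest_worker_status(initial: &WorkerStatus, entries: &[OplogEntry]) -> WorkerStatus {
        let mut result = initial.clone();
        for entry in entries {
            result = match entry {
                OplogEntry::ExportedFunctionInvoked => WorkerStatus::Running,
                OplogEntry::ExportedFunctionCompleted => WorkerStatus::Idle,
                OplogEntry::Suspend => WorkerStatus::Suspended,
                // A trailing Error maps to Retrying; Failed/Exited cannot be
                // distinguished yet (see the TODO in the patch).
                OplogEntry::Error => WorkerStatus::Retrying,
                OplogEntry::Interrupted => WorkerStatus::Interrupted,
            };
        }
        result
    }

    fn main() {
        // A worker that was invoked and then interrupted ends up Interrupted even if
        // the cached status record was deleted, because the oplog alone determines it.
        let entries = [
            OplogEntry::ExportedFunctionInvoked,
            OplogEntry::Interrupted,
        ];
        assert_eq!(
            calculate_latest_worker_status(&WorkerStatus::Idle, &entries),
            WorkerStatus::Interrupted
        );
    }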