Interrupt oplog entry (#290)
* Interrupt oplog entry

* Do not write Suspend entry after invocations
vigoo authored Mar 22, 2024
1 parent 8475f89 commit 8806ffc
Showing 12 changed files with 193 additions and 103 deletions.
5 changes: 1 addition & 4 deletions golem-cli/tests/cli.rs
@@ -24,10 +24,7 @@ impl CliConfig {
}

pub trait Cli {
fn run<'a, T: DeserializeOwned, S: AsRef<OsStr> + Debug>(
&self,
args: &[S],
) -> Result<T, Failed>;
fn run<T: DeserializeOwned, S: AsRef<OsStr> + Debug>(&self, args: &[S]) -> Result<T, Failed>;
fn run_json<S: AsRef<OsStr> + Debug>(&self, args: &[S]) -> Result<Value, Failed>;
fn run_unit<S: AsRef<OsStr> + Debug>(&self, args: &[S]) -> Result<(), Failed>;
fn run_stdout<S: AsRef<OsStr> + Debug>(&self, args: &[S]) -> Result<Child, Failed>;
29 changes: 13 additions & 16 deletions golem-cli/tests/sharding.rs
@@ -87,24 +87,21 @@ fn start_shard(context: &mut Context) {
let mut rng = thread_rng();
ids.shuffle(&mut rng);

match ids.first() {
Some(id) => {
match WorkerExecutor::start(
context.docker,
*id,
&context.env,
&context.redis.info(),
&context.golem_worker_service.info(),
&context.golem_template_service.info(),
&context.shard_manager.as_ref().unwrap().info(),
) {
Ok(we) => context.worker_executors.worker_executors.push(we),
Err(e) => {
println!("Failed to start worker: {e:?}");
}
if let Some(id) = ids.first() {
match WorkerExecutor::start(
context.docker,
*id,
&context.env,
&context.redis.info(),
&context.golem_worker_service.info(),
&context.golem_template_service.info(),
&context.shard_manager.as_ref().unwrap().info(),
) {
Ok(we) => context.worker_executors.worker_executors.push(we),
Err(e) => {
println!("Failed to start worker: {e:?}");
}
}
None => {}
}
}

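The sharding test change above is a pure refactor: a `match` on an `Option` whose `None` arm does nothing becomes an `if let`. A tiny stand-alone illustration of the idiom (the names here are illustrative, not taken from the test):

```rust
fn start_first(ids: &[u64]) {
    // Before: an explicit match with a no-op None arm.
    match ids.first() {
        Some(id) => println!("starting worker executor for shard {id}"),
        None => {}
    }

    // After: if-let expresses the same thing without the empty arm.
    if let Some(id) = ids.first() {
        println!("starting worker executor for shard {id}");
    }
}

fn main() {
    start_first(&[3, 1, 2]);
    start_first(&[]);
}
```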
16 changes: 11 additions & 5 deletions golem-common/src/model/oplog.rs
@@ -46,11 +46,6 @@ pub enum OplogEntry {
Suspend { timestamp: Timestamp },
/// Worker failed
Error { timestamp: Timestamp },
/// Debug log entry
Debug {
timestamp: Timestamp,
message: String,
},
/// Marker entry added when get-oplog-index is called from the worker, to make the jumping behavior
/// more predictable.
NoOp { timestamp: Timestamp },
@@ -62,6 +57,9 @@ pub enum OplogEntry {
timestamp: Timestamp,
jump: OplogRegion,
},
/// Indicates that the worker has been interrupted at this point.
/// Only used to recompute the worker's (cached) status, has no effect on execution.
Interrupted { timestamp: Timestamp },
}

impl OplogEntry {
@@ -202,6 +200,14 @@ impl OplogEntry {
pub fn nop(timestamp: Timestamp) -> OplogEntry {
OplogEntry::NoOp { timestamp }
}

/// True if the oplog entry is a "hint" that should be skipped during replay
pub fn is_hint(&self) -> bool {
matches!(
self,
OplogEntry::Suspend { .. } | OplogEntry::Error { .. } | OplogEntry::Interrupted { .. }
)
}
}

#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Encode, Decode)]
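The new `is_hint` helper lets every replay loop shown later in this commit treat Suspend, Error and the new Interrupted entries uniformly. A minimal, self-contained sketch of the idea, using simplified stand-in types rather than the real `OplogEntry` (which also carries timestamps and payloads):

```rust
// Simplified stand-ins for the real types in golem-common/src/model/oplog.rs.
#[derive(Debug)]
enum OplogEntry {
    NoOp,
    Suspend,
    Error,
    Interrupted,
    ExportedFunctionInvoked { function_name: String },
}

impl OplogEntry {
    /// Mirrors the new helper: hint entries only describe what happened to the
    /// worker and must be skipped during replay.
    fn is_hint(&self) -> bool {
        matches!(
            self,
            OplogEntry::Suspend | OplogEntry::Error | OplogEntry::Interrupted
        )
    }
}

/// Returns the first non-hint entry at or after `idx`, stepping over hints.
fn next_actionable(oplog: &[OplogEntry], mut idx: usize) -> Option<(usize, &OplogEntry)> {
    while let Some(entry) = oplog.get(idx) {
        if entry.is_hint() {
            idx += 1; // hints carry no replayable action
        } else {
            return Some((idx, entry));
        }
    }
    None
}

fn main() {
    let oplog = vec![
        OplogEntry::Suspend,
        OplogEntry::Interrupted,
        OplogEntry::ExportedFunctionInvoked {
            function_name: "run".to_string(),
        },
    ];
    println!("{:?}", next_actionable(&oplog, 0));

    let trailing = vec![OplogEntry::NoOp, OplogEntry::Error];
    println!("{:?}", next_actionable(&trailing, 1));
}
```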
66 changes: 24 additions & 42 deletions golem-worker-executor-base/src/durable_host/mod.rs
@@ -50,7 +50,7 @@ use golem_common::model::oplog::{OplogEntry, WrappedFunctionType};
use golem_common::model::regions::{DeletedRegions, OplogRegion};
use golem_common::model::{
AccountId, CallingConvention, InvocationKey, PromiseId, Timestamp, VersionedWorkerId, WorkerId,
WorkerMetadata, WorkerStatus,
WorkerMetadata, WorkerStatus, WorkerStatusRecord,
};
use golem_wasm_rpc::wasmtime::ResourceStore;
use golem_wasm_rpc::{Uri, Value};
@@ -738,8 +738,9 @@ impl<Ctx: WorkerCtx> InvocationHooks for DurableWorkerCtx<Ctx> {
let is_interrupt = is_interrupt(error);
let is_suspend = is_suspend(error);
let is_jump = is_jump(error);
let is_failure = !is_interrupt && !is_suspend && !is_jump;

if is_live_after && !is_interrupt && !is_suspend && !is_jump {
if is_live_after && is_failure {
self.set_oplog_entry(OplogEntry::Error {
timestamp: Timestamp::now_utc(),
})
@@ -748,6 +749,16 @@ impl<Ctx: WorkerCtx> InvocationHooks for DurableWorkerCtx<Ctx> {
self.commit_oplog().await;
}

if is_live_after && is_interrupt {
// appending an Interrupted hint to the end of the oplog, which can be used to determine worker status
self.set_oplog_entry(OplogEntry::Interrupted {
timestamp: Timestamp::now_utc(),
})
.await;

self.commit_oplog().await;
}

Ok(())
}

@@ -812,10 +823,6 @@ impl<Ctx: WorkerCtx> InvocationHooks for DurableWorkerCtx<Ctx> {
});

self.set_oplog_entry(oplog_entry).await;
self.set_oplog_entry(OplogEntry::Suspend {
timestamp: Timestamp::now_utc(),
})
.await;
self.commit_oplog().await;
} else {
let response = self.get_oplog_entry_exported_function_completed().await?;
@@ -871,7 +878,8 @@ impl<Ctx: WorkerCtx + DurableWorkerCtxView<Ctx>> ExternalOperations<Ctx> for Dur
worker_id: &WorkerId,
status: WorkerStatus,
) -> Result<(), GolemError> {
let latest_status = calculate_last_known_status(this, worker_id).await?;
let metadata = this.worker_service().get(worker_id).await;
let latest_status = calculate_last_known_status(this, worker_id, &metadata).await?;
this.worker_service()
.update_status(
worker_id,
@@ -890,26 +898,12 @@ impl<Ctx: WorkerCtx + DurableWorkerCtxView<Ctx>> ExternalOperations<Ctx> for Dur
trailing_error_count(this, worker_id).await
}

async fn get_assumed_worker_status<T: HasAll<Ctx> + Send + Sync>(
async fn compute_latest_worker_status<T: HasAll<Ctx> + Send + Sync>(
this: &T,
worker_id: &WorkerId,
metadata: &Option<WorkerMetadata>,
) -> WorkerStatus {
let last_oplog_idx = last_oplog_idx(this, worker_id).await;
let worker_status = match metadata {
Some(metadata) => {
if metadata.last_known_status.oplog_idx == last_oplog_idx {
debug!("get_assumed_instance_status for {worker_id}: stored last oplog idx matches current one, using stored status");
metadata.last_known_status.status.clone()
} else {
debug!("get_assumed_instance_status for {worker_id}: stored last oplog idx ({}) does not match current one ({last_oplog_idx}), using stored status", metadata.last_known_status.oplog_idx);
WorkerStatus::Running
}
}
None => WorkerStatus::Idle,
};
debug!("get_assumed_instance_status for {worker_id}: assuming status {worker_status:?}");
worker_status
) -> Result<WorkerStatusRecord, GolemError> {
calculate_last_known_status(this, worker_id, metadata).await
}

async fn prepare_instance(
@@ -1024,10 +1018,6 @@ impl<Ctx: WorkerCtx + DurableWorkerCtxView<Ctx>> ExternalOperations<Ctx> for Dur
}
}

async fn last_oplog_idx<T: HasOplogService>(this: &T, worker_id: &WorkerId) -> u64 {
this.oplog_service().get_size(worker_id).await
}

async fn trailing_error_count<T: HasOplogService>(this: &T, worker_id: &WorkerId) -> u64 {
let mut idx = this.oplog_service().get_size(worker_id).await;
let mut count = 0;
@@ -1154,9 +1144,7 @@ impl<Ctx: WorkerCtx> PrivateDurableWorkerState<Ctx> {
OplogEntry::NoOp { .. } => {
break Ok(());
}
OplogEntry::Suspend { .. } => (),
OplogEntry::Error { .. } => (),
OplogEntry::Debug { message, .. } => debug!("Debug: {}", message),
entry if entry.is_hint() => {}
_ => {
break Err(GolemError::unexpected_oplog_entry(
"NoOp",
@@ -1185,9 +1173,7 @@ impl<Ctx: WorkerCtx> PrivateDurableWorkerState<Ctx> {
})
.unwrap());
}
OplogEntry::Suspend { .. } => (),
OplogEntry::Error { .. } => (),
OplogEntry::Debug { message, .. } => debug!("Debug: {}", message),
entry if entry.is_hint() => {}
_ => {
break Err(GolemError::unexpected_oplog_entry(
"ImportedFunctionInvoked",
@@ -1237,9 +1223,7 @@ impl<Ctx: WorkerCtx> PrivateDurableWorkerState<Ctx> {
calling_convention.clone(),
)));
}
OplogEntry::Suspend { .. } => (),
OplogEntry::Error { .. } => (),
OplogEntry::Debug { message, .. } => debug!("Debug: {}", message),
entry if entry.is_hint() => {}
_ => {
break Err(GolemError::unexpected_oplog_entry(
"ExportedFunctionInvoked",
@@ -1274,8 +1258,7 @@ impl<Ctx: WorkerCtx> PrivateDurableWorkerState<Ctx> {
.collect();
break Ok(Some(response));
}
OplogEntry::Suspend { .. } => (),
OplogEntry::Debug { message, .. } => debug!("Debug: {}", message),
entry if entry.is_hint() => {}
_ => {
break Err(GolemError::unexpected_oplog_entry(
"ExportedFunctionCompleted",
@@ -1289,16 +1272,15 @@ impl<Ctx: WorkerCtx> PrivateDurableWorkerState<Ctx> {
}
}

/// Consumes Suspend and Error entries which are hints for the server to decide whether to
/// Consumes Suspend, Error and Interrupt entries which are hints for the server to decide whether to
/// keep workers in memory or allow them to rerun etc., but contain no actionable information
/// during replay
async fn consume_hint_entries(&mut self) {
loop {
if self.is_replay() {
let oplog_entry = self.get_oplog_entry().await;
match oplog_entry {
OplogEntry::Suspend { .. } => (),
OplogEntry::Error { .. } => (),
entry if entry.is_hint() => {}
_ => {
self.oplog_idx -= 1;
break;
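The refreshed status logic in this file delegates to `calculate_last_known_status`, which is not part of this diff. As a rough, assumed mental model only — not the executor's real implementation, which is fallible and also accounts for retries and deleted regions — the cached status record can be brought up to date by folding the oplog entries written since the last recorded index, with the new Interrupted hint mapping to `WorkerStatus::Interrupted`:

```rust
// Illustrative stand-ins only; the real types and logic live in the worker executor.
#[derive(Debug, Clone, PartialEq)]
enum WorkerStatus {
    Running,
    Suspended,
    Interrupted,
    Failed,
}

#[derive(Debug)]
enum OplogEntry {
    Suspend,
    Error,
    Interrupted,
    ExportedFunctionCompleted,
}

#[derive(Debug)]
struct WorkerStatusRecord {
    status: WorkerStatus,
    oplog_idx: u64,
}

/// Hypothetical sketch: start from the cached record and scan only the entries
/// written after it; the last status-bearing entry wins.
fn recompute_status(cached: &WorkerStatusRecord, oplog: &[OplogEntry]) -> WorkerStatusRecord {
    let mut status = cached.status.clone();
    for entry in &oplog[cached.oplog_idx as usize..] {
        status = match entry {
            OplogEntry::Interrupted => WorkerStatus::Interrupted,
            OplogEntry::Suspend => WorkerStatus::Suspended,
            OplogEntry::Error => WorkerStatus::Failed,
            _ => WorkerStatus::Running, // actionable entries mean the worker was running
        };
    }
    WorkerStatusRecord {
        status,
        oplog_idx: oplog.len() as u64,
    }
}

fn main() {
    let cached = WorkerStatusRecord {
        status: WorkerStatus::Running,
        oplog_idx: 1,
    };
    let oplog = vec![
        OplogEntry::ExportedFunctionCompleted,
        OplogEntry::Suspend,
        OplogEntry::Error,
        OplogEntry::Interrupted, // written by the new on-invocation-failure path
    ];
    println!("{:?}", recompute_status(&cached, &oplog));
}
```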
28 changes: 20 additions & 8 deletions golem-worker-executor-base/src/grpc.rs
@@ -172,7 +172,9 @@ impl<Ctx: WorkerCtx, Svcs: HasAll<Ctx> + UsesAllDeps<Ctx = Ctx> + Send + Sync +
worker_id: &common_model::WorkerId,
metadata: &Option<WorkerMetadata>,
) -> Result<WorkerStatus, GolemError> {
let worker_status = Ctx::get_assumed_worker_status(self, worker_id, metadata).await;
let worker_status = Ctx::compute_latest_worker_status(self, worker_id, metadata)
.await?
.status;

match worker_status {
WorkerStatus::Failed => Err(GolemError::PreviousInvocationFailed),
@@ -270,7 +272,9 @@ impl<Ctx: WorkerCtx, Svcs: HasAll<Ctx> + UsesAllDeps<Ctx = Ctx> + Send + Sync +
.ok_or(GolemError::worker_not_found(worker_id.clone()))?;

let worker_status =
Ctx::get_assumed_worker_status(self, &worker_id, &Some(metadata.clone())).await;
Ctx::compute_latest_worker_status(self, &worker_id, &Some(metadata.clone()))
.await?
.status;
let should_activate = match worker_status {
WorkerStatus::Interrupted
| WorkerStatus::Running
@@ -307,8 +311,9 @@ impl<Ctx: WorkerCtx, Svcs: HasAll<Ctx> + UsesAllDeps<Ctx = Ctx> + Send + Sync +
self.validate_worker_id(&worker_id)?;

let metadata = self.worker_service().get(&worker_id).await;

let worker_status = Ctx::get_assumed_worker_status(self, &worker_id, &metadata).await;
let worker_status = Ctx::compute_latest_worker_status(self, &worker_id, &metadata)
.await?
.status;
let metadata = metadata.ok_or(GolemError::invalid_request("Worker not found"))?;

let should_interrupt = match worker_status {
@@ -374,7 +379,9 @@ impl<Ctx: WorkerCtx, Svcs: HasAll<Ctx> + UsesAllDeps<Ctx = Ctx> + Send + Sync +
let worker_id = common_model::WorkerId::from_proto(worker_id);

let metadata = self.worker_service().get(&worker_id).await;
let worker_status = Ctx::get_assumed_worker_status(self, &worker_id, &metadata).await;
let worker_status = Ctx::compute_latest_worker_status(self, &worker_id, &metadata)
.await?
.status;

if metadata.is_none() {
// Worker does not exist, we still check if it is in the list active workers due to some inconsistency
@@ -452,7 +459,9 @@ impl<Ctx: WorkerCtx, Svcs: HasAll<Ctx> + UsesAllDeps<Ctx = Ctx> + Send + Sync +
let metadata = self.worker_service().get(&worker_id).await;
self.validate_worker_status(&worker_id, &metadata).await?;

let worker_status = Ctx::get_assumed_worker_status(self, &worker_id, &metadata).await;
let worker_status = Ctx::compute_latest_worker_status(self, &worker_id, &metadata)
.await?
.status;

match worker_status {
WorkerStatus::Suspended | WorkerStatus::Interrupted => {
@@ -624,6 +633,10 @@ impl<Ctx: WorkerCtx, Svcs: HasAll<Ctx> + UsesAllDeps<Ctx = Ctx> + Send + Sync +
.await
.ok_or(GolemError::worker_not_found(worker_id.clone()))?;

let latest_status =
Ctx::compute_latest_worker_status(self, &worker_id, &Some(metadata.clone()))
.await?
.status;
let retry_count = Ctx::get_worker_retry_count(self, &worker_id).await as i32;

Ok(golem::worker::WorkerMetadata {
@@ -632,8 +645,7 @@ impl<Ctx: WorkerCtx, Svcs: HasAll<Ctx> + UsesAllDeps<Ctx = Ctx> + Send + Sync +
env: HashMap::from_iter(metadata.env.iter().cloned()),
account_id: Some(metadata.account_id.into()),
template_version: metadata.worker_id.template_version,
status: Into::<golem::worker::WorkerStatus>::into(metadata.last_known_status.status)
.into(),
status: Into::<golem::worker::WorkerStatus>::into(latest_status).into(),
retry_count,
})
}
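Every gRPC handler above follows the same pattern: the old infallible `get_assumed_worker_status` becomes `compute_latest_worker_status`, whose error is propagated with `?` and whose `.status` field drives the handler's decision. A condensed, hypothetical sketch of that calling pattern with stand-in types (the real functions take the context, worker id and metadata; the `futures` crate is assumed only for the tiny `block_on` driver):

```rust
use futures::executor::block_on; // any async runtime works; used here only to run the sketch

#[derive(Debug, Clone, PartialEq)]
enum WorkerStatus {
    Running,
    Failed,
}

#[derive(Debug)]
struct WorkerStatusRecord {
    status: WorkerStatus,
}

#[derive(Debug)]
struct GolemError(String);

// Stand-in for Ctx::compute_latest_worker_status: unlike the old
// get_assumed_worker_status it can fail (e.g. if the oplog cannot be read)
// instead of silently guessing a status.
async fn compute_latest_worker_status(worker_id: &str) -> Result<WorkerStatusRecord, GolemError> {
    match worker_id {
        "" => Err(GolemError("invalid worker id".to_string())),
        "worker-broken" => Ok(WorkerStatusRecord {
            status: WorkerStatus::Failed,
        }),
        _ => Ok(WorkerStatusRecord {
            status: WorkerStatus::Running,
        }),
    }
}

// Handler-style caller: `?` propagates the failure, `.status` drives the decision.
async fn validate_worker_status(worker_id: &str) -> Result<WorkerStatus, GolemError> {
    let status = compute_latest_worker_status(worker_id).await?.status;
    match status {
        WorkerStatus::Failed => Err(GolemError("previous invocation failed".to_string())),
        other => Ok(other),
    }
}

fn main() {
    println!("{:?}", block_on(validate_worker_status("worker-1")));
    println!("{:?}", block_on(validate_worker_status("worker-broken")));
    println!("{:?}", block_on(validate_worker_status("")));
}
```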
12 changes: 7 additions & 5 deletions golem-worker-executor-base/src/services/recovery.rs
@@ -392,8 +392,10 @@ impl<Ctx: WorkerCtx> RecoveryManagementDefault<Ctx> {

async fn is_marked_as_interrupted(&self, worker_id: &WorkerId) -> bool {
let worker_metadata = self.worker_service().get(worker_id).await;
let worker_status = Ctx::get_assumed_worker_status(self, worker_id, &worker_metadata).await;
worker_status == WorkerStatus::Interrupted
Ctx::compute_latest_worker_status(self, worker_id, &worker_metadata)
.await
.map(|s| s.status == WorkerStatus::Interrupted)
.unwrap_or(false)
}
}

@@ -522,7 +524,7 @@ mod tests {
use bytes::Bytes;
use golem_common::model::{
AccountId, CallingConvention, InvocationKey, TemplateId, VersionedWorkerId, WorkerId,
WorkerMetadata, WorkerStatus,
WorkerMetadata, WorkerStatus, WorkerStatusRecord,
};
use golem_wasm_rpc::wasmtime::ResourceStore;
use golem_wasm_rpc::{Uri, Value};
@@ -693,11 +695,11 @@ mod tests {
unimplemented!()
}

async fn get_assumed_worker_status<T: HasAll<Self> + Send + Sync>(
async fn compute_latest_worker_status<T: HasAll<Self> + Send + Sync>(
_this: &T,
_worker_id: &WorkerId,
_metadata: &Option<WorkerMetadata>,
) -> WorkerStatus {
) -> Result<WorkerStatusRecord, GolemError> {
unimplemented!()
}

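The recovery path handles the new fallible API differently from the gRPC handlers: a failed status computation is treated as "not interrupted" via `map(..).unwrap_or(false)` rather than being propagated. A stand-alone sketch of that pattern (illustrative types and error strings only, not the recovery manager's real code):

```rust
#[derive(Debug, PartialEq)]
enum WorkerStatus {
    Running,
    Interrupted,
}

struct WorkerStatusRecord {
    status: WorkerStatus,
}

// Stand-in for the fallible status computation used by the recovery manager.
fn compute_latest_worker_status(worker_id: &str) -> Result<WorkerStatusRecord, String> {
    match worker_id {
        "missing" => Err("oplog unavailable".to_string()),
        "interrupted" => Ok(WorkerStatusRecord {
            status: WorkerStatus::Interrupted,
        }),
        _ => Ok(WorkerStatusRecord {
            status: WorkerStatus::Running,
        }),
    }
}

// Mirrors the recovery manager's choice: an error means "not known to be
// interrupted", so recovery is not blocked by a failed status lookup.
fn is_marked_as_interrupted(worker_id: &str) -> bool {
    compute_latest_worker_status(worker_id)
        .map(|record| record.status == WorkerStatus::Interrupted)
        .unwrap_or(false)
}

fn main() {
    assert!(is_marked_as_interrupted("interrupted"));
    assert!(!is_marked_as_interrupted("running"));
    assert!(!is_marked_as_interrupted("missing"));
    println!("ok");
}
```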