Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implementation of atomic operation regions #328

Merged
merged 10 commits into from
Mar 29, 2024
10 changes: 10 additions & 0 deletions golem-common/src/model/oplog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,16 @@ pub enum OplogEntry {
timestamp: Timestamp,
new_policy: RetryConfig,
},
/// Begins an atomic region. All oplog entries after `BeginAtomicRegion` are to be ignored during
/// recovery except if there is a corresponding `EndAtomicRegion` entry.
BeginAtomicRegion { timestamp: Timestamp },
/// Ends an atomic region. All oplog entries between the corresponding `BeginAtomicRegion` and this
/// entry are to be considered during recovery, and the begin/end markers can be removed during oplog
/// compaction.
EndAtomicRegion {
timestamp: Timestamp,
begin_index: u64,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

},
}

impl OplogEntry {
Expand Down
32 changes: 16 additions & 16 deletions golem-worker-executor-base/src/durable_host/blobstore/container.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ impl<Ctx: WorkerCtx> HostContainer for DurableWorkerCtx<Ctx> {
end: u64,
) -> anyhow::Result<Result<Resource<IncomingValue>, Error>> {
record_host_function_call("blobstore::container::container", "get_data");
let account_id = self.private_state.account_id.clone();
let account_id = self.state.account_id.clone();
let container_name = self
.as_wasi_view()
.table()
Expand All @@ -79,7 +79,7 @@ impl<Ctx: WorkerCtx> HostContainer for DurableWorkerCtx<Ctx> {
WrappedFunctionType::ReadRemote,
"golem blobstore::container::get_data",
|ctx| {
ctx.private_state.blob_store_service.get_data(
ctx.state.blob_store_service.get_data(
account_id.clone(),
container_name.clone(),
name.clone(),
Expand Down Expand Up @@ -108,7 +108,7 @@ impl<Ctx: WorkerCtx> HostContainer for DurableWorkerCtx<Ctx> {
data: Resource<OutgoingValue>,
) -> anyhow::Result<Result<(), Error>> {
record_host_function_call("blobstore::container::container", "write_data");
let account_id = self.private_state.account_id.clone();
let account_id = self.state.account_id.clone();
let container_name = self
.as_wasi_view()
.table()
Expand All @@ -124,7 +124,7 @@ impl<Ctx: WorkerCtx> HostContainer for DurableWorkerCtx<Ctx> {
WrappedFunctionType::WriteRemote,
"golem blobstore::container::write_data",
|ctx| {
ctx.private_state.blob_store_service.write_data(
ctx.state.blob_store_service.write_data(
account_id.clone(),
container_name.clone(),
name.clone(),
Expand All @@ -144,7 +144,7 @@ impl<Ctx: WorkerCtx> HostContainer for DurableWorkerCtx<Ctx> {
container: Resource<Container>,
) -> anyhow::Result<Result<Resource<StreamObjectNames>, Error>> {
record_host_function_call("blobstore::container::container", "list_objects");
let account_id = self.private_state.account_id.clone();
let account_id = self.state.account_id.clone();
let container_name = self
.as_wasi_view()
.table()
Expand All @@ -155,7 +155,7 @@ impl<Ctx: WorkerCtx> HostContainer for DurableWorkerCtx<Ctx> {
WrappedFunctionType::ReadRemote,
"golem blobstore::container::list_objects",
|ctx| {
ctx.private_state
ctx.state
.blob_store_service
.list_objects(account_id.clone(), container_name.clone())
},
Expand All @@ -179,7 +179,7 @@ impl<Ctx: WorkerCtx> HostContainer for DurableWorkerCtx<Ctx> {
name: ObjectName,
) -> anyhow::Result<Result<(), Error>> {
record_host_function_call("blobstore::container::container", "delete_object");
let account_id = self.private_state.account_id.clone();
let account_id = self.state.account_id.clone();
let container_name = self
.as_wasi_view()
.table()
Expand All @@ -190,7 +190,7 @@ impl<Ctx: WorkerCtx> HostContainer for DurableWorkerCtx<Ctx> {
WrappedFunctionType::WriteRemote,
"golem blobstore::container::delete_object",
|ctx| {
ctx.private_state.blob_store_service.delete_object(
ctx.state.blob_store_service.delete_object(
account_id.clone(),
container_name.clone(),
name.clone(),
Expand All @@ -210,7 +210,7 @@ impl<Ctx: WorkerCtx> HostContainer for DurableWorkerCtx<Ctx> {
names: Vec<ObjectName>,
) -> anyhow::Result<Result<(), Error>> {
record_host_function_call("blobstore::container::container", "delete_objects");
let account_id = self.private_state.account_id.clone();
let account_id = self.state.account_id.clone();
let container_name = self
.as_wasi_view()
.table()
Expand All @@ -221,7 +221,7 @@ impl<Ctx: WorkerCtx> HostContainer for DurableWorkerCtx<Ctx> {
WrappedFunctionType::WriteRemote,
"golem blobstore::container::delete_objects",
|ctx| {
ctx.private_state.blob_store_service.delete_objects(
ctx.state.blob_store_service.delete_objects(
account_id.clone(),
container_name.clone(),
names.clone(),
Expand All @@ -241,7 +241,7 @@ impl<Ctx: WorkerCtx> HostContainer for DurableWorkerCtx<Ctx> {
name: ObjectName,
) -> anyhow::Result<Result<bool, Error>> {
record_host_function_call("blobstore::container::container", "has_object");
let account_id = self.private_state.account_id.clone();
let account_id = self.state.account_id.clone();
let container_name = self
.as_wasi_view()
.table()
Expand All @@ -252,7 +252,7 @@ impl<Ctx: WorkerCtx> HostContainer for DurableWorkerCtx<Ctx> {
WrappedFunctionType::ReadRemote,
"golem blobstore::container::has_object",
|ctx| {
ctx.private_state.blob_store_service.has_object(
ctx.state.blob_store_service.has_object(
account_id.clone(),
container_name.clone(),
name.clone(),
Expand All @@ -272,7 +272,7 @@ impl<Ctx: WorkerCtx> HostContainer for DurableWorkerCtx<Ctx> {
name: ObjectName,
) -> anyhow::Result<Result<ObjectMetadata, Error>> {
record_host_function_call("blobstore::container::container", "object_info");
let account_id = self.private_state.account_id.clone();
let account_id = self.state.account_id.clone();
let container_name = self
.as_wasi_view()
.table()
Expand All @@ -287,7 +287,7 @@ impl<Ctx: WorkerCtx> HostContainer for DurableWorkerCtx<Ctx> {
WrappedFunctionType::ReadRemote,
"golem blobstore::container::object_info",
|ctx| {
ctx.private_state.blob_store_service.object_info(
ctx.state.blob_store_service.object_info(
account_id.clone(),
container_name.clone(),
name.clone(),
Expand All @@ -311,7 +311,7 @@ impl<Ctx: WorkerCtx> HostContainer for DurableWorkerCtx<Ctx> {

async fn clear(&mut self, container: Resource<Container>) -> anyhow::Result<Result<(), Error>> {
record_host_function_call("blobstore::container::container", "clear");
let account_id = self.private_state.account_id.clone();
let account_id = self.state.account_id.clone();
let container_name = self
.as_wasi_view()
.table()
Expand All @@ -322,7 +322,7 @@ impl<Ctx: WorkerCtx> HostContainer for DurableWorkerCtx<Ctx> {
WrappedFunctionType::WriteRemote,
"golem blobstore::container::clear",
|ctx| {
ctx.private_state
ctx.state
.blob_store_service
.clear(account_id.clone(), container_name.clone())
},
Expand Down
24 changes: 12 additions & 12 deletions golem-worker-executor-base/src/durable_host/blobstore/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,13 @@ impl<Ctx: WorkerCtx> Host for DurableWorkerCtx<Ctx> {
name: ContainerName,
) -> anyhow::Result<Result<Resource<Container>, Error>> {
record_host_function_call("blobstore::blobstore", "create_container");
let account_id = self.private_state.account_id.clone();
let account_id = self.state.account_id.clone();
let result = Durability::<Ctx, u64, SerializableError>::wrap(
self,
WrappedFunctionType::WriteRemote,
"golem blobstore::blobstore::create_container",
|ctx| {
ctx.private_state
ctx.state
.blob_store_service
.create_container(account_id.clone(), name.clone())
},
Expand All @@ -65,13 +65,13 @@ impl<Ctx: WorkerCtx> Host for DurableWorkerCtx<Ctx> {
name: ContainerName,
) -> anyhow::Result<Result<Resource<Container>, Error>> {
record_host_function_call("blobstore::blobstore", "get_container");
let account_id = self.private_state.account_id.clone();
let account_id = self.state.account_id.clone();
let result = Durability::<Ctx, Option<u64>, SerializableError>::wrap(
self,
WrappedFunctionType::ReadRemote,
"golem blobstore::blobstore::get_container",
|ctx| {
ctx.private_state
ctx.state
.blob_store_service
.get_container(account_id.clone(), name.clone())
},
Expand All @@ -92,13 +92,13 @@ impl<Ctx: WorkerCtx> Host for DurableWorkerCtx<Ctx> {

async fn delete_container(&mut self, name: ContainerName) -> anyhow::Result<Result<(), Error>> {
record_host_function_call("blobstore::blobstore", "delete_container");
let account_id = self.private_state.account_id.clone();
let account_id = self.state.account_id.clone();
let result = Durability::<Ctx, (), SerializableError>::wrap(
self,
WrappedFunctionType::WriteRemote,
"golem blobstore::blobstore::delete_container",
|ctx| {
ctx.private_state
ctx.state
.blob_store_service
.delete_container(account_id.clone(), name.clone())
},
Expand All @@ -115,13 +115,13 @@ impl<Ctx: WorkerCtx> Host for DurableWorkerCtx<Ctx> {
name: ContainerName,
) -> anyhow::Result<Result<bool, Error>> {
record_host_function_call("blobstore::blobstore", "container_exists");
let account_id = self.private_state.account_id.clone();
let account_id = self.state.account_id.clone();
let result = Durability::<Ctx, bool, SerializableError>::wrap(
self,
WrappedFunctionType::ReadRemote,
"golem blobstore::blobstore::container_exists",
|ctx| {
ctx.private_state
ctx.state
.blob_store_service
.container_exists(account_id.clone(), name.clone())
},
Expand All @@ -139,13 +139,13 @@ impl<Ctx: WorkerCtx> Host for DurableWorkerCtx<Ctx> {
dest: ObjectId,
) -> anyhow::Result<Result<(), Error>> {
record_host_function_call("blobstore::blobstore", "copy_object");
let account_id = self.private_state.account_id.clone();
let account_id = self.state.account_id.clone();
let result = Durability::<Ctx, (), SerializableError>::wrap(
self,
WrappedFunctionType::WriteRemote,
"golem blobstore::blobstore::copy_object",
|ctx| {
ctx.private_state.blob_store_service.copy_object(
ctx.state.blob_store_service.copy_object(
account_id.clone(),
src.container.clone(),
src.object.clone(),
Expand All @@ -167,13 +167,13 @@ impl<Ctx: WorkerCtx> Host for DurableWorkerCtx<Ctx> {
dest: ObjectId,
) -> anyhow::Result<Result<(), Error>> {
record_host_function_call("blobstore::blobstore", "move_object");
let account_id = self.private_state.account_id.clone();
let account_id = self.state.account_id.clone();
let result = Durability::<Ctx, (), SerializableError>::wrap(
self,
WrappedFunctionType::WriteRemote,
"golem blobstore::blobstore::move_object",
|ctx| {
ctx.private_state.blob_store_service.move_object(
ctx.state.blob_store_service.move_object(
account_id.clone(),
src.container.clone(),
src.object.clone(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ impl<Ctx: WorkerCtx> Host for DurableWorkerCtx<Ctx> {
async fn subscribe_duration(&mut self, when: Duration) -> anyhow::Result<Resource<Pollable>> {
record_host_function_call("clocks::monotonic_clock", "subscribe_duration");
let now = self.now().await?;
self.commit_oplog().await;
self.state.commit_oplog().await;
let when = now.saturating_add(when);
Host::subscribe_instant(&mut self.as_wasi_view(), when).await
}
Expand Down
Loading
Loading