Skip to content

Commit

Permalink
Oplog enumeration (#985)
Browse files Browse the repository at this point in the history
* Introducing the new Golem Host API version

* Forwarding 1.1 implementation to 0.2

* Using updated golem-wasm-ast and wasm-rpc

* Code cleanup and reduced redundancy

* Moved ValueAndType to wasm-rpc

* WIP

* Progress

* Progress

* Backward compatibility fix

* Backward compatibility fix

* Host implementation

* Serialization fix

* WIT representation

* gRPC and OpenAPI support

* gRPC and REST APIs

* Finished implementation

* Final dependencies

* Tests and fixes
  • Loading branch information
vigoo authored Oct 9, 2024
1 parent 666f93c commit 3d09b24
Show file tree
Hide file tree
Showing 122 changed files with 10,043 additions and 2,765 deletions.
1,855 changes: 949 additions & 906 deletions Cargo.lock

Large diffs are not rendered by default.

7 changes: 4 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -93,10 +93,11 @@ futures = "0.3"
futures-core = "0.3.29"
futures-util = "0.3.29"
git-version = "0.3.9"
golem-wasm-ast = "1.0.0"
golem-wasm-rpc = { version = "1.0.3", default-features = false, features = [
golem-wasm-ast = "1.0.1"
golem-wasm-rpc = { version = "1.0.5", default-features = false, features = [
"host",
] }

http = "1.0.0" # keep in sync with wasmtime
http_02 = { package = "http", version = "0.2.11" }
humansize = "2.1.3"
Expand Down Expand Up @@ -177,7 +178,7 @@ tokio-rustls = { version = "0.26.0" }
tokio-stream = { version = "0.1", features = ["sync"] }
tokio-util = "0.7.10"
toml = "0.8.14"
tonic = "0.11.0"
tonic = { version = "0.11.0", features = ["gzip"] }
tonic-reflection = "0.11.0"
tonic-health = "0.11.0"
tracing = { version = "0.1.40", features = ["log"] }
Expand Down
2 changes: 2 additions & 0 deletions golem-api-grpc/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
"proto/golem/worker/invoke_result.proto",
"proto/golem/worker/log_event.proto",
"proto/golem/worker/promise_id.proto",
"proto/golem/worker/public_oplog.proto",
"proto/golem/worker/update_mode.proto",
"proto/golem/worker/worker_id.proto",
"proto/golem/worker/worker_metadata.proto",
"proto/golem/worker/worker_filter.proto",
Expand Down
8 changes: 8 additions & 0 deletions golem-api-grpc/proto/golem/worker/oplog_cursor.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
syntax = "proto3";

package golem.worker;

message OplogCursor {
uint64 next_oplog_index = 1;
uint64 current_component_version = 2;
}
211 changes: 211 additions & 0 deletions golem-api-grpc/proto/golem/worker/public_oplog.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,211 @@
syntax = "proto3";

import "golem/common/account_id.proto";
import "golem/common/empty.proto";
import "golem/worker/idempotency_key.proto";
import "golem/worker/worker_id.proto";
import "google/protobuf/timestamp.proto";
import "wasm/rpc/type_annotated_value.proto";

package golem.worker;

message OplogEntry {
oneof entry {
CreateParameters Create = 1;
ImportedFunctionInvokedParameters ImportedFunctionInvoked = 2;
ExportedFunctionInvokedParameters ExportedFunctionInvoked = 3;
ExportedFunctionCompletedParameters ExportedFunctionCompleted = 4;
TimestampParameter Suspend = 5;
ErrorParameters Error = 6;
TimestampParameter NoOp = 7;
JumpParameters Jump = 8;
TimestampParameter Interrupted = 9;
TimestampParameter Exited = 10;
ChangeRetryPolicyParameters ChangeRetryPolicy = 11;
TimestampParameter BeginAtomicRegion = 12;
EndAtomicRegionParameters EndAtomicRegion = 13;
TimestampParameter BeginRemoteWrite = 14;
EndRemoteWriteParameters EndRemoteWrite = 15;
PendingWorkerInvocationParameters PendingWorkerInvocation = 16;
PendingUpdateParameters PendingUpdate = 17;
SuccessfulUpdateParameters SuccessfulUpdate = 18;
FailedUpdateParameters FailedUpdate = 19;
GrowMemoryParameters GrowMemory = 20;
CreateResourceParameters CreateResource = 21;
DropResourceParameters DropResource = 22;
DescribeResourceParameters DescribeResource = 23;
LogParameters Log = 24;
TimestampParameter Restart = 25;
}
}

message WrappedFunctionType {
enum Type {
READ_LOCAL = 0;
WRITE_LOCAL = 1;
READ_REMOTE = 2;
WRITE_REMOTE = 3;
WRITE_REMOTE_BATCHED = 4;
}
Type type = 1;
optional uint64 oplog_index = 2;
}

message CreateParameters {
google.protobuf.Timestamp timestamp = 1;
WorkerId worker_id = 2;
uint64 component_version = 3;
repeated string args = 4;
map<string, string> env = 5;
golem.common.AccountId account_id = 6;
optional WorkerId parent = 7;
uint64 component_size = 8;
uint64 initial_total_linear_memory_size = 9;
}

message ImportedFunctionInvokedParameters {
google.protobuf.Timestamp timestamp = 1;
string function_name = 2;
wasm.rpc.TypeAnnotatedValue request = 3;
wasm.rpc.TypeAnnotatedValue response = 4;
WrappedFunctionType wrapped_function_type = 5;
}

message ExportedFunctionInvokedParameters {
google.protobuf.Timestamp timestamp = 1;
string function_name = 2;
repeated wasm.rpc.TypeAnnotatedValue request = 3;
IdempotencyKey idempotency_key = 4;
}

message ExportedFunctionCompletedParameters {
google.protobuf.Timestamp timestamp = 1;
wasm.rpc.TypeAnnotatedValue response = 2;
int64 consumed_fuel = 3;
}

message ErrorParameters {
google.protobuf.Timestamp timestamp = 1;
string error = 2;
}

message JumpParameters {
google.protobuf.Timestamp timestamp = 1;
uint64 start = 2;
uint64 end = 3;
}

message ChangeRetryPolicyParameters {
google.protobuf.Timestamp timestamp = 1;
RetryPolicy retry_policy = 2;
}

message RetryPolicy {
uint32 max_attempts = 1;
uint64 min_delay = 2;
uint64 max_delay = 3;
double multiplier = 4;
optional double max_jitter_factor = 5;
}

message EndAtomicRegionParameters {
google.protobuf.Timestamp timestamp = 1;
uint64 begin_index = 2;
}

message EndRemoteWriteParameters {
google.protobuf.Timestamp timestamp = 1;
uint64 begin_index = 2;
}

message ExportedFunctionInvocationParameters {
IdempotencyKey idempotency_key = 1;
string function_name = 2;
repeated wasm.rpc.TypeAnnotatedValue input = 3;
bool valid_input = 4;
}

message WorkerInvocation {
oneof invocation {
ExportedFunctionInvocationParameters exported_function = 1;
uint64 manual_update = 2;
}
}

message PendingWorkerInvocationParameters {
google.protobuf.Timestamp timestamp = 1;
WorkerInvocation invocation = 2;
}

message SnapshotBasedUpdateParameters {
bytes payload = 1;
}

message UpdateDescription {
oneof description {
golem.common.Empty auto_update = 1;
SnapshotBasedUpdateParameters snapshot_based = 2;
}
}

message PendingUpdateParameters {
google.protobuf.Timestamp timestamp = 1;
uint64 target_version = 2;
UpdateDescription update_description = 3;
}

message SuccessfulUpdateParameters {
google.protobuf.Timestamp timestamp = 1;
uint64 target_version = 2;
uint64 new_component_size = 3;
}

message FailedUpdateParameters {
google.protobuf.Timestamp timestamp = 1;
uint64 target_version = 2;
optional string details = 3;
}

message GrowMemoryParameters {
google.protobuf.Timestamp timestamp = 1;
uint64 delta = 2;
}

message CreateResourceParameters {
google.protobuf.Timestamp timestamp = 1;
uint64 resource_id = 2;
}

message DropResourceParameters {
google.protobuf.Timestamp timestamp = 1;
uint64 resource_id = 2;
}

message DescribeResourceParameters {
google.protobuf.Timestamp timestamp = 1;
uint64 resource_id = 2;
string resource_name = 3;
repeated wasm.rpc.TypeAnnotatedValue resource_params = 4;
}

message TimestampParameter {
google.protobuf.Timestamp timestamp = 1;
}

enum OplogLogLevel {
OPLOG_STDOUT = 0;
OPLOG_STDERR = 1;
OPLOG_TRACE = 2;
OPLOG_DEBUG = 3;
OPLOG_INFO = 4;
OPLOG_WARN = 5;
OPLOG_ERROR = 6;
OPLOG_CRITICAL = 7;
}

message LogParameters {
google.protobuf.Timestamp timestamp = 1;
OplogLogLevel level = 2;
string context = 3;
string message = 4;
}
33 changes: 33 additions & 0 deletions golem-api-grpc/proto/golem/worker/v1/worker_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import public "golem/worker/v1/worker_error.proto";
import public "golem/worker/worker_filter.proto";
import public "golem/worker/worker_metadata.proto";
import public "golem/worker/log_event.proto";
import public "golem/worker/oplog_cursor.proto";
import public "golem/worker/public_oplog.proto";
import public "golem/worker/worker_id.proto";
import public "golem/component/component_id.proto";
import public "golem/worker/update_mode.proto";
Expand All @@ -28,13 +30,16 @@ service WorkerService {
rpc InterruptWorker (InterruptWorkerRequest) returns (InterruptWorkerResponse);
rpc InvokeAndAwait (InvokeAndAwaitRequest) returns (InvokeAndAwaitResponse);
rpc InvokeAndAwaitJson (InvokeAndAwaitJsonRequest) returns (InvokeAndAwaitJsonResponse);
rpc InvokeAndAwaitTyped (InvokeAndAwaitRequest) returns (InvokeAndAwaitTypedResponse);
rpc Invoke (InvokeRequest) returns (InvokeResponse);
rpc InvokeJson (InvokeJsonRequest) returns (InvokeResponse);
rpc ResumeWorker (ResumeWorkerRequest) returns (ResumeWorkerResponse);
rpc ConnectWorker(ConnectWorkerRequest) returns (stream golem.worker.LogEvent);
rpc GetWorkersMetadata(GetWorkersMetadataRequest) returns (GetWorkersMetadataResponse);

rpc UpdateWorker(UpdateWorkerRequest) returns (UpdateWorkerResponse);

rpc GetOplog(GetOplogRequest) returns (GetOplogResponse);
}

message LaunchNewWorkerRequest {
Expand Down Expand Up @@ -117,6 +122,13 @@ message InvokeAndAwaitResponse {
}
}

message InvokeAndAwaitTypedResponse {
oneof result {
golem.worker.InvokeResultTyped success = 1;
golem.worker.v1.WorkerError error = 2;
}
}

message InvokeAndAwaitJsonRequest {
golem.worker.TargetWorkerId workerId = 1;
golem.worker.IdempotencyKey idempotencyKey = 2;
Expand Down Expand Up @@ -203,4 +215,25 @@ message UpdateWorkerResponse {
golem.common.Empty success = 1;
WorkerError error = 2;
}
}

message GetOplogRequest {
golem.worker.WorkerId worker_id = 1;
uint64 from_oplog_index = 3;
optional golem.worker.OplogCursor cursor = 4;
uint64 count = 5;
}

message GetOplogResponse {
oneof result {
GetOplogSuccessResponse success = 1;
WorkerError error = 2;
}
}

message GetOplogSuccessResponse {
repeated golem.worker.OplogEntry entries = 1;
optional golem.worker.OplogCursor next = 2;
uint64 first_index_in_chunk = 3;
uint64 last_index = 5;
}
25 changes: 25 additions & 0 deletions golem-api-grpc/proto/golem/workerexecutor/v1/worker_executor.proto
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import public "golem/common/resource_limits.proto";
import public "golem/shardmanager/shard_id.proto";
import public "golem/component/component_id.proto";
import public "golem/worker/cursor.proto";
import public "golem/worker/oplog_cursor.proto";
import public "golem/worker/public_oplog.proto";
import public "golem/worker/update_mode.proto";
import public "golem/worker/target_worker_id.proto";
import public "golem/worker/worker_id.proto";
Expand Down Expand Up @@ -39,6 +41,7 @@ service WorkerExecutor {
rpc GetRunningWorkersMetadata(GetRunningWorkersMetadataRequest) returns (GetRunningWorkersMetadataResponse);
rpc GetWorkersMetadata(GetWorkersMetadataRequest) returns (GetWorkersMetadataResponse);
rpc UpdateWorker(UpdateWorkerRequest) returns (UpdateWorkerResponse);
rpc GetOplog(GetOplogRequest) returns (GetOplogResponse);
}

message InvokeWorkerResponse {
Expand Down Expand Up @@ -253,4 +256,26 @@ message UpdateWorkerResponse {
golem.common.Empty success = 1;
golem.worker.v1.WorkerExecutionError failure = 2;
}
}

message GetOplogRequest {
golem.worker.WorkerId worker_id = 1;
golem.common.AccountId account_id = 2;
uint64 from_oplog_index = 3;
optional golem.worker.OplogCursor cursor = 4;
uint64 count = 5;
}

message GetOplogResponse {
oneof result {
GetOplogSuccessResponse success = 1;
golem.worker.v1.WorkerExecutionError failure = 2;
}
}

message GetOplogSuccessResponse {
repeated golem.worker.OplogEntry entries = 1;
optional golem.worker.OplogCursor next = 2;
uint64 first_index_in_chunk = 3;
uint64 last_index = 5;
}
Loading

0 comments on commit 3d09b24

Please sign in to comment.