Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add isWorker flag to actor startup #62

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions examples/file_cache/file_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,15 +70,19 @@ func (f *FileCacheModule) Instantiate(
payload []byte,
host virtual.HostCapabilities,
) (virtual.Actor, error) {
p := &FileCacheInstantiatePayload{}
p := &types.InstantiatePayload{}
if err := json.Unmarshal(payload, p); err != nil {
return nil, fmt.Errorf("error unmarshaling InstantiatePayload: %w", err)
}
fcp := &FileCacheInstantiatePayload{}
if err := json.Unmarshal(p.Payload, fcp); err != nil {
return nil, fmt.Errorf("error unmarshaling FileCacheInstantiatePayload: %w", err)
}
if p.FileSize <= 0 {
return nil, fmt.Errorf("filesize cannot be <= 0, but was: %d", p.FileSize)
if fcp.FileSize <= 0 {
return nil, fmt.Errorf("filesize cannot be <= 0, but was: %d", fcp.FileSize)
}

return NewFileCacheActor(p.FileSize, f.chunkSize, f.fetchSize, f.fetcher, f.chunkCache)
return NewFileCacheActor(fcp.FileSize, f.chunkSize, f.fetchSize, f.fetcher, f.chunkCache)
}

func (f *FileCacheModule) Close(ctx context.Context) error {
Expand Down
41 changes: 36 additions & 5 deletions testdata/tinygo/util/main.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
package main

import (
"encoding/base64"
"fmt"
"strconv"

"github.com/buger/jsonparser"
"github.com/richardartoul/nola/virtual/types"
"github.com/richardartoul/nola/wapcutils"
wapc "github.com/wapc/wapc-guest-tinygo"
)
Expand Down Expand Up @@ -33,12 +37,14 @@ func main() {

var (
count int64
instantiatePayload []byte
instantiatePayload types.InstantiatePayload
startupWasCalled = false
shutdownWasCalled = false
)

// getInstantiatePayload returns the payload provided to the Startup invocation.
func getInstantiatePayload(payload []byte) ([]byte, error) {
return instantiatePayload, nil
return instantiatePayload.Payload, nil
}

// inc increments the actor's in-memory global counter.
Expand Down Expand Up @@ -144,11 +150,29 @@ func invokeCustomHostFn(payload []byte) ([]byte, error) {
return wapc.HostCall("wapc", "nola", string(payload), payload)
}

var startupWasCalled = false

func startup(payload []byte) ([]byte, error) {
startupWasCalled = true
instantiatePayload = append([]byte(nil), payload...)
err := jsonparser.ObjectEach(payload, func(key, value []byte, dataType jsonparser.ValueType, offset int) error {
switch string(key) {
case "IsWorker":
isWorker, err := strconv.ParseBool(string(value))
if err != nil {
return fmt.Errorf("failed to parse IsWorker bool: %w", err)
}
instantiatePayload.IsWorker = isWorker
case "Payload":
payload, err := base64.StdEncoding.DecodeString(string(value))
if err != nil {
return fmt.Errorf("failed to parse Payload: %w", err)
}
instantiatePayload.Payload = payload
}
return nil
})
if err != nil {
return nil, fmt.Errorf("failed to parse InstantiatePayload: %w", err)
}

return nil, nil
}

Expand All @@ -160,11 +184,18 @@ func getStartupWasCalled(payload []byte) ([]byte, error) {
}

func shutdown(payload []byte) ([]byte, error) {
if instantiatePayload.IsWorker {
shutdownWasCalled = true
return nil, nil
}
_, err := wapc.HostCall("wapc", "nola", wapcutils.KVPutOperationName, wapcutils.EncodePutPayload(nil, []byte("shutdown"), []byte("true")))
return nil, err
}

func getShutdownValue(payload []byte) ([]byte, error) {
if instantiatePayload.IsWorker {
return []byte(strconv.FormatBool(shutdownWasCalled)), nil
}
res, err := wapc.HostCall("wapc", "nola", wapcutils.KVGetOperationName, []byte("shutdown"))
if err != nil {
return nil, err
Expand Down
Binary file modified testdata/tinygo/util/main.wasm
Binary file not shown.
17 changes: 15 additions & 2 deletions virtual/environment.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package virtual

import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
Expand Down Expand Up @@ -530,7 +531,13 @@ func (r *environment) InvokeActorDirectStream(
heartbeatResult.ServerVersion, serverVersion)
}

return r.activations.invoke(ctx, reference, operation, create.InstantiatePayload, payload, false)
// Wrap instantiation payload into a struct that provides metadata to the actor
b, err := json.Marshal(types.InstantiatePayload{Payload: create.InstantiatePayload, IsWorker: false})
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should push these down into activations.go that way we can avoid the allocations/marshalling work except for the case where a new actor is actually being created which will make it ~free

if err != nil {
return nil, fmt.Errorf("failed to marshal instantiation payload: %w", err)
}

return r.activations.invoke(ctx, reference, operation, b, payload, false)
}

func (r *environment) InvokeWorker(
Expand Down Expand Up @@ -582,9 +589,15 @@ func (r *environment) InvokeWorkerStream(
return nil, fmt.Errorf("InvokeWorker: error creating actor reference: %w", err)
}

// Wrap instantiation payload into a struct that provides metadata to the actor
b, err := json.Marshal(types.InstantiatePayload{Payload: create.InstantiatePayload, IsWorker: true})
if err != nil {
return nil, fmt.Errorf("failed to marshal instantiation payload: %w", err)
}

// Workers provide none of the consistency / linearizability guarantees that actor's do, so we
// can bypass the registry entirely and just immediately invoke the function.
return r.activations.invoke(ctx, ref, operation, create.InstantiatePayload, payload, false)
return r.activations.invoke(ctx, ref, operation, b, payload, false)
}

func (r *environment) Close(ctx context.Context) error {
Expand Down
23 changes: 16 additions & 7 deletions virtual/environment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ func TestCreateIfNotExistWithInstantiatePayload(t *testing.T) {
ctx, ns, "a", "test-module",
"getInstantiatePayload", nil, types.CreateIfNotExist{})
require.NoError(t, err)
require.Equal(t, []byte("abc"), result)
require.Equal(t, "abc", string(result))
}
}
}
Expand Down Expand Up @@ -978,9 +978,13 @@ func (tm testModule) Instantiate(
payload []byte,
host HostCapabilities,
) (Actor, error) {
p := types.InstantiatePayload{}
if err := json.Unmarshal(payload, &p); err != nil {
return nil, fmt.Errorf("failed to unmarshal InstantiatePayload: %w", err)
}
return &testActor{
host: host,
instantiatePayload: payload,
instantiatePayload: p,
}, nil
}

Expand All @@ -994,7 +998,7 @@ type testActor struct {
count int
startupWasCalled bool
shutdownWasCalled bool
instantiatePayload []byte
instantiatePayload types.InstantiatePayload
}

func (ta *testActor) Invoke(
Expand All @@ -1009,18 +1013,18 @@ func (ta *testActor) Invoke(
return nil, nil
case wapcutils.ShutdownOperationName:
ta.shutdownWasCalled = true
if _, ok := transaction.(noopTransaction); !ok {
if !ta.instantiatePayload.IsWorker {
return nil, transaction.Put(ctx, []byte("shutdown"), []byte("true"))
}
return nil, nil
case "getShutdownValue":
if _, ok := transaction.(noopTransaction); !ok {
if !ta.instantiatePayload.IsWorker {
result, _, err := transaction.Get(ctx, []byte("shutdown"))
return result, err
}
return []byte(strconv.FormatBool(ta.shutdownWasCalled)), nil
case "getInstantiatePayload":
return ta.instantiatePayload, nil
return []byte(ta.instantiatePayload.Payload), nil
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no byte cast here needed

case "inc":
ta.count++
return []byte(strconv.Itoa(ta.count)), nil
Expand Down Expand Up @@ -1084,10 +1088,15 @@ func (tm testStreamModule) Instantiate(
streamInterfaceWasCalledMutex.Lock()
defer streamInterfaceWasCalledMutex.Unlock()
streamInterfaceWasCalled = true

p := types.InstantiatePayload{}
if err := json.Unmarshal(payload, &p); err != nil {
return nil, fmt.Errorf("failed to unmarshal InstantiatePayload: %w", err)
}
return &testStreamActor{
a: &testActor{
host: host,
instantiatePayload: payload,
instantiatePayload: p,
},
}, nil
}
Expand Down
11 changes: 11 additions & 0 deletions virtual/types/req.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,17 @@ type CreateIfNotExist struct {
InstantiatePayload []byte
}

// InstantiatePayload provides the arguments for initialiazing actors on the STARTUP call.
type InstantiatePayload struct {
// IsWorker is a flag that is used to indicate whether the payload is intended for a worker or not
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

super nit: . at end

IsWorker bool
// InstantiatePayload is the []byte that will be provided to the actor on
// instantiation. It is generally used to provide any actor-specific constructor
// arguments that are required to instantiate the actor in memory.
// It is the value passed at CreateIfNotExist.InstantiatePayload
Payload []byte
}

// ActorOptions contains the options for a given actor.
type ActorOptions struct {
}