Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make registry heartbeat TTL configurable #88

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions virtual/environment.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ const (
Localhost = "127.0.0.1"

maxNumActivationsToCache = 1e6 // 1 Million.
heartbeatTimeout = registry.HeartbeatTTL
)

var (
Expand All @@ -33,7 +32,7 @@ var (
ErrEnvironmentClosed = errors.New("environment is closed")

// Var so can be modified by tests.
defaultActivationsCacheTTL = heartbeatTimeout
defaultActivationsCacheTTL = registry.DefaultHeartbeatTTL
DefaultGCActorsAfterDurationWithNoInvocations = time.Minute
)

Expand Down Expand Up @@ -736,7 +735,7 @@ func (r *environment) NumActivatedActors() int {
}

func (r *environment) Heartbeat() error {
ctx, cc := context.WithTimeout(context.Background(), heartbeatTimeout)
ctx, cc := context.WithTimeout(context.Background(), r.registry.HeartbeatTTL())
defer cc()

var (
Expand Down
2 changes: 1 addition & 1 deletion virtual/environment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -780,7 +780,7 @@ func TestServerVersionIsHonored(t *testing.T) {

env.pauseHeartbeat()

time.Sleep(registry.HeartbeatTTL + time.Second)
time.Sleep(registry.DefaultHeartbeatTTL + time.Second)

env.resumeHeartbeat()

Expand Down
12 changes: 12 additions & 0 deletions virtual/registry/dnsregistry/dns_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ type DNSRegistryOptions struct {
// ResolveEvery controls how often the LookupIP method will be
// called on the DNSResolver to detect which IPs are active.
ResolveEvery time.Duration

// HeartbeatTTL is the maximum amount of time between server heartbeats before
// the registry will consider a server as dead.
HeartbeatTTL time.Duration

// Logger is a logging instance used for logging messages.
// If no logger is provided, the default logger from the slog package (slog.Default()) will be used.
Logger *slog.Logger
Expand Down Expand Up @@ -93,6 +98,9 @@ func NewDNSRegistryFromResolver(
if opts.ResolveEvery == 0 {
opts.ResolveEvery = 5 * time.Second
}
if opts.HeartbeatTTL == 0 {
opts.HeartbeatTTL = registry.DefaultHeartbeatTTL
}
if opts.Logger == nil {
opts.Logger = slog.Default()
}
Expand Down Expand Up @@ -176,6 +184,10 @@ func (d *dnsRegistry) UnsafeWipeAll() error {
return nil
}

// HeartbeatTTL returns the HeartbeatTTL value stored in this registry's
// options.
func (d *dnsRegistry) HeartbeatTTL() time.Duration {
	ttl := d.opts.HeartbeatTTL
	return ttl
}

func (d *dnsRegistry) discover() error {
addresses, err := d.resolver.LookupIP(d.host)
if err != nil {
Expand Down
9 changes: 9 additions & 0 deletions virtual/registry/dnsregistry/dns_registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package dnsregistry

import (
"context"
"github.com/stretchr/testify/assert"
"net"
"strings"
"testing"
Expand Down Expand Up @@ -100,3 +101,11 @@ func TestDNSRegistrySingleNode(t *testing.T) {
require.Equal(t, DNSServerID, activations.References[0].Physical.ServerID)
require.Equal(t, DNSServerVersion, activations.References[0].Physical.ServerVersion)
}

// TestDNSRegistryTTL verifies that a HeartbeatTTL supplied through
// DNSRegistryOptions is the value reported by the registry's HeartbeatTTL
// method.
func TestDNSRegistryTTL(t *testing.T) {
	const wantTTL = 25 * time.Millisecond

	reg, err := NewDNSRegistry(Localhost, 9090, DNSRegistryOptions{
		HeartbeatTTL: wantTTL,
	})
	require.NoError(t, err)

	// testify's Equal takes (expected, actual); passing them in that order
	// keeps the labels in the failure output correct.
	assert.Equal(t, wantTTL, reg.HeartbeatTTL(), "Expected heartbeat TTL to be 25ms")
}
30 changes: 18 additions & 12 deletions virtual/registry/kv_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,6 @@ import (
)

const (
// HeartbeatTTL is the maximum amount of time between server heartbeats before
// the registry will consider a server as dead.
//
// TODO: Should be configurable.
HeartbeatTTL = 5 * time.Second

// 2GiB, see KVRegistryOptions.RebalanceMemoryThreshold for more details.
DefaultRebalanceMemoryThreshold = 1 << 31
)
Expand Down Expand Up @@ -76,6 +70,10 @@ type KVRegistryOptions struct {
// from all the servers in the cluster.
MinSuccessiveHeartbeatsBeforeAllowActivations int

// HeartbeatTTL is the maximum amount of time between server heartbeats before
// the registry will consider a server as dead.
HeartbeatTTL time.Duration

// Logger is a logging instance used for logging messages.
// If no logger is provided, the default logger from the slog
// package (slog.Default()) will be used.
Expand All @@ -96,6 +94,9 @@ func NewKVRegistry(
if opts.RebalanceMemoryThreshold <= 0 {
opts.RebalanceMemoryThreshold = DefaultRebalanceMemoryThreshold
}
if opts.HeartbeatTTL <= 0 {
opts.HeartbeatTTL = DefaultHeartbeatTTL
}

return NewValidatedRegistry(&kvRegistry{
kv: kv,
Expand Down Expand Up @@ -271,7 +272,7 @@ func (k *kvRegistry) EnsureActivation(
// 3. One or more of the servers where the actor is currently activated has blacklisted the actor, typically for load balancing purposes.

// First to see where the new replicas should be activated we need to get a list of all available servers.
liveServers, err := getLiveServers(ctx, vs, tr)
liveServers, err := getLiveServers(ctx, vs, k.opts.HeartbeatTTL, tr)
if err != nil {
return fmt.Errorf("failed to get live servers: %w", err), err
}
Expand Down Expand Up @@ -426,7 +427,7 @@ func (k *kvRegistry) getExistingUnblacklistedActivations(
if err := json.Unmarshal(v, &server); err != nil {
return nil, nil, fmt.Errorf("error unmarsaling server state with ID: %s", req.ActorID)
}
if versionSince(vs, server.LastHeartbeatedAt) > HeartbeatTTL {
if versionSince(vs, server.LastHeartbeatedAt) > k.opts.HeartbeatTTL {
// Server "exists" but has not heartbeated recently. Assume its dead and ignore this activation.
continue
}
Expand Down Expand Up @@ -536,7 +537,7 @@ func (k *kvRegistry) Heartbeat(
return nil, fmt.Errorf("error getting versionstamp: %w", err)
}
timeSinceLastHeartbeat := versionSince(vs, state.LastHeartbeatedAt)
if timeSinceLastHeartbeat >= HeartbeatTTL {
if timeSinceLastHeartbeat >= k.opts.HeartbeatTTL {
state.ServerVersion++
}

Expand All @@ -559,15 +560,15 @@ func (k *kvRegistry) Heartbeat(
// package directly, but right now its tested in environment.go and
// examples/leaderregistry/main_test.go

liveServers, err := getLiveServers(ctx, vs, tr)
liveServers, err := getLiveServers(ctx, vs, k.opts.HeartbeatTTL, tr)
if err != nil {
return nil, fmt.Errorf("error getting live servers during heartbeat for load balancing: %w", err)
}

result := HeartbeatResult{
VersionStamp: vs,
// VersionStamp corresponds to ~ 1 million increments per second.
HeartbeatTTL: int64(HeartbeatTTL.Microseconds()),
HeartbeatTTL: int64(k.opts.HeartbeatTTL.Microseconds()),
ServerVersion: serverVersion,
}

Expand Down Expand Up @@ -596,6 +597,10 @@ func (k *kvRegistry) UnsafeWipeAll() error {
return k.kv.UnsafeWipeAll()
}

// HeartbeatTTL returns the HeartbeatTTL value stored in this registry's
// options.
func (k *kvRegistry) HeartbeatTTL() time.Duration {
	configured := k.opts.HeartbeatTTL
	return configured
}

func (k *kvRegistry) getActorBytes(
ctx context.Context,
tr kv.Transaction,
Expand Down Expand Up @@ -713,6 +718,7 @@ func versionSince(curr, prev int64) time.Duration {
func getLiveServers(
ctx context.Context,
versionStamp int64,
heartbeatTtl time.Duration,
tr kv.Transaction,
) ([]serverState, error) {
liveServers := []serverState{}
Expand All @@ -722,7 +728,7 @@ func getLiveServers(
return fmt.Errorf("error unmarshaling server state: %w", err)
}

if versionSince(versionStamp, currServer.LastHeartbeatedAt) < HeartbeatTTL {
if versionSince(versionStamp, currServer.LastHeartbeatedAt) < heartbeatTtl {
liveServers = append(liveServers, currServer)
}
return nil
Expand Down
5 changes: 5 additions & 0 deletions virtual/registry/leaderregistry/leader_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"errors"
"fmt"
"net/http"
"time"

"github.com/richardartoul/nola/virtual"
"github.com/richardartoul/nola/virtual/registry"
Expand Down Expand Up @@ -211,6 +212,10 @@ func (l *leaderRegistry) UnsafeWipeAll() error {
return errors.New("not implemented")
}

// HeartbeatTTL always returns registry.DefaultHeartbeatTTL; it does not read
// any state from the leaderRegistry receiver.
func (l *leaderRegistry) HeartbeatTTL() time.Duration {
	const ttl = registry.DefaultHeartbeatTTL
	return ttl
}

type leaderActorModule struct {
serverID string
}
Expand Down
12 changes: 12 additions & 0 deletions virtual/registry/leaderregistry/leader_registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package leaderregistry

import (
"context"
"github.com/stretchr/testify/assert"
"net"
"sync"
"testing"
Expand Down Expand Up @@ -35,6 +36,17 @@ func TestLeaderRegistry(t *testing.T) {
})
}

// TestLeaderRegistryTTL verifies that a leader registry built with default
// environment options reports registry.DefaultHeartbeatTTL as its heartbeat
// TTL.
func TestLeaderRegistryTTL(t *testing.T) {
	envOpts := virtual.EnvironmentOptions{Discovery: virtual.DiscoveryOptions{
		DiscoveryType: virtual.DiscoveryTypeLocalHost,
		Port:          9093,
	}}
	reg, err := NewLeaderRegistry(context.Background(), newTestLeaderProvider(), "test-registry-server-id", envOpts)
	require.NoError(t, err)

	// testify's Equal takes (expected, actual); passing them in that order
	// keeps the labels in the failure output correct.
	assert.Equal(t, registry.DefaultHeartbeatTTL, reg.HeartbeatTTL(), "Expected leader registry to return default heartbeat TTL of 5s")
}

type testLeaderProvider struct {
sync.Mutex
leader registry.Address
Expand Down
16 changes: 8 additions & 8 deletions virtual/registry/test_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func testRegistryServiceDiscoveryAndEnsureActivation(t *testing.T, registry Regi
})
require.NoError(t, err)
require.True(t, heartbeatResult.VersionStamp > 0)
require.Equal(t, HeartbeatTTL.Microseconds(), heartbeatResult.HeartbeatTTL)
require.Equal(t, DefaultHeartbeatTTL.Microseconds(), heartbeatResult.HeartbeatTTL)
}

// Should succeed now that we have a server to activate on.
Expand Down Expand Up @@ -191,7 +191,7 @@ func testRegistryServiceDiscoveryAndEnsureActivation(t *testing.T, registry Regi
//
// TODO: Sleeps in tests are bad, but I'm lazy to inject a clock right now and deal
// with all of that.
time.Sleep(HeartbeatTTL + time.Second)
time.Sleep(DefaultHeartbeatTTL + time.Second)

// Heartbeat server2. After this, the Registry should only consider server2 to be alive.
_, err = registry.Heartbeat(ctx, "server2", HeartbeatState{
Expand Down Expand Up @@ -252,15 +252,15 @@ func testRegistryReplication(t *testing.T, registry Registry) {
})
require.NoError(t, err)
require.True(t, heartbeatResult.VersionStamp > 0)
require.Equal(t, HeartbeatTTL.Microseconds(), heartbeatResult.HeartbeatTTL)
require.Equal(t, DefaultHeartbeatTTL.Microseconds(), heartbeatResult.HeartbeatTTL)

heartbeatResult, err = registry.Heartbeat(ctx, "server2", HeartbeatState{
NumActivatedActors: 10,
Address: "server2_address",
})
require.NoError(t, err)
require.True(t, heartbeatResult.VersionStamp > 0)
require.Equal(t, HeartbeatTTL.Microseconds(), heartbeatResult.HeartbeatTTL)
require.Equal(t, DefaultHeartbeatTTL.Microseconds(), heartbeatResult.HeartbeatTTL)
}

activations, err := registry.EnsureActivation(ctx, EnsureActivationRequest{
Expand Down Expand Up @@ -312,15 +312,15 @@ func testEnsureActivationPersistence(t *testing.T, registry Registry) {
})
require.NoError(t, err)
require.True(t, heartbeatResult.VersionStamp > 0)
require.Equal(t, HeartbeatTTL.Microseconds(), heartbeatResult.HeartbeatTTL)
require.Equal(t, DefaultHeartbeatTTL.Microseconds(), heartbeatResult.HeartbeatTTL)

heartbeatResult, err = registry.Heartbeat(ctx, "server2", HeartbeatState{
NumActivatedActors: 10,
Address: "server2_address",
})
require.NoError(t, err)
require.True(t, heartbeatResult.VersionStamp > 0)
require.Equal(t, HeartbeatTTL.Microseconds(), heartbeatResult.HeartbeatTTL)
require.Equal(t, DefaultHeartbeatTTL.Microseconds(), heartbeatResult.HeartbeatTTL)
}

go func() {
Expand All @@ -338,9 +338,9 @@ func testEnsureActivationPersistence(t *testing.T, registry Registry) {
Address: "server2_address",
})

// Wait for HeartbeatTTL / 2 before sending the next heartbeat
// Wait for DefaultHeartbeatTTL / 2 before sending the next heartbeat
select {
case <-time.After(HeartbeatTTL / 2):
case <-time.After(DefaultHeartbeatTTL / 2):
case <-ctx.Done():
return
}
Expand Down
9 changes: 9 additions & 0 deletions virtual/registry/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,16 @@ package registry
import (
"context"
"net"
"time"

"github.com/richardartoul/nola/virtual/types"
)

// DefaultHeartbeatTTL is the heartbeat TTL (5 seconds) that registries fall
// back to when no explicit HeartbeatTTL is configured.
const DefaultHeartbeatTTL time.Duration = 5 * time.Second

// Registry is the interface that is implemented by the virtual actor registry.
type Registry interface {
// Heartbeat updates the "lastHeartbeatedAt" value for the provided server ID. Server's
Expand Down Expand Up @@ -43,6 +49,9 @@ type Registry interface {
// UnsafeWipeAll wipes the entire registry. Only used for tests. Do not call it anywhere
// in production code.
UnsafeWipeAll() error

// HeartbeatTTL returns the configured heartbeat TTL duration.
HeartbeatTTL() time.Duration
}

// CreateActorResult is the result of a call to CreateActor().
Expand Down
5 changes: 5 additions & 0 deletions virtual/registry/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"strings"
"time"
)

var (
Expand Down Expand Up @@ -123,6 +124,10 @@ func (v *validator) UnsafeWipeAll() error {
return v.r.UnsafeWipeAll()
}

// HeartbeatTTL returns the heartbeat TTL of the wrapped registry.
//
// The validator only decorates another Registry, so it must forward this call
// rather than return DefaultHeartbeatTTL; otherwise a TTL configured on the
// underlying registry would be hidden from callers that hold the validated
// wrapper.
func (v *validator) HeartbeatTTL() time.Duration {
	return v.r.HeartbeatTTL()
}

func validateString(name, x string) error {
if x == "" {
return fmt.Errorf("%s cannot be empty", name)
Expand Down