Skip to content

Commit

Permalink
fix(tests): isolate embedded nats in tests
Browse files Browse the repository at this point in the history
  • Loading branch information
palkan committed Oct 30, 2023
1 parent 5206656 commit 3f9d6b5
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 11 deletions.
3 changes: 2 additions & 1 deletion broker/nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -491,10 +491,11 @@ bucketSetup:
bucket, err := n.js.KeyValue(context.Background(), key)

if err == jetstream.ErrBucketNotFound {
n.log.Debugf("no JetStream bucket found, creating a new one: %s", key)
var berr error
bucket, berr = n.js.CreateKeyValue(context.Background(), jetstream.KeyValueConfig{
Bucket: key,
TTL: time.Duration(n.conf.SessionsTTL * int64(time.Second)),
TTL: ttl,
})

if berr != nil {
Expand Down
45 changes: 38 additions & 7 deletions broker/nats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ func (FakeBroadastHandler) ExecuteRemoteCommand(cmd *common.RemoteCommandMessage
var _ pubsub.Handler = (*FakeBroadastHandler)(nil)

func TestNATSBroker_HistorySince_expiration(t *testing.T) {
server := buildNATSServer()
port := 32
addr := fmt.Sprintf("nats://127.0.0.1:44%d", port)
server := buildNATSServer(t, addr)
err := server.Start()
require.NoError(t, err)
defer server.Shutdown(context.Background()) // nolint:errcheck
Expand All @@ -37,6 +39,8 @@ func TestNATSBroker_HistorySince_expiration(t *testing.T) {
config.HistoryTTL = 1

nconfig := natsconfig.NewNATSConfig()
nconfig.Servers = addr

broadcastHandler := FakeBroadastHandler{}
broadcaster := pubsub.NewLegacySubscriber(broadcastHandler)
broker := NewNATSBroker(broadcaster, &config, &nconfig)
Expand Down Expand Up @@ -81,7 +85,9 @@ func TestNATSBroker_HistorySince_expiration(t *testing.T) {
}

func TestNATSBroker_HistorySince_with_limit(t *testing.T) {
server := buildNATSServer()
port := 33
addr := fmt.Sprintf("nats://127.0.0.1:44%d", port)
server := buildNATSServer(t, addr)
err := server.Start()
require.NoError(t, err)
defer server.Shutdown(context.Background()) // nolint:errcheck
Expand All @@ -90,6 +96,8 @@ func TestNATSBroker_HistorySince_with_limit(t *testing.T) {
config.HistoryLimit = 2

nconfig := natsconfig.NewNATSConfig()
nconfig.Servers = addr

broadcastHandler := FakeBroadastHandler{}
broadcaster := pubsub.NewLegacySubscriber(broadcastHandler)
broker := NewNATSBroker(broadcaster, &config, &nconfig)
Expand Down Expand Up @@ -120,14 +128,19 @@ func TestNATSBroker_HistorySince_with_limit(t *testing.T) {
}

func TestNATSBroker_HistoryFrom(t *testing.T) {
server := buildNATSServer()
port := 34
addr := fmt.Sprintf("nats://127.0.0.1:44%d", port)
server := buildNATSServer(t, addr)

err := server.Start()
require.NoError(t, err)
defer server.Shutdown(context.Background()) // nolint:errcheck

config := NewConfig()

nconfig := natsconfig.NewNATSConfig()
nconfig.Servers = addr

broadcastHandler := FakeBroadastHandler{}
broadcaster := pubsub.NewLegacySubscriber(broadcastHandler)
broker := NewNATSBroker(broadcaster, &config, &nconfig)
Expand Down Expand Up @@ -202,7 +215,9 @@ func (t *TestCacheable) ToCacheEntry() ([]byte, error) {
}

func TestNATSBroker_Sessions(t *testing.T) {
server := buildNATSServer()
port := 41
addr := fmt.Sprintf("nats://127.0.0.1:44%d", port)
server := buildNATSServer(t, addr)
err := server.Start()
require.NoError(t, err)
defer server.Shutdown(context.Background()) // nolint:errcheck
Expand All @@ -211,6 +226,8 @@ func TestNATSBroker_Sessions(t *testing.T) {
config.SessionsTTL = 1

nconfig := natsconfig.NewNATSConfig()
nconfig.Servers = addr

broker := NewNATSBroker(nil, &config, &nconfig)

err = broker.Start()
Expand Down Expand Up @@ -257,7 +274,10 @@ func TestNATSBroker_Sessions(t *testing.T) {
}

func TestNATSBroker_SessionsTTLChange(t *testing.T) {
server := buildNATSServer()
port := 43
addr := fmt.Sprintf("nats://127.0.0.1:44%d", port)

server := buildNATSServer(t, addr)
err := server.Start()
require.NoError(t, err)
defer server.Shutdown(context.Background()) // nolint:errcheck
Expand All @@ -266,6 +286,8 @@ func TestNATSBroker_SessionsTTLChange(t *testing.T) {
config.SessionsTTL = 1

nconfig := natsconfig.NewNATSConfig()
nconfig.Servers = addr

broker := NewNATSBroker(nil, &config, &nconfig)

err = broker.Start()
Expand Down Expand Up @@ -317,20 +339,27 @@ func TestNATSBroker_SessionsTTLChange(t *testing.T) {
}

func TestNATSBroker_Epoch(t *testing.T) {
server := buildNATSServer()
port := 45
addr := fmt.Sprintf("nats://127.0.0.1:44%d", port)

server := buildNATSServer(t, addr)
err := server.Start()
require.NoError(t, err)
defer server.Shutdown(context.Background()) // nolint:errcheck

config := NewConfig()

nconfig := natsconfig.NewNATSConfig()
nconfig.Servers = addr

broker := NewNATSBroker(nil, &config, &nconfig)

err = broker.Start()
require.NoError(t, err)
defer broker.Shutdown(context.Background()) // nolint: errcheck

broker.Reset() // nolint: errcheck

epoch := broker.Epoch()

anotherBroker := NewNATSBroker(nil, &config, &nconfig)
Expand All @@ -340,9 +369,11 @@ func TestNATSBroker_Epoch(t *testing.T) {
assert.Equal(t, epoch, anotherBroker.Epoch())
}

func buildNATSServer() *enats.Service {
func buildNATSServer(t *testing.T, addr string) *enats.Service {
conf := enats.NewConfig()
conf.JetStream = true
conf.ServiceAddr = addr
conf.StoreDir = t.TempDir()
service := enats.NewService(&conf)

return service
Expand Down
1 change: 1 addition & 0 deletions enats/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,5 @@ type Config struct {
Gateways []string
Routes []string
JetStream bool
StoreDir string
}
4 changes: 4 additions & 0 deletions enats/enats.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,10 @@ func (s *Service) Start() error {
JetStream: s.config.JetStream,
}

if s.config.StoreDir != "" {
opts.StoreDir = s.config.StoreDir
}

s.server, err = server.NewServer(opts)
if err != nil {
return errorx.Decorate(err, "Failed to start NATS server")
Expand Down
18 changes: 15 additions & 3 deletions node/broker_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,10 @@ func TestIntegrationRestore_Memory(t *testing.T) {
}

func TestIntegrationRestore_NATS(t *testing.T) {
server := buildNATSServer()
port := 32
addr := fmt.Sprintf("nats://127.0.0.1:45%d", port)

server := buildNATSServer(t, addr)
err := server.Start()
require.NoError(t, err)
defer server.Shutdown(context.Background()) // nolint:errcheck
Expand All @@ -63,6 +66,8 @@ func TestIntegrationRestore_NATS(t *testing.T) {
bconf.SessionsTTL = 2

nconfig := natsconfig.NewNATSConfig()
nconfig.Servers = addr

broadcaster := pubsub.NewLegacySubscriber(node)
broker := broker.NewNATSBroker(broadcaster, &bconf, &nconfig)
node.SetBroker(broker)
Expand Down Expand Up @@ -244,7 +249,10 @@ func TestIntegrationHistory_Memory(t *testing.T) {
}

func TestIntegrationHistory_NATS(t *testing.T) {
server := buildNATSServer()
port := 33
addr := fmt.Sprintf("nats://127.0.0.1:45%d", port)

server := buildNATSServer(t, addr)
err := server.Start()
require.NoError(t, err)
defer server.Shutdown(context.Background()) // nolint:errcheck
Expand All @@ -254,6 +262,8 @@ func TestIntegrationHistory_NATS(t *testing.T) {
bconf := broker.NewConfig()

nconfig := natsconfig.NewNATSConfig()
nconfig.Servers = addr

broadcaster := pubsub.NewLegacySubscriber(node)
broker := broker.NewNATSBroker(broadcaster, &bconf, &nconfig)
node.SetBroker(broker)
Expand Down Expand Up @@ -427,9 +437,11 @@ func requireAuthenticatedSession(t *testing.T, node *Node, sid string) *Session
return session
}

func buildNATSServer() *enats.Service {
func buildNATSServer(t *testing.T, addr string) *enats.Service {
conf := enats.NewConfig()
conf.JetStream = true
conf.ServiceAddr = addr
conf.StoreDir = t.TempDir()
service := enats.NewService(&conf)

return service
Expand Down

0 comments on commit 3f9d6b5

Please sign in to comment.