Skip to content

Commit

Permalink
chore: nats broker integration, configuration, docs
Browse files Browse the repository at this point in the history
  • Loading branch information
palkan committed Oct 30, 2023
1 parent 3f9d6b5 commit 5e584c9
Show file tree
Hide file tree
Showing 8 changed files with 57 additions and 12 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

## master

- Add NATS-based broker. ([@palkan][])

## 1.4.6 (2023-10-25)

- Add `@anycable/anycable-go` NPM package to install `anycable-go` as a JS project dependency. ([@palkan][])
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ test-conformance-broker-redis: tmp/anycable-go-test
ANYCABLE_BROKER=memory ANYCABLE_BROADCAST_ADAPTER=redisx ANYCABLE_HTTP_BROADCAST_SECRET=any_secret ANYCABLE_PUBSUB=redis bundle exec anyt -c "tmp/anycable-go-test --headers=cookie,x-api-token" --target-url="ws://localhost:8080/cable" --require=etc/anyt/**/*.rb

test-conformance-broker-nats: tmp/anycable-go-test
ANYCABLE_BROKER=memory ANYCABLE_EMBED_NATS=true ANYCABLE_ENATS_ADDR=nats://127.0.0.1:4343 ANYCABLE_PUBSUB=nats ANYCABLE_BROADCAST_ADAPTER=http ANYCABLE_HTTP_BROADCAST_SECRET=any_secret bundle exec anyt -c "tmp/anycable-go-test --headers=cookie,x-api-token" --target-url="ws://localhost:8080/cable" --require=etc/anyt/**/*.rb
ANYCABLE_BROKER=nats ANYCABLE_EMBED_NATS=true ANYCABLE_ENATS_ADDR=nats://127.0.0.1:4343 ANYCABLE_PUBSUB=nats ANYCABLE_BROADCAST_ADAPTER=http ANYCABLE_HTTP_BROADCAST_SECRET=any_secret bundle exec anyt -c "tmp/anycable-go-test --headers=cookie,x-api-token" --target-url="ws://localhost:8080/cable" --require=etc/anyt/**/*.rb

test-conformance-all: test-conformance test-conformance-ssl test-conformance-http

Expand Down
12 changes: 6 additions & 6 deletions cli/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,12 +203,6 @@ func (r *Runner) Run() error {
go disconnector.Run() // nolint:errcheck
appNode.SetDisconnector(disconnector)

err = appNode.Start()

if err != nil {
return errorx.Decorate(err, "!!! Failed to initialize application !!!")
}

if r.config.EmbedNats {
service, enatsErr := r.embedNATS(&r.config.EmbeddedNats)

Expand All @@ -227,6 +221,12 @@ func (r *Runner) Run() error {
r.shutdownables = append(r.shutdownables, service)
}

err = appNode.Start()

if err != nil {
return errorx.Decorate(err, "!!! Failed to initialize application !!!")
}

err = subscriber.Start(r.errChan)
if err != nil {
return errorx.Decorate(err, "!!! Subscriber failed !!!")
Expand Down
7 changes: 7 additions & 0 deletions cli/runner_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,13 @@ func WithDefaultBroker() Option {
case "memory":
b := broker.NewMemoryBroker(br, &c.Broker)
return b, nil
case "nats":
// TODO: Figure out a better place for this hack.
// We don't want to enable JetStream by default (if NATS is used only for pub/sub),
// currently, we only need it when NATS is used as a broker.
c.EmbeddedNats.JetStream = true
b := broker.NewNATSBroker(br, &c.Broker, &c.NATS)
return b, nil
default:
return nil, errorx.IllegalArgument.New("Unsupported broker adapter: %s", c.BrokerAdapter)
}
Expand Down
14 changes: 12 additions & 2 deletions config/presets.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,13 +100,19 @@ func (c *Config) loadFlyPreset(defaults *Config) error {
c.PubSubAdapter = "nats"

// NATS hasn't been configured, so we can embed it
if c.EmbedNats || c.NATS.Servers == defaults.NATS.Servers {
if !c.EmbedNats || c.NATS.Servers == defaults.NATS.Servers {
c.EmbedNats = true
c.NATS.Servers = c.EmbeddedNats.ServiceAddr
}
}
}

if c.BrokerAdapter == defaults.BrokerAdapter {
if c.EmbedNats {
c.BrokerAdapter = "nats"
}
}

if rpcName, ok := os.LookupEnv("ANYCABLE_FLY_RPC_APP_NAME"); ok {
if c.RPC.Host == defaults.RPC.Host {
c.RPC.Host = fmt.Sprintf("dns:///%s.%s.internal:50051", region, rpcName)
Expand Down Expand Up @@ -140,7 +146,11 @@ func (c *Config) loadBrokerPreset(defaults *Config) error {
enatsEnabled := c.EmbedNats

if c.BrokerAdapter == defaults.BrokerAdapter {
c.BrokerAdapter = "memory"
if enatsEnabled {
c.BrokerAdapter = "nats"
} else {
c.BrokerAdapter = "memory"
}
}

if c.BroadcastAdapter == defaults.BroadcastAdapter {
Expand Down
5 changes: 3 additions & 2 deletions config/presets_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ func TestFlyPresets(t *testing.T) {
assert.Equal(t, 8989, config.HTTPBroadcast.Port)
assert.Equal(t, true, config.EmbedNats)
assert.Equal(t, "nats", config.PubSubAdapter)
assert.Equal(t, "nats", config.BrokerAdapter)
assert.Equal(t, "nats://0.0.0.0:4222", config.EmbeddedNats.ServiceAddr)
assert.Equal(t, "nats://0.0.0.0:5222", config.EmbeddedNats.ClusterAddr)
assert.Equal(t, "any-test-mag-cluster", config.EmbeddedNats.ClusterName)
Expand Down Expand Up @@ -141,7 +142,7 @@ func TestBrokerWhenENATSConfigured(t *testing.T) {

require.NoError(t, err)

assert.Equal(t, "memory", config.BrokerAdapter)
assert.Equal(t, "nats", config.BrokerAdapter)
assert.Equal(t, "http,nats", config.BroadcastAdapter)
assert.Equal(t, "nats", config.PubSubAdapter)
}
Expand Down Expand Up @@ -169,7 +170,7 @@ func TestFlyWithBrokerPresets(t *testing.T) {
assert.Equal(t, 8989, config.HTTPBroadcast.Port)
assert.Equal(t, true, config.EmbedNats)
assert.Equal(t, "nats", config.PubSubAdapter)
assert.Equal(t, "memory", config.BrokerAdapter)
assert.Equal(t, "nats", config.BrokerAdapter)
assert.Equal(t, "http,nats", config.BroadcastAdapter)
assert.Equal(t, "nats://0.0.0.0:4222", config.EmbeddedNats.ServiceAddr)
assert.Equal(t, "nats://0.0.0.0:5222", config.EmbeddedNats.ClusterAddr)
Expand Down
2 changes: 1 addition & 1 deletion docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ The preset provide the following defaults:
- `enats_cluster_routes`: "nats://\<FLY_REGION\>.\<FLY_APP_NAME\>.internal:5222"
- `enats_gateway_advertise`: "\<FLY_REGION\>.\<FLY_APP_NAME\>.internal:7222" (**NOTE:** You must set `ANYCABLE_ENATS_GATEWAY` to `nats://0.0.0.0:7222` and configure at least one gateway address manually to enable gateways).

Also, [embedded NATS](./embedded_nats.md) is enabled automatically if no other pub/sub adapter neither Redis is configured. Similarly, pub/sub and broadcast adapters using embedded NATS are configured, too. Thus, by default, AnyCable-Go setups a NATS cluster automatically (within a single region), no configuration is required.
Also, [embedded NATS](./embedded_nats.md) is enabled automatically if no other pub/sub adapter neither Redis is configured. Similarly, pub/sub, broker and broadcast adapters using embedded NATS are configured automatically, too. Thus, by default, AnyCable-Go setups a NATS cluster automatically (within a single region), no configuration is required.

If the `ANYCABLE_FLY_RPC_APP_NAME` env variable is provided, the following defaults are configured as well:

Expand Down
25 changes: 25 additions & 0 deletions docs/reliable_streams.md
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,31 @@ The default broker adapter. It stores all data in memory. It can be used **only

**NOTE:** Storing data in memory may result into the increased RAM usage of an AnyCable-Go process.

### NATS

This adapter uses [NATS JetStream](https://nats.io/) as a shared distributed storage for sessions and streams cache and also keeps a local snapshot in memory (using the in-memory adapter described above).

It can be used with both external NATS and [embedded NATS](./embedded_nats.md):

```sh
$ anycable-go --broker=nats --nats_servers=nats://localhost:4222
INFO 2023-10-28T00:57:53.937Z context=main Starting AnyCable 1.4.6-c31c153 (with mruby 1.2.0 (2015-11-17)) (pid: 29874, open file limit: 122880, gomaxprocs: 8)
INFO 2023-10-28T00:57:53.937Z context=main Starting NATS broker: nats://localhost:4222 (history limit: 100, history ttl: 300s, sessions ttl: 300s)
...
```

Or with embedded NATS:

```sh
$ anycable-go --embed_nats --broker=nats
INFO 2023-10-28T00:59:01.177Z context=main Starting AnyCable 1.4.6-c31c153 (with mruby 1.2.0 (2015-11-17)) (pid: 30693, open file limit: 122880, gomaxprocs: 8)
INFO 2023-10-28T00:59:01.177Z context=main Starting NATS broker: nats://127.0.0.1:4222 (history limit: 100, history ttl: 300s, sessions ttl: 300s)
INFO 2023-10-28T00:59:01.205Z context=main Embedded NATS server started: nats://127.0.0.1:4222
...
```

### Redis

<p class="pro-badge-header"></p>
Expand Down

0 comments on commit 5e584c9

Please sign in to comment.