From 5e584c94ee51d8e31810994d3616ec7cf5b51c5a Mon Sep 17 00:00:00 2001 From: Vladimir Dementyev Date: Fri, 27 Oct 2023 18:00:56 -0700 Subject: [PATCH] chore: nats broker integration, configuration, docs --- CHANGELOG.md | 2 ++ Makefile | 2 +- cli/cli.go | 12 ++++++------ cli/runner_options.go | 7 +++++++ config/presets.go | 14 ++++++++++++-- config/presets_test.go | 5 +++-- docs/configuration.md | 2 +- docs/reliable_streams.md | 25 +++++++++++++++++++++++++ 8 files changed, 57 insertions(+), 12 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0d1c6faa..d4b5709a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,8 @@ ## master +- Add NATS-based broker. ([@palkan][]) + ## 1.4.6 (2023-10-25) - Add `@anycable/anycable-go` NPM package to install `anycable-go` as a JS project dependency. ([@palkan][]) diff --git a/Makefile b/Makefile index b0ab758d..da8bb546 100644 --- a/Makefile +++ b/Makefile @@ -153,7 +153,7 @@ test-conformance-broker-redis: tmp/anycable-go-test ANYCABLE_BROKER=memory ANYCABLE_BROADCAST_ADAPTER=redisx ANYCABLE_HTTP_BROADCAST_SECRET=any_secret ANYCABLE_PUBSUB=redis bundle exec anyt -c "tmp/anycable-go-test --headers=cookie,x-api-token" --target-url="ws://localhost:8080/cable" --require=etc/anyt/**/*.rb test-conformance-broker-nats: tmp/anycable-go-test - ANYCABLE_BROKER=memory ANYCABLE_EMBED_NATS=true ANYCABLE_ENATS_ADDR=nats://127.0.0.1:4343 ANYCABLE_PUBSUB=nats ANYCABLE_BROADCAST_ADAPTER=http ANYCABLE_HTTP_BROADCAST_SECRET=any_secret bundle exec anyt -c "tmp/anycable-go-test --headers=cookie,x-api-token" --target-url="ws://localhost:8080/cable" --require=etc/anyt/**/*.rb + ANYCABLE_BROKER=nats ANYCABLE_EMBED_NATS=true ANYCABLE_ENATS_ADDR=nats://127.0.0.1:4343 ANYCABLE_PUBSUB=nats ANYCABLE_BROADCAST_ADAPTER=http ANYCABLE_HTTP_BROADCAST_SECRET=any_secret bundle exec anyt -c "tmp/anycable-go-test --headers=cookie,x-api-token" --target-url="ws://localhost:8080/cable" --require=etc/anyt/**/*.rb test-conformance-all: test-conformance test-conformance-ssl test-conformance-http diff --git a/cli/cli.go b/cli/cli.go index bc4c2cb9..d00daa3e 100644 --- a/cli/cli.go +++ b/cli/cli.go @@ -203,12 +203,6 @@ func (r *Runner) Run() error { go disconnector.Run() // nolint:errcheck appNode.SetDisconnector(disconnector) - err = appNode.Start() - - if err != nil { - return errorx.Decorate(err, "!!! Failed to initialize application !!!") - } - if r.config.EmbedNats { service, enatsErr := r.embedNATS(&r.config.EmbeddedNats) @@ -227,6 +221,12 @@ func (r *Runner) Run() error { r.shutdownables = append(r.shutdownables, service) } + err = appNode.Start() + + if err != nil { + return errorx.Decorate(err, "!!! Failed to initialize application !!!") + } + err = subscriber.Start(r.errChan) if err != nil { return errorx.Decorate(err, "!!! Subscriber failed !!!") diff --git a/cli/runner_options.go b/cli/runner_options.go index b5d53061..62260b47 100644 --- a/cli/runner_options.go +++ b/cli/runner_options.go @@ -162,6 +162,13 @@ func WithDefaultBroker() Option { case "memory": b := broker.NewMemoryBroker(br, &c.Broker) return b, nil + case "nats": + // TODO: Figure out a better place for this hack. + // We don't want to enable JetStream by default (if NATS is used only for pub/sub), + // currently, we only need it when NATS is used as a broker. + c.EmbeddedNats.JetStream = true + b := broker.NewNATSBroker(br, &c.Broker, &c.NATS) + return b, nil default: return nil, errorx.IllegalArgument.New("Unsupported broker adapter: %s", c.BrokerAdapter) } diff --git a/config/presets.go b/config/presets.go index 7e07e066..bf8afdb2 100644 --- a/config/presets.go +++ b/config/presets.go @@ -100,13 +100,19 @@ func (c *Config) loadFlyPreset(defaults *Config) error { c.PubSubAdapter = "nats" // NATS hasn't been configured, so we can embed it - if c.EmbedNats || c.NATS.Servers == defaults.NATS.Servers { + if !c.EmbedNats || c.NATS.Servers == defaults.NATS.Servers { c.EmbedNats = true c.NATS.Servers = c.EmbeddedNats.ServiceAddr } } } + if c.BrokerAdapter == defaults.BrokerAdapter { + if c.EmbedNats { + c.BrokerAdapter = "nats" + } + } + if rpcName, ok := os.LookupEnv("ANYCABLE_FLY_RPC_APP_NAME"); ok { if c.RPC.Host == defaults.RPC.Host { c.RPC.Host = fmt.Sprintf("dns:///%s.%s.internal:50051", region, rpcName) @@ -140,7 +146,11 @@ func (c *Config) loadBrokerPreset(defaults *Config) error { enatsEnabled := c.EmbedNats if c.BrokerAdapter == defaults.BrokerAdapter { - c.BrokerAdapter = "memory" + if enatsEnabled { + c.BrokerAdapter = "nats" + } else { + c.BrokerAdapter = "memory" + } } if c.BroadcastAdapter == defaults.BroadcastAdapter { diff --git a/config/presets_test.go b/config/presets_test.go index 7a7aa717..d1a6edc7 100644 --- a/config/presets_test.go +++ b/config/presets_test.go @@ -42,6 +42,7 @@ func TestFlyPresets(t *testing.T) { assert.Equal(t, 8989, config.HTTPBroadcast.Port) assert.Equal(t, true, config.EmbedNats) assert.Equal(t, "nats", config.PubSubAdapter) + assert.Equal(t, "nats", config.BrokerAdapter) assert.Equal(t, "nats://0.0.0.0:4222", config.EmbeddedNats.ServiceAddr) assert.Equal(t, "nats://0.0.0.0:5222", config.EmbeddedNats.ClusterAddr) assert.Equal(t, "any-test-mag-cluster", config.EmbeddedNats.ClusterName) @@ -141,7 +142,7 @@ func TestBrokerWhenENATSConfigured(t *testing.T) { require.NoError(t, err) - assert.Equal(t, "memory", config.BrokerAdapter) + assert.Equal(t, "nats", config.BrokerAdapter) assert.Equal(t, "http,nats", config.BroadcastAdapter) assert.Equal(t, "nats", config.PubSubAdapter) } @@ -169,7 +170,7 @@ func TestFlyWithBrokerPresets(t *testing.T) { assert.Equal(t, 8989, config.HTTPBroadcast.Port) assert.Equal(t, true, config.EmbedNats) assert.Equal(t, "nats", config.PubSubAdapter) - assert.Equal(t, "memory", config.BrokerAdapter) + assert.Equal(t, "nats", config.BrokerAdapter) assert.Equal(t, "http,nats", config.BroadcastAdapter) assert.Equal(t, "nats://0.0.0.0:4222", config.EmbeddedNats.ServiceAddr) assert.Equal(t, "nats://0.0.0.0:5222", config.EmbeddedNats.ClusterAddr) diff --git a/docs/configuration.md b/docs/configuration.md index 89e407c9..1c8f4c06 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -137,7 +137,7 @@ The preset provide the following defaults: - `enats_cluster_routes`: "nats://\.\.internal:5222" - `enats_gateway_advertise`: "\.\.internal:7222" (**NOTE:** You must set `ANYCABLE_ENATS_GATEWAY` to `nats://0.0.0.0:7222` and configure at least one gateway address manually to enable gateways). -Also, [embedded NATS](./embedded_nats.md) is enabled automatically if no other pub/sub adapter neither Redis is configured. Similarly, pub/sub and broadcast adapters using embedded NATS are configured, too. Thus, by default, AnyCable-Go setups a NATS cluster automatically (within a single region), no configuration is required. +Also, [embedded NATS](./embedded_nats.md) is enabled automatically if no other pub/sub adapter neither Redis is configured. Similarly, pub/sub, broker and broadcast adapters using embedded NATS are configured automatically, too. Thus, by default, AnyCable-Go setups a NATS cluster automatically (within a single region), no configuration is required. If the `ANYCABLE_FLY_RPC_APP_NAME` env variable is provided, the following defaults are configured as well: diff --git a/docs/reliable_streams.md b/docs/reliable_streams.md index 6d259156..43dc5e78 100644 --- a/docs/reliable_streams.md +++ b/docs/reliable_streams.md @@ -112,6 +112,31 @@ The default broker adapter. It stores all data in memory. It can be used **only **NOTE:** Storing data in memory may result into the increased RAM usage of an AnyCable-Go process. +### NATS + +This adapter uses [NATS JetStream](https://nats.io/) as a shared distributed storage for sessions and streams cache and also keeps a local snapshot in memory (using the in-memory adapter described above). + +It can be used with both external NATS and [embedded NATS](./embedded_nats.md): + +```sh +$ anycable-go --broker=nats --nats_servers=nats://localhost:4222 + + INFO 2023-10-28T00:57:53.937Z context=main Starting AnyCable 1.4.6-c31c153 (with mruby 1.2.0 (2015-11-17)) (pid: 29874, open file limit: 122880, gomaxprocs: 8) + INFO 2023-10-28T00:57:53.937Z context=main Starting NATS broker: nats://localhost:4222 (history limit: 100, history ttl: 300s, sessions ttl: 300s) + ... +``` + +Or with embedded NATS: + +```sh +$ anycable-go --embed_nats --broker=nats + + INFO 2023-10-28T00:59:01.177Z context=main Starting AnyCable 1.4.6-c31c153 (with mruby 1.2.0 (2015-11-17)) (pid: 30693, open file limit: 122880, gomaxprocs: 8) + INFO 2023-10-28T00:59:01.177Z context=main Starting NATS broker: nats://127.0.0.1:4222 (history limit: 100, history ttl: 300s, sessions ttl: 300s) + INFO 2023-10-28T00:59:01.205Z context=main Embedded NATS server started: nats://127.0.0.1:4222 + ... +``` + ### Redis