diff --git a/cli/options.go b/cli/options.go index b02c2301..f397eb0d 100644 --- a/cli/options.go +++ b/cli/options.go @@ -1096,6 +1096,12 @@ func signedStreamsCLIFlags(c *config.Config, turboRailsKey *string, cableReadyKe Destination: &c.Streams.Public, }, + &cli.BoolFlag{ + Name: "streams_whisper", + Usage: "Enable whispering for signed pub/sub streams", + Destination: &c.Streams.Whisper, + }, + &cli.BoolFlag{ Name: "turbo_streams", Usage: "Enable Turbo Streams support", diff --git a/docs/signed_streams.md b/docs/signed_streams.md index 41af76a6..73d79b62 100644 --- a/docs/signed_streams.md +++ b/docs/signed_streams.md @@ -142,6 +142,27 @@ $digest = hash_hmac('sha256', $encoded, $SECRET_KEY); $signed_stream_name = $encoded . '--' . $digest; ``` +## Whispering + +_Whispering_ is an ability to publish _transient_ broadcasts from clients, i.e., without touching your backend. This is useful when you want to share client-only information from one connection to others. Typical examples include typing indicators, cursor position sharing, etc. + +Whispering must be enabled explicitly for signed streams via the `--streams_whisper` (`ANYCABLE_STREAMS_WHISPER=true`) option. Public streams always allow whispering. + +Here is an example client code using AnyCable JS SDK: + +```js +let channel = cable.streamFrom("chat/22"); + +channel.on("message", (msg) => { + if (msg.event === "typing") { + console.log(`user ${msg.name} is typing`); + } +}) + +// publishing whispers +channel.whisper({event: "typing", name: user.name}) +``` + ## Hotwire and CableReady support AnyCable provides an ability to terminate Hotwire ([Turbo Streams](https://turbo.hotwired.dev/handbook/streams)) and [CableReady](https://cableready.stimulusreflex.com) (v5+) subscriptions at the WebSocker server using the same signed streams functionality under the hood (and, thus, without performing any RPC calls to authorize subscriptions). diff --git a/node/node.go b/node/node.go index 0bd017b2..8a5f4cb0 100644 --- a/node/node.go +++ b/node/node.go @@ -653,7 +653,6 @@ func (n *Node) Whisper(s *Session, msg *common.Message) error { Data: string(utils.ToJSON(msg.Data)), Meta: &common.StreamMessageMetadata{ ExcludeSocket: s.GetID(), - BroadcastType: common.WhisperType, Transient: true, }, } diff --git a/node/node_test.go b/node/node_test.go index 9f9c77d9..3e26ac84 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -277,7 +277,7 @@ func TestWhisper(t *testing.T) { err := node.Whisper(session, &common.Message{Identifier: "test_channel", Data: "tshh... it's a secret"}) assert.Nil(t, err) - expected := `{"type":"whisper","identifier":"test_channel_2","message":"tshh... it's a secret"}` + expected := `{"identifier":"test_channel_2","message":"tshh... it's a secret"}` msg, err := session2.conn.Read() assert.NoError(t, err) diff --git a/streams/config.go b/streams/config.go index b0760b04..0dfe0849 100644 --- a/streams/config.go +++ b/streams/config.go @@ -9,6 +9,9 @@ type Config struct { // Public determines if public (unsigned) streams are allowed Public bool + // Whisper determines if whispering is enabled for pub/sub streams + Whisper bool + // PubSubChannel is the channel name used for direct pub/sub PubSubChannel string diff --git a/streams/controller.go b/streams/controller.go index 0c1213eb..72f09139 100644 --- a/streams/controller.go +++ b/streams/controller.go @@ -14,6 +14,8 @@ import ( type SubscribeRequest struct { StreamName string `json:"stream_name"` SignedStreamName string `json:"signed_stream_name"` + + whisper bool } func (r *SubscribeRequest) IsPresent() bool { @@ -106,11 +108,18 @@ func (c *Controller) Subscribe(sid string, env *common.SessionEnv, ids string, i c.log.With("identifier", identifier).Debug("verified", "stream", stream) } + var state map[string]string + + if request.whisper { + state = map[string]string{common.WHISPER_STREAM_STATE: stream} + } + return &common.CommandResult{ Status: common.SUCCESS, Transmissions: []string{common.ConfirmationMessage(identifier)}, Streams: []string{stream}, DisconnectInterest: -1, + IState: state, }, nil } @@ -134,6 +143,7 @@ func (c *Controller) Disconnect(sid string, env *common.SessionEnv, ids string, func NewStreamsController(conf *Config, l *slog.Logger) *Controller { key := conf.Secret allowPublic := conf.Public + whispers := conf.Whisper resolver := func(identifier string) (*SubscribeRequest, error) { var request SubscribeRequest @@ -146,6 +156,10 @@ func NewStreamsController(conf *Config, l *slog.Logger) *Controller { return nil, errors.New("public streams are not allowed") } + if whispers || (request.StreamName != "") { + request.whisper = true + } + return &request, nil } diff --git a/streams/controller_test.go b/streams/controller_test.go index 67f8c4c3..c0a60b55 100644 --- a/streams/controller_test.go +++ b/streams/controller_test.go @@ -53,6 +53,7 @@ func TestStreamsController(t *testing.T) { assert.Equal(t, []string{common.ConfirmationMessage(`{"channel":"$pubsub","stream_name":"chat:2024"}`)}, res.Transmissions) assert.Equal(t, []string{"chat:2024"}, res.Streams) assert.Equal(t, -1, res.DisconnectInterest) + assert.Equal(t, "chat:2024", res.IState[common.WHISPER_STREAM_STATE]) }) t.Run("Subscribe - no public allowed", func(t *testing.T) { @@ -85,6 +86,28 @@ func TestStreamsController(t *testing.T) { assert.Equal(t, []string{common.ConfirmationMessage(identifier)}, res.Transmissions) assert.Equal(t, []string{"chat:2021"}, res.Streams) assert.Equal(t, -1, res.DisconnectInterest) + assert.Nil(t, res.IState) + }) + + t.Run("Subscribe - signed - whisper", func(t *testing.T) { + conf := NewConfig() + conf.Secret = key + conf.Whisper = true + subject := NewStreamsController(&conf, slog.Default()) + + require.NotNil(t, subject) + + identifier := `{"channel":"$pubsub","signed_stream_name":"` + stream + `"}` + + res, err := subject.Subscribe("42", nil, "name=jack", identifier) + + require.NoError(t, err) + require.NotNil(t, res) + require.Equal(t, common.SUCCESS, res.Status) + assert.Equal(t, []string{common.ConfirmationMessage(identifier)}, res.Transmissions) + assert.Equal(t, []string{"chat:2021"}, res.Streams) + assert.Equal(t, -1, res.DisconnectInterest) + assert.Equal(t, "chat:2021", res.IState[common.WHISPER_STREAM_STATE]) }) }