Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Whispering #203

Merged
merged 5 commits into from
Mar 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion broadcast/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func (s *HTTPBroadcaster) Start(done chan (error)) error {
}

if s.enableCORS {
verifiedVia = verifiedVia + ", CORS enabled"
verifiedVia += ", CORS enabled"
}

s.log.Info(fmt.Sprintf("Accept broadcast requests at %s%s (%s)", s.server.Address(), s.path, verifiedVia))
Expand Down
5 changes: 5 additions & 0 deletions broker/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,11 @@ func (b *Memory) Shutdown(ctx context.Context) error {
}

func (b *Memory) HandleBroadcast(msg *common.StreamMessage) {
if msg.Meta != nil && msg.Meta.Transient {
b.broadcaster.Broadcast(msg)
return
}

offset := b.add(msg.Stream, msg.Data)

msg.Epoch = b.GetEpoch()
Expand Down
5 changes: 5 additions & 0 deletions broker/nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,11 @@ func (n *NATS) writeEpoch(val string) {
}

func (n *NATS) HandleBroadcast(msg *common.StreamMessage) {
if msg.Meta != nil && msg.Meta.Transient {
n.broadcaster.Broadcast(msg)
return
}

err := n.Ready(jetstreamReadyTimeout)
if err != nil {
n.log.Debug("JetStream is not ready yet to publish messages, add to backlog")
Expand Down
6 changes: 6 additions & 0 deletions cli/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -1096,6 +1096,12 @@ func signedStreamsCLIFlags(c *config.Config, turboRailsKey *string, cableReadyKe
Destination: &c.Streams.Public,
},

&cli.BoolFlag{
Name: "streams_whisper",
Usage: "Enable whispering for signed pub/sub streams",
Destination: &c.Streams.Whisper,
},

&cli.BoolFlag{
Name: "turbo_streams",
Usage: "Enable Turbo Streams support",
Expand Down
15 changes: 15 additions & 0 deletions common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ const (

HistoryConfirmedType = "confirm_history"
HistoryRejectedType = "reject_history"

WhisperType = "whisper"
)

// Disconnect reasons
Expand All @@ -75,6 +77,11 @@ const (
UNAUTHORIZED_REASON = "unauthorized"
)

// Reserver state fields
const (
WHISPER_STREAM_STATE = "$w"
)

// SessionEnv represents the underlying HTTP connection data:
// URL and request headers.
// It also carries channel and connection state information used by the RPC app.
Expand Down Expand Up @@ -128,6 +135,10 @@ func (st *SessionEnv) MergeChannelState(id string, other *map[string]string) {
}
}

func (st *SessionEnv) RemoveChannelState(id string) {
delete((*st.ChannelStates), id)
}

// Returns a value for the specified key of the specified channel
func (st *SessionEnv) GetChannelStateField(id string, field string) string {
cst, ok := (*st.ChannelStates)[id]
Expand Down Expand Up @@ -289,6 +300,10 @@ func (m *Message) LogValue() slog.Value {
// which can be used to modify delivery behavior
type StreamMessageMetadata struct {
ExcludeSocket string `json:"exclude_socket,omitempty"`
// BroadcastType defines the message type to be used for messages sent to clients
BroadcastType string `json:"broadcast_type,omitempty"`
// Transient defines whether this message should be stored in the history
Transient bool `json:"transient,omitempty"`
}

func (smm *StreamMessageMetadata) LogValue() slog.Value {
Expand Down
21 changes: 21 additions & 0 deletions docs/signed_streams.md
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,27 @@ $digest = hash_hmac('sha256', $encoded, $SECRET_KEY);
$signed_stream_name = $encoded . '--' . $digest;
```

## Whispering

_Whispering_ is an ability to publish _transient_ broadcasts from clients, i.e., without touching your backend. This is useful when you want to share client-only information from one connection to others. Typical examples include typing indicators, cursor position sharing, etc.

Whispering must be enabled explicitly for signed streams via the `--streams_whisper` (`ANYCABLE_STREAMS_WHISPER=true`) option. Public streams always allow whispering.

Here is an example client code using AnyCable JS SDK:

```js
let channel = cable.streamFrom("chat/22");

channel.on("message", (msg) => {
if (msg.event === "typing") {
console.log(`user ${msg.name} is typing`);
}
})

// publishing whispers
channel.whisper({event: "typing", name: user.name})
```

## Hotwire and CableReady support

AnyCable provides an ability to terminate Hotwire ([Turbo Streams](https://turbo.hotwired.dev/handbook/streams)) and [CableReady](https://cableready.stimulusreflex.com) (v5+) subscriptions at the WebSocker server using the same signed streams functionality under the hood (and, thus, without performing any RPC calls to authorize subscriptions).
Expand Down
51 changes: 51 additions & 0 deletions etc/anyt/broadcast_tests/whisper_test.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
# frozen_string_literal: true

feature "Whisper" do
channel do
def subscribed
stream_from "a"
# whispers_to "a" ???
__istate__["$w"] = "a"
end
end

let(:client2) { build_client(ignore: %w[ping welcome]) }
let(:client3) { build_client(ignore: %w[ping welcome]) }

before do
subscribe_request = {command: "subscribe", identifier: {channel: channel}.to_json}

client.send(subscribe_request)
client2.send(subscribe_request)
client3.send(subscribe_request)

ack = {
"identifier" => {channel: channel}.to_json, "type" => "confirm_subscription"
}

assert_message ack, client.receive
assert_message ack, client2.receive
assert_message ack, client3.receive
end

scenario %(
Only other clients receive the whisper message
) do
perform_request = {
:command => "whisper",
:identifier => {channel: channel}.to_json,
"data" => {"event" => "typing", "user" => "Vova"}
}

client.send(perform_request)

msg = {"identifier" => {channel: channel}.to_json, "message" => {"event" => "typing", "user" => "Vova"}}

assert_message msg, client2.receive
assert_message msg, client3.receive
assert_raises(Anyt::Client::TimeoutError) do
msg = client.receive(timeout: 0.5)
raise "Client 1 should not receive the message: #{msg}"
end
end
end
8 changes: 7 additions & 1 deletion hub/gate.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,13 @@ func (g *Gate) performBroadcast(streamMsg *common.StreamMessage) {
}

func buildMessage(msg *common.StreamMessage, identifier string) encoders.EncodedMessage {
return encoders.NewCachedEncodedMessage(msg.ToReplyFor(identifier))
reply := msg.ToReplyFor(identifier)

if msg.Meta != nil {
reply.Type = msg.Meta.BroadcastType
}

return encoders.NewCachedEncodedMessage(reply)
}

func streamSessionsSnapshot[T comparable](src map[T]map[string]bool) map[T][]string {
Expand Down
2 changes: 2 additions & 0 deletions node/broker_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,8 @@ func sharedIntegrationHistory(t *testing.T, node *Node, controller *mocks.Contro
ts := time.Now().Unix()

node.HandleBroadcast([]byte(`{"stream": "messages_1","data":"Flavia: buona sera"}`))
// Transient messages must not be stored in the history
node.HandleBroadcast([]byte(`{"stream": "messages_1","data":"Who's there?","meta":{"transient":true}}`))
node.HandleBroadcast([]byte(`{"stream": "messages_1","data":"Mario: ta-dam!"}`))

node.HandleBroadcast([]byte(`{"stream": "presence_1","data":"1 new notification"}`))
Expand Down
36 changes: 36 additions & 0 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,8 @@ func (n *Node) HandleCommand(s *Session, msg *common.Message) (err error) {
_, err = n.Perform(s, msg)
case "history":
err = n.History(s, msg)
case "whisper":
err = n.Whisper(s, msg)
default:
err = fmt.Errorf("unknown command: %s", msg.Command)
}
Expand Down Expand Up @@ -492,6 +494,7 @@ func (n *Node) Unsubscribe(s *Session, msg *common.Message) (*common.CommandResu
// Make sure to remove all streams subscriptions
res.StopAllStreams = true

s.env.RemoveChannelState(msg.Identifier)
s.subscriptions.RemoveChannel(msg.Identifier)

s.Log.Debug("unsubscribed", "identifier", msg.Identifier)
Expand Down Expand Up @@ -628,6 +631,39 @@ func (n *Node) retreiveHistory(history *common.HistoryRequest, streams []string)
return backlog, nil
}

// Whisper broadcasts the message to the specified whispering stream to
// all clients except the sender
func (n *Node) Whisper(s *Session, msg *common.Message) error {
// The session must have the whisper stream name defined in the state to be able to whisper
// If the stream is not defined, the whisper message is ignored
env := s.GetEnv()
if env == nil {
return errors.New("session environment is missing")
}

stream := env.GetChannelStateField(msg.Identifier, common.WHISPER_STREAM_STATE)

if stream == "" {
s.Log.Debug("whisper stream not found", "identifier", msg.Identifier)
return nil
}

broadcast := &common.StreamMessage{
Stream: stream,
Data: string(utils.ToJSON(msg.Data)),
Meta: &common.StreamMessageMetadata{
ExcludeSocket: s.GetID(),
Transient: true,
},
}

n.broker.HandleBroadcast(broadcast)

s.Log.Debug("whispered", "stream", stream)

return nil
}

// Broadcast message to stream (locally)
func (n *Node) Broadcast(msg *common.StreamMessage) {
n.metrics.CounterIncrement(metricsBroadcastMsg)
Expand Down
61 changes: 61 additions & 0 deletions node/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,54 @@ func TestPerform(t *testing.T) {
})
}

func TestWhisper(t *testing.T) {
node := NewMockNode()
session := NewMockSession("14", node)
session2 := NewMockSession("15", node)

// Subscribe using different identifiers to make sure whisper is working
// per stream name, not per identifier
defer subscribeSessionToStream(session, node, "test_channel", "test_whisper")()
defer subscribeSessionToStream(session2, node, "test_channel_2", "test_whisper")()

go node.hub.Run()
defer node.hub.Shutdown()

t.Run("When whispering stream is configured for sending subscription", func(t *testing.T) {
session.env.MergeChannelState("test_channel", &map[string]string{common.WHISPER_STREAM_STATE: "test_whisper"})

err := node.Whisper(session, &common.Message{Identifier: "test_channel", Data: "tshh... it's a secret"})
assert.Nil(t, err)

expected := `{"identifier":"test_channel_2","message":"tshh... it's a secret"}`

msg, err := session2.conn.Read()
assert.NoError(t, err)
assert.Equal(t, expected, string(msg))

// Sender do not receive the message
msg, err = session.conn.Read()
assert.Nil(t, msg)
assert.Error(t, err, "Session hasn't received any messages")
})

t.Run("When whispering stream is not configured", func(t *testing.T) {
session.env.RemoveChannelState("test_channel")

err := node.Whisper(session, &common.Message{Identifier: "test_channel", Data: "tshh... it's a secret"})
assert.Nil(t, err)

msg, err := session2.conn.Read()
assert.Error(t, err)
assert.Nil(t, msg)

// Sender do not receive the message
msg, err = session.conn.Read()
assert.Nil(t, msg)
assert.Error(t, err, "Session hasn't received any messages")
})
}

func TestStreamSubscriptionRaceConditions(t *testing.T) {
node := NewMockNode()
session := NewMockSession("14", node)
Expand Down Expand Up @@ -781,6 +829,19 @@ func toJSON(msg encoders.EncodedMessage) []byte {
return b
}

func subscribeSessionToStream(s *Session, n *Node, identifier string, stream string) func() {
n.hub.AddSession(s)

s.subscriptions.AddChannel(identifier)
s.subscriptions.AddChannelStream(identifier, stream)
n.hub.SubscribeSession(s, stream, identifier)
n.broker.Subscribe(stream)

return func() {
n.hub.RemoveSession(s)
}
}

func readMessages(conn Connection, count int) ([]string, error) {
var messages []string

Expand Down
3 changes: 3 additions & 0 deletions streams/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ type Config struct {
// Public determines if public (unsigned) streams are allowed
Public bool

// Whisper determines if whispering is enabled for pub/sub streams
Whisper bool

// PubSubChannel is the channel name used for direct pub/sub
PubSubChannel string

Expand Down
14 changes: 14 additions & 0 deletions streams/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import (
type SubscribeRequest struct {
StreamName string `json:"stream_name"`
SignedStreamName string `json:"signed_stream_name"`

whisper bool
}

func (r *SubscribeRequest) IsPresent() bool {
Expand Down Expand Up @@ -106,11 +108,18 @@ func (c *Controller) Subscribe(sid string, env *common.SessionEnv, ids string, i
c.log.With("identifier", identifier).Debug("verified", "stream", stream)
}

var state map[string]string

if request.whisper {
state = map[string]string{common.WHISPER_STREAM_STATE: stream}
}

return &common.CommandResult{
Status: common.SUCCESS,
Transmissions: []string{common.ConfirmationMessage(identifier)},
Streams: []string{stream},
DisconnectInterest: -1,
IState: state,
}, nil
}

Expand All @@ -134,6 +143,7 @@ func (c *Controller) Disconnect(sid string, env *common.SessionEnv, ids string,
func NewStreamsController(conf *Config, l *slog.Logger) *Controller {
key := conf.Secret
allowPublic := conf.Public
whispers := conf.Whisper

resolver := func(identifier string) (*SubscribeRequest, error) {
var request SubscribeRequest
Expand All @@ -146,6 +156,10 @@ func NewStreamsController(conf *Config, l *slog.Logger) *Controller {
return nil, errors.New("public streams are not allowed")
}

if whispers || (request.StreamName != "") {
request.whisper = true
}

return &request, nil
}

Expand Down
Loading
Loading