Skip to content

Commit

Permalink
Add channel name to websocket message
Browse files Browse the repository at this point in the history
  • Loading branch information
aopoltorzhicky committed Feb 2, 2024
1 parent d5127f3 commit 082a912
Show file tree
Hide file tree
Showing 8 changed files with 220 additions and 22 deletions.
3 changes: 0 additions & 3 deletions cmd/api/bus/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (

type Dispatcher struct {
listener storage.Listener
state storage.IState
blocks storage.IBlock

mx *sync.RWMutex
Expand All @@ -29,7 +28,6 @@ type Dispatcher struct {

func NewDispatcher(
factory storage.ListenerFactory,
state storage.IState,
blocks storage.IBlock,
) (*Dispatcher, error) {
if factory == nil {
Expand All @@ -38,7 +36,6 @@ func NewDispatcher(
listener := factory.CreateListener()
return &Dispatcher{
listener: listener,
state: state,
blocks: blocks,
observers: make([]*Observer, 0),
mx: new(sync.RWMutex),
Expand Down
6 changes: 3 additions & 3 deletions cmd/api/handler/websocket/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,15 @@ import (
"github.com/pkg/errors"
)

type processor[I, M any] func(data I) M
type processor[I any, M INotification] func(data I) Notification[M]

type Channel[I, M any] struct {
type Channel[I any, M INotification] struct {
clients *sdkSync.Map[uint64, client]
processor processor[I, M]
filters Filterable[M]
}

func NewChannel[I, M any](processor processor[I, M], filters Filterable[M]) *Channel[I, M] {
func NewChannel[I any, M INotification](processor processor[I, M], filters Filterable[M]) *Channel[I, M] {
return &Channel[I, M]{
clients: sdkSync.NewMap[uint64, client](),
processor: processor,
Expand Down
12 changes: 6 additions & 6 deletions cmd/api/handler/websocket/filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@ import (
"github.com/celenium-io/astria-indexer/cmd/api/handler/responses"
)

type Filterable[M any] interface {
Filter(c client, msg M) bool
type Filterable[M INotification] interface {
Filter(c client, msg Notification[M]) bool
}

type HeadFilter struct{}

func (hf HeadFilter) Filter(c client, msg *responses.State) bool {
if msg == nil {
func (f HeadFilter) Filter(c client, msg Notification[*responses.State]) bool {
if msg.Body == nil {
return false
}
fltrs := c.Filters()
Expand All @@ -26,8 +26,8 @@ func (hf HeadFilter) Filter(c client, msg *responses.State) bool {

type BlockFilter struct{}

func (hf BlockFilter) Filter(c client, msg *responses.Block) bool {
if msg == nil {
func (f BlockFilter) Filter(c client, msg Notification[*responses.Block]) bool {
if msg.Body == nil {
return false
}
fltrs := c.Filters()
Expand Down
28 changes: 27 additions & 1 deletion cmd/api/handler/websocket/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@

package websocket

import "github.com/goccy/go-json"
import (
"github.com/celenium-io/astria-indexer/cmd/api/handler/responses"
"github.com/goccy/go-json"
)

// methods
const (
Expand Down Expand Up @@ -35,3 +38,26 @@ type TransactionFilters struct {
Status []string `json:"status,omitempty"`
Actions []string `json:"action_type,omitempty"`
}

type INotification interface {
*responses.Block | *responses.State
}

type Notification[T INotification] struct {
Channel string `json:"channel"`
Body T `json:"body"`
}

func NewBlockNotification(block responses.Block) Notification[*responses.Block] {
return Notification[*responses.Block]{
Channel: ChannelBlocks,
Body: &block,
}
}

func NewStateNotification(state responses.State) Notification[*responses.State] {
return Notification[*responses.State]{
Channel: ChannelHead,
Body: &state,
}
}
12 changes: 6 additions & 6 deletions cmd/api/handler/websocket/processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@ import (
"github.com/celenium-io/astria-indexer/internal/storage"
)

func headProcessor(state storage.State) *responses.State {
response := responses.NewState(state)
return &response
func blockProcessor(block storage.Block) Notification[*responses.Block] {
response := responses.NewBlock(block)
return NewBlockNotification(response)
}

func blockProcessor(block storage.Block) *responses.Block {
response := responses.NewBlock(block)
return &response
func headProcessor(state storage.State) Notification[*responses.State] {
response := responses.NewState(state)
return NewStateNotification(response)
}
164 changes: 164 additions & 0 deletions cmd/api/handler/ws_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
package handler

import (
"context"
"crypto/rand"
"net/http"
"net/http/httptest"
"strconv"
"strings"
"testing"
"time"

"github.com/celenium-io/astria-indexer/pkg/types"

"github.com/celenium-io/astria-indexer/cmd/api/bus"
"github.com/celenium-io/astria-indexer/cmd/api/handler/responses"
ws "github.com/celenium-io/astria-indexer/cmd/api/handler/websocket"
"github.com/celenium-io/astria-indexer/internal/storage"
"github.com/celenium-io/astria-indexer/internal/storage/mock"
"github.com/goccy/go-json"
"github.com/gorilla/websocket"
"github.com/labstack/echo/v4"
"github.com/lib/pq"
"github.com/rs/zerolog/log"
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"
)

func TestWebsocket(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
listenerFactory := mock.NewMockListenerFactory(ctrl)
listener := mock.NewMockListener(ctrl)

listenerFactory.EXPECT().CreateListener().Return(listener).Times(1)

headChannel := make(chan *pq.Notification, 10)
listener.EXPECT().Listen().Return(headChannel).AnyTimes()
listener.EXPECT().Subscribe(gomock.Any(), storage.ChannelHead).Return(nil).Times(1)
listener.EXPECT().Close().Return(nil).MaxTimes(1)

ctx, cancel := context.WithCancel(context.Background())

blockMock := mock.NewMockIBlock(ctrl)
dispatcher, err := bus.NewDispatcher(listenerFactory, blockMock)
require.NoError(t, err)
dispatcher.Start(ctx)
observer := dispatcher.Observe(storage.ChannelHead, storage.ChannelBlock)

for i := 0; i < 10; i++ {
hash := make([]byte, 32)
_, err := rand.Read(hash)
require.NoError(t, err)

blockMock.EXPECT().ByIdWithRelations(ctx, uint64(i)).Return(storage.Block{
Id: uint64(i),
Height: types.Level(i),
Time: time.Now(),
Hash: hash,
Stats: testBlock.Stats,
}, nil).MaxTimes(1)
}

go func() {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()

var id uint64

for {
select {
case <-ctx.Done():
return
case <-ticker.C:
id++

headChannel <- &pq.Notification{
Channel: storage.ChannelBlock,
Extra: strconv.FormatUint(id, 10),
}
}
}
}()
manager := ws.NewManager(observer)
manager.Start(ctx)

server := httptest.NewServer(http.HandlerFunc(
func(w http.ResponseWriter, r *http.Request) {
e := echo.New()
c := e.NewContext(r, w)
err := manager.Handle(c)
require.NoError(t, err, "handle")
<-ctx.Done()
},
))
defer server.Close()

wsURL := "ws" + strings.TrimPrefix(server.URL, "http") + "/ws"
dialed, _, err := websocket.DefaultDialer.Dial(wsURL, nil)
require.NoError(t, err, "dial")

body, err := json.Marshal(ws.Subscribe{
Channel: ws.ChannelBlocks,
})
require.NoError(t, err, "marshal subscribe")

err = dialed.WriteJSON(ws.Message{
Method: ws.MethodSubscribe,
Body: body,
})
require.NoError(t, err, "send subscribe message")

ticker := time.NewTicker(time.Second * 5)
defer ticker.Stop()

for {
select {
case <-ticker.C:
body, err := json.Marshal(ws.Unsubscribe{
Channel: ws.ChannelHead,
})
require.NoError(t, err, "marshal unsubscribe")

err = dialed.WriteJSON(ws.Message{
Method: ws.MethodUnsubscribe,
Body: body,
})
require.NoError(t, err, "send unsubscribe message")

err = dialed.Close()
require.NoError(t, err, "closing connection")

time.Sleep(time.Second)
cancel()

err = manager.Close()
require.NoError(t, err, "closing manager")

close(headChannel)
return
default:
err := dialed.SetReadDeadline(time.Now().Add(time.Second * 3))
require.NoError(t, err)

_, msg, err := dialed.ReadMessage()
require.NoError(t, err, err)

var notification ws.Notification[*responses.Block]
err = json.Unmarshal(msg, &notification)
require.NoError(t, err, err)

require.Equal(t, ws.ChannelBlocks, notification.Channel)
require.Greater(t, notification.Body.Id, uint64(0))
require.Greater(t, notification.Body.Height, uint64(0))
require.False(t, notification.Body.Time.IsZero())
require.Len(t, notification.Body.Hash, 32)

log.Info().
Uint64("height", notification.Body.Height).
Time("block_time", notification.Body.Time).
Msg("new block")
}
}
}
2 changes: 1 addition & 1 deletion cmd/api/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ func initEcho(cfg ApiConfig, env string) *echo.Echo {
var dispatcher *bus.Dispatcher

func initDispatcher(ctx context.Context, db postgres.Storage) {
d, err := bus.NewDispatcher(db, db.State, db.Blocks)
d, err := bus.NewDispatcher(db, db.Blocks)
if err != nil {
panic(err)
}
Expand Down
15 changes: 13 additions & 2 deletions cmd/api/markdown/websocket.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,16 @@
## Documentation for websocket API

### Notification

The structure of notification is following in all channels:

```json
{
"channel": "channel_name",
"body": "<object or array>" // depends on channel
}
```

### Subscribe

To receive updates from websocket API send `subscribe` request to server.
Expand Down Expand Up @@ -29,7 +40,7 @@ Now 2 channels are supported:
}
```

In that channel messages of `responses.State` type will be sent.
Notification body of `responses.State` type will be sent to the channel.

* `blocks` - receive information about new blocks. Channel does not have any filters. Subscribe message should looks like:

Expand All @@ -42,7 +53,7 @@ In that channel messages of `responses.State` type will be sent.
}
```

In that channel messages of `responses.Block` type will be sent.
Notification body of `responses.Block` type will be sent to the channel.


### Unsubscribe
Expand Down

0 comments on commit 082a912

Please sign in to comment.