Skip to content

Commit

Permalink
Optimization: websocket block channel (#327)
Browse files Browse the repository at this point in the history
  • Loading branch information
aopoltorzhicky authored Jan 10, 2025
1 parent 52a2acb commit c2c3b67
Show file tree
Hide file tree
Showing 8 changed files with 123 additions and 62 deletions.
22 changes: 11 additions & 11 deletions cmd/api/bus/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package bus

import (
"context"
"strconv"
"sync"

"github.com/celenium-io/celestia-indexer/internal/storage"
Expand All @@ -18,7 +17,6 @@ import (

type Dispatcher struct {
listener storage.Listener
blocks storage.IBlock
validators storage.IValidator

mx *sync.RWMutex
Expand All @@ -29,7 +27,6 @@ type Dispatcher struct {

func NewDispatcher(
factory storage.ListenerFactory,
blocks storage.IBlock,
validators storage.IValidator,
) (*Dispatcher, error) {
if factory == nil {
Expand All @@ -38,7 +35,6 @@ func NewDispatcher(
listener := factory.CreateListener()
return &Dispatcher{
listener: listener,
blocks: blocks,
validators: validators,
observers: make([]*Observer, 0),
mx: new(sync.RWMutex),
Expand Down Expand Up @@ -111,18 +107,22 @@ func (d *Dispatcher) handleNotification(ctx context.Context, notification *pq.No
}

func (d *Dispatcher) handleBlock(ctx context.Context, payload string) error {
id, err := strconv.ParseUint(payload, 10, 64)
if err != nil {
return errors.Wrapf(err, "parse block id: %s", payload)
block := new(storage.Block)
if err := jsoniter.UnmarshalFromString(payload, block); err != nil {
return err
}

block, err := d.blocks.ByIdWithRelations(ctx, id)
if err != nil {
return err
if block.ProposerId > 0 {
validator, err := d.validators.GetByID(ctx, block.ProposerId)
if err != nil {
return err
}
block.Proposer = *validator
}

d.mx.RLock()
for i := range d.observers {
d.observers[i].notifyBlocks(&block)
d.observers[i].notifyBlocks(block)
}
d.mx.RUnlock()
return nil
Expand Down
46 changes: 26 additions & 20 deletions cmd/api/handler/ws_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,28 +5,26 @@ package handler

import (
"context"
"crypto/rand"
"net/http"
"net/http/httptest"
"strconv"
"strings"
"testing"
"time"

"github.com/celenium-io/celestia-indexer/pkg/types"

"github.com/celenium-io/celestia-indexer/cmd/api/bus"
"github.com/celenium-io/celestia-indexer/cmd/api/handler/responses"
ws "github.com/celenium-io/celestia-indexer/cmd/api/handler/websocket"
"github.com/celenium-io/celestia-indexer/internal/storage"
"github.com/celenium-io/celestia-indexer/internal/storage/mock"
storageTypes "github.com/celenium-io/celestia-indexer/internal/storage/types"
"github.com/celenium-io/celestia-indexer/pkg/types"
"github.com/goccy/go-json"
"github.com/gorilla/websocket"
"github.com/labstack/echo/v4"
"github.com/lib/pq"
"github.com/rs/zerolog/log"
"github.com/stretchr/testify/require"

"go.uber.org/mock/gomock"
)

Expand All @@ -45,26 +43,22 @@ func TestWebsocket(t *testing.T) {

ctx, cancel := context.WithCancel(context.Background())

blockMock := mock.NewMockIBlock(ctrl)
validatorsMock := mock.NewMockIValidator(ctrl)
dispatcher, err := bus.NewDispatcher(listenerFactory, blockMock, validatorsMock)
dispatcher, err := bus.NewDispatcher(listenerFactory, validatorsMock)
require.NoError(t, err)
dispatcher.Start(ctx)
observer := dispatcher.Observe(storage.ChannelHead, storage.ChannelBlock)

for i := uint64(0); i < 10; i++ {
hash := make([]byte, 32)
_, err := rand.Read(hash)
require.NoError(t, err)

blockMock.EXPECT().ByIdWithRelations(ctx, i).Return(storage.Block{
Id: i,
Height: types.Level(i),
Time: time.Now(),
Hash: hash,
MessageTypes: storageTypes.NewMsgTypeBits(),
Stats: testBlock.Stats,
}, nil).MaxTimes(1)
for i := uint64(1); i < 7; i++ {
validatorsMock.
EXPECT().
GetByID(ctx, i).
Return(&storage.Validator{
Id: i,
Moniker: "moniker",
ConsAddress: "cons_address",
}, nil).
Times(1)
}

go func() {
Expand All @@ -80,9 +74,21 @@ func TestWebsocket(t *testing.T) {
case <-ticker.C:
id++

block := storage.Block{
Id: id,
ProposerId: id,
Height: types.Level(id),
Time: time.Now(),
Hash: testBlock.Hash,
Stats: testBlock.Stats,
MessageTypes: storageTypes.NewMsgTypeBits(),
}
data, err := json.Marshal(block)
require.NoError(t, err)

headChannel <- &pq.Notification{
Channel: storage.ChannelBlock,
Extra: strconv.FormatUint(id, 10),
Extra: string(data),
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/api/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ func initEcho(cfg ApiConfig, env string) *echo.Echo {
var dispatcher *bus.Dispatcher

func initDispatcher(ctx context.Context, db postgres.Storage) {
d, err := bus.NewDispatcher(db, db.Blocks, db.Validator)
d, err := bus.NewDispatcher(db, db.Validator)
if err != nil {
panic(err)
}
Expand Down
12 changes: 6 additions & 6 deletions internal/storage/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,14 @@ type Block struct {
EvidenceHash pkgTypes.Hex `bun:"evidence_hash" comment:"Evidence hash"`
ProposerId uint64 `bun:"proposer_id,nullzero" comment:"Proposer internal id"`

ChainId string `bun:"-"` // internal field for filling state
ProposerAddress string `bun:"-"` // internal field for proposer
BlockSignatures []BlockSignature `bun:"-"` // internal field for block signature
ChainId string `bun:"-" json:"-"` // internal field for filling state
ProposerAddress string `bun:"-" json:"-"` // internal field for proposer
BlockSignatures []BlockSignature `bun:"-" json:"-"` // internal field for block signature

Txs []Tx `bun:"rel:has-many"`
Events []Event `bun:"rel:has-many"`
Txs []Tx `bun:"rel:has-many" json:"-"`
Events []Event `bun:"rel:has-many" json:"-"`
Stats BlockStats `bun:"rel:has-one,join:height=height"`
Proposer Validator `bun:"rel:belongs-to"`
Proposer Validator `bun:"rel:belongs-to" json:"-"`
}

// TableName -
Expand Down
20 changes: 20 additions & 0 deletions internal/storage/types/msg_type_bitmask.go
Original file line number Diff line number Diff line change
Expand Up @@ -659,3 +659,23 @@ func (mask MsgTypeBits) Value() (driver.Value, error) {
}
return fmt.Sprintf("%076b", mask.value), nil
}

func (mask MsgTypeBits) MarshalJSON() (data []byte, err error) {
if mask.value == nil {
data = []byte{'0'}
return
}
return json.Marshal(mask.value.String())
}

func (mask *MsgTypeBits) UnmarshalJSON(data []byte) error {
mask.Bits = NewEmptyBits()
var s string
if err := json.Unmarshal(data, &s); err != nil {
return err
}
if _, ok := mask.value.SetString(s, 10); !ok {
return errors.Errorf("invalid big.Int: %s", s)
}
return nil
}
33 changes: 33 additions & 0 deletions internal/storage/types/msg_type_bitmask_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -729,3 +729,36 @@ func TestMsgTypeBits_HasOne(t *testing.T) {
})
}
}

func TestMarshall(t *testing.T) {
tests := []struct {
name string
mask MsgTypeBits
}{
{
name: "test 1",
mask: NewMsgTypeBitMask(MsgBeginRedelegate),
}, {
name: "test 2",
mask: NewMsgTypeBitMask(MsgBeginRedelegate, MsgDelegate, MsgSend),
}, {
name: "test 3",
mask: NewMsgTypeBitMask(MsgAcknowledgement, MsgCancelUpgrade),
}, {
name: "test 4",
mask: NewMsgTypeBitMask(MsgChannelOpenInit, MsgConnectionOpenConfirm),
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
data, err := json.Marshal(tt.mask)
require.NoError(t, err)

var newMask MsgTypeBits
err = json.Unmarshal(data, &newMask)
require.NoError(t, err)
require.Equal(t, tt.mask, newMask)
})
}
}
42 changes: 21 additions & 21 deletions internal/storage/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,27 +27,27 @@ type Validator struct {
bun.BaseModel `bun:"validator" comment:"Table with celestia validators."`

Id uint64 `bun:"id,pk,notnull,autoincrement" comment:"Unique internal identity"`
Delegator string `bun:"delegator,type:text" comment:"Delegator address"`
Address string `bun:"address,unique:address_validator,type:text" comment:"Validator address"`
ConsAddress string `bun:"cons_address" comment:"Consensus address"`

Moniker string `bun:"moniker,type:text" comment:"Human-readable name for the validator"`
Website string `bun:"website,type:text" comment:"Website link"`
Identity string `bun:"identity,type:text" comment:"Optional identity signature"`
Contacts string `bun:"contacts,type:text" comment:"Contacts"`
Details string `bun:"details,type:text" comment:"Detailed information about validator"`

Rate decimal.Decimal `bun:"rate,type:numeric" comment:"Commission rate charged to delegators, as a fraction"`
MaxRate decimal.Decimal `bun:"max_rate,type:numeric" comment:"Maximum commission rate which validator can ever charge, as a fraction"`
MaxChangeRate decimal.Decimal `bun:"max_change_rate,type:numeric" comment:"Maximum daily increase of the validator commission, as a fraction"`
MinSelfDelegation decimal.Decimal `bun:"min_self_delegation,type:numeric" comment:""`

Stake decimal.Decimal `bun:"stake,type:numeric" comment:"Validator's stake"`
Rewards decimal.Decimal `bun:"rewards,type:numeric" comment:"Validator's rewards"`
Commissions decimal.Decimal `bun:"commissions,type:numeric" comment:"Commissions"`
Height pkgTypes.Level `bun:"height" comment:"Height when validator was created"`

Jailed *bool `bun:"jailed" comment:"True if validator was punished"`
Delegator string `bun:"delegator,type:text" comment:"Delegator address" json:"-"`
Address string `bun:"address,unique:address_validator,type:text" comment:"Validator address" json:"-"`
ConsAddress string `bun:"cons_address" comment:"Consensus address" json:"-"`

Moniker string `bun:"moniker,type:text" comment:"Human-readable name for the validator" json:"-"`
Website string `bun:"website,type:text" comment:"Website link" json:"-"`
Identity string `bun:"identity,type:text" comment:"Optional identity signature" json:"-"`
Contacts string `bun:"contacts,type:text" comment:"Contacts" json:"-"`
Details string `bun:"details,type:text" comment:"Detailed information about validator" json:"-"`

Rate decimal.Decimal `bun:"rate,type:numeric" comment:"Commission rate charged to delegators, as a fraction" json:"-"`
MaxRate decimal.Decimal `bun:"max_rate,type:numeric" comment:"Maximum commission rate which validator can ever charge, as a fraction" json:"-"`
MaxChangeRate decimal.Decimal `bun:"max_change_rate,type:numeric" comment:"Maximum daily increase of the validator commission, as a fraction" json:"-"`
MinSelfDelegation decimal.Decimal `bun:"min_self_delegation,type:numeric" comment:"" json:"-"`

Stake decimal.Decimal `bun:"stake,type:numeric" comment:"Validator's stake" json:"-"`
Rewards decimal.Decimal `bun:"rewards,type:numeric" comment:"Validator's rewards" json:"-"`
Commissions decimal.Decimal `bun:"commissions,type:numeric" comment:"Commissions" json:"-"`
Height pkgTypes.Level `bun:"height" comment:"Height when validator was created" json:"-"`

Jailed *bool `bun:"jailed" comment:"True if validator was punished" json:"-"`
}

func (Validator) TableName() string {
Expand Down
8 changes: 5 additions & 3 deletions pkg/indexer/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package storage

import (
"context"
"strconv"
"time"

"github.com/celenium-io/celestia-indexer/pkg/indexer/config"
Expand Down Expand Up @@ -348,8 +347,11 @@ func (module *Module) notify(ctx context.Context, state storage.State, block sto
return err
}

blockId := strconv.FormatUint(block.Id, 10)
if err := module.notificator.Notify(ctx, storage.ChannelBlock, blockId); err != nil {
rawBlock, err := jsoniter.MarshalToString(block)
if err != nil {
return err
}
if err := module.notificator.Notify(ctx, storage.ChannelBlock, rawBlock); err != nil {
return err
}

Expand Down

0 comments on commit c2c3b67

Please sign in to comment.