Skip to content

Commit

Permalink
Apply fanout to block subscriber
Browse files Browse the repository at this point in the history
  • Loading branch information
gartnera committed Jan 8, 2025
1 parent 6fe0daa commit 370270b
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 13 deletions.
5 changes: 5 additions & 0 deletions zetaclient/zetacore/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

cometbftrpc "github.com/cometbft/cometbft/rpc/client"
cometbfthttp "github.com/cometbft/cometbft/rpc/client/http"
ctypes "github.com/cometbft/cometbft/types"
cosmosclient "github.com/cosmos/cosmos-sdk/client"
authtypes "github.com/cosmos/cosmos-sdk/x/auth/types"
"github.com/pkg/errors"
Expand All @@ -19,6 +20,7 @@ import (
"github.com/zeta-chain/node/app"
"github.com/zeta-chain/node/pkg/authz"
"github.com/zeta-chain/node/pkg/chains"
"github.com/zeta-chain/node/pkg/fanout"
zetacorerpc "github.com/zeta-chain/node/pkg/rpc"
"github.com/zeta-chain/node/zetaclient/chains/interfaces"
"github.com/zeta-chain/node/zetaclient/config"
Expand Down Expand Up @@ -47,6 +49,9 @@ type Client struct {
chainID string
chain chains.Chain

// blocksFanout that receives new block events from Zetacore via websockets
blocksFanout *fanout.FanOut[ctypes.EventDataNewBlock]

mu sync.RWMutex
}

Expand Down
78 changes: 65 additions & 13 deletions zetaclient/zetacore/client_subscriptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,33 +3,85 @@ package zetacore
import (
"context"

cometbfttypes "github.com/cometbft/cometbft/types"
ctypes "github.com/cometbft/cometbft/types"

"github.com/zeta-chain/node/pkg/fanout"
)

// NewBlockSubscriber subscribes to cometbft new block events
func (c *Client) NewBlockSubscriber(ctx context.Context) (chan cometbfttypes.EventDataNewBlock, error) {
rawBlockEventChan, err := c.cometBFTClient.Subscribe(ctx, "", cometbfttypes.EventQueryNewBlock.String())
// NewBlockSubscriber subscribes to comet bft new block events.
// Subscribes share the same websocket connection but their channels are independent (fanout)
func (c *Client) NewBlockSubscriber(ctx context.Context) (chan ctypes.EventDataNewBlock, error) {
blockSubscriber, err := c.resolveBlockSubscriber()
if err != nil {
return nil, err
}

blockEventChan := make(chan cometbfttypes.EventDataNewBlock)
// we need a "proxy" chan instead of directly returning blockSubscriber.Add()
// to support context cancellation
blocksChan := make(chan ctypes.EventDataNewBlock)

go func() {
consumer := blockSubscriber.Add()

for {
select {
case <-ctx.Done():
return
case event := <-rawBlockEventChan:
newBlockEvent, ok := event.Data.(cometbfttypes.EventDataNewBlock)
if !ok {
c.logger.Error().Msgf("expecting new block event, got %T", event.Data)
continue
}
blockEventChan <- newBlockEvent
case block := <-consumer:
blocksChan <- block
}
}
}()

return blocksChan, nil
}

// resolveBlockSubscriber returns the block subscriber channel
// or subscribes to it for the first time.
func (c *Client) resolveBlockSubscriber() (*fanout.FanOut[ctypes.EventDataNewBlock], error) {
// noop
if blocksFanout, ok := c.getBlockFanoutChan(); ok {
c.logger.Info().Msg("Resolved existing block subscriber")
return blocksFanout, nil
}

Check warning on line 46 in zetaclient/zetacore/client_subscriptions.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/zetacore/client_subscriptions.go#L44-L46

Added lines #L44 - L46 were not covered by tests

// Subscribe to comet bft events
eventsChan, err := c.cometBFTClient.Subscribe(context.Background(), "", ctypes.EventQueryNewBlock.String())
if err != nil {
return nil, err
}

Check warning on line 52 in zetaclient/zetacore/client_subscriptions.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/zetacore/client_subscriptions.go#L51-L52

Added lines #L51 - L52 were not covered by tests

c.logger.Info().Msg("Subscribed to new block events")

// Create block chan
blockChan := make(chan ctypes.EventDataNewBlock)

// Spin up a pipeline to forward block events to the blockChan
go func() {
for event := range eventsChan {
newBlockEvent, ok := event.Data.(ctypes.EventDataNewBlock)
if !ok {
c.logger.Error().Msgf("expecting new block event, got %T", event.Data)
continue

Check warning on line 65 in zetaclient/zetacore/client_subscriptions.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/zetacore/client_subscriptions.go#L64-L65

Added lines #L64 - L65 were not covered by tests
}

blockChan <- newBlockEvent
}
}()

return blockEventChan, nil
// Create a fanout
// It allows a "global" chan (i.e. blockChan) to stream to multiple consumers independently.
c.mu.Lock()
defer c.mu.Unlock()
c.blocksFanout = fanout.New[ctypes.EventDataNewBlock](blockChan, fanout.DefaultBuffer)

c.blocksFanout.Start()

return c.blocksFanout, nil
}

func (c *Client) getBlockFanoutChan() (*fanout.FanOut[ctypes.EventDataNewBlock], bool) {
c.mu.RLock()
defer c.mu.RUnlock()
return c.blocksFanout, c.blocksFanout != nil
}

0 comments on commit 370270b

Please sign in to comment.