diff --git a/zetaclient/zetacore/client.go b/zetaclient/zetacore/client.go index df5b6dbeb6..a883aca855 100644 --- a/zetaclient/zetacore/client.go +++ b/zetaclient/zetacore/client.go @@ -8,6 +8,7 @@ import ( cometbftrpc "github.com/cometbft/cometbft/rpc/client" cometbfthttp "github.com/cometbft/cometbft/rpc/client/http" + ctypes "github.com/cometbft/cometbft/types" cosmosclient "github.com/cosmos/cosmos-sdk/client" authtypes "github.com/cosmos/cosmos-sdk/x/auth/types" "github.com/pkg/errors" @@ -19,6 +20,7 @@ import ( "github.com/zeta-chain/node/app" "github.com/zeta-chain/node/pkg/authz" "github.com/zeta-chain/node/pkg/chains" + "github.com/zeta-chain/node/pkg/fanout" zetacorerpc "github.com/zeta-chain/node/pkg/rpc" "github.com/zeta-chain/node/zetaclient/chains/interfaces" "github.com/zeta-chain/node/zetaclient/config" @@ -47,6 +49,9 @@ type Client struct { chainID string chain chains.Chain + // blocksFanout that receives new block events from Zetacore via websockets + blocksFanout *fanout.FanOut[ctypes.EventDataNewBlock] + mu sync.RWMutex } diff --git a/zetaclient/zetacore/client_subscriptions.go b/zetaclient/zetacore/client_subscriptions.go index cb4229b31b..971b1edfa2 100644 --- a/zetaclient/zetacore/client_subscriptions.go +++ b/zetaclient/zetacore/client_subscriptions.go @@ -3,33 +3,85 @@ package zetacore import ( "context" - cometbfttypes "github.com/cometbft/cometbft/types" + ctypes "github.com/cometbft/cometbft/types" + + "github.com/zeta-chain/node/pkg/fanout" ) -// NewBlockSubscriber subscribes to cometbft new block events -func (c *Client) NewBlockSubscriber(ctx context.Context) (chan cometbfttypes.EventDataNewBlock, error) { - rawBlockEventChan, err := c.cometBFTClient.Subscribe(ctx, "", cometbfttypes.EventQueryNewBlock.String()) +// NewBlockSubscriber subscribes to comet bft new block events. +// Subscribes share the same websocket connection but their channels are independent (fanout) +func (c *Client) NewBlockSubscriber(ctx context.Context) (chan ctypes.EventDataNewBlock, error) { + blockSubscriber, err := c.resolveBlockSubscriber() if err != nil { return nil, err } - blockEventChan := make(chan cometbfttypes.EventDataNewBlock) + // we need a "proxy" chan instead of directly returning blockSubscriber.Add() + // to support context cancellation + blocksChan := make(chan ctypes.EventDataNewBlock) go func() { + consumer := blockSubscriber.Add() + for { select { case <-ctx.Done(): return - case event := <-rawBlockEventChan: - newBlockEvent, ok := event.Data.(cometbfttypes.EventDataNewBlock) - if !ok { - c.logger.Error().Msgf("expecting new block event, got %T", event.Data) - continue - } - blockEventChan <- newBlockEvent + case block := <-consumer: + blocksChan <- block + } + } + }() + + return blocksChan, nil +} + +// resolveBlockSubscriber returns the block subscriber channel +// or subscribes to it for the first time. +func (c *Client) resolveBlockSubscriber() (*fanout.FanOut[ctypes.EventDataNewBlock], error) { + // noop + if blocksFanout, ok := c.getBlockFanoutChan(); ok { + c.logger.Info().Msg("Resolved existing block subscriber") + return blocksFanout, nil + } + + // Subscribe to comet bft events + eventsChan, err := c.cometBFTClient.Subscribe(context.Background(), "", ctypes.EventQueryNewBlock.String()) + if err != nil { + return nil, err + } + + c.logger.Info().Msg("Subscribed to new block events") + + // Create block chan + blockChan := make(chan ctypes.EventDataNewBlock) + + // Spin up a pipeline to forward block events to the blockChan + go func() { + for event := range eventsChan { + newBlockEvent, ok := event.Data.(ctypes.EventDataNewBlock) + if !ok { + c.logger.Error().Msgf("expecting new block event, got %T", event.Data) + continue } + + blockChan <- newBlockEvent } }() - return blockEventChan, nil + // Create a fanout + // It allows a "global" chan (i.e. blockChan) to stream to multiple consumers independently. + c.mu.Lock() + defer c.mu.Unlock() + c.blocksFanout = fanout.New[ctypes.EventDataNewBlock](blockChan, fanout.DefaultBuffer) + + c.blocksFanout.Start() + + return c.blocksFanout, nil +} + +func (c *Client) getBlockFanoutChan() (*fanout.FanOut[ctypes.EventDataNewBlock], bool) { + c.mu.RLock() + defer c.mu.RUnlock() + return c.blocksFanout, c.blocksFanout != nil }