Skip to content

Commit

Permalink
fix: ensure block subscriptions do not conflict (v25) (#3340)
Browse files Browse the repository at this point in the history
* Implement pkg/fanout

* Apply fanout to block subscriber

---------

Co-authored-by: Dmitry S <[email protected]>
  • Loading branch information
gartnera and swift1337 authored Jan 9, 2025
1 parent 47c9444 commit 00624be
Show file tree
Hide file tree
Showing 4 changed files with 208 additions and 13 deletions.
66 changes: 66 additions & 0 deletions pkg/fanout/fanout.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// Package fanout provides a fan-out pattern implementation.
// It allows one channel to stream data to multiple independent channels.
// Note that context handling is out of the scope of this package.
package fanout

import "sync"

const DefaultBuffer = 8

// FanOut is a fan-out pattern implementation.
// It is NOT a worker pool, so use it wisely.
type FanOut[T any] struct {
input <-chan T
outputs []chan T

// outputBuffer chan buffer size for outputs channels.
// This helps with writing to chan in case of slow consumers.
outputBuffer int

mu sync.RWMutex
}

// New constructs FanOut
func New[T any](source <-chan T, buf int) *FanOut[T] {
return &FanOut[T]{
input: source,
outputs: make([]chan T, 0),
outputBuffer: buf,
}
}

func (f *FanOut[T]) Add() <-chan T {
out := make(chan T, f.outputBuffer)

f.mu.Lock()
defer f.mu.Unlock()

f.outputs = append(f.outputs, out)

return out
}

// Start starts the fan-out process
func (f *FanOut[T]) Start() {
go func() {
// loop for new data
for data := range f.input {
f.mu.RLock()
for _, output := range f.outputs {
// note that this might spawn lots of goroutines.
// it is a naive approach, but should be more than enough for our use cases.
go func(output chan<- T) { output <- data }(output)
}
f.mu.RUnlock()
}

// at this point, the input was closed
f.mu.Lock()
defer f.mu.Unlock()
for _, out := range f.outputs {
close(out)
}

f.outputs = nil
}()
}
72 changes: 72 additions & 0 deletions pkg/fanout/fanout_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package fanout

import (
"sync"
"sync/atomic"
"testing"
"time"

"github.com/stretchr/testify/require"
)

func TestFanOut(t *testing.T) {
// ARRANGE
// Given an input
input := make(chan int)

// Given a fanout
f := New(input, DefaultBuffer)

// That has 3 outputs
out1 := f.Add()
out2 := f.Add()
out3 := f.Add()

// Given a wait group
wg := sync.WaitGroup{}
wg.Add(3)

// Given a sample number
var total int32

// Given a consumer
consumer := func(out <-chan int, name string, lag time.Duration) {
defer wg.Done()
var local int32
for i := range out {
// simulate some work
time.Sleep(lag)

local += int32(i)
t.Logf("%s: received %d", name, i)
}

// add only if input was closed
atomic.AddInt32(&total, local)
}

// ACT
f.Start()

// Write to the channel
go func() {
for i := 1; i <= 10; i++ {
input <- i
t.Logf("fan-out: sent %d", i)
time.Sleep(50 * time.Millisecond)
}

close(input)
}()

go consumer(out1, "out1: fast consumer", 10*time.Millisecond)
go consumer(out2, "out2: average consumer", 60*time.Millisecond)
go consumer(out3, "out3: slow consumer", 150*time.Millisecond)

wg.Wait()

// ASSERT
// Check that total is valid
// total == sum(1...10) * 3 = n(n+1)/2 * 3 = 55 * 3 = 165
require.Equal(t, int32(165), total)
}
5 changes: 5 additions & 0 deletions zetaclient/zetacore/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

cometbftrpc "github.com/cometbft/cometbft/rpc/client"
cometbfthttp "github.com/cometbft/cometbft/rpc/client/http"
ctypes "github.com/cometbft/cometbft/types"
cosmosclient "github.com/cosmos/cosmos-sdk/client"
authtypes "github.com/cosmos/cosmos-sdk/x/auth/types"
"github.com/pkg/errors"
Expand All @@ -19,6 +20,7 @@ import (
"github.com/zeta-chain/node/app"
"github.com/zeta-chain/node/pkg/authz"
"github.com/zeta-chain/node/pkg/chains"
"github.com/zeta-chain/node/pkg/fanout"
zetacorerpc "github.com/zeta-chain/node/pkg/rpc"
"github.com/zeta-chain/node/zetaclient/chains/interfaces"
"github.com/zeta-chain/node/zetaclient/config"
Expand Down Expand Up @@ -47,6 +49,9 @@ type Client struct {
chainID string
chain chains.Chain

// blocksFanout that receives new block events from Zetacore via websockets
blocksFanout *fanout.FanOut[ctypes.EventDataNewBlock]

mu sync.RWMutex
}

Expand Down
78 changes: 65 additions & 13 deletions zetaclient/zetacore/client_subscriptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,33 +3,85 @@ package zetacore
import (
"context"

cometbfttypes "github.com/cometbft/cometbft/types"
ctypes "github.com/cometbft/cometbft/types"

"github.com/zeta-chain/node/pkg/fanout"
)

// NewBlockSubscriber subscribes to cometbft new block events
func (c *Client) NewBlockSubscriber(ctx context.Context) (chan cometbfttypes.EventDataNewBlock, error) {
rawBlockEventChan, err := c.cometBFTClient.Subscribe(ctx, "", cometbfttypes.EventQueryNewBlock.String())
// NewBlockSubscriber subscribes to comet bft new block events.
// Subscribes share the same websocket connection but their channels are independent (fanout)
func (c *Client) NewBlockSubscriber(ctx context.Context) (chan ctypes.EventDataNewBlock, error) {
blockSubscriber, err := c.resolveBlockSubscriber()
if err != nil {
return nil, err
}

blockEventChan := make(chan cometbfttypes.EventDataNewBlock)
// we need a "proxy" chan instead of directly returning blockSubscriber.Add()
// to support context cancellation
blocksChan := make(chan ctypes.EventDataNewBlock)

go func() {
consumer := blockSubscriber.Add()

for {
select {
case <-ctx.Done():
return
case event := <-rawBlockEventChan:
newBlockEvent, ok := event.Data.(cometbfttypes.EventDataNewBlock)
if !ok {
c.logger.Error().Msgf("expecting new block event, got %T", event.Data)
continue
}
blockEventChan <- newBlockEvent
case block := <-consumer:
blocksChan <- block
}
}
}()

return blocksChan, nil
}

// resolveBlockSubscriber returns the block subscriber channel
// or subscribes to it for the first time.
func (c *Client) resolveBlockSubscriber() (*fanout.FanOut[ctypes.EventDataNewBlock], error) {
// noop
if blocksFanout, ok := c.getBlockFanoutChan(); ok {
c.logger.Info().Msg("Resolved existing block subscriber")
return blocksFanout, nil
}

// Subscribe to comet bft events
eventsChan, err := c.cometBFTClient.Subscribe(context.Background(), "", ctypes.EventQueryNewBlock.String())
if err != nil {
return nil, err
}

c.logger.Info().Msg("Subscribed to new block events")

// Create block chan
blockChan := make(chan ctypes.EventDataNewBlock)

// Spin up a pipeline to forward block events to the blockChan
go func() {
for event := range eventsChan {
newBlockEvent, ok := event.Data.(ctypes.EventDataNewBlock)
if !ok {
c.logger.Error().Msgf("expecting new block event, got %T", event.Data)
continue
}

blockChan <- newBlockEvent
}
}()

return blockEventChan, nil
// Create a fanout
// It allows a "global" chan (i.e. blockChan) to stream to multiple consumers independently.
c.mu.Lock()
defer c.mu.Unlock()
c.blocksFanout = fanout.New[ctypes.EventDataNewBlock](blockChan, fanout.DefaultBuffer)

c.blocksFanout.Start()

return c.blocksFanout, nil
}

func (c *Client) getBlockFanoutChan() (*fanout.FanOut[ctypes.EventDataNewBlock], bool) {
c.mu.RLock()
defer c.mu.RUnlock()
return c.blocksFanout, c.blocksFanout != nil
}

0 comments on commit 00624be

Please sign in to comment.