Skip to content

Commit

Permalink
fix(core): Do not propagate blocks if subscribed to blocks from incor…
Browse files Browse the repository at this point in the history
…rect chain (#3086)

Right now, it is possible for a **bridge** node that was initialised and
started on one chain (e.g. `mocha-4`) to be stopped and restarted on a
different chain (e.g. `mainnet`) and propagate headers from the old
chain (`mocha-4`) into the different network (`mainnet`).

This PR fixes this issue by causing the listener to fatal if the
listener recognises it is receiving blocks from a different chain to
that which it expects.

Error will look like this: 

```
2024-01-10T16:06:22.001+0100	ERROR	core	core/listener.go:175	listener: received block with unexpected chain ID: expected arabica-11, received mocha-4
2024-01-10T16:06:22.001+0100	INFO	core	core/listener.go:177	listener: listening stopped
2024-01-10T16:06:22.001+0100	FATAL	core	core/listener.go:126	listener: invalid subscription
```

Resolves #3071
  • Loading branch information
renaynay authored Jan 17, 2024
1 parent ad92edb commit 36205cc
Show file tree
Hide file tree
Showing 7 changed files with 77 additions and 10 deletions.
4 changes: 3 additions & 1 deletion core/exchange_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ func TestCoreExchange_RequestHeaders(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)

fetcher, _ := createCoreFetcher(t, DefaultTestConfig())
cfg := DefaultTestConfig()
cfg.ChainID = networkID
fetcher, _ := createCoreFetcher(t, cfg)

// generate 10 blocks
generateBlocks(t, fetcher)
Expand Down
20 changes: 17 additions & 3 deletions core/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
var (
tracer = otel.Tracer("core/listener")
retrySubscriptionDelay = 5 * time.Second

errInvalidSubscription = errors.New("invalid subscription")
)

// Listener is responsible for listening to Core for
Expand All @@ -41,11 +43,12 @@ type Listener struct {
headerBroadcaster libhead.Broadcaster[*header.ExtendedHeader]
hashBroadcaster shrexsub.BroadcastFn

listenerTimeout time.Duration

metrics *listenerMetrics

cancel context.CancelFunc
chainID string

listenerTimeout time.Duration
cancel context.CancelFunc
}

func NewListener(
Expand Down Expand Up @@ -81,6 +84,7 @@ func NewListener(
store: store,
listenerTimeout: 5 * blocktime,
metrics: metrics,
chainID: p.chainID,
}, nil
}

Expand Down Expand Up @@ -117,6 +121,10 @@ func (cl *Listener) runSubscriber(ctx context.Context, sub <-chan types.EventDat
// listener stopped because external context was canceled
return
}
if errors.Is(err, errInvalidSubscription) {
// stop node if there is a critical issue with the block subscription
log.Fatalf("listener: %v", err)
}

log.Warnw("listener: subscriber error, resubscribing...", "err", err)
sub = cl.resubscribe(ctx)
Expand Down Expand Up @@ -163,6 +171,12 @@ func (cl *Listener) listen(ctx context.Context, sub <-chan types.EventDataSigned
return errors.New("underlying subscription was closed")
}

if cl.chainID != "" && b.Header.ChainID != cl.chainID {
log.Errorf("listener: received block with unexpected chain ID: expected %s,"+
" received %s", cl.chainID, b.Header.ChainID)
return errInvalidSubscription
}

log.Debugw("listener: new block from core", "height", b.Header.Height)

err := cl.handleNewSignedBlock(ctx, b)
Expand Down
45 changes: 41 additions & 4 deletions core/listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,14 @@ func TestListener(t *testing.T) {
t.Cleanup(subs.Cancel)

// create one block to store as Head in local store and then unsubscribe from block events
fetcher, _ := createCoreFetcher(t, DefaultTestConfig())
cfg := DefaultTestConfig()
cfg.ChainID = networkID
fetcher, _ := createCoreFetcher(t, cfg)

eds := createEdsPubSub(ctx, t)

// create Listener and start listening
cl := createListener(ctx, t, fetcher, ps0, eds, createStore(t))
cl := createListener(ctx, t, fetcher, ps0, eds, createStore(t), networkID)
err = cl.Start(ctx)
require.NoError(t, err)

Expand Down Expand Up @@ -80,6 +84,7 @@ func TestListenerWithNonEmptyBlocks(t *testing.T) {

// create one block to store as Head in local store and then unsubscribe from block events
cfg := DefaultTestConfig()
cfg.ChainID = networkID
fetcher, cctx := createCoreFetcher(t, cfg)
eds := createEdsPubSub(ctx, t)

Expand All @@ -92,7 +97,7 @@ func TestListenerWithNonEmptyBlocks(t *testing.T) {
})

// create Listener and start listening
cl := createListener(ctx, t, fetcher, ps0, eds, store)
cl := createListener(ctx, t, fetcher, ps0, eds, store, networkID)
err = cl.Start(ctx)
require.NoError(t, err)

Expand Down Expand Up @@ -124,6 +129,36 @@ func TestListenerWithNonEmptyBlocks(t *testing.T) {
require.Nil(t, cl.cancel)
}

func TestListenerWithWrongChainRPC(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
t.Cleanup(cancel)

// create mocknet with two pubsub endpoints
ps0, _ := createMocknetWithTwoPubsubEndpoints(ctx, t)

// create one block to store as Head in local store and then unsubscribe from block events
cfg := DefaultTestConfig()
cfg.ChainID = networkID
fetcher, _ := createCoreFetcher(t, cfg)
eds := createEdsPubSub(ctx, t)

store := createStore(t)
err := store.Start(ctx)
require.NoError(t, err)
t.Cleanup(func() {
err = store.Stop(ctx)
require.NoError(t, err)
})

// create Listener and start listening
cl := createListener(ctx, t, fetcher, ps0, eds, store, "wrong-chain-rpc")
sub, err := cl.fetcher.SubscribeNewBlockEvent(ctx)
require.NoError(t, err)

err = cl.listen(ctx, sub)
assert.ErrorIs(t, err, errInvalidSubscription)
}

func createMocknetWithTwoPubsubEndpoints(ctx context.Context, t *testing.T) (*pubsub.PubSub, *pubsub.PubSub) {
net, err := mocknet.FullMeshLinked(2)
require.NoError(t, err)
Expand Down Expand Up @@ -166,6 +201,7 @@ func createListener(
ps *pubsub.PubSub,
edsSub *shrexsub.PubSub,
store *eds.Store,
chainID string,
) *Listener {
p2pSub, err := p2p.NewSubscriber[*header.ExtendedHeader](ps, header.MsgID, p2p.WithSubscriberNetworkID(networkID))
require.NoError(t, err)
Expand All @@ -180,7 +216,8 @@ func createListener(
require.NoError(t, p2pSub.Stop(ctx))
})

listener, err := NewListener(p2pSub, fetcher, edsSub.Broadcast, header.MakeExtendedHeader, store, nodep2p.BlockTime)
listener, err := NewListener(p2pSub, fetcher, edsSub.Broadcast, header.MakeExtendedHeader,
store, nodep2p.BlockTime, WithChainID(nodep2p.Network(chainID)))
require.NoError(t, err)
return listener
}
Expand Down
10 changes: 10 additions & 0 deletions core/option.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
package core

import "github.com/celestiaorg/celestia-node/nodebuilder/p2p"

type Option func(*params)

type params struct {
metrics bool

chainID string
}

// WithMetrics is a functional option that enables metrics
Expand All @@ -13,3 +17,9 @@ func WithMetrics() Option {
p.metrics = true
}
}

func WithChainID(id p2p.Network) Option {
return func(p *params) {
p.chainID = id.String()
}
}
4 changes: 3 additions & 1 deletion libs/utils/resetctx.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package utils

import "context"
import (
"context"
)

// ResetContextOnError returns a fresh context if the given context has an error.
func ResetContextOnError(ctx context.Context) context.Context {
Expand Down
3 changes: 2 additions & 1 deletion nodebuilder/core/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,9 @@ func ConstructModule(tp node.Type, cfg *Config, options ...fx.Option) fx.Option
pubsub *shrexsub.PubSub,
construct header.ConstructFn,
store *eds.Store,
chainID p2p.Network,
) (*core.Listener, error) {
var opts []core.Option
opts := []core.Option{core.WithChainID(chainID)}
if MetricsEnabled {
opts = append(opts, core.WithMetrics())
}
Expand Down
1 change: 1 addition & 0 deletions nodebuilder/tests/swamp/swamp.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ func NewSwamp(t *testing.T, options ...Option) *Swamp {
// Now, we are making an assumption that consensus mechanism is already tested out
// so, we are not creating bridge nodes with each one containing its own core client
// instead we are assigning all created BNs to 1 Core from the swamp
ic.WithChainID("private")
cctx := core.StartTestNodeWithConfig(t, ic)
swp := &Swamp{
t: t,
Expand Down

0 comments on commit 36205cc

Please sign in to comment.