Skip to content

Commit

Permalink
Node: Gossip Topic Split (#4000)
Browse files Browse the repository at this point in the history
* WIP: topic split

* Add cutover support

* Remove measurements that were moved to PR#3988

* Code review rework

* Code review rework

---------

Co-authored-by: Evan Gray <[email protected]>
  • Loading branch information
bruce-riley and evan-gray authored Aug 5, 2024
1 parent c2496cd commit d3533aa
Show file tree
Hide file tree
Showing 11 changed files with 900 additions and 333 deletions.
20 changes: 15 additions & 5 deletions node/pkg/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,14 @@ import (
)

const (
// gossipSendBufferSize configures the size of the gossip network send buffer
gossipSendBufferSize = 5000
// gossipControlSendBufferSize configures the size of the gossip network send buffer
gossipControlSendBufferSize = 100

// gossipAttestationSendBufferSize configures the size of the gossip network send buffer
gossipAttestationSendBufferSize = 5000

// gossipVaaSendBufferSize configures the size of the gossip network send buffer
gossipVaaSendBufferSize = 5000

// inboundObservationBufferSize configures the size of the obsvC channel that contains observations from other Guardians.
// One observation takes roughly 0.1ms to process on one core, so the whole queue could be processed in 1s
Expand Down Expand Up @@ -69,8 +75,10 @@ type G struct {
runnables map[string]supervisor.Runnable

// various channels
// Outbound gossip message queue (needs to be read/write because p2p needs read/write)
gossipSendC chan []byte
// Outbound gossip message queues (need to be read/write because p2p needs read/write)
gossipControlSendC chan []byte
gossipAttestationSendC chan []byte
gossipVaaSendC chan []byte
// Inbound observations. This is read/write because the processor also writes to it as a fast-path when handling locally made observations.
obsvC chan *common.MsgWithTimeStamp[gossipv1.SignedObservation]
// Finalized guardian observations aggregated across all chains
Expand Down Expand Up @@ -109,7 +117,9 @@ func (g *G) initializeBasic(rootCtxCancel context.CancelFunc) {
g.rootCtxCancel = rootCtxCancel

// Setup various channels...
g.gossipSendC = make(chan []byte, gossipSendBufferSize)
g.gossipControlSendC = make(chan []byte, gossipControlSendBufferSize)
g.gossipAttestationSendC = make(chan []byte, gossipAttestationSendBufferSize)
g.gossipVaaSendC = make(chan []byte, gossipVaaSendBufferSize)
g.obsvC = make(chan *common.MsgWithTimeStamp[gossipv1.SignedObservation], inboundObservationBufferSize)
g.msgC = makeChannelPair[*common.MessagePublication](0)
g.setC = makeChannelPair[*common.GuardianSet](1) // This needs to be a buffered channel because of a circular dependency between processor and accountant during startup.
Expand Down
7 changes: 5 additions & 2 deletions node/pkg/node/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,9 @@ func GuardianOptionP2P(p2pKey libp2p_crypto.PrivKey, networkId, bootstrapPeers,
g.obsvC,
g.signedInC.writeC,
g.obsvReqC.writeC,
g.gossipSendC,
g.gossipControlSendC,
g.gossipAttestationSendC,
g.gossipVaaSendC,
g.obsvReqSendC.readC,
g.acct,
g.gov,
Expand Down Expand Up @@ -564,7 +566,8 @@ func GuardianOptionProcessor() *GuardianOption {
g.db,
g.msgC.readC,
g.setC.readC,
g.gossipSendC,
g.gossipAttestationSendC,
g.gossipVaaSendC,
g.obsvC,
g.obsvReqSendC.writeC,
g.signedInC.readC,
Expand Down
87 changes: 87 additions & 0 deletions node/pkg/p2p/gossip_cutover.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package p2p

import (
"fmt"
"strings"
"sync/atomic"
"time"

"go.uber.org/zap"
)

// The format of this time is very picky. Please use the exact format specified by cutOverFmtStr!
const mainnetCutOverTimeStr = ""
const testnetCutOverTimeStr = ""
const devnetCutOverTimeStr = ""
const cutOverFmtStr = "2006-01-02T15:04:05-0700"

// gossipCutoverCompleteFlag indicates if the cutover time has passed, meaning we should publish only on the new topics.
var gossipCutoverCompleteFlag atomic.Bool

// GossipCutoverComplete returns true if the cutover time has passed, meaning we should publish on the new topic.
func GossipCutoverComplete() bool {
return gossipCutoverCompleteFlag.Load()
}

// evaluateCutOver determines if the gossip cutover time has passed yet and sets the global flag accordingly. If the time has
// not yet passed, it creates a go routine to wait for that time and then set the flag.
func evaluateGossipCutOver(logger *zap.Logger, networkID string) error {
cutOverTimeStr := getCutOverTimeStr(networkID)

sco, delay, err := evaluateGossipCutOverImpl(logger, cutOverTimeStr, time.Now())
if err != nil {
return err
}

gossipCutoverCompleteFlag.Store(sco)
logger.Info("evaluated cutover flag", zap.Bool("cutOverFlag", GossipCutoverComplete()), zap.String("cutOverTime", cutOverTimeStr), zap.String("component", "p2pco"))

if delay != time.Duration(0) {
// Wait for the cut over time and then update the flag.
go func() {
time.Sleep(delay)
logger.Info("time to cut over to new gossip topics", zap.String("cutOverTime", cutOverTimeStr), zap.String("component", "p2pco"))
gossipCutoverCompleteFlag.Store(true)
}()
}

return nil
}

// evaluateGossipCutOverImpl performs the actual cut over check. It is a separate function for testing purposes.
func evaluateGossipCutOverImpl(logger *zap.Logger, cutOverTimeStr string, now time.Time) (bool, time.Duration, error) {
if cutOverTimeStr == "" {
return false, 0, nil
}

cutOverTime, err := time.Parse(cutOverFmtStr, cutOverTimeStr)
if err != nil {
return false, 0, fmt.Errorf(`failed to parse cut over time: %w`, err)
}

if cutOverTime.Before(now) {
logger.Info("cut over time has passed, should use new gossip topics", zap.String("cutOverTime", cutOverTime.Format(cutOverFmtStr)), zap.String("now", now.Format(cutOverFmtStr)), zap.String("component", "p2pco"))
return true, 0, nil
}

// If we get here, we need to wait for the cutover and then switch the global flag.
delay := cutOverTime.Sub(now)
logger.Info("still waiting for cut over time",
zap.Stringer("cutOverTime", cutOverTime),
zap.String("now", now.Format(cutOverFmtStr)),
zap.Stringer("delay", delay),
zap.String("component", "p2pco"))

return false, delay, nil
}

// getCutOverTimeStr returns the cut over time string based on the network ID passed in.
func getCutOverTimeStr(networkID string) string { //nolint:unparam
if strings.Contains(networkID, "/mainnet/") {
return mainnetCutOverTimeStr
}
if strings.Contains(networkID, "/testnet/") {
return testnetCutOverTimeStr
}
return devnetCutOverTimeStr
}
81 changes: 81 additions & 0 deletions node/pkg/p2p/gossip_cutover_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package p2p

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
)

func TestVerifyCutOverTime(t *testing.T) {
if mainnetCutOverTimeStr != "" {
_, err := time.Parse(cutOverFmtStr, mainnetCutOverTimeStr)
require.NoError(t, err)
}
if testnetCutOverTimeStr != "" {
_, err := time.Parse(cutOverFmtStr, testnetCutOverTimeStr)
require.NoError(t, err)
}
if devnetCutOverTimeStr != "" {
_, err := time.Parse(cutOverFmtStr, devnetCutOverTimeStr)
require.NoError(t, err)
}
}

func TestGetCutOverTimeStr(t *testing.T) {
assert.Equal(t, mainnetCutOverTimeStr, getCutOverTimeStr("blah/blah/mainnet/blah"))
assert.Equal(t, testnetCutOverTimeStr, getCutOverTimeStr("blah/blah/testnet/blah"))
assert.Equal(t, devnetCutOverTimeStr, getCutOverTimeStr("blah/blah/devnet/blah"))
}

func TestCutOverDisabled(t *testing.T) {
logger := zap.NewNop()

cutOverTimeStr := ""
now, err := time.Parse(cutOverFmtStr, "2023-10-06T18:19:00-0000")
require.NoError(t, err)

cuttingOver, delay, err := evaluateGossipCutOverImpl(logger, cutOverTimeStr, now)
require.NoError(t, err)
assert.False(t, cuttingOver)
assert.Equal(t, time.Duration(0), delay)
}

func TestCutOverInvalidTime(t *testing.T) {
logger := zap.NewNop()

cutOverTimeStr := "Hello World"
now, err := time.Parse(cutOverFmtStr, "2023-10-06T18:19:00-0000")
require.NoError(t, err)

_, _, err = evaluateGossipCutOverImpl(logger, cutOverTimeStr, now)
require.EqualError(t, err, `failed to parse cut over time: parsing time "Hello World" as "2006-01-02T15:04:05-0700": cannot parse "Hello World" as "2006"`)
}

func TestCutOverAlreadyHappened(t *testing.T) {
logger := zap.NewNop()

cutOverTimeStr := "2023-10-06T18:18:00-0000"
now, err := time.Parse(cutOverFmtStr, "2023-10-06T18:19:00-0000")
require.NoError(t, err)

cuttingOver, delay, err := evaluateGossipCutOverImpl(logger, cutOverTimeStr, now)
require.NoError(t, err)
assert.True(t, cuttingOver)
assert.Equal(t, time.Duration(0), delay)
}

func TestCutOverDelayRequired(t *testing.T) {
logger := zap.NewNop()

cutOverTimeStr := "2023-10-06T18:18:00-0000"
now, err := time.Parse(cutOverFmtStr, "2023-10-06T17:18:00-0000")
require.NoError(t, err)

cuttingOver, delay, err := evaluateGossipCutOverImpl(logger, cutOverTimeStr, now)
require.NoError(t, err)
assert.False(t, cuttingOver)
assert.Equal(t, time.Duration(60*time.Minute), delay)
}
Loading

0 comments on commit d3533aa

Please sign in to comment.