diff --git a/node/pkg/node/node.go b/node/pkg/node/node.go index d957db3218..b94e094a03 100644 --- a/node/pkg/node/node.go +++ b/node/pkg/node/node.go @@ -20,8 +20,14 @@ import ( ) const ( - // gossipSendBufferSize configures the size of the gossip network send buffer - gossipSendBufferSize = 5000 + // gossipControlSendBufferSize configures the size of the gossip network send buffer + gossipControlSendBufferSize = 100 + + // gossipAttestationSendBufferSize configures the size of the gossip network send buffer + gossipAttestationSendBufferSize = 5000 + + // gossipVaaSendBufferSize configures the size of the gossip network send buffer + gossipVaaSendBufferSize = 5000 // inboundObservationBufferSize configures the size of the obsvC channel that contains observations from other Guardians. // One observation takes roughly 0.1ms to process on one core, so the whole queue could be processed in 1s @@ -69,8 +75,10 @@ type G struct { runnables map[string]supervisor.Runnable // various channels - // Outbound gossip message queue (needs to be read/write because p2p needs read/write) - gossipSendC chan []byte + // Outbound gossip message queues (need to be read/write because p2p needs read/write) + gossipControlSendC chan []byte + gossipAttestationSendC chan []byte + gossipVaaSendC chan []byte // Inbound observations. This is read/write because the processor also writes to it as a fast-path when handling locally made observations. obsvC chan *common.MsgWithTimeStamp[gossipv1.SignedObservation] // Finalized guardian observations aggregated across all chains @@ -109,7 +117,9 @@ func (g *G) initializeBasic(rootCtxCancel context.CancelFunc) { g.rootCtxCancel = rootCtxCancel // Setup various channels... 
- g.gossipSendC = make(chan []byte, gossipSendBufferSize) + g.gossipControlSendC = make(chan []byte, gossipControlSendBufferSize) + g.gossipAttestationSendC = make(chan []byte, gossipAttestationSendBufferSize) + g.gossipVaaSendC = make(chan []byte, gossipVaaSendBufferSize) g.obsvC = make(chan *common.MsgWithTimeStamp[gossipv1.SignedObservation], inboundObservationBufferSize) g.msgC = makeChannelPair[*common.MessagePublication](0) g.setC = makeChannelPair[*common.GuardianSet](1) // This needs to be a buffered channel because of a circular dependency between processor and accountant during startup. diff --git a/node/pkg/node/options.go b/node/pkg/node/options.go index 7bd0d88be1..e71fc7b5af 100644 --- a/node/pkg/node/options.go +++ b/node/pkg/node/options.go @@ -67,7 +67,9 @@ func GuardianOptionP2P(p2pKey libp2p_crypto.PrivKey, networkId, bootstrapPeers, g.obsvC, g.signedInC.writeC, g.obsvReqC.writeC, - g.gossipSendC, + g.gossipControlSendC, + g.gossipAttestationSendC, + g.gossipVaaSendC, g.obsvReqSendC.readC, g.acct, g.gov, @@ -564,7 +566,8 @@ func GuardianOptionProcessor() *GuardianOption { g.db, g.msgC.readC, g.setC.readC, - g.gossipSendC, + g.gossipAttestationSendC, + g.gossipVaaSendC, g.obsvC, g.obsvReqSendC.writeC, g.signedInC.readC, diff --git a/node/pkg/p2p/gossip_cutover.go b/node/pkg/p2p/gossip_cutover.go new file mode 100644 index 0000000000..8629644e84 --- /dev/null +++ b/node/pkg/p2p/gossip_cutover.go @@ -0,0 +1,87 @@ +package p2p + +import ( + "fmt" + "strings" + "sync/atomic" + "time" + + "go.uber.org/zap" +) + +// The format of this time is very picky. Please use the exact format specified by cutOverFmtStr! +const mainnetCutOverTimeStr = "" +const testnetCutOverTimeStr = "" +const devnetCutOverTimeStr = "" +const cutOverFmtStr = "2006-01-02T15:04:05-0700" + +// gossipCutoverCompleteFlag indicates if the cutover time has passed, meaning we should publish only on the new topics. 
+var gossipCutoverCompleteFlag atomic.Bool
+
+// GossipCutoverComplete reports whether the cutover time has passed, meaning we should publish on the new topics. It reads the flag atomically.
+func GossipCutoverComplete() bool {
+	return gossipCutoverCompleteFlag.Load()
+}
+
+// evaluateGossipCutOver determines if the gossip cutover time for the given network has passed yet and sets the global flag accordingly.
+// If the time has not yet passed (a non-zero delay is reported), it creates a go routine to wait for that time and then set the flag.
+func evaluateGossipCutOver(logger *zap.Logger, networkID string) error {
+	cutOverTimeStr := getCutOverTimeStr(networkID)
+
+	sco, delay, err := evaluateGossipCutOverImpl(logger, cutOverTimeStr, time.Now())
+	if err != nil {
+		return err
+	}
+
+	gossipCutoverCompleteFlag.Store(sco)
+	logger.Info("evaluated cutover flag", zap.Bool("cutOverFlag", GossipCutoverComplete()), zap.String("cutOverTime", cutOverTimeStr), zap.String("component", "p2pco"))
+
+	if delay != time.Duration(0) {
+		// Wait for the cut over time and then update the flag. The routine takes no context, so it just sleeps out the full delay.
+		go func() {
+			time.Sleep(delay)
+			logger.Info("time to cut over to new gossip topics", zap.String("cutOverTime", cutOverTimeStr), zap.String("component", "p2pco"))
+			gossipCutoverCompleteFlag.Store(true)
+		}()
+	}
+
+	return nil
+}
+
+// evaluateGossipCutOverImpl performs the actual cut over check. It is a separate function for testing purposes.
+func evaluateGossipCutOverImpl(logger *zap.Logger, cutOverTimeStr string, now time.Time) (bool, time.Duration, error) {
+	if cutOverTimeStr == "" { // An empty string means no cut over is scheduled: not cut over, nothing to wait for.
+		return false, 0, nil
+	}
+
+	cutOverTime, err := time.Parse(cutOverFmtStr, cutOverTimeStr)
+	if err != nil {
+		return false, 0, fmt.Errorf(`failed to parse cut over time: %w`, err)
+	}
+	// A cut over time equal to now counts as passed. Returning (false, 0) for that case would look the same
+	// as "disabled" to a caller that only waits on a non-zero delay, so the flag would never be switched.
+	if !cutOverTime.After(now) {
+		logger.Info("cut over time has passed, should use new gossip topics", zap.String("cutOverTime", cutOverTime.Format(cutOverFmtStr)), zap.String("now", now.Format(cutOverFmtStr)), zap.String("component", "p2pco"))
+		return true, 0, nil
+	}
+
+	// If we get here, the cut over is strictly in the future: report how long the caller must wait before switching the global flag.
+	delay := cutOverTime.Sub(now)
+	logger.Info("still waiting for cut over time",
+		zap.Stringer("cutOverTime", cutOverTime),
+		zap.String("now", now.Format(cutOverFmtStr)),
+		zap.Stringer("delay", delay),
+		zap.String("component", "p2pco"))
+	return false, delay, nil
+}
+
+// getCutOverTimeStr returns the cut over time string based on the network ID passed in.
+func getCutOverTimeStr(networkID string) string { //nolint:unparam + if strings.Contains(networkID, "/mainnet/") { + return mainnetCutOverTimeStr + } + if strings.Contains(networkID, "/testnet/") { + return testnetCutOverTimeStr + } + return devnetCutOverTimeStr +} diff --git a/node/pkg/p2p/gossip_cutover_test.go b/node/pkg/p2p/gossip_cutover_test.go new file mode 100644 index 0000000000..f398985b08 --- /dev/null +++ b/node/pkg/p2p/gossip_cutover_test.go @@ -0,0 +1,81 @@ +package p2p + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/zap" +) + +func TestVerifyCutOverTime(t *testing.T) { + if mainnetCutOverTimeStr != "" { + _, err := time.Parse(cutOverFmtStr, mainnetCutOverTimeStr) + require.NoError(t, err) + } + if testnetCutOverTimeStr != "" { + _, err := time.Parse(cutOverFmtStr, testnetCutOverTimeStr) + require.NoError(t, err) + } + if devnetCutOverTimeStr != "" { + _, err := time.Parse(cutOverFmtStr, devnetCutOverTimeStr) + require.NoError(t, err) + } +} + +func TestGetCutOverTimeStr(t *testing.T) { + assert.Equal(t, mainnetCutOverTimeStr, getCutOverTimeStr("blah/blah/mainnet/blah")) + assert.Equal(t, testnetCutOverTimeStr, getCutOverTimeStr("blah/blah/testnet/blah")) + assert.Equal(t, devnetCutOverTimeStr, getCutOverTimeStr("blah/blah/devnet/blah")) +} + +func TestCutOverDisabled(t *testing.T) { + logger := zap.NewNop() + + cutOverTimeStr := "" + now, err := time.Parse(cutOverFmtStr, "2023-10-06T18:19:00-0000") + require.NoError(t, err) + + cuttingOver, delay, err := evaluateGossipCutOverImpl(logger, cutOverTimeStr, now) + require.NoError(t, err) + assert.False(t, cuttingOver) + assert.Equal(t, time.Duration(0), delay) +} + +func TestCutOverInvalidTime(t *testing.T) { + logger := zap.NewNop() + + cutOverTimeStr := "Hello World" + now, err := time.Parse(cutOverFmtStr, "2023-10-06T18:19:00-0000") + require.NoError(t, err) + + _, _, err = evaluateGossipCutOverImpl(logger, 
cutOverTimeStr, now) + require.EqualError(t, err, `failed to parse cut over time: parsing time "Hello World" as "2006-01-02T15:04:05-0700": cannot parse "Hello World" as "2006"`) +} + +func TestCutOverAlreadyHappened(t *testing.T) { + logger := zap.NewNop() + + cutOverTimeStr := "2023-10-06T18:18:00-0000" + now, err := time.Parse(cutOverFmtStr, "2023-10-06T18:19:00-0000") + require.NoError(t, err) + + cuttingOver, delay, err := evaluateGossipCutOverImpl(logger, cutOverTimeStr, now) + require.NoError(t, err) + assert.True(t, cuttingOver) + assert.Equal(t, time.Duration(0), delay) +} + +func TestCutOverDelayRequired(t *testing.T) { + logger := zap.NewNop() + + cutOverTimeStr := "2023-10-06T18:18:00-0000" + now, err := time.Parse(cutOverFmtStr, "2023-10-06T17:18:00-0000") + require.NoError(t, err) + + cuttingOver, delay, err := evaluateGossipCutOverImpl(logger, cutOverTimeStr, now) + require.NoError(t, err) + assert.False(t, cuttingOver) + assert.Equal(t, time.Duration(60*time.Minute), delay) +} diff --git a/node/pkg/p2p/p2p.go b/node/pkg/p2p/p2p.go index ed353938b3..bf3cdfe3a2 100644 --- a/node/pkg/p2p/p2p.go +++ b/node/pkg/p2p/p2p.go @@ -54,11 +54,11 @@ var ( Name: "wormhole_p2p_heartbeats_sent_total", Help: "Total number of p2p heartbeats sent", }) - p2pMessagesSent = promauto.NewCounter( + p2pMessagesSent = promauto.NewCounterVec( prometheus.CounterOpts{ Name: "wormhole_p2p_broadcast_messages_sent_total", Help: "Total number of p2p pubsub broadcast messages sent", - }) + }, []string{"type"}) p2pMessagesReceived = promauto.NewCounterVec( prometheus.CounterOpts{ Name: "wormhole_p2p_broadcast_messages_received_total", @@ -110,6 +110,7 @@ type Components struct { // is only accessed by a single routine at any given time in a running Guardian. ProtectedHostByGuardianKeyLock sync.Mutex // WarnChannelOverflow: If true, errors due to overflowing channels will produce logger.Warn + // WARNING: This should not be enabled in production. 
It is only used in node tests to watch for overflows. WarnChannelOverflow bool // SignedHeartbeatLogLevel is the log level at which SignedHeartbeatReceived events will be logged. SignedHeartbeatLogLevel zapcore.Level @@ -301,12 +302,25 @@ func Run(params *RunParams) func(ctx context.Context) error { } return func(ctx context.Context) error { + p2pMessagesSent.WithLabelValues("control").Add(0) + p2pMessagesSent.WithLabelValues("attestation").Add(0) + p2pMessagesSent.WithLabelValues("vaa").Add(0) p2pReceiveChannelOverflow.WithLabelValues("observation").Add(0) p2pReceiveChannelOverflow.WithLabelValues("signed_vaa_with_quorum").Add(0) p2pReceiveChannelOverflow.WithLabelValues("signed_observation_request").Add(0) logger := supervisor.Logger(ctx) + // Evaluate the gossip cutover time. If it has passed, then the flag will be set to make us publish on the new topics. + // If not, a routine will be started to wait for that time before starting to publish on the new topics. + cutoverErr := evaluateGossipCutOver(logger, params.networkID) + if cutoverErr != nil { + panic(cutoverErr) + } + + // If the cutover has not happened yet, we need to join and subscribe to the VAA topic because it is also the old topic. + needOldTopic := !GossipCutoverComplete() + defer func() { // TODO: Right now we're canceling the root context because it used to be the case that libp2p cannot be cleanly restarted. // But that seems to no longer be the case. We may want to revisit this. See (https://github.com/libp2p/go-libp2p/issues/992) for background. 
@@ -330,8 +344,6 @@ func Run(params *RunParams) func(ctx context.Context) error { panic(err) } - topic := fmt.Sprintf("%s/%s", params.networkID, "broadcast") - bootstrappers, bootstrapNode := BootstrapAddrs(logger, params.bootstrapPeers, h.ID()) if bootstrapNode { @@ -342,7 +354,7 @@ func Run(params *RunParams) func(ctx context.Context) error { } } - logger.Info("Subscribing pubsub topic", zap.String("topic", topic)) + logger.Info("connecting to pubsub") ourTracer := &traceHandler{} ps, err := pubsub.NewGossipSub(ctx, h, pubsub.WithValidateQueueSize(P2P_VALIDATE_QUEUE_SIZE), @@ -355,24 +367,84 @@ func Run(params *RunParams) func(ctx context.Context) error { panic(err) } - th, err := ps.Join(topic) - if err != nil { - return fmt.Errorf("failed to join topic: %w", err) + // These will only be non-nil if the application plans to listen for or publish to that topic. + var controlPubsubTopic, attestationPubsubTopic, vaaPubsubTopic *pubsub.Topic + var controlSubscription, attestationSubscription, vaaSubscription *pubsub.Subscription + + // Set up the control channel. 
//////////////////////////////////////////////////////////////////// + if params.nodeName != "" || params.gossipControlSendC != nil || params.obsvReqSendC != nil || params.obsvReqRecvC != nil || params.signedGovCfgRecvC != nil || params.signedGovStatusRecvC != nil { + controlTopic := fmt.Sprintf("%s/%s", params.networkID, "control") + logger.Info("joining the control topic", zap.String("topic", controlTopic)) + controlPubsubTopic, err = ps.Join(controlTopic) + if err != nil { + return fmt.Errorf("failed to join the control topic: %w", err) + } + + defer func() { + if err := controlPubsubTopic.Close(); err != nil && !errors.Is(err, context.Canceled) { + logger.Error("Error closing the control topic", zap.Error(err)) + } + }() + + if params.obsvReqRecvC != nil || params.signedGovCfgRecvC != nil || params.signedGovStatusRecvC != nil { + logger.Info("subscribing to the control topic", zap.String("topic", controlTopic)) + controlSubscription, err = controlPubsubTopic.Subscribe(pubsub.WithBufferSize(P2P_SUBSCRIPTION_BUFFER_SIZE)) + if err != nil { + return fmt.Errorf("failed to subscribe to the control topic: %w", err) + } + defer controlSubscription.Cancel() + } } - defer func() { - if err := th.Close(); err != nil && !errors.Is(err, context.Canceled) { - logger.Error("Error closing the topic", zap.Error(err)) + // Set up the attestation channel. 
//////////////////////////////////////////////////////////////////// + if params.gossipAttestationSendC != nil || params.obsvRecvC != nil { + attestationTopic := fmt.Sprintf("%s/%s", params.networkID, "attestation") + logger.Info("joining the attestation topic", zap.String("topic", attestationTopic)) + attestationPubsubTopic, err = ps.Join(attestationTopic) + if err != nil { + return fmt.Errorf("failed to join the attestation topic: %w", err) } - }() - // Increase the buffer size to prevent failed delivery - // to slower subscribers - sub, err := th.Subscribe(pubsub.WithBufferSize(P2P_SUBSCRIPTION_BUFFER_SIZE)) - if err != nil { - return fmt.Errorf("failed to subscribe topic: %w", err) + defer func() { + if err := attestationPubsubTopic.Close(); err != nil && !errors.Is(err, context.Canceled) { + logger.Error("Error closing the attestation topic", zap.Error(err)) + } + }() + + if params.obsvRecvC != nil { + logger.Info("subscribing to the attestation topic", zap.String("topic", attestationTopic)) + attestationSubscription, err = attestationPubsubTopic.Subscribe(pubsub.WithBufferSize(P2P_SUBSCRIPTION_BUFFER_SIZE)) + if err != nil { + return fmt.Errorf("failed to subscribe to the attestation topic: %w", err) + } + defer attestationSubscription.Cancel() + } + } + + // Set up the VAA channel. 
//////////////////////////////////////////////////////////////////// + if params.gossipVaaSendC != nil || params.signedIncomingVaaRecvC != nil || needOldTopic { + vaaTopic := fmt.Sprintf("%s/%s", params.networkID, "broadcast") + logger.Info("joining the vaa topic", zap.String("topic", vaaTopic)) + vaaPubsubTopic, err = ps.Join(vaaTopic) + if err != nil { + return fmt.Errorf("failed to join the vaa topic: %w", err) + } + + defer func() { + if err := vaaPubsubTopic.Close(); err != nil && !errors.Is(err, context.Canceled) { + logger.Error("Error closing the vaa topic", zap.Error(err)) + } + }() + + if params.signedIncomingVaaRecvC != nil || needOldTopic { + logger.Info("subscribing to the vaa topic", zap.String("topic", vaaTopic)) + vaaSubscription, err = vaaPubsubTopic.Subscribe(pubsub.WithBufferSize(P2P_SUBSCRIPTION_BUFFER_SIZE)) + if err != nil { + return fmt.Errorf("failed to subscribe to the vaa topic: %w", err) + } + defer vaaSubscription.Cancel() + } } - defer sub.Cancel() // Make sure we connect to at least 1 bootstrap node (this is particularly important in a local devnet and CI // as peer discovery can take a long time). @@ -426,119 +498,169 @@ func Run(params *RunParams) func(ctx context.Context) error { } }() - go func() { - // Disable heartbeat when no node name is provided (spy mode) - if params.nodeName == "" { - return - } - ourAddr := ethcrypto.PubkeyToAddress(params.gk.PublicKey) + // Start up heartbeating if it is enabled. + if params.nodeName != "" { + go func() { + ourAddr := ethcrypto.PubkeyToAddress(params.gk.PublicKey) - ctr := int64(0) - // Guardians should send out their first heartbeat immediately to speed up test runs. - // But we also want to wait a little bit such that network connections can be established by then. - timer := time.NewTimer(time.Second * 2) - defer timer.Stop() + ctr := int64(0) + // Guardians should send out their first heartbeat immediately to speed up test runs. 
+ // But we also want to wait a little bit such that network connections can be established by then. + timer := time.NewTimer(time.Second * 2) + defer timer.Stop() - for { - select { - case <-ctx.Done(): - return - case <-timer.C: - timer.Reset(15 * time.Second) - - // create a heartbeat - b := func() []byte { - DefaultRegistry.mu.Lock() - defer DefaultRegistry.mu.Unlock() - networks := make([]*gossipv1.Heartbeat_Network, 0, len(DefaultRegistry.networkStats)) - for _, v := range DefaultRegistry.networkStats { - errCtr := DefaultRegistry.GetErrorCount(vaa.ChainID(v.Id)) - v.ErrorCount = errCtr - networks = append(networks, v) - } + for { + select { + case <-ctx.Done(): + return + case <-timer.C: + timer.Reset(15 * time.Second) + + // create a heartbeat + b := func() []byte { + DefaultRegistry.mu.Lock() + defer DefaultRegistry.mu.Unlock() + networks := make([]*gossipv1.Heartbeat_Network, 0, len(DefaultRegistry.networkStats)) + for _, v := range DefaultRegistry.networkStats { + errCtr := DefaultRegistry.GetErrorCount(vaa.ChainID(v.Id)) + v.ErrorCount = errCtr + networks = append(networks, v) + } - features := make([]string, 0) - if params.gov != nil { - if params.gov.IsFlowCancelEnabled() { - features = append(features, "governor:fc") - } else { - features = append(features, "governor") + features := make([]string, 0) + if params.gov != nil { + if params.gov.IsFlowCancelEnabled() { + features = append(features, "governor:fc") + } else { + features = append(features, "governor") + } } - } - if params.acct != nil { - features = append(features, params.acct.FeatureString()) - } - if params.ibcFeaturesFunc != nil { - ibcFlags := params.ibcFeaturesFunc() - if ibcFlags != "" { - features = append(features, ibcFlags) + if params.acct != nil { + features = append(features, params.acct.FeatureString()) + } + if params.ibcFeaturesFunc != nil { + ibcFlags := params.ibcFeaturesFunc() + if ibcFlags != "" { + features = append(features, ibcFlags) + } + } + if 
params.gatewayRelayerEnabled { + features = append(features, "gwrelayer") + } + if params.ccqEnabled { + features = append(features, "ccq") } - } - if params.gatewayRelayerEnabled { - features = append(features, "gwrelayer") - } - if params.ccqEnabled { - features = append(features, "ccq") - } - heartbeat := &gossipv1.Heartbeat{ - NodeName: params.nodeName, - Counter: ctr, - Timestamp: time.Now().UnixNano(), - Networks: networks, - Version: version.Version(), - GuardianAddr: ourAddr.String(), - BootTimestamp: bootTime.UnixNano(), - Features: features, - } + heartbeat := &gossipv1.Heartbeat{ + NodeName: params.nodeName, + Counter: ctr, + Timestamp: time.Now().UnixNano(), + Networks: networks, + Version: version.Version(), + GuardianAddr: ourAddr.String(), + BootTimestamp: bootTime.UnixNano(), + Features: features, + } - if params.components.P2PIDInHeartbeat { - heartbeat.P2PNodeId = nodeIdBytes - } + if params.components.P2PIDInHeartbeat { + heartbeat.P2PNodeId = nodeIdBytes + } - if err := params.gst.SetHeartbeat(ourAddr, h.ID(), heartbeat); err != nil { - panic(err) - } - collectNodeMetrics(ourAddr, h.ID(), heartbeat) + if err := params.gst.SetHeartbeat(ourAddr, h.ID(), heartbeat); err != nil { + panic(err) + } + collectNodeMetrics(ourAddr, h.ID(), heartbeat) - if params.gov != nil { - params.gov.CollectMetrics(heartbeat, params.gossipSendC, params.gk, ourAddr) - } + if params.gov != nil { + params.gov.CollectMetrics(heartbeat, params.gossipControlSendC, params.gk, ourAddr) + } - msg := gossipv1.GossipMessage{ - Message: &gossipv1.GossipMessage_SignedHeartbeat{ - SignedHeartbeat: createSignedHeartbeat(params.gk, heartbeat), - }, - } + msg := gossipv1.GossipMessage{ + Message: &gossipv1.GossipMessage_SignedHeartbeat{ + SignedHeartbeat: createSignedHeartbeat(params.gk, heartbeat), + }, + } - b, err := proto.Marshal(&msg) - if err != nil { - panic(err) + b, err := proto.Marshal(&msg) + if err != nil { + panic(err) + } + return b + }() + + if GossipCutoverComplete() { 
+ if controlPubsubTopic == nil { + panic("controlPubsubTopic should not be nil when nodeName is set") + } + err = controlPubsubTopic.Publish(ctx, b) + p2pMessagesSent.WithLabelValues("control").Inc() + if err != nil { + logger.Warn("failed to publish heartbeat message", zap.Error(err)) + } + } else if vaaPubsubTopic != nil { + err = vaaPubsubTopic.Publish(ctx, b) + p2pMessagesSent.WithLabelValues("old_control").Inc() + if err != nil { + logger.Warn("failed to publish heartbeat message to old topic", zap.Error(err)) + } } - return b - }() - err = th.Publish(ctx, b) - if err != nil { - logger.Warn("failed to publish heartbeat message", zap.Error(err)) + p2pHeartbeatsSent.Inc() + ctr += 1 } - - p2pHeartbeatsSent.Inc() - ctr += 1 } - } - }() + }() + } + // This routine processes messages received from the internal channels and publishes them to gossip. /////////////////// + // NOTE: The go specification says that it is safe to receive on a nil channel, it just blocks forever. go func() { for { select { case <-ctx.Done(): return - case msg := <-params.gossipSendC: - err := th.Publish(ctx, msg) - p2pMessagesSent.Inc() + case msg := <-params.gossipControlSendC: + if GossipCutoverComplete() { + if controlPubsubTopic == nil { + panic("controlPubsubTopic should not be nil when gossipControlSendC is set") + } + err := controlPubsubTopic.Publish(ctx, msg) + p2pMessagesSent.WithLabelValues("control").Inc() + if err != nil { + logger.Error("failed to publish message from control queue", zap.Error(err)) + } + } else if vaaPubsubTopic != nil { + err := vaaPubsubTopic.Publish(ctx, msg) + p2pMessagesSent.WithLabelValues("old_control").Inc() + if err != nil { + logger.Error("failed to publish message from control queue to old topic", zap.Error(err)) + } + } + case msg := <-params.gossipAttestationSendC: + if GossipCutoverComplete() { + if attestationPubsubTopic == nil { + panic("attestationPubsubTopic should not be nil when gossipAttestationSendC is set") + } + err := 
attestationPubsubTopic.Publish(ctx, msg) + p2pMessagesSent.WithLabelValues("attestation").Inc() + if err != nil { + logger.Error("failed to publish message from attestation queue", zap.Error(err)) + } + } else if vaaPubsubTopic != nil { + err := vaaPubsubTopic.Publish(ctx, msg) + p2pMessagesSent.WithLabelValues("old_attestation").Inc() + if err != nil { + logger.Error("failed to publish message from attestation queue to old topic", zap.Error(err)) + } + } + case msg := <-params.gossipVaaSendC: + if vaaPubsubTopic == nil { + panic("vaaPubsubTopic should not be nil when gossipVaaSendC is set") + } + err := vaaPubsubTopic.Publish(ctx, msg) + p2pMessagesSent.WithLabelValues("vaa").Inc() if err != nil { - logger.Error("failed to publish message from queue", zap.Error(err)) + logger.Error("failed to publish message from vaa queue", zap.Error(err)) } case msg := <-params.obsvReqSendC: b, err := proto.Marshal(msg) @@ -569,193 +691,429 @@ func Run(params *RunParams) func(ctx context.Context) error { } // Send to local observation request queue (the loopback message is ignored) - if params.obsvReqC != nil { - params.obsvReqC <- msg + if params.obsvReqRecvC != nil { + params.obsvReqRecvC <- msg } - err = th.Publish(ctx, b) - p2pMessagesSent.Inc() - if err != nil { - logger.Error("failed to publish observation request", zap.Error(err)) - } else { - logger.Info("published signed observation request", zap.Any("signed_observation_request", sReq)) + if GossipCutoverComplete() { + if controlPubsubTopic == nil { + panic("controlPubsubTopic should not be nil when obsvReqSendC is set") + } + err = controlPubsubTopic.Publish(ctx, b) + p2pMessagesSent.WithLabelValues("control").Inc() + if err != nil { + logger.Error("failed to publish observation request", zap.Error(err)) + } else { + logger.Info("published signed observation request", zap.Any("signed_observation_request", sReq)) + } + } else if vaaPubsubTopic != nil { + err = vaaPubsubTopic.Publish(ctx, b) + 
p2pMessagesSent.WithLabelValues("old_control").Inc() + if err != nil { + logger.Error("failed to publish observation request to old topic", zap.Error(err)) + } } } } }() - for { - envelope, err := sub.Next(ctx) // Note: sub.Next(ctx) will return an error once ctx is canceled - if err != nil { - return fmt.Errorf("failed to receive pubsub message: %w", err) - } + errC := make(chan error) - var msg gossipv1.GossipMessage - err = proto.Unmarshal(envelope.Data, &msg) - if err != nil { - logger.Info("received invalid message", - zap.Binary("data", envelope.Data), - zap.String("from", envelope.GetFrom().String())) - p2pMessagesReceived.WithLabelValues("invalid").Inc() - continue - } + // This routine processes control messages received from gossip. ////////////////////////////////////////////// + if controlSubscription != nil { + go func() { + for { + envelope, err := controlSubscription.Next(ctx) // Note: sub.Next(ctx) will return an error once ctx is canceled + if err != nil { + errC <- fmt.Errorf("failed to receive pubsub message on control topic: %w", err) + return + } - if envelope.GetFrom() == h.ID() { - if logger.Level().Enabled(zapcore.DebugLevel) { - logger.Debug("received message from ourselves, ignoring", zap.Any("payload", msg.Message)) - } - p2pMessagesReceived.WithLabelValues("loopback").Inc() - continue - } + var msg gossipv1.GossipMessage + err = proto.Unmarshal(envelope.Data, &msg) + if err != nil { + logger.Info("received invalid message on control topic", + zap.Binary("data", envelope.Data), + zap.String("from", envelope.GetFrom().String())) + p2pMessagesReceived.WithLabelValues("invalid").Inc() + continue + } - if logger.Level().Enabled(zapcore.DebugLevel) { - logger.Debug("received message", - zap.Any("payload", msg.Message), - zap.Binary("raw", envelope.Data), - zap.String("from", envelope.GetFrom().String())) - } + if envelope.GetFrom() == h.ID() { + if logger.Level().Enabled(zapcore.DebugLevel) { + logger.Debug("received message from ourselves on 
control topic, ignoring", zap.Any("payload", msg.Message)) + } + p2pMessagesReceived.WithLabelValues("loopback").Inc() + continue + } - switch m := msg.Message.(type) { - case *gossipv1.GossipMessage_SignedHeartbeat: - s := m.SignedHeartbeat - gs := params.gst.Get() - if gs == nil { - // No valid guardian set yet - dropping heartbeat - logger.Log(params.components.SignedHeartbeatLogLevel, "skipping heartbeat - no guardian set", - zap.Any("value", s), - zap.String("from", envelope.GetFrom().String())) - break - } - if heartbeat, err := processSignedHeartbeat(envelope.GetFrom(), s, gs, params.gst, params.disableHeartbeatVerify); err != nil { - p2pMessagesReceived.WithLabelValues("invalid_heartbeat").Inc() - logger.Log(params.components.SignedHeartbeatLogLevel, "invalid signed heartbeat received", - zap.Error(err), - zap.Any("payload", msg.Message), - zap.Any("value", s), - zap.Binary("raw", envelope.Data), - zap.String("from", envelope.GetFrom().String())) - } else { - p2pMessagesReceived.WithLabelValues("valid_heartbeat").Inc() - logger.Log(params.components.SignedHeartbeatLogLevel, "valid signed heartbeat received", - zap.Any("value", heartbeat), - zap.String("from", envelope.GetFrom().String())) - - func() { - if len(heartbeat.P2PNodeId) != 0 { - params.components.ProtectedHostByGuardianKeyLock.Lock() - defer params.components.ProtectedHostByGuardianKeyLock.Unlock() - var peerId peer.ID - if err = peerId.Unmarshal(heartbeat.P2PNodeId); err != nil { - logger.Error("p2p_node_id_in_heartbeat_invalid", - zap.Any("payload", msg.Message), - zap.Any("value", s), - zap.Binary("raw", envelope.Data), - zap.String("from", envelope.GetFrom().String())) - } else { - guardianAddr := eth_common.BytesToAddress(s.GuardianAddr) - if params.gk == nil || guardianAddr != ethcrypto.PubkeyToAddress(params.gk.PublicKey) { - prevPeerId, ok := params.components.ProtectedHostByGuardianKey[guardianAddr] - if ok { - if prevPeerId != peerId { - logger.Info("p2p_guardian_peer_changed", - 
zap.String("guardian_addr", guardianAddr.String()), - zap.String("prevPeerId", prevPeerId.String()), - zap.String("newPeerId", peerId.String()), - ) - params.components.ConnMgr.Unprotect(prevPeerId, "heartbeat") - params.components.ConnMgr.Protect(peerId, "heartbeat") - params.components.ProtectedHostByGuardianKey[guardianAddr] = peerId - } + if logger.Level().Enabled(zapcore.DebugLevel) { + logger.Debug("received message on control topic", + zap.Any("payload", msg.Message), + zap.Binary("raw", envelope.Data), + zap.String("from", envelope.GetFrom().String())) + } + + switch m := msg.Message.(type) { + case *gossipv1.GossipMessage_SignedHeartbeat: + s := m.SignedHeartbeat + gs := params.gst.Get() + if gs == nil { + // No valid guardian set yet - dropping heartbeat + logger.Log(params.components.SignedHeartbeatLogLevel, "skipping heartbeat - no guardian set", + zap.Any("value", s), + zap.String("from", envelope.GetFrom().String())) + break + } + if heartbeat, err := processSignedHeartbeat(envelope.GetFrom(), s, gs, params.gst, params.disableHeartbeatVerify); err != nil { + p2pMessagesReceived.WithLabelValues("invalid_heartbeat").Inc() + logger.Log(params.components.SignedHeartbeatLogLevel, "invalid signed heartbeat received", + zap.Error(err), + zap.Any("payload", msg.Message), + zap.Any("value", s), + zap.Binary("raw", envelope.Data), + zap.String("from", envelope.GetFrom().String())) + } else { + p2pMessagesReceived.WithLabelValues("valid_heartbeat").Inc() + logger.Log(params.components.SignedHeartbeatLogLevel, "valid signed heartbeat received", + zap.Any("value", heartbeat), + zap.String("from", envelope.GetFrom().String())) + + func() { + if len(heartbeat.P2PNodeId) != 0 { + params.components.ProtectedHostByGuardianKeyLock.Lock() + defer params.components.ProtectedHostByGuardianKeyLock.Unlock() + var peerId peer.ID + if err = peerId.Unmarshal(heartbeat.P2PNodeId); err != nil { + logger.Error("p2p_node_id_in_heartbeat_invalid", + zap.Any("payload", msg.Message), 
+ zap.Any("value", s), + zap.Binary("raw", envelope.Data), + zap.String("from", envelope.GetFrom().String())) } else { - params.components.ConnMgr.Protect(peerId, "heartbeat") - params.components.ProtectedHostByGuardianKey[guardianAddr] = peerId + guardianAddr := eth_common.BytesToAddress(s.GuardianAddr) + if params.gk == nil || guardianAddr != ethcrypto.PubkeyToAddress(params.gk.PublicKey) { + prevPeerId, ok := params.components.ProtectedHostByGuardianKey[guardianAddr] + if ok { + if prevPeerId != peerId { + logger.Info("p2p_guardian_peer_changed", + zap.String("guardian_addr", guardianAddr.String()), + zap.String("prevPeerId", prevPeerId.String()), + zap.String("newPeerId", peerId.String()), + ) + params.components.ConnMgr.Unprotect(prevPeerId, "heartbeat") + params.components.ConnMgr.Protect(peerId, "heartbeat") + params.components.ProtectedHostByGuardianKey[guardianAddr] = peerId + } + } else { + params.components.ConnMgr.Protect(peerId, "heartbeat") + params.components.ProtectedHostByGuardianKey[guardianAddr] = peerId + } + } + } + } else { + if logger.Level().Enabled(zapcore.DebugLevel) { + logger.Debug("p2p_node_id_not_in_heartbeat", zap.Error(err), zap.Any("payload", heartbeat.NodeName)) } } + }() + } + case *gossipv1.GossipMessage_SignedObservationRequest: + if params.obsvReqRecvC != nil { + s := m.SignedObservationRequest + gs := params.gst.Get() + if gs == nil { + if logger.Level().Enabled(zapcore.DebugLevel) { + logger.Debug("dropping SignedObservationRequest - no guardian set", zap.Any("value", s), zap.String("from", envelope.GetFrom().String())) + } + break } - } else { - if logger.Level().Enabled(zapcore.DebugLevel) { - logger.Debug("p2p_node_id_not_in_heartbeat", zap.Error(err), zap.Any("payload", heartbeat.NodeName)) + r, err := processSignedObservationRequest(s, gs) + if err != nil { + p2pMessagesReceived.WithLabelValues("invalid_signed_observation_request").Inc() + if logger.Level().Enabled(zapcore.DebugLevel) { + logger.Debug("invalid signed 
observation request received", + zap.Error(err), + zap.Any("payload", msg.Message), + zap.Any("value", s), + zap.Binary("raw", envelope.Data), + zap.String("from", envelope.GetFrom().String())) + } + } else { + if logger.Level().Enabled(zapcore.DebugLevel) { + logger.Debug("valid signed observation request received", zap.Any("value", r), zap.String("from", envelope.GetFrom().String())) + } + + select { + case params.obsvReqRecvC <- r: + p2pMessagesReceived.WithLabelValues("signed_observation_request").Inc() + default: + p2pReceiveChannelOverflow.WithLabelValues("signed_observation_request").Inc() + } } } - }() - } - case *gossipv1.GossipMessage_SignedObservation: - if params.obsvC != nil { - if err := common.PostMsgWithTimestamp[gossipv1.SignedObservation](m.SignedObservation, params.obsvC); err == nil { - p2pMessagesReceived.WithLabelValues("observation").Inc() - } else { - if params.components.WarnChannelOverflow { - logger.Warn("Ignoring SignedObservation because obsvC full", zap.String("hash", hex.EncodeToString(m.SignedObservation.Hash))) + case *gossipv1.GossipMessage_SignedChainGovernorConfig: + if params.signedGovCfgRecvC != nil { + params.signedGovCfgRecvC <- m.SignedChainGovernorConfig + } + case *gossipv1.GossipMessage_SignedChainGovernorStatus: + if params.signedGovStatusRecvC != nil { + params.signedGovStatusRecvC <- m.SignedChainGovernorStatus } - p2pReceiveChannelOverflow.WithLabelValues("observation").Inc() + default: + p2pMessagesReceived.WithLabelValues("unknown").Inc() + logger.Warn("received unknown message type on control topic (running outdated software?)", + zap.Any("payload", msg.Message), + zap.Binary("raw", envelope.Data), + zap.String("from", envelope.GetFrom().String())) } } - case *gossipv1.GossipMessage_SignedVaaWithQuorum: - if params.signedInC != nil { - select { - case params.signedInC <- m.SignedVaaWithQuorum: - p2pMessagesReceived.WithLabelValues("signed_vaa_with_quorum").Inc() - default: - if params.components.WarnChannelOverflow 
{ - // TODO do not log this in production - var hexStr string - if vaa, err := vaa.Unmarshal(m.SignedVaaWithQuorum.Vaa); err == nil { - hexStr = vaa.HexDigest() + }() + } + + // This routine processes attestation messages received from gossip. ////////////////////////////////////////////// + if attestationSubscription != nil { + go func() { + for { + envelope, err := attestationSubscription.Next(ctx) // Note: sub.Next(ctx) will return an error once ctx is canceled + if err != nil { + errC <- fmt.Errorf("failed to receive pubsub message on attestation topic: %w", err) + return + } + + var msg gossipv1.GossipMessage + err = proto.Unmarshal(envelope.Data, &msg) + if err != nil { + logger.Info("received invalid message on attestation topic", + zap.Binary("data", envelope.Data), + zap.String("from", envelope.GetFrom().String())) + p2pMessagesReceived.WithLabelValues("invalid").Inc() + continue + } + + if envelope.GetFrom() == h.ID() { + if logger.Level().Enabled(zapcore.DebugLevel) { + logger.Debug("received message from ourselves on attestation topic, ignoring", zap.Any("payload", msg.Message)) + } + p2pMessagesReceived.WithLabelValues("loopback").Inc() + continue + } + + if logger.Level().Enabled(zapcore.DebugLevel) { + logger.Debug("received message on attestation topic", + zap.Any("payload", msg.Message), + zap.Binary("raw", envelope.Data), + zap.String("from", envelope.GetFrom().String())) + } + + switch m := msg.Message.(type) { + case *gossipv1.GossipMessage_SignedObservation: + if params.obsvRecvC != nil { + if err := common.PostMsgWithTimestamp(m.SignedObservation, params.obsvRecvC); err == nil { + p2pMessagesReceived.WithLabelValues("observation").Inc() + } else { + if params.components.WarnChannelOverflow { + logger.Warn("Ignoring SignedObservation because obsvRecvC is full", zap.String("addr", hex.EncodeToString(m.SignedObservation.Addr))) + } + p2pReceiveChannelOverflow.WithLabelValues("observation").Inc() } - logger.Warn("Ignoring SignedVaaWithQuorum 
because signedInC full", zap.String("hash", hexStr)) } - p2pReceiveChannelOverflow.WithLabelValues("signed_vaa_with_quorum").Inc() + default: + p2pMessagesReceived.WithLabelValues("unknown").Inc() + logger.Warn("received unknown message type on attestation topic (running outdated software?)", + zap.Any("payload", msg.Message), + zap.Binary("raw", envelope.Data), + zap.String("from", envelope.GetFrom().String())) } } - case *gossipv1.GossipMessage_SignedObservationRequest: - if params.obsvReqC != nil { - s := m.SignedObservationRequest - gs := params.gst.Get() - if gs == nil { - if logger.Level().Enabled(zapcore.DebugLevel) { - logger.Debug("dropping SignedObservationRequest - no guardian set", zap.Any("value", s), zap.String("from", envelope.GetFrom().String())) - } - break + }() + } + + // This routine processes signed VAA messages received from gossip. ////////////////////////////////////////////// + if vaaSubscription != nil { + go func() { + for { + envelope, err := vaaSubscription.Next(ctx) // Note: sub.Next(ctx) will return an error once ctx is canceled + if err != nil { + errC <- fmt.Errorf("failed to receive pubsub message on vaa topic: %w", err) + return } - r, err := processSignedObservationRequest(s, gs) + + var msg gossipv1.GossipMessage + err = proto.Unmarshal(envelope.Data, &msg) if err != nil { - p2pMessagesReceived.WithLabelValues("invalid_signed_observation_request").Inc() + logger.Info("received invalid message on vaa topic", + zap.Binary("data", envelope.Data), + zap.String("from", envelope.GetFrom().String())) + p2pMessagesReceived.WithLabelValues("invalid").Inc() + continue + } + + if envelope.GetFrom() == h.ID() { if logger.Level().Enabled(zapcore.DebugLevel) { - logger.Debug("invalid signed observation request received", + logger.Debug("received message from ourselves on vaa topic, ignoring", zap.Any("payload", msg.Message)) + } + p2pMessagesReceived.WithLabelValues("loopback").Inc() + continue + } + + if 
logger.Level().Enabled(zapcore.DebugLevel) { + logger.Debug("received message on vaa topic", + zap.Any("payload", msg.Message), + zap.Binary("raw", envelope.Data), + zap.String("from", envelope.GetFrom().String())) + } + + switch m := msg.Message.(type) { + case *gossipv1.GossipMessage_SignedHeartbeat: // TODO: Get rid of this after the cutover. + s := m.SignedHeartbeat + gs := params.gst.Get() + if gs == nil { + // No valid guardian set yet - dropping heartbeat + logger.Log(params.components.SignedHeartbeatLogLevel, "skipping heartbeat - no guardian set", + zap.Any("value", s), + zap.String("from", envelope.GetFrom().String())) + break + } + if heartbeat, err := processSignedHeartbeat(envelope.GetFrom(), s, gs, params.gst, params.disableHeartbeatVerify); err != nil { + p2pMessagesReceived.WithLabelValues("invalid_heartbeat").Inc() + logger.Log(params.components.SignedHeartbeatLogLevel, "invalid signed heartbeat received", zap.Error(err), zap.Any("payload", msg.Message), zap.Any("value", s), zap.Binary("raw", envelope.Data), zap.String("from", envelope.GetFrom().String())) + } else { + p2pMessagesReceived.WithLabelValues("valid_heartbeat").Inc() + logger.Log(params.components.SignedHeartbeatLogLevel, "valid signed heartbeat received", + zap.Any("value", heartbeat), + zap.String("from", envelope.GetFrom().String())) + + func() { + if len(heartbeat.P2PNodeId) != 0 { + params.components.ProtectedHostByGuardianKeyLock.Lock() + defer params.components.ProtectedHostByGuardianKeyLock.Unlock() + var peerId peer.ID + if err = peerId.Unmarshal(heartbeat.P2PNodeId); err != nil { + logger.Error("p2p_node_id_in_heartbeat_invalid", + zap.Any("payload", msg.Message), + zap.Any("value", s), + zap.Binary("raw", envelope.Data), + zap.String("from", envelope.GetFrom().String())) + } else { + guardianAddr := eth_common.BytesToAddress(s.GuardianAddr) + if params.gk == nil || guardianAddr != ethcrypto.PubkeyToAddress(params.gk.PublicKey) { + prevPeerId, ok := 
params.components.ProtectedHostByGuardianKey[guardianAddr] + if ok { + if prevPeerId != peerId { + logger.Info("p2p_guardian_peer_changed", + zap.String("guardian_addr", guardianAddr.String()), + zap.String("prevPeerId", prevPeerId.String()), + zap.String("newPeerId", peerId.String()), + ) + params.components.ConnMgr.Unprotect(prevPeerId, "heartbeat") + params.components.ConnMgr.Protect(peerId, "heartbeat") + params.components.ProtectedHostByGuardianKey[guardianAddr] = peerId + } + } else { + params.components.ConnMgr.Protect(peerId, "heartbeat") + params.components.ProtectedHostByGuardianKey[guardianAddr] = peerId + } + } + } + } else { + if logger.Level().Enabled(zapcore.DebugLevel) { + logger.Debug("p2p_node_id_not_in_heartbeat", zap.Error(err), zap.Any("payload", heartbeat.NodeName)) + } + } + }() } - } else { - if logger.Level().Enabled(zapcore.DebugLevel) { - logger.Debug("valid signed observation request received", zap.Any("value", r), zap.String("from", envelope.GetFrom().String())) + case *gossipv1.GossipMessage_SignedObservation: // TODO: Get rid of this after the cutover. 
+ if params.obsvRecvC != nil { + if err := common.PostMsgWithTimestamp(m.SignedObservation, params.obsvRecvC); err == nil { + p2pMessagesReceived.WithLabelValues("observation").Inc() + } else { + if params.components.WarnChannelOverflow { + logger.Warn("Ignoring SignedObservation because obsvRecvC is full", zap.String("hash", hex.EncodeToString(m.SignedObservation.Hash))) + } + p2pReceiveChannelOverflow.WithLabelValues("observation").Inc() + } } + case *gossipv1.GossipMessage_SignedVaaWithQuorum: + if params.signedIncomingVaaRecvC != nil { + select { + case params.signedIncomingVaaRecvC <- m.SignedVaaWithQuorum: + p2pMessagesReceived.WithLabelValues("signed_vaa_with_quorum").Inc() + default: + if params.components.WarnChannelOverflow { + var hexStr string + if vaa, err := vaa.Unmarshal(m.SignedVaaWithQuorum.Vaa); err == nil { + hexStr = vaa.HexDigest() + } + logger.Warn("Ignoring SignedVaaWithQuorum because signedIncomingVaaRecvC full", zap.String("hash", hexStr)) + } + p2pReceiveChannelOverflow.WithLabelValues("signed_vaa_with_quorum").Inc() + } + } + case *gossipv1.GossipMessage_SignedObservationRequest: // TODO: Get rid of this after the cutover. 
+ if params.obsvReqRecvC != nil { + s := m.SignedObservationRequest + gs := params.gst.Get() + if gs == nil { + if logger.Level().Enabled(zapcore.DebugLevel) { + logger.Debug("dropping SignedObservationRequest - no guardian set", zap.Any("value", s), zap.String("from", envelope.GetFrom().String())) + } + break + } + r, err := processSignedObservationRequest(s, gs) + if err != nil { + p2pMessagesReceived.WithLabelValues("invalid_signed_observation_request").Inc() + if logger.Level().Enabled(zapcore.DebugLevel) { + logger.Debug("invalid signed observation request received", + zap.Error(err), + zap.Any("payload", msg.Message), + zap.Any("value", s), + zap.Binary("raw", envelope.Data), + zap.String("from", envelope.GetFrom().String())) + } + } else { + if logger.Level().Enabled(zapcore.DebugLevel) { + logger.Debug("valid signed observation request received", zap.Any("value", r), zap.String("from", envelope.GetFrom().String())) + } - select { - case params.obsvReqC <- r: - p2pMessagesReceived.WithLabelValues("signed_observation_request").Inc() - default: - p2pReceiveChannelOverflow.WithLabelValues("signed_observation_request").Inc() + select { + case params.obsvReqRecvC <- r: + p2pMessagesReceived.WithLabelValues("signed_observation_request").Inc() + default: + p2pReceiveChannelOverflow.WithLabelValues("signed_observation_request").Inc() + } + } } + case *gossipv1.GossipMessage_SignedChainGovernorConfig: // TODO: Get rid of this after the cutover. + if params.signedGovCfgRecvC != nil { + params.signedGovCfgRecvC <- m.SignedChainGovernorConfig + } + case *gossipv1.GossipMessage_SignedChainGovernorStatus: // TODO: Get rid of this after the cutover. 
+ if params.signedGovStatusRecvC != nil { + params.signedGovStatusRecvC <- m.SignedChainGovernorStatus + } + default: + p2pMessagesReceived.WithLabelValues("unknown").Inc() + logger.Warn("received unknown message type on vaa topic (running outdated software?)", + zap.Any("payload", msg.Message), + zap.Binary("raw", envelope.Data), + zap.String("from", envelope.GetFrom().String())) } } - case *gossipv1.GossipMessage_SignedChainGovernorConfig: - if params.signedGovCfg != nil { - params.signedGovCfg <- m.SignedChainGovernorConfig - } - case *gossipv1.GossipMessage_SignedChainGovernorStatus: - if params.signedGovSt != nil { - params.signedGovSt <- m.SignedChainGovernorStatus - } - default: - p2pMessagesReceived.WithLabelValues("unknown").Inc() - logger.Warn("received unknown message type (running outdated software?)", - zap.Any("payload", msg.Message), - zap.Binary("raw", envelope.Data), - zap.String("from", envelope.GetFrom().String())) - } + }() + } + + // Wait for either a shutdown or a fatal error from a pubsub subscription. + select { + case <-ctx.Done(): + return nil + case err := <-errC: + return err } } } diff --git a/node/pkg/p2p/run_params.go b/node/pkg/p2p/run_params.go index 59652f21ca..d4a0c0b0b5 100644 --- a/node/pkg/p2p/run_params.go +++ b/node/pkg/p2p/run_params.go @@ -23,40 +23,42 @@ type ( gst *common.GuardianSetState rootCtxCancel context.CancelFunc - // obsvC is optional and can be set with `WithSignedObservationListener`. - obsvC chan<- *common.MsgWithTimeStamp[gossipv1.SignedObservation] + // obsvRecvC is optional and can be set with `WithSignedObservationListener`. + obsvRecvC chan<- *common.MsgWithTimeStamp[gossipv1.SignedObservation] - // obsvReqC is optional and can be set with `WithObservationRequestListener`. - obsvReqC chan<- *gossipv1.ObservationRequest + // obsvReqRecvC is optional and can be set with `WithObservationRequestListener`. 
+ obsvReqRecvC chan<- *gossipv1.ObservationRequest - // signedInC is optional and can be set with `WithSignedVAAListener`. - signedInC chan<- *gossipv1.SignedVAAWithQuorum + // signedIncomingVaaRecvC is optional and can be set with `WithSignedVAAListener`. + signedIncomingVaaRecvC chan<- *gossipv1.SignedVAAWithQuorum - // signedGovCfg is optional and can be set with `WithChainGovernorConfigListener`. - signedGovCfg chan *gossipv1.SignedChainGovernorConfig + // signedGovCfgRecvC is optional and can be set with `WithChainGovernorConfigListener`. + signedGovCfgRecvC chan *gossipv1.SignedChainGovernorConfig - // WithChainGovernorStatusListener is optional and can be set with `WithChainGovernorStatusListener`. - signedGovSt chan *gossipv1.SignedChainGovernorStatus + // signedGovStatusRecvC is optional and can be set with `WithChainGovernorStatusListener`. + signedGovStatusRecvC chan *gossipv1.SignedChainGovernorStatus // disableHeartbeatVerify is optional and can be set with `WithDisableHeartbeatVerify` or `WithGuardianOptions`. disableHeartbeatVerify bool // The following options are guardian specific. Set with `WithGuardianOptions`. 
- nodeName string - gk *ecdsa.PrivateKey - gossipSendC chan []byte - obsvReqSendC <-chan *gossipv1.ObservationRequest - acct *accountant.Accountant - gov *governor.ChainGovernor - components *Components - ibcFeaturesFunc func() string - gatewayRelayerEnabled bool - ccqEnabled bool - signedQueryReqC chan<- *gossipv1.SignedQueryRequest - queryResponseReadC <-chan *query.QueryResponsePublication - ccqBootstrapPeers string - ccqPort uint - ccqAllowedPeers string + nodeName string + gk *ecdsa.PrivateKey + gossipControlSendC chan []byte + gossipAttestationSendC chan []byte + gossipVaaSendC chan []byte + obsvReqSendC <-chan *gossipv1.ObservationRequest + acct *accountant.Accountant + gov *governor.ChainGovernor + components *Components + ibcFeaturesFunc func() string + gatewayRelayerEnabled bool + ccqEnabled bool + signedQueryReqC chan<- *gossipv1.SignedQueryRequest + queryResponseReadC <-chan *query.QueryResponsePublication + ccqBootstrapPeers string + ccqPort uint + ccqAllowedPeers string } // RunOpt is used to specify optional parameters. @@ -96,41 +98,41 @@ func NewRunParams( } // WithSignedObservationListener is used to set the channel to receive `SignedObservation“ messages. -func WithSignedObservationListener(obsvC chan<- *common.MsgWithTimeStamp[gossipv1.SignedObservation]) RunOpt { +func WithSignedObservationListener(obsvRecvC chan<- *common.MsgWithTimeStamp[gossipv1.SignedObservation]) RunOpt { return func(p *RunParams) error { - p.obsvC = obsvC + p.obsvRecvC = obsvRecvC return nil } } // WithSignedVAAListener is used to set the channel to receive `SignedVAAWithQuorum messages. 
-func WithSignedVAAListener(signedInC chan<- *gossipv1.SignedVAAWithQuorum) RunOpt { +func WithSignedVAAListener(signedIncomingVaaRecvC chan<- *gossipv1.SignedVAAWithQuorum) RunOpt { return func(p *RunParams) error { - p.signedInC = signedInC + p.signedIncomingVaaRecvC = signedIncomingVaaRecvC return nil } } // WithObservationRequestListener is used to set the channel to receive `ObservationRequest messages. -func WithObservationRequestListener(obsvReqC chan<- *gossipv1.ObservationRequest) RunOpt { +func WithObservationRequestListener(obsvReqRecvC chan<- *gossipv1.ObservationRequest) RunOpt { return func(p *RunParams) error { - p.obsvReqC = obsvReqC + p.obsvReqRecvC = obsvReqRecvC return nil } } // WithChainGovernorConfigListener is used to set the channel to receive `SignedChainGovernorConfig messages. -func WithChainGovernorConfigListener(signedGovCfg chan *gossipv1.SignedChainGovernorConfig) RunOpt { +func WithChainGovernorConfigListener(signedGovCfgRecvC chan *gossipv1.SignedChainGovernorConfig) RunOpt { return func(p *RunParams) error { - p.signedGovCfg = signedGovCfg + p.signedGovCfgRecvC = signedGovCfgRecvC return nil } } // WithChainGovernorStatusListener is used to set the channel to receive `SignedChainGovernorStatus messages. 
-func WithChainGovernorStatusListener(signedGovSt chan *gossipv1.SignedChainGovernorStatus) RunOpt { +func WithChainGovernorStatusListener(signedGovStatusRecvC chan *gossipv1.SignedChainGovernorStatus) RunOpt { return func(p *RunParams) error { - p.signedGovSt = signedGovSt + p.signedGovStatusRecvC = signedGovStatusRecvC return nil } } @@ -147,10 +149,12 @@ func WithDisableHeartbeatVerify(disableHeartbeatVerify bool) RunOpt { func WithGuardianOptions( nodeName string, gk *ecdsa.PrivateKey, - obsvC chan<- *common.MsgWithTimeStamp[gossipv1.SignedObservation], - signedInC chan<- *gossipv1.SignedVAAWithQuorum, - obsvReqC chan<- *gossipv1.ObservationRequest, - gossipSendC chan []byte, + obsvRecvC chan<- *common.MsgWithTimeStamp[gossipv1.SignedObservation], + signedIncomingVaaRecvC chan<- *gossipv1.SignedVAAWithQuorum, + obsvReqRecvC chan<- *gossipv1.ObservationRequest, + gossipControlSendC chan []byte, + gossipAttestationSendC chan []byte, + gossipVaaSendC chan []byte, obsvReqSendC <-chan *gossipv1.ObservationRequest, acct *accountant.Accountant, gov *governor.ChainGovernor, @@ -168,10 +172,12 @@ func WithGuardianOptions( return func(p *RunParams) error { p.nodeName = nodeName p.gk = gk - p.obsvC = obsvC - p.signedInC = signedInC - p.obsvReqC = obsvReqC - p.gossipSendC = gossipSendC + p.obsvRecvC = obsvRecvC + p.signedIncomingVaaRecvC = signedIncomingVaaRecvC + p.obsvReqRecvC = obsvReqRecvC + p.gossipControlSendC = gossipControlSendC + p.gossipAttestationSendC = gossipAttestationSendC + p.gossipVaaSendC = gossipVaaSendC p.obsvReqSendC = obsvReqSendC p.acct = acct p.gov = gov diff --git a/node/pkg/p2p/run_params_test.go b/node/pkg/p2p/run_params_test.go index aebebd952c..a1fd98cab7 100644 --- a/node/pkg/p2p/run_params_test.go +++ b/node/pkg/p2p/run_params_test.go @@ -143,7 +143,9 @@ func TestRunParamsWithGuardianOptions(t *testing.T) { obsvC := make(chan<- *common.MsgWithTimeStamp[gossipv1.SignedObservation], 42) signedInC := make(chan<- *gossipv1.SignedVAAWithQuorum, 
42) obsvReqC := make(chan<- *gossipv1.ObservationRequest, 42) - gossipSendC := make(chan []byte, 42) + gossipControlSendC := make(chan []byte, 42) + gossipAttestationSendC := make(chan []byte, 42) + gossipVaaSendC := make(chan []byte, 42) obsvReqSendC := make(<-chan *gossipv1.ObservationRequest, 42) acct := &accountant.Accountant{} @@ -172,7 +174,9 @@ func TestRunParamsWithGuardianOptions(t *testing.T) { obsvC, signedInC, obsvReqC, - gossipSendC, + gossipControlSendC, + gossipAttestationSendC, + gossipVaaSendC, obsvReqSendC, acct, gov, @@ -191,10 +195,12 @@ func TestRunParamsWithGuardianOptions(t *testing.T) { require.NoError(t, err) require.NotNil(t, params) assert.Equal(t, nodeName, params.nodeName) - assert.Equal(t, obsvC, params.obsvC) - assert.Equal(t, signedInC, params.signedInC) - assert.Equal(t, obsvReqC, params.obsvReqC) - assert.Equal(t, gossipSendC, params.gossipSendC) + assert.Equal(t, obsvC, params.obsvRecvC) + assert.Equal(t, signedInC, params.signedIncomingVaaRecvC) + assert.Equal(t, obsvReqC, params.obsvReqRecvC) + assert.Equal(t, gossipControlSendC, params.gossipControlSendC) + assert.Equal(t, gossipAttestationSendC, params.gossipAttestationSendC) + assert.Equal(t, gossipVaaSendC, params.gossipVaaSendC) assert.Equal(t, obsvReqSendC, params.obsvReqSendC) assert.Equal(t, acct, params.acct) assert.Equal(t, gov, params.gov) diff --git a/node/pkg/p2p/watermark_test.go b/node/pkg/p2p/watermark_test.go index 661decf30e..c8e8c98286 100644 --- a/node/pkg/p2p/watermark_test.go +++ b/node/pkg/p2p/watermark_test.go @@ -30,7 +30,9 @@ type G struct { obsvC chan *node_common.MsgWithTimeStamp[gossipv1.SignedObservation] obsvReqC chan *gossipv1.ObservationRequest obsvReqSendC chan *gossipv1.ObservationRequest - sendC chan []byte + controlSendC chan []byte + attestationSendC chan []byte + vaaSendC chan []byte signedInC chan *gossipv1.SignedVAAWithQuorum priv p2pcrypto.PrivKey gk *ecdsa.PrivateKey @@ -67,7 +69,9 @@ func NewG(t *testing.T, nodeName string) *G { obsvC: 
make(chan *node_common.MsgWithTimeStamp[gossipv1.SignedObservation], cs), obsvReqC: make(chan *gossipv1.ObservationRequest, cs), obsvReqSendC: make(chan *gossipv1.ObservationRequest, cs), - sendC: make(chan []byte, cs), + controlSendC: make(chan []byte, cs), + attestationSendC: make(chan []byte, cs), + vaaSendC: make(chan []byte, cs), signedInC: make(chan *gossipv1.SignedVAAWithQuorum, cs), priv: p2ppriv, gk: guardianpriv, @@ -91,7 +95,9 @@ func NewG(t *testing.T, nodeName string) *G { case <-g.signedInC: case <-g.signedGovCfg: case <-g.signedGovSt: - case <-g.sendC: + case <-g.controlSendC: + case <-g.attestationSendC: + case <-g.vaaSendC: } }() @@ -178,7 +184,9 @@ func startGuardian(t *testing.T, ctx context.Context, g *G) { g.obsvC, g.signedInC, g.obsvReqC, - g.sendC, + g.controlSendC, + g.attestationSendC, + g.vaaSendC, g.obsvReqSendC, g.acct, g.gov, diff --git a/node/pkg/processor/broadcast.go b/node/pkg/processor/broadcast.go index 641332de33..a35f8a773c 100644 --- a/node/pkg/processor/broadcast.go +++ b/node/pkg/processor/broadcast.go @@ -57,7 +57,7 @@ func (p *Processor) broadcastSignature( } // Broadcast the observation. - p.gossipSendC <- msg + p.gossipAttestationSendC <- msg observationsBroadcast.Inc() hash := hex.EncodeToString(digest.Bytes()) @@ -106,7 +106,7 @@ func (p *Processor) broadcastSignedVAA(v *vaa.VAA) { } // Broadcast the signed VAA. 
- p.gossipSendC <- msg + p.gossipVaaSendC <- msg signedVAAsBroadcast.Inc() if p.gatewayRelayer != nil { diff --git a/node/pkg/processor/cleanup.go b/node/pkg/processor/cleanup.go index bcc5e6cd6d..014b76344b 100644 --- a/node/pkg/processor/cleanup.go +++ b/node/pkg/processor/cleanup.go @@ -228,7 +228,8 @@ func (p *Processor) handleCleanup(ctx context.Context) { if err := common.PostObservationRequest(p.obsvReqSendC, req); err != nil { p.logger.Warn("failed to broadcast re-observation request", zap.String("message_id", s.LoggingID()), zap.Error(err)) } - p.gossipSendC <- s.ourMsg + + p.gossipAttestationSendC <- s.ourMsg s.retryCtr++ s.nextRetry = time.Now().Add(nextRetryDuration(s.retryCtr)) aggregationStateRetries.Inc() diff --git a/node/pkg/processor/processor.go b/node/pkg/processor/processor.go index b1e88268ef..aee81bca88 100644 --- a/node/pkg/processor/processor.go +++ b/node/pkg/processor/processor.go @@ -103,8 +103,13 @@ type Processor struct { msgC <-chan *common.MessagePublication // setC is a channel of guardian set updates setC <-chan *common.GuardianSet - // gossipSendC is a channel of outbound messages to broadcast on p2p - gossipSendC chan<- []byte + + // gossipAttestationSendC is a channel of outbound observation messages to broadcast on p2p + gossipAttestationSendC chan<- []byte + + // gossipVaaSendC is a channel of outbound VAA messages to broadcast on p2p + gossipVaaSendC chan<- []byte + // obsvC is a channel of inbound decoded observations from p2p obsvC chan *common.MsgWithTimeStamp[gossipv1.SignedObservation] @@ -162,7 +167,8 @@ func NewProcessor( db *db.Database, msgC <-chan *common.MessagePublication, setC <-chan *common.GuardianSet, - gossipSendC chan<- []byte, + gossipAttestationSendC chan<- []byte, + gossipVaaSendC chan<- []byte, obsvC chan *common.MsgWithTimeStamp[gossipv1.SignedObservation], obsvReqSendC chan<- *gossipv1.ObservationRequest, signedInC <-chan *gossipv1.SignedVAAWithQuorum, @@ -175,15 +181,16 @@ func NewProcessor( ) 
*Processor { return &Processor{ - msgC: msgC, - setC: setC, - gossipSendC: gossipSendC, - obsvC: obsvC, - obsvReqSendC: obsvReqSendC, - signedInC: signedInC, - gk: gk, - gst: gst, - db: db, + msgC: msgC, + setC: setC, + gossipAttestationSendC: gossipAttestationSendC, + gossipVaaSendC: gossipVaaSendC, + obsvC: obsvC, + obsvReqSendC: obsvReqSendC, + signedInC: signedInC, + gk: gk, + gst: gst, + db: db, logger: supervisor.Logger(ctx), state: &aggregationState{observationMap{}},