Skip to content

Commit

Permalink
rename rmn related interfaces and add empty PeerClient implementation (
Browse files Browse the repository at this point in the history
…#142)

The names we used so far:
Client for the RMN interface that the plugin 'talks-to' for rmn related operations.
RawClient for the interface that Client 'talks-to' for low-level peer networking.

This names are not correct.

Renamed them to: rmn.Controller and rmn.PeerClient.

Added a base empty PeerClient implementation.
  • Loading branch information
dimkouv authored Sep 19, 2024
1 parent 824045e commit 24682d5
Show file tree
Hide file tree
Showing 10 changed files with 195 additions and 155 deletions.
2 changes: 1 addition & 1 deletion .mockery.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ packages:
PluginProcessor:
github.com/smartcontractkit/chainlink-ccip/commit/merkleroot/rmn:
interfaces:
Client:
Controller:
github.com/smartcontractkit/chainlink-ccip/pkg/reader:
interfaces:
CCIPReader:
Expand Down
4 changes: 2 additions & 2 deletions commit/merkleroot/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type Processor struct {
ccipReader readerpkg.CCIPReader
reportingCfg ocr3types.ReportingPluginConfig
chainSupport plugincommon.ChainSupport
rmnClient rmn.Client
rmnClient rmn.Controller
rmnCrypto cciptypes.RMNCrypto
rmnConfig rmn.Config
}
Expand All @@ -41,7 +41,7 @@ func NewProcessor(
msgHasher cciptypes.MessageHasher,
reportingCfg ocr3types.ReportingPluginConfig,
chainSupport plugincommon.ChainSupport,
rmnClient rmn.Client,
rmnClient rmn.Controller,
rmnCrypto cciptypes.RMNCrypto,
rmnConfig rmn.Config,
) *Processor {
Expand Down
18 changes: 9 additions & 9 deletions commit/merkleroot/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func TestProcessor_Query(t *testing.T) {
prevOutcome Outcome
contractAddresses map[ccipocr3.ChainSelector]map[string][]byte
cfg pluginconfig.CommitPluginConfig
rmnClient func(t *testing.T) *rmnmocks.MockClient
rmnClient func(t *testing.T) *rmnmocks.MockController
expQuery Query
expErr bool
}{
Expand All @@ -78,8 +78,8 @@ func TestProcessor_Query(t *testing.T) {
RMNSignaturesTimeout: 5 * time.Second,
DestChain: dstChain,
},
rmnClient: func(t *testing.T) *rmnmocks.MockClient {
cl := rmnmocks.NewMockClient(t)
rmnClient: func(t *testing.T) *rmnmocks.MockController {
cl := rmnmocks.NewMockController(t)
cl.EXPECT().
ComputeReportSignatures(
mock.Anything,
Expand Down Expand Up @@ -128,8 +128,8 @@ func TestProcessor_Query(t *testing.T) {
RMNSignaturesTimeout: time.Second,
DestChain: dstChain,
},
rmnClient: func(t *testing.T) *rmnmocks.MockClient {
cl := rmnmocks.NewMockClient(t)
rmnClient: func(t *testing.T) *rmnmocks.MockController {
cl := rmnmocks.NewMockController(t)
time.Sleep(time.Millisecond)
cl.EXPECT().ComputeReportSignatures(mock.Anything, mock.Anything, mock.Anything).
Return(expSigs1, rmn.ErrTimeout) // <------------------------------------ timeout error
Expand All @@ -156,8 +156,8 @@ func TestProcessor_Query(t *testing.T) {
RMNSignaturesTimeout: time.Second,
DestChain: dstChain,
},
rmnClient: func(t *testing.T) *rmnmocks.MockClient {
cl := rmnmocks.NewMockClient(t)
rmnClient: func(t *testing.T) *rmnmocks.MockController {
cl := rmnmocks.NewMockController(t)
time.Sleep(time.Millisecond)
cl.EXPECT().ComputeReportSignatures(mock.Anything, mock.Anything, mock.Anything).
Return(expSigs1, fmt.Errorf("some error")) // <------------------------- some random error
Expand All @@ -176,7 +176,7 @@ func TestProcessor_Query(t *testing.T) {
RMNSignaturesTimeout: 5 * time.Second,
DestChain: dstChain,
},
rmnClient: func(t *testing.T) *rmnmocks.MockClient { return rmnmocks.NewMockClient(t) },
rmnClient: func(t *testing.T) *rmnmocks.MockController { return rmnmocks.NewMockController(t) },
expQuery: Query{},
expErr: false,
},
Expand All @@ -190,7 +190,7 @@ func TestProcessor_Query(t *testing.T) {
RMNSignaturesTimeout: 5 * time.Second,
DestChain: dstChain,
},
rmnClient: func(t *testing.T) *rmnmocks.MockClient { return rmnmocks.NewMockClient(t) },
rmnClient: func(t *testing.T) *rmnmocks.MockController { return rmnmocks.NewMockController(t) },
expQuery: Query{},
expErr: false,
},
Expand Down
48 changes: 0 additions & 48 deletions commit/merkleroot/rmn/common.go

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
// controller.go contains functions and types related to the RMN controller.
// Controller functionality should be high-level functionality that the plugin can directly use.

package rmn

import (
Expand All @@ -15,6 +18,7 @@ import (
chainsel "github.com/smartcontractkit/chain-selectors"
"github.com/smartcontractkit/chainlink-common/pkg/logger"
cciptypes "github.com/smartcontractkit/chainlink-common/pkg/types/ccipocr3"
"google.golang.org/protobuf/proto"

"github.com/smartcontractkit/chainlink-ccip/commit/merkleroot/rmn/rmnpb"
)
Expand All @@ -31,8 +35,8 @@ var (
"sure RMN is enabled, nodes configured correctly, minObservers value is correct")
)

// Client contains the methods required by the plugin to interact with the RMN nodes.
type Client interface {
// Controller contains the high-level functionality required by the plugin to interact with the RMN nodes.
type Controller interface {
// ComputeReportSignatures computes and returns the signatures for the provided lane updates.
//
// This method abstracts away the RMN specific requests (ObservationRequest, ReportSignaturesRequest) and all the
Expand All @@ -54,31 +58,31 @@ type ReportSignatures struct {

type NodeID uint32

// client is the base RMN Client implementation.
type client struct {
// controller is the base RMN Controller implementation.
type controller struct {
lggr logger.Logger
rmnCrypto cciptypes.RMNCrypto
rawRmnClient RawRmnClient
peerClient PeerClient
rmnCfg Config
ed25519Verifier ED25519Verifier

observationsInitialRequestTimerDuration time.Duration
reportsInitialRequestTimerDuration time.Duration
}

// NewClient creates a new RMN Client to be used by the plugin.
func NewClient(
// NewController creates a new RMN Controller to be used by the plugin.
func NewController(
lggr logger.Logger,
rmnCrypto cciptypes.RMNCrypto,
rawRmnClient RawRmnClient,
peerClient PeerClient,
rmnConfig Config,
observationsInitialRequestTimerDuration time.Duration,
reportsInitialRequestTimerDuration time.Duration,
) Client {
return &client{
) Controller {
return &controller{
lggr: lggr,
rmnCrypto: rmnCrypto,
rawRmnClient: rawRmnClient,
peerClient: peerClient,
rmnCfg: rmnConfig,
ed25519Verifier: NewED25519Verifier(),

Expand All @@ -88,7 +92,7 @@ func NewClient(
}

// ComputeReportSignatures sends a request to each rmn node to handle requests and build signatures.
func (c *client) ComputeReportSignatures(
func (c *controller) ComputeReportSignatures(
ctx context.Context,
destChain *rmnpb.LaneDest,
updateRequests []*rmnpb.FixedDestLaneUpdateRequest,
Expand All @@ -97,7 +101,7 @@ func (c *client) ComputeReportSignatures(
updatesPerChain := make(map[uint64]updateRequestWithMeta)
for _, updateReq := range updateRequests {
if _, exists := updatesPerChain[updateReq.LaneSource.SourceChainSelector]; exists {
return nil, errors.New("this Client implementation assumes each lane update is for a different chain")
return nil, errors.New("controller implementation assumes each lane update is for a different chain")
}

updatesPerChain[updateReq.LaneSource.SourceChainSelector] = updateRequestWithMeta{
Expand Down Expand Up @@ -137,7 +141,7 @@ func (c *client) ComputeReportSignatures(
return rmnReportSignatures, nil
}

func (c *client) getRmnSignedObservations(
func (c *controller) getRmnSignedObservations(
ctx context.Context,
destChain *rmnpb.LaneDest,
updateRequestsPerChain map[uint64]updateRequestWithMeta,
Expand Down Expand Up @@ -181,7 +185,7 @@ func (c *client) getRmnSignedObservations(
return signedObservations, nil
}

func (c *client) sendObservationRequests(
func (c *controller) sendObservationRequests(
destChain *rmnpb.LaneDest,
requestsPerNode map[NodeID][]*rmnpb.FixedDestLaneUpdateRequest,
) (requestIDs mapset.Set[uint64]) {
Expand Down Expand Up @@ -209,7 +213,7 @@ func (c *client) sendObservationRequests(
}

// nolint:gocyclo // todo
func (c *client) listenForRmnObservationResponses(
func (c *controller) listenForRmnObservationResponses(
ctx context.Context,
destChain *rmnpb.LaneDest,
requestIDs mapset.Set[uint64],
Expand All @@ -218,7 +222,7 @@ func (c *client) listenForRmnObservationResponses(
) ([]rmnSignedObservationWithMeta, error) {
c.lggr.Infow("Waiting for observation requests", "requestIDs", requestIDs.String())

respChan := c.rawRmnClient.Recv()
respChan := c.peerClient.Recv()

finishedRequestIDs := mapset.NewSet[uint64]()
rmnObservationResponses := make([]rmnSignedObservationWithMeta, 0)
Expand Down Expand Up @@ -316,7 +320,7 @@ func (c *client) listenForRmnObservationResponses(
}

// nolint:gocyclo // todo
func (c *client) validateSignedObservationResponse(
func (c *controller) validateSignedObservationResponse(
rmnNodeID NodeID,
lurs map[uint64]updateRequestWithMeta,
signedObs *rmnpb.SignedObservation,
Expand Down Expand Up @@ -377,7 +381,7 @@ func (c *client) validateSignedObservationResponse(
}

// nolint:gocyclo // todo
func (c *client) getRmnReportSignatures(
func (c *controller) getRmnReportSignatures(
ctx context.Context,
destChain *rmnpb.LaneDest,
rmnSignedObservations []rmnSignedObservationWithMeta,
Expand Down Expand Up @@ -524,7 +528,7 @@ type reportSigWithNodeID struct {
}

// nolint:gocyclo // todo
func (c *client) listenForRmnReportSignatures(
func (c *controller) listenForRmnReportSignatures(
ctx context.Context,
requestIDs mapset.Set[uint64],
fixedDestLaneUpdates []*rmnpb.FixedDestLaneUpdate,
Expand All @@ -535,7 +539,7 @@ func (c *client) listenForRmnReportSignatures(
tReportsInitialRequest := time.NewTimer(c.reportsInitialRequestTimerDuration)
reportSigs := make([]reportSigWithNodeID, 0)
finishedRequests := mapset.NewSet[uint64]()
respChan := c.rawRmnClient.Recv()
respChan := c.peerClient.Recv()
requestIDs = requestIDs.Clone()
c.lggr.Infof("Waiting for report signatures, requestIDs: %s", requestIDs.String())

Expand Down Expand Up @@ -642,7 +646,7 @@ func (c *client) listenForRmnReportSignatures(
}
}

func (c *client) ensureEnoughSignedObservations(rmnSignedObservations []rmnSignedObservationWithMeta) error {
func (c *controller) ensureEnoughSignedObservations(rmnSignedObservations []rmnSignedObservationWithMeta) error {
counts := make(map[uint64]int)
for _, so := range rmnSignedObservations {
for _, lu := range so.SignedObservation.Observation.FixedDestLaneUpdates {
Expand All @@ -660,7 +664,7 @@ func (c *client) ensureEnoughSignedObservations(rmnSignedObservations []rmnSigne
return nil
}

func (c *client) getRmnNodeByID(nodeID NodeID) (RMNNodeInfo, bool) {
func (c *controller) getRmnNodeByID(nodeID NodeID) (RMNNodeInfo, bool) {
for _, node := range c.rmnCfg.Home.RmnNodes {
if node.ID == nodeID {
return node, true
Expand All @@ -669,19 +673,6 @@ func (c *client) getRmnNodeByID(nodeID NodeID) (RMNNodeInfo, bool) {
return RMNNodeInfo{}, false
}

type RawRmnClient interface {
Send(rmnNodeID NodeID, request []byte) error
// Recv returns a channel which can be used to listen on for
// responses by all RMN nodes. This is expected to be monitored
// by the plugin in order to get RMN responses.
Recv() <-chan RawRmnResponse
}

type RawRmnResponse struct {
RMNNodeID NodeID
Body []byte // pb
}

type updateRequestWithMeta struct {
data *rmnpb.FixedDestLaneUpdateRequest
rmnNodes mapset.Set[NodeID]
Expand All @@ -692,6 +683,44 @@ type rmnSignedObservationWithMeta struct {
RMNNodeID NodeID
}

func (c *controller) marshalAndSend(req *rmnpb.Request, nodeID NodeID) error {
reqBytes, err := proto.Marshal(req)
if err != nil {
return fmt.Errorf("proto marshal RMN request: %w", err)
}

if err := c.peerClient.Send(nodeID, reqBytes); err != nil {
return fmt.Errorf("send rmn request: %w", err)
}

return nil
}

// parseResponse parses the response from the RMN and returns the response.
// Validates that the response is expected and not a duplicate.
func (c *controller) parseResponse(
resp *PeerResponse, requestIDs, gotResponses mapset.Set[uint64]) (*rmnpb.Response, error) {

c.lggr.Infof("requests we are waiting for: %s", requestIDs.String())

responseTyp := &rmnpb.Response{}
err := proto.Unmarshal(resp.Body, responseTyp)
if err != nil {
return nil, fmt.Errorf("proto unmarshal: %w", err)
}

if !requestIDs.Contains(responseTyp.RequestId) {
return nil, fmt.Errorf(
"got an RMN response that we are not waiting for: %d (%s)", responseTyp.RequestId, requestIDs.String())
}

if gotResponses.Contains(responseTyp.RequestId) {
return nil, fmt.Errorf("got a duplicate RMN response: %d", responseTyp.RequestId)
}

return responseTyp, nil
}

func randomShuffle[T any](s []T) []T {
ret := make([]T, len(s))
for i, randIndex := range rand.Perm(len(s)) {
Expand Down
Loading

0 comments on commit 24682d5

Please sign in to comment.