Skip to content

Commit

Permalink
Address PR comments
Browse files Browse the repository at this point in the history
  • Loading branch information
rstout committed Aug 13, 2024
1 parent 009a7cf commit ffb55ad
Show file tree
Hide file tree
Showing 6 changed files with 74 additions and 64 deletions.
10 changes: 7 additions & 3 deletions commitrmnocb/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"github.com/smartcontractkit/chainlink-ccip/pluginconfig"
)

const maxReportTransmissionCheckAttempts = 5

// PluginFactoryConstructor implements common OCR3ReportingPluginClient and is used for initializing a plugin factory
// and a validation service.
type PluginFactoryConstructor struct{}
Expand Down Expand Up @@ -112,16 +114,18 @@ func (p *PluginFactory) NewReportingPlugin(config ocr3types.ReportingPluginConfi
config.OracleID,
oracleIDToP2PID,
pluginconfig.CommitPluginConfig{
DestChain: p.ocrConfig.Config.ChainSelector,
NewMsgScanBatchSize: merklemulti.MaxNumberTreeLeaves,
OffchainConfig: offchainConfig,
DestChain: p.ocrConfig.Config.ChainSelector,
NewMsgScanBatchSize: merklemulti.MaxNumberTreeLeaves,
MaxReportTransmissionCheckAttempts: maxReportTransmissionCheckAttempts,
OffchainConfig: offchainConfig,
},
ccipReader,
onChainTokenPricesReader,
p.commitCodec,
p.msgHasher,
p.lggr,
p.homeChainReader,
config,
), ocr3types.ReportingPluginInfo{
Name: "CCIPRoleCommit",
Limits: ocr3types.ReportingPluginLimits{
Expand Down
15 changes: 8 additions & 7 deletions commitrmnocb/observation.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (p *Plugin) Observation(
return observation.Encode()
}

// ObserveOffRampNextSeqNums Observe the next sequence numbers for each source chain from the OffRamp
// ObserveOffRampNextSeqNums observes the next sequence numbers for each source chain from the OffRamp
func (p *Plugin) ObserveOffRampNextSeqNums(ctx context.Context) []plugintypes.SeqNumChain {
supportsDestChain, err := p.supportsDestChain(p.nodeID)
if err != nil {
Expand Down Expand Up @@ -92,9 +92,9 @@ func (p *Plugin) ObserveOffRampNextSeqNums(ctx context.Context) []plugintypes.Se
return nil
}

// ObserveMerkleRoots Compute the merkle roots for the given sequence number ranges
// ObserveMerkleRoots computes the merkle roots for the given sequence number ranges
func (p *Plugin) ObserveMerkleRoots(ctx context.Context, ranges []ChainRange) []cciptypes.MerkleRootChain {
roots := make([]cciptypes.MerkleRootChain, 0)
var roots []cciptypes.MerkleRootChain
supportedChains, err := p.supportedChains(p.nodeID)
if err != nil {
p.lggr.Warnw("call to supportedChains failed", "err", err)
Expand Down Expand Up @@ -125,9 +125,9 @@ func (p *Plugin) ObserveMerkleRoots(ctx context.Context, ranges []ChainRange) []
return roots
}

// computeMerkleRoot Compute the merkle root of a list of messages
// computeMerkleRoot computes the merkle root of a list of messages
func (p *Plugin) computeMerkleRoot(ctx context.Context, msgs []cciptypes.Message) (cciptypes.Bytes32, error) {
hashes := make([][32]byte, 0)
var hashes [][32]byte
sort.Slice(msgs, func(i, j int) bool { return msgs[i].Header.SequenceNumber < msgs[j].Header.SequenceNumber })

for i, msg := range msgs {
Expand All @@ -142,8 +142,9 @@ func (p *Plugin) computeMerkleRoot(ctx context.Context, msgs []cciptypes.Message

msgHash, err := p.msgHasher.Hash(ctx, msg)
if err != nil {
p.lggr.Warnw("failed to hash message", "msg", msg, "err", err)
return cciptypes.Bytes32{}, err
msgID := hex.EncodeToString(msg.Header.MessageID[:])
p.lggr.Warnw("failed to hash message", "msg", msg, "msg_id", msgID, "err", err)
return cciptypes.Bytes32{}, fmt.Errorf("failed to hash message with id %s: %w", msgID, err)
}

hashes = append(hashes, msgHash)
Expand Down
53 changes: 22 additions & 31 deletions commitrmnocb/outcome.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,6 @@ func (p *Plugin) ReportRangesOutcome(
}
}

// We sort here so that Outcome serializes deterministically
sort.Slice(rangesToReport, func(i, j int) bool { return rangesToReport[i].ChainSel < rangesToReport[j].ChainSel })

outcome := Outcome{
OutcomeType: ReportIntervalsSelected,
RangesSelectedForReport: rangesToReport,
Expand All @@ -101,11 +98,6 @@ func (p *Plugin) buildReport(
) Outcome {
roots := maps.Values(consensusObservation.MerkleRoots)

// We sort here so that Outcome serializes deterministically
sort.Slice(roots, func(i, j int) bool {
return roots[i].ChainSel < roots[j].ChainSel
})

outcomeType := ReportGenerated
if len(roots) == 0 {
outcomeType = ReportEmpty
Expand All @@ -114,8 +106,8 @@ func (p *Plugin) buildReport(
outcome := Outcome{
OutcomeType: outcomeType,
RootsToReport: roots,
GasPrices: consensusObservation.GasPricesSortedArray(),
TokenPrices: consensusObservation.TokenPricesSortedArray(),
GasPrices: consensusObservation.GasPricesArray(),
TokenPrices: consensusObservation.TokenPricesArray(),
}

return outcome
Expand Down Expand Up @@ -198,15 +190,15 @@ func (p *Plugin) merkleRootConsensus(
consensus := make(map[cciptypes.ChainSelector]cciptypes.MerkleRootChain)

for chain, roots := range rootsByChain {
if f, exists := fChains[chain]; exists {
if fChain, exists := fChains[chain]; exists {
root, count := mostFrequentElem(roots)

if count <= f {
if count <= fChain {
// TODO: metrics
p.lggr.Warnf("failed to reach consensus on a merkle root for chain %d "+
"because no single merkle root was observed more than the expected %d times, found merkle root %d "+
"observed by only %d oracles, all observed merkle roots: %v",
chain, f, root, count, roots)
"because no single merkle root was observed more than the expected fChain (%d) times, found "+
"merkle root %d observed by only %d oracles, all observed merkle roots: %v",
chain, fChain, root, count, roots)
}

consensus[chain] = root
Expand All @@ -229,21 +221,21 @@ func (p *Plugin) onRampMaxSeqNumsConsensus(
consensus := make(map[cciptypes.ChainSelector]cciptypes.SeqNum)

for chain, onRampMaxSeqNums := range onRampMaxSeqNumsByChain {
if f, exists := fChains[chain]; exists {
if len(onRampMaxSeqNums) < 2*f+1 {
if fChain, exists := fChains[chain]; exists {
if len(onRampMaxSeqNums) < 2*fChain+1 {
// TODO: metrics
p.lggr.Warnf("could not reach consensus on onRampMaxSeqNums for chain %d "+
"because we did not receive more than 2f+1 observed sequence numbers, 2f+1: %d, "+
"because we did not receive more than 2fChain+1 observed sequence numbers, 2fChain+1: %d, "+
"len(onRampMaxSeqNums): %d, onRampMaxSeqNums: %v",
chain, 2*f+1, len(onRampMaxSeqNums), onRampMaxSeqNums)
chain, 2*fChain+1, len(onRampMaxSeqNums), onRampMaxSeqNums)
} else {
sort.Slice(onRampMaxSeqNums, func(i, j int) bool { return onRampMaxSeqNums[i] < onRampMaxSeqNums[j] })
consensus[chain] = onRampMaxSeqNums[f]
consensus[chain] = onRampMaxSeqNums[fChain]
}
} else {
// TODO: metrics
p.lggr.Warnf("could not reach consensus on onRampMaxSeqNums for chain %d "+
"because there was no consensus f value for this chain", chain)
"because there was no consensus fChain value for this chain", chain)
}
}

Expand All @@ -264,7 +256,7 @@ func (p *Plugin) offRampMaxSeqNumsConsensus(
if count <= fDestChain {
// TODO: metrics
p.lggr.Warnf("could not reach consensus on offRampMaxSeqNums for chain %d "+
"because we did not receive a sequence number that was observed by at least f (%d) oracles, "+
"because we did not receive a sequence number that was observed by at least fChain (%d) oracles, "+
"offRampMaxSeqNums: %v", chain, fDestChain, offRampMaxSeqNums)
} else {
consensus[chain] = seqNum
Expand All @@ -281,15 +273,14 @@ func (p *Plugin) fChainConsensus(fChainValues map[cciptypes.ChainSelector][]int)
consensus := make(map[cciptypes.ChainSelector]int)

for chain, fValues := range fChainValues {
f, _ := mostFrequentElem(fValues)
// TODO: uncomment when p.reportingCfg is added back
//if count < p.reportingCfg.F {
// // TODO: metrics
// p.lggr.Warnf("failed to reach consensus on fChain values for chain %d because no single f "+
// "value was observed more than the expected %d times, found f value %d observed by only %d oracles, "+
// "f values: %v",
// chain, p.reportingCfg.F, f, count, fValues)
//}
f, count := mostFrequentElem(fValues)
if count < p.reportingCfg.F {
// TODO: metrics
p.lggr.Warnf("failed to reach consensus on fChain values for chain %d because no single f "+
"value was observed more than the expected %d times, found f value %d observed by only %d oracles, "+
"f values: %v",
chain, p.reportingCfg.F, f, count, fValues)
}

consensus[chain] = f
}
Expand Down
12 changes: 5 additions & 7 deletions commitrmnocb/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,7 @@ type Plugin struct {
msgHasher cciptypes.MessageHasher
lggr logger.Logger
homeChain reader.HomeChain

// TODO: add back
// reportingCfg ocr3types.ReportingPluginConfig
reportingCfg ocr3types.ReportingPluginConfig
}

func NewPlugin(
Expand All @@ -48,7 +46,7 @@ func NewPlugin(
msgHasher cciptypes.MessageHasher,
lggr logger.Logger,
homeChain reader.HomeChain,
// reportingCfg ocr3types.ReportingPluginConfig,
reportingCfg ocr3types.ReportingPluginConfig,
) *Plugin {
readerSyncer := plugincommon.NewBackgroundReaderSyncer(
lggr,
Expand All @@ -61,7 +59,6 @@ func NewPlugin(
}

return &Plugin{
// reportingCfg: reportingCfg,
nodeID: nodeID,
oracleIDToP2pID: oracleIDToP2pID,
lggr: lggr,
Expand All @@ -72,6 +69,7 @@ func NewPlugin(
readerSyncer: readerSyncer,
reportCodec: reportCodec,
msgHasher: msgHasher,
reportingCfg: reportingCfg,
}
}

Expand Down Expand Up @@ -119,7 +117,7 @@ func (p *Plugin) knownSourceChainsSlice() []cciptypes.ChainSelector {
return slicelib.Filter(knownSourceChainsSlice, func(ch cciptypes.ChainSelector) bool { return ch != p.cfg.DestChain })
}

// Return the set of chains that the given Oracle is configured to access
// Returns the set of chains that the given Oracle is configured to access
func (p *Plugin) supportedChains(oracleID commontypes.OracleID) (mapset.Set[cciptypes.ChainSelector], error) {
p2pID, exists := p.oracleIDToP2pID[oracleID]
if !exists {
Expand All @@ -134,7 +132,7 @@ func (p *Plugin) supportedChains(oracleID commontypes.OracleID) (mapset.Set[ccip
return supportedChains, nil
}

// supportsDestChain Returns true if the given oracle supports the dest chain, returns false otherwise
// supportsDestChain returns true if the given oracle supports the dest chain, returns false otherwise
func (p *Plugin) supportsDestChain(oracle commontypes.OracleID) (bool, error) {
destChainConfig, err := p.homeChain.GetChainConfig(p.cfg.DestChain)
if err != nil {
Expand Down
3 changes: 2 additions & 1 deletion commitrmnocb/report.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package commitrmnocb

import (
"context"
"encoding/hex"
"fmt"

"github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3types"
Expand All @@ -16,7 +17,7 @@ func (p *Plugin) Reports(seqNr uint64, outcomeBytes ocr3types.Outcome) ([]ocr3ty
if err != nil {
// TODO: metrics
p.lggr.Errorw("failed to decode Outcome", "outcomeBytes", outcomeBytes, "err", err)
return nil, fmt.Errorf("failed to decode Outcome: %w", err)
return nil, fmt.Errorf("failed to decode Outcome (%s): %w", hex.EncodeToString(outcomeBytes), err)
}

if outcome.OutcomeType != ReportGenerated {
Expand Down
45 changes: 30 additions & 15 deletions commitrmnocb/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,38 +157,30 @@ type ConsensusObservation struct {
FChain map[cciptypes.ChainSelector]int
}

// GasPricesSortedArray returns a sorted list of gas prices
func (co ConsensusObservation) GasPricesSortedArray() []cciptypes.GasPriceChain {
// GasPricesArray returns a list of gas prices
func (co ConsensusObservation) GasPricesArray() []cciptypes.GasPriceChain {
gasPrices := make([]cciptypes.GasPriceChain, 0, len(co.GasPrices))
for chain, gasPrice := range co.GasPrices {
gasPrices = append(gasPrices, cciptypes.NewGasPriceChain(gasPrice.Int, chain))
}

sort.Slice(gasPrices, func(i, j int) bool {
return gasPrices[i].ChainSel < gasPrices[j].ChainSel
})

return gasPrices
}

// TokenPricesSortedArray returns a sorted list of token prices
func (co ConsensusObservation) TokenPricesSortedArray() []cciptypes.TokenPrice {
// TokenPricesArray returns a list of token prices
func (co ConsensusObservation) TokenPricesArray() []cciptypes.TokenPrice {
tokenPrices := make([]cciptypes.TokenPrice, 0, len(co.TokenPrices))
for tokenID, tokenPrice := range co.TokenPrices {
tokenPrices = append(tokenPrices, cciptypes.NewTokenPrice(tokenID, tokenPrice.Int))
}

sort.Slice(tokenPrices, func(i, j int) bool {
return tokenPrices[i].TokenID < tokenPrices[j].TokenID
})

return tokenPrices
}

type OutcomeType int

const (
ReportIntervalsSelected OutcomeType = iota
ReportIntervalsSelected OutcomeType = iota + 1
ReportGenerated
ReportEmpty
ReportInFlight
Expand All @@ -206,8 +198,31 @@ type Outcome struct {
ReportTransmissionCheckAttempts uint `json:"reportTransmissionCheckAttempts"`
}

// Encode TODO: sort all lists here to ensure deterministic serialization
// Sort all fields of the given Outcome
func (o Outcome) sort() {
sort.Slice(o.RangesSelectedForReport, func(i, j int) bool {
return o.RangesSelectedForReport[i].ChainSel < o.RangesSelectedForReport[j].ChainSel
})
sort.Slice(o.RootsToReport, func(i, j int) bool {
return o.RootsToReport[i].ChainSel < o.RootsToReport[j].ChainSel
})
sort.Slice(o.OffRampNextSeqNums, func(i, j int) bool {
return o.OffRampNextSeqNums[i].ChainSel < o.OffRampNextSeqNums[j].ChainSel
})
sort.Slice(o.TokenPrices, func(i, j int) bool {
return o.TokenPrices[i].TokenID < o.TokenPrices[j].TokenID
})
sort.Slice(o.GasPrices, func(i, j int) bool {
return o.GasPrices[i].ChainSel < o.GasPrices[j].ChainSel
})
}

// Encode encodes an Outcome deterministically
func (o Outcome) Encode() ([]byte, error) {

// Sort all lists to ensure deterministic serialization
o.sort()

encodedOutcome, err := json.Marshal(o)
if err != nil {
return nil, fmt.Errorf("failed to encode Outcome: %w", err)
Expand Down Expand Up @@ -244,7 +259,7 @@ func (o Outcome) NextState() CommitPluginState {
type CommitPluginState int

const (
SelectingRangesForReport CommitPluginState = iota
SelectingRangesForReport CommitPluginState = iota + 1
BuildingReport
WaitingForReportTransmission
)
Expand Down

0 comments on commit ffb55ad

Please sign in to comment.