diff --git a/commitrmnocb/factory.go b/commitrmnocb/factory.go index e9a7b319..6fe4996b 100644 --- a/commitrmnocb/factory.go +++ b/commitrmnocb/factory.go @@ -24,6 +24,7 @@ import ( ) const maxReportTransmissionCheckAttempts = 5 +const maxQueryLength = 1024 * 1024 // 1MB // PluginFactoryConstructor implements common OCR3ReportingPluginClient and is used for initializing a plugin factory // and a validation service. @@ -129,10 +130,10 @@ func (p *PluginFactory) NewReportingPlugin(config ocr3types.ReportingPluginConfi ), ocr3types.ReportingPluginInfo{ Name: "CCIPRoleCommit", Limits: ocr3types.ReportingPluginLimits{ - MaxQueryLength: 1024 * 1024, // 1MB - MaxObservationLength: 20_000, // 20kB - MaxOutcomeLength: 10_000, // 10kB - MaxReportLength: 10_000, // 10kB + MaxQueryLength: maxQueryLength, + MaxObservationLength: 20_000, // 20kB + MaxOutcomeLength: 10_000, // 10kB + MaxReportLength: 10_000, // 10kB MaxReportCount: 10, }, }, nil diff --git a/commitrmnocb/observation.go b/commitrmnocb/observation.go index 3b91a9f6..6068d54e 100644 --- a/commitrmnocb/observation.go +++ b/commitrmnocb/observation.go @@ -34,6 +34,10 @@ func (p *Plugin) Observation( case SelectingRangesForReport: offRampNextSeqNums := p.observer.ObserveOffRampNextSeqNums(ctx) observation = Observation{ + // TODO: observe OnRamp max seq nums. The use of offRampNextSeqNums here effectively disables batching, + // e.g. the ranges selected for each chain will be [x, x] (e.g. [46, 46]), which means reports will only + // contain one message per chain. Querying the OnRamp contract requires changes to reader.CCIP, which will + // need to be done in a future change. OnRampMaxSeqNums: offRampNextSeqNums, OffRampNextSeqNums: offRampNextSeqNums, FChain: p.observer.ObserveFChain(), @@ -54,7 +58,7 @@ func (p *Plugin) Observation( } default: - p.lggr.Warnw("Unexpected state", "state", nextState) + p.lggr.Errorw("Unexpected state", "state", nextState) return observation.Encode() } @@ -93,33 +97,33 @@ func (o ObserverImpl) ObserveOffRampNextSeqNums(ctx context.Context) []plugintyp return nil } - if supportsDestChain { - sourceChains, err := o.chainSupport.KnownSourceChainsSlice() - if err != nil { - o.lggr.Warnw("call to KnownSourceChainsSlice failed", "err", err) - return nil - } - offRampNextSeqNums, err := o.ccipReader.NextSeqNum(ctx, sourceChains) - if err != nil { - o.lggr.Warnw("call to NextSeqNum failed", "err", err) - return nil - } + if !supportsDestChain { + return nil + } - if len(offRampNextSeqNums) != len(sourceChains) { - o.lggr.Warnf("call to NextSeqNum returned unexpected number of seq nums, got %d, expected %d", - len(offRampNextSeqNums), len(sourceChains)) - return nil - } + sourceChains, err := o.chainSupport.KnownSourceChainsSlice() + if err != nil { + o.lggr.Warnw("call to KnownSourceChainsSlice failed", "err", err) + return nil + } + offRampNextSeqNums, err := o.ccipReader.NextSeqNum(ctx, sourceChains) + if err != nil { + o.lggr.Warnw("call to NextSeqNum failed", "err", err) + return nil + } - result := make([]plugintypes.SeqNumChain, len(sourceChains)) - for i := range sourceChains { - result[i] = plugintypes.SeqNumChain{ChainSel: sourceChains[i], SeqNum: offRampNextSeqNums[i]} - } + if len(offRampNextSeqNums) != len(sourceChains) { + o.lggr.Errorf("call to NextSeqNum returned unexpected number of seq nums, got %d, expected %d", + len(offRampNextSeqNums), len(sourceChains)) + return nil + } - return result + result := make([]plugintypes.SeqNumChain, len(sourceChains)) + for i := range sourceChains { + result[i] = plugintypes.SeqNumChain{ChainSel: sourceChains[i], SeqNum: offRampNextSeqNums[i]} } - return nil + return result } // ObserveMerkleRoots computes the merkle roots for the given sequence number ranges @@ -139,19 +143,19 @@ func (o ObserverImpl) ObserveMerkleRoots( msgs, err := o.ccipReader.MsgsBetweenSeqNums(ctx, chainRange.ChainSel, chainRange.SeqNumRange) if err != nil { o.lggr.Warnw("call to MsgsBetweenSeqNums failed", "err", err) - } else { - root, err := o.computeMerkleRoot(ctx, msgs) - if err != nil { - o.lggr.Warnw("call to computeMerkleRoot failed", "err", err) - } else { - merkleRoot := cciptypes.MerkleRootChain{ - ChainSel: chainRange.ChainSel, - SeqNumsRange: chainRange.SeqNumRange, - MerkleRoot: root, - } - roots = append(roots, merkleRoot) - } + continue + } + root, err := o.computeMerkleRoot(ctx, msgs) + if err != nil { + o.lggr.Warnw("call to computeMerkleRoot failed", "err", err) + continue + } + merkleRoot := cciptypes.MerkleRootChain{ + ChainSel: chainRange.ChainSel, + SeqNumsRange: chainRange.SeqNumRange, + MerkleRoot: root, } + roots = append(roots, merkleRoot) } } @@ -183,6 +187,7 @@ func (o ObserverImpl) computeMerkleRoot(ctx context.Context, msgs []cciptypes.Me hashes = append(hashes, msgHash) } + // TODO: Do not hard code the hash function, it should be derived from the message hasher tree, err := merklemulti.NewTree(hashutil.NewKeccak(), hashes) if err != nil { return [32]byte{}, fmt.Errorf("failed to construct merkle tree from %d leaves: %w", len(hashes), err) diff --git a/commitrmnocb/observation_test.go b/commitrmnocb/observation_test.go index a0d41537..50ec5b5c 100644 --- a/commitrmnocb/observation_test.go +++ b/commitrmnocb/observation_test.go @@ -239,7 +239,7 @@ func Test_ObserveOffRampNextSeqNums(t *testing.T) { o := ObserverImpl{ nodeID: nodeID, lggr: logger.Test(t), - msgHasher: NewMessageHasher(), + msgHasher: mocks.NewMessageHasher(), ccipReader: reader, chainSupport: chainSupport, } @@ -449,7 +449,7 @@ func Test_ObserveMerkleRoots(t *testing.T) { o := ObserverImpl{ nodeID: nodeID, lggr: logger.Test(t), - msgHasher: NewMessageHasher(), + msgHasher: mocks.NewMessageHasher(), ccipReader: reader, chainSupport: chainSupport, } @@ -481,7 +481,7 @@ func Test_computeMerkleRoot(t *testing.T) { MessageID: mustNewMessageID("0x1a"), SequenceNumber: 112, }}, - messageHasher: NewMessageHasher(), + messageHasher: mocks.NewMessageHasher(), expMerkleRoot: "1a00000000000000000000000000000000000000000000000000000000000000", expErr: false, }, @@ -500,7 +500,7 @@ func Test_computeMerkleRoot(t *testing.T) { MessageID: mustNewMessageID("0x87"), SequenceNumber: 114, }}, - messageHasher: NewMessageHasher(), + messageHasher: mocks.NewMessageHasher(), expMerkleRoot: "94c7e711e6f2acf41dca598ced55b6925e55aaed83520dc5ea6cbc054344564b", expErr: false, }, @@ -515,14 +515,14 @@ func Test_computeMerkleRoot(t *testing.T) { MessageID: mustNewMessageID("0x12"), SequenceNumber: 36, }}, - messageHasher: NewMessageHasher(), + messageHasher: mocks.NewMessageHasher(), expMerkleRoot: "", expErr: true, }, { name: "Empty messages", messageHeaders: []cciptypes.RampMessageHeader{}, - messageHasher: NewMessageHasher(), + messageHasher: mocks.NewMessageHasher(), expMerkleRoot: "", expErr: true, }, @@ -580,19 +580,6 @@ func mustNewMessageID(msgIDHex string) cciptypes.Bytes32 { return msgID } -type MessageHasher struct{} - -func NewMessageHasher() *MessageHasher { - return &MessageHasher{} -} - -func (m *MessageHasher) Hash(ctx context.Context, msg cciptypes.Message) (cciptypes.Bytes32, error) { - // simply return the msg id as bytes32 - var b32 [32]byte - copy(b32[:], msg.Header.MessageID[:]) - return b32, nil -} - type BadMessageHasher struct{} func NewBadMessageHasher() *BadMessageHasher { diff --git a/commitrmnocb/outcome.go b/commitrmnocb/outcome.go index 95c09628..bea98f47 100644 --- a/commitrmnocb/outcome.go +++ b/commitrmnocb/outcome.go @@ -9,6 +9,7 @@ import ( "github.com/smartcontractkit/libocr/offchainreporting2plus/types" + "github.com/smartcontractkit/chainlink-common/pkg/logger" cciptypes "github.com/smartcontractkit/chainlink-common/pkg/types/ccipocr3" "github.com/smartcontractkit/chainlink-ccip/plugintypes" @@ -24,7 +25,7 @@ func (p *Plugin) Outcome( previousOutcome, nextState := p.decodeOutcome(outCtx.PreviousOutcome) commitQuery := Query{} - consensusObservation, err := p.getConsensusObservation(aos) + consensusObservation, err := getConsensusObservation(p.lggr, p.reportingCfg.F, p.cfg.DestChain, aos) if err != nil { return ocr3types.Outcome{}, err } @@ -33,13 +34,14 @@ func (p *Plugin) Outcome( switch nextState { case SelectingRangesForReport: - outcome = p.ReportRangesOutcome(commitQuery, consensusObservation) + outcome = ReportRangesOutcome(commitQuery, consensusObservation) case BuildingReport: - outcome = p.buildReport(commitQuery, consensusObservation) + outcome = buildReport(commitQuery, consensusObservation) case WaitingForReportTransmission: - outcome = p.checkForReportTransmission(previousOutcome, consensusObservation) + outcome = checkForReportTransmission( + p.lggr, p.cfg.MaxReportTransmissionCheckAttempts, previousOutcome, consensusObservation) default: p.lggr.Warnw("Unexpected state in Outcome", "state", nextState) @@ -51,7 +53,8 @@ func (p *Plugin) Outcome( } // ReportRangesOutcome determines the sequence number ranges for each chain to build a report from in the next round -func (p *Plugin) ReportRangesOutcome( +// TODO: ensure each range is below a limit +func ReportRangesOutcome( query Query, consensusObservation ConsensusObservation, ) Outcome { @@ -94,7 +97,7 @@ func (p *Plugin) ReportRangesOutcome( // Given a set of observed merkle roots, gas prices and token prices, and roots from RMN, construct a report // to transmit on-chain -func (p *Plugin) buildReport( +func buildReport( _ Query, consensusObservation ConsensusObservation, ) Outcome { @@ -122,7 +125,9 @@ func (p *Plugin) buildReport( // ReportTransmissionFailed to signify we stop checking for updates and start a new report generation phase. If no // update is detected, and we haven't exhausted our check attempts, output ReportInFlight to signify that we should // check again next round. -func (p *Plugin) checkForReportTransmission( +func checkForReportTransmission( + lggr logger.Logger, + maxReportTransmissionCheckAttempts uint, previousOutcome Outcome, consensusObservation ConsensusObservation, ) Outcome { @@ -143,8 +148,8 @@ func (p *Plugin) checkForReportTransmission( } } - if previousOutcome.ReportTransmissionCheckAttempts+1 >= p.cfg.MaxReportTransmissionCheckAttempts { - p.lggr.Warnw("Failed to detect report transmission") + if previousOutcome.ReportTransmissionCheckAttempts+1 >= maxReportTransmissionCheckAttempts { + lggr.Warnw("Failed to detect report transmission") return Outcome{ OutcomeType: ReportTransmissionFailed, } @@ -158,24 +163,29 @@ func (p *Plugin) checkForReportTransmission( } // getConsensusObservation Combine the list of observations into a single consensus observation -func (p *Plugin) getConsensusObservation(aos []types.AttributedObservation) (ConsensusObservation, error) { +func getConsensusObservation( + lggr logger.Logger, + F int, + destChain cciptypes.ChainSelector, + aos []types.AttributedObservation, +) (ConsensusObservation, error) { aggObs := aggregateObservations(aos) - fChains := p.fChainConsensus(aggObs.FChain) + fChains := fChainConsensus(lggr, F, aggObs.FChain) - fDestChain, exists := fChains[p.cfg.DestChain] + fDestChain, exists := fChains[destChain] if !exists { return ConsensusObservation{}, - fmt.Errorf("no consensus value for fDestChain, destChain: %d", p.cfg.DestChain) + fmt.Errorf("no consensus value for fDestChain, destChain: %d", destChain) } consensusObs := ConsensusObservation{ - MerkleRoots: p.merkleRootConsensus(aggObs.MerkleRoots, fChains), + MerkleRoots: merkleRootConsensus(lggr, aggObs.MerkleRoots, fChains), // TODO: use consensus of observed gas prices GasPrices: make(map[cciptypes.ChainSelector]cciptypes.BigInt), // TODO: use consensus of observed token prices TokenPrices: make(map[types.Account]cciptypes.BigInt), - OnRampMaxSeqNums: p.onRampMaxSeqNumsConsensus(aggObs.OnRampMaxSeqNums, fChains), - OffRampNextSeqNums: p.offRampMaxSeqNumsConsensus(aggObs.OffRampNextSeqNums, fDestChain), + OnRampMaxSeqNums: onRampMaxSeqNumsConsensus(lggr, aggObs.OnRampMaxSeqNums, fChains), + OffRampNextSeqNums: offRampMaxSeqNumsConsensus(lggr, aggObs.OffRampNextSeqNums, fDestChain), FChain: fChains, } @@ -185,7 +195,8 @@ func (p *Plugin) getConsensusObservation(aos []types.AttributedObservation) (Con // Given a mapping from chains to a list of merkle roots, return a mapping from chains to a single consensus merkle // root. The consensus merkle root for a given chain is the merkle root with the most observations that was observed at // least fChain times. -func (p *Plugin) merkleRootConsensus( +func merkleRootConsensus( + lggr logger.Logger, rootsByChain map[cciptypes.ChainSelector][]cciptypes.MerkleRootChain, fChains map[cciptypes.ChainSelector]int, ) map[cciptypes.ChainSelector]cciptypes.MerkleRootChain { @@ -197,16 +208,16 @@ func (p *Plugin) merkleRootConsensus( if count <= fChain { // TODO: metrics - p.lggr.Warnf("failed to reach consensus on a merkle root for chain %d "+ + lggr.Warnf("failed to reach consensus on a merkle root for chain %d "+ "because no single merkle root was observed more than the expected fChain (%d) times, found "+ "merkle root %d observed by only %d oracles, all observed merkle roots: %v", chain, fChain, root, count, roots) + } else { + consensus[chain] = root } - - consensus[chain] = root } else { // TODO: metrics - p.lggr.Warnf("merkleRootConsensus: fChain not found for chain %d", chain) + lggr.Warnf("merkleRootConsensus: fChain not found for chain %d", chain) } } @@ -216,7 +227,8 @@ func (p *Plugin) merkleRootConsensus( // Given a mapping from chains to a list of max seq nums on their corresponding OnRamp, return a mapping from chains // to a single max seq num. The consensus max seq num for a given chain is the f'th lowest max seq num if the number // of max seq num observations is greater or equal than 2f+1, where f is the FChain of the corresponding source chain. -func (p *Plugin) onRampMaxSeqNumsConsensus( +func onRampMaxSeqNumsConsensus( + lggr logger.Logger, onRampMaxSeqNumsByChain map[cciptypes.ChainSelector][]cciptypes.SeqNum, fChains map[cciptypes.ChainSelector]int, ) map[cciptypes.ChainSelector]cciptypes.SeqNum { @@ -226,7 +238,7 @@ func (p *Plugin) onRampMaxSeqNumsConsensus( if fChain, exists := fChains[chain]; exists { if len(onRampMaxSeqNums) < 2*fChain+1 { // TODO: metrics - p.lggr.Warnf("could not reach consensus on onRampMaxSeqNums for chain %d "+ + lggr.Warnf("could not reach consensus on onRampMaxSeqNums for chain %d "+ "because we did not receive more than 2fChain+1 observed sequence numbers, 2fChain+1: %d, "+ "len(onRampMaxSeqNums): %d, onRampMaxSeqNums: %v", chain, 2*fChain+1, len(onRampMaxSeqNums), onRampMaxSeqNums) @@ -236,7 +248,7 @@ func (p *Plugin) onRampMaxSeqNumsConsensus( } } else { // TODO: metrics - p.lggr.Warnf("could not reach consensus on onRampMaxSeqNums for chain %d "+ + lggr.Warnf("could not reach consensus on onRampMaxSeqNums for chain %d "+ "because there was no consensus fChain value for this chain", chain) } } @@ -247,7 +259,8 @@ func (p *Plugin) onRampMaxSeqNumsConsensus( // Given a mapping from chains to a list of max seq nums on the OffRamp, return a mapping from chains // to a single max seq num. The consensus max seq num for a given chain is the max seq num with the most observations // that was observed at least f times, where f is the FChain of the dest chain. -func (p *Plugin) offRampMaxSeqNumsConsensus( +func offRampMaxSeqNumsConsensus( + lggr logger.Logger, offRampMaxSeqNumsByChain map[cciptypes.ChainSelector][]cciptypes.SeqNum, fDestChain int, ) map[cciptypes.ChainSelector]cciptypes.SeqNum { @@ -257,7 +270,7 @@ func (p *Plugin) offRampMaxSeqNumsConsensus( seqNum, count := mostFrequentElem(offRampMaxSeqNums) if count <= fDestChain { // TODO: metrics - p.lggr.Warnf("could not reach consensus on offRampMaxSeqNums for chain %d "+ + lggr.Warnf("could not reach consensus on offRampMaxSeqNums for chain %d "+ "because we did not receive a sequence number that was observed by at least fChain (%d) oracles, "+ "offRampMaxSeqNums: %v", chain, fDestChain, offRampMaxSeqNums) } else { @@ -271,20 +284,24 @@ func (p *Plugin) offRampMaxSeqNumsConsensus( // Given a mapping from chains to a list of FChain values for each chain, return a mapping from chains // to a single FChain. The consensus FChain for a given chain is the FChain with the most observations // that was observed at least f times, where f is the F of the DON (p.reportingCfg.F). -func (p *Plugin) fChainConsensus(fChainValues map[cciptypes.ChainSelector][]int) map[cciptypes.ChainSelector]int { +func fChainConsensus( + lggr logger.Logger, + F int, + fChainValues map[cciptypes.ChainSelector][]int, +) map[cciptypes.ChainSelector]int { consensus := make(map[cciptypes.ChainSelector]int) for chain, fValues := range fChainValues { - f, count := mostFrequentElem(fValues) - if count < p.reportingCfg.F { + fChain, count := mostFrequentElem(fValues) + if count < F { // TODO: metrics - p.lggr.Warnf("failed to reach consensus on fChain values for chain %d because no single f "+ - "value was observed more than the expected %d times, found f value %d observed by only %d oracles, "+ - "f values: %v", - chain, p.reportingCfg.F, f, count, fValues) + lggr.Warnf("failed to reach consensus on fChain values for chain %d because no single fChain "+ + "value was observed more than the expected %d times, found fChain value %d observed by only %d oracles, "+ + "fChain values: %v", + chain, F, fChain, count, fValues) } - consensus[chain] = f + consensus[chain] = fChain } return consensus diff --git a/commitrmnocb/plugin.go b/commitrmnocb/plugin.go index bccd8f5a..0f234008 100644 --- a/commitrmnocb/plugin.go +++ b/commitrmnocb/plugin.go @@ -105,12 +105,12 @@ func (p *Plugin) Close() error { return nil } -func (p *Plugin) decodeOutcome(outcome ocr3types.Outcome) (Outcome, CommitPluginState) { +func (p *Plugin) decodeOutcome(outcome ocr3types.Outcome) (Outcome, State) { if len(outcome) == 0 { return Outcome{}, SelectingRangesForReport } - decodedOutcome, err := DecodeCommitPluginOutcome(outcome) + decodedOutcome, err := DecodeOutcome(outcome) if err != nil { p.lggr.Errorw("Failed to decode Outcome", "outcome", outcome, "err", err) return Outcome{}, SelectingRangesForReport diff --git a/commitrmnocb/report.go b/commitrmnocb/report.go index b02baa95..4c424fb0 100644 --- a/commitrmnocb/report.go +++ b/commitrmnocb/report.go @@ -13,13 +13,14 @@ import ( ) func (p *Plugin) Reports(seqNr uint64, outcomeBytes ocr3types.Outcome) ([]ocr3types.ReportWithInfo[[]byte], error) { - outcome, err := DecodeCommitPluginOutcome(outcomeBytes) + outcome, err := DecodeOutcome(outcomeBytes) if err != nil { // TODO: metrics p.lggr.Errorw("failed to decode Outcome", "outcomeBytes", outcomeBytes, "err", err) return nil, fmt.Errorf("failed to decode Outcome (%s): %w", hex.EncodeToString(outcomeBytes), err) } + // Reports are only generated from "ReportGenerated" outcomes if outcome.OutcomeType != ReportGenerated { return []ocr3types.ReportWithInfo[[]byte]{}, nil } diff --git a/commitrmnocb/types.go b/commitrmnocb/types.go index 80ea5a99..d23fb225 100644 --- a/commitrmnocb/types.go +++ b/commitrmnocb/types.go @@ -231,13 +231,13 @@ func (o Outcome) Encode() ([]byte, error) { return encodedOutcome, nil } -func DecodeCommitPluginOutcome(b []byte) (Outcome, error) { +func DecodeOutcome(b []byte) (Outcome, error) { o := Outcome{} err := json.Unmarshal(b, &o) return o, err } -func (o Outcome) NextState() CommitPluginState { +func (o Outcome) NextState() State { switch o.OutcomeType { case ReportIntervalsSelected: return BuildingReport @@ -256,10 +256,10 @@ func (o Outcome) NextState() CommitPluginState { } } -type CommitPluginState int +type State int const ( - SelectingRangesForReport CommitPluginState = iota + 1 + SelectingRangesForReport State = iota + 1 BuildingReport WaitingForReportTransmission )