Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

commit plugin - support batch sizes greater than one #131

Merged
merged 9 commits into from
Sep 19, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 58 additions & 6 deletions commit/merkleroot/observation.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@ import (
"sync"
"time"

mapset "github.com/deckarep/golang-set/v2"
chainsel "github.com/smartcontractkit/chain-selectors"
"github.com/smartcontractkit/libocr/commontypes"
"github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3types"
"github.com/smartcontractkit/libocr/offchainreporting2plus/types"
"golang.org/x/sync/errgroup"

"github.com/smartcontractkit/chainlink-common/pkg/hashutil"
"github.com/smartcontractkit/chainlink-common/pkg/logger"
Expand Down Expand Up @@ -114,12 +116,10 @@ func (w *Processor) getObservation(ctx context.Context, q Query, previousOutcome
switch nextState {
case SelectingRangesForReport:
offRampNextSeqNums := w.observer.ObserveOffRampNextSeqNums(ctx)
onRampLatestSeqNums := w.observer.ObserveLatestOnRampSeqNums(ctx, w.cfg.DestChain)

return Observation{
// TODO: observe OnRamp max seq nums. The use of offRampNextSeqNums here effectively disables batching,
// e.g. the ranges selected for each chain will be [x, x] (e.g. [46, 46]), which means reports will only
// contain one message per chain. Querying the OnRamp contract requires changes to reader.CCIPReader,
// which will need to be done in a future change.
OnRampMaxSeqNums: offRampNextSeqNums,
OnRampMaxSeqNums: onRampLatestSeqNums,
OffRampNextSeqNums: offRampNextSeqNums,
FChain: w.observer.ObserveFChain(),
}, nextState
Expand All @@ -145,9 +145,12 @@ func (w *Processor) getObservation(ctx context.Context, q Query, previousOutcome
}

type Observer interface {
// ObserveOffRampNextSeqNums observes the next sequence numbers for each source chain from the OffRamp
// ObserveOffRampNextSeqNums observes the next OffRamp sequence numbers for each source chain
ObserveOffRampNextSeqNums(ctx context.Context) []plugintypes.SeqNumChain

// ObserveLatestOnRampSeqNums observes the latest OnRamp sequence numbers for each configured source chain.
ObserveLatestOnRampSeqNums(ctx context.Context, destChain cciptypes.ChainSelector) []plugintypes.SeqNumChain

// ObserveMerkleRoots computes the merkle roots for the given sequence number ranges
ObserveMerkleRoots(ctx context.Context, ranges []plugintypes.ChainRange) []cciptypes.MerkleRootChain

Expand Down Expand Up @@ -202,6 +205,55 @@ func (o ObserverImpl) ObserveOffRampNextSeqNums(ctx context.Context) []plugintyp
return result
}

// ObserveLatestOnRampSeqNums observes the latest onRamp sequence numbers for each configured source chain.
func (o ObserverImpl) ObserveLatestOnRampSeqNums(
ctx context.Context, destChain cciptypes.ChainSelector) []plugintypes.SeqNumChain {

allSourceChains, err := o.chainSupport.KnownSourceChainsSlice()
if err != nil {
o.lggr.Warnw("call to KnownSourceChainsSlice failed", "err", err)
return nil
}

supportedChains, err := o.chainSupport.SupportedChains(o.nodeID)
if err != nil {
o.lggr.Warnw("call to KnownSourceChainsSlice failed", "err", err)
return nil
}

sourceChains := mapset.NewSet(allSourceChains...).Intersect(supportedChains).ToSlice()
sort.Slice(sourceChains, func(i, j int) bool { return sourceChains[i] < sourceChains[j] })

latestOnRampSeqNums := make([]plugintypes.SeqNumChain, len(sourceChains))
eg := &errgroup.Group{}

for i, sourceChain := range sourceChains {
i, sourceChain := i, sourceChain
eg.Go(func() error {
nextOnRampSeqNum, err := o.ccipReader.GetExpectedNextSequenceNumber(ctx, sourceChain, destChain)
if err != nil {
return fmt.Errorf("failed to get expected next sequence number for source chain %d: %w", sourceChain, err)
}
if nextOnRampSeqNum == 0 {
return fmt.Errorf("expected next sequence number for source chain %d is 0", sourceChain)
}

latestOnRampSeqNums[i] = plugintypes.SeqNumChain{
ChainSel: sourceChain,
SeqNum: nextOnRampSeqNum - 1, // Latest is the next one minus one.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

latestOnRampSeqNum would be set to 0 for a new chain. Is that OK?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, in that case there won't be any range reported, since (onRampMax < offRampNext).

}
return nil
})
}

if err := eg.Wait(); err != nil {
o.lggr.Warnw("call to GetExpectedNextSequenceNumber failed", "err", err)
return nil
}

return latestOnRampSeqNums
}

// ObserveMerkleRoots computes the merkle roots for the given sequence number ranges
func (o ObserverImpl) ObserveMerkleRoots(
ctx context.Context,
Expand Down
11 changes: 6 additions & 5 deletions commit/merkleroot/observation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,10 @@ func Test_Observation(t *testing.T) {
},
}
offRampNextSeqNums := []plugintypes.SeqNumChain{
{
ChainSel: 456,
SeqNum: 9987,
},
{ChainSel: 456, SeqNum: 9987},
}
onRampLatestSeqNums := []plugintypes.SeqNumChain{
{ChainSel: 456, SeqNum: 9990},
}
fChain := map[cciptypes.ChainSelector]int{
872: 3,
Expand All @@ -58,11 +58,12 @@ func Test_Observation(t *testing.T) {
getObserver: func(t *testing.T) *merkleroot.MockObserver {
observer := merkleroot.NewMockObserver(t)
observer.EXPECT().ObserveOffRampNextSeqNums(mock.Anything).Once().Return(offRampNextSeqNums)
observer.EXPECT().ObserveLatestOnRampSeqNums(mock.Anything, mock.Anything).Return(onRampLatestSeqNums)
observer.EXPECT().ObserveFChain().Once().Return(fChain)
return observer
},
expObs: Observation{
OnRampMaxSeqNums: offRampNextSeqNums,
OnRampMaxSeqNums: onRampLatestSeqNums,
OffRampNextSeqNums: offRampNextSeqNums,
FChain: fChain,
},
Expand Down
22 changes: 19 additions & 3 deletions commit/merkleroot/outcome.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"github.com/smartcontractkit/chainlink-common/pkg/logger"
cciptypes "github.com/smartcontractkit/chainlink-common/pkg/types/ccipocr3"

"github.com/smartcontractkit/chainlink-ccip/internal/libs/cciptypeutil"

"github.com/smartcontractkit/chainlink-ccip/commit/merkleroot/rmn"
"github.com/smartcontractkit/chainlink-ccip/internal/plugincommon"
"github.com/smartcontractkit/chainlink-ccip/internal/plugintypes"
Expand Down Expand Up @@ -46,7 +48,7 @@ func (w *Processor) getOutcome(

switch nextState {
case SelectingRangesForReport:
return reportRangesOutcome(q, consensusObservation), nextState
return reportRangesOutcome(q, w.lggr, consensusObservation, w.cfg.BatchLimits), nextState
case BuildingReport:
if q.RetryRMNSignatures {
// We want to retry getting the RMN signatures on the exact same outcome we had before.
Expand All @@ -64,10 +66,11 @@ func (w *Processor) getOutcome(
}

// reportRangesOutcome determines the sequence number ranges for each chain to build a report from in the next round
// TODO: ensure each range is below a limit
func reportRangesOutcome(
_ Query,
lggr logger.Logger,
consensusObservation ConsensusObservation,
rangeLimitsPerSourceChain map[cciptypes.ChainSelector]uint64,
) Outcome {
rangesToReport := make([]plugintypes.ChainRange, 0)

Expand All @@ -82,11 +85,24 @@ func reportRangesOutcome(
}

if offRampNextSeqNum <= onRampMaxSeqNum {
rngLimit, ok := rangeLimitsPerSourceChain[chainSel]
if !ok {
rngLimit = DefaultSeqNumsBatchLimit
}

rng := cciptypes.NewSeqNumRange(offRampNextSeqNum, onRampMaxSeqNum)

chainRange := plugintypes.ChainRange{
ChainSel: chainSel,
SeqNumRange: [2]cciptypes.SeqNum{offRampNextSeqNum, onRampMaxSeqNum},
SeqNumRange: cciptypeutil.SeqNumRangeLimit(rng, rngLimit),
}
rangesToReport = append(rangesToReport, chainRange)

if wasTruncated := rng.End() != chainRange.SeqNumRange.End(); wasTruncated {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if wasTruncated := rng.End() != chainRange.SeqNumRange.End(); wasTruncated {
// check if the range was truncated.
if rng.End() != chainRange.SeqNumRange.End() {

lggr.Infof("Range for chain %d: %s (before truncate: %v)", chainSel, chainRange.SeqNumRange, rng)
} else {
lggr.Infof("Range for chain %d: %s", chainSel, chainRange.SeqNumRange)
}
}

offRampNextSeqNums = append(offRampNextSeqNums, plugintypes.SeqNumChain{
Expand Down
81 changes: 81 additions & 0 deletions commit/merkleroot/outcome_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
"github.com/stretchr/testify/require"

cciptypes "github.com/smartcontractkit/chainlink-common/pkg/types/ccipocr3"

"github.com/smartcontractkit/chainlink-ccip/plugintypes"
)

func Test_buildReport(t *testing.T) {
Expand Down Expand Up @@ -37,3 +39,82 @@
}
})
}

func Test_reportRangesOutcome(t *testing.T) {
lggr := logger.Test(t)

testCases := []struct {
name string
consensusObservation ConsensusObservation
rangeLimitsPerSourceChain map[cciptypes.ChainSelector]uint64
expectedOutcome Outcome
}{
{
name: "base empty outcome",
expectedOutcome: Outcome{
OutcomeType: ReportIntervalsSelected,
RangesSelectedForReport: []plugintypes.ChainRange{},

Check failure on line 56 in commit/merkleroot/outcome_test.go

View workflow job for this annotation

GitHub Actions / build-lint-test (1.22)

undefined: plugintypes.ChainRange
OffRampNextSeqNums: []plugintypes.SeqNumChain{},

Check failure on line 57 in commit/merkleroot/outcome_test.go

View workflow job for this annotation

GitHub Actions / build-lint-test (1.22)

undefined: plugintypes.SeqNumChain
},
},
{
name: "simple scenario with one chain",
consensusObservation: ConsensusObservation{
OnRampMaxSeqNums: map[cciptypes.ChainSelector]cciptypes.SeqNum{
1: 20,
},
OffRampNextSeqNums: map[cciptypes.ChainSelector]cciptypes.SeqNum{
1: 18, // off ramp next is 18, on ramp max is 20 so new msgs are: [18, 19, 20]
},
},
rangeLimitsPerSourceChain: map[cciptypes.ChainSelector]uint64{},
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
rangeLimitsPerSourceChain: map[cciptypes.ChainSelector]uint64{},
rangeLimitsPerSourceChain: map[cciptypes.ChainSelector]uint64{}, // default limits

expectedOutcome: Outcome{
OutcomeType: ReportIntervalsSelected,
RangesSelectedForReport: []plugintypes.ChainRange{

Check failure on line 73 in commit/merkleroot/outcome_test.go

View workflow job for this annotation

GitHub Actions / build-lint-test (1.22)

undefined: plugintypes.ChainRange
{ChainSel: 1, SeqNumRange: cciptypes.NewSeqNumRange(18, 20)},
},
OffRampNextSeqNums: []plugintypes.SeqNumChain{

Check failure on line 76 in commit/merkleroot/outcome_test.go

View workflow job for this annotation

GitHub Actions / build-lint-test (1.22)

undefined: plugintypes.SeqNumChain
{ChainSel: 1, SeqNum: 18},
},
},
},
{
name: "simple scenario with one chain",
consensusObservation: ConsensusObservation{
OnRampMaxSeqNums: map[cciptypes.ChainSelector]cciptypes.SeqNum{
1: 20,
2: 1000,
3: 10000,
},
OffRampNextSeqNums: map[cciptypes.ChainSelector]cciptypes.SeqNum{
1: 18, // off ramp next is 18, on ramp max is 20 so new msgs are: [18, 19, 20]
2: 995, // off ramp next is 995, on ramp max is 1000 so new msgs are: [995, 996, 997, 998, 999, 1000]
3: 500, // off ramp next is 500, we have new messages up to 10000 (default limit applied)
},
},
rangeLimitsPerSourceChain: map[cciptypes.ChainSelector]uint64{
2: 5, // limit to 5 messages, [1000] should be excluded
},
expectedOutcome: Outcome{
OutcomeType: ReportIntervalsSelected,
RangesSelectedForReport: []plugintypes.ChainRange{

Check failure on line 100 in commit/merkleroot/outcome_test.go

View workflow job for this annotation

GitHub Actions / build-lint-test (1.22)

undefined: plugintypes.ChainRange
{ChainSel: 1, SeqNumRange: cciptypes.NewSeqNumRange(18, 20)},
{ChainSel: 2, SeqNumRange: cciptypes.NewSeqNumRange(995, 999)},
{ChainSel: 3, SeqNumRange: cciptypes.NewSeqNumRange(500, 755)},
},
OffRampNextSeqNums: []plugintypes.SeqNumChain{

Check failure on line 105 in commit/merkleroot/outcome_test.go

View workflow job for this annotation

GitHub Actions / build-lint-test (1.22)

undefined: plugintypes.SeqNumChain (typecheck)
{ChainSel: 1, SeqNum: 18},
{ChainSel: 2, SeqNum: 995},
{ChainSel: 3, SeqNum: 500},
},
},
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
outc := reportRangesOutcome(Query{}, lggr, tc.consensusObservation, tc.rangeLimitsPerSourceChain)
require.Equal(t, tc.expectedOutcome, outc)
})
}
}
4 changes: 4 additions & 0 deletions commit/merkleroot/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ import (
"github.com/smartcontractkit/chainlink-ccip/pluginconfig"
)

// DefaultSeqNumsBatchLimit is the default number of max new messages to scan, we use this value when
// the config is not set for a specific chain.
const DefaultSeqNumsBatchLimit = 256
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MaxNumberTreeLeaves

Suggested change
const DefaultSeqNumsBatchLimit = 256
const DefaultSeqNumsBatchLimit = merklemulti.MaxNumberTreeLeaves

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should not be here, this should be in offchain config

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And make sure to prefix it with EVM/have EVM somewhere in the name

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't this the default unless there's an override? Or you're saying there should be no default and the system needs to crash if the config is missing?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ideally the latter (IMO defaults are often more trouble than they're worth in this scenario, because people would just use the default even though it might be wrong) but to start with we could fall back to the default and enforce that its provided later if we decide to do so

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added EVM prefix and moved to pluginconfig package


// Processor is the processor responsible for
// reading next messages and building merkle roots for them,
// It's setup to use RMN to query which messages to include in the merkle root and ensures
Expand Down
5 changes: 5 additions & 0 deletions commit/plugin_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,11 @@ func setupNode(
}
ccipReader.EXPECT().NextSeqNum(ctx, sourceChains).Return(offRampNextSeqNums, nil).Maybe()

for _, ch := range sourceChains {
ccipReader.EXPECT().GetExpectedNextSequenceNumber(
ctx, ch, destChain).Return(offRampNextSeqNum[ch]+1, nil).Maybe()
}

p := NewPlugin(
ctx,
nodeID,
Expand Down
21 changes: 21 additions & 0 deletions internal/libs/cciptypeutil/seqnumrange.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package cciptypeutil

import "github.com/smartcontractkit/chainlink-common/pkg/types/ccipocr3"

// SeqNumRangeLimit limits the range to n elements by truncating the end if necessary.
func SeqNumRangeLimit(rng ccipocr3.SeqNumRange, n uint64) ccipocr3.SeqNumRange {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: could add a .Limit function to the SeqNumRange type in chainlink-common.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

numElems := rng.End() - rng.Start() + 1
if numElems <= 0 {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

< 0 is impossible for unsigned types since they'd just overflow, maybe just check == 0 here or have a End() >= Start() check at the top of the func to completely avoid the overflow scenario?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

< 0 is impossible but checking <= 0 instead of == 0 should be harmless. There's an overflow check below.

return rng
}

if uint64(numElems) > n {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this cast necessary? Isn't numElems already a uint64 by virtue of both End() and Start() being uint64?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess its type is cciptypes.SeqNum?

newEnd := rng.Start() + ccipocr3.SeqNum(n) - 1
if newEnd > rng.End() { // overflow - do nothing
return rng
}
rng.SetEnd(newEnd)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not a fan of mutating func inputs, why not just return a brand new SeqNumRange?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pushed to chainlink-common: https://github.com/smartcontractkit/chainlink-common/pull/781/commits
works like this

}

return rng
}
62 changes: 62 additions & 0 deletions internal/libs/cciptypeutil/seqnumrange_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package cciptypeutil

import (
"testing"

"github.com/smartcontractkit/chainlink-common/pkg/types/ccipocr3"
)

func TestSeqNumRangeLimit(t *testing.T) {
testCases := []struct {
name string
rng ccipocr3.SeqNumRange
n uint64
want ccipocr3.SeqNumRange
}{
{
name: "no truncation",
rng: ccipocr3.NewSeqNumRange(0, 10),
n: 11,
want: ccipocr3.NewSeqNumRange(0, 10),
},
{
name: "no truncation 2",
rng: ccipocr3.NewSeqNumRange(100, 110),
n: 11,
want: ccipocr3.NewSeqNumRange(100, 110),
},
{
name: "truncation",
rng: ccipocr3.NewSeqNumRange(0, 10),
n: 10,
want: ccipocr3.NewSeqNumRange(0, 9),
},
{
name: "truncation 2",
rng: ccipocr3.NewSeqNumRange(100, 110),
n: 10,
want: ccipocr3.NewSeqNumRange(100, 109),
},
{
name: "empty",
rng: ccipocr3.NewSeqNumRange(0, 0),
n: 0,
want: ccipocr3.NewSeqNumRange(0, 0),
},
{
name: "wrong range",
rng: ccipocr3.NewSeqNumRange(20, 15),
n: 3,
want: ccipocr3.NewSeqNumRange(20, 15),
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
got := SeqNumRangeLimit(tc.rng, tc.n)
if got != tc.want {
t.Errorf("SeqNumRangeLimit(%v, %v) = %v; want %v", tc.rng, tc.n, got, tc.want)
}
})
}
}
Loading
Loading