diff --git a/commit/merkleroot/observation.go b/commit/merkleroot/observation.go index 57ef440d..cb10625b 100644 --- a/commit/merkleroot/observation.go +++ b/commit/merkleroot/observation.go @@ -8,10 +8,12 @@ import ( "sync" "time" + mapset "github.com/deckarep/golang-set/v2" chainsel "github.com/smartcontractkit/chain-selectors" "github.com/smartcontractkit/libocr/commontypes" "github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3types" "github.com/smartcontractkit/libocr/offchainreporting2plus/types" + "golang.org/x/sync/errgroup" "github.com/smartcontractkit/chainlink-common/pkg/hashutil" "github.com/smartcontractkit/chainlink-common/pkg/logger" @@ -114,12 +116,10 @@ func (w *Processor) getObservation(ctx context.Context, q Query, previousOutcome switch nextState { case SelectingRangesForReport: offRampNextSeqNums := w.observer.ObserveOffRampNextSeqNums(ctx) + onRampLatestSeqNums := w.observer.ObserveLatestOnRampSeqNums(ctx, w.cfg.DestChain) + return Observation{ - // TODO: observe OnRamp max seq nums. The use of offRampNextSeqNums here effectively disables batching, - // e.g. the ranges selected for each chain will be [x, x] (e.g. [46, 46]), which means reports will only - // contain one message per chain. Querying the OnRamp contract requires changes to reader.CCIPReader, - // which will need to be done in a future change. - OnRampMaxSeqNums: offRampNextSeqNums, + OnRampMaxSeqNums: onRampLatestSeqNums, OffRampNextSeqNums: offRampNextSeqNums, FChain: w.observer.ObserveFChain(), }, nextState @@ -145,9 +145,12 @@ func (w *Processor) getObservation(ctx context.Context, q Query, previousOutcome } type Observer interface { - // ObserveOffRampNextSeqNums observes the next sequence numbers for each source chain from the OffRamp + // ObserveOffRampNextSeqNums observes the next OffRamp sequence numbers for each source chain ObserveOffRampNextSeqNums(ctx context.Context) []plugintypes.SeqNumChain + // ObserveLatestOnRampSeqNums observes the latest OnRamp sequence numbers for each configured source chain. + ObserveLatestOnRampSeqNums(ctx context.Context, destChain cciptypes.ChainSelector) []plugintypes.SeqNumChain + // ObserveMerkleRoots computes the merkle roots for the given sequence number ranges ObserveMerkleRoots(ctx context.Context, ranges []plugintypes.ChainRange) []cciptypes.MerkleRootChain @@ -202,6 +205,55 @@ func (o ObserverImpl) ObserveOffRampNextSeqNums(ctx context.Context) []plugintyp return result } +// ObserveLatestOnRampSeqNums observes the latest onRamp sequence numbers for each configured source chain. +func (o ObserverImpl) ObserveLatestOnRampSeqNums( + ctx context.Context, destChain cciptypes.ChainSelector) []plugintypes.SeqNumChain { + + allSourceChains, err := o.chainSupport.KnownSourceChainsSlice() + if err != nil { + o.lggr.Warnw("call to KnownSourceChainsSlice failed", "err", err) + return nil + } + + supportedChains, err := o.chainSupport.SupportedChains(o.nodeID) + if err != nil { + o.lggr.Warnw("call to KnownSourceChainsSlice failed", "err", err) + return nil + } + + sourceChains := mapset.NewSet(allSourceChains...).Intersect(supportedChains).ToSlice() + sort.Slice(sourceChains, func(i, j int) bool { return sourceChains[i] < sourceChains[j] }) + + latestOnRampSeqNums := make([]plugintypes.SeqNumChain, len(sourceChains)) + eg := &errgroup.Group{} + + for i, sourceChain := range sourceChains { + i, sourceChain := i, sourceChain + eg.Go(func() error { + nextOnRampSeqNum, err := o.ccipReader.GetExpectedNextSequenceNumber(ctx, sourceChain, destChain) + if err != nil { + return fmt.Errorf("failed to get expected next sequence number for source chain %d: %w", sourceChain, err) + } + if nextOnRampSeqNum == 0 { + return fmt.Errorf("expected next sequence number for source chain %d is 0", sourceChain) + } + + latestOnRampSeqNums[i] = plugintypes.SeqNumChain{ + ChainSel: sourceChain, + SeqNum: nextOnRampSeqNum - 1, // Latest is the next one minus one. + } + return nil + }) + } + + if err := eg.Wait(); err != nil { + o.lggr.Warnw("call to GetExpectedNextSequenceNumber failed", "err", err) + return nil + } + + return latestOnRampSeqNums +} + // ObserveMerkleRoots computes the merkle roots for the given sequence number ranges func (o ObserverImpl) ObserveMerkleRoots( ctx context.Context, diff --git a/commit/merkleroot/observation_test.go b/commit/merkleroot/observation_test.go index 4f11191d..1afd5ab4 100644 --- a/commit/merkleroot/observation_test.go +++ b/commit/merkleroot/observation_test.go @@ -35,10 +35,10 @@ func Test_Observation(t *testing.T) { }, } offRampNextSeqNums := []plugintypes.SeqNumChain{ - { - ChainSel: 456, - SeqNum: 9987, - }, + {ChainSel: 456, SeqNum: 9987}, + } + onRampLatestSeqNums := []plugintypes.SeqNumChain{ + {ChainSel: 456, SeqNum: 9990}, } fChain := map[cciptypes.ChainSelector]int{ 872: 3, @@ -58,11 +58,12 @@ func Test_Observation(t *testing.T) { getObserver: func(t *testing.T) *merkleroot.MockObserver { observer := merkleroot.NewMockObserver(t) observer.EXPECT().ObserveOffRampNextSeqNums(mock.Anything).Once().Return(offRampNextSeqNums) + observer.EXPECT().ObserveLatestOnRampSeqNums(mock.Anything, mock.Anything).Return(onRampLatestSeqNums) observer.EXPECT().ObserveFChain().Once().Return(fChain) return observer }, expObs: Observation{ - OnRampMaxSeqNums: offRampNextSeqNums, + OnRampMaxSeqNums: onRampLatestSeqNums, OffRampNextSeqNums: offRampNextSeqNums, FChain: fChain, }, diff --git a/commit/merkleroot/outcome.go b/commit/merkleroot/outcome.go index 14dc900e..e8602ac6 100644 --- a/commit/merkleroot/outcome.go +++ b/commit/merkleroot/outcome.go @@ -46,7 +46,7 @@ func (w *Processor) getOutcome( switch nextState { case SelectingRangesForReport: - return reportRangesOutcome(q, consensusObservation), nextState + return reportRangesOutcome(q, w.lggr, consensusObservation, w.cfg.MaxMerkleTreeSize), nextState case BuildingReport: if q.RetryRMNSignatures { // We want to retry getting the RMN signatures on the exact same outcome we had before. @@ -64,10 +64,11 @@ func (w *Processor) getOutcome( } // reportRangesOutcome determines the sequence number ranges for each chain to build a report from in the next round -// TODO: ensure each range is below a limit func reportRangesOutcome( _ Query, + lggr logger.Logger, consensusObservation ConsensusObservation, + maxMerkleTreeSize uint64, ) Outcome { rangesToReport := make([]plugintypes.ChainRange, 0) @@ -82,11 +83,19 @@ func reportRangesOutcome( } if offRampNextSeqNum <= onRampMaxSeqNum { + rng := cciptypes.NewSeqNumRange(offRampNextSeqNum, onRampMaxSeqNum) + chainRange := plugintypes.ChainRange{ ChainSel: chainSel, - SeqNumRange: [2]cciptypes.SeqNum{offRampNextSeqNum, onRampMaxSeqNum}, + SeqNumRange: rng.Limit(maxMerkleTreeSize), } rangesToReport = append(rangesToReport, chainRange) + + if rng.End() != chainRange.SeqNumRange.End() { // Check if the range was truncated. + lggr.Infof("Range for chain %d: %s (before truncate: %v)", chainSel, chainRange.SeqNumRange, rng) + } else { + lggr.Infof("Range for chain %d: %s", chainSel, chainRange.SeqNumRange) + } } offRampNextSeqNums = append(offRampNextSeqNums, plugintypes.SeqNumChain{ diff --git a/commit/merkleroot/outcome_test.go b/commit/merkleroot/outcome_test.go index 34309587..6241dc63 100644 --- a/commit/merkleroot/outcome_test.go +++ b/commit/merkleroot/outcome_test.go @@ -7,6 +7,8 @@ import ( "github.com/stretchr/testify/require" cciptypes "github.com/smartcontractkit/chainlink-common/pkg/types/ccipocr3" + + "github.com/smartcontractkit/chainlink-ccip/internal/plugintypes" ) func Test_buildReport(t *testing.T) { @@ -37,3 +39,80 @@ func Test_buildReport(t *testing.T) { } }) } + +func Test_reportRangesOutcome(t *testing.T) { + lggr := logger.Test(t) + + testCases := []struct { + name string + consensusObservation ConsensusObservation + merkleTreeSizeLimit uint64 + expectedOutcome Outcome + }{ + { + name: "base empty outcome", + expectedOutcome: Outcome{ + OutcomeType: ReportIntervalsSelected, + RangesSelectedForReport: []plugintypes.ChainRange{}, + OffRampNextSeqNums: []plugintypes.SeqNumChain{}, + }, + }, + { + name: "simple scenario with one chain", + consensusObservation: ConsensusObservation{ + OnRampMaxSeqNums: map[cciptypes.ChainSelector]cciptypes.SeqNum{ + 1: 20, + }, + OffRampNextSeqNums: map[cciptypes.ChainSelector]cciptypes.SeqNum{ + 1: 18, // off ramp next is 18, on ramp max is 20 so new msgs are: [18, 19, 20] + }, + }, + merkleTreeSizeLimit: 256, // default limit should be used + expectedOutcome: Outcome{ + OutcomeType: ReportIntervalsSelected, + RangesSelectedForReport: []plugintypes.ChainRange{ + {ChainSel: 1, SeqNumRange: cciptypes.NewSeqNumRange(18, 20)}, + }, + OffRampNextSeqNums: []plugintypes.SeqNumChain{ + {ChainSel: 1, SeqNum: 18}, + }, + }, + }, + { + name: "simple scenario with one chain", + consensusObservation: ConsensusObservation{ + OnRampMaxSeqNums: map[cciptypes.ChainSelector]cciptypes.SeqNum{ + 1: 20, + 2: 1000, + 3: 10000, + }, + OffRampNextSeqNums: map[cciptypes.ChainSelector]cciptypes.SeqNum{ + 1: 18, // off ramp next is 18, on ramp max is 20 so new msgs are: [18, 19, 20] + 2: 995, // off ramp next is 995, on ramp max is 1000 so new msgs are: [995, 996, 997, 998, 999, 1000] + 3: 500, // off ramp next is 500, we have new messages up to 10000 (default limit applied) + }, + }, + merkleTreeSizeLimit: 5, + expectedOutcome: Outcome{ + OutcomeType: ReportIntervalsSelected, + RangesSelectedForReport: []plugintypes.ChainRange{ + {ChainSel: 1, SeqNumRange: cciptypes.NewSeqNumRange(18, 20)}, + {ChainSel: 2, SeqNumRange: cciptypes.NewSeqNumRange(995, 999)}, + {ChainSel: 3, SeqNumRange: cciptypes.NewSeqNumRange(500, 504)}, + }, + OffRampNextSeqNums: []plugintypes.SeqNumChain{ + {ChainSel: 1, SeqNum: 18}, + {ChainSel: 2, SeqNum: 995}, + {ChainSel: 3, SeqNum: 500}, + }, + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + outc := reportRangesOutcome(Query{}, lggr, tc.consensusObservation, tc.merkleTreeSizeLimit) + require.Equal(t, tc.expectedOutcome, outc) + }) + } +} diff --git a/commit/plugin.go b/commit/plugin.go index 41d54f3e..2e46288d 100644 --- a/commit/plugin.go +++ b/commit/plugin.go @@ -59,6 +59,11 @@ func NewPlugin( reportingCfg ocr3types.ReportingPluginConfig, rmnConfig rmn.Config, ) *Plugin { + if cfg.MaxMerkleTreeSize == 0 { + lggr.Warnw("MaxMerkleTreeSize not set, using default value", "default", pluginconfig.EvmDefaultMaxMerkleTreeSize) + cfg.MaxMerkleTreeSize = pluginconfig.EvmDefaultMaxMerkleTreeSize + } + readerSyncer := plugincommon.NewBackgroundReaderSyncer( lggr, ccipReader, diff --git a/commit/plugin_e2e_test.go b/commit/plugin_e2e_test.go index 93f20ff4..58379c6b 100644 --- a/commit/plugin_e2e_test.go +++ b/commit/plugin_e2e_test.go @@ -331,6 +331,11 @@ func setupNode( } ccipReader.EXPECT().NextSeqNum(ctx, sourceChains).Return(offRampNextSeqNums, nil).Maybe() + for _, ch := range sourceChains { + ccipReader.EXPECT().GetExpectedNextSequenceNumber( + ctx, ch, destChain).Return(offRampNextSeqNum[ch]+1, nil).Maybe() + } + p := NewPlugin( ctx, nodeID, diff --git a/go.mod b/go.mod index fae21f1c..084c6b60 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.22.5 require ( github.com/deckarep/golang-set/v2 v2.6.0 github.com/smartcontractkit/chain-selectors v1.0.23 - github.com/smartcontractkit/chainlink-common v0.2.3-0.20240918103207-e78a0de3f684 + github.com/smartcontractkit/chainlink-common v0.2.3-0.20240919092417-53e784c2e420 github.com/smartcontractkit/libocr v0.0.0-20240717100443-f6226e09bee7 github.com/stretchr/testify v1.9.0 go.uber.org/zap v1.27.0 diff --git a/go.sum b/go.sum index 6046ec30..992bddc6 100644 --- a/go.sum +++ b/go.sum @@ -59,8 +59,8 @@ github.com/shopspring/decimal v1.4.0 h1:bxl37RwXBklmTi0C79JfXCEBD1cqqHt0bbgBAGFp github.com/shopspring/decimal v1.4.0/go.mod h1:gawqmDU56v4yIKSwfBSFip1HdCCXN8/+DMd9qYNcwME= github.com/smartcontractkit/chain-selectors v1.0.23 h1:D2Eaex4Cw/O7Lg3tX6WklOqnjjIQAEBnutCtksPzVDY= github.com/smartcontractkit/chain-selectors v1.0.23/go.mod h1:d4Hi+E1zqjy9HqMkjBE5q1vcG9VGgxf5VxiRHfzi2kE= -github.com/smartcontractkit/chainlink-common v0.2.3-0.20240918103207-e78a0de3f684 h1:BZqdLoybuZ1JCqSt2cNe6r9WLLYOA1PvqeI+xL9keLY= -github.com/smartcontractkit/chainlink-common v0.2.3-0.20240918103207-e78a0de3f684/go.mod h1:zm+l8gN4LQS1+YvwQDhRz/njirVeWGNiDJKIhCGwaoQ= +github.com/smartcontractkit/chainlink-common v0.2.3-0.20240919092417-53e784c2e420 h1:+xNnYYgkxzKUIkLCOfzfAKUxeLLtuxlalDI70kNJ8No= +github.com/smartcontractkit/chainlink-common v0.2.3-0.20240919092417-53e784c2e420/go.mod h1:zm+l8gN4LQS1+YvwQDhRz/njirVeWGNiDJKIhCGwaoQ= github.com/smartcontractkit/libocr v0.0.0-20240717100443-f6226e09bee7 h1:e38V5FYE7DA1JfKXeD5Buo/7lczALuVXlJ8YNTAUxcw= github.com/smartcontractkit/libocr v0.0.0-20240717100443-f6226e09bee7/go.mod h1:fb1ZDVXACvu4frX3APHZaEBp0xi1DIm34DcA0CwTsZM= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= diff --git a/mocks/commit/merkleroot/observer.go b/mocks/commit/merkleroot/observer.go index 76cad185..b7c55072 100644 --- a/mocks/commit/merkleroot/observer.go +++ b/mocks/commit/merkleroot/observer.go @@ -72,6 +72,55 @@ func (_c *MockObserver_ObserveFChain_Call) RunAndReturn(run func() map[ccipocr3. return _c } +// ObserveLatestOnRampSeqNums provides a mock function with given fields: ctx, destChain +func (_m *MockObserver) ObserveLatestOnRampSeqNums(ctx context.Context, destChain ccipocr3.ChainSelector) []plugintypes.SeqNumChain { + ret := _m.Called(ctx, destChain) + + if len(ret) == 0 { + panic("no return value specified for ObserveLatestOnRampSeqNums") + } + + var r0 []plugintypes.SeqNumChain + if rf, ok := ret.Get(0).(func(context.Context, ccipocr3.ChainSelector) []plugintypes.SeqNumChain); ok { + r0 = rf(ctx, destChain) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]plugintypes.SeqNumChain) + } + } + + return r0 +} + +// MockObserver_ObserveLatestOnRampSeqNums_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ObserveLatestOnRampSeqNums' +type MockObserver_ObserveLatestOnRampSeqNums_Call struct { + *mock.Call +} + +// ObserveLatestOnRampSeqNums is a helper method to define mock.On call +// - ctx context.Context +// - destChain ccipocr3.ChainSelector +func (_e *MockObserver_Expecter) ObserveLatestOnRampSeqNums(ctx interface{}, destChain interface{}) *MockObserver_ObserveLatestOnRampSeqNums_Call { + return &MockObserver_ObserveLatestOnRampSeqNums_Call{Call: _e.mock.On("ObserveLatestOnRampSeqNums", ctx, destChain)} +} + +func (_c *MockObserver_ObserveLatestOnRampSeqNums_Call) Run(run func(ctx context.Context, destChain ccipocr3.ChainSelector)) *MockObserver_ObserveLatestOnRampSeqNums_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(ccipocr3.ChainSelector)) + }) + return _c +} + +func (_c *MockObserver_ObserveLatestOnRampSeqNums_Call) Return(_a0 []plugintypes.SeqNumChain) *MockObserver_ObserveLatestOnRampSeqNums_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockObserver_ObserveLatestOnRampSeqNums_Call) RunAndReturn(run func(context.Context, ccipocr3.ChainSelector) []plugintypes.SeqNumChain) *MockObserver_ObserveLatestOnRampSeqNums_Call { + _c.Call.Return(run) + return _c +} + // ObserveMerkleRoots provides a mock function with given fields: ctx, ranges func (_m *MockObserver) ObserveMerkleRoots(ctx context.Context, ranges []plugintypes.ChainRange) []ccipocr3.MerkleRootChain { ret := _m.Called(ctx, ranges) diff --git a/pluginconfig/commit.go b/pluginconfig/commit.go index d51eea5b..1f5b6a3d 100644 --- a/pluginconfig/commit.go +++ b/pluginconfig/commit.go @@ -9,11 +9,17 @@ import ( "strings" "time" + "github.com/smartcontractkit/chainlink-common/pkg/merklemulti" + "github.com/smartcontractkit/libocr/offchainreporting2plus/types" + commonconfig "github.com/smartcontractkit/chainlink-common/pkg/config" cciptypes "github.com/smartcontractkit/chainlink-common/pkg/types/ccipocr3" - "github.com/smartcontractkit/libocr/offchainreporting2plus/types" ) +// EvmDefaultMaxMerkleTreeSize is the default number of max new messages to put in a merkle tree. +// We use this default value when the config is not set for a specific chain. +const EvmDefaultMaxMerkleTreeSize = merklemulti.MaxNumberTreeLeaves + type CommitPluginConfig struct { // DestChain is the ccip destination chain configured for the commit plugin DON. DestChain cciptypes.ChainSelector `json:"destChain"` @@ -39,6 +45,11 @@ type CommitPluginConfig struct { // RMNSignaturesTimeout is the timeout for RMN signature verification. // Typically set to `MaxQueryDuration - e`, where e some small duration. RMNSignaturesTimeout time.Duration `json:"rmnSignaturesTimeout"` + + // MaxMerkleTreeSize is the maximum size of a merkle tree to create prior to calculating the merkle root. + // If for example in the next round we have 1000 pending messages and a max tree size of 256, only 256 seq nums + // will be in the report. If a value is not set we fallback to EvmDefaultMaxMerkleTreeSize. + MaxMerkleTreeSize uint64 `json:"maxTreeSize"` } func (c CommitPluginConfig) Validate() error {