From 3cc98beb2f6bd70f7c6a7eded70c3079b86700af Mon Sep 17 00:00:00 2001 From: Domino Valdano <2644901+reductionista@users.noreply.github.com> Date: Mon, 16 Sep 2024 18:49:34 -0700 Subject: [PATCH 01/24] Manually merged in from chainlink repo: BCI-3492 [LogPoller]: Allow withObservedExecAndRowsAffected to report non-zero rows affected (#14057) * Fix withObservedExecAndRowsAffected Also: - Change behavior of DeleteExpiredLogs to delete logs which don't match any filter - Add a test case to ensure the dataset size is published properly during pruning * pnpm changeset * changeset #fix -> #bugfix --- .changeset/sweet-pumas-refuse.md | 5 ++++ .../evm/logpoller/observability_test.go | 10 +++++++ core/chains/evm/logpoller/orm.go | 30 ++++++++----------- core/chains/evm/logpoller/orm_test.go | 13 ++++---- 4 files changed, 35 insertions(+), 23 deletions(-) create mode 100644 .changeset/sweet-pumas-refuse.md diff --git a/.changeset/sweet-pumas-refuse.md b/.changeset/sweet-pumas-refuse.md new file mode 100644 index 0000000000..fd642a9c94 --- /dev/null +++ b/.changeset/sweet-pumas-refuse.md @@ -0,0 +1,5 @@ +--- +"chainlink": minor +--- + +#bugfix Addresses 2 minor issues with the pruning of LogPoller's db tables: logs not matching any filter will now be pruned, and rows deleted are now properly reported for observability diff --git a/core/chains/evm/logpoller/observability_test.go b/core/chains/evm/logpoller/observability_test.go index 5e668a4ad1..2f502438bb 100644 --- a/core/chains/evm/logpoller/observability_test.go +++ b/core/chains/evm/logpoller/observability_test.go @@ -119,6 +119,16 @@ func TestCountersAreProperlyPopulatedForWrites(t *testing.T) { assert.Equal(t, float64(20), testutil.ToFloat64(orm.logsInserted.WithLabelValues("420"))) assert.Equal(t, float64(2), testutil.ToFloat64(orm.blocksInserted.WithLabelValues("420"))) + rowsAffected, err := orm.DeleteExpiredLogs(ctx, 3) + require.NoError(t, err) + require.Equal(t, int64(3), rowsAffected) + assert.Equal(t, 3, counterFromGaugeByLabels(orm.datasetSize, "420", "DeleteExpiredLogs", "delete")) + + rowsAffected, err = orm.DeleteBlocksBefore(ctx, 30, 0) + require.NoError(t, err) + require.Equal(t, int64(2), rowsAffected) + assert.Equal(t, 2, counterFromGaugeByLabels(orm.datasetSize, "420", "DeleteBlocksBefore", "delete")) + // Don't update counters in case of an error require.Error(t, orm.InsertLogsWithBlock(ctx, logs, NewLogPollerBlock(utils.RandomBytes32(), 0, time.Now(), 0))) assert.Equal(t, float64(20), testutil.ToFloat64(orm.logsInserted.WithLabelValues("420"))) diff --git a/core/chains/evm/logpoller/orm.go b/core/chains/evm/logpoller/orm.go index 9cbb21a606..21e1e71731 100644 --- a/core/chains/evm/logpoller/orm.go +++ b/core/chains/evm/logpoller/orm.go @@ -15,6 +15,7 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/types/query" "github.com/smartcontractkit/chainlink-common/pkg/sqlutil" + evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types" @@ -313,34 +314,29 @@ type Exp struct { ShouldDelete bool } +// DeleteExpiredLogs removes any logs which either: +// - don't match any currently registered filters, or +// - have a timestamp older than any matching filter's retention, UNLESS there is at +// least one matching filter with retention=0 func (o *DSORM) DeleteExpiredLogs(ctx context.Context, limit int64) (int64, error) { var err error var result sql.Result - if limit > 0 { - result, err = o.ds.ExecContext(ctx, ` - DELETE FROM evm.logs + query := `DELETE FROM evm.logs WHERE (evm_chain_id, address, event_sig, block_number) IN ( SELECT l.evm_chain_id, l.address, l.event_sig, l.block_number FROM evm.logs l - INNER JOIN ( - SELECT address, event, MAX(retention) AS retention + LEFT JOIN ( + SELECT address, event, CASE WHEN MIN(retention) = 0 THEN 0 ELSE MAX(retention) END AS retention FROM evm.log_poller_filters WHERE evm_chain_id = $1 GROUP BY evm_chain_id, address, event - HAVING NOT 0 = ANY(ARRAY_AGG(retention)) ) r ON l.evm_chain_id = $1 AND l.address = r.address AND l.event_sig = r.event - AND l.block_timestamp <= STATEMENT_TIMESTAMP() - (r.retention / 10^9 * interval '1 second') - LIMIT $2 - )`, ubig.New(o.chainID), limit) + WHERE r.retention IS NULL OR (r.retention != 0 AND l.block_timestamp <= STATEMENT_TIMESTAMP() - (r.retention / 10^9 * interval '1 second')) %s)` + + if limit > 0 { + result, err = o.ds.ExecContext(ctx, fmt.Sprintf(query, "LIMIT $2"), ubig.New(o.chainID), limit) } else { - result, err = o.ds.ExecContext(ctx, `WITH r AS - ( SELECT address, event, MAX(retention) AS retention - FROM evm.log_poller_filters WHERE evm_chain_id=$1 - GROUP BY evm_chain_id,address, event HAVING NOT 0 = ANY(ARRAY_AGG(retention)) - ) DELETE FROM evm.logs l USING r - WHERE l.evm_chain_id = $1 AND l.address=r.address AND l.event_sig=r.event - AND l.block_timestamp <= STATEMENT_TIMESTAMP() - (r.retention / 10^9 * interval '1 second')`, // retention is in nanoseconds (time.Duration aka BIGINT) - ubig.New(o.chainID)) + result, err = o.ds.ExecContext(ctx, fmt.Sprintf(query, ""), ubig.New(o.chainID)) } if err != nil { diff --git a/core/chains/evm/logpoller/orm_test.go b/core/chains/evm/logpoller/orm_test.go index ed3f58504a..f112f03322 100644 --- a/core/chains/evm/logpoller/orm_test.go +++ b/core/chains/evm/logpoller/orm_test.go @@ -457,20 +457,21 @@ func TestORM(t *testing.T) { time.Sleep(2 * time.Millisecond) // just in case we haven't reached the end of the 1ms retention period deleted, err := o1.DeleteExpiredLogs(ctx, 0) require.NoError(t, err) - assert.Equal(t, int64(1), deleted) + assert.Equal(t, int64(4), deleted) + logs, err = o1.SelectLogsByBlockRange(ctx, 1, latest.BlockNumber) require.NoError(t, err) - // The only log which should be deleted is the one which matches filter1 (ret=1ms) but not filter12 (ret=1 hour) - // Importantly, it shouldn't delete any logs matching only filter0 (ret=0 meaning permanent retention). Anything - // matching filter12 should be kept regardless of what other filters it matches. - assert.Len(t, logs, 7) + // It should have retained the log matching filter0 (due to ret=0 meaning permanent retention) as well as all + // 3 logs matching filter12 (ret=1 hour). It should have deleted 3 logs not matching any filter, as well as 1 + // of the 2 logs matching filter1 (ret=1ms)--the one that doesn't also match filter12. + assert.Len(t, logs, 4) // Delete logs after should delete all logs. err = o1.DeleteLogsAndBlocksAfter(ctx, 1) require.NoError(t, err) logs, err = o1.SelectLogsByBlockRange(ctx, 1, latest.BlockNumber) require.NoError(t, err) - require.Zero(t, len(logs)) + assert.Zero(t, len(logs)) } type PgxLogger struct { From 679ba08f62f2db5c831e238873c25293121ecac2 Mon Sep 17 00:00:00 2001 From: Domino Valdano <2644901+reductionista@users.noreply.github.com> Date: Mon, 23 Sep 2024 19:45:32 -0700 Subject: [PATCH 02/24] Manually merge in FilteredLogs receive []Expression from upstream commit 2761cd5bc5ed91bc17d4d67265ddc8fa03b84540 Author: Juan Farber Date: Wed Sep 4 17:16:00 2024 -0300 [BCI-3988] - FilteredLogs receive []Expression instead of whole KeyFilter (#14109) * FilteredLogs receive []Expression instead of whole KeyFilter * remove key from query.Where KeyFilter creation * add changeset * remove brackets from changeset * fix usage * fix comment lint * remove todo * refactor based on comments * add where func inside logpoller without key * fix reference --- .changeset/brown-geese-boil.md | 5 + core/chains/evm/logpoller/disabled.go | 2 +- core/chains/evm/logpoller/log_poller.go | 23 +++- core/chains/evm/logpoller/log_poller_test.go | 38 ++++++ core/chains/evm/logpoller/mocks/log_poller.go | 16 +-- core/chains/evm/logpoller/observability.go | 2 +- core/chains/evm/logpoller/orm.go | 7 +- core/chains/evm/logpoller/orm_test.go | 112 ++++++++---------- .../internal/ccipdata/v1_0_0/commit_store.go | 3 +- .../internal/ccipdata/v1_2_0/commit_store.go | 3 +- core/services/relay/evm/event_binding.go | 8 +- 11 files changed, 131 insertions(+), 88 deletions(-) create mode 100644 .changeset/brown-geese-boil.md diff --git a/.changeset/brown-geese-boil.md b/.changeset/brown-geese-boil.md new file mode 100644 index 0000000000..fa7f65f733 --- /dev/null +++ b/.changeset/brown-geese-boil.md @@ -0,0 +1,5 @@ +--- +"chainlink": minor +--- + +FilteredLogs receive Expression instead of whole KeyFilter. #internal diff --git a/core/chains/evm/logpoller/disabled.go b/core/chains/evm/logpoller/disabled.go index c0882ff76c..a04b4fdb19 100644 --- a/core/chains/evm/logpoller/disabled.go +++ b/core/chains/evm/logpoller/disabled.go @@ -118,7 +118,7 @@ func (d disabled) LogsDataWordBetween(ctx context.Context, eventSig common.Hash, return nil, ErrDisabled } -func (d disabled) FilteredLogs(_ context.Context, _ query.KeyFilter, _ query.LimitAndSort, _ string) ([]Log, error) { +func (d disabled) FilteredLogs(_ context.Context, _ []query.Expression, _ query.LimitAndSort, _ string) ([]Log, error) { return nil, ErrDisabled } diff --git a/core/chains/evm/logpoller/log_poller.go b/core/chains/evm/logpoller/log_poller.go index dee5d1d1a5..29a2eca355 100644 --- a/core/chains/evm/logpoller/log_poller.go +++ b/core/chains/evm/logpoller/log_poller.go @@ -68,7 +68,7 @@ type LogPoller interface { LogsDataWordBetween(ctx context.Context, eventSig common.Hash, address common.Address, wordIndexMin, wordIndexMax int, wordValue common.Hash, confs evmtypes.Confirmations) ([]Log, error) // chainlink-common query filtering - FilteredLogs(ctx context.Context, filter query.KeyFilter, limitAndSort query.LimitAndSort, queryName string) ([]Log, error) + FilteredLogs(ctx context.Context, filter []query.Expression, limitAndSort query.LimitAndSort, queryName string) ([]Log, error) } type LogPollerTest interface { @@ -1518,6 +1518,25 @@ func EvmWord(i uint64) common.Hash { return common.BytesToHash(b) } -func (lp *logPoller) FilteredLogs(ctx context.Context, queryFilter query.KeyFilter, limitAndSort query.LimitAndSort, queryName string) ([]Log, error) { +func (lp *logPoller) FilteredLogs(ctx context.Context, queryFilter []query.Expression, limitAndSort query.LimitAndSort, queryName string) ([]Log, error) { return lp.orm.FilteredLogs(ctx, queryFilter, limitAndSort, queryName) } + +// Where is a query.Where wrapper that ignores the Key and returns a slice of query.Expression rather than query.KeyFilter. +// If no expressions are provided, or an error occurs, an empty slice is returned. +func Where(expressions ...query.Expression) ([]query.Expression, error) { + filter, err := query.Where( + "", + expressions..., + ) + + if err != nil { + return []query.Expression{}, err + } + + if filter.Expressions == nil { + return []query.Expression{}, nil + } + + return filter.Expressions, nil +} diff --git a/core/chains/evm/logpoller/log_poller_test.go b/core/chains/evm/logpoller/log_poller_test.go index 548711c19b..6a34e899df 100644 --- a/core/chains/evm/logpoller/log_poller_test.go +++ b/core/chains/evm/logpoller/log_poller_test.go @@ -26,6 +26,8 @@ import ( "go.uber.org/zap/zapcore" "github.com/smartcontractkit/chainlink-common/pkg/logger" + "github.com/smartcontractkit/chainlink-common/pkg/types/query" + "github.com/smartcontractkit/chainlink-common/pkg/types/query/primitives" commonutils "github.com/smartcontractkit/chainlink-common/pkg/utils" htMocks "github.com/smartcontractkit/chainlink/v2/common/headtracker/mocks" @@ -2052,3 +2054,39 @@ func TestFindLCA(t *testing.T) { }) } } + +func TestWhere(t *testing.T) { + address := common.HexToAddress("0x1234567890abcdef1234567890abcdef12345678") + eventSig := common.HexToHash("0xabcdef1234567890abcdef1234567890abcdef1234") + ts := time.Now() + + expr1 := logpoller.NewAddressFilter(address) + expr2 := logpoller.NewEventSigFilter(eventSig) + expr3 := query.Timestamp(uint64(ts.Unix()), primitives.Gte) + expr4 := logpoller.NewConfirmationsFilter(evmtypes.Confirmations(0)) + + t.Run("Valid combination of filters", func(t *testing.T) { + result, err := logpoller.Where(expr1, expr2, expr3, expr4) + assert.NoError(t, err) + assert.Equal(t, []query.Expression{expr1, expr2, expr3, expr4}, result) + }) + + t.Run("No expressions (should return empty slice)", func(t *testing.T) { + result, err := logpoller.Where() + assert.NoError(t, err) + assert.Equal(t, []query.Expression{}, result) + }) + + t.Run("Invalid boolean expression", func(t *testing.T) { + invalidExpr := query.Expression{ + BoolExpression: query.BoolExpression{ + Expressions: []query.Expression{}, + }, + } + + result, err := logpoller.Where(invalidExpr) + assert.Error(t, err) + assert.EqualError(t, err, "all boolean expressions should have at least 2 expressions") + assert.Equal(t, []query.Expression{}, result) + }) +} diff --git a/core/chains/evm/logpoller/mocks/log_poller.go b/core/chains/evm/logpoller/mocks/log_poller.go index 4ce68839d1..9ae4d9767c 100644 --- a/core/chains/evm/logpoller/mocks/log_poller.go +++ b/core/chains/evm/logpoller/mocks/log_poller.go @@ -124,7 +124,7 @@ func (_c *LogPoller_DeleteLogsAndBlocksAfter_Call) RunAndReturn(run func(context } // FilteredLogs provides a mock function with given fields: ctx, filter, limitAndSort, queryName -func (_m *LogPoller) FilteredLogs(ctx context.Context, filter query.KeyFilter, limitAndSort query.LimitAndSort, queryName string) ([]logpoller.Log, error) { +func (_m *LogPoller) FilteredLogs(ctx context.Context, filter []query.Expression, limitAndSort query.LimitAndSort, queryName string) ([]logpoller.Log, error) { ret := _m.Called(ctx, filter, limitAndSort, queryName) if len(ret) == 0 { @@ -133,10 +133,10 @@ func (_m *LogPoller) FilteredLogs(ctx context.Context, filter query.KeyFilter, l var r0 []logpoller.Log var r1 error - if rf, ok := ret.Get(0).(func(context.Context, query.KeyFilter, query.LimitAndSort, string) ([]logpoller.Log, error)); ok { + if rf, ok := ret.Get(0).(func(context.Context, []query.Expression, query.LimitAndSort, string) ([]logpoller.Log, error)); ok { return rf(ctx, filter, limitAndSort, queryName) } - if rf, ok := ret.Get(0).(func(context.Context, query.KeyFilter, query.LimitAndSort, string) []logpoller.Log); ok { + if rf, ok := ret.Get(0).(func(context.Context, []query.Expression, query.LimitAndSort, string) []logpoller.Log); ok { r0 = rf(ctx, filter, limitAndSort, queryName) } else { if ret.Get(0) != nil { @@ -144,7 +144,7 @@ func (_m *LogPoller) FilteredLogs(ctx context.Context, filter query.KeyFilter, l } } - if rf, ok := ret.Get(1).(func(context.Context, query.KeyFilter, query.LimitAndSort, string) error); ok { + if rf, ok := ret.Get(1).(func(context.Context, []query.Expression, query.LimitAndSort, string) error); ok { r1 = rf(ctx, filter, limitAndSort, queryName) } else { r1 = ret.Error(1) @@ -160,16 +160,16 @@ type LogPoller_FilteredLogs_Call struct { // FilteredLogs is a helper method to define mock.On call // - ctx context.Context -// - filter query.KeyFilter +// - filter []query.Expression // - limitAndSort query.LimitAndSort // - queryName string func (_e *LogPoller_Expecter) FilteredLogs(ctx interface{}, filter interface{}, limitAndSort interface{}, queryName interface{}) *LogPoller_FilteredLogs_Call { return &LogPoller_FilteredLogs_Call{Call: _e.mock.On("FilteredLogs", ctx, filter, limitAndSort, queryName)} } -func (_c *LogPoller_FilteredLogs_Call) Run(run func(ctx context.Context, filter query.KeyFilter, limitAndSort query.LimitAndSort, queryName string)) *LogPoller_FilteredLogs_Call { +func (_c *LogPoller_FilteredLogs_Call) Run(run func(ctx context.Context, filter []query.Expression, limitAndSort query.LimitAndSort, queryName string)) *LogPoller_FilteredLogs_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(query.KeyFilter), args[2].(query.LimitAndSort), args[3].(string)) + run(args[0].(context.Context), args[1].([]query.Expression), args[2].(query.LimitAndSort), args[3].(string)) }) return _c } @@ -179,7 +179,7 @@ func (_c *LogPoller_FilteredLogs_Call) Return(_a0 []logpoller.Log, _a1 error) *L return _c } -func (_c *LogPoller_FilteredLogs_Call) RunAndReturn(run func(context.Context, query.KeyFilter, query.LimitAndSort, string) ([]logpoller.Log, error)) *LogPoller_FilteredLogs_Call { +func (_c *LogPoller_FilteredLogs_Call) RunAndReturn(run func(context.Context, []query.Expression, query.LimitAndSort, string) ([]logpoller.Log, error)) *LogPoller_FilteredLogs_Call { _c.Call.Return(run) return _c } diff --git a/core/chains/evm/logpoller/observability.go b/core/chains/evm/logpoller/observability.go index 782307e7d0..e0ed0cc478 100644 --- a/core/chains/evm/logpoller/observability.go +++ b/core/chains/evm/logpoller/observability.go @@ -262,7 +262,7 @@ func (o *ObservedORM) SelectIndexedLogsTopicRange(ctx context.Context, address c }) } -func (o *ObservedORM) FilteredLogs(ctx context.Context, filter query.KeyFilter, limitAndSort query.LimitAndSort, queryName string) ([]Log, error) { +func (o *ObservedORM) FilteredLogs(ctx context.Context, filter []query.Expression, limitAndSort query.LimitAndSort, queryName string) ([]Log, error) { return withObservedQueryAndResults(o, queryName, func() ([]Log, error) { return o.ORM.FilteredLogs(ctx, filter, limitAndSort, queryName) }) diff --git a/core/chains/evm/logpoller/orm.go b/core/chains/evm/logpoller/orm.go index 21e1e71731..dd3ecfa609 100644 --- a/core/chains/evm/logpoller/orm.go +++ b/core/chains/evm/logpoller/orm.go @@ -64,7 +64,7 @@ type ORM interface { SelectLogsDataWordBetween(ctx context.Context, address common.Address, eventSig common.Hash, wordIndexMin int, wordIndexMax int, wordValue common.Hash, confs evmtypes.Confirmations) ([]Log, error) // FilteredLogs accepts chainlink-common filtering DSL. - FilteredLogs(ctx context.Context, filter query.KeyFilter, limitAndSort query.LimitAndSort, queryName string) ([]Log, error) + FilteredLogs(ctx context.Context, filter []query.Expression, limitAndSort query.LimitAndSort, queryName string) ([]Log, error) } type DSORM struct { @@ -964,9 +964,8 @@ func (o *DSORM) SelectIndexedLogsWithSigsExcluding(ctx context.Context, sigA, si return logs, nil } -// TODO flaky BCF-3258 -func (o *DSORM) FilteredLogs(ctx context.Context, filter query.KeyFilter, limitAndSort query.LimitAndSort, _ string) ([]Log, error) { - qs, args, err := (&pgDSLParser{}).buildQuery(o.chainID, filter.Expressions, limitAndSort) +func (o *DSORM) FilteredLogs(ctx context.Context, filter []query.Expression, limitAndSort query.LimitAndSort, _ string) ([]Log, error) { + qs, args, err := (&pgDSLParser{}).buildQuery(o.chainID, filter, limitAndSort) if err != nil { return nil, err } diff --git a/core/chains/evm/logpoller/orm_test.go b/core/chains/evm/logpoller/orm_test.go index f112f03322..4ef75ddc2b 100644 --- a/core/chains/evm/logpoller/orm_test.go +++ b/core/chains/evm/logpoller/orm_test.go @@ -7,6 +7,7 @@ import ( "fmt" "math" "math/big" + "strconv" "testing" "time" @@ -630,7 +631,7 @@ func TestORM_IndexedLogs(t *testing.T) { require.Equal(t, 1, len(lgs)) assert.Equal(t, logpoller.EvmWord(1).Bytes(), lgs[0].GetTopics()[1].Bytes()) - lgs, err = o1.FilteredLogs(ctx, standardFilter(1, []uint64{1}), limiter, "") + lgs, err = o1.FilteredLogs(ctx, standardFilter(1, []uint64{1}).Expressions, limiter, "") require.NoError(t, err) require.Equal(t, 1, len(lgs)) assert.Equal(t, logpoller.EvmWord(1).Bytes(), lgs[0].GetTopics()[1].Bytes()) @@ -639,19 +640,17 @@ func TestORM_IndexedLogs(t *testing.T) { require.NoError(t, err) assert.Equal(t, 2, len(lgs)) - lgs, err = o1.FilteredLogs(ctx, standardFilter(1, []uint64{1, 2}), limiter, "") + lgs, err = o1.FilteredLogs(ctx, standardFilter(1, []uint64{1, 2}).Expressions, limiter, "") require.NoError(t, err) assert.Equal(t, 2, len(lgs)) - blockRangeFilter := func(start, end uint64, topicIdx uint64, topicValues []uint64) query.KeyFilter { - return query.KeyFilter{ - Expressions: []query.Expression{ - logpoller.NewAddressFilter(addr), - logpoller.NewEventSigFilter(eventSig), - filtersForTopics(topicIdx, topicValues), - query.Block(start, primitives.Gte), - query.Block(end, primitives.Lte), - }, + blockRangeFilter := func(start, end string, topicIdx uint64, topicValues []uint64) []query.Expression { + return []query.Expression{ + logpoller.NewAddressFilter(addr), + logpoller.NewEventSigFilter(eventSig), + filtersForTopics(topicIdx, topicValues), + query.Block(start, primitives.Gte), + query.Block(end, primitives.Lte), } } @@ -710,23 +709,21 @@ func TestORM_IndexedLogs(t *testing.T) { }, } - lgs, err = o1.FilteredLogs(ctx, filter, limiter, "") + lgs, err = o1.FilteredLogs(ctx, filter.Expressions, limiter, "") require.NoError(t, err) assert.Equal(t, 2, len(lgs)) - rangeFilter := func(topicIdx uint64, min, max uint64) query.KeyFilter { - return query.KeyFilter{ - Expressions: []query.Expression{ - logpoller.NewAddressFilter(addr), - logpoller.NewEventSigFilter(eventSig), - logpoller.NewEventByTopicFilter(topicIdx, []primitives.ValueComparator{ - {Value: logpoller.EvmWord(min).Hex(), Operator: primitives.Gte}, - }), - logpoller.NewEventByTopicFilter(topicIdx, []primitives.ValueComparator{ - {Value: logpoller.EvmWord(max).Hex(), Operator: primitives.Lte}, - }), - query.Confidence(primitives.Unconfirmed), - }, + rangeFilter := func(topicIdx uint64, min, max uint64) []query.Expression { + return []query.Expression{ + logpoller.NewAddressFilter(addr), + logpoller.NewEventSigFilter(eventSig), + logpoller.NewEventByTopicFilter(topicIdx, []primitives.ValueComparator{ + {Value: logpoller.EvmWord(min).Hex(), Operator: primitives.Gte}, + }), + logpoller.NewEventByTopicFilter(topicIdx, []primitives.ValueComparator{ + {Value: logpoller.EvmWord(max).Hex(), Operator: primitives.Lte}, + }), + query.Confidence(primitives.Unconfirmed), } } @@ -834,7 +831,7 @@ func TestORM_SelectIndexedLogsByTxHash(t *testing.T) { }, } - retrievedLogs, err = o1.FilteredLogs(ctx, filter, limiter, "") + retrievedLogs, err = o1.FilteredLogs(ctx, filter.Expressions, limiter, "") require.NoError(t, err) require.Equal(t, 2, len(retrievedLogs)) @@ -875,19 +872,17 @@ func TestORM_DataWords(t *testing.T) { }, })) - wordFilter := func(wordIdx uint8, word1, word2 uint64) query.KeyFilter { - return query.KeyFilter{ - Expressions: []query.Expression{ - logpoller.NewAddressFilter(addr), - logpoller.NewEventSigFilter(eventSig), - logpoller.NewEventByWordFilter(eventSig, wordIdx, []primitives.ValueComparator{ - {Value: logpoller.EvmWord(word1).Hex(), Operator: primitives.Gte}, - }), - logpoller.NewEventByWordFilter(eventSig, wordIdx, []primitives.ValueComparator{ - {Value: logpoller.EvmWord(word2).Hex(), Operator: primitives.Lte}, - }), - query.Confidence(primitives.Unconfirmed), - }, + wordFilter := func(wordIdx uint8, word1, word2 uint64) []query.Expression { + return []query.Expression{ + logpoller.NewAddressFilter(addr), + logpoller.NewEventSigFilter(eventSig), + logpoller.NewEventByWordFilter(eventSig, wordIdx, []primitives.ValueComparator{ + {Value: logpoller.EvmWord(word1).Hex(), Operator: primitives.Gte}, + }), + logpoller.NewEventByWordFilter(eventSig, wordIdx, []primitives.ValueComparator{ + {Value: logpoller.EvmWord(word2).Hex(), Operator: primitives.Lte}, + }), + query.Confidence(primitives.Unconfirmed), } } @@ -946,15 +941,13 @@ func TestORM_DataWords(t *testing.T) { require.NoError(t, err) assert.Equal(t, 2, len(lgs)) - filter := query.KeyFilter{ - Expressions: []query.Expression{ - logpoller.NewAddressFilter(addr), - logpoller.NewEventSigFilter(eventSig), - logpoller.NewEventByWordFilter(eventSig, 0, []primitives.ValueComparator{ - {Value: logpoller.EvmWord(1).Hex(), Operator: primitives.Gte}, - }), - query.Confidence(primitives.Unconfirmed), - }, + filter := []query.Expression{ + logpoller.NewAddressFilter(addr), + logpoller.NewEventSigFilter(eventSig), + logpoller.NewEventByWordFilter(eventSig, 0, []primitives.ValueComparator{ + {Value: logpoller.EvmWord(1).Hex(), Operator: primitives.Gte}, + }), + query.Confidence(primitives.Unconfirmed), } lgs, err = o1.FilteredLogs(ctx, filter, limiter, "") @@ -1098,8 +1091,7 @@ func TestORM_SelectLogsWithSigsByBlockRangeFilter(t *testing.T) { }) assertion(t, logs, err, startBlock, endBlock) - - logs, err = th.ORM.FilteredLogs(ctx, filter([]common.Hash{topic, topic2}, startBlock, endBlock), limiter, "") + logs, err = th.ORM.FilteredLogs(ctx, filter([]common.Hash{topic, topic2}, strconv.Itoa(int(startBlock)), strconv.Itoa(int(endBlock))).Expressions, limiter, "") assertion(t, logs, err, startBlock, endBlock) } @@ -1161,14 +1153,12 @@ func TestLogPoller_Logs(t *testing.T) { assert.Equal(t, "0x0000000000000000000000000000000000000000000000000000000000000005", lgs[4].BlockHash.String()) assert.Equal(t, "0x0000000000000000000000000000000000000000000000000000000000000005", lgs[5].BlockHash.String()) - logFilter := func(start, end uint64, address common.Address) query.KeyFilter { - return query.KeyFilter{ - Expressions: []query.Expression{ - logpoller.NewAddressFilter(address), - logpoller.NewEventSigFilter(event1), - query.Block(start, primitives.Gte), - query.Block(end, primitives.Lte), - }, + logFilter := func(start, end string, address common.Address) []query.Expression { + return []query.Expression{ + logpoller.NewAddressFilter(address), + logpoller.NewEventSigFilter(event1), + query.Block(start, primitives.Gte), + query.Block(end, primitives.Lte), } } @@ -1722,7 +1712,7 @@ func TestSelectLogsCreatedAfter(t *testing.T) { assertion(t, logs, err, tt.expectedLogs) - logs, err = th.ORM.FilteredLogs(ctx, filter(tt.after, tt.confs, 0, nil), limiter, "") + logs, err = th.ORM.FilteredLogs(ctx, filter(tt.after, tt.confs, 0, nil).Expressions, limiter, "") assertion(t, logs, err, tt.expectedLogs) }) @@ -1735,7 +1725,7 @@ func TestSelectLogsCreatedAfter(t *testing.T) { assertion(t, logs, err, tt.expectedLogs) - logs, err = th.ORM.FilteredLogs(ctx, filter(tt.after, tt.confs, 1, []common.Hash{event}), limiter, "") + logs, err = th.ORM.FilteredLogs(ctx, filter(tt.after, tt.confs, 1, []common.Hash{event}).Expressions, limiter, "") assertion(t, logs, err, tt.expectedLogs) }) @@ -1991,7 +1981,7 @@ func TestSelectLogsDataWordBetween(t *testing.T) { assertion(t, logs, err, tt.expectedLogs) - logs, err = th.ORM.FilteredLogs(ctx, wordFilter(tt.wordValue), limiter, "") + logs, err = th.ORM.FilteredLogs(ctx, wordFilter(tt.wordValue).Expressions, limiter, "") assertion(t, logs, err, tt.expectedLogs) }) diff --git a/core/services/ocr2/plugins/ccip/internal/ccipdata/v1_0_0/commit_store.go b/core/services/ocr2/plugins/ccip/internal/ccipdata/v1_0_0/commit_store.go index 3e58143a28..bba12f240f 100644 --- a/core/services/ocr2/plugins/ccip/internal/ccipdata/v1_0_0/commit_store.go +++ b/core/services/ocr2/plugins/ccip/internal/ccipdata/v1_0_0/commit_store.go @@ -336,8 +336,7 @@ func (c *CommitStore) GetAcceptedCommitReportsGteTimestamp(ctx context.Context, return nil, err } - reportsQuery, err := query.Where( - c.address.String(), + reportsQuery, err := logpoller.Where( logpoller.NewAddressFilter(c.address), logpoller.NewEventSigFilter(c.reportAcceptedSig), query.Timestamp(uint64(ts.Unix()), primitives.Gte), diff --git a/core/services/ocr2/plugins/ccip/internal/ccipdata/v1_2_0/commit_store.go b/core/services/ocr2/plugins/ccip/internal/ccipdata/v1_2_0/commit_store.go index ecc8acb576..b7354da364 100644 --- a/core/services/ocr2/plugins/ccip/internal/ccipdata/v1_2_0/commit_store.go +++ b/core/services/ocr2/plugins/ccip/internal/ccipdata/v1_2_0/commit_store.go @@ -351,8 +351,7 @@ func (c *CommitStore) GetAcceptedCommitReportsGteTimestamp(ctx context.Context, return nil, err } - reportsQuery, err := query.Where( - c.address.String(), + reportsQuery, err := logpoller.Where( logpoller.NewAddressFilter(c.address), logpoller.NewEventSigFilter(c.reportAcceptedSig), query.Timestamp(uint64(ts.Unix()), primitives.Gte), diff --git a/core/services/relay/evm/event_binding.go b/core/services/relay/evm/event_binding.go index acfb1aa630..9b5a0f2dc6 100644 --- a/core/services/relay/evm/event_binding.go +++ b/core/services/relay/evm/event_binding.go @@ -168,7 +168,7 @@ func (e *eventBinding) QueryKey(ctx context.Context, filter query.KeyFilter, lim } remapped.Expressions = append(defaultExpressions, remapped.Expressions...) - logs, err := e.lp.FilteredLogs(ctx, remapped, limitAndSort, e.contractName+"-"+e.eventName) + logs, err := e.lp.FilteredLogs(ctx, remapped.Expressions, limitAndSort, e.contractName+"-"+e.address.String()+"-"+e.eventName) if err != nil { return nil, err } @@ -224,14 +224,8 @@ func (e *eventBinding) getLatestValueWithFilters( return err } - fai := filtersAndIndices[0] remainingFilters := filtersAndIndices[1:] - logs, err := e.lp.IndexedLogs(ctx, e.hash, e.address, 1, []common.Hash{fai}, confs) - if err != nil { - return wrapInternalErr(err) - } - // TODO Use filtered logs here BCF-3316 // TODO: there should be a better way to ask log poller to filter these // First, you should be able to ask for as many topics to match From 7456353c515c187dc896f077569e59ad41a7d8b5 Mon Sep 17 00:00:00 2001 From: Domino Valdano <2644901+reductionista@users.noreply.github.com> Date: Tue, 24 Sep 2024 13:16:48 -0700 Subject: [PATCH 03/24] use tx in insertLogsWithinTx --- core/chains/evm/logpoller/orm.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/chains/evm/logpoller/orm.go b/core/chains/evm/logpoller/orm.go index dd3ecfa609..4befa25264 100644 --- a/core/chains/evm/logpoller/orm.go +++ b/core/chains/evm/logpoller/orm.go @@ -389,7 +389,7 @@ func (o *DSORM) insertLogsWithinTx(ctx context.Context, logs []Log, tx sqlutil.D (:evm_chain_id, :log_index, :block_hash, :block_number, :block_timestamp, :address, :event_sig, :topics, :tx_hash, :data, NOW()) ON CONFLICT DO NOTHING` - _, err := o.ds.NamedExecContext(ctx, query, logs[start:end]) + _, err := tx.NamedExecContext(ctx, query, logs[start:end]) if err != nil { if pkgerrors.Is(err, context.DeadlineExceeded) && batchInsertSize > 500 { // In case of DB timeouts, try to insert again with a smaller batch upto a limit From 5b22a2464a010e05107ef001fbad8d580723ed2f Mon Sep 17 00:00:00 2001 From: Domino Valdano <2644901+reductionista@users.noreply.github.com> Date: Mon, 16 Sep 2024 18:54:20 -0700 Subject: [PATCH 04/24] Add id column as PRIMARY KEY for evm.logs & evm.log_poller_blocks Also: - Add UNIQUE INDEXes to replace previous primary keys (still necessary, both for optimizing queries and for enforcing uniqueness constraints) - Replace all SELECT *'s with helper functions for selecting all columns - Refactor nestedBlockQuery into withConfs, and make a bit more use of it --- core/chains/evm/logpoller/orm.go | 311 +++++++++--------- core/chains/evm/logpoller/parser.go | 7 +- core/chains/evm/logpoller/parser_test.go | 125 ++++--- .../0248_log_poller_primary_keys.sql | 18 + 4 files changed, 236 insertions(+), 225 deletions(-) create mode 100644 core/store/migrate/migrations/0248_log_poller_primary_keys.sql diff --git a/core/chains/evm/logpoller/orm.go b/core/chains/evm/logpoller/orm.go index 4befa25264..de5abaf908 100644 --- a/core/chains/evm/logpoller/orm.go +++ b/core/chains/evm/logpoller/orm.go @@ -185,9 +185,52 @@ func (o *DSORM) LoadFilters(ctx context.Context) (map[string]Filter, error) { return filters, err } +func blocksQuery(clause string) string { + return fmt.Sprintf(`SELECT %s FROM evm.log_poller_blocks %s`, strings.Join(blocksFields[:], ", "), clause) +} +func logsQuery(clause string) string { + return fmt.Sprintf(`SELECT %s FROM evm.logs %s`, strings.Join(logsFields[:], ", "), clause) +} + +func logsQueryWithTablePrefix(tableAlias string, clause string) string { + var s strings.Builder + for i, field := range logsFields { + if i > 0 { + s.WriteString(", ") + } + s.WriteString(fmt.Sprintf("%s.%s", tableAlias, field)) + } + return fmt.Sprintf(`SELECT %s FROM evm.logs AS %s %s`, s.String(), tableAlias, clause) +} + +func withConfs(query string, tableAlias string, confs evmtypes.Confirmations) string { + var lastConfirmedBlock string + + var tablePrefix string + if tableAlias != "" { + tablePrefix = tableAlias + "." + } + if confs == evmtypes.Finalized { + lastConfirmedBlock = `finalized_block_number` + } else { + lastConfirmedBlock = `block_number - :confs` + } + return fmt.Sprintf(`%s %sblock_number <= ( + SELECT %s + FROM evm.log_poller_blocks + WHERE evm_chain_id = :evm_chain_id + ORDER BY block_number DESC LIMIT 1)`, query, tablePrefix, lastConfirmedBlock) +} + +func logsQueryWithConfs(clause string, confs evmtypes.Confirmations) string { + return withConfs(logsQuery(clause), "", confs) +} + func (o *DSORM) SelectBlockByHash(ctx context.Context, hash common.Hash) (*LogPollerBlock, error) { var b LogPollerBlock - if err := o.ds.GetContext(ctx, &b, `SELECT * FROM evm.log_poller_blocks WHERE block_hash = $1 AND evm_chain_id = $2`, hash.Bytes(), ubig.New(o.chainID)); err != nil { + if err := o.ds.GetContext(ctx, &b, + blocksQuery(`WHERE block_hash = $1 AND evm_chain_id = $2`), + hash.Bytes(), ubig.New(o.chainID)); err != nil { return nil, err } return &b, nil @@ -195,7 +238,9 @@ func (o *DSORM) SelectBlockByHash(ctx context.Context, hash common.Hash) (*LogPo func (o *DSORM) SelectBlockByNumber(ctx context.Context, n int64) (*LogPollerBlock, error) { var b LogPollerBlock - if err := o.ds.GetContext(ctx, &b, `SELECT * FROM evm.log_poller_blocks WHERE block_number = $1 AND evm_chain_id = $2`, n, ubig.New(o.chainID)); err != nil { + if err := o.ds.GetContext(ctx, &b, + blocksQuery(`WHERE block_number = $1 AND evm_chain_id = $2`), n, ubig.New(o.chainID), + ); err != nil { return nil, err } return &b, nil @@ -203,7 +248,9 @@ func (o *DSORM) SelectBlockByNumber(ctx context.Context, n int64) (*LogPollerBlo func (o *DSORM) SelectLatestBlock(ctx context.Context) (*LogPollerBlock, error) { var b LogPollerBlock - if err := o.ds.GetContext(ctx, &b, `SELECT * FROM evm.log_poller_blocks WHERE evm_chain_id = $1 ORDER BY block_number DESC LIMIT 1`, ubig.New(o.chainID)); err != nil { + if err := o.ds.GetContext(ctx, &b, + blocksQuery(`WHERE evm_chain_id = $1 ORDER BY block_number DESC LIMIT 1`), ubig.New(o.chainID), + ); err != nil { return nil, err } return &b, nil @@ -211,7 +258,10 @@ func (o *DSORM) SelectLatestBlock(ctx context.Context) (*LogPollerBlock, error) func (o *DSORM) SelectOldestBlock(ctx context.Context, minAllowedBlockNumber int64) (*LogPollerBlock, error) { var b LogPollerBlock - if err := o.ds.GetContext(ctx, &b, `SELECT * FROM evm.log_poller_blocks WHERE evm_chain_id = $1 AND block_number >= $2 ORDER BY block_number ASC LIMIT 1`, ubig.New(o.chainID), minAllowedBlockNumber); err != nil { + if err := o.ds.GetContext(ctx, &b, + blocksQuery(`WHERE evm_chain_id = $1 AND block_number >= $2 ORDER BY block_number ASC LIMIT 1`), + ubig.New(o.chainID), minAllowedBlockNumber, + ); err != nil { return nil, err } return &b, nil @@ -224,15 +274,11 @@ func (o *DSORM) SelectLatestLogByEventSigWithConfs(ctx context.Context, eventSig if err != nil { return nil, err } - query := fmt.Sprintf(` - SELECT * FROM evm.logs - WHERE evm_chain_id = :evm_chain_id + query := logsQueryWithConfs( + `WHERE evm_chain_id = :evm_chain_id AND event_sig = :event_sig - AND address = :address - AND block_number <= %s - ORDER BY block_number desc, log_index DESC - LIMIT 1 - `, nestedBlockNumberQuery(confs)) + AND address = :address AND `, confs) + + ` ORDER BY block_number desc, log_index DESC LIMIT 1` var l Log query, sqlArgs, err := o.ds.BindNamed(query, args) @@ -256,8 +302,7 @@ func (o *DSORM) DeleteBlocksBefore(ctx context.Context, end int64, limit int64) WHERE block_number <= $1 AND evm_chain_id = $2 LIMIT $3 - ) - AND evm_chain_id = $2`, + ) AND evm_chain_id = $2`, end, ubig.New(o.chainID), limit) if err != nil { return 0, err @@ -421,11 +466,11 @@ func (o *DSORM) SelectLogsByBlockRange(ctx context.Context, start, end int64) ([ return nil, err } - query := `SELECT * FROM evm.logs - WHERE evm_chain_id = :evm_chain_id - AND block_number >= :start_block - AND block_number <= :end_block - ORDER BY block_number, log_index` + query := logsQuery(` + WHERE evm_chain_id = :evm_chain_id + AND block_number >= :start_block + AND block_number <= :end_block + ORDER BY block_number, log_index`) var logs []Log query, sqlArgs, err := o.ds.BindNamed(query, args) @@ -450,13 +495,13 @@ func (o *DSORM) SelectLogs(ctx context.Context, start, end int64, address common return nil, err } - query := `SELECT * FROM evm.logs - WHERE evm_chain_id = :evm_chain_id - AND address = :address - AND event_sig = :event_sig - AND block_number >= :start_block - AND block_number <= :end_block - ORDER BY block_number, log_index` + query := logsQuery(` + WHERE evm_chain_id = :evm_chain_id + AND address = :address + AND event_sig = :event_sig + AND block_number >= :start_block + AND block_number <= :end_block + ORDER BY block_number, log_index`) var logs []Log query, sqlArgs, err := o.ds.BindNamed(query, args) @@ -481,14 +526,12 @@ func (o *DSORM) SelectLogsCreatedAfter(ctx context.Context, address common.Addre return nil, err } - query := fmt.Sprintf(` - SELECT * FROM evm.logs - WHERE evm_chain_id = :evm_chain_id - AND address = :address - AND event_sig = :event_sig - AND block_timestamp > :block_timestamp_after - AND block_number <= %s - ORDER BY block_number, log_index`, nestedBlockNumberQuery(confs)) + query := logsQueryWithConfs( + `WHERE evm_chain_id = :evm_chain_id + AND address = :address + AND event_sig = :event_sig + AND block_timestamp > :block_timestamp_after AND `, confs) + + `ORDER BY block_number, log_index` var logs []Log query, sqlArgs, err := o.ds.BindNamed(query, args) @@ -515,12 +558,12 @@ func (o *DSORM) SelectLogsWithSigs(ctx context.Context, start, end int64, addres return nil, err } - query := `SELECT * FROM evm.logs - WHERE evm_chain_id = :evm_chain_id - AND address = :address - AND event_sig = ANY(:event_sig_array) - AND block_number BETWEEN :start_block AND :end_block - ORDER BY block_number, log_index` + query := logsQuery(` + WHERE evm_chain_id = :evm_chain_id + AND address = :address + AND event_sig = ANY(:event_sig_array) + AND block_number BETWEEN :start_block AND :end_block + ORDER BY block_number, log_index`) query, sqlArgs, err := o.ds.BindNamed(query, args) if err != nil { @@ -543,11 +586,11 @@ func (o *DSORM) GetBlocksRange(ctx context.Context, start int64, end int64) ([]L return nil, err } - query := `SELECT * FROM evm.log_poller_blocks + query := blocksQuery(` WHERE block_number >= :start_block AND block_number <= :end_block AND evm_chain_id = :evm_chain_id - ORDER BY block_number ASC` + ORDER BY block_number ASC`) var blocks []LogPollerBlock query, sqlArgs, err := o.ds.BindNamed(query, args) @@ -574,17 +617,14 @@ func (o *DSORM) SelectLatestLogEventSigsAddrsWithConfs(ctx context.Context, from return nil, err } - query := fmt.Sprintf(` - SELECT * FROM evm.logs WHERE (block_number, address, event_sig) IN ( + query := logsQueryWithConfs(`WHERE (block_number, address, event_sig) IN ( SELECT MAX(block_number), address, event_sig FROM evm.logs WHERE evm_chain_id = :evm_chain_id AND event_sig = ANY(:event_sig_array) AND address = ANY(:address_array) - AND block_number > :start_block - AND block_number <= %s - GROUP BY event_sig, address - ) - ORDER BY block_number ASC`, nestedBlockNumberQuery(confs)) + AND block_number > :start_block AND `, confs) + + `GROUP BY event_sig, address) + ORDER BY block_number ASC` var logs []Log query, sqlArgs, err := o.ds.BindNamed(query, args) @@ -609,13 +649,11 @@ func (o *DSORM) SelectLatestBlockByEventSigsAddrsWithConfs(ctx context.Context, if err != nil { return 0, err } - query := fmt.Sprintf(` - SELECT COALESCE(MAX(block_number), 0) FROM evm.logs - WHERE evm_chain_id = :evm_chain_id - AND event_sig = ANY(:event_sig_array) - AND address = ANY(:address_array) - AND block_number > :start_block - AND block_number <= %s`, nestedBlockNumberQuery(confs)) + query := withConfs(`SELECT COALESCE(MAX(block_number), 0) FROM evm.logs + WHERE evm_chain_id = :evm_chain_id + AND event_sig = ANY(:event_sig_array) + AND address = ANY(:address_array) + AND block_number > :start_block AND `, "", confs) var blockNumber int64 query, sqlArgs, err := o.ds.BindNamed(query, args) @@ -640,14 +678,12 @@ func (o *DSORM) SelectLogsDataWordRange(ctx context.Context, address common.Addr return nil, err } - query := fmt.Sprintf(`SELECT * FROM evm.logs - WHERE evm_chain_id = :evm_chain_id - AND address = :address - AND event_sig = :event_sig - AND substring(data from 32*:word_index+1 for 32) >= :word_value_min - AND substring(data from 32*:word_index+1 for 32) <= :word_value_max - AND block_number <= %s - ORDER BY block_number, log_index`, nestedBlockNumberQuery(confs)) + query := logsQueryWithConfs(`WHERE evm_chain_id = :evm_chain_id + AND address = :address + AND event_sig = :event_sig + AND substring(data from 32*:word_index+1 for 32) >= :word_value_min + AND substring(data from 32*:word_index+1 for 32) <= :word_value_max AND `, confs) + + `ORDER BY block_number, log_index` var logs []Log query, sqlArgs, err := o.ds.BindNamed(query, args) @@ -671,14 +707,11 @@ func (o *DSORM) SelectLogsDataWordGreaterThan(ctx context.Context, address commo return nil, err } - query := fmt.Sprintf(` - SELECT * FROM evm.logs - WHERE evm_chain_id = :evm_chain_id - AND address = :address - AND event_sig = :event_sig - AND substring(data from 32*:word_index+1 for 32) >= :word_value_min - AND block_number <= %s - ORDER BY block_number, log_index`, nestedBlockNumberQuery(confs)) + query := logsQueryWithConfs(`WHERE evm_chain_id = :evm_chain_id + AND address = :address + AND event_sig = :event_sig + AND substring(data from 32*:word_index+1 for 32) >= :word_value_min AND `, confs) + + `ORDER BY block_number, log_index` var logs []Log query, sqlArgs, err := o.ds.BindNamed(query, args) @@ -702,15 +735,12 @@ func (o *DSORM) SelectLogsDataWordBetween(ctx context.Context, address common.Ad if err != nil { return nil, err } - query := fmt.Sprintf(` - SELECT * FROM evm.logs - WHERE evm_chain_id = :evm_chain_id - AND address = :address - AND event_sig = :event_sig - AND substring(data from 32*:word_index_min+1 for 32) <= :word_value - AND substring(data from 32*:word_index_max+1 for 32) >= :word_value - AND block_number <= %s - ORDER BY block_number, log_index`, nestedBlockNumberQuery(confs)) + query := logsQueryWithConfs(`WHERE evm_chain_id = :evm_chain_id + AND address = :address + AND event_sig = :event_sig + AND substring(data from 32*:word_index_min+1 for 32) <= :word_value + AND substring(data from 32*:word_index_max+1 for 32) >= :word_value AND `, confs) + + `ORDER BY block_number, log_index` var logs []Log query, sqlArgs, err := o.ds.BindNamed(query, args) @@ -734,14 +764,11 @@ func (o *DSORM) SelectIndexedLogsTopicGreaterThan(ctx context.Context, address c return nil, err } - query := fmt.Sprintf(` - SELECT * FROM evm.logs - WHERE evm_chain_id = :evm_chain_id - AND address = :address - AND event_sig = :event_sig - AND topics[:topic_index] >= :topic_value_min - AND block_number <= %s - ORDER BY block_number, log_index`, nestedBlockNumberQuery(confs)) + query := logsQueryWithConfs(`WHERE evm_chain_id = :evm_chain_id + AND address = :address + AND event_sig = :event_sig + AND topics[:topic_index] >= :topic_value_min AND `, confs) + + `ORDER BY block_number, log_index` var logs []Log query, sqlArgs, err := o.ds.BindNamed(query, args) @@ -766,15 +793,12 @@ func (o *DSORM) SelectIndexedLogsTopicRange(ctx context.Context, address common. return nil, err } - query := fmt.Sprintf(` - SELECT * FROM evm.logs - WHERE evm_chain_id = :evm_chain_id - AND address = :address - AND event_sig = :event_sig - AND topics[:topic_index] >= :topic_value_min - AND topics[:topic_index] <= :topic_value_max - AND block_number <= %s - ORDER BY block_number, log_index`, nestedBlockNumberQuery(confs)) + query := logsQueryWithConfs(`WHERE evm_chain_id = :evm_chain_id + AND address = :address + AND event_sig = :event_sig + AND topics[:topic_index] >= :topic_value_min + AND topics[:topic_index] <= :topic_value_max AND `, confs) + + `ORDER BY block_number, log_index` var logs []Log query, sqlArgs, err := o.ds.BindNamed(query, args) @@ -798,14 +822,12 @@ func (o *DSORM) SelectIndexedLogs(ctx context.Context, address common.Address, e return nil, err } - query := fmt.Sprintf(` - SELECT * FROM evm.logs - WHERE evm_chain_id = :evm_chain_id - AND address = :address - AND event_sig = :event_sig - AND topics[:topic_index] = ANY(:topic_values) - AND block_number <= %s - ORDER BY block_number, log_index`, nestedBlockNumberQuery(confs)) + query := logsQueryWithConfs(` + WHERE evm_chain_id = :evm_chain_id + AND address = :address + AND event_sig = :event_sig + AND topics[:topic_index] = ANY(:topic_values) AND `, confs) + + `ORDER BY block_number, log_index` var logs []Log query, sqlArgs, err := o.ds.BindNamed(query, args) @@ -831,14 +853,14 @@ func (o *DSORM) SelectIndexedLogsByBlockRange(ctx context.Context, start, end in return nil, err } - query := `SELECT * FROM evm.logs + query := logsQuery(` WHERE evm_chain_id = :evm_chain_id AND address = :address AND event_sig = :event_sig AND topics[:topic_index] = ANY(:topic_values) AND block_number >= :start_block AND block_number <= :end_block - ORDER BY block_number, log_index` + ORDER BY block_number, log_index`) var logs []Log query, sqlArgs, err := o.ds.BindNamed(query, args) @@ -864,16 +886,13 @@ func (o *DSORM) SelectIndexedLogsCreatedAfter(ctx context.Context, address commo return nil, err } - query := fmt.Sprintf(` - SELECT * FROM evm.logs - WHERE evm_chain_id = :evm_chain_id - AND address = :address - AND event_sig = :event_sig - AND topics[:topic_index] = ANY(:topic_values) - AND block_timestamp > :block_timestamp_after - AND block_number <= %s - ORDER BY block_number, log_index - `, nestedBlockNumberQuery(confs)) + query := logsQueryWithConfs(` + WHERE evm_chain_id = :evm_chain_id + AND address = :address + AND event_sig = :event_sig + AND topics[:topic_index] = ANY(:topic_values) + AND block_timestamp > :block_timestamp_after AND `, confs) + + `ORDER BY block_number, log_index` var logs []Log query, sqlArgs, err := o.ds.BindNamed(query, args) @@ -897,12 +916,12 @@ func (o *DSORM) SelectIndexedLogsByTxHash(ctx context.Context, address common.Ad return nil, err } - query := `SELECT * FROM evm.logs + query := logsQuery(` WHERE evm_chain_id = :evm_chain_id AND address = :address AND event_sig = :event_sig AND tx_hash = :tx_hash - ORDER BY block_number, log_index` + ORDER BY block_number, log_index`) var logs []Log query, sqlArgs, err := o.ds.BindNamed(query, args) @@ -932,25 +951,22 @@ func (o *DSORM) SelectIndexedLogsWithSigsExcluding(ctx context.Context, sigA, si return nil, err } - nestedQuery := nestedBlockNumberQuery(confs) - query := fmt.Sprintf(` - SELECT * FROM evm.logs - WHERE evm_chain_id = :evm_chain_id - AND address = :address - AND event_sig = :sigA - AND block_number BETWEEN :start_block AND :end_block - AND block_number <= %s - EXCEPT - SELECT a.* FROM evm.logs AS a - INNER JOIN evm.logs B - ON a.evm_chain_id = b.evm_chain_id - AND a.address = b.address - AND a.topics[:topic_index] = b.topics[:topic_index] - AND a.event_sig = :sigA - AND b.event_sig = :sigB - AND b.block_number BETWEEN :start_block AND :end_block - AND b.block_number <= %s - ORDER BY block_number, log_index`, nestedQuery, nestedQuery) + query := logsQueryWithConfs(` + WHERE evm_chain_id = :evm_chain_id + AND address = :address + AND event_sig = :sigA + AND block_number BETWEEN :start_block AND :end_block AND `, confs) + + ` EXCEPT ` + + withConfs(logsQueryWithTablePrefix("a", ` + INNER JOIN evm.logs AS b + ON a.evm_chain_id = b.evm_chain_id + AND a.address = b.address + AND a.topics[:topic_index] = b.topics[:topic_index] + AND a.event_sig = :sigA + AND b.event_sig = :sigB + AND b.block_number BETWEEN :start_block AND :end_block + AND `), "b", confs) + + ` ORDER BY block_number, log_index` var logs []Log query, sqlArgs, err := o.ds.BindNamed(query, args) @@ -987,20 +1003,3 @@ func (o *DSORM) FilteredLogs(ctx context.Context, filter []query.Expression, lim return logs, nil } - -func nestedBlockNumberQuery(confs evmtypes.Confirmations) string { - if confs == evmtypes.Finalized { - return ` - (SELECT finalized_block_number - FROM evm.log_poller_blocks - WHERE evm_chain_id = :evm_chain_id - ORDER BY block_number DESC LIMIT 1) ` - } - // Intentionally wrap with greatest() function and don't return negative block numbers when :confs > :block_number - // It doesn't impact logic of the outer query, because block numbers are never less or equal to 0 (guarded by log_poller_blocks_block_number_check) - return ` - (SELECT greatest(block_number - :confs, 0) - FROM evm.log_poller_blocks - WHERE evm_chain_id = :evm_chain_id - ORDER BY block_number DESC LIMIT 1) ` -} diff --git a/core/chains/evm/logpoller/parser.go b/core/chains/evm/logpoller/parser.go index e08ea93da7..0acac07575 100644 --- a/core/chains/evm/logpoller/parser.go +++ b/core/chains/evm/logpoller/parser.go @@ -13,6 +13,7 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/types/query" "github.com/smartcontractkit/chainlink-common/pkg/types/query/primitives" + evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types" ) @@ -26,6 +27,10 @@ const ( var ( ErrUnexpectedCursorFormat = errors.New("unexpected cursor format") + logsFields = [...]string{"evm_chain_id", "log_index", "block_hash", "block_number", + "address", "event_sig", "topics", "tx_hash", "data", "created_at", "block_timestamp"} + blocksFields = [...]string{"evm_chain_id", "block_hash", "block_number", "block_timestamp", + "finalized_block_number", "created_at"} ) // The parser builds SQL expressions piece by piece for each Accept function call and resets the error and expression @@ -220,7 +225,7 @@ func (v *pgDSLParser) buildQuery(chainID *big.Int, expressions []query.Expressio v.err = nil // build the query string - clauses := []string{"SELECT evm.logs.* FROM evm.logs"} + clauses := []string{logsQuery("")} where, err := v.whereClause(expressions, limiter) if err != nil { diff --git a/core/chains/evm/logpoller/parser_test.go b/core/chains/evm/logpoller/parser_test.go index 5e99ec7ba8..b4099e000d 100644 --- a/core/chains/evm/logpoller/parser_test.go +++ b/core/chains/evm/logpoller/parser_test.go @@ -10,6 +10,7 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/types/query" "github.com/smartcontractkit/chainlink-common/pkg/types/query/primitives" + "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types" ) @@ -34,7 +35,7 @@ func TestDSLParser(t *testing.T) { result, args, err := parser.buildQuery(chainID, expressions, limiter) require.NoError(t, err) - assert.Equal(t, "SELECT evm.logs.* FROM evm.logs WHERE evm_chain_id = :evm_chain_id ORDER BY "+defaultSort, result) + assert.Equal(t, logsQuery(" WHERE evm_chain_id = :evm_chain_id ORDER BY "+defaultSort), result) assertArgs(t, args, 1) }) @@ -52,15 +53,14 @@ func TestDSLParser(t *testing.T) { limiter := query.NewLimitAndSort(query.CursorLimit("10-5-0x42", query.CursorFollowing, 20)) result, args, err := parser.buildQuery(chainID, expressions, limiter) - expected := "SELECT evm.logs.* " + - "FROM evm.logs " + - "WHERE evm_chain_id = :evm_chain_id " + - "AND (address = :address_0 AND event_sig = :event_sig_0 " + - "AND block_number <= " + - "(SELECT finalized_block_number FROM evm.log_poller_blocks WHERE evm_chain_id = :evm_chain_id ORDER BY block_number DESC LIMIT 1)) " + - "AND (block_number > :cursor_block_number OR (block_number = :cursor_block_number AND log_index > :cursor_log_index)) " + - "ORDER BY block_number ASC, log_index ASC, tx_hash ASC " + - "LIMIT 20" + expected := logsQuery( + " WHERE evm_chain_id = :evm_chain_id " + + "AND (address = :address_0 AND event_sig = :event_sig_0 " + + "AND block_number <= " + + "(SELECT finalized_block_number FROM evm.log_poller_blocks WHERE evm_chain_id = :evm_chain_id ORDER BY block_number DESC LIMIT 1)) " + + "AND (block_number > :cursor_block_number OR (block_number = :cursor_block_number AND log_index > :cursor_log_index)) " + + "ORDER BY block_number ASC, log_index ASC, tx_hash ASC " + + "LIMIT 20") require.NoError(t, err) assert.Equal(t, expected, result) @@ -80,12 +80,11 @@ func TestDSLParser(t *testing.T) { limiter := query.NewLimitAndSort(query.CountLimit(20)) result, args, err := parser.buildQuery(chainID, expressions, limiter) - expected := "SELECT evm.logs.* " + - "FROM evm.logs " + - "WHERE evm_chain_id = :evm_chain_id " + - "AND (address = :address_0 AND event_sig = :event_sig_0) " + - "ORDER BY " + defaultSort + " " + - "LIMIT 20" + expected := logsQuery( + " WHERE evm_chain_id = :evm_chain_id " + + "AND (address = :address_0 AND event_sig = :event_sig_0) " + + "ORDER BY " + defaultSort + " " + + "LIMIT 20") require.NoError(t, err) assert.Equal(t, expected, result) @@ -102,10 +101,9 @@ func TestDSLParser(t *testing.T) { limiter := query.NewLimitAndSort(query.Limit{}, query.NewSortBySequence(query.Desc)) result, args, err := parser.buildQuery(chainID, expressions, limiter) - expected := "SELECT evm.logs.* " + - "FROM evm.logs " + - "WHERE evm_chain_id = :evm_chain_id " + - "ORDER BY block_number DESC, log_index DESC, tx_hash DESC" + expected := logsQuery( + " WHERE evm_chain_id = :evm_chain_id " + + "ORDER BY block_number DESC, log_index DESC, tx_hash DESC") require.NoError(t, err) assert.Equal(t, expected, result) @@ -122,10 +120,9 @@ func TestDSLParser(t *testing.T) { limiter := query.NewLimitAndSort(query.Limit{}, query.NewSortByBlock(query.Asc), query.NewSortByTimestamp(query.Desc)) result, args, err := parser.buildQuery(chainID, expressions, limiter) - expected := "SELECT evm.logs.* " + - "FROM evm.logs " + - "WHERE evm_chain_id = :evm_chain_id " + - "ORDER BY block_number ASC, block_timestamp DESC" + expected := logsQuery( + " WHERE evm_chain_id = :evm_chain_id " + + "ORDER BY block_number ASC, block_timestamp DESC") require.NoError(t, err) assert.Equal(t, expected, result) @@ -147,16 +144,15 @@ func TestDSLParser(t *testing.T) { limiter := query.NewLimitAndSort(query.CursorLimit("10-20-0x42", query.CursorPrevious, 20)) result, args, err := parser.buildQuery(chainID, expressions, limiter) - expected := "SELECT evm.logs.* " + - "FROM evm.logs " + - "WHERE evm_chain_id = :evm_chain_id " + - "AND (block_timestamp = :block_timestamp_0 " + - "AND tx_hash = :tx_hash_0 " + - "AND block_number != :block_number_0 " + - "AND block_number <= " + - "(SELECT finalized_block_number FROM evm.log_poller_blocks WHERE evm_chain_id = :evm_chain_id ORDER BY block_number DESC LIMIT 1)) " + - "AND (block_number < :cursor_block_number OR (block_number = :cursor_block_number AND log_index < :cursor_log_index)) " + - "ORDER BY block_number DESC, log_index DESC, tx_hash DESC LIMIT 20" + expected := logsQuery( + " WHERE evm_chain_id = :evm_chain_id " + + "AND (block_timestamp = :block_timestamp_0 " + + "AND tx_hash = :tx_hash_0 " + + "AND block_number != :block_number_0 " + + "AND block_number <= " + + "(SELECT finalized_block_number FROM evm.log_poller_blocks WHERE evm_chain_id = :evm_chain_id ORDER BY block_number DESC LIMIT 1)) " + + "AND (block_number < :cursor_block_number OR (block_number = :cursor_block_number AND log_index < :cursor_log_index)) " + + "ORDER BY block_number DESC, log_index DESC, tx_hash DESC LIMIT 20") require.NoError(t, err) assert.Equal(t, expected, result) @@ -175,10 +171,9 @@ func TestDSLParser(t *testing.T) { limiter := query.LimitAndSort{} result, args, err := parser.buildQuery(chainID, expressions, limiter) - expected := "SELECT evm.logs.* " + - "FROM evm.logs " + - "WHERE evm_chain_id = :evm_chain_id " + - "AND block_number <= (SELECT finalized_block_number FROM evm.log_poller_blocks WHERE evm_chain_id = :evm_chain_id ORDER BY block_number DESC LIMIT 1) ORDER BY " + defaultSort + expected := logsQuery( + " WHERE evm_chain_id = :evm_chain_id " + + "AND block_number <= (SELECT finalized_block_number FROM evm.log_poller_blocks WHERE evm_chain_id = :evm_chain_id ORDER BY block_number DESC LIMIT 1) ORDER BY " + defaultSort) require.NoError(t, err) assert.Equal(t, expected, result) @@ -194,10 +189,9 @@ func TestDSLParser(t *testing.T) { limiter := query.LimitAndSort{} result, args, err := parser.buildQuery(chainID, expressions, limiter) - expected := "SELECT evm.logs.* " + - "FROM evm.logs " + - "WHERE evm_chain_id = :evm_chain_id " + - "AND block_number <= (SELECT greatest(block_number - :confs_0, 0) FROM evm.log_poller_blocks WHERE evm_chain_id = :evm_chain_id ORDER BY block_number DESC LIMIT 1) ORDER BY " + defaultSort + expected := logsQuery( + " WHERE evm_chain_id = :evm_chain_id " + + "AND block_number <= (SELECT greatest(block_number - :confs_0, 0) FROM evm.log_poller_blocks WHERE evm_chain_id = :evm_chain_id ORDER BY block_number DESC LIMIT 1) ORDER BY " + defaultSort) require.NoError(t, err) assert.Equal(t, expected, result) @@ -213,10 +207,9 @@ func TestDSLParser(t *testing.T) { limiter := query.LimitAndSort{} result, args, err := parser.buildQuery(chainID, expressions, limiter) - expected := "SELECT evm.logs.* " + - "FROM evm.logs " + - "WHERE evm_chain_id = :evm_chain_id " + - "AND block_number <= (SELECT greatest(block_number - :confs_0, 0) FROM evm.log_poller_blocks WHERE evm_chain_id = :evm_chain_id ORDER BY block_number DESC LIMIT 1) ORDER BY " + defaultSort + expected := logsQuery( + " WHERE evm_chain_id = :evm_chain_id " + + "AND block_number <= (SELECT greatest(block_number - :confs_0, 0) FROM evm.log_poller_blocks WHERE evm_chain_id = :evm_chain_id ORDER BY block_number DESC LIMIT 1) ORDER BY " + defaultSort) require.NoError(t, err) assert.Equal(t, expected, result) @@ -243,10 +236,9 @@ func TestDSLParser(t *testing.T) { limiter := query.LimitAndSort{} result, args, err := parser.buildQuery(chainID, expressions, limiter) - expected := "SELECT evm.logs.* " + - "FROM evm.logs " + - "WHERE evm_chain_id = :evm_chain_id " + - "AND substring(data from 32*:word_index_0+1 for 32) > :word_value_0 ORDER BY " + defaultSort + expected := logsQuery( + " WHERE evm_chain_id = :evm_chain_id " + + "AND substring(data from 32*:word_index_0+1 for 32) > :word_value_0 ORDER BY " + defaultSort) require.NoError(t, err) assert.Equal(t, expected, result) @@ -268,10 +260,9 @@ func TestDSLParser(t *testing.T) { limiter := query.LimitAndSort{} result, args, err := parser.buildQuery(chainID, expressions, limiter) - expected := "SELECT evm.logs.* " + - "FROM evm.logs " + - "WHERE evm_chain_id = :evm_chain_id " + - "AND topics[:topic_index_0] > :topic_value_0 AND topics[:topic_index_0] < :topic_value_1 ORDER BY " + defaultSort + expected := logsQuery( + " WHERE evm_chain_id = :evm_chain_id " + + "AND topics[:topic_index_0] > :topic_value_0 AND topics[:topic_index_0] < :topic_value_1 ORDER BY " + defaultSort) require.NoError(t, err) assert.Equal(t, expected, result) @@ -304,12 +295,11 @@ func TestDSLParser(t *testing.T) { limiter := query.LimitAndSort{} result, args, err := parser.buildQuery(chainID, expressions, limiter) - expected := "SELECT evm.logs.* " + - "FROM evm.logs " + - "WHERE evm_chain_id = :evm_chain_id " + - "AND (block_timestamp >= :block_timestamp_0 " + - "AND (tx_hash = :tx_hash_0 " + - "OR block_number <= (SELECT greatest(block_number - :confs_0, 0) FROM evm.log_poller_blocks WHERE evm_chain_id = :evm_chain_id ORDER BY block_number DESC LIMIT 1))) ORDER BY " + defaultSort + expected := logsQuery( + " WHERE evm_chain_id = :evm_chain_id " + + "AND (block_timestamp >= :block_timestamp_0 " + + "AND (tx_hash = :tx_hash_0 " + + "OR block_number <= (SELECT greatest(block_number - :confs_0, 0) FROM evm.log_poller_blocks WHERE evm_chain_id = :evm_chain_id ORDER BY block_number DESC LIMIT 1))) ORDER BY " + defaultSort) require.NoError(t, err) assert.Equal(t, expected, result) @@ -353,14 +343,13 @@ func TestDSLParser(t *testing.T) { limiter := query.LimitAndSort{} result, args, err := parser.buildQuery(chainID, expressions, limiter) - expected := "SELECT evm.logs.* " + - "FROM evm.logs " + - "WHERE evm_chain_id = :evm_chain_id " + - "AND (block_timestamp = :block_timestamp_0 " + - "AND (tx_hash = :tx_hash_0 " + - "OR (block_number <= (SELECT greatest(block_number - :confs_0, 0) FROM evm.log_poller_blocks WHERE evm_chain_id = :evm_chain_id ORDER BY block_number DESC LIMIT 1) " + - "AND substring(data from 32*:word_index_0+1 for 32) > :word_value_0 " + - "AND substring(data from 32*:word_index_0+1 for 32) <= :word_value_1))) ORDER BY " + defaultSort + expected := logsQuery( + " WHERE evm_chain_id = :evm_chain_id " + + "AND (block_timestamp = :block_timestamp_0 " + + "AND (tx_hash = :tx_hash_0 " + + "OR (block_number <= (SELECT greatest(block_number - :confs_0, 0) FROM evm.log_poller_blocks WHERE evm_chain_id = :evm_chain_id ORDER BY block_number DESC LIMIT 1) " + + "AND substring(data from 32*:word_index_0+1 for 32) > :word_value_0 " + + "AND substring(data from 32*:word_index_0+1 for 32) <= :word_value_1))) ORDER BY " + defaultSort) require.NoError(t, err) assert.Equal(t, expected, result) diff --git a/core/store/migrate/migrations/0248_log_poller_primary_keys.sql b/core/store/migrate/migrations/0248_log_poller_primary_keys.sql new file mode 100644 index 0000000000..2a94c0c490 --- /dev/null +++ b/core/store/migrate/migrations/0248_log_poller_primary_keys.sql @@ -0,0 +1,18 @@ +-- +goose Up + +ALTER TABLE evm.logs DROP CONSTRAINT logs_pkey; +ALTER TABLE evm.logs ADD COLUMN id BIGINT PRIMARY KEY GENERATED ALWAYS AS IDENTITY; +CREATE UNIQUE INDEX idx_evm_logs_block_hash_log_index_evm_chain_id ON evm.logs (block_hash, log_index, evm_chain_id); +ALTER TABLE evm.log_poller_blocks DROP CONSTRAINT log_poller_blocks_pkey; +ALTER TABLE evm.log_poller_blocks ADD COLUMN id BIGINT PRIMARY KEY GENERATED ALWAYS AS IDENTITY; +CREATE UNIQUE INDEX idx_evm_log_poller_blocks_block_number_evm_chain_id ON evm.log_poller_blocks (block_number, evm_chain_id); + +-- +goose Down + +DROP INDEX IF EXISTS evm.idx_evm_log_poller_blocks_block_number_evm_chain_id; +ALTER TABLE evm.log_poller_blocks DROP COLUMN id; +ALTER TABLE evm.log_poller_blocks ADD PRIMARY KEY (block_number, evm_chain_id); +DROP INDEX IF EXISTS evm.idx_evm_logs_block_hash_log_index_evm_chain_id; +ALTER TABLE evm.logs DROP COLUMN id; +ALTER TABLE evm.logs ADD PRIMARY KEY (block_hash, log_index, evm_chain_id); + From b02335377748f814f1a4002eae6d0a60fca8c2a7 Mon Sep 17 00:00:00 2001 From: Domino Valdano <2644901+reductionista@users.noreply.github.com> Date: Fri, 13 Sep 2024 12:00:04 -0700 Subject: [PATCH 05/24] Clean up db indexes Some of the columns in these indexes (such as created_at) are no longer used. Others were not optimized for the queries we need. --- .../0248_log_poller_primary_keys.sql | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/core/store/migrate/migrations/0248_log_poller_primary_keys.sql b/core/store/migrate/migrations/0248_log_poller_primary_keys.sql index 2a94c0c490..079e73284b 100644 --- a/core/store/migrate/migrations/0248_log_poller_primary_keys.sql +++ b/core/store/migrate/migrations/0248_log_poller_primary_keys.sql @@ -2,17 +2,25 @@ ALTER TABLE evm.logs DROP CONSTRAINT logs_pkey; ALTER TABLE evm.logs ADD COLUMN id BIGINT PRIMARY KEY GENERATED ALWAYS AS IDENTITY; -CREATE UNIQUE INDEX idx_evm_logs_block_hash_log_index_evm_chain_id ON evm.logs (block_hash, log_index, evm_chain_id); -ALTER TABLE evm.log_poller_blocks DROP CONSTRAINT log_poller_blocks_pkey; +CREATE UNIQUE INDEX idx_logs_chain_block_logindex ON evm.logs (evm_chain_id, block_number, log_index); +ALTER TABLE evm.log_poller_blocks DROP CONSTRAINT log_poller_blocks_pkey; ALTER TABLE evm.log_poller_blocks ADD COLUMN id BIGINT PRIMARY KEY GENERATED ALWAYS AS IDENTITY; -CREATE UNIQUE INDEX idx_evm_log_poller_blocks_block_number_evm_chain_id ON evm.log_poller_blocks (block_number, evm_chain_id); +DROP INDEX IF EXISTS evm.idx_evm_log_poller_blocks_order_by_block; +DROP INDEX IF EXISTS evm.idx_evm_log_poller_blocks_block_number_evm_chain_id; +CREATE UNIQUE INDEX idx_log_poller_blocks_chain_block ON evm.log_poller_blocks (evm_chain_id, block_number DESC); +DROP INDEX IF EXISTS evm.idx_evm_logs_ordered_by_block_and_created_at; +CREATE INDEX idx_logs_chain_address_event_block_logindex ON evm.logs (evm_chain_id, address, event_sig, block_number, log_index); -- +goose Down -DROP INDEX IF EXISTS evm.idx_evm_log_poller_blocks_block_number_evm_chain_id; +DROP INDEX IF EXISTS evm.idx_logs_chain_address_event_block_logindex; +CREATE INDEX idx_evm_logs_ordered_by_block_and_created_at ON evm.logs (evm_chain_id, address, event_sig, block_number, created_at); +DROP INDEX IF EXISTS evm.idx_log_poller_blocks_chain_block; +CREATE INDEX idx_evm_log_poller_blocks_order_by_block ON evm.log_poller_blocks (evm_chain_id, block_number DESC); +CREATE INDEX idx_evm_log_poller_blocks_block_number_evm_chain_id ON evm.log_poller_blocks (block_number, evm_chain_id); ALTER TABLE evm.log_poller_blocks DROP COLUMN id; ALTER TABLE evm.log_poller_blocks ADD PRIMARY KEY (block_number, evm_chain_id); -DROP INDEX IF EXISTS evm.idx_evm_logs_block_hash_log_index_evm_chain_id; +DROP INDEX IF EXISTS evm.idx_logs_chain_block_logindex; ALTER TABLE evm.logs DROP COLUMN id; ALTER TABLE evm.logs ADD PRIMARY KEY (block_hash, log_index, evm_chain_id); From d153367f3d1d7267b439693a5d0b7d23b6dcb305 Mon Sep 17 00:00:00 2001 From: Domino Valdano <2644901+reductionista@users.noreply.github.com> Date: Fri, 13 Sep 2024 15:00:04 -0700 Subject: [PATCH 06/24] Fix 2 unrelated bugs I noticed --- core/chains/evm/logpoller/orm.go | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/core/chains/evm/logpoller/orm.go b/core/chains/evm/logpoller/orm.go index de5abaf908..a3a32adf88 100644 --- a/core/chains/evm/logpoller/orm.go +++ b/core/chains/evm/logpoller/orm.go @@ -617,14 +617,17 @@ func (o *DSORM) SelectLatestLogEventSigsAddrsWithConfs(ctx context.Context, from return nil, err } - query := logsQueryWithConfs(`WHERE (block_number, address, event_sig) IN ( - SELECT MAX(block_number), address, event_sig FROM evm.logs + query := logsQueryWithConfs(`WHERE id IN ( + SELECT LAST_VALUE(id) OVER( + PARTITION BY evm_chain_id, address, event_sig + ORDER BY block_number, log_index + ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING + ) FROM evm.logs WHERE evm_chain_id = :evm_chain_id - AND event_sig = ANY(:event_sig_array) - AND address = ANY(:address_array) - AND block_number > :start_block AND `, confs) + - `GROUP BY event_sig, address) - ORDER BY block_number ASC` + AND event_sig = ANY(:event_sig_array) + AND address = ANY(:address_array) + AND block_number >= :start_block AND `, confs) + ` + )` var logs []Log query, sqlArgs, err := o.ds.BindNamed(query, args) @@ -653,7 +656,7 @@ func (o *DSORM) SelectLatestBlockByEventSigsAddrsWithConfs(ctx context.Context, WHERE evm_chain_id = :evm_chain_id AND event_sig = ANY(:event_sig_array) AND address = ANY(:address_array) - AND block_number > :start_block AND `, "", confs) + AND block_number >= :start_block AND `, "", confs) var blockNumber int64 query, sqlArgs, err := o.ds.BindNamed(query, args) From b0b82b6efb823e43344f578c138a82c5543d8e8c Mon Sep 17 00:00:00 2001 From: Domino Valdano <2644901+reductionista@users.noreply.github.com> Date: Fri, 13 Sep 2024 14:59:35 -0700 Subject: [PATCH 07/24] Update ExpiredLogs query --- core/chains/evm/logpoller/orm.go | 28 +++++++++++++--------------- 1 file changed, 13 insertions(+), 15 deletions(-) diff --git a/core/chains/evm/logpoller/orm.go b/core/chains/evm/logpoller/orm.go index a3a32adf88..3fed09ded1 100644 --- a/core/chains/evm/logpoller/orm.go +++ b/core/chains/evm/logpoller/orm.go @@ -364,26 +364,24 @@ type Exp struct { // - have a timestamp older than any matching filter's retention, UNLESS there is at // least one matching filter with retention=0 func (o *DSORM) DeleteExpiredLogs(ctx context.Context, limit int64) (int64, error) { - var err error - var result sql.Result - query := `DELETE FROM evm.logs - WHERE (evm_chain_id, address, event_sig, block_number) IN ( - SELECT l.evm_chain_id, l.address, l.event_sig, l.block_number - FROM evm.logs l - LEFT JOIN ( + limitClause := "" + if limit > 0 { + limitClause = fmt.Sprintf("LIMIT %d", limit) + } + + query := fmt.Sprintf(` + WITH rows_to_delete AS ( + SELECT l.id + FROM evm.logs l LEFT JOIN ( SELECT address, event, CASE WHEN MIN(retention) = 0 THEN 0 ELSE MAX(retention) END AS retention FROM evm.log_poller_filters WHERE evm_chain_id = $1 GROUP BY evm_chain_id, address, event ) r ON l.evm_chain_id = $1 AND l.address = r.address AND l.event_sig = r.event - WHERE r.retention IS NULL OR (r.retention != 0 AND l.block_timestamp <= STATEMENT_TIMESTAMP() - (r.retention / 10^9 * interval '1 second')) %s)` - - if limit > 0 { - result, err = o.ds.ExecContext(ctx, fmt.Sprintf(query, "LIMIT $2"), ubig.New(o.chainID), limit) - } else { - result, err = o.ds.ExecContext(ctx, fmt.Sprintf(query, ""), ubig.New(o.chainID)) - } - + WHERE r.retention IS NULL OR (r.retention != 0 AND l.block_timestamp <= STATEMENT_TIMESTAMP() - (r.retention / 10^9 * interval '1 second'' + %s + ) DELETE FROM evm.logs WHERE id IN (SELECT id FROM rows_to_delete)`, limitClause) + result, err := o.ds.ExecContext(ctx, query, ubig.New(o.chainID)) if err != nil { return 0, err } From 62b650ba57d9ff36835752f1d36bc284e159e166 Mon Sep 17 00:00:00 2001 From: Domino Valdano <2644901+reductionista@users.noreply.github.com> Date: Sun, 15 Sep 2024 23:42:50 -0700 Subject: [PATCH 08/24] Update test for fromBlock >= :block_number Previously it was using fromBlock > :block_number which is inconsistent with the other fromBlocks in queries --- core/chains/evm/logpoller/orm_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/chains/evm/logpoller/orm_test.go b/core/chains/evm/logpoller/orm_test.go index 4ef75ddc2b..b0a2208c0c 100644 --- a/core/chains/evm/logpoller/orm_test.go +++ b/core/chains/evm/logpoller/orm_test.go @@ -1556,7 +1556,7 @@ func TestSelectLatestBlockNumberEventSigsAddrsWithConfs(t *testing.T) { events: []common.Hash{event1, event2}, addrs: []common.Address{address1, address2}, confs: 0, - fromBlock: 3, + fromBlock: 4, expectedBlockNumber: 0, }, { From ef6a78bcf74c83f13604bc537e25e16e6acd05bd Mon Sep 17 00:00:00 2001 From: Domino Valdano <2644901+reductionista@users.noreply.github.com> Date: Mon, 16 Sep 2024 18:09:00 -0700 Subject: [PATCH 09/24] Increase staggering of initial pruning runs --- core/chains/evm/logpoller/log_poller.go | 21 +++++++++++++-------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/core/chains/evm/logpoller/log_poller.go b/core/chains/evm/logpoller/log_poller.go index 29a2eca355..cd05c992cd 100644 --- a/core/chains/evm/logpoller/log_poller.go +++ b/core/chains/evm/logpoller/log_poller.go @@ -24,8 +24,8 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/services" + "github.com/smartcontractkit/chainlink-common/pkg/timeutil" "github.com/smartcontractkit/chainlink-common/pkg/types/query" - "github.com/smartcontractkit/chainlink-common/pkg/utils" "github.com/smartcontractkit/chainlink-common/pkg/utils/mathutil" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/client" @@ -638,31 +638,36 @@ func (lp *logPoller) backgroundWorkerRun() { ctx, cancel := lp.stopCh.NewCtx() defer cancel() + blockPruneShortInterval := lp.pollPeriod * 100 + blockPruneInterval := blockPruneShortInterval * 10 + logPruneShortInterval := lp.pollPeriod * 241 // no common factors with 100 + logPruneInterval := logPruneShortInterval * 10 + // Avoid putting too much pressure on the database by staggering the pruning of old blocks and logs. // Usually, node after restart will have some work to boot the plugins and other services. - // Deferring first prune by minutes reduces risk of putting too much pressure on the database. - blockPruneTick := time.After(5 * time.Minute) - logPruneTick := time.After(10 * time.Minute) + // Deferring first prune by at least 5 mins reduces risk of putting too much pressure on the database. + blockPruneTick := time.After((5 * time.Minute) + timeutil.JitterPct(1.0).Apply(blockPruneInterval/2)) + logPruneTick := time.After((5 * time.Minute) + timeutil.JitterPct(1.0).Apply(logPruneInterval/2)) for { select { case <-ctx.Done(): return case <-blockPruneTick: - blockPruneTick = time.After(utils.WithJitter(lp.pollPeriod * 1000)) + blockPruneTick = time.After(timeutil.JitterPct(0.1).Apply(blockPruneInterval)) if allRemoved, err := lp.PruneOldBlocks(ctx); err != nil { lp.lggr.Errorw("Unable to prune old blocks", "err", err) } else if !allRemoved { // Tick faster when cleanup can't keep up with the pace of new blocks - blockPruneTick = time.After(utils.WithJitter(lp.pollPeriod * 100)) + blockPruneTick = time.After(timeutil.JitterPct(0.1).Apply(blockPruneShortInterval)) } case <-logPruneTick: - logPruneTick = time.After(utils.WithJitter(lp.pollPeriod * 2401)) // = 7^5 avoids common factors with 1000 + logPruneTick = time.After(timeutil.JitterPct(0.1).Apply(logPruneInterval)) if allRemoved, err := lp.PruneExpiredLogs(ctx); err != nil { lp.lggr.Errorw("Unable to prune expired logs", "err", err) } else if !allRemoved { // Tick faster when cleanup can't keep up with the pace of new logs - logPruneTick = time.After(utils.WithJitter(lp.pollPeriod * 241)) + logPruneTick = time.After(timeutil.JitterPct(0.1).Apply(logPruneShortInterval)) } } } From f311f7d6672ddd9b15cfd9e34a9fdd633e5fa9a4 Mon Sep 17 00:00:00 2001 From: Domino Valdano <2644901+reductionista@users.noreply.github.com> Date: Mon, 16 Sep 2024 18:19:47 -0700 Subject: [PATCH 10/24] Decrease retention periods for CCIP events, for testing --- .../ocr2/plugins/ccip/internal/ccipdata/reader.go | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/core/services/ocr2/plugins/ccip/internal/ccipdata/reader.go b/core/services/ocr2/plugins/ccip/internal/ccipdata/reader.go index 3f57d419e1..aae0989c7d 100644 --- a/core/services/ocr2/plugins/ccip/internal/ccipdata/reader.go +++ b/core/services/ocr2/plugins/ccip/internal/ccipdata/reader.go @@ -29,14 +29,18 @@ const ( // be able to bring back processing without replaying any logs from chain. You can read that param as // "how long CCIP can be down and still be able to process all the messages after getting back to life". // Breaching this threshold would require replaying chain using LogPoller from the beginning of the outage. - CommitExecLogsRetention = 30 * 24 * time.Hour // 30 days + + CommitExecLogsRetention = 3 * time.Hour // 3 hours for testing TODO: REMOVE FOR PRODUCTION! + // CacheEvictionLogsRetention defines the duration for which logs used for caching on-chain data are kept. // Restarting node clears the cache entirely and rebuilds it from scratch by fetching data from chain, // so we don't need to keep these logs for very long. All events relying on cache.NewLogpollerEventsBased should use this retention. - CacheEvictionLogsRetention = 7 * 24 * time.Hour // 7 days + + CacheEvictionLogsRetention = 1 * time.Hour // 1 hour for testing TODO: REMOVE FOR PRODUCTION! + // PriceUpdatesLogsRetention defines the duration for which logs with price updates are kept. // These logs are emitted whenever the token price or gas price is updated and Commit scans very small time windows (e.g. 2 hours) - PriceUpdatesLogsRetention = 1 * 24 * time.Hour // 1 day + PriceUpdatesLogsRetention = 5 * time.Minute // 5 minutes for testing TODO: REMOVE FOR PRODUCTION! ) type Event[T any] struct { From ea8e25002395f0e9e300dc610f8e0e69ec0313ca Mon Sep 17 00:00:00 2001 From: Domino Valdano <2644901+reductionista@users.noreply.github.com> Date: Mon, 16 Sep 2024 20:27:20 -0700 Subject: [PATCH 11/24] Fix bug in merged commit from develop On a node with more than one chain, each LogPoller would have deleted all logs from chains it's not running on! Because of the LEFT JOIN, ON evm_chain_id = $1 does not filter out any rows where evm_chain_id != $1; only WHERE evm_chain_id = $1 can do that --- core/chains/evm/logpoller/orm.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/core/chains/evm/logpoller/orm.go b/core/chains/evm/logpoller/orm.go index 3fed09ded1..a538d1746a 100644 --- a/core/chains/evm/logpoller/orm.go +++ b/core/chains/evm/logpoller/orm.go @@ -373,12 +373,13 @@ func (o *DSORM) DeleteExpiredLogs(ctx context.Context, limit int64) (int64, erro WITH rows_to_delete AS ( SELECT l.id FROM evm.logs l LEFT JOIN ( - SELECT address, event, CASE WHEN MIN(retention) = 0 THEN 0 ELSE MAX(retention) END AS retention + SELECT evm_chain_id, address, event, CASE WHEN MIN(retention) = 0 THEN 0 ELSE MAX(retention) END AS retention FROM evm.log_poller_filters WHERE evm_chain_id = $1 GROUP BY evm_chain_id, address, event - ) r ON l.evm_chain_id = $1 AND l.address = r.address AND l.event_sig = r.event - WHERE r.retention IS NULL OR (r.retention != 0 AND l.block_timestamp <= STATEMENT_TIMESTAMP() - (r.retention / 10^9 * interval '1 second'' + ) r ON l.evm_chain_id = r.evm_chain_id AND l.address = r.address AND l.event_sig = r.event + WHERE l.evm_chain_id = $1 AND -- needed because of LEFT JOIN + r.retention IS NULL OR (r.retention != 0 AND l.block_timestamp <= STATEMENT_TIMESTAMP() - (r.retention / 10^9 * interval '1 second')) %s ) DELETE FROM evm.logs WHERE id IN (SELECT id FROM rows_to_delete)`, limitClause) result, err := o.ds.ExecContext(ctx, query, ubig.New(o.chainID)) From 079120fe72a8e1703303c363558e7433a2a65cee Mon Sep 17 00:00:00 2001 From: Domino Valdano <2644901+reductionista@users.noreply.github.com> Date: Tue, 17 Sep 2024 18:54:32 -0700 Subject: [PATCH 12/24] restore retention periods --- .../ocr2/plugins/ccip/internal/ccipdata/reader.go | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/core/services/ocr2/plugins/ccip/internal/ccipdata/reader.go b/core/services/ocr2/plugins/ccip/internal/ccipdata/reader.go index aae0989c7d..3f57d419e1 100644 --- a/core/services/ocr2/plugins/ccip/internal/ccipdata/reader.go +++ b/core/services/ocr2/plugins/ccip/internal/ccipdata/reader.go @@ -29,18 +29,14 @@ const ( // be able to bring back processing without replaying any logs from chain. You can read that param as // "how long CCIP can be down and still be able to process all the messages after getting back to life". // Breaching this threshold would require replaying chain using LogPoller from the beginning of the outage. - - CommitExecLogsRetention = 3 * time.Hour // 3 hours for testing TODO: REMOVE FOR PRODUCTION! - + CommitExecLogsRetention = 30 * 24 * time.Hour // 30 days // CacheEvictionLogsRetention defines the duration for which logs used for caching on-chain data are kept. // Restarting node clears the cache entirely and rebuilds it from scratch by fetching data from chain, // so we don't need to keep these logs for very long. All events relying on cache.NewLogpollerEventsBased should use this retention. - - CacheEvictionLogsRetention = 1 * time.Hour // 1 hour for testing TODO: REMOVE FOR PRODUCTION! - + CacheEvictionLogsRetention = 7 * 24 * time.Hour // 7 days // PriceUpdatesLogsRetention defines the duration for which logs with price updates are kept. // These logs are emitted whenever the token price or gas price is updated and Commit scans very small time windows (e.g. 2 hours) - PriceUpdatesLogsRetention = 5 * time.Minute // 5 minutes for testing TODO: REMOVE FOR PRODUCTION! + PriceUpdatesLogsRetention = 1 * 24 * time.Hour // 1 day ) type Event[T any] struct { From 5d2472143fe4d0fb2e9fa2ca0bc2775e8e5f2286 Mon Sep 17 00:00:00 2001 From: Domino Valdano <2644901+reductionista@users.noreply.github.com> Date: Tue, 17 Sep 2024 19:12:59 -0700 Subject: [PATCH 13/24] Set LogPrunePageSize = 2001 --- .../ccip-tests/testconfig/tomls/ccip1.4-stress/baseline.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/integration-tests/ccip-tests/testconfig/tomls/ccip1.4-stress/baseline.toml b/integration-tests/ccip-tests/testconfig/tomls/ccip1.4-stress/baseline.toml index c404d3a0c0..505c677d26 100644 --- a/integration-tests/ccip-tests/testconfig/tomls/ccip1.4-stress/baseline.toml +++ b/integration-tests/ccip-tests/testconfig/tomls/ccip1.4-stress/baseline.toml @@ -132,6 +132,7 @@ DeltaReconcile = '5s' CommonChainConfigTOML = """ LogPollInterval = '1s' +LogPrunePageSize = 2001 [HeadTracker] HistoryDepth = 200 From 3d65b9003690960cc618d2a9ff0972cf4913d6ea Mon Sep 17 00:00:00 2001 From: Domino Valdano <2644901+reductionista@users.noreply.github.com> Date: Mon, 23 Sep 2024 19:59:53 -0700 Subject: [PATCH 14/24] merge whitespace differences from chainlink repo --- core/chains/evm/logpoller/orm.go | 42 +++++++++++++++++--------------- 1 file changed, 23 insertions(+), 19 deletions(-) diff --git a/core/chains/evm/logpoller/orm.go b/core/chains/evm/logpoller/orm.go index a538d1746a..45f534f56e 100644 --- a/core/chains/evm/logpoller/orm.go +++ b/core/chains/evm/logpoller/orm.go @@ -216,10 +216,10 @@ func withConfs(query string, tableAlias string, confs evmtypes.Confirmations) st lastConfirmedBlock = `block_number - :confs` } return fmt.Sprintf(`%s %sblock_number <= ( - SELECT %s - FROM evm.log_poller_blocks - WHERE evm_chain_id = :evm_chain_id - ORDER BY block_number DESC LIMIT 1)`, query, tablePrefix, lastConfirmedBlock) + SELECT %s + FROM evm.log_poller_blocks + WHERE evm_chain_id = :evm_chain_id + ORDER BY block_number DESC LIMIT 1)`, query, tablePrefix, lastConfirmedBlock) } func logsQueryWithConfs(clause string, confs evmtypes.Confirmations) string { @@ -278,7 +278,7 @@ func (o *DSORM) SelectLatestLogByEventSigWithConfs(ctx context.Context, eventSig `WHERE evm_chain_id = :evm_chain_id AND event_sig = :event_sig AND address = :address AND `, confs) + - ` ORDER BY block_number desc, log_index DESC LIMIT 1` + `ORDER BY block_number desc, log_index DESC LIMIT 1` var l Log query, sqlArgs, err := o.ds.BindNamed(query, args) @@ -651,6 +651,7 @@ func (o *DSORM) SelectLatestBlockByEventSigsAddrsWithConfs(ctx context.Context, if err != nil { return 0, err } + query := withConfs(`SELECT COALESCE(MAX(block_number), 0) FROM evm.logs WHERE evm_chain_id = :evm_chain_id AND event_sig = ANY(:event_sig_array) @@ -709,7 +710,8 @@ func (o *DSORM) SelectLogsDataWordGreaterThan(ctx context.Context, address commo return nil, err } - query := logsQueryWithConfs(`WHERE evm_chain_id = :evm_chain_id + query := logsQueryWithConfs(` + WHERE evm_chain_id = :evm_chain_id AND address = :address AND event_sig = :event_sig AND substring(data from 32*:word_index+1 for 32) >= :word_value_min AND `, confs) + @@ -737,7 +739,9 @@ func (o *DSORM) SelectLogsDataWordBetween(ctx context.Context, address common.Ad if err != nil { return nil, err } - query := logsQueryWithConfs(`WHERE evm_chain_id = :evm_chain_id + + query := logsQueryWithConfs(` + WHERE evm_chain_id = :evm_chain_id AND address = :address AND event_sig = :event_sig AND substring(data from 32*:word_index_min+1 for 32) <= :word_value @@ -856,13 +860,13 @@ func (o *DSORM) SelectIndexedLogsByBlockRange(ctx context.Context, start, end in } query := logsQuery(` - WHERE evm_chain_id = :evm_chain_id - AND address = :address - AND event_sig = :event_sig - AND topics[:topic_index] = ANY(:topic_values) - AND block_number >= :start_block - AND block_number <= :end_block - ORDER BY block_number, log_index`) + WHERE evm_chain_id = :evm_chain_id + AND address = :address + AND event_sig = :event_sig + AND topics[:topic_index] = ANY(:topic_values) + AND block_number >= :start_block + AND block_number <= :end_block + ORDER BY block_number, log_index`) var logs []Log query, sqlArgs, err := o.ds.BindNamed(query, args) @@ -919,11 +923,11 @@ func (o *DSORM) SelectIndexedLogsByTxHash(ctx context.Context, address common.Ad } query := logsQuery(` - WHERE evm_chain_id = :evm_chain_id - AND address = :address - AND event_sig = :event_sig - AND tx_hash = :tx_hash - ORDER BY block_number, log_index`) + WHERE evm_chain_id = :evm_chain_id + AND address = :address + AND event_sig = :event_sig + AND tx_hash = :tx_hash + ORDER BY block_number, log_index`) var logs []Log query, sqlArgs, err := o.ds.BindNamed(query, args) From 5949fe05094928e6a31cee3c710092eb5a30e660 Mon Sep 17 00:00:00 2001 From: Domino Valdano <2644901+reductionista@users.noreply.github.com> Date: Mon, 23 Sep 2024 20:07:07 -0700 Subject: [PATCH 15/24] sync from chainlink repo --- core/chains/evm/logpoller/orm.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/core/chains/evm/logpoller/orm.go b/core/chains/evm/logpoller/orm.go index 45f534f56e..8a29c667dd 100644 --- a/core/chains/evm/logpoller/orm.go +++ b/core/chains/evm/logpoller/orm.go @@ -378,9 +378,8 @@ func (o *DSORM) DeleteExpiredLogs(ctx context.Context, limit int64) (int64, erro WHERE evm_chain_id = $1 GROUP BY evm_chain_id, address, event ) r ON l.evm_chain_id = r.evm_chain_id AND l.address = r.address AND l.event_sig = r.event - WHERE l.evm_chain_id = $1 AND -- needed because of LEFT JOIN - r.retention IS NULL OR (r.retention != 0 AND l.block_timestamp <= STATEMENT_TIMESTAMP() - (r.retention / 10^9 * interval '1 second')) - %s + WHERE l.evm_chain_id = $1 AND -- Must be WHERE rather than ON due to LEFT JOIN + r.retention IS NULL OR (r.retention != 0 AND l.block_timestamp <= STATEMENT_TIMESTAMP() - (r.retention / 10^9 * interval '1 second')) %s ) DELETE FROM evm.logs WHERE id IN (SELECT id FROM rows_to_delete)`, limitClause) result, err := o.ds.ExecContext(ctx, query, ubig.New(o.chainID)) if err != nil { From dfc58c8aa46fbb6a1c94f9f1be7dd8489975ae34 Mon Sep 17 00:00:00 2001 From: Domino Valdano <2644901+reductionista@users.noreply.github.com> Date: Thu, 19 Sep 2024 12:43:18 -0700 Subject: [PATCH 16/24] Update DeleteBlocksBefore query to use block_number index instead of LIMIT --- core/chains/evm/logpoller/log_poller.go | 24 +++++ core/chains/evm/logpoller/observability.go | 12 +++ .../evm/logpoller/observability_test.go | 4 +- core/chains/evm/logpoller/orm.go | 90 ++++++++++++++----- core/chains/evm/logpoller/orm_test.go | 10 +-- 5 files changed, 113 insertions(+), 27 deletions(-) diff --git a/core/chains/evm/logpoller/log_poller.go b/core/chains/evm/logpoller/log_poller.go index cd05c992cd..57d8879a29 100644 --- a/core/chains/evm/logpoller/log_poller.go +++ b/core/chains/evm/logpoller/log_poller.go @@ -649,6 +649,8 @@ func (lp *logPoller) backgroundWorkerRun() { blockPruneTick := time.After((5 * time.Minute) + timeutil.JitterPct(1.0).Apply(blockPruneInterval/2)) logPruneTick := time.After((5 * time.Minute) + timeutil.JitterPct(1.0).Apply(logPruneInterval/2)) + successfulExpiredLogPrunes := 0 + for { select { case <-ctx.Done(): @@ -668,6 +670,18 @@ func (lp *logPoller) backgroundWorkerRun() { } else if !allRemoved { // Tick faster when cleanup can't keep up with the pace of new logs logPruneTick = time.After(timeutil.JitterPct(0.1).Apply(logPruneShortInterval)) + } else if successfulExpiredLogPrunes == 20 { + // Only prune unmatched logs if we've successfully pruned all expired logs at least 20 times + // since the last time unmatched logs were pruned + if allRemoved, err := lp.PruneUnmatchedLogs(ctx); err != nil { + lp.lggr.Errorw("Unable to prune unmatched logs", "err", err) + } else if !allRemoved { + logPruneTick = time.After(timeutil.JitterPct(0.1).Apply(logPruneShortInterval)) + } else { + successfulExpiredLogPrunes = 0 + } + } else { + successfulExpiredLogPrunes++ } } } @@ -1101,6 +1115,16 @@ func (lp *logPoller) PruneExpiredLogs(ctx context.Context) (bool, error) { return lp.logPrunePageSize == 0 || rowsRemoved < lp.logPrunePageSize, err } +func (lp *logPoller) PruneUnmatchedLogs(ctx context.Context) (bool, error) { + ids, err := lp.orm.SelectUnmatchedLogIds(ctx, lp.logPrunePageSize) + if err != nil { + return false, err + } + rowsRemoved, err := lp.orm.DeleteLogsByRowId(ctx, ids) + + return lp.logPrunePageSize == 0 || rowsRemoved < lp.logPrunePageSize, err +} + // Logs returns logs matching topics and address (exactly) in the given block range, // which are canonical at time of query. func (lp *logPoller) Logs(ctx context.Context, start, end int64, eventSig common.Hash, address common.Address) ([]Log, error) { diff --git a/core/chains/evm/logpoller/observability.go b/core/chains/evm/logpoller/observability.go index e0ed0cc478..6ac0c2d9a9 100644 --- a/core/chains/evm/logpoller/observability.go +++ b/core/chains/evm/logpoller/observability.go @@ -136,6 +136,18 @@ func (o *ObservedORM) DeleteLogsAndBlocksAfter(ctx context.Context, start int64) }) } +func (o *ObservedORM) DeleteLogsByRowId(ctx context.Context, rowIds []uint64) (int64, error) { + return withObservedExecAndRowsAffected(o, "DeleteLogsByRowId", del, func() (int64, error) { + return o.ORM.DeleteLogsByRowId(ctx, rowIds) + }) +} + +func (o *ObservedORM) SelectUnmatchedLogIds(ctx context.Context, limit int64) (ids []uint64, err error) { + return withObservedQueryAndResults[uint64](o, "SelectUnmatchedLogIds", func() ([]uint64, error) { + return o.ORM.SelectUnmatchedLogIds(ctx, limit) + }) +} + func (o *ObservedORM) DeleteExpiredLogs(ctx context.Context, limit int64) (int64, error) { return withObservedExecAndRowsAffected(o, "DeleteExpiredLogs", del, func() (int64, error) { return o.ORM.DeleteExpiredLogs(ctx, limit) diff --git a/core/chains/evm/logpoller/observability_test.go b/core/chains/evm/logpoller/observability_test.go index 2f502438bb..f96c207eca 100644 --- a/core/chains/evm/logpoller/observability_test.go +++ b/core/chains/evm/logpoller/observability_test.go @@ -121,8 +121,8 @@ func TestCountersAreProperlyPopulatedForWrites(t *testing.T) { rowsAffected, err := orm.DeleteExpiredLogs(ctx, 3) require.NoError(t, err) - require.Equal(t, int64(3), rowsAffected) - assert.Equal(t, 3, counterFromGaugeByLabels(orm.datasetSize, "420", "DeleteExpiredLogs", "delete")) + require.Equal(t, int64(0), rowsAffected) + assert.Equal(t, 0, counterFromGaugeByLabels(orm.datasetSize, "420", "DeleteExpiredLogs", "delete")) rowsAffected, err = orm.DeleteBlocksBefore(ctx, 30, 0) require.NoError(t, err) diff --git a/core/chains/evm/logpoller/orm.go b/core/chains/evm/logpoller/orm.go index 8a29c667dd..035b55abff 100644 --- a/core/chains/evm/logpoller/orm.go +++ b/core/chains/evm/logpoller/orm.go @@ -33,9 +33,11 @@ type ORM interface { LoadFilters(ctx context.Context) (map[string]Filter, error) DeleteFilter(ctx context.Context, name string) error + DeleteLogsByRowId(ctx context.Context, rowIds []uint64) (int64, error) InsertBlock(ctx context.Context, blockHash common.Hash, blockNumber int64, blockTimestamp time.Time, finalizedBlock int64) error DeleteBlocksBefore(ctx context.Context, end int64, limit int64) (int64, error) DeleteLogsAndBlocksAfter(ctx context.Context, start int64) error + SelectUnmatchedLogIds(ctx context.Context, limit int64) (ids []uint64, err error) DeleteExpiredLogs(ctx context.Context, limit int64) (int64, error) GetBlocksRange(ctx context.Context, start int64, end int64) ([]LogPollerBlock, error) @@ -294,27 +296,46 @@ func (o *DSORM) SelectLatestLogByEventSigWithConfs(ctx context.Context, eventSig // DeleteBlocksBefore delete blocks before and including end. When limit is set, it will delete at most limit blocks. // Otherwise, it will delete all blocks at once. func (o *DSORM) DeleteBlocksBefore(ctx context.Context, end int64, limit int64) (int64, error) { - if limit > 0 { - result, err := o.ds.ExecContext(ctx, - `DELETE FROM evm.log_poller_blocks - WHERE block_number IN ( - SELECT block_number FROM evm.log_poller_blocks - WHERE block_number <= $1 - AND evm_chain_id = $2 - LIMIT $3 - ) AND evm_chain_id = $2`, - end, ubig.New(o.chainID), limit) + var result sql.Result + var err error + + if limit == 0 { + result, err = o.ds.ExecContext(ctx, `DELETE FROM evm.log_poller_blocks + WHERE block_number <= $1 AND evm_chain_id = $2`, end, ubig.New(o.chainID)) if err != nil { return 0, err } return result.RowsAffected() } - result, err := o.ds.ExecContext(ctx, `DELETE FROM evm.log_poller_blocks - WHERE block_number <= $1 AND evm_chain_id = $2`, end, ubig.New(o.chainID)) + + var limitBlock int64 + err = o.ds.GetContext(ctx, &limitBlock, `SELECT MIN(block_number) FROM evm.log_poller_blocks`) if err != nil { return 0, err } - return result.RowsAffected() + + // Remove up to limit blocks at a time, until we've reached the limit or removed everything eligible for deletion + var deleted, rows int64 + for limitBlock += (limit - 1); deleted < limit; limitBlock += limit { + if limitBlock > end { + limitBlock = end + } + result, err = o.ds.ExecContext(ctx, `DELETE FROM evm.log_poller_blocks WHERE block_number <= $1 AND evm_chain_id = $2`, limitBlock, ubig.New(o.chainID)) + if err != nil { + return deleted, err + } + + if rows, err = result.RowsAffected(); err != nil { + return deleted, err + } + + deleted += rows + + if limitBlock == end { + break + } + } + return deleted, err } func (o *DSORM) DeleteLogsAndBlocksAfter(ctx context.Context, start int64) error { @@ -328,8 +349,8 @@ func (o *DSORM) DeleteLogsAndBlocksAfter(ctx context.Context, start int64) error // Latency without upper bound filter can be orders of magnitude higher for large number of logs. _, err := o.ds.ExecContext(ctx, `DELETE FROM evm.log_poller_blocks WHERE evm_chain_id = $1 - AND block_number >= $2 - AND block_number <= (SELECT MAX(block_number) + AND block_number >= $2 + AND block_number <= (SELECT MAX(block_number) FROM evm.log_poller_blocks WHERE evm_chain_id = $1)`, ubig.New(o.chainID), start) @@ -359,6 +380,26 @@ type Exp struct { ShouldDelete bool } +func (o *DSORM) SelectUnmatchedLogIds(ctx context.Context, limit int64) (ids []uint64, err error) { + query := ` + SELECT l.id FROM evm.logs l JOIN ( + SELECT evm_chain_id, address, event + FROM evm.log_poller_filters + WHERE evm_chain_id = $1 + GROUP BY evm_chain_id, address, event + ) r ON l.evm_chain_id = r.evm_chain_id AND l.address = r.address AND l.event_sig = r.event + WHERE l.evm_chain_id = $1 AND r.id IS NULL + ` + + if limit == 0 { + err = o.ds.SelectContext(ctx, &ids, query, ubig.New(o.chainID)) + return ids, err + } + err = o.ds.SelectContext(ctx, &ids, fmt.Sprintf("%s LIMIT %d", query, limit)) + + return ids, err +} + // DeleteExpiredLogs removes any logs which either: // - don't match any currently registered filters, or // - have a timestamp older than any matching filter's retention, UNLESS there is at @@ -372,14 +413,14 @@ func (o *DSORM) DeleteExpiredLogs(ctx context.Context, limit int64) (int64, erro query := fmt.Sprintf(` WITH rows_to_delete AS ( SELECT l.id - FROM evm.logs l LEFT JOIN ( - SELECT evm_chain_id, address, event, CASE WHEN MIN(retention) = 0 THEN 0 ELSE MAX(retention) END AS retention + FROM evm.logs l JOIN ( + SELECT evm_chain_id, address, event, MAX(retention) AS retention FROM evm.log_poller_filters WHERE evm_chain_id = $1 GROUP BY evm_chain_id, address, event - ) r ON l.evm_chain_id = r.evm_chain_id AND l.address = r.address AND l.event_sig = r.event - WHERE l.evm_chain_id = $1 AND -- Must be WHERE rather than ON due to LEFT JOIN - r.retention IS NULL OR (r.retention != 0 AND l.block_timestamp <= STATEMENT_TIMESTAMP() - (r.retention / 10^9 * interval '1 second')) %s + HAVING MIN(retention) > 0 + ) r ON l.evm_chain_id = r.evm_chain_id AND l.address = r.address AND l.event_sig = r.event AND + l.block_timestamp <= STATEMENT_TIMESTAMP() - (r.retention / 10^9 * interval '1 second') %s ) DELETE FROM evm.logs WHERE id IN (SELECT id FROM rows_to_delete)`, limitClause) result, err := o.ds.ExecContext(ctx, query, ubig.New(o.chainID)) if err != nil { @@ -1008,3 +1049,12 @@ func (o *DSORM) FilteredLogs(ctx context.Context, filter []query.Expression, lim return logs, nil } + +// DeleteLogsByRowId accepts a list of log row id's to delete +func (o *DSORM) DeleteLogsByRowId(ctx context.Context, rowIds []uint64) (int64, error) { + result, err := o.ds.ExecContext(ctx, `DELETE FROM evm.logs WHERE id = ANY($1)`, rowIds) + if err != nil { + return 0, err + } + return result.RowsAffected() +} diff --git a/core/chains/evm/logpoller/orm_test.go b/core/chains/evm/logpoller/orm_test.go index b0a2208c0c..fc4b248f3a 100644 --- a/core/chains/evm/logpoller/orm_test.go +++ b/core/chains/evm/logpoller/orm_test.go @@ -458,14 +458,14 @@ func TestORM(t *testing.T) { time.Sleep(2 * time.Millisecond) // just in case we haven't reached the end of the 1ms retention period deleted, err := o1.DeleteExpiredLogs(ctx, 0) require.NoError(t, err) - assert.Equal(t, int64(4), deleted) + assert.Equal(t, int64(1), deleted) logs, err = o1.SelectLogsByBlockRange(ctx, 1, latest.BlockNumber) require.NoError(t, err) - // It should have retained the log matching filter0 (due to ret=0 meaning permanent retention) as well as all - // 3 logs matching filter12 (ret=1 hour). It should have deleted 3 logs not matching any filter, as well as 1 - // of the 2 logs matching filter1 (ret=1ms)--the one that doesn't also match filter12. - assert.Len(t, logs, 4) + // The only log which should be deleted is the one which matches filter1 (ret=1ms) but not filter12 (ret=1 hour) + // Importantly, it shouldn't delete any logs matching only filter0 (ret=0 meaning permanent retention). Anything + // matching filter12 should be kept regardless of what other filters it matches. + assert.Len(t, logs, 7) // Delete logs after should delete all logs. err = o1.DeleteLogsAndBlocksAfter(ctx, 1) From 1ffb5c4a22b5d9880c3193a8a86c575de684bf6f Mon Sep 17 00:00:00 2001 From: Domino Valdano <2644901+reductionista@users.noreply.github.com> Date: Tue, 24 Sep 2024 13:24:09 -0700 Subject: [PATCH 17/24] Merge unknown changes to tests from chainlink repo --- core/chains/evm/logpoller/orm_test.go | 118 ++++++++++++++++++-------- 1 file changed, 81 insertions(+), 37 deletions(-) diff --git a/core/chains/evm/logpoller/orm_test.go b/core/chains/evm/logpoller/orm_test.go index fc4b248f3a..950e2d989b 100644 --- a/core/chains/evm/logpoller/orm_test.go +++ b/core/chains/evm/logpoller/orm_test.go @@ -187,6 +187,7 @@ func TestORM_GetBlocks_From_Range_Recent_Blocks(t *testing.T) { } func TestORM(t *testing.T) { + t.Parallel() th := SetupTH(t, lpOpts) o1 := th.ORM o2 := th.ORM2 @@ -334,6 +335,36 @@ func TestORM(t *testing.T) { }, })) + // Insert a couple logs on a different chain, to make sure + // these aren't affected by any operations on the chain LogPoller + // is managing. + require.NoError(t, o2.InsertLogs(ctx, []logpoller.Log{ + { + EvmChainId: ubig.New(th.ChainID2), + LogIndex: 8, + BlockHash: common.HexToHash("0x1238"), + BlockNumber: int64(17), + EventSig: topic2, + Topics: [][]byte{topic2[:]}, + Address: common.HexToAddress("0x1236"), + TxHash: common.HexToHash("0x1888"), + Data: []byte("same log on unrelated chain"), + BlockTimestamp: time.Now(), + }, + { + EvmChainId: ubig.New(th.ChainID2), + LogIndex: 9, + BlockHash: common.HexToHash("0x1999"), + BlockNumber: int64(18), + EventSig: topic, + Topics: [][]byte{topic[:], topic2[:]}, + Address: common.HexToAddress("0x5555"), + TxHash: common.HexToHash("0x1543"), + Data: []byte("different log on unrelated chain"), + BlockTimestamp: time.Now(), + }, + })) + t.Log(latest.BlockNumber) logs, err := o1.SelectLogsByBlockRange(ctx, 1, 17) require.NoError(t, err) @@ -454,18 +485,31 @@ func TestORM(t *testing.T) { require.NoError(t, err) require.Len(t, logs, 8) - // Delete expired logs + // Delete expired logs with page limit time.Sleep(2 * time.Millisecond) // just in case we haven't reached the end of the 1ms retention period - deleted, err := o1.DeleteExpiredLogs(ctx, 0) + deleted, err := o1.DeleteExpiredLogs(ctx, 2) + require.NoError(t, err) + assert.Equal(t, int64(2), deleted) + + // Delete expired logs without page limit + deleted, err = o1.DeleteExpiredLogs(ctx, 0) + require.NoError(t, err) + assert.Equal(t, int64(2), deleted) + + // Ensure that both of the logs from the second chain are still there + logs, err = o2.SelectLogs(ctx, 0, 100, common.HexToAddress("0x1236"), topic2) + require.NoError(t, err) + assert.Len(t, logs, 1) + logs, err = o2.SelectLogs(ctx, 0, 100, common.HexToAddress("0x5555"), topic) require.NoError(t, err) - assert.Equal(t, int64(1), deleted) + assert.Len(t, logs, 1) logs, err = o1.SelectLogsByBlockRange(ctx, 1, latest.BlockNumber) require.NoError(t, err) - // The only log which should be deleted is the one which matches filter1 (ret=1ms) but not filter12 (ret=1 hour) - // Importantly, it shouldn't delete any logs matching only filter0 (ret=0 meaning permanent retention). Anything - // matching filter12 should be kept regardless of what other filters it matches. - assert.Len(t, logs, 7) + // It should have retained the log matching filter0 (due to ret=0 meaning permanent retention) as well as all + // 3 logs matching filter12 (ret=1 hour). It should have deleted 3 logs not matching any filter, as well as 1 + // of the 2 logs matching filter1 (ret=1ms)--the one that doesn't also match filter12. + assert.Len(t, logs, 4) // Delete logs after should delete all logs. err = o1.DeleteLogsAndBlocksAfter(ctx, 1) @@ -606,8 +650,8 @@ func TestORM_IndexedLogs(t *testing.T) { } for idx, value := range topicValues { - topicFilters.Expressions[idx] = logpoller.NewEventByTopicFilter(topicIdx, []primitives.ValueComparator{ - {Value: logpoller.EvmWord(value).Hex(), Operator: primitives.Eq}, + topicFilters.Expressions[idx] = logpoller.NewEventByTopicFilter(topicIdx, []logpoller.HashedValueComparator{ + {Value: logpoller.EvmWord(value), Operator: primitives.Eq}, }) } @@ -658,7 +702,7 @@ func TestORM_IndexedLogs(t *testing.T) { require.NoError(t, err) assert.Equal(t, 1, len(lgs)) - lgs, err = o1.FilteredLogs(ctx, blockRangeFilter(1, 1, 1, []uint64{1}), limiter, "") + lgs, err = o1.FilteredLogs(ctx, blockRangeFilter("1", "1", 1, []uint64{1}), limiter, "") require.NoError(t, err) assert.Equal(t, 1, len(lgs)) @@ -666,7 +710,7 @@ func TestORM_IndexedLogs(t *testing.T) { require.NoError(t, err) assert.Equal(t, 1, len(lgs)) - lgs, err = o1.FilteredLogs(ctx, blockRangeFilter(1, 2, 1, []uint64{2}), limiter, "") + lgs, err = o1.FilteredLogs(ctx, blockRangeFilter("1", "2", 1, []uint64{2}), limiter, "") require.NoError(t, err) assert.Equal(t, 1, len(lgs)) @@ -674,7 +718,7 @@ func TestORM_IndexedLogs(t *testing.T) { require.NoError(t, err) assert.Equal(t, 1, len(lgs)) - lgs, err = o1.FilteredLogs(ctx, blockRangeFilter(1, 2, 1, []uint64{1}), limiter, "") + lgs, err = o1.FilteredLogs(ctx, blockRangeFilter("1", "2", 1, []uint64{1}), limiter, "") require.NoError(t, err) assert.Equal(t, 1, len(lgs)) @@ -682,7 +726,7 @@ func TestORM_IndexedLogs(t *testing.T) { require.Error(t, err) assert.Contains(t, err.Error(), "invalid index for topic: 0") - _, err = o1.FilteredLogs(ctx, blockRangeFilter(1, 2, 0, []uint64{1}), limiter, "") + _, err = o1.FilteredLogs(ctx, blockRangeFilter("1", "2", 0, []uint64{1}), limiter, "") require.Error(t, err) assert.Contains(t, err.Error(), "invalid index for topic: 0") @@ -690,7 +734,7 @@ func TestORM_IndexedLogs(t *testing.T) { require.Error(t, err) assert.Contains(t, err.Error(), "invalid index for topic: 4") - _, err = o1.FilteredLogs(ctx, blockRangeFilter(1, 2, 4, []uint64{1}), limiter, "") + _, err = o1.FilteredLogs(ctx, blockRangeFilter("1", "2", 4, []uint64{1}), limiter, "") require.Error(t, err) assert.Contains(t, err.Error(), "invalid index for topic: 4") @@ -702,8 +746,8 @@ func TestORM_IndexedLogs(t *testing.T) { Expressions: []query.Expression{ logpoller.NewAddressFilter(addr), logpoller.NewEventSigFilter(eventSig), - logpoller.NewEventByTopicFilter(1, []primitives.ValueComparator{ - {Value: logpoller.EvmWord(2).Hex(), Operator: primitives.Gte}, + logpoller.NewEventByTopicFilter(1, []logpoller.HashedValueComparator{ + {Value: logpoller.EvmWord(2), Operator: primitives.Gte}, }), query.Confidence(primitives.Unconfirmed), }, @@ -717,11 +761,11 @@ func TestORM_IndexedLogs(t *testing.T) { return []query.Expression{ logpoller.NewAddressFilter(addr), logpoller.NewEventSigFilter(eventSig), - logpoller.NewEventByTopicFilter(topicIdx, []primitives.ValueComparator{ - {Value: logpoller.EvmWord(min).Hex(), Operator: primitives.Gte}, + logpoller.NewEventByTopicFilter(topicIdx, []logpoller.HashedValueComparator{ + {Value: logpoller.EvmWord(min), Operator: primitives.Gte}, }), - logpoller.NewEventByTopicFilter(topicIdx, []primitives.ValueComparator{ - {Value: logpoller.EvmWord(max).Hex(), Operator: primitives.Lte}, + logpoller.NewEventByTopicFilter(topicIdx, []logpoller.HashedValueComparator{ + {Value: logpoller.EvmWord(max), Operator: primitives.Lte}, }), query.Confidence(primitives.Unconfirmed), } @@ -876,11 +920,11 @@ func TestORM_DataWords(t *testing.T) { return []query.Expression{ logpoller.NewAddressFilter(addr), logpoller.NewEventSigFilter(eventSig), - logpoller.NewEventByWordFilter(eventSig, wordIdx, []primitives.ValueComparator{ - {Value: logpoller.EvmWord(word1).Hex(), Operator: primitives.Gte}, + logpoller.NewEventByWordFilter(wordIdx, []logpoller.HashedValueComparator{ + {Value: logpoller.EvmWord(word1), Operator: primitives.Gte}, }), - logpoller.NewEventByWordFilter(eventSig, wordIdx, []primitives.ValueComparator{ - {Value: logpoller.EvmWord(word2).Hex(), Operator: primitives.Lte}, + logpoller.NewEventByWordFilter(wordIdx, []logpoller.HashedValueComparator{ + {Value: logpoller.EvmWord(word2), Operator: primitives.Lte}, }), query.Confidence(primitives.Unconfirmed), } @@ -944,8 +988,8 @@ func TestORM_DataWords(t *testing.T) { filter := []query.Expression{ logpoller.NewAddressFilter(addr), logpoller.NewEventSigFilter(eventSig), - logpoller.NewEventByWordFilter(eventSig, 0, []primitives.ValueComparator{ - {Value: logpoller.EvmWord(1).Hex(), Operator: primitives.Gte}, + logpoller.NewEventByWordFilter(0, []logpoller.HashedValueComparator{ + {Value: logpoller.EvmWord(1), Operator: primitives.Gte}, }), query.Confidence(primitives.Unconfirmed), } @@ -1036,7 +1080,7 @@ func TestORM_SelectLogsWithSigsByBlockRangeFilter(t *testing.T) { } require.NoError(t, o1.InsertLogs(ctx, inputLogs)) - filter := func(sigs []common.Hash, startBlock, endBlock int64) query.KeyFilter { + filter := func(sigs []common.Hash, startBlock, endBlock string) query.KeyFilter { filters := []query.Expression{ logpoller.NewAddressFilter(sourceAddr), } @@ -1058,8 +1102,8 @@ func TestORM_SelectLogsWithSigsByBlockRangeFilter(t *testing.T) { filters = append(filters, query.Expression{ BoolExpression: query.BoolExpression{ Expressions: []query.Expression{ - query.Block(uint64(startBlock), primitives.Gte), - query.Block(uint64(endBlock), primitives.Lte), + query.Block(startBlock, primitives.Gte), + query.Block(endBlock, primitives.Lte), }, BoolOperator: query.AND, }, @@ -1172,7 +1216,7 @@ func TestLogPoller_Logs(t *testing.T) { assert.Equal(t, "0x0000000000000000000000000000000000000000000000000000000000000005", lgs[1].BlockHash.String()) assert.Equal(t, address1, lgs[1].Address) - lgs, err = th.ORM.FilteredLogs(ctx, logFilter(1, 3, address1), query.LimitAndSort{ + lgs, err = th.ORM.FilteredLogs(ctx, logFilter("1", "3", address1), query.LimitAndSort{ SortBy: []query.SortBy{query.NewSortBySequence(query.Asc)}, }, "") require.NoError(t, err) @@ -1192,7 +1236,7 @@ func TestLogPoller_Logs(t *testing.T) { assert.Equal(t, address2, lgs[0].Address) assert.Equal(t, event1.Bytes(), lgs[0].Topics[0]) - lgs, err = th.ORM.FilteredLogs(ctx, logFilter(2, 2, address2), query.LimitAndSort{ + lgs, err = th.ORM.FilteredLogs(ctx, logFilter("2", "2", address2), query.LimitAndSort{ SortBy: []query.SortBy{query.NewSortBySequence(query.Asc)}, }, "") require.NoError(t, err) @@ -1667,8 +1711,8 @@ func TestSelectLogsCreatedAfter(t *testing.T) { if len(topicVals) > 0 { exp := make([]query.Expression, len(topicVals)) for idx, val := range topicVals { - exp[idx] = logpoller.NewEventByTopicFilter(uint64(topicIdx), []primitives.ValueComparator{ - {Value: val.String(), Operator: primitives.Eq}, + exp[idx] = logpoller.NewEventByTopicFilter(uint64(topicIdx), []logpoller.HashedValueComparator{ + {Value: val, Operator: primitives.Eq}, }) } @@ -1955,11 +1999,11 @@ func TestSelectLogsDataWordBetween(t *testing.T) { Expressions: []query.Expression{ logpoller.NewAddressFilter(address), logpoller.NewEventSigFilter(eventSig), - logpoller.NewEventByWordFilter(eventSig, 0, []primitives.ValueComparator{ - {Value: logpoller.EvmWord(word).Hex(), Operator: primitives.Lte}, + logpoller.NewEventByWordFilter(0, []logpoller.HashedValueComparator{ + {Value: logpoller.EvmWord(word), Operator: primitives.Lte}, }), - logpoller.NewEventByWordFilter(eventSig, 1, []primitives.ValueComparator{ - {Value: logpoller.EvmWord(word).Hex(), Operator: primitives.Gte}, + logpoller.NewEventByWordFilter(1, []logpoller.HashedValueComparator{ + {Value: logpoller.EvmWord(word), Operator: primitives.Gte}, }), query.Confidence(primitives.Unconfirmed), }, From 097aec638a58d6c24f146399f08fde3573c79917 Mon Sep 17 00:00:00 2001 From: Domino Valdano <2644901+reductionista@users.noreply.github.com> Date: Tue, 24 Sep 2024 13:34:20 -0700 Subject: [PATCH 18/24] Changes in orm_test.go from "Split off SelectUnmatchedLogs from DeleteExpiredLogs" --- core/chains/evm/logpoller/orm_test.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/core/chains/evm/logpoller/orm_test.go b/core/chains/evm/logpoller/orm_test.go index 950e2d989b..51544c5892 100644 --- a/core/chains/evm/logpoller/orm_test.go +++ b/core/chains/evm/logpoller/orm_test.go @@ -506,10 +506,10 @@ func TestORM(t *testing.T) { logs, err = o1.SelectLogsByBlockRange(ctx, 1, latest.BlockNumber) require.NoError(t, err) - // It should have retained the log matching filter0 (due to ret=0 meaning permanent retention) as well as all - // 3 logs matching filter12 (ret=1 hour). It should have deleted 3 logs not matching any filter, as well as 1 - // of the 2 logs matching filter1 (ret=1ms)--the one that doesn't also match filter12. - assert.Len(t, logs, 4) + // The only log which should be deleted is the one which matches filter1 (ret=1ms) but not filter12 (ret=1 hour) + // Importantly, it shouldn't delete any logs matching only filter0 (ret=0 meaning permanent retention). Anything + // matching filter12 should be kept regardless of what other filters it matches. + assert.Len(t, logs, 7) // Delete logs after should delete all logs. err = o1.DeleteLogsAndBlocksAfter(ctx, 1) From d02e3948e333540372f4bb2a7f681ec22b03c003 Mon Sep 17 00:00:00 2001 From: Domino Valdano <2644901+reductionista@users.noreply.github.com> Date: Tue, 24 Sep 2024 13:40:48 -0700 Subject: [PATCH 19/24] Rename LogIds & some changes to orm_test.go from chainlink repo --- core/chains/evm/logpoller/log_poller.go | 4 ++-- core/chains/evm/logpoller/observability.go | 12 ++++++------ core/chains/evm/logpoller/orm.go | 12 ++++++------ core/chains/evm/logpoller/orm_test.go | 12 +++++++++++- 4 files changed, 25 insertions(+), 15 deletions(-) diff --git a/core/chains/evm/logpoller/log_poller.go b/core/chains/evm/logpoller/log_poller.go index 57d8879a29..b38091e0cb 100644 --- a/core/chains/evm/logpoller/log_poller.go +++ b/core/chains/evm/logpoller/log_poller.go @@ -1116,11 +1116,11 @@ func (lp *logPoller) PruneExpiredLogs(ctx context.Context) (bool, error) { } func (lp *logPoller) PruneUnmatchedLogs(ctx context.Context) (bool, error) { - ids, err := lp.orm.SelectUnmatchedLogIds(ctx, lp.logPrunePageSize) + ids, err := lp.orm.SelectUnmatchedLogIDs(ctx, lp.logPrunePageSize) if err != nil { return false, err } - rowsRemoved, err := lp.orm.DeleteLogsByRowId(ctx, ids) + rowsRemoved, err := lp.orm.DeleteLogsByRowID(ctx, ids) return lp.logPrunePageSize == 0 || rowsRemoved < lp.logPrunePageSize, err } diff --git a/core/chains/evm/logpoller/observability.go b/core/chains/evm/logpoller/observability.go index 6ac0c2d9a9..59b93fffda 100644 --- a/core/chains/evm/logpoller/observability.go +++ b/core/chains/evm/logpoller/observability.go @@ -136,15 +136,15 @@ func (o *ObservedORM) DeleteLogsAndBlocksAfter(ctx context.Context, start int64) }) } -func (o *ObservedORM) DeleteLogsByRowId(ctx context.Context, rowIds []uint64) (int64, error) { - return withObservedExecAndRowsAffected(o, "DeleteLogsByRowId", del, func() (int64, error) { - return o.ORM.DeleteLogsByRowId(ctx, rowIds) +func (o *ObservedORM) DeleteLogsByRowID(ctx context.Context, rowIDs []uint64) (int64, error) { + return withObservedExecAndRowsAffected(o, "DeleteLogsByRowID", del, func() (int64, error) { + return o.ORM.DeleteLogsByRowID(ctx, rowIDs) }) } -func (o *ObservedORM) SelectUnmatchedLogIds(ctx context.Context, limit int64) (ids []uint64, err error) { - return withObservedQueryAndResults[uint64](o, "SelectUnmatchedLogIds", func() ([]uint64, error) { - return o.ORM.SelectUnmatchedLogIds(ctx, limit) +func (o *ObservedORM) SelectUnmatchedLogIDs(ctx context.Context, limit int64) (ids []uint64, err error) { + return withObservedQueryAndResults[uint64](o, "SelectUnmatchedLogIDs", func() ([]uint64, error) { + return o.ORM.SelectUnmatchedLogIDs(ctx, limit) }) } diff --git a/core/chains/evm/logpoller/orm.go b/core/chains/evm/logpoller/orm.go index 035b55abff..29606d89ab 100644 --- a/core/chains/evm/logpoller/orm.go +++ b/core/chains/evm/logpoller/orm.go @@ -33,11 +33,11 @@ type ORM interface { LoadFilters(ctx context.Context) (map[string]Filter, error) DeleteFilter(ctx context.Context, name string) error - DeleteLogsByRowId(ctx context.Context, rowIds []uint64) (int64, error) + DeleteLogsByRowID(ctx context.Context, rowIDs []uint64) (int64, error) InsertBlock(ctx context.Context, blockHash common.Hash, blockNumber int64, blockTimestamp time.Time, finalizedBlock int64) error DeleteBlocksBefore(ctx context.Context, end int64, limit int64) (int64, error) DeleteLogsAndBlocksAfter(ctx context.Context, start int64) error - SelectUnmatchedLogIds(ctx context.Context, limit int64) (ids []uint64, err error) + SelectUnmatchedLogIDs(ctx context.Context, limit int64) (ids []uint64, err error) DeleteExpiredLogs(ctx context.Context, limit int64) (int64, error) GetBlocksRange(ctx context.Context, start int64, end int64) ([]LogPollerBlock, error) @@ -380,7 +380,7 @@ type Exp struct { ShouldDelete bool } -func (o *DSORM) SelectUnmatchedLogIds(ctx context.Context, limit int64) (ids []uint64, err error) { +func (o *DSORM) SelectUnmatchedLogIDs(ctx context.Context, limit int64) (ids []uint64, err error) { query := ` SELECT l.id FROM evm.logs l JOIN ( SELECT evm_chain_id, address, event @@ -1050,9 +1050,9 @@ func (o *DSORM) FilteredLogs(ctx context.Context, filter []query.Expression, lim return logs, nil } -// DeleteLogsByRowId accepts a list of log row id's to delete -func (o *DSORM) DeleteLogsByRowId(ctx context.Context, rowIds []uint64) (int64, error) { - result, err := o.ds.ExecContext(ctx, `DELETE FROM evm.logs WHERE id = ANY($1)`, rowIds) +// DeleteLogsByRowID accepts a list of log row id's to delete +func (o *DSORM) DeleteLogsByRowID(ctx context.Context, rowIDs []uint64) (int64, error) { + result, err := o.ds.ExecContext(ctx, `DELETE FROM evm.logs WHERE id = ANY($1)`, rowIDs) if err != nil { return 0, err } diff --git a/core/chains/evm/logpoller/orm_test.go b/core/chains/evm/logpoller/orm_test.go index 51544c5892..9af6e84d70 100644 --- a/core/chains/evm/logpoller/orm_test.go +++ b/core/chains/evm/logpoller/orm_test.go @@ -494,7 +494,17 @@ func TestORM(t *testing.T) { // Delete expired logs without page limit deleted, err = o1.DeleteExpiredLogs(ctx, 0) require.NoError(t, err) - assert.Equal(t, int64(2), deleted) + assert.Equal(t, int64(1), deleted) + + // Delete unmatched logs with page limit + ids, err := o1.SelectUnmatchedLogIDs(ctx, 2) + require.NoError(t, err) + assert.Equal(t, int64(2), ids) + + // Delete unmatched logs without page limit + ids, err = o1.SelectUnmatchedLogIDs(ctx, 0) + require.NoError(t, err) + assert.Equal(t, int64(2), ids) // Ensure that both of the logs from the second chain are still there logs, err = o2.SelectLogs(ctx, 0, 100, common.HexToAddress("0x1236"), topic2) From 10eb4df44b9d74308df9fc8a06bd2858501d93b6 Mon Sep 17 00:00:00 2001 From: Domino Valdano <2644901+reductionista@users.noreply.github.com> Date: Tue, 24 Sep 2024 17:38:02 -0700 Subject: [PATCH 20/24] Fix UnmatchedLogs query --- core/chains/evm/logpoller/orm.go | 6 +++--- core/chains/evm/logpoller/orm_test.go | 15 ++++++++++----- 2 files changed, 13 insertions(+), 8 deletions(-) diff --git a/core/chains/evm/logpoller/orm.go b/core/chains/evm/logpoller/orm.go index 29606d89ab..2e556da717 100644 --- a/core/chains/evm/logpoller/orm.go +++ b/core/chains/evm/logpoller/orm.go @@ -382,20 +382,20 @@ type Exp struct { func (o *DSORM) SelectUnmatchedLogIDs(ctx context.Context, limit int64) (ids []uint64, err error) { query := ` - SELECT l.id FROM evm.logs l JOIN ( + SELECT l.id FROM evm.logs l LEFT JOIN ( SELECT evm_chain_id, address, event FROM evm.log_poller_filters WHERE evm_chain_id = $1 GROUP BY evm_chain_id, address, event ) r ON l.evm_chain_id = r.evm_chain_id AND l.address = r.address AND l.event_sig = r.event - WHERE l.evm_chain_id = $1 AND r.id IS NULL + WHERE l.evm_chain_id = $1 AND r.evm_chain_id IS NULL ` if limit == 0 { err = o.ds.SelectContext(ctx, &ids, query, ubig.New(o.chainID)) return ids, err } - err = o.ds.SelectContext(ctx, &ids, fmt.Sprintf("%s LIMIT %d", query, limit)) + err = o.ds.SelectContext(ctx, &ids, fmt.Sprintf("%s LIMIT %d", query, limit), ubig.New(o.chainID)) return ids, err } diff --git a/core/chains/evm/logpoller/orm_test.go b/core/chains/evm/logpoller/orm_test.go index 9af6e84d70..e5a73c6f50 100644 --- a/core/chains/evm/logpoller/orm_test.go +++ b/core/chains/evm/logpoller/orm_test.go @@ -496,15 +496,20 @@ func TestORM(t *testing.T) { require.NoError(t, err) assert.Equal(t, int64(1), deleted) - // Delete unmatched logs with page limit + // Select unmatched logs with page limit ids, err := o1.SelectUnmatchedLogIDs(ctx, 2) require.NoError(t, err) - assert.Equal(t, int64(2), ids) + assert.Len(t, ids, 2) - // Delete unmatched logs without page limit + // Select unmatched logs without page limit ids, err = o1.SelectUnmatchedLogIDs(ctx, 0) require.NoError(t, err) - assert.Equal(t, int64(2), ids) + assert.Len(t, ids, 3) + + // Delete logs by row id + deleted, err = o1.DeleteLogsByRowID(ctx, ids) + require.NoError(t, err) + assert.Equal(t, int64(3), deleted) // Ensure that both of the logs from the second chain are still there logs, err = o2.SelectLogs(ctx, 0, 100, common.HexToAddress("0x1236"), topic2) @@ -519,7 +524,7 @@ func TestORM(t *testing.T) { // The only log which should be deleted is the one which matches filter1 (ret=1ms) but not filter12 (ret=1 hour) // Importantly, it shouldn't delete any logs matching only filter0 (ret=0 meaning permanent retention). Anything // matching filter12 should be kept regardless of what other filters it matches. - assert.Len(t, logs, 7) + assert.Len(t, logs, 4) // Delete logs after should delete all logs. err = o1.DeleteLogsAndBlocksAfter(ctx, 1) From ae7d15053fa7740782333722b87658f04b64f1b5 Mon Sep 17 00:00:00 2001 From: Domino Valdano <2644901+reductionista@users.noreply.github.com> Date: Tue, 24 Sep 2024 17:44:28 -0700 Subject: [PATCH 21/24] Reduce 20x to 5x for testing --- core/chains/evm/logpoller/log_poller.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/chains/evm/logpoller/log_poller.go b/core/chains/evm/logpoller/log_poller.go index b38091e0cb..0eb9c36102 100644 --- a/core/chains/evm/logpoller/log_poller.go +++ b/core/chains/evm/logpoller/log_poller.go @@ -670,7 +670,7 @@ func (lp *logPoller) backgroundWorkerRun() { } else if !allRemoved { // Tick faster when cleanup can't keep up with the pace of new logs logPruneTick = time.After(timeutil.JitterPct(0.1).Apply(logPruneShortInterval)) - } else if successfulExpiredLogPrunes == 20 { + } else if successfulExpiredLogPrunes == 5 { // Only prune unmatched logs if we've successfully pruned all expired logs at least 20 times // since the last time unmatched logs were pruned if allRemoved, err := lp.PruneUnmatchedLogs(ctx); err != nil { From 5ee61dc30887b3e63e155afa56074db0419256b7 Mon Sep 17 00:00:00 2001 From: Domino Valdano <2644901+reductionista@users.noreply.github.com> Date: Tue, 24 Sep 2024 17:54:13 -0700 Subject: [PATCH 22/24] LogKeepBlocksDepth = 1000 --- .../ccip-tests/testconfig/tomls/ccip1.4-stress/baseline.toml | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/integration-tests/ccip-tests/testconfig/tomls/ccip1.4-stress/baseline.toml b/integration-tests/ccip-tests/testconfig/tomls/ccip1.4-stress/baseline.toml index 505c677d26..d312f612ea 100644 --- a/integration-tests/ccip-tests/testconfig/tomls/ccip1.4-stress/baseline.toml +++ b/integration-tests/ccip-tests/testconfig/tomls/ccip1.4-stress/baseline.toml @@ -15,7 +15,7 @@ # If you want to use a specific commit or a branch you need to switch to the internal ECR in `~/.testsecrets` # E2E_TEST_CHAINLINK_IMAGE=".dkr.ecr..amazonaws.com/chainlink-ccip" [CCIP.Env.NewCLCluster.Common.ChainlinkImage] -version = "2.14.0-ccip1.5.0" +version = "log_poller_id_columns" [CCIP] [CCIP.ContractVersions] @@ -132,7 +132,8 @@ DeltaReconcile = '5s' CommonChainConfigTOML = """ LogPollInterval = '1s' -LogPrunePageSize = 2001 +LogPrunePageSize = 5001 +LogKeepBlocksDepth = 1000 [HeadTracker] HistoryDepth = 200 From a5df4325b0e81c6638fb37ec69bfbe6588a59a4e Mon Sep 17 00:00:00 2001 From: Domino Valdano <2644901+reductionista@users.noreply.github.com> Date: Tue, 24 Sep 2024 17:59:35 -0700 Subject: [PATCH 23/24] Add debugging : TODO Remove! --- core/chains/evm/logpoller/log_poller.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/core/chains/evm/logpoller/log_poller.go b/core/chains/evm/logpoller/log_poller.go index 0eb9c36102..54972002ca 100644 --- a/core/chains/evm/logpoller/log_poller.go +++ b/core/chains/evm/logpoller/log_poller.go @@ -1094,6 +1094,7 @@ func (lp *logPoller) PruneOldBlocks(ctx context.Context) (bool, error) { // No blocks saved yet. return true, nil } + lp.lggr.Errorw("Calling DeleteBlocksBefore") if latestBlock.FinalizedBlockNumber <= lp.keepFinalizedBlocksDepth { // No-op, keep all blocks return true, nil @@ -1105,6 +1106,7 @@ func (lp *logPoller) PruneOldBlocks(ctx context.Context) (bool, error) { latestBlock.FinalizedBlockNumber-lp.keepFinalizedBlocksDepth, lp.logPrunePageSize, ) + lp.lggr.Errorw("DeleteBlocksBefore returned", "logPrunePageSize", lp.logPrunePageSize, "rowsRemoved", rowsRemoved, "allRemoved", rowsRemoved < lp.logPrunePageSize) return lp.logPrunePageSize == 0 || rowsRemoved < lp.logPrunePageSize, err } From f7038021e95b8de15f0173fb4808caa339ae14f8 Mon Sep 17 00:00:00 2001 From: Domino Valdano <2644901+reductionista@users.noreply.github.com> Date: Tue, 24 Sep 2024 19:12:52 -0700 Subject: [PATCH 24/24] Fix event_bindings merge issue --- .tool-versions | 4 ++-- core/services/relay/evm/event_binding.go | 6 ++++++ integration-tests/.tool-versions | 2 +- 3 files changed, 9 insertions(+), 3 deletions(-) diff --git a/.tool-versions b/.tool-versions index 077946cbee..cab9e5edc2 100644 --- a/.tool-versions +++ b/.tool-versions @@ -1,4 +1,4 @@ -golang 1.22.5 +golang 1.22.7 mockery 2.43.2 nodejs 20.13.1 pnpm 9.4.0 @@ -8,4 +8,4 @@ zig 0.11.0 golangci-lint 1.59.1 protoc 25.1 python 3.10.5 -task 3.35.1 \ No newline at end of file +task 3.35.1 diff --git a/core/services/relay/evm/event_binding.go b/core/services/relay/evm/event_binding.go index 9b5a0f2dc6..f386fa0575 100644 --- a/core/services/relay/evm/event_binding.go +++ b/core/services/relay/evm/event_binding.go @@ -224,8 +224,14 @@ func (e *eventBinding) getLatestValueWithFilters( return err } + fai := filtersAndIndices[0] remainingFilters := filtersAndIndices[1:] + logs, err := e.lp.IndexedLogs(ctx, e.hash, e.address, 1, []common.Hash{fai}, confs) + if err != nil { + return wrapInternalErr(err) + } + // TODO Use filtered logs here BCF-3316 // TODO: there should be a better way to ask log poller to filter these // First, you should be able to ask for as many topics to match diff --git a/integration-tests/.tool-versions b/integration-tests/.tool-versions index d623afb283..342dafb1fa 100644 --- a/integration-tests/.tool-versions +++ b/integration-tests/.tool-versions @@ -1,4 +1,4 @@ -golang 1.22.5 +golang 1.22.7 k3d 5.4.6 kubectl 1.25.5 nodejs 20.13.1