Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add id column as PRIMARY KEY for evm.logs & evm.log_poller_blocks #1441

Open
wants to merge 24 commits into
base: ccip-develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
3cc98be
Manually merged in from chainlink repo:
reductionista Sep 17, 2024
679ba08
Manually merge in FilteredLogs receive []Expression from upstream
reductionista Sep 24, 2024
7456353
use tx in insertLogsWithinTx
reductionista Sep 24, 2024
5b22a24
Add id column as PRIMARY KEY for evm.logs & evm.log_poller_blocks
reductionista Sep 17, 2024
b023353
Clean up db indexes
reductionista Sep 13, 2024
d153367
Fix 2 unrelated bugs I noticed
reductionista Sep 13, 2024
b0b82b6
Update ExpiredLogs query
reductionista Sep 13, 2024
62b650b
Update test for fromBlock >= :block_number
reductionista Sep 16, 2024
ef6a78b
Increase staggering of initial pruning runs
reductionista Sep 17, 2024
f311f7d
Decrease retention periods for CCIP events, for testing
reductionista Sep 17, 2024
ea8e250
Fix bug in merged commit from develop
reductionista Sep 17, 2024
079120f
restore retention periods
reductionista Sep 18, 2024
5d24721
Set LogPrunePageSize = 2001
reductionista Sep 18, 2024
3d65b90
merge whitespace differences from chainlink repo
reductionista Sep 24, 2024
5949fe0
sync from chainlink repo
reductionista Sep 24, 2024
dfc58c8
Update DeleteBlocksBefore query to use block_number index instead of …
reductionista Sep 19, 2024
1ffb5c4
Merge unknown changes to tests from chainlink repo
reductionista Sep 24, 2024
097aec6
Changes in orm_test.go from "Split off SelectUnmatchedLogs from Delet…
reductionista Sep 24, 2024
d02e394
Rename LogIds & some changes to orm_test.go from chainlink repo
reductionista Sep 24, 2024
10eb4df
Fix UnmatchedLogs query
reductionista Sep 25, 2024
ae7d150
Reduce 20x to 5x for testing
reductionista Sep 25, 2024
5ee61dc
LogKeepBlocksDepth = 1000
reductionista Sep 25, 2024
a5df432
Add debugging : TODO Remove!
reductionista Sep 25, 2024
f703802
Fix event_bindings merge issue
reductionista Sep 25, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/brown-geese-boil.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": minor
---

FilteredLogs receive Expression instead of whole KeyFilter. #internal
5 changes: 5 additions & 0 deletions .changeset/sweet-pumas-refuse.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": minor
---

#bugfix Addresses 2 minor issues with the pruning of LogPoller's db tables: logs not matching any filter will now be pruned, and rows deleted are now properly reported for observability
4 changes: 2 additions & 2 deletions .tool-versions
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
golang 1.22.5
golang 1.22.7
mockery 2.43.2
nodejs 20.13.1
pnpm 9.4.0
Expand All @@ -8,4 +8,4 @@ zig 0.11.0
golangci-lint 1.59.1
protoc 25.1
python 3.10.5
task 3.35.1
task 3.35.1
2 changes: 1 addition & 1 deletion core/chains/evm/logpoller/disabled.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func (d disabled) LogsDataWordBetween(ctx context.Context, eventSig common.Hash,
return nil, ErrDisabled
}

func (d disabled) FilteredLogs(_ context.Context, _ query.KeyFilter, _ query.LimitAndSort, _ string) ([]Log, error) {
func (d disabled) FilteredLogs(_ context.Context, _ []query.Expression, _ query.LimitAndSort, _ string) ([]Log, error) {
return nil, ErrDisabled
}

Expand Down
70 changes: 60 additions & 10 deletions core/chains/evm/logpoller/log_poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ import (

"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/timeutil"
"github.com/smartcontractkit/chainlink-common/pkg/types/query"
"github.com/smartcontractkit/chainlink-common/pkg/utils"
"github.com/smartcontractkit/chainlink-common/pkg/utils/mathutil"

"github.com/smartcontractkit/chainlink/v2/core/chains/evm/client"
Expand Down Expand Up @@ -68,7 +68,7 @@ type LogPoller interface {
LogsDataWordBetween(ctx context.Context, eventSig common.Hash, address common.Address, wordIndexMin, wordIndexMax int, wordValue common.Hash, confs evmtypes.Confirmations) ([]Log, error)

// chainlink-common query filtering
FilteredLogs(ctx context.Context, filter query.KeyFilter, limitAndSort query.LimitAndSort, queryName string) ([]Log, error)
FilteredLogs(ctx context.Context, filter []query.Expression, limitAndSort query.LimitAndSort, queryName string) ([]Log, error)
}

type LogPollerTest interface {
Expand Down Expand Up @@ -638,31 +638,50 @@ func (lp *logPoller) backgroundWorkerRun() {
ctx, cancel := lp.stopCh.NewCtx()
defer cancel()

blockPruneShortInterval := lp.pollPeriod * 100
blockPruneInterval := blockPruneShortInterval * 10
logPruneShortInterval := lp.pollPeriod * 241 // no common factors with 100
logPruneInterval := logPruneShortInterval * 10

// Avoid putting too much pressure on the database by staggering the pruning of old blocks and logs.
// Usually, node after restart will have some work to boot the plugins and other services.
// Deferring first prune by minutes reduces risk of putting too much pressure on the database.
blockPruneTick := time.After(5 * time.Minute)
logPruneTick := time.After(10 * time.Minute)
// Deferring first prune by at least 5 mins reduces risk of putting too much pressure on the database.
blockPruneTick := time.After((5 * time.Minute) + timeutil.JitterPct(1.0).Apply(blockPruneInterval/2))
logPruneTick := time.After((5 * time.Minute) + timeutil.JitterPct(1.0).Apply(logPruneInterval/2))

successfulExpiredLogPrunes := 0

for {
select {
case <-ctx.Done():
return
case <-blockPruneTick:
blockPruneTick = time.After(utils.WithJitter(lp.pollPeriod * 1000))
blockPruneTick = time.After(timeutil.JitterPct(0.1).Apply(blockPruneInterval))
if allRemoved, err := lp.PruneOldBlocks(ctx); err != nil {
lp.lggr.Errorw("Unable to prune old blocks", "err", err)
} else if !allRemoved {
// Tick faster when cleanup can't keep up with the pace of new blocks
blockPruneTick = time.After(utils.WithJitter(lp.pollPeriod * 100))
blockPruneTick = time.After(timeutil.JitterPct(0.1).Apply(blockPruneShortInterval))
}
case <-logPruneTick:
logPruneTick = time.After(utils.WithJitter(lp.pollPeriod * 2401)) // = 7^5 avoids common factors with 1000
logPruneTick = time.After(timeutil.JitterPct(0.1).Apply(logPruneInterval))
if allRemoved, err := lp.PruneExpiredLogs(ctx); err != nil {
lp.lggr.Errorw("Unable to prune expired logs", "err", err)
} else if !allRemoved {
// Tick faster when cleanup can't keep up with the pace of new logs
logPruneTick = time.After(utils.WithJitter(lp.pollPeriod * 241))
logPruneTick = time.After(timeutil.JitterPct(0.1).Apply(logPruneShortInterval))
} else if successfulExpiredLogPrunes == 5 {
// Only prune unmatched logs if we've successfully pruned all expired logs at least 20 times
// since the last time unmatched logs were pruned
if allRemoved, err := lp.PruneUnmatchedLogs(ctx); err != nil {
lp.lggr.Errorw("Unable to prune unmatched logs", "err", err)
} else if !allRemoved {
logPruneTick = time.After(timeutil.JitterPct(0.1).Apply(logPruneShortInterval))
} else {
successfulExpiredLogPrunes = 0
}
} else {
successfulExpiredLogPrunes++
}
}
}
Expand Down Expand Up @@ -1075,6 +1094,7 @@ func (lp *logPoller) PruneOldBlocks(ctx context.Context) (bool, error) {
// No blocks saved yet.
return true, nil
}
lp.lggr.Errorw("Calling DeleteBlocksBefore")
if latestBlock.FinalizedBlockNumber <= lp.keepFinalizedBlocksDepth {
// No-op, keep all blocks
return true, nil
Expand All @@ -1086,6 +1106,7 @@ func (lp *logPoller) PruneOldBlocks(ctx context.Context) (bool, error) {
latestBlock.FinalizedBlockNumber-lp.keepFinalizedBlocksDepth,
lp.logPrunePageSize,
)
lp.lggr.Errorw("DeleteBlocksBefore returned", "logPrunePageSize", lp.logPrunePageSize, "rowsRemoved", rowsRemoved, "allRemoved", rowsRemoved < lp.logPrunePageSize)
return lp.logPrunePageSize == 0 || rowsRemoved < lp.logPrunePageSize, err
}

Expand All @@ -1096,6 +1117,16 @@ func (lp *logPoller) PruneExpiredLogs(ctx context.Context) (bool, error) {
return lp.logPrunePageSize == 0 || rowsRemoved < lp.logPrunePageSize, err
}

func (lp *logPoller) PruneUnmatchedLogs(ctx context.Context) (bool, error) {
ids, err := lp.orm.SelectUnmatchedLogIDs(ctx, lp.logPrunePageSize)
if err != nil {
return false, err
}
rowsRemoved, err := lp.orm.DeleteLogsByRowID(ctx, ids)

return lp.logPrunePageSize == 0 || rowsRemoved < lp.logPrunePageSize, err
}

// Logs returns logs matching topics and address (exactly) in the given block range,
// which are canonical at time of query.
func (lp *logPoller) Logs(ctx context.Context, start, end int64, eventSig common.Hash, address common.Address) ([]Log, error) {
Expand Down Expand Up @@ -1518,6 +1549,25 @@ func EvmWord(i uint64) common.Hash {
return common.BytesToHash(b)
}

func (lp *logPoller) FilteredLogs(ctx context.Context, queryFilter query.KeyFilter, limitAndSort query.LimitAndSort, queryName string) ([]Log, error) {
func (lp *logPoller) FilteredLogs(ctx context.Context, queryFilter []query.Expression, limitAndSort query.LimitAndSort, queryName string) ([]Log, error) {
return lp.orm.FilteredLogs(ctx, queryFilter, limitAndSort, queryName)
}

// Where is a query.Where wrapper that ignores the Key and returns a slice of query.Expression rather than query.KeyFilter.
// If no expressions are provided, or an error occurs, an empty slice is returned.
func Where(expressions ...query.Expression) ([]query.Expression, error) {
filter, err := query.Where(
"",
expressions...,
)

if err != nil {
return []query.Expression{}, err
}

if filter.Expressions == nil {
return []query.Expression{}, nil
}

return filter.Expressions, nil
}
38 changes: 38 additions & 0 deletions core/chains/evm/logpoller/log_poller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (
"go.uber.org/zap/zapcore"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/types/query"
"github.com/smartcontractkit/chainlink-common/pkg/types/query/primitives"
commonutils "github.com/smartcontractkit/chainlink-common/pkg/utils"

htMocks "github.com/smartcontractkit/chainlink/v2/common/headtracker/mocks"
Expand Down Expand Up @@ -2052,3 +2054,39 @@ func TestFindLCA(t *testing.T) {
})
}
}

func TestWhere(t *testing.T) {
address := common.HexToAddress("0x1234567890abcdef1234567890abcdef12345678")
eventSig := common.HexToHash("0xabcdef1234567890abcdef1234567890abcdef1234")
ts := time.Now()

expr1 := logpoller.NewAddressFilter(address)
expr2 := logpoller.NewEventSigFilter(eventSig)
expr3 := query.Timestamp(uint64(ts.Unix()), primitives.Gte)
expr4 := logpoller.NewConfirmationsFilter(evmtypes.Confirmations(0))

t.Run("Valid combination of filters", func(t *testing.T) {
result, err := logpoller.Where(expr1, expr2, expr3, expr4)
assert.NoError(t, err)
assert.Equal(t, []query.Expression{expr1, expr2, expr3, expr4}, result)
})

t.Run("No expressions (should return empty slice)", func(t *testing.T) {
result, err := logpoller.Where()
assert.NoError(t, err)
assert.Equal(t, []query.Expression{}, result)
})

t.Run("Invalid boolean expression", func(t *testing.T) {
invalidExpr := query.Expression{
BoolExpression: query.BoolExpression{
Expressions: []query.Expression{},
},
}

result, err := logpoller.Where(invalidExpr)
assert.Error(t, err)
assert.EqualError(t, err, "all boolean expressions should have at least 2 expressions")
assert.Equal(t, []query.Expression{}, result)
})
}
16 changes: 8 additions & 8 deletions core/chains/evm/logpoller/mocks/log_poller.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 13 additions & 1 deletion core/chains/evm/logpoller/observability.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,18 @@ func (o *ObservedORM) DeleteLogsAndBlocksAfter(ctx context.Context, start int64)
})
}

func (o *ObservedORM) DeleteLogsByRowID(ctx context.Context, rowIDs []uint64) (int64, error) {
return withObservedExecAndRowsAffected(o, "DeleteLogsByRowID", del, func() (int64, error) {
return o.ORM.DeleteLogsByRowID(ctx, rowIDs)
})
}

func (o *ObservedORM) SelectUnmatchedLogIDs(ctx context.Context, limit int64) (ids []uint64, err error) {
return withObservedQueryAndResults[uint64](o, "SelectUnmatchedLogIDs", func() ([]uint64, error) {
return o.ORM.SelectUnmatchedLogIDs(ctx, limit)
})
}

func (o *ObservedORM) DeleteExpiredLogs(ctx context.Context, limit int64) (int64, error) {
return withObservedExecAndRowsAffected(o, "DeleteExpiredLogs", del, func() (int64, error) {
return o.ORM.DeleteExpiredLogs(ctx, limit)
Expand Down Expand Up @@ -262,7 +274,7 @@ func (o *ObservedORM) SelectIndexedLogsTopicRange(ctx context.Context, address c
})
}

func (o *ObservedORM) FilteredLogs(ctx context.Context, filter query.KeyFilter, limitAndSort query.LimitAndSort, queryName string) ([]Log, error) {
func (o *ObservedORM) FilteredLogs(ctx context.Context, filter []query.Expression, limitAndSort query.LimitAndSort, queryName string) ([]Log, error) {
return withObservedQueryAndResults(o, queryName, func() ([]Log, error) {
return o.ORM.FilteredLogs(ctx, filter, limitAndSort, queryName)
})
Expand Down
10 changes: 10 additions & 0 deletions core/chains/evm/logpoller/observability_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,16 @@ func TestCountersAreProperlyPopulatedForWrites(t *testing.T) {
assert.Equal(t, float64(20), testutil.ToFloat64(orm.logsInserted.WithLabelValues("420")))
assert.Equal(t, float64(2), testutil.ToFloat64(orm.blocksInserted.WithLabelValues("420")))

rowsAffected, err := orm.DeleteExpiredLogs(ctx, 3)
require.NoError(t, err)
require.Equal(t, int64(0), rowsAffected)
assert.Equal(t, 0, counterFromGaugeByLabels(orm.datasetSize, "420", "DeleteExpiredLogs", "delete"))

rowsAffected, err = orm.DeleteBlocksBefore(ctx, 30, 0)
require.NoError(t, err)
require.Equal(t, int64(2), rowsAffected)
assert.Equal(t, 2, counterFromGaugeByLabels(orm.datasetSize, "420", "DeleteBlocksBefore", "delete"))

// Don't update counters in case of an error
require.Error(t, orm.InsertLogsWithBlock(ctx, logs, NewLogPollerBlock(utils.RandomBytes32(), 0, time.Now(), 0)))
assert.Equal(t, float64(20), testutil.ToFloat64(orm.logsInserted.WithLabelValues("420")))
Expand Down
Loading
Loading