Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: migration("re-indexing"), backfilling and diasgnostics tooling for the ChainIndexer #12450

Open
wants to merge 10 commits into
base: feat/msg-eth-tx-index
Choose a base branch
from
2 changes: 1 addition & 1 deletion api/proxy_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

24 changes: 16 additions & 8 deletions build/openrpc/full.json
Original file line number Diff line number Diff line change
Expand Up @@ -2055,10 +2055,17 @@
"schema": {
"examples": [
{
"TipsetKey": "string value",
"TipSetKey": [
{
"/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4"
},
{
"/": "bafy2bzacebp3shtrn43k7g3unredz7fxn4gj533d3o43tqn2p2ipxxhrvchve"
}
],
"Height": 42,
"TotalMessages": 42,
"TotalEvents": 42,
"IndexedMessagesCount": 42,
"IndexedEventsCount": 42,
"Backfilled": true
}
],
Expand All @@ -2071,16 +2078,17 @@
"title": "number",
"type": "number"
},
"TipsetKey": {
"type": "string"
},
"TotalEvents": {
"IndexedEventsCount": {
"title": "number",
"type": "number"
},
"TotalMessages": {
"IndexedMessagesCount": {
"title": "number",
"type": "number"
},
"TipSetKey": {
"additionalProperties": false,
"type": "object"
}
},
"type": [
Expand Down
52 changes: 32 additions & 20 deletions chain/index/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func (si *SqliteIndexer) sanityCheckBackfillEpoch(ctx context.Context, epoch abi
return nil
}

func (si *SqliteIndexer) validateNullRound(ctx context.Context, epoch abi.ChainEpoch) (*types.IndexValidation, error) {
func (si *SqliteIndexer) validateIsNullRound(ctx context.Context, epoch abi.ChainEpoch) (*types.IndexValidation, error) {
// make sure we do not have ANY non-reverted tipset at this epoch
var isNullRound bool
err := si.stmts.hasNullRoundAtHeightStmt.QueryRowContext(ctx, epoch).Scan(&isNullRound)
Expand Down Expand Up @@ -84,18 +84,14 @@ func (si *SqliteIndexer) ChainValidateIndex(ctx context.Context, epoch abi.Chain
if err != nil {
return nil, xerrors.Errorf("failed to get tipset by height: %w", err)
}
expectedTsKeyCid, err := expectedTs.Key().Cid()
if err != nil {
return nil, xerrors.Errorf("failed to get tipset key cid: %w", err)
}

// Canonical chain has a null round at the epoch -> return if index is empty otherwise validate
if expectedTs.Height() != epoch { // Canonical chain has a null round at the epoch
if isIndexEmpty {
return nil, nil
}
// validate the db has a hole here and error if not, we don't attempt to repair because something must be very wrong for this to fail
return si.validateNullRound(ctx, epoch)
return si.validateIsNullRound(ctx, epoch)
}

// if the index is empty -> short-circuit and simply backfill if applicable
Expand Down Expand Up @@ -131,15 +127,18 @@ func (si *SqliteIndexer) ChainValidateIndex(ctx context.Context, epoch abi.Chain
if err != nil {
return nil, xerrors.Errorf("failed to cast tipset key cid: %w", err)
}

expectedTsKeyCid, err := expectedTs.Key().Cid()
if err != nil {
return nil, xerrors.Errorf("failed to get tipset key cid: %w", err)
}
if !indexedTsKeyCid.Equals(expectedTsKeyCid) {
return nil, xerrors.Errorf("index corruption: non-reverted tipset at height %d has key %s, but canonical chain has %s", epoch, indexedTsKeyCid, expectedTsKeyCid)
}

// indexedTsKeyCid and expectedTsKeyCid are the same, so we can use `expectedTs` to fetch the indexed data
indexedData, err := si.getIndexedTipSetData(ctx, expectedTs)
if err != nil {
return nil, xerrors.Errorf("failed to get tipset message and event counts at height %d: %w", expectedTs.Height(), err)
return nil, xerrors.Errorf("failed to get indexed data for tipset at height %d: %w", expectedTs.Height(), err)
}

if indexedData == nil {
Expand All @@ -151,10 +150,10 @@ func (si *SqliteIndexer) ChainValidateIndex(ctx context.Context, epoch abi.Chain
}

return &types.IndexValidation{
TipsetKey: expectedTs.Key().String(),
Height: uint64(expectedTs.Height()),
NonRevertedMessageCount: uint64(indexedData.nonRevertedMessageCount),
NonRevertedEventsCount: uint64(indexedData.nonRevertedEventCount),
TipSetKey: expectedTs.Key(),
Height: uint64(expectedTs.Height()),
IndexedMessagesCount: uint64(indexedData.nonRevertedMessageCount),
IndexedEventsCount: uint64(indexedData.nonRevertedEventCount),
}, nil
}

Expand Down Expand Up @@ -207,8 +206,11 @@ func (si *SqliteIndexer) verifyIndexedData(ctx context.Context, ts *types.TipSet
}

// the parent tipset of the execution tipset should be the same as the indexed tipset (`ts` should be the parent of `executionTs`)
// if that is not the case, it means that the chain forked after we fetched the tipset `ts` from the canonical chain and
// `ts` is no longer part of the canonical chain. Simply return an error here and ask the user to retry.
if !eParentTsKeyCid.Equals(tsKeyCid) {
return xerrors.Errorf("execution tipset parent key mismatch: chainstore has %s, index has %s", eParentTsKeyCid, tsKeyCid)
return xerrors.Errorf("execution tipset parent key mismatch: chainstore has %s, index has %s; please retry your request as this could have been caused by a chain reorg",
eParentTsKeyCid, tsKeyCid)
}

executedMsgs, err := si.loadExecutedMessages(ctx, ts, executionTs)
Expand All @@ -222,12 +224,22 @@ func (si *SqliteIndexer) verifyIndexedData(ctx context.Context, ts *types.TipSet
}

if totalEventsCount != indexedData.nonRevertedEventCount {
return xerrors.Errorf("tipset event count mismatch: chainstore has %d, index has %d", totalEventsCount, indexedData.nonRevertedEventCount)
return xerrors.Errorf("event count mismatch: chainstore has %d, index has %d", totalEventsCount, indexedData.nonRevertedEventCount)
}

totalExecutedMsgCount := len(executedMsgs)
if totalExecutedMsgCount != indexedData.nonRevertedMessageCount {
return xerrors.Errorf("tipset executed message count mismatch: chainstore has %d, index has %d", totalExecutedMsgCount, indexedData.nonRevertedMessageCount)
return xerrors.Errorf("message count mismatch: chainstore has %d, index has %d", totalExecutedMsgCount, indexedData.nonRevertedMessageCount)
}

// if non-reverted events exist which means that tipset `ts` has been executed, there should be 0 reverted events in the DB
var hasRevertedEventsInTipset bool
err = si.stmts.hasRevertedEventsInTipsetStmt.QueryRowContext(ctx, tsKeyCid.Bytes()).Scan(&hasRevertedEventsInTipset)
if err != nil {
return xerrors.Errorf("failed to check if there are reverted events in tipset: %w", err)
}
if hasRevertedEventsInTipset {
return xerrors.Errorf("index corruption: reverted events found for an executed tipset %s", tsKeyCid)
}

return nil
Expand Down Expand Up @@ -261,10 +273,10 @@ func (si *SqliteIndexer) backfillMissingTipset(ctx context.Context, ts *types.Ti
}

return &types.IndexValidation{
TipsetKey: ts.Key().String(),
Height: uint64(ts.Height()),
Backfilled: true,
NonRevertedMessageCount: uint64(indexedData.nonRevertedMessageCount),
NonRevertedEventsCount: uint64(indexedData.nonRevertedEventCount),
TipSetKey: ts.Key(),
Height: uint64(ts.Height()),
Backfilled: true,
IndexedMessagesCount: uint64(indexedData.nonRevertedMessageCount),
IndexedEventsCount: uint64(indexedData.nonRevertedEventCount),
}, nil
}
5 changes: 3 additions & 2 deletions chain/index/ddls.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,8 @@ func preparedStatementMapping(ps *preparedStatements) map[**sql.Stmt]string {
&ps.hasNullRoundAtHeightStmt: "SELECT NOT EXISTS(SELECT 1 FROM tipset_message WHERE height = ?)",
&ps.getNonRevertedTipsetAtHeightStmt: "SELECT tipset_key_cid FROM tipset_message WHERE height = ? AND reverted = 0 LIMIT 1",
&ps.countTipsetsAtHeightStmt: "SELECT COUNT(CASE WHEN reverted = 1 THEN 1 END) AS reverted_count, COUNT(CASE WHEN reverted = 0 THEN 1 END) AS non_reverted_count FROM (SELECT tipset_key_cid, MAX(reverted) AS reverted FROM tipset_message WHERE height = ? GROUP BY tipset_key_cid) AS unique_tipsets",
&ps.getNonRevertedTipsetMessageCountStmt: "SELECT COUNT(*) FROM tipset_message WHERE tipset_key_cid = ? AND reverted = 0",
&ps.getNonRevertedTipsetEventCountStmt: "SELECT COUNT(*) FROM event WHERE message_id IN (SELECT message_id FROM tipset_message WHERE tipset_key_cid = ? AND reverted = 0)",
&ps.getNonRevertedTipsetMessageCountStmt: "SELECT COUNT(*) FROM tipset_message WHERE tipset_key_cid = ? AND reverted = 0 AND message_cid IS NOT NULL",
&ps.getNonRevertedTipsetEventCountStmt: "SELECT COUNT(*) FROM event WHERE reverted = 0 AND message_id IN (SELECT message_id FROM tipset_message WHERE tipset_key_cid = ? AND reverted = 0)",
&ps.hasRevertedEventsInTipsetStmt: "SELECT EXISTS(SELECT 1 FROM event WHERE reverted = 1 AND message_id IN (SELECT message_id FROM tipset_message WHERE tipset_key_cid = ?))",
}
}
14 changes: 7 additions & 7 deletions chain/index/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ type preparedStatements struct {

getNonRevertedTipsetMessageCountStmt *sql.Stmt
getNonRevertedTipsetEventCountStmt *sql.Stmt
hasRevertedEventsInTipsetStmt *sql.Stmt
}

type SqliteIndexer struct {
Expand Down Expand Up @@ -224,15 +225,11 @@ func (si *SqliteIndexer) indexSignedMessage(ctx context.Context, tx *sql.Tx, msg
}

func (si *SqliteIndexer) Apply(ctx context.Context, from, to *types.TipSet) error {
si.closeLk.RLock()
if si.isClosed() {
si.closeLk.RUnlock()
return ErrClosed
}
si.closeLk.RUnlock()

si.writerLk.Lock()
defer si.writerLk.Unlock()

// We're moving the chain ahead from the `from` tipset to the `to` tipset
// Height(to) > Height(from)
Expand All @@ -245,8 +242,10 @@ func (si *SqliteIndexer) Apply(ctx context.Context, from, to *types.TipSet) erro
})

if err != nil {
si.writerLk.Unlock()
return xerrors.Errorf("failed to apply tipset: %w", err)
}
si.writerLk.Unlock()

si.notifyUpdateSubs()

Expand Down Expand Up @@ -348,9 +347,6 @@ func (si *SqliteIndexer) Revert(ctx context.Context, from, to *types.TipSet) err
return ErrClosed
}

si.writerLk.Lock()
defer si.writerLk.Unlock()

// We're reverting the chain from the tipset at `from` to the tipset at `to`.
// Height(to) < Height(from)

Expand All @@ -366,6 +362,8 @@ func (si *SqliteIndexer) Revert(ctx context.Context, from, to *types.TipSet) err
return xerrors.Errorf("failed to get tipset key cid: %w", err)
}

si.writerLk.Lock()

err = withTx(ctx, si.db, func(tx *sql.Tx) error {
// revert the `from` tipset
if _, err := tx.Stmt(si.stmts.updateTipsetToRevertedStmt).ExecContext(ctx, revertTsKeyCid); err != nil {
Expand All @@ -386,9 +384,11 @@ func (si *SqliteIndexer) Revert(ctx context.Context, from, to *types.TipSet) err
return nil
})
if err != nil {
si.writerLk.Unlock()
return xerrors.Errorf("failed during revert transaction: %w", err)
}

si.writerLk.Unlock()
si.notifyUpdateSubs()

return nil
Expand Down
6 changes: 3 additions & 3 deletions chain/types/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ type IndexValidation struct {
TipSetKey TipSetKey
Height uint64

NonRevertedMessageCount uint64
NonRevertedEventsCount uint64
Backfilled bool
IndexedMessagesCount uint64
IndexedEventsCount uint64
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we rename this to IndexedNonRevertedMsgCount and IndexedNonRevertedEventsCount to be precise?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@akaladarshi That is an implementation detail that will confuse users. They're only asking for the indexed entries at a specific epoch here and so ofcourse expect only non-reverted tipsets back.

Backfilled bool
}
16 changes: 11 additions & 5 deletions documentation/en/api-v1-unstable-methods.md
Original file line number Diff line number Diff line change
Expand Up @@ -1237,7 +1237,7 @@ Response: `"0"`
### ChainValidateIndex
There are not yet any comments for this method.

Perms: read
Perms: write

Inputs:
```json
Expand All @@ -1250,11 +1250,17 @@ Inputs:
Response:
```json
{
"TipsetKey": "string value",
"TipSetKey": [
{
"/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4"
},
{
"/": "bafy2bzacebp3shtrn43k7g3unredz7fxn4gj533d3o43tqn2p2ipxxhrvchve"
}
],
"Height": 42,
"TotalMessages": 42,
"TotalEvents": 42,
"EventsReverted": true,
"IndexedMessagesCount": 42,
"IndexedEventsCount": 42,
"Backfilled": true
}
```
Expand Down
60 changes: 39 additions & 21 deletions itests/eth_filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -525,31 +525,49 @@ func TestEthGetLogsBasic(t *testing.T) {

AssertEthLogs(t, rctLogs, expected, received)

epoch := uint64(0)
iv, err := client.ChainValidateIndex(ctx, abi.ChainEpoch(epoch), false)
head, err := client.ChainHead(ctx)
require.NoError(err)
require.NotNil(iv)

fmt.Printf("index validation: %v\n", iv)
for height := 0; height < int(head.Height()); height++ {
// for each tipset
ts, err := client.ChainGetTipSetByHeight(ctx, abi.ChainEpoch(height), types.EmptyTSK)
require.NoError(err)

// Add assertions for IndexValidation fields
require.NotEmpty(t, iv.TipsetKey, "TipsetKey should not be empty")
require.Equal(t, epoch, iv.Height, "Height should be 0")
require.GreaterOrEqual(t, iv.NonRevertedMessageCount, uint64(0), "NonRevertedMessageCount should be non-negative") // TODO: change according to actual number of messages in the tipset
require.GreaterOrEqual(t, iv.NonRevertedEventsCount, uint64(0), "NonRevertedEventsCount should be non-negative") // TODO: change according to actual number of messages in the tipset
require.False(iv.Backfilled, "Backfilled should be flase")
if ts.Height() != abi.ChainEpoch(height) {
iv, err := client.ChainValidateIndex(ctx, abi.ChainEpoch(height), false)
require.Nil(iv)
require.NoError(err)
t.Logf("tipset %d is a null round", height)
continue
}

epoch = 22
iv, err = client.ChainValidateIndex(ctx, abi.ChainEpoch(epoch), false)
require.NoError(err)
require.NotNil(iv)
fmt.Printf("index validation: %v\n", iv)

require.NotEmpty(t, iv.TipsetKey, "TipsetKey should not be empty")
require.Equal(t, epoch, iv.Height, "Height should be 22")
require.GreaterOrEqual(t, iv.NonRevertedMessageCount, uint64(0), "NonRevertedMessageCount be non-negative") // TODO: change according to actual number of messages in the tipset
require.GreaterOrEqual(t, iv.NonRevertedEventsCount, uint64(0), "NonRevertedEventsCount be non-negative") // TODO: change according to actual number of messages in the tipset
require.True(iv.Backfilled, "Backfilled should be false")
totalMessageCount := 0
totalEventCount := 0
messages, err := client.ChainGetMessagesInTipset(ctx, ts.Key())
require.NoError(err)
totalMessageCount = len(messages)
for _, m := range messages {
receipt, err := client.StateSearchMsg(ctx, types.EmptyTSK, m.Cid, -1, false)
require.NoError(err)
require.NotNil(receipt)
// receipt
if receipt.Receipt.EventsRoot != nil {
events, err := client.ChainGetEvents(ctx, *receipt.Receipt.EventsRoot)
require.NoError(err)
totalEventCount += len(events)
}
}
t.Logf("tipset %d: %d messages, %d events", height, totalMessageCount, totalEventCount)

iv, err := client.ChainValidateIndex(ctx, abi.ChainEpoch(height), false)
require.NoError(err)
require.NotNil(iv)
t.Logf("tipset %d: %+v", height, iv)
require.EqualValues(height, iv.Height)
require.EqualValues(totalMessageCount, iv.IndexedMessagesCount)
require.EqualValues(totalEventCount, iv.IndexedEventsCount)
require.False(iv.Backfilled)
}
}

func TestEthSubscribeLogsNoTopicSpec(t *testing.T) {
Expand Down
Loading