From 730dbc5ce3a96b8c637a43415735e99435847c89 Mon Sep 17 00:00:00 2001 From: ghost Date: Wed, 28 Aug 2024 19:23:14 +0100 Subject: [PATCH] fix: create delegation reverse index over multiple blocks at 1000 items per block --- x/staking/keeper/abci.go | 15 ++++ x/staking/keeper/grpc_query.go | 7 ++ x/staking/keeper/keeper_test.go | 3 + x/staking/keeper/validator_index.go | 86 ++++++++++++++++++ x/staking/keeper/validator_index_test.go | 109 +++++++++++++++++++++++ x/staking/migrations/v5/keys.go | 7 +- x/staking/migrations/v5/store.go | 13 ++- x/staking/types/keys.go | 2 + 8 files changed, 238 insertions(+), 4 deletions(-) create mode 100644 x/staking/keeper/validator_index.go create mode 100644 x/staking/keeper/validator_index_test.go diff --git a/x/staking/keeper/abci.go b/x/staking/keeper/abci.go index 8131fa6dd16ff..bcaab73488f57 100644 --- a/x/staking/keeper/abci.go +++ b/x/staking/keeper/abci.go @@ -6,6 +6,7 @@ import ( abci "github.com/cometbft/cometbft/abci/types" "github.com/cosmos/cosmos-sdk/telemetry" + sdk "github.com/cosmos/cosmos-sdk/types" "github.com/cosmos/cosmos-sdk/x/staking/types" ) @@ -19,5 +20,19 @@ func (k *Keeper) BeginBlocker(ctx context.Context) error { // EndBlocker called at every block, update validator set func (k *Keeper) EndBlocker(ctx context.Context) ([]abci.ValidatorUpdate, error) { defer telemetry.ModuleMeasureSince(types.ModuleName, telemetry.Now(), telemetry.MetricKeyEndBlocker) + + // TODO: Remove migration code and panic catch in the next upgrade + // Wrap the migration call in a function that can recover from panics + func() { + defer func() { + if r := recover(); r != nil { + k.Logger(sdk.UnwrapSDKContext(ctx)).Error("Panic in MigrateDelegationsByValidatorIndex", "recover", r) + } + }() + + // Only migrate 1000 items per block to make sure block times don't grow too much + k.MigrateDelegationsByValidatorIndex(sdk.UnwrapSDKContext(ctx), 1000) + }() + return k.BlockValidatorUpdates(ctx) } diff --git a/x/staking/keeper/grpc_query.go b/x/staking/keeper/grpc_query.go index 890ca859c12b6..551ad16163bc1 100644 --- a/x/staking/keeper/grpc_query.go +++ b/x/staking/keeper/grpc_query.go @@ -2,6 +2,7 @@ package keeper import ( "context" + "fmt" "strings" "google.golang.org/grpc/codes" @@ -108,6 +109,12 @@ func (k Querier) ValidatorDelegations(ctx context.Context, req *types.QueryValid pageRes *query.PageResponse ) pageRes, err = query.Paginate(delStore, req.Pagination, func(delAddr, value []byte) error { + // Check the store to see if there is a value stored under the key + key := store.Get(types.NextMigrateDelegationsByValidatorIndexKey) + if key != nil { + // Users will never see this error as if there is an error the function defaults to the legacy implementation below + return fmt.Errorf("store migration is not finished, try again later") + } bz := store.Get(types.GetDelegationKey(delAddr, valAddr)) var delegation types.Delegation diff --git a/x/staking/keeper/keeper_test.go b/x/staking/keeper/keeper_test.go index ce2000f733b8d..a927ddc7c1d15 100644 --- a/x/staking/keeper/keeper_test.go +++ b/x/staking/keeper/keeper_test.go @@ -8,6 +8,7 @@ import ( "github.com/golang/mock/gomock" "github.com/stretchr/testify/suite" + "cosmossdk.io/core/store" "cosmossdk.io/math" storetypes "cosmossdk.io/store/types" @@ -40,6 +41,7 @@ type KeeperTestSuite struct { accountKeeper *stakingtestutil.MockAccountKeeper queryClient stakingtypes.QueryClient msgServer stakingtypes.MsgServer + storeService store.KVStoreService } func (s *KeeperTestSuite) SetupTest() { @@ -73,6 +75,7 @@ func (s *KeeperTestSuite) SetupTest() { s.stakingKeeper = keeper s.bankKeeper = bankKeeper s.accountKeeper = accountKeeper + s.storeService = storeService stakingtypes.RegisterInterfaces(encCfg.InterfaceRegistry) queryHelper := baseapp.NewQueryServerTestHelper(ctx, encCfg.InterfaceRegistry) diff --git a/x/staking/keeper/validator_index.go b/x/staking/keeper/validator_index.go new file mode 100644 index 0000000000000..16061b947b6aa --- /dev/null +++ b/x/staking/keeper/validator_index.go @@ -0,0 +1,86 @@ +package keeper + +import ( + "fmt" + + "cosmossdk.io/store/prefix" + + "github.com/cosmos/cosmos-sdk/runtime" + sdk "github.com/cosmos/cosmos-sdk/types" + "github.com/cosmos/cosmos-sdk/x/staking/types" +) + +// MigrateDelegationsByValidatorIndex is a migration that runs over multiple blocks, +// this is necessary as to build the reverse index we need to iterate over a large set +// of delegations. +func (k Keeper) MigrateDelegationsByValidatorIndex(ctx sdk.Context, iterationLimit int) error { + store := runtime.KVStoreAdapter(k.storeService.OpenKVStore(ctx)) + valStore := prefix.NewStore(store, types.DelegationKey) + + // Check the store to see if there is a value stored under the key + key := store.Get(types.NextMigrateDelegationsByValidatorIndexKey) + if key == nil { + return nil + } + + // Initialize the counter to 0 + iterationCounter := 0 + + // Start the iterator from the key that is in the store + iterator := valStore.Iterator(key, nil) + defer iterator.Close() + + for ; iterator.Valid(); iterator.Next() { + key := iterator.Key() + + // Parse the index to use setting the reverse index + del, val, err := ParseDelegationKey(key) + if err != nil { + return err + } + + // Set the reverse index in the store + store.Set(types.GetDelegationsByValKey(val, del), []byte{}) + + iterationCounter++ + if iterationCounter >= iterationLimit { + ctx.Logger().Info(fmt.Sprintf("Migrated %d delegations, next key %x", iterationLimit, key)) + + // Set the key in the store after it has been processed + store.Set(types.NextMigrateDelegationsByValidatorIndexKey, key) + break + } + } + + // If the iterator is invalid we have processed the full store + if !iterator.Valid() { + ctx.Logger().Info("Migration completed") + store.Delete(types.NextMigrateDelegationsByValidatorIndexKey) + } + + return nil +} + +// ParseDelegationKey parses given key and returns delagator, validator address bytes +func ParseDelegationKey(bz []byte) (sdk.AccAddress, sdk.ValAddress, error) { + delAddrLen := bz[0] + bz = bz[1:] // remove the length byte of delegator address. + if len(bz) == 0 { + return nil, nil, fmt.Errorf("no bytes left to parse delegator address: %X", bz) + } + + del := bz[:int(delAddrLen)] + bz = bz[int(delAddrLen):] // remove the length byte of a delegator address + if len(bz) == 0 { + return nil, nil, fmt.Errorf("no bytes left to parse delegator address: %X", bz) + } + + bz = bz[1:] // remove the validator address bytes. + if len(bz) == 0 { + return nil, nil, fmt.Errorf("no bytes left to parse validator address: %X", bz) + } + + val := bz + + return del, val, nil +} diff --git a/x/staking/keeper/validator_index_test.go b/x/staking/keeper/validator_index_test.go new file mode 100644 index 0000000000000..7b48e2dbb5424 --- /dev/null +++ b/x/staking/keeper/validator_index_test.go @@ -0,0 +1,109 @@ +package keeper_test + +import ( + "cosmossdk.io/core/store" + sdkmath "cosmossdk.io/math" + storetypes "cosmossdk.io/store/types" + + "github.com/cosmos/cosmos-sdk/codec" + "github.com/cosmos/cosmos-sdk/runtime" + "github.com/cosmos/cosmos-sdk/testutil/sims" + sdk "github.com/cosmos/cosmos-sdk/types" + moduletestutil "github.com/cosmos/cosmos-sdk/types/module/testutil" + "github.com/cosmos/cosmos-sdk/x/staking" + "github.com/cosmos/cosmos-sdk/x/staking/types" +) + +// TestDelegationsByValidatorMigration tests the multi block migration of the reverse delegation index +func (s *KeeperTestSuite) TestDelegationsByValidatorMigration() { + require := s.Require() + ctx, keeper := s.ctx, s.stakingKeeper + store := s.storeService.OpenKVStore(ctx) + storeInit := runtime.KVStoreAdapter(store) + cdc := moduletestutil.MakeTestEncodingConfig(staking.AppModuleBasic{}).Codec + + accAddrs := sims.CreateIncrementalAccounts(15) + valAddrs := sims.ConvertAddrsToValAddrs(accAddrs[0:1]) + var addedDels []types.Delegation + + for i := 1; i < 11; i++ { + del1 := types.NewDelegation(accAddrs[i].String(), valAddrs[0].String(), sdkmath.LegacyNewDec(100)) + store.Set(types.GetDelegationKey(accAddrs[i], valAddrs[0]), types.MustMarshalDelegation(cdc, del1)) + addedDels = append(addedDels, del1) + } + + // number of items we migrate per migration + migrationCadence := 7 + + // set the key in the store, this happens on the original migration + iterator := storetypes.KVStorePrefixIterator(storeInit, types.DelegationKey) + for ; iterator.Valid(); iterator.Next() { + key := iterator.Key() + store.Set(types.NextMigrateDelegationsByValidatorIndexKey, key[1:]) + break + } + + // before migration the state of delegations by val index should be empty + dels := getValDelegations(cdc, store, valAddrs[0]) + require.Equal(len(dels), 0) + + // run the first round of migrations first 6, 10 in store + err := keeper.MigrateDelegationsByValidatorIndex(ctx, migrationCadence) + require.NoError(err) + + // after migration the state of delegations by val index should not be empty + dels = getValDelegations(cdc, store, valAddrs[0]) + require.Equal(len(dels), migrationCadence) + require.NotEqual(len(dels), len(addedDels)) + + // check that the next value needed from the store is present + next, err := store.Get(types.NextMigrateDelegationsByValidatorIndexKey) + require.NoError(err) + require.NotNil(next) + + // delegate to a validator while the migration is in progress + delagationWhileMigrationInProgress := types.NewDelegation(accAddrs[12].String(), valAddrs[0].String(), sdkmath.LegacyNewDec(100)) + keeper.SetDelegation(ctx, delagationWhileMigrationInProgress) + addedDels = append(addedDels, delagationWhileMigrationInProgress) + + // while migrating get state of delegations by val index should be increased by 1 + delagationWhileMigrationInProgressCount := getValDelegations(cdc, store, valAddrs[0]) + require.Equal(len(delagationWhileMigrationInProgressCount), migrationCadence+1) + + // run the second round of migrations first 12, 10 in store + err = keeper.MigrateDelegationsByValidatorIndex(ctx, migrationCadence) + require.NoError(err) + + // after migration the state of delegations by val index equal all delegations + dels = getValDelegations(cdc, store, valAddrs[0]) + require.Equal(len(dels), len(addedDels)) + require.Equal(dels, addedDels) + + // check that the next value needed from the store is present + next, err = store.Get(types.NextMigrateDelegationsByValidatorIndexKey) + require.NoError(err) + require.Nil(next) +} + +// getValidatorDelegations is a helper function to get all delegations using the new v5 staking reverse index +func getValDelegations(cdc codec.Codec, keeperStore store.KVStore, valAddr sdk.ValAddress) []types.Delegation { + var delegations []types.Delegation + + store := runtime.KVStoreAdapter(keeperStore) + iterator := storetypes.KVStorePrefixIterator(store, types.GetDelegationsByValPrefixKey(valAddr)) + + for ; iterator.Valid(); iterator.Next() { + var delegation types.Delegation + valAddr, delAddr, err := types.ParseDelegationsByValKey(iterator.Key()) + if err != nil { + panic(err) + } + + bz := store.Get(types.GetDelegationKey(delAddr, valAddr)) + cdc.MustUnmarshal(bz, &delegation) + + delegations = append(delegations, delegation) + } + + return delegations +} diff --git a/x/staking/migrations/v5/keys.go b/x/staking/migrations/v5/keys.go index 5c11c8735c3d9..bd689aef17730 100644 --- a/x/staking/migrations/v5/keys.go +++ b/x/staking/migrations/v5/keys.go @@ -15,9 +15,10 @@ const ( ) var ( - DelegationKey = []byte{0x31} // key for a delegation - HistoricalInfoKey = []byte{0x50} // prefix for the historical info - DelegationByValIndexKey = []byte{0x71} // key for delegations by a validator + DelegationKey = []byte{0x31} // key for a delegation + HistoricalInfoKey = []byte{0x50} // prefix for the historical info + DelegationByValIndexKey = []byte{0x71} // key for delegations by a validator + NextMigrateDelegationsByValidatorIndexKey = []byte{0x81} // key used to migrate to the new validator index ) // ParseDelegationKey parses given key and returns delagator, validator address bytes diff --git a/x/staking/migrations/v5/store.go b/x/staking/migrations/v5/store.go index 569cc80c648f2..d5567edac890e 100644 --- a/x/staking/migrations/v5/store.go +++ b/x/staking/migrations/v5/store.go @@ -12,17 +12,28 @@ import ( sdk "github.com/cosmos/cosmos-sdk/types" ) -func migrateDelegationsByValidatorIndex(ctx sdk.Context, store storetypes.KVStore, cdc codec.BinaryCodec) error { +func migrateDelegationsByValidatorIndex(ctx sdk.Context, store storetypes.KVStore, _ codec.BinaryCodec) error { iterator := storetypes.KVStorePrefixIterator(store, DelegationKey) + iterationLimit := 1000 + iterationCounter := 0 for ; iterator.Valid(); iterator.Next() { key := iterator.Key() + del, val, err := ParseDelegationKey(key) if err != nil { return err } store.Set(GetDelegationsByValKey(val, del), []byte{}) + + iterationCounter++ + if iterationCounter >= iterationLimit { + ctx.Logger().Info(fmt.Sprintf("Migrated 10000 delegations, next key %x", key[1:])) + // we set the store to the key sans the DelgationKey as we create a prefix store to iterate per block + store.Set(NextMigrateDelegationsByValidatorIndexKey, key[1:]) + break + } } return nil diff --git a/x/staking/types/keys.go b/x/staking/types/keys.go index 598aa332b3123..4afa81994a8a2 100644 --- a/x/staking/types/keys.go +++ b/x/staking/types/keys.go @@ -56,6 +56,8 @@ var ( ParamsKey = []byte{0x51} // prefix for parameters for module x/staking DelegationByValIndexKey = []byte{0x71} // key for delegations by a validator + + NextMigrateDelegationsByValidatorIndexKey = []byte{0x81} // key used to migrate to the new validator index ) // UnbondingType defines the type of unbonding operation