From c88c7c36bf2b1c09b6c18a742bc5060f3cce1ebb Mon Sep 17 00:00:00 2001 From: PaddyMc Date: Fri, 30 Aug 2024 10:41:47 +0100 Subject: [PATCH] fix: create delegation reverse index over multiple blocks at 1000 items per block (#622) * fix: create delegation reverse index over multiple blocks at 1000 items per block * chore: update migration to migrate 10000 items per block --- .../staking/keeper/determinstic_test.go | 2 +- x/staking/keeper/abci.go | 15 ++ x/staking/keeper/grpc_query.go | 7 + x/staking/keeper/keeper_test.go | 3 + x/staking/keeper/validator_index.go | 86 ++++++++++ x/staking/keeper/validator_index_test.go | 150 ++++++++++++++++++ x/staking/migrations/v5/keys.go | 7 +- x/staking/migrations/v5/store.go | 13 +- x/staking/types/keys.go | 2 + 9 files changed, 280 insertions(+), 5 deletions(-) create mode 100644 x/staking/keeper/validator_index.go create mode 100644 x/staking/keeper/validator_index_test.go diff --git a/tests/integration/staking/keeper/determinstic_test.go b/tests/integration/staking/keeper/determinstic_test.go index 1c928d00b719..932d2a488402 100644 --- a/tests/integration/staking/keeper/determinstic_test.go +++ b/tests/integration/staking/keeper/determinstic_test.go @@ -414,7 +414,7 @@ func TestGRPCValidatorDelegations(t *testing.T) { ValidatorAddr: validator.OperatorAddress, } - testdata.DeterministicIterations(f.ctx, t, req, f.queryClient.ValidatorDelegations, 14475, false) + testdata.DeterministicIterations(f.ctx, t, req, f.queryClient.ValidatorDelegations, 17484, false) } func TestGRPCValidatorUnbondingDelegations(t *testing.T) { diff --git a/x/staking/keeper/abci.go b/x/staking/keeper/abci.go index 8131fa6dd16f..f65a5c1460b9 100644 --- a/x/staking/keeper/abci.go +++ b/x/staking/keeper/abci.go @@ -6,6 +6,7 @@ import ( abci "github.com/cometbft/cometbft/abci/types" "github.com/cosmos/cosmos-sdk/telemetry" + sdk "github.com/cosmos/cosmos-sdk/types" "github.com/cosmos/cosmos-sdk/x/staking/types" ) @@ -19,5 +20,19 @@ func (k *Keeper) BeginBlocker(ctx context.Context) error { // EndBlocker called at every block, update validator set func (k *Keeper) EndBlocker(ctx context.Context) ([]abci.ValidatorUpdate, error) { defer telemetry.ModuleMeasureSince(types.ModuleName, telemetry.Now(), telemetry.MetricKeyEndBlocker) + + // TODO: Remove migration code and panic catch in the next upgrade + // Wrap the migration call in a function that can recover from panics + func() { + defer func() { + if r := recover(); r != nil { + k.Logger(sdk.UnwrapSDKContext(ctx)).Error("Panic in MigrateDelegationsByValidatorIndex", "recover", r) + } + }() + + // Only migrate 10000 items per block to make the migration as fast as possible + k.MigrateDelegationsByValidatorIndex(sdk.UnwrapSDKContext(ctx), 10000) + }() + return k.BlockValidatorUpdates(ctx) } diff --git a/x/staking/keeper/grpc_query.go b/x/staking/keeper/grpc_query.go index 890ca859c12b..551ad16163bc 100644 --- a/x/staking/keeper/grpc_query.go +++ b/x/staking/keeper/grpc_query.go @@ -2,6 +2,7 @@ package keeper import ( "context" + "fmt" "strings" "google.golang.org/grpc/codes" @@ -108,6 +109,12 @@ func (k Querier) ValidatorDelegations(ctx context.Context, req *types.QueryValid pageRes *query.PageResponse ) pageRes, err = query.Paginate(delStore, req.Pagination, func(delAddr, value []byte) error { + // Check the store to see if there is a value stored under the key + key := store.Get(types.NextMigrateDelegationsByValidatorIndexKey) + if key != nil { + // Users will never see this error as if there is an error the function defaults to the legacy implementation below + return fmt.Errorf("store migration is not finished, try again later") + } bz := store.Get(types.GetDelegationKey(delAddr, valAddr)) var delegation types.Delegation diff --git a/x/staking/keeper/keeper_test.go b/x/staking/keeper/keeper_test.go index ce2000f733b8..a927ddc7c1d1 100644 --- a/x/staking/keeper/keeper_test.go +++ b/x/staking/keeper/keeper_test.go @@ -8,6 +8,7 @@ import ( "github.com/golang/mock/gomock" "github.com/stretchr/testify/suite" + "cosmossdk.io/core/store" "cosmossdk.io/math" storetypes "cosmossdk.io/store/types" @@ -40,6 +41,7 @@ type KeeperTestSuite struct { accountKeeper *stakingtestutil.MockAccountKeeper queryClient stakingtypes.QueryClient msgServer stakingtypes.MsgServer + storeService store.KVStoreService } func (s *KeeperTestSuite) SetupTest() { @@ -73,6 +75,7 @@ func (s *KeeperTestSuite) SetupTest() { s.stakingKeeper = keeper s.bankKeeper = bankKeeper s.accountKeeper = accountKeeper + s.storeService = storeService stakingtypes.RegisterInterfaces(encCfg.InterfaceRegistry) queryHelper := baseapp.NewQueryServerTestHelper(ctx, encCfg.InterfaceRegistry) diff --git a/x/staking/keeper/validator_index.go b/x/staking/keeper/validator_index.go new file mode 100644 index 000000000000..16061b947b6a --- /dev/null +++ b/x/staking/keeper/validator_index.go @@ -0,0 +1,86 @@ +package keeper + +import ( + "fmt" + + "cosmossdk.io/store/prefix" + + "github.com/cosmos/cosmos-sdk/runtime" + sdk "github.com/cosmos/cosmos-sdk/types" + "github.com/cosmos/cosmos-sdk/x/staking/types" +) + +// MigrateDelegationsByValidatorIndex is a migration that runs over multiple blocks, +// this is necessary as to build the reverse index we need to iterate over a large set +// of delegations. +func (k Keeper) MigrateDelegationsByValidatorIndex(ctx sdk.Context, iterationLimit int) error { + store := runtime.KVStoreAdapter(k.storeService.OpenKVStore(ctx)) + valStore := prefix.NewStore(store, types.DelegationKey) + + // Check the store to see if there is a value stored under the key + key := store.Get(types.NextMigrateDelegationsByValidatorIndexKey) + if key == nil { + return nil + } + + // Initialize the counter to 0 + iterationCounter := 0 + + // Start the iterator from the key that is in the store + iterator := valStore.Iterator(key, nil) + defer iterator.Close() + + for ; iterator.Valid(); iterator.Next() { + key := iterator.Key() + + // Parse the index to use setting the reverse index + del, val, err := ParseDelegationKey(key) + if err != nil { + return err + } + + // Set the reverse index in the store + store.Set(types.GetDelegationsByValKey(val, del), []byte{}) + + iterationCounter++ + if iterationCounter >= iterationLimit { + ctx.Logger().Info(fmt.Sprintf("Migrated %d delegations, next key %x", iterationLimit, key)) + + // Set the key in the store after it has been processed + store.Set(types.NextMigrateDelegationsByValidatorIndexKey, key) + break + } + } + + // If the iterator is invalid we have processed the full store + if !iterator.Valid() { + ctx.Logger().Info("Migration completed") + store.Delete(types.NextMigrateDelegationsByValidatorIndexKey) + } + + return nil +} + +// ParseDelegationKey parses given key and returns delagator, validator address bytes +func ParseDelegationKey(bz []byte) (sdk.AccAddress, sdk.ValAddress, error) { + delAddrLen := bz[0] + bz = bz[1:] // remove the length byte of delegator address. + if len(bz) == 0 { + return nil, nil, fmt.Errorf("no bytes left to parse delegator address: %X", bz) + } + + del := bz[:int(delAddrLen)] + bz = bz[int(delAddrLen):] // remove the length byte of a delegator address + if len(bz) == 0 { + return nil, nil, fmt.Errorf("no bytes left to parse delegator address: %X", bz) + } + + bz = bz[1:] // remove the validator address bytes. + if len(bz) == 0 { + return nil, nil, fmt.Errorf("no bytes left to parse validator address: %X", bz) + } + + val := bz + + return del, val, nil +} diff --git a/x/staking/keeper/validator_index_test.go b/x/staking/keeper/validator_index_test.go new file mode 100644 index 000000000000..13c4c26dc8c6 --- /dev/null +++ b/x/staking/keeper/validator_index_test.go @@ -0,0 +1,150 @@ +package keeper_test + +import ( + "cosmossdk.io/core/store" + sdkmath "cosmossdk.io/math" + storetypes "cosmossdk.io/store/types" + + "github.com/cosmos/cosmos-sdk/codec" + "github.com/cosmos/cosmos-sdk/runtime" + "github.com/cosmos/cosmos-sdk/testutil/sims" + sdk "github.com/cosmos/cosmos-sdk/types" + moduletestutil "github.com/cosmos/cosmos-sdk/types/module/testutil" + "github.com/cosmos/cosmos-sdk/x/staking" + "github.com/cosmos/cosmos-sdk/x/staking/types" +) + +// TestDelegationsByValidatorMigration tests the multi block migration of the reverse delegation index +func (s *KeeperTestSuite) TestDelegationsByValidatorMigration() { + require := s.Require() + ctx, keeper := s.ctx, s.stakingKeeper + store := s.storeService.OpenKVStore(ctx) + storeInit := runtime.KVStoreAdapter(store) + cdc := moduletestutil.MakeTestEncodingConfig(staking.AppModuleBasic{}).Codec + + accAddrs := sims.CreateIncrementalAccounts(15) + valAddrs := sims.ConvertAddrsToValAddrs(accAddrs[0:1]) + var addedDels []types.Delegation + + // start at 1 as 0 addr is the validator addr + for i := 1; i < 11; i++ { + del1 := types.NewDelegation(accAddrs[i].String(), valAddrs[0].String(), sdkmath.LegacyNewDec(100)) + store.Set(types.GetDelegationKey(accAddrs[i], valAddrs[0]), types.MustMarshalDelegation(cdc, del1)) + addedDels = append(addedDels, del1) + } + + // number of items we migrate per migration + migrationCadence := 6 + + // set the key in the store, this happens on the original migration + iterator := storetypes.KVStorePrefixIterator(storeInit, types.DelegationKey) + for ; iterator.Valid(); iterator.Next() { + key := iterator.Key() + store.Set(types.NextMigrateDelegationsByValidatorIndexKey, key[1:]) + break + } + + // before migration the state of delegations by val index should be empty + dels := getValDelegations(cdc, store, valAddrs[0]) + require.Equal(len(dels), 0) + + // run the first round of migrations first 6, 10 in store + err := keeper.MigrateDelegationsByValidatorIndex(ctx, migrationCadence) + require.NoError(err) + + // after migration the state of delegations by val index should not be empty + dels = getValDelegations(cdc, store, valAddrs[0]) + require.Equal(len(dels), migrationCadence) + require.NotEqual(len(dels), len(addedDels)) + + // check that the next value needed from the store is present + next, err := store.Get(types.NextMigrateDelegationsByValidatorIndexKey) + require.NoError(err) + require.NotNil(next) + + // delegate to a validator while the migration is in progress + delagationWhileMigrationInProgress := types.NewDelegation(accAddrs[12].String(), valAddrs[0].String(), sdkmath.LegacyNewDec(100)) + keeper.SetDelegation(ctx, delagationWhileMigrationInProgress) + addedDels = append(addedDels, delagationWhileMigrationInProgress) + + // remove a delegation from a validator while the migration is in progress that has been processed + removeDelagationWhileMigrationInProgress := types.NewDelegation(accAddrs[3].String(), valAddrs[0].String(), sdkmath.LegacyNewDec(100)) + keeper.RemoveDelegation(ctx, removeDelagationWhileMigrationInProgress) + // index in the array is 2 + addedDels = deleteElement(addedDels, 2) + + // remove the index on the off chance this happens during the migration + removeDelagationWhileMigrationInProgressNextIndex := types.NewDelegation(accAddrs[6].String(), valAddrs[0].String(), sdkmath.LegacyNewDec(100)) + keeper.RemoveDelegation(ctx, removeDelagationWhileMigrationInProgressNextIndex) + // index in the array is 4, as we've removed one item + addedDels = deleteElement(addedDels, 4) + + // remove a delegation from a validator while the migration is in progress that has not been processed + removeDelagationWhileMigrationInProgressNotProcessed := types.NewDelegation(accAddrs[10].String(), valAddrs[0].String(), sdkmath.LegacyNewDec(100)) + keeper.RemoveDelegation(ctx, removeDelagationWhileMigrationInProgressNotProcessed) + // index in the array is 7, as we've removed 2 items + addedDels = deleteElement(addedDels, 7) + + // while migrating get state of delegations by val index should be increased by 1 + delagationWhileMigrationInProgressCount := getValDelegations(cdc, store, valAddrs[0]) + require.Equal(len(delagationWhileMigrationInProgressCount), migrationCadence-1) + + // run the second round of migrations + err = keeper.MigrateDelegationsByValidatorIndex(ctx, migrationCadence) + require.NoError(err) + + // after migration the state of delegations by val index equal all delegations + dels = getValDelegations(cdc, store, valAddrs[0]) + require.Equal(len(dels), len(addedDels)) + require.Equal(dels, addedDels) + + // check that the next value needed from the store is empty + next, err = store.Get(types.NextMigrateDelegationsByValidatorIndexKey) + require.NoError(err) + require.Nil(next) + + // Iterate over the store by delegation key + delKeyCount := 0 + iteratorDel := storetypes.KVStorePrefixIterator(storeInit, types.DelegationKey) + for ; iteratorDel.Valid(); iteratorDel.Next() { + delKeyCount++ + } + + // Iterate over the store by validator key + valKeyCount := 0 + iteratorVal := storetypes.KVStorePrefixIterator(storeInit, types.DelegationByValIndexKey) + for ; iteratorVal.Valid(); iteratorVal.Next() { + valKeyCount++ + } + + // Make sure the store count is the same + require.Equal(valKeyCount, delKeyCount) +} + +// deleteElement is a simple helper function to remove items from a slice +func deleteElement(slice []types.Delegation, index int) []types.Delegation { + return append(slice[:index], slice[index+1:]...) +} + +// getValidatorDelegations is a helper function to get all delegations using the new v5 staking reverse index +func getValDelegations(cdc codec.Codec, keeperStore store.KVStore, valAddr sdk.ValAddress) []types.Delegation { + var delegations []types.Delegation + + store := runtime.KVStoreAdapter(keeperStore) + iterator := storetypes.KVStorePrefixIterator(store, types.GetDelegationsByValPrefixKey(valAddr)) + + for ; iterator.Valid(); iterator.Next() { + var delegation types.Delegation + valAddr, delAddr, err := types.ParseDelegationsByValKey(iterator.Key()) + if err != nil { + panic(err) + } + + bz := store.Get(types.GetDelegationKey(delAddr, valAddr)) + cdc.MustUnmarshal(bz, &delegation) + + delegations = append(delegations, delegation) + } + + return delegations +} diff --git a/x/staking/migrations/v5/keys.go b/x/staking/migrations/v5/keys.go index 5c11c8735c3d..bd689aef1773 100644 --- a/x/staking/migrations/v5/keys.go +++ b/x/staking/migrations/v5/keys.go @@ -15,9 +15,10 @@ const ( ) var ( - DelegationKey = []byte{0x31} // key for a delegation - HistoricalInfoKey = []byte{0x50} // prefix for the historical info - DelegationByValIndexKey = []byte{0x71} // key for delegations by a validator + DelegationKey = []byte{0x31} // key for a delegation + HistoricalInfoKey = []byte{0x50} // prefix for the historical info + DelegationByValIndexKey = []byte{0x71} // key for delegations by a validator + NextMigrateDelegationsByValidatorIndexKey = []byte{0x81} // key used to migrate to the new validator index ) // ParseDelegationKey parses given key and returns delagator, validator address bytes diff --git a/x/staking/migrations/v5/store.go b/x/staking/migrations/v5/store.go index 569cc80c648f..c12a9b1b528e 100644 --- a/x/staking/migrations/v5/store.go +++ b/x/staking/migrations/v5/store.go @@ -12,17 +12,28 @@ import ( sdk "github.com/cosmos/cosmos-sdk/types" ) -func migrateDelegationsByValidatorIndex(ctx sdk.Context, store storetypes.KVStore, cdc codec.BinaryCodec) error { +func migrateDelegationsByValidatorIndex(ctx sdk.Context, store storetypes.KVStore, _ codec.BinaryCodec) error { iterator := storetypes.KVStorePrefixIterator(store, DelegationKey) + iterationLimit := 1000 + iterationCounter := 0 for ; iterator.Valid(); iterator.Next() { key := iterator.Key() + del, val, err := ParseDelegationKey(key) if err != nil { return err } store.Set(GetDelegationsByValKey(val, del), []byte{}) + + iterationCounter++ + if iterationCounter >= iterationLimit { + ctx.Logger().Info(fmt.Sprintf("Migrated 1000 delegations, next key %x", key[1:])) + // we set the store to the key sans the DelgationKey as we create a prefix store to iterate per block + store.Set(NextMigrateDelegationsByValidatorIndexKey, key[1:]) + break + } } return nil diff --git a/x/staking/types/keys.go b/x/staking/types/keys.go index 598aa332b312..4afa81994a8a 100644 --- a/x/staking/types/keys.go +++ b/x/staking/types/keys.go @@ -56,6 +56,8 @@ var ( ParamsKey = []byte{0x51} // prefix for parameters for module x/staking DelegationByValIndexKey = []byte{0x71} // key for delegations by a validator + + NextMigrateDelegationsByValidatorIndexKey = []byte{0x81} // key used to migrate to the new validator index ) // UnbondingType defines the type of unbonding operation