Skip to content

Commit

Permalink
feat!: optimize pending packets storage on consumer + migration (#1037)
Browse files Browse the repository at this point in the history
* wip

* tests

* tests

* update genesis tests

* comments

* migration and changelog

* migration test

* lints

* merge fixes

* clean

* Update ccv.pb.go

* add to ADR

* address some PR comments

* comment

* Update ccv.pb.go

* lint

* Update x/ccv/consumer/keeper/keeper.go

Co-authored-by: Simon Noetzlin <[email protected]>

* byte wise

---------

Co-authored-by: Simon Noetzlin <[email protected]>
  • Loading branch information
shaspitz and sainoe authored Jul 13, 2023
1 parent fd76f45 commit abf24c7
Show file tree
Hide file tree
Showing 18 changed files with 465 additions and 159 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

Add an entry to the unreleased section whenever merging a PR to main that is not targeted at a specific release. These entries will eventually be included in a release.

* (feat!) optimize pending packets storage on consumer, with migration! [#1037](https://github.com/cosmos/interchain-security/pull/1037)

## v3.1.0

Date July 11th, 2023
Expand Down
16 changes: 16 additions & 0 deletions docs/docs/adrs/adr-008-throttle-retries.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ title: Throttle with retries
## Changelog

* 6/9/23: Initial draft
* 6/22/23: added note on consumer pending packets storage optimization

## Status

Expand Down Expand Up @@ -46,6 +47,21 @@ With the behavior described, we maintain very similar behavior to the current th

In the normal case, when no or a few slash packets are being sent, the VSCMaturedPackets will not be delayed, and hence unbonding will not be delayed.

### Consumer pending packets storage optimization

In addition to the mentioned consumer changes above. An optimization will need to be made to the consumer's pending packets storage to properly implement the feature from this ADR.

The consumer ccv module previously queued "pending packets" to be sent on each endblocker in [SendPackets](https://github.com/cosmos/interchain-security/blob/3bc4e7135066d848aac60b0787364c07157fd36d/x/ccv/consumer/keeper/relay.go#L178). These packets are queued in state with a protobuf list of `ConsumerPacketData`. For a single append operation, the entire list is deserialized, then a packet is appended to that list, and the list is serialized again. See older version of [AppendPendingPacket](https://github.com/cosmos/interchain-security/blob/05c2dae7c6372b1252b9e97215d07c6aa7618f33/x/ccv/consumer/keeper/keeper.go#L606). That is, a single append operation has O(N) complexity, where N is the size of the list.

This poor append performance isn't a problem when the pending packets list is small. But with this ADR being implemented, the pending packets list could potentially grow to the order of thousands of entries, in the scenario that a slash packet is bouncing.

We can improve the append time for this queue by converting it from a protobuf-esq list, to a queue implemented with sdk-esq code. The idea is to persist an uint64 index that will be incremented each time you queue up a packet. You can think of this as storing the tail of the queue. Then, packet data will be keyed by that index, making the data naturally ordered byte-wise for sdk's iterator. The index will also be stored in the packet data value bytes, so that the index can later be used to delete certain packets from the queue.

Two things are achieved with this approach:

* More efficient packet append/enqueue times
* The ability to delete select packets from the queue (previously all packets were deleted at once)

### Provider changes

The main change needed for the provider is the removal of queuing logic for slash and vsc matured packets upon being received.
Expand Down
5 changes: 4 additions & 1 deletion proto/interchain_security/ccv/v1/ccv.proto
Original file line number Diff line number Diff line change
Expand Up @@ -56,17 +56,20 @@ message SlashPacketData {
// unbonding operations.
message MaturedUnbondingOps { repeated uint64 ids = 1; }

// ConsumerPacketData contains a consumer packet data and a type tag
// ConsumerPacketData contains a consumer packet data, type tag, and index for storage.
message ConsumerPacketData {
ConsumerPacketDataType type = 1;

oneof data {
SlashPacketData slashPacketData = 2;
VSCMaturedPacketData vscMaturedPacketData = 3;
}
uint64 idx = 4;
}


// ConsumerPacketDataList is a list of consumer packet data packets.
// NOTE: It is only used for exporting / importing state in InitGenesis and ExportGenesis.
message ConsumerPacketDataList {
repeated ConsumerPacketData list = 1 [ (gogoproto.nullable) = false ];
}
Expand Down
5 changes: 2 additions & 3 deletions tests/integration/expired_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func (s *CCVTestSuite) TestConsumerPacketSendExpiredClient() {
// check that the packets were added to the list of pending data packets
consumerPackets := consumerKeeper.GetPendingPackets(s.consumerCtx())
s.Require().NotEmpty(consumerPackets)
s.Require().Equal(2, len(consumerPackets.GetList()), "unexpected number of pending data packets")
s.Require().Len(consumerPackets, 2, "unexpected number of pending data packets")

// try to send slash packet for downtime infraction
addr := ed25519.GenPrivKey().PubKey().Address()
Expand All @@ -139,7 +139,7 @@ func (s *CCVTestSuite) TestConsumerPacketSendExpiredClient() {
// check that the packets were added to the list of pending data packets
consumerPackets = consumerKeeper.GetPendingPackets(s.consumerCtx())
s.Require().NotEmpty(consumerPackets)
s.Require().Equal(4, len(consumerPackets.GetList()), "unexpected number of pending data packets")
s.Require().Len(consumerPackets, 4, "unexpected number of pending data packets")

// upgrade expired client to the consumer
upgradeExpiredClient(s, Provider)
Expand All @@ -150,7 +150,6 @@ func (s *CCVTestSuite) TestConsumerPacketSendExpiredClient() {
// check that the list of pending data packets is emptied
consumerPackets = consumerKeeper.GetPendingPackets(s.consumerCtx())
s.Require().Empty(consumerPackets)
s.Require().Equal(0, len(consumerPackets.GetList()), "unexpected number of pending data packets")

// relay all packet from consumer to provider
relayAllCommittedPackets(s, s.consumerChain, s.path, ccv.ConsumerPortID, s.path.EndpointA.ChannelID, 4)
Expand Down
24 changes: 12 additions & 12 deletions tests/integration/slashing.go
Original file line number Diff line number Diff line change
Expand Up @@ -486,15 +486,15 @@ func (suite *CCVTestSuite) TestValidatorDowntime() {

// check that slash packet is queued
pendingPackets := consumerKeeper.GetPendingPackets(ctx)
suite.Require().NotEmpty(pendingPackets.List, "pending packets empty")
suite.Require().Len(pendingPackets.List, 1, "pending packets len should be 1 is %d", len(pendingPackets.List))
suite.Require().NotEmpty(pendingPackets, "pending packets empty")
suite.Require().Len(pendingPackets, 1, "pending packets len should be 1 is %d", len(pendingPackets))

// clear queue, commit packets
suite.consumerApp.GetConsumerKeeper().SendPackets(ctx)

// check queue was cleared
pendingPackets = suite.consumerApp.GetConsumerKeeper().GetPendingPackets(ctx)
suite.Require().Empty(pendingPackets.List, "pending packets NOT empty")
suite.Require().Empty(pendingPackets, "pending packets NOT empty")

// verify that the slash packet was sent
gotCommit := consumerIBCKeeper.ChannelKeeper.GetPacketCommitment(ctx, ccv.ConsumerPortID, channelID, seq)
Expand Down Expand Up @@ -573,15 +573,15 @@ func (suite *CCVTestSuite) TestValidatorDoubleSigning() {

// check slash packet is queued
pendingPackets := suite.consumerApp.GetConsumerKeeper().GetPendingPackets(ctx)
suite.Require().NotEmpty(pendingPackets.List, "pending packets empty")
suite.Require().Len(pendingPackets.List, 1, "pending packets len should be 1 is %d", len(pendingPackets.List))
suite.Require().NotEmpty(pendingPackets, "pending packets empty")
suite.Require().Len(pendingPackets, 1, "pending packets len should be 1 is %d", len(pendingPackets))

// clear queue, commit packets
suite.consumerApp.GetConsumerKeeper().SendPackets(ctx)

// check queue was cleared
pendingPackets = suite.consumerApp.GetConsumerKeeper().GetPendingPackets(ctx)
suite.Require().Empty(pendingPackets.List, "pending packets NOT empty")
suite.Require().Empty(pendingPackets, "pending packets NOT empty")

// check slash packet is sent
gotCommit := suite.consumerApp.GetIBCKeeper().ChannelKeeper.GetPacketCommitment(ctx, ccv.ConsumerPortID, channelID, seq)
Expand Down Expand Up @@ -636,7 +636,7 @@ func (suite *CCVTestSuite) TestQueueAndSendSlashPacket() {
// the downtime slash request duplicates
dataPackets := consumerKeeper.GetPendingPackets(ctx)
suite.Require().NotEmpty(dataPackets)
suite.Require().Len(dataPackets.GetList(), 12)
suite.Require().Len(dataPackets, 12)

// save consumer next sequence
seq, _ := consumerIBCKeeper.ChannelKeeper.GetNextSequenceSend(ctx, ccv.ConsumerPortID, channelID)
Expand All @@ -663,7 +663,7 @@ func (suite *CCVTestSuite) TestQueueAndSendSlashPacket() {
// check that pending data packets got cleared
dataPackets = consumerKeeper.GetPendingPackets(ctx)
suite.Require().Empty(dataPackets)
suite.Require().Len(dataPackets.GetList(), 0)
suite.Require().Len(dataPackets, 0)
}

// TestCISBeforeCCVEstablished tests that the consumer chain doesn't panic or
Expand All @@ -674,14 +674,14 @@ func (suite *CCVTestSuite) TestCISBeforeCCVEstablished() {

// Check pending packets is empty
pendingPackets := consumerKeeper.GetPendingPackets(suite.consumerCtx())
suite.Require().Len(pendingPackets.List, 0)
suite.Require().Len(pendingPackets, 0)

consumerKeeper.SlashWithInfractionReason(suite.consumerCtx(), []byte{0x01, 0x02, 0x3},
66, 4324, sdk.MustNewDecFromStr("0.05"), stakingtypes.Infraction_INFRACTION_DOWNTIME)

// Check slash packet was queued
pendingPackets = consumerKeeper.GetPendingPackets(suite.consumerCtx())
suite.Require().Len(pendingPackets.List, 1)
suite.Require().Len(pendingPackets, 1)

// Pass 5 blocks, confirming the consumer doesn't panic
for i := 0; i < 5; i++ {
Expand All @@ -690,7 +690,7 @@ func (suite *CCVTestSuite) TestCISBeforeCCVEstablished() {

// Check packet is still queued
pendingPackets = consumerKeeper.GetPendingPackets(suite.consumerCtx())
suite.Require().Len(pendingPackets.List, 1)
suite.Require().Len(pendingPackets, 1)

// establish ccv channel
suite.SetupCCVChannel(suite.path)
Expand All @@ -699,5 +699,5 @@ func (suite *CCVTestSuite) TestCISBeforeCCVEstablished() {
// Pass one more block, and confirm the packet is sent now that ccv channel is established
suite.consumerChain.NextBlock()
pendingPackets = consumerKeeper.GetPendingPackets(suite.consumerCtx())
suite.Require().Len(pendingPackets.List, 0)
suite.Require().Len(pendingPackets, 0)
}
16 changes: 12 additions & 4 deletions x/ccv/consumer/keeper/genesis.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,12 @@ func (k Keeper) InitGenesis(ctx sdk.Context, state *consumertypes.GenesisState)
k.SetLastTransmissionBlockHeight(ctx, state.LastTransmissionBlockHeight)
}

// set pending consumer pending packets
// Set pending consumer packets, using the depreciated ConsumerPacketDataList type
// that exists for genesis.
// note that the list includes pending mature VSC packet only if the handshake is completed
k.AppendPendingPacket(ctx, state.PendingConsumerPackets.List...)
for _, packet := range state.PendingConsumerPackets.List {
k.AppendPendingPacket(ctx, packet.Type, packet.Data)
}

// set height to valset update id mapping
for _, h2v := range state.HeightToValsetUpdateId {
Expand Down Expand Up @@ -122,6 +125,11 @@ func (k Keeper) ExportGenesis(ctx sdk.Context) (genesis *consumertypes.GenesisSt
// export the current validator set
valset := k.MustGetCurrentValidatorsAsABCIUpdates(ctx)

// export pending packets using the depreciated ConsumerPacketDataList type
pendingPackets := k.GetPendingPackets(ctx)
pendingPacketsDepreciated := ccv.ConsumerPacketDataList{}
pendingPacketsDepreciated.List = append(pendingPacketsDepreciated.List, pendingPackets...)

// export all the states created after a provider channel got established
if channelID, ok := k.GetProviderChannel(ctx); ok {
clientID, found := k.GetProviderClientID(ctx)
Expand All @@ -136,7 +144,7 @@ func (k Keeper) ExportGenesis(ctx sdk.Context) (genesis *consumertypes.GenesisSt
k.GetAllPacketMaturityTimes(ctx),
valset,
k.GetAllHeightToValsetUpdateIDs(ctx),
k.GetPendingPackets(ctx),
pendingPacketsDepreciated,
k.GetAllOutstandingDowntimes(ctx),
k.GetLastTransmissionBlockHeight(ctx),
params,
Expand All @@ -156,7 +164,7 @@ func (k Keeper) ExportGenesis(ctx sdk.Context) (genesis *consumertypes.GenesisSt
nil,
valset,
k.GetAllHeightToValsetUpdateIDs(ctx),
k.GetPendingPackets(ctx),
pendingPacketsDepreciated,
nil,
consumertypes.LastTransmissionBlockHeight{},
params,
Expand Down
27 changes: 23 additions & 4 deletions x/ccv/consumer/keeper/genesis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,12 @@ func TestInitGenesis(t *testing.T) {
func(ctx sdk.Context, ck consumerkeeper.Keeper, gs *consumertypes.GenesisState) {
assertConsumerPortIsBound(t, ctx, &ck)

require.Equal(t, pendingDataPackets, ck.GetPendingPackets(ctx))
obtainedPendingPackets := ck.GetPendingPackets(ctx)
for idx, expectedPacketData := range pendingDataPackets.List {
require.Equal(t, expectedPacketData.Type, obtainedPendingPackets[idx].Type)
require.Equal(t, expectedPacketData.Data, obtainedPendingPackets[idx].Data)
}

assertHeightValsetUpdateIDs(t, ctx, &ck, defaultHeightValsetUpdateIDs)
assertProviderClientID(t, ctx, &ck, provClientID)
require.Equal(t, validator.Address.Bytes(), ck.GetAllCCValidator(ctx)[0].Address)
Expand Down Expand Up @@ -186,7 +191,12 @@ func TestInitGenesis(t *testing.T) {
require.Equal(t, provChannelID, gotChannelID)

require.True(t, ck.PacketMaturityTimeExists(ctx, matPackets[0].VscId, matPackets[0].MaturityTime))
require.Equal(t, pendingDataPackets, ck.GetPendingPackets(ctx))

obtainedPendingPackets := ck.GetPendingPackets(ctx)
for idx, expectedPacketData := range pendingDataPackets.List {
require.Equal(t, expectedPacketData.Type, obtainedPendingPackets[idx].Type)
require.Equal(t, expectedPacketData.Data, obtainedPendingPackets[idx].Data)
}

require.Equal(t, gs.OutstandingDowntimeSlashing, ck.GetAllOutstandingDowntimes(ctx))

Expand Down Expand Up @@ -252,12 +262,16 @@ func TestExportGenesis(t *testing.T) {
Data: &ccv.ConsumerPacketData_SlashPacketData{
SlashPacketData: ccv.NewSlashPacketData(abciValidator, vscID, stakingtypes.Infraction_INFRACTION_DOWNTIME),
},
Idx: 0,
},
{
Type: ccv.VscMaturedPacket,
Data: &ccv.ConsumerPacketData_VscMaturedPacketData{
VscMaturedPacketData: ccv.NewVSCMaturedPacketData(vscID),
},
// This idx is a part of the expected genesis state.
// If the keeper is correctly storing consumer packet data, indexes should be populated.
Idx: 1,
},
},
}
Expand Down Expand Up @@ -291,7 +305,10 @@ func TestExportGenesis(t *testing.T) {
ck.SetCCValidator(ctx, cVal)
ck.SetParams(ctx, params)

ck.AppendPendingPacket(ctx, consPackets.List...)
for _, packet := range consPackets.List {
ck.AppendPendingPacket(ctx, packet.Type, packet.Data)
}

ck.SetHeightValsetUpdateID(ctx, defaultHeightValsetUpdateIDs[0].Height, defaultHeightValsetUpdateIDs[0].ValsetUpdateId)
},
consumertypes.NewRestartGenesisState(
Expand Down Expand Up @@ -321,7 +338,9 @@ func TestExportGenesis(t *testing.T) {
ck.SetHeightValsetUpdateID(ctx, updatedHeightValsetUpdateIDs[0].Height, updatedHeightValsetUpdateIDs[0].ValsetUpdateId)
ck.SetHeightValsetUpdateID(ctx, updatedHeightValsetUpdateIDs[1].Height, updatedHeightValsetUpdateIDs[1].ValsetUpdateId)

ck.AppendPendingPacket(ctx, consPackets.List...)
for _, packet := range consPackets.List {
ck.AppendPendingPacket(ctx, packet.Type, packet.Data)
}

// populate the required states for an established CCV channel
ck.SetPacketMaturityTime(ctx, matPackets[0].VscId, matPackets[0].MaturityTime)
Expand Down
88 changes: 59 additions & 29 deletions x/ccv/consumer/keeper/keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -593,48 +593,78 @@ func (k Keeper) GetAllValidators(ctx sdk.Context) (validators []stakingtypes.Val
return validators
}

// SetPendingPackets sets the pending CCV packets
func (k Keeper) SetPendingPackets(ctx sdk.Context, packets ccv.ConsumerPacketDataList) {
// getAndIncrementPendingPacketsIdx returns the current pending packets index and increments it.
// This index is used for implementing a FIFO queue of pending packets in the KV store.
func (k Keeper) getAndIncrementPendingPacketsIdx(ctx sdk.Context) (toReturn uint64) {
store := ctx.KVStore(k.storeKey)
bz, err := packets.Marshal()
if err != nil {
// This should never happen
panic(fmt.Errorf("failed to marshal ConsumerPacketDataList: %w", err))
bz := store.Get(types.PendingPacketsIndexKey())
if bz != nil {
toReturn = sdk.BigEndianToUint64(bz)
}
store.Set(types.PendingDataPacketsKey(), bz)
toStore := toReturn + 1
store.Set(types.PendingPacketsIndexKey(), sdk.Uint64ToBigEndian(toStore))
return toReturn
}

// GetPendingPackets returns the pending CCV packets from the store
func (k Keeper) GetPendingPackets(ctx sdk.Context) ccv.ConsumerPacketDataList {
var packets ccv.ConsumerPacketDataList

// GetPendingPackets returns ALL the pending CCV packets from the store
func (k Keeper) GetPendingPackets(ctx sdk.Context) []ccv.ConsumerPacketData {
var packets []ccv.ConsumerPacketData
store := ctx.KVStore(k.storeKey)
bz := store.Get(types.PendingDataPacketsKey())
if bz == nil {
return packets
// Note: PendingDataPacketsBytePrefix is the correct prefix, NOT PendingDataPacketsByteKey.
// See consistency with PendingDataPacketsKey().
iterator := sdk.KVStorePrefixIterator(store, []byte{types.PendingDataPacketsBytePrefix})
defer iterator.Close()
for ; iterator.Valid(); iterator.Next() {
var packet ccv.ConsumerPacketData
bz := iterator.Value()
err := packet.Unmarshal(bz)
if err != nil {
// An error here would indicate something is very wrong,
panic(fmt.Errorf("failed to unmarshal pending data packet: %w", err))
}
packets = append(packets, packet)
}
return packets
}

err := packets.Unmarshal(bz)
if err != nil {
// An error here would indicate something is very wrong,
// the PendingPackets are assumed to be correctly serialized in SetPendingPackets.
panic(fmt.Errorf("failed to unmarshal pending data packets: %w", err))
// DeletePendingDataPackets deletes pending data packets with given indexes
func (k Keeper) DeletePendingDataPackets(ctx sdk.Context, idxs ...uint64) {
store := ctx.KVStore(k.storeKey)
for _, idx := range idxs {
store.Delete(types.PendingDataPacketsKey(idx))
}

return packets
}

// DeletePendingDataPackets clears the pending data packets in store
func (k Keeper) DeletePendingDataPackets(ctx sdk.Context) {
func (k Keeper) DeleteAllPendingDataPackets(ctx sdk.Context) {
store := ctx.KVStore(k.storeKey)
store.Delete(types.PendingDataPacketsKey())
// Note: PendingDataPacketsBytePrefix is the correct prefix, NOT PendingDataPacketsByteKey.
// See consistency with PendingDataPacketsKey().
iterator := sdk.KVStorePrefixIterator(store, []byte{types.PendingDataPacketsBytePrefix})
keysToDel := [][]byte{}
defer iterator.Close()
for ; iterator.Valid(); iterator.Next() {
keysToDel = append(keysToDel, iterator.Key())
}
for _, key := range keysToDel {
store.Delete(key)
}
}

// AppendPendingDataPacket appends the given data packet to the pending data packets in store
func (k Keeper) AppendPendingPacket(ctx sdk.Context, packet ...ccv.ConsumerPacketData) {
pending := k.GetPendingPackets(ctx)
list := append(pending.GetList(), packet...)
k.SetPendingPackets(ctx, ccv.ConsumerPacketDataList{List: list})
// AppendPendingPacket enqueues the given data packet to the end of the pending data packets queue
func (k Keeper) AppendPendingPacket(ctx sdk.Context, packetType ccv.ConsumerPacketDataType, data ccv.ExportedIsConsumerPacketData_Data) {
cpd := ccv.NewConsumerPacketData(
packetType,
data,
k.getAndIncrementPendingPacketsIdx(ctx),
)
key := types.PendingDataPacketsKey(cpd.Idx)
store := ctx.KVStore(k.storeKey)
bz, err := cpd.Marshal()
if err != nil {
// This should never happen
panic(fmt.Errorf("failed to marshal ConsumerPacketData: %w", err))
}
store.Set(key, bz)
}

func (k Keeper) MarkAsPrevStandaloneChain(ctx sdk.Context) {
Expand Down
Loading

0 comments on commit abf24c7

Please sign in to comment.