Fix flush race #1363

Merged 4 commits on Jan 30, 2024
Changes from all commits
4 changes: 2 additions & 2 deletions go.mod
@@ -8,6 +8,7 @@ require (
cosmossdk.io/math v1.2.0
cosmossdk.io/store v1.0.2
cosmossdk.io/x/feegrant v0.1.0
cosmossdk.io/x/tx v0.13.0
cosmossdk.io/x/upgrade v0.1.0
github.com/avast/retry-go/v4 v4.5.1
github.com/btcsuite/btcd v0.23.5-0.20231215221805-96c9fd8078fd
@@ -22,7 +23,6 @@ require (
github.com/cosmos/ics23/go v0.10.0
github.com/ethereum/go-ethereum v1.13.5
github.com/gofrs/flock v0.8.1
github.com/google/go-cmp v0.6.0
github.com/google/go-github/v43 v43.0.0
github.com/grpc-ecosystem/grpc-gateway v1.16.0
github.com/jsternberg/zap-logfmt v1.3.0
@@ -51,7 +51,6 @@ require (
cosmossdk.io/core v0.11.0 // indirect
cosmossdk.io/depinject v1.0.0-alpha.4 // indirect
cosmossdk.io/log v1.3.0 // indirect
cosmossdk.io/x/tx v0.13.0 // indirect
filippo.io/edwards25519 v1.0.0 // indirect
github.com/99designs/go-keychain v0.0.0-20191008050251-8e49817e8af4 // indirect
github.com/99designs/keyring v1.2.1 // indirect
@@ -110,6 +109,7 @@ require (
github.com/golang/protobuf v1.5.3 // indirect
github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb // indirect
github.com/google/btree v1.1.2 // indirect
github.com/google/go-cmp v0.6.0 // indirect
github.com/google/go-querystring v1.1.0 // indirect
github.com/google/orderedcode v0.0.1 // indirect
github.com/google/s2a-go v0.1.7 // indirect
14 changes: 13 additions & 1 deletion relayer/processor/path_end_runtime.go
@@ -154,6 +154,8 @@ func (pathEnd *pathEndRuntime) mergeMessageCache(
channelHandshakeMessages := make(ChannelMessagesCache)
clientICQMessages := make(ClientICQMessagesCache)

messageCache.PacketState.Prune(100) // Only keep most recent 100 packet states per channel

for ch, pmc := range messageCache.PacketFlow {
if pathEnd.ShouldRelayChannel(ChainChannelKey{
ChainID: pathEnd.info.ChainID,
@@ -194,6 +196,12 @@ }
}

packetMessages[ch] = newPmc

for eventType, pCache := range newPmc {
for seq := range pCache {
pathEnd.messageCache.PacketState.UpdateState(ch, seq, eventType)
}
}
}
}

@@ -610,9 +618,13 @@ func (pathEnd *pathEndRuntime) removePacketRetention(
case chantypes.EventTypeRecvPacket:
toDelete[eventType] = []uint64{sequence}
toDeleteCounterparty[chantypes.EventTypeSendPacket] = []uint64{sequence}
case chantypes.EventTypeAcknowledgePacket, chantypes.EventTypeTimeoutPacket:
case chantypes.EventTypeAcknowledgePacket:
toDelete[eventType] = []uint64{sequence}
toDeleteCounterparty[chantypes.EventTypeRecvPacket] = []uint64{sequence}
toDeleteCounterparty[chantypes.EventTypeWriteAck] = []uint64{sequence}
toDelete[chantypes.EventTypeSendPacket] = []uint64{sequence}
case chantypes.EventTypeTimeoutPacket:
toDelete[eventType] = []uint64{sequence}
toDelete[chantypes.EventTypeSendPacket] = []uint64{sequence}
}
// delete in progress send for this specific message
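With the split above, an acknowledgement now clears the counterparty's recv and write-ack retention as well as the local send retention, while a timeout clears only the local send, so neither side can re-queue a completed packet during a later flush. Below is a minimal, hypothetical sketch of how maps shaped like toDelete and toDeleteCounterparty are consumed, using the variadic PacketMessagesCache.DeleteMessages whose signature appears in types.go further down; the package main wrapper and the seeded sequence are illustrative assumptions, not code from this PR.

package main

import (
	"fmt"

	chantypes "github.com/cosmos/ibc-go/v8/modules/core/04-channel/types"
	"github.com/cosmos/relayer/v2/relayer/processor"
	"github.com/cosmos/relayer/v2/relayer/provider"
)

func main() {
	// Packet 5 has been sent and acknowledged; both stages are still retained in the cache.
	pmc := processor.PacketMessagesCache{
		chantypes.EventTypeSendPacket:        {5: provider.PacketInfo{}},
		chantypes.EventTypeAcknowledgePacket: {5: provider.PacketInfo{}},
	}

	// Mirrors the EventTypeAcknowledgePacket case above: drop the ack and the original
	// send so a later flush cannot pick sequence 5 up again.
	pmc.DeleteMessages(map[string][]uint64{
		chantypes.EventTypeAcknowledgePacket: {5},
		chantypes.EventTypeSendPacket:        {5},
	})

	fmt.Println(len(pmc[chantypes.EventTypeSendPacket])) // expected: 0
}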
30 changes: 30 additions & 0 deletions relayer/processor/path_processor.go
@@ -352,6 +352,36 @@ func (pp *PathProcessor) processAvailableSignals(ctx context.Context, cancel fun
case <-pp.retryProcess:
// No new data to merge in, just retry handling.
case <-pp.flushTimer.C:
for len(pp.pathEnd1.incomingCacheData) > 0 {
d := <-pp.pathEnd1.incomingCacheData
// we have new data from ChainProcessor for pathEnd1
pp.pathEnd1.mergeCacheData(
ctx,
cancel,
d,
pp.pathEnd2.info.ChainID,
pp.pathEnd2.inSync,
pp.messageLifecycle,
pp.pathEnd2,
pp.memoLimit,
pp.maxReceiverSize,
)
}
for len(pp.pathEnd2.incomingCacheData) > 0 {
d := <-pp.pathEnd2.incomingCacheData
// we have new data from ChainProcessor for pathEnd2
pp.pathEnd2.mergeCacheData(
ctx,
cancel,
d,
pp.pathEnd1.info.ChainID,
pp.pathEnd1.inSync,
pp.messageLifecycle,
pp.pathEnd1,
pp.memoLimit,
pp.maxReceiverSize,
)
}
// Periodic flush to clear out any old packets
pp.handleFlush(ctx)
}
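This flush-timer case is the core of the fix: before handleFlush runs, both path ends drain whatever cache data their ChainProcessors have already queued, so the flush works from current packet state instead of racing against updates still buffered in the incomingCacheData channels. Below is a standalone sketch of that drain-before-act pattern; the names (updates, apply, flush) are illustrative and not relayer APIs.

package main

import "fmt"

func main() {
	updates := make(chan int, 8)
	updates <- 1
	updates <- 2

	state := 0
	apply := func(u int) { state += u }
	flush := func() { fmt.Println("flushing with state", state) }

	// Drain everything already buffered without blocking for new sends:
	// len reports only the items currently queued, so the loop exits once the channel is empty.
	for len(updates) > 0 {
		apply(<-updates)
	}

	flush() // acts on state that includes every update delivered before the flush
}

This pattern is only safe with a single receiver, which holds here because the path processor's run loop is the sole consumer of each incomingCacheData channel; with competing receivers, the len check and the subsequent receive could race.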
11 changes: 10 additions & 1 deletion relayer/processor/path_processor_internal.go
@@ -1261,6 +1261,10 @@ func (pp *PathProcessor) queuePendingRecvAndAcks(
var skipped *skippedPackets

for i, seq := range unrecv {
if state, ok := dst.messageCache.PacketState.State(k, seq); ok && stateValue(state) >= stateValue(chantypes.EventTypeRecvPacket) {
continue // already recv'd by path processor
}

srcMu.Lock()
if srcCache.IsCached(chantypes.EventTypeSendPacket, k, seq) {
continue // already cached
@@ -1340,8 +1344,13 @@ SeqLoop:
}

for i, seq := range unacked {
dstMu.Lock()
ck := k.Counterparty()

if state, ok := dst.messageCache.PacketState.State(ck, seq); ok && stateValue(state) >= stateValue(chantypes.EventTypeAcknowledgePacket) {
continue // already acked by path processor
}

dstMu.Lock()
if dstCache.IsCached(chantypes.EventTypeRecvPacket, ck, seq) &&
dstCache.IsCached(chantypes.EventTypeWriteAck, ck, seq) {
continue // already cached
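Both new guards rely on the same idea: packet lifecycle stages are ranked (see the stateValue function added in types.go below), and a sequence is skipped when the state already recorded for it has reached or passed the stage about to be queued. Below is a small standalone illustration of that ordering check; the rank map and the alreadyHandled helper are assumptions made for the sketch, not relayer code.

package main

import (
	"fmt"

	chantypes "github.com/cosmos/ibc-go/v8/modules/core/04-channel/types"
)

// rank mirrors the ordering used by stateValue: later lifecycle stages rank higher,
// and acknowledgements and timeouts are both terminal.
var rank = map[string]int{
	chantypes.EventTypeSendPacket:        1,
	chantypes.EventTypeRecvPacket:        2,
	chantypes.EventTypeWriteAck:          3,
	chantypes.EventTypeAcknowledgePacket: 4,
	chantypes.EventTypeTimeoutPacket:     4,
}

// alreadyHandled reports whether work targeting the given stage can be skipped.
func alreadyHandled(recorded, target string) bool {
	return rank[recorded] >= rank[target]
}

func main() {
	fmt.Println(alreadyHandled(chantypes.EventTypeRecvPacket, chantypes.EventTypeRecvPacket)) // true: skip queuing another recv
	fmt.Println(alreadyHandled(chantypes.EventTypeSendPacket, chantypes.EventTypeRecvPacket)) // false: the recv is still outstanding
}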
78 changes: 78 additions & 0 deletions relayer/processor/types.go
@@ -91,6 +91,7 @@ func (t *ChannelCloseLifecycle) messageLifecycler() {}
// which will retain relevant messages for each PathProcessor.
type IBCMessagesCache struct {
PacketFlow ChannelPacketMessagesCache
PacketState ChannelPacketStateCache
ConnectionHandshake ConnectionMessagesCache
ChannelHandshake ChannelMessagesCache
ClientICQ ClientICQMessagesCache
@@ -115,6 +116,7 @@ func (c IBCMessagesCache) Clone() IBCMessagesCache {
func NewIBCMessagesCache() IBCMessagesCache {
return IBCMessagesCache{
PacketFlow: make(ChannelPacketMessagesCache),
PacketState: make(ChannelPacketStateCache),
ConnectionHandshake: make(ConnectionMessagesCache),
ChannelHandshake: make(ChannelMessagesCache),
ClientICQ: make(ClientICQMessagesCache),
@@ -124,12 +126,18 @@ func NewIBCMessagesCache() IBCMessagesCache {
// ChannelPacketMessagesCache is used for caching a PacketMessagesCache for a given IBC channel.
type ChannelPacketMessagesCache map[ChannelKey]PacketMessagesCache

// ChannelPacketStateCache is used for caching a PacketSequenceStateCache for a given IBC channel.
type ChannelPacketStateCache map[ChannelKey]PacketSequenceStateCache

// PacketMessagesCache is used for caching a PacketSequenceCache for a given IBC message type.
type PacketMessagesCache map[string]PacketSequenceCache

// PacketSequenceCache is used for caching an IBC message for a given packet sequence.
type PacketSequenceCache map[uint64]provider.PacketInfo

// PacketSequenceStateCache is used for caching the state of a packet sequence.
type PacketSequenceStateCache map[uint64]string

// ChannelMessagesCache is used for caching a ChannelMessageCache for a given IBC message type.
type ChannelMessagesCache map[string]ChannelMessageCache

@@ -344,6 +352,76 @@ func (c PacketMessagesCache) DeleteMessages(toDelete ...map[string][]uint64) {
}
}

func stateValue(state string) int {
switch state {
case chantypes.EventTypeSendPacket:
return 1
case chantypes.EventTypeRecvPacket:
return 2
case chantypes.EventTypeWriteAck:
return 3
case chantypes.EventTypeAcknowledgePacket, chantypes.EventTypeTimeoutPacket:
return 4
}
panic(fmt.Errorf("unexpected state: %s", state))
}

func (c ChannelPacketStateCache) UpdateState(k ChannelKey, sequence uint64, state string) {
minState := 0
if sequenceCache, ok := c[k]; ok {
if currentState, ok := sequenceCache[sequence]; ok {
minState = stateValue(currentState)
}
} else {
c[k] = make(PacketSequenceStateCache)
}

if stateValue(state) <= minState {
// can't downgrade state
return
}

c[k][sequence] = state
}

func (c ChannelPacketStateCache) State(k ChannelKey, sequence uint64) (string, bool) {
sequenceCache, ok := c[k]
if !ok {
return "", false
}

state, ok := sequenceCache[sequence]
if !ok {
return "", false
}

return state, true
}

// Prune deletes all map entries except for the most recent (keep) for all channels.
func (c ChannelPacketStateCache) Prune(keep int) {
for _, pssc := range c {
pssc.Prune(keep)
}
}

// Prune deletes all map entries except for the most recent (keep).
func (c PacketSequenceStateCache) Prune(keep int) {
if len(c) <= keep {
return
}
seqs := make([]uint64, 0, len(c))
for seq := range c {
seqs = append(seqs, seq)
}
sort.Slice(seqs, func(i, j int) bool { return seqs[i] < seqs[j] })

// only keep recent packet states
for _, seq := range seqs[:len(seqs)-keep] {
delete(c, seq)
}
}

// IsCached returns true if a sequence for a channel key and event type is already cached.
func (c ChannelPacketMessagesCache) IsCached(eventType string, k ChannelKey, sequence uint64) bool {
if _, ok := c[k]; !ok {
32 changes: 32 additions & 0 deletions relayer/processor/types_test.go
@@ -3,6 +3,7 @@ package processor_test
import (
"testing"

chantypes "github.com/cosmos/ibc-go/v8/modules/core/04-channel/types"
ibcexported "github.com/cosmos/ibc-go/v8/modules/core/exported"
"github.com/cosmos/relayer/v2/relayer/processor"
"github.com/stretchr/testify/require"
@@ -41,3 +42,34 @@ func TestIBCHeaderCachePrune(t *testing.T) {
require.Len(t, cache, 5)
require.NotNil(t, cache[uint64(15)], cache[uint64(16)], cache[uint64(17)], cache[uint64(18)], cache[uint64(19)])
}

func TestPacketSequenceStateCachePrune(t *testing.T) {
cache := make(processor.PacketSequenceStateCache)

for i := uint64(0); i < 50; i++ {
cache[i] = chantypes.EventTypeSendPacket
}

cache.Prune(100)

require.Len(t, cache, 50)

cache.Prune(25)

require.Len(t, cache, 25)

min := uint64(1000)
max := uint64(0)

for seq := range cache {
if seq < min {
min = seq
}
if seq > max {
max = seq
}
}

require.Equal(t, uint64(25), min)
require.Equal(t, uint64(49), max)
}
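A hypothetical companion test, not part of this PR, sketching the other half of the new cache's contract from types.go: UpdateState never downgrades a sequence's recorded stage, and State reports what was recorded. It assumes the imports already present in this file and uses a zero-value ChannelKey as a stand-in for a real channel key.

func TestPacketStateCacheNoDowngrade(t *testing.T) {
	cache := make(processor.ChannelPacketStateCache)
	var k processor.ChannelKey // zero value stands in for a real channel key

	cache.UpdateState(k, 7, chantypes.EventTypeRecvPacket)
	cache.UpdateState(k, 7, chantypes.EventTypeSendPacket) // lower-ranked stage: ignored, no downgrade

	state, ok := cache.State(k, 7)
	require.True(t, ok)
	require.Equal(t, chantypes.EventTypeRecvPacket, state)

	cache.UpdateState(k, 7, chantypes.EventTypeAcknowledgePacket) // upgrades are accepted
	cache.Prune(100)                                              // well under the limit, nothing is dropped
	require.Len(t, cache[k], 1)
}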