next seq ack handling (#1244)
* next seq ack handling and chan order

* use max msgs for ack flush

* improve logs

* fix check

* don't override unless not chantypes.NONE

* fix: Suppressing scary SDK error on redundant packets (#1214)

Co-authored-by: Andrew Gouin <[email protected]>

* tidy logic

* improve logic and order detection

* shorten flushFailureRetry

* check empty string

* tidy logs. better account sequence regex. don't split up ordered channel batches

---------

Co-authored-by: Joe Abbey <[email protected]>
Co-authored-by: jtieri <[email protected]>
3 people authored Jul 31, 2023
1 parent 1301e1d commit ab1c4fc
Showing 17 changed files with 293 additions and 1,358 deletions.
2 changes: 2 additions & 0 deletions .gitignore
@@ -21,3 +21,5 @@ dist/

# Don't commit the vendor directory if anyone runs 'go mod vendor'.
/vendor

go.work.sum
1,238 changes: 0 additions & 1,238 deletions go.work.sum

This file was deleted.

16 changes: 9 additions & 7 deletions relayer/chains/cosmos/cosmos_chain_processor.go
Expand Up @@ -71,6 +71,7 @@ func NewCosmosChainProcessor(log *zap.Logger, provider *CosmosProvider, metrics

const (
queryTimeout = 5 * time.Second
queryStateTimeout = 60 * time.Second
blockResultsQueryTimeout = 2 * time.Minute
latestHeightQueryRetryDelay = 1 * time.Second
latestHeightQueryRetries = 5
@@ -279,7 +280,7 @@ func (ccp *CosmosChainProcessor) Run(ctx context.Context, initialBlockHistory ui

// initializeConnectionState will bootstrap the connectionStateCache with the open connection state.
func (ccp *CosmosChainProcessor) initializeConnectionState(ctx context.Context) error {
ctx, cancel := context.WithTimeout(ctx, queryTimeout)
ctx, cancel := context.WithTimeout(ctx, queryStateTimeout)
defer cancel()
connections, err := ccp.chainProvider.QueryConnections(ctx)
if err != nil {
@@ -299,7 +300,7 @@ func (ccp *CosmosChainProcessor) initializeConnectionState(ctx context.Context)

// initializeChannelState will bootstrap the channelStateCache with the open channel state.
func (ccp *CosmosChainProcessor) initializeChannelState(ctx context.Context) error {
ctx, cancel := context.WithTimeout(ctx, queryTimeout)
ctx, cancel := context.WithTimeout(ctx, queryStateTimeout)
defer cancel()
channels, err := ccp.chainProvider.QueryChannels(ctx)
if err != nil {
@@ -315,12 +316,13 @@ func (ccp *CosmosChainProcessor) initializeChannelState(ctx context.Context) err
continue
}
ccp.channelConnections[ch.ChannelId] = ch.ConnectionHops[0]
-ccp.channelStateCache[processor.ChannelKey{
+k := processor.ChannelKey{
ChannelID: ch.ChannelId,
PortID: ch.PortId,
CounterpartyChannelID: ch.Counterparty.ChannelId,
CounterpartyPortID: ch.Counterparty.PortId,
-}] = ch.State == chantypes.OPEN
+}
+ccp.channelStateCache.SetOpen(k, ch.State == chantypes.OPEN, ch.Ordering)
}
return nil
}
@@ -402,11 +404,11 @@ func (ccp *CosmosChainProcessor) queryCycle(ctx context.Context, persistence *qu
})

if err := eg.Wait(); err != nil {
-ccp.log.Warn(
-"Could not query block data. Consider checking if your RPC node is online, and that transaction indexing is enabled.",
+ccp.log.Debug(
+"Error querying block data",
zap.Int64("height", i),
+zap.Error(err),
)
-ccp.log.Debug("Error querying block data", zap.Error(err))

persistence.retriesAtLatestQueriedBlock++
if persistence.retriesAtLatestQueriedBlock >= blockMaxRetries {
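The SetOpen call sites above store an open flag together with the channel ordering. As a rough, hypothetical sketch (the real definitions live in relayer/processor and are not among the diffs shown here), the cache types could look roughly like this; the field names, method signature, and the "only overwrite the ordering when it is not chantypes.NONE" behavior are inferred from the call sites and the commit message, not copied from the commit:

package processor

// Import path assumes the ibc-go v7 module used by the relayer at the time.
import chantypes "github.com/cosmos/ibc-go/v7/modules/core/04-channel/types"

// ChannelKey identifies a channel end and its counterparty (fields match those used above).
type ChannelKey struct {
	ChannelID             string
	PortID                string
	CounterpartyChannelID string
	CounterpartyPortID    string
}

// ChannelState pairs the open flag with the observed channel ordering.
type ChannelState struct {
	Open  bool
	Order chantypes.Order
}

// ChannelStateCache tracks the latest known state per channel.
type ChannelStateCache map[ChannelKey]ChannelState

// SetOpen records the open flag and only overwrites the stored ordering
// when the caller actually knows it (i.e. it is not chantypes.NONE).
func (c ChannelStateCache) SetOpen(k ChannelKey, open bool, order chantypes.Order) {
	s, ok := c[k]
	if !ok {
		c[k] = ChannelState{Open: open, Order: order}
		return
	}
	s.Open = open
	if order != chantypes.NONE {
		s.Order = order
	}
	c[k] = s
}
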
10 changes: 5 additions & 5 deletions relayer/chains/cosmos/message_handlers.go
@@ -40,7 +40,7 @@ func (ccp *CosmosChainProcessor) handlePacketMessage(eventType string, pi provid
}

if eventType == chantypes.EventTypeTimeoutPacket && pi.ChannelOrder == chantypes.ORDERED.String() {
ccp.channelStateCache[k] = false
ccp.channelStateCache.SetOpen(k, false, chantypes.ORDERED)
}

if !c.PacketFlow.ShouldRetainSequence(ccp.pathProcessors, k, ccp.chainProvider.ChainId(), eventType, pi.Sequence) {
@@ -78,19 +78,19 @@ func (ccp *CosmosChainProcessor) handleChannelMessage(eventType string, ci provi
}
}
if !found {
ccp.channelStateCache[channelKey] = false
ccp.channelStateCache.SetOpen(channelKey, false, ci.Order)
}
} else {
switch eventType {
case chantypes.EventTypeChannelOpenTry:
ccp.channelStateCache[channelKey] = false
ccp.channelStateCache.SetOpen(channelKey, false, ci.Order)
case chantypes.EventTypeChannelOpenAck, chantypes.EventTypeChannelOpenConfirm:
ccp.channelStateCache[channelKey] = true
ccp.channelStateCache.SetOpen(channelKey, true, ci.Order)
ccp.logChannelOpenMessage(eventType, ci)
case chantypes.EventTypeChannelCloseConfirm:
for k := range ccp.channelStateCache {
if k.PortID == ci.PortID && k.ChannelID == ci.ChannelID {
ccp.channelStateCache[k] = false
ccp.channelStateCache.SetOpen(k, false, ci.Order)
break
}
}
10 changes: 5 additions & 5 deletions relayer/chains/cosmos/message_handlers_test.go
@@ -128,7 +128,7 @@ func TestChannelStateCache(t *testing.T) {

// The channel state is not open, but the entry should exist in the channelStateCache.
// MsgInitKey returns the ChannelKey with an empty counterparty channel ID.
require.False(t, ccp.channelStateCache[k.MsgInitKey()])
require.False(t, ccp.channelStateCache[k.MsgInitKey()].Open)

// Observe MsgChannelOpenAck, which does have counterparty channel ID.
ccp.handleChannelMessage(chantypes.EventTypeChannelOpenAck, msgOpenAck, c)
@@ -139,7 +139,7 @@ func TestChannelStateCache(t *testing.T) {

// The fully populated ChannelKey should now be the only entry for this channel.
// The channel is now open.
require.True(t, ccp.channelStateCache[k])
require.True(t, ccp.channelStateCache[k].Open)
})

t.Run("handshake already occurred", func(t *testing.T) {
@@ -156,7 +156,7 @@ func TestChannelStateCache(t *testing.T) {

// Initialize channelStateCache with populated channel ID and counterparty channel ID.
// This emulates initializeChannelState after a recent channel handshake has completed
ccp.channelStateCache[k] = true
ccp.channelStateCache.SetOpen(k, true, chantypes.NONE)

// Observe MsgChannelOpenInit, which does not have counterparty channel ID.
ccp.handleChannelMessage(chantypes.EventTypeChannelOpenInit, msgOpenInit, c)
@@ -166,7 +166,7 @@ func TestChannelStateCache(t *testing.T) {

// The fully populated ChannelKey should still be the only entry for this channel.
// The channel is still marked open since it was open during initializeChannelState.
require.True(t, ccp.channelStateCache[k])
require.True(t, ccp.channelStateCache[k].Open)

// Observe MsgChannelOpenAck, which does have counterparty channel ID.
ccp.handleChannelMessage(chantypes.EventTypeChannelOpenAck, msgOpenAck, c)
@@ -175,6 +175,6 @@ func TestChannelStateCache(t *testing.T) {
require.Len(t, ccp.channelStateCache, 1)

// The fully populated ChannelKey should still be the only entry for this channel.
require.True(t, ccp.channelStateCache[k])
require.True(t, ccp.channelStateCache[k].Open)
})
}
23 changes: 23 additions & 0 deletions relayer/chains/cosmos/query.go
@@ -1096,6 +1096,29 @@ func (cc *CosmosProvider) QueryNextSeqRecv(ctx context.Context, height int64, ch
}, nil
}

// QueryNextSeqAck returns the next seqAck for a configured channel
func (cc *CosmosProvider) QueryNextSeqAck(ctx context.Context, height int64, channelid, portid string) (recvRes *chantypes.QueryNextSequenceReceiveResponse, err error) {
key := host.NextSequenceAckKey(portid, channelid)

value, proofBz, proofHeight, err := cc.QueryTendermintProof(ctx, height, key)
if err != nil {
return nil, err
}

// check if next sequence ack exists
if len(value) == 0 {
return nil, sdkerrors.Wrapf(chantypes.ErrChannelNotFound, "portID (%s), channelID (%s)", portid, channelid)
}

sequence := binary.BigEndian.Uint64(value)

return &chantypes.QueryNextSequenceReceiveResponse{
NextSequenceReceive: sequence,
Proof: proofBz,
ProofHeight: proofHeight,
}, nil
}

// QueryPacketCommitment returns the packet commitment proof at a given height
func (cc *CosmosProvider) QueryPacketCommitment(ctx context.Context, height int64, channelid, portid string, seq uint64) (comRes *chantypes.QueryPacketCommitmentResponse, err error) {
key := host.PacketCommitmentKey(portid, channelid, seq)
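The new QueryNextSeqAck reads the raw value stored under the nextSequenceAck key and decodes it exactly as above. A minimal standalone sketch of that decode step (the byte value is illustrative; ibc-go stores sequence counters as 8-byte big-endian integers):

package main

import (
	"encoding/binary"
	"fmt"
)

func main() {
	// Illustrative raw store bytes for a next-sequence-ack counter of 42.
	value := []byte{0, 0, 0, 0, 0, 0, 0, 42}
	if len(value) == 0 {
		fmt.Println("channel not found: no value under the nextSequenceAck key")
		return
	}
	sequence := binary.BigEndian.Uint64(value)
	fmt.Println(sequence) // prints 42
}
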
23 changes: 4 additions & 19 deletions relayer/chains/cosmos/tx.go
@@ -55,7 +55,7 @@ var (
rtyAtt = retry.Attempts(rtyAttNum)
rtyDel = retry.Delay(time.Millisecond * 400)
rtyErr = retry.LastErrorOnly(true)
numRegex = regexp.MustCompile("[0-9]+")
accountSeqRegex = regexp.MustCompile("account sequence mismatch, expected ([0-9]+), got ([0-9]+)")
defaultBroadcastWaitTimeout = 10 * time.Minute
errUnknown = "unknown"
)
@@ -660,32 +660,17 @@ func (cc *CosmosProvider) handleAccountSequenceMismatchError(sequenceGuard *Wall
panic("sequence guard not configured")
}

-sequences := numRegex.FindAllString(err.Error(), -1)
-if len(sequences) != 2 {
+matches := accountSeqRegex.FindStringSubmatch(err.Error())
+if len(matches) == 0 {
return
}
-nextSeq, err := strconv.ParseUint(sequences[0], 10, 64)
+nextSeq, err := strconv.ParseUint(matches[1], 10, 64)
if err != nil {
return
}
sequenceGuard.NextAccountSequence = nextSeq
}

-// handleAccountSequenceMismatchError will parse the error string, e.g.:
-// "account sequence mismatch, expected 10, got 9: incorrect account sequence"
-// and update the next account sequence with the expected value.
-// func (cc *CosmosProvider) handleAccountSequenceMismatchError(err error) {
-// sequences := numRegex.FindAllString(err.Error(), -1)
-// if len(sequences) != 2 {
-// return
-// }
-// nextSeq, err := strconv.ParseUint(sequences[0], 10, 64)
-// if err != nil {
-// return
-// }
-// cc.nextAccountSeq = nextSeq
-// }

// MsgCreateClient creates an sdk.Msg to update the client on src with consensus state from dst
func (cc *CosmosProvider) MsgCreateClient(
clientState ibcexported.ClientState,
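The new accountSeqRegex matches only the SDK's account sequence mismatch message instead of any run of digits, so unrelated numbers in an error string can no longer be mistaken for sequences. A minimal standalone sketch of the parsing path, reusing the example error string from the deleted comment above:

package main

import (
	"fmt"
	"regexp"
	"strconv"
)

var accountSeqRegex = regexp.MustCompile("account sequence mismatch, expected ([0-9]+), got ([0-9]+)")

func main() {
	errMsg := "account sequence mismatch, expected 10, got 9: incorrect account sequence"

	matches := accountSeqRegex.FindStringSubmatch(errMsg)
	if len(matches) == 0 {
		return // not an account sequence mismatch error
	}

	// matches[1] is the sequence the node expects; use it as the next account sequence.
	nextSeq, err := strconv.ParseUint(matches[1], 10, 64)
	if err != nil {
		return
	}
	fmt.Println("next account sequence:", nextSeq) // 10
}
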
2 changes: 1 addition & 1 deletion relayer/chains/mock/mock_chain_processor.go
@@ -170,7 +170,7 @@ func (mcp *MockChainProcessor) queryCycle(ctx context.Context, persistence *quer

// mocking all channels open
for channelKey := range ibcMessagesCache.PacketFlow {
channelStateCache[channelKey] = true
channelStateCache.SetOpen(channelKey, true, chantypes.NONE)
}

// now pass foundMessages to the path processors
8 changes: 4 additions & 4 deletions relayer/chains/penumbra/message_handlers.go
@@ -63,18 +63,18 @@ func (pcp *PenumbraChainProcessor) handleChannelMessage(eventType string, ci pro
}
}
if !found {
pcp.channelStateCache[channelKey] = false
pcp.channelStateCache.SetOpen(channelKey, false, ci.Order)
}
} else {
switch eventType {
case chantypes.EventTypeChannelOpenTry:
pcp.channelStateCache[channelKey] = false
pcp.channelStateCache.SetOpen(channelKey, false, ci.Order)
case chantypes.EventTypeChannelOpenAck, chantypes.EventTypeChannelOpenConfirm:
pcp.channelStateCache[channelKey] = true
pcp.channelStateCache.SetOpen(channelKey, true, ci.Order)
case chantypes.EventTypeChannelCloseConfirm:
for k := range pcp.channelStateCache {
if k.PortID == ci.PortID && k.ChannelID == ci.ChannelID {
pcp.channelStateCache[k] = false
pcp.channelStateCache.SetOpen(k, false, ci.Order)
break
}
}
5 changes: 3 additions & 2 deletions relayer/chains/penumbra/penumbra_chain_processor.go
@@ -257,12 +257,13 @@ func (pcp *PenumbraChainProcessor) initializeChannelState(ctx context.Context) e
continue
}
pcp.channelConnections[ch.ChannelId] = ch.ConnectionHops[0]
-pcp.channelStateCache[processor.ChannelKey{
+k := processor.ChannelKey{
ChannelID: ch.ChannelId,
PortID: ch.PortId,
CounterpartyChannelID: ch.Counterparty.ChannelId,
CounterpartyPortID: ch.Counterparty.PortId,
-}] = ch.State == chantypes.OPEN
+}
+pcp.channelStateCache.SetOpen(k, ch.State == chantypes.OPEN, ch.Ordering)
}
return nil
}
23 changes: 23 additions & 0 deletions relayer/chains/penumbra/query.go
@@ -702,6 +702,29 @@ func (cc *PenumbraProvider) QueryNextSeqRecv(ctx context.Context, height int64,
}, nil
}

// QueryNextSeqAck returns the next seqAck for a configured channel
func (cc *PenumbraProvider) QueryNextSeqAck(ctx context.Context, height int64, channelid, portid string) (recvRes *chantypes.QueryNextSequenceReceiveResponse, err error) {
key := host.NextSequenceAckKey(portid, channelid)

value, proofBz, proofHeight, err := cc.QueryTendermintProof(ctx, height, key)
if err != nil {
return nil, err
}

// check if next sequence ack exists
if len(value) == 0 {
return nil, sdkerrors.Wrapf(chantypes.ErrChannelNotFound, "portID (%s), channelID (%s)", portid, channelid)
}

sequence := binary.BigEndian.Uint64(value)

return &chantypes.QueryNextSequenceReceiveResponse{
NextSequenceReceive: sequence,
Proof: proofBz,
ProofHeight: proofHeight,
}, nil
}

// QueryPacketCommitment returns the packet commitment proof at a given height
func (cc *PenumbraProvider) QueryPacketCommitment(ctx context.Context, height int64, channelid, portid string, seq uint64) (comRes *chantypes.QueryPacketCommitmentResponse, err error) {
key := host.PacketCommitmentKey(portid, channelid, seq)
9 changes: 8 additions & 1 deletion relayer/processor/message_processor.go
@@ -313,11 +313,18 @@ func (mp *messageProcessor) trackAndSendMessages(
var batch []messageToTrack

for _, t := range mp.trackers() {

retries := dst.trackProcessingMessage(t)
if t.assembledMsg() == nil {
continue
}
if broadcastBatch && retries == 0 {

ordered := false
if m, ok := t.(packetMessageToTrack); ok && m.msg.info.ChannelOrder == chantypes.ORDERED.String() {
ordered = true
}

if broadcastBatch && (retries == 0 || ordered) {
batch = append(batch, t)
continue
}
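The change above keeps ordered-channel packet messages in the single broadcast batch even when they have been retried, so an ordered batch is never split across broadcasts. A tiny illustrative helper expressing that rule (not part of the commit, which inlines the check; the retries type is assumed to be uint64):

package main

import chantypes "github.com/cosmos/ibc-go/v7/modules/core/04-channel/types"

// shouldBatch mirrors the new condition: keep a message in the broadcast batch
// on its first attempt, and always keep packets on ORDERED channels there so
// an ordered batch is never split up.
func shouldBatch(broadcastBatch bool, retries uint64, channelOrder string) bool {
	ordered := channelOrder == chantypes.ORDERED.String()
	return broadcastBatch && (retries == 0 || ordered)
}

func main() {
	_ = shouldBatch(true, 2, chantypes.ORDERED.String()) // true: ordered packets stay batched despite retries
}
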
2 changes: 1 addition & 1 deletion relayer/processor/path_end_runtime.go
@@ -442,7 +442,7 @@ func (pathEnd *pathEndRuntime) shouldSendPacketMessage(message packetIBCMessage,
)
return false
}
if !pathEnd.channelStateCache[k] {
if !pathEnd.channelStateCache[k].Open {
// channel is not open, do not send
pathEnd.log.Warn("Refusing to relay packet message because channel is not open",
zap.String("event_type", eventType),
16 changes: 8 additions & 8 deletions relayer/processor/path_processor.go
@@ -30,7 +30,7 @@ const (
interchainQueryTimeout = 60 * time.Second

// Amount of time between flushes if the previous flush failed.
flushFailureRetry = 15 * time.Second
flushFailureRetry = 5 * time.Second

// If message assembly fails from either proof query failure on the source
// or assembling the message for the destination, how many blocks should pass
@@ -186,12 +186,12 @@ func (pp *PathProcessor) OnConnectionMessage(chainID string, eventType string, o

func (pp *PathProcessor) channelPairs() []channelPair {
// Channel keys are from pathEnd1's perspective
channels := make(map[ChannelKey]bool)
for k, open := range pp.pathEnd1.channelStateCache {
channels[k] = open
channels := make(map[ChannelKey]ChannelState)
for k, cs := range pp.pathEnd1.channelStateCache {
channels[k] = cs
}
for k, open := range pp.pathEnd2.channelStateCache {
channels[k.Counterparty()] = open
for k, cs := range pp.pathEnd2.channelStateCache {
channels[k.Counterparty()] = cs
}
pairs := make([]channelPair, len(channels))
i := 0
@@ -457,8 +457,8 @@ func (pp *PathProcessor) handleLocalhostData(cacheData ChainProcessorCacheData)
}
}

channelStateCache1 := make(map[ChannelKey]bool)
channelStateCache2 := make(map[ChannelKey]bool)
channelStateCache1 := make(map[ChannelKey]ChannelState)
channelStateCache2 := make(map[ChannelKey]ChannelState)

// split up data and send lower channel-id data to pathEnd2 and higher channel-id data to pathEnd1.
for k, v := range cacheData.ChannelStateCache {
(Diffs for the remaining changed files were not loaded.)
