Skip to content

Commit

Permalink
clone msg queue to avoid deleting contents
Browse files Browse the repository at this point in the history
  • Loading branch information
sh-cha committed Nov 6, 2024
1 parent c8e4523 commit e3aac98
Show file tree
Hide file tree
Showing 5 changed files with 18 additions and 6 deletions.
3 changes: 2 additions & 1 deletion executor/child/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package child

import (
"context"
"slices"
"time"

btypes "github.com/initia-labs/opinit-bots/node/broadcaster/types"
Expand Down Expand Up @@ -57,7 +58,7 @@ func (ch *Child) endBlockHandler(_ context.Context, args nodetypes.EndBlockArgs)
}

ch.AppendProcessedMsgs(btypes.ProcessedMsgs{
Msgs: msgQueue[i:end],
Msgs: slices.Clone(msgQueue[i:end]),
Timestamp: time.Now().UnixNano(),
Save: true,
})
Expand Down
3 changes: 2 additions & 1 deletion executor/host/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package host

import (
"context"
"slices"
"time"

"github.com/initia-labs/opinit-bots/types"
Expand Down Expand Up @@ -34,7 +35,7 @@ func (h *Host) endBlockHandler(_ context.Context, args nodetypes.EndBlockArgs) e
}

h.AppendProcessedMsgs(btypes.ProcessedMsgs{
Msgs: msgQueue[i:end],
Msgs: slices.Clone(msgQueue[i:end]),
Timestamp: time.Now().UnixNano(),
Save: true,
})
Expand Down
3 changes: 2 additions & 1 deletion node/broadcaster/broadcaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package broadcaster

import (
"context"
"slices"
"sync"
"time"

Expand Down Expand Up @@ -194,7 +195,7 @@ func (b *Broadcaster) prepareBroadcaster(ctx context.Context, lastBlockTime time
}

b.pendingProcessedMsgs = append(b.pendingProcessedMsgs, btypes.ProcessedMsgs{
Msgs: msgs[i:end],
Msgs: slices.Clone(msgs[i:end]),
Timestamp: time.Now().UnixNano(),
Save: true,
})
Expand Down
5 changes: 3 additions & 2 deletions node/broadcaster/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/hex"
"fmt"
"math"
"time"

"github.com/pkg/errors"
Expand Down Expand Up @@ -111,7 +112,7 @@ func (b *Broadcaster) Start(ctx context.Context) error {
return nil
case data := <-b.txChannel:
var err error
for retry := 1; retry <= 7; retry++ {
for retry := 1; retry <= types.MaxRetryCount; retry++ {
err = b.handleProcessedMsgs(ctx, data)
if err == nil {
break
Expand All @@ -122,7 +123,7 @@ func (b *Broadcaster) Start(ctx context.Context) error {
err = nil
break
}
b.logger.Warn("retry to handle processed msgs after 30 seconds", zap.Int("count", retry), zap.String("error", err.Error()))
b.logger.Warn(fmt.Sprintf("retry to handle processed msgs after %d seconds", int(2*math.Exp2(float64(retry)))), zap.Int("count", retry), zap.String("error", err.Error()))
types.SleepWithRetry(ctx, retry)
}
if err != nil {
Expand Down
10 changes: 9 additions & 1 deletion node/broadcaster/types/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,15 @@ func (p *ProcessedMsgs) UnmarshalInterfaceJSON(cdc codec.Codec, data []byte) err

func (p ProcessedMsgs) String() string {
tsStr := time.Unix(0, p.Timestamp).UTC().String()
return fmt.Sprintf("Pending msgs: %s at %s", strings.Join(p.GetMsgTypes(), ","), tsStr)
return fmt.Sprintf("Pending msgs: %s at %s", strings.Join(p.GetMsgStrings(), ","), tsStr)
}

func (p ProcessedMsgs) GetMsgStrings() []string {
msgStrings := make([]string, 0, len(p.Msgs))
for _, msg := range p.Msgs {
msgStrings = append(msgStrings, msg.String())
}
return msgStrings
}

func (p ProcessedMsgs) GetMsgTypes() []string {
Expand Down

0 comments on commit e3aac98

Please sign in to comment.