diff --git a/README.md b/README.md index f09e861..15437d6 100644 --- a/README.md +++ b/README.md @@ -18,7 +18,7 @@ To ensure compatibility with the node version, check the following versions: | L1 Node | MiniMove | MiniWasm | MiniEVM | | ------- | -------- | -------- | ------- | -| v0.4.1 | v0.4.1 | v0.4.1 | v0.4.1 | +| v0.4.2 | v0.4.0 | v0.4.0 | v0.4.0 | ### Build and Configure @@ -64,6 +64,11 @@ To reset the bot database, use the following command: opinitd reset-db [bot-name] ``` +### Query status +```bash +curl localhost:3000/status +``` + ### Query withdrawals ```bash curl localhost:3000/withdrawal/{sequence} | jq . > ./withdrawal-info.json diff --git a/bot/types/bot.go b/bot/types/bot.go index 1b1e7f3..0798954 100644 --- a/bot/types/bot.go +++ b/bot/types/bot.go @@ -5,6 +5,7 @@ import ( ) type Bot interface { + Initialize(context.Context) error Start(context.Context) error Close() } diff --git a/cmd/opinitd/reset.go b/cmd/opinitd/reset.go index 9513bb9..20d1a35 100644 --- a/cmd/opinitd/reset.go +++ b/cmd/opinitd/reset.go @@ -25,7 +25,7 @@ func resetDBCmd(ctx *cmdContext) *cobra.Command { if err != nil { return err } - err = os.Remove(path.Join(ctx.homePath, "batch")) + err = os.RemoveAll(path.Join(ctx.homePath, "batch")) if err != nil { return err } diff --git a/cmd/opinitd/start.go b/cmd/opinitd/start.go index d680a89..ab3c4d6 100644 --- a/cmd/opinitd/start.go +++ b/cmd/opinitd/start.go @@ -6,11 +6,18 @@ import ( "os" "os/signal" "syscall" + "time" "github.com/spf13/cobra" + "golang.org/x/sync/errgroup" "github.com/initia-labs/opinit-bots-go/bot" bottypes "github.com/initia-labs/opinit-bots-go/bot/types" + "github.com/initia-labs/opinit-bots-go/types" +) + +const ( + flagPollingInterval = "polling-interval" ) func startCmd(ctx *cmdContext) *cobra.Command { @@ -37,11 +44,23 @@ Currently supported bots: cmdCtx, botDone := context.WithCancel(cmd.Context()) gracefulShutdown(botDone) - return bot.Start(cmdCtx) + errGrp, ctx := errgroup.WithContext(cmdCtx) + ctx = types.WithErrGrp(ctx, errGrp) + interval, err := cmd.Flags().GetDuration(flagPollingInterval) + if err != nil { + return err + } + ctx = types.WithPollingInterval(ctx, interval) + err = bot.Initialize(ctx) + if err != nil { + return err + } + return bot.Start(ctx) }, } cmd = configFlag(ctx.v, cmd) + cmd.Flags().Duration(flagPollingInterval, 100*time.Millisecond, "Polling interval in milliseconds") return cmd } diff --git a/executor/README.md b/executor/README.md index c36735b..580f815 100644 --- a/executor/README.md +++ b/executor/README.md @@ -14,50 +14,126 @@ To configure the Executor, fill in the values in the `~/.opinit/executor.json` f { // Version is the version used to build output root. "version": 1, - // ListenAddress is the address to listen for incoming requests. "listen_address": "localhost:3000", - - "l1_rpc_address": "tcp://localhost:26657", - "l2_rpc_address": "tcp://localhost:27657", - "da_rpc_address": "tcp://localhost:27657", - - "l1_gas_price": "0.15uinit", - "l2_gas_price": "", - "da_gas_price": "", - - "l1_chain_id": "testnet-l1-1", - "l2_chain_id": "testnet-l2-1", - "da_chain_id": "testnet-da-1", - - "l1_bech32_prefix": "init", - "l2_bech32_prefix": "init", - "da_bech32_prefix": "init", - + "l1_node": { + "chain_id": "testnet-l1-1", + "bech32_prefix": "init", + "rpc_address": "tcp://localhost:26657", + "gas_price": "0.15uinit", + "gas_adjustment": 1.5, + "tx_timeout": 60 + }, + "l2_node": { + "chain_id": "testnet-l2-1", + "bech32_prefix": "init", + "rpc_address": "tcp://localhost:27657", + "gas_price": "", + "gas_adjustment": 1.5, + "tx_timeout": 60 + }, + "da_node": { + "chain_id": "testnet-l1-1", + "bech32_prefix": "init", + "rpc_address": "tcp://localhost:26657", + "gas_price": "0.15uinit", + "gas_adjustment": 1.5, + "tx_timeout": 60 + }, // OutputSubmitter is the key name in the keyring for the output submitter, // which is used to relay the output transaction from l2 to l1. // // If you don't want to use the output submitter feature, you can leave it empty. - "output_submitter": "output_submitter", + "output_submitter": "", // BridgeExecutor is the key name in the keyring for the bridge executor, // which is used to relay initiate token bridge transaction from l1 to l2. // // If you don't want to use the bridge executor feature, you can leave it empty. - "bridge_executor": "bridge_executor", - + "bridge_executor": "", // RelayOracle is the flag to enable the oracle relay feature. "relay_oracle": true, - // MaxChunks is the maximum number of chunks in a batch. "max_chunks": 5000, // MaxChunkSize is the maximum size of a chunk in a batch. "max_chunk_size": 300000, // MaxSubmissionTime is the maximum time to submit a batch. - "max_submission_time": 3600, // seconds + "max_submission_time": 3600, + // L2StartHeight is the height to start the l2 node. If it is 0, it will start from the latest height. + // If the latest height stored in the db is not 0, this config is ignored. + // L2 starts from the last submitted output l2 block number + 1 before L2StartHeight. + // L1 starts from the block number of the output tx + 1 + "l2_start_height": 0, + // StartBatchHeight is the height to start the batch. If it is 0, it will start from the latest height. + // If the latest height stored in the db is not 0, this config is ignored. + "batch_start_height": 0 } ``` +### Start height config examples +If the latest height stored in the db is not 0, start height config is ignored. + +``` +Output tx 1 +- L1BlockNumber: 10 +- L2BlockNumber: 100 + +Output tx 2 +- L1BlockNumber: 20 +- L2BlockNumber: 200 + +InitializeTokenDeposit tx 1 +- Height: 5 +- L1Sequence: 1 + +InitializeTokenDeposit tx 2 +- Height: 15 +- L1Sequence: 2 + +FinalizedTokenDeposit tx 1 +- L1Sequence: 1 + +FinalizedTokenDeposit tx 2 +- L1Sequence: 2 +``` + +#### Config 1 +```json +{ + l2_start_height: 150, + batch_start_height: 0 +} +``` +When Child's last l1 Sequence is `2`, +- L1 starts from the height 10 + 1 = 11 +- L2 starts from the height 100 + 1 = 101 +- Batch starts from the height 1 + +#### Config 2 +```json +{ + l2_start_height: 150, + batch_start_height: 150 +} +``` +When Child's last l1 Sequence is `2`, +- L1 starts from the height 10 + 1 = 11 +- L2 starts from the height 100 + 1 = 101 +- Batch starts from the height 150 + +#### Config 3 +```json +{ + l2_start_height: 150, + batch_start_height: 150 +} +``` +When Child's last l1 Sequence is `1`, +- L1 starts from the height 5 + 1 = 6 +- L2 starts from the height 100 + 1 = 101 +- Batch starts from the height 150 + + ## Handler rules for the components of the Executor For registered events or tx handlers, work processed in a block is atomically saved as ProcessedMsg. Therfore, if ProcessedMsgs or Txs cannot be processed due to an interrupt or error, it is guaranteed to be read from the DB and processed. @@ -149,10 +225,11 @@ If the batch info registered in the chain is changed to change the account or DA ```go { - DARPCAddress string `json:"da_rpc_address"` - DAGasPrice string `json:"da_gas_price"` - DAChainID string `json:"da_chain_id"` - DABech32Prefix string `json:"da_bech32_prefix"` + RPCAddress string `json:"rpc_address"` + GasPrice string `json:"gas_price"` + GasAdjustment string `json:"gas_adjustment"` + ChainID string `json:"chain_id"` + Bech32Prefix string `json:"bech32_prefix"` } ``` ## Sync from the beginning diff --git a/executor/batch/batch.go b/executor/batch/batch.go index b922996..d43231d 100644 --- a/executor/batch/batch.go +++ b/executor/batch/batch.go @@ -23,7 +23,7 @@ import ( ) type hostNode interface { - QueryBatchInfos() (*ophosttypes.QueryBatchInfosResponse, error) + QueryBatchInfos(context.Context, uint64) (*ophosttypes.QueryBatchInfosResponse, error) } type compressionFunc interface { @@ -62,6 +62,9 @@ type BatchSubmitter struct { homePath string lastSubmissionTime time.Time + + // status info + LastBatchEndBlockNumber uint64 } func NewBatchSubmitter( @@ -104,11 +107,15 @@ func NewBatchSubmitter( return ch } -func (bs *BatchSubmitter) Initialize(host hostNode, bridgeInfo opchildtypes.BridgeInfo) error { +func (bs *BatchSubmitter) Initialize(ctx context.Context, startHeight uint64, host hostNode, bridgeInfo opchildtypes.BridgeInfo) error { + err := bs.node.Initialize(startHeight) + if err != nil { + return err + } bs.host = host bs.bridgeInfo = bridgeInfo - res, err := bs.host.QueryBatchInfos() + res, err := bs.host.QueryBatchInfos(ctx, bridgeInfo.BridgeId) if err != nil { return err } @@ -123,7 +130,13 @@ func (bs *BatchSubmitter) Initialize(host hostNode, bridgeInfo opchildtypes.Brid bs.DequeueBatchInfo() } - bs.batchFile, err = os.OpenFile(bs.homePath+"/batch", os.O_CREATE|os.O_APPEND|os.O_RDWR, 0666) + fileFlag := os.O_CREATE | os.O_RDWR + // if the node has already processed blocks, append to the file + if !bs.node.HeightInitialized() { + fileFlag |= os.O_APPEND + } + + bs.batchFile, err = os.OpenFile(bs.homePath+"/batch", fileFlag, 0666) if err != nil { return err } diff --git a/executor/batch/handler.go b/executor/batch/handler.go index b17f6b2..e1b0ce2 100644 --- a/executor/batch/handler.go +++ b/executor/batch/handler.go @@ -2,6 +2,7 @@ package batch import ( "compress/gzip" + "context" "crypto/sha256" "encoding/json" "fmt" @@ -9,12 +10,14 @@ import ( "time" "github.com/pkg/errors" + "go.uber.org/zap" cmtproto "github.com/cometbft/cometbft/proto/tendermint/types" - sdk "github.com/cosmos/cosmos-sdk/types" "github.com/cosmos/gogoproto/proto" + sdk "github.com/cosmos/cosmos-sdk/types" + ophosttypes "github.com/initia-labs/OPinit/x/ophost/types" executortypes "github.com/initia-labs/opinit-bots-go/executor/types" @@ -23,7 +26,7 @@ import ( "github.com/initia-labs/opinit-bots-go/types" ) -func (bs *BatchSubmitter) rawBlockHandler(args nodetypes.RawBlockArgs) error { +func (bs *BatchSubmitter) rawBlockHandler(ctx context.Context, args nodetypes.RawBlockArgs) error { if len(bs.processedMsgs) != 0 { panic("must not happen, msgQueue should be empty") } @@ -34,7 +37,7 @@ func (bs *BatchSubmitter) rawBlockHandler(args nodetypes.RawBlockArgs) error { return errors.Wrap(err, "failed to unmarshal block") } - err = bs.prepareBatch(args.BlockHeight, pbb.Header.Time) + err = bs.prepareBatch(ctx, args.BlockHeight, pbb.Header.Time) if err != nil { return errors.Wrap(err, "failed to prepare batch") } @@ -79,7 +82,7 @@ func (bs *BatchSubmitter) rawBlockHandler(args nodetypes.RawBlockArgs) error { return nil } -func (bs *BatchSubmitter) prepareBatch(blockHeight uint64, blockTime time.Time) error { +func (bs *BatchSubmitter) prepareBatch(ctx context.Context, blockHeight uint64, blockTime time.Time) error { // check whether the requested block height is reached to the l2 block number of the next batch info. if nextBatchInfo := bs.NextBatchInfo(); nextBatchInfo != nil && nextBatchInfo.Output.L2BlockNumber < blockHeight { // if the next batch info is reached, finalize the current batch and update the batch info. @@ -120,13 +123,14 @@ func (bs *BatchSubmitter) prepareBatch(blockHeight uint64, blockTime time.Time) return nil } - err := bs.finalizeBatch(blockHeight) + err := bs.finalizeBatch(ctx, blockHeight) if err != nil { return errors.Wrap(err, "failed to finalize batch") } // update last submission time bs.lastSubmissionTime = blockTime + bs.LastBatchEndBlockNumber = blockHeight } // reset batch header @@ -152,10 +156,10 @@ func (bs *BatchSubmitter) handleBatch(blockBytes []byte) error { } // finalize batch and create batch messages -func (bs *BatchSubmitter) finalizeBatch(blockHeight uint64) error { +func (bs *BatchSubmitter) finalizeBatch(ctx context.Context, blockHeight uint64) error { // write last block's commit to batch file - rawCommit, err := bs.node.GetRPCClient().QueryRawCommit(int64(blockHeight)) + rawCommit, err := bs.node.GetRPCClient().QueryRawCommit(ctx, int64(blockHeight)) if err != nil { return errors.Wrap(err, "failed to query raw commit") } @@ -225,13 +229,19 @@ func (bs *BatchSubmitter) finalizeBatch(blockHeight uint64) error { return err } + bs.logger.Info("finalize batch", + zap.Uint64("height", blockHeight), + zap.Uint64("batch end", bs.batchHeader.End), + zap.Int("chunks", len(checksums)), + zap.Int("txs", len(bs.processedMsgs)), + ) return nil } func (bs *BatchSubmitter) checkBatch(blockHeight uint64, blockTime time.Time) error { - info, err := bs.batchFile.Stat() + fileSize, err := bs.batchFileSize() if err != nil { - return errors.Wrap(err, "failed to get batch file stat") + return err } // if the block time is after the last submission time + submission interval * 2/3 @@ -240,9 +250,7 @@ func (bs *BatchSubmitter) checkBatch(blockHeight uint64, blockTime time.Time) er // then finalize the batch if blockTime.After(bs.lastSubmissionTime.Add(bs.bridgeInfo.BridgeConfig.SubmissionInterval*2/3)) || blockTime.After(bs.lastSubmissionTime.Add(time.Duration(bs.batchCfg.MaxSubmissionTime)*time.Second)) || - info.Size() > (bs.batchCfg.MaxChunks-1)*bs.batchCfg.MaxChunkSize { - - // finalize the batch + fileSize > (bs.batchCfg.MaxChunks-1)*bs.batchCfg.MaxChunkSize { // finalize the batch bs.batchHeader.End = blockHeight @@ -251,11 +259,27 @@ func (bs *BatchSubmitter) checkBatch(blockHeight uint64, blockTime time.Time) er return nil } +func (bs *BatchSubmitter) batchFileSize() (int64, error) { + if bs.batchFile == nil { + return 0, errors.New("batch file is not initialized") + } + info, err := bs.batchFile.Stat() + if err != nil { + return 0, errors.Wrap(err, "failed to get batch file stat") + } + return info.Size(), nil +} + // UpdateBatchInfo appends the batch info with the given chain, submitter, output index, and l2 block number func (bs *BatchSubmitter) UpdateBatchInfo(chain string, submitter string, outputIndex uint64, l2BlockNumber uint64) { bs.batchInfoMu.Lock() defer bs.batchInfoMu.Unlock() + // check if the batch info is already updated + if bs.batchInfos[len(bs.batchInfos)-1].Output.L2BlockNumber >= l2BlockNumber { + return + } + bs.batchInfos = append(bs.batchInfos, ophosttypes.BatchInfoWithOutput{ BatchInfo: ophosttypes.BatchInfo{ ChainType: ophosttypes.BatchInfo_ChainType(ophosttypes.BatchInfo_ChainType_value["CHAIN_TYPE_"+chain]), diff --git a/executor/batch/status.go b/executor/batch/status.go new file mode 100644 index 0000000..8d30b09 --- /dev/null +++ b/executor/batch/status.go @@ -0,0 +1,28 @@ +package batch + +import ( + "time" + + ophosttypes "github.com/initia-labs/OPinit/x/ophost/types" + nodetypes "github.com/initia-labs/opinit-bots-go/node/types" +) + +type Status struct { + Node nodetypes.Status `json:"node"` + BatchInfo ophosttypes.BatchInfo `json:"batch_info"` + CurrentBatchFileSize int64 `json:"current_batch_file_size"` + LastBatchEndBlockNumber uint64 `json:"last_batch_end_block_number"` + LastBatchSubmissionTime time.Time `json:"last_batch_submission_time"` +} + +func (bs BatchSubmitter) GetStatus() Status { + fileSize, _ := bs.batchFileSize() + + return Status{ + Node: bs.node.GetStatus(), + BatchInfo: bs.BatchInfo().BatchInfo, + CurrentBatchFileSize: fileSize, + LastBatchEndBlockNumber: bs.LastBatchEndBlockNumber, + LastBatchSubmissionTime: bs.lastSubmissionTime, + } +} diff --git a/executor/batch/utils.go b/executor/batch/utils.go index bee9b9b..bc90124 100644 --- a/executor/batch/utils.go +++ b/executor/batch/utils.go @@ -4,9 +4,10 @@ import ( "encoding/binary" cmtproto "github.com/cometbft/cometbft/proto/tendermint/types" - sdk "github.com/cosmos/cosmos-sdk/types" "github.com/cosmos/gogoproto/proto" + sdk "github.com/cosmos/cosmos-sdk/types" + opchildtypes "github.com/initia-labs/OPinit/x/opchild/types" "github.com/initia-labs/opinit-bots-go/txutils" ) diff --git a/executor/celestia/celestia.go b/executor/celestia/celestia.go index b2e5a4b..8e64639 100644 --- a/executor/celestia/celestia.go +++ b/executor/celestia/celestia.go @@ -7,6 +7,7 @@ import ( "go.uber.org/zap" "github.com/cometbft/cometbft/crypto/merkle" + "github.com/cosmos/cosmos-sdk/client" "github.com/cosmos/cosmos-sdk/codec" sdk "github.com/cosmos/cosmos-sdk/types" @@ -93,9 +94,13 @@ func createCodec(bech32Prefix string) (codec.Codec, client.TxConfig, error) { } func (c *Celestia) Initialize(batch batchNode, bridgeId int64) error { + err := c.node.Initialize(0) + if err != nil { + return err + } + c.batch = batch c.bridgeId = bridgeId - var err error c.namespace, err = sh.NewV0Namespace(c.NamespaceID()) if err != nil { return err diff --git a/executor/celestia/handler.go b/executor/celestia/handler.go index bb4ea3b..476eaee 100644 --- a/executor/celestia/handler.go +++ b/executor/celestia/handler.go @@ -1,11 +1,13 @@ package celestia import ( + "context" + nodetypes "github.com/initia-labs/opinit-bots-go/node/types" "go.uber.org/zap" ) -func (c *Celestia) payForBlobsHandler(args nodetypes.EventHandlerArgs) error { +func (c *Celestia) payForBlobsHandler(_ context.Context, args nodetypes.EventHandlerArgs) error { var signer string var blobSizes string var namespaces string diff --git a/executor/celestia/status.go b/executor/celestia/status.go new file mode 100644 index 0000000..d9885de --- /dev/null +++ b/executor/celestia/status.go @@ -0,0 +1,9 @@ +package celestia + +import ( + nodetypes "github.com/initia-labs/opinit-bots-go/node/types" +) + +func (c Celestia) GetNodeStatus() nodetypes.Status { + return c.node.GetStatus() +} diff --git a/executor/child/child.go b/executor/child/child.go index cb08438..e885593 100644 --- a/executor/child/child.go +++ b/executor/child/child.go @@ -2,6 +2,7 @@ package child import ( "context" + "sync" "time" "go.uber.org/zap" @@ -29,8 +30,8 @@ type hostNode interface { HasKey() bool BroadcastMsgs(btypes.ProcessedMsgs) ProcessedMsgsToRawKV([]btypes.ProcessedMsgs, bool) ([]types.RawKV, error) - QueryLastOutput() (*ophosttypes.QueryOutputProposalResponse, error) - QueryOutput(uint64) (*ophosttypes.QueryOutputProposalResponse, error) + QueryLastOutput(context.Context, uint64) (*ophosttypes.QueryOutputProposalResponse, error) + QueryOutput(context.Context, uint64, uint64) (*ophosttypes.QueryOutputProposalResponse, error) GetMsgProposeOutput( bridgeId uint64, @@ -52,6 +53,9 @@ type Child struct { nextOutputTime time.Time finalizingBlockHeight uint64 + initializeTree *sync.Once + initializeTreeFn func() error + cfg nodetypes.NodeConfig db types.DB logger *zap.Logger @@ -60,6 +64,12 @@ type Child struct { processedMsgs []btypes.ProcessedMsgs msgQueue []sdk.Msg + + // status info + lastUpdatedOracleL1Height uint64 + lastFinalizedDepositL1BlockHeight uint64 + lastFinalizedDepositL1Sequence uint64 + lastOutputTime time.Time } func NewChild( @@ -87,6 +97,8 @@ func NewChild( node: node, mk: mk, + initializeTree: &sync.Once{}, + cfg: cfg, db: db, logger: logger, @@ -109,10 +121,24 @@ func GetCodec(bech32Prefix string) (codec.Codec, client.TxConfig, error) { }) } -func (ch *Child) Initialize(host hostNode, bridgeInfo opchildtypes.BridgeInfo) error { +func (ch *Child) Initialize(startHeight uint64, startOutputIndex uint64, host hostNode, bridgeInfo opchildtypes.BridgeInfo) error { + err := ch.node.Initialize(startHeight) + if err != nil { + return err + } + + if startOutputIndex != 0 { + ch.initializeTreeFn = func() error { + ch.logger.Info("initialize tree", zap.Uint64("index", startOutputIndex)) + err := ch.mk.InitializeWorkingTree(startOutputIndex, 1) + if err != nil { + return err + } + return nil + } + } ch.host = host ch.bridgeInfo = bridgeInfo - ch.registerHandlers() return nil } diff --git a/executor/child/deposit.go b/executor/child/deposit.go index 3ac31c7..b306093 100644 --- a/executor/child/deposit.go +++ b/executor/child/deposit.go @@ -1,18 +1,20 @@ package child import ( + "context" "fmt" "strconv" "cosmossdk.io/math" - sdk "github.com/cosmos/cosmos-sdk/types" opchildtypes "github.com/initia-labs/OPinit/x/opchild/types" + sdk "github.com/cosmos/cosmos-sdk/types" + nodetypes "github.com/initia-labs/opinit-bots-go/node/types" "go.uber.org/zap" ) -func (ch *Child) finalizeDepositHandler(args nodetypes.EventHandlerArgs) error { +func (ch *Child) finalizeDepositHandler(_ context.Context, args nodetypes.EventHandlerArgs) error { var l1BlockHeight, l1Sequence uint64 var from, to, baseDenom string var amount sdk.Coin @@ -48,6 +50,8 @@ func (ch *Child) finalizeDepositHandler(args nodetypes.EventHandlerArgs) error { } } ch.handleFinalizeDeposit(l1BlockHeight, l1Sequence, from, to, amount, baseDenom) + ch.lastFinalizedDepositL1BlockHeight = l1BlockHeight + ch.lastFinalizedDepositL1Sequence = l1Sequence return nil } diff --git a/executor/child/handler.go b/executor/child/handler.go index ac1416d..11cca8b 100644 --- a/executor/child/handler.go +++ b/executor/child/handler.go @@ -1,6 +1,7 @@ package child import ( + "context" "time" btypes "github.com/initia-labs/opinit-bots-go/node/broadcaster/types" @@ -8,7 +9,7 @@ import ( "github.com/initia-labs/opinit-bots-go/types" ) -func (ch *Child) beginBlockHandler(args nodetypes.BeginBlockArgs) (err error) { +func (ch *Child) beginBlockHandler(ctx context.Context, args nodetypes.BeginBlockArgs) (err error) { blockHeight := uint64(args.Block.Header.Height) // just to make sure that childMsgQueue is empty if blockHeight == args.LatestHeight && len(ch.msgQueue) != 0 && len(ch.processedMsgs) != 0 { @@ -20,14 +21,14 @@ func (ch *Child) beginBlockHandler(args nodetypes.BeginBlockArgs) (err error) { return err } - err = ch.prepareOutput() + err = ch.prepareOutput(ctx) if err != nil { return err } return nil } -func (ch *Child) endBlockHandler(args nodetypes.EndBlockArgs) error { +func (ch *Child) endBlockHandler(_ context.Context, args nodetypes.EndBlockArgs) error { blockHeight := uint64(args.Block.Header.Height) batchKVs := make([]types.RawKV, 0) treeKVs, storageRoot, err := ch.handleTree(blockHeight, args.LatestHeight, args.BlockID, args.Block.Header) diff --git a/executor/child/msgs.go b/executor/child/msgs.go index acc74ea..82e1513 100644 --- a/executor/child/msgs.go +++ b/executor/child/msgs.go @@ -1,8 +1,9 @@ package child import ( - sdk "github.com/cosmos/cosmos-sdk/types" opchildtypes "github.com/initia-labs/OPinit/x/opchild/types" + + sdk "github.com/cosmos/cosmos-sdk/types" ) func (ch Child) GetMsgFinalizeTokenDeposit( diff --git a/executor/child/oracle.go b/executor/child/oracle.go index 87b42a2..1165d86 100644 --- a/executor/child/oracle.go +++ b/executor/child/oracle.go @@ -1,6 +1,7 @@ package child import ( + "context" "strconv" opchildtypes "github.com/initia-labs/OPinit/x/opchild/types" @@ -9,7 +10,7 @@ import ( "go.uber.org/zap" ) -func (ch *Child) updateOracleHandler(args nodetypes.EventHandlerArgs) error { +func (ch *Child) updateOracleHandler(_ context.Context, args nodetypes.EventHandlerArgs) error { var l1BlockHeight uint64 var from string var err error @@ -27,7 +28,7 @@ func (ch *Child) updateOracleHandler(args nodetypes.EventHandlerArgs) error { } ch.handleUpdateOracle(l1BlockHeight, from) - + ch.lastUpdatedOracleL1Height = l1BlockHeight return nil } diff --git a/executor/child/query.go b/executor/child/query.go index daca71f..32582ba 100644 --- a/executor/child/query.go +++ b/executor/child/query.go @@ -1,12 +1,14 @@ package child import ( + "context" "encoding/json" "cosmossdk.io/math" - sdk "github.com/cosmos/cosmos-sdk/types" opchildtypes "github.com/initia-labs/OPinit/x/opchild/types" + sdk "github.com/cosmos/cosmos-sdk/types" + executortypes "github.com/initia-labs/opinit-bots-go/executor/types" "github.com/initia-labs/opinit-bots-go/node/rpcclient" ) @@ -19,9 +21,9 @@ func (ch Child) GetAddressStr() (string, error) { return ch.node.MustGetBroadcaster().GetAddressString() } -func (ch Child) QueryBridgeInfo() (opchildtypes.BridgeInfo, error) { +func (ch Child) QueryBridgeInfo(ctx context.Context) (opchildtypes.BridgeInfo, error) { req := &opchildtypes.QueryBridgeInfoRequest{} - ctx, cancel := rpcclient.GetQueryContext(0) + ctx, cancel := rpcclient.GetQueryContext(ctx, 0) defer cancel() res, err := ch.opchildQueryClient.BridgeInfo(ctx, req) @@ -31,9 +33,9 @@ func (ch Child) QueryBridgeInfo() (opchildtypes.BridgeInfo, error) { return res.BridgeInfo, nil } -func (ch Child) QueryNextL1Sequence() (uint64, error) { +func (ch Child) QueryNextL1Sequence(ctx context.Context) (uint64, error) { req := &opchildtypes.QueryNextL1SequenceRequest{} - ctx, cancel := rpcclient.GetQueryContext(0) + ctx, cancel := rpcclient.GetQueryContext(ctx, 0) defer cancel() res, err := ch.opchildQueryClient.NextL1Sequence(ctx, req) @@ -43,9 +45,9 @@ func (ch Child) QueryNextL1Sequence() (uint64, error) { return res.NextL1Sequence, nil } -func (ch Child) QueryNextL2Sequence(height uint64) (uint64, error) { +func (ch Child) QueryNextL2Sequence(ctx context.Context, height uint64) (uint64, error) { req := &opchildtypes.QueryNextL2SequenceRequest{} - ctx, cancel := rpcclient.GetQueryContext(height) + ctx, cancel := rpcclient.GetQueryContext(ctx, height) defer cancel() res, err := ch.opchildQueryClient.NextL2Sequence(ctx, req) diff --git a/executor/child/status.go b/executor/child/status.go new file mode 100644 index 0000000..5480d8e --- /dev/null +++ b/executor/child/status.go @@ -0,0 +1,31 @@ +package child + +import ( + "time" + + nodetypes "github.com/initia-labs/opinit-bots-go/node/types" +) + +type Status struct { + Node nodetypes.Status `json:"node"` + LastUpdatedOracleL1Height uint64 `json:"last_updated_oracle_height"` + LastFinalizedDepositL1BlockHeight uint64 `json:"last_finalized_deposit_l1_block_height"` + LastFinalizedDepositL1Sequence uint64 `json:"last_finalized_deposit_l1_sequence"` + LastWithdrawalL2Sequence uint64 `json:"last_withdrawal_l2_sequence"` + WorkingTreeIndex uint64 `json:"working_tree_index"` + LastOutputSubmissionTime time.Time `json:"last_output_submission_time"` + NextOutputSubmissionTime time.Time `json:"next_output_submission_time"` +} + +func (ch Child) GetStatus() Status { + return Status{ + Node: ch.node.GetStatus(), + LastUpdatedOracleL1Height: ch.lastUpdatedOracleL1Height, + LastFinalizedDepositL1BlockHeight: ch.lastFinalizedDepositL1BlockHeight, + LastFinalizedDepositL1Sequence: ch.lastFinalizedDepositL1Sequence, + LastWithdrawalL2Sequence: ch.mk.GetWorkingTreeLeafCount() + ch.mk.GetStartLeafIndex(), + WorkingTreeIndex: ch.mk.GetWorkingTreeIndex(), + LastOutputSubmissionTime: ch.lastOutputTime, + NextOutputSubmissionTime: ch.nextOutputTime, + } +} diff --git a/executor/child/withdraw.go b/executor/child/withdraw.go index 3780276..a015659 100644 --- a/executor/child/withdraw.go +++ b/executor/child/withdraw.go @@ -1,6 +1,7 @@ package child import ( + "context" "encoding/base64" "encoding/json" "fmt" @@ -19,7 +20,7 @@ import ( nodetypes "github.com/initia-labs/opinit-bots-go/node/types" ) -func (ch *Child) initiateWithdrawalHandler(args nodetypes.EventHandlerArgs) error { +func (ch *Child) initiateWithdrawalHandler(_ context.Context, args nodetypes.EventHandlerArgs) error { var l2Sequence, amount uint64 var from, to, baseDenom string var err error @@ -86,14 +87,17 @@ func (ch *Child) handleInitiateWithdrawal(l2Sequence uint64, from string, to str } func (ch *Child) prepareTree(blockHeight uint64) error { - if blockHeight == 1 { - return ch.mk.InitializeWorkingTree(1, 1) + if ch.initializeTreeFn != nil { + var err error + ch.initializeTree.Do(func() { + err = ch.initializeTreeFn() + }) + return err } err := ch.mk.LoadWorkingTree(blockHeight - 1) if err == dbtypes.ErrNotFound { // must not happened - // TODO: if user want to start from a specific height, we need to provide a way to do so panic(fmt.Errorf("working tree not found at height: %d, current: %d", blockHeight-1, blockHeight)) } else if err != nil { return err @@ -102,12 +106,12 @@ func (ch *Child) prepareTree(blockHeight uint64) error { return nil } -func (ch *Child) prepareOutput() error { +func (ch *Child) prepareOutput(ctx context.Context) error { workingOutputIndex := ch.mk.GetWorkingTreeIndex() // initialize next output time if ch.nextOutputTime.IsZero() && workingOutputIndex > 1 { - output, err := ch.host.QueryOutput(workingOutputIndex - 1) + output, err := ch.host.QueryOutput(ctx, ch.BridgeId(), workingOutputIndex-1) if err != nil { // TODO: maybe not return error here and roll back return fmt.Errorf("output does not exist at index: %d", workingOutputIndex-1) @@ -116,7 +120,7 @@ func (ch *Child) prepareOutput() error { ch.nextOutputTime = output.OutputProposal.L1BlockTime.Add(ch.bridgeInfo.BridgeConfig.SubmissionInterval * 2 / 3) } - output, err := ch.host.QueryOutput(ch.mk.GetWorkingTreeIndex()) + output, err := ch.host.QueryOutput(ctx, ch.BridgeId(), ch.mk.GetWorkingTreeIndex()) if err != nil { if strings.Contains(err.Error(), "collections: not found") { return nil @@ -163,6 +167,7 @@ func (ch *Child) handleTree(blockHeight uint64, latestHeight uint64, blockId []b ch.finalizingBlockHeight = 0 ch.nextOutputTime = blockHeader.Time.Add(ch.bridgeInfo.BridgeConfig.SubmissionInterval * 2 / 3) + ch.lastOutputTime = blockHeader.Time } err = ch.mk.SaveWorkingTree(blockHeight) diff --git a/executor/executor.go b/executor/executor.go index 06f1ea6..b561282 100644 --- a/executor/executor.go +++ b/executor/executor.go @@ -20,7 +20,6 @@ import ( ophosttypes "github.com/initia-labs/OPinit/x/ophost/types" "github.com/initia-labs/opinit-bots-go/types" "go.uber.org/zap" - "golang.org/x/sync/errgroup" ) var _ bottypes.Bot = &Executor{} @@ -47,22 +46,22 @@ func NewExecutor(cfg *executortypes.Config, db types.DB, sv *server.Server, logg panic(err) } - executor := &Executor{ + return &Executor{ host: host.NewHost( cfg.Version, cfg.RelayOracle, cfg.L1NodeConfig(homePath), db.WithPrefix([]byte(executortypes.HostNodeName)), - logger.Named(executortypes.HostNodeName), cfg.L1Bech32Prefix, "", + logger.Named(executortypes.HostNodeName), cfg.L1Node.Bech32Prefix, "", ), child: child.NewChild( cfg.Version, cfg.L2NodeConfig(homePath), db.WithPrefix([]byte(executortypes.ChildNodeName)), - logger.Named(executortypes.ChildNodeName), cfg.L2Bech32Prefix, + logger.Named(executortypes.ChildNodeName), cfg.L2Node.Bech32Prefix, ), batch: batch.NewBatchSubmitter( cfg.Version, cfg.L2NodeConfig(homePath), cfg.BatchConfig(), db.WithPrefix([]byte(executortypes.BatchNodeName)), - logger.Named(executortypes.BatchNodeName), cfg.L2ChainID, homePath, - cfg.DABech32Prefix, + logger.Named(executortypes.BatchNodeName), cfg.L2Node.ChainID, homePath, + cfg.L2Node.Bech32Prefix, ), cfg: cfg, @@ -72,52 +71,58 @@ func NewExecutor(cfg *executortypes.Config, db types.DB, sv *server.Server, logg homePath: homePath, } +} - bridgeInfo, err := executor.child.QueryBridgeInfo() +func (ex *Executor) Initialize(ctx context.Context) error { + bridgeInfo, err := ex.child.QueryBridgeInfo(ctx) if err != nil { - panic(err) + return err } if bridgeInfo.BridgeId == 0 { - panic("bridge info is not set") + return errors.New("bridge info is not set") } - executor.logger.Info( + ex.logger.Info( "bridge info", zap.Uint64("id", bridgeInfo.BridgeId), zap.Duration("submission_interval", bridgeInfo.BridgeConfig.SubmissionInterval), ) - err = executor.host.Initialize(executor.child, executor.batch, int64(bridgeInfo.BridgeId)) + hostStartHeight, childStartHeight, startOutputIndex, batchStartHeight, err := ex.getStartHeights(ctx, bridgeInfo.BridgeId) if err != nil { - panic(err) + return err } - err = executor.child.Initialize(executor.host, bridgeInfo) + + err = ex.host.Initialize(ctx, hostStartHeight, ex.child, ex.batch, int64(bridgeInfo.BridgeId)) if err != nil { - panic(err) + return err } - err = executor.batch.Initialize(executor.host, bridgeInfo) + err = ex.child.Initialize(childStartHeight, startOutputIndex, ex.host, bridgeInfo) if err != nil { - panic(err) + return err + } + err = ex.batch.Initialize(ctx, batchStartHeight, ex.host, bridgeInfo) + if err != nil { + return err } - da, err := executor.makeDANode(int64(bridgeInfo.BridgeId)) + da, err := ex.makeDANode(int64(bridgeInfo.BridgeId)) if err != nil { - panic(err) + return err } - err = executor.batch.SetDANode(da) + err = ex.batch.SetDANode(da) if err != nil { - panic(err) + return err } - executor.RegisterQuerier() - return executor + ex.RegisterQuerier() + return nil } -func (ex *Executor) Start(cmdCtx context.Context) error { +func (ex *Executor) Start(ctx context.Context) error { defer ex.Close() - errGrp, ctx := errgroup.WithContext(cmdCtx) - ctx = context.WithValue(ctx, types.ContextKeyErrGrp, errGrp) + errGrp := types.ErrGrp(ctx) errGrp.Go(func() (err error) { <-ctx.Done() return ex.server.Shutdown() @@ -158,14 +163,7 @@ func (ex *Executor) RegisterQuerier() { }) ex.server.RegisterQuerier("/status", func(c *fiber.Ctx) error { - childHeight := ex.child.GetHeight() - hostHeight := ex.host.GetHeight() - res := map[string]uint64{ - "child": childHeight, - "host": hostHeight, - } - - return c.JSON(res) + return c.JSON(ex.GetStatus()) }) } @@ -177,7 +175,7 @@ func (ex *Executor) makeDANode(bridgeId int64) (executortypes.DANode, error) { ex.cfg.Version, false, ex.cfg.DANodeConfig(ex.homePath), ex.db.WithPrefix([]byte(executortypes.DAHostNodeName)), ex.logger.Named(executortypes.DAHostNodeName), - ex.cfg.DABech32Prefix, batchInfo.BatchInfo.Submitter, + ex.cfg.DANode.Bech32Prefix, batchInfo.BatchInfo.Submitter, ) if ex.host.GetAddress().Equals(da.GetAddress()) { return ex.host, nil @@ -189,7 +187,7 @@ func (ex *Executor) makeDANode(bridgeId int64) (executortypes.DANode, error) { da := celestia.NewDACelestia(ex.cfg.Version, ex.cfg.DANodeConfig(ex.homePath), ex.db.WithPrefix([]byte(executortypes.DACelestiaNodeName)), ex.logger.Named(executortypes.DACelestiaNodeName), - ex.cfg.DABech32Prefix, batchInfo.BatchInfo.Submitter, + ex.cfg.DANode.Bech32Prefix, batchInfo.BatchInfo.Submitter, ) err := da.Initialize(ex.batch, bridgeId) if err != nil { @@ -201,3 +199,39 @@ func (ex *Executor) makeDANode(bridgeId int64) (executortypes.DANode, error) { return nil, fmt.Errorf("unsupported chain id for DA: %s", ophosttypes.BatchInfo_ChainType_name[int32(batchInfo.BatchInfo.ChainType)]) } + +func (ex *Executor) getStartHeights(ctx context.Context, bridgeId uint64) (l1StartHeight uint64, l2StartHeight uint64, startOutputIndex uint64, batchStartHeight uint64, err error) { + // get the bridge start height from the host + l1StartHeight, err = ex.host.QueryCreateBridgeHeight(ctx, bridgeId) + if err != nil { + return 0, 0, 0, 0, err + } + + // get the last submitted output height before the start height from the host + if ex.cfg.L2StartHeight != 0 { + output, err := ex.host.QueryOutputByL2BlockNumber(ctx, bridgeId, uint64(ex.cfg.L2StartHeight)) + if err != nil { + return 0, 0, 0, 0, err + } else if output != nil { + l1StartHeight = output.OutputProposal.L1BlockNumber + l2StartHeight = output.OutputProposal.L2BlockNumber + startOutputIndex = output.OutputIndex + 1 + } else { + startOutputIndex = 1 + } + } + // get the last deposit tx height from the host + l1Sequence, err := ex.child.QueryNextL1Sequence(ctx) + if err != nil { + return 0, 0, 0, 0, err + } + depositTxHeight, err := ex.host.QueryDepositTxHeight(ctx, bridgeId, l1Sequence-1) + if err != nil { + return 0, 0, 0, 0, err + } + if l1StartHeight > depositTxHeight { + l1StartHeight = depositTxHeight + } + batchStartHeight = uint64(ex.cfg.BatchStartHeight) - 1 + return l1StartHeight, l2StartHeight, startOutputIndex, batchStartHeight, err +} diff --git a/executor/host/batch.go b/executor/host/batch.go index 2e81b38..d1f5402 100644 --- a/executor/host/batch.go +++ b/executor/host/batch.go @@ -1,6 +1,7 @@ package host import ( + "context" "strconv" ophosttypes "github.com/initia-labs/OPinit/x/ophost/types" @@ -8,7 +9,7 @@ import ( "go.uber.org/zap" ) -func (h *Host) recordBatchHandler(args nodetypes.EventHandlerArgs) error { +func (h *Host) recordBatchHandler(_ context.Context, args nodetypes.EventHandlerArgs) error { var submitter string for _, attr := range args.EventAttributes { switch attr.Key { @@ -29,7 +30,7 @@ func (h *Host) recordBatchHandler(args nodetypes.EventHandlerArgs) error { return nil } -func (h *Host) updateBatchInfoHandler(args nodetypes.EventHandlerArgs) error { +func (h *Host) updateBatchInfoHandler(_ context.Context, args nodetypes.EventHandlerArgs) error { var bridgeId uint64 var submitter, chain string var outputIndex, l2BlockNumber uint64 diff --git a/executor/host/deposit.go b/executor/host/deposit.go index 64b0f5a..f822c42 100644 --- a/executor/host/deposit.go +++ b/executor/host/deposit.go @@ -1,6 +1,7 @@ package host import ( + "context" "encoding/hex" "errors" "strconv" @@ -12,7 +13,7 @@ import ( sdk "github.com/cosmos/cosmos-sdk/types" ) -func (h *Host) initiateDepositHandler(args nodetypes.EventHandlerArgs) error { +func (h *Host) initiateDepositHandler(_ context.Context, args nodetypes.EventHandlerArgs) error { var bridgeId uint64 var l1Sequence uint64 var from, to, l1Denom, l2Denom, amount string diff --git a/executor/host/handler.go b/executor/host/handler.go index edc33a9..1c3e89e 100644 --- a/executor/host/handler.go +++ b/executor/host/handler.go @@ -1,6 +1,7 @@ package host import ( + "context" "time" "github.com/initia-labs/opinit-bots-go/types" @@ -11,7 +12,7 @@ import ( nodetypes "github.com/initia-labs/opinit-bots-go/node/types" ) -func (h *Host) beginBlockHandler(args nodetypes.BeginBlockArgs) error { +func (h *Host) beginBlockHandler(_ context.Context, args nodetypes.BeginBlockArgs) error { blockHeight := uint64(args.Block.Header.Height) // just to make sure that childMsgQueue is empty if blockHeight == args.LatestHeight && len(h.msgQueue) != 0 && len(h.processedMsgs) != 0 { @@ -20,7 +21,7 @@ func (h *Host) beginBlockHandler(args nodetypes.BeginBlockArgs) error { return nil } -func (h *Host) endBlockHandler(args nodetypes.EndBlockArgs) error { +func (h *Host) endBlockHandler(_ context.Context, args nodetypes.EndBlockArgs) error { // temporary 50 limit for msg queue // collect more msgs if block height is not latest blockHeight := uint64(args.Block.Header.Height) @@ -61,7 +62,7 @@ func (h *Host) endBlockHandler(args nodetypes.EndBlockArgs) error { return nil } -func (h *Host) txHandler(args nodetypes.TxHandlerArgs) error { +func (h *Host) txHandler(_ context.Context, args nodetypes.TxHandlerArgs) error { if args.BlockHeight == args.LatestHeight && args.TxIndex == 0 { if msg, err := h.oracleTxHandler(args.BlockHeight, args.Tx); err != nil { return err diff --git a/executor/host/host.go b/executor/host/host.go index 2a1f11d..30e1b79 100644 --- a/executor/host/host.go +++ b/executor/host/host.go @@ -26,7 +26,7 @@ type childNode interface { HasKey() bool BroadcastMsgs(btypes.ProcessedMsgs) ProcessedMsgsToRawKV([]btypes.ProcessedMsgs, bool) ([]types.RawKV, error) - QueryNextL1Sequence() (uint64, error) + QueryNextL1Sequence(context.Context) (uint64, error) GetMsgFinalizeTokenDeposit(string, string, sdk.Coin, uint64, uint64, string, []byte) (sdk.Msg, error) GetMsgUpdateOracle( @@ -60,6 +60,10 @@ type Host struct { processedMsgs []btypes.ProcessedMsgs msgQueue []sdk.Msg + + // status info + lastProposedOutputIndex uint64 + lastProposedOutputL2BlockNumber uint64 } func NewHost( @@ -111,12 +115,16 @@ func GetCodec(bech32Prefix string) (codec.Codec, client.TxConfig, error) { }) } -func (h *Host) Initialize(child childNode, batch batchNode, bridgeId int64) (err error) { +func (h *Host) Initialize(ctx context.Context, startHeight uint64, child childNode, batch batchNode, bridgeId int64) error { + err := h.node.Initialize(startHeight) + if err != nil { + return err + } h.child = child h.batch = batch h.bridgeId = bridgeId - h.initialL1Sequence, err = h.child.QueryNextL1Sequence() + h.initialL1Sequence, err = h.child.QueryNextL1Sequence(ctx) if err != nil { return err } diff --git a/executor/host/msgs.go b/executor/host/msgs.go index 2d4b19f..6ecf124 100644 --- a/executor/host/msgs.go +++ b/executor/host/msgs.go @@ -1,8 +1,9 @@ package host import ( - sdk "github.com/cosmos/cosmos-sdk/types" ophosttypes "github.com/initia-labs/OPinit/x/ophost/types" + + sdk "github.com/cosmos/cosmos-sdk/types" ) func (h Host) GetMsgProposeOutput( diff --git a/executor/host/query.go b/executor/host/query.go index dde4152..17b0109 100644 --- a/executor/host/query.go +++ b/executor/host/query.go @@ -1,12 +1,20 @@ package host import ( + "context" + "errors" + "fmt" + "strconv" + "strings" + "time" + sdk "github.com/cosmos/cosmos-sdk/types" query "github.com/cosmos/cosmos-sdk/types/query" ophosttypes "github.com/initia-labs/OPinit/x/ophost/types" "github.com/initia-labs/opinit-bots-go/node/rpcclient" + "github.com/initia-labs/opinit-bots-go/types" ) func (h Host) GetAddress() sdk.AccAddress { @@ -17,15 +25,15 @@ func (h Host) GetAddressStr() (string, error) { return h.node.MustGetBroadcaster().GetAddressString() } -func (h Host) QueryLastOutput() (*ophosttypes.QueryOutputProposalResponse, error) { +func (h Host) QueryLastOutput(ctx context.Context, bridgeId uint64) (*ophosttypes.QueryOutputProposalResponse, error) { req := &ophosttypes.QueryOutputProposalsRequest{ - BridgeId: uint64(h.bridgeId), + BridgeId: bridgeId, Pagination: &query.PageRequest{ Limit: 1, Reverse: true, }, } - ctx, cancel := rpcclient.GetQueryContext(0) + ctx, cancel := rpcclient.GetQueryContext(ctx, 0) defer cancel() res, err := h.ophostQueryClient.OutputProposals(ctx, req) @@ -38,22 +46,133 @@ func (h Host) QueryLastOutput() (*ophosttypes.QueryOutputProposalResponse, error return &res.OutputProposals[0], nil } -func (h Host) QueryOutput(outputIndex uint64) (*ophosttypes.QueryOutputProposalResponse, error) { +func (h Host) QueryOutput(ctx context.Context, bridgeId uint64, outputIndex uint64) (*ophosttypes.QueryOutputProposalResponse, error) { req := &ophosttypes.QueryOutputProposalRequest{ - BridgeId: uint64(h.bridgeId), + BridgeId: bridgeId, OutputIndex: outputIndex, } - ctx, cancel := rpcclient.GetQueryContext(0) + ctx, cancel := rpcclient.GetQueryContext(ctx, 0) defer cancel() return h.ophostQueryClient.OutputProposal(ctx, req) } -func (h Host) QueryBatchInfos() (*ophosttypes.QueryBatchInfosResponse, error) { +// QueryOutputByL2BlockNumber queries the last output proposal before the given L2 block number +func (h Host) QueryOutputByL2BlockNumber(ctx context.Context, bridgeId uint64, l2BlockNumber uint64) (*ophosttypes.QueryOutputProposalResponse, error) { + start, err := h.QueryOutput(ctx, bridgeId, 1) + if err != nil { + if strings.Contains(err.Error(), "not found") { + return nil, nil + } + return nil, err + } + end, err := h.QueryLastOutput(ctx, bridgeId) + if err != nil { + return nil, err + } else if end == nil { + return nil, nil + } + + for { + if start.OutputProposal.L2BlockNumber >= l2BlockNumber { + if start.OutputIndex != 1 { + return h.QueryOutput(ctx, bridgeId, start.OutputIndex-1) + } + return nil, nil + } else if end.OutputProposal.L2BlockNumber < l2BlockNumber { + return end, nil + } else if end.OutputIndex-start.OutputIndex <= 1 { + return start, nil + } + + midIndex := (start.OutputIndex + end.OutputIndex) / 2 + output, err := h.QueryOutput(ctx, bridgeId, midIndex) + if err != nil { + return nil, err + } + + if output.OutputProposal.L2BlockNumber <= l2BlockNumber { + start = output + } else { + end = output + } + } +} + +func (h Host) QueryCreateBridgeHeight(ctx context.Context, bridgeId uint64) (uint64, error) { + ctx, cancel := rpcclient.GetQueryContext(ctx, 0) + defer cancel() + + query := fmt.Sprintf("%s.%s = %d", + ophosttypes.EventTypeCreateBridge, + ophosttypes.AttributeKeyBridgeId, + bridgeId, + ) + perPage := 1 + res, err := h.node.GetRPCClient().TxSearch(ctx, query, false, nil, &perPage, "desc") + if err != nil { + return 0, err + } + if len(res.Txs) == 0 { + // bridge not found + return 0, errors.New("bridge not found") + } + return uint64(res.Txs[0].Height), nil +} + +func (h Host) QueryBatchInfos(ctx context.Context, bridgeId uint64) (*ophosttypes.QueryBatchInfosResponse, error) { req := &ophosttypes.QueryBatchInfosRequest{ - BridgeId: uint64(h.bridgeId), + BridgeId: bridgeId, } - ctx, cancel := rpcclient.GetQueryContext(0) + ctx, cancel := rpcclient.GetQueryContext(ctx, 0) defer cancel() return h.ophostQueryClient.BatchInfos(ctx, req) } + +func (h Host) QueryDepositTxHeight(ctx context.Context, bridgeId uint64, l1Sequence uint64) (uint64, error) { + if l1Sequence == 0 { + return 0, nil + } + + ctx, cancel := rpcclient.GetQueryContext(ctx, 0) + defer cancel() + + ticker := time.NewTicker(types.PollingInterval(ctx)) + defer ticker.Stop() + + query := fmt.Sprintf("%s.%s = %d", + ophosttypes.EventTypeInitiateTokenDeposit, + ophosttypes.AttributeKeyL1Sequence, + l1Sequence, + ) + per_page := 100 + for page := 1; ; page++ { + select { + case <-ctx.Done(): + return 0, ctx.Err() + case <-ticker.C: + } + + res, err := h.node.GetRPCClient().TxSearch(ctx, query, false, &page, &per_page, "asc") + if err != nil { + return 0, err + } + + for _, tx := range res.Txs { + for _, event := range tx.TxResult.Events { + if event.Type == ophosttypes.EventTypeInitiateTokenDeposit { + for _, attr := range event.Attributes { + if attr.Key == ophosttypes.AttributeKeyBridgeId && attr.Value == strconv.FormatUint(bridgeId, 10) { + return uint64(tx.Height), nil + } + } + } + } + } + + if page*per_page >= res.TotalCount { + break + } + } + return 0, fmt.Errorf("failed to fetch deposit tx with l1 Sequence: %d", l1Sequence) +} diff --git a/executor/host/status.go b/executor/host/status.go new file mode 100644 index 0000000..4e9f3e7 --- /dev/null +++ b/executor/host/status.go @@ -0,0 +1,23 @@ +package host + +import ( + nodetypes "github.com/initia-labs/opinit-bots-go/node/types" +) + +type Status struct { + Node nodetypes.Status `json:"node"` + LastProposedOutputIndex uint64 `json:"last_proposed_output_index"` + LastProposedOutputL2BlockNumber uint64 `json:"last_proposed_output_l2_block_number"` +} + +func (h Host) GetStatus() Status { + return Status{ + Node: h.node.GetStatus(), + LastProposedOutputIndex: h.lastProposedOutputIndex, + LastProposedOutputL2BlockNumber: h.lastProposedOutputL2BlockNumber, + } +} + +func (h Host) GetNodeStatus() nodetypes.Status { + return h.node.GetStatus() +} diff --git a/executor/host/withdraw.go b/executor/host/withdraw.go index e79582a..7e6ac85 100644 --- a/executor/host/withdraw.go +++ b/executor/host/withdraw.go @@ -1,6 +1,7 @@ package host import ( + "context" "encoding/base64" "encoding/hex" "strconv" @@ -10,7 +11,7 @@ import ( "go.uber.org/zap" ) -func (h *Host) proposeOutputHandler(args nodetypes.EventHandlerArgs) error { +func (h *Host) proposeOutputHandler(_ context.Context, args nodetypes.EventHandlerArgs) error { var bridgeId, l2BlockNumber, outputIndex uint64 var proposer string var outputRoot []byte @@ -44,7 +45,8 @@ func (h *Host) proposeOutputHandler(args nodetypes.EventHandlerArgs) error { } h.handleProposeOutput(bridgeId, proposer, outputIndex, l2BlockNumber, outputRoot) - + h.lastProposedOutputIndex = outputIndex + h.lastProposedOutputL2BlockNumber = l2BlockNumber return nil } @@ -58,7 +60,7 @@ func (h *Host) handleProposeOutput(bridgeId uint64, proposer string, outputIndex ) } -func (h *Host) finalizeWithdrawalHandler(args nodetypes.EventHandlerArgs) error { +func (h *Host) finalizeWithdrawalHandler(_ context.Context, args nodetypes.EventHandlerArgs) error { var bridgeId uint64 var outputIndex, l2Sequence uint64 var from, to, l1Denom, l2Denom, amount string diff --git a/executor/status.go b/executor/status.go new file mode 100644 index 0000000..388d07a --- /dev/null +++ b/executor/status.go @@ -0,0 +1,35 @@ +package executor + +import ( + "github.com/initia-labs/opinit-bots-go/executor/batch" + "github.com/initia-labs/opinit-bots-go/executor/child" + "github.com/initia-labs/opinit-bots-go/executor/host" + nodetypes "github.com/initia-labs/opinit-bots-go/node/types" +) + +type Status struct { + BridgeId int64 `json:"bridge_id"` + Host host.Status `json:"host,omitempty"` + Child child.Status `json:"child,omitempty"` + Batch batch.Status `json:"batch,omitempty"` + DA nodetypes.Status `json:"da,omitempty"` +} + +func (ex Executor) GetStatus() Status { + s := Status{ + BridgeId: ex.host.BridgeId(), + } + if ex.host != nil { + s.Host = ex.host.GetStatus() + } + if ex.child != nil { + s.Child = ex.child.GetStatus() + } + if ex.batch != nil { + s.Batch = ex.batch.GetStatus() + if ex.batch.DA() != nil { + s.DA = ex.batch.DA().GetNodeStatus() + } + } + return s +} diff --git a/executor/types/batch.go b/executor/types/batch.go index 2d747f6..1b4d8f6 100644 --- a/executor/types/batch.go +++ b/executor/types/batch.go @@ -3,9 +3,11 @@ package types import ( "context" - sdk "github.com/cosmos/cosmos-sdk/types" btypes "github.com/initia-labs/opinit-bots-go/node/broadcaster/types" + nodetypes "github.com/initia-labs/opinit-bots-go/node/types" "github.com/initia-labs/opinit-bots-go/types" + + sdk "github.com/cosmos/cosmos-sdk/types" ) type DANode interface { @@ -14,6 +16,7 @@ type DANode interface { CreateBatchMsg([]byte) (sdk.Msg, error) BroadcastMsgs(btypes.ProcessedMsgs) ProcessedMsgsToRawKV(processedMsgs []btypes.ProcessedMsgs, delete bool) ([]types.RawKV, error) + GetNodeStatus() nodetypes.Status } // BatchHeader is the header of a batch diff --git a/executor/types/config.go b/executor/types/config.go index ed40d76..41292e7 100644 --- a/executor/types/config.go +++ b/executor/types/config.go @@ -2,11 +2,34 @@ package types import ( "errors" + "time" btypes "github.com/initia-labs/opinit-bots-go/node/broadcaster/types" nodetypes "github.com/initia-labs/opinit-bots-go/node/types" ) +type NodeConfig struct { + ChainID string `json:"chain_id"` + Bech32Prefix string `json:"bech32_prefix"` + RPCAddress string `json:"rpc_address"` + GasPrice string `json:"gas_price"` + GasAdjustment float64 `json:"gas_adjustment"` + TxTimeout int64 `json:"tx_timeout"` // seconds +} + +func (nc NodeConfig) Validate() error { + if nc.ChainID == "" { + return errors.New("chain ID is required") + } + if nc.Bech32Prefix == "" { + return errors.New("bech32 prefix is required") + } + if nc.RPCAddress == "" { + return errors.New("RPC address is required") + } + return nil +} + type Config struct { // Version is the version used to build output root. Version uint8 `json:"version"` @@ -14,21 +37,12 @@ type Config struct { // ListenAddress is the address to listen for incoming requests. ListenAddress string `json:"listen_address"` - L1RPCAddress string `json:"l1_rpc_address"` - L2RPCAddress string `json:"l2_rpc_address"` - DARPCAddress string `json:"da_rpc_address"` - - L1GasPrice string `json:"l1_gas_price"` - L2GasPrice string `json:"l2_gas_price"` - DAGasPrice string `json:"da_gas_price"` - - L1ChainID string `json:"l1_chain_id"` - L2ChainID string `json:"l2_chain_id"` - DAChainID string `json:"da_chain_id"` - - L1Bech32Prefix string `json:"l1_bech32_prefix"` - L2Bech32Prefix string `json:"l2_bech32_prefix"` - DABech32Prefix string `json:"da_bech32_prefix"` + // L1Node is the configuration for the l1 node. + L1Node NodeConfig `json:"l1_node"` + // L2Node is the configuration for the l2 node. + L2Node NodeConfig `json:"l2_node"` + // DANode is the configuration for the data availability node. + DANode NodeConfig `json:"da_node"` // OutputSubmitter is the key name in the keyring for the output submitter, // which is used to relay the output transaction from l2 to l1. @@ -51,11 +65,15 @@ type Config struct { MaxChunkSize int64 `json:"max_chunk_size"` // MaxSubmissionTime is the maximum time to submit a batch. MaxSubmissionTime int64 `json:"max_submission_time"` // seconds -} -type HostConfig struct { - nodetypes.NodeConfig - RelayOracle bool `json:"relay_oracle"` + // L2StartHeight is the height to start the l2 node. If it is 0, it will start from the latest height. + // If the latest height stored in the db is not 0, this config is ignored. + // L2 starts from the last submitted output l2 block number + 1 before L2StartHeight. + // L1 starts from the block number of the output tx + 1 + L2StartHeight int64 `json:"l2_start_height"` + // BatchStartHeight is the height to start the batch. If it is 0, it will start from the latest height. + // If the latest height stored in the db is not 0, this config is ignored. + BatchStartHeight int64 `json:"batch_start_height"` } func DefaultConfig() *Config { @@ -63,21 +81,32 @@ func DefaultConfig() *Config { Version: 1, ListenAddress: "localhost:3000", - L1RPCAddress: "tcp://localhost:26657", - L2RPCAddress: "tcp://localhost:27657", - DARPCAddress: "tcp://localhost:28657", - - L1GasPrice: "0.15uinit", - L2GasPrice: "", - DAGasPrice: "", + L1Node: NodeConfig{ + ChainID: "testnet-l1-1", + Bech32Prefix: "init", + RPCAddress: "tcp://localhost:26657", + GasPrice: "0.15uinit", + GasAdjustment: 1.5, + TxTimeout: 60, + }, - L1ChainID: "testnet-l1-1", - L2ChainID: "testnet-l2-1", - DAChainID: "testnet-l3-1", + L2Node: NodeConfig{ + ChainID: "testnet-l2-1", + Bech32Prefix: "init", + RPCAddress: "tcp://localhost:27657", + GasPrice: "", + GasAdjustment: 1.5, + TxTimeout: 60, + }, - L1Bech32Prefix: "init", - L2Bech32Prefix: "init", - DABech32Prefix: "init", + DANode: NodeConfig{ + ChainID: "testnet-l1-1", + Bech32Prefix: "init", + RPCAddress: "tcp://localhost:26657", + GasPrice: "0.15uinit", + GasAdjustment: 1.5, + TxTimeout: 60, + }, OutputSubmitter: "", BridgeExecutor: "", @@ -87,6 +116,9 @@ func DefaultConfig() *Config { MaxChunks: 5000, MaxChunkSize: 300000, // 300KB MaxSubmissionTime: 60 * 60, // 1 hour + + L2StartHeight: 0, + BatchStartHeight: 0, } } @@ -95,53 +127,37 @@ func (cfg Config) Validate() error { return errors.New("version is required") } - if cfg.L1RPCAddress == "" { - return errors.New("L1 RPC URL is required") - } - if cfg.L2RPCAddress == "" { - return errors.New("L2 RPC URL is required") - } - if cfg.DARPCAddress == "" { - return errors.New("L2 RPC URL is required") - } - - if cfg.L1ChainID == "" { - return errors.New("L1 chain ID is required") - } - if cfg.L2ChainID == "" { - return errors.New("L2 chain ID is required") - } - if cfg.DAChainID == "" { - return errors.New("L2 RPC URL is required") - } if cfg.ListenAddress == "" { return errors.New("listen address is required") } - if cfg.L1Bech32Prefix == "" { - return errors.New("L1 bech32 prefix is required") + if err := cfg.L1Node.Validate(); err != nil { + return err } - if cfg.L2Bech32Prefix == "" { - return errors.New("L2 bech32 prefix is required") - } - if cfg.DABech32Prefix == "" { - return errors.New("DA bech32 prefix is required") + + if err := cfg.L2Node.Validate(); err != nil { + return err } + if err := cfg.DANode.Validate(); err != nil { + return err + } return nil } func (cfg Config) L1NodeConfig(homePath string) nodetypes.NodeConfig { nc := nodetypes.NodeConfig{ - RPC: cfg.L1RPCAddress, + RPC: cfg.L1Node.RPCAddress, ProcessType: nodetypes.PROCESS_TYPE_DEFAULT, } if cfg.OutputSubmitter != "" { nc.BroadcasterConfig = &btypes.BroadcasterConfig{ - ChainID: cfg.L1ChainID, - GasPrice: cfg.L1GasPrice, - Bech32Prefix: cfg.L1Bech32Prefix, + ChainID: cfg.L1Node.ChainID, + GasPrice: cfg.L1Node.GasPrice, + GasAdjustment: cfg.L1Node.GasAdjustment, + Bech32Prefix: cfg.L1Node.Bech32Prefix, + TxTimeout: time.Duration(cfg.L1Node.TxTimeout) * time.Second, KeyringConfig: btypes.KeyringConfig{ Name: cfg.OutputSubmitter, HomePath: homePath, @@ -154,15 +170,17 @@ func (cfg Config) L1NodeConfig(homePath string) nodetypes.NodeConfig { func (cfg Config) L2NodeConfig(homePath string) nodetypes.NodeConfig { nc := nodetypes.NodeConfig{ - RPC: cfg.L2RPCAddress, + RPC: cfg.L2Node.RPCAddress, ProcessType: nodetypes.PROCESS_TYPE_DEFAULT, } if cfg.BridgeExecutor != "" { nc.BroadcasterConfig = &btypes.BroadcasterConfig{ - ChainID: cfg.L2ChainID, - GasPrice: cfg.L2GasPrice, - Bech32Prefix: cfg.L2Bech32Prefix, + ChainID: cfg.L2Node.ChainID, + GasPrice: cfg.L2Node.GasPrice, + GasAdjustment: cfg.L2Node.GasAdjustment, + Bech32Prefix: cfg.L2Node.Bech32Prefix, + TxTimeout: time.Duration(cfg.L2Node.TxTimeout) * time.Second, KeyringConfig: btypes.KeyringConfig{ Name: cfg.BridgeExecutor, HomePath: homePath, @@ -175,12 +193,14 @@ func (cfg Config) L2NodeConfig(homePath string) nodetypes.NodeConfig { func (cfg Config) DANodeConfig(homePath string) nodetypes.NodeConfig { nc := nodetypes.NodeConfig{ - RPC: cfg.DARPCAddress, + RPC: cfg.DANode.RPCAddress, ProcessType: nodetypes.PROCESS_TYPE_DEFAULT, BroadcasterConfig: &btypes.BroadcasterConfig{ - ChainID: cfg.DAChainID, - GasPrice: cfg.DAGasPrice, - Bech32Prefix: cfg.DABech32Prefix, + ChainID: cfg.DANode.ChainID, + GasPrice: cfg.DANode.GasPrice, + GasAdjustment: cfg.DANode.GasAdjustment, + Bech32Prefix: cfg.DANode.Bech32Prefix, + TxTimeout: time.Duration(cfg.DANode.TxTimeout) * time.Second, KeyringConfig: btypes.KeyringConfig{ HomePath: homePath, }, diff --git a/go.mod b/go.mod index 28c6c92..351c4a0 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,7 @@ require ( github.com/cometbft/cometbft v0.38.10 github.com/cosmos/cosmos-sdk v0.50.8 github.com/gofiber/fiber/v2 v2.52.5 - github.com/initia-labs/OPinit v0.4.1 + github.com/initia-labs/OPinit v0.4.2 github.com/pkg/errors v0.9.1 github.com/spf13/cobra v1.8.1 github.com/spf13/viper v1.19.0 diff --git a/go.sum b/go.sum index 3ae95f9..613bbc8 100644 --- a/go.sum +++ b/go.sum @@ -476,8 +476,8 @@ github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANyt github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= github.com/influxdata/influxdb1-client v0.0.0-20191209144304-8bf82d3c094d/go.mod h1:qj24IKcXYK6Iy9ceXlo3Tc+vtHo9lIhSX5JddghvEPo= -github.com/initia-labs/OPinit v0.4.1 h1:g6IVEAOe2X31pgjk/q0zg4R1GfNj2QP3q5s3HMcWm8w= -github.com/initia-labs/OPinit v0.4.1/go.mod h1:n0eqwOnVGE1vuTnW+3jzyEXfE4ndTM0vCRGmPu9VvUc= +github.com/initia-labs/OPinit v0.4.2 h1:cfE6LXb3EquDK1/UTDT62o9c2HOQL0E9pWTr5BWAf8g= +github.com/initia-labs/OPinit v0.4.2/go.mod h1:bM+tav+ER4uC6U84PB5vgnRUNyjc/phNgHGYQX9ALhg= github.com/initia-labs/OPinit/api v0.4.1 h1:Q8etW92LiwekKZxzDYVFdiHF3uOpEA4nyajy8zpcxB0= github.com/initia-labs/OPinit/api v0.4.1/go.mod h1:Xy/Nt3ubXLQ4zKn0m7RuQOM1sj8TVdlNNyek21TGYR0= github.com/jhump/protoreflect v1.15.3 h1:6SFRuqU45u9hIZPJAoZ8c28T3nK64BNdp9w6jFonzls= diff --git a/keys/codec.go b/keys/codec.go index 9b92e74..890e80d 100644 --- a/keys/codec.go +++ b/keys/codec.go @@ -2,6 +2,8 @@ package keys import ( "cosmossdk.io/x/tx/signing" + "github.com/cosmos/gogoproto/proto" + "github.com/cosmos/cosmos-sdk/client" "github.com/cosmos/cosmos-sdk/codec" codecaddress "github.com/cosmos/cosmos-sdk/codec/address" @@ -9,7 +11,6 @@ import ( "github.com/cosmos/cosmos-sdk/std" sdk "github.com/cosmos/cosmos-sdk/types" "github.com/cosmos/cosmos-sdk/x/auth/tx" - "github.com/cosmos/gogoproto/proto" ) type RegisterInterfaces func(registry codectypes.InterfaceRegistry) diff --git a/keys/keyring.go b/keys/keyring.go index ee3a9fd..3897b44 100644 --- a/keys/keyring.go +++ b/keys/keyring.go @@ -3,9 +3,10 @@ package keys import ( "io" + "github.com/cosmos/go-bip39" + "github.com/cosmos/cosmos-sdk/codec" "github.com/cosmos/cosmos-sdk/crypto/keyring" - "github.com/cosmos/go-bip39" ) func GetKeyBase(chainId string, dir string, cdc codec.Codec, userInput io.Reader) (keyring.Keyring, error) { diff --git a/merkle/merkle.go b/merkle/merkle.go index 0def1ce..6e1c8fa 100644 --- a/merkle/merkle.go +++ b/merkle/merkle.go @@ -170,6 +170,11 @@ func (m *Merkle) GetWorkingTreeLeafCount() uint64 { return m.workingTree.LeafCount } +// GetStartLeafIndex returns the start leaf index of the working tree. +func (m *Merkle) GetStartLeafIndex() uint64 { + return m.workingTree.StartLeafIndex +} + func (m *Merkle) saveNode(height uint8, localNodeIndex uint64, data []byte) error { return m.db.Set(merkletypes.PrefixedNodeKey(m.GetWorkingTreeIndex(), height, localNodeIndex), data) } diff --git a/node/broadcaster/broadcaster.go b/node/broadcaster/broadcaster.go index 8eaddab..53a7b73 100644 --- a/node/broadcaster/broadcaster.go +++ b/node/broadcaster/broadcaster.go @@ -8,13 +8,14 @@ import ( rpccoretypes "github.com/cometbft/cometbft/rpc/core/types" + "github.com/pkg/errors" + "github.com/cosmos/cosmos-sdk/client" "github.com/cosmos/cosmos-sdk/client/tx" "github.com/cosmos/cosmos-sdk/codec" "github.com/cosmos/cosmos-sdk/crypto/keyring" sdk "github.com/cosmos/cosmos-sdk/types" "github.com/cosmos/cosmos-sdk/types/tx/signing" - "github.com/pkg/errors" btypes "github.com/initia-labs/opinit-bots-go/node/broadcaster/types" "github.com/initia-labs/opinit-bots-go/node/rpcclient" @@ -134,7 +135,7 @@ func (b *Broadcaster) prepareBroadcaster(_ /*lastBlockHeight*/ uint64, lastBlock WithAccountRetriever(b). WithChainID(b.cfg.ChainID). WithTxConfig(b.txConfig). - WithGasAdjustment(btypes.GAS_ADJUSTMENT). + WithGasAdjustment(b.cfg.GasAdjustment). WithGasPrices(b.cfg.GasPrice). WithKeybase(b.keyBase). WithSignMode(signing.SignMode_SIGN_MODE_DIRECT) @@ -156,8 +157,10 @@ func (b *Broadcaster) prepareBroadcaster(_ /*lastBlockHeight*/ uint64, lastBlock pendingTxTime := time.Unix(0, loadedPendingTxs[0].Timestamp) // if we have pending txs, wait until timeout - if timeoutTime := pendingTxTime.Add(btypes.TX_TIMEOUT); lastBlockTime.Before(timeoutTime) { - timer := time.NewTimer(timeoutTime.Sub(lastBlockTime)) + if timeoutTime := pendingTxTime.Add(b.cfg.TxTimeout); lastBlockTime.Before(timeoutTime) { + waitingTime := timeoutTime.Sub(lastBlockTime) + timer := time.NewTimer(waitingTime) + b.logger.Info("waiting for pending txs to be processed", zap.Duration("waiting_time", waitingTime)) <-timer.C } diff --git a/node/broadcaster/process.go b/node/broadcaster/process.go index 3e5768d..f7a5b55 100644 --- a/node/broadcaster/process.go +++ b/node/broadcaster/process.go @@ -20,16 +20,15 @@ func (b Broadcaster) GetHeight() uint64 { // HandleNewBlock is called when a new block is received. func (b *Broadcaster) HandleNewBlock(block *rpccoretypes.ResultBlock, blockResult *rpccoretypes.ResultBlockResults, latestChainHeight uint64) error { - // check pending txs first for _, tx := range block.Block.Txs { - if b.lenLocalPendingTx() == 0 { + if b.LenLocalPendingTx() == 0 { break } // check if the first pending tx is included in the block if pendingTx := b.peekLocalPendingTx(); btypes.TxHash(tx) == pendingTx.TxHash { - err := b.RemovePendingTx(block.Block.Height, pendingTx.TxHash, pendingTx.Sequence) + err := b.RemovePendingTx(block.Block.Height, pendingTx.TxHash, pendingTx.Sequence, pendingTx.MsgTypes) if err != nil { return err } @@ -38,10 +37,10 @@ func (b *Broadcaster) HandleNewBlock(block *rpccoretypes.ResultBlock, blockResul // check timeout of pending txs // @sh-cha: should we rebroadcast pending txs? or rasing monitoring alert? - if length := b.lenLocalPendingTx(); length > 0 { + if length := b.LenLocalPendingTx(); length > 0 { b.logger.Debug("remaining pending txs", zap.Int64("height", block.Block.Height), zap.Int("count", length)) pendingTxTime := time.Unix(0, b.peekLocalPendingTx().Timestamp) - if block.Block.Time.After(pendingTxTime.Add(btypes.TX_TIMEOUT)) { + if block.Block.Time.After(pendingTxTime.Add(b.cfg.TxTimeout)) { panic(fmt.Errorf("something wrong, pending txs are not processed for a long time; current block time: %s, pending tx processing time: %s", block.Block.Time.UTC().String(), pendingTxTime.UTC().String())) } } @@ -53,14 +52,14 @@ func (b *Broadcaster) HandleNewBlock(block *rpccoretypes.ResultBlock, blockResul } // CheckPendingTx query tx info to check if pending tx is processed. -func (b *Broadcaster) CheckPendingTx() (*btypes.PendingTxInfo, *rpccoretypes.ResultTx, error) { - if b.lenLocalPendingTx() == 0 { +func (b *Broadcaster) CheckPendingTx(ctx context.Context) (*btypes.PendingTxInfo, *rpccoretypes.ResultTx, error) { + if b.LenLocalPendingTx() == 0 { return nil, nil, nil } pendingTx := b.peekLocalPendingTx() pendingTxTime := time.Unix(0, b.peekLocalPendingTx().Timestamp) - if time.Now().After(pendingTxTime.Add(btypes.TX_TIMEOUT)) { + if time.Now().After(pendingTxTime.Add(b.cfg.TxTimeout)) { // @sh-cha: should we rebroadcast pending txs? or rasing monitoring alert? panic(fmt.Errorf("something wrong, pending txs are not processed for a long time; current block time: %s, pending tx processing time: %s", time.Now().UTC().String(), pendingTxTime.UTC().String())) } @@ -69,9 +68,9 @@ func (b *Broadcaster) CheckPendingTx() (*btypes.PendingTxInfo, *rpccoretypes.Res if err != nil { return nil, nil, err } - res, err := b.rpcClient.QueryTx(txHash) + res, err := b.rpcClient.QueryTx(ctx, txHash) if err != nil { - b.logger.Debug("failed to query tx", zap.String("txHash", pendingTx.TxHash), zap.String("error", err.Error())) + b.logger.Debug("failed to query tx", zap.String("tx_hash", pendingTx.TxHash), zap.String("error", err.Error())) return nil, nil, nil } @@ -80,13 +79,13 @@ func (b *Broadcaster) CheckPendingTx() (*btypes.PendingTxInfo, *rpccoretypes.Res // RemovePendingTx remove pending tx from local pending txs. // It is called when the pending tx is included in the block. -func (b *Broadcaster) RemovePendingTx(blockHeight int64, txHash string, sequence uint64) error { +func (b *Broadcaster) RemovePendingTx(blockHeight int64, txHash string, sequence uint64, msgTypes []string) error { err := b.deletePendingTx(sequence) if err != nil { return err } - b.logger.Debug("tx inserted", zap.Int64("height", blockHeight), zap.Uint64("sequence", sequence), zap.String("txHash", txHash)) + b.logger.Info("tx inserted", zap.Int64("height", blockHeight), zap.Uint64("sequence", sequence), zap.String("tx_hash", txHash), zap.Strings("msg_types", msgTypes)) b.dequeueLocalPendingTx() return nil @@ -103,11 +102,6 @@ func (b *Broadcaster) Start(ctx context.Context) error { case data := <-b.txChannel: var err error for retry := 1; retry <= 10; retry++ { - select { - case <-ctx.Done(): - return nil - default: - } err = b.handleProcessedMsgs(ctx, data) if err == nil { break @@ -115,11 +109,16 @@ func (b *Broadcaster) Start(ctx context.Context) error { break } else if !data.Save { // if the message does not need to be saved, we can skip retry + err = nil break } - b.logger.Warn("retry", zap.Int("count", retry), zap.String("error", err.Error())) - - time.Sleep(30 * time.Second) + b.logger.Warn("retry to handle processed msgs after 30 seconds", zap.Int("count", retry), zap.String("error", err.Error())) + timer := time.NewTimer(30 * time.Second) + select { + case <-ctx.Done(): + return nil + case <-timer.C: + } } if err != nil { return errors.Wrap(err, "failed to handle processed msgs") diff --git a/node/broadcaster/tx.go b/node/broadcaster/tx.go index 753fbc8..6bfa698 100644 --- a/node/broadcaster/tx.go +++ b/node/broadcaster/tx.go @@ -119,6 +119,7 @@ func (b *Broadcaster) handleProcessedMsgs(ctx context.Context, data btypes.Proce Tx: txBytes, TxHash: txHash, Timestamp: data.Timestamp, + MsgTypes: data.GetMsgTypes(), Save: data.Save, } @@ -191,7 +192,7 @@ func (b Broadcaster) adjustEstimatedGas(gasUsed uint64) (uint64, error) { return gasUsed, nil } - gas := btypes.GAS_ADJUSTMENT * float64(gasUsed) + gas := b.cfg.GasAdjustment * float64(gasUsed) if math.IsInf(gas, 1) { return 0, fmt.Errorf("infinite gas used") } @@ -243,7 +244,7 @@ func (b *Broadcaster) peekLocalPendingTx() btypes.PendingTxInfo { return b.pendingTxs[0] } -func (b Broadcaster) lenLocalPendingTx() int { +func (b Broadcaster) LenLocalPendingTx() int { b.pendingTxMu.Lock() defer b.pendingTxMu.Unlock() diff --git a/node/broadcaster/types/config.go b/node/broadcaster/types/config.go index 9551a14..d1fbd63 100644 --- a/node/broadcaster/types/config.go +++ b/node/broadcaster/types/config.go @@ -3,11 +3,13 @@ package types import ( "context" "fmt" + "time" + + "github.com/initia-labs/opinit-bots-go/keys" "github.com/cosmos/cosmos-sdk/codec" "github.com/cosmos/cosmos-sdk/crypto/keyring" sdk "github.com/cosmos/cosmos-sdk/types" - "github.com/initia-labs/opinit-bots-go/keys" ) type BuildTxWithMessagesFn func(context.Context, []sdk.Msg) ([]byte, string, error) @@ -20,6 +22,12 @@ type BroadcasterConfig struct { // GasPrice is the gas price. GasPrice string + // GasAdjustment is the gas adjustment. + GasAdjustment float64 + + // TxTimeout is the transaction timeout. + TxTimeout time.Duration + // Bech32Prefix is the Bech32 prefix. Bech32Prefix string @@ -63,6 +71,14 @@ func (bc BroadcasterConfig) Validate() error { return fmt.Errorf("pending tx to processed msgs is nil") } + if bc.GasAdjustment == 0 { + return fmt.Errorf("gas adjustment is zero") + } + + if bc.TxTimeout == 0 { + return fmt.Errorf("tx timeout is zero") + } + return bc.KeyringConfig.Validate() } diff --git a/node/broadcaster/types/consts.go b/node/broadcaster/types/consts.go deleted file mode 100644 index 5ea79c3..0000000 --- a/node/broadcaster/types/consts.go +++ /dev/null @@ -1,8 +0,0 @@ -package types - -import "time" - -const ( - GAS_ADJUSTMENT = 1.5 - TX_TIMEOUT = 60 * time.Second -) diff --git a/node/broadcaster/types/db.go b/node/broadcaster/types/db.go index 9d09e84..b4c8cf7 100644 --- a/node/broadcaster/types/db.go +++ b/node/broadcaster/types/db.go @@ -3,6 +3,7 @@ package types import ( "encoding/json" "fmt" + "strings" "time" "github.com/cosmos/cosmos-sdk/codec" @@ -10,11 +11,12 @@ import ( ) type PendingTxInfo struct { - ProcessedHeight uint64 `json:"height"` - Sequence uint64 `json:"sequence"` - Tx []byte `json:"tx"` - TxHash string `json:"tx_hash"` - Timestamp int64 `json:"timestamp"` + ProcessedHeight uint64 `json:"height"` + Sequence uint64 `json:"sequence"` + Tx []byte `json:"tx"` + TxHash string `json:"tx_hash"` + Timestamp int64 `json:"timestamp"` + MsgTypes []string `json:"msg_types"` // Save is true if the pending tx should be saved until processed. // Save is false if the pending tx can be discarded even if it is not processed @@ -32,7 +34,7 @@ func (p *PendingTxInfo) Unmarshal(data []byte) error { func (p PendingTxInfo) String() string { tsStr := time.Unix(0, p.Timestamp).UTC().String() - return fmt.Sprintf("Pending tx: %s, sequence: %d at height: %d, %s", p.TxHash, p.Sequence, p.ProcessedHeight, tsStr) + return fmt.Sprintf("Pending tx: %s, msgs: %s, sequence: %d at height: %d, %s", p.TxHash, strings.Join(p.MsgTypes, ","), p.Sequence, p.ProcessedHeight, tsStr) } type ProcessedMsgs struct { @@ -91,10 +93,14 @@ func (p *ProcessedMsgs) UnmarshalInterfaceJSON(cdc codec.Codec, data []byte) err } func (p ProcessedMsgs) String() string { - msgStr := "" + tsStr := time.Unix(0, p.Timestamp).UTC().String() + return fmt.Sprintf("Pending msgs: %s at %s", strings.Join(p.GetMsgTypes(), ","), tsStr) +} + +func (p ProcessedMsgs) GetMsgTypes() []string { + msgTypes := make([]string, 0, len(p.Msgs)) for _, msg := range p.Msgs { - msgStr += sdk.MsgTypeURL(msg) + "," + msgTypes = append(msgTypes, sdk.MsgTypeURL(msg)) } - tsStr := time.Unix(0, p.Timestamp).UTC().String() - return fmt.Sprintf("Pending msgs: %s at %s", msgStr, tsStr) + return msgTypes } diff --git a/node/db.go b/node/db.go index 7461496..2f32a47 100644 --- a/node/db.go +++ b/node/db.go @@ -14,9 +14,11 @@ func (n *Node) SetSyncInfo(height uint64) { } } -func (n *Node) loadSyncInfo() error { +func (n *Node) loadSyncInfo(startHeight uint64) error { data, err := n.db.Get(nodetypes.LastProcessedBlockHeightKey) if err == dbtypes.ErrNotFound { + n.SetSyncInfo(startHeight) + n.startHeightInitialized = true return nil } else if err != nil { return err diff --git a/node/node.go b/node/node.go index 3f11989..7726aba 100644 --- a/node/node.go +++ b/node/node.go @@ -7,14 +7,15 @@ import ( "github.com/pkg/errors" "cosmossdk.io/core/address" - "github.com/cosmos/cosmos-sdk/client" - "github.com/cosmos/cosmos-sdk/codec" "github.com/initia-labs/opinit-bots-go/node/broadcaster" "github.com/initia-labs/opinit-bots-go/node/rpcclient" nodetypes "github.com/initia-labs/opinit-bots-go/node/types" "github.com/initia-labs/opinit-bots-go/types" "go.uber.org/zap" "golang.org/x/sync/errgroup" + + "github.com/cosmos/cosmos-sdk/client" + "github.com/cosmos/cosmos-sdk/codec" ) type Node struct { @@ -36,6 +37,7 @@ type Node struct { rawBlockHandler nodetypes.RawBlockHandlerFn // status info + startHeightInitialized bool lastProcessedBlockHeight uint64 running bool } @@ -62,39 +64,42 @@ func NewNode(cfg nodetypes.NodeConfig, db types.DB, logger *zap.Logger, cdc code cdc: cdc, txConfig: txConfig, } - // check if node is catching up - status, err := rpcClient.Status(context.Background()) + status, err := n.rpcClient.Status(context.Background()) if err != nil { return nil, err } if status.SyncInfo.CatchingUp { return nil, errors.New("node is catching up") } - // create broadcaster - if cfg.BroadcasterConfig != nil { + if n.cfg.BroadcasterConfig != nil { n.broadcaster, err = broadcaster.NewBroadcaster( - *cfg.BroadcasterConfig, - db, - logger, - cdc, - txConfig, - rpcClient, + *n.cfg.BroadcasterConfig, + n.db, + n.logger, + n.cdc, + n.txConfig, + n.rpcClient, status, ) if err != nil { return nil, errors.Wrap(err, "failed to create broadcaster") } } + return n, nil +} +// StartHeight is the height to start processing. +// If it is 0, the latest height is used. +// If the latest height exists in the database, this is ignored. +func (n *Node) Initialize(startHeight uint64) error { // load sync info - err = n.loadSyncInfo() - if err != nil { - return nil, err - } + return n.loadSyncInfo(startHeight) +} - return n, nil +func (n *Node) HeightInitialized() bool { + return n.startHeightInitialized } func (n *Node) Start(ctx context.Context) { @@ -184,6 +189,21 @@ func (n Node) MustGetBroadcaster() *broadcaster.Broadcaster { return n.broadcaster } +func (n Node) GetStatus() nodetypes.Status { + s := nodetypes.Status{} + if n.cfg.ProcessType != nodetypes.PROCESS_TYPE_ONLY_BROADCAST { + s.LastProcessedBlockHeight = n.GetHeight() + } + + if n.broadcaster != nil { + s.Broadcaster = nodetypes.BroadcasterStatus{ + PendingTxs: n.broadcaster.LenLocalPendingTx(), + Sequence: n.broadcaster.GetTxf().Sequence(), + } + } + return s +} + func (n Node) GetRPCClient() *rpcclient.RPCClient { return n.rpcClient } diff --git a/node/process.go b/node/process.go index bb219a3..2bd4827 100644 --- a/node/process.go +++ b/node/process.go @@ -8,12 +8,13 @@ import ( abcitypes "github.com/cometbft/cometbft/abci/types" rpccoretypes "github.com/cometbft/cometbft/rpc/core/types" nodetypes "github.com/initia-labs/opinit-bots-go/node/types" + "github.com/initia-labs/opinit-bots-go/types" "go.uber.org/zap" ) // blockProcessLooper fetches new blocks and processes them func (n *Node) blockProcessLooper(ctx context.Context, processType nodetypes.BlockProcessType) error { - timer := time.NewTicker(nodetypes.POLLING_INTERVAL) + timer := time.NewTicker(types.PollingInterval(ctx)) defer timer.Stop() for { @@ -40,7 +41,7 @@ func (n *Node) blockProcessLooper(ctx context.Context, processType nodetypes.Blo select { case <-ctx.Done(): return nil - default: + case <-timer.C: } // TODO: may fetch blocks in batch block, blockResult, err := n.fetchNewBlock(ctx, int64(queryHeight)) @@ -50,7 +51,7 @@ func (n *Node) blockProcessLooper(ctx context.Context, processType nodetypes.Blo break } - err = n.handleNewBlock(block, blockResult, latestChainHeight) + err = n.handleNewBlock(ctx, block, blockResult, latestChainHeight) if err != nil { // TODO: handle error n.logger.Error("failed to handle new block", zap.String("error", err.Error())) @@ -67,7 +68,7 @@ func (n *Node) blockProcessLooper(ctx context.Context, processType nodetypes.Blo end = latestChainHeight } - blockBulk, err := n.rpcClient.QueryBlockBulk(start, end) + blockBulk, err := n.rpcClient.QueryBlockBulk(ctx, start, end) if err != nil { n.logger.Error("failed to fetch block bulk", zap.String("error", err.Error())) continue @@ -79,7 +80,7 @@ func (n *Node) blockProcessLooper(ctx context.Context, processType nodetypes.Blo return nil default: } - err := n.rawBlockHandler(nodetypes.RawBlockArgs{ + err := n.rawBlockHandler(ctx, nodetypes.RawBlockArgs{ BlockHeight: i, BlockBytes: blockBulk[i-start], }) @@ -110,7 +111,7 @@ func (n *Node) fetchNewBlock(ctx context.Context, height int64) (block *rpccoret return block, blockResult, nil } -func (n *Node) handleNewBlock(block *rpccoretypes.ResultBlock, blockResult *rpccoretypes.ResultBlockResults, latestChainHeight uint64) error { +func (n *Node) handleNewBlock(ctx context.Context, block *rpccoretypes.ResultBlock, blockResult *rpccoretypes.ResultBlockResults, latestChainHeight uint64) error { protoBlock, err := block.Block.ToProto() if err != nil { return err @@ -125,7 +126,7 @@ func (n *Node) handleNewBlock(block *rpccoretypes.ResultBlock, blockResult *rpcc } if n.beginBlockHandler != nil { - err := n.beginBlockHandler(nodetypes.BeginBlockArgs{ + err := n.beginBlockHandler(ctx, nodetypes.BeginBlockArgs{ BlockID: block.BlockID.Hash, Block: *protoBlock, LatestHeight: latestChainHeight, @@ -137,7 +138,7 @@ func (n *Node) handleNewBlock(block *rpccoretypes.ResultBlock, blockResult *rpcc for txIndex, tx := range block.Block.Txs { if n.txHandler != nil { - err := n.txHandler(nodetypes.TxHandlerArgs{ + err := n.txHandler(ctx, nodetypes.TxHandlerArgs{ BlockHeight: uint64(block.Block.Height), LatestHeight: latestChainHeight, TxIndex: uint64(txIndex), @@ -151,7 +152,7 @@ func (n *Node) handleNewBlock(block *rpccoretypes.ResultBlock, blockResult *rpcc if len(n.eventHandlers) != 0 { events := blockResult.TxsResults[txIndex].GetEvents() for eventIndex, event := range events { - err := n.handleEvent(uint64(block.Block.Height), latestChainHeight, event) + err := n.handleEvent(ctx, uint64(block.Block.Height), latestChainHeight, event) if err != nil { return fmt.Errorf("failed to handle event: tx_index: %d, event_index: %d; %w", txIndex, eventIndex, err) } @@ -160,14 +161,14 @@ func (n *Node) handleNewBlock(block *rpccoretypes.ResultBlock, blockResult *rpcc } for eventIndex, event := range blockResult.FinalizeBlockEvents { - err := n.handleEvent(uint64(block.Block.Height), latestChainHeight, event) + err := n.handleEvent(ctx, uint64(block.Block.Height), latestChainHeight, event) if err != nil { return fmt.Errorf("failed to handle event: finalize block, event_index: %d; %w", eventIndex, err) } } if n.endBlockHandler != nil { - err := n.endBlockHandler(nodetypes.EndBlockArgs{ + err := n.endBlockHandler(ctx, nodetypes.EndBlockArgs{ BlockID: block.BlockID.Hash, Block: *protoBlock, LatestHeight: latestChainHeight, @@ -179,13 +180,13 @@ func (n *Node) handleNewBlock(block *rpccoretypes.ResultBlock, blockResult *rpcc return nil } -func (n *Node) handleEvent(blockHeight uint64, latestHeight uint64, event abcitypes.Event) error { +func (n *Node) handleEvent(ctx context.Context, blockHeight uint64, latestHeight uint64, event abcitypes.Event) error { if n.eventHandlers[event.GetType()] == nil { return nil } n.logger.Debug("handle event", zap.Uint64("height", blockHeight), zap.String("type", event.GetType())) - return n.eventHandlers[event.Type](nodetypes.EventHandlerArgs{ + return n.eventHandlers[event.Type](ctx, nodetypes.EventHandlerArgs{ BlockHeight: blockHeight, LatestHeight: latestHeight, EventAttributes: event.GetAttributes(), @@ -198,7 +199,7 @@ func (n *Node) txChecker(ctx context.Context) error { return nil } - timer := time.NewTicker(nodetypes.POLLING_INTERVAL) + timer := time.NewTicker(types.PollingInterval(ctx)) defer timer.Stop() for { select { @@ -207,7 +208,7 @@ func (n *Node) txChecker(ctx context.Context) error { case <-timer.C: } - pendingTx, res, err := n.broadcaster.CheckPendingTx() + pendingTx, res, err := n.broadcaster.CheckPendingTx(ctx) if err != nil { return err } else if pendingTx == nil || res == nil { @@ -224,15 +225,15 @@ func (n *Node) txChecker(ctx context.Context) error { default: } - err := n.handleEvent(uint64(res.Height), 0, event) + err := n.handleEvent(ctx, uint64(res.Height), 0, event) if err != nil { - n.logger.Error("failed to handle event", zap.String("txHash", pendingTx.TxHash), zap.Int("event_index", eventIndex), zap.String("error", err.Error())) + n.logger.Error("failed to handle event", zap.String("tx_hash", pendingTx.TxHash), zap.Int("event_index", eventIndex), zap.String("error", err.Error())) break } } } - err = n.broadcaster.RemovePendingTx(res.Height, pendingTx.TxHash, pendingTx.Sequence) + err = n.broadcaster.RemovePendingTx(res.Height, pendingTx.TxHash, pendingTx.Sequence, pendingTx.MsgTypes) if err != nil { return err } diff --git a/node/rpcclient/client.go b/node/rpcclient/client.go index ec89082..755fdf7 100644 --- a/node/rpcclient/client.go +++ b/node/rpcclient/client.go @@ -17,11 +17,12 @@ import ( abci "github.com/cometbft/cometbft/abci/types" client2 "github.com/cometbft/cometbft/rpc/client" coretypes "github.com/cometbft/cometbft/rpc/core/types" + gogogrpc "github.com/cosmos/gogoproto/grpc" + "github.com/cosmos/cosmos-sdk/codec" "github.com/cosmos/cosmos-sdk/codec/types" legacyerrors "github.com/cosmos/cosmos-sdk/types/errors" grpctypes "github.com/cosmos/cosmos-sdk/types/grpc" - gogogrpc "github.com/cosmos/gogoproto/grpc" clienthttp "github.com/initia-labs/opinit-bots-go/client" ) @@ -167,10 +168,10 @@ func GetHeightFromMetadata(md metadata.MD) (int64, error) { // Canceling this context releases resources associated with it, so code should // call cancel as soon as the operations running in this [Context] complete: -func GetQueryContext(height uint64) (context.Context, context.CancelFunc) { +func GetQueryContext(ctx context.Context, height uint64) (context.Context, context.CancelFunc) { // TODO: configurable timeout timeout := 10 * time.Second - ctx, cancel := context.WithTimeout(context.Background(), timeout) + ctx, cancel := context.WithTimeout(ctx, timeout) strHeight := strconv.FormatUint(height, 10) ctx = metadata.AppendToOutgoingContext(ctx, grpctypes.GRPCBlockHeightHeader, strHeight) @@ -178,21 +179,21 @@ func GetQueryContext(height uint64) (context.Context, context.CancelFunc) { } // QueryRawCommit queries the raw commit at a given height. -func (q RPCClient) QueryRawCommit(height int64) ([]byte, error) { - ctx, cancel := GetQueryContext(uint64(height)) +func (q RPCClient) QueryRawCommit(ctx context.Context, height int64) ([]byte, error) { + ctx, cancel := GetQueryContext(ctx, uint64(height)) defer cancel() return q.RawCommit(ctx, &height) } // QueryBlockBulk queries blocks in bulk. -func (q RPCClient) QueryBlockBulk(start uint64, end uint64) ([][]byte, error) { - ctx, cancel := GetQueryContext(0) +func (q RPCClient) QueryBlockBulk(ctx context.Context, start uint64, end uint64) ([][]byte, error) { + ctx, cancel := GetQueryContext(ctx, 0) defer cancel() return q.BlockBulk(ctx, &start, &end) } -func (q RPCClient) QueryTx(txHash []byte) (*coretypes.ResultTx, error) { - ctx, cancel := GetQueryContext(0) +func (q RPCClient) QueryTx(ctx context.Context, txHash []byte) (*coretypes.ResultTx, error) { + ctx, cancel := GetQueryContext(ctx, 0) defer cancel() return q.Tx(ctx, txHash, false) } diff --git a/node/types/const.go b/node/types/const.go deleted file mode 100644 index 152e43d..0000000 --- a/node/types/const.go +++ /dev/null @@ -1,7 +0,0 @@ -package types - -import ( - "time" -) - -const POLLING_INTERVAL = 1 * time.Second diff --git a/node/types/handler.go b/node/types/handler.go index eb9f892..ab23f2e 100644 --- a/node/types/handler.go +++ b/node/types/handler.go @@ -1,6 +1,8 @@ package types import ( + "context" + abcitypes "github.com/cometbft/cometbft/abci/types" cmtproto "github.com/cometbft/cometbft/proto/tendermint/types" comettypes "github.com/cometbft/cometbft/types" @@ -13,7 +15,7 @@ type EventHandlerArgs struct { EventAttributes []abcitypes.EventAttribute } -type EventHandlerFn func(EventHandlerArgs) error +type EventHandlerFn func(context.Context, EventHandlerArgs) error type TxHandlerArgs struct { BlockHeight uint64 @@ -22,7 +24,7 @@ type TxHandlerArgs struct { Tx comettypes.Tx } -type TxHandlerFn func(TxHandlerArgs) error +type TxHandlerFn func(context.Context, TxHandlerArgs) error type BeginBlockArgs struct { BlockID []byte @@ -30,7 +32,7 @@ type BeginBlockArgs struct { LatestHeight uint64 } -type BeginBlockHandlerFn func(BeginBlockArgs) error +type BeginBlockHandlerFn func(context.Context, BeginBlockArgs) error type EndBlockArgs struct { BlockID []byte @@ -38,11 +40,11 @@ type EndBlockArgs struct { LatestHeight uint64 } -type EndBlockHandlerFn func(EndBlockArgs) error +type EndBlockHandlerFn func(context.Context, EndBlockArgs) error type RawBlockArgs struct { BlockHeight uint64 BlockBytes []byte } -type RawBlockHandlerFn func(RawBlockArgs) error +type RawBlockHandlerFn func(context.Context, RawBlockArgs) error diff --git a/node/types/status.go b/node/types/status.go new file mode 100644 index 0000000..4812a41 --- /dev/null +++ b/node/types/status.go @@ -0,0 +1,11 @@ +package types + +type BroadcasterStatus struct { + PendingTxs int `json:"pending_txs"` + Sequence uint64 `json:"sequence"` +} + +type Status struct { + LastProcessedBlockHeight uint64 `json:"last_processed_block_height,omitempty"` + Broadcaster BroadcasterStatus `json:"broadcaster,omitempty"` +} diff --git a/types/const.go b/types/const.go deleted file mode 100644 index 546eeca..0000000 --- a/types/const.go +++ /dev/null @@ -1,7 +0,0 @@ -package types - -type contextKey string - -var ( - ContextKeyErrGrp = contextKey("ErrGrp") -) diff --git a/types/context.go b/types/context.go new file mode 100644 index 0000000..3f3aa55 --- /dev/null +++ b/types/context.go @@ -0,0 +1,36 @@ +package types + +import ( + "context" + "time" + + "golang.org/x/sync/errgroup" +) + +type contextKey string + +var ( + ContextKeyErrGrp = contextKey("ErrGrp") + ContextKeyPollingInterval = contextKey("PollingInterval") + ContextKeyTxTimeout = contextKey("TxTimeout") +) + +func WithErrGrp(ctx context.Context, errGrp *errgroup.Group) context.Context { + return context.WithValue(ctx, ContextKeyErrGrp, errGrp) +} + +func ErrGrp(ctx context.Context) *errgroup.Group { + return ctx.Value(ContextKeyErrGrp).(*errgroup.Group) +} + +func WithPollingInterval(ctx context.Context, interval time.Duration) context.Context { + return context.WithValue(ctx, ContextKeyPollingInterval, interval) +} + +func PollingInterval(ctx context.Context) time.Duration { + interval := ctx.Value(ContextKeyPollingInterval) + if interval == nil { + return 100 * time.Millisecond + } + return ctx.Value(ContextKeyPollingInterval).(time.Duration) +}