diff --git a/bot/bot.go b/bot/bot.go index 65850a5..7cd0978 100644 --- a/bot/bot.go +++ b/bot/bot.go @@ -74,7 +74,7 @@ func NewBot(name bottypes.BotType, logger *zap.Logger, homePath string, configNa return nil, err } server := server.NewServer() - return executor.NewExecutor(cfg, db, server, logger, appCodec, txConfig), nil + return executor.NewExecutor(cfg, db, server, logger, appCodec, txConfig, homePath), nil } return nil, errors.New("not providing bot name") diff --git a/client/client.go b/client/client.go new file mode 100644 index 0000000..525e8ca --- /dev/null +++ b/client/client.go @@ -0,0 +1,821 @@ +// this is from "github.com/cometbft/cometbft/rpc/client" + +package http + +import ( + "context" + "errors" + "net/http" + "strings" + "time" + + "github.com/cometbft/cometbft/libs/bytes" + cmtjson "github.com/cometbft/cometbft/libs/json" + "github.com/cometbft/cometbft/libs/log" + cmtpubsub "github.com/cometbft/cometbft/libs/pubsub" + "github.com/cometbft/cometbft/libs/service" + cmtsync "github.com/cometbft/cometbft/libs/sync" + rpcclient "github.com/cometbft/cometbft/rpc/client" + ctypes "github.com/cometbft/cometbft/rpc/core/types" + jsonrpcclient "github.com/cometbft/cometbft/rpc/jsonrpc/client" + "github.com/cometbft/cometbft/types" +) + +/* +HTTP is a Client implementation that communicates with a CometBFT node over +JSON RPC and WebSockets. + +This is the main implementation you probably want to use in production code. +There are other implementations when calling the CometBFT node in-process +(Local), or when you want to mock out the server for test code (mock). + +You can subscribe for any event published by CometBFT using Subscribe method. +Note delivery is best-effort. If you don't read events fast enough or network is +slow, CometBFT might cancel the subscription. The client will attempt to +resubscribe (you don't need to do anything). It will keep trying every second +indefinitely until successful. + +Request batching is available for JSON RPC requests over HTTP, which conforms to +the JSON RPC specification (https://www.jsonrpc.org/specification#batch). See +the example for more details. + +Example: + + c, err := New("http://192.168.1.10:26657", "/websocket") + if err != nil { + // handle error + } + + // call Start/Stop if you're subscribing to events + err = c.Start() + if err != nil { + // handle error + } + defer c.Stop() + + res, err := c.Status() + if err != nil { + // handle error + } + + // handle result +*/ +type HTTP struct { + remote string + rpc *jsonrpcclient.Client + + *baseRPCClient + *WSEvents +} + +// BatchHTTP provides the same interface as `HTTP`, but allows for batching of +// requests (as per https://www.jsonrpc.org/specification#batch). Do not +// instantiate directly - rather use the HTTP.NewBatch() method to create an +// instance of this struct. +// +// Batching of HTTP requests is thread-safe in the sense that multiple +// goroutines can each create their own batches and send them using the same +// HTTP client. Multiple goroutines could also enqueue transactions in a single +// batch, but ordering of transactions in the batch cannot be guaranteed in such +// an example. +type BatchHTTP struct { + rpcBatch *jsonrpcclient.RequestBatch + *baseRPCClient +} + +// rpcClient is an internal interface to which our RPC clients (batch and +// non-batch) must conform. Acts as an additional code-level sanity check to +// make sure the implementations stay coherent. +type rpcClient interface { + rpcclient.ABCIClient + rpcclient.HistoryClient + rpcclient.NetworkClient + rpcclient.SignClient + rpcclient.StatusClient +} + +// baseRPCClient implements the basic RPC method logic without the actual +// underlying RPC call functionality, which is provided by `caller`. +type baseRPCClient struct { + caller jsonrpcclient.Caller +} + +var ( + _ rpcClient = (*HTTP)(nil) + _ rpcClient = (*BatchHTTP)(nil) + _ rpcClient = (*baseRPCClient)(nil) +) + +//----------------------------------------------------------------------------- +// HTTP + +// New takes a remote endpoint in the form ://: and +// the websocket path (which always seems to be "/websocket") +// An error is returned on invalid remote. The function panics when remote is nil. +func New(remote, wsEndpoint string) (*HTTP, error) { + httpClient, err := jsonrpcclient.DefaultHTTPClient(remote) + if err != nil { + return nil, err + } + return NewWithClient(remote, wsEndpoint, httpClient) +} + +// Create timeout enabled http client +func NewWithTimeout(remote, wsEndpoint string, timeout uint) (*HTTP, error) { + httpClient, err := jsonrpcclient.DefaultHTTPClient(remote) + if err != nil { + return nil, err + } + httpClient.Timeout = time.Duration(timeout) * time.Second + return NewWithClient(remote, wsEndpoint, httpClient) +} + +// NewWithClient allows for setting a custom http client (See New). +// An error is returned on invalid remote. The function panics when remote is nil. +func NewWithClient(remote, wsEndpoint string, client *http.Client) (*HTTP, error) { + if client == nil { + panic("nil http.Client provided") + } + + rc, err := jsonrpcclient.NewWithHTTPClient(remote, client) + if err != nil { + return nil, err + } + + wsEvents, err := newWSEvents(remote, wsEndpoint) + if err != nil { + return nil, err + } + + httpClient := &HTTP{ + rpc: rc, + remote: remote, + baseRPCClient: &baseRPCClient{caller: rc}, + WSEvents: wsEvents, + } + + return httpClient, nil +} + +var _ rpcclient.Client = (*HTTP)(nil) + +// SetLogger sets a logger. +func (c *HTTP) SetLogger(l log.Logger) { + c.WSEvents.SetLogger(l) +} + +// Remote returns the remote network address in a string form. +func (c *HTTP) Remote() string { + return c.remote +} + +// NewBatch creates a new batch client for this HTTP client. +func (c *HTTP) NewBatch() *BatchHTTP { + rpcBatch := c.rpc.NewRequestBatch() + return &BatchHTTP{ + rpcBatch: rpcBatch, + baseRPCClient: &baseRPCClient{ + caller: rpcBatch, + }, + } +} + +//----------------------------------------------------------------------------- +// BatchHTTP + +// Send is a convenience function for an HTTP batch that will trigger the +// compilation of the batched requests and send them off using the client as a +// single request. On success, this returns a list of the deserialized results +// from each request in the sent batch. +func (b *BatchHTTP) Send(ctx context.Context) ([]interface{}, error) { + return b.rpcBatch.Send(ctx) +} + +// Clear will empty out this batch of requests and return the number of requests +// that were cleared out. +func (b *BatchHTTP) Clear() int { + return b.rpcBatch.Clear() +} + +// Count returns the number of enqueued requests waiting to be sent. +func (b *BatchHTTP) Count() int { + return b.rpcBatch.Count() +} + +//----------------------------------------------------------------------------- +// baseRPCClient + +func (c *baseRPCClient) Status(ctx context.Context) (*ctypes.ResultStatus, error) { + result := new(ctypes.ResultStatus) + _, err := c.caller.Call(ctx, "status", map[string]interface{}{}, result) + if err != nil { + return nil, err + } + + return result, nil +} + +func (c *baseRPCClient) ABCIInfo(ctx context.Context) (*ctypes.ResultABCIInfo, error) { + result := new(ctypes.ResultABCIInfo) + _, err := c.caller.Call(ctx, "abci_info", map[string]interface{}{}, result) + if err != nil { + return nil, err + } + + return result, nil +} + +func (c *baseRPCClient) ABCIQuery( + ctx context.Context, + path string, + data bytes.HexBytes, +) (*ctypes.ResultABCIQuery, error) { + return c.ABCIQueryWithOptions(ctx, path, data, rpcclient.DefaultABCIQueryOptions) +} + +func (c *baseRPCClient) ABCIQueryWithOptions( + ctx context.Context, + path string, + data bytes.HexBytes, + opts rpcclient.ABCIQueryOptions, +) (*ctypes.ResultABCIQuery, error) { + result := new(ctypes.ResultABCIQuery) + _, err := c.caller.Call(ctx, "abci_query", + map[string]interface{}{"path": path, "data": data, "height": opts.Height, "prove": opts.Prove}, + result) + if err != nil { + return nil, err + } + + return result, nil +} + +func (c *baseRPCClient) BroadcastTxCommit( + ctx context.Context, + tx types.Tx, +) (*ctypes.ResultBroadcastTxCommit, error) { + result := new(ctypes.ResultBroadcastTxCommit) + _, err := c.caller.Call(ctx, "broadcast_tx_commit", map[string]interface{}{"tx": tx}, result) + if err != nil { + return nil, err + } + return result, nil +} + +func (c *baseRPCClient) BroadcastTxAsync( + ctx context.Context, + tx types.Tx, +) (*ctypes.ResultBroadcastTx, error) { + return c.broadcastTX(ctx, "broadcast_tx_async", tx) +} + +func (c *baseRPCClient) BroadcastTxSync( + ctx context.Context, + tx types.Tx, +) (*ctypes.ResultBroadcastTx, error) { + return c.broadcastTX(ctx, "broadcast_tx_sync", tx) +} + +func (c *baseRPCClient) broadcastTX( + ctx context.Context, + route string, + tx types.Tx, +) (*ctypes.ResultBroadcastTx, error) { + result := new(ctypes.ResultBroadcastTx) + _, err := c.caller.Call(ctx, route, map[string]interface{}{"tx": tx}, result) + if err != nil { + return nil, err + } + return result, nil +} + +func (c *baseRPCClient) UnconfirmedTxs( + ctx context.Context, + limit *int, +) (*ctypes.ResultUnconfirmedTxs, error) { + result := new(ctypes.ResultUnconfirmedTxs) + params := make(map[string]interface{}) + if limit != nil { + params["limit"] = limit + } + _, err := c.caller.Call(ctx, "unconfirmed_txs", params, result) + if err != nil { + return nil, err + } + return result, nil +} + +func (c *baseRPCClient) NumUnconfirmedTxs(ctx context.Context) (*ctypes.ResultUnconfirmedTxs, error) { + result := new(ctypes.ResultUnconfirmedTxs) + _, err := c.caller.Call(ctx, "num_unconfirmed_txs", map[string]interface{}{}, result) + if err != nil { + return nil, err + } + return result, nil +} + +func (c *baseRPCClient) CheckTx(ctx context.Context, tx types.Tx) (*ctypes.ResultCheckTx, error) { + result := new(ctypes.ResultCheckTx) + _, err := c.caller.Call(ctx, "check_tx", map[string]interface{}{"tx": tx}, result) + if err != nil { + return nil, err + } + return result, nil +} + +func (c *baseRPCClient) NetInfo(ctx context.Context) (*ctypes.ResultNetInfo, error) { + result := new(ctypes.ResultNetInfo) + _, err := c.caller.Call(ctx, "net_info", map[string]interface{}{}, result) + if err != nil { + return nil, err + } + return result, nil +} + +func (c *baseRPCClient) DumpConsensusState(ctx context.Context) (*ctypes.ResultDumpConsensusState, error) { + result := new(ctypes.ResultDumpConsensusState) + _, err := c.caller.Call(ctx, "dump_consensus_state", map[string]interface{}{}, result) + if err != nil { + return nil, err + } + return result, nil +} + +func (c *baseRPCClient) ConsensusState(ctx context.Context) (*ctypes.ResultConsensusState, error) { + result := new(ctypes.ResultConsensusState) + _, err := c.caller.Call(ctx, "consensus_state", map[string]interface{}{}, result) + if err != nil { + return nil, err + } + return result, nil +} + +func (c *baseRPCClient) ConsensusParams( + ctx context.Context, + height *int64, +) (*ctypes.ResultConsensusParams, error) { + result := new(ctypes.ResultConsensusParams) + params := make(map[string]interface{}) + if height != nil { + params["height"] = height + } + _, err := c.caller.Call(ctx, "consensus_params", params, result) + if err != nil { + return nil, err + } + return result, nil +} + +func (c *baseRPCClient) Health(ctx context.Context) (*ctypes.ResultHealth, error) { + result := new(ctypes.ResultHealth) + _, err := c.caller.Call(ctx, "health", map[string]interface{}{}, result) + if err != nil { + return nil, err + } + return result, nil +} + +func (c *baseRPCClient) BlockchainInfo( + ctx context.Context, + minHeight, + maxHeight int64, +) (*ctypes.ResultBlockchainInfo, error) { + result := new(ctypes.ResultBlockchainInfo) + _, err := c.caller.Call(ctx, "blockchain", + map[string]interface{}{"minHeight": minHeight, "maxHeight": maxHeight}, + result) + if err != nil { + return nil, err + } + return result, nil +} + +func (c *baseRPCClient) Genesis(ctx context.Context) (*ctypes.ResultGenesis, error) { + result := new(ctypes.ResultGenesis) + _, err := c.caller.Call(ctx, "genesis", map[string]interface{}{}, result) + if err != nil { + return nil, err + } + return result, nil +} + +func (c *baseRPCClient) GenesisChunked(ctx context.Context, id uint) (*ctypes.ResultGenesisChunk, error) { + result := new(ctypes.ResultGenesisChunk) + _, err := c.caller.Call(ctx, "genesis_chunked", map[string]interface{}{"chunk": id}, result) + if err != nil { + return nil, err + } + return result, nil +} + +func (c *baseRPCClient) Block(ctx context.Context, height *int64) (*ctypes.ResultBlock, error) { + result := new(ctypes.ResultBlock) + params := make(map[string]interface{}) + if height != nil { + params["height"] = height + } + _, err := c.caller.Call(ctx, "block", params, result) + if err != nil { + return nil, err + } + return result, nil +} + +func (c *baseRPCClient) BlockBulk(ctx context.Context, start *uint64, end *uint64) ([][]byte, error) { + result := new(ResultBlockBulk) + params := make(map[string]interface{}) + if start != nil { + params["start"] = start + } + if end != nil { + params["end"] = end + } + _, err := c.caller.Call(ctx, "block_bulk", params, result) + if err != nil { + return nil, err + } + return result.Blocks, nil +} + +func (c *baseRPCClient) BlockByHash(ctx context.Context, hash []byte) (*ctypes.ResultBlock, error) { + result := new(ctypes.ResultBlock) + params := map[string]interface{}{ + "hash": hash, + } + _, err := c.caller.Call(ctx, "block_by_hash", params, result) + if err != nil { + return nil, err + } + return result, nil +} + +func (c *baseRPCClient) BlockResults( + ctx context.Context, + height *int64, +) (*ctypes.ResultBlockResults, error) { + result := new(ctypes.ResultBlockResults) + params := make(map[string]interface{}) + if height != nil { + params["height"] = height + } + _, err := c.caller.Call(ctx, "block_results", params, result) + if err != nil { + return nil, err + } + return result, nil +} + +func (c *baseRPCClient) Header(ctx context.Context, height *int64) (*ctypes.ResultHeader, error) { + result := new(ctypes.ResultHeader) + params := make(map[string]interface{}) + if height != nil { + params["height"] = height + } + _, err := c.caller.Call(ctx, "header", params, result) + if err != nil { + return nil, err + } + return result, nil +} + +func (c *baseRPCClient) HeaderByHash(ctx context.Context, hash bytes.HexBytes) (*ctypes.ResultHeader, error) { + result := new(ctypes.ResultHeader) + params := map[string]interface{}{ + "hash": hash, + } + _, err := c.caller.Call(ctx, "header_by_hash", params, result) + if err != nil { + return nil, err + } + return result, nil +} + +func (c *baseRPCClient) Commit(ctx context.Context, height *int64) (*ctypes.ResultCommit, error) { + result := new(ctypes.ResultCommit) + params := make(map[string]interface{}) + if height != nil { + params["height"] = height + } + _, err := c.caller.Call(ctx, "commit", params, result) + if err != nil { + return nil, err + } + return result, nil +} + +func (c *baseRPCClient) RawCommit(ctx context.Context, height *int64) ([]byte, error) { + result := new(ResultRawCommit) + params := make(map[string]interface{}) + if height != nil { + params["height"] = height + } + _, err := c.caller.Call(ctx, "commit", params, result) + if err != nil { + return nil, err + } + return result.Commit, nil +} + +func (c *baseRPCClient) Tx(ctx context.Context, hash []byte, prove bool) (*ctypes.ResultTx, error) { + result := new(ctypes.ResultTx) + params := map[string]interface{}{ + "hash": hash, + "prove": prove, + } + _, err := c.caller.Call(ctx, "tx", params, result) + if err != nil { + return nil, err + } + return result, nil +} + +func (c *baseRPCClient) TxSearch( + ctx context.Context, + query string, + prove bool, + page, + perPage *int, + orderBy string, +) (*ctypes.ResultTxSearch, error) { + result := new(ctypes.ResultTxSearch) + params := map[string]interface{}{ + "query": query, + "prove": prove, + "order_by": orderBy, + } + + if page != nil { + params["page"] = page + } + if perPage != nil { + params["per_page"] = perPage + } + + _, err := c.caller.Call(ctx, "tx_search", params, result) + if err != nil { + return nil, err + } + + return result, nil +} + +func (c *baseRPCClient) BlockSearch( + ctx context.Context, + query string, + page, perPage *int, + orderBy string, +) (*ctypes.ResultBlockSearch, error) { + result := new(ctypes.ResultBlockSearch) + params := map[string]interface{}{ + "query": query, + "order_by": orderBy, + } + + if page != nil { + params["page"] = page + } + if perPage != nil { + params["per_page"] = perPage + } + + _, err := c.caller.Call(ctx, "block_search", params, result) + if err != nil { + return nil, err + } + + return result, nil +} + +func (c *baseRPCClient) Validators( + ctx context.Context, + height *int64, + page, + perPage *int, +) (*ctypes.ResultValidators, error) { + result := new(ctypes.ResultValidators) + params := make(map[string]interface{}) + if page != nil { + params["page"] = page + } + if perPage != nil { + params["per_page"] = perPage + } + if height != nil { + params["height"] = height + } + _, err := c.caller.Call(ctx, "validators", params, result) + if err != nil { + return nil, err + } + return result, nil +} + +func (c *baseRPCClient) BroadcastEvidence( + ctx context.Context, + ev types.Evidence, +) (*ctypes.ResultBroadcastEvidence, error) { + result := new(ctypes.ResultBroadcastEvidence) + _, err := c.caller.Call(ctx, "broadcast_evidence", map[string]interface{}{"evidence": ev}, result) + if err != nil { + return nil, err + } + return result, nil +} + +//----------------------------------------------------------------------------- +// WSEvents + +var errNotRunning = errors.New("client is not running. Use .Start() method to start") + +// WSEvents is a wrapper around WSClient, which implements EventsClient. +type WSEvents struct { + service.BaseService + remote string + endpoint string + ws *jsonrpcclient.WSClient + + mtx cmtsync.RWMutex + subscriptions map[string]chan ctypes.ResultEvent // query -> chan +} + +func newWSEvents(remote, endpoint string) (*WSEvents, error) { + w := &WSEvents{ + endpoint: endpoint, + remote: remote, + subscriptions: make(map[string]chan ctypes.ResultEvent), + } + w.BaseService = *service.NewBaseService(nil, "WSEvents", w) + + var err error + w.ws, err = jsonrpcclient.NewWS(w.remote, w.endpoint, jsonrpcclient.OnReconnect(func() { + // resubscribe immediately + w.redoSubscriptionsAfter(0 * time.Second) + })) + if err != nil { + return nil, err + } + w.ws.SetLogger(w.Logger) + + return w, nil +} + +// OnStart implements service.Service by starting WSClient and event loop. +func (w *WSEvents) OnStart() error { + if err := w.ws.Start(); err != nil { + return err + } + + go w.eventListener() + + return nil +} + +// OnStop implements service.Service by stopping WSClient. +func (w *WSEvents) OnStop() { + if err := w.ws.Stop(); err != nil { + w.Logger.Error("Can't stop ws client", "err", err) + } +} + +// Subscribe implements EventsClient by using WSClient to subscribe given +// subscriber to query. By default, returns a channel with cap=1. Error is +// returned if it fails to subscribe. +// +// Channel is never closed to prevent clients from seeing an erroneous event. +// +// It returns an error if WSEvents is not running. +func (w *WSEvents) Subscribe(ctx context.Context, _, query string, + outCapacity ...int, +) (out <-chan ctypes.ResultEvent, err error) { + if !w.IsRunning() { + return nil, errNotRunning + } + + if err := w.ws.Subscribe(ctx, query); err != nil { + return nil, err + } + + outCap := 1 + if len(outCapacity) > 0 { + outCap = outCapacity[0] + } + + outc := make(chan ctypes.ResultEvent, outCap) + w.mtx.Lock() + // subscriber param is ignored because CometBFT will override it with + // remote IP anyway. + w.subscriptions[query] = outc + w.mtx.Unlock() + + return outc, nil +} + +// Unsubscribe implements EventsClient by using WSClient to unsubscribe given +// subscriber from query. +// +// It returns an error if WSEvents is not running. +func (w *WSEvents) Unsubscribe(ctx context.Context, _, query string) error { + if !w.IsRunning() { + return errNotRunning + } + + if err := w.ws.Unsubscribe(ctx, query); err != nil { + return err + } + + w.mtx.Lock() + _, ok := w.subscriptions[query] + if ok { + delete(w.subscriptions, query) + } + w.mtx.Unlock() + + return nil +} + +// UnsubscribeAll implements EventsClient by using WSClient to unsubscribe +// given subscriber from all the queries. +// +// It returns an error if WSEvents is not running. +func (w *WSEvents) UnsubscribeAll(ctx context.Context, _ string) error { + if !w.IsRunning() { + return errNotRunning + } + + if err := w.ws.UnsubscribeAll(ctx); err != nil { + return err + } + + w.mtx.Lock() + w.subscriptions = make(map[string]chan ctypes.ResultEvent) + w.mtx.Unlock() + + return nil +} + +// After being reconnected, it is necessary to redo subscription to server +// otherwise no data will be automatically received. +func (w *WSEvents) redoSubscriptionsAfter(d time.Duration) { + time.Sleep(d) + + w.mtx.RLock() + defer w.mtx.RUnlock() + for q := range w.subscriptions { + err := w.ws.Subscribe(context.Background(), q) + if err != nil { + w.Logger.Error("Failed to resubscribe", "err", err) + } + } +} + +func isErrAlreadySubscribed(err error) bool { + return strings.Contains(err.Error(), cmtpubsub.ErrAlreadySubscribed.Error()) +} + +func (w *WSEvents) eventListener() { + for { + select { + case resp, ok := <-w.ws.ResponsesCh: + if !ok { + return + } + + if resp.Error != nil { + w.Logger.Error("WS error", "err", resp.Error.Error()) + // Error can be ErrAlreadySubscribed or max client (subscriptions per + // client) reached or CometBFT exited. + // We can ignore ErrAlreadySubscribed, but need to retry in other + // cases. + if !isErrAlreadySubscribed(resp.Error) { + // Resubscribe after 1 second to give CometBFT time to restart (if + // crashed). + w.redoSubscriptionsAfter(1 * time.Second) + } + continue + } + + result := new(ctypes.ResultEvent) + err := cmtjson.Unmarshal(resp.Result, result) + if err != nil { + w.Logger.Error("failed to unmarshal response", "err", err) + continue + } + + w.mtx.RLock() + if out, ok := w.subscriptions[result.Query]; ok { + if cap(out) == 0 { + out <- *result + } else { + select { + case out <- *result: + default: + w.Logger.Error("wanted to publish ResultEvent, but out channel is full", "result", result, "query", result.Query) + } + } + } + w.mtx.RUnlock() + case <-w.Quit(): + return + } + } +} diff --git a/client/types.go b/client/types.go new file mode 100644 index 0000000..22ef120 --- /dev/null +++ b/client/types.go @@ -0,0 +1,11 @@ +package http + +// Result of block bulk +type ResultBlockBulk struct { + Blocks [][]byte `json:"blocks"` +} + +// Result of raw commit bytes +type ResultRawCommit struct { + Commit []byte `json:"commit"` +} diff --git a/db/db.go b/db/db.go index e512a18..f67cd93 100644 --- a/db/db.go +++ b/db/db.go @@ -14,6 +14,7 @@ var _ types.DB = (*LevelDB)(nil) type LevelDB struct { db *leveldb.DB + path string prefix []byte } @@ -24,7 +25,8 @@ func NewDB(path string) (types.DB, error) { } return &LevelDB{ - db: db, + db: db, + path: path, }, nil } @@ -135,3 +137,7 @@ func (db LevelDB) PrefixedKey(key []byte) []byte { func (db LevelDB) UnprefixedKey(key []byte) []byte { return bytes.TrimPrefix(key, append(db.prefix, dbtypes.Splitter)) } + +func (db LevelDB) GetPath() string { + return db.path +} diff --git a/db/types/utils.go b/db/types/utils.go index b7cb948..11e7003 100644 --- a/db/types/utils.go +++ b/db/types/utils.go @@ -8,6 +8,19 @@ import ( const Splitter = '/' +func FromInt64(v int64) []byte { + return []byte(fmt.Sprintf("%d", v)) +} + +func ToInt64(v []byte) int64 { + data, err := strconv.ParseInt(string(v), 10, 64) + if err != nil { + // must not happen + panic(err) + } + return data +} + func FromUint64(v uint64) []byte { return []byte(fmt.Sprintf("%d", v)) } diff --git a/executor/batch/batch.go b/executor/batch/batch.go new file mode 100644 index 0000000..79b4587 --- /dev/null +++ b/executor/batch/batch.go @@ -0,0 +1,170 @@ +package batch + +import ( + "context" + "errors" + "fmt" + "io" + "os" + "sync" + "time" + + "cosmossdk.io/core/address" + opchildtypes "github.com/initia-labs/OPinit/x/opchild/types" + ophosttypes "github.com/initia-labs/OPinit/x/ophost/types" + executortypes "github.com/initia-labs/opinit-bots-go/executor/types" + nodetypes "github.com/initia-labs/opinit-bots-go/node/types" + "github.com/initia-labs/opinit-bots-go/types" + "go.uber.org/zap" + + "github.com/initia-labs/opinit-bots-go/node" + + "github.com/cosmos/cosmos-sdk/client" + "github.com/cosmos/cosmos-sdk/codec" + + dbtypes "github.com/initia-labs/opinit-bots-go/db/types" +) + +type hostNode interface { + QueryBatchInfos() (*ophosttypes.QueryBatchInfosResponse, error) +} + +type compressionFunc interface { + Write([]byte) (int, error) + Reset(io.Writer) + Close() error +} + +var SubmissionKey = []byte("submission_time") + +type BatchSubmitter struct { + version uint8 + + node *node.Node + host hostNode + da executortypes.DANode + + bridgeInfo opchildtypes.BridgeInfo + + cfg nodetypes.NodeConfig + batchCfg executortypes.BatchConfig + db types.DB + logger *zap.Logger + + cdc codec.Codec + ac address.Codec + + opchildQueryClient opchildtypes.QueryClient + + batchInfoMu *sync.Mutex + batchInfos []ophosttypes.BatchInfoWithOutput + batchWriter compressionFunc + batchFile *os.File + batchHeader *executortypes.BatchHeader + + processedMsgs []nodetypes.ProcessedMsgs + homePath string + + lastSubmissionTime time.Time +} + +func NewBatchSubmitter(version uint8, cfg nodetypes.NodeConfig, batchCfg executortypes.BatchConfig, db types.DB, logger *zap.Logger, cdc codec.Codec, txConfig client.TxConfig, homePath string) *BatchSubmitter { + node, err := node.NewNode(cfg, db, logger, cdc, txConfig) + if err != nil { + panic(err) + } + + ch := &BatchSubmitter{ + version: version, + + node: node, + + cfg: cfg, + batchCfg: batchCfg, + db: db, + logger: logger, + + cdc: cdc, + ac: cdc.InterfaceRegistry().SigningContext().AddressCodec(), + + opchildQueryClient: opchildtypes.NewQueryClient(node), + + batchInfoMu: &sync.Mutex{}, + + processedMsgs: make([]nodetypes.ProcessedMsgs, 0), + homePath: homePath, + } + return ch +} + +func (bs *BatchSubmitter) Initialize(host hostNode, da executortypes.DANode, bridgeInfo opchildtypes.BridgeInfo) error { + bs.host = host + bs.bridgeInfo = bridgeInfo + + res, err := bs.host.QueryBatchInfos() + if err != nil { + return err + } + bs.batchInfos = res.BatchInfos + if len(bs.batchInfos) == 0 { + return errors.New("no batch info") + } + for _, batchInfo := range bs.batchInfos { + if len(bs.batchInfos) == 1 || batchInfo.Output.L2BlockNumber >= bs.node.GetHeight() { + break + } + bs.DequeueBatchInfo() + } + // TODO: set da and key that match the current batch info + bs.da = da + if !bs.da.HasKey() { + return errors.New("da has no key") + } + + bs.batchFile, err = os.OpenFile(bs.homePath+"/batch", os.O_CREATE|os.O_APPEND|os.O_RDWR, 0666) + if err != nil { + return err + } + + err = bs.LoadSubmissionInfo() + if err != nil { + return err + } + + bs.node.RegisterRawBlockHandler(bs.rawBlockHandler) + return nil +} + +func (bs *BatchSubmitter) Start(ctx context.Context, errCh chan error) { + defer func() { + if r := recover(); r != nil { + bs.logger.Error("batch panic", zap.Any("recover", r)) + errCh <- fmt.Errorf("batch panic: %v", r) + } + }() + + bs.node.Start(ctx, errCh, nodetypes.PROCESS_TYPE_RAW) +} + +func (bs *BatchSubmitter) SetBridgeInfo(bridgeInfo opchildtypes.BridgeInfo) { + bs.bridgeInfo = bridgeInfo +} + +func (bs *BatchSubmitter) LoadSubmissionInfo() error { + val, err := bs.db.Get(SubmissionKey) + if err != nil { + if err == dbtypes.ErrNotFound { + return nil + } + return err + } + bs.lastSubmissionTime = time.Unix(0, dbtypes.ToInt64(val)) + return nil +} + +func (bs *BatchSubmitter) SubmissionInfoToRawKV(timestamp int64) types.RawKV { + return types.RawKV{ + Key: bs.db.PrefixedKey(SubmissionKey), + Value: dbtypes.FromInt64(timestamp), + } +} diff --git a/executor/batch/handler.go b/executor/batch/handler.go new file mode 100644 index 0000000..c05438b --- /dev/null +++ b/executor/batch/handler.go @@ -0,0 +1,303 @@ +package batch + +import ( + "compress/gzip" + "crypto/sha256" + "encoding/base64" + "encoding/json" + "fmt" + "io" + "time" + + "github.com/pkg/errors" + + sdk "github.com/cosmos/cosmos-sdk/types" + executortypes "github.com/initia-labs/opinit-bots-go/executor/types" + nodetypes "github.com/initia-labs/opinit-bots-go/node/types" + "github.com/initia-labs/opinit-bots-go/types" + + cmtproto "github.com/cometbft/cometbft/proto/tendermint/types" + "github.com/cosmos/gogoproto/proto" + ophosttypes "github.com/initia-labs/OPinit/x/ophost/types" +) + +func (bs *BatchSubmitter) rawBlockHandler(args nodetypes.RawBlockArgs) error { + if len(bs.processedMsgs) != 0 { + panic("must not happen, msgQueue should be empty") + } + + pbb := new(cmtproto.Block) + err := proto.Unmarshal(args.BlockBytes, pbb) + if err != nil { + return errors.Wrap(err, "failed to unmarshal block") + } + + err = bs.prepareBatch(args.BlockHeight, pbb.Header.Time) + if err != nil { + return errors.Wrap(err, "failed to prepare batch") + } + + err = bs.handleBatch(args.BlockBytes) + if err != nil { + return errors.Wrap(err, "failed to handle batch") + } + + err = bs.checkBatch(args.BlockHeight, pbb.Header.Time) + if err != nil { + return errors.Wrap(err, "failed to check batch") + } + + // store the processed state into db with batch operation + batchKVs := make([]types.RawKV, 0) + batchKVs = append(batchKVs, bs.node.SyncInfoToRawKV(args.BlockHeight)) + batchMsgKVs, err := bs.da.ProcessedMsgsToRawKV(bs.processedMsgs, false) + if err != nil { + return errors.Wrap(err, "failed to convert processed messages to raw key value") + } + batchKVs = append(batchKVs, batchMsgKVs...) + if len(batchMsgKVs) > 0 { + batchKVs = append(batchKVs, bs.SubmissionInfoToRawKV(pbb.Header.Time.UnixNano())) + } + err = bs.db.RawBatchSet(batchKVs...) + if err != nil { + return errors.Wrap(err, "failed to set raw batch") + } + + // broadcast processed messages + for _, processedMsg := range bs.processedMsgs { + bs.da.BroadcastMsgs(processedMsg) + } + + // clear processed messages + bs.processedMsgs = bs.processedMsgs[:0] + + return nil +} + +func (bs *BatchSubmitter) prepareBatch(blockHeight uint64, blockTime time.Time) error { + + // check whether the requested block height is reached to the l2 block number of the next batch info. + if nextBatchInfo := bs.NextBatchInfo(); nextBatchInfo != nil && nextBatchInfo.Output.L2BlockNumber < blockHeight { + + // if the next batch info is reached, finalize the current batch and update the batch info. + if bs.batchWriter != nil { + err := bs.batchWriter.Close() + if err != nil { + return errors.Wrap(err, "failed to close batch writer") + } + } + err := bs.batchFile.Truncate(0) + if err != nil { + return errors.Wrap(err, "failed to truncate batch file") + } + _, err = bs.batchFile.Seek(0, 0) + if err != nil { + return errors.Wrap(err, "failed to seek batch file") + } + + // save sync info + err = bs.node.SaveSyncInfo(nextBatchInfo.Output.L2BlockNumber) + if err != nil { + return errors.Wrap(err, "failed to save sync info") + } + + // set last processed block height to l2 block number + bs.node.SetSyncInfo(nextBatchInfo.Output.L2BlockNumber) + bs.DequeueBatchInfo() + + // error will restart block process from nextBatchInfo.Output.L2BlockNumber + 1 + return fmt.Errorf("batch info updated: reset from %d", nextBatchInfo.Output.L2BlockNumber) + } + + if bs.batchHeader != nil { + // if the batch header end is not set, it means the batch is not finalized yet. + if bs.batchHeader.End == 0 { + return nil + } + + err := bs.finalizeBatch(blockHeight) + if err != nil { + return errors.Wrap(err, "failed to finalize batch") + } + + // update last submission time + bs.lastSubmissionTime = blockTime + } + + // reset batch header + var err error + bs.batchHeader = &executortypes.BatchHeader{} + + // linux command gzip use level 6 as default + bs.batchWriter, err = gzip.NewWriterLevel(bs.batchFile, 6) + if err != nil { + return err + } + + return nil +} + +// write block bytes to batch file +func (bs *BatchSubmitter) handleBatch(blockBytes []byte) error { + encodedBlockBytes := base64.StdEncoding.EncodeToString(blockBytes) + _, err := bs.batchWriter.Write(append([]byte(encodedBlockBytes), ',')) + if err != nil { + return err + } + return nil +} + +// finalize batch and create batch messages +func (bs *BatchSubmitter) finalizeBatch(blockHeight uint64) error { + + // write last block's commit to batch file + rawCommit, err := bs.node.QueryRawCommit(int64(blockHeight)) + if err != nil { + return errors.Wrap(err, "failed to query raw commit") + } + encodedRawCommit := base64.StdEncoding.EncodeToString(rawCommit) + _, err = bs.batchWriter.Write([]byte(encodedRawCommit)) + if err != nil { + return errors.Wrap(err, "failed to write raw commit") + } + err = bs.batchWriter.Close() + if err != nil { + return errors.Wrap(err, "failed to close batch writer") + } + + batchBuffer := make([]byte, bs.batchCfg.MaxChunkSize) + checksums := make([][]byte, 0) + + // room for batch header + bs.processedMsgs = append(bs.processedMsgs, nodetypes.ProcessedMsgs{ + Timestamp: time.Now().UnixNano(), + Save: true, + }) + + for offset := int64(0); ; offset += int64(bs.batchCfg.MaxChunkSize) { + readLength, err := bs.batchFile.ReadAt(batchBuffer, offset) + if err != nil && err != io.EOF { + return err + } else if readLength == 0 { + break + } + + // trim the buffer to the actual read length + batchBuffer := batchBuffer[:readLength] + msg, err := bs.createBatchMsg(batchBuffer) + if err != nil { + return err + } + bs.processedMsgs = append(bs.processedMsgs, nodetypes.ProcessedMsgs{ + Msgs: []sdk.Msg{msg}, + Timestamp: time.Now().UnixNano(), + Save: true, + }) + checksum := sha256.Sum256(batchBuffer) + checksums = append(checksums, checksum[:]) + if int64(readLength) < bs.batchCfg.MaxChunkSize { + break + } + } + + // update batch header + bs.batchHeader.Chunks = checksums + headerBytes, err := json.Marshal(bs.batchHeader) + if err != nil { + return err + } + msg, err := bs.createBatchMsg(headerBytes) + if err != nil { + return err + } + bs.processedMsgs[0].Msgs = []sdk.Msg{msg} + + // reset batch file + err = bs.batchFile.Truncate(0) + if err != nil { + return err + } + _, err = bs.batchFile.Seek(0, 0) + if err != nil { + return err + } + + return nil +} + +func (bs *BatchSubmitter) checkBatch(blockHeight uint64, blockTime time.Time) error { + info, err := bs.batchFile.Stat() + if err != nil { + return errors.Wrap(err, "failed to get batch file stat") + } + + // if the block time is after the last submission time + submission interval * 2/3 + // or the block time is after the last submission time + max submission time + // or the batch file size is greater than (max chunks - 1) * max chunk size + // then finalize the batch + if blockTime.After(bs.lastSubmissionTime.Add(bs.bridgeInfo.BridgeConfig.SubmissionInterval*2/3)) || + blockTime.After(bs.lastSubmissionTime.Add(time.Duration(bs.batchCfg.MaxSubmissionTime)*time.Second)) || + info.Size() > (bs.batchCfg.MaxChunks-1)*bs.batchCfg.MaxChunkSize { + + // finalize the batch + bs.batchHeader.End = blockHeight + } + + return nil +} + +// TODO: support celestia +func (bs *BatchSubmitter) createBatchMsg(batchBytes []byte) (sdk.Msg, error) { + submitter, err := bs.da.GetAddressStr() + if err != nil { + return nil, err + } + + return ophosttypes.NewMsgRecordBatch( + submitter, + bs.bridgeInfo.BridgeId, + batchBytes, + ), nil +} + +// UpdateBatchInfo appends the batch info with the given chain, submitter, output index, and l2 block number +func (bs *BatchSubmitter) UpdateBatchInfo(chain string, submitter string, outputIndex uint64, l2BlockNumber uint64) { + bs.batchInfoMu.Lock() + defer bs.batchInfoMu.Unlock() + + bs.batchInfos = append(bs.batchInfos, ophosttypes.BatchInfoWithOutput{ + BatchInfo: ophosttypes.BatchInfo{ + Chain: chain, + Submitter: submitter, + }, + Output: ophosttypes.Output{ + L2BlockNumber: l2BlockNumber, + }, + }) +} + +// BatchInfo returns the current batch info +func (bs *BatchSubmitter) BatchInfo() *ophosttypes.BatchInfoWithOutput { + bs.batchInfoMu.Lock() + defer bs.batchInfoMu.Unlock() + + return &bs.batchInfos[0] +} + +// NextBatchInfo returns the next batch info in the queue +func (bs *BatchSubmitter) NextBatchInfo() *ophosttypes.BatchInfoWithOutput { + bs.batchInfoMu.Lock() + defer bs.batchInfoMu.Unlock() + if len(bs.batchInfos) == 1 { + return nil + } + return &bs.batchInfos[1] +} + +// DequeueBatchInfo removes the first batch info from the queue +func (bs *BatchSubmitter) DequeueBatchInfo() { + bs.batchInfoMu.Lock() + defer bs.batchInfoMu.Unlock() + + bs.batchInfos = bs.batchInfos[1:] +} diff --git a/executor/child/child.go b/executor/child/child.go index 9391418..08eea90 100644 --- a/executor/child/child.go +++ b/executor/child/child.go @@ -3,6 +3,7 @@ package child import ( "context" "fmt" + "io" "time" "cosmossdk.io/core/address" @@ -32,6 +33,12 @@ type hostNode interface { QueryOutput(uint64) (*ophosttypes.QueryOutputProposalResponse, error) } +type compressionFunc interface { + Write([]byte) (int, error) + Reset(io.Writer) + Close() error +} + type Child struct { version uint8 @@ -92,11 +99,12 @@ func NewChild( return ch } -func (ch *Child) Initialize(host hostNode, bridgeInfo opchildtypes.BridgeInfo) { +func (ch *Child) Initialize(host hostNode, bridgeInfo opchildtypes.BridgeInfo) error { ch.host = host ch.bridgeInfo = bridgeInfo ch.registerHandlers() + return nil } func (ch *Child) Start(ctx context.Context, errCh chan error) { @@ -107,7 +115,7 @@ func (ch *Child) Start(ctx context.Context, errCh chan error) { } }() - ch.node.Start(ctx, errCh) + ch.node.Start(ctx, errCh, nodetypes.PROCESS_TYPE_DEFAULT) } func (ch *Child) registerHandlers() { diff --git a/executor/child/handler.go b/executor/child/handler.go index d78ccac..0f64670 100644 --- a/executor/child/handler.go +++ b/executor/child/handler.go @@ -8,7 +8,7 @@ import ( ) func (ch *Child) beginBlockHandler(args nodetypes.BeginBlockArgs) (err error) { - blockHeight := uint64(args.BlockHeader.Height) + blockHeight := uint64(args.Block.Header.Height) // just to make sure that childMsgQueue is empty if blockHeight == args.LatestHeight && len(ch.msgQueue) != 0 && len(ch.processedMsgs) != 0 { panic("must not happen, msgQueue should be empty") @@ -27,9 +27,9 @@ func (ch *Child) beginBlockHandler(args nodetypes.BeginBlockArgs) (err error) { } func (ch *Child) endBlockHandler(args nodetypes.EndBlockArgs) error { - blockHeight := uint64(args.BlockHeader.Height) + blockHeight := uint64(args.Block.Header.Height) batchKVs := make([]types.RawKV, 0) - treeKVs, storageRoot, err := ch.handleTree(blockHeight, uint64(args.LatestHeight), args.BlockID, args.BlockHeader) + treeKVs, storageRoot, err := ch.handleTree(blockHeight, uint64(args.LatestHeight), args.BlockID, args.Block.Header) if err != nil { return err } diff --git a/executor/child/withdraw.go b/executor/child/withdraw.go index 04be9a7..a8a4d8e 100644 --- a/executor/child/withdraw.go +++ b/executor/child/withdraw.go @@ -14,10 +14,9 @@ import ( "github.com/initia-labs/opinit-bots-go/types" "go.uber.org/zap" + cmtproto "github.com/cometbft/cometbft/proto/tendermint/types" dbtypes "github.com/initia-labs/opinit-bots-go/db/types" nodetypes "github.com/initia-labs/opinit-bots-go/node/types" - - comettpyes "github.com/cometbft/cometbft/types" ) func (ch *Child) initiateWithdrawalHandler(args nodetypes.EventHandlerArgs) error { @@ -130,7 +129,7 @@ func (ch *Child) prepareOutput() error { return nil } -func (ch *Child) handleTree(blockHeight uint64, latestHeight uint64, blockId []byte, blockHeader comettpyes.Header) (kvs []types.RawKV, storageRoot []byte, err error) { +func (ch *Child) handleTree(blockHeight uint64, latestHeight uint64, blockId []byte, blockHeader cmtproto.Header) (kvs []types.RawKV, storageRoot []byte, err error) { // finalize working tree if we are fully synced or block time is over next output time if ch.finalizingBlockHeight == blockHeight || (ch.finalizingBlockHeight == 0 && diff --git a/executor/executor.go b/executor/executor.go index 1a47aa0..5ad3265 100644 --- a/executor/executor.go +++ b/executor/executor.go @@ -6,6 +6,7 @@ import ( "strconv" "github.com/gofiber/fiber/v2" + "github.com/initia-labs/opinit-bots-go/executor/batch" "github.com/initia-labs/opinit-bots-go/executor/child" "github.com/initia-labs/opinit-bots-go/executor/host" "github.com/initia-labs/opinit-bots-go/server" @@ -27,6 +28,7 @@ var _ bottypes.Bot = &Executor{} type Executor struct { host *host.Host child *child.Child + batch *batch.BatchSubmitter cfg *executortypes.Config db types.DB @@ -34,7 +36,7 @@ type Executor struct { logger *zap.Logger } -func NewExecutor(cfg *executortypes.Config, db types.DB, sv *server.Server, logger *zap.Logger, cdc codec.Codec, txConfig client.TxConfig) *Executor { +func NewExecutor(cfg *executortypes.Config, db types.DB, sv *server.Server, logger *zap.Logger, cdc codec.Codec, txConfig client.TxConfig, homePath string) *Executor { err := cfg.Validate() if err != nil { panic(err) @@ -51,6 +53,11 @@ func NewExecutor(cfg *executortypes.Config, db types.DB, sv *server.Server, logg db.WithPrefix([]byte(executortypes.ChildNodeName)), logger.Named(executortypes.ChildNodeName), cdc, txConfig, ), + batch: batch.NewBatchSubmitter( + cfg.Version, cfg.DANodeConfig(), cfg.BatchConfig(), + db.WithPrefix([]byte(executortypes.BatchNodeName)), + logger.Named(executortypes.BatchNodeName), cdc, txConfig, homePath, + ), cfg: cfg, db: db, @@ -72,8 +79,23 @@ func NewExecutor(cfg *executortypes.Config, db types.DB, sv *server.Server, logg zap.Duration("submission_interval", bridgeInfo.BridgeConfig.SubmissionInterval), ) - executor.child.Initialize(executor.host, bridgeInfo) - err = executor.host.Initialize(executor.child, int64(bridgeInfo.BridgeId)) + da := executor.host + // if cfg.Batch.DANode.ChainID != cfg.HostNode.ChainID { + // switch cfg.Batch.DANode.ChainID { + // case "celestia": + // da = celestia.NewCelestia() + // } + // } + + err = executor.host.Initialize(executor.child, executor.batch, int64(bridgeInfo.BridgeId)) + if err != nil { + panic(err) + } + err = executor.child.Initialize(executor.host, bridgeInfo) + if err != nil { + panic(err) + } + err = executor.batch.Initialize(executor.host, da, bridgeInfo) if err != nil { panic(err) } @@ -91,10 +113,12 @@ func (ex *Executor) Start(cmdCtx context.Context) error { hostCtx, hostDone := context.WithCancel(cmdCtx) childCtx, childDone := context.WithCancel(cmdCtx) + batchCtx, batchDone := context.WithCancel(cmdCtx) errCh := make(chan error, 3) ex.host.Start(hostCtx, errCh) ex.child.Start(childCtx, errCh) + ex.batch.Start(batchCtx, errCh) go func() { err := ex.server.Start(ex.cfg.ListenAddress) @@ -115,6 +139,9 @@ func (ex *Executor) Start(cmdCtx context.Context) error { ex.logger.Debug("executor shutdown", zap.String("state", "wait"), zap.String("target", "child")) childDone() + ex.logger.Debug("executor shutdown", zap.String("state", "wait"), zap.String("target", "batch")) + batchDone() + ex.logger.Info("executor shutdown completed") return err } diff --git a/executor/host/batch.go b/executor/host/batch.go new file mode 100644 index 0000000..6b4c2ea --- /dev/null +++ b/executor/host/batch.go @@ -0,0 +1,67 @@ +package host + +import ( + "strconv" + + ophosttypes "github.com/initia-labs/OPinit/x/ophost/types" + nodetypes "github.com/initia-labs/opinit-bots-go/node/types" + "go.uber.org/zap" +) + +func (h *Host) recordBatchHandler(args nodetypes.EventHandlerArgs) error { + var submitter string + for _, attr := range args.EventAttributes { + switch attr.Key { + case ophosttypes.AttributeKeySubmitter: + submitter = attr.Value + hostAddress, err := h.GetAddressStr() + if err != nil { + return nil + } + if submitter != hostAddress { + return nil + } + } + } + h.logger.Info("record batch", + zap.String("submitter", submitter), + ) + return nil +} + +func (h *Host) updateBatchInfoHandler(args nodetypes.EventHandlerArgs) error { + var bridgeId uint64 + var submitter, chain string + var outputIndex, l2BlockNumber uint64 + var err error + for _, attr := range args.EventAttributes { + switch attr.Key { + case ophosttypes.AttributeKeyBridgeId: + bridgeId, err = strconv.ParseUint(attr.Value, 10, 64) + if err != nil { + return err + } + if bridgeId != uint64(h.bridgeId) { + // pass other bridge deposit event + return nil + } + case ophosttypes.AttributeKeyBatchChain: + chain = attr.Value + case ophosttypes.AttributeKeyBatchSubmitter: + submitter = attr.Value + case ophosttypes.AttributeKeyFinalizedOutputIndex: + outputIndex, err = strconv.ParseUint(attr.Value, 10, 64) + if err != nil { + return err + } + case ophosttypes.AttributeKeyFinalizedL2BlockNumber: + l2BlockNumber, err = strconv.ParseUint(attr.Value, 10, 64) + if err != nil { + return err + } + } + } + + h.batch.UpdateBatchInfo(chain, submitter, outputIndex, l2BlockNumber) + return nil +} diff --git a/executor/host/deposit.go b/executor/host/deposit.go index 89a0b95..344f78d 100644 --- a/executor/host/deposit.go +++ b/executor/host/deposit.go @@ -14,7 +14,7 @@ import ( ) func (h *Host) initiateDepositHandler(args nodetypes.EventHandlerArgs) error { - var bridgeId int64 + var bridgeId uint64 var l1Sequence uint64 var from, to, l1Denom, l2Denom, amount string var data []byte @@ -23,11 +23,11 @@ func (h *Host) initiateDepositHandler(args nodetypes.EventHandlerArgs) error { for _, attr := range args.EventAttributes { switch attr.Key { case ophosttypes.AttributeKeyBridgeId: - bridgeId, err = strconv.ParseInt(attr.Value, 10, 64) + bridgeId, err = strconv.ParseUint(attr.Value, 10, 64) if err != nil { return err } - if bridgeId != h.bridgeId { + if bridgeId != uint64(h.bridgeId) { // pass other bridge deposit event return nil } diff --git a/executor/host/handler.go b/executor/host/handler.go index fb72fb7..04a1c76 100644 --- a/executor/host/handler.go +++ b/executor/host/handler.go @@ -10,7 +10,7 @@ import ( ) func (h *Host) beginBlockHandler(args nodetypes.BeginBlockArgs) error { - blockHeight := uint64(args.BlockHeader.Height) + blockHeight := uint64(args.Block.Header.Height) // just to make sure that childMsgQueue is empty if blockHeight == args.LatestHeight && len(h.msgQueue) != 0 && len(h.processedMsgs) != 0 { panic("must not happen, msgQueue should be empty") @@ -21,7 +21,7 @@ func (h *Host) beginBlockHandler(args nodetypes.BeginBlockArgs) error { func (h *Host) endBlockHandler(args nodetypes.EndBlockArgs) error { // temporary 50 limit for msg queue // collect more msgs if block height is not latest - blockHeight := uint64(args.BlockHeader.Height) + blockHeight := uint64(args.Block.Header.Height) if blockHeight != args.LatestHeight && len(h.msgQueue) > 0 && len(h.msgQueue) <= 10 { return nil } diff --git a/executor/host/host.go b/executor/host/host.go index 9577bd6..59f1f5c 100644 --- a/executor/host/host.go +++ b/executor/host/host.go @@ -7,6 +7,7 @@ import ( "go.uber.org/zap" ophosttypes "github.com/initia-labs/OPinit/x/ophost/types" + executortypes "github.com/initia-labs/opinit-bots-go/executor/types" "github.com/initia-labs/opinit-bots-go/node" nodetypes "github.com/initia-labs/opinit-bots-go/node/types" "github.com/initia-labs/opinit-bots-go/types" @@ -26,12 +27,19 @@ type childNode interface { QueryNextL1Sequence() (uint64, error) } +type batchNode interface { + UpdateBatchInfo(chain string, submitter string, outputIndex uint64, l2BlockNumber uint64) +} + +var _ executortypes.DANode = &Host{} + type Host struct { version uint8 relayOracle bool node *node.Node child childNode + batch batchNode bridgeId int64 initialL1Sequence uint64 @@ -79,8 +87,9 @@ func NewHost( return h } -func (h *Host) Initialize(child childNode, bridgeId int64) (err error) { +func (h *Host) Initialize(child childNode, batch batchNode, bridgeId int64) (err error) { h.child = child + h.batch = batch h.bridgeId = bridgeId h.initialL1Sequence, err = h.child.QueryNextL1Sequence() @@ -101,7 +110,7 @@ func (h *Host) Start(ctx context.Context, errCh chan error) { } }() - h.node.Start(ctx, errCh) + h.node.Start(ctx, errCh, nodetypes.PROCESS_TYPE_DEFAULT) } func (h *Host) registerHandlers() { @@ -110,6 +119,7 @@ func (h *Host) registerHandlers() { h.node.RegisterEventHandler(ophosttypes.EventTypeInitiateTokenDeposit, h.initiateDepositHandler) h.node.RegisterEventHandler(ophosttypes.EventTypeProposeOutput, h.proposeOutputHandler) h.node.RegisterEventHandler(ophosttypes.EventTypeFinalizeTokenWithdrawal, h.finalizeWithdrawalHandler) + h.node.RegisterEventHandler(ophosttypes.EventTypeRecordBatch, h.recordBatchHandler) h.node.RegisterEndBlockHandler(h.endBlockHandler) } diff --git a/executor/host/query.go b/executor/host/query.go index f161076..e704f22 100644 --- a/executor/host/query.go +++ b/executor/host/query.go @@ -47,3 +47,12 @@ func (h Host) QueryOutput(outputIndex uint64) (*ophosttypes.QueryOutputProposalR return h.ophostQueryClient.OutputProposal(ctx, req) } + +func (h Host) QueryBatchInfos() (*ophosttypes.QueryBatchInfosResponse, error) { + req := &ophosttypes.QueryBatchInfosRequest{ + BridgeId: uint64(h.bridgeId), + } + ctx, cancel := node.GetQueryContext(0) + defer cancel() + return h.ophostQueryClient.BatchInfos(ctx, req) +} diff --git a/executor/host/withdraw.go b/executor/host/withdraw.go index 3967aa2..eb4c40a 100644 --- a/executor/host/withdraw.go +++ b/executor/host/withdraw.go @@ -69,6 +69,10 @@ func (h *Host) finalizeWithdrawalHandler(args nodetypes.EventHandlerArgs) error if err != nil { return err } + if bridgeId != uint64(h.bridgeId) { + // pass other bridge deposit event + return nil + } case ophosttypes.AttributeKeyOutputIndex: outputIndex, err = strconv.ParseUint(attr.Value, 10, 64) if err != nil { diff --git a/executor/types/batch.go b/executor/types/batch.go new file mode 100644 index 0000000..9ea92ed --- /dev/null +++ b/executor/types/batch.go @@ -0,0 +1,18 @@ +package types + +import ( + nodetypes "github.com/initia-labs/opinit-bots-go/node/types" + "github.com/initia-labs/opinit-bots-go/types" +) + +type DANode interface { + GetAddressStr() (string, error) + HasKey() bool + BroadcastMsgs(nodetypes.ProcessedMsgs) + ProcessedMsgsToRawKV(processedMsgs []nodetypes.ProcessedMsgs, delete bool) ([]types.RawKV, error) +} + +type BatchHeader struct { + End uint64 `json:"end"` + Chunks [][]byte `json:"chunks"` +} diff --git a/executor/types/config.go b/executor/types/config.go index c8ec5fd..ae67e76 100644 --- a/executor/types/config.go +++ b/executor/types/config.go @@ -15,12 +15,15 @@ type Config struct { L1RPCAddress string `json:"l1_rpc_address"` L2RPCAddress string `json:"l2_rpc_address"` + DARPCAddress string `json:"da_rpc_address"` L1GasPrice string `json:"l1_gas_price"` L2GasPrice string `json:"l2_gas_price"` + DAGasPrice string `json:"da_gas_price"` L1ChainID string `json:"l1_chain_id"` L2ChainID string `json:"l2_chain_id"` + DAChainID string `json:"da_chain_id"` // OutputSubmitterMnemonic is the mnemonic phrase for the output submitter, // which is used to relay the output transaction from l2 to l1. @@ -34,8 +37,21 @@ type Config struct { // If you don't want to use the bridge executor feature, you can leave it empty. BridgeExecutorMnemonic string `json:"bridge_executor_mnemonic"` + // BatchSubmitterMnemonic is the mnemonic phrase for the batch submitter, + // which is used to relay the batch of blocks from l2 to da. + // + // If you don't want to use the batch submitter feature, you can leave it empty. + BatchSubmitterMnemonic string `json:"batch_submitter_mnemonic"` + // RelayOracle is the flag to enable the oracle relay feature. RelayOracle bool `json:"relay_oracle"` + + // MaxChunks is the maximum number of chunks in a batch. + MaxChunks int64 `json:"max_chunks"` + // MaxChunkSize is the maximum size of a chunk in a batch. + MaxChunkSize int64 `json:"max_chunk_size"` + // MaxSubmissionTime is the maximum time to submit a batch. + MaxSubmissionTime int64 `json:"max_submission_time"` // seconds } type HostConfig struct { @@ -50,15 +66,19 @@ func DefaultConfig() *Config { L1RPCAddress: "tcp://localhost:26657", L2RPCAddress: "tcp://localhost:27657", + DARPCAddress: "tcp://localhost:28657", L1GasPrice: "0.15uinit", L2GasPrice: "", + DAGasPrice: "", L1ChainID: "testnet-l1-1", L2ChainID: "testnet-l2-1", + DAChainID: "testnet-l3-1", OutputSubmitterMnemonic: "", BridgeExecutorMnemonic: "", + BatchSubmitterMnemonic: "", } } @@ -73,6 +93,9 @@ func (cfg Config) Validate() error { if cfg.L2RPCAddress == "" { return errors.New("L2 RPC URL is required") } + if cfg.DARPCAddress == "" { + return errors.New("L2 RPC URL is required") + } if cfg.L1ChainID == "" { return errors.New("L1 chain ID is required") @@ -80,11 +103,12 @@ func (cfg Config) Validate() error { if cfg.L2ChainID == "" { return errors.New("L2 chain ID is required") } - + if cfg.DAChainID == "" { + return errors.New("L2 RPC URL is required") + } if cfg.ListenAddress == "" { return errors.New("listen address is required") } - return nil } @@ -103,3 +127,25 @@ func (cfg Config) L2NodeConfig() nodetypes.NodeConfig { Mnemonic: cfg.BridgeExecutorMnemonic, } } + +func (cfg Config) DANodeConfig() nodetypes.NodeConfig { + return nodetypes.NodeConfig{ + RPC: cfg.DARPCAddress, + ChainID: cfg.DAChainID, + Mnemonic: cfg.BatchSubmitterMnemonic, + } +} + +func (cfg Config) BatchConfig() BatchConfig { + return BatchConfig{ + MaxChunks: cfg.MaxChunks, + MaxChunkSize: cfg.MaxChunkSize, + MaxSubmissionTime: cfg.MaxSubmissionTime, + } +} + +type BatchConfig struct { + MaxChunks int64 `json:"max_chunks"` + MaxChunkSize int64 `json:"max_chunk_size"` + MaxSubmissionTime int64 `json:"max_submission_time"` // seconds +} diff --git a/executor/types/const.go b/executor/types/const.go index c35cd12..36b77b9 100644 --- a/executor/types/const.go +++ b/executor/types/const.go @@ -3,5 +3,6 @@ package types const ( HostNodeName = "host" ChildNodeName = "child" + BatchNodeName = "batch" MerkleName = "merkle" ) diff --git a/go.mod b/go.mod index c9be55b..a3a1461 100644 --- a/go.mod +++ b/go.mod @@ -8,6 +8,7 @@ require ( github.com/gofiber/fiber/v2 v2.52.5 github.com/initia-labs/OPinit v0.4.0 github.com/initia-labs/initia v0.4.0 + github.com/pkg/errors v0.9.1 github.com/spf13/cobra v1.8.0 github.com/spf13/viper v1.18.2 gopkg.in/yaml.v2 v2.4.0 @@ -178,7 +179,6 @@ require ( github.com/oasisprotocol/curve25519-voi v0.0.0-20230904125328-1f23a7beb09a // indirect github.com/pelletier/go-toml/v2 v2.2.2 // indirect github.com/petermattis/goid v0.0.0-20231207134359-e60b3f734c67 // indirect - github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/prometheus/client_golang v1.19.0 // indirect github.com/prometheus/client_model v0.6.1 // indirect diff --git a/node/db.go b/node/db.go index 4c542f6..6b22c9c 100644 --- a/node/db.go +++ b/node/db.go @@ -7,12 +7,12 @@ import ( "go.uber.org/zap" ) -////////////// -// SyncInfo // -////////////// +func (n *Node) SetSyncInfo(height uint64) { + n.lastProcessedBlockHeight = height +} -func (n *Node) SaveSyncInfo() error { - return n.db.Set(nodetypes.LastProcessedBlockHeightKey, dbtypes.FromUint64(n.lastProcessedBlockHeight)) +func (n *Node) SaveSyncInfo(height uint64) error { + return n.db.Set(nodetypes.LastProcessedBlockHeightKey, dbtypes.FromUint64(height)) } func (n *Node) SyncInfoToRawKV(height uint64) types.RawKV { @@ -78,14 +78,10 @@ func (n *Node) loadPendingTxs() (txs []nodetypes.PendingTxInfo, err error) { func (n *Node) PendingTxsToRawKV(txInfos []nodetypes.PendingTxInfo, delete bool) ([]types.RawKV, error) { kvs := make([]types.RawKV, 0, len(txInfos)) for _, txInfo := range txInfos { - if !txInfo.Save { - continue - } - var data []byte var err error - if !delete { + if !delete && txInfo.Save { data, err = txInfo.Marshal() if err != nil { return nil, err @@ -108,14 +104,10 @@ func (n *Node) PendingTxsToRawKV(txInfos []nodetypes.PendingTxInfo, delete bool) func (n *Node) ProcessedMsgsToRawKV(ProcessedMsgs []nodetypes.ProcessedMsgs, delete bool) ([]types.RawKV, error) { kvs := make([]types.RawKV, 0, len(ProcessedMsgs)) for _, processedMsgs := range ProcessedMsgs { - if !processedMsgs.Save { - continue - } - var data []byte var err error - if !delete { + if !delete && processedMsgs.Save { data, err = processedMsgs.MarshalInterfaceJSON(n.cdc) if err != nil { return nil, err diff --git a/node/node.go b/node/node.go index 0d4aeee..f2ef2ba 100644 --- a/node/node.go +++ b/node/node.go @@ -7,7 +7,6 @@ import ( "errors" - rpchttp "github.com/cometbft/cometbft/rpc/client/http" "github.com/cosmos/cosmos-sdk/client" "github.com/cosmos/cosmos-sdk/client/tx" "github.com/cosmos/cosmos-sdk/codec" @@ -15,13 +14,14 @@ import ( "github.com/cosmos/cosmos-sdk/crypto/keyring" sdk "github.com/cosmos/cosmos-sdk/types" "github.com/cosmos/cosmos-sdk/types/tx/signing" + clienthttp "github.com/initia-labs/opinit-bots-go/client" nodetypes "github.com/initia-labs/opinit-bots-go/node/types" "github.com/initia-labs/opinit-bots-go/types" "go.uber.org/zap" ) type Node struct { - *rpchttp.HTTP + *clienthttp.HTTP cfg nodetypes.NodeConfig db types.DB @@ -31,6 +31,7 @@ type Node struct { txHandler nodetypes.TxHandlerFn beginBlockHandler nodetypes.BeginBlockHandlerFn endBlockHandler nodetypes.EndBlockHandlerFn + rawBlockHandler nodetypes.RawBlockHandlerFn cdc codec.Codec txConfig client.TxConfig @@ -50,7 +51,7 @@ type Node struct { } func NewNode(cfg nodetypes.NodeConfig, db types.DB, logger *zap.Logger, cdc codec.Codec, txConfig client.TxConfig) (*Node, error) { - client, err := client.NewClientFromNode(cfg.RPC) + client, err := clienthttp.New(cfg.RPC, "/websocket") if err != nil { return nil, err } @@ -106,7 +107,7 @@ func NewNode(cfg nodetypes.NodeConfig, db types.DB, logger *zap.Logger, cdc code return n, nil } -func (n Node) Start(ctx context.Context, errCh chan error) { +func (n Node) Start(ctx context.Context, errCh chan error, processType nodetypes.BlockProcessType) { go func() { err := n.txBroadcastLooper(ctx) if err != nil { @@ -121,7 +122,7 @@ func (n Node) Start(ctx context.Context, errCh chan error) { } go func() { - err := n.blockProcessLooper(ctx) + err := n.blockProcessLooper(ctx, processType) if err != nil { errCh <- err } @@ -270,3 +271,7 @@ func (n *Node) RegisterBeginBlockHandler(fn nodetypes.BeginBlockHandlerFn) { func (n *Node) RegisterEndBlockHandler(fn nodetypes.EndBlockHandlerFn) { n.endBlockHandler = fn } + +func (n *Node) RegisterRawBlockHandler(fn nodetypes.RawBlockHandlerFn) { + n.rawBlockHandler = fn +} diff --git a/node/process.go b/node/process.go index 84cbf02..968fe4e 100644 --- a/node/process.go +++ b/node/process.go @@ -11,7 +11,7 @@ import ( "go.uber.org/zap" ) -func (n *Node) blockProcessLooper(ctx context.Context) error { +func (n *Node) blockProcessLooper(ctx context.Context, processType nodetypes.BlockProcessType) error { timer := time.NewTicker(nodetypes.POLLING_INTERVAL) for { @@ -32,22 +32,51 @@ func (n *Node) blockProcessLooper(ctx context.Context) error { continue } - // TODO: may fetch blocks in batch - for queryHeight := n.lastProcessedBlockHeight + 1; queryHeight <= latestChainHeight; queryHeight++ { - block, blockResult, err := n.fetchNewBlock(ctx, int64(queryHeight)) - if err != nil { - // TODO: handle error - n.logger.Error("failed to fetch new block", zap.String("error", err.Error())) - break + switch processType { + case nodetypes.PROCESS_TYPE_DEFAULT: + for queryHeight := n.lastProcessedBlockHeight + 1; queryHeight <= latestChainHeight; { + // TODO: may fetch blocks in batch + block, blockResult, err := n.fetchNewBlock(ctx, int64(queryHeight)) + if err != nil { + // TODO: handle error + n.logger.Error("failed to fetch new block", zap.String("error", err.Error())) + break + } + + err = n.handleNewBlock(block, blockResult, latestChainHeight) + if err != nil { + // TODO: handle error + n.logger.Error("failed to handle new block", zap.String("error", err.Error())) + break + } + n.lastProcessedBlockHeight = queryHeight + queryHeight++ + } + + case nodetypes.PROCESS_TYPE_RAW: + start := n.lastProcessedBlockHeight + 1 + end := n.lastProcessedBlockHeight + 100 + if end > latestChainHeight { + end = latestChainHeight } - err = n.handleNewBlock(block, blockResult, latestChainHeight) + blockBulk, err := n.QueryBlockBulk(start, end) if err != nil { - // TODO: handle error - n.logger.Error("failed to handle new block", zap.String("error", err.Error())) - break + n.logger.Error("failed to fetch block bulk", zap.String("error", err.Error())) + continue + } + + for i := start; i <= end; i++ { + err := n.rawBlockHandler(nodetypes.RawBlockArgs{ + BlockHeight: i, + BlockBytes: blockBulk[i-start], + }) + if err != nil { + n.logger.Error("failed to handle raw block", zap.String("error", err.Error())) + break + } + n.lastProcessedBlockHeight = i } - n.lastProcessedBlockHeight = queryHeight } } } @@ -69,6 +98,10 @@ func (n *Node) fetchNewBlock(ctx context.Context, height int64) (block *rpccoret } func (n *Node) handleNewBlock(block *rpccoretypes.ResultBlock, blockResult *rpccoretypes.ResultBlockResults, latestChainHeight uint64) error { + protoBlock, err := block.Block.ToProto() + if err != nil { + return err + } // check pending txs first // TODO: may handle pending txs with same level of other handlers for _, tx := range block.Block.Txs { @@ -99,7 +132,7 @@ func (n *Node) handleNewBlock(block *rpccoretypes.ResultBlock, blockResult *rpcc if n.beginBlockHandler != nil { err := n.beginBlockHandler(nodetypes.BeginBlockArgs{ BlockID: block.BlockID.Hash, - BlockHeader: block.Block.Header, + Block: *protoBlock, LatestHeight: latestChainHeight, }) if err != nil { @@ -123,7 +156,7 @@ func (n *Node) handleNewBlock(block *rpccoretypes.ResultBlock, blockResult *rpcc if len(n.eventHandlers) != 0 { events := blockResult.TxsResults[txIndex].GetEvents() for eventIndex, event := range events { - err := n.handleEvent(uint64(block.Block.Height), latestChainHeight, uint64(txIndex), event) + err := n.handleEvent(uint64(block.Block.Height), latestChainHeight, event) if err != nil { return fmt.Errorf("failed to handle event: tx_index: %d, event_index: %d; %w", txIndex, eventIndex, err) } @@ -131,10 +164,17 @@ func (n *Node) handleNewBlock(block *rpccoretypes.ResultBlock, blockResult *rpcc } } + for eventIndex, event := range blockResult.FinalizeBlockEvents { + err := n.handleEvent(uint64(block.Block.Height), latestChainHeight, event) + if err != nil { + return fmt.Errorf("failed to handle event: finalize block, event_index: %d; %w", eventIndex, err) + } + } + if n.endBlockHandler != nil { err := n.endBlockHandler(nodetypes.EndBlockArgs{ BlockID: block.BlockID.Hash, - BlockHeader: block.Block.Header, + Block: *protoBlock, LatestHeight: latestChainHeight, }) if err != nil { @@ -144,7 +184,7 @@ func (n *Node) handleNewBlock(block *rpccoretypes.ResultBlock, blockResult *rpcc return nil } -func (n *Node) handleEvent(blockHeight uint64, latestHeight uint64, txIndex uint64, event abcitypes.Event) error { +func (n *Node) handleEvent(blockHeight uint64, latestHeight uint64, event abcitypes.Event) error { if n.eventHandlers[event.GetType()] == nil { return nil } @@ -153,7 +193,6 @@ func (n *Node) handleEvent(blockHeight uint64, latestHeight uint64, txIndex uint return n.eventHandlers[event.Type](nodetypes.EventHandlerArgs{ BlockHeight: blockHeight, LatestHeight: latestHeight, - TxIndex: txIndex, EventAttributes: event.GetAttributes(), }) } diff --git a/node/query.go b/node/query.go index dc8c472..b00996a 100644 --- a/node/query.go +++ b/node/query.go @@ -152,3 +152,17 @@ func GetQueryContext(height uint64) (context.Context, context.CancelFunc) { ctx = metadata.AppendToOutgoingContext(ctx, grpctypes.GRPCBlockHeightHeader, strHeight) return ctx, cancel } + +// QueryRawCommit queries the raw commit at a given height. +func (n *Node) QueryRawCommit(height int64) ([]byte, error) { + ctx, cancel := GetQueryContext(uint64(height)) + defer cancel() + return n.RawCommit(ctx, &height) +} + +// QueryBlockBulk queries blocks in bulk. +func (n *Node) QueryBlockBulk(start uint64, end uint64) ([][]byte, error) { + ctx, cancel := GetQueryContext(0) + defer cancel() + return n.BlockBulk(ctx, &start, &end) +} diff --git a/node/tx.go b/node/tx.go index db85938..9d031e3 100644 --- a/node/tx.go +++ b/node/tx.go @@ -7,6 +7,9 @@ import ( "regexp" "strconv" "strings" + "time" + + "github.com/pkg/errors" sdkerrors "cosmossdk.io/errors" abci "github.com/cometbft/cometbft/abci/types" @@ -39,9 +42,22 @@ func (n *Node) txBroadcastLooper(ctx context.Context) error { case <-ctx.Done(): return nil case data := <-n.txChannel: - err := n.handleProcessedMsgs(ctx, data) - if err != nil && n.handleMsgError(err) != nil { - return err + var err error + for retry := 0; retry < 5; retry++ { + err = n.handleProcessedMsgs(ctx, data) + if err == nil { + break + } else if err = n.handleMsgError(err); err == nil { + break + } + + n.logger.Warn("retry ", zap.String("error", err.Error())) + time.Sleep(2 * time.Second) + continue + } + + if err != nil { + return errors.Wrap(err, "failed to handle processed msgs") } } } diff --git a/node/types/const.go b/node/types/const.go index 04a8985..129423b 100644 --- a/node/types/const.go +++ b/node/types/const.go @@ -10,3 +10,10 @@ const MAX_PENDING_TXS = 5 const GAS_ADJUSTMENT = 1.5 const KEY_NAME = "key" const TX_TIMEOUT = 30 * time.Second + +type BlockProcessType uint8 + +const ( + PROCESS_TYPE_DEFAULT BlockProcessType = iota + PROCESS_TYPE_RAW +) diff --git a/node/types/handler.go b/node/types/handler.go index caf70e8..eb9f892 100644 --- a/node/types/handler.go +++ b/node/types/handler.go @@ -2,6 +2,7 @@ package types import ( abcitypes "github.com/cometbft/cometbft/abci/types" + cmtproto "github.com/cometbft/cometbft/proto/tendermint/types" comettypes "github.com/cometbft/cometbft/types" ) @@ -25,7 +26,7 @@ type TxHandlerFn func(TxHandlerArgs) error type BeginBlockArgs struct { BlockID []byte - BlockHeader comettypes.Header + Block cmtproto.Block LatestHeight uint64 } @@ -33,8 +34,15 @@ type BeginBlockHandlerFn func(BeginBlockArgs) error type EndBlockArgs struct { BlockID []byte - BlockHeader comettypes.Header + Block cmtproto.Block LatestHeight uint64 } type EndBlockHandlerFn func(EndBlockArgs) error + +type RawBlockArgs struct { + BlockHeight uint64 + BlockBytes []byte +} + +type RawBlockHandlerFn func(RawBlockArgs) error diff --git a/types/db.go b/types/db.go index 3fef08d..537a7cd 100644 --- a/types/db.go +++ b/types/db.go @@ -24,4 +24,5 @@ type DB interface { WithPrefix([]byte) DB PrefixedKey([]byte) []byte UnprefixedKey([]byte) []byte + GetPath() string }