Skip to content

Commit

Permalink
Cherry-pick: Problem: parallel tx execution is not supported (crypto-…
Browse files Browse the repository at this point in the history
…org-chain#205)

Reason for Cherry-pick:
- Migrate Block STM (Parallel Transaction Execution)

Original Commit Message:

add basic support in sdk:
- add a TxExecutor baseapp option
- add TxIndex/TxCount/MsgIndex in context

Update CHANGELOG.md

Signed-off-by: yihuang <[email protected]>

-----------

Cherry-picked-by: zsystm <[email protected]>
  • Loading branch information
yihuang authored and zsystm committed May 28, 2024
1 parent c4d9a49 commit 650cf91
Show file tree
Hide file tree
Showing 9 changed files with 149 additions and 51 deletions.
9 changes: 0 additions & 9 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ linters:
- exportloopref
- goconst
- gocritic
- gci
- gofumpt
- gosec
- gosimple
Expand Down Expand Up @@ -63,14 +62,6 @@ issues:
max-same-issues: 10000

linters-settings:
gci:
custom-order: true
sections:
- standard # Standard section: captures all standard packages.
- default # Default section: contains all imports that could not be matched to another section type.
- prefix(cosmossdk.io)
- prefix(github.com/cosmos/cosmos-sdk)

gosec:
# To select a subset of rules to run.
# Available rules: https://github.com/securego/gosec#available-rules
Expand Down
8 changes: 6 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,18 @@ Ref: https://keepachangelog.com/en/1.0.0/

## [Unreleased]

### Features

* (baseapp) [#205](https://github.com/crypto-org-chain/cosmos-sdk/pull/205) Add `TxExecutor` baseapp option, add `TxIndex`/`TxCount`/`MsgIndex`/`BlockGasUsed` fields to `Context, to support tx parallel execution.

## [v0.50.6](https://github.com/cosmos/cosmos-sdk/releases/tag/v0.50.6) - 2024-04-22

### Features

* (types) [#19759](https://github.com/cosmos/cosmos-sdk/pull/19759) Align SignerExtractionAdapter in PriorityNonceMempool Remove.
* (client) [#19870](https://github.com/cosmos/cosmos-sdk/pull/19870) Add new query command `wait-tx`. Alias `event-query-tx-for` to `wait-tx` for backward compatibility.

### Improvements
### Improvements

* (telemetry) [#19903](https://github.com/cosmos/cosmos-sdk/pull/19903) Conditionally emit metrics based on enablement.
* **Introduction of `Now` Function**: Added a new function called `Now` to the telemetry package. It returns the current system time if telemetry is enabled, or a zero time if telemetry is not enabled.
Expand Down Expand Up @@ -498,7 +502,7 @@ Ref: https://keepachangelog.com/en/1.0.0/

* (types) [#16980](https://github.com/cosmos/cosmos-sdk/pull/16980) Deprecate `IntProto` and `DecProto`. Instead, `math.Int` and `math.LegacyDec` should be used respectively. Both types support `Marshal` and `Unmarshal` for binary serialization.
* (x/staking) [#14567](https://github.com/cosmos/cosmos-sdk/pull/14567) The `delegator_address` field of `MsgCreateValidator` has been deprecated.
The validator address bytes and delegator address bytes refer to the same account while creating validator (defer only in bech32 notation).
The validator address bytes and delegator address bytes refer to the same account while creating validator (defer only in bech32 notation).

## Previous Versions

Expand Down
91 changes: 58 additions & 33 deletions baseapp/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,11 +336,11 @@ func (app *BaseApp) ApplySnapshotChunk(req *abci.RequestApplySnapshotChunk) (*ab
func (app *BaseApp) CheckTx(req *abci.RequestCheckTx) (*abci.ResponseCheckTx, error) {
var mode execMode

switch {
case req.Type == abci.CheckTxType_New:
switch req.Type {
case abci.CheckTxType_New:
mode = execModeCheck

case req.Type == abci.CheckTxType_Recheck:
case abci.CheckTxType_Recheck:
mode = execModeReCheck

default:
Expand Down Expand Up @@ -775,48 +775,34 @@ func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.Request

// Reset the gas meter so that the AnteHandlers aren't required to
gasMeter = app.getBlockGasMeter(app.finalizeBlockState.Context())
app.finalizeBlockState.SetContext(app.finalizeBlockState.Context().WithBlockGasMeter(gasMeter))
app.finalizeBlockState.SetContext(
app.finalizeBlockState.Context().
WithBlockGasMeter(gasMeter).
WithTxCount(len(req.Txs)),
)

// Iterate over all raw transactions in the proposal and attempt to execute
// them, gathering the execution results.
//
// NOTE: Not all raw transactions may adhere to the sdk.Tx interface, e.g.
// vote extensions, so skip those.
txResults := make([]*abci.ExecTxResult, 0, len(req.Txs))
for _, rawTx := range req.Txs {
var response *abci.ExecTxResult

if _, err := app.txDecoder(rawTx); err == nil {
response = app.deliverTx(rawTx)
} else {
// In the case where a transaction included in a block proposal is malformed,
// we still want to return a default response to comet. This is because comet
// expects a response for each transaction included in a block proposal.
response = sdkerrors.ResponseExecTxResultWithEvents(
sdkerrors.ErrTxDecode,
0,
0,
nil,
false,
)
}

// check after every tx if we should abort
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
// continue
}

txResults = append(txResults, response)
txResults, err := app.executeTxs(ctx, req.Txs)
if err != nil {
// usually due to canceled
return nil, err
}

if app.finalizeBlockState.ms.TracingEnabled() {
app.finalizeBlockState.ms = app.finalizeBlockState.ms.SetTracingContext(nil).(storetypes.CacheMultiStore)
}

endBlock, err := app.endBlock(app.finalizeBlockState.Context())
var blockGasUsed uint64
for _, res := range txResults {
blockGasUsed += uint64(res.GasUsed)
}
sdkCtx := app.finalizeBlockState.Context().WithBlockGasUsed(blockGasUsed)

endBlock, err := app.endBlock(sdkCtx)
if err != nil {
return nil, err
}
Expand All @@ -840,6 +826,45 @@ func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.Request
}, nil
}

func (app *BaseApp) executeTxs(ctx context.Context, txs [][]byte) ([]*abci.ExecTxResult, error) {
if app.txExecutor != nil {
return app.txExecutor(ctx, len(txs), app.finalizeBlockState.ms, func(i int, ms storetypes.MultiStore) *abci.ExecTxResult {
return app.deliverTxWithMultiStore(txs[i], i, ms)
})
}

txResults := make([]*abci.ExecTxResult, 0, len(txs))
for i, rawTx := range txs {
var response *abci.ExecTxResult

if _, err := app.txDecoder(rawTx); err == nil {
response = app.deliverTx(rawTx, i)
} else {
// In the case where a transaction included in a block proposal is malformed,
// we still want to return a default response to comet. This is because comet
// expects a response for each transaction included in a block proposal.
response = sdkerrors.ResponseExecTxResultWithEvents(
sdkerrors.ErrTxDecode,
0,
0,
nil,
false,
)
}

// check after every tx if we should abort
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
// continue
}

txResults = append(txResults, response)
}
return txResults, nil
}

// FinalizeBlock will execute the block proposal provided by RequestFinalizeBlock.
// Specifically, it will execute an application's BeginBlock (if defined), followed
// by the transactions in the proposal, finally followed by the application's
Expand Down
25 changes: 21 additions & 4 deletions baseapp/baseapp.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,9 @@ type BaseApp struct {
//
// SAFETY: it's safe to do if validators validate the total gas wanted in the `ProcessProposal`, which is the case in the default handler.
disableBlockGasMeter bool

// Optional alternative tx executor, used for block-stm parallel transaction execution.
txExecutor TxExecutor
}

// NewBaseApp returns a reference to an initialized BaseApp. It accepts a
Expand Down Expand Up @@ -659,13 +662,14 @@ func (app *BaseApp) getBlockGasMeter(ctx sdk.Context) storetypes.GasMeter {
}

// retrieve the context for the tx w/ txBytes and other memoized values.
func (app *BaseApp) getContextForTx(mode execMode, txBytes []byte) sdk.Context {
func (app *BaseApp) getContextForTx(mode execMode, txBytes []byte, txIndex int) sdk.Context {
modeState := app.getState(mode)
if modeState == nil {
panic(fmt.Sprintf("state is nil for mode %v", mode))
}
ctx := modeState.Context().
WithTxBytes(txBytes).
WithTxIndex(txIndex).
WithGasMeter(storetypes.NewInfiniteGasMeter())
// WithVoteInfos(app.voteInfos) // TODO: identify if this is needed

Expand Down Expand Up @@ -747,7 +751,11 @@ func (app *BaseApp) beginBlock(req *abci.RequestFinalizeBlock) (sdk.BeginBlock,
return resp, nil
}

func (app *BaseApp) deliverTx(tx []byte) *abci.ExecTxResult {
func (app *BaseApp) deliverTx(tx []byte, txIndex int) *abci.ExecTxResult {
return app.deliverTxWithMultiStore(tx, txIndex, nil)
}

func (app *BaseApp) deliverTxWithMultiStore(tx []byte, txIndex int, txMultiStore storetypes.MultiStore) *abci.ExecTxResult {
gInfo := sdk.GasInfo{}
resultStr := "successful"

Expand All @@ -760,7 +768,7 @@ func (app *BaseApp) deliverTx(tx []byte) *abci.ExecTxResult {
telemetry.SetGauge(float32(gInfo.GasWanted), "tx", "gas", "wanted")
}()

gInfo, result, anteEvents, err := app.runTx(execModeFinalize, tx)
gInfo, result, anteEvents, err := app.runTxWithMultiStore(execModeFinalize, tx, txIndex, txMultiStore)
if err != nil {
resultStr = "failed"
resp = sdkerrors.ResponseExecTxResultWithEvents(
Expand Down Expand Up @@ -818,12 +826,19 @@ func (app *BaseApp) endBlock(ctx context.Context) (sdk.EndBlock, error) {
// returned if the tx does not run out of gas and if all the messages are valid
// and execute successfully. An error is returned otherwise.
func (app *BaseApp) runTx(mode execMode, txBytes []byte) (gInfo sdk.GasInfo, result *sdk.Result, anteEvents []abci.Event, err error) {
return app.runTxWithMultiStore(mode, txBytes, -1, nil)
}

func (app *BaseApp) runTxWithMultiStore(mode execMode, txBytes []byte, txIndex int, txMultiStore storetypes.MultiStore) (gInfo sdk.GasInfo, result *sdk.Result, anteEvents []abci.Event, err error) {
// NOTE: GasWanted should be returned by the AnteHandler. GasUsed is
// determined by the GasMeter. We need access to the context to get the gas
// meter, so we initialize upfront.
var gasWanted uint64

ctx := app.getContextForTx(mode, txBytes)
ctx := app.getContextForTx(mode, txBytes, txIndex)
if txMultiStore != nil {
ctx = ctx.WithMultiStore(txMultiStore)
}
ms := ctx.MultiStore()

// only run the tx if there is block gas remaining
Expand Down Expand Up @@ -1001,6 +1016,8 @@ func (app *BaseApp) runMsgs(ctx sdk.Context, msgs []sdk.Msg, msgsV2 []protov2.Me
break
}

ctx = ctx.WithMsgIndex(i)

handler := app.msgServiceRouter.Handler(msg)
if handler == nil {
return nil, errorsmod.Wrapf(sdkerrors.ErrUnknownRequest, "no message handler found for %T", msg)
Expand Down
2 changes: 1 addition & 1 deletion baseapp/genesis.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ var _ genesis.TxHandler = (*BaseApp)(nil)
// ExecuteGenesisTx implements genesis.GenesisState from
// cosmossdk.io/core/genesis to set initial state in genesis
func (ba BaseApp) ExecuteGenesisTx(tx []byte) error {
res := ba.deliverTx(tx)
res := ba.deliverTx(tx, -1)

if res.Code != types.CodeTypeOK {
return errors.New(res.Log)
Expand Down
10 changes: 10 additions & 0 deletions baseapp/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,11 @@ func DisableBlockGasMeter() func(*BaseApp) {
return func(app *BaseApp) { app.SetDisableBlockGasMeter(true) }
}

// SetTxExecutor sets a custom tx executor for the BaseApp, usually for parallel execution.
func SetTxExecutor(executor TxExecutor) func(*BaseApp) {
return func(app *BaseApp) { app.txExecutor = executor }
}

func (app *BaseApp) SetName(name string) {
if app.sealed {
panic("SetName() on sealed BaseApp")
Expand Down Expand Up @@ -372,3 +377,8 @@ func (app *BaseApp) SetStreamingManager(manager storetypes.StreamingManager) {
func (app *BaseApp) SetDisableBlockGasMeter(disableBlockGasMeter bool) {
app.disableBlockGasMeter = disableBlockGasMeter
}

// SetTxExecutor sets a custom tx executor for the BaseApp, usually for parallel execution.
func (app *BaseApp) SetTxExecutor(executor TxExecutor) {
app.txExecutor = executor
}
4 changes: 2 additions & 2 deletions baseapp/test_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,9 @@ func (app *BaseApp) NewUncachedContext(isCheckTx bool, header cmtproto.Header) s
}

func (app *BaseApp) GetContextForFinalizeBlock(txBytes []byte) sdk.Context {
return app.getContextForTx(execModeFinalize, txBytes)
return app.getContextForTx(execModeFinalize, txBytes, -1)
}

func (app *BaseApp) GetContextForCheckTx(txBytes []byte) sdk.Context {
return app.getContextForTx(execModeCheck, txBytes)
return app.getContextForTx(execModeCheck, txBytes, -1)
}
16 changes: 16 additions & 0 deletions baseapp/txexecutor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package baseapp

import (
"context"

abci "github.com/cometbft/cometbft/abci/types"

"cosmossdk.io/store/types"
)

type TxExecutor func(
ctx context.Context,
blockSize int,
cms types.MultiStore,
deliverTxWithMultiStore func(int, types.MultiStore) *abci.ExecTxResult,
) ([]*abci.ExecTxResult, error)
35 changes: 35 additions & 0 deletions types/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,15 @@ type Context struct {
streamingManager storetypes.StreamingManager
cometInfo comet.BlockInfo
headerInfo header.Info

// the index of the current tx in the block, -1 means not in finalize block context
txIndex int
// the index of the current msg in the tx, -1 means not in finalize block context
msgIndex int
// the total number of transactions in current block
txCount int
// sum the gas used by all the transactions in the current block, only accessible by end blocker
blockGasUsed uint64
}

// Proposed rename, not done to avoid API breakage
Expand Down Expand Up @@ -91,6 +100,10 @@ func (c Context) TransientKVGasConfig() storetypes.GasConfig { return c.trans
func (c Context) StreamingManager() storetypes.StreamingManager { return c.streamingManager }
func (c Context) CometInfo() comet.BlockInfo { return c.cometInfo }
func (c Context) HeaderInfo() header.Info { return c.headerInfo }
func (c Context) TxIndex() int { return c.txIndex }
func (c Context) MsgIndex() int { return c.msgIndex }
func (c Context) TxCount() int { return c.txCount }
func (c Context) BlockGasUsed() uint64 { return c.blockGasUsed }

// clone the header before returning
func (c Context) BlockHeader() cmtproto.Header {
Expand Down Expand Up @@ -137,6 +150,8 @@ func NewContext(ms storetypes.MultiStore, header cmtproto.Header, isCheckTx bool
eventManager: NewEventManager(),
kvGasConfig: storetypes.KVGasConfig(),
transientKVGasConfig: storetypes.TransientGasConfig(),
txIndex: -1,
msgIndex: -1,
}
}

Expand Down Expand Up @@ -310,6 +325,26 @@ func (c Context) WithHeaderInfo(headerInfo header.Info) Context {
return c
}

func (c Context) WithTxIndex(txIndex int) Context {
c.txIndex = txIndex
return c
}

func (c Context) WithTxCount(txCount int) Context {
c.txCount = txCount
return c
}

func (c Context) WithMsgIndex(msgIndex int) Context {
c.msgIndex = msgIndex
return c
}

func (c Context) WithBlockGasUsed(gasUsed uint64) Context {
c.blockGasUsed = gasUsed
return c
}

// TODO: remove???
func (c Context) IsZero() bool {
return c.ms == nil
Expand Down

0 comments on commit 650cf91

Please sign in to comment.