Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: allow delaying transactions in ResponsePrepareProposal #717

Merged
merged 7 commits into from
Dec 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion abci/cmd/abci-cli/abci-cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -674,7 +674,7 @@ func cmdPrepareProposal(cmd *cobra.Command, args []string) error {
existingTx := inTxArray(txsBytesArray, tx.Tx)
if tx.Action == types.TxRecord_UNKNOWN ||
(existingTx && tx.Action == types.TxRecord_ADDED) ||
(!existingTx && (tx.Action == types.TxRecord_UNMODIFIED || tx.Action == types.TxRecord_REMOVED)) {
(!existingTx && (tx.Action == types.TxRecord_UNMODIFIED || tx.Action == types.TxRecord_REMOVED || tx.Action == types.TxRecord_DELAYED)) {
resps = append(resps, response{
Code: codeBad,
Log: "Failed. Tx: " + string(tx.GetTx()) + " action: " + tx.Action.String(),
Expand Down
98 changes: 77 additions & 21 deletions abci/example/kvstore/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,53 @@ type VerifyTxFunc func(tx types.Tx, typ abci.CheckTxType) (abci.ResponseCheckTx,
// ExecTxFunc executes the transaction against some state
type ExecTxFunc func(tx types.Tx, roundState State) (abci.ExecTxResult, error)

// Helper struct that controls size of added transactions
type TxRecords struct {
Size int64
Limit int64
Txs []*abci.TxRecord
}

// Add new transaction if it fits the limit.
//
// Returns action that was taken, as some transactions may be delayed despite provided `tx.Action`.
// Errors when newly added transaction does not fit the limit.
func (txr *TxRecords) Add(tx *abci.TxRecord) (abci.TxRecord_TxAction, error) {
txSize := int64(len(tx.Tx))
switch tx.Action {
case abci.TxRecord_ADDED:
if txr.Size+txSize > txr.Limit {
return abci.TxRecord_UNKNOWN, errors.New("new transaction cannot be added: over limit")
}
// add to txs
txr.Txs = append(txr.Txs, tx)
txr.Size += txSize
return tx.Action, nil
case abci.TxRecord_UNMODIFIED:
{
if txr.Size+txSize > txr.Limit {
// over limit, delaying
delay := abci.TxRecord{Tx: tx.Tx, Action: abci.TxRecord_DELAYED}
return txr.Add(&delay)
}

// add to txs
txr.Txs = append(txr.Txs, tx)
txr.Size += txSize
return tx.Action, nil
}

case abci.TxRecord_REMOVED, abci.TxRecord_DELAYED:
// remove from txs, not counted in size
txr.Txs = append(txr.Txs, tx)
return tx.Action, nil
default:
panic(fmt.Sprintf("unknown tx action: %v", tx.Action))
}
}

func prepareTxs(req abci.RequestPrepareProposal) ([]*abci.TxRecord, error) {
return substPrepareTx(req.Txs, req.MaxTxBytes), nil
return substPrepareTx(req.Txs, req.MaxTxBytes)
}

func txRecords2Txs(txRecords []*abci.TxRecord) types.Txs {
Expand Down Expand Up @@ -83,34 +128,45 @@ func execPrepareTx(tx []byte) (abci.ExecTxResult, error) {
// proposal for transactions with the prefix stripped.
// It marks all of the original transactions as 'REMOVED' so that
// Tendermint will remove them from its mempool.
func substPrepareTx(txs [][]byte, maxTxBytes int64) []*abci.TxRecord {
trs := make([]*abci.TxRecord, 0, len(txs))
var removed []*abci.TxRecord
var totalBytes int64
func substPrepareTx(txs [][]byte, maxTxBytes int64) ([]*abci.TxRecord, error) {
trs := TxRecords{
Size: 0,
Limit: maxTxBytes,
Txs: make([]*abci.TxRecord, 0, len(txs)+1),
}

for _, tx := range txs {
action := abci.TxRecord_UNMODIFIED

// As a special logic of this app, we replace tx with the prefix 'prepare' with one without the prefix.
// We need to preserve ordering of the transactions.
if isPrepareTx(tx) {
// replace tx and add it as REMOVED
removed = append(removed, &abci.TxRecord{
Tx: tx,
Action: abci.TxRecord_REMOVED,
})
totalBytes -= int64(len(tx))

tx = bytes.TrimPrefix(tx, []byte(PreparePrefix))
action = abci.TxRecord_ADDED
// add new tx in place of the old one
record := abci.TxRecord{
Tx: bytes.TrimPrefix(tx, []byte(PreparePrefix)),
Action: abci.TxRecord_ADDED,
}
if _, err := trs.Add(&record); err != nil {
// cannot add new tx, so we cannot remove old one - just delay it and retry next time
action = abci.TxRecord_DELAYED
} else {
// old one can be removed from the mempool
action = abci.TxRecord_REMOVED
}
}
totalBytes += int64(len(tx))
if totalBytes > maxTxBytes {
break
}
trs = append(trs, &abci.TxRecord{

// Now we add the transaction to the list of transactions
transaction := &abci.TxRecord{
Tx: tx,
Action: action,
})
}
if _, err := trs.Add(transaction); err != nil {
// this should definitely not fail, as we don't add anything new
return nil, err
}
}

return append(trs, removed...)
return trs.Txs, nil
}

const PreparePrefix = "prepare"
Expand Down
59 changes: 59 additions & 0 deletions abci/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"

"github.com/gogo/protobuf/jsonpb"
"github.com/rs/zerolog"

"github.com/dashpay/tenderdash/crypto"
cryptoenc "github.com/dashpay/tenderdash/crypto/encoding"
Expand Down Expand Up @@ -276,6 +277,64 @@ func (m *ResponsePrepareProposal) Validate() error {
return nil
}

type Misbehaviors []Misbehavior

func (m Misbehaviors) MarshalZerologArray(e *zerolog.Array) {
for v := range m {
e.Interface(v)
}
}

type Txs [][]byte

func (b Txs) MarshalZerologArray(a *zerolog.Array) {
for _, bs := range b {
a.Hex(crypto.Checksum(bs)[:8])
}
}

func (txr *TxRecord) MarshalZerologObject(e *zerolog.Event) {
e.Str("action", txr.Action.String())
e.Hex("tx", crypto.Checksum(txr.Tx)[:8])
}

func (r *RequestPrepareProposal) MarshalZerologObject(e *zerolog.Event) {
e.Int64("max_tx_bytes", r.MaxTxBytes)
e.Array("txs", Txs(r.Txs))
e.Interface("last_commit", r.LocalLastCommit)
e.Array("misbehavior", Misbehaviors(r.Misbehavior))
e.Time("proposed_time", r.Time)

e.Int64("height", r.Height)
e.Int32("round", r.Round)

e.Hex("next_validators_hash", r.NextValidatorsHash)
e.Uint32("core_chain_locked_height", r.CoreChainLockedHeight)
e.Hex("proposer_pro_tx_hash", r.ProposerProTxHash)
e.Uint64("proposed_app_version", r.ProposedAppVersion)
e.Str("version", r.Version.String())
e.Hex("quorum_hash", r.QuorumHash)
}

func (r *RequestProcessProposal) MarshalZerologObject(e *zerolog.Event) {
e.Array("txs", Txs(r.Txs))
e.Interface("last_commit", r.ProposedLastCommit.String())
e.Array("misbehavior", Misbehaviors(r.Misbehavior))
e.Time("proposed_time", r.Time)

e.Hex("block_hash", r.Hash)
e.Int64("height", r.Height)
e.Int32("round", r.Round)

e.Hex("next_validators_hash", r.NextValidatorsHash)
e.Uint32("core_chain_locked_height", r.CoreChainLockedHeight)
e.Interface("core_chain_lock_update", r.CoreChainLockUpdate)
e.Hex("proposer_pro_tx_hash", r.ProposerProTxHash)
e.Uint64("proposed_app_version", r.ProposedAppVersion)
e.Str("version", r.Version.String())
e.Hex("quorum_hash", r.QuorumHash)
}

func isValidApphash(apphash tmbytes.HexBytes) bool {
return len(apphash) == crypto.DefaultAppHashSize
}
Expand Down
Loading
Loading