Skip to content

Commit

Permalink
Merge pull request #826 from SiaFoundation/chris/redistribute-batches
Browse files Browse the repository at this point in the history
Redistribute wallet in batches
  • Loading branch information
ChrisSchinnerl authored Dec 15, 2023
2 parents dcafaf8 + 3fd61f6 commit 29cf82e
Show file tree
Hide file tree
Showing 6 changed files with 169 additions and 113 deletions.
2 changes: 1 addition & 1 deletion autopilot/autopilot.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ type Bus interface {
WalletDiscard(ctx context.Context, txn types.Transaction) error
WalletOutputs(ctx context.Context) (resp []wallet.SiacoinElement, err error)
WalletPending(ctx context.Context) (resp []types.Transaction, err error)
WalletRedistribute(ctx context.Context, outputs int, amount types.Currency) (id types.TransactionID, err error)
WalletRedistribute(ctx context.Context, outputs int, amount types.Currency) (ids []types.TransactionID, err error)
}

type Autopilot struct {
Expand Down
16 changes: 9 additions & 7 deletions autopilot/contractor.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ type (
resolver *ipResolver
logger *zap.SugaredLogger

maintenanceTxnID types.TransactionID
maintenanceTxnIDs []types.TransactionID

revisionBroadcastInterval time.Duration
revisionLastBroadcast map[types.FileContractID]time.Time
Expand Down Expand Up @@ -579,9 +579,11 @@ func (c *contractor) performWalletMaintenance(ctx context.Context) error {
return nil
}
for _, txn := range pending {
if c.maintenanceTxnID == txn.ID() {
l.Debugf("wallet maintenance skipped, pending transaction found with id %v", c.maintenanceTxnID)
return nil
for _, mTxnID := range c.maintenanceTxnIDs {
if mTxnID == txn.ID() {
l.Debugf("wallet maintenance skipped, pending transaction found with id %v", mTxnID)
return nil
}
}
}

Expand All @@ -607,13 +609,13 @@ func (c *contractor) performWalletMaintenance(ctx context.Context) error {
}

// redistribute outputs
id, err := b.WalletRedistribute(ctx, int(outputs), amount)
ids, err := b.WalletRedistribute(ctx, int(outputs), amount)
if err != nil {
return fmt.Errorf("failed to redistribute wallet into %d outputs of amount %v, balance %v, err %v", outputs, amount, balance, err)
}

l.Debugf("wallet maintenance succeeded, tx %v", id)
c.maintenanceTxnID = id
l.Debugf("wallet maintenance succeeded, txns %v", ids)
c.maintenanceTxnIDs = ids
return nil
}

Expand Down
23 changes: 14 additions & 9 deletions bus/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,8 @@ type (
Balance() (spendable, confirmed, unconfirmed types.Currency, _ error)
FundTransaction(cs consensus.State, txn *types.Transaction, amount types.Currency, useUnconfirmedTxns bool) ([]types.Hash256, error)
Height() uint64
Redistribute(cs consensus.State, outputs int, amount, feePerByte types.Currency, pool []types.Transaction) (types.Transaction, []types.Hash256, error)
ReleaseInputs(txn types.Transaction)
Redistribute(cs consensus.State, outputs int, amount, feePerByte types.Currency, pool []types.Transaction) ([]types.Transaction, []types.Hash256, error)
ReleaseInputs(txn ...types.Transaction)
SignTransaction(cs consensus.State, txn *types.Transaction, toSign []types.Hash256, cf types.CoveredFields) error
Transactions(before, since time.Time, offset, limit int) ([]wallet.Transaction, error)
UnspentOutputs() ([]wallet.SiacoinElement, error)
Expand Down Expand Up @@ -602,22 +602,27 @@ func (b *bus) walletRedistributeHandler(jc jape.Context) {
}

cs := b.cm.TipState()
txn, toSign, err := b.w.Redistribute(cs, wfr.Outputs, wfr.Amount, b.tp.RecommendedFee(), b.tp.Transactions())
txns, toSign, err := b.w.Redistribute(cs, wfr.Outputs, wfr.Amount, b.tp.RecommendedFee(), b.tp.Transactions())
if jc.Check("couldn't redistribute money in the wallet into the desired outputs", err) != nil {
return
}

err = b.w.SignTransaction(cs, &txn, toSign, types.CoveredFields{WholeTransaction: true})
if jc.Check("couldn't sign the transaction", err) != nil {
return
var ids []types.TransactionID
for i := 0; i < len(txns); i++ {
err = b.w.SignTransaction(cs, &txns[i], toSign, types.CoveredFields{WholeTransaction: true})
if jc.Check("couldn't sign the transaction", err) != nil {
b.w.ReleaseInputs(txns...)
return
}
ids = append(ids, txns[i].ID())
}

if jc.Check("couldn't broadcast the transaction", b.tp.AcceptTransactionSet([]types.Transaction{txn})) != nil {
b.w.ReleaseInputs(txn)
if jc.Check("couldn't broadcast the transaction", b.tp.AcceptTransactionSet(txns)) != nil {
b.w.ReleaseInputs(txns...)
return
}

jc.Encode(txn.ID())
jc.Encode(ids)
}

func (b *bus) walletDiscardHandler(jc jape.Context) {
Expand Down
4 changes: 2 additions & 2 deletions bus/client/wallet.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,13 +116,13 @@ func (c *Client) WalletPrepareRenew(ctx context.Context, revision types.FileCont
// WalletRedistribute broadcasts a transaction that redistributes the money in
// the wallet in the desired number of outputs of given amount. If the
// transaction was successfully broadcasted it will return the transaction ID.
func (c *Client) WalletRedistribute(ctx context.Context, outputs int, amount types.Currency) (id types.TransactionID, err error) {
func (c *Client) WalletRedistribute(ctx context.Context, outputs int, amount types.Currency) (ids []types.TransactionID, err error) {
req := api.WalletRedistributeRequest{
Amount: amount,
Outputs: outputs,
}

err = c.c.WithContext(ctx).POST("/wallet/redistribute", req, &id)
err = c.c.WithContext(ctx).POST("/wallet/redistribute", req, &ids)
return
}

Expand Down
178 changes: 102 additions & 76 deletions wallet/wallet.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"context"
"errors"
"fmt"
"reflect"
"sort"
"sync"
"time"
Expand All @@ -16,12 +15,17 @@ import (
"go.sia.tech/renterd/api"
"go.sia.tech/siad/modules"
"go.uber.org/zap"
"lukechampine.com/frand"
)

// BytesPerInput is the encoded size of a SiacoinInput and corresponding
// TransactionSignature, assuming standard UnlockConditions.
const BytesPerInput = 241
const (
// BytesPerInput is the encoded size of a SiacoinInput and corresponding
// TransactionSignature, assuming standard UnlockConditions.
BytesPerInput = 241

// redistributeBatchSize is the number of outputs to redistribute per txn to
// avoid creating a txn that is too large.
redistributeBatchSize = 10
)

// ErrInsufficientBalance is returned when there aren't enough unused outputs to
// cover the requested amount.
Expand Down Expand Up @@ -223,14 +227,22 @@ func (w *SingleAddressWallet) FundTransaction(cs consensus.State, txn *types.Tra
return nil, err
}

// choose outputs randomly
frand.Shuffle(len(utxos), reflect.Swapper(utxos))
// desc sort
sort.Slice(utxos, func(i, j int) bool {
return utxos[i].Value.Cmp(utxos[j].Value) > 0
})

// add all unconfirmed outputs to the end of the slice as a last resort
if useUnconfirmedTxns {
var tpoolUtxos []SiacoinElement
for _, sco := range w.tpoolUtxos {
utxos = append(utxos, sco)
tpoolUtxos = append(tpoolUtxos, sco)
}
// desc sort
sort.Slice(tpoolUtxos, func(i, j int) bool {
return tpoolUtxos[i].Value.Cmp(tpoolUtxos[j].Value) > 0
})
utxos = append(utxos, tpoolUtxos...)
}

var outputSum types.Currency
Expand Down Expand Up @@ -270,9 +282,17 @@ func (w *SingleAddressWallet) FundTransaction(cs consensus.State, txn *types.Tra
// ReleaseInputs is a helper function that releases the inputs of txn for use in
// other transactions. It should only be called on transactions that are invalid
// or will never be broadcast.
func (w *SingleAddressWallet) ReleaseInputs(txn types.Transaction) {
for _, in := range txn.SiacoinInputs {
delete(w.lastUsed, types.Hash256(in.ParentID))
func (w *SingleAddressWallet) ReleaseInputs(txns ...types.Transaction) {
w.mu.Lock()
defer w.mu.Unlock()
w.releaseInputs(txns...)
}

func (w *SingleAddressWallet) releaseInputs(txns ...types.Transaction) {
for _, txn := range txns {
for _, in := range txn.SiacoinInputs {
delete(w.lastUsed, types.Hash256(in.ParentID))
}
}
}

Expand Down Expand Up @@ -300,10 +320,7 @@ func (w *SingleAddressWallet) SignTransaction(cs consensus.State, txn *types.Tra
// Redistribute returns a transaction that redistributes money in the wallet by
// selecting a minimal set of inputs to cover the creation of the requested
// outputs. It also returns a list of output IDs that need to be signed.
//
// NOTE: we can not reuse 'FundTransaction' because it randomizes the unspent
// transaction outputs it uses and we need a minimal set of inputs
func (w *SingleAddressWallet) Redistribute(cs consensus.State, outputs int, amount, feePerByte types.Currency, pool []types.Transaction) (types.Transaction, []types.Hash256, error) {
func (w *SingleAddressWallet) Redistribute(cs consensus.State, outputs int, amount, feePerByte types.Currency, pool []types.Transaction) ([]types.Transaction, []types.Hash256, error) {
w.mu.Lock()
defer w.mu.Unlock()

Expand All @@ -318,7 +335,7 @@ func (w *SingleAddressWallet) Redistribute(cs consensus.State, outputs int, amou
// fetch unspent transaction outputs
utxos, err := w.store.UnspentSiacoinElements(false)
if err != nil {
return types.Transaction{}, nil, err
return nil, nil, err
}

// check whether a redistribution is necessary, adjust number of desired
Expand All @@ -332,84 +349,93 @@ func (w *SingleAddressWallet) Redistribute(cs consensus.State, outputs int, amou
}
}
if outputs <= 0 {
return types.Transaction{}, nil, nil
}

// prepare all outputs
var txn types.Transaction
for i := 0; i < int(outputs); i++ {
txn.SiacoinOutputs = append(txn.SiacoinOutputs, types.SiacoinOutput{
Value: amount,
Address: w.Address(),
})
return nil, nil, nil
}

// desc sort
sort.Slice(utxos, func(i, j int) bool {
return utxos[i].Value.Cmp(utxos[j].Value) > 0
})

// estimate the fees
outputFees := feePerByte.Mul64(uint64(len(encoding.Marshal(txn.SiacoinOutputs))))
feePerInput := feePerByte.Mul64(BytesPerInput)
// prepare all outputs
var txns []types.Transaction
var toSign []types.Hash256

for outputs > 0 {
var txn types.Transaction
for i := 0; i < outputs && i < redistributeBatchSize; i++ {
txn.SiacoinOutputs = append(txn.SiacoinOutputs, types.SiacoinOutput{
Value: amount,
Address: w.Address(),
})
}
outputs -= len(txn.SiacoinOutputs)

// estimate the fees
outputFees := feePerByte.Mul64(uint64(len(encoding.Marshal(txn.SiacoinOutputs))))
feePerInput := feePerByte.Mul64(BytesPerInput)

// collect outputs that cover the total amount
var inputs []SiacoinElement
want := amount.Mul64(uint64(len(txn.SiacoinOutputs)))
var amtInUse, amtSameValue, amtNotMatured types.Currency
for _, sce := range utxos {
inUse := w.isOutputUsed(sce.ID) || inPool[sce.ID]
matured := cs.Index.Height >= sce.MaturityHeight
sameValue := sce.Value.Equals(amount)
if inUse {
amtInUse = amtInUse.Add(sce.Value)
continue
} else if sameValue {
amtSameValue = amtSameValue.Add(sce.Value)
continue
} else if !matured {
amtNotMatured = amtNotMatured.Add(sce.Value)
continue
}

// collect outputs that cover the total amount
var inputs []SiacoinElement
want := amount.Mul64(uint64(outputs))
var amtInUse, amtSameValue, amtNotMatured types.Currency
for _, sce := range utxos {
inUse := w.isOutputUsed(sce.ID) || inPool[sce.ID]
matured := cs.Index.Height >= sce.MaturityHeight
sameValue := sce.Value.Equals(amount)
if inUse {
amtInUse = amtInUse.Add(sce.Value)
continue
} else if sameValue {
amtSameValue = amtSameValue.Add(sce.Value)
continue
} else if !matured {
amtNotMatured = amtNotMatured.Add(sce.Value)
continue
inputs = append(inputs, sce)
fee := feePerInput.Mul64(uint64(len(inputs))).Add(outputFees)
if SumOutputs(inputs).Cmp(want.Add(fee)) > 0 {
break
}
}

inputs = append(inputs, sce)
// not enough outputs found
fee := feePerInput.Mul64(uint64(len(inputs))).Add(outputFees)
if SumOutputs(inputs).Cmp(want.Add(fee)) > 0 {
break
if sumOut := SumOutputs(inputs); sumOut.Cmp(want.Add(fee)) < 0 {
// in case of an error we need to free all inputs
w.releaseInputs(txns...)
return nil, nil, fmt.Errorf("%w: inputs %v < needed %v + txnFee %v (usable: %v, inUse: %v, sameValue: %v, notMatured: %v)",
ErrInsufficientBalance, sumOut.String(), want.String(), fee.String(), sumOut.String(), amtInUse.String(), amtSameValue.String(), amtNotMatured.String())
}
}

// not enough outputs found
fee := feePerInput.Mul64(uint64(len(inputs))).Add(outputFees)
if sumOut := SumOutputs(inputs); sumOut.Cmp(want.Add(fee)) < 0 {
return types.Transaction{}, nil, fmt.Errorf("%w: inputs %v < needed %v + txnFee %v (usable: %v, inUse: %v, sameValue: %v, notMatured: %v)",
ErrInsufficientBalance, sumOut.String(), want.String(), fee.String(), sumOut.String(), amtInUse.String(), amtSameValue.String(), amtNotMatured.String())
}
// set the miner fee
txn.MinerFees = []types.Currency{fee}

// set the miner fee
txn.MinerFees = []types.Currency{fee}
// add the change output
change := SumOutputs(inputs).Sub(want.Add(fee))
if !change.IsZero() {
txn.SiacoinOutputs = append(txn.SiacoinOutputs, types.SiacoinOutput{
Value: change,
Address: w.addr,
})
}

// add the change output
change := SumOutputs(inputs).Sub(want.Add(fee))
if !change.IsZero() {
txn.SiacoinOutputs = append(txn.SiacoinOutputs, types.SiacoinOutput{
Value: change,
Address: w.addr,
})
}
// add the inputs
for _, sce := range inputs {
txn.SiacoinInputs = append(txn.SiacoinInputs, types.SiacoinInput{
ParentID: types.SiacoinOutputID(sce.ID),
UnlockConditions: StandardUnlockConditions(w.priv.PublicKey()),
})
toSign = append(toSign, sce.ID)
w.lastUsed[sce.ID] = time.Now()
}

// add the inputs
toSign := make([]types.Hash256, len(inputs))
for i, sce := range inputs {
txn.SiacoinInputs = append(txn.SiacoinInputs, types.SiacoinInput{
ParentID: types.SiacoinOutputID(sce.ID),
UnlockConditions: StandardUnlockConditions(w.priv.PublicKey()),
})
toSign[i] = sce.ID
w.lastUsed[sce.ID] = time.Now()
txns = append(txns, txn)
}

return txn, toSign, nil
return txns, toSign, nil
}

func (w *SingleAddressWallet) isOutputUsed(id types.Hash256) bool {
Expand Down
Loading

0 comments on commit 29cf82e

Please sign in to comment.