Skip to content

Commit

Permalink
Add fee limit setting when speeding up renewal requests (#9)
Browse files Browse the repository at this point in the history
* Add fee limit setting when speeding up renewal requests

* fix: Add mising sectors num of message
  • Loading branch information
strahe authored Jun 3, 2024
1 parent db71800 commit f5fc68b
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 8 deletions.
13 changes: 13 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,19 @@ Authorization: Bearer <token>
> 只有pending状态的请求才能加速,其他状态的请求不能加速.
> 加速返回success,并不能保证消息一定会上链,等待一段时间后再次查询请求状态,可多次尝试.
#### 请求参数
| 参数名 || 是否必须 | 说明 |
|-----------|------|------|---------------------------------|
| fee_limit | 1FIL || 允许最大消耗的GAS费用,不传系统自动预估,值越大上链时间越快 |
#### 请求示例
```json
{
"fee_limit": "1FIL"
}
```
#### 返回示例
```json
{
Expand Down
34 changes: 33 additions & 1 deletion api.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ import (
"strconv"
"time"

"github.com/filecoin-project/lotus/chain/types"

"github.com/filecoin-project/lotus/api"

"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
"github.com/go-playground/validator/v10"
Expand Down Expand Up @@ -111,6 +115,10 @@ func (a *implAPI) get(w http.ResponseWriter, r *http.Request) {
warpResponse(w, http.StatusOK, req, nil)
}

type speedupRequestArgs struct {
FeeLimit *string `json:"fee_limit"`
}

func (a *implAPI) speedup(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
id, err := strconv.Atoi(vars["id"])
Expand All @@ -119,7 +127,31 @@ func (a *implAPI) speedup(w http.ResponseWriter, r *http.Request) {
return
}

err = a.srv.speedupRequest(r.Context(), uint(id))
var args speedupRequestArgs
if err := json.NewDecoder(r.Body).Decode(&args); err != nil {
warpResponse(w, http.StatusBadRequest, nil, err)
return
}

if err := a.validate.Struct(args); err != nil {
warpResponse(w, http.StatusBadRequest, nil, err)
return
}

var mss *api.MessageSendSpec
if args.FeeLimit != nil {
maxFee, err := types.ParseFIL(*args.FeeLimit)
if err != nil {
warpResponse(w, http.StatusBadRequest, nil,
fmt.Errorf("failed to parse fee limit: %s", err))
return
}
mss = &api.MessageSendSpec{
MaxFee: abi.TokenAmount(maxFee),
}
}

err = a.srv.speedupRequest(r.Context(), uint(id), mss)
if err != nil {
warpResponse(w, http.StatusBadRequest, nil, err)
return
Expand Down
22 changes: 15 additions & 7 deletions service.go
Original file line number Diff line number Diff line change
Expand Up @@ -636,7 +636,7 @@ func buildParams(l miner.SectorLocation, newExp abi.ChainEpoch, numbers []abi.Se
return &e2, cannotExtendSectors, sectorsInDecl, nil
}

func (s *Service) speedupRequest(ctx context.Context, id uint) error {
func (s *Service) speedupRequest(ctx context.Context, id uint, mss *api.MessageSendSpec) error {
var request Request
if err := s.db.Preload(clause.Associations).First(&request, id).Error; err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
Expand All @@ -647,12 +647,13 @@ func (s *Service) speedupRequest(ctx context.Context, id uint) error {
if request.Status != RequestStatusPending {
return fmt.Errorf("request is not pending")
}

for _, msg := range request.Messages {
if msg.OnChain {
continue
}
// todo: need order by nonce?
if err := s.replaceMessage(ctx, msg.ID); err != nil {
if err := s.replaceMessage(ctx, msg.ID, mss); err != nil {
return fmt.Errorf("failed to replace message: %w", err)
}
}
Expand Down Expand Up @@ -789,7 +790,7 @@ func (s *Service) runPendingChecker(ctx context.Context) {
func() {
var replaceMessages []uint
err := s.watchingMessages.Range(func(k uint, wm *watchMessage) error {
if time.Since(wm.started) > 5*time.Minute {
if time.Since(wm.started) > 6*time.Hour {
log.Warnw("message is pending too long", "id", k, "took", time.Since(wm.started))
}
if s.maxWait > 0 && time.Since(wm.started) > s.maxWait {
Expand All @@ -800,8 +801,14 @@ func (s *Service) runPendingChecker(ctx context.Context) {
if err != nil {
log.Errorf("failed to range watching messages: %s", err)
}

maxFee, _ := types.ParseFIL("1FIL") // todo: get from config

mss := &api.MessageSendSpec{
MaxFee: abi.TokenAmount(maxFee),
}
for _, id := range replaceMessages {
if err := s.replaceMessage(ctx, id); err != nil {
if err := s.replaceMessage(ctx, id, mss); err != nil {
log.Errorf("failed to replace message: %s", err)
}
}
Expand All @@ -810,7 +817,7 @@ func (s *Service) runPendingChecker(ctx context.Context) {
}
}

func (s *Service) replaceMessage(ctx context.Context, id uint) error {
func (s *Service) replaceMessage(ctx context.Context, id uint, mss *api.MessageSendSpec) error {
var m Message
if err := s.db.Preload(clause.Associations).First(&m, id).Error; err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
Expand Down Expand Up @@ -857,7 +864,7 @@ func (s *Service) replaceMessage(ctx context.Context, id uint) error {

defaultRBF := messagepool.ComputeRBF(msg.GasPremium, cfg.ReplaceByFeeRatio)

ret, err := s.api.GasEstimateMessageGas(ctx, &msg, nil, types.EmptyTSK)
ret, err := s.api.GasEstimateMessageGas(ctx, &msg, mss, types.EmptyTSK)
if err != nil {
return fmt.Errorf("failed to estimate gas values: %w", err)
}
Expand All @@ -867,7 +874,7 @@ func (s *Service) replaceMessage(ctx context.Context, id uint) error {
mff := func() (abi.TokenAmount, error) {
return abi.TokenAmount(config.DefaultDefaultMaxFee), nil
}
messagepool.CapGasFee(mff, &msg, nil)
messagepool.CapGasFee(mff, &msg, mss)

smsg, err := s.api.WalletSignMessage(ctx, msg.From, &msg)
if err != nil {
Expand All @@ -887,6 +894,7 @@ func (s *Service) replaceMessage(ctx context.Context, id uint) error {
Cid: CID{newID},
Extensions: m.Extensions,
RequestID: m.RequestID,
Sectors: m.Sectors,
}

if err := tx.Create(newMsg).Error; err != nil {
Expand Down

0 comments on commit f5fc68b

Please sign in to comment.