Skip to content
This repository has been archived by the owner on Nov 16, 2022. It is now read-only.

Commit

Permalink
Merge pull request #2896 from bandprotocol/handle-missing-requests
Browse files Browse the repository at this point in the history
Update Yoda to handle missing requests on startup
  • Loading branch information
taobun authored Nov 18, 2020
2 parents 8ec08a9 + 943e76a commit c046c06
Show file tree
Hide file tree
Showing 7 changed files with 204 additions and 66 deletions.
1 change: 1 addition & 0 deletions CHANGELOG_UNRELEASED.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

### Yoda

- (impv) [\#2896](https://github.com/bandprotocol/bandchain/pull/2896) Improved yoda to handle pending requests
- (impv) [\#2857](https://github.com/bandprotocol/bandchain/pull/2857) Improved yoda gas estimation function

### Emitter & Flusher
Expand Down
11 changes: 1 addition & 10 deletions chain/x/oracle/keeper/querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package keeper_test

import (
"encoding/json"
"strconv"
"testing"

"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -64,16 +63,8 @@ func TestQueryPendingRequests(t *testing.T) {
var queryRequest types.QueryResult
require.NoError(t, json.Unmarshal(raw, &queryRequest))

var rawRequestIDs []string
types.ModuleCdc.MustUnmarshalJSON(queryRequest.Result, &rawRequestIDs)

var requestIDs []types.RequestID
for _, r := range rawRequestIDs {
id, err := strconv.ParseInt(r, 10, 64)
require.NoError(t, err)

requestIDs = append(requestIDs, types.RequestID(id))
}
types.ModuleCdc.MustUnmarshalJSON(queryRequest.Result, &requestIDs)

require.Equal(t, tt.expected, requestIDs)
})
Expand Down
5 changes: 4 additions & 1 deletion chain/yoda/context.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package yoda

import (
"sync"
"sync/atomic"
"time"

Expand All @@ -17,7 +18,6 @@ type FeeEstimationData struct {
askCount int64
minCount int64
callData []byte
validators int
rawRequests []rawRequest
clientID string
}
Expand All @@ -44,6 +44,9 @@ type Context struct {
pendingMsgs chan ReportMsgWithKey
freeKeys chan int64
keyRoundRobinIndex int64 // Must use in conjunction with sync/atomic

dataSourceCache *sync.Map
pendingRequests map[types.RequestID]bool
}

func (c *Context) nextKeyIndex() int64 {
Expand Down
34 changes: 34 additions & 0 deletions chain/yoda/execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,3 +161,37 @@ func GetExecutable(c *Context, l *Logger, hash string) ([]byte, error) {
l.Debug(":balloon: Received data source hash: %s content: %q", hash, resValue[:32])
return resValue, nil
}

// GetDataSourceHash fetches data source hash by id
func GetDataSourceHash(c *Context, l *Logger, id types.DataSourceID) (string, error) {
if hash, ok := c.dataSourceCache.Load(id); ok {
return hash.(string), nil
}

res, err := c.client.ABCIQuery(fmt.Sprintf("/store/%s/key", types.StoreKey), types.DataSourceStoreKey(id))
if err != nil {
l.Debug(":skull: Failed to get data source with error: %s", err.Error())
return "", err
}

var d types.DataSource
cdc.MustUnmarshalBinaryBare(res.Response.Value, &d)

hash, _ := c.dataSourceCache.LoadOrStore(id, d.Filename)

return hash.(string), nil
}

// GetRequest fetches request by id
func GetRequest(c *Context, l *Logger, id types.RequestID) (types.Request, error) {
res, err := c.client.ABCIQuery(fmt.Sprintf("/store/%s/key", types.StoreKey), types.RequestStoreKey(id))
if err != nil {
l.Debug(":skull: Failed to get request with error: %s", err.Error())
return types.Request{}, err
}

var r types.Request
cdc.MustUnmarshalBinaryBare(res.Response.Value, &r)

return r, nil
}
2 changes: 1 addition & 1 deletion chain/yoda/gas.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func estimateReadingRequestCost(f FeeEstimationData) uint64 {

size := baseRequestSize
size += uint64(len(f.callData))
size += uint64(f.validators) * addressSize
size += uint64(f.askCount) * addressSize
size += uint64(len(f.clientID))

for _, r := range f.rawRequests {
Expand Down
191 changes: 137 additions & 54 deletions chain/yoda/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ package yoda
import (
"encoding/hex"
"strconv"
"sync"

ckeys "github.com/cosmos/cosmos-sdk/client/keys"
"github.com/cosmos/cosmos-sdk/crypto/keys"
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/tendermint/tendermint/crypto/tmhash"
tmtypes "github.com/tendermint/tendermint/types"
Expand All @@ -14,6 +14,12 @@ import (
"github.com/bandprotocol/bandchain/chain/x/oracle/types"
)

type processingResult struct {
rawReport types.RawReport
version string
err error
}

func handleTransaction(c *Context, l *Logger, tx tmtypes.TxResult) {
l.Debug(":eyes: Inspecting incoming transaction: %X", tmhash.Sum(tx.Tx))
if tx.Result.Code != 0 {
Expand Down Expand Up @@ -65,6 +71,12 @@ func handleRequestLog(c *Context, l *Logger, log sdk.ABCIMessageLog) {

l = l.With("rid", id)

// If id is in pending requests list, then skip it.
if c.pendingRequests[types.RequestID(id)] {
l.Debug(":eyes: Request is in pending list, then skip")
return
}

// Skip if not related to this validator
validators := GetEventValues(log, types.EventTypeRequest, types.AttributeKeyValidator)
hasMe := false
Expand All @@ -90,58 +102,7 @@ func handleRequestLog(c *Context, l *Logger, log sdk.ABCIMessageLog) {
keyIndex := c.nextKeyIndex()
key := c.keys[keyIndex]

reportsChan := make(chan types.RawReport, len(reqs))
var version sync.Map
for _, req := range reqs {
go func(l *Logger, req rawRequest) {
exec, err := GetExecutable(c, l, req.dataSourceHash)
if err != nil {
l.Error(":skull: Failed to load data source with error: %s", err.Error())
reportsChan <- types.NewRawReport(
req.externalID, 255, []byte("FAIL_TO_LOAD_DATA_SOURCE"),
)
return
}

vmsg := NewVerificationMessage(cfg.ChainID, c.validator, types.RequestID(id), req.externalID)
sig, pubkey, err := keybase.Sign(key.GetName(), ckeys.DefaultKeyPass, vmsg.GetSignBytes())
if err != nil {
l.Error(":skull: Failed to sign verify message: %s", err.Error())
reportsChan <- types.NewRawReport(req.externalID, 255, nil)
}

result, err := c.executor.Exec(exec, req.calldata, map[string]interface{}{
"BAND_CHAIN_ID": vmsg.ChainID,
"BAND_VALIDATOR": vmsg.Validator.String(),
"BAND_REQUEST_ID": strconv.Itoa(int(vmsg.RequestID)),
"BAND_EXTERNAL_ID": strconv.Itoa(int(vmsg.ExternalID)),
"BAND_REPORTER": sdk.MustBech32ifyPubKey(sdk.Bech32PubKeyTypeAccPub, pubkey),
"BAND_SIGNATURE": sig,
})

if err != nil {
l.Error(":skull: Failed to execute data source script: %s", err.Error())
reportsChan <- types.NewRawReport(req.externalID, 255, nil)
} else {
l.Debug(
":sparkles: Query data done with calldata: %q, result: %q, exitCode: %d",
req.calldata, result.Output, result.Code,
)
version.Store(result.Version, true)
reportsChan <- types.NewRawReport(req.externalID, result.Code, result.Output)
}
}(l.With("did", req.dataSourceID, "eid", req.externalID), req)
}

reports := make([]types.RawReport, 0)
execVersions := make([]string, 0)
for range reqs {
reports = append(reports, <-reportsChan)
}
version.Range(func(key, value interface{}) bool {
execVersions = append(execVersions, key.(string))
return true
})
reports, execVersions := handleRawRequests(c, l, types.RequestID(id), reqs, key)

rawAskCount := GetEventValues(log, types.EventTypeRequest, types.AttributeKeyAskCount)
if len(rawAskCount) != 1 {
Expand Down Expand Up @@ -178,9 +139,131 @@ func handleRequestLog(c *Context, l *Logger, log sdk.ABCIMessageLog) {
askCount: askCount,
minCount: minCount,
callData: callData,
validators: len(validators),
rawRequests: reqs,
clientID: clientID,
},
}
}

func handlePendingRequest(c *Context, l *Logger, id types.RequestID) {

req, err := GetRequest(c, l, id)
if err != nil {
l.Error(":skull: Failed to get request with error: %s", err.Error())
return
}

l.Info(":delivery_truck: Processing pending request")

keyIndex := c.nextKeyIndex()
key := c.keys[keyIndex]

var rawRequests []rawRequest

// prepare raw requests
for _, raw := range req.RawRequests {

hash, err := GetDataSourceHash(c, l, raw.DataSourceID)
if err != nil {
l.Error(":skull: Failed to get data source hash with error: %s", err.Error())
return
}

rawRequests = append(rawRequests, rawRequest{
dataSourceID: raw.DataSourceID,
dataSourceHash: hash,
externalID: raw.ExternalID,
calldata: string(raw.Calldata),
})
}

// process raw requests
reports, execVersions := handleRawRequests(c, l, id, rawRequests, key)

c.pendingMsgs <- ReportMsgWithKey{
msg: types.NewMsgReportData(types.RequestID(id), reports, c.validator, key.GetAddress()),
execVersion: execVersions,
keyIndex: keyIndex,
feeEstimationData: FeeEstimationData{
askCount: int64(len(req.RequestedValidators)),
minCount: int64(req.MinCount),
callData: req.Calldata,
rawRequests: rawRequests,
clientID: req.ClientID,
},
}
}

func handleRawRequests(c *Context, l *Logger, id types.RequestID, reqs []rawRequest, key keys.Info) (reports []types.RawReport, execVersions []string) {
resultsChan := make(chan processingResult, len(reqs))
for _, req := range reqs {
go handleRawRequest(c, l.With("did", req.dataSourceID, "eid", req.externalID), req, key, types.RequestID(id), resultsChan)
}

versions := map[string]bool{}
for range reqs {
result := <-resultsChan
reports = append(reports, result.rawReport)

if result.err == nil {
versions[result.version] = true
}
}

for version := range versions {
execVersions = append(execVersions, version)
}

return
}

func handleRawRequest(c *Context, l *Logger, req rawRequest, key keys.Info, id types.RequestID, processingResultCh chan processingResult) {

exec, err := GetExecutable(c, l, req.dataSourceHash)
if err != nil {
l.Error(":skull: Failed to load data source with error: %s", err.Error())
processingResultCh <- processingResult{
rawReport: types.NewRawReport(
req.externalID, 255, []byte("FAIL_TO_LOAD_DATA_SOURCE"),
),
err: err,
}
return
}

vmsg := NewVerificationMessage(cfg.ChainID, c.validator, id, req.externalID)
sig, pubkey, err := keybase.Sign(key.GetName(), ckeys.DefaultKeyPass, vmsg.GetSignBytes())
if err != nil {
l.Error(":skull: Failed to sign verify message: %s", err.Error())
processingResultCh <- processingResult{
rawReport: types.NewRawReport(req.externalID, 255, nil),
err: err,
}
}

result, err := c.executor.Exec(exec, req.calldata, map[string]interface{}{
"BAND_CHAIN_ID": vmsg.ChainID,
"BAND_VALIDATOR": vmsg.Validator.String(),
"BAND_REQUEST_ID": strconv.Itoa(int(vmsg.RequestID)),
"BAND_EXTERNAL_ID": strconv.Itoa(int(vmsg.ExternalID)),
"BAND_REPORTER": sdk.MustBech32ifyPubKey(sdk.Bech32PubKeyTypeAccPub, pubkey),
"BAND_SIGNATURE": sig,
})

if err != nil {
l.Error(":skull: Failed to execute data source script: %s", err.Error())
processingResultCh <- processingResult{
rawReport: types.NewRawReport(req.externalID, 255, nil),
err: err,
}
} else {
l.Debug(
":sparkles: Query data done with calldata: %q, result: %q, exitCode: %d",
req.calldata, result.Output, result.Code,
)
processingResultCh <- processingResult{
rawReport: types.NewRawReport(req.externalID, result.Code, result.Output),
version: result.Version,
}
}
}
26 changes: 26 additions & 0 deletions chain/yoda/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,24 @@ package yoda

import (
"context"
"encoding/json"
"errors"
"fmt"
"path/filepath"
"sync"
"time"

"github.com/cosmos/cosmos-sdk/client/flags"
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"github.com/tendermint/tendermint/libs/log"
rpcclient "github.com/tendermint/tendermint/rpc/client"
httpclient "github.com/tendermint/tendermint/rpc/client/http"
tmtypes "github.com/tendermint/tendermint/types"

"github.com/bandprotocol/bandchain/chain/pkg/filecache"
"github.com/bandprotocol/bandchain/chain/x/oracle/types"
"github.com/bandprotocol/bandchain/chain/yoda/executor"
)

Expand Down Expand Up @@ -48,6 +53,25 @@ func runImpl(c *Context, l *Logger) error {
waitingMsgs[i] = []ReportMsgWithKey{}
}

// Get pending requests and handle them
rawPendingRequests, err := c.client.ABCIQueryWithOptions(fmt.Sprintf("custom/%s/%s/%s", types.StoreKey, types.QueryPendingRequests, c.validator.String()), nil, rpcclient.ABCIQueryOptions{})
if err != nil {
return err
}

var result types.QueryResult
if err := json.Unmarshal(rawPendingRequests.Response.GetValue(), &result); err != nil {
return err
}

var pendingRequests []types.RequestID
cdc.MustUnmarshalJSON(result.Result, &pendingRequests)

for _, id := range pendingRequests {
c.pendingRequests[id] = true
go handlePendingRequest(c, l.With("rid", id), id)
}

for {
select {
case ev := <-eventChan:
Expand Down Expand Up @@ -133,6 +157,8 @@ func runCmd(c *Context) *cobra.Command {
c.pendingMsgs = make(chan ReportMsgWithKey)
c.freeKeys = make(chan int64, len(keys))
c.keyRoundRobinIndex = -1
c.dataSourceCache = new(sync.Map)
c.pendingRequests = make(map[types.RequestID]bool)
return runImpl(c, l)
},
}
Expand Down

0 comments on commit c046c06

Please sign in to comment.