Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support Stork oracle #13

Closed
wants to merge 20 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file added .chain_cookie
Empty file.
3 changes: 3 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,6 @@ ORACLE_STATSD_ADDR="localhost:8125"
ORACLE_STATSD_STUCK_DUR="5m"
ORACLE_STATSD_MOCKING=false
ORACLE_STATSD_DISABLED=false

STORK_WEBSOCKET_URL="wss://dev.api.stork-oracle.network/evm/subscribe"
STORK_WEBSOCKET_HEADER=
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Provide guidance for STORK_WEBSOCKET_HEADER.

The variable is left empty, which might be confusing for users. Consider adding a comment or example value to clarify what kind of data might be expected here, such as authentication tokens or specific headers required by the Stork oracle.

23 changes: 23 additions & 0 deletions cmd/injective-price-oracle/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ func initExternalFeedsOptions(
cmd *cli.Cmd,
binanceBaseURL **string,
dynamicFeedsDir **string,
storkFeedsDir **string,
) {
*binanceBaseURL = cmd.String(cli.StringOpt{
Name: "binance-url",
Expand All @@ -148,6 +149,11 @@ func initExternalFeedsOptions(
Desc: "Path to dynamic feeds configuration files in TOML format",
EnvVar: "ORACLE_DYNAMIC_FEEDS_DIR",
})
*storkFeedsDir = cmd.String(cli.StringOpt{
Name: "stork-feeds",
Desc: "Path to stork feeds configuration files in TOML format",
EnvVar: "ORACLE_STORK_FEEDS_DIR",
})
}

// initStatsdOptions sets options for StatsD metrics.
Expand Down Expand Up @@ -194,3 +200,20 @@ func initStatsdOptions(
Value: "true",
})
}

func initStorkOracleWebSocket(
cmd *cli.Cmd,
websocketUrl **string,
websocketHeader **string,
) {
*websocketUrl = cmd.String(cli.StringOpt{
Name: "websocket-url",
Desc: "Stork websocket URL",
EnvVar: "STORK_WEBSOCKET_URL",
})
*websocketHeader = cmd.String(cli.StringOpt{
Name: "websocket header",
Desc: "Stork websocket header",
EnvVar: "STORK_WEBSOCKET_HEADER",
})
}
110 changes: 106 additions & 4 deletions cmd/injective-price-oracle/oracle.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ package main

import (
"context"
"encoding/base64"
"fmt"
"io/fs"
"io/ioutil"
"net/http"
"net/url"
"os"
"path/filepath"
"strings"
Expand All @@ -13,6 +15,7 @@ import (
exchangetypes "github.com/InjectiveLabs/sdk-go/chain/exchange/types"
oracletypes "github.com/InjectiveLabs/sdk-go/chain/oracle/types"
rpchttp "github.com/cometbft/cometbft/rpc/client/http"
"github.com/gorilla/websocket"
cli "github.com/jawher/mow.cli"
"github.com/pkg/errors"
"github.com/xlab/closer"
Expand Down Expand Up @@ -49,13 +52,18 @@ func oracleCmd(cmd *cli.Cmd) {
// External Feeds params
dynamicFeedsDir *string
binanceBaseURL *string
storkFeedsDir *string

// Metrics
statsdPrefix *string
statsdAddr *string
statsdStuckDur *string
statsdMocking *string
statsdDisabled *string

// Stork Oracle websocket params
websocketUrl *string
websocketHeader *string
)

initCosmosOptions(
Expand All @@ -82,6 +90,7 @@ func oracleCmd(cmd *cli.Cmd) {
cmd,
&binanceBaseURL,
&dynamicFeedsDir,
&storkFeedsDir,
)

initStatsdOptions(
Expand All @@ -93,7 +102,14 @@ func oracleCmd(cmd *cli.Cmd) {
&statsdDisabled,
)

initStorkOracleWebSocket(
cmd,
&websocketUrl,
&websocketHeader,
)

cmd.Action = func() {
ctx := context.Background()
// ensure a clean exit
defer closer.Close()

Expand Down Expand Up @@ -150,12 +166,13 @@ func oracleCmd(cmd *cli.Cmd) {
log.Infoln("waiting for GRPC services")
time.Sleep(1 * time.Second)

daemonWaitCtx, cancelWait := context.WithTimeout(context.Background(), 10*time.Second)
daemonWaitCtx, cancelWait := context.WithTimeout(ctx, 10*time.Second)
defer cancelWait()

daemonConn := cosmosClient.QueryClient()
if err := waitForService(daemonWaitCtx, daemonConn); err != nil {
panic(fmt.Errorf("failed to wait for cosmos client connection: %w", err))
}
cancelWait()
feedProviderConfigs := map[oracle.FeedProvider]interface{}{
oracle.FeedProviderBinance: &oracle.BinanceEndpointConfig{
BaseURL: *binanceBaseURL,
Expand All @@ -173,7 +190,7 @@ func oracleCmd(cmd *cli.Cmd) {
return nil
}

cfgBody, err := ioutil.ReadFile(path)
cfgBody, err := os.ReadFile(path)
if err != nil {
err = errors.Wrapf(err, "failed to read dynamic feed config")
return err
Expand Down Expand Up @@ -201,12 +218,61 @@ func oracleCmd(cmd *cli.Cmd) {
log.Infof("found %d dynamic feed configs", len(dynamicFeedConfigs))
}

storkFeedConfigs := make([]*oracle.StorkFeedConfig, 0, 10)
if len(*storkFeedsDir) > 0 {
err := filepath.WalkDir(*storkFeedsDir, func(path string, d fs.DirEntry, err error) error {
if err != nil {
return err
} else if d.IsDir() {
return nil
} else if filepath.Ext(path) != ".toml" {
return nil
}

cfgBody, err := os.ReadFile(path)
if err != nil {
err = errors.Wrapf(err, "failed to read stork feed config")
return err
}

feedCfg, err := oracle.ParseStorkFeedConfig(cfgBody)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should support price updates for multiple tickers

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated flow, can you plz recheck?

if err != nil {
log.WithError(err).WithFields(log.Fields{
"filename": d.Name(),
}).Errorln("failed to parse stork feed config")
return nil
}

storkFeedConfigs = append(storkFeedConfigs, feedCfg)

return nil
})

if err != nil {
err = errors.Wrapf(err, "stork feeds dir is specified, but failed to read from it: %s", *storkFeedsDir)
log.WithError(err).Fatalln("failed to load stork feeds")
return
}

log.Infof("found %d stork feed configs", len(storkFeedConfigs))
}

storkWebsocket, err := ConnectWebSocket(ctx, *websocketUrl, *websocketHeader)
if err != nil {
err = errors.Wrapf(err, "can not connect with stork oracle websocket")
log.WithError(err).Errorln("failed to load stork feeds")
return
}
log.Info("Connected to stork websocket")

svc, err := oracle.NewService(
cosmosClient,
exchangetypes.NewQueryClient(daemonConn),
oracletypes.NewQueryClient(daemonConn),
feedProviderConfigs,
dynamicFeedConfigs,
storkFeedConfigs,
storkWebsocket,
)
if err != nil {
log.Fatalln(err)
Expand All @@ -228,3 +294,39 @@ func oracleCmd(cmd *cli.Cmd) {
closer.Hold()
}
}

func ConnectWebSocket(ctx context.Context, websocketUrl, urlHeader string) (conn *websocket.Conn, err error) {
u, err := url.Parse(websocketUrl)
if err != nil {
return &websocket.Conn{}, errors.Wrapf(err, "can not parse WS url %s: %v", websocketUrl, err)
}

header := http.Header{}
header.Add("Authorization", "Basic "+base64.StdEncoding.EncodeToString([]byte(urlHeader)))

dialer := websocket.DefaultDialer
dialer.EnableCompression = true
retries := 0
for {
conn, _, err = websocket.DefaultDialer.DialContext(ctx, u.String(), header)
if ctx.Err() != nil {
return nil, ctx.Err()
} else if err != nil {
log.Infof("Failed to connect to WebSocket server: %v", err)
retries++
if retries > oracle.MaxRetriesReConnectWebSocket {
log.Infof("Reached maximum retries (%d), exiting...", oracle.MaxRetriesReConnectWebSocket)
return
}
log.Infof("Retrying connect %sth in 5s...", fmt.Sprint(retries))
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-time.NewTimer(5*time.Second).C:
}
} else {
log.Infof("Connected to WebSocket server")
return
}
}
}
File renamed without changes.
5 changes: 5 additions & 0 deletions examples/stork/stork.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
provider = "Stork"
ticker = "BTCUSD"
pullInterval = "1m"
oracleType = "Stork"
message = "{\"type\":\"subscribe\",\"trace_id\":\"123\",\"data\":[\"BTCUSD\",\"ETHUSD\"]}"
8 changes: 5 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,16 @@ module github.com/InjectiveLabs/injective-price-oracle

go 1.22

toolchain go1.22.4

require (
cosmossdk.io/math v1.3.0
github.com/InjectiveLabs/metrics v0.0.10
github.com/InjectiveLabs/sdk-go v1.51.0
github.com/cometbft/cometbft v0.38.9
github.com/cosmos/cosmos-sdk v0.50.7
github.com/ethereum/go-ethereum v1.11.5
github.com/gorilla/websocket v1.5.0
github.com/jawher/mow.cli v1.2.0
github.com/jpillora/backoff v1.0.0
github.com/mitchellh/mapstructure v1.5.0
Expand All @@ -30,7 +34,6 @@ require (
cosmossdk.io/depinject v1.0.0-alpha.4 // indirect
cosmossdk.io/errors v1.0.1 // indirect
cosmossdk.io/log v1.3.1 // indirect
cosmossdk.io/math v1.3.0 // indirect
cosmossdk.io/store v1.1.0 // indirect
cosmossdk.io/x/evidence v0.1.0 // indirect
cosmossdk.io/x/feegrant v0.1.0 // indirect
Expand Down Expand Up @@ -117,10 +120,9 @@ require (
github.com/google/uuid v1.6.0 // indirect
github.com/gorilla/handlers v1.5.2 // indirect
github.com/gorilla/mux v1.8.1 // indirect
github.com/gorilla/websocket v1.5.0 // indirect
github.com/grpc-ecosystem/go-grpc-middleware v1.4.0 // indirect
github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.11.3 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0 // indirect
github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-hclog v1.5.0 // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -482,8 +482,8 @@ github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgf
github.com/grpc-ecosystem/grpc-gateway v1.9.5/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY=
github.com/grpc-ecosystem/grpc-gateway v1.16.0 h1:gmcG1KaJ57LophUzW0Hy8NmPhnMZb4M0+kPpLofRdBo=
github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.11.3 h1:lLT7ZLSzGLI08vc9cpd+tYmNWjdKDqyr/2L+f6U12Fk=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.11.3/go.mod h1:o//XUCC/F+yRGJoPO/VU0GSB0f8Nhgmxx0VIRUvaC0w=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0 h1:YBftPWNWd4WwGqtY2yeZL2ef8rHAxPBD8KFhJpmcqms=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0/go.mod h1:YN5jB8ie0yfIUg6VvR9Kz84aCaG7AsGZnLjhHbUqwPg=
github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c h1:6rhixN/i8ZofjG1Y75iExal34USq5p+wiN1tpie8IrU=
github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c/go.mod h1:NMPJylDgVpX0MLRlPy15sqSwOFv/U1GZ2m21JhFfek0=
github.com/hashicorp/consul/api v1.3.0/go.mod h1:MmDNSzIMUjNpY/mQ398R4bk2FnqQLoPndWW5VkKPlCE=
Expand Down
5 changes: 2 additions & 3 deletions oracle/feed_binance.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"encoding/json"
"io"
"io/ioutil"
"net/http"
"net/url"
"time"
Expand Down Expand Up @@ -123,14 +122,14 @@ func (f *binancePriceFeed) PullPrice(ctx context.Context) (
return zeroPrice, err
}

respBody, err := ioutil.ReadAll(io.LimitReader(resp.Body, maxRespBytes))
respBody, err := io.ReadAll(io.LimitReader(resp.Body, maxRespBytes))
if err != nil {
metrics.ReportFuncError(f.svcTags)
_ = resp.Body.Close()
err = errors.Wrapf(err, "failed to read response body from %s", reqURL)
return zeroPrice, err
}
_ = resp.Body.Close()
defer resp.Body.Close()

var priceResp binancePriceResp
if err = json.Unmarshal(respBody, &priceResp); err != nil {
Expand Down
11 changes: 5 additions & 6 deletions oracle/feed_dynamic.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,15 @@ import (
"sync/atomic"
"time"

toml "github.com/pelletier/go-toml/v2"
"github.com/pelletier/go-toml/v2"
"github.com/pkg/errors"
"github.com/shopspring/decimal"
log "github.com/xlab/suplog"

"github.com/InjectiveLabs/injective-price-oracle/pipeline"
"github.com/InjectiveLabs/metrics"
oracletypes "github.com/InjectiveLabs/sdk-go/chain/oracle/types"

"github.com/InjectiveLabs/injective-price-oracle/pipeline"
)

type DynamicFeedConfig struct {
Expand Down Expand Up @@ -195,14 +196,12 @@ func (f *dynamicPriceFeed) PullPrice(ctx context.Context) (
}

if finalResult.HasFatalErrors() {
err = errors.Errorf("final run result has fatal errors: %v", finalResult.FatalErrors)
return zeroPrice, err
return zeroPrice, errors.Errorf("final run result has fatal errors: %v", finalResult.FatalErrors)
}

res, err := finalResult.SingularResult()
if err != nil {
err = errors.Wrap(err, "failed to get single result of pipeline run")
return zeroPrice, err
return zeroPrice, errors.Wrap(err, "failed to get single result of pipeline run")
}

price, ok := res.Value.(decimal.Decimal)
Expand Down
Loading