Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support Stork oracle #13

Open
wants to merge 20 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 19 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file added .chain_cookie
Empty file.
3 changes: 3 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,6 @@ ORACLE_STATSD_ADDR="localhost:8125"
ORACLE_STATSD_STUCK_DUR="5m"
ORACLE_STATSD_MOCKING=false
ORACLE_STATSD_DISABLED=false

STORK_WEBSOCKET_URL="wss://dev.api.stork-oracle.network/evm/subscribe"
STORK_WEBSOCKET_HEADER=
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Provide guidance for STORK_WEBSOCKET_HEADER.

The variable is left empty, which might be confusing for users. Consider adding a comment or example value to clarify what kind of data might be expected here, such as authentication tokens or specific headers required by the Stork oracle.

23 changes: 23 additions & 0 deletions cmd/injective-price-oracle/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ func initExternalFeedsOptions(
cmd *cli.Cmd,
binanceBaseURL **string,
dynamicFeedsDir **string,
storkFeedsDir **string,
) {
*binanceBaseURL = cmd.String(cli.StringOpt{
Name: "binance-url",
Expand All @@ -148,6 +149,11 @@ func initExternalFeedsOptions(
Desc: "Path to dynamic feeds configuration files in TOML format",
EnvVar: "ORACLE_DYNAMIC_FEEDS_DIR",
})
*storkFeedsDir = cmd.String(cli.StringOpt{
Name: "stork-feeds",
Desc: "Path to stork feeds configuration files in TOML format",
EnvVar: "ORACLE_STORK_FEEDS_DIR",
})
}

// initStatsdOptions sets options for StatsD metrics.
Expand Down Expand Up @@ -194,3 +200,20 @@ func initStatsdOptions(
Value: "true",
})
}

func initStorkOracleWebSocket(
cmd *cli.Cmd,
websocketUrl **string,
websocketHeader **string,
) {
*websocketUrl = cmd.String(cli.StringOpt{
Name: "websocket-url",
Desc: "Stork websocket URL",
EnvVar: "STORK_WEBSOCKET_URL",
})
*websocketHeader = cmd.String(cli.StringOpt{
Name: "websocket header",
Desc: "Stork websocket header",
EnvVar: "STORK_WEBSOCKET_HEADER",
})
}
105 changes: 101 additions & 4 deletions cmd/injective-price-oracle/oracle.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ package main

import (
"context"
"encoding/base64"
"fmt"
"io/fs"
"io/ioutil"
"net/http"
"net/url"
"os"
"path/filepath"
"strings"
Expand All @@ -13,6 +15,7 @@ import (
exchangetypes "github.com/InjectiveLabs/sdk-go/chain/exchange/types"
oracletypes "github.com/InjectiveLabs/sdk-go/chain/oracle/types"
rpchttp "github.com/cometbft/cometbft/rpc/client/http"
"github.com/gorilla/websocket"
cli "github.com/jawher/mow.cli"
"github.com/pkg/errors"
"github.com/xlab/closer"
Expand Down Expand Up @@ -49,13 +52,18 @@ func oracleCmd(cmd *cli.Cmd) {
// External Feeds params
dynamicFeedsDir *string
binanceBaseURL *string
storkFeedsDir *string

// Metrics
statsdPrefix *string
statsdAddr *string
statsdStuckDur *string
statsdMocking *string
statsdDisabled *string

// Stork Oracle websocket params
websocketUrl *string
websocketHeader *string
)

initCosmosOptions(
Expand All @@ -82,6 +90,7 @@ func oracleCmd(cmd *cli.Cmd) {
cmd,
&binanceBaseURL,
&dynamicFeedsDir,
&storkFeedsDir,
)

initStatsdOptions(
Expand All @@ -93,7 +102,14 @@ func oracleCmd(cmd *cli.Cmd) {
&statsdDisabled,
)

initStorkOracleWebSocket(
cmd,
&websocketUrl,
&websocketHeader,
)

cmd.Action = func() {
ctx := context.Background()
// ensure a clean exit
defer closer.Close()

Expand Down Expand Up @@ -150,12 +166,13 @@ func oracleCmd(cmd *cli.Cmd) {
log.Infoln("waiting for GRPC services")
time.Sleep(1 * time.Second)

daemonWaitCtx, cancelWait := context.WithTimeout(context.Background(), 10*time.Second)
daemonWaitCtx, cancelWait := context.WithTimeout(ctx, 10*time.Second)
defer cancelWait()

daemonConn := cosmosClient.QueryClient()
if err := waitForService(daemonWaitCtx, daemonConn); err != nil {
panic(fmt.Errorf("failed to wait for cosmos client connection: %w", err))
}
cancelWait()
feedProviderConfigs := map[oracle.FeedProvider]interface{}{
oracle.FeedProviderBinance: &oracle.BinanceEndpointConfig{
BaseURL: *binanceBaseURL,
Expand All @@ -173,7 +190,7 @@ func oracleCmd(cmd *cli.Cmd) {
return nil
}

cfgBody, err := ioutil.ReadFile(path)
cfgBody, err := os.ReadFile(path)
if err != nil {
err = errors.Wrapf(err, "failed to read dynamic feed config")
return err
Expand Down Expand Up @@ -201,12 +218,61 @@ func oracleCmd(cmd *cli.Cmd) {
log.Infof("found %d dynamic feed configs", len(dynamicFeedConfigs))
}

storkFeedConfigs := make([]*oracle.StorkFeedConfig, 0, 10)
if len(*storkFeedsDir) > 0 {
err := filepath.WalkDir(*storkFeedsDir, func(path string, d fs.DirEntry, err error) error {
if err != nil {
return err
} else if d.IsDir() {
return nil
} else if filepath.Ext(path) != ".toml" {
return nil
}

cfgBody, err := os.ReadFile(path)
if err != nil {
err = errors.Wrapf(err, "failed to read stork feed config")
return err
}

feedCfg, err := oracle.ParseStorkFeedConfig(cfgBody)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should support price updates for multiple tickers

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated flow, can you plz recheck?

if err != nil {
log.WithError(err).WithFields(log.Fields{
"filename": d.Name(),
}).Errorln("failed to parse stork feed config")
return nil
}

storkFeedConfigs = append(storkFeedConfigs, feedCfg)

return nil
})

if err != nil {
err = errors.Wrapf(err, "stork feeds dir is specified, but failed to read from it: %s", *storkFeedsDir)
log.WithError(err).Fatalln("failed to load stork feeds")
return
}

log.Infof("found %d stork feed configs", len(storkFeedConfigs))
}

storkWebsocket, err := ConnectWebSocket(ctx, *websocketUrl, *websocketHeader)
if err != nil {
err = errors.Wrapf(err, "can not connect with stork oracle websocket")
log.WithError(err).Fatalln("failed to load stork feeds")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should not be a fatal error; if the Stork feed WS is down for whatever reason this would prevent the whole Injective oracle from starting, if we cannot connect it should disable pulling/pushing Stork oracle prices. Ideally, it should try to reconnect so it can eventually resume the operations

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently, we will try to reconnect ws with MaxRetriesReConnectWebSocket = 5, so after that times it will return error with the log that can't not connect with ws, I think in case cannot connect it should disable pulling/pushing Stork as your recommend because we dont want to try to reconnect alot of time and make stuck for another price feeder, wdyt?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, this should be fine 👍

return
}
log.Info("Connected to stork websocket")

svc, err := oracle.NewService(
cosmosClient,
exchangetypes.NewQueryClient(daemonConn),
oracletypes.NewQueryClient(daemonConn),
feedProviderConfigs,
dynamicFeedConfigs,
storkFeedConfigs,
storkWebsocket,
)
if err != nil {
log.Fatalln(err)
Expand All @@ -228,3 +294,34 @@ func oracleCmd(cmd *cli.Cmd) {
closer.Hold()
}
}

func ConnectWebSocket(ctx context.Context, websocketUrl, urlHeader string) (conn *websocket.Conn, err error) {
u, err := url.Parse(websocketUrl)
if err != nil {
log.Fatal("Error parsing URL:", err)
return &websocket.Conn{}, err
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this fatal will panic, it should return the error instead

Suggested change
log.Fatal("Error parsing URL:", err)
return &websocket.Conn{}, err
return &websocket.Conn{}, errors.Wrapf(err, "can not parse WS url %s: %v", websocketUrl, err)

}

header := http.Header{}
header.Add("Authorization", "Basic "+base64.StdEncoding.EncodeToString([]byte(urlHeader)))

dialer := websocket.DefaultDialer
dialer.EnableCompression = true
retries := 0
for {
conn, _, err = websocket.DefaultDialer.DialContext(ctx, u.String(), header)
if err != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should retry only in case of no context error

Suggested change
if err != nil {
if ctx.Err() != nil {
return ctx.Err()
} else if err != nil {

log.Infof("Failed to connect to WebSocket server: %v", err)
retries++
if retries > oracle.MaxRetriesReConnectWebSocket {
log.Infof("Reached maximum retries (%d), exiting...", oracle.MaxRetriesReConnectWebSocket)
return
}
log.Infof("Retrying connect %sth in 5s...", fmt.Sprint(retries))
time.Sleep(5 * time.Second)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code is blocking, it should acknowledge the context

Suggested change
time.Sleep(5 * time.Second)
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-time.NewTimer(5*time.Second).C:
}

} else {
log.Infof("Connected to WebSocket server")
return
}
}
}
5 changes: 5 additions & 0 deletions examples/stork.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
provider = "Stork"
Copy link
Contributor

@hmoragrega hmoragrega Aug 22, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This file should be committed in a different folder, with the code as it is, we cannot have price feed and stork configs in the same folder, because they are parsed twice, as DynamicFeedConfig and then as StorkFeedConfig too

Ideally, it'll be better if both file types can coexist in the same folder, it'll require less DevOps overhead and it's much clearer to know all the enabled feeds, the oracleType should be used to discriminate which type of config and the puller to get the prices

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

First we should separate them into different folders then process your idea in another pr to make this pr can be merged soon

ticker = "BTCUSD"
pullInterval = "1m"
oracleType = "Stork"
message = "{\"type\":\"subscribe\",\"trace_id\":\"123\",\"data\":[\"BTCUSD\",\"ETHUSD\"]}"
8 changes: 5 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,16 @@ module github.com/InjectiveLabs/injective-price-oracle

go 1.22

toolchain go1.22.4

require (
cosmossdk.io/math v1.3.0
github.com/InjectiveLabs/metrics v0.0.10
github.com/InjectiveLabs/sdk-go v1.51.0
github.com/cometbft/cometbft v0.38.9
github.com/cosmos/cosmos-sdk v0.50.7
github.com/ethereum/go-ethereum v1.11.5
github.com/gorilla/websocket v1.5.0
github.com/jawher/mow.cli v1.2.0
github.com/jpillora/backoff v1.0.0
github.com/mitchellh/mapstructure v1.5.0
Expand All @@ -30,7 +34,6 @@ require (
cosmossdk.io/depinject v1.0.0-alpha.4 // indirect
cosmossdk.io/errors v1.0.1 // indirect
cosmossdk.io/log v1.3.1 // indirect
cosmossdk.io/math v1.3.0 // indirect
cosmossdk.io/store v1.1.0 // indirect
cosmossdk.io/x/evidence v0.1.0 // indirect
cosmossdk.io/x/feegrant v0.1.0 // indirect
Expand Down Expand Up @@ -117,10 +120,9 @@ require (
github.com/google/uuid v1.6.0 // indirect
github.com/gorilla/handlers v1.5.2 // indirect
github.com/gorilla/mux v1.8.1 // indirect
github.com/gorilla/websocket v1.5.0 // indirect
github.com/grpc-ecosystem/go-grpc-middleware v1.4.0 // indirect
github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.11.3 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0 // indirect
github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-hclog v1.5.0 // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -482,8 +482,8 @@ github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgf
github.com/grpc-ecosystem/grpc-gateway v1.9.5/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY=
github.com/grpc-ecosystem/grpc-gateway v1.16.0 h1:gmcG1KaJ57LophUzW0Hy8NmPhnMZb4M0+kPpLofRdBo=
github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.11.3 h1:lLT7ZLSzGLI08vc9cpd+tYmNWjdKDqyr/2L+f6U12Fk=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.11.3/go.mod h1:o//XUCC/F+yRGJoPO/VU0GSB0f8Nhgmxx0VIRUvaC0w=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0 h1:YBftPWNWd4WwGqtY2yeZL2ef8rHAxPBD8KFhJpmcqms=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0/go.mod h1:YN5jB8ie0yfIUg6VvR9Kz84aCaG7AsGZnLjhHbUqwPg=
github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c h1:6rhixN/i8ZofjG1Y75iExal34USq5p+wiN1tpie8IrU=
github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c/go.mod h1:NMPJylDgVpX0MLRlPy15sqSwOFv/U1GZ2m21JhFfek0=
github.com/hashicorp/consul/api v1.3.0/go.mod h1:MmDNSzIMUjNpY/mQ398R4bk2FnqQLoPndWW5VkKPlCE=
Expand Down
5 changes: 2 additions & 3 deletions oracle/feed_binance.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"encoding/json"
"io"
"io/ioutil"
"net/http"
"net/url"
"time"
Expand Down Expand Up @@ -123,14 +122,14 @@ func (f *binancePriceFeed) PullPrice(ctx context.Context) (
return zeroPrice, err
}

respBody, err := ioutil.ReadAll(io.LimitReader(resp.Body, maxRespBytes))
respBody, err := io.ReadAll(io.LimitReader(resp.Body, maxRespBytes))
if err != nil {
metrics.ReportFuncError(f.svcTags)
_ = resp.Body.Close()
err = errors.Wrapf(err, "failed to read response body from %s", reqURL)
return zeroPrice, err
}
_ = resp.Body.Close()
defer resp.Body.Close()

var priceResp binancePriceResp
if err = json.Unmarshal(respBody, &priceResp); err != nil {
Expand Down
11 changes: 5 additions & 6 deletions oracle/feed_dynamic.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,15 @@ import (
"sync/atomic"
"time"

toml "github.com/pelletier/go-toml/v2"
"github.com/pelletier/go-toml/v2"
"github.com/pkg/errors"
"github.com/shopspring/decimal"
log "github.com/xlab/suplog"

"github.com/InjectiveLabs/injective-price-oracle/pipeline"
"github.com/InjectiveLabs/metrics"
oracletypes "github.com/InjectiveLabs/sdk-go/chain/oracle/types"

"github.com/InjectiveLabs/injective-price-oracle/pipeline"
)

type DynamicFeedConfig struct {
Expand Down Expand Up @@ -195,14 +196,12 @@ func (f *dynamicPriceFeed) PullPrice(ctx context.Context) (
}

if finalResult.HasFatalErrors() {
err = errors.Errorf("final run result has fatal errors: %v", finalResult.FatalErrors)
return zeroPrice, err
return zeroPrice, errors.Errorf("final run result has fatal errors: %v", finalResult.FatalErrors)
}

res, err := finalResult.SingularResult()
if err != nil {
err = errors.Wrap(err, "failed to get single result of pipeline run")
return zeroPrice, err
return zeroPrice, errors.Wrap(err, "failed to get single result of pipeline run")
}

price, ok := res.Value.(decimal.Decimal)
Expand Down
Loading