Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support Stork oracle #13

Open
wants to merge 20 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file added .chain_cookie
Empty file.
3 changes: 3 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,6 @@ ORACLE_STATSD_ADDR="localhost:8125"
ORACLE_STATSD_STUCK_DUR="5m"
ORACLE_STATSD_MOCKING=false
ORACLE_STATSD_DISABLED=false

STORK_WEBSOCKET_URL="wss://dev.api.stork-oracle.network/evm/subscribe"
STORK_WEBSOCKET_HEADER=
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Provide guidance for STORK_WEBSOCKET_HEADER.

The variable is left empty, which might be confusing for users. Consider adding a comment or example value to clarify what kind of data might be expected here, such as authentication tokens or specific headers required by the Stork oracle.

23 changes: 23 additions & 0 deletions cmd/injective-price-oracle/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ func initExternalFeedsOptions(
cmd *cli.Cmd,
binanceBaseURL **string,
dynamicFeedsDir **string,
storkFeedsDir **string,
) {
*binanceBaseURL = cmd.String(cli.StringOpt{
Name: "binance-url",
Expand All @@ -148,6 +149,11 @@ func initExternalFeedsOptions(
Desc: "Path to dynamic feeds configuration files in TOML format",
EnvVar: "ORACLE_DYNAMIC_FEEDS_DIR",
})
*storkFeedsDir = cmd.String(cli.StringOpt{
Name: "stork-feeds",
Desc: "Path to stork feeds configuration files in TOML format",
EnvVar: "ORACLE_STORK_FEEDS_DIR",
})
}

// initStatsdOptions sets options for StatsD metrics.
Expand Down Expand Up @@ -194,3 +200,20 @@ func initStatsdOptions(
Value: "true",
})
}

func initStorkOracleWebSocket(
cmd *cli.Cmd,
websocketUrl **string,
websocketHeader **string,
) {
*websocketUrl = cmd.String(cli.StringOpt{
Name: "websocket-url",
Desc: "Stork websocket URL",
EnvVar: "STORK_WEBSOCKET_URL",
})
*websocketHeader = cmd.String(cli.StringOpt{
Name: "websocket header",
Desc: "Stork websocket header",
EnvVar: "STORK_WEBSOCKET_HEADER",
})
}
96 changes: 96 additions & 0 deletions cmd/injective-price-oracle/oracle.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,14 @@ import (
"path/filepath"
"strings"
"time"
"net/url"
"net/http"
"encoding/base64"

exchangetypes "github.com/InjectiveLabs/sdk-go/chain/exchange/types"
oracletypes "github.com/InjectiveLabs/sdk-go/chain/oracle/types"
rpchttp "github.com/cometbft/cometbft/rpc/client/http"
"github.com/gorilla/websocket"
cli "github.com/jawher/mow.cli"
"github.com/pkg/errors"
"github.com/xlab/closer"
Expand Down Expand Up @@ -49,13 +53,18 @@ func oracleCmd(cmd *cli.Cmd) {
// External Feeds params
dynamicFeedsDir *string
binanceBaseURL *string
storkFeedsDir *string

// Metrics
statsdPrefix *string
statsdAddr *string
statsdStuckDur *string
statsdMocking *string
statsdDisabled *string

// Stork Oracle websocket params
websocketUrl *string
websocketHeader *string
)

initCosmosOptions(
Expand All @@ -82,6 +91,7 @@ func oracleCmd(cmd *cli.Cmd) {
cmd,
&binanceBaseURL,
&dynamicFeedsDir,
&storkFeedsDir,
)

initStatsdOptions(
Expand All @@ -93,6 +103,12 @@ func oracleCmd(cmd *cli.Cmd) {
&statsdDisabled,
)

initStorkOracleWebSocket(
cmd,
&websocketUrl,
&websocketHeader,
)

cmd.Action = func() {
// ensure a clean exit
defer closer.Close()
Expand Down Expand Up @@ -201,12 +217,61 @@ func oracleCmd(cmd *cli.Cmd) {
log.Infof("found %d dynamic feed configs", len(dynamicFeedConfigs))
}

storkFeedConfigs := make([]*oracle.StorkFeedConfig, 0, 10)
if len(*storkFeedsDir) > 0 {
err := filepath.WalkDir(*storkFeedsDir, func(path string, d fs.DirEntry, err error) error {
if err != nil {
return err
} else if d.IsDir() {
return nil
} else if filepath.Ext(path) != ".toml" {
return nil
}

cfgBody, err := ioutil.ReadFile(path)
if err != nil {
err = errors.Wrapf(err, "failed to read stork feed config")
return err
}

feedCfg, err := oracle.ParseStorkFeedConfig(cfgBody)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should support price updates for multiple tickers

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated flow, can you plz recheck?

if err != nil {
log.WithError(err).WithFields(log.Fields{
"filename": d.Name(),
}).Errorln("failed to parse stork feed config")
return nil
}

storkFeedConfigs = append(storkFeedConfigs, feedCfg)

return nil
})

if err != nil {
err = errors.Wrapf(err, "stork feeds dir is specified, but failed to read from it: %s", *dynamicFeedsDir)
log.WithError(err).Fatalln("failed to load stork feeds")
Copy link

@coderabbitai coderabbitai bot Aug 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fix copy-paste error in error handling message.

The error message incorrectly references *dynamicFeedsDir instead of *storkFeedsDir. Correct this to avoid confusion.

- err = errors.Wrapf(err, "stork feeds dir is specified, but failed to read from it: %s", *dynamicFeedsDir)
+ err = errors.Wrapf(err, "stork feeds dir is specified, but failed to read from it: %s", *storkFeedsDir)
Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
err = errors.Wrapf(err, "stork feeds dir is specified, but failed to read from it: %s", *dynamicFeedsDir)
log.WithError(err).Fatalln("failed to load stork feeds")
err = errors.Wrapf(err, "stork feeds dir is specified, but failed to read from it: %s", *storkFeedsDir)
log.WithError(err).Fatalln("failed to load stork feeds")

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ThanhNhann can you accept this please 😉

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like the humans are having a chat. I'll hop back into my burrow for now. If you need me again, just tag @coderabbitai in a new comment, and I'll come hopping out!

return
}

log.Infof("found %d stork feed configs", len(storkFeedConfigs))
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Review of Stork feed configuration handling in oracleCmd function.

The handling of Stork feed configurations is well-integrated into the existing system. The use of filepath.WalkDir to load configurations from a directory is efficient and appropriate.

However, there's a copy-paste error in the error handling (line 251). The error message incorrectly references *dynamicFeedsDir instead of *storkFeedsDir. This needs correction to avoid confusion during troubleshooting.

- err = errors.Wrapf(err, "stork feeds dir is specified, but failed to read from it: %s", *dynamicFeedsDir)
+ err = errors.Wrapf(err, "stork feeds dir is specified, but failed to read from it: %s", *storkFeedsDir)
Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
storkFeedConfigs := make([]*oracle.StorkFeedConfig, 0, 10)
if len(*storkFeedsDir) > 0 {
err := filepath.WalkDir(*storkFeedsDir, func(path string, d fs.DirEntry, err error) error {
if err != nil {
return err
} else if d.IsDir() {
return nil
} else if filepath.Ext(path) != ".toml" {
return nil
}
cfgBody, err := ioutil.ReadFile(path)
if err != nil {
err = errors.Wrapf(err, "failed to read stork feed config")
return err
}
feedCfg, err := oracle.ParseStorkFeedConfig(cfgBody)
if err != nil {
log.WithError(err).WithFields(log.Fields{
"filename": d.Name(),
}).Errorln("failed to parse stork feed config")
return nil
}
storkFeedConfigs = append(storkFeedConfigs, feedCfg)
return nil
})
if err != nil {
err = errors.Wrapf(err, "stork feeds dir is specified, but failed to read from it: %s", *dynamicFeedsDir)
log.WithError(err).Fatalln("failed to load stork feeds")
return
}
log.Infof("found %d stork feed configs", len(storkFeedConfigs))
}
err = errors.Wrapf(err, "stork feeds dir is specified, but failed to read from it: %s", *storkFeedsDir)


storkWebsocket, err := ConnectWebSocket(*websocketUrl, *websocketHeader)
if err != nil {
err = errors.Wrapf(err, "can not connect with stork oracle websocket")
log.WithError(err).Fatalln("failed to load stork feeds")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should not be a fatal error; if the Stork feed WS is down for whatever reason this would prevent the whole Injective oracle from starting, if we cannot connect it should disable pulling/pushing Stork oracle prices. Ideally, it should try to reconnect so it can eventually resume the operations

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently, we will try to reconnect ws with MaxRetriesReConnectWebSocket = 5, so after that times it will return error with the log that can't not connect with ws, I think in case cannot connect it should disable pulling/pushing Stork as your recommend because we dont want to try to reconnect alot of time and make stuck for another price feeder, wdyt?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, this should be fine 👍

return
}
log.Info("Connected to stork websocket")

svc, err := oracle.NewService(
cosmosClient,
exchangetypes.NewQueryClient(daemonConn),
oracletypes.NewQueryClient(daemonConn),
feedProviderConfigs,
dynamicFeedConfigs,
storkFeedConfigs,
storkWebsocket,
)
if err != nil {
log.Fatalln(err)
Expand All @@ -228,3 +293,34 @@ func oracleCmd(cmd *cli.Cmd) {
closer.Hold()
}
}

func ConnectWebSocket(websocketUrl, urlHeader string) (conn *websocket.Conn, err error) {
u, err := url.Parse(websocketUrl)
if err != nil {
log.Fatal("Error parsing URL:", err)
return &websocket.Conn{}, err
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this fatal will panic, it should return the error instead

Suggested change
log.Fatal("Error parsing URL:", err)
return &websocket.Conn{}, err
return &websocket.Conn{}, errors.Wrapf(err, "can not parse WS url %s: %v", websocketUrl, err)

}

header := http.Header{}
header.Add("Authorization", "Basic "+base64.StdEncoding.EncodeToString([]byte(urlHeader)))

dialer := websocket.DefaultDialer
dialer.EnableCompression = true
retries := 0
for {
conn, _, err = websocket.DefaultDialer.Dial(u.String(), header)
if err != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should retry only in case of no context error

Suggested change
if err != nil {
if ctx.Err() != nil {
return ctx.Err()
} else if err != nil {

log.Infof("Failed to connect to WebSocket server: %v", err)
retries++
if retries > oracle.MaxRetriesReConnectWebSocket {
log.Infof("Reached maximum retries (%d), exiting...", oracle.MaxRetriesReConnectWebSocket)
return &websocket.Conn{}, err
}
log.Infof("Retrying in 5s...")
time.Sleep(5 * time.Second)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code is blocking, it should acknowledge the context

Suggested change
time.Sleep(5 * time.Second)
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-time.NewTimer(5*time.Second).C:
}

} else {
log.Infof("Connected to WebSocket server")
return conn, nil
}
}
}
5 changes: 5 additions & 0 deletions examples/stork.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
provider = "Stork"
Copy link
Contributor

@hmoragrega hmoragrega Aug 22, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This file should be committed in a different folder, with the code as it is, we cannot have price feed and stork configs in the same folder, because they are parsed twice, as DynamicFeedConfig and then as StorkFeedConfig too

Ideally, it'll be better if both file types can coexist in the same folder, it'll require less DevOps overhead and it's much clearer to know all the enabled feeds, the oracleType should be used to discriminate which type of config and the puller to get the prices

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

First we should separate them into different folders then process your idea in another pr to make this pr can be merged soon

ticker = "BTCUSD"
pullInterval = "1m"
oracleType = "Stork"
message = "{\"type\":\"subscribe\",\"trace_id\":\"123\",\"data\":[\"BTCUSD\",\"ETHUSD\"]}"
Loading