Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support Stork oracle #13

Open
wants to merge 20 commits into
base: master
Choose a base branch
from

Conversation

ThanhNhann
Copy link
Contributor

@ThanhNhann ThanhNhann commented Jun 29, 2024

Change

Update sdk-go version compatible with MsgRelayStorkMessage
Add support for MsgRelayStorkMessage

Testing

  • Firstly, you need to setup 2 env variables STORK_WEBSOCKET_UR and STORK_WEBSOCKET_HEADER here
  • Then injective-price-oracle start --stork-feeds examples/stork.toml

Summary by CodeRabbit

Summary by CodeRabbit

  • New Features

    • Introduced support for Stork Oracle feeds, enabling integration with the Stork provider for real-time price data.
    • Added configuration options for Stork feeds, including command-line parameters and configuration files.
    • Implemented WebSocket connectivity for Stork Oracle to fetch and process live price data.
  • Enhancements

    • Enhanced the PriceData structure to include asset pairs from Stork Oracle.
    • Expanded the PricePuller interface and oracleSvc service to support Stork feeds.
    • Improved error handling and logging during WebSocket connections for better resilience.

Copy link

coderabbitai bot commented Jun 29, 2024

Walkthrough

The recent changes enhance the Injective Price Oracle by integrating support for Stork feeds. New configuration options and methods have been introduced for managing WebSocket connections and processing Stork price data. The dynamicPriceFeed structure has been augmented with a method to pull asset pairs, while a new storkPriceFeed structure has been implemented to handle Stork-specific data and interactions, improving overall functionality and resilience.

Changes

File Change Summary
cmd/injective-price-oracle/options.go Added storkFeedsDir parameter and initStorkOracleWebSocket function for Stork configuration.
cmd/injective-price-oracle/oracle.go Expanded oracleCmd to include Stork parameters and added ConnectWebSocket for handling WebSocket connections.
examples/stork.toml Introduced configuration settings for the Stork provider.
oracle/feed_dynamic.go Added PullAssetPair method to the dynamicPriceFeed structure.
oracle/feed_stork.go Introduced storkPriceFeed structure with methods for parsing configurations and managing WebSocket data.
oracle/models.go Included AssetPairs field in the PriceData struct for Stork integration.
oracle/service.go Enhanced service logic for Stork feeds, added new fields, and updated NewService and Start methods.
go.mod Updated Go version, modified dependencies, and added direct dependency on github.com/gorilla/websocket.

Sequence Diagram(s)

sequenceDiagram
    participant User
    participant CLI
    participant OracleService
    participant StorkWebSocket

    User->>CLI: Provide Stork feed configurations
    CLI->>OracleService: Initialize Oracle with Stork configurations
    OracleService->>StorkWebSocket: Connect and retrieve price data
    StorkWebSocket-->>OracleService: Send price data
    OracleService->>CLI: Provide updated prices
    CLI-->>User: Display updated prices
Loading

Poem

In lands where data flows like streams,
The Stork now joins our pricing dreams.
With WebSocket links, it pulls the thread,
"BTCUSD," the prices spread.
So rejoice, dear coders, in this feat,
For Oracle's heart now skips a beat.
And in the code, new worlds we meet.


Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media?

Share
Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>.
    • Generate unit testing code for this file.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai generate unit testing code for this file.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai generate interesting stats about this repository and render them as a table.
    • @coderabbitai show all the console.log statements in this repository.
    • @coderabbitai read src/utils.ts and generate unit testing code.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (invoked as PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Additionally, you can add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

@ThanhNhann ThanhNhann marked this pull request as ready for review July 4, 2024 07:43
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

Commits

Files that changed from the base of the PR and between b357dd8 and 54a30d3.

Files ignored due to path filters (1)
  • go.sum is excluded by !**/*.sum
Files selected for processing (9)
  • cmd/injective-price-oracle/options.go (2 hunks)
  • cmd/injective-price-oracle/oracle.go (3 hunks)
  • examples/stork.toml (1 hunks)
  • go.mod (2 hunks)
  • oracle/feed_binance.go (1 hunks)
  • oracle/feed_dynamic.go (1 hunks)
  • oracle/feed_stork.go (1 hunks)
  • oracle/models.go (1 hunks)
  • oracle/service.go (14 hunks)
Files skipped from review due to trivial changes (2)
  • examples/stork.toml
  • oracle/models.go
Additional comments not posted (24)
cmd/injective-price-oracle/options.go (1)

Line range hint 139-156:
LGTM! Verify the usage of the new parameter.

The addition of the storkFeedsDir parameter looks good.

However, ensure that the new parameter is correctly used in the codebase.

Verification successful

Verified: Proper usage of the storkFeedsDir parameter.

The storkFeedsDir parameter has been correctly integrated and utilized in the codebase.

  • Initialization: The parameter is properly initialized in initExternalFeedsOptions.
  • Usage: The parameter is used in a conditional check and a directory walk in oracle.go.
Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Verify the usage of the new parameter `storkFeedsDir`.

# Test: Search for the usage of the parameter. Expect: Proper usage in the codebase.
rg --type go $'storkFeedsDir'

Length of output: 491


Script:

#!/bin/bash
# Extract relevant lines around the usage of `storkFeedsDir` in oracle.go for review.
rg --type go --context 5 'storkFeedsDir' cmd/injective-price-oracle/oracle.go

Length of output: 834

cmd/injective-price-oracle/oracle.go (1)

251-251: LGTM!

The code correctly passes the Stork feed configurations to the NewService function.

oracle/feed_stork.go (9)

26-34: LGTM! Struct definition is correct.

The StorkFeedConfig struct is well-defined with appropriate field tags for TOML unmarshalling.


39-51: LGTM! Struct definition is correct.

The storkPriceFeed struct is well-defined with appropriate fields for the Stork price feed.


53-61: LGTM! Function implementation is correct.

The ParseStorkFeedConfig function is well-implemented with proper error handling.


63-113: LGTM! Function implementation is correct.

The NewStorkPriceFeed function is well-implemented with proper error handling and initialization logic.


136-201: LGTM! Function implementation is correct.

The PullAssetPair function is well-implemented with proper error handling and WebSocket communication logic.


204-209: LGTM! Function implementation is correct.

The PullPrice function is well-implemented as a placeholder that returns a zero price.


211-222: LGTM! Function implementation is correct.

The ConvertDataToAssetPair function is well-implemented with proper conversion logic.


224-236: LGTM! Function implementation is correct.

The ConvertSignedPrice function is well-implemented with proper conversion logic.


238-245: LGTM! Function implementation is correct.

The CombineSignatureToString function is well-implemented with proper string manipulation logic.

go.mod (5)

3-5: LGTM! Go version update is correct.

The Go version has been updated from 1.21 to 1.22 to leverage new language features and improvements.


8-8: LGTM! New dependency is correct.

The new dependency cosmossdk.io/math appears to be necessary for mathematical operations.


10-10: LGTM! Dependency update is correct.

The dependency github.com/InjectiveLabs/sdk-go has been updated to version v1.50.2 to support MsgRelayStorkMessage.


17-17: LGTM! New dependency is correct.

The new dependency github.com/pelletier/go-toml/v2 appears to be necessary for TOML parsing.


226-227: LGTM! Dependency replacement is correct.

The dependency cosmossdk.io/store has been replaced with a specific version to ensure compatibility with the InjectiveLabs cosmos-sdk version.

oracle/service.go (8)

37-38: LGTM! Interface update is correct.

The PricePuller interface has been updated to include the PullAssetPair method to support the new Stork feed provider.


55-55: LGTM! Struct update is correct.

The oracleSvc struct has been updated to include a map of supported Stork feeds to manage the new Stork feed configurations.


82-82: LGTM! New constant is correct.

The new constant FeedProviderStork is necessary to identify the Stork feed provider.


Line range hint 146-194:
LGTM! Function update is correct.

The NewService function has been updated to accept Stork feed configurations to initialize the service with Stork feed configurations.


240-293: LGTM! Function update is correct.

The processSetPriceFeed function has been updated to handle Stork oracle types to process Stork-specific logic for pulling asset pairs.


370-387: LGTM! New function is correct.

The new function composeStorkOracleMsgs is necessary to handle the composition of Stork-specific messages.


388-392: LGTM! Function update is correct.

The composeMsgs function has been updated to include Stork oracle messages to compose messages for Stork oracle types.


454-469: LGTM! Function update is correct.

The commitSetPrices function has been updated to handle Stork oracle types to process and commit prices for Stork oracle types.

Comment on lines 154 to 156
func (f *binancePriceFeed) PullAssetPair(ctx context.Context) (assetPair oracletypes.AssetPair, err error){
return oracletypes.AssetPair{}, nil
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Incomplete Implementation: Add logic to PullAssetPair.

The method PullAssetPair currently returns an empty AssetPair object. Implement the necessary logic to return actual asset pair data.

-  return oracletypes.AssetPair{}, nil
+  // TODO: Implement the logic to fetch and return the actual asset pair data.

Committable suggestion was skipped due to low confidence.

Comment on lines 229 to 231
func (f *dynamicPriceFeed) PullAssetPair(ctx context.Context) (assetPair oracletypes.AssetPair, err error){
return oracletypes.AssetPair{}, nil
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Incomplete Implementation: Add logic to PullAssetPair.

The method PullAssetPair currently returns an empty AssetPair object. Implement the necessary logic to return actual asset pair data.

-  return oracletypes.AssetPair{}, nil
+  // TODO: Implement the logic to fetch and return the actual asset pair data.

Committable suggestion was skipped due to low confidence.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

Commits

Files that changed from the base of the PR and between 54a30d3 and ca84799.

Files ignored due to path filters (1)
  • go.sum is excluded by !**/*.sum
Files selected for processing (2)
  • go.mod (2 hunks)
  • oracle/service.go (14 hunks)
Files skipped from review as they are similar to previous changes (2)
  • go.mod
  • oracle/service.go

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

Commits

Files that changed from the base of the PR and between ca84799 and f3b2eca.

Files selected for processing (2)
  • examples/stork.toml (1 hunks)
  • oracle/feed_stork.go (1 hunks)
Files skipped from review due to trivial changes (1)
  • examples/stork.toml
Files skipped from review as they are similar to previous changes (1)
  • oracle/feed_stork.go

@ThanhNhann ThanhNhann force-pushed the nhan/stork-relayer branch 2 times, most recently from f3b2eca to ca84799 Compare July 12, 2024 18:19
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

Commits

Files that changed from the base of the PR and between f3b2eca and 7c72b62.

Files selected for processing (1)
  • oracle/feed_stork.go (1 hunks)
Files skipped from review as they are similar to previous changes (1)
  • oracle/feed_stork.go

@Kishan-Dhakan Kishan-Dhakan requested a review from albertchon July 12, 2024 18:34
return err
}

feedCfg, err := oracle.ParseStorkFeedConfig(cfgBody)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should support price updates for multiple tickers

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated flow, can you plz recheck?

@@ -151,6 +151,10 @@ func (f *binancePriceFeed) PullPrice(ctx context.Context) (
return priceResp.Price, nil
}

func (f *binancePriceFeed) PullAssetPair(ctx context.Context) (assetPair oracletypes.AssetPair, err error){
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is pretty hacky. Better to create a separate flow for Stork

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you check the new flow of Stork, currently I still use the base of the core logic

@@ -225,3 +225,7 @@ func (f *dynamicPriceFeed) PullPrice(ctx context.Context) (

return price, nil
}

func (f *dynamicPriceFeed) PullAssetPair(ctx context.Context) (assetPair oracletypes.AssetPair, err error){
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same thing here

oracle/feed_stork.go Outdated Show resolved Hide resolved
Comment on lines 143 to 166
u, err := url.Parse(f.url)
if err != nil {
log.Fatal("Error parsing URL:", err)
return oracletypes.AssetPair{}, nil
}
header := http.Header{}
header.Add("Authorization", "Basic "+base64.StdEncoding.EncodeToString([]byte(f.header)))

dialer := websocket.DefaultDialer
dialer.EnableCompression = true

// Connect to the WebSocket server
conn, resp, err := dialer.Dial(u.String(), header)
if err != nil {
if resp != nil {
log.Printf("Handshake failed with status: %d\n", resp.StatusCode)
for k, v := range resp.Header {
log.Printf("%s: %v\n", k, v)
}
}
log.Fatal("Error connecting to WebSocket:", err)
return oracletypes.AssetPair{}, nil
}
defer conn.Close()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we shouldn't initiate and close a websocket connection every time we want to pull price.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved it to here, so we don't need to make the connection every time we want to pull price


signedPriceOfAssetPair.Signature = common.Hex2Bytes(signature)
signedPriceOfAssetPair.PublisherKey = signeds.PublisherKey
signedPriceOfAssetPair.Timestamp = ConvertTimestampToSecond(signeds.TimestampedSignature.Timestamp)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code is quite fragile and assumes prices are ALWAYS signed by relayers in seconds, which may not be the case

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we have to have a step to convert the timestamp to second because our VerifyStorkMsgSignature needs that type of time to verify the signature, do you think where could we do that in our relayer or not?

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

Commits

Files that changed from the base of the PR and between 7c72b62 and 58f2a05.

Files selected for processing (7)
  • cmd/injective-price-oracle/options.go (3 hunks)
  • cmd/injective-price-oracle/oracle.go (6 hunks)
  • examples/stork.toml (1 hunks)
  • oracle/feed_dynamic.go (1 hunks)
  • oracle/feed_stork.go (1 hunks)
  • oracle/models.go (1 hunks)
  • oracle/service.go (13 hunks)
Files skipped from review as they are similar to previous changes (3)
  • examples/stork.toml
  • oracle/feed_dynamic.go
  • oracle/service.go
Additional comments not posted (6)
oracle/models.go (1)

26-28: Review of the new field AssetPairs in PriceData struct.

The addition of AssetPairs to the PriceData struct is aligned with the PR's objective to support Stork Oracle. This field will store asset pairs related to price data, which is crucial for the new functionality.

Ensure that the rest of the system properly utilizes this new field, especially in serialization and deserialization processes, as well as in any logic that processes PriceData instances.

cmd/injective-price-oracle/options.go (2)

Line range hint 139-156: Review of the new parameter storkFeedsDir in initExternalFeedsOptions function.

The addition of storkFeedsDir parameter is consistent with the PR's goal to support Stork feeds. It allows the application to accept a new command-line argument for specifying the directory containing Stork feed configurations.

Ensure that this parameter is correctly handled wherever initExternalFeedsOptions is called, and verify that appropriate error handling is in place if the directory is not accessible or the files within are not correctly formatted.


204-219: New function initStorkOracleWebSocket review.

This function initializes WebSocket parameters for connecting to the Stork Oracle. It's a crucial addition for the real-time data feed functionality.

Check the usage of this function in the system to ensure the WebSocket connection is managed efficiently, including reconnection logic and error handling. Also, verify that the websocketUrl and websocketHeader are secured and validated appropriately to prevent security issues.

oracle/feed_stork.go (2)

42-50: Review of ParseStorkFeedConfig function.

This function parses Stork feed configurations from TOML format. It's well-implemented with proper error handling using errors.Wrap, which enhances error messages with context.

Ensure that the error messages are clear and provide sufficient information for troubleshooting configuration issues. Also, verify that this function is covered by unit tests, especially for edge cases in the TOML format.


52-100: Review of NewStorkPriceFeed function.

This function initializes a new Stork price feed based on the provided configuration. The error handling is robust, with specific messages for different failure scenarios, such as invalid pull intervals.

Consider adding more detailed logging at key steps within this function to aid in debugging and operational monitoring. Also, ensure that the minimum pull interval is justified by the system's requirements and that it doesn't lead to excessive load or data freshness issues.

cmd/injective-price-oracle/oracle.go (1)

297-326: Review of ConnectWebSocket function.

This function manages the WebSocket connection to the Stork Oracle. The retry logic and error handling are well-implemented, ensuring robustness in maintaining the connection.

Ensure that the retry count and delay are configurable to allow fine-tuning based on operational experience. Also, verify that the security of the WebSocket connection is adequately addressed, particularly in the handling of authentication headers.

Comment on lines 122 to 162
// PullAssetPair pulls asset pair for an asset id
func (f *storkPriceFeed) PullAssetPairs(ctx context.Context, conn *websocket.Conn) (assetPairs []*oracletypes.AssetPair, err error) {
metrics.ReportFuncCall(f.svcTags)
doneFn := metrics.ReportFuncTiming(f.svcTags)
defer doneFn()

err = conn.WriteMessage(websocket.TextMessage, []byte(f.message))
if err != nil {
log.Fatal("Error writing message:", err)
return []*oracletypes.AssetPair{}, nil
}

var msgNeed []byte
count := 0
for count < 2 {
_, message, err := conn.ReadMessage()
if err != nil {
log.Println("Error reading message:", err)
return []*oracletypes.AssetPair{}, nil

}
msgNeed = message
count += 1
}

var msgResp messageResponse
if err = json.Unmarshal(msgNeed, &msgResp); err != nil {
return []*oracletypes.AssetPair{}, nil
}
assetIds := make([]string, 0)
for key := range msgResp.Data {
assetIds = append(assetIds, key)
}

for _, assetId := range assetIds {
pair := ConvertDataToAssetPair(msgResp.Data[assetId], assetId)
assetPairs = append(assetPairs, &pair)
}

return assetPairs, nil
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Review of PullAssetPairs function.

This function is responsible for pulling asset pair data via a WebSocket connection. It includes metrics reporting and timing, which is good for performance monitoring.

However, the error handling could be improved. Currently, the function logs errors but still returns an empty slice and nil error, which might lead to silent failures in upstream logic. Consider modifying the error handling to return meaningful errors to the caller.

- return []*oracletypes.AssetPair{}, nil
+ return nil, err
Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
// PullAssetPair pulls asset pair for an asset id
func (f *storkPriceFeed) PullAssetPairs(ctx context.Context, conn *websocket.Conn) (assetPairs []*oracletypes.AssetPair, err error) {
metrics.ReportFuncCall(f.svcTags)
doneFn := metrics.ReportFuncTiming(f.svcTags)
defer doneFn()
err = conn.WriteMessage(websocket.TextMessage, []byte(f.message))
if err != nil {
log.Fatal("Error writing message:", err)
return []*oracletypes.AssetPair{}, nil
}
var msgNeed []byte
count := 0
for count < 2 {
_, message, err := conn.ReadMessage()
if err != nil {
log.Println("Error reading message:", err)
return []*oracletypes.AssetPair{}, nil
}
msgNeed = message
count += 1
}
var msgResp messageResponse
if err = json.Unmarshal(msgNeed, &msgResp); err != nil {
return []*oracletypes.AssetPair{}, nil
}
assetIds := make([]string, 0)
for key := range msgResp.Data {
assetIds = append(assetIds, key)
}
for _, assetId := range assetIds {
pair := ConvertDataToAssetPair(msgResp.Data[assetId], assetId)
assetPairs = append(assetPairs, &pair)
}
return assetPairs, nil
}
// PullAssetPair pulls asset pair for an asset id
func (f *storkPriceFeed) PullAssetPairs(ctx context.Context, conn *websocket.Conn) (assetPairs []*oracletypes.AssetPair, err error) {
metrics.ReportFuncCall(f.svcTags)
doneFn := metrics.ReportFuncTiming(f.svcTags)
defer doneFn()
err = conn.WriteMessage(websocket.TextMessage, []byte(f.message))
if err != nil {
log.Fatal("Error writing message:", err)
return nil, err
}
var msgNeed []byte
count := 0
for count < 2 {
_, message, err := conn.ReadMessage()
if err != nil {
log.Println("Error reading message:", err)
return nil, err
}
msgNeed = message
count += 1
}
var msgResp messageResponse
if err = json.Unmarshal(msgNeed, &msgResp); err != nil {
return nil, err
}
assetIds := make([]string, 0)
for key := range msgResp.Data {
assetIds = append(assetIds, key)
}
for _, assetId := range assetIds {
pair := ConvertDataToAssetPair(msgResp.Data[assetId], assetId)
assetPairs = append(assetPairs, &pair)
}
return assetPairs, nil
}

Comment on lines 220 to 257
storkFeedConfigs := make([]*oracle.StorkFeedConfig, 0, 10)
if len(*storkFeedsDir) > 0 {
err := filepath.WalkDir(*storkFeedsDir, func(path string, d fs.DirEntry, err error) error {
if err != nil {
return err
} else if d.IsDir() {
return nil
} else if filepath.Ext(path) != ".toml" {
return nil
}

cfgBody, err := ioutil.ReadFile(path)
if err != nil {
err = errors.Wrapf(err, "failed to read stork feed config")
return err
}

feedCfg, err := oracle.ParseStorkFeedConfig(cfgBody)
if err != nil {
log.WithError(err).WithFields(log.Fields{
"filename": d.Name(),
}).Errorln("failed to parse stork feed config")
return nil
}

storkFeedConfigs = append(storkFeedConfigs, feedCfg)

return nil
})

if err != nil {
err = errors.Wrapf(err, "stork feeds dir is specified, but failed to read from it: %s", *dynamicFeedsDir)
log.WithError(err).Fatalln("failed to load stork feeds")
return
}

log.Infof("found %d stork feed configs", len(storkFeedConfigs))
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Review of Stork feed configuration handling in oracleCmd function.

The handling of Stork feed configurations is well-integrated into the existing system. The use of filepath.WalkDir to load configurations from a directory is efficient and appropriate.

However, there's a copy-paste error in the error handling (line 251). The error message incorrectly references *dynamicFeedsDir instead of *storkFeedsDir. This needs correction to avoid confusion during troubleshooting.

- err = errors.Wrapf(err, "stork feeds dir is specified, but failed to read from it: %s", *dynamicFeedsDir)
+ err = errors.Wrapf(err, "stork feeds dir is specified, but failed to read from it: %s", *storkFeedsDir)
Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
storkFeedConfigs := make([]*oracle.StorkFeedConfig, 0, 10)
if len(*storkFeedsDir) > 0 {
err := filepath.WalkDir(*storkFeedsDir, func(path string, d fs.DirEntry, err error) error {
if err != nil {
return err
} else if d.IsDir() {
return nil
} else if filepath.Ext(path) != ".toml" {
return nil
}
cfgBody, err := ioutil.ReadFile(path)
if err != nil {
err = errors.Wrapf(err, "failed to read stork feed config")
return err
}
feedCfg, err := oracle.ParseStorkFeedConfig(cfgBody)
if err != nil {
log.WithError(err).WithFields(log.Fields{
"filename": d.Name(),
}).Errorln("failed to parse stork feed config")
return nil
}
storkFeedConfigs = append(storkFeedConfigs, feedCfg)
return nil
})
if err != nil {
err = errors.Wrapf(err, "stork feeds dir is specified, but failed to read from it: %s", *dynamicFeedsDir)
log.WithError(err).Fatalln("failed to load stork feeds")
return
}
log.Infof("found %d stork feed configs", len(storkFeedConfigs))
}
err = errors.Wrapf(err, "stork feeds dir is specified, but failed to read from it: %s", *storkFeedsDir)

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

Commits

Files that changed from the base of the PR and between 58f2a05 and 7d7e2bb.

Files selected for processing (3)
  • .env.example (1 hunks)
  • examples/stork.toml (1 hunks)
  • oracle/feed_stork.go (1 hunks)
Files skipped from review as they are similar to previous changes (2)
  • examples/stork.toml
  • oracle/feed_stork.go
Additional comments not posted (1)
.env.example (1)

31-31: Well-configured WebSocket URL for Stork oracle.

The URL is correctly using the wss:// protocol for secure communication. However, consider adding a comment to remind users to update this URL for production environments.

@@ -27,3 +27,6 @@ ORACLE_STATSD_ADDR="localhost:8125"
ORACLE_STATSD_STUCK_DUR="5m"
ORACLE_STATSD_MOCKING=false
ORACLE_STATSD_DISABLED=false

STORK_WEBSOCKET_URL="wss://dev.api.stork-oracle.network/evm/subscribe"
STORK_WEBSOCKET_HEADER=
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Provide guidance for STORK_WEBSOCKET_HEADER.

The variable is left empty, which might be confusing for users. Consider adding a comment or example value to clarify what kind of data might be expected here, such as authentication tokens or specific headers required by the Stork oracle.

@danidomi
Copy link
Contributor

I created this small PR let me know if it makes sense the improvs

decentrio#1

@ThanhNhann

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 7

Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

Commits

Files that changed from the base of the PR and between 7d7e2bb and 08e14c9.

Files selected for processing (5)
  • cmd/injective-price-oracle/oracle.go (9 hunks)
  • oracle/feed_binance.go (2 hunks)
  • oracle/feed_dynamic.go (2 hunks)
  • oracle/feed_stork.go (1 hunks)
  • oracle/service.go (16 hunks)
Files skipped from review due to trivial changes (1)
  • oracle/feed_dynamic.go
Additional comments not posted (1)
oracle/feed_binance.go (1)

125-132: LGTM! Improved resource management with defer.

The replacement of ioutil.ReadAll with io.ReadAll and the use of defer for closing the response body are good practices. They improve resource management and ensure the response body is closed properly.

Comment on lines 128 to 132
err = conn.WriteMessage(websocket.TextMessage, []byte(f.message))
if err != nil {
log.Infoln("Error writing message:", err)
return
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider optimizing WebSocket message reading.

The current implementation reads messages in a loop with a fixed count. Consider using a more robust mechanism to determine when to stop reading, such as a specific message type or content.

Comment on lines +200 to +215
func ConvertTimestampToSecond(timestamp uint64) uint64 {
switch {
// nanosecond
case timestamp > 1e18:
return timestamp / uint64(1_000_000_000)
// microsecond
case timestamp > 1e15:
return timestamp / uint64(1_000_000)
// millisecond
case timestamp > 1e12:
return timestamp / uint64(1_000)
// second
default:
return timestamp
}
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Optimize timestamp conversion logic.

The ConvertTimestampToSecond function uses a switch-case to convert timestamps. Consider using a more efficient approach, such as logarithmic checks, to determine the scale.

Comment on lines 122 to 163
// PullAssetPairs pulls asset pair for an asset id
func (f *storkPriceFeed) PullAssetPairs(conn *websocket.Conn) (assetPairs []*oracletypes.AssetPair, err error) {
metrics.ReportFuncCall(f.svcTags)
doneFn := metrics.ReportFuncTiming(f.svcTags)
defer doneFn()

err = conn.WriteMessage(websocket.TextMessage, []byte(f.message))
if err != nil {
log.Infoln("Error writing message:", err)
return
}

var msgNeed []byte
count := 0
for count < 2 {
_, message, err := conn.ReadMessage()
if err != nil {
log.Infoln("Error reading message:", err)
return

}
msgNeed = message
count += 1
}

var msgResp messageResponse
if err = json.Unmarshal(msgNeed, &msgResp); err != nil {
return
}

assetIds := make([]string, 0)
for key := range msgResp.Data {
assetIds = append(assetIds, key)
}

for _, assetId := range assetIds {
pair := ConvertDataToAssetPair(msgResp.Data[assetId], assetId)
assetPairs = append(assetPairs, &pair)
}

return assetPairs, nil
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Improve error handling in PullAssetPairs.

The function logs errors but returns an empty slice and nil error, which might lead to silent failures. Consider returning meaningful errors to the caller.

- return []*oracletypes.AssetPair{}, nil
+ return nil, err

Committable suggestion was skipped due to low confidence.

Comment on lines 312 to 326
conn, _, err = websocket.DefaultDialer.DialContext(ctx, u.String(), header)
if err != nil {
log.Infof("Failed to connect to WebSocket server: %v", err)
retries++
if retries > oracle.MaxRetriesReConnectWebSocket {
log.Infof("Reached maximum retries (%d), exiting...", oracle.MaxRetriesReConnectWebSocket)
return
}
log.Infof("Retrying in 5s...")
time.Sleep(5 * time.Second)
} else {
log.Infof("Connected to WebSocket server")
return
}
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Enhance WebSocket connection logging.

Consider adding more detailed logging for each retry attempt, including the retry count and any specific error messages received.

Comment on lines 252 to 253
err = errors.Wrapf(err, "stork feeds dir is specified, but failed to read from it: %s", *dynamicFeedsDir)
log.WithError(err).Fatalln("failed to load stork feeds")
Copy link

@coderabbitai coderabbitai bot Aug 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fix copy-paste error in error handling message.

The error message incorrectly references *dynamicFeedsDir instead of *storkFeedsDir. Correct this to avoid confusion.

- err = errors.Wrapf(err, "stork feeds dir is specified, but failed to read from it: %s", *dynamicFeedsDir)
+ err = errors.Wrapf(err, "stork feeds dir is specified, but failed to read from it: %s", *storkFeedsDir)
Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
err = errors.Wrapf(err, "stork feeds dir is specified, but failed to read from it: %s", *dynamicFeedsDir)
log.WithError(err).Fatalln("failed to load stork feeds")
err = errors.Wrapf(err, "stork feeds dir is specified, but failed to read from it: %s", *storkFeedsDir)
log.WithError(err).Fatalln("failed to load stork feeds")

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ThanhNhann can you accept this please 😉

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like the humans are having a chat. I'll hop back into my burrow for now. If you need me again, just tag @coderabbitai in a new comment, and I'll come hopping out!

Comment on lines 246 to 279
if pricePuller.OracleType() == oracletypes.OracleType_Stork {
storkPricePuller, ok := pricePuller.(*storkPriceFeed)
if !ok {
metrics.ReportFuncError(s.svcTags)
feedLogger.WithFields(log.Fields{
"oracle": "Stork Oracle",
"retries": maxRetriesPerInterval,
}).WithError(err).Errorln("can not convert to stork price feed")
continue
}
assetPairs, err = storkPricePuller.PullAssetPairs(s.storkWebsocket)
if err != nil {
metrics.ReportFuncError(s.svcTags)
feedLogger.WithError(err).Warningln("retrying PullAssetPairs after error")

for i := 0; i < maxRetriesPerInterval; i++ {
if assetPairs, err = storkPricePuller.PullAssetPairs(s.storkWebsocket); err != nil {
time.Sleep(time.Second)
continue
}
break
}

result, err := pricePuller.PullPrice(ctx)
if err != nil {
metrics.ReportFuncError(s.svcTags)
feedLogger.WithError(err).Warningln("retrying PullPrice after error")
if err != nil {
metrics.ReportFuncError(s.svcTags)
feedLogger.WithFields(log.Fields{
"retries": maxRetriesPerInterval,
}).WithError(err).Errorln("failed to fetch asset pairs")

for i := 0; i < maxRetriesPerInterval; i++ {
if result, err = pricePuller.PullPrice(ctx); err != nil {
time.Sleep(time.Second)
t.Reset(pricePuller.Interval())
continue
}
break
}

} else {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Improve error handling in processSetPriceFeed.

The retry logic for pulling asset pairs could be enhanced by adding exponential backoff to reduce load and improve success rates.

Comment on lines 460 to 475
if priceData.OracleType != oracletypes.OracleType_Stork {
if priceData.Price.IsZero() || priceData.Price.IsNegative() {
s.logger.WithFields(log.Fields{
"ticker": priceData.Ticker,
"provider": priceData.ProviderName,
}).Errorln("got negative or zero price, skipping")
continue
}
} else {
if len(priceData.AssetPairs) == 0 {
s.logger.WithFields(log.Fields{
"ticker": priceData.Ticker,
"provider": priceData.ProviderName,
}).Errorln("got zero asset pair for stork oracle, skipping")
continue
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Enhance validation and logging in commitSetPrices.

Consider adding more detailed logging for skipped price data, including the reason for skipping and any relevant data points.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

Commits

Files that changed from the base of the PR and between 08e14c9 and dee052d.

Files selected for processing (1)
  • oracle/feed_stork.go (1 hunks)
Files skipped from review as they are similar to previous changes (1)
  • oracle/feed_stork.go

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

Commits

Files that changed from the base of the PR and between dee052d and 3dcaef3.

Files selected for processing (1)
  • cmd/injective-price-oracle/oracle.go (9 hunks)
Files skipped from review as they are similar to previous changes (1)
  • cmd/injective-price-oracle/oracle.go

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

Commits

Files that changed from the base of the PR and between 3dcaef3 and 1bf959c.

Files ignored due to path filters (1)
  • go.sum is excluded by !**/*.sum
Files selected for processing (2)
  • go.mod (3 hunks)
  • oracle/service.go (14 hunks)
Files skipped from review as they are similar to previous changes (1)
  • oracle/service.go
Additional comments not posted (4)
go.mod (4)

5-6: Specifying the Go toolchain version is a good practice.

This helps ensure consistency across development environments.


8-8: Promotion of cosmossdk.io/math to a direct dependency is acceptable.

This indicates active usage within the codebase.


14-14: Addition of github.com/gorilla/websocket as a direct dependency is appropriate.

This aligns with the PR objectives of supporting Stork feeds.


125-125: Updating github.com/grpc-ecosystem/grpc-gateway/v2 to version 2.16.0 is beneficial.

This update can provide bug fixes and new features. Ensure compatibility with the rest of the codebase.

Run the following script to verify compatibility with the updated dependency:

Verification successful

No direct usage of github.com/grpc-ecosystem/grpc-gateway/v2 found in the codebase.

The dependency is marked as indirect and not directly imported, suggesting minimal risk of compatibility issues with the update.

Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Verify compatibility with the updated grpc-gateway dependency.

# Test: Search for grpc-gateway usage. Expect: No compatibility issues.
rg --type go 'grpc-gateway'

Length of output: 27


Script:

#!/bin/bash
# Description: Search for imports of grpc-gateway to verify its usage in the codebase.

# Use ast-grep to find any import statements for grpc-gateway
ast-grep --lang go --pattern 'import $_ "github.com/grpc-ecosystem/grpc-gateway/v2"'

Length of output: 165

storkWebsocket, err := ConnectWebSocket(ctx, *websocketUrl, *websocketHeader)
if err != nil {
err = errors.Wrapf(err, "can not connect with stork oracle websocket")
log.WithError(err).Fatalln("failed to load stork feeds")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should not be a fatal error; if the Stork feed WS is down for whatever reason this would prevent the whole Injective oracle from starting, if we cannot connect it should disable pulling/pushing Stork oracle prices. Ideally, it should try to reconnect so it can eventually resume the operations

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently, we will try to reconnect ws with MaxRetriesReConnectWebSocket = 5, so after that times it will return error with the log that can't not connect with ws, I think in case cannot connect it should disable pulling/pushing Stork as your recommend because we dont want to try to reconnect alot of time and make stuck for another price feeder, wdyt?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, this should be fine 👍

return
}
log.Infof("Retrying connect %sth in 5s...", fmt.Sprint(retries))
time.Sleep(5 * time.Second)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code is blocking, it should acknowledge the context

Suggested change
time.Sleep(5 * time.Second)
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-time.NewTimer(5*time.Second).C:
}

retries := 0
for {
conn, _, err = websocket.DefaultDialer.DialContext(ctx, u.String(), header)
if err != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should retry only in case of no context error

Suggested change
if err != nil {
if ctx.Err() != nil {
return ctx.Err()
} else if err != nil {

Comment on lines 301 to 302
log.Fatal("Error parsing URL:", err)
return &websocket.Conn{}, err
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this fatal will panic, it should return the error instead

Suggested change
log.Fatal("Error parsing URL:", err)
return &websocket.Conn{}, err
return &websocket.Conn{}, errors.Wrapf(err, "can not parse WS url %s: %v", websocketUrl, err)


var oracleType oracletypes.OracleType
if cfg.OracleType == "" {
oracleType = oracletypes.OracleType_PriceFeed
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldn't default to OracleType_Stork ?

Suggested change
oracleType = oracletypes.OracleType_PriceFeed
oracleType = oracletypes.OracleType_Stork

In fact, I'm confused why the StorkPriceFeed should be able to set any type other than "Stork", what's the rationale behind this?

And moreover seeing that the type is hardcoded in the getter

func (f *storkPriceFeed) OracleType() oracletypes.OracleType {
	return oracletypes.OracleType_Stork
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I set wrong for this oracle type, it should be oracleType = oracletypes.OracleType_Stork as you recommend


var msgResp messageResponse
if err = json.Unmarshal(msgNeed, &msgResp); err != nil {
return
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
return
f.logger.Warningln("error unmarshalling feed message:", err)
return

@@ -208,38 +230,76 @@ func (s *oracleSvc) processSetPriceFeed(ticker, providerName string, pricePuller
"provider": pricePuller.ProviderName(),
})

ctx := context.Background()
symbol := pricePuller.Symbol()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

symbol will be empty in Stork feeds, why not use the ticker in the config? (even if one file may pull multiple tickers)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will update this for stork

}

for _, priceData := range priceBatch {
msg := &oracletypes.MsgRelayStorkPrices{
Copy link
Contributor

@hmoragrega hmoragrega Aug 21, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the batch includes all the price data structs from all providers, It needs to check the provider type

Suggested change
msg := &oracletypes.MsgRelayStorkPrices{
if priceData.OracleType != oracletypes.OracleType_Stork {
continue
}
msg := &oracletypes.MsgRelayStorkPrices{

}

} else {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can skip pushing an empty update to the channel

Suggested change
} else {
if len(assetPairs) == 0 {
t.Reset(pricePuller.Interval())
continue
}
} else {

s.logger.WithFields(log.Fields{
"ticker": priceData.Ticker,
"provider": priceData.ProviderName,
}).Errorln("got zero asset pair for stork oracle, skipping")
Copy link
Contributor

@hmoragrega hmoragrega Aug 21, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this doesn't seems to be a real error, just something that can happen

Suggested change
}).Errorln("got zero asset pair for stork oracle, skipping")
}).Debugln("got zero asset pair for stork oracle, skipping")

we can avoid this check here completely if we do not push a priceData without any pair as explained in the comment in processSetPriceFeed function

@@ -50,21 +50,24 @@ type MultiPricePuller interface {
type oracleSvc struct {
pricePullers map[string]PricePuller
supportedPriceFeeds map[string]PriceFeedConfig
supportedStorkFeeds map[string]StorkFeedConfig
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like this is not used

err = errors.Wrapf(err, "failed to init stork price feed for ticker %s", ticker)
return nil, err
}
svc.pricePullers[ticker] = pricePuller
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Warning: this will overwrite other dynamic feed configs if the ticker collides, I'm ok with it, as long as it is documented, maybe in the Readme so we know that Stork feeds in such case

}

// PullAssetPairs pulls asset pair for an asset id
func (f *storkPriceFeed) PullAssetPairs(conn *websocket.Conn) (assetPairs []*oracletypes.AssetPair, err error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason to send the subscription message more than one time?

I imagine that the optimal way to use this websocket connection is to establish the subscription once and keep reading the incoming message asynchronously in a different goroutine, then when PullAssetPairs is called, simply return the latest values, otherwise, we're using the WS as a normal HTTP request

Copy link
Contributor

@hmoragrega hmoragrega Aug 22, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tested the code with multiple stork config and this code is not safe, it tries to send a message to the same WS connection from different pullers concurrently and panics

INFO[0028] Connected to stork websocket                 
INFO[0028] initialized 4 price pullers                   svc=oracle
INFO[0029] starting pullers for 4 feeds                  svc=oracle
panic: concurrent write to websocket connection

We either need to synchronize the access to the WS connection or have different connections

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently, the logic for stork is the same as your comment, we establish the connection at begin then when we want to get data of assets, just need to make a request

@@ -0,0 +1,5 @@
provider = "Stork"
Copy link
Contributor

@hmoragrega hmoragrega Aug 22, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This file should be committed in a different folder, with the code as it is, we cannot have price feed and stork configs in the same folder, because they are parsed twice, as DynamicFeedConfig and then as StorkFeedConfig too

Ideally, it'll be better if both file types can coexist in the same folder, it'll require less DevOps overhead and it's much clearer to know all the enabled feeds, the oracleType should be used to discriminate which type of config and the puller to get the prices

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

First we should separate them into different folders then process your idea in another pr to make this pr can be merged soon

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

Commits

Files that changed from the base of the PR and between 1bf959c and 75bd2c3.

Files selected for processing (4)
  • cmd/injective-price-oracle/oracle.go (9 hunks)
  • examples/stork/stork.toml (1 hunks)
  • oracle/feed_stork.go (1 hunks)
  • oracle/service.go (13 hunks)
Files skipped from review due to trivial changes (1)
  • examples/stork/stork.toml
Files skipped from review as they are similar to previous changes (2)
  • cmd/injective-price-oracle/oracle.go
  • oracle/feed_stork.go
Additional comments not posted (6)
oracle/service.go (6)

Line range hint 146-190: LGTM! But document the warning about ticker collisions.

The function correctly initializes the new parameters and handles errors appropriately. However, ensure that the warning about overwriting dynamic feed configs if the ticker collides is documented.


Line range hint 206-210: LGTM!

The function correctly includes the new feed provider in the switch case.


378-395: LGTM!

The function correctly composes messages for Stork price feeds.


396-401: LGTM!

The function correctly includes the new composeStorkOracleMsgs method.


Line range hint 106-133: LGTM!

The function correctly retrieves enabled price feeds.


Line range hint 518-533: LGTM!

The function correctly recovers from panics and logs errors.

Comment on lines +464 to +479
if priceData.OracleType != oracletypes.OracleType_Stork {
if priceData.Price.IsZero() || priceData.Price.IsNegative() {
s.logger.WithFields(log.Fields{
"ticker": priceData.Ticker,
"provider": priceData.ProviderName,
}).Debugln("got negative or zero price, skipping")
continue
}
} else {
if len(priceData.AssetPairs) == 0 {
s.logger.WithFields(log.Fields{
"ticker": priceData.Ticker,
"provider": priceData.ProviderName,
}).Debugln("got zero asset pair for stork oracle, skipping")
continue
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Enhance validation and logging.

The function correctly includes validation for Stork price data. Consider adding more detailed logging for skipped price data, including the reason for skipping and any relevant data points.

Apply this diff to enhance logging for skipped price data:

 if priceData.OracleType != oracletypes.OracleType_Stork {
     if priceData.Price.IsZero() || priceData.Price.IsNegative() {
         s.logger.WithFields(log.Fields{
             "ticker":   priceData.Ticker,
             "provider": priceData.ProviderName,
+            "reason":   "negative or zero price",
         }).Debugln("got negative or zero price, skipping")
         continue
     }
 } else {
     if len(priceData.AssetPairs) == 0 {
         s.logger.WithFields(log.Fields{
             "ticker":   priceData.Ticker,
             "provider": priceData.ProviderName,
+            "reason":   "zero asset pair",
         }).Debugln("got zero asset pair for stork oracle, skipping")
         continue
     }
 }
Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if priceData.OracleType != oracletypes.OracleType_Stork {
if priceData.Price.IsZero() || priceData.Price.IsNegative() {
s.logger.WithFields(log.Fields{
"ticker": priceData.Ticker,
"provider": priceData.ProviderName,
}).Debugln("got negative or zero price, skipping")
continue
}
} else {
if len(priceData.AssetPairs) == 0 {
s.logger.WithFields(log.Fields{
"ticker": priceData.Ticker,
"provider": priceData.ProviderName,
}).Debugln("got zero asset pair for stork oracle, skipping")
continue
}
if priceData.OracleType != oracletypes.OracleType_Stork {
if priceData.Price.IsZero() || priceData.Price.IsNegative() {
s.logger.WithFields(log.Fields{
"ticker": priceData.Ticker,
"provider": priceData.ProviderName,
"reason": "negative or zero price",
}).Debugln("got negative or zero price, skipping")
continue
}
} else {
if len(priceData.AssetPairs) == 0 {
s.logger.WithFields(log.Fields{
"ticker": priceData.Ticker,
"provider": priceData.ProviderName,
"reason": "zero asset pair",
}).Debugln("got zero asset pair for stork oracle, skipping")
continue
}

Comment on lines +234 to +301
ctx, cancelFn := context.WithTimeout(context.Background(), maxRespTime)
defer cancelFn()
// define price and asset pair to tracking late
var err error
price := zeroPrice
var assetPairs []*oracletypes.AssetPair

if pricePuller.OracleType() == oracletypes.OracleType_Stork {
storkPricePuller, ok := pricePuller.(*storkPriceFeed)
if !ok {
metrics.ReportFuncError(s.svcTags)
feedLogger.WithFields(log.Fields{
"oracle": "Stork Oracle",
"retries": maxRetriesPerInterval,
}).WithError(err).Errorln("can not convert to stork price feed")
continue
}
assetPairs, err = storkPricePuller.PullAssetPairs(s.storkWebsocket)
if err != nil {
metrics.ReportFuncError(s.svcTags)
feedLogger.WithError(err).Warningln("retrying PullAssetPairs after error")

for i := 0; i < maxRetriesPerInterval; i++ {
if assetPairs, err = storkPricePuller.PullAssetPairs(s.storkWebsocket); err != nil {
time.Sleep(time.Second)
continue
}
break
}

result, err := pricePuller.PullPrice(ctx)
if err != nil {
metrics.ReportFuncError(s.svcTags)
feedLogger.WithError(err).Warningln("retrying PullPrice after error")
if err != nil {
metrics.ReportFuncError(s.svcTags)
feedLogger.WithFields(log.Fields{
"retries": maxRetriesPerInterval,
}).WithError(err).Errorln("failed to fetch asset pairs")

for i := 0; i < maxRetriesPerInterval; i++ {
if result, err = pricePuller.PullPrice(ctx); err != nil {
time.Sleep(time.Second)
t.Reset(pricePuller.Interval())
continue
}
break
}

if len(assetPairs) == 0 {
t.Reset(pricePuller.Interval())
continue
}
} else {
price, err = pricePuller.PullPrice(ctx)
if err != nil {
metrics.ReportFuncError(s.svcTags)
feedLogger.WithFields(log.Fields{
"symbol": symbol,
"retries": maxRetriesPerInterval,
}).WithError(err).Errorln("failed to fetch price")
feedLogger.WithError(err).Warningln("retrying PullPrice after error")

for i := 0; i < maxRetriesPerInterval; i++ {
if price, err = pricePuller.PullPrice(ctx); err != nil {
time.Sleep(time.Second)
continue
}
break
}

t.Reset(pricePuller.Interval())
continue
if err != nil {
metrics.ReportFuncError(s.svcTags)
feedLogger.WithFields(log.Fields{
"symbol": symbol,
"retries": maxRetriesPerInterval,
}).WithError(err).Errorln("failed to fetch price")

t.Reset(pricePuller.Interval())
continue
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Enhance retry logic with exponential backoff.

The function correctly implements the logic for Stork price feeds and includes appropriate error handling and retry logic. However, consider enhancing the retry logic with exponential backoff to reduce load and improve success rates.

Apply this diff to enhance the retry logic with exponential backoff:

 for i := 0; i < maxRetriesPerInterval; i++ {
-    if assetPairs, err = storkPricePuller.PullAssetPairs(s.storkWebsocket); err != nil {
-        time.Sleep(time.Second)
+    backoff := time.Second
+    for i := 0; i < maxRetriesPerInterval; i++ {
+        if assetPairs, err = storkPricePuller.PullAssetPairs(s.storkWebsocket); err != nil {
+            time.Sleep(backoff)
+            backoff *= 2
             continue
         }
         break
     }
Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
ctx, cancelFn := context.WithTimeout(context.Background(), maxRespTime)
defer cancelFn()
// define price and asset pair to tracking late
var err error
price := zeroPrice
var assetPairs []*oracletypes.AssetPair
if pricePuller.OracleType() == oracletypes.OracleType_Stork {
storkPricePuller, ok := pricePuller.(*storkPriceFeed)
if !ok {
metrics.ReportFuncError(s.svcTags)
feedLogger.WithFields(log.Fields{
"oracle": "Stork Oracle",
"retries": maxRetriesPerInterval,
}).WithError(err).Errorln("can not convert to stork price feed")
continue
}
assetPairs, err = storkPricePuller.PullAssetPairs(s.storkWebsocket)
if err != nil {
metrics.ReportFuncError(s.svcTags)
feedLogger.WithError(err).Warningln("retrying PullAssetPairs after error")
for i := 0; i < maxRetriesPerInterval; i++ {
if assetPairs, err = storkPricePuller.PullAssetPairs(s.storkWebsocket); err != nil {
time.Sleep(time.Second)
continue
}
break
}
result, err := pricePuller.PullPrice(ctx)
if err != nil {
metrics.ReportFuncError(s.svcTags)
feedLogger.WithError(err).Warningln("retrying PullPrice after error")
if err != nil {
metrics.ReportFuncError(s.svcTags)
feedLogger.WithFields(log.Fields{
"retries": maxRetriesPerInterval,
}).WithError(err).Errorln("failed to fetch asset pairs")
for i := 0; i < maxRetriesPerInterval; i++ {
if result, err = pricePuller.PullPrice(ctx); err != nil {
time.Sleep(time.Second)
t.Reset(pricePuller.Interval())
continue
}
break
}
if len(assetPairs) == 0 {
t.Reset(pricePuller.Interval())
continue
}
} else {
price, err = pricePuller.PullPrice(ctx)
if err != nil {
metrics.ReportFuncError(s.svcTags)
feedLogger.WithFields(log.Fields{
"symbol": symbol,
"retries": maxRetriesPerInterval,
}).WithError(err).Errorln("failed to fetch price")
feedLogger.WithError(err).Warningln("retrying PullPrice after error")
for i := 0; i < maxRetriesPerInterval; i++ {
if price, err = pricePuller.PullPrice(ctx); err != nil {
time.Sleep(time.Second)
continue
}
break
}
t.Reset(pricePuller.Interval())
continue
if err != nil {
metrics.ReportFuncError(s.svcTags)
feedLogger.WithFields(log.Fields{
"symbol": symbol,
"retries": maxRetriesPerInterval,
}).WithError(err).Errorln("failed to fetch price")
t.Reset(pricePuller.Interval())
continue
}
ctx, cancelFn := context.WithTimeout(context.Background(), maxRespTime)
defer cancelFn()
// define price and asset pair to tracking late
var err error
price := zeroPrice
var assetPairs []*oracletypes.AssetPair
if pricePuller.OracleType() == oracletypes.OracleType_Stork {
storkPricePuller, ok := pricePuller.(*storkPriceFeed)
if !ok {
metrics.ReportFuncError(s.svcTags)
feedLogger.WithFields(log.Fields{
"oracle": "Stork Oracle",
"retries": maxRetriesPerInterval,
}).WithError(err).Errorln("can not convert to stork price feed")
continue
}
assetPairs, err = storkPricePuller.PullAssetPairs(s.storkWebsocket)
if err != nil {
metrics.ReportFuncError(s.svcTags)
feedLogger.WithError(err).Warningln("retrying PullAssetPairs after error")
backoff := time.Second
for i := 0; i < maxRetriesPerInterval; i++ {
if assetPairs, err = storkPricePuller.PullAssetPairs(s.storkWebsocket); err != nil {
time.Sleep(backoff)
backoff *= 2
continue
}
break
}
if err != nil {
metrics.ReportFuncError(s.svcTags)
feedLogger.WithFields(log.Fields{
"retries": maxRetriesPerInterval,
}).WithError(err).Errorln("failed to fetch asset pairs")
t.Reset(pricePuller.Interval())
continue
}
}
if len(assetPairs) == 0 {
t.Reset(pricePuller.Interval())
continue
}
} else {
price, err = pricePuller.PullPrice(ctx)
if err != nil {
metrics.ReportFuncError(s.svcTags)
feedLogger.WithError(err).Warningln("retrying PullPrice after error")
for i := 0; i < maxRetriesPerInterval; i++ {
if price, err = pricePuller.PullPrice(ctx); err != nil {
time.Sleep(time.Second)
continue
}
break
}
if err != nil {
metrics.ReportFuncError(s.svcTags)
feedLogger.WithFields(log.Fields{
"symbol": symbol,
"retries": maxRetriesPerInterval,
}).WithError(err).Errorln("failed to fetch price")
t.Reset(pricePuller.Interval())
continue
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants