From 31a05b4db5f7d8f1932ac1186b1c485dacb8c247 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Wed, 23 Oct 2024 09:55:53 -0400 Subject: [PATCH] feat: add tip listener to daemon (#382) (#385) * added function to listen for tipped queries * added tip listening process to reporter daemon client and updated app.go so that it doesn't generate the config files every time but just reads them. Allowing users to just edit the config files to add pairs or remove sources * made some small changes to function names and added a check to make sure the query that was tipped is a spot price * made lint fixes (cherry picked from commit 06ce0ade63bce330c0aaeb88d234eafcfe5b82ee) Co-authored-by: CJPotter10 <91627020+CJPotter10@users.noreply.github.com> --- app/app.go | 3 -- daemons/reporter/client/broadcast_message.go | 2 +- daemons/reporter/client/client.go | 3 ++ daemons/reporter/client/reporter_monitors.go | 49 +++++++++++++++++++- 4 files changed, 52 insertions(+), 5 deletions(-) diff --git a/app/app.go b/app/app.go index 1787850d6..996fe7714 100644 --- a/app/app.go +++ b/app/app.go @@ -641,9 +641,6 @@ func New( // Start server for handling gRPC messages from daemons. go app.Server.Start() - configs.WriteDefaultPricefeedExchangeToml(homePath) - configs.WriteDefaultMarketParamsToml(homePath) - exchangeQueryConfig := configs.ReadExchangeQueryConfigFile(homePath) marketParamsConfig := configs.ReadMarketParamsConfigFile(homePath) // Start pricefeed client for sending prices for the pricefeed server to consume. These prices diff --git a/daemons/reporter/client/broadcast_message.go b/daemons/reporter/client/broadcast_message.go index 5bb490018..72390adc0 100644 --- a/daemons/reporter/client/broadcast_message.go +++ b/daemons/reporter/client/broadcast_message.go @@ -77,7 +77,7 @@ func (c *Client) generateDepositmessages(ctx context.Context) error { // return nil // } -func (c *Client) CyclelistMessages(ctx context.Context, qd []byte, querymeta *oracletypes.QueryMeta) error { +func (c *Client) GenerateAndBroadcastSpotPriceReport(ctx context.Context, qd []byte, querymeta *oracletypes.QueryMeta) error { value, err := c.median(qd) if err != nil { return fmt.Errorf("error getting median from median client': %w", err) diff --git a/daemons/reporter/client/client.go b/daemons/reporter/client/client.go index 3b2ef88bf..9b2ce4552 100644 --- a/daemons/reporter/client/client.go +++ b/daemons/reporter/client/client.go @@ -177,6 +177,9 @@ func StartReporterDaemonTaskLoop( wg.Add(1) go client.MonitorTokenBridgeReports(ctx, &wg) + wg.Add(1) + go client.MonitorForTippedQueries(ctx, &wg) + wg.Wait() } diff --git a/daemons/reporter/client/reporter_monitors.go b/daemons/reporter/client/reporter_monitors.go index faf1298f4..a895b135e 100644 --- a/daemons/reporter/client/reporter_monitors.go +++ b/daemons/reporter/client/reporter_monitors.go @@ -3,10 +3,13 @@ package client import ( "bytes" "context" + "strings" "sync" "time" oracletypes "github.com/tellor-io/layer/x/oracle/types" + + "github.com/cosmos/cosmos-sdk/types/query" ) func (c *Client) MonitorCyclelistQuery(ctx context.Context, wg *sync.WaitGroup) { @@ -25,7 +28,7 @@ func (c *Client) MonitorCyclelistQuery(ctx context.Context, wg *sync.WaitGroup) } go func(ctx context.Context, qd []byte, qm *oracletypes.QueryMeta) { - err := c.CyclelistMessages(ctx, querydata, qm) + err := c.GenerateAndBroadcastSpotPriceReport(ctx, querydata, qm) if err != nil { c.logger.Error("Generating CycleList message", "error", err) } @@ -55,3 +58,47 @@ func (c *Client) MonitorTokenBridgeReports(ctx context.Context, wg *sync.WaitGro time.Sleep(4 * time.Minute) } } + +func (c *Client) MonitorForTippedQueries(ctx context.Context, wg *sync.WaitGroup) { + defer wg.Done() + var localWG sync.WaitGroup + for { + res, err := c.OracleQueryClient.TippedQueries(ctx, &oracletypes.QueryTippedQueriesRequest{ + Pagination: &query.PageRequest{ + Offset: 0, + }, + }) + if err != nil { + c.logger.Error("Error querying for TippedQueries: ", err) + time.Sleep(200 * time.Millisecond) + continue + } + if len(res.Queries) == 0 { + c.logger.Info("No tipped queries returned") + time.Sleep(200 * time.Millisecond) + continue + } + status, err := c.cosmosCtx.Client.Status(ctx) + if err != nil { + c.logger.Info("Error getting status from client: ", err) + } + height := uint64(status.SyncInfo.LatestBlockHeight) + for i := 0; i < len(res.Queries); i++ { + if height > res.Queries[i].Expiration || commitedIds[res.Queries[i].Id] || strings.EqualFold(res.Queries[i].QueryType, "SpotPrice") { + continue + } + + localWG.Add(1) + go func(query *oracletypes.QueryMeta) { + defer localWG.Done() + err := c.GenerateAndBroadcastSpotPriceReport(ctx, query.GetQueryData(), query) + if err != nil { + c.logger.Error("Error generating report for tipped query: ", err) + } + }(res.Queries[i]) + } + + wg.Wait() + + } +}