diff --git a/app/app.go b/app/app.go index 1787850d6..996fe7714 100644 --- a/app/app.go +++ b/app/app.go @@ -641,9 +641,6 @@ func New( // Start server for handling gRPC messages from daemons. go app.Server.Start() - configs.WriteDefaultPricefeedExchangeToml(homePath) - configs.WriteDefaultMarketParamsToml(homePath) - exchangeQueryConfig := configs.ReadExchangeQueryConfigFile(homePath) marketParamsConfig := configs.ReadMarketParamsConfigFile(homePath) // Start pricefeed client for sending prices for the pricefeed server to consume. These prices diff --git a/daemons/reporter/client/broadcast_message.go b/daemons/reporter/client/broadcast_message.go index 5bb490018..72390adc0 100644 --- a/daemons/reporter/client/broadcast_message.go +++ b/daemons/reporter/client/broadcast_message.go @@ -77,7 +77,7 @@ func (c *Client) generateDepositmessages(ctx context.Context) error { // return nil // } -func (c *Client) CyclelistMessages(ctx context.Context, qd []byte, querymeta *oracletypes.QueryMeta) error { +func (c *Client) GenerateAndBroadcastSpotPriceReport(ctx context.Context, qd []byte, querymeta *oracletypes.QueryMeta) error { value, err := c.median(qd) if err != nil { return fmt.Errorf("error getting median from median client': %w", err) diff --git a/daemons/reporter/client/client.go b/daemons/reporter/client/client.go index 3b2ef88bf..9b2ce4552 100644 --- a/daemons/reporter/client/client.go +++ b/daemons/reporter/client/client.go @@ -177,6 +177,9 @@ func StartReporterDaemonTaskLoop( wg.Add(1) go client.MonitorTokenBridgeReports(ctx, &wg) + wg.Add(1) + go client.MonitorForTippedQueries(ctx, &wg) + wg.Wait() } diff --git a/daemons/reporter/client/reporter_monitors.go b/daemons/reporter/client/reporter_monitors.go index faf1298f4..a895b135e 100644 --- a/daemons/reporter/client/reporter_monitors.go +++ b/daemons/reporter/client/reporter_monitors.go @@ -3,10 +3,13 @@ package client import ( "bytes" "context" + "strings" "sync" "time" oracletypes "github.com/tellor-io/layer/x/oracle/types" + + "github.com/cosmos/cosmos-sdk/types/query" ) func (c *Client) MonitorCyclelistQuery(ctx context.Context, wg *sync.WaitGroup) { @@ -25,7 +28,7 @@ func (c *Client) MonitorCyclelistQuery(ctx context.Context, wg *sync.WaitGroup) } go func(ctx context.Context, qd []byte, qm *oracletypes.QueryMeta) { - err := c.CyclelistMessages(ctx, querydata, qm) + err := c.GenerateAndBroadcastSpotPriceReport(ctx, querydata, qm) if err != nil { c.logger.Error("Generating CycleList message", "error", err) } @@ -55,3 +58,47 @@ func (c *Client) MonitorTokenBridgeReports(ctx context.Context, wg *sync.WaitGro time.Sleep(4 * time.Minute) } } + +func (c *Client) MonitorForTippedQueries(ctx context.Context, wg *sync.WaitGroup) { + defer wg.Done() + var localWG sync.WaitGroup + for { + res, err := c.OracleQueryClient.TippedQueries(ctx, &oracletypes.QueryTippedQueriesRequest{ + Pagination: &query.PageRequest{ + Offset: 0, + }, + }) + if err != nil { + c.logger.Error("Error querying for TippedQueries: ", err) + time.Sleep(200 * time.Millisecond) + continue + } + if len(res.Queries) == 0 { + c.logger.Info("No tipped queries returned") + time.Sleep(200 * time.Millisecond) + continue + } + status, err := c.cosmosCtx.Client.Status(ctx) + if err != nil { + c.logger.Info("Error getting status from client: ", err) + } + height := uint64(status.SyncInfo.LatestBlockHeight) + for i := 0; i < len(res.Queries); i++ { + if height > res.Queries[i].Expiration || commitedIds[res.Queries[i].Id] || strings.EqualFold(res.Queries[i].QueryType, "SpotPrice") { + continue + } + + localWG.Add(1) + go func(query *oracletypes.QueryMeta) { + defer localWG.Done() + err := c.GenerateAndBroadcastSpotPriceReport(ctx, query.GetQueryData(), query) + if err != nil { + c.logger.Error("Error generating report for tipped query: ", err) + } + }(res.Queries[i]) + } + + wg.Wait() + + } +}