diff --git a/cmd/http/graphqlServer/go.mod b/cmd/http/graphqlServer/go.mod index ccf40dda8..dedb442e0 100644 --- a/cmd/http/graphqlServer/go.mod +++ b/cmd/http/graphqlServer/go.mod @@ -3,7 +3,7 @@ module graphqlServer go 1.17 require ( - github.com/diadata-org/diadata v1.4.320 + github.com/diadata-org/diadata v1.4.334 github.com/graph-gophers/graphql-go v1.1.0 github.com/sirupsen/logrus v1.8.1 ) diff --git a/pkg/dia/helpers/queryHelper/filtersExtended.go b/pkg/dia/helpers/queryHelper/filtersExtended.go index 09f4fb998..cce061b56 100644 --- a/pkg/dia/helpers/queryHelper/filtersExtended.go +++ b/pkg/dia/helpers/queryHelper/filtersExtended.go @@ -30,6 +30,10 @@ func FilterMAextended(tradeBlocks []Block, asset dia.Asset, blockSize int, volum maFilter.FinalCompute(time.Unix(block.TimeStamp/1e9, 0)) fp := maFilter.FilterPointForBlock() + // In case the volume filter above filters out all trades, set fp to the last one. + if fpe.TradesCount == 0 { + fp = lastfp + } if len(block.Trades) > 0 { fp.LastTrade = block.Trades[len(block.Trades)-1] @@ -60,8 +64,7 @@ func FilterMAextended(tradeBlocks []Block, asset dia.Asset, blockSize int, volum func FilterMAIRextended(tradeBlocks []Block, asset dia.Asset, blockSize int, volumeThresholdUSD float64) (filterPointsExtended []dia.FilterPointExtended) { lastfp := &dia.FilterPoint{} - for i, block := range tradeBlocks { - log.Infof("block number: %v", i) + for _, block := range tradeBlocks { fpe := &dia.FilterPointExtended{} pairs, pools := block.GetBlockSources() fpe.Pairs = pairs @@ -69,7 +72,6 @@ func FilterMAIRextended(tradeBlocks []Block, asset dia.Asset, blockSize int, vol if len(block.Trades) > 0 { mairFilter := filters.NewFilterMAIR(asset, "", time.Unix(block.TimeStamp/1e9, 0), blockSize) - firstBlock := block.Trades[0] for _, trade := range block.Trades { if trade.VolumeUSD() > volumeThresholdUSD { fpe.TradesCount++ @@ -79,8 +81,12 @@ func FilterMAIRextended(tradeBlocks []Block, asset dia.Asset, blockSize int, vol mairFilter.FinalCompute(time.Unix(block.TimeStamp/1e9, 0)) fp := mairFilter.FilterPointForBlock() + // In case the volume filter above filters out all trades, set fp to the last one. + if fpe.TradesCount == 0 { + fp = lastfp + } - fp.FirstTrade = firstBlock + fp.FirstTrade = block.Trades[0] if len(block.Trades) > 0 { fp.LastTrade = block.Trades[len(block.Trades)-1] } else { @@ -128,6 +134,10 @@ func FilterVWAPextended(tradeBlocks []Block, asset dia.Asset, blockSize int, vol vwapFilter.FinalCompute(block.Trades[0].Time) fp := vwapFilter.FilterPointForBlock() + // In case the volume filter above filters out all trades, set fp to the last one. + if fpe.TradesCount == 0 { + fp = lastfp + } fp.FirstTrade = block.Trades[0] @@ -172,6 +182,10 @@ func FilterVWAPIRextended(tradeBlocks []Block, asset dia.Asset, blockSize int, v } vwapirFilter.FinalCompute(time.Unix(block.TimeStamp/1e9, 0)) fp := vwapirFilter.FilterPointForBlock() + // In case the volume filter above filters out all trades, set fp to the last one. + if fpe.TradesCount == 0 { + fp = lastfp + } fp.FirstTrade = block.Trades[0] @@ -223,9 +237,9 @@ func FilterMEDIRextended(tradeBlocks []Block, asset dia.Asset, blockSize int, vo } medirFilter.FinalCompute(time.Unix(block.TimeStamp/1e9, 0)) fp := medirFilter.FilterPointForBlock() - if fp == nil { - log.Error("failed getting FilterPointForBlock") - return + // In case the volume filter above filters out all trades, set fp to the last one. + if fpe.TradesCount == 0 { + fp = lastfp } fp.FirstTrade = block.Trades[0] diff --git a/pkg/graphql/resolver/root.go b/pkg/graphql/resolver/root.go index 369b05f1e..0b60f6bb8 100644 --- a/pkg/graphql/resolver/root.go +++ b/pkg/graphql/resolver/root.go @@ -19,9 +19,10 @@ import ( ) var ( - log = logrus.New() - EXCHANGES = scrapers.Exchanges - BLOCKCHAINS = scrapers.Blockchains + log = logrus.New() + EXCHANGES = scrapers.Exchanges + BLOCKCHAINS = scrapers.Blockchains + lookbackTradesNumber = 10 ) const ( @@ -550,7 +551,8 @@ func (r *DiaResolver) GetFeed(ctx context.Context, args struct { endtimes = append(endtimes, bin.Endtime) } } - trades, err = r.DS.GetTradesByFeedSelection(feedselection, starttimes, endtimes) + + trades, err = r.DS.GetTradesByFeedSelection(feedselection, starttimes, endtimes, 0) if err != nil { return sr, err } @@ -558,17 +560,12 @@ func (r *DiaResolver) GetFeed(ctx context.Context, args struct { log.Info("generating bins. Total bins: ", len(bins)) if len(trades) > 0 && len(bins) > 0 { - // In case the first bin is empty, look for the last trade before @starttime. + // In case the first bin is empty, look for the last trades before @starttime + // in order to select the most recent one with sufficient volume. if !utils.IsInBin(trades[0].Time, bins[0]) { - var exchanges []string - for _, f := range feedselection { - for _, e := range f.Exchangepairs { - exchanges = append(exchanges, e.Exchange.Name) - } - } - previousTrade, baseAsseteErr := r.DS.GetTradesByExchangesAndBaseAssets(feedselection[0].Asset, []dia.Asset{}, exchanges, endtime.AddDate(0, 0, -10), starttime, 1) - if baseAsseteErr != nil || len(previousTrade) == 0 { - log.Error("get initial trade: ", err, baseAsseteErr) + previousTrade, err := r.DS.GetTradesByFeedSelection(feedselection, []time.Time{endtime.AddDate(0, 0, -10)}, []time.Time{starttime}, lookbackTradesNumber) + if len(previousTrade) == 0 { + log.Error("get initial trade: ", err) // Fill with a zero trade so we can build blocks. auxTrade := trades[0] auxTrade.Volume = 0 @@ -576,7 +573,12 @@ func (r *DiaResolver) GetFeed(ctx context.Context, args struct { auxTrade.EstimatedUSDPrice = 0 trades = append([]dia.Trade{auxTrade}, trades...) } else { - trades = append([]dia.Trade{previousTrade[0]}, trades...) + for _, t := range previousTrade { + if t.VolumeUSD() > tradeVolumeThreshold { + trades = append([]dia.Trade{t}, trades...) + break + } + } } } tradeBlocks = queryhelper.NewBlockGenerator(trades).GenerateBlocks(blockSizeSeconds, blockShiftSeconds, bins) diff --git a/pkg/model/db.go b/pkg/model/db.go index 981eb3747..d5c76774d 100644 --- a/pkg/model/db.go +++ b/pkg/model/db.go @@ -54,7 +54,7 @@ type Datastore interface { GetxcTradesByExchangesBatched(quoteassets []dia.Asset, exchanges []string, startTimes []time.Time, endTimes []time.Time) ([]dia.Trade, error) GetTradesByExchangepairs(exchangepairMap map[string][]dia.Pair, exchangepoolMap map[string][]string, starttime time.Time, endtime time.Time) ([]dia.Trade, error) - GetTradesByFeedSelection(feedselection []dia.FeedSelection, starttimes []time.Time, endtimes []time.Time) ([]dia.Trade, error) + GetTradesByFeedSelection(feedselection []dia.FeedSelection, starttimes []time.Time, endtimes []time.Time, limit int) ([]dia.Trade, error) GetActiveExchangesAndPairs(address string, blockchain string, numTradesThreshold int64, starttime time.Time, endtime time.Time) (map[string][]dia.Pair, map[string]int64, error) GetOldTradesFromInflux(table string, exchange string, verified bool, timeInit, timeFinal time.Time) ([]dia.Trade, error) diff --git a/pkg/model/trades.go b/pkg/model/trades.go index 2f30588c4..3941edf0b 100644 --- a/pkg/model/trades.go +++ b/pkg/model/trades.go @@ -599,7 +599,12 @@ func (datastore *DB) GetxcTradesByExchangesBatched( } // GetTradesByFeedSelection returns all trades with restrictions given by the struct @feedselection. -func (datastore *DB) GetTradesByFeedSelection(feedselection []dia.FeedSelection, starttimes []time.Time, endtimes []time.Time) ([]dia.Trade, error) { +func (datastore *DB) GetTradesByFeedSelection( + feedselection []dia.FeedSelection, + starttimes []time.Time, + endtimes []time.Time, + limit int, +) ([]dia.Trade, error) { var ( query string r []dia.Trade @@ -610,7 +615,7 @@ func (datastore *DB) GetTradesByFeedSelection(feedselection []dia.FeedSelection, } for i := range starttimes { - query = fmt.Sprintf(` + query += fmt.Sprintf(` SELECT time,estimatedUSDPrice,exchange,foreignTradeID,pair,price,symbol,volume,verified,basetokenblockchain,basetokenaddress,quotetokenblockchain,quotetokenaddress,pooladdress FROM %s WHERE ( `, @@ -685,13 +690,18 @@ func (datastore *DB) GetTradesByFeedSelection(feedselection []dia.FeedSelection, } // The bracket closes the main statement from the first WHERE clause. + var limitQuery string + if len(starttimes) == 1 && limit > 0 { + limitQuery = fmt.Sprintf(" ORDER BY DESC LIMIT %v", limit) + } query += fmt.Sprintf(` ) AND estimatedUSDPrice > 0 AND time > %d - AND time < %d ;`, + AND time < %d %s;`, starttimes[i].UnixNano(), endtimes[i].UnixNano(), + limitQuery, ) }