Skip to content

Commit

Permalink
Merge branch 'master' of github.com:diadata-org/diadata
Browse files Browse the repository at this point in the history
  • Loading branch information
nnn-gif committed Aug 30, 2023
2 parents 23ee0b9 + 3bf502c commit 23c5a1c
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 27 deletions.
2 changes: 1 addition & 1 deletion cmd/http/graphqlServer/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module graphqlServer
go 1.17

require (
github.com/diadata-org/diadata v1.4.320
github.com/diadata-org/diadata v1.4.334
github.com/graph-gophers/graphql-go v1.1.0
github.com/sirupsen/logrus v1.8.1
)
Expand Down
28 changes: 21 additions & 7 deletions pkg/dia/helpers/queryHelper/filtersExtended.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ func FilterMAextended(tradeBlocks []Block, asset dia.Asset, blockSize int, volum

maFilter.FinalCompute(time.Unix(block.TimeStamp/1e9, 0))
fp := maFilter.FilterPointForBlock()
// In case the volume filter above filters out all trades, set fp to the last one.
if fpe.TradesCount == 0 {
fp = lastfp
}

if len(block.Trades) > 0 {
fp.LastTrade = block.Trades[len(block.Trades)-1]
Expand Down Expand Up @@ -60,16 +64,14 @@ func FilterMAextended(tradeBlocks []Block, asset dia.Asset, blockSize int, volum
func FilterMAIRextended(tradeBlocks []Block, asset dia.Asset, blockSize int, volumeThresholdUSD float64) (filterPointsExtended []dia.FilterPointExtended) {
lastfp := &dia.FilterPoint{}

for i, block := range tradeBlocks {
log.Infof("block number: %v", i)
for _, block := range tradeBlocks {
fpe := &dia.FilterPointExtended{}
pairs, pools := block.GetBlockSources()
fpe.Pairs = pairs
fpe.Pools = pools

if len(block.Trades) > 0 {
mairFilter := filters.NewFilterMAIR(asset, "", time.Unix(block.TimeStamp/1e9, 0), blockSize)
firstBlock := block.Trades[0]
for _, trade := range block.Trades {
if trade.VolumeUSD() > volumeThresholdUSD {
fpe.TradesCount++
Expand All @@ -79,8 +81,12 @@ func FilterMAIRextended(tradeBlocks []Block, asset dia.Asset, blockSize int, vol

mairFilter.FinalCompute(time.Unix(block.TimeStamp/1e9, 0))
fp := mairFilter.FilterPointForBlock()
// In case the volume filter above filters out all trades, set fp to the last one.
if fpe.TradesCount == 0 {
fp = lastfp
}

fp.FirstTrade = firstBlock
fp.FirstTrade = block.Trades[0]
if len(block.Trades) > 0 {
fp.LastTrade = block.Trades[len(block.Trades)-1]
} else {
Expand Down Expand Up @@ -128,6 +134,10 @@ func FilterVWAPextended(tradeBlocks []Block, asset dia.Asset, blockSize int, vol

vwapFilter.FinalCompute(block.Trades[0].Time)
fp := vwapFilter.FilterPointForBlock()
// In case the volume filter above filters out all trades, set fp to the last one.
if fpe.TradesCount == 0 {
fp = lastfp
}

fp.FirstTrade = block.Trades[0]

Expand Down Expand Up @@ -172,6 +182,10 @@ func FilterVWAPIRextended(tradeBlocks []Block, asset dia.Asset, blockSize int, v
}
vwapirFilter.FinalCompute(time.Unix(block.TimeStamp/1e9, 0))
fp := vwapirFilter.FilterPointForBlock()
// In case the volume filter above filters out all trades, set fp to the last one.
if fpe.TradesCount == 0 {
fp = lastfp
}

fp.FirstTrade = block.Trades[0]

Expand Down Expand Up @@ -223,9 +237,9 @@ func FilterMEDIRextended(tradeBlocks []Block, asset dia.Asset, blockSize int, vo
}
medirFilter.FinalCompute(time.Unix(block.TimeStamp/1e9, 0))
fp := medirFilter.FilterPointForBlock()
if fp == nil {
log.Error("failed getting FilterPointForBlock")
return
// In case the volume filter above filters out all trades, set fp to the last one.
if fpe.TradesCount == 0 {
fp = lastfp
}

fp.FirstTrade = block.Trades[0]
Expand Down
32 changes: 17 additions & 15 deletions pkg/graphql/resolver/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@ import (
)

var (
log = logrus.New()
EXCHANGES = scrapers.Exchanges
BLOCKCHAINS = scrapers.Blockchains
log = logrus.New()
EXCHANGES = scrapers.Exchanges
BLOCKCHAINS = scrapers.Blockchains
lookbackTradesNumber = 10
)

const (
Expand Down Expand Up @@ -550,33 +551,34 @@ func (r *DiaResolver) GetFeed(ctx context.Context, args struct {
endtimes = append(endtimes, bin.Endtime)
}
}
trades, err = r.DS.GetTradesByFeedSelection(feedselection, starttimes, endtimes)

trades, err = r.DS.GetTradesByFeedSelection(feedselection, starttimes, endtimes, 0)
if err != nil {
return sr, err
}
log.Println("Generating blocks, Total Trades", len(trades))
log.Info("generating bins. Total bins: ", len(bins))

if len(trades) > 0 && len(bins) > 0 {
// In case the first bin is empty, look for the last trade before @starttime.
// In case the first bin is empty, look for the last trades before @starttime
// in order to select the most recent one with sufficient volume.
if !utils.IsInBin(trades[0].Time, bins[0]) {
var exchanges []string
for _, f := range feedselection {
for _, e := range f.Exchangepairs {
exchanges = append(exchanges, e.Exchange.Name)
}
}
previousTrade, baseAsseteErr := r.DS.GetTradesByExchangesAndBaseAssets(feedselection[0].Asset, []dia.Asset{}, exchanges, endtime.AddDate(0, 0, -10), starttime, 1)
if baseAsseteErr != nil || len(previousTrade) == 0 {
log.Error("get initial trade: ", err, baseAsseteErr)
previousTrade, err := r.DS.GetTradesByFeedSelection(feedselection, []time.Time{endtime.AddDate(0, 0, -10)}, []time.Time{starttime}, lookbackTradesNumber)
if len(previousTrade) == 0 {
log.Error("get initial trade: ", err)
// Fill with a zero trade so we can build blocks.
auxTrade := trades[0]
auxTrade.Volume = 0
auxTrade.Price = 0
auxTrade.EstimatedUSDPrice = 0
trades = append([]dia.Trade{auxTrade}, trades...)
} else {
trades = append([]dia.Trade{previousTrade[0]}, trades...)
for _, t := range previousTrade {
if t.VolumeUSD() > tradeVolumeThreshold {
trades = append([]dia.Trade{t}, trades...)
break
}
}
}
}
tradeBlocks = queryhelper.NewBlockGenerator(trades).GenerateBlocks(blockSizeSeconds, blockShiftSeconds, bins)
Expand Down
2 changes: 1 addition & 1 deletion pkg/model/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ type Datastore interface {
GetxcTradesByExchangesBatched(quoteassets []dia.Asset, exchanges []string, startTimes []time.Time, endTimes []time.Time) ([]dia.Trade, error)

GetTradesByExchangepairs(exchangepairMap map[string][]dia.Pair, exchangepoolMap map[string][]string, starttime time.Time, endtime time.Time) ([]dia.Trade, error)
GetTradesByFeedSelection(feedselection []dia.FeedSelection, starttimes []time.Time, endtimes []time.Time) ([]dia.Trade, error)
GetTradesByFeedSelection(feedselection []dia.FeedSelection, starttimes []time.Time, endtimes []time.Time, limit int) ([]dia.Trade, error)

GetActiveExchangesAndPairs(address string, blockchain string, numTradesThreshold int64, starttime time.Time, endtime time.Time) (map[string][]dia.Pair, map[string]int64, error)
GetOldTradesFromInflux(table string, exchange string, verified bool, timeInit, timeFinal time.Time) ([]dia.Trade, error)
Expand Down
16 changes: 13 additions & 3 deletions pkg/model/trades.go
Original file line number Diff line number Diff line change
Expand Up @@ -599,7 +599,12 @@ func (datastore *DB) GetxcTradesByExchangesBatched(
}

// GetTradesByFeedSelection returns all trades with restrictions given by the struct @feedselection.
func (datastore *DB) GetTradesByFeedSelection(feedselection []dia.FeedSelection, starttimes []time.Time, endtimes []time.Time) ([]dia.Trade, error) {
func (datastore *DB) GetTradesByFeedSelection(
feedselection []dia.FeedSelection,
starttimes []time.Time,
endtimes []time.Time,
limit int,
) ([]dia.Trade, error) {
var (
query string
r []dia.Trade
Expand All @@ -610,7 +615,7 @@ func (datastore *DB) GetTradesByFeedSelection(feedselection []dia.FeedSelection,
}

for i := range starttimes {
query = fmt.Sprintf(`
query += fmt.Sprintf(`
SELECT time,estimatedUSDPrice,exchange,foreignTradeID,pair,price,symbol,volume,verified,basetokenblockchain,basetokenaddress,quotetokenblockchain,quotetokenaddress,pooladdress
FROM %s
WHERE ( `,
Expand Down Expand Up @@ -685,13 +690,18 @@ func (datastore *DB) GetTradesByFeedSelection(feedselection []dia.FeedSelection,
}

// The bracket closes the main statement from the first WHERE clause.
var limitQuery string
if len(starttimes) == 1 && limit > 0 {
limitQuery = fmt.Sprintf(" ORDER BY DESC LIMIT %v", limit)
}
query += fmt.Sprintf(`
)
AND estimatedUSDPrice > 0
AND time > %d
AND time < %d ;`,
AND time < %d %s;`,
starttimes[i].UnixNano(),
endtimes[i].UnixNano(),
limitQuery,
)
}

Expand Down

0 comments on commit 23c5a1c

Please sign in to comment.