Skip to content

Commit

Permalink
merge usd liquidity based pool selection for dex scrapers.
Browse files Browse the repository at this point in the history
  • Loading branch information
jppade committed Jul 19, 2023
2 parents 63c6050 + e917338 commit 9f13bd6
Show file tree
Hide file tree
Showing 11 changed files with 189 additions and 83 deletions.
2 changes: 1 addition & 1 deletion cmd/exchange-scrapers/collector/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/diadata-org/diadata/exchange-scrapers/collector
go 1.17

require (
github.com/diadata-org/diadata v1.4.304
github.com/diadata-org/diadata v1.4.306
github.com/segmentio/kafka-go v0.4.35
github.com/sirupsen/logrus v1.9.0
)
Expand Down
11 changes: 6 additions & 5 deletions pkg/dia/Messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,13 +376,14 @@ func (p *Pool) SufficientNativeBalance(threshold float64) bool {
return sufficientNativeBalance
}

func (p *Pool) TotalUSDLiquidity() (totalLiquidity float64, lowerBound bool) {
for _, pa := range p.Assetvolumes {
if pa.VolumeUSD > 0 {
totalLiquidity += pa.VolumeUSD
} else {
// GetPoolLiquidityUSD returns the total USD liquidity if available.
// @lowerBound is true in case USD liquidity is not available for all pool assets.
func (p *Pool) GetPoolLiquidityUSD() (totalLiquidity float64, lowerBound bool) {
for _, av := range p.Assetvolumes {
if av.VolumeUSD == 0 {
lowerBound = true
}
totalLiquidity += av.VolumeUSD
}
return
}
Expand Down
74 changes: 37 additions & 37 deletions pkg/dia/scraper/exchange-scrapers/APIScraper.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,31 +142,31 @@ func NewAPIScraper(exchange string, scrape bool, key string, secret string, relD
case dia.BancorExchange:
return NewBancorScraper(Exchanges[dia.BancorExchange], scrape)
case dia.UniswapExchange:
return NewUniswapScraper(Exchanges[dia.UniswapExchange], scrape)
return NewUniswapScraper(Exchanges[dia.UniswapExchange], scrape, relDB)
case dia.PanCakeSwap:
return NewUniswapScraper(Exchanges[dia.PanCakeSwap], scrape)
return NewUniswapScraper(Exchanges[dia.PanCakeSwap], scrape, relDB)
case dia.PanCakeSwapExchangeV3:
return NewUniswapV3Scraper(Exchanges[dia.PanCakeSwapExchangeV3], scrape)
return NewUniswapV3Scraper(Exchanges[dia.PanCakeSwapExchangeV3], scrape, relDB)
case dia.SushiSwapExchange:
return NewUniswapScraper(Exchanges[dia.SushiSwapExchange], scrape)
return NewUniswapScraper(Exchanges[dia.SushiSwapExchange], scrape, relDB)
case dia.SushiSwapExchangePolygon:
return NewUniswapScraper(Exchanges[dia.SushiSwapExchangePolygon], scrape)
return NewUniswapScraper(Exchanges[dia.SushiSwapExchangePolygon], scrape, relDB)
case dia.SushiSwapExchangeArbitrum:
return NewUniswapScraper(Exchanges[dia.SushiSwapExchangeArbitrum], scrape)
return NewUniswapScraper(Exchanges[dia.SushiSwapExchangeArbitrum], scrape, relDB)
case dia.SushiSwapExchangeFantom:
return NewUniswapScraper(Exchanges[dia.SushiSwapExchangeFantom], scrape)
return NewUniswapScraper(Exchanges[dia.SushiSwapExchangeFantom], scrape, relDB)
case dia.CamelotExchange:
return NewUniswapScraper(Exchanges[dia.CamelotExchange], scrape)
return NewUniswapScraper(Exchanges[dia.CamelotExchange], scrape, relDB)
case dia.CurveFIExchange:
return NewCurveFIScraper(Exchanges[dia.CurveFIExchange], scrape)
return NewCurveFIScraper(Exchanges[dia.CurveFIExchange], scrape, relDB)
case dia.CurveFIExchangeFantom:
return NewCurveFIScraper(Exchanges[dia.CurveFIExchangeFantom], scrape)
return NewCurveFIScraper(Exchanges[dia.CurveFIExchangeFantom], scrape, relDB)
case dia.CurveFIExchangeMoonbeam:
return NewCurveFIScraper(Exchanges[dia.CurveFIExchangeMoonbeam], scrape)
return NewCurveFIScraper(Exchanges[dia.CurveFIExchangeMoonbeam], scrape, relDB)
case dia.CurveFIExchangePolygon:
return NewCurveFIScraper(Exchanges[dia.CurveFIExchangePolygon], scrape)
return NewCurveFIScraper(Exchanges[dia.CurveFIExchangePolygon], scrape, relDB)
case dia.CurveFIExchangeArbitrum:
return NewCurveFIScraper(Exchanges[dia.CurveFIExchangeArbitrum], scrape)
return NewCurveFIScraper(Exchanges[dia.CurveFIExchangeArbitrum], scrape, relDB)
case dia.BalancerV2Exchange:
return NewBalancerV2Scraper(Exchanges[dia.BalancerV2Exchange], scrape, relDB)
case dia.BalancerV2ExchangeArbitrum:
Expand All @@ -188,61 +188,61 @@ func NewAPIScraper(exchange string, scrape bool, key string, secret string, relD
case dia.BKEX2Exchange:
return NewBKEXScraper(Exchanges[dia.BKEXExchange], dia.BKEX2Exchange, scrape, relDB)
case dia.UniswapExchangeV3:
return NewUniswapV3Scraper(Exchanges[dia.UniswapExchangeV3], scrape)
return NewUniswapV3Scraper(Exchanges[dia.UniswapExchangeV3], scrape, relDB)
case dia.DfynNetwork:
return NewUniswapScraper(Exchanges[dia.DfynNetwork], scrape)
return NewUniswapScraper(Exchanges[dia.DfynNetwork], scrape, relDB)
case dia.UbeswapExchange:
return NewUniswapScraper(Exchanges[dia.UbeswapExchange], scrape)
return NewUniswapScraper(Exchanges[dia.UbeswapExchange], scrape, relDB)
case dia.UniswapExchangeV3Polygon:
return NewUniswapV3Scraper(Exchanges[dia.UniswapExchangeV3Polygon], scrape)
return NewUniswapV3Scraper(Exchanges[dia.UniswapExchangeV3Polygon], scrape, relDB)
case dia.UniswapExchangeV3Arbitrum:
return NewUniswapV3Scraper(Exchanges[dia.UniswapExchangeV3Arbitrum], scrape)
return NewUniswapV3Scraper(Exchanges[dia.UniswapExchangeV3Arbitrum], scrape, relDB)
case dia.HuckleberryExchange:
return NewUniswapScraper(Exchanges[dia.HuckleberryExchange], scrape)
return NewUniswapScraper(Exchanges[dia.HuckleberryExchange], scrape, relDB)
case dia.TraderJoeExchange:
return NewUniswapScraper(Exchanges[dia.TraderJoeExchange], scrape)
return NewUniswapScraper(Exchanges[dia.TraderJoeExchange], scrape, relDB)
case dia.PangolinExchange:
return NewUniswapScraper(Exchanges[dia.PangolinExchange], scrape)
return NewUniswapScraper(Exchanges[dia.PangolinExchange], scrape, relDB)
case dia.PlatypusExchange:
return NewPlatypusScraper(Exchanges[dia.PlatypusExchange], scrape)
return NewPlatypusScraper(Exchanges[dia.PlatypusExchange], scrape, relDB)
case dia.SpookyswapExchange:
return NewUniswapScraper(Exchanges[dia.SpookyswapExchange], scrape)
return NewUniswapScraper(Exchanges[dia.SpookyswapExchange], scrape, relDB)
case dia.QuickswapExchange:
return NewUniswapScraper(Exchanges[dia.QuickswapExchange], scrape)
return NewUniswapScraper(Exchanges[dia.QuickswapExchange], scrape, relDB)
case dia.SpiritswapExchange:
return NewUniswapScraper(Exchanges[dia.SpiritswapExchange], scrape)
return NewUniswapScraper(Exchanges[dia.SpiritswapExchange], scrape, relDB)
case dia.SolarbeamExchange:
return NewUniswapScraper(Exchanges[dia.SolarbeamExchange], scrape)
return NewUniswapScraper(Exchanges[dia.SolarbeamExchange], scrape, relDB)
case dia.TrisolarisExchange:
return NewUniswapScraper(Exchanges[dia.TrisolarisExchange], scrape)
return NewUniswapScraper(Exchanges[dia.TrisolarisExchange], scrape, relDB)
case dia.ByBitExchange:
return NewByBitScraper(Exchanges[dia.ByBitExchange], scrape, relDB)
case dia.OrcaExchange:
return NewOrcaScraper(Exchanges[dia.OrcaExchange], scrape)
case dia.AnyswapExchange:
return NewAnyswapScraper(Exchanges[dia.AnyswapExchange], scrape, relDB)
case dia.NetswapExchange:
return NewUniswapScraper(Exchanges[dia.NetswapExchange], scrape)
return NewUniswapScraper(Exchanges[dia.NetswapExchange], scrape, relDB)
case dia.BitMexExchange:
return NewBitMexScraper(Exchanges[dia.BitMexExchange], scrape, relDB)
case dia.TethysExchange:
return NewUniswapScraper(Exchanges[dia.TethysExchange], scrape)
return NewUniswapScraper(Exchanges[dia.TethysExchange], scrape, relDB)
case dia.HermesExchange:
return NewUniswapScraper(Exchanges[dia.HermesExchange], scrape)
return NewUniswapScraper(Exchanges[dia.HermesExchange], scrape, relDB)
case dia.OmniDexExchange:
return NewUniswapScraper(Exchanges[dia.OmniDexExchange], scrape)
return NewUniswapScraper(Exchanges[dia.OmniDexExchange], scrape, relDB)
case dia.DiffusionExchange:
return NewUniswapScraper(Exchanges[dia.DiffusionExchange], scrape)
return NewUniswapScraper(Exchanges[dia.DiffusionExchange], scrape, relDB)
case dia.ApeswapExchange:
return NewUniswapScraper(Exchanges[dia.ApeswapExchange], scrape)
return NewUniswapScraper(Exchanges[dia.ApeswapExchange], scrape, relDB)
case dia.BiswapExchange:
return NewUniswapScraper(Exchanges[dia.BiswapExchange], scrape)
return NewUniswapScraper(Exchanges[dia.BiswapExchange], scrape, relDB)
case dia.ArthswapExchange:
return NewUniswapScraper(Exchanges[dia.ArthswapExchange], scrape)
return NewUniswapScraper(Exchanges[dia.ArthswapExchange], scrape, relDB)
case dia.StellaswapExchange:
return NewUniswapScraper(Exchanges[dia.StellaswapExchange], scrape)
return NewUniswapScraper(Exchanges[dia.StellaswapExchange], scrape, relDB)
case dia.WanswapExchange:
return NewUniswapScraper(Exchanges[dia.WanswapExchange], scrape)
return NewUniswapScraper(Exchanges[dia.WanswapExchange], scrape, relDB)
case dia.OsmosisExchange:
return NewOsmosisScraper(Exchanges[dia.OsmosisExchange], scrape, relDB)
case dia.ZenlinkswapExchange:
Expand Down
23 changes: 19 additions & 4 deletions pkg/dia/scraper/exchange-scrapers/BalancerV2Scraper.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,14 @@ func NewBalancerV2Scraper(exchange dia.Exchange, scrape bool, relDB *models.RelD
log.Warnf("parse liquidity threshold: %v. Set to default %v", err, liquidityThreshold)
}

scraper.fetchAdmissiblePools(liquidityThreshold)
// Only include pools with (minimum) liquidity USD value bigger than given env var.
liquidityThresholdUSD, err := strconv.ParseFloat(utils.Getenv("LIQUIDITY_THRESHOLD_USD", "0"), 64)
if err != nil {
liquidityThresholdUSD = float64(0)
log.Warnf("parse liquidity threshold: %v. Set to default %v", err, liquidityThresholdUSD)
}

scraper.fetchAdmissiblePools(liquidityThreshold, liquidityThresholdUSD)

scraper.ws = ws
scraper.rest = rest
Expand Down Expand Up @@ -333,15 +340,23 @@ func (s *BalancerV2Scraper) ScrapePair(pair dia.ExchangePair) (PairScraper, erro

// fetchAdmissiblePools fetches all pools from postgres with native liquidity > liquidityThreshold and
// (if available) liquidity in USD > liquidityThresholdUSD.
func (s *BalancerV2Scraper) fetchAdmissiblePools(liquidityThreshold float64) {
func (s *BalancerV2Scraper) fetchAdmissiblePools(liquidityThreshold float64, liquidityThresholdUSD float64) {
poolsPreselection, err := s.relDB.GetAllPoolsExchange(s.exchangeName, liquidityThreshold)
if err != nil {
log.Error("fetch all admissible pools: ", err)
}
log.Infof("Found %v pools after preselection.", len(poolsPreselection))

for _, pool := range poolsPreselection {
s.admissiblePools[common.HexToAddress(pool.Address)] = struct{}{}
liquidity, lowerBound := pool.GetPoolLiquidityUSD()
// Discard pool if complete USD liquidity is below threshold.
if !lowerBound && liquidity < liquidityThresholdUSD {
continue
} else {
s.admissiblePools[common.HexToAddress(pool.Address)] = struct{}{}
}
}
log.Infof("Found %v pools after perselection.", len(poolsPreselection))
log.Infof("Found %v pools after USD liquidity filtering.", len(s.admissiblePools))
}

func (s *BalancerV2Scraper) FetchAvailablePairs() (pairs []dia.ExchangePair, err error) {
Expand Down
48 changes: 33 additions & 15 deletions pkg/dia/scraper/exchange-scrapers/CurvefiScraper.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ type CurveFIScraper struct {
}

// makeCurvefiScraper returns a curve finance scraper as used in NewCurvefiScraper.
func makeCurvefiScraper(exchange dia.Exchange, restDial string, wsDial string) *CurveFIScraper {
func makeCurvefiScraper(exchange dia.Exchange, restDial string, wsDial string, relDB *models.RelDB) *CurveFIScraper {
var (
restClient, wsClient *ethclient.Client
err error
Expand All @@ -143,6 +143,7 @@ func makeCurvefiScraper(exchange dia.Exchange, restDial string, wsDial string) *
exchangeName: exchange.Name,
RestClient: restClient,
WsClient: wsClient,
relDB: relDB,
initDone: make(chan nothing),
shutdown: make(chan nothing),
shutdownDone: make(chan nothing),
Expand All @@ -156,52 +157,56 @@ func makeCurvefiScraper(exchange dia.Exchange, restDial string, wsDial string) *
},
}

scraper.relDB, err = models.NewPostgresDataStore()
if err != nil {
log.Fatal("new postgres datastore: ", err)
}
// Only include pools with (minimum) liquidity bigger than given env var.
liquidityThreshold, err := strconv.ParseFloat(utils.Getenv("LIQUIDITY_THRESHOLD", "0"), 64)
if err != nil {
liquidityThreshold = float64(0)
log.Warnf("parse liquidity threshold: %v. Set to default %v", err, liquidityThreshold)
}

scraper.loadPools(liquidityThreshold)
// Only include pools with (minimum) liquidity USD value bigger than given env var.
liquidityThresholdUSD, err := strconv.ParseFloat(utils.Getenv("LIQUIDITY_THRESHOLD_USD", "0"), 64)
if err != nil {
liquidityThresholdUSD = float64(0)
log.Warnf("parse liquidity threshold: %v. Set to default %v", err, liquidityThresholdUSD)
}

scraper.loadPools(liquidityThreshold, liquidityThresholdUSD)

return scraper
}

func NewCurveFIScraper(exchange dia.Exchange, scrape bool) *CurveFIScraper {
func NewCurveFIScraper(exchange dia.Exchange, scrape bool, relDB *models.RelDB) *CurveFIScraper {

var scraper *CurveFIScraper

switch exchange.Name {
case dia.CurveFIExchange:
scraper = makeCurvefiScraper(exchange, curveRestDialEth, curveWsDialEth)
scraper = makeCurvefiScraper(exchange, curveRestDialEth, curveWsDialEth, relDB)
basePoolRegistry := curveRegistry{Type: 1, Address: common.HexToAddress("0x90E00ACe148ca3b23Ac1bC8C240C2a7Dd9c2d7f5")}
metaPoolRegistry := curveRegistry{Type: 2, Address: common.HexToAddress("0xB9fC157394Af804a3578134A6585C0dc9cc990d4")}
scraper.registriesUnderlying = []curveRegistry{metaPoolRegistry, basePoolRegistry}
scraper.screenPools = true

case dia.CurveFIExchangeFantom:
scraper = makeCurvefiScraper(exchange, curveRestDialFantom, curveWsDialFantom)
scraper = makeCurvefiScraper(exchange, curveRestDialFantom, curveWsDialFantom, relDB)
stableSwapFactory := curveRegistry{Type: 2, Address: common.HexToAddress("0x686d67265703d1f124c45e33d47d794c566889ba")}
scraper.registriesUnderlying = []curveRegistry{stableSwapFactory}
scraper.screenPools = false

case dia.CurveFIExchangeMoonbeam:
scraper = makeCurvefiScraper(exchange, curveRestDialMoonbeam, curveWsDialMoonbeam)
scraper = makeCurvefiScraper(exchange, curveRestDialMoonbeam, curveWsDialMoonbeam, relDB)
stableSwapFactory := curveRegistry{Type: 2, Address: common.HexToAddress("0x4244eB811D6e0Ef302326675207A95113dB4E1F8")}
scraper.registriesUnderlying = []curveRegistry{stableSwapFactory}
scraper.screenPools = false

case dia.CurveFIExchangePolygon:
scraper = makeCurvefiScraper(exchange, curveRestDialPolygon, curveWsDialPolygon)
scraper = makeCurvefiScraper(exchange, curveRestDialPolygon, curveWsDialPolygon, relDB)
stableSwapFactory := curveRegistry{Type: 2, Address: common.HexToAddress("0x722272D36ef0Da72FF51c5A65Db7b870E2e8D4ee")}
scraper.registriesUnderlying = []curveRegistry{stableSwapFactory}
scraper.screenPools = false
case dia.CurveFIExchangeArbitrum:
scraper = makeCurvefiScraper(exchange, curveRestDialArbitrum, curveWsDialArbitrum)
scraper = makeCurvefiScraper(exchange, curveRestDialArbitrum, curveWsDialArbitrum, relDB)
stableSwapFactory := curveRegistry{Type: 2, Address: common.HexToAddress("0xb17b674D9c5CB2e441F8e196a2f048A81355d031")}
scraper.registriesUnderlying = []curveRegistry{stableSwapFactory}
}
Expand Down Expand Up @@ -317,7 +322,7 @@ func (scraper *CurveFIScraper) watchSwaps(pool string) error {
log.Info("got swap V2")
scraper.processSwap(pool, swpV2)
case swpUnderlying := <-sinkUnderlying:
log.Warn("got underlying USDR swap: ", swpUnderlying)
log.Warn("got underlying swap: ", swpUnderlying)
// Only fetch trades from USDR pool until we have parsing TokenExchangeUnderlying resolved.
// if pool == common.HexToAddress("0xa138341185a9d0429b0021a11fb717b225e13e1f").Hex() && Exchanges[scraper.exchangeName].BlockChain.Name == dia.POLYGON {
// scraper.processSwap(pool, swpUnderlying)
Expand Down Expand Up @@ -515,15 +520,26 @@ func (scraper *CurveFIScraper) getSwapDataCurve(pool string, swp interface{}) (
return
}

func (scraper *CurveFIScraper) loadPools(liquidityThreshold float64) {
func (scraper *CurveFIScraper) loadPools(liquidityThreshold float64, liquidityThresholdUSD float64) {

pools, err := scraper.relDB.GetAllPoolsExchange(scraper.exchangeName, liquidityThreshold)
if err != nil {
log.Fatal("fetch pools: ", err)
}
log.Infof("found %v pools to subscribe: ", len(pools))

lowerBoundCount := 0
for _, pool := range pools {

liquidity, lowerBound := pool.GetPoolLiquidityUSD()
// Discard pool if complete USD liquidity is below threshold.
if !lowerBound && liquidity < liquidityThresholdUSD {
continue
}
if lowerBound {
log.Info("lowerBound pool: ", pool.Address)
lowerBoundCount++
}

var poolCoinsMap = make(map[int]*CurveCoin)
for _, av := range pool.Assetvolumes {
poolCoinsMap[int(av.Index)] = &CurveCoin{
Expand All @@ -539,6 +555,8 @@ func (scraper *CurveFIScraper) loadPools(liquidityThreshold float64) {
}
scraper.pools.setPool(pool.Address, poolCoinsMap)
}
log.Infof("Total number of %v pools to subscribe.", len(scraper.pools.pools))
log.Infof("Number of lower bound pools: %v", lowerBoundCount)
}

func (scraper *CurveFIScraper) FetchAvailablePairs() (pairs []dia.ExchangePair, err error) {
Expand Down
Loading

0 comments on commit 9f13bd6

Please sign in to comment.