From 40e3da3594785d30a2facf1765d32fffc1a8340e Mon Sep 17 00:00:00 2001 From: jppade Date: Tue, 4 Jul 2023 11:08:56 +0200 Subject: [PATCH 1/2] first commit in usd liquidity filters test branch. --- pkg/dia/Messages.go | 12 + .../scraper/exchange-scrapers/APIScraper.go | 78 ++--- .../exchange-scrapers/BalancerV2Scraper.go | 269 ++++++++---------- .../exchange-scrapers/CurvefiScraper.go | 46 ++- .../exchange-scrapers/UniswapV2Scraper.go | 30 +- .../exchange-scrapers/UniswapV3Scraper.go | 28 +- 6 files changed, 237 insertions(+), 226 deletions(-) diff --git a/pkg/dia/Messages.go b/pkg/dia/Messages.go index 4c5576154..4a046ca94 100644 --- a/pkg/dia/Messages.go +++ b/pkg/dia/Messages.go @@ -380,6 +380,18 @@ func (p *Pool) SufficientNativeBalance(threshold float64) bool { return sufficientNativeBalance } +// GetPoolLiquidityUSD returns the total USD liquidity if available. +// @lowerBound is true in case USD liquidity is not available for all pool assets. +func (p *Pool) GetPoolLiquidityUSD() (totalLiquidity float64, lowerBound bool) { + for _, av := range p.Assetvolumes { + if av.VolumeUSD == 0 { + lowerBound = true + } + totalLiquidity += av.VolumeUSD + } + return +} + // MarshalBinary is a custom marshaller for BlockChain type func (bc *BlockChain) MarshalBinary() ([]byte, error) { return json.Marshal(bc) diff --git a/pkg/dia/scraper/exchange-scrapers/APIScraper.go b/pkg/dia/scraper/exchange-scrapers/APIScraper.go index 3312f767d..b9446c689 100644 --- a/pkg/dia/scraper/exchange-scrapers/APIScraper.go +++ b/pkg/dia/scraper/exchange-scrapers/APIScraper.go @@ -142,37 +142,37 @@ func NewAPIScraper(exchange string, scrape bool, key string, secret string, relD case dia.BancorExchange: return NewBancorScraper(Exchanges[dia.BancorExchange], scrape) case dia.UniswapExchange: - return NewUniswapScraper(Exchanges[dia.UniswapExchange], scrape) + return NewUniswapScraper(Exchanges[dia.UniswapExchange], scrape, relDB) case dia.PanCakeSwap: - return NewUniswapScraper(Exchanges[dia.PanCakeSwap], scrape) + return NewUniswapScraper(Exchanges[dia.PanCakeSwap], scrape, relDB) case dia.SushiSwapExchange: - return NewUniswapScraper(Exchanges[dia.SushiSwapExchange], scrape) + return NewUniswapScraper(Exchanges[dia.SushiSwapExchange], scrape, relDB) case dia.SushiSwapExchangePolygon: - return NewUniswapScraper(Exchanges[dia.SushiSwapExchangePolygon], scrape) + return NewUniswapScraper(Exchanges[dia.SushiSwapExchangePolygon], scrape, relDB) case dia.SushiSwapExchangeArbitrum: - return NewUniswapScraper(Exchanges[dia.SushiSwapExchangeArbitrum], scrape) + return NewUniswapScraper(Exchanges[dia.SushiSwapExchangeArbitrum], scrape, relDB) case dia.SushiSwapExchangeFantom: - return NewUniswapScraper(Exchanges[dia.SushiSwapExchangeFantom], scrape) + return NewUniswapScraper(Exchanges[dia.SushiSwapExchangeFantom], scrape, relDB) case dia.CamelotExchange: - return NewUniswapScraper(Exchanges[dia.CamelotExchange], scrape) + return NewUniswapScraper(Exchanges[dia.CamelotExchange], scrape, relDB) case dia.CurveFIExchange: - return NewCurveFIScraper(Exchanges[dia.CurveFIExchange], scrape) + return NewCurveFIScraper(Exchanges[dia.CurveFIExchange], scrape, relDB) case dia.CurveFIExchangeFantom: - return NewCurveFIScraper(Exchanges[dia.CurveFIExchangeFantom], scrape) + return NewCurveFIScraper(Exchanges[dia.CurveFIExchangeFantom], scrape, relDB) case dia.CurveFIExchangeMoonbeam: - return NewCurveFIScraper(Exchanges[dia.CurveFIExchangeMoonbeam], scrape) + return NewCurveFIScraper(Exchanges[dia.CurveFIExchangeMoonbeam], scrape, relDB) case dia.CurveFIExchangePolygon: - return NewCurveFIScraper(Exchanges[dia.CurveFIExchangePolygon], scrape) + return NewCurveFIScraper(Exchanges[dia.CurveFIExchangePolygon], scrape, relDB) case dia.CurveFIExchangeArbitrum: - return NewCurveFIScraper(Exchanges[dia.CurveFIExchangeArbitrum], scrape) + return NewCurveFIScraper(Exchanges[dia.CurveFIExchangeArbitrum], scrape, relDB) case dia.BalancerV2Exchange: - return NewBalancerV2Scraper(Exchanges[dia.BalancerV2Exchange], scrape) + return NewBalancerV2Scraper(Exchanges[dia.BalancerV2Exchange], scrape, relDB) case dia.BalancerV2ExchangeArbitrum: - return NewBalancerV2Scraper(Exchanges[dia.BalancerV2ExchangeArbitrum], scrape) + return NewBalancerV2Scraper(Exchanges[dia.BalancerV2ExchangeArbitrum], scrape, relDB) case dia.BalancerV2ExchangePolygon: - return NewBalancerV2Scraper(Exchanges[dia.BalancerV2ExchangePolygon], scrape) + return NewBalancerV2Scraper(Exchanges[dia.BalancerV2ExchangePolygon], scrape, relDB) case dia.BeetsExchange: - return NewBalancerV2Scraper(Exchanges[dia.BeetsExchange], scrape) + return NewBalancerV2Scraper(Exchanges[dia.BeetsExchange], scrape, relDB) case dia.KuCoinExchange: return NewKuCoinScraper(key, secret, Exchanges[dia.KuCoinExchange], scrape, relDB) case dia.BitMartExchange: @@ -186,33 +186,33 @@ func NewAPIScraper(exchange string, scrape bool, key string, secret string, relD case dia.BKEX2Exchange: return NewBKEXScraper(Exchanges[dia.BKEXExchange], dia.BKEX2Exchange, scrape, relDB) case dia.UniswapExchangeV3: - return NewUniswapV3Scraper(Exchanges[dia.UniswapExchangeV3], scrape) + return NewUniswapV3Scraper(Exchanges[dia.UniswapExchangeV3], scrape, relDB) case dia.DfynNetwork: - return NewUniswapScraper(Exchanges[dia.DfynNetwork], scrape) + return NewUniswapScraper(Exchanges[dia.DfynNetwork], scrape, relDB) case dia.UbeswapExchange: - return NewUniswapScraper(Exchanges[dia.UbeswapExchange], scrape) + return NewUniswapScraper(Exchanges[dia.UbeswapExchange], scrape, relDB) case dia.UniswapExchangeV3Polygon: - return NewUniswapV3Scraper(Exchanges[dia.UniswapExchangeV3Polygon], scrape) + return NewUniswapV3Scraper(Exchanges[dia.UniswapExchangeV3Polygon], scrape, relDB) case dia.UniswapExchangeV3Arbitrum: - return NewUniswapV3Scraper(Exchanges[dia.UniswapExchangeV3Arbitrum], scrape) + return NewUniswapV3Scraper(Exchanges[dia.UniswapExchangeV3Arbitrum], scrape, relDB) case dia.HuckleberryExchange: - return NewUniswapScraper(Exchanges[dia.HuckleberryExchange], scrape) + return NewUniswapScraper(Exchanges[dia.HuckleberryExchange], scrape, relDB) case dia.TraderJoeExchange: - return NewUniswapScraper(Exchanges[dia.TraderJoeExchange], scrape) + return NewUniswapScraper(Exchanges[dia.TraderJoeExchange], scrape, relDB) case dia.PangolinExchange: - return NewUniswapScraper(Exchanges[dia.PangolinExchange], scrape) + return NewUniswapScraper(Exchanges[dia.PangolinExchange], scrape, relDB) case dia.PlatypusExchange: return NewPlatypusScraper(Exchanges[dia.PlatypusExchange], scrape) case dia.SpookyswapExchange: - return NewUniswapScraper(Exchanges[dia.SpookyswapExchange], scrape) + return NewUniswapScraper(Exchanges[dia.SpookyswapExchange], scrape, relDB) case dia.QuickswapExchange: - return NewUniswapScraper(Exchanges[dia.QuickswapExchange], scrape) + return NewUniswapScraper(Exchanges[dia.QuickswapExchange], scrape, relDB) case dia.SpiritswapExchange: - return NewUniswapScraper(Exchanges[dia.SpiritswapExchange], scrape) + return NewUniswapScraper(Exchanges[dia.SpiritswapExchange], scrape, relDB) case dia.SolarbeamExchange: - return NewUniswapScraper(Exchanges[dia.SolarbeamExchange], scrape) + return NewUniswapScraper(Exchanges[dia.SolarbeamExchange], scrape, relDB) case dia.TrisolarisExchange: - return NewUniswapScraper(Exchanges[dia.TrisolarisExchange], scrape) + return NewUniswapScraper(Exchanges[dia.TrisolarisExchange], scrape, relDB) case dia.ByBitExchange: return NewByBitScraper(Exchanges[dia.ByBitExchange], scrape, relDB) case dia.OrcaExchange: @@ -220,27 +220,27 @@ func NewAPIScraper(exchange string, scrape bool, key string, secret string, relD case dia.AnyswapExchange: return NewAnyswapScraper(Exchanges[dia.AnyswapExchange], scrape, relDB) case dia.NetswapExchange: - return NewUniswapScraper(Exchanges[dia.NetswapExchange], scrape) + return NewUniswapScraper(Exchanges[dia.NetswapExchange], scrape, relDB) case dia.BitMexExchange: return NewBitMexScraper(Exchanges[dia.BitMexExchange], scrape, relDB) case dia.TethysExchange: - return NewUniswapScraper(Exchanges[dia.TethysExchange], scrape) + return NewUniswapScraper(Exchanges[dia.TethysExchange], scrape, relDB) case dia.HermesExchange: - return NewUniswapScraper(Exchanges[dia.HermesExchange], scrape) + return NewUniswapScraper(Exchanges[dia.HermesExchange], scrape, relDB) case dia.OmniDexExchange: - return NewUniswapScraper(Exchanges[dia.OmniDexExchange], scrape) + return NewUniswapScraper(Exchanges[dia.OmniDexExchange], scrape, relDB) case dia.DiffusionExchange: - return NewUniswapScraper(Exchanges[dia.DiffusionExchange], scrape) + return NewUniswapScraper(Exchanges[dia.DiffusionExchange], scrape, relDB) case dia.ApeswapExchange: - return NewUniswapScraper(Exchanges[dia.ApeswapExchange], scrape) + return NewUniswapScraper(Exchanges[dia.ApeswapExchange], scrape, relDB) case dia.BiswapExchange: - return NewUniswapScraper(Exchanges[dia.BiswapExchange], scrape) + return NewUniswapScraper(Exchanges[dia.BiswapExchange], scrape, relDB) case dia.ArthswapExchange: - return NewUniswapScraper(Exchanges[dia.ArthswapExchange], scrape) + return NewUniswapScraper(Exchanges[dia.ArthswapExchange], scrape, relDB) case dia.StellaswapExchange: - return NewUniswapScraper(Exchanges[dia.StellaswapExchange], scrape) + return NewUniswapScraper(Exchanges[dia.StellaswapExchange], scrape, relDB) case dia.WanswapExchange: - return NewUniswapScraper(Exchanges[dia.WanswapExchange], scrape) + return NewUniswapScraper(Exchanges[dia.WanswapExchange], scrape, relDB) case dia.OsmosisExchange: return NewOsmosisScraper(Exchanges[dia.OsmosisExchange], scrape, relDB) case dia.ZenlinkswapExchange: diff --git a/pkg/dia/scraper/exchange-scrapers/BalancerV2Scraper.go b/pkg/dia/scraper/exchange-scrapers/BalancerV2Scraper.go index d297b5c6a..4d9ae7fb8 100644 --- a/pkg/dia/scraper/exchange-scrapers/BalancerV2Scraper.go +++ b/pkg/dia/scraper/exchange-scrapers/BalancerV2Scraper.go @@ -5,12 +5,10 @@ import ( "fmt" "math" "math/big" - "sort" + "strconv" "sync" "time" - "golang.org/x/sync/errgroup" - "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/ethclient" @@ -22,6 +20,7 @@ import ( "github.com/diadata-org/diadata/pkg/dia" "github.com/diadata-org/diadata/pkg/dia/helpers/ethhelper" + models "github.com/diadata-org/diadata/pkg/model" ) const ( @@ -50,9 +49,10 @@ type BalancerV2Swap struct { // BalancerV2Scraper is a scraper for Balancer V2 type BalancerV2Scraper struct { - rest *ethclient.Client - ws *ethclient.Client - rl ratelimit.Limiter + rest *ethclient.Client + ws *ethclient.Client + rl ratelimit.Limiter + relDB *models.RelDB // signaling channels for session initialization and finishing shutdown chan nothing @@ -72,21 +72,27 @@ type BalancerV2Scraper struct { exchangeName string chanTrades chan *dia.Trade - tokensMap map[string]dia.Asset - cachedAssets sync.Map // map[string]dia.Asset + tokensMap map[string]dia.Asset + // @poolsMap maps Balancer internal pool id onto its EVM address. + poolsMap map[[32]byte]common.Address + // @admissiblePools contains all pools which are admissible w.r.t. liquidity conditions. + admissiblePools map[common.Address]struct{} + cachedAssets sync.Map // map[string]dia.Asset } // NewBalancerV2Scraper returns a Balancer V2 scraper -func NewBalancerV2Scraper(exchange dia.Exchange, scrape bool) *BalancerV2Scraper { +func NewBalancerV2Scraper(exchange dia.Exchange, scrape bool, relDB *models.RelDB) *BalancerV2Scraper { balancerV2VaultContract = exchange.Contract scraper := &BalancerV2Scraper{ - exchangeName: exchange.Name, - err: nil, - shutdown: make(chan nothing), - shutdownDone: make(chan nothing), - pairScrapers: make(map[string]*BalancerV2PairScraper), - chanTrades: make(chan *dia.Trade), - tokensMap: make(map[string]dia.Asset), + exchangeName: exchange.Name, + err: nil, + shutdown: make(chan nothing), + shutdownDone: make(chan nothing), + pairScrapers: make(map[string]*BalancerV2PairScraper), + chanTrades: make(chan *dia.Trade), + tokensMap: make(map[string]dia.Asset), + poolsMap: make(map[[32]byte]common.Address), + admissiblePools: make(map[common.Address]struct{}), } switch exchange.Name { @@ -114,10 +120,33 @@ func NewBalancerV2Scraper(exchange dia.Exchange, scrape bool) *BalancerV2Scraper return nil } + // Only include pools with (minimum) liquidity bigger than given env var. + liquidityThreshold, err := strconv.ParseFloat(utils.Getenv("LIQUIDITY_THRESHOLD", "0"), 64) + if err != nil { + liquidityThreshold = float64(0) + log.Warnf("parse liquidity threshold: %v. Set to default %v", err, liquidityThreshold) + } + // Only include pools with (minimum) liquidity USD value bigger than given env var. + liquidityThresholdUSD, err := strconv.ParseFloat(utils.Getenv("LIQUIDITY_THRESHOLD_USD", "0"), 64) + if err != nil { + liquidityThresholdUSD = float64(0) + log.Warnf("parse liquidity threshold: %v. Set to default %v", err, liquidityThresholdUSD) + } + + scraper.relDB = relDB scraper.ws = ws scraper.rest = rest scraper.rl = ratelimit.New(balancerV2RateLimitPerSec) + err = scraper.loadPoolMap() + if err != nil { + log.Fatal("Load pool map: ", err) + } + log.Info("... pool map loaded.") + log.Info("Fetch admissible pools...") + scraper.fetchAdmissiblePools(liquidityThreshold, liquidityThresholdUSD) + log.Info("...admissible pools loaded.") + if scrape { go scraper.mainLoop() } @@ -148,6 +177,11 @@ func (s *BalancerV2Scraper) mainLoop() { log.Fatalf("%s: Cannot create vault filter, err=%s", s.exchangeName, err.Error()) } + balancerVaultCaller, err := balancervault.NewBalancerVaultCaller(common.HexToAddress(balancerV2VaultContract), s.rest) + if err != nil { + log.Error("balancer vault caller: ", err) + } + currBlock, err := s.rest.BlockNumber(context.Background()) if err != nil { s.setError(err) @@ -171,6 +205,20 @@ func (s *BalancerV2Scraper) mainLoop() { s.setError(err) log.Errorf("BalancerV2Scraper: Subscription error, err=%s", err.Error()) case event := <-sink: + + // Fetch pool address in order to check admissibility of trade. + poolAddress, ok := s.poolsMap[event.PoolId] + if !ok { + poolAddress, _, err = balancerVaultCaller.GetPool(&bind.CallOpts{}, event.PoolId) + if err != nil { + log.Error("get pool: ", err) + } + } + if _, ok = s.admissiblePools[poolAddress]; !ok { + log.Warnf("pool %s not admissible, skip trade.", poolAddress) + continue + } + assetIn, ok := s.tokensMap[event.TokenIn.Hex()] if !ok { asset, err := s.assetFromToken(event.TokenIn) @@ -255,6 +303,52 @@ func (s *BalancerV2Scraper) mainLoop() { } } +// fetchAdmissiblePools fetches all pools from postgres with native liquidity > liquidityThreshold and +// (if available) liquidity in USD > liquidityThresholdUSD. +func (s *BalancerV2Scraper) fetchAdmissiblePools(liquidityThreshold float64, liquidityThresholdUSD float64) { + poolsPreselection, err := s.relDB.GetAllPoolsExchange(s.exchangeName, liquidityThreshold) + if err != nil { + log.Error("fetch all admissible pools: ", err) + } + log.Infof("Found %v pools after preselection.", len(poolsPreselection)) + + for _, pool := range poolsPreselection { + liquidity, lowerBound := pool.GetPoolLiquidityUSD() + // Discard pool if complete USD liquidity is below threshold. + if !lowerBound && liquidity < liquidityThresholdUSD { + continue + } else { + s.admissiblePools[common.HexToAddress(pool.Address)] = struct{}{} + } + } + log.Infof("Found %v pools after USD liquidity filtering.", len(s.admissiblePools)) +} + +func (s *BalancerV2Scraper) loadPoolMap() error { + t0 := time.Now() + events, err := s.allRegisteredPools() + if err != nil { + return err + } + t1 := time.Now() + log.Info("time spent for loading all pools: ", t1.Sub(t0)) + + caller, err := balancervault.NewBalancerVaultCaller(common.HexToAddress(balancerV2VaultContract), s.rest) + if err != nil { + return err + } + + for _, evt := range events { + pool, _, err := caller.GetPool(&bind.CallOpts{}, evt.PoolId) + if err != nil { + log.Error("get pool tokens: ", err) + return err + } + s.poolsMap[evt.PoolId] = pool + } + return nil +} + // Close unsubscribes data and closes any existing WebSocket connections, as well as channels of BalancerV2Scraper func (s *BalancerV2Scraper) Close() error { if s.isClosed() { @@ -295,31 +389,7 @@ func (s *BalancerV2Scraper) ScrapePair(pair dia.ExchangePair) (PairScraper, erro } func (s *BalancerV2Scraper) FetchAvailablePairs() (pairs []dia.ExchangePair, err error) { - pools, err := s.listPools() - if err != nil { - log.Warn("list pools: ", err) - // return nil, err - } - - log.Infof("%s: Total pools are %v", s.exchangeName, len(pools)) - - pp, err := s.listPairs(pools) - if err != nil { - return nil, err - } - - existingPair := make(map[string]struct{}) - for _, p := range pp { - quoteAddr := p.UnderlyingPair.QuoteToken.Address - baseAddr := p.UnderlyingPair.BaseToken.Address - if _, ok := existingPair[baseAddr+":"+quoteAddr]; !ok { - pairs = append(pairs, p) - existingPair[baseAddr+":"+quoteAddr] = struct{}{} - } - } - - log.Infof("%s: Total pairs are %v", s.exchangeName, len(pairs)) - + // Not available for DEXes. return } @@ -341,123 +411,6 @@ func (s *BalancerV2Scraper) assetFromToken(token common.Address) (dia.Asset, err return asset, nil } -func (s *BalancerV2Scraper) makePair(token0, token1 common.Address) (dia.ExchangePair, error) { - asset0, err := s.assetFromToken(token0) - if err != nil { - return dia.ExchangePair{}, err - } - asset1, err := s.assetFromToken(token1) - if err != nil { - return dia.ExchangePair{}, err - } - - var pair dia.ExchangePair - pair.UnderlyingPair.QuoteToken = asset0 - pair.UnderlyingPair.BaseToken = asset1 - pair.ForeignName = asset0.Symbol + "-" + asset1.Symbol - pair.Verified = true - pair.Exchange = s.exchangeName - pair.Symbol = asset0.Symbol - - return pair, nil -} - -func (s *BalancerV2Scraper) listPairs(pools [][]common.Address) (pairs []dia.ExchangePair, err error) { - pairCount := 0 - pairMap := make(map[int]dia.ExchangePair) - var g errgroup.Group - var mu sync.Mutex - for _, tokens := range pools { - if len(tokens) < 2 { - continue - } - - for i := 0; i < len(tokens); i++ { - for j := i + 1; j < len(tokens); j++ { - pairCount++ - i := i - j := j - pairCount := pairCount - tokens := tokens - g.Go(func() error { - s.rl.Take() - pair, err := s.makePair(tokens[i], tokens[j]) - if err != nil { - log.Warn(err) - - return nil - } - - mu.Lock() - defer mu.Unlock() - - pairMap[pairCount] = pair - - return nil - }) - } - } - } - - if err := g.Wait(); err != nil { - return nil, err - } - - keys := make([]int, 0, len(pairMap)) - for k := range pairMap { - keys = append(keys, k) - } - - sort.Ints(keys) - - for _, k := range keys { - pairs = append(pairs, pairMap[k]) - } - - return -} - -func (s *BalancerV2Scraper) listPools() ([][]common.Address, error) { - events, err := s.allRegisteredPools() - if err != nil { - return nil, err - } - - caller, err := balancervault.NewBalancerVaultCaller(common.HexToAddress(balancerV2VaultContract), s.rest) - if err != nil { - return nil, err - } - - var g errgroup.Group - var mu sync.Mutex - pools := make([][]common.Address, len(events)) - for idx, evt := range events { - idx := idx - evt := evt - g.Go(func() error { - s.rl.Take() - pool, err := caller.GetPoolTokens(&bind.CallOpts{}, evt.PoolId) - if err != nil { - log.Warn("get pool tokens: ", err) - return err - } - - mu.Lock() - defer mu.Unlock() - - pools[idx] = pool.Tokens - - return nil - }) - } - - if err := g.Wait(); err != nil { - return nil, err - } - - return pools, nil -} - func (s *BalancerV2Scraper) allRegisteredPools() ([]*balancervault.BalancerVaultPoolRegistered, error) { filterer, err := balancervault.NewBalancerVaultFilterer(common.HexToAddress(balancerV2VaultContract), s.rest) if err != nil { diff --git a/pkg/dia/scraper/exchange-scrapers/CurvefiScraper.go b/pkg/dia/scraper/exchange-scrapers/CurvefiScraper.go index 2485ac5de..d2b5ebe94 100644 --- a/pkg/dia/scraper/exchange-scrapers/CurvefiScraper.go +++ b/pkg/dia/scraper/exchange-scrapers/CurvefiScraper.go @@ -122,7 +122,7 @@ type CurveFIScraper struct { } // makeCurvefiScraper returns a curve finance scraper as used in NewCurvefiScraper. -func makeCurvefiScraper(exchange dia.Exchange, restDial string, wsDial string) *CurveFIScraper { +func makeCurvefiScraper(exchange dia.Exchange, restDial string, wsDial string, relDB *models.RelDB) *CurveFIScraper { var ( restClient, wsClient *ethclient.Client err error @@ -143,6 +143,7 @@ func makeCurvefiScraper(exchange dia.Exchange, restDial string, wsDial string) * exchangeName: exchange.Name, RestClient: restClient, WsClient: wsClient, + relDB: relDB, initDone: make(chan nothing), shutdown: make(chan nothing), shutdownDone: make(chan nothing), @@ -156,10 +157,6 @@ func makeCurvefiScraper(exchange dia.Exchange, restDial string, wsDial string) * }, } - scraper.relDB, err = models.NewPostgresDataStore() - if err != nil { - log.Fatal("new postgres datastore: ", err) - } // Only include pools with (minimum) liquidity bigger than given env var. liquidityThreshold, err := strconv.ParseFloat(utils.Getenv("LIQUIDITY_THRESHOLD", "0"), 64) if err != nil { @@ -167,41 +164,49 @@ func makeCurvefiScraper(exchange dia.Exchange, restDial string, wsDial string) * log.Warnf("parse liquidity threshold: %v. Set to default %v", err, liquidityThreshold) } - scraper.loadPools(liquidityThreshold) + // Only include pools with (minimum) liquidity USD value bigger than given env var. + liquidityThresholdUSD, err := strconv.ParseFloat(utils.Getenv("LIQUIDITY_THRESHOLD_USD", "0"), 64) + if err != nil { + liquidityThresholdUSD = float64(0) + log.Warnf("parse liquidity threshold: %v. Set to default %v", err, liquidityThresholdUSD) + } + + scraper.loadPools(liquidityThreshold, liquidityThresholdUSD) + return scraper } -func NewCurveFIScraper(exchange dia.Exchange, scrape bool) *CurveFIScraper { +func NewCurveFIScraper(exchange dia.Exchange, scrape bool, relDB *models.RelDB) *CurveFIScraper { var scraper *CurveFIScraper switch exchange.Name { case dia.CurveFIExchange: - scraper = makeCurvefiScraper(exchange, curveRestDialEth, curveWsDialEth) + scraper = makeCurvefiScraper(exchange, curveRestDialEth, curveWsDialEth, relDB) basePoolRegistry := curveRegistry{Type: 1, Address: common.HexToAddress("0x90E00ACe148ca3b23Ac1bC8C240C2a7Dd9c2d7f5")} metaPoolRegistry := curveRegistry{Type: 2, Address: common.HexToAddress("0xB9fC157394Af804a3578134A6585C0dc9cc990d4")} scraper.registriesUnderlying = []curveRegistry{metaPoolRegistry, basePoolRegistry} scraper.screenPools = true case dia.CurveFIExchangeFantom: - scraper = makeCurvefiScraper(exchange, curveRestDialFantom, curveWsDialFantom) + scraper = makeCurvefiScraper(exchange, curveRestDialFantom, curveWsDialFantom, relDB) stableSwapFactory := curveRegistry{Type: 2, Address: common.HexToAddress("0x686d67265703d1f124c45e33d47d794c566889ba")} scraper.registriesUnderlying = []curveRegistry{stableSwapFactory} scraper.screenPools = false case dia.CurveFIExchangeMoonbeam: - scraper = makeCurvefiScraper(exchange, curveRestDialMoonbeam, curveWsDialMoonbeam) + scraper = makeCurvefiScraper(exchange, curveRestDialMoonbeam, curveWsDialMoonbeam, relDB) stableSwapFactory := curveRegistry{Type: 2, Address: common.HexToAddress("0x4244eB811D6e0Ef302326675207A95113dB4E1F8")} scraper.registriesUnderlying = []curveRegistry{stableSwapFactory} scraper.screenPools = false case dia.CurveFIExchangePolygon: - scraper = makeCurvefiScraper(exchange, curveRestDialPolygon, curveWsDialPolygon) + scraper = makeCurvefiScraper(exchange, curveRestDialPolygon, curveWsDialPolygon, relDB) stableSwapFactory := curveRegistry{Type: 2, Address: common.HexToAddress("0x722272D36ef0Da72FF51c5A65Db7b870E2e8D4ee")} scraper.registriesUnderlying = []curveRegistry{stableSwapFactory} scraper.screenPools = false case dia.CurveFIExchangeArbitrum: - scraper = makeCurvefiScraper(exchange, curveRestDialArbitrum, curveWsDialArbitrum) + scraper = makeCurvefiScraper(exchange, curveRestDialArbitrum, curveWsDialArbitrum, relDB) stableSwapFactory := curveRegistry{Type: 2, Address: common.HexToAddress("0xb17b674D9c5CB2e441F8e196a2f048A81355d031")} scraper.registriesUnderlying = []curveRegistry{stableSwapFactory} } @@ -514,15 +519,26 @@ func (scraper *CurveFIScraper) getSwapDataCurve(pool string, swp interface{}) ( return } -func (scraper *CurveFIScraper) loadPools(liquidityThreshold float64) { +func (scraper *CurveFIScraper) loadPools(liquidityThreshold float64, liquidityThresholdUSD float64) { pools, err := scraper.relDB.GetAllPoolsExchange(scraper.exchangeName, liquidityThreshold) if err != nil { log.Fatal("fetch pools: ", err) } log.Infof("found %v pools to subscribe: ", len(pools)) - + lowerBoundCount := 0 for _, pool := range pools { + + liquidity, lowerBound := pool.GetPoolLiquidityUSD() + // Discard pool if complete USD liquidity is below threshold. + if !lowerBound && liquidity < liquidityThresholdUSD { + continue + } + if lowerBound { + log.Info("lowerBound pool: ", pool.Address) + lowerBoundCount++ + } + var poolCoinsMap = make(map[int]*CurveCoin) for _, av := range pool.Assetvolumes { poolCoinsMap[int(av.Index)] = &CurveCoin{ @@ -538,6 +554,8 @@ func (scraper *CurveFIScraper) loadPools(liquidityThreshold float64) { } scraper.pools.setPool(pool.Address, poolCoinsMap) } + log.Infof("Total number of %v pools to subscribe.", len(scraper.pools.pools)) + log.Infof("Number of lower bound pools: %v", lowerBoundCount) } func (scraper *CurveFIScraper) FetchAvailablePairs() (pairs []dia.ExchangePair, err error) { diff --git a/pkg/dia/scraper/exchange-scrapers/UniswapV2Scraper.go b/pkg/dia/scraper/exchange-scrapers/UniswapV2Scraper.go index a6fcbec95..109c962ad 100644 --- a/pkg/dia/scraper/exchange-scrapers/UniswapV2Scraper.go +++ b/pkg/dia/scraper/exchange-scrapers/UniswapV2Scraper.go @@ -148,7 +148,7 @@ type UniswapScraper struct { } // NewUniswapScraper returns a new UniswapScraper for the given pair -func NewUniswapScraper(exchange dia.Exchange, scrape bool) *UniswapScraper { +func NewUniswapScraper(exchange dia.Exchange, scrape bool, relDB *models.RelDB) *UniswapScraper { log.Info("NewUniswapScraper: ", exchange.Name) var ( s *UniswapScraper @@ -225,10 +225,7 @@ func NewUniswapScraper(exchange dia.Exchange, scrape bool) *UniswapScraper { s = makeUniswapScraper(exchange, listenByAddress, fetchPoolsFromDB, restDialWanchain, wsDialWanchain, wanchainWaitMilliseconds) } - s.relDB, err = models.NewPostgresDataStore() - if err != nil { - log.Fatal("new postgres datastore: ", err) - } + s.relDB = relDB // Only include pools with (minimum) liquidity bigger than given env var. liquidityThreshold, err := strconv.ParseFloat(utils.Getenv("LIQUIDITY_THRESHOLD", "0"), 64) @@ -236,9 +233,15 @@ func NewUniswapScraper(exchange dia.Exchange, scrape bool) *UniswapScraper { liquidityThreshold = float64(0) log.Warnf("parse liquidity threshold: %v. Set to default %v", err, liquidityThreshold) } + // Only include pools with (minimum) liquidity USD value bigger than given env var. + liquidityThresholdUSD, err := strconv.ParseFloat(utils.Getenv("LIQUIDITY_THRESHOLD_USD", "0"), 64) + if err != nil { + liquidityThresholdUSD = float64(0) + log.Warnf("parse liquidity threshold: %v. Set to default %v", err, liquidityThresholdUSD) + } // Fetch all pool with given liquidity threshold from database. - poolMap, err = s.makeUniPoolMap(liquidityThreshold) + poolMap, err = s.makeUniPoolMap(liquidityThreshold, liquidityThresholdUSD) if err != nil { log.Fatal("build poolMap: ", err) } @@ -928,7 +931,7 @@ func (ps *UniswapPairScraper) Pair() dia.ExchangePair { // makeUniPoolMap returns a map with pool addresses as keys and the underlying UniswapPair as values. // If s.listenByAddress is true, it only loads the corresponding assets from the list. -func (s *UniswapScraper) makeUniPoolMap(liquiThreshold float64) (map[string]UniswapPair, error) { +func (s *UniswapScraper) makeUniPoolMap(liquiThreshold float64, liquidityThresholdUSD float64) (map[string]UniswapPair, error) { pm := make(map[string]UniswapPair) var ( pools []dia.Pool @@ -954,7 +957,6 @@ func (s *UniswapScraper) makeUniPoolMap(liquiThreshold float64) (map[string]Unis if err != nil { return pm, err } - } else { // Pool info will be fetched from on-chain and poolMap is not needed. return pm, nil @@ -962,11 +964,22 @@ func (s *UniswapScraper) makeUniPoolMap(liquiThreshold float64) (map[string]Unis log.Info("Found ", len(pools), " pools.") log.Info("make pool map...") + lowerBoundCount := 0 for _, pool := range pools { if len(pool.Assetvolumes) != 2 { log.Warn("not enough assets in pool with address: ", pool.Address) continue } + + liquidity, lowerBound := pool.GetPoolLiquidityUSD() + // Discard pool if complete USD liquidity is below threshold. + if !lowerBound && liquidity < liquidityThresholdUSD { + continue + } + if lowerBound { + lowerBoundCount++ + } + up := UniswapPair{ Address: common.HexToAddress(pool.Address), } @@ -982,5 +995,6 @@ func (s *UniswapScraper) makeUniPoolMap(liquiThreshold float64) (map[string]Unis } log.Infof("found %v subscribable pools.", len(pm)) + log.Infof("%v pools with lowerBound=true.", lowerBoundCount) return pm, err } diff --git a/pkg/dia/scraper/exchange-scrapers/UniswapV3Scraper.go b/pkg/dia/scraper/exchange-scrapers/UniswapV3Scraper.go index bda2b88e8..8d122785c 100644 --- a/pkg/dia/scraper/exchange-scrapers/UniswapV3Scraper.go +++ b/pkg/dia/scraper/exchange-scrapers/UniswapV3Scraper.go @@ -58,7 +58,7 @@ type UniswapV3Scraper struct { } // NewUniswapV3Scraper returns a new UniswapV3Scraper -func NewUniswapV3Scraper(exchange dia.Exchange, scrape bool) *UniswapV3Scraper { +func NewUniswapV3Scraper(exchange dia.Exchange, scrape bool, relDB *models.RelDB) *UniswapV3Scraper { log.Info("NewUniswapScraper ", exchange.Name) log.Info("NewUniswapScraper Address ", exchange.Contract) @@ -76,10 +76,7 @@ func NewUniswapV3Scraper(exchange dia.Exchange, scrape bool) *UniswapV3Scraper { s = makeUniswapV3Scraper(exchange, false, "", "", "200", uint64(165)) } - s.relDB, err = models.NewPostgresDataStore() - if err != nil { - log.Fatal("new postgres datastore: ", err) - } + s.relDB = relDB // Only include pools with (minimum) liquidity bigger than given env var. liquidityThreshold, err := strconv.ParseFloat(utils.Getenv("LIQUIDITY_THRESHOLD", "0"), 64) @@ -87,8 +84,14 @@ func NewUniswapV3Scraper(exchange dia.Exchange, scrape bool) *UniswapV3Scraper { liquidityThreshold = float64(0) log.Warnf("parse liquidity threshold: %v. Set to default %v", err, liquidityThreshold) } + // Only include pools with (minimum) liquidity USD value bigger than given env var. + liquidityThresholdUSD, err := strconv.ParseFloat(utils.Getenv("LIQUIDITY_THRESHOLD_USD", "0"), 64) + if err != nil { + liquidityThresholdUSD = float64(0) + log.Warnf("parse liquidity threshold: %v. Set to default %v", err, liquidityThresholdUSD) + } - poolMap, err = s.makeUniV3PoolMap(liquidityThreshold) + poolMap, err = s.makeUniV3PoolMap(liquidityThreshold, liquidityThresholdUSD) if err != nil { log.Fatal("build poolMap: ", err) } @@ -472,7 +475,7 @@ func (ps *UniswapPairV3Scraper) Pair() dia.ExchangePair { } // makeUniPoolMap returns a map with pool addresses as keys and the underlying UniswapPair as values. -func (s *UniswapV3Scraper) makeUniV3PoolMap(liquiThreshold float64) (map[string]UniswapPair, error) { +func (s *UniswapV3Scraper) makeUniV3PoolMap(liquiThreshold float64, liquidityThresholdUSD float64) (map[string]UniswapPair, error) { pm := make(map[string]UniswapPair) // Load all pools above liqui threshold. @@ -483,10 +486,20 @@ func (s *UniswapV3Scraper) makeUniV3PoolMap(liquiThreshold float64) (map[string] log.Info("Found ", len(pools), " pools.") log.Info("make pool map...") + lowerBoundCount := 0 for _, pool := range pools { if len(pool.Assetvolumes) != 2 { continue } + liquidity, lowerBound := pool.GetPoolLiquidityUSD() + // Discard pool if complete USD liquidity is below threshold. + if !lowerBound && liquidity < liquidityThresholdUSD { + continue + } + if lowerBound { + lowerBoundCount++ + } + up := UniswapPair{ Address: common.HexToAddress(pool.Address), } @@ -502,5 +515,6 @@ func (s *UniswapV3Scraper) makeUniV3PoolMap(liquiThreshold float64) (map[string] } log.Infof("found %v subscribable pools.", len(pm)) + log.Infof("%v pools with lowerBound=true.", lowerBoundCount) return pm, err } From e91733880bbe11d12989ef14ee59fbdf4de4b793 Mon Sep 17 00:00:00 2001 From: jppade Date: Wed, 5 Jul 2023 15:24:29 +0200 Subject: [PATCH 2/2] add liquidity filter for pools in platypus dex scraper. --- .../scraper/exchange-scrapers/APIScraper.go | 2 +- .../exchange-scrapers/PlatypusScraper.go | 50 +++++++++++++++++-- 2 files changed, 48 insertions(+), 4 deletions(-) diff --git a/pkg/dia/scraper/exchange-scrapers/APIScraper.go b/pkg/dia/scraper/exchange-scrapers/APIScraper.go index b9446c689..485124451 100644 --- a/pkg/dia/scraper/exchange-scrapers/APIScraper.go +++ b/pkg/dia/scraper/exchange-scrapers/APIScraper.go @@ -202,7 +202,7 @@ func NewAPIScraper(exchange string, scrape bool, key string, secret string, relD case dia.PangolinExchange: return NewUniswapScraper(Exchanges[dia.PangolinExchange], scrape, relDB) case dia.PlatypusExchange: - return NewPlatypusScraper(Exchanges[dia.PlatypusExchange], scrape) + return NewPlatypusScraper(Exchanges[dia.PlatypusExchange], scrape, relDB) case dia.SpookyswapExchange: return NewUniswapScraper(Exchanges[dia.SpookyswapExchange], scrape, relDB) case dia.QuickswapExchange: diff --git a/pkg/dia/scraper/exchange-scrapers/PlatypusScraper.go b/pkg/dia/scraper/exchange-scrapers/PlatypusScraper.go index 25a7aaee6..2864ead2a 100644 --- a/pkg/dia/scraper/exchange-scrapers/PlatypusScraper.go +++ b/pkg/dia/scraper/exchange-scrapers/PlatypusScraper.go @@ -6,6 +6,7 @@ import ( "fmt" "math" "math/big" + "strconv" "strings" "sync" "time" @@ -14,6 +15,7 @@ import ( platypusAssetABI "github.com/diadata-org/diadata/pkg/dia/scraper/exchange-scrapers/platypusfinance/asset" platypusPoolABI "github.com/diadata-org/diadata/pkg/dia/scraper/exchange-scrapers/platypusfinance/pool" platypusTokenABI "github.com/diadata-org/diadata/pkg/dia/scraper/exchange-scrapers/platypusfinance/token" + models "github.com/diadata-org/diadata/pkg/model" "github.com/diadata-org/diadata/pkg/utils" "github.com/diadata-org/diadata/pkg/dia" @@ -91,6 +93,7 @@ type PlatypusScraper struct { WsClient *ethclient.Client RestClient *ethclient.Client + relDB *models.RelDB platypusCoins map[string]*PlatypusCoin pools *PlatypusPools screenPools bool @@ -98,7 +101,7 @@ type PlatypusScraper struct { } // NewPlatypusScraper Returns a new exchange scraper -func NewPlatypusScraper(exchange dia.Exchange, scrape bool) *PlatypusScraper { +func NewPlatypusScraper(exchange dia.Exchange, scrape bool, relDB *models.RelDB) *PlatypusScraper { registries := []platypusRegistry{ {Version: 3, Address: common.HexToAddress(platypusMasterRegV3Addr)}, @@ -116,6 +119,19 @@ func NewPlatypusScraper(exchange dia.Exchange, scrape bool) *PlatypusScraper { log.Fatal("init ws client: ", err) } + // Only include pools with (minimum) liquidity bigger than given env var. + liquidityThreshold, err := strconv.ParseFloat(utils.Getenv("LIQUIDITY_THRESHOLD", "0"), 64) + if err != nil { + liquidityThreshold = float64(0) + log.Warnf("parse liquidity threshold: %v. Set to default %v", err, liquidityThreshold) + } + // Only include pools with (minimum) liquidity USD value bigger than given env var. + liquidityThresholdUSD, err := strconv.ParseFloat(utils.Getenv("LIQUIDITY_THRESHOLD_USD", "0"), 64) + if err != nil { + liquidityThresholdUSD = float64(0) + log.Warnf("parse liquidity threshold: %v. Set to default %v", err, liquidityThresholdUSD) + } + scraper := &PlatypusScraper{ exchangeName: exchange.Name, RestClient: restClient, @@ -132,9 +148,10 @@ func NewPlatypusScraper(exchange dia.Exchange, scrape bool) *PlatypusScraper { }, } + scraper.relDB = relDB // Load metadata from master registries for _, registry := range registries { - err := scraper.loadPoolsAndCoins(registry) + err := scraper.loadPoolsAndCoins(registry, liquidityThreshold, liquidityThresholdUSD) if err != nil { log.Errorf("loadPoolsAndCoins error w %s registry (v%d): %s", registry.Address.Hex(), registry.Version, err) } @@ -218,7 +235,7 @@ func (ps *PlatypusPairScraper) Pair() dia.ExchangePair { } // Load pools and coins metadata from master registry -func (s *PlatypusScraper) loadPoolsAndCoins(registry platypusRegistry) (err error) { +func (s *PlatypusScraper) loadPoolsAndCoins(registry platypusRegistry, liquidityThreshold float64, liquidityThresholdUSD float64) (err error) { log.Infof("loading master contract %s version %d and querying registry", registry.Address.Hex(), registry.Version) contractMaster, err := platypusfinance.NewBaseMasterPlatypusCaller(registry.Address, s.RestClient) if err != nil { @@ -232,12 +249,38 @@ func (s *PlatypusScraper) loadPoolsAndCoins(registry platypusRegistry) (err erro return err } + lowerBoundCount := 0 for i := 0; i < int(poolCount.Int64()); i++ { asset, errPoolInfo := contractMaster.PoolInfo(&bind.CallOpts{}, big.NewInt(int64(i))) if errPoolInfo != nil { log.Error("PoolInfo: ", errPoolInfo) return err } + pool, errPool := s.relDB.GetPoolByAddress(dia.ETHEREUM, asset.LpToken.Hex()) + if errPool != nil { + log.Errorf("Get pool %s by address: %v", asset.LpToken.Hex(), errPool) + } + + lowLiqui := false + for _, av := range pool.Assetvolumes { + if av.Volume < liquidityThreshold { + log.Warnf("low liquidity on %s: %v", pool.Address, av.Volume) + lowLiqui = true + break + } + } + if lowLiqui { + continue + } + + liquidity, lowerBound := pool.GetPoolLiquidityUSD() + // Discard pool if complete USD liquidity is below threshold. + if !lowerBound && liquidity < liquidityThresholdUSD { + continue + } + if lowerBound { + lowerBoundCount++ + } errPoolData := s.loadPoolData(asset.LpToken.Hex()) if errPoolData != nil { @@ -245,6 +288,7 @@ func (s *PlatypusScraper) loadPoolsAndCoins(registry platypusRegistry) (err erro return errPoolData } } + log.Info("lowerBound: ", lowerBoundCount) return err }