diff --git a/build/Dockerfile-genericCollector-test b/build/Dockerfile-genericCollector-test
new file mode 100644
index 000000000..0ac15d8f9
--- /dev/null
+++ b/build/Dockerfile-genericCollector-test
@@ -0,0 +1,14 @@
+ARG exchange
+FROM us.icr.io/dia-registry/devops/build-117-test:latest as build
+
+WORKDIR $GOPATH/src/
+
+COPY ./cmd/exchange-scrapers/collector ./
+RUN go mod tidy -go=1.16 && go mod tidy -go=1.17 && go install
+
+FROM gcr.io/distroless/base
+
+COPY --from=build /go/bin/collector /bin/collector
+COPY --from=build /config/ /config/
+
+CMD ["collector"]
diff --git a/cmd/exchange-scrapers/collector/collector.go b/cmd/exchange-scrapers/collector/collector.go
index 751b7910d..cb2a812d7 100644
--- a/cmd/exchange-scrapers/collector/collector.go
+++ b/cmd/exchange-scrapers/collector/collector.go
@@ -2,7 +2,6 @@ package main
 
 import (
 	"flag"
-	"fmt"
 	"sync"
 	"time"
 
@@ -67,13 +66,13 @@ func init() {
 	if !isValidExchange(*exchange) {
 		log.Fatal("Invalid exchange string: ", *exchange)
 	}
-	replicaKafkaTopic = utils.Getenv("REPLICA_KAFKA_TOPIC", "false")
+
 }
 
 // main manages all PairScrapers and handles incoming trade information
 func main() {
 
-	log.Infof("start collector for %s in %s mode...", *exchange, *mode)
+	log.Infof("start collector for %s in test-space...", *exchange)
 
 	relDB, err := models.NewRelDataStore()
 	if err != nil {
@@ -105,25 +104,9 @@ func main() {
 		log.Warning("no config for exchange's api ", err)
 	}
 	es := scrapers.NewAPIScraper(*exchange, true, configApi.ApiKey, configApi.SecretKey, relDB)
-
-	// Set up kafka writers for various modes.
-	var (
-		w *kafka.Writer
-		// This topic can be used to forward trades to services other than the prod. tradesblockservice.
-		wReplica *kafka.Writer
-		wTest    *kafka.Writer
-	)
-
-	switch *mode {
-	case "current":
-		w = kafkaHelper.NewWriter(kafkaHelper.TopicTrades)
-		wReplica = kafkaHelper.NewWriter(kafkaHelper.TopicTradesReplica)
-		wTest = kafkaHelper.NewWriter(kafkaHelper.TopicTradesTest)
-	case "estimation":
-		w = kafkaHelper.NewWriter(kafkaHelper.TopicTradesEstimation)
-	case "assetmap":
-		w = kafkaHelper.NewWriter(kafkaHelper.TopicTradesEstimation)
-	}
+	// Set up kafka writer.
+	w := kafkaHelper.NewWriter(kafkaHelper.TopicTradesTest)
+	log.Info("writer topic: ", w.Topic)
 
 	defer func() {
 		err := w.Close()
@@ -157,19 +140,21 @@ func main() {
 		defer wg.Wait()
 	}
-	go handleTrades(es.Channel(), &wg, w, wTest, wReplica, ds, *exchange, *mode)
+	go handleTrades(es.Channel(), &wg, w, ds, *exchange)
+
 }
 
-func handleTrades(c chan *dia.Trade, wg *sync.WaitGroup, w *kafka.Writer, wTest *kafka.Writer, wReplica *kafka.Writer, ds *models.DB, exchange string, mode string) {
+func handleTrades(c chan *dia.Trade, wg *sync.WaitGroup, w *kafka.Writer, ds *models.DB, exchange string) {
 	lastTradeTime := time.Now()
 	watchdogDelay := scrapers.Exchanges[exchange].WatchdogDelay
 	if watchdogDelay == 0 {
 		watchdogDelay = scrapers.ExchangeDuplicates[exchange].WatchdogDelay
 	}
 
-	t := time.NewTicker(time.Duration(watchdogDelay) * time.Second)
+	tk := time.NewTicker(time.Duration(watchdogDelay) * time.Second)
+
 	for {
 		select {
-		case <-t.C:
+		case <-tk.C:
 			duration := time.Since(lastTradeTime)
 			if duration > time.Duration(watchdogDelay)*time.Second {
 				log.Error(duration)
@@ -182,48 +167,15 @@ func handleTrades(c chan *dia.Trade, wg *sync.WaitGroup, w *kafka.Writer, wTest
 				return
 			}
 			lastTradeTime = time.Now()
-			// Trades are sent to the tradesblockservice through a kafka channel - either
-			// through trades topic or historical trades topic.
-			if mode == "current" || mode == "historical" || mode == "estimation" {
-
-				// Write trade to productive Kafka.
-				err := writeTradeToKafka(w, t)
-				if err != nil {
-					log.Error(err)
-				}
-
-				if scrapers.Exchanges[t.Source].Centralized {
-					// Write CEX trades to test Kafka.
-					if mode == "current" {
-						err = writeTradeToKafka(wTest, t)
-						if err != nil {
-							log.Error(err)
-						}
-					}
-				}
-				if replicaKafkaTopic == "true" {
-					err := writeTradeToKafka(wReplica, t)
-					if err != nil {
-						log.Error(err)
-					}
-				}
-
-			}
-			// Trades are just saved in influx - not sent to the tradesblockservice through a kafka channel.
-			if mode == "storeTrades" {
-				err := ds.SaveTradeInflux(t)
-				if err != nil {
-					log.Error(err)
-				}
+			// Trades are sent to the tradesblockservice through a kafka channel.
+			err := writeTradeToKafka(w, t)
+			if err != nil {
+				log.Error(err)
 			}
-			if mode == "assetmap" {
-
-				fmt.Println("recieved trade", t)
-
-			}
 		}
+
 	}
 }
diff --git a/cmd/exchange-scrapers/collector/go.mod b/cmd/exchange-scrapers/collector/go.mod
index b6bbf60a3..5abc13404 100644
--- a/cmd/exchange-scrapers/collector/go.mod
+++ b/cmd/exchange-scrapers/collector/go.mod
@@ -3,7 +3,7 @@ module github.com/diadata-org/diadata/exchange-scrapers/collector
 go 1.17
 
 require (
-	github.com/diadata-org/diadata v1.4.322
+	github.com/diadata-org/diadata v1.4.323-rc-1
 	github.com/segmentio/kafka-go v0.4.35
 	github.com/sirupsen/logrus v1.9.0
 )
diff --git a/cmd/services/filtersBlockService/go.mod b/cmd/services/filtersBlockService/go.mod
index e70cd3400..27d7f460d 100644
--- a/cmd/services/filtersBlockService/go.mod
+++ b/cmd/services/filtersBlockService/go.mod
@@ -3,7 +3,7 @@ module github.com/diadata-org/diadata/services/filtersBlockService
 go 1.17
 
 require (
-	github.com/diadata-org/diadata v1.4.321
+	github.com/diadata-org/diadata v1.4.321-rc-1
 	github.com/segmentio/kafka-go v0.4.35
 	github.com/sirupsen/logrus v1.8.1
 )
diff --git a/cmd/services/tradesBlockService/go.mod b/cmd/services/tradesBlockService/go.mod
index 912ec71c0..fc11fe646 100644
--- a/cmd/services/tradesBlockService/go.mod
+++ b/cmd/services/tradesBlockService/go.mod
@@ -3,7 +3,7 @@ module github.com/diadata-org/diadata/services/tradesBlockService
 go 1.17
 
 require (
-	github.com/diadata-org/diadata v1.4.321
+	github.com/diadata-org/diadata v1.4.321-rc-1
 	github.com/segmentio/kafka-go v0.4.35
 	github.com/sirupsen/logrus v1.8.1
 )
diff --git a/cmd/services/tradesBlockService/main.go b/cmd/services/tradesBlockService/main.go
index aea724a80..3d250cbbc 100644
--- a/cmd/services/tradesBlockService/main.go
+++ b/cmd/services/tradesBlockService/main.go
@@ -78,7 +78,12 @@ func main() {
 		log.Errorln("NewDataStore", err)
 	}
 
-	service := tradesBlockService.NewTradesBlockService(s, dia.BlockSizeSeconds, *historical)
+	relDB, err := models.NewRelDataStore()
+	if err != nil {
+		log.Error("New relational datastore: ", err)
+	}
+
+	service := tradesBlockService.NewTradesBlockService(s, relDB, dia.BlockSizeSeconds, *historical)
 
 	wg := sync.WaitGroup{}
 	go handleBlocks(service, &wg, kafkaWriter)
diff --git a/config/balancer/reverse_tokens/BalancerV2Quotetoken.json b/config/balancer/reverse_tokens/BalancerV2Quotetoken.json
deleted file mode 100644
index 1adbcce69..000000000
--- a/config/balancer/reverse_tokens/BalancerV2Quotetoken.json
+++ /dev/null
@@ -1,20 +0,0 @@
-{
-    "Tokens": [
-        {
-            "Address": "0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48",
-            "Symbol": "USDC"
-        },
-        {
-            "Address": "0xdAC17F958D2ee523a2206206994597C13D831ec7",
-            "Symbol": "USDT"
-        },
-        {
-            "Address": "0x6B175474E89094C44Da98b954EedeAC495271d0F",
"Symbol": "DAI" - }, - { - "Address": "0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2", - "Symbol": "WETH" - } - ] -} \ No newline at end of file diff --git a/config/uniswap/reverse_tokens/SushiswapQuotetoken.json b/config/uniswap/reverse_tokens/SushiswapQuotetoken.json deleted file mode 100644 index 695bab358..000000000 --- a/config/uniswap/reverse_tokens/SushiswapQuotetoken.json +++ /dev/null @@ -1,8 +0,0 @@ -{ - "Tokens": [ - { - "Address": "0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48", - "Symbol": "USDC" - } - ] -} \ No newline at end of file diff --git a/config/uniswap/reverse_tokens/UniswapQuotetoken.json b/config/uniswap/reverse_tokens/UniswapQuotetoken.json deleted file mode 100644 index 695bab358..000000000 --- a/config/uniswap/reverse_tokens/UniswapQuotetoken.json +++ /dev/null @@ -1,8 +0,0 @@ -{ - "Tokens": [ - { - "Address": "0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48", - "Symbol": "USDC" - } - ] -} \ No newline at end of file diff --git a/config/uniswapv3/reverse_tokens/UniswapV3Quotetoken.json b/config/uniswapv3/reverse_tokens/UniswapV3Quotetoken.json index 54b55ed97..695bab358 100644 --- a/config/uniswapv3/reverse_tokens/UniswapV3Quotetoken.json +++ b/config/uniswapv3/reverse_tokens/UniswapV3Quotetoken.json @@ -3,10 +3,6 @@ { "Address": "0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48", "Symbol": "USDC" - }, - { - "Address": "0x2260FAC5E5542a773Aa44fBCfeDf7C193bc2C599", - "Symbol": "WBTC" } ] } \ No newline at end of file diff --git a/internal/pkg/tradesBlockService/tradesBlockService.go b/internal/pkg/tradesBlockService/tradesBlockService.go index f06117966..1653a4889 100644 --- a/internal/pkg/tradesBlockService/tradesBlockService.go +++ b/internal/pkg/tradesBlockService/tradesBlockService.go @@ -21,17 +21,37 @@ type nothing struct{} func init() { log = logrus.New() - var err error batchTimeSeconds, err = strconv.Atoi(utils.Getenv("BATCH_TIME_SECONDS", "30")) if err != nil { log.Error("parse BATCH_TIME_SECONDS: ", err) } + volumeThreshold, err = strconv.ParseFloat(utils.Getenv("VOLUME_THRESHOLD", "100000"), 64) + if err != nil { + log.Error("parse env var VOLUME_THRESHOLD: ", err) + } + blueChipThreshold, err = strconv.ParseFloat(utils.Getenv("BLUECHIP_THRESHOLD", "50000000"), 64) + if err != nil { + log.Error("parse env var BLUECHIP_THRESHOLD: ", err) + } + smallX, err = strconv.ParseFloat(utils.Getenv("SMALL_X", "10"), 64) + if err != nil { + log.Error("parse env var SMALL_X: ", err) + } + normalX, err = strconv.ParseFloat(utils.Getenv("NORMAL_X", "10"), 64) + if err != nil { + log.Error("parse env var NORMAL_X: ", err) + } tradeVolumeThresholdExponent, err := strconv.ParseFloat(utils.Getenv("TRADE_VOLUME_THRESHOLD_EXPONENT", ""), 64) if err != nil { log.Error("Parse TRADE_VOLUME_THRESHOLD_EXPONENT: ", err) } tradeVolumeThreshold = math.Pow(10, -tradeVolumeThresholdExponent) + tradeVolumeThresholdUSDExponent, err := strconv.ParseFloat(utils.Getenv("TRADE_VOLUME_THRESHOLD_USD_EXPONENT", ""), 64) + if err != nil { + log.Error("Parse TRADE_VOLUME_THRESHOLD_USD_EXPONENT: ", err) + } + tradeVolumeThresholdUSD = math.Pow(10, -tradeVolumeThresholdUSDExponent) } var ( @@ -43,11 +63,32 @@ var ( "PAX": "", "BUSD": "", } - tol = float64(0.04) - log *logrus.Logger - batchTimeSeconds int - tradeVolumeThreshold float64 - checkTradesDuplicate = make(map[string]struct{}) + + // These should be loaded from postgres once we have a list. 
+	USDT = dia.Asset{Address: "0xdAC17F958D2ee523a2206206994597C13D831ec7", Blockchain: dia.ETHEREUM}
+	USDC = dia.Asset{Address: "0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48", Blockchain: dia.ETHEREUM}
+	BUSD = dia.Asset{Address: "0xe9e7CEA3DedcA5984780Bafc599bD69ADd087D56", Blockchain: dia.BINANCESMARTCHAIN}
+	DAI  = dia.Asset{Address: "0x6B175474E89094C44Da98b954EedeAC495271d0F", Blockchain: dia.ETHEREUM}
+	TUSD = dia.Asset{Address: "0x0000000000085d4780B73119b644AE5ecd22b376", Blockchain: dia.ETHEREUM}
+	stablecoinAssets = map[string]interface{}{
+		USDT.Identifier(): "",
+		USDC.Identifier(): "",
+		BUSD.Identifier(): "",
+		DAI.Identifier():  "",
+		TUSD.Identifier(): "",
+	}
+
+	tol                     = float64(0.04)
+	log                     *logrus.Logger
+	batchTimeSeconds        int
+	tradeVolumeThreshold    float64
+	tradeVolumeThresholdUSD float64
+	volumeUpdateSeconds     = 60 * 10
+	volumeThreshold         float64
+	blueChipThreshold       float64
+	smallX                  float64
+	normalX                 float64
+	checkTradesDuplicate    = make(map[string]struct{})
 )
 
 type TradesBlockService struct {
@@ -61,14 +102,17 @@ type TradesBlockService struct {
 	started          bool
 	BlockDuration    int64
 	currentBlock     *dia.TradesBlock
-	priceCache       map[dia.Asset]float64
+	priceCache       map[string]float64
+	volumeCache      map[string]float64
 	datastore        models.Datastore
+	relDB            models.RelDatastore
 	historical       bool
 	writeMeasurement string
 	batchTicker      *time.Ticker
+	volumeTicker     *time.Ticker
 }
 
-func NewTradesBlockService(datastore models.Datastore, blockDuration int64, historical bool) *TradesBlockService {
+func NewTradesBlockService(datastore models.Datastore, relDB models.RelDatastore, blockDuration int64, historical bool) *TradesBlockService {
 	s := &TradesBlockService{
 		shutdown:        make(chan nothing),
 		shutdownDone:    make(chan nothing),
@@ -78,10 +122,13 @@ func NewTradesBlockService(datastore models.Datastore, blockDuration int64, hist
 		started:         false,
 		currentBlock:    nil,
 		BlockDuration:   blockDuration,
-		priceCache:      make(map[dia.Asset]float64),
+		priceCache:      make(map[string]float64),
+		volumeCache:     make(map[string]float64),
 		datastore:       datastore,
+		relDB:           relDB,
 		historical:      historical,
 		batchTicker:     time.NewTicker(time.Duration(batchTimeSeconds) * time.Second),
+		volumeTicker:    time.NewTicker(time.Duration(volumeUpdateSeconds) * time.Second),
 	}
 	if historical {
 		s.writeMeasurement = utils.Getenv("INFLUX_MEASUREMENT_WRITE", "tradesTmp")
@@ -89,12 +136,30 @@ func NewTradesBlockService(datastore models.Datastore, blockDuration int64, hist
 	log.Info("write measurement: ", s.writeMeasurement)
 	log.Info("historical: ", s.historical)
 	log.Info("batch ticker time: ", batchTimeSeconds)
+	log.Info("volume threshold: ", volumeThreshold)
+	log.Info("bluechip threshold: ", blueChipThreshold)
+	log.Info("smallX: ", smallX)
+	log.Info("normalX: ", normalX)
+	log.Info("tradeVolumeThreshold: ", tradeVolumeThreshold)
+	log.Info("tradeVolumeThresholdUSD: ", tradeVolumeThresholdUSD)
+
+	s.volumeCache = s.loadVolumes()
+	log.Info("...done loading volumes.")
+
 	go s.mainLoop()
+	go s.loadVolumesLoop()
 	return s
 }
 
 // runs in a goroutine until s is closed
 func (s *TradesBlockService) mainLoop() {
+	var (
+		acceptCount        int
+		acceptCountDEX     int
+		acceptCountSwapDEX int
+		totalCount         int
+		logTicker          = *time.NewTicker(120 * time.Second)
+	)
 	for {
 		select {
 		case <-s.shutdown:
@@ -102,67 +167,139 @@ func (s *TradesBlockService) mainLoop() {
 			s.cleanup(nil)
 			return
 		case t := <-s.chanTrades:
-			s.process(*t)
+
+			// Only take into account the original order for CEX trades.
+			if scrapers.Exchanges[(*t).Source].Centralized {
+				s.process(*t)
+			} else {
+
+				// tSwapped, err := dia.SwapTrade(*t)
+				// if err != nil {
+				// log.Error("swap trade: ", err)
+				// }
+
+				// Collect booleans for stats.
+				tradeOk := s.checkTrade(*t)
+				// swappedTradeOk := s.checkTrade(tSwapped)
+				// if tradeOk {
+				// acceptCountDEX++
+				// }
+				// if swappedTradeOk {
+				// acceptCountSwapDEX++
+				// }
+				// if tradeOk || swappedTradeOk {
+				// acceptCount++
+				// }
+				// totalCount++
+
+				// Process (possibly) both trades.
+				if tradeOk {
+					s.process(*t)
+				}
+				// s.process(tSwapped)
+			}
+
 		case <-s.batchTicker.C:
 			err := s.datastore.Flush()
 			if err != nil {
 				log.Error("flush influx batch: ", err)
 			}
+		case <-logTicker.C:
+			log.Info("accepted trades DEX: ", acceptCountDEX)
+			log.Info("accepted swapped trades DEX: ", acceptCountSwapDEX)
+			log.Info("discarded trades: ", totalCount-acceptCount)
+			acceptCount = 0
+			acceptCountDEX = 0
+			acceptCountSwapDEX = 0
+			totalCount = 0
+		}
+	}
+}
+
+// checkTrade determines whether a (DEX-)trade should be taken into account for price determination.
+func (s *TradesBlockService) checkTrade(t dia.Trade) bool {
+
+	// Discard (very) low volume trade.
+	if math.Abs(t.Volume) < tradeVolumeThreshold {
+		log.Info("low volume trade: ", t)
+		return false
+	}
+
+	// Replace basetoken with bridged asset for pricing if necessary.
+	// The basetoken in the stored trade will remain unchanged.
+	basetoken := buildBridge(t)
+
+	// Allow trade where basetoken is stablecoin.
+	if _, ok := stablecoinAssets[basetoken.Identifier()]; ok {
+		return true
+	}
+
+	// Only take into account a stablecoin trade if the basetoken is a stablecoin as well.
+	if _, ok := stablecoinAssets[t.QuoteToken.Identifier()]; ok {
+		if _, ok := stablecoinAssets[basetoken.Identifier()]; !ok {
+			return false
+		}
+	}
+
+	if baseVolume, ok := s.volumeCache[basetoken.Identifier()]; ok {
+		if baseVolume > blueChipThreshold {
+			return true
+		}
+		if quoteVolume, ok := s.volumeCache[t.QuoteToken.Identifier()]; ok {
+			if baseVolume < volumeThreshold {
+				// For small volume basetoken, quotetoken must be a small volume asset too.
+				return quoteVolume < smallX*baseVolume
+			}
+			// Discard trade if base volume is too small compared to quote volume.
+			return quoteVolume < normalX*baseVolume
 		}
+		// Base asset has enough volume or quotetoken has no volume yet.
+		return true
 	}
+	return false
 }
 
 func (s *TradesBlockService) process(t dia.Trade) {
-	var verifiedTrade bool
+	var (
+		verifiedTrade bool
+		tradeOk       bool
+	)
+
+	if scrapers.Exchanges[t.Source].Centralized {
+		tradeOk = true
+	} else {
+		tradeOk = s.checkTrade(t)
+	}
 
 	// Price estimation can only be done for verified pairs.
 	// Trades with unverified pairs are still saved, but not sent to the filtersBlockService.
-	if t.VerifiedPair && s.checkTrade(t) {
+	if t.VerifiedPair && tradeOk {
 		if t.BaseToken.Address == "840" && t.BaseToken.Blockchain == dia.FIAT {
 			// All prices are measured in US-Dollar, so just price for base token == USD
 			t.EstimatedUSDPrice = t.Price
 			verifiedTrade = true
 		} else {
-			// Get price of base token.
-			var quotation *models.AssetQuotation
-			var price float64
-			var ok bool
-			var err error
-			if !s.historical {
-
-				// Bridge basetoken if necessary.
-				basetoken := buildBridge(t)
-
-				// Get latest price from cache.
-				if _, ok = s.priceCache[basetoken]; ok {
-					price = s.priceCache[basetoken]
-				} else {
-					quotation, err = s.datastore.GetAssetQuotationCache(basetoken)
-					price = quotation.Price
-					s.priceCache[basetoken] = price
-					log.Infof("quotation for %s from redis cache: %v", basetoken.Symbol, price)
-				}
-
+			var (
+				quotation *models.AssetQuotation
+				price     float64
+				ok        bool
+				err       error
+			)
+
+			// Bridge basetoken if necessary.
+			basetoken := buildBridge(t)
+
+			// Get latest price from cache.
+			if _, ok = s.priceCache[basetoken.Identifier()]; ok {
+				price = s.priceCache[basetoken.Identifier()]
 			} else {
-
-				// Look for historic price of base token at trade time.
-				if _, ok = s.priceCache[t.BaseToken]; ok {
-					price = s.priceCache[t.BaseToken]
-				} else {
-					price, err = s.datastore.GetAssetPriceUSD(t.BaseToken, t.Time)
-					s.priceCache[t.BaseToken] = price
-					if t.BaseToken.Address == "0x0000000000000000000000000000000000000000" {
-						if t.BaseToken.Blockchain == "Bitcoin" {
-							log.Infof("quotation for BTC from influx: %v", price)
-						}
-						if t.BaseToken.Blockchain == "Ethereum" {
-							log.Infof("quotation for ETH from influx: %v", price)
-						}
-					}
-				}
-
+				quotation, err = s.datastore.GetAssetQuotationCache(basetoken)
+				price = quotation.Price
+				s.priceCache[basetoken.Identifier()] = price
+				// log.Infof("quotation for %s from redis cache: %v", basetoken.Symbol, price)
 			}
+
 			if err != nil {
 				log.Errorf("Can't find quotation for base token in trade %s: %v.\n Basetoken address -- blockchain: %s --- %s",
 					t.Pair,
@@ -173,8 +310,10 @@ func (s *TradesBlockService) process(t dia.Trade) {
 		} else {
 			if price > 0.0 {
 				t.EstimatedUSDPrice = t.Price * price
-				if t.EstimatedUSDPrice > 0 {
+				if t.VolumeUSD() > tradeVolumeThresholdUSD {
 					verifiedTrade = true
+				} else {
+					log.Warn("low $ volume on trade: ", t)
 				}
 			}
 		}
@@ -183,8 +322,8 @@ func (s *TradesBlockService) process(t dia.Trade) {
 	//
 	// If estimated price for stablecoin diverges too much ignore trade
 	if _, ok := stablecoins[t.Symbol]; ok {
-		if math.Abs(t.EstimatedUSDPrice-1) > tol {
-			log.Errorf("price for stablecoin %s diverges by %v", t.Symbol, math.Abs(t.EstimatedUSDPrice-1))
+		if math.Abs(t.EstimatedUSDPrice-1) > tol && t.EstimatedUSDPrice > 0 {
+			log.Errorf("%s on %s. Price for %s diverges by %v", t.Pair, t.Source, t.Symbol, math.Abs(t.EstimatedUSDPrice-1))
 			verifiedTrade = false
 		}
 	}
@@ -213,7 +352,7 @@ func (s *TradesBlockService) process(t dia.Trade) {
 	if s.currentBlock == nil || s.currentBlock.TradesBlockData.EndTime.Before(t.Time) {
 		if s.currentBlock != nil {
 			s.finaliseCurrentBlock()
-			s.priceCache = make(map[dia.Asset]float64)
+			s.priceCache = make(map[string]float64)
 		}
 
 		b := &dia.TradesBlock{
@@ -251,6 +390,26 @@ func (s *TradesBlockService) process(t dia.Trade) {
 	}
 }
 
+func (s *TradesBlockService) loadVolumes() map[string]float64 {
+	// Clean asset volumes
+	volumeCache := make(map[string]float64)
+	endtime := time.Now()
+	assets, err := s.relDB.GetAssetsWithVolByBlockchain(endtime.AddDate(0, 0, -7), endtime, "")
+	if err != nil {
+		log.Error("could not load asset with volume: ", err)
+	}
+	for _, asset := range assets {
+		volumeCache[asset.Asset.Identifier()] = asset.Volume
+	}
+	return volumeCache
+}
+
+func (s *TradesBlockService) loadVolumesLoop() {
+	for range s.volumeTicker.C {
+		s.volumeCache = s.loadVolumes()
+	}
+}
+
 func (s *TradesBlockService) finaliseCurrentBlock() {
 
 	sort.Slice(s.currentBlock.TradesBlockData.Trades, func(i, j int) bool {
@@ -297,25 +456,17 @@ func (s *TradesBlockService) Channel() chan *dia.TradesBlock {
 	return s.chanTradesBlock
 }
 
-func (s *TradesBlockService) checkTrade(t dia.Trade) bool {
-	if math.Abs(t.Volume) < tradeVolumeThreshold {
-		log.Info("low volume trade: ", t)
-		return false
-	}
-	return true
-}
-
 func buildBridge(t dia.Trade) dia.Asset {
 	basetoken := t.BaseToken
-	if basetoken.Blockchain == dia.ETHEREUM && basetoken.Address == "0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2" {
-		basetoken = dia.Asset{
-			Symbol:     "ETH",
-			Address:    "0x0000000000000000000000000000000000000000",
-			Blockchain: dia.ETHEREUM,
-		}
-	}
+	// if basetoken.Blockchain == dia.ETHEREUM && basetoken.Address == "0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2" {
+	// 	basetoken = dia.Asset{
+	// 		Symbol:     "ETH",
+	// 		Address:    "0x0000000000000000000000000000000000000000",
+	// 		Blockchain: dia.ETHEREUM,
+	// 	}
+	// }
 	if basetoken.Blockchain == dia.SOLANA && t.Source == dia.OrcaExchange {
 		if basetoken.Address == "EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v" {
 			basetoken = dia.Asset{