Skip to content

Commit

Permalink
first commit. volume filtering for current main system.
Browse files Browse the repository at this point in the history
  • Loading branch information
jppade committed Jul 24, 2023
1 parent ddd2fe7 commit cc8cf7e
Show file tree
Hide file tree
Showing 7 changed files with 221 additions and 156 deletions.
80 changes: 16 additions & 64 deletions cmd/exchange-scrapers/collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package main

import (
"flag"
"fmt"
"sync"
"time"

Expand Down Expand Up @@ -66,13 +65,13 @@ func init() {
if !isValidExchange(*exchange) {
log.Fatal("Invalid exchange string: ", *exchange)
}
replicaKafkaTopic = utils.Getenv("REPLICA_KAFKA_TOPIC", "false")

}

// main manages all PairScrapers and handles incoming trade information
func main() {

log.Infof("start collector for %s in %s mode...", *exchange, *mode)
log.Infof("start collector for %s in test-space...", *exchange)

relDB, err := models.NewRelDataStore()
if err != nil {
Expand Down Expand Up @@ -104,25 +103,9 @@ func main() {
log.Warning("no config for exchange's api ", err)
}
es := scrapers.NewAPIScraper(*exchange, true, configApi.ApiKey, configApi.SecretKey, relDB)

// Set up kafka writers for various modes.
var (
w *kafka.Writer
// This topic can be used to forward trades to services other than the prod. tradesblockservice.
wReplica *kafka.Writer
wTest *kafka.Writer
)

switch *mode {
case "current":
w = kafkaHelper.NewWriter(kafkaHelper.TopicTrades)
wReplica = kafkaHelper.NewWriter(kafkaHelper.TopicTradesReplica)
wTest = kafkaHelper.NewWriter(kafkaHelper.TopicTradesTest)
case "estimation":
w = kafkaHelper.NewWriter(kafkaHelper.TopicTradesEstimation)
case "assetmap":
w = kafkaHelper.NewWriter(kafkaHelper.TopicTradesEstimation)
}
// Set up kafka writer.
w := kafkaHelper.NewWriter(kafkaHelper.TopicTradesTest)
log.Info("writer topic: ", w.Topic)

defer func() {
err := w.Close()
Expand Down Expand Up @@ -156,19 +139,21 @@ func main() {
defer wg.Wait()

}
go handleTrades(es.Channel(), &wg, w, wTest, wReplica, ds, *exchange, *mode)
go handleTrades(es.Channel(), &wg, w, ds, *exchange)

}

func handleTrades(c chan *dia.Trade, wg *sync.WaitGroup, w *kafka.Writer, wTest *kafka.Writer, wReplica *kafka.Writer, ds *models.DB, exchange string, mode string) {
func handleTrades(c chan *dia.Trade, wg *sync.WaitGroup, w *kafka.Writer, ds *models.DB, exchange string) {
lastTradeTime := time.Now()
watchdogDelay := scrapers.Exchanges[exchange].WatchdogDelay
if watchdogDelay == 0 {
watchdogDelay = scrapers.ExchangeDuplicates[exchange].WatchdogDelay
}
t := time.NewTicker(time.Duration(watchdogDelay) * time.Second)
tk := time.NewTicker(time.Duration(watchdogDelay) * time.Second)

for {
select {
case <-t.C:
case <-tk.C:
duration := time.Since(lastTradeTime)
if duration > time.Duration(watchdogDelay)*time.Second {
log.Error(duration)
Expand All @@ -181,48 +166,15 @@ func handleTrades(c chan *dia.Trade, wg *sync.WaitGroup, w *kafka.Writer, wTest
return
}
lastTradeTime = time.Now()
// Trades are sent to the tradesblockservice through a kafka channel - either
// through trades topic or historical trades topic.
if mode == "current" || mode == "historical" || mode == "estimation" {

// Write trade to productive Kafka.
err := writeTradeToKafka(w, t)
if err != nil {
log.Error(err)
}

if scrapers.Exchanges[t.Source].Centralized {
// Write CEX trades to test Kafka.
if mode == "current" {
err = writeTradeToKafka(wTest, t)
if err != nil {
log.Error(err)
}
}
}

if replicaKafkaTopic == "true" {
err := writeTradeToKafka(wReplica, t)
if err != nil {
log.Error(err)
}
}

}
// Trades are just saved in influx - not sent to the tradesblockservice through a kafka channel.
if mode == "storeTrades" {
err := ds.SaveTradeInflux(t)
if err != nil {
log.Error(err)
}
// Trades are sent to the tradesblockservice through a kafka channel.
err := writeTradeToKafka(w, t)
if err != nil {
log.Error(err)
}

if mode == "assetmap" {

fmt.Println("recieved trade", t)

}
}

}
}

Expand Down
2 changes: 1 addition & 1 deletion cmd/exchange-scrapers/collector/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/diadata-org/diadata/exchange-scrapers/collector
go 1.17

require (
github.com/diadata-org/diadata v1.4.309
github.com/diadata-org/diadata v1.4.312-rc-1
github.com/segmentio/kafka-go v0.4.35
github.com/sirupsen/logrus v1.9.0
)
Expand Down
2 changes: 1 addition & 1 deletion cmd/services/tradesBlockService/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/diadata-org/diadata/services/tradesBlockService
go 1.17

require (
github.com/diadata-org/diadata v1.4.297
github.com/diadata-org/diadata v1.4.312-rc-1
github.com/segmentio/kafka-go v0.4.35
github.com/sirupsen/logrus v1.8.1
)
Expand Down
8 changes: 0 additions & 8 deletions config/uniswap/reverse_tokens/SushiswapQuotetoken.json

This file was deleted.

8 changes: 0 additions & 8 deletions config/uniswap/reverse_tokens/UniswapQuotetoken.json

This file was deleted.

12 changes: 0 additions & 12 deletions config/uniswapv3/reverse_tokens/UniswapV3Quotetoken.json

This file was deleted.

Loading

0 comments on commit cc8cf7e

Please sign in to comment.