Skip to content

Commit

Permalink
Merge branch 'volume-filter-incr'
Browse files Browse the repository at this point in the history
  • Loading branch information
jppade committed Aug 10, 2023
2 parents 7852795 + 8016096 commit 598017f
Show file tree
Hide file tree
Showing 11 changed files with 258 additions and 176 deletions.
14 changes: 14 additions & 0 deletions build/Dockerfile-genericCollector-test
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
ARG exchange
FROM us.icr.io/dia-registry/devops/build-117-test:latest as build

WORKDIR $GOPATH/src/

COPY ./cmd/exchange-scrapers/collector ./
RUN go mod tidy -go=1.16 && go mod tidy -go=1.17 && go install

FROM gcr.io/distroless/base

COPY --from=build /go/bin/collector /bin/collector
COPY --from=build /config/ /config/

CMD ["collector"]
80 changes: 16 additions & 64 deletions cmd/exchange-scrapers/collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package main

import (
"flag"
"fmt"
"sync"
"time"

Expand Down Expand Up @@ -67,13 +66,13 @@ func init() {
if !isValidExchange(*exchange) {
log.Fatal("Invalid exchange string: ", *exchange)
}
replicaKafkaTopic = utils.Getenv("REPLICA_KAFKA_TOPIC", "false")

}

// main manages all PairScrapers and handles incoming trade information
func main() {

log.Infof("start collector for %s in %s mode...", *exchange, *mode)
log.Infof("start collector for %s in test-space...", *exchange)

relDB, err := models.NewRelDataStore()
if err != nil {
Expand Down Expand Up @@ -105,25 +104,9 @@ func main() {
log.Warning("no config for exchange's api ", err)
}
es := scrapers.NewAPIScraper(*exchange, true, configApi.ApiKey, configApi.SecretKey, relDB)

// Set up kafka writers for various modes.
var (
w *kafka.Writer
// This topic can be used to forward trades to services other than the prod. tradesblockservice.
wReplica *kafka.Writer
wTest *kafka.Writer
)

switch *mode {
case "current":
w = kafkaHelper.NewWriter(kafkaHelper.TopicTrades)
wReplica = kafkaHelper.NewWriter(kafkaHelper.TopicTradesReplica)
wTest = kafkaHelper.NewWriter(kafkaHelper.TopicTradesTest)
case "estimation":
w = kafkaHelper.NewWriter(kafkaHelper.TopicTradesEstimation)
case "assetmap":
w = kafkaHelper.NewWriter(kafkaHelper.TopicTradesEstimation)
}
// Set up kafka writer.
w := kafkaHelper.NewWriter(kafkaHelper.TopicTradesTest)
log.Info("writer topic: ", w.Topic)

defer func() {
err := w.Close()
Expand Down Expand Up @@ -157,19 +140,21 @@ func main() {
defer wg.Wait()

}
go handleTrades(es.Channel(), &wg, w, wTest, wReplica, ds, *exchange, *mode)
go handleTrades(es.Channel(), &wg, w, ds, *exchange)

}

func handleTrades(c chan *dia.Trade, wg *sync.WaitGroup, w *kafka.Writer, wTest *kafka.Writer, wReplica *kafka.Writer, ds *models.DB, exchange string, mode string) {
func handleTrades(c chan *dia.Trade, wg *sync.WaitGroup, w *kafka.Writer, ds *models.DB, exchange string) {
lastTradeTime := time.Now()
watchdogDelay := scrapers.Exchanges[exchange].WatchdogDelay
if watchdogDelay == 0 {
watchdogDelay = scrapers.ExchangeDuplicates[exchange].WatchdogDelay
}
t := time.NewTicker(time.Duration(watchdogDelay) * time.Second)
tk := time.NewTicker(time.Duration(watchdogDelay) * time.Second)

for {
select {
case <-t.C:
case <-tk.C:
duration := time.Since(lastTradeTime)
if duration > time.Duration(watchdogDelay)*time.Second {
log.Error(duration)
Expand All @@ -182,48 +167,15 @@ func handleTrades(c chan *dia.Trade, wg *sync.WaitGroup, w *kafka.Writer, wTest
return
}
lastTradeTime = time.Now()
// Trades are sent to the tradesblockservice through a kafka channel - either
// through trades topic or historical trades topic.
if mode == "current" || mode == "historical" || mode == "estimation" {

// Write trade to productive Kafka.
err := writeTradeToKafka(w, t)
if err != nil {
log.Error(err)
}

if scrapers.Exchanges[t.Source].Centralized {
// Write CEX trades to test Kafka.
if mode == "current" {
err = writeTradeToKafka(wTest, t)
if err != nil {
log.Error(err)
}
}
}

if replicaKafkaTopic == "true" {
err := writeTradeToKafka(wReplica, t)
if err != nil {
log.Error(err)
}
}

}
// Trades are just saved in influx - not sent to the tradesblockservice through a kafka channel.
if mode == "storeTrades" {
err := ds.SaveTradeInflux(t)
if err != nil {
log.Error(err)
}
// Trades are sent to the tradesblockservice through a kafka channel.
err := writeTradeToKafka(w, t)
if err != nil {
log.Error(err)
}

if mode == "assetmap" {

fmt.Println("recieved trade", t)

}
}

}
}

Expand Down
2 changes: 1 addition & 1 deletion cmd/exchange-scrapers/collector/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/diadata-org/diadata/exchange-scrapers/collector
go 1.17

require (
github.com/diadata-org/diadata v1.4.322
github.com/diadata-org/diadata v1.4.323-rc-1
github.com/segmentio/kafka-go v0.4.35
github.com/sirupsen/logrus v1.9.0
)
Expand Down
2 changes: 1 addition & 1 deletion cmd/services/filtersBlockService/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/diadata-org/diadata/services/filtersBlockService
go 1.17

require (
github.com/diadata-org/diadata v1.4.321
github.com/diadata-org/diadata v1.4.321-rc-1
github.com/segmentio/kafka-go v0.4.35
github.com/sirupsen/logrus v1.8.1
)
Expand Down
2 changes: 1 addition & 1 deletion cmd/services/tradesBlockService/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/diadata-org/diadata/services/tradesBlockService
go 1.17

require (
github.com/diadata-org/diadata v1.4.321
github.com/diadata-org/diadata v1.4.321-rc-1
github.com/segmentio/kafka-go v0.4.35
github.com/sirupsen/logrus v1.8.1
)
Expand Down
7 changes: 6 additions & 1 deletion cmd/services/tradesBlockService/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,12 @@ func main() {
log.Errorln("NewDataStore", err)
}

service := tradesBlockService.NewTradesBlockService(s, dia.BlockSizeSeconds, *historical)
relDB, err := models.NewRelDataStore()
if err != nil {
log.Error("New relational datastore: ", err)
}

service := tradesBlockService.NewTradesBlockService(s, relDB, dia.BlockSizeSeconds, *historical)

wg := sync.WaitGroup{}
go handleBlocks(service, &wg, kafkaWriter)
Expand Down
20 changes: 0 additions & 20 deletions config/balancer/reverse_tokens/BalancerV2Quotetoken.json

This file was deleted.

8 changes: 0 additions & 8 deletions config/uniswap/reverse_tokens/SushiswapQuotetoken.json

This file was deleted.

8 changes: 0 additions & 8 deletions config/uniswap/reverse_tokens/UniswapQuotetoken.json

This file was deleted.

4 changes: 0 additions & 4 deletions config/uniswapv3/reverse_tokens/UniswapV3Quotetoken.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,6 @@
{
"Address": "0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48",
"Symbol": "USDC"
},
{
"Address": "0x2260FAC5E5542a773Aa44fBCfeDf7C193bc2C599",
"Symbol": "WBTC"
}
]
}
Loading

0 comments on commit 598017f

Please sign in to comment.