Skip to content

Commit

Permalink
update tbs and collector.
Browse files Browse the repository at this point in the history
  • Loading branch information
jppade committed Aug 10, 2023
1 parent 598017f commit c0151a7
Show file tree
Hide file tree
Showing 7 changed files with 77 additions and 45 deletions.
14 changes: 0 additions & 14 deletions build/Dockerfile-genericCollector-test

This file was deleted.

77 changes: 61 additions & 16 deletions cmd/exchange-scrapers/collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,13 @@ func init() {
if !isValidExchange(*exchange) {
log.Fatal("Invalid exchange string: ", *exchange)
}

replicaKafkaTopic = utils.Getenv("REPLICA_KAFKA_TOPIC", "false")
}

// main manages all PairScrapers and handles incoming trade information
func main() {

log.Infof("start collector for %s in test-space...", *exchange)
log.Infof("start collector for %s in %s mode...", *exchange, *mode)

relDB, err := models.NewRelDataStore()
if err != nil {
Expand Down Expand Up @@ -104,9 +104,25 @@ func main() {
log.Warning("no config for exchange's api ", err)
}
es := scrapers.NewAPIScraper(*exchange, true, configApi.ApiKey, configApi.SecretKey, relDB)
// Set up kafka writer.
w := kafkaHelper.NewWriter(kafkaHelper.TopicTradesTest)
log.Info("writer topic: ", w.Topic)

// Set up kafka writers for various modes.
var (
w *kafka.Writer
// This topic can be used to forward trades to services other than the prod. tradesblockservice.
wReplica *kafka.Writer
wTest *kafka.Writer
)

switch *mode {
case "current":
w = kafkaHelper.NewWriter(kafkaHelper.TopicTrades)
wReplica = kafkaHelper.NewWriter(kafkaHelper.TopicTradesReplica)
wTest = kafkaHelper.NewWriter(kafkaHelper.TopicTradesTest)
case "estimation":
w = kafkaHelper.NewWriter(kafkaHelper.TopicTradesEstimation)
case "assetmap":
w = kafkaHelper.NewWriter(kafkaHelper.TopicTradesEstimation)
}

defer func() {
err := w.Close()
Expand Down Expand Up @@ -140,21 +156,19 @@ func main() {
defer wg.Wait()

}
go handleTrades(es.Channel(), &wg, w, ds, *exchange)

go handleTrades(es.Channel(), &wg, w, wTest, wReplica, ds, *exchange, *mode)
}

func handleTrades(c chan *dia.Trade, wg *sync.WaitGroup, w *kafka.Writer, ds *models.DB, exchange string) {
func handleTrades(c chan *dia.Trade, wg *sync.WaitGroup, w *kafka.Writer, wTest *kafka.Writer, wReplica *kafka.Writer, ds *models.DB, exchange string, mode string) {
lastTradeTime := time.Now()
watchdogDelay := scrapers.Exchanges[exchange].WatchdogDelay
if watchdogDelay == 0 {
watchdogDelay = scrapers.ExchangeDuplicates[exchange].WatchdogDelay
}
tk := time.NewTicker(time.Duration(watchdogDelay) * time.Second)

t := time.NewTicker(time.Duration(watchdogDelay) * time.Second)
for {
select {
case <-tk.C:
case <-t.C:
duration := time.Since(lastTradeTime)
if duration > time.Duration(watchdogDelay)*time.Second {
log.Error(duration)
Expand All @@ -167,15 +181,46 @@ func handleTrades(c chan *dia.Trade, wg *sync.WaitGroup, w *kafka.Writer, ds *mo
return
}
lastTradeTime = time.Now()
// Trades are sent to the tradesblockservice through a kafka channel - either
// through trades topic or historical trades topic.
if mode == "current" || mode == "historical" || mode == "estimation" {

// Write trade to productive Kafka.
err := writeTradeToKafka(w, t)
if err != nil {
log.Error(err)
}

if scrapers.Exchanges[t.Source].Centralized {
// Write CEX trades to test Kafka.
if mode == "current" {
err = writeTradeToKafka(wTest, t)
if err != nil {
log.Error(err)
}
}
}

if replicaKafkaTopic == "true" {
err := writeTradeToKafka(wReplica, t)
if err != nil {
log.Error(err)
}
}

// Trades are sent to the tradesblockservice through a kafka channel.
err := writeTradeToKafka(w, t)
if err != nil {
log.Error(err)
}
// Trades are just saved in influx - not sent to the tradesblockservice through a kafka channel.
if mode == "storeTrades" {
err := ds.SaveTradeInflux(t)
if err != nil {
log.Error(err)
}
}

if mode == "assetmap" {
log.Info("recieved trade", t)
}
}

}
}

Expand Down
2 changes: 1 addition & 1 deletion cmd/exchange-scrapers/collector/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/diadata-org/diadata/exchange-scrapers/collector
go 1.17

require (
github.com/diadata-org/diadata v1.4.323-rc-1
github.com/diadata-org/diadata v1.4.324
github.com/segmentio/kafka-go v0.4.35
github.com/sirupsen/logrus v1.9.0
)
Expand Down
2 changes: 1 addition & 1 deletion cmd/services/tradesBlockService/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/diadata-org/diadata/services/tradesBlockService
go 1.17

require (
github.com/diadata-org/diadata v1.4.321-rc-1
github.com/diadata-org/diadata v1.4.324
github.com/segmentio/kafka-go v0.4.35
github.com/sirupsen/logrus v1.8.1
)
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,7 @@ require (
github.com/hdevalence/ed25519consensus v0.1.0 // indirect
github.com/jackpal/go-nat-pmp v1.0.2 // indirect
github.com/kennygrant/sanitize v1.2.4 // indirect
github.com/machinebox/graphql v0.2.2
github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d // indirect
github.com/prometheus/tsdb v0.10.0 // indirect
github.com/rjeczalik/notify v0.9.2 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1100,6 +1100,8 @@ github.com/logrusorgru/aurora v2.0.3+incompatible/go.mod h1:7rIyQOR62GCctdiQpZ/z
github.com/lucasjones/reggen v0.0.0-20180717132126-cdb49ff09d77/go.mod h1:5ELEyG+X8f+meRWHuqUOewBOhvHkl7M76pdGEansxW4=
github.com/lyft/protoc-gen-star v0.5.3/go.mod h1:V0xaHgaf5oCCqmcxYcWiDfTiKsZsRc87/1qhoTACD8w=
github.com/lyft/protoc-gen-validate v0.0.13/go.mod h1:XbGvPuh87YZc5TdIa2/I4pLk0QoUACkjt2znoq26NVQ=
github.com/machinebox/graphql v0.2.2 h1:dWKpJligYKhYKO5A2gvNhkJdQMNZeChZYyBbrZkBZfo=
github.com/machinebox/graphql v0.2.2/go.mod h1:F+kbVMHuwrQ5tYgU9JXlnskM8nOaFxCAEolaQybkjWA=
github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
github.com/magiconair/properties v1.8.1/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
github.com/magiconair/properties v1.8.5 h1:b6kJs+EmPFMYGkow9GiUyCyOvIwYetYJ3fSaWak/Gls=
Expand Down
24 changes: 11 additions & 13 deletions internal/pkg/tradesBlockService/tradesBlockService.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,11 +154,10 @@ func NewTradesBlockService(datastore models.Datastore, relDB models.RelDatastore
// runs in a goroutine until s is closed
func (s *TradesBlockService) mainLoop() {
var (
acceptCount int
acceptCountDEX int
acceptCountSwapDEX int
totalCount int
logTicker = *time.NewTicker(120 * time.Second)
acceptCountDEX int
// acceptCountSwapDEX int
totalCount int
logTicker = *time.NewTicker(120 * time.Second)
)
for {
select {
Expand All @@ -181,16 +180,16 @@ func (s *TradesBlockService) mainLoop() {
// Collect booleans for stats.
tradeOk := s.checkTrade(*t)
// swapppedTradeOk := s.checkTrade(tSwapped)
// if tradeOk {
// acceptCountDEX++
// }
if tradeOk {
acceptCountDEX++
}
// if swapppedTradeOk {
// acceptCountSwapDEX++
// }
// if tradeOk || swapppedTradeOk {
// acceptCount++
// }
// totalCount++
totalCount++

// Process (possibly) both trades.
if tradeOk {
Expand All @@ -206,11 +205,10 @@ func (s *TradesBlockService) mainLoop() {
}
case <-logTicker.C:
log.Info("accepted trades DEX: ", acceptCountDEX)
log.Info("accepted swapped trades DEX: ", acceptCountSwapDEX)
log.Info("discarded trades: ", totalCount-acceptCount)
acceptCount = 0
// log.Info("accepted swapped trades DEX: ", acceptCountSwapDEX)
log.Info("discarded trades: ", totalCount-acceptCountDEX)
acceptCountDEX = 0
acceptCountSwapDEX = 0
// acceptCountSwapDEX = 0
totalCount = 0
}
}
Expand Down

0 comments on commit c0151a7

Please sign in to comment.