Skip to content

Commit

Permalink
Process multiple blocks in parallel
Browse files Browse the repository at this point in the history
  • Loading branch information
fernandofcampos committed Sep 26, 2024
1 parent d177cd8 commit feedabd
Show file tree
Hide file tree
Showing 6 changed files with 74 additions and 15 deletions.
57 changes: 55 additions & 2 deletions app/usecase/baseproducer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package usecase
import (
"context"
"fmt"
"sync"
"time"

"github.com/allora-network/allora-producer/app/domain"
Expand All @@ -11,17 +12,18 @@ import (

type BaseProducer struct {
service domain.ProcessorService
client domain.AlloraClientInterface
alloraClient domain.AlloraClientInterface
repository domain.ProcessedBlockRepositoryInterface
startHeight int64
blockRefreshInterval time.Duration
rateLimitInterval time.Duration
numWorkers int
}

// InitStartHeight checks if the start height is zero and fetches the latest block height.
func (bm *BaseProducer) InitStartHeight(ctx context.Context) error {
if bm.startHeight == 0 {
latestHeight, err := bm.client.GetLatestBlockHeight(ctx)
latestHeight, err := bm.alloraClient.GetLatestBlockHeight(ctx)
if err != nil {
return fmt.Errorf("failed to get latest block height: %w", err)
}
Expand Down Expand Up @@ -54,3 +56,54 @@ func (bm *BaseProducer) MonitorLoop(ctx context.Context, processBlock func(ctx c
time.Sleep(bm.blockRefreshInterval)
}
}

func (bm *BaseProducer) MonitorLoopParallel(ctx context.Context, processBlock func(ctx context.Context, height int64) error, numWorkers int) error {
blockQueue := make(chan int64, 100) // Buffered channel for block heights

// Producer Goroutine
go func() {
for {
select {
case <-ctx.Done():
close(blockQueue)
return
default:
}

latestHeight, err := bm.alloraClient.GetLatestBlockHeight(ctx)
if err != nil {
log.Error().Err(err).Msg("failed to get latest block height")
time.Sleep(bm.rateLimitInterval)
continue
}

for bm.startHeight <= latestHeight {
blockQueue <- bm.startHeight
bm.startHeight++
}

time.Sleep(bm.blockRefreshInterval)
}
}()

// Consumer Goroutines
var wg sync.WaitGroup
for i := 0; i < numWorkers; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for height := range blockQueue {
log.Debug().Msgf("Worker %d processing block at height %d", i, height)
if err := processBlock(ctx, height); err != nil {
log.Warn().Err(err).Msgf("failed to process block at height %d", height)
// Re-enqueue for immediate retry
blockQueue <- height
}
time.Sleep(bm.rateLimitInterval)
}
}()
}

wg.Wait()
return nil
}
11 changes: 6 additions & 5 deletions app/usecase/eventsproducer.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ type EventsProducer struct {
var _ domain.EventsProducer = &EventsProducer{}

func NewEventsProducer(service domain.ProcessorService, client domain.AlloraClientInterface, repository domain.ProcessedBlockRepositoryInterface,
startHeight int64, blockRefreshInterval time.Duration, rateLimitInterval time.Duration) (*EventsProducer, error) {
startHeight int64, blockRefreshInterval time.Duration, rateLimitInterval time.Duration, numWorkers int) (*EventsProducer, error) {
if service == nil {
return nil, fmt.Errorf("service is nil")
}
Expand All @@ -29,11 +29,12 @@ func NewEventsProducer(service domain.ProcessorService, client domain.AlloraClie
return &EventsProducer{
BaseProducer: BaseProducer{
service: service,
client: client,
alloraClient: client,
repository: repository,
startHeight: startHeight,
blockRefreshInterval: blockRefreshInterval,
rateLimitInterval: rateLimitInterval,
numWorkers: numWorkers,
},
}, nil
}
Expand All @@ -43,18 +44,18 @@ func (m *EventsProducer) Execute(ctx context.Context) error {
return err
}

return m.MonitorLoop(ctx, m.processBlockResults)
return m.MonitorLoopParallel(ctx, m.processBlockResults, m.numWorkers)
}

func (m *EventsProducer) processBlockResults(ctx context.Context, height int64) error {
// Fetch BlockResults
blockResults, err := m.client.GetBlockResults(ctx, height)
blockResults, err := m.alloraClient.GetBlockResults(ctx, height)
if err != nil {
return fmt.Errorf("failed to get block results for height %d: %w", height, err)
}

// Fetch the Header separately
header, err := m.client.GetHeader(ctx, height)
header, err := m.alloraClient.GetHeader(ctx, height)
if err != nil {
return fmt.Errorf("failed to get block header for height %d: %w", height, err)
}
Expand Down
9 changes: 5 additions & 4 deletions app/usecase/txproducer.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ type TransactionsProducer struct {
var _ domain.TransactionsProducer = &TransactionsProducer{}

func NewTransactionsProducer(service domain.ProcessorService, client domain.AlloraClientInterface, repository domain.ProcessedBlockRepositoryInterface,
startHeight int64, blockRefreshInterval time.Duration, rateLimitInterval time.Duration) (*TransactionsProducer, error) {
startHeight int64, blockRefreshInterval time.Duration, rateLimitInterval time.Duration, numWorkers int) (*TransactionsProducer, error) {
if service == nil {
return nil, fmt.Errorf("service is nil")
}
Expand All @@ -29,11 +29,12 @@ func NewTransactionsProducer(service domain.ProcessorService, client domain.Allo
return &TransactionsProducer{
BaseProducer: BaseProducer{
service: service,
client: client,
alloraClient: client,
repository: repository,
startHeight: startHeight,
blockRefreshInterval: blockRefreshInterval,
rateLimitInterval: rateLimitInterval,
numWorkers: numWorkers,
},
}, nil
}
Expand All @@ -43,12 +44,12 @@ func (m *TransactionsProducer) Execute(ctx context.Context) error {
return err
}

return m.MonitorLoop(ctx, m.processBlock)
return m.MonitorLoopParallel(ctx, m.processBlock, m.numWorkers)
}

func (m *TransactionsProducer) processBlock(ctx context.Context, height int64) error {
// Fetch Block
block, err := m.client.GetBlockByHeight(ctx, height)
block, err := m.alloraClient.GetBlockByHeight(ctx, height)
if err != nil {
return fmt.Errorf("failed to get block for height %d: %w", height, err)
}
Expand Down
6 changes: 4 additions & 2 deletions cmd/producer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,11 +87,13 @@ func main() {
if err != nil {
log.Fatal().Err(err).Msg("failed to create processor service")
}
eventsProducer, err := usecase.NewEventsProducer(processorService, alloraClient, processedBlockRepository, 0, cfg.Producer.BlockRefreshInterval, cfg.Producer.RateLimitInterval)
eventsProducer, err := usecase.NewEventsProducer(processorService, alloraClient, processedBlockRepository, 0,
cfg.Producer.BlockRefreshInterval, cfg.Producer.RateLimitInterval, cfg.Producer.NumWorkers)
if err != nil {
log.Fatal().Err(err).Msg("failed to create events producer use case")
}
transactionsProducer, err := usecase.NewTransactionsProducer(processorService, alloraClient, processedBlockRepository, 0, cfg.Producer.BlockRefreshInterval, cfg.Producer.RateLimitInterval)
transactionsProducer, err := usecase.NewTransactionsProducer(processorService, alloraClient, processedBlockRepository, 0,
cfg.Producer.BlockRefreshInterval, cfg.Producer.RateLimitInterval, cfg.Producer.NumWorkers)
if err != nil {
log.Fatal().Err(err).Msg("failed to create transactions producer use case")
}
Expand Down
1 change: 1 addition & 0 deletions config/config.example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ log:
producer:
block_refresh_interval: 5s
rate_limit_interval: 1s
num_workers: 5

kafka_topic_router:
- name: "topic.staking"
Expand Down
5 changes: 3 additions & 2 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,15 @@ type FilterEventConfig struct {
}

type FilterTransactionConfig struct {
Types []string `mapstructure:"types" validate:"required,gt=0,dive,min=1"` // Types of transactions to filter
Types []string `mapstructure:"types" validate:"gt=0,dive,min=1"` // Types of transactions to filter
}

type LogConfig struct {
Level int8 `mapstructure:"level" validate:"required"` // Log level
Level int8 `mapstructure:"level" validate:"gte=-1,lte=7"` // Log level
}

type ProducerConfig struct {
BlockRefreshInterval time.Duration `mapstructure:"block_refresh_interval" validate:"required"` // Block refresh interval
RateLimitInterval time.Duration `mapstructure:"rate_limit_interval" validate:"required"` // Rate limit interval
NumWorkers int `mapstructure:"num_workers" validate:"required,gt=0"` // Number of workers to process blocks and block results
}

0 comments on commit feedabd

Please sign in to comment.