diff --git a/.gitignore b/.gitignore index 4412f27..dc46c08 100644 --- a/.gitignore +++ b/.gitignore @@ -2,3 +2,4 @@ bin/ *.out config/config.yaml .env +*.log diff --git a/Dockerfile b/Dockerfile index b793b80..0f29c94 100644 --- a/Dockerfile +++ b/Dockerfile @@ -21,6 +21,7 @@ RUN addgroup -S appgroup && adduser -S appuser -G appgroup WORKDIR /home/appuser/ COPY --from=builder /app/allora-producer . +COPY --from=builder /app/config/config.yaml . USER appuser diff --git a/app/usecase/baseproducer.go b/app/usecase/baseproducer.go index bf13225..08504f3 100644 --- a/app/usecase/baseproducer.go +++ b/app/usecase/baseproducer.go @@ -3,25 +3,28 @@ package usecase import ( "context" "fmt" + "sync" "time" "github.com/allora-network/allora-producer/app/domain" - "github.com/rs/zerolog/log" + "github.com/rs/zerolog" ) type BaseProducer struct { service domain.ProcessorService - client domain.AlloraClientInterface + alloraClient domain.AlloraClientInterface repository domain.ProcessedBlockRepositoryInterface startHeight int64 blockRefreshInterval time.Duration rateLimitInterval time.Duration + numWorkers int + logger *zerolog.Logger } // InitStartHeight checks if the start height is zero and fetches the latest block height. func (bm *BaseProducer) InitStartHeight(ctx context.Context) error { if bm.startHeight == 0 { - latestHeight, err := bm.client.GetLatestBlockHeight(ctx) + latestHeight, err := bm.alloraClient.GetLatestBlockHeight(ctx) if err != nil { return fmt.Errorf("failed to get latest block height: %w", err) } @@ -42,7 +45,7 @@ func (bm *BaseProducer) MonitorLoop(ctx context.Context, processBlock func(ctx c // Process the block or block results at the current height if err := processBlock(ctx, bm.startHeight); err != nil { - log.Warn().Err(err).Msgf("failed to process block at height %d", bm.startHeight) + bm.logger.Warn().Err(err).Msgf("failed to process block at height %d", bm.startHeight) time.Sleep(bm.rateLimitInterval) continue } @@ -54,3 +57,54 @@ func (bm *BaseProducer) MonitorLoop(ctx context.Context, processBlock func(ctx c time.Sleep(bm.blockRefreshInterval) } } + +func (bm *BaseProducer) MonitorLoopParallel(ctx context.Context, processBlock func(ctx context.Context, height int64) error, numWorkers int) error { + blockQueue := make(chan int64, 100) // Buffered channel for block heights + + // Producer Goroutine + go func() { + for { + select { + case <-ctx.Done(): + close(blockQueue) + return + default: + } + + latestHeight, err := bm.alloraClient.GetLatestBlockHeight(ctx) + if err != nil { + bm.logger.Error().Err(err).Msg("failed to get latest block height") + time.Sleep(bm.rateLimitInterval) + continue + } + + for bm.startHeight <= latestHeight { + blockQueue <- bm.startHeight + bm.startHeight++ + } + + time.Sleep(bm.blockRefreshInterval) + } + }() + + // Consumer Goroutines + var wg sync.WaitGroup + for i := 0; i < numWorkers; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for height := range blockQueue { + bm.logger.Debug().Msgf("Worker %d processing block at height %d", i, height) + if err := processBlock(ctx, height); err != nil { + bm.logger.Warn().Err(err).Msgf("failed to process block at height %d", height) + // Re-enqueue for immediate retry + blockQueue <- height + } + time.Sleep(bm.rateLimitInterval) + } + }() + } + + wg.Wait() + return nil +} diff --git a/app/usecase/eventsproducer.go b/app/usecase/eventsproducer.go index f653ad5..848c1cb 100644 --- a/app/usecase/eventsproducer.go +++ b/app/usecase/eventsproducer.go @@ -6,6 +6,7 @@ import ( "time" "github.com/allora-network/allora-producer/app/domain" + "github.com/rs/zerolog/log" ) type EventsProducer struct { @@ -15,7 +16,7 @@ type EventsProducer struct { var _ domain.EventsProducer = &EventsProducer{} func NewEventsProducer(service domain.ProcessorService, client domain.AlloraClientInterface, repository domain.ProcessedBlockRepositoryInterface, - startHeight int64, blockRefreshInterval time.Duration, rateLimitInterval time.Duration) (*EventsProducer, error) { + startHeight int64, blockRefreshInterval time.Duration, rateLimitInterval time.Duration, numWorkers int) (*EventsProducer, error) { if service == nil { return nil, fmt.Errorf("service is nil") } @@ -26,14 +27,18 @@ func NewEventsProducer(service domain.ProcessorService, client domain.AlloraClie return nil, fmt.Errorf("repository is nil") } + logger := log.With().Str("producer", "events").Logger() + return &EventsProducer{ BaseProducer: BaseProducer{ service: service, - client: client, + alloraClient: client, repository: repository, startHeight: startHeight, blockRefreshInterval: blockRefreshInterval, rateLimitInterval: rateLimitInterval, + numWorkers: numWorkers, + logger: &logger, }, }, nil } @@ -43,18 +48,19 @@ func (m *EventsProducer) Execute(ctx context.Context) error { return err } - return m.MonitorLoop(ctx, m.processBlockResults) + return m.MonitorLoopParallel(ctx, m.processBlockResults, m.numWorkers) } func (m *EventsProducer) processBlockResults(ctx context.Context, height int64) error { + m.logger.Debug().Msgf("Processing block results for height %d", height) // Fetch BlockResults - blockResults, err := m.client.GetBlockResults(ctx, height) + blockResults, err := m.alloraClient.GetBlockResults(ctx, height) if err != nil { return fmt.Errorf("failed to get block results for height %d: %w", height, err) } // Fetch the Header separately - header, err := m.client.GetHeader(ctx, height) + header, err := m.alloraClient.GetHeader(ctx, height) if err != nil { return fmt.Errorf("failed to get block header for height %d: %w", height, err) } diff --git a/app/usecase/txproducer.go b/app/usecase/txproducer.go index dbf42d6..ab764c8 100644 --- a/app/usecase/txproducer.go +++ b/app/usecase/txproducer.go @@ -6,6 +6,7 @@ import ( "time" "github.com/allora-network/allora-producer/app/domain" + "github.com/rs/zerolog/log" ) type TransactionsProducer struct { @@ -15,7 +16,7 @@ type TransactionsProducer struct { var _ domain.TransactionsProducer = &TransactionsProducer{} func NewTransactionsProducer(service domain.ProcessorService, client domain.AlloraClientInterface, repository domain.ProcessedBlockRepositoryInterface, - startHeight int64, blockRefreshInterval time.Duration, rateLimitInterval time.Duration) (*TransactionsProducer, error) { + startHeight int64, blockRefreshInterval time.Duration, rateLimitInterval time.Duration, numWorkers int) (*TransactionsProducer, error) { if service == nil { return nil, fmt.Errorf("service is nil") } @@ -26,14 +27,18 @@ func NewTransactionsProducer(service domain.ProcessorService, client domain.Allo return nil, fmt.Errorf("repository is nil") } + logger := log.With().Str("producer", "transactions").Logger() + return &TransactionsProducer{ BaseProducer: BaseProducer{ service: service, - client: client, + alloraClient: client, repository: repository, startHeight: startHeight, blockRefreshInterval: blockRefreshInterval, rateLimitInterval: rateLimitInterval, + numWorkers: numWorkers, + logger: &logger, }, }, nil } @@ -43,12 +48,13 @@ func (m *TransactionsProducer) Execute(ctx context.Context) error { return err } - return m.MonitorLoop(ctx, m.processBlock) + return m.MonitorLoopParallel(ctx, m.processBlock, m.numWorkers) } func (m *TransactionsProducer) processBlock(ctx context.Context, height int64) error { + m.logger.Debug().Msgf("Processing block for height %d", height) // Fetch Block - block, err := m.client.GetBlockByHeight(ctx, height) + block, err := m.alloraClient.GetBlockByHeight(ctx, height) if err != nil { return fmt.Errorf("failed to get block for height %d: %w", height, err) } diff --git a/cmd/producer/main.go b/cmd/producer/main.go index 6d38df9..460cc4c 100644 --- a/cmd/producer/main.go +++ b/cmd/producer/main.go @@ -11,7 +11,6 @@ import ( "github.com/allora-network/allora-producer/app/usecase" "github.com/allora-network/allora-producer/codec" "github.com/allora-network/allora-producer/infra" - "github.com/go-playground/validator/v10" "github.com/jackc/pgx/v4/pgxpool" "github.com/allora-network/allora-producer/config" @@ -30,7 +29,7 @@ func main() { zerolog.SetGlobalLevel(zerolog.Level(cfg.Log.Level)) // Validate config - if err := validateConfig(&cfg); err != nil { + if err := config.ValidateConfig(&cfg); err != nil { log.Fatal().Err(err).Msg("invalid config") } @@ -87,11 +86,13 @@ func main() { if err != nil { log.Fatal().Err(err).Msg("failed to create processor service") } - eventsProducer, err := usecase.NewEventsProducer(processorService, alloraClient, processedBlockRepository, 0, cfg.Producer.BlockRefreshInterval, cfg.Producer.RateLimitInterval) + eventsProducer, err := usecase.NewEventsProducer(processorService, alloraClient, processedBlockRepository, 0, + cfg.Producer.BlockRefreshInterval, cfg.Producer.RateLimitInterval, cfg.Producer.NumWorkers) if err != nil { log.Fatal().Err(err).Msg("failed to create events producer use case") } - transactionsProducer, err := usecase.NewTransactionsProducer(processorService, alloraClient, processedBlockRepository, 0, cfg.Producer.BlockRefreshInterval, cfg.Producer.RateLimitInterval) + transactionsProducer, err := usecase.NewTransactionsProducer(processorService, alloraClient, processedBlockRepository, 0, + cfg.Producer.BlockRefreshInterval, cfg.Producer.RateLimitInterval, cfg.Producer.NumWorkers) if err != nil { log.Fatal().Err(err).Msg("failed to create transactions producer use case") } @@ -103,11 +104,6 @@ func main() { } } -func validateConfig(cfg *config.Config) error { - validate := validator.New() - return validate.Struct(cfg) -} - func getTopicMapping(cfg config.Config) map[string]string { topicMapping := make(map[string]string) for _, topicRouter := range cfg.KafkaTopicRouter { diff --git a/config/config.example.yaml b/config/config.example.yaml index eacf725..afc1bc6 100644 --- a/config/config.example.yaml +++ b/config/config.example.yaml @@ -17,6 +17,7 @@ log: producer: block_refresh_interval: 5s rate_limit_interval: 1s + num_workers: 5 kafka_topic_router: - name: "topic.staking" diff --git a/config/config.go b/config/config.go index 02a9a74..d298628 100644 --- a/config/config.go +++ b/config/config.go @@ -38,14 +38,15 @@ type FilterEventConfig struct { } type FilterTransactionConfig struct { - Types []string `mapstructure:"types" validate:"required,gt=0,dive,min=1"` // Types of transactions to filter + Types []string `mapstructure:"types" validate:"gt=0,dive,min=1"` // Types of transactions to filter } type LogConfig struct { - Level int8 `mapstructure:"level" validate:"required"` // Log level + Level int8 `mapstructure:"level" validate:"gte=-1,lte=7"` // Log level } type ProducerConfig struct { BlockRefreshInterval time.Duration `mapstructure:"block_refresh_interval" validate:"required"` // Block refresh interval RateLimitInterval time.Duration `mapstructure:"rate_limit_interval" validate:"required"` // Rate limit interval + NumWorkers int `mapstructure:"num_workers" validate:"required,gt=0"` // Number of workers to process blocks and block results } diff --git a/config/init.go b/config/init.go index 46d68cc..d84b12f 100644 --- a/config/init.go +++ b/config/init.go @@ -4,6 +4,7 @@ import ( "fmt" "strings" + "github.com/go-playground/validator/v10" "github.com/rs/zerolog/log" "github.com/spf13/viper" ) @@ -35,3 +36,8 @@ func InitConfig() (Config, error) { log.Info().Msg("Configuration loaded successfully") return cfg, nil } + +func ValidateConfig(cfg *Config) error { + validate := validator.New() + return validate.Struct(cfg) +} diff --git a/docker-compose.yml b/docker-compose.yml index 3a89bef..57bee60 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,4 +1,3 @@ -version: '3.8' services: producer: build: @@ -23,7 +22,7 @@ services: networks: - app-network healthcheck: - test: ["CMD-SHELL", "pg_isready -U ${POSTGRES_USER}"] + test: ["CMD-SHELL", "pg_isready -U ${POSTGRES_USER} -d ${POSTGRES_DB}"] interval: 10s timeout: 5s retries: 5