Skip to content

Commit

Permalink
Set partition as block height % number of partitions
Browse files Browse the repository at this point in the history
  • Loading branch information
fernandofcampos committed Oct 1, 2024
1 parent 96948a5 commit cff5a33
Show file tree
Hide file tree
Showing 14 changed files with 130 additions and 67 deletions.
21 changes: 11 additions & 10 deletions app/domain/mocks/StreamingClient.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion app/domain/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type TopicRouter interface {

type StreamingClient interface {
// PublishAsync publishes a message to a topic asynchronously
PublishAsync(ctx context.Context, msgType string, message []byte) error
PublishAsync(ctx context.Context, msgType string, message []byte, blockHeight int64) error
// Close closes the client
Close() error
}
Expand Down
33 changes: 20 additions & 13 deletions app/service/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,19 @@ func NewProcessorService(streamingClient domain.StreamingClient, codec domain.Co

// ProcessBlock implements domain.ProcessorService.
func (m *ProcessorService) ProcessBlock(ctx context.Context, block *coretypes.ResultBlock) error {
defer util.LogExecutionTime(time.Now(), "ProcessBlock")
if block == nil {
return fmt.Errorf("block is nil")
}

if block.Block == nil {
return fmt.Errorf("block is nil")
return fmt.Errorf("block.Block is nil")
}

defer util.LogExecutionTime(time.Now(), "ProcessBlock", map[string]interface{}{
"height": block.Block.Header.Height,
"txs": len(block.Block.Data.Txs),
})

for i, tx := range block.Block.Data.Txs {
err := m.ProcessTransaction(ctx, tx, i, &block.Block.Header)
if err != nil {
Expand All @@ -76,8 +83,6 @@ func (m *ProcessorService) ProcessBlock(ctx context.Context, block *coretypes.Re
}

func (m *ProcessorService) ProcessTransaction(ctx context.Context, tx []byte, txIndex int, header *types.Header) error {
defer util.LogExecutionTime(time.Now(), "ProcessTransaction")

if header == nil {
return fmt.Errorf("header is nil")
}
Expand Down Expand Up @@ -112,28 +117,32 @@ func (m *ProcessorService) ProcessTransaction(ctx context.Context, tx []byte, tx
return fmt.Errorf("failed to marshal message: %w", err)
}

err = m.streamingClient.PublishAsync(ctx, txMsg.TypeUrl, jsonMessage)
err = m.streamingClient.PublishAsync(ctx, txMsg.TypeUrl, jsonMessage, header.Height)
if err != nil {
return fmt.Errorf("failed to publish transaction message: %w", err)
}

log.Info().Str("typeUrl", txMsg.TypeUrl).Msg("transaction message processed successfully")
log.Debug().Str("typeUrl", txMsg.TypeUrl).Str("message", string(jsonMessage)).Msg("transaction message processed successfully")
}
return nil
}

// ProcessBlockResults implements domain.ProcessorService.
func (m *ProcessorService) ProcessBlockResults(ctx context.Context, blockResults *coretypes.ResultBlockResults, header *types.Header) error {
defer util.LogExecutionTime(time.Now(), "ProcessBlockResults")

if blockResults == nil {
return fmt.Errorf("block results is nil")
return fmt.Errorf("blockResults is nil")
}

if header == nil {
return fmt.Errorf("header is nil")
}

defer util.LogExecutionTime(time.Now(), "ProcessBlockResults", map[string]interface{}{
"height": header.Height,
"tx results": len(blockResults.TxsResults),
"finalize block events": len(blockResults.FinalizeBlockEvents),
})

for i, txResult := range blockResults.TxsResults {
if txResult == nil {
log.Warn().Int64("height", blockResults.Height).Int("txIndex", i).Msg("transaction result is nil")
Expand Down Expand Up @@ -162,8 +171,6 @@ func (m *ProcessorService) ProcessBlockResults(ctx context.Context, blockResults
}

func (m *ProcessorService) ProcessEvent(ctx context.Context, event *abci.Event, header *types.Header) error {
defer util.LogExecutionTime(time.Now(), "ProcessEvent")

if event == nil {
return fmt.Errorf("event is nil")
}
Expand Down Expand Up @@ -197,11 +204,11 @@ func (m *ProcessorService) ProcessEvent(ctx context.Context, event *abci.Event,
return fmt.Errorf("failed to marshal message: %w", err)
}

err = m.streamingClient.PublishAsync(ctx, event.Type, jsonMessage)
err = m.streamingClient.PublishAsync(ctx, event.Type, jsonMessage, header.Height)
if err != nil {
return fmt.Errorf("failed to publish event: %w", err)
}

log.Info().Str("eventType", event.Type).Msg("event processed successfully")
log.Debug().Str("eventType", event.Type).Str("event", string(jsonMessage)).Msg("event processed successfully")
return nil
}
10 changes: 5 additions & 5 deletions app/service/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func TestProcessorService_ProcessBlock(t *testing.T) {
mockCodec.On("ParseTx", []byte("tx1")).Return(parsedTx, nil)
mockTxFilter.On("ShouldProcess", parsedTx.Body.Messages[0]).Return(true)
mockCodec.On("MarshalProtoJSON", parsedTx.Body.Messages[0]).Return(json.RawMessage(`{"key":"value"}`), nil)
mockKafka.On("PublishAsync", mock.Anything, "/test.Msg", mock.Anything).Return(nil)
mockKafka.On("PublishAsync", mock.Anything, "/test.Msg", mock.Anything, mock.Anything).Return(nil)
mockCodec.On("ParseTx", []byte("tx2")).Return(nil, errors.New("parse error"))

err = service.ProcessBlock(context.Background(), block)
Expand Down Expand Up @@ -96,7 +96,7 @@ func TestProcessorService_ProcessTransaction(t *testing.T) {
mockCodec.On("ParseTx", tx).Return(parsedTx, nil)
mockTxFilter.On("ShouldProcess", parsedTx.Body.Messages[0]).Return(true)
mockCodec.On("MarshalProtoJSON", parsedTx.Body.Messages[0]).Return(json.RawMessage(`{"key":"value"}`), nil)
mockKafka.On("PublishAsync", mock.Anything, "/test.Msg", mock.Anything).Return(nil)
mockKafka.On("PublishAsync", mock.Anything, "/test.Msg", mock.Anything, mock.Anything).Return(nil)

err = service.ProcessTransaction(context.Background(), tx, 0, header)
require.NoError(t, err)
Expand Down Expand Up @@ -133,7 +133,7 @@ func TestProcessorService_ProcessEvent(t *testing.T) {
mockEventFilter.On("ShouldProcess", event).Return(true)
mockCodec.On("ParseEvent", event).Return(parsedEvent, nil)
mockCodec.On("MarshalProtoJSON", parsedEvent).Return(json.RawMessage(`{"event":"data"}`), nil)
mockKafka.On("PublishAsync", mock.Anything, "test_event", mock.Anything).Return(nil)
mockKafka.On("PublishAsync", mock.Anything, "test_event", mock.Anything, mock.Anything).Return(nil)

err = service.ProcessEvent(context.Background(), event, header)
require.NoError(t, err)
Expand Down Expand Up @@ -202,12 +202,12 @@ func TestProcessorService_ProcessBlockResults(t *testing.T) {
mockEventFilter.On("ShouldProcess", event1).Return(true)
mockCodec.On("ParseEvent", event1).Return(parsedEvent1, nil)
mockCodec.On("MarshalProtoJSON", parsedEvent1).Return(json.RawMessage(`{"event":"tx_event"}`), nil)
mockKafka.On("PublishAsync", mock.Anything, "tx_event", mock.Anything).Return(nil)
mockKafka.On("PublishAsync", mock.Anything, "tx_event", mock.Anything, mock.Anything).Return(nil)

mockEventFilter.On("ShouldProcess", event2).Return(true)
mockCodec.On("ParseEvent", event2).Return(parsedEvent2, nil)
mockCodec.On("MarshalProtoJSON", parsedEvent2).Return(json.RawMessage(`{"event":"finalize_event"}`), nil)
mockKafka.On("PublishAsync", mock.Anything, "finalize_event", mock.Anything).Return(nil)
mockKafka.On("PublishAsync", mock.Anything, "finalize_event", mock.Anything, mock.Anything).Return(nil)

err = service.ProcessBlockResults(context.Background(), blockResults, header)
require.NoError(t, err)
Expand Down
1 change: 0 additions & 1 deletion app/usecase/baseproducer.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,6 @@ func (bm *BaseProducer) MonitorLoopParallel(ctx context.Context, processBlock fu
go func() {
defer wg.Done()
for height := range blockQueue {
bm.logger.Debug().Msgf("Worker %d processing block at height %d", i, height)
if err := processBlock(ctx, height); err != nil {
bm.logger.Warn().Err(err).Msgf("failed to process block at height %d", height)
// Re-enqueue for immediate retry
Expand Down
2 changes: 1 addition & 1 deletion app/usecase/eventsproducer.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func (m *EventsProducer) Execute(ctx context.Context) error {
}

func (m *EventsProducer) processBlockResults(ctx context.Context, height int64) error {
m.logger.Debug().Msgf("Processing block results for height %d", height)
m.logger.Info().Msgf("Processing block results for height %d", height)
// Fetch BlockResults
blockResults, err := m.alloraClient.GetBlockResults(ctx, height)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion app/usecase/txproducer.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func (m *TransactionsProducer) Execute(ctx context.Context) error {
}

func (m *TransactionsProducer) processBlock(ctx context.Context, height int64) error {
m.logger.Debug().Msgf("Processing block for height %d", height)
m.logger.Info().Msgf("Processing block for height %d", height)
// Fetch Block
block, err := m.alloraClient.GetBlockByHeight(ctx, height)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion cmd/producer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func main() {
if err != nil {
log.Fatal().Err(err).Str("component", "KafkaClient").Msg("failed to create Kafka client")
}
kafkaClient, err := infra.NewKafkaClient(franzClient, topicRouter)
kafkaClient, err := infra.NewKafkaClient(franzClient, topicRouter, cfg.Kafka.NumPartitions)
if err != nil {
log.Fatal().Err(err).Msg("failed to create Kafka client")
}
Expand Down
1 change: 1 addition & 0 deletions config/config.example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ kafka:
- "localhost:9092"
user: "user"
password: "pass"
num_partitions: 6

allora:
rpc: "https://allora-rpc.testnet.allora.network"
Expand Down
7 changes: 4 additions & 3 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,10 @@ type Config struct {
}

type KafkaConfig struct {
Seeds []string `mapstructure:"seeds" validate:"required,gt=0,dive,hostname_port"` // Kafka seeds
User string `mapstructure:"user" validate:"required,min=1"` // Kafka user
Password string `mapstructure:"password" validate:"required,min=1"` // Kafka password
Seeds []string `mapstructure:"seeds" validate:"required,gt=0,dive,hostname_port"` // Kafka seeds
User string `mapstructure:"user" validate:"required,min=1"` // Kafka user
Password string `mapstructure:"password" validate:"required,min=1"` // Kafka password
NumPartitions int32 `mapstructure:"num_partitions" validate:"required,min=1"` // Number of partitions for the Kafka topic
}

type DatabaseConfig struct {
Expand Down
12 changes: 12 additions & 0 deletions infra/allora_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ package infra
import (
"context"
"fmt"
"time"

"github.com/allora-network/allora-producer/app/domain"
"github.com/allora-network/allora-producer/util"
coretypes "github.com/cometbft/cometbft/rpc/core/types"

rpchttp "github.com/cometbft/cometbft/rpc/client/http"
Expand All @@ -28,6 +30,9 @@ var _ domain.AlloraClientInterface = &AlloraClient{}

// GetBlockByHeight implements domain.AlloraClientInterface.
func (a *AlloraClient) GetBlockByHeight(ctx context.Context, height int64) (*coretypes.ResultBlock, error) {
defer util.LogExecutionTime(time.Now(), "GetBlockByHeight", map[string]interface{}{
"height": height,
})
block, err := a.client.Block(ctx, &height)
if err != nil {
return nil, fmt.Errorf("failed to retrieve block %d from %s: %w", height, a.rpcURL, err)
Expand All @@ -38,6 +43,9 @@ func (a *AlloraClient) GetBlockByHeight(ctx context.Context, height int64) (*cor

// GetBlockResults implements domain.AlloraClientInterface.
func (a *AlloraClient) GetBlockResults(ctx context.Context, height int64) (*coretypes.ResultBlockResults, error) {
defer util.LogExecutionTime(time.Now(), "GetBlockResults", map[string]interface{}{
"height": height,
})
results, err := a.client.BlockResults(ctx, &height)
if err != nil {
return nil, fmt.Errorf("failed to retrieve block results for block %d from %s: %w", height, a.rpcURL, err)
Expand All @@ -48,6 +56,7 @@ func (a *AlloraClient) GetBlockResults(ctx context.Context, height int64) (*core

// GetLatestBlockHeight implements domain.AlloraClientInterface.
func (a *AlloraClient) GetLatestBlockHeight(ctx context.Context) (int64, error) {
defer util.LogExecutionTime(time.Now(), "GetLatestBlockHeight", map[string]interface{}{})
status, err := a.client.Status(ctx)
if err != nil {
return 0, fmt.Errorf("failed to retrieve status from %s: %w", a.rpcURL, err)
Expand All @@ -56,6 +65,9 @@ func (a *AlloraClient) GetLatestBlockHeight(ctx context.Context) (int64, error)
}

func (a *AlloraClient) GetHeader(ctx context.Context, height int64) (*coretypes.ResultHeader, error) {
defer util.LogExecutionTime(time.Now(), "GetHeader", map[string]interface{}{
"height": height,
})
header, err := a.client.Header(ctx, &height)
if err != nil {
return nil, fmt.Errorf("failed to retrieve header for block %d from %s: %w", height, a.rpcURL, err)
Expand Down
42 changes: 22 additions & 20 deletions infra/kafka_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"time"

"github.com/allora-network/allora-producer/app/domain"
"github.com/allora-network/allora-producer/util"
"github.com/rs/zerolog/log"
"github.com/twmb/franz-go/pkg/kgo"
"github.com/twmb/franz-go/pkg/sasl/plain"
Expand All @@ -24,12 +23,13 @@ type KafkaClient interface {
}

type kafkaStreamingClient struct {
client KafkaClient
router domain.TopicRouter
client KafkaClient
router domain.TopicRouter
numPartitions int32
}

// Update the constructor to accept a TopicRouter.
func NewKafkaClient(client KafkaClient, router domain.TopicRouter) (domain.StreamingClient, error) {
func NewKafkaClient(client KafkaClient, router domain.TopicRouter, numPartitions int32) (domain.StreamingClient, error) {
if client == nil {
return nil, errors.New("client is nil")
}
Expand All @@ -38,36 +38,41 @@ func NewKafkaClient(client KafkaClient, router domain.TopicRouter) (domain.Strea
}

return &kafkaStreamingClient{
client: client,
router: router,
client: client,
router: router,
numPartitions: numPartitions,
}, nil
}

// Publish publishes a message based on its MessageType.
func (k *kafkaStreamingClient) PublishAsync(ctx context.Context, msgType string, message []byte) error {
defer util.LogExecutionTime(time.Now(), "Publish")

func (k *kafkaStreamingClient) PublishAsync(ctx context.Context, msgType string, message []byte, blockHeight int64) error {
topic, err := k.router.GetTopic(msgType)
if err != nil {
log.Warn().Any("msgType", msgType).Err(err).Msg("failed to get topic")
return err
}

partition := k.getPartition(blockHeight)
record := &kgo.Record{
Topic: topic,
Value: message,
Topic: topic,
Value: message,
Partition: partition,
}
// Async produce
k.client.Produce(ctx, record, func(record *kgo.Record, err error) {
if err != nil {
log.Error().Err(err).Str("topic", record.Topic).Msg("failed to deliver message")
log.Warn().Err(err).Str("topic", record.Topic).Msg("failed to deliver message")
} else {
log.Debug().Str("topic", record.Topic).Msg("message delivered")
}
})
return nil
}

func (k *kafkaStreamingClient) getPartition(blockHeight int64) int32 {
return int32(blockHeight % int64(k.numPartitions)) //nolint:gosec // k.numPartitions is a small number, so the modulo will not overflow an int32
}

func (k *kafkaStreamingClient) Close() error {
// Gracefully flush all pending messages before closing
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
Expand All @@ -87,15 +92,12 @@ func NewFranzClient(seeds []string, user, password string) (*kgo.Client, error)
User: user,
Pass: password,
}.AsMechanism()),
kgo.ProducerBatchCompression(kgo.SnappyCompression()),
kgo.RecordRetries(10),
kgo.ProducerBatchCompression(kgo.ZstdCompression()),
kgo.RequiredAcks(kgo.AllISRAcks()),
kgo.ProducerBatchMaxBytes(1e6),
kgo.ProducerLinger(100 * time.Millisecond),
kgo.RetryTimeout(30 * time.Second),
kgo.RetryBackoffFn(func(attempt int) time.Duration {
return time.Duration(attempt) * 100 * time.Millisecond
}),
kgo.ProducerBatchMaxBytes(16e6),
kgo.RecordPartitioner(kgo.ManualPartitioner()),
kgo.MaxBufferedRecords(1e6),
kgo.MetadataMaxAge(60 * time.Second),
}

return kgo.NewClient(opts...)
Expand Down
Loading

0 comments on commit cff5a33

Please sign in to comment.