Skip to content

Commit

Permalink
Chore: producer monitoring status
Browse files Browse the repository at this point in the history
  • Loading branch information
asheswook committed Aug 8, 2024
1 parent eee283f commit 38f440e
Showing 1 changed file with 12 additions and 1 deletion.
13 changes: 12 additions & 1 deletion internal/producer.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package internal

import (
"errors"
"fmt"
"github.com/IBM/sarama"
"github.com/violetpay-org/queue-streamer/common"
"sync"
"time"
)

// ProducerPool is a pool of producers that can be used to produce messages to Kafka for one set of brokers.
Expand All @@ -18,6 +20,7 @@ type ProducerPool struct {
// For kafka
brokers []string
configProvider func() *sarama.Config
monitoring bool
}

// Take returns a producer for a given topic. If the producer does not exist, it creates a new one.
Expand Down Expand Up @@ -100,10 +103,15 @@ func (p *ProducerPool) generateProducer() sarama.AsyncProducer {
return producer
}

func (p *ProducerPool) isMonitoring() bool {
return p.monitoring
}

func (p *ProducerPool) monitorErrors(producer sarama.AsyncProducer) {
p.monitoring = true
for err := range producer.Errors() {
fmt.Println("ERROR! Failed to produce message:", err.Err)
if err.Err == sarama.ErrShuttingDown {
if errors.Is(err.Err, sarama.ErrShuttingDown) {
if err.Msg == nil {
// 프로듀서가 꺼졌는데 메시지가 남아있지 않은 경우에만 break 합니다.
// 메세지가 남아있는 경우 재발행해야 되기 때문에 break 하면 안됩니다.
Expand All @@ -115,13 +123,16 @@ func (p *ProducerPool) monitorErrors(producer sarama.AsyncProducer) {
msg := err.Msg
p.republishMessage(msg)
}

p.monitoring = false
}

// republishMessage republishes a message that failed to be produced.
func (p *ProducerPool) republishMessage(msg *sarama.ProducerMessage) {
// Republish message
producer := p.Take(common.Topic{Name: msg.Topic, Partition: msg.Partition})
if producer != nil {
time.Sleep(500 * time.Millisecond)
producer.Input() <- msg
p.Return(producer, common.Topic{Name: msg.Topic, Partition: msg.Partition})
} else {
Expand Down

0 comments on commit 38f440e

Please sign in to comment.