diff --git a/internal/producer.go b/internal/producer.go index a7d6c8b..766a020 100644 --- a/internal/producer.go +++ b/internal/producer.go @@ -1,10 +1,12 @@ package internal import ( + "errors" "fmt" "github.com/IBM/sarama" "github.com/violetpay-org/queue-streamer/common" "sync" + "time" ) // ProducerPool is a pool of producers that can be used to produce messages to Kafka for one set of brokers. @@ -18,6 +20,7 @@ type ProducerPool struct { // For kafka brokers []string configProvider func() *sarama.Config + monitoring bool } // Take returns a producer for a given topic. If the producer does not exist, it creates a new one. @@ -100,10 +103,15 @@ func (p *ProducerPool) generateProducer() sarama.AsyncProducer { return producer } +func (p *ProducerPool) isMonitoring() bool { + return p.monitoring +} + func (p *ProducerPool) monitorErrors(producer sarama.AsyncProducer) { + p.monitoring = true for err := range producer.Errors() { fmt.Println("ERROR! Failed to produce message:", err.Err) - if err.Err == sarama.ErrShuttingDown { + if errors.Is(err.Err, sarama.ErrShuttingDown) { if err.Msg == nil { // 프로듀서가 꺼졌는데 메시지가 남아있지 않은 경우에만 break 합니다. // 메세지가 남아있는 경우 재발행해야 되기 때문에 break 하면 안됩니다. @@ -115,6 +123,8 @@ func (p *ProducerPool) monitorErrors(producer sarama.AsyncProducer) { msg := err.Msg p.republishMessage(msg) } + + p.monitoring = false } // republishMessage republishes a message that failed to be produced. @@ -122,6 +132,7 @@ func (p *ProducerPool) republishMessage(msg *sarama.ProducerMessage) { // Republish message producer := p.Take(common.Topic{Name: msg.Topic, Partition: msg.Partition}) if producer != nil { + time.Sleep(500 * time.Millisecond) producer.Input() <- msg p.Return(producer, common.Topic{Name: msg.Topic, Partition: msg.Partition}) } else {