-
Notifications
You must be signed in to change notification settings - Fork 10
Fix rabbitio out panic 'send on closed channel' #38
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -16,6 +16,7 @@ package rmq | |
|
||
import ( | ||
"log" | ||
"time" | ||
|
||
"github.com/streadway/amqp" | ||
) | ||
|
@@ -69,6 +70,7 @@ func NewConsumer(amqpURI, exchange, queue, routingKey, tag string, prefetch int) | |
exchange: exchange, | ||
contentType: "application/json", | ||
contentEncoding: "UTF-8", | ||
tag: tag, | ||
} | ||
log.Print("RabbitMQ connected: ", amqpURI) | ||
log.Printf("Bind to Exchange: %q and Queue: %q, Messaging waiting: %d", exchange, queue, q.Messages) | ||
|
@@ -78,7 +80,25 @@ func NewConsumer(amqpURI, exchange, queue, routingKey, tag string, prefetch int) | |
|
||
func (r *RabbitMQ) ackMultiple(deliveryTag <-chan Verify) { | ||
for v := range deliveryTag { | ||
r.channel.Ack(v.Tag, v.MultiAck) | ||
err := r.channel.Ack(v.Tag, v.MultiAck) | ||
if err != nil { | ||
log.Fatalf("rabbit channel ack failed %s", err) | ||
} | ||
} | ||
} | ||
|
||
func (r *RabbitMQ) SafeStop() { | ||
err := r.channel.Cancel( | ||
r.tag, // name | ||
false, // noWait | ||
) | ||
if err != nil { | ||
log.Fatalf("rabbit channel cancel failed %s", err) | ||
} else { | ||
for _ = range time.Tick(2 * time.Second) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why do we need the time tick here? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's a "crutch", to allow for all AMQP-messages to be put into the write channel before it closes. I'm not so familiar with the golang, to avoid time tick/sleep usage, but it looks like a better approach exists. |
||
log.Print("Rabbit Channel canceled") | ||
break | ||
} | ||
} | ||
} | ||
|
||
|
@@ -89,12 +109,12 @@ func (r *RabbitMQ) Consume(out chan Message, verify <-chan Verify) { | |
// set up a channel consumer | ||
deliveries, err := r.channel.Consume( | ||
r.queue, // name | ||
r.tag, // consumerTag, | ||
r.tag, // consumer | ||
false, // noAck | ||
false, // exclusive | ||
false, // noLocal | ||
false, // noWait | ||
nil, // arguments | ||
nil, // args | ||
Comment on lines
+112
to
+117
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. no need to change these There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, you're right. |
||
) | ||
if err != nil { | ||
log.Fatalf("rabbit consumer failed %s", err) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
so this is about graceful shutdown on sigterm right? What's the consequence of not doing the SafeStop? Some stale channel on the broker?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes. It provides a graceful pause of AMQP-messages consuming before the closing go-channel, which writes messages on disk. The consequence of not doing the SafeStop is panic: send on closed channel on SIGTERM.