Skip to content
This repository has been archived by the owner on Feb 8, 2023. It is now read-only.

Fix rabbitio out panic 'send on closed channel' #38

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion cmd/out.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ var outCmd = &cobra.Command{
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
go func() {
<-c
fmt.Println(" Interruption, saving last memory bits..")
fmt.Println("Interruption, saving last memory bits..")
rabbit.SafeStop()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so this is about graceful shutdown on sigterm right? What's the consequence of not doing the SafeStop? Some stale channel on the broker?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. It provides a graceful pause of AMQP-messages consuming before the closing go-channel, which writes messages on disk. The consequence of not doing the SafeStop is panic: send on closed channel on SIGTERM.

close(channel)
}()

Expand Down
26 changes: 23 additions & 3 deletions rmq/consume.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package rmq

import (
"log"
"time"

"github.com/streadway/amqp"
)
Expand Down Expand Up @@ -69,6 +70,7 @@ func NewConsumer(amqpURI, exchange, queue, routingKey, tag string, prefetch int)
exchange: exchange,
contentType: "application/json",
contentEncoding: "UTF-8",
tag: tag,
}
log.Print("RabbitMQ connected: ", amqpURI)
log.Printf("Bind to Exchange: %q and Queue: %q, Messaging waiting: %d", exchange, queue, q.Messages)
Expand All @@ -78,7 +80,25 @@ func NewConsumer(amqpURI, exchange, queue, routingKey, tag string, prefetch int)

func (r *RabbitMQ) ackMultiple(deliveryTag <-chan Verify) {
for v := range deliveryTag {
r.channel.Ack(v.Tag, v.MultiAck)
err := r.channel.Ack(v.Tag, v.MultiAck)
if err != nil {
log.Fatalf("rabbit channel ack failed %s", err)
}
}
}

func (r *RabbitMQ) SafeStop() {
err := r.channel.Cancel(
r.tag, // name
false, // noWait
)
if err != nil {
log.Fatalf("rabbit channel cancel failed %s", err)
} else {
for _ = range time.Tick(2 * time.Second) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we need the time tick here?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a "crutch", to allow for all AMQP-messages to be put into the write channel before it closes. I'm not so familiar with the golang, to avoid time tick/sleep usage, but it looks like a better approach exists.

log.Print("Rabbit Channel canceled")
break
}
}
}

Expand All @@ -89,12 +109,12 @@ func (r *RabbitMQ) Consume(out chan Message, verify <-chan Verify) {
// set up a channel consumer
deliveries, err := r.channel.Consume(
r.queue, // name
r.tag, // consumerTag,
r.tag, // consumer
false, // noAck
false, // exclusive
false, // noLocal
false, // noWait
nil, // arguments
nil, // args
Comment on lines +112 to +117
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no need to change these

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, you're right.

)
if err != nil {
log.Fatalf("rabbit consumer failed %s", err)
Expand Down