Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding support for using multiple RabbitMQ channels per connection #77

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type Config struct {
Compression bool
Onfailure int
Stricfailure bool
NumChannels int
}
Prefetch struct {
Count int
Expand Down Expand Up @@ -217,6 +218,11 @@ func (c Config) QueueIsNoWait() bool {
return c.QueueSettings.NoWait
}

// NumChannels determines how many Channels to open on this Connection
func (c Config) NumChannels() int {
return c.RabbitMq.NumChannels
}

// ConsumerTag returns the tag used to identify the consumer.
func (c Config) ConsumerTag() string {
if v, set := os.LookupEnv("GO_WANT_HELPER_PROCESS"); set && v == "1" {
Expand Down Expand Up @@ -245,6 +251,7 @@ func LoadAndParse(location string) (*Config, error) {
cfg := &Config{}

SetDefaultQueueDurability(cfg)
SetDefaultNumChannels(cfg)

if err := gcfg.ReadFileInto(cfg, location); err != nil {
return nil, err
Expand All @@ -257,6 +264,7 @@ func CreateFromString(data string) (*Config, error) {
cfg := &Config{}

SetDefaultQueueDurability(cfg)
SetDefaultNumChannels(cfg)

if err := gcfg.ReadStringInto(cfg, data); err != nil {
return nil, err
Expand All @@ -270,6 +278,11 @@ func SetDefaultQueueDurability(cfg *Config) {
cfg.QueueSettings.Durable = true
}

// SetDefaultNumChannels sets NumChannels to 1 to keep backwards compatibility
func SetDefaultNumChannels(cfg *Config) {
cfg.RabbitMq.NumChannels = 1
}

func transformToStringValue(val string) string {
if val == "<empty>" {
return ""
Expand Down
72 changes: 72 additions & 0 deletions consumer/channel_list.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package consumer

import (
"errors"
"fmt"

"github.com/bketelsen/logr"
)

// ChannelMultiplexer describes an object that holds multiple Channels
type ChannelMultiplexer interface {
AddChannel(Channel)
Channels() []Channel
FirstChannel() (Channel, error)
Qos(prefetchCount, prefetchSize int, global bool) error
}

// ChannelList is an implementation of the ChannelMultiplexer interface.
type ChannelList struct {
channels []Channel
}

// NewChannelList creates a new ChannelList with a single Channel
func NewChannelList(conn Connection, len int, l logr.Logger) (*ChannelList, error) {
if len < 1 {
return nil, fmt.Errorf("cannot create a ChannelList with less than one channel (got %d)", len)
}

chs := make([]Channel, 0)
l.Infof("Opening %d channel(s)...", len)
for i := 0; i < len; i++ {
ch, err := conn.Channel()
if nil != err {
return nil, fmt.Errorf("failed to open a channel: %v", err)
}
l.Infof("Opened channel %d...", i)
chs = append(chs, ch)
}
l.Info("Done.")

return &ChannelList{channels: chs}, nil
}

// AddChannel adds the given channel to the end of the ChannelList
func (cl *ChannelList) AddChannel(c Channel) {
cl.channels = append(cl.channels, c)
}

// Channels returns a slice of Channels represented by this ChannelList.
func (cl *ChannelList) Channels() []Channel {
return cl.channels
}

// FirstChannel is a convenience function to get the first Channel in this ChannelList.
func (cl *ChannelList) FirstChannel() (Channel, error) {
if len(cl.channels) < 1 {
var ch Channel
return ch, errors.New("tried to get the first channel from an uninitialized ChannelList")
}

return cl.channels[0], nil
}

// Qos sets Qos settings for all Channels in this ChannelList.
func (cl *ChannelList) Qos(prefetchCount, prefetchSize int, global bool) error {
for i, ch := range cl.channels {
if err := ch.Qos(prefetchCount, prefetchSize, global); err != nil {
return fmt.Errorf("failed to set QoS on channel %d: %v", i, err)
}
}
return nil
}
60 changes: 60 additions & 0 deletions consumer/channel_list_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package consumer_test

import (
"fmt"
"testing"

"github.com/corvus-ch/rabbitmq-cli-consumer/config"
"github.com/corvus-ch/rabbitmq-cli-consumer/consumer"
"github.com/stretchr/testify/assert"
)

const qosConfig = "qos"

var channelListTests = []struct {
name string
config string
setup func(*consumer.ChannelList)
err error
}{
// Set QoS.
{
"setQos",
qosConfig,
func(cl *consumer.ChannelList) {
for _, ch := range cl.Channels() {
tc := ch.(*TestChannel)
tc.On("Qos", 42, 0, true).Return(nil).Once()
}
},
nil,
},
// Set QoS fails.
{
"setQosFail",
qosConfig,
func(cl *consumer.ChannelList) {
for _, ch := range cl.Channels() {
tc := ch.(*TestChannel)
tc.On("Qos", 42, 0, true).Return(fmt.Errorf("QoS error")).Once()
}
},
fmt.Errorf("failed to set QoS on channel 0: QoS error"),
},
}

func TestChannelList(t *testing.T) {
for _, test := range channelListTests {
t.Run(test.name, func(t *testing.T) {
cfg, _ := config.LoadAndParse(fmt.Sprintf("fixtures/%s.conf", test.config))
cl := &consumer.ChannelList{}
cl.AddChannel(new(TestChannel))
test.setup(cl)
assert.Equal(t, test.err, cl.Qos(cfg.PrefetchCount(), 0, cfg.PrefetchIsGlobal()))
for _, ch := range cl.Channels() {
tc := ch.(*TestChannel)
tc.AssertExpectations(t)
}
})
}
}
1 change: 1 addition & 0 deletions consumer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ type Config interface {
HasPriority() bool
MessageTTL() int32
MustDeclareQueue() bool
NumChannels() int
PrefetchCount() int
PrefetchIsGlobal() bool
Priority() int32
Expand Down
80 changes: 53 additions & 27 deletions consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (

type Consumer struct {
Connection Connection
Channel Channel
Channels ChannelMultiplexer
Queue string
Tag string
Processor processor.Processor
Expand All @@ -21,10 +21,10 @@ type Consumer struct {

// New creates a new consumer instance. The setup of the amqp connection and channel is expected to be done by the
// calling code.
func New(conn Connection, ch Channel, p processor.Processor, l logr.Logger) *Consumer {
func New(conn Connection, cm ChannelMultiplexer, p processor.Processor, l logr.Logger) *Consumer {
return &Consumer{
Connection: conn,
Channel: ch,
Channels: cm,
Processor: p,
Log: l,
}
Expand All @@ -40,20 +40,29 @@ func NewFromConfig(cfg Config, p processor.Processor, l logr.Logger) (*Consumer,
}
l.Info("Connected.")

l.Info("Opening channel...")
ch, err := conn.Channel()
cl, err := NewChannelList(conn, cfg.NumChannels(), l)
if nil != err {
return nil, fmt.Errorf("failed to open a channel: %v", err)
return nil, fmt.Errorf("failed creating channel(s): %v", err)
}

l.Info("Setting QoS... ")
if err := cl.Qos(cfg.PrefetchCount(), 0, cfg.PrefetchIsGlobal()); err != nil {
return nil, err
}
l.Info("Succeeded setting QoS.")

ch, err := cl.FirstChannel()
if nil != err {
return nil, err
}
l.Info("Done.")

if err := Setup(cfg, ch, l); err != nil {
return nil, err
}

return &Consumer{
Connection: conn,
Channel: ch,
Channels: cl,
Queue: cfg.QueueName(),
Tag: cfg.ConsumerTag(),
Processor: p,
Expand All @@ -63,46 +72,63 @@ func NewFromConfig(cfg Config, p processor.Processor, l logr.Logger) (*Consumer,

// Consume subscribes itself to the message queue and starts consuming messages.
func (c *Consumer) Consume(ctx context.Context) error {
c.Log.Info("Registering consumer... ")
msgs, err := c.Channel.Consume(c.Queue, c.Tag, false, false, false, false, nil)
if err != nil {
return fmt.Errorf("failed to register a consumer: %s", err)
}
remoteClose := make(chan *amqp.Error)
done := make(chan error)

c.Log.Info("Succeeded registering consumer.")
c.Log.Info("Waiting for messages...")
c.Log.Info("Registering channels... ")
for i, ch := range c.Channels.Channels() {
msgs, err := ch.Consume(c.Queue, c.Tag, false, false, false, false, nil)
if err != nil {
return fmt.Errorf("failed to register a channel: %s", err)
}

remoteClose := make(chan *amqp.Error)
c.Channel.NotifyClose(remoteClose)
c.Log.Infof("Succeeded registering channel %d.", i)

done := make(chan error)
go c.consume(msgs, done)
ch.NotifyClose(remoteClose)

go c.consume(i, msgs, done)
}

c.Log.Info("Waiting for messages...")

select {
case err := <-remoteClose:
return err

case <-ctx.Done():
c.canceled = true
err := c.Channel.Cancel(c.Tag, false)
if err == nil {
err = <-done
}
return err
return c.Cancel(done)

case err := <-done:
return err
}
}

func (c *Consumer) consume(msgs <-chan amqp.Delivery, done chan error) {
// Cancel marks the Consumer as cancelled, and then cancels all the Channels.
func (c *Consumer) Cancel(done chan error) error {
c.canceled = true
var firstError error
for i, ch := range c.Channels.Channels() {
c.Log.Infof("closing channel %d...", i)
err := ch.Cancel(c.Tag, false)
if err == nil {
err = <-done
}
if nil != err && nil == firstError {
firstError = err
}
}

return firstError
}

func (c *Consumer) consume(channel int, msgs <-chan amqp.Delivery, done chan error) {
for m := range msgs {
d := delivery.New(m)
if c.canceled {
d.Nack(true)
continue
}
if err := c.checkError(c.Processor.Process(d)); err != nil {
if err := c.checkError(c.Processor.Process(channel, d)); err != nil {
done <- err
return
}
Expand Down
Loading