From f1829c780f09dda9bd32dd03d7448248f131a3ef Mon Sep 17 00:00:00 2001 From: Brendan Clougherty Date: Fri, 9 Aug 2019 11:52:33 -0400 Subject: [PATCH 1/7] Adding support for using multiple RabbitMQ channels per connection --- config/config.go | 6 ++ consumer/channel_list.go | 63 ++++++++++++++++ consumer/config.go | 1 + consumer/consumer.go | 71 +++++++++++-------- consumer/consumer_consume_test.go | 32 +++++---- consumer/consumer_test.go | 13 ++-- consumer/mock_test.go | 4 +- consumer/setup.go | 17 +++-- consumer/setup_test.go | 4 +- go.sum | 6 ++ main.go | 11 +++ .../fixtures/TestProcessor_Run/error.golden | 10 +-- .../TestProcessor_Run/errorCapture.golden | 8 +-- .../fixtures/TestProcessor_Run/success.golden | 4 +- processor/processor.go | 18 ++--- processor/processor_test.go | 4 +- 16 files changed, 195 insertions(+), 77 deletions(-) create mode 100644 consumer/channel_list.go diff --git a/config/config.go b/config/config.go index 40dbe29..95fb1d0 100644 --- a/config/config.go +++ b/config/config.go @@ -21,6 +21,7 @@ type Config struct { Compression bool Onfailure int Stricfailure bool + NumChannels int } Prefetch struct { Count int @@ -217,6 +218,11 @@ func (c Config) QueueIsNoWait() bool { return c.QueueSettings.NoWait } +// NumChannels determines how many Channels to open on this Connection +func (c Config) NumChannels() int { + return c.RabbitMq.NumChannels +} + // ConsumerTag returns the tag used to identify the consumer. func (c Config) ConsumerTag() string { if v, set := os.LookupEnv("GO_WANT_HELPER_PROCESS"); set && v == "1" { diff --git a/consumer/channel_list.go b/consumer/channel_list.go new file mode 100644 index 0000000..a47c93e --- /dev/null +++ b/consumer/channel_list.go @@ -0,0 +1,63 @@ +package consumer + +import ( + "errors" + "fmt" + + "github.com/bketelsen/logr" +) + +// ChannelMultiplexer describes an object that holds multiple Channels +type ChannelMultiplexer interface { + AddChannel() + Channels() []Channel + FirstChannel() Channel +} + +// ChannelList is an implementation of the ChannelMultiplexer interface. +type ChannelList struct { + channels []Channel +} + +// NewChannelList creates a new ChannelList with a single Channel +func NewChannelList(conn Connection, len int, l logr.Logger) (*ChannelList, error) { + if (len < 1) { + return nil, fmt.Errorf("cannot create a ChannelList with less than one channel (got %d)", len) + } + + chs := make([]Channel, 0) + l.Infof("Opening %d channel(s)...", len) + for i := 0; i < len; i++ { + ch, err := conn.Channel() + if nil != err { + return nil, fmt.Errorf("failed to open a channel: %v", err) + } + l.Infof("Opened channel %d...", i) + chs = append(chs, ch) + } + l.Info("Done.") + + return &ChannelList{channels: chs}, nil +} + +// AddChannel adds the given channel to the end of the ChannelList +func (cl *ChannelList) AddChannel(c Channel) { + cl.channels = append(cl.channels, c) +} + +// Channels returns a slice of Channels represented by this ChannelList. +func (cl *ChannelList) Channels() []Channel { + return cl.channels +} + +// FirstChannel is a convenience function to get the first Channel in this ChannelList. +func (cl *ChannelList) FirstChannel() (Channel, error) { + var ch Channel + chs := cl.Channels() + if len(chs) < 1 { + return ch, errors.New("tried to get the first channel from an uninitialized ChannelList") + } + + ch = cl.channels[0] + return ch, nil +} diff --git a/consumer/config.go b/consumer/config.go index f982f63..8981fc1 100644 --- a/consumer/config.go +++ b/consumer/config.go @@ -17,6 +17,7 @@ type Config interface { HasPriority() bool MessageTTL() int32 MustDeclareQueue() bool + NumChannels() int PrefetchCount() int PrefetchIsGlobal() bool Priority() int32 diff --git a/consumer/consumer.go b/consumer/consumer.go index ad78c56..4346766 100644 --- a/consumer/consumer.go +++ b/consumer/consumer.go @@ -11,7 +11,7 @@ import ( type Consumer struct { Connection Connection - Channel Channel + Channels *ChannelList Queue string Tag string Processor processor.Processor @@ -21,10 +21,10 @@ type Consumer struct { // New creates a new consumer instance. The setup of the amqp connection and channel is expected to be done by the // calling code. -func New(conn Connection, ch Channel, p processor.Processor, l logr.Logger) *Consumer { +func New(conn Connection, chs *ChannelList, p processor.Processor, l logr.Logger) *Consumer { return &Consumer{ Connection: conn, - Channel: ch, + Channels: chs, Processor: p, Log: l, } @@ -40,20 +40,18 @@ func NewFromConfig(cfg Config, p processor.Processor, l logr.Logger) (*Consumer, } l.Info("Connected.") - l.Info("Opening channel...") - ch, err := conn.Channel() + cl, err := NewChannelList(conn, cfg.NumChannels(), l) if nil != err { - return nil, fmt.Errorf("failed to open a channel: %v", err) + return nil, fmt.Errorf("failed creating channel(s): %v", err) } - l.Info("Done.") - if err := Setup(cfg, ch, l); err != nil { + if err := Setup(cfg, cl, l); err != nil { return nil, err } return &Consumer{ Connection: conn, - Channel: ch, + Channels: cl, Queue: cfg.QueueName(), Tag: cfg.ConsumerTag(), Processor: p, @@ -63,46 +61,63 @@ func NewFromConfig(cfg Config, p processor.Processor, l logr.Logger) (*Consumer, // Consume subscribes itself to the message queue and starts consuming messages. func (c *Consumer) Consume(ctx context.Context) error { - c.Log.Info("Registering consumer... ") - msgs, err := c.Channel.Consume(c.Queue, c.Tag, false, false, false, false, nil) - if err != nil { - return fmt.Errorf("failed to register a consumer: %s", err) - } + remoteClose := make(chan *amqp.Error) + done := make(chan error) - c.Log.Info("Succeeded registering consumer.") - c.Log.Info("Waiting for messages...") + c.Log.Info("Registering channels... ") + for i, ch := range c.Channels.Channels() { + msgs, err := ch.Consume(c.Queue, c.Tag, false, false, false, false, nil) + if err != nil { + return fmt.Errorf("failed to register a channel: %s", err) + } - remoteClose := make(chan *amqp.Error) - c.Channel.NotifyClose(remoteClose) + c.Log.Infof("Succeeded registering channel %d.", i) - done := make(chan error) - go c.consume(msgs, done) + ch.NotifyClose(remoteClose) + + go c.consume(i, msgs, done) + } + + c.Log.Info("Waiting for messages...") select { case err := <-remoteClose: return err case <-ctx.Done(): - c.canceled = true - err := c.Channel.Cancel(c.Tag, false) - if err == nil { - err = <-done - } - return err + return c.Cancel(done) case err := <-done: return err } } -func (c *Consumer) consume(msgs <-chan amqp.Delivery, done chan error) { +// Cancel marks the Consumer as cancelled, and then cancels all the Channels. +func (c *Consumer) Cancel(done chan error) error { + c.canceled = true + var firstError error + for i, ch := range c.Channels.Channels() { + fmt.Printf("closing channel %d...", i) + err := ch.Cancel(c.Tag, false) + if err == nil { + err = <-done + } + if nil != err && nil == firstError { + firstError = err + } + } + + return firstError +} + +func (c *Consumer) consume(channel int, msgs <-chan amqp.Delivery, done chan error) { for m := range msgs { d := delivery.New(m) if c.canceled { d.Nack(true) continue } - if err := c.checkError(c.Processor.Process(d)); err != nil { + if err := c.checkError(c.Processor.Process(channel, d)); err != nil { done <- err return } diff --git a/consumer/consumer_consume_test.go b/consumer/consumer_consume_test.go index a263351..2e8a567 100644 --- a/consumer/consumer_consume_test.go +++ b/consumer/consumer_consume_test.go @@ -64,7 +64,9 @@ func newConsumeTest(name, output string, count uint64, cancelCount int, setup se func (ct *consumeTest) Run(t *testing.T) { exp := ct.Setup(t, ct) l := log.New(0) - c := consumer.New(nil, ct.ch, ct.p, l) + cl := &consumer.ChannelList{} + cl.AddChannel(ct.ch) + c := consumer.New(nil, cl, ct.p, l) c.Queue = t.Name() c.Tag = ct.Tag ctx, cancel := context.WithCancel(context.Background()) @@ -102,62 +104,62 @@ func (ct *consumeTest) produce(cancel func()) { var consumeTests = []*consumeTest{ newConsumeTest( "happy path", - "INFO Registering consumer... \nINFO Succeeded registering consumer.\nINFO Waiting for messages...\n", + "INFO Registering channels... \nINFO Succeeded registering channel 0.\nINFO Waiting for messages...\n", 3, intMax, func(t *testing.T, ct *consumeTest) error { ct.ch.On("Consume", t.Name(), "ctag", false, false, false, false, nilAmqpTable). Once(). Return(ct.msgs, nil) - ct.p.On("Process", delivery.New(ct.dd[0])).Once().Return(nil) - ct.p.On("Process", delivery.New(ct.dd[1])).Once().Return(nil) - ct.p.On("Process", delivery.New(ct.dd[2])).Once().Return(nil) + ct.p.On("Process", 0, delivery.New(ct.dd[0])).Once().Return(nil) + ct.p.On("Process", 0, delivery.New(ct.dd[1])).Once().Return(nil) + ct.p.On("Process", 0, delivery.New(ct.dd[2])).Once().Return(nil) return nil }, ), newSimpleConsumeTest( "consume error", - "INFO Registering consumer... \n", + "INFO Registering channels... \n", func(t *testing.T, ct *consumeTest) error { ct.ch.On("Consume", t.Name(), "ctag", false, false, false, false, nilAmqpTable). Once(). Return(nil, fmt.Errorf("consume error")) - return fmt.Errorf("failed to register a consumer: consume error") + return fmt.Errorf("failed to register a channel: consume error") }, ), newSimpleConsumeTest( "process error", - "INFO Registering consumer... \nINFO Succeeded registering consumer.\nINFO Waiting for messages...\n", + "INFO Registering channels... \nINFO Succeeded registering channel 0.\nINFO Waiting for messages...\n", func(t *testing.T, ct *consumeTest) error { err := fmt.Errorf("process error") ct.ch.On("Consume", t.Name(), "ctag", false, false, false, false, nilAmqpTable). Once(). Return(ct.msgs, nil) - ct.p.On("Process", delivery.New(ct.dd[0])).Once().Return(err) + ct.p.On("Process", 0, delivery.New(ct.dd[0])).Once().Return(err) return err }, ), newSimpleConsumeTest( "create command error", - "INFO Registering consumer... \nINFO Succeeded registering consumer.\nINFO Waiting for messages...\nERROR failed to register a consumer: create command error\n", + "INFO Registering channels... \nINFO Succeeded registering channel 0.\nINFO Waiting for messages...\nERROR failed to register a consumer: create command error\n", func(t *testing.T, ct *consumeTest) error { err := processor.NewCreateCommandError(fmt.Errorf("create command error")) ct.ch.On("Consume", t.Name(), "ctag", false, false, false, false, nilAmqpTable). Once(). Return(ct.msgs, nil) - ct.p.On("Process", delivery.New(ct.dd[0])).Once().Return(err) + ct.p.On("Process", 0, delivery.New(ct.dd[0])).Once().Return(err) return nil }, ), newSimpleConsumeTest( "ack error", - "INFO Registering consumer... \nINFO Succeeded registering consumer.\nINFO Waiting for messages...\n", + "INFO Registering channels... \nINFO Succeeded registering channel 0.\nINFO Waiting for messages...\n", func(t *testing.T, ct *consumeTest) error { err := processor.NewAcknowledgmentError(fmt.Errorf("ack error")) ct.ch.On("Consume", t.Name(), "ctag", false, false, false, false, nilAmqpTable). Once(). Return(ct.msgs, nil) - ct.p.On("Process", delivery.New(ct.dd[0])).Once().Return(err) + ct.p.On("Process", 0, delivery.New(ct.dd[0])).Once().Return(err) return err }, ), @@ -177,7 +179,9 @@ func TestConsumer_Consume_NotifyClose(t *testing.T) { ch.On("Consume", "", "", false, false, false, false, nilAmqpTable).Once().Return(d, nil) - c := consumer.New(nil, ch, new(TestProcessor), l) + cl := &consumer.ChannelList{} + cl.AddChannel(ch) + c := consumer.New(nil, cl, new(TestProcessor), l) go func() { done <- c.Consume(context.Background()) diff --git a/consumer/consumer_test.go b/consumer/consumer_test.go index 82a2ace..5f9868e 100644 --- a/consumer/consumer_test.go +++ b/consumer/consumer_test.go @@ -45,21 +45,24 @@ func testConsumerCancel(t *testing.T, err error) { close(msgs) }) ctx, cancel := context.WithCancel(context.Background()) - c := consumer.New(nil, ch, nil, log.New(0)) + cl := &consumer.ChannelList{} + cl.AddChannel(ch) + c := consumer.New(nil, cl, nil, log.New(0)) c.Queue = "queue" c.Tag = t.Name() go func() { done <- c.Consume(ctx) }() cancel() - assert.Equal(t, err, <-done) + cerr := <-done + assert.Equal(t, err, cerr) ch.AssertExpectations(t) } var cancelTests = []*consumeTest{ newConsumeTest( "skip remaining", - "INFO Registering consumer... \nINFO Succeeded registering consumer.\nINFO Waiting for messages...\n", + "INFO Registering channels... \nINFO Succeeded registering channel 0.\nINFO Waiting for messages...\n", 3, 1, func(t *testing.T, ct *consumeTest) error { @@ -67,7 +70,7 @@ var cancelTests = []*consumeTest{ Once(). Return(ct.msgs, nil) ct.ch.On("Cancel", ct.Tag, false).Return(nil) - ct.p.On("Process", delivery.New(ct.dd[0])).Return(nil).Run(func(_ mock.Arguments) { + ct.p.On("Process", 0, delivery.New(ct.dd[0])).Return(nil).Run(func(_ mock.Arguments) { ct.sync <- true <-ct.sync }) @@ -78,7 +81,7 @@ var cancelTests = []*consumeTest{ ), newConsumeTest( "no messages", - "INFO Registering consumer... \nINFO Succeeded registering consumer.\nINFO Waiting for messages...\n", + "INFO Registering channels... \nINFO Succeeded registering channel 0.\nINFO Waiting for messages...\n", 0, 0, func(t *testing.T, ct *consumeTest) error { diff --git a/consumer/mock_test.go b/consumer/mock_test.go index a2c1027..dd58723 100644 --- a/consumer/mock_test.go +++ b/consumer/mock_test.go @@ -104,8 +104,8 @@ type TestProcessor struct { mock.Mock } -func (p *TestProcessor) Process(d delivery.Delivery) error { - return p.Called(d).Error(0) +func (p *TestProcessor) Process(channel int, d delivery.Delivery) error { + return p.Called(channel, d).Error(0) } func (p *TestProcessor) Cancel() error { diff --git a/consumer/setup.go b/consumer/setup.go index 83a4dde..293ba93 100644 --- a/consumer/setup.go +++ b/consumer/setup.go @@ -7,8 +7,13 @@ import ( ) // Setup configures queues, exchanges and bindings in between according to the configuration. -func Setup(cfg Config, ch Channel, l logr.Logger) error { - if err := setupQoS(cfg, ch, l); err != nil { +func Setup(cfg Config, cl *ChannelList, l logr.Logger) error { + if err := setupQoS(cfg, cl, l); err != nil { + return err + } + + ch, err := cl.FirstChannel() + if nil != err { return err } @@ -28,10 +33,12 @@ func Setup(cfg Config, ch Channel, l logr.Logger) error { return nil } -func setupQoS(cfg Config, ch Channel, l logr.Logger) error { +func setupQoS(cfg Config, cl *ChannelList, l logr.Logger) error { l.Info("Setting QoS... ") - if err := ch.Qos(cfg.PrefetchCount(), 0, cfg.PrefetchIsGlobal()); err != nil { - return fmt.Errorf("failed to set QoS: %v", err) + for _, ch := range cl.Channels() { + if err := ch.Qos(cfg.PrefetchCount(), 0, cfg.PrefetchIsGlobal()); err != nil { + return fmt.Errorf("failed to set QoS: %v", err) + } } l.Info("Succeeded setting QoS.") return nil diff --git a/consumer/setup_test.go b/consumer/setup_test.go index f29a767..5d724a4 100644 --- a/consumer/setup_test.go +++ b/consumer/setup_test.go @@ -277,7 +277,9 @@ func TestQueueSettings(t *testing.T) { cfg, _ := config.LoadAndParse(fmt.Sprintf("fixtures/%s.conf", test.config)) ch := new(TestChannel) test.setup(ch) - assert.Equal(t, test.err, consumer.Setup(cfg, ch, log.New(0))) + cl := &consumer.ChannelList{} + cl.AddChannel(ch) + assert.Equal(t, test.err, consumer.Setup(cfg, cl, log.New(0))) ch.AssertExpectations(t) }) } diff --git a/go.sum b/go.sum index 93b9ac3..1e037a5 100644 --- a/go.sum +++ b/go.sum @@ -2,6 +2,7 @@ bou.ke/monkey v1.0.1 h1:zEMLInw9xvNakzUUPjfS4Ds6jYPqCFx3m7bRmG5NH2U= bou.ke/monkey v1.0.1/go.mod h1:FgHuK96Rv2Nlf+0u1OOVDpCMdsWyOFmeeketDHE7LIg= github.com/AlekSi/gocoverutil v0.2.0 h1:lpfoGyib/qbTh7PajsBL6Upv3Fnn2o3A6Mn0naFR0E8= github.com/AlekSi/gocoverutil v0.2.0/go.mod h1:/SQ8potkEzPK7N0+EyZi8sPtf/nK3BnHjw7tVmlDdUs= +github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973 h1:xJ4a3vCFaGF/jqvzLMYoU8P317H5OQ+Via4RmuPwCS0= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/bketelsen/logr v0.0.0-20170116012416-f3d070bdd1c5 h1:k5oblHm+AdEGmDZA1NwxXjKuinRB6WsEuloKQ3i5u5g= github.com/bketelsen/logr v0.0.0-20170116012416-f3d070bdd1c5/go.mod h1:to4EbfYTEzdQuHxVGz1ug+d7a3bqOjR1r02gJ1Xv4z8= @@ -15,14 +16,19 @@ github.com/golang/protobuf v1.2.0 h1:P3YflyNX/ehuJFLhxviNdFxQPkGK5cDcApsge1SqnvM github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/magiconair/properties v1.8.0 h1:LLgXmsheXeRoUOBOjtwPQCWIYqM/LU1ayDtDePerRcY= github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= +github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus/client_golang v0.9.2 h1:awm861/B8OKDd2I/6o1dy3ra4BamzKhYOiGItCeZ740= github.com/prometheus/client_golang v0.9.2/go.mod h1:OsXs2jCmiKlQ1lTBmv21f2mNfw4xf/QclQDMrYNZzcM= +github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910 h1:idejC8f05m9MGOsuEi1ATq9shN03HrxNkD/luQvxCv8= github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= +github.com/prometheus/common v0.0.0-20181126121408-4724e9255275 h1:PnBWHBf+6L0jOqq0gIVUe6Yk0/QMZ640k6NvkxcBf+8= github.com/prometheus/common v0.0.0-20181126121408-4724e9255275/go.mod h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro= +github.com/prometheus/procfs v0.0.0-20181204211112-1dc9a6cbc91a h1:9a8MnZMP0X2nLJdBg+pBmGgkJlSaKC2KaQmTCk1XDtE= github.com/prometheus/procfs v0.0.0-20181204211112-1dc9a6cbc91a/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= github.com/sebdah/goldie v0.0.0-20190305024101-629351c67c53 h1:qR9Fm180+oJEk1tyg+3wysrby5LvT9Y2vD4uKdLZlYw= github.com/sebdah/goldie v0.0.0-20190305024101-629351c67c53/go.mod h1:lvjGftC8oe7XPtyrOidaMi0rp5B9+XY/ZRUynGnuaxQ= diff --git a/main.go b/main.go index 3fdf796..4875c4b 100644 --- a/main.go +++ b/main.go @@ -79,6 +79,11 @@ var flags []cli.Flag = []cli.Flag{ Name: "no-declare", Usage: "prevents the queue from being declared.", }, + cli.IntFlag{ + Name: "num-channels, n", + Value: 1, + Usage: "Specifies the number of channels to use on this connection.", + }, cli.BoolFlag{ Name: "metrics, m", Usage: "enables metric to be exposed.", @@ -311,6 +316,12 @@ func LoadConfiguration(c *cli.Context) (*config.Config, error) { cfg.QueueSettings.Nodeclare = c.Bool("no-declare") } + cfg.RabbitMq.NumChannels = c.Int("num-channels") + + if (cfg.RabbitMq.NumChannels < 1) { + cfg.RabbitMq.NumChannels = 1 + } + return cfg, nil } diff --git a/processor/fixtures/TestProcessor_Run/error.golden b/processor/fixtures/TestProcessor_Run/error.golden index 7cba1b7..2a7c107 100644 --- a/processor/fixtures/TestProcessor_Run/error.golden +++ b/processor/fixtures/TestProcessor_Run/error.golden @@ -1,6 +1,6 @@ -INFO Processing message... -INFO Failed. Check error log for details. -ERROR Error: exit status 1 -ERROR Failed: lorem +INFO [Channel 1] Processing message... +INFO [Channel 1] Failed. Check error log for details. +ERROR [Channel 1] Error: exit status 1 +ERROR [Channel 1] Failed: lorem ipsum -INFO Processed! +INFO [Channel 1] Processed! diff --git a/processor/fixtures/TestProcessor_Run/errorCapture.golden b/processor/fixtures/TestProcessor_Run/errorCapture.golden index 00f71a9..b0e7d92 100644 --- a/processor/fixtures/TestProcessor_Run/errorCapture.golden +++ b/processor/fixtures/TestProcessor_Run/errorCapture.golden @@ -1,4 +1,4 @@ -INFO Processing message... -INFO Failed. Check error log for details. -ERROR Error: exit status 1 -INFO Processed! +INFO [Channel 1] Processing message... +INFO [Channel 1] Failed. Check error log for details. +ERROR [Channel 1] Error: exit status 1 +INFO [Channel 1] Processed! diff --git a/processor/fixtures/TestProcessor_Run/success.golden b/processor/fixtures/TestProcessor_Run/success.golden index c911554..afecddd 100644 --- a/processor/fixtures/TestProcessor_Run/success.golden +++ b/processor/fixtures/TestProcessor_Run/success.golden @@ -1,2 +1,2 @@ -INFO Processing message... -INFO Processed! +INFO [Channel 1] Processing message... +INFO [Channel 1] Processed! diff --git a/processor/processor.go b/processor/processor.go index 2ccb197..b3be857 100644 --- a/processor/processor.go +++ b/processor/processor.go @@ -17,7 +17,7 @@ import ( // Processor describes the interface used by the consumer to process messages. type Processor interface { - Process(delivery.Delivery) error + Process(int, delivery.Delivery) error } // New creates a new processor instance. @@ -36,7 +36,7 @@ type processor struct { // Process creates a new exec command using the builder and executes the command. The message gets acknowledged // according to the commands exit code using the acknowledger. -func (p *processor) Process(d delivery.Delivery) error { +func (p *processor) Process(channel int, d delivery.Delivery) error { p.mu.Lock() defer p.mu.Unlock() @@ -52,7 +52,7 @@ func (p *processor) Process(d delivery.Delivery) error { } start := time.Now() - exitCode := p.run() + exitCode := p.run(channel) collector.ProcessCounter.With(prometheus.Labels{"exit_code": strconv.Itoa(exitCode)}).Inc() collector.ProcessDuration.Observe(time.Since(start).Seconds()) @@ -67,9 +67,9 @@ func (p *processor) Process(d delivery.Delivery) error { return nil } -func (p *processor) run() int { - p.log.Info("Processing message...") - defer p.log.Info("Processed!") +func (p *processor) run(channel int) int { + p.log.Infof("[Channel %d] Processing message...", channel) + defer p.log.Infof("[Channel %d] Processed!", channel) var out []byte var err error @@ -82,10 +82,10 @@ func (p *processor) run() int { } if err != nil { - p.log.Info("Failed. Check error log for details.") - p.log.Errorf("Error: %s\n", err) + p.log.Infof("[Channel %d] Failed. Check error log for details.", channel) + p.log.Errorf("[Channel %d] Error: %s\n", channel, err) if capture { - p.log.Errorf("Failed: %s", string(out)) + p.log.Errorf("[Channel %d] Failed: %s", channel, string(out)) } return exitCode(err) diff --git a/processor/processor_test.go b/processor/processor_test.go index 43cfd9f..48081d5 100644 --- a/processor/processor_test.go +++ b/processor/processor_test.go @@ -47,7 +47,7 @@ func TestProcessor_Run(t *testing.T) { l := log.New(0) p := processor{log: l, cmd: test.cmd} - assert.Equal(t, p.run(), test.code) + assert.Equal(t, p.run(1), test.code) goldie.Assert(t, t.Name(), l.Buf().Bytes()) }) } @@ -99,7 +99,7 @@ func testProcessing(t *testing.T, name string, setup func(t *testing.T, a *TestA d.On("Info").Return(di) exp := setup(t, a, b, d) - err := p.Process(d) + err := p.Process(1, d) if len(exp) > 0 { assert.Equal(t, exp, err.Error()) From 6203daa2a18524cbf4d3c92533387a5c7dd0f124 Mon Sep 17 00:00:00 2001 From: Brendan Clougherty Date: Mon, 12 Aug 2019 12:20:02 -0400 Subject: [PATCH 2/7] Fixing go fmt issue, cleaning up the num-channels config --- consumer/channel_list.go | 2 +- main.go | 6 ++---- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/consumer/channel_list.go b/consumer/channel_list.go index a47c93e..c5ba55b 100644 --- a/consumer/channel_list.go +++ b/consumer/channel_list.go @@ -21,7 +21,7 @@ type ChannelList struct { // NewChannelList creates a new ChannelList with a single Channel func NewChannelList(conn Connection, len int, l logr.Logger) (*ChannelList, error) { - if (len < 1) { + if len < 1 { return nil, fmt.Errorf("cannot create a ChannelList with less than one channel (got %d)", len) } diff --git a/main.go b/main.go index 4875c4b..8785e25 100644 --- a/main.go +++ b/main.go @@ -316,10 +316,8 @@ func LoadConfiguration(c *cli.Context) (*config.Config, error) { cfg.QueueSettings.Nodeclare = c.Bool("no-declare") } - cfg.RabbitMq.NumChannels = c.Int("num-channels") - - if (cfg.RabbitMq.NumChannels < 1) { - cfg.RabbitMq.NumChannels = 1 + if c.IsSet("num-channels") { + cfg.RabbitMq.NumChannels = c.Int("num-channels") } return cfg, nil From 7aedfe575f75ddc18b328752092675f50aa09a65 Mon Sep 17 00:00:00 2001 From: Brendan Clougherty Date: Tue, 13 Aug 2019 11:51:27 -0400 Subject: [PATCH 3/7] Using ChannelMultiplexer interface instead of ChannelList struct --- consumer/channel_list.go | 4 ++-- consumer/consumer.go | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/consumer/channel_list.go b/consumer/channel_list.go index c5ba55b..881ed7a 100644 --- a/consumer/channel_list.go +++ b/consumer/channel_list.go @@ -9,9 +9,9 @@ import ( // ChannelMultiplexer describes an object that holds multiple Channels type ChannelMultiplexer interface { - AddChannel() + AddChannel(Channel) Channels() []Channel - FirstChannel() Channel + FirstChannel() (Channel, error) } // ChannelList is an implementation of the ChannelMultiplexer interface. diff --git a/consumer/consumer.go b/consumer/consumer.go index 4346766..14f623c 100644 --- a/consumer/consumer.go +++ b/consumer/consumer.go @@ -11,7 +11,7 @@ import ( type Consumer struct { Connection Connection - Channels *ChannelList + Channels ChannelMultiplexer Queue string Tag string Processor processor.Processor @@ -21,10 +21,10 @@ type Consumer struct { // New creates a new consumer instance. The setup of the amqp connection and channel is expected to be done by the // calling code. -func New(conn Connection, chs *ChannelList, p processor.Processor, l logr.Logger) *Consumer { +func New(conn Connection, cm ChannelMultiplexer, p processor.Processor, l logr.Logger) *Consumer { return &Consumer{ Connection: conn, - Channels: chs, + Channels: cm, Processor: p, Log: l, } From 12642a5b8b21fa0a240789684a27ef7c9ec53eab Mon Sep 17 00:00:00 2001 From: Brendan Clougherty Date: Tue, 13 Aug 2019 11:54:57 -0400 Subject: [PATCH 4/7] Implementing feedback from PR review --- consumer/channel_list.go | 19 ++++++++--- consumer/channel_list_test.go | 60 +++++++++++++++++++++++++++++++++++ consumer/consumer.go | 15 +++++++-- consumer/consumer_test.go | 4 +-- consumer/setup.go | 22 +------------ consumer/setup_test.go | 43 +------------------------ 6 files changed, 91 insertions(+), 72 deletions(-) create mode 100644 consumer/channel_list_test.go diff --git a/consumer/channel_list.go b/consumer/channel_list.go index 881ed7a..f81fea7 100644 --- a/consumer/channel_list.go +++ b/consumer/channel_list.go @@ -12,6 +12,7 @@ type ChannelMultiplexer interface { AddChannel(Channel) Channels() []Channel FirstChannel() (Channel, error) + Qos(prefetchCount, prefetchSize int, global bool) error } // ChannelList is an implementation of the ChannelMultiplexer interface. @@ -52,12 +53,20 @@ func (cl *ChannelList) Channels() []Channel { // FirstChannel is a convenience function to get the first Channel in this ChannelList. func (cl *ChannelList) FirstChannel() (Channel, error) { - var ch Channel - chs := cl.Channels() - if len(chs) < 1 { + if len(cl.channels) < 1 { + var ch Channel return ch, errors.New("tried to get the first channel from an uninitialized ChannelList") } - ch = cl.channels[0] - return ch, nil + return cl.channels[0], nil +} + +// Qos sets Qos settings for all Channels in this ChannelList. +func (cl *ChannelList) Qos(prefetchCount, prefetchSize int, global bool) error { + for i, ch := range cl.channels { + if err := ch.Qos(prefetchCount, prefetchSize, global); err != nil { + return fmt.Errorf("failed to set QoS on channel %d: %v", i, err) + } + } + return nil } diff --git a/consumer/channel_list_test.go b/consumer/channel_list_test.go new file mode 100644 index 0000000..16bdc81 --- /dev/null +++ b/consumer/channel_list_test.go @@ -0,0 +1,60 @@ +package consumer_test + +import ( + "fmt" + "testing" + + "github.com/corvus-ch/rabbitmq-cli-consumer/config" + "github.com/corvus-ch/rabbitmq-cli-consumer/consumer" + "github.com/stretchr/testify/assert" +) + +const qosConfig = "qos" + +var channelListTests = []struct { + name string + config string + setup func(*consumer.ChannelList) + err error +}{ + // Set QoS. + { + "setQos", + qosConfig, + func(cl *consumer.ChannelList) { + for _, ch := range cl.Channels() { + tc := ch.(*TestChannel) + tc.On("Qos", 42, 0, true).Return(nil).Once() + } + }, + nil, + }, + // Set QoS fails. + { + "setQosFail", + qosConfig, + func(cl *consumer.ChannelList) { + for _, ch := range cl.Channels() { + tc := ch.(*TestChannel) + tc.On("Qos", 42, 0, true).Return(fmt.Errorf("QoS error")).Once() + } + }, + fmt.Errorf("failed to set QoS on channel 0: QoS error"), + }, +} + +func TestChannelList(t *testing.T) { + for _, test := range channelListTests { + t.Run(test.name, func(t *testing.T) { + cfg, _ := config.LoadAndParse(fmt.Sprintf("fixtures/%s.conf", test.config)) + cl := &consumer.ChannelList{} + cl.AddChannel(new(TestChannel)) + test.setup(cl) + assert.Equal(t, test.err, cl.Qos(cfg.PrefetchCount(), 0, cfg.PrefetchIsGlobal())) + for _, ch := range cl.Channels() { + tc := ch.(*TestChannel) + tc.AssertExpectations(t) + } + }) + } +} diff --git a/consumer/consumer.go b/consumer/consumer.go index 14f623c..512e7bc 100644 --- a/consumer/consumer.go +++ b/consumer/consumer.go @@ -45,7 +45,18 @@ func NewFromConfig(cfg Config, p processor.Processor, l logr.Logger) (*Consumer, return nil, fmt.Errorf("failed creating channel(s): %v", err) } - if err := Setup(cfg, cl, l); err != nil { + l.Info("Setting QoS... ") + if err := cl.Qos(cfg.PrefetchCount(), 0, cfg.PrefetchIsGlobal()); err != nil { + return nil, err + } + l.Info("Succeeded setting QoS.") + + ch, err := cl.FirstChannel() + if nil != err { + return nil, err + } + + if err := Setup(cfg, ch, l); err != nil { return nil, err } @@ -97,7 +108,7 @@ func (c *Consumer) Cancel(done chan error) error { c.canceled = true var firstError error for i, ch := range c.Channels.Channels() { - fmt.Printf("closing channel %d...", i) + c.Log.Infof("closing channel %d...", i) err := ch.Cancel(c.Tag, false) if err == nil { err = <-done diff --git a/consumer/consumer_test.go b/consumer/consumer_test.go index 5f9868e..2feb712 100644 --- a/consumer/consumer_test.go +++ b/consumer/consumer_test.go @@ -62,7 +62,7 @@ func testConsumerCancel(t *testing.T, err error) { var cancelTests = []*consumeTest{ newConsumeTest( "skip remaining", - "INFO Registering channels... \nINFO Succeeded registering channel 0.\nINFO Waiting for messages...\n", + "INFO Registering channels... \nINFO Succeeded registering channel 0.\nINFO Waiting for messages...\nINFO closing channel 0...\n", 3, 1, func(t *testing.T, ct *consumeTest) error { @@ -81,7 +81,7 @@ var cancelTests = []*consumeTest{ ), newConsumeTest( "no messages", - "INFO Registering channels... \nINFO Succeeded registering channel 0.\nINFO Waiting for messages...\n", + "INFO Registering channels... \nINFO Succeeded registering channel 0.\nINFO Waiting for messages...\nINFO closing channel 0...\n", 0, 0, func(t *testing.T, ct *consumeTest) error { diff --git a/consumer/setup.go b/consumer/setup.go index 293ba93..97e3de8 100644 --- a/consumer/setup.go +++ b/consumer/setup.go @@ -7,16 +7,7 @@ import ( ) // Setup configures queues, exchanges and bindings in between according to the configuration. -func Setup(cfg Config, cl *ChannelList, l logr.Logger) error { - if err := setupQoS(cfg, cl, l); err != nil { - return err - } - - ch, err := cl.FirstChannel() - if nil != err { - return err - } - +func Setup(cfg Config, ch Channel, l logr.Logger) error { if cfg.MustDeclareQueue() { if err := declareQueue(cfg, ch, l); err != nil { return err @@ -33,17 +24,6 @@ func Setup(cfg Config, cl *ChannelList, l logr.Logger) error { return nil } -func setupQoS(cfg Config, cl *ChannelList, l logr.Logger) error { - l.Info("Setting QoS... ") - for _, ch := range cl.Channels() { - if err := ch.Qos(cfg.PrefetchCount(), 0, cfg.PrefetchIsGlobal()); err != nil { - return fmt.Errorf("failed to set QoS: %v", err) - } - } - l.Info("Succeeded setting QoS.") - return nil -} - func declareQueue(cfg Config, ch Channel, l logr.Logger) error { l.Infof("Declaring queue \"%s\"...", cfg.QueueName()) _, err := ch.QueueDeclare( diff --git a/consumer/setup_test.go b/consumer/setup_test.go index 5d724a4..809ff95 100644 --- a/consumer/setup_test.go +++ b/consumer/setup_test.go @@ -19,7 +19,6 @@ const ( noRoutingKeyConfig = "no_routing" oneEmptyRoutingKeyConfig = "empty_routing" priorityConfig = "priority" - qosConfig = "qos" routingConfig = "routing" simpleExchangeConfig = "exchange" ttlConfig = "ttl" @@ -45,7 +44,6 @@ var queueTests = []struct { "simpleQueue", defaultConfig, func(ch *TestChannel) { - ch.On("Qos", 3, 0, false).Return(nil).Once() ch.On("QueueDeclare", "defaultQueue", true, false, false, false, emptyAmqpTable).Return(amqp.Queue{}, nil).Once() }, nil, @@ -55,7 +53,6 @@ var queueTests = []struct { "queueWithTTL", ttlConfig, func(ch *TestChannel) { - ch.On("Qos", 3, 0, false).Return(nil).Once() ch.On("QueueDeclare", "ttlQueue", true, false, false, false, amqp.Table{"x-message-ttl": int32(1200)}).Return(amqp.Queue{}, nil).Once() }, nil, @@ -65,7 +62,6 @@ var queueTests = []struct { "queueWithPriority", priorityConfig, func(ch *TestChannel) { - ch.On("Qos", 3, 0, false).Return(nil).Once() ch.On("QueueDeclare", "priorityWorker", true, false, false, false, amqp.Table{"x-max-priority": int32(42)}).Return(amqp.Queue{}, nil).Once() ch.On("ExchangeDeclare", "priorityExchange", "priorityType", false, false, false, false, emptyAmqpTable).Return(nil).Once() ch.On("QueueBind", "priorityWorker", "", "priorityExchange", false, nilAmqpTable).Return(nil).Once() @@ -77,7 +73,6 @@ var queueTests = []struct { "queueWithMultipleRoutingKeys", multipleRoutingKeysConfig, func(ch *TestChannel) { - ch.On("Qos", 3, 0, false).Return(nil).Once() ch.On("QueueDeclare", "multiRoutingQueue", true, false, false, false, emptyAmqpTable).Return(amqp.Queue{}, nil).Once() ch.On("ExchangeDeclare", "multiRoutingExchange", "multiRoutingType", false, false, false, false, emptyAmqpTable).Return(nil).Once() ch.On("QueueBind", "multiRoutingQueue", "foo", "multiRoutingExchange", false, nilAmqpTable).Return(nil).Once() @@ -90,7 +85,6 @@ var queueTests = []struct { "queueWithOneEmptyRoutingKey", oneEmptyRoutingKeyConfig, func(ch *TestChannel) { - ch.On("Qos", 3, 0, false).Return(nil).Once() ch.On("QueueDeclare", "emptyRoutingQueue", true, false, false, false, emptyAmqpTable).Return(amqp.Queue{}, nil).Once() ch.On("ExchangeDeclare", "emptyRoutingExchange", "emptyRoutingType", false, false, false, false, emptyAmqpTable).Return(nil).Once() ch.On("QueueBind", "emptyRoutingQueue", "", "emptyRoutingExchange", false, nilAmqpTable).Return(nil).Once() @@ -102,38 +96,17 @@ var queueTests = []struct { "queueWithoutRoutingKey", noRoutingKeyConfig, func(ch *TestChannel) { - ch.On("Qos", 3, 0, false).Return(nil).Once() ch.On("QueueDeclare", "noRoutingQueue", true, false, false, false, emptyAmqpTable).Return(amqp.Queue{}, nil).Once() ch.On("ExchangeDeclare", "noRoutingExchange", "noRoutingType", false, false, false, false, emptyAmqpTable).Return(nil).Once() ch.On("QueueBind", "noRoutingQueue", "", "noRoutingExchange", false, nilAmqpTable).Return(nil).Once() }, nil, }, - // Set QoS. - { - "setQos", - qosConfig, - func(ch *TestChannel) { - ch.On("Qos", 42, 0, true).Return(nil).Once() - ch.On("QueueDeclare", "qosQueue", true, false, false, false, emptyAmqpTable).Return(amqp.Queue{}, nil).Once() - }, - nil, - }, - // Set QoS fails. - { - "setQosFail", - qosConfig, - func(ch *TestChannel) { - ch.On("Qos", 42, 0, true).Return(fmt.Errorf("QoS error")).Once() - }, - fmt.Errorf("failed to set QoS: QoS error"), - }, // Declare queue fails. { "declareQueueFail", defaultConfig, func(ch *TestChannel) { - ch.On("Qos", 3, 0, false).Return(nil).Once() ch.On("QueueDeclare", "defaultQueue", true, false, false, false, emptyAmqpTable).Return(amqp.Queue{}, fmt.Errorf("queue error")).Once() }, fmt.Errorf("failed to declare queue: queue error"), @@ -143,7 +116,6 @@ var queueTests = []struct { "declareExchange", simpleExchangeConfig, func(ch *TestChannel) { - ch.On("Qos", 3, 0, false).Return(nil).Once() ch.On("QueueDeclare", "queueName", true, false, false, false, emptyAmqpTable).Return(amqp.Queue{}, nil).Once() ch.On("ExchangeDeclare", "exchangeName", "exchangeType", false, false, false, false, emptyAmqpTable).Return(nil).Once() ch.On("QueueBind", "queueName", "", "exchangeName", false, nilAmqpTable).Return(nil).Once() @@ -155,7 +127,6 @@ var queueTests = []struct { "declareDurableExchange", durableExchangeConfig, func(ch *TestChannel) { - ch.On("Qos", 3, 0, false).Return(nil).Once() ch.On("QueueDeclare", "queueName", true, false, false, false, emptyAmqpTable).Return(amqp.Queue{}, nil).Once() ch.On("ExchangeDeclare", "exchangeName", "exchangeType", true, false, false, false, emptyAmqpTable).Return(nil).Once() ch.On("QueueBind", "queueName", "", "exchangeName", false, nilAmqpTable).Return(nil).Once() @@ -167,7 +138,6 @@ var queueTests = []struct { "declareAutoDeleteExchange", autodeleteExchangeConfig, func(ch *TestChannel) { - ch.On("Qos", 3, 0, false).Return(nil).Once() ch.On("QueueDeclare", "queueName", true, false, false, false, emptyAmqpTable).Return(amqp.Queue{}, nil).Once() ch.On("ExchangeDeclare", "exchangeName", "exchangeType", false, true, false, false, emptyAmqpTable).Return(nil).Once() ch.On("QueueBind", "queueName", "", "exchangeName", false, nilAmqpTable).Return(nil).Once() @@ -179,7 +149,6 @@ var queueTests = []struct { "declareExchangeFail", simpleExchangeConfig, func(ch *TestChannel) { - ch.On("Qos", 3, 0, false).Return(nil).Once() ch.On("QueueDeclare", "queueName", true, false, false, false, emptyAmqpTable).Return(amqp.Queue{}, nil).Once() ch.On("ExchangeDeclare", "exchangeName", "exchangeType", false, false, false, false, emptyAmqpTable).Return(fmt.Errorf("declare exchagne error")).Once() }, @@ -190,7 +159,6 @@ var queueTests = []struct { "bindQueue", routingConfig, func(ch *TestChannel) { - ch.On("Qos", 3, 0, false).Return(nil).Once() ch.On("QueueDeclare", "routingQueue", true, false, false, false, emptyAmqpTable).Return(amqp.Queue{}, nil).Once() ch.On("ExchangeDeclare", "routingExchange", "routingType", false, false, false, false, emptyAmqpTable).Return(nil).Once() ch.On("QueueBind", "routingQueue", "routingKey", "routingExchange", false, nilAmqpTable).Return(nil).Once() @@ -202,7 +170,6 @@ var queueTests = []struct { "bindQueueFail", routingConfig, func(ch *TestChannel) { - ch.On("Qos", 3, 0, false).Return(nil).Once() ch.On("QueueDeclare", "routingQueue", true, false, false, false, emptyAmqpTable).Return(amqp.Queue{}, nil).Once() ch.On("ExchangeDeclare", "routingExchange", "routingType", false, false, false, false, emptyAmqpTable).Return(nil).Once() ch.On("QueueBind", "routingQueue", "routingKey", "routingExchange", false, nilAmqpTable).Return(fmt.Errorf("queue bind error")).Once() @@ -214,7 +181,6 @@ var queueTests = []struct { "durableQueue", durableQueue, func(ch *TestChannel) { - ch.On("Qos", 3, 0, false).Return(nil).Once() ch.On("QueueDeclare", "durableQueue", true, false, false, false, emptyAmqpTable).Return(amqp.Queue{}, nil).Once() }, nil, @@ -224,7 +190,6 @@ var queueTests = []struct { "nonDurableQueue", nonDurableQueue, func(ch *TestChannel) { - ch.On("Qos", 3, 0, false).Return(nil).Once() ch.On("QueueDeclare", "nonDurableQueue", false, false, false, false, emptyAmqpTable).Return(amqp.Queue{}, nil).Once() }, nil, @@ -234,7 +199,6 @@ var queueTests = []struct { "defaultQueueDurability", defaultQueueDurability, func(ch *TestChannel) { - ch.On("Qos", 3, 0, false).Return(nil).Once() ch.On("QueueDeclare", "defaultQueueDurability", true, false, false, false, emptyAmqpTable).Return(amqp.Queue{}, nil).Once() }, nil, @@ -244,7 +208,6 @@ var queueTests = []struct { "autoDeleteQueue", autoDeleteQueue, func(ch *TestChannel) { - ch.On("Qos", 3, 0, false).Return(nil).Once() ch.On("QueueDeclare", "autoDeleteQueue", true, true, false, false, emptyAmqpTable).Return(amqp.Queue{}, nil).Once() }, nil, @@ -254,7 +217,6 @@ var queueTests = []struct { "exclusiveQueue", exclusiveQueue, func(ch *TestChannel) { - ch.On("Qos", 3, 0, false).Return(nil).Once() ch.On("QueueDeclare", "exclusiveQueue", true, false, true, false, emptyAmqpTable).Return(amqp.Queue{}, nil).Once() }, nil, @@ -264,7 +226,6 @@ var queueTests = []struct { "noWaitQueue", noWaitQueue, func(ch *TestChannel) { - ch.On("Qos", 3, 0, false).Return(nil).Once() ch.On("QueueDeclare", "noWaitQueue", true, false, false, true, emptyAmqpTable).Return(amqp.Queue{}, nil).Once() }, nil, @@ -277,9 +238,7 @@ func TestQueueSettings(t *testing.T) { cfg, _ := config.LoadAndParse(fmt.Sprintf("fixtures/%s.conf", test.config)) ch := new(TestChannel) test.setup(ch) - cl := &consumer.ChannelList{} - cl.AddChannel(ch) - assert.Equal(t, test.err, consumer.Setup(cfg, cl, log.New(0))) + assert.Equal(t, test.err, consumer.Setup(cfg, ch, log.New(0))) ch.AssertExpectations(t) }) } From 410ed7e47416c7496e7789f4d442932faf254747 Mon Sep 17 00:00:00 2001 From: Brendan Clougherty Date: Wed, 14 Aug 2019 15:53:50 -0400 Subject: [PATCH 5/7] Setting a default num-channels (1) when reading from a config file --- config/config.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/config/config.go b/config/config.go index 95fb1d0..32fc338 100644 --- a/config/config.go +++ b/config/config.go @@ -251,6 +251,7 @@ func LoadAndParse(location string) (*Config, error) { cfg := &Config{} SetDefaultQueueDurability(cfg) + SetDefaultNumChannels(cfg) if err := gcfg.ReadFileInto(cfg, location); err != nil { return nil, err @@ -263,6 +264,7 @@ func CreateFromString(data string) (*Config, error) { cfg := &Config{} SetDefaultQueueDurability(cfg) + SetDefaultNumChannels(cfg) if err := gcfg.ReadStringInto(cfg, data); err != nil { return nil, err @@ -276,6 +278,11 @@ func SetDefaultQueueDurability(cfg *Config) { cfg.QueueSettings.Durable = true } +// SetDefaultNumChannels sets NumChannels to 1 to keep backwards compatibility +func SetDefaultNumChannels(cfg *Config) { + cfg.RabbitMq.NumChannels = 1 +} + func transformToStringValue(val string) string { if val == "" { return "" From 52a4b290e20f06d538a3236857d2810cb67decf8 Mon Sep 17 00:00:00 2001 From: Brendan Clougherty Date: Wed, 14 Aug 2019 15:54:20 -0400 Subject: [PATCH 6/7] Updating integration test golden files for new log output --- fixtures/TestEndToEnd/amqpUrlOutput.golden | 11 ++++++----- fixtures/TestEndToEnd/compressedOutput.golden | 11 ++++++----- fixtures/TestEndToEnd/declareErrorOutput.golden | 3 ++- fixtures/TestEndToEnd/defaultOutput.golden | 11 ++++++----- fixtures/TestEndToEnd/envAmqpUrlNoConfigOutput.golden | 11 ++++++----- fixtures/TestEndToEnd/envAmqpUrlOutput.golden | 11 ++++++----- fixtures/TestEndToEnd/noAmqpUrlOutput.golden | 11 ++++++----- fixtures/TestEndToEnd/noDeclareConfigOutput.golden | 7 ++++--- fixtures/TestEndToEnd/noDeclareOutput.golden | 7 ++++--- fixtures/TestEndToEnd/noLogsOutput.golden | 11 ++++++----- fixtures/TestEndToEnd/outputOutput.golden | 11 ++++++----- fixtures/TestEndToEnd/pipeOutput.golden | 11 ++++++----- fixtures/TestEndToEnd/propertiesOutput.golden | 11 ++++++----- fixtures/TestEndToEnd/queueNameOutput.golden | 11 ++++++----- 14 files changed, 76 insertions(+), 62 deletions(-) diff --git a/fixtures/TestEndToEnd/amqpUrlOutput.golden b/fixtures/TestEndToEnd/amqpUrlOutput.golden index d853324..e8b3661 100644 --- a/fixtures/TestEndToEnd/amqpUrlOutput.golden +++ b/fixtures/TestEndToEnd/amqpUrlOutput.golden @@ -1,13 +1,14 @@ Connecting RabbitMQ... Connected. -Opening channel... +Opening 1 channel(s)... +Opened channel 0... Done. Setting QoS... Succeeded setting QoS. Declaring queue "test"... Metrics disabled. -Registering consumer... -Succeeded registering consumer. +Registering channels... +Succeeded registering channel 0. Waiting for messages... -Processing message... -Processed! +[Channel 0] Processing message... +[Channel 0] Processed! diff --git a/fixtures/TestEndToEnd/compressedOutput.golden b/fixtures/TestEndToEnd/compressedOutput.golden index 6c23959..6b09ab3 100644 --- a/fixtures/TestEndToEnd/compressedOutput.golden +++ b/fixtures/TestEndToEnd/compressedOutput.golden @@ -1,14 +1,15 @@ Connecting RabbitMQ... Connected. -Opening channel... +Opening 1 channel(s)... +Opened channel 0... Done. Setting QoS... Succeeded setting QoS. Declaring queue "test"... Metrics disabled. -Registering consumer... -Succeeded registering consumer. +Registering channels... +Succeeded registering channel 0. Waiting for messages... Compressed message -Processing message... -Processed! +[Channel 0] Processing message... +[Channel 0] Processed! diff --git a/fixtures/TestEndToEnd/declareErrorOutput.golden b/fixtures/TestEndToEnd/declareErrorOutput.golden index a2de418..0c2e798 100644 --- a/fixtures/TestEndToEnd/declareErrorOutput.golden +++ b/fixtures/TestEndToEnd/declareErrorOutput.golden @@ -1,6 +1,7 @@ Connecting RabbitMQ... Connected. -Opening channel... +Opening 1 channel(s)... +Opened channel 0... Done. Setting QoS... Succeeded setting QoS. diff --git a/fixtures/TestEndToEnd/defaultOutput.golden b/fixtures/TestEndToEnd/defaultOutput.golden index d853324..e8b3661 100644 --- a/fixtures/TestEndToEnd/defaultOutput.golden +++ b/fixtures/TestEndToEnd/defaultOutput.golden @@ -1,13 +1,14 @@ Connecting RabbitMQ... Connected. -Opening channel... +Opening 1 channel(s)... +Opened channel 0... Done. Setting QoS... Succeeded setting QoS. Declaring queue "test"... Metrics disabled. -Registering consumer... -Succeeded registering consumer. +Registering channels... +Succeeded registering channel 0. Waiting for messages... -Processing message... -Processed! +[Channel 0] Processing message... +[Channel 0] Processed! diff --git a/fixtures/TestEndToEnd/envAmqpUrlNoConfigOutput.golden b/fixtures/TestEndToEnd/envAmqpUrlNoConfigOutput.golden index d853324..e8b3661 100644 --- a/fixtures/TestEndToEnd/envAmqpUrlNoConfigOutput.golden +++ b/fixtures/TestEndToEnd/envAmqpUrlNoConfigOutput.golden @@ -1,13 +1,14 @@ Connecting RabbitMQ... Connected. -Opening channel... +Opening 1 channel(s)... +Opened channel 0... Done. Setting QoS... Succeeded setting QoS. Declaring queue "test"... Metrics disabled. -Registering consumer... -Succeeded registering consumer. +Registering channels... +Succeeded registering channel 0. Waiting for messages... -Processing message... -Processed! +[Channel 0] Processing message... +[Channel 0] Processed! diff --git a/fixtures/TestEndToEnd/envAmqpUrlOutput.golden b/fixtures/TestEndToEnd/envAmqpUrlOutput.golden index d853324..e8b3661 100644 --- a/fixtures/TestEndToEnd/envAmqpUrlOutput.golden +++ b/fixtures/TestEndToEnd/envAmqpUrlOutput.golden @@ -1,13 +1,14 @@ Connecting RabbitMQ... Connected. -Opening channel... +Opening 1 channel(s)... +Opened channel 0... Done. Setting QoS... Succeeded setting QoS. Declaring queue "test"... Metrics disabled. -Registering consumer... -Succeeded registering consumer. +Registering channels... +Succeeded registering channel 0. Waiting for messages... -Processing message... -Processed! +[Channel 0] Processing message... +[Channel 0] Processed! diff --git a/fixtures/TestEndToEnd/noAmqpUrlOutput.golden b/fixtures/TestEndToEnd/noAmqpUrlOutput.golden index d853324..e8b3661 100644 --- a/fixtures/TestEndToEnd/noAmqpUrlOutput.golden +++ b/fixtures/TestEndToEnd/noAmqpUrlOutput.golden @@ -1,13 +1,14 @@ Connecting RabbitMQ... Connected. -Opening channel... +Opening 1 channel(s)... +Opened channel 0... Done. Setting QoS... Succeeded setting QoS. Declaring queue "test"... Metrics disabled. -Registering consumer... -Succeeded registering consumer. +Registering channels... +Succeeded registering channel 0. Waiting for messages... -Processing message... -Processed! +[Channel 0] Processing message... +[Channel 0] Processed! diff --git a/fixtures/TestEndToEnd/noDeclareConfigOutput.golden b/fixtures/TestEndToEnd/noDeclareConfigOutput.golden index d6525eb..9864423 100644 --- a/fixtures/TestEndToEnd/noDeclareConfigOutput.golden +++ b/fixtures/TestEndToEnd/noDeclareConfigOutput.golden @@ -1,10 +1,11 @@ Connecting RabbitMQ... Connected. -Opening channel... +Opening 1 channel(s)... +Opened channel 0... Done. Setting QoS... Succeeded setting QoS. Metrics disabled. -Registering consumer... -Succeeded registering consumer. +Registering channels... +Succeeded registering channel 0. Waiting for messages... diff --git a/fixtures/TestEndToEnd/noDeclareOutput.golden b/fixtures/TestEndToEnd/noDeclareOutput.golden index d6525eb..9864423 100644 --- a/fixtures/TestEndToEnd/noDeclareOutput.golden +++ b/fixtures/TestEndToEnd/noDeclareOutput.golden @@ -1,10 +1,11 @@ Connecting RabbitMQ... Connected. -Opening channel... +Opening 1 channel(s)... +Opened channel 0... Done. Setting QoS... Succeeded setting QoS. Metrics disabled. -Registering consumer... -Succeeded registering consumer. +Registering channels... +Succeeded registering channel 0. Waiting for messages... diff --git a/fixtures/TestEndToEnd/noLogsOutput.golden b/fixtures/TestEndToEnd/noLogsOutput.golden index 8007438..d223043 100644 --- a/fixtures/TestEndToEnd/noLogsOutput.golden +++ b/fixtures/TestEndToEnd/noLogsOutput.golden @@ -1,16 +1,17 @@ Connecting RabbitMQ... Connected. -Opening channel... +Opening 1 channel(s)... +Opened channel 0... Done. Setting QoS... Succeeded setting QoS. Declaring queue "test"... Metrics disabled. -Registering consumer... -Succeeded registering consumer. +Registering channels... +Succeeded registering channel 0. Waiting for messages... -Processing message... +[Channel 0] Processing message... Got executed bm9Mb2dz noLogs -Processed! +[Channel 0] Processed! diff --git a/fixtures/TestEndToEnd/outputOutput.golden b/fixtures/TestEndToEnd/outputOutput.golden index 347f528..7777fbf 100644 --- a/fixtures/TestEndToEnd/outputOutput.golden +++ b/fixtures/TestEndToEnd/outputOutput.golden @@ -1,16 +1,17 @@ Connecting RabbitMQ... Connected. -Opening channel... +Opening 1 channel(s)... +Opened channel 0... Done. Setting QoS... Succeeded setting QoS. Declaring queue "test"... Metrics disabled. -Registering consumer... -Succeeded registering consumer. +Registering channels... +Succeeded registering channel 0. Waiting for messages... -Processing message... +[Channel 0] Processing message... Got executed b3V0cHV0 output -Processed! +[Channel 0] Processed! diff --git a/fixtures/TestEndToEnd/pipeOutput.golden b/fixtures/TestEndToEnd/pipeOutput.golden index d853324..e8b3661 100644 --- a/fixtures/TestEndToEnd/pipeOutput.golden +++ b/fixtures/TestEndToEnd/pipeOutput.golden @@ -1,13 +1,14 @@ Connecting RabbitMQ... Connected. -Opening channel... +Opening 1 channel(s)... +Opened channel 0... Done. Setting QoS... Succeeded setting QoS. Declaring queue "test"... Metrics disabled. -Registering consumer... -Succeeded registering consumer. +Registering channels... +Succeeded registering channel 0. Waiting for messages... -Processing message... -Processed! +[Channel 0] Processing message... +[Channel 0] Processed! diff --git a/fixtures/TestEndToEnd/propertiesOutput.golden b/fixtures/TestEndToEnd/propertiesOutput.golden index d853324..e8b3661 100644 --- a/fixtures/TestEndToEnd/propertiesOutput.golden +++ b/fixtures/TestEndToEnd/propertiesOutput.golden @@ -1,13 +1,14 @@ Connecting RabbitMQ... Connected. -Opening channel... +Opening 1 channel(s)... +Opened channel 0... Done. Setting QoS... Succeeded setting QoS. Declaring queue "test"... Metrics disabled. -Registering consumer... -Succeeded registering consumer. +Registering channels... +Succeeded registering channel 0. Waiting for messages... -Processing message... -Processed! +[Channel 0] Processing message... +[Channel 0] Processed! diff --git a/fixtures/TestEndToEnd/queueNameOutput.golden b/fixtures/TestEndToEnd/queueNameOutput.golden index ff3e47e..25224c4 100644 --- a/fixtures/TestEndToEnd/queueNameOutput.golden +++ b/fixtures/TestEndToEnd/queueNameOutput.golden @@ -1,13 +1,14 @@ Connecting RabbitMQ... Connected. -Opening channel... +Opening 1 channel(s)... +Opened channel 0... Done. Setting QoS... Succeeded setting QoS. Declaring queue "altTest"... Metrics disabled. -Registering consumer... -Succeeded registering consumer. +Registering channels... +Succeeded registering channel 0. Waiting for messages... -Processing message... -Processed! +[Channel 0] Processing message... +[Channel 0] Processed! From 26c57d781379bf9ddcb59a9224488892c295c0dc Mon Sep 17 00:00:00 2001 From: Brendan Clougherty Date: Thu, 22 Aug 2019 12:32:57 -0400 Subject: [PATCH 7/7] Enabling parallel processing of worker commands --- processor/processor.go | 18 +++++++----------- processor/processor_test.go | 4 ++-- 2 files changed, 9 insertions(+), 13 deletions(-) diff --git a/processor/processor.go b/processor/processor.go index b3be857..f396474 100644 --- a/processor/processor.go +++ b/processor/processor.go @@ -31,20 +31,16 @@ type processor struct { ack acknowledger.Acknowledger log logr.Logger mu sync.Mutex - cmd *exec.Cmd } // Process creates a new exec command using the builder and executes the command. The message gets acknowledged // according to the commands exit code using the acknowledger. func (p *processor) Process(channel int, d delivery.Delivery) error { - p.mu.Lock() - defer p.mu.Unlock() - var err error - p.cmd, err = p.builder.GetCommand(d.Properties(), d.Info(), d.Body()) + cmd, err := p.builder.GetCommand(d.Properties(), d.Info(), d.Body()) defer func() { - p.cmd = nil + cmd = nil }() if err != nil { d.Nack(true) @@ -52,7 +48,7 @@ func (p *processor) Process(channel int, d delivery.Delivery) error { } start := time.Now() - exitCode := p.run(channel) + exitCode := p.run(cmd, channel) collector.ProcessCounter.With(prometheus.Labels{"exit_code": strconv.Itoa(exitCode)}).Inc() collector.ProcessDuration.Observe(time.Since(start).Seconds()) @@ -67,18 +63,18 @@ func (p *processor) Process(channel int, d delivery.Delivery) error { return nil } -func (p *processor) run(channel int) int { +func (p *processor) run(cmd *exec.Cmd, channel int) int { p.log.Infof("[Channel %d] Processing message...", channel) defer p.log.Infof("[Channel %d] Processed!", channel) var out []byte var err error - capture := p.cmd.Stdout == nil && p.cmd.Stderr == nil + capture := cmd.Stdout == nil && cmd.Stderr == nil if capture { - out, err = p.cmd.CombinedOutput() + out, err = cmd.CombinedOutput() } else { - err = p.cmd.Run() + err = cmd.Run() } if err != nil { diff --git a/processor/processor_test.go b/processor/processor_test.go index 48081d5..a33ab9a 100644 --- a/processor/processor_test.go +++ b/processor/processor_test.go @@ -45,9 +45,9 @@ func TestProcessor_Run(t *testing.T) { for _, test := range execCommandRunTests { t.Run(test.name, func(t *testing.T) { l := log.New(0) - p := processor{log: l, cmd: test.cmd} + p := processor{log: l} - assert.Equal(t, p.run(1), test.code) + assert.Equal(t, p.run(test.cmd, 1), test.code) goldie.Assert(t, t.Name(), l.Buf().Bytes()) }) }