diff --git a/config/config.go b/config/config.go index 40dbe29..32fc338 100644 --- a/config/config.go +++ b/config/config.go @@ -21,6 +21,7 @@ type Config struct { Compression bool Onfailure int Stricfailure bool + NumChannels int } Prefetch struct { Count int @@ -217,6 +218,11 @@ func (c Config) QueueIsNoWait() bool { return c.QueueSettings.NoWait } +// NumChannels determines how many Channels to open on this Connection +func (c Config) NumChannels() int { + return c.RabbitMq.NumChannels +} + // ConsumerTag returns the tag used to identify the consumer. func (c Config) ConsumerTag() string { if v, set := os.LookupEnv("GO_WANT_HELPER_PROCESS"); set && v == "1" { @@ -245,6 +251,7 @@ func LoadAndParse(location string) (*Config, error) { cfg := &Config{} SetDefaultQueueDurability(cfg) + SetDefaultNumChannels(cfg) if err := gcfg.ReadFileInto(cfg, location); err != nil { return nil, err @@ -257,6 +264,7 @@ func CreateFromString(data string) (*Config, error) { cfg := &Config{} SetDefaultQueueDurability(cfg) + SetDefaultNumChannels(cfg) if err := gcfg.ReadStringInto(cfg, data); err != nil { return nil, err @@ -270,6 +278,11 @@ func SetDefaultQueueDurability(cfg *Config) { cfg.QueueSettings.Durable = true } +// SetDefaultNumChannels sets NumChannels to 1 to keep backwards compatibility +func SetDefaultNumChannels(cfg *Config) { + cfg.RabbitMq.NumChannels = 1 +} + func transformToStringValue(val string) string { if val == "" { return "" diff --git a/consumer/channel_list.go b/consumer/channel_list.go new file mode 100644 index 0000000..f81fea7 --- /dev/null +++ b/consumer/channel_list.go @@ -0,0 +1,72 @@ +package consumer + +import ( + "errors" + "fmt" + + "github.com/bketelsen/logr" +) + +// ChannelMultiplexer describes an object that holds multiple Channels +type ChannelMultiplexer interface { + AddChannel(Channel) + Channels() []Channel + FirstChannel() (Channel, error) + Qos(prefetchCount, prefetchSize int, global bool) error +} + +// ChannelList is an implementation of the ChannelMultiplexer interface. +type ChannelList struct { + channels []Channel +} + +// NewChannelList creates a new ChannelList with a single Channel +func NewChannelList(conn Connection, len int, l logr.Logger) (*ChannelList, error) { + if len < 1 { + return nil, fmt.Errorf("cannot create a ChannelList with less than one channel (got %d)", len) + } + + chs := make([]Channel, 0) + l.Infof("Opening %d channel(s)...", len) + for i := 0; i < len; i++ { + ch, err := conn.Channel() + if nil != err { + return nil, fmt.Errorf("failed to open a channel: %v", err) + } + l.Infof("Opened channel %d...", i) + chs = append(chs, ch) + } + l.Info("Done.") + + return &ChannelList{channels: chs}, nil +} + +// AddChannel adds the given channel to the end of the ChannelList +func (cl *ChannelList) AddChannel(c Channel) { + cl.channels = append(cl.channels, c) +} + +// Channels returns a slice of Channels represented by this ChannelList. +func (cl *ChannelList) Channels() []Channel { + return cl.channels +} + +// FirstChannel is a convenience function to get the first Channel in this ChannelList. +func (cl *ChannelList) FirstChannel() (Channel, error) { + if len(cl.channels) < 1 { + var ch Channel + return ch, errors.New("tried to get the first channel from an uninitialized ChannelList") + } + + return cl.channels[0], nil +} + +// Qos sets Qos settings for all Channels in this ChannelList. +func (cl *ChannelList) Qos(prefetchCount, prefetchSize int, global bool) error { + for i, ch := range cl.channels { + if err := ch.Qos(prefetchCount, prefetchSize, global); err != nil { + return fmt.Errorf("failed to set QoS on channel %d: %v", i, err) + } + } + return nil +} diff --git a/consumer/channel_list_test.go b/consumer/channel_list_test.go new file mode 100644 index 0000000..16bdc81 --- /dev/null +++ b/consumer/channel_list_test.go @@ -0,0 +1,60 @@ +package consumer_test + +import ( + "fmt" + "testing" + + "github.com/corvus-ch/rabbitmq-cli-consumer/config" + "github.com/corvus-ch/rabbitmq-cli-consumer/consumer" + "github.com/stretchr/testify/assert" +) + +const qosConfig = "qos" + +var channelListTests = []struct { + name string + config string + setup func(*consumer.ChannelList) + err error +}{ + // Set QoS. + { + "setQos", + qosConfig, + func(cl *consumer.ChannelList) { + for _, ch := range cl.Channels() { + tc := ch.(*TestChannel) + tc.On("Qos", 42, 0, true).Return(nil).Once() + } + }, + nil, + }, + // Set QoS fails. + { + "setQosFail", + qosConfig, + func(cl *consumer.ChannelList) { + for _, ch := range cl.Channels() { + tc := ch.(*TestChannel) + tc.On("Qos", 42, 0, true).Return(fmt.Errorf("QoS error")).Once() + } + }, + fmt.Errorf("failed to set QoS on channel 0: QoS error"), + }, +} + +func TestChannelList(t *testing.T) { + for _, test := range channelListTests { + t.Run(test.name, func(t *testing.T) { + cfg, _ := config.LoadAndParse(fmt.Sprintf("fixtures/%s.conf", test.config)) + cl := &consumer.ChannelList{} + cl.AddChannel(new(TestChannel)) + test.setup(cl) + assert.Equal(t, test.err, cl.Qos(cfg.PrefetchCount(), 0, cfg.PrefetchIsGlobal())) + for _, ch := range cl.Channels() { + tc := ch.(*TestChannel) + tc.AssertExpectations(t) + } + }) + } +} diff --git a/consumer/config.go b/consumer/config.go index f982f63..8981fc1 100644 --- a/consumer/config.go +++ b/consumer/config.go @@ -17,6 +17,7 @@ type Config interface { HasPriority() bool MessageTTL() int32 MustDeclareQueue() bool + NumChannels() int PrefetchCount() int PrefetchIsGlobal() bool Priority() int32 diff --git a/consumer/consumer.go b/consumer/consumer.go index ad78c56..512e7bc 100644 --- a/consumer/consumer.go +++ b/consumer/consumer.go @@ -11,7 +11,7 @@ import ( type Consumer struct { Connection Connection - Channel Channel + Channels ChannelMultiplexer Queue string Tag string Processor processor.Processor @@ -21,10 +21,10 @@ type Consumer struct { // New creates a new consumer instance. The setup of the amqp connection and channel is expected to be done by the // calling code. -func New(conn Connection, ch Channel, p processor.Processor, l logr.Logger) *Consumer { +func New(conn Connection, cm ChannelMultiplexer, p processor.Processor, l logr.Logger) *Consumer { return &Consumer{ Connection: conn, - Channel: ch, + Channels: cm, Processor: p, Log: l, } @@ -40,12 +40,21 @@ func NewFromConfig(cfg Config, p processor.Processor, l logr.Logger) (*Consumer, } l.Info("Connected.") - l.Info("Opening channel...") - ch, err := conn.Channel() + cl, err := NewChannelList(conn, cfg.NumChannels(), l) if nil != err { - return nil, fmt.Errorf("failed to open a channel: %v", err) + return nil, fmt.Errorf("failed creating channel(s): %v", err) + } + + l.Info("Setting QoS... ") + if err := cl.Qos(cfg.PrefetchCount(), 0, cfg.PrefetchIsGlobal()); err != nil { + return nil, err + } + l.Info("Succeeded setting QoS.") + + ch, err := cl.FirstChannel() + if nil != err { + return nil, err } - l.Info("Done.") if err := Setup(cfg, ch, l); err != nil { return nil, err @@ -53,7 +62,7 @@ func NewFromConfig(cfg Config, p processor.Processor, l logr.Logger) (*Consumer, return &Consumer{ Connection: conn, - Channel: ch, + Channels: cl, Queue: cfg.QueueName(), Tag: cfg.ConsumerTag(), Processor: p, @@ -63,46 +72,63 @@ func NewFromConfig(cfg Config, p processor.Processor, l logr.Logger) (*Consumer, // Consume subscribes itself to the message queue and starts consuming messages. func (c *Consumer) Consume(ctx context.Context) error { - c.Log.Info("Registering consumer... ") - msgs, err := c.Channel.Consume(c.Queue, c.Tag, false, false, false, false, nil) - if err != nil { - return fmt.Errorf("failed to register a consumer: %s", err) - } + remoteClose := make(chan *amqp.Error) + done := make(chan error) - c.Log.Info("Succeeded registering consumer.") - c.Log.Info("Waiting for messages...") + c.Log.Info("Registering channels... ") + for i, ch := range c.Channels.Channels() { + msgs, err := ch.Consume(c.Queue, c.Tag, false, false, false, false, nil) + if err != nil { + return fmt.Errorf("failed to register a channel: %s", err) + } - remoteClose := make(chan *amqp.Error) - c.Channel.NotifyClose(remoteClose) + c.Log.Infof("Succeeded registering channel %d.", i) - done := make(chan error) - go c.consume(msgs, done) + ch.NotifyClose(remoteClose) + + go c.consume(i, msgs, done) + } + + c.Log.Info("Waiting for messages...") select { case err := <-remoteClose: return err case <-ctx.Done(): - c.canceled = true - err := c.Channel.Cancel(c.Tag, false) - if err == nil { - err = <-done - } - return err + return c.Cancel(done) case err := <-done: return err } } -func (c *Consumer) consume(msgs <-chan amqp.Delivery, done chan error) { +// Cancel marks the Consumer as cancelled, and then cancels all the Channels. +func (c *Consumer) Cancel(done chan error) error { + c.canceled = true + var firstError error + for i, ch := range c.Channels.Channels() { + c.Log.Infof("closing channel %d...", i) + err := ch.Cancel(c.Tag, false) + if err == nil { + err = <-done + } + if nil != err && nil == firstError { + firstError = err + } + } + + return firstError +} + +func (c *Consumer) consume(channel int, msgs <-chan amqp.Delivery, done chan error) { for m := range msgs { d := delivery.New(m) if c.canceled { d.Nack(true) continue } - if err := c.checkError(c.Processor.Process(d)); err != nil { + if err := c.checkError(c.Processor.Process(channel, d)); err != nil { done <- err return } diff --git a/consumer/consumer_consume_test.go b/consumer/consumer_consume_test.go index a263351..2e8a567 100644 --- a/consumer/consumer_consume_test.go +++ b/consumer/consumer_consume_test.go @@ -64,7 +64,9 @@ func newConsumeTest(name, output string, count uint64, cancelCount int, setup se func (ct *consumeTest) Run(t *testing.T) { exp := ct.Setup(t, ct) l := log.New(0) - c := consumer.New(nil, ct.ch, ct.p, l) + cl := &consumer.ChannelList{} + cl.AddChannel(ct.ch) + c := consumer.New(nil, cl, ct.p, l) c.Queue = t.Name() c.Tag = ct.Tag ctx, cancel := context.WithCancel(context.Background()) @@ -102,62 +104,62 @@ func (ct *consumeTest) produce(cancel func()) { var consumeTests = []*consumeTest{ newConsumeTest( "happy path", - "INFO Registering consumer... \nINFO Succeeded registering consumer.\nINFO Waiting for messages...\n", + "INFO Registering channels... \nINFO Succeeded registering channel 0.\nINFO Waiting for messages...\n", 3, intMax, func(t *testing.T, ct *consumeTest) error { ct.ch.On("Consume", t.Name(), "ctag", false, false, false, false, nilAmqpTable). Once(). Return(ct.msgs, nil) - ct.p.On("Process", delivery.New(ct.dd[0])).Once().Return(nil) - ct.p.On("Process", delivery.New(ct.dd[1])).Once().Return(nil) - ct.p.On("Process", delivery.New(ct.dd[2])).Once().Return(nil) + ct.p.On("Process", 0, delivery.New(ct.dd[0])).Once().Return(nil) + ct.p.On("Process", 0, delivery.New(ct.dd[1])).Once().Return(nil) + ct.p.On("Process", 0, delivery.New(ct.dd[2])).Once().Return(nil) return nil }, ), newSimpleConsumeTest( "consume error", - "INFO Registering consumer... \n", + "INFO Registering channels... \n", func(t *testing.T, ct *consumeTest) error { ct.ch.On("Consume", t.Name(), "ctag", false, false, false, false, nilAmqpTable). Once(). Return(nil, fmt.Errorf("consume error")) - return fmt.Errorf("failed to register a consumer: consume error") + return fmt.Errorf("failed to register a channel: consume error") }, ), newSimpleConsumeTest( "process error", - "INFO Registering consumer... \nINFO Succeeded registering consumer.\nINFO Waiting for messages...\n", + "INFO Registering channels... \nINFO Succeeded registering channel 0.\nINFO Waiting for messages...\n", func(t *testing.T, ct *consumeTest) error { err := fmt.Errorf("process error") ct.ch.On("Consume", t.Name(), "ctag", false, false, false, false, nilAmqpTable). Once(). Return(ct.msgs, nil) - ct.p.On("Process", delivery.New(ct.dd[0])).Once().Return(err) + ct.p.On("Process", 0, delivery.New(ct.dd[0])).Once().Return(err) return err }, ), newSimpleConsumeTest( "create command error", - "INFO Registering consumer... \nINFO Succeeded registering consumer.\nINFO Waiting for messages...\nERROR failed to register a consumer: create command error\n", + "INFO Registering channels... \nINFO Succeeded registering channel 0.\nINFO Waiting for messages...\nERROR failed to register a consumer: create command error\n", func(t *testing.T, ct *consumeTest) error { err := processor.NewCreateCommandError(fmt.Errorf("create command error")) ct.ch.On("Consume", t.Name(), "ctag", false, false, false, false, nilAmqpTable). Once(). Return(ct.msgs, nil) - ct.p.On("Process", delivery.New(ct.dd[0])).Once().Return(err) + ct.p.On("Process", 0, delivery.New(ct.dd[0])).Once().Return(err) return nil }, ), newSimpleConsumeTest( "ack error", - "INFO Registering consumer... \nINFO Succeeded registering consumer.\nINFO Waiting for messages...\n", + "INFO Registering channels... \nINFO Succeeded registering channel 0.\nINFO Waiting for messages...\n", func(t *testing.T, ct *consumeTest) error { err := processor.NewAcknowledgmentError(fmt.Errorf("ack error")) ct.ch.On("Consume", t.Name(), "ctag", false, false, false, false, nilAmqpTable). Once(). Return(ct.msgs, nil) - ct.p.On("Process", delivery.New(ct.dd[0])).Once().Return(err) + ct.p.On("Process", 0, delivery.New(ct.dd[0])).Once().Return(err) return err }, ), @@ -177,7 +179,9 @@ func TestConsumer_Consume_NotifyClose(t *testing.T) { ch.On("Consume", "", "", false, false, false, false, nilAmqpTable).Once().Return(d, nil) - c := consumer.New(nil, ch, new(TestProcessor), l) + cl := &consumer.ChannelList{} + cl.AddChannel(ch) + c := consumer.New(nil, cl, new(TestProcessor), l) go func() { done <- c.Consume(context.Background()) diff --git a/consumer/consumer_test.go b/consumer/consumer_test.go index 82a2ace..2feb712 100644 --- a/consumer/consumer_test.go +++ b/consumer/consumer_test.go @@ -45,21 +45,24 @@ func testConsumerCancel(t *testing.T, err error) { close(msgs) }) ctx, cancel := context.WithCancel(context.Background()) - c := consumer.New(nil, ch, nil, log.New(0)) + cl := &consumer.ChannelList{} + cl.AddChannel(ch) + c := consumer.New(nil, cl, nil, log.New(0)) c.Queue = "queue" c.Tag = t.Name() go func() { done <- c.Consume(ctx) }() cancel() - assert.Equal(t, err, <-done) + cerr := <-done + assert.Equal(t, err, cerr) ch.AssertExpectations(t) } var cancelTests = []*consumeTest{ newConsumeTest( "skip remaining", - "INFO Registering consumer... \nINFO Succeeded registering consumer.\nINFO Waiting for messages...\n", + "INFO Registering channels... \nINFO Succeeded registering channel 0.\nINFO Waiting for messages...\nINFO closing channel 0...\n", 3, 1, func(t *testing.T, ct *consumeTest) error { @@ -67,7 +70,7 @@ var cancelTests = []*consumeTest{ Once(). Return(ct.msgs, nil) ct.ch.On("Cancel", ct.Tag, false).Return(nil) - ct.p.On("Process", delivery.New(ct.dd[0])).Return(nil).Run(func(_ mock.Arguments) { + ct.p.On("Process", 0, delivery.New(ct.dd[0])).Return(nil).Run(func(_ mock.Arguments) { ct.sync <- true <-ct.sync }) @@ -78,7 +81,7 @@ var cancelTests = []*consumeTest{ ), newConsumeTest( "no messages", - "INFO Registering consumer... \nINFO Succeeded registering consumer.\nINFO Waiting for messages...\n", + "INFO Registering channels... \nINFO Succeeded registering channel 0.\nINFO Waiting for messages...\nINFO closing channel 0...\n", 0, 0, func(t *testing.T, ct *consumeTest) error { diff --git a/consumer/mock_test.go b/consumer/mock_test.go index a2c1027..dd58723 100644 --- a/consumer/mock_test.go +++ b/consumer/mock_test.go @@ -104,8 +104,8 @@ type TestProcessor struct { mock.Mock } -func (p *TestProcessor) Process(d delivery.Delivery) error { - return p.Called(d).Error(0) +func (p *TestProcessor) Process(channel int, d delivery.Delivery) error { + return p.Called(channel, d).Error(0) } func (p *TestProcessor) Cancel() error { diff --git a/consumer/setup.go b/consumer/setup.go index 83a4dde..97e3de8 100644 --- a/consumer/setup.go +++ b/consumer/setup.go @@ -8,10 +8,6 @@ import ( // Setup configures queues, exchanges and bindings in between according to the configuration. func Setup(cfg Config, ch Channel, l logr.Logger) error { - if err := setupQoS(cfg, ch, l); err != nil { - return err - } - if cfg.MustDeclareQueue() { if err := declareQueue(cfg, ch, l); err != nil { return err @@ -28,15 +24,6 @@ func Setup(cfg Config, ch Channel, l logr.Logger) error { return nil } -func setupQoS(cfg Config, ch Channel, l logr.Logger) error { - l.Info("Setting QoS... ") - if err := ch.Qos(cfg.PrefetchCount(), 0, cfg.PrefetchIsGlobal()); err != nil { - return fmt.Errorf("failed to set QoS: %v", err) - } - l.Info("Succeeded setting QoS.") - return nil -} - func declareQueue(cfg Config, ch Channel, l logr.Logger) error { l.Infof("Declaring queue \"%s\"...", cfg.QueueName()) _, err := ch.QueueDeclare( diff --git a/consumer/setup_test.go b/consumer/setup_test.go index f29a767..809ff95 100644 --- a/consumer/setup_test.go +++ b/consumer/setup_test.go @@ -19,7 +19,6 @@ const ( noRoutingKeyConfig = "no_routing" oneEmptyRoutingKeyConfig = "empty_routing" priorityConfig = "priority" - qosConfig = "qos" routingConfig = "routing" simpleExchangeConfig = "exchange" ttlConfig = "ttl" @@ -45,7 +44,6 @@ var queueTests = []struct { "simpleQueue", defaultConfig, func(ch *TestChannel) { - ch.On("Qos", 3, 0, false).Return(nil).Once() ch.On("QueueDeclare", "defaultQueue", true, false, false, false, emptyAmqpTable).Return(amqp.Queue{}, nil).Once() }, nil, @@ -55,7 +53,6 @@ var queueTests = []struct { "queueWithTTL", ttlConfig, func(ch *TestChannel) { - ch.On("Qos", 3, 0, false).Return(nil).Once() ch.On("QueueDeclare", "ttlQueue", true, false, false, false, amqp.Table{"x-message-ttl": int32(1200)}).Return(amqp.Queue{}, nil).Once() }, nil, @@ -65,7 +62,6 @@ var queueTests = []struct { "queueWithPriority", priorityConfig, func(ch *TestChannel) { - ch.On("Qos", 3, 0, false).Return(nil).Once() ch.On("QueueDeclare", "priorityWorker", true, false, false, false, amqp.Table{"x-max-priority": int32(42)}).Return(amqp.Queue{}, nil).Once() ch.On("ExchangeDeclare", "priorityExchange", "priorityType", false, false, false, false, emptyAmqpTable).Return(nil).Once() ch.On("QueueBind", "priorityWorker", "", "priorityExchange", false, nilAmqpTable).Return(nil).Once() @@ -77,7 +73,6 @@ var queueTests = []struct { "queueWithMultipleRoutingKeys", multipleRoutingKeysConfig, func(ch *TestChannel) { - ch.On("Qos", 3, 0, false).Return(nil).Once() ch.On("QueueDeclare", "multiRoutingQueue", true, false, false, false, emptyAmqpTable).Return(amqp.Queue{}, nil).Once() ch.On("ExchangeDeclare", "multiRoutingExchange", "multiRoutingType", false, false, false, false, emptyAmqpTable).Return(nil).Once() ch.On("QueueBind", "multiRoutingQueue", "foo", "multiRoutingExchange", false, nilAmqpTable).Return(nil).Once() @@ -90,7 +85,6 @@ var queueTests = []struct { "queueWithOneEmptyRoutingKey", oneEmptyRoutingKeyConfig, func(ch *TestChannel) { - ch.On("Qos", 3, 0, false).Return(nil).Once() ch.On("QueueDeclare", "emptyRoutingQueue", true, false, false, false, emptyAmqpTable).Return(amqp.Queue{}, nil).Once() ch.On("ExchangeDeclare", "emptyRoutingExchange", "emptyRoutingType", false, false, false, false, emptyAmqpTable).Return(nil).Once() ch.On("QueueBind", "emptyRoutingQueue", "", "emptyRoutingExchange", false, nilAmqpTable).Return(nil).Once() @@ -102,38 +96,17 @@ var queueTests = []struct { "queueWithoutRoutingKey", noRoutingKeyConfig, func(ch *TestChannel) { - ch.On("Qos", 3, 0, false).Return(nil).Once() ch.On("QueueDeclare", "noRoutingQueue", true, false, false, false, emptyAmqpTable).Return(amqp.Queue{}, nil).Once() ch.On("ExchangeDeclare", "noRoutingExchange", "noRoutingType", false, false, false, false, emptyAmqpTable).Return(nil).Once() ch.On("QueueBind", "noRoutingQueue", "", "noRoutingExchange", false, nilAmqpTable).Return(nil).Once() }, nil, }, - // Set QoS. - { - "setQos", - qosConfig, - func(ch *TestChannel) { - ch.On("Qos", 42, 0, true).Return(nil).Once() - ch.On("QueueDeclare", "qosQueue", true, false, false, false, emptyAmqpTable).Return(amqp.Queue{}, nil).Once() - }, - nil, - }, - // Set QoS fails. - { - "setQosFail", - qosConfig, - func(ch *TestChannel) { - ch.On("Qos", 42, 0, true).Return(fmt.Errorf("QoS error")).Once() - }, - fmt.Errorf("failed to set QoS: QoS error"), - }, // Declare queue fails. { "declareQueueFail", defaultConfig, func(ch *TestChannel) { - ch.On("Qos", 3, 0, false).Return(nil).Once() ch.On("QueueDeclare", "defaultQueue", true, false, false, false, emptyAmqpTable).Return(amqp.Queue{}, fmt.Errorf("queue error")).Once() }, fmt.Errorf("failed to declare queue: queue error"), @@ -143,7 +116,6 @@ var queueTests = []struct { "declareExchange", simpleExchangeConfig, func(ch *TestChannel) { - ch.On("Qos", 3, 0, false).Return(nil).Once() ch.On("QueueDeclare", "queueName", true, false, false, false, emptyAmqpTable).Return(amqp.Queue{}, nil).Once() ch.On("ExchangeDeclare", "exchangeName", "exchangeType", false, false, false, false, emptyAmqpTable).Return(nil).Once() ch.On("QueueBind", "queueName", "", "exchangeName", false, nilAmqpTable).Return(nil).Once() @@ -155,7 +127,6 @@ var queueTests = []struct { "declareDurableExchange", durableExchangeConfig, func(ch *TestChannel) { - ch.On("Qos", 3, 0, false).Return(nil).Once() ch.On("QueueDeclare", "queueName", true, false, false, false, emptyAmqpTable).Return(amqp.Queue{}, nil).Once() ch.On("ExchangeDeclare", "exchangeName", "exchangeType", true, false, false, false, emptyAmqpTable).Return(nil).Once() ch.On("QueueBind", "queueName", "", "exchangeName", false, nilAmqpTable).Return(nil).Once() @@ -167,7 +138,6 @@ var queueTests = []struct { "declareAutoDeleteExchange", autodeleteExchangeConfig, func(ch *TestChannel) { - ch.On("Qos", 3, 0, false).Return(nil).Once() ch.On("QueueDeclare", "queueName", true, false, false, false, emptyAmqpTable).Return(amqp.Queue{}, nil).Once() ch.On("ExchangeDeclare", "exchangeName", "exchangeType", false, true, false, false, emptyAmqpTable).Return(nil).Once() ch.On("QueueBind", "queueName", "", "exchangeName", false, nilAmqpTable).Return(nil).Once() @@ -179,7 +149,6 @@ var queueTests = []struct { "declareExchangeFail", simpleExchangeConfig, func(ch *TestChannel) { - ch.On("Qos", 3, 0, false).Return(nil).Once() ch.On("QueueDeclare", "queueName", true, false, false, false, emptyAmqpTable).Return(amqp.Queue{}, nil).Once() ch.On("ExchangeDeclare", "exchangeName", "exchangeType", false, false, false, false, emptyAmqpTable).Return(fmt.Errorf("declare exchagne error")).Once() }, @@ -190,7 +159,6 @@ var queueTests = []struct { "bindQueue", routingConfig, func(ch *TestChannel) { - ch.On("Qos", 3, 0, false).Return(nil).Once() ch.On("QueueDeclare", "routingQueue", true, false, false, false, emptyAmqpTable).Return(amqp.Queue{}, nil).Once() ch.On("ExchangeDeclare", "routingExchange", "routingType", false, false, false, false, emptyAmqpTable).Return(nil).Once() ch.On("QueueBind", "routingQueue", "routingKey", "routingExchange", false, nilAmqpTable).Return(nil).Once() @@ -202,7 +170,6 @@ var queueTests = []struct { "bindQueueFail", routingConfig, func(ch *TestChannel) { - ch.On("Qos", 3, 0, false).Return(nil).Once() ch.On("QueueDeclare", "routingQueue", true, false, false, false, emptyAmqpTable).Return(amqp.Queue{}, nil).Once() ch.On("ExchangeDeclare", "routingExchange", "routingType", false, false, false, false, emptyAmqpTable).Return(nil).Once() ch.On("QueueBind", "routingQueue", "routingKey", "routingExchange", false, nilAmqpTable).Return(fmt.Errorf("queue bind error")).Once() @@ -214,7 +181,6 @@ var queueTests = []struct { "durableQueue", durableQueue, func(ch *TestChannel) { - ch.On("Qos", 3, 0, false).Return(nil).Once() ch.On("QueueDeclare", "durableQueue", true, false, false, false, emptyAmqpTable).Return(amqp.Queue{}, nil).Once() }, nil, @@ -224,7 +190,6 @@ var queueTests = []struct { "nonDurableQueue", nonDurableQueue, func(ch *TestChannel) { - ch.On("Qos", 3, 0, false).Return(nil).Once() ch.On("QueueDeclare", "nonDurableQueue", false, false, false, false, emptyAmqpTable).Return(amqp.Queue{}, nil).Once() }, nil, @@ -234,7 +199,6 @@ var queueTests = []struct { "defaultQueueDurability", defaultQueueDurability, func(ch *TestChannel) { - ch.On("Qos", 3, 0, false).Return(nil).Once() ch.On("QueueDeclare", "defaultQueueDurability", true, false, false, false, emptyAmqpTable).Return(amqp.Queue{}, nil).Once() }, nil, @@ -244,7 +208,6 @@ var queueTests = []struct { "autoDeleteQueue", autoDeleteQueue, func(ch *TestChannel) { - ch.On("Qos", 3, 0, false).Return(nil).Once() ch.On("QueueDeclare", "autoDeleteQueue", true, true, false, false, emptyAmqpTable).Return(amqp.Queue{}, nil).Once() }, nil, @@ -254,7 +217,6 @@ var queueTests = []struct { "exclusiveQueue", exclusiveQueue, func(ch *TestChannel) { - ch.On("Qos", 3, 0, false).Return(nil).Once() ch.On("QueueDeclare", "exclusiveQueue", true, false, true, false, emptyAmqpTable).Return(amqp.Queue{}, nil).Once() }, nil, @@ -264,7 +226,6 @@ var queueTests = []struct { "noWaitQueue", noWaitQueue, func(ch *TestChannel) { - ch.On("Qos", 3, 0, false).Return(nil).Once() ch.On("QueueDeclare", "noWaitQueue", true, false, false, true, emptyAmqpTable).Return(amqp.Queue{}, nil).Once() }, nil, diff --git a/fixtures/TestEndToEnd/amqpUrlOutput.golden b/fixtures/TestEndToEnd/amqpUrlOutput.golden index d853324..e8b3661 100644 --- a/fixtures/TestEndToEnd/amqpUrlOutput.golden +++ b/fixtures/TestEndToEnd/amqpUrlOutput.golden @@ -1,13 +1,14 @@ Connecting RabbitMQ... Connected. -Opening channel... +Opening 1 channel(s)... +Opened channel 0... Done. Setting QoS... Succeeded setting QoS. Declaring queue "test"... Metrics disabled. -Registering consumer... -Succeeded registering consumer. +Registering channels... +Succeeded registering channel 0. Waiting for messages... -Processing message... -Processed! +[Channel 0] Processing message... +[Channel 0] Processed! diff --git a/fixtures/TestEndToEnd/compressedOutput.golden b/fixtures/TestEndToEnd/compressedOutput.golden index 6c23959..6b09ab3 100644 --- a/fixtures/TestEndToEnd/compressedOutput.golden +++ b/fixtures/TestEndToEnd/compressedOutput.golden @@ -1,14 +1,15 @@ Connecting RabbitMQ... Connected. -Opening channel... +Opening 1 channel(s)... +Opened channel 0... Done. Setting QoS... Succeeded setting QoS. Declaring queue "test"... Metrics disabled. -Registering consumer... -Succeeded registering consumer. +Registering channels... +Succeeded registering channel 0. Waiting for messages... Compressed message -Processing message... -Processed! +[Channel 0] Processing message... +[Channel 0] Processed! diff --git a/fixtures/TestEndToEnd/declareErrorOutput.golden b/fixtures/TestEndToEnd/declareErrorOutput.golden index a2de418..0c2e798 100644 --- a/fixtures/TestEndToEnd/declareErrorOutput.golden +++ b/fixtures/TestEndToEnd/declareErrorOutput.golden @@ -1,6 +1,7 @@ Connecting RabbitMQ... Connected. -Opening channel... +Opening 1 channel(s)... +Opened channel 0... Done. Setting QoS... Succeeded setting QoS. diff --git a/fixtures/TestEndToEnd/defaultOutput.golden b/fixtures/TestEndToEnd/defaultOutput.golden index d853324..e8b3661 100644 --- a/fixtures/TestEndToEnd/defaultOutput.golden +++ b/fixtures/TestEndToEnd/defaultOutput.golden @@ -1,13 +1,14 @@ Connecting RabbitMQ... Connected. -Opening channel... +Opening 1 channel(s)... +Opened channel 0... Done. Setting QoS... Succeeded setting QoS. Declaring queue "test"... Metrics disabled. -Registering consumer... -Succeeded registering consumer. +Registering channels... +Succeeded registering channel 0. Waiting for messages... -Processing message... -Processed! +[Channel 0] Processing message... +[Channel 0] Processed! diff --git a/fixtures/TestEndToEnd/envAmqpUrlNoConfigOutput.golden b/fixtures/TestEndToEnd/envAmqpUrlNoConfigOutput.golden index d853324..e8b3661 100644 --- a/fixtures/TestEndToEnd/envAmqpUrlNoConfigOutput.golden +++ b/fixtures/TestEndToEnd/envAmqpUrlNoConfigOutput.golden @@ -1,13 +1,14 @@ Connecting RabbitMQ... Connected. -Opening channel... +Opening 1 channel(s)... +Opened channel 0... Done. Setting QoS... Succeeded setting QoS. Declaring queue "test"... Metrics disabled. -Registering consumer... -Succeeded registering consumer. +Registering channels... +Succeeded registering channel 0. Waiting for messages... -Processing message... -Processed! +[Channel 0] Processing message... +[Channel 0] Processed! diff --git a/fixtures/TestEndToEnd/envAmqpUrlOutput.golden b/fixtures/TestEndToEnd/envAmqpUrlOutput.golden index d853324..e8b3661 100644 --- a/fixtures/TestEndToEnd/envAmqpUrlOutput.golden +++ b/fixtures/TestEndToEnd/envAmqpUrlOutput.golden @@ -1,13 +1,14 @@ Connecting RabbitMQ... Connected. -Opening channel... +Opening 1 channel(s)... +Opened channel 0... Done. Setting QoS... Succeeded setting QoS. Declaring queue "test"... Metrics disabled. -Registering consumer... -Succeeded registering consumer. +Registering channels... +Succeeded registering channel 0. Waiting for messages... -Processing message... -Processed! +[Channel 0] Processing message... +[Channel 0] Processed! diff --git a/fixtures/TestEndToEnd/noAmqpUrlOutput.golden b/fixtures/TestEndToEnd/noAmqpUrlOutput.golden index d853324..e8b3661 100644 --- a/fixtures/TestEndToEnd/noAmqpUrlOutput.golden +++ b/fixtures/TestEndToEnd/noAmqpUrlOutput.golden @@ -1,13 +1,14 @@ Connecting RabbitMQ... Connected. -Opening channel... +Opening 1 channel(s)... +Opened channel 0... Done. Setting QoS... Succeeded setting QoS. Declaring queue "test"... Metrics disabled. -Registering consumer... -Succeeded registering consumer. +Registering channels... +Succeeded registering channel 0. Waiting for messages... -Processing message... -Processed! +[Channel 0] Processing message... +[Channel 0] Processed! diff --git a/fixtures/TestEndToEnd/noDeclareConfigOutput.golden b/fixtures/TestEndToEnd/noDeclareConfigOutput.golden index d6525eb..9864423 100644 --- a/fixtures/TestEndToEnd/noDeclareConfigOutput.golden +++ b/fixtures/TestEndToEnd/noDeclareConfigOutput.golden @@ -1,10 +1,11 @@ Connecting RabbitMQ... Connected. -Opening channel... +Opening 1 channel(s)... +Opened channel 0... Done. Setting QoS... Succeeded setting QoS. Metrics disabled. -Registering consumer... -Succeeded registering consumer. +Registering channels... +Succeeded registering channel 0. Waiting for messages... diff --git a/fixtures/TestEndToEnd/noDeclareOutput.golden b/fixtures/TestEndToEnd/noDeclareOutput.golden index d6525eb..9864423 100644 --- a/fixtures/TestEndToEnd/noDeclareOutput.golden +++ b/fixtures/TestEndToEnd/noDeclareOutput.golden @@ -1,10 +1,11 @@ Connecting RabbitMQ... Connected. -Opening channel... +Opening 1 channel(s)... +Opened channel 0... Done. Setting QoS... Succeeded setting QoS. Metrics disabled. -Registering consumer... -Succeeded registering consumer. +Registering channels... +Succeeded registering channel 0. Waiting for messages... diff --git a/fixtures/TestEndToEnd/noLogsOutput.golden b/fixtures/TestEndToEnd/noLogsOutput.golden index 8007438..d223043 100644 --- a/fixtures/TestEndToEnd/noLogsOutput.golden +++ b/fixtures/TestEndToEnd/noLogsOutput.golden @@ -1,16 +1,17 @@ Connecting RabbitMQ... Connected. -Opening channel... +Opening 1 channel(s)... +Opened channel 0... Done. Setting QoS... Succeeded setting QoS. Declaring queue "test"... Metrics disabled. -Registering consumer... -Succeeded registering consumer. +Registering channels... +Succeeded registering channel 0. Waiting for messages... -Processing message... +[Channel 0] Processing message... Got executed bm9Mb2dz noLogs -Processed! +[Channel 0] Processed! diff --git a/fixtures/TestEndToEnd/outputOutput.golden b/fixtures/TestEndToEnd/outputOutput.golden index 347f528..7777fbf 100644 --- a/fixtures/TestEndToEnd/outputOutput.golden +++ b/fixtures/TestEndToEnd/outputOutput.golden @@ -1,16 +1,17 @@ Connecting RabbitMQ... Connected. -Opening channel... +Opening 1 channel(s)... +Opened channel 0... Done. Setting QoS... Succeeded setting QoS. Declaring queue "test"... Metrics disabled. -Registering consumer... -Succeeded registering consumer. +Registering channels... +Succeeded registering channel 0. Waiting for messages... -Processing message... +[Channel 0] Processing message... Got executed b3V0cHV0 output -Processed! +[Channel 0] Processed! diff --git a/fixtures/TestEndToEnd/pipeOutput.golden b/fixtures/TestEndToEnd/pipeOutput.golden index d853324..e8b3661 100644 --- a/fixtures/TestEndToEnd/pipeOutput.golden +++ b/fixtures/TestEndToEnd/pipeOutput.golden @@ -1,13 +1,14 @@ Connecting RabbitMQ... Connected. -Opening channel... +Opening 1 channel(s)... +Opened channel 0... Done. Setting QoS... Succeeded setting QoS. Declaring queue "test"... Metrics disabled. -Registering consumer... -Succeeded registering consumer. +Registering channels... +Succeeded registering channel 0. Waiting for messages... -Processing message... -Processed! +[Channel 0] Processing message... +[Channel 0] Processed! diff --git a/fixtures/TestEndToEnd/propertiesOutput.golden b/fixtures/TestEndToEnd/propertiesOutput.golden index d853324..e8b3661 100644 --- a/fixtures/TestEndToEnd/propertiesOutput.golden +++ b/fixtures/TestEndToEnd/propertiesOutput.golden @@ -1,13 +1,14 @@ Connecting RabbitMQ... Connected. -Opening channel... +Opening 1 channel(s)... +Opened channel 0... Done. Setting QoS... Succeeded setting QoS. Declaring queue "test"... Metrics disabled. -Registering consumer... -Succeeded registering consumer. +Registering channels... +Succeeded registering channel 0. Waiting for messages... -Processing message... -Processed! +[Channel 0] Processing message... +[Channel 0] Processed! diff --git a/fixtures/TestEndToEnd/queueNameOutput.golden b/fixtures/TestEndToEnd/queueNameOutput.golden index ff3e47e..25224c4 100644 --- a/fixtures/TestEndToEnd/queueNameOutput.golden +++ b/fixtures/TestEndToEnd/queueNameOutput.golden @@ -1,13 +1,14 @@ Connecting RabbitMQ... Connected. -Opening channel... +Opening 1 channel(s)... +Opened channel 0... Done. Setting QoS... Succeeded setting QoS. Declaring queue "altTest"... Metrics disabled. -Registering consumer... -Succeeded registering consumer. +Registering channels... +Succeeded registering channel 0. Waiting for messages... -Processing message... -Processed! +[Channel 0] Processing message... +[Channel 0] Processed! diff --git a/go.sum b/go.sum index 93b9ac3..1e037a5 100644 --- a/go.sum +++ b/go.sum @@ -2,6 +2,7 @@ bou.ke/monkey v1.0.1 h1:zEMLInw9xvNakzUUPjfS4Ds6jYPqCFx3m7bRmG5NH2U= bou.ke/monkey v1.0.1/go.mod h1:FgHuK96Rv2Nlf+0u1OOVDpCMdsWyOFmeeketDHE7LIg= github.com/AlekSi/gocoverutil v0.2.0 h1:lpfoGyib/qbTh7PajsBL6Upv3Fnn2o3A6Mn0naFR0E8= github.com/AlekSi/gocoverutil v0.2.0/go.mod h1:/SQ8potkEzPK7N0+EyZi8sPtf/nK3BnHjw7tVmlDdUs= +github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973 h1:xJ4a3vCFaGF/jqvzLMYoU8P317H5OQ+Via4RmuPwCS0= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/bketelsen/logr v0.0.0-20170116012416-f3d070bdd1c5 h1:k5oblHm+AdEGmDZA1NwxXjKuinRB6WsEuloKQ3i5u5g= github.com/bketelsen/logr v0.0.0-20170116012416-f3d070bdd1c5/go.mod h1:to4EbfYTEzdQuHxVGz1ug+d7a3bqOjR1r02gJ1Xv4z8= @@ -15,14 +16,19 @@ github.com/golang/protobuf v1.2.0 h1:P3YflyNX/ehuJFLhxviNdFxQPkGK5cDcApsge1SqnvM github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/magiconair/properties v1.8.0 h1:LLgXmsheXeRoUOBOjtwPQCWIYqM/LU1ayDtDePerRcY= github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= +github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus/client_golang v0.9.2 h1:awm861/B8OKDd2I/6o1dy3ra4BamzKhYOiGItCeZ740= github.com/prometheus/client_golang v0.9.2/go.mod h1:OsXs2jCmiKlQ1lTBmv21f2mNfw4xf/QclQDMrYNZzcM= +github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910 h1:idejC8f05m9MGOsuEi1ATq9shN03HrxNkD/luQvxCv8= github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= +github.com/prometheus/common v0.0.0-20181126121408-4724e9255275 h1:PnBWHBf+6L0jOqq0gIVUe6Yk0/QMZ640k6NvkxcBf+8= github.com/prometheus/common v0.0.0-20181126121408-4724e9255275/go.mod h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro= +github.com/prometheus/procfs v0.0.0-20181204211112-1dc9a6cbc91a h1:9a8MnZMP0X2nLJdBg+pBmGgkJlSaKC2KaQmTCk1XDtE= github.com/prometheus/procfs v0.0.0-20181204211112-1dc9a6cbc91a/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= github.com/sebdah/goldie v0.0.0-20190305024101-629351c67c53 h1:qR9Fm180+oJEk1tyg+3wysrby5LvT9Y2vD4uKdLZlYw= github.com/sebdah/goldie v0.0.0-20190305024101-629351c67c53/go.mod h1:lvjGftC8oe7XPtyrOidaMi0rp5B9+XY/ZRUynGnuaxQ= diff --git a/main.go b/main.go index 3fdf796..8785e25 100644 --- a/main.go +++ b/main.go @@ -79,6 +79,11 @@ var flags []cli.Flag = []cli.Flag{ Name: "no-declare", Usage: "prevents the queue from being declared.", }, + cli.IntFlag{ + Name: "num-channels, n", + Value: 1, + Usage: "Specifies the number of channels to use on this connection.", + }, cli.BoolFlag{ Name: "metrics, m", Usage: "enables metric to be exposed.", @@ -311,6 +316,10 @@ func LoadConfiguration(c *cli.Context) (*config.Config, error) { cfg.QueueSettings.Nodeclare = c.Bool("no-declare") } + if c.IsSet("num-channels") { + cfg.RabbitMq.NumChannels = c.Int("num-channels") + } + return cfg, nil } diff --git a/processor/fixtures/TestProcessor_Run/error.golden b/processor/fixtures/TestProcessor_Run/error.golden index 7cba1b7..2a7c107 100644 --- a/processor/fixtures/TestProcessor_Run/error.golden +++ b/processor/fixtures/TestProcessor_Run/error.golden @@ -1,6 +1,6 @@ -INFO Processing message... -INFO Failed. Check error log for details. -ERROR Error: exit status 1 -ERROR Failed: lorem +INFO [Channel 1] Processing message... +INFO [Channel 1] Failed. Check error log for details. +ERROR [Channel 1] Error: exit status 1 +ERROR [Channel 1] Failed: lorem ipsum -INFO Processed! +INFO [Channel 1] Processed! diff --git a/processor/fixtures/TestProcessor_Run/errorCapture.golden b/processor/fixtures/TestProcessor_Run/errorCapture.golden index 00f71a9..b0e7d92 100644 --- a/processor/fixtures/TestProcessor_Run/errorCapture.golden +++ b/processor/fixtures/TestProcessor_Run/errorCapture.golden @@ -1,4 +1,4 @@ -INFO Processing message... -INFO Failed. Check error log for details. -ERROR Error: exit status 1 -INFO Processed! +INFO [Channel 1] Processing message... +INFO [Channel 1] Failed. Check error log for details. +ERROR [Channel 1] Error: exit status 1 +INFO [Channel 1] Processed! diff --git a/processor/fixtures/TestProcessor_Run/success.golden b/processor/fixtures/TestProcessor_Run/success.golden index c911554..afecddd 100644 --- a/processor/fixtures/TestProcessor_Run/success.golden +++ b/processor/fixtures/TestProcessor_Run/success.golden @@ -1,2 +1,2 @@ -INFO Processing message... -INFO Processed! +INFO [Channel 1] Processing message... +INFO [Channel 1] Processed! diff --git a/processor/processor.go b/processor/processor.go index 2ccb197..f396474 100644 --- a/processor/processor.go +++ b/processor/processor.go @@ -17,7 +17,7 @@ import ( // Processor describes the interface used by the consumer to process messages. type Processor interface { - Process(delivery.Delivery) error + Process(int, delivery.Delivery) error } // New creates a new processor instance. @@ -31,20 +31,16 @@ type processor struct { ack acknowledger.Acknowledger log logr.Logger mu sync.Mutex - cmd *exec.Cmd } // Process creates a new exec command using the builder and executes the command. The message gets acknowledged // according to the commands exit code using the acknowledger. -func (p *processor) Process(d delivery.Delivery) error { - p.mu.Lock() - defer p.mu.Unlock() - +func (p *processor) Process(channel int, d delivery.Delivery) error { var err error - p.cmd, err = p.builder.GetCommand(d.Properties(), d.Info(), d.Body()) + cmd, err := p.builder.GetCommand(d.Properties(), d.Info(), d.Body()) defer func() { - p.cmd = nil + cmd = nil }() if err != nil { d.Nack(true) @@ -52,7 +48,7 @@ func (p *processor) Process(d delivery.Delivery) error { } start := time.Now() - exitCode := p.run() + exitCode := p.run(cmd, channel) collector.ProcessCounter.With(prometheus.Labels{"exit_code": strconv.Itoa(exitCode)}).Inc() collector.ProcessDuration.Observe(time.Since(start).Seconds()) @@ -67,25 +63,25 @@ func (p *processor) Process(d delivery.Delivery) error { return nil } -func (p *processor) run() int { - p.log.Info("Processing message...") - defer p.log.Info("Processed!") +func (p *processor) run(cmd *exec.Cmd, channel int) int { + p.log.Infof("[Channel %d] Processing message...", channel) + defer p.log.Infof("[Channel %d] Processed!", channel) var out []byte var err error - capture := p.cmd.Stdout == nil && p.cmd.Stderr == nil + capture := cmd.Stdout == nil && cmd.Stderr == nil if capture { - out, err = p.cmd.CombinedOutput() + out, err = cmd.CombinedOutput() } else { - err = p.cmd.Run() + err = cmd.Run() } if err != nil { - p.log.Info("Failed. Check error log for details.") - p.log.Errorf("Error: %s\n", err) + p.log.Infof("[Channel %d] Failed. Check error log for details.", channel) + p.log.Errorf("[Channel %d] Error: %s\n", channel, err) if capture { - p.log.Errorf("Failed: %s", string(out)) + p.log.Errorf("[Channel %d] Failed: %s", channel, string(out)) } return exitCode(err) diff --git a/processor/processor_test.go b/processor/processor_test.go index 43cfd9f..a33ab9a 100644 --- a/processor/processor_test.go +++ b/processor/processor_test.go @@ -45,9 +45,9 @@ func TestProcessor_Run(t *testing.T) { for _, test := range execCommandRunTests { t.Run(test.name, func(t *testing.T) { l := log.New(0) - p := processor{log: l, cmd: test.cmd} + p := processor{log: l} - assert.Equal(t, p.run(), test.code) + assert.Equal(t, p.run(test.cmd, 1), test.code) goldie.Assert(t, t.Name(), l.Buf().Bytes()) }) } @@ -99,7 +99,7 @@ func testProcessing(t *testing.T, name string, setup func(t *testing.T, a *TestA d.On("Info").Return(di) exp := setup(t, a, b, d) - err := p.Process(d) + err := p.Process(1, d) if len(exp) > 0 { assert.Equal(t, exp, err.Error())