From 4f5faf308cd9b8d7a071cebf0f48813f133ef35e Mon Sep 17 00:00:00 2001 From: Stian Grindvoll Date: Wed, 29 Nov 2017 12:31:38 +0100 Subject: [PATCH] Now we Ack the messages only if saved to tarball (#8) --- cmd/out.go | 5 +++-- docker-compose.yml | 2 +- file/file.go | 9 +++++---- file/tarball.go | 30 ++++++++++++++++++++++++------ rmq/consume.go | 21 ++++++++++++--------- rmq/message.go | 13 ++++++++++--- 6 files changed, 55 insertions(+), 25 deletions(-) diff --git a/cmd/out.go b/cmd/out.go index adf6dba..fc5f069 100644 --- a/cmd/out.go +++ b/cmd/out.go @@ -39,11 +39,12 @@ var outCmd = &cobra.Command{ the consumption and save the last message buffers.`, Run: func(cmd *cobra.Command, args []string) { channel := make(chan rmq.Message, prefetch*2) + verify := make(chan rmq.Verify) rabbit := rmq.NewConsumer(uri, exchange, queue, routingKey, tag, prefetch) path := file.NewOutput(outputDirectory, batchSize) - go rabbit.Consume(channel) + go rabbit.Consume(channel, verify) c := make(chan os.Signal, 2) signal.Notify(c, os.Interrupt, syscall.SIGTERM) @@ -53,7 +54,7 @@ var outCmd = &cobra.Command{ close(channel) }() - path.Receive(channel) + path.Receive(channel, verify) }, } diff --git a/docker-compose.yml b/docker-compose.yml index 229a71e..0c91e6e 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,5 +1,5 @@ rabbtmq: - image: rabbitmq:3-management-alpine + image: rabbitmq:3.6.12-management-alpine ports: - "15672:15672" - "5672:5672" diff --git a/file/file.go b/file/file.go index a923564..74da15d 100644 --- a/file/file.go +++ b/file/file.go @@ -62,13 +62,14 @@ func NewInput(path string) *Path { return f } -func writeFile(b []byte, dir, file string) { +func writeFile(b []byte, dir, file string) error { filePath := filepath.Join(dir, file) err := ioutil.WriteFile(filePath, b, 0644) if err != nil { - log.Fatalln(err) + return err } log.Printf("Wrote %d bytes to %s", len(b), filePath) + return nil } // Send delivers messages to the channel @@ -116,11 +117,11 @@ func NewOutput(path string, batchSize int) *Path { } // Receive will handle messages and save to path -func (p *Path) Receive(messages chan rmq.Message) { +func (p *Path) Receive(messages chan rmq.Message, verify chan rmq.Verify) { // create new TarballBuilder builder := NewTarballBuilder(p.batchSize) - builder.Pack(messages, p.name) + builder.Pack(messages, p.name, verify) } diff --git a/file/tarball.go b/file/tarball.go index c634854..347a059 100644 --- a/file/tarball.go +++ b/file/tarball.go @@ -128,33 +128,42 @@ func UnPack(wg *sync.WaitGroup, file *os.File, messages chan rmq.Message) (n int } // Pack messages from the channel into the directory -func (t *TarballBuilder) Pack(messages chan rmq.Message, dir string) { +func (t *TarballBuilder) Pack(messages chan rmq.Message, dir string, verify chan rmq.Verify) { t.wg.Add(1) docNum := 0 fileNum := 0 + + var deliveryTag uint64 for doc := range messages { + deliveryTag = doc.DeliveryTag + docNum++ if docNum >= t.tarSize { + fileNum++ t.tar.Flush() t.tar.Close() t.gzip.Close() // writes to tarball here when reached the t.tarSize - writeFile(t.buf.Bytes(), dir, fmt.Sprintf("%d_messages_%d.tgz", fileNum, docNum)) + err := writeFile(t.buf.Bytes(), dir, fmt.Sprintf("%d_messages_%d.tgz", fileNum, docNum)) + if err != nil { + log.Fatal(err) + } + verify <- rmq.Verify{MultiAck: true, Tag: doc.DeliveryTag} - err := t.getWriters() + err = t.getWriters() if err != nil { log.Fatal(err) } docNum = 0 } + if err := t.addFile(t.tar, uuid.New(), &doc); err != nil { log.Fatalln(err) } - docNum++ } t.tar.Flush() t.tar.Close() @@ -163,10 +172,19 @@ func (t *TarballBuilder) Pack(messages chan rmq.Message, dir string) { fileNum++ // writes to tarball here when not reached the t.tarSize - writeFile(t.buf.Bytes(), dir, fmt.Sprintf("%d_messages_%d.tgz", fileNum, docNum)) + err := writeFile(t.buf.Bytes(), dir, fmt.Sprintf("%d_messages_%d.tgz", fileNum, docNum)) + if err != nil { + log.Fatal(err) - t.wg.Done() + } + + // Does not ack the messages unless it is repeated, not sure why yet.. + // Might want to change using delivery ack interface + verify <- rmq.Verify{MultiAck: true, Tag: deliveryTag} + verify <- rmq.Verify{MultiAck: true, Tag: deliveryTag} + t.wg.Done() + close(verify) log.Print("tarball writer closing") } diff --git a/rmq/consume.go b/rmq/consume.go index 028a131..2cb2c73 100644 --- a/rmq/consume.go +++ b/rmq/consume.go @@ -76,8 +76,15 @@ func NewConsumer(amqpURI, exchange, queue, routingKey, tag string, prefetch int) return r } +func (r *RabbitMQ) ackMultiple(deliveryTag <-chan Verify) { + for v := range deliveryTag { + r.channel.Ack(v.Tag, v.MultiAck) + } +} + // Consume outputs a stream of Message into a channel from rabbit -func (r *RabbitMQ) Consume(out chan Message) { +func (r *RabbitMQ) Consume(out chan Message, verify <-chan Verify) { + go r.ackMultiple(verify) // set up a channel consumer deliveries, err := r.channel.Consume( @@ -97,18 +104,14 @@ func (r *RabbitMQ) Consume(out chan Message) { for d := range deliveries { // create a new Message for the rabbit message msg := Message{ - Body: d.Body, - RoutingKey: d.RoutingKey, - Headers: d.Headers, + Body: d.Body, + RoutingKey: d.RoutingKey, + Headers: d.Headers, + DeliveryTag: d.DeliveryTag, } // write Message to channel out <- msg - // ack message - r.channel.Ack(d.DeliveryTag, false) } log.Print("All messages consumed") - - // when deliveries are done, close - close(out) } diff --git a/rmq/message.go b/rmq/message.go index 060ca25..bea777e 100644 --- a/rmq/message.go +++ b/rmq/message.go @@ -16,9 +16,16 @@ package rmq // Message contains the most basic about the message type Message struct { - Body []byte - RoutingKey string - Headers map[string]interface{} + Body []byte + RoutingKey string + Headers map[string]interface{} + DeliveryTag uint64 +} + +// Verify will be used to Ack Message from the queue +type Verify struct { + Tag uint64 + MultiAck bool } // NewMessageFromAttrs will create a new message from a byte slice and attributes