Skip to content
This repository has been archived by the owner on Feb 8, 2023. It is now read-only.

Commit

Permalink
Now we Ack the messages only if saved to tarball (#8)
Browse files Browse the repository at this point in the history
  • Loading branch information
stiangrindvoll authored Nov 29, 2017
1 parent 53b68fb commit 4f5faf3
Show file tree
Hide file tree
Showing 6 changed files with 55 additions and 25 deletions.
5 changes: 3 additions & 2 deletions cmd/out.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,12 @@ var outCmd = &cobra.Command{
the consumption and save the last message buffers.`,
Run: func(cmd *cobra.Command, args []string) {
channel := make(chan rmq.Message, prefetch*2)
verify := make(chan rmq.Verify)

rabbit := rmq.NewConsumer(uri, exchange, queue, routingKey, tag, prefetch)
path := file.NewOutput(outputDirectory, batchSize)

go rabbit.Consume(channel)
go rabbit.Consume(channel, verify)

c := make(chan os.Signal, 2)
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
Expand All @@ -53,7 +54,7 @@ var outCmd = &cobra.Command{
close(channel)
}()

path.Receive(channel)
path.Receive(channel, verify)
},
}

Expand Down
2 changes: 1 addition & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
rabbtmq:
image: rabbitmq:3-management-alpine
image: rabbitmq:3.6.12-management-alpine
ports:
- "15672:15672"
- "5672:5672"
9 changes: 5 additions & 4 deletions file/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,14 @@ func NewInput(path string) *Path {
return f
}

func writeFile(b []byte, dir, file string) {
func writeFile(b []byte, dir, file string) error {
filePath := filepath.Join(dir, file)
err := ioutil.WriteFile(filePath, b, 0644)
if err != nil {
log.Fatalln(err)
return err
}
log.Printf("Wrote %d bytes to %s", len(b), filePath)
return nil
}

// Send delivers messages to the channel
Expand Down Expand Up @@ -116,11 +117,11 @@ func NewOutput(path string, batchSize int) *Path {
}

// Receive will handle messages and save to path
func (p *Path) Receive(messages chan rmq.Message) {
func (p *Path) Receive(messages chan rmq.Message, verify chan rmq.Verify) {

// create new TarballBuilder
builder := NewTarballBuilder(p.batchSize)

builder.Pack(messages, p.name)
builder.Pack(messages, p.name, verify)

}
30 changes: 24 additions & 6 deletions file/tarball.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,33 +128,42 @@ func UnPack(wg *sync.WaitGroup, file *os.File, messages chan rmq.Message) (n int
}

// Pack messages from the channel into the directory
func (t *TarballBuilder) Pack(messages chan rmq.Message, dir string) {
func (t *TarballBuilder) Pack(messages chan rmq.Message, dir string, verify chan rmq.Verify) {

t.wg.Add(1)

docNum := 0
fileNum := 0

var deliveryTag uint64
for doc := range messages {
deliveryTag = doc.DeliveryTag

docNum++
if docNum >= t.tarSize {

fileNum++
t.tar.Flush()
t.tar.Close()
t.gzip.Close()

// writes to tarball here when reached the t.tarSize
writeFile(t.buf.Bytes(), dir, fmt.Sprintf("%d_messages_%d.tgz", fileNum, docNum))
err := writeFile(t.buf.Bytes(), dir, fmt.Sprintf("%d_messages_%d.tgz", fileNum, docNum))
if err != nil {
log.Fatal(err)
}
verify <- rmq.Verify{MultiAck: true, Tag: doc.DeliveryTag}

err := t.getWriters()
err = t.getWriters()
if err != nil {
log.Fatal(err)
}
docNum = 0
}

if err := t.addFile(t.tar, uuid.New(), &doc); err != nil {
log.Fatalln(err)
}
docNum++
}
t.tar.Flush()
t.tar.Close()
Expand All @@ -163,10 +172,19 @@ func (t *TarballBuilder) Pack(messages chan rmq.Message, dir string) {
fileNum++

// writes to tarball here when not reached the t.tarSize
writeFile(t.buf.Bytes(), dir, fmt.Sprintf("%d_messages_%d.tgz", fileNum, docNum))
err := writeFile(t.buf.Bytes(), dir, fmt.Sprintf("%d_messages_%d.tgz", fileNum, docNum))
if err != nil {
log.Fatal(err)

t.wg.Done()
}

// Does not ack the messages unless it is repeated, not sure why yet..
// Might want to change using delivery ack interface
verify <- rmq.Verify{MultiAck: true, Tag: deliveryTag}
verify <- rmq.Verify{MultiAck: true, Tag: deliveryTag}

t.wg.Done()
close(verify)
log.Print("tarball writer closing")
}

Expand Down
21 changes: 12 additions & 9 deletions rmq/consume.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,15 @@ func NewConsumer(amqpURI, exchange, queue, routingKey, tag string, prefetch int)
return r
}

func (r *RabbitMQ) ackMultiple(deliveryTag <-chan Verify) {
for v := range deliveryTag {
r.channel.Ack(v.Tag, v.MultiAck)
}
}

// Consume outputs a stream of Message into a channel from rabbit
func (r *RabbitMQ) Consume(out chan Message) {
func (r *RabbitMQ) Consume(out chan Message, verify <-chan Verify) {
go r.ackMultiple(verify)

// set up a channel consumer
deliveries, err := r.channel.Consume(
Expand All @@ -97,18 +104,14 @@ func (r *RabbitMQ) Consume(out chan Message) {
for d := range deliveries {
// create a new Message for the rabbit message
msg := Message{
Body: d.Body,
RoutingKey: d.RoutingKey,
Headers: d.Headers,
Body: d.Body,
RoutingKey: d.RoutingKey,
Headers: d.Headers,
DeliveryTag: d.DeliveryTag,
}
// write Message to channel
out <- msg
// ack message
r.channel.Ack(d.DeliveryTag, false)
}

log.Print("All messages consumed")

// when deliveries are done, close
close(out)
}
13 changes: 10 additions & 3 deletions rmq/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,16 @@ package rmq

// Message contains the most basic about the message
type Message struct {
Body []byte
RoutingKey string
Headers map[string]interface{}
Body []byte
RoutingKey string
Headers map[string]interface{}
DeliveryTag uint64
}

// Verify will be used to Ack Message from the queue
type Verify struct {
Tag uint64
MultiAck bool
}

// NewMessageFromAttrs will create a new message from a byte slice and attributes
Expand Down

0 comments on commit 4f5faf3

Please sign in to comment.