From 53b68fb0b4cfaa1e4c83a2e843e8505b4a61b28a Mon Sep 17 00:00:00 2001 From: Stian Grindvoll Date: Mon, 27 Nov 2017 01:06:14 +0100 Subject: [PATCH] This ensures we close connection after sync up on documents sent (#5) --- cmd/in.go | 13 +++++++++---- file/file.go | 8 ++++++-- file/tarball.go | 3 ++- rmq/publish.go | 15 +++++++++++++++ rmq/rabbitmq.go | 7 ++++++- 5 files changed, 38 insertions(+), 8 deletions(-) diff --git a/cmd/in.go b/cmd/in.go index b2df255..cf199be 100644 --- a/cmd/in.go +++ b/cmd/in.go @@ -15,6 +15,8 @@ package cmd import ( + "sync" + "github.com/meltwater/rabbitio/file" "github.com/meltwater/rabbitio/rmq" "github.com/spf13/cobra" @@ -32,14 +34,17 @@ var inCmd = &cobra.Command{ Run: func(cmd *cobra.Command, args []string) { channel := make(chan rmq.Message, prefetch) + var wg sync.WaitGroup override := rmq.Override{RoutingKey: routingKey} - rabbit := rmq.NewPublisher(uri, exchange, queue, tag, prefetch) path := file.NewInput(fileInput) + path.Wg = &wg + rabbit := rmq.NewPublisher(uri, exchange, queue, tag, prefetch) + rabbit.Wg = &wg - go path.Send(channel) - - rabbit.Publish(channel, override) + go rabbit.Publish(channel, override) + path.Send(channel) + rabbit.Close() }, } diff --git a/file/file.go b/file/file.go index ea476f2..a923564 100644 --- a/file/file.go +++ b/file/file.go @@ -19,6 +19,7 @@ import ( "log" "os" "path/filepath" + "sync" "github.com/meltwater/rabbitio/rmq" ) @@ -28,6 +29,7 @@ type Path struct { name string batchSize int queue []string + Wg *sync.WaitGroup } // NewInput returns a *Path with a queue of files paths, all files in a directory @@ -83,15 +85,17 @@ func (p *Path) Send(messages chan rmq.Message) { // and clean up afterwards defer fh.Close() - tarNum, err := UnPack(fh, messages) + tarNum, err := UnPack(p.Wg, fh, messages) if err != nil { log.Fatalf("Failed to unpack: %s ", err) } log.Printf("Extracted %d Messages from tarball: %s", tarNum, file) num = num + tarNum } - // when all files are read, close + + p.Wg.Wait() close(messages) + // when all files are read, close log.Printf("Total %d Messages from tarballs", num) } diff --git a/file/tarball.go b/file/tarball.go index d5c2acd..c634854 100644 --- a/file/tarball.go +++ b/file/tarball.go @@ -91,7 +91,7 @@ func (t *TarballBuilder) addFile(tw *tar.Writer, name string, m *rmq.Message) er } // UnPack will decompress and send messages out on channel from file -func UnPack(file *os.File, messages chan rmq.Message) (n int, err error) { +func UnPack(wg *sync.WaitGroup, file *os.File, messages chan rmq.Message) (n int, err error) { // wrap fh in a gzip reader gr, err := gzip.NewReader(file) @@ -109,6 +109,7 @@ func UnPack(file *os.File, messages chan rmq.Message) (n int, err error) { if terr != nil { return n, terr } + wg.Add(1) // create a Buffer to work on // TODO: reuse if GC pressure is a problem diff --git a/rmq/publish.go b/rmq/publish.go index cdb9824..fdb15e2 100644 --- a/rmq/publish.go +++ b/rmq/publish.go @@ -90,5 +90,20 @@ func (r *RabbitMQ) Publish(messages chan Message, o Override) { ); err != nil { log.Fatalf("writer failed to write document to rabbit: %s", err) } + r.Wg.Done() } } + +// Close will close the RabbitMQ channel and connection +func (r *RabbitMQ) Close() error { + err := r.channel.Close() + if err != nil { + return err + } + err = r.conn.Close() + if err != nil { + return err + } + log.Println("RabbitMQ Connection closed with success") + return nil +} diff --git a/rmq/rabbitmq.go b/rmq/rabbitmq.go index 4f7d87e..884719f 100644 --- a/rmq/rabbitmq.go +++ b/rmq/rabbitmq.go @@ -14,7 +14,11 @@ package rmq -import "github.com/streadway/amqp" +import ( + "sync" + + "github.com/streadway/amqp" +) // RabbitMQ type for talking to RabbitMQ type RabbitMQ struct { @@ -29,6 +33,7 @@ type RabbitMQ struct { prefetch int consume bool publish bool + Wg *sync.WaitGroup } // Override will be used to override RabbitMQ settings on publishing messages