diff --git a/cmd/in.go b/cmd/in.go index 4252531..b2df255 100644 --- a/cmd/in.go +++ b/cmd/in.go @@ -15,6 +15,8 @@ package cmd import ( + "github.com/meltwater/rabbitio/file" + "github.com/meltwater/rabbitio/rmq" "github.com/spf13/cobra" ) @@ -29,14 +31,15 @@ var inCmd = &cobra.Command{ Long: `Specify a directory or file and tarballs will be published.`, Run: func(cmd *cobra.Command, args []string) { - channel := make(chan Message, prefetch) + channel := make(chan rmq.Message, prefetch) - rabbit := NewRabbitMQ(uri, exchange, userQueue, routingKey, tag, prefetch, false, true) - path := NewFileInput(fileInput) + override := rmq.Override{RoutingKey: routingKey} + rabbit := rmq.NewPublisher(uri, exchange, queue, tag, prefetch) + path := file.NewInput(fileInput) go path.Send(channel) - rabbit.Publish(channel) + rabbit.Publish(channel, override) }, } diff --git a/cmd/out.go b/cmd/out.go index 717b0b5..adf6dba 100644 --- a/cmd/out.go +++ b/cmd/out.go @@ -20,6 +20,8 @@ import ( "os/signal" "syscall" + "github.com/meltwater/rabbitio/file" + "github.com/meltwater/rabbitio/rmq" "github.com/spf13/cobra" ) @@ -36,10 +38,10 @@ var outCmd = &cobra.Command{ When there are no more messages in the queue, press CTRL + c, to interrupt the consumption and save the last message buffers.`, Run: func(cmd *cobra.Command, args []string) { - channel := make(chan Message, prefetch*2) + channel := make(chan rmq.Message, prefetch*2) - rabbit := NewRabbitMQ(uri, exchange, userQueue, routingKey, tag, prefetch, true, false) - savePath := NewFileOutput(outputDirectory, batchSize) + rabbit := rmq.NewConsumer(uri, exchange, queue, routingKey, tag, prefetch) + path := file.NewOutput(outputDirectory, batchSize) go rabbit.Consume(channel) @@ -51,7 +53,7 @@ var outCmd = &cobra.Command{ close(channel) }() - savePath.Receive(channel) + path.Receive(channel) }, } diff --git a/cmd/rabbitmq.go b/cmd/rabbitmq.go deleted file mode 100644 index 7fa7ba3..0000000 --- a/cmd/rabbitmq.go +++ /dev/null @@ -1,207 +0,0 @@ -// Copyright © 2017 Meltwater -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package cmd - -import ( - "log" - - "github.com/streadway/amqp" -) - -// RabbitMQ type for talking to RabbitMQ -type RabbitMQ struct { - conn *amqp.Connection - channel *amqp.Channel - exchange string - contentType string - contentEncoding string - queue string - tag string - routingKey string - prefetch int - consume bool - publish bool -} - -// Message contains the most basic about the message -type Message struct { - Body []byte - RoutingKey string - Headers map[string]interface{} -} - -// NewMessageFromAttrs will create a new message from a byte slice and attributes -func NewMessageFromAttrs(bytes []byte, attrs map[string]string) *Message { - - // add header information to the Message - var headers = make(map[string]interface{}) - var key string - for k, v := range attrs { - switch k { - // use the provided routing key to override tarball configuration - case "amqp.routingKey": - if routingKey != "" { - key = routingKey - } else { - key = v - } - default: - headers[k] = v - } - } - - // create a message - m := &Message{ - Body: bytes, - RoutingKey: key, - Headers: headers, - } - - return m -} - -// NewRabbitMQ creates and sets up a RabbitOutput -func NewRabbitMQ(amqpURI, exchange, queue, routingKey, tag string, prefetch int, consume, publish bool) *RabbitMQ { - conn, err := amqp.Dial(amqpURI) - if err != nil { - log.Fatalf("writer failed to connect to Rabbit: %s", err) - return nil - } - - go func() { - log.Printf("writer closing: %s", <-conn.NotifyClose(make(chan *amqp.Error))) - log.Printf("writer blocked by rabbit: %v", <-conn.NotifyBlocked(make(chan amqp.Blocking))) - }() - - channel, err := conn.Channel() - if err != nil { - log.Fatalf("writer failed to get a channel from Rabbit: %s", err) - return nil - } - - if publish { - if err = channel.ExchangeDeclarePassive( - exchange, // name - "topic", // type - true, // durable - false, // auto-deleted - false, // internal - false, // noWait - nil, // arguments - ); err != nil { - log.Fatalf("Exchange Declare: %s", err) - } - } - - if consume { - - q, err := channel.QueueDeclarePassive( - queue, // name of the queue - true, // durable - false, // delete when usused - false, // exclusive - false, // noWait - nil, // arguments - ) - if err != nil { - log.Fatalf("Queue Declare: %s", err) - } - if q.Messages == 0 { - log.Fatalf("No messages in RabbitMQ Queue: %s", q.Name) - } - if err = channel.QueueBind( - q.Name, // name of the queue - "#", // bindingKey - exchange, // sourceExchange - false, // noWait - nil, // arguments - ); err != nil { - log.Fatalf("Queue Bind: %s", err) - } - log.Printf("Bind to Exchange: %q and Queue: %q, Messaging waiting: %d", exchange, queue, q.Messages) - } - - r := &RabbitMQ{ - conn: conn, - channel: channel, - exchange: exchange, - contentType: "application/json", - contentEncoding: "UTF-8", - } - log.Print("RabbitMQ connected: ", amqpURI) - - return r -} - -// Publish Takes stream of messages and publish them to rabbit -func (r *RabbitMQ) Publish(in chan Message) { - for doc := range in { - - // var table amqp.Table = doc.Headers - - if err := r.channel.Publish( - r.exchange, - doc.RoutingKey, - false, // mandatory - false, // immediate - amqp.Publishing{ - Headers: doc.Headers, - ContentType: r.contentType, - ContentEncoding: r.contentEncoding, - Body: doc.Body, - DeliveryMode: amqp.Persistent, - }, - ); err != nil { - log.Fatalf("writer failed to write document to rabbit: %s", err) - } - } -} - -// Consume outputs a stream of Message into a channel from rabbit -func (r *RabbitMQ) Consume(out chan Message) { - - // set up a channel consumer - deliveries, err := r.channel.Consume( - r.queue, // name - r.tag, // consumerTag, - false, // noAck - false, // exclusive - false, // noLocal - false, // noWait - nil, // arguments - ) - if err != nil { - log.Fatalf("rabbit consumer failed %s", err) - } - - // process deliveries from the queue - for d := range deliveries { - // create a new Message for the rabbit message - msg := Message{ - Body: d.Body, - RoutingKey: d.RoutingKey, - Headers: d.Headers, - } - // write Message to channel - out <- msg - // ack message - r.channel.Ack(d.DeliveryTag, false) - } - - log.Print("All messages consumed") - - // when deliveries are done, close - close(out) -} diff --git a/cmd/root.go b/cmd/root.go index 7406685..b6095e2 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -22,9 +22,9 @@ import ( ) var ( - version string - uri, exchange, userQueue, tag, routingKey string - prefetch int + version string + uri, exchange, queue, tag, routingKey string + prefetch int ) // RootCmd represents the base command when called without any subcommands @@ -49,8 +49,8 @@ func Execute(ver string) { func init() { RootCmd.PersistentFlags().StringVarP(&uri, "uri", "u", "amqp://guest:guest@localhost:5672/", "AMQP URI, uri to for instance RabbitMQ") RootCmd.PersistentFlags().StringVarP(&exchange, "exchange", "e", "", "Exchange to connect to") - RootCmd.PersistentFlags().StringVarP(&userQueue, "queue", "q", "", "Queue to connect to") - RootCmd.PersistentFlags().StringVarP(&routingKey, "routingkey", "r", "", "Routing Key, if specified will override tarball routing key configuration") + RootCmd.PersistentFlags().StringVarP(&queue, "queue", "q", "", "Queue to connect to") + RootCmd.PersistentFlags().StringVarP(&routingKey, "routingkey", "r", "#", "Routing Key, if specified will override tarball routing key configuration") RootCmd.PersistentFlags().StringVarP(&tag, "tag", "t", "Rabbit IO Connector "+version, "AMQP Client Tag") RootCmd.PersistentFlags().IntVarP(&prefetch, "prefetch", "p", 100, "Prefetch for batches") } diff --git a/cmd/file.go b/file/file.go similarity index 76% rename from cmd/file.go rename to file/file.go index 1e5648a..ea476f2 100644 --- a/cmd/file.go +++ b/file/file.go @@ -12,34 +12,32 @@ // See the License for the specific language governing permissions and // limitations under the License. -package cmd +package file import ( "io/ioutil" "log" "os" "path/filepath" -) -// FileInput is nice -type FileInput struct { - queue []string -} + "github.com/meltwater/rabbitio/rmq" +) -// Path is directory path for consumed RabbitMQ messages +// Path is a directory file path type Path struct { name string batchSize int + queue []string } -// NewFileInput creates a FileInput from the specified directory -func NewFileInput(path string) *FileInput { +// NewInput returns a *Path with a queue of files paths, all files in a directory +func NewInput(path string) *Path { fi, err := os.Stat(path) if err != nil { log.Fatalln(err) } - var f *FileInput + var f *Path q := []string{} switch mode := fi.Mode(); { case mode.IsDir(): @@ -55,7 +53,7 @@ func NewFileInput(path string) *FileInput { q = append(q, path) } - f = &FileInput{ + f = &Path{ queue: q, } @@ -72,11 +70,11 @@ func writeFile(b []byte, dir, file string) { } // Send delivers messages to the channel -func (f *FileInput) Send(messages chan Message) { +func (p *Path) Send(messages chan rmq.Message) { var num int // loop over the queued up files - for _, file := range f.queue { + for _, file := range p.queue { // open file from the queue fh, err := os.Open(file) if err != nil { @@ -98,8 +96,15 @@ func (f *FileInput) Send(messages chan Message) { } -// NewFileOutput creates a Path to output files in from RabbitMQ -func NewFileOutput(path string, batchSize int) *Path { +// NewOutput creates a Path to output files in from RabbitMQ +func NewOutput(path string, batchSize int) *Path { + if _, err := os.Stat(path); os.IsNotExist(err) { + log.Println("Creating missing directory:", path) + err := os.MkdirAll(path, os.ModePerm) + if err != nil { + log.Fatalln(err) + } + } return &Path{ name: path, batchSize: batchSize, @@ -107,7 +112,7 @@ func NewFileOutput(path string, batchSize int) *Path { } // Receive will handle messages and save to path -func (p *Path) Receive(messages chan Message) { +func (p *Path) Receive(messages chan rmq.Message) { // create new TarballBuilder builder := NewTarballBuilder(p.batchSize) diff --git a/cmd/tarball.go b/file/tarball.go similarity index 92% rename from cmd/tarball.go rename to file/tarball.go index 3cfb626..d5c2acd 100644 --- a/cmd/tarball.go +++ b/file/tarball.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package cmd +package file import ( "archive/tar" @@ -25,6 +25,7 @@ import ( "sync" "time" + "github.com/meltwater/rabbitio/rmq" "github.com/pborman/uuid" ) @@ -64,7 +65,7 @@ func (t *TarballBuilder) getWriters() (err error) { } // add a new file to the tarball writer -func (t *TarballBuilder) addFile(tw *tar.Writer, name string, m *Message) error { +func (t *TarballBuilder) addFile(tw *tar.Writer, name string, m *rmq.Message) error { header := new(tar.Header) header.Name = name header.Size = int64(len(m.Body)) @@ -90,7 +91,7 @@ func (t *TarballBuilder) addFile(tw *tar.Writer, name string, m *Message) error } // UnPack will decompress and send messages out on channel from file -func UnPack(file *os.File, messages chan Message) (n int, err error) { +func UnPack(file *os.File, messages chan rmq.Message) (n int, err error) { // wrap fh in a gzip reader gr, err := gzip.NewReader(file) @@ -119,14 +120,14 @@ func UnPack(file *os.File, messages chan Message) (n int, err error) { } // generate and push the message to the output channel - messages <- *NewMessageFromAttrs(buf.Bytes(), hdr.Xattrs) + messages <- *rmq.NewMessageFromAttrs(buf.Bytes(), hdr.Xattrs) n++ } return n, err } // Pack messages from the channel into the directory -func (t *TarballBuilder) Pack(messages chan Message, dir string) { +func (t *TarballBuilder) Pack(messages chan rmq.Message, dir string) { t.wg.Add(1) diff --git a/rmq/consume.go b/rmq/consume.go new file mode 100644 index 0000000..028a131 --- /dev/null +++ b/rmq/consume.go @@ -0,0 +1,114 @@ +// Copyright © 2017 Meltwater +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package rmq + +import ( + "log" + + "github.com/streadway/amqp" +) + +// NewConsumer creates and sets up a RabbitMQ struct best used for consuming messages +func NewConsumer(amqpURI, exchange, queue, routingKey, tag string, prefetch int) *RabbitMQ { + conn, err := amqp.Dial(amqpURI) + if err != nil { + log.Fatalf("writer failed to connect to Rabbit: %s", err) + return nil + } + + go func() { + log.Printf("writer closing: %s", <-conn.NotifyClose(make(chan *amqp.Error))) + log.Printf("writer blocked by rabbit: %v", <-conn.NotifyBlocked(make(chan amqp.Blocking))) + }() + + channel, err := conn.Channel() + if err != nil { + log.Fatalf("writer failed to get a channel from Rabbit: %s", err) + return nil + } + + q, err := channel.QueueDeclarePassive( + queue, // name of the queue + true, // durable + false, // delete when usused + false, // exclusive + false, // noWait + nil, // arguments + ) + if err != nil { + log.Fatalf("Queue Declare: %s", err) + } + if q.Messages == 0 { + log.Fatalf("No messages in RabbitMQ Queue: %s", q.Name) + } + if err = channel.QueueBind( + q.Name, // name of the queue + routingKey, // bindingKey + exchange, // sourceExchange + false, // noWait + nil, // arguments + ); err != nil { + log.Fatalf("Queue Bind: %s", err) + } + + r := &RabbitMQ{ + conn: conn, + channel: channel, + exchange: exchange, + contentType: "application/json", + contentEncoding: "UTF-8", + } + log.Print("RabbitMQ connected: ", amqpURI) + log.Printf("Bind to Exchange: %q and Queue: %q, Messaging waiting: %d", exchange, queue, q.Messages) + + return r +} + +// Consume outputs a stream of Message into a channel from rabbit +func (r *RabbitMQ) Consume(out chan Message) { + + // set up a channel consumer + deliveries, err := r.channel.Consume( + r.queue, // name + r.tag, // consumerTag, + false, // noAck + false, // exclusive + false, // noLocal + false, // noWait + nil, // arguments + ) + if err != nil { + log.Fatalf("rabbit consumer failed %s", err) + } + + // process deliveries from the queue + for d := range deliveries { + // create a new Message for the rabbit message + msg := Message{ + Body: d.Body, + RoutingKey: d.RoutingKey, + Headers: d.Headers, + } + // write Message to channel + out <- msg + // ack message + r.channel.Ack(d.DeliveryTag, false) + } + + log.Print("All messages consumed") + + // when deliveries are done, close + close(out) +} diff --git a/rmq/message.go b/rmq/message.go new file mode 100644 index 0000000..060ca25 --- /dev/null +++ b/rmq/message.go @@ -0,0 +1,50 @@ +// Copyright © 2017 Meltwater +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package rmq + +// Message contains the most basic about the message +type Message struct { + Body []byte + RoutingKey string + Headers map[string]interface{} +} + +// NewMessageFromAttrs will create a new message from a byte slice and attributes +func NewMessageFromAttrs(bytes []byte, attrs map[string]string) *Message { + + // add amqp header information to the Message + var headers = make(map[string]interface{}) + var key string + + // need to support more than just string here for v + for k, v := range attrs { + switch k { + // use the routing key from tarball header configuration + case "amqp.routingKey": + key = v + default: + headers[k] = v + } + } + + // create a message + m := &Message{ + Body: bytes, + RoutingKey: key, + Headers: headers, + } + + return m +} diff --git a/rmq/publish.go b/rmq/publish.go new file mode 100644 index 0000000..cdb9824 --- /dev/null +++ b/rmq/publish.go @@ -0,0 +1,94 @@ +// Copyright © 2017 Meltwater +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package rmq + +import ( + "log" + + "github.com/streadway/amqp" +) + +// NewPublisher creates and sets up a RabbitMQ Publisher +func NewPublisher(amqpURI, exchange, queue, tag string, prefetch int) *RabbitMQ { + conn, err := amqp.Dial(amqpURI) + if err != nil { + log.Fatalf("writer failed to connect to Rabbit: %s", err) + return nil + } + + go func() { + log.Printf("writer closing: %s", <-conn.NotifyClose(make(chan *amqp.Error))) + log.Printf("writer blocked by rabbit: %v", <-conn.NotifyBlocked(make(chan amqp.Blocking))) + }() + + channel, err := conn.Channel() + if err != nil { + log.Fatalf("writer failed to get a channel from Rabbit: %s", err) + return nil + } + + if err = channel.ExchangeDeclarePassive( + exchange, // name + "topic", // type + true, // durable + false, // auto-deleted + false, // internal + false, // noWait + nil, // arguments + ); err != nil { + log.Fatalf("Exchange Declare: %s", err) + } + + r := &RabbitMQ{ + conn: conn, + channel: channel, + exchange: exchange, + contentType: "application/json", + contentEncoding: "UTF-8", + } + log.Print("RabbitMQ connected: ", amqpURI) + + return r +} + +// Publish Takes stream of messages and publish them to rabbit +func (r *RabbitMQ) Publish(messages chan Message, o Override) { + for m := range messages { + + // override routingKey stored in Message with the executed options + var routingKey string + if o.RoutingKey != "#" { + routingKey = o.RoutingKey + } else { + routingKey = m.RoutingKey + } + + if err := r.channel.Publish( + r.exchange, + routingKey, + false, // mandatory + false, // immediate + amqp.Publishing{ + Headers: m.Headers, + ContentType: r.contentType, + ContentEncoding: r.contentEncoding, + Body: m.Body, + DeliveryMode: amqp.Persistent, + }, + ); err != nil { + log.Fatalf("writer failed to write document to rabbit: %s", err) + } + } +} diff --git a/rmq/rabbitmq.go b/rmq/rabbitmq.go new file mode 100644 index 0000000..4f7d87e --- /dev/null +++ b/rmq/rabbitmq.go @@ -0,0 +1,37 @@ +// Copyright © 2017 Meltwater +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package rmq + +import "github.com/streadway/amqp" + +// RabbitMQ type for talking to RabbitMQ +type RabbitMQ struct { + conn *amqp.Connection + channel *amqp.Channel + override Override + exchange string + contentType string + contentEncoding string + queue string + tag string + prefetch int + consume bool + publish bool +} + +// Override will be used to override RabbitMQ settings on publishing messages +type Override struct { + RoutingKey string +}