diff --git a/.gitignore b/.gitignore index d28f4dd..a9ac1b8 100644 --- a/.gitignore +++ b/.gitignore @@ -5,4 +5,3 @@ rabbitio data/ build/ vendor/ -message* diff --git a/Makefile b/Makefile index 34ec517..7cf51d3 100644 --- a/Makefile +++ b/Makefile @@ -14,7 +14,7 @@ deps: test: go vet ${DIRECTORIES} - go test -v ${DIRECTORIES} + GOCACHE=off go test -v -cover ${DIRECTORIES} build: go build -o ${NAME} -ldflags "-X main.version=${VERSION}" main.go diff --git a/README.md b/README.md index 6db6787..7c1ea16 100644 --- a/README.md +++ b/README.md @@ -75,7 +75,14 @@ Flags: Use "rabbitio [command] --help" for more information about a command. ``` +### AMQP Headers and Routing Key +Currently RabbitIO supports AMQP Headers of the types: +* string +* number +* boolean + +When you read messages from a queue, the headers as well as the routing key will be saved as metadata in the tarballs, utilizing what in tar is called XAttrs. This is helpful if you one day want to replay the data back into the original queue, while keeping the attributes that belong to the message. This currently only works on messages in the tarballs that has been written by RabbitIO. ## Contributing diff --git a/file/tarball.go b/file/tarball.go index 347a059..40aaf03 100644 --- a/file/tarball.go +++ b/file/tarball.go @@ -71,16 +71,9 @@ func (t *TarballBuilder) addFile(tw *tar.Writer, name string, m *rmq.Message) er header.Size = int64(len(m.Body)) header.Mode = 0644 header.ModTime = time.Now() - header.Xattrs = make(map[string]string) + header.Xattrs = m.ToXAttrs() header.Xattrs["amqp.routingKey"] = m.RoutingKey - for k, v := range m.Headers { - switch v.(type) { - case string: - header.Xattrs[k] = v.(string) - } - } - if err := tw.WriteHeader(header); err != nil { return err } @@ -121,7 +114,7 @@ func UnPack(wg *sync.WaitGroup, file *os.File, messages chan rmq.Message) (n int } // generate and push the message to the output channel - messages <- *rmq.NewMessageFromAttrs(buf.Bytes(), hdr.Xattrs) + messages <- *rmq.NewMessage(buf.Bytes(), hdr.Xattrs) n++ } return n, err diff --git a/rmq/message.go b/rmq/message.go index bea777e..f07375a 100644 --- a/rmq/message.go +++ b/rmq/message.go @@ -14,11 +14,19 @@ package rmq +import ( + "fmt" + "strconv" + "strings" + + "github.com/streadway/amqp" +) + // Message contains the most basic about the message type Message struct { Body []byte RoutingKey string - Headers map[string]interface{} + Headers amqp.Table DeliveryTag uint64 } @@ -28,28 +36,70 @@ type Verify struct { MultiAck bool } -// NewMessageFromAttrs will create a new message from a byte slice and attributes -func NewMessageFromAttrs(bytes []byte, attrs map[string]string) *Message { +// ToXAttrs takes amqp headers and convert them to attributes +func (m *Message) ToXAttrs() map[string]string { + + xattrs := make(map[string]string) + var headerType string + + for k, v := range m.Headers { + switch v.(type) { + case int, int32, int64: + headerType = "int" + case float32, float64: + headerType = "float" + case bool: + headerType = "bool" + case string: + headerType = "string" + } + xattrs[fmt.Sprintf("amqp.Headers.%s.%s", headerType, k)] = fmt.Sprintf("%v", v) + } + return xattrs +} + +// NewMessage will create a new message from a byte slice and attributes +func NewMessage(bytes []byte, xattr map[string]string) *Message { // add amqp header information to the Message - var headers = make(map[string]interface{}) - var key string + var headers = make(amqp.Table) + var routingKey string // need to support more than just string here for v - for k, v := range attrs { - switch k { - // use the routing key from tarball header configuration - case "amqp.routingKey": - key = v - default: - headers[k] = v + for k, v := range xattr { + + switch { + case k == "amqp.routingKey": + routingKey = v + case strings.HasPrefix(k, "amqp.Headers."): + // th is now [type, header] + th := strings.SplitN(strings.TrimPrefix(k, "amqp.Headers."), ".", 2) + headerType := th[0] + header := strings.Join(th[1:], ".") + + switch headerType { + case "bool": + if b, err := strconv.ParseBool(v); err == nil { + headers[header] = b + } + case "int": + if i, err := strconv.ParseInt(v, 10, 64); err == nil { + headers[header] = i + } + case "float": + if f, err := strconv.ParseFloat(v, 64); err == nil { + headers[header] = f + } + case "string": + headers[header] = v + } } } // create a message m := &Message{ Body: bytes, - RoutingKey: key, + RoutingKey: routingKey, Headers: headers, } diff --git a/rmq/message_test.go b/rmq/message_test.go index 32751cd..4ca08a9 100644 --- a/rmq/message_test.go +++ b/rmq/message_test.go @@ -17,17 +17,62 @@ package rmq import ( "testing" + "github.com/streadway/amqp" "github.com/stretchr/testify/assert" ) -func TestNewMessageFromAttrs(t *testing.T) { +var ( + myStringHeader = "amqp.Headers.string.myStringHeader" + myInt32Header = "amqp.Headers.int.myInt32Header" + myInt64Header = "amqp.Headers.int.myInt64Header" + myFloat32Header = "amqp.Headers.float.myFloat32Header" + myFloat64Header = "amqp.Headers.float.myFloat64Header" + myBoolHeader = "amqp.Headers.bool.myBoolHeader" +) + +func TestToXAttrs(t *testing.T) { + messageHeaders := make(amqp.Table) + messageHeaders["myStringHeader"] = "myString" + messageHeaders["myInt32Header"] = int32(32) + messageHeaders["myInt64Header"] = int64(64) + messageHeaders["myFloat32Header"] = float32(32.32) + messageHeaders["myFloat64Header"] = float64(64.64) + messageHeaders["myBoolHeader"] = true + message := &Message{Headers: messageHeaders} + + var attrHeaders = make(map[string]string) + attrHeaders[myStringHeader] = "myString" + attrHeaders[myInt32Header] = "32" + attrHeaders[myInt64Header] = "64" + attrHeaders[myFloat32Header] = "32.32" + attrHeaders[myFloat64Header] = "64.64" + attrHeaders[myBoolHeader] = "true" + + attrs := message.ToXAttrs() + + assert.NoError(t, messageHeaders.Validate(), "should be valid Headers") + + assert.Equal(t, attrHeaders[myStringHeader], attrs[myStringHeader]) + assert.Equal(t, attrHeaders[myInt32Header], attrs[myInt32Header]) + assert.Equal(t, attrHeaders[myInt64Header], attrs[myInt64Header]) + assert.Equal(t, attrHeaders[myFloat32Header], attrs[myFloat32Header]) + assert.Equal(t, attrHeaders[myFloat64Header], attrs[myFloat64Header]) + assert.Equal(t, attrHeaders[myBoolHeader], attrs[myBoolHeader]) +} + +func TestNewMessage(t *testing.T) { var headers = make(map[string]string) - headers["amqp.routingKey"] = "routingKey from tarball header" - headers["myHeaderString"] = "myString" + headers["amqp.routingKey"] = "routingKey from tarball XAttrs" + headers[myStringHeader] = "myString" + headers[myInt32Header] = "3232" + headers[myInt64Header] = "6464" + headers[myFloat32Header] = "32.123" + headers[myFloat64Header] = "64.123" + headers[myBoolHeader] = "true" - m := *NewMessageFromAttrs([]byte("Message"), headers) + m := NewMessage([]byte("Message"), headers) - assert.Equal(t, "routingKey from tarball header", m.RoutingKey) + assert.Equal(t, "routingKey from tarball XAttrs", m.RoutingKey) assert.Equal(t, []byte("Message"), m.Body) - assert.Equal(t, "myString", m.Headers["myHeaderString"]) + assert.NoError(t, m.Headers.Validate()) }