Skip to content
This repository has been archived by the owner on Feb 8, 2023. It is now read-only.

Commit

Permalink
Add support for bool, float and int amqp headers (#11)
Browse files Browse the repository at this point in the history
* added support for bool, float and int amqp headers

* add documentation about amqp headers

* make sure cache is not on when running go test
  • Loading branch information
stiangrindvoll authored Mar 21, 2018
1 parent 3794748 commit ac24f6e
Show file tree
Hide file tree
Showing 6 changed files with 124 additions and 30 deletions.
1 change: 0 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,3 @@ rabbitio
data/
build/
vendor/
message*
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ deps:

test:
go vet ${DIRECTORIES}
go test -v ${DIRECTORIES}
GOCACHE=off go test -v -cover ${DIRECTORIES}

build:
go build -o ${NAME} -ldflags "-X main.version=${VERSION}" main.go
Expand Down
7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,14 @@ Flags:
Use "rabbitio [command] --help" for more information about a command.
```
### AMQP Headers and Routing Key

Currently RabbitIO supports AMQP Headers of the types:
* string
* number
* boolean

When you read messages from a queue, the headers as well as the routing key will be saved as metadata in the tarballs, utilizing what in tar is called XAttrs. This is helpful if you one day want to replay the data back into the original queue, while keeping the attributes that belong to the message. This currently only works on messages in the tarballs that has been written by RabbitIO.

## Contributing

Expand Down
11 changes: 2 additions & 9 deletions file/tarball.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,16 +71,9 @@ func (t *TarballBuilder) addFile(tw *tar.Writer, name string, m *rmq.Message) er
header.Size = int64(len(m.Body))
header.Mode = 0644
header.ModTime = time.Now()
header.Xattrs = make(map[string]string)
header.Xattrs = m.ToXAttrs()
header.Xattrs["amqp.routingKey"] = m.RoutingKey

for k, v := range m.Headers {
switch v.(type) {
case string:
header.Xattrs[k] = v.(string)
}
}

if err := tw.WriteHeader(header); err != nil {
return err
}
Expand Down Expand Up @@ -121,7 +114,7 @@ func UnPack(wg *sync.WaitGroup, file *os.File, messages chan rmq.Message) (n int
}

// generate and push the message to the output channel
messages <- *rmq.NewMessageFromAttrs(buf.Bytes(), hdr.Xattrs)
messages <- *rmq.NewMessage(buf.Bytes(), hdr.Xattrs)
n++
}
return n, err
Expand Down
76 changes: 63 additions & 13 deletions rmq/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,19 @@

package rmq

import (
"fmt"
"strconv"
"strings"

"github.com/streadway/amqp"
)

// Message contains the most basic about the message
type Message struct {
Body []byte
RoutingKey string
Headers map[string]interface{}
Headers amqp.Table
DeliveryTag uint64
}

Expand All @@ -28,28 +36,70 @@ type Verify struct {
MultiAck bool
}

// NewMessageFromAttrs will create a new message from a byte slice and attributes
func NewMessageFromAttrs(bytes []byte, attrs map[string]string) *Message {
// ToXAttrs takes amqp headers and convert them to attributes
func (m *Message) ToXAttrs() map[string]string {

xattrs := make(map[string]string)
var headerType string

for k, v := range m.Headers {
switch v.(type) {
case int, int32, int64:
headerType = "int"
case float32, float64:
headerType = "float"
case bool:
headerType = "bool"
case string:
headerType = "string"
}
xattrs[fmt.Sprintf("amqp.Headers.%s.%s", headerType, k)] = fmt.Sprintf("%v", v)
}
return xattrs
}

// NewMessage will create a new message from a byte slice and attributes
func NewMessage(bytes []byte, xattr map[string]string) *Message {

// add amqp header information to the Message
var headers = make(map[string]interface{})
var key string
var headers = make(amqp.Table)
var routingKey string

// need to support more than just string here for v
for k, v := range attrs {
switch k {
// use the routing key from tarball header configuration
case "amqp.routingKey":
key = v
default:
headers[k] = v
for k, v := range xattr {

switch {
case k == "amqp.routingKey":
routingKey = v
case strings.HasPrefix(k, "amqp.Headers."):
// th is now [type, header]
th := strings.SplitN(strings.TrimPrefix(k, "amqp.Headers."), ".", 2)
headerType := th[0]
header := strings.Join(th[1:], ".")

switch headerType {
case "bool":
if b, err := strconv.ParseBool(v); err == nil {
headers[header] = b
}
case "int":
if i, err := strconv.ParseInt(v, 10, 64); err == nil {
headers[header] = i
}
case "float":
if f, err := strconv.ParseFloat(v, 64); err == nil {
headers[header] = f
}
case "string":
headers[header] = v
}
}
}

// create a message
m := &Message{
Body: bytes,
RoutingKey: key,
RoutingKey: routingKey,
Headers: headers,
}

Expand Down
57 changes: 51 additions & 6 deletions rmq/message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,62 @@ package rmq
import (
"testing"

"github.com/streadway/amqp"
"github.com/stretchr/testify/assert"
)

func TestNewMessageFromAttrs(t *testing.T) {
var (
myStringHeader = "amqp.Headers.string.myStringHeader"
myInt32Header = "amqp.Headers.int.myInt32Header"
myInt64Header = "amqp.Headers.int.myInt64Header"
myFloat32Header = "amqp.Headers.float.myFloat32Header"
myFloat64Header = "amqp.Headers.float.myFloat64Header"
myBoolHeader = "amqp.Headers.bool.myBoolHeader"
)

func TestToXAttrs(t *testing.T) {
messageHeaders := make(amqp.Table)
messageHeaders["myStringHeader"] = "myString"
messageHeaders["myInt32Header"] = int32(32)
messageHeaders["myInt64Header"] = int64(64)
messageHeaders["myFloat32Header"] = float32(32.32)
messageHeaders["myFloat64Header"] = float64(64.64)
messageHeaders["myBoolHeader"] = true
message := &Message{Headers: messageHeaders}

var attrHeaders = make(map[string]string)
attrHeaders[myStringHeader] = "myString"
attrHeaders[myInt32Header] = "32"
attrHeaders[myInt64Header] = "64"
attrHeaders[myFloat32Header] = "32.32"
attrHeaders[myFloat64Header] = "64.64"
attrHeaders[myBoolHeader] = "true"

attrs := message.ToXAttrs()

assert.NoError(t, messageHeaders.Validate(), "should be valid Headers")

assert.Equal(t, attrHeaders[myStringHeader], attrs[myStringHeader])
assert.Equal(t, attrHeaders[myInt32Header], attrs[myInt32Header])
assert.Equal(t, attrHeaders[myInt64Header], attrs[myInt64Header])
assert.Equal(t, attrHeaders[myFloat32Header], attrs[myFloat32Header])
assert.Equal(t, attrHeaders[myFloat64Header], attrs[myFloat64Header])
assert.Equal(t, attrHeaders[myBoolHeader], attrs[myBoolHeader])
}

func TestNewMessage(t *testing.T) {
var headers = make(map[string]string)
headers["amqp.routingKey"] = "routingKey from tarball header"
headers["myHeaderString"] = "myString"
headers["amqp.routingKey"] = "routingKey from tarball XAttrs"
headers[myStringHeader] = "myString"
headers[myInt32Header] = "3232"
headers[myInt64Header] = "6464"
headers[myFloat32Header] = "32.123"
headers[myFloat64Header] = "64.123"
headers[myBoolHeader] = "true"

m := *NewMessageFromAttrs([]byte("Message"), headers)
m := NewMessage([]byte("Message"), headers)

assert.Equal(t, "routingKey from tarball header", m.RoutingKey)
assert.Equal(t, "routingKey from tarball XAttrs", m.RoutingKey)
assert.Equal(t, []byte("Message"), m.Body)
assert.Equal(t, "myString", m.Headers["myHeaderString"])
assert.NoError(t, m.Headers.Validate())
}

0 comments on commit ac24f6e

Please sign in to comment.