Skip to content

Commit

Permalink
Support Metadata in struct tags to fetch TTL and Generation via GetOb…
Browse files Browse the repository at this point in the history
…ject
  • Loading branch information
khaf committed Mar 7, 2016
1 parent b017ba8 commit a927c8e
Show file tree
Hide file tree
Showing 4 changed files with 157 additions and 25 deletions.
13 changes: 10 additions & 3 deletions batch_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,9 @@ type baseMultiCommand struct {

errChan chan error

resObjType reflect.Type
selectCases []reflect.SelectCase
resObjType reflect.Type
resObjMappings map[string]string
selectCases []reflect.SelectCase
}

func newMultiCommand(node *Node, recordset *Recordset) *baseMultiCommand {
Expand All @@ -61,6 +62,7 @@ func newMultiCommand(node *Node, recordset *Recordset) *baseMultiCommand {
if cmd.recordset != nil && !cmd.recordset.objChan.IsNil() {
// this channel must be of type chan *T
cmd.resObjType = cmd.recordset.objChan.Type().Elem().Elem()
cmd.resObjMappings = objectMappings.getMapping(cmd.recordset.objChan.Type().Elem().Elem())

cmd.selectCases = []reflect.SelectCase{
reflect.SelectCase{Dir: reflect.SelectSend, Chan: cmd.recordset.objChan},
Expand Down Expand Up @@ -342,7 +344,12 @@ func (cmd *baseMultiCommand) parseObject(
return err
}

if err := setObjectField(obj, name, value); err != nil {
iobj := reflect.Indirect(obj)
for iobj.Kind() == reflect.Ptr {
iobj = reflect.Indirect(iobj)
}

if err := setObjectField(cmd.resObjMappings, iobj, name, value); err != nil {
return err
}
}
Expand Down
44 changes: 44 additions & 0 deletions client_object_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,50 @@ var _ = Describe("Aerospike", func() {

}) // PutObject context

Context("Metadata operations", func() {

It("must save an object and read its metadata back", func() {

type objMeta struct {
TTL1, TTL2 uint32 `asm:"ttl"`
GEN1, GEN2 uint32 `asm:"gen"`
Val int `as:"val"`
}

testObj := objMeta{Val: 1}
err := client.PutObject(nil, key, &testObj)
Expect(err).ToNot(HaveOccurred())

rec, err := client.Get(nil, key)
Expect(err).ToNot(HaveOccurred())
Expect(rec.Bins).To(Equal(BinMap{"val": 1}))

resObj := &objMeta{}
err = client.GetObject(nil, key, resObj)
Expect(err).ToNot(HaveOccurred())

Expect(resObj.TTL1).NotTo(Equal(uint32(0)))
Expect(resObj.TTL1).To(Equal(resObj.TTL2))

Expect(resObj.GEN1).To(Equal(uint32(1)))
Expect(resObj.GEN2).To(Equal(uint32(1)))

// put it again to check the generation
err = client.PutObject(nil, key, &testObj)
Expect(err).ToNot(HaveOccurred())

err = client.GetObject(nil, key, resObj)
Expect(err).ToNot(HaveOccurred())

Expect(resObj.TTL1).NotTo(Equal(uint32(0)))
Expect(resObj.TTL1).To(Equal(resObj.TTL2))

Expect(resObj.GEN1).To(Equal(uint32(2)))
Expect(resObj.GEN2).To(Equal(uint32(2)))
})

}) // PutObject context

Context("ScanObjects operations", func() {

type InnerStruct struct {
Expand Down
73 changes: 59 additions & 14 deletions marshal.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package aerospike

import (
"fmt"
"math"
"reflect"
"strings"
Expand All @@ -23,8 +24,9 @@ import (
)

const (
aerospikeTag = "as"
keyTag = "key"
aerospikeTag = "as"
aerospikeMetaTag = "asm"
keyTag = "key"
)

func valueToInterface(f reflect.Value, clusterSupportsFloat bool) interface{} {
Expand Down Expand Up @@ -90,6 +92,11 @@ func valueToInterface(f reflect.Value, clusterSupportsFloat bool) interface{} {
}
}

func fieldIsMetadata(f reflect.StructField) bool {
meta := f.Tag.Get(aerospikeMetaTag)
return strings.Trim(meta, " ") != ""
}

func fieldAlias(f reflect.StructField) string {
alias := f.Tag.Get(aerospikeTag)
if alias != "" {
Expand Down Expand Up @@ -122,17 +129,22 @@ func structToMap(s reflect.Value, clusterSupportsFloat bool) map[string]interfac
continue
}

binValue := valueToInterface(s.Field(i), clusterSupportsFloat)

if binMap == nil {
binMap = make(map[string]interface{}, numFields)
if fieldIsMetadata(typeOfT.Field(i)) {
continue
}

// skip transiet fields tagged `-`
alias := fieldAlias(typeOfT.Field(i))
if alias == "" {
continue
}

binValue := valueToInterface(s.Field(i), clusterSupportsFloat)

if binMap == nil {
binMap = make(map[string]interface{}, numFields)
}

binMap[alias] = binValue
}

Expand Down Expand Up @@ -163,14 +175,18 @@ func marshal(v interface{}, clusterSupportsFloat bool) []*Bin {
type SyncMap struct {
objectMappings map[reflect.Type]map[string]string
objectFields map[reflect.Type][]string
objectTTLs map[reflect.Type][]string
objectGen map[reflect.Type][]string
mutex sync.RWMutex
}

func (sm *SyncMap) setMapping(obj reflect.Value, mapping map[string]string, fields []string) {
func (sm *SyncMap) setMapping(obj reflect.Value, mapping map[string]string, fields, ttl, gen []string) {
objType := obj.Type()
sm.mutex.Lock()
sm.objectMappings[objType] = mapping
sm.objectFields[objType] = fields
sm.objectTTLs[objType] = ttl
sm.objectGen[objType] = gen
sm.mutex.Unlock()
}

Expand All @@ -182,15 +198,23 @@ func (sm *SyncMap) mappingExists(obj reflect.Value) bool {
return exists
}

func (sm *SyncMap) getMapping(obj reflect.Value) map[string]string {
func (sm *SyncMap) getMapping(objType reflect.Type) map[string]string {
sm.mutex.RLock()
mapping := sm.objectMappings[objType]
sm.mutex.RUnlock()
return mapping
}

func (sm *SyncMap) getMetaMappings(obj reflect.Value) (ttl, gen []string) {
if !obj.IsValid() {
return nil
return nil, nil
}
objType := obj.Type()
sm.mutex.RLock()
mapping := sm.objectMappings[objType]
ttl = sm.objectTTLs[objType]
gen = sm.objectGen[objType]
sm.mutex.RUnlock()
return mapping
return ttl, gen
}

func (sm *SyncMap) getFields(obj reflect.Value) []string {
Expand All @@ -201,7 +225,12 @@ func (sm *SyncMap) getFields(obj reflect.Value) []string {
return fields
}

var objectMappings = &SyncMap{objectMappings: map[reflect.Type]map[string]string{}, objectFields: map[reflect.Type][]string{}}
var objectMappings = &SyncMap{
objectMappings: map[reflect.Type]map[string]string{},
objectFields: map[reflect.Type][]string{},
objectTTLs: map[reflect.Type][]string{},
objectGen: map[reflect.Type][]string{},
}

func cacheObjectTags(obj reflect.Value) {
// exit if already processed
Expand All @@ -218,6 +247,8 @@ func cacheObjectTags(obj reflect.Value) {

mapping := map[string]string{}
fields := []string{}
ttl := []string{}
gen := []string{}

typeOfT := obj.Type()
numFields := obj.NumField()
Expand All @@ -229,15 +260,29 @@ func cacheObjectTags(obj reflect.Value) {
}

tag := strings.Trim(f.Tag.Get(aerospikeTag), " ")
if tag != "-" {
tagM := strings.Trim(f.Tag.Get(aerospikeMetaTag), " ")

if tag != "" && tagM != "" {
panic(fmt.Sprintf("Cannot accept both data and metadata tags on the same attribute on struct: %s.%s", obj.Type().Name(), f.Name))
}

if tag != "-" && tagM == "" {
if tag != "" {
mapping[tag] = f.Name
fields = append(fields, tag)
} else {
fields = append(fields, f.Name)
}
}

if tagM == "ttl" {
ttl = append(ttl, f.Name)
} else if tagM == "gen" {
gen = append(gen, f.Name)
} else if tagM != "" {
panic(fmt.Sprintf("Invalid metadata tag `%s` on struct attribute: %s.%s", tagM, obj.Type().Name(), f.Name))
}
}

objectMappings.setMapping(obj, mapping, fields)
objectMappings.setMapping(obj, mapping, fields, ttl, gen)
}
52 changes: 44 additions & 8 deletions read_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,15 @@ func (cmd *readCommand) parseObject(
cacheObjectTags(rv)
}

// find the name based on tag mapping
iobj := reflect.Indirect(rv)
for iobj.Kind() == reflect.Ptr {
iobj = reflect.Indirect(iobj)
}
mappings := objectMappings.getMapping(iobj.Type())

setObjectMetaFields(iobj, TTL(expiration), generation)

for i := 0; i < opCount; i++ {
opSize := int(Buffer.BytesToUint32(cmd.dataBuffer, receiveOffset))
particleType := int(cmd.dataBuffer[receiveOffset+5])
Expand All @@ -233,7 +242,7 @@ func (cmd *readCommand) parseObject(

particleBytesSize := int(opSize - (4 + nameSize))
value, _ := bytesToParticle(particleType, cmd.dataBuffer, receiveOffset, particleBytesSize)
if err := setObjectField(rv, name, value); err != nil {
if err := setObjectField(mappings, iobj, name, value); err != nil {
return err
}

Expand All @@ -251,17 +260,38 @@ func (cmd *readCommand) Execute() error {
return cmd.execute(cmd)
}

func setObjectField(obj reflect.Value, fieldName string, value interface{}) error {
func setObjectMetaFields(obj reflect.Value, ttl, gen uint32) error {
// find the name based on tag mapping
iobj := reflect.Indirect(obj)

ttlMap, genMap := objectMappings.getMetaMappings(iobj)

if ttlMap != nil {
for i := range ttlMap {
f := iobj.FieldByName(ttlMap[i])
setValue(f, ttl)
}
}

if genMap != nil {
for i := range genMap {
f := iobj.FieldByName(genMap[i])
setValue(f, gen)
}
}

return nil
}

func setObjectField(mappings map[string]string, obj reflect.Value, fieldName string, value interface{}) error {
if value == nil {
return nil
}

// find the name based on tag mapping
iobj := reflect.Indirect(obj)
if name, exists := objectMappings.getMapping(iobj)[fieldName]; exists {
if name, exists := mappings[fieldName]; exists {
fieldName = name
}
f := iobj.FieldByName(fieldName)
f := obj.FieldByName(fieldName)
setValue(f, value)

return nil
Expand All @@ -277,10 +307,16 @@ func setValue(f reflect.Value, value interface{}) error {
switch v := value.(type) {
case uint8:
f.SetUint(uint64(v))
case int:
case uint16:
f.SetUint(uint64(v))
case uint32:
f.SetUint(uint64(v))
case uint64:
f.SetUint(uint64(v))
case uint:
f.SetUint(uint64(v))
default:
f.SetUint(value.(uint64))
f.SetUint(uint64(value.(int)))
}
case reflect.Float64, reflect.Float32:
// if value has returned as a float
Expand Down

0 comments on commit a927c8e

Please sign in to comment.