[CLIENT-3217] Do not send nil or NullValue as key to the server
khaf committed Dec 18, 2024
1 parent ab899d5 commit 4ea7c50
Showing 10 changed files with 71 additions and 12 deletions.
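In practical terms, the client now omits the user-key field when the key carries no value to send (a nil or NullValue user key), even if SendKey is set on the policy. A minimal sketch of the affected pattern, adapted from the new test in client_test.go; ns, set, and client stand in for an existing namespace, set name, and connected *as.Client:

// A key built only from a digest carries no user key value.
keyWithValue, _ := as.NewKey(ns, set, 1)
keyDigestOnly, _ := as.NewKeyWithDigest(ns, set, nil, keyWithValue.Digest())

wpolicy := as.NewWritePolicy(0, 0)
wpolicy.SendKey = true

// With this change the KEY field is simply skipped for keyDigestOnly,
// instead of a nil/NullValue user key being serialized to the server.
err := client.PutBins(wpolicy, keyDigestOnly, as.NewBin("Aerospike", "value"))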
4 changes: 2 additions & 2 deletions batch_delete.go
@@ -78,14 +78,14 @@ func (bd *BatchDelete) size(parentPolicy *BasePolicy) (int, Error) {
}
}

- if bd.Policy.SendKey || parentPolicy.SendKey {
+ if (bd.Policy.SendKey || parentPolicy.SendKey) && bd.Key.hasValueToSend() {
if sz, err := bd.Key.userKey.EstimateSize(); err != nil {
return -1, err
} else {
size += sz + int(_FIELD_HEADER_SIZE) + 1
}
}
- } else if parentPolicy.SendKey {
+ } else if parentPolicy.SendKey && bd.Key.hasValueToSend() {
sz, err := bd.Key.userKey.EstimateSize()
if err != nil {
return -1, err
4 changes: 2 additions & 2 deletions batch_udf.go
@@ -96,14 +96,14 @@ func (bu *BatchUDF) size(parentPolicy *BasePolicy) (int, Error) {
size += sz + int(_FIELD_HEADER_SIZE)
}

- if bu.Policy.SendKey || parentPolicy.SendKey {
+ if (bu.Policy.SendKey || parentPolicy.SendKey) && bu.Key.hasValueToSend() {
if sz, err := bu.Key.userKey.EstimateSize(); err != nil {
return -1, err
} else {
size += sz + int(_FIELD_HEADER_SIZE) + 1
}
}
- } else if parentPolicy.SendKey {
+ } else if parentPolicy.SendKey && bu.Key.hasValueToSend() {
sz, err := bu.Key.userKey.EstimateSize()
if err != nil {
return -1, err
4 changes: 2 additions & 2 deletions batch_write.go
@@ -78,14 +78,14 @@ func (bw *BatchWrite) size(parentPolicy *BasePolicy) (int, Error) {
}
}

- if bw.Policy.SendKey || parentPolicy.SendKey {
+ if (bw.Policy.SendKey || parentPolicy.SendKey) && bw.Key.hasValueToSend() {
if sz, err := bw.Key.userKey.EstimateSize(); err != nil {
return -1, err
} else {
size += sz + int(_FIELD_HEADER_SIZE) + 1
}
}
- } else if parentPolicy.SendKey {
+ } else if parentPolicy.SendKey && bw.Key.hasValueToSend() {
sz, err := bw.Key.userKey.EstimateSize()
if err != nil {
return -1, err
38 changes: 38 additions & 0 deletions client_test.go
@@ -287,6 +287,44 @@ var _ = gg.Describe("Aerospike", func() {

gg.Context("Put operations", func() {

gg.It("must support nil key values", func() {
wpolicy := as.NewWritePolicy(0, 0)
keyp, _ := as.NewKey(ns, set, 1)
key, _ := as.NewKeyWithDigest(ns, set, nil, keyp.Digest())
bin := as.NewBin("Aerospike", "value")

wpolicy.SendKey = true
err = client.PutBins(wpolicy, keyp, bin)
gm.Expect(err).ToNot(gm.HaveOccurred())

err = client.PutBins(wpolicy, key, bin)
gm.Expect(err).ToNot(gm.HaveOccurred())

wpolicy.SendKey = false
key.RemoveValue()
err = client.PutBins(wpolicy, key, bin)
gm.Expect(err).ToNot(gm.HaveOccurred())

wpolicy.SendKey = true
key.RemoveValue()
err = client.PutBins(wpolicy, key, bin)
gm.Expect(err).ToNot(gm.HaveOccurred())

// scan the record back and make sure the values
// for the key are correct and intact in the database
pf := as.NewPartitionFilterAll()
spolicy := as.NewScanPolicy()
spolicy.MaxRecords = 1

recordset, err := client.ScanPartitions(spolicy, pf, ns, set)
gm.Expect(err).ToNot(gm.HaveOccurred())
for res := range recordset.Results() {
gm.Expect(res.Err).ToNot(gm.HaveOccurred())
gm.Expect(res.Record.Bins[bin.Name]).To(gm.Equal(bin.Value.GetObject().(string)))
gm.Expect(res.Record.Key.Value()).To(gm.Equal(as.NewLongValue(1)))
}
})

gg.Context("Expiration values", func() {

gg.BeforeEach(func() {
10 changes: 5 additions & 5 deletions command.go
@@ -793,7 +793,7 @@ func (cmd *baseCommand) setBatchOperate(policy *BatchPolicy, keys []*Key, batch
cmd.dataOffset += len(key.namespace) + int(_FIELD_HEADER_SIZE)
cmd.dataOffset += len(key.setName) + int(_FIELD_HEADER_SIZE)

- if attr.sendKey {
+ if attr.sendKey && key.hasValueToSend() {
if sz, err := key.userKey.EstimateSize(); err != nil {
return err
} else {
@@ -920,7 +920,7 @@ func (cmd *baseCommand) setBatchUDF(policy *BatchPolicy, keys []*Key, batch *bat
cmd.dataOffset += len(key.namespace) + int(_FIELD_HEADER_SIZE)
cmd.dataOffset += len(key.setName) + int(_FIELD_HEADER_SIZE)

- if attr.sendKey {
+ if attr.sendKey && key.hasValueToSend() {
if sz, err := key.userKey.EstimateSize(); err != nil {
return err
} else {
@@ -1054,7 +1054,7 @@ func (cmd *baseCommand) writeBatchWrite(key *Key, attr *batchAttr, filter *Expre
cmd.WriteUint16(uint16(attr.generation))
cmd.WriteUint32(attr.expiration)

- if attr.sendKey {
+ if attr.sendKey && key.hasValueToSend() {
fieldCount++
cmd.writeBatchFieldsWithFilter(key, filter, fieldCount, opCount)
cmd.writeFieldValue(key.userKey, KEY)
@@ -1886,7 +1886,7 @@ func (cmd *baseCommand) estimateKeySize(key *Key, sendKey bool) (int, Error) {
cmd.dataOffset += int(_DIGEST_SIZE + _FIELD_HEADER_SIZE)
fieldCount++

- if sendKey {
+ if sendKey && key.hasValueToSend() {
// field header size + key size
sz, err := key.userKey.EstimateSize()
if err != nil {
@@ -2190,7 +2190,7 @@ func (cmd *baseCommand) writeKey(key *Key, sendKey bool) Error {

cmd.writeFieldBytes(key.digest[:], DIGEST_RIPE)

- if sendKey {
+ if sendKey && key.hasValueToSend() {
if err := cmd.writeFieldValue(key.userKey, KEY); err != nil {
return err
}
8 changes: 8 additions & 0 deletions helper_test.go
@@ -50,3 +50,11 @@ func (nd *Node) ConnsCount() int {
func (nd *Node) CloseConnections() {
nd.closeConnections()
}

func (k *Key) HasValueToSend() bool {
return k.hasValueToSend()
}

func (k *Key) RemoveValue() {
k.userKey = nil
}
5 changes: 5 additions & 0 deletions key.go
@@ -164,3 +164,8 @@ func (ky *Key) PartitionId() int {
// First AND makes positive and negative correctly, then mod.
return int(Buffer.LittleBytesToInt32(ky.digest[:], 0)&0xFFFF) & (_PARTITIONS - 1)
}

// returns true if the key has an associated value that can be sent to the server
func (ky *Key) hasValueToSend() bool {
return ky.userKey != nil && ky.userKey != nullValue
}
2 changes: 2 additions & 0 deletions key_helper.go
@@ -148,6 +148,8 @@ func (vb *keyWriter) writeKey(val Value) Error {
case BytesValue:
vb.Write(v)
return nil
case NullValue:
return nil
}

// TODO: Replace the error message with fmt.Sprintf("Key Generation Error. Value type not supported: %T", val)
6 changes: 6 additions & 0 deletions key_test.go
@@ -124,6 +124,12 @@ var _ = gg.Describe("Key Test", func() {

// })

gg.It("null Key value should be handled correctly", func() {
key, _ := as.NewKeyWithDigest("namespace", "set", nil, []byte("01234567890123456789"))
gm.Expect(key.Digest()).To(gm.Equal([]byte("01234567890123456789")))
gm.Expect(key.HasValueToSend()).To(gm.BeFalse())
})

gg.It("for custom digest", func() {
// key, _ := as.NewKey("namespace", "set", []interface{}{})
// gm.Expect(hex.EncodeToString(key.Digest())).To(gm.Equal("2af0111192df4ca297232d1641ff52c2ce51ce2d"))
2 changes: 1 addition & 1 deletion value.go
@@ -1276,7 +1276,7 @@ func bytesToKeyValue(pType int, buf []byte, offset int, length int) (Value, Erro
return ListValue(v), nil

case ParticleType.NULL:
- return NewNullValue(), nil
+ return nil, nil

default:
return nil, newError(types.PARSE_ERROR, fmt.Sprintf("ParticleType %d not recognized. Please file a github issue.", pType))
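Note that with ParticleType.NULL now decoding to a nil Value, a record read back for a key that had no stored user key value reports nil rather than a NullValue. A hedged sketch of how calling code might check for this, assuming rec is a *as.Record returned by a read:

if rec.Key != nil && rec.Key.Value() == nil {
    // The server returned no user key value for this record.
}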
