Skip to content

Commit

Permalink
Merge pull request #393 from aerospike/v6.7.0
Browse files Browse the repository at this point in the history
v6.7.0
  • Loading branch information
khaf authored Dec 5, 2022
2 parents 65bfc45 + c1be1aa commit 029420b
Show file tree
Hide file tree
Showing 9 changed files with 198 additions and 18 deletions.
13 changes: 13 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,18 @@
# Change History

## December 5 2022: v6.7.0

This is a minor improvement and fix release.

* **Improvements**

- Improves testing for float64 precision formatting between amd64 and aarch64.

* **Fixes**

- [CLIENT-2019] Write Batch operation with an invalid namespace record causes all batch transactions to fail.
- [CLIENT-2020] Support `QueryPartitions` with non-nil filter (secondary index query).

## November 8 2022: v6.6.0

This is a minor improvement and fix release.
Expand Down
4 changes: 3 additions & 1 deletion batch_node_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,8 +276,10 @@ func newBatchOperateNodeListIfc(cluster *Cluster, policy *BatchPolicy, records [
}

if err != nil {
records[i].chainError(err)
records[i].setError(err.resultCode(), false)
errs = chainErrors(err, errs)
// return nil, err
continue
}

if batchNode := findBatchNode(batchNodes, node); batchNode == nil {
Expand Down
90 changes: 90 additions & 0 deletions batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,96 @@ var _ = gg.Describe("Aerospike", func() {
})

gg.Context("BatchUDF operations", func() {
gg.It("must return the results when one operation is against an invalid namespace", func() {
luaCode := []byte(`-- Create a record
function rec_create(rec, bins)
if bins ~= nil then
for b, bv in map.pairs(bins) do
rec[b] = bv
end
end
status = aerospike:create(rec)
return status
end`)

client.RemoveUDF(nil, "test_ops.lua")
client.RegisterUDF(nil, luaCode, "test_ops.lua", as.LUA)

batchRecords := []as.BatchRecordIfc{}

key1, _ := as.NewKey(randString(10), set, 1)
args := make(map[interface{}]interface{})
args["bin1_str"] = "a"
batchRecords = append(batchRecords, as.NewBatchUDF(
nil,
key1,
"test_ops",
"rec_create",
as.NewMapValue(args),
))

key2, _ := as.NewKey(ns, set, 2)
batchRecords = append(batchRecords, as.NewBatchWrite(
nil,
key2,
as.PutOp(as.NewBin("bin1_str", "aa")),
))

key3, _ := as.NewKey(ns, set, 3)
batchRecords = append(batchRecords, as.NewBatchWrite(
nil,
key3,
as.PutOp(as.NewBin("bin1_str", "aaa")),
))

batchRecords = append(batchRecords, as.NewBatchRead(
key1,
[]string{"bin1_str"},
))

batchRecords = append(batchRecords, as.NewBatchRead(
key2,
[]string{"bin1_str"},
))

batchRecords = append(batchRecords, as.NewBatchRead(
key3,
[]string{"bin1_str"},
))

bp := as.NewBatchPolicy()
bp.RespondAllKeys = false
err := client.BatchOperate(bp, batchRecords)
gm.Expect(err).ToNot(gm.HaveOccurred())

gm.Expect(batchRecords[0].BatchRec().Err.Matches(types.INVALID_NAMESPACE)).To(gm.BeTrue())
gm.Expect(batchRecords[0].BatchRec().ResultCode).To(gm.Equal(types.INVALID_NAMESPACE))

gm.Expect(batchRecords[1].BatchRec().Err).To(gm.BeNil())
gm.Expect(batchRecords[1].BatchRec().ResultCode).To(gm.Equal(types.OK))
gm.Expect(batchRecords[1].BatchRec().Record.Bins).To(gm.Equal(as.BinMap{"bin1_str": nil}))

gm.Expect(batchRecords[2].BatchRec().Err).To(gm.BeNil())
gm.Expect(batchRecords[2].BatchRec().ResultCode).To(gm.Equal(types.OK))
gm.Expect(batchRecords[2].BatchRec().Record.Bins).To(gm.Equal(as.BinMap{"bin1_str": nil}))

gm.Expect(batchRecords[3].BatchRec().Err.Matches(types.INVALID_NAMESPACE)).To(gm.BeTrue())
gm.Expect(batchRecords[3].BatchRec().ResultCode).To(gm.Equal(types.INVALID_NAMESPACE))

gm.Expect(batchRecords[4].BatchRec().Err).To(gm.BeNil())
gm.Expect(batchRecords[4].BatchRec().ResultCode).To(gm.Equal(types.OK))
gm.Expect(batchRecords[4].BatchRec().Record.Bins).To(gm.Equal(as.BinMap{"bin1_str": "aa"}))

gm.Expect(batchRecords[5].BatchRec().Err).To(gm.BeNil())
gm.Expect(batchRecords[5].BatchRec().ResultCode).To(gm.Equal(types.OK))
gm.Expect(batchRecords[5].BatchRec().Record.Bins).To(gm.Equal(as.BinMap{"bin1_str": "aaa"}))

bp.RespondAllKeys = true
err = client.BatchOperate(bp, batchRecords)
gm.Expect(err).To(gm.HaveOccurred())
gm.Expect(err.Matches(types.INVALID_NAMESPACE)).To(gm.BeTrue())
})

gg.It("must return the result with same ordering", func() {
const keyCount = 50
keys := []*as.Key{}
Expand Down
6 changes: 1 addition & 5 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -542,7 +542,7 @@ func (clnt *Client) BatchOperate(policy *BatchPolicy, records []BatchRecordIfc)
policy = clnt.getUsableBatchPolicy(policy)

batchNodes, err := newBatchOperateNodeListIfc(clnt.cluster, policy, records)
if err != nil {
if err != nil && policy.RespondAllKeys {
return err
}

Expand Down Expand Up @@ -1024,10 +1024,6 @@ func parseIndexErrorCode(response string) types.ResultCode {
// This method is only supported by Aerospike 4.9+ servers.
// If the policy is nil, the default relevant policy will be used.
func (clnt *Client) QueryPartitions(policy *QueryPolicy, statement *Statement, partitionFilter *PartitionFilter) (*Recordset, Error) {
if statement.Filter != nil {
return nil, ErrPartitionScanQueryNotSupported.err()
}

policy = clnt.getUsableQueryPolicy(policy)
nodes := clnt.cluster.GetNodes()
if len(nodes) == 0 {
Expand Down
2 changes: 1 addition & 1 deletion client_object_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1123,7 +1123,7 @@ var _ = gg.Describe("Aerospike", func() {
gm.Expect(err).ToNot(gm.HaveOccurred())

gm.Expect(t.Test).To(gm.Equal(float64(i)))
gm.Expect(t.TestLua).To(gm.Equal(testLua))
gm.Expect(t.TestLua).To(gm.BeNumerically("~", testLua))
}

}) // it
Expand Down
2 changes: 1 addition & 1 deletion multi_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ func (cmd *baseMultiCommand) parseKey(fieldCount int, bval *int64) (*Key, Error)
return nil, err
}
case BVAL_ARRAY:
*bval = Buffer.LittleBytesToInt64(cmd.dataBuffer, cmd.dataOffset)
*bval = Buffer.LittleBytesToInt64(cmd.dataBuffer, 1)
}
}

Expand Down
7 changes: 3 additions & 4 deletions query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,13 @@ func (clnt *Client) queryPartitions(policy *QueryPolicy, tracker *partitionTrack
cmd := newQueryPartitionCommand(policy, tracker, nodePartition, statement, recordset)
weg.execute(cmd)
}
// no need to manage the errors; they are sent back via the recordset
weg.wait()
errs = chainErrors(weg.wait(), errs)

done, err := tracker.isComplete(clnt.Cluster(), &policy.BasePolicy)
if done || err != nil {
errs = chainErrors(err, errs)
// Query is complete.
if err != nil {
errs = chainErrors(err, errs)
if errs != nil {
recordset.sendError(errs)
}
return
Expand Down
85 changes: 83 additions & 2 deletions query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,11 @@ var _ = gg.Describe("Query operations", func() {
bin4 := as.NewBin("Aerospike4", "constValue")
bin5 := as.NewBin("Aerospike5", -1)
bin6 := as.NewBin("Aerospike6", 1)
bin7 := as.NewBin("Aerospike7", nil)
var keys map[string]*as.Key
var indexName string
var indexName2 string
var indexName3 string

// read all records from the channel and make sure all of them are returned
var checkResults = func(recordset *as.Recordset, cancelCnt int) {
Expand Down Expand Up @@ -118,7 +120,8 @@ var _ = gg.Describe("Query operations", func() {

keys[string(key.Digest())] = key
bin3 = as.NewBin("Aerospike3", rand.Intn(math.MaxInt16))
err = client.PutBins(wpolicy, key, bin1, bin2, bin3, bin4, bin5, bin6)
bin7 = as.NewBin("Aerospike7", i%3)
err = client.PutBins(wpolicy, key, bin1, bin2, bin3, bin4, bin5, bin6, bin7)
gm.Expect(err).ToNot(gm.HaveOccurred())
}

Expand All @@ -129,6 +132,10 @@ var _ = gg.Describe("Query operations", func() {
// queries only work on indices
indexName2 = set + bin6.Name
createIndex(wpolicy, ns, set, indexName2, bin6.Name, as.NUMERIC)

// queries only work on indices
indexName3 = set + bin7.Name
createIndex(wpolicy, ns, set, indexName3, bin7.Name, as.NUMERIC)
})

gg.AfterEach(func() {
Expand All @@ -137,6 +144,9 @@ var _ = gg.Describe("Query operations", func() {

indexName = set + bin6.Name
gm.Expect(client.DropIndex(nil, ns, set, indexName)).ToNot(gm.HaveOccurred())

indexName = set + bin7.Name
gm.Expect(client.DropIndex(nil, ns, set, indexName)).ToNot(gm.HaveOccurred())
})

var queryPolicy = as.NewQueryPolicy()
Expand Down Expand Up @@ -170,7 +180,7 @@ var _ = gg.Describe("Query operations", func() {
gm.Expect(counter).To(gm.Equal(keyCount))
})

gg.It("must Query and get all partition records back for a specified key", func() {
gg.It("must Scan and get all partition records back for a specified key", func() {
gm.Expect(len(keys)).To(gm.Equal(keyCount))

counter := 0
Expand Down Expand Up @@ -203,6 +213,77 @@ var _ = gg.Describe("Query operations", func() {
gm.Expect(counter).To(gm.BeNumerically("<", keyCount))
})

gg.It("must Query per key partition and get all partition records back for a specified key and filter", func() {
gm.Expect(len(keys)).To(gm.Equal(keyCount))

counter := 0

var rkey *as.Key
for _, k := range keys {
rkey = k

pf := as.NewPartitionFilterByKey(rkey)
stm := as.NewStatement(ns, set)
stm.SetFilter(as.NewRangeFilter(bin7.Name, 1, 2))
recordset, err := client.QueryPartitions(queryPolicy, stm, pf)
gm.Expect(err).ToNot(gm.HaveOccurred())

for res := range recordset.Results() {
gm.Expect(res.Err).NotTo(gm.HaveOccurred())
gm.Expect(res.Record.Bins[bin1.Name]).To(gm.Equal(bin1.Value.GetObject()))
gm.Expect(res.Record.Bins[bin2.Name]).To(gm.Equal(bin2.Value.GetObject()))

delete(keys, string(res.Record.Key.Digest()))

counter++
}
}

gm.Expect(len(keys)).To(gm.Equal(334))
// This depends on how many keys end up in the same partition.
// Since keys are statistically distributed randomly and uniformly,
// we expect that there aren't many partitions that share more than one key.
gm.Expect(counter).To(gm.BeNumerically("~", keyCount - 334, 50))
})

gg.It("must Query and get all partition records back for a specified key and filter", func() {
gm.Expect(len(keys)).To(gm.Equal(keyCount))

counter := 0

pf := as.NewPartitionFilterAll()
stm := as.NewStatement(ns, set)
stm.SetFilter(as.NewRangeFilter(bin7.Name, 1, 2))
recordset, err := client.QueryPartitions(queryPolicy, stm, pf)
gm.Expect(err).ToNot(gm.HaveOccurred())

for res := range recordset.Results() {
gm.Expect(res.Err).NotTo(gm.HaveOccurred())
gm.Expect(res.Record.Bins[bin1.Name]).To(gm.Equal(bin1.Value.GetObject()))
gm.Expect(res.Record.Bins[bin2.Name]).To(gm.Equal(bin2.Value.GetObject()))

delete(keys, string(res.Record.Key.Digest()))

counter++
}

gm.Expect(len(keys)).To(gm.Equal(334))
gm.Expect(counter).To(gm.Equal(keyCount - 334))
})

gg.It("must return error on a Query when index is not found", func() {
pf := as.NewPartitionFilterAll()
stm := as.NewStatement(ns, set)
stm.SetFilter(as.NewRangeFilter(randString(10), 1, 2))
recordset, err := client.QueryPartitions(queryPolicy, stm, pf)
gm.Expect(err).ToNot(gm.HaveOccurred())

for res := range recordset.Results() {
gm.Expect(res.Err).To(gm.HaveOccurred())
gm.Expect(res.Err.Matches(ast.INDEX_NOTFOUND)).To(gm.BeTrue())
}
})

gg.It("must Query and get all partition records back for a specified partition range", func() {
gm.Expect(len(keys)).To(gm.Equal(keyCount))

Expand Down
7 changes: 3 additions & 4 deletions scan_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,12 @@ func (clnt *Client) scanPartitions(policy *ScanPolicy, tracker *partitionTracker
cmd := newScanPartitionCommand(policy, tracker, nodePartition, namespace, setName, binNames, recordset)
weg.execute(cmd)
}
// no need to manage the errors; they are sent back via the recordset
weg.wait()
errs = chainErrors(weg.wait(), errs)

if done, err := tracker.isComplete(clnt.Cluster(), &policy.BasePolicy); done || err != nil {
errs = chainErrors(err, errs)
// Scan is complete.
if err != nil {
errs = chainErrors(err, errs)
if errs != nil {
recordset.sendError(errs)
}
return
Expand Down

0 comments on commit 029420b

Please sign in to comment.