This UDF will help you with arbitrary aggregations on the Aerospike database. You need to register this lua UDF on the server using your client's API, and then use aggregation API on your chosen Aerospike Client to call it with required parameters.
Aerospike Server supports Lua 5.1, in which all numbers are IEEE 754 double-precision floats with 53 significant bits. This means integers bigger than 2^53 cannot be represented exactly and will return invalid values.
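Because Lua 5.1 numbers are C doubles, the precision cutoff is the same one you can observe with Go's float64 (also an IEEE 754 double). A quick, Aerospike-independent sketch of where integer arithmetic stops being exact:

```go
package main

import "fmt"

func main() {
	const limit = int64(1) << 53 // 9007199254740992

	// Below 2^53, float64 arithmetic on integers is exact.
	fmt.Println(float64(limit-1)+1 == float64(limit)) // true

	// At 2^53 the gap between adjacent float64 values grows to 2,
	// so adding 1 is silently lost.
	fmt.Println(float64(limit)+1 == float64(limit)) // true: 2^53 + 1 rounds back to 2^53
}
```

Any bin value or aggregate that can exceed 2^53 will be silently rounded inside the UDF.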
You need to register the provided aggAPI.lua file as a UDF in your database.
Here is an example in Go (note that for the sake of conciseness, errors are not checked in this example):
luaFile, _ := ioutil.ReadFile("aggAPI.lua")
regTask, _ := client.RegisterUDF(nil, luaFile, "aggAPI.lua", aero.LUA)
// wait until UDF is created on the server.
<-regTask.OnComplete()
For more information: Go Client register UDF.
RegisterTask task = client.register(params.policy, "udf/aggAPI.lua", "aggAPI.lua", Language.LUA);
// Alternately register from resource.
task.waitTillComplete();
For more information: Java Client register UDF.
aql> register module 'aggAPI.lua'
For more information: managing UDFs using AQL.
The Lua streaming UDF evaluates the arguments you pass to it and uses them in its logic to calculate and filter the records, mostly on the server side.
"fields": {
"name": "name",
"max(age)": {"func": "max", "expr": "rec['age'] ~= nil and rec['age']"},
"count(age)": {"func": "count", "expr": "( rec['age'] ) ~= nil and 1"},
"min(age)": {"func": "min", "expr": "rec['age'] ~= nil and rec['age']"},
"sum(age*salary)": {"func": "sum", "expr": " (rec['age'] or 0) * (rec['salary'] or 0)"},
},
"filter": "rec['age'] ~= nil and rec['age'] >5 ",
"group_by_fields": {
"name"
}
A final pass occurs on the client side, and the results are returned in the following format:
{
"8de6a795aaf29f2a7dad71c6631a1efc": {
"count(age)": 3.000000,
"max(age)": 45.000000,
"count": 3.000000,
"sum(age*salary)": 101400,
"min(age)": 25.000000,
"name": "Eva",
},
"ed57af7ff6ed54ec8b6b5eec3e2b649a": {
"count(age)": 1.000000,
"max(age)": 26.000000,
"count": 1.000000,
"sum(age*salary)": 83200,
"min(age)": 26.000000,
"name": "Riley",
},
}
The key is a hash used to group the results for reduction. The value is a map of the returned fields, keyed by the field alias.
Keep in mind that the values are limited by Lua's number precision, which is 53 bits of significant integer value.
There are three different inputs that can be sent to the Lua UDF. Not all are required for every command. These values are:
- "fields": Chooses the fields to return - this is the equivalent of the select part of the query.
  - Fields which do not require any complex calculation: the map key is the alias, while the map value is the name of the existing bin in the database.
    Example: "fields": { "age": "age", "salary_usd": "salary" }
  - Fields which are calculated (an aggregate function is applied): the map key is the alias, and the value is a map of the function and its expression. Available functions are count, sum, min, max.
    Example:
    "fields": { "sum(salary * 2)": {"func": "sum", "expr": "rec['salary'] * 2"}, "min(salary)": {"func": "min", "expr": "rec['salary']"}, "max(salary * 5)": {"func": "max", "expr": "(rec['salary'] or 0) * 5"} }
    "sum(salary * 2)": {"func": "sum", "expr": "rec['salary'] * 2"} means a field with the name sum(salary * 2) should be calculated from the value of the bin salary multiplied by 2.
    "min(salary)": {"func": "min", "expr": "rec['salary']"}: use the value of the bin salary to calculate min(salary).
    "max(salary * 5)": {"func": "max", "expr": "(rec['salary'] or 0) * 5"}: use the value of the bin salary multiplied by 5 to calculate the max value. If the salary bin is null, 0 will be used as the default value.
- "filter": A Lua boolean statement used to filter records - this is the equivalent of a where clause in a query. If the statement evaluates to true, the record is included in the results.
  Example:
  "filter": "rec['age'] ~= nil and rec['age'] > 25"
- "group_by_fields": A list of field aliases to group the records by - this is the equivalent of group by in a query.
  Example: "group_by_fields": [ "age", "salary_usd" ]
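The three inputs above are combined into a single argument map. A minimal sketch of that shape in Go (plain standard library, no Aerospike dependency; the bin names age and salary are illustrative):

```go
package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	// The argument map passed to the UDF: fields to return,
	// a row filter, and the group-by aliases.
	args := map[string]interface{}{
		"fields": map[string]interface{}{
			"age":         "age",
			"sum(salary)": map[string]string{"func": "sum", "expr": "rec['salary'] or 0"},
		},
		"filter":          "rec['age'] ~= nil and rec['age'] > 25",
		"group_by_fields": []string{"age"},
	}

	// Print the map as JSON just to show the shape the UDF receives.
	out, _ := json.MarshalIndent(args, "", "  ")
	fmt.Println(string(out))
}
```

In a real call this map is wrapped with the client's value type (e.g. aero.NewValue in Go) and passed to the aggregation query, as shown in the full examples further below.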
To calculate the equivalent of the following SQL:
select sum(salary) from employees
We would call the select_agg_records function with the following argument:
{
"fields": {
"sum(salary)": {"func": "sum" , "expr": "rec['salary'] or 0"},
},
}
This means: use the value of the bin salary; if the salary bin is null or does not exist, 0 is used as the default value. The calculation is named sum(salary).
To add a filtering condition:
select sum(salary) from employees where age > 25
provide the following arguments:
{
"fields": {
"sum(salary)": {"func": "sum" , "expr": "rec['salary'] or 0"},
},
"filter": "rec['age'] ~= nil and rec['age'] > 25",
}
This checks that the age bin exists and, if it does, that the age is over 25.
To create a group by aggregation, add the fields both to the fields section and to group_by_fields. For the query:
select name, age, sum(salary) from employees where age > 25 group by name, age
provide the following arguments:
{
"fields": {
"age": "age",
"name": "name",
"sum(salary)": {"func": "sum" , "expr": "rec['salary'] or 0"},
},
"filter": "rec['age'] ~= nil and rec['age'] > 25",
"group_by_fields": [
"age",
"name",
],
}
Please note: the UDF logic does not validate whether the provided group_by_fields arguments are valid for the logic of the equivalent SQL command.
To add other fields to the query like:
select age, min(salary), max(salary), sum(salary) from employees where age > 25 group by age
provide the following arguments:
{
"fields": {
"age": "age",
"min(salary)": {"func": "min" , "expr": "rec['salary']"},
"max(salary)": {"func": "max" , "expr": "rec['salary']"},
"sum(salary)": {"func": "sum" , "expr": "rec['salary']"},
},
"filter": "rec['age'] ~= nil and rec['age'] > 25",
"group_by_fields": [
"age",
],
}
Can expressions be used inside the aggregate functions? Yes! To execute the equivalent of the following SQL command:
select age, min(salary), max(salary), sum(salary * 2) from employees where age > 25 group by age
provide the following arguments:
{
"fields": {
"age": "age",
"sum(salary * 2)": {"func": "sum" , "expr": "rec['salary'] * 2"},
"min(salary)": {"func": "min" , "expr": "rec['salary']"},
"max(salary)": {"func": "max" , "expr": "rec['salary']"},
},
"filter": "rec['age'] ~= nil and rec['age'] > 25",
"group_by_fields": [
"age",
],
}
The UDF does not have an avg function, so there is no direct equivalent for this SQL query:
select age, avg(salary) from employees group by age
Calculating the average must be done by the client: query sum and count, then compute avg from them.
Using the UDF to get sum and count:
{
"fields": {
"age": "age",
"sum(salary)": {"func": "sum" , "expr": "rec['salary']"},
"count(*)": {"func": "count" , "expr": "1"}
},
"group_by_fields": [
"age",
],
}
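The client-side division can then be done over the returned groups. A minimal sketch in Go, operating on a hypothetical result map in the format shown earlier (the group hashes and values are made-up sample data):

```go
package main

import "fmt"

// averages computes avg(salary) per group from the sum(salary) and
// count(*) fields returned by the UDF. The input shape mirrors the
// result format shown above: group hash -> field alias -> value.
func averages(results map[string]map[string]float64) map[string]float64 {
	avgs := make(map[string]float64)
	for key, fields := range results {
		count := fields["count(*)"]
		if count > 0 {
			avgs[key] = fields["sum(salary)"] / count
		}
	}
	return avgs
}

func main() {
	// Hypothetical UDF output for two age groups.
	results := map[string]map[string]float64{
		"group-a": {"age": 25, "sum(salary)": 120000, "count(*)": 3},
		"group-b": {"age": 26, "sum(salary)": 83200, "count(*)": 1},
	}
	fmt.Println(averages(results)["group-a"]) // 40000
}
```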
To get the result of the following SQL statement:
select distinct age from employees
since you can rewrite the above query as:
select age from employees group by age
then the parameters sent to the UDF would be:
{
"fields": {
"age": "age",
},
"group_by_fields": [
"age",
],
}
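Since every group then carries exactly one distinct age, the client just collects the age field from each returned group. A small sketch over a hypothetical result map (group hashes and values are made-up sample data):

```go
package main

import (
	"fmt"
	"sort"
)

func main() {
	// Hypothetical UDF output: each group key maps to its grouped fields.
	results := map[string]map[string]interface{}{
		"hash-1": {"age": 25.0},
		"hash-2": {"age": 26.0},
		"hash-3": {"age": 45.0},
	}

	// The distinct ages are simply the "age" values of each group.
	var ages []float64
	for _, fields := range results {
		if age, ok := fields["age"].(float64); ok {
			ages = append(ages, age)
		}
	}
	sort.Float64s(ages) // map iteration order is random; sort for stable output
	fmt.Println(ages)   // [25 26 45]
}
```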
stm := aero.NewStatement(nsName, setName)
functionArgsMap := map[string]interface{}{
"fields": map[string]interface{}{
"name": "name",
"max(age)": map[string]string{"func": "max", "expr": "rec['age'] ~= nil and rec['age']"},
"count(age)": map[string]string{"func": "count", "expr": "( rec['age'] ) ~= nil and 1"},
"min(age)": map[string]string{"func": "min", "expr": "rec['age'] ~= nil and rec['age']"},
"sum(age*salary)": map[string]string{"func": "sum", "expr": " (rec['age'] or 0) * (rec['salary'] or 0)"},
},
"filter": "rec['age'] ~= nil and rec['age'] >5 ",
"group_by_fields": []string{
"name",
},
}
recordset, err := client.QueryAggregate(nil, stm, "aggAPI", "select_agg_records", aero.NewValue(functionArgsMap))
if err != nil {
	return err
}
defer recordset.Close()
for result := range recordset.Results() {
if result.Err != nil {
return result.Err
}
pp.Println(result.Record.Bins["SUCCESS"])
}
String stringToParse = String.format("{\n" +
" \"fields\": {\n" +
" \"test_id\": \"test_id\",\n" +
" \"state\": \"state\",\n" +
" \"count(state)\": {\"func\":\"count\", \"expr\": \"rec['state'] ~= nil and 1\"},\n" +
" },\n" +
" \"filter\": \"rec['test_id'] ~= nil and rec['test_id'] == %s\",\n" +
" \"group_by_fields\": [\n" +
" \"test_id\",\n" +
" \"state\",\n" +
" ],\n" +
"}", testId.toString());
JSONObject json = new JSONObject(stringToParse);
Map functionArgsMap = toMap(json);
Statement stmt = new Statement();
stmt.setNamespace(namespace);
stmt.setSetName(set);
stmt.setBinNames(binName);
// Optional filter via Index
stmt.setFilter(Filter.equal(binName, testId));
stmt.setIndexName(indexName);
ResultSet rs = client.queryAggregate(null, stmt,"aggAPI","select_agg_records", Value.get(functionArgsMap));
try {
if (rs.next()) {
Object obj = rs.getObject();
if (obj instanceof Map<?, ?>) {
Map<String, Map> map = (Map<String, Map>) obj;
map.values().forEach(res -> {
console.info(res.toString());
console.info("res: %s, %s => %s",
res.get("test_id"),
res.get("state"),
res.get("count(*)"));
});
}
}
}
finally {
rs.close();
}