Skip to content

Commit

Permalink
support sliding window, fix visitSlidingWindow method of OperatorGene…
Browse files Browse the repository at this point in the history
…ratorTree; support order by time; add schemaMap for templateInfo; fix devicePath of SingleDeviceViewNode.
  • Loading branch information
Beyyes committed May 16, 2024
1 parent 7d8f8f8 commit 8854da1
Show file tree
Hide file tree
Showing 11 changed files with 213 additions and 125 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.sql.Connection;
import java.sql.Statement;

import static org.apache.iotdb.db.it.utils.TestUtils.assertTestFail;
import static org.apache.iotdb.db.it.utils.TestUtils.resultSetEqualTest;

@RunWith(IoTDBTestRunner.class)
Expand Down Expand Up @@ -100,32 +101,95 @@ public void aggregationTest() {
expectedHeader,
retArray);

// value filter + having + group by time
// ascending with descending

// group by session, condition,

// wildcard: agg(*)

// count_time(*)

// agg operation expression : agg(s1+1)

// duplicate select expressions

// non aligned template: sg1

// filter cannot push down

// having count(s1+s2)
}

@Test
public void groupByTest() {
//
String[] expectedHeader =
new String[] {"Time,Device,max_time(s1),last_value(s1),last_value(s2)"};
String[] retArray =
new String[] {
"1,root.sg2.d1,2,2.2,false,",
"5,root.sg2.d1,5,5.5,true,",
"1,root.sg2.d2,2,22.2,false,",
"5,root.sg2.d2,5,50.0,false,",
"1,root.sg2.d3,1,111.1,true,",
};
resultSetEqualTest(
"select max_time(s1), last_value(s1), last_value(s2) from root.sg2.** group by ([1,10), 2ms) having last_value(s2) is not null limit 5 align by device;",
expectedHeader,
retArray);

// sliding window
expectedHeader = new String[] {"Time,Device,max_time(s1),last_value(s1),last_value(s2)"};
retArray =
new String[] {
"1,root.sg2.d1,2,2.2,false,",
"1,root.sg2.d2,2,22.2,false,",
"3,root.sg2.d2,5,50.0,false,",
"5,root.sg2.d2,5,50.0,false,",
"7,root.sg2.d3,8,8.8,false,",
"3,root.sg2.d4,5,5555.5,false,",
"5,root.sg2.d4,5,5555.5,false,",
};
resultSetEqualTest(
"SELECT max_time(s1), last_value(s1), last_value(s2) FROM root.sg2.** where s3+1=1316 or s2=false group by ([1,10),2ms) having avg(s1)>0 align by device;",
"SELECT max_time(s1), last_value(s1), last_value(s2) FROM root.sg2.** where s3+1=1316 or s2=false group by ([1,10),3ms,2ms) having avg(s1)>0 align by device;",
expectedHeader,
retArray);
}

// agg operation expression

// group by session,

// order by

// wildcald
@Test
public void orderByTest() {
String[] expectedHeader = new String[] {"Time,Device,sum(s3)"};
String[] retArray =
new String[] {
"4,root.sg2.d1,5.0,",
"4,root.sg2.d2,5.0,",
"4,root.sg2.d3,44.0,",
"4,root.sg2.d4,5555.0,",
"2,root.sg2.d1,2.0,",
};
resultSetEqualTest(
"select sum(s3) from root.sg2.** where s1>1 GROUP BY([0, 10), 2ms) order by time desc offset 8 limit 5 align by device;",
expectedHeader,
retArray);

// ascending with descending
expectedHeader = new String[] {"Time,Device,sum(s3)"};
retArray =
new String[] {
"0,root.sg2.d1,1.0,",
"2,root.sg2.d1,2.0,",
"4,root.sg2.d1,5.0,",
"0,root.sg2.d2,11.0,",
"2,root.sg2.d2,22.0,",
};
resultSetEqualTest(
"select sum(s3) from root.sg2.** where s1>1 GROUP BY([0, 10), 2ms) order by count(s2) desc limit 5 align by device;",
expectedHeader,
retArray);

// count_time
// order by non-existent measurement
assertTestFail(
"select sum(s3) from root.sg2.** where s1>1 order by count(s_null) desc limit 5 align by device;",
"count(s_null) in order by clause doesn't exist.");
}

protected static void insertData() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ static boolean canBuildAggregationPlanUseTemplate(
MPPQueryContext context,
Template template) {

if (queryStatement.hasOrderByExpression()) {
return false;
}

analysis.setNoWhereAndAggregation(false);

List<PartialPath> deviceList = analyzeFrom(queryStatement, schemaTree);
Expand Down Expand Up @@ -93,7 +97,7 @@ static boolean canBuildAggregationPlanUseTemplate(
return true;
}

analyzeHaving(analysis, queryStatement, schemaTree, deviceList);
analyzeHaving(analysis, queryStatement);

analyzeDeviceToAggregation(analysis);
analyzeDeviceToSourceTransform(analysis);
Expand Down Expand Up @@ -177,11 +181,7 @@ private static boolean analyzeSelect(
return true;
}

private static void analyzeHaving(
Analysis analysis,
QueryStatement queryStatement,
ISchemaTree schemaTree,
List<PartialPath> deviceSet) {
private static void analyzeHaving(Analysis analysis, QueryStatement queryStatement) {
if (!queryStatement.hasHaving()) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ static void analyzeDeviceToWhere(Analysis analysis, QueryStatement queryStatemen
}
}

private static void analyzeDeviceToOrderBy(
static void analyzeDeviceToOrderBy(
Analysis analysis,
QueryStatement queryStatement,
ISchemaTree schemaTree,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,15 +77,12 @@ public class TemplatedInfo {
private int maxTsBlockLineNum = -1;

// variables related to predicate push down
// TODO when to init pushDownPredicate in agg situation?
private Expression pushDownPredicate;

// variables related to aggregation
public List<AggregationDescriptor> aggregationDescriptorList;
public GroupByTimeParameter groupByTimeParameter;
public boolean outputEndTime;

private Expression havingExpression;
private List<AggregationDescriptor> aggregationDescriptorList;
private final GroupByTimeParameter groupByTimeParameter;
private final boolean outputEndTime;

public TemplatedInfo(
List<String> measurementList,
Expand Down Expand Up @@ -117,11 +114,11 @@ public TemplatedInfo(
this.predicate = predicate;
if (predicate != null) {
this.keepNull = keepNull;
this.schemaMap = schemaMap;
this.filterLayoutMap = filterLayoutMap;
}
this.pushDownPredicate = pushDownPredicate;

this.schemaMap = schemaMap;
this.aggregationDescriptorList = aggregationDescriptorList;
this.groupByTimeParameter = groupByTimeParameter;
this.outputEndTime = outputEndTime;
Expand Down Expand Up @@ -202,6 +199,22 @@ public Expression[] getProjectExpressions() {
return projectExpressions;
}

public void setAggregationDescriptorList(List<AggregationDescriptor> aggregationDescriptorList) {
this.aggregationDescriptorList = aggregationDescriptorList;
}

public List<AggregationDescriptor> getAggregationDescriptorList() {
return this.aggregationDescriptorList;
}

public GroupByTimeParameter getGroupByTimeParameter() {
return this.groupByTimeParameter;
}

public boolean isOutputEndTime() {
return outputEndTime;
}

public static Map<String, List<InputLocation>> makeLayout(List<String> measurementList) {
Map<String, List<InputLocation>> outputMappings = new LinkedHashMap<>();
int tsBlockIndex = 0;
Expand Down Expand Up @@ -273,6 +286,8 @@ public void serialize(ByteBuffer byteBuffer) {
} else {
ReadWriteIOUtils.write((byte) 0, byteBuffer);
}

ReadWriteIOUtils.write(outputEndTime, byteBuffer);
}

public void serialize(DataOutputStream stream) throws IOException {
Expand Down Expand Up @@ -332,6 +347,8 @@ public void serialize(DataOutputStream stream) throws IOException {
} else {
ReadWriteIOUtils.write((byte) 0, stream);
}

ReadWriteIOUtils.write(outputEndTime, stream);
}

public static TemplatedInfo deserialize(ByteBuffer byteBuffer) {
Expand Down Expand Up @@ -377,20 +394,21 @@ public static TemplatedInfo deserialize(ByteBuffer byteBuffer) {
long limitValue = ReadWriteIOUtils.readLong(byteBuffer);

Expression predicate = null;
byte hasFilter = ReadWriteIOUtils.readByte(byteBuffer);
Map<String, IMeasurementSchema> currentSchemaMap = null;
Map<String, List<InputLocation>> layoutMap = null;
boolean keepNull = false;
Map<String, List<InputLocation>> filterLayoutMap = null;
byte hasFilter = ReadWriteIOUtils.readByte(byteBuffer);
if (hasFilter == 1) {
predicate = Expression.deserialize(byteBuffer);
keepNull = ReadWriteIOUtils.readBool(byteBuffer);
currentSchemaMap = new HashMap<>();
for (IMeasurementSchema measurementSchema : measurementSchemaList) {
currentSchemaMap.put(measurementSchema.getMeasurementId(), measurementSchema);
}
layoutMap = makeLayout(measurementList);
filterLayoutMap = makeLayout(measurementList);
}

Map<String, IMeasurementSchema> measurementSchemaMap =
new HashMap<>(measurementSchemaList.size());
measurementSchemaList.forEach(
measurementSchema ->
measurementSchemaMap.put(measurementSchema.getMeasurementId(), measurementSchema));

Expression pushDownPredicate = null;
byte hasPushDownFilter = ReadWriteIOUtils.readByte(byteBuffer);
if (hasPushDownFilter == 1) {
Expand All @@ -412,7 +430,7 @@ public static TemplatedInfo deserialize(ByteBuffer byteBuffer) {
groupByTimeParameter = GroupByTimeParameter.deserialize(byteBuffer);
}

// TODO add outputEndTime serialization and deserialization
boolean outputEndTime = ReadWriteIOUtils.readBool(byteBuffer);

return new TemplatedInfo(
measurementList,
Expand All @@ -426,11 +444,11 @@ public static TemplatedInfo deserialize(ByteBuffer byteBuffer) {
limitValue,
predicate,
keepNull,
currentSchemaMap,
layoutMap,
measurementSchemaMap,
filterLayoutMap,
pushDownPredicate,
aggregationDescriptorList,
groupByTimeParameter,
false);
outputEndTime);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ public PlanNode visitDeviceView(DeviceViewNode node, RewriterContext context) {
@Override
public PlanNode visitSingleDeviceView(SingleDeviceViewNode node, RewriterContext context) {
context.setCurDevice(node.getDevice());
context.setCurDevicePath(new PartialPath(node.getDevice().split(",")));
PlanNode rewrittenChild = node.getChild().accept(this, context);
node.setChild(rewrittenChild);
return node;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -736,9 +736,6 @@ public LogicalPlanBuilder planSlidingWindowAggregation(
GroupByTimeParameter groupByTimeParameter,
AggregationStep curStep,
Ordering scanOrder) {
if (aggregationExpressions == null) {
return this;
}

this.root =
createSlidingWindowAggregationNode(
Expand Down
Loading

0 comments on commit 8854da1

Please sign in to comment.