Skip to content

Commit

Permalink
CP commit 273d483 to rc/1.3.2 (#13195)
Browse files Browse the repository at this point in the history
  • Loading branch information
Beyyes authored Aug 16, 2024
1 parent a1be25e commit ef3738b
Show file tree
Hide file tree
Showing 2 changed files with 109 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,13 @@ public void testAlignedRawDataAlignByTime1() {
};
resultSetEqualTest(
"select s2 from root.sg1.d1 where s2 - 1 >= 9 and s2 < 30", expectedHeader3, retArray3);

String expectedHeader4 = "Time,root.sg1.d1.s2,";
String[] retArray4 = new String[] {"14,14,", "15,15,"};
resultSetEqualTest(
"select s2 from root.sg1.d1 where s2 - 1 >= 9 and s2 < 30 offset 3 limit 2",
expectedHeader4,
retArray4);
}

@Test
Expand Down Expand Up @@ -164,6 +171,11 @@ public void testAlignedRawDataAlignByTime2() {
"30,30,",
};
resultSetEqualTest("select s3 from root.sg1.d1 where s3 + 1 > 16", expectedHeader3, retArray3);

String expectedHeader4 = "Time,root.sg1.d1.s3,";
String[] retArray4 = new String[] {"3,30000,", "13,130000,", "16,16,"};
resultSetEqualTest(
"select s3 from root.sg1.d1 where s3 + 1 > 16 limit 3", expectedHeader4, retArray4);
}

@Test
Expand Down Expand Up @@ -203,6 +215,13 @@ public void testNonAlignedRawDataAlignByTime1() {
};
resultSetEqualTest(
"select s2 from root.sg1.d2 where s2 - 1 >= 9 and s2 < 30", expectedHeader3, retArray3);

String expectedHeader4 = "Time,root.sg1.d2.s2,";
String[] retArray4 = new String[] {"12,12,", "13,13,", "14,14,"};
resultSetEqualTest(
"select s2 from root.sg1.d2 where s2 - 1 >= 9 and s2 < 30 limit 3 offset 2",
expectedHeader4,
retArray4);
}

@Test
Expand Down Expand Up @@ -257,6 +276,14 @@ public void testNonAlignedRawDataAlignByTime2() {
"25,25,", "26,26,", "27,27,", "28,28,", "29,29,", "30,30,",
};
resultSetEqualTest("select s3 from root.sg1.d2 where s3 + 1 > 16", expectedHeader3, retArray3);

String expectedHeader4 = "Time,root.sg1.d2.s3,";
String[] retArray4 =
new String[] {
"26,26,", "27,27,", "28,28,", "29,29,", "30,30,",
};
resultSetEqualTest(
"select s3 from root.sg1.d2 where s3 + 1 > 16 offset 10", expectedHeader4, retArray4);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -328,8 +328,6 @@ public Operator visitSeriesScan(SeriesScanNode node, LocalExecutionPlanContext c
SeriesScanOptions.Builder scanOptionsBuilder = getSeriesScanOptionsBuilder(node, context);
scanOptionsBuilder.withAllSensors(
context.getAllSensors(seriesPath.getDevice(), seriesPath.getMeasurement()));
scanOptionsBuilder.withPushDownLimit(node.getPushDownLimit());
scanOptionsBuilder.withPushDownOffset(node.getPushDownOffset());

Expression pushDownPredicate = node.getPushDownPredicate();
boolean predicateCanPushIntoScan = canPushIntoScan(pushDownPredicate);
Expand All @@ -341,6 +339,10 @@ public Operator visitSeriesScan(SeriesScanNode node, LocalExecutionPlanContext c
context.getTypeProvider().getTemplatedInfo() != null,
context.getTypeProvider()));
}
if (pushDownPredicate == null || predicateCanPushIntoScan) {
scanOptionsBuilder.withPushDownLimit(node.getPushDownLimit());
scanOptionsBuilder.withPushDownOffset(node.getPushDownOffset());
}

OperatorContext operatorContext =
context
Expand All @@ -364,17 +366,43 @@ public Operator visitSeriesScan(SeriesScanNode node, LocalExecutionPlanContext c

if (!predicateCanPushIntoScan) {
checkState(!context.isBuildPlanUseTemplate(), "Push down predicate is not supported yet");
return constructFilterOperator(
pushDownPredicate,
seriesScanOperator,
Collections.singletonList(ExpressionFactory.timeSeries(node.getSeriesPath()))
.toArray(new Expression[0]),
Collections.singletonList(node.getSeriesPath().getSeriesType()),
makeLayout(Collections.singletonList(node)),
false,
node.getPlanNodeId(),
node.getScanOrder(),
context);
Operator rootOperator =
constructFilterOperator(
pushDownPredicate,
seriesScanOperator,
Collections.singletonList(ExpressionFactory.timeSeries(node.getSeriesPath()))
.toArray(new Expression[0]),
Collections.singletonList(node.getSeriesPath().getSeriesType()),
makeLayout(Collections.singletonList(node)),
false,
node.getPlanNodeId(),
node.getScanOrder(),
context);
if (node.getPushDownOffset() > 0) {
rootOperator =
new OffsetOperator(
context
.getDriverContext()
.addOperatorContext(
context.getNextOperatorId(),
node.getPlanNodeId(),
OffsetOperator.class.getSimpleName()),
node.getPushDownOffset(),
rootOperator);
}
if (node.getPushDownLimit() > 0) {
rootOperator =
new LimitOperator(
context
.getDriverContext()
.addOperatorContext(
context.getNextOperatorId(),
node.getPlanNodeId(),
LimitOperator.class.getSimpleName()),
node.getPushDownLimit(),
rootOperator);
}
return rootOperator;
}
return seriesScanOperator;
}
Expand All @@ -385,8 +413,6 @@ public Operator visitAlignedSeriesScan(
AlignedPath seriesPath = node.getAlignedPath();

SeriesScanOptions.Builder scanOptionsBuilder = getSeriesScanOptionsBuilder(node, context);
scanOptionsBuilder.withPushDownLimit(node.getPushDownLimit());
scanOptionsBuilder.withPushDownOffset(node.getPushDownOffset());
scanOptionsBuilder.withAllSensors(
new HashSet<>(
context.isBuildPlanUseTemplate()
Expand All @@ -403,6 +429,10 @@ public Operator visitAlignedSeriesScan(
context.getTypeProvider().getTemplatedInfo() != null,
context.getTypeProvider()));
}
if (pushDownPredicate == null || predicateCanPushIntoScan) {
scanOptionsBuilder.withPushDownLimit(node.getPushDownLimit());
scanOptionsBuilder.withPushDownOffset(node.getPushDownOffset());
}

OperatorContext operatorContext =
context
Expand Down Expand Up @@ -460,16 +490,43 @@ public Operator visitAlignedSeriesScan(
dataTypes.add(alignedPath.getSubMeasurementDataType(i));
}

return constructFilterOperator(
pushDownPredicate,
seriesScanOperator,
expressions.toArray(new Expression[0]),
dataTypes,
makeLayout(Collections.singletonList(node)),
false,
node.getPlanNodeId(),
node.getScanOrder(),
context);
Operator rootOperator =
constructFilterOperator(
pushDownPredicate,
seriesScanOperator,
expressions.toArray(new Expression[0]),
dataTypes,
makeLayout(Collections.singletonList(node)),
false,
node.getPlanNodeId(),
node.getScanOrder(),
context);

if (node.getPushDownOffset() > 0) {
rootOperator =
new OffsetOperator(
context
.getDriverContext()
.addOperatorContext(
context.getNextOperatorId(),
node.getPlanNodeId(),
OffsetOperator.class.getSimpleName()),
node.getPushDownOffset(),
rootOperator);
}
if (node.getPushDownLimit() > 0) {
rootOperator =
new LimitOperator(
context
.getDriverContext()
.addOperatorContext(
context.getNextOperatorId(),
node.getPlanNodeId(),
LimitOperator.class.getSimpleName()),
node.getPushDownLimit(),
rootOperator);
}
return rootOperator;
}
return seriesScanOperator;
}
Expand Down

0 comments on commit ef3738b

Please sign in to comment.