Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use schema last cache to speed up the last and last_by aggregations in table model #14526

Open
wants to merge 16 commits into
base: master
Choose a base branch
from
Open
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,12 @@ public class IoTDBTableAggregationIT {
"CLEAR ATTRIBUTE CACHE",
};

// public static void main(String[] args) {
// for (String sql : createSqls) {
// System.out.println(sql + ";");
// }
// }

@BeforeClass
public static void setUp() throws Exception {
EnvFactory.getEnv().getConfig().getCommonConfig().setSortBufferSize(128 * 1024);
Expand Down Expand Up @@ -2455,7 +2461,7 @@ public void lastByTest() {
"shanghai,pudong,d08,null,",
};
tableResultSetEqualTest(
"select city,region,device_id,last_by(s5,time,time) from table1 group by city,region,device_id order by 1,2,3",
"select city,region,device_id,last_by(s5,time) from table1 group by city,region,device_id order by 1,2,3",
expectedHeader,
retArray,
DATABASE_NAME);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -638,7 +638,7 @@ public class IoTDBConfig {
private long cacheFileReaderClearPeriod = 100000;

/** the max executing time of query in ms. Unit: millisecond */
private long queryTimeoutThreshold = 60000;
private long queryTimeoutThreshold = 60000_0000;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will remove it later


/** the max time to live of a session in ms. Unit: millisecond */
private int sessionTimeoutThreshold = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static java.lang.String.format;
import static org.apache.iotdb.db.queryengine.execution.operator.AggregationUtil.satisfiedTimeRange;
import static org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableScanOperator.CURRENT_DEVICE_INDEX_STRING;
import static org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableScanOperator.TIME_COLUMN_TEMPLATE;
Expand All @@ -80,23 +79,25 @@ public class TableAggregationTableScanOperator extends AbstractDataSourceOperato
private boolean finished = false;
private TsBlock inputTsBlock;

private final List<TableAggregator> tableAggregators;
private final List<ColumnSchema> groupingKeySchemas;
private final int[] groupingKeyIndex;

private final List<DeviceEntry> deviceEntries;
private final int deviceCount;
private int currentDeviceIndex;
private final List<String> measurementColumnNames;
private final Set<String> allSensors;
private final List<IMeasurementSchema> measurementSchemas;
private final List<TSDataType> measurementColumnTSDataTypes;
private final int measurementCount;

private final List<ColumnSchema> aggColumnSchemas;
private final int[] aggColumnsIndexArray;

private final SeriesScanOptions seriesScanOptions;
protected List<TableAggregator> tableAggregators;
protected final List<ColumnSchema> groupingKeySchemas;
protected final int[] groupingKeyIndex;

protected final List<DeviceEntry> deviceEntries;
protected final int deviceCount;
protected int currentDeviceIndex;
protected List<String> measurementColumnNames;
protected Set<String> allSensors;
protected List<IMeasurementSchema> measurementSchemas;
protected List<TSDataType> measurementColumnTSDataTypes;
protected int measurementCount;

// distinct column schemas appeared in aggregation function
protected List<ColumnSchema> aggColumnSchemas;
// length of aggColumnsIndexArray equals the size of aggColumnSchemas
protected int[] aggColumnsIndexArray;

protected SeriesScanOptions seriesScanOptions;
private final boolean ascending;
private final Ordering scanOrder;
// Some special data types(like BLOB) cannot use statistics
Expand All @@ -105,11 +106,11 @@ public class TableAggregationTableScanOperator extends AbstractDataSourceOperato

// stores all inputChannels of tableAggregators,
// e.g. for aggregation `last(s1), count(s2), count(s1)`, the inputChannels should be [0, 1, 0]
private final List<Integer> aggregatorInputChannels;
protected List<Integer> aggregatorInputChannels;

private QueryDataSource queryDataSource;

private final ITableTimeRangeIterator timeIterator;
protected ITableTimeRangeIterator timeIterator;

private boolean allAggregatorsHasFinalResult = false;

Expand All @@ -119,6 +120,7 @@ public TableAggregationTableScanOperator(
List<ColumnSchema> aggColumnSchemas,
int[] aggColumnsIndexArray,
List<DeviceEntry> deviceEntries,
int deviceCount,
SeriesScanOptions seriesScanOptions,
List<String> measurementColumnNames,
Set<String> allSensors,
Expand All @@ -140,7 +142,7 @@ public TableAggregationTableScanOperator(
this.aggColumnSchemas = aggColumnSchemas;
this.aggColumnsIndexArray = aggColumnsIndexArray;
this.deviceEntries = deviceEntries;
this.deviceCount = deviceEntries.size();
this.deviceCount = deviceCount;
this.operatorContext.recordSpecifiedInfo(DEVICE_NUMBER, Integer.toString(this.deviceCount));
this.ascending = ascending;
this.scanOrder = ascending ? Ordering.ASC : Ordering.DESC;
Expand Down Expand Up @@ -170,32 +172,20 @@ public boolean isFinished() throws Exception {
return finished;
}

@Override
public long calculateMaxPeekMemory() {
return cachedRawDataSize + maxReturnSize;
}

@Override
public long calculateMaxReturnSize() {
return maxReturnSize;
}

@Override
public long calculateRetainedSizeAfterCallingNext() {
return timeIterator.getType() == ITableTimeRangeIterator.TimeIteratorType.DATE_BIN_TIME_ITERATOR
? cachedRawDataSize
: 0;
}

@Override
public boolean hasNext() throws Exception {
return timeIterator.hasCachedTimeRange()
|| timeIterator.hasNextTimeRange()
|| !resultTsBlockBuilder.isEmpty();
if (retainedTsBlock != null) {
return true;
}

return timeIterator.hasCachedTimeRange() || timeIterator.hasNextTimeRange();
}

@Override
public TsBlock next() throws Exception {
if (retainedTsBlock != null) {
return getResultFromRetainedTsBlock();
}

// optimize for sql: select count(*) from (select count(s1), sum(s1) from table)
if (tableAggregators.isEmpty()
Expand Down Expand Up @@ -224,35 +214,20 @@ public TsBlock next() throws Exception {
}
}

if (resultTsBlockBuilder.getPositionCount() > 0) {
return buildResultTsBlock();
} else {
if (resultTsBlockBuilder.isEmpty()) {
return null;
}
}

private TsBlock buildResultTsBlock() {
int declaredPositions = resultTsBlockBuilder.getPositionCount();
ColumnBuilder[] valueColumnBuilders = resultTsBlockBuilder.getValueColumnBuilders();
Column[] valueColumns = new Column[valueColumnBuilders.length];
for (int i = 0; i < valueColumns.length; i++) {
valueColumns[i] = valueColumnBuilders[i].build();
if (valueColumns[i].getPositionCount() != declaredPositions) {
throw new IllegalStateException(
format(
"Declared positions (%s) does not match column %s's number of entries (%s)",
declaredPositions, i, valueColumns[i].getPositionCount()));
}
}
buildResultTsBlock();
return checkTsBlockSizeAndGetResult();
}

TsBlock resultTsBlock =
new TsBlock(
resultTsBlockBuilder.getPositionCount(),
protected void buildResultTsBlock() {
resultTsBlock =
resultTsBlockBuilder.build(
new RunLengthEncodedColumn(
TIME_COLUMN_TEMPLATE, resultTsBlockBuilder.getPositionCount()),
valueColumns);
TIME_COLUMN_TEMPLATE, resultTsBlockBuilder.getPositionCount()));
resultTsBlockBuilder.reset();
return resultTsBlock;
}

protected void constructAlignedSeriesScanUtil() {
Expand Down Expand Up @@ -820,7 +795,7 @@ private void nextDevice() {
CURRENT_DEVICE_INDEX_STRING, Integer.toString(currentDeviceIndex));
}

private void resetTableAggregators() {
protected void resetTableAggregators() {
tableAggregators.forEach(TableAggregator::reset);
}

Expand Down Expand Up @@ -856,6 +831,23 @@ public void initQueryDataSource(IQueryDataSource dataSource) {
this.resultTsBlockBuilder = new TsBlockBuilder(getResultDataTypes());
}

@Override
public long calculateMaxPeekMemory() {
return cachedRawDataSize + maxReturnSize;
}

@Override
public long calculateMaxReturnSize() {
return maxReturnSize;
}

@Override
public long calculateRetainedSizeAfterCallingNext() {
return timeIterator.getType() == ITableTimeRangeIterator.TimeIteratorType.DATE_BIN_TIME_ITERATOR
? cachedRawDataSize
: 0;
}

@Override
public long ramBytesUsed() {
return INSTANCE_SIZE
Expand Down
Loading
Loading