Skip to content

Commit

Permalink
push down global time filter
Browse files Browse the repository at this point in the history
  • Loading branch information
shizy818 committed Dec 5, 2024
1 parent 402c869 commit 5aaa59b
Show file tree
Hide file tree
Showing 9 changed files with 69 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.tsfile.file.metadata.enums.TSEncoding;
import org.apache.tsfile.file.metadata.statistics.Statistics;
import org.apache.tsfile.read.common.TimeRange;
import org.apache.tsfile.read.filter.basic.Filter;
import org.apache.tsfile.utils.Pair;
import org.apache.tsfile.write.schema.IMeasurementSchema;
import org.apache.tsfile.write.schema.VectorMeasurementSchema;
Expand Down Expand Up @@ -89,7 +90,8 @@ public abstract ReadOnlyMemChunk getReadOnlyMemChunkFromMemTable(
QueryContext context,
IMemTable memTable,
List<Pair<ModEntry, IMemTable>> modsToMemtable,
long timeLowerBound)
long timeLowerBound,
Filter globalTimeFilter)
throws QueryProcessException, IOException;

public abstract List<IChunkMetadata> getVisibleMetadataListFromWriter(
Expand Down Expand Up @@ -186,7 +188,8 @@ public ReadOnlyMemChunk getReadOnlyMemChunkFromMemTable(
QueryContext context,
IMemTable memTable,
List<Pair<ModEntry, IMemTable>> modsToMemtable,
long timeLowerBound)
long timeLowerBound,
Filter globalTimeFilter)
throws QueryProcessException {
Map<IDeviceID, IWritableMemChunkGroup> memTableMap = memTable.getMemTableMap();
IDeviceID deviceID = alignedFullPath.getDeviceId();
Expand Down Expand Up @@ -367,16 +370,22 @@ public ITimeSeriesMetadata generateTimeSeriesMetadata(
return timeSeriesMetadata;
}

// TODO: global time filter pushdown
private Map<TVList, Integer> prepareTvListMapForQuery(
WritableMemChunk memChunk, boolean isWorkMemTable, QueryContext context) {
WritableMemChunk memChunk,
boolean isWorkMemTable,
QueryContext context,
Filter globalTimeFilter) {
Map<TVList, Integer> tvListQueryMap = new LinkedHashMap<>();
// immutable sorted lists
for (TVList tvList : memChunk.getSortedList()) {
if (globalTimeFilter != null
&& !globalTimeFilter.satisfyStartEndTime(tvList.getMinTime(), tvList.getMaxTime())) {
continue;
}
tvList.lockQueryList();
try {
LOGGER.debug(
"Flushing/Working MemTable - Add current query context to immutable TVList's query list");
"Flushing/Working MemTable - add current query context to immutable TVList's query list");
tvList.getQueryContextList().add(context);
tvListQueryMap.put(tvList, tvList.rowCount());
} finally {
Expand All @@ -389,10 +398,13 @@ private Map<TVList, Integer> prepareTvListMapForQuery(
list.lockQueryList();
try {
if (!isWorkMemTable) {
LOGGER.debug(
"Flushing MemTable - add current query context to mutable TVList's query list");
list.getQueryContextList().add(context);
tvListQueryMap.put(list, list.rowCount());
if (globalTimeFilter == null
|| globalTimeFilter.satisfyStartEndTime(list.getMinTime(), list.getMaxTime())) {
LOGGER.debug(
"Flushing MemTable - add current query context to mutable TVList's query list");
list.getQueryContextList().add(context);
tvListQueryMap.put(list, list.rowCount());
}
} else {
if (list.isSorted() || list.getQueryContextList().isEmpty()) {
LOGGER.debug(
Expand Down Expand Up @@ -429,7 +441,8 @@ public ReadOnlyMemChunk getReadOnlyMemChunkFromMemTable(
QueryContext context,
IMemTable memTable,
List<Pair<ModEntry, IMemTable>> modsToMemtable,
long timeLowerBound)
long timeLowerBound,
Filter globalTimeFilter)
throws QueryProcessException, IOException {
Map<IDeviceID, IWritableMemChunkGroup> memTableMap = memTable.getMemTableMap();
IDeviceID deviceID = fullPath.getDeviceId();
Expand All @@ -444,7 +457,7 @@ public ReadOnlyMemChunk getReadOnlyMemChunkFromMemTable(
// prepare TVList for query. It should clone and sort TVList if necessary.
// Also, the map keeps TVlist length at this moment.
Map<TVList, Integer> tvListQueryMap =
prepareTvListMapForQuery(memChunk, modsToMemtable == null, context);
prepareTvListMapForQuery(memChunk, modsToMemtable == null, context, globalTimeFilter);
List<TimeRange> deletionList = null;
if (modsToMemtable != null) {
deletionList =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2163,7 +2163,9 @@ private List<TsFileResource> getFileResourceListForQuery(
if (tsFileResource.isClosed()) {
tsfileResourcesForQuery.add(tsFileResource);
} else {
tsFileResource.getProcessor().query(pathList, context, tsfileResourcesForQuery);
tsFileResource
.getProcessor()
.query(pathList, context, tsfileResourcesForQuery, globalTimeFilter);
}
} catch (IOException e) {
throw new MetadataException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import org.apache.tsfile.file.metadata.enums.TSEncoding;
import org.apache.tsfile.file.metadata.statistics.TimeStatistics;
import org.apache.tsfile.read.common.TimeRange;
import org.apache.tsfile.read.filter.basic.Filter;
import org.apache.tsfile.utils.BitMap;
import org.apache.tsfile.utils.Pair;
import org.apache.tsfile.utils.ReadWriteIOUtils;
Expand Down Expand Up @@ -459,10 +460,12 @@ public ReadOnlyMemChunk query(
QueryContext context,
IFullPath fullPath,
long ttlLowerBound,
List<Pair<ModEntry, IMemTable>> modsToMemtable)
List<Pair<ModEntry, IMemTable>> modsToMemtable,
Filter globalTimeFilter)
throws IOException, QueryProcessException {
return ResourceByPathUtils.getResourceInstance(fullPath)
.getReadOnlyMemChunkFromMemTable(context, this, modsToMemtable, ttlLowerBound);
.getReadOnlyMemChunkFromMemTable(
context, this, modsToMemtable, ttlLowerBound, globalTimeFilter);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@

import org.apache.tsfile.file.metadata.IChunkMetadata;
import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.read.filter.basic.Filter;
import org.apache.tsfile.utils.Pair;
import org.apache.tsfile.write.schema.IMeasurementSchema;

Expand Down Expand Up @@ -117,7 +118,8 @@ ReadOnlyMemChunk query(
QueryContext context,
IFullPath fullPath,
long ttlLowerBound,
List<Pair<ModEntry, IMemTable>> modsToMemtabled)
List<Pair<ModEntry, IMemTable>> modsToMemtabled,
Filter globalTimeFilter)
throws IOException, QueryProcessException, MetadataException;

void queryForSeriesRegionScan(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@
import org.apache.tsfile.file.metadata.IChunkMetadata;
import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.file.metadata.TableSchema;
import org.apache.tsfile.read.filter.basic.Filter;
import org.apache.tsfile.utils.Binary;
import org.apache.tsfile.utils.Pair;
import org.apache.tsfile.write.writer.RestorableTsFileIOWriter;
Expand Down Expand Up @@ -2112,7 +2113,8 @@ public void queryForDeviceRegionScan(
public void query(
List<IFullPath> seriesPaths,
QueryContext context,
List<TsFileResource> tsfileResourcesForQuery)
List<TsFileResource> tsfileResourcesForQuery,
Filter globalTimeFilter)
throws IOException {
long startTime = System.nanoTime();
try {
Expand All @@ -2129,14 +2131,15 @@ public void query(
continue;
}
ReadOnlyMemChunk memChunk =
flushingMemTable.query(context, seriesPath, timeLowerBound, modsToMemtable);
flushingMemTable.query(
context, seriesPath, timeLowerBound, modsToMemtable, globalTimeFilter);
if (memChunk != null) {
readOnlyMemChunks.add(memChunk);
}
}
if (workMemTable != null) {
ReadOnlyMemChunk memChunk =
workMemTable.query(context, seriesPath, timeLowerBound, null);
workMemTable.query(context, seriesPath, timeLowerBound, null, globalTimeFilter);
if (memChunk != null) {
readOnlyMemChunks.add(memChunk);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,8 @@ record = new TSRecord(deviceId, j);
tsfileProcessor.query(
Collections.singletonList(IFullPath.convertToIFullPath(fullPath)),
EnvironmentUtils.TEST_QUERY_CONTEXT,
tsfileResourcesForQuery);
tsfileResourcesForQuery,
null);
}

Assert.assertEquals(1, tsfileResourcesForQuery.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ public void simpleTest() throws IOException, QueryProcessException, MetadataExce
}

ReadOnlyMemChunk memChunk =
memTable.query(new QueryContext(), nonAlignedFullPath, Long.MIN_VALUE, null);
memTable.query(new QueryContext(), nonAlignedFullPath, Long.MIN_VALUE, null, null);
IPointReader iterator = memChunk.getPointReader();
for (int i = 0; i < dataSize; i++) {
iterator.hasNextTimeValuePair();
Expand Down Expand Up @@ -259,7 +259,8 @@ public void queryWithDeletionTest() throws IOException, QueryProcessException, M
new TreeDeletionEntry(new MeasurementPath(deviceID, measurementId[0]), 10, dataSize);
modsToMemtable.add(new Pair<>(deletion, memTable));
ReadOnlyMemChunk memChunk =
memTable.query(new QueryContext(), nonAlignedFullPath, Long.MIN_VALUE, modsToMemtable);
memTable.query(
new QueryContext(), nonAlignedFullPath, Long.MIN_VALUE, modsToMemtable, null);
IPointReader iterator = memChunk.getPointReader();
int cnt = 0;
while (iterator.hasNextTimeValuePair()) {
Expand Down Expand Up @@ -304,7 +305,7 @@ public void queryAlignChuckWithDeletionTest()
new TreeDeletionEntry(new MeasurementPath(deviceID, measurementId[0]), 10, dataSize);
modsToMemtable.add(new Pair<>(deletion, memTable));
ReadOnlyMemChunk memChunk =
memTable.query(new QueryContext(), alignedFullPath, Long.MIN_VALUE, modsToMemtable);
memTable.query(new QueryContext(), alignedFullPath, Long.MIN_VALUE, modsToMemtable, null);
IPointReader iterator = memChunk.getPointReader();
int cnt = 0;
while (iterator.hasNextTimeValuePair()) {
Expand Down Expand Up @@ -343,7 +344,7 @@ private void write(
CompressionType.UNCOMPRESSED,
Collections.emptyMap()));
IPointReader tvPair =
memTable.query(new QueryContext(), fullPath, Long.MIN_VALUE, null).getPointReader();
memTable.query(new QueryContext(), fullPath, Long.MIN_VALUE, null, null).getPointReader();
Arrays.sort(ret);
TimeValuePair last = null;
for (int i = 0; i < ret.length; i++) {
Expand Down Expand Up @@ -392,7 +393,7 @@ private void writeVector(IMemTable memTable)
Collections.emptyMap())));
IPointReader tvPair =
memTable
.query(new QueryContext(), tmpAlignedFullPath, Long.MIN_VALUE, null)
.query(new QueryContext(), tmpAlignedFullPath, Long.MIN_VALUE, null, null)
.getPointReader();
for (int i = 0; i < 100; i++) {
tvPair.hasNextTimeValuePair();
Expand Down Expand Up @@ -421,7 +422,7 @@ private void writeVector(IMemTable memTable)

tvPair =
memTable
.query(new QueryContext(), tmpAlignedFullPath, Long.MIN_VALUE, null)
.query(new QueryContext(), tmpAlignedFullPath, Long.MIN_VALUE, null, null)
.getPointReader();
for (int i = 0; i < 100; i++) {
tvPair.hasNextTimeValuePair();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ public void testWriteAndFlush() throws IOException, WriteProcessException, Metad
IDeviceID.Factory.DEFAULT_FACTORY.create(deviceId),
new MeasurementSchema(
measurementId, dataType, encoding, CompressionType.UNCOMPRESSED, props));
processor.query(Collections.singletonList(fullPath), context, tsfileResourcesForQuery);
processor.query(Collections.singletonList(fullPath), context, tsfileResourcesForQuery, null);
assertTrue(tsfileResourcesForQuery.isEmpty());

for (int i = 1; i <= 100; i++) {
Expand All @@ -137,7 +137,7 @@ public void testWriteAndFlush() throws IOException, WriteProcessException, Metad

// query data in memory
tsfileResourcesForQuery.clear();
processor.query(Collections.singletonList(fullPath), context, tsfileResourcesForQuery);
processor.query(Collections.singletonList(fullPath), context, tsfileResourcesForQuery, null);

TsFileResource tsFileResource = tsfileResourcesForQuery.get(0);
assertFalse(tsFileResource.getReadOnlyMemChunk(fullPath).isEmpty());
Expand All @@ -156,7 +156,7 @@ public void testWriteAndFlush() throws IOException, WriteProcessException, Metad
processor.syncFlush();

tsfileResourcesForQuery.clear();
processor.query(Collections.singletonList(fullPath), context, tsfileResourcesForQuery);
processor.query(Collections.singletonList(fullPath), context, tsfileResourcesForQuery, null);
assertTrue(tsfileResourcesForQuery.get(0).getReadOnlyMemChunk(fullPath).isEmpty());
processor.syncClose();
}
Expand Down Expand Up @@ -184,7 +184,7 @@ public void testWriteAndRestoreMetadata()
IDeviceID.Factory.DEFAULT_FACTORY.create(deviceId),
new MeasurementSchema(
measurementId, dataType, encoding, CompressionType.UNCOMPRESSED, props));
processor.query(Collections.singletonList(fullPath), context, tsfileResourcesForQuery);
processor.query(Collections.singletonList(fullPath), context, tsfileResourcesForQuery, null);
assertTrue(tsfileResourcesForQuery.isEmpty());

for (int i = 1; i <= 100; i++) {
Expand All @@ -195,7 +195,7 @@ public void testWriteAndRestoreMetadata()

// query data in memory
tsfileResourcesForQuery.clear();
processor.query(Collections.singletonList(fullPath), context, tsfileResourcesForQuery);
processor.query(Collections.singletonList(fullPath), context, tsfileResourcesForQuery, null);
assertFalse(tsfileResourcesForQuery.get(0).getReadOnlyMemChunk(fullPath).isEmpty());
int num = 1;
List<ReadOnlyMemChunk> memChunks = tsfileResourcesForQuery.get(0).getReadOnlyMemChunk(fullPath);
Expand All @@ -213,7 +213,7 @@ public void testWriteAndRestoreMetadata()
processor.syncFlush();

tsfileResourcesForQuery.clear();
processor.query(Collections.singletonList(fullPath), context, tsfileResourcesForQuery);
processor.query(Collections.singletonList(fullPath), context, tsfileResourcesForQuery, null);
assertTrue(tsfileResourcesForQuery.get(0).getReadOnlyMemChunk(fullPath).isEmpty());

RestorableTsFileIOWriter tsFileIOWriter = processor.getWriter();
Expand Down Expand Up @@ -267,7 +267,7 @@ public void testMultiFlush() throws IOException, WriteProcessException, Metadata
IDeviceID.Factory.DEFAULT_FACTORY.create(deviceId),
new MeasurementSchema(
measurementId, dataType, encoding, CompressionType.UNCOMPRESSED, props));
processor.query(Collections.singletonList(fullPath), context, tsfileResourcesForQuery);
processor.query(Collections.singletonList(fullPath), context, tsfileResourcesForQuery, null);
assertTrue(tsfileResourcesForQuery.isEmpty());

for (int flushId = 0; flushId < 10; flushId++) {
Expand All @@ -281,7 +281,7 @@ public void testMultiFlush() throws IOException, WriteProcessException, Metadata
processor.syncFlush();

tsfileResourcesForQuery.clear();
processor.query(Collections.singletonList(fullPath), context, tsfileResourcesForQuery);
processor.query(Collections.singletonList(fullPath), context, tsfileResourcesForQuery, null);
assertFalse(tsfileResourcesForQuery.isEmpty());
assertTrue(tsfileResourcesForQuery.get(0).getReadOnlyMemChunk(fullPath).isEmpty());
processor.syncClose();
Expand Down Expand Up @@ -825,7 +825,7 @@ public void testWriteAndClose() throws IOException, WriteProcessException, Metad
IDeviceID.Factory.DEFAULT_FACTORY.create(deviceId),
new MeasurementSchema(
measurementId, dataType, encoding, CompressionType.UNCOMPRESSED, props));
processor.query(Collections.singletonList(fullPath), context, tsfileResourcesForQuery);
processor.query(Collections.singletonList(fullPath), context, tsfileResourcesForQuery, null);
assertTrue(tsfileResourcesForQuery.isEmpty());

for (int i = 1; i <= 100; i++) {
Expand All @@ -836,7 +836,7 @@ public void testWriteAndClose() throws IOException, WriteProcessException, Metad

// query data in memory
tsfileResourcesForQuery.clear();
processor.query(Collections.singletonList(fullPath), context, tsfileResourcesForQuery);
processor.query(Collections.singletonList(fullPath), context, tsfileResourcesForQuery, null);
assertFalse(tsfileResourcesForQuery.isEmpty());
assertFalse(tsfileResourcesForQuery.get(0).getReadOnlyMemChunk(fullPath).isEmpty());
List<ReadOnlyMemChunk> memChunks = tsfileResourcesForQuery.get(0).getReadOnlyMemChunk(fullPath);
Expand Down
Loading

0 comments on commit 5aaa59b

Please sign in to comment.