Unaligned tvlist feat #14265

Closed · wants to merge 30 commits
Changes from 9 commits

Commits (30)
ef5de53
null bitmap for int tvlist
shizy818 Nov 27, 2024
7a15073
update min/max timestamp and sequential part of tvlist during insert
shizy818 Nov 30, 2024
c28fe3f
mutable & immutable tvlists in writable memchunk
shizy818 Dec 1, 2024
6df2e10
copy-on-write array list
shizy818 Dec 1, 2024
435d5da
review comments part 1
shizy818 Dec 3, 2024
d5fad22
fix unit test errors
shizy818 Dec 3, 2024
402c869
review comments part 2
shizy818 Dec 4, 2024
5aaa59b
push down global time filter
shizy818 Dec 5, 2024
cfc9e74
fix MemPageReaderTest case
shizy818 Dec 5, 2024
cd3f7d9
fix memory page offsets error
shizy818 Dec 5, 2024
e3d2d5e
synchronized sort & MergeSortTvListIterator bug
shizy818 Dec 6, 2024
73e9f83
tvlist_sort_threshold config property
shizy818 Dec 7, 2024
0545ff1
bug fix:
shizy818 Dec 11, 2024
2fcdcdb
optimize TVListIterator & MergeSortTvListIterator
shizy818 Dec 11, 2024
45db409
retrofit encode when tvlist_sort_threshold is zero
shizy818 Dec 13, 2024
9411fea
delay sort & statistic generation to query execution
shizy818 Dec 11, 2024
e3346d3
fix: skip deleted data during encode
shizy818 Dec 13, 2024
8e0f8e5
aligned time series part
shizy818 Dec 16, 2024
6eedcf6
fix: MemAlignedChunkReader page offset
shizy818 Dec 18, 2024
0a9c4ed
performance issue:
shizy818 Dec 17, 2024
9b0a6f8
fix: memory chunk reader may read more points than expected in one page
shizy818 Dec 23, 2024
23bf1f7
update chunk & page statistic for aligned memchunk by column
shizy818 Dec 21, 2024
298e16c
revert: getAlignedValueForQuery
shizy818 Dec 24, 2024
359929d
fix: * CopyOnWriteArrayList for AlignedTVList bitmaps
shizy818 Dec 25, 2024
6817370
refactor: Tim/Quick/Backward TVList
shizy818 Dec 25, 2024
a8beea5
refactor: synchronized tvlist method: sort, putXXX
shizy818 Dec 25, 2024
6369293
refactor: change list to array in AlignedTVList iterator
shizy818 Dec 25, 2024
a80bcc0
revert: remove CopyOnWriteArrayList
shizy818 Dec 26, 2024
3e0b904
refactor: clone MergeSort iterator from ReadOnlyChunk
shizy818 Dec 27, 2024
ca08605
fix: clone working tvlist during flush if there is query on it
shizy818 Dec 28, 2024
@@ -41,6 +41,7 @@
import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSourceType;
import org.apache.iotdb.db.storageengine.dataregion.read.control.FileReaderManager;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import org.apache.iotdb.db.utils.datastructure.TVList;
import org.apache.iotdb.mpp.rpc.thrift.TFetchFragmentInstanceStatisticsResp;

import org.apache.tsfile.file.metadata.IDeviceID;
@@ -649,6 +650,35 @@ public void releaseResourceWhenAllDriversAreClosed() {
releaseResource();
}

private void releaseTVListOwnedByQuery() {
for (TVList tvList : tvListSet) {
tvList.lockQueryList();
List<QueryContext> queryContextList = tvList.getQueryContextList();
try {
queryContextList.remove(this);
if (tvList.getOwnerQuery() == this) {
tvList.setOwnerQuery(null);
if (queryContextList.isEmpty()) {
LOGGER.debug(
"TVList {} is released by the query, FragmentInstance Id is {}",
tvList,
this.getId());
tvList.clear();
memoryReservationManager.releaseMemoryCumulatively(tvList.calculateRamSize());
} else {
LOGGER.debug(
"TVList {} is owned by another query, FragmentInstance Id is {}",
tvList,
((FragmentInstanceContext) queryContextList.get(0)).getId());
tvList.setOwnerQuery(queryContextList.get(0));
}
}
} finally {
tvList.unlockQueryList();
}
}
}

/**
* All file paths used by this fragment instance must be cleared and thus the usage reference must
* be decreased.
@@ -669,6 +699,9 @@ public synchronized void releaseResource() {
unClosedFilePaths = null;
}

// release TVList owned by current query
releaseTVListOwnedByQuery();

dataRegion = null;
globalTimeFilter = null;
sharedQueryDataSource = null;
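The hunk above is the release side of the new TVList lifecycle: when a fragment instance finishes, it removes itself from every TVList it touched and either clears the list or hands ownership (and the memory reservation) to the next query still reading it. A minimal, self-contained sketch of that hand-off protocol, using hypothetical names rather than the actual IoTDB classes:

// Illustrative sketch only: TrackedTVList is a hypothetical stand-in, not an IoTDB class.
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

class TrackedTVList {
  private final ReentrantLock queryListLock = new ReentrantLock();
  private final List<Object> queryContextList = new ArrayList<>(); // queries currently reading this list
  private Object ownerQuery; // the query charged for this list's memory

  void registerReader(Object query) {
    queryListLock.lock();
    try {
      queryContextList.add(query);
    } finally {
      queryListLock.unlock();
    }
  }

  // Mirrors releaseTVListOwnedByQuery: drop the reader, then either free the list
  // or transfer ownership (and the memory charge) to the next query still reading it.
  void release(Object query) {
    queryListLock.lock();
    try {
      queryContextList.remove(query);
      if (ownerQuery == query) {
        if (queryContextList.isEmpty()) {
          ownerQuery = null; // last reader gone: data and reserved memory can be released
        } else {
          ownerQuery = queryContextList.get(0);
        }
      }
    } finally {
      queryListLock.unlock();
    }
  }
}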
@@ -28,13 +28,15 @@
import org.apache.iotdb.db.utils.ModificationUtils;
import org.apache.iotdb.db.utils.datastructure.PatternTreeMapFactory;
import org.apache.iotdb.db.utils.datastructure.PatternTreeMapFactory.ModsSerializer;
import org.apache.iotdb.db.utils.datastructure.TVList;

import org.apache.tsfile.file.metadata.IDeviceID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -71,6 +73,9 @@ public class QueryContext {

private final Set<TsFileID> nonExistentModFiles = new CopyOnWriteArraySet<>();

// TVLists accessed by this query
protected final Set<TVList> tvListSet = new HashSet<>();

public QueryContext() {}

public QueryContext(long queryId) {
@@ -206,4 +211,8 @@ public boolean isIgnoreAllNullRows() {
public void setIgnoreAllNullRows(boolean ignoreAllNullRows) {
this.ignoreAllNullRows = ignoreAllNullRows;
}

public void addTVListToSet(Map<TVList, Integer> tvListMap) {
tvListSet.addAll(tvListMap.keySet());
}
}
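tvListSet is presumably populated on the read path once prepareTvListMapForQuery (below) has built the per-series map; that registration is not part of this hunk. A hedged sketch of the assumed call, reusing names from this diff:

// Assumed call site, for illustration only: register every TVList this query touched
// so that FragmentInstanceContext.releaseTVListOwnedByQuery can clean them up later.
Map<TVList, Integer> tvListQueryMap =
    prepareTvListMapForQuery(memChunk, modsToMemtable == null, context, globalTimeFilter);
context.addTVListToSet(tvListQueryMap);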
@@ -22,14 +22,16 @@
import org.apache.iotdb.commons.path.IFullPath;
import org.apache.iotdb.commons.path.NonAlignedFullPath;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext;
import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext;
import org.apache.iotdb.db.queryengine.plan.planner.memory.MemoryReservationManager;
import org.apache.iotdb.db.storageengine.dataregion.memtable.AlignedReadOnlyMemChunk;
import org.apache.iotdb.db.storageengine.dataregion.memtable.AlignedWritableMemChunk;
import org.apache.iotdb.db.storageengine.dataregion.memtable.AlignedWritableMemChunkGroup;
import org.apache.iotdb.db.storageengine.dataregion.memtable.IMemTable;
import org.apache.iotdb.db.storageengine.dataregion.memtable.IWritableMemChunk;
import org.apache.iotdb.db.storageengine.dataregion.memtable.IWritableMemChunkGroup;
import org.apache.iotdb.db.storageengine.dataregion.memtable.ReadOnlyMemChunk;
import org.apache.iotdb.db.storageengine.dataregion.memtable.WritableMemChunk;
import org.apache.iotdb.db.storageengine.dataregion.modification.ModEntry;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import org.apache.iotdb.db.utils.ModificationUtils;
@@ -47,15 +49,19 @@
import org.apache.tsfile.file.metadata.enums.TSEncoding;
import org.apache.tsfile.file.metadata.statistics.Statistics;
import org.apache.tsfile.read.common.TimeRange;
import org.apache.tsfile.read.filter.basic.Filter;
import org.apache.tsfile.utils.Pair;
import org.apache.tsfile.write.schema.IMeasurementSchema;
import org.apache.tsfile.write.schema.VectorMeasurementSchema;
import org.apache.tsfile.write.writer.RestorableTsFileIOWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

@@ -84,7 +90,8 @@ public abstract ReadOnlyMemChunk getReadOnlyMemChunkFromMemTable(
QueryContext context,
IMemTable memTable,
List<Pair<ModEntry, IMemTable>> modsToMemtable,
long timeLowerBound)
long timeLowerBound,
Filter globalTimeFilter)
throws QueryProcessException, IOException;

public abstract List<IChunkMetadata> getVisibleMetadataListFromWriter(
@@ -181,7 +188,8 @@ public ReadOnlyMemChunk getReadOnlyMemChunkFromMemTable(
QueryContext context,
IMemTable memTable,
List<Pair<ModEntry, IMemTable>> modsToMemtable,
long timeLowerBound)
long timeLowerBound,
Filter globalTimeFilter)
throws QueryProcessException {
Map<IDeviceID, IWritableMemChunkGroup> memTableMap = memTable.getMemTableMap();
IDeviceID deviceID = alignedFullPath.getDeviceId();
@@ -322,6 +330,9 @@ public List<IChunkMetadata> getVisibleMetadataListFromWriter(

class MeasurementResourceByPathUtils extends ResourceByPathUtils {

private static final Logger LOGGER =
LoggerFactory.getLogger(MeasurementResourceByPathUtils.class);

NonAlignedFullPath fullPath;

protected MeasurementResourceByPathUtils(IFullPath fullPath) {
@@ -359,12 +370,79 @@ public ITimeSeriesMetadata generateTimeSeriesMetadata(
return timeSeriesMetadata;
}

private Map<TVList, Integer> prepareTvListMapForQuery(
WritableMemChunk memChunk,
boolean isWorkMemTable,
QueryContext context,
Filter globalTimeFilter) {
Map<TVList, Integer> tvListQueryMap = new LinkedHashMap<>();
// immutable sorted lists
for (TVList tvList : memChunk.getSortedList()) {
if (globalTimeFilter != null
&& !globalTimeFilter.satisfyStartEndTime(tvList.getMinTime(), tvList.getMaxTime())) {
continue;
}
tvList.lockQueryList();
try {
LOGGER.debug(
"Flushing/Working MemTable - add current query context to immutable TVList's query list");
tvList.getQueryContextList().add(context);
tvListQueryMap.put(tvList, tvList.rowCount());
} finally {
tvList.unlockQueryList();
}
}

// mutable tvlist
TVList list = memChunk.getWorkingTVList();
list.lockQueryList();
try {
if (!isWorkMemTable) {
if (globalTimeFilter == null
|| globalTimeFilter.satisfyStartEndTime(list.getMinTime(), list.getMaxTime())) {
LOGGER.debug(
"Flushing MemTable - add current query context to mutable TVList's query list");
list.getQueryContextList().add(context);
tvListQueryMap.put(list, list.rowCount());
}
} else {
if (list.isSorted() || list.getQueryContextList().isEmpty()) {
LOGGER.debug(
"Working MemTable - add current query context to mutable TVList's query list when it's sorted or no other query on it");
list.getQueryContextList().add(context);
tvListQueryMap.put(list, list.rowCount());
} else {
LOGGER.debug(
"Working MemTable - clone mutable TVList and replace old TVList in working MemTable");
QueryContext firstQuery = list.getQueryContextList().get(0);
// reserve query memory
if (firstQuery instanceof FragmentInstanceContext) {
MemoryReservationManager memoryReservationManager =
((FragmentInstanceContext) firstQuery).getMemoryReservationContext();
memoryReservationManager.reserveMemoryCumulatively(list.calculateRamSize());
}
list.setOwnerQuery(firstQuery);

// clone TVList
TVList cloneList = list.clone();
cloneList.getQueryContextList().add(context);
tvListQueryMap.put(cloneList, cloneList.rowCount());
memChunk.setWorkingTVList(cloneList);
}
}
} finally {
list.unlockQueryList();
}
return tvListQueryMap;
}

@Override
public ReadOnlyMemChunk getReadOnlyMemChunkFromMemTable(
QueryContext context,
IMemTable memTable,
List<Pair<ModEntry, IMemTable>> modsToMemtable,
long timeLowerBound)
long timeLowerBound,
Filter globalTimeFilter)
throws QueryProcessException, IOException {
Map<IDeviceID, IWritableMemChunkGroup> memTableMap = memTable.getMemTableMap();
IDeviceID deviceID = fullPath.getDeviceId();
@@ -373,10 +451,13 @@ public ReadOnlyMemChunk getReadOnlyMemChunkFromMemTable(
|| !memTableMap.get(deviceID).contains(fullPath.getMeasurement())) {
return null;
}
IWritableMemChunk memChunk =
memTableMap.get(deviceID).getMemChunkMap().get(fullPath.getMeasurement());
// get sorted tv list is synchronized so different query can get right sorted list reference
TVList chunkCopy = memChunk.getSortedTvListForQuery();
WritableMemChunk memChunk =
(WritableMemChunk)
memTableMap.get(deviceID).getMemChunkMap().get(fullPath.getMeasurement());
// Prepare TVLists for the query, cloning and sorting the TVList if necessary.
// The map also records each TVList's row count at this moment.
Map<TVList, Integer> tvListQueryMap =
prepareTvListMapForQuery(memChunk, modsToMemtable == null, context, globalTimeFilter);
List<TimeRange> deletionList = null;
if (modsToMemtable != null) {
deletionList =
@@ -392,7 +473,7 @@ public ReadOnlyMemChunk getReadOnlyMemChunkFromMemTable(
fullPath.getMeasurement(),
fullPath.getMeasurementSchema().getType(),
fullPath.getMeasurementSchema().getEncodingType(),
chunkCopy,
tvListQueryMap,
fullPath.getMeasurementSchema().getProps(),
deletionList);
}
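The key decision in prepareTvListMapForQuery above is whether a query may read the working (mutable) TVList in place or whether the list must be cloned first. Condensed into a single predicate, purely for illustration (this method is not part of the PR):

// Illustrative restatement of the branching in prepareTvListMapForQuery.
// In-place sharing is allowed when the memtable is flushing (the list is no longer
// written), when the list is already sorted, or when no other query is reading it.
// Otherwise the existing readers keep the old list (its memory charged to the first
// of them) and a clone becomes the new working TVList that this query reads.
static boolean canReadWorkingListInPlace(
    boolean isWorkMemTable, boolean isSorted, boolean hasOtherReaders) {
  return !isWorkMemTable || isSorted || !hasOtherReaders;
}

Note that the min/max time pruning via satisfyStartEndTime is only applied to the immutable sorted lists and to the flushing case; the working list of an active memtable is always registered, either directly or through its clone.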
@@ -2163,7 +2163,9 @@ private List<TsFileResource> getFileResourceListForQuery(
if (tsFileResource.isClosed()) {
tsfileResourcesForQuery.add(tsFileResource);
} else {
tsFileResource.getProcessor().query(pathList, context, tsfileResourcesForQuery);
tsFileResource
.getProcessor()
.query(pathList, context, tsfileResourcesForQuery, globalTimeFilter);
}
} catch (IOException e) {
throw new MetadataException(e);
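The globalTimeFilter threaded through the processor's query(...) call here is what prepareTvListMapForQuery (above) uses to skip TVLists whose [minTime, maxTime] range cannot satisfy the query. A small helper capturing that check, hypothetical and shown only to make the pushdown explicit:

// Hypothetical pruning helper mirroring the satisfyStartEndTime checks in
// prepareTvListMapForQuery; not code from this PR.
static boolean tvListOverlapsQuery(Filter globalTimeFilter, TVList tvList) {
  return globalTimeFilter == null
      || globalTimeFilter.satisfyStartEndTime(tvList.getMinTime(), tvList.getMaxTime());
}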
@@ -58,6 +58,7 @@
import org.apache.tsfile.file.metadata.enums.TSEncoding;
import org.apache.tsfile.file.metadata.statistics.TimeStatistics;
import org.apache.tsfile.read.common.TimeRange;
import org.apache.tsfile.read.filter.basic.Filter;
import org.apache.tsfile.utils.BitMap;
import org.apache.tsfile.utils.Pair;
import org.apache.tsfile.utils.ReadWriteIOUtils;
@@ -459,10 +460,12 @@ public ReadOnlyMemChunk query(
QueryContext context,
IFullPath fullPath,
long ttlLowerBound,
List<Pair<ModEntry, IMemTable>> modsToMemtable)
List<Pair<ModEntry, IMemTable>> modsToMemtable,
Filter globalTimeFilter)
throws IOException, QueryProcessException {
return ResourceByPathUtils.getResourceInstance(fullPath)
.getReadOnlyMemChunkFromMemTable(context, this, modsToMemtable, ttlLowerBound);
.getReadOnlyMemChunkFromMemTable(
context, this, modsToMemtable, ttlLowerBound, globalTimeFilter);
}

@Override
@@ -570,7 +573,7 @@ private void getMemChunkHandleFromMemTable(
buildChunkMetaDataForMemoryChunk(
measurementId,
timestamps[0],
timestamps[tvListCopy.rowCount() - 1],
timestamps[tvListCopy.count() - 1],
Collections.emptyList()));
memChunkHandleMap
.computeIfAbsent(measurementId, k -> new ArrayList<>())
@@ -669,7 +672,7 @@ private void getMemChunkHandleFromMemTable(
buildChunkMetaDataForMemoryChunk(
measurementId,
timestamps[0],
timestamps[tvListCopy.rowCount() - 1],
timestamps[tvListCopy.count() - 1],
Collections.emptyList()));
memChunkHandleMap
.computeIfAbsent(measurementId, k -> new ArrayList<>())
@@ -759,7 +762,7 @@ private IChunkMetadata buildChunkMetaDataForMemoryChunk(
}

private long[] filterDeletedTimestamp(TVList tvList, List<TimeRange> deletionList) {
if (deletionList.isEmpty()) {
if (tvList.getBitMap() == null && deletionList.isEmpty()) {
long[] timestamps = tvList.getTimestamps().stream().flatMapToLong(LongStream::of).toArray();
return Arrays.copyOfRange(timestamps, 0, tvList.rowCount());
}
@@ -771,7 +774,8 @@ private long[] filterDeletedTimestamp(TVList tvList, List<TimeRange> deletionList) {

for (int i = 0; i < rowCount; i++) {
long curTime = tvList.getTime(i);
if (!ModificationUtils.isPointDeleted(curTime, deletionList, deletionCursor)
if (!tvList.isNullValue(i)
&& !ModificationUtils.isPointDeleted(curTime, deletionList, deletionCursor)
&& (i == rowCount - 1 || curTime != lastTime)) {
result.add(curTime);
}
@@ -241,7 +241,7 @@ private Pair<Object[], BitMap[]> checkAndReorderColumnValuesInInsertPlan(
}

@Override
public TVList getTVList() {
public TVList getWorkingTVList() {
return list;
}

@@ -129,7 +129,7 @@ public long deleteTime(ModEntry modEntry) {

@Override
public long getCurrentTVListSize(String measurement) {
return memChunk.getTVList().rowCount();
return memChunk.getWorkingTVList().rowCount();
}

@Override
@@ -33,6 +33,7 @@

import org.apache.tsfile.file.metadata.IChunkMetadata;
import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.read.filter.basic.Filter;
import org.apache.tsfile.utils.Pair;
import org.apache.tsfile.write.schema.IMeasurementSchema;

@@ -117,7 +118,8 @@ ReadOnlyMemChunk query(
QueryContext context,
IFullPath fullPath,
long ttlLowerBound,
List<Pair<ModEntry, IMemTable>> modsToMemtabled)
List<Pair<ModEntry, IMemTable>> modsToMemtabled,
Filter globalTimeFilter)
throws IOException, QueryProcessException, MetadataException;

void queryForSeriesRegionScan(
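With the interface change above, every IMemTable.query caller now passes the query's global time filter as well; a hedged example of the updated call, where the surrounding variable names are assumptions rather than code from this diff:

// Illustrative only: the local variables are placeholders for whatever the caller holds.
ReadOnlyMemChunk memChunk =
    memTable.query(context, fullPath, ttlLowerBound, modsToMemtable, globalTimeFilter);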