delay sort & statistic generation to query execution
shizy818 committed Dec 13, 2024
1 parent 45db409 commit 9411fea
Showing 4 changed files with 97 additions and 88 deletions.
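
In essence, the commit stops building sorted TVLists and chunk/page statistics eagerly in the ReadOnlyMemChunk constructor and defers that work until a query actually reads the series, so ingestion no longer pays for reads that may never happen. A minimal sketch of the deferred-work pattern, with illustrative names rather than IoTDB's actual classes:

    // Hypothetical sketch of the pattern this commit adopts:
    // writes stay cheap; the first query pays for sort + statistics once.
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;

    class LazySeries {
      private final List<Long> times = new ArrayList<>();
      private long[] minMaxStats; // built on first query, then reused

      void append(long t) {
        times.add(t); // no sorting on the write path
      }

      long[] statistics() {
        if (minMaxStats == null) {
          Collections.sort(times); // delayed sort (was constructor-time work)
          minMaxStats = new long[] {times.get(0), times.get(times.size() - 1)};
        }
        return minMaxStats;
      }
    }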
@@ -362,6 +362,8 @@ public ITimeSeriesMetadata generateTimeSeriesMetadata(
 
     for (ReadOnlyMemChunk memChunk : readOnlyMemChunk) {
       if (!memChunk.isEmpty()) {
+        memChunk.sortTvLists();
+        memChunk.initChunkMetaFromTvLists();
         seriesStatistics.mergeStatistics(memChunk.getChunkMetaData().getStatistics());
       }
     }
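
These two calls are the heart of the change: sorting and metadata construction now happen here, on the query path, right before the per-chunk statistics are merged. The implied contract is that sortTvLists() runs before initChunkMetaFromTvLists(), since the statistics are derived from the sorted lists. A hedged sketch making that contract explicit (a hypothetical helper, not IoTDB code; types are the ones visible in this diff):

    // Illustrative helper: getChunkMetaData() is only meaningful
    // after both deferred steps have run, in this order.
    static IChunkMetadata materialize(ReadOnlyMemChunk chunk) {
      chunk.sortTvLists();              // step 1: sort the backing TVLists
      chunk.initChunkMetaFromTvLists(); // step 2: build statistics, cache chunk metadata
      return chunk.getChunkMetaData();  // safe to read now
    }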
@@ -125,24 +125,9 @@ public ReadOnlyMemChunk(
     this.pageStatisticsList = new ArrayList<>();
     this.pageOffsetsList = new ArrayList<>();
     this.context.addTVListToSet(tvListQueryMap);
-
-    initChunkAndPageStatistics();
   }
 
-  private void initChunkAndPageStatistics() {
-    // create chunk metadata
-    Statistics chunkStatistics = Statistics.getStatsByType(dataType);
-    IChunkMetadata metaData =
-        new ChunkMetadata(measurementUid, dataType, null, null, 0, chunkStatistics);
-    metaData.setChunkLoader(new MemChunkLoader(context, this));
-    metaData.setVersion(Long.MAX_VALUE);
-    cachedMetaData = metaData;
-
-    sortTvLists();
-    updateChunkAndPageStatisticsFromTvLists();
-  }
-
-  private void sortTvLists() {
+  public void sortTvLists() {
     for (Map.Entry<TVList, Integer> entry : getTvListQueryMap().entrySet()) {
       TVList tvList = entry.getKey();
       int queryRowCount = entry.getValue();
@@ -152,9 +137,9 @@ private void sortTvLists() {
     }
   }
 
-  private void updateChunkAndPageStatisticsFromTvLists() {
-    Statistics chunkStatistics = cachedMetaData.getStatistics();
-
+  public void initChunkMetaFromTvLists() {
+    // create chunk statistics
+    Statistics chunkStatistics = Statistics.getStatsByType(dataType);
     int cnt = 0;
     int[] deleteCursor = {0};
     List<TVList> tvLists = new ArrayList<>(tvListQueryMap.keySet());
@@ -209,6 +194,13 @@ private void updateChunkAndPageStatisticsFromTvLists() {
     }
     pageOffsetsList.add(Arrays.copyOf(tvListOffsets, tvListOffsets.length));
     chunkStatistics.setEmpty(cnt == 0);
+
+    // chunk meta
+    IChunkMetadata metaData =
+        new ChunkMetadata(measurementUid, dataType, null, null, 0, chunkStatistics);
+    metaData.setChunkLoader(new MemChunkLoader(context, this));
+    metaData.setVersion(Long.MAX_VALUE);
+    cachedMetaData = metaData;
   }
 
   public TSDataType getDataType() {
@@ -99,31 +99,33 @@ protected void handoverTvList() {
   @Override
   public boolean writeWithFlushCheck(long insertTime, Object objectValue) {
     boolean shouldFlush;
-    switch (schema.getType()) {
-      case BOOLEAN:
-        shouldFlush = putBooleanWithFlushCheck(insertTime, (boolean) objectValue);
-        break;
-      case INT32:
-      case DATE:
-        shouldFlush = putIntWithFlushCheck(insertTime, (int) objectValue);
-        break;
-      case INT64:
-      case TIMESTAMP:
-        shouldFlush = putLongWithFlushCheck(insertTime, (long) objectValue);
-        break;
-      case FLOAT:
-        shouldFlush = putFloatWithFlushCheck(insertTime, (float) objectValue);
-        break;
-      case DOUBLE:
-        shouldFlush = putDoubleWithFlushCheck(insertTime, (double) objectValue);
-        break;
-      case TEXT:
-      case BLOB:
-      case STRING:
-        shouldFlush = putBinaryWithFlushCheck(insertTime, (Binary) objectValue);
-        break;
-      default:
-        throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + schema.getType().name());
-    }
+    synchronized (list) {
+      switch (schema.getType()) {
+        case BOOLEAN:
+          shouldFlush = putBooleanWithFlushCheck(insertTime, (boolean) objectValue);
+          break;
+        case INT32:
+        case DATE:
+          shouldFlush = putIntWithFlushCheck(insertTime, (int) objectValue);
+          break;
+        case INT64:
+        case TIMESTAMP:
+          shouldFlush = putLongWithFlushCheck(insertTime, (long) objectValue);
+          break;
+        case FLOAT:
+          shouldFlush = putFloatWithFlushCheck(insertTime, (float) objectValue);
+          break;
+        case DOUBLE:
+          shouldFlush = putDoubleWithFlushCheck(insertTime, (double) objectValue);
+          break;
+        case TEXT:
+        case BLOB:
+        case STRING:
+          shouldFlush = putBinaryWithFlushCheck(insertTime, (Binary) objectValue);
+          break;
+        default:
+          throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + schema.getType().name());
+      }
+    }
     if (shouldFlush) {
       return true;
@@ -145,36 +147,38 @@ public boolean writeAlignedValueWithFlushCheck(
   public boolean writeWithFlushCheck(
       long[] times, Object valueList, BitMap bitMap, TSDataType dataType, int start, int end) {
     boolean shouldFlush;
-    switch (dataType) {
-      case BOOLEAN:
-        boolean[] boolValues = (boolean[]) valueList;
-        shouldFlush = putBooleansWithFlushCheck(times, boolValues, bitMap, start, end);
-        break;
-      case INT32:
-      case DATE:
-        int[] intValues = (int[]) valueList;
-        shouldFlush = putIntsWithFlushCheck(times, intValues, bitMap, start, end);
-        break;
-      case INT64:
-      case TIMESTAMP:
-        long[] longValues = (long[]) valueList;
-        return putLongsWithFlushCheck(times, longValues, bitMap, start, end);
-      case FLOAT:
-        float[] floatValues = (float[]) valueList;
-        shouldFlush = putFloatsWithFlushCheck(times, floatValues, bitMap, start, end);
-        break;
-      case DOUBLE:
-        double[] doubleValues = (double[]) valueList;
-        shouldFlush = putDoublesWithFlushCheck(times, doubleValues, bitMap, start, end);
-        break;
-      case TEXT:
-      case BLOB:
-      case STRING:
-        Binary[] binaryValues = (Binary[]) valueList;
-        shouldFlush = putBinariesWithFlushCheck(times, binaryValues, bitMap, start, end);
-        break;
-      default:
-        throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + dataType.name());
-    }
+    synchronized (list) {
+      switch (dataType) {
+        case BOOLEAN:
+          boolean[] boolValues = (boolean[]) valueList;
+          shouldFlush = putBooleansWithFlushCheck(times, boolValues, bitMap, start, end);
+          break;
+        case INT32:
+        case DATE:
+          int[] intValues = (int[]) valueList;
+          shouldFlush = putIntsWithFlushCheck(times, intValues, bitMap, start, end);
+          break;
+        case INT64:
+        case TIMESTAMP:
+          long[] longValues = (long[]) valueList;
+          return putLongsWithFlushCheck(times, longValues, bitMap, start, end);
+        case FLOAT:
+          float[] floatValues = (float[]) valueList;
+          shouldFlush = putFloatsWithFlushCheck(times, floatValues, bitMap, start, end);
+          break;
+        case DOUBLE:
+          double[] doubleValues = (double[]) valueList;
+          shouldFlush = putDoublesWithFlushCheck(times, doubleValues, bitMap, start, end);
+          break;
+        case TEXT:
+        case BLOB:
+        case STRING:
+          Binary[] binaryValues = (Binary[]) valueList;
+          shouldFlush = putBinariesWithFlushCheck(times, binaryValues, bitMap, start, end);
+          break;
+        default:
+          throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + dataType.name());
+      }
+    }
     if (shouldFlush) {
       return true;
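
Both write paths now take the monitor of the backing TVList (`list`) around the type dispatch. The likely motivation, which is my reading rather than anything stated in the commit, is that sorting now happens on the query path: a query thread may sort the very TVList an ingestion thread is appending to, so both sides must serialize on the same lock. A toy illustration of the race being guarded against (hypothetical types, not IoTDB code):

    // Toy model of the writer/reader race: without the locks, sort() could run
    // while add() is mid-append, yielding torn or inconsistent results.
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;

    class GuardedSeries {
      private final List<Long> list = new ArrayList<>();

      void write(long t) {
        synchronized (list) { // writer side, as in this diff
          list.add(t);
        }
      }

      long[] querySnapshot() {
        synchronized (list) { // query side: sort safely against concurrent writers
          if (list.isEmpty()) {
            return new long[0];
          }
          Collections.sort(list);
          return new long[] {list.get(0), list.get(list.size() - 1)};
        }
      }
    }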
@@ -70,6 +70,7 @@
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.UncheckedIOException;
 import java.nio.ByteBuffer;
 import java.nio.file.Files;
 import java.util.ArrayList;
@@ -83,6 +84,7 @@
 import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
@@ -181,7 +183,8 @@ public class TsFileResource implements PersistentResource {
   private Map<IFullPath, List<ReadOnlyMemChunk>> pathToReadOnlyMemChunkMap = new HashMap<>();
 
   /** used for unsealed file to get TimeseriesMetadata */
-  private Map<IFullPath, ITimeSeriesMetadata> pathToTimeSeriesMetadataMap = new HashMap<>();
+  private Map<IFullPath, ITimeSeriesMetadata> pathToTimeSeriesMetadataMap =
+      new ConcurrentHashMap<>();
 
   /**
    * If it is not null, it indicates that the current tsfile resource is a snapshot of the
@@ -242,7 +245,6 @@ public TsFileResource(
     this.timeIndex = originTsFileResource.timeIndex;
     this.pathToReadOnlyMemChunkMap = pathToReadOnlyMemChunkMap;
     this.pathToChunkMetadataListMap = pathToChunkMetadataListMap;
-    generatePathToTimeSeriesMetadataMap();
     this.originTsFileResource = originTsFileResource;
     this.tsFileID = originTsFileResource.tsFileID;
     this.isSeq = originTsFileResource.isSeq;
@@ -996,8 +998,27 @@ public void setProcessor(TsFileProcessor processor) {
    *
    * @return TimeseriesMetadata or the first ValueTimeseriesMetadata in VectorTimeseriesMetadata
    */
-  public ITimeSeriesMetadata getTimeSeriesMetadata(IFullPath seriesPath) {
-    return pathToTimeSeriesMetadataMap.get(seriesPath);
+  public ITimeSeriesMetadata getTimeSeriesMetadata(IFullPath seriesPath) throws IOException {
+    try {
+      return pathToTimeSeriesMetadataMap.computeIfAbsent(
+          seriesPath,
+          k -> {
+            if (pathToChunkMetadataListMap.containsKey(k)) {
+              try {
+                return ResourceByPathUtils.getResourceInstance(seriesPath)
+                    .generateTimeSeriesMetadata(
+                        pathToReadOnlyMemChunkMap.get(seriesPath),
+                        pathToChunkMetadataListMap.get(seriesPath));
+              } catch (IOException e) {
+                throw new UncheckedIOException(e);
+              }
+            } else {
+              return null;
+            }
+          });
+    } catch (UncheckedIOException e) {
+      throw e.getCause();
+    }
   }
 
   public DataRegion.SettleTsFileCallBack getSettleTsFileCallBack() {
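
Metadata is now materialized per series on first access. ConcurrentHashMap.computeIfAbsent is atomic per key, so concurrent queries for the same path compute the entry once; a mapping function that returns null, as in the unknown-path branch above, leaves the map unchanged. Because the mapping function cannot throw a checked IOException, the code tunnels it out as UncheckedIOException and unwraps it at the boundary, a standard Java idiom. A generic sketch of that idiom with illustrative names:

    // Generic form of the lazy-compute + exception-tunneling idiom used above.
    import java.io.IOException;
    import java.io.UncheckedIOException;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;

    class LazyIoCache<K, V> {
      interface Loader<K, V> {
        V load(K key) throws IOException;
      }

      private final ConcurrentMap<K, V> map = new ConcurrentHashMap<>();

      V get(K key, Loader<K, V> loader) throws IOException {
        try {
          return map.computeIfAbsent(
              key,
              k -> {
                try {
                  return loader.load(k); // checked exceptions are not allowed here...
                } catch (IOException e) {
                  throw new UncheckedIOException(e); // ...so wrap,
                }
              });
        } catch (UncheckedIOException e) {
          throw e.getCause(); // ...and unwrap at the method boundary
        }
      }
    }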
@@ -1272,16 +1293,6 @@ public long degradeTimeIndex() {
     return beforeRamSize - ramSize;
   }
 
-  private void generatePathToTimeSeriesMetadataMap() throws IOException {
-    for (IFullPath path : pathToChunkMetadataListMap.keySet()) {
-      pathToTimeSeriesMetadataMap.put(
-          path,
-          ResourceByPathUtils.getResourceInstance(path)
-              .generateTimeSeriesMetadata(
-                  pathToReadOnlyMemChunkMap.get(path), pathToChunkMetadataListMap.get(path)));
-    }
-  }
-
   public void deleteRemovedDeviceAndUpdateEndTime(Map<IDeviceID, Long> lastTimeForEachDevice) {
     ITimeIndex newTimeIndex = CONFIG.getTimeIndexLevel().getTimeIndex();
     for (Map.Entry<IDeviceID, Long> entry : lastTimeForEachDevice.entrySet()) {