Skip to content

Commit

Permalink
[To dev/1.3] Add the used memory calculation for DescPriorityMergeRea…
Browse files Browse the repository at this point in the history
…der which is missed before (#14549)
  • Loading branch information
Beyyes authored Dec 26, 2024
1 parent 719931b commit e579738
Show file tree
Hide file tree
Showing 7 changed files with 177 additions and 212 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,13 @@

package org.apache.iotdb.db.storageengine.dataregion.read.reader.common;

import org.apache.tsfile.read.reader.IPointReader;

import java.io.IOException;
import java.util.PriorityQueue;

public class DescPriorityMergeReader extends PriorityMergeReader {

public DescPriorityMergeReader() {
super.heap =
currentReadStopTime = Long.MAX_VALUE;
heap =
new PriorityQueue<>(
(o1, o2) -> {
int timeCompare =
Expand All @@ -37,13 +35,7 @@ public DescPriorityMergeReader() {
}

@Override
public void addReader(IPointReader reader, MergeReaderPriority priority, long endTime)
throws IOException {
if (reader.hasNextTimeValuePair()) {
heap.add(new Element(reader, reader.nextTimeValuePair(), priority));
super.currentReadStopTime = Math.min(currentReadStopTime, endTime);
} else {
reader.close();
}
protected void updateCurrentReadStopTime(long endTime) {
currentReadStopTime = Math.min(currentReadStopTime, endTime);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

package org.apache.iotdb.db.storageengine.dataregion.read.reader.common;

import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.queryengine.plan.planner.memory.MemoryReservationManager;

import org.apache.tsfile.read.TimeValuePair;
Expand All @@ -34,7 +33,7 @@ public class PriorityMergeReader implements IPointReader {

// max time of all added readers in PriorityMergeReader
// or min time of all added readers in DescPriorityMergeReader
protected long currentReadStopTime;
protected long currentReadStopTime = Long.MIN_VALUE;

protected PriorityQueue<Element> heap;

Expand All @@ -57,25 +56,12 @@ public void setMemoryReservationManager(MemoryReservationManager memoryReservati
this.memoryReservationManager = memoryReservationManager;
}

@TestOnly
public void addReader(IPointReader reader, long priority) throws IOException {
if (reader.hasNextTimeValuePair()) {
heap.add(
new Element(
reader,
reader.nextTimeValuePair(),
new MergeReaderPriority(Long.MAX_VALUE, priority, 0, false)));
} else {
reader.close();
}
}

public void addReader(IPointReader reader, MergeReaderPriority priority, long endTime)
throws IOException {
if (reader.hasNextTimeValuePair()) {
Element element = new Element(reader, reader.nextTimeValuePair(), priority);
heap.add(element);
currentReadStopTime = Math.max(currentReadStopTime, endTime);
updateCurrentReadStopTime(endTime);
long size = element.getReader().getUsedMemorySize();
usedMemorySize += size;
if (memoryReservationManager != null) {
Expand All @@ -86,6 +72,10 @@ public void addReader(IPointReader reader, MergeReaderPriority priority, long en
}
}

protected void updateCurrentReadStopTime(long endTime) {
currentReadStopTime = Math.max(currentReadStopTime, endTime);
}

public long getCurrentReadStopTime() {
return currentReadStopTime;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,31 +25,31 @@

import java.io.IOException;

public class FakedSeriesReader implements IPointReader {
public class AscFakedSeriesReader implements IPointReader {

private int index;
private int size;
private boolean initWithTimeList;
private static final TSDataType DATA_TYPE = TSDataType.INT64;
protected int index;
protected int size;
protected boolean initWithTimeList;
protected static final TSDataType DATA_TYPE = TSDataType.INT64;

// init with time list and value
private long[] timestamps;
private long value;
protected long[] timestamps;
protected long value;

// init with startTime, size, interval and modValue
private long startTime;
private int interval;
private int modValue;
protected long startTime;
protected int interval;
protected int modValue;

public FakedSeriesReader(long[] timestamps, long value) {
public AscFakedSeriesReader(long[] timestamps, long value) {
this.initWithTimeList = true;
this.index = 0;
this.size = timestamps.length;
this.timestamps = timestamps;
this.value = value;
}

public FakedSeriesReader(long startTime, int size, int interval, int modValue) {
public AscFakedSeriesReader(long startTime, int size, int interval, int modValue) {
this.initWithTimeList = false;
this.index = 0;
this.size = size;
Expand Down Expand Up @@ -82,8 +82,8 @@ public TimeValuePair currentTimeValuePair() throws IOException {

@Override
public long getUsedMemorySize() {
// not used
return 0;
// use size of timestamps to mock the used memory
return size;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.storageengine.dataregion.read.reader.common;

import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.read.TimeValuePair;
import org.apache.tsfile.utils.TsPrimitiveType;

public class DescFakedSeriesReader extends AscFakedSeriesReader {

public DescFakedSeriesReader(long[] timestamps, long value) {
super(timestamps, value);
index = size - 1;
}

public DescFakedSeriesReader(long startTime, int size, int interval, int modValue) {
super(startTime, size, interval, modValue);
index = size - 1;
startTime = startTime + interval * size;
}

@Override
public boolean hasNextTimeValuePair() {
return index >= 0;
}

@Override
public TimeValuePair nextTimeValuePair() {
if (initWithTimeList) {
return new TimeValuePair(timestamps[index--], TsPrimitiveType.getByType(DATA_TYPE, value));
} else {
long time = startTime;
startTime -= interval;
index--;
return new TimeValuePair(time, TsPrimitiveType.getByType(TSDataType.INT64, time % modValue));
}
}
}
Loading

0 comments on commit e579738

Please sign in to comment.