Skip to content

Commit

Permalink
WAL buffer queue memory control (#14329)
Browse files Browse the repository at this point in the history
* init

* dev more

* remove comment

* fix UT

* fix concurrent issue

* add metric

* add config template

* Fix review

* Fix review

* fix name
  • Loading branch information
HTHou authored Dec 11, 2024
1 parent 05ea473 commit f2a5854
Show file tree
Hide file tree
Showing 17 changed files with 248 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,9 @@ public class IoTDBConfig {
/** The proportion of write memory for compaction */
private double compactionProportion = 0.2;

/** The proportion of memtable memory for WAL queue */
private double walBufferQueueProportion = 0.1;

/** The proportion of memtable memory for device path cache */
private double devicePathCacheProportion = 0.05;

Expand Down Expand Up @@ -222,8 +225,8 @@ public class IoTDBConfig {
/** max total direct buffer off heap memory size proportion */
private double maxDirectBufferOffHeapMemorySizeProportion = 0.8;

/** Blocking queue capacity of each wal buffer */
private int walBufferQueueCapacity = 500;
/** Blocking queue capacity of each page cache deletion buffer */
private int pageCacheDeletionBufferQueueCapacity = 500;

/** Size threshold of each wal file. Unit: byte */
private volatile long walFileSizeThresholdInByte = 30 * 1024 * 1024L;
Expand Down Expand Up @@ -1984,12 +1987,12 @@ public void setMaxDirectBufferOffHeapMemorySizeProportion(
this.maxDirectBufferOffHeapMemorySizeProportion = maxDirectBufferOffHeapMemorySizeProportion;
}

public int getWalBufferQueueCapacity() {
return walBufferQueueCapacity;
public int getPageCacheDeletionBufferQueueCapacity() {
return pageCacheDeletionBufferQueueCapacity;
}

void setWalBufferQueueCapacity(int walBufferQueueCapacity) {
this.walBufferQueueCapacity = walBufferQueueCapacity;
void setPageCacheDeletionBufferQueueCapacity(int pageCacheDeletionBufferQueueCapacity) {
this.pageCacheDeletionBufferQueueCapacity = pageCacheDeletionBufferQueueCapacity;
}

public long getWalFileSizeThresholdInByte() {
Expand Down Expand Up @@ -3571,6 +3574,14 @@ public double getCompactionProportion() {
return compactionProportion;
}

/**
 * Returns the configured proportion of memtable memory reserved for the WAL queue.
 *
 * @return the current value of {@code walBufferQueueProportion}
 */
public double getWalBufferQueueProportion() {
return walBufferQueueProportion;
}

/**
 * Sets the proportion of memtable memory reserved for the WAL queue.
 *
 * <p>The value is stored as-is; this setter performs no range validation itself.
 *
 * @param walBufferQueueProportion the new proportion to store
 */
public void setWalBufferQueueProportion(double walBufferQueueProportion) {
this.walBufferQueueProportion = walBufferQueueProportion;
}

/**
 * Returns the configured proportion of memtable memory reserved for the device path cache.
 *
 * @return the current value of {@code devicePathCacheProportion}
 */
public double getDevicePathCacheProportion() {
return devicePathCacheProportion;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,15 @@ public void loadProperties(TrimProperties properties) throws BadNodeUrlException
.map(String::trim)
.orElse(Double.toString(conf.getRejectProportion())));

final double walBufferQueueProportion =
Double.parseDouble(
Optional.ofNullable(
properties.getProperty(
"wal_buffer_queue_proportion",
Double.toString(conf.getWalBufferQueueProportion())))
.map(String::trim)
.orElse(Double.toString(conf.getWalBufferQueueProportion())));

final double devicePathCacheProportion =
Double.parseDouble(
Optional.ofNullable(
Expand All @@ -343,11 +352,12 @@ public void loadProperties(TrimProperties properties) throws BadNodeUrlException
.map(String::trim)
.orElse(Double.toString(conf.getDevicePathCacheProportion())));

if (rejectProportion + devicePathCacheProportion >= 1) {
if (rejectProportion + walBufferQueueProportion + devicePathCacheProportion >= 1) {
LOGGER.warn(
"The sum of write_memory_proportion and device_path_cache_proportion is too large, use default values 0.8 and 0.05.");
"The sum of reject_proportion, wal_buffer_queue_proportion and device_path_cache_proportion is too large, use default values 0.8, 0.1 and 0.05.");
} else {
conf.setRejectProportion(rejectProportion);
conf.setWalBufferQueueProportion(walBufferQueueProportion);
conf.setDevicePathCacheProportion(devicePathCacheProportion);
}

Expand Down Expand Up @@ -1687,16 +1697,16 @@ private void loadWALProps(TrimProperties properties) throws IOException {
conf.setWalBufferSize(walBufferSize);
}

int walBufferQueueCapacity =
int pageCacheDeletionBufferQueueCapacity =
Integer.parseInt(
Optional.ofNullable(
properties.getProperty(
"wal_buffer_queue_capacity",
Integer.toString(conf.getWalBufferQueueCapacity())))
"page_cache_deletion_buffer_queue_capacity",
Integer.toString(conf.getPageCacheDeletionBufferQueueCapacity())))
.map(String::trim)
.orElse(Integer.toString(conf.getWalBufferQueueCapacity())));
if (walBufferQueueCapacity > 0) {
conf.setWalBufferQueueCapacity(walBufferQueueCapacity);
.orElse(Integer.toString(conf.getPageCacheDeletionBufferQueueCapacity())));
if (pageCacheDeletionBufferQueueCapacity > 0) {
conf.setPageCacheDeletionBufferQueueCapacity(pageCacheDeletionBufferQueueCapacity);
}

boolean WALInsertNodeCacheShrinkClearEnabled =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public class PageCacheDeletionBuffer implements DeletionBuffer {
// Buffer config keep consistent with WAL.
private static final int ONE_THIRD_WAL_BUFFER_SIZE = config.getWalBufferSize() / 3;
private static final double FSYNC_BUFFER_RATIO = 0.95;
private static final int QUEUE_CAPACITY = config.getWalBufferQueueCapacity();
private static final int QUEUE_CAPACITY = config.getPageCacheDeletionBufferQueueCapacity();
private static final long MAX_WAIT_CLOSE_TIME_IN_MS = 10000;

// DeletionResources received from storage engine, which is waiting to be persisted.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.pipe.resource.memory.InsertNodeMemoryEstimator;
import org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeDevicePathCache;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
Expand Down Expand Up @@ -85,6 +86,8 @@ public abstract class InsertNode extends SearchNode {

protected ProgressIndex progressIndex;

protected long memorySize;

private static final DeviceIDFactory deviceIDFactory = DeviceIDFactory.getInstance();

protected InsertNode(PlanNodeId id) {
Expand Down Expand Up @@ -418,4 +421,11 @@ protected PartialPath readTargetPath(DataInputStream stream)
return DataNodeDevicePathCache.getInstance()
.getPartialPath(ReadWriteIOUtils.readString(stream));
}

/**
 * Returns the estimated memory size of this insert node, computing it on first use.
 *
 * <p>The estimate comes from {@link InsertNodeMemoryEstimator#sizeOf} and is cached in {@code
 * memorySize}. A cached value of {@code 0} is treated as "not yet computed", so an estimate of
 * zero is recomputed on every call.
 *
 * @return the cached or freshly computed estimate
 */
public long getMemorySize() {
  // Fast path: a non-zero value means the estimate has already been computed.
  if (memorySize != 0) {
    return memorySize;
  }
  memorySize = InsertNodeMemoryEstimator.sizeOf(this);
  return memorySize;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.iotdb.db.storageengine.dataregion.flush.FlushManager;
import org.apache.iotdb.db.storageengine.dataregion.wal.WALManager;
import org.apache.iotdb.db.storageengine.dataregion.wal.checkpoint.CheckpointType;
import org.apache.iotdb.db.storageengine.rescon.memory.SystemInfo;
import org.apache.iotdb.metrics.AbstractMetricService;
import org.apache.iotdb.metrics.impl.DoNothingMetricManager;
import org.apache.iotdb.metrics.metricsets.IMetricSet;
Expand Down Expand Up @@ -182,6 +183,8 @@ private void unbindFlushSubTaskMetrics(AbstractMetricService metricService) {
public static final String READ_WAL_BUFFER_COST_NS = "read_wal_buffer_cost";
public static final String WRITE_WAL_BUFFER_COST_NS = "write_wal_buffer_cost";
public static final String ENTRIES_COUNT = "entries_count";
public static final String WAL_QUEUE_CURRENT_MEM_COST = "wal_queue_current_mem_cost";
public static final String WAL_QUEUE_MAX_MEM_COST = "wal_queue_max_mem_cost";

private Histogram usedRatioHistogram = DoNothingMetricManager.DO_NOTHING_HISTOGRAM;
private Histogram entriesCountHistogram = DoNothingMetricManager.DO_NOTHING_HISTOGRAM;
Expand All @@ -192,6 +195,7 @@ private void unbindFlushSubTaskMetrics(AbstractMetricService metricService) {
private Histogram readWALBufferSizeHistogram = DoNothingMetricManager.DO_NOTHING_HISTOGRAM;
private Histogram readWALBufferCostHistogram = DoNothingMetricManager.DO_NOTHING_HISTOGRAM;
private Histogram writeWALBufferCostHistogram = DoNothingMetricManager.DO_NOTHING_HISTOGRAM;
private Gauge walQueueMaxMemSizeGauge = DoNothingMetricManager.DO_NOTHING_GAUGE;

private void bindWALMetrics(AbstractMetricService metricService) {
metricService.createAutoGauge(
Expand Down Expand Up @@ -253,6 +257,20 @@ private void bindWALMetrics(AbstractMetricService metricService) {
MetricLevel.IMPORTANT,
Tag.NAME.toString(),
WRITE_WAL_BUFFER_COST_NS);
SystemInfo systemInfo = SystemInfo.getInstance();
metricService.createAutoGauge(
Metric.WAL_QUEUE_MEM_COST.toString(),
MetricLevel.IMPORTANT,
systemInfo,
SystemInfo::getCurrentWalQueueMemoryCost,
Tag.NAME.toString(),
WAL_QUEUE_CURRENT_MEM_COST);
walQueueMaxMemSizeGauge =
metricService.getOrCreateGauge(
Metric.WAL_QUEUE_MEM_COST.toString(),
MetricLevel.IMPORTANT,
Tag.NAME.toString(),
WAL_QUEUE_MAX_MEM_COST);
}

private void unbindWALMetrics(AbstractMetricService metricService) {
Expand All @@ -274,6 +292,16 @@ private void unbindWALMetrics(AbstractMetricService metricService) {
name ->
metricService.remove(
MetricType.HISTOGRAM, Metric.WAL_BUFFER.toString(), Tag.NAME.toString(), name));
metricService.remove(
MetricType.AUTO_GAUGE,
Metric.WAL_QUEUE_MEM_COST.toString(),
Tag.NAME.toString(),
WAL_QUEUE_CURRENT_MEM_COST);
metricService.remove(
MetricType.GAUGE,
Metric.WAL_QUEUE_MEM_COST.toString(),
Tag.NAME.toString(),
WAL_QUEUE_MAX_MEM_COST);
}

// endregion
Expand Down Expand Up @@ -909,6 +937,10 @@ public void recordWALBufferEntriesCount(long count) {
entriesCountHistogram.update(count);
}

/**
 * Publishes the given value to the WAL queue max-memory gauge ({@code walQueueMaxMemSizeGauge}).
 *
 * @param size the value to set on the gauge (presumably bytes; confirm against the caller)
 */
public void recordWALQueueMaxMemorySize(long size) {
walQueueMaxMemSizeGauge.set(size);
}

/**
 * Publishes the flush threshold to {@code flushThreholdGauge}.
 *
 * @param flushThreshold the threshold to record; it is cast to {@code long}, so any fractional
 *     part is dropped
 */
public void recordFlushThreshold(double flushThreshold) {
flushThreholdGauge.set((long) flushThreshold);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,12 +151,12 @@ public long getTotalMemTableSize() {

@Override
public double getFlushThershold() {
return SystemInfo.getInstance().getFlushThershold();
return SystemInfo.getInstance().getFlushThreshold();
}

@Override
public double getRejectThershold() {
return SystemInfo.getInstance().getRejectThershold();
return SystemInfo.getInstance().getRejectThreshold();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -903,7 +903,6 @@ public int serializedSize() {
/** Notice: this method is concurrent unsafe. */
@Override
public void serializeToWAL(IWALByteBufferView buffer) {
// TODO:[WAL]
WALWriteUtils.write(isSignalMemTable(), buffer);
if (isSignalMemTable()) {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.iotdb.db.storageengine.dataregion.wal.exception.BrokenWALFileException;
import org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALNodeClosedException;
import org.apache.iotdb.db.storageengine.dataregion.wal.io.WALMetaData;
import org.apache.iotdb.db.storageengine.dataregion.wal.utils.MemoryControlledWALEntryQueue;
import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALFileStatus;
import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALFileUtils;
import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALMode;
Expand All @@ -57,8 +58,6 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
Expand All @@ -77,15 +76,14 @@ public class WALBuffer extends AbstractWALBuffer {
private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
public static final int ONE_THIRD_WAL_BUFFER_SIZE = config.getWalBufferSize() / 3;
private static final double FSYNC_BUFFER_RATIO = 0.95;
private static final int QUEUE_CAPACITY = config.getWalBufferQueueCapacity();
private static final WritingMetrics WRITING_METRICS = WritingMetrics.getInstance();

// whether close method is called
private volatile boolean isClosed = false;
// manage checkpoints
private final CheckpointManager checkpointManager;
// WALEntries
private final BlockingQueue<WALEntry> walEntries = new ArrayBlockingQueue<>(QUEUE_CAPACITY);
private final MemoryControlledWALEntryQueue walEntries = new MemoryControlledWALEntryQueue();
// lock to provide synchronization for double buffers mechanism, protecting buffers status
private final Lock buffersLock = new ReentrantLock();
// condition to guarantee correctness of switching buffers
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,4 +196,6 @@ public WALFlushListener getWalFlushListener() {
}

/**
 * Tells whether this entry is a signal entry.
 *
 * @return {@code true} for signal entries, {@code false} otherwise
 */
public abstract boolean isSignal();

/**
 * Returns the estimated amount of memory attributed to this entry.
 *
 * @return the estimated memory size (presumably in bytes; confirm against implementations)
 */
public abstract long getMemorySize();
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,13 @@

import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
import org.apache.iotdb.db.storageengine.dataregion.memtable.IMemTable;
import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALMode;

import org.apache.tsfile.utils.RamUsageEstimator;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -102,6 +106,14 @@ public TabletInfo(List<int[]> tabletRangeList) {
this.tabletRangeList = new ArrayList<>(tabletRangeList);
}

/**
 * Sums the number of rows covered by all ranges in {@code tabletRangeList}.
 *
 * <p>Each range is read as a two-element array and contributes {@code range[1] - range[0]} rows.
 *
 * @return the total row count over all ranges, or {@code 0} when the list is empty
 */
public int getRangeRowCount() {
  return tabletRangeList.stream().mapToInt(range -> range[1] - range[0]).sum();
}

@Override
public int hashCode() {
return Objects.hash(tabletRangeList);
Expand Down Expand Up @@ -137,6 +149,28 @@ public boolean isSignal() {
return false;
}

/**
 * Estimates the memory attributed to this WAL entry, depending on the entry type.
 *
 * <ul>
 *   <li>{@code INSERT_TABLET_NODE}: the insert node's estimated size, scaled by the share of the
 *       tablet's rows that {@code tabletInfo} covers.
 *   <li>{@code INSERT_ROW_NODE} / {@code INSERT_ROWS_NODE}: the insert node's estimated size.
 *   <li>{@code MEMORY_TABLE_SNAPSHOT}: the memtable's {@code getTVListsRamCost()}.
 *   <li>Delete nodes, the search-index separator node and memtable checkpoints: {@code
 *       RamUsageEstimator.sizeOfObject(value)}.
 * </ul>
 *
 * @return the estimated memory size of this entry
 * @throws RuntimeException if the entry type is not one of the types listed above
 */
@Override
public long getMemorySize() {
  switch (type) {
    case INSERT_TABLET_NODE:
      {
        final long nodeMemorySize = ((InsertNode) value).getMemorySize();
        final int totalRowCount = ((InsertTabletNode) value).getRowCount();
        if (totalRowCount <= 0) {
          // No rows to apportion the size over. Fall back to the whole node's size instead of
          // failing with an ArithmeticException on division by zero.
          return nodeMemorySize;
        }
        // Multiply before dividing so the per-row size is not truncated by integer division
        // before being scaled to the covered rows. The arithmetic is done in long.
        return nodeMemorySize * tabletInfo.getRangeRowCount() / totalRowCount;
      }
    case INSERT_ROW_NODE:
    case INSERT_ROWS_NODE:
      return ((InsertNode) value).getMemorySize();
    case MEMORY_TABLE_SNAPSHOT:
      return ((IMemTable) value).getTVListsRamCost();
    case DELETE_DATA_NODE:
    case RELATIONAL_DELETE_DATA_NODE:
    case CONTINUOUS_SAME_SEARCH_INDEX_SEPARATOR_NODE:
    case MEMORY_TABLE_CHECKPOINT:
      return RamUsageEstimator.sizeOfObject(value);
    default:
      throw new RuntimeException("Unsupported wal entry type " + type);
  }
}

@Override
public int hashCode() {
return Objects.hash(super.hashCode(), tabletInfo);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,4 +57,9 @@ public void serialize(ByteBuffer buffer) {
public boolean isSignal() {
return true;
}

/**
 * Reports a fixed memory size of {@link Byte#BYTES} for this entry.
 *
 * @return {@code Byte.BYTES}
 */
@Override
public long getMemorySize() {
return Byte.BYTES;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -410,14 +410,14 @@ private void summarizeExecuteResult() {
.getName()),
StringUtils.join(successfullyDeleted, ","),
fileIndexAfterFilterSafelyDeleteIndex,
System.getProperty("line.separator")));
System.lineSeparator()));

if (!pinnedMemTableIds.isEmpty()) {
summary
.append("- MemTable has been flushed but pinned by PIPE, the MemTableId list is : ")
.append(StringUtils.join(pinnedMemTableIds, ","))
.append(".")
.append(System.getProperty("line.separator"));
.append(System.lineSeparator());
}
if (fileIndexAfterFilterSafelyDeleteIndex < sortedWalFilesExcludingLast.length) {
summary.append(
Expand Down
Loading

0 comments on commit f2a5854

Please sign in to comment.