diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java index 942fe993c560..47707e53170f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java @@ -178,6 +178,9 @@ public class IoTDBConfig { /** The proportion of write memory for compaction */ private double compactionProportion = 0.2; + /** The proportion of memtable memory for WAL queue */ + private double walBufferQueueProportion = 0.1; + /** The proportion of memtable memory for device path cache */ private double devicePathCacheProportion = 0.05; @@ -222,8 +225,8 @@ public class IoTDBConfig { /** max total direct buffer off heap memory size proportion */ private double maxDirectBufferOffHeapMemorySizeProportion = 0.8; - /** Blocking queue capacity of each wal buffer */ - private int walBufferQueueCapacity = 500; + /** Blocking queue capacity of each page cache deletion buffer */ + private int pageCacheDeletionBufferQueueCapacity = 500; /** Size threshold of each wal file. Unit: byte */ private volatile long walFileSizeThresholdInByte = 30 * 1024 * 1024L; @@ -1984,12 +1987,12 @@ public void setMaxDirectBufferOffHeapMemorySizeProportion( this.maxDirectBufferOffHeapMemorySizeProportion = maxDirectBufferOffHeapMemorySizeProportion; } - public int getWalBufferQueueCapacity() { - return walBufferQueueCapacity; + public int getPageCacheDeletionBufferQueueCapacity() { + return pageCacheDeletionBufferQueueCapacity; } - void setWalBufferQueueCapacity(int walBufferQueueCapacity) { - this.walBufferQueueCapacity = walBufferQueueCapacity; + void setPageCacheDeletionBufferQueueCapacity(int pageCacheDeletionBufferQueueCapacity) { + this.pageCacheDeletionBufferQueueCapacity = pageCacheDeletionBufferQueueCapacity; } public long getWalFileSizeThresholdInByte() { @@ -3571,6 +3574,14 @@ public double getCompactionProportion() { return compactionProportion; } + public double getWalBufferQueueProportion() { + return walBufferQueueProportion; + } + + public void setWalBufferQueueProportion(double walBufferQueueProportion) { + this.walBufferQueueProportion = walBufferQueueProportion; + } + public double getDevicePathCacheProportion() { return devicePathCacheProportion; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java index b652acf73294..50dd5b3f0bd4 100755 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java @@ -334,6 +334,15 @@ public void loadProperties(TrimProperties properties) throws BadNodeUrlException .map(String::trim) .orElse(Double.toString(conf.getRejectProportion()))); + final double walBufferQueueProportion = + Double.parseDouble( + Optional.ofNullable( + properties.getProperty( + "wal_buffer_queue_proportion", + Double.toString(conf.getWalBufferQueueProportion()))) + .map(String::trim) + .orElse(Double.toString(conf.getWalBufferQueueProportion()))); + final double devicePathCacheProportion = Double.parseDouble( Optional.ofNullable( @@ -343,11 +352,12 @@ public void loadProperties(TrimProperties properties) throws BadNodeUrlException .map(String::trim) .orElse(Double.toString(conf.getDevicePathCacheProportion()))); - if (rejectProportion + devicePathCacheProportion >= 1) { + if (rejectProportion + walBufferQueueProportion + devicePathCacheProportion >= 1) { LOGGER.warn( - "The sum of write_memory_proportion and device_path_cache_proportion is too large, use default values 0.8 and 0.05."); + "The sum of reject_proportion, wal_buffer_queue_proportion and device_path_cache_proportion is too large, use default values 0.8, 0.1 and 0.05."); } else { conf.setRejectProportion(rejectProportion); + conf.setWalBufferQueueProportion(walBufferQueueProportion); conf.setDevicePathCacheProportion(devicePathCacheProportion); } @@ -1687,16 +1697,16 @@ private void loadWALProps(TrimProperties properties) throws IOException { conf.setWalBufferSize(walBufferSize); } - int walBufferQueueCapacity = + int pageCacheDeletionBufferQueueCapacity = Integer.parseInt( Optional.ofNullable( properties.getProperty( - "wal_buffer_queue_capacity", - Integer.toString(conf.getWalBufferQueueCapacity()))) + "page_cache_deletion_buffer_queue_capacity", + Integer.toString(conf.getPageCacheDeletionBufferQueueCapacity()))) .map(String::trim) - .orElse(Integer.toString(conf.getWalBufferQueueCapacity()))); - if (walBufferQueueCapacity > 0) { - conf.setWalBufferQueueCapacity(walBufferQueueCapacity); + .orElse(Integer.toString(conf.getPageCacheDeletionBufferQueueCapacity()))); + if (pageCacheDeletionBufferQueueCapacity > 0) { + conf.setPageCacheDeletionBufferQueueCapacity(pageCacheDeletionBufferQueueCapacity); } boolean WALInsertNodeCacheShrinkClearEnabled = diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/persist/PageCacheDeletionBuffer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/persist/PageCacheDeletionBuffer.java index 52633315e3df..dcd52086425c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/persist/PageCacheDeletionBuffer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/persist/PageCacheDeletionBuffer.java @@ -61,7 +61,7 @@ public class PageCacheDeletionBuffer implements DeletionBuffer { // Buffer config keep consistent with WAL. private static final int ONE_THIRD_WAL_BUFFER_SIZE = config.getWalBufferSize() / 3; private static final double FSYNC_BUFFER_RATIO = 0.95; - private static final int QUEUE_CAPACITY = config.getWalBufferQueueCapacity(); + private static final int QUEUE_CAPACITY = config.getPageCacheDeletionBufferQueueCapacity(); private static final long MAX_WAIT_CLOSE_TIME_IN_MS = 10000; // DeletionResources received from storage engine, which is waiting to be persisted. diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java index b13250a37847..9161d7ca26ae 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java @@ -27,6 +27,7 @@ import org.apache.iotdb.consensus.ConsensusFactory; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.pipe.resource.memory.InsertNodeMemoryEstimator; import org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeDevicePathCache; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; @@ -85,6 +86,8 @@ public abstract class InsertNode extends SearchNode { protected ProgressIndex progressIndex; + protected long memorySize; + private static final DeviceIDFactory deviceIDFactory = DeviceIDFactory.getInstance(); protected InsertNode(PlanNodeId id) { @@ -418,4 +421,11 @@ protected PartialPath readTargetPath(DataInputStream stream) return DataNodeDevicePathCache.getInstance() .getPartialPath(ReadWriteIOUtils.readString(stream)); } + + public long getMemorySize() { + if (memorySize == 0) { + memorySize = InsertNodeMemoryEstimator.sizeOf(this); + } + return memorySize; + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/WritingMetrics.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/WritingMetrics.java index e7194126c1b6..3d7e218b71db 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/WritingMetrics.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/WritingMetrics.java @@ -28,6 +28,7 @@ import org.apache.iotdb.db.storageengine.dataregion.flush.FlushManager; import org.apache.iotdb.db.storageengine.dataregion.wal.WALManager; import org.apache.iotdb.db.storageengine.dataregion.wal.checkpoint.CheckpointType; +import org.apache.iotdb.db.storageengine.rescon.memory.SystemInfo; import org.apache.iotdb.metrics.AbstractMetricService; import org.apache.iotdb.metrics.impl.DoNothingMetricManager; import org.apache.iotdb.metrics.metricsets.IMetricSet; @@ -182,6 +183,8 @@ private void unbindFlushSubTaskMetrics(AbstractMetricService metricService) { public static final String READ_WAL_BUFFER_COST_NS = "read_wal_buffer_cost"; public static final String WRITE_WAL_BUFFER_COST_NS = "write_wal_buffer_cost"; public static final String ENTRIES_COUNT = "entries_count"; + public static final String WAL_QUEUE_CURRENT_MEM_COST = "wal_queue_current_mem_cost"; + public static final String WAL_QUEUE_MAX_MEM_COST = "wal_queue_max_mem_cost"; private Histogram usedRatioHistogram = DoNothingMetricManager.DO_NOTHING_HISTOGRAM; private Histogram entriesCountHistogram = DoNothingMetricManager.DO_NOTHING_HISTOGRAM; @@ -192,6 +195,7 @@ private void unbindFlushSubTaskMetrics(AbstractMetricService metricService) { private Histogram readWALBufferSizeHistogram = DoNothingMetricManager.DO_NOTHING_HISTOGRAM; private Histogram readWALBufferCostHistogram = DoNothingMetricManager.DO_NOTHING_HISTOGRAM; private Histogram writeWALBufferCostHistogram = DoNothingMetricManager.DO_NOTHING_HISTOGRAM; + private Gauge walQueueMaxMemSizeGauge = DoNothingMetricManager.DO_NOTHING_GAUGE; private void bindWALMetrics(AbstractMetricService metricService) { metricService.createAutoGauge( @@ -253,6 +257,20 @@ private void bindWALMetrics(AbstractMetricService metricService) { MetricLevel.IMPORTANT, Tag.NAME.toString(), WRITE_WAL_BUFFER_COST_NS); + SystemInfo systemInfo = SystemInfo.getInstance(); + metricService.createAutoGauge( + Metric.WAL_QUEUE_MEM_COST.toString(), + MetricLevel.IMPORTANT, + systemInfo, + SystemInfo::getCurrentWalQueueMemoryCost, + Tag.NAME.toString(), + WAL_QUEUE_CURRENT_MEM_COST); + walQueueMaxMemSizeGauge = + metricService.getOrCreateGauge( + Metric.WAL_QUEUE_MEM_COST.toString(), + MetricLevel.IMPORTANT, + Tag.NAME.toString(), + WAL_QUEUE_MAX_MEM_COST); } private void unbindWALMetrics(AbstractMetricService metricService) { @@ -274,6 +292,16 @@ private void unbindWALMetrics(AbstractMetricService metricService) { name -> metricService.remove( MetricType.HISTOGRAM, Metric.WAL_BUFFER.toString(), Tag.NAME.toString(), name)); + metricService.remove( + MetricType.AUTO_GAUGE, + Metric.WAL_QUEUE_MEM_COST.toString(), + Tag.NAME.toString(), + WAL_QUEUE_CURRENT_MEM_COST); + metricService.remove( + MetricType.GAUGE, + Metric.WAL_QUEUE_MEM_COST.toString(), + Tag.NAME.toString(), + WAL_QUEUE_MAX_MEM_COST); } // endregion @@ -909,6 +937,10 @@ public void recordWALBufferEntriesCount(long count) { entriesCountHistogram.update(count); } + public void recordWALQueueMaxMemorySize(long size) { + walQueueMaxMemSizeGauge.set(size); + } + public void recordFlushThreshold(double flushThreshold) { flushThreholdGauge.set((long) flushThreshold); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/buffer/CacheHitRatioMonitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/buffer/CacheHitRatioMonitor.java index 0d610c5af2f4..503a3d35cbd2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/buffer/CacheHitRatioMonitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/buffer/CacheHitRatioMonitor.java @@ -151,12 +151,12 @@ public long getTotalMemTableSize() { @Override public double getFlushThershold() { - return SystemInfo.getInstance().getFlushThershold(); + return SystemInfo.getInstance().getFlushThreshold(); } @Override public double getRejectThershold() { - return SystemInfo.getInstance().getRejectThershold(); + return SystemInfo.getInstance().getRejectThreshold(); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java index fff93680c9c8..27f681663d52 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java @@ -903,7 +903,6 @@ public int serializedSize() { /** Notice: this method is concurrent unsafe. */ @Override public void serializeToWAL(IWALByteBufferView buffer) { - // TODO:[WAL] WALWriteUtils.write(isSignalMemTable(), buffer); if (isSignalMemTable()) { return; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALBuffer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALBuffer.java index 36ba97dadf08..cfe272225f71 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALBuffer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALBuffer.java @@ -34,6 +34,7 @@ import org.apache.iotdb.db.storageengine.dataregion.wal.exception.BrokenWALFileException; import org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALNodeClosedException; import org.apache.iotdb.db.storageengine.dataregion.wal.io.WALMetaData; +import org.apache.iotdb.db.storageengine.dataregion.wal.utils.MemoryControlledWALEntryQueue; import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALFileStatus; import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALFileUtils; import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALMode; @@ -57,8 +58,6 @@ import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; @@ -77,7 +76,6 @@ public class WALBuffer extends AbstractWALBuffer { private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig(); public static final int ONE_THIRD_WAL_BUFFER_SIZE = config.getWalBufferSize() / 3; private static final double FSYNC_BUFFER_RATIO = 0.95; - private static final int QUEUE_CAPACITY = config.getWalBufferQueueCapacity(); private static final WritingMetrics WRITING_METRICS = WritingMetrics.getInstance(); // whether close method is called @@ -85,7 +83,7 @@ public class WALBuffer extends AbstractWALBuffer { // manage checkpoints private final CheckpointManager checkpointManager; // WALEntries - private final BlockingQueue walEntries = new ArrayBlockingQueue<>(QUEUE_CAPACITY); + private final MemoryControlledWALEntryQueue walEntries = new MemoryControlledWALEntryQueue(); // lock to provide synchronization for double buffers mechanism, protecting buffers status private final Lock buffersLock = new ReentrantLock(); // condition to guarantee correctness of switching buffers diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALEntry.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALEntry.java index 82d44213483a..bb5969dde9d4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALEntry.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALEntry.java @@ -196,4 +196,6 @@ public WALFlushListener getWalFlushListener() { } public abstract boolean isSignal(); + + public abstract long getMemorySize(); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALInfoEntry.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALInfoEntry.java index e065dbc2b587..6da50edba4f8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALInfoEntry.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALInfoEntry.java @@ -21,9 +21,13 @@ import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode; +import org.apache.iotdb.db.storageengine.dataregion.memtable.IMemTable; import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALMode; +import org.apache.tsfile.utils.RamUsageEstimator; + import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -102,6 +106,14 @@ public TabletInfo(List tabletRangeList) { this.tabletRangeList = new ArrayList<>(tabletRangeList); } + public int getRangeRowCount() { + int count = 0; + for (int[] range : tabletRangeList) { + count += range[1] - range[0]; + } + return count; + } + @Override public int hashCode() { return Objects.hash(tabletRangeList); @@ -137,6 +149,28 @@ public boolean isSignal() { return false; } + @Override + public long getMemorySize() { + switch (type) { + case INSERT_TABLET_NODE: + return ((InsertNode) value).getMemorySize() + / ((InsertTabletNode) value).getRowCount() + * tabletInfo.getRangeRowCount(); + case INSERT_ROW_NODE: + case INSERT_ROWS_NODE: + return ((InsertNode) value).getMemorySize(); + case MEMORY_TABLE_SNAPSHOT: + return ((IMemTable) value).getTVListsRamCost(); + case DELETE_DATA_NODE: + case RELATIONAL_DELETE_DATA_NODE: + case CONTINUOUS_SAME_SEARCH_INDEX_SEPARATOR_NODE: + case MEMORY_TABLE_CHECKPOINT: + return RamUsageEstimator.sizeOfObject(value); + default: + throw new RuntimeException("Unsupported wal entry type " + type); + } + } + @Override public int hashCode() { return Objects.hash(super.hashCode(), tabletInfo); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALSignalEntry.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALSignalEntry.java index 0839659dd455..86064d3bf2fd 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALSignalEntry.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALSignalEntry.java @@ -57,4 +57,9 @@ public void serialize(ByteBuffer buffer) { public boolean isSignal() { return true; } + + @Override + public long getMemorySize() { + return Byte.BYTES; + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java index 73a21772f0c3..7028872fcadf 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java @@ -410,14 +410,14 @@ private void summarizeExecuteResult() { .getName()), StringUtils.join(successfullyDeleted, ","), fileIndexAfterFilterSafelyDeleteIndex, - System.getProperty("line.separator"))); + System.lineSeparator())); if (!pinnedMemTableIds.isEmpty()) { summary .append("- MemTable has been flushed but pinned by PIPE, the MemTableId list is : ") .append(StringUtils.join(pinnedMemTableIds, ",")) .append(".") - .append(System.getProperty("line.separator")); + .append(System.lineSeparator()); } if (fileIndexAfterFilterSafelyDeleteIndex < sortedWalFilesExcludingLast.length) { summary.append( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/MemoryControlledWALEntryQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/MemoryControlledWALEntryQueue.java new file mode 100644 index 000000000000..3fabd49f3f78 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/MemoryControlledWALEntryQueue.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.storageengine.dataregion.wal.utils; + +import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntry; +import org.apache.iotdb.db.storageengine.rescon.memory.SystemInfo; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +public class MemoryControlledWALEntryQueue { + + private final BlockingQueue queue; + private final Object nonFullCondition = new Object(); + + public MemoryControlledWALEntryQueue() { + queue = new LinkedBlockingQueue<>(); + } + + public WALEntry poll(long timeout, TimeUnit unit) throws InterruptedException { + WALEntry e = queue.poll(timeout, unit); + if (e != null) { + SystemInfo.getInstance().updateWalQueueMemoryCost(-getElementSize(e)); + synchronized (nonFullCondition) { + nonFullCondition.notifyAll(); + } + } + return e; + } + + public void put(WALEntry e) throws InterruptedException { + long elementSize = getElementSize(e); + synchronized (nonFullCondition) { + while (SystemInfo.getInstance().cannotReserveMemoryForWalEntry(elementSize)) { + nonFullCondition.wait(); + } + } + queue.put(e); + SystemInfo.getInstance().updateWalQueueMemoryCost(elementSize); + } + + public WALEntry take() throws InterruptedException { + WALEntry e = queue.take(); + SystemInfo.getInstance().updateWalQueueMemoryCost(-getElementSize(e)); + synchronized (nonFullCondition) { + nonFullCondition.notifyAll(); + } + + return e; + } + + public int size() { + return queue.size(); + } + + public boolean isEmpty() { + return queue.isEmpty(); + } + + private long getElementSize(WALEntry walEntry) { + return walEntry.getMemorySize(); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/SystemInfo.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/SystemInfo.java index 0458320af052..e0b9d50cb811 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/SystemInfo.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/SystemInfo.java @@ -54,8 +54,9 @@ public class SystemInfo { private long memorySizeForMemtable; private long memorySizeForCompaction; + private long memorySizeForWalBufferQueue; private long totalDirectBufferMemorySizeLimit; - private Map reportedStorageGroupMemCostMap = new HashMap<>(); + private final Map reportedStorageGroupMemCostMap = new HashMap<>(); private long flushingMemTablesCost = 0L; private final AtomicLong directBufferMemoryCost = new AtomicLong(0); @@ -76,6 +77,8 @@ public class SystemInfo { private volatile boolean isEncodingFasterThanIo = true; + private final AtomicLong walBufferQueueMemoryCost = new AtomicLong(0); + private SystemInfo() { allocateWriteMemory(); } @@ -113,7 +116,7 @@ public synchronized boolean reportStorageGroupStatus( return true; } else { logger.info( - "Change system to reject status. Triggered by: logical SG ({}), mem cost delta ({}), totalSgMemCost ({}), REJECT_THERSHOLD ({})", + "Change system to reject status. Triggered by: logical SG ({}), mem cost delta ({}), totalSgMemCost ({}), REJECT_THRESHOLD ({})", dataRegionInfo.getDataRegion().getDatabaseName(), delta, totalStorageGroupMemCost, @@ -403,10 +406,16 @@ public void allocateWriteMemory() { (config.getAllocateMemoryForStorageEngine() * config.getWriteProportionForMemtable()); memorySizeForCompaction = (long) (config.getAllocateMemoryForStorageEngine() * config.getCompactionProportion()); + memorySizeForWalBufferQueue = + (long) + (config.getAllocateMemoryForStorageEngine() + * config.getWriteProportionForMemtable() + * config.getWalBufferQueueProportion()); FLUSH_THRESHOLD = memorySizeForMemtable * config.getFlushProportion(); REJECT_THRESHOLD = memorySizeForMemtable * config.getRejectProportion(); WritingMetrics.getInstance().recordFlushThreshold(FLUSH_THRESHOLD); WritingMetrics.getInstance().recordRejectThreshold(REJECT_THRESHOLD); + WritingMetrics.getInstance().recordWALQueueMaxMemorySize(memorySizeForWalBufferQueue); } @TestOnly @@ -537,11 +546,23 @@ public long getTotalMemTableSize() { return totalStorageGroupMemCost; } - public double getFlushThershold() { + public double getFlushThreshold() { return FLUSH_THRESHOLD; } - public double getRejectThershold() { + public double getRejectThreshold() { return REJECT_THRESHOLD; } + + public long getCurrentWalQueueMemoryCost() { + return walBufferQueueMemoryCost.get(); + } + + public void updateWalQueueMemoryCost(long delta) { + walBufferQueueMemoryCost.addAndGet(delta); + } + + public boolean cannotReserveMemoryForWalEntry(long walEntrySize) { + return walBufferQueueMemoryCost.get() + walEntrySize > memorySizeForWalBufferQueue; + } } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/buffer/CacheHitRatioMonitorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/buffer/CacheHitRatioMonitorTest.java index 8c5ea0246e66..b9152ec9132f 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/buffer/CacheHitRatioMonitorTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/buffer/CacheHitRatioMonitorTest.java @@ -77,11 +77,11 @@ public void testCacheHitRatioMonitor() { SystemInfo.getInstance().getTotalMemTableSize(), cacheHitRatioMonitor.getTotalMemTableSize()); assertEquals( - SystemInfo.getInstance().getFlushThershold(), + SystemInfo.getInstance().getFlushThreshold(), cacheHitRatioMonitor.getFlushThershold(), delta); assertEquals( - SystemInfo.getInstance().getRejectThershold(), + SystemInfo.getInstance().getRejectThreshold(), cacheHitRatioMonitor.getRejectThershold(), delta); assertEquals( diff --git a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template index 2efae33aae9c..3030bf795d40 100644 --- a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template +++ b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template @@ -725,25 +725,30 @@ primitive_array_size=64 # Datatype: double chunk_metadata_size_proportion=0.1 -# Ratio of write memory for invoking flush disk, 0.4 by default +# Ratio of memtable memory for invoking flush disk, 0.4 by default # If you have extremely high write load (like batch=1000), it can be set lower than the default value like 0.2 # effectiveMode: restart # Datatype: double flush_proportion=0.4 -# Ratio of write memory allocated for buffered arrays, 0.6 by default +# Ratio of memtable memory allocated for buffered arrays, 0.6 by default # effectiveMode: restart # Datatype: double buffered_arrays_memory_proportion=0.6 -# Ratio of write memory for rejecting insertion, 0.8 by default +# Ratio of memtable memory for rejecting insertion, 0.8 by default # If you have extremely high write load (like batch=1000) and the physical memory size is large enough, # it can be set higher than the default value like 0.9 # effectiveMode: restart # Datatype: double reject_proportion=0.8 -# Ratio of memory for the DevicePathCache. DevicePathCache is the deviceId cache, keep only one copy of the same deviceId in memory +# Ratio of memtable memory for controlling the max memory size of WAL buffer queues, 0.1 by default +# effectiveMode: restart +# Datatype: double +wal_buffer_queue_proportion=0.1 + +# Ratio of memtable memory for the DevicePathCache. DevicePathCache is the deviceId cache, keep only one copy of the same deviceId in memory # effectiveMode: restart # Datatype: double device_path_cache_proportion=0.05 @@ -1403,11 +1408,6 @@ wal_sync_mode_fsync_delay_in_ms=3 # Datatype: int wal_buffer_size_in_byte=33554432 -# Blocking queue capacity of each wal buffer, restricts maximum number of WALEdits cached in the blocking queue. -# effectiveMode: restart -# Datatype: int -wal_buffer_queue_capacity=500 - # Size threshold of each wal file # When a wal file's size exceeds this, the wal file will be closed and a new wal file will be created. # If it's a value smaller than 0, use the default value 30 * 1024 * 1024 (30MB). @@ -1464,6 +1464,11 @@ iot_consensus_cache_window_time_in_ms=-1 # Datatype: boolean enable_wal_compression=true +# Blocking queue capacity of each PageCacheDeletionBuffer, restricts maximum number of entry in the blocking queue. +# effectiveMode: restart +# Datatype: int +page_cache_deletion_buffer_queue_capacity=500 + #################### ### IoTConsensus Configuration #################### diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java index 04757610064a..0894f33dc39b 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java @@ -74,6 +74,7 @@ public enum Metric { WAL_BUFFER("wal_buffer"), PENDING_FLUSH_TASK("pending_flush_task"), WAL_COST("wal_cost"), + WAL_QUEUE_MEM_COST("wal_queue_mem_cost"), FLUSH_COST("flush_cost"), FLUSH_SUB_TASK_COST("flush_sub_task_cost"), MEMTABLE_THRESHOLD("memtable_threshold"),