Skip to content

Commit

Permalink
Fix ThreadPoolMetrics concurrent NPE bug & Fix metric leaks when freq…
Browse files Browse the repository at this point in the history
…uently creating and deleting database (#14388)

* fix concurrent bug

Signed-off-by: OneSizeFitQuorum <[email protected]>

* fix threadpoolmetric leak & points asyncreporter metric leak

Signed-off-by: OneSizeFitQuorum <[email protected]>

* fix compile

Signed-off-by: OneSizeFitQuorum <[email protected]>

* fix gile_global_count & dataregion mem leak

Signed-off-by: OneSizeFitQuorum <[email protected]>

---------

Signed-off-by: OneSizeFitQuorum <[email protected]>
  • Loading branch information
OneSizeFitsQuorum authored Dec 12, 2024
1 parent d3b96d8 commit 88ffa48
Show file tree
Hide file tree
Showing 14 changed files with 161 additions and 131 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ class PipeDataNodeRemainingEventAndTimeOperator extends PipeRemainingOperator {
private final AtomicReference<Meter> dataRegionCommitMeter = new AtomicReference<>(null);
private final AtomicReference<Meter> schemaRegionCommitMeter = new AtomicReference<>(null);
private final IoTDBHistogram collectInvocationHistogram =
(IoTDBHistogram) IoTDBMetricManager.getInstance().createHistogram(null);
(IoTDBHistogram) IoTDBMetricManager.getInstance().createHistogram();

private double lastDataRegionCommitSmoothingValue = Long.MAX_VALUE;
private double lastSchemaRegionCommitSmoothingValue = Long.MAX_VALUE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.iotdb.metrics.metricsets.IMetricSet;
import org.apache.iotdb.metrics.type.Gauge;
import org.apache.iotdb.metrics.utils.MetricLevel;
import org.apache.iotdb.metrics.utils.MetricType;

import org.apache.tsfile.utils.Pair;
import org.slf4j.Logger;
Expand Down Expand Up @@ -120,6 +121,10 @@ public void deleteRegion(String database, String regionId) {
.forEach(map -> deleteRegionFromMap(map, database, regionId));
Arrays.asList(seqFileSizeMap, unseqFileSizeMap)
.forEach(map -> deleteRegionFromMap(map, database, regionId));
Arrays.asList(SEQUENCE, UNSEQUENCE)
.forEach(orderStr -> deleteGlobalTsFileCountGauge(orderStr, database, regionId));
Arrays.asList(SEQUENCE, UNSEQUENCE)
.forEach(orderStr -> deleteGlobalTsFileSizeGauge(orderStr, database, regionId));
}

private <T> void deleteRegionFromMap(
Expand Down Expand Up @@ -199,6 +204,20 @@ public Gauge getOrCreateGlobalTsFileCountGauge(
regionId);
}

public void deleteGlobalTsFileCountGauge(String orderStr, String database, String regionId) {
metricService
.get()
.remove(
MetricType.GAUGE,
FILE_GLOBAL_COUNT,
Tag.NAME.toString(),
orderStr,
Tag.DATABASE.toString(),
database,
Tag.REGION.toString(),
regionId);
}

private void updateGlobalTsFileSizeMap(
Map<String, Map<String, Pair<Long, Gauge>>> map,
String orderStr,
Expand Down Expand Up @@ -246,6 +265,20 @@ public Gauge getOrCreateGlobalTsFileSizeGauge(String orderStr, String database,
regionId);
}

public void deleteGlobalTsFileSizeGauge(String orderStr, String database, String regionId) {
metricService
.get()
.remove(
MetricType.GAUGE,
FILE_GLOBAL_SIZE,
Tag.NAME.toString(),
orderStr,
Tag.DATABASE.toString(),
database,
Tag.REGION.toString(),
regionId);
}

// endregion

// region update level tsfile value map and gauge map
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,8 @@ public class DataRegion implements IDataRegionForQuery {
PerformanceOverviewMetrics.getInstance();
private final ExecutorService upgradeModFileThreadPool;

private final DataRegionMetrics metrics;

/**
* Construct a database processor.
*
Expand Down Expand Up @@ -385,7 +387,8 @@ public DataRegion(
recover();
}

MetricService.getInstance().addMetricSet(new DataRegionMetrics(this));
this.metrics = new DataRegionMetrics(this);
MetricService.getInstance().addMetricSet(metrics);
}

@TestOnly
Expand All @@ -396,6 +399,7 @@ public DataRegion(String databaseName, String id) {
this.partitionMaxFileVersions = new HashMap<>();
partitionMaxFileVersions.put(0L, 0L);
upgradeModFileThreadPool = null;
this.metrics = new DataRegionMetrics(this);
}

@Override
Expand Down Expand Up @@ -3691,6 +3695,7 @@ public void waitForDeleted() {
deletedCondition.await();
}
FileMetrics.getInstance().deleteRegion(databaseName, dataRegionId);
MetricService.getInstance().removeMetricSet(metrics);
} catch (InterruptedException e) {
logger.error("Interrupted When waiting for data region deleted.");
Thread.currentThread().interrupt();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,12 @@
import java.util.Objects;

public class DataRegionMetrics implements IMetricSet {
private DataRegion dataRegion;
private String storageGroupName;
private final DataRegion dataRegion;
private final String databaseName;

public DataRegionMetrics(DataRegion dataRegion) {
this.dataRegion = dataRegion;
this.storageGroupName = dataRegion.getDatabaseName();
this.databaseName = dataRegion.getDatabaseName();
}

@Override
Expand All @@ -45,7 +45,7 @@ public void bindTo(AbstractMetricService metricService) {
dataRegion,
DataRegion::getMemCost,
Tag.NAME.toString(),
"database_" + storageGroupName);
"database_" + databaseName);
}

@Override
Expand All @@ -54,7 +54,7 @@ public void unbindFrom(AbstractMetricService metricService) {
MetricType.AUTO_GAUGE,
Metric.MEM.toString(),
Tag.NAME.toString(),
"database_" + storageGroupName);
"database_" + databaseName);
}

@Override
Expand All @@ -67,11 +67,11 @@ public boolean equals(Object o) {
}
DataRegionMetrics that = (DataRegionMetrics) o;
return Objects.equals(dataRegion, that.dataRegion)
&& Objects.equals(storageGroupName, that.storageGroupName);
&& Objects.equals(databaseName, that.databaseName);
}

@Override
public int hashCode() {
return Objects.hash(dataRegion, storageGroupName);
return Objects.hash(dataRegion, databaseName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,12 @@
import org.apache.iotdb.metrics.utils.MetricType;

public class TsFileProcessorInfoMetrics implements IMetricSet {
private final String storageGroupName;
private final String databaseName;
private final TsFileProcessorInfo tsFileProcessorInfo;

public TsFileProcessorInfoMetrics(
String storageGroupName, TsFileProcessorInfo tsFileProcessorInfo) {
this.storageGroupName = storageGroupName;
this.databaseName = storageGroupName;
this.tsFileProcessorInfo = tsFileProcessorInfo;
}

Expand All @@ -45,7 +45,7 @@ public void bindTo(AbstractMetricService metricService) {
tsFileProcessorInfo,
TsFileProcessorInfo::getMemCost,
Tag.NAME.toString(),
"chunkMetaData_" + storageGroupName);
"chunkMetaData_" + databaseName);
}

@Override
Expand All @@ -55,6 +55,6 @@ public void unbindFrom(AbstractMetricService metricService) {
MetricType.AUTO_GAUGE,
Metric.MEM.toString(),
Tag.NAME.toString(),
"chunkMetaData_" + storageGroupName);
"chunkMetaData_" + databaseName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public Gauge createGauge() {
}

@Override
public Histogram createHistogram(MetricInfo metricInfo) {
public Histogram createHistogram() {
// create distributionSummary
io.micrometer.core.instrument.DistributionSummary distributionSummary =
new CumulativeDistributionSummary(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ public Histogram getOrCreateHistogram(String name, MetricLevel metricLevel, Stri
metrics.computeIfAbsent(
metricInfo,
key -> {
Histogram histogram = createHistogram(metricInfo);
Histogram histogram = createHistogram();
nameToMetaInfo.put(name, metricInfo.getMetaInfo());
notifyReporterOnAdd(histogram, metricInfo);
return histogram;
Expand All @@ -263,12 +263,8 @@ public Histogram getOrCreateHistogram(String name, MetricLevel metricLevel, Stri
throw new IllegalArgumentException(metricInfo + ALREADY_EXISTS);
}

/**
* Create histogram according to metric framework.
*
* @param metricInfo the metricInfo of metric
*/
protected abstract Histogram createHistogram(MetricInfo metricInfo);
/** Create histogram according to metric framework. */
protected abstract Histogram createHistogram();

/**
* Get timer. return if exists, create if not.
Expand Down Expand Up @@ -466,7 +462,7 @@ protected boolean stop() {

protected abstract boolean stopFramework();

private boolean invalid(MetricLevel metricLevel, String name, String... tags) {
public boolean invalid(MetricLevel metricLevel, String name, String... tags) {
if (!isEnableMetricInGivenLevel(metricLevel)) {
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -259,29 +259,45 @@ public Timer getOrCreateTimerWithInternalReport(
/** Count with internal report. */
public void countWithInternalReportAsync(
long delta, String metric, MetricLevel metricLevel, long time, String... tags) {
internalReporter.writeMetricToIoTDB(
metricManager.count(delta, metric, metricLevel, tags), metric, time, tags);
if (metricManager.invalid(metricLevel, metric, tags)) {
return;
}
Counter counter = metricManager.createCounter();
counter.inc(delta);
internalReporter.writeMetricToIoTDB(counter, metric, time, tags);
}

/** Gauge value with internal report. */
public void gaugeWithInternalReportAsync(
long value, String metric, MetricLevel metricLevel, long time, String... tags) {
internalReporter.writeMetricToIoTDB(
metricManager.gauge(value, metric, metricLevel, tags), metric, time, tags);
if (metricManager.invalid(metricLevel, metric, tags)) {
return;
}
Gauge gauge = metricManager.createGauge();
gauge.set(value);
internalReporter.writeMetricToIoTDB(gauge, metric, time, tags);
}

/** Rate with internal report. */
public void rateWithInternalReportAsync(
long value, String metric, MetricLevel metricLevel, long time, String... tags) {
internalReporter.writeMetricToIoTDB(
metricManager.rate(value, metric, metricLevel, tags), metric, time, tags);
if (metricManager.invalid(metricLevel, metric, tags)) {
return;
}
Rate rate = metricManager.createRate();
rate.mark(value);
internalReporter.writeMetricToIoTDB(rate, metric, time, tags);
}

/** Histogram with internal report. */
public void histogramWithInternalReportAsync(
long value, String metric, MetricLevel metricLevel, long time, String... tags) {
internalReporter.writeMetricToIoTDB(
metricManager.histogram(value, metric, metricLevel, tags), metric, time, tags);
if (metricManager.invalid(metricLevel, metric, tags)) {
return;
}
Histogram histogram = metricManager.createHistogram();
histogram.update(value);
internalReporter.writeMetricToIoTDB(histogram, metric, time, tags);
}

/** Timer with internal report. */
Expand All @@ -292,8 +308,12 @@ public void timerWithInternalReportAsync(
MetricLevel metricLevel,
long time,
String... tags) {
internalReporter.writeMetricToIoTDB(
metricManager.timer(delta, timeUnit, metric, metricLevel, tags), metric, time, tags);
if (metricManager.invalid(metricLevel, metric, tags)) {
return;
}
Timer timer = metricManager.createTimer();
timer.update(delta, timeUnit);
internalReporter.writeMetricToIoTDB(timer, metric, time, tags);
}

public List<Pair<String, String[]>> getAllMetricKeys() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public Gauge createGauge() {
}

@Override
public Histogram createHistogram(MetricInfo metricInfo) {
public Histogram createHistogram() {
return DO_NOTHING_HISTOGRAM;
}

Expand Down
Loading

0 comments on commit 88ffa48

Please sign in to comment.