Skip to content
This repository has been archived by the owner on Jan 24, 2024. It is now read-only.

Commit

Permalink
Add Time based metric exipration. (#1996)
Browse files Browse the repository at this point in the history
Fixes #1956

### Motivation
FastThreadLocal wont cleanup until thread exit. so we can just check if
the stats is not recorded a long time if expired just remove from
threadlocal


### Modifications
add time check logic if long time have no record just remove the stats.
  • Loading branch information
lifepuzzlefun authored Aug 14, 2023
1 parent 844adb4 commit 33d3c04
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -101,15 +101,15 @@ public void clear() {
throw new UnsupportedOperationException();
}

public void rotateLatencyCollection() {
public void rotateLatencyCollection(long expiredTimeSeconds) {
// Swap current with replacement
ThreadLocalAccessor local = current;
current = replacement;
replacement = local;

final DoublesUnion aggregateSuccess = new DoublesUnionBuilder().build();
final DoublesUnion aggregateFail = new DoublesUnionBuilder().build();
local.record(aggregateSuccess, aggregateFail);
local.recordAndCheckStatsExpire(aggregateSuccess, aggregateFail, expiredTimeSeconds);
successResult = aggregateSuccess.getResultAndReset();
failResult = aggregateFail.getResultAndReset();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ public class LocalData {
private final DoublesSketch successSketch = new DoublesSketchBuilder().build();
private final DoublesSketch failSketch = new DoublesSketchBuilder().build();
private final StampedLock lock = new StampedLock();
private volatile long lastHasRecordTime = System.currentTimeMillis();

public void updateSuccessSketch(double value) {
long stamp = lock.readLock();
Expand All @@ -42,9 +43,18 @@ public void updateFailedSketch(double value) {
}
}

public long lastHasRecordTime() {
return lastHasRecordTime;
}

public void record(DoublesUnion aggregateSuccess, DoublesUnion aggregateFail) {
long stamp = lock.writeLock();
try {
boolean isEmpty = successSketch.isEmpty() && failSketch.isEmpty();
if (!isEmpty) {
lastHasRecordTime = System.currentTimeMillis();
}

aggregateSuccess.update(successSketch);
successSketch.reset();
aggregateFail.update(failSketch);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,12 @@ public class PrometheusMetricsProvider implements PrometheusRawMetricsProvider {
public static final String PROMETHEUS_STATS_LATENCY_ROLLOVER_SECONDS = "prometheusStatsLatencyRolloverSeconds";
public static final int DEFAULT_PROMETHEUS_STATS_LATENCY_ROLLOVER_SECONDS = 60;

public static final String PROMETHEUS_STATS_EXPIRED_SECONDS = "prometheusStatsExpiredSeconds";

public static final long DEFAULT_PROMETHEUS_STATS_EXPIRED_SECONDS = TimeUnit.HOURS.toSeconds(1);

private long expiredTimeSeconds;

private static final String KOP_PROMETHEUS_STATS_CLUSTER = "cluster";
private final Map<String, String> defaultStatsLoggerLabels = new HashMap<>();

Expand Down Expand Up @@ -65,11 +71,14 @@ public void start(Configuration conf) {
int latencyRolloverSeconds = conf.getInt(PROMETHEUS_STATS_LATENCY_ROLLOVER_SECONDS,
DEFAULT_PROMETHEUS_STATS_LATENCY_ROLLOVER_SECONDS);

expiredTimeSeconds = conf.getLong(PROMETHEUS_STATS_EXPIRED_SECONDS,
DEFAULT_PROMETHEUS_STATS_EXPIRED_SECONDS);

defaultStatsLoggerLabels.putIfAbsent(KOP_PROMETHEUS_STATS_CLUSTER,
conf.getString(KOP_PROMETHEUS_STATS_CLUSTER));

executor.scheduleAtFixedRate(() -> {
rotateLatencyCollection();
rotateLatencyCollectionAndExpire(expiredTimeSeconds);
}, 1, latencyRolloverSeconds, TimeUnit.SECONDS);

}
Expand Down Expand Up @@ -103,9 +112,9 @@ public String getStatsName(String... statsComponents) {
}

@VisibleForTesting
void rotateLatencyCollection() {
void rotateLatencyCollectionAndExpire(long expiredTimeSeconds) {
opStats.forEach((name, metric) -> {
metric.rotateLatencyCollection();
metric.rotateLatencyCollection(expiredTimeSeconds);
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,20 @@ protected void onRemoval(LocalData value) {
}
};

public void record(DoublesUnion aggregateSuccess, DoublesUnion aggregateFail) {
map.keySet().forEach(key -> key.record(aggregateSuccess, aggregateFail));
public void recordAndCheckStatsExpire(DoublesUnion aggregateSuccess,
DoublesUnion aggregateFail,
long expireTimeMs) {
long currentTime = System.currentTimeMillis();

map.keySet().forEach(key -> {
// update stats
key.record(aggregateSuccess, aggregateFail);

// check if record expired.
if (currentTime - key.lastHasRecordTime() > expireTimeMs) {
map.remove(key);
}
});
}

public LocalData getLocalData() {
Expand Down

0 comments on commit 33d3c04

Please sign in to comment.