Skip to content

Commit

Permalink
[feat][broker] PIP-264: Add managed ledger cache metrics (apache#22898)
Browse files Browse the repository at this point in the history
  • Loading branch information
dragosvictor authored Jun 26, 2024
1 parent 243ad5a commit fe726db
Show file tree
Hide file tree
Showing 16 changed files with 544 additions and 51 deletions.
12 changes: 12 additions & 0 deletions managed-ledger/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,12 @@
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-opentelemetry</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
Expand Down Expand Up @@ -120,6 +126,12 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk-testing</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,26 +47,51 @@ public interface ManagedLedgerFactoryMXBean {
*/
double getCacheHitsRate();

/**
* Cumulative number of cache hits.
*/
long getCacheHitsTotal();

/**
* Get the number of cache misses per second.
*/
double getCacheMissesRate();

/**
* Cumulative number of cache misses.
*/
long getCacheMissesTotal();

/**
* Get the amount of data is retrieved from the cache in byte/s.
*/
double getCacheHitsThroughput();

/**
* Cumulative amount of data retrieved from the cache in bytes.
*/
long getCacheHitsBytesTotal();

/**
* Get the amount of data is retrieved from the bookkeeper in byte/s.
*/
double getCacheMissesThroughput();

/**
* Cumulative amount of data retrieved from the bookkeeper in bytes.
*/
long getCacheMissesBytesTotal();

/**
* Get the number of cache evictions during the last minute.
*/
long getNumberOfCacheEvictions();

/**
* Cumulative number of cache evictions.
*/
long getNumberOfCacheEvictionsTotal();

/**
* Cumulative number of entries inserted into the cache.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.bookkeeper.mledger;

import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.metrics.BatchCallback;
import io.opentelemetry.api.metrics.ObservableLongMeasurement;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
import org.apache.bookkeeper.mledger.impl.cache.PooledByteBufAllocatorStats;
import org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheImpl;
import org.apache.pulsar.opentelemetry.Constants;
import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes.CacheEntryStatus;
import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes.CacheOperationStatus;
import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes.PoolArenaType;
import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes.PoolChunkAllocationType;

public class OpenTelemetryManagedLedgerCacheStats implements AutoCloseable {

// Replaces pulsar_ml_count
public static final String MANAGED_LEDGER_COUNTER = "pulsar.broker.managed_ledger.count";
private final ObservableLongMeasurement managedLedgerCounter;

// Replaces pulsar_ml_cache_evictions
public static final String CACHE_EVICTION_OPERATION_COUNTER = "pulsar.broker.managed_ledger.cache.eviction.count";
private final ObservableLongMeasurement cacheEvictionOperationCounter;

// Replaces 'pulsar_ml_cache_entries',
// 'pulsar_ml_cache_inserted_entries_total',
// 'pulsar_ml_cache_evicted_entries_total'
public static final String CACHE_ENTRY_COUNTER = "pulsar.broker.managed_ledger.cache.entry.count";
private final ObservableLongMeasurement cacheEntryCounter;

// Replaces pulsar_ml_cache_used_size
public static final String CACHE_SIZE_COUNTER = "pulsar.broker.managed_ledger.cache.entry.size";
private final ObservableLongMeasurement cacheSizeCounter;

// Replaces pulsar_ml_cache_hits_rate, pulsar_ml_cache_misses_rate
public static final String CACHE_OPERATION_COUNTER = "pulsar.broker.managed_ledger.cache.operation.count";
private final ObservableLongMeasurement cacheOperationCounter;

// Replaces pulsar_ml_cache_hits_throughput, pulsar_ml_cache_misses_throughput
public static final String CACHE_OPERATION_BYTES_COUNTER = "pulsar.broker.managed_ledger.cache.operation.size";
private final ObservableLongMeasurement cacheOperationBytesCounter;

// Replaces 'pulsar_ml_cache_pool_active_allocations',
// 'pulsar_ml_cache_pool_active_allocations_huge',
// 'pulsar_ml_cache_pool_active_allocations_normal',
// 'pulsar_ml_cache_pool_active_allocations_small'
public static final String CACHE_POOL_ACTIVE_ALLOCATION_COUNTER =
"pulsar.broker.managed_ledger.cache.pool.allocation.active.count";
private final ObservableLongMeasurement cachePoolActiveAllocationCounter;

// Replaces ['pulsar_ml_cache_pool_allocated', 'pulsar_ml_cache_pool_used']
public static final String CACHE_POOL_ACTIVE_ALLOCATION_SIZE_COUNTER =
"pulsar.broker.managed_ledger.cache.pool.allocation.size";
private final ObservableLongMeasurement cachePoolActiveAllocationSizeCounter;

private final BatchCallback batchCallback;

public OpenTelemetryManagedLedgerCacheStats(OpenTelemetry openTelemetry, ManagedLedgerFactoryImpl factory) {
var meter = openTelemetry.getMeter(Constants.BROKER_INSTRUMENTATION_SCOPE_NAME);

managedLedgerCounter = meter
.upDownCounterBuilder(MANAGED_LEDGER_COUNTER)
.setUnit("{managed_ledger}")
.setDescription("The total number of managed ledgers.")
.buildObserver();

cacheEvictionOperationCounter = meter
.counterBuilder(CACHE_EVICTION_OPERATION_COUNTER)
.setUnit("{eviction}")
.setDescription("The total number of cache eviction operations.")
.buildObserver();

cacheEntryCounter = meter
.upDownCounterBuilder(CACHE_ENTRY_COUNTER)
.setUnit("{entry}")
.setDescription("The number of entries in the entry cache.")
.buildObserver();

cacheSizeCounter = meter
.upDownCounterBuilder(CACHE_SIZE_COUNTER)
.setUnit("{By}")
.setDescription("The byte amount of entries stored in the entry cache.")
.buildObserver();

cacheOperationCounter = meter
.counterBuilder(CACHE_OPERATION_COUNTER)
.setUnit("{entry}")
.setDescription("The number of cache operations.")
.buildObserver();

cacheOperationBytesCounter = meter
.counterBuilder(CACHE_OPERATION_BYTES_COUNTER)
.setUnit("{By}")
.setDescription("The byte amount of data retrieved from cache operations.")
.buildObserver();

cachePoolActiveAllocationCounter = meter
.upDownCounterBuilder(CACHE_POOL_ACTIVE_ALLOCATION_COUNTER)
.setUnit("{allocation}")
.setDescription("The number of currently active allocations in the direct arena.")
.buildObserver();

cachePoolActiveAllocationSizeCounter = meter
.upDownCounterBuilder(CACHE_POOL_ACTIVE_ALLOCATION_SIZE_COUNTER)
.setUnit("{By}")
.setDescription("The memory allocated in the direct arena.")
.buildObserver();


batchCallback = meter.batchCallback(() -> recordMetrics(factory),
managedLedgerCounter,
cacheEvictionOperationCounter,
cacheEntryCounter,
cacheSizeCounter,
cacheOperationCounter,
cacheOperationBytesCounter,
cachePoolActiveAllocationCounter,
cachePoolActiveAllocationSizeCounter);
}

@Override
public void close() {
batchCallback.close();
}

private void recordMetrics(ManagedLedgerFactoryImpl factory) {
var stats = factory.getCacheStats();

managedLedgerCounter.record(stats.getNumberOfManagedLedgers());
cacheEvictionOperationCounter.record(stats.getNumberOfCacheEvictionsTotal());

var entriesOut = stats.getCacheEvictedEntriesCount();
var entriesIn = stats.getCacheInsertedEntriesCount();
var entriesActive = entriesIn - entriesOut;
cacheEntryCounter.record(entriesActive, CacheEntryStatus.ACTIVE.attributes);
cacheEntryCounter.record(entriesIn, CacheEntryStatus.INSERTED.attributes);
cacheEntryCounter.record(entriesOut, CacheEntryStatus.EVICTED.attributes);
cacheSizeCounter.record(stats.getCacheUsedSize());

cacheOperationCounter.record(stats.getCacheHitsTotal(), CacheOperationStatus.HIT.attributes);
cacheOperationBytesCounter.record(stats.getCacheHitsBytesTotal(), CacheOperationStatus.HIT.attributes);
cacheOperationCounter.record(stats.getCacheMissesTotal(), CacheOperationStatus.MISS.attributes);
cacheOperationBytesCounter.record(stats.getCacheMissesBytesTotal(), CacheOperationStatus.MISS.attributes);

var allocatorStats = new PooledByteBufAllocatorStats(RangeEntryCacheImpl.ALLOCATOR);
cachePoolActiveAllocationCounter.record(allocatorStats.activeAllocationsSmall, PoolArenaType.SMALL.attributes);
cachePoolActiveAllocationCounter.record(allocatorStats.activeAllocationsNormal,
PoolArenaType.NORMAL.attributes);
cachePoolActiveAllocationCounter.record(allocatorStats.activeAllocationsHuge, PoolArenaType.HUGE.attributes);
cachePoolActiveAllocationSizeCounter.record(allocatorStats.totalAllocated,
PoolChunkAllocationType.ALLOCATED.attributes);
cachePoolActiveAllocationSizeCounter.record(allocatorStats.totalUsed, PoolChunkAllocationType.USED.attributes);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.google.common.base.Predicates;
import com.google.common.collect.Maps;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.opentelemetry.api.OpenTelemetry;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
Expand Down Expand Up @@ -67,6 +68,7 @@
import org.apache.bookkeeper.mledger.ManagedLedgerInfo.MessageRangeInfo;
import org.apache.bookkeeper.mledger.ManagedLedgerInfo.PositionInfo;
import org.apache.bookkeeper.mledger.MetadataCompressionConfig;
import org.apache.bookkeeper.mledger.OpenTelemetryManagedLedgerCacheStats;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.ReadOnlyCursor;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.ManagedLedgerInitializeLedgerCallback;
Expand Down Expand Up @@ -118,6 +120,8 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
private volatile long cacheEvictionTimeThresholdNanos;
private final MetadataStore metadataStore;

private final OpenTelemetryManagedLedgerCacheStats openTelemetryCacheStats;

//indicate whether shutdown() is called.
private volatile boolean closed;

Expand Down Expand Up @@ -149,7 +153,7 @@ public ManagedLedgerFactoryImpl(MetadataStoreExtended metadataStore, ClientConfi
ManagedLedgerFactoryConfig config)
throws Exception {
this(metadataStore, new DefaultBkFactory(bkClientConfiguration),
true /* isBookkeeperManaged */, config, NullStatsLogger.INSTANCE);
true /* isBookkeeperManaged */, config, NullStatsLogger.INSTANCE, OpenTelemetry.noop());
}

public ManagedLedgerFactoryImpl(MetadataStoreExtended metadataStore, BookKeeper bookKeeper)
Expand All @@ -168,21 +172,24 @@ public ManagedLedgerFactoryImpl(MetadataStoreExtended metadataStore,
ManagedLedgerFactoryConfig config)
throws Exception {
this(metadataStore, bookKeeperGroupFactory, false /* isBookkeeperManaged */,
config, NullStatsLogger.INSTANCE);
config, NullStatsLogger.INSTANCE, OpenTelemetry.noop());
}

public ManagedLedgerFactoryImpl(MetadataStoreExtended metadataStore,
BookkeeperFactoryForCustomEnsemblePlacementPolicy bookKeeperGroupFactory,
ManagedLedgerFactoryConfig config, StatsLogger statsLogger)
ManagedLedgerFactoryConfig config, StatsLogger statsLogger,
OpenTelemetry openTelemetry)
throws Exception {
this(metadataStore, bookKeeperGroupFactory, false /* isBookkeeperManaged */,
config, statsLogger);
config, statsLogger, openTelemetry);
}

private ManagedLedgerFactoryImpl(MetadataStoreExtended metadataStore,
BookkeeperFactoryForCustomEnsemblePlacementPolicy bookKeeperGroupFactory,
boolean isBookkeeperManaged,
ManagedLedgerFactoryConfig config, StatsLogger statsLogger) throws Exception {
ManagedLedgerFactoryConfig config,
StatsLogger statsLogger,
OpenTelemetry openTelemetry) throws Exception {
MetadataCompressionConfig compressionConfigForManagedLedgerInfo =
config.getCompressionConfigForManagedLedgerInfo();
MetadataCompressionConfig compressionConfigForManagedCursorInfo =
Expand Down Expand Up @@ -220,6 +227,8 @@ private ManagedLedgerFactoryImpl(MetadataStoreExtended metadataStore,
closed = false;

metadataStore.registerSessionListener(this::handleMetadataStoreNotification);

openTelemetryCacheStats = new OpenTelemetryManagedLedgerCacheStats(openTelemetry, this);
}

static class DefaultBkFactory implements BookkeeperFactoryForCustomEnsemblePlacementPolicy {
Expand Down Expand Up @@ -611,6 +620,7 @@ public void closeFailed(ManagedLedgerException exception, Object ctx) {
}));
}).thenAcceptAsync(__ -> {
//wait for tasks in scheduledExecutor executed.
openTelemetryCacheStats.close();
scheduledExecutor.shutdownNow();
entryCacheManager.clear();
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,26 +99,51 @@ public double getCacheHitsRate() {
return cacheHits.getRate();
}

@Override
public long getCacheHitsTotal() {
return cacheHits.getTotalCount();
}

@Override
public double getCacheMissesRate() {
return cacheMisses.getRate();
}

@Override
public long getCacheMissesTotal() {
return cacheMisses.getTotalCount();
}

@Override
public double getCacheHitsThroughput() {
return cacheHits.getValueRate();
}

@Override
public long getCacheHitsBytesTotal() {
return cacheHits.getTotalValue();
}

@Override
public double getCacheMissesThroughput() {
return cacheMisses.getValueRate();
}

@Override
public long getCacheMissesBytesTotal() {
return cacheMisses.getTotalValue();
}

@Override
public long getNumberOfCacheEvictions() {
return cacheEvictions.getCount();
}

@Override
public long getNumberOfCacheEvictionsTotal() {
return cacheEvictions.getTotalCount();
}

public long getCacheInsertedEntriesCount() {
return insertedEntryCount.sum();
}
Expand Down
Loading

0 comments on commit fe726db

Please sign in to comment.