Skip to content

Commit

Permalink
emit metrics for groupby queries
Browse files Browse the repository at this point in the history
  • Loading branch information
findingrish committed Oct 16, 2024
1 parent 28fead5 commit 6fcfcf5
Show file tree
Hide file tree
Showing 35 changed files with 445 additions and 84 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.apache.druid.query.groupby.GroupByQueryQueryToolChest;
import org.apache.druid.query.groupby.GroupByQueryRunnerFactory;
import org.apache.druid.query.groupby.GroupByResourcesReservationPool;
import org.apache.druid.query.groupby.GroupByStatsProvider;
import org.apache.druid.query.groupby.GroupingEngine;
import org.apache.druid.query.groupby.ResultRow;
import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
Expand Down Expand Up @@ -373,15 +374,17 @@ public String getFormatString()
};

final Supplier<GroupByQueryConfig> configSupplier = Suppliers.ofInstance(config);
final GroupByStatsProvider groupByStatsProvider = new GroupByStatsProvider(mergePool);
final GroupByResourcesReservationPool groupByResourcesReservationPool =
new GroupByResourcesReservationPool(mergePool, config);
new GroupByResourcesReservationPool(mergePool, config, groupByStatsProvider);
final GroupingEngine groupingEngine = new GroupingEngine(
druidProcessingConfig,
configSupplier,
groupByResourcesReservationPool,
TestHelper.makeJsonMapper(),
new ObjectMapper(new SmileFactory()),
QueryBenchmarkUtil.NOOP_QUERYWATCHER
QueryBenchmarkUtil.NOOP_QUERYWATCHER,
groupByStatsProvider
);

factory = new GroupByQueryRunnerFactory(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@
import org.apache.druid.query.groupby.GroupByQueryRunnerFactory;
import org.apache.druid.query.groupby.GroupByQueryRunnerTest;
import org.apache.druid.query.groupby.GroupByResourcesReservationPool;
import org.apache.druid.query.groupby.GroupByStatsProvider;
import org.apache.druid.query.groupby.GroupingEngine;
import org.apache.druid.query.groupby.ResultRow;
import org.apache.druid.query.planning.DataSourceAnalysis;
Expand Down Expand Up @@ -357,15 +358,17 @@ private static GroupByQueryRunnerFactory makeGroupByQueryRunnerFactory(
bufferSupplier,
processingConfig.getNumMergeBuffers()
);
final GroupByStatsProvider groupByStatsProvider = new GroupByStatsProvider(mergeBufferPool);
final GroupByResourcesReservationPool groupByResourcesReservationPool =
new GroupByResourcesReservationPool(mergeBufferPool, config);
new GroupByResourcesReservationPool(mergeBufferPool, config, groupByStatsProvider);
final GroupingEngine groupingEngine = new GroupingEngine(
processingConfig,
configSupplier,
groupByResourcesReservationPool,
mapper,
mapper,
QueryRunnerTestHelper.NOOP_QUERYWATCHER
QueryRunnerTestHelper.NOOP_QUERYWATCHER,
groupByStatsProvider
);
final GroupByQueryQueryToolChest toolChest = new GroupByQueryQueryToolChest(groupingEngine, groupByResourcesReservationPool);
return new GroupByQueryRunnerFactory(groupingEngine, toolChest, bufferPool);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import org.apache.druid.query.groupby.GroupByQueryQueryToolChest;
import org.apache.druid.query.groupby.GroupByQueryRunnerFactory;
import org.apache.druid.query.groupby.GroupByResourcesReservationPool;
import org.apache.druid.query.groupby.GroupByStatsProvider;
import org.apache.druid.query.groupby.GroupingEngine;
import org.apache.druid.query.groupby.ResultRow;
import org.apache.druid.query.groupby.orderby.DefaultLimitSpec;
Expand Down Expand Up @@ -490,15 +491,17 @@ public String getFormatString()
};

final Supplier<GroupByQueryConfig> configSupplier = Suppliers.ofInstance(config);
final GroupByStatsProvider groupByStatsProvider = new GroupByStatsProvider(mergePool);
final GroupByResourcesReservationPool groupByResourcesReservationPool =
new GroupByResourcesReservationPool(mergePool, config);
new GroupByResourcesReservationPool(mergePool, config, groupByStatsProvider);
final GroupingEngine groupingEngine = new GroupingEngine(
druidProcessingConfig,
configSupplier,
groupByResourcesReservationPool,
TestHelper.makeJsonMapper(),
new ObjectMapper(new SmileFactory()),
QueryBenchmarkUtil.NOOP_QUERYWATCHER
QueryBenchmarkUtil.NOOP_QUERYWATCHER,
groupByStatsProvider
);

factory = new GroupByQueryRunnerFactory(
Expand Down
2 changes: 1 addition & 1 deletion docs/configuration/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ Metric monitoring is an essential part of Druid operations. The following monito
|`org.apache.druid.java.util.metrics.CgroupV2MemoryMonitor`| **EXPERIMENTAL** Reports memory usage from `memory.current` and `memory.max` files. Only applicable to `cgroupv2`.|
|`org.apache.druid.server.metrics.HistoricalMetricsMonitor`|Reports statistics on Historical services. Available only on Historical services.|
|`org.apache.druid.server.metrics.SegmentStatsMonitor` | **EXPERIMENTAL** Reports statistics about segments on Historical services. Available only on Historical services. Not to be used when lazy loading is configured.|
|`org.apache.druid.server.metrics.QueryCountStatsMonitor`|Reports how many queries have been successful/failed/interrupted.|
|`org.apache.druid.server.metrics.QueryCountStatsMonitor`|Reports how many queries have been successful/failed/interrupted. It also reports stats for group by queries.|
|`org.apache.druid.server.metrics.SubqueryCountStatsMonitor`|Reports how many subqueries have been materialized as rows or bytes and various other statistics related to the subquery execution|
|`org.apache.druid.server.emitter.HttpEmittingMonitor`|Reports internal metrics of `http` or `parametrized` emitter (see below). Must not be used with another emitter type. See the description of the metrics here: <https://github.com/apache/druid/pull/4973>.|
|`org.apache.druid.server.metrics.TaskCountStatsMonitor`|Reports how many ingestion tasks are currently running/pending/waiting and also the number of successful/failed tasks per emission period.|
Expand Down
9 changes: 9 additions & 0 deletions docs/operations/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,9 @@ Most metric values reset each emission period, as specified in `druid.monitoring
|`subquery/fallback/unknownReason/count`|Number of subqueries which cannot be materialized as frames due to other reasons.|This metric is only available if the `SubqueryCountStatsMonitor` module is included.| |
|`query/rowLimit/exceeded/count`|Number of queries whose inlined subquery results exceeded the given row limit|This metric is only available if the `SubqueryCountStatsMonitor` module is included.| |
|`query/byteLimit/exceeded/count`|Number of queries whose inlined subquery results exceeded the given byte limit|This metric is only available if the `SubqueryCountStatsMonitor` module is included.| |
|`mergeBuffer/acquiredCount`|Number of merge buffers acquired from the merge buffer pool.|This metric is only available if the `QueryCountStatsMonitor` module is included.| |
|`groupBy/acquisitionTimeNs`|Average time, in nanoseconds, taken to acquire resources for groupBy queries.|This metric is only available if the `QueryCountStatsMonitor` module is included.| |
|`groupBy/spilledBytes`|Number of bytes spilled to disk by groupBy queries.|This metric is only available if the `QueryCountStatsMonitor` module is included.| |

### Historical

Expand All @@ -104,6 +107,9 @@ Most metric values reset each emission period, as specified in `druid.monitoring
|`query/interrupted/count`|Number of queries interrupted due to cancellation.|This metric is only available if the `QueryCountStatsMonitor` module is included.||
|`query/timeout/count`|Number of timed out queries.|This metric is only available if the `QueryCountStatsMonitor` module is included.||
|`mergeBuffer/pendingRequests`|Number of requests waiting to acquire a batch of buffers from the merge buffer pool.|This metric is only available if the `QueryCountStatsMonitor` module is included.||
|`mergeBuffer/acquiredCount`|Number of merge buffers acquired from the merge buffer pool.|This metric is only available if the `QueryCountStatsMonitor` module is included.| |
|`groupBy/acquisitionTimeNs`|Average time, in nanoseconds, taken to acquire resources for groupBy queries.|This metric is only available if the `QueryCountStatsMonitor` module is included.| |
|`groupBy/spilledBytes`|Number of bytes spilled to disk by groupBy queries.|This metric is only available if the `QueryCountStatsMonitor` module is included.| |

### Real-time

Expand All @@ -120,6 +126,9 @@ Most metric values reset each emission period, as specified in `druid.monitoring
|`query/interrupted/count`|Number of queries interrupted due to cancellation.|This metric is only available if the `QueryCountStatsMonitor` module is included.||
|`query/timeout/count`|Number of timed out queries.|This metric is only available if the `QueryCountStatsMonitor` module is included.||
|`mergeBuffer/pendingRequests`|Number of requests waiting to acquire a batch of buffers from the merge buffer pool.|This metric is only available if the `QueryCountStatsMonitor` module is included.||
|`mergeBuffer/acquiredCount`|Number of merge buffers acquired from the merge buffer pool.|This metric is only available if the `QueryCountStatsMonitor` module is included.| |
|`groupBy/acquisitionTimeNs`|Average time, in nanoseconds, taken to acquire resources for groupBy queries.|This metric is only available if the `QueryCountStatsMonitor` module is included.| |
|`groupBy/spilledBytes`|Number of bytes spilled to disk by groupBy queries.|This metric is only available if the `QueryCountStatsMonitor` module is included.| |

### Jetty

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.druid.segment;

import com.google.common.collect.ImmutableList;
import org.apache.druid.collections.BlockingPool;
import org.apache.druid.collections.DefaultBlockingPool;
import org.apache.druid.collections.StupidPool;
import org.apache.druid.common.config.NullHandling;
Expand All @@ -44,6 +45,7 @@
import org.apache.druid.query.groupby.GroupByQueryQueryToolChest;
import org.apache.druid.query.groupby.GroupByQueryRunnerFactory;
import org.apache.druid.query.groupby.GroupByResourcesReservationPool;
import org.apache.druid.query.groupby.GroupByStatsProvider;
import org.apache.druid.query.groupby.GroupingEngine;
import org.apache.druid.query.groupby.ResultRow;
import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
Expand All @@ -70,8 +72,14 @@ public void setup() throws IOException
{
final IncrementalIndex incrementalIndex = MapVirtualColumnTestBase.generateIndex();
final GroupByQueryConfig config = new GroupByQueryConfig();

final BlockingPool<ByteBuffer> mergePool =
new DefaultBlockingPool<>(() -> ByteBuffer.allocate(1024), 1);
final GroupByStatsProvider groupByStatsProvider = new GroupByStatsProvider(mergePool);

final GroupByResourcesReservationPool groupByResourcesReservationPool =
new GroupByResourcesReservationPool(new DefaultBlockingPool<>(() -> ByteBuffer.allocate(1024), 1), config);
new GroupByResourcesReservationPool(mergePool, config, groupByStatsProvider);

final GroupingEngine groupingEngine = new GroupingEngine(
new DruidProcessingConfig()
{
Expand Down Expand Up @@ -103,7 +111,8 @@ public int getNumThreads()
groupByResourcesReservationPool,
TestHelper.makeJsonMapper(),
new DefaultObjectMapper(),
QueryRunnerTestHelper.NOOP_QUERYWATCHER
QueryRunnerTestHelper.NOOP_QUERYWATCHER,
groupByStatsProvider
);

final GroupByQueryRunnerFactory factory = new GroupByQueryRunnerFactory(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.druid.query.QueryProcessingPool;
import org.apache.druid.query.groupby.GroupByQueryConfig;
import org.apache.druid.query.groupby.GroupByResourcesReservationPool;
import org.apache.druid.query.groupby.GroupByStatsProvider;

import java.nio.ByteBuffer;

Expand Down Expand Up @@ -135,9 +136,10 @@ public BlockingPool<ByteBuffer> getMergeBufferPool(Task task, DruidProcessingCon
@Merging
public GroupByResourcesReservationPool getGroupByResourcesReservationPool(
@Merging BlockingPool<ByteBuffer> mergeBufferPool,
GroupByQueryConfig groupByQueryConfig
GroupByQueryConfig groupByQueryConfig,
GroupByStatsProvider groupByStatsProvider
)
{
return new GroupByResourcesReservationPool(mergeBufferPool, groupByQueryConfig);
return new GroupByResourcesReservationPool(mergeBufferPool, groupByQueryConfig, groupByStatsProvider);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,9 @@ public interface BlockingPool<T>
* @return count of pending requests
*/
long getPendingRequests();

/**
 * Reports how many objects of this pool are currently in use.
 *
 * NOTE(review): presumably "in use" means taken from the pool and not yet returned; confirm against each
 * implementation.
 *
 * @return number of used buffers from the pool
 */
long getUsedBufferCount();
}
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,12 @@ public long getPendingRequests()
return pendingRequests.get();
}

/**
 * Computes the used-buffer count as {@code maxSize} minus the current size of {@code objects}.
 *
 * NOTE(review): presumably {@code objects} holds the entries that are currently available in the pool, so the
 * difference is the number handed out; confirm against the field declarations. The value is read without any
 * locking visible in this method, so it may be a momentary snapshot.
 *
 * @return {@code maxSize - objects.size()}
 */
@Override
public long getUsedBufferCount()
{
return maxSize - objects.size();
}

private List<T> pollObjects(int elementNum) throws InterruptedException
{
final List<T> list = new ArrayList<>(elementNum);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,4 +61,10 @@ public long getPendingRequests()
{
return 0;
}

/**
 * This implementation never reports any buffer as used.
 *
 * @return always zero
 */
@Override
public long getUsedBufferCount()
{
  return 0L;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,26 +86,30 @@ public class GroupByResourcesReservationPool
/**
* Map of query's resource id -> group by resources reserved for the query to execute
*/
final ConcurrentHashMap<QueryResourceId, AtomicReference<GroupByQueryResources>> pool = new ConcurrentHashMap<>();
private final ConcurrentHashMap<QueryResourceId, AtomicReference<GroupByQueryResources>> pool = new ConcurrentHashMap<>();

/**
* Buffer pool from where the merge buffers are picked and reserved
*/
final BlockingPool<ByteBuffer> mergeBufferPool;
private final BlockingPool<ByteBuffer> mergeBufferPool;

/**
* Group by query config of the server
*/
final GroupByQueryConfig groupByQueryConfig;
private final GroupByQueryConfig groupByQueryConfig;

private final GroupByStatsProvider groupByStatsProvider;

@Inject
public GroupByResourcesReservationPool(
@Merging BlockingPool<ByteBuffer> mergeBufferPool,
GroupByQueryConfig groupByQueryConfig
GroupByQueryConfig groupByQueryConfig,
GroupByStatsProvider groupByStatsProvider
)
{
this.mergeBufferPool = mergeBufferPool;
this.groupByQueryConfig = groupByQueryConfig;
this.groupByStatsProvider = groupByStatsProvider;
}

/**
Expand All @@ -114,6 +118,7 @@ public GroupByResourcesReservationPool(
*/
public void reserve(QueryResourceId queryResourceId, GroupByQuery groupByQuery, boolean willMergeRunner)
{
long startNs = System.nanoTime();
if (queryResourceId == null) {
throw DruidException.defensive("Query resource id must be populated");
}
Expand Down Expand Up @@ -145,6 +150,8 @@ public void reserve(QueryResourceId queryResourceId, GroupByQuery groupByQuery,
// Resources have been allocated, spot has been reserved. The reference would ALWAYS refer to 'null'. Refer the
// allocated resources from it
reference.compareAndSet(null, resources);

groupByStatsProvider.groupByResourceAcquisitionTimeNs(System.nanoTime() - startNs);
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.query.groupby;

import org.apache.druid.collections.BlockingPool;
import org.apache.druid.guice.annotations.Merging;
import org.apache.druid.query.groupby.epinephelinae.LimitedTemporaryStorage;

import javax.inject.Inject;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Collects stats for group by queries: the number of merge buffers currently in use, the bytes held by registered
 * temporary storages (spilled bytes) and the average time taken to acquire group by resources.
 */
public class GroupByStatsProvider
{
  /** Sum of the acquisition times, in nanoseconds, recorded since the last reset. */
  private final AtomicLong groupByResourceAcquisitionTimeNs = new AtomicLong(0);
  /** Number of acquisitions recorded since the last reset. */
  private final AtomicLong groupByResourceAcquisitionCount = new AtomicLong(0);

  private final BlockingPool<ByteBuffer> blockingPool;
  private final ConcurrentLinkedQueue<LimitedTemporaryStorage> temporaryStorages;

  /**
   * @param blockingPool merge buffer pool whose used-buffer count is reported by
   *                     {@link #getAcquiredMergeBufferCount()}
   */
  @Inject
  public GroupByStatsProvider(@Merging BlockingPool<ByteBuffer> blockingPool)
  {
    this.blockingPool = blockingPool;
    this.temporaryStorages = new ConcurrentLinkedQueue<>();
  }

  /**
   * Records one resource acquisition. Synchronized so that the running total and the count are always updated
   * together with respect to {@link #getAndResetGroupByResourceAcquisitionStats()}.
   *
   * @param delayNs time taken by the acquisition, in nanoseconds
   */
  public synchronized void groupByResourceAcquisitionTimeNs(long delayNs)
  {
    groupByResourceAcquisitionTimeNs.addAndGet(delayNs);
    groupByResourceAcquisitionCount.incrementAndGet();
  }

  /**
   * Returns the average acquisition time recorded since the previous call and resets both the running total and
   * the count to zero.
   *
   * @return average acquisition time in nanoseconds (integer division), or 0 if no acquisition was recorded since
   * the last reset
   */
  public synchronized long getAndResetGroupByResourceAcquisitionStats()
  {
    final long totalNs = groupByResourceAcquisitionTimeNs.getAndSet(0);
    final long count = groupByResourceAcquisitionCount.getAndSet(0);

    // Nothing was recorded in this period: report 0 instead of dividing by zero, which would throw
    // ArithmeticException.
    if (count == 0) {
      return 0;
    }

    return totalNs / count;
  }

  /**
   * @return the used buffer count reported by the merge buffer pool
   */
  public long getAcquiredMergeBufferCount()
  {
    return blockingPool.getUsedBufferCount();
  }

  /**
   * Registers a temporary storage so that its size is included in {@link #getSpilledBytes()}.
   *
   * @param temporaryStorage storage to track
   */
  public void registerTemporaryStorage(LimitedTemporaryStorage temporaryStorage)
  {
    temporaryStorages.add(temporaryStorage);
  }

  /**
   * Sums {@code currentSize()} over every registered temporary storage. A storage that reports
   * {@code isClosed()} is still counted in this call and is then dropped from the registry, so it does not
   * contribute to later calls.
   *
   * @return total of the current sizes of all registered temporary storages
   */
  public long getSpilledBytes()
  {
    long spilledBytes = 0;

    final Iterator<LimitedTemporaryStorage> iterator = temporaryStorages.iterator();

    while (iterator.hasNext()) {
      final LimitedTemporaryStorage limitedTemporaryStorage = iterator.next();

      // Read the size before the closed check so a storage closed since the previous call is counted one last time.
      spilledBytes += limitedTemporaryStorage.currentSize();

      if (limitedTemporaryStorage.isClosed()) {
        iterator.remove();
      }
    }

    return spilledBytes;
  }
}
Loading

0 comments on commit 6fcfcf5

Please sign in to comment.