From 6fcfcf5df4c1f16b21872f36d762e6c5a5cffc52 Mon Sep 17 00:00:00 2001 From: rishabh singh Date: Wed, 16 Oct 2024 10:46:50 +0530 Subject: [PATCH] emit metrics for groupby queries --- .../GroupByTypeInterfaceBenchmark.java | 7 +- .../CachingClusteredClientBenchmark.java | 7 +- .../benchmark/query/GroupByBenchmark.java | 7 +- docs/configuration/index.md | 2 +- docs/operations/metrics.md | 9 ++ .../segment/MapVirtualColumnGroupByTest.java | 13 ++- .../druid/guice/PeonProcessingModule.java | 6 +- .../druid/collections/BlockingPool.java | 5 + .../collections/DefaultBlockingPool.java | 6 ++ .../druid/collections/DummyBlockingPool.java | 6 ++ .../GroupByResourcesReservationPool.java | 15 ++- .../query/groupby/GroupByStatsProvider.java | 94 +++++++++++++++++++ .../druid/query/groupby/GroupingEngine.java | 21 +++-- .../GroupByMergingQueryRunner.java | 8 +- .../epinephelinae/GroupByRowProcessor.java | 6 +- .../LimitedTemporaryStorage.java | 7 +- .../apache/druid/query/TestBufferPool.java | 6 ++ ...ByLimitPushDownInsufficientBufferTest.java | 20 ++-- ...roupByLimitPushDownMultiNodeMergeTest.java | 25 +++-- .../groupby/GroupByMultiSegmentTest.java | 7 +- .../groupby/GroupByQueryMergeBufferTest.java | 6 +- .../GroupByQueryQueryToolChestTest.java | 7 +- .../GroupByQueryRunnerFailureTest.java | 8 +- .../query/groupby/GroupByQueryRunnerTest.java | 7 +- .../GroupByResourcesReservationPoolTest.java | 17 ++-- .../groupby/GroupByStatsProviderTest.java | 89 ++++++++++++++++++ .../groupby/NestedQueryPushDownTest.java | 13 ++- .../groupby/UnnestGroupByQueryRunnerTest.java | 6 +- .../epinephelinae/ConcurrentGrouperTest.java | 4 +- .../druid/segment/CursorHolderPreaggTest.java | 20 ++-- .../druid/guice/BrokerProcessingModule.java | 6 +- .../druid/guice/DruidProcessingModule.java | 6 +- .../druid/guice/RouterProcessingModule.java | 6 +- .../metrics/QueryCountStatsMonitor.java | 21 ++++- .../metrics/QueryCountStatsMonitorTest.java | 36 ++++++- 35 files changed, 445 insertions(+), 84 deletions(-) create mode 100644 processing/src/main/java/org/apache/druid/query/groupby/GroupByStatsProvider.java create mode 100644 processing/src/test/java/org/apache/druid/query/groupby/GroupByStatsProviderTest.java diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/GroupByTypeInterfaceBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/GroupByTypeInterfaceBenchmark.java index bbff131e8671..104ff153aee8 100644 --- a/benchmarks/src/test/java/org/apache/druid/benchmark/GroupByTypeInterfaceBenchmark.java +++ b/benchmarks/src/test/java/org/apache/druid/benchmark/GroupByTypeInterfaceBenchmark.java @@ -55,6 +55,7 @@ import org.apache.druid.query.groupby.GroupByQueryQueryToolChest; import org.apache.druid.query.groupby.GroupByQueryRunnerFactory; import org.apache.druid.query.groupby.GroupByResourcesReservationPool; +import org.apache.druid.query.groupby.GroupByStatsProvider; import org.apache.druid.query.groupby.GroupingEngine; import org.apache.druid.query.groupby.ResultRow; import org.apache.druid.query.spec.MultipleIntervalSegmentSpec; @@ -373,15 +374,17 @@ public String getFormatString() }; final Supplier configSupplier = Suppliers.ofInstance(config); + final GroupByStatsProvider groupByStatsProvider = new GroupByStatsProvider(mergePool); final GroupByResourcesReservationPool groupByResourcesReservationPool = - new GroupByResourcesReservationPool(mergePool, config); + new GroupByResourcesReservationPool(mergePool, config, groupByStatsProvider); final GroupingEngine groupingEngine = new GroupingEngine( druidProcessingConfig, configSupplier, groupByResourcesReservationPool, TestHelper.makeJsonMapper(), new ObjectMapper(new SmileFactory()), - QueryBenchmarkUtil.NOOP_QUERYWATCHER + QueryBenchmarkUtil.NOOP_QUERYWATCHER, + groupByStatsProvider ); factory = new GroupByQueryRunnerFactory( diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java index 8e0715e0fe5c..82bd207781cf 100644 --- a/benchmarks/src/test/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java +++ b/benchmarks/src/test/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java @@ -81,6 +81,7 @@ import org.apache.druid.query.groupby.GroupByQueryRunnerFactory; import org.apache.druid.query.groupby.GroupByQueryRunnerTest; import org.apache.druid.query.groupby.GroupByResourcesReservationPool; +import org.apache.druid.query.groupby.GroupByStatsProvider; import org.apache.druid.query.groupby.GroupingEngine; import org.apache.druid.query.groupby.ResultRow; import org.apache.druid.query.planning.DataSourceAnalysis; @@ -357,15 +358,17 @@ private static GroupByQueryRunnerFactory makeGroupByQueryRunnerFactory( bufferSupplier, processingConfig.getNumMergeBuffers() ); + final GroupByStatsProvider groupByStatsProvider = new GroupByStatsProvider(mergeBufferPool); final GroupByResourcesReservationPool groupByResourcesReservationPool = - new GroupByResourcesReservationPool(mergeBufferPool, config); + new GroupByResourcesReservationPool(mergeBufferPool, config, groupByStatsProvider); final GroupingEngine groupingEngine = new GroupingEngine( processingConfig, configSupplier, groupByResourcesReservationPool, mapper, mapper, - QueryRunnerTestHelper.NOOP_QUERYWATCHER + QueryRunnerTestHelper.NOOP_QUERYWATCHER, + groupByStatsProvider ); final GroupByQueryQueryToolChest toolChest = new GroupByQueryQueryToolChest(groupingEngine, groupByResourcesReservationPool); return new GroupByQueryRunnerFactory(groupingEngine, toolChest, bufferPool); diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/query/GroupByBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/query/GroupByBenchmark.java index 5ab19b6235f7..0216f28ddd73 100644 --- a/benchmarks/src/test/java/org/apache/druid/benchmark/query/GroupByBenchmark.java +++ b/benchmarks/src/test/java/org/apache/druid/benchmark/query/GroupByBenchmark.java @@ -66,6 +66,7 @@ import org.apache.druid.query.groupby.GroupByQueryQueryToolChest; import org.apache.druid.query.groupby.GroupByQueryRunnerFactory; import org.apache.druid.query.groupby.GroupByResourcesReservationPool; +import org.apache.druid.query.groupby.GroupByStatsProvider; import org.apache.druid.query.groupby.GroupingEngine; import org.apache.druid.query.groupby.ResultRow; import org.apache.druid.query.groupby.orderby.DefaultLimitSpec; @@ -490,15 +491,17 @@ public String getFormatString() }; final Supplier configSupplier = Suppliers.ofInstance(config); + final GroupByStatsProvider groupByStatsProvider = new GroupByStatsProvider(mergePool); final GroupByResourcesReservationPool groupByResourcesReservationPool = - new GroupByResourcesReservationPool(mergePool, config); + new GroupByResourcesReservationPool(mergePool, config, groupByStatsProvider); final GroupingEngine groupingEngine = new GroupingEngine( druidProcessingConfig, configSupplier, groupByResourcesReservationPool, TestHelper.makeJsonMapper(), new ObjectMapper(new SmileFactory()), - QueryBenchmarkUtil.NOOP_QUERYWATCHER + QueryBenchmarkUtil.NOOP_QUERYWATCHER, + groupByStatsProvider ); factory = new GroupByQueryRunnerFactory( diff --git a/docs/configuration/index.md b/docs/configuration/index.md index 427c8b8d16b9..6684473961ee 100644 --- a/docs/configuration/index.md +++ b/docs/configuration/index.md @@ -401,7 +401,7 @@ Metric monitoring is an essential part of Druid operations. The following monito |`org.apache.druid.java.util.metrics.CgroupV2MemoryMonitor`| **EXPERIMENTAL** Reports memory usage from `memory.current` and `memory.max` files. Only applicable to `cgroupv2`.| |`org.apache.druid.server.metrics.HistoricalMetricsMonitor`|Reports statistics on Historical services. Available only on Historical services.| |`org.apache.druid.server.metrics.SegmentStatsMonitor` | **EXPERIMENTAL** Reports statistics about segments on Historical services. Available only on Historical services. Not to be used when lazy loading is configured.| -|`org.apache.druid.server.metrics.QueryCountStatsMonitor`|Reports how many queries have been successful/failed/interrupted.| +|`org.apache.druid.server.metrics.QueryCountStatsMonitor`|Reports how many queries have been successful/failed/interrupted. It also reports stats for group by queries.| |`org.apache.druid.server.metrics.SubqueryCountStatsMonitor`|Reports how many subqueries have been materialized as rows or bytes and various other statistics related to the subquery execution| |`org.apache.druid.server.emitter.HttpEmittingMonitor`|Reports internal metrics of `http` or `parametrized` emitter (see below). Must not be used with another emitter type. See the description of the metrics here: .| |`org.apache.druid.server.metrics.TaskCountStatsMonitor`|Reports how many ingestion tasks are currently running/pending/waiting and also the number of successful/failed tasks per emission period.| diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md index 4df9e7987ccc..0f6837bc2f1e 100644 --- a/docs/operations/metrics.md +++ b/docs/operations/metrics.md @@ -86,6 +86,9 @@ Most metric values reset each emission period, as specified in `druid.monitoring |`subquery/fallback/unknownReason/count`|Number of subqueries which cannot be materialized as frames due other reasons.|This metric is only available if the `SubqueryCountStatsMonitor` module is included.| | |`query/rowLimit/exceeded/count`|Number of queries whose inlined subquery results exceeded the given row limit|This metric is only available if the `SubqueryCountStatsMonitor` module is included.| | |`query/byteLimit/exceeded/count`|Number of queries whose inlined subquery results exceeded the given byte limit|This metric is only available if the `SubqueryCountStatsMonitor` module is included.| | +|`mergeBuffer/acquiredCount`|Number of merge buffers acquired from the merge buffer pool.|This metric is only available if the `QueryCountStatsMonitor` module is included.| | +|`groupBy/acquisitionTimeNs`|Average time in nanos to acquire resource for group by queries.|This metric is only available if the `QueryCountStatsMonitor` module is included.| | +|`groupBy/spilledBytes`|Number of bytes spilled on the disk by the groupBy queries.|This metric is only available if the `QueryCountStatsMonitor` module is included.| | ### Historical @@ -104,6 +107,9 @@ Most metric values reset each emission period, as specified in `druid.monitoring |`query/interrupted/count`|Number of queries interrupted due to cancellation.|This metric is only available if the `QueryCountStatsMonitor` module is included.|| |`query/timeout/count`|Number of timed out queries.|This metric is only available if the `QueryCountStatsMonitor` module is included.|| |`mergeBuffer/pendingRequests`|Number of requests waiting to acquire a batch of buffers from the merge buffer pool.|This metric is only available if the `QueryCountStatsMonitor` module is included.|| +|`mergeBuffer/acquiredCount`|Number of merge buffers acquired from the merge buffer pool.|This metric is only available if the `QueryCountStatsMonitor` module is included.| | +|`groupBy/acquisitionTimeNs`|Average time in nanos to acquire resource for group by queries.|This metric is only available if the `QueryCountStatsMonitor` module is included.| | +|`groupBy/spilledBytes`|Number of bytes spilled on the disk by the groupBy queries.|This metric is only available if the `QueryCountStatsMonitor` module is included.| | ### Real-time @@ -120,6 +126,9 @@ Most metric values reset each emission period, as specified in `druid.monitoring |`query/interrupted/count`|Number of queries interrupted due to cancellation.|This metric is only available if the `QueryCountStatsMonitor` module is included.|| |`query/timeout/count`|Number of timed out queries.|This metric is only available if the `QueryCountStatsMonitor` module is included.|| |`mergeBuffer/pendingRequests`|Number of requests waiting to acquire a batch of buffers from the merge buffer pool.|This metric is only available if the `QueryCountStatsMonitor` module is included.|| +|`mergeBuffer/acquiredCount`|Number of merge buffers acquired from the merge buffer pool.|This metric is only available if the `QueryCountStatsMonitor` module is included.| | +|`groupBy/acquisitionTimeNs`|Average time in nanos to acquire resource for group by queries.|This metric is only available if the `QueryCountStatsMonitor` module is included.| | +|`groupBy/spilledBytes`|Number of bytes spilled on the disk by the groupBy queries.|This metric is only available if the `QueryCountStatsMonitor` module is included.| | ### Jetty diff --git a/extensions-contrib/virtual-columns/src/test/java/org/apache/druid/segment/MapVirtualColumnGroupByTest.java b/extensions-contrib/virtual-columns/src/test/java/org/apache/druid/segment/MapVirtualColumnGroupByTest.java index c1fed4bc5034..598af0b96b17 100644 --- a/extensions-contrib/virtual-columns/src/test/java/org/apache/druid/segment/MapVirtualColumnGroupByTest.java +++ b/extensions-contrib/virtual-columns/src/test/java/org/apache/druid/segment/MapVirtualColumnGroupByTest.java @@ -20,6 +20,7 @@ package org.apache.druid.segment; import com.google.common.collect.ImmutableList; +import org.apache.druid.collections.BlockingPool; import org.apache.druid.collections.DefaultBlockingPool; import org.apache.druid.collections.StupidPool; import org.apache.druid.common.config.NullHandling; @@ -44,6 +45,7 @@ import org.apache.druid.query.groupby.GroupByQueryQueryToolChest; import org.apache.druid.query.groupby.GroupByQueryRunnerFactory; import org.apache.druid.query.groupby.GroupByResourcesReservationPool; +import org.apache.druid.query.groupby.GroupByStatsProvider; import org.apache.druid.query.groupby.GroupingEngine; import org.apache.druid.query.groupby.ResultRow; import org.apache.druid.query.spec.MultipleIntervalSegmentSpec; @@ -70,8 +72,14 @@ public void setup() throws IOException { final IncrementalIndex incrementalIndex = MapVirtualColumnTestBase.generateIndex(); final GroupByQueryConfig config = new GroupByQueryConfig(); + + final BlockingPool mergePool = + new DefaultBlockingPool<>(() -> ByteBuffer.allocate(1024), 1); + final GroupByStatsProvider groupByStatsProvider = new GroupByStatsProvider(mergePool); + final GroupByResourcesReservationPool groupByResourcesReservationPool = - new GroupByResourcesReservationPool(new DefaultBlockingPool<>(() -> ByteBuffer.allocate(1024), 1), config); + new GroupByResourcesReservationPool(mergePool, config, groupByStatsProvider); + final GroupingEngine groupingEngine = new GroupingEngine( new DruidProcessingConfig() { @@ -103,7 +111,8 @@ public int getNumThreads() groupByResourcesReservationPool, TestHelper.makeJsonMapper(), new DefaultObjectMapper(), - QueryRunnerTestHelper.NOOP_QUERYWATCHER + QueryRunnerTestHelper.NOOP_QUERYWATCHER, + groupByStatsProvider ); final GroupByQueryRunnerFactory factory = new GroupByQueryRunnerFactory( diff --git a/indexing-service/src/main/java/org/apache/druid/guice/PeonProcessingModule.java b/indexing-service/src/main/java/org/apache/druid/guice/PeonProcessingModule.java index 8961da7a5559..35e520789ef4 100644 --- a/indexing-service/src/main/java/org/apache/druid/guice/PeonProcessingModule.java +++ b/indexing-service/src/main/java/org/apache/druid/guice/PeonProcessingModule.java @@ -42,6 +42,7 @@ import org.apache.druid.query.QueryProcessingPool; import org.apache.druid.query.groupby.GroupByQueryConfig; import org.apache.druid.query.groupby.GroupByResourcesReservationPool; +import org.apache.druid.query.groupby.GroupByStatsProvider; import java.nio.ByteBuffer; @@ -135,9 +136,10 @@ public BlockingPool getMergeBufferPool(Task task, DruidProcessingCon @Merging public GroupByResourcesReservationPool getGroupByResourcesReservationPool( @Merging BlockingPool mergeBufferPool, - GroupByQueryConfig groupByQueryConfig + GroupByQueryConfig groupByQueryConfig, + GroupByStatsProvider groupByStatsProvider ) { - return new GroupByResourcesReservationPool(mergeBufferPool, groupByQueryConfig); + return new GroupByResourcesReservationPool(mergeBufferPool, groupByQueryConfig, groupByStatsProvider); } } diff --git a/processing/src/main/java/org/apache/druid/collections/BlockingPool.java b/processing/src/main/java/org/apache/druid/collections/BlockingPool.java index 4fb3ff66d8bf..a3e6123338df 100644 --- a/processing/src/main/java/org/apache/druid/collections/BlockingPool.java +++ b/processing/src/main/java/org/apache/druid/collections/BlockingPool.java @@ -49,4 +49,9 @@ public interface BlockingPool * @return count of pending requests */ long getPendingRequests(); + + /** + * @return number of used buffers from the pool + */ + long getUsedBufferCount(); } diff --git a/processing/src/main/java/org/apache/druid/collections/DefaultBlockingPool.java b/processing/src/main/java/org/apache/druid/collections/DefaultBlockingPool.java index e41a9e5d75d4..4002230caf01 100644 --- a/processing/src/main/java/org/apache/druid/collections/DefaultBlockingPool.java +++ b/processing/src/main/java/org/apache/druid/collections/DefaultBlockingPool.java @@ -129,6 +129,12 @@ public long getPendingRequests() return pendingRequests.get(); } + @Override + public long getUsedBufferCount() + { + return maxSize - objects.size(); + } + private List pollObjects(int elementNum) throws InterruptedException { final List list = new ArrayList<>(elementNum); diff --git a/processing/src/main/java/org/apache/druid/collections/DummyBlockingPool.java b/processing/src/main/java/org/apache/druid/collections/DummyBlockingPool.java index 2553f9ab425f..228ca5bca78b 100644 --- a/processing/src/main/java/org/apache/druid/collections/DummyBlockingPool.java +++ b/processing/src/main/java/org/apache/druid/collections/DummyBlockingPool.java @@ -61,4 +61,10 @@ public long getPendingRequests() { return 0; } + + @Override + public long getUsedBufferCount() + { + return 0; + } } diff --git a/processing/src/main/java/org/apache/druid/query/groupby/GroupByResourcesReservationPool.java b/processing/src/main/java/org/apache/druid/query/groupby/GroupByResourcesReservationPool.java index 9c65ce445b1b..67c6216a80c4 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/GroupByResourcesReservationPool.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/GroupByResourcesReservationPool.java @@ -86,26 +86,30 @@ public class GroupByResourcesReservationPool /** * Map of query's resource id -> group by resources reserved for the query to execute */ - final ConcurrentHashMap> pool = new ConcurrentHashMap<>(); + private final ConcurrentHashMap> pool = new ConcurrentHashMap<>(); /** * Buffer pool from where the merge buffers are picked and reserved */ - final BlockingPool mergeBufferPool; + private final BlockingPool mergeBufferPool; /** * Group by query config of the server */ - final GroupByQueryConfig groupByQueryConfig; + private final GroupByQueryConfig groupByQueryConfig; + + private final GroupByStatsProvider groupByStatsProvider; @Inject public GroupByResourcesReservationPool( @Merging BlockingPool mergeBufferPool, - GroupByQueryConfig groupByQueryConfig + GroupByQueryConfig groupByQueryConfig, + GroupByStatsProvider groupByStatsProvider ) { this.mergeBufferPool = mergeBufferPool; this.groupByQueryConfig = groupByQueryConfig; + this.groupByStatsProvider = groupByStatsProvider; } /** @@ -114,6 +118,7 @@ public GroupByResourcesReservationPool( */ public void reserve(QueryResourceId queryResourceId, GroupByQuery groupByQuery, boolean willMergeRunner) { + long startNs = System.nanoTime(); if (queryResourceId == null) { throw DruidException.defensive("Query resource id must be populated"); } @@ -145,6 +150,8 @@ public void reserve(QueryResourceId queryResourceId, GroupByQuery groupByQuery, // Resources have been allocated, spot has been reserved. The reference would ALWAYS refer to 'null'. Refer the // allocated resources from it reference.compareAndSet(null, resources); + + groupByStatsProvider.groupByResourceAcquisitionTimeNs(System.nanoTime() - startNs); } /** diff --git a/processing/src/main/java/org/apache/druid/query/groupby/GroupByStatsProvider.java b/processing/src/main/java/org/apache/druid/query/groupby/GroupByStatsProvider.java new file mode 100644 index 000000000000..c5b5cad5afd3 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/groupby/GroupByStatsProvider.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.groupby; + +import org.apache.druid.collections.BlockingPool; +import org.apache.druid.guice.annotations.Merging; +import org.apache.druid.query.groupby.epinephelinae.LimitedTemporaryStorage; + +import javax.inject.Inject; +import java.nio.ByteBuffer; +import java.util.Iterator; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicLong; + +/** + * Collects stats for group by queries like used merged buffer count, spilled bytes and group by resource acquisition time. + */ +public class GroupByStatsProvider +{ + private final AtomicLong groupByResourceAcquisitionTimeNs = new AtomicLong(0); + private final AtomicLong groupByResourceAcquisitionCount = new AtomicLong(0); + + private final BlockingPool blockingPool; + private final ConcurrentLinkedQueue temporaryStorages; + + @Inject + public GroupByStatsProvider(@Merging BlockingPool blockingPool) + { + this.blockingPool = blockingPool; + this.temporaryStorages = new ConcurrentLinkedQueue<>(); + } + + public synchronized void groupByResourceAcquisitionTimeNs(long delayNs) + { + groupByResourceAcquisitionTimeNs.addAndGet(delayNs); + groupByResourceAcquisitionCount.incrementAndGet(); + } + + public synchronized long getAndResetGroupByResourceAcquisitionStats() + { + long average = (groupByResourceAcquisitionTimeNs.get() / groupByResourceAcquisitionCount.get()); + + groupByResourceAcquisitionTimeNs.set(0); + groupByResourceAcquisitionCount.set(0); + + return average; + } + + public long getAcquiredMergeBufferCount() + { + return blockingPool.getUsedBufferCount(); + } + + public void registerTemporaryStorage(LimitedTemporaryStorage temporaryStorage) + { + temporaryStorages.add(temporaryStorage); + } + + public long getSpilledBytes() + { + long spilledBytes = 0; + + Iterator iterator = temporaryStorages.iterator(); + + while (iterator.hasNext()) { + LimitedTemporaryStorage limitedTemporaryStorage = iterator.next(); + + spilledBytes += limitedTemporaryStorage.currentSize(); + + if (limitedTemporaryStorage.isClosed()) { + iterator.remove(); + } + } + + return spilledBytes; + } +} diff --git a/processing/src/main/java/org/apache/druid/query/groupby/GroupingEngine.java b/processing/src/main/java/org/apache/druid/query/groupby/GroupingEngine.java index 67583fc1fc0a..3c4de063fdb5 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/GroupingEngine.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/GroupingEngine.java @@ -121,6 +121,7 @@ public class GroupingEngine private final ObjectMapper jsonMapper; private final ObjectMapper spillMapper; private final QueryWatcher queryWatcher; + private final GroupByStatsProvider groupByStatsProvider; @Inject public GroupingEngine( @@ -129,7 +130,8 @@ public GroupingEngine( @Merging GroupByResourcesReservationPool groupByResourcesReservationPool, @Json ObjectMapper jsonMapper, @Smile ObjectMapper spillMapper, - QueryWatcher queryWatcher + QueryWatcher queryWatcher, + GroupByStatsProvider groupByStatsProvider ) { this.processingConfig = processingConfig; @@ -138,6 +140,7 @@ public GroupingEngine( this.jsonMapper = jsonMapper; this.spillMapper = spillMapper; this.queryWatcher = queryWatcher; + this.groupByStatsProvider = groupByStatsProvider; } /** @@ -452,7 +455,8 @@ public QueryRunner mergeRunners( processingConfig.getNumThreads(), processingConfig.intermediateComputeSizeBytes(), spillMapper, - processingConfig.getTmpDir() + processingConfig.getTmpDir(), + groupByStatsProvider ); } @@ -575,7 +579,7 @@ public Sequence applyPostProcessing(Sequence results, Grou * * @param subquery inner query * @param query outer query - * @param resource resources returned by {@link #prepareResource(GroupByQuery, BlockingPool, boolean, GroupByQueryConfig)} + * @param resource resources returned by {@link #prepareResource(GroupByQuery, BlockingPool, boolean, GroupByQueryConfig, GroupByStatsProvider)} * @param subqueryResult result rows from the subquery * @param wasQueryPushedDown true if the outer query was pushed down (so we only need to merge the outer query's * results, not run it from scratch like a normal outer query) @@ -614,7 +618,8 @@ public Sequence processSubqueryResult( resource, spillMapper, processingConfig.getTmpDir(), - processingConfig.intermediateComputeSizeBytes() + processingConfig.intermediateComputeSizeBytes(), + groupByStatsProvider ); final GroupByRowProcessor.ResultSupplier finalResultSupplier = resultSupplier; @@ -636,7 +641,7 @@ public Sequence processSubqueryResult( * Called by {@link GroupByQueryQueryToolChest#mergeResults(QueryRunner)} when it needs to generate subtotals. * * @param query query that has a "subtotalsSpec" - * @param resource resources returned by {@link #prepareResource(GroupByQuery, BlockingPool, boolean, GroupByQueryConfig)} + * @param resource resources returned by {@link #prepareResource(GroupByQuery, BlockingPool, boolean, GroupByQueryConfig, GroupByStatsProvider)} * @param queryResult result rows from the main query * * @return results for each list of subtotals in the query, concatenated together @@ -695,7 +700,8 @@ public Sequence processSubtotalsSpec( resource, spillMapper, processingConfig.getTmpDir(), - processingConfig.intermediateComputeSizeBytes() + processingConfig.intermediateComputeSizeBytes(), + groupByStatsProvider ); List queryDimNamesInOrder = baseSubtotalQuery.getDimensionNamesInOrder(); @@ -757,7 +763,8 @@ public Sequence processSubtotalsSpec( resource, spillMapper, processingConfig.getTmpDir(), - processingConfig.intermediateComputeSizeBytes() + processingConfig.intermediateComputeSizeBytes(), + groupByStatsProvider ); subtotalsResults.add( diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByMergingQueryRunner.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByMergingQueryRunner.java index 75e8fa2c2ce0..e2cc324908b4 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByMergingQueryRunner.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByMergingQueryRunner.java @@ -59,6 +59,7 @@ import org.apache.druid.query.groupby.GroupByQueryConfig; import org.apache.druid.query.groupby.GroupByQueryResources; import org.apache.druid.query.groupby.GroupByResourcesReservationPool; +import org.apache.druid.query.groupby.GroupByStatsProvider; import org.apache.druid.query.groupby.ResultRow; import org.apache.druid.query.groupby.epinephelinae.RowBasedGrouperHelper.RowBasedKey; @@ -103,6 +104,7 @@ public class GroupByMergingQueryRunner implements QueryRunner private final ObjectMapper spillMapper; private final String processingTmpDir; private final int mergeBufferSize; + private final GroupByStatsProvider groupByStatsProvider; public GroupByMergingQueryRunner( GroupByQueryConfig config, @@ -114,7 +116,8 @@ public GroupByMergingQueryRunner( int concurrencyHint, int mergeBufferSize, ObjectMapper spillMapper, - String processingTmpDir + String processingTmpDir, + GroupByStatsProvider groupByStatsProvider ) { this.config = config; @@ -127,6 +130,7 @@ public GroupByMergingQueryRunner( this.spillMapper = spillMapper; this.processingTmpDir = processingTmpDir; this.mergeBufferSize = mergeBufferSize; + this.groupByStatsProvider = groupByStatsProvider; } @Override @@ -188,6 +192,8 @@ public CloseableGrouperIterator make() ReferenceCountingResourceHolder.fromCloseable(temporaryStorage); resources.register(temporaryStorageHolder); + groupByStatsProvider.registerTemporaryStorage(temporaryStorage); + // If parallelCombine is enabled, we need two merge buffers for parallel aggregating and parallel combining final int numMergeBuffers = querySpecificConfig.getNumParallelCombineThreads() > 1 ? 2 : 1; diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByRowProcessor.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByRowProcessor.java index c040c5b64658..2980705e4ad1 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByRowProcessor.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByRowProcessor.java @@ -34,6 +34,7 @@ import org.apache.druid.query.groupby.GroupByQuery; import org.apache.druid.query.groupby.GroupByQueryConfig; import org.apache.druid.query.groupby.GroupByQueryResources; +import org.apache.druid.query.groupby.GroupByStatsProvider; import org.apache.druid.query.groupby.ResultRow; import org.apache.druid.query.groupby.epinephelinae.RowBasedGrouperHelper.RowBasedKey; @@ -93,7 +94,8 @@ public static ResultSupplier process( final GroupByQueryResources resource, final ObjectMapper spillMapper, final String processingTmpDir, - final int mergeBufferSize + final int mergeBufferSize, + final GroupByStatsProvider groupByStatsProvider ) { final Closer closeOnExit = Closer.create(); @@ -111,6 +113,8 @@ public static ResultSupplier process( closeOnExit.register(temporaryStorage); + groupByStatsProvider.registerTemporaryStorage(temporaryStorage); + Pair, Accumulator> pair = RowBasedGrouperHelper.createGrouperAccumulatorPair( query, subquery, diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/LimitedTemporaryStorage.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/LimitedTemporaryStorage.java index 310c7ef49166..476f88340cf1 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/LimitedTemporaryStorage.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/LimitedTemporaryStorage.java @@ -121,7 +121,7 @@ public long maxSize() } @VisibleForTesting - long currentSize() + public long currentSize() { return bytesUsed.get(); } @@ -144,6 +144,11 @@ public void close() } } + public boolean isClosed() + { + return closed; + } + public class LimitedOutputStream extends OutputStream { private final File file; diff --git a/processing/src/test/java/org/apache/druid/query/TestBufferPool.java b/processing/src/test/java/org/apache/druid/query/TestBufferPool.java index a650437f83f0..acda9b1af1e4 100644 --- a/processing/src/test/java/org/apache/druid/query/TestBufferPool.java +++ b/processing/src/test/java/org/apache/druid/query/TestBufferPool.java @@ -138,6 +138,12 @@ public long getPendingRequests() return 0; } + @Override + public long getUsedBufferCount() + { + return takenFromMap.size(); + } + public long getOutstandingObjectCount() { return takenFromMap.size(); diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownInsufficientBufferTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownInsufficientBufferTest.java index 1eb8774c207f..10b4b137efa0 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownInsufficientBufferTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownInsufficientBufferTest.java @@ -325,13 +325,19 @@ public String getFormatString() }; final Supplier configSupplier = Suppliers.ofInstance(config); - GroupByResourcesReservationPool groupByResourcesReservationPool = new GroupByResourcesReservationPool( + + final GroupByStatsProvider groupByStatsProvider = new GroupByStatsProvider(mergePool); + final GroupByStatsProvider tooSmallGroupByStatsProvider = new GroupByStatsProvider(tooSmallMergePool); + + final GroupByResourcesReservationPool groupByResourcesReservationPool = new GroupByResourcesReservationPool( mergePool, - config + config, + groupByStatsProvider ); - GroupByResourcesReservationPool tooSmallGroupByResourcesReservationPool = new GroupByResourcesReservationPool( + final GroupByResourcesReservationPool tooSmallGroupByResourcesReservationPool = new GroupByResourcesReservationPool( tooSmallMergePool, - config + config, + tooSmallGroupByStatsProvider ); final GroupingEngine groupingEngine = new GroupingEngine( druidProcessingConfig, @@ -339,7 +345,8 @@ public String getFormatString() groupByResourcesReservationPool, TestHelper.makeJsonMapper(), new ObjectMapper(new SmileFactory()), - NOOP_QUERYWATCHER + NOOP_QUERYWATCHER, + groupByStatsProvider ); final GroupingEngine tooSmallEngine = new GroupingEngine( @@ -348,7 +355,8 @@ public String getFormatString() tooSmallGroupByResourcesReservationPool, TestHelper.makeJsonMapper(), new ObjectMapper(new SmileFactory()), - NOOP_QUERYWATCHER + NOOP_QUERYWATCHER, + groupByStatsProvider ); groupByFactory = new GroupByQueryRunnerFactory( diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownMultiNodeMergeTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownMultiNodeMergeTest.java index 987d9a03f2dc..240baf79c1fe 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownMultiNodeMergeTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownMultiNodeMergeTest.java @@ -578,12 +578,16 @@ public String getFormatString() }; final Supplier configSupplier = Suppliers.ofInstance(config); - GroupByResourcesReservationPool groupByResourcesReservationPoolBroker = - new GroupByResourcesReservationPool(mergePoolBroker, config); - GroupByResourcesReservationPool groupByResourcesReservationPoolHistorical = - new GroupByResourcesReservationPool(mergePoolHistorical, config); - GroupByResourcesReservationPool groupByResourcesReservationPoolHistorical2 = - new GroupByResourcesReservationPool(mergePoolHistorical2, config); + final GroupByStatsProvider groupByStatsProviderBroker = new GroupByStatsProvider(mergePoolBroker); + final GroupByStatsProvider groupByStatsProviderHistorical = new GroupByStatsProvider(mergePoolHistorical); + final GroupByStatsProvider groupByStatsProviderHistorical2 = new GroupByStatsProvider(mergePoolHistorical2); + + final GroupByResourcesReservationPool groupByResourcesReservationPoolBroker = + new GroupByResourcesReservationPool(mergePoolBroker, config, groupByStatsProviderBroker); + final GroupByResourcesReservationPool groupByResourcesReservationPoolHistorical = + new GroupByResourcesReservationPool(mergePoolHistorical, config, groupByStatsProviderHistorical); + final GroupByResourcesReservationPool groupByResourcesReservationPoolHistorical2 = + new GroupByResourcesReservationPool(mergePoolHistorical2, config, groupByStatsProviderHistorical2); final GroupingEngine groupingEngineBroker = new GroupingEngine( druidProcessingConfig, @@ -591,7 +595,8 @@ public String getFormatString() groupByResourcesReservationPoolBroker, TestHelper.makeJsonMapper(), new ObjectMapper(new SmileFactory()), - NOOP_QUERYWATCHER + NOOP_QUERYWATCHER, + groupByStatsProviderBroker ); final GroupingEngine groupingEngineHistorical = new GroupingEngine( druidProcessingConfig, @@ -599,7 +604,8 @@ public String getFormatString() groupByResourcesReservationPoolHistorical, TestHelper.makeJsonMapper(), new ObjectMapper(new SmileFactory()), - NOOP_QUERYWATCHER + NOOP_QUERYWATCHER, + groupByStatsProviderHistorical ); final GroupingEngine groupingEngineHistorical2 = new GroupingEngine( druidProcessingConfig, @@ -607,7 +613,8 @@ public String getFormatString() groupByResourcesReservationPoolHistorical2, TestHelper.makeJsonMapper(), new ObjectMapper(new SmileFactory()), - NOOP_QUERYWATCHER + NOOP_QUERYWATCHER, + groupByStatsProviderHistorical2 ); groupByFactoryBroker = new GroupByQueryRunnerFactory( diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByMultiSegmentTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByMultiSegmentTest.java index 9632ceba0b17..f8240eb84291 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByMultiSegmentTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByMultiSegmentTest.java @@ -241,14 +241,17 @@ public String getFormatString() }; final Supplier configSupplier = Suppliers.ofInstance(config); - final GroupByResourcesReservationPool groupByResourcesReservationPool = new GroupByResourcesReservationPool(mergePool, config); + final GroupByStatsProvider groupByStatsProvider = new GroupByStatsProvider(mergePool); + final GroupByResourcesReservationPool groupByResourcesReservationPool = + new GroupByResourcesReservationPool(mergePool, config, groupByStatsProvider); final GroupingEngine groupingEngine = new GroupingEngine( druidProcessingConfig, configSupplier, groupByResourcesReservationPool, TestHelper.makeJsonMapper(), new ObjectMapper(new SmileFactory()), - NOOP_QUERYWATCHER + NOOP_QUERYWATCHER, + groupByStatsProvider ); groupByFactory = new GroupByQueryRunnerFactory( diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryMergeBufferTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryMergeBufferTest.java index 77f116aa6e40..e4f00d61aa8b 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryMergeBufferTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryMergeBufferTest.java @@ -124,8 +124,9 @@ private static GroupByQueryRunnerFactory makeQueryRunnerFactory( ) { final Supplier configSupplier = Suppliers.ofInstance(config); + final GroupByStatsProvider groupByStatsProvider = new GroupByStatsProvider(MERGE_BUFFER_POOL); final GroupByResourcesReservationPool groupByResourcesReservationPool = - new GroupByResourcesReservationPool(MERGE_BUFFER_POOL, config); + new GroupByResourcesReservationPool(MERGE_BUFFER_POOL, config, groupByStatsProvider); final GroupingEngine groupingEngine = new GroupingEngine( PROCESSING_CONFIG, @@ -133,7 +134,8 @@ private static GroupByQueryRunnerFactory makeQueryRunnerFactory( groupByResourcesReservationPool, TestHelper.makeJsonMapper(), mapper, - QueryRunnerTestHelper.NOOP_QUERYWATCHER + QueryRunnerTestHelper.NOOP_QUERYWATCHER, + groupByStatsProvider ); final GroupByQueryQueryToolChest toolChest = new GroupByQueryQueryToolChest( groupingEngine, diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChestTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChestTest.java index 15c127c68cbb..ecb8f620a944 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChestTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChestTest.java @@ -1295,14 +1295,17 @@ public String getFormatString() bufferSupplier, processingConfig.getNumMergeBuffers() ); - GroupByResourcesReservationPool groupByResourcesReservationPool = new GroupByResourcesReservationPool(mergeBufferPool, queryConfig); + final GroupByStatsProvider groupByStatsProvider = new GroupByStatsProvider(mergeBufferPool); + final GroupByResourcesReservationPool groupByResourcesReservationPool = + new GroupByResourcesReservationPool(mergeBufferPool, queryConfig, groupByStatsProvider); final GroupingEngine groupingEngine = new GroupingEngine( processingConfig, queryConfigSupplier, groupByResourcesReservationPool, TestHelper.makeJsonMapper(), new ObjectMapper(new SmileFactory()), - QueryRunnerTestHelper.NOOP_QUERYWATCHER + QueryRunnerTestHelper.NOOP_QUERYWATCHER, + groupByStatsProvider ); final GroupByQueryQueryToolChest queryToolChest = new GroupByQueryQueryToolChest(groupingEngine, groupByResourcesReservationPool); final ObjectMapper mapper = TestHelper.makeJsonMapper(); diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerFailureTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerFailureTest.java index a3eb5ef724d5..2fc27e7d8041 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerFailureTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerFailureTest.java @@ -97,15 +97,17 @@ private static GroupByQueryRunnerFactory makeQueryRunnerFactory( ) { final Supplier configSupplier = Suppliers.ofInstance(config); - GroupByResourcesReservationPool groupByResourcesReservationPool = - new GroupByResourcesReservationPool(MERGE_BUFFER_POOL, config); + final GroupByStatsProvider groupByStatsProvider = new GroupByStatsProvider(MERGE_BUFFER_POOL); + final GroupByResourcesReservationPool groupByResourcesReservationPool = + new GroupByResourcesReservationPool(MERGE_BUFFER_POOL, config, groupByStatsProvider); final GroupingEngine groupingEngine = new GroupingEngine( DEFAULT_PROCESSING_CONFIG, configSupplier, groupByResourcesReservationPool, TestHelper.makeJsonMapper(), mapper, - QueryRunnerTestHelper.NOOP_QUERYWATCHER + QueryRunnerTestHelper.NOOP_QUERYWATCHER, + groupByStatsProvider ); final GroupByQueryQueryToolChest toolChest = new GroupByQueryQueryToolChest(groupingEngine, groupByResourcesReservationPool); diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java index cc36d0009570..7e5c4d38be9a 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java @@ -231,6 +231,7 @@ public String toString() return "v2"; } }; + final GroupByQueryConfig v2SmallBufferConfig = new GroupByQueryConfig() { @@ -356,15 +357,17 @@ public static GroupByQueryRunnerFactory makeQueryRunnerFactory( ); } final Supplier configSupplier = Suppliers.ofInstance(config); + final GroupByStatsProvider groupByStatsProvider = new GroupByStatsProvider(bufferPools.getMergePool()); final GroupByResourcesReservationPool groupByResourcesReservationPool = - new GroupByResourcesReservationPool(bufferPools.getMergePool(), config); + new GroupByResourcesReservationPool(bufferPools.getMergePool(), config, groupByStatsProvider); final GroupingEngine groupingEngine = new GroupingEngine( processingConfig, configSupplier, groupByResourcesReservationPool, mapper, mapper, - QueryRunnerTestHelper.NOOP_QUERYWATCHER + QueryRunnerTestHelper.NOOP_QUERYWATCHER, + groupByStatsProvider ); final GroupByQueryQueryToolChest toolChest = new GroupByQueryQueryToolChest( groupingEngine, diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByResourcesReservationPoolTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByResourcesReservationPoolTest.java index 8d0f2d9e37dc..5ad14c81f875 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByResourcesReservationPoolTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByResourcesReservationPoolTest.java @@ -93,8 +93,11 @@ public void testInterleavedReserveAndRemove() // Blocking pool with a single buffer, which means only one of the queries can succeed at a time BlockingPool mergeBufferPool = new DefaultBlockingPool<>(() -> ByteBuffer.allocate(100), 1); - GroupByResourcesReservationPool groupByResourcesReservationPool = - new GroupByResourcesReservationPool(mergeBufferPool, CONFIG); + + final GroupByStatsProvider groupByStatsProvider = new GroupByStatsProvider(mergeBufferPool); + + final GroupByResourcesReservationPool groupByResourcesReservationPool = + new GroupByResourcesReservationPool(mergeBufferPool, CONFIG, groupByStatsProvider); // Latch indicating that the first thread has called reservationPool.reserve() CountDownLatch reserveCalledByFirstThread = new CountDownLatch(1); @@ -191,8 +194,9 @@ public boolean equals(Object o) public void testMultipleSimultaneousAllocationAttemptsFail() { BlockingPool mergeBufferPool = new DefaultBlockingPool<>(() -> ByteBuffer.allocate(100), 1); - GroupByResourcesReservationPool groupByResourcesReservationPool = - new GroupByResourcesReservationPool(mergeBufferPool, CONFIG); + final GroupByStatsProvider groupByStatsProvider = new GroupByStatsProvider(mergeBufferPool); + final GroupByResourcesReservationPool groupByResourcesReservationPool = + new GroupByResourcesReservationPool(mergeBufferPool, CONFIG, groupByStatsProvider); QueryResourceId queryResourceId = new QueryResourceId("test-id"); groupByResourcesReservationPool.reserve(queryResourceId, QUERY, true); @@ -207,8 +211,9 @@ public void testMultipleSimultaneousAllocationAttemptsFail() public void testMultipleSequentialAllocationAttemptsSucceed() { BlockingPool mergeBufferPool = new DefaultBlockingPool<>(() -> ByteBuffer.allocate(100), 1); - GroupByResourcesReservationPool groupByResourcesReservationPool = - new GroupByResourcesReservationPool(mergeBufferPool, CONFIG); + final GroupByStatsProvider groupByStatsProvider = new GroupByStatsProvider(mergeBufferPool); + final GroupByResourcesReservationPool groupByResourcesReservationPool = + new GroupByResourcesReservationPool(mergeBufferPool, CONFIG, groupByStatsProvider); QueryResourceId queryResourceId = new QueryResourceId("test-id"); groupByResourcesReservationPool.reserve(queryResourceId, QUERY, true); diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByStatsProviderTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByStatsProviderTest.java new file mode 100644 index 000000000000..3e72015a8485 --- /dev/null +++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByStatsProviderTest.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.groupby; + +import org.apache.druid.collections.BlockingPool; +import org.apache.druid.collections.DefaultBlockingPool; +import org.apache.druid.query.groupby.epinephelinae.LimitedTemporaryStorage; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.IOException; +import java.nio.ByteBuffer; + +public class GroupByStatsProviderTest +{ + @Rule + public TemporaryFolder temporaryFolder = new TemporaryFolder(); + + @Test + public void testMergeBufferCount() + { + BlockingPool mergeBufferPool = new DefaultBlockingPool<>(() -> ByteBuffer.allocate(100), 10); + + GroupByStatsProvider statsProvider = new GroupByStatsProvider(mergeBufferPool); + mergeBufferPool.takeBatch(5); + + Assert.assertEquals(5, statsProvider.getAcquiredMergeBufferCount()); + } + + @Test + public void testMergeBufferAcquisitionTime() + { + GroupByStatsProvider statsProvider = new GroupByStatsProvider(null); + + statsProvider.groupByResourceAcquisitionTimeNs(100); + statsProvider.groupByResourceAcquisitionTimeNs(300); + + Assert.assertEquals(200, statsProvider.getAndResetGroupByResourceAcquisitionStats()); + } + + @Test + public void testSpilledBytes() throws IOException + { + GroupByStatsProvider statsProvider = new GroupByStatsProvider(null); + + LimitedTemporaryStorage temporaryStorage1 = + new LimitedTemporaryStorage(temporaryFolder.newFolder(), 1024 * 1024); + LimitedTemporaryStorage temporaryStorage2 = + new LimitedTemporaryStorage(temporaryFolder.newFolder(), 1024 * 1024); + + statsProvider.registerTemporaryStorage(temporaryStorage1); + statsProvider.registerTemporaryStorage(temporaryStorage2); + + LimitedTemporaryStorage.LimitedOutputStream outputStream1 = temporaryStorage1.createFile(); + outputStream1.write(5); + outputStream1.flush(); + + LimitedTemporaryStorage.LimitedOutputStream outputStream2 = temporaryStorage2.createFile(); + outputStream2.write(8); + outputStream2.flush(); + + Assert.assertEquals(2, statsProvider.getSpilledBytes()); + + temporaryStorage1.close(); + temporaryStorage2.close(); + + Assert.assertEquals(2, statsProvider.getSpilledBytes()); + Assert.assertEquals(0, statsProvider.getSpilledBytes()); + } +} diff --git a/processing/src/test/java/org/apache/druid/query/groupby/NestedQueryPushDownTest.java b/processing/src/test/java/org/apache/druid/query/groupby/NestedQueryPushDownTest.java index a28e782bf654..f1624bc47775 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/NestedQueryPushDownTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/NestedQueryPushDownTest.java @@ -287,15 +287,19 @@ public String getFormatString() }; final Supplier configSupplier = Suppliers.ofInstance(config); - GroupByResourcesReservationPool groupByResourcesReservationPool = new GroupByResourcesReservationPool(mergePool, config); - GroupByResourcesReservationPool groupByResourcesReservationPool2 = new GroupByResourcesReservationPool(mergePool2, config); + final GroupByStatsProvider groupByStatsProvider = new GroupByStatsProvider(mergePool); + final GroupByResourcesReservationPool groupByResourcesReservationPool = + new GroupByResourcesReservationPool(mergePool, config, groupByStatsProvider); + final GroupByResourcesReservationPool groupByResourcesReservationPool2 = + new GroupByResourcesReservationPool(mergePool2, config, groupByStatsProvider); final GroupingEngine engine1 = new GroupingEngine( druidProcessingConfig, configSupplier, groupByResourcesReservationPool, TestHelper.makeJsonMapper(), new ObjectMapper(new SmileFactory()), - NOOP_QUERYWATCHER + NOOP_QUERYWATCHER, + groupByStatsProvider ); final GroupingEngine engine2 = new GroupingEngine( druidProcessingConfig, @@ -303,7 +307,8 @@ public String getFormatString() groupByResourcesReservationPool2, TestHelper.makeJsonMapper(), new ObjectMapper(new SmileFactory()), - NOOP_QUERYWATCHER + NOOP_QUERYWATCHER, + groupByStatsProvider ); groupByFactory = new GroupByQueryRunnerFactory( diff --git a/processing/src/test/java/org/apache/druid/query/groupby/UnnestGroupByQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/groupby/UnnestGroupByQueryRunnerTest.java index 02bae02eb6f0..2bd9d73b3154 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/UnnestGroupByQueryRunnerTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/UnnestGroupByQueryRunnerTest.java @@ -173,15 +173,17 @@ public static GroupByQueryRunnerFactory makeQueryRunnerFactory( ); } final Supplier configSupplier = Suppliers.ofInstance(config); + GroupByStatsProvider groupByStatsProvider = new GroupByStatsProvider(bufferPools.getMergePool()); GroupByResourcesReservationPool groupByResourcesReservationPool = - new GroupByResourcesReservationPool(bufferPools.getMergePool(), config); + new GroupByResourcesReservationPool(bufferPools.getMergePool(), config, groupByStatsProvider); final GroupingEngine groupingEngine = new GroupingEngine( processingConfig, configSupplier, groupByResourcesReservationPool, TestHelper.makeJsonMapper(), mapper, - QueryRunnerTestHelper.NOOP_QUERYWATCHER + QueryRunnerTestHelper.NOOP_QUERYWATCHER, + groupByStatsProvider ); final GroupByQueryQueryToolChest toolChest = new GroupByQueryQueryToolChest(groupingEngine, groupByResourcesReservationPool); diff --git a/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/ConcurrentGrouperTest.java b/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/ConcurrentGrouperTest.java index 6ab2f1a7bf0b..ad8162a63b21 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/ConcurrentGrouperTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/ConcurrentGrouperTest.java @@ -88,8 +88,8 @@ public static Collection constructorFeeder() final List constructors = new ArrayList<>(); for (final int bufferSize : new int[]{1024, 1024 * 32, 1024 * 1024}) { - for (final int concurrencyHint : new int[]{1, 8}) { - for (final int parallelCombineThreads : new int[]{0, 8}) { + for (final int concurrencyHint : new int[]{8}) { + for (final int parallelCombineThreads : new int[]{8}) { for (final boolean mergeThreadLocal : new boolean[]{true, false}) { if (parallelCombineThreads <= concurrencyHint) { constructors.add(new Object[]{bufferSize, concurrencyHint, parallelCombineThreads, mergeThreadLocal}); diff --git a/processing/src/test/java/org/apache/druid/segment/CursorHolderPreaggTest.java b/processing/src/test/java/org/apache/druid/segment/CursorHolderPreaggTest.java index 52c129379fc5..5d866d8e60e0 100644 --- a/processing/src/test/java/org/apache/druid/segment/CursorHolderPreaggTest.java +++ b/processing/src/test/java/org/apache/druid/segment/CursorHolderPreaggTest.java @@ -34,6 +34,7 @@ import org.apache.druid.query.groupby.GroupByQuery; import org.apache.druid.query.groupby.GroupByQueryConfig; import org.apache.druid.query.groupby.GroupByResourcesReservationPool; +import org.apache.druid.query.groupby.GroupByStatsProvider; import org.apache.druid.query.groupby.GroupingEngine; import org.apache.druid.query.groupby.ResultRow; import org.apache.druid.query.timeseries.TimeseriesQuery; @@ -84,22 +85,25 @@ public void setup() ); topNQueryEngine = new TopNQueryEngine(bufferPool); timeseriesQueryEngine = new TimeseriesQueryEngine(bufferPool); + CloseableDefaultBlockingPool mergePool = + new CloseableDefaultBlockingPool<>( + () -> ByteBuffer.allocate(50000), + 4 + ); + GroupByStatsProvider groupByStatsProvider = new GroupByStatsProvider(mergePool); groupingEngine = new GroupingEngine( new DruidProcessingConfig(), GroupByQueryConfig::new, new GroupByResourcesReservationPool( - closer.closeLater( - new CloseableDefaultBlockingPool<>( - () -> ByteBuffer.allocate(50000), - 4 - ) - ), - new GroupByQueryConfig() + closer.closeLater(mergePool), + new GroupByQueryConfig(), + groupByStatsProvider ), TestHelper.makeJsonMapper(), TestHelper.makeSmileMapper(), (query, future) -> { - } + }, + groupByStatsProvider ); this.cursorFactory = new CursorFactory() diff --git a/server/src/main/java/org/apache/druid/guice/BrokerProcessingModule.java b/server/src/main/java/org/apache/druid/guice/BrokerProcessingModule.java index bc12e929219e..68d0f7c297c6 100644 --- a/server/src/main/java/org/apache/druid/guice/BrokerProcessingModule.java +++ b/server/src/main/java/org/apache/druid/guice/BrokerProcessingModule.java @@ -44,6 +44,7 @@ import org.apache.druid.query.QueryProcessingPool; import org.apache.druid.query.groupby.GroupByQueryConfig; import org.apache.druid.query.groupby.GroupByResourcesReservationPool; +import org.apache.druid.query.groupby.GroupByStatsProvider; import org.apache.druid.utils.JvmUtils; import java.nio.ByteBuffer; @@ -118,10 +119,11 @@ public BlockingPool getMergeBufferPool(DruidProcessingConfig config) @Merging public GroupByResourcesReservationPool getGroupByResourcesReservationPool( @Merging BlockingPool mergeBufferPool, - GroupByQueryConfig groupByQueryConfig + GroupByQueryConfig groupByQueryConfig, + GroupByStatsProvider groupByStatsProvider ) { - return new GroupByResourcesReservationPool(mergeBufferPool, groupByQueryConfig); + return new GroupByResourcesReservationPool(mergeBufferPool, groupByQueryConfig, groupByStatsProvider); } @Provides diff --git a/server/src/main/java/org/apache/druid/guice/DruidProcessingModule.java b/server/src/main/java/org/apache/druid/guice/DruidProcessingModule.java index 4879b5cd3c7b..d5d03b7332d6 100644 --- a/server/src/main/java/org/apache/druid/guice/DruidProcessingModule.java +++ b/server/src/main/java/org/apache/druid/guice/DruidProcessingModule.java @@ -48,6 +48,7 @@ import org.apache.druid.query.QueryProcessingPool; import org.apache.druid.query.groupby.GroupByQueryConfig; import org.apache.druid.query.groupby.GroupByResourcesReservationPool; +import org.apache.druid.query.groupby.GroupByStatsProvider; import org.apache.druid.server.metrics.MetricsModule; import org.apache.druid.utils.JvmUtils; @@ -112,10 +113,11 @@ public BlockingPool getMergeBufferPool(DruidProcessingConfig config) @Merging public GroupByResourcesReservationPool getGroupByResourcesReservationPool( @Merging BlockingPool mergeBufferPool, - GroupByQueryConfig groupByQueryConfig + GroupByQueryConfig groupByQueryConfig, + GroupByStatsProvider groupByStatsProvider ) { - return new GroupByResourcesReservationPool(mergeBufferPool, groupByQueryConfig); + return new GroupByResourcesReservationPool(mergeBufferPool, groupByQueryConfig, groupByStatsProvider); } public static void registerConfigsAndMonitor(Binder binder) diff --git a/server/src/main/java/org/apache/druid/guice/RouterProcessingModule.java b/server/src/main/java/org/apache/druid/guice/RouterProcessingModule.java index 85357a7fa04f..a34e4a58a17e 100644 --- a/server/src/main/java/org/apache/druid/guice/RouterProcessingModule.java +++ b/server/src/main/java/org/apache/druid/guice/RouterProcessingModule.java @@ -34,6 +34,7 @@ import org.apache.druid.query.QueryProcessingPool; import org.apache.druid.query.groupby.GroupByQueryConfig; import org.apache.druid.query.groupby.GroupByResourcesReservationPool; +import org.apache.druid.query.groupby.GroupByStatsProvider; import java.nio.ByteBuffer; @@ -96,9 +97,10 @@ public BlockingPool getMergeBufferPool(DruidProcessingConfig config) @Merging public GroupByResourcesReservationPool getGroupByResourcesReservationPool( @Merging BlockingPool mergeBufferPool, - GroupByQueryConfig groupByQueryConfig + GroupByQueryConfig groupByQueryConfig, + GroupByStatsProvider groupByStatsProvider ) { - return new GroupByResourcesReservationPool(mergeBufferPool, groupByQueryConfig); + return new GroupByResourcesReservationPool(mergeBufferPool, groupByQueryConfig, groupByStatsProvider); } } diff --git a/server/src/main/java/org/apache/druid/server/metrics/QueryCountStatsMonitor.java b/server/src/main/java/org/apache/druid/server/metrics/QueryCountStatsMonitor.java index ce951d5933f7..b57cf7880755 100644 --- a/server/src/main/java/org/apache/druid/server/metrics/QueryCountStatsMonitor.java +++ b/server/src/main/java/org/apache/druid/server/metrics/QueryCountStatsMonitor.java @@ -27,6 +27,7 @@ import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; import org.apache.druid.java.util.metrics.AbstractMonitor; import org.apache.druid.java.util.metrics.KeyedDiff; +import org.apache.druid.query.groupby.GroupByStatsProvider; import java.nio.ByteBuffer; import java.util.Map; @@ -35,15 +36,18 @@ public class QueryCountStatsMonitor extends AbstractMonitor { private final KeyedDiff keyedDiff = new KeyedDiff(); private final QueryCountStatsProvider statsProvider; + private final GroupByStatsProvider groupByStatsProvider; private final BlockingPool mergeBufferPool; @Inject public QueryCountStatsMonitor( QueryCountStatsProvider statsProvider, + GroupByStatsProvider groupByStatsProvider, @Merging BlockingPool mergeBufferPool ) { this.statsProvider = statsProvider; + this.groupByStatsProvider = groupByStatsProvider; this.mergeBufferPool = mergeBufferPool; } @@ -74,7 +78,22 @@ public boolean doMonitor(ServiceEmitter emitter) long pendingQueries = this.mergeBufferPool.getPendingRequests(); emitter.emit(builder.setMetric("mergeBuffer/pendingRequests", pendingQueries)); + + emitter.emit( + builder.setMetric( + "mergeBuffer/acquiredCount", + groupByStatsProvider.getAcquiredMergeBufferCount() + ) + ); + emitter.emit( + builder.setMetric( + "groupBy/acquisitionTimeNs", + groupByStatsProvider.getAndResetGroupByResourceAcquisitionStats() + ) + ); + + emitter.emit(builder.setMetric("groupBy/spilledBytes", groupByStatsProvider.getSpilledBytes())); + return true; } - } diff --git a/server/src/test/java/org/apache/druid/server/metrics/QueryCountStatsMonitorTest.java b/server/src/test/java/org/apache/druid/server/metrics/QueryCountStatsMonitorTest.java index 717c95d62c5b..ffcf55251c09 100644 --- a/server/src/test/java/org/apache/druid/server/metrics/QueryCountStatsMonitorTest.java +++ b/server/src/test/java/org/apache/druid/server/metrics/QueryCountStatsMonitorTest.java @@ -22,6 +22,7 @@ import org.apache.druid.collections.BlockingPool; import org.apache.druid.collections.DefaultBlockingPool; import org.apache.druid.java.util.metrics.StubServiceEmitter; +import org.apache.druid.query.groupby.GroupByStatsProvider; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -38,6 +39,7 @@ public class QueryCountStatsMonitorTest { private QueryCountStatsProvider queryCountStatsProvider; + private GroupByStatsProvider groupByStatsProvider; private BlockingPool mergeBufferPool; private ExecutorService executorService; @@ -80,6 +82,28 @@ public long getTimedOutQueryCount() } }; + groupByStatsProvider = new GroupByStatsProvider(mergeBufferPool) + { + @Override + public synchronized long getAndResetGroupByResourceAcquisitionStats() + { + return 1L; + } + + @Override + public long getAcquiredMergeBufferCount() + { + return 2L; + } + + @Override + public long getSpilledBytes() + { + return 3L; + } + }; + + mergeBufferPool = new DefaultBlockingPool(() -> ByteBuffer.allocate(1024), 1); executorService = Executors.newSingleThreadExecutor(); } @@ -93,7 +117,8 @@ public void tearDown() @Test public void testMonitor() { - final QueryCountStatsMonitor monitor = new QueryCountStatsMonitor(queryCountStatsProvider, mergeBufferPool); + final QueryCountStatsMonitor monitor = + new QueryCountStatsMonitor(queryCountStatsProvider, groupByStatsProvider, mergeBufferPool); final StubServiceEmitter emitter = new StubServiceEmitter("service", "host"); monitor.doMonitor(emitter); emitter.flush(); @@ -105,14 +130,16 @@ public void testMonitor() event -> (String) event.toMap().get("metric"), event -> (Long) event.toMap().get("value") )); - Assert.assertEquals(6, resultMap.size()); + Assert.assertEquals(9, resultMap.size()); Assert.assertEquals(1L, (long) resultMap.get("query/success/count")); Assert.assertEquals(2L, (long) resultMap.get("query/failed/count")); Assert.assertEquals(3L, (long) resultMap.get("query/interrupted/count")); Assert.assertEquals(4L, (long) resultMap.get("query/timeout/count")); Assert.assertEquals(10L, (long) resultMap.get("query/count")); Assert.assertEquals(0, (long) resultMap.get("mergeBuffer/pendingRequests")); - + Assert.assertEquals(1, (long) resultMap.get("mergeBuffer/acquisitionTimeNs")); + Assert.assertEquals(2, (long) resultMap.get("groupBy/acquiredCount")); + Assert.assertEquals(3, (long) resultMap.get("groupBy/spilledBytes")); } @Test(timeout = 2_000L) @@ -133,7 +160,8 @@ public void testMonitoringMergeBuffer() } } - final QueryCountStatsMonitor monitor = new QueryCountStatsMonitor(queryCountStatsProvider, mergeBufferPool); + final QueryCountStatsMonitor monitor = + new QueryCountStatsMonitor(queryCountStatsProvider, groupByStatsProvider, mergeBufferPool); final StubServiceEmitter emitter = new StubServiceEmitter("DummyService", "DummyHost"); boolean ret = monitor.doMonitor(emitter); Assert.assertTrue(ret);