From 4b59fa7e6e9f5955a19bcd7bf552263065ae842b Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Thu, 16 Jan 2025 12:00:57 +0100 Subject: [PATCH] Stop retaining reference to intermediary aggregation results in QueryPhaseResultConsumer (#119984) We retained a reference to the partial `MergeResult` until after the search response has been sent. This can waste a lot of memory in some cases where partial merges don't do much to reduce memory consumption. Let's `null` out all the fields that may retain heavyweight references on `reduce`. Also, the 2 mutable lists become non-final and are replaced with new lists instead of being cleared, which saves churn and some copying and makes the code easier to reason about. --- .../search/QueryPhaseResultConsumer.java | 114 +++++++++--------- 1 file changed, 55 insertions(+), 59 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/search/QueryPhaseResultConsumer.java b/server/src/main/java/org/elasticsearch/action/search/QueryPhaseResultConsumer.java index 37d5065fdd031..9a8dd94dcd324 100644 --- a/server/src/main/java/org/elasticsearch/action/search/QueryPhaseResultConsumer.java +++ b/server/src/main/java/org/elasticsearch/action/search/QueryPhaseResultConsumer.java @@ -29,7 +29,6 @@ import java.util.ArrayDeque; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.Comparator; import java.util.List; @@ -67,8 +66,8 @@ public class QueryPhaseResultConsumer extends ArraySearchPhaseResults onPartialMergeFailure; private final int batchReduceSize; - private final List buffer = new ArrayList<>(); - private final List emptyResults = new ArrayList<>(); + private List buffer = new ArrayList<>(); + private List emptyResults = new ArrayList<>(); // the memory that is accounted in the circuit breaker for this consumer private volatile long circuitBreakerBytes; // the memory that is currently used in the buffer @@ -159,32 +158,40 @@ public SearchPhaseController.ReducedQueryPhase reduce() throws Exception { if 
(f != null) { throw f; } - + List buffer; + synchronized (this) { + // final reduce, we're done with the buffer so we just null it out and continue with a local variable to + // save field references. The synchronized block is never contended but needed to have a memory barrier and sync buffer's + // contents with all the previous writers to it + buffer = this.buffer; + buffer = buffer == null ? Collections.emptyList() : buffer; + this.buffer = null; + } // ensure consistent ordering - sortBuffer(); + buffer.sort(RESULT_COMPARATOR); final TopDocsStats topDocsStats = this.topDocsStats; + var mergeResult = this.mergeResult; + this.mergeResult = null; final int resultSize = buffer.size() + (mergeResult == null ? 0 : 1); final List topDocsList = hasTopDocs ? new ArrayList<>(resultSize) : null; final List> aggsList = hasAggs ? new ArrayList<>(resultSize) : null; - synchronized (this) { - if (mergeResult != null) { - if (topDocsList != null) { - topDocsList.add(mergeResult.reducedTopDocs); - } - if (aggsList != null) { - aggsList.add(DelayableWriteable.referencing(mergeResult.reducedAggs)); - } + if (mergeResult != null) { + if (topDocsList != null) { + topDocsList.add(mergeResult.reducedTopDocs); } - for (QuerySearchResult result : buffer) { - topDocsStats.add(result.topDocs(), result.searchTimedOut(), result.terminatedEarly()); - if (topDocsList != null) { - TopDocsAndMaxScore topDocs = result.consumeTopDocs(); - setShardIndex(topDocs.topDocs, result.getShardIndex()); - topDocsList.add(topDocs.topDocs); - } - if (aggsList != null) { - aggsList.add(result.getAggs()); - } + if (aggsList != null) { + aggsList.add(DelayableWriteable.referencing(mergeResult.reducedAggs)); + } + } + for (QuerySearchResult result : buffer) { + topDocsStats.add(result.topDocs(), result.searchTimedOut(), result.terminatedEarly()); + if (topDocsList != null) { + TopDocsAndMaxScore topDocs = result.consumeTopDocs(); + setShardIndex(topDocs.topDocs, result.getShardIndex()); + 
topDocsList.add(topDocs.topDocs); + } + if (aggsList != null) { + aggsList.add(result.getAggs()); } } SearchPhaseController.ReducedQueryPhase reducePhase; @@ -206,7 +213,7 @@ public SearchPhaseController.ReducedQueryPhase reduce() throws Exception { performFinalReduce ); } finally { - releaseAggs(); + releaseAggs(buffer); } if (hasAggs // reduced aggregations can be null if all shards failed @@ -226,25 +233,25 @@ public SearchPhaseController.ReducedQueryPhase reduce() throws Exception { ); } return reducePhase; + } private static final Comparator RESULT_COMPARATOR = Comparator.comparingInt(QuerySearchResult::getShardIndex); private MergeResult partialReduce( - QuerySearchResult[] toConsume, - List emptyResults, + List toConsume, + List processedShards, TopDocsStats topDocsStats, MergeResult lastMerge, int numReducePhases ) { // ensure consistent ordering - Arrays.sort(toConsume, RESULT_COMPARATOR); + toConsume.sort(RESULT_COMPARATOR); - final List processedShards = new ArrayList<>(emptyResults); final TopDocs newTopDocs; final InternalAggregations newAggs; final List> aggsList; - final int resultSetSize = toConsume.length + (lastMerge != null ? 1 : 0); + final int resultSetSize = toConsume.size() + (lastMerge != null ? 1 : 0); if (hasAggs) { aggsList = new ArrayList<>(resultSetSize); if (lastMerge != null) { @@ -307,12 +314,6 @@ private boolean hasPendingMerges() { return queue.isEmpty() == false || runningTask.get() != null; } - void sortBuffer() { - if (buffer.size() > 0) { - buffer.sort(RESULT_COMPARATOR); - } - } - private synchronized void addWithoutBreaking(long size) { circuitBreaker.addWithoutBreaking(size); circuitBreakerBytes += size; @@ -376,21 +377,21 @@ private void consume(QuerySearchResult result, Runnable next) { } } if (hasFailure == false) { + var b = buffer; aggsCurrentBufferSize += aggsSize; // add one if a partial merge is pending - int size = buffer.size() + (hasPartialReduce ? 1 : 0); + int size = b.size() + (hasPartialReduce ? 
1 : 0); if (size >= batchReduceSize) { hasPartialReduce = true; executeNextImmediately = false; - QuerySearchResult[] clone = buffer.toArray(QuerySearchResult[]::new); - MergeTask task = new MergeTask(clone, aggsCurrentBufferSize, new ArrayList<>(emptyResults), next); + MergeTask task = new MergeTask(b, aggsCurrentBufferSize, emptyResults, next); + b = buffer = new ArrayList<>(); + emptyResults = new ArrayList<>(); aggsCurrentBufferSize = 0; - buffer.clear(); - emptyResults.clear(); queue.add(task); tryExecuteNext(); } - buffer.add(result); + b.add(result); } } } @@ -404,10 +405,13 @@ private void consume(QuerySearchResult result, Runnable next) { } private void releaseBuffer() { - for (QuerySearchResult querySearchResult : buffer) { - querySearchResult.releaseAggs(); + var b = buffer; + if (b != null) { + this.buffer = null; + for (QuerySearchResult querySearchResult : b) { + querySearchResult.releaseAggs(); + } } - buffer.clear(); } private synchronized void onMergeFailure(Exception exc) { @@ -449,7 +453,7 @@ private void tryExecuteNext() { @Override protected void doRun() { MergeTask mergeTask = task; - QuerySearchResult[] toConsume = mergeTask.consumeBuffer(); + List toConsume = mergeTask.consumeBuffer(); while (mergeTask != null) { final MergeResult thisMergeResult = mergeResult; long estimatedTotalSize = (thisMergeResult != null ? thisMergeResult.estimatedSize : 0) + mergeTask.aggsBufferSize; @@ -512,15 +516,7 @@ public void onFailure(Exception exc) { }); } - private synchronized void releaseAggs() { - if (hasAggs) { - for (QuerySearchResult result : buffer) { - result.releaseAggs(); - } - } - } - - private static void releaseAggs(QuerySearchResult... 
toConsume) { + private static void releaseAggs(List toConsume) { for (QuerySearchResult result : toConsume) { result.releaseAggs(); } @@ -535,19 +531,19 @@ private record MergeResult( private static class MergeTask { private final List emptyResults; - private QuerySearchResult[] buffer; + private List buffer; private final long aggsBufferSize; private Runnable next; - private MergeTask(QuerySearchResult[] buffer, long aggsBufferSize, List emptyResults, Runnable next) { + private MergeTask(List buffer, long aggsBufferSize, List emptyResults, Runnable next) { this.buffer = buffer; this.aggsBufferSize = aggsBufferSize; this.emptyResults = emptyResults; this.next = next; } - public synchronized QuerySearchResult[] consumeBuffer() { - QuerySearchResult[] toRet = buffer; + public synchronized List consumeBuffer() { + List toRet = buffer; buffer = null; return toRet; } @@ -559,7 +555,7 @@ public synchronized Runnable consumeListener() { } public void cancel() { - QuerySearchResult[] buffer = consumeBuffer(); + List buffer = consumeBuffer(); if (buffer != null) { releaseAggs(buffer); }