From c59e3c9c5b056ab1986567afc72f54f596e7d37f Mon Sep 17 00:00:00 2001 From: Murphy <96611012+murphyatwork@users.noreply.github.com> Date: Tue, 6 Aug 2024 11:57:46 +0800 Subject: [PATCH] Revert "[BugFix] fix query-level profile counters (backport #49081) (#49152)" This reverts commit c1f5b57ccc6b87f82dba3be8ac2bbe90068e838f. --- .../qe/scheduler/QueryRuntimeProfile.java | 102 +++++++----------- 1 file changed, 39 insertions(+), 63 deletions(-) diff --git a/fe/fe-core/src/main/java/com/starrocks/qe/scheduler/QueryRuntimeProfile.java b/fe/fe-core/src/main/java/com/starrocks/qe/scheduler/QueryRuntimeProfile.java index 9a09446396775..81ea1ea4fc65f 100644 --- a/fe/fe-core/src/main/java/com/starrocks/qe/scheduler/QueryRuntimeProfile.java +++ b/fe/fe-core/src/main/java/com/starrocks/qe/scheduler/QueryRuntimeProfile.java @@ -15,12 +15,9 @@ package com.starrocks.qe.scheduler; import com.google.common.base.Preconditions; -import com.google.common.collect.HashBasedTable; -import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; -import com.google.common.collect.Table; import com.starrocks.common.Config; import com.starrocks.common.MarkedCountDownLatch; import com.starrocks.common.Pair; @@ -56,7 +53,6 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; -import java.util.function.BinaryOperator; import java.util.function.Consumer; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -338,45 +334,6 @@ public void updateLoadInformation(FragmentInstanceExecState execState, TReportEx } } - /** - * A counter with its merge operator - */ - static class MergeableCounter { - String name; - TUnit unit; - BinaryOperator backendMerge; - - private MergeableCounter(TUnit unit, BinaryOperator backendMerge) { - this.unit = unit; - this.backendMerge = backendMerge; - } - - public static MergeableCounter mergeBySum(TUnit unit) { - return new MergeableCounter(unit, Long::sum); - } - - public static MergeableCounter mergeBySum(String name, TUnit unit) { - MergeableCounter res = new MergeableCounter(unit, Long::sum); - res.name = name; - return res; - } - - public static MergeableCounter mergeByMax(TUnit unit) { - return new MergeableCounter(unit, Long::max); - } - } - - // Query-level metrics but reported in fragment-level, so need to be merged - private static final Map QUERY_CUMULATIVE_COUNTERS = ImmutableMap.of( - "QueryCumulativeCpuTime", MergeableCounter.mergeBySum(TUnit.TIME_NS), - "QueryPeakMemoryUsage", MergeableCounter.mergeBySum("QuerySumMemoryUsage", TUnit.BYTES), - "QueryExecutionWallTime", MergeableCounter.mergeByMax(TUnit.TIME_NS), - "QuerySpillBytes", MergeableCounter.mergeBySum(TUnit.BYTES) - ); - - /** - * Build the Query-Level profile from fragment-level profile - */ public RuntimeProfile buildQueryProfile(boolean needMerge) { if (!needMerge || !jobSpec.isEnablePipeline()) { return queryProfile; @@ -387,8 +344,11 @@ public RuntimeProfile buildQueryProfile(boolean needMerge) { newQueryProfile.copyAllInfoStringsFrom(queryProfile, null); newQueryProfile.copyAllCountersFrom(queryProfile); - // Table - Table counterPerBackend = HashBasedTable.create(); + long sumQueryCumulativeCpuTime = 0; + long sumQuerySpillBytes = 0; + long sumQueryPeakMemoryBytes = 0; + long maxQueryPeakMemoryUsage = 0; + long maxQueryExecutionWallTime = 0; List newFragmentProfiles = Lists.newArrayList(); for (RuntimeProfile fragmentProfile : fragmentProfiles) { @@ -416,17 +376,32 @@ public RuntimeProfile buildQueryProfile(boolean needMerge) { missingInstanceIds.add(instanceProfile.getInfoString("InstanceId")); } - for (Map.Entry entry : QUERY_CUMULATIVE_COUNTERS.entrySet()) { - String queryCounter = entry.getKey(); - Counter toBeRemove = instanceProfile.getCounter(queryCounter); - if (toBeRemove != null) { - String backendAddress = instanceProfile.getInfoString("Address"); - counterPerBackend.row(queryCounter).merge(backendAddress, toBeRemove.getValue(), Long::max); - instanceProfile.removeCounter(queryCounter); - } + // Get query level peak memory usage, cpu cost, wall time + Counter toBeRemove = instanceProfile.getCounter("QueryCumulativeCpuTime"); + if (toBeRemove != null) { + sumQueryCumulativeCpuTime += toBeRemove.getValue(); } - } + instanceProfile.removeCounter("QueryCumulativeCpuTime"); + + toBeRemove = instanceProfile.getCounter("QueryPeakMemoryUsage"); + if (toBeRemove != null) { + maxQueryPeakMemoryUsage = Math.max(maxQueryPeakMemoryUsage, toBeRemove.getValue()); + sumQueryPeakMemoryBytes += toBeRemove.getValue(); + } + instanceProfile.removeCounter("QueryPeakMemoryUsage"); + toBeRemove = instanceProfile.getCounter("QueryExecutionWallTime"); + if (toBeRemove != null) { + maxQueryExecutionWallTime = Math.max(maxQueryExecutionWallTime, toBeRemove.getValue()); + } + instanceProfile.removeCounter("QueryExecutionWallTime"); + + toBeRemove = instanceProfile.getCounter("QuerySpillBytes"); + if (toBeRemove != null) { + sumQuerySpillBytes += toBeRemove.getValue(); + } + instanceProfile.removeCounter("QuerySpillBytes"); + } newFragmentProfile.addInfoString("BackendAddresses", String.join(",", backendAddresses)); newFragmentProfile.addInfoString("InstanceIds", String.join(",", instanceIds)); if (!missingInstanceIds.isEmpty()) { @@ -546,15 +521,16 @@ public RuntimeProfile buildQueryProfile(boolean needMerge) { queryPeakScheduleTime.setValue(maxScheduleTime); newQueryProfile.getCounterTotalTime().setValue(0); - for (Map.Entry entry : QUERY_CUMULATIVE_COUNTERS.entrySet()) { - String queryCounter = entry.getKey(); - MergeableCounter merge = entry.getValue(); - counterPerBackend.row(queryCounter).values().stream().reduce(merge.backendMerge).ifPresent(value -> { - String queryCounterName = merge.name != null ? merge.name : queryCounter; - Counter counter = newQueryProfile.addCounter(queryCounterName, merge.unit, null); - counter.setValue(value); - }); - } + Counter queryCumulativeCpuTime = newQueryProfile.addCounter("QueryCumulativeCpuTime", TUnit.TIME_NS, null); + queryCumulativeCpuTime.setValue(sumQueryCumulativeCpuTime); + Counter queryPeakMemoryUsage = newQueryProfile.addCounter("QueryPeakMemoryUsagePerNode", TUnit.BYTES, null); + queryPeakMemoryUsage.setValue(maxQueryPeakMemoryUsage); + Counter sumQueryPeakMemoryUsage = newQueryProfile.addCounter("QuerySumMemoryUsage", TUnit.BYTES, null); + sumQueryPeakMemoryUsage.setValue(sumQueryPeakMemoryBytes); + Counter queryExecutionWallTime = newQueryProfile.addCounter("QueryExecutionWallTime", TUnit.TIME_NS, null); + queryExecutionWallTime.setValue(maxQueryExecutionWallTime); + Counter querySpillBytes = newQueryProfile.addCounter("QuerySpillBytes", TUnit.BYTES, null); + querySpillBytes.setValue(sumQuerySpillBytes); if (execPlan != null) { newQueryProfile.addInfoString("Topology", execPlan.getProfilingPlan().toTopologyJson());