Skip to content

Commit

Permalink
Revert "[BugFix] fix query-level profile counters (backport #49081) (#49152)"
Browse files Browse the repository at this point in the history

This reverts commit c1f5b57.
  • Loading branch information
murphyatwork authored Aug 6, 2024
1 parent cabd7e2 commit c59e3c9
Showing 1 changed file with 39 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,9 @@
package com.starrocks.qe.scheduler;

import com.google.common.base.Preconditions;
import com.google.common.collect.HashBasedTable;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.collect.Table;
import com.starrocks.common.Config;
import com.starrocks.common.MarkedCountDownLatch;
import com.starrocks.common.Pair;
Expand Down Expand Up @@ -56,7 +53,6 @@
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BinaryOperator;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -338,45 +334,6 @@ public void updateLoadInformation(FragmentInstanceExecState execState, TReportEx
}
}

/**
 * A query-level counter paired with the operator used to merge its
 * per-backend values into one query-level value.
 *
 * Immutable: all fields are set once in the private constructor and
 * instances are created only through the static factories below.
 */
static class MergeableCounter {
    // Optional replacement name for the merged counter; null means
    // "reuse the original counter name" (see the consumer of this class).
    final String name;
    // Unit of the counter value, e.g. TUnit.TIME_NS or TUnit.BYTES.
    final TUnit unit;
    // How two per-backend values are combined (Long::sum or Long::max).
    final BinaryOperator<Long> backendMerge;

    private MergeableCounter(String name, TUnit unit, BinaryOperator<Long> backendMerge) {
        this.name = name;
        this.unit = unit;
        this.backendMerge = backendMerge;
    }

    /** Counter whose per-backend values are summed; keeps the original counter name. */
    public static MergeableCounter mergeBySum(TUnit unit) {
        return new MergeableCounter(null, unit, Long::sum);
    }

    /** Counter whose per-backend values are summed and reported under {@code name}. */
    public static MergeableCounter mergeBySum(String name, TUnit unit) {
        return new MergeableCounter(name, unit, Long::sum);
    }

    /** Counter whose per-backend values are merged by taking the maximum. */
    public static MergeableCounter mergeByMax(TUnit unit) {
        return new MergeableCounter(null, unit, Long::max);
    }
}

// Query-level metrics but reported in fragment-level, so need to be merged
// across backends before being emitted on the query-level profile.
// NOTE(review): keys are the per-instance counter names looked up (and removed)
// from each instance profile; the merge operator decides sum vs. max.
private static final Map<String, MergeableCounter> QUERY_CUMULATIVE_COUNTERS = ImmutableMap.of(
"QueryCumulativeCpuTime", MergeableCounter.mergeBySum(TUnit.TIME_NS), // CPU time: summed over backends
"QueryPeakMemoryUsage", MergeableCounter.mergeBySum("QuerySumMemoryUsage", TUnit.BYTES), // summed, reported under a new name
"QueryExecutionWallTime", MergeableCounter.mergeByMax(TUnit.TIME_NS), // wall time: max across backends
"QuerySpillBytes", MergeableCounter.mergeBySum(TUnit.BYTES) // spilled bytes: summed over backends
);

/**
* Build the Query-Level profile from fragment-level profile
*/
public RuntimeProfile buildQueryProfile(boolean needMerge) {
if (!needMerge || !jobSpec.isEnablePipeline()) {
return queryProfile;
Expand All @@ -387,8 +344,11 @@ public RuntimeProfile buildQueryProfile(boolean needMerge) {
newQueryProfile.copyAllInfoStringsFrom(queryProfile, null);
newQueryProfile.copyAllCountersFrom(queryProfile);

// Table<CounterName, BackendAddress, CounterValue>
Table<String, String, Long> counterPerBackend = HashBasedTable.create();
long sumQueryCumulativeCpuTime = 0;
long sumQuerySpillBytes = 0;
long sumQueryPeakMemoryBytes = 0;
long maxQueryPeakMemoryUsage = 0;
long maxQueryExecutionWallTime = 0;

List<RuntimeProfile> newFragmentProfiles = Lists.newArrayList();
for (RuntimeProfile fragmentProfile : fragmentProfiles) {
Expand Down Expand Up @@ -416,17 +376,32 @@ public RuntimeProfile buildQueryProfile(boolean needMerge) {
missingInstanceIds.add(instanceProfile.getInfoString("InstanceId"));
}

for (Map.Entry<String, MergeableCounter> entry : QUERY_CUMULATIVE_COUNTERS.entrySet()) {
String queryCounter = entry.getKey();
Counter toBeRemove = instanceProfile.getCounter(queryCounter);
if (toBeRemove != null) {
String backendAddress = instanceProfile.getInfoString("Address");
counterPerBackend.row(queryCounter).merge(backendAddress, toBeRemove.getValue(), Long::max);
instanceProfile.removeCounter(queryCounter);
}
// Get query level peak memory usage, cpu cost, wall time
Counter toBeRemove = instanceProfile.getCounter("QueryCumulativeCpuTime");
if (toBeRemove != null) {
sumQueryCumulativeCpuTime += toBeRemove.getValue();
}
}
instanceProfile.removeCounter("QueryCumulativeCpuTime");

toBeRemove = instanceProfile.getCounter("QueryPeakMemoryUsage");
if (toBeRemove != null) {
maxQueryPeakMemoryUsage = Math.max(maxQueryPeakMemoryUsage, toBeRemove.getValue());
sumQueryPeakMemoryBytes += toBeRemove.getValue();
}
instanceProfile.removeCounter("QueryPeakMemoryUsage");

toBeRemove = instanceProfile.getCounter("QueryExecutionWallTime");
if (toBeRemove != null) {
maxQueryExecutionWallTime = Math.max(maxQueryExecutionWallTime, toBeRemove.getValue());
}
instanceProfile.removeCounter("QueryExecutionWallTime");

toBeRemove = instanceProfile.getCounter("QuerySpillBytes");
if (toBeRemove != null) {
sumQuerySpillBytes += toBeRemove.getValue();
}
instanceProfile.removeCounter("QuerySpillBytes");
}
newFragmentProfile.addInfoString("BackendAddresses", String.join(",", backendAddresses));
newFragmentProfile.addInfoString("InstanceIds", String.join(",", instanceIds));
if (!missingInstanceIds.isEmpty()) {
Expand Down Expand Up @@ -546,15 +521,16 @@ public RuntimeProfile buildQueryProfile(boolean needMerge) {
queryPeakScheduleTime.setValue(maxScheduleTime);
newQueryProfile.getCounterTotalTime().setValue(0);

for (Map.Entry<String, MergeableCounter> entry : QUERY_CUMULATIVE_COUNTERS.entrySet()) {
String queryCounter = entry.getKey();
MergeableCounter merge = entry.getValue();
counterPerBackend.row(queryCounter).values().stream().reduce(merge.backendMerge).ifPresent(value -> {
String queryCounterName = merge.name != null ? merge.name : queryCounter;
Counter counter = newQueryProfile.addCounter(queryCounterName, merge.unit, null);
counter.setValue(value);
});
}
Counter queryCumulativeCpuTime = newQueryProfile.addCounter("QueryCumulativeCpuTime", TUnit.TIME_NS, null);
queryCumulativeCpuTime.setValue(sumQueryCumulativeCpuTime);
Counter queryPeakMemoryUsage = newQueryProfile.addCounter("QueryPeakMemoryUsagePerNode", TUnit.BYTES, null);
queryPeakMemoryUsage.setValue(maxQueryPeakMemoryUsage);
Counter sumQueryPeakMemoryUsage = newQueryProfile.addCounter("QuerySumMemoryUsage", TUnit.BYTES, null);
sumQueryPeakMemoryUsage.setValue(sumQueryPeakMemoryBytes);
Counter queryExecutionWallTime = newQueryProfile.addCounter("QueryExecutionWallTime", TUnit.TIME_NS, null);
queryExecutionWallTime.setValue(maxQueryExecutionWallTime);
Counter querySpillBytes = newQueryProfile.addCounter("QuerySpillBytes", TUnit.BYTES, null);
querySpillBytes.setValue(sumQuerySpillBytes);

if (execPlan != null) {
newQueryProfile.addInfoString("Topology", execPlan.getProfilingPlan().toTopologyJson());
Expand Down

0 comments on commit c59e3c9

Please sign in to comment.