
[Feature] Unify load profile through analyze profile & support session/table profile collect granularity #47548

Merged · 1 commit · Sep 6, 2024
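A minimal sketch of what the unified flow in the title looks like from the SQL side, assuming session-level collection is toggled by an enable_load_profile session variable and that profiles are read back through the existing SHOW PROFILELIST / ANALYZE PROFILE statements; the profile id below is illustrative:

```sql
-- Session granularity: collect profiles for loads issued from this session.
SET enable_load_profile = true;

-- Locate the collected load profile and inspect it through the unified analyzer.
SHOW PROFILELIST;
ANALYZE PROFILE FROM '11111111-2222-3333-4444-555555555555';
```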
4 changes: 2 additions & 2 deletions docs/en/administration/management/FE_configuration.md
@@ -4549,9 +4549,9 @@ ADMIN SET FRONTEND CONFIG ("key" = "value");
-->

<!--
##### stream_load_profile_collect_second
##### load_profile_collect_threshold_second

- Default: 10
- Default: 0
- Type: Long
- Unit: Seconds
- Is mutable: Yes
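The threshold documented above is a mutable FE config, so it can be changed at runtime; a hedged example, using the name as registered in Config.java later in this diff (note that the commented-out doc heading here drops the stream_ prefix):

```sql
-- Only keep profiles for stream/routine loads that run longer than 10 seconds.
ADMIN SET FRONTEND CONFIG ("stream_load_profile_collect_threshold_second" = "10");
```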
4 changes: 2 additions & 2 deletions docs/zh/administration/management/FE_configuration.md
@@ -4555,9 +4555,9 @@ Compaction Score represents a score of how worthwhile it is to compact a table partition
-->

<!--
##### stream_load_profile_collect_second
##### stream_load_profile_collect_threshold_second

- Default: 10
- Default: 0
- Type: Long
- Unit: Seconds
- Is mutable: Yes
@@ -373,6 +373,9 @@ public Void visitModifyTablePropertiesClause(ModifyTablePropertiesClause clause,
} else if (properties.containsKey(PropertyAnalyzer.PROPERTIES_MUTABLE_BUCKET_NUM)) {
schemaChangeHandler.updateTableMeta(db, tableName.getTbl(), properties,
TTabletMetaType.MUTABLE_BUCKET_NUM);
} else if (properties.containsKey(PropertyAnalyzer.PROPERTIES_ENABLE_LOAD_PROFILE)) {
schemaChangeHandler.updateTableMeta(db, tableName.getTbl(), properties,
TTabletMetaType.ENABLE_LOAD_PROFILE);
} else if (properties.containsKey(PropertyAnalyzer.PROPERTIES_BINLOG_ENABLE) ||
properties.containsKey(PropertyAnalyzer.PROPERTIES_BINLOG_TTL) ||
properties.containsKey(PropertyAnalyzer.PROPERTIES_BINLOG_MAX_SIZE)) {
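The branch added above wires the new enable_load_profile table property into ALTER TABLE handling; a minimal sketch of the statement that would exercise it (database and table names are illustrative):

```sql
ALTER TABLE example_db.orders SET ("enable_load_profile" = "true");
```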
@@ -2100,6 +2100,11 @@ public void updateTableMeta(Database db, String tableName, Map<String, String> p
if (mutableBucketNum == olapTable.getMutableBucketNum()) {
return;
}
} else if (metaType == TTabletMetaType.ENABLE_LOAD_PROFILE) {
boolean enableLoadProfile = Boolean.parseBoolean(properties.get(PropertyAnalyzer.PROPERTIES_ENABLE_LOAD_PROFILE));
if (enableLoadProfile == olapTable.enableLoadProfile()) {
return;
}
} else if (metaType == TTabletMetaType.PRIMARY_INDEX_CACHE_EXPIRE_SEC) {
int primaryIndexCacheExpireSec = Integer.parseInt(properties.get(
PropertyAnalyzer.PROPERTIES_PRIMARY_INDEX_CACHE_EXPIRE_SEC));
33 changes: 33 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/catalog/OlapTable.java
@@ -285,6 +285,8 @@ public enum OlapTableState {
protected Map<Long, Long> doubleWritePartitions = new HashMap<>();

// Both the following two flags are used by StarMgrMetaSyncer
private long lastCollectProfileTime = 0;

// The flag is used to indicate whether the table shard group has changed.
public AtomicBoolean isShardGroupChanged = new AtomicBoolean(false);
// The flag is used to indicate whether the table is doing automatic bucketing.
@@ -2552,6 +2554,23 @@ public void setMutableBucketNum(long bucketNum) {
tableProperty.buildMutableBucketNum();
}

public Boolean enableLoadProfile() {
if (tableProperty != null) {
return tableProperty.enableLoadProfile();
}
return false;
}

public void setEnableLoadProfile(boolean enableLoadProfile) {
if (tableProperty == null) {
tableProperty = new TableProperty(new HashMap<>());
}
tableProperty
.modifyTableProperties(PropertyAnalyzer.PROPERTIES_ENABLE_LOAD_PROFILE,
Boolean.valueOf(enableLoadProfile).toString());
tableProperty.buildEnableLoadProfile();
}

public TWriteQuorumType writeQuorum() {
if (tableProperty != null) {
return tableProperty.writeQuorum();
@@ -3253,6 +3272,12 @@ public Map<String, String> getProperties() {
properties.put(PropertyAnalyzer.PROPERTIES_MUTABLE_BUCKET_NUM, mutableBucketNum.toString());
}

// enable load profile
Boolean enableLoadProfile = enableLoadProfile();
if (enableLoadProfile) {
properties.put(PropertyAnalyzer.PROPERTIES_ENABLE_LOAD_PROFILE, "true");
}

// locations
Multimap<String, String> locationsMap = getLocation();
if (locationsMap != null) {
@@ -3540,4 +3565,12 @@ public PhysicalPartition getPartitionSample() {
return null;
}
}

public long getLastCollectProfileTime() {
return lastCollectProfileTime;
}

public void updateLastCollectProfileTime() {
this.lastCollectProfileTime = System.currentTimeMillis();
}
}
@@ -262,6 +262,8 @@ public static String valueList() {
// the default mutable bucket number
private long mutableBucketNum = 0;

private boolean enableLoadProfile = false;

// 1. This table has been deleted. If hasDelete is false, the BE segments must not have deleteConditions.
// If hasDelete is true, the BE segments may have deleteConditions because of compaction.
// 2. Before checkpoint, we relay delete job journal log to persist.
@@ -358,6 +360,9 @@ public TableProperty buildProperty(short opCode) {
case OperationType.OP_MODIFY_MUTABLE_BUCKET_NUM:
buildMutableBucketNum();
break;
case OperationType.OP_MODIFY_ENABLE_LOAD_PROFILE:
buildEnableLoadProfile();
break;
case OperationType.OP_MODIFY_BINLOG_CONFIG:
buildBinlogConfig();
break;
@@ -650,6 +655,13 @@ public TableProperty buildMutableBucketNum() {
return this;
}

public TableProperty buildEnableLoadProfile() {
if (properties.get(PropertyAnalyzer.PROPERTIES_ENABLE_LOAD_PROFILE) != null) {
enableLoadProfile = Boolean.parseBoolean(properties.get(PropertyAnalyzer.PROPERTIES_ENABLE_LOAD_PROFILE));
}
return this;
}

public TableProperty buildEnablePersistentIndex() {
enablePersistentIndex = Boolean.parseBoolean(
properties.getOrDefault(PropertyAnalyzer.PROPERTIES_ENABLE_PERSISTENT_INDEX, "false"));
@@ -906,6 +918,10 @@ public long getMutableBucketNum() {
return mutableBucketNum;
}

public boolean enableLoadProfile() {
return enableLoadProfile;
}

public String getStorageVolume() {
return storageVolume;
}
@@ -1022,6 +1038,7 @@ public void gsonPostProcess() throws IOException {
buildPartitionLiveNumber();
buildReplicatedStorage();
buildBucketSize();
buildEnableLoadProfile();
buildBinlogConfig();
buildBinlogAvailableVersion();
buildDataCachePartitionDuration();
10 changes: 8 additions & 2 deletions fe/fe-core/src/main/java/com/starrocks/common/Config.java
@@ -2888,11 +2888,17 @@ public class Config extends ConfigBase {
public static int routine_load_scheduler_interval_millisecond = 10000;

/**
* Only when the stream load time exceeds this value,
* Only when the stream/routine load time exceeds this value,
* the profile will be put into the profileManager
*/
@ConfField(mutable = true, aliases = {"stream_load_profile_collect_second"})
public static long stream_load_profile_collect_threshold_second = 0;

/**
* The interval of collecting load profile through table granularity
*/
@ConfField(mutable = true)
public static long stream_load_profile_collect_second = 10; //10s
public static long load_profile_collect_interval_second = 0;

/**
* If set to <= 0, means that no limitation.
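A hedged example of tuning the new table-granularity knob above at runtime; the value is illustrative, and the interpretation (collect at most one profile per table per interval) is inferred from the lastCollectProfileTime field added to OlapTable in this diff:

```sql
-- Collect a table-granularity load profile at most once every 60 seconds.
ADMIN SET FRONTEND CONFIG ("load_profile_collect_interval_second" = "60");
```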
@@ -79,6 +79,10 @@ public class ProfileManager implements MemoryTrackable {
public static final String DEFAULT_DB = "Default Db";
public static final String VARIABLES = "Variables";
public static final String PROFILE_COLLECT_TIME = "Collect Profile Time";
public static final String LOAD_TYPE = "Load Type";

public static final String LOAD_TYPE_STREAM_LOAD = "STREAM_LOAD";
public static final String LOAD_TYPE_ROUTINE_LOAD = "ROUTINE_LOAD";

private static final int MEMORY_PROFILE_SAMPLES = 10;

@@ -171,6 +171,8 @@ public class PropertyAnalyzer {

public static final String PROPERTIES_MUTABLE_BUCKET_NUM = "mutable_bucket_num";

public static final String PROPERTIES_ENABLE_LOAD_PROFILE = "enable_load_profile";

public static final String PROPERTIES_PRIMARY_INDEX_CACHE_EXPIRE_SEC = "primary_index_cache_expire_sec";

public static final String PROPERTIES_TABLET_TYPE = "tablet_type";
@@ -445,6 +447,14 @@ public static long analyzeMutableBucketNum(Map<String, String> properties) {
}
}

public static boolean analyzeEnableLoadProfile(Map<String, String> properties) {
boolean enableLoadProfile = false;
if (properties != null && properties.containsKey(PROPERTIES_ENABLE_LOAD_PROFILE)) {
enableLoadProfile = Boolean.parseBoolean(properties.get(PROPERTIES_ENABLE_LOAD_PROFILE));
}
return enableLoadProfile;
}

public static int analyzeAutoRefreshPartitionsLimit(Map<String, String> properties, MaterializedView mv) {
if (mv.getRefreshScheme().getType() == MaterializedView.RefreshType.MANUAL) {
throw new SemanticException(
@@ -841,6 +841,7 @@ public void readFields(DataInput in) throws IOException {
case OperationType.OP_MODIFY_REPLICATED_STORAGE:
case OperationType.OP_MODIFY_BUCKET_SIZE:
case OperationType.OP_MODIFY_MUTABLE_BUCKET_NUM:
case OperationType.OP_MODIFY_ENABLE_LOAD_PROFILE:
case OperationType.OP_MODIFY_BINLOG_CONFIG:
case OperationType.OP_MODIFY_BINLOG_AVAILABLE_VERSION:
case OperationType.OP_MODIFY_ENABLE_PERSISTENT_INDEX:
11 changes: 11 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/load/loadv2/LoadJob.java
@@ -56,6 +56,7 @@
import com.starrocks.common.util.LoadPriority;
import com.starrocks.common.util.LogBuilder;
import com.starrocks.common.util.LogKey;
import com.starrocks.common.util.ProfileManager;
import com.starrocks.common.util.PropertyAnalyzer;
import com.starrocks.common.util.TimeUtils;
import com.starrocks.load.EtlJobType;
@@ -935,6 +936,16 @@ public TLoadInfo toThrift() {
info.setTxn_id(transactionId);
if (!loadIds.isEmpty()) {
info.setLoad_id(Joiner.on(", ").join(loadIds));

List<String> profileIds = Lists.newArrayList();
for (String loadId : loadIds) {
if (ProfileManager.getInstance().hasProfile(loadId)) {
profileIds.add(loadId);
}
}
if (!profileIds.isEmpty()) {
info.setProfile_id(Joiner.on(", ").join(profileIds));
}
}
try {
info.setTable(getTableNames(true).stream().collect(Collectors.joining(",")));
@@ -46,10 +46,8 @@
import com.starrocks.persist.gson.GsonPostProcessable;
import com.starrocks.persist.gson.GsonPreProcessable;
import com.starrocks.persist.gson.GsonUtils;
import com.starrocks.qe.ConnectContext;
import com.starrocks.qe.DefaultCoordinator;
import com.starrocks.qe.QeProcessorImpl;
import com.starrocks.qe.SessionVariable;
import com.starrocks.qe.scheduler.Coordinator;
import com.starrocks.rpc.ThriftConnectionPool;
import com.starrocks.rpc.ThriftRPCRequestExecutor;
@@ -840,7 +838,7 @@ private void unprotectedWaitCoordFinish() throws UserException {
}

if (coord.isEnableLoadProfile()) {
collectProfile();
collectProfile(false);
}

this.trackingUrl = coord.getTrackingUrl();
Expand Down Expand Up @@ -1019,7 +1017,7 @@ public void afterCommitted(TransactionState txnState, boolean txnOperated) throw

// sync stream load collect profile, here we collect profile only when be has reported
if (isSyncStreamLoad() && coord != null && coord.isProfileAlreadyReported()) {
collectProfile();
collectProfile(false);
}

writeLock();
@@ -1037,31 +1035,24 @@ public void afterCommitted(TransactionState txnState, boolean txnOperated) throw
}
}

public void collectProfile() {
long currentTimestamp = System.currentTimeMillis();
long totalTimeMs = currentTimestamp - createTimeMs;

// For the usage scenarios of flink cdc or routine load,
// the frequency of stream load maybe very high, resulting in many profiles,
// but we may only care about the long-duration stream load profile.
if (totalTimeMs < Config.stream_load_profile_collect_second * 1000) {
LOG.info(String.format("Load %s, totalTimeMs %d < Config.stream_load_profile_collect_second %d)",
label, totalTimeMs, Config.stream_load_profile_collect_second));
return;
}

public RuntimeProfile buildTopLevelProfile(boolean isAborted) {
RuntimeProfile profile = new RuntimeProfile("Load");
RuntimeProfile summaryProfile = new RuntimeProfile("Summary");
summaryProfile.addInfoString(ProfileManager.QUERY_ID, DebugUtil.printId(loadId));
summaryProfile.addInfoString(ProfileManager.START_TIME,
TimeUtils.longToTimeString(createTimeMs));

summaryProfile.addInfoString(ProfileManager.END_TIME, TimeUtils.longToTimeString(System.currentTimeMillis()));
long currentTimestamp = System.currentTimeMillis();
long totalTimeMs = currentTimestamp - createTimeMs;
summaryProfile.addInfoString(ProfileManager.END_TIME, TimeUtils.longToTimeString(currentTimestamp));
summaryProfile.addInfoString(ProfileManager.TOTAL_TIME, DebugUtil.getPrettyStringMs(totalTimeMs));

summaryProfile.addInfoString(ProfileManager.QUERY_TYPE, "Load");
summaryProfile.addInfoString(ProfileManager.LOAD_TYPE, getStringByType());
summaryProfile.addInfoString(ProfileManager.QUERY_STATE, isAborted ? "Aborted" : "Finished");
summaryProfile.addInfoString("StarRocks Version",
String.format("%s-%s", Version.STARROCKS_VERSION, Version.STARROCKS_COMMIT_HASH));
summaryProfile.addInfoString(ProfileManager.SQL_STATEMENT, getStmt());
summaryProfile.addInfoString(ProfileManager.DEFAULT_DB, dbName);

Map<String, String> loadCounters = coord.getLoadCounters();
Expand All @@ -1071,19 +1062,20 @@ public void collectProfile() {
summaryProfile.addInfoString("NumRowsAbnormal", loadCounters.get(LoadEtlTask.DPP_ABNORMAL_ALL));
summaryProfile.addInfoString("numRowsUnselected", loadCounters.get(LoadJob.UNSELECTED_ROWS));
}
ConnectContext session = ConnectContext.get();
if (session != null) {
SessionVariable variables = session.getSessionVariable();
if (variables != null) {
summaryProfile.addInfoString("NonDefaultSessionVariables", variables.getNonDefaultVariablesJson());
}
}

profile.addChild(summaryProfile);

return profile;
}


public void collectProfile(boolean isAborted) {
RuntimeProfile profile = buildTopLevelProfile(isAborted);

if (coord.getQueryProfile() != null) {
if (!isSyncStreamLoad()) {
coord.collectProfileSync();
profile.addChild(coord.buildQueryProfile(session == null || session.needMergeProfile()));
profile.addChild(coord.buildQueryProfile(true));
} else {
profile.addChild(coord.getQueryProfile());
}
@@ -1140,6 +1132,10 @@ public void afterAborted(TransactionState txnState, boolean txnOperated, String
return;
}

if (isSyncStreamLoad() && coord.isProfileAlreadyReported()) {
collectProfile(true);
}

writeLock();
try {
if (isFinalState()) {
@@ -1347,10 +1343,18 @@ public boolean isSyncStreamLoad() {
return isSyncStreamLoad;
}

public boolean setIsSyncStreamLoad(boolean isSyncStreamLoad) {
return this.isSyncStreamLoad = isSyncStreamLoad;
}

public boolean isRoutineLoadTask() {
return type == Type.ROUTINE_LOAD;
}

public String getStmt() {
return "";
}

// for sync stream load
public void setCoordinator(Coordinator coord) {
this.coord = coord;
Expand All @@ -1359,9 +1363,9 @@ public void setCoordinator(Coordinator coord) {
public String getStringByType() {
switch (this.type) {
case ROUTINE_LOAD:
return "ROUTINE_LOAD";
return ProfileManager.LOAD_TYPE_ROUTINE_LOAD;
case STREAM_LOAD:
return "STREAM_LOAD";
return ProfileManager.LOAD_TYPE_STREAM_LOAD;
case PARALLEL_STREAM_LOAD:
return "PARALLEL_STREAM_LOAD";
default:
@@ -1507,7 +1511,9 @@ public TLoadInfo toThrift() {
} else {
info.setProgress("0%");
}

if (ProfileManager.getInstance().hasProfile(DebugUtil.printId(loadId))) {
info.setProfile_id(DebugUtil.printId(loadId));
}
// tracking url
if (trackingUrl != null) {
info.setUrl(trackingUrl);