Skip to content

Commit

Permalink
[Feature] Unify load profile through analyze profile & support sessio…
Browse files Browse the repository at this point in the history
…n/table profile collect granularity

Signed-off-by: meegoo <[email protected]>
  • Loading branch information
meegoo committed Sep 5, 2024
1 parent fa1256b commit 2a9e335
Show file tree
Hide file tree
Showing 29 changed files with 344 additions and 52 deletions.
4 changes: 2 additions & 2 deletions docs/en/administration/management/FE_configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -4549,9 +4549,9 @@ ADMIN SET FRONTEND CONFIG ("key" = "value");
-->

<!--
##### stream_load_profile_collect_second
##### load_profile_collect_threshold_second
- Default: 10
- Default: 0
- Type: Long
- Unit: Seconds
- Is mutable: Yes
Expand Down
4 changes: 2 additions & 2 deletions docs/zh/administration/management/FE_configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -4555,9 +4555,9 @@ Compaction Score 代表了一个表分区是否值得进行 Compaction 的评分
-->

<!--
##### stream_load_profile_collect_second
##### stream_load_profile_collect_threshold_second

- 默认值:10
- 默认值:0
- 类型:Long
- 单位:Seconds
- 是否动态:是
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,9 @@ public Void visitModifyTablePropertiesClause(ModifyTablePropertiesClause clause,
} else if (properties.containsKey(PropertyAnalyzer.PROPERTIES_MUTABLE_BUCKET_NUM)) {
schemaChangeHandler.updateTableMeta(db, tableName.getTbl(), properties,
TTabletMetaType.MUTABLE_BUCKET_NUM);
} else if (properties.containsKey(PropertyAnalyzer.PROPERTIES_ENABLE_LOAD_PROFILE)) {
schemaChangeHandler.updateTableMeta(db, tableName.getTbl(), properties,
TTabletMetaType.ENABLE_LOAD_PROFILE);
} else if (properties.containsKey(PropertyAnalyzer.PROPERTIES_BINLOG_ENABLE) ||
properties.containsKey(PropertyAnalyzer.PROPERTIES_BINLOG_TTL) ||
properties.containsKey(PropertyAnalyzer.PROPERTIES_BINLOG_MAX_SIZE)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2100,6 +2100,11 @@ public void updateTableMeta(Database db, String tableName, Map<String, String> p
if (mutableBucketNum == olapTable.getMutableBucketNum()) {
return;
}
} else if (metaType == TTabletMetaType.ENABLE_LOAD_PROFILE) {
boolean enableLoadProfile = Boolean.parseBoolean(properties.get(PropertyAnalyzer.PROPERTIES_ENABLE_LOAD_PROFILE));
if (enableLoadProfile == olapTable.enableLoadProfile()) {
return;
}
} else if (metaType == TTabletMetaType.PRIMARY_INDEX_CACHE_EXPIRE_SEC) {
int primaryIndexCacheExpireSec = Integer.parseInt(properties.get(
PropertyAnalyzer.PROPERTIES_PRIMARY_INDEX_CACHE_EXPIRE_SEC));
Expand Down
33 changes: 33 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/catalog/OlapTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,8 @@ public enum OlapTableState {
protected Map<Long, Long> doubleWritePartitions = new HashMap<>();

// Both the following two flags are used by StarMgrMetaSyncer
private long lastCollectProfileTime = 0;

// The flag is used to indicate whether the table shard group has changed.
public AtomicBoolean isShardGroupChanged = new AtomicBoolean(false);
// The flag is used to indicate whether the table is doing automatic bucketing.
Expand Down Expand Up @@ -2552,6 +2554,23 @@ public void setMutableBucketNum(long bucketNum) {
tableProperty.buildMutableBucketNum();
}

public Boolean enableLoadProfile() {
if (tableProperty != null) {
return tableProperty.enableLoadProfile();
}
return false;
}

public void setEnableLoadProfile(boolean enableLoadProfile) {
if (tableProperty == null) {
tableProperty = new TableProperty(new HashMap<>());
}
tableProperty
.modifyTableProperties(PropertyAnalyzer.PROPERTIES_ENABLE_LOAD_PROFILE,
Boolean.valueOf(enableLoadProfile).toString());
tableProperty.buildEnableLoadProfile();
}

public TWriteQuorumType writeQuorum() {
if (tableProperty != null) {
return tableProperty.writeQuorum();
Expand Down Expand Up @@ -3253,6 +3272,12 @@ public Map<String, String> getProperties() {
properties.put(PropertyAnalyzer.PROPERTIES_MUTABLE_BUCKET_NUM, mutableBucketNum.toString());
}

// enable load profile
Boolean enableLoadProfile = enableLoadProfile();
if (enableLoadProfile) {
properties.put(PropertyAnalyzer.PROPERTIES_ENABLE_LOAD_PROFILE, "true");
}

// locations
Multimap<String, String> locationsMap = getLocation();
if (locationsMap != null) {
Expand Down Expand Up @@ -3540,4 +3565,12 @@ public PhysicalPartition getPartitionSample() {
return null;
}
}

public long getLastCollectProfileTime() {
return lastCollectProfileTime;
}

public void updateLastCollectProfileTime() {
this.lastCollectProfileTime = System.currentTimeMillis();
}
}
17 changes: 17 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/catalog/TableProperty.java
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,8 @@ public static String valueList() {
// the default mutable bucket number
private long mutableBucketNum = 0;

private boolean enableLoadProfile = false;

// 1. This table has been deleted. if hasDelete is false, the BE segment must don't have deleteConditions.
// If hasDelete is true, the BE segment maybe have deleteConditions because compaction.
// 2. Before checkpoint, we relay delete job journal log to persist.
Expand Down Expand Up @@ -358,6 +360,9 @@ public TableProperty buildProperty(short opCode) {
case OperationType.OP_MODIFY_MUTABLE_BUCKET_NUM:
buildMutableBucketNum();
break;
case OperationType.OP_MODIFY_ENABLE_LOAD_PROFILE:
buildEnableLoadProfile();
break;
case OperationType.OP_MODIFY_BINLOG_CONFIG:
buildBinlogConfig();
break;
Expand Down Expand Up @@ -650,6 +655,13 @@ public TableProperty buildMutableBucketNum() {
return this;
}

public TableProperty buildEnableLoadProfile() {
if (properties.get(PropertyAnalyzer.PROPERTIES_ENABLE_LOAD_PROFILE) != null) {
enableLoadProfile = Boolean.parseBoolean(properties.get(PropertyAnalyzer.PROPERTIES_ENABLE_LOAD_PROFILE));
}
return this;
}

public TableProperty buildEnablePersistentIndex() {
enablePersistentIndex = Boolean.parseBoolean(
properties.getOrDefault(PropertyAnalyzer.PROPERTIES_ENABLE_PERSISTENT_INDEX, "false"));
Expand Down Expand Up @@ -906,6 +918,10 @@ public long getMutableBucketNum() {
return mutableBucketNum;
}

public boolean enableLoadProfile() {
return enableLoadProfile;
}

public String getStorageVolume() {
return storageVolume;
}
Expand Down Expand Up @@ -1022,6 +1038,7 @@ public void gsonPostProcess() throws IOException {
buildPartitionLiveNumber();
buildReplicatedStorage();
buildBucketSize();
buildEnableLoadProfile();
buildBinlogConfig();
buildBinlogAvailableVersion();
buildDataCachePartitionDuration();
Expand Down
10 changes: 8 additions & 2 deletions fe/fe-core/src/main/java/com/starrocks/common/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -2888,11 +2888,17 @@ public class Config extends ConfigBase {
public static int routine_load_scheduler_interval_millisecond = 10000;

/**
* Only when the stream load time exceeds this value,
* Only when the stream/routine load time exceeds this value,
* the profile will be put into the profileManager
*/
@ConfField(mutable = true, aliases = {"stream_load_profile_collect_second"})
public static long stream_load_profile_collect_threshold_second = 0;

/**
* The interval of collecting load profile through table granularity
*/
@ConfField(mutable = true)
public static long stream_load_profile_collect_second = 10; //10s
public static long load_profile_collect_interval_second = 0;

/**
* If set to <= 0, means that no limitation.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@ public class ProfileManager implements MemoryTrackable {
public static final String DEFAULT_DB = "Default Db";
public static final String VARIABLES = "Variables";
public static final String PROFILE_COLLECT_TIME = "Collect Profile Time";
public static final String LOAD_TYPE = "Load Type";

public static final String LOAD_TYPE_STREAM_LOAD = "STREAM_LOAD";
public static final String LOAD_TYPE_ROUTINE_LOAD = "ROUTINE_LOAD";

private static final int MEMORY_PROFILE_SAMPLES = 10;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,8 @@ public class PropertyAnalyzer {

public static final String PROPERTIES_MUTABLE_BUCKET_NUM = "mutable_bucket_num";

public static final String PROPERTIES_ENABLE_LOAD_PROFILE = "enable_load_profile";

public static final String PROPERTIES_PRIMARY_INDEX_CACHE_EXPIRE_SEC = "primary_index_cache_expire_sec";

public static final String PROPERTIES_TABLET_TYPE = "tablet_type";
Expand Down Expand Up @@ -445,6 +447,14 @@ public static long analyzeMutableBucketNum(Map<String, String> properties) {
}
}

public static boolean analyzeEnableLoadProfile(Map<String, String> properties) {
boolean enableLoadProfile = false;
if (properties != null && properties.containsKey(PROPERTIES_ENABLE_LOAD_PROFILE)) {
enableLoadProfile = Boolean.parseBoolean(properties.get(PROPERTIES_ENABLE_LOAD_PROFILE));
}
return enableLoadProfile;
}

public static int analyzeAutoRefreshPartitionsLimit(Map<String, String> properties, MaterializedView mv) {
if (mv.getRefreshScheme().getType() == MaterializedView.RefreshType.MANUAL) {
throw new SemanticException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -841,6 +841,7 @@ public void readFields(DataInput in) throws IOException {
case OperationType.OP_MODIFY_REPLICATED_STORAGE:
case OperationType.OP_MODIFY_BUCKET_SIZE:
case OperationType.OP_MODIFY_MUTABLE_BUCKET_NUM:
case OperationType.OP_MODIFY_ENABLE_LOAD_PROFILE:
case OperationType.OP_MODIFY_BINLOG_CONFIG:
case OperationType.OP_MODIFY_BINLOG_AVAILABLE_VERSION:
case OperationType.OP_MODIFY_ENABLE_PERSISTENT_INDEX:
Expand Down
11 changes: 11 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/load/loadv2/LoadJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import com.starrocks.common.util.LoadPriority;
import com.starrocks.common.util.LogBuilder;
import com.starrocks.common.util.LogKey;
import com.starrocks.common.util.ProfileManager;
import com.starrocks.common.util.PropertyAnalyzer;
import com.starrocks.common.util.TimeUtils;
import com.starrocks.load.EtlJobType;
Expand Down Expand Up @@ -935,6 +936,16 @@ public TLoadInfo toThrift() {
info.setTxn_id(transactionId);
if (!loadIds.isEmpty()) {
info.setLoad_id(Joiner.on(", ").join(loadIds));

List<String> profileIds = Lists.newArrayList();
for (String loadId : loadIds) {
if (ProfileManager.getInstance().hasProfile(loadId)) {
profileIds.add(loadId);
}
}
if (!profileIds.isEmpty()) {
info.setProfile_id(Joiner.on(", ").join(loadIds));
}
}
try {
info.setTable(getTableNames(true).stream().collect(Collectors.joining(",")));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,8 @@
import com.starrocks.persist.gson.GsonPostProcessable;
import com.starrocks.persist.gson.GsonPreProcessable;
import com.starrocks.persist.gson.GsonUtils;
import com.starrocks.qe.ConnectContext;
import com.starrocks.qe.DefaultCoordinator;
import com.starrocks.qe.QeProcessorImpl;
import com.starrocks.qe.SessionVariable;
import com.starrocks.qe.scheduler.Coordinator;
import com.starrocks.rpc.ThriftConnectionPool;
import com.starrocks.rpc.ThriftRPCRequestExecutor;
Expand Down Expand Up @@ -840,7 +838,7 @@ private void unprotectedWaitCoordFinish() throws UserException {
}

if (coord.isEnableLoadProfile()) {
collectProfile();
collectProfile(false);
}

this.trackingUrl = coord.getTrackingUrl();
Expand Down Expand Up @@ -1019,7 +1017,7 @@ public void afterCommitted(TransactionState txnState, boolean txnOperated) throw

// sync stream load collect profile, here we collect profile only when be has reported
if (isSyncStreamLoad() && coord != null && coord.isProfileAlreadyReported()) {
collectProfile();
collectProfile(false);
}

writeLock();
Expand All @@ -1037,31 +1035,24 @@ public void afterCommitted(TransactionState txnState, boolean txnOperated) throw
}
}

public void collectProfile() {
long currentTimestamp = System.currentTimeMillis();
long totalTimeMs = currentTimestamp - createTimeMs;

// For the usage scenarios of flink cdc or routine load,
// the frequency of stream load maybe very high, resulting in many profiles,
// but we may only care about the long-duration stream load profile.
if (totalTimeMs < Config.stream_load_profile_collect_second * 1000) {
LOG.info(String.format("Load %s, totalTimeMs %d < Config.stream_load_profile_collect_second %d)",
label, totalTimeMs, Config.stream_load_profile_collect_second));
return;
}

public RuntimeProfile buildTopLevelProfile(boolean isAborted) {
RuntimeProfile profile = new RuntimeProfile("Load");
RuntimeProfile summaryProfile = new RuntimeProfile("Summary");
summaryProfile.addInfoString(ProfileManager.QUERY_ID, DebugUtil.printId(loadId));
summaryProfile.addInfoString(ProfileManager.START_TIME,
TimeUtils.longToTimeString(createTimeMs));

summaryProfile.addInfoString(ProfileManager.END_TIME, TimeUtils.longToTimeString(System.currentTimeMillis()));
long currentTimestamp = System.currentTimeMillis();
long totalTimeMs = currentTimestamp - createTimeMs;
summaryProfile.addInfoString(ProfileManager.END_TIME, TimeUtils.longToTimeString(currentTimestamp));
summaryProfile.addInfoString(ProfileManager.TOTAL_TIME, DebugUtil.getPrettyStringMs(totalTimeMs));

summaryProfile.addInfoString(ProfileManager.QUERY_TYPE, "Load");
summaryProfile.addInfoString(ProfileManager.LOAD_TYPE, getStringByType());
summaryProfile.addInfoString(ProfileManager.QUERY_STATE, isAborted ? "Aborted" : "Finished");
summaryProfile.addInfoString("StarRocks Version",
String.format("%s-%s", Version.STARROCKS_VERSION, Version.STARROCKS_COMMIT_HASH));
summaryProfile.addInfoString(ProfileManager.SQL_STATEMENT, getStmt());
summaryProfile.addInfoString(ProfileManager.DEFAULT_DB, dbName);

Map<String, String> loadCounters = coord.getLoadCounters();
Expand All @@ -1071,19 +1062,20 @@ public void collectProfile() {
summaryProfile.addInfoString("NumRowsAbnormal", loadCounters.get(LoadEtlTask.DPP_ABNORMAL_ALL));
summaryProfile.addInfoString("numRowsUnselected", loadCounters.get(LoadJob.UNSELECTED_ROWS));
}
ConnectContext session = ConnectContext.get();
if (session != null) {
SessionVariable variables = session.getSessionVariable();
if (variables != null) {
summaryProfile.addInfoString("NonDefaultSessionVariables", variables.getNonDefaultVariablesJson());
}
}

profile.addChild(summaryProfile);

return profile;
}


public void collectProfile(boolean isAborted) {
RuntimeProfile profile = buildTopLevelProfile(isAborted);

if (coord.getQueryProfile() != null) {
if (!isSyncStreamLoad()) {
coord.collectProfileSync();
profile.addChild(coord.buildQueryProfile(session == null || session.needMergeProfile()));
profile.addChild(coord.buildQueryProfile(true));
} else {
profile.addChild(coord.getQueryProfile());
}
Expand Down Expand Up @@ -1140,6 +1132,10 @@ public void afterAborted(TransactionState txnState, boolean txnOperated, String
return;
}

if (isSyncStreamLoad() && coord.isProfileAlreadyReported()) {
collectProfile(true);
}

writeLock();
try {
if (isFinalState()) {
Expand Down Expand Up @@ -1347,10 +1343,18 @@ public boolean isSyncStreamLoad() {
return isSyncStreamLoad;
}

public boolean setIsSyncStreamLoad(boolean isSyncStreamLoad) {
return this.isSyncStreamLoad = isSyncStreamLoad;
}

public boolean isRoutineLoadTask() {
return type == Type.ROUTINE_LOAD;
}

public String getStmt() {
return "";
}

// for sync stream load
public void setCoordinator(Coordinator coord) {
this.coord = coord;
Expand All @@ -1359,9 +1363,9 @@ public void setCoordinator(Coordinator coord) {
public String getStringByType() {
switch (this.type) {
case ROUTINE_LOAD:
return "ROUTINE_LOAD";
return ProfileManager.LOAD_TYPE_ROUTINE_LOAD;
case STREAM_LOAD:
return "STREAM_LOAD";
return ProfileManager.LOAD_TYPE_STREAM_LOAD;
case PARALLEL_STREAM_LOAD:
return "PARALLEL_STREAM_LOAD";
default:
Expand Down Expand Up @@ -1507,7 +1511,9 @@ public TLoadInfo toThrift() {
} else {
info.setProgress("0%");
}

if (ProfileManager.getInstance().hasProfile(DebugUtil.printId(loadId))) {
info.setProfile_id(DebugUtil.printId(loadId));
}
// tracking url
if (trackingUrl != null) {
info.setUrl(trackingUrl);
Expand Down
Loading

0 comments on commit 2a9e335

Please sign in to comment.