Skip to content

Commit

Permalink
[opt](Nereids) lock table in ascending order of table IDs
Browse files Browse the repository at this point in the history
  • Loading branch information
morrySnow committed Dec 10, 2024
1 parent 71f96e6 commit c2cfd55
Show file tree
Hide file tree
Showing 51 changed files with 821 additions and 1,085 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1054,10 +1054,7 @@ public boolean equals(Object obj) {
&& isKey == other.isKey
&& isAllowNull == other.isAllowNull
&& isAutoInc == other.isAutoInc
&& getDataType().equals(other.getDataType())
&& getStrLen() == other.getStrLen()
&& getPrecision() == other.getPrecision()
&& getScale() == other.getScale()
&& Objects.equals(type, other.type)
&& Objects.equals(comment, other.comment)
&& visible == other.visible
&& Objects.equals(children, other.children)
Expand Down
8 changes: 4 additions & 4 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ public void addTaskResult(MTMVTask task, MTMVRelation relation,
// to connection issues such as S3, so it is directly set to null
if (!isReplay) {
// shouldn't do this while holding mvWriteLock
mtmvCache = MTMVCache.from(this, MTMVPlanUtil.createMTMVContext(this), true);
mtmvCache = MTMVCache.from(this, MTMVPlanUtil.createMTMVContext(this), true, true);
}
} catch (Throwable e) {
mtmvCache = null;
Expand Down Expand Up @@ -323,7 +323,7 @@ public MTMVCache getOrGenerateCache(ConnectContext connectionContext) throws Ana
MTMVCache mtmvCache;
try {
// Should new context with ADMIN user
mtmvCache = MTMVCache.from(this, MTMVPlanUtil.createMTMVContext(this), true);
mtmvCache = MTMVCache.from(this, MTMVPlanUtil.createMTMVContext(this), true, false);
} finally {
connectionContext.setThreadLocalInfo();
}
Expand Down Expand Up @@ -362,7 +362,7 @@ public MTMVRefreshSnapshot getRefreshSnapshot() {
*
* @return mvPartitionName ==> mvPartitionKeyDesc
*/
public Map<String, PartitionKeyDesc> generateMvPartitionDescs() throws AnalysisException {
public Map<String, PartitionKeyDesc> generateMvPartitionDescs() {
Map<String, PartitionItem> mtmvItems = getAndCopyPartitionItems();
Map<String, PartitionKeyDesc> result = Maps.newHashMap();
for (Entry<String, PartitionItem> entry : mtmvItems.entrySet()) {
Expand Down Expand Up @@ -392,7 +392,7 @@ public Pair<Map<String, Set<String>>, Map<String, String>> calculateDoublyPartit
Map<String, String> baseToMv = Maps.newHashMap();
Map<PartitionKeyDesc, Set<String>> relatedPartitionDescs = MTMVPartitionUtil
.generateRelatedPartitionDescs(mvPartitionInfo, mvProperties);
Map<String, PartitionItem> mvPartitionItems = getAndCopyPartitionItemsWithoutLock();
Map<String, PartitionItem> mvPartitionItems = getAndCopyPartitionItems();
for (Entry<String, PartitionItem> entry : mvPartitionItems.entrySet()) {
Set<String> basePartitionNames = relatedPartitionDescs.getOrDefault(entry.getValue().toPartitionKeyDesc(),
Sets.newHashSet());
Expand Down
17 changes: 2 additions & 15 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,6 @@
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

/**
Expand Down Expand Up @@ -3316,23 +3315,11 @@ public PartitionType getPartitionType() {
}

@Override
public Map<String, PartitionItem> getAndCopyPartitionItems(Optional<MvccSnapshot> snapshot)
throws AnalysisException {
public Map<String, PartitionItem> getAndCopyPartitionItems(Optional<MvccSnapshot> snapshot) {
return getAndCopyPartitionItems();
}

public Map<String, PartitionItem> getAndCopyPartitionItems() throws AnalysisException {
if (!tryReadLock(1, TimeUnit.MINUTES)) {
throw new AnalysisException("get table read lock timeout, database=" + getDBName() + ",table=" + getName());
}
try {
return getAndCopyPartitionItemsWithoutLock();
} finally {
readUnlock();
}
}

public Map<String, PartitionItem> getAndCopyPartitionItemsWithoutLock() throws AnalysisException {
public Map<String, PartitionItem> getAndCopyPartitionItems() {
Map<String, PartitionItem> res = Maps.newHashMap();
for (Entry<Long, PartitionItem> entry : getPartitionInfo().getIdToItem(false).entrySet()) {
Partition partition = idToPartition.get(entry.getKey());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,10 @@ private Optional<LogicalSqlCache> tryParseSqlWithoutCheckVariable(
SqlCacheContext sqlCacheContext, UserIdentity currentUserIdentity) {
Env env = connectContext.getEnv();

if (!tryLockTables(connectContext, env, sqlCacheContext)) {
return invalidateCache(key);
}

// check table and view and their columns authority
if (privilegeChanged(connectContext, env, sqlCacheContext)) {
return invalidateCache(key);
Expand Down Expand Up @@ -378,16 +382,38 @@ private boolean dataMaskPoliciesChanged(
return false;
}

private boolean privilegeChanged(ConnectContext connectContext, Env env, SqlCacheContext sqlCacheContext) {
/**
* Execute table locking operations in ascending order of table IDs.
*
* @return true if obtain all tables lock.
*/
private boolean tryLockTables(ConnectContext connectContext, Env env, SqlCacheContext sqlCacheContext) {
StatementContext currentStatementContext = connectContext.getStatementContext();
for (FullTableName fullTableName : sqlCacheContext.getUsedTables()) {
TableIf tableIf = findTableIf(env, fullTableName);
if (tableIf == null) {
return false;
}
currentStatementContext.getTables().put(fullTableName.toList(), tableIf);
}
for (FullTableName fullTableName : sqlCacheContext.getUsedViews().keySet()) {
TableIf tableIf = findTableIf(env, fullTableName);
if (tableIf == null) {
return false;
}
currentStatementContext.getTables().put(fullTableName.toList(), tableIf);
}
currentStatementContext.lock();
return true;
}

private boolean privilegeChanged(ConnectContext connectContext, Env env, SqlCacheContext sqlCacheContext) {
for (Entry<FullTableName, Set<String>> kv : sqlCacheContext.getCheckPrivilegeTablesOrViews().entrySet()) {
Set<String> usedColumns = kv.getValue();
TableIf tableIf = findTableIf(env, kv.getKey());
if (tableIf == null) {
return true;
}
// release when close statementContext
currentStatementContext.addTableReadLock(tableIf);
try {
UserAuthentication.checkPermission(tableIf, connectContext, usedColumns);
} catch (Throwable t) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,21 @@

package org.apache.doris.common.lock;

import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.qe.ConnectContext;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
* A monitored version of ReentrantReadWriteLock that provides additional
* monitoring capabilities for read and write locks.
*/
public class MonitoredReentrantReadWriteLock extends ReentrantReadWriteLock {

private static final Logger LOG = LogManager.getLogger(MonitoredReentrantReadWriteLock.class);
// Monitored read and write lock instances
private final ReadLock readLock = new ReadLock(this);
private final WriteLock writeLock = new WriteLock(this);
Expand Down Expand Up @@ -60,6 +68,11 @@ protected ReadLock(ReentrantReadWriteLock lock) {
public void lock() {
super.lock();
monitor.afterLock();
if (getReadHoldCount() + getWriteHoldCount() > 1) {
LOG.warn(" read lock count is {}, write lock count is {}, stack is {}, query id is {}",
getReadHoldCount(), getWriteHoldCount(), Thread.currentThread().getStackTrace(),
ConnectContext.get() == null ? "" : DebugUtil.printId(ConnectContext.get().queryId()));
}
}

/**
Expand Down Expand Up @@ -97,6 +110,11 @@ protected WriteLock(ReentrantReadWriteLock lock) {
public void lock() {
super.lock();
monitor.afterLock();
if (getReadHoldCount() + getWriteHoldCount() > 1) {
LOG.warn(" read lock count is {}, write lock count is {}, stack is {}, query id is {}",
getReadHoldCount(), getWriteHoldCount(), Thread.currentThread().getStackTrace(),
ConnectContext.get() == null ? "" : DebugUtil.printId(ConnectContext.get().queryId()));
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,7 @@ private List<Pair<List<Comparable>, TRow>> getPartitionInfosInrernal() throws An
List<Pair<List<Comparable>, TRow>> partitionInfos = new ArrayList<Pair<List<Comparable>, TRow>>();
Map<Long, List<String>> partitionsUnSyncTables = null;
String mtmvPartitionSyncErrorMsg = null;
olapTable.readLock();
if (olapTable instanceof MTMV) {
try {
partitionsUnSyncTables = MTMVPartitionUtil
Expand All @@ -251,7 +252,6 @@ private List<Pair<List<Comparable>, TRow>> getPartitionInfosInrernal() throws An
mtmvPartitionSyncErrorMsg = e.getMessage();
}
}
olapTable.readLock();
try {
List<Long> partitionIds;
PartitionInfo tblPartitionInfo = olapTable.getPartitionInfo();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ public class SummaryProfile {
public static final String GET_TABLE_VERSION_COUNT = "Get Table Version Count";

public static final String PARSE_SQL_TIME = "Parse SQL Time";
public static final String NEREIDS_LOCK_TABLE_TIME = "Nereids Lock Table Time";
public static final String NEREIDS_ANALYSIS_TIME = "Nereids Analysis Time";
public static final String NEREIDS_REWRITE_TIME = "Nereids Rewrite Time";
public static final String NEREIDS_OPTIMIZE_TIME = "Nereids Optimize Time";
Expand Down Expand Up @@ -136,6 +137,7 @@ public class SummaryProfile {
// The display order of execution summary items.
public static final ImmutableList<String> EXECUTION_SUMMARY_KEYS = ImmutableList.of(
PARSE_SQL_TIME,
NEREIDS_LOCK_TABLE_TIME,
NEREIDS_ANALYSIS_TIME,
NEREIDS_REWRITE_TIME,
NEREIDS_OPTIMIZE_TIME,
Expand Down Expand Up @@ -224,6 +226,8 @@ public class SummaryProfile {
private long parseSqlStartTime = -1;
@SerializedName(value = "parseSqlFinishTime")
private long parseSqlFinishTime = -1;
@SerializedName(value = "nereidsLockTableFinishTime")
private long nereidsLockTableFinishTime = -1;
@SerializedName(value = "nereidsAnalysisFinishTime")
private long nereidsAnalysisFinishTime = -1;
@SerializedName(value = "nereidsRewriteFinishTime")
Expand Down Expand Up @@ -410,6 +414,7 @@ private void updateSummaryProfile(Map<String, String> infos) {

private void updateExecutionSummaryProfile() {
executionSummaryProfile.addInfoString(PARSE_SQL_TIME, getPrettyParseSqlTime());
executionSummaryProfile.addInfoString(NEREIDS_LOCK_TABLE_TIME, getPrettyNereidsLockTableTime());
executionSummaryProfile.addInfoString(NEREIDS_ANALYSIS_TIME, getPrettyNereidsAnalysisTime());
executionSummaryProfile.addInfoString(NEREIDS_REWRITE_TIME, getPrettyNereidsRewriteTime());
executionSummaryProfile.addInfoString(NEREIDS_OPTIMIZE_TIME, getPrettyNereidsOptimizeTime());
Expand Down Expand Up @@ -506,6 +511,10 @@ public void setParseSqlFinishTime(long parseSqlFinishTime) {
this.parseSqlFinishTime = parseSqlFinishTime;
}

public void setNereidsLockTableFinishTime() {
this.nereidsLockTableFinishTime = TimeUtils.getStartTimeMs();
}

public void setNereidsAnalysisTime() {
this.nereidsAnalysisFinishTime = TimeUtils.getStartTimeMs();
}
Expand Down Expand Up @@ -766,8 +775,12 @@ public String getPrettyParseSqlTime() {
return getPrettyTime(parseSqlFinishTime, parseSqlStartTime, TUnit.TIME_MS);
}

public String getPrettyNereidsLockTableTime() {
return getPrettyTime(nereidsLockTableFinishTime, parseSqlStartTime, TUnit.TIME_MS);
}

public String getPrettyNereidsAnalysisTime() {
return getPrettyTime(nereidsAnalysisFinishTime, queryBeginTime, TUnit.TIME_MS);
return getPrettyTime(nereidsAnalysisFinishTime, nereidsLockTableFinishTime, TUnit.TIME_MS);
}

public String getPrettyNereidsRewriteTime() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.doris.common.Status;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.common.util.MetaLockUtils;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.job.common.TaskStatus;
import org.apache.doris.job.exception.JobException;
Expand Down Expand Up @@ -68,6 +69,7 @@

import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -173,13 +175,24 @@ public void run() throws JobException {
}
// Every time a task is run, the relation is regenerated because baseTables and baseViews may change,
// such as deleting a table and creating a view with the same name
this.relation = MTMVPlanUtil.generateMTMVRelation(mtmv, ctx);
Set<TableIf> tablesInPlan = MTMVPlanUtil.getBaseTableFromQuery(mtmv.getQuerySql(), ctx);
this.relation = MTMVPlanUtil.generateMTMVRelation(tablesInPlan, ctx);
beforeMTMVRefresh();
if (mtmv.getMvPartitionInfo().getPartitionType() != MTMVPartitionType.SELF_MANAGE) {
MTMVPartitionUtil.alignMvPartition(mtmv);
List<TableIf> tableIfs = Lists.newArrayList(tablesInPlan);
tableIfs.sort(Comparator.comparing(TableIf::getId));

MTMVRefreshContext context;
// lock table order by id to avoid deadlock
MetaLockUtils.readLockTables(tableIfs);
try {
if (mtmv.getMvPartitionInfo().getPartitionType() != MTMVPartitionType.SELF_MANAGE) {
MTMVPartitionUtil.alignMvPartition(mtmv);
}
context = MTMVRefreshContext.buildContext(mtmv);
this.needRefreshPartitions = calculateNeedRefreshPartitions(context);
} finally {
MetaLockUtils.readUnlockTables(tableIfs);
}
MTMVRefreshContext context = MTMVRefreshContext.buildContext(mtmv);
this.needRefreshPartitions = calculateNeedRefreshPartitions(context);
this.refreshMode = generateRefreshMode(needRefreshPartitions);
if (refreshMode == MTMVTaskRefreshMode.NOT_REFRESH) {
return;
Expand All @@ -194,7 +207,7 @@ public void run() throws JobException {
int start = i * refreshPartitionNum;
int end = start + refreshPartitionNum;
Set<String> execPartitionNames = Sets.newHashSet(needRefreshPartitions
.subList(start, end > needRefreshPartitions.size() ? needRefreshPartitions.size() : end));
.subList(start, Math.min(end, needRefreshPartitions.size())));
// need get names before exec
Map<String, MTMVRefreshPartitionSnapshot> execPartitionSnapshots = MTMVPartitionUtil
.generatePartitionSnapshots(context, relation.getBaseTablesOneLevel(), execPartitionNames);
Expand All @@ -204,7 +217,7 @@ public void run() throws JobException {
}
} catch (Throwable e) {
if (getStatus() == TaskStatus.RUNNING) {
LOG.warn("run task failed: ", e.getMessage());
LOG.warn("run task failed: {}", e.getMessage());
throw new JobException(e.getMessage(), e);
} else {
// if status is not `RUNNING`,maybe the task was canceled, therefore, it is a normal situation
Expand Down
30 changes: 19 additions & 11 deletions fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -87,23 +87,31 @@ public StructInfo getStructInfo() {
return structInfo;
}

public static MTMVCache from(MTMV mtmv, ConnectContext connectContext, boolean needCost) {
public static MTMVCache from(MTMV mtmv, ConnectContext connectContext, boolean needCost, boolean needLock) {
StatementContext mvSqlStatementContext = new StatementContext(connectContext,
new OriginStatement(mtmv.getQuerySql(), 0));
if (needLock) {
mvSqlStatementContext.setNeedLockTables(false);
}
if (mvSqlStatementContext.getConnectContext().getStatementContext() == null) {
mvSqlStatementContext.getConnectContext().setStatementContext(mvSqlStatementContext);
}
LogicalPlan unboundMvPlan = new NereidsParser().parseSingle(mtmv.getQuerySql());
NereidsPlanner planner = new NereidsPlanner(mvSqlStatementContext);

// Can not convert to table sink, because use the same column from different table when self join
// the out slot is wrong
if (needCost) {
// Only in mv rewrite, we need plan with eliminated cost which is used for mv chosen
planner.planWithLock(unboundMvPlan, PhysicalProperties.ANY, ExplainLevel.ALL_PLAN);
} else {
// No need cost for performance
planner.planWithLock(unboundMvPlan, PhysicalProperties.ANY, ExplainLevel.REWRITTEN_PLAN);
boolean originalRewriteFlag = connectContext.getSessionVariable().enableMaterializedViewRewrite;
connectContext.getSessionVariable().enableMaterializedViewRewrite = false;
try {
// Can not convert to table sink, because use the same column from different table when self join
// the out slot is wrong
if (needCost) {
// Only in mv rewrite, we need plan with eliminated cost which is used for mv chosen
planner.planWithLock(unboundMvPlan, PhysicalProperties.ANY, ExplainLevel.ALL_PLAN);
} else {
// No need cost for performance
planner.planWithLock(unboundMvPlan, PhysicalProperties.ANY, ExplainLevel.REWRITTEN_PLAN);
}
} finally {
connectContext.getSessionVariable().enableMaterializedViewRewrite = originalRewriteFlag;
}
Plan originPlan = planner.getCascadesContext().getRewritePlan();
// Eliminate result sink because sink operator is useless in query rewrite by materialized view
Expand All @@ -128,6 +136,6 @@ public Plan visitLogicalResultSink(LogicalResultSink<? extends Plan> logicalResu
new BitSet());
return new MTMVCache(mvPlan, originPlan, planner.getAnalyzedPlan(), needCost
? planner.getCascadesContext().getMemo().getRoot().getStatistics() : null,
structInfoOptional.orElseGet(() -> null));
structInfoOptional.orElse(null));
}
}
Loading

0 comments on commit c2cfd55

Please sign in to comment.