Skip to content

Commit

Permalink
Migrate the alter table related logic scattered everywhere to AlterJo…
Browse files Browse the repository at this point in the history
…bExecutor

Signed-off-by: HangyuanLiu <[email protected]>
  • Loading branch information
HangyuanLiu committed Jul 23, 2024
1 parent b87f84a commit eabf78f
Show file tree
Hide file tree
Showing 24 changed files with 307 additions and 330 deletions.
327 changes: 210 additions & 117 deletions fe/fe-core/src/main/java/com/starrocks/alter/AlterJobExecutor.java

Large diffs are not rendered by default.

117 changes: 2 additions & 115 deletions fe/fe-core/src/main/java/com/starrocks/alter/AlterJobMgr.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
import com.google.common.collect.Sets;
import com.starrocks.analysis.DateLiteral;
import com.starrocks.analysis.TableName;
import com.starrocks.analysis.TableRef;
import com.starrocks.authentication.AuthenticationMgr;
import com.starrocks.catalog.BaseTableInfo;
import com.starrocks.catalog.ColocateTableIndex;
Expand Down Expand Up @@ -72,10 +71,8 @@
import com.starrocks.common.MetaNotFoundException;
import com.starrocks.common.UserException;
import com.starrocks.common.util.DateUtils;
import com.starrocks.common.util.DynamicPartitionUtil;
import com.starrocks.common.util.PropertyAnalyzer;
import com.starrocks.common.util.TimeUtils;
import com.starrocks.common.util.Util;
import com.starrocks.common.util.concurrent.lock.LockType;
import com.starrocks.common.util.concurrent.lock.Locker;
import com.starrocks.persist.AlterMaterializedViewBaseTableInfosLog;
Expand All @@ -102,35 +99,27 @@
import com.starrocks.sql.analyzer.Analyzer;
import com.starrocks.sql.analyzer.MaterializedViewAnalyzer;
import com.starrocks.sql.analyzer.SemanticException;
import com.starrocks.sql.ast.AddPartitionClause;
import com.starrocks.sql.ast.AlterClause;
import com.starrocks.sql.ast.AlterMaterializedViewStatusClause;
import com.starrocks.sql.ast.AlterSystemStmt;
import com.starrocks.sql.ast.AlterTableCommentClause;
import com.starrocks.sql.ast.AlterTableStmt;
import com.starrocks.sql.ast.ColumnRenameClause;
import com.starrocks.sql.ast.CompactionClause;
import com.starrocks.sql.ast.CreateMaterializedViewStatement;
import com.starrocks.sql.ast.CreateMaterializedViewStmt;
import com.starrocks.sql.ast.DropMaterializedViewStmt;
import com.starrocks.sql.ast.DropPartitionClause;
import com.starrocks.sql.ast.ModifyPartitionClause;
import com.starrocks.sql.ast.ModifyTablePropertiesClause;
import com.starrocks.sql.ast.PartitionRenameClause;
import com.starrocks.sql.ast.QueryStatement;
import com.starrocks.sql.ast.ReplacePartitionClause;
import com.starrocks.sql.ast.RollupRenameClause;
import com.starrocks.sql.ast.StatementBase;
import com.starrocks.sql.ast.TableRenameClause;
import com.starrocks.sql.ast.TruncatePartitionClause;
import com.starrocks.sql.ast.TruncateTableStmt;
import com.starrocks.sql.ast.UserIdentity;
import com.starrocks.sql.optimizer.rule.transformation.materialization.MvUtils;
import com.starrocks.sql.parser.SqlParser;
import com.starrocks.thrift.TStorageMedium;
import com.starrocks.thrift.TTabletMetaType;
import com.starrocks.thrift.TTabletType;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.util.Strings;
Expand All @@ -156,7 +145,6 @@ public class AlterJobMgr {
private final SchemaChangeHandler schemaChangeHandler;
private final MaterializedViewHandler materializedViewHandler;
private final SystemHandler clusterHandler;
private final CompactionHandler compactionHandler;

public AlterJobMgr(SchemaChangeHandler schemaChangeHandler,
MaterializedViewHandler materializedViewHandler,
Expand All @@ -165,7 +153,6 @@ public AlterJobMgr(SchemaChangeHandler schemaChangeHandler,
this.schemaChangeHandler = schemaChangeHandler;
this.materializedViewHandler = materializedViewHandler;
this.clusterHandler = systemHandler;
this.compactionHandler = compactionHandler;
}

public void start() {
Expand Down Expand Up @@ -535,11 +522,7 @@ public void processAlterTable(AlterTableStmt stmt) throws UserException {
if (table == null) {
ErrorReport.reportDdlException(ErrorCode.ERR_BAD_TABLE_ERROR, tableName);
}
OlapTable olapTable = (OlapTable) table;

// some operations will take long time to process, need to be done outside the databse lock
boolean needProcessOutsideDatabaseLock = false;
boolean isSynchronous = true;

// some operations will take long time to process, need to be done outside the databse lock
boolean needProcessOutsideDatabaseLock = false;
Expand All @@ -548,6 +531,7 @@ public void processAlterTable(AlterTableStmt stmt) throws UserException {
if (!(table.isOlapOrCloudNativeTable() || table.isMaterializedView())) {
throw new DdlException("Do not support alter non-native table/materialized-view[" + tableName + "]");
}
OlapTable olapTable = (OlapTable) table;

List<AlterClause> alterClauses = stmt.getAlterClauseList();
Locker locker = new Locker();
Expand All @@ -567,66 +551,8 @@ public void processAlterTable(AlterTableStmt stmt) throws UserException {
} else if (stmt.hasRollupOp()) {
materializedViewHandler.process(alterClauses, db, olapTable);
isSynchronous = false;
} else if (stmt.hasPartitionOp()) {
Preconditions.checkState(alterClauses.size() == 1);
AlterClause alterClause = alterClauses.get(0);
if (alterClause instanceof DropPartitionClause) {
DropPartitionClause dropPartitionClause = (DropPartitionClause) alterClause;
if (!dropPartitionClause.isTempPartition()) {
DynamicPartitionUtil.checkAlterAllowed((OlapTable) db.getTable(tableName));
}
if (dropPartitionClause.getPartitionName() != null && dropPartitionClause.getPartitionName()
.startsWith(ExpressionRangePartitionInfo.SHADOW_PARTITION_PREFIX)) {
throw new DdlException("Deletion of shadow partitions is not allowed");
}
List<String> partitionNames = dropPartitionClause.getPartitionNames();
if (CollectionUtils.isNotEmpty(partitionNames)) {
boolean hasShadowPartition = partitionNames.stream()
.anyMatch(partitionName -> partitionName.startsWith(
ExpressionRangePartitionInfo.SHADOW_PARTITION_PREFIX));
if (hasShadowPartition) {
throw new DdlException("Deletion of shadow partitions is not allowed");
}
}
GlobalStateMgr.getCurrentState().getLocalMetastore().dropPartition(db, olapTable, dropPartitionClause);
} else if (alterClause instanceof ReplacePartitionClause) {
ReplacePartitionClause replacePartitionClause = (ReplacePartitionClause) alterClause;
List<String> partitionNames = replacePartitionClause.getPartitionNames();
for (String partitionName : partitionNames) {
if (partitionName.startsWith(ExpressionRangePartitionInfo.SHADOW_PARTITION_PREFIX)) {
throw new DdlException("Replace shadow partitions is not allowed");
}
}
GlobalStateMgr.getCurrentState().getLocalMetastore()
.replaceTempPartition(db, tableName, replacePartitionClause);
} else if (alterClause instanceof ModifyPartitionClause) {
ModifyPartitionClause clause = ((ModifyPartitionClause) alterClause);
// expand the partition names if it is 'Modify Partition(*)'
if (clause.isNeedExpand()) {
List<String> partitionNames = clause.getPartitionNames();
partitionNames.clear();
for (Partition partition : olapTable.getPartitions()) {
partitionNames.add(partition.getName());
}
}
Map<String, String> properties = clause.getProperties();
if (properties.containsKey(PropertyAnalyzer.PROPERTIES_INMEMORY)) {
needProcessOutsideDatabaseLock = true;
} else {
List<String> partitionNames = clause.getPartitionNames();
modifyPartitionsProperty(db, olapTable, partitionNames, properties);
}
} else if (alterClause instanceof AddPartitionClause) {
needProcessOutsideDatabaseLock = true;
} else {
throw new DdlException("Invalid alter operation: " + alterClause.getOpType());
}
} else if (stmt.contains(AlterOpType.TRUNCATE_PARTITION)) {
needProcessOutsideDatabaseLock = true;
} else if (stmt.contains(AlterOpType.RENAME)) {
processRename(db, olapTable, alterClauses);
} else if (stmt.contains(AlterOpType.SWAP)) {
new AlterJobExecutor().process(stmt, ConnectContext.get());
} else if (stmt.contains(AlterOpType.ALTER_COMMENT)) {
processAlterComment(db, olapTable, alterClauses);
} else if (stmt.contains(AlterOpType.MODIFY_TABLE_PROPERTY_SYNC)) {
Expand All @@ -644,42 +570,7 @@ public void processAlterTable(AlterTableStmt stmt) throws UserException {
if (needProcessOutsideDatabaseLock) {
Preconditions.checkState(alterClauses.size() == 1);
AlterClause alterClause = alterClauses.get(0);
if (alterClause instanceof AddPartitionClause) {
if (!((AddPartitionClause) alterClause).isTempPartition()) {
DynamicPartitionUtil.checkAlterAllowed((OlapTable) db.getTable(tableName));
}
GlobalStateMgr.getCurrentState().getLocalMetastore()
.addPartitions(Util.getOrCreateConnectContext(), db, tableName, (AddPartitionClause) alterClause);
} else if (alterClause instanceof TruncatePartitionClause) {
// This logic is used to adapt mysql syntax.
// ALTER TABLE test TRUNCATE PARTITION p1;
TruncatePartitionClause clause = (TruncatePartitionClause) alterClause;
TableRef tableRef = new TableRef(stmt.getTbl(), null, clause.getPartitionNames());
TruncateTableStmt tStmt = new TruncateTableStmt(tableRef);
ConnectContext ctx = new ConnectContext();
ctx.setGlobalStateMgr(GlobalStateMgr.getCurrentState());
GlobalStateMgr.getCurrentState().getLocalMetastore().truncateTable(tStmt, ctx);
} else if (alterClause instanceof ModifyPartitionClause) {
ModifyPartitionClause clause = ((ModifyPartitionClause) alterClause);
Map<String, String> properties = clause.getProperties();
List<String> partitionNames = clause.getPartitionNames();
// currently, only in memory property could reach here
Preconditions.checkState(properties.containsKey(PropertyAnalyzer.PROPERTIES_INMEMORY));
olapTable = (OlapTable) db.getTable(tableName);
if (olapTable.isCloudNativeTable()) {
throw new DdlException("Lake table not support alter in_memory");
}

schemaChangeHandler.updatePartitionsInMemoryMeta(
db, tableName, partitionNames, properties);

locker.lockTablesWithIntensiveDbLock(db, Lists.newArrayList(olapTable.getId()), LockType.WRITE);
try {
modifyPartitionsProperty(db, olapTable, partitionNames, properties);
} finally {
locker.unLockTablesWithIntensiveDbLock(db, Lists.newArrayList(olapTable.getId()), LockType.WRITE);
}
} else if (alterClause instanceof ModifyTablePropertiesClause) {
if (alterClause instanceof ModifyTablePropertiesClause) {
Map<String, String> properties = alterClause.getProperties();
Preconditions.checkState(properties.containsKey(PropertyAnalyzer.PROPERTIES_INMEMORY) ||
properties.containsKey(PropertyAnalyzer.PROPERTIES_ENABLE_PERSISTENT_INDEX) ||
Expand Down Expand Up @@ -726,10 +617,6 @@ public void processAlterTable(AlterTableStmt stmt) throws UserException {
} else {
throw new DdlException("Invalid alter operation: " + alterClause.getOpType());
}
} else if (alterClause instanceof CompactionClause) {
String s = (((CompactionClause) alterClause).isBaseCompaction() ? "base" : "cumulative")
+ " compact " + tableName + " partitions: " + ((CompactionClause) alterClause).getPartitionNames();
compactionHandler.process(alterClauses, db, olapTable);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import com.starrocks.catalog.Partition;
import com.starrocks.catalog.PhysicalPartition;
import com.starrocks.catalog.Tablet;
import com.starrocks.common.DdlException;
import com.starrocks.common.UserException;
import com.starrocks.common.util.concurrent.lock.LockType;
import com.starrocks.common.util.concurrent.lock.Locker;
Expand All @@ -33,14 +32,12 @@
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.server.RunMode;
import com.starrocks.sql.ast.AlterClause;
import com.starrocks.sql.ast.CancelStmt;
import com.starrocks.sql.ast.CompactionClause;
import com.starrocks.task.AgentBatchTask;
import com.starrocks.task.AgentTask;
import com.starrocks.task.AgentTaskExecutor;
import com.starrocks.task.AgentTaskQueue;
import com.starrocks.task.CompactionTask;
import org.apache.commons.lang.NotImplementedException;
import org.apache.hadoop.util.Lists;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand All @@ -50,28 +47,12 @@
import java.util.ArrayList;
import java.util.List;

public class CompactionHandler extends AlterHandler {
public class CompactionHandler {
private static final Logger LOG = LogManager.getLogger(CompactionHandler.class);

public CompactionHandler() {
super("compaction");
}


@Override
protected void runAfterCatalogReady() {
super.runAfterCatalogReady();
}

@Override
public List<List<Comparable>> getAlterJobInfosByDb(Database db) {
throw new NotImplementedException();
}

@Override
// add synchronized to avoid process 2 or more stmts at same time
public synchronized ShowResultSet process(List<AlterClause> alterClauses, Database db,
OlapTable olapTable) throws UserException {
public static synchronized ShowResultSet process(List<AlterClause> alterClauses, Database db,
OlapTable olapTable) throws UserException {
Preconditions.checkArgument(alterClauses.size() == 1);
AlterClause alterClause = alterClauses.get(0);
Preconditions.checkState(alterClause instanceof CompactionClause);
Expand Down Expand Up @@ -142,7 +123,7 @@ public synchronized ShowResultSet process(List<AlterClause> alterClauses, Databa
}

@NotNull
private List<Partition> findAllPartitions(OlapTable olapTable, CompactionClause compactionClause) {
private static List<Partition> findAllPartitions(OlapTable olapTable, CompactionClause compactionClause) {
List<Partition> allPartitions = new ArrayList<>();
if (compactionClause.getPartitionNames().isEmpty()) {
allPartitions.addAll(olapTable.getPartitions());
Expand All @@ -159,10 +140,4 @@ private List<Partition> findAllPartitions(OlapTable olapTable, CompactionClause
}
return allPartitions;
}

@Override
public synchronized void cancel(CancelStmt stmt) throws DdlException {
throw new NotImplementedException();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,6 @@ public void handleNewListPartitionDescs(Map<ColumnId, Column> idToColumn,
long partitionId = partition.getId();
PartitionDesc partitionDesc = entry.second;
Preconditions.checkArgument(partitionDesc instanceof SinglePartitionDesc);
Preconditions.checkArgument(((SinglePartitionDesc) partitionDesc).isAnalyzed());
this.idToDataProperty.put(partitionId, partitionDesc.getPartitionDataProperty());
this.idToReplicationNum.put(partitionId, partitionDesc.getReplicationNum());
this.idToInMemory.put(partitionId, partitionDesc.isInMemory());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,6 @@ private Range<PartitionKey> checkNewRange(List<Column> partitionColumns, Partiti

public Range<PartitionKey> handleNewSinglePartitionDesc(Map<ColumnId, Column> schema, SingleRangePartitionDesc desc,
long partitionId, boolean isTemp) throws DdlException {
Preconditions.checkArgument(desc.isAnalyzed());
Range<PartitionKey> range;
try {
range = checkAndCreateRange(schema, desc, isTemp);
Expand Down Expand Up @@ -277,7 +276,6 @@ public void handleNewRangePartitionDescs(Map<ColumnId, Column> schema,
if (!existPartitionNameSet.contains(partition.getName())) {
long partitionId = partition.getId();
SingleRangePartitionDesc desc = (SingleRangePartitionDesc) entry.second;
Preconditions.checkArgument(desc.isAnalyzed());
Range<PartitionKey> range;
try {
range = checkAndCreateRange(schema, (SingleRangePartitionDesc) entry.second, isTemp);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -405,12 +405,12 @@ public static Map<String, String> analyzeDynamicPartition(Map<String, String> pr
return analyzedProperties;
}

public static void checkAlterAllowed(OlapTable olapTable) throws DdlException {
public static void checkAlterAllowed(OlapTable olapTable) {
TableProperty tableProperty = olapTable.getTableProperty();
if (tableProperty != null && tableProperty.getDynamicPartitionProperty() != null &&
tableProperty.getDynamicPartitionProperty().isExists() &&
tableProperty.getDynamicPartitionProperty().isEnabled()) {
throw new DdlException("Cannot add/drop partition on a Dynamic Partition Table, " +
throw new SemanticException("Cannot add/drop partition on a Dynamic Partition Table, " +
"Use command `ALTER TABLE tbl_name SET (\"dynamic_partition.enable\" = \"false\")` firstly.");
}
}
Expand Down
Loading

0 comments on commit eabf78f

Please sign in to comment.