Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Enhancement] Migrate the alter table related logic scattered everywhere to AlterJobExecutor #48637

Merged
merged 2 commits into from
Jul 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
315 changes: 198 additions & 117 deletions fe/fe-core/src/main/java/com/starrocks/alter/AlterJobExecutor.java

Large diffs are not rendered by default.

113 changes: 2 additions & 111 deletions fe/fe-core/src/main/java/com/starrocks/alter/AlterJobMgr.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
import com.google.common.collect.Sets;
import com.starrocks.analysis.DateLiteral;
import com.starrocks.analysis.TableName;
import com.starrocks.analysis.TableRef;
import com.starrocks.authentication.AuthenticationMgr;
import com.starrocks.catalog.BaseTableInfo;
import com.starrocks.catalog.ColocateTableIndex;
Expand Down Expand Up @@ -72,10 +71,8 @@
import com.starrocks.common.MetaNotFoundException;
import com.starrocks.common.UserException;
import com.starrocks.common.util.DateUtils;
import com.starrocks.common.util.DynamicPartitionUtil;
import com.starrocks.common.util.PropertyAnalyzer;
import com.starrocks.common.util.TimeUtils;
import com.starrocks.common.util.Util;
import com.starrocks.common.util.concurrent.lock.LockType;
import com.starrocks.common.util.concurrent.lock.Locker;
import com.starrocks.persist.AlterMaterializedViewBaseTableInfosLog;
Expand All @@ -102,35 +99,27 @@
import com.starrocks.sql.analyzer.Analyzer;
import com.starrocks.sql.analyzer.MaterializedViewAnalyzer;
import com.starrocks.sql.analyzer.SemanticException;
import com.starrocks.sql.ast.AddPartitionClause;
import com.starrocks.sql.ast.AlterClause;
import com.starrocks.sql.ast.AlterMaterializedViewStatusClause;
import com.starrocks.sql.ast.AlterSystemStmt;
import com.starrocks.sql.ast.AlterTableCommentClause;
import com.starrocks.sql.ast.AlterTableStmt;
import com.starrocks.sql.ast.ColumnRenameClause;
import com.starrocks.sql.ast.CompactionClause;
import com.starrocks.sql.ast.CreateMaterializedViewStatement;
import com.starrocks.sql.ast.CreateMaterializedViewStmt;
import com.starrocks.sql.ast.DropMaterializedViewStmt;
import com.starrocks.sql.ast.DropPartitionClause;
import com.starrocks.sql.ast.ModifyPartitionClause;
import com.starrocks.sql.ast.ModifyTablePropertiesClause;
import com.starrocks.sql.ast.PartitionRenameClause;
import com.starrocks.sql.ast.QueryStatement;
import com.starrocks.sql.ast.ReplacePartitionClause;
import com.starrocks.sql.ast.RollupRenameClause;
import com.starrocks.sql.ast.StatementBase;
import com.starrocks.sql.ast.TableRenameClause;
import com.starrocks.sql.ast.TruncatePartitionClause;
import com.starrocks.sql.ast.TruncateTableStmt;
import com.starrocks.sql.ast.UserIdentity;
import com.starrocks.sql.optimizer.rule.transformation.materialization.MvUtils;
import com.starrocks.sql.parser.SqlParser;
import com.starrocks.thrift.TStorageMedium;
import com.starrocks.thrift.TTabletMetaType;
import com.starrocks.thrift.TTabletType;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.util.Strings;
Expand All @@ -156,7 +145,6 @@ public class AlterJobMgr {
private final SchemaChangeHandler schemaChangeHandler;
private final MaterializedViewHandler materializedViewHandler;
private final SystemHandler clusterHandler;
private final CompactionHandler compactionHandler;

public AlterJobMgr(SchemaChangeHandler schemaChangeHandler,
MaterializedViewHandler materializedViewHandler,
Expand All @@ -165,7 +153,6 @@ public AlterJobMgr(SchemaChangeHandler schemaChangeHandler,
this.schemaChangeHandler = schemaChangeHandler;
this.materializedViewHandler = materializedViewHandler;
this.clusterHandler = systemHandler;
this.compactionHandler = compactionHandler;
}

public void start() {
Expand Down Expand Up @@ -536,6 +523,7 @@ public void processAlterTable(AlterTableStmt stmt) throws UserException {
ErrorReport.reportDdlException(ErrorCode.ERR_BAD_TABLE_ERROR, tableName);
}


// some operations will take long time to process, need to be done outside the databse lock
boolean needProcessOutsideDatabaseLock = false;
boolean isSynchronous = true;
Expand Down Expand Up @@ -563,66 +551,8 @@ public void processAlterTable(AlterTableStmt stmt) throws UserException {
} else if (stmt.hasRollupOp()) {
materializedViewHandler.process(alterClauses, db, olapTable);
isSynchronous = false;
} else if (stmt.hasPartitionOp()) {
Preconditions.checkState(alterClauses.size() == 1);
AlterClause alterClause = alterClauses.get(0);
if (alterClause instanceof DropPartitionClause) {
DropPartitionClause dropPartitionClause = (DropPartitionClause) alterClause;
if (!dropPartitionClause.isTempPartition()) {
DynamicPartitionUtil.checkAlterAllowed((OlapTable) db.getTable(tableName));
}
if (dropPartitionClause.getPartitionName() != null && dropPartitionClause.getPartitionName()
.startsWith(ExpressionRangePartitionInfo.SHADOW_PARTITION_PREFIX)) {
throw new DdlException("Deletion of shadow partitions is not allowed");
}
List<String> partitionNames = dropPartitionClause.getPartitionNames();
if (CollectionUtils.isNotEmpty(partitionNames)) {
boolean hasShadowPartition = partitionNames.stream()
.anyMatch(partitionName -> partitionName.startsWith(
ExpressionRangePartitionInfo.SHADOW_PARTITION_PREFIX));
if (hasShadowPartition) {
throw new DdlException("Deletion of shadow partitions is not allowed");
}
}
GlobalStateMgr.getCurrentState().getLocalMetastore().dropPartition(db, olapTable, dropPartitionClause);
} else if (alterClause instanceof ReplacePartitionClause) {
ReplacePartitionClause replacePartitionClause = (ReplacePartitionClause) alterClause;
List<String> partitionNames = replacePartitionClause.getPartitionNames();
for (String partitionName : partitionNames) {
if (partitionName.startsWith(ExpressionRangePartitionInfo.SHADOW_PARTITION_PREFIX)) {
throw new DdlException("Replace shadow partitions is not allowed");
}
}
GlobalStateMgr.getCurrentState().getLocalMetastore()
.replaceTempPartition(db, tableName, replacePartitionClause);
} else if (alterClause instanceof ModifyPartitionClause) {
ModifyPartitionClause clause = ((ModifyPartitionClause) alterClause);
// expand the partition names if it is 'Modify Partition(*)'
if (clause.isNeedExpand()) {
List<String> partitionNames = clause.getPartitionNames();
partitionNames.clear();
for (Partition partition : olapTable.getPartitions()) {
partitionNames.add(partition.getName());
}
}
Map<String, String> properties = clause.getProperties();
if (properties.containsKey(PropertyAnalyzer.PROPERTIES_INMEMORY)) {
needProcessOutsideDatabaseLock = true;
} else {
List<String> partitionNames = clause.getPartitionNames();
modifyPartitionsProperty(db, olapTable, partitionNames, properties);
}
} else if (alterClause instanceof AddPartitionClause) {
needProcessOutsideDatabaseLock = true;
} else {
throw new DdlException("Invalid alter operation: " + alterClause.getOpType());
}
} else if (stmt.contains(AlterOpType.TRUNCATE_PARTITION)) {
needProcessOutsideDatabaseLock = true;
} else if (stmt.contains(AlterOpType.RENAME)) {
processRename(db, olapTable, alterClauses);
} else if (stmt.contains(AlterOpType.SWAP)) {
new AlterJobExecutor().process(stmt, ConnectContext.get());
} else if (stmt.contains(AlterOpType.ALTER_COMMENT)) {
processAlterComment(db, olapTable, alterClauses);
} else if (stmt.contains(AlterOpType.MODIFY_TABLE_PROPERTY_SYNC)) {
Expand All @@ -640,42 +570,7 @@ public void processAlterTable(AlterTableStmt stmt) throws UserException {
if (needProcessOutsideDatabaseLock) {
Preconditions.checkState(alterClauses.size() == 1);
AlterClause alterClause = alterClauses.get(0);
if (alterClause instanceof AddPartitionClause) {
if (!((AddPartitionClause) alterClause).isTempPartition()) {
DynamicPartitionUtil.checkAlterAllowed((OlapTable) db.getTable(tableName));
}
GlobalStateMgr.getCurrentState().getLocalMetastore()
.addPartitions(Util.getOrCreateConnectContext(), db, tableName, (AddPartitionClause) alterClause);
} else if (alterClause instanceof TruncatePartitionClause) {
// This logic is used to adapt mysql syntax.
// ALTER TABLE test TRUNCATE PARTITION p1;
TruncatePartitionClause clause = (TruncatePartitionClause) alterClause;
TableRef tableRef = new TableRef(stmt.getTbl(), null, clause.getPartitionNames());
TruncateTableStmt tStmt = new TruncateTableStmt(tableRef);
ConnectContext ctx = new ConnectContext();
ctx.setGlobalStateMgr(GlobalStateMgr.getCurrentState());
GlobalStateMgr.getCurrentState().getLocalMetastore().truncateTable(tStmt, ctx);
} else if (alterClause instanceof ModifyPartitionClause) {
ModifyPartitionClause clause = ((ModifyPartitionClause) alterClause);
Map<String, String> properties = clause.getProperties();
List<String> partitionNames = clause.getPartitionNames();
// currently, only in memory property could reach here
Preconditions.checkState(properties.containsKey(PropertyAnalyzer.PROPERTIES_INMEMORY));
olapTable = (OlapTable) db.getTable(tableName);
if (olapTable.isCloudNativeTable()) {
throw new DdlException("Lake table not support alter in_memory");
}

schemaChangeHandler.updatePartitionsInMemoryMeta(
db, tableName, partitionNames, properties);

locker.lockTablesWithIntensiveDbLock(db, Lists.newArrayList(olapTable.getId()), LockType.WRITE);
try {
modifyPartitionsProperty(db, olapTable, partitionNames, properties);
} finally {
locker.unLockTablesWithIntensiveDbLock(db, Lists.newArrayList(olapTable.getId()), LockType.WRITE);
}
} else if (alterClause instanceof ModifyTablePropertiesClause) {
if (alterClause instanceof ModifyTablePropertiesClause) {
Map<String, String> properties = alterClause.getProperties();
Preconditions.checkState(properties.containsKey(PropertyAnalyzer.PROPERTIES_INMEMORY) ||
properties.containsKey(PropertyAnalyzer.PROPERTIES_ENABLE_PERSISTENT_INDEX) ||
Expand Down Expand Up @@ -722,10 +617,6 @@ public void processAlterTable(AlterTableStmt stmt) throws UserException {
} else {
throw new DdlException("Invalid alter operation: " + alterClause.getOpType());
}
} else if (alterClause instanceof CompactionClause) {
String s = (((CompactionClause) alterClause).isBaseCompaction() ? "base" : "cumulative")
+ " compact " + tableName + " partitions: " + ((CompactionClause) alterClause).getPartitionNames();
compactionHandler.process(alterClauses, db, olapTable);
}
}

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What would you like to know or talk about today?

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import com.starrocks.catalog.Partition;
import com.starrocks.catalog.PhysicalPartition;
import com.starrocks.catalog.Tablet;
import com.starrocks.common.DdlException;
import com.starrocks.common.UserException;
import com.starrocks.common.util.concurrent.lock.LockType;
import com.starrocks.common.util.concurrent.lock.Locker;
Expand All @@ -33,14 +32,12 @@
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.server.RunMode;
import com.starrocks.sql.ast.AlterClause;
import com.starrocks.sql.ast.CancelStmt;
import com.starrocks.sql.ast.CompactionClause;
import com.starrocks.task.AgentBatchTask;
import com.starrocks.task.AgentTask;
import com.starrocks.task.AgentTaskExecutor;
import com.starrocks.task.AgentTaskQueue;
import com.starrocks.task.CompactionTask;
import org.apache.commons.lang.NotImplementedException;
import org.apache.hadoop.util.Lists;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand All @@ -50,28 +47,12 @@
import java.util.ArrayList;
import java.util.List;

public class CompactionHandler extends AlterHandler {
public class CompactionHandler {
private static final Logger LOG = LogManager.getLogger(CompactionHandler.class);

public CompactionHandler() {
super("compaction");
}


@Override
protected void runAfterCatalogReady() {
super.runAfterCatalogReady();
}

@Override
public List<List<Comparable>> getAlterJobInfosByDb(Database db) {
throw new NotImplementedException();
}

@Override
// add synchronized to avoid process 2 or more stmts at same time
public synchronized ShowResultSet process(List<AlterClause> alterClauses, Database db,
OlapTable olapTable) throws UserException {
public static synchronized ShowResultSet process(List<AlterClause> alterClauses, Database db,
OlapTable olapTable) throws UserException {
Preconditions.checkArgument(alterClauses.size() == 1);
AlterClause alterClause = alterClauses.get(0);
Preconditions.checkState(alterClause instanceof CompactionClause);
Expand Down Expand Up @@ -142,7 +123,7 @@ public synchronized ShowResultSet process(List<AlterClause> alterClauses, Databa
}

@NotNull
private List<Partition> findAllPartitions(OlapTable olapTable, CompactionClause compactionClause) {
private static List<Partition> findAllPartitions(OlapTable olapTable, CompactionClause compactionClause) {
List<Partition> allPartitions = new ArrayList<>();
if (compactionClause.getPartitionNames().isEmpty()) {
allPartitions.addAll(olapTable.getPartitions());
Expand All @@ -159,10 +140,4 @@ private List<Partition> findAllPartitions(OlapTable olapTable, CompactionClause
}
return allPartitions;
}

@Override
public synchronized void cancel(CancelStmt stmt) throws DdlException {
throw new NotImplementedException();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,6 @@ public void handleNewListPartitionDescs(Map<ColumnId, Column> idToColumn,
long partitionId = partition.getId();
PartitionDesc partitionDesc = entry.second;
Preconditions.checkArgument(partitionDesc instanceof SinglePartitionDesc);
Preconditions.checkArgument(((SinglePartitionDesc) partitionDesc).isAnalyzed());
this.idToDataProperty.put(partitionId, partitionDesc.getPartitionDataProperty());
this.idToReplicationNum.put(partitionId, partitionDesc.getReplicationNum());
this.idToInMemory.put(partitionId, partitionDesc.isInMemory());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,6 @@ private Range<PartitionKey> checkNewRange(List<Column> partitionColumns, Partiti

public Range<PartitionKey> handleNewSinglePartitionDesc(Map<ColumnId, Column> schema, SingleRangePartitionDesc desc,
long partitionId, boolean isTemp) throws DdlException {
Preconditions.checkArgument(desc.isAnalyzed());
Range<PartitionKey> range;
try {
range = checkAndCreateRange(schema, desc, isTemp);
Expand Down Expand Up @@ -277,7 +276,6 @@ public void handleNewRangePartitionDescs(Map<ColumnId, Column> schema,
if (!existPartitionNameSet.contains(partition.getName())) {
long partitionId = partition.getId();
SingleRangePartitionDesc desc = (SingleRangePartitionDesc) entry.second;
Preconditions.checkArgument(desc.isAnalyzed());
Range<PartitionKey> range;
try {
range = checkAndCreateRange(schema, (SingleRangePartitionDesc) entry.second, isTemp);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -405,12 +405,12 @@ public static Map<String, String> analyzeDynamicPartition(Map<String, String> pr
return analyzedProperties;
}

public static void checkAlterAllowed(OlapTable olapTable) throws DdlException {
public static void checkAlterAllowed(OlapTable olapTable) {
TableProperty tableProperty = olapTable.getTableProperty();
if (tableProperty != null && tableProperty.getDynamicPartitionProperty() != null &&
tableProperty.getDynamicPartitionProperty().isExists() &&
tableProperty.getDynamicPartitionProperty().isEnabled()) {
throw new DdlException("Cannot add/drop partition on a Dynamic Partition Table, " +
throw new SemanticException("Cannot add/drop partition on a Dynamic Partition Table, " +
"Use command `ALTER TABLE tbl_name SET (\"dynamic_partition.enable\" = \"false\")` firstly.");
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,8 +261,8 @@ public void abortSink(String dbName, String table, List<TSinkCommitInfo> commitI
}

@Override
public void alterTable(AlterTableStmt stmt) throws UserException {
normal.alterTable(stmt);
public void alterTable(ConnectContext context, AlterTableStmt stmt) throws UserException {
normal.alterTable(context, stmt);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ default void finishSink(String dbName, String table, List<TSinkCommitInfo> commi
default void abortSink(String dbName, String table, List<TSinkCommitInfo> commitInfos) {
}

default void alterTable(AlterTableStmt stmt) throws UserException {
default void alterTable(ConnectContext context, AlterTableStmt stmt) throws UserException {
throw new StarRocksConnectorException("This connector doesn't support alter table");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ public CloudConfiguration getCloudConfiguration() {
}

@Override
public void alterTable(AlterTableStmt stmt) throws UserException {
public void alterTable(ConnectContext context, AlterTableStmt stmt) throws UserException {
// (FIXME) add this api just for tests of external table
List<AlterClause> alterClauses = stmt.getAlterClauseList();
for (AlterClause alterClause : alterClauses) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ public void createView(CreateViewStmt stmt) throws DdlException {
}

@Override
public void alterTable(AlterTableStmt stmt) throws UserException {
public void alterTable(ConnectContext context, AlterTableStmt stmt) throws UserException {
String dbName = stmt.getDbName();
String tableName = stmt.getTableName();
org.apache.iceberg.Table table = icebergCatalog.getTable(dbName, tableName);
Expand Down
Loading
Loading