Skip to content

Commit

Permalink
[Enhancement] Migrate the alter table related logic scattered everywh…
Browse files Browse the repository at this point in the history
…ere to AlterJobExecutor (#48637)

Signed-off-by: HangyuanLiu <[email protected]>
(cherry picked from commit b0ea263)

# Conflicts:
#	fe/fe-core/src/main/java/com/starrocks/alter/AlterJobExecutor.java
#	fe/fe-core/src/main/java/com/starrocks/alter/AlterJobMgr.java
#	fe/fe-core/src/main/java/com/starrocks/connector/hive/HiveMetadata.java
#	fe/fe-core/src/main/java/com/starrocks/sql/analyzer/AlterTableClauseAnalyzer.java
#	fe/fe-core/src/main/java/com/starrocks/sql/analyzer/AlterTableStatementAnalyzer.java
#	fe/fe-core/src/main/java/com/starrocks/sql/ast/AlterTableStmt.java
#	fe/fe-core/src/test/java/com/starrocks/alter/AlterTest.java
#	fe/fe-core/src/test/java/com/starrocks/connector/AlterTableTest.java
  • Loading branch information
HangyuanLiu authored and mergify[bot] committed Jul 23, 2024
1 parent ed07be4 commit c9db259
Show file tree
Hide file tree
Showing 36 changed files with 769 additions and 212 deletions.
238 changes: 204 additions & 34 deletions fe/fe-core/src/main/java/com/starrocks/alter/AlterJobExecutor.java

Large diffs are not rendered by default.

60 changes: 8 additions & 52 deletions fe/fe-core/src/main/java/com/starrocks/alter/AlterJobMgr.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
import com.google.common.collect.Sets;
import com.starrocks.analysis.DateLiteral;
import com.starrocks.analysis.TableName;
import com.starrocks.analysis.TableRef;
import com.starrocks.authentication.AuthenticationMgr;
import com.starrocks.catalog.BaseTableInfo;
import com.starrocks.catalog.ColocateTableIndex;
Expand Down Expand Up @@ -72,10 +71,8 @@
import com.starrocks.common.MetaNotFoundException;
import com.starrocks.common.UserException;
import com.starrocks.common.util.DateUtils;
import com.starrocks.common.util.DynamicPartitionUtil;
import com.starrocks.common.util.PropertyAnalyzer;
import com.starrocks.common.util.TimeUtils;
import com.starrocks.common.util.Util;
import com.starrocks.common.util.concurrent.lock.LockType;
import com.starrocks.common.util.concurrent.lock.Locker;
import com.starrocks.persist.AlterMaterializedViewBaseTableInfosLog;
Expand All @@ -102,28 +99,21 @@
import com.starrocks.sql.analyzer.Analyzer;
import com.starrocks.sql.analyzer.MaterializedViewAnalyzer;
import com.starrocks.sql.analyzer.SemanticException;
import com.starrocks.sql.ast.AddPartitionClause;
import com.starrocks.sql.ast.AlterClause;
import com.starrocks.sql.ast.AlterMaterializedViewStatusClause;
import com.starrocks.sql.ast.AlterSystemStmt;
import com.starrocks.sql.ast.AlterTableCommentClause;
import com.starrocks.sql.ast.AlterTableStmt;
import com.starrocks.sql.ast.ColumnRenameClause;
import com.starrocks.sql.ast.CompactionClause;
import com.starrocks.sql.ast.CreateMaterializedViewStatement;
import com.starrocks.sql.ast.CreateMaterializedViewStmt;
import com.starrocks.sql.ast.DropMaterializedViewStmt;
import com.starrocks.sql.ast.DropPartitionClause;
import com.starrocks.sql.ast.ModifyPartitionClause;
import com.starrocks.sql.ast.ModifyTablePropertiesClause;
import com.starrocks.sql.ast.PartitionRenameClause;
import com.starrocks.sql.ast.QueryStatement;
import com.starrocks.sql.ast.ReplacePartitionClause;
import com.starrocks.sql.ast.RollupRenameClause;
import com.starrocks.sql.ast.StatementBase;
import com.starrocks.sql.ast.TableRenameClause;
import com.starrocks.sql.ast.TruncatePartitionClause;
import com.starrocks.sql.ast.TruncateTableStmt;
import com.starrocks.sql.ast.UserIdentity;
import com.starrocks.sql.optimizer.rule.transformation.materialization.MvUtils;
import com.starrocks.sql.parser.SqlParser;
Expand Down Expand Up @@ -155,7 +145,6 @@ public class AlterJobMgr {
private final SchemaChangeHandler schemaChangeHandler;
private final MaterializedViewHandler materializedViewHandler;
private final SystemHandler clusterHandler;
private final CompactionHandler compactionHandler;

public AlterJobMgr(SchemaChangeHandler schemaChangeHandler,
MaterializedViewHandler materializedViewHandler,
Expand All @@ -164,7 +153,6 @@ public AlterJobMgr(SchemaChangeHandler schemaChangeHandler,
this.schemaChangeHandler = schemaChangeHandler;
this.materializedViewHandler = materializedViewHandler;
this.clusterHandler = systemHandler;
this.compactionHandler = compactionHandler;
}

public void start() {
Expand Down Expand Up @@ -542,6 +530,7 @@ public void processAlterTable(AlterTableStmt stmt) throws UserException {
db.checkQuota();
}


// some operations will take long time to process, need to be done outside the databse lock
boolean needProcessOutsideDatabaseLock = false;
String tableName = dbTableName.getTbl();
Expand Down Expand Up @@ -575,6 +564,7 @@ public void processAlterTable(AlterTableStmt stmt) throws UserException {
} else if (currentAlterOps.hasRollupOp()) {
materializedViewHandler.process(alterClauses, db, olapTable);
isSynchronous = false;
<<<<<<< HEAD
} else if (currentAlterOps.hasPartitionOp()) {
Preconditions.checkState(alterClauses.size() == 1);
AlterClause alterClause = alterClauses.get(0);
Expand Down Expand Up @@ -627,6 +617,11 @@ public void processAlterTable(AlterTableStmt stmt) throws UserException {
} else if (currentAlterOps.hasSwapOp()) {
new AlterJobExecutor().process(stmt, ConnectContext.get());
} else if (currentAlterOps.hasAlterCommentOp()) {
=======
} else if (stmt.contains(AlterOpType.RENAME)) {
processRename(db, olapTable, alterClauses);
} else if (stmt.contains(AlterOpType.ALTER_COMMENT)) {
>>>>>>> b0ea263f81 ([Enhancement] Migrate the alter table related logic scattered everywhere to AlterJobExecutor (#48637))
processAlterComment(db, olapTable, alterClauses);
} else if (currentAlterOps.contains(AlterOpType.MODIFY_TABLE_PROPERTY_SYNC)) {
needProcessOutsideDatabaseLock = true;
Expand All @@ -643,42 +638,7 @@ public void processAlterTable(AlterTableStmt stmt) throws UserException {
if (needProcessOutsideDatabaseLock) {
Preconditions.checkState(alterClauses.size() == 1);
AlterClause alterClause = alterClauses.get(0);
if (alterClause instanceof AddPartitionClause) {
if (!((AddPartitionClause) alterClause).isTempPartition()) {
DynamicPartitionUtil.checkAlterAllowed((OlapTable) db.getTable(tableName));
}
GlobalStateMgr.getCurrentState().getLocalMetastore()
.addPartitions(Util.getOrCreateConnectContext(), db, tableName, (AddPartitionClause) alterClause);
} else if (alterClause instanceof TruncatePartitionClause) {
// This logic is used to adapt mysql syntax.
// ALTER TABLE test TRUNCATE PARTITION p1;
TruncatePartitionClause clause = (TruncatePartitionClause) alterClause;
TableRef tableRef = new TableRef(stmt.getTbl(), null, clause.getPartitionNames());
TruncateTableStmt tStmt = new TruncateTableStmt(tableRef);
ConnectContext ctx = new ConnectContext();
ctx.setGlobalStateMgr(GlobalStateMgr.getCurrentState());
GlobalStateMgr.getCurrentState().getLocalMetastore().truncateTable(tStmt, ctx);
} else if (alterClause instanceof ModifyPartitionClause) {
ModifyPartitionClause clause = ((ModifyPartitionClause) alterClause);
Map<String, String> properties = clause.getProperties();
List<String> partitionNames = clause.getPartitionNames();
// currently, only in memory property could reach here
Preconditions.checkState(properties.containsKey(PropertyAnalyzer.PROPERTIES_INMEMORY));
olapTable = (OlapTable) db.getTable(tableName);
if (olapTable.isCloudNativeTable()) {
throw new DdlException("Lake table not support alter in_memory");
}

schemaChangeHandler.updatePartitionsInMemoryMeta(
db, tableName, partitionNames, properties);

locker.lockTablesWithIntensiveDbLock(db, Lists.newArrayList(olapTable.getId()), LockType.WRITE);
try {
modifyPartitionsProperty(db, olapTable, partitionNames, properties);
} finally {
locker.unLockTablesWithIntensiveDbLock(db, Lists.newArrayList(olapTable.getId()), LockType.WRITE);
}
} else if (alterClause instanceof ModifyTablePropertiesClause) {
if (alterClause instanceof ModifyTablePropertiesClause) {
Map<String, String> properties = alterClause.getProperties();
Preconditions.checkState(properties.containsKey(PropertyAnalyzer.PROPERTIES_INMEMORY) ||
properties.containsKey(PropertyAnalyzer.PROPERTIES_ENABLE_PERSISTENT_INDEX) ||
Expand Down Expand Up @@ -725,10 +685,6 @@ public void processAlterTable(AlterTableStmt stmt) throws UserException {
} else {
throw new DdlException("Invalid alter operation: " + alterClause.getOpType());
}
} else if (alterClause instanceof CompactionClause) {
String s = (((CompactionClause) alterClause).isBaseCompaction() ? "base" : "cumulative")
+ " compact " + tableName + " partitions: " + ((CompactionClause) alterClause).getPartitionNames();
compactionHandler.process(alterClauses, db, olapTable);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import com.starrocks.catalog.Partition;
import com.starrocks.catalog.PhysicalPartition;
import com.starrocks.catalog.Tablet;
import com.starrocks.common.DdlException;
import com.starrocks.common.UserException;
import com.starrocks.common.util.concurrent.lock.LockType;
import com.starrocks.common.util.concurrent.lock.Locker;
Expand All @@ -33,14 +32,12 @@
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.server.RunMode;
import com.starrocks.sql.ast.AlterClause;
import com.starrocks.sql.ast.CancelStmt;
import com.starrocks.sql.ast.CompactionClause;
import com.starrocks.task.AgentBatchTask;
import com.starrocks.task.AgentTask;
import com.starrocks.task.AgentTaskExecutor;
import com.starrocks.task.AgentTaskQueue;
import com.starrocks.task.CompactionTask;
import org.apache.commons.lang.NotImplementedException;
import org.apache.hadoop.util.Lists;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand All @@ -50,28 +47,12 @@
import java.util.ArrayList;
import java.util.List;

public class CompactionHandler extends AlterHandler {
public class CompactionHandler {
private static final Logger LOG = LogManager.getLogger(CompactionHandler.class);

public CompactionHandler() {
super("compaction");
}


@Override
protected void runAfterCatalogReady() {
super.runAfterCatalogReady();
}

@Override
public List<List<Comparable>> getAlterJobInfosByDb(Database db) {
throw new NotImplementedException();
}

@Override
// add synchronized to avoid process 2 or more stmts at same time
public synchronized ShowResultSet process(List<AlterClause> alterClauses, Database db,
OlapTable olapTable) throws UserException {
public static synchronized ShowResultSet process(List<AlterClause> alterClauses, Database db,
OlapTable olapTable) throws UserException {
Preconditions.checkArgument(alterClauses.size() == 1);
AlterClause alterClause = alterClauses.get(0);
Preconditions.checkState(alterClause instanceof CompactionClause);
Expand Down Expand Up @@ -142,7 +123,7 @@ public synchronized ShowResultSet process(List<AlterClause> alterClauses, Databa
}

@NotNull
private List<Partition> findAllPartitions(OlapTable olapTable, CompactionClause compactionClause) {
private static List<Partition> findAllPartitions(OlapTable olapTable, CompactionClause compactionClause) {
List<Partition> allPartitions = new ArrayList<>();
if (compactionClause.getPartitionNames().isEmpty()) {
allPartitions.addAll(olapTable.getPartitions());
Expand All @@ -159,10 +140,4 @@ private List<Partition> findAllPartitions(OlapTable olapTable, CompactionClause
}
return allPartitions;
}

@Override
public synchronized void cancel(CancelStmt stmt) throws DdlException {
throw new NotImplementedException();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,6 @@ public void handleNewListPartitionDescs(Map<ColumnId, Column> idToColumn,
long partitionId = partition.getId();
PartitionDesc partitionDesc = entry.second;
Preconditions.checkArgument(partitionDesc instanceof SinglePartitionDesc);
Preconditions.checkArgument(((SinglePartitionDesc) partitionDesc).isAnalyzed());
this.idToDataProperty.put(partitionId, partitionDesc.getPartitionDataProperty());
this.idToReplicationNum.put(partitionId, partitionDesc.getReplicationNum());
this.idToInMemory.put(partitionId, partitionDesc.isInMemory());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,6 @@ private Range<PartitionKey> checkNewRange(List<Column> partitionColumns, Partiti

public Range<PartitionKey> handleNewSinglePartitionDesc(Map<ColumnId, Column> schema, SingleRangePartitionDesc desc,
long partitionId, boolean isTemp) throws DdlException {
Preconditions.checkArgument(desc.isAnalyzed());
Range<PartitionKey> range;
try {
range = checkAndCreateRange(schema, desc, isTemp);
Expand Down Expand Up @@ -277,7 +276,6 @@ public void handleNewRangePartitionDescs(Map<ColumnId, Column> schema,
if (!existPartitionNameSet.contains(partition.getName())) {
long partitionId = partition.getId();
SingleRangePartitionDesc desc = (SingleRangePartitionDesc) entry.second;
Preconditions.checkArgument(desc.isAnalyzed());
Range<PartitionKey> range;
try {
range = checkAndCreateRange(schema, (SingleRangePartitionDesc) entry.second, isTemp);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -405,12 +405,12 @@ public static Map<String, String> analyzeDynamicPartition(Map<String, String> pr
return analyzedProperties;
}

public static void checkAlterAllowed(OlapTable olapTable) throws DdlException {
public static void checkAlterAllowed(OlapTable olapTable) {
TableProperty tableProperty = olapTable.getTableProperty();
if (tableProperty != null && tableProperty.getDynamicPartitionProperty() != null &&
tableProperty.getDynamicPartitionProperty().isExists() &&
tableProperty.getDynamicPartitionProperty().isEnabled()) {
throw new DdlException("Cannot add/drop partition on a Dynamic Partition Table, " +
throw new SemanticException("Cannot add/drop partition on a Dynamic Partition Table, " +
"Use command `ALTER TABLE tbl_name SET (\"dynamic_partition.enable\" = \"false\")` firstly.");
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,8 +237,8 @@ public void abortSink(String dbName, String table, List<TSinkCommitInfo> commitI
}

@Override
public void alterTable(AlterTableStmt stmt) throws UserException {
normal.alterTable(stmt);
public void alterTable(ConnectContext context, AlterTableStmt stmt) throws UserException {
normal.alterTable(context, stmt);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ default void finishSink(String dbName, String table, List<TSinkCommitInfo> commi
default void abortSink(String dbName, String table, List<TSinkCommitInfo> commitInfos) {
}

default void alterTable(AlterTableStmt stmt) throws UserException {
default void alterTable(ConnectContext context, AlterTableStmt stmt) throws UserException {
throw new StarRocksConnectorException("This connector doesn't support alter table");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -374,4 +374,52 @@ public void clear() {
public CloudConfiguration getCloudConfiguration() {
return hdfsEnvironment.getCloudConfiguration();
}
<<<<<<< HEAD
=======

@Override
public void alterTable(ConnectContext context, AlterTableStmt stmt) throws UserException {
// (FIXME) add this api just for tests of external table
List<AlterClause> alterClauses = stmt.getAlterClauseList();
for (AlterClause alterClause : alterClauses) {
if (alterClause instanceof AddPartitionClause) {
addPartition(stmt, alterClause);
} else {
throw new StarRocksConnectorException("This connector doesn't support alter table type: %s",
alterClause.getOpType());
}
}
}

private void addPartition(AlterTableStmt stmt, AlterClause alterClause) {
HiveTable table = (HiveTable) getTable(stmt.getDbName(), stmt.getTableName());
AddPartitionClause addPartitionClause = (AddPartitionClause) alterClause;
List<String> partitionColumns = table.getPartitionColumnNames();
// now do not support to specify location of hive partition in add partition
if (!(addPartitionClause.getPartitionDesc() instanceof SingleItemListPartitionDesc)) {
return;
}
SingleItemListPartitionDesc partitionDesc = (SingleItemListPartitionDesc) addPartitionClause.getPartitionDesc();
String tablePath = table.getTableLocation();
String partitionString = partitionColumns.get(0) + "=" + partitionDesc.getValues().get(0);
String partitionPath = tablePath + "/" + partitionString;
HivePartition hivePartition = HivePartition.builder()
.setDatabaseName(table.getDbName())
.setTableName(table.getTableName())
.setColumns(table.getDataColumnNames().stream()
.map(table::getColumn)
.collect(Collectors.toList()))
.setValues(partitionDesc.getValues())
.setParameters(ImmutableMap.<String, String>builder()
.put("starrocks_version", Version.STARROCKS_VERSION + "-" + Version.STARROCKS_COMMIT_HASH)
.put(STARROCKS_QUERY_ID, ConnectContext.get().getQueryId().toString())
.buildOrThrow())
.setStorageFormat(table.getStorageFormat())
.setLocation(partitionPath)
.build();
HivePartitionWithStats partitionWithStats =
new HivePartitionWithStats(partitionString, hivePartition, HivePartitionStats.empty());
hmsOps.addPartitions(table.getDbName(), table.getTableName(), Lists.newArrayList(partitionWithStats));
}
>>>>>>> b0ea263f81 ([Enhancement] Migrate the alter table related logic scattered everywhere to AlterJobExecutor (#48637))
}
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ public void createView(CreateViewStmt stmt) throws DdlException {
}

@Override
public void alterTable(AlterTableStmt stmt) throws UserException {
public void alterTable(ConnectContext context, AlterTableStmt stmt) throws UserException {
String dbName = stmt.getDbName();
String tableName = stmt.getTableName();
org.apache.iceberg.Table table = icebergCatalog.getTable(dbName, tableName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,7 @@ public ShowResultSet visitCancelRefreshMaterializedViewStatement(CancelRefreshMa
@Override
public ShowResultSet visitAlterTableStatement(AlterTableStmt stmt, ConnectContext context) {
ErrorReport.wrapWithRuntimeException(() -> {
context.getGlobalStateMgr().getMetadataMgr().alterTable(stmt);
context.getGlobalStateMgr().getMetadataMgr().alterTable(context, stmt);
});
return null;
}
Expand Down Expand Up @@ -1095,7 +1095,7 @@ public ShowResultSet visitDataCacheSelectStatement(DataCacheSelectStatement stat
public ShowResultSet visitCreateDictionaryStatement(CreateDictionaryStmt stmt, ConnectContext context) {
ErrorReport.wrapWithRuntimeException(() -> {
context.getGlobalStateMgr().getDictionaryMgr().createDictionary(stmt,
context.getCurrentCatalog(), context.getDatabase());
context.getCurrentCatalog(), context.getDatabase());
});
return null;
}
Expand Down
Loading

0 comments on commit c9db259

Please sign in to comment.