Skip to content

Commit

Permalink
Add Meta refactor development branch
Browse files Browse the repository at this point in the history
Signed-off-by: HangyuanLiu <[email protected]>
  • Loading branch information
HangyuanLiu committed Sep 12, 2024
1 parent 4bd744c commit b0cfc6d
Show file tree
Hide file tree
Showing 114 changed files with 7,804 additions and 5,961 deletions.
349 changes: 325 additions & 24 deletions fe/fe-core/src/main/java/com/starrocks/alter/AlterJobExecutor.java

Large diffs are not rendered by default.

26 changes: 23 additions & 3 deletions fe/fe-core/src/main/java/com/starrocks/alter/AlterJobMgr.java
Original file line number Diff line number Diff line change
Expand Up @@ -80,14 +80,15 @@
import com.starrocks.scheduler.Task;
import com.starrocks.scheduler.TaskBuilder;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.server.LocalMetastore;
import com.starrocks.sql.analyzer.Analyzer;
import com.starrocks.sql.analyzer.MaterializedViewAnalyzer;
import com.starrocks.sql.analyzer.SemanticException;
import com.starrocks.sql.ast.AlterMaterializedViewStatusClause;
import com.starrocks.sql.ast.CancelAlterTableStmt;
import com.starrocks.sql.ast.CreateMaterializedViewStatement;
import com.starrocks.sql.ast.DropMaterializedViewStmt;
import com.starrocks.sql.ast.QueryStatement;
import com.starrocks.sql.ast.ShowAlterStmt;
import com.starrocks.sql.ast.StatementBase;
import com.starrocks.sql.ast.UserIdentity;
import com.starrocks.sql.optimizer.rule.transformation.materialization.MvUtils;
Expand All @@ -102,6 +103,8 @@
import java.util.Optional;
import java.util.stream.Collectors;

import static com.starrocks.meta.StarRocksMeta.inactiveRelatedMaterializedView;

public class AlterJobMgr {
private static final Logger LOG = LogManager.getLogger(AlterJobMgr.class);
public static final String MANUAL_INACTIVE_MV_REASON = "user use alter materialized view set status to inactive";
Expand Down Expand Up @@ -231,7 +234,7 @@ public static QueryStatement recreateMVQuery(MaterializedView materializedView,
String createMvSql) {
// If we could parse the MV sql successfully, and the schema of mv does not change,
// we could reuse the existing MV
Optional<Database> mayDb = GlobalStateMgr.getCurrentState().getLocalMetastore().mayGetDb(materializedView.getDbId());
Optional<Database> mayDb = GlobalStateMgr.getCurrentState().getStarRocksMeta().mayGetDb(materializedView.getDbId());

// check database existing
String dbName = mayDb.orElseThrow(() ->
Expand Down Expand Up @@ -498,7 +501,7 @@ public void alterView(AlterViewInfo alterViewInfo) {
}
view.setNewFullSchema(newFullSchema);
view.setComment(comment);
LocalMetastore.inactiveRelatedMaterializedView(db, view,
inactiveRelatedMaterializedView(db, view,
MaterializedViewExceptions.inactiveReasonForBaseViewChanged(viewName));
db.dropTable(viewName);
db.registerTableUnlocked(view);
Expand Down Expand Up @@ -535,6 +538,23 @@ public void replayModifyPartition(ModifyPartitionInfo info) {
}
}

/*
* used for handling CancelAlterStmt (for client is the CANCEL ALTER
* command). including SchemaChangeHandler and RollupHandler
*/
public void cancelAlter(CancelAlterTableStmt stmt, String reason) throws DdlException {
if (stmt.getAlterType() == ShowAlterStmt.AlterType.ROLLUP) {
materializedViewHandler.cancel(stmt, reason);
} else if (stmt.getAlterType() == ShowAlterStmt.AlterType.COLUMN
|| stmt.getAlterType() == ShowAlterStmt.AlterType.OPTIMIZE) {
schemaChangeHandler.cancel(stmt, reason);
} else if (stmt.getAlterType() == ShowAlterStmt.AlterType.MATERIALIZED_VIEW) {
materializedViewHandler.cancelMV(stmt);
} else {
throw new DdlException("Cancel " + stmt.getAlterType() + " does not implement yet");
}
}

public SchemaChangeHandler getSchemaChangeHandler() {
return this.schemaChangeHandler;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,7 @@ public Void visitAlterMaterializedViewStatusClause(AlterMaterializedViewStatusCl
alterMaterializedViewStatus(materializedView, status, false);
// for manual refresh type, do not refresh
if (materializedView.getRefreshScheme().getType() != MaterializedView.RefreshType.MANUAL) {
GlobalStateMgr.getCurrentState().getLocalMetastore()
GlobalStateMgr.getCurrentState().getStarRocksMeta()
.refreshMaterializedView(dbName, materializedView.getName(), true, null,
Constants.TaskRunPriority.NORMAL.value(), true, false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.starrocks.common.util.concurrent.lock.Locker;
import com.starrocks.lake.compaction.CompactionMgr;
import com.starrocks.lake.compaction.PartitionIdentifier;
import com.starrocks.meta.TabletMetastore;
import com.starrocks.qe.ShowResultSet;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.server.RunMode;
Expand All @@ -47,7 +48,7 @@
import java.util.ArrayList;
import java.util.List;

public class CompactionHandler {
public class CompactionHandler {
private static final Logger LOG = LogManager.getLogger(CompactionHandler.class);

// add synchronized to avoid process 2 or more stmts at same time
Expand Down Expand Up @@ -82,11 +83,16 @@ public static synchronized ShowResultSet process(List<AlterClause> alterClauses,
locker.lockTablesWithIntensiveDbLock(db.getId(), Lists.newArrayList(olapTable.getId()), LockType.READ);
try {
List<Partition> allPartitions = findAllPartitions(olapTable, compactionClause);

TabletMetastore tabletMetastore = GlobalStateMgr.getCurrentState().getTabletMetastore();
for (Partition partition : allPartitions) {
for (PhysicalPartition physicalPartition : partition.getSubPartitions()) {
for (MaterializedIndex index : physicalPartition.getMaterializedIndices(
MaterializedIndex.IndexExtState.VISIBLE)) {
for (Tablet tablet : index.getTablets()) {
List<PhysicalPartition> physicalPartitionList = tabletMetastore.getAllPhysicalPartition(partition);
for (PhysicalPartition physicalPartition : physicalPartitionList) {
List<MaterializedIndex> materializedIndices = tabletMetastore
.getMaterializedIndices(physicalPartition, MaterializedIndex.IndexExtState.VISIBLE);
for (MaterializedIndex materializedIndex : materializedIndices) {
List<Tablet> tabletList = tabletMetastore.getAllTablets(materializedIndex);
for (Tablet tablet : tabletList) {
for (Long backendId : ((LocalTablet) tablet).getBackendIds()) {
backendToTablets.put(backendId, tablet.getId());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,11 @@ public AlterJobV2 build() throws UserException {
long partitionId = partition.getParentId();
long physicalPartitionId = partition.getId();
long shardGroupId = partition.getShardGroupId();
List<Tablet> originTablets = partition.getIndex(originIndexId).getTablets();

MaterializedIndex materializedIndex = partition.getIndex(originIndexId);
List<Tablet> originTablets = GlobalStateMgr.getCurrentState().getTabletMetastore()
.getAllTablets(materializedIndex);

// TODO: It is not good enough to create shards into the same group id, schema change PR needs to
// revise the code again.
List<Long> originTabletIds = originTablets.stream().map(Tablet::getId).collect(Collectors.toList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,8 @@ boolean publishVersion() {
long commitVersion = commitVersionMap.get(partitionId);
Map<Long, MaterializedIndex> dirtyIndexMap = physicalPartitionIndexMap.row(partitionId);
for (MaterializedIndex index : dirtyIndexMap.values()) {
Utils.publishVersion(index.getTablets(), txnInfo, commitVersion - 1, commitVersion,
List<Tablet> tabletList = GlobalStateMgr.getCurrentState().getTabletMetastore().getAllTablets(index);
Utils.publishVersion(tabletList, txnInfo, commitVersion - 1, commitVersion,
warehouseId);
}
}
Expand Down Expand Up @@ -319,7 +320,7 @@ public void updateIndexTabletMeta(Database db, OlapTable table, PhysicalPartitio
Locker locker = new Locker();
locker.lockTablesWithIntensiveDbLock(db.getId(), Lists.newArrayList(table.getId()), LockType.READ);
try {
tablets = new ArrayList<>(index.getTablets());
tablets = GlobalStateMgr.getCurrentState().getTabletMetastore().getAllTablets(index);
} finally {
locker.unLockTablesWithIntensiveDbLock(db.getId(), Lists.newArrayList(table.getId()), LockType.READ);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,6 @@
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import javax.validation.constraints.NotNull;

Expand Down Expand Up @@ -338,7 +337,9 @@ protected void runPendingJob() throws AlterCancelException {
if (enableTabletCreationOptimization) {
numTablets = physicalPartitionIndexMap.size();
} else {
numTablets = physicalPartitionIndexMap.values().stream().map(MaterializedIndex::getTablets)
numTablets = physicalPartitionIndexMap.values().stream()
.map(materializedIndex -> GlobalStateMgr.getCurrentState().getTabletMetastore()
.getAllTablets(materializedIndex))
.mapToLong(List::size).sum();
}
countDownLatch = new MarkedCountDownLatch<>((int) numTablets);
Expand Down Expand Up @@ -373,7 +374,7 @@ protected void runPendingJob() throws AlterCancelException {
.build().toTabletSchema();

boolean createSchemaFile = true;
for (Tablet shadowTablet : shadowIdx.getTablets()) {
for (Tablet shadowTablet : GlobalStateMgr.getCurrentState().getTabletMetastore().getAllTablets(shadowIdx)) {
long shadowTabletId = shadowTablet.getId();
ComputeNode computeNode = GlobalStateMgr.getCurrentState().getWarehouseMgr()
.getComputeNodeAssignedToTablet(warehouseId, (LakeTablet) shadowTablet);
Expand Down Expand Up @@ -418,7 +419,7 @@ protected void runPendingJob() throws AlterCancelException {
}

sendAgentTaskAndWait(batchTask, countDownLatch, Config.tablet_create_timeout_second * numTablets,
waitingCreatingReplica, isCancelling);
waitingCreatingReplica, isCancelling);

// Add shadow indexes to table.
try (WriteLockedDatabase db = getWriteLockedDatabase(dbId)) {
Expand Down Expand Up @@ -483,7 +484,7 @@ protected void runWaitingTxnJob() throws AlterCancelException {
for (Map.Entry<Long, MaterializedIndex> entry : shadowIndexMap.entrySet()) {
long shadowIdxId = entry.getKey();
MaterializedIndex shadowIdx = entry.getValue();
for (Tablet shadowTablet : shadowIdx.getTablets()) {
for (Tablet shadowTablet : GlobalStateMgr.getCurrentState().getTabletMetastore().getAllTablets(shadowIdx)) {
ComputeNode computeNode = GlobalStateMgr.getCurrentState().getWarehouseMgr()
.getComputeNodeAssignedToTablet(warehouseId, (LakeTablet) shadowTablet);
if (computeNode == null) {
Expand Down Expand Up @@ -610,7 +611,7 @@ protected void runFinishedRewritingJob() throws AlterCancelException {

// Delete tablet and shards
for (MaterializedIndex droppedIndex : droppedIndexes) {
List<Long> shards = droppedIndex.getTablets().stream().map(Tablet::getId).collect(Collectors.toList());
List<Long> shards = GlobalStateMgr.getCurrentState().getTabletMetastore().getAllTabletIDs(droppedIndex);
// TODO: what if unusedShards deletion is partially successful?
StarMgrMetaSyncer.dropTabletAndDeleteShard(shards, GlobalStateMgr.getCurrentState().getStarOSAgent());
}
Expand Down Expand Up @@ -668,7 +669,9 @@ boolean publishVersion() {
long commitVersion = commitVersionMap.get(partitionId);
Map<Long, MaterializedIndex> shadowIndexMap = physicalPartitionIndexMap.row(partitionId);
for (MaterializedIndex shadowIndex : shadowIndexMap.values()) {
Utils.publishVersion(shadowIndex.getTablets(), txnInfo, 1, commitVersion, warehouseId);
Utils.publishVersion(
GlobalStateMgr.getCurrentState().getTabletMetastore().getAllTablets(shadowIndex),
txnInfo, 1, commitVersion, warehouseId);
}
}
return true;
Expand Down Expand Up @@ -700,7 +703,7 @@ private void inactiveRelatedMv(Set<String> modifiedColumns, @NotNull OlapTable t
Database db = GlobalStateMgr.getCurrentState().getLocalMetastore().getDb(dbId);
for (MvId mvId : tbl.getRelatedMaterializedViews()) {
MaterializedView mv = (MaterializedView) GlobalStateMgr.getCurrentState().getLocalMetastore()
.getTable(db.getId(), mvId.getId());
.getTable(db.getId(), mvId.getId());
if (mv == null) {
LOG.warn("Ignore materialized view {} does not exists", mvId);
continue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ public void processBatchAddRollup(List<AlterClause> alterClauses, Database db, O
TabletInvertedIndex tabletInvertedIndex = GlobalStateMgr.getCurrentState().getTabletInvertedIndex();
for (RollupJobV2 rollupJobV2 : rollupNameJobMap.values()) {
for (MaterializedIndex index : rollupJobV2.getPartitionIdToRollupIndex().values()) {
for (Tablet tablet : index.getTablets()) {
for (Tablet tablet : GlobalStateMgr.getCurrentState().getTabletMetastore().getAllTablets(index)) {
tabletInvertedIndex.deleteTablet(tablet.getId());
}
}
Expand Down Expand Up @@ -459,7 +459,7 @@ private List<Column> checkAndPrepareMaterializedView(CreateMaterializedViewStmt

// check if mv index already exists in db

if (GlobalStateMgr.getCurrentState().getLocalMetastore().mayGetTable(db.getFullName(), mvName).isPresent()) {
if (GlobalStateMgr.getCurrentState().getStarRocksMeta().mayGetTable(db.getFullName(), mvName).isPresent()) {
throw new DdlException("Table [" + mvName + "] already exists in the db " + db.getFullName());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -550,7 +550,7 @@ private void cancelInternal() {
if (partition != null) {
for (MaterializedIndex index : partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL)) {
// hash set is able to deduplicate the elements
tmpTablets.addAll(index.getTablets());
tmpTablets.addAll(GlobalStateMgr.getCurrentState().getTabletMetastore().getAllTablets(index));
}
targetTable.dropTempPartition(partition.getName(), true);
} else {
Expand Down
Loading

0 comments on commit b0cfc6d

Please sign in to comment.