Skip to content

Commit

Permalink
Add Meta refactor development branch
Browse files Browse the repository at this point in the history
Signed-off-by: HangyuanLiu <[email protected]>
  • Loading branch information
HangyuanLiu committed Sep 14, 2024
1 parent 4bd744c commit e8e0cb1
Show file tree
Hide file tree
Showing 491 changed files with 12,888 additions and 11,853 deletions.
15 changes: 11 additions & 4 deletions fe/fe-core/src/main/java/com/starrocks/StarRocksFE.java
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,8 @@ private static CommandLineOptions parseArgs(String[] args) {
"Specify the meta version to decode log value, separated by ',', first is community meta" +
" version, second is StarRocks meta version");

options.addOption("ls", "ls", true, "");

CommandLine cmd = null;
try {
cmd = commandLineParser.parse(options, args);
Expand All @@ -244,7 +246,7 @@ private static CommandLineOptions parseArgs(String[] args) {
} else if (cmd.hasOption('b') || cmd.hasOption("bdb")) {
if (cmd.hasOption('l') || cmd.hasOption("listdb")) {
// list bdb je databases
BDBToolOptions bdbOpts = new BDBToolOptions(true, "", false, "", "", 0, 0);
BDBToolOptions bdbOpts = new BDBToolOptions(true, "", false, "", "", 0, 0, "");
return new CommandLineOptions(false, bdbOpts);
} else if (cmd.hasOption('d') || cmd.hasOption("db")) {
// specify a database
Expand All @@ -255,7 +257,7 @@ private static CommandLineOptions parseArgs(String[] args) {
}

if (cmd.hasOption('s') || cmd.hasOption("stat")) {
BDBToolOptions bdbOpts = new BDBToolOptions(false, dbName, true, "", "", 0, 0);
BDBToolOptions bdbOpts = new BDBToolOptions(false, dbName, true, "", "", 0, 0, "");
return new CommandLineOptions(false, bdbOpts);
} else {
String fromKey = "";
Expand Down Expand Up @@ -292,9 +294,14 @@ private static CommandLineOptions parseArgs(String[] args) {
}
}

String path = "";
if (cmd.hasOption("ls")) {
path = cmd.getOptionValue("ls");
}

BDBToolOptions bdbOpts =
new BDBToolOptions(false, dbName, false, fromKey, endKey, metaVersion,
starrocksMetaVersion);
starrocksMetaVersion, path);
return new CommandLineOptions(false, bdbOpts);
}
} else {
Expand Down Expand Up @@ -325,7 +332,7 @@ private static void checkCommandLineOptions(CommandLineOptions cmdLineOpts) {
System.out.println("Java compile version: " + Version.STARROCKS_JAVA_COMPILE_VERSION);
System.exit(0);
} else if (cmdLineOpts.runBdbTools()) {

BDBTool bdbTool = new BDBTool(BDBEnvironment.getBdbDir(), cmdLineOpts.getBdbToolOpts());
if (bdbTool.run()) {
System.exit(0);
Expand Down
439 changes: 368 additions & 71 deletions fe/fe-core/src/main/java/com/starrocks/alter/AlterJobExecutor.java

Large diffs are not rendered by default.

468 changes: 14 additions & 454 deletions fe/fe-core/src/main/java/com/starrocks/alter/AlterJobMgr.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ public boolean cancel(String errMsg) {
*/
protected boolean checkTableStable(Database db) throws AlterCancelException {
long unHealthyTabletId = TabletInvertedIndex.NOT_EXIST_VALUE;
OlapTable tbl = (OlapTable) GlobalStateMgr.getCurrentState().getLocalMetastore().getTable(db.getId(), tableId);
OlapTable tbl = (OlapTable) GlobalStateMgr.getCurrentState().getMetastore().getTable(db.getId(), tableId);
if (tbl == null) {
throw new AlterCancelException("Table " + tableId + " does not exist");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public Void visitTableRenameClause(TableRenameClause clause, ConnectContext cont
String newMvName = clause.getNewTableName();
String oldMvName = table.getName();

if (GlobalStateMgr.getCurrentState().getLocalMetastore().getTable(db.getFullName(), newMvName) != null) {
if (GlobalStateMgr.getCurrentState().getMetastore().getTable(db.getFullName(), newMvName) != null) {
throw new SemanticException("Materialized view [" + newMvName + "] is already used");
}
table.setName(newMvName);
Expand All @@ -82,7 +82,7 @@ public Void visitTableRenameClause(TableRenameClause clause, ConnectContext cont
final RenameMaterializedViewLog renameMaterializedViewLog =
new RenameMaterializedViewLog(table.getId(), db.getId(), newMvName);
updateTaskDefinition((MaterializedView) table);
GlobalStateMgr.getCurrentState().getEditLog().logMvRename(renameMaterializedViewLog);
GlobalStateMgr.getCurrentState().getMetastore().renameMaterializedView(renameMaterializedViewLog);
LOG.info("rename materialized view[{}] to {}, id: {}", oldMvName, newMvName, table.getId());
return null;
}
Expand Down Expand Up @@ -307,7 +307,7 @@ public Void visitModifyTablePropertiesClause(ModifyTablePropertiesClause modifyT
if (isChanged) {
ModifyTablePropertyOperationLog log = new ModifyTablePropertyOperationLog(materializedView.getDbId(),
materializedView.getId(), propClone);
GlobalStateMgr.getCurrentState().getEditLog().logAlterMaterializedViewProperties(log);
GlobalStateMgr.getCurrentState().getMetastore().alterMaterializedViewProperties(log);
}
LOG.info("alter materialized view properties {}, id: {}", propClone, materializedView.getId());
return null;
Expand Down Expand Up @@ -349,7 +349,7 @@ public Void visitRefreshSchemeClause(RefreshSchemeClause refreshSchemeDesc, Conn
}
try {
// check
Table mv = GlobalStateMgr.getCurrentState().getLocalMetastore().getTable(db.getId(), materializedView.getId());
Table mv = GlobalStateMgr.getCurrentState().getMetastore().getTable(db.getId(), materializedView.getId());
if (mv == null) {
throw new DmlException(
"update meta failed. materialized view:" + materializedView.getName() + " not exist");
Expand Down Expand Up @@ -378,7 +378,7 @@ public Void visitRefreshSchemeClause(RefreshSchemeClause refreshSchemeDesc, Conn
}

final ChangeMaterializedViewRefreshSchemeLog log = new ChangeMaterializedViewRefreshSchemeLog(materializedView);
GlobalStateMgr.getCurrentState().getEditLog().logMvChangeRefreshScheme(log);
GlobalStateMgr.getCurrentState().getMetastore().changeMaterializedRefreshScheme(log);
} finally {
locker.unLockDatabase(db.getId(), LockType.WRITE);
}
Expand All @@ -403,11 +403,11 @@ public Void visitAlterMaterializedViewStatusClause(AlterMaterializedViewStatusCl
return null;
}

GlobalStateMgr.getCurrentState().getAlterJobMgr().
alterMaterializedViewStatus(materializedView, status, false);
GlobalStateMgr.getCurrentState().getLocalMetastore()
.alterMaterializedViewStatus(materializedView, status, false);
// for manual refresh type, do not refresh
if (materializedView.getRefreshScheme().getType() != MaterializedView.RefreshType.MANUAL) {
GlobalStateMgr.getCurrentState().getLocalMetastore()
GlobalStateMgr.getCurrentState().getStarRocksMetadata()
.refreshMaterializedView(dbName, materializedView.getName(), true, null,
Constants.TaskRunPriority.NORMAL.value(), true, false);
}
Expand All @@ -418,14 +418,14 @@ public Void visitAlterMaterializedViewStatusClause(AlterMaterializedViewStatusCl
LOG.warn("Setting the materialized view {}({}) to inactive because " +
"user use alter materialized view set status to inactive",
materializedView.getName(), materializedView.getId());
GlobalStateMgr.getCurrentState().getAlterJobMgr().
alterMaterializedViewStatus(materializedView, status, false);
GlobalStateMgr.getCurrentState().getLocalMetastore()
.alterMaterializedViewStatus(materializedView, status, false);
} else {
throw new AlterJobException("Unsupported modification materialized view status:" + status);
}
AlterMaterializedViewStatusLog log = new AlterMaterializedViewStatusLog(materializedView.getDbId(),
materializedView.getId(), status);
GlobalStateMgr.getCurrentState().getEditLog().logAlterMvStatus(log);
GlobalStateMgr.getCurrentState().getMetastore().alterMvStatus(log);
return null;
} catch (DdlException | MetaNotFoundException e) {
throw new AlterJobException(e.getMessage(), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.starrocks.common.util.concurrent.lock.Locker;
import com.starrocks.lake.compaction.CompactionMgr;
import com.starrocks.lake.compaction.PartitionIdentifier;
import com.starrocks.meta.MetaStore;
import com.starrocks.qe.ShowResultSet;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.server.RunMode;
Expand All @@ -47,7 +48,7 @@
import java.util.ArrayList;
import java.util.List;

public class CompactionHandler {
public class CompactionHandler {
private static final Logger LOG = LogManager.getLogger(CompactionHandler.class);

// add synchronized to avoid process 2 or more stmts at same time
Expand Down Expand Up @@ -82,11 +83,16 @@ public static synchronized ShowResultSet process(List<AlterClause> alterClauses,
locker.lockTablesWithIntensiveDbLock(db.getId(), Lists.newArrayList(olapTable.getId()), LockType.READ);
try {
List<Partition> allPartitions = findAllPartitions(olapTable, compactionClause);

MetaStore localMetastore = GlobalStateMgr.getCurrentState().getMetastore();
for (Partition partition : allPartitions) {
for (PhysicalPartition physicalPartition : partition.getSubPartitions()) {
for (MaterializedIndex index : physicalPartition.getMaterializedIndices(
MaterializedIndex.IndexExtState.VISIBLE)) {
for (Tablet tablet : index.getTablets()) {
List<PhysicalPartition> physicalPartitionList = localMetastore.getAllPhysicalPartition(partition);
for (PhysicalPartition physicalPartition : physicalPartitionList) {
List<MaterializedIndex> materializedIndices = localMetastore
.getMaterializedIndices(physicalPartition, MaterializedIndex.IndexExtState.VISIBLE);
for (MaterializedIndex materializedIndex : materializedIndices) {
List<Tablet> tabletList = localMetastore.getAllTablets(materializedIndex);
for (Tablet tablet : tabletList) {
for (Long backendId : ((LocalTablet) tablet).getBackendIds()) {
backendToTablets.put(backendId, tablet.getId());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,11 @@ public AlterJobV2 build() throws UserException {
long partitionId = partition.getParentId();
long physicalPartitionId = partition.getId();
long shardGroupId = partition.getShardGroupId();
List<Tablet> originTablets = partition.getIndex(originIndexId).getTablets();

MaterializedIndex materializedIndex = partition.getIndex(originIndexId);
List<Tablet> originTablets = GlobalStateMgr.getCurrentState().getMetastore()
.getAllTablets(materializedIndex);

// TODO: It is not good enough to create shards into the same group id, schema change PR needs to
// revise the code again.
List<Long> originTabletIds = originTablets.stream().map(Tablet::getId).collect(Collectors.toList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,13 +85,13 @@ protected void runPendingJob() throws AlterCancelException {
// send task to be
GlobalStateMgr globalStateMgr = GlobalStateMgr.getCurrentState();
List<Partition> partitions = Lists.newArrayList();
Database db = globalStateMgr.getLocalMetastore().getDb(dbId);
Database db = globalStateMgr.getMetastore().getDb(dbId);

if (db == null) {
throw new AlterCancelException("database does not exist, dbId:" + dbId);
}

LakeTable table = (LakeTable) GlobalStateMgr.getCurrentState().getLocalMetastore().getTable(db.getFullName(), tableName);
LakeTable table = (LakeTable) GlobalStateMgr.getCurrentState().getMetastore().getTable(db.getFullName(), tableName);
if (table == null) {
throw new AlterCancelException("table does not exist, tableName:" + tableName);
}
Expand Down Expand Up @@ -135,13 +135,13 @@ protected void runWaitingTxnJob() throws AlterCancelException {

@Override
protected void runRunningJob() throws AlterCancelException {
Database db = GlobalStateMgr.getCurrentState().getLocalMetastore().getDb(dbId);
Database db = GlobalStateMgr.getCurrentState().getMetastore().getDb(dbId);
if (db == null) {
// database has been dropped
throw new AlterCancelException("database does not exist, dbId:" + dbId);
}

LakeTable table = (LakeTable) GlobalStateMgr.getCurrentState().getLocalMetastore().getTable(db.getId(), tableId);
LakeTable table = (LakeTable) GlobalStateMgr.getCurrentState().getMetastore().getTable(db.getId(), tableId);
if (table == null) {
// table has been dropped
throw new AlterCancelException("table does not exist, tableId:" + tableId);
Expand Down Expand Up @@ -187,14 +187,14 @@ protected void runFinishedRewritingJob() throws AlterCancelException {
return;
}

Database db = GlobalStateMgr.getCurrentState().getLocalMetastore().getDb(dbId);
Database db = GlobalStateMgr.getCurrentState().getMetastore().getDb(dbId);
if (db == null) {
// database has been dropped
LOG.warn("database does not exist, dbId:" + dbId);
throw new AlterCancelException("database does not exist, dbId:" + dbId);
}

LakeTable table = (LakeTable) GlobalStateMgr.getCurrentState().getLocalMetastore().getTable(db.getId(), tableId);
LakeTable table = (LakeTable) GlobalStateMgr.getCurrentState().getMetastore().getTable(db.getId(), tableId);
if (table == null) {
// table has been dropped
LOG.warn("table does not exist, tableId:" + tableId);
Expand All @@ -221,12 +221,12 @@ protected void runFinishedRewritingJob() throws AlterCancelException {
}

boolean readyToPublishVersion() throws AlterCancelException {
Database db = GlobalStateMgr.getCurrentState().getLocalMetastore().getDb(dbId);
Database db = GlobalStateMgr.getCurrentState().getMetastore().getDb(dbId);
if (db == null) {
// database has been dropped
throw new AlterCancelException("database does not exist, dbId:" + dbId);
}
LakeTable table = (LakeTable) GlobalStateMgr.getCurrentState().getLocalMetastore().getTable(db.getId(), tableId);
LakeTable table = (LakeTable) GlobalStateMgr.getCurrentState().getMetastore().getTable(db.getId(), tableId);
if (table == null) {
// table has been dropped
throw new AlterCancelException("table does not exist, tableId:" + tableId);
Expand Down Expand Up @@ -263,7 +263,8 @@ boolean publishVersion() {
long commitVersion = commitVersionMap.get(partitionId);
Map<Long, MaterializedIndex> dirtyIndexMap = physicalPartitionIndexMap.row(partitionId);
for (MaterializedIndex index : dirtyIndexMap.values()) {
Utils.publishVersion(index.getTablets(), txnInfo, commitVersion - 1, commitVersion,
List<Tablet> tabletList = GlobalStateMgr.getCurrentState().getMetastore().getAllTablets(index);
Utils.publishVersion(tabletList, txnInfo, commitVersion - 1, commitVersion,
warehouseId);
}
}
Expand Down Expand Up @@ -305,7 +306,7 @@ public void updatePhysicalPartitionTabletMeta(Database db, OlapTable table, Part
locker.unLockTablesWithIntensiveDbLock(db.getId(), Lists.newArrayList(table.getId()), LockType.READ);
}
for (MaterializedIndex index : indexList) {
updateIndexTabletMeta(db, table, partition, index);
updateIndexTabletMeta(db, table, partition.getDefaultPhysicalPartition(), index);
}
}

Expand All @@ -319,7 +320,7 @@ public void updateIndexTabletMeta(Database db, OlapTable table, PhysicalPartitio
Locker locker = new Locker();
locker.lockTablesWithIntensiveDbLock(db.getId(), Lists.newArrayList(table.getId()), LockType.READ);
try {
tablets = new ArrayList<>(index.getTablets());
tablets = GlobalStateMgr.getCurrentState().getMetastore().getAllTablets(index);
} finally {
locker.unLockTablesWithIntensiveDbLock(db.getId(), Lists.newArrayList(table.getId()), LockType.READ);
}
Expand Down Expand Up @@ -422,9 +423,9 @@ protected boolean cancelImpl(String errMsg) {
return false;
}

Database db = GlobalStateMgr.getCurrentState().getLocalMetastore().getDb(dbId);
Database db = GlobalStateMgr.getCurrentState().getMetastore().getDb(dbId);
if (db != null) {
LakeTable table = (LakeTable) GlobalStateMgr.getCurrentState().getLocalMetastore().getTable(db.getId(), tableId);
LakeTable table = (LakeTable) GlobalStateMgr.getCurrentState().getMetastore().getTable(db.getId(), tableId);
if (table != null) {
Locker locker = new Locker();
locker.lockTablesWithIntensiveDbLock(db.getId(), Lists.newArrayList(table.getId()), LockType.WRITE);
Expand Down Expand Up @@ -480,14 +481,14 @@ public void replay(AlterJobV2 replayedJob) {
restoreState(other);
}

Database db = GlobalStateMgr.getCurrentState().getLocalMetastore().getDb(dbId);
Database db = GlobalStateMgr.getCurrentState().getMetastore().getDb(dbId);
if (db == null) {
// database has been dropped
LOG.warn("database does not exist, dbId:" + dbId);
return;
}

LakeTable table = (LakeTable) GlobalStateMgr.getCurrentState().getLocalMetastore().getTable(db.getId(), tableId);
LakeTable table = (LakeTable) GlobalStateMgr.getCurrentState().getMetastore().getTable(db.getId(), tableId);
if (table == null) {
return;
}
Expand Down
Loading

0 comments on commit e8e0cb1

Please sign in to comment.