Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
Signed-off-by: HangyuanLiu <[email protected]>
  • Loading branch information
HangyuanLiu committed Nov 8, 2024
1 parent 84fbd74 commit dbfc8d9
Show file tree
Hide file tree
Showing 89 changed files with 739 additions and 565 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,8 @@ public AlterJobV2 build() throws UserException {
properties.put(LakeTablet.PROPERTY_KEY_PARTITION_ID, Long.toString(physicalPartitionId));
properties.put(LakeTablet.PROPERTY_KEY_INDEX_ID, Long.toString(shadowIndexId));
List<Long> shadowTabletIds =
createShards(originTablets.size(), table.getPartitionFilePathInfo(partitionId),
table.getPartitionFileCacheInfo(partitionId), shardGroupId,
createShards(originTablets.size(), table.getPartitionFilePathInfo(physicalPartitionId),
table.getPartitionFileCacheInfo(physicalPartitionId), shardGroupId,
originTabletIds, properties, warehouseId);
Preconditions.checkState(originTablets.size() == shadowTabletIds.size());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -385,13 +385,13 @@ public void updateIndexTabletMeta(Database db, OlapTable table, PhysicalPartitio

void updateNextVersion(@NotNull LakeTable table) {
for (long partitionId : physicalPartitionIndexMap.rowKeySet()) {
PhysicalPartition partition = table.getPhysicalPartition(partitionId);
long commitVersion = commitVersionMap.get(partitionId);
Preconditions.checkState(partition.getNextVersion() == commitVersion,
"partitionNextVersion=" + partition.getNextVersion() + " commitVersion=" + commitVersion);
partition.setNextVersion(commitVersion + 1);
PhysicalPartition physicalPartition = table.getPhysicalPartition(partitionId);
long commitVersion = commitVersionMap.get(physicalPartition.getId());
Preconditions.checkState(physicalPartition.getNextVersion() == commitVersion,
"partitionNextVersion=" + physicalPartition.getNextVersion() + " commitVersion=" + commitVersion);
physicalPartition.setNextVersion(commitVersion + 1);
LOG.info("LakeTableAlterMetaJob id: {} update next version of partition: {}, commitVersion: {}",
jobId, partition.getId(), commitVersion);
jobId, physicalPartition.getId(), commitVersion);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -344,12 +344,13 @@ protected void runPendingJob() throws AlterCancelException {
countDownLatch = new MarkedCountDownLatch<>((int) numTablets);
createReplicaLatch = countDownLatch;
long baseIndexId = table.getBaseIndexId();
for (long partitionId : physicalPartitionIndexMap.rowKeySet()) {
PhysicalPartition partition = table.getPhysicalPartition(partitionId);
Preconditions.checkState(partition != null);
TStorageMedium storageMedium = table.getPartitionInfo().getDataProperty(partitionId).getStorageMedium();
for (long physicalPartitionId : physicalPartitionIndexMap.rowKeySet()) {
PhysicalPartition physicalPartition = table.getPhysicalPartition(physicalPartitionId);
Preconditions.checkState(physicalPartition != null);
TStorageMedium storageMedium = table.getPartitionInfo()
.getDataProperty(physicalPartition.getParentId()).getStorageMedium();

Map<Long, MaterializedIndex> shadowIndexMap = physicalPartitionIndexMap.row(partitionId);
Map<Long, MaterializedIndex> shadowIndexMap = physicalPartitionIndexMap.row(physicalPartitionId);
for (Map.Entry<Long, MaterializedIndex> entry : shadowIndexMap.entrySet()) {
long shadowIdxId = entry.getKey();
MaterializedIndex shadowIdx = entry.getValue();
Expand Down Expand Up @@ -387,7 +388,7 @@ protected void runPendingJob() throws AlterCancelException {
.setNodeId(computeNode.getId())
.setDbId(dbId)
.setTableId(tableId)
.setPartitionId(partitionId)
.setPartitionId(physicalPartitionId)
.setIndexId(shadowIdxId)
.setTabletId(shadowTabletId)
.setVersion(Partition.PARTITION_INIT_VERSION)
Expand Down
56 changes: 29 additions & 27 deletions fe/fe-core/src/main/java/com/starrocks/backup/RestoreJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ public enum RestoreJobState {
private AgentBatchTask batchTask;

boolean enableColocateRestore = Config.enable_colocate_restore;

public RestoreJob() {
super(JobType.RESTORE);
}
Expand Down Expand Up @@ -424,19 +424,20 @@ private void checkIfNeedCancel() {
locker.lockDatabase(db.getId(), LockType.READ);
try {
for (IdChain idChain : fileMapping.getMapping().keySet()) {
OlapTable tbl = (OlapTable) globalStateMgr.getLocalMetastore()
.getTable(db.getId(), idChain.getTblId());
OlapTable tbl = (OlapTable) globalStateMgr.getLocalMetastore().getTable(db.getId(), idChain.getTblId());
if (tbl == null) {
status = new Status(ErrCode.NOT_FOUND, "table " + idChain.getTblId() + " has been dropped");
return;
}
PhysicalPartition part = tbl.getPhysicalPartition(idChain.getPartId());
if (part == null) {

PhysicalPartition physicalPartition = tbl.getPhysicalPartition(idChain.getPartId());

if (physicalPartition == null) {
status = new Status(ErrCode.NOT_FOUND, "partition " + idChain.getPartId() + " has been dropped");
return;
}

MaterializedIndex index = part.getIndex(idChain.getIdxId());
MaterializedIndex index = physicalPartition.getIndex(idChain.getIdxId());
if (index == null) {
status = new Status(ErrCode.NOT_FOUND, "index " + idChain.getIdxId() + " has been dropped");
return;
Expand Down Expand Up @@ -497,8 +498,8 @@ private void checkAndPrepareMeta() {

if (!tbl.isSupportBackupRestore()) {
status = new Status(ErrCode.UNSUPPORTED,
"Table: " + tbl.getName() +
" can not support backup restore, type: {}" + tbl.getType());
"Table: " + tbl.getName() +
" can not support backup restore, type: {}" + tbl.getType());
return;
}

Expand Down Expand Up @@ -532,11 +533,11 @@ private void checkAndPrepareMeta() {
Table remoteTbl = backupMeta.getTable(tblInfo.name);
Preconditions.checkNotNull(remoteTbl);
Table localTbl = globalStateMgr.getLocalMetastore()
.getTable(db.getFullName(), jobInfo.getAliasByOriginNameIfSet(tblInfo.name));
.getTable(db.getFullName(), jobInfo.getAliasByOriginNameIfSet(tblInfo.name));

if (localTbl != null && remoteTbl.isOlapView() && !localTbl.isOlapView()) {
status = new Status(ErrCode.BAD_REPLACE,
"Table: " + localTbl.getName() + " has existed and it is not a View");
"Table: " + localTbl.getName() + " has existed and it is not a View");
return;
}

Expand Down Expand Up @@ -569,8 +570,8 @@ private void checkAndPrepareMeta() {
// table already exist, check schema
if (!localTbl.isSupportBackupRestore()) {
status = new Status(ErrCode.UNSUPPORTED,
"Table: " + localTbl.getName() +
" can not support backup restore, type: {}" + localTbl.getType());
"Table: " + localTbl.getName() +
" can not support backup restore, type: {}" + localTbl.getType());
return;
}
OlapTable localOlapTbl = (OlapTable) localTbl;
Expand Down Expand Up @@ -735,7 +736,7 @@ private void checkAndPrepareMeta() {
ColocateTableIndex.GroupId groupId = colocateTableIndex.getGroup(remoteOlapTbl.getId());
List<List<Long>> backendsPerBucketSeq = colocateTableIndex.getBackendsPerBucketSeq(groupId);
ColocatePersistInfo colocatePersistInfo = ColocatePersistInfo
.createForAddTable(groupId, remoteOlapTbl.getId(), backendsPerBucketSeq);
.createForAddTable(groupId, remoteOlapTbl.getId(), backendsPerBucketSeq);
colocatePersistInfos.add(colocatePersistInfo);
}

Expand All @@ -755,7 +756,7 @@ private void checkAndPrepareMeta() {
// generate create replica tasks for all restored partitions
for (Pair<String, Partition> entry : restoredPartitions) {
OlapTable localTbl = (OlapTable) globalStateMgr.getLocalMetastore()
.getTable(db.getFullName(), entry.first);
.getTable(db.getFullName(), entry.first);
Preconditions.checkNotNull(localTbl, localTbl.getName());
Partition restorePart = entry.second;
OlapTable remoteTbl = (OlapTable) backupMeta.getTable(entry.first);
Expand Down Expand Up @@ -908,17 +909,17 @@ protected void prepareAndSendSnapshotTasks(Database db) {
locker.lockDatabase(db.getId(), LockType.READ);
try {
for (IdChain idChain : fileMapping.getMapping().keySet()) {
OlapTable tbl = (OlapTable) globalStateMgr.getLocalMetastore()
.getTable(db.getId(), idChain.getTblId());
PhysicalPartition part = tbl.getPhysicalPartition(idChain.getPartId());
MaterializedIndex index = part.getIndex(idChain.getIdxId());
OlapTable tbl = (OlapTable) globalStateMgr.getLocalMetastore().getTable(db.getId(), idChain.getTblId());

PhysicalPartition physicalPartition = tbl.getPhysicalPartition(idChain.getPartId());
MaterializedIndex index = physicalPartition.getIndex(idChain.getIdxId());
LocalTablet tablet = (LocalTablet) index.getTablet(idChain.getTabletId());
Replica replica = tablet.getReplicaById(idChain.getReplicaId());
long signature = globalStateMgr.getNextId();
SnapshotTask task = new SnapshotTask(null, replica.getBackendId(), signature,
jobId, db.getId(),
tbl.getId(), part.getId(), index.getId(), tablet.getId(),
part.getVisibleVersion(),
tbl.getId(), physicalPartition.getId(), index.getId(), tablet.getId(),
physicalPartition.getVisibleVersion(),
tbl.getSchemaHashByIndexId(index.getId()), timeoutMs,
true /* is restore task*/);
batchTask.addTask(task);
Expand Down Expand Up @@ -1098,7 +1099,7 @@ protected void genFileMappingWithPartition(OlapTable localTbl, Partition localPa
for (Replica localReplica : localTablet.getImmutableReplicas()) {
IdChain src = new IdChain(remoteTblId, backupPartInfo.id, backupIdxInfo.id, backupTabletInfo.id,
-1L /* no replica id */);
IdChain dest = new IdChain(localTbl.getId(), localPartition.getId(),
IdChain dest = new IdChain(localTbl.getId(), localPartition.getDefaultPhysicalPartition().getId(),
localIdx.getId(), localTablet.getId(), localReplica.getId());
fileMapping.putMapping(dest, src, overwrite);
LOG.debug("tablet mapping: {} to {} file mapping: {} to {}",
Expand Down Expand Up @@ -1142,7 +1143,7 @@ private void replayCheckAndPrepareMeta() {
// replay set all existing tables's state to RESTORE
for (BackupTableInfo tblInfo : jobInfo.tables.values()) {
Table tbl = globalStateMgr.getLocalMetastore()
.getTable(db.getFullName(), jobInfo.getAliasByOriginNameIfSet(tblInfo.name));
.getTable(db.getFullName(), jobInfo.getAliasByOriginNameIfSet(tblInfo.name));
if (tbl == null || tbl.isOlapView()) {
continue;
}
Expand Down Expand Up @@ -1174,7 +1175,7 @@ private void replayCheckAndPrepareMeta() {
protected void addRestoredPartitions(Database db, boolean modify) {
for (Pair<String, Partition> entry : restoredPartitions) {
OlapTable localTbl = (OlapTable) globalStateMgr.getLocalMetastore()
.getTable(db.getFullName(), entry.first);
.getTable(db.getFullName(), entry.first);
Partition restorePart = entry.second;
OlapTable remoteTbl = (OlapTable) backupMeta.getTable(entry.first);
RangePartitionInfo localPartitionInfo = (RangePartitionInfo) localTbl.getPartitionInfo();
Expand Down Expand Up @@ -1374,6 +1375,7 @@ protected void prepareDownloadTasks(List<SnapshotInfo> beSnapshotInfos, Database

IdChain catalogIds = new IdChain(tbl.getId(), part.getId(), idx.getId(),
info.getTabletId(), replica.getId());

IdChain repoIds = fileMapping.get(catalogIds);
if (repoIds == null) {
status = new Status(ErrCode.NOT_FOUND,
Expand Down Expand Up @@ -1527,7 +1529,7 @@ private Status allTabletCommitted(boolean isReplay) {
try {
for (BackupTableInfo tblInfo : jobInfo.tables.values()) {
Table tbl = globalStateMgr.getLocalMetastore()
.getTable(db.getFullName(), jobInfo.getAliasByOriginNameIfSet(tblInfo.name));
.getTable(db.getFullName(), jobInfo.getAliasByOriginNameIfSet(tblInfo.name));
// skip to restore table ids
if (skipRestoreRemoteTableIds.contains(tblInfo.id)) {
continue;
Expand Down Expand Up @@ -1720,7 +1722,7 @@ public void cancelInternal(boolean isReplay) {
// remove restored partitions
for (Pair<String, Partition> entry : restoredPartitions) {
OlapTable restoreTbl = (OlapTable) globalStateMgr.getLocalMetastore()
.getTable(db.getFullName(), entry.first);
.getTable(db.getFullName(), entry.first);
if (restoreTbl == null) {
continue;
}
Expand All @@ -1744,7 +1746,7 @@ public void cancelInternal(boolean isReplay) {
for (Table restoreTbl : restoredTbls) {
if (restoreTbl instanceof OlapTable && restoreTbl.getId() == colocatePersistInfo.getTableId()) {
globalStateMgr.getColocateTableIndex()
.removeTable(restoreTbl.getId(), (OlapTable) restoreTbl, isReplay);
.removeTable(restoreTbl.getId(), (OlapTable) restoreTbl, isReplay);
}
}
}
Expand Down Expand Up @@ -1773,7 +1775,7 @@ private void setTableStateToNormal(Database db) {
continue;
}
Table tbl = globalStateMgr.getLocalMetastore()
.getTable(db.getFullName(), jobInfo.getAliasByOriginNameIfSet(tblInfo.name));
.getTable(db.getFullName(), jobInfo.getAliasByOriginNameIfSet(tblInfo.name));
if (tbl == null) {
continue;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -465,7 +465,6 @@ private void updateMetaInternal(String dbName, TTableMeta meta, List<TBackendMet
for (TPartitionMeta partitionMeta : meta.getPartitions()) {
Partition logicalPartition = new Partition(partitionMeta.getPartition_id(),
partitionMeta.getPartition_name(),
null, // TODO(wulei): fix it
defaultDistributionInfo);

PhysicalPartition physicalPartition = new PhysicalPartition(GlobalStateMgr.getCurrentState().getNextId(),
Expand Down
13 changes: 8 additions & 5 deletions fe/fe-core/src/main/java/com/starrocks/catalog/OlapTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -865,7 +865,7 @@ public Status resetIdsForRestore(GlobalStateMgr globalStateMgr, Database db, int
});
origPhysicalPartitions.forEach(physicalPartition -> {
if (physicalPartition.getId() != newPartitionId) {
physicalPartition.setIdForRestore(globalStateMgr.getNextId());
physicalPartition.setIdForRestore(GlobalStateMgr.getCurrentState().getNextId());
physicalPartition.setParentId(newPartitionId);
partition.addSubPartition(physicalPartition);
}
Expand Down Expand Up @@ -1357,6 +1357,7 @@ public void addPartition(Partition partition) {

public void addPhysicalPartition(PhysicalPartition physicalPartition) {
physicalPartitionIdToPartitionId.put(physicalPartition.getId(), physicalPartition.getParentId());
physicalPartitionNameToPartitionId.put(physicalPartition.getName(), physicalPartition.getParentId());
}

// This is a private method.
Expand Down Expand Up @@ -2250,9 +2251,11 @@ public List<List<Long>> getArbitraryTabletBucketsSeq() throws DdlException {
List<List<Long>> backendsPerBucketSeq = Lists.newArrayList();
Optional<Partition> optionalPartition = idToPartition.values().stream().findFirst();
if (optionalPartition.isPresent()) {
PhysicalPartition partition = optionalPartition.get().getDefaultPhysicalPartition();
Partition partition = optionalPartition.get();
PhysicalPartition physicalPartition = partition.getDefaultPhysicalPartition();

short replicationNum = partitionInfo.getReplicationNum(partition.getId());
MaterializedIndex baseIdx = partition.getBaseIndex();
MaterializedIndex baseIdx = physicalPartition.getBaseIndex();
for (Long tabletId : baseIdx.getTabletIdsInOrder()) {
LocalTablet tablet = (LocalTablet) baseIdx.getTablet(tabletId);
List<Long> replicaBackendIds = tablet.getNormalReplicaBackendIds();
Expand Down Expand Up @@ -3412,10 +3415,10 @@ public FilePathInfo getDefaultFilePathInfo() {
}

@Nullable
public FilePathInfo getPartitionFilePathInfo(long partitionId) {
public FilePathInfo getPartitionFilePathInfo(long physicalPartitionId) {
FilePathInfo pathInfo = getDefaultFilePathInfo();
if (pathInfo != null) {
return StarOSAgent.allocatePartitionFilePathInfo(pathInfo, partitionId);
return StarOSAgent.allocatePartitionFilePathInfo(pathInfo, physicalPartitionId);
}
return null;
}
Expand Down
Loading

0 comments on commit dbfc8d9

Please sign in to comment.