Skip to content

Commit

Permalink
[Enhancement] Support getting physical partition by name (#47270)
Browse files Browse the repository at this point in the history
Signed-off-by: xiangguangyxg <[email protected]>
  • Loading branch information
xiangguangyxg authored Jul 19, 2024
1 parent 2746fdd commit 08b9092
Show file tree
Hide file tree
Showing 12 changed files with 133 additions and 28 deletions.
44 changes: 44 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/catalog/OlapTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ public enum OlapTableState {
protected Map<String, Partition> nameToPartition = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);

protected Map<Long, Long> physicalPartitionIdToPartitionId = new HashMap<>();
protected Map<String, Long> physicalPartitionNameToPartitionId = new HashMap<>();

@SerializedName(value = "defaultDistributionInfo")
protected DistributionInfo defaultDistributionInfo;
Expand Down Expand Up @@ -364,6 +365,7 @@ public void copyOnlyForQuery(OlapTable olapTable) {
olapTable.idToPartition = idToPartitions;
olapTable.nameToPartition = nameToPartitions;
olapTable.physicalPartitionIdToPartitionId = this.physicalPartitionIdToPartitionId;
olapTable.physicalPartitionNameToPartitionId = this.physicalPartitionNameToPartitionId;
olapTable.tempPartitions = new TempPartitions();
for (Partition tempPartition : this.getTempPartitions()) {
olapTable.tempPartitions.addPartition(tempPartition.shallowCopy());
Expand Down Expand Up @@ -789,6 +791,7 @@ public Status resetIdsForRestore(GlobalStateMgr globalStateMgr, Database db, int
.put(newPartId, rangePartitionInfo.idToInMemory.remove(entry.getValue()));
idToPartition.get(entry.getValue()).getSubPartitions().forEach(physicalPartition -> {
physicalPartitionIdToPartitionId.remove(physicalPartition.getId());
physicalPartitionNameToPartitionId.remove(physicalPartition.getName());
});
idToPartition.put(newPartId, idToPartition.remove(entry.getValue()));
Partition partition = idToPartition.get(newPartId);
Expand All @@ -801,6 +804,7 @@ public Status resetIdsForRestore(GlobalStateMgr globalStateMgr, Database db, int
partition.addSubPartition(physicalPartition);
}
physicalPartitionIdToPartitionId.put(physicalPartition.getId(), newPartId);
physicalPartitionNameToPartitionId.put(physicalPartition.getName(), newPartId);
});
}
} else {
Expand All @@ -813,6 +817,7 @@ public Status resetIdsForRestore(GlobalStateMgr globalStateMgr, Database db, int
partitionInfo.idToInMemory.put(newPartId, partitionInfo.idToInMemory.remove(entry.getValue()));
idToPartition.get(entry.getValue()).getSubPartitions().forEach(physicalPartition -> {
physicalPartitionIdToPartitionId.remove(physicalPartition.getId());
physicalPartitionNameToPartitionId.remove(physicalPartition.getName());
});
idToPartition.put(newPartId, idToPartition.remove(entry.getValue()));
Partition partition = idToPartition.get(newPartId);
Expand All @@ -825,6 +830,7 @@ public Status resetIdsForRestore(GlobalStateMgr globalStateMgr, Database db, int
partition.addSubPartition(physicalPartition);
}
physicalPartitionIdToPartitionId.put(physicalPartition.getId(), newPartId);
physicalPartitionNameToPartitionId.put(physicalPartition.getName(), newPartId);
});
}
}
Expand Down Expand Up @@ -1232,6 +1238,7 @@ public void addPartition(Partition partition) {
nameToPartition.put(partition.getName(), partition);
for (PhysicalPartition physicalPartition : partition.getSubPartitions()) {
physicalPartitionIdToPartitionId.put(physicalPartition.getId(), partition.getId());
physicalPartitionNameToPartitionId.put(physicalPartition.getName(), partition.getId());
}
}

Expand All @@ -1255,6 +1262,9 @@ private void dropPartition(long dbId, String partitionName, boolean isForceDrop,
physicalPartitionIdToPartitionId.keySet().removeAll(partition.getSubPartitions()
.stream().map(PhysicalPartition::getId)
.collect(Collectors.toList()));
physicalPartitionNameToPartitionId.keySet().removeAll(partition.getSubPartitions()
.stream().map(PhysicalPartition::getName)
.collect(Collectors.toList()));
}

protected RecyclePartitionInfo buildRecyclePartitionInfo(long dbId, Partition partition) {
Expand Down Expand Up @@ -1386,6 +1396,33 @@ public PhysicalPartition getPhysicalPartition(long physicalPartitionId) {
return null;
}

public PhysicalPartition getPhysicalPartition(String physicalPartitionName) {
Long partitionId = physicalPartitionNameToPartitionId.get(physicalPartitionName);
if (partitionId == null) {
for (Partition partition : idToPartition.values()) {
for (PhysicalPartition subPartition : partition.getSubPartitions()) {
if (subPartition.getName().equals(physicalPartitionName)) {
return subPartition;
}
}
}
for (Partition partition : tempPartitions.getAllPartitions()) {
for (PhysicalPartition subPartition : partition.getSubPartitions()) {
if (subPartition.getName().equals(physicalPartitionName)) {
return subPartition;
}
}
}
} else {
Partition partition = getPartition(partitionId);
if (partition != null) {
return partition.getSubPartition(physicalPartitionName);
}
}

return null;
}

public Collection<PhysicalPartition> getPhysicalPartitions() {
return idToPartition.values().stream()
.flatMap(partition -> partition.getSubPartitions().stream())
Expand Down Expand Up @@ -1850,10 +1887,12 @@ public void gsonPostProcess() throws IOException {
// Recover nameToPartition from idToPartition
nameToPartition = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
physicalPartitionIdToPartitionId = Maps.newHashMap();
physicalPartitionNameToPartitionId = Maps.newHashMap();
for (Partition partition : idToPartition.values()) {
nameToPartition.put(partition.getName(), partition);
for (PhysicalPartition physicalPartition : partition.getSubPartitions()) {
physicalPartitionIdToPartitionId.put(physicalPartition.getId(), partition.getId());
physicalPartitionNameToPartitionId.put(physicalPartition.getName(), partition.getId());
}
}

Expand Down Expand Up @@ -1937,11 +1976,13 @@ public Partition replacePartition(Partition newPartition) {

oldPartition.getSubPartitions().forEach(physicalPartition -> {
physicalPartitionIdToPartitionId.remove(physicalPartition.getId());
physicalPartitionNameToPartitionId.remove(physicalPartition.getName());
});
idToPartition.remove(oldPartition.getId());
idToPartition.put(newPartition.getId(), newPartition);
newPartition.getSubPartitions().forEach(physicalPartition -> {
physicalPartitionIdToPartitionId.put(physicalPartition.getId(), newPartition.getId());
physicalPartitionNameToPartitionId.put(physicalPartition.getName(), newPartition.getId());
});

nameToPartition.put(newPartition.getName(), newPartition);
Expand Down Expand Up @@ -2463,6 +2504,7 @@ public void dropTempPartition(String partitionName, boolean needDropTablet) {
tempPartitions.dropPartition(partitionName, needDropTablet);
for (PhysicalPartition physicalPartition : partition.getSubPartitions()) {
physicalPartitionIdToPartitionId.remove(physicalPartition.getId());
physicalPartitionNameToPartitionId.remove(physicalPartition.getName());
}
}
}
Expand Down Expand Up @@ -2608,6 +2650,7 @@ public void addTempPartition(Partition partition) {
tempPartitions.addPartition(partition);
for (PhysicalPartition physicalPartition : partition.getSubPartitions()) {
physicalPartitionIdToPartitionId.put(physicalPartition.getId(), partition.getId());
physicalPartitionNameToPartitionId.put(physicalPartition.getName(), partition.getId());
}
}

Expand All @@ -2616,6 +2659,7 @@ public void dropAllTempPartitions() {
partitionInfo.dropPartition(partition.getId());
for (PhysicalPartition physicalPartition : partition.getSubPartitions()) {
physicalPartitionIdToPartitionId.remove(physicalPartition.getId());
physicalPartitionNameToPartitionId.remove(physicalPartition.getName());
}
}
tempPartitions.dropAll();
Expand Down
31 changes: 28 additions & 3 deletions fe/fe-core/src/main/java/com/starrocks/catalog/Partition.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,11 @@
import com.starrocks.catalog.MaterializedIndex.IndexExtState;
import com.starrocks.catalog.MaterializedIndex.IndexState;
import com.starrocks.common.FeConstants;
import com.starrocks.common.io.Writable;
import com.starrocks.persist.gson.GsonPostProcessable;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
Expand All @@ -55,7 +56,7 @@
/**
* Internal representation of partition-related metadata.
*/
public class Partition extends MetaObject implements PhysicalPartition, Writable {
public class Partition extends MetaObject implements PhysicalPartition, GsonPostProcessable {
private static final Logger LOG = LogManager.getLogger(Partition.class);

public static final long PARTITION_INIT_VERSION = 1L;
Expand All @@ -79,6 +80,7 @@ public enum PartitionState {
private PartitionState state;
@SerializedName(value = "idToSubPartition")
private Map<Long, PhysicalPartitionImpl> idToSubPartition = Maps.newHashMap();
private Map<String, PhysicalPartitionImpl> nameToSubPartition = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);

@SerializedName(value = "distributionInfo")
private DistributionInfo distributionInfo;
Expand Down Expand Up @@ -170,6 +172,7 @@ public Partition shallowCopy() {
partition.distributionInfo = this.distributionInfo;
partition.shardGroupId = this.shardGroupId;
partition.idToSubPartition = Maps.newHashMap(this.idToSubPartition);
partition.nameToSubPartition = Maps.newHashMap(this.nameToSubPartition);
return partition;
}

Expand Down Expand Up @@ -201,11 +204,15 @@ public boolean isImmutable() {
public void addSubPartition(PhysicalPartition subPartition) {
if (subPartition instanceof PhysicalPartitionImpl) {
idToSubPartition.put(subPartition.getId(), (PhysicalPartitionImpl) subPartition);
nameToSubPartition.put(subPartition.getName(), (PhysicalPartitionImpl) subPartition);
}
}

public void removeSubPartition(long id) {
idToSubPartition.remove(id);
PhysicalPartitionImpl subPartition = idToSubPartition.remove(id);
if (subPartition != null) {
nameToSubPartition.remove(subPartition.getName());
}
}

public Collection<PhysicalPartition> getSubPartitions() {
Expand All @@ -218,6 +225,10 @@ public PhysicalPartition getSubPartition(long id) {
return this.id == id ? this : idToSubPartition.get(id);
}

public PhysicalPartition getSubPartition(String name) {
return this.name.equals(name) ? this : nameToSubPartition.get(name);
}

public long getParentId() {
return this.id;
}
Expand Down Expand Up @@ -541,4 +552,18 @@ public long getMinRetainVersion() {
public void setMinRetainVersion(long minRetainVersion) {
this.minRetainVersion = minRetainVersion;
}

public String generatePhysicalPartitionName(long physicalParitionId) {
return this.name + '_' + physicalParitionId;
}

@Override
public void gsonPostProcess() throws IOException {
for (PhysicalPartitionImpl subPartition : idToSubPartition.values()) {
if (subPartition.getName() == null) {
subPartition.setName(generatePhysicalPartitionName(subPartition.getId()));
}
nameToSubPartition.put(subPartition.getName(), subPartition);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ public interface PhysicalPartition {

// physical partition id
public long getId();
public String getName();
public void setName(String name);
public void setIdForRestore(long id);
public long getBeforeRestoreId();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ public class PhysicalPartitionImpl extends MetaObject implements PhysicalPartiti
@SerializedName(value = "id")
private long id;

@SerializedName(value = "name")
private String name;

private long beforeRestoreId;

@SerializedName(value = "parentId")
Expand Down Expand Up @@ -92,8 +95,9 @@ public class PhysicalPartitionImpl extends MetaObject implements PhysicalPartiti

private volatile long minRetainVersion = 0;

public PhysicalPartitionImpl(long id, long parentId, long sharedGroupId, MaterializedIndex baseIndex) {
public PhysicalPartitionImpl(long id, String name, long parentId, long sharedGroupId, MaterializedIndex baseIndex) {
this.id = id;
this.name = name;
this.parentId = parentId;
this.baseIndex = baseIndex;
this.visibleVersion = PARTITION_INIT_VERSION;
Expand All @@ -107,6 +111,16 @@ public long getId() {
return this.id;
}

@Override
public String getName() {
return this.name;
}

@Override
public void setName(String name) {
this.name = name;
}

@Override
public void setIdForRestore(long id) {
this.beforeRestoreId = this.id;
Expand Down Expand Up @@ -402,6 +416,7 @@ public boolean equals(Object obj) {
public String toString() {
StringBuilder buffer = new StringBuilder();
buffer.append("partitionId: ").append(id).append("; ");
buffer.append("partitionName: ").append(name).append("; ");
buffer.append("parentPartitionId: ").append(parentId).append("; ");
buffer.append("shardGroupId: ").append(shardGroupId).append("; ");
buffer.append("isImmutable: ").append(isImmutable()).append("; ");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import com.starrocks.catalog.Database;
import com.starrocks.catalog.MaterializedIndex;
import com.starrocks.catalog.OlapTable;
import com.starrocks.catalog.Partition;
import com.starrocks.catalog.PhysicalPartition;
import com.starrocks.catalog.Replica;
import com.starrocks.catalog.Table;
import com.starrocks.catalog.Table.TableType;
Expand Down Expand Up @@ -585,7 +585,7 @@ private static TableInfo initTableInfo(TTableReplicationRequest request) throws
tableDataSize = olapTable.getDataSize();

for (TPartitionReplicationInfo tPartitionInfo : request.partition_replication_infos.values()) {
Partition partition = olapTable.getPartition(tPartitionInfo.partition_id);
PhysicalPartition partition = olapTable.getPhysicalPartition(tPartitionInfo.partition_id);
if (partition == null) {
throw new MetaNotFoundException("Partition " + tPartitionInfo.partition_id + " in table "
+ table.getName() + " in database " + db.getFullName() + " not found");
Expand All @@ -611,12 +611,12 @@ private static TableInfo initTableInfo(TTableReplicationRequest request) throws
}

private static PartitionInfo initPartitionInfo(OlapTable olapTable, TPartitionReplicationInfo tPartitionInfo,
Partition partition) throws MetaNotFoundException {
PhysicalPartition partition) throws MetaNotFoundException {
Map<Long, IndexInfo> indexInfos = Maps.newHashMap();
for (TIndexReplicationInfo tIndexInfo : tPartitionInfo.index_replication_infos.values()) {
MaterializedIndex index = partition.getIndex(tIndexInfo.index_id);
if (index == null) {
throw new MetaNotFoundException("Index " + tIndexInfo.index_id + " in partition " + partition.getName()
throw new MetaNotFoundException("Index " + tIndexInfo.index_id + " in partition " + partition.getId()
+ " in table " + olapTable.getName() + " not found");
}
IndexInfo indexInfo = initIndexInfo(olapTable, tIndexInfo, index);
Expand Down Expand Up @@ -677,8 +677,8 @@ private static TableInfo initTableInfo(OlapTable table, OlapTable srcTable,
private static Map<Long, PartitionInfo> initPartitionInfos(OlapTable table, OlapTable srcTable,
SystemInfoService srcSystemInfoService) {
Map<Long, PartitionInfo> partitionInfos = Maps.newHashMap();
for (Partition partition : table.getPartitions()) {
Partition srcPartition = srcTable.getPartition(partition.getName());
for (PhysicalPartition partition : table.getPhysicalPartitions()) {
PhysicalPartition srcPartition = srcTable.getPhysicalPartition(partition.getName());
Preconditions.checkState(partition.getCommittedVersion() == partition.getVisibleVersion(),
"Partition " + partition.getName() + " in table " + table.getName()
+ " publish version not finished");
Expand All @@ -695,8 +695,8 @@ private static Map<Long, PartitionInfo> initPartitionInfos(OlapTable table, Olap
return partitionInfos;
}

private static PartitionInfo initPartitionInfo(OlapTable table, OlapTable srcTable, Partition partition,
Partition srcPartition, SystemInfoService srcSystemInfoService) {
private static PartitionInfo initPartitionInfo(OlapTable table, OlapTable srcTable, PhysicalPartition partition,
PhysicalPartition srcPartition, SystemInfoService srcSystemInfoService) {
Map<Long, IndexInfo> indexInfos = Maps.newHashMap();
for (Map.Entry<String, Long> indexNameToId : table.getIndexNameToId().entrySet()) {
long indexId = indexNameToId.getValue();
Expand Down
Loading

0 comments on commit 08b9092

Please sign in to comment.