Skip to content

Commit

Permalink
[BugFix] Fix dynamic partition table unexpectly stop scheduling (#45235)
Browse files Browse the repository at this point in the history
Signed-off-by: Dejun Xia <[email protected]>
(cherry picked from commit d97d273)

# Conflicts:
#	fe/fe-core/src/main/java/com/starrocks/catalog/DynamicPartitionProperty.java
#	fe/fe-core/src/main/java/com/starrocks/catalog/OlapTable.java
#	fe/fe-core/src/main/java/com/starrocks/clone/DynamicPartitionScheduler.java
#	fe/fe-core/src/main/java/com/starrocks/common/util/DynamicPartitionUtil.java
#	fe/fe-core/src/main/java/com/starrocks/qe/ShowExecutor.java
  • Loading branch information
nshangyiming authored and mergify[bot] committed May 8, 2024
1 parent a5840ab commit 3fd7c72
Show file tree
Hide file tree
Showing 11 changed files with 730 additions and 35 deletions.
5 changes: 5 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/catalog/Database.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import com.starrocks.persist.CreateTableInfo;
import com.starrocks.persist.DropInfo;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.statistic.StatsConstants;
import com.starrocks.system.SystemInfoService;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand Down Expand Up @@ -863,6 +864,10 @@ public boolean isInfoSchemaDb() {
return fullQualifiedName.equalsIgnoreCase(InfoSchemaDb.DATABASE_NAME);
}

public boolean isStatisticsDatabase() {
return fullQualifiedName.equalsIgnoreCase(StatsConstants.STATISTICS_DB_NAME);
}

// the invoker should hold db's writeLock
public void setExist(boolean exist) {
this.exist = exist;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,9 @@ public class DynamicPartitionProperty {
public static final int NOT_SET_HISTORY_PARTITION_NUM = 0;
public static final String NOT_SET_PREFIX = "p";

private boolean exist;
private final boolean exists;

private boolean enable;
private boolean enabled;
private String timeUnit;
private int start;
private int end;
Expand All @@ -64,8 +64,8 @@ public class DynamicPartitionProperty {
private int historyPartitionNum;
public DynamicPartitionProperty(Map<String, String> properties) {
if (properties != null && !properties.isEmpty()) {
this.exist = true;
this.enable = Boolean.parseBoolean(properties.get(ENABLE));
this.exists = true;
this.enabled = Boolean.parseBoolean(properties.get(ENABLE));
this.timeUnit = properties.get(TIME_UNIT);
this.tz = TimeUtils.getOrSystemTimeZone(properties.get(TIME_ZONE));
// In order to compatible dynamic add partition version
Expand All @@ -79,7 +79,7 @@ public DynamicPartitionProperty(Map<String, String> properties) {
HISTORY_PARTITION_NUM, String.valueOf(NOT_SET_HISTORY_PARTITION_NUM)));
createStartOfs(properties);
} else {
this.exist = false;
this.exists = false;
}
}

Expand All @@ -99,8 +99,8 @@ private void createStartOfs(Map<String, String> properties) {
}
}

public boolean isExist() {
return exist;
public boolean isExists() {
return exists;
}

public String getTimeUnit() {
Expand All @@ -123,8 +123,8 @@ public int getBuckets() {
return buckets;
}

public boolean getEnable() {
return enable;
public boolean isEnabled() {
return enabled;
}

public StartOfDate getStartOfWeek() {
Expand Down Expand Up @@ -158,6 +158,7 @@ public int getHistoryPartitionNum() {
return historyPartitionNum;
}

<<<<<<< HEAD
public String getPropString() {
StringBuilder sb = new StringBuilder();
sb.append("{");
Expand All @@ -167,6 +168,16 @@ public String getPropString() {
sb.append(START + ":" + start + ",");
sb.append(END + ":" + end + ",");
sb.append(PREFIX + ":" + prefix + ",");
=======
public Map<String, String> getProperties() {
Map<String, String> properties = Maps.newHashMap();
properties.put(ENABLE, String.valueOf(enabled));
properties.put(TIME_UNIT, timeUnit);
properties.put(TIME_ZONE, tz.getID());
properties.put(START, String.valueOf(start));
properties.put(END, String.valueOf(end));
properties.put(PREFIX, prefix);
>>>>>>> d97d27382e ([BugFix] Fix dynamic partition table unexpectly stop scheduling (#45235))
if (buckets > 0) {
sb.append(BUCKETS + ":" + buckets + ",");
}
Expand All @@ -190,7 +201,7 @@ public void setTimeUnit(String timeUnit) {

@Override
public String toString() {
String res = ",\n\"" + ENABLE + "\" = \"" + enable + "\""
String res = ",\n\"" + ENABLE + "\" = \"" + enabled + "\""
+ ",\n\"" + TIME_UNIT + "\" = \"" + timeUnit + "\""
+ ",\n\"" + TIME_ZONE + "\" = \"" + tz.getID() + "\""
+ ",\n\"" + START + "\" = \"" + start + "\""
Expand Down
78 changes: 77 additions & 1 deletion fe/fe-core/src/main/java/com/starrocks/catalog/OlapTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ public TableProperty getTableProperty() {
public boolean dynamicPartitionExists() {
return tableProperty != null
&& tableProperty.getDynamicPartitionProperty() != null
&& tableProperty.getDynamicPartitionProperty().isExist();
&& tableProperty.getDynamicPartitionProperty().isExists();
}

public void setBaseIndexId(long baseIndexId) {
Expand Down Expand Up @@ -2150,6 +2150,82 @@ public void onCreate() {
}

@Override
<<<<<<< HEAD
=======
public void onCreate(Database db) {
super.onCreate(db);

ColocateTableIndex colocateTableIndex = GlobalStateMgr.getCurrentState().getColocateTableIndex();
if (colocateTableIndex.isColocateTable(getId())) {
ColocateTableIndex.GroupId groupId = colocateTableIndex.getGroup(getId());
List<List<Long>> backendsPerBucketSeq = colocateTableIndex.getBackendsPerBucketSeq(groupId);
ColocatePersistInfo colocatePersistInfo = ColocatePersistInfo.createForAddTable(groupId, getId(),
backendsPerBucketSeq);
GlobalStateMgr.getCurrentState().getEditLog().logColocateAddTable(colocatePersistInfo);
}

DynamicPartitionUtil.registerOrRemovePartitionScheduleInfo(db.getId(), this);

if (Config.dynamic_partition_enable && getTableProperty().getDynamicPartitionProperty().isEnabled()) {
new Thread(() -> {
try {
GlobalStateMgr.getCurrentState().getDynamicPartitionScheduler()
.executeDynamicPartitionForTable(db.getId(), getId());
} catch (Exception ex) {
LOG.warn("Some problems were encountered in the process of triggering " +
"the execution of dynamic partitioning", ex);
}
}, "BackgroundDynamicPartitionThread").start();
}

if (isTemporaryTable()) {
TemporaryTableMgr temporaryTableMgr = GlobalStateMgr.getCurrentState().getTemporaryTableMgr();
temporaryTableMgr.addTemporaryTable(sessionId, db.getId(), name, id);
LOG.debug("add temporary table, name[{}] id[{}] session[{}]", name, id, sessionId);
}
}

private void analyzePartitionInfo() {
if (!(partitionInfo instanceof ExpressionRangePartitionInfo)) {
return;
}
ExpressionRangePartitionInfo expressionRangePartitionInfo = (ExpressionRangePartitionInfo) partitionInfo;
// currently, automatic partition only supports one expression
Expr partitionExpr = expressionRangePartitionInfo.getPartitionExprs().get(0);
// for Partition slot ref, the SlotDescriptor is not serialized, so should
// recover it here.
// the SlotDescriptor is used by toThrift, which influences the execution
// process.
List<SlotRef> slotRefs = Lists.newArrayList();
partitionExpr.collect(SlotRef.class, slotRefs);
Preconditions.checkState(slotRefs.size() == 1);
// schema change should update slot id
for (int i = 0; i < fullSchema.size(); i++) {
Column column = fullSchema.get(i);
if (column.getName().equalsIgnoreCase(slotRefs.get(0).getColumnName())) {
SlotDescriptor slotDescriptor = new SlotDescriptor(new SlotId(i), column.getName(),
column.getType(), column.isAllowNull());
slotRefs.get(0).setDesc(slotDescriptor);
}
}
}

// Remove all Tablets belonging to this table from TabletInvertedIndex
public void removeTabletsFromInvertedIndex() {
TabletInvertedIndex invertedIndex = GlobalStateMgr.getCurrentState().getTabletInvertedIndex();
Collection<Partition> allPartitions = getAllPartitions();
for (Partition partition : allPartitions) {
for (MaterializedIndex index : partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL)) {
for (Tablet tablet : index.getTablets()) {
invertedIndex.deleteTablet(tablet.getId());
}
}
}
}

// If you are modifying this function, please check if you need to modify LakeTable.onDrop also.
@Override
>>>>>>> d97d27382e ([BugFix] Fix dynamic partition table unexpectly stop scheduling (#45235))
public void onDrop(Database db, boolean force, boolean replay) {
// drop all temp partitions of this table, so that there is no temp partitions in recycle bin,
// which make things easier.
Expand Down
Loading

0 comments on commit 3fd7c72

Please sign in to comment.