Skip to content

Commit

Permalink
[feat](binlog) Support drop rollup binlog
Browse files Browse the repository at this point in the history
  • Loading branch information
w41ter committed Nov 27, 2024
1 parent 805520b commit 0ea9a11
Show file tree
Hide file tree
Showing 9 changed files with 89 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -944,25 +944,23 @@ public void processBatchDropRollup(List<AlterClause> dropRollupClauses, Database
}

// drop data in memory
Set<Long> indexIdSet = new HashSet<>();
Set<String> rollupNameSet = new HashSet<>();
Map<Long, String> rollupNameMap = new HashMap<>();
for (AlterClause alterClause : dropRollupClauses) {
DropRollupClause dropRollupClause = (DropRollupClause) alterClause;
String rollupIndexName = dropRollupClause.getRollupName();
long rollupIndexId = dropMaterializedView(rollupIndexName, olapTable);
indexIdSet.add(rollupIndexId);
rollupNameSet.add(rollupIndexName);
rollupNameMap.put(rollupIndexId, rollupIndexName);
}

// batch log drop rollup operation
EditLog editLog = Env.getCurrentEnv().getEditLog();
long dbId = db.getId();
long tableId = olapTable.getId();
String tableName = olapTable.getName();
editLog.logBatchDropRollup(new BatchDropInfo(dbId, tableId, tableName, indexIdSet));
deleteIndexList = indexIdSet.stream().collect(Collectors.toList());
editLog.logBatchDropRollup(new BatchDropInfo(dbId, tableId, tableName, rollupNameMap));
deleteIndexList = rollupNameMap.keySet().stream().collect(Collectors.toList());
LOG.info("finished drop rollup index[{}] in table[{}]",
String.join("", rollupNameSet), olapTable.getName());
String.join("", rollupNameMap.values()), olapTable.getName());
} finally {
olapTable.writeUnlock();
}
Expand All @@ -982,8 +980,8 @@ public void processDropMaterializedView(DropMaterializedViewStmt dropMaterialize
long mvIndexId = dropMaterializedView(mvName, olapTable);
// Step3: log drop mv operation
EditLog editLog = Env.getCurrentEnv().getEditLog();
editLog.logDropRollup(
new DropInfo(db.getId(), olapTable.getId(), olapTable.getName(), mvIndexId, false, false, 0));
editLog.logDropRollup(new DropInfo(db.getId(), olapTable.getId(), olapTable.getName(),
mvIndexId, mvName, false, false, 0));
deleteIndexList.add(mvIndexId);
LOG.info("finished drop materialized view [{}] in table [{}]", mvName, olapTable.getName());
} catch (MetaNotFoundException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.doris.persist.BarrierLog;
import org.apache.doris.persist.BatchModifyPartitionsInfo;
import org.apache.doris.persist.BinlogGcInfo;
import org.apache.doris.persist.DropInfo;
import org.apache.doris.persist.DropPartitionInfo;
import org.apache.doris.persist.ModifyCommentOperationLog;
import org.apache.doris.persist.ModifyTablePropertyOperationLog;
Expand Down Expand Up @@ -429,6 +430,22 @@ public void addIndexChangeJob(IndexChangeJob indexChangeJob, long commitSeq) {
addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false, indexChangeJob);
}

public void addDropRollup(DropInfo info, long commitSeq) {
if (StringUtils.isEmpty(info.getIndexName())) {
LOG.warning("skip drop rollup binlog, because indexName is empty. info: {}", info);
return;
}

long dbId = info.getDbId();
List<Long> tableIds = Lists.newArrayList();
tableIds.add(info.getTableId());
long timestamp = -1;
TBinlogType type = TBinlogType.DROP_ROLLUP;
String data = info.toJson();

addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false, info);
}

// get binlog by dbId, return first binlog.version > version
public Pair<TStatus, TBinlog> getBinlog(long dbId, long tableId, long prevCommitSeq) {
TStatus status = new TStatus(TStatusCode.OK);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.doris.common.Pair;
import org.apache.doris.common.proc.BaseProcResult;
import org.apache.doris.persist.BarrierLog;
import org.apache.doris.persist.DropInfo;
import org.apache.doris.persist.DropPartitionInfo;
import org.apache.doris.persist.ReplaceTableOperationLog;
import org.apache.doris.thrift.TBinlog;
Expand Down Expand Up @@ -649,6 +650,9 @@ private void recordDroppedResources(TBinlogType binlogType, long commitSeq, Stri
case REPLACE_TABLE:
raw = ReplaceTableOperationLog.fromJson(data);
break;
case DROP_ROLLUP:
raw = DropInfo.fromJson(data);
break;
case BARRIER:
raw = BarrierLog.fromJson(data);
break;
Expand Down Expand Up @@ -693,6 +697,11 @@ private void recordDroppedResources(TBinlogType binlogType, long commitSeq, Obje
if (!record.isSwapTable()) {
droppedTables.add(Pair.of(record.getOrigTblId(), commitSeq));
}
} else if (binlogType == TBinlogType.DROP_ROLLUP && raw instanceof DropInfo) {
long indexId = ((DropInfo) raw).getIndexId();
if (indexId > 0) {
droppedIndexes.add(Pair.of(indexId, commitSeq));
}
} else if (binlogType == TBinlogType.BARRIER && raw instanceof BarrierLog) {
BarrierLog log = (BarrierLog) raw;
// keep compatible with doris 2.0/2.1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1006,7 +1006,7 @@ private void dropTableInternal(Database db, Table table, boolean isView, boolean

Env.getCurrentEnv().getQueryStats().clear(Env.getCurrentEnv().getCurrentCatalog().getId(),
db.getId(), table.getId());
DropInfo info = new DropInfo(db.getId(), table.getId(), tableName, -1L, isView, forceDrop, recycleTime);
DropInfo info = new DropInfo(db.getId(), table.getId(), tableName, isView, forceDrop, recycleTime);
Env.getCurrentEnv().getEditLog().logDropTable(info);
Env.getCurrentEnv().getMtmvService().dropTable(table);
}
Expand Down Expand Up @@ -3250,7 +3250,7 @@ private boolean createOlapTable(Database db, CreateTableStmt stmt) throws UserEx
try {
dropTable(db, tableId, true, false, 0L);
if (hadLogEditCreateTable) {
DropInfo info = new DropInfo(db.getId(), tableId, olapTable.getName(), -1L, false, true, 0L);
DropInfo info = new DropInfo(db.getId(), tableId, olapTable.getName(), false, true, 0L);
Env.getCurrentEnv().getEditLog().logDropTable(info);
}
} catch (Exception ex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

Expand All @@ -43,12 +44,15 @@ public class BatchDropInfo implements Writable {
private String tableName; // not used in equals and hashCode
@SerializedName(value = "indexIdSet")
private Set<Long> indexIdSet;
@SerializedName(value = "indexNameMap")
private Map<Long, String> indexNameMap; // not used in equals and hashCode

public BatchDropInfo(long dbId, long tableId, String tableName, Set<Long> indexIdSet) {
public BatchDropInfo(long dbId, long tableId, String tableName, Map<Long, String> indexNameMap) {
this.dbId = dbId;
this.tableId = tableId;
this.tableName = tableName;
this.indexIdSet = indexIdSet;
this.indexIdSet = indexNameMap.keySet();
this.indexNameMap = indexNameMap;
}

@Override
Expand Down Expand Up @@ -82,6 +86,10 @@ public Set<Long> getIndexIdSet() {
return indexIdSet;
}

public Map<Long, String> getIndexNameMap() {
return indexNameMap;
}

public long getDbId() {
return dbId;
}
Expand Down
19 changes: 17 additions & 2 deletions fe/fe-core/src/main/java/org/apache/doris/persist/DropInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ public class DropInfo implements Writable {
private String tableName; // not used in equals and hashCode
@SerializedName(value = "indexId")
private long indexId;
@SerializedName(value = "indexName")
private long indexName; // not used in equals and hashCode
@SerializedName(value = "isView")
private boolean isView = false;
@SerializedName(value = "forceDrop")
Expand All @@ -48,8 +50,13 @@ public class DropInfo implements Writable {
public DropInfo() {
}

public DropInfo(long dbId, long tableId, String tableName, long indexId, boolean isView, boolean forceDrop,
long recycleTime) {
public DropInfo(long dbId, long tableId, String tableName, boolean isView, boolean forceDrop,
long recycleTime) {
this(dbId, tableId, tableName, -1, "", isView, forceDrop, recycleTime);
}

public DropInfo(long dbId, long tableId, String tableName, long indexId, String indexName, boolean isView,
boolean forceDrop, long recycleTime) {
this.dbId = dbId;
this.tableId = tableId;
this.tableName = tableName;
Expand All @@ -75,6 +82,10 @@ public long getIndexId() {
return this.indexId;
}

public String getIndexName() {
return this.indexName;
}

public boolean isView() {
return this.isView;
}
Expand Down Expand Up @@ -133,4 +144,8 @@ public boolean equals(Object obj) {
public String toJson() {
return GsonUtils.GSON.toJson(this);
}

public static DropInfo fromJson(String json) {
return GsonUtils.GSON.fromJson(json, DropInfo.class);
}
}
27 changes: 20 additions & 7 deletions fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@

import java.io.IOException;
import java.util.List;
import java.util.Map;

/**
* EditLog maintains a log of the memory modifications.
Expand Down Expand Up @@ -341,15 +342,18 @@ public static void loadJournal(Env env, Long logId, JournalEntity journal) {
case OperationType.OP_DROP_ROLLUP: {
DropInfo info = (DropInfo) journal.getData();
env.getMaterializedViewHandler().replayDropRollup(info, env);
env.getBinlogManager().addDropRollup(info, logId);
break;
}
case OperationType.OP_BATCH_DROP_ROLLUP: {
BatchDropInfo batchDropInfo = (BatchDropInfo) journal.getData();
for (long indexId : batchDropInfo.getIndexIdSet()) {
env.getMaterializedViewHandler().replayDropRollup(
new DropInfo(batchDropInfo.getDbId(), batchDropInfo.getTableId(),
batchDropInfo.getTableName(), indexId, false, false, 0),
env);
for (Map.Entry<Long, String> entry : batchDropInfo.getIndexNameMap().entrySet()) {
long indexId = entry.getKey();
String indexName = entry.getValue();
DropInfo info = new DropInfo(batchDropInfo.getDbId(), batchDropInfo.getTableId(),
batchDropInfo.getTableName(), indexId, indexName, false, false, 0);
env.getMaterializedViewHandler().replayDropRollup(info, env);
env.getBinlogManager().addDropRollup(info, logId);
}
break;
}
Expand Down Expand Up @@ -1463,11 +1467,20 @@ public void logRecoverTable(RecoverInfo info) {
}

public void logDropRollup(DropInfo info) {
logEdit(OperationType.OP_DROP_ROLLUP, info);
long logId = logEdit(OperationType.OP_DROP_ROLLUP, info);
Env.getCurrentEnv().getBinlogManager().addDropRollup(info, logId);
}

public void logBatchDropRollup(BatchDropInfo batchDropInfo) {
logEdit(OperationType.OP_BATCH_DROP_ROLLUP, batchDropInfo);
long logId = logEdit(OperationType.OP_BATCH_DROP_ROLLUP, batchDropInfo);
for (Map.Entry<Long, String> entry : batchDropInfo.getIndexNameMap().entrySet()) {
DropInfo info = new DropInfo(batchDropInfo.getDbId(),
batchDropInfo.getTableId(),
batchDropInfo.getTableName(),
entry.getKey(), entry.getValue(),
false, true, 0);
Env.getCurrentEnv().getBinlogManager().addDropRollup(info, logId);
}
}

public void logFinishConsistencyCheck(ConsistencyCheckInfo info) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public void testDropInfoSerialization() throws Exception {
DropInfo info1 = new DropInfo();
info1.write(dos);

DropInfo info2 = new DropInfo(1, 2, "t2", -1, false, true, 0);
DropInfo info2 = new DropInfo(1, 2, "t2", -1, "", false, true, 0);
info2.write(dos);

dos.flush();
Expand All @@ -65,10 +65,10 @@ public void testDropInfoSerialization() throws Exception {

Assert.assertEquals(rInfo2, rInfo2);
Assert.assertNotEquals(rInfo2, this);
Assert.assertNotEquals(info2, new DropInfo(0, 2, "t2", -1L, false, true, 0));
Assert.assertNotEquals(info2, new DropInfo(1, 0, "t0", -1L, false, true, 0));
Assert.assertNotEquals(info2, new DropInfo(1, 2, "t2", -1L, false, false, 0));
Assert.assertEquals(info2, new DropInfo(1, 2, "t2", -1L, false, true, 0));
Assert.assertNotEquals(info2, new DropInfo(0, 2, "t2", -1L, "", false, true, 0));
Assert.assertNotEquals(info2, new DropInfo(1, 0, "t0", -1L, "", false, true, 0));
Assert.assertNotEquals(info2, new DropInfo(1, 2, "t2", -1L, "", false, false, 0));
Assert.assertEquals(info2, new DropInfo(1, 2, "t2", -1L, "", false, true, 0));

// 3. delete files
dis.close();
Expand Down
4 changes: 2 additions & 2 deletions gensrc/thrift/FrontendService.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -1196,6 +1196,7 @@ enum TBinlogType {
INDEX_CHANGE_JOB = 20,
RENAME_ROLLUP = 21,
RENAME_PARTITION = 22,
DROP_ROLLUP = 23,

// Keep some IDs for allocation so that when new binlog types are added in the
// future, the changes can be picked back to the old versions without breaking
Expand All @@ -1212,8 +1213,7 @@ enum TBinlogType {
// MODIFY_XXX = 17,
// MIN_UNKNOWN = 18,
// UNKNOWN_3 = 19,
MIN_UNKNOWN = 23,
UNKNOWN_8 = 24,
MIN_UNKNOWN = 24,
UNKNOWN_9 = 25,
UNKNOWN_10 = 26,
UNKNOWN_11 = 27,
Expand Down

0 comments on commit 0ea9a11

Please sign in to comment.