From 544aa73c767d135140132152de699386fe939d97 Mon Sep 17 00:00:00 2001 From: w41ter Date: Wed, 27 Nov 2024 09:03:48 +0000 Subject: [PATCH] [feat](binlog) Support drop rollup binlog --- .../doris/alter/MaterializedViewHandler.java | 16 +++++------ .../apache/doris/binlog/BinlogManager.java | 12 +++++++++ .../org/apache/doris/binlog/DBBinlog.java | 9 +++++++ .../doris/datasource/InternalCatalog.java | 4 +-- .../apache/doris/persist/BatchDropInfo.java | 12 +++++++-- .../org/apache/doris/persist/DropInfo.java | 15 +++++++++-- .../org/apache/doris/persist/EditLog.java | 27 ++++++++++++++----- .../doris/persist/DropAndRecoverInfoTest.java | 10 +++---- gensrc/thrift/FrontendService.thrift | 4 +-- 9 files changed, 80 insertions(+), 29 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java index d02e91a379f5600..a6f1cae99876782 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java @@ -944,14 +944,12 @@ public void processBatchDropRollup(List dropRollupClauses, Database } // drop data in memory - Set indexIdSet = new HashSet<>(); - Set rollupNameSet = new HashSet<>(); + Map rollupNameMap = new HashMap<>(); for (AlterClause alterClause : dropRollupClauses) { DropRollupClause dropRollupClause = (DropRollupClause) alterClause; String rollupIndexName = dropRollupClause.getRollupName(); long rollupIndexId = dropMaterializedView(rollupIndexName, olapTable); - indexIdSet.add(rollupIndexId); - rollupNameSet.add(rollupIndexName); + rollupNameMap.put(rollupIndexId, rollupIndexName); } // batch log drop rollup operation @@ -959,10 +957,10 @@ public void processBatchDropRollup(List dropRollupClauses, Database long dbId = db.getId(); long tableId = olapTable.getId(); String tableName = olapTable.getName(); - editLog.logBatchDropRollup(new BatchDropInfo(dbId, tableId, tableName, indexIdSet)); - deleteIndexList = indexIdSet.stream().collect(Collectors.toList()); + editLog.logBatchDropRollup(new BatchDropInfo(dbId, tableId, tableName, rollupNameMap)); + deleteIndexList = rollupNameMap.keySet().stream().collect(Collectors.toList()); LOG.info("finished drop rollup index[{}] in table[{}]", - String.join("", rollupNameSet), olapTable.getName()); + String.join("", rollupNameMap.values()), olapTable.getName()); } finally { olapTable.writeUnlock(); } @@ -982,8 +980,8 @@ public void processDropMaterializedView(DropMaterializedViewStmt dropMaterialize long mvIndexId = dropMaterializedView(mvName, olapTable); // Step3: log drop mv operation EditLog editLog = Env.getCurrentEnv().getEditLog(); - editLog.logDropRollup( - new DropInfo(db.getId(), olapTable.getId(), olapTable.getName(), mvIndexId, false, false, 0)); + editLog.logDropRollup(new DropInfo(db.getId(), olapTable.getId(), olapTable.getName(), + mvIndexId, mvName, false, false, 0)); deleteIndexList.add(mvIndexId); LOG.info("finished drop materialized view [{}] in table [{}]", mvName, olapTable.getName()); } catch (MetaNotFoundException e) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java index 3a033c981038c29..c22c5b12e04e941 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java @@ -30,6 +30,7 @@ import org.apache.doris.persist.BarrierLog; import org.apache.doris.persist.BatchModifyPartitionsInfo; import org.apache.doris.persist.BinlogGcInfo; +import org.apache.doris.persist.DropInfo; import org.apache.doris.persist.DropPartitionInfo; import org.apache.doris.persist.ModifyCommentOperationLog; import org.apache.doris.persist.ModifyTablePropertyOperationLog; @@ -429,6 +430,17 @@ public void addIndexChangeJob(IndexChangeJob indexChangeJob, long commitSeq) { addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false, indexChangeJob); } + public void addDropRollup(DropInfo info, long commitSeq) { + long dbId = info.getDbId(); + List tableIds = Lists.newArrayList(); + tableIds.add(info.getTableId()); + long timestamp = -1; + TBinlogType type = TBinlogType.DROP_ROLLUP; + String data = info.toJson(); + + addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false, info); + } + // get binlog by dbId, return first binlog.version > version public Pair getBinlog(long dbId, long tableId, long prevCommitSeq) { TStatus status = new TStatus(TStatusCode.OK); diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java index c96e994be91c3ad..b78ed389a0fe864 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java @@ -23,6 +23,7 @@ import org.apache.doris.common.Pair; import org.apache.doris.common.proc.BaseProcResult; import org.apache.doris.persist.BarrierLog; +import org.apache.doris.persist.DropInfo; import org.apache.doris.persist.DropPartitionInfo; import org.apache.doris.persist.ReplaceTableOperationLog; import org.apache.doris.thrift.TBinlog; @@ -649,6 +650,9 @@ private void recordDroppedResources(TBinlogType binlogType, long commitSeq, Stri case REPLACE_TABLE: raw = ReplaceTableOperationLog.fromJson(data); break; + case DROP_ROLLUP: + raw = DropInfo.fromJson(data); + break; case BARRIER: raw = BarrierLog.fromJson(data); break; @@ -693,6 +697,11 @@ private void recordDroppedResources(TBinlogType binlogType, long commitSeq, Obje if (!record.isSwapTable()) { droppedTables.add(Pair.of(record.getOrigTblId(), commitSeq)); } + } else if (binlogType == TBinlogType.DROP_ROLLUP && raw instanceof DropInfo) { + long indexId = ((DropInfo) raw).getIndexId(); + if (indexId > 0) { + droppedIndexes.add(Pair.of(indexId, commitSeq)); + } } else if (binlogType == TBinlogType.BARRIER && raw instanceof BarrierLog) { BarrierLog log = (BarrierLog) raw; // keep compatible with doris 2.0/2.1 diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java index 3a9e96bade67f53..43150a647ca3c75 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java @@ -1006,7 +1006,7 @@ private void dropTableInternal(Database db, Table table, boolean isView, boolean Env.getCurrentEnv().getQueryStats().clear(Env.getCurrentEnv().getCurrentCatalog().getId(), db.getId(), table.getId()); - DropInfo info = new DropInfo(db.getId(), table.getId(), tableName, -1L, isView, forceDrop, recycleTime); + DropInfo info = new DropInfo(db.getId(), table.getId(), tableName, isView, forceDrop, recycleTime); Env.getCurrentEnv().getEditLog().logDropTable(info); Env.getCurrentEnv().getMtmvService().dropTable(table); } @@ -3250,7 +3250,7 @@ private boolean createOlapTable(Database db, CreateTableStmt stmt) throws UserEx try { dropTable(db, tableId, true, false, 0L); if (hadLogEditCreateTable) { - DropInfo info = new DropInfo(db.getId(), tableId, olapTable.getName(), -1L, false, true, 0L); + DropInfo info = new DropInfo(db.getId(), tableId, olapTable.getName(), false, true, 0L); Env.getCurrentEnv().getEditLog().logDropTable(info); } } catch (Exception ex) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/BatchDropInfo.java b/fe/fe-core/src/main/java/org/apache/doris/persist/BatchDropInfo.java index fdfc44e27bbebc5..260ad316d3cc246 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/BatchDropInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/BatchDropInfo.java @@ -26,6 +26,7 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.util.Map; import java.util.Objects; import java.util.Set; @@ -43,12 +44,15 @@ public class BatchDropInfo implements Writable { private String tableName; // not used in equals and hashCode @SerializedName(value = "indexIdSet") private Set indexIdSet; + @SerializedName(value = "indexNameMap") + private Map indexNameMap; // not used in equals and hashCode - public BatchDropInfo(long dbId, long tableId, String tableName, Set indexIdSet) { + public BatchDropInfo(long dbId, long tableId, String tableName, Map indexNameMap) { this.dbId = dbId; this.tableId = tableId; this.tableName = tableName; - this.indexIdSet = indexIdSet; + this.indexIdSet = indexNameMap.keySet(); + this.indexNameMap = indexNameMap; } @Override @@ -82,6 +86,10 @@ public Set getIndexIdSet() { return indexIdSet; } + public Map getIndexNameMap() { + return indexNameMap; + } + public long getDbId() { return dbId; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/DropInfo.java b/fe/fe-core/src/main/java/org/apache/doris/persist/DropInfo.java index 461f3ddd67d5a7c..f2cdfba4c4b2249 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/DropInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/DropInfo.java @@ -38,6 +38,8 @@ public class DropInfo implements Writable { private String tableName; // not used in equals and hashCode @SerializedName(value = "indexId") private long indexId; + @SerializedName(value = "indexName") + private long indexName; // not used in equals and hashCode @SerializedName(value = "isView") private boolean isView = false; @SerializedName(value = "forceDrop") @@ -48,8 +50,13 @@ public class DropInfo implements Writable { public DropInfo() { } - public DropInfo(long dbId, long tableId, String tableName, long indexId, boolean isView, boolean forceDrop, - long recycleTime) { + public DropInfo(long dbId, long tableId, String tableName, boolean isView, boolean forceDrop, + long recycleTime) { + this(dbId, tableId, tableName, -1, "", isView, forceDrop, recycleTime); + } + + public DropInfo(long dbId, long tableId, String tableName, long indexId, String indexName, boolean isView, + boolean forceDrop, long recycleTime) { this.dbId = dbId; this.tableId = tableId; this.tableName = tableName; @@ -133,4 +140,8 @@ public boolean equals(Object obj) { public String toJson() { return GsonUtils.GSON.toJson(this); } + + public static DropInfo fromJson(String json) { + return GsonUtils.GSON.fromJson(json, DropInfo.class); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java index 1e70eb634b2073c..7d1f2127eecaafc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java @@ -105,6 +105,7 @@ import java.io.IOException; import java.util.List; +import java.util.Map; /** * EditLog maintains a log of the memory modifications. @@ -341,15 +342,18 @@ public static void loadJournal(Env env, Long logId, JournalEntity journal) { case OperationType.OP_DROP_ROLLUP: { DropInfo info = (DropInfo) journal.getData(); env.getMaterializedViewHandler().replayDropRollup(info, env); + env.getBinlogManager().addDropRollup(info, logId); break; } case OperationType.OP_BATCH_DROP_ROLLUP: { BatchDropInfo batchDropInfo = (BatchDropInfo) journal.getData(); - for (long indexId : batchDropInfo.getIndexIdSet()) { - env.getMaterializedViewHandler().replayDropRollup( - new DropInfo(batchDropInfo.getDbId(), batchDropInfo.getTableId(), - batchDropInfo.getTableName(), indexId, false, false, 0), - env); + for (Map.Entry entry : batchDropInfo.getIndexNameMap().entrySet()) { + long indexId = entry.getKey(); + String indexName = entry.getValue(); + DropInfo info = new DropInfo(batchDropInfo.getDbId(), batchDropInfo.getTableId(), + batchDropInfo.getTableName(), indexId, indexName, false, false, 0); + env.getMaterializedViewHandler().replayDropRollup(info, env); + env.getBinlogManager().addDropRollup(info, logId); } break; } @@ -1463,11 +1467,20 @@ public void logRecoverTable(RecoverInfo info) { } public void logDropRollup(DropInfo info) { - logEdit(OperationType.OP_DROP_ROLLUP, info); + long logId = logEdit(OperationType.OP_DROP_ROLLUP, info); + Env.getCurrentEnv().getBinlogManager().addDropRollup(info, logId); } public void logBatchDropRollup(BatchDropInfo batchDropInfo) { - logEdit(OperationType.OP_BATCH_DROP_ROLLUP, batchDropInfo); + long logId = logEdit(OperationType.OP_BATCH_DROP_ROLLUP, batchDropInfo); + for (Map.Entry entry : batchDropInfo.getIndexNameMap().entrySet()) { + DropInfo info = new DropInfo(batchDropInfo.getDbId(), + batchDropInfo.getTableId(), + batchDropInfo.getTableName(), + entry.getKey(), entry.getValue(), + false, true, 0); + Env.getCurrentEnv().getBinlogManager().addDropRollup(info, logId); + } } public void logFinishConsistencyCheck(ConsistencyCheckInfo info) { diff --git a/fe/fe-core/src/test/java/org/apache/doris/persist/DropAndRecoverInfoTest.java b/fe/fe-core/src/test/java/org/apache/doris/persist/DropAndRecoverInfoTest.java index 88aa22ded22e5e0..8c74fba275352d7 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/persist/DropAndRecoverInfoTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/persist/DropAndRecoverInfoTest.java @@ -44,7 +44,7 @@ public void testDropInfoSerialization() throws Exception { DropInfo info1 = new DropInfo(); info1.write(dos); - DropInfo info2 = new DropInfo(1, 2, "t2", -1, false, true, 0); + DropInfo info2 = new DropInfo(1, 2, "t2", -1, "", false, true, 0); info2.write(dos); dos.flush(); @@ -65,10 +65,10 @@ public void testDropInfoSerialization() throws Exception { Assert.assertEquals(rInfo2, rInfo2); Assert.assertNotEquals(rInfo2, this); - Assert.assertNotEquals(info2, new DropInfo(0, 2, "t2", -1L, false, true, 0)); - Assert.assertNotEquals(info2, new DropInfo(1, 0, "t0", -1L, false, true, 0)); - Assert.assertNotEquals(info2, new DropInfo(1, 2, "t2", -1L, false, false, 0)); - Assert.assertEquals(info2, new DropInfo(1, 2, "t2", -1L, false, true, 0)); + Assert.assertNotEquals(info2, new DropInfo(0, 2, "t2", -1L, "", false, true, 0)); + Assert.assertNotEquals(info2, new DropInfo(1, 0, "t0", -1L, "", false, true, 0)); + Assert.assertNotEquals(info2, new DropInfo(1, 2, "t2", -1L, "", false, false, 0)); + Assert.assertEquals(info2, new DropInfo(1, 2, "t2", -1L, "", false, true, 0)); // 3. delete files dis.close(); diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index fc7f98e16e3f746..e2af8937425d0c7 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -1196,6 +1196,7 @@ enum TBinlogType { INDEX_CHANGE_JOB = 20, RENAME_ROLLUP = 21, RENAME_PARTITION = 22, + DROP_ROLLUP = 23, // Keep some IDs for allocation so that when new binlog types are added in the // future, the changes can be picked back to the old versions without breaking @@ -1212,8 +1213,7 @@ enum TBinlogType { // MODIFY_XXX = 17, // MIN_UNKNOWN = 18, // UNKNOWN_3 = 19, - MIN_UNKNOWN = 23, - UNKNOWN_8 = 24, + MIN_UNKNOWN = 24, UNKNOWN_9 = 25, UNKNOWN_10 = 26, UNKNOWN_11 = 27,