From 07d46492512d2a31036a41e05b5c583ef32a3483 Mon Sep 17 00:00:00 2001 From: Vallish Date: Sat, 30 Nov 2024 13:24:24 +0000 Subject: [PATCH] [feat](binlog) Add Support recover binlog --- .../apache/doris/binlog/BinlogManager.java | 27 +++++++++++++++ .../org/apache/doris/binlog/DBBinlog.java | 33 +++++++++++++------ .../doris/catalog/CatalogRecycleBin.java | 6 ++-- .../doris/datasource/InternalCatalog.java | 2 +- .../org/apache/doris/persist/EditLog.java | 8 +++-- .../org/apache/doris/persist/RecoverInfo.java | 22 +++++++++++-- .../doris/persist/DropAndRecoverInfoTest.java | 2 +- gensrc/thrift/FrontendService.thrift | 4 +-- 8 files changed, 84 insertions(+), 20 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java index 67bb99a8bcdc18..b9eb91cc5f7ea6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java @@ -34,6 +34,7 @@ import org.apache.doris.persist.DropPartitionInfo; import org.apache.doris.persist.ModifyCommentOperationLog; import org.apache.doris.persist.ModifyTablePropertyOperationLog; +import org.apache.doris.persist.RecoverInfo; import org.apache.doris.persist.ReplacePartitionOperationLog; import org.apache.doris.persist.ReplaceTableOperationLog; import org.apache.doris.persist.TableAddOrDropColumnsInfo; @@ -446,6 +447,32 @@ public void addDropRollup(DropInfo info, long commitSeq) { addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false, info); } + + private boolean supportedRecoverInfo(RecoverInfo info) { + //table name and partitionName added together. + // recover table case, tablename must exist in newer version + // recover partition case also table name must exist. + // so checking only table name here. + if (StringUtils.isEmpty(info.getTableName())) { + LOG.warn("skip recover info binlog, because tableName is empty. info: {}", info); + return false; + } + return true; + } + + public void addRecoverTableRecord(RecoverInfo info, long commitSeq) { + if (supportedRecoverInfo(info) == false) { + return; + } + long dbId = info.getDbId(); + List tableIds = Lists.newArrayList(); + tableIds.add(info.getTableId()); + long timestamp = -1; + TBinlogType type = TBinlogType.RECOVER_INFO; + String data = info.toJson(); + addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false, info); + } + // get binlog by dbId, return first binlog.version > version public Pair getBinlog(long dbId, long tableId, long prevCommitSeq) { TStatus status = new TStatus(TStatusCode.OK); diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java index b78ed389a0fe86..0816564f150be2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java @@ -25,6 +25,7 @@ import org.apache.doris.persist.BarrierLog; import org.apache.doris.persist.DropInfo; import org.apache.doris.persist.DropPartitionInfo; +import org.apache.doris.persist.RecoverInfo; import org.apache.doris.persist.ReplaceTableOperationLog; import org.apache.doris.thrift.TBinlog; import org.apache.doris.thrift.TBinlogType; @@ -124,7 +125,7 @@ public void recoverBinlog(TBinlog binlog, boolean dbBinlogEnable) { allBinlogs.add(binlog); binlogSize += BinlogUtils.getApproximateMemoryUsage(binlog); - recordDroppedResources(binlog); + recordDroppedOrRecoveredResources(binlog); if (tableIds == null) { return; @@ -178,7 +179,7 @@ public void addBinlog(TBinlog binlog, Object raw) { return; } - recordDroppedResources(binlog, raw); + recordDroppedOrRecoveredResources(binlog, raw); switch (binlog.getType()) { case CREATE_TABLE: @@ -623,16 +624,16 @@ public void getBinlogInfo(BaseProcResult result) { } } - private void recordDroppedResources(TBinlog binlog) { - recordDroppedResources(binlog, null); + private void recordDroppedOrRecoveredResources(TBinlog binlog) { + recordDroppedOrRecoveredResources(binlog, null); } // A method to record the dropped tables, indexes, and partitions. - private void recordDroppedResources(TBinlog binlog, Object raw) { - recordDroppedResources(binlog.getType(), binlog.getCommitSeq(), binlog.getData(), raw); + private void recordDroppedOrRecoveredResources(TBinlog binlog, Object raw) { + recordDroppedOrRecoveredResources(binlog.getType(), binlog.getCommitSeq(), binlog.getData(), raw); } - private void recordDroppedResources(TBinlogType binlogType, long commitSeq, String data, Object raw) { + private void recordDroppedOrRecoveredResources(TBinlogType binlogType, long commitSeq, String data, Object raw) { if (raw == null) { switch (binlogType) { case DROP_PARTITION: @@ -656,6 +657,9 @@ private void recordDroppedResources(TBinlogType binlogType, long commitSeq, Stri case BARRIER: raw = BarrierLog.fromJson(data); break; + case RECOVER_INFO: + raw = RecoverInfo.fromJson(data); + break; default: break; } @@ -664,10 +668,10 @@ private void recordDroppedResources(TBinlogType binlogType, long commitSeq, Stri } } - recordDroppedResources(binlogType, commitSeq, raw); + recordDroppedOrRecoveredResources(binlogType, commitSeq, raw); } - private void recordDroppedResources(TBinlogType binlogType, long commitSeq, Object raw) { + private void recordDroppedOrRecoveredResources(TBinlogType binlogType, long commitSeq, Object raw) { if (binlogType == TBinlogType.DROP_PARTITION && raw instanceof DropPartitionInfo) { long partitionId = ((DropPartitionInfo) raw).getPartitionId(); if (partitionId > 0) { @@ -706,7 +710,16 @@ private void recordDroppedResources(TBinlogType binlogType, long commitSeq, Obje BarrierLog log = (BarrierLog) raw; // keep compatible with doris 2.0/2.1 if (log.hasBinlog()) { - recordDroppedResources(log.getBinlogType(), commitSeq, log.getBinlog(), null); + recordDroppedOrRecoveredResources(log.getBinlogType(), commitSeq, log.getBinlog(), null); + } + } else if ((binlogType == TBinlogType.RECOVER_INFO) && (raw instanceof RecoverInfo)) { + RecoverInfo recoverInfo = (RecoverInfo) raw; + long partitionId = recoverInfo.getPartitionId(); + long tableId = recoverInfo.getTableId(); + if (partitionId > 0) { + droppedPartitions.removeIf(entry -> (entry.first == partitionId)); + } else if (tableId > 0) { + droppedTables.removeIf(entry -> (entry.first == tableId)); } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/CatalogRecycleBin.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/CatalogRecycleBin.java index 745c1c8a351686..b5899435343b13 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/CatalogRecycleBin.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/CatalogRecycleBin.java @@ -774,7 +774,8 @@ private synchronized boolean innerRecoverTable(Database db, Table table, String LOG.info("replay recover table[{}]", table.getId()); } else { // log - RecoverInfo recoverInfo = new RecoverInfo(db.getId(), table.getId(), -1L, "", newTableName, ""); + RecoverInfo recoverInfo = new RecoverInfo(db.getId(), table.getId(), + -1L, "", table.getName(), newTableName, "", ""); Env.getCurrentEnv().getEditLog().logRecoverTable(recoverInfo); } // Only olap table need recover dynamic partition, other table like jdbc odbc view.. do not need it @@ -873,7 +874,8 @@ public synchronized void recoverPartition(long dbId, OlapTable table, String par idToRecycleTime.remove(partitionId); // log - RecoverInfo recoverInfo = new RecoverInfo(dbId, table.getId(), partitionId, "", "", newPartitionName); + RecoverInfo recoverInfo = new RecoverInfo(dbId, table.getId(), partitionId, "", + table.getName(), "", partitionName, newPartitionName); Env.getCurrentEnv().getEditLog().logRecoverPartition(recoverInfo); LOG.info("recover partition[{}]", partitionId); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java index 64a759b9747044..000afcda6180ce 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java @@ -657,7 +657,7 @@ public void recoverDatabase(String dbName, long dbId, String newDbName) throws D fullNameToDb.put(db.getFullName(), db); idToDb.put(db.getId(), db); // log - RecoverInfo recoverInfo = new RecoverInfo(db.getId(), -1L, -1L, newDbName, "", ""); + RecoverInfo recoverInfo = new RecoverInfo(db.getId(), -1L, -1L, newDbName, "", "", "", ""); Env.getCurrentEnv().getEditLog().logRecoverDb(recoverInfo); db.unmarkDropped(); } finally { diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java index 7d1f2127eecaaf..1f17b875c1822c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java @@ -297,11 +297,13 @@ public static void loadJournal(Env env, Long logId, JournalEntity journal) { case OperationType.OP_RECOVER_TABLE: { RecoverInfo info = (RecoverInfo) journal.getData(); env.replayRecoverTable(info); + env.getBinlogManager().addRecoverTableRecord(info, logId); break; } case OperationType.OP_RECOVER_PARTITION: { RecoverInfo info = (RecoverInfo) journal.getData(); env.replayRecoverPartition(info); + env.getBinlogManager().addRecoverTableRecord(info, logId); break; } case OperationType.OP_RENAME_TABLE: { @@ -1436,7 +1438,8 @@ public void logErasePartition(long partitionId) { } public void logRecoverPartition(RecoverInfo info) { - logEdit(OperationType.OP_RECOVER_PARTITION, info); + long logId = logEdit(OperationType.OP_RECOVER_PARTITION, info); + Env.getCurrentEnv().getBinlogManager().addRecoverTableRecord(info, logId); } public void logModifyPartition(ModifyPartitionInfo info) { @@ -1463,7 +1466,8 @@ public void logEraseTable(long tableId) { } public void logRecoverTable(RecoverInfo info) { - logEdit(OperationType.OP_RECOVER_TABLE, info); + long logId = logEdit(OperationType.OP_RECOVER_TABLE, info); + Env.getCurrentEnv().getBinlogManager().addRecoverTableRecord(info, logId); } public void logDropRollup(DropInfo info) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/RecoverInfo.java b/fe/fe-core/src/main/java/org/apache/doris/persist/RecoverInfo.java index 15764a99b43b29..eb4af6494e8a37 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/RecoverInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/RecoverInfo.java @@ -38,10 +38,14 @@ public class RecoverInfo implements Writable, GsonPostProcessable { private String newDbName; @SerializedName(value = "tableId") private long tableId; + @SerializedName(value = "tableName") + private String tableName; /// added for table name. @SerializedName(value = "newTableName") private String newTableName; @SerializedName(value = "partitionId") private long partitionId; + @SerializedName(value = "partitionName") + private String partitionName; @SerializedName(value = "newPartitionName") private String newPartitionName; @@ -49,13 +53,15 @@ private RecoverInfo() { // for persist } - public RecoverInfo(long dbId, long tableId, long partitionId, String newDbName, String newTableName, - String newPartitionName) { + public RecoverInfo(long dbId, long tableId, long partitionId, String newDbName, String tableName, + String newTableName, String partitionName, String newPartitionName) { this.dbId = dbId; this.tableId = tableId; + this.tableName = tableName; this.partitionId = partitionId; this.newDbName = newDbName; this.newTableName = newTableName; + this.partitionName = partitionName; this.newPartitionName = newPartitionName; } @@ -67,6 +73,10 @@ public long getTableId() { return tableId; } + public String getTableName() { + return tableName; + } + public long getPartitionId() { return partitionId; } @@ -109,4 +119,12 @@ private void readFields(DataInput in) throws IOException { public void gsonPostProcess() throws IOException { newDbName = ClusterNamespace.getNameFromFullName(newDbName); } + + public String toJson() { + return GsonUtils.GSON.toJson(this); + } + + public static RecoverInfo fromJson(String json) { + return GsonUtils.GSON.fromJson(json, RecoverInfo.class); + } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/persist/DropAndRecoverInfoTest.java b/fe/fe-core/src/test/java/org/apache/doris/persist/DropAndRecoverInfoTest.java index 8c74fba275352d..63afe375548fe0 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/persist/DropAndRecoverInfoTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/persist/DropAndRecoverInfoTest.java @@ -86,7 +86,7 @@ public void testRecoveryInfoSerialization() throws Exception { file.createNewFile(); DataOutputStream dos = new DataOutputStream(new FileOutputStream(file)); - RecoverInfo info1 = new RecoverInfo(1, 2, 3, "a", "b", "c"); + RecoverInfo info1 = new RecoverInfo(1, 2, 3, "a", "", "b", "", "c"); info1.write(dos); dos.flush(); dos.close(); diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index e2af8937425d0c..19643d2dd1d241 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -1197,6 +1197,7 @@ enum TBinlogType { RENAME_ROLLUP = 21, RENAME_PARTITION = 22, DROP_ROLLUP = 23, + RECOVER_INFO = 24, // Keep some IDs for allocation so that when new binlog types are added in the // future, the changes can be picked back to the old versions without breaking @@ -1213,8 +1214,7 @@ enum TBinlogType { // MODIFY_XXX = 17, // MIN_UNKNOWN = 18, // UNKNOWN_3 = 19, - MIN_UNKNOWN = 24, - UNKNOWN_9 = 25, + MIN_UNKNOWN = 25, UNKNOWN_10 = 26, UNKNOWN_11 = 27, UNKNOWN_12 = 28,