Skip to content

Commit

Permalink
[feat](binlog) Add Support recover binlog
Browse files Browse the repository at this point in the history
  • Loading branch information
Vallishp committed Dec 9, 2024
1 parent 7c6ece0 commit 07d4649
Show file tree
Hide file tree
Showing 8 changed files with 84 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.doris.persist.DropPartitionInfo;
import org.apache.doris.persist.ModifyCommentOperationLog;
import org.apache.doris.persist.ModifyTablePropertyOperationLog;
import org.apache.doris.persist.RecoverInfo;
import org.apache.doris.persist.ReplacePartitionOperationLog;
import org.apache.doris.persist.ReplaceTableOperationLog;
import org.apache.doris.persist.TableAddOrDropColumnsInfo;
Expand Down Expand Up @@ -446,6 +447,32 @@ public void addDropRollup(DropInfo info, long commitSeq) {
addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false, info);
}


private boolean supportedRecoverInfo(RecoverInfo info) {
//table name and partitionName added together.
// recover table case, tablename must exist in newer version
// recover partition case also table name must exist.
// so checking only table name here.
if (StringUtils.isEmpty(info.getTableName())) {
LOG.warn("skip recover info binlog, because tableName is empty. info: {}", info);
return false;
}
return true;
}

public void addRecoverTableRecord(RecoverInfo info, long commitSeq) {
if (supportedRecoverInfo(info) == false) {
return;
}
long dbId = info.getDbId();
List<Long> tableIds = Lists.newArrayList();
tableIds.add(info.getTableId());
long timestamp = -1;
TBinlogType type = TBinlogType.RECOVER_INFO;
String data = info.toJson();
addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false, info);
}

// get binlog by dbId, return first binlog.version > version
public Pair<TStatus, TBinlog> getBinlog(long dbId, long tableId, long prevCommitSeq) {
TStatus status = new TStatus(TStatusCode.OK);
Expand Down
33 changes: 23 additions & 10 deletions fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.doris.persist.BarrierLog;
import org.apache.doris.persist.DropInfo;
import org.apache.doris.persist.DropPartitionInfo;
import org.apache.doris.persist.RecoverInfo;
import org.apache.doris.persist.ReplaceTableOperationLog;
import org.apache.doris.thrift.TBinlog;
import org.apache.doris.thrift.TBinlogType;
Expand Down Expand Up @@ -124,7 +125,7 @@ public void recoverBinlog(TBinlog binlog, boolean dbBinlogEnable) {

allBinlogs.add(binlog);
binlogSize += BinlogUtils.getApproximateMemoryUsage(binlog);
recordDroppedResources(binlog);
recordDroppedOrRecoveredResources(binlog);

if (tableIds == null) {
return;
Expand Down Expand Up @@ -178,7 +179,7 @@ public void addBinlog(TBinlog binlog, Object raw) {
return;
}

recordDroppedResources(binlog, raw);
recordDroppedOrRecoveredResources(binlog, raw);

switch (binlog.getType()) {
case CREATE_TABLE:
Expand Down Expand Up @@ -623,16 +624,16 @@ public void getBinlogInfo(BaseProcResult result) {
}
}

private void recordDroppedResources(TBinlog binlog) {
recordDroppedResources(binlog, null);
private void recordDroppedOrRecoveredResources(TBinlog binlog) {
recordDroppedOrRecoveredResources(binlog, null);
}

// A method to record the dropped tables, indexes, and partitions.
private void recordDroppedResources(TBinlog binlog, Object raw) {
recordDroppedResources(binlog.getType(), binlog.getCommitSeq(), binlog.getData(), raw);
private void recordDroppedOrRecoveredResources(TBinlog binlog, Object raw) {
recordDroppedOrRecoveredResources(binlog.getType(), binlog.getCommitSeq(), binlog.getData(), raw);
}

private void recordDroppedResources(TBinlogType binlogType, long commitSeq, String data, Object raw) {
private void recordDroppedOrRecoveredResources(TBinlogType binlogType, long commitSeq, String data, Object raw) {
if (raw == null) {
switch (binlogType) {
case DROP_PARTITION:
Expand All @@ -656,6 +657,9 @@ private void recordDroppedResources(TBinlogType binlogType, long commitSeq, Stri
case BARRIER:
raw = BarrierLog.fromJson(data);
break;
case RECOVER_INFO:
raw = RecoverInfo.fromJson(data);
break;
default:
break;
}
Expand All @@ -664,10 +668,10 @@ private void recordDroppedResources(TBinlogType binlogType, long commitSeq, Stri
}
}

recordDroppedResources(binlogType, commitSeq, raw);
recordDroppedOrRecoveredResources(binlogType, commitSeq, raw);
}

private void recordDroppedResources(TBinlogType binlogType, long commitSeq, Object raw) {
private void recordDroppedOrRecoveredResources(TBinlogType binlogType, long commitSeq, Object raw) {
if (binlogType == TBinlogType.DROP_PARTITION && raw instanceof DropPartitionInfo) {
long partitionId = ((DropPartitionInfo) raw).getPartitionId();
if (partitionId > 0) {
Expand Down Expand Up @@ -706,7 +710,16 @@ private void recordDroppedResources(TBinlogType binlogType, long commitSeq, Obje
BarrierLog log = (BarrierLog) raw;
// keep compatible with doris 2.0/2.1
if (log.hasBinlog()) {
recordDroppedResources(log.getBinlogType(), commitSeq, log.getBinlog(), null);
recordDroppedOrRecoveredResources(log.getBinlogType(), commitSeq, log.getBinlog(), null);
}
} else if ((binlogType == TBinlogType.RECOVER_INFO) && (raw instanceof RecoverInfo)) {
RecoverInfo recoverInfo = (RecoverInfo) raw;
long partitionId = recoverInfo.getPartitionId();
long tableId = recoverInfo.getTableId();
if (partitionId > 0) {
droppedPartitions.removeIf(entry -> (entry.first == partitionId));
} else if (tableId > 0) {
droppedTables.removeIf(entry -> (entry.first == tableId));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -774,7 +774,8 @@ private synchronized boolean innerRecoverTable(Database db, Table table, String
LOG.info("replay recover table[{}]", table.getId());
} else {
// log
RecoverInfo recoverInfo = new RecoverInfo(db.getId(), table.getId(), -1L, "", newTableName, "");
RecoverInfo recoverInfo = new RecoverInfo(db.getId(), table.getId(),
-1L, "", table.getName(), newTableName, "", "");
Env.getCurrentEnv().getEditLog().logRecoverTable(recoverInfo);
}
// Only olap table need recover dynamic partition, other table like jdbc odbc view.. do not need it
Expand Down Expand Up @@ -873,7 +874,8 @@ public synchronized void recoverPartition(long dbId, OlapTable table, String par
idToRecycleTime.remove(partitionId);

// log
RecoverInfo recoverInfo = new RecoverInfo(dbId, table.getId(), partitionId, "", "", newPartitionName);
RecoverInfo recoverInfo = new RecoverInfo(dbId, table.getId(), partitionId, "",
table.getName(), "", partitionName, newPartitionName);
Env.getCurrentEnv().getEditLog().logRecoverPartition(recoverInfo);
LOG.info("recover partition[{}]", partitionId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -657,7 +657,7 @@ public void recoverDatabase(String dbName, long dbId, String newDbName) throws D
fullNameToDb.put(db.getFullName(), db);
idToDb.put(db.getId(), db);
// log
RecoverInfo recoverInfo = new RecoverInfo(db.getId(), -1L, -1L, newDbName, "", "");
RecoverInfo recoverInfo = new RecoverInfo(db.getId(), -1L, -1L, newDbName, "", "", "", "");
Env.getCurrentEnv().getEditLog().logRecoverDb(recoverInfo);
db.unmarkDropped();
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -297,11 +297,13 @@ public static void loadJournal(Env env, Long logId, JournalEntity journal) {
case OperationType.OP_RECOVER_TABLE: {
RecoverInfo info = (RecoverInfo) journal.getData();
env.replayRecoverTable(info);
env.getBinlogManager().addRecoverTableRecord(info, logId);
break;
}
case OperationType.OP_RECOVER_PARTITION: {
RecoverInfo info = (RecoverInfo) journal.getData();
env.replayRecoverPartition(info);
env.getBinlogManager().addRecoverTableRecord(info, logId);
break;
}
case OperationType.OP_RENAME_TABLE: {
Expand Down Expand Up @@ -1436,7 +1438,8 @@ public void logErasePartition(long partitionId) {
}

public void logRecoverPartition(RecoverInfo info) {
logEdit(OperationType.OP_RECOVER_PARTITION, info);
long logId = logEdit(OperationType.OP_RECOVER_PARTITION, info);
Env.getCurrentEnv().getBinlogManager().addRecoverTableRecord(info, logId);
}

public void logModifyPartition(ModifyPartitionInfo info) {
Expand All @@ -1463,7 +1466,8 @@ public void logEraseTable(long tableId) {
}

public void logRecoverTable(RecoverInfo info) {
logEdit(OperationType.OP_RECOVER_TABLE, info);
long logId = logEdit(OperationType.OP_RECOVER_TABLE, info);
Env.getCurrentEnv().getBinlogManager().addRecoverTableRecord(info, logId);
}

public void logDropRollup(DropInfo info) {
Expand Down
22 changes: 20 additions & 2 deletions fe/fe-core/src/main/java/org/apache/doris/persist/RecoverInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,24 +38,30 @@ public class RecoverInfo implements Writable, GsonPostProcessable {
private String newDbName;
@SerializedName(value = "tableId")
private long tableId;
@SerializedName(value = "tableName")
private String tableName; /// added for table name.
@SerializedName(value = "newTableName")
private String newTableName;
@SerializedName(value = "partitionId")
private long partitionId;
@SerializedName(value = "partitionName")
private String partitionName;
@SerializedName(value = "newPartitionName")
private String newPartitionName;

private RecoverInfo() {
// for persist
}

public RecoverInfo(long dbId, long tableId, long partitionId, String newDbName, String newTableName,
String newPartitionName) {
public RecoverInfo(long dbId, long tableId, long partitionId, String newDbName, String tableName,
String newTableName, String partitionName, String newPartitionName) {
this.dbId = dbId;
this.tableId = tableId;
this.tableName = tableName;
this.partitionId = partitionId;
this.newDbName = newDbName;
this.newTableName = newTableName;
this.partitionName = partitionName;
this.newPartitionName = newPartitionName;
}

Expand All @@ -67,6 +73,10 @@ public long getTableId() {
return tableId;
}

public String getTableName() {
return tableName;
}

public long getPartitionId() {
return partitionId;
}
Expand Down Expand Up @@ -109,4 +119,12 @@ private void readFields(DataInput in) throws IOException {
public void gsonPostProcess() throws IOException {
newDbName = ClusterNamespace.getNameFromFullName(newDbName);
}

public String toJson() {
return GsonUtils.GSON.toJson(this);
}

public static RecoverInfo fromJson(String json) {
return GsonUtils.GSON.fromJson(json, RecoverInfo.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public void testRecoveryInfoSerialization() throws Exception {
file.createNewFile();
DataOutputStream dos = new DataOutputStream(new FileOutputStream(file));

RecoverInfo info1 = new RecoverInfo(1, 2, 3, "a", "b", "c");
RecoverInfo info1 = new RecoverInfo(1, 2, 3, "a", "", "b", "", "c");
info1.write(dos);
dos.flush();
dos.close();
Expand Down
4 changes: 2 additions & 2 deletions gensrc/thrift/FrontendService.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -1197,6 +1197,7 @@ enum TBinlogType {
RENAME_ROLLUP = 21,
RENAME_PARTITION = 22,
DROP_ROLLUP = 23,
RECOVER_INFO = 24,

// Keep some IDs for allocation so that when new binlog types are added in the
// future, the changes can be picked back to the old versions without breaking
Expand All @@ -1213,8 +1214,7 @@ enum TBinlogType {
// MODIFY_XXX = 17,
// MIN_UNKNOWN = 18,
// UNKNOWN_3 = 19,
MIN_UNKNOWN = 24,
UNKNOWN_9 = 25,
MIN_UNKNOWN = 25,
UNKNOWN_10 = 26,
UNKNOWN_11 = 27,
UNKNOWN_12 = 28,
Expand Down

0 comments on commit 07d4649

Please sign in to comment.