Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feat](binlog) Add Support recover binlog #44818

Merged
merged 3 commits into from
Dec 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.doris.persist.DropPartitionInfo;
import org.apache.doris.persist.ModifyCommentOperationLog;
import org.apache.doris.persist.ModifyTablePropertyOperationLog;
import org.apache.doris.persist.RecoverInfo;
import org.apache.doris.persist.ReplacePartitionOperationLog;
import org.apache.doris.persist.ReplaceTableOperationLog;
import org.apache.doris.persist.TableAddOrDropColumnsInfo;
Expand Down Expand Up @@ -446,6 +447,32 @@ public void addDropRollup(DropInfo info, long commitSeq) {
addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false, info);
}


private boolean supportedRecoverInfo(RecoverInfo info) {
//table name and partitionName added together.
// recover table case, tablename must exist in newer version
// recover partition case also table name must exist.
// so checking only table name here.
if (StringUtils.isEmpty(info.getTableName())) {
LOG.warn("skip recover info binlog, because tableName is empty. info: {}", info);
return false;
}
return true;
}

public void addRecoverTableRecord(RecoverInfo info, long commitSeq) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The RecoverInfo comes from the old version of Doris, which does not contain TableName and PartitionName, and should be filtered here. Otherwise, it might cause the ccr syncer to stop synching.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thank you. have update it.

if (supportedRecoverInfo(info) == false) {
return;
}
long dbId = info.getDbId();
List<Long> tableIds = Lists.newArrayList();
tableIds.add(info.getTableId());
long timestamp = -1;
TBinlogType type = TBinlogType.RECOVER_INFO;
String data = info.toJson();
addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false, info);
}

// get binlog by dbId, return first binlog.version > version
public Pair<TStatus, TBinlog> getBinlog(long dbId, long tableId, long prevCommitSeq) {
TStatus status = new TStatus(TStatusCode.OK);
Expand Down
33 changes: 23 additions & 10 deletions fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.doris.persist.BarrierLog;
import org.apache.doris.persist.DropInfo;
import org.apache.doris.persist.DropPartitionInfo;
import org.apache.doris.persist.RecoverInfo;
import org.apache.doris.persist.ReplaceTableOperationLog;
import org.apache.doris.thrift.TBinlog;
import org.apache.doris.thrift.TBinlogType;
Expand Down Expand Up @@ -124,7 +125,7 @@ public void recoverBinlog(TBinlog binlog, boolean dbBinlogEnable) {

allBinlogs.add(binlog);
binlogSize += BinlogUtils.getApproximateMemoryUsage(binlog);
recordDroppedResources(binlog);
recordDroppedOrRecoveredResources(binlog);

if (tableIds == null) {
return;
Expand Down Expand Up @@ -178,7 +179,7 @@ public void addBinlog(TBinlog binlog, Object raw) {
return;
}

recordDroppedResources(binlog, raw);
recordDroppedOrRecoveredResources(binlog, raw);

switch (binlog.getType()) {
case CREATE_TABLE:
Expand Down Expand Up @@ -623,16 +624,16 @@ public void getBinlogInfo(BaseProcResult result) {
}
}

private void recordDroppedResources(TBinlog binlog) {
recordDroppedResources(binlog, null);
private void recordDroppedOrRecoveredResources(TBinlog binlog) {
recordDroppedOrRecoveredResources(binlog, null);
}

// A method to record the dropped tables, indexes, and partitions.
private void recordDroppedResources(TBinlog binlog, Object raw) {
recordDroppedResources(binlog.getType(), binlog.getCommitSeq(), binlog.getData(), raw);
private void recordDroppedOrRecoveredResources(TBinlog binlog, Object raw) {
recordDroppedOrRecoveredResources(binlog.getType(), binlog.getCommitSeq(), binlog.getData(), raw);
}

private void recordDroppedResources(TBinlogType binlogType, long commitSeq, String data, Object raw) {
private void recordDroppedOrRecoveredResources(TBinlogType binlogType, long commitSeq, String data, Object raw) {
if (raw == null) {
switch (binlogType) {
case DROP_PARTITION:
Expand All @@ -656,6 +657,9 @@ private void recordDroppedResources(TBinlogType binlogType, long commitSeq, Stri
case BARRIER:
raw = BarrierLog.fromJson(data);
break;
case RECOVER_INFO:
raw = RecoverInfo.fromJson(data);
break;
default:
break;
}
Expand All @@ -664,10 +668,10 @@ private void recordDroppedResources(TBinlogType binlogType, long commitSeq, Stri
}
}

recordDroppedResources(binlogType, commitSeq, raw);
recordDroppedOrRecoveredResources(binlogType, commitSeq, raw);
}

private void recordDroppedResources(TBinlogType binlogType, long commitSeq, Object raw) {
private void recordDroppedOrRecoveredResources(TBinlogType binlogType, long commitSeq, Object raw) {
if (binlogType == TBinlogType.DROP_PARTITION && raw instanceof DropPartitionInfo) {
long partitionId = ((DropPartitionInfo) raw).getPartitionId();
if (partitionId > 0) {
Expand Down Expand Up @@ -706,7 +710,16 @@ private void recordDroppedResources(TBinlogType binlogType, long commitSeq, Obje
BarrierLog log = (BarrierLog) raw;
// keep compatible with doris 2.0/2.1
if (log.hasBinlog()) {
recordDroppedResources(log.getBinlogType(), commitSeq, log.getBinlog(), null);
recordDroppedOrRecoveredResources(log.getBinlogType(), commitSeq, log.getBinlog(), null);
}
} else if ((binlogType == TBinlogType.RECOVER_INFO) && (raw instanceof RecoverInfo)) {
RecoverInfo recoverInfo = (RecoverInfo) raw;
long partitionId = recoverInfo.getPartitionId();
long tableId = recoverInfo.getTableId();
if (partitionId > 0) {
droppedPartitions.removeIf(entry -> (entry.first == partitionId));
} else if (tableId > 0) {
droppedTables.removeIf(entry -> (entry.first == tableId));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -774,7 +774,8 @@ private synchronized boolean innerRecoverTable(Database db, Table table, String
LOG.info("replay recover table[{}]", table.getId());
} else {
// log
RecoverInfo recoverInfo = new RecoverInfo(db.getId(), table.getId(), -1L, "", newTableName, "");
RecoverInfo recoverInfo = new RecoverInfo(db.getId(), table.getId(),
-1L, "", table.getName(), newTableName, "", "");
Env.getCurrentEnv().getEditLog().logRecoverTable(recoverInfo);
}
// Only olap table need recover dynamic partition, other table like jdbc odbc view.. do not need it
Expand Down Expand Up @@ -873,7 +874,8 @@ public synchronized void recoverPartition(long dbId, OlapTable table, String par
idToRecycleTime.remove(partitionId);

// log
RecoverInfo recoverInfo = new RecoverInfo(dbId, table.getId(), partitionId, "", "", newPartitionName);
RecoverInfo recoverInfo = new RecoverInfo(dbId, table.getId(), partitionId, "",
table.getName(), "", partitionName, newPartitionName);
Env.getCurrentEnv().getEditLog().logRecoverPartition(recoverInfo);
LOG.info("recover partition[{}]", partitionId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -657,7 +657,7 @@ public void recoverDatabase(String dbName, long dbId, String newDbName) throws D
fullNameToDb.put(db.getFullName(), db);
idToDb.put(db.getId(), db);
// log
RecoverInfo recoverInfo = new RecoverInfo(db.getId(), -1L, -1L, newDbName, "", "");
RecoverInfo recoverInfo = new RecoverInfo(db.getId(), -1L, -1L, newDbName, "", "", "", "");
Env.getCurrentEnv().getEditLog().logRecoverDb(recoverInfo);
db.unmarkDropped();
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -297,11 +297,13 @@ public static void loadJournal(Env env, Long logId, JournalEntity journal) {
case OperationType.OP_RECOVER_TABLE: {
RecoverInfo info = (RecoverInfo) journal.getData();
env.replayRecoverTable(info);
env.getBinlogManager().addRecoverTableRecord(info, logId);
Vallishp marked this conversation as resolved.
Show resolved Hide resolved
break;
}
case OperationType.OP_RECOVER_PARTITION: {
RecoverInfo info = (RecoverInfo) journal.getData();
env.replayRecoverPartition(info);
env.getBinlogManager().addRecoverTableRecord(info, logId);
break;
}
case OperationType.OP_RENAME_TABLE: {
Expand Down Expand Up @@ -1444,7 +1446,8 @@ public void logErasePartition(long partitionId) {
}

public void logRecoverPartition(RecoverInfo info) {
logEdit(OperationType.OP_RECOVER_PARTITION, info);
long logId = logEdit(OperationType.OP_RECOVER_PARTITION, info);
Env.getCurrentEnv().getBinlogManager().addRecoverTableRecord(info, logId);
}

public void logModifyPartition(ModifyPartitionInfo info) {
Expand All @@ -1471,7 +1474,8 @@ public void logEraseTable(long tableId) {
}

public void logRecoverTable(RecoverInfo info) {
logEdit(OperationType.OP_RECOVER_TABLE, info);
long logId = logEdit(OperationType.OP_RECOVER_TABLE, info);
Env.getCurrentEnv().getBinlogManager().addRecoverTableRecord(info, logId);
}

public void logDropRollup(DropInfo info) {
Expand Down
22 changes: 20 additions & 2 deletions fe/fe-core/src/main/java/org/apache/doris/persist/RecoverInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,24 +38,30 @@ public class RecoverInfo implements Writable, GsonPostProcessable {
private String newDbName;
@SerializedName(value = "tableId")
private long tableId;
@SerializedName(value = "tableName")
private String tableName; /// added for table name.
@SerializedName(value = "newTableName")
private String newTableName;
@SerializedName(value = "partitionId")
private long partitionId;
@SerializedName(value = "partitionName")
private String partitionName;
@SerializedName(value = "newPartitionName")
private String newPartitionName;

private RecoverInfo() {
// for persist
}

public RecoverInfo(long dbId, long tableId, long partitionId, String newDbName, String newTableName,
String newPartitionName) {
public RecoverInfo(long dbId, long tableId, long partitionId, String newDbName, String tableName,
String newTableName, String partitionName, String newPartitionName) {
this.dbId = dbId;
this.tableId = tableId;
this.tableName = tableName;
this.partitionId = partitionId;
this.newDbName = newDbName;
this.newTableName = newTableName;
this.partitionName = partitionName;
this.newPartitionName = newPartitionName;
}

Expand All @@ -67,6 +73,10 @@ public long getTableId() {
return tableId;
}

public String getTableName() {
return tableName;
}

public long getPartitionId() {
return partitionId;
}
Expand Down Expand Up @@ -109,4 +119,12 @@ private void readFields(DataInput in) throws IOException {
public void gsonPostProcess() throws IOException {
newDbName = ClusterNamespace.getNameFromFullName(newDbName);
}

public String toJson() {
return GsonUtils.GSON.toJson(this);
}

public static RecoverInfo fromJson(String json) {
return GsonUtils.GSON.fromJson(json, RecoverInfo.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public void testRecoveryInfoSerialization() throws Exception {
file.createNewFile();
DataOutputStream dos = new DataOutputStream(new FileOutputStream(file));

RecoverInfo info1 = new RecoverInfo(1, 2, 3, "a", "b", "c");
RecoverInfo info1 = new RecoverInfo(1, 2, 3, "a", "", "b", "", "c");
info1.write(dos);
dos.flush();
dos.close();
Expand Down
4 changes: 2 additions & 2 deletions gensrc/thrift/FrontendService.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -1199,6 +1199,7 @@ enum TBinlogType {
RENAME_ROLLUP = 21,
RENAME_PARTITION = 22,
DROP_ROLLUP = 23,
RECOVER_INFO = 24,

// Keep some IDs for allocation so that when new binlog types are added in the
// future, the changes can be picked back to the old versions without breaking
Expand All @@ -1215,8 +1216,7 @@ enum TBinlogType {
// MODIFY_XXX = 17,
// MIN_UNKNOWN = 18,
// UNKNOWN_3 = 19,
MIN_UNKNOWN = 24,
UNKNOWN_9 = 25,
MIN_UNKNOWN = 25,
UNKNOWN_10 = 26,
UNKNOWN_11 = 27,
UNKNOWN_12 = 28,
Expand Down
Loading