Skip to content

Commit

Permalink
[feat](binlog) Add replace table binlog
Browse files Browse the repository at this point in the history
  • Loading branch information
w41ter committed Nov 19, 2024
1 parent c5ee28b commit 32c93af
Show file tree
Hide file tree
Showing 8 changed files with 101 additions and 23 deletions.
2 changes: 1 addition & 1 deletion fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java
Original file line number Diff line number Diff line change
Expand Up @@ -603,7 +603,7 @@ public void processReplaceTable(Database db, OlapTable origTable, String newTblN
replaceTableInternal(db, origTable, olapNewTbl, swapTable, false, isForce);
// write edit log
ReplaceTableOperationLog log = new ReplaceTableOperationLog(db.getId(),
origTable.getId(), olapNewTbl.getId(), swapTable, isForce);
origTable.getId(), oldTblName, olapNewTbl.getId(), newTblName, swapTable, isForce);
Env.getCurrentEnv().getEditLog().logReplaceTable(log);
LOG.info("finish replacing table {} with table {}, is swap: {}", oldTblName, newTblName, swapTable);
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.doris.persist.ModifyCommentOperationLog;
import org.apache.doris.persist.ModifyTablePropertyOperationLog;
import org.apache.doris.persist.ReplacePartitionOperationLog;
import org.apache.doris.persist.ReplaceTableOperationLog;
import org.apache.doris.persist.TableAddOrDropColumnsInfo;
import org.apache.doris.persist.TableInfo;
import org.apache.doris.persist.TableRenameColumnInfo;
Expand All @@ -45,6 +46,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.thrift.TException;
Expand Down Expand Up @@ -367,6 +369,22 @@ public void addModifyViewDef(AlterViewInfo alterViewInfo, long commitSeq) {
addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false, alterViewInfo);
}

public void addReplaceTable(ReplaceTableOperationLog info, long commitSeq) {
if (StringUtils.isEmpty(info.getOrigTblName()) || StringUtils.isEmpty(info.getNewTblName())) {
LOG.warn("skip replace table binlog, because origTblName or newTblName is empty. info: {}", info);
return;
}

long dbId = info.getDbId();
List<Long> tableIds = Lists.newArrayList();
tableIds.add(info.getOrigTblId());
long timestamp = -1;
TBinlogType type = TBinlogType.REPLACE_TABLE;
String data = info.toJson();

addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false, info);
}

// get binlog by dbId, return first binlog.version > version
public Pair<TStatus, TBinlog> getBinlog(long dbId, long tableId, long prevCommitSeq) {
TStatus status = new TStatus(TStatusCode.OK);
Expand Down
53 changes: 40 additions & 13 deletions fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
import org.apache.doris.catalog.Env;
import org.apache.doris.common.Pair;
import org.apache.doris.common.proc.BaseProcResult;
import org.apache.doris.persist.BarrierLog;
import org.apache.doris.persist.DropPartitionInfo;
import org.apache.doris.persist.ReplaceTableOperationLog;
import org.apache.doris.thrift.TBinlog;
import org.apache.doris.thrift.TBinlogType;
import org.apache.doris.thrift.TStatus;
Expand Down Expand Up @@ -626,19 +628,29 @@ private void recordDroppedResources(TBinlog binlog) {

// A method to record the dropped tables, indexes, and partitions.
private void recordDroppedResources(TBinlog binlog, Object raw) {
recordDroppedResources(binlog.getType(), binlog.getCommitSeq(), binlog.getData(), raw);
}

private void recordDroppedResources(TBinlogType binlogType, long commitSeq, String data, Object raw) {
if (raw == null) {
switch (binlog.getType()) {
switch (binlogType) {
case DROP_PARTITION:
raw = DropPartitionInfo.fromJson(binlog.data);
raw = DropPartitionInfo.fromJson(data);
break;
case DROP_TABLE:
raw = DropTableRecord.fromJson(binlog.data);
raw = DropTableRecord.fromJson(data);
break;
case ALTER_JOB:
raw = AlterJobRecord.fromJson(binlog.data);
raw = AlterJobRecord.fromJson(data);
break;
case TRUNCATE_TABLE:
raw = TruncateTableRecord.fromJson(binlog.data);
raw = TruncateTableRecord.fromJson(data);
break;
case REPLACE_TABLE:
raw = ReplaceTableOperationLog.fromJson(data);
break;
case BARRIER:
raw = BarrierLog.fromJson(data);
break;
default:
break;
Expand All @@ -648,29 +660,44 @@ private void recordDroppedResources(TBinlog binlog, Object raw) {
}
}

if (binlog.getType() == TBinlogType.DROP_PARTITION && raw instanceof DropPartitionInfo) {
recordDroppedResources(binlogType, commitSeq, raw);
}

private void recordDroppedResources(TBinlogType binlogType, long commitSeq, Object raw) {
if (binlogType == TBinlogType.DROP_PARTITION && raw instanceof DropPartitionInfo) {
long partitionId = ((DropPartitionInfo) raw).getPartitionId();
if (partitionId > 0) {
droppedPartitions.add(Pair.of(partitionId, binlog.getCommitSeq()));
droppedPartitions.add(Pair.of(partitionId, commitSeq));
}
} else if (binlog.getType() == TBinlogType.DROP_TABLE && raw instanceof DropTableRecord) {
} else if (binlogType == TBinlogType.DROP_TABLE && raw instanceof DropTableRecord) {
long tableId = ((DropTableRecord) raw).getTableId();
if (tableId > 0) {
droppedTables.add(Pair.of(tableId, binlog.getCommitSeq()));
droppedTables.add(Pair.of(tableId, commitSeq));
}
} else if (binlog.getType() == TBinlogType.ALTER_JOB && raw instanceof AlterJobRecord) {
} else if (binlogType == TBinlogType.ALTER_JOB && raw instanceof AlterJobRecord) {
AlterJobRecord alterJobRecord = (AlterJobRecord) raw;
if (alterJobRecord.isJobFinished() && alterJobRecord.isSchemaChangeJob()) {
for (Long indexId : alterJobRecord.getOriginIndexIdList()) {
if (indexId != null && indexId > 0) {
droppedIndexes.add(Pair.of(indexId, binlog.getCommitSeq()));
droppedIndexes.add(Pair.of(indexId, commitSeq));
}
}
}
} else if (binlog.getType() == TBinlogType.TRUNCATE_TABLE && raw instanceof TruncateTableRecord) {
} else if (binlogType == TBinlogType.TRUNCATE_TABLE && raw instanceof TruncateTableRecord) {
TruncateTableRecord truncateTableRecord = (TruncateTableRecord) raw;
for (long partitionId : truncateTableRecord.getOldPartitionIds()) {
droppedPartitions.add(Pair.of(partitionId, binlog.getCommitSeq()));
droppedPartitions.add(Pair.of(partitionId, commitSeq));
}
} else if (binlogType == TBinlogType.REPLACE_TABLE && raw instanceof ReplaceTableOperationLog) {
ReplaceTableOperationLog record = (ReplaceTableOperationLog) raw;
if (!record.isSwapTable()) {
droppedTables.add(Pair.of(record.getOrigTblId(), commitSeq));
}
} else if (binlogType == TBinlogType.BARRIER && raw instanceof BarrierLog) {
BarrierLog log = (BarrierLog) raw;
// keep compatible with doris 2.0/2.1
if (log.hasBinlog()) {
recordDroppedResources(log.getBinlogType(), commitSeq, log.getBinlog(), null);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,10 @@ public String toJson() {
return GsonUtils.GSON.toJson(this);
}

public static BarrierLog fromJson(String json) {
return GsonUtils.GSON.fromJson(json, BarrierLog.class);
}

@Override
public String toString() {
return toJson();
Expand Down
11 changes: 7 additions & 4 deletions fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ public static void loadJournal(Env env, Long logId, JournalEntity journal) {
case OperationType.OP_RENAME_TABLE: {
TableInfo info = (TableInfo) journal.getData();
env.replayRenameTable(info);
Env.getCurrentEnv().getBinlogManager().addTableRename(info, logId);
env.getBinlogManager().addTableRename(info, logId);
break;
}
case OperationType.OP_MODIFY_VIEW_DEF: {
Expand All @@ -318,7 +318,7 @@ public static void loadJournal(Env env, Long logId, JournalEntity journal) {
case OperationType.OP_RENAME_PARTITION: {
TableInfo info = (TableInfo) journal.getData();
env.replayRenamePartition(info);
Env.getCurrentEnv().getBinlogManager().addTableRename(info, logId);
env.getBinlogManager().addTableRename(info, logId);
break;
}
case OperationType.OP_RENAME_COLUMN: {
Expand Down Expand Up @@ -366,7 +366,7 @@ public static void loadJournal(Env env, Long logId, JournalEntity journal) {
case OperationType.OP_RENAME_ROLLUP: {
TableInfo info = (TableInfo) journal.getData();
env.replayRenameRollup(info);
Env.getCurrentEnv().getBinlogManager().addTableRename(info, logId);
env.getCurrentEnv().getBinlogManager().addTableRename(info, logId);
break;
}
case OperationType.OP_LOAD_START:
Expand Down Expand Up @@ -898,6 +898,7 @@ public static void loadJournal(Env env, Long logId, JournalEntity journal) {
case OperationType.OP_REPLACE_TABLE: {
ReplaceTableOperationLog log = (ReplaceTableOperationLog) journal.getData();
env.getAlterInstance().replayReplaceTable(log);
env.getBinlogManager().addReplaceTable(log, logId);
break;
}
case OperationType.OP_CREATE_SQL_BLOCK_RULE: {
Expand Down Expand Up @@ -1950,7 +1951,9 @@ public void logGlobalVariableV2(GlobalVarPersistInfo info) {
}

public void logReplaceTable(ReplaceTableOperationLog log) {
logEdit(OperationType.OP_REPLACE_TABLE, log);
long logId = logEdit(OperationType.OP_REPLACE_TABLE, log);
LOG.info("add replace table binlog, logId: {}, infos: {}", logId, log);
Env.getCurrentEnv().getBinlogManager().addReplaceTable(log, logId);
}

public void logBatchRemoveTransactions(BatchRemoveTransactionsOperationV2 op) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,17 +32,25 @@ public class ReplaceTableOperationLog implements Writable {
private long dbId;
@SerializedName(value = "origTblId")
private long origTblId;
@SerializedName(value = "origTblName")
private String origTblName;
@SerializedName(value = "newTblName")
private long newTblId;
@SerializedName(value = "actualNewTblName")
private String newTblName;
@SerializedName(value = "swapTable")
private boolean swapTable;
@SerializedName(value = "isForce")
private boolean isForce = true; // older version it was force. so keep same.

public ReplaceTableOperationLog(long dbId, long origTblId, long newTblId, boolean swapTable, boolean isForce) {
public ReplaceTableOperationLog(long dbId, long origTblId,
String origTblName, long newTblId, String newTblName,
boolean swapTable, boolean isForce) {
this.dbId = dbId;
this.origTblId = origTblId;
this.origTblName = origTblName;
this.newTblId = newTblId;
this.newTblName = newTblName;
this.swapTable = swapTable;
this.isForce = isForce;
}
Expand All @@ -55,10 +63,18 @@ public long getOrigTblId() {
return origTblId;
}

public String getOrigTblName() {
return origTblName;
}

public long getNewTblId() {
return newTblId;
}

public String getNewTblName() {
return newTblName;
}

public boolean isSwapTable() {
return swapTable;
}
Expand All @@ -67,13 +83,21 @@ public boolean isForce() {
return isForce;
}

public String toJson() {
return GsonUtils.GSON.toJson(this);
}

public static ReplaceTableOperationLog fromJson(String json) {
return GsonUtils.GSON.fromJson(json, ReplaceTableOperationLog.class);
}

@Override
public void write(DataOutput out) throws IOException {
String json = GsonUtils.GSON.toJson(this);
Text.writeString(out, json);
}

public static ReplaceTableOperationLog read(DataInput in) throws IOException {
public static ReplaceTableOperationLog read(DataInput in) throws IOException {
String json = Text.readString(in);
return GsonUtils.GSON.fromJson(json, ReplaceTableOperationLog.class);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public void testSerialization() throws Exception {
file.createNewFile();
DataOutputStream dos = new DataOutputStream(new FileOutputStream(file));

ReplaceTableOperationLog log = new ReplaceTableOperationLog(1, 2, 3, true, true);
ReplaceTableOperationLog log = new ReplaceTableOperationLog(1, 2, "old", 3, "new", true, true);
log.write(dos);

dos.flush();
Expand All @@ -48,6 +48,8 @@ public void testSerialization() throws Exception {
Assert.assertTrue(readLog.getNewTblId() == log.getNewTblId());
Assert.assertTrue(readLog.getOrigTblId() == log.getOrigTblId());
Assert.assertTrue(readLog.isSwapTable() == log.isSwapTable());
Assert.assertTrue(readLog.getOrigTblName() == log.getOrigTblName());
Assert.assertTrue(readLog.getNewTblName() == log.getNewTblName());

// 3. delete files
dis.close();
Expand Down
4 changes: 2 additions & 2 deletions gensrc/thrift/FrontendService.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -1191,6 +1191,7 @@ enum TBinlogType {
RENAME_COLUMN = 15,
MODIFY_COMMENT = 16,
MODIFY_VIEW_DEF = 17,
REPLACE_TABLE = 18,

// Keep some IDs for allocation so that when new binlog types are added in the
// future, the changes can be picked back to the old versions without breaking
Expand All @@ -1207,8 +1208,7 @@ enum TBinlogType {
// MODIFY_XXX = 17,
// MIN_UNKNOWN = 18,
// UNKNOWN_3 = 19,
MIN_UNKNOWN = 18,
UNKNOWN_3 = 19,
MIN_UNKNOWN = 19,
UNKNOWN_4 = 20,
UNKNOWN_5 = 21,
UNKNOWN_6 = 22,
Expand Down

0 comments on commit 32c93af

Please sign in to comment.