From bd5e7d0a827dec2e68898ddd3aad974bb21516de Mon Sep 17 00:00:00 2001 From: walter Date: Fri, 22 Nov 2024 20:09:21 +0800 Subject: [PATCH] [feat](binlog) Support add/build/drop inverted index binlog (#44418) Related PR: https://github.com/selectdb/ccr-syncer/pull/252 --- .../apache/doris/alter/IndexChangeJob.java | 4 ++++ .../apache/doris/binlog/BinlogManager.java | 24 +++++++++++++++++++ .../org/apache/doris/persist/EditLog.java | 12 ++++++++-- .../TableAddOrDropInvertedIndicesInfo.java | 4 ++++ gensrc/thrift/FrontendService.thrift | 6 ++--- 5 files changed, 45 insertions(+), 5 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/IndexChangeJob.java b/fe/fe-core/src/main/java/org/apache/doris/alter/IndexChangeJob.java index bb0c018dc36f36..a51cdb01699e23 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/IndexChangeJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/IndexChangeJob.java @@ -469,6 +469,10 @@ private void replayCancelled(IndexChangeJob replayedJob) { LOG.info("cancel index job {}, err: {}", jobId, errMsg); } + public String toJson() { + return GsonUtils.GSON.toJson(this); + } + public static IndexChangeJob read(DataInput in) throws IOException { if (Env.getCurrentEnvJournalVersion() < FeMetaVersion.VERSION_122) { IndexChangeJob job = new IndexChangeJob(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java index 1f785713666437..0fadfc2b542c85 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java @@ -18,6 +18,7 @@ package org.apache.doris.binlog; import org.apache.doris.alter.AlterJobV2; +import org.apache.doris.alter.IndexChangeJob; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.Env; import org.apache.doris.common.Config; @@ -35,6 +36,7 @@ import org.apache.doris.persist.ReplacePartitionOperationLog; import org.apache.doris.persist.ReplaceTableOperationLog; import org.apache.doris.persist.TableAddOrDropColumnsInfo; +import org.apache.doris.persist.TableAddOrDropInvertedIndicesInfo; import org.apache.doris.persist.TableInfo; import org.apache.doris.persist.TableRenameColumnInfo; import org.apache.doris.persist.TruncateTableInfo; @@ -385,6 +387,28 @@ public void addReplaceTable(ReplaceTableOperationLog info, long commitSeq) { addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false, info); } + public void addModifyTableAddOrDropInvertedIndices(TableAddOrDropInvertedIndicesInfo info, long commitSeq) { + long dbId = info.getDbId(); + List tableIds = Lists.newArrayList(); + tableIds.add(info.getTableId()); + long timestamp = -1; + TBinlogType type = TBinlogType.MODIFY_TABLE_ADD_OR_DROP_INVERTED_INDICES; + String data = info.toJson(); + + addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false, info); + } + + public void addIndexChangeJob(IndexChangeJob indexChangeJob, long commitSeq) { + long dbId = indexChangeJob.getDbId(); + List tableIds = Lists.newArrayList(); + tableIds.add(indexChangeJob.getTableId()); + long timestamp = -1; + TBinlogType type = TBinlogType.INDEX_CHANGE_JOB; + String data = indexChangeJob.toJson(); + + addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false, indexChangeJob); + } + // get binlog by dbId, return first binlog.version > version public Pair getBinlog(long dbId, long tableId, long prevCommitSeq) { TStatus status = new TStatus(TStatusCode.OK); diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java index 5ae6f62ebb20e9..96f5d43aed2cbd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java @@ -986,11 +986,13 @@ public static void loadJournal(Env env, Long logId, JournalEntity journal) { final TableAddOrDropInvertedIndicesInfo info = (TableAddOrDropInvertedIndicesInfo) journal.getData(); env.getSchemaChangeHandler().replayModifyTableAddOrDropInvertedIndices(info); + env.getBinlogManager().addModifyTableAddOrDropInvertedIndices(info, logId); break; } case OperationType.OP_INVERTED_INDEX_JOB: { IndexChangeJob indexChangeJob = (IndexChangeJob) journal.getData(); env.getSchemaChangeHandler().replayIndexChangeJob(indexChangeJob); + env.getBinlogManager().addIndexChangeJob(indexChangeJob, logId); break; } case OperationType.OP_CLEAN_LABEL: { @@ -2058,11 +2060,17 @@ public void logModifyTableAddOrDropColumns(TableAddOrDropColumnsInfo info) { } public void logModifyTableAddOrDropInvertedIndices(TableAddOrDropInvertedIndicesInfo info) { - logEdit(OperationType.OP_MODIFY_TABLE_ADD_OR_DROP_INVERTED_INDICES, info); + long logId = logEdit(OperationType.OP_MODIFY_TABLE_ADD_OR_DROP_INVERTED_INDICES, info); + LOG.info("walter log modify table add or drop inverted indices, infos: {}, json: {}", + info, info.toJson(), new RuntimeException("test")); + Env.getCurrentEnv().getBinlogManager().addModifyTableAddOrDropInvertedIndices(info, logId); } public void logIndexChangeJob(IndexChangeJob indexChangeJob) { - logEdit(OperationType.OP_INVERTED_INDEX_JOB, indexChangeJob); + long logId = logEdit(OperationType.OP_INVERTED_INDEX_JOB, indexChangeJob); + LOG.info("walter log inverted index job, infos: {}, json: {}", + indexChangeJob, indexChangeJob.toJson(), new RuntimeException("test")); + Env.getCurrentEnv().getBinlogManager().addIndexChangeJob(indexChangeJob, logId); } public void logCleanLabel(CleanLabelOperationLog log) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/TableAddOrDropInvertedIndicesInfo.java b/fe/fe-core/src/main/java/org/apache/doris/persist/TableAddOrDropInvertedIndicesInfo.java index efdc3ab6e9eac2..39a90046d2419e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/TableAddOrDropInvertedIndicesInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/TableAddOrDropInvertedIndicesInfo.java @@ -95,6 +95,10 @@ public long getJobId() { return jobId; } + public String toJson() { + return GsonUtils.GSON.toJson(this); + } + @Override public void write(DataOutput out) throws IOException { Text.writeString(out, GsonUtils.GSON.toJson(this)); diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index 4270513023a6fe..83f36c6905aed4 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -1191,6 +1191,8 @@ enum TBinlogType { MODIFY_COMMENT = 16, MODIFY_VIEW_DEF = 17, REPLACE_TABLE = 18, + MODIFY_TABLE_ADD_OR_DROP_INVERTED_INDICES = 19, + INDEX_CHANGE_JOB = 20, // Keep some IDs for allocation so that when new binlog types are added in the // future, the changes can be picked back to the old versions without breaking @@ -1207,9 +1209,7 @@ enum TBinlogType { // MODIFY_XXX = 17, // MIN_UNKNOWN = 18, // UNKNOWN_3 = 19, - MIN_UNKNOWN = 19, - UNKNOWN_4 = 20, - UNKNOWN_5 = 21, + MIN_UNKNOWN = 21, UNKNOWN_6 = 22, UNKNOWN_7 = 23, UNKNOWN_8 = 24,