Skip to content

Commit

Permalink
[feat](binlog) Support add/build/drop inverted index binlog (apache#4…
Browse files Browse the repository at this point in the history
  • Loading branch information
w41ter committed Nov 26, 2024
1 parent d993874 commit bd5e7d0
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -469,6 +469,10 @@ private void replayCancelled(IndexChangeJob replayedJob) {
LOG.info("cancel index job {}, err: {}", jobId, errMsg);
}

public String toJson() {
return GsonUtils.GSON.toJson(this);
}

public static IndexChangeJob read(DataInput in) throws IOException {
if (Env.getCurrentEnvJournalVersion() < FeMetaVersion.VERSION_122) {
IndexChangeJob job = new IndexChangeJob();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.doris.binlog;

import org.apache.doris.alter.AlterJobV2;
import org.apache.doris.alter.IndexChangeJob;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.Config;
Expand All @@ -35,6 +36,7 @@
import org.apache.doris.persist.ReplacePartitionOperationLog;
import org.apache.doris.persist.ReplaceTableOperationLog;
import org.apache.doris.persist.TableAddOrDropColumnsInfo;
import org.apache.doris.persist.TableAddOrDropInvertedIndicesInfo;
import org.apache.doris.persist.TableInfo;
import org.apache.doris.persist.TableRenameColumnInfo;
import org.apache.doris.persist.TruncateTableInfo;
Expand Down Expand Up @@ -385,6 +387,28 @@ public void addReplaceTable(ReplaceTableOperationLog info, long commitSeq) {
addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false, info);
}

public void addModifyTableAddOrDropInvertedIndices(TableAddOrDropInvertedIndicesInfo info, long commitSeq) {
long dbId = info.getDbId();
List<Long> tableIds = Lists.newArrayList();
tableIds.add(info.getTableId());
long timestamp = -1;
TBinlogType type = TBinlogType.MODIFY_TABLE_ADD_OR_DROP_INVERTED_INDICES;
String data = info.toJson();

addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false, info);
}

public void addIndexChangeJob(IndexChangeJob indexChangeJob, long commitSeq) {
long dbId = indexChangeJob.getDbId();
List<Long> tableIds = Lists.newArrayList();
tableIds.add(indexChangeJob.getTableId());
long timestamp = -1;
TBinlogType type = TBinlogType.INDEX_CHANGE_JOB;
String data = indexChangeJob.toJson();

addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false, indexChangeJob);
}

// get binlog by dbId, return first binlog.version > version
public Pair<TStatus, TBinlog> getBinlog(long dbId, long tableId, long prevCommitSeq) {
TStatus status = new TStatus(TStatusCode.OK);
Expand Down
12 changes: 10 additions & 2 deletions fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
Original file line number Diff line number Diff line change
Expand Up @@ -986,11 +986,13 @@ public static void loadJournal(Env env, Long logId, JournalEntity journal) {
final TableAddOrDropInvertedIndicesInfo info =
(TableAddOrDropInvertedIndicesInfo) journal.getData();
env.getSchemaChangeHandler().replayModifyTableAddOrDropInvertedIndices(info);
env.getBinlogManager().addModifyTableAddOrDropInvertedIndices(info, logId);
break;
}
case OperationType.OP_INVERTED_INDEX_JOB: {
IndexChangeJob indexChangeJob = (IndexChangeJob) journal.getData();
env.getSchemaChangeHandler().replayIndexChangeJob(indexChangeJob);
env.getBinlogManager().addIndexChangeJob(indexChangeJob, logId);
break;
}
case OperationType.OP_CLEAN_LABEL: {
Expand Down Expand Up @@ -2058,11 +2060,17 @@ public void logModifyTableAddOrDropColumns(TableAddOrDropColumnsInfo info) {
}

public void logModifyTableAddOrDropInvertedIndices(TableAddOrDropInvertedIndicesInfo info) {
logEdit(OperationType.OP_MODIFY_TABLE_ADD_OR_DROP_INVERTED_INDICES, info);
long logId = logEdit(OperationType.OP_MODIFY_TABLE_ADD_OR_DROP_INVERTED_INDICES, info);
LOG.info("walter log modify table add or drop inverted indices, infos: {}, json: {}",
info, info.toJson(), new RuntimeException("test"));
Env.getCurrentEnv().getBinlogManager().addModifyTableAddOrDropInvertedIndices(info, logId);
}

public void logIndexChangeJob(IndexChangeJob indexChangeJob) {
logEdit(OperationType.OP_INVERTED_INDEX_JOB, indexChangeJob);
long logId = logEdit(OperationType.OP_INVERTED_INDEX_JOB, indexChangeJob);
LOG.info("walter log inverted index job, infos: {}, json: {}",
indexChangeJob, indexChangeJob.toJson(), new RuntimeException("test"));
Env.getCurrentEnv().getBinlogManager().addIndexChangeJob(indexChangeJob, logId);
}

public void logCleanLabel(CleanLabelOperationLog log) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,10 @@ public long getJobId() {
return jobId;
}

public String toJson() {
return GsonUtils.GSON.toJson(this);
}

@Override
public void write(DataOutput out) throws IOException {
Text.writeString(out, GsonUtils.GSON.toJson(this));
Expand Down
6 changes: 3 additions & 3 deletions gensrc/thrift/FrontendService.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -1191,6 +1191,8 @@ enum TBinlogType {
MODIFY_COMMENT = 16,
MODIFY_VIEW_DEF = 17,
REPLACE_TABLE = 18,
MODIFY_TABLE_ADD_OR_DROP_INVERTED_INDICES = 19,
INDEX_CHANGE_JOB = 20,

// Keep some IDs for allocation so that when new binlog types are added in the
// future, the changes can be picked back to the old versions without breaking
Expand All @@ -1207,9 +1209,7 @@ enum TBinlogType {
// MODIFY_XXX = 17,
// MIN_UNKNOWN = 18,
// UNKNOWN_3 = 19,
MIN_UNKNOWN = 19,
UNKNOWN_4 = 20,
UNKNOWN_5 = 21,
MIN_UNKNOWN = 21,
UNKNOWN_6 = 22,
UNKNOWN_7 = 23,
UNKNOWN_8 = 24,
Expand Down

0 comments on commit bd5e7d0

Please sign in to comment.