From 0e03cdce97fe38e2c428ae251f5471bb1b8331bd Mon Sep 17 00:00:00 2001
From: walter
Date: Sat, 16 Nov 2024 01:13:17 +0800
Subject: [PATCH] [improve](backup) Add the commit seq at the backup job
 level. (#44049)

Related PR: https://github.com/selectdb/ccr-syncer/pull/237

Problem Summary:

When a backup job is created, it reads the database and obtains a set of
table refs. If a CREATE TABLE statement runs at the same time and the new
table is not included in the table refs, the job commit seq that the CCR
syncer derives solely from the commit seqs of the tables contained in the
snapshot may be greater than the new table's commit seq, and the table is
then lost during synchronization.

This PR acquires a commit seq while holding the database read lock and uses
it as the commit seq of the entire snapshot, so the CCR syncer can resume
binlog synchronization from this value. A CREATE TABLE statement running in
parallel with the backup job is therefore either included in the table refs
or has its binlog visible after the snapshot commit seq.
---
 .../apache/doris/backup/BackupHandler.java    | 43 ++++++++++++++-----
 .../org/apache/doris/backup/BackupJob.java    | 34 ++++++++++++---
 .../org/apache/doris/backup/Snapshot.java     | 11 ++++-
 .../org/apache/doris/catalog/Database.java    |  4 ++
 .../org/apache/doris/persist/BarrierLog.java  |  5 +++
 .../doris/service/FrontendServiceImpl.java    |  6 ++-
 .../apache/doris/backup/BackupJobTest.java    |  4 +-
 gensrc/thrift/FrontendService.thrift          |  1 +
 8 files changed, 85 insertions(+), 23 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java
index 3015e366c3e11b..61d1e9e5f1347e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java
@@ -49,6 +49,7 @@
 import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.fs.FileSystemFactory;
 import org.apache.doris.fs.remote.RemoteFileSystem;
+import org.apache.doris.persist.BarrierLog;
 import org.apache.doris.task.DirMoveTask;
 import org.apache.doris.task.DownloadTask;
 import org.apache.doris.task.SnapshotTask;
@@ -308,20 +309,32 @@ private void backup(Repository repository, Database db, BackupStmt stmt) throws
                     + " is read only");
         }
 
-        // Determine the tables to be backed up
+        long commitSeq = 0;
         Set<String> tableNames = Sets.newHashSet();
         AbstractBackupTableRefClause abstractBackupTableRefClause = stmt.getAbstractBackupTableRefClause();
-        if (abstractBackupTableRefClause == null) {
-            tableNames = db.getTableNamesWithLock();
-        } else if (abstractBackupTableRefClause.isExclude()) {
-            tableNames = db.getTableNamesWithLock();
-            for (TableRef tableRef : abstractBackupTableRefClause.getTableRefList()) {
-                if (!tableNames.remove(tableRef.getName().getTbl())) {
-                    LOG.info("exclude table " + tableRef.getName().getTbl()
-                            + " of backup stmt is not exists in db " + db.getFullName());
+
+        // Obtain the snapshot commit seq; the binlog of any concurrently created table is visible after it.
+        db.readLock();
+        try {
+            BarrierLog log = new BarrierLog(db.getId(), db.getFullName());
+            commitSeq = env.getEditLog().logBarrier(log);
+
+            // Determine the tables to be backed up
+            if (abstractBackupTableRefClause == null) {
+                tableNames = db.getTableNames();
+            } else if (abstractBackupTableRefClause.isExclude()) {
+                tableNames = db.getTableNames();
+                for (TableRef tableRef : abstractBackupTableRefClause.getTableRefList()) {
+                    if (!tableNames.remove(tableRef.getName().getTbl())) {
+                        LOG.info("exclude table " + tableRef.getName().getTbl()
+                                + " of backup stmt does not exist in db " + db.getFullName());
+                    }
                 }
             }
+        } finally {
+            db.readUnlock();
         }
+
         List<TableRef> tblRefs = Lists.newArrayList();
         if (abstractBackupTableRefClause != null && !abstractBackupTableRefClause.isExclude()) {
             tblRefs = abstractBackupTableRefClause.getTableRefList();
@@ -339,6 +352,14 @@ private void backup(Repository repository, Database db, BackupStmt stmt) throws
         for (TableRef tblRef : tblRefs) {
             String tblName = tblRef.getName().getTbl();
             Table tbl = db.getTableOrDdlException(tblName);
+
+            // filter the table types which are not supported by local backup.
+            if (repository == null && tbl.getType() != TableType.OLAP
+                    && tbl.getType() != TableType.VIEW && tbl.getType() != TableType.MATERIALIZED_VIEW) {
+                tblRefsNotSupport.add(tblRef);
+                continue;
+            }
+
             if (tbl.getType() == TableType.VIEW || tbl.getType() == TableType.ODBC) {
                 continue;
             }
@@ -385,7 +406,7 @@ private void backup(Repository repository, Database db, BackupStmt stmt) throws
         tblRefs.removeAll(tblRefsNotSupport);
 
         // Check if label already be used
-        long repoId = -1;
+        long repoId = Repository.KEEP_ON_LOCAL_REPO_ID;
         if (repository != null) {
             List<String> existSnapshotNames = Lists.newArrayList();
             Status st = repository.listSnapshots(existSnapshotNames);
@@ -407,7 +428,7 @@ private void backup(Repository repository, Database db, BackupStmt stmt) throws
         // Create a backup job
         BackupJob backupJob = new BackupJob(stmt.getLabel(), db.getId(),
                 ClusterNamespace.getNameFromFullName(db.getFullName()),
-                tblRefs, stmt.getTimeoutMs(), stmt.getContent(), env, repoId);
+                tblRefs, stmt.getTimeoutMs(), stmt.getContent(), env, repoId, commitSeq);
         // write log
         env.getEditLog().logBackupJob(backupJob);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
index 8a5043ebe7f61a..59365a24931407 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
@@ -36,6 +36,7 @@
 import org.apache.doris.catalog.View;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.io.Text;
+import org.apache.doris.common.util.DebugPointUtil;
 import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.datasource.property.S3ClientBEProperties;
 import org.apache.doris.persist.BarrierLog;
@@ -85,6 +86,7 @@ public class BackupJob extends AbstractJob {
     private static final Logger LOG = LogManager.getLogger(BackupJob.class);
 
     private static final String TABLE_COMMIT_SEQ_PREFIX = "table_commit_seq:";
+    private static final String SNAPSHOT_COMMIT_SEQ = "commit_seq";
 
     public enum BackupJobState {
         PENDING, // Job is newly created. Send snapshot tasks and save copied meta info, then transfer to SNAPSHOTING
@@ -123,6 +125,8 @@ public enum BackupJobState {
     // backup properties && table commit seq with table id
     private Map<String, String> properties = Maps.newHashMap();
 
+    private long commitSeq = 0;
+
     public BackupJob() {
         super(JobType.BACKUP);
     }
@@ -133,11 +137,13 @@ public BackupJob(JobType jobType) {
     }
 
     public BackupJob(String label, long dbId, String dbName, List<TableRef> tableRefs, long timeoutMs,
-            BackupContent content, Env env, long repoId) {
+            BackupContent content, Env env, long repoId, long commitSeq) {
         super(JobType.BACKUP, label, dbId, dbName, timeoutMs, env, repoId);
         this.tableRefs = tableRefs;
         this.state = BackupJobState.PENDING;
+        this.commitSeq = commitSeq;
         properties.put(BackupStmt.PROP_CONTENT, content.name());
+        properties.put(SNAPSHOT_COMMIT_SEQ, String.valueOf(commitSeq));
     }
 
     public BackupJobState getState() {
@@ -237,7 +243,7 @@ public synchronized boolean finishTabletSnapshotTask(SnapshotTask task, TFinishT
         if (request.getTaskStatus().getStatusCode() == TStatusCode.TABLET_MISSING
                 && !tryNewTabletSnapshotTask(task)) {
             status = new Status(ErrCode.NOT_FOUND,
-                    "make snapshot failed, failed to ge tablet, table will be droped or truncated");
+                    "make snapshot failed, failed to get tablet, table will be dropped or truncated");
             cancelInternal();
         }
@@ -379,6 +385,14 @@ public synchronized void run() {
         LOG.debug("run backup job: {}", this);
 
+        if (state == BackupJobState.PENDING) {
+            String pausedLabel = DebugPointUtil.getDebugParamOrDefault("FE.PAUSE_PENDING_BACKUP_JOB", "");
+            if (!pausedLabel.isEmpty() && label.startsWith(pausedLabel)) {
+                LOG.info("pause pending backup job by debug point: {}", this);
+                return;
+            }
+        }
+
         // run job base on current state
         switch (state) {
             case PENDING:
@@ -526,7 +540,7 @@ private void checkOlapTable(OlapTable olapTable, TableRef backupTableRef) {
 
     private void prepareSnapshotTaskForOlapTableWithoutLock(Database db, OlapTable olapTable,
             TableRef backupTableRef, AgentBatchTask batchTask) {
-        // Add barrier editolog for barrier commit seq
+        // Add barrier editlog for barrier commit seq
         long dbId = db.getId();
         String dbName = db.getFullName();
         long tableId = olapTable.getId();
@@ -656,13 +670,11 @@ private void removeUnsupportProperties(OlapTable tbl) {
 
     private void waitingAllSnapshotsFinished() {
         if (unfinishedTaskIds.isEmpty()) {
-
             if (env.getEditLog().exceedMaxJournalSize(this)) {
                 status = new Status(ErrCode.COMMON_ERROR, "backupJob is too large ");
                 return;
             }
-
             snapshotFinishedTime = System.currentTimeMillis();
             state = BackupJobState.UPLOAD_SNAPSHOT;
@@ -972,6 +984,10 @@ public boolean isLocalSnapshot() {
         return repoId == Repository.KEEP_ON_LOCAL_REPO_ID;
     }
 
+    public long getCommitSeq() {
+        return commitSeq;
+    }
+
     // read meta and job info bytes from disk, and return the snapshot
     public synchronized Snapshot getSnapshot() {
         if (state != BackupJobState.FINISHED || repoId != Repository.KEEP_ON_LOCAL_REPO_ID) {
@@ -981,7 +997,7 @@ public synchronized Snapshot getSnapshot() {
 
         // Avoid loading expired meta.
         long expiredAt = createTime + timeoutMs;
         if (System.currentTimeMillis() >= expiredAt) {
-            return new Snapshot(label, new byte[0], new byte[0], expiredAt);
+            return new Snapshot(label, new byte[0], new byte[0], expiredAt, commitSeq);
         }
 
         try {
@@ -989,7 +1005,7 @@ public synchronized Snapshot getSnapshot() {
             File metaInfoFile = new File(localMetaInfoFilePath);
             File jobInfoFile = new File(localJobInfoFilePath);
             byte[] metaInfoBytes = Files.readAllBytes(metaInfoFile.toPath());
             byte[] jobInfoBytes = Files.readAllBytes(jobInfoFile.toPath());
-            return new Snapshot(label, metaInfoBytes, jobInfoBytes, expiredAt);
+            return new Snapshot(label, metaInfoBytes, jobInfoBytes, expiredAt, commitSeq);
         } catch (IOException e) {
             LOG.warn("failed to load meta info and job info file, meta info file {}, job info file {}: ",
                     localMetaInfoFilePath, localJobInfoFilePath, e);
@@ -1182,6 +1198,10 @@ public void readOthers(DataInput in) throws IOException {
             String value = Text.readString(in);
             properties.put(key, value);
         }
+
+        if (properties.containsKey(SNAPSHOT_COMMIT_SEQ)) {
+            commitSeq = Long.parseLong(properties.get(SNAPSHOT_COMMIT_SEQ));
+        }
     }
 
     @Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/Snapshot.java b/fe/fe-core/src/main/java/org/apache/doris/backup/Snapshot.java
index c4c93548177ca5..a9f734dbc99220 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/Snapshot.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/Snapshot.java
@@ -34,14 +34,18 @@ public class Snapshot {
     @SerializedName(value = "expired_at")
     private long expiredAt = 0;
 
+    @SerializedName(value = "commitSeq")
+    private long commitSeq = 0;
+
     public Snapshot() {
     }
 
-    public Snapshot(String label, byte[] meta, byte[] jobInfo, long expiredAt) {
+    public Snapshot(String label, byte[] meta, byte[] jobInfo, long expiredAt, long commitSeq) {
         this.label = label;
         this.meta = meta;
         this.jobInfo = jobInfo;
         this.expiredAt = expiredAt;
+        this.commitSeq = commitSeq;
     }
 
     public byte[] getMeta() {
@@ -60,6 +64,10 @@ public boolean isExpired() {
         return System.currentTimeMillis() > expiredAt;
     }
 
+    public long getCommitSeq() {
+        return commitSeq;
+    }
+
     public String toJson() {
         return GsonUtils.GSON.toJson(this);
     }
@@ -71,6 +79,7 @@ public String toString() {
                 + ", meta=" + meta
                 + ", jobInfo=" + jobInfo
                 + ", expiredAt=" + expiredAt
+                + ", commitSeq=" + commitSeq
                 + '}';
     }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java
index b218109383230d..b81068d475c2de 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java
@@ -518,6 +518,10 @@ public Set<String> getTableNamesWithLock() {
         }
     }
 
+    public Set<String> getTableNames() {
+        return new HashSet<>(this.nameToTable.keySet());
+    }
+
     /**
      * This is a thread-safe method when nameToTable is a concurrent hash map
      */
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/BarrierLog.java b/fe/fe-core/src/main/java/org/apache/doris/persist/BarrierLog.java
index 2b4245b290c850..4a9ce13e03b3ed 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/BarrierLog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/BarrierLog.java
@@ -46,6 +46,11 @@ public class BarrierLog implements Writable {
     public BarrierLog() {
     }
 
+    public BarrierLog(long dbId, String dbName) {
+        this.dbId = dbId;
+        this.dbName = dbName;
+    }
+
     public BarrierLog(long dbId, String dbName, long tableId, String tableName) {
         this.dbId = dbId;
         this.dbName = dbName;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 8c521cc73b0b52..e67534d302b9c7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -2820,9 +2820,10 @@ private TGetSnapshotResult getSnapshotImpl(TGetSnapshotRequest request, String c
             byte[] meta = snapshot.getMeta();
             byte[] jobInfo = snapshot.getJobInfo();
             long expiredAt = snapshot.getExpiredAt();
+            long commitSeq = snapshot.getCommitSeq();
 
-            LOG.info("get snapshot info, snapshot: {}, meta size: {}, job info size: {}, expired at: {}",
-                    label, meta.length, jobInfo.length, expiredAt);
+            LOG.info("get snapshot info, snapshot: {}, meta size: {}, job info size: {}, "
+                    + "expired at: {}, commit seq: {}", label, meta.length, jobInfo.length, expiredAt, commitSeq);
             if (request.isEnableCompress()) {
                 meta = GZIPUtils.compress(meta);
                 jobInfo = GZIPUtils.compress(jobInfo);
@@ -2835,6 +2836,7 @@ private TGetSnapshotResult getSnapshotImpl(TGetSnapshotRequest request, String c
             result.setMeta(meta);
             result.setJobInfo(jobInfo);
             result.setExpiredAt(expiredAt);
+            result.setCommitSeq(commitSeq);
         }
 
         return result;
diff --git a/fe/fe-core/src/test/java/org/apache/doris/backup/BackupJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/backup/BackupJobTest.java
index 4e0eecda1fa7d3..dd9cb0752934b9 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/backup/BackupJobTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/backup/BackupJobTest.java
@@ -212,7 +212,7 @@ Status getBrokerAddress(Long beId, Env env, List<FsBroker> brokerAddrs) {
                 new TableName(InternalCatalog.INTERNAL_CATALOG_NAME, UnitTestUtil.DB_NAME, UnitTestUtil.TABLE_NAME),
                 null));
         job = new BackupJob("label", dbId, UnitTestUtil.DB_NAME, tableRefs, 13600 * 1000, BackupStmt.BackupContent.ALL,
-                env, repo.getId());
+                env, repo.getId(), 0);
     }
 
     @Test
@@ -348,7 +348,7 @@ public void testRunAbnormal() {
                 new TableRef(new TableName(InternalCatalog.INTERNAL_CATALOG_NAME, UnitTestUtil.DB_NAME, "unknown_tbl"),
                         null));
         job = new BackupJob("label", dbId, UnitTestUtil.DB_NAME, tableRefs, 13600 * 1000, BackupStmt.BackupContent.ALL,
-                env, repo.getId());
+                env, repo.getId(), 0);
         job.run();
         Assert.assertEquals(Status.ErrCode.NOT_FOUND, job.getStatus().getErrCode());
         Assert.assertEquals(BackupJobState.CANCELLED, job.getState());
diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift
index a75275bd9174c1..b58f8a42e8924d 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -1204,6 +1204,7 @@ struct TGetSnapshotResult {
    4: optional Types.TNetworkAddress master_address
    5: optional bool compressed;
    6: optional i64 expiredAt; // in millis
+   7: optional i64 commit_seq;
 }
 
 struct TTableRef {
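
Reviewer note: the correctness of the snapshot-level commit seq rests on an ordering argument. The barrier is logged while the backup job holds the database read lock, and CREATE TABLE mutates the database under the write lock, so a concurrently created table is either already present when the table names are copied, or its creation is logged after the barrier. The self-contained sketch below models that argument with simplified stand-ins (BarrierSketch, journalSeq, createTable, and beginBackup are illustrative names, not the actual Doris classes; in the real code logBarrier(BarrierLog) allocates the seq and the CREATE TABLE path holds the write lock):

    import java.util.HashSet;
    import java.util.Set;
    import java.util.concurrent.atomic.AtomicLong;
    import java.util.concurrent.locks.ReadWriteLock;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    // Simplified model of the locking in BackupHandler.backup(): the barrier seq is
    // allocated under the same lock that table creation takes in write mode.
    final class BarrierSketch {
        private final ReadWriteLock dbLock = new ReentrantReadWriteLock();
        private final AtomicLong journalSeq = new AtomicLong(); // stand-in for the edit log
        private final Set<String> tableNames = new HashSet<>();

        // Stand-in for CREATE TABLE: registers the name and logs it under the write lock.
        long createTable(String name) {
            dbLock.writeLock().lock();
            try {
                tableNames.add(name);
                return journalSeq.incrementAndGet(); // seq of the table's create binlog
            } finally {
                dbLock.writeLock().unlock();
            }
        }

        // Stand-in for the backup path: the barrier seq and the name copy are taken
        // inside one read-lock section, so no createTable() can interleave between them.
        SnapshotView beginBackup() {
            dbLock.readLock().lock();
            try {
                long commitSeq = journalSeq.incrementAndGet(); // barrier edit log entry
                return new SnapshotView(commitSeq, new HashSet<>(tableNames));
            } finally {
                dbLock.readLock().unlock();
            }
        }

        record SnapshotView(long commitSeq, Set<String> tables) {}
    }

Under this model, no interleaving of createTable() and beginBackup() can produce a table that is both missing from SnapshotView.tables and logged at or below SnapshotView.commitSeq, which is exactly the property the CCR syncer needs in order to resume from commit_seq without losing tables.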