Skip to content

Commit

Permalink
[improve](backup) Add the commit seq at the backup job level. (#44049)
Browse files Browse the repository at this point in the history
Related PR: selectdb/ccr-syncer#237

Problem Summary:

When a backup job is created, it will read the database and obtain a set
of table refs. If there is a table creation statement running
simultaneously and the new table is not included in the table refs, then
the job commit seq obtained by the CCR syncer only through the commit
seq of the tables included in the snapshot may be greater than that of
the new table. Consequently, the table will be lost during the
synchronization process.

This PR acquires a commit seq within the database read lock as the
commit seq of the entire snapshot. In this way, the CCR syncer can
continue to synchronize the binlog starting from this value. Thus, the
table creation statements running in parallel with the backup job will
either be included in the table refs or their binlog can be observed by
the snapshot commit sequence.
  • Loading branch information
w41ter committed Nov 18, 2024
1 parent f20aff5 commit f457e85
Show file tree
Hide file tree
Showing 8 changed files with 85 additions and 23 deletions.
43 changes: 32 additions & 11 deletions fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.fs.FileSystemFactory;
import org.apache.doris.fs.remote.RemoteFileSystem;
import org.apache.doris.persist.BarrierLog;
import org.apache.doris.task.DirMoveTask;
import org.apache.doris.task.DownloadTask;
import org.apache.doris.task.SnapshotTask;
Expand Down Expand Up @@ -308,20 +309,32 @@ private void backup(Repository repository, Database db, BackupStmt stmt) throws
+ " is read only");
}

// Determine the tables to be backed up
long commitSeq = 0;
Set<String> tableNames = Sets.newHashSet();
AbstractBackupTableRefClause abstractBackupTableRefClause = stmt.getAbstractBackupTableRefClause();
if (abstractBackupTableRefClause == null) {
tableNames = db.getTableNamesWithLock();
} else if (abstractBackupTableRefClause.isExclude()) {
tableNames = db.getTableNamesWithLock();
for (TableRef tableRef : abstractBackupTableRefClause.getTableRefList()) {
if (!tableNames.remove(tableRef.getName().getTbl())) {
LOG.info("exclude table " + tableRef.getName().getTbl()
+ " of backup stmt is not exists in db " + db.getFullName());

// Obtain the snapshot commit seq, any creating table binlog will be visible.
db.readLock();
try {
BarrierLog log = new BarrierLog(db.getId(), db.getFullName());
commitSeq = env.getEditLog().logBarrier(log);

// Determine the tables to be backed up
if (abstractBackupTableRefClause == null) {
tableNames = db.getTableNames();
} else if (abstractBackupTableRefClause.isExclude()) {
tableNames = db.getTableNames();
for (TableRef tableRef : abstractBackupTableRefClause.getTableRefList()) {
if (!tableNames.remove(tableRef.getName().getTbl())) {
LOG.info("exclude table " + tableRef.getName().getTbl()
+ " of backup stmt is not exists in db " + db.getFullName());
}
}
}
} finally {
db.readUnlock();
}

List<TableRef> tblRefs = Lists.newArrayList();
if (abstractBackupTableRefClause != null && !abstractBackupTableRefClause.isExclude()) {
tblRefs = abstractBackupTableRefClause.getTableRefList();
Expand All @@ -338,6 +351,14 @@ private void backup(Repository repository, Database db, BackupStmt stmt) throws
for (TableRef tblRef : tblRefs) {
String tblName = tblRef.getName().getTbl();
Table tbl = db.getTableOrDdlException(tblName);

// filter the table types which are not supported by local backup.
if (repository == null && tbl.getType() != TableType.OLAP
&& tbl.getType() != TableType.VIEW && tbl.getType() != TableType.MATERIALIZED_VIEW) {
tblRefsNotSupport.add(tblRef);
continue;
}

if (tbl.getType() == TableType.VIEW || tbl.getType() == TableType.ODBC) {
continue;
}
Expand Down Expand Up @@ -374,7 +395,7 @@ private void backup(Repository repository, Database db, BackupStmt stmt) throws
}

// Check if label already be used
long repoId = -1;
long repoId = Repository.KEEP_ON_LOCAL_REPO_ID;
if (repository != null) {
List<String> existSnapshotNames = Lists.newArrayList();
Status st = repository.listSnapshots(existSnapshotNames);
Expand All @@ -396,7 +417,7 @@ private void backup(Repository repository, Database db, BackupStmt stmt) throws
// Create a backup job
BackupJob backupJob = new BackupJob(stmt.getLabel(), db.getId(),
ClusterNamespace.getNameFromFullName(db.getFullName()),
tblRefs, stmt.getTimeoutMs(), stmt.getContent(), env, repoId);
tblRefs, stmt.getTimeoutMs(), stmt.getContent(), env, repoId, commitSeq);
// write log
env.getEditLog().logBackupJob(backupJob);

Expand Down
34 changes: 27 additions & 7 deletions fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.doris.catalog.View;
import org.apache.doris.common.Config;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.util.DebugPointUtil;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.datasource.property.S3ClientBEProperties;
import org.apache.doris.persist.BarrierLog;
Expand Down Expand Up @@ -85,6 +86,7 @@
public class BackupJob extends AbstractJob {
private static final Logger LOG = LogManager.getLogger(BackupJob.class);
private static final String TABLE_COMMIT_SEQ_PREFIX = "table_commit_seq:";
private static final String SNAPSHOT_COMMIT_SEQ = "commit_seq";

public enum BackupJobState {
PENDING, // Job is newly created. Send snapshot tasks and save copied meta info, then transfer to SNAPSHOTING
Expand Down Expand Up @@ -123,6 +125,8 @@ public enum BackupJobState {
// backup properties && table commit seq with table id
private Map<String, String> properties = Maps.newHashMap();

private long commitSeq = 0;

public BackupJob() {
super(JobType.BACKUP);
}
Expand All @@ -133,11 +137,13 @@ public BackupJob(JobType jobType) {
}

public BackupJob(String label, long dbId, String dbName, List<TableRef> tableRefs, long timeoutMs,
BackupContent content, Env env, long repoId) {
BackupContent content, Env env, long repoId, long commitSeq) {
super(JobType.BACKUP, label, dbId, dbName, timeoutMs, env, repoId);
this.tableRefs = tableRefs;
this.state = BackupJobState.PENDING;
this.commitSeq = commitSeq;
properties.put(BackupStmt.PROP_CONTENT, content.name());
properties.put(SNAPSHOT_COMMIT_SEQ, String.valueOf(commitSeq));
}

public BackupJobState getState() {
Expand Down Expand Up @@ -237,7 +243,7 @@ public synchronized boolean finishTabletSnapshotTask(SnapshotTask task, TFinishT
if (request.getTaskStatus().getStatusCode() == TStatusCode.TABLET_MISSING
&& !tryNewTabletSnapshotTask(task)) {
status = new Status(ErrCode.NOT_FOUND,
"make snapshot failed, failed to ge tablet, table will be droped or truncated");
"make snapshot failed, failed to ge tablet, table will be dropped or truncated");
cancelInternal();
}

Expand Down Expand Up @@ -379,6 +385,14 @@ public synchronized void run() {

LOG.debug("run backup job: {}", this);

if (state == BackupJobState.PENDING) {
String pausedLabel = DebugPointUtil.getDebugParamOrDefault("FE.PAUSE_PENDING_BACKUP_JOB", "");
if (!pausedLabel.isEmpty() && label.startsWith(pausedLabel)) {
LOG.info("pause pending backup job by debug point: {}", this);
return;
}
}

// run job base on current state
switch (state) {
case PENDING:
Expand Down Expand Up @@ -526,7 +540,7 @@ private void checkOlapTable(OlapTable olapTable, TableRef backupTableRef) {

private void prepareSnapshotTaskForOlapTableWithoutLock(Database db, OlapTable olapTable,
TableRef backupTableRef, AgentBatchTask batchTask) {
// Add barrier editolog for barrier commit seq
// Add barrier editlog for barrier commit seq
long dbId = db.getId();
String dbName = db.getFullName();
long tableId = olapTable.getId();
Expand Down Expand Up @@ -656,13 +670,11 @@ private void removeUnsupportProperties(OlapTable tbl) {

private void waitingAllSnapshotsFinished() {
if (unfinishedTaskIds.isEmpty()) {

if (env.getEditLog().exceedMaxJournalSize(this)) {
status = new Status(ErrCode.COMMON_ERROR, "backupJob is too large ");
return;
}


snapshotFinishedTime = System.currentTimeMillis();
state = BackupJobState.UPLOAD_SNAPSHOT;

Expand Down Expand Up @@ -972,6 +984,10 @@ public boolean isLocalSnapshot() {
return repoId == Repository.KEEP_ON_LOCAL_REPO_ID;
}

public long getCommitSeq() {
return commitSeq;
}

// read meta and job info bytes from disk, and return the snapshot
public synchronized Snapshot getSnapshot() {
if (state != BackupJobState.FINISHED || repoId != Repository.KEEP_ON_LOCAL_REPO_ID) {
Expand All @@ -981,15 +997,15 @@ public synchronized Snapshot getSnapshot() {
// Avoid loading expired meta.
long expiredAt = createTime + timeoutMs;
if (System.currentTimeMillis() >= expiredAt) {
return new Snapshot(label, new byte[0], new byte[0], expiredAt);
return new Snapshot(label, new byte[0], new byte[0], expiredAt, commitSeq);
}

try {
File metaInfoFile = new File(localMetaInfoFilePath);
File jobInfoFile = new File(localJobInfoFilePath);
byte[] metaInfoBytes = Files.readAllBytes(metaInfoFile.toPath());
byte[] jobInfoBytes = Files.readAllBytes(jobInfoFile.toPath());
return new Snapshot(label, metaInfoBytes, jobInfoBytes, expiredAt);
return new Snapshot(label, metaInfoBytes, jobInfoBytes, expiredAt, commitSeq);
} catch (IOException e) {
LOG.warn("failed to load meta info and job info file, meta info file {}, job info file {}: ",
localMetaInfoFilePath, localJobInfoFilePath, e);
Expand Down Expand Up @@ -1182,6 +1198,10 @@ public void readOthers(DataInput in) throws IOException {
String value = Text.readString(in);
properties.put(key, value);
}

if (properties.containsKey(SNAPSHOT_COMMIT_SEQ)) {
commitSeq = Long.parseLong(properties.get(SNAPSHOT_COMMIT_SEQ));
}
}

@Override
Expand Down
11 changes: 10 additions & 1 deletion fe/fe-core/src/main/java/org/apache/doris/backup/Snapshot.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,18 @@ public class Snapshot {
@SerializedName(value = "expired_at")
private long expiredAt = 0;

@SerializedName(value = "commitSeq")
private long commitSeq = 0;

public Snapshot() {
}

public Snapshot(String label, byte[] meta, byte[] jobInfo, long expiredAt) {
public Snapshot(String label, byte[] meta, byte[] jobInfo, long expiredAt, long commitSeq) {
this.label = label;
this.meta = meta;
this.jobInfo = jobInfo;
this.expiredAt = expiredAt;
this.commitSeq = commitSeq;
}

public byte[] getMeta() {
Expand All @@ -60,6 +64,10 @@ public boolean isExpired() {
return System.currentTimeMillis() > expiredAt;
}

public long getCommitSeq() {
return commitSeq;
}

public String toJson() {
return GsonUtils.GSON.toJson(this);
}
Expand All @@ -71,6 +79,7 @@ public String toString() {
+ ", meta=" + meta
+ ", jobInfo=" + jobInfo
+ ", expiredAt=" + expiredAt
+ ", commitSeq=" + commitSeq
+ '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -518,6 +518,10 @@ public Set<String> getTableNamesWithLock() {
}
}

public Set<String> getTableNames() {
return new HashSet<>(this.nameToTable.keySet());
}

/**
* This is a thread-safe method when nameToTable is a concurrent hash map
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@ public class BarrierLog implements Writable {
public BarrierLog() {
}

public BarrierLog(long dbId, String dbName) {
this.dbId = dbId;
this.dbName = dbName;
}

public BarrierLog(long dbId, String dbName, long tableId, String tableName) {
this.dbId = dbId;
this.dbName = dbName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2820,9 +2820,10 @@ private TGetSnapshotResult getSnapshotImpl(TGetSnapshotRequest request, String c
byte[] meta = snapshot.getMeta();
byte[] jobInfo = snapshot.getJobInfo();
long expiredAt = snapshot.getExpiredAt();
long commitSeq = snapshot.getCommitSeq();

LOG.info("get snapshot info, snapshot: {}, meta size: {}, job info size: {}, expired at: {}",
label, meta.length, jobInfo.length, expiredAt);
LOG.info("get snapshot info, snapshot: {}, meta size: {}, job info size: {}, "
+ "expired at: {}, commit seq: {}", label, meta.length, jobInfo.length, expiredAt, commitSeq);
if (request.isEnableCompress()) {
meta = GZIPUtils.compress(meta);
jobInfo = GZIPUtils.compress(jobInfo);
Expand All @@ -2835,6 +2836,7 @@ private TGetSnapshotResult getSnapshotImpl(TGetSnapshotRequest request, String c
result.setMeta(meta);
result.setJobInfo(jobInfo);
result.setExpiredAt(expiredAt);
result.setCommitSeq(commitSeq);
}

return result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ Status getBrokerAddress(Long beId, Env env, List<FsBroker> brokerAddrs) {
new TableName(InternalCatalog.INTERNAL_CATALOG_NAME, UnitTestUtil.DB_NAME, UnitTestUtil.TABLE_NAME),
null));
job = new BackupJob("label", dbId, UnitTestUtil.DB_NAME, tableRefs, 13600 * 1000, BackupStmt.BackupContent.ALL,
env, repo.getId());
env, repo.getId(), 0);
}

@Test
Expand Down Expand Up @@ -348,7 +348,7 @@ public void testRunAbnormal() {
new TableRef(new TableName(InternalCatalog.INTERNAL_CATALOG_NAME, UnitTestUtil.DB_NAME, "unknown_tbl"),
null));
job = new BackupJob("label", dbId, UnitTestUtil.DB_NAME, tableRefs, 13600 * 1000, BackupStmt.BackupContent.ALL,
env, repo.getId());
env, repo.getId(), 0);
job.run();
Assert.assertEquals(Status.ErrCode.NOT_FOUND, job.getStatus().getErrCode());
Assert.assertEquals(BackupJobState.CANCELLED, job.getState());
Expand Down
1 change: 1 addition & 0 deletions gensrc/thrift/FrontendService.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -1088,6 +1088,7 @@ struct TGetSnapshotResult {
4: optional Types.TNetworkAddress master_address
5: optional bool compressed;
6: optional i64 expiredAt; // in millis
7: optional i64 commit_seq;
}

struct TTableRef {
Expand Down

0 comments on commit f457e85

Please sign in to comment.