Skip to content

Commit

Permalink
[fix](backup) Load backup meta and job info bytes from disk (#43276)
Browse files Browse the repository at this point in the history
1. the metaInfoBytes and jobInfoBytes do not persist and are not
reloaded from disks during image loading and journal replaying. This PR
fixes this issue by reading them from files.
2. In some scenes, the metaInfoBytes and jobInfoBytes are large and
consume a lot of memory. Reading them from files on demand can reduce
memory usage.
  • Loading branch information
w41ter committed Nov 8, 2024
1 parent 4b3aa2e commit 94d65d9
Show file tree
Hide file tree
Showing 5 changed files with 85 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,8 @@ public void setTypeRead(boolean isTypeRead) {

public abstract boolean isCancelled();

public abstract boolean isFinished();

public static AbstractJob read(DataInput in) throws IOException {
AbstractJob job = null;
JobType type = JobType.valueOf(Text.readString(in));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,10 +108,10 @@ public class BackupHandler extends MasterDaemon implements Writable {

private Env env;

// map to store backup info, key is label name, value is Pair<meta, info>, meta && info is bytes
// this map not present in persist && only in fe master memory
// map to store backup info, key is label name, value is the BackupJob
// this map not present in persist && only in fe memory
// one table only keep one snapshot info, only keep last
private final Map<String, Snapshot> localSnapshots = new HashMap<>();
private final Map<String, BackupJob> localSnapshots = new HashMap<>();
private ReadWriteLock localSnapshotsLock = new ReentrantReadWriteLock();

public BackupHandler() {
Expand Down Expand Up @@ -166,6 +166,7 @@ private boolean init() {
return false;
}
}

isInit = true;
return true;
}
Expand Down Expand Up @@ -484,11 +485,15 @@ private void addBackupOrRestoreJob(long dbId, AbstractJob job) {
return;
}

List<String> removedLabels = Lists.newArrayList();
jobLock.lock();
try {
Deque<AbstractJob> jobs = dbIdToBackupOrRestoreJobs.computeIfAbsent(dbId, k -> Lists.newLinkedList());
while (jobs.size() >= Config.max_backup_restore_job_num_per_db) {
jobs.removeFirst();
AbstractJob removedJob = jobs.removeFirst();
if (removedJob instanceof BackupJob && ((BackupJob) removedJob).isLocalSnapshot()) {
removedLabels.add(removedJob.getLabel());
}
}
AbstractJob lastJob = jobs.peekLast();

Expand All @@ -501,6 +506,17 @@ private void addBackupOrRestoreJob(long dbId, AbstractJob job) {
} finally {
jobLock.unlock();
}

if (job.isFinished() && job instanceof BackupJob) {
// Save snapshot to local repo, when reload backupHandler from image.
BackupJob backupJob = (BackupJob) job;
if (backupJob.isLocalSnapshot()) {
addSnapshot(backupJob.getLabel(), backupJob);
}
}
for (String label : removedLabels) {
removeSnapshot(label);
}
}

private List<AbstractJob> getAllCurrentJobs() {
Expand Down Expand Up @@ -737,22 +753,42 @@ public boolean report(TTaskType type, long jobId, long taskId, int finishedNum,
return false;
}

public void addSnapshot(String labelName, Snapshot snapshot) {
public void addSnapshot(String labelName, BackupJob backupJob) {
assert backupJob.isFinished();

LOG.info("add snapshot {} to local repo", labelName);
localSnapshotsLock.writeLock().lock();
try {
localSnapshots.put(labelName, snapshot);
localSnapshots.put(labelName, backupJob);
} finally {
localSnapshotsLock.writeLock().unlock();
}
}

public void removeSnapshot(String labelName) {
LOG.info("remove snapshot {} from local repo", labelName);
localSnapshotsLock.writeLock().lock();
try {
localSnapshots.remove(labelName);
} finally {
localSnapshotsLock.writeLock().unlock();
}
}

public Snapshot getSnapshot(String labelName) {
BackupJob backupJob;
localSnapshotsLock.readLock().lock();
try {
return localSnapshots.get(labelName);
backupJob = localSnapshots.get(labelName);
} finally {
localSnapshotsLock.readLock().unlock();
}

if (backupJob == null) {
return null;
}

return backupJob.getSnapshot();
}

public static BackupHandler read(DataInput in) throws IOException {
Expand Down
45 changes: 30 additions & 15 deletions fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -123,9 +123,6 @@ public enum BackupJobState {
// backup properties && table commit seq with table id
private Map<String, String> properties = Maps.newHashMap();

private byte[] metaInfoBytes = null;
private byte[] jobInfoBytes = null;

public BackupJob() {
super(JobType.BACKUP);
}
Expand Down Expand Up @@ -333,11 +330,7 @@ public synchronized boolean finishSnapshotUploadTask(UploadTask task, TFinishTas

@Override
public synchronized void replayRun() {
LOG.info("replay run backup job: {}", this);
if (state == BackupJobState.FINISHED && repoId == Repository.KEEP_ON_LOCAL_REPO_ID) {
Snapshot snapshot = new Snapshot(label, metaInfoBytes, jobInfoBytes);
env.getBackupHandler().addSnapshot(label, snapshot);
}
// nothing to do
}

@Override
Expand All @@ -355,6 +348,11 @@ public boolean isCancelled() {
return state == BackupJobState.CANCELLED;
}

@Override
public boolean isFinished() {
return state == BackupJobState.FINISHED;
}

// Polling the job state and do the right things.
@Override
public synchronized void run() {
Expand Down Expand Up @@ -792,8 +790,6 @@ private void saveMetaInfo() {
}
backupMeta.writeToFile(metaInfoFile);
localMetaInfoFilePath = metaInfoFile.getAbsolutePath();
// read meta info to metaInfoBytes
metaInfoBytes = Files.readAllBytes(metaInfoFile.toPath());

// 3. save job info file
Map<Long, Long> tableCommitSeqMap = Maps.newHashMap();
Expand All @@ -818,8 +814,6 @@ private void saveMetaInfo() {
}
jobInfo.writeToFile(jobInfoFile);
localJobInfoFilePath = jobInfoFile.getAbsolutePath();
// read job info to jobInfoBytes
jobInfoBytes = Files.readAllBytes(jobInfoFile.toPath());
} catch (Exception e) {
status = new Status(ErrCode.COMMON_ERROR, "failed to save meta info and job info file: " + e.getMessage());
return;
Expand Down Expand Up @@ -873,7 +867,6 @@ private void uploadMetaAndJobInfoFile() {
}
}


finishedTime = System.currentTimeMillis();
state = BackupJobState.FINISHED;

Expand All @@ -882,8 +875,7 @@ private void uploadMetaAndJobInfoFile() {
LOG.info("job is finished. {}", this);

if (repoId == Repository.KEEP_ON_LOCAL_REPO_ID) {
Snapshot snapshot = new Snapshot(label, metaInfoBytes, jobInfoBytes);
env.getBackupHandler().addSnapshot(label, snapshot);
env.getBackupHandler().addSnapshot(label, this);
return;
}
}
Expand Down Expand Up @@ -976,6 +968,29 @@ private void cancelInternal() {
LOG.info("finished to cancel backup job. current state: {}. {}", curState.name(), this);
}

public boolean isLocalSnapshot() {
return repoId == Repository.KEEP_ON_LOCAL_REPO_ID;
}

// read meta and job info bytes from disk, and return the snapshot
public synchronized Snapshot getSnapshot() {
if (state != BackupJobState.FINISHED || repoId != Repository.KEEP_ON_LOCAL_REPO_ID) {
return null;
}

try {
File metaInfoFile = new File(localMetaInfoFilePath);
File jobInfoFile = new File(localJobInfoFilePath);
byte[] metaInfoBytes = Files.readAllBytes(metaInfoFile.toPath());
byte[] jobInfoBytes = Files.readAllBytes(jobInfoFile.toPath());
return new Snapshot(label, metaInfoBytes, jobInfoBytes);
} catch (IOException e) {
LOG.warn("failed to load meta info and job info file, meta info file {}, job info file {}: ",
localMetaInfoFilePath, localJobInfoFilePath, e);
return null;
}
}

public synchronized List<String> getInfo() {
List<String> info = Lists.newArrayList();
info.add(String.valueOf(jobId));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,11 @@ public boolean isCancelled() {
return state == RestoreJobState.CANCELLED;
}

@Override
public boolean isFinished() {
return state == RestoreJobState.FINISHED;
}

@Override
public synchronized void run() {
if (state == RestoreJobState.FINISHED || state == RestoreJobState.CANCELLED) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2803,15 +2803,18 @@ private TGetSnapshotResult getSnapshotImpl(TGetSnapshotRequest request, String c
}

// Step 3: get snapshot
String label = request.getLabelName();
TGetSnapshotResult result = new TGetSnapshotResult();
result.setStatus(new TStatus(TStatusCode.OK));
Snapshot snapshot = Env.getCurrentEnv().getBackupHandler().getSnapshot(request.getLabelName());
Snapshot snapshot = Env.getCurrentEnv().getBackupHandler().getSnapshot(label);
if (snapshot == null) {
result.getStatus().setStatusCode(TStatusCode.SNAPSHOT_NOT_EXIST);
result.getStatus().addToErrorMsgs("snapshot not exist");
result.getStatus().addToErrorMsgs(String.format("snapshot %s not exist", label));
} else {
result.setMeta(snapshot.getMeta());
result.setJobInfo(snapshot.getJobInfo());
LOG.info("get snapshot info, snapshot: {}, meta size: {}, job info size: {}",
label, snapshot.getMeta().length, snapshot.getJobInfo().length);
}

return result;
Expand Down

0 comments on commit 94d65d9

Please sign in to comment.