Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix](backup) Load backup meta and job info bytes from disk #43276 #43518

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,8 @@ public void setTypeRead(boolean isTypeRead) {

/**
 * Reports whether this job has been cancelled.
 *
 * @return {@code true} when the job is in its cancelled state
 */
public abstract boolean isCancelled();

/**
 * Reports whether this job has finished.
 *
 * @return {@code true} when the job is in its finished state
 */
public abstract boolean isFinished();

public static AbstractJob read(DataInput in) throws IOException {
AbstractJob job = null;
JobType type = JobType.valueOf(Text.readString(in));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,10 +108,10 @@ public class BackupHandler extends MasterDaemon implements Writable {

private Env env;

// map to store backup info, key is label name, value is Pair<meta, info>, meta && info is bytes
// this map not present in persist && only in fe master memory
// Map storing backup info: the key is the label name, the value is the BackupJob.
// This map is not persisted; it exists only in FE memory.
// Each table keeps only one snapshot info: the most recent one.
private final Map<String, Snapshot> localSnapshots = new HashMap<>();
private final Map<String, BackupJob> localSnapshots = new HashMap<>();
private ReadWriteLock localSnapshotsLock = new ReentrantReadWriteLock();

public BackupHandler() {
Expand Down Expand Up @@ -166,6 +166,7 @@ private boolean init() {
return false;
}
}

isInit = true;
return true;
}
Expand Down Expand Up @@ -484,11 +485,15 @@ private void addBackupOrRestoreJob(long dbId, AbstractJob job) {
return;
}

List<String> removedLabels = Lists.newArrayList();
jobLock.lock();
try {
Deque<AbstractJob> jobs = dbIdToBackupOrRestoreJobs.computeIfAbsent(dbId, k -> Lists.newLinkedList());
while (jobs.size() >= Config.max_backup_restore_job_num_per_db) {
jobs.removeFirst();
AbstractJob removedJob = jobs.removeFirst();
if (removedJob instanceof BackupJob && ((BackupJob) removedJob).isLocalSnapshot()) {
removedLabels.add(removedJob.getLabel());
}
}
AbstractJob lastJob = jobs.peekLast();

Expand All @@ -501,6 +506,17 @@ private void addBackupOrRestoreJob(long dbId, AbstractJob job) {
} finally {
jobLock.unlock();
}

if (job.isFinished() && job instanceof BackupJob) {
// Save snapshot to local repo, when reload backupHandler from image.
BackupJob backupJob = (BackupJob) job;
if (backupJob.isLocalSnapshot()) {
addSnapshot(backupJob.getLabel(), backupJob);
}
}
for (String label : removedLabels) {
removeSnapshot(label);
}
}

private List<AbstractJob> getAllCurrentJobs() {
Expand Down Expand Up @@ -737,22 +753,42 @@ public boolean report(TTaskType type, long jobId, long taskId, int finishedNum,
return false;
}

public void addSnapshot(String labelName, Snapshot snapshot) {
public void addSnapshot(String labelName, BackupJob backupJob) {
assert backupJob.isFinished();

LOG.info("add snapshot {} to local repo", labelName);
localSnapshotsLock.writeLock().lock();
try {
localSnapshots.put(labelName, snapshot);
localSnapshots.put(labelName, backupJob);
} finally {
localSnapshotsLock.writeLock().unlock();
}
}

/**
 * Removes the entry registered under the given label from the local snapshot map.
 *
 * <p>The removal is performed while holding the write lock of {@code localSnapshotsLock}.
 *
 * @param labelName label whose entry should be removed
 */
public void removeSnapshot(String labelName) {
    LOG.info("remove snapshot {} from local repo", labelName);

    // Hold the lock handle in a local so lock() and unlock() visibly act on the same object.
    final java.util.concurrent.locks.Lock exclusive = localSnapshotsLock.writeLock();
    exclusive.lock();
    try {
        localSnapshots.remove(labelName);
    } finally {
        exclusive.unlock();
    }
}

public Snapshot getSnapshot(String labelName) {
BackupJob backupJob;
localSnapshotsLock.readLock().lock();
try {
return localSnapshots.get(labelName);
backupJob = localSnapshots.get(labelName);
} finally {
localSnapshotsLock.readLock().unlock();
}

if (backupJob == null) {
return null;
}

return backupJob.getSnapshot();
}

public static BackupHandler read(DataInput in) throws IOException {
Expand Down
45 changes: 30 additions & 15 deletions fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -123,9 +123,6 @@ public enum BackupJobState {
// backup properties && table commit seq with table id
private Map<String, String> properties = Maps.newHashMap();

private byte[] metaInfoBytes = null;
private byte[] jobInfoBytes = null;

public BackupJob() {
super(JobType.BACKUP);
}
Expand Down Expand Up @@ -333,11 +330,7 @@ public synchronized boolean finishSnapshotUploadTask(UploadTask task, TFinishTas

@Override
public synchronized void replayRun() {
LOG.info("replay run backup job: {}", this);
if (state == BackupJobState.FINISHED && repoId == Repository.KEEP_ON_LOCAL_REPO_ID) {
Snapshot snapshot = new Snapshot(label, metaInfoBytes, jobInfoBytes);
env.getBackupHandler().addSnapshot(label, snapshot);
}
// nothing to do
}

@Override
Expand All @@ -355,6 +348,11 @@ public boolean isCancelled() {
return state == BackupJobState.CANCELLED;
}

/**
 * Reports whether this backup job has reached {@code BackupJobState.FINISHED}.
 *
 * @return {@code true} when the current state is FINISHED
 */
@Override
public boolean isFinished() {
    return BackupJobState.FINISHED == state;
}

// Polling the job state and do the right things.
@Override
public synchronized void run() {
Expand Down Expand Up @@ -792,8 +790,6 @@ private void saveMetaInfo() {
}
backupMeta.writeToFile(metaInfoFile);
localMetaInfoFilePath = metaInfoFile.getAbsolutePath();
// read meta info to metaInfoBytes
metaInfoBytes = Files.readAllBytes(metaInfoFile.toPath());

// 3. save job info file
Map<Long, Long> tableCommitSeqMap = Maps.newHashMap();
Expand All @@ -818,8 +814,6 @@ private void saveMetaInfo() {
}
jobInfo.writeToFile(jobInfoFile);
localJobInfoFilePath = jobInfoFile.getAbsolutePath();
// read job info to jobInfoBytes
jobInfoBytes = Files.readAllBytes(jobInfoFile.toPath());
} catch (Exception e) {
status = new Status(ErrCode.COMMON_ERROR, "failed to save meta info and job info file: " + e.getMessage());
return;
Expand Down Expand Up @@ -873,7 +867,6 @@ private void uploadMetaAndJobInfoFile() {
}
}


finishedTime = System.currentTimeMillis();
state = BackupJobState.FINISHED;

Expand All @@ -882,8 +875,7 @@ private void uploadMetaAndJobInfoFile() {
LOG.info("job is finished. {}", this);

if (repoId == Repository.KEEP_ON_LOCAL_REPO_ID) {
Snapshot snapshot = new Snapshot(label, metaInfoBytes, jobInfoBytes);
env.getBackupHandler().addSnapshot(label, snapshot);
env.getBackupHandler().addSnapshot(label, this);
return;
}
}
Expand Down Expand Up @@ -976,6 +968,29 @@ private void cancelInternal() {
LOG.info("finished to cancel backup job. current state: {}. {}", curState.name(), this);
}

/**
 * Reports whether this job targets the local repository.
 *
 * @return {@code true} when {@code repoId} equals {@code Repository.KEEP_ON_LOCAL_REPO_ID}
 */
public boolean isLocalSnapshot() {
    return Repository.KEEP_ON_LOCAL_REPO_ID == repoId;
}

/**
 * Reads the meta info and job info bytes from local disk and wraps them in a {@code Snapshot}.
 *
 * <p>The files are read on every call; the bytes are not cached on the job.
 *
 * @return the snapshot, or {@code null} when the job is not FINISHED, is not kept on the local
 *         repo, has no recorded local file path, or either file cannot be read
 */
public synchronized Snapshot getSnapshot() {
    if (state != BackupJobState.FINISHED || repoId != Repository.KEEP_ON_LOCAL_REPO_ID) {
        return null;
    }

    // new File((String) null) throws NullPointerException, which the IOException handler below
    // would not catch. Treat a missing path the same way as an unreadable file.
    if (localMetaInfoFilePath == null || localJobInfoFilePath == null) {
        LOG.warn("local meta info or job info file path is not set, meta info file {}, job info file {}",
                localMetaInfoFilePath, localJobInfoFilePath);
        return null;
    }

    try {
        File metaInfoFile = new File(localMetaInfoFilePath);
        File jobInfoFile = new File(localJobInfoFilePath);
        byte[] metaInfoBytes = Files.readAllBytes(metaInfoFile.toPath());
        byte[] jobInfoBytes = Files.readAllBytes(jobInfoFile.toPath());
        return new Snapshot(label, metaInfoBytes, jobInfoBytes);
    } catch (IOException e) {
        // The trailing exception argument has no placeholder, so the logger prints it as the cause.
        LOG.warn("failed to load meta info and job info file, meta info file {}, job info file {}: ",
                localMetaInfoFilePath, localJobInfoFilePath, e);
        return null;
    }
}

public synchronized List<String> getInfo() {
List<String> info = Lists.newArrayList();
info.add(String.valueOf(jobId));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,11 @@ public boolean isCancelled() {
return state == RestoreJobState.CANCELLED;
}

/**
 * Reports whether this restore job has reached {@code RestoreJobState.FINISHED}.
 *
 * @return {@code true} when the current state is FINISHED
 */
@Override
public boolean isFinished() {
    return RestoreJobState.FINISHED == state;
}

@Override
public synchronized void run() {
if (state == RestoreJobState.FINISHED || state == RestoreJobState.CANCELLED) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2803,15 +2803,18 @@ private TGetSnapshotResult getSnapshotImpl(TGetSnapshotRequest request, String c
}

// Step 3: get snapshot
String label = request.getLabelName();
TGetSnapshotResult result = new TGetSnapshotResult();
result.setStatus(new TStatus(TStatusCode.OK));
Snapshot snapshot = Env.getCurrentEnv().getBackupHandler().getSnapshot(request.getLabelName());
Snapshot snapshot = Env.getCurrentEnv().getBackupHandler().getSnapshot(label);
if (snapshot == null) {
result.getStatus().setStatusCode(TStatusCode.SNAPSHOT_NOT_EXIST);
result.getStatus().addToErrorMsgs("snapshot not exist");
result.getStatus().addToErrorMsgs(String.format("snapshot %s not exist", label));
} else {
result.setMeta(snapshot.getMeta());
result.setJobInfo(snapshot.getJobInfo());
LOG.info("get snapshot info, snapshot: {}, meta size: {}, job info size: {}",
label, snapshot.getMeta().length, snapshot.getJobInfo().length);
}

return result;
Expand Down
Loading