From e19567ca5f55b14181477ae180e33ee93ae116d9 Mon Sep 17 00:00:00 2001 From: walter Date: Wed, 6 Nov 2024 09:44:16 +0800 Subject: [PATCH] [fix](backup) Load backup meta and job info bytes from disk (#43276) 1. The metaInfoBytes and jobInfoBytes are not persisted and are not reloaded from disk during image loading and journal replay. This PR fixes this issue by reading them from files. 2. In some scenarios, the metaInfoBytes and jobInfoBytes are large and consume a lot of memory. Reading them from files on demand can reduce memory usage. --- .../org/apache/doris/backup/AbstractJob.java | 2 + .../apache/doris/backup/BackupHandler.java | 50 ++++++++++++++++--- .../org/apache/doris/backup/BackupJob.java | 45 +++++++++++------ .../org/apache/doris/backup/RestoreJob.java | 5 ++ .../doris/service/FrontendServiceImpl.java | 7 ++- 5 files changed, 85 insertions(+), 24 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/AbstractJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/AbstractJob.java index a8489361d15b7a..b0da4b6ec277cb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/backup/AbstractJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/backup/AbstractJob.java @@ -170,6 +170,8 @@ public void setTypeRead(boolean isTypeRead) { public abstract boolean isCancelled(); + public abstract boolean isFinished(); + public abstract Status updateRepo(Repository repo); public static AbstractJob read(DataInput in) throws IOException { diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java index 282b549c5df352..d585176e3d47a9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java @@ -110,10 +110,10 @@ public class BackupHandler extends MasterDaemon implements Writable { private Env env; - // map to store backup info, key is label name, value is Pair, 
meta && info is bytes - // this map not present in persist && only in fe master memory + // map to store backup info, key is label name, value is the BackupJob + // this map not present in persist && only in fe memory // one table only keep one snapshot info, only keep last - private final Map localSnapshots = new HashMap<>(); + private final Map localSnapshots = new HashMap<>(); private ReadWriteLock localSnapshotsLock = new ReentrantReadWriteLock(); public BackupHandler() { @@ -168,6 +168,7 @@ private boolean init() { return false; } } + isInit = true; return true; } @@ -558,11 +559,15 @@ private void addBackupOrRestoreJob(long dbId, AbstractJob job) { return; } + List removedLabels = Lists.newArrayList(); jobLock.lock(); try { Deque jobs = dbIdToBackupOrRestoreJobs.computeIfAbsent(dbId, k -> Lists.newLinkedList()); while (jobs.size() >= Config.max_backup_restore_job_num_per_db) { - jobs.removeFirst(); + AbstractJob removedJob = jobs.removeFirst(); + if (removedJob instanceof BackupJob && ((BackupJob) removedJob).isLocalSnapshot()) { + removedLabels.add(removedJob.getLabel()); + } } AbstractJob lastJob = jobs.peekLast(); @@ -575,6 +580,17 @@ private void addBackupOrRestoreJob(long dbId, AbstractJob job) { } finally { jobLock.unlock(); } + + if (job.isFinished() && job instanceof BackupJob) { + // Save snapshot to local repo, when reload backupHandler from image. 
+ BackupJob backupJob = (BackupJob) job; + if (backupJob.isLocalSnapshot()) { + addSnapshot(backupJob.getLabel(), backupJob); + } + } + for (String label : removedLabels) { + removeSnapshot(label); + } } private List getAllCurrentJobs() { @@ -813,22 +829,42 @@ public boolean report(TTaskType type, long jobId, long taskId, int finishedNum, return false; } - public void addSnapshot(String labelName, Snapshot snapshot) { + public void addSnapshot(String labelName, BackupJob backupJob) { + assert backupJob.isFinished(); + + LOG.info("add snapshot {} to local repo", labelName); localSnapshotsLock.writeLock().lock(); try { - localSnapshots.put(labelName, snapshot); + localSnapshots.put(labelName, backupJob); + } finally { + localSnapshotsLock.writeLock().unlock(); + } + } + + public void removeSnapshot(String labelName) { + LOG.info("remove snapshot {} from local repo", labelName); + localSnapshotsLock.writeLock().lock(); + try { + localSnapshots.remove(labelName); } finally { localSnapshotsLock.writeLock().unlock(); } } public Snapshot getSnapshot(String labelName) { + BackupJob backupJob; localSnapshotsLock.readLock().lock(); try { - return localSnapshots.get(labelName); + backupJob = localSnapshots.get(labelName); } finally { localSnapshotsLock.readLock().unlock(); } + + if (backupJob == null) { + return null; + } + + return backupJob.getSnapshot(); } public static BackupHandler read(DataInput in) throws IOException { diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java index ab7bfd8a03f6f7..c7e00f132dc0bd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java @@ -130,9 +130,6 @@ public enum BackupJobState { @SerializedName("prop") private Map properties = Maps.newHashMap(); - private byte[] metaInfoBytes = null; - private byte[] jobInfoBytes = null; - public BackupJob() { 
super(JobType.BACKUP); } @@ -344,11 +341,7 @@ public synchronized boolean finishSnapshotUploadTask(UploadTask task, TFinishTas @Override public synchronized void replayRun() { - LOG.info("replay run backup job: {}", this); - if (state == BackupJobState.FINISHED && repoId == Repository.KEEP_ON_LOCAL_REPO_ID) { - Snapshot snapshot = new Snapshot(label, metaInfoBytes, jobInfoBytes); - env.getBackupHandler().addSnapshot(label, snapshot); - } + // nothing to do } @Override @@ -366,6 +359,11 @@ public boolean isCancelled() { return state == BackupJobState.CANCELLED; } + @Override + public boolean isFinished() { + return state == BackupJobState.FINISHED; + } + @Override public synchronized Status updateRepo(Repository repo) { this.repo = repo; @@ -839,8 +837,6 @@ private void saveMetaInfo() { } backupMeta.writeToFile(metaInfoFile); localMetaInfoFilePath = metaInfoFile.getAbsolutePath(); - // read meta info to metaInfoBytes - metaInfoBytes = Files.readAllBytes(metaInfoFile.toPath()); // 3. save job info file Map tableCommitSeqMap = Maps.newHashMap(); @@ -867,8 +863,6 @@ private void saveMetaInfo() { } jobInfo.writeToFile(jobInfoFile); localJobInfoFilePath = jobInfoFile.getAbsolutePath(); - // read job info to jobInfoBytes - jobInfoBytes = Files.readAllBytes(jobInfoFile.toPath()); } catch (Exception e) { status = new Status(ErrCode.COMMON_ERROR, "failed to save meta info and job info file: " + e.getMessage()); return; @@ -922,7 +916,6 @@ private void uploadMetaAndJobInfoFile() { } } - finishedTime = System.currentTimeMillis(); state = BackupJobState.FINISHED; @@ -931,8 +924,7 @@ private void uploadMetaAndJobInfoFile() { LOG.info("job is finished. 
{}", this); if (repoId == Repository.KEEP_ON_LOCAL_REPO_ID) { - Snapshot snapshot = new Snapshot(label, metaInfoBytes, jobInfoBytes); - env.getBackupHandler().addSnapshot(label, snapshot); + env.getBackupHandler().addSnapshot(label, this); return; } } @@ -1025,6 +1017,29 @@ private void cancelInternal() { LOG.info("finished to cancel backup job. current state: {}. {}", curState.name(), this); } + public boolean isLocalSnapshot() { + return repoId == Repository.KEEP_ON_LOCAL_REPO_ID; + } + + // read meta and job info bytes from disk, and return the snapshot + public synchronized Snapshot getSnapshot() { + if (state != BackupJobState.FINISHED || repoId != Repository.KEEP_ON_LOCAL_REPO_ID) { + return null; + } + + try { + File metaInfoFile = new File(localMetaInfoFilePath); + File jobInfoFile = new File(localJobInfoFilePath); + byte[] metaInfoBytes = Files.readAllBytes(metaInfoFile.toPath()); + byte[] jobInfoBytes = Files.readAllBytes(jobInfoFile.toPath()); + return new Snapshot(label, metaInfoBytes, jobInfoBytes); + } catch (IOException e) { + LOG.warn("failed to load meta info and job info file, meta info file {}, job info file {}: ", + localMetaInfoFilePath, localJobInfoFilePath, e); + return null; + } + } + public synchronized List getInfo() { List info = Lists.newArrayList(); info.add(String.valueOf(jobId)); diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java index cdf12c27790e16..57f56e79f290df 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java @@ -396,6 +396,11 @@ public boolean isCancelled() { return state == RestoreJobState.CANCELLED; } + @Override + public boolean isFinished() { + return state == RestoreJobState.FINISHED; + } + @Override public synchronized Status updateRepo(Repository repo) { this.repo = repo; diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index e3d543d9111dc8..31aef1ebb7c791 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -2908,15 +2908,18 @@ private TGetSnapshotResult getSnapshotImpl(TGetSnapshotRequest request, String c } // Step 3: get snapshot + String label = request.getLabelName(); TGetSnapshotResult result = new TGetSnapshotResult(); result.setStatus(new TStatus(TStatusCode.OK)); - Snapshot snapshot = Env.getCurrentEnv().getBackupHandler().getSnapshot(request.getLabelName()); + Snapshot snapshot = Env.getCurrentEnv().getBackupHandler().getSnapshot(label); if (snapshot == null) { result.getStatus().setStatusCode(TStatusCode.SNAPSHOT_NOT_EXIST); - result.getStatus().addToErrorMsgs("snapshot not exist"); + result.getStatus().addToErrorMsgs(String.format("snapshot %s not exist", label)); } else { result.setMeta(snapshot.getMeta()); result.setJobInfo(snapshot.getJobInfo()); + LOG.info("get snapshot info, snapshot: {}, meta size: {}, job info size: {}", + label, snapshot.getMeta().length, snapshot.getJobInfo().length); } return result;