Skip to content

Commit

Permalink
Updated BackupDirectorySize interface to get the file count for snaps…
Browse files Browse the repository at this point in the history
…hots/backups
  • Loading branch information
ayushis committed Sep 17, 2024
1 parent 10840be commit bde4768
Show file tree
Hide file tree
Showing 8 changed files with 114 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@

import com.google.inject.ImplementedBy;

/** estimates the number of bytes remaining to upload in a snapshot */
@ImplementedBy(SnapshotDirectorySize.class)
/** estimates the number of bytes and files remaining to upload in a snapshot/backup */
public interface DirectorySize {
/** return the total bytes of all snapshot files south of location in the filesystem */
/** return the total bytes of all snapshot/backup files south of location in the filesystem */
long getBytes(String location);
/** return the total files of all snapshot/backup files south of location in the filesystem */
int getFiles(String location);
}
Original file line number Diff line number Diff line change
Expand Up @@ -140,19 +140,4 @@ private Void deleteIfEmpty(File dir) {
if (FileUtils.sizeOfDirectory(dir) == 0) FileUtils.deleteQuietly(dir);
return null;
}

public static int countFilesInBackupDir(IConfiguration config) throws Exception {
int totalFileCount = 0;
Set<Path> backupDirectories =
AbstractBackup.getBackupDirectories(config, INCREMENTAL_BACKUP_FOLDER);
for (Path backupDir : backupDirectories) {
try (Stream<Path> stream = Files.list(backupDir)) {
totalFileCount += stream.filter(Files::isRegularFile).count();
} catch (Exception e) {
logger.error("Failed to get files in backups directory. {}", e.getMessage());
e.printStackTrace();
}
}
return totalFileCount;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package com.netflix.priam.backup;

import java.io.IOException;
import java.nio.file.*;
import java.nio.file.attribute.BasicFileAttributes;

/** Estimates remaining bytes or files to upload in a backup by looking at the file system */
public class IncrementalBackupDirectorySize implements DirectorySize {

public long getBytes(String location) {
SummingFileVisitor fileVisitor = new SummingFileVisitor();
try {
Files.walkFileTree(Paths.get(location), fileVisitor);
} catch (IOException e) {
// BackupFileVisitor is happy with an estimate and won't produce these in practice.
}
return fileVisitor.getTotalBytes();
}

public int getFiles(String location) {
SummingFileVisitor fileVisitor = new SummingFileVisitor();
try {
Files.walkFileTree(Paths.get(location), fileVisitor);
} catch (IOException e) {
// BackupFileVisitor is happy with an estimate and won't produce these in practice.
}
return fileVisitor.getTotalFiles();
}

private static final class SummingFileVisitor implements FileVisitor<Path> {
private long totalBytes;
private int totalFiles;

@Override
public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) {
return FileVisitResult.CONTINUE;
}

@Override
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) {
if (file.toString().contains(AbstractBackup.INCREMENTAL_BACKUP_FOLDER) && attrs.isRegularFile()) {
totalBytes += attrs.size();
totalFiles += 1;
}
return FileVisitResult.CONTINUE;
}

@Override
public FileVisitResult visitFileFailed(Path file, IOException exc) {
return FileVisitResult.CONTINUE;
}

@Override
public FileVisitResult postVisitDirectory(Path dir, IOException exc) {
return FileVisitResult.CONTINUE;
}

long getTotalBytes() {
return totalBytes;
}

int getTotalFiles() { return totalFiles; }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import java.nio.file.*;
import java.nio.file.attribute.BasicFileAttributes;

/** Estimates remaining bytes to upload in a backup by looking at the file system */
/** Estimates remaining bytes or files to upload in a backup by looking at the file system */
public class SnapshotDirectorySize implements DirectorySize {

public long getBytes(String location) {
Expand All @@ -17,8 +17,19 @@ public long getBytes(String location) {
return fileVisitor.getTotalBytes();
}

public int getFiles(String location) {
SummingFileVisitor fileVisitor = new SummingFileVisitor();
try {
Files.walkFileTree(Paths.get(location), fileVisitor);
} catch (IOException e) {
// BackupFileVisitor is happy with an estimate and won't produce these in practice.
}
return fileVisitor.getTotalFiles();
}

private static final class SummingFileVisitor implements FileVisitor<Path> {
private long totalBytes;
private int totalFiles;

@Override
public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) {
Expand All @@ -29,6 +40,7 @@ public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) {
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) {
if (file.toString().contains(AbstractBackup.SNAPSHOT_FOLDER) && attrs.isRegularFile()) {
totalBytes += attrs.size();
totalFiles += 1;
}
return FileVisitResult.CONTINUE;
}
Expand All @@ -46,5 +58,7 @@ public FileVisitResult postVisitDirectory(Path dir, IOException exc) {
long getTotalBytes() {
return totalBytes;
}

int getTotalFiles() { return totalFiles; }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package com.netflix.priam.backupv2;

import com.netflix.priam.backup.DirectorySize;
import com.netflix.priam.backup.IncrementalBackup;
import com.netflix.priam.config.IBackupRestoreConfig;
import com.netflix.priam.config.IConfiguration;
Expand All @@ -29,6 +30,9 @@
import java.util.Map;
import javax.inject.Inject;
import org.apache.commons.lang3.math.Fraction;
import com.netflix.priam.backup.SnapshotDirectorySize;
import com.netflix.priam.backup.IncrementalBackupDirectorySize;


/**
* Encapsulate the backup service 2.0 - Execute all the tasks required to run backup service.
Expand All @@ -41,6 +45,8 @@ public class BackupV2Service implements IService {
private final SnapshotMetaTask snapshotMetaTask;
private final CassandraTunerService cassandraTunerService;
private final ITokenRetriever tokenRetriever;
private final DirectorySize snapshotDirectorySize = new SnapshotDirectorySize();
private final DirectorySize incrementalBackupDirectorySize = new IncrementalBackupDirectorySize();

@Inject
public BackupV2Service(
Expand Down Expand Up @@ -106,8 +112,8 @@ public void updateServicePost() throws Exception {}

public Map<String, Integer> countPendingBackupFiles() throws Exception {
Map<String, Integer> backupFiles = new HashMap<String, Integer>();
backupFiles.put("snapshotFiles", snapshotMetaTask.countFilesInSnapshotDir(configuration));
backupFiles.put("incrementalFiles", IncrementalBackup.countFilesInBackupDir(configuration));
backupFiles.put("totalFiles", (snapshotDirectorySize.getFiles(configuration.getDataFileLocation()) +
incrementalBackupDirectorySize.getFiles(configuration.getDataFileLocation())));
return backupFiles;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -443,26 +443,4 @@ public void onFailure(Throwable t) {
};
Futures.addCallback(future, callback, MoreExecutors.directExecutor());
}

public int countFilesInSnapshotDir(IConfiguration config) throws Exception {
int totalFileCount = 0;
Set<Path> snapshotDirectories =
AbstractBackup.getBackupDirectories(config, SNAPSHOT_FOLDER);
for (Path snapshotDir : snapshotDirectories) {
try (DirectoryStream<Path> directoryStream =
Files.newDirectoryStream(snapshotDir, Files::isDirectory)) {
for (Path backupDir : directoryStream) {
if (backupDir.toFile().getName().startsWith(SNAPSHOT_PREFIX)) {
try (Stream<Path> stream = Files.list(backupDir)) {
totalFileCount += stream.filter(Files::isRegularFile).count();
}
}
}
} catch (Exception e) {
logger.error("Failed to get files in snapshot directory. {}", e.getMessage());
e.printStackTrace();
}
}
return totalFileCount;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import com.netflix.priam.backupv2.IMetaProxy;
import com.netflix.priam.backupv2.SnapshotMetaTask;
import com.netflix.priam.config.IConfiguration;
import com.netflix.priam.merics.BackupMetrics;
import com.netflix.priam.notification.BackupNotificationMgr;
import com.netflix.priam.utils.DateUtil;
import com.netflix.priam.utils.DateUtil.DateRange;
Expand Down Expand Up @@ -62,7 +61,6 @@ public class BackupServletV2 {
private final BackupNotificationMgr backupNotificationMgr;
private final PriamServer priamServer;

private final BackupMetrics backupMetrics;
private static final String REST_SUCCESS = "[\"ok\"]";

@Inject
Expand All @@ -77,8 +75,7 @@ public BackupServletV2(
Provider<AbstractBackupPath> pathProvider,
BackupV2Service backupService,
BackupNotificationMgr backupNotificationMgr,
PriamServer priamServer,
BackupMetrics backupMetrics) {
PriamServer priamServer) {
this.backupStatusMgr = backupStatusMgr;
this.backupVerification = backupVerification;
this.snapshotMetaService = snapshotMetaService;
Expand All @@ -89,7 +86,6 @@ public BackupServletV2(
this.backupService = backupService;
this.backupNotificationMgr = backupNotificationMgr;
this.priamServer = priamServer;
this.backupMetrics = backupMetrics;
}

@GET
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@
public class TestBackupDynamicRateLimiter {
private static final Instant NOW = Instant.ofEpochMilli(1 << 16);
private static final Instant LATER = NOW.plusMillis(Duration.ofHours(1).toMillis());
private static final int DIR_SIZE = 1 << 16;
private static final int DIR_SIZE_BYTES = 1 << 16;
private static final int DIR_SIZE_FILES = 10;

private BackupDynamicRateLimiter rateLimiter;
private FakeConfiguration config;
Expand All @@ -34,22 +35,22 @@ public void setUp() {

@Test
public void sunnyDay() {
rateLimiter = getRateLimiter(ImmutableMap.of("Priam.backup.threads", 1), NOW, DIR_SIZE);
rateLimiter = getRateLimiter(ImmutableMap.of("Priam.backup.threads", 1), NOW, DIR_SIZE_BYTES, DIR_SIZE_FILES);
Stopwatch timer = timePermitAcquisition(getBackupPath(), LATER, 21);
Truth.assertThat(timer.elapsed(TimeUnit.MILLISECONDS)).isAtLeast(1_000);
Truth.assertThat(timer.elapsed(TimeUnit.MILLISECONDS)).isAtMost(2_000);
}

@Test
public void targetSetToEpoch() {
rateLimiter = getRateLimiter(ImmutableMap.of("Priam.backup.threads", 1), NOW, DIR_SIZE);
rateLimiter = getRateLimiter(ImmutableMap.of("Priam.backup.threads", 1), NOW, DIR_SIZE_BYTES, DIR_SIZE_FILES);
Stopwatch timer = timePermitAcquisition(getBackupPath(), Instant.EPOCH, 20);
assertNoRateLimiting(timer);
}

@Test
public void pathIsNotASnapshot() {
rateLimiter = getRateLimiter(ImmutableMap.of("Priam.backup.threads", 1), NOW, DIR_SIZE);
rateLimiter = getRateLimiter(ImmutableMap.of("Priam.backup.threads", 1), NOW, DIR_SIZE_BYTES, DIR_SIZE_FILES);
AbstractBackupPath path =
getBackupPath(
"target/data/Keyspace1/Standard1/backups/Keyspace1-Standard1-ia-4-Data.db");
Expand All @@ -59,47 +60,47 @@ public void pathIsNotASnapshot() {

@Test
public void targetIsNow() {
rateLimiter = getRateLimiter(ImmutableMap.of("Priam.backup.threads", 1), NOW, DIR_SIZE);
rateLimiter = getRateLimiter(ImmutableMap.of("Priam.backup.threads", 1), NOW, DIR_SIZE_BYTES, DIR_SIZE_FILES);
Stopwatch timer = timePermitAcquisition(getBackupPath(), NOW, 20);
assertNoRateLimiting(timer);
}

@Test
public void targetIsInThePast() {
rateLimiter = getRateLimiter(ImmutableMap.of("Priam.backup.threads", 1), NOW, DIR_SIZE);
rateLimiter = getRateLimiter(ImmutableMap.of("Priam.backup.threads", 1), NOW, DIR_SIZE_BYTES, DIR_SIZE_FILES);
Instant target = NOW.minus(Duration.ofHours(1L));
Stopwatch timer = timePermitAcquisition(getBackupPath(), target, 20);
assertNoRateLimiting(timer);
}

@Test
public void noBackupThreads() {
rateLimiter = getRateLimiter(ImmutableMap.of("Priam.backup.threads", 0), NOW, DIR_SIZE);
rateLimiter = getRateLimiter(ImmutableMap.of("Priam.backup.threads", 0), NOW, DIR_SIZE_BYTES, DIR_SIZE_FILES);
assertIllegalState(() -> timePermitAcquisition(getBackupPath(), LATER, 20));
}

@Test
public void negativeBackupThreads() {
rateLimiter = getRateLimiter(ImmutableMap.of("Priam.backup.threads", -1), NOW, DIR_SIZE);
rateLimiter = getRateLimiter(ImmutableMap.of("Priam.backup.threads", -1), NOW, DIR_SIZE_BYTES, DIR_SIZE_FILES);
assertIllegalState(() -> timePermitAcquisition(getBackupPath(), LATER, 20));
}

@Test
public void noData() {
rateLimiter = getRateLimiter(ImmutableMap.of("Priam.backup.threads", 1), NOW, 0);
rateLimiter = getRateLimiter(ImmutableMap.of("Priam.backup.threads", 1), NOW, 0, 0);
Stopwatch timer = timePermitAcquisition(getBackupPath(), LATER, 20);
assertNoRateLimiting(timer);
}

@Test
public void noPermitsRequested() {
rateLimiter = getRateLimiter(ImmutableMap.of("Priam.backup.threads", 1), NOW, DIR_SIZE);
rateLimiter = getRateLimiter(ImmutableMap.of("Priam.backup.threads", 1), NOW, DIR_SIZE_BYTES, DIR_SIZE_FILES);
assertIllegalArgument(() -> timePermitAcquisition(getBackupPath(), LATER, 0));
}

@Test
public void negativePermitsRequested() {
rateLimiter = getRateLimiter(ImmutableMap.of("Priam.backup.threads", 1), NOW, DIR_SIZE);
rateLimiter = getRateLimiter(ImmutableMap.of("Priam.backup.threads", 1), NOW, DIR_SIZE_BYTES, DIR_SIZE_FILES);
assertIllegalArgument(() -> timePermitAcquisition(getBackupPath(), LATER, -1));
}

Expand All @@ -123,12 +124,12 @@ private Stopwatch timePermitAcquisition(AbstractBackupPath path, Instant now, in
}

private BackupDynamicRateLimiter getRateLimiter(
Map<String, Object> properties, Instant now, long directorySize) {
Map<String, Object> properties, Instant now, long directorySizeBytes, int directorySizeFiles) {
properties.forEach(config::setFakeConfig);
return new BackupDynamicRateLimiter(
config,
Clock.fixed(now, ZoneId.systemDefault()),
new FakeDirectorySize(directorySize));
new FakeDirectorySize(directorySizeBytes, directorySizeFiles));
}

private void assertNoRateLimiting(Stopwatch timer) {
Expand All @@ -155,14 +156,21 @@ private void assertIllegalArgument(Runnable method) {

private static final class FakeDirectorySize implements DirectorySize {
private final long size;
private final int fileCount;

FakeDirectorySize(long size) {
FakeDirectorySize(long size, int fileCount) {
this.size = size;
this.fileCount = fileCount;
}

@Override
public long getBytes(String location) {
return size;
}

@Override
public int getFiles(String location) {
return fileCount;
}
}
}

0 comments on commit bde4768

Please sign in to comment.