diff --git a/priam-dse-extensions/build.gradle b/priam-dse-extensions/build.gradle deleted file mode 100644 index 6b49ae610..000000000 --- a/priam-dse-extensions/build.gradle +++ /dev/null @@ -1,4 +0,0 @@ -dependencies { - compile project(':priam') - testCompile project(path: ':priam') -} diff --git a/priam-web/build.gradle b/priam-web/build.gradle index 499e29273..65ea89630 100644 --- a/priam-web/build.gradle +++ b/priam-web/build.gradle @@ -10,6 +10,5 @@ apply plugin: 'war' dependencies { compile project(':priam') - compile project(':priam-dse-extensions') compile project(':priam-cass-extensions') } diff --git a/priam/src/main/java/com/netflix/priam/aws/RemoteBackupPath.java b/priam/src/main/java/com/netflix/priam/aws/RemoteBackupPath.java index b1e59e3d5..d0bdfa1fc 100644 --- a/priam/src/main/java/com/netflix/priam/aws/RemoteBackupPath.java +++ b/priam/src/main/java/com/netflix/priam/aws/RemoteBackupPath.java @@ -85,17 +85,17 @@ private Path getV2Location() { Path prefix = Paths.get( getV2Prefix().toString(), - type.toString(), + directives.getType().toString(), getLastModified().toEpochMilli() + ""); - if (type == BackupFileType.SST_V2) { + if (directives.getType() == UploadDownloadDirectives.BackupFileType.SST_V2) { prefix = Paths.get(prefix.toString(), keyspace, columnFamily); } return Paths.get( prefix.toString(), - getCompression().toString(), - getEncryption().toString(), + directives.getCompression().toString(), + directives.getEncryption().toString(), fileName); } @@ -108,23 +108,25 @@ private void parseV2Location(String remoteFile) { "Too few elements (expected: [%d]) in path: %s", 8, remoteFilePath)); int name_count_idx = 3; - type = BackupFileType.valueOf(remoteFilePath.getName(name_count_idx++).toString()); + directives.withType( + UploadDownloadDirectives.BackupFileType.valueOf( + remoteFilePath.getName(name_count_idx++).toString())); setLastModified( Instant.ofEpochMilli( Long.parseLong(remoteFilePath.getName(name_count_idx++).toString()))); - if 
(type == BackupFileType.SST_V2) { + if (directives.getType() == UploadDownloadDirectives.BackupFileType.SST_V2) { keyspace = remoteFilePath.getName(name_count_idx++).toString(); columnFamily = remoteFilePath.getName(name_count_idx++).toString(); } - setCompression( - ICompression.CompressionAlgorithm.valueOf( - remoteFilePath.getName(name_count_idx++).toString())); - - setEncryption( - IFileCryptography.CryptographyAlgorithm.valueOf( - remoteFilePath.getName(name_count_idx++).toString())); + directives + .withCompression( + ICompression.CompressionAlgorithm.valueOf( + remoteFilePath.getName(name_count_idx++).toString())) + .withEncryption( + IFileCryptography.CryptographyAlgorithm.valueOf( + remoteFilePath.getName(name_count_idx++).toString())); fileName = remoteFilePath.getName(name_count_idx).toString(); } @@ -133,8 +135,8 @@ private Path getV1Location() { Paths.get( getV1Prefix().toString(), DateUtil.formatyyyyMMddHHmm(time), - type.toString()); - if (BackupFileType.isDataFile(type)) + directives.getType().toString()); + if (UploadDownloadDirectives.BackupFileType.isDataFile(directives.getType())) path = Paths.get(path.toString(), keyspace, columnFamily); return Paths.get(path.toString(), fileName); } @@ -147,8 +149,15 @@ private void parseV1Location(Path remoteFilePath) { "Too few elements (expected: [%d]) in path: %s", 7, remoteFilePath)); time = DateUtil.getDate(remoteFilePath.getName(4).toString()); - type = BackupFileType.valueOf(remoteFilePath.getName(5).toString()); - if (BackupFileType.isDataFile(type)) { + /* + We put this as hard-coded value as Backup V1 will always remain snappy compressed to keep backwards compatibility + */ + directives + .withCompression(ICompression.CompressionAlgorithm.SNAPPY) + .withType( + UploadDownloadDirectives.BackupFileType.valueOf( + remoteFilePath.getName(5).toString())); + if (UploadDownloadDirectives.BackupFileType.isDataFile(directives.getType())) { keyspace = remoteFilePath.getName(6).toString(); columnFamily = 
remoteFilePath.getName(7).toString(); } @@ -179,7 +188,8 @@ private void parseV1Prefix(Path remoteFilePath) { */ @Override public String getRemotePath() { - if (type == BackupFileType.SST_V2 || type == BackupFileType.META_V2) { + if (directives.getType() == UploadDownloadDirectives.BackupFileType.SST_V2 + || directives.getType() == UploadDownloadDirectives.BackupFileType.META_V2) { return getV2Location().toString(); } else { return getV1Location().toString(); @@ -191,14 +201,18 @@ public void parseRemote(String remoteFilePath) { // Check for all backup file types to ensure how we parse // TODO: We should clean this hack to get backupFileType for parsing when we delete V1 of // backups. - for (BackupFileType fileType : BackupFileType.values()) { + directives.withRemotePath(Paths.get(remoteFilePath)); + + for (UploadDownloadDirectives.BackupFileType fileType : + UploadDownloadDirectives.BackupFileType.values()) { if (remoteFilePath.contains(PATH_SEP + fileType.toString() + PATH_SEP)) { - type = fileType; + directives.withType(fileType); break; } } - if (type == BackupFileType.SST_V2 || type == BackupFileType.META_V2) { + if (directives.getType() == UploadDownloadDirectives.BackupFileType.SST_V2 + || directives.getType() == UploadDownloadDirectives.BackupFileType.META_V2) { parseV2Location(remoteFilePath); } else { parseV1Location(Paths.get(remoteFilePath)); @@ -221,7 +235,7 @@ public String remotePrefix(Date start, Date end, String location) { } @Override - public Path remoteV2Prefix(Path location, BackupFileType fileType) { + public Path remoteV2Prefix(Path location, UploadDownloadDirectives.BackupFileType fileType) { if (location.getNameCount() <= 1) { baseDir = config.getBackupLocation(); clusterName = config.getAppName(); diff --git a/priam/src/main/java/com/netflix/priam/aws/S3EncryptedFileSystem.java b/priam/src/main/java/com/netflix/priam/aws/S3EncryptedFileSystem.java index 60dc33f57..54e7b9c1c 100755 --- 
a/priam/src/main/java/com/netflix/priam/aws/S3EncryptedFileSystem.java +++ b/priam/src/main/java/com/netflix/priam/aws/S3EncryptedFileSystem.java @@ -26,7 +26,7 @@ import com.netflix.priam.backup.AbstractBackupPath; import com.netflix.priam.backup.BackupRestoreException; import com.netflix.priam.backup.RangeReadInputStream; -import com.netflix.priam.compress.ICompression; +import com.netflix.priam.compress.ChunkedStream; import com.netflix.priam.config.IConfiguration; import com.netflix.priam.cred.ICredential; import com.netflix.priam.cryptography.IFileCryptography; @@ -34,7 +34,6 @@ import com.netflix.priam.merics.BackupMetrics; import com.netflix.priam.notification.BackupNotificationMgr; import java.io.*; -import java.nio.file.Path; import java.util.Iterator; import java.util.List; import org.apache.commons.io.IOUtils; @@ -51,7 +50,6 @@ public class S3EncryptedFileSystem extends S3FileSystemBase { @Inject public S3EncryptedFileSystem( Provider pathProvider, - ICompression compress, final IConfiguration config, ICredential cred, @Named("filecryptoalgorithm") IFileCryptography fileCryptography, @@ -59,7 +57,7 @@ public S3EncryptedFileSystem( BackupNotificationMgr backupNotificationMgr, InstanceInfo instanceInfo) { - super(pathProvider, compress, config, backupMetrics, backupNotificationMgr); + super(pathProvider, config, backupMetrics, backupNotificationMgr); this.encryptor = fileCryptography; super.s3Client = AmazonS3Client.builder() @@ -69,14 +67,15 @@ public S3EncryptedFileSystem( } @Override - protected void downloadFileImpl(Path remotePath, Path localPath) throws BackupRestoreException { - try (OutputStream os = new FileOutputStream(localPath.toFile()); + protected void downloadFileImpl(final AbstractBackupPath.UploadDownloadDirectives directives) + throws BackupRestoreException { + try (OutputStream os = new FileOutputStream(directives.getLocalPath().toFile()); RangeReadInputStream rris = new RangeReadInputStream( s3Client, getShard(), - 
super.getFileSize(remotePath), - remotePath.toString())) { + super.getFileSize(directives.getRemotePath()), + directives.getRemotePath().toString())) { /* * To handle use cases where decompression should be done outside of the download. For example, the file have been compressed and then encrypted. * Hence, decompressing it here would compromise the decryption. @@ -85,7 +84,7 @@ protected void downloadFileImpl(Path remotePath, Path localPath) throws BackupRe } catch (Exception e) { throw new BackupRestoreException( "Exception encountered downloading " - + remotePath + + directives.getRemotePath() + " from S3 bucket " + getShard() + ", Msg: " @@ -95,33 +94,36 @@ protected void downloadFileImpl(Path remotePath, Path localPath) throws BackupRe } @Override - protected long uploadFileImpl(Path localPath, Path remotePath) throws BackupRestoreException { - long chunkSize = getChunkSize(localPath); + protected long uploadFileImpl(AbstractBackupPath.UploadDownloadDirectives directives) + throws BackupRestoreException { + long chunkSize = getChunkSize(directives.getLocalPath()); // initialize chunking request to aws InitiateMultipartUploadRequest initRequest = - new InitiateMultipartUploadRequest(config.getBackupPrefix(), remotePath.toString()); + new InitiateMultipartUploadRequest( + config.getBackupPrefix(), directives.getRemotePath().toString()); // Fetch the aws generated upload id for this chunking request InitiateMultipartUploadResult initResponse = s3Client.initiateMultipartUpload(initRequest); DataPart part = new DataPart( config.getBackupPrefix(), - remotePath.toString(), + directives.getRemotePath().toString(), initResponse.getUploadId()); // Metadata on number of parts to be uploaded List partETags = Lists.newArrayList(); // Read chunks from src, compress it, and write to temp file - File compressedDstFile = new File(localPath.toString() + ".compressed"); + File compressedDstFile = new File(directives.getLocalPath().toString() + ".compressed"); if 
(logger.isDebugEnabled()) logger.debug( "Compressing {} with chunk size {}", compressedDstFile.getAbsolutePath(), chunkSize); - try (InputStream in = new FileInputStream(localPath.toFile()); + try (InputStream in = new FileInputStream(directives.getLocalPath().toFile()); BufferedOutputStream compressedBos = new BufferedOutputStream(new FileOutputStream(compressedDstFile))) { - Iterator compressedChunks = this.compress.compress(in, chunkSize); + Iterator compressedChunks = + new ChunkedStream(directives.getCompression(), in, chunkSize); while (compressedChunks.hasNext()) { byte[] compressedChunk = compressedChunks.next(); compressedBos.write(compressedChunk); @@ -138,7 +140,8 @@ protected long uploadFileImpl(Path localPath, Path remotePath) throws BackupRest try (BufferedInputStream compressedBis = new BufferedInputStream(new FileInputStream(compressedDstFile))) { Iterator chunks = - this.encryptor.encryptStream(compressedBis, remotePath.toString()); + this.encryptor.encryptStream( + compressedBis, directives.getRemotePath().toString()); // identifies this part position in the object we are uploading int partNum = 0; @@ -154,7 +157,7 @@ protected long uploadFileImpl(Path localPath, Path remotePath) throws BackupRest ++partNum, chunk, config.getBackupPrefix(), - remotePath.toString(), + directives.getRemotePath().toString(), initResponse.getUploadId()); S3PartUploader partUploader = new S3PartUploader(s3Client, dp, partETags); encryptedFileSize += chunk.length; @@ -175,11 +178,12 @@ protected long uploadFileImpl(Path localPath, Path remotePath) throws BackupRest // identifies the combined object datav CompleteMultipartUploadResult resultS3MultiPartUploadComplete = new S3PartUploader(s3Client, part, partETags).completeUpload(); - checkSuccessfulUpload(resultS3MultiPartUploadComplete, localPath); + checkSuccessfulUpload(resultS3MultiPartUploadComplete, directives.getLocalPath()); return encryptedFileSize; } catch (Exception e) { new S3PartUploader(s3Client, part, 
partETags).abortUpload(); - throw new BackupRestoreException("Error uploading file: " + localPath, e); + throw new BackupRestoreException( + "Error uploading file: " + directives.getLocalPath(), e); } finally { if (compressedDstFile.exists()) compressedDstFile.delete(); } diff --git a/priam/src/main/java/com/netflix/priam/aws/S3FileSystem.java b/priam/src/main/java/com/netflix/priam/aws/S3FileSystem.java index a51794d61..0400f1084 100644 --- a/priam/src/main/java/com/netflix/priam/aws/S3FileSystem.java +++ b/priam/src/main/java/com/netflix/priam/aws/S3FileSystem.java @@ -27,7 +27,8 @@ import com.netflix.priam.backup.AbstractBackupPath; import com.netflix.priam.backup.BackupRestoreException; import com.netflix.priam.backup.RangeReadInputStream; -import com.netflix.priam.compress.ICompression; +import com.netflix.priam.compress.ChunkedStream; +import com.netflix.priam.compress.Decompressor; import com.netflix.priam.config.IConfiguration; import com.netflix.priam.identity.config.InstanceInfo; import com.netflix.priam.merics.BackupMetrics; @@ -53,12 +54,11 @@ public class S3FileSystem extends S3FileSystemBase { public S3FileSystem( @Named("awss3roleassumption") IS3Credential cred, Provider pathProvider, - ICompression compress, final IConfiguration config, BackupMetrics backupMetrics, BackupNotificationMgr backupNotificationMgr, InstanceInfo instanceInfo) { - super(pathProvider, compress, config, backupMetrics, backupNotificationMgr); + super(pathProvider, config, backupMetrics, backupNotificationMgr); s3Client = AmazonS3Client.builder() .withCredentials(cred.getAwsCredentialProvider()) @@ -67,23 +67,29 @@ public S3FileSystem( } @Override - protected void downloadFileImpl(Path remotePath, Path localPath) throws BackupRestoreException { + protected void downloadFileImpl(final AbstractBackupPath.UploadDownloadDirectives directives) + throws BackupRestoreException { try { - long remoteFileSize = super.getFileSize(remotePath); + long remoteFileSize = 
super.getFileSize(directives.getRemotePath()); RangeReadInputStream rris = new RangeReadInputStream( - s3Client, getShard(), remoteFileSize, remotePath.toString()); + s3Client, + getShard(), + remoteFileSize, + directives.getRemotePath().toString()); final long bufSize = MAX_BUFFERED_IN_STREAM_SIZE > remoteFileSize ? remoteFileSize : MAX_BUFFERED_IN_STREAM_SIZE; - compress.decompressAndClose( + Decompressor.decompress( + directives.getCompression(), new BufferedInputStream(rris, (int) bufSize), - new BufferedOutputStream(new FileOutputStream(localPath.toFile()))); + new BufferedOutputStream( + new FileOutputStream(directives.getLocalPath().toFile()))); } catch (Exception e) { throw new BackupRestoreException( "Exception encountered downloading " - + remotePath + + directives.getRemotePath() + " from S3 bucket " + getShard() + ", Msg: " @@ -107,27 +113,29 @@ private ObjectMetadata getObjectMetadata(Path path) { return ret; } - private long uploadMultipart(Path localPath, Path remotePath) throws BackupRestoreException { - long chunkSize = getChunkSize(localPath); + private long uploadMultipart(AbstractBackupPath.UploadDownloadDirectives directives) + throws BackupRestoreException { + long chunkSize = getChunkSize(directives.getLocalPath()); if (logger.isDebugEnabled()) logger.debug( "Uploading to {}/{} with chunk size {}", config.getBackupPrefix(), - remotePath, + directives.getRemotePath(), chunkSize); InitiateMultipartUploadRequest initRequest = - new InitiateMultipartUploadRequest(config.getBackupPrefix(), remotePath.toString()); - initRequest.withObjectMetadata(getObjectMetadata(localPath)); + new InitiateMultipartUploadRequest( + config.getBackupPrefix(), directives.getRemotePath().toString()); + initRequest.withObjectMetadata(getObjectMetadata(directives.getLocalPath())); InitiateMultipartUploadResult initResponse = s3Client.initiateMultipartUpload(initRequest); DataPart part = new DataPart( config.getBackupPrefix(), - remotePath.toString(), + 
directives.getRemotePath().toString(), initResponse.getUploadId()); List partETags = Collections.synchronizedList(new ArrayList()); - try (InputStream in = new FileInputStream(localPath.toFile())) { - Iterator chunks = compress.compress(in, chunkSize); + try (InputStream in = new FileInputStream(directives.getLocalPath().toFile())) { + Iterator chunks = new ChunkedStream(directives.getCompression(), in, chunkSize); // Upload parts. int partNum = 0; AtomicInteger partsUploaded = new AtomicInteger(0); @@ -141,7 +149,7 @@ private long uploadMultipart(Path localPath, Path remotePath) throws BackupResto ++partNum, chunk, config.getBackupPrefix(), - remotePath.toString(), + directives.getRemotePath().toString(), initResponse.getUploadId()); S3PartUploader partUploader = new S3PartUploader(s3Client, dp, partETags, partsUploaded); @@ -155,7 +163,7 @@ private long uploadMultipart(Path localPath, Path remotePath) throws BackupResto executor.sleepTillEmpty(); logger.info( "All chunks uploaded for file {}, num of expected parts:{}, num of actual uploaded parts: {}", - localPath.toFile().getName(), + directives.getLocalPath().toFile().getName(), partNum, partsUploaded.get()); @@ -169,7 +177,7 @@ private long uploadMultipart(Path localPath, Path remotePath) throws BackupResto CompleteMultipartUploadResult resultS3MultiPartUploadComplete = new S3PartUploader(s3Client, part, partETags).completeUpload(); - checkSuccessfulUpload(resultS3MultiPartUploadComplete, localPath); + checkSuccessfulUpload(resultS3MultiPartUploadComplete, directives.getLocalPath()); if (logger.isDebugEnabled()) { final S3ResponseMetadata responseMetadata = @@ -188,13 +196,15 @@ private long uploadMultipart(Path localPath, Path remotePath) throws BackupResto return compressedFileSize; } catch (Exception e) { new S3PartUploader(s3Client, part, partETags).abortUpload(); - throw new BackupRestoreException("Error uploading file: " + localPath.toString(), e); + throw new BackupRestoreException( + "Error uploading 
file: " + directives.getLocalPath().toString(), e); } } - protected long uploadFileImpl(Path localPath, Path remotePath) throws BackupRestoreException { + protected long uploadFileImpl(AbstractBackupPath.UploadDownloadDirectives directives) + throws BackupRestoreException { long chunkSize = config.getBackupChunkSize(); - long fileSize = localPath.toFile().length(); + long fileSize = directives.getSize(); if (fileSize < chunkSize) { // Upload file without using multipart upload as it will be more efficient. @@ -202,24 +212,33 @@ protected long uploadFileImpl(Path localPath, Path remotePath) throws BackupRest logger.debug( "Uploading to {}/{} using PUT operation", config.getBackupPrefix(), - remotePath); + directives.getRemotePath()); try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); InputStream in = - new BufferedInputStream(new FileInputStream(localPath.toFile()))) { - Iterator chunkedStream = compress.compress(in, chunkSize); + new BufferedInputStream( + new FileInputStream(directives.getLocalPath().toFile()))) { + Iterator chunkedStream = + new ChunkedStream(directives.getCompression(), in, chunkSize); while (chunkedStream.hasNext()) { byteArrayOutputStream.write(chunkedStream.next()); } byte[] chunk = byteArrayOutputStream.toByteArray(); long compressedFileSize = chunk.length; - rateLimiter.acquire(chunk.length); - ObjectMetadata objectMetadata = getObjectMetadata(localPath); + + /** + * Weird, right that we are checking for length which is positive. You can thanks + * this to sometimes C* creating files which are zero bytes, and giving that in + * snapshot for some unknown reason. 
+ */ + if (chunk.length > 0) rateLimiter.acquire(chunk.length); + + ObjectMetadata objectMetadata = getObjectMetadata(directives.getLocalPath()); objectMetadata.setContentLength(chunk.length); PutObjectRequest putObjectRequest = new PutObjectRequest( config.getBackupPrefix(), - remotePath.toString(), + directives.getRemotePath().toString(), new ByteArrayInputStream(chunk), objectMetadata); // Retry if failed. @@ -234,14 +253,14 @@ public PutObjectResult retriableCall() throws Exception { if (logger.isDebugEnabled()) logger.debug( "Successfully uploaded file with putObject: {} and etag: {}", - remotePath, + directives.getRemotePath(), upload.getETag()); return compressedFileSize; } catch (Exception e) { throw new BackupRestoreException( - "Error uploading file: " + localPath.toFile().getName(), e); + "Error uploading file: " + directives.getLocalPath().toFile().getName(), e); } - } else return uploadMultipart(localPath, remotePath); + } else return uploadMultipart(directives); } } diff --git a/priam/src/main/java/com/netflix/priam/aws/S3FileSystemBase.java b/priam/src/main/java/com/netflix/priam/aws/S3FileSystemBase.java index 302c8cda1..3cc61b3a3 100755 --- a/priam/src/main/java/com/netflix/priam/aws/S3FileSystemBase.java +++ b/priam/src/main/java/com/netflix/priam/aws/S3FileSystemBase.java @@ -30,7 +30,6 @@ import com.netflix.priam.backup.AbstractBackupPath; import com.netflix.priam.backup.AbstractFileSystem; import com.netflix.priam.backup.BackupRestoreException; -import com.netflix.priam.compress.ICompression; import com.netflix.priam.config.IConfiguration; import com.netflix.priam.merics.BackupMetrics; import com.netflix.priam.notification.BackupNotificationMgr; @@ -50,19 +49,16 @@ public abstract class S3FileSystemBase extends AbstractFileSystem { private static final Logger logger = LoggerFactory.getLogger(S3FileSystemBase.class); AmazonS3 s3Client; final IConfiguration config; - final ICompression compress; final BlockingSubmitThreadPoolExecutor executor; 
final RateLimiter rateLimiter; private final RateLimiter objectExistLimiter; S3FileSystemBase( Provider pathProvider, - ICompression compress, final IConfiguration config, BackupMetrics backupMetrics, BackupNotificationMgr backupNotificationMgr) { super(config, backupMetrics, backupNotificationMgr, pathProvider); - this.compress = compress; this.config = config; int threads = config.getBackupThreads(); diff --git a/priam/src/main/java/com/netflix/priam/backup/AbstractBackup.java b/priam/src/main/java/com/netflix/priam/backup/AbstractBackup.java index fb0f4ca54..e7f415664 100644 --- a/priam/src/main/java/com/netflix/priam/backup/AbstractBackup.java +++ b/priam/src/main/java/com/netflix/priam/backup/AbstractBackup.java @@ -19,7 +19,8 @@ import com.google.common.collect.Lists; import com.google.inject.Inject; import com.google.inject.Provider; -import com.netflix.priam.backup.AbstractBackupPath.BackupFileType; +import com.netflix.priam.backup.AbstractBackupPath.UploadDownloadDirectives.BackupFileType; +import com.netflix.priam.compress.ICompression; import com.netflix.priam.config.IConfiguration; import com.netflix.priam.scheduler.Task; import com.netflix.priam.utils.SystemUtils; @@ -92,22 +93,16 @@ protected List upload( for (File file : files) { if (file.isFile() && file.exists()) { AbstractBackupPath bp = getAbstractBackupPath(file, type); + bp.getDirectives() + .withRetry(10) + .withDeleteAfterSuccessfulUpload(true) + .withRemotePath(Paths.get(bp.getRemotePath())); - if (async) - futures.add( - fs.asyncUploadFile( - Paths.get(bp.getBackupFile().getAbsolutePath()), - Paths.get(bp.getRemotePath()), - bp, - 10, - true)); - else - fs.uploadFile( - Paths.get(bp.getBackupFile().getAbsolutePath()), - Paths.get(bp.getRemotePath()), - bp, - 10, - true); + if (type == BackupFileType.SST_V2 || type == BackupFileType.META_V2) + bp.getDirectives().withCompression(ICompression.CompressionAlgorithm.LZ4); + + if (async) futures.add(fs.asyncUploadFile(bp)); + else 
fs.uploadFile(bp); bps.add(bp); } diff --git a/priam/src/main/java/com/netflix/priam/backup/AbstractBackupPath.java b/priam/src/main/java/com/netflix/priam/backup/AbstractBackupPath.java index c3dd2afe2..3510d39d1 100644 --- a/priam/src/main/java/com/netflix/priam/backup/AbstractBackupPath.java +++ b/priam/src/main/java/com/netflix/priam/backup/AbstractBackupPath.java @@ -37,22 +37,6 @@ public abstract class AbstractBackupPath implements Comparable objectCache; + private final Cache objectCache; @Inject public AbstractFileSystem( @@ -80,7 +82,10 @@ public AbstractFileSystem( // Add notifications. this.addObserver(backupNotificationMgr); this.objectCache = - CacheBuilder.newBuilder().maximumSize(configuration.getBackupQueueSize()).build(); + CacheBuilder.newBuilder() + .maximumSize(configuration.getBackupQueueSize()) + .expireAfterAccess(configuration.getBackupCacheTTLInDays(), TimeUnit.DAYS) + .build(); tasksQueued = new ConcurrentHashMap<>().newKeySet(); /* Note: We are using different queue for upload and download as with Backup V2.0 we might download all the meta @@ -111,147 +116,159 @@ Also, we may want to have different TIMEOUT for each kind of operation (upload/d } @Override - public Future asyncDownloadFile( - final Path remotePath, final Path localPath, final int retry) + public Future asyncDownloadFile(final AbstractBackupPath path) throws BackupRestoreException, RejectedExecutionException { return fileDownloadExecutor.submit( () -> { - downloadFile(remotePath, localPath, retry); - return remotePath; + downloadFile(path); + return path.getDirectives().getRemotePath(); }); } @Override - public void downloadFile(final Path remotePath, final Path localPath, final int retry) - throws BackupRestoreException { + public void downloadFile(final AbstractBackupPath path) throws BackupRestoreException { // TODO: Should we download the file if localPath already exists? 
- if (remotePath == null || localPath == null) return; - localPath.toFile().getParentFile().mkdirs(); - logger.info("Downloading file: {} to location: {}", remotePath, localPath); + AbstractBackupPath.UploadDownloadDirectives directives = path.getDirectives(); + if (directives.getRemotePath() == null || directives.getLocalPath() == null) return; + directives.getLocalPath().toFile().getParentFile().mkdirs(); + logger.info( + "Downloading file: {} to location: {}", + directives.getRemotePath(), + directives.getLocalPath()); try { - new BoundedExponentialRetryCallable(500, 10000, retry) { + new BoundedExponentialRetryCallable(500, 10000, directives.getRetry()) { @Override public Void retriableCall() throws Exception { - downloadFileImpl(remotePath, localPath); + downloadFileImpl(directives); return null; } }.call(); // Note we only downloaded the bytes which are represented on file system (they are // compressed and maybe encrypted). // File size after decompression or decryption might be more/less. 
- backupMetrics.recordDownloadRate(getFileSize(remotePath)); + backupMetrics.recordDownloadRate(getFileSize(directives.getRemotePath())); backupMetrics.incrementValidDownloads(); - logger.info("Successfully downloaded file: {} to location: {}", remotePath, localPath); + logger.info( + "Successfully downloaded file: {} to location: {}", + directives.getRemotePath(), + directives.getLocalPath()); } catch (Exception e) { backupMetrics.incrementInvalidDownloads(); - logger.error("Error while downloading file: {} to location: {}", remotePath, localPath); + logger.error( + "Error while downloading file: {} to location: {}", + directives.getRemotePath(), + directives.getLocalPath()); throw new BackupRestoreException(e.getMessage()); } } - protected abstract void downloadFileImpl(final Path remotePath, final Path localPath) + protected abstract void downloadFileImpl( + final AbstractBackupPath.UploadDownloadDirectives directives) throws BackupRestoreException; @Override - public Future asyncUploadFile( - final Path localPath, - final Path remotePath, - final AbstractBackupPath path, - final int retry, - final boolean deleteAfterSuccessfulUpload) + public Future asyncUploadFile(final AbstractBackupPath path) throws FileNotFoundException, RejectedExecutionException, BackupRestoreException { return fileUploadExecutor.submit( () -> { - uploadFile(localPath, remotePath, path, retry, deleteAfterSuccessfulUpload); - return localPath; + uploadFile(path); + return path.getDirectives().getLocalPath(); }); } @Override - public void uploadFile( - final Path localPath, - final Path remotePath, - final AbstractBackupPath path, - final int retry, - final boolean deleteAfterSuccessfulUpload) + public void uploadFile(final AbstractBackupPath path) throws FileNotFoundException, BackupRestoreException { - if (localPath == null - || remotePath == null - || !localPath.toFile().exists() - || localPath.toFile().isDirectory()) + AbstractBackupPath.UploadDownloadDirectives directives = 
path.getDirectives(); + if (directives == null + || directives.getLocalPath() == null + || directives.getRemotePath() == null + || !directives.getLocalPath().toFile().exists() + || directives.getLocalPath().toFile().isDirectory()) throw new FileNotFoundException( "File do not exist or is a directory. localPath: " - + localPath + + directives.getLocalPath() + ", remotePath: " - + remotePath); + + directives.getRemotePath()); - if (tasksQueued.add(localPath)) { - logger.info("Uploading file: {} to location: {}", localPath, remotePath); + if (tasksQueued.add(directives.getLocalPath())) { + logger.info( + "Uploading file: {} to location: {}", + directives.getLocalPath(), + directives.getRemotePath()); try { long uploadedFileSize; // Upload file if it not present at remote location. - if (path.getType() != BackupFileType.SST_V2 || !checkObjectExists(remotePath)) { + if (path.getDirectives().getType() != BackupFileType.SST_V2 + || !checkObjectExists(directives.getRemotePath())) { notifyEventStart(new BackupEvent(path)); uploadedFileSize = - new BoundedExponentialRetryCallable(500, 10000, retry) { + new BoundedExponentialRetryCallable( + 500, 10000, directives.getRetry()) { @Override public Long retriableCall() throws Exception { - return uploadFileImpl(localPath, remotePath); + return uploadFileImpl(directives); } }.call(); // Add to cache after successful upload. // We only add SST_V2 as other file types are usually not checked, so no point // evicting our SST_V2 results. - if (path.getType() == BackupFileType.SST_V2) addObjectCache(remotePath); + if (path.getDirectives().getType() == BackupFileType.SST_V2) + addObjectCache(directives.getRemotePath()); backupMetrics.recordUploadRate(uploadedFileSize); backupMetrics.incrementValidUploads(); - path.setCompressedFileSize(uploadedFileSize); + path.getDirectives().setCompressedFileSize(uploadedFileSize); notifyEventSuccess(new BackupEvent(path)); } else { // file is already uploaded to remote file system. 
- logger.info("File: {} already present on remoteFileSystem.", remotePath); + logger.info( + "File: {} already present on remoteFileSystem.", + directives.getRemotePath()); } logger.info( - "Successfully uploaded file: {} to location: {}", localPath, remotePath); + "Successfully uploaded file: {} to location: {}", + directives.getLocalPath(), + directives.getRemotePath()); - if (deleteAfterSuccessfulUpload && !FileUtils.deleteQuietly(localPath.toFile())) + if (directives.isDeleteAfterSuccessfulUpload() + && !FileUtils.deleteQuietly(directives.getLocalPath().toFile())) logger.warn( String.format( "Failed to delete local file %s.", - localPath.toFile().getAbsolutePath())); + directives.getLocalPath().toFile().getAbsolutePath())); } catch (Exception e) { backupMetrics.incrementInvalidUploads(); notifyEventFailure(new BackupEvent(path)); logger.error( "Error while uploading file: {} to location: {}. Exception: Msg: [{}], Trace: {}", - localPath, - remotePath, + directives.getLocalPath(), + directives.getRemotePath(), e.getMessage(), e.getStackTrace()); throw new BackupRestoreException(e.getMessage()); } finally { // Remove the task from the list so if we try to upload file ever again, we can. - tasksQueued.remove(localPath); + tasksQueued.remove(directives.getLocalPath()); } - } else logger.info("Already in queue, no-op. File: {}", localPath); + } else logger.info("Already in queue, no-op. File: {}", directives.getLocalPath()); } private void addObjectCache(Path remotePath) { - objectCache.put(remotePath, Boolean.TRUE); + objectCache.put(remotePath, DateUtil.getInstant()); } @Override public boolean checkObjectExists(Path remotePath) { // Check in cache, if remote file exists. - Boolean cacheResult = objectCache.getIfPresent(remotePath); + Instant lastUpdatedTimestamp = objectCache.getIfPresent(remotePath); // Cache hit. Return the value. 
- if (cacheResult != null) return cacheResult; + if (lastUpdatedTimestamp != null) return true; // Cache miss - Check remote file system if object exist. boolean remoteFileExist = doesRemoteFileExist(remotePath); @@ -278,7 +295,8 @@ public void deleteRemoteFiles(List remotePaths) throws BackupRestoreExcept protected abstract boolean doesRemoteFileExist(Path remotePath); - protected abstract long uploadFileImpl(final Path localPath, final Path remotePath) + protected abstract long uploadFileImpl( + final AbstractBackupPath.UploadDownloadDirectives directives) throws BackupRestoreException; @Override diff --git a/priam/src/main/java/com/netflix/priam/backup/CommitLogBackup.java b/priam/src/main/java/com/netflix/priam/backup/CommitLogBackup.java index c667d3a54..621ae2d25 100644 --- a/priam/src/main/java/com/netflix/priam/backup/CommitLogBackup.java +++ b/priam/src/main/java/com/netflix/priam/backup/CommitLogBackup.java @@ -17,7 +17,7 @@ import com.google.inject.Inject; import com.google.inject.Provider; import com.google.inject.name.Named; -import com.netflix.priam.backup.AbstractBackupPath.BackupFileType; +import com.netflix.priam.backup.AbstractBackupPath.UploadDownloadDirectives.BackupFileType; import com.netflix.priam.utils.DateUtil; import java.io.File; import java.nio.file.Paths; @@ -65,13 +65,12 @@ public List upload(String archivedDir, final String snapshot bp.parseLocal(file, BackupFileType.CL); if (snapshotName != null) bp.time = DateUtil.getDate(snapshotName); + bp.getDirectives() + .withRetry(10) + .withDeleteAfterSuccessfulUpload(true) + .withRemotePath(Paths.get(bp.getRemotePath())); - fs.uploadFile( - Paths.get(bp.getBackupFile().getAbsolutePath()), - Paths.get(bp.getRemotePath()), - bp, - 10, - true); + fs.uploadFile(bp); bps.add(bp); addToRemotePath(bp.getRemotePath()); } catch (Exception e) { diff --git a/priam/src/main/java/com/netflix/priam/backup/IBackupFileSystem.java b/priam/src/main/java/com/netflix/priam/backup/IBackupFileSystem.java index 
ef8935a05..9ca4ad680 100644 --- a/priam/src/main/java/com/netflix/priam/backup/IBackupFileSystem.java +++ b/priam/src/main/java/com/netflix/priam/backup/IBackupFileSystem.java @@ -29,30 +29,24 @@ public interface IBackupFileSystem { /** * Download the file denoted by remotePath to the local file system denoted by local path. * - * @param remotePath fully qualified location of the file on remote file system. - * @param localPath location on the local file sytem where remote file should be downloaded. - * @param retry No. of times to retry to download a file from remote file system. If <1, it - * will try to download file exactly once. + * @param path AbstractBackupPath encapsulates the location and directives for upload/download. * @throws BackupRestoreException if file is not available, downloadable or any other error from * remote file system. */ - void downloadFile(Path remotePath, Path localPath, int retry) throws BackupRestoreException; + void downloadFile(final AbstractBackupPath path) throws BackupRestoreException; /** * Download the file denoted by remotePath in an async fashion to the local file system denoted * by local path. * - * @param remotePath fully qualified location of the file on remote file system. - * @param localPath location on the local file sytem where remote file should be downloaded. - * @param retry No. of times to retry to download a file from remote file system. If <1, it - * will try to download file exactly once. + * @param path AbstractBackupPath encapsulates the location and directives for upload/download. * @return The future of the async job to monitor the progress of the job. * @throws BackupRestoreException if file is not available, downloadable or any other error from * remote file system. * @throws RejectedExecutionException if the queue is full and TIMEOUT is reached while trying * to add the work to the queue. 
*/ - Future asyncDownloadFile(final Path remotePath, final Path localPath, final int retry) + Future asyncDownloadFile(final AbstractBackupPath path) throws BackupRestoreException, RejectedExecutionException; /** @@ -63,41 +57,20 @@ Future asyncDownloadFile(final Path remotePath, final Path localPath, fina * internal queue. Note that de-duping is best effort and is not always guaranteed as we try to * avoid lock on read/write of the files-in-progress. * - * @param localPath Path of the local file that needs to be uploaded. - * @param remotePath Fully qualified path on the remote file system where file should be - * uploaded. - * @param path AbstractBackupPath to be used to send backup notifications only. - * @param retry No of times to retry to upload a file. If <1, it will try to upload file - * exactly once. - * @param deleteAfterSuccessfulUpload If true, delete the file denoted by localPath after it is - * successfully uploaded to the filesystem. If there is any failure, file will not be - * deleted. + * @param path AbstractBackupPath encapsulates the location and directives for upload/download. * @throws BackupRestoreException in case of failure to upload for any reason including file not * readable or remote file system errors. * @throws FileNotFoundException If a file as denoted by localPath is not available or is a * directory. */ - void uploadFile( - Path localPath, - Path remotePath, - AbstractBackupPath path, - int retry, - boolean deleteAfterSuccessfulUpload) + void uploadFile(final AbstractBackupPath path) throws FileNotFoundException, BackupRestoreException; /** * Upload the local file denoted by localPath in async fashion to the remote file system at * location denoted by remotePath. * - * @param localPath Path of the local file that needs to be uploaded. - * @param remotePath Fully qualified path on the remote file system where file should be - * uploaded. - * @param path AbstractBackupPath to be used to send backup notifications only. 
- * @param retry No of times to retry to upload a file. If <1, it will try to upload file - * exactly once. - * @param deleteAfterSuccessfulUpload If true, delete the file denoted by localPath after it is - * successfully uploaded to the filesystem. If there is any failure, file will not be - * deleted. + * @param path AbstractBackupPath encapsulates the location and directives for upload/download. * @return The future of the async job to monitor the progress of the job. This will be null if * file was de-duped for upload. * @throws BackupRestoreException in case of failure to upload for any reason including file not @@ -107,12 +80,7 @@ void uploadFile( * @throws RejectedExecutionException if the queue is full and TIMEOUT is reached while trying * to add the work to the queue. */ - Future asyncUploadFile( - final Path localPath, - final Path remotePath, - final AbstractBackupPath path, - final int retry, - final boolean deleteAfterSuccessfulUpload) + Future asyncUploadFile(final AbstractBackupPath path) throws FileNotFoundException, RejectedExecutionException, BackupRestoreException; /** diff --git a/priam/src/main/java/com/netflix/priam/backup/IncrementalBackup.java b/priam/src/main/java/com/netflix/priam/backup/IncrementalBackup.java index ef8d9740a..9df31a01f 100644 --- a/priam/src/main/java/com/netflix/priam/backup/IncrementalBackup.java +++ b/priam/src/main/java/com/netflix/priam/backup/IncrementalBackup.java @@ -19,7 +19,7 @@ import com.google.inject.Inject; import com.google.inject.Provider; import com.google.inject.Singleton; -import com.netflix.priam.backup.AbstractBackupPath.BackupFileType; +import com.netflix.priam.backup.AbstractBackupPath.UploadDownloadDirectives.BackupFileType; import com.netflix.priam.backupv2.SnapshotMetaTask; import com.netflix.priam.config.IBackupRestoreConfig; import com.netflix.priam.config.IConfiguration; diff --git a/priam/src/main/java/com/netflix/priam/backup/MetaData.java 
b/priam/src/main/java/com/netflix/priam/backup/MetaData.java index be7a0bdab..ba0b70ab8 100644 --- a/priam/src/main/java/com/netflix/priam/backup/MetaData.java +++ b/priam/src/main/java/com/netflix/priam/backup/MetaData.java @@ -18,7 +18,7 @@ import com.google.inject.Inject; import com.google.inject.Provider; -import com.netflix.priam.backup.AbstractBackupPath.BackupFileType; +import com.netflix.priam.backup.AbstractBackupPath.UploadDownloadDirectives.BackupFileType; import com.netflix.priam.config.IConfiguration; import com.netflix.priam.utils.DateUtil; import java.io.File; @@ -62,12 +62,12 @@ public AbstractBackupPath set(List bps, String snapshotName) fr.write(jsonObj.toJSONString()); } AbstractBackupPath backupfile = decorateMetaJson(metafile, snapshotName); - fs.uploadFile( - Paths.get(backupfile.getBackupFile().getAbsolutePath()), - Paths.get(backupfile.getRemotePath()), - backupfile, - 10, - true); + backupfile + .getDirectives() + .withRetry(10) + .withDeleteAfterSuccessfulUpload(true) + .withRemotePath(Paths.get(backupfile.getRemotePath())); + fs.uploadFile(backupfile); addToRemotePath(backupfile.getRemotePath()); return backupfile; } @@ -92,10 +92,8 @@ public AbstractBackupPath decorateMetaJson(File metafile, String snapshotName) */ public Boolean doesExist(final AbstractBackupPath meta) { try { - fs.downloadFile( - Paths.get(meta.getRemotePath()), - Paths.get(meta.newRestoreFile().getAbsolutePath()), - 5); // download actual file to disk + meta.getDirectives().withRetry(5).withLocalPath(meta.newRestoreFile().toPath()); + fs.downloadFile(meta); // download actual file to disk } catch (Exception e) { logger.error("Error downloading the Meta data try with a different date...", e); } diff --git a/priam/src/main/java/com/netflix/priam/backup/SnapshotBackup.java b/priam/src/main/java/com/netflix/priam/backup/SnapshotBackup.java index e9c0d6508..cb7e03f04 100644 --- a/priam/src/main/java/com/netflix/priam/backup/SnapshotBackup.java +++ 
b/priam/src/main/java/com/netflix/priam/backup/SnapshotBackup.java @@ -20,7 +20,7 @@ import com.google.inject.Inject; import com.google.inject.Provider; import com.google.inject.Singleton; -import com.netflix.priam.backup.AbstractBackupPath.BackupFileType; +import com.netflix.priam.backup.AbstractBackupPath.UploadDownloadDirectives.BackupFileType; import com.netflix.priam.backupv2.ForgottenFilesManager; import com.netflix.priam.config.IConfiguration; import com.netflix.priam.connection.CassandraOperations; diff --git a/priam/src/main/java/com/netflix/priam/backupv2/BackupTTLTask.java b/priam/src/main/java/com/netflix/priam/backupv2/BackupTTLTask.java index 6f6e36419..cdce55ba9 100644 --- a/priam/src/main/java/com/netflix/priam/backupv2/BackupTTLTask.java +++ b/priam/src/main/java/com/netflix/priam/backupv2/BackupTTLTask.java @@ -21,6 +21,7 @@ import com.google.inject.Provider; import com.google.inject.Singleton; import com.netflix.priam.backup.AbstractBackupPath; +import com.netflix.priam.backup.AbstractBackupPath.UploadDownloadDirectives.BackupFileType; import com.netflix.priam.backup.BackupRestoreException; import com.netflix.priam.backup.IBackupFileSystem; import com.netflix.priam.backup.IFileSystemContext; @@ -220,9 +221,7 @@ private void deleteFile(AbstractBackupPath path, boolean forceClear) private String getSSTPrefix() { Path location = fileSystem.getPrefix(); AbstractBackupPath abstractBackupPath = abstractBackupPathProvider.get(); - return abstractBackupPath - .remoteV2Prefix(location, AbstractBackupPath.BackupFileType.SST_V2) - .toString(); + return abstractBackupPath.remoteV2Prefix(location, BackupFileType.SST_V2).toString(); } @Override diff --git a/priam/src/main/java/com/netflix/priam/backupv2/MetaFileWriterBuilder.java b/priam/src/main/java/com/netflix/priam/backupv2/MetaFileWriterBuilder.java index a1a4fb878..451292498 100644 --- a/priam/src/main/java/com/netflix/priam/backupv2/MetaFileWriterBuilder.java +++ 
b/priam/src/main/java/com/netflix/priam/backupv2/MetaFileWriterBuilder.java @@ -19,6 +19,7 @@ import com.google.gson.stream.JsonWriter; import com.google.inject.Provider; import com.netflix.priam.backup.AbstractBackupPath; +import com.netflix.priam.backup.AbstractBackupPath.UploadDownloadDirectives.BackupFileType; import com.netflix.priam.backup.IBackupFileSystem; import com.netflix.priam.backup.IFileSystemContext; import com.netflix.priam.config.IConfiguration; @@ -188,14 +189,15 @@ public MetaFileWriterBuilder.UploadStep endMetaFileGeneration() throws IOExcepti */ public void uploadMetaFile(boolean deleteOnSuccess) throws Exception { AbstractBackupPath abstractBackupPath = pathFactory.get(); - abstractBackupPath.parseLocal( - metaFilePath.toFile(), AbstractBackupPath.BackupFileType.META_V2); - backupFileSystem.uploadFile( - metaFilePath, - Paths.get(getRemoteMetaFilePath()), - abstractBackupPath, - 10, - deleteOnSuccess); + abstractBackupPath.parseLocal(metaFilePath.toFile(), BackupFileType.META_V2); + + abstractBackupPath + .getDirectives() + .withRetry(10) + .withDeleteAfterSuccessfulUpload(deleteOnSuccess) + .withRemotePath(Paths.get(abstractBackupPath.getRemotePath())); + + backupFileSystem.uploadFile(abstractBackupPath); } public Path getMetaFilePath() { @@ -204,8 +206,7 @@ public Path getMetaFilePath() { public String getRemoteMetaFilePath() throws Exception { AbstractBackupPath abstractBackupPath = pathFactory.get(); - abstractBackupPath.parseLocal( - metaFilePath.toFile(), AbstractBackupPath.BackupFileType.META_V2); + abstractBackupPath.parseLocal(metaFilePath.toFile(), BackupFileType.META_V2); return abstractBackupPath.getRemotePath(); } } diff --git a/priam/src/main/java/com/netflix/priam/backupv2/MetaV1Proxy.java b/priam/src/main/java/com/netflix/priam/backupv2/MetaV1Proxy.java index cfce6ae81..d6660ecd8 100644 --- a/priam/src/main/java/com/netflix/priam/backupv2/MetaV1Proxy.java +++ b/priam/src/main/java/com/netflix/priam/backupv2/MetaV1Proxy.java @@ 
-20,6 +20,7 @@ import com.google.common.collect.Lists; import com.google.inject.Inject; import com.netflix.priam.backup.*; +import com.netflix.priam.backup.AbstractBackupPath.UploadDownloadDirectives.BackupFileType; import com.netflix.priam.config.IConfiguration; import com.netflix.priam.utils.DateUtil; import java.io.FileReader; @@ -67,7 +68,7 @@ public List findMetaFiles(DateUtil.DateRange dateRange) { while (backupfiles.hasNext()) { AbstractBackupPath path = backupfiles.next(); - if (path.getType() == AbstractBackupPath.BackupFileType.META) + if (path.getDirectives().getType() == BackupFileType.META) // Since there are now meta file for incrementals as well as snapshot, we need to // find the correct one (i.e. the snapshot meta file (meta.json)) if (path.getFileName().equalsIgnoreCase("meta.json")) { @@ -118,7 +119,7 @@ public BackupVerificationResult isMetaFileValid(AbstractBackupPath metaBackupPat List remoteListing = new ArrayList<>(); while (backupfiles.hasNext()) { AbstractBackupPath path = backupfiles.next(); - if (path.getType() == AbstractBackupPath.BackupFileType.SNAP) + if (path.getDirectives().getType() == BackupFileType.SNAP) remoteListing.add(path.getRemotePath()); } @@ -150,7 +151,8 @@ public BackupVerificationResult isMetaFileValid(AbstractBackupPath metaBackupPat @Override public Path downloadMetaFile(AbstractBackupPath meta) throws BackupRestoreException { Path localFile = Paths.get(meta.newRestoreFile().getAbsolutePath() + ".download"); - fs.downloadFile(Paths.get(meta.getRemotePath()), localFile, 10); + meta.getDirectives().withLocalPath(localFile).withRetry(10); + fs.downloadFile(meta); return localFile; } @@ -181,7 +183,7 @@ public Iterator getIncrementals(DateUtil.DateRange dateRange return new FilterIterator<>( iterator, abstractBackupPath -> - abstractBackupPath.getType() == AbstractBackupPath.BackupFileType.SST); + abstractBackupPath.getDirectives().getType() == BackupFileType.SST); } @Override diff --git 
a/priam/src/main/java/com/netflix/priam/backupv2/MetaV2Proxy.java b/priam/src/main/java/com/netflix/priam/backupv2/MetaV2Proxy.java index 5dafd2472..32073912a 100644 --- a/priam/src/main/java/com/netflix/priam/backupv2/MetaV2Proxy.java +++ b/priam/src/main/java/com/netflix/priam/backupv2/MetaV2Proxy.java @@ -19,6 +19,7 @@ import com.google.inject.Provider; import com.netflix.priam.backup.*; +import com.netflix.priam.backup.AbstractBackupPath.UploadDownloadDirectives.BackupFileType; import com.netflix.priam.config.IConfiguration; import com.netflix.priam.utils.DateUtil; import java.io.File; @@ -61,11 +62,10 @@ public Path getLocalMetaFileDirectory() { @Override public String getMetaPrefix(DateUtil.DateRange dateRange) { - return getMatch(dateRange, AbstractBackupPath.BackupFileType.META_V2); + return getMatch(dateRange, BackupFileType.META_V2); } - private String getMatch( - DateUtil.DateRange dateRange, AbstractBackupPath.BackupFileType backupFileType) { + private String getMatch(DateUtil.DateRange dateRange, BackupFileType backupFileType) { Path location = fs.getPrefix(); AbstractBackupPath abstractBackupPath = abstractBackupPathProvider.get(); String match = StringUtils.EMPTY; @@ -81,11 +81,11 @@ private String getMatch( @Override public Iterator getIncrementals(DateUtil.DateRange dateRange) throws BackupRestoreException { - String incrementalPrefix = getMatch(dateRange, AbstractBackupPath.BackupFileType.SST_V2); + String incrementalPrefix = getMatch(dateRange, BackupFileType.SST_V2); String marker = getMatch( new DateUtil.DateRange(dateRange.getStartTime(), null), - AbstractBackupPath.BackupFileType.SST_V2); + BackupFileType.SST_V2); logger.info( "Listing filesystem with prefix: {}, marker: {}, daterange: {}", incrementalPrefix, @@ -153,7 +153,8 @@ public List findMetaFiles(DateUtil.DateRange dateRange) { @Override public Path downloadMetaFile(AbstractBackupPath meta) throws BackupRestoreException { Path localFile = 
Paths.get(meta.newRestoreFile().getAbsolutePath()); - fs.downloadFile(Paths.get(meta.getRemotePath()), localFile, 10); + meta.getDirectives().withRetry(10).withLocalPath(localFile); + fs.downloadFile(meta); return localFile; } diff --git a/priam/src/main/java/com/netflix/priam/backupv2/SnapshotMetaTask.java b/priam/src/main/java/com/netflix/priam/backupv2/SnapshotMetaTask.java index d5133374c..514ec287d 100644 --- a/priam/src/main/java/com/netflix/priam/backupv2/SnapshotMetaTask.java +++ b/priam/src/main/java/com/netflix/priam/backupv2/SnapshotMetaTask.java @@ -18,6 +18,7 @@ import com.google.inject.Provider; import com.netflix.priam.backup.*; +import com.netflix.priam.backup.AbstractBackupPath.UploadDownloadDirectives.BackupFileType; import com.netflix.priam.backup.BackupVersion; import com.netflix.priam.config.IBackupRestoreConfig; import com.netflix.priam.config.IConfiguration; @@ -286,7 +287,7 @@ private void uploadAllFiles( // Process each snapshot of SNAPSHOT_PREFIX // We do not want to wait for completion and we just want to add them to queue. This // is to ensure that next run happens on time. - upload(snapshotDirectory, AbstractBackupPath.BackupFileType.SST_V2, true, false); + upload(snapshotDirectory, BackupFileType.SST_V2, true, false); } } } @@ -344,7 +345,7 @@ private void generateMetaFile( // Add isUploaded and remotePath here. 
try { AbstractBackupPath abstractBackupPath = pathFactory.get(); - abstractBackupPath.parseLocal(file, AbstractBackupPath.BackupFileType.SST_V2); + abstractBackupPath.parseLocal(file, BackupFileType.SST_V2); fileUploadResult.setBackupPath(abstractBackupPath.getRemotePath()); fileUploadResult.setUploaded( fs.checkObjectExists(Paths.get(fileUploadResult.getBackupPath()))); diff --git a/priam/src/main/java/com/netflix/priam/compress/ChunkedStream.java b/priam/src/main/java/com/netflix/priam/compress/ChunkedStream.java index b23b29dd4..100f61d7d 100644 --- a/priam/src/main/java/com/netflix/priam/compress/ChunkedStream.java +++ b/priam/src/main/java/com/netflix/priam/compress/ChunkedStream.java @@ -16,27 +16,36 @@ */ package com.netflix.priam.compress; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.InputStream; +import java.io.*; import java.util.Iterator; import org.apache.commons.io.IOUtils; -import org.xerial.snappy.SnappyOutputStream; /** Byte iterator representing compressed data. 
Uses snappy compression */ public class ChunkedStream implements Iterator { private boolean hasnext = true; private final ByteArrayOutputStream bos; - private final SnappyOutputStream compress; private final InputStream origin; private final long chunkSize; private static final int BYTES_TO_READ = 2048; + private final ICompressor compressor; - public ChunkedStream(InputStream is, long chunkSize) throws IOException { + public ChunkedStream( + ICompression.CompressionAlgorithm compressionAlgorithm, InputStream is, long chunkSize) + throws IOException { this.origin = is; this.bos = new ByteArrayOutputStream(); - this.compress = new SnappyOutputStream(bos); this.chunkSize = chunkSize; + switch (compressionAlgorithm) { + case LZ4: + compressor = new LZ4Compressor(bos, (int) chunkSize); + break; + case SNAPPY: + compressor = new SnappyCompressor(bos); + break; + default: + compressor = new NoOpCompressor(bos); + break; + } } @Override @@ -50,7 +59,7 @@ public byte[] next() { byte data[] = new byte[BYTES_TO_READ]; int count; while ((count = origin.read(data, 0, data.length)) != -1) { - compress.write(data, 0, count); + compressor.write(data, 0, count); if (bos.size() >= chunkSize) return returnSafe(); } // We don't have anything else to read hence set to false. 
@@ -61,10 +70,10 @@ public byte[] next() { } private byte[] done() throws IOException { - compress.flush(); + compressor.finish(); byte[] return_ = bos.toByteArray(); hasnext = false; - IOUtils.closeQuietly(compress); + compressor.closeQuietly(); IOUtils.closeQuietly(bos); IOUtils.closeQuietly(origin); return return_; diff --git a/priam/src/main/java/com/netflix/priam/compress/SnappyCompression.java b/priam/src/main/java/com/netflix/priam/compress/Decompressor.java similarity index 51% rename from priam/src/main/java/com/netflix/priam/compress/SnappyCompression.java rename to priam/src/main/java/com/netflix/priam/compress/Decompressor.java index ae43a6063..b92daf129 100644 --- a/priam/src/main/java/com/netflix/priam/compress/SnappyCompression.java +++ b/priam/src/main/java/com/netflix/priam/compress/Decompressor.java @@ -1,5 +1,6 @@ +package com.netflix.priam.compress; /* - * Copyright 2017 Netflix, Inc. + * Copyright 2019 Netflix, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -14,36 +15,39 @@ * limitations under the License. 
* */ -package com.netflix.priam.compress; import java.io.*; -import java.util.Iterator; -import org.apache.commons.io.IOUtils; +import net.jpountz.lz4.LZ4BlockInputStream; import org.xerial.snappy.SnappyInputStream; -/** Class to generate compressed chunks of data from an input stream using SnappyCompression */ -public class SnappyCompression implements ICompression { +public class Decompressor { private static final int BUFFER = 2 * 1024; - @Override - public Iterator compress(InputStream is, long chunkSize) throws IOException { - return new ChunkedStream(is, chunkSize); + public static void decompress( + ICompression.CompressionAlgorithm compressionAlgorithm, + InputStream input, + OutputStream output) + throws IOException { + decompress(getInputStream(compressionAlgorithm, input), output); } - @Override - public void decompressAndClose(InputStream input, OutputStream output) throws IOException { - try { - decompress(input, output); - } finally { - IOUtils.closeQuietly(input); - IOUtils.closeQuietly(output); + private static InputStream getInputStream( + ICompression.CompressionAlgorithm compressionAlgorithm, InputStream inputStream) + throws IOException { + switch (compressionAlgorithm) { + case SNAPPY: + return new SnappyInputStream(new BufferedInputStream(inputStream)); + case LZ4: + return new LZ4BlockInputStream(new BufferedInputStream(inputStream)); + default: + return new BufferedInputStream(inputStream); } } - private void decompress(InputStream input, OutputStream output) throws IOException { + private static void decompress(InputStream input, OutputStream output) throws IOException { byte data[] = new byte[BUFFER]; try (BufferedOutputStream dest1 = new BufferedOutputStream(output, BUFFER); - SnappyInputStream is = new SnappyInputStream(new BufferedInputStream(input))) { + InputStream is = input) { int c; while ((c = is.read(data, 0, BUFFER)) != -1) { dest1.write(data, 0, c); diff --git a/priam/src/main/java/com/netflix/priam/compress/ICompression.java 
b/priam/src/main/java/com/netflix/priam/compress/ICompression.java index 2eca42c6f..013ab1391 100644 --- a/priam/src/main/java/com/netflix/priam/compress/ICompression.java +++ b/priam/src/main/java/com/netflix/priam/compress/ICompression.java @@ -16,13 +16,6 @@ */ package com.netflix.priam.compress; -import com.google.inject.ImplementedBy; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.util.Iterator; - -@ImplementedBy(SnappyCompression.class) public interface ICompression { enum CompressionAlgorithm { @@ -31,19 +24,5 @@ enum CompressionAlgorithm { NONE } - /** - * Uncompress the input stream and write to the output stream. Closes both input and output - * streams - */ - void decompressAndClose(InputStream input, OutputStream output) throws IOException; - - /** - * Produces chunks of compressed data. - * - * @param is inputstream to be compressed. - * @param chunkSize compress the stream and return it in parts of chunk - * @return compressed byte array iterator - * @throws IOException - */ - Iterator compress(InputStream is, long chunkSize) throws IOException; + CompressionAlgorithm DEFAULT_COMPRESSION = CompressionAlgorithm.SNAPPY; } diff --git a/priam/src/main/java/com/netflix/priam/compress/ICompressor.java b/priam/src/main/java/com/netflix/priam/compress/ICompressor.java new file mode 100644 index 000000000..8d10925c3 --- /dev/null +++ b/priam/src/main/java/com/netflix/priam/compress/ICompressor.java @@ -0,0 +1,28 @@ +package com.netflix.priam.compress; +/* + * Copyright 2019 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +import java.io.IOException; + +public interface ICompressor { + + void write(byte[] b, int byteOffset, int byteLength) throws IOException; + + void finish() throws IOException; + + void closeQuietly(); +} diff --git a/priam/src/main/java/com/netflix/priam/compress/LZ4Compressor.java b/priam/src/main/java/com/netflix/priam/compress/LZ4Compressor.java new file mode 100644 index 000000000..d0853dc63 --- /dev/null +++ b/priam/src/main/java/com/netflix/priam/compress/LZ4Compressor.java @@ -0,0 +1,46 @@ +package com.netflix.priam.compress; +/* + * Copyright 2019 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +import java.io.*; +import net.jpountz.lz4.LZ4BlockOutputStream; + +public class LZ4Compressor implements ICompressor { + private final LZ4BlockOutputStream lz4Compress; + + public LZ4Compressor(OutputStream outputStream, int chunkSize) { + this.lz4Compress = new LZ4BlockOutputStream(outputStream, chunkSize); + } + + @Override + public void write(byte[] b, int byteOffset, int byteLength) throws IOException { + lz4Compress.write(b, byteOffset, byteLength); + } + + @Override + public void finish() throws IOException { + lz4Compress.finish(); + } + + @Override + public void closeQuietly() { + try { + if (lz4Compress != null) lz4Compress.close(); + } catch (IOException e) { + } + } +} diff --git a/priam/src/main/java/com/netflix/priam/compress/NoOpCompressor.java b/priam/src/main/java/com/netflix/priam/compress/NoOpCompressor.java new file mode 100644 index 000000000..dbfdefa0c --- /dev/null +++ b/priam/src/main/java/com/netflix/priam/compress/NoOpCompressor.java @@ -0,0 +1,45 @@ +package com.netflix.priam.compress; +/* + * Copyright 2019 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +import java.io.*; + +public class NoOpCompressor implements ICompressor { + private final OutputStream outputStream; + + public NoOpCompressor(OutputStream outputStream) { + this.outputStream = outputStream; + } + + @Override + public void write(byte[] b, int byteOffset, int byteLength) throws IOException { + outputStream.write(b, byteOffset, byteLength); + } + + @Override + public void finish() throws IOException { + outputStream.flush(); + } + + @Override + public void closeQuietly() { + try { + if (outputStream != null) outputStream.close(); + } catch (IOException e) { + } + } +} diff --git a/priam/src/main/java/com/netflix/priam/compress/SnappyCompressor.java b/priam/src/main/java/com/netflix/priam/compress/SnappyCompressor.java new file mode 100644 index 000000000..42ab93c3d --- /dev/null +++ b/priam/src/main/java/com/netflix/priam/compress/SnappyCompressor.java @@ -0,0 +1,46 @@ +package com.netflix.priam.compress; +/* + * Copyright 2019 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +import java.io.*; +import org.xerial.snappy.SnappyOutputStream; + +public class SnappyCompressor implements ICompressor { + private final SnappyOutputStream snappyCompress; + + public SnappyCompressor(OutputStream outputStream) { + snappyCompress = new SnappyOutputStream(outputStream); + } + + @Override + public void write(byte[] b, int byteOffset, int byteLength) throws IOException { + snappyCompress.write(b, byteOffset, byteLength); + } + + @Override + public void finish() throws IOException { + snappyCompress.flush(); + } + + @Override + public void closeQuietly() { + try { + if (snappyCompress != null) snappyCompress.close(); + } catch (IOException e) { + } + } +} diff --git a/priam/src/main/java/com/netflix/priam/config/IConfiguration.java b/priam/src/main/java/com/netflix/priam/config/IConfiguration.java index 3d148865c..9c2dfeb1f 100644 --- a/priam/src/main/java/com/netflix/priam/config/IConfiguration.java +++ b/priam/src/main/java/com/netflix/priam/config/IConfiguration.java @@ -826,6 +826,17 @@ default int getBackupQueueSize() { return 100000; } + /** + * TTL (in days) for the entries - filenames which will be cached internally when successfully + * uploaded to remote file system. This cache is used to avoid bombarding the remote file system + * to check if object exists. + * + * @return TTL (in days) for the cached entries of successfully uploaded files. + */ + default int getBackupCacheTTLInDays() { + return 2; + } + /** * Queue size to be used for file downloads. 
Note that once queue is full, we would wait for * {@link #getDownloadTimeout()} to add any new item before declining the request and throwing diff --git a/priam/src/main/java/com/netflix/priam/google/GoogleEncryptedFileSystem.java b/priam/src/main/java/com/netflix/priam/google/GoogleEncryptedFileSystem.java index f54693165..12024f465 100755 --- a/priam/src/main/java/com/netflix/priam/google/GoogleEncryptedFileSystem.java +++ b/priam/src/main/java/com/netflix/priam/google/GoogleEncryptedFileSystem.java @@ -175,12 +175,15 @@ private Credential constructGcsCredential() throws Exception { } @Override - protected void downloadFileImpl(Path remotePath, Path localPath) throws BackupRestoreException { + protected void downloadFileImpl(final AbstractBackupPath.UploadDownloadDirectives directives) + throws BackupRestoreException { String objectName = parseObjectname(getPrefix().toString()); com.google.api.services.storage.Storage.Objects.Get get; try { - get = constructObjectResourceHandle().get(this.srcBucketName, remotePath.toString()); + get = + constructObjectResourceHandle() + .get(this.srcBucketName, directives.getRemotePath().toString()); } catch (IOException e) { throw new BackupRestoreException( "IO error retrieving metadata for: " @@ -193,7 +196,7 @@ protected void downloadFileImpl(Path remotePath, Path localPath) throws BackupRe // If you're not using GCS' AppEngine, download the whole thing (instead of chunks) in one // request, if possible. 
get.getMediaHttpDownloader().setDirectDownloadEnabled(true); - try (OutputStream os = new FileOutputStream(localPath.toFile()); + try (OutputStream os = new FileOutputStream(directives.getLocalPath().toFile()); InputStream is = get.executeMediaAsInputStream()) { IOUtils.copyLarge(is, os); } catch (IOException e) { @@ -233,7 +236,8 @@ public void shutdown() { } @Override - protected long uploadFileImpl(Path localPath, Path remotePath) throws BackupRestoreException { + protected long uploadFileImpl(AbstractBackupPath.UploadDownloadDirectives directives) + throws BackupRestoreException { throw new UnsupportedOperationException(); } diff --git a/priam/src/main/java/com/netflix/priam/notification/BackupNotificationMgr.java b/priam/src/main/java/com/netflix/priam/notification/BackupNotificationMgr.java index 3071a0c67..c635f75f5 100644 --- a/priam/src/main/java/com/netflix/priam/notification/BackupNotificationMgr.java +++ b/priam/src/main/java/com/netflix/priam/notification/BackupNotificationMgr.java @@ -60,12 +60,12 @@ private void notify(AbstractBackupPath abp, String uploadStatus) { jsonObject.put("rack", instanceInfo.getRac()); jsonObject.put("token", abp.getToken()); jsonObject.put("filename", abp.getFileName()); - jsonObject.put("uncompressfilesize", abp.getSize()); - jsonObject.put("compressfilesize", abp.getCompressedFileSize()); - jsonObject.put("backuptype", abp.getType().name()); + jsonObject.put("uncompressfilesize", abp.getDirectives().getSize()); + jsonObject.put("compressfilesize", abp.getDirectives().getCompressedFileSize()); + jsonObject.put("backuptype", abp.getDirectives().getType().name()); jsonObject.put("uploadstatus", uploadStatus); - jsonObject.put("compression", abp.getCompression().name()); - jsonObject.put("encryption", abp.getEncryption().name()); + jsonObject.put("compression", abp.getDirectives().getCompression().name()); + jsonObject.put("encryption", abp.getDirectives().getEncryption().name()); // SNS Attributes for filtering messages. 
Cluster name and backup file type. Map messageAttributes = new HashMap<>(); @@ -78,7 +78,7 @@ private void notify(AbstractBackupPath abp, String uploadStatus) { "backuptype", new MessageAttributeValue() .withDataType("String") - .withStringValue(abp.getType().name())); + .withStringValue(abp.getDirectives().getType().name())); this.notificationService.notify(jsonObject.toString(), messageAttributes); } catch (JSONException exception) { diff --git a/priam/src/main/java/com/netflix/priam/resources/BackupServlet.java b/priam/src/main/java/com/netflix/priam/resources/BackupServlet.java index 31c9efdcc..d1e6f0ad7 100644 --- a/priam/src/main/java/com/netflix/priam/resources/BackupServlet.java +++ b/priam/src/main/java/com/netflix/priam/resources/BackupServlet.java @@ -19,7 +19,7 @@ import com.google.inject.Inject; import com.google.inject.name.Named; import com.netflix.priam.backup.*; -import com.netflix.priam.backup.AbstractBackupPath.BackupFileType; +import com.netflix.priam.backup.AbstractBackupPath.UploadDownloadDirectives.BackupFileType; import com.netflix.priam.backup.BackupVersion; import com.netflix.priam.config.IBackupRestoreConfig; import com.netflix.priam.config.IConfiguration; @@ -249,7 +249,8 @@ private JSONObject constructJsonResponse( JSONArray jArray = new JSONArray(); while (it.hasNext()) { AbstractBackupPath p = it.next(); - if (!filter.isEmpty() && BackupFileType.valueOf(filter) != p.getType()) continue; + if (!filter.isEmpty() + && BackupFileType.valueOf(filter) != p.getDirectives().getType()) continue; JSONObject backupJSON = new JSONObject(); backupJSON.put("bucket", config.getBackupPrefix()); backupJSON.put("filename", p.getRemotePath()); diff --git a/priam/src/main/java/com/netflix/priam/resources/CassandraAdmin.java b/priam/src/main/java/com/netflix/priam/resources/CassandraAdmin.java index 07ac56e8a..6ff5506a5 100644 --- a/priam/src/main/java/com/netflix/priam/resources/CassandraAdmin.java +++ 
b/priam/src/main/java/com/netflix/priam/resources/CassandraAdmin.java @@ -20,7 +20,8 @@ import com.google.inject.Inject; import com.netflix.priam.cluster.management.Compaction; import com.netflix.priam.cluster.management.Flush; -import com.netflix.priam.compress.SnappyCompression; +import com.netflix.priam.compress.Decompressor; +import com.netflix.priam.compress.ICompression; import com.netflix.priam.config.IConfiguration; import com.netflix.priam.connection.CassandraOperations; import com.netflix.priam.connection.JMXConnectionException; @@ -532,8 +533,10 @@ public Response cassDrain() throws IOException, ExecutionException, InterruptedE @Path("/decompress") public Response decompress(@QueryParam("in") String in, @QueryParam("out") String out) throws Exception { - SnappyCompression compress = new SnappyCompression(); - compress.decompressAndClose(new FileInputStream(in), new FileOutputStream(out)); + Decompressor.decompress( + ICompression.DEFAULT_COMPRESSION, + new FileInputStream(in), + new FileOutputStream(out)); JSONObject object = new JSONObject(); object.put("Input compressed file", in); object.put("Output decompress file", out); diff --git a/priam/src/main/java/com/netflix/priam/restore/AbstractRestore.java b/priam/src/main/java/com/netflix/priam/restore/AbstractRestore.java index 25a0275d2..b12527509 100644 --- a/priam/src/main/java/com/netflix/priam/restore/AbstractRestore.java +++ b/priam/src/main/java/com/netflix/priam/restore/AbstractRestore.java @@ -19,7 +19,6 @@ import com.google.inject.Inject; import com.google.inject.Provider; import com.netflix.priam.backup.*; -import com.netflix.priam.backup.AbstractBackupPath.BackupFileType; import com.netflix.priam.backupv2.IMetaProxy; import com.netflix.priam.config.IBackupRestoreConfig; import com.netflix.priam.config.IConfiguration; @@ -158,7 +157,8 @@ private List> downloadCommitLogs( BoundedList bl = new BoundedList(lastN); while (fsIterator.hasNext()) { AbstractBackupPath temp = fsIterator.next(); - if 
(temp.getType() == BackupFileType.CL) { + if (temp.getDirectives().getType() + == AbstractBackupPath.UploadDownloadDirectives.BackupFileType.CL) { bl.add(temp); } } diff --git a/priam/src/main/java/com/netflix/priam/restore/EncryptedRestoreBase.java b/priam/src/main/java/com/netflix/priam/restore/EncryptedRestoreBase.java index d4e06aa9d..c87e3bc39 100755 --- a/priam/src/main/java/com/netflix/priam/restore/EncryptedRestoreBase.java +++ b/priam/src/main/java/com/netflix/priam/restore/EncryptedRestoreBase.java @@ -17,6 +17,7 @@ import com.netflix.priam.backup.AbstractBackupPath; import com.netflix.priam.backup.IBackupFileSystem; import com.netflix.priam.backup.MetaData; +import com.netflix.priam.compress.Decompressor; import com.netflix.priam.compress.ICompression; import com.netflix.priam.config.IConfiguration; import com.netflix.priam.cred.ICredentialGeneric; @@ -104,10 +105,8 @@ public Path retriableCall() throws Exception { // == download object from source bucket try { // Not retrying to download file here as it is already in RetryCallable. - fs.downloadFile( - Paths.get(path.getRemotePath()), - Paths.get(tempFile.getAbsolutePath()), - 0); + path.getDirectives().withRetry(0).withLocalPath(tempFile.toPath()); + fs.downloadFile(path); } catch (Exception ex) { // This behavior is retryable; therefore, lets get to a clean state // before each retry. 
@@ -170,7 +169,8 @@ public Path retriableCall() throws Exception { BufferedOutputStream finalDestination = new BufferedOutputStream( new FileOutputStream(restoreLocation))) { - compress.decompressAndClose(is, finalDestination); + Decompressor.decompress( + ICompression.DEFAULT_COMPRESSION, is, finalDestination); } catch (Exception ex) { throw new Exception( "Exception uncompressing file: " diff --git a/priam/src/main/java/com/netflix/priam/restore/Restore.java b/priam/src/main/java/com/netflix/priam/restore/Restore.java index ac0a27f66..14327c1e5 100644 --- a/priam/src/main/java/com/netflix/priam/restore/Restore.java +++ b/priam/src/main/java/com/netflix/priam/restore/Restore.java @@ -32,7 +32,6 @@ import com.netflix.priam.utils.Sleeper; import java.io.File; import java.nio.file.Path; -import java.nio.file.Paths; import java.util.concurrent.Future; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -72,8 +71,8 @@ public Restore( @Override protected final Future downloadFile( final AbstractBackupPath path, final File restoreLocation) throws Exception { - return fs.asyncDownloadFile( - Paths.get(path.getRemotePath()), Paths.get(restoreLocation.getAbsolutePath()), 5); + path.getDirectives().withRetry(5).withLocalPath(restoreLocation.toPath()); + return fs.asyncDownloadFile(path); } public static TaskTimer getTimer() { diff --git a/priam/src/main/java/com/netflix/priam/tuner/CassandraTunerService.java b/priam/src/main/java/com/netflix/priam/tuner/CassandraTunerService.java index d04ba7927..31ea1b8c0 100644 --- a/priam/src/main/java/com/netflix/priam/tuner/CassandraTunerService.java +++ b/priam/src/main/java/com/netflix/priam/tuner/CassandraTunerService.java @@ -1,3 +1,19 @@ +/* + * Copyright 2019 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ package com.netflix.priam.tuner; import com.netflix.priam.backup.IncrementalBackup; diff --git a/priam/src/test/java/com/netflix/priam/aws/TestRemoteBackupPath.java b/priam/src/test/java/com/netflix/priam/aws/TestRemoteBackupPath.java index 67e1d0dc7..89a528916 100644 --- a/priam/src/test/java/com/netflix/priam/aws/TestRemoteBackupPath.java +++ b/priam/src/test/java/com/netflix/priam/aws/TestRemoteBackupPath.java @@ -21,7 +21,7 @@ import com.google.inject.Injector; import com.google.inject.Provider; import com.netflix.priam.backup.AbstractBackupPath; -import com.netflix.priam.backup.AbstractBackupPath.BackupFileType; +import com.netflix.priam.backup.AbstractBackupPath.UploadDownloadDirectives.BackupFileType; import com.netflix.priam.backup.BRTestModule; import com.netflix.priam.config.IConfiguration; import com.netflix.priam.cryptography.IFileCryptography; @@ -64,7 +64,7 @@ public void testV1BackupPathsSST() throws ParseException { 0, abstractBackupPath.getLastModified().toEpochMilli()); // File do not exist. 
Assert.assertEquals("keyspace1", abstractBackupPath.getKeyspace()); Assert.assertEquals("columnfamily1", abstractBackupPath.getColumnFamily()); - Assert.assertEquals(BackupFileType.SST, abstractBackupPath.getType()); + Assert.assertEquals(BackupFileType.SST, abstractBackupPath.getDirectives().getType()); Assert.assertEquals(path.toFile(), abstractBackupPath.getBackupFile()); Assert.assertEquals( 0, @@ -86,9 +86,11 @@ private void validateAbstractBackupPath(AbstractBackupPath abp1, AbstractBackupP Assert.assertEquals(abp1.getKeyspace(), abp2.getKeyspace()); Assert.assertEquals(abp1.getColumnFamily(), abp2.getColumnFamily()); Assert.assertEquals(abp1.getFileName(), abp2.getFileName()); - Assert.assertEquals(abp1.getType(), abp2.getType()); - Assert.assertEquals(abp1.getCompression(), abp2.getCompression()); - Assert.assertEquals(abp1.getEncryption(), abp2.getEncryption()); + Assert.assertEquals(abp1.getDirectives().getType(), abp2.getDirectives().getType()); + Assert.assertEquals( + abp1.getDirectives().getCompression(), abp2.getDirectives().getCompression()); + Assert.assertEquals( + abp1.getDirectives().getEncryption(), abp2.getDirectives().getEncryption()); } @Test @@ -109,7 +111,7 @@ public void testV1BackupPathsSnap() throws ParseException { 0, abstractBackupPath.getLastModified().toEpochMilli()); // File do not exist. Assert.assertEquals("keyspace1", abstractBackupPath.getKeyspace()); Assert.assertEquals("columnfamily1", abstractBackupPath.getColumnFamily()); - Assert.assertEquals(BackupFileType.SNAP, abstractBackupPath.getType()); + Assert.assertEquals(BackupFileType.SNAP, abstractBackupPath.getDirectives().getType()); Assert.assertEquals(path.toFile(), abstractBackupPath.getBackupFile()); Assert.assertEquals( "201801011201", DateUtil.formatyyyyMMddHHmm(abstractBackupPath.getTime())); @@ -135,7 +137,7 @@ public void testV1BackupPathsMeta() throws ParseException { 0, abstractBackupPath.getLastModified().toEpochMilli()); // File do not exist. 
Assert.assertEquals(null, abstractBackupPath.getKeyspace()); Assert.assertEquals(null, abstractBackupPath.getColumnFamily()); - Assert.assertEquals(BackupFileType.META, abstractBackupPath.getType()); + Assert.assertEquals(BackupFileType.META, abstractBackupPath.getDirectives().getType()); Assert.assertEquals(path.toFile(), abstractBackupPath.getBackupFile()); // Verify toRemote and parseRemote. @@ -165,8 +167,8 @@ public void testV2BackupPathSST() throws ParseException { 0, abstractBackupPath.getLastModified().toEpochMilli()); // File do not exist. Assert.assertEquals("keyspace1", abstractBackupPath.getKeyspace()); Assert.assertEquals("columnfamily1", abstractBackupPath.getColumnFamily()); - Assert.assertEquals("SNAPPY", abstractBackupPath.getCompression().name()); - Assert.assertEquals(BackupFileType.SST_V2, abstractBackupPath.getType()); + Assert.assertEquals("SNAPPY", abstractBackupPath.getDirectives().getCompression().name()); + Assert.assertEquals(BackupFileType.SST_V2, abstractBackupPath.getDirectives().getType()); Assert.assertEquals(path.toFile(), abstractBackupPath.getBackupFile()); // Verify toRemote and parseRemote. @@ -192,11 +194,11 @@ public void testV2BackupPathMeta() throws ParseException { 0, abstractBackupPath.getLastModified().toEpochMilli()); // File do not exist. Assert.assertEquals(null, abstractBackupPath.getKeyspace()); Assert.assertEquals(null, abstractBackupPath.getColumnFamily()); - Assert.assertEquals(BackupFileType.META_V2, abstractBackupPath.getType()); + Assert.assertEquals(BackupFileType.META_V2, abstractBackupPath.getDirectives().getType()); Assert.assertEquals(path.toFile(), abstractBackupPath.getBackupFile()); Assert.assertEquals( IFileCryptography.CryptographyAlgorithm.PLAINTEXT, - abstractBackupPath.getEncryption()); + abstractBackupPath.getDirectives().getEncryption()); // Verify toRemote and parseRemote. 
Instant now = DateUtil.getInstant(); @@ -204,7 +206,7 @@ public void testV2BackupPathMeta() throws ParseException { String remotePath = abstractBackupPath.getRemotePath(); logger.info(remotePath); - Assert.assertEquals("SNAPPY", abstractBackupPath.getCompression().name()); + Assert.assertEquals("SNAPPY", abstractBackupPath.getDirectives().getCompression().name()); AbstractBackupPath abstractBackupPath2 = pathFactory.get(); abstractBackupPath2.parseRemote(remotePath); diff --git a/priam/src/test/java/com/netflix/priam/backup/FakeBackupFileSystem.java b/priam/src/test/java/com/netflix/priam/backup/FakeBackupFileSystem.java index 14d842674..43bcd8510 100644 --- a/priam/src/test/java/com/netflix/priam/backup/FakeBackupFileSystem.java +++ b/priam/src/test/java/com/netflix/priam/backup/FakeBackupFileSystem.java @@ -141,16 +141,20 @@ public void cleanup() { } @Override - protected void downloadFileImpl(Path remotePath, Path localPath) throws BackupRestoreException { + protected void downloadFileImpl(AbstractBackupPath.UploadDownloadDirectives directives) + throws BackupRestoreException { AbstractBackupPath path = pathProvider.get(); - path.parseRemote(remotePath.toString()); + path.parseRemote(directives.getRemotePath().toString()); - if (path.getType() == AbstractBackupPath.BackupFileType.META) { + if (directives.getType() + == AbstractBackupPath.UploadDownloadDirectives.BackupFileType.META) { // List all files and generate the file - try (FileWriter fr = new FileWriter(localPath.toFile())) { + try (FileWriter fr = new FileWriter(directives.getLocalPath().toFile())) { JSONArray jsonObj = new JSONArray(); for (AbstractBackupPath filePath : flist) { - if (filePath.type == AbstractBackupPath.BackupFileType.SNAP + if (filePath.getDirectives().type + == AbstractBackupPath.UploadDownloadDirectives.BackupFileType + .SNAP && filePath.time.equals(path.time)) { jsonObj.add(filePath.getRemotePath()); } @@ -161,13 +165,14 @@ protected void downloadFileImpl(Path remotePath, Path 
localPath) throws BackupRe throw new BackupRestoreException(io.getMessage(), io); } } - downloadedFiles.add(remotePath.toString()); + downloadedFiles.add(directives.getRemotePath().toString()); } @Override - protected long uploadFileImpl(Path localPath, Path remotePath) throws BackupRestoreException { - uploadedFiles.add(localPath.toFile().getAbsolutePath()); - addFile(remotePath.toString()); - return localPath.toFile().length(); + protected long uploadFileImpl(AbstractBackupPath.UploadDownloadDirectives directives) + throws BackupRestoreException { + uploadedFiles.add(directives.getLocalPath().toFile().getAbsolutePath()); + addFile(directives.getRemotePath().toString()); + return directives.getLocalPath().toFile().length(); } } diff --git a/priam/src/test/java/com/netflix/priam/backup/NullBackupFileSystem.java b/priam/src/test/java/com/netflix/priam/backup/NullBackupFileSystem.java index 04377f301..393c8e3c3 100644 --- a/priam/src/test/java/com/netflix/priam/backup/NullBackupFileSystem.java +++ b/priam/src/test/java/com/netflix/priam/backup/NullBackupFileSystem.java @@ -63,7 +63,7 @@ public void cleanup() { } @Override - protected void downloadFileImpl(Path remotePath, Path localPath) + protected void downloadFileImpl(final AbstractBackupPath.UploadDownloadDirectives directives) throws BackupRestoreException {} @Override @@ -72,7 +72,8 @@ protected boolean doesRemoteFileExist(Path remotePath) { } @Override - protected long uploadFileImpl(Path localPath, Path remotePath) throws BackupRestoreException { + protected long uploadFileImpl(AbstractBackupPath.UploadDownloadDirectives directives) + throws BackupRestoreException { return 0; } } diff --git a/priam/src/test/java/com/netflix/priam/backup/S3ClientSideEncryptionSymMasterKey.java b/priam/src/test/java/com/netflix/priam/backup/S3ClientSideEncryptionSymMasterKey.java new file mode 100644 index 000000000..990175d58 --- /dev/null +++ 
b/priam/src/test/java/com/netflix/priam/backup/S3ClientSideEncryptionSymMasterKey.java @@ -0,0 +1,129 @@ +package com.netflix.priam.backup; +/* + * Copyright 2019 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +import com.amazonaws.AmazonServiceException; +import com.amazonaws.SdkClientException; +import com.amazonaws.auth.profile.ProfileCredentialsProvider; +import com.amazonaws.regions.Regions; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.AmazonS3ClientBuilder; +import com.amazonaws.services.s3.AmazonS3EncryptionClientBuilder; +import com.amazonaws.services.s3.model.*; +import com.amazonaws.util.IOUtils; +import java.io.*; +import java.security.InvalidKeyException; +import java.security.NoSuchAlgorithmException; +import java.security.spec.InvalidKeySpecException; +import java.security.spec.X509EncodedKeySpec; +import javax.crypto.KeyGenerator; +import javax.crypto.SecretKey; +import javax.crypto.spec.SecretKeySpec; + +public class S3ClientSideEncryptionSymMasterKey { + + public static void main(String[] args) throws Exception { + Regions clientRegion = Regions.US_EAST_1; + + String bucketName = "useast1-cass-test-1"; + String objectKeyName = "perf_aa_sample_client_side_key.txt"; + String masterKeyDir = System.getProperty("java.io.tmpdir"); + String masterKeyName = "secret.key"; + + // Generate a symmetric 256-bit AES key. 
+ KeyGenerator symKeyGenerator = KeyGenerator.getInstance("AES"); + symKeyGenerator.init(256); + SecretKey symKey = symKeyGenerator.generateKey(); + + // To see how it works, save and load the key to and from the file system. + saveSymmetricKey(masterKeyDir, masterKeyName, symKey); + symKey = loadSymmetricAESKey(masterKeyDir, masterKeyName, "AES"); + + try { + // Create the Amazon S3 encryption client. + EncryptionMaterials encryptionMaterials = new EncryptionMaterials(symKey); + AmazonS3 s3EncryptionClient = + AmazonS3EncryptionClientBuilder.standard() + .withCredentials(new ProfileCredentialsProvider()) + .withEncryptionMaterials( + new StaticEncryptionMaterialsProvider(encryptionMaterials)) + .withRegion(clientRegion) + .build(); + + // Upload a new object. The encryption client automatically encrypts it. + byte[] plaintext = + "S3 Object Encrypted Using Client-Side Symmetric Master Key.".getBytes(); + s3EncryptionClient.putObject( + new PutObjectRequest( + bucketName, + objectKeyName, + new ByteArrayInputStream(plaintext), + new ObjectMetadata())); + + // Download and decrypt the object. + S3Object downloadedObject = s3EncryptionClient.getObject(bucketName, objectKeyName); + byte[] decrypted = + com.amazonaws.util.IOUtils.toByteArray(downloadedObject.getObjectContent()); + + // Verify that the data that you downloaded is the same as the original data. 
+ System.out.println("Plaintext: " + new String(plaintext)); + System.out.println("Decrypted text: " + new String(decrypted)); + + AmazonS3 s3Client = + AmazonS3ClientBuilder.standard() + .withCredentials(new ProfileCredentialsProvider()) + .withRegion(clientRegion) + .build(); + S3Object downloadObject2 = s3Client.getObject(bucketName, objectKeyName); + byte[] downloaded = IOUtils.toByteArray(downloadObject2.getObjectContent()); + System.out.println("Just downloaded: " + new String(downloaded)); + + } catch (AmazonServiceException e) { + // The call was transmitted successfully, but Amazon S3 couldn't process + // it, so it returned an error response. + e.printStackTrace(); + } catch (SdkClientException e) { + // Amazon S3 couldn't be contacted for a response, or the client + // couldn't parse the response from Amazon S3. + e.printStackTrace(); + } + } + + private static void saveSymmetricKey( + String masterKeyDir, String masterKeyName, SecretKey secretKey) throws IOException { + X509EncodedKeySpec x509EncodedKeySpec = new X509EncodedKeySpec(secretKey.getEncoded()); + FileOutputStream keyOutputStream = + new FileOutputStream(masterKeyDir + File.separator + masterKeyName); + keyOutputStream.write(x509EncodedKeySpec.getEncoded()); + keyOutputStream.close(); + } + + private static SecretKey loadSymmetricAESKey( + String masterKeyDir, String masterKeyName, String algorithm) + throws IOException, NoSuchAlgorithmException, InvalidKeySpecException, + InvalidKeyException { + // Read the key from the specified file. + File keyFile = new File(masterKeyDir + File.separator + masterKeyName); + FileInputStream keyInputStream = new FileInputStream(keyFile); + byte[] encodedPrivateKey = new byte[(int) keyFile.length()]; + keyInputStream.read(encodedPrivateKey); + keyInputStream.close(); + + // Reconstruct and return the master key. 
+ return new SecretKeySpec(encodedPrivateKey, "AES"); + } +} diff --git a/priam/src/test/java/com/netflix/priam/backup/TestAbstractFileSystem.java b/priam/src/test/java/com/netflix/priam/backup/TestAbstractFileSystem.java index 695caf80b..3e14f8231 100644 --- a/priam/src/test/java/com/netflix/priam/backup/TestAbstractFileSystem.java +++ b/priam/src/test/java/com/netflix/priam/backup/TestAbstractFileSystem.java @@ -82,8 +82,12 @@ public void testFailedRetriesUpload() throws Exception { try { Collection files = generateFiles(1, 1, 1); for (File file : files) { - failureFileSystem.uploadFile( - file.toPath(), file.toPath(), getDummyPath(file.toPath()), 2, true); + AbstractBackupPath path = getDummyPath(file.toPath()); + path.getDirectives() + .withRetry(2) + .withDeleteAfterSuccessfulUpload(true) + .withRemotePath(file.toPath()); + failureFileSystem.uploadFile(path); } } catch (BackupRestoreException e) { // Verify the failure metric for upload is incremented. @@ -93,7 +97,9 @@ public void testFailedRetriesUpload() throws Exception { private AbstractBackupPath getDummyPath(Path localPath) throws ParseException { AbstractBackupPath path = injector.getInstance(AbstractBackupPath.class); - path.parseLocal(localPath.toFile(), AbstractBackupPath.BackupFileType.SNAP); + path.parseLocal( + localPath.toFile(), + AbstractBackupPath.UploadDownloadDirectives.BackupFileType.SNAP); return path; } @@ -109,7 +115,9 @@ private Collection generateFiles(int noOfKeyspaces, int noOfCf, int noOfSs @Test public void testFailedRetriesDownload() { try { - failureFileSystem.downloadFile(Paths.get(""), null, 2); + AbstractBackupPath path = injector.getInstance(AbstractBackupPath.class); + path.getDirectives().withRemotePath(Paths.get("")).withRetry(2); + failureFileSystem.downloadFile(path); } catch (BackupRestoreException e) { // Verify the failure metric for download is incremented. 
Assert.assertEquals(1, (int) backupMetrics.getInvalidDownloads().count()); @@ -121,12 +129,12 @@ public void testUpload() throws Exception { Collection files = generateFiles(1, 1, 1); // Dummy upload with compressed size. for (File file : files) { - myFileSystem.uploadFile( - file.toPath(), - Paths.get(file.toString() + ".tmp"), - getDummyPath(file.toPath()), - 2, - true); + AbstractBackupPath path = getDummyPath(file.toPath()); + path.getDirectives() + .withRetry(2) + .withDeleteAfterSuccessfulUpload(true) + .withRemotePath(Paths.get(file.toString() + ".tmp")); + myFileSystem.uploadFile(path); // Verify the success metric for upload is incremented. Assert.assertEquals(1, (int) backupMetrics.getValidUploads().actualCount()); @@ -139,7 +147,12 @@ public void testUpload() throws Exception { @Test public void testDownload() throws Exception { // Dummy download - myFileSystem.downloadFile(Paths.get(""), Paths.get(configuration.getDataFileLocation()), 2); + AbstractBackupPath path = injector.getInstance(AbstractBackupPath.class); + path.getDirectives() + .withRemotePath(Paths.get("")) + .withLocalPath(Paths.get(configuration.getDataFileLocation())) + .withRetry(2); + myFileSystem.downloadFile(path); // Verify the success metric for download is incremented. Assert.assertEquals(1, (int) backupMetrics.getValidDownloads().actualCount()); } @@ -149,14 +162,13 @@ public void testAsyncUpload() throws Exception { // Testing single async upload. Collection files = generateFiles(1, 1, 1); for (File file : files) { - myFileSystem - .asyncUploadFile( - file.toPath(), - Paths.get(file.toString() + ".tmp"), - getDummyPath(file.toPath()), - 2, - true) - .get(); + AbstractBackupPath path = getDummyPath(file.toPath()); + path.getDirectives() + .withRetry(2) + .withDeleteAfterSuccessfulUpload(true) + .withRemotePath(Paths.get(file.toString() + ".tmp")); + + myFileSystem.asyncUploadFile(path).get(); // 1. Verify the success metric for upload is incremented. 
Assert.assertEquals(1, (int) backupMetrics.getValidUploads().actualCount()); // 2. The task queue is empty after upload is finished. @@ -172,13 +184,12 @@ public void testAsyncUploadBulk() throws Exception { Collection files = generateFiles(1, 1, 20); List> futures = new ArrayList<>(); for (File file : files) { - futures.add( - myFileSystem.asyncUploadFile( - file.toPath(), - Paths.get(file.toString() + ".tmp"), - getDummyPath(file.toPath()), - 2, - true)); + AbstractBackupPath path = getDummyPath(file.toPath()); + path.getDirectives() + .withRetry(2) + .withDeleteAfterSuccessfulUpload(true) + .withRemotePath(Paths.get(file.toString() + ".tmp")); + futures.add(myFileSystem.asyncUploadFile(path)); } // Verify all the work is finished. @@ -205,8 +216,12 @@ public void testUploadDedup() throws Exception { for (int i = 0; i < size; i++) { torun.add( () -> { - myFileSystem.uploadFile( - file.toPath(), file.toPath(), abstractBackupPath, 2, true); + AbstractBackupPath path = getDummyPath(file.toPath()); + path.getDirectives() + .withRetry(2) + .withDeleteAfterSuccessfulUpload(true) + .withRemotePath(file.toPath()); + myFileSystem.uploadFile(path); return Boolean.TRUE; }); } @@ -233,9 +248,12 @@ public void testAsyncUploadFailure() throws Exception { // Testing single async upload. 
Collection files = generateFiles(1, 1, 1); for (File file : files) { - Future future = - failureFileSystem.asyncUploadFile( - file.toPath(), file.toPath(), getDummyPath(file.toPath()), 2, true); + AbstractBackupPath path = getDummyPath(file.toPath()); + path.getDirectives() + .withRetry(2) + .withDeleteAfterSuccessfulUpload(true) + .withRemotePath(file.toPath()); + Future future = failureFileSystem.asyncUploadFile(path); try { future.get(); } catch (Exception e) { @@ -253,10 +271,13 @@ public void testAsyncUploadFailure() throws Exception { @Test public void testAsyncDownload() throws Exception { + AbstractBackupPath path = injector.getInstance(AbstractBackupPath.class); + path.getDirectives() + .withRetry(2) + .withRemotePath(Paths.get("")) + .withLocalPath(Paths.get(configuration.getDataFileLocation())); // Testing single async download. - Future future = - myFileSystem.asyncDownloadFile( - Paths.get(""), Paths.get(configuration.getDataFileLocation()), 2); + Future future = myFileSystem.asyncDownloadFile(path); future.get(); // 1. Verify the success metric for download is incremented. Assert.assertEquals(1, (int) backupMetrics.getValidDownloads().actualCount()); @@ -270,10 +291,15 @@ public void testAsyncDownloadBulk() throws Exception { // 1. Give 1000 dummy files to download. File download takes some random time to download. int totalFiles = 1000; List> futureList = new ArrayList<>(); - for (int i = 0; i < totalFiles; i++) - futureList.add( - myFileSystem.asyncDownloadFile( - Paths.get("" + i), Paths.get(configuration.getDataFileLocation()), 2)); + for (int i = 0; i < totalFiles; i++) { + AbstractBackupPath path = injector.getInstance(AbstractBackupPath.class); + path.getDirectives() + .withRetry(2) + .withRemotePath(Paths.get("" + i)) + .withLocalPath(Paths.get(configuration.getDataFileLocation())); + + futureList.add(myFileSystem.asyncDownloadFile(path)); + } // Ensure processing is finished. 
for (Future future1 : futureList) { @@ -289,7 +315,10 @@ public void testAsyncDownloadBulk() throws Exception { @Test public void testAsyncDownloadFailure() throws Exception { - Future future = failureFileSystem.asyncDownloadFile(Paths.get(""), null, 2); + AbstractBackupPath path = injector.getInstance(AbstractBackupPath.class); + path.getDirectives().withRetry(2).withRemotePath(Paths.get("")); + + Future future = failureFileSystem.asyncDownloadFile(path); try { future.get(); } catch (Exception e) { @@ -310,19 +339,20 @@ public FailureFileSystem( } @Override - protected void downloadFileImpl(Path remotePath, Path localPath) + protected void downloadFileImpl( + final AbstractBackupPath.UploadDownloadDirectives directives) throws BackupRestoreException { throw new BackupRestoreException( "User injected failure file system error for testing download. Remote path: " - + remotePath); + + directives.getRemotePath()); } @Override - protected long uploadFileImpl(Path localPath, Path remotePath) + protected long uploadFileImpl(AbstractBackupPath.UploadDownloadDirectives directives) throws BackupRestoreException { throw new BackupRestoreException( "User injected failure file system error for testing upload. 
Local path: " - + localPath); + + directives.getLocalPath()); } } @@ -340,7 +370,8 @@ public MyFileSystem( } @Override - protected void downloadFileImpl(Path remotePath, Path localPath) + protected void downloadFileImpl( + final AbstractBackupPath.UploadDownloadDirectives directives) throws BackupRestoreException { try { Thread.sleep(random.nextInt(20)); @@ -350,7 +381,7 @@ protected void downloadFileImpl(Path remotePath, Path localPath) } @Override - protected long uploadFileImpl(Path localPath, Path remotePath) + protected long uploadFileImpl(AbstractBackupPath.UploadDownloadDirectives directives) throws BackupRestoreException { try { Thread.sleep(random.nextInt(20)); diff --git a/priam/src/test/java/com/netflix/priam/backup/TestBackupFile.java b/priam/src/test/java/com/netflix/priam/backup/TestBackupFile.java index 52478f3f6..d7e7c6865 100644 --- a/priam/src/test/java/com/netflix/priam/backup/TestBackupFile.java +++ b/priam/src/test/java/com/netflix/priam/backup/TestBackupFile.java @@ -20,7 +20,7 @@ import com.google.inject.Guice; import com.google.inject.Injector; import com.netflix.priam.aws.RemoteBackupPath; -import com.netflix.priam.backup.AbstractBackupPath.BackupFileType; +import com.netflix.priam.backup.AbstractBackupPath.UploadDownloadDirectives.BackupFileType; import com.netflix.priam.identity.InstanceIdentity; import com.netflix.priam.utils.DateUtil; import java.io.BufferedOutputStream; @@ -75,7 +75,7 @@ public void testBackupFileCreation() throws ParseException { "target/data/Keyspace1/Standard1/snapshots/201108082320/Keyspace1-Standard1-ia-5-Data.db"; RemoteBackupPath backupfile = injector.getInstance(RemoteBackupPath.class); backupfile.parseLocal(new File(snapshotfile), BackupFileType.SNAP); - Assert.assertEquals(BackupFileType.SNAP, backupfile.type); + Assert.assertEquals(BackupFileType.SNAP, backupfile.getDirectives().type); Assert.assertEquals("Keyspace1", backupfile.keyspace); Assert.assertEquals("Standard1", backupfile.columnFamily); 
Assert.assertEquals("1234567", backupfile.token); @@ -95,7 +95,7 @@ public void testIncBackupFileCreation() throws ParseException { File bfile = new File("target/data/Keyspace1/Standard1/Keyspace1-Standard1-ia-5-Data.db"); RemoteBackupPath backupfile = injector.getInstance(RemoteBackupPath.class); backupfile.parseLocal(bfile, BackupFileType.SST); - Assert.assertEquals(BackupFileType.SST, backupfile.type); + Assert.assertEquals(BackupFileType.SST, backupfile.getDirectives().type); Assert.assertEquals("Keyspace1", backupfile.keyspace); Assert.assertEquals("Standard1", backupfile.columnFamily); Assert.assertEquals("1234567", backupfile.token); @@ -120,7 +120,7 @@ public void testMetaFileCreation() throws ParseException { RemoteBackupPath backupfile = injector.getInstance(RemoteBackupPath.class); backupfile.parseLocal(bfile, BackupFileType.META); backupfile.setTime(DateUtil.getDate("201108082320")); - Assert.assertEquals(BackupFileType.META, backupfile.type); + Assert.assertEquals(BackupFileType.META, backupfile.getDirectives().type); Assert.assertEquals("1234567", backupfile.token); Assert.assertEquals("fake-app", backupfile.clusterName); Assert.assertEquals(region, backupfile.region); diff --git a/priam/src/test/java/com/netflix/priam/backup/TestBackupVerification.java b/priam/src/test/java/com/netflix/priam/backup/TestBackupVerification.java index cef662187..46a44c619 100644 --- a/priam/src/test/java/com/netflix/priam/backup/TestBackupVerification.java +++ b/priam/src/test/java/com/netflix/priam/backup/TestBackupVerification.java @@ -19,7 +19,7 @@ import com.google.inject.Guice; import com.google.inject.Injector; -import com.netflix.priam.backup.AbstractBackupPath.BackupFileType; +import com.netflix.priam.backup.AbstractBackupPath.UploadDownloadDirectives.BackupFileType; import com.netflix.priam.backupv2.MetaV1Proxy; import com.netflix.priam.backupv2.MetaV2Proxy; import com.netflix.priam.config.IConfiguration; diff --git 
a/priam/src/test/java/com/netflix/priam/backup/TestS3FileSystem.java b/priam/src/test/java/com/netflix/priam/backup/TestS3FileSystem.java index 9d3b4ad70..7e6c9e58d 100644 --- a/priam/src/test/java/com/netflix/priam/backup/TestS3FileSystem.java +++ b/priam/src/test/java/com/netflix/priam/backup/TestS3FileSystem.java @@ -32,7 +32,7 @@ import com.netflix.priam.aws.RemoteBackupPath; import com.netflix.priam.aws.S3FileSystem; import com.netflix.priam.aws.S3PartUploader; -import com.netflix.priam.backup.AbstractBackupPath.BackupFileType; +import com.netflix.priam.backup.AbstractBackupPath.UploadDownloadDirectives.BackupFileType; import com.netflix.priam.config.IConfiguration; import com.netflix.priam.identity.config.InstanceInfo; import com.netflix.priam.merics.BackupMetrics; @@ -101,13 +101,14 @@ public void testFileUpload() throws Exception { IBackupFileSystem fs = injector.getInstance(NullBackupFileSystem.class); RemoteBackupPath backupfile = injector.getInstance(RemoteBackupPath.class); backupfile.parseLocal(new File(FILE_PATH), BackupFileType.SNAP); + backupfile + .getDirectives() + .withRetry(0) + .withDeleteAfterSuccessfulUpload(false) + .withRemotePath(Paths.get(backupfile.getRemotePath())); + long noOfFilesUploaded = backupMetrics.getUploadRate().count(); - fs.uploadFile( - Paths.get(backupfile.getBackupFile().getAbsolutePath()), - Paths.get(backupfile.getRemotePath()), - backupfile, - 0, - false); + fs.uploadFile(backupfile); Assert.assertEquals(1, backupMetrics.getUploadRate().count() - noOfFilesUploaded); } @@ -117,12 +118,13 @@ public void testFileUploadDeleteExists() throws Exception { IBackupFileSystem fs = injector.getInstance(NullBackupFileSystem.class); RemoteBackupPath backupfile = injector.getInstance(RemoteBackupPath.class); backupfile.parseLocal(new File(FILE_PATH), BackupFileType.SST_V2); - fs.uploadFile( - Paths.get(backupfile.getBackupFile().getAbsolutePath()), - Paths.get(backupfile.getRemotePath()), - backupfile, - 0, - false); + backupfile + 
.getDirectives() + .withRetry(0) + .withDeleteAfterSuccessfulUpload(false) + .withRemotePath(Paths.get(backupfile.getRemotePath())); + + fs.uploadFile(backupfile); Assert.assertTrue(fs.checkObjectExists(Paths.get(backupfile.getRemotePath()))); // Lets delete the file now. List deleteFiles = Lists.newArrayList(); @@ -141,13 +143,14 @@ public void testFileUploadFailures() throws Exception { "target/data/Keyspace1/Standard1/backups/201108082320/Keyspace1-Standard1-ia-1-Data.db"; RemoteBackupPath backupfile = injector.getInstance(RemoteBackupPath.class); backupfile.parseLocal(new File(snapshotfile), BackupFileType.SNAP); + backupfile + .getDirectives() + .withRetry(0) + .withDeleteAfterSuccessfulUpload(false) + .withRemotePath(Paths.get(backupfile.getRemotePath())); + try { - fs.uploadFile( - Paths.get(backupfile.getBackupFile().getAbsolutePath()), - Paths.get(backupfile.getRemotePath()), - backupfile, - 0, - false); + fs.uploadFile(backupfile); } catch (BackupRestoreException e) { // ignore } @@ -165,13 +168,14 @@ public void testFileUploadCompleteFailure() throws Exception { "target/data/Keyspace1/Standard1/backups/201108082320/Keyspace1-Standard1-ia-1-Data.db"; RemoteBackupPath backupfile = injector.getInstance(RemoteBackupPath.class); backupfile.parseLocal(new File(snapshotfile), BackupFileType.SNAP); + backupfile + .getDirectives() + .withRetry(0) + .withDeleteAfterSuccessfulUpload(false) + .withRemotePath(Paths.get(backupfile.getRemotePath())); + try { - fs.uploadFile( - Paths.get(backupfile.getBackupFile().getAbsolutePath()), - Paths.get(backupfile.getRemotePath()), - backupfile, - 0, - false); + fs.uploadFile(backupfile); } catch (BackupRestoreException e) { // ignore } diff --git a/priam/src/test/java/com/netflix/priam/backup/s3test.java b/priam/src/test/java/com/netflix/priam/backup/s3test.java new file mode 100644 index 000000000..526987bb0 --- /dev/null +++ b/priam/src/test/java/com/netflix/priam/backup/s3test.java @@ -0,0 +1,192 @@ +package 
com.netflix.priam.backup; +/* + * Copyright 2019 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +import com.amazonaws.AmazonServiceException; +import com.amazonaws.SdkClientException; +import com.amazonaws.auth.profile.ProfileCredentialsProvider; +import com.amazonaws.regions.Regions; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.AmazonS3ClientBuilder; +import com.amazonaws.services.s3.model.*; +import java.io.*; +import java.security.NoSuchAlgorithmException; +import java.security.SecureRandom; +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import javax.crypto.KeyGenerator; + +public class s3test { + private static SSECustomerKey SSE_KEY; + private static AmazonS3 S3_CLIENT; + private static KeyGenerator KEY_GENERATOR; + + public static void main(String[] args) throws IOException, NoSuchAlgorithmException { + Regions clientRegion = Regions.US_EAST_1; + String bucketName = "nflx-cass-primary-persistence-test-us-east-1"; + String bucketNoVersion = "useast1-cass-test-1"; + String keyName = "sample.txt"; + String uploadFileName = "priam/src/test/resources/gossipInfoSample_1.txt"; + String targetKeyName = "perf_aa_sample2.txt"; + + // Create an encryption key. 
+ KEY_GENERATOR = KeyGenerator.getInstance("AES"); + KEY_GENERATOR.init(256, new SecureRandom()); + SSE_KEY = new SSECustomerKey(KEY_GENERATOR.generateKey()); + + try { + S3_CLIENT = + AmazonS3ClientBuilder.standard() + .withCredentials(new ProfileCredentialsProvider()) + .withRegion(clientRegion) + .build(); + + System.out.println( + String.format( + "bucket: %s, Versioning: %s", + bucketName, + S3_CLIENT.getBucketVersioningConfiguration(bucketName).getStatus())); + System.out.println( + String.format( + "bucket: %s, Versioning: %s", + bucketNoVersion, + S3_CLIENT + .getBucketVersioningConfiguration(bucketNoVersion) + .getStatus())); + + // Upload an object. + // uploadObject(bucketName, keyName, new File(uploadFileName)); + + // Download the object. + // downloadObject(bucketName, keyName); + + // Verify that the object is properly encrypted by attempting to retrieve it + // using the encryption key. + // retrieveObjectMetadata(bucketName, keyName); + // retrieveObjectMetadata(bucketName, "perf_aa_sample.txt"); + deleteObject(bucketName, keyName); + + // We put a delete marker on `sample.txt`. This should get automatically deleted by + // 12th. There are 3 versions, they should get dleted too. + // ample.txt has 2 versions (should be 1) + // perf_aa_sample.txt has 3 versions (should be 1) + + // Copy the object into a new object that also uses SSE-C. + // copyObject(bucketName, keyName, targetKeyName); + } catch (AmazonServiceException e) { + // The call was transmitted successfully, but Amazon S3 couldn't process + // it, so it returned an error response. + e.printStackTrace(); + } catch (SdkClientException e) { + // Amazon S3 couldn't be contacted for a response, or the client + // couldn't parse the response from Amazon S3. 
+ e.printStackTrace(); + } + } + + private static void uploadObject(String bucketName, String keyName, File file) { + Instant instant = Instant.now().plus(1, ChronoUnit.DAYS); + PutObjectRequest putRequest = + new PutObjectRequest( + bucketName, + keyName, + file); // .withSSECustomerKey(SSE_KEY).withObjectLockMode(ObjectLockMode.COMPLIANCE).withObjectLockRetainUntilDate(new Date(instant.toEpochMilli())); + S3_CLIENT.putObject(putRequest); + System.out.println("Object uploaded"); + } + + private static void downloadObject(String bucketName, String keyName) throws IOException { + GetObjectRequest getObjectRequest = + new GetObjectRequest(bucketName, keyName); // .withSSECustomerKey(SSE_KEY); + S3Object object = S3_CLIENT.getObject(getObjectRequest); + + System.out.println("Object content: "); + displayTextInputStream(object.getObjectContent()); + } + + private static void retrieveObjectMetadata(String bucketName, String keyName) { + GetObjectMetadataRequest getMetadataRequest = + new GetObjectMetadataRequest(bucketName, keyName); // .withSSECustomerKey(SSE_KEY); + ObjectMetadata objectMetadata = S3_CLIENT.getObjectMetadata(getMetadataRequest); + System.out.println( + String.format( + "Metadata retrieved. 
%s, %s, %s", + objectMetadata.getVersionId(), + objectMetadata.getObjectLockMode(), + objectMetadata.getObjectLockRetainUntilDate())); + } + + private static void deleteObject(String bucketName, String keyName) { + // ListVersionsRequest request = new ListVersionsRequest() + // .withBucketName(bucketName).withPrefix(keyName); + // + // VersionListing versionListing = S3_CLIENT.listVersions(request); + // int numVersions = 0, numPages = 0; + // while (true) { + // numPages++; + // for (S3VersionSummary objectSummary : + // versionListing.getVersionSummaries()) { + // System.out.printf("Retrieved object %s, version %s\n", + // objectSummary.getKey(), + // objectSummary.getVersionId()); + // numVersions++; + // } + // // Check whether there are more pages of versions to retrieve. If + // // there are, retrieve them. Otherwise, exit the loop. + // if (versionListing.isTruncated()) { + // versionListing = S3_CLIENT.listNextBatchOfVersions(versionListing); + // } else { + // break; + // } + // } + + // GetObjectMetadataRequest getMetadataRequest = + //// new GetObjectMetadataRequest(bucketName, + // keyName);//.withSSECustomerKey(SSE_KEY); + //// ObjectMetadata objectMetadata = S3_CLIENT.getObjectMetadata(getMetadataRequest); + //// DeleteVersionRequest deleteVersionRequest = new DeleteVersionRequest(bucketName, + // keyName, objectMetadata.getVersionId()); + //// S3_CLIENT.deleteVersion(deleteVersionRequest); + + DeleteObjectRequest deleteObjectRequest = new DeleteObjectRequest(bucketName, keyName); + S3_CLIENT.deleteObject(deleteObjectRequest); + } + + private static void copyObject(String bucketName, String keyName, String targetKeyName) + throws NoSuchAlgorithmException { + // Create a new encryption key for target so that the target is saved using SSE-C. 
+ SSECustomerKey newSSEKey = new SSECustomerKey(KEY_GENERATOR.generateKey()); + + CopyObjectRequest copyRequest = + new CopyObjectRequest(bucketName, keyName, bucketName, targetKeyName) + .withSourceSSECustomerKey(SSE_KEY) + .withDestinationSSECustomerKey(newSSEKey); + + S3_CLIENT.copyObject(copyRequest); + System.out.println("Object copied"); + } + + private static void displayTextInputStream(S3ObjectInputStream input) throws IOException { + // Read one line at a time from the input stream and display each line. + BufferedReader reader = new BufferedReader(new InputStreamReader(input)); + String line; + while ((line = reader.readLine()) != null) { + System.out.println(line); + } + System.out.println(); + } +} diff --git a/priam/src/test/java/com/netflix/priam/backupv2/TestBackupTTLTask.java b/priam/src/test/java/com/netflix/priam/backupv2/TestBackupTTLTask.java index 74577b11f..ed9abe405 100644 --- a/priam/src/test/java/com/netflix/priam/backupv2/TestBackupTTLTask.java +++ b/priam/src/test/java/com/netflix/priam/backupv2/TestBackupTTLTask.java @@ -21,6 +21,7 @@ import com.google.inject.Injector; import com.google.inject.Provider; import com.netflix.priam.backup.AbstractBackupPath; +import com.netflix.priam.backup.AbstractBackupPath.UploadDownloadDirectives.BackupFileType; import com.netflix.priam.backup.BRTestModule; import com.netflix.priam.backup.FakeBackupFileSystem; import com.netflix.priam.backup.Status; @@ -121,7 +122,7 @@ public void prepTest(int daysForSnapshot) throws Exception { for (int i = 0; i < metas.length; i++) { AbstractBackupPath path = pathProvider.get(); - path.parseLocal(metas[i].toFile(), AbstractBackupPath.BackupFileType.META_V2); + path.parseLocal(metas[i].toFile(), BackupFileType.META_V2); allFiles.add(path.getRemotePath()); allFilesMap.put("META" + i, path.getRemotePath()); } @@ -131,7 +132,7 @@ public void prepTest(int daysForSnapshot) throws Exception { private String getRemoteFromLocal(String localPath) throws ParseException { 
AbstractBackupPath path = pathProvider.get(); - path.parseLocal(new File(localPath), AbstractBackupPath.BackupFileType.SST_V2); + path.parseLocal(new File(localPath), BackupFileType.SST_V2); return path.getRemotePath(); } diff --git a/priam/src/test/java/com/netflix/priam/backupv2/TestBackupUtils.java b/priam/src/test/java/com/netflix/priam/backupv2/TestBackupUtils.java index 1cd63c960..5780d6b9d 100644 --- a/priam/src/test/java/com/netflix/priam/backupv2/TestBackupUtils.java +++ b/priam/src/test/java/com/netflix/priam/backupv2/TestBackupUtils.java @@ -56,7 +56,8 @@ public Path createMeta(List filesToAdd, Instant snapshotTime) throws IOE for (String file : filesToAdd) { AbstractBackupPath path = pathProvider.get(); path.parseRemote(file); - if (path.getType() == AbstractBackupPath.BackupFileType.SST_V2) { + if (path.getDirectives().getType() + == AbstractBackupPath.UploadDownloadDirectives.BackupFileType.SST_V2) { ColumnfamilyResult.SSTableResult ssTableResult = new ColumnfamilyResult.SSTableResult(); @@ -89,7 +90,7 @@ private FileUploadResult getFileUploadResult(AbstractBackupPath path) { path.getColumnFamily(), path.getLastModified(), path.getLastModified(), - path.getSize()); + path.getDirectives().getSize()); fileUploadResult.setBackupPath(path.getRemotePath()); return fileUploadResult; } diff --git a/priam/src/test/java/com/netflix/priam/backupv2/TestMetaV2Proxy.java b/priam/src/test/java/com/netflix/priam/backupv2/TestMetaV2Proxy.java index f434204d7..cfa18f6b4 100644 --- a/priam/src/test/java/com/netflix/priam/backupv2/TestMetaV2Proxy.java +++ b/priam/src/test/java/com/netflix/priam/backupv2/TestMetaV2Proxy.java @@ -21,6 +21,7 @@ import com.google.inject.Injector; import com.google.inject.Provider; import com.netflix.priam.backup.AbstractBackupPath; +import com.netflix.priam.backup.AbstractBackupPath.UploadDownloadDirectives.BackupFileType; import com.netflix.priam.backup.BRTestModule; import com.netflix.priam.backup.BackupRestoreException; import 
com.netflix.priam.backup.FakeBackupFileSystem; @@ -86,7 +87,7 @@ public void testIsMetaFileValid() throws Exception { Instant snapshotInstant = DateUtil.getInstant(); Path metaPath = backupUtils.createMeta(getRemoteFakeFiles(), snapshotInstant); AbstractBackupPath abstractBackupPath = abstractBackupPathProvider.get(); - abstractBackupPath.parseLocal(metaPath.toFile(), AbstractBackupPath.BackupFileType.META_V2); + abstractBackupPath.parseLocal(metaPath.toFile(), BackupFileType.META_V2); Assert.assertTrue(metaProxy.isMetaFileValid(abstractBackupPath).valid); FileUtils.deleteQuietly(metaPath.toFile()); @@ -95,7 +96,7 @@ public void testIsMetaFileValid() throws Exception { fileToAdd.add( Paths.get( getPrefix(), - AbstractBackupPath.BackupFileType.SST_V2.toString(), + BackupFileType.SST_V2.toString(), "1859817645000", "keyspace1", "columnfamily1", @@ -165,7 +166,7 @@ private List getRemoteFakeFiles() { files.add( Paths.get( getPrefix(), - AbstractBackupPath.BackupFileType.SST_V2.toString(), + BackupFileType.SST_V2.toString(), "1859817645000", "keyspace1", "columnfamily1", @@ -175,7 +176,7 @@ private List getRemoteFakeFiles() { files.add( Paths.get( getPrefix(), - AbstractBackupPath.BackupFileType.SST_V2.toString(), + BackupFileType.SST_V2.toString(), "1859818845000", "keyspace1", "columnfamily1", @@ -186,7 +187,7 @@ private List getRemoteFakeFiles() { files.add( Paths.get( getPrefix(), - AbstractBackupPath.BackupFileType.META_V2.toString(), + BackupFileType.META_V2.toString(), "1859824860000", "SNAPPY", "PLAINTEXT", @@ -194,7 +195,7 @@ private List getRemoteFakeFiles() { files.add( Paths.get( getPrefix(), - AbstractBackupPath.BackupFileType.SST_V2.toString(), + BackupFileType.SST_V2.toString(), "1859826045000", "keyspace1", "columnfamily1", @@ -204,7 +205,7 @@ private List getRemoteFakeFiles() { files.add( Paths.get( getPrefix(), - AbstractBackupPath.BackupFileType.SST_V2.toString(), + BackupFileType.SST_V2.toString(), "1859828410000", "keyspace1", "columnfamily1", @@ 
-214,7 +215,7 @@ private List getRemoteFakeFiles() { files.add( Paths.get( getPrefix(), - AbstractBackupPath.BackupFileType.SST_V2.toString(), + BackupFileType.SST_V2.toString(), "1859828420000", "keyspace1", "columnfamily1", @@ -224,7 +225,7 @@ private List getRemoteFakeFiles() { files.add( Paths.get( getPrefix(), - AbstractBackupPath.BackupFileType.META_V2.toString(), + BackupFileType.META_V2.toString(), "1859828460000", "SNAPPY", "PLAINTEXT", diff --git a/priam/src/test/java/com/netflix/priam/backup/TestCompression.java b/priam/src/test/java/com/netflix/priam/compress/TestCompression.java similarity index 81% rename from priam/src/test/java/com/netflix/priam/backup/TestCompression.java rename to priam/src/test/java/com/netflix/priam/compress/TestCompression.java index 63fdb0dbd..091a91f4f 100644 --- a/priam/src/test/java/com/netflix/priam/backup/TestCompression.java +++ b/priam/src/test/java/com/netflix/priam/compress/TestCompression.java @@ -15,13 +15,11 @@ * */ -package com.netflix.priam.backup; +package com.netflix.priam.compress; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; -import com.netflix.priam.compress.ICompression; -import com.netflix.priam.compress.SnappyCompression; import com.netflix.priam.utils.SystemUtils; import java.io.*; import java.util.Enumeration; @@ -107,18 +105,22 @@ public void zipTest() throws IOException { @Test public void snappyTest() throws IOException { - ICompression compress = new SnappyCompression(); - testCompressor(compress); + testCompressor(ICompression.CompressionAlgorithm.SNAPPY); + testCompressor(ICompression.DEFAULT_COMPRESSION); } - private void testCompressor(ICompression compress) throws IOException { + private void testCompressor(ICompression.CompressionAlgorithm compressionAlgorithm) + throws IOException { File compressedOutputFile = new File("/tmp/test1.compress"); File decompressedTempOutput = new File("/tmp/compress-test-out.txt"); long chunkSize = 5L * 1024 * 1024; try 
{ Iterator it = - compress.compress(new FileInputStream(randomContentFile), chunkSize); + new ChunkedStream( + compressionAlgorithm, + new FileInputStream(randomContentFile), + chunkSize); try (FileOutputStream ostream = new FileOutputStream(compressedOutputFile)) { while (it.hasNext()) { byte[] chunk = it.next(); @@ -127,9 +129,14 @@ private void testCompressor(ICompression compress) throws IOException { ostream.flush(); } - assertTrue(randomContentFile.length() > compressedOutputFile.length()); + assertTrue(randomContentFile.length() >= compressedOutputFile.length()); + System.out.println( + String.format( + "Compressed size: %d, Original size: %d", + compressedOutputFile.length(), randomContentFile.length())); - compress.decompressAndClose( + Decompressor.decompress( + compressionAlgorithm, new FileInputStream(compressedOutputFile), new FileOutputStream(decompressedTempOutput)); String md1 = SystemUtils.md5(randomContentFile); @@ -140,4 +147,14 @@ private void testCompressor(ICompression compress) throws IOException { FileUtils.deleteQuietly(decompressedTempOutput); } } + + @Test + public void testlz4() throws Exception { + testCompressor(ICompression.CompressionAlgorithm.LZ4); + } + + @Test + public void noCompression() throws Exception { + testCompressor(ICompression.CompressionAlgorithm.NONE); + } } diff --git a/settings.gradle b/settings.gradle index dfe759d98..a81e21259 100644 --- a/settings.gradle +++ b/settings.gradle @@ -1,2 +1,2 @@ rootProject.name = 'Priam' -include 'priam','priam-web','priam-cass-extensions','priam-dse-extensions' +include 'priam','priam-web','priam-cass-extensions'