diff --git a/priam/src/main/java/com/netflix/priam/aws/RemoteBackupPath.java b/priam/src/main/java/com/netflix/priam/aws/RemoteBackupPath.java index ef8a6234c..061a0caa6 100644 --- a/priam/src/main/java/com/netflix/priam/aws/RemoteBackupPath.java +++ b/priam/src/main/java/com/netflix/priam/aws/RemoteBackupPath.java @@ -26,7 +26,6 @@ import java.nio.file.Path; import java.nio.file.Paths; import java.time.Instant; -import java.util.Date; import java.util.List; import javax.inject.Inject; @@ -149,14 +148,6 @@ public void parsePartialPrefix(String remoteFilePath) { token = remotePath.getName(3).toString(); } - @Override - public String remotePrefix(Date start, Date end, String location) { - return PATH_JOINER.join( - clusterPrefix(location), - instanceIdentity.getInstance().getToken(), - match(start, end)); - } - @Override public Path remoteV2Prefix(Path location, BackupFileType fileType) { if (location.getNameCount() <= 1) { diff --git a/priam/src/main/java/com/netflix/priam/backup/AbstractBackup.java b/priam/src/main/java/com/netflix/priam/backup/AbstractBackup.java index 8024b124d..78e5edb45 100644 --- a/priam/src/main/java/com/netflix/priam/backup/AbstractBackup.java +++ b/priam/src/main/java/com/netflix/priam/backup/AbstractBackup.java @@ -21,13 +21,7 @@ import com.netflix.priam.utils.SystemUtils; import java.io.File; import java.io.FileFilter; -import java.nio.file.DirectoryStream; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.util.HashSet; import java.util.Optional; -import java.util.Set; import javax.inject.Inject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -95,40 +89,6 @@ protected String getKeyspace(File backupDir) { */ protected abstract void processColumnFamily(File backupDir) throws Exception; - /** - * Get all the backup directories for Cassandra. - * - * @param config to get the location of the data folder. - * @param monitoringFolder folder where cassandra backup's are configured. - * @return Set of the path(s) containing the backup folder for each columnfamily. - * @throws Exception incase of IOException. - */ - public static Set getBackupDirectories(IConfiguration config, String monitoringFolder) - throws Exception { - HashSet backupPaths = new HashSet<>(); - if (config.getDataFileLocation() == null) return backupPaths; - Path dataPath = Paths.get(config.getDataFileLocation()); - if (Files.exists(dataPath) && Files.isDirectory(dataPath)) - try (DirectoryStream directoryStream = - Files.newDirectoryStream(dataPath, path -> Files.isDirectory(path))) { - for (Path keyspaceDirPath : directoryStream) { - try (DirectoryStream keyspaceStream = - Files.newDirectoryStream( - keyspaceDirPath, path -> Files.isDirectory(path))) { - for (Path columnfamilyDirPath : keyspaceStream) { - Path backupDirPath = - Paths.get(columnfamilyDirPath.toString(), monitoringFolder); - if (Files.exists(backupDirPath) && Files.isDirectory(backupDirPath)) { - logger.debug("Backup folder: {}", backupDirPath); - backupPaths.add(backupDirPath); - } - } - } - } - } - return backupPaths; - } - protected static File[] getSecondaryIndexDirectories(File backupDir) { FileFilter filter = (file) -> file.getName().startsWith(".") && isAReadableDirectory(file); return Optional.ofNullable(backupDir.listFiles(filter)).orElse(new File[] {}); diff --git a/priam/src/main/java/com/netflix/priam/backup/AbstractBackupPath.java b/priam/src/main/java/com/netflix/priam/backup/AbstractBackupPath.java index 7e61ccada..579bced47 100644 --- a/priam/src/main/java/com/netflix/priam/backup/AbstractBackupPath.java +++ b/priam/src/main/java/com/netflix/priam/backup/AbstractBackupPath.java @@ -47,7 +47,6 @@ public abstract class AbstractBackupPath implements Comparable getMostRecentSnapshotPaths( return snapshotPaths; } - public static List getIncrementalPaths( - AbstractBackupPath latestValidMetaFile, - DateUtil.DateRange dateRange, - IMetaProxy metaProxy) { - Instant snapshotTime; - snapshotTime = latestValidMetaFile.getLastModified(); - DateUtil.DateRange incrementalDateRange = - new DateUtil.DateRange(snapshotTime, dateRange.getEndTime()); - List incrementalPaths = new ArrayList<>(); - metaProxy.getIncrementals(incrementalDateRange).forEachRemaining(incrementalPaths::add); - return incrementalPaths; - } - public static Map> getFilter(String inputFilter) throws IllegalArgumentException { if (StringUtils.isEmpty(inputFilter)) return null; @@ -144,4 +135,35 @@ public final boolean isFiltered(String keyspace, String columnFamilyDir) { || includeFilter.get(keyspace).contains(columnFamilyName))); return false; } + + /** + * Get all the backup directories for Cassandra. + * + * @param dataDirectory the location of the data folder. + * @param monitoringFolder folder where cassandra backup's are configured. + * @return Set of the path(s) containing the backup folder for each columnfamily. + * @throws IOException + */ + public static ImmutableSet getBackupDirectories( + String dataDirectory, String monitoringFolder) throws IOException { + ImmutableSet.Builder backupPaths = ImmutableSet.builder(); + Path dataPath = Paths.get(dataDirectory); + if (Files.exists(dataPath) && Files.isDirectory(dataPath)) + try (DirectoryStream directoryStream = + Files.newDirectoryStream(dataPath, Files::isDirectory)) { + for (Path keyspaceDirPath : directoryStream) { + try (DirectoryStream keyspaceStream = + Files.newDirectoryStream(keyspaceDirPath, Files::isDirectory)) { + for (Path columnfamilyDirPath : keyspaceStream) { + Path backupDirPath = + Paths.get(columnfamilyDirPath.toString(), monitoringFolder); + if (Files.exists(backupDirPath) && Files.isDirectory(backupDirPath)) { + backupPaths.add(backupDirPath); + } + } + } + } + } + return backupPaths.build(); + } } diff --git a/priam/src/main/java/com/netflix/priam/backup/BackupStatusMgr.java b/priam/src/main/java/com/netflix/priam/backup/BackupStatusMgr.java index 891a8a727..01f95c710 100644 --- a/priam/src/main/java/com/netflix/priam/backup/BackupStatusMgr.java +++ b/priam/src/main/java/com/netflix/priam/backup/BackupStatusMgr.java @@ -209,18 +209,7 @@ public List getLatestBackupMetadata(DateUtil.DateRange dateRange .stream() .filter(Objects::nonNull) .filter(backupMetadata -> backupMetadata.getStatus() == Status.FINISHED) - .filter( - backupMetadata -> - backupMetadata - .getStart() - .toInstant() - .compareTo(dateRange.getStartTime()) - >= 0 - && backupMetadata - .getStart() - .toInstant() - .compareTo(dateRange.getEndTime()) - <= 0) + .filter(backupMetadata -> dateRange.contains(backupMetadata.getStart().toInstant())) .sorted(Comparator.comparing(BackupMetadata::getStart).reversed()) .collect(Collectors.toList()); } diff --git a/priam/src/main/java/com/netflix/priam/backup/IncrementalBackup.java b/priam/src/main/java/com/netflix/priam/backup/IncrementalBackup.java index c757db36e..37afc9376 100644 --- a/priam/src/main/java/com/netflix/priam/backup/IncrementalBackup.java +++ b/priam/src/main/java/com/netflix/priam/backup/IncrementalBackup.java @@ -77,7 +77,8 @@ public static TaskTimer getTimer( private static void cleanOldBackups(IConfiguration configuration) throws Exception { Set backupPaths = - AbstractBackup.getBackupDirectories(configuration, INCREMENTAL_BACKUP_FOLDER); + BackupRestoreUtil.getBackupDirectories( + configuration.getDataFileLocation(), INCREMENTAL_BACKUP_FOLDER); for (Path backupDirPath : backupPaths) { FileUtils.cleanDirectory(backupDirPath.toFile()); } diff --git a/priam/src/main/java/com/netflix/priam/backupv2/BackupTTLTask.java b/priam/src/main/java/com/netflix/priam/backupv2/BackupTTLTask.java index 28282e789..8479120ba 100644 --- a/priam/src/main/java/com/netflix/priam/backupv2/BackupTTLTask.java +++ b/priam/src/main/java/com/netflix/priam/backupv2/BackupTTLTask.java @@ -96,9 +96,7 @@ public BackupTTLTask( @Override public void execute() throws Exception { - if (instanceState.getRestoreStatus() != null - && instanceState.getRestoreStatus().getStatus() != null - && instanceState.getRestoreStatus().getStatus() == Status.STARTED) { + if (instanceState.isRestoring()) { logger.info("Not executing the TTL Task for backups as Priam is in restore mode."); return; } diff --git a/priam/src/main/java/com/netflix/priam/backupv2/BackupVerificationTask.java b/priam/src/main/java/com/netflix/priam/backupv2/BackupVerificationTask.java index 56df6ec43..92e5b0f72 100644 --- a/priam/src/main/java/com/netflix/priam/backupv2/BackupVerificationTask.java +++ b/priam/src/main/java/com/netflix/priam/backupv2/BackupVerificationTask.java @@ -74,9 +74,7 @@ public void execute() throws Exception { return; } - if (instanceState.getRestoreStatus() != null - && instanceState.getRestoreStatus().getStatus() != null - && instanceState.getRestoreStatus().getStatus() == Status.STARTED) { + if (instanceState.isRestoring()) { logger.info("Skipping backup verification. Priam is in restore mode."); return; } diff --git a/priam/src/main/java/com/netflix/priam/backupv2/IMetaProxy.java b/priam/src/main/java/com/netflix/priam/backupv2/IMetaProxy.java index 2f79852b2..d16b3aa73 100644 --- a/priam/src/main/java/com/netflix/priam/backupv2/IMetaProxy.java +++ b/priam/src/main/java/com/netflix/priam/backupv2/IMetaProxy.java @@ -17,12 +17,12 @@ package com.netflix.priam.backupv2; +import com.google.common.collect.ImmutableList; import com.netflix.priam.backup.AbstractBackupPath; import com.netflix.priam.backup.BackupRestoreException; import com.netflix.priam.backup.BackupVerificationResult; import com.netflix.priam.utils.DateUtil; import java.nio.file.Path; -import java.util.Iterator; import java.util.List; /** Proxy to do management tasks for meta files. Created by aagrawal on 12/18/18. */ @@ -78,7 +78,7 @@ public interface IMetaProxy { * @param dateRange the time period to scan in the remote file system for incremental files. * @return iterator containing the list of path on the remote file system satisfying criteria. */ - Iterator getIncrementals(DateUtil.DateRange dateRange); + ImmutableList getIncrementals(DateUtil.DateRange dateRange); /** * Validate that all the files mentioned in the meta file actually exists on remote file system. diff --git a/priam/src/main/java/com/netflix/priam/backupv2/MetaV2Proxy.java b/priam/src/main/java/com/netflix/priam/backupv2/MetaV2Proxy.java index 6e5850c08..1073681c0 100644 --- a/priam/src/main/java/com/netflix/priam/backupv2/MetaV2Proxy.java +++ b/priam/src/main/java/com/netflix/priam/backupv2/MetaV2Proxy.java @@ -17,6 +17,7 @@ package com.netflix.priam.backupv2; +import com.google.common.collect.ImmutableList; import com.netflix.priam.backup.AbstractBackupPath; import com.netflix.priam.backup.BackupRestoreException; import com.netflix.priam.backup.BackupVerificationResult; @@ -31,8 +32,6 @@ import java.util.*; import javax.inject.Inject; import javax.inject.Provider; -import org.apache.commons.collections4.iterators.FilterIterator; -import org.apache.commons.collections4.iterators.TransformIterator; import org.apache.commons.io.FileUtils; import org.apache.commons.io.filefilter.FileFilterUtils; import org.apache.commons.io.filefilter.IOFileFilter; @@ -84,40 +83,34 @@ private String getMatch( } @Override - public Iterator getIncrementals(DateUtil.DateRange dateRange) { - String incrementalPrefix = getMatch(dateRange, AbstractBackupPath.BackupFileType.SST_V2); - String marker = - getMatch( - new DateUtil.DateRange(dateRange.getStartTime(), null), - AbstractBackupPath.BackupFileType.SST_V2); + public ImmutableList getIncrementals(DateUtil.DateRange dateRange) { + return new ImmutableList.Builder() + .addAll(getIncrementals(dateRange, AbstractBackupPath.BackupFileType.SST_V2)) + .addAll( + getIncrementals( + dateRange, AbstractBackupPath.BackupFileType.SECONDARY_INDEX_V2)) + .build(); + } + + private ImmutableList getIncrementals( + DateUtil.DateRange dateRange, AbstractBackupPath.BackupFileType type) { + String incrementalPrefix = getMatch(dateRange, type); + String marker = getMatch(new DateUtil.DateRange(dateRange.getStartTime(), null), type); logger.info( "Listing filesystem with prefix: {}, marker: {}, daterange: {}", incrementalPrefix, marker, dateRange); Iterator iterator = fs.listFileSystem(incrementalPrefix, null, marker); - Iterator transformIterator = - new TransformIterator<>( - iterator, - s -> { - AbstractBackupPath path = abstractBackupPathProvider.get(); - path.parseRemote(s); - return path; - }); - - return new FilterIterator<>( - transformIterator, - abstractBackupPath -> - (abstractBackupPath.getLastModified().isAfter(dateRange.getStartTime()) - && abstractBackupPath - .getLastModified() - .isBefore(dateRange.getEndTime())) - || abstractBackupPath - .getLastModified() - .equals(dateRange.getStartTime()) - || abstractBackupPath - .getLastModified() - .equals(dateRange.getEndTime())); + ImmutableList.Builder results = ImmutableList.builder(); + while (iterator.hasNext()) { + AbstractBackupPath path = abstractBackupPathProvider.get(); + path.parseRemote(iterator.next()); + if (dateRange.contains(path.getLastModified())) { + results.add(path); + } + } + return results.build(); } @Override @@ -136,10 +129,7 @@ public List findMetaFiles(DateUtil.DateRange dateRange) { AbstractBackupPath abstractBackupPath = abstractBackupPathProvider.get(); abstractBackupPath.parseRemote(iterator.next()); logger.debug("Meta file found: {}", abstractBackupPath); - if (abstractBackupPath.getLastModified().toEpochMilli() - >= dateRange.getStartTime().toEpochMilli() - && abstractBackupPath.getLastModified().toEpochMilli() - <= dateRange.getEndTime().toEpochMilli()) { + if (dateRange.contains(abstractBackupPath.getLastModified())) { metas.add(abstractBackupPath); } } diff --git a/priam/src/main/java/com/netflix/priam/backupv2/SnapshotMetaTask.java b/priam/src/main/java/com/netflix/priam/backupv2/SnapshotMetaTask.java index bc05c1874..639c85b86 100644 --- a/priam/src/main/java/com/netflix/priam/backupv2/SnapshotMetaTask.java +++ b/priam/src/main/java/com/netflix/priam/backupv2/SnapshotMetaTask.java @@ -146,7 +146,9 @@ public static TaskTimer getTimer(IBackupRestoreConfig config) throws IllegalArgu static void cleanOldBackups(IConfiguration config) throws Exception { // Clean up all the backup directories, if any. - Set backupPaths = AbstractBackup.getBackupDirectories(config, SNAPSHOT_FOLDER); + Set backupPaths = + BackupRestoreUtil.getBackupDirectories( + config.getDataFileLocation(), SNAPSHOT_FOLDER); for (Path backupDirPath : backupPaths) try (DirectoryStream directoryStream = Files.newDirectoryStream(backupDirPath, Files::isDirectory)) { diff --git a/priam/src/main/java/com/netflix/priam/config/IConfiguration.java b/priam/src/main/java/com/netflix/priam/config/IConfiguration.java index 6fc656caa..c9f17eb06 100644 --- a/priam/src/main/java/com/netflix/priam/config/IConfiguration.java +++ b/priam/src/main/java/com/netflix/priam/config/IConfiguration.java @@ -125,11 +125,6 @@ default int getBackupRetentionDays() { return 0; } - /** @return Get list of racs to backup. Backup all racs if empty */ - default List getBackupRacs() { - return Collections.EMPTY_LIST; - } - /** * Backup location i.e. remote file system to upload backups. e.g. for S3 it will be s3 bucket * name @@ -403,6 +398,10 @@ default String getRestoreSnapshot() { return StringUtils.EMPTY; } + default String getRestoreDataLocation() { + return getCassandraBaseDirectory() + "/restore"; + } + /** @return Get the region to connect to SDB for instance identity */ default String getSDBInstanceIdentityRegion() { return "us-east-1"; diff --git a/priam/src/main/java/com/netflix/priam/config/PriamConfiguration.java b/priam/src/main/java/com/netflix/priam/config/PriamConfiguration.java index 15fb15c69..cfd3eae39 100644 --- a/priam/src/main/java/com/netflix/priam/config/PriamConfiguration.java +++ b/priam/src/main/java/com/netflix/priam/config/PriamConfiguration.java @@ -92,11 +92,6 @@ public int getBackupRetentionDays() { return config.get(PRIAM_PRE + ".backup.retention", 0); } - @Override - public List getBackupRacs() { - return config.getList(PRIAM_PRE + ".backup.racs"); - } - @Override public String getRestorePrefix() { return config.get(PRIAM_PRE + ".restore.prefix"); @@ -283,6 +278,11 @@ public String getRestoreSnapshot() { return config.get(PRIAM_PRE + ".restore.snapshot", ""); } + @Override + public String getRestoreDataLocation() { + return config.get(PRIAM_PRE + ".restore.data.location"); + } + @Override public boolean isRestoreEncrypted() { return config.get(PRIAM_PRE + ".encrypted.restore.enabled", false); diff --git a/priam/src/main/java/com/netflix/priam/connection/CassandraOperations.java b/priam/src/main/java/com/netflix/priam/connection/CassandraOperations.java index 299610e27..dbdb02653 100644 --- a/priam/src/main/java/com/netflix/priam/connection/CassandraOperations.java +++ b/priam/src/main/java/com/netflix/priam/connection/CassandraOperations.java @@ -16,8 +16,17 @@ */ package com.netflix.priam.connection; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableSet; +import com.netflix.priam.backup.BackupRestoreUtil; import com.netflix.priam.config.IConfiguration; +import com.netflix.priam.health.CassandraMonitor; import com.netflix.priam.utils.RetryableCallable; +import java.io.IOException; +import java.nio.file.DirectoryStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; import java.util.*; import javax.inject.Inject; import org.apache.cassandra.db.ColumnFamilyStoreMBean; @@ -202,4 +211,56 @@ public List> gossipInfo() throws Exception { } return returnPublicIpSourceIpMap; } + + @Override + public List importAll(String srcDir) throws IOException { + List failedImports = new ArrayList<>(); + if (CassandraMonitor.hasCassadraStarted()) { + for (Path tableDir : BackupRestoreUtil.getBackupDirectories(srcDir, "")) { + String keyspace = tableDir.getParent().getFileName().toString(); + String table = tableDir.getFileName().toString().split("-")[0]; + failedImports.addAll(importData(keyspace, table, tableDir.toString())); + } + } else { + recursiveMove(Paths.get(srcDir), Paths.get(configuration.getDataFileLocation())); + } + return failedImports; + } + + private List importData(String keyspace, String table, String source) + throws IOException { + try (JMXNodeTool nodeTool = JMXNodeTool.instance(configuration)) { + return nodeTool.importNewSSTables( + keyspace, + table, + ImmutableSet.of(source), + false /* resetLevel */, + false /* clearRepaired */, + true /* verifySSTables */, + true /* verifyTokens */, + true /* invalidateCaches */, + false /* extendedVerify */, + false /* copyData */); + } + } + + private void recursiveMove(Path source, Path destination) throws IOException { + Preconditions.checkState(Files.exists(source)); + if (!Files.exists(destination)) { + if (!destination.toFile().mkdirs()) { + throw new IOException("Failed creating " + destination); + } + } + try (DirectoryStream directoryStream = Files.newDirectoryStream(source)) { + for (Path path : directoryStream) { + if (Files.isRegularFile(path)) { + Files.move(path, destination.resolve(path.getFileName())); + } else if (Files.isDirectory(path)) { + recursiveMove(path, destination.resolve(path.getFileName())); + } else { + throw new IOException("Failed determining type of inode is " + path); + } + } + } + } } diff --git a/priam/src/main/java/com/netflix/priam/connection/ICassandraOperations.java b/priam/src/main/java/com/netflix/priam/connection/ICassandraOperations.java index ae0c97201..481198e1d 100644 --- a/priam/src/main/java/com/netflix/priam/connection/ICassandraOperations.java +++ b/priam/src/main/java/com/netflix/priam/connection/ICassandraOperations.java @@ -61,4 +61,10 @@ public interface ICassandraOperations { void forceKeyspaceFlush(String keyspaceName) throws Exception; List> gossipInfo() throws Exception; + + /** + * import sstables from the directory at srcDir into the configured data directory. importAll + * Will just move them if Cassandra hasn't started. + */ + List importAll(String srcDir) throws Exception; } diff --git a/priam/src/main/java/com/netflix/priam/health/CassandraMonitor.java b/priam/src/main/java/com/netflix/priam/health/CassandraMonitor.java index 4f580e8af..1955e4aa5 100644 --- a/priam/src/main/java/com/netflix/priam/health/CassandraMonitor.java +++ b/priam/src/main/java/com/netflix/priam/health/CassandraMonitor.java @@ -169,8 +169,8 @@ public static Boolean hasCassadraStarted() { } // Added for testing only - public static void setIsCassadraStarted() { + public static void setIsCassandraStarted(boolean newStartedState) { // Setting cassandra flag to true - isCassandraStarted.set(true); + isCassandraStarted.set(newStartedState); } } diff --git a/priam/src/main/java/com/netflix/priam/health/InstanceState.java b/priam/src/main/java/com/netflix/priam/health/InstanceState.java index 6336e1ca6..1d7716ea5 100644 --- a/priam/src/main/java/com/netflix/priam/health/InstanceState.java +++ b/priam/src/main/java/com/netflix/priam/health/InstanceState.java @@ -18,8 +18,11 @@ import com.netflix.priam.backup.BackupMetadata; import com.netflix.priam.backup.Status; +import com.netflix.priam.utils.DateUtil; import com.netflix.priam.utils.GsonJsonSerializer; import java.time.LocalDateTime; +import java.time.ZoneId; +import java.util.Date; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import javax.inject.Inject; @@ -143,12 +146,31 @@ public boolean isHealthy() { return isHealthy.get(); } - private boolean isRestoring() { + public boolean isRestoring() { return restoreStatus != null && restoreStatus.getStatus() != null && restoreStatus.getStatus() == Status.STARTED; } + public void startRestore(DateUtil.DateRange dateRange) { + restoreStatus.resetStatus(); + restoreStatus.setStartDateRange( + LocalDateTime.ofInstant(dateRange.getStartTime(), ZoneId.of("UTC"))); + Date endTime = new Date(dateRange.getEndTime().toEpochMilli()); + restoreStatus.setEndDateRange(DateUtil.convert(endTime)); + restoreStatus.setExecutionStartTime(LocalDateTime.now()); + setRestoreStatus(Status.STARTED); + } + + public void endRestore(Status status, LocalDateTime endTime) { + restoreStatus.setExecutionEndTime(endTime); + setRestoreStatus(status); + } + + public void setRestoreMetaFile(String path) { + restoreStatus.setSnapshotMetaFile(path); + } + private void setHealthy() { this.isHealthy.set( isRestoring() diff --git a/priam/src/main/java/com/netflix/priam/resources/BackupServletV2.java b/priam/src/main/java/com/netflix/priam/resources/BackupServletV2.java index 9ac66639e..aef6bacaf 100644 --- a/priam/src/main/java/com/netflix/priam/resources/BackupServletV2.java +++ b/priam/src/main/java/com/netflix/priam/resources/BackupServletV2.java @@ -153,25 +153,19 @@ public Response validateV2SnapshotByDate( @Path("/list/{daterange}") public Response list(@PathParam("daterange") String daterange) throws Exception { DateUtil.DateRange dateRange = new DateUtil.DateRange(daterange); - // Find latest valid meta file. - Optional latestValidMetaFile = + Optional metaFile = BackupRestoreUtil.getLatestValidMetaPath(metaProxy, dateRange); - if (!latestValidMetaFile.isPresent()) { + if (!metaFile.isPresent()) { return Response.ok("No valid meta found!").build(); } - List allFiles = + List files = BackupRestoreUtil.getMostRecentSnapshotPaths( - latestValidMetaFile.get(), metaProxy, pathProvider); - allFiles.addAll( - BackupRestoreUtil.getIncrementalPaths( - latestValidMetaFile.get(), dateRange, metaProxy)); - - return Response.ok( - GsonJsonSerializer.getGson() - .toJson( - allFiles.stream() - .map(AbstractBackupPath::getRemotePath) - .collect(Collectors.toList()))) - .build(); + metaFile.get(), metaProxy, pathProvider); + DateUtil.DateRange incrementalDateRange = + new DateRange(metaFile.get().getLastModified(), dateRange.getEndTime()); + files.addAll(metaProxy.getIncrementals(incrementalDateRange)); + List remotePaths = + files.stream().map(AbstractBackupPath::getRemotePath).collect(Collectors.toList()); + return Response.ok(GsonJsonSerializer.getGson().toJson(remotePaths)).build(); } } diff --git a/priam/src/main/java/com/netflix/priam/restore/AbstractRestore.java b/priam/src/main/java/com/netflix/priam/restore/AbstractRestore.java index 11a133adf..6b6d067f9 100644 --- a/priam/src/main/java/com/netflix/priam/restore/AbstractRestore.java +++ b/priam/src/main/java/com/netflix/priam/restore/AbstractRestore.java @@ -22,42 +22,35 @@ import com.netflix.priam.backup.Status; import com.netflix.priam.backupv2.IMetaProxy; import com.netflix.priam.config.IConfiguration; +import com.netflix.priam.connection.ICassandraOperations; import com.netflix.priam.defaultimpl.ICassandraProcess; +import com.netflix.priam.health.CassandraMonitor; import com.netflix.priam.health.InstanceState; import com.netflix.priam.identity.InstanceIdentity; -import com.netflix.priam.identity.config.InstanceInfo; import com.netflix.priam.scheduler.Task; import com.netflix.priam.utils.DateUtil; import com.netflix.priam.utils.RetryableCallable; import com.netflix.priam.utils.Sleeper; -import java.io.File; -import java.io.IOException; import java.math.BigInteger; -import java.nio.file.Path; +import java.nio.file.*; import java.time.LocalDateTime; -import java.time.ZoneId; import java.util.*; import java.util.concurrent.Future; import javax.inject.Inject; import javax.inject.Named; import javax.inject.Provider; -import org.apache.commons.collections4.CollectionUtils; -import org.apache.commons.io.FileUtils; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * A means to perform a restore. This class contains the following characteristics: - It is agnostic - * to the source type of the restore, this is determine by the injected IBackupFileSystem. - This + * to the source type of the restore, this is determined by the injected IBackupFileSystem. - This * class can be scheduled, i.e. it is a "Task". - When this class is executed, it uses its own * thread pool to execute the restores. */ public abstract class AbstractRestore extends Task implements IRestoreStrategy { private static final Logger logger = LoggerFactory.getLogger(AbstractRestore.class); - private static final String JOBNAME = "AbstractRestore"; - private static final String SYSTEM_KEYSPACE = "system"; - private static BigInteger restoreToken; final IBackupFileSystem fs; final Sleeper sleeper; private final BackupRestoreUtil backupRestoreUtil; @@ -67,6 +60,7 @@ public abstract class AbstractRestore extends Task implements IRestoreStrategy { private final ICassandraProcess cassProcess; private final InstanceState instanceState; private final IPostRestoreHook postRestoreHook; + private final ICassandraOperations cassOps; @Inject @Named("v2") @@ -75,14 +69,14 @@ public abstract class AbstractRestore extends Task implements IRestoreStrategy { public AbstractRestore( IConfiguration config, IBackupFileSystem fs, - String name, Sleeper sleeper, Provider pathProvider, InstanceIdentity instanceIdentity, RestoreTokenSelector tokenSelector, ICassandraProcess cassProcess, InstanceState instanceState, - IPostRestoreHook postRestoreHook) { + IPostRestoreHook postRestoreHook, + ICassandraOperations cassOps) { super(config); this.fs = fs; this.sleeper = sleeper; @@ -95,44 +89,22 @@ public AbstractRestore( new BackupRestoreUtil( config.getRestoreIncludeCFList(), config.getRestoreExcludeCFList()); this.postRestoreHook = postRestoreHook; + this.cassOps = cassOps; } - public static final boolean isRestoreEnabled(IConfiguration conf, InstanceInfo instanceInfo) { - boolean isRestoreMode = StringUtils.isNotBlank(conf.getRestoreSnapshot()); - boolean isBackedupRac = - (CollectionUtils.isEmpty(conf.getBackupRacs()) - || conf.getBackupRacs().contains(instanceInfo.getRac())); - return (isRestoreMode && isBackedupRac); + public static boolean isRestoreEnabled(IConfiguration conf) { + return StringUtils.isNotBlank(conf.getRestoreSnapshot()); } - private List> download( - Iterator fsIterator, boolean waitForCompletion) throws Exception { + private List> download(Iterator fsIterator) throws Exception { List> futureList = new ArrayList<>(); while (fsIterator.hasNext()) { AbstractBackupPath temp = fsIterator.next(); - if (backupRestoreUtil.isFiltered( - temp.getKeyspace(), temp.getColumnFamily())) { // is filtered? - logger.info( - "Bypassing restoring file \"{}\" as it is part of the keyspace.columnfamily filter list. Its keyspace:cf is: {}:{}", - temp.newRestoreFile(), - temp.getKeyspace(), - temp.getColumnFamily()); + if (backupRestoreUtil.isFiltered(temp.getKeyspace(), temp.getColumnFamily())) { continue; } - - File localFileHandler = temp.newRestoreFile(); - if (logger.isDebugEnabled()) - logger.debug( - "Created local file name: " - + localFileHandler.getAbsolutePath() - + File.pathSeparator - + localFileHandler.getName()); futureList.add(downloadFile(temp)); } - - // Wait for all download to finish that were started from this method. - if (waitForCompletion) waitForCompletion(futureList); - return futureList; } @@ -140,116 +112,71 @@ private void waitForCompletion(List> futureList) throws Exception { for (Future future : futureList) future.get(); } - private void stopCassProcess() throws IOException { - cassProcess.stop(true); - } - @Override public void execute() throws Exception { - if (!isRestoreEnabled(config, instanceIdentity.getInstanceInfo())) return; + if (!isRestoreEnabled(config)) return; logger.info("Starting restore for {}", config.getRestoreSnapshot()); final DateUtil.DateRange dateRange = new DateUtil.DateRange(config.getRestoreSnapshot()); new RetryableCallable() { public Void retriableCall() throws Exception { - logger.info("Attempting restore"); restore(dateRange); - logger.info("Restore completed"); - // Wait for other server init to complete sleeper.sleep(30000); return null; } }.call(); + logger.info("Restore complete."); } public void restore(DateUtil.DateRange dateRange) throws Exception { - // fail early if post restore hook has invalid parameters if (!postRestoreHook.hasValidParameters()) { throw new PostRestoreHookException("Invalid PostRestoreHook parameters"); } - - Date endTime = new Date(dateRange.getEndTime().toEpochMilli()); IMetaProxy metaProxy = metaV2Proxy; - - // Set the restore status. - instanceState.getRestoreStatus().resetStatus(); - instanceState - .getRestoreStatus() - .setStartDateRange( - LocalDateTime.ofInstant(dateRange.getStartTime(), ZoneId.of("UTC"))); - instanceState.getRestoreStatus().setEndDateRange(DateUtil.convert(endTime)); - instanceState.getRestoreStatus().setExecutionStartTime(LocalDateTime.now()); - instanceState.setRestoreStatus(Status.STARTED); + instanceState.startRestore(dateRange); String origToken = instanceIdentity.getInstance().getToken(); - try { if (config.isRestoreClosestToken()) { - restoreToken = + BigInteger restoreToken = tokenSelector.getClosestToken( new BigInteger(origToken), new Date(dateRange.getStartTime().toEpochMilli())); instanceIdentity.getInstance().setToken(restoreToken.toString()); } - - // Stop cassandra if its running - stopCassProcess(); - - // Cleanup local data - File dataDir = new File(config.getDataFileLocation()); - if (dataDir.exists() && dataDir.isDirectory()) FileUtils.cleanDirectory(dataDir); - - // Find latest valid meta file. Optional latestValidMetaFile = BackupRestoreUtil.getLatestValidMetaPath(metaProxy, dateRange); - if (!latestValidMetaFile.isPresent()) { - logger.info("No valid snapshot meta file found, Restore Failed."); - instanceState.getRestoreStatus().setExecutionEndTime(LocalDateTime.now()); - instanceState.setRestoreStatus(Status.FAILED); + instanceState.endRestore(Status.FAILED, LocalDateTime.now()); return; } - - logger.info( - "Snapshot Meta file for restore {}", latestValidMetaFile.get().getRemotePath()); - instanceState - .getRestoreStatus() - .setSnapshotMetaFile(latestValidMetaFile.get().getRemotePath()); - + logger.info("Meta file for restore {}", latestValidMetaFile.get().getRemotePath()); + instanceState.setRestoreMetaFile(latestValidMetaFile.get().getRemotePath()); List allFiles = BackupRestoreUtil.getMostRecentSnapshotPaths( latestValidMetaFile.get(), metaProxy, pathProvider); if (!config.skipIncrementalRestore()) { - allFiles.addAll( - BackupRestoreUtil.getIncrementalPaths( - latestValidMetaFile.get(), dateRange, metaProxy)); + DateUtil.DateRange incrementalDateRange = + new DateUtil.DateRange( + latestValidMetaFile.get().getLastModified(), + dateRange.getEndTime()); + allFiles.addAll(metaProxy.getIncrementals(incrementalDateRange)); } - - // Download snapshot which is listed in the meta file. - List> futureList = new ArrayList<>(); - futureList.addAll(download(allFiles.iterator(), false)); - - // Wait for all the futures to finish. + List> futureList = new ArrayList<>(download(allFiles.iterator())); waitForCompletion(futureList); - - // Given that files are restored now, kick off post restore hook - logger.info("Starting post restore hook"); - postRestoreHook.execute(); - logger.info("Completed executing post restore hook"); - - // Declare restore as finished. - instanceState.getRestoreStatus().setExecutionEndTime(LocalDateTime.now()); - instanceState.setRestoreStatus(Status.FINISHED); - - // Start cassandra if restore is successful. - if (!config.doesCassandraStartManually()) cassProcess.start(true); - else - logger.info( - "config.doesCassandraStartManually() is set to True, hence Cassandra needs to be started manually ..."); + List failedImports = cassOps.importAll(config.getRestoreDataLocation()); + if (!failedImports.isEmpty()) { + instanceState.endRestore(Status.FAILED, LocalDateTime.now()); + } else { + postRestoreHook.execute(); + if (!config.doesCassandraStartManually() + && !CassandraMonitor.hasCassadraStarted()) { + cassProcess.start(true); + } + instanceState.endRestore(Status.FINISHED, LocalDateTime.now()); + } } catch (Exception e) { - instanceState.setRestoreStatus(Status.FAILED); - instanceState.getRestoreStatus().setExecutionEndTime(LocalDateTime.now()); - logger.error("Error while trying to restore: {}", e.getMessage(), e); + instanceState.endRestore(Status.FAILED, LocalDateTime.now()); throw e; } finally { instanceIdentity.getInstance().setToken(origToken); @@ -265,26 +192,4 @@ public void restore(DateUtil.DateRange dateRange) throws Exception { * @throws Exception If there is any error in downloading file from the remote file system. */ protected abstract Future downloadFile(final AbstractBackupPath path) throws Exception; - - final class BoundedList extends LinkedList { - - private final int limit; - - BoundedList(int limit) { - this.limit = limit; - } - - @Override - public boolean add(E o) { - super.add(o); - while (size() > limit) { - super.remove(); - } - return true; - } - } - - public final int getDownloadTasksQueued() { - return fs.getDownloadTasksQueued(); - } } diff --git a/priam/src/main/java/com/netflix/priam/restore/Restore.java b/priam/src/main/java/com/netflix/priam/restore/Restore.java index 6dba06149..dceaabe5c 100644 --- a/priam/src/main/java/com/netflix/priam/restore/Restore.java +++ b/priam/src/main/java/com/netflix/priam/restore/Restore.java @@ -19,6 +19,7 @@ import com.netflix.priam.backup.AbstractBackupPath; import com.netflix.priam.backup.IBackupFileSystem; import com.netflix.priam.config.IConfiguration; +import com.netflix.priam.connection.ICassandraOperations; import com.netflix.priam.defaultimpl.ICassandraProcess; import com.netflix.priam.health.InstanceState; import com.netflix.priam.identity.InstanceIdentity; @@ -30,14 +31,11 @@ import javax.inject.Inject; import javax.inject.Provider; import javax.inject.Singleton; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** Main class for restoring data from backup. Backup restored using this way are not encrypted. */ @Singleton public class Restore extends AbstractRestore { public static final String JOBNAME = "AUTO_RESTORE_JOB"; - private static final Logger logger = LoggerFactory.getLogger(Restore.class); @Inject public Restore( @@ -49,18 +47,19 @@ public Restore( InstanceIdentity instanceIdentity, RestoreTokenSelector tokenSelector, InstanceState instanceState, - IPostRestoreHook postRestoreHook) { + IPostRestoreHook postRestoreHook, + ICassandraOperations cassandraOperations) { super( config, fs, - JOBNAME, sleeper, pathProvider, instanceIdentity, tokenSelector, cassProcess, instanceState, - postRestoreHook); + postRestoreHook, + cassandraOperations); } @Override diff --git a/priam/src/main/java/com/netflix/priam/tuner/StandardTuner.java b/priam/src/main/java/com/netflix/priam/tuner/StandardTuner.java index dc4500ac9..777331fc7 100644 --- a/priam/src/main/java/com/netflix/priam/tuner/StandardTuner.java +++ b/priam/src/main/java/com/netflix/priam/tuner/StandardTuner.java @@ -67,7 +67,7 @@ public void writeAllProperties(String yamlLocation, String hostname, String seed map.put("listen_address", hostname); map.put("rpc_address", hostname); // Dont bootstrap in restore mode - if (!Restore.isRestoreEnabled(config, instanceInfo)) { + if (!Restore.isRestoreEnabled(config)) { map.put("auto_bootstrap", config.getAutoBoostrap()); } else { map.put("auto_bootstrap", false); diff --git a/priam/src/main/java/com/netflix/priam/utils/DateUtil.java b/priam/src/main/java/com/netflix/priam/utils/DateUtil.java index ff28cf96f..54c7e264c 100644 --- a/priam/src/main/java/com/netflix/priam/utils/DateUtil.java +++ b/priam/src/main/java/com/netflix/priam/utils/DateUtil.java @@ -215,6 +215,10 @@ public Instant getEndTime() { return endTime; } + public boolean contains(Instant instant) { + return startTime.compareTo(instant) <= 0 && endTime.compareTo(instant) >= 0; + } + public String toString() { return GsonJsonSerializer.getGson().toJson(this); } diff --git a/priam/src/test/groovy/com/netflix/priam/cluser/management/TestCompaction.groovy b/priam/src/test/groovy/com/netflix/priam/cluser/management/TestCompaction.groovy index 90fe7db80..1e073b95a 100644 --- a/priam/src/test/groovy/com/netflix/priam/cluser/management/TestCompaction.groovy +++ b/priam/src/test/groovy/com/netflix/priam/cluser/management/TestCompaction.groovy @@ -150,7 +150,7 @@ class TestCompaction extends Specification { private static int concurrentRuns(int size) { - CassandraMonitor.setIsCassadraStarted() + CassandraMonitor.setIsCassandraStarted(true) ExecutorService threads = Executors.newFixedThreadPool(size) List> torun = new ArrayList<>(size) for (int i = 0; i < size; i++) { diff --git a/priam/src/test/java/com/netflix/priam/backup/BRTestModule.java b/priam/src/test/java/com/netflix/priam/backup/BRTestModule.java index 0e87702ee..939dbc63d 100644 --- a/priam/src/test/java/com/netflix/priam/backup/BRTestModule.java +++ b/priam/src/test/java/com/netflix/priam/backup/BRTestModule.java @@ -28,6 +28,8 @@ import com.netflix.priam.config.FakeConfiguration; import com.netflix.priam.config.IBackupRestoreConfig; import com.netflix.priam.config.IConfiguration; +import com.netflix.priam.connection.CassandraOperations; +import com.netflix.priam.connection.ICassandraOperations; import com.netflix.priam.cred.ICredential; import com.netflix.priam.defaultimpl.FakeCassandraProcess; import com.netflix.priam.defaultimpl.ICassandraProcess; @@ -78,5 +80,6 @@ protected void configure() { bind(IMetaProxy.class).annotatedWith(Names.named("v2")).to(MetaV2Proxy.class); bind(DynamicRateLimiter.class).to(FakeDynamicRateLimiter.class); bind(Clock.class).toInstance(Clock.fixed(Instant.EPOCH, ZoneId.systemDefault())); + bind(ICassandraOperations.class).to(CassandraOperations.class).in(Scopes.SINGLETON); } } diff --git a/priam/src/test/java/com/netflix/priam/backupv2/TestBackupTTLTask.java b/priam/src/test/java/com/netflix/priam/backupv2/TestBackupTTLTask.java index f99bbce84..0eeb7e954 100644 --- a/priam/src/test/java/com/netflix/priam/backupv2/TestBackupTTLTask.java +++ b/priam/src/test/java/com/netflix/priam/backupv2/TestBackupTTLTask.java @@ -197,10 +197,11 @@ public void testTTLNext() throws Exception { } @Test - public void testRestoreMode(@Mocked InstanceState state) throws Exception { + public void testRestoreMode(@Mocked InstanceState.RestoreStatus restoreStatus) + throws Exception { new Expectations() { { - state.getRestoreStatus().getStatus(); + restoreStatus.getStatus(); result = Status.STARTED; } }; diff --git a/priam/src/test/java/com/netflix/priam/backupv2/TestBackupV2Service.java b/priam/src/test/java/com/netflix/priam/backupv2/TestBackupV2Service.java index e017ef295..9a483b04c 100644 --- a/priam/src/test/java/com/netflix/priam/backupv2/TestBackupV2Service.java +++ b/priam/src/test/java/com/netflix/priam/backupv2/TestBackupV2Service.java @@ -22,6 +22,7 @@ import com.google.inject.Injector; import com.netflix.priam.backup.AbstractBackup; import com.netflix.priam.backup.BRTestModule; +import com.netflix.priam.backup.BackupRestoreUtil; import com.netflix.priam.config.IBackupRestoreConfig; import com.netflix.priam.config.IConfiguration; import com.netflix.priam.connection.JMXNodeTool; @@ -116,7 +117,8 @@ public void testBackupDisabled( // snapshot V2 name should not be there. Set backupPaths = - AbstractBackup.getBackupDirectories(configuration, AbstractBackup.SNAPSHOT_FOLDER); + BackupRestoreUtil.getBackupDirectories( + configuration.getDataFileLocation(), AbstractBackup.SNAPSHOT_FOLDER); for (Path backupPath : backupPaths) { Assert.assertFalse(Files.exists(Paths.get(backupPath.toString(), snapshotName))); Assert.assertTrue(Files.exists(Paths.get(backupPath.toString(), snapshotV1Name))); @@ -199,6 +201,8 @@ public void updateService( result = "-1"; configuration.isIncrementalBackupEnabled(); result = true; + configuration.getDataFileLocation(); + result = "target/data"; backupRestoreConfig.enableV2Backups(); result = true; backupRestoreConfig.getBackupVerificationCronExpression(); diff --git a/priam/src/test/java/com/netflix/priam/backupv2/TestBackupVerificationTask.java b/priam/src/test/java/com/netflix/priam/backupv2/TestBackupVerificationTask.java index ba867eb50..f93ab2e76 100644 --- a/priam/src/test/java/com/netflix/priam/backupv2/TestBackupVerificationTask.java +++ b/priam/src/test/java/com/netflix/priam/backupv2/TestBackupVerificationTask.java @@ -151,10 +151,11 @@ public void noBackups() throws Exception { } @Test - public void testRestoreMode(@Mocked InstanceState state) throws Exception { + public void testRestoreMode(@Mocked InstanceState.RestoreStatus restoreStatus) + throws Exception { new Expectations() { { - state.getRestoreStatus().getStatus(); + restoreStatus.getStatus(); result = Status.STARTED; } }; diff --git a/priam/src/test/java/com/netflix/priam/backupv2/TestMetaV2Proxy.java b/priam/src/test/java/com/netflix/priam/backupv2/TestMetaV2Proxy.java index 417fc7444..2ac6f5454 100644 --- a/priam/src/test/java/com/netflix/priam/backupv2/TestMetaV2Proxy.java +++ b/priam/src/test/java/com/netflix/priam/backupv2/TestMetaV2Proxy.java @@ -17,6 +17,8 @@ package com.netflix.priam.backupv2; +import com.google.common.collect.ImmutableList; +import com.google.common.truth.Truth; import com.google.inject.Guice; import com.google.inject.Injector; import com.netflix.priam.backup.AbstractBackupPath; @@ -32,7 +34,6 @@ import java.time.Instant; import java.time.temporal.ChronoUnit; import java.util.ArrayList; -import java.util.Iterator; import java.util.List; import java.util.stream.Collectors; import javax.inject.Provider; @@ -131,13 +132,16 @@ public void testGetSSTFilesFromMeta() throws Exception { @Test public void testGetIncrementalFiles() throws Exception { DateUtil.DateRange dateRange = new DateUtil.DateRange("202812071820,20281229"); - Iterator incrementals = metaProxy.getIncrementals(dateRange); - int i = 0; - while (incrementals.hasNext()) { - System.out.println(incrementals.next()); - i++; - } - Assert.assertEquals(3, i); + ImmutableList paths = metaProxy.getIncrementals(dateRange); + Truth.assertThat(paths).hasSize(4); + } + + @Test + public void testGetIncrementalFilesIncludesSecondaryIndexes() throws Exception { + DateUtil.DateRange dateRange = new DateUtil.DateRange("202812071820,20281229"); + ImmutableList paths = metaProxy.getIncrementals(dateRange); + Truth.assertThat(paths.get(3).getType()) + .isEqualTo(AbstractBackupPath.BackupFileType.SECONDARY_INDEX_V2); } @Test @@ -227,6 +231,17 @@ private List getRemoteFakeFiles() { "SNAPPY", "PLAINTEXT", "file4-Data.db")); + files.add( + Paths.get( + getPrefix(), + AbstractBackupPath.BackupFileType.SECONDARY_INDEX_V2.toString(), + "1859828420000", + "keyspace1", + "columnfamily1", + "index1", + "SNAPPY", + "PLAINTEXT", + "file5-Data.db")); files.add( Paths.get( getPrefix(), diff --git a/priam/src/test/java/com/netflix/priam/config/FakeConfiguration.java b/priam/src/test/java/com/netflix/priam/config/FakeConfiguration.java index 4d6f65c8b..36f358e90 100644 --- a/priam/src/test/java/com/netflix/priam/config/FakeConfiguration.java +++ b/priam/src/test/java/com/netflix/priam/config/FakeConfiguration.java @@ -44,6 +44,7 @@ public class FakeConfiguration implements IConfiguration { private String partitioner; private String diskFailurePolicy; private int blockForPeersTimeoutInSecs; + private String dataFileLocation; public Map fakeProperties = new HashMap<>(); @@ -345,4 +346,16 @@ public FakeConfiguration setBlockForPeersTimeoutInSecs(int timeout) { public int getBlockForPeersTimeoutInSecs() { return this.blockForPeersTimeoutInSecs; } + + public FakeConfiguration setDataFileLocation(String dataFileLocation) { + this.dataFileLocation = dataFileLocation; + return this; + } + + @Override + public String getDataFileLocation() { + return dataFileLocation == null + ? IConfiguration.super.getDataFileLocation() + : dataFileLocation; + } } diff --git a/priam/src/test/java/com/netflix/priam/connection/TestCassandraOperations.java b/priam/src/test/java/com/netflix/priam/connection/TestCassandraOperations.java index 89c9e5e59..2a2b0a6e9 100644 --- a/priam/src/test/java/com/netflix/priam/connection/TestCassandraOperations.java +++ b/priam/src/test/java/com/netflix/priam/connection/TestCassandraOperations.java @@ -17,12 +17,20 @@ package com.netflix.priam.connection; +import com.google.common.collect.ImmutableSet; +import com.google.common.io.Files; +import com.google.common.truth.Truth; import com.google.inject.Guice; import com.google.inject.Injector; import com.mchange.io.FileUtils; import com.netflix.priam.backup.BRTestModule; +import com.netflix.priam.config.FakeConfiguration; import com.netflix.priam.config.IConfiguration; +import com.netflix.priam.health.CassandraMonitor; import java.io.File; +import java.io.IOException; +import java.nio.file.Paths; +import java.util.ArrayList; import java.util.List; import java.util.Map; import mockit.Expectations; @@ -30,17 +38,34 @@ import mockit.MockUp; import mockit.Mocked; import org.apache.cassandra.tools.NodeProbe; -import org.junit.Assert; -import org.junit.Test; +import org.junit.*; +import org.junit.rules.TestName; /** Created by aagrawal on 3/1/19. */ public class TestCassandraOperations { private final String gossipInfo1 = "src/test/resources/gossipInfoSample_1.txt"; @Mocked private NodeProbe nodeProbe; @Mocked private JMXNodeTool jmxNodeTool; + private FakeConfiguration config; private static CassandraOperations cassandraOperations; + private static final String BASE_DIR = "base"; + private static final String RESTORE_DIR = "restore"; + private static final String DATA_DIR = "data"; + private static final String KS = "ks"; + private static final String TB = "tb"; + private static final String SI = "si"; + private static final String DATAFILE = "datafile"; + private static final String SI_DATAFILE = "si_datafile"; - public TestCassandraOperations() { + @Rule public TestName name = new TestName(); + + @BeforeClass + public static void prepBeforeAllTests() { + deleteBaseDirectoryForAllTests(); + } + + @Before + public void prepareTest() throws IOException { new MockUp() { @Mock NodeProbe instance(IConfiguration config) { @@ -48,13 +73,29 @@ NodeProbe instance(IConfiguration config) { } }; Injector injector = Guice.createInjector(new BRTestModule()); - if (cassandraOperations == null) - cassandraOperations = injector.getInstance(CassandraOperations.class); + cassandraOperations = injector.getInstance(CassandraOperations.class); + Paths.get(getRestoreDir(), KS, TB, SI).toFile().mkdirs(); + Files.touch(Paths.get(getRestoreDir(), KS, TB, DATAFILE).toFile()); + Files.touch(Paths.get(getRestoreDir(), KS, TB, SI, SI_DATAFILE).toFile()); + config = (FakeConfiguration) injector.getInstance(IConfiguration.class); + config.setDataFileLocation(getDataDir()); + CassandraMonitor.setIsCassandraStarted(false); + } + + @After + public void cleanup() { + org.apache.commons.io.FileUtils.deleteQuietly(Paths.get(getRestoreDir()).toFile()); + org.apache.commons.io.FileUtils.deleteQuietly(Paths.get(getDataDir()).toFile()); + config.setDataFileLocation(null); + } + + @AfterClass + public static void cleanupAfterAllTests() { + deleteBaseDirectoryForAllTests(); } @Test public void testGossipInfo() throws Exception { - String gossipInfoFromNodetool = FileUtils.getContentsAsString(new File(gossipInfo1)); new Expectations() { { @@ -80,4 +121,87 @@ public void testGossipInfo() throws Exception { Assert.assertEquals("[123,234]", gossipInfo.get("TOKENS")); }); } + + @Test + public void testRestoreViaMove_dataDirectoryIsCreated() throws IOException { + cassandraOperations.importAll(getRestoreDir()); + Truth.assertThat(Paths.get(getDataDir()).toFile().exists()); + } + + @Test + public void testRestoreViaMove_keyspaceDirectoryIsCreated() throws IOException { + cassandraOperations.importAll(getRestoreDir()); + Truth.assertThat(Paths.get(getDataDir(), KS).toFile().exists()); + } + + @Test + public void testRestoreViaMove_tableDirectoryIsCreated() throws IOException { + cassandraOperations.importAll(getRestoreDir()); + Truth.assertThat(Paths.get(getDataDir(), KS, TB).toFile().exists()); + } + + @Test + public void testRestoreViaMove_secondaryIndexDirectoryIsCreated() throws IOException { + cassandraOperations.importAll(getRestoreDir()); + Truth.assertThat(Paths.get(getDataDir(), KS, TB, SI).toFile().exists()); + } + + @Test + public void testRestoreViaMove_dataFileIsMoved() throws IOException { + cassandraOperations.importAll(getRestoreDir()); + Truth.assertThat(Paths.get(getDataDir(), KS, TB, DATAFILE).toFile().exists()); + } + + @Test + public void testRestoreViaMove_dataFileIsRemovedFromOrigin() throws IOException { + cassandraOperations.importAll(getRestoreDir()); + Truth.assertThat(Paths.get(getRestoreDir(), KS, TB, DATAFILE).toFile().exists()).isFalse(); + } + + @Test + public void testRestoreViaMove_secondaryIndexFileIsMoved() throws IOException { + cassandraOperations.importAll(getRestoreDir()); + Truth.assertThat(Paths.get(getDataDir(), KS, TB, SI, SI_DATAFILE).toFile().exists()); + } + + @Test + public void testRestoreViaMove_secondaryIndexFileIsRemovedFromOrigin() throws IOException { + cassandraOperations.importAll(getRestoreDir()); + Truth.assertThat(Paths.get(getRestoreDir(), KS, TB, SI, SI_DATAFILE).toFile().exists()) + .isFalse(); + } + + @Test + public void testRestoreViaImport() throws IOException { + CassandraMonitor.setIsCassandraStarted(true); + new Expectations() { + { + nodeProbe.importNewSSTables( + KS, + TB, + ImmutableSet.of(Paths.get(getRestoreDir(), KS, TB).toString()), + false /* resetLevel */, + false /* clearRepaired */, + true /* verifySSTables */, + true /* verifyTokens */, + true /* invalidateCaches */, + false /* extendedVerify */, + false /* copyData */); + result = new ArrayList<>(); + } + }; + cassandraOperations.importAll(getRestoreDir()); + } + + private static void deleteBaseDirectoryForAllTests() { + org.apache.commons.io.FileUtils.deleteQuietly(Paths.get(BASE_DIR).toFile()); + } + + private String getRestoreDir() { + return BASE_DIR + "/" + name.getMethodName() + RESTORE_DIR; + } + + private String getDataDir() { + return BASE_DIR + "/" + name.getMethodName() + DATA_DIR; + } } diff --git a/priam/src/test/java/com/netflix/priam/health/TestCassandraMonitor.java b/priam/src/test/java/com/netflix/priam/health/TestCassandraMonitor.java index 4bf3fa61c..dfcf1a142 100644 --- a/priam/src/test/java/com/netflix/priam/health/TestCassandraMonitor.java +++ b/priam/src/test/java/com/netflix/priam/health/TestCassandraMonitor.java @@ -63,7 +63,7 @@ public void testCassandraMonitor() throws Exception { Assert.assertFalse(CassandraMonitor.hasCassadraStarted()); - CassandraMonitor.setIsCassadraStarted(); + CassandraMonitor.setIsCassandraStarted(true); Assert.assertTrue(CassandraMonitor.hasCassadraStarted()); monitor.execute(); diff --git a/priam/src/test/java/com/netflix/priam/utils/TestDateUtils.java b/priam/src/test/java/com/netflix/priam/utils/TestDateUtils.java index 15cc254b3..a9701173a 100644 --- a/priam/src/test/java/com/netflix/priam/utils/TestDateUtils.java +++ b/priam/src/test/java/com/netflix/priam/utils/TestDateUtils.java @@ -17,6 +17,7 @@ package com.netflix.priam.utils; +import com.google.common.truth.Truth; import java.time.Duration; import java.time.Instant; import java.time.temporal.ChronoUnit; @@ -100,4 +101,34 @@ public void testFutureDateRangeValues() { Assert.assertEquals(Instant.ofEpochSecond(1830686460), dateRange.getEndTime()); Assert.assertEquals("1830", dateRange.match()); } + + @Test + public void testContainsInstantBeforeStartTime() { + DateUtil.DateRange dateRange = new DateUtil.DateRange("202404301200,202405011200"); + Truth.assertThat(dateRange.contains(Instant.parse("2024-04-29T12:00:00Z"))).isFalse(); + } + + @Test + public void testContainsInstantEqualToStartTime() { + DateUtil.DateRange dateRange = new DateUtil.DateRange("202404301200,202405011200"); + Truth.assertThat(dateRange.contains(Instant.parse("2024-04-30T12:00:00Z"))).isTrue(); + } + + @Test + public void testContainsInstantBetweenStartAndEndTimes() { + DateUtil.DateRange dateRange = new DateUtil.DateRange("202404301200,202405011200"); + Truth.assertThat(dateRange.contains(Instant.parse("2024-05-01T00:00:00Z"))).isTrue(); + } + + @Test + public void testContainsInstantEqualToEndTime() { + DateUtil.DateRange dateRange = new DateUtil.DateRange("202404301200,202405011200"); + Truth.assertThat(dateRange.contains(Instant.parse("2024-05-01T12:00:00Z"))).isTrue(); + } + + @Test + public void testContainsInstantAfterEndTime() { + DateUtil.DateRange dateRange = new DateUtil.DateRange("202404301200,202405011200"); + Truth.assertThat(dateRange.contains(Instant.parse("2024-05-02T12:00:00Z"))).isFalse(); + } }