From 6bb0c1b27879fe2c875ee4adaca87d24bc58d94f Mon Sep 17 00:00:00 2001 From: Matt Lehman Date: Wed, 1 May 2024 14:45:40 -0700 Subject: [PATCH 01/15] Remove last usage of CL BackupFileType. --- .../java/com/netflix/priam/backup/AbstractBackupPath.java | 4 ---- 1 file changed, 4 deletions(-) diff --git a/priam/src/main/java/com/netflix/priam/backup/AbstractBackupPath.java b/priam/src/main/java/com/netflix/priam/backup/AbstractBackupPath.java index 7e61ccada..769ff330a 100644 --- a/priam/src/main/java/com/netflix/priam/backup/AbstractBackupPath.java +++ b/priam/src/main/java/com/netflix/priam/backup/AbstractBackupPath.java @@ -47,7 +47,6 @@ public abstract class AbstractBackupPath implements Comparable Date: Wed, 1 May 2024 14:47:04 -0700 Subject: [PATCH 02/15] Remove unused remotePrefix() method. --- .../java/com/netflix/priam/aws/RemoteBackupPath.java | 9 --------- .../com/netflix/priam/backup/AbstractBackupPath.java | 5 ----- 2 files changed, 14 deletions(-) diff --git a/priam/src/main/java/com/netflix/priam/aws/RemoteBackupPath.java b/priam/src/main/java/com/netflix/priam/aws/RemoteBackupPath.java index ef8a6234c..061a0caa6 100644 --- a/priam/src/main/java/com/netflix/priam/aws/RemoteBackupPath.java +++ b/priam/src/main/java/com/netflix/priam/aws/RemoteBackupPath.java @@ -26,7 +26,6 @@ import java.nio.file.Path; import java.nio.file.Paths; import java.time.Instant; -import java.util.Date; import java.util.List; import javax.inject.Inject; @@ -149,14 +148,6 @@ public void parsePartialPrefix(String remoteFilePath) { token = remotePath.getName(3).toString(); } - @Override - public String remotePrefix(Date start, Date end, String location) { - return PATH_JOINER.join( - clusterPrefix(location), - instanceIdentity.getInstance().getToken(), - match(start, end)); - } - @Override public Path remoteV2Prefix(Path location, BackupFileType fileType) { if (location.getNameCount() <= 1) { diff --git a/priam/src/main/java/com/netflix/priam/backup/AbstractBackupPath.java 
b/priam/src/main/java/com/netflix/priam/backup/AbstractBackupPath.java index 769ff330a..4ddbd22c5 100644 --- a/priam/src/main/java/com/netflix/priam/backup/AbstractBackupPath.java +++ b/priam/src/main/java/com/netflix/priam/backup/AbstractBackupPath.java @@ -184,11 +184,6 @@ public boolean equals(Object obj) { /** Parses paths with just token prefixes */ public abstract void parsePartialPrefix(String remoteFilePath); - /** - * Provides a common prefix that matches all objects that fall between the start and end time - */ - public abstract String remotePrefix(Date start, Date end, String location); - public abstract Path remoteV2Prefix(Path location, BackupFileType fileType); /** Provides the cluster prefix */ From b80bfb35b2f26730718eaeeabce94f1db345cdb4 Mon Sep 17 00:00:00 2001 From: Matt Lehman Date: Wed, 1 May 2024 12:23:17 -0700 Subject: [PATCH 03/15] Create DateRange::contains to consolidate recurring logic in a consistent way. --- .../netflix/priam/backup/BackupStatusMgr.java | 13 +------- .../netflix/priam/backupv2/MetaV2Proxy.java | 17 ++-------- .../com/netflix/priam/utils/DateUtil.java | 4 +++ .../netflix/priam/utils/TestDateUtils.java | 31 +++++++++++++++++++ 4 files changed, 38 insertions(+), 27 deletions(-) diff --git a/priam/src/main/java/com/netflix/priam/backup/BackupStatusMgr.java b/priam/src/main/java/com/netflix/priam/backup/BackupStatusMgr.java index 891a8a727..01f95c710 100644 --- a/priam/src/main/java/com/netflix/priam/backup/BackupStatusMgr.java +++ b/priam/src/main/java/com/netflix/priam/backup/BackupStatusMgr.java @@ -209,18 +209,7 @@ public List getLatestBackupMetadata(DateUtil.DateRange dateRange .stream() .filter(Objects::nonNull) .filter(backupMetadata -> backupMetadata.getStatus() == Status.FINISHED) - .filter( - backupMetadata -> - backupMetadata - .getStart() - .toInstant() - .compareTo(dateRange.getStartTime()) - >= 0 - && backupMetadata - .getStart() - .toInstant() - .compareTo(dateRange.getEndTime()) - <= 0) + 
.filter(backupMetadata -> dateRange.contains(backupMetadata.getStart().toInstant())) .sorted(Comparator.comparing(BackupMetadata::getStart).reversed()) .collect(Collectors.toList()); } diff --git a/priam/src/main/java/com/netflix/priam/backupv2/MetaV2Proxy.java b/priam/src/main/java/com/netflix/priam/backupv2/MetaV2Proxy.java index 6e5850c08..15a60b4e4 100644 --- a/priam/src/main/java/com/netflix/priam/backupv2/MetaV2Proxy.java +++ b/priam/src/main/java/com/netflix/priam/backupv2/MetaV2Proxy.java @@ -107,17 +107,7 @@ public Iterator getIncrementals(DateUtil.DateRange dateRange return new FilterIterator<>( transformIterator, - abstractBackupPath -> - (abstractBackupPath.getLastModified().isAfter(dateRange.getStartTime()) - && abstractBackupPath - .getLastModified() - .isBefore(dateRange.getEndTime())) - || abstractBackupPath - .getLastModified() - .equals(dateRange.getStartTime()) - || abstractBackupPath - .getLastModified() - .equals(dateRange.getEndTime())); + abstractBackupPath -> dateRange.contains(abstractBackupPath.getLastModified())); } @Override @@ -136,10 +126,7 @@ public List findMetaFiles(DateUtil.DateRange dateRange) { AbstractBackupPath abstractBackupPath = abstractBackupPathProvider.get(); abstractBackupPath.parseRemote(iterator.next()); logger.debug("Meta file found: {}", abstractBackupPath); - if (abstractBackupPath.getLastModified().toEpochMilli() - >= dateRange.getStartTime().toEpochMilli() - && abstractBackupPath.getLastModified().toEpochMilli() - <= dateRange.getEndTime().toEpochMilli()) { + if (dateRange.contains(abstractBackupPath.getLastModified())) { metas.add(abstractBackupPath); } } diff --git a/priam/src/main/java/com/netflix/priam/utils/DateUtil.java b/priam/src/main/java/com/netflix/priam/utils/DateUtil.java index ff28cf96f..54c7e264c 100644 --- a/priam/src/main/java/com/netflix/priam/utils/DateUtil.java +++ b/priam/src/main/java/com/netflix/priam/utils/DateUtil.java @@ -215,6 +215,10 @@ public Instant getEndTime() { return endTime; } + 
public boolean contains(Instant instant) { + return startTime.compareTo(instant) <= 0 && endTime.compareTo(instant) >= 0; + } + public String toString() { return GsonJsonSerializer.getGson().toJson(this); } diff --git a/priam/src/test/java/com/netflix/priam/utils/TestDateUtils.java b/priam/src/test/java/com/netflix/priam/utils/TestDateUtils.java index 15cc254b3..a9701173a 100644 --- a/priam/src/test/java/com/netflix/priam/utils/TestDateUtils.java +++ b/priam/src/test/java/com/netflix/priam/utils/TestDateUtils.java @@ -17,6 +17,7 @@ package com.netflix.priam.utils; +import com.google.common.truth.Truth; import java.time.Duration; import java.time.Instant; import java.time.temporal.ChronoUnit; @@ -100,4 +101,34 @@ public void testFutureDateRangeValues() { Assert.assertEquals(Instant.ofEpochSecond(1830686460), dateRange.getEndTime()); Assert.assertEquals("1830", dateRange.match()); } + + @Test + public void testContainsInstantBeforeStartTime() { + DateUtil.DateRange dateRange = new DateUtil.DateRange("202404301200,202405011200"); + Truth.assertThat(dateRange.contains(Instant.parse("2024-04-29T12:00:00Z"))).isFalse(); + } + + @Test + public void testContainsInstantEqualToStartTime() { + DateUtil.DateRange dateRange = new DateUtil.DateRange("202404301200,202405011200"); + Truth.assertThat(dateRange.contains(Instant.parse("2024-04-30T12:00:00Z"))).isTrue(); + } + + @Test + public void testContainsInstantBetweenStartAndEndTimes() { + DateUtil.DateRange dateRange = new DateUtil.DateRange("202404301200,202405011200"); + Truth.assertThat(dateRange.contains(Instant.parse("2024-05-01T00:00:00Z"))).isTrue(); + } + + @Test + public void testContainsInstantEqualToEndTime() { + DateUtil.DateRange dateRange = new DateUtil.DateRange("202404301200,202405011200"); + Truth.assertThat(dateRange.contains(Instant.parse("2024-05-01T12:00:00Z"))).isTrue(); + } + + @Test + public void testContainsInstantAfterEndTime() { + DateUtil.DateRange dateRange = new 
DateUtil.DateRange("202404301200,202405011200"); + Truth.assertThat(dateRange.contains(Instant.parse("2024-05-02T12:00:00Z"))).isFalse(); + } } From d51863c26e7d2481c825dac2060849e1dca4c18d Mon Sep 17 00:00:00 2001 From: Matt Lehman Date: Wed, 1 May 2024 12:50:30 -0700 Subject: [PATCH 04/15] Change IMetaProxy API to return an ImmutableList of AbstractBackupPaths when fetching incrementals. The iterators are always fully materialized. Also, remove the now-redundant method from BackupRestoreUtil that merely wrapped the MetaProxy call. --- .../priam/backup/BackupRestoreUtil.java | 14 ---------- .../netflix/priam/backupv2/IMetaProxy.java | 4 +-- .../netflix/priam/backupv2/MetaV2Proxy.java | 26 ++++++++----------- .../priam/resources/BackupServletV2.java | 26 +++++++------------ .../priam/restore/AbstractRestore.java | 8 +++--- .../priam/backupv2/TestMetaV2Proxy.java | 10 ++----- 6 files changed, 30 insertions(+), 58 deletions(-) diff --git a/priam/src/main/java/com/netflix/priam/backup/BackupRestoreUtil.java b/priam/src/main/java/com/netflix/priam/backup/BackupRestoreUtil.java index 54dda7f2e..e165f6bc2 100644 --- a/priam/src/main/java/com/netflix/priam/backup/BackupRestoreUtil.java +++ b/priam/src/main/java/com/netflix/priam/backup/BackupRestoreUtil.java @@ -21,7 +21,6 @@ import com.netflix.priam.backupv2.IMetaProxy; import com.netflix.priam.utils.DateUtil; import java.nio.file.Path; -import java.time.Instant; import java.util.*; import java.util.regex.Pattern; import java.util.stream.Collectors; @@ -83,19 +82,6 @@ public static List getMostRecentSnapshotPaths( return snapshotPaths; } - public static List getIncrementalPaths( - AbstractBackupPath latestValidMetaFile, - DateUtil.DateRange dateRange, - IMetaProxy metaProxy) { - Instant snapshotTime; - snapshotTime = latestValidMetaFile.getLastModified(); - DateUtil.DateRange incrementalDateRange = - new DateUtil.DateRange(snapshotTime, dateRange.getEndTime()); - List incrementalPaths = new ArrayList<>(); - 
metaProxy.getIncrementals(incrementalDateRange).forEachRemaining(incrementalPaths::add); - return incrementalPaths; - } - public static Map> getFilter(String inputFilter) throws IllegalArgumentException { if (StringUtils.isEmpty(inputFilter)) return null; diff --git a/priam/src/main/java/com/netflix/priam/backupv2/IMetaProxy.java b/priam/src/main/java/com/netflix/priam/backupv2/IMetaProxy.java index 2f79852b2..d16b3aa73 100644 --- a/priam/src/main/java/com/netflix/priam/backupv2/IMetaProxy.java +++ b/priam/src/main/java/com/netflix/priam/backupv2/IMetaProxy.java @@ -17,12 +17,12 @@ package com.netflix.priam.backupv2; +import com.google.common.collect.ImmutableList; import com.netflix.priam.backup.AbstractBackupPath; import com.netflix.priam.backup.BackupRestoreException; import com.netflix.priam.backup.BackupVerificationResult; import com.netflix.priam.utils.DateUtil; import java.nio.file.Path; -import java.util.Iterator; import java.util.List; /** Proxy to do management tasks for meta files. Created by aagrawal on 12/18/18. */ @@ -78,7 +78,7 @@ public interface IMetaProxy { * @param dateRange the time period to scan in the remote file system for incremental files. * @return iterator containing the list of path on the remote file system satisfying criteria. */ - Iterator getIncrementals(DateUtil.DateRange dateRange); + ImmutableList getIncrementals(DateUtil.DateRange dateRange); /** * Validate that all the files mentioned in the meta file actually exists on remote file system. 
diff --git a/priam/src/main/java/com/netflix/priam/backupv2/MetaV2Proxy.java b/priam/src/main/java/com/netflix/priam/backupv2/MetaV2Proxy.java index 15a60b4e4..e2101a5bc 100644 --- a/priam/src/main/java/com/netflix/priam/backupv2/MetaV2Proxy.java +++ b/priam/src/main/java/com/netflix/priam/backupv2/MetaV2Proxy.java @@ -17,6 +17,7 @@ package com.netflix.priam.backupv2; +import com.google.common.collect.ImmutableList; import com.netflix.priam.backup.AbstractBackupPath; import com.netflix.priam.backup.BackupRestoreException; import com.netflix.priam.backup.BackupVerificationResult; @@ -31,8 +32,6 @@ import java.util.*; import javax.inject.Inject; import javax.inject.Provider; -import org.apache.commons.collections4.iterators.FilterIterator; -import org.apache.commons.collections4.iterators.TransformIterator; import org.apache.commons.io.FileUtils; import org.apache.commons.io.filefilter.FileFilterUtils; import org.apache.commons.io.filefilter.IOFileFilter; @@ -84,7 +83,7 @@ private String getMatch( } @Override - public Iterator getIncrementals(DateUtil.DateRange dateRange) { + public ImmutableList getIncrementals(DateUtil.DateRange dateRange) { String incrementalPrefix = getMatch(dateRange, AbstractBackupPath.BackupFileType.SST_V2); String marker = getMatch( @@ -96,18 +95,15 @@ public Iterator getIncrementals(DateUtil.DateRange dateRange marker, dateRange); Iterator iterator = fs.listFileSystem(incrementalPrefix, null, marker); - Iterator transformIterator = - new TransformIterator<>( - iterator, - s -> { - AbstractBackupPath path = abstractBackupPathProvider.get(); - path.parseRemote(s); - return path; - }); - - return new FilterIterator<>( - transformIterator, - abstractBackupPath -> dateRange.contains(abstractBackupPath.getLastModified())); + ImmutableList.Builder results = ImmutableList.builder(); + while (iterator.hasNext()) { + AbstractBackupPath path = abstractBackupPathProvider.get(); + path.parseRemote(iterator.next()); + if 
(dateRange.contains(path.getLastModified())) { + results.add(path); + } + } + return results.build(); } @Override diff --git a/priam/src/main/java/com/netflix/priam/resources/BackupServletV2.java b/priam/src/main/java/com/netflix/priam/resources/BackupServletV2.java index 9ac66639e..aef6bacaf 100644 --- a/priam/src/main/java/com/netflix/priam/resources/BackupServletV2.java +++ b/priam/src/main/java/com/netflix/priam/resources/BackupServletV2.java @@ -153,25 +153,19 @@ public Response validateV2SnapshotByDate( @Path("/list/{daterange}") public Response list(@PathParam("daterange") String daterange) throws Exception { DateUtil.DateRange dateRange = new DateUtil.DateRange(daterange); - // Find latest valid meta file. - Optional latestValidMetaFile = + Optional metaFile = BackupRestoreUtil.getLatestValidMetaPath(metaProxy, dateRange); - if (!latestValidMetaFile.isPresent()) { + if (!metaFile.isPresent()) { return Response.ok("No valid meta found!").build(); } - List allFiles = + List files = BackupRestoreUtil.getMostRecentSnapshotPaths( - latestValidMetaFile.get(), metaProxy, pathProvider); - allFiles.addAll( - BackupRestoreUtil.getIncrementalPaths( - latestValidMetaFile.get(), dateRange, metaProxy)); - - return Response.ok( - GsonJsonSerializer.getGson() - .toJson( - allFiles.stream() - .map(AbstractBackupPath::getRemotePath) - .collect(Collectors.toList()))) - .build(); + metaFile.get(), metaProxy, pathProvider); + DateUtil.DateRange incrementalDateRange = + new DateRange(metaFile.get().getLastModified(), dateRange.getEndTime()); + files.addAll(metaProxy.getIncrementals(incrementalDateRange)); + List remotePaths = + files.stream().map(AbstractBackupPath::getRemotePath).collect(Collectors.toList()); + return Response.ok(GsonJsonSerializer.getGson().toJson(remotePaths)).build(); } } diff --git a/priam/src/main/java/com/netflix/priam/restore/AbstractRestore.java b/priam/src/main/java/com/netflix/priam/restore/AbstractRestore.java index 11a133adf..1a427ff2c 100644 --- 
a/priam/src/main/java/com/netflix/priam/restore/AbstractRestore.java +++ b/priam/src/main/java/com/netflix/priam/restore/AbstractRestore.java @@ -220,9 +220,11 @@ public void restore(DateUtil.DateRange dateRange) throws Exception { BackupRestoreUtil.getMostRecentSnapshotPaths( latestValidMetaFile.get(), metaProxy, pathProvider); if (!config.skipIncrementalRestore()) { - allFiles.addAll( - BackupRestoreUtil.getIncrementalPaths( - latestValidMetaFile.get(), dateRange, metaProxy)); + DateUtil.DateRange incrementalDateRange = + new DateUtil.DateRange( + latestValidMetaFile.get().getLastModified(), + dateRange.getEndTime()); + allFiles.addAll(metaProxy.getIncrementals(incrementalDateRange)); } // Download snapshot which is listed in the meta file. diff --git a/priam/src/test/java/com/netflix/priam/backupv2/TestMetaV2Proxy.java b/priam/src/test/java/com/netflix/priam/backupv2/TestMetaV2Proxy.java index 417fc7444..3494a0578 100644 --- a/priam/src/test/java/com/netflix/priam/backupv2/TestMetaV2Proxy.java +++ b/priam/src/test/java/com/netflix/priam/backupv2/TestMetaV2Proxy.java @@ -17,6 +17,7 @@ package com.netflix.priam.backupv2; +import com.google.common.truth.Truth; import com.google.inject.Guice; import com.google.inject.Injector; import com.netflix.priam.backup.AbstractBackupPath; @@ -32,7 +33,6 @@ import java.time.Instant; import java.time.temporal.ChronoUnit; import java.util.ArrayList; -import java.util.Iterator; import java.util.List; import java.util.stream.Collectors; import javax.inject.Provider; @@ -131,13 +131,7 @@ public void testGetSSTFilesFromMeta() throws Exception { @Test public void testGetIncrementalFiles() throws Exception { DateUtil.DateRange dateRange = new DateUtil.DateRange("202812071820,20281229"); - Iterator incrementals = metaProxy.getIncrementals(dateRange); - int i = 0; - while (incrementals.hasNext()) { - System.out.println(incrementals.next()); - i++; - } - Assert.assertEquals(3, i); + 
Truth.assertThat(metaProxy.getIncrementals(dateRange)).hasSize(3); } @Test From 7bb2a38a86db852ea535df97ae31eb67f015a890 Mon Sep 17 00:00:00 2001 From: Matt Lehman Date: Wed, 1 May 2024 13:24:23 -0700 Subject: [PATCH 05/15] Include incremental secondary index files in restore. --- .../netflix/priam/backupv2/MetaV2Proxy.java | 17 ++++++++++---- .../priam/backupv2/TestMetaV2Proxy.java | 23 ++++++++++++++++++- 2 files changed, 34 insertions(+), 6 deletions(-) diff --git a/priam/src/main/java/com/netflix/priam/backupv2/MetaV2Proxy.java b/priam/src/main/java/com/netflix/priam/backupv2/MetaV2Proxy.java index e2101a5bc..1073681c0 100644 --- a/priam/src/main/java/com/netflix/priam/backupv2/MetaV2Proxy.java +++ b/priam/src/main/java/com/netflix/priam/backupv2/MetaV2Proxy.java @@ -84,11 +84,18 @@ private String getMatch( @Override public ImmutableList getIncrementals(DateUtil.DateRange dateRange) { - String incrementalPrefix = getMatch(dateRange, AbstractBackupPath.BackupFileType.SST_V2); - String marker = - getMatch( - new DateUtil.DateRange(dateRange.getStartTime(), null), - AbstractBackupPath.BackupFileType.SST_V2); + return new ImmutableList.Builder() + .addAll(getIncrementals(dateRange, AbstractBackupPath.BackupFileType.SST_V2)) + .addAll( + getIncrementals( + dateRange, AbstractBackupPath.BackupFileType.SECONDARY_INDEX_V2)) + .build(); + } + + private ImmutableList getIncrementals( + DateUtil.DateRange dateRange, AbstractBackupPath.BackupFileType type) { + String incrementalPrefix = getMatch(dateRange, type); + String marker = getMatch(new DateUtil.DateRange(dateRange.getStartTime(), null), type); logger.info( "Listing filesystem with prefix: {}, marker: {}, daterange: {}", incrementalPrefix, diff --git a/priam/src/test/java/com/netflix/priam/backupv2/TestMetaV2Proxy.java b/priam/src/test/java/com/netflix/priam/backupv2/TestMetaV2Proxy.java index 3494a0578..2ac6f5454 100644 --- a/priam/src/test/java/com/netflix/priam/backupv2/TestMetaV2Proxy.java +++ 
b/priam/src/test/java/com/netflix/priam/backupv2/TestMetaV2Proxy.java @@ -17,6 +17,7 @@ package com.netflix.priam.backupv2; +import com.google.common.collect.ImmutableList; import com.google.common.truth.Truth; import com.google.inject.Guice; import com.google.inject.Injector; @@ -131,7 +132,16 @@ public void testGetSSTFilesFromMeta() throws Exception { @Test public void testGetIncrementalFiles() throws Exception { DateUtil.DateRange dateRange = new DateUtil.DateRange("202812071820,20281229"); - Truth.assertThat(metaProxy.getIncrementals(dateRange)).hasSize(3); + ImmutableList paths = metaProxy.getIncrementals(dateRange); + Truth.assertThat(paths).hasSize(4); + } + + @Test + public void testGetIncrementalFilesIncludesSecondaryIndexes() throws Exception { + DateUtil.DateRange dateRange = new DateUtil.DateRange("202812071820,20281229"); + ImmutableList paths = metaProxy.getIncrementals(dateRange); + Truth.assertThat(paths.get(3).getType()) + .isEqualTo(AbstractBackupPath.BackupFileType.SECONDARY_INDEX_V2); } @Test @@ -221,6 +231,17 @@ private List getRemoteFakeFiles() { "SNAPPY", "PLAINTEXT", "file4-Data.db")); + files.add( + Paths.get( + getPrefix(), + AbstractBackupPath.BackupFileType.SECONDARY_INDEX_V2.toString(), + "1859828420000", + "keyspace1", + "columnfamily1", + "index1", + "SNAPPY", + "PLAINTEXT", + "file5-Data.db")); files.add( Paths.get( getPrefix(), From ce6c7c10ef0f195d5e8fc30f17b310446c60ffc7 Mon Sep 17 00:00:00 2001 From: Matt Lehman Date: Sat, 9 Mar 2024 15:46:59 -0800 Subject: [PATCH 06/15] Remove unused code from our restore tooling. 
--- .../priam/restore/AbstractRestore.java | 25 ------------------- .../com/netflix/priam/restore/Restore.java | 4 --- 2 files changed, 29 deletions(-) diff --git a/priam/src/main/java/com/netflix/priam/restore/AbstractRestore.java b/priam/src/main/java/com/netflix/priam/restore/AbstractRestore.java index 1a427ff2c..fdab811ef 100644 --- a/priam/src/main/java/com/netflix/priam/restore/AbstractRestore.java +++ b/priam/src/main/java/com/netflix/priam/restore/AbstractRestore.java @@ -55,8 +55,6 @@ */ public abstract class AbstractRestore extends Task implements IRestoreStrategy { private static final Logger logger = LoggerFactory.getLogger(AbstractRestore.class); - private static final String JOBNAME = "AbstractRestore"; - private static final String SYSTEM_KEYSPACE = "system"; private static BigInteger restoreToken; final IBackupFileSystem fs; final Sleeper sleeper; @@ -75,7 +73,6 @@ public abstract class AbstractRestore extends Task implements IRestoreStrategy { public AbstractRestore( IConfiguration config, IBackupFileSystem fs, - String name, Sleeper sleeper, Provider pathProvider, InstanceIdentity instanceIdentity, @@ -267,26 +264,4 @@ public void restore(DateUtil.DateRange dateRange) throws Exception { * @throws Exception If there is any error in downloading file from the remote file system. 
*/ protected abstract Future downloadFile(final AbstractBackupPath path) throws Exception; - - final class BoundedList extends LinkedList { - - private final int limit; - - BoundedList(int limit) { - this.limit = limit; - } - - @Override - public boolean add(E o) { - super.add(o); - while (size() > limit) { - super.remove(); - } - return true; - } - } - - public final int getDownloadTasksQueued() { - return fs.getDownloadTasksQueued(); - } } diff --git a/priam/src/main/java/com/netflix/priam/restore/Restore.java b/priam/src/main/java/com/netflix/priam/restore/Restore.java index 6dba06149..4c23d2e60 100644 --- a/priam/src/main/java/com/netflix/priam/restore/Restore.java +++ b/priam/src/main/java/com/netflix/priam/restore/Restore.java @@ -30,14 +30,11 @@ import javax.inject.Inject; import javax.inject.Provider; import javax.inject.Singleton; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** Main class for restoring data from backup. Backup restored using this way are not encrypted. */ @Singleton public class Restore extends AbstractRestore { public static final String JOBNAME = "AUTO_RESTORE_JOB"; - private static final Logger logger = LoggerFactory.getLogger(Restore.class); @Inject public Restore( @@ -53,7 +50,6 @@ public Restore( super( config, fs, - JOBNAME, sleeper, pathProvider, instanceIdentity, From 8d18f511814689045e0d0685a4bff8a2356a360b Mon Sep 17 00:00:00 2001 From: Matt Lehman Date: Sat, 9 Mar 2024 20:52:52 -0800 Subject: [PATCH 07/15] Remove waitForCompletion parameter to download method which is always false in practice. Remove final modifier on static method. Make static property an instance variable instead. Use parameterized constructor call in place of two lines. 
--- .../netflix/priam/restore/AbstractRestore.java | 17 +++++------------ 1 file changed, 5 insertions(+), 12 deletions(-) diff --git a/priam/src/main/java/com/netflix/priam/restore/AbstractRestore.java b/priam/src/main/java/com/netflix/priam/restore/AbstractRestore.java index fdab811ef..9b3d89495 100644 --- a/priam/src/main/java/com/netflix/priam/restore/AbstractRestore.java +++ b/priam/src/main/java/com/netflix/priam/restore/AbstractRestore.java @@ -49,13 +49,12 @@ /** * A means to perform a restore. This class contains the following characteristics: - It is agnostic - * to the source type of the restore, this is determine by the injected IBackupFileSystem. - This + * to the source type of the restore, this is determined by the injected IBackupFileSystem. - This * class can be scheduled, i.e. it is a "Task". - When this class is executed, it uses its own * thread pool to execute the restores. */ public abstract class AbstractRestore extends Task implements IRestoreStrategy { private static final Logger logger = LoggerFactory.getLogger(AbstractRestore.class); - private static BigInteger restoreToken; final IBackupFileSystem fs; final Sleeper sleeper; private final BackupRestoreUtil backupRestoreUtil; @@ -94,7 +93,7 @@ public AbstractRestore( this.postRestoreHook = postRestoreHook; } - public static final boolean isRestoreEnabled(IConfiguration conf, InstanceInfo instanceInfo) { + public static boolean isRestoreEnabled(IConfiguration conf, InstanceInfo instanceInfo) { boolean isRestoreMode = StringUtils.isNotBlank(conf.getRestoreSnapshot()); boolean isBackedupRac = (CollectionUtils.isEmpty(conf.getBackupRacs()) @@ -102,8 +101,7 @@ public static final boolean isRestoreEnabled(IConfiguration conf, InstanceInfo i return (isRestoreMode && isBackedupRac); } - private List> download( - Iterator fsIterator, boolean waitForCompletion) throws Exception { + private List> download(Iterator fsIterator) throws Exception { List> futureList = new ArrayList<>(); while 
(fsIterator.hasNext()) { AbstractBackupPath temp = fsIterator.next(); @@ -126,10 +124,6 @@ private List> download( + localFileHandler.getName()); futureList.add(downloadFile(temp)); } - - // Wait for all download to finish that were started from this method. - if (waitForCompletion) waitForCompletion(futureList); - return futureList; } @@ -182,7 +176,7 @@ public void restore(DateUtil.DateRange dateRange) throws Exception { try { if (config.isRestoreClosestToken()) { - restoreToken = + BigInteger restoreToken = tokenSelector.getClosestToken( new BigInteger(origToken), new Date(dateRange.getStartTime().toEpochMilli())); @@ -225,8 +219,7 @@ public void restore(DateUtil.DateRange dateRange) throws Exception { } // Download snapshot which is listed in the meta file. - List> futureList = new ArrayList<>(); - futureList.addAll(download(allFiles.iterator(), false)); + List> futureList = new ArrayList<>(download(allFiles.iterator())); // Wait for all the futures to finish. waitForCompletion(futureList); From 646973d8185fd18393c02948ac5aec5772337894 Mon Sep 17 00:00:00 2001 From: Matt Lehman Date: Sat, 9 Mar 2024 20:57:14 -0800 Subject: [PATCH 08/15] Remove all non-javadoc comments that don't provide warnings to future maintainers. --- .../com/netflix/priam/restore/AbstractRestore.java | 13 +------------ 1 file changed, 1 insertion(+), 12 deletions(-) diff --git a/priam/src/main/java/com/netflix/priam/restore/AbstractRestore.java b/priam/src/main/java/com/netflix/priam/restore/AbstractRestore.java index 9b3d89495..b89ba18ae 100644 --- a/priam/src/main/java/com/netflix/priam/restore/AbstractRestore.java +++ b/priam/src/main/java/com/netflix/priam/restore/AbstractRestore.java @@ -105,8 +105,7 @@ private List> download(Iterator fsIterator) thr List> futureList = new ArrayList<>(); while (fsIterator.hasNext()) { AbstractBackupPath temp = fsIterator.next(); - if (backupRestoreUtil.isFiltered( - temp.getKeyspace(), temp.getColumnFamily())) { // is filtered? 
+ if (backupRestoreUtil.isFiltered(temp.getKeyspace(), temp.getColumnFamily())) { logger.info( "Bypassing restoring file \"{}\" as it is part of the keyspace.columnfamily filter list. Its keyspace:cf is: {}:{}", temp.newRestoreFile(), @@ -155,7 +154,6 @@ public Void retriableCall() throws Exception { } public void restore(DateUtil.DateRange dateRange) throws Exception { - // fail early if post restore hook has invalid parameters if (!postRestoreHook.hasValidParameters()) { throw new PostRestoreHookException("Invalid PostRestoreHook parameters"); } @@ -163,7 +161,6 @@ public void restore(DateUtil.DateRange dateRange) throws Exception { Date endTime = new Date(dateRange.getEndTime().toEpochMilli()); IMetaProxy metaProxy = metaV2Proxy; - // Set the restore status. instanceState.getRestoreStatus().resetStatus(); instanceState .getRestoreStatus() @@ -183,14 +180,11 @@ public void restore(DateUtil.DateRange dateRange) throws Exception { instanceIdentity.getInstance().setToken(restoreToken.toString()); } - // Stop cassandra if its running stopCassProcess(); - // Cleanup local data File dataDir = new File(config.getDataFileLocation()); if (dataDir.exists() && dataDir.isDirectory()) FileUtils.cleanDirectory(dataDir); - // Find latest valid meta file. Optional latestValidMetaFile = BackupRestoreUtil.getLatestValidMetaPath(metaProxy, dateRange); @@ -218,22 +212,17 @@ public void restore(DateUtil.DateRange dateRange) throws Exception { allFiles.addAll(metaProxy.getIncrementals(incrementalDateRange)); } - // Download snapshot which is listed in the meta file. List> futureList = new ArrayList<>(download(allFiles.iterator())); - // Wait for all the futures to finish. waitForCompletion(futureList); - // Given that files are restored now, kick off post restore hook logger.info("Starting post restore hook"); postRestoreHook.execute(); logger.info("Completed executing post restore hook"); - // Declare restore as finished. 
instanceState.getRestoreStatus().setExecutionEndTime(LocalDateTime.now()); instanceState.setRestoreStatus(Status.FINISHED); - // Start cassandra if restore is successful. if (!config.doesCassandraStartManually()) cassProcess.start(true); else logger.info( From 8c23800cc14f204508cfca70f1d5907eedae4606 Mon Sep 17 00:00:00 2001 From: Matt Lehman Date: Sat, 9 Mar 2024 21:06:47 -0800 Subject: [PATCH 09/15] Remove redundant logging. --- .../priam/restore/AbstractRestore.java | 26 ++----------------- 1 file changed, 2 insertions(+), 24 deletions(-) diff --git a/priam/src/main/java/com/netflix/priam/restore/AbstractRestore.java b/priam/src/main/java/com/netflix/priam/restore/AbstractRestore.java index b89ba18ae..5acdcfe98 100644 --- a/priam/src/main/java/com/netflix/priam/restore/AbstractRestore.java +++ b/priam/src/main/java/com/netflix/priam/restore/AbstractRestore.java @@ -106,21 +106,8 @@ private List> download(Iterator fsIterator) thr while (fsIterator.hasNext()) { AbstractBackupPath temp = fsIterator.next(); if (backupRestoreUtil.isFiltered(temp.getKeyspace(), temp.getColumnFamily())) { - logger.info( - "Bypassing restoring file \"{}\" as it is part of the keyspace.columnfamily filter list. 
Its keyspace:cf is: {}:{}", - temp.newRestoreFile(), - temp.getKeyspace(), - temp.getColumnFamily()); continue; } - - File localFileHandler = temp.newRestoreFile(); - if (logger.isDebugEnabled()) - logger.debug( - "Created local file name: " - + localFileHandler.getAbsolutePath() - + File.pathSeparator - + localFileHandler.getName()); futureList.add(downloadFile(temp)); } return futureList; @@ -142,15 +129,14 @@ public void execute() throws Exception { final DateUtil.DateRange dateRange = new DateUtil.DateRange(config.getRestoreSnapshot()); new RetryableCallable() { public Void retriableCall() throws Exception { - logger.info("Attempting restore"); restore(dateRange); - logger.info("Restore completed"); // Wait for other server init to complete sleeper.sleep(30000); return null; } }.call(); + logger.info("Restore complete."); } public void restore(DateUtil.DateRange dateRange) throws Exception { @@ -189,14 +175,12 @@ public void restore(DateUtil.DateRange dateRange) throws Exception { BackupRestoreUtil.getLatestValidMetaPath(metaProxy, dateRange); if (!latestValidMetaFile.isPresent()) { - logger.info("No valid snapshot meta file found, Restore Failed."); instanceState.getRestoreStatus().setExecutionEndTime(LocalDateTime.now()); instanceState.setRestoreStatus(Status.FAILED); return; } - logger.info( - "Snapshot Meta file for restore {}", latestValidMetaFile.get().getRemotePath()); + logger.info("Meta file for restore {}", latestValidMetaFile.get().getRemotePath()); instanceState .getRestoreStatus() .setSnapshotMetaFile(latestValidMetaFile.get().getRemotePath()); @@ -216,21 +200,15 @@ public void restore(DateUtil.DateRange dateRange) throws Exception { waitForCompletion(futureList); - logger.info("Starting post restore hook"); postRestoreHook.execute(); - logger.info("Completed executing post restore hook"); instanceState.getRestoreStatus().setExecutionEndTime(LocalDateTime.now()); instanceState.setRestoreStatus(Status.FINISHED); if 
(!config.doesCassandraStartManually()) cassProcess.start(true); - else - logger.info( - "config.doesCassandraStartManually() is set to True, hence Cassandra needs to be started manually ..."); } catch (Exception e) { instanceState.setRestoreStatus(Status.FAILED); instanceState.getRestoreStatus().setExecutionEndTime(LocalDateTime.now()); - logger.error("Error while trying to restore: {}", e.getMessage(), e); throw e; } finally { instanceIdentity.getInstance().setToken(origToken); From aa3dffe762a38dc0d0686ff6ddaa66fd0f2d006c Mon Sep 17 00:00:00 2001 From: Matt Lehman Date: Sat, 9 Mar 2024 21:09:07 -0800 Subject: [PATCH 10/15] Remove redundant vertical whitespace. --- .../netflix/priam/restore/AbstractRestore.java | 15 --------------- 1 file changed, 15 deletions(-) diff --git a/priam/src/main/java/com/netflix/priam/restore/AbstractRestore.java b/priam/src/main/java/com/netflix/priam/restore/AbstractRestore.java index 5acdcfe98..df049e458 100644 --- a/priam/src/main/java/com/netflix/priam/restore/AbstractRestore.java +++ b/priam/src/main/java/com/netflix/priam/restore/AbstractRestore.java @@ -130,7 +130,6 @@ public void execute() throws Exception { new RetryableCallable() { public Void retriableCall() throws Exception { restore(dateRange); - // Wait for other server init to complete sleeper.sleep(30000); return null; @@ -143,10 +142,8 @@ public void restore(DateUtil.DateRange dateRange) throws Exception { if (!postRestoreHook.hasValidParameters()) { throw new PostRestoreHookException("Invalid PostRestoreHook parameters"); } - Date endTime = new Date(dateRange.getEndTime().toEpochMilli()); IMetaProxy metaProxy = metaV2Proxy; - instanceState.getRestoreStatus().resetStatus(); instanceState .getRestoreStatus() @@ -156,7 +153,6 @@ public void restore(DateUtil.DateRange dateRange) throws Exception { instanceState.getRestoreStatus().setExecutionStartTime(LocalDateTime.now()); instanceState.setRestoreStatus(Status.STARTED); String origToken = 
instanceIdentity.getInstance().getToken(); - try { if (config.isRestoreClosestToken()) { BigInteger restoreToken = @@ -165,26 +161,20 @@ public void restore(DateUtil.DateRange dateRange) throws Exception { new Date(dateRange.getStartTime().toEpochMilli())); instanceIdentity.getInstance().setToken(restoreToken.toString()); } - stopCassProcess(); - File dataDir = new File(config.getDataFileLocation()); if (dataDir.exists() && dataDir.isDirectory()) FileUtils.cleanDirectory(dataDir); - Optional latestValidMetaFile = BackupRestoreUtil.getLatestValidMetaPath(metaProxy, dateRange); - if (!latestValidMetaFile.isPresent()) { instanceState.getRestoreStatus().setExecutionEndTime(LocalDateTime.now()); instanceState.setRestoreStatus(Status.FAILED); return; } - logger.info("Meta file for restore {}", latestValidMetaFile.get().getRemotePath()); instanceState .getRestoreStatus() .setSnapshotMetaFile(latestValidMetaFile.get().getRemotePath()); - List allFiles = BackupRestoreUtil.getMostRecentSnapshotPaths( latestValidMetaFile.get(), metaProxy, pathProvider); @@ -195,16 +185,11 @@ public void restore(DateUtil.DateRange dateRange) throws Exception { dateRange.getEndTime()); allFiles.addAll(metaProxy.getIncrementals(incrementalDateRange)); } - List> futureList = new ArrayList<>(download(allFiles.iterator())); - waitForCompletion(futureList); - postRestoreHook.execute(); - instanceState.getRestoreStatus().setExecutionEndTime(LocalDateTime.now()); instanceState.setRestoreStatus(Status.FINISHED); - if (!config.doesCassandraStartManually()) cassProcess.start(true); } catch (Exception e) { instanceState.setRestoreStatus(Status.FAILED); From 4d6d07f332e4307253fa3d1438bb271c27311253 Mon Sep 17 00:00:00 2001 From: Matt Lehman Date: Sat, 9 Mar 2024 21:45:29 -0800 Subject: [PATCH 11/15] Remove backup racs as that configuration is not used in practice and is not relevant to whether a restore should be attempted. 
--- .../com/netflix/priam/config/IConfiguration.java | 5 ----- .../com/netflix/priam/config/PriamConfiguration.java | 5 ----- .../com/netflix/priam/restore/AbstractRestore.java | 12 +++--------- .../java/com/netflix/priam/tuner/StandardTuner.java | 2 +- 4 files changed, 4 insertions(+), 20 deletions(-) diff --git a/priam/src/main/java/com/netflix/priam/config/IConfiguration.java b/priam/src/main/java/com/netflix/priam/config/IConfiguration.java index 6fc656caa..7c611f0fe 100644 --- a/priam/src/main/java/com/netflix/priam/config/IConfiguration.java +++ b/priam/src/main/java/com/netflix/priam/config/IConfiguration.java @@ -125,11 +125,6 @@ default int getBackupRetentionDays() { return 0; } - /** @return Get list of racs to backup. Backup all racs if empty */ - default List getBackupRacs() { - return Collections.EMPTY_LIST; - } - /** * Backup location i.e. remote file system to upload backups. e.g. for S3 it will be s3 bucket * name diff --git a/priam/src/main/java/com/netflix/priam/config/PriamConfiguration.java b/priam/src/main/java/com/netflix/priam/config/PriamConfiguration.java index 15fb15c69..d565eb1f2 100644 --- a/priam/src/main/java/com/netflix/priam/config/PriamConfiguration.java +++ b/priam/src/main/java/com/netflix/priam/config/PriamConfiguration.java @@ -92,11 +92,6 @@ public int getBackupRetentionDays() { return config.get(PRIAM_PRE + ".backup.retention", 0); } - @Override - public List getBackupRacs() { - return config.getList(PRIAM_PRE + ".backup.racs"); - } - @Override public String getRestorePrefix() { return config.get(PRIAM_PRE + ".restore.prefix"); diff --git a/priam/src/main/java/com/netflix/priam/restore/AbstractRestore.java b/priam/src/main/java/com/netflix/priam/restore/AbstractRestore.java index df049e458..06406b982 100644 --- a/priam/src/main/java/com/netflix/priam/restore/AbstractRestore.java +++ b/priam/src/main/java/com/netflix/priam/restore/AbstractRestore.java @@ -25,7 +25,6 @@ import com.netflix.priam.defaultimpl.ICassandraProcess; 
import com.netflix.priam.health.InstanceState; import com.netflix.priam.identity.InstanceIdentity; -import com.netflix.priam.identity.config.InstanceInfo; import com.netflix.priam.scheduler.Task; import com.netflix.priam.utils.DateUtil; import com.netflix.priam.utils.RetryableCallable; @@ -41,7 +40,6 @@ import javax.inject.Inject; import javax.inject.Named; import javax.inject.Provider; -import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.io.FileUtils; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; @@ -93,12 +91,8 @@ public AbstractRestore( this.postRestoreHook = postRestoreHook; } - public static boolean isRestoreEnabled(IConfiguration conf, InstanceInfo instanceInfo) { - boolean isRestoreMode = StringUtils.isNotBlank(conf.getRestoreSnapshot()); - boolean isBackedupRac = - (CollectionUtils.isEmpty(conf.getBackupRacs()) - || conf.getBackupRacs().contains(instanceInfo.getRac())); - return (isRestoreMode && isBackedupRac); + public static boolean isRestoreEnabled(IConfiguration conf) { + return StringUtils.isNotBlank(conf.getRestoreSnapshot()); } private List> download(Iterator fsIterator) throws Exception { @@ -123,7 +117,7 @@ private void stopCassProcess() throws IOException { @Override public void execute() throws Exception { - if (!isRestoreEnabled(config, instanceIdentity.getInstanceInfo())) return; + if (!isRestoreEnabled(config)) return; logger.info("Starting restore for {}", config.getRestoreSnapshot()); final DateUtil.DateRange dateRange = new DateUtil.DateRange(config.getRestoreSnapshot()); diff --git a/priam/src/main/java/com/netflix/priam/tuner/StandardTuner.java b/priam/src/main/java/com/netflix/priam/tuner/StandardTuner.java index dc4500ac9..777331fc7 100644 --- a/priam/src/main/java/com/netflix/priam/tuner/StandardTuner.java +++ b/priam/src/main/java/com/netflix/priam/tuner/StandardTuner.java @@ -67,7 +67,7 @@ public void writeAllProperties(String yamlLocation, String hostname, String seed 
map.put("listen_address", hostname); map.put("rpc_address", hostname); // Dont bootstrap in restore mode - if (!Restore.isRestoreEnabled(config, instanceInfo)) { + if (!Restore.isRestoreEnabled(config)) { map.put("auto_bootstrap", config.getAutoBoostrap()); } else { map.put("auto_bootstrap", false); From f10289fd58779b7b6b0b738cfe1f0a420dd47b8a Mon Sep 17 00:00:00 2001 From: Matt Lehman Date: Wed, 10 Apr 2024 15:16:49 -0700 Subject: [PATCH 12/15] Consolidate restore status API. --- .../netflix/priam/backupv2/BackupTTLTask.java | 4 +--- .../backupv2/BackupVerificationTask.java | 4 +--- .../netflix/priam/health/InstanceState.java | 24 ++++++++++++++++++- .../priam/restore/AbstractRestore.java | 24 ++++--------------- .../priam/backupv2/TestBackupTTLTask.java | 5 ++-- .../backupv2/TestBackupVerificationTask.java | 5 ++-- 6 files changed, 36 insertions(+), 30 deletions(-) diff --git a/priam/src/main/java/com/netflix/priam/backupv2/BackupTTLTask.java b/priam/src/main/java/com/netflix/priam/backupv2/BackupTTLTask.java index 28282e789..8479120ba 100644 --- a/priam/src/main/java/com/netflix/priam/backupv2/BackupTTLTask.java +++ b/priam/src/main/java/com/netflix/priam/backupv2/BackupTTLTask.java @@ -96,9 +96,7 @@ public BackupTTLTask( @Override public void execute() throws Exception { - if (instanceState.getRestoreStatus() != null - && instanceState.getRestoreStatus().getStatus() != null - && instanceState.getRestoreStatus().getStatus() == Status.STARTED) { + if (instanceState.isRestoring()) { logger.info("Not executing the TTL Task for backups as Priam is in restore mode."); return; } diff --git a/priam/src/main/java/com/netflix/priam/backupv2/BackupVerificationTask.java b/priam/src/main/java/com/netflix/priam/backupv2/BackupVerificationTask.java index 56df6ec43..92e5b0f72 100644 --- a/priam/src/main/java/com/netflix/priam/backupv2/BackupVerificationTask.java +++ b/priam/src/main/java/com/netflix/priam/backupv2/BackupVerificationTask.java @@ -74,9 +74,7 @@ public void 
execute() throws Exception { return; } - if (instanceState.getRestoreStatus() != null - && instanceState.getRestoreStatus().getStatus() != null - && instanceState.getRestoreStatus().getStatus() == Status.STARTED) { + if (instanceState.isRestoring()) { logger.info("Skipping backup verification. Priam is in restore mode."); return; } diff --git a/priam/src/main/java/com/netflix/priam/health/InstanceState.java b/priam/src/main/java/com/netflix/priam/health/InstanceState.java index 6336e1ca6..1d7716ea5 100644 --- a/priam/src/main/java/com/netflix/priam/health/InstanceState.java +++ b/priam/src/main/java/com/netflix/priam/health/InstanceState.java @@ -18,8 +18,11 @@ import com.netflix.priam.backup.BackupMetadata; import com.netflix.priam.backup.Status; +import com.netflix.priam.utils.DateUtil; import com.netflix.priam.utils.GsonJsonSerializer; import java.time.LocalDateTime; +import java.time.ZoneId; +import java.util.Date; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import javax.inject.Inject; @@ -143,12 +146,31 @@ public boolean isHealthy() { return isHealthy.get(); } - private boolean isRestoring() { + public boolean isRestoring() { return restoreStatus != null && restoreStatus.getStatus() != null && restoreStatus.getStatus() == Status.STARTED; } + public void startRestore(DateUtil.DateRange dateRange) { + restoreStatus.resetStatus(); + restoreStatus.setStartDateRange( + LocalDateTime.ofInstant(dateRange.getStartTime(), ZoneId.of("UTC"))); + Date endTime = new Date(dateRange.getEndTime().toEpochMilli()); + restoreStatus.setEndDateRange(DateUtil.convert(endTime)); + restoreStatus.setExecutionStartTime(LocalDateTime.now()); + setRestoreStatus(Status.STARTED); + } + + public void endRestore(Status status, LocalDateTime endTime) { + restoreStatus.setExecutionEndTime(endTime); + setRestoreStatus(status); + } + + public void setRestoreMetaFile(String path) { + restoreStatus.setSnapshotMetaFile(path); + } + private void 
setHealthy() { this.isHealthy.set( isRestoring() diff --git a/priam/src/main/java/com/netflix/priam/restore/AbstractRestore.java b/priam/src/main/java/com/netflix/priam/restore/AbstractRestore.java index 06406b982..8d7a494b1 100644 --- a/priam/src/main/java/com/netflix/priam/restore/AbstractRestore.java +++ b/priam/src/main/java/com/netflix/priam/restore/AbstractRestore.java @@ -34,7 +34,6 @@ import java.math.BigInteger; import java.nio.file.Path; import java.time.LocalDateTime; -import java.time.ZoneId; import java.util.*; import java.util.concurrent.Future; import javax.inject.Inject; @@ -136,16 +135,8 @@ public void restore(DateUtil.DateRange dateRange) throws Exception { if (!postRestoreHook.hasValidParameters()) { throw new PostRestoreHookException("Invalid PostRestoreHook parameters"); } - Date endTime = new Date(dateRange.getEndTime().toEpochMilli()); IMetaProxy metaProxy = metaV2Proxy; - instanceState.getRestoreStatus().resetStatus(); - instanceState - .getRestoreStatus() - .setStartDateRange( - LocalDateTime.ofInstant(dateRange.getStartTime(), ZoneId.of("UTC"))); - instanceState.getRestoreStatus().setEndDateRange(DateUtil.convert(endTime)); - instanceState.getRestoreStatus().setExecutionStartTime(LocalDateTime.now()); - instanceState.setRestoreStatus(Status.STARTED); + instanceState.startRestore(dateRange); String origToken = instanceIdentity.getInstance().getToken(); try { if (config.isRestoreClosestToken()) { @@ -161,14 +152,11 @@ public void restore(DateUtil.DateRange dateRange) throws Exception { Optional latestValidMetaFile = BackupRestoreUtil.getLatestValidMetaPath(metaProxy, dateRange); if (!latestValidMetaFile.isPresent()) { - instanceState.getRestoreStatus().setExecutionEndTime(LocalDateTime.now()); - instanceState.setRestoreStatus(Status.FAILED); + instanceState.endRestore(Status.FAILED, LocalDateTime.now()); return; } logger.info("Meta file for restore {}", latestValidMetaFile.get().getRemotePath()); - instanceState - .getRestoreStatus() - 
.setSnapshotMetaFile(latestValidMetaFile.get().getRemotePath()); + instanceState.setRestoreMetaFile(latestValidMetaFile.get().getRemotePath()); List allFiles = BackupRestoreUtil.getMostRecentSnapshotPaths( latestValidMetaFile.get(), metaProxy, pathProvider); @@ -182,12 +170,10 @@ public void restore(DateUtil.DateRange dateRange) throws Exception { List> futureList = new ArrayList<>(download(allFiles.iterator())); waitForCompletion(futureList); postRestoreHook.execute(); - instanceState.getRestoreStatus().setExecutionEndTime(LocalDateTime.now()); - instanceState.setRestoreStatus(Status.FINISHED); + instanceState.endRestore(Status.FINISHED, LocalDateTime.now()); if (!config.doesCassandraStartManually()) cassProcess.start(true); } catch (Exception e) { - instanceState.setRestoreStatus(Status.FAILED); - instanceState.getRestoreStatus().setExecutionEndTime(LocalDateTime.now()); + instanceState.endRestore(Status.FAILED, LocalDateTime.now()); throw e; } finally { instanceIdentity.getInstance().setToken(origToken); diff --git a/priam/src/test/java/com/netflix/priam/backupv2/TestBackupTTLTask.java b/priam/src/test/java/com/netflix/priam/backupv2/TestBackupTTLTask.java index f99bbce84..0eeb7e954 100644 --- a/priam/src/test/java/com/netflix/priam/backupv2/TestBackupTTLTask.java +++ b/priam/src/test/java/com/netflix/priam/backupv2/TestBackupTTLTask.java @@ -197,10 +197,11 @@ public void testTTLNext() throws Exception { } @Test - public void testRestoreMode(@Mocked InstanceState state) throws Exception { + public void testRestoreMode(@Mocked InstanceState.RestoreStatus restoreStatus) + throws Exception { new Expectations() { { - state.getRestoreStatus().getStatus(); + restoreStatus.getStatus(); result = Status.STARTED; } }; diff --git a/priam/src/test/java/com/netflix/priam/backupv2/TestBackupVerificationTask.java b/priam/src/test/java/com/netflix/priam/backupv2/TestBackupVerificationTask.java index ba867eb50..f93ab2e76 100644 --- 
a/priam/src/test/java/com/netflix/priam/backupv2/TestBackupVerificationTask.java +++ b/priam/src/test/java/com/netflix/priam/backupv2/TestBackupVerificationTask.java @@ -151,10 +151,11 @@ public void noBackups() throws Exception { } @Test - public void testRestoreMode(@Mocked InstanceState state) throws Exception { + public void testRestoreMode(@Mocked InstanceState.RestoreStatus restoreStatus) + throws Exception { new Expectations() { { - state.getRestoreStatus().getStatus(); + restoreStatus.getStatus(); result = Status.STARTED; } }; From 6b3d45a86a0217d748a4e079418b75d0fc635e36 Mon Sep 17 00:00:00 2001 From: Matt Lehman Date: Wed, 10 Apr 2024 15:19:32 -0700 Subject: [PATCH 13/15] Cease wiping the data directory in advance of restore. It is counter-intuitive and will be empty in practice in most cases anyway. --- .../main/java/com/netflix/priam/restore/AbstractRestore.java | 4 ---- 1 file changed, 4 deletions(-) diff --git a/priam/src/main/java/com/netflix/priam/restore/AbstractRestore.java b/priam/src/main/java/com/netflix/priam/restore/AbstractRestore.java index 8d7a494b1..5d1ecc0c5 100644 --- a/priam/src/main/java/com/netflix/priam/restore/AbstractRestore.java +++ b/priam/src/main/java/com/netflix/priam/restore/AbstractRestore.java @@ -29,7 +29,6 @@ import com.netflix.priam.utils.DateUtil; import com.netflix.priam.utils.RetryableCallable; import com.netflix.priam.utils.Sleeper; -import java.io.File; import java.io.IOException; import java.math.BigInteger; import java.nio.file.Path; @@ -39,7 +38,6 @@ import javax.inject.Inject; import javax.inject.Named; import javax.inject.Provider; -import org.apache.commons.io.FileUtils; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -147,8 +145,6 @@ public void restore(DateUtil.DateRange dateRange) throws Exception { instanceIdentity.getInstance().setToken(restoreToken.toString()); } stopCassProcess(); - File dataDir = new File(config.getDataFileLocation()); - if 
(dataDir.exists() && dataDir.isDirectory()) FileUtils.cleanDirectory(dataDir); Optional latestValidMetaFile = BackupRestoreUtil.getLatestValidMetaPath(metaProxy, dateRange); if (!latestValidMetaFile.isPresent()) { From 4ece8d3a0de7fa11fc407d89f6e9496ae0482950 Mon Sep 17 00:00:00 2001 From: Matt Lehman Date: Wed, 1 May 2024 17:48:57 -0700 Subject: [PATCH 14/15] Move getBackupDirectories to BackupRestoreUtil and change its signature to cease requiring an IConfiguration but rather just take the data directory path as a String. --- .../netflix/priam/backup/AbstractBackup.java | 40 ------------------- .../priam/backup/BackupRestoreUtil.java | 36 +++++++++++++++++ .../priam/backup/IncrementalBackup.java | 3 +- .../priam/backupv2/SnapshotMetaTask.java | 4 +- .../priam/backupv2/TestBackupV2Service.java | 4 +- 5 files changed, 44 insertions(+), 43 deletions(-) diff --git a/priam/src/main/java/com/netflix/priam/backup/AbstractBackup.java b/priam/src/main/java/com/netflix/priam/backup/AbstractBackup.java index 8024b124d..78e5edb45 100644 --- a/priam/src/main/java/com/netflix/priam/backup/AbstractBackup.java +++ b/priam/src/main/java/com/netflix/priam/backup/AbstractBackup.java @@ -21,13 +21,7 @@ import com.netflix.priam.utils.SystemUtils; import java.io.File; import java.io.FileFilter; -import java.nio.file.DirectoryStream; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.util.HashSet; import java.util.Optional; -import java.util.Set; import javax.inject.Inject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -95,40 +89,6 @@ protected String getKeyspace(File backupDir) { */ protected abstract void processColumnFamily(File backupDir) throws Exception; - /** - * Get all the backup directories for Cassandra. - * - * @param config to get the location of the data folder. - * @param monitoringFolder folder where cassandra backup's are configured. 
- * @return Set of the path(s) containing the backup folder for each columnfamily. - * @throws Exception incase of IOException. - */ - public static Set<Path> getBackupDirectories(IConfiguration config, String monitoringFolder) - throws Exception { - HashSet<Path> backupPaths = new HashSet<>(); - if (config.getDataFileLocation() == null) return backupPaths; - Path dataPath = Paths.get(config.getDataFileLocation()); - if (Files.exists(dataPath) && Files.isDirectory(dataPath)) - try (DirectoryStream<Path> directoryStream = - Files.newDirectoryStream(dataPath, path -> Files.isDirectory(path))) { - for (Path keyspaceDirPath : directoryStream) { - try (DirectoryStream<Path> keyspaceStream = - Files.newDirectoryStream( - keyspaceDirPath, path -> Files.isDirectory(path))) { - for (Path columnfamilyDirPath : keyspaceStream) { - Path backupDirPath = - Paths.get(columnfamilyDirPath.toString(), monitoringFolder); - if (Files.exists(backupDirPath) && Files.isDirectory(backupDirPath)) { - logger.debug("Backup folder: {}", backupDirPath); - backupPaths.add(backupDirPath); - } - } - } - } - } - return backupPaths; - } - protected static File[] getSecondaryIndexDirectories(File backupDir) { FileFilter filter = (file) -> file.getName().startsWith(".") && isAReadableDirectory(file); return Optional.ofNullable(backupDir.listFiles(filter)).orElse(new File[] {}); diff --git a/priam/src/main/java/com/netflix/priam/backup/BackupRestoreUtil.java b/priam/src/main/java/com/netflix/priam/backup/BackupRestoreUtil.java index e165f6bc2..635d91b26 100644 --- a/priam/src/main/java/com/netflix/priam/backup/BackupRestoreUtil.java +++ b/priam/src/main/java/com/netflix/priam/backup/BackupRestoreUtil.java @@ -18,9 +18,13 @@ package com.netflix.priam.backup; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; import com.netflix.priam.backupv2.IMetaProxy; import com.netflix.priam.utils.DateUtil; +import java.nio.file.DirectoryStream; +import java.nio.file.Files; import
java.nio.file.Path; +import java.nio.file.Paths; import java.util.*; import java.util.regex.Pattern; import java.util.stream.Collectors; @@ -130,4 +134,36 @@ public final boolean isFiltered(String keyspace, String columnFamilyDir) { || includeFilter.get(keyspace).contains(columnFamilyName))); return false; } + + /** + * Get all the backup directories for Cassandra. + * + * @param dataDirectory the location of the data folder. + * @param monitoringFolder folder where cassandra backup's are configured. + * @return Set of the path(s) containing the backup folder for each columnfamily. + * @throws Exception incase of IOException. + */ + public static ImmutableSet<Path> getBackupDirectories( + String dataDirectory, String monitoringFolder) throws Exception { + ImmutableSet.Builder<Path> backupPaths = ImmutableSet.builder(); + Path dataPath = Paths.get(dataDirectory); + if (Files.exists(dataPath) && Files.isDirectory(dataPath)) + try (DirectoryStream<Path> directoryStream = + Files.newDirectoryStream(dataPath, path -> Files.isDirectory(path))) { + for (Path keyspaceDirPath : directoryStream) { + try (DirectoryStream<Path> keyspaceStream = + Files.newDirectoryStream( + keyspaceDirPath, path -> Files.isDirectory(path))) { + for (Path columnfamilyDirPath : keyspaceStream) { + Path backupDirPath = + Paths.get(columnfamilyDirPath.toString(), monitoringFolder); + if (Files.exists(backupDirPath) && Files.isDirectory(backupDirPath)) { + backupPaths.add(backupDirPath); + } + } + } + } + } + return backupPaths.build(); + } } diff --git a/priam/src/main/java/com/netflix/priam/backup/IncrementalBackup.java b/priam/src/main/java/com/netflix/priam/backup/IncrementalBackup.java index c757db36e..37afc9376 100644 --- a/priam/src/main/java/com/netflix/priam/backup/IncrementalBackup.java +++ b/priam/src/main/java/com/netflix/priam/backup/IncrementalBackup.java @@ -77,7 +77,8 @@ public static TaskTimer getTimer( private static void cleanOldBackups(IConfiguration configuration) throws Exception { Set<Path> backupPaths =
- AbstractBackup.getBackupDirectories(configuration, INCREMENTAL_BACKUP_FOLDER); + BackupRestoreUtil.getBackupDirectories( + configuration.getDataFileLocation(), INCREMENTAL_BACKUP_FOLDER); for (Path backupDirPath : backupPaths) { FileUtils.cleanDirectory(backupDirPath.toFile()); } diff --git a/priam/src/main/java/com/netflix/priam/backupv2/SnapshotMetaTask.java b/priam/src/main/java/com/netflix/priam/backupv2/SnapshotMetaTask.java index bc05c1874..639c85b86 100644 --- a/priam/src/main/java/com/netflix/priam/backupv2/SnapshotMetaTask.java +++ b/priam/src/main/java/com/netflix/priam/backupv2/SnapshotMetaTask.java @@ -146,7 +146,9 @@ public static TaskTimer getTimer(IBackupRestoreConfig config) throws IllegalArgu static void cleanOldBackups(IConfiguration config) throws Exception { // Clean up all the backup directories, if any. - Set<Path> backupPaths = AbstractBackup.getBackupDirectories(config, SNAPSHOT_FOLDER); + Set<Path> backupPaths = + BackupRestoreUtil.getBackupDirectories( + config.getDataFileLocation(), SNAPSHOT_FOLDER); for (Path backupDirPath : backupPaths) try (DirectoryStream<Path> directoryStream = Files.newDirectoryStream(backupDirPath, Files::isDirectory)) { diff --git a/priam/src/test/java/com/netflix/priam/backupv2/TestBackupV2Service.java b/priam/src/test/java/com/netflix/priam/backupv2/TestBackupV2Service.java index e017ef295..26f3409f9 100644 --- a/priam/src/test/java/com/netflix/priam/backupv2/TestBackupV2Service.java +++ b/priam/src/test/java/com/netflix/priam/backupv2/TestBackupV2Service.java @@ -22,6 +22,7 @@ import com.google.inject.Injector; import com.netflix.priam.backup.AbstractBackup; import com.netflix.priam.backup.BRTestModule; +import com.netflix.priam.backup.BackupRestoreUtil; import com.netflix.priam.config.IBackupRestoreConfig; import com.netflix.priam.config.IConfiguration; import com.netflix.priam.connection.JMXNodeTool; @@ -116,7 +117,8 @@ public void testBackupDisabled( // snapshot V2 name should not be there.
Set<Path> backupPaths = - AbstractBackup.getBackupDirectories(configuration, AbstractBackup.SNAPSHOT_FOLDER); + BackupRestoreUtil.getBackupDirectories( + configuration.getDataFileLocation(), AbstractBackup.SNAPSHOT_FOLDER); for (Path backupPath : backupPaths) { Assert.assertFalse(Files.exists(Paths.get(backupPath.toString(), snapshotName))); Assert.assertTrue(Files.exists(Paths.get(backupPath.toString(), snapshotV1Name))); From 955727593c900edf1a4edddb03859b48c1d56c29 Mon Sep 17 00:00:00 2001 From: Matt Lehman Date: Thu, 2 May 2024 09:29:57 -0700 Subject: [PATCH 15/15] Cease stopping Cassandra in the restore process and reveal the ability to restore to a staging area and then import or move the data depending on whether Cassandra is running. --- .../priam/backup/AbstractBackupPath.java | 9 +- .../priam/backup/BackupRestoreUtil.java | 10 +- .../netflix/priam/config/IConfiguration.java | 4 + .../priam/config/PriamConfiguration.java | 5 + .../priam/connection/CassandraOperations.java | 61 ++++++++ .../connection/ICassandraOperations.java | 6 + .../priam/health/CassandraMonitor.java | 4 +- .../priam/restore/AbstractRestore.java | 29 ++-- .../com/netflix/priam/restore/Restore.java | 7 +- .../cluser/management/TestCompaction.groovy | 2 +- .../netflix/priam/backup/BRTestModule.java | 3 + .../priam/backupv2/TestBackupV2Service.java | 2 + .../priam/config/FakeConfiguration.java | 13 ++ .../connection/TestCassandraOperations.java | 136 +++++++++++++++++- .../priam/health/TestCassandraMonitor.java | 2 +- 15 files changed, 261 insertions(+), 32 deletions(-) diff --git a/priam/src/main/java/com/netflix/priam/backup/AbstractBackupPath.java b/priam/src/main/java/com/netflix/priam/backup/AbstractBackupPath.java index 4ddbd22c5..579bced47 100644 --- a/priam/src/main/java/com/netflix/priam/backup/AbstractBackupPath.java +++ b/priam/src/main/java/com/netflix/priam/backup/AbstractBackupPath.java @@ -146,12 +146,13 @@ protected String match(Date start, Date end) { /** Local restore file */
public File newRestoreFile() { File return_; - String dataDir = config.getDataFileLocation(); + String dataDir = config.getRestoreDataLocation(); switch (type) { case SECONDARY_INDEX_V2: - String restoreFileName = - PATH_JOINER.join(dataDir, keyspace, columnFamily, indexDir, fileName); - return_ = new File(restoreFileName); + return_ = + new File( + PATH_JOINER.join( + dataDir, keyspace, columnFamily, indexDir, fileName)); break; case META_V2: return_ = new File(PATH_JOINER.join(config.getDataFileLocation(), fileName)); diff --git a/priam/src/main/java/com/netflix/priam/backup/BackupRestoreUtil.java b/priam/src/main/java/com/netflix/priam/backup/BackupRestoreUtil.java index 635d91b26..1e17cb80f 100644 --- a/priam/src/main/java/com/netflix/priam/backup/BackupRestoreUtil.java +++ b/priam/src/main/java/com/netflix/priam/backup/BackupRestoreUtil.java @@ -21,6 +21,7 @@ import com.google.common.collect.ImmutableSet; import com.netflix.priam.backupv2.IMetaProxy; import com.netflix.priam.utils.DateUtil; +import java.io.IOException; import java.nio.file.DirectoryStream; import java.nio.file.Files; import java.nio.file.Path; @@ -141,19 +142,18 @@ public final boolean isFiltered(String keyspace, String columnFamilyDir) { * @param dataDirectory the location of the data folder. * @param monitoringFolder folder where cassandra backup's are configured. * @return Set of the path(s) containing the backup folder for each columnfamily. - * @throws Exception incase of IOException. 
+ * @throws IOException */ public static ImmutableSet getBackupDirectories( - String dataDirectory, String monitoringFolder) throws Exception { + String dataDirectory, String monitoringFolder) throws IOException { ImmutableSet.Builder backupPaths = ImmutableSet.builder(); Path dataPath = Paths.get(dataDirectory); if (Files.exists(dataPath) && Files.isDirectory(dataPath)) try (DirectoryStream directoryStream = - Files.newDirectoryStream(dataPath, path -> Files.isDirectory(path))) { + Files.newDirectoryStream(dataPath, Files::isDirectory)) { for (Path keyspaceDirPath : directoryStream) { try (DirectoryStream keyspaceStream = - Files.newDirectoryStream( - keyspaceDirPath, path -> Files.isDirectory(path))) { + Files.newDirectoryStream(keyspaceDirPath, Files::isDirectory)) { for (Path columnfamilyDirPath : keyspaceStream) { Path backupDirPath = Paths.get(columnfamilyDirPath.toString(), monitoringFolder); diff --git a/priam/src/main/java/com/netflix/priam/config/IConfiguration.java b/priam/src/main/java/com/netflix/priam/config/IConfiguration.java index 7c611f0fe..c9f17eb06 100644 --- a/priam/src/main/java/com/netflix/priam/config/IConfiguration.java +++ b/priam/src/main/java/com/netflix/priam/config/IConfiguration.java @@ -398,6 +398,10 @@ default String getRestoreSnapshot() { return StringUtils.EMPTY; } + default String getRestoreDataLocation() { + return getCassandraBaseDirectory() + "/restore"; + } + /** @return Get the region to connect to SDB for instance identity */ default String getSDBInstanceIdentityRegion() { return "us-east-1"; diff --git a/priam/src/main/java/com/netflix/priam/config/PriamConfiguration.java b/priam/src/main/java/com/netflix/priam/config/PriamConfiguration.java index d565eb1f2..cfd3eae39 100644 --- a/priam/src/main/java/com/netflix/priam/config/PriamConfiguration.java +++ b/priam/src/main/java/com/netflix/priam/config/PriamConfiguration.java @@ -278,6 +278,11 @@ public String getRestoreSnapshot() { return config.get(PRIAM_PRE + 
".restore.snapshot", ""); } + @Override + public String getRestoreDataLocation() { + return config.get(PRIAM_PRE + ".restore.data.location"); + } + @Override public boolean isRestoreEncrypted() { return config.get(PRIAM_PRE + ".encrypted.restore.enabled", false); diff --git a/priam/src/main/java/com/netflix/priam/connection/CassandraOperations.java b/priam/src/main/java/com/netflix/priam/connection/CassandraOperations.java index 299610e27..dbdb02653 100644 --- a/priam/src/main/java/com/netflix/priam/connection/CassandraOperations.java +++ b/priam/src/main/java/com/netflix/priam/connection/CassandraOperations.java @@ -16,8 +16,17 @@ */ package com.netflix.priam.connection; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableSet; +import com.netflix.priam.backup.BackupRestoreUtil; import com.netflix.priam.config.IConfiguration; +import com.netflix.priam.health.CassandraMonitor; import com.netflix.priam.utils.RetryableCallable; +import java.io.IOException; +import java.nio.file.DirectoryStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; import java.util.*; import javax.inject.Inject; import org.apache.cassandra.db.ColumnFamilyStoreMBean; @@ -202,4 +211,56 @@ public List> gossipInfo() throws Exception { } return returnPublicIpSourceIpMap; } + + @Override + public List importAll(String srcDir) throws IOException { + List failedImports = new ArrayList<>(); + if (CassandraMonitor.hasCassadraStarted()) { + for (Path tableDir : BackupRestoreUtil.getBackupDirectories(srcDir, "")) { + String keyspace = tableDir.getParent().getFileName().toString(); + String table = tableDir.getFileName().toString().split("-")[0]; + failedImports.addAll(importData(keyspace, table, tableDir.toString())); + } + } else { + recursiveMove(Paths.get(srcDir), Paths.get(configuration.getDataFileLocation())); + } + return failedImports; + } + + private List importData(String keyspace, String table, String source) + throws 
IOException { + try (JMXNodeTool nodeTool = JMXNodeTool.instance(configuration)) { + return nodeTool.importNewSSTables( + keyspace, + table, + ImmutableSet.of(source), + false /* resetLevel */, + false /* clearRepaired */, + true /* verifySSTables */, + true /* verifyTokens */, + true /* invalidateCaches */, + false /* extendedVerify */, + false /* copyData */); + } + } + + private void recursiveMove(Path source, Path destination) throws IOException { + Preconditions.checkState(Files.exists(source)); + if (!Files.exists(destination)) { + if (!destination.toFile().mkdirs()) { + throw new IOException("Failed creating " + destination); + } + } + try (DirectoryStream directoryStream = Files.newDirectoryStream(source)) { + for (Path path : directoryStream) { + if (Files.isRegularFile(path)) { + Files.move(path, destination.resolve(path.getFileName())); + } else if (Files.isDirectory(path)) { + recursiveMove(path, destination.resolve(path.getFileName())); + } else { + throw new IOException("Failed determining type of inode is " + path); + } + } + } + } } diff --git a/priam/src/main/java/com/netflix/priam/connection/ICassandraOperations.java b/priam/src/main/java/com/netflix/priam/connection/ICassandraOperations.java index ae0c97201..481198e1d 100644 --- a/priam/src/main/java/com/netflix/priam/connection/ICassandraOperations.java +++ b/priam/src/main/java/com/netflix/priam/connection/ICassandraOperations.java @@ -61,4 +61,10 @@ public interface ICassandraOperations { void forceKeyspaceFlush(String keyspaceName) throws Exception; List> gossipInfo() throws Exception; + + /** + * import sstables from the directory at srcDir into the configured data directory. importAll + * Will just move them if Cassandra hasn't started. 
+ */ + List importAll(String srcDir) throws Exception; } diff --git a/priam/src/main/java/com/netflix/priam/health/CassandraMonitor.java b/priam/src/main/java/com/netflix/priam/health/CassandraMonitor.java index 4f580e8af..1955e4aa5 100644 --- a/priam/src/main/java/com/netflix/priam/health/CassandraMonitor.java +++ b/priam/src/main/java/com/netflix/priam/health/CassandraMonitor.java @@ -169,8 +169,8 @@ public static Boolean hasCassadraStarted() { } // Added for testing only - public static void setIsCassadraStarted() { + public static void setIsCassandraStarted(boolean newStartedState) { // Setting cassandra flag to true - isCassandraStarted.set(true); + isCassandraStarted.set(newStartedState); } } diff --git a/priam/src/main/java/com/netflix/priam/restore/AbstractRestore.java b/priam/src/main/java/com/netflix/priam/restore/AbstractRestore.java index 5d1ecc0c5..6b6d067f9 100644 --- a/priam/src/main/java/com/netflix/priam/restore/AbstractRestore.java +++ b/priam/src/main/java/com/netflix/priam/restore/AbstractRestore.java @@ -22,16 +22,17 @@ import com.netflix.priam.backup.Status; import com.netflix.priam.backupv2.IMetaProxy; import com.netflix.priam.config.IConfiguration; +import com.netflix.priam.connection.ICassandraOperations; import com.netflix.priam.defaultimpl.ICassandraProcess; +import com.netflix.priam.health.CassandraMonitor; import com.netflix.priam.health.InstanceState; import com.netflix.priam.identity.InstanceIdentity; import com.netflix.priam.scheduler.Task; import com.netflix.priam.utils.DateUtil; import com.netflix.priam.utils.RetryableCallable; import com.netflix.priam.utils.Sleeper; -import java.io.IOException; import java.math.BigInteger; -import java.nio.file.Path; +import java.nio.file.*; import java.time.LocalDateTime; import java.util.*; import java.util.concurrent.Future; @@ -59,6 +60,7 @@ public abstract class AbstractRestore extends Task implements IRestoreStrategy { private final ICassandraProcess cassProcess; private final InstanceState 
instanceState; private final IPostRestoreHook postRestoreHook; + private final ICassandraOperations cassOps; @Inject @Named("v2") @@ -73,7 +75,8 @@ public AbstractRestore( RestoreTokenSelector tokenSelector, ICassandraProcess cassProcess, InstanceState instanceState, - IPostRestoreHook postRestoreHook) { + IPostRestoreHook postRestoreHook, + ICassandraOperations cassOps) { super(config); this.fs = fs; this.sleeper = sleeper; @@ -86,6 +89,7 @@ public AbstractRestore( new BackupRestoreUtil( config.getRestoreIncludeCFList(), config.getRestoreExcludeCFList()); this.postRestoreHook = postRestoreHook; + this.cassOps = cassOps; } public static boolean isRestoreEnabled(IConfiguration conf) { @@ -108,10 +112,6 @@ private void waitForCompletion(List> futureList) throws Exception { for (Future future : futureList) future.get(); } - private void stopCassProcess() throws IOException { - cassProcess.stop(true); - } - @Override public void execute() throws Exception { if (!isRestoreEnabled(config)) return; @@ -144,7 +144,6 @@ public void restore(DateUtil.DateRange dateRange) throws Exception { new Date(dateRange.getStartTime().toEpochMilli())); instanceIdentity.getInstance().setToken(restoreToken.toString()); } - stopCassProcess(); Optional latestValidMetaFile = BackupRestoreUtil.getLatestValidMetaPath(metaProxy, dateRange); if (!latestValidMetaFile.isPresent()) { @@ -165,9 +164,17 @@ public void restore(DateUtil.DateRange dateRange) throws Exception { } List> futureList = new ArrayList<>(download(allFiles.iterator())); waitForCompletion(futureList); - postRestoreHook.execute(); - instanceState.endRestore(Status.FINISHED, LocalDateTime.now()); - if (!config.doesCassandraStartManually()) cassProcess.start(true); + List failedImports = cassOps.importAll(config.getRestoreDataLocation()); + if (!failedImports.isEmpty()) { + instanceState.endRestore(Status.FAILED, LocalDateTime.now()); + } else { + postRestoreHook.execute(); + if (!config.doesCassandraStartManually() + && 
!CassandraMonitor.hasCassadraStarted()) { + cassProcess.start(true); + } + instanceState.endRestore(Status.FINISHED, LocalDateTime.now()); + } } catch (Exception e) { instanceState.endRestore(Status.FAILED, LocalDateTime.now()); throw e; diff --git a/priam/src/main/java/com/netflix/priam/restore/Restore.java b/priam/src/main/java/com/netflix/priam/restore/Restore.java index 4c23d2e60..dceaabe5c 100644 --- a/priam/src/main/java/com/netflix/priam/restore/Restore.java +++ b/priam/src/main/java/com/netflix/priam/restore/Restore.java @@ -19,6 +19,7 @@ import com.netflix.priam.backup.AbstractBackupPath; import com.netflix.priam.backup.IBackupFileSystem; import com.netflix.priam.config.IConfiguration; +import com.netflix.priam.connection.ICassandraOperations; import com.netflix.priam.defaultimpl.ICassandraProcess; import com.netflix.priam.health.InstanceState; import com.netflix.priam.identity.InstanceIdentity; @@ -46,7 +47,8 @@ public Restore( InstanceIdentity instanceIdentity, RestoreTokenSelector tokenSelector, InstanceState instanceState, - IPostRestoreHook postRestoreHook) { + IPostRestoreHook postRestoreHook, + ICassandraOperations cassandraOperations) { super( config, fs, @@ -56,7 +58,8 @@ public Restore( tokenSelector, cassProcess, instanceState, - postRestoreHook); + postRestoreHook, + cassandraOperations); } @Override diff --git a/priam/src/test/groovy/com/netflix/priam/cluser/management/TestCompaction.groovy b/priam/src/test/groovy/com/netflix/priam/cluser/management/TestCompaction.groovy index 90fe7db80..1e073b95a 100644 --- a/priam/src/test/groovy/com/netflix/priam/cluser/management/TestCompaction.groovy +++ b/priam/src/test/groovy/com/netflix/priam/cluser/management/TestCompaction.groovy @@ -150,7 +150,7 @@ class TestCompaction extends Specification { private static int concurrentRuns(int size) { - CassandraMonitor.setIsCassadraStarted() + CassandraMonitor.setIsCassandraStarted(true) ExecutorService threads = Executors.newFixedThreadPool(size) List> torun = 
new ArrayList<>(size) for (int i = 0; i < size; i++) { diff --git a/priam/src/test/java/com/netflix/priam/backup/BRTestModule.java b/priam/src/test/java/com/netflix/priam/backup/BRTestModule.java index 0e87702ee..939dbc63d 100644 --- a/priam/src/test/java/com/netflix/priam/backup/BRTestModule.java +++ b/priam/src/test/java/com/netflix/priam/backup/BRTestModule.java @@ -28,6 +28,8 @@ import com.netflix.priam.config.FakeConfiguration; import com.netflix.priam.config.IBackupRestoreConfig; import com.netflix.priam.config.IConfiguration; +import com.netflix.priam.connection.CassandraOperations; +import com.netflix.priam.connection.ICassandraOperations; import com.netflix.priam.cred.ICredential; import com.netflix.priam.defaultimpl.FakeCassandraProcess; import com.netflix.priam.defaultimpl.ICassandraProcess; @@ -78,5 +80,6 @@ protected void configure() { bind(IMetaProxy.class).annotatedWith(Names.named("v2")).to(MetaV2Proxy.class); bind(DynamicRateLimiter.class).to(FakeDynamicRateLimiter.class); bind(Clock.class).toInstance(Clock.fixed(Instant.EPOCH, ZoneId.systemDefault())); + bind(ICassandraOperations.class).to(CassandraOperations.class).in(Scopes.SINGLETON); } } diff --git a/priam/src/test/java/com/netflix/priam/backupv2/TestBackupV2Service.java b/priam/src/test/java/com/netflix/priam/backupv2/TestBackupV2Service.java index 26f3409f9..9a483b04c 100644 --- a/priam/src/test/java/com/netflix/priam/backupv2/TestBackupV2Service.java +++ b/priam/src/test/java/com/netflix/priam/backupv2/TestBackupV2Service.java @@ -201,6 +201,8 @@ public void updateService( result = "-1"; configuration.isIncrementalBackupEnabled(); result = true; + configuration.getDataFileLocation(); + result = "target/data"; backupRestoreConfig.enableV2Backups(); result = true; backupRestoreConfig.getBackupVerificationCronExpression(); diff --git a/priam/src/test/java/com/netflix/priam/config/FakeConfiguration.java b/priam/src/test/java/com/netflix/priam/config/FakeConfiguration.java index 
4d6f65c8b..36f358e90 100644 --- a/priam/src/test/java/com/netflix/priam/config/FakeConfiguration.java +++ b/priam/src/test/java/com/netflix/priam/config/FakeConfiguration.java @@ -44,6 +44,7 @@ public class FakeConfiguration implements IConfiguration { private String partitioner; private String diskFailurePolicy; private int blockForPeersTimeoutInSecs; + private String dataFileLocation; public Map fakeProperties = new HashMap<>(); @@ -345,4 +346,16 @@ public FakeConfiguration setBlockForPeersTimeoutInSecs(int timeout) { public int getBlockForPeersTimeoutInSecs() { return this.blockForPeersTimeoutInSecs; } + + public FakeConfiguration setDataFileLocation(String dataFileLocation) { + this.dataFileLocation = dataFileLocation; + return this; + } + + @Override + public String getDataFileLocation() { + return dataFileLocation == null + ? IConfiguration.super.getDataFileLocation() + : dataFileLocation; + } } diff --git a/priam/src/test/java/com/netflix/priam/connection/TestCassandraOperations.java b/priam/src/test/java/com/netflix/priam/connection/TestCassandraOperations.java index 89c9e5e59..2a2b0a6e9 100644 --- a/priam/src/test/java/com/netflix/priam/connection/TestCassandraOperations.java +++ b/priam/src/test/java/com/netflix/priam/connection/TestCassandraOperations.java @@ -17,12 +17,20 @@ package com.netflix.priam.connection; +import com.google.common.collect.ImmutableSet; +import com.google.common.io.Files; +import com.google.common.truth.Truth; import com.google.inject.Guice; import com.google.inject.Injector; import com.mchange.io.FileUtils; import com.netflix.priam.backup.BRTestModule; +import com.netflix.priam.config.FakeConfiguration; import com.netflix.priam.config.IConfiguration; +import com.netflix.priam.health.CassandraMonitor; import java.io.File; +import java.io.IOException; +import java.nio.file.Paths; +import java.util.ArrayList; import java.util.List; import java.util.Map; import mockit.Expectations; @@ -30,17 +38,34 @@ import mockit.MockUp; import 
mockit.Mocked; import org.apache.cassandra.tools.NodeProbe; -import org.junit.Assert; -import org.junit.Test; +import org.junit.*; +import org.junit.rules.TestName; /** Created by aagrawal on 3/1/19. */ public class TestCassandraOperations { private final String gossipInfo1 = "src/test/resources/gossipInfoSample_1.txt"; @Mocked private NodeProbe nodeProbe; @Mocked private JMXNodeTool jmxNodeTool; + private FakeConfiguration config; private static CassandraOperations cassandraOperations; + private static final String BASE_DIR = "base"; + private static final String RESTORE_DIR = "restore"; + private static final String DATA_DIR = "data"; + private static final String KS = "ks"; + private static final String TB = "tb"; + private static final String SI = "si"; + private static final String DATAFILE = "datafile"; + private static final String SI_DATAFILE = "si_datafile"; - public TestCassandraOperations() { + @Rule public TestName name = new TestName(); + + @BeforeClass + public static void prepBeforeAllTests() { + deleteBaseDirectoryForAllTests(); + } + + @Before + public void prepareTest() throws IOException { new MockUp() { @Mock NodeProbe instance(IConfiguration config) { @@ -48,13 +73,29 @@ NodeProbe instance(IConfiguration config) { } }; Injector injector = Guice.createInjector(new BRTestModule()); - if (cassandraOperations == null) - cassandraOperations = injector.getInstance(CassandraOperations.class); + cassandraOperations = injector.getInstance(CassandraOperations.class); + Paths.get(getRestoreDir(), KS, TB, SI).toFile().mkdirs(); + Files.touch(Paths.get(getRestoreDir(), KS, TB, DATAFILE).toFile()); + Files.touch(Paths.get(getRestoreDir(), KS, TB, SI, SI_DATAFILE).toFile()); + config = (FakeConfiguration) injector.getInstance(IConfiguration.class); + config.setDataFileLocation(getDataDir()); + CassandraMonitor.setIsCassandraStarted(false); + } + + @After + public void cleanup() { + 
org.apache.commons.io.FileUtils.deleteQuietly(Paths.get(getRestoreDir()).toFile()); + org.apache.commons.io.FileUtils.deleteQuietly(Paths.get(getDataDir()).toFile()); + config.setDataFileLocation(null); + } + + @AfterClass + public static void cleanupAfterAllTests() { + deleteBaseDirectoryForAllTests(); } @Test public void testGossipInfo() throws Exception { - String gossipInfoFromNodetool = FileUtils.getContentsAsString(new File(gossipInfo1)); new Expectations() { { @@ -80,4 +121,87 @@ public void testGossipInfo() throws Exception { Assert.assertEquals("[123,234]", gossipInfo.get("TOKENS")); }); } + + @Test + public void testRestoreViaMove_dataDirectoryIsCreated() throws IOException { + cassandraOperations.importAll(getRestoreDir()); + Truth.assertThat(Paths.get(getDataDir()).toFile().exists()); + } + + @Test + public void testRestoreViaMove_keyspaceDirectoryIsCreated() throws IOException { + cassandraOperations.importAll(getRestoreDir()); + Truth.assertThat(Paths.get(getDataDir(), KS).toFile().exists()); + } + + @Test + public void testRestoreViaMove_tableDirectoryIsCreated() throws IOException { + cassandraOperations.importAll(getRestoreDir()); + Truth.assertThat(Paths.get(getDataDir(), KS, TB).toFile().exists()); + } + + @Test + public void testRestoreViaMove_secondaryIndexDirectoryIsCreated() throws IOException { + cassandraOperations.importAll(getRestoreDir()); + Truth.assertThat(Paths.get(getDataDir(), KS, TB, SI).toFile().exists()); + } + + @Test + public void testRestoreViaMove_dataFileIsMoved() throws IOException { + cassandraOperations.importAll(getRestoreDir()); + Truth.assertThat(Paths.get(getDataDir(), KS, TB, DATAFILE).toFile().exists()); + } + + @Test + public void testRestoreViaMove_dataFileIsRemovedFromOrigin() throws IOException { + cassandraOperations.importAll(getRestoreDir()); + Truth.assertThat(Paths.get(getRestoreDir(), KS, TB, DATAFILE).toFile().exists()).isFalse(); + } + + @Test + public void testRestoreViaMove_secondaryIndexFileIsMoved() 
throws IOException { + cassandraOperations.importAll(getRestoreDir()); + Truth.assertThat(Paths.get(getDataDir(), KS, TB, SI, SI_DATAFILE).toFile().exists()); + } + + @Test + public void testRestoreViaMove_secondaryIndexFileIsRemovedFromOrigin() throws IOException { + cassandraOperations.importAll(getRestoreDir()); + Truth.assertThat(Paths.get(getRestoreDir(), KS, TB, SI, SI_DATAFILE).toFile().exists()) + .isFalse(); + } + + @Test + public void testRestoreViaImport() throws IOException { + CassandraMonitor.setIsCassandraStarted(true); + new Expectations() { + { + nodeProbe.importNewSSTables( + KS, + TB, + ImmutableSet.of(Paths.get(getRestoreDir(), KS, TB).toString()), + false /* resetLevel */, + false /* clearRepaired */, + true /* verifySSTables */, + true /* verifyTokens */, + true /* invalidateCaches */, + false /* extendedVerify */, + false /* copyData */); + result = new ArrayList<>(); + } + }; + cassandraOperations.importAll(getRestoreDir()); + } + + private static void deleteBaseDirectoryForAllTests() { + org.apache.commons.io.FileUtils.deleteQuietly(Paths.get(BASE_DIR).toFile()); + } + + private String getRestoreDir() { + return BASE_DIR + "/" + name.getMethodName() + RESTORE_DIR; + } + + private String getDataDir() { + return BASE_DIR + "/" + name.getMethodName() + DATA_DIR; + } } diff --git a/priam/src/test/java/com/netflix/priam/health/TestCassandraMonitor.java b/priam/src/test/java/com/netflix/priam/health/TestCassandraMonitor.java index 4bf3fa61c..dfcf1a142 100644 --- a/priam/src/test/java/com/netflix/priam/health/TestCassandraMonitor.java +++ b/priam/src/test/java/com/netflix/priam/health/TestCassandraMonitor.java @@ -63,7 +63,7 @@ public void testCassandraMonitor() throws Exception { Assert.assertFalse(CassandraMonitor.hasCassadraStarted()); - CassandraMonitor.setIsCassadraStarted(); + CassandraMonitor.setIsCassandraStarted(true); Assert.assertTrue(CassandraMonitor.hasCassadraStarted()); monitor.execute();