Skip to content

Commit

Permalink
Cease stopping Cassandra in the restore process and reveal the abilit…
Browse files Browse the repository at this point in the history
…y to restore to a staging area and then import or move the data depending on whether Cassandra is running.
  • Loading branch information
mattl-netflix committed May 5, 2024
1 parent 4ece8d3 commit 9557275
Show file tree
Hide file tree
Showing 15 changed files with 261 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -146,12 +146,13 @@ protected String match(Date start, Date end) {
/** Local restore file */
public File newRestoreFile() {
File return_;
String dataDir = config.getDataFileLocation();
String dataDir = config.getRestoreDataLocation();
switch (type) {
case SECONDARY_INDEX_V2:
String restoreFileName =
PATH_JOINER.join(dataDir, keyspace, columnFamily, indexDir, fileName);
return_ = new File(restoreFileName);
return_ =
new File(
PATH_JOINER.join(
dataDir, keyspace, columnFamily, indexDir, fileName));
break;
case META_V2:
return_ = new File(PATH_JOINER.join(config.getDataFileLocation(), fileName));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.google.common.collect.ImmutableSet;
import com.netflix.priam.backupv2.IMetaProxy;
import com.netflix.priam.utils.DateUtil;
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
Expand Down Expand Up @@ -141,19 +142,18 @@ public final boolean isFiltered(String keyspace, String columnFamilyDir) {
* @param dataDirectory the location of the data folder.
* @param monitoringFolder folder where cassandra backups are configured.
* @return Set of the path(s) containing the backup folder for each columnfamily.
* @throws Exception incase of IOException.
* @throws IOException if the data directory or one of its keyspace directories cannot be listed.
*/
public static ImmutableSet<Path> getBackupDirectories(
String dataDirectory, String monitoringFolder) throws Exception {
String dataDirectory, String monitoringFolder) throws IOException {
ImmutableSet.Builder<Path> backupPaths = ImmutableSet.builder();
Path dataPath = Paths.get(dataDirectory);
if (Files.exists(dataPath) && Files.isDirectory(dataPath))
try (DirectoryStream<Path> directoryStream =
Files.newDirectoryStream(dataPath, path -> Files.isDirectory(path))) {
Files.newDirectoryStream(dataPath, Files::isDirectory)) {
for (Path keyspaceDirPath : directoryStream) {
try (DirectoryStream<Path> keyspaceStream =
Files.newDirectoryStream(
keyspaceDirPath, path -> Files.isDirectory(path))) {
Files.newDirectoryStream(keyspaceDirPath, Files::isDirectory)) {
for (Path columnfamilyDirPath : keyspaceStream) {
Path backupDirPath =
Paths.get(columnfamilyDirPath.toString(), monitoringFolder);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,10 @@ default String getRestoreSnapshot() {
return StringUtils.EMPTY;
}

/**
 * Default location used for restore data.
 *
 * @return the Cassandra base directory with {@code "/restore"} appended.
 */
default String getRestoreDataLocation() {
    final String baseDirectory = getCassandraBaseDirectory();
    return baseDirectory + "/restore";
}

/** @return Get the region to connect to SDB for instance identity */
default String getSDBInstanceIdentityRegion() {
return "us-east-1";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,11 @@ public String getRestoreSnapshot() {
return config.get(PRIAM_PRE + ".restore.snapshot", "");
}

/**
 * Reads the restore data location from the {@code .restore.data.location} property.
 *
 * <p>When the property is not set, this falls back to the Cassandra base directory with {@code
 * "/restore"} appended, so callers always receive a usable path rather than whatever the
 * single-argument lookup yields for a missing key.
 *
 * @return the configured restore data location, or the default described above.
 */
@Override
public String getRestoreDataLocation() {
    return config.get(
            PRIAM_PRE + ".restore.data.location", getCassandraBaseDirectory() + "/restore");
}

@Override
public boolean isRestoreEncrypted() {
return config.get(PRIAM_PRE + ".encrypted.restore.enabled", false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,17 @@
*/
package com.netflix.priam.connection;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import com.netflix.priam.backup.BackupRestoreUtil;
import com.netflix.priam.config.IConfiguration;
import com.netflix.priam.health.CassandraMonitor;
import com.netflix.priam.utils.RetryableCallable;
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.*;
import javax.inject.Inject;
import org.apache.cassandra.db.ColumnFamilyStoreMBean;
Expand Down Expand Up @@ -202,4 +211,56 @@ public List<Map<String, String>> gossipInfo() throws Exception {
}
return returnPublicIpSourceIpMap;
}

/**
 * Brings the SSTables staged under {@code srcDir} into the configured data directory.
 *
 * <p>If Cassandra has not started, the staged tree is moved straight into the data file
 * location. Otherwise every table directory found under {@code srcDir} is imported
 * individually; the keyspace is taken from the parent directory name and the table from the
 * part of the directory name before the first {@code '-'}.
 *
 * @param srcDir root of the staged keyspace/table directory tree.
 * @return the failures reported by the individual imports; empty when files were only moved.
 * @throws IOException if listing, moving or importing fails.
 */
@Override
public List<String> importAll(String srcDir) throws IOException {
    List<String> failures = new ArrayList<>();
    if (!CassandraMonitor.hasCassadraStarted()) {
        recursiveMove(Paths.get(srcDir), Paths.get(configuration.getDataFileLocation()));
        return failures;
    }
    for (Path tablePath : BackupRestoreUtil.getBackupDirectories(srcDir, "")) {
        Path keyspacePath = tablePath.getParent();
        String keyspaceName = keyspacePath.getFileName().toString();
        String directoryName = tablePath.getFileName().toString();
        String tableName = directoryName.split("-")[0];
        failures.addAll(importData(keyspaceName, tableName, tablePath.toString()));
    }
    return failures;
}

/**
 * Asks the node tool to import the SSTables in {@code source} into the given table.
 *
 * @param keyspace keyspace owning the table.
 * @param table table to import into.
 * @param source directory holding the SSTables to import.
 * @return whatever {@code importNewSSTables} returns for this request.
 * @throws IOException if the node tool call fails.
 */
private List<String> importData(String keyspace, String table, String source)
        throws IOException {
    // Options for the import, named after the arguments they are passed as.
    final boolean resetLevel = false;
    final boolean clearRepaired = false;
    final boolean verifySSTables = true;
    final boolean verifyTokens = true;
    final boolean invalidateCaches = true;
    final boolean extendedVerify = false;
    final boolean copyData = false;
    try (JMXNodeTool nodeTool = JMXNodeTool.instance(configuration)) {
        return nodeTool.importNewSSTables(
                keyspace,
                table,
                ImmutableSet.of(source),
                resetLevel,
                clearRepaired,
                verifySSTables,
                verifyTokens,
                invalidateCaches,
                extendedVerify,
                copyData);
    }
}

/**
 * Moves every regular file under {@code source} to the same relative position under {@code
 * destination}, creating destination directories as needed.
 *
 * <p>Source directories themselves are left in place; only the files inside them are moved.
 *
 * @param source existing directory to move files out of.
 * @param destination directory to move files into; created if it does not exist.
 * @throws IllegalStateException if {@code source} does not exist.
 * @throws IOException if a directory cannot be created or listed, a file cannot be moved, or
 *     an entry is neither a regular file nor a directory.
 */
private void recursiveMove(Path source, Path destination) throws IOException {
    Preconditions.checkState(Files.exists(source), "Source path %s does not exist", source);
    // Keep the exists() guard: it follows symbolic links, so a destination that is a link to
    // a directory is accepted as-is.
    if (!Files.exists(destination)) {
        // createDirectories tolerates the directory being created concurrently and reports
        // the underlying cause on failure, unlike File.mkdirs() which only returns false.
        Files.createDirectories(destination);
    }
    try (DirectoryStream<Path> directoryStream = Files.newDirectoryStream(source)) {
        for (Path path : directoryStream) {
            Path target = destination.resolve(path.getFileName());
            if (Files.isRegularFile(path)) {
                Files.move(path, target);
            } else if (Files.isDirectory(path)) {
                recursiveMove(path, target);
            } else {
                throw new IOException("Failed determining type of inode is " + path);
            }
        }
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,4 +61,10 @@ public interface ICassandraOperations {
void forceKeyspaceFlush(String keyspaceName) throws Exception;

List<Map<String, String>> gossipInfo() throws Exception;

/**
 * Imports the SSTables from the directory at {@code srcDir} into the configured data
 * directory. If Cassandra has not started, the files are simply moved instead.
 *
 * @param srcDir directory containing the SSTables to import.
 * @return the imports that failed; empty if there were none.
 * @throws Exception if the import or the move fails.
 */
List<String> importAll(String srcDir) throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -169,8 +169,8 @@ public static Boolean hasCassadraStarted() {
}

// Added for testing only
public static void setIsCassadraStarted() {
public static void setIsCassandraStarted(boolean newStartedState) {
// Set the cassandra started flag to the requested state. The earlier no-argument
// setIsCassadraStarted() could only ever set it to true.
isCassandraStarted.set(true);
isCassandraStarted.set(newStartedState);
}
}
29 changes: 18 additions & 11 deletions priam/src/main/java/com/netflix/priam/restore/AbstractRestore.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,17 @@
import com.netflix.priam.backup.Status;
import com.netflix.priam.backupv2.IMetaProxy;
import com.netflix.priam.config.IConfiguration;
import com.netflix.priam.connection.ICassandraOperations;
import com.netflix.priam.defaultimpl.ICassandraProcess;
import com.netflix.priam.health.CassandraMonitor;
import com.netflix.priam.health.InstanceState;
import com.netflix.priam.identity.InstanceIdentity;
import com.netflix.priam.scheduler.Task;
import com.netflix.priam.utils.DateUtil;
import com.netflix.priam.utils.RetryableCallable;
import com.netflix.priam.utils.Sleeper;
import java.io.IOException;
import java.math.BigInteger;
import java.nio.file.Path;
import java.nio.file.*;
import java.time.LocalDateTime;
import java.util.*;
import java.util.concurrent.Future;
Expand Down Expand Up @@ -59,6 +60,7 @@ public abstract class AbstractRestore extends Task implements IRestoreStrategy {
private final ICassandraProcess cassProcess;
private final InstanceState instanceState;
private final IPostRestoreHook postRestoreHook;
private final ICassandraOperations cassOps;

@Inject
@Named("v2")
Expand All @@ -73,7 +75,8 @@ public AbstractRestore(
RestoreTokenSelector tokenSelector,
ICassandraProcess cassProcess,
InstanceState instanceState,
IPostRestoreHook postRestoreHook) {
IPostRestoreHook postRestoreHook,
ICassandraOperations cassOps) {
super(config);
this.fs = fs;
this.sleeper = sleeper;
Expand All @@ -86,6 +89,7 @@ public AbstractRestore(
new BackupRestoreUtil(
config.getRestoreIncludeCFList(), config.getRestoreExcludeCFList());
this.postRestoreHook = postRestoreHook;
this.cassOps = cassOps;
}

public static boolean isRestoreEnabled(IConfiguration conf) {
Expand All @@ -108,10 +112,6 @@ private void waitForCompletion(List<Future<Path>> futureList) throws Exception {
for (Future<Path> future : futureList) future.get();
}

/**
 * Stops the Cassandra process by delegating to {@code cassProcess}.
 *
 * @throws IOException if the underlying stop call fails.
 */
private void stopCassProcess() throws IOException {
// NOTE(review): the meaning of the boolean is defined by ICassandraProcess#stop, which is
// not visible here — presumably "force"; confirm.
cassProcess.stop(true);
}

@Override
public void execute() throws Exception {
if (!isRestoreEnabled(config)) return;
Expand Down Expand Up @@ -144,7 +144,6 @@ public void restore(DateUtil.DateRange dateRange) throws Exception {
new Date(dateRange.getStartTime().toEpochMilli()));
instanceIdentity.getInstance().setToken(restoreToken.toString());
}
stopCassProcess();
Optional<AbstractBackupPath> latestValidMetaFile =
BackupRestoreUtil.getLatestValidMetaPath(metaProxy, dateRange);
if (!latestValidMetaFile.isPresent()) {
Expand All @@ -165,9 +164,17 @@ public void restore(DateUtil.DateRange dateRange) throws Exception {
}
List<Future<Path>> futureList = new ArrayList<>(download(allFiles.iterator()));
waitForCompletion(futureList);
postRestoreHook.execute();
instanceState.endRestore(Status.FINISHED, LocalDateTime.now());
if (!config.doesCassandraStartManually()) cassProcess.start(true);
List<String> failedImports = cassOps.importAll(config.getRestoreDataLocation());
if (!failedImports.isEmpty()) {
instanceState.endRestore(Status.FAILED, LocalDateTime.now());
} else {
postRestoreHook.execute();
if (!config.doesCassandraStartManually()
&& !CassandraMonitor.hasCassadraStarted()) {
cassProcess.start(true);
}
instanceState.endRestore(Status.FINISHED, LocalDateTime.now());
}
} catch (Exception e) {
instanceState.endRestore(Status.FAILED, LocalDateTime.now());
throw e;
Expand Down
7 changes: 5 additions & 2 deletions priam/src/main/java/com/netflix/priam/restore/Restore.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.netflix.priam.backup.AbstractBackupPath;
import com.netflix.priam.backup.IBackupFileSystem;
import com.netflix.priam.config.IConfiguration;
import com.netflix.priam.connection.ICassandraOperations;
import com.netflix.priam.defaultimpl.ICassandraProcess;
import com.netflix.priam.health.InstanceState;
import com.netflix.priam.identity.InstanceIdentity;
Expand Down Expand Up @@ -46,7 +47,8 @@ public Restore(
InstanceIdentity instanceIdentity,
RestoreTokenSelector tokenSelector,
InstanceState instanceState,
IPostRestoreHook postRestoreHook) {
IPostRestoreHook postRestoreHook,
ICassandraOperations cassandraOperations) {
super(
config,
fs,
Expand All @@ -56,7 +58,8 @@ public Restore(
tokenSelector,
cassProcess,
instanceState,
postRestoreHook);
postRestoreHook,
cassandraOperations);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ class TestCompaction extends Specification {


private static int concurrentRuns(int size) {
CassandraMonitor.setIsCassadraStarted()
CassandraMonitor.setIsCassandraStarted(true)
ExecutorService threads = Executors.newFixedThreadPool(size)
List<Callable<Boolean>> torun = new ArrayList<>(size)
for (int i = 0; i < size; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import com.netflix.priam.config.FakeConfiguration;
import com.netflix.priam.config.IBackupRestoreConfig;
import com.netflix.priam.config.IConfiguration;
import com.netflix.priam.connection.CassandraOperations;
import com.netflix.priam.connection.ICassandraOperations;
import com.netflix.priam.cred.ICredential;
import com.netflix.priam.defaultimpl.FakeCassandraProcess;
import com.netflix.priam.defaultimpl.ICassandraProcess;
Expand Down Expand Up @@ -78,5 +80,6 @@ protected void configure() {
bind(IMetaProxy.class).annotatedWith(Names.named("v2")).to(MetaV2Proxy.class);
bind(DynamicRateLimiter.class).to(FakeDynamicRateLimiter.class);
bind(Clock.class).toInstance(Clock.fixed(Instant.EPOCH, ZoneId.systemDefault()));
bind(ICassandraOperations.class).to(CassandraOperations.class).in(Scopes.SINGLETON);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,8 @@ public void updateService(
result = "-1";
configuration.isIncrementalBackupEnabled();
result = true;
configuration.getDataFileLocation();
result = "target/data";
backupRestoreConfig.enableV2Backups();
result = true;
backupRestoreConfig.getBackupVerificationCronExpression();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public class FakeConfiguration implements IConfiguration {
private String partitioner;
private String diskFailurePolicy;
private int blockForPeersTimeoutInSecs;
private String dataFileLocation;

public Map<String, String> fakeProperties = new HashMap<>();

Expand Down Expand Up @@ -345,4 +346,16 @@ public FakeConfiguration setBlockForPeersTimeoutInSecs(int timeout) {
public int getBlockForPeersTimeoutInSecs() {
return this.blockForPeersTimeoutInSecs;
}

/**
 * Overrides the value returned by {@link #getDataFileLocation()}.
 *
 * @param dataFileLocation location to report; {@code null} restores the interface default.
 * @return this configuration, so calls can be chained.
 */
public FakeConfiguration setDataFileLocation(String dataFileLocation) {
this.dataFileLocation = dataFileLocation;
return this;
}

/**
 * Returns the data file location set on this fake, or the {@link IConfiguration} default when
 * none has been set.
 *
 * @return the overridden location if non-null, otherwise the interface default.
 */
@Override
public String getDataFileLocation() {
    if (dataFileLocation != null) {
        return dataFileLocation;
    }
    return IConfiguration.super.getDataFileLocation();
}
}
Loading

0 comments on commit 9557275

Please sign in to comment.