Skip to content

Commit

Permalink
separating concern of token from backup identifier.
Browse files Browse the repository at this point in the history
  • Loading branch information
zmarois committed Nov 27, 2017
1 parent b482ce2 commit f72091a
Show file tree
Hide file tree
Showing 28 changed files with 295 additions and 173 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,18 @@ private void setPriamProperties()
String token = null;
String seeds = null;
boolean isReplace = false;
boolean isExternallyDefinedToken = false;
String replacedIp = "";
String extraEnvParams = null;

while (true)
{
try
{
token = DataFetcher.fetchData("http://127.0.0.1:8080/Priam/REST/v1/cassconfig/get_token");
isExternallyDefinedToken = Boolean.parseBoolean(DataFetcher.fetchData("http://127.0.0.1:8080/Priam/REST/v1/cassconfig/is_externally_defined_token"));
if (isExternallyDefinedToken) {
token = DataFetcher.fetchData("http://127.0.0.1:8080/Priam/REST/v1/cassconfig/get_token");
}
seeds = DataFetcher.fetchData("http://127.0.0.1:8080/Priam/REST/v1/cassconfig/get_seeds");
isReplace = Boolean.parseBoolean(DataFetcher.fetchData("http://127.0.0.1:8080/Priam/REST/v1/cassconfig/is_replace_token"));
replacedIp = DataFetcher.fetchData("http://127.0.0.1:8080/Priam/REST/v1/cassconfig/get_replaced_ip");
Expand All @@ -64,7 +68,7 @@ private void setPriamProperties()
e.printStackTrace();
}

if (token != null && seeds != null)
if ((token != null || !isExternallyDefinedToken) && seeds != null)
break;
try
{
Expand All @@ -75,8 +79,10 @@ private void setPriamProperties()
// do nothing.
}
}

System.setProperty("cassandra.initial_token", token);

if (isExternallyDefinedToken) {
System.setProperty("cassandra.initial_token", token);
}

setExtraEnvParams(extraEnvParams);

Expand Down
5 changes: 5 additions & 0 deletions priam/src/main/java/com/netflix/priam/IConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,11 @@ public interface IConfiguration {
*/
public String getHostIP();

/**
* @return Gets the number of tokens assigned to the node when using virtual nodes.
*/
public int getNumTokens();

/**
* @return Bytes per second to throttle for backups
*/
Expand Down
2 changes: 1 addition & 1 deletion priam/src/main/java/com/netflix/priam/PriamServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public PriamServer(IConfiguration config, PriamScheduler scheduler, InstanceIden
}

public void intialize() throws Exception {
if (id.getInstance().isOutOfService())
if (id.isOutOfService())
return;

// start to schedule jobs
Expand Down
10 changes: 5 additions & 5 deletions priam/src/main/java/com/netflix/priam/aws/S3BackupPath.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public String getRemotePath() {
buff.append(baseDir).append(S3BackupPath.PATH_SEP); // Base dir
buff.append(region).append(S3BackupPath.PATH_SEP);
buff.append(clusterName).append(S3BackupPath.PATH_SEP);// Cluster name
buff.append(token).append(S3BackupPath.PATH_SEP);
buff.append(nodeIdentifier).append(S3BackupPath.PATH_SEP);
buff.append(formatDate(time)).append(S3BackupPath.PATH_SEP);
buff.append(type).append(S3BackupPath.PATH_SEP);
if (type != BackupFileType.META && type != BackupFileType.CL) {
Expand Down Expand Up @@ -83,7 +83,7 @@ public void parseRemote(String remoteFilePath) {
baseDir = pieces.get(0);
region = pieces.get(1);
clusterName = pieces.get(2);
token = pieces.get(3);
nodeIdentifier = pieces.get(3);
time = parseDate(pieces.get(4));
type = BackupFileType.valueOf(pieces.get(5));
if (type != BackupFileType.META && type != BackupFileType.CL) {
Expand All @@ -109,14 +109,14 @@ public void parsePartialPrefix(String remoteFilePath) {
baseDir = pieces.get(0);
region = pieces.get(1);
clusterName = pieces.get(2);
token = pieces.get(3);
nodeIdentifier = pieces.get(3);
}

@Override
public String remotePrefix(Date start, Date end, String location) {
StringBuffer buff = new StringBuffer(clusterPrefix(location));
token = factory.getInstance().getToken();
buff.append(token).append(S3BackupPath.PATH_SEP);
nodeIdentifier = instanceIdentity.getBackupIdentifier();
buff.append(nodeIdentifier).append(S3BackupPath.PATH_SEP);
// match the common characters to prefix.
buff.append(match(start, end));
return buff.toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ public static TaskTimer getTimer(InstanceIdentity id) {
logger.info("Seed node. Instance id: {}"
+ ", host ip: {}"
+ ", host name: {}",
id.getInstance().getInstanceId(), id.getInstance().getHostIP(), id.getInstance().getHostName());
id.getInstanceId(), id.getHostIP(), id.getHostName());
return_ = new SimpleTimer(JOBNAME, 120 * 1000 + ran.nextInt(120 * 1000));
} else
return_ = new SimpleTimer(JOBNAME);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,21 +53,21 @@ public enum BackupFileType {
protected String columnFamily;
protected String fileName;
protected String baseDir;
protected String token;
protected String nodeIdentifier;
protected String region;
protected Date time;
protected long size; //uncompressed file size
protected long compressedFileSize = 0;
protected boolean isCassandra1_0;

protected final InstanceIdentity factory;
protected final InstanceIdentity instanceIdentity;
protected final IConfiguration config;
protected File backupFile;
protected Date uploadedTs;
protected int awsSlowDownExceptionCounter = 0;

public AbstractBackupPath(IConfiguration config, InstanceIdentity factory) {
this.factory = factory;
public AbstractBackupPath(IConfiguration config, InstanceIdentity instanceIdentity) {
this.instanceIdentity = instanceIdentity;
this.config = config;
}

Expand All @@ -93,7 +93,7 @@ public void parseLocal(File file, BackupFileType type) throws ParseException {
this.clusterName = config.getAppName();
this.baseDir = config.getBackupLocation();
this.region = config.getDC();
this.token = factory.getInstance().getToken();
this.nodeIdentifier = instanceIdentity.getBackupIdentifier();
this.type = type;
if (type != BackupFileType.META && type != BackupFileType.CL) {
this.keyspace = elements[0];
Expand Down Expand Up @@ -214,8 +214,8 @@ public String getBaseDir() {
return baseDir;
}

public String getToken() {
return token;
public String getNodeIdentifier() {
return nodeIdentifier;
}

public String getRegion() {
Expand Down Expand Up @@ -262,7 +262,7 @@ public void setFileName(String fileName) {
}

public InstanceIdentity getInstanceIdentity() {
return this.factory;
return this.instanceIdentity;
}

public void setUploadedTs(Date uploadedTs) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public void execute() throws Exception {

Date startTime = Calendar.getInstance(TimeZone.getTimeZone("GMT")).getTime();
snapshotName = pathFactory.get().formatDate(startTime);
String token = instanceIdentity.getInstance().getToken();
String token = instanceIdentity.getToken();

// Save start snapshot status
BackupMetadata backupMetadata = new BackupMetadata(token, startTime);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -779,6 +779,7 @@ public String getCassProcessName() {
return config.get(CONFIG_CASS_PROCESS_NAME, DEFAULT_CASS_PROCESS_NAME);
}

@Override
public int getNumTokens() {
return config.get(CONFIG_VNODE_NUM_TOKENS, DEFAULT_VNODE_NUM_TOKENS);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.netflix.priam.utils.ITokenManager;
import com.netflix.priam.utils.RetryableCallable;
import com.netflix.priam.utils.Sleeper;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -78,8 +79,12 @@ public boolean test(PriamInstance input) {
};

private PriamInstance myInstance;
private String backupIdentifier;
private String token;
private boolean outOfService = false;
private boolean isReplace = false;
private boolean isTokenPregenerated = false;
private boolean initialized = true;
private String replacedIp = "";
private IDeadTokenRetriever deadTokenRetriever;
private IPreGeneratedTokenRetriever preGeneratedTokenRetriever;
Expand All @@ -104,7 +109,7 @@ public InstanceIdentity(IPriamInstanceFactory factory, IMembership membership, I
init();
}

public PriamInstance getInstance() {
PriamInstance getInstance() {
return myInstance;
}

Expand All @@ -118,7 +123,7 @@ public PriamInstance retriableCall() throws Exception {
for (PriamInstance ins : deadInstances) {
logger.info("[Dead] Iterating though the hosts: {}", ins.getInstanceId());
if (ins.getInstanceId().equals(config.getInstanceName())) {
ins.setOutOfService(true);
outOfService = true;
logger.info("[Dead] found that this node is dead."
+ " application: {}"
+ ", id: {}"
Expand Down Expand Up @@ -242,6 +247,16 @@ public void forEachExecution() {
}

logger.info("My token: {}", myInstance.getToken());

if (myInstance.getToken() == null || myInstance.getToken().isEmpty()) {
backupIdentifier = "virual" + Integer.toString(myInstance.getId());
} else
{
backupIdentifier = myInstance.getToken();
}

token = myInstance.getToken();
initialized = true;
}

private void populateRacMap() {
Expand Down Expand Up @@ -304,4 +319,49 @@ public String getReplacedIp() {
private static boolean isInstanceDummy(PriamInstance instance) {
return instance.getInstanceId().equals(DUMMY_INSTANCE_ID);
}

public boolean isOutOfService()
{
return outOfService;
}

public String getBackupIdentifier()
{
return backupIdentifier;
}

public void setBackupIdentifier(String backupIdentifier)
{
this.backupIdentifier = backupIdentifier;
}

public String getToken()
{
return token;
}

public void setToken(String token)
{
this.token = token;
}

public String getInstanceId()
{
return myInstance.getInstanceId();
}

public String getHostIP()
{
return myInstance.getHostIP();
}

public String getHostName()
{
return myInstance.getHostName();
}

public boolean isExternallyDefinedToken()
{
return initialized && StringUtils.isNotBlank(token);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -130,13 +130,5 @@ public void setUpdatetime(long updatetime) {
this.updatetime = updatetime;
}

public boolean isOutOfService() {
return outOfService;
}

public void setOutOfService(boolean outOfService) {
this.outOfService = outOfService;
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public NewTokenRetriever(IPriamInstanceFactory factory, IMembership membership,
@Override
public PriamInstance get() throws Exception {

logger.info("Generating my own and new token");
logger.info("Generating my own and new identifier");
// Sleep random interval - upto 15 sec
sleeper.sleep(new Random().nextInt(15000));
int hash = tokenManager.regionOffset(config.getDC());
Expand All @@ -74,10 +74,16 @@ public PriamInstance get() throws Exception {
} else
my_slot = config.getRacs().size() + maxSlot;

logger.info("Trying to createToken with slot {} with rac count {} with rac membership size {} with dc {}",
my_slot, membership.getRacCount(), membership.getRacMembershipSize(), config.getDC());
String payload = tokenManager.createToken(my_slot, membership.getRacCount(), membership.getRacMembershipSize(), config.getDC());
return factory.create(config.getAppName(), my_slot + hash, config.getInstanceName(), config.getHostname(), config.getHostIP(), config.getRac(), null, payload);
int identifier = my_slot + hash;
String token = null;
if (config.getNumTokens() == 1)
{
logger.info("Trying to createToken with slot {} with rac count {} with rac membership size {} with dc {}",
my_slot, membership.getRacCount(), membership.getRacMembershipSize(), config.getDC());
token = tokenManager.createToken(
my_slot, membership.getRacCount(), membership.getRacMembershipSize(), config.getDC());
}
return factory.create(config.getAppName(), identifier, config.getInstanceName(), config.getHostname(), config.getHostIP(), config.getRac(), null, token);

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,9 @@ private void notify(AbstractBackupPath abp, String uploadStatus) {
jsonObject.put("cf", abp.getColumnFamily());
jsonObject.put("region", abp.getRegion());
jsonObject.put("rack", this.config.getRac());
jsonObject.put("token", abp.getToken());
jsonObject.put("token", abp.getNodeIdentifier());
jsonObject.put("token", abp.getNodeIdentifier());
jsonObject.put("nodeIdentifier", abp.getNodeIdentifier());
jsonObject.put("filename", abp.getFileName());
jsonObject.put("uncompressfilesize", abp.getSize());
jsonObject.put("compressfilesize", abp.getCompressedFileSize());
Expand Down
Loading

0 comments on commit f72091a

Please sign in to comment.