Skip to content

Commit

Permalink
#60 check if instances terminated in batch (#75)
Browse files Browse the repository at this point in the history
  • Loading branch information
terma committed May 24, 2019
1 parent 32cd66f commit 8947a3b
Show file tree
Hide file tree
Showing 5 changed files with 565 additions and 44 deletions.
11 changes: 3 additions & 8 deletions src/main/java/com/amazon/jenkins/ec2fleet/CloudNanny.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,10 @@ public long getRecurrencePeriod() {

@Override
protected void doRun() throws Exception {

// Trigger reprovisioning as well
Jenkins.getActiveInstance().unlabeledNodeProvisioner.suggestReviewNow();

final List<FleetStateStats> stats = new ArrayList<FleetStateStats>();
final List<FleetStateStats> stats = new ArrayList<>();
for (final Cloud cloud : Jenkins.getActiveInstance().clouds) {
if (!(cloud instanceof EC2FleetCloud))
continue;
Expand All @@ -43,18 +42,14 @@ protected void doRun() throws Exception {
LOGGER.log(Level.FINE, "Checking cloud: " + fleetCloud.getLabelString());
stats.add(Queue.withLock(new Callable<FleetStateStats>() {
@Override
public FleetStateStats call()
throws Exception {
public FleetStateStats call() {
return fleetCloud.updateStatus();
}
}));
}

for (final Widget w : Jenkins.getInstance().getWidgets()) {
if (!(w instanceof FleetStatusWidget))
continue;

((FleetStatusWidget) w).setStatusList(stats);
if (w instanceof FleetStatusWidget) ((FleetStatusWidget) w).setStatusList(stats);
}
}
}
109 changes: 109 additions & 0 deletions src/main/java/com/amazon/jenkins/ec2fleet/EC2Api.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package com.amazon.jenkins.ec2fleet;

import com.amazonaws.services.ec2.AmazonEC2;
import com.amazonaws.services.ec2.model.AmazonEC2Exception;
import com.amazonaws.services.ec2.model.DescribeInstancesRequest;
import com.amazonaws.services.ec2.model.DescribeInstancesResult;
import com.amazonaws.services.ec2.model.Instance;
import com.amazonaws.services.ec2.model.InstanceStateName;
import com.amazonaws.services.ec2.model.Reservation;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Static helpers for asking EC2 about the state of many instances at once.
 *
 * <p>Instance ids are described in batches instead of one request per instance.
 * An instance is reported as terminated when EC2 either does not know it
 * ({@code InvalidInstanceID.NotFound}) or reports it in one of
 * {@link #TERMINATED_STATES}.
 */
@SuppressWarnings("WeakerAccess")
public class EC2Api {

    /** Instance state names that are treated as "terminated" by this class. */
    private static final ImmutableSet<String> TERMINATED_STATES = ImmutableSet.of(
            InstanceStateName.Terminated.toString(),
            InstanceStateName.Stopped.toString(),
            InstanceStateName.Stopping.toString(),
            InstanceStateName.ShuttingDown.toString()
    );

    /** Number of instance ids sent per describe request when the caller does not pick one. */
    private static final int BATCH_SIZE = 900;

    /** Error code carried by the exception raised when a requested instance id is unknown. */
    private static final String NOT_FOUND_ERROR_CODE = "InvalidInstanceID.NotFound";

    /** Matches anything that looks like an instance id ({@code i-} followed by alphanumerics). */
    private static final Pattern INSTANCE_ID_PATTERN = Pattern.compile("(i-[0-9a-zA-Z]+)");

    /**
     * Extracts every instance id mentioned in a "not found" error message.
     *
     * @param errorMessage message of the exception, may be {@code null}
     * @return ids in order of appearance; empty when the message is {@code null} or contains no id
     */
    private static List<String> parseInstanceIdsFromNotFoundException(final String errorMessage) {
        final List<String> instanceIds = new ArrayList<>();
        // nothing to parse; the caller treats an empty result as "cannot recover"
        if (errorMessage == null) return instanceIds;

        final Matcher fullMessageMatcher = INSTANCE_ID_PATTERN.matcher(errorMessage);
        while (fullMessageMatcher.find()) {
            instanceIds.add(fullMessageMatcher.group(1));
        }

        return instanceIds;
    }

    /**
     * Same as {@link #describeTerminated(AmazonEC2, Set, int)} using the default batch size.
     *
     * @param ec2         client used to describe the instances
     * @param instanceIds ids to check
     * @return subset of {@code instanceIds} considered terminated
     */
    public static Set<String> describeTerminated(final AmazonEC2 ec2, final Set<String> instanceIds) {
        return describeTerminated(ec2, instanceIds, BATCH_SIZE);
    }

    /**
     * Finds which of the given instances are terminated, querying EC2 in batches.
     *
     * @param ec2         client used to describe the instances
     * @param instanceIds ids to check; not modified
     * @param batchSize   maximum number of ids per describe request
     * @return new mutable set with the ids that are unknown to EC2 or in a terminated state;
     *         empty input yields an empty set without calling EC2
     * @throws AmazonEC2Exception when EC2 fails with anything other than a recoverable
     *                            "instance not found" error
     */
    public static Set<String> describeTerminated(final AmazonEC2 ec2, final Set<String> instanceIds, final int batchSize) {
        // assume all terminated until we get opposite info
        final Set<String> terminated = new HashSet<>(instanceIds);
        // don't do actual call if no data
        if (instanceIds.isEmpty()) return terminated;

        final List<List<String>> batches = Lists.partition(new ArrayList<>(instanceIds), batchSize);
        for (final List<String> batch : batches) {
            describeTerminatedBatch(ec2, terminated, batch);
        }
        return terminated;
    }

    /**
     * Describes one batch and removes from {@code terminated} every instance that EC2
     * reports in a non-terminated state.
     *
     * <p>When EC2 rejects the request because some ids are unknown, those ids are dropped
     * from the request (they stay in {@code terminated}) and the request is retried.
     * Any other failure, or a "not found" failure that does not shrink the request,
     * is rethrown so the retry loop cannot spin forever.
     *
     * @param ec2        client used to describe the instances
     * @param terminated result set, modified in place
     * @param batch      ids to describe; not modified
     */
    private static void describeTerminatedBatch(final AmazonEC2 ec2, final Set<String> terminated, final List<String> batch) {
        // we are going to modify list, so copy
        final List<String> copy = new ArrayList<>(batch);

        // just to simplify debugging by having a consistent order
        Collections.sort(copy);

        // because instances could be terminated at any time we retry the describe call,
        // each time removing from the request the instances reported as not found
        while (!copy.isEmpty()) {
            try {
                final DescribeInstancesRequest request = new DescribeInstancesRequest().withInstanceIds(copy);

                DescribeInstancesResult result;
                do {
                    result = ec2.describeInstances(request);
                    request.setNextToken(result.getNextToken());

                    for (final Reservation r : result.getReservations()) {
                        for (final Instance instance : r.getInstances()) {
                            // if instance not in terminated state, remove it from terminated
                            if (!TERMINATED_STATES.contains(instance.getState().getName())) {
                                terminated.remove(instance.getInstanceId());
                            }
                        }
                    }
                } while (result.getNextToken() != null);

                // all good, clear request batch to stop
                copy.clear();
            } catch (final AmazonEC2Exception exception) {
                // only "not found" is recoverable; swallowing anything else would retry the
                // very same request forever (constant-first equals: error code may be null)
                if (!NOT_FOUND_ERROR_CODE.equals(exception.getErrorCode())) {
                    throw exception;
                }

                // instances we cannot find are assumed terminated: drop them and try again
                final List<String> notFoundInstanceIds = parseInstanceIdsFromNotFoundException(exception.getMessage());
                // removeAll reports false when the request did not shrink (nothing parsed, or
                // parsed ids are not part of this batch); retrying would never finish, rethrow
                if (!copy.removeAll(notFoundInstanceIds)) {
                    throw exception;
                }
            }
        }
    }

}
55 changes: 19 additions & 36 deletions src/main/java/com/amazon/jenkins/ec2fleet/EC2FleetCloud.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import com.amazonaws.services.ec2.model.DescribeSpotFleetRequestsRequest;
import com.amazonaws.services.ec2.model.DescribeSpotFleetRequestsResult;
import com.amazonaws.services.ec2.model.Instance;
import com.amazonaws.services.ec2.model.InstanceStateName;
import com.amazonaws.services.ec2.model.ModifySpotFleetRequestRequest;
import com.amazonaws.services.ec2.model.Region;
import com.amazonaws.services.ec2.model.SpotFleetRequestConfig;
Expand Down Expand Up @@ -332,48 +331,49 @@ public synchronized FleetStateStats updateStatus() {

// Set up the lists of Jenkins nodes and fleet instances
// currentFleetInstances contains instances currently in the fleet
final Set<String> currentFleetInstances = new HashSet<String>(curStatus.getInstances());
final Set<String> currentInstanceIds = new HashSet<>(curStatus.getInstances());
// currentJenkinsNodes contains all Nodes currently registered in Jenkins
final Set<String> currentJenkinsNodes = new HashSet<String>();
final Set<String> currentJenkinsNodes = new HashSet<>();
for (final Node node : Jenkins.getInstance().getNodes()) {
currentJenkinsNodes.add(node.getNodeName());
}
// missingFleetInstances contains Jenkins nodes that were once fleet instances but are no longer in the fleet
final Set<String> missingFleetInstances = new HashSet<String>();
final Set<String> missingFleetInstances = new HashSet<>();
missingFleetInstances.addAll(currentJenkinsNodes);
missingFleetInstances.retainAll(fleetInstancesCache);
missingFleetInstances.removeAll(currentFleetInstances);
missingFleetInstances.removeAll(currentInstanceIds);

// terminatedFleetInstances contains fleet instances that are terminated, stopped, stopping, or shutting down
final Set<String> terminatedFleetInstances = new HashSet<String>();
for (final String instance : currentFleetInstances) {
try {
if (isTerminated(ec2, instance)) terminatedFleetInstances.add(instance);
} catch (final Exception ex) {
LOGGER.log(Level.WARNING, "Unable to check the instance state of " + instance);
}
Set<String> terminatedInstanceIds = new HashSet<>();
try {
terminatedInstanceIds = EC2Api.describeTerminated(ec2, currentInstanceIds);
LOGGER.log(Level.INFO, "Described terminated instances " + terminatedInstanceIds + " for " + currentInstanceIds);
} catch (final Exception ex) {
LOGGER.log(Level.WARNING, "Unable to describe terminated instances for " + currentInstanceIds);
}

// newFleetInstances contains running fleet instances that are not already Jenkins nodes
final Set<String> newFleetInstances = new HashSet<String>();
newFleetInstances.addAll(currentFleetInstances);
newFleetInstances.removeAll(terminatedFleetInstances);
final Set<String> newFleetInstances = new HashSet<>();
newFleetInstances.addAll(currentInstanceIds);
newFleetInstances.removeAll(terminatedInstanceIds);
newFleetInstances.removeAll(currentJenkinsNodes);

// update caches
dyingFleetInstancesCache.addAll(missingFleetInstances);
dyingFleetInstancesCache.addAll(terminatedFleetInstances);
dyingFleetInstancesCache.addAll(terminatedInstanceIds);
dyingFleetInstancesCache.retainAll(currentJenkinsNodes);
fleetInstancesCache.addAll(currentFleetInstances);
fleetInstancesCache.addAll(currentInstanceIds);
fleetInstancesCache.removeAll(dyingFleetInstancesCache);
fleetInstancesCache.retainAll(currentJenkinsNodes);

LOGGER.log(Level.FINE, "# of current Jenkins nodes:" + currentJenkinsNodes.size());
LOGGER.log(Level.FINE, "Fleet (" + getLabelString() + ") contains instances [" + join(", ", currentFleetInstances) + "]");
LOGGER.log(Level.FINE, "Fleet (" + getLabelString() + ") contains instances [" + join(", ", currentInstanceIds) + "]");
LOGGER.log(Level.FINE, "Jenkins contains dying instances [" + join(", ", dyingFleetInstancesCache) + "]");
LOGGER.log(Level.FINER, "Jenkins contains fleet instances [" + join(", ", fleetInstancesCache) + "]");
LOGGER.log(Level.FINER, "Current Jenkins nodes [" + join(", ", currentJenkinsNodes) + "]");
LOGGER.log(Level.FINER, "New fleet instances [" + join(", ", newFleetInstances) + "]");
LOGGER.log(Level.FINER, "Missing fleet instances [" + join(", ", missingFleetInstances) + "]");
LOGGER.log(Level.FINER, "Terminated fleet instances [" + join(", ", terminatedFleetInstances) + "]");
LOGGER.log(Level.FINER, "Terminated fleet instances [" + join(", ", terminatedInstanceIds) + "]");

// Remove dying fleet instances from Jenkins
for (final String instance : dyingFleetInstancesCache) {
Expand Down Expand Up @@ -413,23 +413,6 @@ public synchronized FleetStateStats updateStatus() {
return curStatus;
}


/**
 * Checks a single instance against EC2 and tells whether it should be treated as terminated.
 *
 * @param ec2        client used to describe the instance
 * @param instanceId id of the instance to look up
 * @return {@code true} when the describe call returns no reservations, or when the first
 *         instance of the first reservation is terminated, stopped, stopping or shutting down
 */
private boolean isTerminated(final AmazonEC2 ec2, final String instanceId) {
    final DescribeInstancesRequest lookup = new DescribeInstancesRequest().withInstanceIds(instanceId);
    final DescribeInstancesResult described = ec2.describeInstances(lookup);

    // Can't find this instance, assume it is terminated
    if (described.getReservations().isEmpty()) {
        return true;
    }

    // a single id was requested, so only the first returned instance is inspected
    final Instance found = described.getReservations().get(0).getInstances().get(0);
    final String currentState = found.getState().getName();

    final Set<String> goneStates = new HashSet<String>();
    goneStates.add(InstanceStateName.Terminated.toString());
    goneStates.add(InstanceStateName.Stopped.toString());
    goneStates.add(InstanceStateName.Stopping.toString());
    goneStates.add(InstanceStateName.ShuttingDown.toString());

    return goneStates.contains(currentState);
}

private void addNewSlave(final AmazonEC2 ec2, final String instanceId) throws Exception {
// Generate a random FS root if one isn't specified
String fsRoot = this.fsRoot;
Expand Down
Loading

0 comments on commit 8947a3b

Please sign in to comment.