Skip to content

Commit

Permalink
[Fix] Fix computation of excess workload and available capacity (#371)
Browse files Browse the repository at this point in the history
* [Fix] Fix computation of excess workload and available capacity

#322

#359

* Update src/main/java/com/amazon/jenkins/ec2fleet/NoDelayProvisionStrategy.java

Co-authored-by: Jerad C <[email protected]>

---------

Co-authored-by: Jerad C <[email protected]>
  • Loading branch information
pdk27 and cjerad committed Jun 1, 2023
1 parent 36ed713 commit 6bce254
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,63 +30,64 @@ public NodeProvisioner.StrategyDecision apply(final NodeProvisioner.StrategyStat
final Label label = strategyState.getLabel();

final LoadStatistics.LoadStatisticsSnapshot snapshot = strategyState.getSnapshot();
final int availableCapacity =
snapshot.getAvailableExecutors() // live executors
+ snapshot.getConnectingExecutors() // executors present but not yet connected
+ strategyState.getPlannedCapacitySnapshot() // capacity added by previous strategies from previous rounds
+ strategyState.getAdditionalPlannedCapacity(); // capacity added by previous strategies _this round_

int currentDemand = snapshot.getQueueLength() - availableCapacity;
LOGGER.log(currentDemand < 1 ? Level.FINE : Level.INFO,
"label [{0}]: currentDemand {1} availableCapacity {2} (availableExecutors {3} connectingExecutors {4} plannedCapacitySnapshot {5} additionalPlannedCapacity {6})",
new Object[]{label, currentDemand, availableCapacity, snapshot.getAvailableExecutors(),
snapshot.getConnectingExecutors(), strategyState.getPlannedCapacitySnapshot(),
strategyState.getAdditionalPlannedCapacity()});

for (final Cloud cloud : getClouds()) {
if (currentDemand < 1) {
LOGGER.log(Level.FINE, "label [{0}]: currentDemand is less than 1, not provisioning", label);
final int availableCapacity = snapshot.getAvailableExecutors() // available executors
+ strategyState.getPlannedCapacitySnapshot() // capacity added by previous strategies from previous rounds
+ strategyState.getAdditionalPlannedCapacity(); // capacity added by previous strategies _this round_

int qLen = snapshot.getQueueLength();
int excessWorkload = qLen - availableCapacity;
LOGGER.log(Level.FINE, "label [{0}]: queueLength {1} availableCapacity {2} (availableExecutors {3} plannedCapacitySnapshot {4} additionalPlannedCapacity {5})",
new Object[]{label, qLen, availableCapacity, snapshot.getAvailableExecutors(),
strategyState.getPlannedCapacitySnapshot(), strategyState.getAdditionalPlannedCapacity()});

if (excessWorkload <= 0) {
LOGGER.log(Level.INFO, "label [{0}]: No excess workload, provisioning not needed.", label);
return NodeProvisioner.StrategyDecision.PROVISIONING_COMPLETED;
}

for (final Cloud c : getClouds()) {
if (excessWorkload < 1) {
break;
}

if (!(cloud instanceof EC2FleetCloud)) {
if (!(c instanceof EC2FleetCloud)) {
LOGGER.log(Level.FINE, "label [{0}]: cloud {1} is not an EC2FleetCloud, continuing...",
new Object[]{label, cloud.getDisplayName()});
new Object[]{label, c.getDisplayName()});
continue;
}

Cloud.CloudState cloudState = new Cloud.CloudState(label, strategyState.getAdditionalPlannedCapacity());
if (!cloud.canProvision(cloudState)) {
if (!c.canProvision(cloudState)) {
LOGGER.log(Level.INFO, "label [{0}]: cloud {1} can not provision for this label, continuing...",
new Object[]{label, cloud.getDisplayName()});
new Object[]{label, c.getDisplayName()});
continue;
}

final EC2FleetCloud ec2 = (EC2FleetCloud) cloud;
if (!ec2.isNoDelayProvision()) {
if (!((EC2FleetCloud) c).isNoDelayProvision()) {
LOGGER.log(Level.FINE, "label [{0}]: cloud {1} does not use No Delay Provision Strategy, continuing...",
new Object[]{label, cloud.getDisplayName()});
new Object[]{label, c.getDisplayName()});
continue;
}

LOGGER.log(Level.INFO, "label [{0}]: cloud {1} can provision for this label",
new Object[]{label, cloud.getDisplayName()});
final Collection<NodeProvisioner.PlannedNode> plannedNodes = cloud.provision(cloudState, currentDemand);
for (NodeProvisioner.PlannedNode plannedNode : plannedNodes) {
currentDemand -= plannedNode.numExecutors;
LOGGER.log(Level.FINE, "label [{0}]: cloud {1} can provision for this label",
new Object[]{label, c.getDisplayName()});
final Collection<NodeProvisioner.PlannedNode> plannedNodes = c.provision(cloudState, excessWorkload);
for (NodeProvisioner.PlannedNode pn : plannedNodes) {
excessWorkload -= pn.numExecutors;
LOGGER.log(Level.INFO, "Started provisioning {0} from {1} with {2,number,integer} "
+ "executors. Remaining excess workload: {3,number,#.###}",
new Object[]{pn.displayName, c.name, pn.numExecutors, excessWorkload});
}
LOGGER.log(Level.FINE, "Planned {0} new nodes", plannedNodes.size());
strategyState.recordPendingLaunches(plannedNodes);
LOGGER.log(Level.FINE, "After provisioning currentDemand={0}", new Object[]{currentDemand});
}

if (currentDemand < 1) {
LOGGER.log(Level.FINE, "Provisioning completed");
return NodeProvisioner.StrategyDecision.PROVISIONING_COMPLETED;
} else {
if (excessWorkload > 0) {
LOGGER.log(Level.FINE, "Provisioning not complete, consulting remaining strategies");
return NodeProvisioner.StrategyDecision.CONSULT_REMAINING_STRATEGIES;
}

LOGGER.log(Level.FINE, "Provisioning completed");
return NodeProvisioner.StrategyDecision.PROVISIONING_COMPLETED;
}

// Visible for testing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,4 +221,32 @@ public void givenEC2Clouds_shouldReduceAsAmountOfExecutors() {
verify(ec2FleetCloud1, times(1)).provision(any(Cloud.CloudState.class), eq(2));
}

@Test
public void givenPlannedCapacity_shouldComputeExcessWorkloadCorrectly() {
when(snapshot.getQueueLength()).thenReturn(6);
when(snapshot.getAvailableExecutors()).thenReturn(0);
when(state.getPlannedCapacitySnapshot()).thenReturn(3);
when(state.getLabel()).thenReturn(label);

final EC2FleetCloud ec2FleetCloud1 = mock(EC2FleetCloud.class);
clouds.add(ec2FleetCloud1);
when(ec2FleetCloud1.canProvision(any(Cloud.CloudState.class))).thenReturn(true);
when(ec2FleetCloud1.isNoDelayProvision()).thenReturn(true);
final NodeProvisioner.PlannedNode plannedNode1 = new NodeProvisioner.PlannedNode("fc1-0", new CompletableFuture<>(), 2);
when(ec2FleetCloud1.provision(any(Cloud.CloudState.class), anyInt())).thenReturn(Arrays.asList(plannedNode1));

final EC2FleetCloud ec2FleetCloud2 = mock(EC2FleetCloud.class);
clouds.add(ec2FleetCloud2);
when(ec2FleetCloud2.canProvision(any(Cloud.CloudState.class))).thenReturn(true);
when(ec2FleetCloud2.isNoDelayProvision()).thenReturn(true);
final NodeProvisioner.PlannedNode plannedNode2 = new NodeProvisioner.PlannedNode("fc2-0", new CompletableFuture<>(), 1);
when(ec2FleetCloud2.provision(any(Cloud.CloudState.class), anyInt())).thenReturn(Arrays.asList(plannedNode2));

final NodeProvisioner.StrategyDecision decision = strategy.apply(state);

Assert.assertEquals(NodeProvisioner.StrategyDecision.PROVISIONING_COMPLETED, decision);
verify(ec2FleetCloud1, times(1)).provision(any(Cloud.CloudState.class), eq(3));
verify(ec2FleetCloud2, times(1)).provision(any(Cloud.CloudState.class), eq(1));
}

}

0 comments on commit 6bce254

Please sign in to comment.