Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Integrated code lifecycle: Pause build agent #9348

Open
wants to merge 34 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
e70c8bd
pause and resume agents server side
BBesrour Sep 14, 2024
cf62138
validate build agent status before action
BBesrour Sep 14, 2024
7fc872a
add client buttons
BBesrour Sep 15, 2024
27a83aa
update build agent status in ui
BBesrour Sep 15, 2024
0ccc125
add client tests
BBesrour Sep 15, 2024
70c3029
add server tests
BBesrour Sep 15, 2024
991ac59
dont save result after grace period
BBesrour Sep 17, 2024
2a9cc33
make grace period configurable
BBesrour Sep 21, 2024
cc960b7
dont check if agent is paused
BBesrour Sep 22, 2024
fe6ba08
feedback
BBesrour Sep 22, 2024
0aab99a
add instance locks and use futures instead of sleep
BBesrour Sep 23, 2024
74591d1
use single lock
BBesrour Sep 23, 2024
33fbda9
update ui
BBesrour Sep 23, 2024
b102f65
feedback
BBesrour Sep 24, 2024
8e5ba4b
Merge branch 'develop' into feature/integrated-code-lifecycle/pause-b…
BBesrour Sep 28, 2024
f4caada
Merge branch 'develop' into feature/integrated-code-lifecycle/pause-b…
BBesrour Sep 28, 2024
1c36f2a
Merge branch 'develop' into feature/integrated-code-lifecycle/pause-b…
BBesrour Sep 30, 2024
05650a8
fix client coverage
BBesrour Oct 1, 2024
36b0d91
Merge branch 'develop' into feature/integrated-code-lifecycle/pause-b…
BBesrour Oct 6, 2024
a891aa2
Merge branch 'develop' into feature/integrated-code-lifecycle/pause-b…
BBesrour Oct 8, 2024
5f69fad
Merge branch 'develop' into feature/integrated-code-lifecycle/pause-b…
BBesrour Oct 12, 2024
b8a8c2a
Merge branch 'develop' into feature/integrated-code-lifecycle/pause-b…
BBesrour Oct 13, 2024
6277db6
fix style
BBesrour Oct 13, 2024
6d33712
fix test
BBesrour Oct 13, 2024
15d862c
fix test
BBesrour Oct 13, 2024
89a652e
fix test
BBesrour Oct 13, 2024
3f1db83
fix test
BBesrour Oct 13, 2024
c341686
Merge branch 'develop' into feature/integrated-code-lifecycle/pause-b…
BBesrour Oct 13, 2024
fb398ac
fix test
BBesrour Oct 13, 2024
c00cdc7
fix test
BBesrour Oct 13, 2024
fefab08
Merge branch 'develop' into feature/integrated-code-lifecycle/pause-b…
BBesrour Oct 14, 2024
b02311b
Merge branch 'develop' into feature/integrated-code-lifecycle/pause-b…
BBesrour Oct 18, 2024
88cd16c
tests
BBesrour Oct 18, 2024
8aa86fc
tests
BBesrour Oct 18, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@
// in the future are migrated or cleared. Changes should be communicated in release notes as potentially breaking changes.
@JsonIgnoreProperties(ignoreUnknown = true)
@JsonInclude(JsonInclude.Include.NON_EMPTY)
public record BuildAgentInformation(String name, int maxNumberOfConcurrentBuildJobs, int numberOfCurrentBuildJobs, List<BuildJobQueueItem> runningBuildJobs, boolean status,
List<BuildJobQueueItem> recentBuildJobs, String publicSshKey) implements Serializable {
public record BuildAgentInformation(String name, int maxNumberOfConcurrentBuildJobs, int numberOfCurrentBuildJobs, List<BuildJobQueueItem> runningBuildJobs,
BuildAgentStatus status, List<BuildJobQueueItem> recentBuildJobs, String publicSshKey) implements Serializable {

@Serial
private static final long serialVersionUID = 1L;
Expand All @@ -27,4 +27,8 @@ public BuildAgentInformation(BuildAgentInformation agentInformation, List<BuildJ
this(agentInformation.name(), agentInformation.maxNumberOfConcurrentBuildJobs(), agentInformation.numberOfCurrentBuildJobs(), agentInformation.runningBuildJobs,
agentInformation.status(), recentBuildJobs, agentInformation.publicSshKey());
}

/**
 * Coarse state of a build agent as carried in the {@code status} component of the enclosing record.
 */
public enum BuildAgentStatus {

    /** The agent is not paused and is currently processing at least one build job. */
    ACTIVE,

    /** The agent is not paused and is currently processing no build job. */
    IDLE,

    /** The agent has been paused; this takes precedence over {@link #ACTIVE} and {@link #IDLE}. */
    PAUSED
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -170,9 +170,12 @@ public CompletableFuture<BuildResult> executeBuildJob(BuildJobQueueItem buildJob
}
}
});
futureResult.whenComplete(((result, throwable) -> runningFutures.remove(buildJobItem.id())));

return futureResult;
return futureResult.whenComplete(((result, throwable) -> runningFutures.remove(buildJobItem.id())));
}

/**
 * Returns a snapshot of the ids of all build jobs that currently have an entry in {@code runningFutures}.
 * The result is an unmodifiable copy, so later changes to {@code runningFutures} are not reflected in it.
 *
 * @return an unmodifiable set with the ids of the build jobs tracked at the time of the call
 */
Set<String> getRunningBuildJobIds() {
    final Set<String> trackedJobIds = runningFutures.keySet();
    return Set.copyOf(trackedJobIds);
}

/**
Expand Down Expand Up @@ -227,7 +230,7 @@ private void finishBuildJobExceptionally(String buildJobId, String containerName
*
* @param buildJobId The id of the build job that should be cancelled.
*/
private void cancelBuildJob(String buildJobId) {
void cancelBuildJob(String buildJobId) {
Future<BuildResult> future = runningFutures.get(buildJobId);
if (future != null) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import static de.tum.cit.aet.artemis.core.config.Constants.PROFILE_BUILDAGENT;

import java.time.Duration;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.List;
Expand All @@ -12,6 +13,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
Expand All @@ -23,7 +25,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Profile;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

Expand All @@ -33,6 +37,7 @@
import com.hazelcast.collection.ItemListener;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.map.IMap;
import com.hazelcast.topic.ITopic;

import de.tum.cit.aet.artemis.buildagent.dto.BuildAgentInformation;
import de.tum.cit.aet.artemis.buildagent.dto.BuildJobQueueItem;
Expand Down Expand Up @@ -64,6 +69,8 @@ public class SharedQueueProcessingService {

private final BuildAgentSshKeyService buildAgentSSHKeyService;

private final TaskScheduler taskScheduler;

private IQueue<BuildJobQueueItem> queue;

private IQueue<ResultQueueItem> resultQueue;
Expand All @@ -82,13 +89,26 @@ public class SharedQueueProcessingService {

private UUID listenerId;

/**
* Scheduled future for checking availability and processing next build job.
*/
private ScheduledFuture<?> scheduledFuture;

private volatile boolean isPaused = false;

private volatile boolean processResults = true;
BBesrour marked this conversation as resolved.
Show resolved Hide resolved

@Value("${artemis.continuous-integration.pause-grace-period-seconds:15}")
private int pauseGracePeriodSeconds;

BBesrour marked this conversation as resolved.
Show resolved Hide resolved
public SharedQueueProcessingService(@Qualifier("hazelcastInstance") HazelcastInstance hazelcastInstance, ExecutorService localCIBuildExecutorService,
BuildJobManagementService buildJobManagementService, BuildLogsMap buildLogsMap, BuildAgentSshKeyService buildAgentSSHKeyService) {
BuildJobManagementService buildJobManagementService, BuildLogsMap buildLogsMap, BuildAgentSshKeyService buildAgentSSHKeyService, TaskScheduler taskScheduler) {
this.hazelcastInstance = hazelcastInstance;
this.localCIBuildExecutorService = (ThreadPoolExecutor) localCIBuildExecutorService;
this.buildJobManagementService = buildJobManagementService;
this.buildLogsMap = buildLogsMap;
this.buildAgentSSHKeyService = buildAgentSSHKeyService;
this.taskScheduler = taskScheduler;
}

/**
Expand All @@ -101,6 +121,28 @@ public void init() {
this.queue = this.hazelcastInstance.getQueue("buildJobQueue");
this.resultQueue = this.hazelcastInstance.getQueue("buildResultQueue");
this.listenerId = this.queue.addItemListener(new QueuedBuildJobItemListener(), true);

/*
* Check every 10 seconds whether the node has at least one thread available for a new build job.
* If so, process the next build job.
* This is a backup mechanism in case the build queue is not empty, no new build jobs are entering the queue and the
* node otherwise stopped checking for build jobs in the queue.
*/
scheduledFuture = taskScheduler.scheduleAtFixedRate(this::checkAvailabilityAndProcessNextBuild, Duration.ofSeconds(10));

ITopic<String> pauseBuildAgentTopic = hazelcastInstance.getTopic("pauseBuildAgentTopic");
pauseBuildAgentTopic.addMessageListener(message -> {
if (message.getMessageObject().equals(hazelcastInstance.getCluster().getLocalMember().getAddress().toString())) {
pauseBuildAgent();
}
});

ITopic<String> resumeBuildAgentTopic = hazelcastInstance.getTopic("resumeBuildAgentTopic");
resumeBuildAgentTopic.addMessageListener(message -> {
if (message.getMessageObject().equals(hazelcastInstance.getCluster().getLocalMember().getAddress().toString())) {
resumeBuildAgent();
}
});
BBesrour marked this conversation as resolved.
Show resolved Hide resolved
}

@PreDestroy
Expand All @@ -127,17 +169,6 @@ public void updateBuildAgentInformation() {
}
}

/**
 * Periodic fallback trigger for build job processing.
 * <p>
 * Invoked at a fixed rate of 10 seconds (10000 ms) and delegates to {@code checkAvailabilityAndProcessNextBuild()},
 * which checks whether the node has at least one thread available for a new build job and, if so, processes the next one.
 * This is a backup mechanism in case the build queue is not empty, no new build jobs are entering the queue and the
 * node otherwise stopped checking for build jobs in the queue.
 */
@Scheduled(fixedRate = 10000)
public void checkForBuildJobs() {
checkAvailabilityAndProcessNextBuild();
}

/**
* Checks whether the node has at least one thread available for a new build job.
* If so, process the next build job.
Expand All @@ -158,14 +189,14 @@ private void checkAvailabilityAndProcessNextBuild() {
return;
}

if (queue.isEmpty()) {
if (queue.isEmpty() || isPaused) {
return;
}
BuildJobQueueItem buildJob = null;
instanceLock.lock();
try {
// Recheck conditions after acquiring the lock to ensure they are still valid
if (!nodeIsAvailable() || queue.isEmpty()) {
if (!nodeIsAvailable() || queue.isEmpty() || isPaused) {
BBesrour marked this conversation as resolved.
Show resolved Hide resolved
return;
}

Expand Down Expand Up @@ -241,7 +272,9 @@ private BuildAgentInformation getUpdatedLocalBuildAgentInformation(BuildJobQueue
List<BuildJobQueueItem> processingJobsOfMember = getProcessingJobsOfNode(memberAddress);
int numberOfCurrentBuildJobs = processingJobsOfMember.size();
int maxNumberOfConcurrentBuilds = localCIBuildExecutorService.getMaximumPoolSize();
boolean active = numberOfCurrentBuildJobs > 0;
boolean hasJobs = numberOfCurrentBuildJobs > 0;
BuildAgentInformation.BuildAgentStatus status = isPaused ? BuildAgentInformation.BuildAgentStatus.PAUSED
: hasJobs ? BuildAgentInformation.BuildAgentStatus.ACTIVE : BuildAgentInformation.BuildAgentStatus.IDLE;
BBesrour marked this conversation as resolved.
Show resolved Hide resolved
BuildAgentInformation agent = buildAgentInformation.get(memberAddress);
List<BuildJobQueueItem> recentBuildJobs;
if (agent != null) {
Expand All @@ -260,7 +293,7 @@ private BuildAgentInformation getUpdatedLocalBuildAgentInformation(BuildJobQueue

String publicSshKey = buildAgentSSHKeyService.getPublicKeyAsString();

return new BuildAgentInformation(memberAddress, maxNumberOfConcurrentBuilds, numberOfCurrentBuildJobs, processingJobsOfMember, active, recentBuildJobs, publicSshKey);
return new BuildAgentInformation(memberAddress, maxNumberOfConcurrentBuilds, numberOfCurrentBuildJobs, processingJobsOfMember, status, recentBuildJobs, publicSshKey);
}

private List<BuildJobQueueItem> getProcessingJobsOfNode(String memberAddress) {
Expand Down Expand Up @@ -304,7 +337,12 @@ private void processBuild(BuildJobQueueItem buildJob) {
buildLogsMap.removeBuildLogs(buildJob.id());

ResultQueueItem resultQueueItem = new ResultQueueItem(buildResult, finishedJob, buildLogs, null);
resultQueue.add(resultQueueItem);
if (processResults) {
resultQueue.add(resultQueueItem);
}
else {
log.info("Build agent is paused. Not adding build result to result queue for build job: {}", buildJob);
}
BBesrour marked this conversation as resolved.
Show resolved Hide resolved

// after processing a build job, remove it from the processing jobs
processingJobs.remove(buildJob.id());
Expand Down Expand Up @@ -339,7 +377,12 @@ private void processBuild(BuildJobQueueItem buildJob) {
failedResult.setBuildLogEntries(buildLogs);

ResultQueueItem resultQueueItem = new ResultQueueItem(failedResult, job, buildLogs, ex);
resultQueue.add(resultQueueItem);
if (processResults) {
resultQueue.add(resultQueueItem);
}
else {
log.info("Build agent is paused. Not adding build result to result queue for build job: {}", buildJob);
}
BBesrour marked this conversation as resolved.
Show resolved Hide resolved

processingJobs.remove(buildJob.id());
localProcessingJobs.decrementAndGet();
Expand All @@ -350,6 +393,71 @@ private void processBuild(BuildJobQueueItem buildJob) {
});
}

private void pauseBuildAgent() {
if (this.isPaused) {
BBesrour marked this conversation as resolved.
Show resolved Hide resolved
log.info("Build agent is already paused");
return;
}

log.info("Pausing build agent with address {}", hazelcastInstance.getCluster().getLocalMember().getAddress().toString());

this.isPaused = true;
this.removeListener();
if (this.scheduledFuture != null && !this.scheduledFuture.isCancelled()) {
this.scheduledFuture.cancel(false);
}
updateLocalBuildAgentInformation();
BBesrour marked this conversation as resolved.
Show resolved Hide resolved

log.info("Gracefully cancelling running build jobs");

if (buildJobManagementService.getRunningBuildJobIds().isEmpty()) {
log.info("No running build jobs to cancel");
}
else {
// Sleep for 10 seconds to allow the build jobs to be finished. If they are not finished, they will be cancelled.
BBesrour marked this conversation as resolved.
Show resolved Hide resolved
BBesrour marked this conversation as resolved.
Show resolved Hide resolved
try {
Thread.sleep(pauseGracePeriodSeconds * 1000L);
BBesrour marked this conversation as resolved.
Show resolved Hide resolved
}
catch (InterruptedException e) {
log.error("Error while pausing build agent", e);
}
BBesrour marked this conversation as resolved.
Show resolved Hide resolved
BBesrour marked this conversation as resolved.
Show resolved Hide resolved

if (!this.isPaused) {
log.info("Build agent was resumed before the build jobs could be cancelled");
return;
}

this.processResults = false;
Set<String> runningBuildJobIds = buildJobManagementService.getRunningBuildJobIds();
List<BuildJobQueueItem> runningBuildJobs = processingJobs.getAll(runningBuildJobIds).values().stream().toList();
runningBuildJobIds.forEach(buildJobManagementService::cancelBuildJob);
this.queue.addAll(runningBuildJobs);
log.info("Cancelled running build jobs and added them back to the queue with Ids {}", runningBuildJobIds);
log.debug("Cancelled running build jobs: {}", runningBuildJobs);
BBesrour marked this conversation as resolved.
Show resolved Hide resolved

}
}

/**
 * Resumes this build agent after it has been paused; does nothing apart from logging if it is not paused.
 * <p>
 * Clears the paused state, re-enables result processing, re-registers the queue item listener, reschedules the
 * periodic availability check (every 10 seconds), triggers one check immediately and finally calls
 * {@code updateLocalBuildAgentInformation()}.
 */
private void resumeBuildAgent() {
    final boolean alreadyRunning = !this.isPaused;
    if (alreadyRunning) {
        log.info("Build agent is already running");
        return;
    }

    final String localAddress = hazelcastInstance.getCluster().getLocalMember().getAddress().toString();
    log.info("Resuming build agent with address {}", localAddress);

    this.isPaused = false;
    this.processResults = true;

    // Drop any existing listener and scheduled task before creating new ones to avoid race conditions.
    this.removeListener();
    this.listenerId = this.queue.addItemListener(new QueuedBuildJobItemListener(), true);

    final ScheduledFuture<?> previousCheck = this.scheduledFuture;
    if (previousCheck != null && !previousCheck.isCancelled()) {
        previousCheck.cancel(false);
    }
    this.scheduledFuture = taskScheduler.scheduleAtFixedRate(this::checkAvailabilityAndProcessNextBuild, Duration.ofSeconds(10));

    checkAvailabilityAndProcessNextBuild();
    updateLocalBuildAgentInformation();
}

/**
* Checks whether the node has at least one thread available for a new build job.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PutMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
Expand Down Expand Up @@ -197,4 +198,46 @@ public ResponseEntity<BuildJobsStatisticsDTO> getBuildJobStatistics(@RequestPara
BuildJobsStatisticsDTO buildJobStatistics = BuildJobsStatisticsDTO.of(buildJobResultCountDtos);
return ResponseEntity.ok(buildJobStatistics);
}

/**
* {@code PUT /api/admin/agent/{agentName}/pause} : Pause the specified build agent.
* This endpoint allows administrators to pause a specific build agent by its name.
* Pausing a build agent will prevent it from picking up any new build jobs until it is resumed.
*
* <p>
* <strong>Authorization:</strong> This operation requires admin privileges, enforced by {@code @EnforceAdmin}.
* </p>
*
* @param agentName the name of the build agent to be paused (provided as a path variable)
* @return {@link ResponseEntity} with status code 204 (No Content) if the agent was successfully paused
* or an appropriate error response if something went wrong
*/
BBesrour marked this conversation as resolved.
Show resolved Hide resolved
@PutMapping("agent/{agentName}/pause")
@EnforceAdmin
public ResponseEntity<Void> pauseBuildAgent(@PathVariable String agentName) {
log.debug("REST request to pause agent {}", agentName);
BBesrour marked this conversation as resolved.
Show resolved Hide resolved
localCIBuildJobQueueService.pauseBuildAgent(agentName);
return ResponseEntity.noContent().build();
}
BBesrour marked this conversation as resolved.
Show resolved Hide resolved

/**
* {@code PUT /api/admin/agent/{agentName}/resume} : Resume the specified build agent.
* This endpoint allows administrators to resume a specific build agent by its name.
* Resuming a build agent will allow it to pick up new build jobs again.
*
* <p>
* <strong>Authorization:</strong> This operation requires admin privileges, enforced by {@code @EnforceAdmin}.
* </p>
*
* @param agentName the name of the build agent to be resumed (provided as a path variable)
* @return {@link ResponseEntity} with status code 204 (No Content) if the agent was successfully resumed
* or an appropriate error response if something went wrong
*/
BBesrour marked this conversation as resolved.
Show resolved Hide resolved
@PutMapping("agent/{agentName}/resume")
@EnforceAdmin
public ResponseEntity<Void> resumeBuildAgent(@PathVariable String agentName) {
log.debug("REST request to resume agent {}", agentName);
BBesrour marked this conversation as resolved.
Show resolved Hide resolved
localCIBuildJobQueueService.resumeBuildAgent(agentName);
return ResponseEntity.noContent().build();
}
BBesrour marked this conversation as resolved.
Show resolved Hide resolved
BBesrour marked this conversation as resolved.
Show resolved Hide resolved
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ public class SharedQueueManagementService {

private ITopic<String> canceledBuildJobsTopic;

private ITopic<String> pauseBuildAgentTopic;

private ITopic<String> resumeBuildAgentTopic;

public SharedQueueManagementService(BuildJobRepository buildJobRepository, @Qualifier("hazelcastInstance") HazelcastInstance hazelcastInstance, ProfileService profileService) {
this.buildJobRepository = buildJobRepository;
this.hazelcastInstance = hazelcastInstance;
Expand All @@ -83,6 +87,8 @@ public void init() {
this.queue = this.hazelcastInstance.getQueue("buildJobQueue");
this.canceledBuildJobsTopic = hazelcastInstance.getTopic("canceledBuildJobsTopic");
this.dockerImageCleanupInfo = this.hazelcastInstance.getMap("dockerImageCleanupInfo");
this.pauseBuildAgentTopic = hazelcastInstance.getTopic("pauseBuildAgentTopic");
this.resumeBuildAgentTopic = hazelcastInstance.getTopic("resumeBuildAgentTopic");
}

/**
Expand Down Expand Up @@ -135,6 +141,14 @@ public List<BuildAgentInformation> getBuildAgentInformationWithoutRecentBuildJob
agent.numberOfCurrentBuildJobs(), agent.runningBuildJobs(), agent.status(), null, null)).toList();
}

/**
 * Publishes a pause request for the given build agent on the {@code pauseBuildAgentTopic} Hazelcast topic.
 * The call only publishes the message; it does not wait for any agent to react.
 *
 * @param agent identifier of the build agent to pause, sent as the message payload
 *                  (presumably the agent's Hazelcast member address string — verify against the topic subscribers)
 */
public void pauseBuildAgent(String agent) {
    final ITopic<String> pauseTopic = this.pauseBuildAgentTopic;
    pauseTopic.publish(agent);
}

/**
 * Publishes a resume request for the given build agent on the {@code resumeBuildAgentTopic} Hazelcast topic.
 * The call only publishes the message; it does not wait for any agent to react.
 *
 * @param agent identifier of the build agent to resume, sent as the message payload
 *                  (presumably the agent's Hazelcast member address string — verify against the topic subscribers)
 */
public void resumeBuildAgent(String agent) {
    final ITopic<String> resumeTopic = this.resumeBuildAgentTopic;
    resumeTopic.publish(agent);
}

/**
* Cancel a build job by removing it from the queue or stopping the build process.
*
Expand Down
Loading
Loading