diff --git a/src/main/java/de/tum/in/www1/artemis/config/websocket/WebsocketConfiguration.java b/src/main/java/de/tum/in/www1/artemis/config/websocket/WebsocketConfiguration.java index 91c45691f4dc..ef253d5cdc73 100644 --- a/src/main/java/de/tum/in/www1/artemis/config/websocket/WebsocketConfiguration.java +++ b/src/main/java/de/tum/in/www1/artemis/config/websocket/WebsocketConfiguration.java @@ -2,6 +2,8 @@ import static de.tum.in.www1.artemis.web.websocket.ResultWebsocketService.getExerciseIdFromNonPersonalExerciseResultDestination; import static de.tum.in.www1.artemis.web.websocket.ResultWebsocketService.isNonPersonalExerciseResultDestination; +import static de.tum.in.www1.artemis.web.websocket.localci.LocalCIBuildQueueWebsocketService.isBuildQueueAdminDestination; +import static de.tum.in.www1.artemis.web.websocket.localci.LocalCIBuildQueueWebsocketService.isBuildQueueCourseDestination; import static de.tum.in.www1.artemis.web.websocket.team.ParticipationTeamWebsocketService.*; import java.net.InetSocketAddress; @@ -46,13 +48,11 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.Iterators; +import de.tum.in.www1.artemis.domain.Course; import de.tum.in.www1.artemis.domain.Exercise; import de.tum.in.www1.artemis.domain.User; import de.tum.in.www1.artemis.domain.participation.StudentParticipation; -import de.tum.in.www1.artemis.repository.ExamRepository; -import de.tum.in.www1.artemis.repository.ExerciseRepository; -import de.tum.in.www1.artemis.repository.StudentParticipationRepository; -import de.tum.in.www1.artemis.repository.UserRepository; +import de.tum.in.www1.artemis.repository.*; import de.tum.in.www1.artemis.security.Role; import de.tum.in.www1.artemis.security.jwt.JWTFilter; import de.tum.in.www1.artemis.security.jwt.TokenProvider; @@ -96,9 +96,11 @@ public class WebsocketConfiguration extends DelegatingWebSocketMessageBrokerConf @Value("${spring.websocket.broker.password}") private String brokerPassword; + private final CourseRepository courseRepository; + public WebsocketConfiguration(MappingJackson2HttpMessageConverter springMvcJacksonConverter, TaskScheduler messageBrokerTaskScheduler, TokenProvider tokenProvider, StudentParticipationRepository studentParticipationRepository, AuthorizationCheckService authorizationCheckService, ExerciseRepository exerciseRepository, - UserRepository userRepository, ExamRepository examRepository) { + UserRepository userRepository, ExamRepository examRepository, CourseRepository courseRepository) { this.objectMapper = springMvcJacksonConverter.getObjectMapper(); this.messageBrokerTaskScheduler = messageBrokerTaskScheduler; this.tokenProvider = tokenProvider; @@ -107,6 +109,7 @@ public WebsocketConfiguration(MappingJackson2HttpMessageConverter springMvcJacks this.exerciseRepository = exerciseRepository; this.userRepository = userRepository; this.examRepository = examRepository; + this.courseRepository = courseRepository; } @Override @@ -262,12 +265,34 @@ public Message preSend(@NotNull Message message, @NotNull MessageChannel c /** * Returns whether the subscription of the given principal to the given destination is permitted + * Database calls should be avoided as much as possible in this method. + * Only for very specific topics, database calls are allowed. * * @param principal User principal of the user who wants to subscribe * @param destination Destination topic to which the user wants to subscribe * @return flag whether subscription is allowed */ private boolean allowSubscription(Principal principal, String destination) { + /* + * IMPORTANT: Avoid database calls in this method as much as possible (e.g. checking if the user + * is an instructor in a course) + * This method is called for every subscription request, so it should be as fast as possible. + * If you need to do a database call, make sure to first check if the destination is valid for your specific + * use case. + */ + + if (isBuildQueueAdminDestination(destination)) { + var user = userRepository.getUserWithAuthorities(principal.getName()); + return authorizationCheckService.isAdmin(user); + } + + Optional courseId = isBuildQueueCourseDestination(destination); + if (courseId.isPresent()) { + Course course = courseRepository.findByIdElseThrow(courseId.get()); + var user = userRepository.getUserWithGroupsAndAuthorities(principal.getName()); + return authorizationCheckService.isAtLeastInstructorInCourse(course, user); + } + if (isParticipationTeamDestination(destination)) { Long participationId = getParticipationIdFromDestination(destination); return isParticipationOwnedByUser(principal, participationId); @@ -288,7 +313,7 @@ private boolean allowSubscription(Principal principal, String destination) { var examId = getExamIdFromExamRootDestination(destination); if (examId.isPresent()) { var exam = examRepository.findByIdElseThrow(examId.get()); - User user = userRepository.getUserWithGroupsAndAuthorities(principal.getName()); + var user = userRepository.getUserWithGroupsAndAuthorities(principal.getName()); return authorizationCheckService.isAtLeastInstructorInCourse(exam.getCourse(), user); } return true; diff --git a/src/main/java/de/tum/in/www1/artemis/repository/UserRepository.java b/src/main/java/de/tum/in/www1/artemis/repository/UserRepository.java index dbf6fc927c34..a24406cc6861 100644 --- a/src/main/java/de/tum/in/www1/artemis/repository/UserRepository.java +++ b/src/main/java/de/tum/in/www1/artemis/repository/UserRepository.java @@ -74,6 +74,9 @@ public interface UserRepository extends JpaRepository, JpaSpecificat @EntityGraph(type = LOAD, attributePaths = { "groups", "authorities" }) Optional findOneWithGroupsAndAuthoritiesByLogin(String login); + @EntityGraph(type = LOAD, attributePaths = { "authorities" }) + Optional findOneWithAuthoritiesByLogin(String login); + @EntityGraph(type = LOAD, attributePaths = { "groups", "authorities" }) Optional findOneWithGroupsAndAuthoritiesByEmail(String email); @@ -499,6 +502,17 @@ default User getUserWithGroupsAndAuthorities() { return unwrapOptionalUser(user, currentUserLogin); } + /** + * Get user with authorities of currently logged-in user + * + * @return currently logged-in user + */ + default User getUserWithAuthorities() { + String currentUserLogin = getCurrentUserLogin(); + Optional user = findOneWithAuthoritiesByLogin(currentUserLogin); + return unwrapOptionalUser(user, currentUserLogin); + } + /** * Get user with user groups, authorities and organizations of currently logged-in user * @@ -549,6 +563,17 @@ default User getUserWithGroupsAndAuthorities(@NotNull String username) { return unwrapOptionalUser(user, username); } + /** + * Get user with authorities with the username (i.e. user.getLogin() or principal.getName()) + * + * @param username the username of the user who should be retrieved from the database + * @return the user that belongs to the given principal with eagerly loaded authorities + */ + default User getUserWithAuthorities(@NotNull String username) { + Optional user = findOneWithAuthoritiesByLogin(username); + return unwrapOptionalUser(user, username); + } + /** * Finds a single user with groups and authorities using the registration number * diff --git a/src/main/java/de/tum/in/www1/artemis/service/connectors/localci/LocalCIQueueWebsocketService.java b/src/main/java/de/tum/in/www1/artemis/service/connectors/localci/LocalCIQueueWebsocketService.java new file mode 100644 index 000000000000..088e1f2be8a5 --- /dev/null +++ b/src/main/java/de/tum/in/www1/artemis/service/connectors/localci/LocalCIQueueWebsocketService.java @@ -0,0 +1,133 @@ +package de.tum.in.www1.artemis.service.connectors.localci; + +import java.util.Objects; + +import javax.annotation.PostConstruct; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.context.annotation.Profile; +import org.springframework.stereotype.Service; + +import com.hazelcast.collection.IQueue; +import com.hazelcast.collection.ItemEvent; +import com.hazelcast.collection.ItemListener; +import com.hazelcast.core.HazelcastInstance; +import com.hazelcast.map.IMap; +import com.hazelcast.map.listener.EntryAddedListener; +import com.hazelcast.map.listener.EntryRemovedListener; + +import de.tum.in.www1.artemis.service.connectors.localci.dto.LocalCIBuildAgentInformation; +import de.tum.in.www1.artemis.service.connectors.localci.dto.LocalCIBuildJobQueueItem; +import de.tum.in.www1.artemis.web.websocket.localci.LocalCIBuildQueueWebsocketService; + +/** + * This service is responsible for sending build job queue information over websockets. + * It listens to changes in the build job queue and sends the updated information to the client. + * NOTE: This service is only active if the profile "localci" and "scheduling" are active. This avoids sending the + * same information multiple times and thus also avoids unnecessary load on the server. + */ +@Service +@Profile("localci & scheduling") +public class LocalCIQueueWebsocketService { + + private final Logger log = LoggerFactory.getLogger(LocalCIQueueWebsocketService.class); + + private final HazelcastInstance hazelcastInstance; + + private final IQueue queue; + + private final IMap processingJobs; + + private final IMap buildAgentInformation; + + private final LocalCIBuildQueueWebsocketService localCIBuildQueueWebsocketService; + + private final LocalCISharedBuildJobQueueService localCISharedBuildJobQueueService; + + /** + * Instantiates a new Local ci queue websocket service. + * + * @param hazelcastInstance the hazelcast instance + * @param localCIBuildQueueWebsocketService the local ci build queue websocket service + * @param localCISharedBuildJobQueueService the local ci shared build job queue service + */ + public LocalCIQueueWebsocketService(HazelcastInstance hazelcastInstance, LocalCIBuildQueueWebsocketService localCIBuildQueueWebsocketService, + LocalCISharedBuildJobQueueService localCISharedBuildJobQueueService) { + this.hazelcastInstance = hazelcastInstance; + this.localCIBuildQueueWebsocketService = localCIBuildQueueWebsocketService; + this.localCISharedBuildJobQueueService = localCISharedBuildJobQueueService; + this.queue = this.hazelcastInstance.getQueue("buildJobQueue"); + this.processingJobs = this.hazelcastInstance.getMap("processingJobs"); + this.buildAgentInformation = this.hazelcastInstance.getMap("buildAgentInformation"); + } + + /** + * Add listeners for build job queue changes. + */ + @PostConstruct + public void addListeners() { + this.queue.addItemListener(new QueuedBuildJobItemListener(), true); + this.processingJobs.addLocalEntryListener(new ProcessingBuildJobItemListener()); + this.buildAgentInformation.addLocalEntryListener(new BuildAgentListener()); + // localCIBuildQueueWebsocketService will be autowired only if scheduling is active + Objects.requireNonNull(localCIBuildQueueWebsocketService, "localCIBuildQueueWebsocketService must be non-null when scheduling is active."); + } + + private void sendQueuedJobsOverWebsocket(long courseId) { + localCIBuildQueueWebsocketService.sendQueuedBuildJobs(localCISharedBuildJobQueueService.getQueuedJobs()); + localCIBuildQueueWebsocketService.sendQueuedBuildJobsForCourse(courseId, localCISharedBuildJobQueueService.getQueuedJobsForCourse(courseId)); + } + + private void sendProcessingJobsOverWebsocket(long courseId) { + localCIBuildQueueWebsocketService.sendRunningBuildJobs(localCISharedBuildJobQueueService.getProcessingJobs()); + localCIBuildQueueWebsocketService.sendRunningBuildJobsForCourse(courseId, localCISharedBuildJobQueueService.getProcessingJobsForCourse(courseId)); + } + + private void sendBuildAgentInformationOverWebsocket() { + localCIBuildQueueWebsocketService.sendBuildAgentInformation(localCISharedBuildJobQueueService.getBuildAgentInformation()); + } + + private class QueuedBuildJobItemListener implements ItemListener { + + @Override + public void itemAdded(ItemEvent event) { + sendQueuedJobsOverWebsocket(event.getItem().getCourseId()); + } + + @Override + public void itemRemoved(ItemEvent event) { + sendQueuedJobsOverWebsocket(event.getItem().getCourseId()); + } + } + + private class ProcessingBuildJobItemListener implements EntryAddedListener, EntryRemovedListener { + + @Override + public void entryAdded(com.hazelcast.core.EntryEvent event) { + log.debug("CIBuildJobQueueItem added to processing jobs: {}", event.getValue()); + sendProcessingJobsOverWebsocket(event.getValue().getCourseId()); + } + + @Override + public void entryRemoved(com.hazelcast.core.EntryEvent event) { + log.debug("CIBuildJobQueueItem removed from processing jobs: {}", event.getOldValue()); + sendProcessingJobsOverWebsocket(event.getOldValue().getCourseId()); + } + } + + private class BuildAgentListener implements EntryAddedListener, EntryRemovedListener { + + @Override + public void entryAdded(com.hazelcast.core.EntryEvent event) { + log.debug("Build agent added: {}", event.getValue()); + sendBuildAgentInformationOverWebsocket(); + } + + @Override + public void entryRemoved(com.hazelcast.core.EntryEvent event) { + log.debug("Build agent removed: {}", event.getOldValue()); + sendBuildAgentInformationOverWebsocket(); + } + } +} diff --git a/src/main/java/de/tum/in/www1/artemis/service/connectors/localci/LocalCISharedBuildJobQueueService.java b/src/main/java/de/tum/in/www1/artemis/service/connectors/localci/LocalCISharedBuildJobQueueService.java index ab06010c4f7f..b37a1cfb9ac7 100644 --- a/src/main/java/de/tum/in/www1/artemis/service/connectors/localci/LocalCISharedBuildJobQueueService.java +++ b/src/main/java/de/tum/in/www1/artemis/service/connectors/localci/LocalCISharedBuildJobQueueService.java @@ -6,9 +6,10 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReentrantLock; +import javax.annotation.PostConstruct; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Profile; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; @@ -19,8 +20,6 @@ import com.hazelcast.core.HazelcastInstance; import com.hazelcast.cp.lock.FencedLock; import com.hazelcast.map.IMap; -import com.hazelcast.map.listener.EntryAddedListener; -import com.hazelcast.map.listener.EntryRemovedListener; import de.tum.in.www1.artemis.domain.Result; import de.tum.in.www1.artemis.domain.participation.Participation; @@ -33,7 +32,6 @@ import de.tum.in.www1.artemis.service.connectors.localci.dto.LocalCIBuildResult; import de.tum.in.www1.artemis.service.programming.ProgrammingExerciseGradingService; import de.tum.in.www1.artemis.service.programming.ProgrammingMessagingService; -import de.tum.in.www1.artemis.web.websocket.localci.LocalCIBuildQueueWebsocketService; import de.tum.in.www1.artemis.web.websocket.programmingSubmission.BuildTriggerWebsocketError; @Service @@ -77,13 +75,10 @@ public class LocalCISharedBuildJobQueueService { */ private final ReentrantLock instanceLock = new ReentrantLock(); - private final LocalCIBuildQueueWebsocketService localCIBuildQueueWebsocketService; - - @Autowired public LocalCISharedBuildJobQueueService(HazelcastInstance hazelcastInstance, ExecutorService localCIBuildExecutorService, LocalCIBuildJobManagementService localCIBuildJobManagementService, ParticipationRepository participationRepository, ProgrammingExerciseGradingService programmingExerciseGradingService, ProgrammingMessagingService programmingMessagingService, - ProgrammingExerciseRepository programmingExerciseRepository, LocalCIBuildQueueWebsocketService localCIBuildQueueWebsocketService) { + ProgrammingExerciseRepository programmingExerciseRepository) { this.hazelcastInstance = hazelcastInstance; this.localCIBuildExecutorService = (ThreadPoolExecutor) localCIBuildExecutorService; this.localCIBuildJobManagementService = localCIBuildJobManagementService; @@ -95,10 +90,14 @@ public LocalCISharedBuildJobQueueService(HazelcastInstance hazelcastInstance, Ex this.processingJobs = this.hazelcastInstance.getMap("processingJobs"); this.sharedLock = this.hazelcastInstance.getCPSubsystem().getLock("buildJobQueueLock"); this.queue = this.hazelcastInstance.getQueue("buildJobQueue"); + } + + /** + * Add listener to the shared build job queue. + */ + @PostConstruct + public void addListener() { this.queue.addItemListener(new QueuedBuildJobItemListener(), true); - this.processingJobs.addLocalEntryListener(new ProcessingBuildJobItemListener()); - this.buildAgentInformation.addLocalEntryListener(new BuildAgentListener()); - this.localCIBuildQueueWebsocketService = localCIBuildQueueWebsocketService; } /** @@ -193,7 +192,7 @@ private void checkAvailabilityAndProcessNextBuild() { updateLocalBuildAgentInformation(); } - log.info("Node has no available threads currently"); + log.debug("Node has no available threads currently"); return; } @@ -356,6 +355,7 @@ private void processBuild(LocalCIBuildJobQueueItem buildJob) { log.warn("Requeueing failed build job: {}", buildJob); buildJob.setRetryCount(buildJob.getRetryCount() + 1); queue.add(buildJob); + checkAvailabilityAndProcessNextBuild(); } else { log.warn("Participation with id {} has been deleted. Cancelling the requeueing of the build job.", participation.getId()); @@ -409,54 +409,14 @@ private ProgrammingExerciseParticipation retrieveParticipationWithRetry(Long par private class QueuedBuildJobItemListener implements ItemListener { @Override - public void itemAdded(ItemEvent item) { - log.debug("CIBuildJobQueueItem added to queue: {}", item.getItem()); + public void itemAdded(ItemEvent event) { + log.debug("CIBuildJobQueueItem added to queue: {}", event.getItem()); checkAvailabilityAndProcessNextBuild(); - localCIBuildQueueWebsocketService.sendQueuedBuildJobs(getQueuedJobs()); - long courseID = item.getItem().getCourseId(); - localCIBuildQueueWebsocketService.sendQueuedBuildJobsForCourse(courseID, getQueuedJobsForCourse(courseID)); - } - - @Override - public void itemRemoved(ItemEvent item) { - log.debug("CIBuildJobQueueItem removed from queue: {}", item.getItem()); - localCIBuildQueueWebsocketService.sendQueuedBuildJobs(getQueuedJobs()); - long courseID = item.getItem().getCourseId(); - localCIBuildQueueWebsocketService.sendQueuedBuildJobsForCourse(courseID, getQueuedJobsForCourse(courseID)); - } - } - - private class ProcessingBuildJobItemListener implements EntryAddedListener, EntryRemovedListener { - - @Override - public void entryAdded(com.hazelcast.core.EntryEvent event) { - log.debug("CIBuildJobQueueItem added to processing jobs: {}", event.getValue()); - localCIBuildQueueWebsocketService.sendRunningBuildJobs(getProcessingJobs()); - long courseID = event.getValue().getCourseId(); - localCIBuildQueueWebsocketService.sendRunningBuildJobsForCourse(courseID, getProcessingJobsForCourse(courseID)); - } - - @Override - public void entryRemoved(com.hazelcast.core.EntryEvent event) { - log.debug("CIBuildJobQueueItem removed from processing jobs: {}", event.getOldValue()); - localCIBuildQueueWebsocketService.sendRunningBuildJobs(getProcessingJobs()); - long courseID = event.getOldValue().getCourseId(); - localCIBuildQueueWebsocketService.sendRunningBuildJobsForCourse(courseID, getProcessingJobsForCourse(courseID)); - } - } - - private class BuildAgentListener implements EntryAddedListener, EntryRemovedListener { - - @Override - public void entryAdded(com.hazelcast.core.EntryEvent event) { - log.debug("Build agent added: {}", event.getValue()); - localCIBuildQueueWebsocketService.sendBuildAgentInformation(getBuildAgentInformation()); } @Override - public void entryRemoved(com.hazelcast.core.EntryEvent event) { - log.debug("Build agent removed: {}", event.getOldValue()); - localCIBuildQueueWebsocketService.sendBuildAgentInformation(getBuildAgentInformation()); + public void itemRemoved(ItemEvent event) { + log.debug("CIBuildJobQueueItem removed from queue: {}", event.getItem()); } } } diff --git a/src/main/java/de/tum/in/www1/artemis/web/websocket/localci/LocalCIBuildQueueWebsocketService.java b/src/main/java/de/tum/in/www1/artemis/web/websocket/localci/LocalCIBuildQueueWebsocketService.java index 2d72397f998a..1694ba02110c 100644 --- a/src/main/java/de/tum/in/www1/artemis/web/websocket/localci/LocalCIBuildQueueWebsocketService.java +++ b/src/main/java/de/tum/in/www1/artemis/web/websocket/localci/LocalCIBuildQueueWebsocketService.java @@ -1,6 +1,9 @@ package de.tum.in.www1.artemis.web.websocket.localci; import java.util.List; +import java.util.Optional; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -24,6 +27,8 @@ public class LocalCIBuildQueueWebsocketService { private final WebsocketMessagingService websocketMessagingService; + private static final Pattern COURSE_DESTINATION_PATTERN = Pattern.compile("^/topic/courses/(\\d+)/(queued-jobs|running-jobs)$"); + /** * Constructor for dependency injection * @@ -90,4 +95,35 @@ public void sendBuildAgentInformation(List buildAg log.debug("Sending message on topic {}: {}", channel, buildAgentInfo); websocketMessagingService.sendMessage(channel, buildAgentInfo); } + + /** + * Checks if the given destination is a build queue admin destination. + * This is the case if the destination is either /topic/admin/queued-jobs or /topic/admin/running-jobs. + * + * @param destination the destination to check + * @return true if the destination is a build queue admin destination, false otherwise + */ + public static boolean isBuildQueueAdminDestination(String destination) { + return "/topic/admin/queued-jobs".equals(destination) || "/topic/admin/running-jobs".equals(destination); + } + + /** + * Checks if the given destination is a build queue course destination. This is the case if the destination is either + * /topic/courses/{courseId}/queued-jobs or /topic/courses/{courseId}/running-jobs. + * If the destination is a build queue course destination, the courseId is returned. + * + * @param destination the destination to check + * @return the courseId if the destination is a build queue course destination, empty otherwise + */ + public static Optional isBuildQueueCourseDestination(String destination) { + // Define a pattern to match the expected course-related topic format + Matcher matcher = COURSE_DESTINATION_PATTERN.matcher(destination); + + // Check if the destination matches the pattern + if (matcher.matches()) { + // Extract the courseId from the matched groups + return Optional.of(Long.parseLong(matcher.group(1))); + } + return Optional.empty(); + } }