diff --git a/osgp/platform/osgp-core/src/main/java/org/opensmartgridplatform/core/application/tasks/ScheduledTaskExecutorService.java b/osgp/platform/osgp-core/src/main/java/org/opensmartgridplatform/core/application/tasks/ScheduledTaskExecutorService.java index 8bbabfd46d..a2dde242c9 100644 --- a/osgp/platform/osgp-core/src/main/java/org/opensmartgridplatform/core/application/tasks/ScheduledTaskExecutorService.java +++ b/osgp/platform/osgp-core/src/main/java/org/opensmartgridplatform/core/application/tasks/ScheduledTaskExecutorService.java @@ -7,10 +7,10 @@ import java.sql.Timestamp; import java.time.Instant; import java.time.temporal.ChronoUnit; +import java.util.ArrayList; import java.util.Date; import java.util.List; import java.util.function.Predicate; -import java.util.stream.Collectors; import org.opensmartgridplatform.core.application.config.ScheduledTaskExecutorJobConfig; import org.opensmartgridplatform.core.application.services.DeviceRequestMessageService; import org.opensmartgridplatform.domain.core.entities.Device; @@ -63,16 +63,31 @@ private void processStrandedScheduledTasks() { st -> st.getModificationTimeInstant().isBefore(ultimatePendingTime); final List strandedScheduledTasks = - scheduledTasks.stream().filter(pendingExceeded).collect(Collectors.toList()); + scheduledTasks.stream().filter(pendingExceeded).toList(); + + final List retryScheduledTasks = new ArrayList<>(); + final List deleteScheduledTasks = new ArrayList<>(); + strandedScheduledTasks.forEach( strandedScheduledTask -> { if (this.shouldBeRetried(strandedScheduledTask)) { strandedScheduledTask.retryOn(new Date()); + retryScheduledTasks.add(strandedScheduledTask); + LOGGER.info( + "Scheduled task for device {} with correlationUid {} will be retried", + strandedScheduledTask.getDeviceIdentification(), + strandedScheduledTask.getCorrelationId()); } else { - strandedScheduledTask.setFailed("No response received for scheduled task"); + LOGGER.info( + "Scheduled task for device {} with correlationUid {} will be removed", + strandedScheduledTask.getDeviceIdentification(), + strandedScheduledTask.getCorrelationId()); + deleteScheduledTasks.add(strandedScheduledTask); } - this.scheduledTaskRepository.save(strandedScheduledTask); }); + + this.scheduledTaskRepository.saveAll(retryScheduledTasks); + this.scheduledTaskRepository.deleteAll(deleteScheduledTasks); } private boolean shouldBeRetried(final ScheduledTask scheduledTask) { diff --git a/osgp/platform/osgp-core/src/main/resources/db/migration/V202308011603530000__clean_scheduled_tasks.sql b/osgp/platform/osgp-core/src/main/resources/db/migration/V202308011603530000__clean_scheduled_tasks.sql new file mode 100644 index 0000000000..b1a046bb0e --- /dev/null +++ b/osgp/platform/osgp-core/src/main/resources/db/migration/V202308011603530000__clean_scheduled_tasks.sql @@ -0,0 +1,11 @@ +DO +$$ +BEGIN + +CREATE INDEX IF NOT EXISTS scheduled_task_status_idx on scheduled_task(status); +CREATE INDEX IF NOT EXISTS scheduled_task_scheduled_time_idx on scheduled_task(scheduled_time); + +DELETE FROM scheduled_task WHERE status = 3 AND error_log = 'No response received for scheduled task'; + +END; +$$ diff --git a/osgp/platform/osgp-core/src/test/java/org/opensmartgridplatform/core/application/tasks/ScheduledTaskExecutorServiceTest.java b/osgp/platform/osgp-core/src/test/java/org/opensmartgridplatform/core/application/tasks/ScheduledTaskExecutorServiceTest.java index e1b09df023..f0073adf3b 100644 --- a/osgp/platform/osgp-core/src/test/java/org/opensmartgridplatform/core/application/tasks/ScheduledTaskExecutorServiceTest.java +++ b/osgp/platform/osgp-core/src/test/java/org/opensmartgridplatform/core/application/tasks/ScheduledTaskExecutorServiceTest.java @@ -9,7 +9,6 @@ import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doThrow; -import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -50,7 +49,7 @@ public class ScheduledTaskExecutorServiceTest { private static final String DATA_OBJECT = "data object"; - private static final Timestamp SCHEDULED_TIME = new Timestamp(System.currentTimeMillis()); + private static final Timestamp INITIAL_SCHEDULED_TIME = new Timestamp(System.currentTimeMillis()); @Mock private DeviceRequestMessageService deviceRequestMessageService; @@ -59,7 +58,7 @@ public class ScheduledTaskExecutorServiceTest { @InjectMocks private ScheduledTaskExecutorService scheduledTaskExecutorService; @Mock private ScheduledTaskExecutorJobConfig scheduledTaskExecutorJobConfig; - @Captor private ArgumentCaptor scheduledTaskCaptor; + @Captor private ArgumentCaptor> scheduledTaskCaptor; /** * Test the scheduled task runner for the case when the deviceRequestMessageService gives a @@ -112,17 +111,19 @@ void testRetryStrandedPendingTask() { .thenReturn(-1L); when(this.scheduledTaskExecutorJobConfig.scheduledTaskPageSize()).thenReturn(30); this.whenFindByStatusAndScheduledTime( - expiredPendingTasks, new ArrayList(), new ArrayList()); + expiredPendingTasks, new ArrayList<>(), new ArrayList<>()); this.scheduledTaskExecutorService.processScheduledTasks(); - verify(this.scheduledTaskRepository, times(4)).save(this.scheduledTaskCaptor.capture()); - final List savedScheduledTasks = this.scheduledTaskCaptor.getAllValues(); + verify(this.scheduledTaskRepository).saveAll(this.scheduledTaskCaptor.capture()); + final List retryScheduledTasks = this.scheduledTaskCaptor.getValue(); + assertThat(retryScheduledTasks).hasSize(1); + assertThat(retryScheduledTasks.get(0).getStatus()).isEqualTo(ScheduledTaskStatusType.RETRY); + assertThat(retryScheduledTasks.get(0).getScheduledTime()).isAfter(INITIAL_SCHEDULED_TIME); - assertThat(savedScheduledTasks.get(0).getStatus()).isEqualTo(ScheduledTaskStatusType.FAILED); - assertThat(savedScheduledTasks.get(1).getStatus()).isEqualTo(ScheduledTaskStatusType.RETRY); - assertThat(savedScheduledTasks.get(2).getStatus()).isEqualTo(ScheduledTaskStatusType.FAILED); - assertThat(savedScheduledTasks.get(3).getStatus()).isEqualTo(ScheduledTaskStatusType.FAILED); + verify(this.scheduledTaskRepository).deleteAll(this.scheduledTaskCaptor.capture()); + final List deleteScheduledTasks = this.scheduledTaskCaptor.getValue(); + assertThat(deleteScheduledTasks).hasSize(3); } private void whenFindByStatusAndScheduledTime( @@ -152,20 +153,20 @@ private List createPendingTasks() { private ScheduledTask createScheduledTask( final boolean exceededMaxRetry, final boolean expiredTask) { - MessageMetadata messageMetadata; + final MessageMetadata messageMetadata; if (expiredTask) { messageMetadata = this.createExpiredMessageMetadata(); } else { messageMetadata = this.createMessageMetadata(); } final ScheduledTask expiredScheduledTask = - new ScheduledTask(messageMetadata, DOMAIN, DOMAIN, DATA_OBJECT, SCHEDULED_TIME); + new ScheduledTask(messageMetadata, DOMAIN, DOMAIN, DATA_OBJECT, INITIAL_SCHEDULED_TIME); // retryOn() will raise the number of retries. The retry time will not change since it is the // same as the retry time in the message metadata. State will be set RETRY and will be reset to // PENDING by the setPending method. This is the only way to raise the number of retry above the // maxRetries (0) if (exceededMaxRetry) { - expiredScheduledTask.retryOn(SCHEDULED_TIME); + expiredScheduledTask.retryOn(INITIAL_SCHEDULED_TIME); } expiredScheduledTask.setPending(); return expiredScheduledTask;