From 9df5ed799ee493efb4f30f056015eb88d24032b2 Mon Sep 17 00:00:00 2001 From: Shelby Holden Date: Wed, 30 Oct 2024 09:14:46 -0400 Subject: [PATCH] Retry step for interrupted thread and error with transient azure table client --- .../java/bio/terra/common/FutureUtils.java | 4 +- .../CreateSnapshotStorageTableDataStep.java | 30 +++-- .../flight/create/SnapshotCreateFlight.java | 3 +- ...reateSnapshotStorageTableDataStepTest.java | 111 ++++++++++++++++++ 4 files changed, 139 insertions(+), 9 deletions(-) create mode 100644 src/test/java/bio/terra/service/snapshot/flight/create/CreateSnapshotStorageTableDataStepTest.java diff --git a/src/main/java/bio/terra/common/FutureUtils.java b/src/main/java/bio/terra/common/FutureUtils.java index b3f67dc612..b6e5d7845d 100644 --- a/src/main/java/bio/terra/common/FutureUtils.java +++ b/src/main/java/bio/terra/common/FutureUtils.java @@ -15,6 +15,7 @@ import java.util.concurrent.atomic.AtomicReference; public final class FutureUtils { + public static final String INTERRUPTED_THREAD_MESSAGE = "Thread was interrupted"; private FutureUtils() {} @@ -65,7 +66,8 @@ private FutureUtils() {} // Cancellation may not be necessary, but it can't hurt: f.cancel(true); } catch (InterruptedException e) { - foundFailure.compareAndSet(null, new ApiException("Thread was interrupted", e)); + foundFailure.compareAndSet( + null, new ApiException(INTERRUPTED_THREAD_MESSAGE, e)); // Cancellation may not be necessary, but it can't hurt: f.cancel(true); } catch (ExecutionException e) { diff --git a/src/main/java/bio/terra/service/snapshot/flight/create/CreateSnapshotStorageTableDataStep.java b/src/main/java/bio/terra/service/snapshot/flight/create/CreateSnapshotStorageTableDataStep.java index 2bd2f22106..364884407f 100644 --- a/src/main/java/bio/terra/service/snapshot/flight/create/CreateSnapshotStorageTableDataStep.java +++ b/src/main/java/bio/terra/service/snapshot/flight/create/CreateSnapshotStorageTableDataStep.java @@ -1,10 +1,13 @@ package 
bio.terra.service.snapshot.flight.create; import bio.terra.common.FlightUtils; +import bio.terra.common.FutureUtils; +import bio.terra.common.exception.ApiException; import bio.terra.service.common.CommonMapKeys; import bio.terra.service.common.azure.StorageTableName; import bio.terra.service.filedata.azure.AzureSynapsePdao; import bio.terra.service.filedata.azure.tables.TableDao; +import bio.terra.service.filedata.exception.FileSystemExecutionException; import bio.terra.service.resourcemanagement.azure.AzureAuthService; import bio.terra.service.resourcemanagement.azure.AzureStorageAuthInfo; import bio.terra.service.snapshot.Snapshot; @@ -12,6 +15,7 @@ import bio.terra.stairway.FlightContext; import bio.terra.stairway.Step; import bio.terra.stairway.StepResult; +import bio.terra.stairway.StepStatus; import com.azure.data.tables.TableServiceClient; import java.util.Set; import java.util.UUID; @@ -61,13 +65,25 @@ public StepResult doStep(FlightContext context) throws InterruptedException { Set refIds = azureSynapsePdao.getRefIdsForSnapshot(snapshot); - tableDao.addFilesToSnapshot( - datasetTableServiceClient, - snapshotTableServiceClient, - datasetId, - datasetName, - snapshot, - refIds); + try { + tableDao.addFilesToSnapshot( + datasetTableServiceClient, + snapshotTableServiceClient, + datasetId, + datasetName, + snapshot, + refIds); + } catch (ApiException ex) { + // Retry the step if the worker thread was interrupted + if (ex.getMessage().contains(FutureUtils.INTERRUPTED_THREAD_MESSAGE)) { + return new StepResult(StepStatus.STEP_RESULT_FAILURE_RETRY, ex); + } else { + return new StepResult(StepStatus.STEP_RESULT_FAILURE_FATAL, ex); + } + } catch (FileSystemExecutionException ex) { + // Retry in case the Azure Table operations failed transiently + return new StepResult(StepStatus.STEP_RESULT_FAILURE_RETRY, ex); + } return StepResult.getStepResultSuccess(); } diff --git a/src/main/java/bio/terra/service/snapshot/flight/create/SnapshotCreateFlight.java 
b/src/main/java/bio/terra/service/snapshot/flight/create/SnapshotCreateFlight.java index 8613318977..5b623668ab 100644 --- a/src/main/java/bio/terra/service/snapshot/flight/create/SnapshotCreateFlight.java +++ b/src/main/java/bio/terra/service/snapshot/flight/create/SnapshotCreateFlight.java @@ -380,7 +380,8 @@ public SnapshotCreateFlight(FlightMap inputParameters, Object applicationContext snapshotService, datasetId, datasetName, - snapshotId)); + snapshotId), + randomBackoffRetry); addStep( new CreateSnapshotStorageTableDependenciesStep( diff --git a/src/test/java/bio/terra/service/snapshot/flight/create/CreateSnapshotStorageTableDataStepTest.java b/src/test/java/bio/terra/service/snapshot/flight/create/CreateSnapshotStorageTableDataStepTest.java new file mode 100644 index 0000000000..6533359630 --- /dev/null +++ b/src/test/java/bio/terra/service/snapshot/flight/create/CreateSnapshotStorageTableDataStepTest.java @@ -0,0 +1,111 @@ +package bio.terra.service.snapshot.flight.create; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.when; + +import bio.terra.common.FutureUtils; +import bio.terra.common.category.Unit; +import bio.terra.common.exception.ApiException; +import bio.terra.service.common.CommonMapKeys; +import bio.terra.service.filedata.azure.AzureSynapsePdao; +import bio.terra.service.filedata.azure.tables.TableDao; +import bio.terra.service.filedata.exception.FileSystemExecutionException; +import bio.terra.service.resourcemanagement.azure.AzureAuthService; +import bio.terra.service.resourcemanagement.azure.AzureStorageAuthInfo; +import bio.terra.service.snapshot.Snapshot; +import bio.terra.service.snapshot.SnapshotService; +import bio.terra.stairway.FlightContext; +import 
bio.terra.stairway.FlightMap; +import bio.terra.stairway.StepStatus; +import java.util.Set; +import java.util.UUID; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +@Tag(Unit.TAG) +class CreateSnapshotStorageTableDataStepTest { + @Mock private TableDao tableDao; + @Mock private AzureAuthService azureAuthService; + @Mock private AzureSynapsePdao azureSynapsePdao; + @Mock private SnapshotService snapshotService; + @Mock private FlightContext flightContext; + private FlightMap workingMap; + private UUID datasetId; + private String datasetName; + private UUID snapshotId; + private CreateSnapshotStorageTableDataStep step; + + @BeforeEach + void setUp() { + snapshotId = UUID.randomUUID(); + datasetId = UUID.randomUUID(); + datasetName = "datasetName"; + workingMap = new FlightMap(); + workingMap.put( + CommonMapKeys.DATASET_STORAGE_AUTH_INFO, + new AzureStorageAuthInfo(UUID.randomUUID(), "resourceGroup", "storageAccount")); + workingMap.put( + CommonMapKeys.SNAPSHOT_STORAGE_AUTH_INFO, + new AzureStorageAuthInfo(UUID.randomUUID(), "resourceGroup", "storageAccount")); + when(flightContext.getInputParameters()).thenReturn(new FlightMap()); + when(flightContext.getWorkingMap()).thenReturn(workingMap); + when(azureAuthService.getTableServiceClient(any())).thenReturn(null); + when(snapshotService.retrieve(any())).thenReturn(new Snapshot().id(snapshotId)); + + when(azureSynapsePdao.getRefIdsForSnapshot(any())) + .thenReturn(Set.of(UUID.randomUUID().toString())); + step = + new CreateSnapshotStorageTableDataStep( + tableDao, + azureAuthService, + azureSynapsePdao, + snapshotService, + datasetId, + datasetName, + snapshotId); + } + + @Test + void retryInterruptedThread() throws InterruptedException { + doThrow(new 
ApiException(FutureUtils.INTERRUPTED_THREAD_MESSAGE)) + .when(tableDao) + .addFilesToSnapshot(any(), any(), any(), any(), any(), any()); + var result = step.doStep(flightContext); + assertThat(result.getStepStatus(), equalTo(StepStatus.STEP_RESULT_FAILURE_RETRY)); + } + + @Test + void fatalApiException() throws InterruptedException { + doThrow(new ApiException("other error message")) + .when(tableDao) + .addFilesToSnapshot(any(), any(), any(), any(), any(), any()); + var result = step.doStep(flightContext); + assertThat(result.getStepStatus(), equalTo(StepStatus.STEP_RESULT_FAILURE_FATAL)); + } + + @Test + void retryFileSystemExecutionException() throws InterruptedException { + doThrow(new FileSystemExecutionException("")) + .when(tableDao) + .addFilesToSnapshot(any(), any(), any(), any(), any(), any()); + var result = step.doStep(flightContext); + assertThat(result.getStepStatus(), equalTo(StepStatus.STEP_RESULT_FAILURE_RETRY)); + } + + @Test + void addFilesToSnapshotSuccess() throws InterruptedException { + doNothing().when(tableDao).addFilesToSnapshot(any(), any(), any(), any(), any(), any()); + var result = step.doStep(flightContext); + assertThat(result.getStepStatus(), equalTo(StepStatus.STEP_RESULT_SUCCESS)); + } +}