Skip to content

Commit

Permalink
Retry step for interrupted thread and error with transient azure tabl…
Browse files Browse the repository at this point in the history
…e client
  • Loading branch information
snf2ye committed Oct 30, 2024
1 parent c80072b commit 9df5ed7
Show file tree
Hide file tree
Showing 4 changed files with 139 additions and 9 deletions.
4 changes: 3 additions & 1 deletion src/main/java/bio/terra/common/FutureUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import java.util.concurrent.atomic.AtomicReference;

public final class FutureUtils {
public static final String INTERRUPTED_THREAD_MESSAGE = "Thread was interrupted";

private FutureUtils() {}

Expand Down Expand Up @@ -65,7 +66,8 @@ private FutureUtils() {}
// Cancellation may not be necessary, but it can't hurt:
f.cancel(true);
} catch (InterruptedException e) {
foundFailure.compareAndSet(null, new ApiException("Thread was interrupted", e));
foundFailure.compareAndSet(
null, new ApiException(INTERRUPTED_THREAD_MESSAGE, e));
// Cancellation may not be necessary, but it can't hurt:
f.cancel(true);
} catch (ExecutionException e) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,21 @@
package bio.terra.service.snapshot.flight.create;

import bio.terra.common.FlightUtils;
import bio.terra.common.FutureUtils;
import bio.terra.common.exception.ApiException;
import bio.terra.service.common.CommonMapKeys;
import bio.terra.service.common.azure.StorageTableName;
import bio.terra.service.filedata.azure.AzureSynapsePdao;
import bio.terra.service.filedata.azure.tables.TableDao;
import bio.terra.service.filedata.exception.FileSystemExecutionException;
import bio.terra.service.resourcemanagement.azure.AzureAuthService;
import bio.terra.service.resourcemanagement.azure.AzureStorageAuthInfo;
import bio.terra.service.snapshot.Snapshot;
import bio.terra.service.snapshot.SnapshotService;
import bio.terra.stairway.FlightContext;
import bio.terra.stairway.Step;
import bio.terra.stairway.StepResult;
import bio.terra.stairway.StepStatus;
import com.azure.data.tables.TableServiceClient;
import java.util.Set;
import java.util.UUID;
Expand Down Expand Up @@ -61,13 +65,25 @@ public StepResult doStep(FlightContext context) throws InterruptedException {

Set<String> refIds = azureSynapsePdao.getRefIdsForSnapshot(snapshot);

tableDao.addFilesToSnapshot(
datasetTableServiceClient,
snapshotTableServiceClient,
datasetId,
datasetName,
snapshot,
refIds);
try {
tableDao.addFilesToSnapshot(
datasetTableServiceClient,
snapshotTableServiceClient,
datasetId,
datasetName,
snapshot,
refIds);
} catch (ApiException ex) {
// retry step if thread is interrupted
if (ex.getMessage().contains(FutureUtils.INTERRUPTED_THREAD_MESSAGE)) {
return new StepResult(StepStatus.STEP_RESULT_FAILURE_RETRY, ex);
} else {
return new StepResult(StepStatus.STEP_RESULT_FAILURE_FATAL, ex);
}
} catch (FileSystemExecutionException ex) {
// retry when Azure Table operations fail in a transient way
return new StepResult(StepStatus.STEP_RESULT_FAILURE_RETRY, ex);
}

return StepResult.getStepResultSuccess();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,8 @@ public SnapshotCreateFlight(FlightMap inputParameters, Object applicationContext
snapshotService,
datasetId,
datasetName,
snapshotId));
snapshotId),
randomBackoffRetry);

addStep(
new CreateSnapshotStorageTableDependenciesStep(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package bio.terra.service.snapshot.flight.create;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.when;

import bio.terra.common.FutureUtils;
import bio.terra.common.category.Unit;
import bio.terra.common.exception.ApiException;
import bio.terra.service.common.CommonMapKeys;
import bio.terra.service.filedata.azure.AzureSynapsePdao;
import bio.terra.service.filedata.azure.tables.TableDao;
import bio.terra.service.filedata.exception.FileSystemExecutionException;
import bio.terra.service.resourcemanagement.azure.AzureAuthService;
import bio.terra.service.resourcemanagement.azure.AzureStorageAuthInfo;
import bio.terra.service.snapshot.Snapshot;
import bio.terra.service.snapshot.SnapshotService;
import bio.terra.stairway.FlightContext;
import bio.terra.stairway.FlightMap;
import bio.terra.stairway.StepStatus;
import java.util.Set;
import java.util.UUID;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;

/**
 * Unit tests for the step status that {@link CreateSnapshotStorageTableDataStep#doStep} returns
 * for each outcome of the mocked {@code TableDao.addFilesToSnapshot} call.
 */
@ExtendWith(MockitoExtension.class)
@Tag(Unit.TAG)
class CreateSnapshotStorageTableDataStepTest {
  @Mock private TableDao tableDao;
  @Mock private AzureAuthService azureAuthService;
  @Mock private AzureSynapsePdao azureSynapsePdao;
  @Mock private SnapshotService snapshotService;
  @Mock private FlightContext flightContext;
  private FlightMap workingMap;
  private UUID datasetId;
  private String datasetName;
  private UUID snapshotId;
  private CreateSnapshotStorageTableDataStep step;

  /** Populates the working map, stubs every collaborator, and builds the step under test. */
  @BeforeEach
  void setUp() {
    snapshotId = UUID.randomUUID();
    datasetId = UUID.randomUUID();
    datasetName = "datasetName";

    // The working map carries one storage auth info entry for the dataset and one for the
    // snapshot.
    workingMap = new FlightMap();
    workingMap.put(CommonMapKeys.DATASET_STORAGE_AUTH_INFO, newStorageAuthInfo());
    workingMap.put(CommonMapKeys.SNAPSHOT_STORAGE_AUTH_INFO, newStorageAuthInfo());

    when(flightContext.getInputParameters()).thenReturn(new FlightMap());
    when(flightContext.getWorkingMap()).thenReturn(workingMap);
    when(azureAuthService.getTableServiceClient(any())).thenReturn(null);

    Snapshot retrievedSnapshot = new Snapshot().id(snapshotId);
    when(snapshotService.retrieve(any())).thenReturn(retrievedSnapshot);

    Set<String> refIds = Set.of(UUID.randomUUID().toString());
    when(azureSynapsePdao.getRefIdsForSnapshot(any())).thenReturn(refIds);

    step =
        new CreateSnapshotStorageTableDataStep(
            tableDao,
            azureAuthService,
            azureSynapsePdao,
            snapshotService,
            datasetId,
            datasetName,
            snapshotId);
  }

  /** An ApiException carrying the interrupted-thread message yields a retryable failure. */
  @Test
  void retryInterruptedThread() throws InterruptedException {
    stubAddFilesToSnapshotFailure(new ApiException(FutureUtils.INTERRUPTED_THREAD_MESSAGE));
    assertDoStepStatus(StepStatus.STEP_RESULT_FAILURE_RETRY);
  }

  /** An ApiException with any other message yields a fatal failure. */
  @Test
  void fatalApiException() throws InterruptedException {
    stubAddFilesToSnapshotFailure(new ApiException("other error message"));
    assertDoStepStatus(StepStatus.STEP_RESULT_FAILURE_FATAL);
  }

  /** A FileSystemExecutionException yields a retryable failure. */
  @Test
  void retryFileSystemExecutionException() throws InterruptedException {
    stubAddFilesToSnapshotFailure(new FileSystemExecutionException(""));
    assertDoStepStatus(StepStatus.STEP_RESULT_FAILURE_RETRY);
  }

  /** When adding files completes without throwing, the step reports success. */
  @Test
  void addFilesToSnapshotSuccess() throws InterruptedException {
    doNothing().when(tableDao).addFilesToSnapshot(any(), any(), any(), any(), any(), any());
    assertDoStepStatus(StepStatus.STEP_RESULT_SUCCESS);
  }

  /** Builds a storage auth info with a random id and fixed placeholder names. */
  private static AzureStorageAuthInfo newStorageAuthInfo() {
    return new AzureStorageAuthInfo(UUID.randomUUID(), "resourceGroup", "storageAccount");
  }

  /** Makes the mocked {@code addFilesToSnapshot} throw {@code failure} for any arguments. */
  private void stubAddFilesToSnapshotFailure(Throwable failure) throws InterruptedException {
    doThrow(failure)
        .when(tableDao)
        .addFilesToSnapshot(any(), any(), any(), any(), any(), any());
  }

  /** Runs the step once and checks that it reports the {@code expected} status. */
  private void assertDoStepStatus(StepStatus expected) throws InterruptedException {
    var outcome = step.doStep(flightContext);
    assertThat(outcome.getStepStatus(), equalTo(expected));
  }
}

0 comments on commit 9df5ed7

Please sign in to comment.