Skip to content

Commit

Permalink
use song performance upgraded paginated get analysis endpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
Buwujiu committed Feb 17, 2022
1 parent 877e324 commit 7545aa3
Show file tree
Hide file tree
Showing 26 changed files with 2,941 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import bio.overture.maestro.app.infra.config.properties.ApplicationProperties;
import bio.overture.maestro.domain.api.exception.NotFoundException;
import bio.overture.maestro.domain.entities.metadata.study.Analysis;
import bio.overture.maestro.domain.entities.metadata.study.GetAnalysisResponse;
import bio.overture.maestro.domain.entities.metadata.study.Study;
import bio.overture.maestro.domain.port.outbound.metadata.study.GetAllStudiesCommand;
import bio.overture.maestro.domain.port.outbound.metadata.study.GetAnalysisCommand;
Expand All @@ -48,16 +49,15 @@
class SongStudyDAO implements StudyDAO {

private static final String STUDY_ANALYSES_URL_TEMPLATE =
"{0}/studies/{1}/analysis?analysisStates={2}";
"{0}/studies/{1}/analysis/paginated?analysisStates={2}&limit={3}&offset={4}";
private static final String STUDY_ANALYSIS_URL_TEMPLATE = "{0}/studies/{1}/analysis/{2}";
private static final String STUDIES_URL_TEMPLATE = "{0}/studies/all";
private static final String MSG_STUDY_DOES_NOT_EXIST =
"studyId {0} doesn't exist in the specified repository";
private static final String MSG_ANALYSIS_DOES_NOT_EXIST =
"analysis {0} doesn't exist for studyId {1}, repository {2} (or not in a matching state)";
private static final int FALLBACK_SONG_TIMEOUT = 60;
private static final int FALLBACK_SONG_ANALYSIS_TIMEOUT = 5;
private static final int FALLBACK_SONG_MAX_RETRY = 0;
private static final int DEFAULT_SONG_PAGE_LIMIT = 100;
private final WebClient webClient;
private final int songMaxRetries;
private final int minBackoffSec = 1;
Expand All @@ -68,11 +68,16 @@ class SongStudyDAO implements StudyDAO {
private final int studyCallTimeoutSeconds;

private final int analysisCallTimeoutSeconds;
private final int pageLimit;

@Inject
public SongStudyDAO(
@NonNull WebClient webClient, @NonNull ApplicationProperties applicationProperties) {
this.webClient = webClient;
this.pageLimit =
applicationProperties.pageLimit() > 0
? applicationProperties.pageLimit()
: DEFAULT_SONG_PAGE_LIMIT;
this.indexableStudyStatuses = applicationProperties.indexableStudyStatuses();
this.indexableStudyStatusesList = List.of(indexableStudyStatuses.split(","));
this.songMaxRetries =
Expand All @@ -94,7 +99,6 @@ public Mono<List<Analysis>> getStudyAnalyses(GetStudyAnalysesCommand getStudyAna
log.trace("in getStudyAnalyses, args: {} ", getStudyAnalysesCommand);
val repoBaseUrl = getStudyAnalysesCommand.getFilesRepositoryBaseUrl();
val studyId = getStudyAnalysesCommand.getStudyId();
val analysisListType = new ParameterizedTypeReference<List<Analysis>>() {};
val retryConfig =
Retry.allBut(NotFoundException.class)
.retryMax(this.songMaxRetries)
Expand All @@ -107,14 +111,39 @@ public Mono<List<Analysis>> getStudyAnalyses(GetStudyAnalysesCommand getStudyAna
.exponentialBackoff(
Duration.ofSeconds(minBackoffSec), Duration.ofSeconds(maxBackoffSec));

return this.webClient
.get()
.uri(format(STUDY_ANALYSES_URL_TEMPLATE, repoBaseUrl, studyId, this.indexableStudyStatuses))
.retrieve()
.onStatus(
HttpStatus.NOT_FOUND::equals,
clientResponse -> error(notFound(MSG_STUDY_DOES_NOT_EXIST, studyId)))
.bodyToMono(analysisListType)
var offset = 0;
val url =
format(
STUDY_ANALYSES_URL_TEMPLATE,
repoBaseUrl,
studyId,
this.indexableStudyStatuses,
this.pageLimit,
offset);
var wrapper =
new Object() {
int offset = 0;
};

return fetchItems(url)
.expand(
rep -> {
if (rep.getAnalyses().size() == 0) {
return Mono.empty();
}
wrapper.offset += this.pageLimit;
val currentUrl =
format(
STUDY_ANALYSES_URL_TEMPLATE,
repoBaseUrl,
studyId,
this.indexableStudyStatuses,
this.pageLimit,
wrapper.offset);
return fetchItems(currentUrl);
})
.flatMap(rep -> Flux.fromIterable(rep.getAnalyses()))
.collectList()
.transform(retryAndTimeout(retryConfig, Duration.ofSeconds(this.studyCallTimeoutSeconds)))
.doOnSuccess(
(list) ->
Expand All @@ -124,6 +153,11 @@ public Mono<List<Analysis>> getStudyAnalyses(GetStudyAnalysesCommand getStudyAna
getStudyAnalysesCommand));
}

private Mono<GetAnalysisResponse> fetchItems(@NonNull String url) {
log.trace("get paged analyses, url = {}", url);
return this.webClient.get().uri(url).retrieve().bodyToMono(GetAnalysisResponse.class);
}

@Override
public Flux<Study> getStudies(@NonNull GetAllStudiesCommand getAllStudiesCommand) {
log.trace("in getStudyAnalyses, args: {} ", getAllStudiesCommand);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ public interface ApplicationProperties {

String indexableStudyStatuses();

int pageLimit();

int songAnalysisCallTimeoutSeconds();

Slack.SlackChannelInfo getSlackChannelInfo();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,11 @@ public String indexableStudyStatuses() {
return this.song.getIndexableStudyStatesCsv();
}

@Override
public int pageLimit() {
return this.song.getPageLimit();
}

@Override
public int songAnalysisCallTimeoutSeconds() {
return this.song.getTimeoutSec().getAnalysis();
Expand Down Expand Up @@ -270,6 +275,7 @@ private static class Song {
private int maxRetries = 3;
// FIXME: This configuration is called three different things in this codebase
private String indexableStudyStatesCsv = "PUBLISHED";
private int pageLimit = 100;
}

@Data
Expand Down
2 changes: 2 additions & 0 deletions maestro-app/src/main/resources/config/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ maestro:
maxInMemorySize: -1
song:
indexableStudyStatesCsv: PUBLISHED
# page limit for getting analysis from song
pageLimit: 50
maxRetries: 3
timeoutSec:
study: 100 # some studies take really long, +30 secs, to be downloaded
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,30 +135,50 @@ void shouldRetryFetchingAnalysisOnFailure() {
@Test
@SneakyThrows
void shouldRetryFetchingStudyAnalysesOnFailure() {
val analyses = loadJsonString(this.getClass(), "PEME-CA.study.json");
val analyses = loadJsonString(this.getClass(), "PEME-CA.response.json");
val emptyResp = loadJsonString(this.getClass(), "empty-response.json");
val analysesList =
loadJsonFixture(
this.getClass(), "PEME-CA.study.json", new TypeReference<List<Analysis>>() {});
stubFor(
request("GET", urlEqualTo("/studies/PEME-CA/analysis?analysisStates=PUBLISHED"))
request(
"GET",
urlMatching(
"/studies/PEME-CA/analysis/paginated\\?analysisStates=PUBLISHED&limit=100&offset=\\d+"))
.inScenario("RANDOM_FAILURE")
.whenScenarioStateIs(Scenario.STARTED)
.willReturn(
aResponse()
.withStatus(400)
.withBody("<p> some wierd unexpected text </p>")
.withBody("<p> some weird unexpected text </p>")
.withHeader("content-type", "text/html"))
.willSetStateTo("WORKING"));

stubFor(
request("GET", urlEqualTo("/studies/PEME-CA/analysis?analysisStates=PUBLISHED"))
request(
"GET",
urlEqualTo(
"/studies/PEME-CA/analysis/paginated?analysisStates=PUBLISHED&limit=100&offset=0"))
.inScenario("RANDOM_FAILURE")
.whenScenarioStateIs("WORKING")
.willReturn(
aResponse()
.withBody(analyses)
.withStatus(200)
.withHeader("content-type", "application/json")));
// Mock the second/last request:
stubFor(
request(
"GET",
urlEqualTo(
"/studies/PEME-CA/analysis/paginated?analysisStates=PUBLISHED&limit=100&offset=100"))
.inScenario("RANDOM_FAILURE")
.whenScenarioStateIs("WORKING")
.willReturn(
aResponse()
.withBody(emptyResp)
.withStatus(200)
.withHeader("content-type", "application/json")));

val analysesMono =
songStudyDAO.getStudyAnalyses(
Expand All @@ -179,11 +199,24 @@ void fetchingStudyAnalysesShouldReturnRetryExhaustedException() {
.studyId("PEME-CA")
.build();
stubFor(
request("GET", urlEqualTo("/studies/PEME-CA/analysis?analysisStates=PUBLISHED"))
request(
"GET",
urlEqualTo(
"/studies/PEME-CA/analysis/paginated?analysisStates=PUBLISHED&limit=100&offset=0"))
.willReturn(
aResponse()
.withStatus(400)
.withBody("<p> Some wierd unexpected text </p>")
.withBody("<p> Some weird unexpected text </p>")
.withHeader("content-type", "text/html")));
stubFor(
request(
"GET",
urlEqualTo(
"/studies/PEME-CA/analysis/paginated?analysisStates=PUBLISHED&limit=100&offset=100"))
.willReturn(
aResponse()
.withStatus(400)
.withBody("<p> Some weird unexpected text </p>")
.withHeader("content-type", "text/html")));

// when
Expand Down
Loading

0 comments on commit 7545aa3

Please sign in to comment.