Skip to content

Commit

Permalink
change default page limit to 25 & code update
Browse files Browse the repository at this point in the history
  • Loading branch information
Buwujiu committed Feb 18, 2022
1 parent 9e6ea35 commit 2a62983
Show file tree
Hide file tree
Showing 7 changed files with 72 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import bio.overture.maestro.domain.port.outbound.metadata.study.StudyDAO;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import javax.inject.Inject;
import lombok.NonNull;
Expand All @@ -52,12 +53,14 @@ class SongStudyDAO implements StudyDAO {
"{0}/studies/{1}/analysis/paginated?analysisStates={2}&limit={3}&offset={4}";
private static final String STUDY_ANALYSIS_URL_TEMPLATE = "{0}/studies/{1}/analysis/{2}";
private static final String STUDIES_URL_TEMPLATE = "{0}/studies/all";
private static final String MSG_STUDY_DOES_NOT_EXIST =
"studyId {0} doesn't exist in the specified repository";
private static final String MSG_ANALYSIS_DOES_NOT_EXIST =
"analysis {0} doesn't exist for studyId {1}, repository {2} (or not in a matching state)";
private static final int FALLBACK_SONG_TIMEOUT = 60;
private static final int FALLBACK_SONG_ANALYSIS_TIMEOUT = 5;
private static final int FALLBACK_SONG_MAX_RETRY = 0;
private static final int DEFAULT_SONG_PAGE_LIMIT = 100;
private static final int DEFAULT_SONG_PAGE_LIMIT = 25;
private final WebClient webClient;
private final int songMaxRetries;
private final int minBackoffSec = 1;
Expand Down Expand Up @@ -99,52 +102,41 @@ public Mono<List<Analysis>> getStudyAnalyses(GetStudyAnalysesCommand getStudyAna
log.trace("in getStudyAnalyses, args: {} ", getStudyAnalysesCommand);
val repoBaseUrl = getStudyAnalysesCommand.getFilesRepositoryBaseUrl();
val studyId = getStudyAnalysesCommand.getStudyId();
val retryConfig =
Retry.allBut(NotFoundException.class)
.retryMax(this.songMaxRetries)
.doOnRetry(
retryCtx ->
log.error(
"exception happened, retrying {}",
getStudyAnalysesCommand,
retryCtx.exception()))
.exponentialBackoff(
Duration.ofSeconds(minBackoffSec), Duration.ofSeconds(maxBackoffSec));

var offset = 0;
var initialOffset = 0;
val url =
format(
STUDY_ANALYSES_URL_TEMPLATE,
repoBaseUrl,
studyId,
this.indexableStudyStatuses,
this.pageLimit,
offset);
var wrapper =
new Object() {
int offset = 0;
};

return fetchItems(url)
initialOffset);
val threadSafeOffset = new AtomicInteger(0);

return fetchItems(url, studyId)
// The expand method recursively calls fetchItems() and emits response of first page to the
// last.
// the first request being made is offset = 0, and the second request is offset = 25,
// and all the way to the last page.
.expand(
rep -> {
if (rep.getAnalyses().size() == 0) {
return Mono.empty();
}
wrapper.offset += this.pageLimit;
threadSafeOffset.addAndGet(this.pageLimit);
val currentUrl =
format(
STUDY_ANALYSES_URL_TEMPLATE,
repoBaseUrl,
studyId,
this.indexableStudyStatuses,
this.pageLimit,
wrapper.offset);
return fetchItems(currentUrl);
threadSafeOffset.get());
return fetchItems(currentUrl, studyId);
})
.flatMap(rep -> Flux.fromIterable(rep.getAnalyses()))
.collectList()
.transform(retryAndTimeout(retryConfig, Duration.ofSeconds(this.studyCallTimeoutSeconds)))
.doOnSuccess(
(list) ->
log.trace(
Expand All @@ -153,9 +145,25 @@ public Mono<List<Analysis>> getStudyAnalyses(GetStudyAnalysesCommand getStudyAna
getStudyAnalysesCommand));
}

private Mono<GetAnalysisResponse> fetchItems(@NonNull String url) {
private Mono<GetAnalysisResponse> fetchItems(@NonNull String url, @NonNull String studyId) {
log.trace("get paged analyses, url = {}", url);
return this.webClient.get().uri(url).retrieve().bodyToMono(GetAnalysisResponse.class);
val retryConfig =
Retry.allBut(NotFoundException.class)
.retryMax(this.songMaxRetries)
.doOnRetry(
retryCtx ->
log.error("exception happened, retrying {}", url, retryCtx.exception()))
.exponentialBackoff(
Duration.ofSeconds(minBackoffSec), Duration.ofSeconds(maxBackoffSec));
return this.webClient
.get()
.uri(url)
.retrieve()
.onStatus(
HttpStatus.NOT_FOUND::equals,
clientResponse -> error(notFound(MSG_STUDY_DOES_NOT_EXIST, studyId)))
.bodyToMono(GetAnalysisResponse.class)
.transform(retryAndTimeout(retryConfig, Duration.ofSeconds(this.studyCallTimeoutSeconds)));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ private static class Song {
private int maxRetries = 3;
// FIXME: This configuration is called three different things in this codebase
private String indexableStudyStatesCsv = "PUBLISHED";
private int pageLimit = 100;
private int pageLimit = 25;
}

@Data
Expand Down
2 changes: 1 addition & 1 deletion maestro-app/src/main/resources/config/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ maestro:
song:
indexableStudyStatesCsv: PUBLISHED
# page limit for getting analysis from song
pageLimit: 100
pageLimit: 25
maxRetries: 3
timeoutSec:
study: 100 # some studies take really long, +30 secs, to be downloaded
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ void shouldRetryFetchingStudyAnalysesOnFailure() {
request(
"GET",
urlMatching(
"/studies/PEME-CA/analysis/paginated\\?analysisStates=PUBLISHED&limit=100&offset=\\d+"))
"/studies/PEME-CA/analysis/paginated\\?analysisStates=PUBLISHED&limit=25&offset=\\d+"))
.inScenario("RANDOM_FAILURE")
.whenScenarioStateIs(Scenario.STARTED)
.willReturn(
Expand All @@ -158,7 +158,7 @@ void shouldRetryFetchingStudyAnalysesOnFailure() {
request(
"GET",
urlEqualTo(
"/studies/PEME-CA/analysis/paginated?analysisStates=PUBLISHED&limit=100&offset=0"))
"/studies/PEME-CA/analysis/paginated?analysisStates=PUBLISHED&limit=25&offset=0"))
.inScenario("RANDOM_FAILURE")
.whenScenarioStateIs("WORKING")
.willReturn(
Expand All @@ -171,7 +171,7 @@ void shouldRetryFetchingStudyAnalysesOnFailure() {
request(
"GET",
urlEqualTo(
"/studies/PEME-CA/analysis/paginated?analysisStates=PUBLISHED&limit=100&offset=100"))
"/studies/PEME-CA/analysis/paginated?analysisStates=PUBLISHED&limit=25&offset=25"))
.inScenario("RANDOM_FAILURE")
.whenScenarioStateIs("WORKING")
.willReturn(
Expand Down Expand Up @@ -202,7 +202,7 @@ void fetchingStudyAnalysesShouldReturnRetryExhaustedException() {
request(
"GET",
urlEqualTo(
"/studies/PEME-CA/analysis/paginated?analysisStates=PUBLISHED&limit=100&offset=0"))
"/studies/PEME-CA/analysis/paginated?analysisStates=PUBLISHED&limit=25&offset=0"))
.willReturn(
aResponse()
.withStatus(400)
Expand All @@ -212,7 +212,7 @@ void fetchingStudyAnalysesShouldReturnRetryExhaustedException() {
request(
"GET",
urlEqualTo(
"/studies/PEME-CA/analysis/paginated?analysisStates=PUBLISHED&limit=100&offset=100"))
"/studies/PEME-CA/analysis/paginated?analysisStates=PUBLISHED&limit=25&offset=100"))
.willReturn(
aResponse()
.withStatus(400)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ void shouldIndexStudyRepositoryWithExclusionsApplied() throws InterruptedExcepti
urlEqualTo(
"/collab/studies/"
+ studyId
+ "/analysis/paginated?analysisStates=PUBLISHED&limit=100&offset=0"))
+ "/analysis/paginated?analysisStates=PUBLISHED&limit=25&offset=0"))
.willReturn(
aResponse()
.withStatus(200)
Expand All @@ -246,7 +246,7 @@ void shouldIndexStudyRepositoryWithExclusionsApplied() throws InterruptedExcepti
urlEqualTo(
"/collab/studies/"
+ studyId
+ "/analysis/paginated?analysisStates=PUBLISHED&limit=100&offset=100"))
+ "/analysis/paginated?analysisStates=PUBLISHED&limit=25&offset=25"))
.willReturn(
aResponse()
.withStatus(200)
Expand Down Expand Up @@ -312,7 +312,7 @@ void shouldIndexStudyWithExclusionsApplied() throws InterruptedException, IOExce
request(
"GET",
urlEqualTo(
"/collab/studies/EUCANCAN-BE/analysis/paginated?analysisStates=PUBLISHED&limit=100&offset=0"))
"/collab/studies/EUCANCAN-BE/analysis/paginated?analysisStates=PUBLISHED&limit=25&offset=0"))
.willReturn(
aResponse()
.withStatus(200)
Expand All @@ -322,7 +322,7 @@ void shouldIndexStudyWithExclusionsApplied() throws InterruptedException, IOExce
request(
"GET",
urlEqualTo(
"/collab/studies/EUCANCAN-BE/analysis/paginated?analysisStates=PUBLISHED&limit=100&offset=100"))
"/collab/studies/EUCANCAN-BE/analysis/paginated?analysisStates=PUBLISHED&limit=25&offset=25"))
.willReturn(
aResponse()
.withStatus(200)
Expand Down Expand Up @@ -385,7 +385,7 @@ void shouldDeleteSingleAnalysis() throws InterruptedException, IOException {
request(
"GET",
urlEqualTo(
"/collab/studies/EUCANCAN-BE/analysis/paginated?analysisStates=PUBLISHED&limit=100&offset=0"))
"/collab/studies/EUCANCAN-BE/analysis/paginated?analysisStates=PUBLISHED&limit=25&offset=0"))
.willReturn(
aResponse()
.withBody(resp)
Expand All @@ -395,7 +395,7 @@ void shouldDeleteSingleAnalysis() throws InterruptedException, IOException {
request(
"GET",
urlEqualTo(
"/collab/studies/EUCANCAN-BE/analysis/paginated?analysisStates=PUBLISHED&limit=100&offset=100"))
"/collab/studies/EUCANCAN-BE/analysis/paginated?analysisStates=PUBLISHED&limit=25&offset=25"))
.willReturn(
aResponse()
.withBody(emptyResp)
Expand Down Expand Up @@ -468,7 +468,7 @@ void shouldUpdateExistingDocRepository() throws InterruptedException, IOExceptio
request(
"GET",
urlEqualTo(
"/collab/studies/EUCANCAN-BE/analysis/paginated?analysisStates=PUBLISHED&limit=100&offset=0"))
"/collab/studies/EUCANCAN-BE/analysis/paginated?analysisStates=PUBLISHED&limit=25&offset=0"))
.willReturn(
aResponse()
.withStatus(200)
Expand All @@ -478,7 +478,7 @@ void shouldUpdateExistingDocRepository() throws InterruptedException, IOExceptio
request(
"GET",
urlEqualTo(
"/collab/studies/EUCANCAN-BE/analysis/paginated?analysisStates=PUBLISHED&limit=100&offset=100"))
"/collab/studies/EUCANCAN-BE/analysis/paginated?analysisStates=PUBLISHED&limit=25&offset=25"))
.willReturn(
aResponse()
.withStatus(200)
Expand All @@ -489,7 +489,7 @@ void shouldUpdateExistingDocRepository() throws InterruptedException, IOExceptio
request(
"GET",
urlEqualTo(
"/aws/studies/EUCANCAN-BE/analysis/paginated?analysisStates=PUBLISHED&limit=100&offset=0"))
"/aws/studies/EUCANCAN-BE/analysis/paginated?analysisStates=PUBLISHED&limit=25&offset=0"))
.willReturn(
aResponse()
.withStatus(200)
Expand All @@ -499,7 +499,7 @@ void shouldUpdateExistingDocRepository() throws InterruptedException, IOExceptio
request(
"GET",
urlEqualTo(
"/aws/studies/EUCANCAN-BE/analysis/paginated?analysisStates=PUBLISHED&limit=100&offset=100"))
"/aws/studies/EUCANCAN-BE/analysis/paginated?analysisStates=PUBLISHED&limit=25&offset=25"))
.willReturn(
aResponse()
.withStatus(200)
Expand Down Expand Up @@ -587,7 +587,7 @@ void shouldDetectAndNotifyConflictingDocuments() throws InterruptedException, IO
request(
"GET",
urlEqualTo(
"/collab/studies/EUCANCAN-BE/analysis/paginated?analysisStates=PUBLISHED&limit=100&offset=0"))
"/collab/studies/EUCANCAN-BE/analysis/paginated?analysisStates=PUBLISHED&limit=25&offset=0"))
.willReturn(
aResponse()
.withStatus(200)
Expand All @@ -597,7 +597,7 @@ void shouldDetectAndNotifyConflictingDocuments() throws InterruptedException, IO
request(
"GET",
urlEqualTo(
"/collab/studies/EUCANCAN-BE/analysis/paginated?analysisStates=PUBLISHED&limit=100&offset=100"))
"/collab/studies/EUCANCAN-BE/analysis/paginated?analysisStates=PUBLISHED&limit=25&offset=25"))
.willReturn(
aResponse()
.withStatus(200)
Expand All @@ -608,7 +608,7 @@ void shouldDetectAndNotifyConflictingDocuments() throws InterruptedException, IO
request(
"GET",
urlEqualTo(
"/aws/studies/EUCANCAN-BE/analysis/paginated?analysisStates=PUBLISHED&limit=100&offset=0"))
"/aws/studies/EUCANCAN-BE/analysis/paginated?analysisStates=PUBLISHED&limit=25&offset=0"))
.willReturn(
aResponse()
.withStatus(200)
Expand All @@ -618,7 +618,7 @@ void shouldDetectAndNotifyConflictingDocuments() throws InterruptedException, IO
request(
"GET",
urlEqualTo(
"/aws/studies/EUCANCAN-BE/analysis/paginated?analysisStates=PUBLISHED&limit=100&offset=100"))
"/aws/studies/EUCANCAN-BE/analysis/paginated?analysisStates=PUBLISHED&limit=25&offset=25"))
.willReturn(
aResponse()
.withStatus(200)
Expand Down
Loading

0 comments on commit 2a62983

Please sign in to comment.