Skip to content

Commit

Permalink
Merge pull request #230 from overture-stack/225-use-new-song-endpoint
Browse files Browse the repository at this point in the history
use song performance upgraded paginated get analysis endpoint
  • Loading branch information
Buwujiu authored Feb 18, 2022
2 parents 877e324 + 2a62983 commit 73e6971
Show file tree
Hide file tree
Showing 26 changed files with 2,953 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,15 @@
import bio.overture.maestro.app.infra.config.properties.ApplicationProperties;
import bio.overture.maestro.domain.api.exception.NotFoundException;
import bio.overture.maestro.domain.entities.metadata.study.Analysis;
import bio.overture.maestro.domain.entities.metadata.study.GetAnalysisResponse;
import bio.overture.maestro.domain.entities.metadata.study.Study;
import bio.overture.maestro.domain.port.outbound.metadata.study.GetAllStudiesCommand;
import bio.overture.maestro.domain.port.outbound.metadata.study.GetAnalysisCommand;
import bio.overture.maestro.domain.port.outbound.metadata.study.GetStudyAnalysesCommand;
import bio.overture.maestro.domain.port.outbound.metadata.study.StudyDAO;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import javax.inject.Inject;
import lombok.NonNull;
Expand All @@ -48,7 +50,7 @@
class SongStudyDAO implements StudyDAO {

private static final String STUDY_ANALYSES_URL_TEMPLATE =
"{0}/studies/{1}/analysis?analysisStates={2}";
"{0}/studies/{1}/analysis/paginated?analysisStates={2}&limit={3}&offset={4}";
private static final String STUDY_ANALYSIS_URL_TEMPLATE = "{0}/studies/{1}/analysis/{2}";
private static final String STUDIES_URL_TEMPLATE = "{0}/studies/all";
private static final String MSG_STUDY_DOES_NOT_EXIST =
Expand All @@ -58,6 +60,7 @@ class SongStudyDAO implements StudyDAO {
private static final int FALLBACK_SONG_TIMEOUT = 60;
private static final int FALLBACK_SONG_ANALYSIS_TIMEOUT = 5;
private static final int FALLBACK_SONG_MAX_RETRY = 0;
private static final int DEFAULT_SONG_PAGE_LIMIT = 25;
private final WebClient webClient;
private final int songMaxRetries;
private final int minBackoffSec = 1;
Expand All @@ -68,11 +71,16 @@ class SongStudyDAO implements StudyDAO {
private final int studyCallTimeoutSeconds;

private final int analysisCallTimeoutSeconds;
private final int pageLimit;

@Inject
public SongStudyDAO(
@NonNull WebClient webClient, @NonNull ApplicationProperties applicationProperties) {
this.webClient = webClient;
this.pageLimit =
applicationProperties.pageLimit() > 0
? applicationProperties.pageLimit()
: DEFAULT_SONG_PAGE_LIMIT;
this.indexableStudyStatuses = applicationProperties.indexableStudyStatuses();
this.indexableStudyStatusesList = List.of(indexableStudyStatuses.split(","));
this.songMaxRetries =
Expand All @@ -94,34 +102,68 @@ public Mono<List<Analysis>> getStudyAnalyses(GetStudyAnalysesCommand getStudyAna
log.trace("in getStudyAnalyses, args: {} ", getStudyAnalysesCommand);
val repoBaseUrl = getStudyAnalysesCommand.getFilesRepositoryBaseUrl();
val studyId = getStudyAnalysesCommand.getStudyId();
val analysisListType = new ParameterizedTypeReference<List<Analysis>>() {};

var initialOffset = 0;
val url =
format(
STUDY_ANALYSES_URL_TEMPLATE,
repoBaseUrl,
studyId,
this.indexableStudyStatuses,
this.pageLimit,
initialOffset);
val threadSafeOffset = new AtomicInteger(0);

return fetchItems(url, studyId)
// The expand method recursively calls fetchItems() and emits response of first page to the
// last.
// the first request being made is offset = 0, and the second request is offset = 25,
// and all the way to the last page.
.expand(
rep -> {
if (rep.getAnalyses().size() == 0) {
return Mono.empty();
}
threadSafeOffset.addAndGet(this.pageLimit);
val currentUrl =
format(
STUDY_ANALYSES_URL_TEMPLATE,
repoBaseUrl,
studyId,
this.indexableStudyStatuses,
this.pageLimit,
threadSafeOffset.get());
return fetchItems(currentUrl, studyId);
})
.flatMap(rep -> Flux.fromIterable(rep.getAnalyses()))
.collectList()
.doOnSuccess(
(list) ->
log.trace(
"getStudyAnalyses out, analyses count {} args: {}",
list.size(),
getStudyAnalysesCommand));
}

private Mono<GetAnalysisResponse> fetchItems(@NonNull String url, @NonNull String studyId) {
log.trace("get paged analyses, url = {}", url);
val retryConfig =
Retry.allBut(NotFoundException.class)
.retryMax(this.songMaxRetries)
.doOnRetry(
retryCtx ->
log.error(
"exception happened, retrying {}",
getStudyAnalysesCommand,
retryCtx.exception()))
log.error("exception happened, retrying {}", url, retryCtx.exception()))
.exponentialBackoff(
Duration.ofSeconds(minBackoffSec), Duration.ofSeconds(maxBackoffSec));

return this.webClient
.get()
.uri(format(STUDY_ANALYSES_URL_TEMPLATE, repoBaseUrl, studyId, this.indexableStudyStatuses))
.uri(url)
.retrieve()
.onStatus(
HttpStatus.NOT_FOUND::equals,
clientResponse -> error(notFound(MSG_STUDY_DOES_NOT_EXIST, studyId)))
.bodyToMono(analysisListType)
.transform(retryAndTimeout(retryConfig, Duration.ofSeconds(this.studyCallTimeoutSeconds)))
.doOnSuccess(
(list) ->
log.trace(
"getStudyAnalyses out, analyses count {} args: {}",
list.size(),
getStudyAnalysesCommand));
.bodyToMono(GetAnalysisResponse.class)
.transform(retryAndTimeout(retryConfig, Duration.ofSeconds(this.studyCallTimeoutSeconds)));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ public interface ApplicationProperties {

String indexableStudyStatuses();

int pageLimit();

int songAnalysisCallTimeoutSeconds();

Slack.SlackChannelInfo getSlackChannelInfo();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,11 @@ public String indexableStudyStatuses() {
return this.song.getIndexableStudyStatesCsv();
}

@Override
public int pageLimit() {
return this.song.getPageLimit();
}

@Override
public int songAnalysisCallTimeoutSeconds() {
return this.song.getTimeoutSec().getAnalysis();
Expand Down Expand Up @@ -270,6 +275,7 @@ private static class Song {
private int maxRetries = 3;
// FIXME: This configuration is called three different things in this codebase
private String indexableStudyStatesCsv = "PUBLISHED";
private int pageLimit = 25;
}

@Data
Expand Down
2 changes: 2 additions & 0 deletions maestro-app/src/main/resources/config/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ maestro:
maxInMemorySize: -1
song:
indexableStudyStatesCsv: PUBLISHED
# page limit for getting analysis from song
pageLimit: 25
maxRetries: 3
timeoutSec:
study: 100 # some studies take really long, +30 secs, to be downloaded
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,30 +135,50 @@ void shouldRetryFetchingAnalysisOnFailure() {
@Test
@SneakyThrows
void shouldRetryFetchingStudyAnalysesOnFailure() {
val analyses = loadJsonString(this.getClass(), "PEME-CA.study.json");
val analyses = loadJsonString(this.getClass(), "PEME-CA.response.json");
val emptyResp = loadJsonString(this.getClass(), "empty-response.json");
val analysesList =
loadJsonFixture(
this.getClass(), "PEME-CA.study.json", new TypeReference<List<Analysis>>() {});
stubFor(
request("GET", urlEqualTo("/studies/PEME-CA/analysis?analysisStates=PUBLISHED"))
request(
"GET",
urlMatching(
"/studies/PEME-CA/analysis/paginated\\?analysisStates=PUBLISHED&limit=25&offset=\\d+"))
.inScenario("RANDOM_FAILURE")
.whenScenarioStateIs(Scenario.STARTED)
.willReturn(
aResponse()
.withStatus(400)
.withBody("<p> some wierd unexpected text </p>")
.withBody("<p> some weird unexpected text </p>")
.withHeader("content-type", "text/html"))
.willSetStateTo("WORKING"));

stubFor(
request("GET", urlEqualTo("/studies/PEME-CA/analysis?analysisStates=PUBLISHED"))
request(
"GET",
urlEqualTo(
"/studies/PEME-CA/analysis/paginated?analysisStates=PUBLISHED&limit=25&offset=0"))
.inScenario("RANDOM_FAILURE")
.whenScenarioStateIs("WORKING")
.willReturn(
aResponse()
.withBody(analyses)
.withStatus(200)
.withHeader("content-type", "application/json")));
// Mock the second/last request:
stubFor(
request(
"GET",
urlEqualTo(
"/studies/PEME-CA/analysis/paginated?analysisStates=PUBLISHED&limit=25&offset=25"))
.inScenario("RANDOM_FAILURE")
.whenScenarioStateIs("WORKING")
.willReturn(
aResponse()
.withBody(emptyResp)
.withStatus(200)
.withHeader("content-type", "application/json")));

val analysesMono =
songStudyDAO.getStudyAnalyses(
Expand All @@ -179,11 +199,24 @@ void fetchingStudyAnalysesShouldReturnRetryExhaustedException() {
.studyId("PEME-CA")
.build();
stubFor(
request("GET", urlEqualTo("/studies/PEME-CA/analysis?analysisStates=PUBLISHED"))
request(
"GET",
urlEqualTo(
"/studies/PEME-CA/analysis/paginated?analysisStates=PUBLISHED&limit=25&offset=0"))
.willReturn(
aResponse()
.withStatus(400)
.withBody("<p> Some wierd unexpected text </p>")
.withBody("<p> Some weird unexpected text </p>")
.withHeader("content-type", "text/html")));
stubFor(
request(
"GET",
urlEqualTo(
"/studies/PEME-CA/analysis/paginated?analysisStates=PUBLISHED&limit=25&offset=100"))
.willReturn(
aResponse()
.withStatus(400)
.withBody("<p> Some weird unexpected text </p>")
.withHeader("content-type", "text/html")));

// when
Expand Down
Loading

0 comments on commit 73e6971

Please sign in to comment.