Skip to content

Commit

Permalink
Merge pull request #232 from overture-stack/rc/4.0.0
Browse files Browse the repository at this point in the history
Release Candidate 4.0.0
  • Loading branch information
Buwujiu authored Feb 22, 2022
2 parents 6d18621 + 2491de1 commit 45bb5c2
Show file tree
Hide file tree
Showing 27 changed files with 2,955 additions and 56 deletions.
4 changes: 2 additions & 2 deletions .mvn/maven.config
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
-Drevision=3.12.0
-Drevision=4.0.0
-Dsha1=
-Dchangelist=
-Dchangelist=-SNAPSHOT
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,15 @@
import bio.overture.maestro.app.infra.config.properties.ApplicationProperties;
import bio.overture.maestro.domain.api.exception.NotFoundException;
import bio.overture.maestro.domain.entities.metadata.study.Analysis;
import bio.overture.maestro.domain.entities.metadata.study.GetAnalysisResponse;
import bio.overture.maestro.domain.entities.metadata.study.Study;
import bio.overture.maestro.domain.port.outbound.metadata.study.GetAllStudiesCommand;
import bio.overture.maestro.domain.port.outbound.metadata.study.GetAnalysisCommand;
import bio.overture.maestro.domain.port.outbound.metadata.study.GetStudyAnalysesCommand;
import bio.overture.maestro.domain.port.outbound.metadata.study.StudyDAO;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import javax.inject.Inject;
import lombok.NonNull;
Expand All @@ -48,7 +50,7 @@
class SongStudyDAO implements StudyDAO {

private static final String STUDY_ANALYSES_URL_TEMPLATE =
"{0}/studies/{1}/analysis?analysisStates={2}";
"{0}/studies/{1}/analysis/paginated?analysisStates={2}&limit={3}&offset={4}";
private static final String STUDY_ANALYSIS_URL_TEMPLATE = "{0}/studies/{1}/analysis/{2}";
private static final String STUDIES_URL_TEMPLATE = "{0}/studies/all";
private static final String MSG_STUDY_DOES_NOT_EXIST =
Expand All @@ -58,6 +60,7 @@ class SongStudyDAO implements StudyDAO {
private static final int FALLBACK_SONG_TIMEOUT = 60;
private static final int FALLBACK_SONG_ANALYSIS_TIMEOUT = 5;
private static final int FALLBACK_SONG_MAX_RETRY = 0;
private static final int DEFAULT_SONG_PAGE_LIMIT = 25;
private final WebClient webClient;
private final int songMaxRetries;
private final int minBackoffSec = 1;
Expand All @@ -68,11 +71,16 @@ class SongStudyDAO implements StudyDAO {
private final int studyCallTimeoutSeconds;

private final int analysisCallTimeoutSeconds;
private final int pageLimit;

@Inject
public SongStudyDAO(
@NonNull WebClient webClient, @NonNull ApplicationProperties applicationProperties) {
this.webClient = webClient;
this.pageLimit =
applicationProperties.pageLimit() > 0
? applicationProperties.pageLimit()
: DEFAULT_SONG_PAGE_LIMIT;
this.indexableStudyStatuses = applicationProperties.indexableStudyStatuses();
this.indexableStudyStatusesList = List.of(indexableStudyStatuses.split(","));
this.songMaxRetries =
Expand All @@ -94,34 +102,68 @@ public Mono<List<Analysis>> getStudyAnalyses(GetStudyAnalysesCommand getStudyAna
log.trace("in getStudyAnalyses, args: {} ", getStudyAnalysesCommand);
val repoBaseUrl = getStudyAnalysesCommand.getFilesRepositoryBaseUrl();
val studyId = getStudyAnalysesCommand.getStudyId();
val analysisListType = new ParameterizedTypeReference<List<Analysis>>() {};

var initialOffset = 0;
val url =
format(
STUDY_ANALYSES_URL_TEMPLATE,
repoBaseUrl,
studyId,
this.indexableStudyStatuses,
this.pageLimit,
initialOffset);
val threadSafeOffset = new AtomicInteger(0);

return fetchItems(url, studyId)
// The expand method recursively calls fetchItems() and emits response of first page to the
// last.
// the first request being made is offset = 0, and the second request is offset = 25,
// and all the way to the last page.
.expand(
rep -> {
if (rep.getAnalyses().size() == 0) {
return Mono.empty();
}
threadSafeOffset.addAndGet(this.pageLimit);
val currentUrl =
format(
STUDY_ANALYSES_URL_TEMPLATE,
repoBaseUrl,
studyId,
this.indexableStudyStatuses,
this.pageLimit,
threadSafeOffset.get());
return fetchItems(currentUrl, studyId);
})
.flatMap(rep -> Flux.fromIterable(rep.getAnalyses()))
.collectList()
.doOnSuccess(
(list) ->
log.trace(
"getStudyAnalyses out, analyses count {} args: {}",
list.size(),
getStudyAnalysesCommand));
}

private Mono<GetAnalysisResponse> fetchItems(@NonNull String url, @NonNull String studyId) {
log.trace("get paged analyses, url = {}", url);
val retryConfig =
Retry.allBut(NotFoundException.class)
.retryMax(this.songMaxRetries)
.doOnRetry(
retryCtx ->
log.error(
"exception happened, retrying {}",
getStudyAnalysesCommand,
retryCtx.exception()))
log.error("exception happened, retrying {}", url, retryCtx.exception()))
.exponentialBackoff(
Duration.ofSeconds(minBackoffSec), Duration.ofSeconds(maxBackoffSec));

return this.webClient
.get()
.uri(format(STUDY_ANALYSES_URL_TEMPLATE, repoBaseUrl, studyId, this.indexableStudyStatuses))
.uri(url)
.retrieve()
.onStatus(
HttpStatus.NOT_FOUND::equals,
clientResponse -> error(notFound(MSG_STUDY_DOES_NOT_EXIST, studyId)))
.bodyToMono(analysisListType)
.transform(retryAndTimeout(retryConfig, Duration.ofSeconds(this.studyCallTimeoutSeconds)))
.doOnSuccess(
(list) ->
log.trace(
"getStudyAnalyses out, analyses count {} args: {}",
list.size(),
getStudyAnalysesCommand));
.bodyToMono(GetAnalysisResponse.class)
.transform(retryAndTimeout(retryConfig, Duration.ofSeconds(this.studyCallTimeoutSeconds)));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ public interface ApplicationProperties {

String indexableStudyStatuses();

int pageLimit();

int songAnalysisCallTimeoutSeconds();

Slack.SlackChannelInfo getSlackChannelInfo();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,11 @@ public String indexableStudyStatuses() {
return this.song.getIndexableStudyStatesCsv();
}

@Override
public int pageLimit() {
return this.song.getPageLimit();
}

@Override
public int songAnalysisCallTimeoutSeconds() {
return this.song.getTimeoutSec().getAnalysis();
Expand Down Expand Up @@ -270,6 +275,7 @@ private static class Song {
private int maxRetries = 3;
// FIXME: This configuration is called three different things in this codebase
private String indexableStudyStatesCsv = "PUBLISHED";
private int pageLimit = 25;
}

@Data
Expand Down
2 changes: 2 additions & 0 deletions maestro-app/src/main/resources/config/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ maestro:
maxInMemorySize: -1
song:
indexableStudyStatesCsv: PUBLISHED
# page limit for getting analysis from song
pageLimit: 25
maxRetries: 3
timeoutSec:
study: 100 # some studies take really long, +30 secs, to be downloaded
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,30 +135,50 @@ void shouldRetryFetchingAnalysisOnFailure() {
@Test
@SneakyThrows
void shouldRetryFetchingStudyAnalysesOnFailure() {
val analyses = loadJsonString(this.getClass(), "PEME-CA.study.json");
val analyses = loadJsonString(this.getClass(), "PEME-CA.response.json");
val emptyResp = loadJsonString(this.getClass(), "empty-response.json");
val analysesList =
loadJsonFixture(
this.getClass(), "PEME-CA.study.json", new TypeReference<List<Analysis>>() {});
stubFor(
request("GET", urlEqualTo("/studies/PEME-CA/analysis?analysisStates=PUBLISHED"))
request(
"GET",
urlMatching(
"/studies/PEME-CA/analysis/paginated\\?analysisStates=PUBLISHED&limit=25&offset=\\d+"))
.inScenario("RANDOM_FAILURE")
.whenScenarioStateIs(Scenario.STARTED)
.willReturn(
aResponse()
.withStatus(400)
.withBody("<p> some wierd unexpected text </p>")
.withBody("<p> some weird unexpected text </p>")
.withHeader("content-type", "text/html"))
.willSetStateTo("WORKING"));

stubFor(
request("GET", urlEqualTo("/studies/PEME-CA/analysis?analysisStates=PUBLISHED"))
request(
"GET",
urlEqualTo(
"/studies/PEME-CA/analysis/paginated?analysisStates=PUBLISHED&limit=25&offset=0"))
.inScenario("RANDOM_FAILURE")
.whenScenarioStateIs("WORKING")
.willReturn(
aResponse()
.withBody(analyses)
.withStatus(200)
.withHeader("content-type", "application/json")));
// Mock the second/last request:
stubFor(
request(
"GET",
urlEqualTo(
"/studies/PEME-CA/analysis/paginated?analysisStates=PUBLISHED&limit=25&offset=25"))
.inScenario("RANDOM_FAILURE")
.whenScenarioStateIs("WORKING")
.willReturn(
aResponse()
.withBody(emptyResp)
.withStatus(200)
.withHeader("content-type", "application/json")));

val analysesMono =
songStudyDAO.getStudyAnalyses(
Expand All @@ -179,11 +199,24 @@ void fetchingStudyAnalysesShouldReturnRetryExhaustedException() {
.studyId("PEME-CA")
.build();
stubFor(
request("GET", urlEqualTo("/studies/PEME-CA/analysis?analysisStates=PUBLISHED"))
request(
"GET",
urlEqualTo(
"/studies/PEME-CA/analysis/paginated?analysisStates=PUBLISHED&limit=25&offset=0"))
.willReturn(
aResponse()
.withStatus(400)
.withBody("<p> Some wierd unexpected text </p>")
.withBody("<p> Some weird unexpected text </p>")
.withHeader("content-type", "text/html")));
stubFor(
request(
"GET",
urlEqualTo(
"/studies/PEME-CA/analysis/paginated?analysisStates=PUBLISHED&limit=25&offset=100"))
.willReturn(
aResponse()
.withStatus(400)
.withBody("<p> Some weird unexpected text </p>")
.withHeader("content-type", "text/html")));

// when
Expand Down
Loading

0 comments on commit 45bb5c2

Please sign in to comment.