Skip to content

Commit

Permalink
Let's evaluate this
Browse files Browse the repository at this point in the history
  • Loading branch information
sundargates committed Aug 9, 2024
1 parent 82f46a2 commit b4f7249
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 3 deletions.
2 changes: 1 addition & 1 deletion src/main/java/io/mantisrx/api/Util.java
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ public static Func1<Observable<? extends Throwable>, Observable<?>> getRetryFunc
final int limit = retries == Integer.MAX_VALUE ? retries : retries + 1;
return attempts -> attempts
.zipWith(Observable.range(1, limit), (t1, integer) -> {
logger.warn("Caught exception connecting for {}.", name, t1);
logger.warn(String.format("Caught exception connecting for %s.", name), t1);
return new ImmutablePair<Throwable, Integer>(t1, integer);
})
.flatMap(pair -> {
Expand Down
8 changes: 6 additions & 2 deletions src/main/java/io/mantisrx/api/push/ConnectionBroker.java
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ public Observable<String> connect(PushConnectionDetails details) {
if (!connectionCache.containsKey(details)) {
switch (details.type) {
case CONNECT_BY_NAME:
log.info("Getting connection by name for {} since it is not in cache.", details);
return getConnectByNameFor(details)
.subscribeOn(scheduler)
.doOnUnsubscribe(() -> {
Expand All @@ -81,6 +82,7 @@ public Observable<String> connect(PushConnectionDetails details) {
log.info("Purging {} from cache.", details);
connectionCache.remove(details);
})
.doOnError(t -> log.error("Error in connect by name for " + details, t))
.share();
case CONNECT_BY_ID:
return getConnectByIdFor(details)
Expand Down Expand Up @@ -191,13 +193,15 @@ private static SinkConnectionFunc<MantisServerSentEvent> getSseConnFunc(final St

private static Observable<Observable<MantisServerSentEvent>> getResults(boolean isJobId, MantisClient mantisClient,
final String target, SinkParameters sinkParameters) {
log.info("Getting results for target: {}", target);
final AtomicBoolean hasError = new AtomicBoolean();
return isJobId ?
mantisClient.getSinkClientByJobId(target, getSseConnFunc(target, sinkParameters), null).getResults() :
mantisClient.getSinkClientByJobName(target, getSseConnFunc(target, sinkParameters), null)
.switchMap(serverSentEventSinkClient -> {
if (serverSentEventSinkClient.hasError()) {
hasError.set(true);
log.error("Error getting sink client for job " + target, serverSentEventSinkClient.getError());
return Observable.error(new Exception(serverSentEventSinkClient.getError()));
}
return serverSentEventSinkClient.getResults();
Expand All @@ -220,7 +224,7 @@ private Observable<String> getRemoteDataObservable(String uri, String target, Li
log.info("Connecting to remote region {} at {}.", region, uri);
return mantisCrossRegionalClient.getSecureSseClient(region)
.submit(HttpClientRequest.createGet(uri))
.retryWhen(Util.getRetryFunc(log, uri + " in " + region))
// .retryWhen(Util.getRetryFunc(log, uri + " in " + region))
.doOnError(throwable -> log.warn(
"Error getting response from remote SSE server for uri {} in region {}: {}",
uri, region, throwable.getMessage(), throwable)
Expand All @@ -239,7 +243,7 @@ private Observable<String> getRemoteDataObservable(String uri, String target, Li
})
.subscribeOn(scheduler)
.observeOn(scheduler)
.doOnError(t -> log.warn("Error streaming in remote data ({}). Will retry: {}", region, t.getMessage(), t))
.doOnError(t -> log.error("Error streaming in remote data ({}). Will retry: {}", region, t.getMessage(), t))
.doOnCompleted(() -> log.info(String.format("remote sink connection complete for uri %s, region=%s", uri, region)));
}
})
Expand Down

0 comments on commit b4f7249

Please sign in to comment.