diff --git a/src/main/java/io/mantisrx/api/Util.java b/src/main/java/io/mantisrx/api/Util.java index e8fe3f4..0f740a5 100644 --- a/src/main/java/io/mantisrx/api/Util.java +++ b/src/main/java/io/mantisrx/api/Util.java @@ -116,7 +116,7 @@ public static Func1<Observable<? extends Throwable>, Observable<?>> getRetryFunc final int limit = retries == Integer.MAX_VALUE ? retries : retries + 1; return attempts -> attempts .zipWith(Observable.range(1, limit), (t1, integer) -> { - logger.warn("Caught exception connecting for {}.", name, t1); + logger.warn(String.format("Caught exception connecting for %s.", name), t1); return new ImmutablePair<Throwable, Integer>(t1, integer); }) .flatMap(pair -> { diff --git a/src/main/java/io/mantisrx/api/push/ConnectionBroker.java b/src/main/java/io/mantisrx/api/push/ConnectionBroker.java index 17e1106..d38b7fb 100644 --- a/src/main/java/io/mantisrx/api/push/ConnectionBroker.java +++ b/src/main/java/io/mantisrx/api/push/ConnectionBroker.java @@ -71,6 +71,7 @@ public Observable<String> connect(PushConnectionDetails details) { if (!connectionCache.containsKey(details)) { switch (details.type) { case CONNECT_BY_NAME: + log.info("Getting connection by name for {} since it is not in cache.", details); return getConnectByNameFor(details) .subscribeOn(scheduler) .doOnUnsubscribe(() -> { @@ -81,6 +82,7 @@ public Observable<String> connect(PushConnectionDetails details) { log.info("Purging {} from cache.", details); connectionCache.remove(details); }) + .doOnError(t -> log.error("Error in connect by name for " + details, t)) .share(); case CONNECT_BY_ID: return getConnectByIdFor(details) @@ -191,6 +193,7 @@ private static SinkConnectionFunc<MantisServerSentEvent> getSseConnFunc(final St private static Observable<Observable<MantisServerSentEvent>> getResults(boolean isJobId, MantisClient mantisClient, final String target, SinkParameters sinkParameters) { + log.info("Getting results for target: {}", target); final AtomicBoolean hasError = new AtomicBoolean(); return isJobId ? mantisClient.getSinkClientByJobId(target, getSseConnFunc(target, sinkParameters), null).getResults() : @@ -198,6 +201,7 @@ private static Observable<Observable<MantisServerSentEvent>> getResults(boolean .switchMap(serverSentEventSinkClient -> { if (serverSentEventSinkClient.hasError()) { hasError.set(true); + log.error("Error getting sink client for job " + target, serverSentEventSinkClient.getError()); return Observable.error(new Exception(serverSentEventSinkClient.getError())); } return serverSentEventSinkClient.getResults(); @@ -220,7 +224,7 @@ private Observable<String> getRemoteDataObservable(String uri, String target, Li log.info("Connecting to remote region {} at {}.", region, uri); return mantisCrossRegionalClient.getSecureSseClient(region) .submit(HttpClientRequest.createGet(uri)) - .retryWhen(Util.getRetryFunc(log, uri + " in " + region)) +// .retryWhen(Util.getRetryFunc(log, uri + " in " + region)) .doOnError(throwable -> log.warn( "Error getting response from remote SSE server for uri {} in region {}: {}", uri, region, throwable.getMessage(), throwable) @@ -239,7 +243,7 @@ private Observable<String> getRemoteDataObservable(String uri, String target, Li }) .subscribeOn(scheduler) .observeOn(scheduler) - .doOnError(t -> log.warn("Error streaming in remote data ({}). Will retry: {}", region, t.getMessage(), t)) + .doOnError(t -> log.error("Error streaming in remote data ({}). Will retry: {}", region, t.getMessage(), t)) .doOnCompleted(() -> log.info(String.format("remote sink connection complete for uri %s, region=%s", uri, region))); } })