From 650ee00905a68e03ffd69a0606064ed33180c2eb Mon Sep 17 00:00:00 2001 From: Vijaya Gopal Yarramneni Date: Tue, 25 Jun 2019 03:55:21 -0700 Subject: [PATCH] Fixing a rare case where idle connection is closed by the service at the exact time the client is creating a link. (#373) If the connection is closed by service after CBS token is sent, link open operaiton was failing with AuthorizsationFailed exception. --- azure-servicebus/azure-servicebus.pom | 2 +- .../primitives/CoreMessageReceiver.java | 53 +++++++++++++++++-- .../primitives/CoreMessageSender.java | 28 +++++++++- .../primitives/MessagingFactory.java | 14 ++++- .../primitives/RequestResponseLink.java | 22 +++++++- pom.xml | 2 +- 6 files changed, 109 insertions(+), 12 deletions(-) diff --git a/azure-servicebus/azure-servicebus.pom b/azure-servicebus/azure-servicebus.pom index d2f8db8b..09875011 100644 --- a/azure-servicebus/azure-servicebus.pom +++ b/azure-servicebus/azure-servicebus.pom @@ -4,7 +4,7 @@ 4.0.0 com.microsoft.azure azure-servicebus - 1.2.14 + 1.2.15 The MIT License (MIT) diff --git a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/CoreMessageReceiver.java b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/CoreMessageReceiver.java index 57edc305..18a42f63 100644 --- a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/CoreMessageReceiver.java +++ b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/CoreMessageReceiver.java @@ -102,11 +102,13 @@ public class CoreMessageReceiver extends ClientEntity implements IAmqpReceiver, private ScheduledFuture sasTokenRenewTimerFuture; private CompletableFuture requestResponseLinkCreationFuture; private CompletableFuture receiveLinkReopenFuture; + private CompletableFuture ensureLinkReopenFutureToWaitOn; private final Runnable timedOutUpdateStateRequestsDaemon; private final Runnable returnMesagesLoopDaemon; private final ScheduledFuture updateStateRequestsTimeoutChecker; private final ScheduledFuture returnMessagesLoopRunner; private final MessagingEntityType entityType; + private boolean shouldRetryLinkReopenOnTransientFailure = true; // TODO Change onReceiveComplete to handle empty deliveries. Change onError to retry updateState requests. private CoreMessageReceiver(final MessagingFactory factory, @@ -370,8 +372,25 @@ private void closeRequestResponseLink() private void createReceiveLink() { TRACE_LOGGER.info("Creating receive link to '{}'", this.receivePath); - Connection connection = this.underlyingFactory.getConnection(); - + Connection connection = this.underlyingFactory.getActiveConnectionOrNothing(); + if (connection == null) { + // Connection closed after sending CBS token. Happens only in the rare case of azure service bus closing idle connection, just right after sending + // CBS token but before opening a link. + TRACE_LOGGER.warn("Idle connection closed by service just after sending CBS token. Very rare case. Will retry."); + ServiceBusException exception = new ServiceBusException(true, "Idle connection closed by service just after sending CBS token. Please retry."); + if (this.linkOpen != null && !this.linkOpen.getWork().isDone()) { + // Should never happen + AsyncUtil.completeFutureExceptionally(this.linkOpen.getWork(), exception); + } + + if(this.receiveLinkReopenFuture != null && !this.receiveLinkReopenFuture.isDone()) { + // Complete the future and re-attempt link creation + AsyncUtil.completeFutureExceptionally(this.receiveLinkReopenFuture, exception); + } + + return; + } + final Session session = connection.session(); session.setIncomingCapacity(Integer.MAX_VALUE); session.open(); @@ -877,7 +896,7 @@ public void onEvent() catch (IOException ioException) { this.pendingReceives.remove(receiveWorkItem); - this.reduceCreditForCompletedReceiveRequest(receiveWorkItem.getMaxMessageCount()); + this.reduceCreditForCompletedReceiveRequest(receiveWorkItem.getMaxMessageCount()); receiveWorkItem.getWork().completeExceptionally(generateDispatacherSchedulingFailedException("completeMessage", ioException)); receiveWorkItem.cancelTimeoutTask(false); } @@ -1191,7 +1210,33 @@ public void onEvent() }, MessagingFactory.INTERNAL_THREAD_POOL); } - return this.receiveLinkReopenFuture; + if (this.ensureLinkReopenFutureToWaitOn == null || this.ensureLinkReopenFutureToWaitOn.isDone()) { + this.ensureLinkReopenFutureToWaitOn = new CompletableFuture(); + this.shouldRetryLinkReopenOnTransientFailure = true; + } + + this.receiveLinkReopenFuture.handleAsync((v, ex) -> { + if (ex == null) { + this.ensureLinkReopenFutureToWaitOn.complete(null); + } else { + if (ex instanceof ServiceBusException && ((ServiceBusException)ex).getIsTransient()) { + if (this.shouldRetryLinkReopenOnTransientFailure) { + // Retry link creation + this.shouldRetryLinkReopenOnTransientFailure = false; + this.ensureLinkIsOpen(); + } else { + this.ensureLinkReopenFutureToWaitOn.completeExceptionally(ex); + } + } else { + this.ensureLinkReopenFutureToWaitOn.completeExceptionally(ex); + } + + } + return null; + }, + MessagingFactory.INTERNAL_THREAD_POOL); + + return this.ensureLinkReopenFutureToWaitOn; } else { diff --git a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/CoreMessageSender.java b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/CoreMessageSender.java index 44cc22a7..f1af29bd 100644 --- a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/CoreMessageSender.java +++ b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/CoreMessageSender.java @@ -86,6 +86,7 @@ public class CoreMessageSender extends ClientEntity implements IAmqpSender, IErr private CompletableFuture requestResponseLinkCreationFuture; private CompletableFuture sendLinkReopenFuture; private int maxMessageSize; + private boolean shouldRetryLinkOpenIfConnectionIsClosedAfterCBSTokenSent = true; @Deprecated public static CompletableFuture create( @@ -375,6 +376,7 @@ public CompletableFuture sendAsync(Message msg) @Override public void onOpenComplete(Exception completionException) { + this.shouldRetryLinkOpenIfConnectionIsClosedAfterCBSTokenSent = true; if (completionException == null) { this.maxMessageSize = Util.getMaxMessageSizeFromLink(this.sendLink); @@ -576,13 +578,35 @@ private void cleanupFailedSend(final SendWorkItem failedSend, final Throwa private void createSendLink() { TRACE_LOGGER.info("Creating send link to '{}'", this.sendPath); - final Connection connection = this.underlyingFactory.getConnection(); + Connection connection = this.underlyingFactory.getActiveConnectionOrNothing(); + if (connection == null) { + // Connection closed after sending CBS token. Happens only in the rare case of azure service bus closing idle connection, just right after sending + // CBS token but before opening a link. + TRACE_LOGGER.warn("Idle connection closed by service just after sending CBS token. Very rare case. Will retry."); + ServiceBusException exception = new ServiceBusException(true, "Idle connection closed by service just after sending CBS token. Please retry."); + if (this.linkFirstOpen != null && !this.linkFirstOpen.isDone()) { + // Should never happen + AsyncUtil.completeFutureExceptionally(this.linkFirstOpen, exception); + } + + if (this.sendLinkReopenFuture != null && !this.sendLinkReopenFuture.isDone()) { + // Complete the future and re-attempt link creation + AsyncUtil.completeFutureExceptionally(this.sendLinkReopenFuture, exception); + if(this.shouldRetryLinkOpenIfConnectionIsClosedAfterCBSTokenSent) { + this.shouldRetryLinkOpenIfConnectionIsClosedAfterCBSTokenSent = false; + Timer.schedule(() -> {this.ensureLinkIsOpen();}, Duration.ZERO, TimerType.OneTimeRun); + } + } + + return; + } + final Session session = connection.session(); session.setOutgoingWindow(Integer.MAX_VALUE); session.open(); BaseHandler.setHandler(session, new SessionHandler(sendPath)); - final String sendLinkNamePrefix = StringUtil.getShortRandomString(); + final String sendLinkNamePrefix = "sender".concat(TrackingUtil.TRACKING_ID_TOKEN_SEPARATOR).concat(StringUtil.getShortRandomString()); final String sendLinkName = !StringUtil.isNullOrEmpty(connection.getRemoteContainer()) ? sendLinkNamePrefix.concat(TrackingUtil.TRACKING_ID_TOKEN_SEPARATOR).concat(connection.getRemoteContainer()) : sendLinkNamePrefix; diff --git a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/MessagingFactory.java b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/MessagingFactory.java index dbe230eb..29d4c20b 100644 --- a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/MessagingFactory.java +++ b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/MessagingFactory.java @@ -140,7 +140,17 @@ private void startReactor(ReactorHandler reactorHandler) throws IOException TRACE_LOGGER.info("Started reactor"); } - Connection getConnection() + Connection getActiveConnectionOrNothing() + { + if (this.connection == null || this.connection.getLocalState() == EndpointState.CLOSED || this.connection.getRemoteState() == EndpointState.CLOSED) { + return null; + } + else { + return this.connection; + } + } + + Connection getActiveConnectionCreateIfNecessary() { if (this.connection == null || this.connection.getLocalState() == EndpointState.CLOSED || this.connection.getRemoteState() == EndpointState.CLOSED) { @@ -667,7 +677,7 @@ private void createCBSLinkAsync() this.cbsLinkCreationFuture.completeExceptionally(completionEx); } else - { + { String requestResponseLinkPath = RequestResponseLink.getCBSNodeLinkPath(); TRACE_LOGGER.info("Creating CBS link to {}", requestResponseLinkPath); RequestResponseLink.createAsync(this, this.getClientId() + "-cbs", requestResponseLinkPath, null, null).handleAsync((cbsLink, ex) -> diff --git a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/RequestResponseLink.java b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/RequestResponseLink.java index 3e261b3c..151c2d28 100644 --- a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/RequestResponseLink.java +++ b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/RequestResponseLink.java @@ -239,9 +239,27 @@ private void createInternalLinks() commonLinkProperties.put(ClientConstants.ENTITY_TYPE_PROPERTY, this.entityType.getIntValue()); } - // Create send link - final Connection connection = this.underlyingFactory.getConnection(); + Connection connection; + if(this.sasTokenAudienceURI == null) { + // CBS link. Doesn't have to send SAS token + connection = this.underlyingFactory.getActiveConnectionCreateIfNecessary(); + } + else { + connection = this.underlyingFactory.getActiveConnectionOrNothing(); + if (connection == null) { + // Connection closed after sending CBS token. Happens only in the rare case of azure service bus closing idle connection, just right after sending + // CBS token but before opening a link. + TRACE_LOGGER.warn("Idle connection closed by service just after sending CBS token. Very rare case. Will retry."); + ServiceBusException exception = new ServiceBusException(true, "Idle connection closed by service just after sending CBS token. Please retry."); + AsyncUtil.completeFutureExceptionally(this.amqpSender.openFuture, exception); + AsyncUtil.completeFutureExceptionally(this.amqpReceiver.openFuture, exception); + // Retry with little delay so that link recreation in progress flag is reset + Timer.schedule(() -> {RequestResponseLink.this.ensureUniqueLinkRecreation();}, Duration.ofMillis(1000), TimerType.OneTimeRun); + return; + } + } + // Create send link Session session = connection.session(); session.setOutgoingWindow(Integer.MAX_VALUE); session.open(); diff --git a/pom.xml b/pom.xml index 18d530ee..d471add3 100644 --- a/pom.xml +++ b/pom.xml @@ -13,7 +13,7 @@ 0.31.0 4.12 1.7.0 - 1.2.14 + 1.2.15