Skip to content

Commit

Permalink
xds: Move creating the retry timer in handleRpcStreamClosed to as late…
Browse files Browse the repository at this point in the history
… as possible and call close() (#11776)

* Move creation of the retry timer in `handleRpcStreamClosed` to as late as possible, and call `close()` so that the `call` is cancelled.
Also add some debug logging.
  • Loading branch information
larry-safran authored Jan 6, 2025
1 parent 6c12c2b commit 4222f77
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 12 deletions.
10 changes: 10 additions & 0 deletions core/src/main/java/io/grpc/internal/AbstractStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,17 @@
import io.perfmark.PerfMark;
import io.perfmark.TaskCloseable;
import java.io.InputStream;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.concurrent.GuardedBy;

/**
* The stream and stream state as used by the application. Must only be called from the sending
* application thread.
*/
public abstract class AbstractStream implements Stream {
private static final Logger log = Logger.getLogger(AbstractStream.class.getName());

/** The framer to use for sending messages. */
protected abstract Framer framer();

Expand Down Expand Up @@ -371,6 +375,12 @@ private void notifyIfReady() {
boolean doNotify;
synchronized (onReadyLock) {
doNotify = isReady();
if (!doNotify && log.isLoggable(Level.FINEST)) {
log.log(Level.FINEST,
"Stream not ready so skip notifying listener.\n"
+ "details: allocated/deallocated:{0}/{3}, sent queued: {1}, ready thresh: {2}",
new Object[] {allocated, numSentBytesQueued, onReadyThreshold, deallocated});
}
}
if (doNotify) {
listener().onReady();
Expand Down
22 changes: 10 additions & 12 deletions xds/src/main/java/io/grpc/xds/client/ControlPlaneClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -453,16 +453,6 @@ private void handleRpcStreamClosed(Status status) {
stopwatch.reset();
}

// FakeClock in tests isn't thread-safe. Schedule the retry timer before notifying callbacks
// to avoid TSAN races, since tests may wait until callbacks are called but then would run
// concurrently with the stopwatch and schedule.

long elapsed = stopwatch.elapsed(TimeUnit.NANOSECONDS);
long delayNanos = Math.max(0, retryBackoffPolicy.nextBackoffNanos() - elapsed);

rpcRetryTimer =
syncContext.schedule(new RpcRetryTask(), delayNanos, TimeUnit.NANOSECONDS, timeService);

Status newStatus = status;
if (responseReceived) {
// A closed ADS stream after a successful response is not considered an error. Servers may
Expand Down Expand Up @@ -490,9 +480,17 @@ private void handleRpcStreamClosed(Status status) {
newStatus.getCode(), newStatus.getDescription(), newStatus.getCause());
}

closed = true;
close(newStatus.asException());

// FakeClock in tests isn't thread-safe. Schedule the retry timer before notifying callbacks
// to avoid TSAN races, since tests may wait until callbacks are called but then would run
// concurrently with the stopwatch and schedule.
long elapsed = stopwatch.elapsed(TimeUnit.NANOSECONDS);
long delayNanos = Math.max(0, retryBackoffPolicy.nextBackoffNanos() - elapsed);
rpcRetryTimer =
syncContext.schedule(new RpcRetryTask(), delayNanos, TimeUnit.NANOSECONDS, timeService);

xdsResponseHandler.handleStreamClosed(newStatus, !responseReceived);
cleanUp();
}

private void close(Exception error) {
Expand Down
5 changes: 5 additions & 0 deletions xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,9 @@ private Set<String> getResourceKeys(XdsResourceType<?> xdsResourceType) {
// cpcForThisStream is null when doing shutdown
private void cleanUpResourceTimers(ControlPlaneClient cpcForThisStream) {
Collection<String> authoritiesForCpc = getActiveAuthorities(cpcForThisStream);
String target = cpcForThisStream == null ? "null" : cpcForThisStream.getServerInfo().target();
logger.log(XdsLogLevel.DEBUG, "Cleaning up resource timers for CPC {0}, authorities {1}",
target, authoritiesForCpc);

for (Map<String, ResourceSubscriber<?>> subscriberMap : resourceSubscribers.values()) {
for (ResourceSubscriber<?> subscriber : subscriberMap.values()) {
Expand Down Expand Up @@ -957,6 +960,8 @@ public void handleStreamClosed(Status status, boolean shouldTryFallback) {

ControlPlaneClient cpcClosed = serverCpClientMap.get(serverInfo);
if (cpcClosed == null) {
logger.log(XdsLogLevel.DEBUG,
"Couldn't find closing CPC for {0}, so skipping cleanup and reporting", serverInfo);
return;
}

Expand Down

0 comments on commit 4222f77

Please sign in to comment.