From 6337327fef5db873edb2f9c1e549756f45100599 Mon Sep 17 00:00:00 2001 From: Larry Safran Date: Fri, 20 Sep 2024 18:49:44 -0700 Subject: [PATCH] core: reduce extra onReady() from hedging. --- .../main/java/io/grpc/internal/RetriableStream.java | 8 ++++---- .../java/io/grpc/internal/RetriableStreamTest.java | 10 ++-------- 2 files changed, 6 insertions(+), 12 deletions(-) diff --git a/core/src/main/java/io/grpc/internal/RetriableStream.java b/core/src/main/java/io/grpc/internal/RetriableStream.java index ba9424ea25c..44f69d80562 100644 --- a/core/src/main/java/io/grpc/internal/RetriableStream.java +++ b/core/src/main/java/io/grpc/internal/RetriableStream.java @@ -311,7 +311,7 @@ private void drain(Substream substream) { } if (index == savedState.buffer.size()) { // I'm drained state = savedState.substreamDrained(substream); - if (!isReady()) { + if (!substream.stream.isReady()) { return; } onReadyRunnable = new Runnable() { @@ -1131,15 +1131,15 @@ public void run() { @Override public void onReady() { - // FIXME(#7089): hedging case is broken. - if (!isReady()) { + if (!substream.stream.isReady() + || (state.winningSubstream != null && state.winningSubstream != substream)) { return; } listenerSerializeExecutor.execute( new Runnable() { @Override public void run() { - if (!isClosed) { + if (!isClosed && substream.stream.isReady()) { masterListener.onReady(); } } diff --git a/core/src/test/java/io/grpc/internal/RetriableStreamTest.java b/core/src/test/java/io/grpc/internal/RetriableStreamTest.java index 21ec46668fc..59266ac0a4b 100644 --- a/core/src/test/java/io/grpc/internal/RetriableStreamTest.java +++ b/core/src/test/java/io/grpc/internal/RetriableStreamTest.java @@ -2150,7 +2150,6 @@ public Void answer(InvocationOnMock in) { inOrder.verify(mockStream2).writeMessage(any(InputStream.class)); inOrder.verify(mockStream2).flush(); // Memory leak workaround inOrder.verify(mockStream2).request(456); - inOrder.verify(mockStream1).isReady(); inOrder.verify(mockStream2).isReady(); inOrder.verifyNoMoreInteractions(); @@ -2200,8 +2199,8 @@ public Void answer(InvocationOnMock in) { inOrder.verify(mockStream3).flush(); // Memory leak workaround inOrder.verify(mockStream3).writeMessage(any(InputStream.class)); inOrder.verify(mockStream3).flush(); // Memory leak workaround - inOrder.verify(mockStream1).isReady(); - inOrder.verify(mockStream2).isReady(); + inOrder.verify(mockStream1, never()).isReady(); + inOrder.verify(mockStream2, never()).isReady(); inOrder.verify(mockStream3).isReady(); inOrder.verifyNoMoreInteractions(); @@ -2257,8 +2256,6 @@ public Void answer(InvocationOnMock in) { inOrder.verify(mockStream4).writeMessage(any(InputStream.class)); inOrder.verify(mockStream4).flush(); // Memory leak workaround } - inOrder.verify(mockStream1).isReady(); - inOrder.verify(mockStream2).isReady(); inOrder.verify(mockStream4).isReady(); inOrder.verifyNoMoreInteractions(); @@ -2701,7 +2698,6 @@ public void hedging_cancelled() { ArgumentCaptor sublistenerCaptor2 = ArgumentCaptor.forClass(ClientStreamListener.class); inOrder.verify(mockStream2).start(sublistenerCaptor2.capture()); - inOrder.verify(mockStream1).isReady(); inOrder.verify(mockStream2).isReady(); inOrder.verifyNoMoreInteractions(); @@ -2744,7 +2740,6 @@ public void hedging_perRpcBufferLimitExceeded() { ArgumentCaptor sublistenerCaptor2 = ArgumentCaptor.forClass(ClientStreamListener.class); verify(mockStream2).start(sublistenerCaptor2.capture()); - verify(mockStream1, times(2)).isReady(); verify(mockStream2).isReady(); ClientStreamTracer bufferSizeTracer2 = bufferSizeTracer; @@ -2786,7 +2781,6 @@ public void hedging_channelBufferLimitExceeded() { ArgumentCaptor sublistenerCaptor2 = ArgumentCaptor.forClass(ClientStreamListener.class); verify(mockStream2).start(sublistenerCaptor2.capture()); - verify(mockStream1, times(2)).isReady(); verify(mockStream2).isReady(); ClientStreamTracer bufferSizeTracer2 = bufferSizeTracer;