From d13c3c8c9e3f0bb5991d0de33bb3dbb92f98f5e2 Mon Sep 17 00:00:00 2001 From: vinodhabib Date: Tue, 15 Oct 2024 11:22:38 +0000 Subject: [PATCH 1/9] grpc-okhttp: Removed Annotation SuppressWarnings("GuardedBy") --- .../io/grpc/okhttp/OkHttpClientStream.java | 89 ++++++++++--------- .../io/grpc/okhttp/OkHttpClientTransport.java | 2 +- 2 files changed, 48 insertions(+), 43 deletions(-) diff --git a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientStream.java b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientStream.java index 9d9fe160715..189b69e1f71 100644 --- a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientStream.java +++ b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientStream.java @@ -147,7 +147,7 @@ public void writeHeaders(Metadata metadata, byte[] payload) { useGet = true; defaultPath += "?" + BaseEncoding.base64().encode(payload); } - synchronized (state.lock) { + synchronized(OkHttpClientStream.this.state.lock) { state.streamReady(metadata, defaultPath); } } @@ -178,7 +178,7 @@ public void writeFrame( @Override public void cancel(Status reason) { try (TaskCloseable ignore = PerfMark.traceTask("OkHttpClientStream$Sink.cancel")) { - synchronized (state.lock) { + synchronized(OkHttpClientStream.this.state.lock) { state.cancel(reason, true, null); } } @@ -236,16 +236,17 @@ public TransportState( tag = PerfMark.createTag(methodName); } - @SuppressWarnings("GuardedBy") + //@SuppressWarnings("GuardedBy") @GuardedBy("lock") public void start(int streamId) { - checkState(id == ABSENT_ID, "the stream has been started with id %s", streamId); - id = streamId; - outboundFlowState = outboundFlow.createState(this, streamId); - // TODO(b/145386688): This access should be guarded by 'OkHttpClientStream.this.state.lock'; - // instead found: 'this.lock' - state.onStreamAllocated(); - + synchronized(OkHttpClientStream.this.state.lock) { + checkState(id == ABSENT_ID, "the stream has been started with id %s", streamId); + id = streamId; + outboundFlowState = outboundFlow.createState(this, streamId); + // TODO(b/145386688): This access should be guarded by 'OkHttpClientStream.this.state.lock'; + // instead found: 'this.lock' + state.onStreamAllocated(); + } if (canStart) { // Only happens when the stream has neither been started nor cancelled. frameWriter.synStream(useGet, false, id, 0, requestHeaders); @@ -352,28 +353,30 @@ private void onEndOfStream() { } } - @SuppressWarnings("GuardedBy") + //@SuppressWarnings("GuardedBy") @GuardedBy("lock") private void cancel(Status reason, boolean stopDelivery, Metadata trailers) { - if (cancelSent) { - return; - } - cancelSent = true; - if (canStart) { - // stream is pending. - // TODO(b/145386688): This access should be guarded by 'this.transport.lock'; instead found: - // 'this.lock' - transport.removePendingStream(OkHttpClientStream.this); - // release holding data, so they can be GCed or returned to pool earlier. - requestHeaders = null; - pendingData.clear(); - canStart = false; - transportReportStatus(reason, true, trailers != null ? trailers : new Metadata()); - } else { - // If pendingData is null, start must have already been called, which means synStream has - // been called as well. - transport.finishStream( - id(), reason, PROCESSED, stopDelivery, ErrorCode.CANCEL, trailers); + synchronized(transport.lock) { + if (cancelSent) { + return; + } + cancelSent = true; + if (canStart) { + // stream is pending. + // TODO(b/145386688): This access should be guarded by 'this.transport.lock'; instead found: + // 'this.lock' + transport.removePendingStream(OkHttpClientStream.this); + // release holding data, so they can be GCed or returned to pool earlier. + requestHeaders = null; + pendingData.clear(); + canStart = false; + transportReportStatus(reason, true, trailers != null ? trailers : new Metadata()); + } else { + // If pendingData is null, start must have already been called, which means synStream has + // been called as well. + transport.finishStream( + id(), reason, PROCESSED, stopDelivery, ErrorCode.CANCEL, trailers); + } } } @@ -396,20 +399,22 @@ private void sendBuffer(Buffer buffer, boolean endOfStream, boolean flush) { } } - @SuppressWarnings("GuardedBy") + //@SuppressWarnings("GuardedBy") @GuardedBy("lock") private void streamReady(Metadata metadata, String path) { - requestHeaders = - Headers.createRequestHeaders( - metadata, - path, - authority, - userAgent, - useGet, - transport.isUsingPlaintext()); - // TODO(b/145386688): This access should be guarded by 'this.transport.lock'; instead found: - // 'this.lock' - transport.streamReadyToStart(OkHttpClientStream.this); + synchronized (transport.lock) { + requestHeaders = + Headers.createRequestHeaders( + metadata, + path, + authority, + userAgent, + useGet, + transport.isUsingPlaintext()); + // TODO(b/145386688): This access should be guarded by 'this.transport.lock'; instead found: + // 'this.lock' + transport.streamReadyToStart(OkHttpClientStream.this); + } } Tag tag() { diff --git a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java index 2f6b836dc3a..871ffbd0c4b 100644 --- a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java +++ b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java @@ -156,7 +156,7 @@ private static Map buildErrorCodeToStatusMap() { @GuardedBy("lock") private ExceptionHandlingFrameWriter frameWriter; private OutboundFlowController outboundFlow; - private final Object lock = new Object(); + final Object lock = new Object(); private final InternalLogId logId; @GuardedBy("lock") private int nextStreamId; From 2274a7781af173cd5bcd7eb71cc8b0110a63ddce Mon Sep 17 00:00:00 2001 From: vinodhabib Date: Wed, 16 Oct 2024 08:40:23 +0000 Subject: [PATCH 2/9] grpc-okhttp: Removed Annotation SuppressWarnings("GuardedBy") --- .../main/java/io/grpc/okhttp/OkHttpClientStream.java | 2 +- .../java/io/grpc/okhttp/OkHttpClientTransport.java | 10 ++++++---- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientStream.java b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientStream.java index 189b69e1f71..47f197dfc6d 100644 --- a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientStream.java +++ b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientStream.java @@ -188,7 +188,7 @@ public void cancel(Status reason) { class TransportState extends Http2ClientStreamTransportState implements OutboundFlowController.Stream { private final int initialWindowSize; - private final Object lock; + final Object lock; @GuardedBy("lock") private List
requestHeaders; @GuardedBy("lock") diff --git a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java index 871ffbd0c4b..a57e99df3e6 100644 --- a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java +++ b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java @@ -429,7 +429,7 @@ void streamReadyToStart(OkHttpClientStream clientStream) { } } - @SuppressWarnings("GuardedBy") + //@SuppressWarnings("GuardedBy") @GuardedBy("lock") private void startStream(OkHttpClientStream stream) { Preconditions.checkState( @@ -438,7 +438,9 @@ private void startStream(OkHttpClientStream stream) { setInUse(stream); // TODO(b/145386688): This access should be guarded by 'stream.transportState().lock'; instead // found: 'this.lock' - stream.transportState().start(nextStreamId); + synchronized (stream.transportState().lock) { + stream.transportState().start(nextStreamId); + } // For unary and server streaming, there will be a data frame soon, no need to flush the header. if ((stream.getType() != MethodType.UNARY && stream.getType() != MethodType.SERVER_STREAMING) || stream.useGet()) { @@ -1182,7 +1184,7 @@ public void run() { /** * Handle an HTTP2 DATA frame. */ - @SuppressWarnings("GuardedBy") + //@SuppressWarnings("GuardedBy") @Override public void data(boolean inFinished, int streamId, BufferedSource in, int length, int paddedLength) @@ -1208,7 +1210,7 @@ public void data(boolean inFinished, int streamId, BufferedSource in, int length buf.write(in.getBuffer(), length); PerfMark.event("OkHttpClientTransport$ClientFrameHandler.data", stream.transportState().tag()); - synchronized (lock) { + synchronized (stream.transportState().lock) { // TODO(b/145386688): This access should be guarded by 'stream.transportState().lock'; // instead found: 'OkHttpClientTransport.this.lock' stream.transportState().transportDataReceived(buf, inFinished, paddedLength - length); From 3d0d34ce72368508fed19e8199dd51e3f74c59bc Mon Sep 17 00:00:00 2001 From: vinodhabib Date: Wed, 16 Oct 2024 11:34:26 +0000 Subject: [PATCH 3/9] grpc-cronet: Removed Annotation SuppressWarnings("GuardedBy") --- .../java/io/grpc/cronet/CronetClientTransport.java | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/cronet/src/main/java/io/grpc/cronet/CronetClientTransport.java b/cronet/src/main/java/io/grpc/cronet/CronetClientTransport.java index b0b18620d0c..bdd3738316b 100644 --- a/cronet/src/main/java/io/grpc/cronet/CronetClientTransport.java +++ b/cronet/src/main/java/io/grpc/cronet/CronetClientTransport.java @@ -151,13 +151,17 @@ public void run() { return new StartCallback().clientStream; } - @SuppressWarnings("GuardedBy") + //@SuppressWarnings("GuardedBy") @GuardedBy("lock") private void startStream(CronetClientStream stream) { - streams.add(stream); - // TODO(b/145386688): This access should be guarded by 'stream.transportState().lock'; instead - // found: 'this.lock' - stream.transportState().start(streamFactory); + synchronized (lock) { + streams.add(stream); + // TODO(b/145386688): This access should be guarded by 'stream.transportState().lock'; instead + // found: 'this.lock' + synchronized (stream.transportState().lock) { + stream.transportState().start(streamFactory); + } + } } @Override From ae2865a96e94c0adbcfe0a0eea83eabc1074aa23 Mon Sep 17 00:00:00 2001 From: vinodhabib Date: Wed, 16 Oct 2024 15:22:42 +0000 Subject: [PATCH 4/9] grpc-okhttp: Fixed checkstyle issues --- .../main/java/io/grpc/okhttp/OkHttpClientStream.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientStream.java b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientStream.java index 47f197dfc6d..791882eb2e3 100644 --- a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientStream.java +++ b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientStream.java @@ -147,7 +147,7 @@ public void writeHeaders(Metadata metadata, byte[] payload) { useGet = true; defaultPath += "?" + BaseEncoding.base64().encode(payload); } - synchronized(OkHttpClientStream.this.state.lock) { + synchronized (OkHttpClientStream.this.state.lock) { state.streamReady(metadata, defaultPath); } } @@ -178,7 +178,7 @@ public void writeFrame( @Override public void cancel(Status reason) { try (TaskCloseable ignore = PerfMark.traceTask("OkHttpClientStream$Sink.cancel")) { - synchronized(OkHttpClientStream.this.state.lock) { + synchronized (OkHttpClientStream.this.state.lock) { state.cancel(reason, true, null); } } @@ -239,7 +239,7 @@ public TransportState( //@SuppressWarnings("GuardedBy") @GuardedBy("lock") public void start(int streamId) { - synchronized(OkHttpClientStream.this.state.lock) { + synchronized (OkHttpClientStream.this.state.lock) { checkState(id == ABSENT_ID, "the stream has been started with id %s", streamId); id = streamId; outboundFlowState = outboundFlow.createState(this, streamId); @@ -356,15 +356,15 @@ private void onEndOfStream() { //@SuppressWarnings("GuardedBy") @GuardedBy("lock") private void cancel(Status reason, boolean stopDelivery, Metadata trailers) { - synchronized(transport.lock) { + synchronized (transport.lock) { if (cancelSent) { return; } cancelSent = true; if (canStart) { // stream is pending. - // TODO(b/145386688): This access should be guarded by 'this.transport.lock'; instead found: - // 'this.lock' + // TODO(b/145386688): This access should be guarded by 'this.transport.lock' + // instead found: 'this.lock' transport.removePendingStream(OkHttpClientStream.this); // release holding data, so they can be GCed or returned to pool earlier. requestHeaders = null; From 8cbadd9d7ada4369bddfa3737b57956354ba429e Mon Sep 17 00:00:00 2001 From: vinodhabib Date: Fri, 18 Oct 2024 09:06:52 +0000 Subject: [PATCH 5/9] okHttp: Removed Annotation @SuppressWarnings("GuardedBy") --- .../java/io/grpc/okhttp/OkHttpClientStream.java | 4 ++-- .../io/grpc/okhttp/OkHttpClientTransport.java | 15 ++++++++++----- 2 files changed, 12 insertions(+), 7 deletions(-) diff --git a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientStream.java b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientStream.java index 791882eb2e3..372614cb733 100644 --- a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientStream.java +++ b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientStream.java @@ -356,7 +356,7 @@ private void onEndOfStream() { //@SuppressWarnings("GuardedBy") @GuardedBy("lock") private void cancel(Status reason, boolean stopDelivery, Metadata trailers) { - synchronized (transport.lock) { + synchronized (OkHttpClientTransport.lock) { if (cancelSent) { return; } @@ -402,7 +402,7 @@ private void sendBuffer(Buffer buffer, boolean endOfStream, boolean flush) { //@SuppressWarnings("GuardedBy") @GuardedBy("lock") private void streamReady(Metadata metadata, String path) { - synchronized (transport.lock) { + synchronized (OkHttpClientTransport.lock) { requestHeaders = Headers.createRequestHeaders( metadata, diff --git a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java index a57e99df3e6..03c797d83dc 100644 --- a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java +++ b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java @@ -156,7 +156,7 @@ private static Map buildErrorCodeToStatusMap() { @GuardedBy("lock") private ExceptionHandlingFrameWriter frameWriter; private OutboundFlowController outboundFlow; - final Object lock = new Object(); + static Object lock = new Object(); private final InternalLogId logId; @GuardedBy("lock") private int nextStreamId; @@ -1230,7 +1230,8 @@ public void data(boolean inFinished, int streamId, BufferedSource in, int length /** * Handle HTTP2 HEADER and CONTINUATION frames. */ - @SuppressWarnings("GuardedBy") + //@SuppressWarnings("GuardedBy") + @GuardedBy("lock") @Override public void headers(boolean outFinished, boolean inFinished, @@ -1253,15 +1254,19 @@ public void headers(boolean outFinished, metadataSize)); } } - synchronized (lock) { - OkHttpClientStream stream = streams.get(streamId); + OkHttpClientStream stream; + synchronized (OkHttpClientTransport.lock) { + stream = streams.get(streamId); if (stream == null) { if (mayHaveCreatedStream(streamId)) { frameWriter.rstStream(streamId, ErrorCode.STREAM_CLOSED); } else { unknownStream = true; } - } else { + } + } + if (stream != null) { + synchronized (stream.transportState().lock) { if (failedStatus == null) { PerfMark.event("OkHttpClientTransport$ClientFrameHandler.headers", stream.transportState().tag()); From 8181b6c5cd8fd13d95c40cf219614553e15001c1 Mon Sep 17 00:00:00 2001 From: vinodhabib Date: Mon, 21 Oct 2024 06:09:13 +0000 Subject: [PATCH 6/9] okHttp: Fixed the failing UT's --- .../main/java/io/grpc/okhttp/OkHttpClientTransport.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java index 03c797d83dc..cd6bd6a97ba 100644 --- a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java +++ b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java @@ -1231,7 +1231,6 @@ public void data(boolean inFinished, int streamId, BufferedSource in, int length * Handle HTTP2 HEADER and CONTINUATION frames. */ //@SuppressWarnings("GuardedBy") - @GuardedBy("lock") @Override public void headers(boolean outFinished, boolean inFinished, @@ -1274,8 +1273,10 @@ public void headers(boolean outFinished, // instead found: 'OkHttpClientTransport.this.lock' stream.transportState().transportHeadersReceived(headerBlock, inFinished); } else { - if (!inFinished) { - frameWriter.rstStream(streamId, ErrorCode.CANCEL); + synchronized (OkHttpClientTransport.lock) { + if (!inFinished) { + frameWriter.rstStream(streamId, ErrorCode.CANCEL); + } } stream.transportState().transportReportStatus(failedStatus, false, new Metadata()); } From ffbbb1e3cf0b4351ddc1fc8c56e717c577c8c0b7 Mon Sep 17 00:00:00 2001 From: vinodhabib Date: Mon, 21 Oct 2024 09:17:16 +0000 Subject: [PATCH 7/9] okHttp: Code cleanup --- .../main/java/io/grpc/cronet/CronetClientTransport.java | 3 --- .../src/main/java/io/grpc/okhttp/OkHttpClientStream.java | 9 --------- .../main/java/io/grpc/okhttp/OkHttpClientTransport.java | 9 --------- 3 files changed, 21 deletions(-) diff --git a/cronet/src/main/java/io/grpc/cronet/CronetClientTransport.java b/cronet/src/main/java/io/grpc/cronet/CronetClientTransport.java index bdd3738316b..1681139ccaa 100644 --- a/cronet/src/main/java/io/grpc/cronet/CronetClientTransport.java +++ b/cronet/src/main/java/io/grpc/cronet/CronetClientTransport.java @@ -151,13 +151,10 @@ public void run() { return new StartCallback().clientStream; } - //@SuppressWarnings("GuardedBy") @GuardedBy("lock") private void startStream(CronetClientStream stream) { synchronized (lock) { streams.add(stream); - // TODO(b/145386688): This access should be guarded by 'stream.transportState().lock'; instead - // found: 'this.lock' synchronized (stream.transportState().lock) { stream.transportState().start(streamFactory); } diff --git a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientStream.java b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientStream.java index 372614cb733..654e995bc5c 100644 --- a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientStream.java +++ b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientStream.java @@ -236,15 +236,12 @@ public TransportState( tag = PerfMark.createTag(methodName); } - //@SuppressWarnings("GuardedBy") @GuardedBy("lock") public void start(int streamId) { synchronized (OkHttpClientStream.this.state.lock) { checkState(id == ABSENT_ID, "the stream has been started with id %s", streamId); id = streamId; outboundFlowState = outboundFlow.createState(this, streamId); - // TODO(b/145386688): This access should be guarded by 'OkHttpClientStream.this.state.lock'; - // instead found: 'this.lock' state.onStreamAllocated(); } if (canStart) { @@ -353,7 +350,6 @@ private void onEndOfStream() { } } - //@SuppressWarnings("GuardedBy") @GuardedBy("lock") private void cancel(Status reason, boolean stopDelivery, Metadata trailers) { synchronized (OkHttpClientTransport.lock) { @@ -363,8 +359,6 @@ private void cancel(Status reason, boolean stopDelivery, Metadata trailers) { cancelSent = true; if (canStart) { // stream is pending. - // TODO(b/145386688): This access should be guarded by 'this.transport.lock' - // instead found: 'this.lock' transport.removePendingStream(OkHttpClientStream.this); // release holding data, so they can be GCed or returned to pool earlier. requestHeaders = null; @@ -399,7 +393,6 @@ private void sendBuffer(Buffer buffer, boolean endOfStream, boolean flush) { } } - //@SuppressWarnings("GuardedBy") @GuardedBy("lock") private void streamReady(Metadata metadata, String path) { synchronized (OkHttpClientTransport.lock) { @@ -411,8 +404,6 @@ private void streamReady(Metadata metadata, String path) { userAgent, useGet, transport.isUsingPlaintext()); - // TODO(b/145386688): This access should be guarded by 'this.transport.lock'; instead found: - // 'this.lock' transport.streamReadyToStart(OkHttpClientStream.this); } } diff --git a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java index cd6bd6a97ba..fe0a0d26337 100644 --- a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java +++ b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java @@ -429,15 +429,12 @@ void streamReadyToStart(OkHttpClientStream clientStream) { } } - //@SuppressWarnings("GuardedBy") @GuardedBy("lock") private void startStream(OkHttpClientStream stream) { Preconditions.checkState( stream.transportState().id() == OkHttpClientStream.ABSENT_ID, "StreamId already assigned"); streams.put(nextStreamId, stream); setInUse(stream); - // TODO(b/145386688): This access should be guarded by 'stream.transportState().lock'; instead - // found: 'this.lock' synchronized (stream.transportState().lock) { stream.transportState().start(nextStreamId); } @@ -1184,7 +1181,6 @@ public void run() { /** * Handle an HTTP2 DATA frame. */ - //@SuppressWarnings("GuardedBy") @Override public void data(boolean inFinished, int streamId, BufferedSource in, int length, int paddedLength) @@ -1211,8 +1207,6 @@ public void data(boolean inFinished, int streamId, BufferedSource in, int length PerfMark.event("OkHttpClientTransport$ClientFrameHandler.data", stream.transportState().tag()); synchronized (stream.transportState().lock) { - // TODO(b/145386688): This access should be guarded by 'stream.transportState().lock'; - // instead found: 'OkHttpClientTransport.this.lock' stream.transportState().transportDataReceived(buf, inFinished, paddedLength - length); } } @@ -1230,7 +1224,6 @@ public void data(boolean inFinished, int streamId, BufferedSource in, int length /** * Handle HTTP2 HEADER and CONTINUATION frames. */ - //@SuppressWarnings("GuardedBy") @Override public void headers(boolean outFinished, boolean inFinished, @@ -1269,8 +1262,6 @@ public void headers(boolean outFinished, if (failedStatus == null) { PerfMark.event("OkHttpClientTransport$ClientFrameHandler.headers", stream.transportState().tag()); - // TODO(b/145386688): This access should be guarded by 'stream.transportState().lock'; - // instead found: 'OkHttpClientTransport.this.lock' stream.transportState().transportHeadersReceived(headerBlock, inFinished); } else { synchronized (OkHttpClientTransport.lock) { From 003b8789b73606fb7597c77480e26b1905c40b8e Mon Sep 17 00:00:00 2001 From: vinodhabib Date: Mon, 21 Oct 2024 12:54:07 +0000 Subject: [PATCH 8/9] internal: Removed Annotation @SuppressWarnings("GuardedBy") from RetriableStream --- .../io/grpc/internal/RetriableStream.java | 54 ++++++++++--------- 1 file changed, 30 insertions(+), 24 deletions(-) diff --git a/core/src/main/java/io/grpc/internal/RetriableStream.java b/core/src/main/java/io/grpc/internal/RetriableStream.java index ba9424ea25c..e8cb79e9561 100644 --- a/core/src/main/java/io/grpc/internal/RetriableStream.java +++ b/core/src/main/java/io/grpc/internal/RetriableStream.java @@ -149,7 +149,6 @@ public void uncaughtException(Thread t, Throwable e) { this.throttle = throttle; } - @SuppressWarnings("GuardedBy") // TODO(b/145386688) this.lock==ScheduledCancellor.lock so ok @Nullable // null if already committed @CheckReturnValue private Runnable commit(final Substream winningSubstream) { @@ -164,21 +163,26 @@ private Runnable commit(final Substream winningSubstream) { // subtract the share of this RPC from channelBufferUsed. channelBufferUsed.addAndGet(-perRpcBufferUsed); - final boolean wasCancelled = (scheduledRetry != null) ? scheduledRetry.isCancelled() : false; + final boolean wasCancelled; final Future retryFuture; - if (scheduledRetry != null) { - retryFuture = scheduledRetry.markCancelled(); - scheduledRetry = null; - } else { - retryFuture = null; + synchronized (scheduledRetry.lock) { + wasCancelled = (scheduledRetry != null) ? scheduledRetry.isCancelled() : false; + if (scheduledRetry != null) { + retryFuture = scheduledRetry.markCancelled(); + scheduledRetry = null; + } else { + retryFuture = null; + } } // cancel the scheduled hedging if it is scheduled prior to the commitment final Future hedgingFuture; - if (scheduledHedging != null) { - hedgingFuture = scheduledHedging.markCancelled(); - scheduledHedging = null; - } else { - hedgingFuture = null; + synchronized (scheduledHedging.lock) { + if (scheduledHedging != null) { + hedgingFuture = scheduledHedging.markCancelled(); + scheduledHedging = null; + } else { + hedgingFuture = null; + } } class CommitTask implements Runnable { @@ -425,7 +429,7 @@ public final void start(ClientStreamListener listener) { drain(substream); } - @SuppressWarnings("GuardedBy") // TODO(b/145386688) this.lock==ScheduledCancellor.lock so ok + @GuardedBy("lock") private void pushbackHedging(@Nullable Integer delayMillis) { if (delayMillis == null) { return; @@ -439,7 +443,7 @@ private void pushbackHedging(@Nullable Integer delayMillis) { FutureCanceller future; Future futureToBeCancelled; - synchronized (lock) { + synchronized (scheduledHedging.lock) { if (scheduledHedging == null) { return; } @@ -477,23 +481,24 @@ public void run() { } callExecutor.execute( new Runnable() { - @SuppressWarnings("GuardedBy") //TODO(b/145386688) lock==ScheduledCancellor.lock so ok @Override public void run() { boolean cancelled = false; FutureCanceller future = null; - synchronized (lock) { + synchronized (scheduledHedgingRef.lock) { if (scheduledHedgingRef.isCancelled()) { cancelled = true; } else { state = state.addActiveHedge(newSubstream); - if (hasPotentialHedging(state) - && (throttle == null || throttle.isAboveThreshold())) { - scheduledHedging = future = new FutureCanceller(lock); - } else { - state = state.freezeHedging(); - scheduledHedging = null; + synchronized (RetriableStream.this.lock) { + if (hasPotentialHedging(state) + && (throttle == null || throttle.isAboveThreshold())) { + scheduledHedging = future = new FutureCanceller(lock); + } else { + state = state.freezeHedging(); + scheduledHedging = null; + } } } } @@ -815,10 +820,10 @@ private boolean hasPotentialHedging(State state) { && !state.hedgingFrozen; } - @SuppressWarnings("GuardedBy") // TODO(b/145386688) this.lock==ScheduledCancellor.lock so ok + @GuardedBy("lock") private void freezeHedging() { Future futureToBeCancelled = null; - synchronized (lock) { + synchronized (scheduledHedging.lock) { if (scheduledHedging != null) { futureToBeCancelled = scheduledHedging.markCancelled(); scheduledHedging = null; @@ -891,6 +896,7 @@ public void run() { } } + @GuardedBy("lock") @Override public void closed( final Status status, final RpcProgress rpcProgress, final Metadata trailers) { From c061f2a736b92fe6bc6627cec3ba201989489c22 Mon Sep 17 00:00:00 2001 From: vinodhabib Date: Tue, 22 Oct 2024 12:53:53 +0000 Subject: [PATCH 9/9] internal: Removed Annotation @SuppressWarnings("GuardedBy") from RetriableStream --- .../io/grpc/internal/RetriableStream.java | 116 +++++++++--------- 1 file changed, 61 insertions(+), 55 deletions(-) diff --git a/core/src/main/java/io/grpc/internal/RetriableStream.java b/core/src/main/java/io/grpc/internal/RetriableStream.java index e8cb79e9561..ba31a8420b3 100644 --- a/core/src/main/java/io/grpc/internal/RetriableStream.java +++ b/core/src/main/java/io/grpc/internal/RetriableStream.java @@ -86,7 +86,7 @@ public void uncaughtException(Thread t, Throwable e) { private final boolean isHedging; /** Must be held when updating state, accessing state.buffer, or certain substream attributes. */ - private final Object lock = new Object(); + static final Object lock = new Object(); private final ChannelBufferMeter channelBufferUsed; private final long perRpcBufferLimit; @@ -151,8 +151,9 @@ public void uncaughtException(Thread t, Throwable e) { @Nullable // null if already committed @CheckReturnValue + @GuardedBy("lock") private Runnable commit(final Substream winningSubstream) { - synchronized (lock) { + synchronized (RetriableStream.lock) { if (state.winningSubstream != null) { return null; } @@ -238,6 +239,7 @@ public void run() { * For a failed/closed winning stream, the last closed stream closes the master listener, and * callExecutor scheduling happens-before that. */ + @GuardedBy("lock") private void commitAndRun(Substream winningSubstream) { Runnable postCommitTask = commit(winningSubstream); @@ -389,44 +391,44 @@ public void runWith(Substream substream) { /** Starts the first PRC attempt. */ @Override public final void start(ClientStreamListener listener) { - masterListener = listener; + synchronized (RetriableStream.lock) { + masterListener = listener; + Status shutdownStatus = prestart(); - Status shutdownStatus = prestart(); - - if (shutdownStatus != null) { - cancel(shutdownStatus); - return; - } - - synchronized (lock) { - state.buffer.add(new StartEntry()); - } - - Substream substream = createSubstream(0, false); - if (substream == null) { - return; - } - if (isHedging) { - FutureCanceller scheduledHedgingRef = null; + if (shutdownStatus != null) { + cancel(shutdownStatus); + return; + } synchronized (lock) { - state = state.addActiveHedge(substream); - if (hasPotentialHedging(state) - && (throttle == null || throttle.isAboveThreshold())) { - scheduledHedging = scheduledHedgingRef = new FutureCanceller(lock); - } + state.buffer.add(new StartEntry()); } - if (scheduledHedgingRef != null) { - scheduledHedgingRef.setFuture( - scheduledExecutorService.schedule( - new HedgingRunnable(scheduledHedgingRef), - hedgingPolicy.hedgingDelayNanos, - TimeUnit.NANOSECONDS)); + Substream substream = createSubstream(0, false); + if (substream == null) { + return; } - } + if (isHedging) { + FutureCanceller scheduledHedgingRef = null; + + synchronized (lock) { + state = state.addActiveHedge(substream); + if (hasPotentialHedging(state) + && (throttle == null || throttle.isAboveThreshold())) { + scheduledHedging = scheduledHedgingRef = new FutureCanceller(lock); + } + } - drain(substream); + if (scheduledHedgingRef != null) { + scheduledHedgingRef.setFuture( + scheduledExecutorService.schedule( + new HedgingRunnable(scheduledHedgingRef), + hedgingPolicy.hedgingDelayNanos, + TimeUnit.NANOSECONDS)); + } + } + drain(substream); + } } @GuardedBy("lock") @@ -491,7 +493,7 @@ public void run() { cancelled = true; } else { state = state.addActiveHedge(newSubstream); - synchronized (RetriableStream.this.lock) { + synchronized (RetriableStream.lock) { if (hasPotentialHedging(state) && (throttle == null || throttle.isAboveThreshold())) { scheduledHedging = future = new FutureCanceller(lock); @@ -522,33 +524,36 @@ public void run() { } } + @GuardedBy("lock") @Override public final void cancel(final Status reason) { - Substream noopSubstream = new Substream(0 /* previousAttempts doesn't matter here */); - noopSubstream.stream = new NoopClientStream(); - Runnable runnable = commit(noopSubstream); + synchronized (RetriableStream.lock) { + Substream noopSubstream = new Substream(0 /* previousAttempts doesn't matter here */); + noopSubstream.stream = new NoopClientStream(); + Runnable runnable = commit(noopSubstream); + + if (runnable != null) { + synchronized (lock) { + state = state.substreamDrained(noopSubstream); + } + runnable.run(); + safeCloseMasterListener(reason, RpcProgress.PROCESSED, new Metadata()); + return; + } - if (runnable != null) { + Substream winningSubstreamToCancel = null; synchronized (lock) { - state = state.substreamDrained(noopSubstream); + if (state.drainedSubstreams.contains(state.winningSubstream)) { + winningSubstreamToCancel = state.winningSubstream; + } else { // the winningSubstream will be cancelled while draining + cancellationStatus = reason; + } + state = state.cancelled(); } - runnable.run(); - safeCloseMasterListener(reason, RpcProgress.PROCESSED, new Metadata()); - return; - } - - Substream winningSubstreamToCancel = null; - synchronized (lock) { - if (state.drainedSubstreams.contains(state.winningSubstream)) { - winningSubstreamToCancel = state.winningSubstream; - } else { // the winningSubstream will be cancelled while draining - cancellationStatus = reason; + if (winningSubstreamToCancel != null) { + winningSubstreamToCancel.stream.cancel(reason); } - state = state.cancelled(); - } - if (winningSubstreamToCancel != null) { - winningSubstreamToCancel.stream.cancel(reason); - } + } } private void delayOrExecute(BufferEntry bufferEntry) { @@ -875,6 +880,7 @@ private final class Sublistener implements ClientStreamListener { this.substream = substream; } + @GuardedBy("lock") @Override public void headersRead(final Metadata headers) { if (substream.previousAttemptCount > 0) {