Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

okhttp: Removed Annotation SuppressWarnings("GuardedBy") #11620

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
168 changes: 90 additions & 78 deletions core/src/main/java/io/grpc/internal/RetriableStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public void uncaughtException(Thread t, Throwable e) {
private final boolean isHedging;

/** Must be held when updating state, accessing state.buffer, or certain substream attributes. */
private final Object lock = new Object();
static final Object lock = new Object();

private final ChannelBufferMeter channelBufferUsed;
private final long perRpcBufferLimit;
Expand Down Expand Up @@ -149,11 +149,11 @@ public void uncaughtException(Thread t, Throwable e) {
this.throttle = throttle;
}

@SuppressWarnings("GuardedBy") // TODO(b/145386688) this.lock==ScheduledCancellor.lock so ok
@Nullable // null if already committed
@CheckReturnValue
@GuardedBy("lock")
private Runnable commit(final Substream winningSubstream) {
synchronized (lock) {
synchronized (RetriableStream.lock) {
if (state.winningSubstream != null) {
return null;
}
Expand All @@ -164,21 +164,26 @@ private Runnable commit(final Substream winningSubstream) {
// subtract the share of this RPC from channelBufferUsed.
channelBufferUsed.addAndGet(-perRpcBufferUsed);

final boolean wasCancelled = (scheduledRetry != null) ? scheduledRetry.isCancelled() : false;
final boolean wasCancelled;
final Future<?> retryFuture;
if (scheduledRetry != null) {
retryFuture = scheduledRetry.markCancelled();
scheduledRetry = null;
} else {
retryFuture = null;
synchronized (scheduledRetry.lock) {
wasCancelled = (scheduledRetry != null) ? scheduledRetry.isCancelled() : false;
if (scheduledRetry != null) {
retryFuture = scheduledRetry.markCancelled();
scheduledRetry = null;
} else {
retryFuture = null;
}
}
// cancel the scheduled hedging if it is scheduled prior to the commitment
final Future<?> hedgingFuture;
if (scheduledHedging != null) {
hedgingFuture = scheduledHedging.markCancelled();
scheduledHedging = null;
} else {
hedgingFuture = null;
synchronized (scheduledHedging.lock) {
if (scheduledHedging != null) {
hedgingFuture = scheduledHedging.markCancelled();
scheduledHedging = null;
} else {
hedgingFuture = null;
}
}

class CommitTask implements Runnable {
Expand Down Expand Up @@ -234,6 +239,7 @@ public void run() {
* For a failed/closed winning stream, the last closed stream closes the master listener, and
* callExecutor scheduling happens-before that.
*/
@GuardedBy("lock")
private void commitAndRun(Substream winningSubstream) {
Runnable postCommitTask = commit(winningSubstream);

Expand Down Expand Up @@ -385,47 +391,47 @@ public void runWith(Substream substream) {
/** Starts the first PRC attempt. */
@Override
public final void start(ClientStreamListener listener) {
masterListener = listener;

Status shutdownStatus = prestart();
synchronized (RetriableStream.lock) {
masterListener = listener;
Status shutdownStatus = prestart();

if (shutdownStatus != null) {
cancel(shutdownStatus);
return;
}

synchronized (lock) {
state.buffer.add(new StartEntry());
}

Substream substream = createSubstream(0, false);
if (substream == null) {
return;
}
if (isHedging) {
FutureCanceller scheduledHedgingRef = null;
if (shutdownStatus != null) {
cancel(shutdownStatus);
return;
}

synchronized (lock) {
state = state.addActiveHedge(substream);
if (hasPotentialHedging(state)
&& (throttle == null || throttle.isAboveThreshold())) {
scheduledHedging = scheduledHedgingRef = new FutureCanceller(lock);
}
state.buffer.add(new StartEntry());
}

if (scheduledHedgingRef != null) {
scheduledHedgingRef.setFuture(
scheduledExecutorService.schedule(
new HedgingRunnable(scheduledHedgingRef),
hedgingPolicy.hedgingDelayNanos,
TimeUnit.NANOSECONDS));
Substream substream = createSubstream(0, false);
if (substream == null) {
return;
}
}
if (isHedging) {
FutureCanceller scheduledHedgingRef = null;

synchronized (lock) {
state = state.addActiveHedge(substream);
if (hasPotentialHedging(state)
&& (throttle == null || throttle.isAboveThreshold())) {
scheduledHedging = scheduledHedgingRef = new FutureCanceller(lock);
}
}

drain(substream);
if (scheduledHedgingRef != null) {
scheduledHedgingRef.setFuture(
scheduledExecutorService.schedule(
new HedgingRunnable(scheduledHedgingRef),
hedgingPolicy.hedgingDelayNanos,
TimeUnit.NANOSECONDS));
}
}
drain(substream);
}
}

@SuppressWarnings("GuardedBy") // TODO(b/145386688) this.lock==ScheduledCancellor.lock so ok
@GuardedBy("lock")
private void pushbackHedging(@Nullable Integer delayMillis) {
if (delayMillis == null) {
return;
Expand All @@ -439,7 +445,7 @@ private void pushbackHedging(@Nullable Integer delayMillis) {
FutureCanceller future;
Future<?> futureToBeCancelled;

synchronized (lock) {
synchronized (scheduledHedging.lock) {
if (scheduledHedging == null) {
return;
}
Expand Down Expand Up @@ -477,23 +483,24 @@ public void run() {
}
callExecutor.execute(
new Runnable() {
@SuppressWarnings("GuardedBy") //TODO(b/145386688) lock==ScheduledCancellor.lock so ok
@Override
public void run() {
boolean cancelled = false;
FutureCanceller future = null;

synchronized (lock) {
synchronized (scheduledHedgingRef.lock) {
if (scheduledHedgingRef.isCancelled()) {
cancelled = true;
} else {
state = state.addActiveHedge(newSubstream);
if (hasPotentialHedging(state)
&& (throttle == null || throttle.isAboveThreshold())) {
scheduledHedging = future = new FutureCanceller(lock);
} else {
state = state.freezeHedging();
scheduledHedging = null;
synchronized (RetriableStream.lock) {
if (hasPotentialHedging(state)
&& (throttle == null || throttle.isAboveThreshold())) {
scheduledHedging = future = new FutureCanceller(lock);
} else {
state = state.freezeHedging();
scheduledHedging = null;
}
}
}
}
Expand All @@ -517,33 +524,36 @@ public void run() {
}
}

@GuardedBy("lock")
@Override
public final void cancel(final Status reason) {
Substream noopSubstream = new Substream(0 /* previousAttempts doesn't matter here */);
noopSubstream.stream = new NoopClientStream();
Runnable runnable = commit(noopSubstream);
synchronized (RetriableStream.lock) {
Substream noopSubstream = new Substream(0 /* previousAttempts doesn't matter here */);
noopSubstream.stream = new NoopClientStream();
Runnable runnable = commit(noopSubstream);

if (runnable != null) {
synchronized (lock) {
state = state.substreamDrained(noopSubstream);
}
runnable.run();
safeCloseMasterListener(reason, RpcProgress.PROCESSED, new Metadata());
return;
}

if (runnable != null) {
Substream winningSubstreamToCancel = null;
synchronized (lock) {
state = state.substreamDrained(noopSubstream);
if (state.drainedSubstreams.contains(state.winningSubstream)) {
winningSubstreamToCancel = state.winningSubstream;
} else { // the winningSubstream will be cancelled while draining
cancellationStatus = reason;
}
state = state.cancelled();
}
runnable.run();
safeCloseMasterListener(reason, RpcProgress.PROCESSED, new Metadata());
return;
}

Substream winningSubstreamToCancel = null;
synchronized (lock) {
if (state.drainedSubstreams.contains(state.winningSubstream)) {
winningSubstreamToCancel = state.winningSubstream;
} else { // the winningSubstream will be cancelled while draining
cancellationStatus = reason;
if (winningSubstreamToCancel != null) {
winningSubstreamToCancel.stream.cancel(reason);
}
state = state.cancelled();
}
if (winningSubstreamToCancel != null) {
winningSubstreamToCancel.stream.cancel(reason);
}
}
}

private void delayOrExecute(BufferEntry bufferEntry) {
Expand Down Expand Up @@ -815,10 +825,10 @@ private boolean hasPotentialHedging(State state) {
&& !state.hedgingFrozen;
}

@SuppressWarnings("GuardedBy") // TODO(b/145386688) this.lock==ScheduledCancellor.lock so ok
@GuardedBy("lock")
private void freezeHedging() {
Future<?> futureToBeCancelled = null;
synchronized (lock) {
synchronized (scheduledHedging.lock) {
if (scheduledHedging != null) {
futureToBeCancelled = scheduledHedging.markCancelled();
scheduledHedging = null;
Expand Down Expand Up @@ -870,6 +880,7 @@ private final class Sublistener implements ClientStreamListener {
this.substream = substream;
}

@GuardedBy("lock")
@Override
public void headersRead(final Metadata headers) {
if (substream.previousAttemptCount > 0) {
Expand All @@ -891,6 +902,7 @@ public void run() {
}
}

@GuardedBy("lock")
@Override
public void closed(
final Status status, final RpcProgress rpcProgress, final Metadata trailers) {
Expand Down
11 changes: 6 additions & 5 deletions cronet/src/main/java/io/grpc/cronet/CronetClientTransport.java
Original file line number Diff line number Diff line change
Expand Up @@ -151,13 +151,14 @@ public void run() {
return new StartCallback().clientStream;
}

@SuppressWarnings("GuardedBy")
@GuardedBy("lock")
private void startStream(CronetClientStream stream) {
streams.add(stream);
// TODO(b/145386688): This access should be guarded by 'stream.transportState().lock'; instead
// found: 'this.lock'
stream.transportState().start(streamFactory);
synchronized (lock) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

synchronized (lock) already exists at the caller site (newStream method) and should not be necessary to add.

Copy link
Contributor Author

@vinodhabib vinodhabib Oct 29, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can see a query raised with Eric as below , hope as of now no action required. please confirm?
#6578 (comment)

FYI, I can also see a note in the issue description as below.
If we can't remove them or if it would make the code worse, we can leave them as-is and close this issue.

streams.add(stream);
synchronized (stream.transportState().lock) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

stream.transportState().lock is never accessed outside of synchronized(lock) so this change should also not be needed.

Copy link
Contributor Author

@vinodhabib vinodhabib Oct 29, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can see a query raised with Eric as below , hope as of now no action required. please confirm?
#6578 (comment)

Copy link
Contributor Author

@vinodhabib vinodhabib Dec 27, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@shivaspeaks @kannanjgithub I can see Eric replied on last raised query #6578 (comment) as below.

I wasn't suggesting we remove necessary locks. I was suggesting organize the code such that the tooling can verify the locking. stream.transportState().lock is the same object as this.lock, but ErrorProne isn't able to figure that out because it requires data flow analysis.

I have made similar changes in current PR. Kindly suggest how to proceed further on the same.

stream.transportState().start(streamFactory);
}
}
}

@Override
Expand Down
Loading
Loading