diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ReplicaSession.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ReplicaSession.java index 95de53ca..527e905e 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ReplicaSession.java +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ReplicaSession.java @@ -34,7 +34,6 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; import org.apache.thrift.protocol.TMessage; import org.slf4j.Logger; @@ -81,7 +80,7 @@ public void initChannel(SocketChannel ch) { } }); - this.firstRecentTimedOutMs = new AtomicLong(0); + failureDetector = new SessionFailureDetector(); } // You can specify a message response filter with constructor or with "setMessageResponseFilter" @@ -296,8 +295,7 @@ void markSessionDisconnect() { } // Notify the RPC sender if failure occurred. - void tryNotifyFailureWithSeqID(int seqID, error_types errno, boolean isTimeoutTask) - throws Exception { + void tryNotifyFailureWithSeqID(int seqID, error_types errno, boolean isTimeoutTask) { logger.debug( "{}: {} is notified with error {}, isTimeoutTask {}", name(), @@ -309,24 +307,18 @@ void tryNotifyFailureWithSeqID(int seqID, error_types errno, boolean isTimeoutTa if (!isTimeoutTask && entry.timeoutTask != null) { entry.timeoutTask.cancel(true); } + // The error must be ERR_TIMEOUT or ERR_SESSION_RESET if (errno == error_types.ERR_TIMEOUT) { - long firstTs = firstRecentTimedOutMs.get(); - if (firstTs == 0) { - // it is the first timeout in the window. - firstRecentTimedOutMs.set(System.currentTimeMillis()); - } else if (System.currentTimeMillis() - firstTs >= sessionResetTimeWindowMs) { - // ensure that closeSession() will be invoked only once. 
- if (firstRecentTimedOutMs.compareAndSet(firstTs, 0)) { - logger.warn( - "{}: actively close the session because it's not responding for {} seconds", - name(), - sessionResetTimeWindowMs / 1000); - closeSession(); // maybe fail when the session is already disconnected. - errno = error_types.ERR_SESSION_RESET; - } + if (failureDetector.markTimeout()) { + logger.warn( + "{}: actively close the session because it's not responding for {} seconds", + name(), + SessionFailureDetector.FAILURE_DETECT_WINDOW_MS / 1000); + closeSession(); // maybe fail when the session is already disconnected. + errno = error_types.ERR_SESSION_RESET; } } else { - firstRecentTimedOutMs.set(0); + failureDetector.markOK(); } entry.op.rpc_error.errno = errno; entry.callback.run(); @@ -397,7 +389,7 @@ public void channelActive(ChannelHandlerContext ctx) throws Exception { @Override public void channelRead0(ChannelHandlerContext ctx, final RequestEntry msg) { logger.debug("{}: handle response with seqid({})", name(), msg.sequenceId); - firstRecentTimedOutMs.set(0); // This session is currently healthy. + failureDetector.markOK(); // This session is currently healthy. if (msg.callback != null) { msg.callback.run(); } else { @@ -446,12 +438,7 @@ static final class VolatileFields { private EventLoopGroup rpcGroup; private ReplicaSessionInterceptorManager interceptorManager; - // Session will be actively closed if all the rpcs across `sessionResetTimeWindowMs` - // are timed out, in that case we suspect that the server is unavailable. - - // Timestamp of the first timed out rpc. 
/**
 * SessionFailureDetector detects whether the session is half-closed by the remote host, in which
 * case we need to actively close the session and reconnect.
 *
 * <p>The detector remembers when the first timeout of the current streak happened. Once a further
 * timeout is reported at least {@link #FAILURE_DETECT_WINDOW_MS} after that first one, without any
 * {@link #markOK()} in between, the session is reported as failed.
 *
 * <p>All state lives in a single {@link AtomicLong} that is only updated with atomic operations,
 * so {@link #markTimeout()} and {@link #markOK()} may be called from different threads.
 */
public class SessionFailureDetector {

  // Session is marked failure if all the RPCs across
  // `FAILURE_DETECT_WINDOW_MS` are timed out.
  public static final long FAILURE_DETECT_WINDOW_MS = 10 * 1000; // 10s

  // Sentinel meaning "no timeout has been recorded since the last healthy signal".
  // The default clock yields nanoTime scaled to milliseconds, which can never reach
  // Long.MIN_VALUE, so the sentinel cannot collide with a real reading (unlike 0).
  private static final long NOT_TIMED_OUT = Long.MIN_VALUE;

  private static final long NANOS_PER_MS = 1_000_000L;

  // Clock reading (ms) of the first timed out rpc in the current streak, or NOT_TIMED_OUT.
  private final AtomicLong firstRecentTimedOutMs = new AtomicLong(NOT_TIMED_OUT);

  public SessionFailureDetector() {}

  /**
   * Records that an RPC on this session has timed out.
   *
   * @return true if session is confirmed to be failed, i.e. this timeout arrived at least {@link
   *     #FAILURE_DETECT_WINDOW_MS} after the first timeout of the streak. When several threads
   *     observe the expiry concurrently, only one of them gets {@code true}.
   */
  public boolean markTimeout() {
    final long firstTs = firstRecentTimedOutMs.get();
    if (firstTs == NOT_TIMED_OUT) {
      // It is the first timeout in the window. CAS instead of a blind set so that a
      // concurrent first timeout cannot overwrite the window start already recorded.
      firstRecentTimedOutMs.compareAndSet(NOT_TIMED_OUT, currentTimeMs());
      return false;
    }
    if (currentTimeMs() - firstTs < FAILURE_DETECT_WINDOW_MS) {
      // Still inside the detection window: not enough evidence yet.
      return false;
    }
    // Reset the streak; the CAS ensures that the session will be closed only once.
    return firstRecentTimedOutMs.compareAndSet(firstTs, NOT_TIMED_OUT);
  }

  /** Mark this session to be healthy. */
  public void markOK() {
    firstRecentTimedOutMs.set(NOT_TIMED_OUT);
  }

  /**
   * Returns the current reading, in milliseconds, of the clock used to measure the window.
   *
   * <p>Backed by {@link System#nanoTime()}, which is monotonic, so wall-clock adjustments can
   * neither trigger nor postpone failure detection. The value is only meaningful as a difference
   * between two readings. Package-private so tests can substitute a deterministic clock.
   */
  long currentTimeMs() {
    return System.nanoTime() / NANOS_PER_MS;
  }
}