Skip to content
This repository has been archived by the owner on Dec 19, 2017. It is now read-only.

Make number of KeepAlives Configurable #339

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion client/src/main/java/io/atomix/copycat/client/CopycatClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -560,6 +560,7 @@ final class Builder implements io.atomix.catalyst.util.Builder<CopycatClient> {
private Serializer serializer;
private Duration sessionTimeout = Duration.ZERO;
private Duration unstabilityTimeout = Duration.ZERO;
private int keepAlivesPerTimeoutInterval = 2;
private ConnectionStrategy connectionStrategy = ConnectionStrategies.ONCE;
private ServerSelectionStrategy serverSelectionStrategy = ServerSelectionStrategies.ANY;
private RecoveryStrategy recoveryStrategy = RecoveryStrategies.CLOSE;
Expand Down Expand Up @@ -623,6 +624,19 @@ public Builder withSessionTimeout(Duration sessionTimeout) {
return this;
}

/**
* Sets the number of keep-alives to send during a session timeout interval. Default value is 2.
*
* @param keepAlivesPerTimeoutInterval The number of keep-alives to send during a session timeout interval
* @return The client builder.
* @throws IllegalArgumentException if the keepAlivesPerTimeoutInterval is not positive
*/
public Builder withKeepAlivesPerTimeoutInterval(int keepAlivesPerTimeoutInterval) {
Assert.arg(keepAlivesPerTimeoutInterval > 0, "keepAlivesPerTimeoutInterval must be positive");
this.keepAlivesPerTimeoutInterval = keepAlivesPerTimeoutInterval;
return this;
}

/**
* Sets the timeout for session unstability. Client is automatically closed if it can not reach servers within
* the given timeout.
Expand Down Expand Up @@ -711,7 +725,8 @@ public CopycatClient build() {
connectionStrategy,
recoveryStrategy,
sessionTimeout,
unstabilityTimeout
unstabilityTimeout,
keepAlivesPerTimeoutInterval
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ public class DefaultCopycatClient implements CopycatClient {
private final AddressSelector selector;
private final Duration sessionTimeout;
private final Duration unstabilityTimeout;
private final int keepAlivesPerTimeoutInterval;
private final ConnectionStrategy connectionStrategy;
private final RecoveryStrategy recoveryStrategy;
private ClientSession session;
Expand All @@ -68,7 +69,10 @@ public class DefaultCopycatClient implements CopycatClient {
private final Set<EventListener<?>> eventListeners = new CopyOnWriteArraySet<>();
private Listener<Session.State> changeListener;

DefaultCopycatClient(String clientId, Collection<Address> cluster, Transport transport, ThreadContext ioContext, ThreadContext eventContext, ServerSelectionStrategy selectionStrategy, ConnectionStrategy connectionStrategy, RecoveryStrategy recoveryStrategy, Duration sessionTimeout, Duration unstabilityTimeout) {
DefaultCopycatClient(String clientId, Collection<Address> cluster, Transport transport, ThreadContext ioContext,
ThreadContext eventContext, ServerSelectionStrategy selectionStrategy, ConnectionStrategy
connectionStrategy, RecoveryStrategy recoveryStrategy, Duration sessionTimeout, Duration
unstabilityTimeout, int keepAlivesPerTimeoutInterval) {
this.clientId = Assert.notNull(clientId, "clientId");
this.cluster = Assert.notNull(cluster, "cluster");
this.transport = Assert.notNull(transport, "transport");
Expand All @@ -78,7 +82,8 @@ public class DefaultCopycatClient implements CopycatClient {
this.connectionStrategy = Assert.notNull(connectionStrategy, "connectionStrategy");
this.recoveryStrategy = Assert.notNull(recoveryStrategy, "recoveryStrategy");
this.sessionTimeout = Assert.notNull(sessionTimeout, "sessionTimeout");
this.unstabilityTimeout = Assert.notNull(unstabilityTimeout, "unstabilityTimeout");;
this.unstabilityTimeout = Assert.notNull(unstabilityTimeout, "unstabilityTimeout");
this.keepAlivesPerTimeoutInterval = keepAlivesPerTimeoutInterval;
}

@Override
Expand Down Expand Up @@ -128,7 +133,7 @@ public ThreadContext context() {
*/
private ClientSession newSession() {
ClientSession session = new ClientSession(clientId, transport.client(), selector, ioContext, connectionStrategy, sessionTimeout,
unstabilityTimeout
unstabilityTimeout, keepAlivesPerTimeoutInterval
);

// Update the session change listener.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,14 @@ public class ClientSession implements Session {
private final ClientSessionListener listener;
private final ClientSessionSubmitter submitter;

public ClientSession(String id, Client client, AddressSelector selector, ThreadContext context, ConnectionStrategy connectionStrategy, Duration sessionTimeout, Duration unstabilityTimeout) {
this(new ClientConnection(id, client, selector), new ClientSessionState(id, unstabilityTimeout), context, connectionStrategy, sessionTimeout);
public ClientSession(String id, Client client, AddressSelector selector, ThreadContext context, ConnectionStrategy connectionStrategy, Duration sessionTimeout, Duration unstabilityTimeout, int keepalivesPerTimeoutInterval) {
this(new ClientConnection(id, client, selector), new ClientSessionState(id, unstabilityTimeout), context, connectionStrategy, sessionTimeout, keepalivesPerTimeoutInterval);
}

private ClientSession(ClientConnection connection, ClientSessionState state, ThreadContext context, ConnectionStrategy connectionStrategy, Duration sessionTimeout) {
private ClientSession(ClientConnection connection, ClientSessionState state, ThreadContext context, ConnectionStrategy connectionStrategy, Duration sessionTimeout, int keepalivesPerTimeoutInterval) {
this.connection = Assert.notNull(connection, "connection");
this.state = Assert.notNull(state, "state");
this.manager = new ClientSessionManager(connection, state, context, connectionStrategy, sessionTimeout);
this.manager = new ClientSessionManager(connection, state, context, connectionStrategy, sessionTimeout, keepalivesPerTimeoutInterval);
ClientSequencer sequencer = new ClientSequencer(state);
this.listener = new ClientSessionListener(connection, state, sequencer, context);
this.submitter = new ClientSessionSubmitter(connection, state, sequencer, context);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,17 @@ final class ClientSessionManager {
private final ThreadContext context;
private final ConnectionStrategy strategy;
private final Duration sessionTimeout;
private final int keepAlivesPerTimeoutInterval;
private Duration interval;
private Scheduled keepAlive;

ClientSessionManager(ClientConnection connection, ClientSessionState state, ThreadContext context, ConnectionStrategy connectionStrategy, Duration sessionTimeout) {
ClientSessionManager(ClientConnection connection, ClientSessionState state, ThreadContext context, ConnectionStrategy connectionStrategy, Duration sessionTimeout, int keepAlivesPerTimeoutInterval) {
this.connection = Assert.notNull(connection, "connection");
this.state = Assert.notNull(state, "state");
this.context = Assert.notNull(context, "context");
this.strategy = Assert.notNull(connectionStrategy, "connectionStrategy");
this.sessionTimeout = Assert.notNull(sessionTimeout, "sessionTimeout");
this.keepAlivesPerTimeoutInterval = Assert.arg(keepAlivesPerTimeoutInterval, keepAlivesPerTimeoutInterval > 0, "keepAlivesPerTimeoutInterval");
}

/**
Expand Down Expand Up @@ -94,7 +96,7 @@ private void register(RegisterAttempt attempt) {
if (error == null) {
state.getLogger().trace("Received {}", response);
if (response.status() == Response.Status.OK) {
interval = Duration.ofMillis(response.timeout()).dividedBy(2);
interval = Duration.ofMillis(response.timeout()).dividedBy(keepAlivesPerTimeoutInterval);
connection.reset(response.leader(), response.members());
state.setSessionId(response.session())
.setState(Session.State.OPEN);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public void testSessionRegisterUnregister() throws Throwable {
Executor executor = new MockExecutor();
when(context.executor()).thenReturn(executor);

ClientSessionManager manager = new ClientSessionManager(connection, state, context, ConnectionStrategies.EXPONENTIAL_BACKOFF, Duration.ZERO);
ClientSessionManager manager = new ClientSessionManager(connection, state, context, ConnectionStrategies.EXPONENTIAL_BACKOFF, Duration.ZERO, 2);
manager.open().join();

assertEquals(state.getSessionId(), 1);
Expand Down