Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Leave idle state after endpoint becomes available again #307

Merged
merged 5 commits into from
Aug 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public FixedCallOptionsTransport(
this.scheduler = scheduler;
this.callOptions = callOptions;
this.database = database;
this.channel = new GrpcChannel(endpoint, channelFactory, true);
this.channel = new GrpcChannel(endpoint, channelFactory);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public SingleChannelTransport(GrpcTransportBuilder builder) {
logger.info("creating signle channel transport with endpoint {}", endpoint);

this.database = Strings.nullToEmpty(builder.getDatabase());
this.channel = new GrpcChannel(endpoint, channelFactory, true);
this.channel = new GrpcChannel(endpoint, channelFactory);

this.scheduler = builder.getSchedulerFactory().get();
this.callOptions = new AuthCallOptions(scheduler, Arrays.asList(endpoint), channelFactory, builder);
Expand Down
22 changes: 12 additions & 10 deletions core/src/main/java/tech/ydb/core/impl/pool/GrpcChannel.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,13 @@ public class GrpcChannel {
private final long connectTimeoutMs;
private final ReadyWatcher readyWatcher;

public GrpcChannel(EndpointRecord endpoint, ManagedChannelFactory factory, boolean tryToConnect) {
public GrpcChannel(EndpointRecord endpoint, ManagedChannelFactory factory) {
logger.debug("Creating grpc channel with {}", endpoint);
this.endpoint = endpoint;
this.channel = factory.newManagedChannel(endpoint.getHost(), endpoint.getPort());
this.connectTimeoutMs = factory.getConnectTimeoutMs();
this.readyWatcher = new ReadyWatcher();
this.readyWatcher.check(tryToConnect);
this.readyWatcher.checkState();
}

public EndpointRecord getEndpoint() {
Expand Down Expand Up @@ -58,7 +58,7 @@ public boolean shutdown() {
}
return closed;
} catch (InterruptedException e) {
logger.warn("transport shutdown interrupted for channel {}: {}", endpoint, e);
logger.warn("transport shutdown interrupted for channel {}: ", endpoint, e);
Thread.currentThread().interrupt();
return false;
} finally {
Expand All @@ -73,24 +73,26 @@ public Channel getReadyChannel() {
try {
return future.get(connectTimeoutMs, TimeUnit.MILLISECONDS);
} catch (InterruptedException ex) {
logger.error("Grpc channel {} ready waiting is interrupted", endpoint, ex);
logger.error("Grpc channel {} ready waiting is interrupted: ", endpoint, ex);
Thread.currentThread().interrupt();
} catch (ExecutionException ex) {
logger.error("Grpc channel {} connecting problem", endpoint, ex);
logger.error("Grpc channel {} connecting problem: ", endpoint, ex);
throw new RuntimeException("Channel " + endpoint + " connecting problem", ex);
} catch (TimeoutException ex) {
logger.error("Grpc channel {} connect timeout excided", endpoint);
logger.error("Grpc channel {} connect timeout exceeded", endpoint);
throw new RuntimeException("Channel " + endpoint + " connecting timeout");
}
return null;
}

public void check(boolean tryToConnect) {
ConnectivityState state = channel.getState(tryToConnect);
public void checkState() {
ConnectivityState state = channel.getState(true);
logger.debug("Grpc channel {} new state: {}", endpoint, state);
switch (state) {
case READY:
future.complete(channel);
// keep tracking channel state
channel.notifyWhenStateChanged(state, this);
break;
case SHUTDOWN:
future.completeExceptionally(new IllegalStateException("Grpc channel already closed"));
Expand All @@ -99,15 +101,15 @@ public void check(boolean tryToConnect) {
case CONNECTING:
case IDLE:
default:
// repeat watch
// keep tracking channel state
channel.notifyWhenStateChanged(state, this);
break;
}
}

@Override
public void run() {
check(false);
checkState();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public GrpcChannel getChannel(EndpointRecord endpoint) {

return result != null ? result : channels.computeIfAbsent(endpoint.getHostAndPort(), (key) -> {
logger.debug("channel " + endpoint.getHostAndPort() + " was not found in pool, creating one...");
return new GrpcChannel(endpoint, channelFactory, true);
return new GrpcChannel(endpoint, channelFactory);
});
}

Expand Down
45 changes: 13 additions & 32 deletions core/src/test/java/tech/ydb/core/impl/pool/GrpcChannelTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,11 @@ public void goodChannels() {

EndpointRecord endpoint = new EndpointRecord("host1", 1234);

GrpcChannel lazy = new GrpcChannel(endpoint, factoryMock, false);
Assert.assertEquals(endpoint, lazy.getEndpoint());
Assert.assertNotNull(lazy.getReadyChannel());
lazy.shutdown();
lazy.shutdown(); // double shutdown is ok

GrpcChannel eager = new GrpcChannel(endpoint, factoryMock, true);
Assert.assertEquals(endpoint, eager.getEndpoint());
Assert.assertNotNull(eager.getReadyChannel());
eager.shutdown();
GrpcChannel channel = new GrpcChannel(endpoint, factoryMock);
Assert.assertEquals(endpoint, channel.getEndpoint());
Assert.assertNotNull(channel.getReadyChannel());
channel.shutdown();
channel.shutdown(); // double shutdown is ok
}

@Test
Expand All @@ -63,15 +58,10 @@ public void slowChannels() {

EndpointRecord endpoint = new EndpointRecord("host1", 1234);

GrpcChannel lazy = new GrpcChannel(endpoint, factoryMock, false);
Assert.assertEquals(endpoint, lazy.getEndpoint());
Assert.assertNotNull(lazy.getReadyChannel());
lazy.shutdown();

GrpcChannel eager = new GrpcChannel(endpoint, factoryMock, true);
Assert.assertEquals(endpoint, eager.getEndpoint());
Assert.assertNotNull(eager.getReadyChannel());
eager.shutdown();
GrpcChannel channel = new GrpcChannel(endpoint, factoryMock);
Assert.assertEquals(endpoint, channel.getEndpoint());
Assert.assertNotNull(channel.getReadyChannel());
channel.shutdown();
}

@Test
Expand All @@ -90,22 +80,13 @@ public void badChannels() {

EndpointRecord endpoint = new EndpointRecord("host1", 1234);

GrpcChannel lazy = new GrpcChannel(endpoint, factoryMock, false);
Assert.assertEquals(endpoint, lazy.getEndpoint());
GrpcChannel channel = new GrpcChannel(endpoint, factoryMock);
Assert.assertEquals(endpoint, channel.getEndpoint());

RuntimeException ex1 = Assert.assertThrows(RuntimeException.class, lazy::getReadyChannel);
RuntimeException ex1 = Assert.assertThrows(RuntimeException.class, channel::getReadyChannel);
Assert.assertEquals("Channel Endpoint{host=host1, port=1234, node=0, location=null} connecting problem",
ex1.getMessage());

lazy.shutdown();

GrpcChannel eager = new GrpcChannel(endpoint, factoryMock, true);
Assert.assertEquals(endpoint, eager.getEndpoint());

RuntimeException ex2 = Assert.assertThrows(RuntimeException.class, eager::getReadyChannel);
Assert.assertEquals("Channel Endpoint{host=host1, port=1234, node=0, location=null} connecting problem",
ex2.getMessage());

eager.shutdown();
channel.shutdown();
}
}