Skip to content

Commit

Permalink
Merge pull request #307 from ydb-platform/leave-idle-state
Browse files Browse the repository at this point in the history
Leave idle state after endpoint becomes available again
  • Loading branch information
pnv1 authored Aug 26, 2024
2 parents 05e5822 + 3f52adf commit 44e0e5f
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public FixedCallOptionsTransport(
this.scheduler = scheduler;
this.callOptions = callOptions;
this.database = database;
this.channel = new GrpcChannel(endpoint, channelFactory, true);
this.channel = new GrpcChannel(endpoint, channelFactory);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public SingleChannelTransport(GrpcTransportBuilder builder) {
logger.info("creating signle channel transport with endpoint {}", endpoint);

this.database = Strings.nullToEmpty(builder.getDatabase());
this.channel = new GrpcChannel(endpoint, channelFactory, true);
this.channel = new GrpcChannel(endpoint, channelFactory);

this.scheduler = builder.getSchedulerFactory().get();
this.callOptions = new AuthCallOptions(scheduler, Arrays.asList(endpoint), channelFactory, builder);
Expand Down
22 changes: 12 additions & 10 deletions core/src/main/java/tech/ydb/core/impl/pool/GrpcChannel.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,13 @@ public class GrpcChannel {
private final long connectTimeoutMs;
private final ReadyWatcher readyWatcher;

public GrpcChannel(EndpointRecord endpoint, ManagedChannelFactory factory, boolean tryToConnect) {
public GrpcChannel(EndpointRecord endpoint, ManagedChannelFactory factory) {
logger.debug("Creating grpc channel with {}", endpoint);
this.endpoint = endpoint;
this.channel = factory.newManagedChannel(endpoint.getHost(), endpoint.getPort());
this.connectTimeoutMs = factory.getConnectTimeoutMs();
this.readyWatcher = new ReadyWatcher();
this.readyWatcher.check(tryToConnect);
this.readyWatcher.checkState();
}

public EndpointRecord getEndpoint() {
Expand Down Expand Up @@ -58,7 +58,7 @@ public boolean shutdown() {
}
return closed;
} catch (InterruptedException e) {
logger.warn("transport shutdown interrupted for channel {}: {}", endpoint, e);
logger.warn("transport shutdown interrupted for channel {}: ", endpoint, e);
Thread.currentThread().interrupt();
return false;
} finally {
Expand All @@ -73,24 +73,26 @@ public Channel getReadyChannel() {
try {
return future.get(connectTimeoutMs, TimeUnit.MILLISECONDS);
} catch (InterruptedException ex) {
logger.error("Grpc channel {} ready waiting is interrupted", endpoint, ex);
logger.error("Grpc channel {} ready waiting is interrupted: ", endpoint, ex);
Thread.currentThread().interrupt();
} catch (ExecutionException ex) {
logger.error("Grpc channel {} connecting problem", endpoint, ex);
logger.error("Grpc channel {} connecting problem: ", endpoint, ex);
throw new RuntimeException("Channel " + endpoint + " connecting problem", ex);
} catch (TimeoutException ex) {
logger.error("Grpc channel {} connect timeout excided", endpoint);
logger.error("Grpc channel {} connect timeout exceeded", endpoint);
throw new RuntimeException("Channel " + endpoint + " connecting timeout");
}
return null;
}

public void check(boolean tryToConnect) {
ConnectivityState state = channel.getState(tryToConnect);
public void checkState() {
ConnectivityState state = channel.getState(true);
logger.debug("Grpc channel {} new state: {}", endpoint, state);
switch (state) {
case READY:
future.complete(channel);
// keep tracking channel state
channel.notifyWhenStateChanged(state, this);
break;
case SHUTDOWN:
future.completeExceptionally(new IllegalStateException("Grpc channel already closed"));
Expand All @@ -99,15 +101,15 @@ public void check(boolean tryToConnect) {
case CONNECTING:
case IDLE:
default:
// repeat watch
// keep tracking channel state
channel.notifyWhenStateChanged(state, this);
break;
}
}

@Override
public void run() {
check(false);
checkState();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public GrpcChannel getChannel(EndpointRecord endpoint) {

return result != null ? result : channels.computeIfAbsent(endpoint.getHostAndPort(), (key) -> {
logger.debug("channel " + endpoint.getHostAndPort() + " was not found in pool, creating one...");
return new GrpcChannel(endpoint, channelFactory, true);
return new GrpcChannel(endpoint, channelFactory);
});
}

Expand Down
45 changes: 13 additions & 32 deletions core/src/test/java/tech/ydb/core/impl/pool/GrpcChannelTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,11 @@ public void goodChannels() {

EndpointRecord endpoint = new EndpointRecord("host1", 1234);

GrpcChannel lazy = new GrpcChannel(endpoint, factoryMock, false);
Assert.assertEquals(endpoint, lazy.getEndpoint());
Assert.assertNotNull(lazy.getReadyChannel());
lazy.shutdown();
lazy.shutdown(); // double shutdown is ok

GrpcChannel eager = new GrpcChannel(endpoint, factoryMock, true);
Assert.assertEquals(endpoint, eager.getEndpoint());
Assert.assertNotNull(eager.getReadyChannel());
eager.shutdown();
GrpcChannel channel = new GrpcChannel(endpoint, factoryMock);
Assert.assertEquals(endpoint, channel.getEndpoint());
Assert.assertNotNull(channel.getReadyChannel());
channel.shutdown();
channel.shutdown(); // double shutdown is ok
}

@Test
Expand All @@ -63,15 +58,10 @@ public void slowChannels() {

EndpointRecord endpoint = new EndpointRecord("host1", 1234);

GrpcChannel lazy = new GrpcChannel(endpoint, factoryMock, false);
Assert.assertEquals(endpoint, lazy.getEndpoint());
Assert.assertNotNull(lazy.getReadyChannel());
lazy.shutdown();

GrpcChannel eager = new GrpcChannel(endpoint, factoryMock, true);
Assert.assertEquals(endpoint, eager.getEndpoint());
Assert.assertNotNull(eager.getReadyChannel());
eager.shutdown();
GrpcChannel channel = new GrpcChannel(endpoint, factoryMock);
Assert.assertEquals(endpoint, channel.getEndpoint());
Assert.assertNotNull(channel.getReadyChannel());
channel.shutdown();
}

@Test
Expand All @@ -90,22 +80,13 @@ public void badChannels() {

EndpointRecord endpoint = new EndpointRecord("host1", 1234);

GrpcChannel lazy = new GrpcChannel(endpoint, factoryMock, false);
Assert.assertEquals(endpoint, lazy.getEndpoint());
GrpcChannel channel = new GrpcChannel(endpoint, factoryMock);
Assert.assertEquals(endpoint, channel.getEndpoint());

RuntimeException ex1 = Assert.assertThrows(RuntimeException.class, lazy::getReadyChannel);
RuntimeException ex1 = Assert.assertThrows(RuntimeException.class, channel::getReadyChannel);
Assert.assertEquals("Channel Endpoint{host=host1, port=1234, node=0, location=null} connecting problem",
ex1.getMessage());

lazy.shutdown();

GrpcChannel eager = new GrpcChannel(endpoint, factoryMock, true);
Assert.assertEquals(endpoint, eager.getEndpoint());

RuntimeException ex2 = Assert.assertThrows(RuntimeException.class, eager::getReadyChannel);
Assert.assertEquals("Channel Endpoint{host=host1, port=1234, node=0, location=null} connecting problem",
ex2.getMessage());

eager.shutdown();
channel.shutdown();
}
}

0 comments on commit 44e0e5f

Please sign in to comment.