diff --git a/core/src/main/java/tech/ydb/core/impl/FixedCallOptionsTransport.java b/core/src/main/java/tech/ydb/core/impl/FixedCallOptionsTransport.java index b204be80..c951e822 100644 --- a/core/src/main/java/tech/ydb/core/impl/FixedCallOptionsTransport.java +++ b/core/src/main/java/tech/ydb/core/impl/FixedCallOptionsTransport.java @@ -33,7 +33,7 @@ public FixedCallOptionsTransport( this.scheduler = scheduler; this.callOptions = callOptions; this.database = database; - this.channel = new GrpcChannel(endpoint, channelFactory, true); + this.channel = new GrpcChannel(endpoint, channelFactory); } @Override diff --git a/core/src/main/java/tech/ydb/core/impl/SingleChannelTransport.java b/core/src/main/java/tech/ydb/core/impl/SingleChannelTransport.java index 4b8a6ef1..c4b4de5e 100644 --- a/core/src/main/java/tech/ydb/core/impl/SingleChannelTransport.java +++ b/core/src/main/java/tech/ydb/core/impl/SingleChannelTransport.java @@ -33,7 +33,7 @@ public SingleChannelTransport(GrpcTransportBuilder builder) { logger.info("creating signle channel transport with endpoint {}", endpoint); this.database = Strings.nullToEmpty(builder.getDatabase()); - this.channel = new GrpcChannel(endpoint, channelFactory, true); + this.channel = new GrpcChannel(endpoint, channelFactory); this.scheduler = builder.getSchedulerFactory().get(); this.callOptions = new AuthCallOptions(scheduler, Arrays.asList(endpoint), channelFactory, builder); diff --git a/core/src/main/java/tech/ydb/core/impl/pool/GrpcChannel.java b/core/src/main/java/tech/ydb/core/impl/pool/GrpcChannel.java index f07f67f1..bdee43f2 100644 --- a/core/src/main/java/tech/ydb/core/impl/pool/GrpcChannel.java +++ b/core/src/main/java/tech/ydb/core/impl/pool/GrpcChannel.java @@ -23,13 +23,13 @@ public class GrpcChannel { private final long connectTimeoutMs; private final ReadyWatcher readyWatcher; - public GrpcChannel(EndpointRecord endpoint, ManagedChannelFactory factory, boolean tryToConnect) { + public GrpcChannel(EndpointRecord endpoint, ManagedChannelFactory factory) { logger.debug("Creating grpc channel with {}", endpoint); this.endpoint = endpoint; this.channel = factory.newManagedChannel(endpoint.getHost(), endpoint.getPort()); this.connectTimeoutMs = factory.getConnectTimeoutMs(); this.readyWatcher = new ReadyWatcher(); - this.readyWatcher.check(tryToConnect); + this.readyWatcher.checkState(); } public EndpointRecord getEndpoint() { @@ -58,7 +58,7 @@ public boolean shutdown() { } return closed; } catch (InterruptedException e) { - logger.warn("transport shutdown interrupted for channel {}: {}", endpoint, e); + logger.warn("transport shutdown interrupted for channel {}: ", endpoint, e); Thread.currentThread().interrupt(); return false; } finally { @@ -73,24 +73,26 @@ public Channel getReadyChannel() { try { return future.get(connectTimeoutMs, TimeUnit.MILLISECONDS); } catch (InterruptedException ex) { - logger.error("Grpc channel {} ready waiting is interrupted", endpoint, ex); + logger.error("Grpc channel {} ready waiting is interrupted: ", endpoint, ex); Thread.currentThread().interrupt(); } catch (ExecutionException ex) { - logger.error("Grpc channel {} connecting problem", endpoint, ex); + logger.error("Grpc channel {} connecting problem: ", endpoint, ex); throw new RuntimeException("Channel " + endpoint + " connecting problem", ex); } catch (TimeoutException ex) { - logger.error("Grpc channel {} connect timeout excided", endpoint); + logger.error("Grpc channel {} connect timeout exceeded", endpoint); throw new RuntimeException("Channel " + endpoint + " connecting timeout"); } return null; } - public void check(boolean tryToConnect) { - ConnectivityState state = channel.getState(tryToConnect); + public void checkState() { + ConnectivityState state = channel.getState(true); logger.debug("Grpc channel {} new state: {}", endpoint, state); switch (state) { case READY: future.complete(channel); + // keep tracking channel state + channel.notifyWhenStateChanged(state, this); break; case SHUTDOWN: future.completeExceptionally(new IllegalStateException("Grpc channel already closed")); @@ -99,7 +101,7 @@ public void check(boolean tryToConnect) { case CONNECTING: case IDLE: default: - // repeat watch + // keep tracking channel state channel.notifyWhenStateChanged(state, this); break; } @@ -107,7 +109,7 @@ public void check(boolean tryToConnect) { @Override public void run() { - check(false); + checkState(); } } } diff --git a/core/src/main/java/tech/ydb/core/impl/pool/GrpcChannelPool.java b/core/src/main/java/tech/ydb/core/impl/pool/GrpcChannelPool.java index eaeab8a9..78ec7bf8 100644 --- a/core/src/main/java/tech/ydb/core/impl/pool/GrpcChannelPool.java +++ b/core/src/main/java/tech/ydb/core/impl/pool/GrpcChannelPool.java @@ -36,7 +36,7 @@ public GrpcChannel getChannel(EndpointRecord endpoint) { return result != null ? result : channels.computeIfAbsent(endpoint.getHostAndPort(), (key) -> { logger.debug("channel " + endpoint.getHostAndPort() + " was not found in pool, creating one..."); - return new GrpcChannel(endpoint, channelFactory, true); + return new GrpcChannel(endpoint, channelFactory); }); } diff --git a/core/src/test/java/tech/ydb/core/impl/pool/GrpcChannelTest.java b/core/src/test/java/tech/ydb/core/impl/pool/GrpcChannelTest.java index 3368aa3f..288024d6 100644 --- a/core/src/test/java/tech/ydb/core/impl/pool/GrpcChannelTest.java +++ b/core/src/test/java/tech/ydb/core/impl/pool/GrpcChannelTest.java @@ -31,16 +31,11 @@ public void goodChannels() { EndpointRecord endpoint = new EndpointRecord("host1", 1234); - GrpcChannel lazy = new GrpcChannel(endpoint, factoryMock, false); - Assert.assertEquals(endpoint, lazy.getEndpoint()); - Assert.assertNotNull(lazy.getReadyChannel()); - lazy.shutdown(); - lazy.shutdown(); // double shutdown is ok - - GrpcChannel eager = new GrpcChannel(endpoint, factoryMock, true); - Assert.assertEquals(endpoint, eager.getEndpoint()); - Assert.assertNotNull(eager.getReadyChannel()); - eager.shutdown(); + GrpcChannel channel = new GrpcChannel(endpoint, factoryMock); + Assert.assertEquals(endpoint, channel.getEndpoint()); + Assert.assertNotNull(channel.getReadyChannel()); + channel.shutdown(); + channel.shutdown(); // double shutdown is ok } @Test @@ -63,15 +58,10 @@ public void slowChannels() { EndpointRecord endpoint = new EndpointRecord("host1", 1234); - GrpcChannel lazy = new GrpcChannel(endpoint, factoryMock, false); - Assert.assertEquals(endpoint, lazy.getEndpoint()); - Assert.assertNotNull(lazy.getReadyChannel()); - lazy.shutdown(); - - GrpcChannel eager = new GrpcChannel(endpoint, factoryMock, true); - Assert.assertEquals(endpoint, eager.getEndpoint()); - Assert.assertNotNull(eager.getReadyChannel()); - eager.shutdown(); + GrpcChannel channel = new GrpcChannel(endpoint, factoryMock); + Assert.assertEquals(endpoint, channel.getEndpoint()); + Assert.assertNotNull(channel.getReadyChannel()); + channel.shutdown(); } @Test @@ -90,22 +80,13 @@ public void badChannels() { EndpointRecord endpoint = new EndpointRecord("host1", 1234); - GrpcChannel lazy = new GrpcChannel(endpoint, factoryMock, false); - Assert.assertEquals(endpoint, lazy.getEndpoint()); + GrpcChannel channel = new GrpcChannel(endpoint, factoryMock); + Assert.assertEquals(endpoint, channel.getEndpoint()); - RuntimeException ex1 = Assert.assertThrows(RuntimeException.class, lazy::getReadyChannel); + RuntimeException ex1 = Assert.assertThrows(RuntimeException.class, channel::getReadyChannel); Assert.assertEquals("Channel Endpoint{host=host1, port=1234, node=0, location=null} connecting problem", ex1.getMessage()); - lazy.shutdown(); - - GrpcChannel eager = new GrpcChannel(endpoint, factoryMock, true); - Assert.assertEquals(endpoint, eager.getEndpoint()); - - RuntimeException ex2 = Assert.assertThrows(RuntimeException.class, eager::getReadyChannel); - Assert.assertEquals("Channel Endpoint{host=host1, port=1234, node=0, location=null} connecting problem", - ex2.getMessage()); - - eager.shutdown(); + channel.shutdown(); } }