From 724a677cb420be30785a2b562c32412b336808c3 Mon Sep 17 00:00:00 2001 From: Nikolay Perfilov Date: Mon, 26 Aug 2024 23:26:37 +0300 Subject: [PATCH 1/5] Always pass requestConnect=true flag to getState --- .../core/impl/FixedCallOptionsTransport.java | 2 +- .../ydb/core/impl/SingleChannelTransport.java | 2 +- .../tech/ydb/core/impl/pool/GrpcChannel.java | 19 +++++--- .../ydb/core/impl/pool/GrpcChannelPool.java | 2 +- .../ydb/core/impl/pool/GrpcChannelTest.java | 45 ++++++------------- 5 files changed, 28 insertions(+), 42 deletions(-) diff --git a/core/src/main/java/tech/ydb/core/impl/FixedCallOptionsTransport.java b/core/src/main/java/tech/ydb/core/impl/FixedCallOptionsTransport.java index b204be802..c951e8229 100644 --- a/core/src/main/java/tech/ydb/core/impl/FixedCallOptionsTransport.java +++ b/core/src/main/java/tech/ydb/core/impl/FixedCallOptionsTransport.java @@ -33,7 +33,7 @@ public FixedCallOptionsTransport( this.scheduler = scheduler; this.callOptions = callOptions; this.database = database; - this.channel = new GrpcChannel(endpoint, channelFactory, true); + this.channel = new GrpcChannel(endpoint, channelFactory); } @Override diff --git a/core/src/main/java/tech/ydb/core/impl/SingleChannelTransport.java b/core/src/main/java/tech/ydb/core/impl/SingleChannelTransport.java index 4b8a6ef16..c4b4de5eb 100644 --- a/core/src/main/java/tech/ydb/core/impl/SingleChannelTransport.java +++ b/core/src/main/java/tech/ydb/core/impl/SingleChannelTransport.java @@ -33,7 +33,7 @@ public SingleChannelTransport(GrpcTransportBuilder builder) { logger.info("creating signle channel transport with endpoint {}", endpoint); this.database = Strings.nullToEmpty(builder.getDatabase()); - this.channel = new GrpcChannel(endpoint, channelFactory, true); + this.channel = new GrpcChannel(endpoint, channelFactory); this.scheduler = builder.getSchedulerFactory().get(); this.callOptions = new AuthCallOptions(scheduler, Arrays.asList(endpoint), channelFactory, builder); diff --git a/core/src/main/java/tech/ydb/core/impl/pool/GrpcChannel.java b/core/src/main/java/tech/ydb/core/impl/pool/GrpcChannel.java index f07f67f12..919e24543 100644 --- a/core/src/main/java/tech/ydb/core/impl/pool/GrpcChannel.java +++ b/core/src/main/java/tech/ydb/core/impl/pool/GrpcChannel.java @@ -23,13 +23,13 @@ public class GrpcChannel { private final long connectTimeoutMs; private final ReadyWatcher readyWatcher; - public GrpcChannel(EndpointRecord endpoint, ManagedChannelFactory factory, boolean tryToConnect) { + public GrpcChannel(EndpointRecord endpoint, ManagedChannelFactory factory) { logger.debug("Creating grpc channel with {}", endpoint); this.endpoint = endpoint; this.channel = factory.newManagedChannel(endpoint.getHost(), endpoint.getPort()); this.connectTimeoutMs = factory.getConnectTimeoutMs(); this.readyWatcher = new ReadyWatcher(); - this.readyWatcher.check(tryToConnect); + this.readyWatcher.checkState(); } public EndpointRecord getEndpoint() { @@ -67,7 +67,7 @@ public boolean shutdown() { } private class ReadyWatcher implements Runnable { - private final CompletableFuture future = new CompletableFuture<>(); + private CompletableFuture future = new CompletableFuture<>(); public Channel getReadyChannel() { try { @@ -85,12 +85,14 @@ public Channel getReadyChannel() { return null; } - public void check(boolean tryToConnect) { - ConnectivityState state = channel.getState(tryToConnect); + public void checkState() { + ConnectivityState state = channel.getState(true); logger.debug("Grpc channel {} new state: {}", endpoint, state); switch (state) { case READY: future.complete(channel); + // keep tracking channel state + channel.notifyWhenStateChanged(state, this); break; case SHUTDOWN: future.completeExceptionally(new IllegalStateException("Grpc channel already closed")); @@ -99,7 +101,10 @@ public void check(boolean tryToConnect) { case CONNECTING: case IDLE: default: - // repeat watch + if (future.isDone()) { + future = new CompletableFuture<>(); + } + // keep tracking channel state channel.notifyWhenStateChanged(state, this); break; } @@ -107,7 +112,7 @@ public void check(boolean tryToConnect) { @Override public void run() { - check(false); + checkState(); } } } diff --git a/core/src/main/java/tech/ydb/core/impl/pool/GrpcChannelPool.java b/core/src/main/java/tech/ydb/core/impl/pool/GrpcChannelPool.java index eaeab8a9f..78ec7bf85 100644 --- a/core/src/main/java/tech/ydb/core/impl/pool/GrpcChannelPool.java +++ b/core/src/main/java/tech/ydb/core/impl/pool/GrpcChannelPool.java @@ -36,7 +36,7 @@ public GrpcChannel getChannel(EndpointRecord endpoint) { return result != null ? result : channels.computeIfAbsent(endpoint.getHostAndPort(), (key) -> { logger.debug("channel " + endpoint.getHostAndPort() + " was not found in pool, creating one..."); - return new GrpcChannel(endpoint, channelFactory, true); + return new GrpcChannel(endpoint, channelFactory); }); } diff --git a/core/src/test/java/tech/ydb/core/impl/pool/GrpcChannelTest.java b/core/src/test/java/tech/ydb/core/impl/pool/GrpcChannelTest.java index 3368aa3f9..288024d62 100644 --- a/core/src/test/java/tech/ydb/core/impl/pool/GrpcChannelTest.java +++ b/core/src/test/java/tech/ydb/core/impl/pool/GrpcChannelTest.java @@ -31,16 +31,11 @@ public void goodChannels() { EndpointRecord endpoint = new EndpointRecord("host1", 1234); - GrpcChannel lazy = new GrpcChannel(endpoint, factoryMock, false); - Assert.assertEquals(endpoint, lazy.getEndpoint()); - Assert.assertNotNull(lazy.getReadyChannel()); - lazy.shutdown(); - lazy.shutdown(); // double shutdown is ok - - GrpcChannel eager = new GrpcChannel(endpoint, factoryMock, true); - Assert.assertEquals(endpoint, eager.getEndpoint()); - Assert.assertNotNull(eager.getReadyChannel()); - eager.shutdown(); + GrpcChannel channel = new GrpcChannel(endpoint, factoryMock); + Assert.assertEquals(endpoint, channel.getEndpoint()); + Assert.assertNotNull(channel.getReadyChannel()); + channel.shutdown(); + channel.shutdown(); // double shutdown is ok } @Test @@ -63,15 +58,10 @@ public void slowChannels() { EndpointRecord endpoint = new EndpointRecord("host1", 1234); - GrpcChannel lazy = new GrpcChannel(endpoint, factoryMock, false); - Assert.assertEquals(endpoint, lazy.getEndpoint()); - Assert.assertNotNull(lazy.getReadyChannel()); - lazy.shutdown(); - - GrpcChannel eager = new GrpcChannel(endpoint, factoryMock, true); - Assert.assertEquals(endpoint, eager.getEndpoint()); - Assert.assertNotNull(eager.getReadyChannel()); - eager.shutdown(); + GrpcChannel channel = new GrpcChannel(endpoint, factoryMock); + Assert.assertEquals(endpoint, channel.getEndpoint()); + Assert.assertNotNull(channel.getReadyChannel()); + channel.shutdown(); } @Test @@ -90,22 +80,13 @@ public void badChannels() { EndpointRecord endpoint = new EndpointRecord("host1", 1234); - GrpcChannel lazy = new GrpcChannel(endpoint, factoryMock, false); - Assert.assertEquals(endpoint, lazy.getEndpoint()); + GrpcChannel channel = new GrpcChannel(endpoint, factoryMock); + Assert.assertEquals(endpoint, channel.getEndpoint()); - RuntimeException ex1 = Assert.assertThrows(RuntimeException.class, lazy::getReadyChannel); + RuntimeException ex1 = Assert.assertThrows(RuntimeException.class, channel::getReadyChannel); Assert.assertEquals("Channel Endpoint{host=host1, port=1234, node=0, location=null} connecting problem", ex1.getMessage()); - lazy.shutdown(); - - GrpcChannel eager = new GrpcChannel(endpoint, factoryMock, true); - Assert.assertEquals(endpoint, eager.getEndpoint()); - - RuntimeException ex2 = Assert.assertThrows(RuntimeException.class, eager::getReadyChannel); - Assert.assertEquals("Channel Endpoint{host=host1, port=1234, node=0, location=null} connecting problem", - ex2.getMessage()); - - eager.shutdown(); + channel.shutdown(); } } From 34baa8b89c61bcb1201b035f13379347bd08d4e0 Mon Sep 17 00:00:00 2001 From: Nikolay Perfilov Date: Mon, 26 Aug 2024 23:29:49 +0300 Subject: [PATCH 2/5] Fix typo --- core/src/main/java/tech/ydb/core/impl/pool/GrpcChannel.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/java/tech/ydb/core/impl/pool/GrpcChannel.java b/core/src/main/java/tech/ydb/core/impl/pool/GrpcChannel.java index 919e24543..a72b76722 100644 --- a/core/src/main/java/tech/ydb/core/impl/pool/GrpcChannel.java +++ b/core/src/main/java/tech/ydb/core/impl/pool/GrpcChannel.java @@ -79,7 +79,7 @@ public Channel getReadyChannel() { logger.error("Grpc channel {} connecting problem", endpoint, ex); throw new RuntimeException("Channel " + endpoint + " connecting problem", ex); } catch (TimeoutException ex) { - logger.error("Grpc channel {} connect timeout excided", endpoint); + logger.error("Grpc channel {} connect timeout exceeded", endpoint); throw new RuntimeException("Channel " + endpoint + " connecting timeout"); } return null; From 4586c50dfd205551cbacaf37b84ffab0b707d222 Mon Sep 17 00:00:00 2001 From: Nikolay Perfilov Date: Mon, 26 Aug 2024 23:39:56 +0300 Subject: [PATCH 3/5] Improve error logging --- core/src/main/java/tech/ydb/core/impl/pool/GrpcChannel.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/main/java/tech/ydb/core/impl/pool/GrpcChannel.java b/core/src/main/java/tech/ydb/core/impl/pool/GrpcChannel.java index a72b76722..efd18c83d 100644 --- a/core/src/main/java/tech/ydb/core/impl/pool/GrpcChannel.java +++ b/core/src/main/java/tech/ydb/core/impl/pool/GrpcChannel.java @@ -58,7 +58,7 @@ public boolean shutdown() { } return closed; } catch (InterruptedException e) { - logger.warn("transport shutdown interrupted for channel {}: {}", endpoint, e); + logger.warn("transport shutdown interrupted for channel {}: ", endpoint, e); Thread.currentThread().interrupt(); return false; } finally { @@ -73,10 +73,10 @@ public Channel getReadyChannel() { try { return future.get(connectTimeoutMs, TimeUnit.MILLISECONDS); } catch (InterruptedException ex) { - logger.error("Grpc channel {} ready waiting is interrupted", endpoint, ex); + logger.error("Grpc channel {} ready waiting is interrupted: ", endpoint, ex); Thread.currentThread().interrupt(); } catch (ExecutionException ex) { - logger.error("Grpc channel {} connecting problem", endpoint, ex); + logger.error("Grpc channel {} connecting problem: ", endpoint, ex); throw new RuntimeException("Channel " + endpoint + " connecting problem", ex); } catch (TimeoutException ex) { logger.error("Grpc channel {} connect timeout exceeded", endpoint); From d10cd39523da5255a06eb8b87a2c70db483a89a4 Mon Sep 17 00:00:00 2001 From: Nikolay Perfilov Date: Tue, 27 Aug 2024 00:11:41 +0300 Subject: [PATCH 4/5] Do not refresh future. Only use it for first connection --- core/src/main/java/tech/ydb/core/impl/pool/GrpcChannel.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/core/src/main/java/tech/ydb/core/impl/pool/GrpcChannel.java b/core/src/main/java/tech/ydb/core/impl/pool/GrpcChannel.java index efd18c83d..adab0c5d0 100644 --- a/core/src/main/java/tech/ydb/core/impl/pool/GrpcChannel.java +++ b/core/src/main/java/tech/ydb/core/impl/pool/GrpcChannel.java @@ -101,9 +101,6 @@ public void checkState() { case CONNECTING: case IDLE: default: - if (future.isDone()) { - future = new CompletableFuture<>(); - } // keep tracking channel state channel.notifyWhenStateChanged(state, this); break; From 3f52adfbcfc3373911f4ab42b11e61fe81b26f83 Mon Sep 17 00:00:00 2001 From: Nikolay Perfilov Date: Tue, 27 Aug 2024 00:26:24 +0300 Subject: [PATCH 5/5] Return final keyword --- core/src/main/java/tech/ydb/core/impl/pool/GrpcChannel.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/java/tech/ydb/core/impl/pool/GrpcChannel.java b/core/src/main/java/tech/ydb/core/impl/pool/GrpcChannel.java index adab0c5d0..bdee43f27 100644 --- a/core/src/main/java/tech/ydb/core/impl/pool/GrpcChannel.java +++ b/core/src/main/java/tech/ydb/core/impl/pool/GrpcChannel.java @@ -67,7 +67,7 @@ public boolean shutdown() { } private class ReadyWatcher implements Runnable { - private CompletableFuture future = new CompletableFuture<>(); + private final CompletableFuture future = new CompletableFuture<>(); public Channel getReadyChannel() { try {