From 1c6540bfa42b8824ea264d8b7104ef41f61bd5e1 Mon Sep 17 00:00:00 2001 From: Chesnay Schepler Date: Mon, 6 Jan 2025 14:38:24 +0100 Subject: [PATCH] [FLINK-37016] ClusterEntrypoint can be closed before initialization --- .../flink/runtime/entrypoint/ClusterEntrypoint.java | 4 +++- .../flink/runtime/entrypoint/ClusterEntrypointTest.java | 8 ++++++++ 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java index 8c2803fe4437c..28c66d0c21dd6 100755 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java @@ -592,7 +592,9 @@ private CompletableFuture shutDownAsync( shutDownApplicationFuture, () -> stopClusterServices(cleanupHaData)); final CompletableFuture rpcSystemClassLoaderCloseFuture = - FutureUtils.runAfterwards(serviceShutdownFuture, rpcSystem::close); + rpcSystem != null + ? FutureUtils.runAfterwards(serviceShutdownFuture, rpcSystem::close) + : FutureUtils.completedVoidFuture(); final CompletableFuture cleanupDirectoriesFuture = FutureUtils.runAfterwards( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/ClusterEntrypointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/ClusterEntrypointTest.java index 8b2424e8694de..86796fdeba595 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/ClusterEntrypointTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/ClusterEntrypointTest.java @@ -69,6 +69,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import static org.assertj.core.api.Assertions.assertThatCode; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertFalse; @@ -131,6 +132,13 @@ public void testClusterStartShouldObtainTokens() throws Exception { ExceptionThrowingDelegationTokenReceiver.onNewTokensObtainedCallCount.get(), is(1)); } + @Test + public void testCloseAsyncDoesNotFailBeforeInitialization() { + TestingEntryPoint entryPoint = new TestingEntryPoint.Builder().build(); + + assertThatCode(() -> entryPoint.closeAsync().join()).doesNotThrowAnyException(); + } + @Test public void testCloseAsyncShouldNotCleanUpHAData() throws Exception { final CompletableFuture closeFuture = new CompletableFuture<>();