Skip to content

Commit

Permalink
[FLINK-37016] ClusterEntrypoint can be closed before initialization
Browse files Browse the repository at this point in the history
  • Loading branch information
zentol committed Jan 9, 2025
1 parent 7dbc646 commit 1c6540b
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -592,7 +592,9 @@ private CompletableFuture<ApplicationStatus> shutDownAsync(
shutDownApplicationFuture, () -> stopClusterServices(cleanupHaData));

final CompletableFuture<Void> rpcSystemClassLoaderCloseFuture =
FutureUtils.runAfterwards(serviceShutdownFuture, rpcSystem::close);
rpcSystem != null
? FutureUtils.runAfterwards(serviceShutdownFuture, rpcSystem::close)
: FutureUtils.completedVoidFuture();

final CompletableFuture<Void> cleanupDirectoriesFuture =
FutureUtils.runAfterwards(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import static org.assertj.core.api.Assertions.assertThatCode;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertFalse;
Expand Down Expand Up @@ -131,6 +132,13 @@ public void testClusterStartShouldObtainTokens() throws Exception {
ExceptionThrowingDelegationTokenReceiver.onNewTokensObtainedCallCount.get(), is(1));
}

@Test
public void testCloseAsyncDoesNotFailBeforeInitialization() {
TestingEntryPoint entryPoint = new TestingEntryPoint.Builder().build();

assertThatCode(() -> entryPoint.closeAsync().join()).doesNotThrowAnyException();
}

@Test
public void testCloseAsyncShouldNotCleanUpHAData() throws Exception {
final CompletableFuture<Void> closeFuture = new CompletableFuture<>();
Expand Down

0 comments on commit 1c6540b

Please sign in to comment.