diff --git a/streampark-tests/streampark-testcontainer/src/main/java/org/apache/streampark/testcontainer/flink/FlinkContainer.java b/streampark-tests/streampark-testcontainer/src/main/java/org/apache/streampark/testcontainer/flink/FlinkContainer.java index d9814f0d22..2c85b86c8f 100644 --- a/streampark-tests/streampark-testcontainer/src/main/java/org/apache/streampark/testcontainer/flink/FlinkContainer.java +++ b/streampark-tests/streampark-testcontainer/src/main/java/org/apache/streampark/testcontainer/flink/FlinkContainer.java @@ -33,7 +33,7 @@ class FlinkContainer extends GenericContainer { - public static final AtomicInteger TM_COUNT = new AtomicInteger(0); + public static final AtomicInteger TM_INDEX_SUFFIX = new AtomicInteger(0); public static final String FLINK_PROPS_KEY = "FLINK_PROPERTIES"; @@ -43,7 +43,7 @@ class FlinkContainer extends GenericContainer { @Nonnull DockerImageName dockerImageName, @Nonnull FlinkComponent component, @Nonnull Network network, - @Nonnull String yamlPropStr, + @Nonnull String yamlPropContent, @Nullable Slf4jLogConsumer slf4jLogConsumer) { super(dockerImageName); this.component = component; @@ -51,7 +51,7 @@ class FlinkContainer extends GenericContainer { this.withCreateContainerCmdModifier( createContainerCmd -> createContainerCmd.withName(getFlinkContainerName())); this.withNetwork(network); - this.withEnv(FLINK_PROPS_KEY, yamlPropStr); + this.withEnv(FLINK_PROPS_KEY, yamlPropContent); Optional.ofNullable(slf4jLogConsumer).ifPresent(this::withLogConsumer); } @@ -59,6 +59,6 @@ protected String getFlinkContainerName() { if (component == JOBMANAGER) { return JOBMANAGER.getName(); } - return String.format("%s_%s", TASKMANAGER.getName(), TM_COUNT.incrementAndGet()); + return String.format("%s_%s", TASKMANAGER.getName(), TM_INDEX_SUFFIX.incrementAndGet()); } } diff --git a/streampark-tests/streampark-testcontainer/src/main/java/org/apache/streampark/testcontainer/flink/FlinkStandaloneSessionCluster.java b/streampark-tests/streampark-testcontainer/src/main/java/org/apache/streampark/testcontainer/flink/FlinkStandaloneSessionCluster.java index e5f01dea6f..c238675d5b 100644 --- a/streampark-tests/streampark-testcontainer/src/main/java/org/apache/streampark/testcontainer/flink/FlinkStandaloneSessionCluster.java +++ b/streampark-tests/streampark-testcontainer/src/main/java/org/apache/streampark/testcontainer/flink/FlinkStandaloneSessionCluster.java @@ -48,13 +48,13 @@ public class FlinkStandaloneSessionCluster implements Startable { public static final Network NETWORK = Network.newNetwork(); public static final String JM_RPC_ADDR_KEY = "jobmanager.rpc.address"; - public static final String SLOT_CONF_KEY = "taskmanager.numberOfTaskSlots"; - public static final String SLOT_CONF_FORMAT = String.format("%s: %%s", SLOT_CONF_KEY); + public static final String TM_SLOT_NUM_KEY = "taskmanager.numberOfTaskSlots"; + public static final String SLOT_CONF_FORMAT = String.format("%s: %%s", TM_SLOT_NUM_KEY); public static final int BLOB_SERVER_PORT = 6123; public static final int WEB_PORT = 8081; - private String yamlConfStr = String.format("%s: %s", JM_RPC_ADDR_KEY, JOBMANAGER.getName()); + private String yamlConfContent = String.format("%s: %s", JM_RPC_ADDR_KEY, JOBMANAGER.getName()); private final FlinkContainer jobManagerContainer; @@ -64,17 +64,17 @@ private FlinkStandaloneSessionCluster( DockerImageName dockerImageName, int taskManagerNum, int slotsNumPerTm, - @Nullable String yamlConfStr, + @Nullable String yamlConfContent, Slf4jLogConsumer slf4jLogConsumer) { - renderJmRpcConfIfNeeded(yamlConfStr); + renderJmRpcConfIfNeeded(yamlConfContent); renderSlotNumIfNeeded(slotsNumPerTm); // Set for JM. this.jobManagerContainer = new FlinkContainer( - dockerImageName, JOBMANAGER, NETWORK, this.yamlConfStr, slf4jLogConsumer); + dockerImageName, JOBMANAGER, NETWORK, this.yamlConfContent, slf4jLogConsumer); this.jobManagerContainer.addExposedPort(BLOB_SERVER_PORT); this.jobManagerContainer.addExposedPort(WEB_PORT); @@ -88,7 +88,7 @@ private FlinkStandaloneSessionCluster( for (int i = 0; i < taskManagerNum; i++) { FlinkContainer taskManager = new FlinkContainer( - dockerImageName, TASKMANAGER, NETWORK, this.yamlConfStr, slf4jLogConsumer); + dockerImageName, TASKMANAGER, NETWORK, this.yamlConfContent, slf4jLogConsumer); this.taskManagerContainers.add(taskManager); } } @@ -119,20 +119,20 @@ public void stop() { } private void renderSlotNumIfNeeded(int slotsNumPerTm) { - if (!this.yamlConfStr.contains(SLOT_CONF_KEY)) { - this.yamlConfStr = + if (!this.yamlConfContent.contains(TM_SLOT_NUM_KEY)) { + this.yamlConfContent = String.format( - "%s\n%s\n", this.yamlConfStr, String.format(SLOT_CONF_FORMAT, slotsNumPerTm)); + "%s\n%s\n", this.yamlConfContent, String.format(SLOT_CONF_FORMAT, slotsNumPerTm)); } } private void renderJmRpcConfIfNeeded(@Nullable String yamlConfStr) { - this.yamlConfStr = + this.yamlConfContent = yamlConfStr == null - ? this.yamlConfStr + ? this.yamlConfContent : (yamlConfStr.contains(JM_RPC_ADDR_KEY) ? yamlConfStr - : String.format("%s\n%s\n", this.yamlConfStr, yamlConfStr)); + : String.format("%s\n%s\n", this.yamlConfContent, yamlConfStr)); } public static class Builder { @@ -141,7 +141,7 @@ public static class Builder { DockerImageName.parse("apache/flink:1.17.1-scala_2.12"); private int taskManagerNum = 1; private int slotsNumPerTm = 8; - private @Nullable String yamlConfStr; + private @Nullable String yamlConfContent; private Slf4jLogConsumer slf4jLogConsumer = new Slf4jLogConsumer(LOG, false); private Builder() {} @@ -163,8 +163,8 @@ public Builder slotsNumPerTm(int slotsNumPerTm) { return this; } - public Builder yamlConfStr(@Nullable String yamlConfStr) { - this.yamlConfStr = yamlConfStr; + public Builder yamlConfContent(@Nullable String yamlConfContent) { + this.yamlConfContent = yamlConfContent; return this; } @@ -175,7 +175,7 @@ public Builder slf4jLogConsumer(Slf4jLogConsumer slf4jLogConsumer) { public FlinkStandaloneSessionCluster build() { return new FlinkStandaloneSessionCluster( - dockerImageName, taskManagerNum, slotsNumPerTm, yamlConfStr, slf4jLogConsumer); + dockerImageName, taskManagerNum, slotsNumPerTm, yamlConfContent, slf4jLogConsumer); } } diff --git a/streampark-tests/streampark-testcontainer/src/test/java/org/apache/streampark/testcontainer/flink/FlinkStandaloneSessionClusterITest.java b/streampark-tests/streampark-testcontainer/src/test/java/org/apache/streampark/testcontainer/flink/FlinkStandaloneSessionClusterITest.java index 6f34e720f7..f84ddf66c2 100644 --- a/streampark-tests/streampark-testcontainer/src/test/java/org/apache/streampark/testcontainer/flink/FlinkStandaloneSessionClusterITest.java +++ b/streampark-tests/streampark-testcontainer/src/test/java/org/apache/streampark/testcontainer/flink/FlinkStandaloneSessionClusterITest.java @@ -33,22 +33,23 @@ /** Test for flink standalone session cluster env available. */ class FlinkStandaloneSessionClusterITest { - private final FlinkStandaloneSessionCluster cluster = + private final FlinkStandaloneSessionCluster flinkStandaloneSessionCluster = FlinkStandaloneSessionCluster.builder().build(); @BeforeEach void up() { - cluster.start(); + flinkStandaloneSessionCluster.start(); } @AfterEach void down() { - cluster.stop(); + flinkStandaloneSessionCluster.stop(); } @Test void testRestApiAvailable() throws IOException { - String url = String.format("%s/%s", cluster.getFlinkJobManagerUrl(), "config"); + String url = + String.format("%s/%s", flinkStandaloneSessionCluster.getFlinkJobManagerUrl(), "config"); CloseableHttpClient httpClient = HttpClients.createDefault(); CloseableHttpResponse response = httpClient.execute(new HttpGet(url)); assertThat(response.getCode()).isEqualTo(200);