Skip to content

Commit

Permalink
[ISSUE-3108][Improve] Improve streampark-testcontainer module based o…
Browse files Browse the repository at this point in the history
…n [3.1 Naming Style]
  • Loading branch information
RocMarshal committed Oct 9, 2023
1 parent 095be65 commit e55dc1d
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@

class FlinkContainer extends GenericContainer<FlinkContainer> {

public static final AtomicInteger TM_COUNT = new AtomicInteger(0);
public static final AtomicInteger TM_INDEX_SUFFIX = new AtomicInteger(0);

public static final String FLINK_PROPS_KEY = "FLINK_PROPERTIES";

Expand All @@ -43,22 +43,22 @@ class FlinkContainer extends GenericContainer<FlinkContainer> {
@Nonnull DockerImageName dockerImageName,
@Nonnull FlinkComponent component,
@Nonnull Network network,
@Nonnull String yamlPropStr,
@Nonnull String yamlPropContent,
@Nullable Slf4jLogConsumer slf4jLogConsumer) {
super(dockerImageName);
this.component = component;
this.withCommand("/docker-entrypoint.sh", component.getName());
this.withCreateContainerCmdModifier(
createContainerCmd -> createContainerCmd.withName(getFlinkContainerName()));
this.withNetwork(network);
this.withEnv(FLINK_PROPS_KEY, yamlPropStr);
this.withEnv(FLINK_PROPS_KEY, yamlPropContent);
Optional.ofNullable(slf4jLogConsumer).ifPresent(this::withLogConsumer);
}

protected String getFlinkContainerName() {
if (component == JOBMANAGER) {
return JOBMANAGER.getName();
}
return String.format("%s_%s", TASKMANAGER.getName(), TM_COUNT.incrementAndGet());
return String.format("%s_%s", TASKMANAGER.getName(), TM_INDEX_SUFFIX.incrementAndGet());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,13 @@ public class FlinkStandaloneSessionCluster implements Startable {
public static final Network NETWORK = Network.newNetwork();

public static final String JM_RPC_ADDR_KEY = "jobmanager.rpc.address";
public static final String SLOT_CONF_KEY = "taskmanager.numberOfTaskSlots";
public static final String SLOT_CONF_FORMAT = String.format("%s: %%s", SLOT_CONF_KEY);
public static final String TM_SLOT_NUM_KEY = "taskmanager.numberOfTaskSlots";
public static final String SLOT_CONF_FORMAT = String.format("%s: %%s", TM_SLOT_NUM_KEY);

public static final int BLOB_SERVER_PORT = 6123;
public static final int WEB_PORT = 8081;

private String yamlConfStr = String.format("%s: %s", JM_RPC_ADDR_KEY, JOBMANAGER.getName());
private String yamlConfContent = String.format("%s: %s", JM_RPC_ADDR_KEY, JOBMANAGER.getName());

private final FlinkContainer jobManagerContainer;

Expand All @@ -64,17 +64,17 @@ private FlinkStandaloneSessionCluster(
DockerImageName dockerImageName,
int taskManagerNum,
int slotsNumPerTm,
@Nullable String yamlConfStr,
@Nullable String yamlConfContent,
Slf4jLogConsumer slf4jLogConsumer) {

renderJmRpcConfIfNeeded(yamlConfStr);
renderJmRpcConfIfNeeded(yamlConfContent);

renderSlotNumIfNeeded(slotsNumPerTm);

// Set for JM.
this.jobManagerContainer =
new FlinkContainer(
dockerImageName, JOBMANAGER, NETWORK, this.yamlConfStr, slf4jLogConsumer);
dockerImageName, JOBMANAGER, NETWORK, this.yamlConfContent, slf4jLogConsumer);
this.jobManagerContainer.addExposedPort(BLOB_SERVER_PORT);
this.jobManagerContainer.addExposedPort(WEB_PORT);

Expand All @@ -88,7 +88,7 @@ private FlinkStandaloneSessionCluster(
for (int i = 0; i < taskManagerNum; i++) {
FlinkContainer taskManager =
new FlinkContainer(
dockerImageName, TASKMANAGER, NETWORK, this.yamlConfStr, slf4jLogConsumer);
dockerImageName, TASKMANAGER, NETWORK, this.yamlConfContent, slf4jLogConsumer);
this.taskManagerContainers.add(taskManager);
}
}
Expand Down Expand Up @@ -119,20 +119,20 @@ public void stop() {
}

private void renderSlotNumIfNeeded(int slotsNumPerTm) {
if (!this.yamlConfStr.contains(SLOT_CONF_KEY)) {
this.yamlConfStr =
if (!this.yamlConfContent.contains(TM_SLOT_NUM_KEY)) {
this.yamlConfContent =
String.format(
"%s\n%s\n", this.yamlConfStr, String.format(SLOT_CONF_FORMAT, slotsNumPerTm));
"%s\n%s\n", this.yamlConfContent, String.format(SLOT_CONF_FORMAT, slotsNumPerTm));
}
}

private void renderJmRpcConfIfNeeded(@Nullable String yamlConfStr) {
this.yamlConfStr =
this.yamlConfContent =
yamlConfStr == null
? this.yamlConfStr
? this.yamlConfContent
: (yamlConfStr.contains(JM_RPC_ADDR_KEY)
? yamlConfStr
: String.format("%s\n%s\n", this.yamlConfStr, yamlConfStr));
: String.format("%s\n%s\n", this.yamlConfContent, yamlConfStr));
}

public static class Builder {
Expand All @@ -141,7 +141,7 @@ public static class Builder {
DockerImageName.parse("apache/flink:1.17.1-scala_2.12");
private int taskManagerNum = 1;
private int slotsNumPerTm = 8;
private @Nullable String yamlConfStr;
private @Nullable String yamlConfContent;
private Slf4jLogConsumer slf4jLogConsumer = new Slf4jLogConsumer(LOG, false);

private Builder() {}
Expand All @@ -163,8 +163,8 @@ public Builder slotsNumPerTm(int slotsNumPerTm) {
return this;
}

public Builder yamlConfStr(@Nullable String yamlConfStr) {
this.yamlConfStr = yamlConfStr;
public Builder yamlConfContent(@Nullable String yamlConfContent) {
this.yamlConfContent = yamlConfContent;
return this;
}

Expand All @@ -175,7 +175,7 @@ public Builder slf4jLogConsumer(Slf4jLogConsumer slf4jLogConsumer) {

public FlinkStandaloneSessionCluster build() {
return new FlinkStandaloneSessionCluster(
dockerImageName, taskManagerNum, slotsNumPerTm, yamlConfStr, slf4jLogConsumer);
dockerImageName, taskManagerNum, slotsNumPerTm, yamlConfContent, slf4jLogConsumer);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,22 +33,23 @@
/** Test for flink standalone session cluster env available. */
class FlinkStandaloneSessionClusterITest {

private final FlinkStandaloneSessionCluster cluster =
private final FlinkStandaloneSessionCluster flinkStandaloneSessionCluster =
FlinkStandaloneSessionCluster.builder().build();

@BeforeEach
void up() {
cluster.start();
flinkStandaloneSessionCluster.start();
}

@AfterEach
void down() {
cluster.stop();
flinkStandaloneSessionCluster.stop();
}

@Test
void testRestApiAvailable() throws IOException {
String url = String.format("%s/%s", cluster.getFlinkJobManagerUrl(), "config");
String url =
String.format("%s/%s", flinkStandaloneSessionCluster.getFlinkJobManagerUrl(), "config");
CloseableHttpClient httpClient = HttpClients.createDefault();
CloseableHttpResponse response = httpClient.execute(new HttpGet(url));
assertThat(response.getCode()).isEqualTo(200);
Expand Down

0 comments on commit e55dc1d

Please sign in to comment.