Skip to content

Commit

Permalink
add feature 3861 (#3890)
Browse files Browse the repository at this point in the history
  • Loading branch information
SbloodyS authored Jul 19, 2024
1 parent de11f39 commit 942116b
Show file tree
Hide file tree
Showing 14 changed files with 541 additions and 187 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,7 @@ export const renderFlinkCluster = (clusters, { model, field }: RenderCallbackPar
placeholder={t('flink.app.flinkCluster')}
value={model[field]}
onChange={(value: any) => (model[field] = value)}
codeField={field}
>
{clusters.map((item) => {
return (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@
import org.apache.streampark.e2e.pages.flink.FlinkHomePage;
import org.apache.streampark.e2e.pages.flink.applications.ApplicationForm;
import org.apache.streampark.e2e.pages.flink.applications.ApplicationsPage;
import org.apache.streampark.e2e.pages.flink.applications.entity.ApplicationsDynamicParams;
import org.apache.streampark.e2e.pages.flink.clusters.ClusterDetailForm;
import org.apache.streampark.e2e.pages.flink.clusters.FlinkClustersPage;
import org.apache.streampark.e2e.pages.flink.clusters.YarnSessionForm;

import lombok.SneakyThrows;
import org.junit.jupiter.api.BeforeAll;
Expand All @@ -34,7 +36,7 @@
import org.openqa.selenium.remote.RemoteWebDriver;
import org.testcontainers.shaded.org.awaitility.Awaitility;

import static org.apache.streampark.e2e.core.Constants.TEST_FLINK_SQL;
import static org.apache.streampark.e2e.pages.common.Constants.TEST_FLINK_SQL;
import static org.assertj.core.api.Assertions.assertThat;

@StreamPark(composeFiles = "docker/flink-1.16-on-yarn/docker-compose.yaml")
Expand All @@ -54,34 +56,49 @@ public class FlinkSQL116OnYarnTest {

private static final String applicationName = "flink-116-e2e-test";

private static final String flinkDescription = "description test";

private static final String flinkClusterName = "flink_1.16.3_cluster_e2e";

@BeforeAll
public static void setup() {
FlinkHomePage flinkHomePage = new LoginPage(browser)
.login(userName, password, teamName)
.goToNav(ApacheFlinkPage.class)
.goToTab(FlinkHomePage.class);

flinkHomePage.createFlinkHome(flinkName, flinkHome, "");
flinkHomePage.createFlinkHome(flinkName, flinkHome, flinkDescription);

FlinkClustersPage flinkClustersPage = flinkHomePage.goToNav(ApacheFlinkPage.class)
.goToTab(FlinkClustersPage.class);

flinkClustersPage.createFlinkCluster()
.<YarnSessionForm>addCluster(ClusterDetailForm.ExecutionMode.YARN_SESSION)
.resolveOrder(YarnSessionForm.ResolveOrder.PARENT_FIRST)
.clusterName(flinkClusterName)
.flinkVersion(flinkName)
.submit();

flinkClustersPage.startFlinkCluster(flinkClusterName);

flinkHomePage.goToNav(ApacheFlinkPage.class).goToTab(ApplicationsPage.class);
flinkClustersPage.goToNav(ApacheFlinkPage.class)
.goToTab(ApplicationsPage.class);
}

@Test
@Order(10)
void testCreateFlinkApplicationOnYarnApplicationMode() {
final ApplicationsPage applicationsPage = new ApplicationsPage(browser);

ApplicationsDynamicParams applicationsDynamicParams = new ApplicationsDynamicParams();

applicationsDynamicParams.flinkSQL(TEST_FLINK_SQL);
applicationsPage
.createApplication()
.addApplication(
ApplicationForm.DevelopmentMode.FLINK_SQL,
ApplicationForm.ExecutionMode.YARN_APPLICATION,
applicationName,
flinkName,
applicationsDynamicParams);
applicationName)
.flinkVersion(flinkName)
.flinkSql(TEST_FLINK_SQL)
.submit();

Awaitility.await()
.untilAsserted(
Expand Down Expand Up @@ -176,16 +193,15 @@ void testDeleteFlinkApplicationOnYarnApplicationMode() {
void testCreateFlinkApplicationOnYarnPerJobMode() {
final ApplicationsPage applicationsPage = new ApplicationsPage(browser);

ApplicationsDynamicParams applicationsDynamicParams = new ApplicationsDynamicParams();
applicationsDynamicParams.flinkSQL(TEST_FLINK_SQL);
applicationsPage
.createApplication()
.addApplication(
ApplicationForm.DevelopmentMode.FLINK_SQL,
ApplicationForm.ExecutionMode.YARN_PER_JOB,
applicationName,
flinkName,
applicationsDynamicParams);
applicationName)
.flinkVersion(flinkName)
.flinkSql(TEST_FLINK_SQL)
.submit();

Awaitility.await()
.untilAsserted(
Expand Down Expand Up @@ -248,4 +264,81 @@ void testDeleteFlinkApplicationOnYarnPerJobMode() {
.noneMatch(it -> it.getText().contains(applicationName));
});
}

@Test
@Order(90)
void testCreateFlinkApplicationOnYarnSessionMode() {
final ApplicationsPage applicationsPage = new ApplicationsPage(browser);

applicationsPage.createApplication()
.addApplication(
ApplicationForm.DevelopmentMode.FLINK_SQL,
ApplicationForm.ExecutionMode.YARN_SESSION,
applicationName)
.flinkVersion(flinkName)
.flinkSql(TEST_FLINK_SQL)
.flinkCluster(flinkClusterName)
.submit();

Awaitility.await()
.untilAsserted(
() -> assertThat(applicationsPage.applicationsList())
.as("Applications list should contain newly-created application")
.extracting(WebElement::getText)
.anyMatch(it -> it.contains(applicationName)));
}

@Test
@Order(100)
void testReleaseFlinkApplicationOnYarnSessionMode() {
final ApplicationsPage applicationsPage = new ApplicationsPage(browser);

applicationsPage.releaseApplication(applicationName);

Awaitility.await()
.untilAsserted(
() -> assertThat(applicationsPage.applicationsList())
.as("Applications list should contain released application")
.extracting(WebElement::getText)
.anyMatch(it -> it.contains("SUCCESS")));
}

@Test
@Order(110)
void testStartFlinkApplicationOnYarnSessionMode() {
final ApplicationsPage applicationsPage = new ApplicationsPage(browser);

applicationsPage.startApplication(applicationName);

Awaitility.await()
.untilAsserted(
() -> assertThat(applicationsPage.applicationsList())
.as("Applications list should contain started application")
.extracting(WebElement::getText)
.anyMatch(it -> it.contains("RUNNING")));

Awaitility.await()
.untilAsserted(
() -> assertThat(applicationsPage.applicationsList())
.as("Applications list should contain finished application")
.extracting(WebElement::getText)
.anyMatch(it -> it.contains("FINISHED")));
}

@Test
@Order(120)
void testDeleteFlinkApplicationOnYarnSessionMode() {
final ApplicationsPage applicationsPage = new ApplicationsPage(browser);

applicationsPage.deleteApplication(applicationName);

Awaitility.await()
.untilAsserted(
() -> {
browser.navigate().refresh();

assertThat(applicationsPage.applicationsList())
.noneMatch(it -> it.getText().contains(applicationName));
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@
import org.apache.streampark.e2e.pages.flink.FlinkHomePage;
import org.apache.streampark.e2e.pages.flink.applications.ApplicationForm;
import org.apache.streampark.e2e.pages.flink.applications.ApplicationsPage;
import org.apache.streampark.e2e.pages.flink.applications.entity.ApplicationsDynamicParams;
import org.apache.streampark.e2e.pages.flink.clusters.ClusterDetailForm;
import org.apache.streampark.e2e.pages.flink.clusters.FlinkClustersPage;
import org.apache.streampark.e2e.pages.flink.clusters.YarnSessionForm;

import lombok.SneakyThrows;
import org.junit.jupiter.api.BeforeAll;
Expand All @@ -34,7 +36,6 @@
import org.openqa.selenium.remote.RemoteWebDriver;
import org.testcontainers.shaded.org.awaitility.Awaitility;

import static org.apache.streampark.e2e.core.Constants.TEST_FLINK_SQL;
import static org.assertj.core.api.Assertions.assertThat;

@StreamPark(composeFiles = "docker/flink-1.17-on-yarn/docker-compose.yaml")
Expand All @@ -54,34 +55,49 @@ public class FlinkSQL117OnYarnTest {

private static final String applicationName = "flink-117-e2e-test";

private static final String flinkDescription = "description test";

private static final String flinkClusterName = "flink_1.17.2_cluster_e2e";

@BeforeAll
public static void setup() {
FlinkHomePage flinkHomePage = new LoginPage(browser)
.login(userName, password, teamName)
.goToNav(ApacheFlinkPage.class)
.goToTab(FlinkHomePage.class);

flinkHomePage.createFlinkHome(flinkName, flinkHome, "");
flinkHomePage.createFlinkHome(flinkName, flinkHome, flinkDescription);

FlinkClustersPage flinkClustersPage = flinkHomePage.goToNav(ApacheFlinkPage.class)
.goToTab(FlinkClustersPage.class);

flinkHomePage.goToNav(ApacheFlinkPage.class).goToTab(ApplicationsPage.class);
flinkClustersPage.createFlinkCluster()
.<YarnSessionForm>addCluster(ClusterDetailForm.ExecutionMode.YARN_SESSION)
.resolveOrder(YarnSessionForm.ResolveOrder.PARENT_FIRST)
.clusterName(flinkClusterName)
.flinkVersion(flinkName)
.submit();

flinkClustersPage.startFlinkCluster(flinkClusterName);

flinkClustersPage.goToNav(ApacheFlinkPage.class)
.goToTab(ApplicationsPage.class);
}

@Test
@Order(10)
void testCreateFlinkApplicationOnYarnApplicationMode() {
final ApplicationsPage applicationsPage = new ApplicationsPage(browser);

ApplicationsDynamicParams applicationsDynamicParams = new ApplicationsDynamicParams();

applicationsDynamicParams.flinkSQL(TEST_FLINK_SQL);
applicationsPage
.createApplication()
.addApplication(
ApplicationForm.DevelopmentMode.FLINK_SQL,
ApplicationForm.ExecutionMode.YARN_APPLICATION,
applicationName,
flinkName,
applicationsDynamicParams);
applicationName)
.flinkVersion(flinkName)
.flinkSql(Constants.TEST_FLINK_SQL)
.submit();

Awaitility.await()
.untilAsserted(
Expand Down Expand Up @@ -176,17 +192,15 @@ void testDeleteFlinkApplicationOnYarnApplicationMode() {
void testCreateFlinkApplicationOnYarnPerJobMode() {
final ApplicationsPage applicationsPage = new ApplicationsPage(browser);

ApplicationsDynamicParams applicationsDynamicParams = new ApplicationsDynamicParams();

applicationsDynamicParams.flinkSQL(TEST_FLINK_SQL);
applicationsPage
.createApplication()
.addApplication(
ApplicationForm.DevelopmentMode.FLINK_SQL,
ApplicationForm.ExecutionMode.YARN_PER_JOB,
applicationName,
flinkName,
applicationsDynamicParams);
applicationName)
.flinkVersion(flinkName)
.flinkSql(Constants.TEST_FLINK_SQL)
.submit();

Awaitility.await()
.untilAsserted(
Expand Down Expand Up @@ -249,4 +263,108 @@ void testDeleteFlinkApplicationOnYarnPerJobMode() {
.noneMatch(it -> it.getText().contains(applicationName));
});
}

@Test
@Order(90)
void testCreateFlinkApplicationOnYarnSessionMode() {
final ApplicationsPage applicationsPage = new ApplicationsPage(browser);

applicationsPage
.createApplication()
.addApplication(
ApplicationForm.DevelopmentMode.FLINK_SQL,
ApplicationForm.ExecutionMode.YARN_SESSION,
applicationName)
.flinkVersion(flinkName)
.flinkSql(Constants.TEST_FLINK_SQL)
.flinkCluster(flinkClusterName)
.submit();

Awaitility.await()
.untilAsserted(
() -> assertThat(applicationsPage.applicationsList())
.as("Applications list should contain newly-created application")
.extracting(WebElement::getText)
.anyMatch(it -> it.contains(applicationName)));
}

@Test
@Order(100)
void testReleaseFlinkApplicationOnYarnSessionMode() {
final ApplicationsPage applicationsPage = new ApplicationsPage(browser);

applicationsPage.releaseApplication(applicationName);

Awaitility.await()
.untilAsserted(
() -> assertThat(applicationsPage.applicationsList())
.as("Applications list should contain released application")
.extracting(WebElement::getText)
.anyMatch(it -> it.contains("SUCCESS")));
}

@Test
@Order(110)
void testStartFlinkApplicationOnYarnSessionMode() {
final ApplicationsPage applicationsPage = new ApplicationsPage(browser);

applicationsPage.startApplication(applicationName);

Awaitility.await()
.untilAsserted(
() -> assertThat(applicationsPage.applicationsList())
.as("Applications list should contain started application")
.extracting(WebElement::getText)
.anyMatch(it -> it.contains("RUNNING")));

Awaitility.await()
.untilAsserted(
() -> assertThat(applicationsPage.applicationsList())
.as("Applications list should contain finished application")
.extracting(WebElement::getText)
.anyMatch(it -> it.contains("FINISHED")));
}

@Test
@Order(120)
@SneakyThrows
void testRestartAndCancelFlinkApplicationOnYarnSessionMode() {
Thread.sleep(Constants.DEFAULT_SLEEP_MILLISECONDS);
final ApplicationsPage applicationsPage = new ApplicationsPage(browser);

applicationsPage.startApplication(applicationName);

Awaitility.await()
.untilAsserted(
() -> assertThat(applicationsPage.applicationsList())
.as("Applications list should contain restarted application")
.extracting(WebElement::getText)
.anyMatch(it -> it.contains("RUNNING")));

applicationsPage.cancelApplication(applicationName);

Awaitility.await()
.untilAsserted(
() -> assertThat(applicationsPage.applicationsList())
.as("Applications list should contain canceled application")
.extracting(WebElement::getText)
.anyMatch(it -> it.contains("CANCELED")));
}

@Test
@Order(130)
void testDeleteFlinkApplicationOnYarnSessionMode() {
final ApplicationsPage applicationsPage = new ApplicationsPage(browser);

applicationsPage.deleteApplication(applicationName);

Awaitility.await()
.untilAsserted(
() -> {
browser.navigate().refresh();

assertThat(applicationsPage.applicationsList())
.noneMatch(it -> it.getText().contains(applicationName));
});
}
}
Loading

0 comments on commit 942116b

Please sign in to comment.