From 942116bd25e2e226bf3b71edb4c1ce8147ec1ba3 Mon Sep 17 00:00:00 2001 From: xiangzihao <460888207@qq.com> Date: Fri, 19 Jul 2024 12:29:46 +0800 Subject: [PATCH] add feature 3861 (#3890) --- .../views/flink/app/hooks/useFlinkRender.tsx | 1 + .../e2e/cases/FlinkSQL116OnYarnTest.java | 123 ++++++++++++-- .../e2e/cases/FlinkSQL117OnYarnTest.java | 150 ++++++++++++++++-- .../e2e/cases/FlinkSQL118OnYarnTest.java | 150 ++++++++++++++++-- .../e2e/pages/common/Constants.java | 32 ++++ .../flink/applications/ApplicationForm.java | 68 ++++++-- .../flink/applications/FlinkSQLEditor.java | 75 ++++++++- .../FlinkSQLYarnApplicationForm.java | 69 -------- .../entity/ApplicationsDynamicParams.java | 26 --- .../flink/clusters/FlinkClustersPage.java | 4 +- .../flink-1.16-on-yarn/docker-compose.config | 1 + .../flink-1.17-on-yarn/docker-compose.config | 1 + .../flink-1.18-on-yarn/docker-compose.config | 1 + .../apache/streampark/e2e/core/Constants.java | 27 ---- 14 files changed, 541 insertions(+), 187 deletions(-) delete mode 100644 streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/pages/flink/applications/FlinkSQLYarnApplicationForm.java delete mode 100644 streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/pages/flink/applications/entity/ApplicationsDynamicParams.java diff --git a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useFlinkRender.tsx b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useFlinkRender.tsx index 62975b584c..79105228be 100644 --- a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useFlinkRender.tsx +++ b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useFlinkRender.tsx @@ -274,6 +274,7 @@ export const renderFlinkCluster = (clusters, { model, field }: RenderCallbackPar placeholder={t('flink.app.flinkCluster')} value={model[field]} onChange={(value: any) => (model[field] = value)} + codeField={field} > {clusters.map((item) => { return ( diff --git a/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/FlinkSQL116OnYarnTest.java b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/FlinkSQL116OnYarnTest.java index 92d4c5daea..9fd649ff05 100644 --- a/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/FlinkSQL116OnYarnTest.java +++ b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/FlinkSQL116OnYarnTest.java @@ -24,7 +24,9 @@ import org.apache.streampark.e2e.pages.flink.FlinkHomePage; import org.apache.streampark.e2e.pages.flink.applications.ApplicationForm; import org.apache.streampark.e2e.pages.flink.applications.ApplicationsPage; -import org.apache.streampark.e2e.pages.flink.applications.entity.ApplicationsDynamicParams; +import org.apache.streampark.e2e.pages.flink.clusters.ClusterDetailForm; +import org.apache.streampark.e2e.pages.flink.clusters.FlinkClustersPage; +import org.apache.streampark.e2e.pages.flink.clusters.YarnSessionForm; import lombok.SneakyThrows; import org.junit.jupiter.api.BeforeAll; @@ -34,7 +36,7 @@ import org.openqa.selenium.remote.RemoteWebDriver; import org.testcontainers.shaded.org.awaitility.Awaitility; -import static org.apache.streampark.e2e.core.Constants.TEST_FLINK_SQL; +import static org.apache.streampark.e2e.pages.common.Constants.TEST_FLINK_SQL; import static org.assertj.core.api.Assertions.assertThat; @StreamPark(composeFiles = "docker/flink-1.16-on-yarn/docker-compose.yaml") @@ -54,6 +56,10 @@ public class FlinkSQL116OnYarnTest { private static final String applicationName = "flink-116-e2e-test"; + private static final String flinkDescription = "description test"; + + private static final String flinkClusterName = "flink_1.16.3_cluster_e2e"; + @BeforeAll public static void setup() { FlinkHomePage flinkHomePage = new LoginPage(browser) @@ -61,9 +67,22 @@ public static void setup() { .goToNav(ApacheFlinkPage.class) .goToTab(FlinkHomePage.class); - flinkHomePage.createFlinkHome(flinkName, flinkHome, ""); + flinkHomePage.createFlinkHome(flinkName, flinkHome, flinkDescription); + + FlinkClustersPage flinkClustersPage = flinkHomePage.goToNav(ApacheFlinkPage.class) + .goToTab(FlinkClustersPage.class); + + flinkClustersPage.createFlinkCluster() + .addCluster(ClusterDetailForm.ExecutionMode.YARN_SESSION) + .resolveOrder(YarnSessionForm.ResolveOrder.PARENT_FIRST) + .clusterName(flinkClusterName) + .flinkVersion(flinkName) + .submit(); + + flinkClustersPage.startFlinkCluster(flinkClusterName); - flinkHomePage.goToNav(ApacheFlinkPage.class).goToTab(ApplicationsPage.class); + flinkClustersPage.goToNav(ApacheFlinkPage.class) + .goToTab(ApplicationsPage.class); } @Test @@ -71,17 +90,15 @@ public static void setup() { void testCreateFlinkApplicationOnYarnApplicationMode() { final ApplicationsPage applicationsPage = new ApplicationsPage(browser); - ApplicationsDynamicParams applicationsDynamicParams = new ApplicationsDynamicParams(); - - applicationsDynamicParams.flinkSQL(TEST_FLINK_SQL); applicationsPage .createApplication() .addApplication( ApplicationForm.DevelopmentMode.FLINK_SQL, ApplicationForm.ExecutionMode.YARN_APPLICATION, - applicationName, - flinkName, - applicationsDynamicParams); + applicationName) + .flinkVersion(flinkName) + .flinkSql(TEST_FLINK_SQL) + .submit(); Awaitility.await() .untilAsserted( @@ -176,16 +193,15 @@ void testDeleteFlinkApplicationOnYarnApplicationMode() { void testCreateFlinkApplicationOnYarnPerJobMode() { final ApplicationsPage applicationsPage = new ApplicationsPage(browser); - ApplicationsDynamicParams applicationsDynamicParams = new ApplicationsDynamicParams(); - applicationsDynamicParams.flinkSQL(TEST_FLINK_SQL); applicationsPage .createApplication() .addApplication( ApplicationForm.DevelopmentMode.FLINK_SQL, ApplicationForm.ExecutionMode.YARN_PER_JOB, - applicationName, - flinkName, - applicationsDynamicParams); + applicationName) + .flinkVersion(flinkName) + .flinkSql(TEST_FLINK_SQL) + .submit(); Awaitility.await() .untilAsserted( @@ -248,4 +264,81 @@ void testDeleteFlinkApplicationOnYarnPerJobMode() { .noneMatch(it -> it.getText().contains(applicationName)); }); } + + @Test + @Order(90) + void testCreateFlinkApplicationOnYarnSessionMode() { + final ApplicationsPage applicationsPage = new ApplicationsPage(browser); + + applicationsPage.createApplication() + .addApplication( + ApplicationForm.DevelopmentMode.FLINK_SQL, + ApplicationForm.ExecutionMode.YARN_SESSION, + applicationName) + .flinkVersion(flinkName) + .flinkSql(TEST_FLINK_SQL) + .flinkCluster(flinkClusterName) + .submit(); + + Awaitility.await() + .untilAsserted( + () -> assertThat(applicationsPage.applicationsList()) + .as("Applications list should contain newly-created application") + .extracting(WebElement::getText) + .anyMatch(it -> it.contains(applicationName))); + } + + @Test + @Order(100) + void testReleaseFlinkApplicationOnYarnSessionMode() { + final ApplicationsPage applicationsPage = new ApplicationsPage(browser); + + applicationsPage.releaseApplication(applicationName); + + Awaitility.await() + .untilAsserted( + () -> assertThat(applicationsPage.applicationsList()) + .as("Applications list should contain released application") + .extracting(WebElement::getText) + .anyMatch(it -> it.contains("SUCCESS"))); + } + + @Test + @Order(110) + void testStartFlinkApplicationOnYarnSessionMode() { + final ApplicationsPage applicationsPage = new ApplicationsPage(browser); + + applicationsPage.startApplication(applicationName); + + Awaitility.await() + .untilAsserted( + () -> assertThat(applicationsPage.applicationsList()) + .as("Applications list should contain started application") + .extracting(WebElement::getText) + .anyMatch(it -> it.contains("RUNNING"))); + + Awaitility.await() + .untilAsserted( + () -> assertThat(applicationsPage.applicationsList()) + .as("Applications list should contain finished application") + .extracting(WebElement::getText) + .anyMatch(it -> it.contains("FINISHED"))); + } + + @Test + @Order(120) + void testDeleteFlinkApplicationOnYarnSessionMode() { + final ApplicationsPage applicationsPage = new ApplicationsPage(browser); + + applicationsPage.deleteApplication(applicationName); + + Awaitility.await() + .untilAsserted( + () -> { + browser.navigate().refresh(); + + assertThat(applicationsPage.applicationsList()) + .noneMatch(it -> it.getText().contains(applicationName)); + }); + } } diff --git a/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/FlinkSQL117OnYarnTest.java b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/FlinkSQL117OnYarnTest.java index b98fae390e..f636a94c8f 100644 --- a/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/FlinkSQL117OnYarnTest.java +++ b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/FlinkSQL117OnYarnTest.java @@ -24,7 +24,9 @@ import org.apache.streampark.e2e.pages.flink.FlinkHomePage; import org.apache.streampark.e2e.pages.flink.applications.ApplicationForm; import org.apache.streampark.e2e.pages.flink.applications.ApplicationsPage; -import org.apache.streampark.e2e.pages.flink.applications.entity.ApplicationsDynamicParams; +import org.apache.streampark.e2e.pages.flink.clusters.ClusterDetailForm; +import org.apache.streampark.e2e.pages.flink.clusters.FlinkClustersPage; +import org.apache.streampark.e2e.pages.flink.clusters.YarnSessionForm; import lombok.SneakyThrows; import org.junit.jupiter.api.BeforeAll; @@ -34,7 +36,6 @@ import org.openqa.selenium.remote.RemoteWebDriver; import org.testcontainers.shaded.org.awaitility.Awaitility; -import static org.apache.streampark.e2e.core.Constants.TEST_FLINK_SQL; import static org.assertj.core.api.Assertions.assertThat; @StreamPark(composeFiles = "docker/flink-1.17-on-yarn/docker-compose.yaml") @@ -54,6 +55,10 @@ public class FlinkSQL117OnYarnTest { private static final String applicationName = "flink-117-e2e-test"; + private static final String flinkDescription = "description test"; + + private static final String flinkClusterName = "flink_1.17.2_cluster_e2e"; + @BeforeAll public static void setup() { FlinkHomePage flinkHomePage = new LoginPage(browser) @@ -61,9 +66,22 @@ public static void setup() { .goToNav(ApacheFlinkPage.class) .goToTab(FlinkHomePage.class); - flinkHomePage.createFlinkHome(flinkName, flinkHome, ""); + flinkHomePage.createFlinkHome(flinkName, flinkHome, flinkDescription); + + FlinkClustersPage flinkClustersPage = flinkHomePage.goToNav(ApacheFlinkPage.class) + .goToTab(FlinkClustersPage.class); - flinkHomePage.goToNav(ApacheFlinkPage.class).goToTab(ApplicationsPage.class); + flinkClustersPage.createFlinkCluster() + .addCluster(ClusterDetailForm.ExecutionMode.YARN_SESSION) + .resolveOrder(YarnSessionForm.ResolveOrder.PARENT_FIRST) + .clusterName(flinkClusterName) + .flinkVersion(flinkName) + .submit(); + + flinkClustersPage.startFlinkCluster(flinkClusterName); + + flinkClustersPage.goToNav(ApacheFlinkPage.class) + .goToTab(ApplicationsPage.class); } @Test @@ -71,17 +89,15 @@ public static void setup() { void testCreateFlinkApplicationOnYarnApplicationMode() { final ApplicationsPage applicationsPage = new ApplicationsPage(browser); - ApplicationsDynamicParams applicationsDynamicParams = new ApplicationsDynamicParams(); - - applicationsDynamicParams.flinkSQL(TEST_FLINK_SQL); applicationsPage .createApplication() .addApplication( ApplicationForm.DevelopmentMode.FLINK_SQL, ApplicationForm.ExecutionMode.YARN_APPLICATION, - applicationName, - flinkName, - applicationsDynamicParams); + applicationName) + .flinkVersion(flinkName) + .flinkSql(Constants.TEST_FLINK_SQL) + .submit(); Awaitility.await() .untilAsserted( @@ -176,17 +192,15 @@ void testDeleteFlinkApplicationOnYarnApplicationMode() { void testCreateFlinkApplicationOnYarnPerJobMode() { final ApplicationsPage applicationsPage = new ApplicationsPage(browser); - ApplicationsDynamicParams applicationsDynamicParams = new ApplicationsDynamicParams(); - - applicationsDynamicParams.flinkSQL(TEST_FLINK_SQL); applicationsPage .createApplication() .addApplication( ApplicationForm.DevelopmentMode.FLINK_SQL, ApplicationForm.ExecutionMode.YARN_PER_JOB, - applicationName, - flinkName, - applicationsDynamicParams); + applicationName) + .flinkVersion(flinkName) + .flinkSql(Constants.TEST_FLINK_SQL) + .submit(); Awaitility.await() .untilAsserted( @@ -249,4 +263,108 @@ void testDeleteFlinkApplicationOnYarnPerJobMode() { .noneMatch(it -> it.getText().contains(applicationName)); }); } + + @Test + @Order(90) + void testCreateFlinkApplicationOnYarnSessionMode() { + final ApplicationsPage applicationsPage = new ApplicationsPage(browser); + + applicationsPage + .createApplication() + .addApplication( + ApplicationForm.DevelopmentMode.FLINK_SQL, + ApplicationForm.ExecutionMode.YARN_SESSION, + applicationName) + .flinkVersion(flinkName) + .flinkSql(Constants.TEST_FLINK_SQL) + .flinkCluster(flinkClusterName) + .submit(); + + Awaitility.await() + .untilAsserted( + () -> assertThat(applicationsPage.applicationsList()) + .as("Applications list should contain newly-created application") + .extracting(WebElement::getText) + .anyMatch(it -> it.contains(applicationName))); + } + + @Test + @Order(100) + void testReleaseFlinkApplicationOnYarnSessionMode() { + final ApplicationsPage applicationsPage = new ApplicationsPage(browser); + + applicationsPage.releaseApplication(applicationName); + + Awaitility.await() + .untilAsserted( + () -> assertThat(applicationsPage.applicationsList()) + .as("Applications list should contain released application") + .extracting(WebElement::getText) + .anyMatch(it -> it.contains("SUCCESS"))); + } + + @Test + @Order(110) + void testStartFlinkApplicationOnYarnSessionMode() { + final ApplicationsPage applicationsPage = new ApplicationsPage(browser); + + applicationsPage.startApplication(applicationName); + + Awaitility.await() + .untilAsserted( + () -> assertThat(applicationsPage.applicationsList()) + .as("Applications list should contain started application") + .extracting(WebElement::getText) + .anyMatch(it -> it.contains("RUNNING"))); + + Awaitility.await() + .untilAsserted( + () -> assertThat(applicationsPage.applicationsList()) + .as("Applications list should contain finished application") + .extracting(WebElement::getText) + .anyMatch(it -> it.contains("FINISHED"))); + } + + @Test + @Order(120) + @SneakyThrows + void testRestartAndCancelFlinkApplicationOnYarnSessionMode() { + Thread.sleep(Constants.DEFAULT_SLEEP_MILLISECONDS); + final ApplicationsPage applicationsPage = new ApplicationsPage(browser); + + applicationsPage.startApplication(applicationName); + + Awaitility.await() + .untilAsserted( + () -> assertThat(applicationsPage.applicationsList()) + .as("Applications list should contain restarted application") + .extracting(WebElement::getText) + .anyMatch(it -> it.contains("RUNNING"))); + + applicationsPage.cancelApplication(applicationName); + + Awaitility.await() + .untilAsserted( + () -> assertThat(applicationsPage.applicationsList()) + .as("Applications list should contain canceled application") + .extracting(WebElement::getText) + .anyMatch(it -> it.contains("CANCELED"))); + } + + @Test + @Order(130) + void testDeleteFlinkApplicationOnYarnSessionMode() { + final ApplicationsPage applicationsPage = new ApplicationsPage(browser); + + applicationsPage.deleteApplication(applicationName); + + Awaitility.await() + .untilAsserted( + () -> { + browser.navigate().refresh(); + + assertThat(applicationsPage.applicationsList()) + .noneMatch(it -> it.getText().contains(applicationName)); + }); + } } diff --git a/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/FlinkSQL118OnYarnTest.java b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/FlinkSQL118OnYarnTest.java index a91da141aa..89367e8c5e 100644 --- a/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/FlinkSQL118OnYarnTest.java +++ b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/FlinkSQL118OnYarnTest.java @@ -24,7 +24,9 @@ import org.apache.streampark.e2e.pages.flink.FlinkHomePage; import org.apache.streampark.e2e.pages.flink.applications.ApplicationForm; import org.apache.streampark.e2e.pages.flink.applications.ApplicationsPage; -import org.apache.streampark.e2e.pages.flink.applications.entity.ApplicationsDynamicParams; +import org.apache.streampark.e2e.pages.flink.clusters.ClusterDetailForm; +import org.apache.streampark.e2e.pages.flink.clusters.FlinkClustersPage; +import org.apache.streampark.e2e.pages.flink.clusters.YarnSessionForm; import lombok.SneakyThrows; import org.junit.jupiter.api.BeforeAll; @@ -34,7 +36,6 @@ import org.openqa.selenium.remote.RemoteWebDriver; import org.testcontainers.shaded.org.awaitility.Awaitility; -import static org.apache.streampark.e2e.core.Constants.TEST_FLINK_SQL; import static org.assertj.core.api.Assertions.assertThat; @StreamPark(composeFiles = "docker/flink-1.18-on-yarn/docker-compose.yaml") @@ -54,6 +55,10 @@ public class FlinkSQL118OnYarnTest { private static final String applicationName = "flink-118-e2e-test"; + private static final String flinkDescription = "description test"; + + private static final String flinkClusterName = "flink_1.18.1_cluster_e2e"; + @BeforeAll public static void setup() { FlinkHomePage flinkHomePage = new LoginPage(browser) @@ -61,9 +66,22 @@ public static void setup() { .goToNav(ApacheFlinkPage.class) .goToTab(FlinkHomePage.class); - flinkHomePage.createFlinkHome(flinkName, flinkHome, ""); + flinkHomePage.createFlinkHome(flinkName, flinkHome, flinkDescription); + + FlinkClustersPage flinkClustersPage = flinkHomePage.goToNav(ApacheFlinkPage.class) + .goToTab(FlinkClustersPage.class); - flinkHomePage.goToNav(ApacheFlinkPage.class).goToTab(ApplicationsPage.class); + flinkClustersPage.createFlinkCluster() + .addCluster(ClusterDetailForm.ExecutionMode.YARN_SESSION) + .resolveOrder(YarnSessionForm.ResolveOrder.PARENT_FIRST) + .clusterName(flinkClusterName) + .flinkVersion(flinkName) + .submit(); + + flinkClustersPage.startFlinkCluster(flinkClusterName); + + flinkClustersPage.goToNav(ApacheFlinkPage.class) + .goToTab(ApplicationsPage.class); } @Test @@ -71,17 +89,15 @@ public static void setup() { void testCreateFlinkApplicationOnYarnApplicationMode() { final ApplicationsPage applicationsPage = new ApplicationsPage(browser); - ApplicationsDynamicParams applicationsDynamicParams = new ApplicationsDynamicParams(); - - applicationsDynamicParams.flinkSQL(TEST_FLINK_SQL); applicationsPage .createApplication() .addApplication( ApplicationForm.DevelopmentMode.FLINK_SQL, ApplicationForm.ExecutionMode.YARN_APPLICATION, - applicationName, - flinkName, - applicationsDynamicParams); + applicationName) + .flinkVersion(flinkName) + .flinkSql(Constants.TEST_FLINK_SQL) + .submit(); Awaitility.await() .untilAsserted( @@ -176,17 +192,15 @@ void testDeleteFlinkApplicationOnYarnApplicationMode() { void testCreateFlinkApplicationOnYarnPerJobMode() { final ApplicationsPage applicationsPage = new ApplicationsPage(browser); - ApplicationsDynamicParams applicationsDynamicParams = new ApplicationsDynamicParams(); - - applicationsDynamicParams.flinkSQL(TEST_FLINK_SQL); applicationsPage .createApplication() .addApplication( ApplicationForm.DevelopmentMode.FLINK_SQL, ApplicationForm.ExecutionMode.YARN_PER_JOB, - applicationName, - flinkName, - applicationsDynamicParams); + applicationName) + .flinkVersion(flinkName) + .flinkSql(Constants.TEST_FLINK_SQL) + .submit(); Awaitility.await() .untilAsserted( @@ -275,4 +289,108 @@ void testDeleteFlinkApplicationOnYarnPerJobMode() { .noneMatch(it -> it.getText().contains(applicationName)); }); } + + @Test + @Order(90) + void testCreateFlinkApplicationOnYarnSessionMode() { + final ApplicationsPage applicationsPage = new ApplicationsPage(browser); + + applicationsPage + .createApplication() + .addApplication( + ApplicationForm.DevelopmentMode.FLINK_SQL, + ApplicationForm.ExecutionMode.YARN_SESSION, + applicationName) + .flinkVersion(flinkName) + .flinkSql(Constants.TEST_FLINK_SQL) + .flinkCluster(flinkClusterName) + .submit(); + + Awaitility.await() + .untilAsserted( + () -> assertThat(applicationsPage.applicationsList()) + .as("Applications list should contain newly-created application") + .extracting(WebElement::getText) + .anyMatch(it -> it.contains(applicationName))); + } + + @Test + @Order(100) + void testReleaseFlinkApplicationOnYarnSessionMode() { + final ApplicationsPage applicationsPage = new ApplicationsPage(browser); + + applicationsPage.releaseApplication(applicationName); + + Awaitility.await() + .untilAsserted( + () -> assertThat(applicationsPage.applicationsList()) + .as("Applications list should contain released application") + .extracting(WebElement::getText) + .anyMatch(it -> it.contains("SUCCESS"))); + } + + @Test + @Order(110) + void testStartFlinkApplicationOnYarnSessionMode() { + final ApplicationsPage applicationsPage = new ApplicationsPage(browser); + + applicationsPage.startApplication(applicationName); + + Awaitility.await() + .untilAsserted( + () -> assertThat(applicationsPage.applicationsList()) + .as("Applications list should contain started application") + .extracting(WebElement::getText) + .anyMatch(it -> it.contains("RUNNING"))); + + Awaitility.await() + .untilAsserted( + () -> assertThat(applicationsPage.applicationsList()) + .as("Applications list should contain finished application") + .extracting(WebElement::getText) + .anyMatch(it -> it.contains("FINISHED"))); + } + + @Test + @Order(120) + @SneakyThrows + void testRestartAndCancelFlinkApplicationOnYarnSessionMode() { + Thread.sleep(Constants.DEFAULT_SLEEP_MILLISECONDS); + final ApplicationsPage applicationsPage = new ApplicationsPage(browser); + + applicationsPage.startApplication(applicationName); + + Awaitility.await() + .untilAsserted( + () -> assertThat(applicationsPage.applicationsList()) + .as("Applications list should contain restarted application") + .extracting(WebElement::getText) + .anyMatch(it -> it.contains("RUNNING"))); + + applicationsPage.cancelApplication(applicationName); + + Awaitility.await() + .untilAsserted( + () -> assertThat(applicationsPage.applicationsList()) + .as("Applications list should contain canceled application") + .extracting(WebElement::getText) + .anyMatch(it -> it.contains("CANCELED"))); + } + + @Test + @Order(130) + void testDeleteFlinkApplicationOnYarnSessionMode() { + final ApplicationsPage applicationsPage = new ApplicationsPage(browser); + + applicationsPage.deleteApplication(applicationName); + + Awaitility.await() + .untilAsserted( + () -> { + browser.navigate().refresh(); + + assertThat(applicationsPage.applicationsList()) + .noneMatch(it -> it.getText().contains(applicationName)); + }); + } } diff --git a/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/pages/common/Constants.java b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/pages/common/Constants.java index 7319fc2831..956556cee9 100644 --- a/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/pages/common/Constants.java +++ b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/pages/common/Constants.java @@ -26,7 +26,39 @@ public class Constants { public static final Integer DEFAULT_SLEEP_MILLISECONDS = 2000; + public static final Integer DEFAULT_FLINK_SQL_EDITOR_SLEEP_MILLISECONDS = 1000; + public static final Integer DEFAULT_PROJECT_BUILD_TIMEOUT_MINUTES = 5; public static final Duration DEFAULT_WEBDRIVER_WAIT_DURATION = Duration.ofSeconds(10); + + public static final String LINE_SEPARATOR = "\n"; + + /** datagen flink sql for test */ + public static final String TEST_FLINK_SQL = "CREATE TABLE datagen (\n" + + "f_sequence INT,\n" + + "f_random INT,\n" + + "f_random_str STRING,\n" + + "ts AS localtimestamp,\n" + + "WATERMARK FOR ts AS ts\n" + + ") WITH (\n" + + "'connector' = 'datagen',\n" + + "'rows-per-second'='5',\n" + + "'fields.f_sequence.kind'='sequence',\n" + + "'fields.f_sequence.start'='1',\n" + + "'fields.f_sequence.end'='100',\n" + + "'fields.f_random.min'='1',\n" + + "'fields.f_random.max'='100',\n" + + "'fields.f_random_str.length'='10'\n" + + ");\n" + + "\n" + + "CREATE TABLE print_table (\n" + + "f_sequence INT,\n" + + "f_random INT,\n" + + "f_random_str STRING\n" + + ") WITH (\n" + + "'connector' = 'print'\n" + + ");\n" + + "\n" + + "INSERT INTO print_table select f_sequence,f_random,f_random_str from datagen;"; } diff --git a/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/pages/flink/applications/ApplicationForm.java b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/pages/flink/applications/ApplicationForm.java index 1f977301fc..eb93e3cb71 100644 --- a/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/pages/flink/applications/ApplicationForm.java +++ b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/pages/flink/applications/ApplicationForm.java @@ -18,12 +18,12 @@ package org.apache.streampark.e2e.pages.flink.applications; import org.apache.streampark.e2e.pages.common.Constants; -import org.apache.streampark.e2e.pages.flink.applications.entity.ApplicationsDynamicParams; import lombok.Getter; import lombok.SneakyThrows; import org.openqa.selenium.WebDriver; import org.openqa.selenium.WebElement; +import org.openqa.selenium.interactions.Actions; import org.openqa.selenium.support.FindBy; import org.openqa.selenium.support.FindBys; import org.openqa.selenium.support.PageFactory; @@ -58,6 +58,21 @@ public final class ApplicationForm { @FindBy(id = "form_item_jobName") private WebElement inputApplicationName; + @FindBy(xpath = "//div[contains(@codefield, 'yarnSessionClusterId')]//div[contains(@class, 'ant-select-selector')]") + private WebElement buttonFlinkClusterDropdown; + + @FindBy(className = "ant-select-item-option-content") + private List selectFlinkCluster; + + @FindBy(xpath = "//div[contains(@codefield, 'versionId')]//div[contains(@class, 'ant-select-selector')]") + private WebElement buttonFlinkVersionDropdown; + + @FindBys({ + @FindBy(css = "[codefield=versionId]"), + @FindBy(className = "ant-select-item-option-content") + }) + private List selectFlinkVersion; + @FindBy(xpath = "//button[contains(@class, 'ant-btn')]//span[contains(text(), 'Submit')]") private WebElement buttonSubmit; @@ -71,12 +86,9 @@ public ApplicationForm(WebDriver driver) { } @SneakyThrows - public ApplicationForm addApplication( - DevelopmentMode developmentMode, + public ApplicationForm addApplication(DevelopmentMode developmentMode, ExecutionMode executionMode, - String applicationName, - String flinkVersion, - ApplicationsDynamicParams applicationsDynamicParams) { + String applicationName) { Thread.sleep(Constants.DEFAULT_SLEEP_MILLISECONDS); new WebDriverWait(driver, Constants.DEFAULT_WEBDRIVER_WAIT_DURATION) .until(ExpectedConditions.elementToBeClickable(buttonDevelopmentModeDropdown)); @@ -125,8 +137,7 @@ public ApplicationForm addApplication( String.format("Execution mode not found: %s", executionMode.desc()))) .click(); - new FlinkSQLYarnApplicationForm(driver) - .add(flinkVersion, applicationsDynamicParams.flinkSQL()); + break; case YARN_SESSION: selectExecutionMode.stream() @@ -170,8 +181,6 @@ public ApplicationForm addApplication( String.format("Execution mode not found: %s", executionMode.desc()))) .click(); - new FlinkSQLYarnApplicationForm(driver) - .add(flinkVersion, applicationsDynamicParams.flinkSQL()); break; default: throw new IllegalArgumentException( @@ -194,6 +203,45 @@ public ApplicationForm addApplication( } inputApplicationName.sendKeys(applicationName); + return this; + } + + public ApplicationForm flinkVersion(String flinkVersion) { + new Actions(driver).moveToElement(buttonFlinkVersionDropdown).build().perform(); + buttonFlinkVersionDropdown.click(); + new WebDriverWait(driver, Constants.DEFAULT_WEBDRIVER_WAIT_DURATION) + .until(ExpectedConditions.visibilityOfAllElements(selectFlinkVersion)); + selectFlinkVersion.stream() + .filter(e -> e.getText().equalsIgnoreCase(flinkVersion)) + .findFirst() + .orElseThrow(() -> new IllegalArgumentException("Flink version not found")) + .click(); + + return this; + } + + public ApplicationForm flinkSql(String flinkSql) { + new FlinkSQLEditor(driver).content(flinkSql); + return this; + } + + @SneakyThrows + public ApplicationForm flinkCluster(String flinkClusterName) { + new Actions(driver).moveToElement(buttonFlinkClusterDropdown).build().perform(); + buttonFlinkClusterDropdown.click(); + Thread.sleep(Constants.DEFAULT_SLEEP_MILLISECONDS); + selectFlinkCluster.stream() + .filter(e -> e.getText().contains(flinkClusterName)) + .findFirst() + .orElseThrow( + () -> new IllegalArgumentException( + String.format("Flink cluster not found: %s", flinkClusterName))) + .click(); + + return this; + } + + public ApplicationForm submit() { buttonSubmit.click(); return this; diff --git a/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/pages/flink/applications/FlinkSQLEditor.java b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/pages/flink/applications/FlinkSQLEditor.java index c409011871..75d5fdd103 100644 --- a/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/pages/flink/applications/FlinkSQLEditor.java +++ b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/pages/flink/applications/FlinkSQLEditor.java @@ -20,6 +20,10 @@ import org.apache.streampark.e2e.pages.common.Constants; import lombok.Getter; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import org.junit.platform.commons.util.StringUtils; +import org.openqa.selenium.Keys; import org.openqa.selenium.WebDriver; import org.openqa.selenium.WebElement; import org.openqa.selenium.interactions.Actions; @@ -28,11 +32,14 @@ import org.openqa.selenium.support.ui.ExpectedConditions; import org.openqa.selenium.support.ui.WebDriverWait; +import java.util.List; + @Getter +@Slf4j public final class FlinkSQLEditor { - @FindBy(xpath = "//label[contains(@for, 'form_item_flinkSql')]/../..//div[contains(@class, 'monaco-editor')]//div[contains(@class, 'view-line')]") - private WebElement flinkSqlEditor; + @FindBy(xpath = "//label[contains(@for, 'form_item_flinkSql')]/../..//div[contains(@class, 'monaco-editor')]//div[contains(@class, 'view-line') and not(contains(@class, 'view-lines'))]") + private List flinkSqlEditor; private WebDriver driver; @@ -41,15 +48,69 @@ public FlinkSQLEditor(WebDriver driver) { this.driver = driver; } + @SneakyThrows public FlinkSQLEditor content(String content) { - new WebDriverWait(this.driver, Constants.DEFAULT_WEBDRIVER_WAIT_DURATION) - .until(ExpectedConditions.elementToBeClickable(flinkSqlEditor)); - - flinkSqlEditor.click(); + new WebDriverWait(driver, Constants.DEFAULT_WEBDRIVER_WAIT_DURATION) + .until(ExpectedConditions.elementToBeClickable(flinkSqlEditor.get(0))); Actions actions = new Actions(this.driver); - actions.moveToElement(flinkSqlEditor).sendKeys(content).perform(); + + List contentList = List.of(content.split(Constants.LINE_SEPARATOR)); + + for (int i = 0; i < contentList.size(); i++) { + String editorLineText; + String inputContent = contentList.get(i); + int flinkSqlEditorIndex = Math.min(i, 21); + + if (i == 0) { + actions.moveToElement(flinkSqlEditor.get(flinkSqlEditorIndex)) + .click() + .sendKeys(inputContent) + .sendKeys(Constants.LINE_SEPARATOR) + .perform(); + continue; + } else { + editorLineText = flinkSqlEditor.get(flinkSqlEditorIndex).getText(); + } + + if (StringUtils.isNotBlank(inputContent)) { + if (editorLineText.isEmpty()) { + actions.moveToElement(flinkSqlEditor.get(flinkSqlEditorIndex)) + .click() + .sendKeys(inputContent) + .sendKeys(Constants.LINE_SEPARATOR) + .perform(); + Thread.sleep(Constants.DEFAULT_FLINK_SQL_EDITOR_SLEEP_MILLISECONDS); + } else { + for (int p = 0; p < editorLineText.strip().length(); p++) { + clearLine(actions, flinkSqlEditor.get(flinkSqlEditorIndex)); + } + if (!editorLineText.isEmpty()) { + clearLine(actions, flinkSqlEditor.get(flinkSqlEditorIndex)); + } + actions.moveToElement(flinkSqlEditor.get(flinkSqlEditorIndex)) + .click() + .sendKeys(inputContent) + .sendKeys(Constants.LINE_SEPARATOR) + .perform(); + Thread.sleep(Constants.DEFAULT_FLINK_SQL_EDITOR_SLEEP_MILLISECONDS); + } + } else { + actions.moveToElement(flinkSqlEditor.get(flinkSqlEditorIndex)) + .click() + .sendKeys(Constants.LINE_SEPARATOR) + .perform(); + Thread.sleep(Constants.DEFAULT_FLINK_SQL_EDITOR_SLEEP_MILLISECONDS); + } + } return this; } + + private void clearLine(Actions actions, WebElement element) { + actions.moveToElement(element) + .click() + .sendKeys(Keys.BACK_SPACE) + .perform(); + } } diff --git a/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/pages/flink/applications/FlinkSQLYarnApplicationForm.java b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/pages/flink/applications/FlinkSQLYarnApplicationForm.java deleted file mode 100644 index d418b96883..0000000000 --- a/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/pages/flink/applications/FlinkSQLYarnApplicationForm.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.streampark.e2e.pages.flink.applications; - -import org.apache.streampark.e2e.pages.common.Constants; - -import lombok.Getter; -import lombok.SneakyThrows; -import org.openqa.selenium.WebDriver; -import org.openqa.selenium.WebElement; -import org.openqa.selenium.support.FindBy; -import org.openqa.selenium.support.FindBys; -import org.openqa.selenium.support.PageFactory; -import org.openqa.selenium.support.ui.ExpectedConditions; -import org.openqa.selenium.support.ui.WebDriverWait; - -import java.util.List; - -@Getter -public final class FlinkSQLYarnApplicationForm { - - private WebDriver driver; - - @FindBy(xpath = "//div[contains(@codefield, 'versionId')]//div[contains(@class, 'ant-select-selector')]") - private WebElement buttonFlinkVersionDropdown; - - @FindBys({ - @FindBy(css = "[codefield=versionId]"), - @FindBy(className = "ant-select-item-option-content") - }) - private List selectFlinkVersion; - - public FlinkSQLYarnApplicationForm(WebDriver driver) { - this.driver = driver; - - PageFactory.initElements(driver, this); - } - - @SneakyThrows - public FlinkSQLYarnApplicationForm add(String flinkVersion, String flinkSql) { - buttonFlinkVersionDropdown.click(); - new WebDriverWait(driver, Constants.DEFAULT_WEBDRIVER_WAIT_DURATION) - .until(ExpectedConditions.visibilityOfAllElements(selectFlinkVersion)); - selectFlinkVersion.stream() - .filter(e -> e.getText().equalsIgnoreCase(flinkVersion)) - .findFirst() - .orElseThrow(() -> new IllegalArgumentException("Flink version not found")) - .click(); - - new FlinkSQLEditor(driver).content(flinkSql); - - return this; - } -} diff --git a/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/pages/flink/applications/entity/ApplicationsDynamicParams.java b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/pages/flink/applications/entity/ApplicationsDynamicParams.java deleted file mode 100644 index 5dffe13a3d..0000000000 --- a/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/pages/flink/applications/entity/ApplicationsDynamicParams.java +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.streampark.e2e.pages.flink.applications.entity; - -import lombok.Data; - -@Data -public class ApplicationsDynamicParams { - - private String flinkSQL; -} diff --git a/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/pages/flink/clusters/FlinkClustersPage.java b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/pages/flink/clusters/FlinkClustersPage.java index 977f00a638..c5793ecb53 100644 --- a/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/pages/flink/clusters/FlinkClustersPage.java +++ b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/pages/flink/clusters/FlinkClustersPage.java @@ -22,6 +22,7 @@ import org.apache.streampark.e2e.pages.flink.ApacheFlinkPage; import lombok.Getter; +import lombok.SneakyThrows; import org.openqa.selenium.By; import org.openqa.selenium.WebElement; import org.openqa.selenium.remote.RemoteWebDriver; @@ -83,9 +84,10 @@ public ClusterDetailForm editFlinkCluster(String flinkClusterName) { return new ClusterDetailForm(driver); } + @SneakyThrows public FlinkClustersPage startFlinkCluster(String flinkClusterName) { waitForPageLoading(); - + Thread.sleep(Constants.DEFAULT_SLEEP_MILLISECONDS); flinkClusterList().stream() .filter(it -> it.getText().contains(flinkClusterName)) .flatMap( diff --git a/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.16-on-yarn/docker-compose.config b/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.16-on-yarn/docker-compose.config index 04a1ae2392..441fa11187 100644 --- a/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.16-on-yarn/docker-compose.config +++ b/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.16-on-yarn/docker-compose.config @@ -35,6 +35,7 @@ CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.queues=default CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.capacity=100 CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.user-limit-factor=1 CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.maximum-capacity=100 +CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.maximum-am-resource-percent=0.9 CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.state=RUNNING CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.acl_submit_applications=* CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.acl_administer_queue=* diff --git a/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.17-on-yarn/docker-compose.config b/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.17-on-yarn/docker-compose.config index 04a1ae2392..441fa11187 100644 --- a/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.17-on-yarn/docker-compose.config +++ b/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.17-on-yarn/docker-compose.config @@ -35,6 +35,7 @@ CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.queues=default CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.capacity=100 CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.user-limit-factor=1 CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.maximum-capacity=100 +CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.maximum-am-resource-percent=0.9 CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.state=RUNNING CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.acl_submit_applications=* CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.acl_administer_queue=* diff --git a/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.18-on-yarn/docker-compose.config b/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.18-on-yarn/docker-compose.config index 04a1ae2392..441fa11187 100644 --- a/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.18-on-yarn/docker-compose.config +++ b/streampark-e2e/streampark-e2e-case/src/test/resources/docker/flink-1.18-on-yarn/docker-compose.config @@ -35,6 +35,7 @@ CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.queues=default CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.capacity=100 CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.user-limit-factor=1 CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.maximum-capacity=100 +CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.maximum-am-resource-percent=0.9 CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.state=RUNNING CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.acl_submit_applications=* CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.acl_administer_queue=* diff --git a/streampark-e2e/streampark-e2e-core/src/main/java/org/apache/streampark/e2e/core/Constants.java b/streampark-e2e/streampark-e2e-core/src/main/java/org/apache/streampark/e2e/core/Constants.java index e18f3828db..3d57754c2d 100644 --- a/streampark-e2e/streampark-e2e-core/src/main/java/org/apache/streampark/e2e/core/Constants.java +++ b/streampark-e2e/streampark-e2e-core/src/main/java/org/apache/streampark/e2e/core/Constants.java @@ -34,31 +34,4 @@ public final class Constants { /** chrome download path in selenium/standalone-chrome-debug container */ public static final String SELENIUM_CONTAINER_CHROME_DOWNLOAD_PATH = "/home/seluser/Downloads"; - /** datagen flink sql for test */ - public static final String TEST_FLINK_SQL = "CREATE TABLE datagen (\n" - + "f_sequence INT,\n" - + "f_random INT,\n" - + "f_random_str STRING,\n" - + "ts AS localtimestamp,\n" - + "WATERMARK FOR ts AS ts\n" - + ") WITH (\n" - + "'connector' = 'datagen',\n" - + "'rows-per-second'='5',\n" - + "'fields.f_sequence.kind'='sequence',\n" - + "'fields.f_sequence.start'='1',\n" - + "'fields.f_sequence.end'='100',\n" - + "'fields.f_random.min'='1',\n" - + "'fields.f_random.max'='100',\n" - + "'fields.f_random_str.length'='10'\n" - + ");\n" - + "\n" - + "CREATE TABLE print_table (\n" - + "f_sequence INT,\n" - + "f_random INT,\n" - + "f_random_str STRING\n" - + ") WITH (\n" - + "'connector' = 'print'\n" - + ");\n" - + "\n" - + "INSERT INTO print_table select f_sequence,f_random,f_random_str from datagen;"; }