Skip to content

Commit

Permalink
[Feature] Introduce flink on remote cluster e2e test
Browse files Browse the repository at this point in the history
  • Loading branch information
zzzk1 committed Aug 21, 2024
1 parent 1557113 commit 0fe5db2
Show file tree
Hide file tree
Showing 12 changed files with 713 additions and 0 deletions.
6 changes: 6 additions & 0 deletions .github/workflows/e2e.yml
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,12 @@ jobs:
strategy:
matrix:
case:
- name: Flink116OnRemoteClusterDeployTest
class: org.apache.streampark.e2e.cases.Flink118OnRemoteClusterDeployTest
- name: Flink116OnRemoteClusterDeployTest
class: org.apache.streampark.e2e.cases.Flink117OnRemoteClusterDeployTest
- name: Flink116OnRemoteClusterDeployTest
class: org.apache.streampark.e2e.cases.Flink116OnRemoteClusterDeployTest
- name: EnvironmentTest
class: org.apache.streampark.e2e.cases.EnvironmentTest
- name: AlarmTest
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.streampark.e2e.cases;

import org.apache.streampark.e2e.core.StreamPark;
import org.apache.streampark.e2e.pages.LoginPage;
import org.apache.streampark.e2e.pages.common.Constants;
import org.apache.streampark.e2e.pages.flink.ApacheFlinkPage;
import org.apache.streampark.e2e.pages.flink.FlinkHomePage;
import org.apache.streampark.e2e.pages.flink.clusters.ClusterDetailForm;
import org.apache.streampark.e2e.pages.flink.clusters.FlinkClustersPage;
import org.apache.streampark.e2e.pages.flink.clusters.RemoteForm;

import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;
import org.openqa.selenium.WebElement;
import org.openqa.selenium.remote.RemoteWebDriver;
import org.testcontainers.shaded.org.awaitility.Awaitility;

import static org.assertj.core.api.Assertions.assertThat;

@StreamPark(composeFiles = "docker/flink-1.16-on-remote/docker-compose.yaml")
public class Flink116OnRemoteClusterDeployTest {

private static RemoteWebDriver browser;

private static final String userName = "admin";

private static final String password = "streampark";

private static final String teamName = "default";

private static final String flinkName = "flink-1.16.3";

private static final String flinkHome = "/opt/flink/";

private static final String flinkDescription = "description test";

private static final String flinkClusterName = "flink_1.16.3_cluster_e2e";

private static final String flinkJobManagerUrl = "http://jobmanager:8081";

private static final ClusterDetailForm.ExecutionMode executionMode = ClusterDetailForm.ExecutionMode.REMOTE;

@BeforeAll
public static void setUp() {
FlinkHomePage flinkHomePage = new LoginPage(browser)
.login(userName, password, teamName)
.goToNav(ApacheFlinkPage.class)
.goToTab(FlinkHomePage.class);

flinkHomePage.createFlinkHome(flinkName, flinkHome, flinkDescription);

flinkHomePage.goToNav(ApacheFlinkPage.class)
.goToTab(FlinkClustersPage.class);
}

@Test
@Order(10)
public void testCreateFlinkCluster() {
FlinkClustersPage flinkClustersPage = new FlinkClustersPage(browser);

flinkClustersPage.createFlinkCluster()
.<RemoteForm>addCluster(executionMode)
.jobManagerURL(flinkJobManagerUrl)
.clusterName(flinkClusterName)
.flinkVersion(flinkName)
.submit();

Awaitility.await()
.untilAsserted(
() -> assertThat(flinkClustersPage.flinkClusterList())
.as("Flink clusters list should contain newly-created application")
.extracting(WebElement::getText)
.anyMatch(it -> it.contains(flinkClusterName)));
}

@Test
@Order(50)
public void testDeleteFlinkCluster() {
final FlinkClustersPage flinkClustersPage = new FlinkClustersPage(browser);

flinkClustersPage.deleteFlinkCluster(flinkClusterName);

Awaitility.await()
.untilAsserted(
() -> {
browser.navigate().refresh();
Thread.sleep(Constants.DEFAULT_SLEEP_MILLISECONDS);
assertThat(flinkClustersPage.flinkClusterList())
.noneMatch(it -> it.getText().contains(flinkClusterName));
});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.streampark.e2e.cases;

import org.apache.streampark.e2e.core.StreamPark;
import org.apache.streampark.e2e.pages.LoginPage;
import org.apache.streampark.e2e.pages.common.Constants;
import org.apache.streampark.e2e.pages.flink.ApacheFlinkPage;
import org.apache.streampark.e2e.pages.flink.FlinkHomePage;
import org.apache.streampark.e2e.pages.flink.clusters.ClusterDetailForm;
import org.apache.streampark.e2e.pages.flink.clusters.FlinkClustersPage;
import org.apache.streampark.e2e.pages.flink.clusters.RemoteForm;

import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;
import org.openqa.selenium.WebElement;
import org.openqa.selenium.remote.RemoteWebDriver;
import org.testcontainers.shaded.org.awaitility.Awaitility;

import static org.assertj.core.api.Assertions.assertThat;

@StreamPark(composeFiles = "docker/flink-1.17-on-remote/docker-compose.yaml")
public class Flink117OnRemoteClusterDeployTest {

private static RemoteWebDriver browser;

private static final String userName = "admin";

private static final String password = "streampark";

private static final String teamName = "default";

private static final String flinkName = "flink-1.17.2";

private static final String flinkHome = "/opt/flink/";

private static final String flinkDescription = "description test";

private static final String flinkClusterName = "flink_1.17.2_cluster_e2e";

private static final String flinkJobManagerUrl = "http://jobmanager:8081";

private static final ClusterDetailForm.ExecutionMode executionMode = ClusterDetailForm.ExecutionMode.REMOTE;

@BeforeAll
public static void setUp() {
FlinkHomePage flinkHomePage = new LoginPage(browser)
.login(userName, password, teamName)
.goToNav(ApacheFlinkPage.class)
.goToTab(FlinkHomePage.class);

flinkHomePage.createFlinkHome(flinkName, flinkHome, flinkDescription);

flinkHomePage.goToNav(ApacheFlinkPage.class)
.goToTab(FlinkClustersPage.class);
}

@Test
@Order(10)
public void testCreateFlinkCluster() {
FlinkClustersPage flinkClustersPage = new FlinkClustersPage(browser);

flinkClustersPage.createFlinkCluster()
.<RemoteForm>addCluster(executionMode)
.jobManagerURL(flinkJobManagerUrl)
.clusterName(flinkClusterName)
.flinkVersion(flinkName)
.submit();

Awaitility.await()
.untilAsserted(
() -> assertThat(flinkClustersPage.flinkClusterList())
.as("Flink clusters list should contain newly-created application")
.extracting(WebElement::getText)
.anyMatch(it -> it.contains(flinkClusterName)));
}

@Test
@Order(50)
public void testDeleteFlinkCluster() {
final FlinkClustersPage flinkClustersPage = new FlinkClustersPage(browser);

flinkClustersPage.deleteFlinkCluster(flinkClusterName);

Awaitility.await()
.untilAsserted(
() -> {
browser.navigate().refresh();
Thread.sleep(Constants.DEFAULT_SLEEP_MILLISECONDS);
assertThat(flinkClustersPage.flinkClusterList())
.noneMatch(it -> it.getText().contains(flinkClusterName));
});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.streampark.e2e.cases;

import org.apache.streampark.e2e.core.StreamPark;
import org.apache.streampark.e2e.pages.LoginPage;
import org.apache.streampark.e2e.pages.common.Constants;
import org.apache.streampark.e2e.pages.flink.ApacheFlinkPage;
import org.apache.streampark.e2e.pages.flink.FlinkHomePage;
import org.apache.streampark.e2e.pages.flink.clusters.ClusterDetailForm;
import org.apache.streampark.e2e.pages.flink.clusters.FlinkClustersPage;
import org.apache.streampark.e2e.pages.flink.clusters.RemoteForm;

import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;
import org.openqa.selenium.WebElement;
import org.openqa.selenium.remote.RemoteWebDriver;
import org.testcontainers.shaded.org.awaitility.Awaitility;

import static org.assertj.core.api.Assertions.assertThat;

@StreamPark(composeFiles = "docker/flink-1.18-on-remote/docker-compose.yaml")
public class Flink118OnRemoteClusterDeployTest {

private static RemoteWebDriver browser;

private static final String userName = "admin";

private static final String password = "streampark";

private static final String teamName = "default";

private static final String flinkName = "flink-1.18.1";

private static final String flinkHome = "/opt/flink/";

private static final String flinkDescription = "description test";

private static final String flinkClusterName = "flink_1.18.1_cluster_e2e";

private static final String flinkJobManagerUrl = "http://jobmanager:8081";

private static final ClusterDetailForm.ExecutionMode executionMode = ClusterDetailForm.ExecutionMode.REMOTE;

@BeforeAll
public static void setUp() {
FlinkHomePage flinkHomePage = new LoginPage(browser)
.login(userName, password, teamName)
.goToNav(ApacheFlinkPage.class)
.goToTab(FlinkHomePage.class);

flinkHomePage.createFlinkHome(flinkName, flinkHome, flinkDescription);

flinkHomePage.goToNav(ApacheFlinkPage.class)
.goToTab(FlinkClustersPage.class);
}

@Test
@Order(10)
public void testCreateFlinkCluster() {
FlinkClustersPage flinkClustersPage = new FlinkClustersPage(browser);

flinkClustersPage.createFlinkCluster()
.<RemoteForm>addCluster(executionMode)
.jobManagerURL(flinkJobManagerUrl)
.clusterName(flinkClusterName)
.flinkVersion(flinkName)
.submit();

Awaitility.await()
.untilAsserted(
() -> assertThat(flinkClustersPage.flinkClusterList())
.as("Flink clusters list should contain newly-created application")
.extracting(WebElement::getText)
.anyMatch(it -> it.contains(flinkClusterName)));
}

@Test
@Order(50)
public void testDeleteFlinkCluster() {
final FlinkClustersPage flinkClustersPage = new FlinkClustersPage(browser);

flinkClustersPage.deleteFlinkCluster(flinkClusterName);

Awaitility.await()
.untilAsserted(
() -> {
browser.navigate().refresh();
Thread.sleep(Constants.DEFAULT_SLEEP_MILLISECONDS);
assertThat(flinkClustersPage.flinkClusterList())
.noneMatch(it -> it.getText().contains(flinkClusterName));
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,14 @@ public ClusterDetailForm(WebDriver driver) {
public <T> T addCluster(ExecutionMode executionMode) {
buttonExecutionModeDropdown.click();
switch (executionMode) {
case REMOTE:
selectExecutionMode.stream()
.filter(e -> e.getText().equalsIgnoreCase(ExecutionMode.REMOTE.desc()))
.findFirst()
.orElseThrow(() -> new IllegalArgumentException(
String.format("Execution Mode not found: %s", executionMode.desc())))
.click();
return (T) new RemoteForm(this);
case YARN_SESSION:
selectExecutionMode.stream()
.filter(e -> e.getText().equalsIgnoreCase(ExecutionMode.YARN_SESSION.desc()))
Expand Down
Loading

0 comments on commit 0fe5db2

Please sign in to comment.