Skip to content

Commit

Permalink
[Improve] isLocalProcessing bug fixed.
Browse files Browse the repository at this point in the history
  • Loading branch information
wolfboys committed Nov 30, 2024
1 parent c87c805 commit de0be93
Show file tree
Hide file tree
Showing 6 changed files with 13 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.streampark.console.base.util;

import org.apache.streampark.common.conf.ConfigKeys;
import org.apache.streampark.common.util.SystemPropertyUtils;

import org.apache.commons.lang3.StringUtils;

Expand Down Expand Up @@ -90,4 +91,9 @@ public static File getAppClientDir() {
public static File getPluginDir() {
return getAppDir(PLUGINS);
}

public static boolean isHaEnable() {
return SystemPropertyUtils.getBoolean("high-availability.enable", false);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,7 @@ private void initConfig() {
}

private void initRegistryService() {
boolean enable = SystemPropertyUtils.get("high-availability.enable", "false").equals("true");
if (enable) {
if (WebUtils.isHaEnable()) {
RegistryService registryService = SpringContextUtils.getBean(RegistryService.class);
registryService.registry();
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public interface DistributedTaskService extends IService<DistributedTask> {
* @param appId Long
* @return boolean
*/
public boolean isLocalProcessing(Long appId);
boolean isLocalProcessing(Long appId);

/**
* Save Distributed Task.
Expand All @@ -70,5 +70,5 @@ public interface DistributedTaskService extends IService<DistributedTask> {
* @param autoStart boolean
* @param action It may be one of the following values: START, RESTART, REVOKE, CANCEL, ABORT
*/
public void saveDistributedTask(BaseEntity appParam, boolean autoStart, DistributedTaskEnum action);
void saveDistributedTask(BaseEntity appParam, boolean autoStart, DistributedTaskEnum action);
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.streampark.console.base.mybatis.entity.BaseEntity;
import org.apache.streampark.console.base.util.ConsistentHash;
import org.apache.streampark.console.base.util.JacksonUtils;
import org.apache.streampark.console.base.util.WebUtils;
import org.apache.streampark.console.core.bean.FlinkTaskItem;
import org.apache.streampark.console.core.bean.SparkTaskItem;
import org.apache.streampark.console.core.entity.DistributedTask;
Expand Down Expand Up @@ -199,7 +200,7 @@ public void removeServer(String serverId) {
*/
@Override
public boolean isLocalProcessing(Long appId) {
return consistentHash.get(appId).equals(serverId);
return !WebUtils.isHaEnable() && consistentHash.get(appId).equals(serverId);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ public class RegistryServiceImpl implements RegistryService {
private String zkAddress;
private ZooKeeper zk;
private String nodePath;
private Watcher watcher = event -> {

private final Watcher watcher = event -> {
if (event.getType() == Watcher.Event.EventType.NodeChildrenChanged
&& event.getPath().equals(REGISTRY_PATH)) {
handleNodeChanges();
Expand All @@ -72,7 +73,6 @@ public void registry() {
try {
zkAddress = SystemPropertyUtils.get("high-availability.zookeeper.quorum", "localhost:2181");
zk = new ZooKeeper(zkAddress, HEARTBEAT_TIMEOUT, watcher);

if (zk.exists(REGISTRY_PATH, false) == null) {
zk.create(REGISTRY_PATH, new byte[0], OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,6 @@ void testStartFlinkApplicationOnYarnPerJobMode() {
@Order(9)
@SneakyThrows
void testRestartAndCancelFlinkApplicationOnYarnPerJobMode() {
Thread.sleep(Constants.DEFAULT_SLEEP_MILLISECONDS);
final ApplicationsPage applicationsPage = new ApplicationsPage(browser);

applicationsPage.startApplication(applicationName);
Expand Down Expand Up @@ -330,13 +329,6 @@ void testStartFlinkApplicationOnYarnSessionMode() {

applicationsPage.startApplication(applicationName);

Awaitility.await()
.untilAsserted(
() -> assertThat(applicationsPage.applicationsList)
.as("Applications list should contain started application")
.extracting(WebElement::getText)
.anyMatch(it -> it.contains("RUNNING")));

Awaitility.await()
.untilAsserted(
() -> assertThat(applicationsPage.applicationsList)
Expand All @@ -349,9 +341,7 @@ void testStartFlinkApplicationOnYarnSessionMode() {
@Order(14)
@SneakyThrows
void testRestartAndCancelFlinkApplicationOnYarnSessionMode() {
Thread.sleep(Constants.DEFAULT_SLEEP_MILLISECONDS);
final ApplicationsPage applicationsPage = new ApplicationsPage(browser);

applicationsPage.startApplication(applicationName);

Awaitility.await()
Expand All @@ -375,9 +365,7 @@ void testRestartAndCancelFlinkApplicationOnYarnSessionMode() {
@Order(15)
void testDeleteFlinkApplicationOnYarnSessionMode() {
final ApplicationsPage applicationsPage = new ApplicationsPage(browser);

applicationsPage.deleteApplication(applicationName);

Awaitility.await()
.untilAsserted(
() -> {
Expand Down

0 comments on commit de0be93

Please sign in to comment.